Skip to content

WebhookServer

webhook_server

Webhook server for receiving real-time ClickUp events.

This module provides a lightweight HTTP server that receives webhook notifications from ClickUp when tasks are created, updated, or deleted.

Architecture:

- Cloud Run receives ClickUp webhooks and stores the events.
- Local clients poll the /events endpoint to fetch pending events.
- Events are stored in memory, with optional persistence to a file.

ClickUpWebhookHandler

Bases: BaseHTTPRequestHandler

HTTP request handler for ClickUp webhooks.

Source code in beads_clickup/webhook_server.py
class ClickUpWebhookHandler(BaseHTTPRequestHandler):
    """HTTP request handler for ClickUp webhooks.

    GET routes: ``/health``, ``/events/stats`` and ``/events`` (pending
    events, optionally narrowed with ``?since=...&limit=...``).
    POST routes: ``/events/ack`` (acknowledge events) and ``/webhook`` or
    ``/webhook/clickup`` (receive a webhook delivery).
    """

    # Class-level attributes set by WebhookServer
    event_queue: WebhookEventQueue | None = None
    webhook_secret: str | None = None
    allowed_events: set[str] | None = None

    def log_message(self, format: str, *args):
        """Override to use our logger."""
        logger.debug("Webhook HTTP: %s", format % args)

    def _send_response(self, status_code: int, message: str = ""):
        """Send a small JSON status envelope.

        The body is ``{"status": "ok"|"error", "message": message}`` where
        ``status`` is ``"ok"`` only for a 200. Failures while writing are
        logged at debug level and never raised.
        """
        try:
            self.send_response(status_code)
            self.send_header("Content-Type", "application/json")
            self.end_headers()
            response = {"status": "ok" if status_code == 200 else "error", "message": message}
            self.wfile.write(json.dumps(response).encode())
        except BrokenPipeError:
            # Client closed connection before we could respond - this is fine
            logger.debug("Client closed connection before response could be sent")
        except Exception as e:
            logger.debug("Failed to send response: %s", e)

    def _send_json_response(self, status_code: int, data: dict):
        """Send ``data`` serialized as the JSON response body."""
        try:
            self.send_response(status_code)
            self.send_header("Content-Type", "application/json")
            self.end_headers()
            self.wfile.write(json.dumps(data).encode())
        except Exception as e:
            logger.debug("Failed to send JSON response: %s", e)

    def _content_length(self) -> int | None:
        """Return the declared body length, or None if the header is unusable.

        A non-numeric value would make ``int()`` raise, and a negative value
        would make ``rfile.read()`` wait for end-of-stream, so both are
        reported as invalid. A missing header counts as length 0.
        """
        try:
            length = int(self.headers.get("Content-Length", 0))
        except (TypeError, ValueError):
            return None
        return length if length >= 0 else None

    def _verify_signature(self, payload: bytes) -> bool:
        """Verify the webhook signature if a secret is configured.

        Args:
            payload: Raw request body exactly as received.

        Returns:
            True when no secret is configured or when the ``X-Signature``
            header equals the hex HMAC-SHA256 of ``payload``; False otherwise.
        """
        if not self.webhook_secret:
            return True  # No secret configured, skip verification

        signature = self.headers.get("X-Signature")
        if not signature:
            logger.warning("Webhook request missing X-Signature header")
            return False

        # ClickUp uses HMAC-SHA256
        expected = hmac.new(
            self.webhook_secret.encode(),
            payload,
            hashlib.sha256,
        ).hexdigest()

        # Compare as bytes: compare_digest() raises TypeError for str
        # arguments containing non-ASCII characters, which an attacker
        # controlled header may well contain.
        if not hmac.compare_digest(signature.encode("utf-8"), expected.encode("ascii")):
            logger.warning("Webhook signature verification failed")
            return False

        return True

    def do_GET(self):
        """Handle GET requests (health check, events API)."""
        if self.path == "/health":
            self._send_response(200, "Webhook server is running")
        elif self.path == "/events/stats":
            # Return event store statistics
            stats = _get_event_stats()
            self._send_json_response(200, stats)
        elif self.path.startswith("/events"):
            # Get pending events
            # Parse query params: ?since=timestamp&limit=100
            from urllib.parse import urlparse, parse_qs
            params = parse_qs(urlparse(self.path).query)

            since = params.get("since", [None])[0]
            try:
                limit = int(params.get("limit", [100])[0])
            except ValueError:
                # Reject a malformed limit instead of letting the handler crash.
                self._send_response(400, "Invalid limit parameter")
                return

            events = _get_pending_events(since=since, limit=limit)
            self._send_json_response(200, {"events": events, "count": len(events)})
        else:
            self._send_response(404, "Not found")

    def _handle_event_ack(self):
        """Handle event acknowledgment request.

        Expects a JSON object with a non-empty ``event_ids`` value and replies
        with ``{"acknowledged": <count>}``.
        """
        content_length = self._content_length()
        if content_length is None:
            self._send_response(400, "Invalid Content-Length header")
            return
        if content_length == 0:
            self._send_response(400, "Empty request body")
            return

        try:
            payload = self.rfile.read(content_length)
            data = json.loads(payload)
            # Anything other than a JSON object cannot carry event_ids.
            event_ids = data.get("event_ids", []) if isinstance(data, dict) else None

            if not event_ids:
                self._send_response(400, "No event_ids provided")
                return

            count = _mark_events_processed(event_ids)
            self._send_json_response(200, {"acknowledged": count})

        except (json.JSONDecodeError, UnicodeDecodeError):
            # UnicodeDecodeError: json.loads() decodes bytes before parsing.
            self._send_response(400, "Invalid JSON")
        except Exception as e:
            logger.error("Failed to acknowledge events: %s", e)
            self._send_response(500, str(e))

    def do_POST(self):
        """Handle POST requests (webhook events, event acknowledgment)."""
        # Event acknowledgment endpoint
        if self.path == "/events/ack":
            self._handle_event_ack()
            return

        if self.path not in ("/webhook", "/webhook/clickup"):
            self._send_response(404, "Not found")
            return

        # Read request body
        content_length = self._content_length()
        if content_length is None:
            self._send_response(400, "Invalid Content-Length header")
            return
        if content_length == 0:
            self._send_response(400, "Empty request body")
            return

        try:
            payload = self.rfile.read(content_length)
        except Exception as e:
            logger.error("Failed to read webhook payload: %s", e)
            self._send_response(400, "Failed to read request body")
            return

        # Verify signature
        if not self._verify_signature(payload):
            self._send_response(401, "Invalid signature")
            return

        # Parse JSON payload
        try:
            data = json.loads(payload)
        except (json.JSONDecodeError, UnicodeDecodeError) as e:
            logger.error("Invalid JSON in webhook payload: %s", e)
            self._send_response(400, "Invalid JSON")
            return

        if not isinstance(data, dict):
            # _parse_event() reads keys from the payload, so it must be an object.
            logger.error("Webhook payload is not a JSON object")
            self._send_response(400, "JSON payload must be an object")
            return

        # Respond immediately to avoid timeout, then process
        # ClickUp expects a quick 200 response
        self._send_response(200, "Event received")

        # Now process the event (after responding)
        try:
            event = self._parse_event(data)
            if event:
                # Store event for remote clients to fetch
                event_id = _store_event({
                    "event_type": event.event_type,
                    "task_id": event.task_id,
                    "workspace_id": event.workspace_id,
                    "task_data": event.task_data,
                    "raw_payload": event.raw_payload,
                })
                logger.info(
                    "Stored webhook event %s: %s for task %s",
                    event_id,
                    event.event_type,
                    event.task_id,
                )

                # Also add to local queue if available (for local processing)
                if self.event_queue:
                    self.event_queue.push(event)
        except Exception as e:
            logger.error("Failed to process webhook event: %s", e, exc_info=True)

    def _parse_event(self, data: dict[str, Any]) -> WebhookEvent | None:
        """Parse a webhook payload into a WebhookEvent.

        Returns:
            The parsed event, or None when the event type is filtered out by
            ``allowed_events`` or no task id can be found in the payload.
        """
        # ClickUp webhook format
        event_type = data.get("event")

        # Filter by allowed events if configured
        if self.allowed_events and event_type not in self.allowed_events:
            logger.debug("Ignoring event type: %s", event_type)
            return None

        # Extract task data
        task_data = data.get("task_data") or data.get("task") or {}
        if not isinstance(task_data, dict):
            # A scalar here would break the .get("id") lookup below.
            task_data = {}
        task_id = data.get("task_id") or task_data.get("id") or ""

        if not task_id:
            logger.warning("Webhook event missing task_id: %s", data)
            return None

        workspace_id = data.get("team_id") or data.get("workspace_id") or ""

        return WebhookEvent(
            event_type=event_type or "unknown",
            task_id=task_id,
            task_data=task_data,
            workspace_id=workspace_id,
            # Receive time as a naive UTC datetime.
            # NOTE(review): datetime.utcnow() is deprecated since Python 3.12;
            # switching to an aware datetime would change isoformat() output,
            # so confirm consumers before migrating.
            timestamp=datetime.utcnow(),
            raw_payload=data,
        )

do_GET()

Handle GET requests (health check, events API).

Source code in beads_clickup/webhook_server.py
def do_GET(self):
    """Handle GET requests (health check, events API).

    Routes:
        /health        -> 200 status envelope
        /events/stats  -> event store statistics as JSON
        /events...     -> pending events; accepts ``?since=...&limit=...``
                          (``limit`` defaults to 100)
        anything else  -> 404

    A ``limit`` that is not an integer is answered with 400 instead of
    raising out of the handler.
    """
    if self.path == "/health":
        self._send_response(200, "Webhook server is running")
    elif self.path == "/events/stats":
        # Return event store statistics
        stats = _get_event_stats()
        self._send_json_response(200, stats)
    elif self.path.startswith("/events"):
        # Get pending events
        # Parse query params: ?since=timestamp&limit=100
        from urllib.parse import urlparse, parse_qs
        params = parse_qs(urlparse(self.path).query)

        since = params.get("since", [None])[0]
        try:
            limit = int(params.get("limit", [100])[0])
        except ValueError:
            # Client error: report it rather than crash mid-request.
            self._send_response(400, "Invalid limit parameter")
            return

        events = _get_pending_events(since=since, limit=limit)
        self._send_json_response(200, {"events": events, "count": len(events)})
    else:
        self._send_response(404, "Not found")

do_POST()

Handle POST requests (webhook events, event acknowledgment).

Source code in beads_clickup/webhook_server.py
def do_POST(self):
    """Handle POST requests (webhook events, event acknowledgment).

    ``/events/ack`` is delegated to ``_handle_event_ack``. ``/webhook`` and
    ``/webhook/clickup`` accept a webhook delivery: the body is read, its
    signature verified, the JSON parsed, a 200 sent immediately, and only then
    is the event stored and pushed to the local queue. Any other path gets 404.

    Malformed requests (bad ``Content-Length``, undecodable or non-object
    JSON) are answered with 400 instead of raising out of the handler.
    """
    # Event acknowledgment endpoint
    if self.path == "/events/ack":
        self._handle_event_ack()
        return

    if self.path not in ("/webhook", "/webhook/clickup"):
        self._send_response(404, "Not found")
        return

    # Read request body
    try:
        content_length = int(self.headers.get("Content-Length", 0))
    except (TypeError, ValueError):
        content_length = -1
    if content_length < 0:
        # A negative length would make rfile.read() wait for end-of-stream.
        self._send_response(400, "Invalid Content-Length header")
        return
    if content_length == 0:
        self._send_response(400, "Empty request body")
        return

    try:
        payload = self.rfile.read(content_length)
    except Exception as e:
        logger.error("Failed to read webhook payload: %s", e)
        self._send_response(400, "Failed to read request body")
        return

    # Verify signature
    if not self._verify_signature(payload):
        self._send_response(401, "Invalid signature")
        return

    # Parse JSON payload
    try:
        data = json.loads(payload)
    except (json.JSONDecodeError, UnicodeDecodeError) as e:
        # UnicodeDecodeError: json.loads() decodes the bytes before parsing.
        logger.error("Invalid JSON in webhook payload: %s", e)
        self._send_response(400, "Invalid JSON")
        return

    if not isinstance(data, dict):
        # Event parsing reads keys from the payload, so it must be an object.
        logger.error("Webhook payload is not a JSON object")
        self._send_response(400, "JSON payload must be an object")
        return

    # Respond immediately to avoid timeout, then process
    # ClickUp expects a quick 200 response
    self._send_response(200, "Event received")

    # Now process the event (after responding)
    try:
        event = self._parse_event(data)
        if event:
            # Store event for remote clients to fetch
            event_id = _store_event({
                "event_type": event.event_type,
                "task_id": event.task_id,
                "workspace_id": event.workspace_id,
                "task_data": event.task_data,
                "raw_payload": event.raw_payload,
            })
            logger.info(
                "Stored webhook event %s: %s for task %s",
                event_id,
                event.event_type,
                event.task_id,
            )

            # Also add to local queue if available (for local processing)
            if self.event_queue:
                self.event_queue.push(event)
    except Exception as e:
        # The 200 has already been sent, so failures can only be logged.
        logger.error("Failed to process webhook event: %s", e, exc_info=True)

log_message(format, *args)

Override to use our logger.

Source code in beads_clickup/webhook_server.py
def log_message(self, format: str, *args):
    """Route the base handler's access-log lines to the module logger.

    The %-style ``format`` is rendered with ``args`` and emitted at debug
    level with a ``Webhook HTTP:`` prefix.
    """
    rendered = format % args
    logger.debug("Webhook HTTP: %s", rendered)

WebhookEvent dataclass

Represents a ClickUp webhook event.

Source code in beads_clickup/webhook_server.py
@dataclass
class WebhookEvent:
    """Represents a ClickUp webhook event.

    Instances are built by ``ClickUpWebhookHandler._parse_event`` from the
    decoded JSON body of a webhook request.
    """

    # Value of the payload's "event" key; "unknown" when the key is missing.
    event_type: str
    # Payload "task_id", falling back to the "id" of the task object.
    task_id: str
    # Payload "task_data" or "task" object; empty dict when neither is present.
    task_data: dict[str, Any]
    # Payload "team_id" or "workspace_id"; empty string when neither is present.
    workspace_id: str
    # Time the payload was parsed (set via datetime.utcnow(), i.e. naive UTC).
    timestamp: datetime
    # The complete decoded payload, kept unmodified.
    raw_payload: dict[str, Any]

WebhookEventProcessor

Processes webhook events and triggers sync operations.

Source code in beads_clickup/webhook_server.py
class WebhookEventProcessor:
    """Processes webhook events and triggers sync operations."""

    def __init__(self, sync_engine: Any, config: dict[str, Any]):
        """Initialize the event processor.

        Args:
            sync_engine: SyncEngine instance for performing sync operations
            config: Configuration dictionary
        """
        self.sync_engine = sync_engine
        self.config = config
        self._processing = False
        # Processed event IDs, used for deduplication. A dict (values unused)
        # rather than a set because dicts preserve insertion order, which is
        # what makes "evict the oldest half" below actually evict the oldest.
        self._processed_events: dict[str, None] = {}
        self._max_processed_cache = 10000

    def process_event(self, event: WebhookEvent) -> bool:
        """Process a single webhook event.

        Args:
            event: WebhookEvent to process

        Returns:
            True if processing succeeded (or the event was already processed),
            False if the handler raised.
        """
        # Generate event ID for deduplication
        event_id = f"{event.event_type}:{event.task_id}:{event.timestamp.isoformat()}"

        # Check if already processed (deduplication)
        if event_id in self._processed_events:
            logger.debug("Skipping duplicate event: %s", event_id)
            return True

        logger.info("Processing webhook event: %s for task %s", event.event_type, event.task_id)

        try:
            self._dispatch(event)
        except Exception as e:
            logger.error(
                "Failed to process webhook event %s: %s",
                event_id,
                e,
                exc_info=True,
            )
            return False

        self._remember(event_id)
        return True

    def _dispatch(self, event: WebhookEvent):
        """Route an event to its handler; unlisted types get a plain task sync."""
        if event.event_type == "taskDeleted":
            # Handle task deletion (mark as closed in beads)
            self._handle_task_deleted(event)
        elif event.event_type == "taskCommentPosted":
            # Sync comment from ClickUp to beads
            self._handle_comment_posted(event)
        else:
            # taskCreated / taskUpdated / taskStatusUpdated and every other
            # event type: sync the task from ClickUp to beads.
            self.sync_engine.sync_task_to_beads(event.task_id)

    def _remember(self, event_id: str):
        """Record a processed event ID, evicting the oldest half when full."""
        self._processed_events[event_id] = None
        if len(self._processed_events) > self._max_processed_cache:
            oldest = list(self._processed_events)[: self._max_processed_cache // 2]
            for stale_id in oldest:
                del self._processed_events[stale_id]

    def _find_issue_id(self, task_id: str) -> Any:
        """Return the beads issue ID mapped to a ClickUp task, or None."""
        for bid, state in self.sync_engine.sync_state.items():
            if isinstance(state, dict) and state.get("clickup_task_id") == task_id:
                return bid
        return None

    def _handle_task_deleted(self, event: WebhookEvent):
        """Handle task deletion event by closing the matching beads issue."""
        task_id = event.task_id
        issue_id = self._find_issue_id(task_id)

        if issue_id:
            logger.info("ClickUp task %s deleted, marking beads issue %s as closed", task_id, issue_id)
            # Update beads issue status to closed
            self.sync_engine._update_beads_issue(issue_id, {"status": "closed"})
        else:
            logger.debug("No beads issue found for deleted task %s", task_id)

    def _handle_comment_posted(self, event: WebhookEvent):
        """Handle comment posted event.

        Comments are synced only when a matching beads issue exists and the
        ``advanced.sync_comments`` config flag is truthy.
        """
        task_id = event.task_id
        issue_id = self._find_issue_id(task_id)

        if issue_id and self.config.get("advanced", {}).get("sync_comments", False):
            # Sync comments from ClickUp
            self.sync_engine.comment_sync.sync_comments_from_clickup(
                issue_id, task_id, self.sync_engine.mcp_client
            )

__init__(sync_engine, config)

Initialize the event processor.

Parameters:

Name Type Description Default
sync_engine Any

SyncEngine instance for performing sync operations

required
config dict[str, Any]

Configuration dictionary

required
Source code in beads_clickup/webhook_server.py
def __init__(self, sync_engine: Any, config: dict[str, Any]):
    """Set up a processor bound to a sync engine and its configuration.

    Args:
        sync_engine: SyncEngine instance used to carry out sync operations.
        config: Configuration dictionary.
    """
    # Collaborators supplied by the caller.
    self.config = config
    self.sync_engine = sync_engine

    # Deduplication cache of already-processed event IDs and its size cap.
    self._max_processed_cache = 10000
    self._processed_events: set[str] = set()

    self._processing = False

process_event(event)

Process a single webhook event.

Parameters:

Name Type Description Default
event WebhookEvent

WebhookEvent to process

required

Returns:

Type Description
bool

True if processing succeeded, False otherwise

Source code in beads_clickup/webhook_server.py
def process_event(self, event: WebhookEvent) -> bool:
    """Handle one webhook event, skipping events already seen.

    Args:
        event: WebhookEvent to process

    Returns:
        True when the event was handled or recognised as a duplicate,
        False when handling raised an exception (which is logged).
    """
    # Deduplication key: event type, task and event timestamp.
    dedup_key = f"{event.event_type}:{event.task_id}:{event.timestamp.isoformat()}"

    if dedup_key in self._processed_events:
        logger.debug("Skipping duplicate event: %s", dedup_key)
        return True

    logger.info("Processing webhook event: %s for task %s", event.event_type, event.task_id)

    try:
        kind = event.event_type
        if kind == "taskDeleted":
            # Deletion: the matching beads issue gets closed.
            self._handle_task_deleted(event)
        elif kind == "taskCommentPosted":
            # New comment: pull comments over from ClickUp.
            self._handle_comment_posted(event)
        else:
            # Created/updated/status events and anything unrecognised
            # all result in a plain task sync.
            self.sync_engine.sync_task_to_beads(event.task_id)

        # Remember the event so a repeat delivery is skipped.
        self._processed_events.add(dedup_key)
        cap = self._max_processed_cache
        if len(self._processed_events) > cap:
            # Shrink the cache by cap // 2 entries. The cache is a set, so
            # which entries go is arbitrary rather than strictly the oldest.
            for stale in list(self._processed_events)[: cap // 2]:
                self._processed_events.discard(stale)
    except Exception as exc:
        logger.error(
            "Failed to process webhook event %s: %s",
            dedup_key,
            exc,
            exc_info=True,
        )
        return False

    return True

WebhookEventQueue

Thread-safe queue for webhook events.

Source code in beads_clickup/webhook_server.py
class WebhookEventQueue:
    """Thread-safe FIFO queue for webhook events.

    The queue is bounded: once ``max_size`` events are held, pushing another
    drops the oldest one. Registered callbacks are invoked for every pushed
    event, outside the queue lock.
    """

    def __init__(self, max_size: int = 1000):
        """Create an empty queue.

        Args:
            max_size: Number of events held before the oldest is dropped.
        """
        # Local import keeps this class self-contained; deque gives O(1)
        # removal from the front where list.pop(0) is O(n).
        from collections import deque

        self._queue: deque[WebhookEvent] = deque()
        self._lock = threading.Lock()
        self._max_size = max_size
        self._callbacks: list[Callable[[WebhookEvent], None]] = []

    def add_callback(self, callback: Callable[[WebhookEvent], None]):
        """Register a callback for when events are received."""
        self._callbacks.append(callback)

    def push(self, event: WebhookEvent):
        """Add an event to the queue and notify all callbacks.

        A callback that raises is logged and does not prevent the remaining
        callbacks from running.
        """
        with self._lock:
            # The emptiness check keeps a max_size of 0 (or less) from
            # popping an empty queue.
            if self._queue and len(self._queue) >= self._max_size:
                # Remove oldest event if queue is full
                self._queue.popleft()
                logger.warning("Webhook event queue full, dropped oldest event")
            self._queue.append(event)

        # Notify callbacks
        for callback in self._callbacks:
            try:
                callback(event)
            except Exception as e:
                logger.error("Webhook callback failed: %s", e)

    def pop(self) -> WebhookEvent | None:
        """Remove and return the oldest event, or None if the queue is empty."""
        with self._lock:
            if self._queue:
                return self._queue.popleft()
            return None

    def peek(self) -> WebhookEvent | None:
        """Return the oldest event without removing it, or None if empty."""
        with self._lock:
            if self._queue:
                return self._queue[0]
            return None

    def size(self) -> int:
        """Return the number of events in the queue."""
        with self._lock:
            return len(self._queue)

    def clear(self):
        """Clear all events from the queue."""
        with self._lock:
            self._queue.clear()

add_callback(callback)

Register a callback for when events are received.

Source code in beads_clickup/webhook_server.py
def add_callback(self, callback: Callable[[WebhookEvent], None]):
    """Subscribe ``callback`` to the queue's list of event listeners."""
    listeners = self._callbacks
    listeners.append(callback)

clear()

Clear all events from the queue.

Source code in beads_clickup/webhook_server.py
def clear(self):
    """Drop every queued event while holding the queue lock."""
    guard = self._lock
    with guard:
        self._queue.clear()

peek()

Return the oldest event without removing it.

Source code in beads_clickup/webhook_server.py
def peek(self) -> WebhookEvent | None:
    """Look at the event at the front of the queue, leaving it in place.

    Returns:
        The oldest queued event, or None when the queue is empty.
    """
    with self._lock:
        return self._queue[0] if self._queue else None

pop()

Remove and return the oldest event.

Source code in beads_clickup/webhook_server.py
def pop(self) -> WebhookEvent | None:
    """Take the event at the front of the queue.

    Returns:
        The oldest queued event (now removed), or None when the queue is
        empty.
    """
    with self._lock:
        if not self._queue:
            return None
        return self._queue.pop(0)

push(event)

Add an event to the queue.

Source code in beads_clickup/webhook_server.py
def push(self, event: WebhookEvent):
    """Append an event, then hand it to every registered callback.

    When the queue already holds ``_max_size`` events, the oldest one is
    discarded (with a warning) to make room. Callbacks run after the lock is
    released; one that raises is logged and the rest still run.
    """
    with self._lock:
        at_capacity = len(self._queue) >= self._max_size
        if at_capacity:
            # Make room by discarding the front (oldest) entry.
            self._queue.pop(0)
            logger.warning("Webhook event queue full, dropped oldest event")
        self._queue.append(event)

    for notify in self._callbacks:
        try:
            notify(event)
        except Exception as exc:
            logger.error("Webhook callback failed: %s", exc)

size()

Return the number of events in the queue.

Source code in beads_clickup/webhook_server.py
def size(self) -> int:
    """Report how many events are currently queued."""
    with self._lock:
        count = len(self._queue)
    return count

WebhookServer

ClickUp webhook server for real-time sync.

Source code in beads_clickup/webhook_server.py
class WebhookServer:
    """ClickUp webhook server for real-time sync."""

    # ClickUp event types we care about
    TASK_EVENTS = {
        "taskCreated",
        "taskUpdated",
        "taskDeleted",
        "taskStatusUpdated",
        "taskAssigneeUpdated",
        "taskPriorityUpdated",
        "taskDueDateUpdated",
        "taskTimeEstimateUpdated",
        "taskMoved",
        "taskCommentPosted",
    }

    # Seconds handle_request() waits for a connection before returning, which
    # bounds how long the serve loop takes to notice a stop request.
    _POLL_INTERVAL = 0.5

    def __init__(
        self,
        host: str = "0.0.0.0",
        port: int = 8765,
        webhook_secret: str | None = None,
        event_filter: set[str] | None = None,
    ):
        """Initialize the webhook server.

        Args:
            host: Host to bind to (default: 0.0.0.0 for all interfaces)
            port: Port to listen on (default: 8765)
            webhook_secret: Secret for signature verification (optional)
            event_filter: Set of event types to accept (default: all task events)
        """
        self.host = host
        self.port = port
        self.webhook_secret = webhook_secret
        self.event_filter = event_filter or self.TASK_EVENTS

        self.event_queue = WebhookEventQueue()
        self._server: HTTPServer | None = None
        self._thread: threading.Thread | None = None
        self._running = False

    def start(self, background: bool = True):
        """Start the webhook server.

        Args:
            background: If True, run in a background (daemon) thread;
                otherwise serve on the calling thread and block.
        """
        if self._running:
            logger.warning("Webhook server already running")
            return

        # Configure handler class
        # NOTE: these are class attributes, so they are shared by every
        # WebhookServer in the process.
        ClickUpWebhookHandler.event_queue = self.event_queue
        ClickUpWebhookHandler.webhook_secret = self.webhook_secret
        ClickUpWebhookHandler.allowed_events = self.event_filter

        server = HTTPServer((self.host, self.port), ClickUpWebhookHandler)
        # Without a timeout handle_request() blocks until the next request
        # arrives, so the serve loop could never observe _running = False.
        server.timeout = self._POLL_INTERVAL
        self._server = server
        self._running = True

        logger.info("Starting webhook server on %s:%d", self.host, self.port)

        if background:
            self._thread = threading.Thread(target=self._serve, daemon=True)
            self._thread.start()
        else:
            self._serve()

    def _serve(self):
        """Serve webhook requests until ``_running`` is cleared."""
        # Keep a local reference: stop() resets self._server to None.
        server = self._server
        if server is None:
            self._running = False
            return
        try:
            while self._running:
                server.handle_request()
        except Exception as e:
            if self._running:
                logger.error("Webhook server error: %s", e)
            else:
                # stop() closed the socket while this loop was still polling.
                logger.debug("Webhook server loop ended during shutdown: %s", e)
        finally:
            self._running = False

    def stop(self):
        """Stop the webhook server and release its listening socket.

        ``HTTPServer.shutdown()`` is deliberately not used: it waits for
        ``serve_forever()`` to exit, and this server polls with
        ``handle_request()`` instead, so that call would block forever.
        """
        self._running = False

        thread, self._thread = self._thread, None
        # A callback running on the serve thread may call stop(); joining the
        # current thread would raise RuntimeError.
        if thread is not None and thread is not threading.current_thread():
            thread.join(timeout=5)

        server, self._server = self._server, None
        if server is not None:
            # Free the port so the server can be started again.
            server.server_close()
        logger.info("Webhook server stopped")

    def is_running(self) -> bool:
        """Check if the server is running."""
        return self._running

    def get_webhook_url(self, external_host: str | None = None) -> str:
        """Get the webhook URL to register with ClickUp.

        Args:
            external_host: External hostname/IP if behind NAT/proxy

        Returns:
            Webhook URL string. A wildcard bind address ("0.0.0.0" or "") is
            replaced by "localhost" because it is not a routable host name.
        """
        host = external_host or self.host
        if host in ("0.0.0.0", ""):
            host = "localhost"
        return f"http://{host}:{self.port}/webhook/clickup"

__init__(host='0.0.0.0', port=8765, webhook_secret=None, event_filter=None)

Initialize the webhook server.

Parameters:

Name Type Description Default
host str

Host to bind to (default: 0.0.0.0 for all interfaces)

'0.0.0.0'
port int

Port to listen on (default: 8765)

8765
webhook_secret str | None

Secret for signature verification (optional)

None
event_filter set[str] | None

Set of event types to accept (default: all task events)

None
Source code in beads_clickup/webhook_server.py
def __init__(
    self,
    host: str = "0.0.0.0",
    port: int = 8765,
    webhook_secret: str | None = None,
    event_filter: set[str] | None = None,
):
    """Record the server settings and create its event queue.

    Nothing is bound or started here; the server handle and thread start out
    as None.

    Args:
        host: Host to bind to (default: 0.0.0.0 for all interfaces)
        port: Port to listen on (default: 8765)
        webhook_secret: Secret for signature verification (optional)
        event_filter: Set of event types to accept (default: all task events)
    """
    # Network and security settings.
    self.host, self.port = host, port
    self.webhook_secret = webhook_secret

    # An omitted or empty filter falls back to the class-wide TASK_EVENTS.
    self.event_filter = event_filter if event_filter else self.TASK_EVENTS

    # Runtime state.
    self.event_queue = WebhookEventQueue()
    self._server: HTTPServer | None = None
    self._thread: threading.Thread | None = None
    self._running = False

get_webhook_url(external_host=None)

Get the webhook URL to register with ClickUp.

Parameters:

Name Type Description Default
external_host str | None

External hostname/IP if behind NAT/proxy

None

Returns:

Type Description
str

Webhook URL string

Source code in beads_clickup/webhook_server.py
def get_webhook_url(self, external_host: str | None = None) -> str:
    """Build the URL that should be registered with ClickUp.

    Args:
        external_host: External hostname/IP if behind NAT/proxy; when not
            given, the configured bind host is used.

    Returns:
        The ``/webhook/clickup`` URL. The wildcard address "0.0.0.0" is
        reported as "localhost".
    """
    advertised = external_host if external_host else self.host
    if advertised == "0.0.0.0":
        advertised = "localhost"
    return f"http://{advertised}:{self.port}/webhook/clickup"

is_running()

Check if the server is running.

Source code in beads_clickup/webhook_server.py
def is_running(self) -> bool:
    """Report the server's running flag."""
    running = self._running
    return running

start(background=True)

Start the webhook server.

Parameters:

Name Type Description Default
background bool

If True, run in a background thread

True
Source code in beads_clickup/webhook_server.py
def start(self, background: bool = True):
    """Start the webhook server.

    Does nothing except log a warning when the server is already running.

    Args:
        background: If True, run in a background (daemon) thread;
            otherwise serve on the calling thread and block.
    """
    if self._running:
        logger.warning("Webhook server already running")
        return

    # Configure handler class
    # NOTE: these are class attributes, so they are shared by every
    # WebhookServer in the process.
    ClickUpWebhookHandler.event_queue = self.event_queue
    ClickUpWebhookHandler.webhook_secret = self.webhook_secret
    ClickUpWebhookHandler.allowed_events = self.event_filter

    self._server = HTTPServer((self.host, self.port), ClickUpWebhookHandler)
    # The serve loop polls with handle_request(); without a timeout that call
    # blocks until the next request and the loop never sees _running = False.
    self._server.timeout = 0.5
    self._running = True

    logger.info("Starting webhook server on %s:%d", self.host, self.port)

    if background:
        self._thread = threading.Thread(target=self._serve, daemon=True)
        self._thread.start()
    else:
        self._serve()

stop()

Stop the webhook server.

Source code in beads_clickup/webhook_server.py
def stop(self):
    """Stop the webhook server and release its listening socket.

    ``HTTPServer.shutdown()`` is deliberately not used: it waits for
    ``serve_forever()`` to exit, and the serve loop polls with
    ``handle_request()`` instead, so that call would block forever.
    """
    # Ask the serve loop to exit at its next iteration.
    self._running = False

    thread, self._thread = self._thread, None
    # stop() may be called from the serve thread itself (e.g. from a
    # callback); joining the current thread would raise RuntimeError.
    if thread is not None and thread is not threading.current_thread():
        thread.join(timeout=5)

    server, self._server = self._server, None
    if server is not None:
        # Close the socket so the port is free for a later start().
        server.server_close()
    logger.info("Webhook server stopped")

create_webhook_server_from_config(config_path=None)

Create a webhook server from configuration.

Parameters:

Name Type Description Default
config_path str | Path | None

Path to config.yaml (optional, uses default if not provided)

None

Returns:

Type Description
WebhookServer

Configured WebhookServer instance

Source code in beads_clickup/webhook_server.py
def create_webhook_server_from_config(config_path: str | Path | None = None) -> WebhookServer:
    """Create a webhook server from configuration.

    Settings are read from the ``webhook`` mapping of the YAML file (keys
    ``host``, ``port``, ``secret``, ``events``). A missing file, an empty
    file, or a missing/empty ``webhook`` section all yield a server with
    default settings.

    Args:
        config_path: Path to config.yaml (optional; defaults to
            ``.beads/integrations/clickup/config.yaml`` under the current
            working directory)

    Returns:
        Configured WebhookServer instance
    """
    import yaml

    # Load config
    if config_path:
        config_path = Path(config_path)
    else:
        config_path = Path.cwd() / ".beads" / "integrations" / "clickup" / "config.yaml"

    config: dict = {}
    if config_path.exists():
        with open(config_path, encoding="utf-8") as f:
            loaded = yaml.safe_load(f)
        # safe_load() returns None for an empty file and may return a list or
        # scalar; only a mapping can hold settings.
        if isinstance(loaded, dict):
            config = loaded

    # Get webhook settings. "webhook:" with no value loads as None, so a
    # plain .get(..., {}) default is not enough.
    webhook_config = config.get("webhook")
    if not isinstance(webhook_config, dict):
        webhook_config = {}

    events = webhook_config.get("events") or WebhookServer.TASK_EVENTS
    if isinstance(events, str):
        # set("taskCreated") would split the name into single characters.
        events = [events]

    return WebhookServer(
        host=webhook_config.get("host", "0.0.0.0"),
        port=webhook_config.get("port", 8765),
        webhook_secret=webhook_config.get("secret"),
        event_filter=set(events),
    )