class BeadsMCPServer:
"""MCP server that exposes bd (beads) commands as tools."""
def __init__(self):
"""Initialize the MCP server."""
# Find the bd wrapper script dynamically (relative to this module)
# This module is in beads_clickup/, so go up one level to find bd
module_dir = Path(__file__).parent.parent
self.bd_wrapper_path = str(module_dir / "bd")
# Fallback to environment variable if wrapper not found
if not Path(self.bd_wrapper_path).exists():
env_wrapper = os.environ.get("BD_WRAPPER_PATH")
if env_wrapper and Path(env_wrapper).exists():
self.bd_wrapper_path = env_wrapper
else:
# Last resort: try to find bd in PATH
import shutil
bd_in_path = shutil.which("bd")
if bd_in_path:
self.bd_wrapper_path = bd_in_path
# Workspace override - set via bd_switch_workspace tool
self._workspace_override: str | None = None
# Workspace detected from MCP protocol roots/list response
self._roots_workspace: str | None = None
# Whether the client declared roots capability during initialize
self._client_supports_roots: bool = False
# Find and store project directory
try:
self.project_dir = Path(self._find_beads_project_dir())
except ValueError:
# If no beads project found, set to None (will fail gracefully later)
self.project_dir = None
# Load template sections for dynamic schema generation
self._template_sections = get_template_sections(self.project_dir)
self._section_keys = list(self._template_sections.keys())
# Load ClickUp config for dynamic custom field schema
self._clickup_config = self._load_clickup_config()
# Initialize CustomFieldStore
if self.project_dir:
self.custom_field_store = CustomFieldStore(self.project_dir)
else:
self.custom_field_store = None
self.tools = [
self._build_bd_create_tool(),
{
"name": "bd_list",
"description": "List all beads issues",
"inputSchema": {
"type": "object",
"properties": {
"status": {
"type": "string",
"description": "Filter by status: open, in_progress, completed, closed",
},
"priority": {
"type": "string",
"description": "Filter by priority: p0, p1, p2, p3, p4",
},
},
},
},
{
"name": "bd_show",
"description": "Show details of a specific beads issue",
"inputSchema": {
"type": "object",
"properties": {
"issue_id": {
"type": "string",
"description": "Issue ID (e.g., project-abc)",
},
},
"required": ["issue_id"],
},
},
self._build_bd_update_tool(),
{
"name": "bd_close",
"description": "Close a beads issue with automatic ClickUp sync (sets end date, adds commit links)",
"inputSchema": {
"type": "object",
"properties": {
"issue_id": {
"type": "string",
"description": "Issue ID (e.g., project-abc)",
},
"reason": {
"type": "string",
"description": "Close reason (e.g., 'Fixed in PR #123', 'Completed and deployed')",
},
},
"required": ["issue_id"],
},
},
{
"name": "bd_search",
"description": "Search beads issues by keyword",
"inputSchema": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Search query",
},
},
"required": ["query"],
},
},
{
"name": "bd_sync",
"description": "Sync issues between beads and ClickUp bidirectionally. Runs incremental sync by default (only changed issues). Pass full=true to import everything.",
"inputSchema": {
"type": "object",
"properties": {
"full": {
"type": "boolean",
"description": "Full sync of all issues. Default: false (incremental). Set to true to force a full re-sync.",
},
"force_import": {
"type": "boolean",
"description": "Import ALL ClickUp tasks including those created by beads in other projects. Default: false.",
},
"webhook": {
"type": "boolean",
"description": "Fetch webhook events first from Cloud Run webhook server before syncing",
},
"webhook_url": {
"type": "string",
"description": "Webhook server URL (optional, uses config/env if not provided)",
},
},
"required": [],
},
},
{
"name": "bd_switch_workspace",
"description": "Switch to a different beads workspace/project. Lists available workspaces if no path provided.",
"inputSchema": {
"type": "object",
"properties": {
"project_dir": {
"type": "string",
"description": "Path to the beads project directory (e.g., /home/user/projects/my-project). If not provided, lists available workspaces.",
},
},
"required": [],
},
},
{
"name": "bd_clickup_setup",
"description": "Setup ClickUp board/list with intelligent auto-detection. Creates list if missing, provisions custom fields and statuses from config.yaml. This is the primary setup command for ClickUp integration - run this first before using other bd commands.",
"inputSchema": {
"type": "object",
"properties": {
"list_name": {
"type": "string",
"description": "Name for new list (optional, defaults to project name). Only used if creating a new list.",
},
"space_id": {
"type": "string",
"description": "ClickUp space ID to create list in (optional, auto-selects Sandbox or first space if not provided)",
},
"force_provision": {
"type": "boolean",
"description": "Force re-provisioning even if fields already exist (default: false)",
},
},
"required": [],
},
},
{
"name": "bd_sync_dependencies",
"description": "Sync beads dependencies to ClickUp (blocks, related, discovered-from). Handles bidirectional sync of dependency relationships.",
"inputSchema": {
"type": "object",
"properties": {
"issue_id": {
"type": "string",
"description": "Issue ID (optional, syncs all if omitted)",
},
"direction": {
"type": "string",
"enum": ["bidirectional", "to-clickup", "from-clickup"],
"description": "Sync direction (default: bidirectional)",
},
},
"required": [],
},
},
{
"name": "bd_dep_add",
"description": "Add dependency between issues (wraps: bd dep add). Creates a dependency relationship that will be synced to ClickUp.",
"inputSchema": {
"type": "object",
"properties": {
"issue_id": {
"type": "string",
"description": "Issue ID that will be blocked or related",
},
"depends_on_id": {
"type": "string",
"description": "Issue ID that issue_id depends on or is related to",
},
"dep_type": {
"type": "string",
"enum": ["blocks", "related", "parent-child", "discovered-from"],
"description": "Dependency type (default: blocks)",
},
},
"required": ["issue_id", "depends_on_id"],
},
},
{
"name": "bd_dep_remove",
"description": "Remove dependency between issues (wraps: bd dep remove). Removes dependency from both beads and ClickUp.",
"inputSchema": {
"type": "object",
"properties": {
"issue_id": {
"type": "string",
"description": "Source issue ID",
},
"depends_on_id": {
"type": "string",
"description": "Target issue ID to remove dependency from",
},
},
"required": ["issue_id", "depends_on_id"],
},
},
{
"name": "bd_dep_tree",
"description": "Get dependency tree for an issue (wraps: bd dep tree). Shows all dependencies and dependents.",
"inputSchema": {
"type": "object",
"properties": {
"issue_id": {
"type": "string",
"description": "Issue ID to get dependency tree for",
},
},
"required": ["issue_id"],
},
},
{
"name": "bd_comment_add",
"description": "Add comment to issue (syncs to ClickUp if enabled). Use this to add notes, updates, or status messages.",
"inputSchema": {
"type": "object",
"properties": {
"issue_id": {
"type": "string",
"description": "Issue ID to add comment to",
},
"text": {
"type": "string",
"description": "Comment text",
},
"author": {
"type": "string",
"description": "Comment author (optional, defaults to current user)",
},
},
"required": ["issue_id", "text"],
},
},
{
"name": "bd_comments_list",
"description": "List all comments on an issue. Shows comment history from both beads and ClickUp.",
"inputSchema": {
"type": "object",
"properties": {
"issue_id": {
"type": "string",
"description": "Issue ID to get comments for",
},
},
"required": ["issue_id"],
},
},
{
"name": "bd_ready",
"description": "Get work-ready issues (no blockers, open or in_progress). Perfect for AI task selection.",
"inputSchema": {
"type": "object",
"properties": {
"limit": {
"type": "number",
"description": "Maximum issues to show (default: 10)",
},
"priority": {
"type": "string",
"description": "Filter by priority (p0-p4)",
},
"assignee": {
"type": "string",
"description": "Filter by assignee",
},
},
"required": [],
},
},
{
"name": "bd_blocked",
"description": "Show blocked issues (waiting on dependencies). Use to identify bottlenecks.",
"inputSchema": {
"type": "object",
"properties": {},
"required": [],
},
},
{
"name": "bd_stale",
"description": "Find stale issues not updated recently. Helps identify abandoned work.",
"inputSchema": {
"type": "object",
"properties": {
"days": {
"type": "number",
"description": "Issues not updated in this many days (default: 30)",
},
"status": {
"type": "string",
"description": "Filter by status (open, in_progress, blocked)",
},
"limit": {
"type": "number",
"description": "Maximum issues to show (default: 50)",
},
},
"required": [],
},
},
{
"name": "bd_stats",
"description": "Get project statistics (issue counts, status breakdown, priority distribution).",
"inputSchema": {
"type": "object",
"properties": {},
"required": [],
},
},
{
"name": "bd_label_add",
"description": "Add label to issue (syncs to ClickUp tags).",
"inputSchema": {
"type": "object",
"properties": {
"issue_id": {
"type": "string",
"description": "Issue ID",
},
"label": {
"type": "string",
"description": "Label to add",
},
},
"required": ["issue_id", "label"],
},
},
{
"name": "bd_label_remove",
"description": "Remove label from issue (syncs to ClickUp).",
"inputSchema": {
"type": "object",
"properties": {
"issue_id": {
"type": "string",
"description": "Issue ID",
},
"label": {
"type": "string",
"description": "Label to remove",
},
},
"required": ["issue_id", "label"],
},
},
{
"name": "bd_label_list",
"description": "List labels on an issue.",
"inputSchema": {
"type": "object",
"properties": {
"issue_id": {
"type": "string",
"description": "Issue ID (optional, lists all labels if omitted)",
},
},
"required": [],
},
},
]
def handle_request(self, request: dict[str, Any]) -> dict[str, Any] | None:
"""Handle incoming MCP requests.
Args:
request: MCP request dictionary
Returns:
MCP response dictionary, or None for notifications (id is None)
"""
method = request.get("method")
request_id = request.get("id")
# Handle JSON-RPC responses (no "method" key — these are replies to server-initiated requests)
if method is None:
if request_id == "beads-roots-1":
self._process_roots_list_response(request)
# Responses never get a reply
return None
# Handle notifications (JSON-RPC messages with no id)
# Notifications must not receive a response per JSON-RPC 2.0 spec
if request_id is None:
if method == "notifications/initialized":
# Acknowledge the initialized notification silently
return None
# Ignore other unknown notifications
return None
if method == "initialize":
# Check if client declared roots capability — we'll request roots after handshake
capabilities = request.get("params", {}).get("capabilities", {})
self._client_supports_roots = "roots" in capabilities
# Echo back the client's protocol version (MCP spec requirement)
client_protocol = request.get("params", {}).get(
"protocolVersion", "2024-11-05"
)
return {
"jsonrpc": "2.0",
"id": request.get("id"),
"result": {
"protocolVersion": client_protocol,
"capabilities": {
"tools": {},
},
"serverInfo": {
"name": "beads-mcp-server",
"version": "1.0.0",
},
},
}
elif method == "tools/list":
return {
"jsonrpc": "2.0",
"id": request.get("id"),
"result": {
"tools": self.tools,
},
}
elif method == "tools/call":
tool_name = request.get("params", {}).get("name")
arguments = request.get("params", {}).get("arguments", {})
try:
result = self._execute_tool(tool_name, arguments)
return {
"jsonrpc": "2.0",
"id": request.get("id"),
"result": {
"content": [
{
"type": "text",
"text": result,
}
],
},
}
except Exception as e:
error_msg = str(e)
# If it's a CalledProcessError, include stderr for better debugging
if isinstance(e, subprocess.CalledProcessError):
if e.stderr:
error_msg = f"{error_msg}\n\nError details:\n{e.stderr}"
elif e.stdout:
error_msg = f"{error_msg}\n\nOutput:\n{e.stdout}"
logger.error(
f"Error executing tool {tool_name}: {error_msg}", exc_info=True
)
return {
"jsonrpc": "2.0",
"id": request.get("id"),
"error": {
"code": -32603,
"message": error_msg,
},
}
else:
return {
"jsonrpc": "2.0",
"id": request.get("id"),
"error": {
"code": -32601,
"message": f"Method not found: {method}",
},
}
def _is_beads_project_dir(self, path: Path) -> bool:
"""Check if a directory is a beads project (not just the global registry).
A beads project directory contains .beads/ with project-specific files
like issues.jsonl, config.yaml, or metadata.json. The global registry
at ~/.beads only has registry.json and registry.lock.
Args:
path: Directory path to check
Returns:
True if this is a beads project directory
"""
beads_dir = path / ".beads"
if not beads_dir.exists() or not beads_dir.is_dir():
return False
# Check for project-specific files (not present in global registry)
project_indicators = ["issues.jsonl", "config.yaml", "metadata.json"]
for indicator in project_indicators:
if (beads_dir / indicator).exists():
return True
return False
# ------------------------------------------------------------------
# Dynamic tool schema builders (driven by .beads/templates.yaml)
# ------------------------------------------------------------------
def _build_section_schema_properties(self) -> dict[str, Any]:
"""Build JSON Schema properties for each template section.
Returns a dict of property-name -> schema-object suitable for
inclusion in a tool's ``inputSchema.properties``.
"""
props: dict[str, Any] = {}
for key, default_content in self._template_sections.items():
label = SECTION_KEY_MAP.get(key, key.replace("_", " ").title())
hint = default_content.replace("\n", " ").strip()
if len(hint) > 80:
hint = hint[:77] + "..."
props[key] = {
"type": "string",
"description": f"{label} section. Default: {hint}",
}
return props
def _load_clickup_config(self) -> dict:
"""Load .beads/integrations/clickup/config.yaml for this project.
Returns parsed config dict, or empty dict if not found or invalid.
"""
if self.project_dir is None:
return {}
config_path = (
self.project_dir / ".beads" / "integrations" / "clickup" / "config.yaml"
)
if not config_path.exists():
return {}
try:
with open(config_path) as f:
return yaml.safe_load(f) or {}
except Exception as e:
logger.warning("Could not load clickup config for schema: %s", e)
return {}
def _build_custom_field_schema_properties(self) -> dict[str, Any]:
"""Build JSON Schema properties for custom fields from config.yaml.
Reads the custom_fields: block from .beads/integrations/clickup/config.yaml.
For drop_down fields, includes an enum of option names.
For other field types, emits a plain string property.
pr_link is always included as a first-class beads field.
Returns empty dict (except pr_link) if no config or no custom_fields block.
"""
props: dict[str, Any] = {}
custom_fields = self._clickup_config.get("field_mapping", {}).get("custom_fields", {})
for field_key, field_cfg in custom_fields.items():
if not isinstance(field_cfg, dict):
continue
field_name = field_cfg.get("name", field_key.replace("_", " ").title())
field_type = field_cfg.get("type", "text")
if field_type == "drop_down":
options_raw = field_cfg.get("options", field_cfg.get("type_config", {}).get("options", []))
option_names = []
for o in options_raw:
if isinstance(o, str):
option_names.append(o)
elif isinstance(o, dict):
name = o.get("name")
if name:
option_names.append(name)
prop: dict[str, Any] = {
"type": "string",
"description": f"{field_name} (optional)",
}
if option_names:
prop["enum"] = option_names
prop["description"] = f"{field_name}: {', '.join(option_names)}"
else:
prop = {
"type": "string",
"description": f"{field_name} (optional)",
}
props[field_key] = prop
# pr_link is always present — it's a first-class beads field
props["pr_link"] = {
"type": "string",
"description": "Pull request/merge request URL (optional)",
}
return props
def _build_bd_create_tool(self) -> dict[str, Any]:
"""Build the bd_create tool definition with per-section params."""
section_names = ", ".join(
SECTION_KEY_MAP.get(k, k.replace("_", " ").title())
for k in self._section_keys
)
properties: dict[str, Any] = {
"title": {
"type": "string",
"description": "Issue title",
},
"priority": {
"type": "string",
"description": "Priority: p0 (urgent), p1 (high), p2 (normal), p3 (low), p4 (low)",
"enum": ["p0", "p1", "p2", "p3", "p4"],
},
"type": {
"type": "string",
"description": "Issue type: bug, feature, task, etc.",
},
}
# Per-section parameters from template
properties.update(self._build_section_schema_properties())
# Deprecated fallback
properties["description"] = {
"type": "string",
"description": (
"Full description (deprecated — prefer per-section params: "
f"{', '.join(self._section_keys)}). "
"If both section params and description are provided, section params win."
),
}
# Custom field params — read from config.yaml dynamically
properties.update(self._build_custom_field_schema_properties())
return {
"name": "bd_create",
"description": (
"Create a new beads issue with automatic ClickUp sync. "
f"Provide content via per-section params ({section_names}) "
"or omit them to use template defaults from .beads/templates.yaml."
),
"inputSchema": {
"type": "object",
"properties": properties,
"required": ["title"],
},
}
def _build_bd_update_tool(self) -> dict[str, Any]:
"""Build the bd_update tool definition with per-section params."""
section_names = ", ".join(
SECTION_KEY_MAP.get(k, k.replace("_", " ").title())
for k in self._section_keys
)
properties: dict[str, Any] = {
"issue_id": {
"type": "string",
"description": "Issue ID (e.g., project-abc)",
},
"status": {
"type": "string",
"description": "New status: open, in_progress, completed, closed",
},
"priority": {
"type": "string",
"description": "New priority: p0, p1, p2, p3, p4",
},
"title": {
"type": "string",
"description": "New title",
},
}
# Per-section parameters from template
properties.update(self._build_section_schema_properties())
# Deprecated fallback
properties["description"] = {
"type": "string",
"description": (
"Full description (deprecated — prefer per-section params: "
f"{', '.join(self._section_keys)}). "
"If both section params and description are provided, section params win."
),
}
# Custom field params — read from config.yaml dynamically
properties.update(self._build_custom_field_schema_properties())
return {
"name": "bd_update",
"description": (
"Update a beads issue with automatic ClickUp sync. "
f"Update description via per-section params ({section_names}). "
"Only provided sections are updated; others keep existing content."
),
"inputSchema": {
"type": "object",
"properties": properties,
"required": ["issue_id"],
},
}
def _reload_template(self) -> None:
"""Reload template sections and rebuild tool schemas.
Called after workspace switch to reflect the new project's template.
"""
self._template_sections = get_template_sections(self.project_dir)
self._section_keys = list(self._template_sections.keys())
# Rebuild tool list entries for bd_create and bd_update
for i, tool in enumerate(self.tools):
if tool["name"] == "bd_create":
self.tools[i] = self._build_bd_create_tool()
elif tool["name"] == "bd_update":
self.tools[i] = self._build_bd_update_tool()
def _find_beads_project_dir(self) -> str:
"""Find the beads project directory.
Search order:
1. Workspace override (set via bd_switch_workspace tool)
2. Workspace from MCP protocol roots (set on initialize by Claude Code)
3. BEADS_PROJECT_DIR environment variable (if valid beads project)
4. Walk up from current working directory to find .beads (like git)
5. Most recently modified .beads in ~/projects (auto-detect)
Returns:
Path to the project directory
Raises:
ValueError: If no beads project found
"""
# First, check workspace override (set via bd_switch_workspace)
if self._workspace_override:
override_path = Path(self._workspace_override)
if (
override_path.exists()
and override_path.is_dir()
and self._is_beads_project_dir(override_path)
):
logger.debug(f"Using workspace override: {override_path}")
return str(override_path)
# If override is invalid, log warning but continue to other methods
logger.warning(
f"Workspace override {self._workspace_override} is not a valid beads project"
)
# Check workspace detected from MCP protocol roots (sent by Claude Code on initialize)
if self._roots_workspace:
roots_path = Path(self._roots_workspace)
if (
roots_path.exists()
and roots_path.is_dir()
and self._is_beads_project_dir(roots_path)
):
logger.debug(f"Using workspace from MCP roots: {roots_path}")
return str(roots_path)
logger.warning(
f"MCP roots workspace {self._roots_workspace} is not a valid beads project"
)
# Check BEADS_PROJECT_DIR environment variable (if it's a valid beads project)
env_project_dir = os.environ.get("BEADS_PROJECT_DIR")
if env_project_dir:
project_path = Path(env_project_dir)
if (
project_path.exists()
and project_path.is_dir()
and self._is_beads_project_dir(project_path)
):
logger.debug(f"Using BEADS_PROJECT_DIR: {project_path}")
return str(project_path)
# Don't fail - continue to other detection methods
# Fall back to walking up from cwd (like git does)
cwd = Path.cwd()
current = cwd
while current != current.parent:
if self._is_beads_project_dir(current):
logger.debug(f"Found beads project at: {current}")
return str(current)
current = current.parent
# Check root as well
if self._is_beads_project_dir(current):
logger.debug(f"Found beads project at: {current}")
return str(current)
# Auto-detect: find most recently modified .beads in ~/projects
projects_dir = Path.home() / "projects"
if projects_dir.exists():
beads_dirs = []
for proj_dir in projects_dir.iterdir():
if proj_dir.is_dir():
beads_path = proj_dir / ".beads"
if beads_path.exists() and self._is_beads_project_dir(proj_dir):
try:
mtime = beads_path.stat().st_mtime
beads_dirs.append((mtime, proj_dir))
except OSError:
pass
if beads_dirs:
# Sort by modification time, most recent first
beads_dirs.sort(reverse=True)
most_recent = beads_dirs[0][1]
logger.info(f"Auto-detected beads project: {most_recent}")
return str(most_recent)
raise ValueError(
f"No beads project found in {cwd} or any parent directory. "
"Either run from a beads project directory, or set BEADS_PROJECT_DIR environment variable."
)
def _refresh_project_dir(self) -> Path | None:
"""Refresh and cache the active beads project directory.
This avoids stale workspace issues when the MCP server process is reused
across different projects in the same IDE session.
"""
try:
self.project_dir = Path(self._find_beads_project_dir())
except ValueError:
self.project_dir = None
self.custom_field_store = CustomFieldStore(self.project_dir) if self.project_dir else None
self._clickup_config = self._load_clickup_config()
self._reload_template()
return self.project_dir
def _update_results_section(
self, description: str, results: str, mark_dod_complete: bool = True
) -> str:
"""Update the Results section in a structured issue description.
Args:
description: The issue description text
results: The new results text to set
mark_dod_complete: If True, mark all DoD checkboxes as complete
Returns:
Updated description text
"""
sections = IssueDescriptionValidator.parse_description(description)
# Update Results section - find the LAST occurrence (actual section header)
if "Results" in sections:
old_results = sections["Results"]
if old_results.strip() == "TBD" or not old_results.strip():
# Find position of last **Results:** (the actual section header)
last_pos = description.rfind("**Results:**")
if last_pos != -1:
# Replace from last position to end
description = description[:last_pos] + f"**Results:**\n{results}"
else:
# Append Results section if not present
description = description.rstrip() + f"\n\n**Results:**\n{results}"
# Mark Definition of Done checkboxes as complete
if mark_dod_complete and "Definition of Done" in sections:
            # Replace unchecked boxes with checked boxes (note: this applies to
            # every checkbox in the description, not only the DoD section)
            description = re.sub(r"- \[ \]", "- [x]", description)
return description
def _get_issue_description(self, issue_id: str) -> str | None:
"""Get the description of an issue.
Args:
issue_id: The issue ID
Returns:
Issue description or None if not found
"""
try:
result = self._run_bd_command(["bd", "show", issue_id, "--json"])
data = json.loads(result.stdout)
if isinstance(data, list) and len(data) > 0:
return data[0].get("description", "")
elif isinstance(data, dict):
return data.get("description", "")
except (json.JSONDecodeError, subprocess.CalledProcessError):
pass
return None
def _run_bd_command(self, cmd: list[str]) -> subprocess.CompletedProcess:
"""Run a bd command using the wrapper script directly.
Args:
cmd: Command list starting with 'bd'
Returns:
CompletedProcess result
"""
# Find the beads project directory from BEADS_PROJECT_DIR
try:
beads_project_dir = self._find_beads_project_dir()
except ValueError as e:
error_msg = str(e)
logger.error(error_msg)
raise subprocess.CalledProcessError(
1, cmd, output="", stderr=error_msg
) from e
# Use the bd wrapper script directly (no need for devbox)
# The wrapper auto-detects the project and handles ClickUp sync
bd_wrapper_path = self.bd_wrapper_path
# Replace 'bd' with the full path to the wrapper
if cmd[0] == "bd":
final_cmd = [bd_wrapper_path] + cmd[1:]
else:
# If cmd doesn't start with 'bd', prepend the wrapper path
final_cmd = [bd_wrapper_path] + cmd
try:
# Set PYTHON_EXECUTABLE and PYTHONPATH so the wrapper uses the same Python as MCP server
# This ensures dependencies (requests, etc.) are available
env = os.environ.copy()
env["PYTHON_EXECUTABLE"] = sys.executable
# MCP server syncs to ClickUp explicitly - skip wrapper sync to avoid duplicate comments
env["BD_SKIP_WRAPPER_SYNC"] = "1"
# Add the bd-clickup-integration directory to PYTHONPATH so the module can be found
module_dir = Path(__file__).parent.parent
pythonpath = env.get("PYTHONPATH", "")
if pythonpath:
env["PYTHONPATH"] = f"{module_dir}:{pythonpath}"
else:
env["PYTHONPATH"] = str(module_dir)
# Run the command from the beads project directory
# The wrapper will auto-detect the project and sync to ClickUp
result = subprocess.run(
final_cmd,
capture_output=True,
text=True,
check=True,
cwd=beads_project_dir,
env=env,
)
return result
except subprocess.CalledProcessError as e:
# Include stderr in error message for debugging
error_msg = f"Command failed: {' '.join(e.cmd)}\n"
if e.stdout:
error_msg += f"stdout: {e.stdout}\n"
if e.stderr:
error_msg += f"stderr: {e.stderr}\n"
if not e.stderr and not e.stdout:
error_msg += f"Return code: {e.returncode}\n"
logger.error(f"bd command failed: {error_msg}")
# Raise with more informative error
raise subprocess.CalledProcessError(
e.returncode, e.cmd, output=e.stdout, stderr=e.stderr
) from e
def _execute_tool(self, tool_name: str, arguments: dict[str, Any]) -> str:
"""Execute a bd command based on tool name and arguments.
Args:
tool_name: Name of the tool to execute
arguments: Tool arguments
Returns:
Command output as string
"""
# Keep workspace detection fresh for every tool call.
# bd_switch_workspace intentionally manages project_dir directly.
if tool_name != "bd_switch_workspace":
self._refresh_project_dir()
if tool_name == "bd_create":
return self._bd_create(arguments)
elif tool_name == "bd_list":
return self._bd_list(arguments)
elif tool_name == "bd_show":
return self._bd_show(arguments)
elif tool_name == "bd_update":
return self._bd_update(arguments)
elif tool_name == "bd_close":
return self._bd_close(arguments)
elif tool_name == "bd_search":
return self._bd_search(arguments)
elif tool_name == "bd_sync":
return self._bd_sync(arguments)
elif tool_name == "bd_switch_workspace":
return self._bd_switch_workspace(arguments)
elif tool_name == "bd_clickup_setup":
return self._bd_clickup_setup(arguments)
elif tool_name == "bd_sync_dependencies":
return self._bd_sync_dependencies(arguments)
elif tool_name == "bd_dep_add":
return self._bd_dep_add(arguments)
elif tool_name == "bd_dep_remove":
return self._bd_dep_remove(arguments)
elif tool_name == "bd_dep_tree":
return self._bd_dep_tree(arguments)
elif tool_name == "bd_comment_add":
return self._bd_comment_add(arguments)
elif tool_name == "bd_comments_list":
return self._bd_comments_list(arguments)
elif tool_name == "bd_ready":
return self._bd_ready(arguments)
elif tool_name == "bd_blocked":
return self._bd_blocked(arguments)
elif tool_name == "bd_stale":
return self._bd_stale(arguments)
elif tool_name == "bd_stats":
return self._bd_stats(arguments)
elif tool_name == "bd_label_add":
return self._bd_label_add(arguments)
elif tool_name == "bd_label_remove":
return self._bd_label_remove(arguments)
elif tool_name == "bd_label_list":
return self._bd_label_list(arguments)
else:
raise ValueError(f"Unknown tool: {tool_name}")
def _get_custom_field_names_from_config(self) -> set[str]:
"""Get all custom field names from config.
Returns the exact keys as they appear in config (e.g. "01_lifecycle_stage").
Callers must use these exact keys when matching against incoming args so that
prefixed schema parameter names match without any normalization step.
"""
        if self.project_dir is None:
            return set()
        config_path = (
            self.project_dir
            / ".beads/integrations/clickup/config.yaml"
        )
        if not config_path.exists():
            return set()
        try:
            with open(config_path) as f:
                config = yaml.safe_load(f) or {}
        except (OSError, yaml.YAMLError) as e:
            logger.warning("Failed to load config: %s", e)
            return set()
custom_fields_config = config.get("field_mapping", {}).get("custom_fields", {})
return set(custom_fields_config.keys())
def _bd_create(self, args: dict[str, Any]) -> str:
"""Create a new beads issue.
If the description lacks structure (no **Objective:** section),
it will be wrapped in a minimal template based on issue type.
"""
logger.info("MCP server v2: project_dir=%s", self.project_dir)
cmd = ["bd", "create", args["title"]]
if "priority" in args:
cmd.extend(["--priority", args["priority"]])
if "type" in args:
cmd.extend(["--type", args["type"]])
# Build description from per-section params or fallback to description/template
section_args = {
k: args[k] for k in self._section_keys if k in args
}
if section_args:
# Merge caller-provided sections with template defaults
merged = self._template_sections.copy()
merged.update(section_args)
description = render_template(merged, title=args["title"])
logger.info("Built description from per-section params: %s", list(section_args))
elif args.get("description"):
description = enforce_template_structure(
args["description"],
project_dir=self.project_dir,
title=args["title"],
)
logger.info("Enforced template structure on description fallback")
else:
description = get_template(
title=args["title"], project_dir=self.project_dir
)
logger.info("Applied template for new issue")
cmd.extend(["--description", description])
result = self._run_bd_command(cmd)
output = result.stdout + result.stderr
# Extract issue ID from output
issue_id = None
match = re.search(r"Created issue: ([\w-]+)", output)
if match:
issue_id = match.group(1)
# Extract custom fields from args and store separately.
# _get_custom_field_names_from_config() returns exact config keys (e.g.
# "01_lifecycle_stage"), which match the MCP schema parameter names directly.
custom_field_names = self._get_custom_field_names_from_config()
custom_fields = {k: v for k, v in args.items() if k in custom_field_names}
# Store custom fields in CustomFieldStore
if custom_fields and issue_id and self.custom_field_store:
self.custom_field_store.set(
issue_id,
custom_fields=custom_fields,
clickup_metadata={},
merge=False # Creating new issue, not merging
)
logger.info("Stored %d custom fields for %s", len(custom_fields), issue_id)
if issue_id:
sync_result = self._sync_to_clickup(issue_id)
output += f"\n{sync_result}"
return output
    def _get_sync_engine(self) -> Any | None:
"""Return a SyncEngine instance for the current project, or None if unavailable."""
try:
from beads_clickup.sync_engine import SyncEngine
beads_project_dir = self._find_beads_project_dir()
project_dir = Path(beads_project_dir)
project_env = project_dir / ".env"
if project_env.exists():
load_dotenv(project_env)
config_path = (
project_dir / ".beads" / "integrations" / "clickup" / "config.yaml"
)
if not config_path.exists():
return None
return SyncEngine(config_path=str(config_path))
except Exception as exc:
logger.debug("_get_sync_engine: could not instantiate SyncEngine: %s", exc)
return None
def _sync_to_clickup(self, issue_id: str) -> str:
"""Sync an issue to ClickUp.
Args:
issue_id: The beads issue ID to sync
Returns:
Sync result message
"""
try:
from beads_clickup.sync_engine import SyncEngine
# Find the beads project directory to get the correct config path
beads_project_dir = self._find_beads_project_dir()
project_dir = Path(beads_project_dir)
# Load project .env so CLICKUP_API_TOKEN is available (MCP may start with different cwd)
project_env = project_dir / ".env"
if project_env.exists():
load_dotenv(project_env)
config_path = (
project_dir
/ ".beads"
/ "integrations"
/ "clickup"
/ "config.yaml"
)
if not config_path.exists():
return f"Warning: ClickUp config not found at {config_path}"
engine = SyncEngine(config_path=str(config_path))
# Always refresh issue in issues.jsonl from beads db before syncing.
# The bd daemon writes to beads.db, but the sync engine reads from
# issues.jsonl which may be stale (e.g. status still "in_progress"
# after bd close set it to "closed" in the db).
try:
show_result = self._run_bd_command(["bd", "show", issue_id, "--json"])
show_data = json.loads(show_result.stdout)
if isinstance(show_data, list) and show_data:
issue_data = show_data[0]
elif isinstance(show_data, dict):
issue_data = show_data
else:
return f"Warning: Issue {issue_id} not found"
# Update or append in issues.jsonl so sync engine has fresh data
issues_file = project_dir / ".beads" / "issues.jsonl"
self._upsert_issue_in_jsonl(issues_file, issue_id, issue_data)
logger.info("Refreshed %s in issues.jsonl for sync", issue_id)
except Exception as e:
logger.warning("Failed to refresh issue %s: %s", issue_id, e)
# Fall through — sync engine will try with whatever is in jsonl
task_id = engine.sync_issue_to_clickup(issue_id, force=True)
if task_id:
task_url = f"https://app.clickup.com/t/{task_id}"
return f"[Sync] Synced to ClickUp: {task_url}"
else:
list_id = engine.clickup_config.get("default_list_id")
return f"Warning: ClickUp sync returned None (project={beads_project_dir}, list_id={list_id})"
except Exception as e:
logger.warning("ClickUp sync failed for %s: %s", issue_id, e, exc_info=True)
return f"Warning: ClickUp sync failed: {e}"
@staticmethod
def _upsert_issue_in_jsonl(issues_file: Path, issue_id: str, issue_data: dict) -> None:
"""Update an existing issue in issues.jsonl or append if not found.
Reads the file, replaces the matching line, and writes back.
If the issue is not found, appends it.
"""
lines = []
found = False
if issues_file.exists():
with open(issues_file) as f:
for line in f:
stripped = line.strip()
if not stripped:
continue
try:
existing = json.loads(stripped)
if existing.get("id") == issue_id:
# Merge: keep custom_fields from existing if not in new data
existing_cf = existing.get("custom_fields", {})
new_cf = issue_data.get("custom_fields", {})
merged_cf = {**existing_cf, **new_cf}
issue_data["custom_fields"] = merged_cf
lines.append(json.dumps(issue_data) + "\n")
found = True
continue
except json.JSONDecodeError:
pass
lines.append(stripped + "\n")
if not found:
lines.append(json.dumps(issue_data) + "\n")
with open(issues_file, "w") as f:
f.writelines(lines)
def _bd_list(self, args: dict[str, Any]) -> str:
"""List beads issues."""
cmd = ["bd", "list", "--json"]
if "status" in args:
cmd.extend(["--status", args["status"]])
if "priority" in args:
cmd.extend(["--priority", args["priority"]])
result = self._run_bd_command(cmd)
# Parse and format the JSON output
try:
issues = json.loads(result.stdout)
if not issues:
return "No issues found."
# Format as readable text
output = f"Found {len(issues)} issue(s):\n\n"
for issue in issues:
output += (
f"-{issue.get('id', 'N/A')}: {issue.get('title', 'Untitled')}\n"
)
output += f" Status: {issue.get('status', 'N/A')} | Priority: {issue.get('priority', 'N/A')}\n"
if issue.get("type"):
output += f" Type: {issue.get('type')}\n"
output += "\n"
return output
except json.JSONDecodeError:
return result.stdout
def _bd_show(self, args: dict[str, Any]) -> str:
"""Show issue details."""
cmd = ["bd", "show", args["issue_id"]]
result = self._run_bd_command(cmd)
return result.stdout
def _bd_update(self, args: dict[str, Any]) -> str:
"""Update a beads issue.
Args:
args: Dictionary containing issue_id and fields to update.
Custom fields are extracted and stored separately.
Returns:
String describing the update result.
"""
issue_id = args.get("issue_id")
if not issue_id:
return "Error: issue_id required"
# 1. Extract custom fields from args using exact key matching.
# See _bd_create for details — same approach applies here.
custom_field_names = self._get_custom_field_names_from_config()
custom_fields = {k: v for k, v in args.items() if k in custom_field_names}
# 2. Build bd CLI command for core fields only
cmd = ["bd", "update", issue_id]
# Core fields (not custom fields)
core_field_updates = {}
if "status" in args and "status" not in custom_field_names:
cmd.extend(["--status", args["status"]])
core_field_updates["status"] = args["status"]
if "priority" in args and "priority" not in custom_field_names:
cmd.extend(["--priority", args["priority"]])
core_field_updates["priority"] = args["priority"]
if "title" in args and "title" not in custom_field_names:
cmd.extend(["--title", args["title"]])
core_field_updates["title"] = args["title"]
# Build description from per-section params or fallback to description
section_args = {
k: args[k] for k in self._section_keys if k in args
}
if section_args:
# Merge caller-provided sections with template defaults
merged = self._template_sections.copy()
merged.update(section_args)
description = render_template(merged, title=args.get("title", ""))
cmd.extend(["--description", description])
core_field_updates["description"] = True
logger.info("Built update description from per-section params: %s", list(section_args))
elif "description" in args:
enforced = enforce_template_structure(
args["description"],
project_dir=self.project_dir,
title=args.get("title", ""),
)
cmd.extend(["--description", enforced])
core_field_updates["description"] = True
# Add any other core fields like assignee
if "assignee" in args and "assignee" not in custom_field_names:
cmd.extend(["--assignee", args["assignee"]])
core_field_updates["assignee"] = args["assignee"]
# 3. Run bd CLI command if there are core field updates
output = ""
if core_field_updates:
result = self._run_bd_command(cmd)
output = result.stdout + result.stderr
        # 4. Bail out early if nothing was specified
        if not core_field_updates and not custom_fields:
            return "No updates specified"
        # 5. Update custom fields separately with merge semantics
        if custom_fields and self.custom_field_store:
            existing = self.custom_field_store.get(issue_id) or {}
            existing_custom_fields = existing.get("custom_fields", {})
            existing_custom_fields.update(custom_fields)  # Merge, don't replace
            self.custom_field_store.set(
                issue_id,
                custom_fields=existing_custom_fields,
                clickup_metadata=existing.get("clickup_metadata", {}),
                merge=True,  # Use merge semantics
            )
            logger.info("Updated %d custom fields for %s", len(custom_fields), issue_id)
        # 6. Sync to ClickUp once (core fields, custom fields, or both may have changed)
        sync_result = self._sync_to_clickup(issue_id)
        output += f"\n{sync_result}"
        return output or f"Updated issue: {issue_id}"
def _bd_close(self, args: dict[str, Any]) -> str:
"""Close a beads issue.
If a reason is provided, this method will:
1. Update the issue description's Results section with the close reason
2. Mark Definition of Done checkboxes as complete
3. Close the issue with the reason
"""
issue_id = args["issue_id"]
reason = args.get("reason", "")
# PR approval gate — only enforced when ClickUp integration is configured
# and sync.require_pr_approval is true in config
engine = self._get_sync_engine()
if engine is not None:
allowed, gate_reason = engine.check_close_gate(issue_id)
if not allowed:
return (
f"Cannot close {issue_id}: {gate_reason}\n\n"
f"To proceed:\n"
f" 1. Set pr_link: bd_update(issue_id='{issue_id}', pr_link='https://github.com/owner/repo/pull/N')\n"
f" 2. Wait for PR approval/merge\n"
f" 3. Either run bd_sync (auto-completes) or retry bd_close after approval\n\n"
f"To bypass for non-PR work: add 'no-pr' label to the issue."
)
# If reason provided, update the Results section in description
if reason:
description = self._get_issue_description(issue_id)
if description:
# Build results text with timestamp
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M")
results_text = f"[OK] Completed ({timestamp})\n\n{reason}"
# Update description with results
updated_description = self._update_results_section(
description, results_text, mark_dod_complete=True
)
# Update the issue description if changed
if updated_description != description:
try:
update_cmd = [
"bd",
"update",
issue_id,
"--description",
updated_description,
]
self._run_bd_command(update_cmd)
logger.info("Updated Results section for issue %s", issue_id)
except subprocess.CalledProcessError as e:
logger.warning(
"Failed to update description for %s: %s", issue_id, e
)
# Close the issue
cmd = ["bd", "close", issue_id]
if reason:
cmd.extend(["--reason", reason])
result = self._run_bd_command(cmd)
output = result.stdout + result.stderr
# Explicitly sync to ClickUp
sync_result = self._sync_to_clickup(issue_id)
output += f"\n{sync_result}"
return output
def _bd_search(self, args: dict[str, Any]) -> str:
"""Search beads issues."""
cmd = ["bd", "search", args["query"]]
result = self._run_bd_command(cmd)
return result.stdout
def _bd_sync(self, args: dict[str, Any]) -> str:
"""Sync issues between beads and ClickUp bidirectionally."""
from beads_clickup.sync_engine import SyncEngine
full_sync = args.get("full", False)
force_import = args.get("force_import", False)
use_webhook = args.get("webhook", False)
webhook_url = args.get("webhook_url")
try:
# Find the beads project directory to get the correct config path
beads_project_dir = self._find_beads_project_dir()
config_path = (
Path(beads_project_dir)
/ ".beads"
/ "integrations"
/ "clickup"
/ "config.yaml"
)
# Initialize sync engine with explicit config path
engine = SyncEngine(config_path=str(config_path))
mode_str = "full" if full_sync else "incremental"
if force_import:
mode_str += " (force-import)"
result_lines = [f"[Sync] Starting {mode_str} sync..."]
# Fetch webhook events first if requested
webhook_stats = None
if use_webhook:
result_lines.append("[Webhook]Fetching webhook events...")
webhook_stats = engine.fetch_webhook_events(webhook_url)
if webhook_stats.get("processed", 0) > 0:
result_lines.append(
f" Processed {webhook_stats['processed']} webhook events"
)
# Run sync
stats = engine.sync_all(full_sync=full_sync, force_import=force_import)
# Format results
result_lines.append("")
result_lines.append("[Results] Sync Results:")
result_lines.append(f"Mode: {stats.get('mode', mode_str)}")
b2c = stats.get("beads_to_clickup", {})
result_lines.append("")
result_lines.append("Beads → ClickUp:")
result_lines.append(f" Created: {b2c.get('created', 0)}")
result_lines.append(f" Updated: {b2c.get('updated', 0)}")
if b2c.get("unchanged", 0) > 0:
result_lines.append(f" Unchanged: {b2c['unchanged']} (skipped)")
if b2c.get("skipped", 0) > 0:
result_lines.append(f" Filtered: {b2c['skipped']}")
result_lines.append(f" Failed: {b2c.get('failed', 0)}")
c2b = stats.get("clickup_to_beads", {})
result_lines.append("")
result_lines.append("ClickUp → Beads:")
result_lines.append(f" Created: {c2b.get('created', 0)}")
result_lines.append(f" Updated: {c2b.get('updated', 0)}")
if c2b.get("unchanged", 0) > 0:
result_lines.append(f" Unchanged: {c2b['unchanged']} (skipped)")
result_lines.append(f" Closed: {c2b.get('closed', 0)}")
result_lines.append(f" Failed: {c2b.get('failed', 0)}")
if webhook_stats and webhook_stats.get("processed", 0) > 0:
result_lines.append("")
result_lines.append("Webhook Events:")
result_lines.append(f" Processed: {webhook_stats['processed']}")
result_lines.append("")
result_lines.append("[OK] Sync complete")
return "\n".join(result_lines)
except Exception as e:
return f"Error: Sync failed: {str(e)}"
def _bd_switch_workspace(self, args: dict[str, Any]) -> str:
"""Switch to a different beads workspace or list available workspaces.
Args:
args: Dictionary with optional 'project_dir' key
Returns:
Status message with workspace info
"""
project_dir = args.get("project_dir")
# If no project_dir provided, list available workspaces
if not project_dir:
return self._list_available_workspaces()
# Validate the provided path
project_path = Path(project_dir).expanduser().resolve()
if not project_path.exists():
return f"Error: Path does not exist: {project_path}"
if not project_path.is_dir():
return f"Error: Path is not a directory: {project_path}"
if not self._is_beads_project_dir(project_path):
return f"Error: Not a beads project (no .beads/ directory): {project_path}"
# Set the workspace override
self._workspace_override = str(project_path)
# Update project_dir for methods that use it directly
self.project_dir = project_path
# Reload template sections and tool schemas for the new workspace
self._reload_template()
# Re-initialize CustomFieldStore for the new workspace
self.custom_field_store = CustomFieldStore(self.project_dir)
# Get the issue prefix from config if available
config_path = project_path / ".beads" / "config.yaml"
issue_prefix = "unknown"
if config_path.exists():
            try:
                with open(config_path) as f:
                    config = yaml.safe_load(f) or {}
                issue_prefix = config.get("issue-prefix", "unknown")
except Exception:
pass
return (
f"[OK] Switched to workspace: {project_path}\n"
f" Issue prefix: {issue_prefix}\n"
f"\n"
f"All subsequent bd_* commands will use this workspace."
)
def _bd_clickup_setup(self, args: dict[str, Any]) -> str:
"""Setup ClickUp board/list with intelligent detection and provisioning.
Args:
args: Dictionary with optional keys:
- list_name: Name for new list
- space_id: Space ID to create in
- force_provision: Force re-provisioning
Returns:
Status message with setup results and ClickUp URL
"""
try:
if self.project_dir is None:
return "Error: No beads project found. Run bd_switch_workspace first."
config_path = (
self.project_dir / ".beads" / "integrations" / "clickup" / "config.yaml"
)
if not config_path.exists():
return f"Error: ClickUp config not found at {config_path}"
# Use project_dir to get the right config (not CWD-based SyncEngine())
engine = SyncEngine(config_path=str(config_path))
config = engine.config
# Initialize provisioner
provisioner = ClickUpProvisioner(
client=engine.mcp_client,
config=config,
config_path=config_path,
)
# Run setup
result = provisioner.setup_clickup_list(
list_name=args.get("list_name"),
space_id=args.get("space_id"),
)
# Hot-reload MCP schemas so bd_create/bd_update reflect new fields immediately
self._clickup_config = self._load_clickup_config()
self._reload_template()
# Format response
lines = ["[ClickUp Setup]"]
lines.append("")
lines.append(f"List: {result['list_name']}")
lines.append(f"List ID: {result['list_id']}")
lines.append("")
if result["created"]:
lines.append("✓ Created new list in ClickUp")
else:
lines.append("✓ Using existing list from config")
if result["provisioned"]:
lines.append("✓ Provisioned with custom fields and statuses")
else:
lines.append("✓ List already has custom fields")
# Count fields/statuses from freshly-loaded config
custom_fields = self._clickup_config.get("field_mapping", {}).get("custom_fields", {})
statuses = self._clickup_config.get("field_mapping", {}).get("status", {})
lines.append(
f"✓ Discovered and synced {len(custom_fields)} custom fields, "
f"{len(statuses)} statuses"
)
lines.append("")
lines.append(f"View in ClickUp: {result['list_url']}")
lines.append("")
lines.append(
"[OK] ClickUp setup complete. You can now use bd_create, bd_sync, and other commands."
)
return "\n".join(lines)
except Exception as e:
logger.error("ClickUp setup failed", exc_info=True)
return f"Error: ClickUp setup failed: {str(e)}"
def _bd_sync_dependencies(self, args: dict[str, Any]) -> str:
"""Sync dependencies to/from ClickUp.
Args:
args: Dictionary with optional keys:
- issue_id: Issue ID (syncs all if omitted)
- direction: Sync direction (bidirectional, to-clickup, from-clickup)
Returns:
Status message with sync results
"""
        try:
            # Use the project-aware engine so config resolution matches other tools
            engine = self._get_sync_engine()
            if engine is None:
                return "Error: ClickUp config not found for this project"
issue_id = args.get("issue_id")
direction = args.get("direction", "bidirectional")
result = engine.sync_dependencies_only(issue_id=issue_id, direction=direction)
lines = ["[Dependency Sync]"]
lines.append(f"Direction: {direction}")
lines.append("")
if issue_id:
lines.append(f"Results for {issue_id}:")
lines.append(f" Synced: {result.get('synced', 0)}")
lines.append(f" Failed: {result.get('failed', 0)}")
lines.append(f" Skipped: {result.get('skipped', 0)}")
if result.get("errors"):
lines.append("")
lines.append("Errors:")
for error in result["errors"]:
lines.append(f" - {error}")
else:
lines.append("Results:")
lines.append(f" Total issues: {result.get('total_issues', 0)}")
lines.append(f" Issues synced: {result.get('synced', 0)}")
lines.append(f" Issues failed: {result.get('failed', 0)}")
lines.append(f" Issues skipped: {result.get('skipped', 0)}")
lines.append(f" Total dependencies: {result.get('total_dependencies', 0)}")
lines.append("")
lines.append("[OK] Dependency sync complete")
return "\n".join(lines)
except Exception as e:
logger.error("Dependency sync failed", exc_info=True)
return f"Error: Dependency sync failed: {str(e)}"
def _bd_dep_add(self, args: dict[str, Any]) -> str:
"""Add dependency between issues.
Args:
args: Dictionary with keys:
- issue_id: Issue ID
- depends_on_id: Issue ID that issue_id depends on
- dep_type: Dependency type (blocks, related, etc.)
Returns:
Status message
"""
issue_id = args["issue_id"]
depends_on_id = args["depends_on_id"]
dep_type = args.get("dep_type", "blocks")
try:
# Run bd dep add command
cmd = ["bd", "dep", "add", issue_id, depends_on_id, "--type", dep_type]
result = self._run_bd_command(cmd)
# Sync dependencies to ClickUp if auto-sync enabled
            try:
                # Use the project-aware engine so config resolution matches other tools
                engine = self._get_sync_engine()
                if engine is not None and engine.config.get("sync", {}).get("sync_dependencies", True):
dep_result = engine.sync_dependencies_only(issue_id=issue_id)
return (
f"[OK] Added {dep_type} dependency: {issue_id} -> {depends_on_id}\n"
f"Synced {dep_result.get('synced', 0)} dependencies to ClickUp"
)
except Exception as e:
logger.warning("Failed to auto-sync dependency: %s", e)
return f"[OK] Added {dep_type} dependency: {issue_id} -> {depends_on_id}"
except subprocess.CalledProcessError as e:
error_msg = e.stderr if e.stderr else str(e)
return f"Error: Failed to add dependency: {error_msg}"
except Exception as e:
logger.error("bd dep add failed", exc_info=True)
return f"Error: {str(e)}"
def _bd_dep_remove(self, args: dict[str, Any]) -> str:
"""Remove dependency between issues.
Args:
args: Dictionary with keys:
- issue_id: Issue ID
- depends_on_id: Issue ID to remove dependency from
Returns:
Status message
"""
issue_id = args["issue_id"]
depends_on_id = args["depends_on_id"]
try:
# Run bd dep remove command
cmd = ["bd", "dep", "remove", issue_id, depends_on_id]
result = self._run_bd_command(cmd)
return f"[OK] Removed dependency: {issue_id} -> {depends_on_id}"
except subprocess.CalledProcessError as e:
error_msg = e.stderr if e.stderr else str(e)
return f"Error: Failed to remove dependency: {error_msg}"
except Exception as e:
logger.error("bd dep remove failed", exc_info=True)
return f"Error: {str(e)}"
def _bd_dep_tree(self, args: dict[str, Any]) -> str:
"""Get dependency tree for an issue.
Args:
args: Dictionary with keys:
- issue_id: Issue ID
Returns:
Formatted dependency tree
"""
issue_id = args["issue_id"]
try:
# Run bd dep tree command
cmd = ["bd", "dep", "tree", issue_id]
result = self._run_bd_command(cmd)
return result.stdout
except subprocess.CalledProcessError as e:
error_msg = e.stderr if e.stderr else str(e)
return f"Error: Failed to get dependency tree: {error_msg}"
except Exception as e:
logger.error("bd dep tree failed", exc_info=True)
return f"Error: {str(e)}"
def _bd_comment_add(self, args: dict[str, Any]) -> str:
"""Add comment to issue.
Args:
args: Dictionary with keys:
- issue_id: Issue ID
- text: Comment text
- author: Optional author name
Returns:
Status message
"""
issue_id = args["issue_id"]
text = args["text"]
author = args.get("author")
try:
# Build command
cmd = ["bd", "comments", "add", issue_id, text]
if author:
cmd.extend(["--author", author])
result = self._run_bd_command(cmd)
# Sync comment to ClickUp if enabled
from beads_clickup.sync_engine import SyncEngine
try:
# Use the MCP server's project_dir to find the correct config
config_path = self.project_dir / ".beads" / "integrations" / "clickup" / "config.yaml"
engine = SyncEngine(config_path=str(config_path))
if engine.config.get("advanced", {}).get("sync_comments", False):
# Get task ID
state = engine.sync_state.get(issue_id, {})
task_id = state.get("clickup_task_id")
if task_id:
from beads_clickup.comment_sync import CommentSync
comment_sync = CommentSync(engine.project_dir, engine.sync_state)
count = comment_sync.sync_comments_to_clickup(
issue_id, task_id, engine.mcp_client
)
return (
f"[OK] Comment added to {issue_id}\n"
f"Synced {count} comment(s) to ClickUp"
)
except Exception as e:
logger.warning("Failed to auto-sync comment: %s", e)
return f"[OK] Comment added to {issue_id}"
except subprocess.CalledProcessError as e:
error_msg = e.stderr if e.stderr else str(e)
return f"Error: Failed to add comment: {error_msg}"
except Exception as e:
logger.error("bd comments add failed", exc_info=True)
return f"Error: {str(e)}"
def _bd_comments_list(self, args: dict[str, Any]) -> str:
"""List comments on an issue.
Args:
args: Dictionary with keys:
- issue_id: Issue ID
Returns:
Formatted list of comments
"""
issue_id = args["issue_id"]
try:
cmd = ["bd", "comments", issue_id, "--json"]
result = self._run_bd_command(cmd)
# Parse JSON output
comments = json.loads(result.stdout)
if not comments:
return f"[OK] No comments on {issue_id}"
lines = [f"[Comments on {issue_id}]"]
lines.append("")
for comment in comments:
author = comment.get("author", "unknown")
created = comment.get("created_at", "")
text = comment.get("text", "")
source = comment.get("source", "beads")
lines.append(f"Author: {author}")
lines.append(f"Date: {created}")
lines.append(f"Source: {source}")
lines.append(f"Text: {text}")
lines.append("")
return "\n".join(lines)
except subprocess.CalledProcessError as e:
error_msg = e.stderr if e.stderr else str(e)
return f"Error: Failed to list comments: {error_msg}"
except Exception as e:
logger.error("bd comments list failed", exc_info=True)
return f"Error: {str(e)}"
def _bd_ready(self, args: dict[str, Any]) -> str:
"""Get work-ready issues (no blockers).
Args:
args: Dictionary with optional keys:
- limit: Maximum issues to show
- priority: Filter by priority
- assignee: Filter by assignee
Returns:
List of ready issues
"""
try:
cmd = ["bd", "ready", "--json"]
if args.get("limit"):
cmd.extend(["--limit", str(args["limit"])])
if args.get("priority"):
cmd.extend(["--priority", args["priority"]])
if args.get("assignee"):
cmd.extend(["--assignee", args["assignee"]])
result = self._run_bd_command(cmd)
# Parse JSON output
issues = json.loads(result.stdout)
if not issues:
return "[OK] No ready issues (all are blocked or none match filters)"
lines = ["[Ready Issues - No Blockers]"]
lines.append("")
for issue in issues:
issue_id = issue.get("id", "")
title = issue.get("title", "")
priority = issue.get("priority", "")
status = issue.get("status", "")
lines.append(f"• {issue_id}: {title}")
lines.append(f" Priority: P{priority}, Status: {status}")
lines.append("")
lines.append(f"Total: {len(issues)} ready issue(s)")
return "\n".join(lines)
except subprocess.CalledProcessError as e:
error_msg = e.stderr if e.stderr else str(e)
return f"Error: Failed to get ready issues: {error_msg}"
except Exception as e:
logger.error("bd ready failed", exc_info=True)
return f"Error: {str(e)}"
def _bd_blocked(self, args: dict[str, Any]) -> str:
"""Get blocked issues.
Returns:
List of blocked issues
"""
try:
cmd = ["bd", "blocked", "--json"]
result = self._run_bd_command(cmd)
# Parse JSON output
issues = json.loads(result.stdout)
if not issues:
return "[OK] No blocked issues"
lines = ["[Blocked Issues]"]
lines.append("")
for issue in issues:
issue_id = issue.get("id", "")
title = issue.get("title", "")
priority = issue.get("priority", "")
lines.append(f"• {issue_id}: {title}")
lines.append(f" Priority: P{priority}")
# Show what it's blocked by if available
if "blocked_by" in issue:
lines.append(f" Blocked by: {', '.join(issue['blocked_by'])}")
lines.append("")
lines.append(f"Total: {len(issues)} blocked issue(s)")
return "\n".join(lines)
except subprocess.CalledProcessError as e:
error_msg = e.stderr if e.stderr else str(e)
return f"Error: Failed to get blocked issues: {error_msg}"
except Exception as e:
logger.error("bd blocked failed", exc_info=True)
return f"Error: {str(e)}"
def _bd_stale(self, args: dict[str, Any]) -> str:
"""Get stale issues not updated recently.
Args:
args: Dictionary with optional keys:
- days: Days threshold
- status: Filter by status
- limit: Maximum issues
Returns:
List of stale issues
"""
try:
cmd = ["bd", "stale", "--json"]
if args.get("days"):
cmd.extend(["--days", str(args["days"])])
if args.get("status"):
cmd.extend(["--status", args["status"]])
if args.get("limit"):
cmd.extend(["--limit", str(args["limit"])])
result = self._run_bd_command(cmd)
# Parse JSON output
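            # Expected shape (illustrative, based on the keys read below):
            # [{"id": "proj-abc", "title": "...", "status": "open",
            #   "updated_at": "2024-01-01T00:00:00Z"}, ...]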
issues = json.loads(result.stdout)
if not issues:
return "[OK] No stale issues found"
lines = ["[Stale Issues]"]
lines.append("")
for issue in issues:
issue_id = issue.get("id", "")
title = issue.get("title", "")
updated = issue.get("updated_at", "")
status = issue.get("status", "")
lines.append(f"• {issue_id}: {title}")
lines.append(f" Status: {status}, Last updated: {updated}")
lines.append("")
lines.append(f"Total: {len(issues)} stale issue(s)")
return "\n".join(lines)
except subprocess.CalledProcessError as e:
error_msg = e.stderr if e.stderr else str(e)
return f"Error: Failed to get stale issues: {error_msg}"
except Exception as e:
logger.error("bd stale failed", exc_info=True)
return f"Error: {str(e)}"
def _bd_stats(self, args: dict[str, Any]) -> str:
"""Get project statistics.
Returns:
Project statistics
"""
try:
cmd = ["bd", "stats", "--json"]
result = self._run_bd_command(cmd)
# Parse JSON output
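            # Expected shape (illustrative, based on the keys read below):
            # {"total": 42, "by_status": {"open": 10, ...},
            #  "by_priority": {"1": 5, ...}, "by_type": {"bug": 7, ...}}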
stats = json.loads(result.stdout)
lines = ["[Project Statistics]"]
lines.append("")
lines.append(f"Total issues: {stats.get('total', 0)}")
lines.append("")
# Status breakdown
if "by_status" in stats:
lines.append("By Status:")
for status, count in stats["by_status"].items():
lines.append(f" {status}: {count}")
lines.append("")
# Priority breakdown
if "by_priority" in stats:
lines.append("By Priority:")
for priority, count in stats["by_priority"].items():
lines.append(f" P{priority}: {count}")
lines.append("")
# Type breakdown
if "by_type" in stats:
lines.append("By Type:")
for issue_type, count in stats["by_type"].items():
lines.append(f" {issue_type}: {count}")
return "\n".join(lines)
except subprocess.CalledProcessError as e:
error_msg = e.stderr if e.stderr else str(e)
return f"Error: Failed to get stats: {error_msg}"
except Exception as e:
logger.error("bd stats failed", exc_info=True)
return f"Error: {str(e)}"
def _bd_label_add(self, args: dict[str, Any]) -> str:
"""Add label to issue.
Args:
args: Dictionary with keys:
- issue_id: Issue ID
- label: Label to add
Returns:
Status message
"""
issue_id = args["issue_id"]
label = args["label"]
try:
cmd = ["bd", "label", "add", issue_id, label]
            self._run_bd_command(cmd)
# Trigger sync to update ClickUp tags
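            # Lazy import: the sync engine (and its ClickUp dependencies) is
            # only loaded when a label change actually needs to sync.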
from beads_clickup.sync_engine import SyncEngine
try:
engine = SyncEngine()
task_id = engine.sync_issue_to_clickup(issue_id, force=True)
if task_id:
return (
f"[OK] Added label '{label}' to {issue_id}\n"
f"Synced to ClickUp task {task_id}"
)
except Exception as e:
logger.warning("Failed to sync after label add: %s", e)
return f"[OK] Added label '{label}' to {issue_id}"
except subprocess.CalledProcessError as e:
error_msg = e.stderr if e.stderr else str(e)
return f"Error: Failed to add label: {error_msg}"
except Exception as e:
logger.error("bd label add failed", exc_info=True)
return f"Error: {str(e)}"
def _bd_label_remove(self, args: dict[str, Any]) -> str:
"""Remove label from issue.
Args:
args: Dictionary with keys:
- issue_id: Issue ID
- label: Label to remove
Returns:
Status message
"""
issue_id = args["issue_id"]
label = args["label"]
try:
cmd = ["bd", "label", "remove", issue_id, label]
            self._run_bd_command(cmd)
# Trigger sync to update ClickUp tags
from beads_clickup.sync_engine import SyncEngine
try:
engine = SyncEngine()
task_id = engine.sync_issue_to_clickup(issue_id, force=True)
if task_id:
return (
f"[OK] Removed label '{label}' from {issue_id}\n"
f"Synced to ClickUp task {task_id}"
)
except Exception as e:
logger.warning("Failed to sync after label remove: %s", e)
return f"[OK] Removed label '{label}' from {issue_id}"
except subprocess.CalledProcessError as e:
error_msg = e.stderr if e.stderr else str(e)
return f"Error: Failed to remove label: {error_msg}"
except Exception as e:
logger.error("bd label remove failed", exc_info=True)
return f"Error: {str(e)}"
def _bd_label_list(self, args: dict[str, Any]) -> str:
"""List labels on issue or all labels.
Args:
args: Dictionary with optional keys:
- issue_id: Issue ID (if omitted, lists all labels)
Returns:
List of labels
"""
issue_id = args.get("issue_id")
try:
if issue_id:
cmd = ["bd", "label", "list", issue_id, "--json"]
else:
cmd = ["bd", "label", "list-all", "--json"]
result = self._run_bd_command(cmd)
# Parse JSON output
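            # Expected shape (illustrative): a flat list of label strings,
            # e.g. ["bug", "frontend"]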
labels = json.loads(result.stdout)
if not labels:
return "[OK] No labels found"
            header = f"[Labels on {issue_id}]" if issue_id else "[All Labels in Project]"
            lines = [header]
            lines.append("")
            for label in labels:
                lines.append(f" - {label}")
return "\n".join(lines)
except subprocess.CalledProcessError as e:
error_msg = e.stderr if e.stderr else str(e)
return f"Error: Failed to list labels: {error_msg}"
except Exception as e:
logger.error("bd label list failed", exc_info=True)
return f"Error: {str(e)}"
def _list_available_workspaces(self) -> str:
"""List all available beads workspaces in ~/projects.
Returns:
Formatted list of available workspaces
"""
projects_dir = Path.home() / "projects"
lines = ["[Workspaces]Available beads workspaces:\n"]
# Get current workspace for marking
try:
current = self._find_beads_project_dir()
except ValueError:
current = None
workspaces = []
if projects_dir.exists():
for proj_dir in projects_dir.iterdir():
if proj_dir.is_dir() and self._is_beads_project_dir(proj_dir):
beads_path = proj_dir / ".beads"
try:
mtime = beads_path.stat().st_mtime
# Get issue prefix from config
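                        # Illustrative config.yaml content: "issue-prefix: proj"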
config_path = proj_dir / ".beads" / "config.yaml"
issue_prefix = None
if config_path.exists():
try:
import yaml
with open(config_path) as f:
config = yaml.safe_load(f)
issue_prefix = config.get("issue-prefix")
except Exception:
pass
workspaces.append(
{
"path": proj_dir,
"mtime": mtime,
"prefix": issue_prefix,
}
)
except OSError:
pass
if not workspaces:
return "No beads workspaces found in ~/projects"
# Sort by modification time, most recent first
workspaces.sort(key=lambda x: x["mtime"], reverse=True)
for ws in workspaces:
is_current = current and str(ws["path"]) == current
marker = " ← current" if is_current else ""
prefix_str = f" (prefix: {ws['prefix']})" if ws["prefix"] else ""
lines.append(f" • {ws['path']}{prefix_str}{marker}")
lines.append("")
lines.append('To switch: bd_switch_workspace(project_dir="/path/to/project")')
return "\n".join(lines)
def _fetch_roots_via_request(self) -> None:
"""Send roots/list request to the client for workspace detection.
Called after the MCP handshake is complete (notifications/initialized received).
The response arrives asynchronously and is handled by the main run() loop
via _process_roots_list_response().
"""
try:
roots_request = {
"jsonrpc": "2.0",
"id": "beads-roots-1",
"method": "roots/list",
"params": {},
}
sys.stdout.write(json.dumps(roots_request) + "\n")
sys.stdout.flush()
logger.debug("Sent roots/list request to client")
except Exception as e:
logger.warning(f"Failed to send roots/list request: {e}")
def _process_roots_list_response(self, response: dict) -> None:
"""Process the roots/list response from the client.
Called by handle_request() when a response to our roots/list request arrives.
"""
roots = response.get("result", {}).get("roots", [])
for root in roots:
uri = root.get("uri", "")
if uri.startswith("file://"):
root_path = Path(unquote(urlparse(uri).path))
if self._is_beads_project_dir(root_path):
self._roots_workspace = str(root_path)
self._refresh_project_dir()
logger.debug(f"Workspace set from roots/list: {root_path}")
break
def run(self):
"""Run the MCP server (stdio transport)."""
while True:
try:
# Read JSON-RPC request from stdin
line = sys.stdin.readline()
if not line:
break
request = json.loads(line)
response = self.handle_request(request)
# Write JSON-RPC response to stdout (skip for notifications)
if response is not None:
response_json = json.dumps(response)
sys.stdout.write(response_json + "\n")
sys.stdout.flush()
# After handshake complete, request workspace roots from client.
# Reset _roots_workspace on every notifications/initialized so that
# switching projects in a persistent MCP server process picks up the
# new workspace instead of staying locked to the original one.
if (
request.get("method") == "notifications/initialized"
and self._client_supports_roots
):
self._roots_workspace = None
# Also clear any manual override so the new session's roots
# detection takes effect rather than a stale override from a
# previous project session.
self._workspace_override = None
self._fetch_roots_via_request()
except json.JSONDecodeError as e:
logger.error(f"Invalid JSON received: {e}")
continue
except Exception as e:
logger.error(f"Error processing request: {e}", exc_info=True)
continue