class SyncEngine:
"""Handles bidirectional synchronization between beads and ClickUp."""
def __init__(self, config_path: Optional[str] = None):
"""Initialize sync engine.
Args:
config_path: Path to config.yaml (defaults to .beads/integrations/clickup/config.yaml)
"""
if config_path is None:
# Find .beads directory
beads_dir = self._find_beads_dir()
config_path = beads_dir / "integrations" / "clickup" / "config.yaml"
# Project directory is parent of .beads directory
self.project_dir = beads_dir.parent
else:
# Derive project directory from config_path: {project_dir}/.beads/integrations/clickup/config.yaml
self.project_dir = Path(config_path).parent.parent.parent.parent
self.config_path = Path(config_path)
self.config = self._load_config()
self.clickup_config = self.config.get("clickup", {})
self.sync_config = self.config.get("sync", {})
# Initialize clients
self.mcp_client = ClickUpClient(
api_token=self.clickup_config.get("api_token"),
)
self.field_mapper = FieldMapper(self.config)
self.custom_field_mapper = CustomFieldMapper(self.config)
# Initialize CustomFieldStore
self.custom_field_store = CustomFieldStore(self.project_dir)
# Discover custom field options if possible
self._discover_custom_field_options()
# Sync state file
self.state_file = self.config_path.parent / "sync_state.json"
self.sync_state = self._load_sync_state()
# Lock to serialize concurrent writes to issues.jsonl
self._issues_file_lock = threading.Lock()
# Conflict log file
self.conflict_file = Path(
self.config.get("conflict_resolution", {}).get(
"conflicts_file", self.config_path.parent / "conflicts.jsonl"
)
)
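    # Illustrative config.yaml shape, assembled from the keys this class reads elsewhere
    # in this file (clickup.api_token, clickup.default_list_id, clickup.defaults.*,
    # sync.*, conflict_resolution.conflicts_file, advanced.sync_comments).
    # Values below are hypothetical examples, not shipped defaults:
    #
    #   clickup:
    #     api_token: "pk_..."
    #     default_list_id: "901234567"
    #     defaults:
    #       assignee: 123456
    #       time_estimate: 3600000      # milliseconds
    #       auto_start_date: false
    #       auto_due_date_days: 7
    #   sync:
    #     direction: bidirectional
    #     auto_create: true
    #     auto_update: true
    #     sync_dependencies: true
    #   conflict_resolution:
    #     conflicts_file: .beads/integrations/clickup/conflicts.jsonl
    #   advanced:
    #     sync_comments: true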
@staticmethod
def _parse_timestamp_utc(ts_value) -> datetime:
"""Parse various timestamp formats into datetime (UTC, naive).
Handles:
- ISO format with/without timezone (converts to UTC)
- Nanosecond precision (truncates to microseconds)
- Unix timestamps in milliseconds
Args:
ts_value: Timestamp as string, int, or datetime
Returns:
datetime object in UTC (naive, no tzinfo)
"""
if isinstance(ts_value, datetime):
if ts_value.tzinfo:
# Convert to UTC and remove tzinfo
from datetime import timezone
return ts_value.astimezone(timezone.utc).replace(tzinfo=None)
return ts_value
if isinstance(ts_value, (int, float)):
# Assume milliseconds timestamp (already UTC)
return datetime.utcfromtimestamp(ts_value / 1000)
if not isinstance(ts_value, str):
raise ValueError(f"Cannot parse timestamp: {ts_value}")
ts_str = ts_value
# Handle ISO format
if "T" in ts_str:
# Handle nanoseconds (Python only supports microseconds)
if "." in ts_str:
parts = ts_str.split(".")
base = parts[0]
frac_and_tz = parts[1]
# Find timezone offset if present
tz_idx = -1
for i, c in enumerate(frac_and_tz):
if c in "+-" and i > 0:
tz_idx = i
break
if tz_idx > 0:
# Has timezone: truncate fraction to 6 digits
frac = frac_and_tz[: min(6, tz_idx)].ljust(6, "0")
tz = frac_and_tz[tz_idx:]
ts_str = f"{base}.{frac}{tz}"
else:
# No timezone: truncate fraction to 6 digits
frac = frac_and_tz[:6].ljust(6, "0")
ts_str = f"{base}.{frac}"
parsed = datetime.fromisoformat(ts_str.replace("Z", "+00:00"))
# Convert to UTC naive
if parsed.tzinfo:
from datetime import timezone
return parsed.astimezone(timezone.utc).replace(tzinfo=None)
# No timezone - assume already UTC
return parsed
else:
            # No "T" separator: parse as "YYYY-MM-DD HH:MM:SS" and assume UTC
return datetime.strptime(ts_str, "%Y-%m-%d %H:%M:%S")
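    # Illustrative behaviour of the branches above (values hypothetical):
    #   _parse_timestamp_utc("2024-01-15T10:30:00.123456789+02:00")
    #       -> datetime(2024, 1, 15, 8, 30, 0, 123456)   # nanoseconds truncated, shifted to UTC
    #   _parse_timestamp_utc(1705314600000)               # epoch milliseconds
    #       -> datetime(2024, 1, 15, 10, 30)
    #   _parse_timestamp_utc("2024-01-15 10:30:00")        # no "T": plain format, assumed UTC
    #       -> datetime(2024, 1, 15, 10, 30)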
@staticmethod
def _is_valid_beads_project(beads_dir: Path) -> bool:
"""Check if a .beads directory is a valid beads project (not just global registry)."""
# A valid beads project has at least one of these files
indicators = ["config.yaml", "issues.jsonl", "metadata.json", "beads.db"]
return any((beads_dir / f).exists() for f in indicators)
@staticmethod
def _find_beads_dir() -> Path:
"""Find the .beads directory by searching up from current directory.
Returns:
Path to .beads directory
Raises:
FileNotFoundError: If .beads directory not found
"""
# First, walk up from cwd
current = Path.cwd()
while current != current.parent:
beads_dir = current / ".beads"
if (
beads_dir.exists()
and beads_dir.is_dir()
and SyncEngine._is_valid_beads_project(beads_dir)
):
return beads_dir
current = current.parent
# Fallback: find most recently modified .beads in ~/projects
projects_dir = Path.home() / "projects"
if projects_dir.exists():
beads_dirs = []
for proj_dir in projects_dir.iterdir():
if proj_dir.is_dir():
beads_dir = proj_dir / ".beads"
if beads_dir.exists() and SyncEngine._is_valid_beads_project(
beads_dir
):
try:
mtime = beads_dir.stat().st_mtime
beads_dirs.append((mtime, beads_dir))
except OSError:
pass
if beads_dirs:
beads_dirs.sort(reverse=True)
return beads_dirs[0][1]
raise FileNotFoundError(
".beads directory not found. "
"Run 'bd init' to initialize beads in this repository."
)
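    # Search-order sketch (directories hypothetical):
    #   /home/user/projects/my-repo/src/     <- cwd: walk upwards looking for a valid .beads/
    #   /home/user/projects/my-repo/.beads/  <- returned if it contains config.yaml,
    #                                           issues.jsonl, metadata.json or beads.db
    #   fallback: most recently modified valid .beads/ under ~/projects/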
def _load_config(self) -> Dict[str, Any]:
"""Load configuration from config.yaml.
Search order:
1. Project-level: .beads/integrations/clickup/config.yaml
2. Global: ~/.config/beads/clickup.yaml
Returns:
Configuration dictionary
"""
# Check project-level config first
config_locations = [
self.config_path,
Path.home() / ".config" / "beads" / "clickup.yaml",
]
config_file = None
for loc in config_locations:
if loc.exists():
config_file = loc
break
if config_file is None:
raise FileNotFoundError(
f"ClickUp configuration not found. Checked:\n"
f" - {self.config_path} (project-level)\n"
f" - ~/.config/beads/clickup.yaml (global)\n"
f"Run 'bd clickup init' or create global config."
)
# Update config_path to actual location used
if config_file != self.config_path:
logger.info(f"Using global ClickUp config: {config_file}")
self.config_path = config_file
try:
import yaml
with open(config_file) as f:
return yaml.safe_load(f) or {}
except ImportError:
logger.error("PyYAML not installed. Install with: pip install pyyaml")
raise
except Exception as e:
logger.error("Failed to load config: %s", e)
raise
def _load_sync_state(self) -> Dict[str, Dict[str, Any]]:
"""Load sync state from file.
Returns:
Dictionary mapping issue IDs to sync metadata
"""
if not self.state_file.exists():
return {}
try:
with open(self.state_file) as f:
return json.load(f)
except Exception as e:
logger.warning("Failed to load sync state: %s", e)
return {}
def _save_sync_state(self):
"""Save sync state to file."""
try:
self.state_file.parent.mkdir(parents=True, exist_ok=True)
with open(self.state_file, "w") as f:
json.dump(self.sync_state, f, indent=2)
except Exception as e:
logger.error("Failed to save sync state: %s", e)
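    # Illustrative sync_state.json contents (IDs and timestamps hypothetical). Keys are
    # beads issue IDs plus bookkeeping entries such as "_last_clickup_sync":
    #
    #   {
    #     "bd-42": {
    #       "clickup_task_id": "86abc123",
    #       "last_synced": "2024-01-15T10:30:00",
    #       "last_beads_update": "2024-01-15T10:30:00",
    #       "sync_direction": "beads-to-clickup",
    #       "parent_task_id": null
    #     },
    #     "_last_clickup_sync": "2024-01-15T10:30:00"
    #   }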
def _discover_custom_field_options(self):
"""Discover custom field options from ClickUp and cache them.
This populates the CustomFieldMapper with dropdown options
so it can convert option names to option IDs.
"""
list_id = self.clickup_config.get("default_list_id")
if not list_id:
logger.debug("No default_list_id configured, skipping field discovery")
return
try:
fields = self.mcp_client.get_list_custom_fields(list_id)
# Build mapping of ClickUp field ID to beads field name
id_to_name = {}
for (
field_name,
field_config,
) in self.custom_field_mapper.custom_fields.items():
field_id = field_config.get("clickup_field_id")
if field_id:
id_to_name[field_id] = field_name
# Cache dropdown options
for field in fields:
field_id = field.get("id")
field_type = field.get("type")
field_name = id_to_name.get(field_id)
if field_name and field_type == "drop_down":
options = field.get("type_config", {}).get("options", [])
option_map = {}
for opt in options:
opt_name = opt.get("name")
opt_id = opt.get("id") or opt.get("orderindex")
if opt_name and opt_id is not None:
option_map[opt_name] = opt_id
if option_map:
self.custom_field_mapper.set_dropdown_options(
field_name, option_map
)
logger.debug(
"Cached %d dropdown options for field '%s'",
len(option_map),
field_name,
)
except Exception as e:
logger.warning("Failed to discover custom field options: %s", e)
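    # Illustrative shape of a dropdown custom field as returned by get_list_custom_fields
    # (values hypothetical); only the keys read above are shown
    # (id, type, type_config.options[].name / .id / .orderindex):
    #
    #   {
    #     "id": "f1a2b3c4-...",
    #     "type": "drop_down",
    #     "type_config": {
    #       "options": [
    #         {"id": "opt-uuid-1", "name": "Backend", "orderindex": 0},
    #         {"id": "opt-uuid-2", "name": "Frontend", "orderindex": 1}
    #       ]
    #     }
    #   }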
def _get_beads_issues(self) -> List[Dict[str, Any]]:
"""Get all beads issues using bd CLI.
Returns:
List of issue dictionaries
"""
try:
# Use bd list --json to get all issues
result = subprocess.run(
["bd", "list", "--json"],
capture_output=True,
text=True,
check=True,
cwd=str(self.project_dir),
)
issues = json.loads(result.stdout)
return issues if isinstance(issues, list) else []
except subprocess.CalledProcessError as e:
logger.error("Failed to get beads issues: %s", e.stderr)
return []
except Exception as e:
logger.error("Failed to parse beads issues: %s", e)
return []
def _get_beads_issue(self, issue_id: str) -> Optional[Dict[str, Any]]:
"""Get a specific beads issue from the database.
Args:
issue_id: Beads issue ID
Returns:
Issue dictionary or None if not found
"""
import subprocess
# Use bd show --json to get fresh data from the database
# This ensures we get the latest labels, status, etc.
try:
result = subprocess.run(
["bd", "show", issue_id, "--json"],
cwd=self.project_dir,
capture_output=True,
text=True,
timeout=30,
)
if result.returncode != 0:
logger.warning("Failed to get issue %s: %s", issue_id, result.stderr)
return None
if not result.stdout.strip():
return None
data = json.loads(result.stdout)
# bd show --json returns a list
issue = data[0] if isinstance(data, list) else data
return issue
except subprocess.TimeoutExpired:
logger.error("Timeout getting issue %s", issue_id)
return None
except json.JSONDecodeError as e:
logger.warning("Failed to parse issue JSON for %s: %s", issue_id, e)
return None
except Exception as e:
logger.error("Failed to read issue %s: %s", issue_id, e)
return None
def _sync_task_links(
self,
issue_id: str,
clickup_task_id: str,
issue: Dict[str, Any],
changes: List[str],
) -> None:
"""Sync Task Links (neighbor relationships) to ClickUp.
Compares connected_to field in beads with existing ClickUp task links,
and adds/removes links as needed.
Args:
issue_id: Beads issue ID
clickup_task_id: ClickUp task ID
issue: Beads issue dictionary
changes: List to append change descriptions to
"""
logger.info("_sync_task_links called for %s (ClickUp: %s)", issue_id, clickup_task_id)
# Get desired neighbors from beads
issue_custom_fields = issue.get("custom_fields", {})
connected_to = issue_custom_fields.get("connected_to")
logger.debug("connected_to from beads: %s", connected_to)
# Normalize to list
if not connected_to:
desired_neighbors = []
elif isinstance(connected_to, str):
desired_neighbors = [connected_to]
elif isinstance(connected_to, list):
desired_neighbors = connected_to
else:
desired_neighbors = []
# Convert beads issue IDs to ClickUp task IDs
desired_task_ids = set()
logger.debug("Converting %d neighbor IDs to ClickUp task IDs", len(desired_neighbors))
for neighbor_id in desired_neighbors:
# Try sync_state first
neighbor_state = self.sync_state.get(neighbor_id, {})
neighbor_task_id = None
if isinstance(neighbor_state, dict):
neighbor_task_id = neighbor_state.get("clickup_task_id")
# Fallback to external_ref
if not neighbor_task_id:
neighbor_issue = self._get_beads_issue(neighbor_id)
if neighbor_issue:
ext = (neighbor_issue.get("external_ref") or "").strip()
if ext.startswith("clickup-"):
neighbor_task_id = ext.replace("clickup-", "")
if neighbor_task_id:
desired_task_ids.add(neighbor_task_id)
logger.debug("Mapped %s -> %s", neighbor_id, neighbor_task_id)
else:
logger.warning(
"Neighbor issue %s not synced to ClickUp; skipping task link for %s",
neighbor_id,
issue_id,
)
# Get current task links from ClickUp
try:
current_task = self.mcp_client.get_task(clickup_task_id)
linked_tasks = current_task.get("linked_tasks", [])
# ClickUp returns linked_tasks as array of {task_id: "...", link_id: "...", ...}
# Each link appears twice (A->B and B->A), so we collect unique task_ids
current_task_ids = set()
for link in linked_tasks:
if isinstance(link, dict):
link_task_id = link.get("task_id")
link_id = link.get("link_id")
# Skip self-references (same task appearing as both sides)
if link_task_id and link_task_id != clickup_task_id:
current_task_ids.add(link_task_id)
elif link_id and link_id != clickup_task_id:
current_task_ids.add(link_id)
except Exception as e:
logger.warning("Could not fetch current task links: %s", e)
current_task_ids = set()
# Calculate additions and removals
to_add = desired_task_ids - current_task_ids
to_remove = current_task_ids - desired_task_ids
logger.debug(
"Task link diff for %s: desired=%s, current=%s, add=%s, remove=%s",
clickup_task_id,
desired_task_ids,
current_task_ids,
to_add,
to_remove,
)
# Add new links
for neighbor_task_id in to_add:
try:
self.mcp_client.add_task_link(clickup_task_id, neighbor_task_id)
logger.info(
"Added task link: %s → %s", clickup_task_id, neighbor_task_id
)
changes.append(f"**Task Link added:** → `{neighbor_task_id}`")
except Exception as e:
logger.warning(
"Failed to add task link %s → %s: %s",
clickup_task_id,
neighbor_task_id,
e,
)
# Remove obsolete links
for neighbor_task_id in to_remove:
try:
self.mcp_client.delete_task_link(clickup_task_id, neighbor_task_id)
logger.info(
"Removed task link: %s → %s", clickup_task_id, neighbor_task_id
)
changes.append(f"**Task Link removed:** → `{neighbor_task_id}`")
except Exception as e:
logger.warning(
"Failed to remove task link %s → %s: %s",
clickup_task_id,
neighbor_task_id,
e,
)
if to_add or to_remove:
logger.info(
"Synced task links for %s: +%d, -%d",
clickup_task_id,
len(to_add),
len(to_remove),
)
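    # Example of the link diff above (task IDs hypothetical):
    #   desired_task_ids = {"t1", "t2"}     # from connected_to in beads
    #   current_task_ids = {"t2", "t3"}     # from ClickUp linked_tasks
    #   to_add    = {"t1"}                  # add_task_link called for these
    #   to_remove = {"t3"}                  # delete_task_link called for these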
def _extract_commit_pr_links(
self,
issue: Dict[str, Any],
issue_id: str,
) -> str:
"""Extract commit and PR links from issue or git.
Args:
issue: Beads issue dictionary
issue_id: Issue ID
Returns:
Formatted string with links or empty string
"""
links = []
# Check various fields where commit/PR info might be stored
pr_url = (
issue.get("pr_url")
or issue.get("pull_request")
or issue.get("github_pr")
or issue.get("merge_request")
)
if pr_url:
links.append(f"**Pull Request:** {pr_url}")
commit_url = (
issue.get("commit_url")
or issue.get("github_url")
or issue.get("commit_link")
)
if commit_url:
links.append(f"**Commit:** {commit_url}")
# Check for commit hash
commit_hash = issue.get("commit_hash") or issue.get("commit")
if commit_hash and not commit_url:
# Try to construct GitHub URL if we have a hash
github_repo = self._get_github_repo_url()
if github_repo:
commit_url = f"{github_repo}/commit/{commit_hash}"
links.append(f"**Commit:** {commit_url}")
else:
links.append(f"**Commit:** `{commit_hash}`")
# If no explicit links, try to find from git history
if not links:
git_info = self._get_recent_git_commits(issue_id)
if git_info:
links.append(git_info)
return "\n".join(links) if links else ""
def _parse_github_pr_url(self, url: str) -> Optional[tuple]:
"""Parse a GitHub PR URL into (owner, repo, pr_number).
Args:
url: GitHub PR URL, e.g. https://github.com/owner/repo/pull/123
Returns:
Tuple of (owner, repo, pr_number) or None if not a valid GitHub PR URL
"""
import re as _re
match = _re.match(
r"https://github\.com/([^/]+)/([^/]+)/pulls?/(\d+)", url
)
if not match:
return None
return match.group(1), match.group(2), int(match.group(3))
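    # Example (owner/repo hypothetical):
    #   "https://github.com/acme/widgets/pull/123" -> ("acme", "widgets", 123)
    # Non-GitHub or malformed URLs return None.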
def _check_github_pr_approved(
self,
owner: str,
repo: str,
pr_number: int,
token: Optional[str],
) -> tuple:
"""Check if a GitHub PR has been approved or merged.
First checks the PR's merged state via /pulls/{pr_number}; if merged,
returns True immediately. Otherwise checks /reviews for an APPROVED review.
Args:
owner: GitHub repository owner
repo: GitHub repository name
pr_number: Pull request number
token: GitHub API token, or None for unauthenticated (public repos, 60 req/hr)
Returns:
Tuple of (approved: bool, reviewer: str | None, approved_at: str | None).
approved_at is formatted as YYYY-MM-DD.
"""
import requests as _requests
headers = {"Accept": "application/vnd.github+json"}
if token:
headers["Authorization"] = f"Bearer {token}"
# 1. Check merged state first — a merged PR is always acceptable.
pr_url = f"https://api.github.com/repos/{owner}/{repo}/pulls/{pr_number}"
pr_response = _requests.get(pr_url, headers=headers, timeout=10)
pr_response.raise_for_status()
pr_data = pr_response.json()
if pr_data.get("merged"):
merger = (pr_data.get("merged_by") or {}).get("login", "unknown")
merged_at_raw = pr_data.get("merged_at", "")
try:
dt = datetime.strptime(merged_at_raw, "%Y-%m-%dT%H:%M:%SZ")
merged_at = dt.strftime("%Y-%m-%d")
except Exception:
merged_at = merged_at_raw
return True, merger, merged_at
# 2. Not yet merged — check for an APPROVED review.
reviews_url = f"https://api.github.com/repos/{owner}/{repo}/pulls/{pr_number}/reviews"
response = _requests.get(reviews_url, headers=headers, timeout=10)
response.raise_for_status()
reviews = response.json()
approved_review = next(
(r for r in reviews if r.get("state") == "APPROVED"),
None,
)
if not approved_review:
return False, None, None
reviewer = approved_review.get("user", {}).get("login", "unknown")
approved_at_raw = approved_review.get("submitted_at", "")
try:
dt = datetime.strptime(approved_at_raw, "%Y-%m-%dT%H:%M:%SZ")
approved_at = dt.strftime("%Y-%m-%d")
except Exception:
approved_at = approved_at_raw
return True, reviewer, approved_at
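    # Illustrative call (owner/repo/PR number hypothetical):
    #   approved, who, when = self._check_github_pr_approved("acme", "widgets", 123, token)
    #   -> (True, "octocat", "2024-01-15") if merged or an APPROVED review exists,
    #      (False, None, None) otherwise. Network/API errors propagate to the caller.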
def _get_github_repo_url(self) -> Optional[str]:
"""Get GitHub repository URL from git remote.
Returns:
GitHub repository URL without .git suffix, or None
"""
try:
result = subprocess.run(
["git", "remote", "get-url", "origin"],
capture_output=True,
text=True,
check=True,
timeout=5,
)
remote_url = result.stdout.strip()
# Convert various formats to HTTPS URL
if remote_url.startswith("git@github.com:"):
# SSH format: git@github.com:user/repo.git
repo_path = remote_url.replace("git@github.com:", "").replace(
".git", ""
)
return f"https://github.com/{repo_path}"
elif "github.com" in remote_url:
# HTTPS format: https://github.com/user/repo.git
return remote_url.replace(".git", "")
return None
except Exception as e:
logger.debug("Could not get GitHub repo URL: %s", e)
return None
def _get_recent_git_commits(self, issue_id: str) -> Optional[str]:
"""Try to find recent git commits mentioning this issue.
Args:
issue_id: Issue ID to search for
Returns:
Formatted commit info or None
"""
try:
# Search git log for commits mentioning this issue ID
result = subprocess.run(
["git", "log", "--all", "--grep", issue_id, "-1", "--format=%H %s"],
capture_output=True,
text=True,
check=True,
timeout=5,
)
if result.stdout.strip():
commit_info = result.stdout.strip()
commit_hash = commit_info.split()[0][:7] # Short hash
commit_msg = " ".join(commit_info.split()[1:])
github_repo = self._get_github_repo_url()
if github_repo:
commit_url = f"{github_repo}/commit/{commit_hash}"
return f"**Related Commit:** [{commit_hash}]({commit_url}) - {commit_msg}"
else:
return f"**Related Commit:** `{commit_hash}` - {commit_msg}"
return None
except Exception as e:
logger.debug("Could not search git commits: %s", e)
return None
def _update_beads_issue(
self,
issue_id: str,
updates: Dict[str, Any],
) -> bool:
"""Update a beads issue by directly modifying issues.jsonl.
This method preserves custom_fields that bd CLI would otherwise wipe.
Args:
issue_id: Beads issue ID
updates: Fields to update
Returns:
True if successful
"""
try:
issues_file = self.project_dir / ".beads" / "issues.jsonl"
if not issues_file.exists():
logger.error("Issues file not found: %s", issues_file)
return False
with self._issues_file_lock:
# Read all issues
issues = []
issue_found = False
with open(issues_file) as f:
for line in f:
if not line.strip():
continue
issue = json.loads(line)
# Update the target issue
if issue.get("id") == issue_id:
issue_found = True
# Preserve custom_fields before applying updates
preserved_custom_fields = issue.get("custom_fields", {})
# Apply updates
for key, value in updates.items():
issue[key] = value
# Restore custom_fields (they should not be overwritten by ClickUp sync)
if preserved_custom_fields:
issue["custom_fields"] = preserved_custom_fields
# Update timestamp (RFC3339 with timezone required by beads daemon)
issue["updated_at"] = datetime.utcnow().isoformat() + "Z"
issues.append(issue)
if not issue_found:
logger.error("Issue %s not found in issues.jsonl", issue_id)
return False
logger.info("Updated beads issue %s (preserved custom_fields)", issue_id)
# Write DB first (source of truth for daemon) — SQLite write is atomic.
# _issues_file_lock serialises JSONL reads/writes; SQLite manages its
# own concurrency via WAL locking on the connection level.
db_file = self.project_dir / ".beads" / "beads.db"
if db_file.exists():
if not self._update_beads_db(issue_id, updates):
return False # DB failed — do not write JSONL
# Write JSONL second (sync artifact) — non-fatal if it fails
try:
with open(issues_file, "w") as f:
for issue in issues:
f.write(json.dumps(issue) + "\n")
except Exception as jsonl_err:
logger.warning(
"JSONL write failed for %s (DB already updated): %s",
issue_id, jsonl_err,
)
# Don't raise — DB is source of truth; daemon auto-flush will re-sync JSONL
return True
return True
except Exception as e:
logger.error("Failed to update beads issue %s: %s", issue_id, e, exc_info=True)
return False
def _update_beads_db(self, issue_id: str, updates: Dict[str, Any]) -> bool:
"""Update beads.db to keep it in sync with issues.jsonl changes.
The beads daemon uses beads.db as the source of truth, so writes to
issues.jsonl alone are not reflected by `bd show` or other CLI commands.
Args:
issue_id: Beads issue ID
            updates: Fields to update (same dict passed to _update_beads_issue)
        Returns:
            True if the update succeeded (or there was nothing to update), False otherwise
        """
db_file = self.project_dir / ".beads" / "beads.db"
set_clauses = []
params = []
if "title" in updates:
set_clauses.append("title = ?")
params.append(updates["title"])
if "description" in updates:
set_clauses.append("description = ?")
params.append(updates["description"])
if "status" in updates:
new_status = updates["status"]
set_clauses.append("status = ?")
params.append(new_status)
if new_status == "closed":
set_clauses.append("closed_at = ?")
params.append(datetime.utcnow().isoformat() + "Z")
else:
# The db CHECK requires closed_at IS NULL when status != 'closed'
set_clauses.append("closed_at = ?")
params.append(None)
if "priority" in updates:
priority = updates["priority"]
if isinstance(priority, str):
priority = int(priority)
set_clauses.append("priority = ?")
params.append(priority)
if "external_ref" in updates:
set_clauses.append("external_ref = ?")
params.append(updates["external_ref"])
if not set_clauses:
return True # Nothing to update — treat as success
set_clauses.append("updated_at = ?")
params.append(datetime.utcnow().isoformat() + "Z")
params.append(issue_id)
try:
with sqlite3.connect(str(db_file)) as conn:
conn.execute(
f"UPDATE issues SET {', '.join(set_clauses)} WHERE id = ?",
params,
)
conn.commit()
logger.info("Updated beads.db for issue %s", issue_id)
return True
except Exception as e:
logger.error("Failed to update beads.db for issue %s: %s", issue_id, e)
return False
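    # Illustrative statement built above for updates={"status": "closed"}:
    #   UPDATE issues SET status = ?, closed_at = ?, updated_at = ? WHERE id = ?
    # with params [..., "<utc-iso>Z", issue_id]; closed_at is set to NULL for any
    # non-closed status to satisfy the table's CHECK constraint.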
def _get_clickup_status_for_sync(self, beads_status: str, issue_id: str) -> str:
"""Get ClickUp status for syncing with smart preservation.
If a stored ClickUp status exists and its category matches the current
beads status, preserve the original. Otherwise, use config mapping.
Args:
beads_status: Current beads status
issue_id: Beads issue ID
Returns:
ClickUp status string
"""
# Check for stored ClickUp status
cf_data = self.custom_field_store.get(issue_id)
if cf_data:
clickup_metadata = cf_data.get("clickup_metadata", {})
stored_status = clickup_metadata.get("clickup_status")
stored_category = clickup_metadata.get("clickup_status_category")
# Smart preservation: use stored if category matches
if stored_status and stored_category == beads_status:
return stored_status
# Fallback: use config mapping
return self.field_mapper.beads_to_clickup_status(beads_status)
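    # Preservation example (statuses hypothetical): if the store holds
    # clickup_status="PENDING REVIEW" with clickup_status_category="in_progress"
    # and the issue is still in_progress in beads, the stored "PENDING REVIEW"
    # is returned instead of the config mapping for in_progress.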
def _issue_has_pr_link(
self, issue: Dict[str, Any], issue_id: str
) -> Optional[str]:
"""Return the pr_link value if set on this issue, else None.
Checks custom_field_store first (set by bd_update with pr_link),
then the issue dict itself.
Args:
issue: Beads issue dictionary
issue_id: Beads issue ID
Returns:
PR link URL string if found, None otherwise
"""
# Check custom field store (primary storage for pr_link)
cf_data = self.custom_field_store.get(issue_id)
if cf_data:
pr_link = cf_data.get("custom_fields", {}).get("pr_link")
if pr_link:
return pr_link
# Check issue dict directly
pr_link = (
issue.get("custom_fields", {}).get("pr_link")
or issue.get("pr_link")
or issue.get("pr_url")
)
return pr_link or None
def sync_issue_to_clickup(
self,
issue_id: str,
force: bool = False,
parent_task_id: Optional[str] = None,
sync_dependencies: bool = True,
) -> Optional[str]:
"""Sync a beads issue to ClickUp (create or update).
Args:
issue_id: Beads issue ID
force: Force sync even if filters don't match
parent_task_id: Optional parent task ID (makes this a subtask)
sync_dependencies: Whether to sync dependencies after task sync
Returns:
ClickUp task ID if successful, None otherwise
"""
# Get issue from beads
issue = self._get_beads_issue(issue_id)
if not issue:
logger.error("Issue %s not found", issue_id)
return None
# Check if issue should be synced
if not force and not self.field_mapper.should_sync_issue(issue):
logger.info("Issue %s filtered out by sync rules", issue_id)
return None
# Get sync state for this issue
state = self.sync_state.get(issue_id, {})
clickup_task_id = state.get("clickup_task_id")
# Also check external-ref field in beads
if not clickup_task_id and self.field_mapper.use_external_ref:
external_ref = issue.get("external_ref", "")
if external_ref and external_ref.startswith("clickup-"):
clickup_task_id = external_ref.replace("clickup-", "")
# Get default list ID
list_id = self.clickup_config.get("default_list_id")
if not list_id:
logger.error("default_list_id not configured in config.yaml")
return None
# Initialize variables for custom field tracking
custom_fields_from_store = {}
merged_fields = {}
try:
# Verify stored task still exists; clear stale ID if deleted
current_task = {}
if clickup_task_id:
try:
current_task = self.mcp_client.get_task(clickup_task_id)
except ResourceNotFoundError:
logger.warning(
"ClickUp task %s no longer exists (deleted); "
"clearing stale sync state for %s and re-creating",
clickup_task_id,
issue_id,
)
if isinstance(self.sync_state.get(issue_id), dict):
self.sync_state[issue_id].pop("clickup_task_id", None)
clickup_task_id = None
except Exception as e:
logger.warning("Could not fetch current task state: %s", e)
if clickup_task_id:
# Update existing task
logger.info(
"Updating ClickUp task %s from beads issue %s",
clickup_task_id,
issue_id,
)
update_args = {
"task_id": clickup_task_id,
}
# Track changes for comment
changes = []
# Map fields
if "title" in issue:
new_title = issue["title"]
old_title = current_task.get("name", "")
if new_title != old_title and old_title:
changes.append(
f"**Title changed:** `{old_title}` → `{new_title}`"
)
update_args["name"] = new_title
if "description" in issue:
description = issue["description"]
description += f"\n\n---\n*Synced from beads issue: {issue_id}*"
update_args["description"] = description
if "status" in issue:
status = self._get_clickup_status_for_sync(issue["status"], issue_id)
if status:
# Check if status changed
old_status = current_task.get("status", {}).get("status", "")
beads_status = issue["status"].lower()
if status != old_status and old_status:
changes.append(
f"**Status moved:** `{old_status}` → `{status}`"
)
# Auto-set start date when moved to in_progress
if beads_status == "in_progress":
# Only set if not already set
if not current_task.get("start_date"):
# Unix timestamp in milliseconds
start_timestamp = int(
datetime.utcnow().timestamp() * 1000
)
update_args["start_date"] = start_timestamp
changes.append("**Start date set:** Started work")
# Auto-set due date when moved to closed/completed
if beads_status in ["closed", "completed", "cancelled"]:
# Unix timestamp in milliseconds
due_timestamp = int(
datetime.utcnow().timestamp() * 1000
)
update_args["due_date"] = due_timestamp
changes.append("**End date set:** Task completed")
update_args["status"] = status
# Override to PENDING REVIEW if in_progress and pr_link is set
if beads_status == "in_progress":
pr_link = self._issue_has_pr_link(issue, issue_id)
if pr_link:
pr_status = self.field_mapper.beads_to_clickup_status(
"pending_review"
)
if pr_status:
update_args["status"] = pr_status
changes.append(
f"**Status override:** → `{pr_status}` (pr_link assigned)"
)
if "priority" in issue:
priority = self.field_mapper.beads_to_clickup_priority(
issue["priority"]
)
if priority is not None:
# Check if priority changed
old_priority_obj = current_task.get("priority")
# ClickUp returns priority as dict or None, extract ID if dict
old_priority = None
if old_priority_obj:
if isinstance(old_priority_obj, dict):
# Priority is returned as {"id": "1", "priority": "urgent", ...}
old_priority = int(old_priority_obj.get("id", 0))
else:
old_priority = old_priority_obj
if old_priority and priority != old_priority:
priority_names = {
1: "Urgent",
2: "High",
3: "Normal",
4: "Low",
}
old_name = priority_names.get(
old_priority, str(old_priority)
)
new_name = priority_names.get(priority, str(priority))
changes.append(
f"**Priority changed:** `{old_name}` → `{new_name}`"
)
update_args["priority"] = priority
if self.field_mapper.labels_to_tags and "labels" in issue:
new_tags = issue["labels"]
old_tags = [
tag.get("name", "") for tag in current_task.get("tags", [])
]
logger.info("Label sync: new_tags=%s, old_tags=%s", new_tags, old_tags)
if set(new_tags) != set(old_tags) and old_tags:
changes.append(
f"**Tags updated:** `{', '.join(old_tags)}` → `{', '.join(new_tags)}`"
)
update_args["tags"] = new_tags
logger.info("Added tags to update_args: %s", new_tags)
# Apply default standard fields from config if not already set
clickup_defaults = self.clickup_config.get("defaults", {})
# Default assignee (only if not already assigned)
current_assignees = current_task.get("assignees", [])
if not current_assignees:
default_assignee = clickup_defaults.get("assignee")
if default_assignee:
update_args["assignees"] = [str(default_assignee)]
logger.debug("Applying default assignee: %s", default_assignee)
# Default time estimate (only if not already set)
if not current_task.get("time_estimate"):
default_time_estimate = clickup_defaults.get("time_estimate")
if default_time_estimate:
update_args["time_estimate"] = default_time_estimate
logger.debug(
"Applying default time estimate: %s", default_time_estimate
)
# Auto start date (only if not already set)
if not current_task.get("start_date"):
if clickup_defaults.get("auto_start_date", False):
update_args["start_date"] = int(
datetime.utcnow().timestamp() * 1000
)
logger.debug("Applying auto start date")
# Auto due date (only if not already set)
if not current_task.get("due_date"):
auto_due_days = clickup_defaults.get("auto_due_date_days")
if auto_due_days:
from datetime import timedelta
due_date = datetime.utcnow() + timedelta(days=auto_due_days)
update_args["due_date"] = int(due_date.timestamp() * 1000)
logger.debug("Applying auto due date: %s days", auto_due_days)
# Update parent relationship if provided
if parent_task_id is not None:
current_parent = current_task.get("parent")
current_parent_id = (
current_parent.get("id")
if isinstance(current_parent, dict)
else current_parent
)
if current_parent_id != parent_task_id:
update_args["parent"] = parent_task_id
if current_parent_id:
changes.append(
f"**Parent changed:** `{current_parent_id}` → `{parent_task_id}`"
)
else:
changes.append(
f"**Parent set:** Task assigned as subtask of `{parent_task_id}`"
)
                # Build custom fields for bulk update
                custom_fields_list = []  # default so the individual-update loop below is safe if this block fails
                try:
# CUSTOM FIELDS: Read from CustomFieldStore
cf_data = self.custom_field_store.get(issue_id)
custom_fields_from_store = cf_data.get("custom_fields", {}) if cf_data else {}
# Use intelligent field inference
from beads_clickup.field_inference import get_smart_defaults
# Get inferred values based on issue content
inferred_fields = get_smart_defaults(issue, self.config)
# Do not set lifecycle_stage from inference; only if already on issue (e.g. from bd_assign_lifecycle)
inferred_fields.pop("lifecycle_stage", None)
# Merge: required defaults < inferred fields < custom fields from store < issue fields
required_defaults = (
self.custom_field_mapper.get_required_fields_with_defaults()
)
issue_custom_fields = issue.get("custom_fields", {})
merged_fields = {
**required_defaults,
**inferred_fields,
**custom_fields_from_store,
**issue_custom_fields,
}
# Convert to ClickUp custom_fields format
custom_fields_list = (
self.custom_field_mapper.beads_custom_fields_to_clickup(
merged_fields
)
)
if custom_fields_list:
update_args["custom_fields"] = custom_fields_list
logger.debug(
"Including %d custom fields in update for task %s (from store: %s, inferred: %s)",
len(custom_fields_list),
clickup_task_id,
list(custom_fields_from_store.keys()),
list(inferred_fields.keys()),
)
except Exception as e:
logger.warning("Failed to prepare custom fields for update: %s", e)
# When moving to PENDING REVIEW, auto-set reviewer = assignees BEFORE
# the status change so ClickUp's mandatory-field validation passes.
if update_args.get("status") == self.field_mapper.beads_to_clickup_status(
"pending_review"
):
try:
assignee_ids = [
a["id"]
for a in current_task.get("assignees", [])
if isinstance(a, dict) and "id" in a
]
reviewer_field_id = None
for fname, fconf in self.custom_field_mapper.custom_fields.items():
if fname.lstrip("0123456789").lstrip("_") == "reviewer":
reviewer_field_id = fconf.get("clickup_field_id")
break
if reviewer_field_id and assignee_ids:
self.mcp_client.set_custom_field(
clickup_task_id,
reviewer_field_id,
{"add": assignee_ids},
)
logger.info(
"Auto-set reviewer to assignees %s for PENDING REVIEW task %s",
assignee_ids,
clickup_task_id,
)
except Exception as e:
logger.warning("Failed to auto-set reviewer field: %s", e)
result = self.mcp_client.update_task(**update_args)
            # Set custom fields individually (some field types don't work in bulk update).
            # Specifically, users and text fields may need individual updates.
try:
for field in custom_fields_list:
field_id = field["id"]
value = field["value"]
# Find field config to check type
field_config = None
for (
fname,
fconf,
) in self.custom_field_mapper.custom_fields.items():
if fconf.get("clickup_field_id") == field_id:
field_config = fconf
break
# Set individually for types where bulk update doesn't work:
# - users/text/short_text: known ClickUp bulk-update limitations
# - drop_down: bulk PUT ignores UUID values; individual POST works
# - url: bulk PUT silently ignores URL fields; individual POST works
if field_config and field_config.get("type") in [
"users",
"text",
"short_text",
"drop_down",
"url",
]:
# Users fields need special {"add": [ids]} format for set_custom_field
if field_config.get("type") == "users":
if isinstance(value, list):
value = {"add": value}
else:
value = {"add": [value]}
self.mcp_client.set_custom_field(
clickup_task_id, field_id, value
)
logger.debug(
"Set custom field individually (type: %s, field_id: %s)",
field_config.get("type"),
field_id,
)
except Exception as e:
logger.warning("Failed to set individual custom fields: %s", e)
# Sync Task Links (neighbor relationships from connected_to)
try:
logger.debug("Starting task link sync for %s", clickup_task_id)
self._sync_task_links(issue_id, clickup_task_id, issue, changes)
except Exception as e:
logger.error(
"Failed to sync task links for %s: %s", clickup_task_id, e, exc_info=True
)
# Add comment about changes if any were detected
if changes:
try:
timestamp = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC")
comment_text = (
f"**Ticket Updated** ({timestamp})\n\n"
+ "\n".join(changes)
)
# Add close reason and links if issue was closed
if issue.get("status") in ["closed", "completed", "cancelled"]:
# Try multiple possible fields where close reason might be stored
close_reason = (
issue.get("close_reason")
or issue.get("resolution")
or issue.get("reason")
or issue.get("notes")
)
if close_reason:
comment_text += f"\n\n**Close Reason:** {close_reason}"
# Add commit/PR links
commit_links = self._extract_commit_pr_links(
issue, issue_id
)
if commit_links:
comment_text += f"\n\n{commit_links}"
else:
# Log available fields for debugging
logger.debug(
"No commit/PR links found. Available fields: %s",
list(issue.keys()),
)
comment_text += f"\n\n*Synced from beads issue: {issue_id}*"
self.mcp_client.add_comment(clickup_task_id, comment_text)
logger.info(
"Added update comment to ClickUp task %s", clickup_task_id
)
except Exception as e:
logger.warning("Failed to add comment to task: %s", e)
else:
# Create new task
logger.info("Creating ClickUp task from beads issue %s", issue_id)
# Read custom fields from store and add to issue before creating
cf_data = self.custom_field_store.get(issue_id)
custom_fields_from_store = cf_data.get("custom_fields", {}) if cf_data else {}
# Merge custom fields from store into issue for task creation
issue_cf = issue.get("custom_fields", {})
merged_fields = {
**custom_fields_from_store,
**issue_cf
}
if merged_fields:
issue_with_cf = dict(issue)
issue_with_cf["custom_fields"] = merged_fields
logger.debug(
"Including custom fields from store for new task: %s",
list(custom_fields_from_store.keys())
)
else:
issue_with_cf = issue
task_args = self.field_mapper.beads_to_clickup_task(
issue_with_cf, list_id, parent_task_id, self.custom_field_mapper
)
result = self.mcp_client.create_task(**task_args)
clickup_task_id = result.get("id")
# Store task ID in beads external-ref
if clickup_task_id and self.field_mapper.use_external_ref:
subprocess.run(
[
"bd",
"update",
issue_id,
"--external-ref",
f"clickup-{clickup_task_id}",
],
capture_output=True,
check=False,
cwd=str(self.project_dir),
)
# Always store clickup_metadata after creating a new ClickUp task.
# The shared block below is guarded by `merged_fields` being non-empty,
# which is false for new issues with no custom fields — so without this
# unconditional write, custom_fields.jsonl has no entry for the new task
# and subsequent operations (e.g. bd_custom_fields, next sync) can't
# find the ClickUp task_id there.
if clickup_task_id:
try:
self.custom_field_store.set(
issue_id,
custom_fields=merged_fields,
clickup_metadata={
"task_id": clickup_task_id,
"task_url": f"https://app.clickup.com/t/{clickup_task_id}",
"last_synced_at": datetime.utcnow().isoformat(),
"last_synced_from": "beads",
"clickup_status": "",
"clickup_status_category": issue.get("status", ""),
},
)
logger.debug(
"Stored clickup_metadata for new task %s (issue %s)",
clickup_task_id,
issue_id,
)
except Exception as e:
logger.warning(
"Failed to store clickup_metadata for new task %s: %s",
issue_id,
e,
)
# Update sync state
self.sync_state[issue_id] = {
"clickup_task_id": clickup_task_id,
"last_synced": datetime.utcnow().isoformat(),
"last_beads_update": datetime.utcnow().isoformat(),
"sync_direction": "beads-to-clickup",
"parent_task_id": parent_task_id,
}
self._save_sync_state()
# Update clickup_metadata in custom_fields.jsonl (only if we have custom fields
# or we applied a PENDING REVIEW override so subsequent syncs preserve the status)
_pending_review_status = self.field_mapper.beads_to_clickup_status("pending_review")
_applied_pending_review = (
_pending_review_status is not None
and "update_args" in locals()
and update_args.get("status") == _pending_review_status
)
if custom_fields_from_store or merged_fields or _applied_pending_review:
try:
self.custom_field_store.set(
issue_id,
custom_fields=merged_fields,
clickup_metadata={
"task_id": clickup_task_id,
"task_url": f"https://app.clickup.com/t/{clickup_task_id}",
"last_synced_at": datetime.utcnow().isoformat(),
"last_synced_from": "beads",
"clickup_status": update_args.get("status", ""),
# Beads status (not ClickUp status) — used by
# _get_clickup_status_for_sync for smart preservation:
# stored ClickUp status is preserved when category == beads_status.
# For PENDING REVIEW override: clickup_status="PENDING REVIEW",
# category="in_progress" → preserved on next sync since
# beads stays in_progress.
"clickup_status_category": issue.get("status", ""),
}
)
logger.debug(
"Updated custom field metadata for issue %s in CustomFieldStore",
issue_id
)
except Exception as e:
logger.warning(
"Failed to update custom field metadata for %s: %s",
issue_id,
e
)
logger.info(
"Successfully synced issue %s to ClickUp task %s",
issue_id,
clickup_task_id,
)
# Sync dependencies if enabled
if sync_dependencies and self.config.get("sync", {}).get("sync_dependencies", True):
try:
from beads_clickup.dependency_sync import DependencySync
dep_sync = DependencySync(self)
dep_result = dep_sync.sync_issue_dependencies(issue_id)
if dep_result.synced > 0:
logger.info(
"Synced %d dependencies for issue %s",
dep_result.synced,
issue_id,
)
if dep_result.errors:
for error in dep_result.errors:
logger.warning("Dependency sync error: %s", error)
except Exception as e:
logger.warning("Failed to sync dependencies for %s: %s", issue_id, e)
# Sync comments if enabled
if self.config.get("advanced", {}).get("sync_comments", True):
try:
from beads_clickup.comment_sync import CommentSync
comment_sync = CommentSync(self.project_dir, self.sync_state)
count = comment_sync.sync_comments_to_clickup(
issue_id, clickup_task_id, self.mcp_client
)
if count > 0:
logger.info(
"Synced %d comments for issue %s",
count,
issue_id,
)
except Exception as e:
logger.warning("Failed to sync comments for %s: %s", issue_id, e)
return clickup_task_id
except Exception as e:
logger.error(
"Failed to sync issue %s to ClickUp: %s", issue_id, e, exc_info=True
)
return None
def sync_task_to_beads(
self,
task_id: str,
issue_id: Optional[str] = None,
) -> Optional[str]:
"""Sync a ClickUp task to beads (update existing issue).
Args:
task_id: ClickUp task ID
issue_id: Beads issue ID (if known, otherwise try to find it)
Returns:
Beads issue ID if successful, None otherwise
"""
try:
# Get task from ClickUp
task = self.mcp_client.get_task(task_id)
# Find corresponding beads issue if not provided
if not issue_id:
# Search sync state
for bid, state in self.sync_state.items():
# Skip non-issue entries (like 'last_sync', 'issues', 'tasks')
if not isinstance(state, dict):
continue
if state.get("clickup_task_id") == task_id:
issue_id = bid
break
# If still not found, try to extract from custom field
if not issue_id:
issue_update = self.field_mapper.clickup_to_beads_issue(task)
issue_id = issue_update.get("_beads_id")
if not issue_id:
logger.warning(
"Could not find beads issue for ClickUp task %s", task_id
)
return None
# Convert task to beads issue updates
issue_updates = self.field_mapper.clickup_to_beads_issue(task)
# Remove internal fields
issue_updates.pop("_beads_id", None)
# Extract ClickUp status for metadata storage
status_obj = task.get("status", {})
clickup_status = (
status_obj.get("status", "")
if isinstance(status_obj, dict)
else str(status_obj)
)
beads_status = self.field_mapper.clickup_to_beads_status(clickup_status)
# Convert ClickUp custom fields to beads format
clickup_custom_fields = task.get("custom_fields", [])
beads_custom_fields = self.custom_field_mapper.clickup_custom_fields_to_beads(
clickup_custom_fields
)
if not issue_updates:
logger.info("No fields to update for issue %s", issue_id)
# Still need to store custom fields and metadata even if no core updates
self.custom_field_store.set(
issue_id,
custom_fields=beads_custom_fields,
clickup_metadata={
"task_id": task_id,
"task_url": f"https://app.clickup.com/t/{task_id}",
"clickup_status": clickup_status,
"clickup_status_category": beads_status,
"last_synced_at": datetime.utcnow().isoformat(),
"last_synced_from": "clickup"
}
)
if beads_custom_fields:
logger.info("Stored %d custom fields from ClickUp for %s", len(beads_custom_fields), issue_id)
return issue_id
# Update beads issue
logger.info(
"Updating beads issue %s from ClickUp task %s", issue_id, task_id
)
success = self._update_beads_issue(issue_id, issue_updates)
if success:
# Store custom fields and metadata in CustomFieldStore (separate from issues.jsonl)
self.custom_field_store.set(
issue_id,
custom_fields=beads_custom_fields,
clickup_metadata={
"task_id": task_id,
"task_url": f"https://app.clickup.com/t/{task_id}",
"clickup_status": clickup_status,
"clickup_status_category": beads_status,
"last_synced_at": datetime.utcnow().isoformat(),
"last_synced_from": "clickup"
}
)
if beads_custom_fields:
logger.info("Stored %d custom fields from ClickUp for %s", len(beads_custom_fields), issue_id)
# Update sync state
if issue_id not in self.sync_state:
self.sync_state[issue_id] = {}
self.sync_state[issue_id].update(
{
"clickup_task_id": task_id,
"last_synced": datetime.utcnow().isoformat(),
"last_clickup_update": datetime.utcnow().isoformat(),
"sync_direction": "clickup-to-beads",
}
)
self._save_sync_state()
logger.info(
"Successfully synced ClickUp task %s to beads issue %s",
task_id,
issue_id,
)
return issue_id
return None
except Exception as e:
logger.error(
"Failed to sync task %s to beads: %s", task_id, e, exc_info=True
)
return None
def sync_dependencies_only(
self, issue_id: Optional[str] = None, direction: str = "bidirectional"
) -> Dict[str, Any]:
"""Sync only dependencies (no issue fields).
Args:
issue_id: Optional specific issue ID, or None to sync all
direction: "bidirectional", "to-clickup", or "from-clickup"
Returns:
Dictionary with sync statistics
"""
from beads_clickup.dependency_sync import DependencySync
dep_sync = DependencySync(self)
if issue_id:
result = dep_sync.sync_issue_dependencies(issue_id, direction=direction)
return {
"issue_id": issue_id,
"synced": result.synced,
"failed": result.failed,
"skipped": result.skipped,
"errors": result.errors,
}
else:
stats = dep_sync.sync_all_dependencies()
return {
"total_issues": stats.total_issues,
"synced": stats.synced,
"failed": stats.failed,
"skipped": stats.skipped,
"total_dependencies": stats.total_dependencies,
}
def sync_all(
self, full_sync: bool = False, force_import: bool = False
) -> Dict[str, Any]:
"""Perform bidirectional sync.
Args:
full_sync: If True, sync all issues. If False, only sync changed issues (faster).
            force_import: If True, import ALL ClickUp tasks, including those with the beads-automation tag
                (useful for importing tasks created by beads in other projects)
Returns:
Dictionary with sync statistics
"""
stats = {
"beads_to_clickup": {"created": 0, "updated": 0, "failed": 0},
"clickup_to_beads": {"updated": 0, "failed": 0},
"conflicts": 0,
"mode": "full" if full_sync else "incremental",
}
# Capture timestamp at START of sync (before beads→ClickUp updates tasks)
# This ensures ClickUp→beads doesn't re-fetch tasks we just updated
sync_start_timestamp = datetime.utcnow().isoformat()
direction = self.sync_config.get("direction", "bidirectional")
# Sync beads → ClickUp
if direction in ["beads-to-clickup", "bidirectional"]:
if self.sync_config.get("auto_create") or self.sync_config.get(
"auto_update"
):
mode_str = "full" if full_sync else "incremental"
logger.info("Syncing beads issues to ClickUp (%s)...", mode_str)
beads_stats = self._sync_beads_to_clickup(full_sync=full_sync)
stats["beads_to_clickup"] = beads_stats
# Sync ClickUp → beads
if direction in ["clickup-to-beads", "bidirectional"]:
# Default to True for bidirectional sync
auto_sync = self.sync_config.get(
"auto_sync_from_clickup", direction == "bidirectional"
)
if auto_sync:
logger.info("Syncing ClickUp tasks to beads...")
clickup_stats = self._sync_clickup_to_beads(
full_sync=full_sync, force_import=force_import
)
stats["clickup_to_beads"] = clickup_stats
# Update last sync timestamp at the END (using start time to avoid re-fetching tasks we just updated)
self.sync_state["_last_clickup_sync"] = sync_start_timestamp
self._save_sync_state()
# Sync dependencies after all issues are synced
if self.config.get("sync", {}).get("sync_dependencies", True):
logger.info("Syncing dependencies...")
try:
dep_stats = self.sync_dependencies_only()
stats["dependencies_synced"] = dep_stats.get("synced", 0)
stats["dependencies_failed"] = dep_stats.get("failed", 0)
stats["total_dependencies"] = dep_stats.get("total_dependencies", 0)
logger.info(
"Dependency sync complete: %d dependencies synced",
dep_stats.get("total_dependencies", 0),
)
except Exception as e:
logger.warning("Failed to sync dependencies: %s", e)
stats["dependencies_failed"] = 1
# Poll GitHub PRs for PENDING REVIEW → COMPLETE transitions
try:
pr_stats = self.poll_pending_review_prs()
if pr_stats.get("approved", 0) > 0:
logger.info(
"GitHub PR poll: %d approved, %d polled",
pr_stats["approved"],
pr_stats["polled"],
)
stats["prs_polled"] = pr_stats.get("polled", 0)
stats["prs_approved"] = pr_stats.get("approved", 0)
except Exception as e:
logger.warning("GitHub PR polling failed: %s", e)
return stats
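    # Illustrative return value of sync_all (counts hypothetical; the dependency and PR
    # keys are only present when the corresponding steps run):
    #   {
    #     "beads_to_clickup": {"created": 2, "updated": 5, "failed": 0, "skipped": 1, "unchanged": 10},
    #     "clickup_to_beads": {"created": 1, "updated": 3, "failed": 0, "skipped": 0, "closed": 0, "unchanged": 4},
    #     "conflicts": 0,
    #     "mode": "incremental",
    #     "dependencies_synced": 4, "dependencies_failed": 0, "total_dependencies": 4,
    #     "prs_polled": 2, "prs_approved": 1
    #   }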
def _sync_beads_to_clickup(
self, full_sync: bool = False, max_workers: int = 5
) -> Dict[str, int]:
"""Sync beads issues to ClickUp.
Args:
full_sync: If True, sync all issues. If False, only sync changed issues.
max_workers: Maximum number of concurrent API calls (default: 5)
Returns:
Statistics dictionary
"""
from concurrent.futures import ThreadPoolExecutor, as_completed
stats = {"created": 0, "updated": 0, "failed": 0, "skipped": 0, "unchanged": 0}
issues = self._get_beads_issues()
# Filter and prepare issues for sync
issues_to_sync = []
for issue in issues:
issue_id = issue.get("id", "")
if not issue_id:
continue
# Check filters
if not self.field_mapper.should_sync_issue(issue):
stats["skipped"] += 1
continue
# Check if already synced
state = self.sync_state.get(issue_id, {})
clickup_task_id = state.get("clickup_task_id")
# Incremental sync: skip unchanged issues
if not full_sync and clickup_task_id:
# Get issue's updated timestamp (bd uses updated_at)
issue_updated = (
issue.get("updated_at")
or issue.get("updated")
or issue.get("modified")
)
last_synced = state.get("last_synced")
if issue_updated and last_synced:
# Parse timestamps for comparison (both converted to UTC)
try:
issue_ts_utc = self._parse_timestamp_utc(issue_updated)
sync_ts_utc = self._parse_timestamp_utc(last_synced)
# Skip if issue hasn't changed since last sync
if issue_ts_utc <= sync_ts_utc:
stats["unchanged"] += 1
logger.debug("Skipping unchanged issue %s", issue_id)
continue
except (ValueError, TypeError) as e:
logger.debug(
"Could not compare timestamps for %s: %s", issue_id, e
)
# Continue with sync if timestamp comparison fails
# Add to sync list with metadata
issues_to_sync.append(
{
"issue_id": issue_id,
"is_update": bool(clickup_task_id),
}
)
if not issues_to_sync:
return stats
def sync_single_issue(item: Dict) -> Dict[str, Any]:
"""Sync a single issue and return result."""
issue_id = item["issue_id"]
is_update = item["is_update"]
try:
task_id = self.sync_issue_to_clickup(issue_id, force=True)
if task_id:
return {
"status": "updated" if is_update else "created",
"issue_id": issue_id,
}
else:
return {"status": "failed", "issue_id": issue_id}
except Exception as e:
logger.error("Error syncing issue %s: %s", issue_id, e)
return {"status": "failed", "issue_id": issue_id, "error": str(e)}
# Process issues concurrently
logger.info(
"Syncing %d issues to ClickUp (max %d concurrent)",
len(issues_to_sync),
max_workers,
)
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = {
executor.submit(sync_single_issue, item): item
for item in issues_to_sync
}
for future in as_completed(futures):
try:
result = future.result()
status = result.get("status")
if status == "created":
stats["created"] += 1
elif status == "updated":
stats["updated"] += 1
else:
stats["failed"] += 1
except Exception as e:
logger.error("Future failed: %s", e)
stats["failed"] += 1
return stats
def _create_beads_issue_from_task(self, task: Dict[str, Any]) -> Optional[str]:
"""Create a new beads issue from a ClickUp task.
Args:
task: ClickUp task dictionary
Returns:
New beads issue ID if successful, None otherwise
"""
try:
# Extract fields from ClickUp task
title = task.get("name", "Untitled")
description = task.get("description", "")
# Remove any "Synced from beads" footer if present
if "---" in description and "Synced from beads issue:" in description:
description = description.split("---")[0].strip()
# Enforce template structure from .beads/templates.yaml
description = enforce_template_structure(
description,
project_dir=self.project_dir,
title=title,
)
# Map status
status_obj = task.get("status", {})
clickup_status = (
status_obj.get("status", "")
if isinstance(status_obj, dict)
else str(status_obj)
)
beads_status = self.field_mapper.clickup_to_beads_status(clickup_status)
# Map priority
priority_obj = task.get("priority")
clickup_priority = None
if priority_obj:
if isinstance(priority_obj, dict):
clickup_priority = (
int(priority_obj.get("id", 3))
if priority_obj.get("id")
else None
)
else:
clickup_priority = priority_obj
beads_priority = self.field_mapper.clickup_to_beads_priority(
clickup_priority
)
# Build bd create command (description always enforced to template structure)
cmd = ["bd", "create", "--title", title, "--description", description]
if beads_priority:
cmd.extend(["--priority", f"p{beads_priority}"])
# Run bd create
result = subprocess.run(
cmd,
capture_output=True,
text=True,
check=True,
cwd=str(self.project_dir),
)
# Extract issue ID from output
import re
match = re.search(r"Created issue: ([\w-]+)", result.stdout)
if match:
issue_id = match.group(1)
task_id = task.get("id", "")
# Update status if not open
if beads_status and beads_status != "open":
if beads_status == "closed":
# bd update --status closed fails the DB CHECK constraint
# (closed_at must be non-NULL when status='closed').
# Use bd close instead, which sets closed_at correctly.
subprocess.run(
[
"bd",
"close",
issue_id,
"--reason",
"Imported from ClickUp (closed)",
],
capture_output=True,
check=False,
cwd=str(self.project_dir),
)
else:
subprocess.run(
["bd", "update", issue_id, "--status", beads_status],
capture_output=True,
check=False,
cwd=str(self.project_dir),
)
# Store external-ref
if task_id and self.field_mapper.use_external_ref:
subprocess.run(
[
"bd",
"update",
issue_id,
"--external-ref",
f"clickup-{task_id}",
],
capture_output=True,
check=False,
cwd=str(self.project_dir),
)
# Convert and store custom fields from ClickUp
clickup_custom_fields = task.get("custom_fields", [])
beads_custom_fields = self.custom_field_mapper.clickup_custom_fields_to_beads(
clickup_custom_fields
)
# Always store metadata (even if no custom fields) to preserve status
self.custom_field_store.set(
issue_id,
custom_fields=beads_custom_fields,
clickup_metadata={
"task_id": task_id,
"task_url": f"https://app.clickup.com/t/{task_id}",
"clickup_status": clickup_status,
"clickup_status_category": beads_status,
"last_synced_at": datetime.utcnow().isoformat(),
"last_synced_from": "clickup"
}
)
if beads_custom_fields:
logger.info("Stored %d custom fields from ClickUp for new issue %s", len(beads_custom_fields), issue_id)
# Update sync state
self.sync_state[issue_id] = {
"clickup_task_id": task_id,
"last_synced": datetime.utcnow().isoformat(),
"last_clickup_update": datetime.utcnow().isoformat(),
"sync_direction": "clickup-to-beads",
}
self._save_sync_state()
logger.info(
"Created beads issue %s from ClickUp task %s", issue_id, task_id
)
return issue_id
logger.error("Could not parse issue ID from bd create output")
return None
except subprocess.CalledProcessError as e:
logger.error("Failed to create beads issue: %s", e.stderr)
return None
except Exception as e:
logger.error("Failed to create beads issue from task: %s", e)
return None
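    # Illustrative bd invocations issued above (titles and IDs hypothetical):
    #   bd create --title "Fix login" --description "..." --priority p2
    #   bd close bd-42 --reason "Imported from ClickUp (closed)"   # for tasks already closed in ClickUp
    #   bd update bd-42 --external-ref clickup-86abc123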
def _handle_deleted_clickup_tasks(self, clickup_task_ids: set) -> Dict[str, int]:
"""Handle ClickUp tasks that have been deleted.
Closes corresponding beads issues when their linked ClickUp task is deleted.
Args:
clickup_task_ids: Set of current ClickUp task IDs in the list
Returns:
Statistics dictionary
"""
stats = {"closed": 0, "failed": 0}
# Find synced issues whose ClickUp task no longer exists
for issue_id, state in list(self.sync_state.items()):
if not isinstance(state, dict):
continue
task_id = state.get("clickup_task_id")
if not task_id:
continue
# Check if task still exists in ClickUp
if task_id not in clickup_task_ids:
# Task was deleted in ClickUp - close the beads issue
logger.info(
"ClickUp task %s was deleted, closing beads issue %s",
task_id,
issue_id,
)
try:
# Check if issue is already closed
issue = self._get_beads_issue(issue_id)
if issue and issue.get("status") not in [
"closed",
"completed",
"cancelled",
]:
subprocess.run(
[
"bd",
"close",
issue_id,
"--reason",
f"ClickUp task {task_id} was deleted",
],
capture_output=True,
check=True,
cwd=str(self.project_dir),
)
stats["closed"] += 1
logger.info(
"Closed beads issue %s (ClickUp task deleted)", issue_id
)
except subprocess.CalledProcessError as e:
logger.error("Failed to close issue %s: %s", issue_id, e.stderr)
stats["failed"] += 1
except Exception as e:
logger.error("Failed to close issue %s: %s", issue_id, e)
stats["failed"] += 1
return stats
def _get_last_clickup_sync_timestamp(self) -> Optional[int]:
"""Get the timestamp of the last ClickUp sync in milliseconds.
Returns:
Unix timestamp in milliseconds, or None if no previous sync
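Example: a stored "_last_clickup_sync" of "1970-01-01T00:00:01" yields 1000.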
"""
from datetime import timezone
last_sync = self.sync_state.get("_last_clickup_sync")
if last_sync:
try:
# Parse ISO format (result is naive UTC)
ts = self._parse_timestamp_utc(last_sync)
# Add UTC timezone so .timestamp() works correctly
ts_utc = ts.replace(tzinfo=timezone.utc)
return int(ts_utc.timestamp() * 1000)
except (ValueError, TypeError):
pass
return None
def _update_last_clickup_sync_timestamp(self):
"""Update the last ClickUp sync timestamp to now."""
self.sync_state["_last_clickup_sync"] = datetime.utcnow().isoformat()
self._save_sync_state()
def _sync_clickup_to_beads(
self, full_sync: bool = False, force_import: bool = False
) -> Dict[str, int]:
"""Sync ClickUp tasks to beads.
This performs the ClickUp-to-beads direction of the bidirectional sync:
- Creates new beads issues for ClickUp tasks that don't exist locally
- Updates existing beads issues with ClickUp changes (if changed)
- Closes beads issues when their ClickUp task is deleted (only on full sync)
Args:
full_sync: If True, sync all tasks. If False, only sync tasks updated since last sync.
force_import: If True, import ALL tasks including those with beads-automation tag
(useful for importing tasks created by beads in other projects)
Returns:
Statistics dictionary with created/updated/failed/skipped/closed/unchanged counts
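Example (illustrative sketch; ``engine`` is a configured SyncEngine, and this
method is normally driven by sync_all()):
    stats = engine._sync_clickup_to_beads(full_sync=True)
    print(stats["created"], stats["updated"], stats["unchanged"])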
"""
stats = {
"created": 0,
"updated": 0,
"failed": 0,
"skipped": 0,
"closed": 0,
"unchanged": 0,
}
list_id = self.clickup_config.get("default_list_id")
if not list_id:
logger.error("default_list_id not configured")
return stats
try:
# For incremental sync, only fetch tasks updated since last sync
date_updated_gt = None
if not full_sync:
date_updated_gt = self._get_last_clickup_sync_timestamp()
if date_updated_gt:
logger.info(
"Fetching ClickUp tasks updated since %s",
datetime.utcfromtimestamp(date_updated_gt / 1000).isoformat(),
)
# Get tasks from ClickUp
# Always include closed tasks so status changes to COMPLETE are detected.
# For full sync: no date filter (fetch everything for deletion detection)
# For incremental: filter by date_updated_gt to limit API calls
if full_sync:
tasks = self.mcp_client.list_tasks(list_id, include_closed=True)
else:
tasks = self.mcp_client.list_tasks(
list_id, include_closed=True, date_updated_gt=date_updated_gt
)
logger.info("Fetched %d tasks from ClickUp", len(tasks))
clickup_task_ids = set()
from concurrent.futures import ThreadPoolExecutor, as_completed
# First pass: collect work items and task IDs
tasks_to_create = [] # Tasks that need new beads issues
tasks_to_update = [] # Tasks that need to sync to existing issues
for task in tasks:
task_id = task.get("id", "")
if not task_id:
continue
clickup_task_ids.add(task_id)
# Collect tag names; tasks created by beads (beads-automation tag) may be skipped below to avoid duplicates
tags = task.get("tags", [])
tag_names = [
t.get("name", "") if isinstance(t, dict) else str(t) for t in tags
]
# Find corresponding beads issue
issue_id = None
for bid, state in self.sync_state.items():
if not isinstance(state, dict):
continue
if state.get("clickup_task_id") == task_id:
issue_id = bid
break
if not issue_id:
# Try to extract from task custom fields
issue_update = self.field_mapper.clickup_to_beads_issue(task)
issue_id = issue_update.get("_beads_id")
if not issue_id:
# No beads issue exists - check if we should create one.
# full_sync imports everything (including beads-automation tasks)
# so that a fresh project gets a complete picture of the ClickUp board.
# Incremental sync skips beads-automation tasks to avoid duplicates
# from other projects unless force_import is explicitly set.
if full_sync or force_import or "beads-automation" not in tag_names:
tasks_to_create.append(task)
else:
logger.debug(
"Skipping task %s (has beads-automation tag, use full or force_import to override)",
task_id,
)
stats["skipped"] += 1
continue
# Skip if task hasn't changed since last sync (incremental only)
task_date_updated = task.get("date_updated")
if not full_sync:
stored_date_updated = self.sync_state.get(issue_id, {}).get(
"clickup_date_updated"
)
if task_date_updated and task_date_updated == stored_date_updated:
stats["unchanged"] += 1
continue
# Queue for update, carrying date_updated for post-sync storage
tasks_to_update.append(
{
"task_id": task_id,
"issue_id": issue_id,
"date_updated": task_date_updated,
}
)
# Process creates and updates concurrently
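# Modest fixed cap: each worker either creates a beads issue via the bd CLI
# or runs sync_task_to_beads for an existing issue.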
max_workers = 5
def create_issue_from_task(task: Dict) -> Dict[str, Any]:
"""Create a beads issue from ClickUp task."""
task_id = task.get("id", "")
try:
new_issue_id = self._create_beads_issue_from_task(task)
if new_issue_id:
return {"status": "created", "task_id": task_id}
return {"status": "failed", "task_id": task_id}
except Exception as e:
logger.error("Error creating issue from task %s: %s", task_id, e)
return {"status": "failed", "task_id": task_id}
def update_issue_from_task(item: Dict) -> Dict[str, Any]:
"""Update a beads issue from ClickUp task."""
task_id = item["task_id"]
issue_id = item["issue_id"]
date_updated = item.get("date_updated")
try:
result = self.sync_task_to_beads(task_id, issue_id)
if result:
return {
"status": "updated",
"task_id": task_id,
"issue_id": issue_id,
"date_updated": date_updated,
}
return {"status": "failed", "task_id": task_id}
except Exception as e:
logger.error("Error syncing task %s: %s", task_id, e)
return {"status": "failed", "task_id": task_id}
# Process all work concurrently
all_work = []
all_work.extend([("create", t) for t in tasks_to_create])
all_work.extend([("update", t) for t in tasks_to_update])
if all_work:
logger.info(
"Processing %d tasks from ClickUp (max %d concurrent)",
len(all_work),
max_workers,
)
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = {}
for work_type, item in all_work:
if work_type == "create":
futures[executor.submit(create_issue_from_task, item)] = (
work_type
)
else:
futures[executor.submit(update_issue_from_task, item)] = (
work_type
)
for future in as_completed(futures):
try:
result = future.result()
status = result.get("status")
if status == "created":
stats["created"] += 1
elif status == "updated":
stats["updated"] += 1
# Store date_updated so next sync can skip this task
res_issue_id = result.get("issue_id")
res_date_updated = result.get("date_updated")
if res_issue_id and res_date_updated:
self.sync_state.setdefault(res_issue_id, {})[
"clickup_date_updated"
] = res_date_updated
else:
stats["failed"] += 1
except Exception as e:
logger.error("Future failed: %s", e)
stats["failed"] += 1
# Persist clickup_date_updated values written during concurrent sync
self._save_sync_state()
# Handle deleted tasks (only on full sync since incremental doesn't fetch all tasks)
if full_sync:
deleted_stats = self._handle_deleted_clickup_tasks(clickup_task_ids)
stats["closed"] = deleted_stats["closed"]
stats["failed"] += deleted_stats["failed"]
# Note: last sync timestamp is updated in sync_all() using start time
except Exception as e:
logger.error("Failed to sync ClickUp tasks: %s", e, exc_info=True)
return stats
def link_issue_and_task(
self,
issue_id: str,
task_id: str,
) -> bool:
"""Manually link a beads issue and ClickUp task.
Args:
issue_id: Beads issue ID
task_id: ClickUp task ID
Returns:
True if successful
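Example (illustrative sketch; both IDs below are placeholders):
    engine = SyncEngine()
    if engine.link_issue_and_task("bd-123", "86abc123"):
        print("linked")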
"""
try:
# Verify both exist
issue = self._get_beads_issue(issue_id)
if not issue:
logger.error("Issue %s not found", issue_id)
return False
task = self.mcp_client.get_task(task_id)
if not task:
logger.error("Task %s not found", task_id)
return False
# Update external-ref in beads
if self.field_mapper.use_external_ref:
subprocess.run(
["bd", "update", issue_id, "--external-ref", f"clickup-{task_id}"],
capture_output=True,
check=True,
cwd=str(self.project_dir),
)
# Update sync state
self.sync_state[issue_id] = {
"clickup_task_id": task_id,
"last_synced": datetime.utcnow().isoformat(),
"sync_direction": "manual",
}
self._save_sync_state()
logger.info("Linked issue %s to task %s", issue_id, task_id)
return True
except Exception as e:
logger.error(
"Failed to link issue %s and task %s: %s", issue_id, task_id, e
)
return False
def fetch_webhook_events(self, webhook_url: Optional[str] = None) -> Dict[str, int]:
"""Fetch and process pending events from webhook server.
This is used when the webhook runs remotely (e.g., Cloud Run) and
events need to be pulled to the local machine for processing.
Args:
webhook_url: Base URL of webhook server (e.g., https://your-webhook.run.app)
If not provided, uses config or env var.
Returns:
Statistics dictionary with processed/failed/skipped counts
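Example (illustrative sketch; the URL is the placeholder from above):
    stats = engine.fetch_webhook_events("https://your-webhook.run.app")
    print(stats)  # {"processed": ..., "failed": ..., "skipped": ...}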
"""
import requests
stats = {"processed": 0, "failed": 0, "skipped": 0}
# Get webhook URL from config or environment
if not webhook_url:
webhook_url = self.config.get("webhook", {}).get("url") or os.environ.get(
"BEADS_WEBHOOK_URL"
)
if not webhook_url:
logger.warning("No webhook URL configured, skipping event fetch")
return stats
# Remove trailing slash
webhook_url = webhook_url.rstrip("/")
try:
# Fetch pending events
response = requests.get(f"{webhook_url}/events", timeout=30)
response.raise_for_status()
data = response.json()
events = data.get("events", [])
if not events:
logger.debug("No pending webhook events")
return stats
logger.info("Fetched %d pending webhook events", len(events))
processed_ids = []
for event_entry in events:
event_id = event_entry.get("id")
event_data = event_entry.get("data", {})
event_type = event_data.get("event_type")
task_id = event_data.get("task_id")
if not task_id:
logger.warning("Event %s missing task_id, skipping", event_id)
stats["skipped"] += 1
processed_ids.append(event_id)
continue
try:
logger.info(
"Processing webhook event: %s for task %s", event_type, task_id
)
if event_type in (
"taskCreated",
"taskUpdated",
"taskStatusUpdated",
):
result = self.sync_task_to_beads(task_id)
if result:
stats["processed"] += 1
else:
stats["failed"] += 1
elif event_type == "taskDeleted":
# Find and close corresponding beads issue
for bid, state in self.sync_state.items():
if (
isinstance(state, dict)
and state.get("clickup_task_id") == task_id
):
self._update_beads_issue(bid, {"status": "closed"})
logger.info("Closed beads issue %s (task deleted)", bid)
stats["processed"] += 1
break
else:
logger.debug(
"No beads issue found for deleted task %s", task_id
)
stats["skipped"] += 1
else:
# For other events, just sync the task
result = self.sync_task_to_beads(task_id)
if result:
stats["processed"] += 1
else:
stats["failed"] += 1
processed_ids.append(event_id)
except Exception as e:
logger.error("Failed to process event %s: %s", event_id, e)
stats["failed"] += 1
processed_ids.append(
event_id
) # Still mark as processed to avoid retry loop
# Acknowledge processed events
if processed_ids:
try:
ack_response = requests.post(
f"{webhook_url}/events/ack",
json={"event_ids": processed_ids},
timeout=30,
)
ack_response.raise_for_status()
logger.info("Acknowledged %d events", len(processed_ids))
except Exception as e:
logger.warning("Failed to acknowledge events: %s", e)
except requests.RequestException as e:
logger.error("Failed to fetch webhook events: %s", e)
except Exception as e:
logger.error("Error processing webhook events: %s", e, exc_info=True)
return stats
def poll_pending_review_prs(self) -> Dict[str, int]:
"""Poll GitHub for PR approvals on issues in PENDING REVIEW state.
For each issue whose stored ClickUp status is PENDING REVIEW, checks the
linked GitHub PR for an APPROVED review. On approval:
1. Fills the 09_internal_review_results custom field
2. Moves the ClickUp task to COMPLETE
3. Marks the beads issue as completed
Note: "pending_review" is never written to beads. Beads status goes
directly from in_progress to completed.
Returns:
Statistics dict: polled, approved, failed, skipped
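Example (illustrative sketch; ``engine`` is a configured SyncEngine):
    stats = engine.poll_pending_review_prs()
    print(stats["polled"], stats["approved"])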
"""
stats: Dict[str, int] = {"polled": 0, "approved": 0, "failed": 0, "skipped": 0}
github_config = self.config.get("github", {})
if not github_config.get("poll_prs", True):
logger.debug("GitHub PR polling disabled (github.poll_prs=false)")
return stats
github_token = github_config.get("api_token") or os.environ.get("GITHUB_TOKEN")
pending_review_status = self.field_mapper.beads_to_clickup_status("pending_review")
if not pending_review_status:
logger.debug("No pending_review status configured, skipping GitHub PR poll")
return stats
complete_status = (
self.field_mapper.beads_to_clickup_status("completed") or "COMPLETE"
)
# Find the field ID for 09_internal_review_results
review_field_id: Optional[str] = None
for fname, fconf in self.custom_field_mapper.custom_fields.items():
# Field names may have numeric prefix like "09_internal_review_results"
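# e.g. "09_internal_review_results" -> "internal_review_results"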
normalized = fname.lstrip("0123456789").lstrip("_")
if normalized in ("internal_review_results",):
review_field_id = fconf.get("clickup_field_id")
break
for issue_id, state in list(self.sync_state.items()):
if issue_id.startswith("_"):
continue
if not isinstance(state, dict):
continue
clickup_task_id = state.get("clickup_task_id")
if not clickup_task_id:
continue
# Check stored ClickUp status — only process PENDING REVIEW issues
cf_data = self.custom_field_store.get(issue_id)
if not cf_data:
continue
clickup_metadata = cf_data.get("clickup_metadata", {})
stored_status = clickup_metadata.get("clickup_status", "")
if stored_status.upper() != pending_review_status.upper():
continue
# Get pr_link
pr_link = cf_data.get("custom_fields", {}).get("pr_link")
if not pr_link:
stats["skipped"] += 1
continue
# Parse GitHub URL
parsed = self._parse_github_pr_url(pr_link)
if not parsed:
logger.warning(
"Cannot parse GitHub PR URL for %s: %s", issue_id, pr_link
)
stats["skipped"] += 1
continue
owner, repo, pr_number = parsed
stats["polled"] += 1
try:
approved, reviewer, approved_at = self._check_github_pr_approved(
owner, repo, pr_number, github_token
)
except Exception as e:
logger.warning(
"Failed to check GitHub PR for %s (%s): %s", issue_id, pr_link, e
)
stats["failed"] += 1
continue
if not approved:
logger.debug("PR not yet merged for issue %s", issue_id)
continue
review_result = f"Approved by {reviewer} on {approved_at}"
logger.info(
"PR approved for %s (%s) — completing issue", issue_id, review_result
)
try:
# 1. Fill 09_internal_review_results field
if review_field_id:
self.mcp_client.set_custom_field(
clickup_task_id, review_field_id, review_result
)
else:
logger.warning(
"09_internal_review_results field ID not found in config; "
"skipping field update for %s",
issue_id,
)
# 2. Move ClickUp task to COMPLETE
self.mcp_client.update_task(
task_id=clickup_task_id, status=complete_status
)
# 3. Close beads issue via bd CLI so hooks/wrapper/MCP all fire
subprocess.run(
["bd", "close", issue_id, "--reason", review_result],
capture_output=True,
text=True,
check=False,
cwd=str(self.project_dir),
)
# 4. Update stored metadata
existing_cf = cf_data.get("custom_fields", {})
existing_cf["internal_review_results"] = review_result
self.custom_field_store.set(
issue_id,
custom_fields=existing_cf,
clickup_metadata={
**clickup_metadata,
"clickup_status": complete_status,
"clickup_status_category": "completed",
"last_synced_at": datetime.utcnow().isoformat(),
},
)
logger.info(
"Completed issue %s after PR approval by %s", issue_id, reviewer
)
stats["approved"] += 1
except Exception as e:
logger.error(
"Failed to complete issue %s after PR approval: %s",
issue_id,
e,
exc_info=True,
)
stats["failed"] += 1
return stats
def _load_issue(self, issue_id: str) -> Optional[Dict[str, Any]]:
"""Load a single issue from issues.jsonl by ID. Returns None if not found.
Reads the JSONL file directly rather than via ``bd show --json`` subprocess,
so it is safe to call from within the sync engine without spawning a child process.
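Example (illustrative sketch; the issue ID is a placeholder):
    issue = self._load_issue("bd-123")
    status = issue.get("status") if issue else None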
"""
issues_file = self.project_dir / ".beads" / "issues.jsonl"
if not issues_file.exists():
return None
with open(issues_file) as f:
for line in f:
line = line.strip()
if not line:
continue
try:
issue = json.loads(line)
if issue.get("id") == issue_id:
return issue
except json.JSONDecodeError:
continue
return None
def check_close_gate(self, issue_id: str) -> tuple[bool, str]:
"""Gate check before closing an issue via MCP.
Returns (allowed, reason). Gate is skipped when require_pr_approval
is not enabled in config. Issues with label 'no-pr' bypass the gate.
Decision order:
1. Gate disabled -> allowed
2. 'no-pr' label -> allowed (bypass)
3. No pr_link -> blocked
4. Bad PR URL -> blocked
5. PR not approved -> blocked
6. PR approved -> allowed
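Example (illustrative sketch; the issue ID is a placeholder):
    allowed, reason = engine.check_close_gate("bd-123")
    if not allowed:
        print(f"close blocked: {reason}")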
"""
if not self.config.get("sync", {}).get("require_pr_approval", False):
return True, "gate disabled"
# Load issue to check labels
issue = self._load_issue(issue_id)
labels = (issue.get("labels") or []) if issue else []
if "no-pr" in labels:
return True, "no-pr label: bypassing gate"
# Check pr_link (the helper returns the pr_link URL, or a falsy value if unset)
pr_link = self._issue_has_pr_link(issue or {}, issue_id)
if not pr_link:
return (
False,
f"No pr_link set on {issue_id}. "
"Set pr_link first: bd_update(issue_id=..., pr_link='https://github.com/...')",
)
# Parse PR URL
parsed = self._parse_github_pr_url(pr_link)
if not parsed:
return False, f"Cannot parse PR URL: {pr_link}"
owner, repo, pr_number = parsed
github_token = (
self.config.get("github", {}).get("api_token")
or os.environ.get("GITHUB_TOKEN")
)
try:
approved, reviewer, approved_at = self._check_github_pr_approved(
owner, repo, pr_number, github_token
)
except Exception as exc:
return False, f"GitHub API error checking PR #{pr_number}: {exc}"
if not approved:
return False, f"PR #{pr_number} not yet approved or merged."
return True, f"PR #{pr_number} approved by {reviewer} on {approved_at}"
def get_sync_status(self) -> Dict[str, Any]:
"""Get synchronization status and statistics.
Returns:
Status dictionary with sync direction, per-direction counts, last sync time, and file paths
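Example (illustrative sketch; ``engine`` is a configured SyncEngine):
    status = engine.get_sync_status()
    print(status["synced_issues"], status["last_sync"])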
"""
# Count only real issue entries; sync_state also holds meta keys such as
# "_last_clickup_sync" whose values are plain strings, not dicts.
synced_count = sum(
1
for key, state in self.sync_state.items()
if not key.startswith("_") and isinstance(state, dict)
)
# Count by direction
beads_to_cu = sum(
1
for state in self.sync_state.values()
if isinstance(state, dict)
and state.get("sync_direction") == "beads-to-clickup"
)
cu_to_beads = sum(
1
for state in self.sync_state.values()
if isinstance(state, dict)
and state.get("sync_direction") == "clickup-to-beads"
)
# Check for recent syncs
recent_syncs = []
for issue_id, state in self.sync_state.items():
if not isinstance(state, dict):
continue
last_synced = state.get("last_synced", "")
if last_synced:
recent_syncs.append((issue_id, last_synced))
recent_syncs.sort(key=lambda x: x[1], reverse=True)
last_sync = recent_syncs[0][1] if recent_syncs else None
return {
"enabled": self.sync_config.get("enabled", True),
"direction": self.sync_config.get("direction", "bidirectional"),
"synced_issues": synced_count,
"beads_to_clickup": beads_to_cu,
"clickup_to_beads": cu_to_beads,
"last_sync": last_sync,
"config_path": str(self.config_path),
"state_file": str(self.state_file),
}