railiance-platform/scripts/credential-change.py

2190 lines
82 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
from __future__ import annotations
import argparse
import difflib
import json
import os
import re
2026-06-28 00:21:02 +02:00
import shlex
import subprocess
import sys
import tempfile
import urllib.error
import urllib.request
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
import yaml
# Repository root: one directory above the directory that contains this script.
REPO_DIR = Path(__file__).resolve().parents[1]
# Default CCR directory; ccr_dir() lets the CCR_DIR environment variable override it.
DEFAULT_CCR_DIR = REPO_DIR / "credential-change-requests"
# Every value accepted for a CCR's top-level ``status`` field.
ALLOWED_STATUSES = {
    "draft",
    "proposed",
    "needs_changes",
    "approved",
    "denied",
    "apply_pending",
    "applied",
    "verified",
    "active",
    "deactivated",
    "rotated",
    "compromised",
    "superseded",
    "cancelled",
}
# Building blocks for the applier dry-run and runbook status gates below.
APPLY_ALLOWED_STATUSES = {"approved"}
POST_APPLY_STATUSES = {"applied", "verified", "active"}
# Substrings whose presence makes scan_for_secrets()/reject_secret_text() reject
# a text. "sk-" is matched with a stricter regex in contains_secret_marker().
SECRET_MARKERS = [
    "AGE-SECRET-KEY-1",
    "-----BEGIN PRIVATE KEY-----",
    "-----BEGIN OPENSSH PRIVATE KEY-----",
    "OPENBAO_ROOT_TOKEN=",
    "VAULT_TOKEN=",
    "BAO_TOKEN=",
    "hvb.",
    "hvc.",
    "hvs.",
    "npm_",
    "ghp_",
    "sk-",
]
# Names that are rejected as policy names, auth policies and auth roles.
DISALLOWED_POLICY_NAMES = {"root", "platform-admin"}
# Values accepted for access_frontdoor.readiness.
FRONTDOOR_READINESS = {
    "template",
    "pending-review",
    "approved-pending-apply",
    "applied-pending-verify",
    "ready",
    "disabled",
    "compromised",
}
# CCR id: uppercase letters/digits first, then also '_', '.', '-'.
SAFE_ID_RE = re.compile(r"^[A-Z0-9][A-Z0-9_.-]*$")
# TTL: positive integer followed by one of s, m, h, d.
TTL_RE = re.compile(r"^[1-9][0-9]*[smhd]$")
# Lowercase identifier used for policy and role names in the applier checks.
LOWER_SAFE_ID_RE = re.compile(r"^[a-z0-9][a-z0-9-]*$")
# Secret field name: uppercase letter first, then uppercase letters/digits/'_'.
FIELD_NAME_RE = re.compile(r"^[A-Z][A-Z0-9_]*$")
# Statuses in which applier_readiness_blockers() reports no status blocker.
APPLIER_DRY_RUN_ALLOWED_STATUSES = APPLY_ALLOWED_STATUSES | POST_APPLY_STATUSES
# target.environment values inside the delegated applier's scope.
APPLIER_ALLOWED_ENVIRONMENTS = {
    "build",
    "development",
    "test",
    "staging",
    "production",
}
# Naming conventions enforced by applier_policy_violations().
WORKLOAD_KV_POLICY_PREFIX = "workload-kv-read-"
OIDC_WORKLOAD_ROLE_SUFFIX = "-workload-kv-read"
KUBERNETES_ROLE_SUFFIXES = ("-workload-kv-read", "-secrets-read")
KUBERNETES_ROLE_PREFIXES = ("external-secrets-",)
# Evidence kind/result identifier: lowercase letters, digits, '_' and '-'.
EVIDENCE_ID_RE = re.compile(r"^[a-z0-9][a-z0-9_-]*$")
# Statuses in which runbook_readiness_blockers() reports no status blocker.
RUNBOOK_ALLOWED_STATUSES = APPLY_ALLOWED_STATUSES | POST_APPLY_STATUSES
# Per-action target values (status, front-door readiness/resolvable, kind).
# NOTE(review): not referenced in the visible part of this file; presumably
# consumed by lifecycle commands defined further down - confirm.
LIFECYCLE_ACTIONS = {
    "deactivate": {
        "status": "deactivated",
        "readiness": "disabled",
        "resolvable": False,
        "kind": "deactivation",
    },
    "rotate": {
        "status": "rotated",
        "readiness": "applied-pending-verify",
        "resolvable": False,
        "kind": "rotation",
    },
    "compromise": {
        "status": "compromised",
        "readiness": "compromised",
        "resolvable": False,
        "kind": "compromise",
    },
}
def fail(message: str) -> None:
    """Abort by raising ``SystemExit`` with an ``ERROR:``-prefixed message.

    This function never returns normally.
    """
    text = f"ERROR: {message}"
    raise SystemExit(text)
def utc_now() -> str:
    """Return the current UTC time as an ISO-8601 string without microseconds."""
    moment = datetime.now(timezone.utc)
    whole_seconds = moment.replace(microsecond=0)
    return whole_seconds.isoformat()
def resolve_repo_path(path: str | Path) -> Path:
    """Turn ``path`` into an absolute path, anchoring relative paths at REPO_DIR.

    ``~`` is expanded first.  Absolute paths are returned as they are (not
    resolved); relative paths are joined onto ``REPO_DIR`` and resolved.
    """
    expanded = Path(path).expanduser()
    if not expanded.is_absolute():
        return (REPO_DIR / expanded).resolve()
    return expanded
def load_yaml(path: Path) -> dict[str, Any]:
    """Parse the UTF-8 YAML file at ``path`` and return its mapping root.

    Calls ``fail`` when the document root is not a mapping.
    """
    raw_text = path.read_text(encoding="utf-8")
    document = yaml.safe_load(raw_text)
    if not isinstance(document, dict):
        fail(f"YAML root must be an object: {path}")
    return document
def dump_yaml(path: Path, data: dict[str, Any]) -> None:
    """Serialise ``data`` as YAML and write it to ``path`` as UTF-8.

    Key order is preserved and non-ASCII characters are escaped.
    """
    rendered = yaml.safe_dump(data, sort_keys=False, allow_unicode=False)
    path.write_text(rendered, encoding="utf-8")
def ccr_dir() -> Path:
    """Return the CCR directory: ``$CCR_DIR`` when set, else DEFAULT_CCR_DIR."""
    configured = os.environ.get("CCR_DIR", str(DEFAULT_CCR_DIR))
    return resolve_repo_path(configured)
def resolve_ccr(ref: str) -> Path:
    """Resolve a CCR reference to a file path.

    ``ref`` is first tried as a path (relative paths are anchored at the
    repository root).  If nothing exists there, it is treated as a file-name
    prefix inside ``ccr_dir()`` and must match exactly one ``*.y*ml`` file.

    Calls ``fail`` when the prefix is ambiguous or nothing matches.
    """
    candidate = resolve_repo_path(ref)
    if candidate.exists():
        return candidate
    try:
        matches = sorted(ccr_dir().glob(f"{ref}*.y*ml"))
    except NotImplementedError:
        # Path.glob() refuses absolute patterns.  A mistyped absolute path
        # is simply "not found", not a crash with a traceback.
        matches = []
    if len(matches) == 1:
        return matches[0]
    if len(matches) > 1:
        fail(f"CCR reference is ambiguous: {ref} -> {[m.name for m in matches]}")
    fail(f"CCR not found by path or id prefix: {ref}")
def require_object(value: Any, field: str, errors: list[str]) -> dict[str, Any]:
    """Return ``value`` if it is a dict; otherwise record an error and return ``{}``."""
    if isinstance(value, dict):
        return value
    errors.append(f"{field} must be an object")
    return {}
def require_list(value: Any, field: str, errors: list[str]) -> list[Any]:
    """Return ``value`` if it is a list; otherwise record an error and return ``[]``."""
    if isinstance(value, list):
        return value
    errors.append(f"{field} must be a list")
    return []
def require_string(value: Any, field: str, errors: list[str]) -> str:
    """Return ``value`` stripped if it is a non-blank string.

    Otherwise an error is appended to ``errors`` and ``""`` is returned.
    """
    if isinstance(value, str):
        trimmed = value.strip()
        if trimmed:
            return trimmed
    errors.append(f"{field} must be a non-empty string")
    return ""
def contains_secret_marker(text: str, marker: str) -> bool:
    """Report whether ``text`` contains the secret ``marker``.

    Every marker is a plain substring test except ``"sk-"``, which only
    counts when it is not preceded by an ASCII letter and is followed by an
    ASCII letter or digit (so words such as "risk-free" do not match).
    """
    if marker != "sk-":
        return marker in text
    found = re.search(r"(?<![A-Za-z])sk-[A-Za-z0-9]", text)
    return found is not None
def scan_for_secrets(path: Path, errors: list[str]) -> None:
    """Append one error to ``errors`` per secret marker found in the file at ``path``."""
    content = path.read_text(encoding="utf-8")
    errors.extend(
        f"{path.name} contains rejected secret marker {marker!r}"
        for marker in SECRET_MARKERS
        if contains_secret_marker(content, marker)
    )
def reject_secret_text(text: str, field: str) -> None:
    """Call ``fail`` for the first secret marker that ``text`` contains.

    ``field`` names the offending input in the error message.
    """
    hits = (
        marker for marker in SECRET_MARKERS if contains_secret_marker(text, marker)
    )
    for marker in hits:
        fail(f"{field} contains rejected secret marker {marker!r}")
def validate_workload_kv_read(ccr: dict[str, Any], errors: list[str], warnings: list[str]) -> None:
    """Validate the sections of a CCR that are specific to workload-kv-read.

    Checks ``target``, ``openbao`` (including ``openbao.auth``),
    ``access_frontdoor``, ``risk``, ``verification`` and ``lifecycle``.
    Problems are appended to ``errors``; an unconfirmed auth binding is
    appended to ``warnings``.  Nothing is returned.
    """
    # --- target: all five descriptors must be non-empty strings.
    target = require_object(ccr.get("target"), "target", errors)
    for field in ("domain", "tenant", "workload", "environment", "purpose"):
        require_string(target.get(field), f"target.{field}", errors)
    # --- openbao: mount, KV path, policy and field list.
    openbao = require_object(ccr.get("openbao"), "openbao", errors)
    mount = require_string(openbao.get("mount"), "openbao.mount", errors)
    kv_path = require_string(openbao.get("kv_path"), "openbao.kv_path", errors)
    policy_name = require_string(
        openbao.get("policy_name"), "openbao.policy_name", errors
    )
    policy_file = require_string(
        openbao.get("policy_file"), "openbao.policy_file", errors
    )
    # Entries are stringified for the check; the CCR itself is left untouched.
    fields = [str(field) for field in require_list(openbao.get("fields"), "openbao.fields", errors)]
    if not fields:
        errors.append("openbao.fields must contain at least one field")
    if mount and kv_path and not kv_path.startswith(f"{mount}/"):
        errors.append("openbao.kv_path must start with the declared mount")
    if any(fragment in kv_path for fragment in ("*", "..")):
        errors.append("openbao.kv_path must not contain '*' or '..'")
    if policy_name in DISALLOWED_POLICY_NAMES:
        errors.append(f"openbao.policy_name is disallowed: {policy_name}")
    if policy_file:
        resolved_policy = resolve_repo_path(policy_file)
        if not resolved_policy.exists():
            errors.append(f"openbao.policy_file does not exist: {policy_file}")
    # --- openbao.auth: method, mount, role and method-specific settings.
    auth = require_object(openbao.get("auth"), "openbao.auth", errors)
    method = require_string(auth.get("method"), "openbao.auth.method", errors)
    if method not in {"oidc", "kubernetes"}:
        errors.append("openbao.auth.method must be oidc or kubernetes")
    require_string(auth.get("mount"), "openbao.auth.mount", errors)
    require_string(auth.get("role"), "openbao.auth.role", errors)
    if method == "oidc":
        redirect_uris = require_list(
            auth.get("allowed_redirect_uris"),
            "openbao.auth.allowed_redirect_uris",
            errors,
        )
        if not redirect_uris:
            errors.append("openbao.auth.allowed_redirect_uris must not be empty for oidc")
        for index, uri in enumerate(redirect_uris):
            if not isinstance(uri, str) or not uri.strip():
                errors.append(
                    f"openbao.auth.allowed_redirect_uris[{index}] must be a non-empty string"
                )
        # A groups claim requires the 'groups' scope to be listed.
        if auth.get("groups_claim"):
            oidc_scopes = require_list(
                auth.get("oidc_scopes"), "openbao.auth.oidc_scopes", errors
            )
            if "groups" not in oidc_scopes:
                errors.append(
                    "openbao.auth.oidc_scopes must include 'groups' when groups_claim is set"
                )
            for index, scope in enumerate(oidc_scopes):
                if not isinstance(scope, str) or not scope.strip():
                    errors.append(
                        f"openbao.auth.oidc_scopes[{index}] must be a non-empty string"
                    )
    # The role may carry exactly one policy: the one this CCR declares.
    policies = [str(policy) for policy in require_list(auth.get("policies"), "openbao.auth.policies", errors)]
    if policies != [policy_name]:
        errors.append("openbao.auth.policies must contain exactly openbao.policy_name")
    for policy in policies:
        if policy in DISALLOWED_POLICY_NAMES:
            errors.append(f"openbao.auth.policies contains disallowed policy {policy}")
    # ttl is optional; when present it must match TTL_RE.
    ttl = auth.get("ttl")
    if ttl is not None and (not isinstance(ttl, str) or not TTL_RE.match(ttl)):
        errors.append("openbao.auth.ttl must match <positive integer><s|m|h|d>")
    bound_claims = require_object(
        auth.get("bound_claims"), "openbao.auth.bound_claims", errors
    )
    if not bound_claims:
        errors.append("openbao.auth.bound_claims must not be empty")
    # Only a literal True counts as confirmed; anything else is a warning.
    if auth.get("bound_claims_confirmed") is not True:
        warnings.append("OIDC/Kubernetes bound claim is not confirmed; apply is blocked")
    # --- access_frontdoor: catalogue entry and readiness.
    frontdoor = require_object(ccr.get("access_frontdoor"), "access_frontdoor", errors)
    require_string(frontdoor.get("type"), "access_frontdoor.type", errors)
    require_string(frontdoor.get("catalog_id"), "access_frontdoor.catalog_id", errors)
    readiness = require_string(frontdoor.get("readiness"), "access_frontdoor.readiness", errors)
    if readiness and readiness not in FRONTDOOR_READINESS:
        errors.append(
            f"access_frontdoor.readiness must be one of {sorted(FRONTDOOR_READINESS)}"
        )
    resolvable = frontdoor.get("resolvable")
    if not isinstance(resolvable, bool):
        errors.append("access_frontdoor.resolvable must be a boolean")
    # The front door may only be resolvable once the CCR status is active.
    if resolvable is True and ccr.get("status") != "active":
        errors.append("access_frontdoor.resolvable=true requires status active")
    command = frontdoor.get("command")
    if command is not None and not isinstance(command, str):
        errors.append("access_frontdoor.command must be a string when present")
    # --- risk, verification and lifecycle sections.
    risk = require_object(ccr.get("risk"), "risk", errors)
    require_string(risk.get("classification"), "risk.classification", errors)
    require_list(risk.get("notes"), "risk.notes", errors)
    verification = require_object(ccr.get("verification"), "verification", errors)
    for field in ("positive", "negative", "activation_conditions"):
        values = require_list(verification.get(field), f"verification.{field}", errors)
        if not values:
            errors.append(f"verification.{field} must not be empty")
    lifecycle = require_object(ccr.get("lifecycle"), "lifecycle", errors)
    for field in ("deactivate", "rotate", "compromised"):
        require_string(lifecycle.get(field), f"lifecycle.{field}", errors)
def validate_ccr(path: Path) -> tuple[dict[str, Any], list[str], list[str]]:
    """Load and validate the CCR file at ``path``.

    Returns ``(ccr, errors, warnings)``.  The raw file is scanned for secret
    markers first, then the common header fields are checked, and finally the
    request-type specific validation runs.
    """
    errors: list[str] = []
    warnings: list[str] = []
    scan_for_secrets(path, errors)
    ccr = load_yaml(path)

    # Header fields, checked in the order they are reported.
    for name in ("id", "kind"):
        require_string(ccr.get(name), name, errors)
    if ccr.get("schema_version") != 1:
        errors.append("schema_version must be 1")
    for name in ("request_type", "title", "status", "created", "updated"):
        require_string(ccr.get(name), name, errors)
    require_object(ccr.get("requester"), "requester", errors)

    if ccr.get("kind") != "credential-change-request":
        errors.append("kind must be credential-change-request")

    identifier = ccr.get("id")
    if isinstance(identifier, str) and not SAFE_ID_RE.match(identifier):
        errors.append("id must contain only uppercase letters, digits, dot, dash, or underscore")

    current_status = ccr.get("status")
    if isinstance(current_status, str) and current_status not in ALLOWED_STATUSES:
        errors.append(f"status must be one of {sorted(ALLOWED_STATUSES)}")

    if ccr.get("request_type") == "workload-kv-read":
        validate_workload_kv_read(ccr, errors, warnings)
    else:
        errors.append("request_type must be workload-kv-read")
    return ccr, errors, warnings
def render_summary(ccr: dict[str, Any], warnings: list[str]) -> str:
    """Render the human-readable review summary for a validated CCR.

    Args:
        ccr: CCR mapping; the ``openbao``, ``access_frontdoor``, ``risk``,
            ``verification`` and ``target`` sections must be present.
        warnings: validation warnings to list near the end of the summary.

    Returns:
        The summary as one newline-joined string.
    """
    openbao = ccr["openbao"]
    auth = openbao["auth"]
    frontdoor = ccr["access_frontdoor"]
    risk = ccr["risk"]
    verification = ccr["verification"]
    # Validation stringifies field entries, so non-string values (for example
    # a YAML boolean) can reach this point; str() keeps join() from raising.
    fields = ", ".join(str(field) for field in openbao["fields"])
    claim_bits = ", ".join(
        f"{key}={value}" for key, value in auth.get("bound_claims", {}).items()
    )
    lines = [
        f"Request: {ccr['title']}",
        f"CCR: {ccr['id']} ({ccr['status']})",
        f"Type: {ccr['request_type']}",
        f"Target: {ccr['target']['tenant']}/{ccr['target']['workload']} ({ccr['target']['environment']})",
        "Mount/path/field:",
        f" {openbao['kv_path']}",
        f" {fields}",
        "Policy:",
        f" {openbao['policy_name']}",
        "Auth binding:",
        f" {auth['mount']} {auth['method']} role {auth['role']}",
        f" bound claims: {claim_bits}",
        f" confirmed: {auth.get('bound_claims_confirmed') is True}",
        "Access front door:",
        f" {frontdoor['type']} {frontdoor['catalog_id']}",
        f" readiness: {frontdoor.get('readiness')} resolvable={frontdoor.get('resolvable') is True}",
    ]
    if frontdoor.get("command"):
        lines.append(f" command: {frontdoor['command']}")
    lines.append(f"Risk: {risk['classification']}")
    lines.extend(f" - {note}" for note in risk.get("notes", []))
    lines.append("Checks:")
    lines.extend(f" + {check}" for check in verification.get("positive", []))
    lines.extend(f" - {check}" for check in verification.get("negative", []))
    if warnings:
        lines.append("Warnings:")
        lines.extend(f" ! {warning}" for warning in warnings)
    lines.extend(
        [
            "Decision:",
            " approve | deny | needs_changes",
            "Comment:",
            " free text; do not include secret values",
        ]
    )
    return "\n".join(lines)
def generated_policy_hcl(ccr: dict[str, Any]) -> str:
    """Build the read-only policy HCL for the CCR's KV path.

    Two ``path`` stanzas are emitted, one under ``<mount>/data/`` and one
    under ``<mount>/metadata/``, each with the ``read`` capability.  The path
    suffix is ``kv_path`` with the leading ``<mount>/`` cut off by length.
    """
    kv = ccr["openbao"]
    mount = kv["mount"]
    relative = kv["kv_path"][len(mount) + 1 :]
    stanzas = [
        f'path "{mount}/{section}/{relative}" {{\n'
        ' capabilities = ["read"]\n'
        "}\n"
        for section in ("data", "metadata")
    ]
    return "\n".join(stanzas)
def display_repo_path(path: Path) -> str:
    """Return ``path`` relative to REPO_DIR when possible, else ``str(path)``."""
    try:
        relative = path.resolve().relative_to(REPO_DIR)
    except ValueError:
        # Not inside the repository: show the path as given.
        return str(path)
    return str(relative)
def policy_artifact_diff(ccr: dict[str, Any]) -> dict[str, Any]:
    """Compare the committed policy file with the policy generated from the CCR.

    Returns a dict with ``path`` (display path), ``status`` (``"missing"``,
    ``"matches"`` or ``"differs"``), ``matches`` (bool) and ``diff`` (unified
    diff lines, empty when the normalised bodies match).
    """
    policy_path = resolve_repo_path(ccr["openbao"]["policy_file"])
    expected = generated_policy_hcl(ccr)
    expected_lines = expected.rstrip().splitlines()
    shown_path = display_repo_path(policy_path)

    if policy_path.exists():
        actual = policy_path.read_text(encoding="utf-8")
        actual_lines = actual.rstrip().splitlines()
        # Comments and blank lines are ignored for the match decision.
        same = normalized_policy_body(actual) == normalized_policy_body(expected)
        status = "matches" if same else "differs"
    else:
        actual_lines = []
        same = False
        status = "missing"

    diff_lines: list[str] = []
    if not same:
        diff_lines = list(
            difflib.unified_diff(
                actual_lines,
                expected_lines,
                fromfile=shown_path,
                tofile=f"generated/{ccr['openbao']['policy_name']}.hcl",
                lineterm="",
            )
        )
    return {
        "path": shown_path,
        "status": status,
        "matches": same,
        "diff": diff_lines,
    }
def render_policy_artifact_diff(ccr: dict[str, Any], indent: str = "") -> list[str]:
    """Render the policy artifact comparison as text lines prefixed by ``indent``."""
    artifact = policy_artifact_diff(ccr)
    header = [
        f"{indent}source artifact: {artifact['path']}",
        f"{indent}artifact status: {artifact['status']}",
    ]
    if artifact["matches"]:
        return header + [
            f"{indent}diff: none; source artifact matches generated policy body"
        ]
    body = [f"{indent}{line}" for line in artifact["diff"]]
    return header + [f"{indent}diff:"] + body
def auth_payload(ccr: dict[str, Any]) -> dict[str, Any]:
    """Build the auth-role payload for the CCR's auth method.

    For ``kubernetes`` the payload carries the bound service account names
    and namespaces taken from ``bound_claims``.  For any other method an OIDC
    role payload is built; ``groups_claim``, ``allowed_redirect_uris`` and
    ``oidc_scopes`` are included only when set.

    The TTL falls back to ``"15m"`` when it is missing *or* null.
    """
    auth = ccr["openbao"]["auth"]
    # Validation accepts an explicit ``ttl: null``; dict.get(key, default)
    # would then return None and put a null TTL into the payload.
    ttl = auth.get("ttl") or "15m"
    if auth["method"] == "kubernetes":
        claims = auth["bound_claims"]
        return {
            "bound_service_account_names": claims.get("service_account_names", []),
            "bound_service_account_namespaces": claims.get(
                "service_account_namespaces", []
            ),
            "policies": ",".join(auth["policies"]),
            "ttl": ttl,
        }
    payload: dict[str, Any] = {
        "role_type": "oidc",
        "user_claim": auth.get("user_claim", "sub"),
        "policies": ",".join(auth["policies"]),
        "ttl": ttl,
        "bound_claims": auth["bound_claims"],
    }
    # Optional OIDC settings are copied only when they have a truthy value.
    for optional in ("groups_claim", "allowed_redirect_uris", "oidc_scopes"):
        if auth.get(optional):
            payload[optional] = auth[optional]
    return payload
def render_plan(ccr: dict[str, Any]) -> str:
    """Render the four-step apply plan for a CCR as text.

    The plan shows the generated policy HCL and its diff against the source
    artifact, the auth role path and JSON payload, the KV path and field
    names to provision out-of-band, and a verification reminder.
    """
    openbao = ccr["openbao"]
    auth = openbao["auth"]
    payload = auth_payload(ccr)
    # Validation stringifies field entries, so non-string values can reach
    # this point; str() keeps join() from raising TypeError.
    field_list = ", ".join(str(field) for field in openbao["fields"])
    lines = [
        f"CCR {ccr['id']} apply plan",
        "",
        "1. Write policy HCL:",
        f" policy: {openbao['policy_name']}",
        f" source: {openbao['policy_file']}",
        "",
        generated_policy_hcl(ccr).rstrip(),
        "",
        " Source artifact diff:",
        *render_policy_artifact_diff(ccr, indent=" "),
        "",
        "2. Create/update auth role payload:",
        f" path: auth/{auth['mount']}/role/{auth['role']}",
        json.dumps(payload, indent=2, sort_keys=True),
        "",
        "3. Provision secret value out-of-band:",
        f" path: {openbao['kv_path']}",
        f" fields: {field_list}",
        "",
        "4. Verify positive and negative access without printing secret values.",
    ]
    return "\n".join(lines)
2026-06-28 00:21:02 +02:00
def render_operator_commands(ccr: dict[str, Any]) -> str:
    """Render a shell handoff script that applies the CCR's metadata.

    The script writes the policy, feeds the auth role JSON to ``bao write``
    through a temporary file, and shows the secret provisioning command only
    as a comment with placeholder values.
    """
    kv = ccr["openbao"]
    binding = kv["auth"]
    role_endpoint = f"auth/{binding['mount']}/role/{binding['role']}"
    role_json = json.dumps(auth_payload(ccr), indent=2, sort_keys=True)
    placeholders = [
        shlex.quote(f"{field}=<enter-through-approved-custody>")
        for field in kv["fields"]
    ]
    secret_args = " ".join(placeholders)

    preamble = [
        f"# Operator handoff for {ccr['id']}: {ccr['title']}",
        "# Run from the railiance-platform repo with an approved OpenBao operator token.",
        "# Do not paste this shell block into the OpenBao Browser CLI.",
        f"# Web UI API Explorer path for the role JSON body: /v1/{role_endpoint}",
        "set -euo pipefail",
        f"bao policy write {shlex.quote(kv['policy_name'])} {shlex.quote(kv['policy_file'])}",
    ]
    # The role payload goes through a temp file that the trap removes on exit.
    role_steps = [
        'role_payload_file="$(mktemp)"',
        'trap \'rm -f "$role_payload_file"\' EXIT',
        'cat >"$role_payload_file" <<\'JSON\'',
        role_json,
        "JSON",
        f"bao write {shlex.quote(role_endpoint)} @\"$role_payload_file\"",
    ]
    closing = [
        "",
        "# Secret provisioning remains under approved OpenBao/operator custody.",
        "# Do not paste secret values into Git, State Hub, workplans, logs, or chat.",
        f"# bao kv put {shlex.quote(kv['kv_path'])} {secret_args}",
        "",
        "# After provisioning, run positive and negative verification without printing secret values.",
    ]
    return "\n".join(preamble + role_steps + closing)
def normalized_policy_body(text: str) -> str:
    """Strip every line, drop blank and ``#`` comment lines, and rejoin with newlines."""
    trimmed = (row.strip() for row in text.splitlines())
    kept = [row for row in trimmed if row and not row.startswith("#")]
    return "\n".join(kept)
def string_values(value: Any) -> list[str]:
    """Collect every string found in ``value``, descending into lists and dict values.

    Dict keys and non-string scalars are ignored.
    """
    if isinstance(value, str):
        return [value]
    if isinstance(value, list):
        children = value
    elif isinstance(value, dict):
        children = value.values()
    else:
        return []
    collected: list[str] = []
    for child in children:
        collected.extend(string_values(child))
    return collected
def applier_policy_violations(ccr: dict[str, Any]) -> list[str]:
    """List the reasons a CCR falls outside the delegated applier's scope.

    Checks the target environment, KV mount/path/field names, policy name and
    policy file, auth method/mount/role naming, the role's policy list and
    the bound claim values.  An empty list means no violation was found.

    A request type other than ``workload-kv-read`` returns a single
    violation immediately.
    """
    violations: list[str] = []
    if ccr.get("request_type") != "workload-kv-read":
        return ["delegated applier only supports workload-kv-read CCRs"]
    target = ccr.get("target", {})
    environment = str(target.get("environment") or "")
    if environment not in APPLIER_ALLOWED_ENVIRONMENTS:
        violations.append(f"target.environment is outside delegated applier scope: {environment}")
    openbao = ccr.get("openbao", {})
    mount = str(openbao.get("mount") or "")
    kv_path = str(openbao.get("kv_path") or "")
    policy_name = str(openbao.get("policy_name") or "")
    policy_file = str(openbao.get("policy_file") or "")
    fields = openbao.get("fields") or []
    # --- KV location and field names.
    if mount != "platform":
        violations.append(f"openbao.mount must be platform, got {mount}")
    if not kv_path.startswith("platform/workloads/"):
        violations.append("openbao.kv_path must stay under platform/workloads/")
    if any(fragment in kv_path for fragment in ("*", "..", "//")):
        violations.append("openbao.kv_path must not contain wildcard, parent, or empty segments")
    if "/data/" in kv_path or "/metadata/" in kv_path:
        violations.append("openbao.kv_path must be the logical KV path, not a KV-v2 API path")
    for field in fields:
        field_name = str(field)
        if not FIELD_NAME_RE.match(field_name):
            violations.append(f"openbao.fields contains unsafe field name: {field_name}")
    # --- Policy name and policy file.
    if policy_name in DISALLOWED_POLICY_NAMES:
        violations.append(f"openbao.policy_name is disallowed: {policy_name}")
    if not policy_name.startswith(WORKLOAD_KV_POLICY_PREFIX):
        violations.append(
            f"openbao.policy_name must start with {WORKLOAD_KV_POLICY_PREFIX}"
        )
    if not LOWER_SAFE_ID_RE.match(policy_name):
        violations.append(f"openbao.policy_name contains unsafe characters: {policy_name}")
    if policy_file:
        resolved_policy = resolve_repo_path(policy_file)
        policy_dir = (REPO_DIR / "openbao" / "policies").resolve()
        try:
            # relative_to() compares path parts lexically, and
            # resolve_repo_path() returns absolute inputs unnormalised.
            # Resolve first so '..' segments and symlinks cannot slip a file
            # from outside the policy directory past this check.
            resolved_policy.resolve().relative_to(policy_dir)
        except ValueError:
            violations.append("openbao.policy_file must stay under openbao/policies")
        expected_name = f"{policy_name}.hcl"
        if resolved_policy.name != expected_name:
            violations.append(
                f"openbao.policy_file name must match policy name: {expected_name}"
            )
        if resolved_policy.exists():
            source = normalized_policy_body(resolved_policy.read_text(encoding="utf-8"))
            generated = normalized_policy_body(generated_policy_hcl(ccr))
            if source != generated:
                violations.append("openbao.policy_file does not match generated CCR policy")
    # --- Auth method, mount and role naming.
    auth = openbao.get("auth", {})
    method = str(auth.get("method") or "")
    auth_mount = str(auth.get("mount") or "")
    role = str(auth.get("role") or "")
    if method == "oidc" and auth_mount != "netkingdom":
        violations.append("OIDC workload CCRs may only mutate auth/netkingdom roles")
    elif method == "kubernetes" and auth_mount != "kubernetes":
        violations.append("Kubernetes workload CCRs may only mutate auth/kubernetes roles")
    elif method not in {"oidc", "kubernetes"}:
        violations.append(f"unsupported auth method for delegated applier: {method}")
    if not LOWER_SAFE_ID_RE.match(role):
        violations.append(f"openbao.auth.role contains unsafe characters: {role}")
    if role in DISALLOWED_POLICY_NAMES:
        violations.append(f"openbao.auth.role is disallowed: {role}")
    if method == "oidc" and not role.endswith(OIDC_WORKLOAD_ROLE_SUFFIX):
        violations.append(
            f"OIDC workload role must end with {OIDC_WORKLOAD_ROLE_SUFFIX}"
        )
    if method == "kubernetes" and not (
        role.endswith(KUBERNETES_ROLE_SUFFIXES)
        or role.startswith(KUBERNETES_ROLE_PREFIXES)
    ):
        allowed_roles = list(KUBERNETES_ROLE_SUFFIXES) + [
            f"{prefix}*" for prefix in KUBERNETES_ROLE_PREFIXES
        ]
        violations.append(
            "Kubernetes workload role must end with/start with "
            + " or ".join(allowed_roles)
        )
    # --- Role policies and bound claims.
    for policy in auth.get("policies") or []:
        policy_value = str(policy)
        if policy_value != policy_name:
            violations.append("openbao.auth.policies must contain only openbao.policy_name")
        if policy_value in DISALLOWED_POLICY_NAMES:
            violations.append(f"openbao.auth.policies contains disallowed policy: {policy_value}")
    for value in string_values(auth.get("bound_claims") or {}):
        if not value.strip() or "*" in value or ".." in value:
            # One report is enough; stop at the first unsafe claim value.
            violations.append("openbao.auth.bound_claims contains an unsafe value")
            break
    return violations
def applier_readiness_blockers(ccr: dict[str, Any]) -> list[str]:
    """Return everything that blocks an applier dry-run for this CCR.

    Combines the status gate, the auth binding confirmation check and all
    delegated applier policy violations.
    """
    current = ccr.get("status")
    found: list[str] = []
    if current not in APPLIER_DRY_RUN_ALLOWED_STATUSES:
        found.append(
            "applier dry-run requires approved, applied, verified, or active "
            f"CCR status, got {current}"
        )
    confirmed = ccr["openbao"]["auth"].get("bound_claims_confirmed")
    if confirmed is not True:
        found.append("applier dry-run requires confirmed OpenBao auth binding")
    found.extend(applier_policy_violations(ccr))
    return found
def applier_dry_run_payload(ccr: dict[str, Any], warnings: list[str]) -> dict[str, Any]:
    """Assemble the delegated applier dry-run report for a CCR.

    The report lists the two metadata mutations (policy write and auth role
    write), the policy artifact comparison, what is out of scope and which
    evidence is required.
    """
    kv = ccr["openbao"]
    binding = kv["auth"]
    role_endpoint = f"auth/{binding['mount']}/role/{binding['role']}"

    report: dict[str, Any] = {
        "id": ccr["id"],
        "title": ccr["title"],
        "status": ccr["status"],
        "environment": ccr["target"]["environment"],
        "warnings": warnings,
    }
    report["source_artifacts"] = {"policy": policy_artifact_diff(ccr)}

    policy_write = {
        "kind": "policy_write",
        "openbao_path": f"sys/policies/acl/{kv['policy_name']}",
        "policy_name": kv["policy_name"],
        "source": kv["policy_file"],
        "body": generated_policy_hcl(ccr).rstrip(),
    }
    role_write = {
        "kind": "auth_role_write",
        "openbao_path": role_endpoint,
        "payload": auth_payload(ccr),
    }
    report["mutations"] = [policy_write, role_write]
    report["out_of_scope"] = [
        "secret value writes",
        "secret reads",
        "front-door activation before verification",
    ]
    report["required_evidence"] = [
        "CCR id and approval reference",
        "policy name and auth role path",
        "OpenBao request id or timestamp",
        "positive and negative verification references",
    ]
    return report
def render_applier_dry_run(payload: dict[str, Any]) -> str:
    """Render a dry-run report (see ``applier_dry_run_payload``) as text."""
    out = [
        f"CCR {payload['id']} delegated applier dry-run",
        f"Status: {payload['status']}",
        f"Environment: {payload['environment']}",
        "",
        "Allowed metadata mutations:",
    ]
    for mutation in payload["mutations"]:
        out.append(f"- {mutation['kind']}: {mutation['openbao_path']}")
        if mutation["kind"] == "policy_write":
            out.append(f" source: {mutation['source']}")
            out.append(" body:")
            out.extend(f" {row}" for row in mutation["body"].splitlines())
        else:
            out.append(" payload:")
            encoded = json.dumps(mutation["payload"], indent=4, sort_keys=True)
            out.extend(f" {row}" for row in encoded.splitlines())
    out.append("")
    out.append("Out of scope:")
    out.extend(f"- {entry}" for entry in payload["out_of_scope"])
    out.append("Required non-secret evidence:")
    out.extend(f"- {entry}" for entry in payload["required_evidence"])

    artifact = payload.get("source_artifacts", {}).get("policy")
    if artifact:
        out.append("Source artifact checks:")
        out.append(f"- policy: {artifact['path']} ({artifact['status']})")
        if artifact.get("diff"):
            out.append(" diff:")
            out.extend(f" {row}" for row in artifact["diff"])

    if payload["warnings"]:
        out.append("Warnings:")
        out.extend(f"- {note}" for note in payload["warnings"])
    return "\n".join(out)
def applier_confirmation_phrase(ccr: dict[str, Any]) -> str:
    """Return the phrase that confirms a delegated apply of this CCR."""
    ccr_id = ccr["id"]
    return f"DELEGATED APPLY {ccr_id}"
def delegated_apply_details(ccr: dict[str, Any], actor: str) -> list[str]:
    """Return the four non-secret detail lines describing a delegated apply."""
    kv = ccr["openbao"]
    binding = kv["auth"]
    policy_line = f"Policy metadata write: sys/policies/acl/{kv['policy_name']}"
    role_line = (
        f"Auth role metadata write: auth/{binding['mount']}/role/{binding['role']}"
    )
    return [
        f"Delegated metadata applier ran as {actor} using local bao CLI ambient authority.",
        policy_line,
        role_line,
        "No secret values were read, written, printed, or accepted in argv.",
    ]
def render_applier_apply_plan(ccr: dict[str, Any], warnings: list[str]) -> str:
    """Render the dry-run report followed by the confirmation phrase and live command."""
    report = render_applier_dry_run(applier_dry_run_payload(ccr, warnings))
    phrase = applier_confirmation_phrase(ccr)
    sections = [
        report,
        "",
        "Delegated apply confirmation:",
        f" {phrase}",
        "",
        "Live apply command:",
        f" scripts/credential-change.py applier-apply {ccr['id']} --actor <applier> --confirm \"{phrase}\" --record-state-hub",
        "",
        "The command uses the local bao CLI and ambient delegated applier identity.",
        "It does not accept OpenBao tokens in argv and never writes secret values.",
    ]
    return "\n".join(sections)
def runbook_readiness_blockers(ccr: dict[str, Any]) -> list[str]:
    """Return the reasons a runbook cannot be produced for this CCR.

    Checks the status gate and that the auth binding is confirmed.
    """
    current = ccr.get("status")
    found: list[str] = []
    if current not in RUNBOOK_ALLOWED_STATUSES:
        found.append(
            "runbook requires approved, applied, verified, or active CCR status, "
            f"got {current}"
        )
    confirmed = ccr["openbao"]["auth"].get("bound_claims_confirmed")
    if confirmed is not True:
        found.append("runbook requires confirmed OpenBao auth binding")
    return found
def runbook_confirmation_phrase(ccr: dict[str, Any]) -> str:
    """Return the phrase that confirms an attended runbook apply of this CCR."""
    ccr_id = ccr["id"]
    return f"APPLY {ccr_id}"
def runbook_payload(ccr: dict[str, Any], warnings: list[str]) -> dict[str, Any]:
    """Assemble the data that ``render_runbook`` turns into an operator runbook.

    The decision link prefers ``state_hub.decision_api_url`` and falls back
    to ``state_hub.decision_dashboard_url``.
    """
    kv = ccr["openbao"]
    door = ccr["access_frontdoor"]
    checks = ccr["verification"]
    binding = kv["auth"]

    book: dict[str, Any] = {
        "id": ccr["id"],
        "title": ccr["title"],
        "status": ccr["status"],
        "target": ccr["target"],
        "warnings": warnings,
    }
    hub = ccr.get("state_hub", {})
    book["decision_link"] = hub.get("decision_api_url") or hub.get(
        "decision_dashboard_url"
    )
    book["confirmation_phrase"] = runbook_confirmation_phrase(ccr)
    book["policy"] = {
        "name": kv["policy_name"],
        "source": kv["policy_file"],
        "artifact": policy_artifact_diff(ccr),
    }
    book["auth_role"] = {
        "path": f"auth/{binding['mount']}/role/{binding['role']}",
        "method": binding["method"],
    }
    book["secret_provisioning"] = {
        "path": kv["kv_path"],
        "fields": kv["fields"],
        "instruction": (
            "Enter or rotate secret values only through approved OpenBao/operator "
            "custody; do not paste values into Git, State Hub, prompts, chat, "
            "argv, or logs."
        ),
    }
    book["verification"] = {
        "positive": checks.get("positive", []),
        "negative": checks.get("negative", []),
        "activation_conditions": checks.get("activation_conditions", []),
    }
    book["frontdoor"] = {
        "type": door["type"],
        "catalog_id": door["catalog_id"],
        "readiness": door.get("readiness"),
        "resolvable": door.get("resolvable") is True,
        "command": door.get("command"),
    }
    return book
def render_runbook(payload: dict[str, Any]) -> str:
    """Render the operator runbook text from a ``runbook_payload`` dict.

    The runbook has six numbered steps (review, metadata apply, secret
    provisioning, positive and negative verification, evidence recording),
    then the activation conditions, the optional front-door command and any
    warnings.
    """
    # Validation stringifies field entries, so non-string values can reach
    # this point; str() keeps join() from raising TypeError.
    fields = ", ".join(
        str(field) for field in payload["secret_provisioning"]["fields"]
    )
    lines = [
        f"CCR {payload['id']} operator runbook",
        f"Title: {payload['title']}",
        f"Status: {payload['status']}",
        f"Target: {payload['target']['tenant']}/{payload['target']['workload']} ({payload['target']['environment']})",
    ]
    if payload.get("decision_link"):
        lines.append(f"Decision: {payload['decision_link']}")
    lines.extend(
        [
            "",
            "Final attended confirmation:",
            f" {payload['confirmation_phrase']}",
            "",
            "1. Review generated metadata plan:",
            f" policy: {payload['policy']['name']}",
            f" source: {payload['policy']['source']}",
            f" artifact: {payload['policy']['artifact']['status']}",
            f" auth role: {payload['auth_role']['path']}",
            "",
            "2. Apply non-secret metadata:",
            " scripts/credential-change.py runbook <CCR> --execute-metadata --actor <operator>",
            " The command uses the local bao CLI and ambient approved operator authority;",
            " it does not accept OpenBao tokens in argv and it does not write secret values.",
            "",
            "3. Provision secret value through approved custody:",
            f" path: {payload['secret_provisioning']['path']}",
            f" fields: {fields}",
            f" {payload['secret_provisioning']['instruction']}",
            "",
            "4. Positive verification:",
        ]
    )
    lines.extend(f" - {item}" for item in payload["verification"]["positive"])
    lines.append("")
    lines.append("5. Negative verification:")
    lines.extend(f" - {item}" for item in payload["verification"]["negative"])
    lines.append("")
    lines.append("6. Record non-secret evidence:")
    lines.extend(
        [
            " scripts/credential-change.py record-evidence <CCR> --actor <operator> --kind metadata_apply --result passed --detail \"OpenBao request id or audit timestamp: <non-secret>\" --status applied --record-state-hub",
            " scripts/credential-change.py record-evidence <CCR> --actor <operator> --kind secret_provisioned --result passed --detail \"Field presence checked without printing values\" --record-state-hub",
            " scripts/credential-change.py record-evidence <CCR> --actor <operator> --kind positive_verification --result passed --detail \"Positive verification reference: <non-secret>\" --record-state-hub",
            " scripts/credential-change.py record-evidence <CCR> --actor <operator> --kind negative_verification --result passed --detail \"Negative verification reference: <non-secret>\" --status verified --record-state-hub",
            " scripts/credential-change.py record-evidence <CCR> --actor <operator> --kind frontdoor_activation --result passed --detail \"Front door ready/resolvable after verification\" --status active --frontdoor-ready --record-state-hub",
            "",
            "Activation conditions:",
        ]
    )
    lines.extend(
        f" - {item}" for item in payload["verification"]["activation_conditions"]
    )
    if payload["frontdoor"].get("command"):
        lines.extend(["", f"Front-door command: {payload['frontdoor']['command']}"])
    if payload["warnings"]:
        lines.append("")
        lines.append("Warnings:")
        lines.extend(f" - {warning}" for warning in payload["warnings"])
    return "\n".join(lines)
def run_bao_metadata_apply(ccr: dict[str, Any], bao_bin: str) -> None:
    """Apply the CCR's policy and auth role through the ``bao`` CLI.

    Runs ``bao policy write`` and then ``bao write <role> @<tempfile>`` from
    the repository root.  The role payload is passed through a temporary
    file that is removed afterwards, whether or not a command fails.

    Raises:
        subprocess.CalledProcessError: a ``bao`` command exited non-zero.
    """
    openbao = ccr["openbao"]
    auth = openbao["auth"]
    auth_path = f"auth/{auth['mount']}/role/{auth['role']}"
    # Render the payload before creating the file so a malformed CCR cannot
    # leave a temporary file behind.
    role_body = json.dumps(auth_payload(ccr), indent=2, sort_keys=True) + "\n"
    role_file = tempfile.NamedTemporaryFile("w", encoding="utf-8", delete=False)
    role_path = Path(role_file.name)
    try:
        # The write happens inside the try so a failed write is cleaned up too.
        with role_file:
            role_file.write(role_body)
        commands = [
            [bao_bin, "policy", "write", openbao["policy_name"], openbao["policy_file"]],
            [bao_bin, "write", auth_path, f"@{role_path}"],
        ]
        for command in commands:
            subprocess.run(command, cwd=REPO_DIR, check=True)
    finally:
        try:
            role_path.unlink()
        except FileNotFoundError:
            pass
def validate_evidence_text(kind: str, result: str, details: list[str]) -> None:
    """Check evidence inputs before they are stored.

    ``kind`` and ``result`` must match EVIDENCE_ID_RE, and none of ``kind``,
    ``result`` or the ``details`` entries may contain a secret marker.
    Violations are reported through ``fail``.
    """
    if EVIDENCE_ID_RE.match(kind) is None:
        fail("evidence kind must contain only lowercase letters, digits, underscore, or dash")
    if EVIDENCE_ID_RE.match(result) is None:
        fail("evidence result must contain only lowercase letters, digits, underscore, or dash")
    reject_secret_text(kind, "evidence kind")
    reject_secret_text(result, "evidence result")
    for entry in details:
        reject_secret_text(entry, "evidence detail")
def append_evidence(
    path: Path,
    actor: str,
    kind: str,
    result: str,
    details: list[str],
    set_status: str | None = None,
    frontdoor_ready: bool = False,
) -> dict[str, Any]:
    """Append a verification evidence entry to a CCR file and save it.

    Args:
        path: CCR file to validate, update and rewrite.
        actor: Who recorded the evidence (screened for secret text).
        kind: Evidence kind identifier.
        result: Evidence result identifier.
        details: Free-text evidence details.
        set_status: Optional new CCR status; must be in ``ALLOWED_STATUSES``.
        frontdoor_ready: When true, mark the access front door as
            ``readiness="ready"`` and ``resolvable=True``.

    Returns:
        The updated CCR mapping that was written back to ``path``.

    Raises:
        SystemExit: With code 1 when the CCR fails validation.
    """
    validate_evidence_text(kind, result, details)
    reject_secret_text(actor, "evidence actor")

    ccr, errors, warnings = validate_ccr(path)
    if errors:
        for problem in errors:
            print(f"[FAIL] {path.name}: {problem}", file=sys.stderr)
        raise SystemExit(1)
    for note in warnings:
        print(f"[WARN] {path.name}: {note}", file=sys.stderr)

    evidence_log = ccr.setdefault("verification", {}).setdefault("evidence", [])
    if not isinstance(evidence_log, list):
        fail("verification.evidence must be a list")
    entry = {
        "at": utc_now(),
        "actor": actor,
        "kind": kind,
        "result": result,
        "details": details,
    }
    evidence_log.append(entry)

    if set_status:
        if set_status not in ALLOWED_STATUSES:
            fail(f"status must be one of {sorted(ALLOWED_STATUSES)}")
        ccr["status"] = set_status

    if frontdoor_ready:
        door = ccr.setdefault("access_frontdoor", {})
        door["readiness"] = "ready"
        door["resolvable"] = True

    today = datetime.now(timezone.utc).date()
    ccr["updated"] = today.isoformat()
    dump_yaml(path, ccr)
    return ccr
def record_evidence_state_hub(
    ccr: dict[str, Any], base_url: str, actor: str, kind: str, result: str, details: list[str]
) -> dict[str, Any]:
    """Post a ``credential_change_evidence`` progress event to State Hub.

    The summary names the CCR id, evidence kind/result, actor, current
    status, KV path and policy name, followed by the ``details`` joined
    with ``"; "``. Returns the JSON object from ``state_hub_post_json``.
    """
    openbao = ccr["openbao"]
    headline = (
        f"CCR {ccr['id']} evidence {kind}/{result} by {actor}: "
        f"status={ccr['status']} path={openbao['kv_path']} "
        f"policy={openbao['policy_name']}; "
    )
    event = {
        "summary": headline + "; ".join(details),
        "event_type": "credential_change_evidence",
        "author": actor,
    }
    return state_hub_post_json(base_url, "/progress/", event)
def slugify(value: str) -> str:
    """Return a lowercase, dash-separated slug of ``value``.

    Every run of characters outside ``a-z0-9`` (after lowercasing) becomes a
    single dash and leading/trailing dashes are stripped. If nothing is
    left, the placeholder ``"item"`` is returned.
    """
    dashed = re.sub(r"[^a-z0-9]+", "-", value.lower())
    trimmed = dashed.strip("-")
    if not trimmed:
        return "item"
    return trimmed
def reject_secret_values(values: list[str], field: str) -> None:
    """Run ``reject_secret_text`` on each entry of ``values`` under ``field``."""
    for candidate in values:
        reject_secret_text(candidate, field)
def parse_key_values(values: list[str]) -> dict[str, list[str]]:
    """Parse ``key=value`` bound-claim strings into a multi-valued mapping.

    Each raw string is screened with ``reject_secret_text`` and split on the
    first ``=``; key and value are whitespace-stripped. A missing ``=`` or an
    empty key/value is reported through ``fail``. Repeated keys accumulate
    their values in input order.
    """
    claims: dict[str, list[str]] = {}
    for raw in values:
        reject_secret_text(raw, "bound claim")
        head, separator, tail = raw.partition("=")
        name, claim = head.strip(), tail.strip()
        if not (separator == "=" and name and claim):
            fail("bound claims must use key=value syntax")
        if name not in claims:
            claims[name] = []
        claims[name].append(claim)
    return claims
def lifecycle_action_config(action: str) -> dict[str, Any]:
    """Return the ``LIFECYCLE_ACTIONS`` entry for ``action``.

    An unknown action is reported through ``fail`` with the sorted list of
    valid action names.
    """
    if action in LIFECYCLE_ACTIONS:
        return LIFECYCLE_ACTIONS[action]
    fail(f"lifecycle action must be one of {sorted(LIFECYCLE_ACTIONS)}")
def lifecycle_payload(ccr: dict[str, Any], action: str) -> dict[str, Any]:
    """Build the data structure describing a lifecycle plan for ``action``.

    Combines the target status/readiness/resolvable values from the action's
    configuration with the CCR's front-door and OpenBao metadata, the
    shell-quoted ``bao`` commands that remove the auth role and policy, and
    the ``lifecycle-event`` command line used to record the action.
    """
    config = lifecycle_action_config(action)
    openbao = ccr["openbao"]
    auth = openbao["auth"]
    frontdoor = ccr["access_frontdoor"]
    auth_role_path = f"auth/{auth['mount']}/role/{auth['role']}"
    quote = shlex.quote
    disable_commands = [
        f"bao delete {quote(auth_role_path)}",
        f"bao policy delete {quote(openbao['policy_name'])}",
    ]

    payload: dict[str, Any] = {
        "id": ccr["id"],
        "title": ccr["title"],
        "action": action,
        "target_status": config["status"],
        "target_readiness": config["readiness"],
        "target_resolvable": config["resolvable"],
    }
    payload["frontdoor"] = {
        "type": frontdoor["type"],
        "catalog_id": frontdoor["catalog_id"],
        "command": frontdoor.get("command"),
    }
    payload["openbao"] = {
        "secret_path": openbao["kv_path"],
        "fields": openbao["fields"],
        "policy_name": openbao["policy_name"],
        "auth_role_path": auth_role_path,
        "disable_commands": disable_commands,
    }
    payload["record_command"] = (
        "scripts/credential-change.py lifecycle-event "
        f"{ccr['id']} --action {action} --actor <operator> "
        "--reason \"<non-secret reason>\" --detail \"<non-secret evidence>\" "
        "--record-state-hub"
    )
    return payload
def render_lifecycle_plan(payload: dict[str, Any]) -> str:
    """Render a lifecycle payload as a four-step, human-readable plan.

    Steps: record the lifecycle event, update the front door, perform the
    OpenBao metadata action (varies by action), and list the non-secret
    notes the operator must supply. Returns the newline-joined text.
    """
    action = payload["action"]
    out = [
        f"CCR {payload['id']} lifecycle plan: {action}",
        f"Title: {payload['title']}",
        f"Target CCR status: {payload['target_status']}",
        f"Target front door: readiness={payload['target_readiness']} resolvable={payload['target_resolvable']}",
        "",
        "1. Record lifecycle event:",
        f" {payload['record_command']}",
        "",
        "2. Front-door action:",
        f" Mark {payload['frontdoor']['type']} catalog {payload['frontdoor']['catalog_id']} as {payload['target_readiness']} before any further use.",
    ]
    existing_command = payload["frontdoor"].get("command")
    if existing_command:
        out.append(f" Existing command to disable/check externally: {existing_command}")

    out += ["", "3. OpenBao metadata action:"]
    if action == "rotate":
        out += [
            " Keep the front door non-resolvable while the replacement value is entered through approved custody.",
            f" Secret path: {payload['openbao']['secret_path']}",
            f" Fields: {', '.join(payload['openbao']['fields'])}",
            " After positive and negative verification, record front-door activation evidence to return the lane to active.",
        ]
    elif action in {"deactivate", "compromise"}:
        out.append(" Disable caller access by removing the auth role and read policy with approved operator authority:")
        out.extend(f" {command}" for command in payload["openbao"]["disable_commands"])
        out.append(" Secret values are not printed or copied; rotate/delete values only through approved custody.")

    out += ["", "4. Required non-secret notes:"]
    notes_by_action = {
        "compromise": " Include blast-radius notes and follow-up task references; never include the exposed value.",
        "rotate": " Include old-value revocation evidence, new-value field presence evidence, and verification references.",
    }
    # Any other action (deactivate in practice) gets the disablement note.
    out.append(
        notes_by_action.get(
            action,
            " Include the reason for disablement, OpenBao audit/request reference, and front-door disable reference.",
        )
    )
    return "\n".join(out)
def append_lifecycle_event(
    path: Path,
    actor: str,
    action: str,
    reason: str,
    details: list[str],
    blast_radius: list[str] | None = None,
    follow_up: list[str] | None = None,
) -> dict[str, Any]:
    """Record a lifecycle event in a CCR file and apply its target state.

    All text inputs are screened for secret text. After the CCR validates,
    an event is appended to ``lifecycle.events`` and the CCR status plus
    front-door readiness/resolvable flags are set from the action's
    configuration. The file is then rewritten.

    Returns:
        The updated CCR mapping.

    Raises:
        SystemExit: With code 1 when the CCR fails validation.
    """
    config = lifecycle_action_config(action)
    reject_secret_text(actor, "lifecycle actor")
    reject_secret_text(reason, "lifecycle reason")
    reject_secret_values(details, "lifecycle detail")
    radius_notes = blast_radius or []
    follow_up_notes = follow_up or []
    reject_secret_values(radius_notes, "lifecycle blast radius")
    reject_secret_values(follow_up_notes, "lifecycle follow-up")

    ccr, errors, warnings = validate_ccr(path)
    if errors:
        for problem in errors:
            print(f"[FAIL] {path.name}: {problem}", file=sys.stderr)
        raise SystemExit(1)
    for note in warnings:
        print(f"[WARN] {path.name}: {note}", file=sys.stderr)

    event_log = ccr.setdefault("lifecycle", {}).setdefault("events", [])
    if not isinstance(event_log, list):
        fail("lifecycle.events must be a list")

    target_status = config["status"]
    event = {
        "at": utc_now(),
        "actor": actor,
        "action": action,
        "status": target_status,
        "reason": reason,
        "details": details,
    }
    # Optional sections are only stored when they carry content.
    for key, notes in (("blast_radius", radius_notes), ("follow_up", follow_up_notes)):
        if notes:
            event[key] = notes
    event_log.append(event)

    ccr["status"] = target_status
    door = ccr.setdefault("access_frontdoor", {})
    door["readiness"] = config["readiness"]
    door["resolvable"] = config["resolvable"]
    ccr["updated"] = datetime.now(timezone.utc).date().isoformat()
    dump_yaml(path, ccr)
    return ccr
def record_lifecycle_state_hub(
    ccr: dict[str, Any], base_url: str, actor: str, action: str, reason: str, details: list[str]
) -> dict[str, Any]:
    """Post a ``credential_change_lifecycle`` progress event to State Hub.

    The summary carries the CCR id, action, actor, status, front-door
    readiness/resolvable state, KV path, policy name and reason, followed by
    the ``details`` joined with ``"; "``.
    """
    openbao = ccr["openbao"]
    frontdoor = ccr["access_frontdoor"]
    headline = (
        f"CCR {ccr['id']} lifecycle {action} by {actor}: "
        f"status={ccr['status']} readiness={frontdoor.get('readiness')} "
        f"resolvable={frontdoor.get('resolvable') is True} path={openbao['kv_path']} "
        f"policy={openbao['policy_name']}; reason={reason}; "
    )
    event = {
        "summary": headline + "; ".join(details),
        "event_type": "credential_change_lifecycle",
        "author": actor,
    }
    return state_hub_post_json(base_url, "/progress/", event)
def inventory_ccr_from_args(args: argparse.Namespace) -> dict[str, Any]:
    """Build a CCR mapping for an existing credential lane from CLI arguments.

    The identifying metadata and field names are screened with
    ``reject_secret_text``. The policy name and policy file are derived from
    the workload/purpose slugs when not supplied. At least one bound claim
    (or, for Kubernetes auth, a service-account binding) is required.

    Args:
        args: Parsed ``import-inventory`` arguments.

    Returns:
        The CCR mapping (not yet written to disk).
    """
    fields = list(args.field or [])
    if not fields:
        fail("at least one --field is required")
    reject_secret_values(fields, "inventory field")
    for value in (
        args.id,
        args.title,
        args.tenant,
        args.workload,
        args.environment,
        args.purpose,
        args.kv_path,
        args.policy_name or "",
        args.policy_file or "",
        args.auth_mount,
        args.auth_role,
        args.frontdoor_type,
        args.catalog_id,
        args.reason,
    ):
        reject_secret_text(str(value), "inventory metadata")

    # Use the shared prefix constant so generated names cannot drift from it.
    policy_name = args.policy_name or (
        f"{WORKLOAD_KV_POLICY_PREFIX}{slugify(args.workload)}-{slugify(args.purpose)}"
    )
    policy_file = args.policy_file or f"openbao/policies/{policy_name}.hcl"

    bound_claims = parse_key_values(args.bound_claim or [])
    if args.auth_method == "kubernetes":
        # Service-account flags replace any same-named --bound-claim entries.
        if args.service_account:
            bound_claims["service_account_names"] = list(args.service_account)
        if args.service_account_namespace:
            bound_claims["service_account_namespaces"] = list(args.service_account_namespace)
    if not bound_claims:
        fail("at least one --bound-claim or Kubernetes service account binding is required")

    allowed_redirect_uris = list(getattr(args, "redirect_uri", None) or [])
    if args.auth_method == "oidc" and not allowed_redirect_uris:
        allowed_redirect_uris = [
            "https://bao.coulomb.social/ui/vault/auth/netkingdom/oidc/callback",
            "http://localhost:8250/oidc/callback",
            "http://127.0.0.1:8250/oidc/callback",
        ]

    # Read the clock once so created and updated can never differ for a freshly
    # imported record, even when the import straddles UTC midnight.
    today = datetime.now(timezone.utc).date().isoformat()
    ccr = {
        "id": args.id,
        "kind": "credential-change-request",
        "schema_version": 1,
        "request_type": "workload-kv-read",
        "title": args.title,
        "status": args.status,
        "created": today,
        "updated": today,
        "requester": {
            "agent": args.requester_agent,
            "reason": args.reason,
        },
        "review": {
            "required": True,
            "required_approvers": ["platform-operator"],
            "comments": [
                {
                    "at": utc_now(),
                    "reviewer": args.actor,
                    "decision": "inventory_imported",
                    "comment": args.reason,
                }
            ],
        },
        "target": {
            "domain": "financials",
            "tenant": args.tenant,
            "workload": args.workload,
            "environment": args.environment,
            "purpose": args.purpose,
        },
        "openbao": {
            "mount": args.mount,
            "kv_path": args.kv_path,
            "fields": fields,
            "policy_name": policy_name,
            "policy_file": policy_file,
            "auth": {
                "method": args.auth_method,
                "mount": args.auth_mount,
                "role": args.auth_role,
                "bound_claims": bound_claims,
                "bound_claims_confirmed": args.bound_claims_confirmed,
                "policies": [policy_name],
                "ttl": args.ttl,
            },
        },
        "access_frontdoor": {
            "type": args.frontdoor_type,
            "catalog_id": args.catalog_id,
            "readiness": args.readiness,
            "resolvable": args.resolvable,
            "activation": "imported-existing-inventory",
        },
        "risk": {
            "classification": args.risk,
            "notes": ["Imported existing credential lane as non-secret CCR-backed inventory."],
        },
        "verification": {
            "positive": [args.positive_check],
            "negative": [args.negative_check],
            "activation_conditions": [
                "Existing policy/auth metadata confirmed without printing secret values.",
                "Existing secret value remains under approved OpenBao/operator custody.",
            ],
            "evidence": [],
        },
        "lifecycle": {
            "deactivate": "Disable the access front door and remove or detach auth role policy.",
            "rotate": "Replace the secret value through approved custody and re-run verification.",
            "compromised": "Immediately disable access, rotate value, record blast-radius notes, and open follow-up tasks.",
            "events": [],
        },
    }

    if args.auth_method == "oidc":
        auth = ccr["openbao"]["auth"]
        auth["allowed_redirect_uris"] = allowed_redirect_uris
        auth["oidc_scopes"] = ["openid", "profile", "email", "groups"]
        auth["user_claim"] = "sub"
        if "groups" in bound_claims:
            auth["groups_claim"] = "groups"
    if args.command:
        reject_secret_text(args.command, "inventory command")
        ccr["access_frontdoor"]["command"] = args.command
    if args.selector:
        reject_secret_text(args.selector, "inventory selector")
        ccr["access_frontdoor"]["selector"] = args.selector
    return ccr
def inventory_output_path(ccr: dict[str, Any], output_dir: str) -> Path:
    """Return ``<output_dir>/<id>-<slugified title>.yaml`` for ``ccr``.

    ``output_dir`` is resolved through ``resolve_repo_path``.
    """
    directory = resolve_repo_path(output_dir)
    return directory / f"{ccr['id']}-{slugify(ccr['title'])}.yaml"
def write_inventory_ccr(args: argparse.Namespace) -> Path:
    """Write an imported-inventory CCR file and validate the result.

    When ``args.write_policy`` is set and the policy file does not exist yet,
    a generated policy is written first. The CCR is then dumped to its output
    path and validated: warnings go to stderr, and any validation error ends
    the process with exit code 1.

    Returns:
        The path of the written CCR file.
    """
    ccr = inventory_ccr_from_args(args)
    target = inventory_output_path(ccr, args.output_dir)
    target.parent.mkdir(parents=True, exist_ok=True)

    policy_path = resolve_repo_path(ccr["openbao"]["policy_file"])
    if args.write_policy:
        policy_path.parent.mkdir(parents=True, exist_ok=True)
        # An existing policy file is never overwritten.
        if not policy_path.exists():
            policy_path.write_text(generated_policy_hcl(ccr), encoding="utf-8")

    dump_yaml(target, ccr)
    # validate_or_exit prints warnings, then errors, and exits 1 on errors.
    validate_or_exit(target)
    return target
def validate_or_exit(path: Path) -> tuple[dict[str, Any], list[str]]:
    """Validate a CCR file, exiting with code 1 if it has errors.

    Warnings are always printed to stderr as ``[WARN]`` lines; errors are
    printed as ``[FAIL]`` lines before ``SystemExit(1)`` is raised.

    Returns:
        ``(ccr, warnings)`` for a CCR without validation errors.
    """
    ccr, errors, warnings = validate_ccr(path)
    for note in warnings:
        print(f"[WARN] {path.name}: {note}", file=sys.stderr)
    if not errors:
        return ccr, warnings
    for problem in errors:
        print(f"[FAIL] {path.name}: {problem}", file=sys.stderr)
    raise SystemExit(1)
def apply_blockers(ccr: dict[str, Any]) -> list[str]:
    """List the reasons a CCR cannot be applied yet.

    A CCR whose status is already in ``POST_APPLY_STATUSES`` has no blockers.
    Otherwise the status must be in ``APPLY_ALLOWED_STATUSES`` and
    ``openbao.auth.bound_claims_confirmed`` must be exactly ``True``.
    """
    status = ccr.get("status")
    if status in POST_APPLY_STATUSES:
        return []

    reasons: list[str] = []
    if status not in APPLY_ALLOWED_STATUSES:
        reasons.append(f"apply requires status approved, got {status}")
    binding_confirmed = ccr["openbao"]["auth"].get("bound_claims_confirmed")
    if binding_confirmed is not True:
        reasons.append("apply requires confirmed OpenBao auth binding")
    return reasons
def frontdoor_blockers(ccr: dict[str, Any]) -> list[str]:
    """List the reasons the CCR's access front door is not resolvable.

    Three conditions must hold: CCR status is ``"active"``, front-door
    readiness is ``"ready"``, and ``resolvable`` is exactly ``True``.
    """
    frontdoor = ccr["access_frontdoor"]
    status = ccr.get("status")
    readiness = frontdoor.get("readiness")
    checks = (
        (status == "active", f"front door requires CCR status active, got {status}"),
        (readiness == "ready", f"front door readiness must be ready, got {readiness}"),
        (frontdoor.get("resolvable") is True, "front door is marked resolvable=false"),
    )
    return [message for satisfied, message in checks if not satisfied]
def status_payload(ccr: dict[str, Any], warnings: list[str]) -> dict[str, Any]:
    """Assemble the readiness status report for a CCR.

    The report combines apply and front-door blockers with selected OpenBao,
    front-door and State Hub metadata from the CCR.
    """
    apply_blocked_by = apply_blockers(ccr)
    frontdoor_blocked_by = frontdoor_blockers(ccr)
    frontdoor = ccr["access_frontdoor"]
    openbao = ccr["openbao"]
    auth = openbao["auth"]

    report: dict[str, Any] = {
        "id": ccr["id"],
        "title": ccr["title"],
        "status": ccr["status"],
        "request_type": ccr["request_type"],
        "apply_allowed": ccr.get("status") in APPLY_ALLOWED_STATUSES and not apply_blocked_by,
        "apply_complete": ccr.get("status") in POST_APPLY_STATUSES,
        "apply_blockers": apply_blocked_by,
        "frontdoor_resolvable": not frontdoor_blocked_by,
        "frontdoor_blockers": frontdoor_blocked_by,
        "warnings": warnings,
    }
    report["openbao"] = {
        "mount": openbao["mount"],
        "kv_path": openbao["kv_path"],
        "fields": openbao["fields"],
        "policy_name": openbao["policy_name"],
        "auth_mount": auth["mount"],
        "auth_method": auth["method"],
        "auth_role": auth["role"],
        "bound_claims_confirmed": auth.get("bound_claims_confirmed") is True,
    }
    report["access_frontdoor"] = {
        "type": frontdoor["type"],
        "catalog_id": frontdoor["catalog_id"],
        "readiness": frontdoor.get("readiness"),
        "resolvable": frontdoor.get("resolvable") is True,
        "command": frontdoor.get("command"),
    }
    hub = ccr.get("state_hub", {})
    report["state_hub"] = {
        key: hub.get(key)
        for key in ("decision_id", "decision_api_url", "decision_dashboard_url")
    }
    return report
def render_status(payload: dict[str, Any]) -> str:
    """Render a status payload as newline-joined, human-readable text.

    Optional lines (State Hub links, front-door command) and the bulleted
    blocker/warning sections are emitted only when they have content.
    """
    out = [
        f"CCR: {payload['id']} ({payload['status']})",
        f"Catalog: {payload['access_frontdoor']['catalog_id']}",
        f"Readiness: {payload['access_frontdoor']['readiness']}",
        f"Resolvable: {payload['frontdoor_resolvable']}",
        f"Apply allowed: {payload['apply_allowed']}",
        f"Apply complete: {payload.get('apply_complete') is True}",
    ]

    hub = payload.get("state_hub", {})
    optional_lines = (
        ("State Hub decision", hub.get("decision_api_url")),
        ("Decision dashboard", hub.get("decision_dashboard_url")),
        ("Command", payload["access_frontdoor"].get("command")),
    )
    for label, value in optional_lines:
        if value:
            out.append(f"{label}: {value}")

    sections = (
        ("Apply blockers:", "apply_blockers"),
        ("Front-door blockers:", "frontdoor_blockers"),
        ("Warnings:", "warnings"),
    )
    for heading, key in sections:
        entries = payload[key]
        if entries:
            out.append(heading)
            out.extend(f" - {entry}" for entry in entries)
    return "\n".join(out)
def append_decision(path: Path, status: str, reviewer: str, comment: str) -> dict[str, Any]:
    """Record a review decision in a CCR file and set its status.

    The comment is screened for secret text, the CCR is validated (exiting on
    errors), a review comment with ``decision=status`` is appended, and the
    file is rewritten with the new status and ``updated`` date.

    Returns:
        The updated CCR mapping.
    """
    reject_secret_text(comment, "review comment")
    ccr, _warnings = validate_or_exit(path)

    review_log = ccr.setdefault("review", {}).setdefault("comments", [])
    if not isinstance(review_log, list):
        fail("review.comments must be a list")
    entry = {
        "at": utc_now(),
        "reviewer": reviewer,
        "decision": status,
        "comment": comment,
    }
    review_log.append(entry)

    ccr["status"] = status
    ccr["updated"] = datetime.now(timezone.utc).date().isoformat()
    dump_yaml(path, ccr)
    return ccr
2026-06-27 23:45:31 +02:00
def confirm_binding(path: Path, reviewer: str, comment: str) -> None:
    """Mark a CCR's OpenBao auth binding as confirmed and save the file.

    Sets ``openbao.auth.bound_claims_confirmed`` to ``True`` and appends a
    ``binding_confirmed`` review comment. Validation errors are printed to
    stderr and end the process with exit code 1; warnings are not printed.
    """
    reject_secret_text(comment, "binding comment")
    ccr, errors, _warnings = validate_ccr(path)
    if errors:
        for problem in errors:
            print(f"[FAIL] {path.name}: {problem}", file=sys.stderr)
        raise SystemExit(1)

    ccr["openbao"]["auth"]["bound_claims_confirmed"] = True

    review_log = ccr.setdefault("review", {}).setdefault("comments", [])
    if not isinstance(review_log, list):
        fail("review.comments must be a list")
    entry = {
        "at": utc_now(),
        "reviewer": reviewer,
        "decision": "binding_confirmed",
        "comment": comment,
    }
    review_log.append(entry)

    ccr["updated"] = datetime.now(timezone.utc).date().isoformat()
    dump_yaml(path, ccr)
# (prefix, CCR status) pairs recognized at the start of a resolved State Hub
# decision rationale. ccr_status_from_state_hub_rationale() upper-cases the
# rationale and replaces "-" with "_" before comparing, so prefixes are
# written in upper case. A prefix matches when it is the whole rationale or
# is immediately followed by ":".
STATE_HUB_DECISION_PREFIXES = (
    ("NEEDS_CHANGES", "needs_changes"),
    ("NEEDS CHANGES", "needs_changes"),
    ("REQUEST CHANGES", "needs_changes"),
    ("APPROVE", "approved"),
    ("APPROVED", "approved"),
    ("DENY", "denied"),
    ("DENIED", "denied"),
    ("REJECT", "denied"),
    ("REJECTED", "denied"),
)
def state_hub_get_json(base_url: str, path: str) -> dict[str, Any]:
    """GET a JSON object from State Hub.

    Args:
        base_url: State Hub base URL; a trailing slash is ignored.
        path: Request path; a leading slash is ignored.

    Returns:
        The decoded JSON object.

    HTTP errors, other I/O errors, a body that is not valid JSON, and JSON
    that is not an object are all reported through ``fail``.
    """
    url = f"{base_url.rstrip('/')}/{path.lstrip('/')}"
    try:
        with urllib.request.urlopen(url, timeout=10) as response:
            raw = response.read()
    except urllib.error.HTTPError as exc:
        fail(f"State Hub GET {url} failed with HTTP {exc.code}")
    except OSError as exc:
        fail(f"State Hub GET {url} failed: {exc}")
    try:
        data = json.loads(raw)
    except ValueError as exc:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueError
        # subclasses; report them cleanly instead of letting a traceback out.
        fail(f"State Hub GET {url} returned invalid JSON: {exc}")
    if not isinstance(data, dict):
        fail(f"State Hub GET {url} returned non-object JSON")
    return data
def state_hub_post_json(base_url: str, path: str, payload: dict[str, Any]) -> dict[str, Any]:
    """POST ``payload`` as JSON to State Hub and return the JSON object reply.

    Args:
        base_url: State Hub base URL; a trailing slash is ignored.
        path: Request path; a leading slash is ignored.
        payload: Mapping sent as the UTF-8 encoded JSON request body.

    Returns:
        The decoded JSON object from the response.

    HTTP errors, other I/O errors, a body that is not valid JSON, and JSON
    that is not an object are all reported through ``fail``.
    """
    url = f"{base_url.rstrip('/')}/{path.lstrip('/')}"
    body = json.dumps(payload).encode("utf-8")
    request = urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    try:
        with urllib.request.urlopen(request, timeout=10) as response:
            raw = response.read()
    except urllib.error.HTTPError as exc:
        fail(f"State Hub POST {url} failed with HTTP {exc.code}")
    except OSError as exc:
        fail(f"State Hub POST {url} failed: {exc}")
    try:
        data = json.loads(raw)
    except ValueError as exc:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueError
        # subclasses; report them cleanly instead of letting a traceback out.
        fail(f"State Hub POST {url} returned invalid JSON: {exc}")
    if not isinstance(data, dict):
        fail(f"State Hub POST {url} returned non-object JSON")
    return data
def decision_template_context(ccr: dict[str, Any]) -> dict[str, str]:
    """Collect the CCR values substituted into decision templates.

    The decision link prefers ``state_hub.decision_api_url``, then
    ``state_hub.decision_dashboard_url``, and otherwise falls back to a
    placeholder string.
    """
    openbao = ccr["openbao"]
    auth = openbao["auth"]
    hub = ccr.get("state_hub", {})

    context = {
        "id": ccr["id"],
        "kv_path": openbao["kv_path"],
        "policy_name": openbao["policy_name"],
        "auth_role_path": f"auth/{auth['mount']}/role/{auth['role']}",
    }
    link = hub.get("decision_api_url")
    if not link:
        link = hub.get("decision_dashboard_url")
    if not link:
        link = "<link State Hub decision>"
    context["decision_link"] = link
    return context
def decision_templates(ccr: dict[str, Any] | None = None) -> dict[str, str]:
    """Return the approve/deny/needs_changes rationale templates.

    With a non-empty ``ccr`` the scope is filled from that CCR; without one,
    placeholder values are used.
    """
    if ccr:
        context = decision_template_context(ccr)
    else:
        context = {
            "id": "<CCR-ID>",
            "kv_path": "<platform/workloads/...>",
            "policy_name": "<workload-kv-read-...>",
            "auth_role_path": "auth/<mount>/role/<role>",
            "decision_link": "<link State Hub decision>",
        }
    scope = (
        f"{context['id']} path={context['kv_path']} "
        f"policy={context['policy_name']} auth_role={context['auth_role_path']}"
    )
    prefixes = (("approve", "APPROVE"), ("deny", "DENY"), ("needs_changes", "NEEDS_CHANGES"))
    return {
        key: f"{prefix}: {scope}; rationale=<non-secret reason>"
        for key, prefix in prefixes
    }
def render_decision_templates(ccr: dict[str, Any]) -> str:
    """Render the decision templates for ``ccr`` as a bulleted text block."""
    context = decision_template_context(ccr)
    templates = decision_templates(ccr)
    header = [
        f"CCR {context['id']} decision templates",
        f"Decision link: {context['decision_link']}",
        "Use one of these accepted prefixes exactly:",
    ]
    bullets = [f"- {templates[key]}" for key in ("approve", "deny", "needs_changes")]
    return "\n".join(header + bullets)
def invalid_decision_template_message(ccr: dict[str, Any] | None = None) -> str:
    """Return the error text listing the accepted decision templates."""
    templates = decision_templates(ccr)
    heading = (
        "resolved State Hub decision rationale must start with a recognized "
        "decision template:"
    )
    options = [f" {templates[key]}" for key in ("approve", "deny", "needs_changes")]
    return "\n".join([heading, *options])
def state_hub_decision_status(ccr: dict[str, Any], base_url: str) -> dict[str, Any]:
    """Fetch the State Hub decision linked from ``ccr``.

    A missing ``state_hub.decision_id`` is reported through ``fail``.
    """
    hub = ccr.get("state_hub", {})
    decision_id = hub.get("decision_id")
    if not decision_id:
        fail("CCR has no state_hub.decision_id")
    endpoint = f"/decisions/{decision_id}"
    return state_hub_get_json(base_url, endpoint)
def ccr_status_from_state_hub_rationale(
    rationale: str, ccr: dict[str, Any] | None = None
) -> str:
    """Map a State Hub decision rationale to a CCR status.

    The rationale is stripped, upper-cased and has ``-`` replaced by ``_``.
    It matches an entry of ``STATE_HUB_DECISION_PREFIXES`` when it equals the
    prefix or starts with the prefix followed by ``:``. With no match, the
    accepted templates are reported through ``fail``.
    """
    normalized = rationale.strip().upper().replace("-", "_")
    matched = next(
        (
            status
            for prefix, status in STATE_HUB_DECISION_PREFIXES
            if normalized == prefix or normalized.startswith(f"{prefix}:")
        ),
        None,
    )
    if matched is None:
        fail(invalid_decision_template_message(ccr))
    return matched
def sync_state_hub_decision(path: Path, base_url: str) -> dict[str, Any]:
    """Copy a resolved State Hub decision into the CCR file.

    An unresolved decision is returned unchanged and the file is left alone.
    For a resolved one, the rationale is mapped to a CCR status, recorded via
    ``append_decision``, and ``state_hub.decision_resolved_at`` is stored.

    Returns:
        The decision object fetched from State Hub.

    Raises:
        SystemExit: With code 1 when the CCR fails validation.
    """
    ccr, errors, warnings = validate_ccr(path)
    if errors:
        for problem in errors:
            print(f"[FAIL] {path.name}: {problem}", file=sys.stderr)
        raise SystemExit(1)
    for note in warnings:
        print(f"[WARN] {path.name}: {note}", file=sys.stderr)

    decision = state_hub_decision_status(ccr, base_url)
    if decision.get("status") != "resolved":
        return decision

    rationale = str(decision.get("rationale") or "")
    new_status = ccr_status_from_state_hub_rationale(rationale, ccr)
    decided_by = str(decision.get("decided_by") or "state-hub")
    comment = f"State Hub decision {decision['id']}: {rationale}"
    append_decision(path, new_status, decided_by, comment)

    # Reload the file append_decision just wrote before adding the timestamp.
    refreshed = load_yaml(path)
    refreshed.setdefault("state_hub", {})["decision_resolved_at"] = decision.get("decided_at")
    dump_yaml(path, refreshed)
    return decision
def command_validate(args: argparse.Namespace) -> int:
    """Validate the given CCR refs, or every ``*.y*ml`` file in the CCR dir.

    Prints ``[WARN]``/``[FAIL]`` lines to stderr and ``[OK]`` lines to stdout.

    Returns:
        0 when every file validated without errors, otherwise 1.
    """
    refs = args.refs
    if not refs:
        refs = [str(candidate) for candidate in sorted(ccr_dir().glob("*.y*ml"))]
    if not refs:
        fail(f"no CCR files found in {ccr_dir()}")

    any_failed = False
    for ref in refs:
        path = resolve_ccr(ref)
        _ccr, errors, warnings = validate_ccr(path)
        for note in warnings:
            print(f"[WARN] {path.name}: {note}", file=sys.stderr)
        if not errors:
            print(f"[OK] {path.name}")
            continue
        any_failed = True
        for problem in errors:
            print(f"[FAIL] {path.name}: {problem}", file=sys.stderr)
    return 1 if any_failed else 0
def command_render(args: argparse.Namespace) -> int:
    """Print the human review summary for the CCR named by ``args.ref``."""
    ccr, warnings = validate_or_exit(resolve_ccr(args.ref))
    summary = render_summary(ccr, warnings)
    print(summary)
    return 0
def command_plan(args: argparse.Namespace) -> int:
    """Print the generated apply plan for the CCR named by ``args.ref``."""
    ccr, _warnings = validate_or_exit(resolve_ccr(args.ref))
    plan_text = render_plan(ccr)
    print(plan_text)
    return 0
def command_decision_templates(args: argparse.Namespace) -> int:
    """Print the decision rationale templates for the CCR in ``args.ref``."""
    ccr, _warnings = validate_or_exit(resolve_ccr(args.ref))
    templates_text = render_decision_templates(ccr)
    print(templates_text)
    return 0
def command_status(args: argparse.Namespace) -> int:
    """Print the readiness status of a CCR as text or, with ``--json``, JSON.

    Returns:
        1 when the CCR has validation errors (printed to stderr), else 0.
        Validation warnings are carried inside the payload, not printed here.
    """
    path = resolve_ccr(args.ref)
    ccr, errors, warnings = validate_ccr(path)
    if errors:
        for problem in errors:
            print(f"[FAIL] {path.name}: {problem}", file=sys.stderr)
        return 1

    payload = status_payload(ccr, warnings)
    rendered = (
        json.dumps(payload, indent=2, sort_keys=True)
        if args.json
        else render_status(payload)
    )
    print(rendered)
    return 0
2026-06-28 00:21:02 +02:00
def require_apply_ready(ccr: dict[str, Any], command_name: str) -> None:
    """Report through ``fail`` unless the CCR is approved with a confirmed binding.

    ``command_name`` is only used to label the failure message.
    """
    status = ccr.get("status")
    if status not in APPLY_ALLOWED_STATUSES:
        fail(f"{command_name} requires status approved, got {status}")
    binding_confirmed = ccr["openbao"]["auth"].get("bound_claims_confirmed")
    if binding_confirmed is not True:
        fail(f"{command_name} requires openbao.auth.bound_claims_confirmed=true")
def command_apply_plan(args: argparse.Namespace) -> int:
    """Print the apply plan, but only for a CCR that is ready to apply."""
    ccr, _warnings = validate_or_exit(resolve_ccr(args.ref))
    require_apply_ready(ccr, "apply-plan")
    plan_text = render_plan(ccr)
    print(plan_text)
    return 0
2026-06-28 00:21:02 +02:00
def command_operator_commands(args: argparse.Namespace) -> int:
    """Print the operator commands for a CCR that is ready to apply."""
    ccr, _warnings = validate_or_exit(resolve_ccr(args.ref))
    require_apply_ready(ccr, "operator-commands")
    commands_text = render_operator_commands(ccr)
    print(commands_text)
    return 0
def command_applier_dry_run(args: argparse.Namespace) -> int:
    """Print the delegated-applier dry run as text or, with ``--json``, JSON.

    Returns:
        1 when applier readiness blockers exist (printed as ``[BLOCK]`` lines
        on stderr), otherwise 0.
    """
    path = resolve_ccr(args.ref)
    ccr, warnings = validate_or_exit(path)
    blockers = applier_readiness_blockers(ccr)
    if blockers:
        for reason in blockers:
            print(f"[BLOCK] {path.name}: {reason}", file=sys.stderr)
        return 1

    payload = applier_dry_run_payload(ccr, warnings)
    rendered = (
        json.dumps(payload, indent=2, sort_keys=True)
        if args.json
        else render_applier_dry_run(payload)
    )
    print(rendered)
    return 0
def command_applier_apply(args: argparse.Namespace) -> int:
    """Apply delegated OpenBao metadata for a CCR after confirmation.

    Flow: validate, stop on readiness blockers, print the plan (JSON, text,
    or nothing with ``--quiet``), stop here for ``--plan-only``, require the
    confirmation phrase, run the ``bao`` metadata apply, record evidence in
    the CCR, and optionally post a State Hub progress event.

    Returns:
        1 when readiness blockers exist, otherwise 0.
    """
    evidence_kind = "delegated_metadata_apply"
    evidence_result = "passed"

    path = resolve_ccr(args.ref)
    ccr, warnings = validate_or_exit(path)
    blockers = applier_readiness_blockers(ccr)
    if blockers:
        for reason in blockers:
            print(f"[BLOCK] {path.name}: {reason}", file=sys.stderr)
        return 1

    if args.json:
        print(json.dumps(applier_dry_run_payload(ccr, warnings), indent=2, sort_keys=True))
    elif not args.quiet:
        print(render_applier_apply_plan(ccr, warnings))
    if args.plan_only:
        return 0

    expected = applier_confirmation_phrase(ccr)
    # Prompt interactively only when --confirm was not given.
    phrase = args.confirm or input("Type delegated apply confirmation phrase: ")
    if phrase != expected:
        fail(f"confirmation phrase mismatch; expected {expected!r}")

    run_bao_metadata_apply(ccr, args.bao_bin)

    # Only an approved CCR moves to applied; other statuses are left as-is.
    new_status = "applied" if ccr.get("status") == "approved" else None
    details = delegated_apply_details(ccr, args.actor)
    ccr = append_evidence(
        path,
        args.actor,
        evidence_kind,
        evidence_result,
        details,
        set_status=new_status,
    )
    print(f"[OK] {path.name} delegated metadata apply recorded")

    if args.record_state_hub:
        event = record_evidence_state_hub(
            ccr,
            args.state_hub_url,
            args.actor,
            evidence_kind,
            evidence_result,
            details,
        )
        print(f"[OK] State Hub progress event {event.get('id', '<unknown>')}")
    return 0
def command_runbook(args: argparse.Namespace) -> int:
    """Render the operator runbook and optionally execute the metadata apply.

    The runbook is always printed (JSON with ``--json``). With
    ``--execute-metadata`` the CCR must be ready to apply and the confirmation
    phrase must match; the ``bao`` metadata apply then runs, evidence is
    recorded with status ``applied``, and a State Hub progress event is
    posted when requested.

    Returns:
        1 when runbook readiness blockers exist, otherwise 0.
    """
    path = resolve_ccr(args.ref)
    ccr, warnings = validate_or_exit(path)
    blockers = runbook_readiness_blockers(ccr)
    if blockers:
        for reason in blockers:
            print(f"[BLOCK] {path.name}: {reason}", file=sys.stderr)
        return 1

    payload = runbook_payload(ccr, warnings)
    rendered = (
        json.dumps(payload, indent=2, sort_keys=True)
        if args.json
        else render_runbook(payload)
    )
    print(rendered)

    if not args.execute_metadata:
        return 0

    require_apply_ready(ccr, "runbook --execute-metadata")
    # Prompt interactively only when --confirm was not given.
    phrase = args.confirm or input("Type final confirmation phrase: ")
    expected = runbook_confirmation_phrase(ccr)
    if phrase != expected:
        fail(f"confirmation phrase mismatch; expected {expected!r}")

    run_bao_metadata_apply(ccr, args.bao_bin)
    details = ["OpenBao policy and auth-role metadata apply completed without secret values"]
    ccr = append_evidence(
        path,
        args.actor,
        "metadata_apply",
        "passed",
        details,
        set_status="applied",
    )
    print(f"[OK] {path.name} metadata applied and evidence recorded")

    if args.record_state_hub:
        event = record_evidence_state_hub(
            ccr, args.state_hub_url, args.actor, "metadata_apply", "passed", details
        )
        print(f"[OK] State Hub progress event {event.get('id', '<unknown>')}")
    return 0
def command_record_evidence(args: argparse.Namespace) -> int:
    """Record an evidence entry in a CCR and optionally report it to State Hub."""
    path = resolve_ccr(args.ref)
    actor, kind, result, details = args.actor, args.kind, args.result, args.detail
    ccr = append_evidence(
        path,
        actor,
        kind,
        result,
        details,
        set_status=args.status,
        frontdoor_ready=args.frontdoor_ready,
    )
    print(f"[OK] {path.name} evidence {kind}/{result} recorded")
    if not args.record_state_hub:
        return 0
    event = record_evidence_state_hub(ccr, args.state_hub_url, actor, kind, result, details)
    print(f"[OK] State Hub progress event {event.get('id', '<unknown>')}")
    return 0
def command_decision(args: argparse.Namespace, status: str) -> int:
    """Record a review decision with the given ``status`` for a CCR.

    With ``--record-state-hub`` a ``credential_change_decision`` progress
    event summarizing the CCR scope and the comment is also posted.
    """
    path = resolve_ccr(args.ref)
    ccr = append_decision(path, status, args.reviewer, args.comment)
    print(f"[OK] {path.name} -> {status}")
    if not args.record_state_hub:
        return 0

    openbao = ccr["openbao"]
    auth = openbao["auth"]
    summary = (
        f"CCR {ccr['id']} decision {status} by {args.reviewer}: "
        f"path={openbao['kv_path']} policy={openbao['policy_name']} "
        f"fields={','.join(openbao['fields'])} "
        f"auth_role=auth/{auth['mount']}/role/{auth['role']}; "
        f"{args.comment}"
    )
    progress_event = {
        "summary": summary,
        "event_type": "credential_change_decision",
        "author": args.reviewer,
    }
    event = state_hub_post_json(args.state_hub_url, "/progress/", progress_event)
    print(f"[OK] State Hub progress event {event.get('id', '<unknown>')}")
    return 0
2026-06-27 23:45:31 +02:00
def command_confirm_binding(args: argparse.Namespace) -> int:
    """Confirm the OpenBao auth binding of the CCR named by ``args.ref``."""
    ccr_path = resolve_ccr(args.ref)
    confirm_binding(ccr_path, args.reviewer, args.comment)
    print(f"[OK] {ccr_path.name} -> binding_confirmed")
    return 0
def command_lifecycle_plan(args: argparse.Namespace) -> int:
    """Print the lifecycle plan for ``args.action`` as text or JSON."""
    ccr, _warnings = validate_or_exit(resolve_ccr(args.ref))
    payload = lifecycle_payload(ccr, args.action)
    rendered = (
        json.dumps(payload, indent=2, sort_keys=True)
        if args.json
        else render_lifecycle_plan(payload)
    )
    print(rendered)
    return 0
def command_lifecycle_event(args: argparse.Namespace) -> int:
    """Record a lifecycle event for a CCR and optionally report it to State Hub."""
    path = resolve_ccr(args.ref)
    actor, action, reason, details = args.actor, args.action, args.reason, args.detail
    ccr = append_lifecycle_event(
        path,
        actor,
        action,
        reason,
        details,
        blast_radius=args.blast_radius,
        follow_up=args.follow_up,
    )
    print(f"[OK] {path.name} lifecycle {action} -> {ccr['status']}")
    if not args.record_state_hub:
        return 0
    event = record_lifecycle_state_hub(ccr, args.state_hub_url, actor, action, reason, details)
    print(f"[OK] State Hub progress event {event.get('id', '<unknown>')}")
    return 0
def command_import_inventory(args: argparse.Namespace) -> int:
    """Write an inventory CCR from ``args`` and print where it was saved."""
    written = write_inventory_ccr(args)
    shown = display_repo_path(written)
    print(f"[OK] inventory CCR written: {shown}")
    return 0
def command_sync_decision(args: argparse.Namespace) -> int:
    """Sync a CCR with its State Hub decision and print the outcome.

    Prints ``[OK]`` when the decision was resolved (and therefore synced), or
    ``[WAIT]`` with the current decision status otherwise. Always returns 0.
    """
    path = resolve_ccr(args.ref)
    decision = sync_state_hub_decision(path, args.state_hub_url)
    if decision.get("status") != "resolved":
        print(
            f"[WAIT] State Hub decision {decision['id']} is {decision.get('status')}; "
            "resolve it before syncing CCR status"
        )
        return 0
    print(f"[OK] {path.name} <- State Hub decision {decision['id']}")
    return 0
def build_parser() -> argparse.ArgumentParser:
    """Construct the command-line parser for the credential-change tool.

    Each subcommand stores its handler under the ``func`` default, which is
    what the entry point calls after parsing. Subcommands and their options
    are registered in a fixed order because argparse lists them in
    registration order in its help output.

    Returns:
        The fully configured top-level ``ArgumentParser``.
    """
    root = argparse.ArgumentParser(
        description="Validate, render, and review non-secret credential change requests."
    )
    commands = root.add_subparsers(dest="command", required=True)

    status_choices = sorted(ALLOWED_STATUSES)
    lifecycle_choices = sorted(LIFECYCLE_ACTIONS)

    def register(
        name: str,
        handler: Any,
        summary: str,
        *,
        takes_ref: bool = True,
    ) -> argparse.ArgumentParser:
        """Create subcommand *name*, bind *handler* as ``func``, return its parser."""
        cmd = commands.add_parser(name, help=summary)
        if takes_ref:
            # The CCR reference is always the first positional argument.
            cmd.add_argument("ref")
        cmd.set_defaults(func=handler)
        return cmd

    def add_state_hub_options(
        target: argparse.ArgumentParser,
        *,
        with_record_flag: bool = True,
    ) -> None:
        """Attach ``--state-hub-url`` and, optionally, ``--record-state-hub``."""
        if with_record_flag:
            target.add_argument("--record-state-hub", action="store_true")
        target.add_argument(
            "--state-hub-url",
            default=os.environ.get("STATE_HUB_URL", "http://127.0.0.1:8000"),
        )

    # --- read-only review commands -------------------------------------
    validate_cmd = register(
        "validate", command_validate, "Validate CCR files", takes_ref=False
    )
    validate_cmd.add_argument("refs", nargs="*")

    register("render", command_render, "Render a human review summary")
    register("plan", command_plan, "Render the generated apply plan for review")
    register(
        "decision-templates",
        command_decision_templates,
        "Render State Hub/chat decision rationale templates",
    )

    status_cmd = register(
        "status", command_status, "Render machine-readable readiness status"
    )
    status_cmd.add_argument("--json", action="store_true")

    register(
        "apply-plan",
        command_apply_plan,
        "Render an operator apply plan only for approved CCRs",
    )
    register(
        "operator-commands",
        command_operator_commands,
        "Render reviewed non-secret OpenBao commands for an approved CCR",
    )

    # --- delegated applier ---------------------------------------------
    dry_run_cmd = register(
        "applier-dry-run",
        command_applier_dry_run,
        "Validate and render delegated OpenBao metadata mutations",
    )
    dry_run_cmd.add_argument("--json", action="store_true")

    apply_cmd = register(
        "applier-apply",
        command_applier_apply,
        "Apply delegated OpenBao metadata after dry-run guardrails",
    )
    apply_cmd.add_argument(
        "--actor", default=os.environ.get("USER", "delegated-applier")
    )
    apply_cmd.add_argument("--confirm")
    apply_cmd.add_argument("--bao-bin", default=os.environ.get("BAO_BIN", "bao"))
    for switch in ("--plan-only", "--json", "--quiet"):
        apply_cmd.add_argument(switch, action="store_true")
    add_state_hub_options(apply_cmd)

    # --- attended operator runbook -------------------------------------
    runbook_cmd = register(
        "runbook",
        command_runbook,
        "Render or execute the attended operator apply/verify runbook",
    )
    runbook_cmd.add_argument("--json", action="store_true")
    runbook_cmd.add_argument("--execute-metadata", action="store_true")
    runbook_cmd.add_argument("--actor", default=os.environ.get("USER", "operator"))
    runbook_cmd.add_argument("--confirm")
    runbook_cmd.add_argument("--bao-bin", default=os.environ.get("BAO_BIN", "bao"))
    add_state_hub_options(runbook_cmd)

    # --- evidence recording --------------------------------------------
    evidence_cmd = register(
        "record-evidence",
        command_record_evidence,
        "Append non-secret apply/verification evidence to a CCR",
    )
    for flag in ("--actor", "--kind", "--result"):
        evidence_cmd.add_argument(flag, required=True)
    evidence_cmd.add_argument("--detail", action="append", required=True)
    evidence_cmd.add_argument("--status", choices=status_choices)
    evidence_cmd.add_argument("--frontdoor-ready", action="store_true")
    add_state_hub_options(evidence_cmd)

    # --- lifecycle (deactivate / rotate / compromise) ------------------
    lifecycle_plan_cmd = register(
        "lifecycle-plan",
        command_lifecycle_plan,
        "Render deactivation, rotation, or compromise lifecycle guidance",
    )
    lifecycle_plan_cmd.add_argument(
        "--action", choices=lifecycle_choices, required=True
    )
    lifecycle_plan_cmd.add_argument("--json", action="store_true")

    lifecycle_event_cmd = register(
        "lifecycle-event",
        command_lifecycle_event,
        "Record a non-secret deactivation, rotation, or compromise event",
    )
    lifecycle_event_cmd.add_argument(
        "--action", choices=lifecycle_choices, required=True
    )
    lifecycle_event_cmd.add_argument("--actor", required=True)
    lifecycle_event_cmd.add_argument("--reason", required=True)
    lifecycle_event_cmd.add_argument("--detail", action="append", required=True)
    lifecycle_event_cmd.add_argument("--blast-radius", action="append", default=[])
    lifecycle_event_cmd.add_argument("--follow-up", action="append", default=[])
    add_state_hub_options(lifecycle_event_cmd)

    # --- inventory import ----------------------------------------------
    inv = register(
        "import-inventory",
        command_import_inventory,
        "Create a non-secret CCR for an existing credential lane",
        takes_ref=False,
    )
    inv.add_argument("id")
    for flag in ("--title", "--tenant", "--workload", "--environment", "--purpose"):
        inv.add_argument(flag, required=True)
    inv.add_argument("--mount", default="platform")
    inv.add_argument("--kv-path", required=True)
    inv.add_argument("--field", action="append", required=True)
    inv.add_argument("--policy-name")
    inv.add_argument("--policy-file")
    inv.add_argument("--auth-method", choices=("oidc", "kubernetes"), required=True)
    inv.add_argument("--auth-mount", required=True)
    inv.add_argument("--auth-role", required=True)
    inv.add_argument("--bound-claim", action="append", default=[])
    for flag in ("--redirect-uri", "--service-account", "--service-account-namespace"):
        inv.add_argument(flag, action="append")
    inv.add_argument("--bound-claims-confirmed", action="store_true")
    inv.add_argument("--ttl", default="15m")
    inv.add_argument("--frontdoor-type", required=True)
    inv.add_argument("--catalog-id", required=True)
    inv.add_argument("--selector")
    # NOTE(review): this option's dest is ``command``, the same dest used by
    # the subparser selector above. Dispatch goes through ``func``, but
    # confirm nothing expects ``args.command`` to hold the subcommand name.
    inv.add_argument("--command")
    inv.add_argument("--status", choices=status_choices, default="active")
    inv.add_argument(
        "--readiness", choices=sorted(FRONTDOOR_READINESS), default="ready"
    )
    inv.add_argument("--resolvable", action="store_true")
    inv.add_argument("--risk", default="high")
    inv.add_argument(
        "--positive-check",
        default="Authorized caller can fetch the named field without printing the value.",
    )
    inv.add_argument(
        "--negative-check",
        default="Unauthorized caller cannot read the path or field.",
    )
    inv.add_argument("--requester-agent", default="codex")
    inv.add_argument("--actor", default=os.environ.get("USER", "operator"))
    inv.add_argument("--reason", required=True)
    inv.add_argument("--output-dir", default=str(DEFAULT_CCR_DIR))
    inv.add_argument(
        "--write-policy", action=argparse.BooleanOptionalAction, default=True
    )

    # --- review decisions ----------------------------------------------
    decision_kinds = (
        ("approve", "approved"),
        ("deny", "denied"),
        ("needs-changes", "needs_changes"),
    )
    for verb, outcome in decision_kinds:
        verdict_cmd = register(
            verb,
            # Bind the outcome as a default so each lambda keeps its own value
            # instead of the loop's final one.
            lambda args, outcome=outcome: command_decision(args, outcome),
            f"Record {outcome} decision",
        )
        verdict_cmd.add_argument("--reviewer", required=True)
        verdict_cmd.add_argument("--comment", required=True)
        add_state_hub_options(verdict_cmd)

    binding_cmd = register(
        "confirm-binding",
        command_confirm_binding,
        "Record that the non-secret OpenBao auth binding was confirmed",
    )
    binding_cmd.add_argument("--reviewer", required=True)
    binding_cmd.add_argument("--comment", required=True)

    sync_cmd = register(
        "sync-decision",
        command_sync_decision,
        "Sync an approved/denied/needs_changes CCR decision from State Hub",
    )
    add_state_hub_options(sync_cmd, with_record_flag=False)

    return root
def main(argv: list[str] | None = None) -> int:
    """Parse command-line arguments and run the selected subcommand.

    Args:
        argv: Argument strings to parse, excluding the program name. When
            ``None`` (the default), argparse reads ``sys.argv[1:]``, so
            existing zero-argument callers behave as before. Passing a list
            lets the CLI be invoked programmatically.

    Returns:
        The handler's result coerced to ``int``, for use as the exit status.
    """
    parser = build_parser()
    args = parser.parse_args(argv)
    # Every subcommand registers its handler under ``func``.
    return int(args.func(args))
# Script entry point: exit with the status code returned by main().
if __name__ == "__main__":
    sys.exit(main())