Complete Workflow Example

A complete, production-ready example of an agent workflow using TraceMem.

Overview

This example demonstrates a complete workflow: reading customer data and incidents, evaluating a discount policy, requesting approval if needed, and creating an order.

Example Code

This example demonstrates a complete workflow: reading customer data and incidents, evaluating a discount policy, requesting approval if needed, and creating an order.

text
#!/usr/bin/env python3
"""
Complete agent workflow example: Incident-based discount order processing.
"""

import os
import sys
import json
import time
import requests
from typing import Dict, Any, Optional

class MCPClient:
    """Simple MCP client for Agent MCP server."""
    
    def __init__(self, base_url: str, api_key: str):
        self.base_url = base_url
        self.api_key = api_key
        self.session = requests.Session()
        self.session.headers.update({
            "Authorization": f"Agent {api_key}",
            "Content-Type": "application/json"
        })
        self.request_id = 0
        self.initialized = False
    
    def _next_id(self) -> int:
        self.request_id += 1
        return self.request_id
    
    def _call(self, method: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        request = {
            "jsonrpc": "2.0",
            "id": self._next_id(),
            "method": method,
            "params": params or {}
        }
        response = self.session.post(self.base_url, json=request)
        response.raise_for_status()
        result = response.json()
        if "error" in result:
            error = result["error"]
            raise Exception(f"MCP Error {error['code']}: {error['message']}")
        return result.get("result", {})
    
    def initialize(self) -> Dict[str, Any]:
        result = self._call("initialize", {
            "protocolVersion": "2024-11-05",
            "capabilities": {},
            "clientInfo": {"name": "discount-agent", "version": "1.0.0"}
        })
        self.initialized = True
        return result
    
    def call_tool(self, name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
        if not self.initialized:
            self.initialize()
        
        result = self._call("tools/call", {
            "name": name,
            "arguments": arguments
        })
        
        if "content" in result and result["content"]:
            content = result["content"][0]
            if content.get("type") == "text":
                try:
                    return json.loads(content["text"])
                except json.JSONDecodeError:
                    return {"raw_text": content["text"]}
        return result


def process_discount_order(customer_id: str, proposed_discount: float):
    """Process a discount order with approval workflow."""
    
    mcp_url = os.getenv("MCP_AGENT_URL", "https://mcp.tracemem.com")
    api_key = os.getenv("TRACEMEM_API_KEY")
    instance = os.getenv("TRACEMEM_INSTANCE", "my-agent-v1")
    
    if not api_key:
        raise Exception("TRACEMEM_API_KEY environment variable is required")
    
    client = MCPClient(mcp_url, api_key)
    client.initialize()
    
    decision_id = None
    try:
        # 1. Create decision envelope
        decision = client.call_tool("decision_create", {
            "intent": "customer.order.discount.evaluate",
            "automation_mode": "propose",
            "instance": instance,
            "metadata": {
                "customer_id": customer_id,
                "proposed_discount": str(proposed_discount)
            }
        })
        decision_id = decision["decision_id"]
        print(f"Created decision: {decision_id}")
        
        # 2. Read customer data
        customer = client.call_tool("decision_read", {
            "decision_id": decision_id,
            "product": "customers_v1",
            "purpose": "renewal_context",
            "query": {"id": customer_id}
        })
        customer_data = customer.get("records", [])
        if not customer_data:
            raise Exception("Customer not found")
        
        # 3. Read customer incidents (for justification)
        incidents = client.call_tool("decision_read", {
            "decision_id": decision_id,
            "product": "incidents_v1",
            "purpose": "renewal_context",
            "query": {"customer_id": customer_id}
        })
        incidents_data = incidents.get("records", [])
        critical_incidents = [
            i for i in incidents_data 
            if i.get("severity_level") == 1 and i.get("status") in ["open", "in_progress"]
        ]
        
        # 4. Evaluate discount policy
        justification = ""
        if critical_incidents:
            justification = f"Customer has {len(critical_incidents)} critical open incidents."
        
        policy_result = client.call_tool("decision_evaluate", {
            "decision_id": decision_id,
            "policy_id": "discount_cap_v1",
            "inputs": {
                "proposed_discount": proposed_discount,
                "justification": justification
            }
        })
        
        outcome = policy_result.get("outcome", "unknown")
        print(f"Policy outcome: {outcome}")
        
        # 5. Handle policy outcome
        if outcome == "deny":
            print("Discount denied by policy")
            client.call_tool("decision_close", {
                "decision_id": decision_id,
                "action": "rollback"
            })
            return False
        
        elif outcome == "requires_exception":
            print("Approval required")
            
            # Build approval message
            message = f"Customer {customer_id} requesting {proposed_discount*100:.0f}% discount.\n\n"
            if critical_incidents:
                message += f"JUSTIFICATION: Customer has {len(critical_incidents)} critical open incidents.\n"
                for inc in critical_incidents:
                    message += f"  • {inc.get('title')}\n"
            
            # Request approval
            approval = client.call_tool("decision_request_approval", {
                "decision_id": decision_id,
                "title": f"{proposed_discount*100:.0f}% Discount Approval",
                "message": message,
                "require_rationale": True,
                "expires_in_seconds": 3600
            })
            
            # Poll for approval
            max_wait = 300
            waited = 0
            while waited < max_wait:
                time.sleep(5)
                decision_status = client.call_tool("decision_get", {
                    "decision_id": decision_id
                })
                
                status = decision_status.get("status", "unknown")
                if status == "approved":
                    print("Approval granted!")
                    break
                elif status == "rejected":
                    print("Approval rejected")
                    client.call_tool("decision_close", {
                        "decision_id": decision_id,
                        "action": "rollback"
                    })
                    return False
                
                waited += 5
            
            if waited >= max_wait:
                print("Approval timeout")
                client.call_tool("decision_close", {
                    "decision_id": decision_id,
                    "action": "rollback"
                })
                return False
        
        # 6. Write order (if allowed/approved)
        write_result = client.call_tool("decision_write", {
            "decision_id": decision_id,
            "product": "orders_v1",
            "purpose": "order_creation",
            "mutation": {
                "operation": "insert",
                "records": [{
                    "customer_id": customer_id,
                    "discount": proposed_discount,
                    "status": "pending"
                }]
            }
        })
        print(f"Order created: {write_result.get('event_id')}")
        
        # 7. Close decision
        client.call_tool("decision_close", {
            "decision_id": decision_id,
            "action": "commit"
        })
        print("Decision committed")
        
        return True
        
    except Exception as e:
        if decision_id:
            client.call_tool("decision_close", {
                "decision_id": decision_id,
                "action": "rollback"
            })
        raise


if __name__ == "__main__":
    import sys
    customer_id = sys.argv[1] if len(sys.argv) > 1 else "1002"
    discount = float(sys.argv[2]) if len(sys.argv) > 2 else 0.20
    
    process_discount_order(customer_id, discount)

Walkthrough

This example demonstrates:

  1. Creating a decision envelope - Wraps the entire discount evaluation in a traceable decision
  2. Reading customer data - Accesses customer information through a governed data product
  3. Reading incidents - Gathers context (critical incidents) to justify the discount
  4. Evaluating policy - Checks if the discount is allowed based on policy rules
  5. Handling outcomes:
    • If allow: Proceeds directly to create the order
    • If requires_exception: Requests approval and polls for status
    • If deny: Rolls back the decision
  6. Writing data - Creates the order if allowed or approved
  7. Closing the decision - Commits or rolls back based on outcome

Key Concepts

[Content to be filled]

Error Handling

[Content to be filled]

TraceMem is trace-native infrastructure for AI agents