Event Lineage Workflow - From Incident to Full Event Chain
Question: How can I use the Cyberhaven API to regularly retrieve multiple incidents and gather the complete lineage of events that led to each one?
Answer Summary: The Cyberhaven APIs are designed so that common workflows, such as gathering information for an incident investigation, can be automated. The descriptions and example code below outline a proposed structure for the complete workflow of tracing incidents from initial detection through their full event lineage. The suggested design follows a pattern of regular scheduled execution with timestamp filtering and batch processing.
Important Note: The example Python code snippets have not been tested in a real environment and are provided only to illustrate the structure; do not rely on them to deliver working functionality.
API Workflow Overview
At a high level, the workflow for processing multiple incidents and their event lineages is: authenticate, retrieve incidents with timestamp filtering, extract the lineage boundaries from each incident, retrieve the complete event chain, fetch detailed event information in batches, and generate an analysis report.
Overview
This guide demonstrates how to use the Cyberhaven API to:
- Authenticate and obtain access tokens for API calls
- Retrieve incidents from the platform using timestamp filters for regular execution
- Process multiple incidents in batch for efficient analysis
- Extract event lineage information from each incident
- Get complete event lineage chains for forensic analysis
- Retrieve detailed information for all events in each lineage
- Schedule regular execution for continuous monitoring
This workflow is designed for regular scheduled execution (hourly, daily, or custom intervals) and is essential for:
- Continuous incident monitoring and automated analysis
- Forensic investigation of security events
- Compliance reporting with complete audit trails
- Risk assessment through pattern analysis
- Automated alerting based on event characteristics
Scheduling Recommendations
For optimal performance and coverage, consider these scheduling options:
- High-volume environments: Every 15-30 minutes
- Standard environments: Every 1-2 hours
- Low-volume environments: Every 4-6 hours
- Compliance-focused: Daily with comprehensive reporting
The script uses timestamp filtering to ensure only new incidents since the last run are processed, preventing duplicate analysis.
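If incidents can arrive close to a run boundary (for example because of ingestion lag or clock skew), one option is to widen the stored timestamp by a small overlap window before using it as a filter. The sketch below is illustrative only: the overlap value is an assumption to tune for your environment, and de-duplicating incidents seen by two consecutive runs is left to the caller.
from datetime import datetime, timedelta
from typing import Optional

OVERLAP_MINUTES = 5  # assumed overlap window; tune for your environment

def effective_start_time(last_run_iso: Optional[str]) -> Optional[str]:
    """Return the lower-bound timestamp for incident filtering, widened by a small overlap."""
    if not last_run_iso:
        return None  # first run: no lower bound
    last_run = datetime.fromisoformat(last_run_iso)
    return (last_run - timedelta(minutes=OVERLAP_MINUTES)).isoformat()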
Prerequisites
- Valid Cyberhaven API key
- Python environment with the requests library
- Access to the Cyberhaven API endpoints
- File system access for storing last run timestamps (for scheduled execution)
Step 1: Authentication and Configuration
Purpose: This section establishes the connection to the Cyberhaven API using your API key. The authentication process uses the /v2/auth/token/access endpoint to exchange your long-lived API key for a short-lived access token (15-minute expiry).
API Route: POST /v2/auth/token/access
Data Flow: API Key → Access Token → Used in all subsequent API calls
import requests
import json
import os
import time
from datetime import datetime, timedelta, timezone
from typing import List, Dict, Optional, Any
from collections import defaultdict
# Configuration for scheduled execution
CYBERHAVEN_HOST = "your-cyberhaven-hostname" # Replace with your hostname
API_KEY = "your-api-key" # Replace with your API key
LAST_RUN_FILE = "last_incident_run.txt" # File to track last execution timestamp
RESULTS_DIR = "incident_analysis_results" # Directory to store analysis results
# Rate limiting configuration - 10 queries per second per endpoint
RATE_LIMIT_QPS = 10 # Queries per second
RATE_LIMIT_DELAY = 1.0 / RATE_LIMIT_QPS # 0.1 seconds between requests
class RateLimiter:
"""
Rate limiter to ensure API calls don't exceed 10 queries per second per endpoint.
This class tracks the timing of API calls for each endpoint and enforces
a delay between requests to stay within the rate limits.
"""
def __init__(self, qps: float = RATE_LIMIT_QPS):
self.qps = qps
self.delay = 1.0 / qps
self.last_call_times = defaultdict(float)
def wait_if_needed(self, endpoint: str) -> None:
"""
Wait if necessary to respect rate limits for the given endpoint.
Args:
endpoint: The API endpoint being called (e.g., 'auth', 'incidents', 'lineage', 'events')
"""
current_time = time.time()
last_call = self.last_call_times[endpoint]
time_since_last = current_time - last_call
if time_since_last < self.delay:
sleep_time = self.delay - time_since_last
print(f" Rate limiting: waiting {sleep_time:.3f}s for {endpoint} endpoint")
time.sleep(sleep_time)
self.last_call_times[endpoint] = time.time()
# Global rate limiter instance
rate_limiter = RateLimiter()
def get_access_token(api_key: str) -> str:
"""
Authenticate with Cyberhaven API and obtain access token.
This function calls the authentication endpoint to exchange the API key
for a short-lived access token that will be used for all subsequent API calls.
Rate limited to 10 QPS.
Args:
api_key: Your Cyberhaven API key
Returns:
Access token string for API authentication
Raises:
Exception: If authentication fails
"""
rate_limiter.wait_if_needed('auth')
url = f"https://{CYBERHAVEN_HOST}/public/v2/auth/token/access"
headers = {
'Content-Type': 'application/json'
}
data = {
"refresh_token": api_key
}
response = requests.post(url, headers=headers, json=data)
if response.status_code == 200:
return response.json().get('access_token')
else:
raise Exception(f"Failed to get access token: {response.status_code} - {response.text}")
def get_last_run_timestamp() -> Optional[str]:
"""
Retrieve the timestamp of the last successful script execution.
This enables incremental processing by only fetching incidents that occurred
since the last run, preventing duplicate analysis and improving efficiency.
Returns:
ISO timestamp string of last run, or None if first execution
"""
if os.path.exists(LAST_RUN_FILE):
with open(LAST_RUN_FILE, 'r') as f:
return f.read().strip()
return None
def save_last_run_timestamp(timestamp: str) -> None:
"""
Save the current execution timestamp for the next run.
Args:
timestamp: ISO timestamp string to save
"""
with open(LAST_RUN_FILE, 'w') as f:
f.write(timestamp)
# Initialize authentication
print("Step 1: Authenticating with Cyberhaven API...")
access_token = get_access_token(API_KEY)
print(f"✓ Access token obtained: {access_token[:20]}...")
# Get last run timestamp for incremental processing
last_run = get_last_run_timestamp()
current_run = datetime.now(timezone.utc).isoformat()
print(f"✓ Last run: {last_run or 'First execution'}")
print(f"✓ Current run: {current_run}")
Step 2: Retrieve Incidents with Timestamp Filtering
Purpose: This section queries the Cyberhaven platform for incidents using the /v2/incidents/list endpoint. It implements timestamp filtering to only process new incidents since the last execution, making it suitable for regular scheduled runs.
API Route: POST /v2/incidents/list
Data Flow: Access Token + Filters → List of Incident Objects → Extracted for processing
Key Features:
- Timestamp filtering for incremental processing
- Configurable severity and status filters
- Pagination support for large result sets
- Sorting by trigger time (newest first)
def list_incidents_with_timestamp_filter(access_token: str,
last_run_timestamp: Optional[str] = None,
additional_filters: Optional[Dict] = None,
page_size: int = 100) -> List[Dict]:
"""
Retrieve incidents from Cyberhaven with timestamp filtering for scheduled execution.
This function queries the incidents endpoint with filters to only retrieve
incidents that have occurred since the last script execution. This enables
efficient incremental processing for regular scheduled runs.
Rate limited to 10 QPS.
Args:
access_token: Valid API access token
last_run_timestamp: ISO timestamp of last execution (None for first run)
additional_filters: Optional additional filters (severity, status, etc.)
page_size: Number of incidents to retrieve per page
Returns:
List of incident dictionaries
"""
rate_limiter.wait_if_needed('incidents')
url = f"https://{CYBERHAVEN_HOST}/public/v2/incidents/list"
headers = {
'Authorization': f'Bearer {access_token}',
'Content-Type': 'application/json'
}
# Build filters for the request
filters = {}
# Add timestamp filter if this is not the first run
if last_run_timestamp:
filters["start_time"] = last_run_timestamp
print(f" Filtering incidents since: {last_run_timestamp}")
# Add additional filters (severity, status, etc.)
if additional_filters:
filters.update(additional_filters)
print(f" Additional filters applied: {additional_filters}")
# Request body for the incidents API
data = {
"page_request": {
"size": page_size,
"sort_by": "trigger_time desc" # Most recent incidents first
},
"include_ai_summaries": True # Include AI analysis if available
}
# Add filters to request if any are specified
if filters:
data["filter"] = filters
response = requests.post(url, headers=headers, json=data)
if response.status_code == 200:
response_data = response.json()
incidents = response_data.get('resources', [])
total_count = response_data.get('page_response', {}).get('total', len(incidents))
print(f" Retrieved {len(incidents)} incidents (Total available: {total_count})")
return incidents
else:
raise Exception(f"Failed to list incidents: {response.status_code} - {response.text}")
# Configure filters for incident retrieval
print("\nStep 2: Retrieving incidents with timestamp filtering...")
# Example filters - customize based on your requirements
incident_filters = {
"policy_severities": ["high", "critical"], # Focus on high-priority incidents
"resolution_statuses": ["open"], # Only unresolved incidents
# "created_by": ["policy"], # Only policy-created incidents (not AI)
# "dataset_sensitivities": ["high", "critical"] # High-sensitivity data only
}
# Retrieve incidents since last run
incidents = list_incidents_with_timestamp_filter(
access_token=access_token,
last_run_timestamp=last_run,
additional_filters=incident_filters,
page_size=100
)
print(f"✓ Found {len(incidents)} new incidents to process")
# Display summary of retrieved incidents
if incidents:
print("\nIncident Summary:")
for i, incident in enumerate(incidents[:10]): # Show first 10
print(f" {i+1}. ID: {incident['id'][:20]}...")
print(f" User: {incident.get('user', {}).get('local_username', 'Unknown')}")
print(f" Policy: {incident.get('policy', {}).get('name', 'Unknown')}")
print(f" Trigger Time: {incident.get('trigger_time', 'Unknown')}")
print(f" Risk Score: {incident.get('risk_score', 'Unknown')}")
print(f" Dataset: {incident.get('dataset', {}).get('name', 'Unknown')}")
if len(incidents) > 10:
print(f" ... and {len(incidents) - 10} more incidents")
else:
print(" No new incidents found since last run")
Step 3: Process Multiple Incidents - Extract Event Lineage Information
Purpose: This section processes each incident retrieved from Step 2 to extract the event lineage identifiers. Each incident contains a start_event_id and an end_event_id, which define the boundaries of the event chain that led to the incident.
Data Flow: Incident Objects → Event Lineage IDs → Prepared for lineage retrieval
Key Information Extracted:
- Incident metadata (ID, user, policy, dataset, risk score)
- Event lineage boundaries (start and end event IDs)
- Contextual information for analysis
def extract_event_lineage_info(incident: Dict) -> Dict:
"""
Extract event lineage information and metadata from an incident.
This function parses the incident object to extract the event lineage
identifiers and relevant metadata needed for the complete analysis.
Args:
incident: Incident dictionary from the API response
Returns:
Dictionary containing lineage info and incident metadata
"""
event_lineage = incident.get('event_lineage_id', {})
start_event_id = event_lineage.get('start_event_id')
end_event_id = event_lineage.get('end_event_id')
return {
'incident_id': incident.get('id'),
'start_event_id': start_event_id,
'end_event_id': end_event_id,
'user': incident.get('user', {}).get('local_username'),
'trigger_time': incident.get('trigger_time'),
'policy_name': incident.get('policy', {}).get('name'),
'dataset_name': incident.get('dataset', {}).get('name'),
'risk_score': incident.get('risk_score'),
'blocked': incident.get('blocked', False),
'ai_severity': incident.get('ai_severity'),
'created_by': incident.get('created_by')
}
# Process all incidents to extract lineage information
print("\nStep 3: Processing incidents to extract event lineage information...")
incident_lineage_data = []
incidents_with_lineage = 0
for i, incident in enumerate(incidents):
lineage_info = extract_event_lineage_info(incident)
incident_lineage_data.append(lineage_info)
# Check if incident has valid lineage data
if lineage_info['start_event_id'] and lineage_info['end_event_id']:
incidents_with_lineage += 1
print(f" ✓ Incident {i+1}: {lineage_info['incident_id'][:20]}... has lineage data")
print(f" User: {lineage_info['user']}")
print(f" Policy: {lineage_info['policy_name']}")
print(f" Risk Score: {lineage_info['risk_score']}")
print(f" Event Range: {lineage_info['start_event_id'][:8]}...→{lineage_info['end_event_id'][:8]}...")
else:
print(f" ⚠ Incident {i+1}: {lineage_info['incident_id'][:20]}... missing lineage data")
print(f"\n✓ Processed {len(incidents)} incidents")
print(f"✓ {incidents_with_lineage} incidents have complete lineage data")
Step 4: Retrieve Complete Event Lineages
Purpose: For each incident with valid lineage data, this section calls the /v2/event-lineage endpoint to retrieve the complete chain of event IDs between the start and end events. This provides the full sequence of events that led to the incident.
API Route: POST /v2/event-lineage
Data Flow: Start/End Event IDs → Complete Event ID Chain → Prepared for detailed retrieval
def get_event_lineage(access_token: str, start_event_id: str, end_event_id: str) -> List[str]:
"""
Retrieve the complete event lineage between two events.
This function calls the event lineage endpoint to get the full chain
of events that occurred between the start and end events, providing
the complete audit trail for the incident.
Rate limited to 10 QPS.
Args:
access_token: Valid API access token
start_event_id: ID of the first event in the lineage
end_event_id: ID of the last event in the lineage
Returns:
List of event IDs in chronological order
"""
rate_limiter.wait_if_needed('lineage')
url = f"https://{CYBERHAVEN_HOST}/public/v2/event-lineage"
headers = {
'Authorization': f'Bearer {access_token}',
'Content-Type': 'application/json'
}
data = {
"start_event_id": start_event_id,
"end_event_id": end_event_id
}
response = requests.post(url, headers=headers, json=data)
if response.status_code == 200:
return response.json().get('resources', [])
else:
raise Exception(f"Failed to get event lineage: {response.status_code} - {response.text}")
# Retrieve event lineages for all incidents with valid lineage data
print("\nStep 4: Retrieving complete event lineages...")
all_incident_lineages = []
for i, lineage_info in enumerate(incident_lineage_data):
if lineage_info['start_event_id'] and lineage_info['end_event_id']:
try:
print(f" Processing incident {i+1}: {lineage_info['incident_id'][:20]}...")
event_ids = get_event_lineage(
access_token,
lineage_info['start_event_id'],
lineage_info['end_event_id']
)
lineage_info['event_ids'] = event_ids
lineage_info['event_count'] = len(event_ids)
all_incident_lineages.append(lineage_info)
print(f" ✓ Retrieved {len(event_ids)} events in lineage")
except Exception as e:
print(f" ✗ Failed to get lineage: {str(e)}")
lineage_info['event_ids'] = []
lineage_info['event_count'] = 0
else:
print(f" Skipping incident {i+1}: No lineage data available")
print(f"\n✓ Successfully retrieved lineages for {len(all_incident_lineages)} incidents")
total_events = sum(lineage['event_count'] for lineage in all_incident_lineages)
print(f"✓ Total events to analyze: {total_events}")
Step 5: Retrieve Detailed Event Information
Purpose: This section uses the /v2/event-details endpoint to retrieve comprehensive information for all events in all lineages. It processes events in batches for efficiency and extracts key forensic information from each event.
API Route: POST /v2/event-details
Data Flow: Event ID Lists → Detailed Event Objects → Analyzed Event Data
def get_event_details_batch(access_token: str, event_ids: List[str]) -> List[Dict]:
"""
Retrieve detailed information for multiple events in a single API call.
This function efficiently retrieves comprehensive event data including
actions, users, source/destination information, and content details.
Args:
access_token: Valid API access token
event_ids: List of event IDs to retrieve details for
Returns:
List of detailed event dictionaries
"""
url = f"https://{CYBERHAVEN_HOST}/public/v2/event-details"
headers = {
'Authorization': f'Bearer {access_token}',
'Content-Type': 'application/json'
}
data = {
"ids": event_ids
}
response = requests.post(url, headers=headers, json=data)
if response.status_code == 200:
return response.json().get('resources', [])
else:
raise Exception(f"Failed to get event details: {response.status_code} - {response.text}")
def analyze_event(event: Dict) -> Dict:
"""
Extract and structure key information from an event for analysis.
This function parses the complex event structure to extract the most
relevant information for incident investigation and forensic analysis.
Args:
event: Raw event dictionary from the API
Returns:
Structured dictionary with key event information
"""
action = event.get('action', {})
user = event.get('user', {})
source = event.get('source', {})
destination = event.get('destination', {})
return {
'event_id': event.get('id'),
'timestamp': event.get('timestamp'),
'action_kind': action.get('kind'),
'blocked': action.get('blocked', False),
'data_size': action.get('data_size', 0),
'hostname': action.get('hostname'),
'sensor_kind': action.get('sensor_kind'),
'user_name': user.get('local_username'),
'user_id': user.get('id'),
'source_file': source.get('file', {}).get('name') if source else None,
'source_path': source.get('local_file', {}).get('path') if source else None,
'dest_file': destination.get('file', {}).get('name') if destination else None,
'dest_path': destination.get('local_file', {}).get('path') if destination else None,
'dest_web_domain': destination.get('web', {}).get('domain') if destination else None,
'dest_web_url': destination.get('web', {}).get('url') if destination else None,
'content_tags': source.get('content', {}).get('tags', []) if source else [],
'email_to': destination.get('email', {}).get('to', []) if destination else [],
'email_subject': destination.get('email', {}).get('subject') if destination else None
}
# Process all incidents to retrieve detailed event information
print("\nStep 5: Retrieving detailed event information...")
# Create results directory if it doesn't exist
os.makedirs(RESULTS_DIR, exist_ok=True)
all_analysis_results = []
batch_size = 50 # Process events in batches to avoid API limits
for incident_idx, lineage_info in enumerate(all_incident_lineages):
incident_id = lineage_info['incident_id']
event_ids = lineage_info['event_ids']
if not event_ids:
continue
print(f"\n Processing incident {incident_idx + 1}: {incident_id[:20]}...")
print(f" Retrieving details for {len(event_ids)} events...")
# Process events in batches
all_events = []
for i in range(0, len(event_ids), batch_size):
batch_ids = event_ids[i:i + batch_size]
try:
batch_events = get_event_details_batch(access_token, batch_ids)
all_events.extend(batch_events)
print(f" ✓ Retrieved batch {i/batch_size + 1}: {len(batch_events)} events")
except Exception as e:
print(f" ✗ Failed to retrieve batch {i/batch_size + 1}: {str(e)}")
# Analyze all events for this incident
analyzed_events = [analyze_event(event) for event in all_events]
incident_analysis = {
'incident_info': lineage_info,
'events': analyzed_events,
'analysis_timestamp': current_run,
'total_events': len(analyzed_events),
'blocked_events': sum(1 for e in analyzed_events if e['blocked']),
'total_data_size': sum(e['data_size'] for e in analyzed_events),
'unique_users': len(set(e['user_name'] for e in analyzed_events if e['user_name'])),
'unique_hostnames': len(set(e['hostname'] for e in analyzed_events if e['hostname'])),
'action_types': list(set(e['action_kind'] for e in analyzed_events if e['action_kind']))
}
all_analysis_results.append(incident_analysis)
print(f" ✓ Analyzed {len(analyzed_events)} events")
print(f" - Blocked events: {incident_analysis['blocked_events']}")
print(f" - Total data size: {incident_analysis['total_data_size']:,} bytes")
print(f" - Unique users: {incident_analysis['unique_users']}")
print(f" - Action types: {', '.join(incident_analysis['action_types'][:5])}")
print(f"\n✓ Completed analysis of {len(all_analysis_results)} incidents")
Step 6: Generate Analysis Report and Save Results
Purpose: This final section generates a comprehensive analysis report, saves detailed results to files, and updates the last run timestamp for the next scheduled execution.
Data Flow: Analyzed Event Data → Summary Report → Saved Results → Updated Timestamp
def generate_summary_report(analysis_results: List[Dict]) -> Dict:
"""
Generate a comprehensive summary report of all analyzed incidents.
Args:
analysis_results: List of incident analysis dictionaries
Returns:
Summary report dictionary
"""
if not analysis_results:
return {"message": "No incidents analyzed"}
total_incidents = len(analysis_results)
total_events = sum(result['total_events'] for result in analysis_results)
total_blocked = sum(result['blocked_events'] for result in analysis_results)
total_data_size = sum(result['total_data_size'] for result in analysis_results)
# Aggregate statistics
all_users = set()
all_hostnames = set()
all_action_types = set()
high_risk_incidents = []
for result in analysis_results:
incident_info = result['incident_info']
# Collect unique values
for event in result['events']:
if event['user_name']:
all_users.add(event['user_name'])
if event['hostname']:
all_hostnames.add(event['hostname'])
if event['action_kind']:
all_action_types.add(event['action_kind'])
# Identify high-risk incidents
if (incident_info.get('risk_score', 0) > 7.0 or
result['blocked_events'] > 0 or
incident_info.get('policy_name', '').lower().find('critical') != -1):
high_risk_incidents.append({
'incident_id': incident_info['incident_id'],
'risk_score': incident_info.get('risk_score'),
'user': incident_info.get('user'),
'policy': incident_info.get('policy_name'),
'blocked_events': result['blocked_events'],
'total_events': result['total_events']
})
return {
'execution_summary': {
'execution_time': current_run,
'last_run_time': last_run,
'total_incidents_analyzed': total_incidents,
'total_events_analyzed': total_events,
'total_blocked_events': total_blocked,
'total_data_size_bytes': total_data_size
},
'aggregated_statistics': {
'unique_users': len(all_users),
'unique_hostnames': len(all_hostnames),
'unique_action_types': len(all_action_types),
'action_types': sorted(list(all_action_types)),
'high_risk_incidents_count': len(high_risk_incidents)
},
'high_risk_incidents': high_risk_incidents[:10], # Top 10 high-risk incidents
'recommendations': generate_recommendations(analysis_results)
}
def generate_recommendations(analysis_results: List[Dict]) -> List[str]:
"""Generate actionable recommendations based on analysis results."""
recommendations = []
if not analysis_results:
return ["No incidents to analyze - system appears secure"]
total_blocked = sum(result['blocked_events'] for result in analysis_results)
total_events = sum(result['total_events'] for result in analysis_results)
if total_blocked > 0:
recommendations.append(f"Review {total_blocked} blocked events for potential policy adjustments")
# Check for patterns in user behavior
user_incident_count = {}
for result in analysis_results:
user = result['incident_info'].get('user')
if user:
user_incident_count[user] = user_incident_count.get(user, 0) + 1
repeat_users = [user for user, count in user_incident_count.items() if count > 1]
if repeat_users:
recommendations.append(f"Consider additional training for users with multiple incidents: {', '.join(repeat_users[:5])}")
# Check for high-risk patterns
high_risk_count = sum(1 for result in analysis_results
if result['incident_info'].get('risk_score', 0) > 7.0)
if high_risk_count > len(analysis_results) * 0.3:
recommendations.append("High percentage of high-risk incidents detected - review policy sensitivity settings")
return recommendations
# Generate comprehensive analysis report
print("\nStep 6: Generating analysis report and saving results...")
summary_report = generate_summary_report(all_analysis_results)
# Display summary to console
print("\n" + "="*80)
print("INCIDENT LINEAGE ANALYSIS SUMMARY")
print("="*80)
exec_summary = summary_report.get('execution_summary', {})
print(f"Execution Time: {exec_summary.get('execution_time')}")
print(f"Last Run: {exec_summary.get('last_run_time', 'First execution')}")
print(f"Incidents Analyzed: {exec_summary.get('total_incidents_analyzed', 0)}")
print(f"Total Events: {exec_summary.get('total_events_analyzed', 0)}")
print(f"Blocked Events: {exec_summary.get('total_blocked_events', 0)}")
print(f"Total Data Size: {exec_summary.get('total_data_size_bytes', 0):,} bytes")
agg_stats = summary_report.get('aggregated_statistics', {})
print(f"\nUnique Users: {agg_stats.get('unique_users', 0)}")
print(f"Unique Hostnames: {agg_stats.get('unique_hostnames', 0)}")
print(f"Action Types: {', '.join(agg_stats.get('action_types', [])[:10])}")
high_risk = summary_report.get('high_risk_incidents', [])
if high_risk:
print(f"\nHigh-Risk Incidents ({len(high_risk)}):")
for incident in high_risk[:5]:
print(f" - {incident['incident_id'][:20]}... (Risk: {incident['risk_score']}, User: {incident['user']})")
recommendations = summary_report.get('recommendations', [])
if recommendations:
print(f"\nRecommendations:")
for i, rec in enumerate(recommendations, 1):
print(f" {i}. {rec}")
# Save detailed results to files
timestamp_str = datetime.now().strftime("%Y%m%d_%H%M%S")
# Save summary report
summary_file = os.path.join(RESULTS_DIR, f"incident_analysis_summary_{timestamp_str}.json")
with open(summary_file, 'w') as f:
json.dump(summary_report, f, indent=2, default=str)
print(f"\n✓ Summary report saved: {summary_file}")
# Save detailed results
details_file = os.path.join(RESULTS_DIR, f"incident_analysis_details_{timestamp_str}.json")
with open(details_file, 'w') as f:
json.dump(all_analysis_results, f, indent=2, default=str)
print(f"✓ Detailed results saved: {details_file}")
# Update last run timestamp for next execution
save_last_run_timestamp(current_run)
print(f"✓ Last run timestamp updated: {current_run}")
print("\n" + "="*80)
print("EXECUTION COMPLETED SUCCESSFULLY")
print("="*80)
print(f"Next scheduled run will process incidents after: {current_run}")
Complete Scheduled Workflow Function
Purpose: This function combines all the steps above into a single executable workflow designed for scheduled execution. It includes error handling and logging, and continues past per-incident failures so a single bad incident does not abort the run.
def scheduled_incident_lineage_analysis(api_key: str,
cyberhaven_host: str,
incident_filters: Optional[Dict] = None,
max_incidents: int = 50) -> Dict:
"""
Complete scheduled workflow for incident lineage analysis.
This function implements the full workflow for regular execution:
1. Authentication and timestamp management
2. Incident retrieval with filtering
3. Event lineage extraction
4. Detailed event analysis
5. Report generation and result storage
Args:
api_key: Cyberhaven API key
cyberhaven_host: Cyberhaven hostname
incident_filters: Optional filters for incident retrieval
max_incidents: Maximum number of incidents to process per run
Returns:
Dictionary containing execution summary and results
"""
global CYBERHAVEN_HOST
CYBERHAVEN_HOST = cyberhaven_host
execution_start = datetime.now(timezone.utc)
results = {
'execution_start': execution_start.isoformat(),
'success': False,
'incidents_processed': 0,
'events_analyzed': 0,
'errors': []
}
try:
print(f"Starting scheduled incident lineage analysis at {execution_start}")
# Step 1: Authentication
access_token = get_access_token(api_key)
last_run = get_last_run_timestamp()
current_run = execution_start.isoformat()
# Step 2: Retrieve incidents
incidents = list_incidents_with_timestamp_filter(
access_token=access_token,
last_run_timestamp=last_run,
additional_filters=incident_filters,
page_size=min(max_incidents, 100)
)
if not incidents:
results['message'] = "No new incidents found"
results['success'] = True
save_last_run_timestamp(current_run)
return results
# Limit incidents if needed
if len(incidents) > max_incidents:
incidents = incidents[:max_incidents]
print(f"Limited processing to {max_incidents} incidents")
# Step 3: Extract lineage information
incident_lineage_data = []
for incident in incidents:
lineage_info = extract_event_lineage_info(incident)
incident_lineage_data.append(lineage_info)
# Step 4: Retrieve event lineages
all_incident_lineages = []
for lineage_info in incident_lineage_data:
if lineage_info['start_event_id'] and lineage_info['end_event_id']:
try:
event_ids = get_event_lineage(
access_token,
lineage_info['start_event_id'],
lineage_info['end_event_id']
)
lineage_info['event_ids'] = event_ids
lineage_info['event_count'] = len(event_ids)
all_incident_lineages.append(lineage_info)
except Exception as e:
results['errors'].append(f"Failed to get lineage for incident {lineage_info['incident_id']}: {str(e)}")
# Step 5: Retrieve detailed event information
all_analysis_results = []
batch_size = 50
for lineage_info in all_incident_lineages:
if not lineage_info.get('event_ids'):
continue
try:
# Process events in batches
all_events = []
event_ids = lineage_info['event_ids']
for i in range(0, len(event_ids), batch_size):
batch_ids = event_ids[i:i + batch_size]
batch_events = get_event_details_batch(access_token, batch_ids)
all_events.extend(batch_events)
# Analyze events
analyzed_events = [analyze_event(event) for event in all_events]
incident_analysis = {
'incident_info': lineage_info,
'events': analyzed_events,
'analysis_timestamp': current_run,
'total_events': len(analyzed_events),
'blocked_events': sum(1 for e in analyzed_events if e['blocked']),
'total_data_size': sum(e['data_size'] for e in analyzed_events),
'unique_users': len(set(e['user_name'] for e in analyzed_events if e['user_name'])),
'unique_hostnames': len(set(e['hostname'] for e in analyzed_events if e['hostname'])),
'action_types': list(set(e['action_kind'] for e in analyzed_events if e['action_kind']))
}
all_analysis_results.append(incident_analysis)
except Exception as e:
results['errors'].append(f"Failed to analyze incident {lineage_info['incident_id']}: {str(e)}")
# Step 6: Generate report and save results
summary_report = generate_summary_report(all_analysis_results)
# Save results
os.makedirs(RESULTS_DIR, exist_ok=True)
timestamp_str = execution_start.strftime("%Y%m%d_%H%M%S")
summary_file = os.path.join(RESULTS_DIR, f"incident_analysis_summary_{timestamp_str}.json")
with open(summary_file, 'w') as f:
json.dump(summary_report, f, indent=2, default=str)
details_file = os.path.join(RESULTS_DIR, f"incident_analysis_details_{timestamp_str}.json")
with open(details_file, 'w') as f:
json.dump(all_analysis_results, f, indent=2, default=str)
# Update results
results.update({
'success': True,
'incidents_processed': len(all_analysis_results),
'events_analyzed': sum(r['total_events'] for r in all_analysis_results),
'summary_file': summary_file,
'details_file': details_file,
'summary_report': summary_report
})
# Update last run timestamp
save_last_run_timestamp(current_run)
execution_end = datetime.now(timezone.utc)
results['execution_end'] = execution_end.isoformat()
results['execution_duration'] = str(execution_end - execution_start)
print(f"Analysis completed successfully in {results['execution_duration']}")
print(f"Processed {results['incidents_processed']} incidents with {results['events_analyzed']} total events")
return results
except Exception as e:
results['errors'].append(f"Critical error: {str(e)}")
results['success'] = False
print(f"Critical error during execution: {str(e)}")
return results
# Example usage for scheduled execution
if __name__ == "__main__":
# Configuration
API_KEY = "your-api-key-here"
CYBERHAVEN_HOST = "your-cyberhaven-hostname"
# Filters for incident retrieval
filters = {
"policy_severities": ["high", "critical"],
"resolution_statuses": ["open"]
}
# Run the complete scheduled analysis
result = scheduled_incident_lineage_analysis(
api_key=API_KEY,
cyberhaven_host=CYBERHAVEN_HOST,
incident_filters=filters,
max_incidents=25 # Limit for performance
)
# Display execution summary
if result['success']:
print(f"\n✅ Execution successful!")
print(f"📊 Incidents processed: {result['incidents_processed']}")
print(f"🔍 Events analyzed: {result['events_analyzed']}")
print(f"⏱️ Duration: {result.get('execution_duration', 'Unknown')}")
if result.get('summary_report', {}).get('high_risk_incidents'):
high_risk_count = len(result['summary_report']['high_risk_incidents'])
print(f"⚠️ High-risk incidents found: {high_risk_count}")
else:
print(f"\n❌ Execution failed!")
for error in result['errors']:
print(f" Error: {error}")
Scheduling Implementation Examples
Cron Job (Linux/macOS)
# Run every 2 hours
0 */2 * * * /usr/bin/python3 /path/to/incident_lineage_analysis.py >> /var/log/cyberhaven_analysis.log 2>&1
# Run daily at 6 AM
0 6 * * * /usr/bin/python3 /path/to/incident_lineage_analysis.py >> /var/log/cyberhaven_analysis.log 2>&1
Windows Task Scheduler
# Create a scheduled task to run every 4 hours
schtasks /create /tn "Cyberhaven Incident Analysis" /tr "python C:\path\to\incident_lineage_analysis.py" /sc hourly /mo 4
Python Scheduler (for development/testing)
import schedule
import time
def run_analysis():
"""Wrapper function for scheduled execution"""
try:
result = scheduled_incident_lineage_analysis(
api_key=API_KEY,
cyberhaven_host=CYBERHAVEN_HOST,
incident_filters={"policy_severities": ["high", "critical"]},
max_incidents=25
)
print(f"Scheduled run completed: {result['success']}")
except Exception as e:
print(f"Scheduled run failed: {str(e)}")
# Schedule the analysis
schedule.every(2).hours.do(run_analysis) # Every 2 hours
# schedule.every().day.at("06:00").do(run_analysis) # Daily at 6 AM
print("Scheduler started. Press Ctrl+C to stop.")
while True:
schedule.run_pending()
time.sleep(60) # Check every minute
Error Handling and Best Practices
Rate Limiting
The API enforces rate limits (the examples above assume 10 queries per second per endpoint). In addition to the proactive RateLimiter, implement retry logic with exponential backoff so that transient failures are retried rather than aborting the run:
import time
import random
def api_request_with_retry(func, max_retries=3, base_delay=1):
"""
Execute API request with retry logic
"""
for attempt in range(max_retries):
try:
return func()
except Exception as e:
if attempt == max_retries - 1:
raise e
delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
print(f"Request failed, retrying in {delay:.2f} seconds...")
time.sleep(delay)
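As a usage sketch, the incident listing from Step 2 can be wrapped in a lambda and passed to the retry helper; all function and variable names here are the ones defined earlier in this guide.
# Retry the incident listing up to 3 times with exponential backoff on transient failures
incidents = api_request_with_retry(
    lambda: list_incidents_with_timestamp_filter(
        access_token=access_token,
        last_run_timestamp=last_run,
        additional_filters={"policy_severities": ["high", "critical"]},
    ),
    max_retries=3,
    base_delay=1,
)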
Token Refresh
Access tokens expire after 15 minutes. Implement automatic refresh:
class CyberhavenAPIClient:
def __init__(self, api_key, hostname):
self.api_key = api_key
self.hostname = hostname
self.access_token = None
self.token_expires_at = None
def get_valid_token(self):
"""
Get a valid access token, refreshing if necessary
"""
if (not self.access_token or
not self.token_expires_at or
datetime.now() >= self.token_expires_at):
self.access_token = get_access_token(self.api_key)
# Tokens expire in 15 minutes, refresh after 14 minutes
self.token_expires_at = datetime.now() + timedelta(minutes=14)
return self.access_token
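A minimal usage sketch, assuming the client is used alongside the helpers defined earlier: call get_valid_token() immediately before each request so that long-running batch loops never hold an expired token.
client = CyberhavenAPIClient(api_key=API_KEY, hostname=CYBERHAVEN_HOST)

for i in range(0, len(event_ids), 50):
    batch_ids = event_ids[i:i + 50]
    token = client.get_valid_token()  # refreshed automatically once the 14-minute window has elapsed
    events = get_event_details_batch(token, batch_ids)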
Use Cases
This workflow is particularly useful for:
- Incident Investigation: Understanding the complete chain of events that led to a security incident
- Forensic Analysis: Tracing data movement and user actions across multiple events
- Compliance Reporting: Documenting the complete audit trail for regulatory requirements
- Risk Assessment: Analyzing patterns in event sequences to identify risk factors
- Policy Tuning: Understanding event chains to improve policy effectiveness
Conclusion
This workflow allows you to start with any incident in your Cyberhaven environment and trace it back through its complete event lineage. The Python examples outline a structure that you can extend and customize for your specific investigation needs; as noted above, they are untested and intended as a starting point only.
Remember to:
- Handle API rate limits appropriately
- Refresh access tokens before they expire
- Implement proper error handling for production use
- Consider pagination for large datasets (see the sketch below)
- Store sensitive information like API keys securely
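On pagination: the listing call in Step 2 retrieves a single page of results. If a run can return more incidents than one page holds, a pagination loop is needed. This is a minimal sketch only; the page_token and next_page_token field names below are assumptions for illustration and should be checked against the Cyberhaven API reference before use.
def list_all_incidents(access_token: str, filters: Optional[Dict] = None, page_size: int = 100) -> List[Dict]:
    """Illustrative pagination loop; the 'page_token' / 'next_page_token' fields are assumed names."""
    url = f"https://{CYBERHAVEN_HOST}/public/v2/incidents/list"
    headers = {'Authorization': f'Bearer {access_token}', 'Content-Type': 'application/json'}
    incidents: List[Dict] = []
    page_token = None
    while True:
        rate_limiter.wait_if_needed('incidents')
        body = {"page_request": {"size": page_size, "sort_by": "trigger_time desc"}}
        if page_token:
            body["page_request"]["page_token"] = page_token  # assumed field name
        if filters:
            body["filter"] = filters
        response = requests.post(url, headers=headers, json=body)
        response.raise_for_status()
        data = response.json()
        incidents.extend(data.get('resources', []))
        page_token = data.get('page_response', {}).get('next_page_token')  # assumed field name
        if not page_token:
            break
    return incidents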