AI Log Analyzer Architecture
Developer Deep Dive: The Hybrid AI Log Analyzer Architecture
The AI-powered Investigation View introduces a powerful capability for automated, large-scale log analysis. Unlike other LLM features that operate on-demand on a small selection of events, the Log Analyzer is designed to process all active timelines in a sketch, which can involve millions of events. This requirement necessitates a robust, asynchronous, and streaming-native architecture.
This guide provides a technical deep dive into the components that power this feature and outlines the API contract required to connect your own compatible log reasoning agent.
Architectural Overview: A Hybrid Approach
To handle potentially long-running analysis tasks without timing out web requests, we've implemented a hybrid architecture that combines Timesketch's existing Analyzer Framework with the new LLM Feature and Provider system.
This approach gives us the best of both worlds:
* Asynchronous Execution: The analysis is triggered as a Celery background task, preventing API timeouts and allowing the user to continue working while the analysis runs.
* Decoupling and Reusability: The core logic is encapsulated in the LLM Feature and Provider, separating it from the task queue system.
* Scalability: The architecture is built on streaming data (NDJSON), ensuring that memory usage remains low on both the Timesketch server and the AI agent, regardless of the number of events.
The three key components are:
- The Analyzer (llm_log_analyzer): The trigger and Celery task wrapper.
- The Feature (LogAnalyzer): The orchestrator of the entire workflow.
- The Provider (SecGeminiLogAnalyzer): The communicator that handles the streaming connection to the external AI agent.
Component Breakdown
1. The Analyzer: Triggering the Workflow
The LLMLogAnalyzer is a lightweight BaseAnalyzer plugin. Its sole responsibility is to initiate the analysis process on a Celery worker.
- File: timesketch/lib/analyzers/llm_log_analyzer.py
    # A simplified view of the analyzer's run method
    class LLMLogAnalyzer(interface.BaseAnalyzer):
        NAME = "llm_log_analyzer"
        # ...

        def run(self):
            # 1. Get the feature instance
            feature_instance = feature_manager.FeatureManager.get_feature_instance(
                "log_analyzer"
            )
            # 2. Get the LLM provider instance
            llm_provider = llm_provider_manager.LLMManager.create_provider(
                feature_name=feature_instance.NAME
            )
            # 3. Call the feature's execute method to start the real work
            result = feature_instance.execute(
                sketch=self.sketch.sql_sketch,
                form={},
                llm_provider=llm_provider,
            )
            # 4. Summarize the result and update the analyzer status in the UI
            total = result.get("total_findings_processed", 0)
            self.output.result_summary = (
                f"Log Analyzer finished. Processed {total} findings."
            )
            return str(self.output)
When a user clicks "Generate Initial Report", this analyzer is queued. It runs in the background, instantiates the necessary LLM feature and provider, and then calls the execute method to kick off the main logic.
2. The Feature: Orchestrating the Data Flow
The LogAnalyzer feature is the heart of the operation. It manages the entire process from data extraction to result processing.
- File: timesketch/lib/llms/features/log_analyzer.py
Its execute method orchestrates the following steps:
- Prepare Log Stream: It calls prepare_log_stream_for_analysis, which creates a Python generator that yields events one by one from all active timelines in the sketch using the OpenSearch sliced_scroll API. This ensures data is streamed efficiently from the datastore.
- Invoke Provider: It passes this log generator to the LLM provider's generate_stream_from_logs method.
- Process Findings Stream: It receives a generator of "findings" back from the provider and iterates through each finding as it arrives.
- Commit to Database: For each finding, it calls its own process_response method, which creates and commits the necessary InvestigativeQuestion and InvestigativeQuestionConclusion objects to the PostgreSQL database.
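The data flow described above can be sketched with plain Python generators. This is a minimal illustration, not the actual feature code: iter_events, run_pipeline, fake_provider, and the commit callback are hypothetical stand-ins for the OpenSearch scroll, the execute method, the LLM provider, and process_response respectively. Only the total_findings_processed key is taken from the analyzer shown earlier.

```python
from typing import Any, Callable, Dict, Iterable, Iterator

Event = Dict[str, Any]
Finding = Dict[str, Any]


def iter_events(timelines: Iterable[Iterable[Event]]) -> Iterator[Event]:
    """Yield events lazily, one timeline after another (stand-in for the scroll)."""
    for timeline in timelines:
        yield from timeline


def run_pipeline(
    timelines: Iterable[Iterable[Event]],
    provider: Callable[[Iterator[Event]], Iterable[Finding]],
    commit: Callable[[Finding], None],
) -> Dict[str, int]:
    """Stream events to the provider and commit each finding as it arrives."""
    processed = 0
    for finding in provider(iter_events(timelines)):
        commit(finding)  # stand-in for the database write
        processed += 1
    return {"total_findings_processed": processed}


def fake_provider(events: Iterator[Event]) -> Iterator[Finding]:
    """Hypothetical provider: one finding per event that mentions 'evil.exe'."""
    for event in events:
        message = event["_source"]["message"]
        if "evil.exe" in message:
            yield {
                "record_id": event["_id"],
                "annotations": [
                    {
                        "investigative_question": "Which suspicious processes ran?",
                        "conclusions": [message],
                    }
                ],
            }


timelines = [
    [{"_id": "event_id_1", "_source": {"message": "User 'admin' logged in"}}],
    [{"_id": "event_id_2", "_source": {"message": "Process 'evil.exe' created"}}],
]
committed = []
summary = run_pipeline(timelines, fake_provider, committed.append)
print(summary)  # {'total_findings_processed': 1}
```

Because every stage is a generator, no stage holds more than one event or finding in memory at a time, which is the property the streaming design relies on.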
3. The Provider: Communicating with the AI Agent
The SecGeminiLogAnalyzer provider acts as the client for the external AI agent. It is responsible for the network communication.
- File: timesketch/lib/llms/providers/secgemini_log_analyzer_agent.py
Key methods:
- _stream_log_data_as_ndjson_gen: This helper method takes the Python generator of log events from the feature and converts each event into a line of a Newline Delimited JSON (NDJSON) stream.
- generate_stream_from_logs: This is the main communication method.
  - It initiates a streaming requests.post call to the AI agent's API endpoint.
  - The data parameter of the request is the NDJSON generator, which streams the log data to the agent.
  - It then listens to the response stream from the agent, using response.iter_lines() to read the incoming NDJSON findings one by one.
  - It yields each finding back to the feature layer for processing.
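A rough sketch of this streaming pattern follows. It is not the provider's actual code: to_ndjson, parse_findings, and stream_to_agent are hypothetical names, and the sketch omits authentication, timeouts, and retries. It assumes the third-party requests library, which sends a generator passed as data using chunked transfer encoding.

```python
import json
from typing import Any, Dict, Iterable, Iterator

NDJSON = "application/x-ndjson"


def to_ndjson(events: Iterable[Dict[str, Any]]) -> Iterator[bytes]:
    """Serialize each event as one JSON line, lazily."""
    for event in events:
        yield json.dumps(event).encode("utf-8") + b"\n"


def parse_findings(lines: Iterable[bytes]) -> Iterator[Dict[str, Any]]:
    """Decode NDJSON response lines, skipping blank lines."""
    for line in lines:
        if line.strip():
            yield json.loads(line)


def stream_to_agent(
    url: str, events: Iterable[Dict[str, Any]]
) -> Iterator[Dict[str, Any]]:
    """Upload events as NDJSON and yield findings as the agent returns them."""
    # Third-party dependency, imported here so the helpers above stay stdlib-only.
    import requests

    headers = {"Content-Type": NDJSON, "Accept": NDJSON}
    with requests.post(
        url, data=to_ndjson(events), headers=headers, stream=True
    ) as response:
        response.raise_for_status()
        yield from parse_findings(response.iter_lines())


# The two helpers can be exercised without a network connection.
lines = list(to_ndjson([{"_id": "event_id_1"}, {"_id": "event_id_2"}]))
findings = list(
    parse_findings([b'{"record_id": "event_id_1", "annotations": []}', b""])
)
```

Keeping serialization and parsing in separate generators means neither the request body nor the response is ever materialized as a whole.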
Experiment With Your Own Log Reasoning Agent
To integrate your own custom log analysis service, you have two options: build an HTTP service that adheres to the API contract expected by the SecGeminiLogAnalyzer provider, or write your own provider file, modeled on SecGeminiLogAnalyzer, that works with the log_analyzer feature.
API Contract:
Endpoint
Your service must expose an endpoint to receive the log data. The default is /analyze_logs.
Request Format
The provider will send a streaming POST request with the following characteristics:
* Method: POST
* Headers:
  * Content-Type: application/x-ndjson
  * Accept: application/x-ndjson
* Body: A stream of newline-delimited JSON objects. Each object is a complete Timesketch event.
Example Request Body (NDJSON stream):
{"_index": "a1b2c3d4...", "_id": "event_id_1", "_source": {"message": "User 'admin' logged in from 192.168.1.100", "timestamp": 1672531200000000, ...}}
{"_index": "a1b2c3d4...", "_id": "event_id_2", "_source": {"message": "Process 'evil.exe' created by pid 1234", "timestamp": 1672531201000000, ...}}
Response Format
Your agent must stream back a response with the following characteristics:
* Headers:
  * Content-Type: application/x-ndjson
* Body: A stream of newline-delimited JSON objects. Each line should be a complete, self-contained JSON object representing a "finding": one or more questions, with their conclusions, linked back to an original Timesketch event.
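On the agent side, the request and response formats suggest a line-in, line-out loop. The sketch below is framework-agnostic and purely illustrative: analyze_stream is a hypothetical name, the "evil.exe" check is a placeholder rather than real detection logic, and wiring the function to an HTTP endpoint is left to whichever web framework you use. Malformed input is reported in-band using the error key from the finding schema.

```python
import json
from typing import Iterable, Iterator


def analyze_stream(request_lines: Iterable[bytes]) -> Iterator[bytes]:
    """Read NDJSON events and yield NDJSON findings, one line each."""
    for raw in request_lines:
        if not raw.strip():
            continue
        try:
            event = json.loads(raw)
        except ValueError as exc:  # covers invalid JSON and invalid UTF-8
            yield json.dumps({"error": f"invalid JSON line: {exc}"}).encode() + b"\n"
            continue
        if not isinstance(event, dict) or "_id" not in event:
            yield json.dumps({"error": "event is missing _id"}).encode() + b"\n"
            continue

        source = event.get("_source")
        message = source.get("message", "") if isinstance(source, dict) else ""
        if "evil.exe" not in str(message):  # placeholder rule only
            continue

        finding = {
            "record_id": event["_id"],
            "annotations": [
                {
                    "investigative_question": "Which suspicious processes ran?",
                    "conclusions": [str(message)],
                    "priority": "high",
                }
            ],
        }
        if isinstance(event.get("_index"), str):
            finding["_index"] = event["_index"]  # optional, helps lookups
        yield json.dumps(finding).encode() + b"\n"


request = [
    b'{"_index": "idx", "_id": "event_id_1", "_source": {"message": "User logged in"}}\n',
    b'{"_index": "idx", "_id": "event_id_2", "_source": {"message": "Process evil.exe created"}}\n',
    b"not json\n",
]
response = [json.loads(line) for line in analyze_stream(request)]
```

Here response holds two objects: a finding for event_id_2 and an error object for the malformed third line.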
Finding Object Schema:
The Timesketch backend will parse each incoming JSON object from the stream. The following table details the schema for a single finding object.
| Key | Type | Required | Description |
|---|---|---|---|
| record_id | String | Yes | The _id of the original Timesketch event this finding relates to. |
| annotations | List of Objects | Yes | A list of annotation objects. Each annotation will generate DFIQ objects in Timesketch. |
| _index or __ts_index_name | String | No | The OpenSearch index of the original event. Including this is recommended for performance. |
| error | String | No | If an error occurred while processing, send an object with this key instead of a finding. |
Annotation Object Schema:
Each object inside the annotations list must have the following structure:
| Key | Type | Required | Description |
|---|---|---|---|
| investigative_question | String | Yes | The DFIQ question that was generated. This will become an InvestigativeQuestion. |
| conclusions | List of Strings | Yes | A list of answers or findings for the question. Each will become an InvestigativeQuestionConclusion. |
| priority | String | No | The priority for the question. Can be low, medium, or high. |
| attack_stage | String | No | A suggested attack stage (e.g., MITRE ATT&CK Tactic). This is stored as a question attribute. |
Example of a Single Finding Object (pretty-printed here for readability; in the NDJSON stream it must be sent as a single line):
    {
        "annotations": [
            {
                "attack_stage": "Initial access",
                "priority": "high",
                "investigative_question": "What is the initial access vector?",
                "conclusions": [
                    "Successful password-based root login from a known Tor exit node IP address, indicating potential anonymity seeking by the attacker."
                ]
            },
            {
                "attack_stage": "Persistence",
                "investigative_question": "What is the persistence mechanism?",
                "conclusions": [
                    "Creation of a malicious cron job file for cryptocurrency mining."
                ]
            }
        ],
        "record_id": "8XdJUJgB092O9Z5p3KNH"
    }
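Before sending findings to Timesketch, an agent can check them against the two schema tables. The validator below is a hedged sketch derived only from those tables: validate_finding and ALLOWED_PRIORITIES are hypothetical names, and Timesketch's own parsing may be stricter or more lenient than this.

```python
from typing import Any, Dict, List

ALLOWED_PRIORITIES = {"low", "medium", "high"}


def validate_finding(finding: Dict[str, Any]) -> List[str]:
    """Return a list of schema problems; an empty list means no problem was found."""
    if "error" in finding:
        return []  # error objects are sent instead of a finding

    problems: List[str] = []
    if not isinstance(finding.get("record_id"), str):
        problems.append("record_id must be a string")

    annotations = finding.get("annotations")
    if not isinstance(annotations, list):
        problems.append("annotations must be a list")
        annotations = []

    for i, annotation in enumerate(annotations):
        prefix = f"annotations[{i}]"
        if not isinstance(annotation, dict):
            problems.append(f"{prefix} must be an object")
            continue
        if not isinstance(annotation.get("investigative_question"), str):
            problems.append(f"{prefix}.investigative_question must be a string")
        conclusions = annotation.get("conclusions")
        if not (
            isinstance(conclusions, list)
            and all(isinstance(c, str) for c in conclusions)
        ):
            problems.append(f"{prefix}.conclusions must be a list of strings")
        priority = annotation.get("priority")
        if priority is not None and (
            not isinstance(priority, str) or priority not in ALLOWED_PRIORITIES
        ):
            problems.append(f"{prefix}.priority must be low, medium, or high")
    return problems


good = {
    "record_id": "8XdJUJgB092O9Z5p3KNH",
    "annotations": [
        {
            "investigative_question": "What is the persistence mechanism?",
            "conclusions": ["Creation of a malicious cron job file."],
        }
    ],
}
bad = {"annotations": [{"conclusions": "not a list", "priority": "urgent"}]}
print(validate_finding(good))       # []
print(len(validate_finding(bad)))   # 4
```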
A Note on API Flexibility:
The Timesketch log_analyzer feature parses only the fields listed in the schemas above. Any other fields in your finding or annotation objects are safely ignored. This allows you to include additional metadata in your agent's output for your own logging or debugging purposes without breaking the integration. If you want Timesketch to make use of additional fields, you need to create your own LLM feature file.
Configuration in Timesketch
Once your agent is deployed, update your timesketch.conf to point to it:
    LLM_PROVIDER_CONFIGS = {
        'log_analyzer': {
            '<PROVIDER>_log_analyzer_agent': {
                'server_url': 'http://YOUR_AGENT_HOST:PORT',
                'api_key': '<YOUR_API_KEY_IF_ANY>',
            }
        }
    }
By following this architecture and API contract, you can seamlessly integrate custom, large-scale log analysis capabilities into the Timesketch Investigation View.