Realtime Analytics
This document describes the architecture and role of the RTA (Realtime Analytics) Service in the Deepdesk platform. RTA provides near-instantaneous analytics on user interactions, assistant performance, and system metrics in the studio using ClickHouse as the storage backend.
Key Components:
- ClickHouse: Storage and query engine
- Ingestion: Events (user interactions) and Spans (assistant traces)
- Query Architecture: Repositories, services, and API endpoints
- Dashboards: Recommendations, Assistants, Voice Agents analytics
High-Level Flow
The main flow for consuming realtime analytics data is as follows:
- Frontend calls analytics API endpoints (e.g., /recommendations/charts/series)
- API Endpoint triggers the analytics service layer
- Service Layer calls the analytics repository with query parameters
- Repository Layer performs query parsing and SQL template rendering
- ClickHouse Client executes the query against the distributed table
- Service Layer parses and transforms the results (handles NaN/Infinity, formats response)
- API Endpoint returns the formatted data to the frontend
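This layering can be sketched in miniature as follows. All names and data here are hypothetical illustrations of the pattern, not the actual Deepdesk code:

```python
import math

# Minimal sketch of the endpoint -> service -> repository -> client layering
# described above. All names are hypothetical illustrations.

def clickhouse_client_execute(sql: str) -> list[dict]:
    # Stand-in for the ClickHouse client running the query on the distributed table.
    return [{"timestamp": "2026-02-01", "value": float("nan")}]

def repository_get_series(template_name: str) -> list[dict]:
    # The repository layer would load the SQL template and render parameters here.
    sql = f"-- rendered from {template_name}"
    return clickhouse_client_execute(sql)

def service_get_series() -> list[dict]:
    # The service layer transforms raw rows and sanitizes values that the
    # JSON response cannot carry (NaN/Infinity).
    rows = repository_get_series("charts_series.sql")
    return [
        {k: (None if isinstance(v, float) and not math.isfinite(v) else v)
         for k, v in row.items()}
        for row in rows
    ]
```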
ClickHouse
Setup
ClickHouse runs in a replicated, non-sharded setup, currently with exactly 2 replicas in every environment. The replication provides fault tolerance in our Kubernetes setup, where pods can scale up or down.
Tables are deployed as a local table with a distributed table on top (see the table creation pattern below). The benefits of this local + distributed table pattern are:
- Automatic load balancing: Queries distributed across healthy replicas
- Fault tolerance: If one replica fails, queries automatically route to others
- Transparent scaling: Add replicas without changing application code
- Consistent interface: Application always queries the distributed table
For every Google project we have a separate ClickHouse instance, and for every account within a project there are two separate databases within the instance. All databases follow the same naming conventions:
- account_name_analytics
- account_name_admin_db
The analytics database is a regular ClickHouse database in which all tables are deployed. The admin_db appears as a database within the ClickHouse instance but is actually a connection to the operational PostgreSQL database; SELECT access is allowed only on specific tables whitelisted by DevOps. This allows analytical data to be enriched with operational data.
For example, an assistant's name can change over time when it is renamed. The name is only stored in the operational database; the events and spans in the analytics database store only the assistant_code. By combining these two data sources, we can ensure that the analytics always provide the correct data.
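As a toy illustration of this enrichment idea (hypothetical codes and names, not the real schemas):

```python
# Hypothetical illustration: analytics rows store only assistant_code;
# the current display name lives in the operational database (exposed via admin_db).
operational_assistants = {"asst-42": "Billing Helper"}  # code -> current name

analytics_rows = [
    {"assistant_code": "asst-42", "spans": 10},
]

# In practice this is a JOIN against the whitelisted admin_db table;
# here it is a simple lookup for illustration.
enriched = [
    {**row, "assistant_name": operational_assistants.get(row["assistant_code"])}
    for row in analytics_rows
]
```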
Table creation pattern
Due to the replicated nature of our setup, it's important that we use replicated table engines within ClickHouse, as they are designed to work in this configuration. In practice, this means that a table is deployed as follows:
-- Step 1: Create a table on the local cluster named "main" (the cluster the migration client connects to)
CREATE TABLE IF NOT EXISTS events_local ON CLUSTER main
(
event_id UUID,
time_utc DateTime64(6, 'UTC') CODEC (Delta(8), ZSTD(1)),
event LowCardinality(String),
profile_code LowCardinality(Nullable(String)),
platform_session_id Nullable(String),
platform LowCardinality(Nullable(String)),
agent_id LowCardinality(Nullable(String)),
agent_email LowCardinality(Nullable(String)),
agent_name LowCardinality(Nullable(String)),
visitor_id Nullable(String),
conversation_id Nullable(String),
data JSON
)
-- Here the replicated version of the MergeTree is used
-- All variables within the curly braces are automatically substituted by ClickHouse during execution
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/{database}/events_local', '{replica}')
-- ORDER BY defines the primary index and sort order (optimizes filtering on these columns)
ORDER BY (event, time_utc)
-- PARTITION BY splits data into monthly partitions (enables efficient data management and pruning)
PARTITION BY toYYYYMM(time_utc);
-- Step 2: A distributed table is created on top of the local table. Queries are executed against this distributed table.
-- This means that ClickHouse, on query execution, will automatically choose the replica to execute against based on load, availability, etc.
CREATE TABLE IF NOT EXISTS events ON CLUSTER main AS events_local
ENGINE = Distributed(main, %(analytics_db_name)s, events_local, rand());
Sharding (horizontal partitioning of the database) is not yet necessary, as our concurrent query load is relatively small.
Hardware Configuration
- CPU: Varies by account (typically 2)
- RAM: Varies by account (typically ~8GB per replica)
- Storage: Varies by account (typically 50-500GB)
- Network: Low latency between replicas required
The hardware configuration is close to the minimal requirements for most environments (around 2 CPU cores and 8GB RAM). All hardware sizing is done with guidance from the ClickHouse sizing and hardware recommendations.
DevOps has Grafana dashboards set up specifically for ClickHouse metrics. These can be evaluated when performance degrades to make informed scaling decisions, e.g., Dashboard
Data model
The data model used is intentionally very generic and simplistic. There are only 2 tables:
- events
- assistants_tracing_spans
The events schema contains all fields shared among all events and a JSON field called data. This contains the specifics for each event type.
The assistants_tracing_spans table contains all spans generated by assistants. The schema used here is based on the OpenTelemetry tracing schema.
Events Table Schema
| Field | Type | Description |
|---|---|---|
| event_id | UUID | Unique identifier for the event |
| time_utc | DateTime64(6, 'UTC') | Timestamp of when the event occurred (microsecond precision) |
| event | LowCardinality(String) | Event type (e.g., "recommendation_accepted", "message_sent") |
| profile_code | LowCardinality(Nullable(String)) | Assistant profile code associated with the event |
| platform_session_id | Nullable(String) | Session identifier from the platform |
| platform | LowCardinality(Nullable(String)) | Platform where the event originated (e.g., "genesys", "zendesk") |
| agent_id | LowCardinality(Nullable(String)) | ID of the agent who triggered the event |
| agent_email | LowCardinality(Nullable(String)) | Email of the agent |
| agent_name | LowCardinality(Nullable(String)) | Name of the agent |
| visitor_id | Nullable(String) | ID of the visitor/customer in the conversation |
| conversation_id | Nullable(String) | ID of the conversation |
| data | JSON | Event-specific data (flexible schema per event type) |
Table Engine: ReplicatedMergeTree
Order By: (event, time_utc) - Optimized for filtering by event type and time range
Partition By: toYYYYMM(time_utc) - Monthly partitions for efficient data management
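To make the schema concrete, a single hypothetical row could look like this in Python (illustrative values; the `data` payload differs per event type):

```python
import json
import uuid
from datetime import datetime, timezone

# Hypothetical example row for the events table: the shared columns plus
# an event-specific payload in the flexible `data` JSON column.
event_row = {
    "event_id": str(uuid.uuid4()),
    "time_utc": datetime(2026, 2, 1, 12, 0, 0, tzinfo=timezone.utc),
    "event": "Send Message",
    "profile_code": "vodafone",
    "platform": "genesys",
    "platform_session_id": "sess-123",
    "agent_id": "agent-1",
    "agent_email": None,   # Nullable columns may be absent
    "agent_name": None,
    "visitor_id": "visitor-9",
    "conversation_id": "conv-7",
    # Event-specific fields live in `data`; the shape differs per event type.
    "data": json.dumps({"suggestions": [{"source": "gpt", "len_text": 42}]}),
}
```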
Assistants Tracing Spans Table Schema
| Field | Type | Description |
|---|---|---|
| name | LowCardinality(String) | Name of the span (e.g., "llm_call", "tool_execution") |
| span_id | String | Unique identifier for this span |
| trace_id | String | Trace ID linking related spans together |
| parent_id | String | ID of the parent span (for nested operations) |
| conversation_id | String | ID of the conversation this span belongs to |
| kind | LowCardinality(String) | Span kind (e.g., "INTERNAL", "CLIENT") |
| start_time | DateTime64(6, 'UTC') | When the span started |
| end_time | DateTime64(6, 'UTC') | When the span ended |
| span_start_time | Nullable(DateTime64(6, 'UTC')) | Alternative start time field |
| span_end_time | Nullable(DateTime64(6, 'UTC')) | Alternative end time field |
| status_code | LowCardinality(String) | Status code (e.g., "OK", "ERROR") |
| status_description | String | Human-readable status description |
| agent_id | String | ID of the agent |
| assistant_code | String | Code of the assistant that generated this span |
| evaluation_id | String | ID linking to evaluation data |
| event_type | LowCardinality(String) | Type of event that triggered this span |
| model | LowCardinality(String) | LLM model used (e.g., "gpt-4", "claude-3") |
| model_temperature | Nullable(Float32) | Temperature parameter used for the LLM |
| input | String | Input text/prompt sent to the model |
| output | String | Output text/response from the model |
| input_tokens | Nullable(UInt32) | Number of input tokens |
| output_tokens | Nullable(UInt32) | Number of output tokens |
| attributes | JSON | OpenTelemetry span attributes |
| context | JSON | Span context information |
| status | JSON | Detailed status information |
| events | Array(JSON) | Array of events within the span |
| links | Array(JSON) | Links to other spans |
| resource | JSON | Resource information |
Table Engine: ReplicatedMergeTree
Order By: (start_time, event_type, trace_id, span_id) - Optimized for time-based queries and trace lookups
Partition By: toYYYYMM(start_time) - Monthly partitions for efficient data management
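As an example of what these columns enable, per-span latency and token totals can be derived directly from them (hypothetical rows, not the real query logic):

```python
from datetime import datetime, timezone

# Hypothetical span row; latency and token totals derived from the schema above.
span = {
    "span_id": "a1",
    "start_time": datetime(2026, 2, 1, 12, 0, 0, tzinfo=timezone.utc),
    "end_time": datetime(2026, 2, 1, 12, 0, 2, tzinfo=timezone.utc),
    "input_tokens": 120,
    "output_tokens": 30,
}

def latency_seconds(span: dict) -> float:
    # Duration of the operation captured by this span.
    return (span["end_time"] - span["start_time"]).total_seconds()

def total_tokens(span: dict) -> int:
    # Token counts are Nullable in the schema, so default missing values to 0.
    return (span["input_tokens"] or 0) + (span["output_tokens"] or 0)
```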
Migration management
The deployment of ClickHouse is split into two phases:
- Infrastructure Layer (DevOps via Terraform): Deploying the infrastructure, networking, databases, users and permissions
- Application Layer (Application startup): Deploying all objects within the database (tables, views, dictionaries, etc.)
The first part is managed by the DevOps team via Terraform and is separately documented. This document focuses on the application layer - how database objects are deployed and managed.
The code for the migration client is stored in deepdesk-admin/backend/domains/analytics/migrations/manager.py and all migrations are stored as SQL files in the migrations folder, each prefixed with three digits numerically increasing from 000 (the migration table file).
For a more detailed technical explanation see the docstring for the ClickHouseMigrationManager in: deepdesk-admin/backend/domains/analytics/migrations/manager.py
Flow
- During startup of the application in Kubernetes, the deepdesk-admin/scripts/migrate_analytics.py script is run.
- This script initializes the migration client.
- The migration client checks which migrations have already been applied by querying the _migration_history table (which stores version numbers and timestamps).
- It then applies all migrations which have not yet been applied. If the table does not exist, it runs all migrations, including the first one, which deploys the _migration_history table.
If any migration fails, the application will not start. This ensures the database schema is always in a consistent state, which is important because the analytics, through their integration in the studio, are a core part of the application.
File Naming Convention
- Pattern: NNN_descriptive_name.sql (e.g., 001_create_table_events.sql)
- Must be idempotent (safe to run multiple times), so not CREATE TABLE but CREATE TABLE IF NOT EXISTS
- Can use parameter substitution: %(analytics_db_name)s, %(admin_db_name)s
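The migration flow and naming convention above can be sketched as follows. This is a simplified, hypothetical illustration, not the actual ClickHouseMigrationManager:

```python
from pathlib import Path

# Hypothetical sketch of the migration flow: find NNN_*.sql files, skip the
# versions already recorded in _migration_history, apply the rest in order.
def pending_migrations(migration_dir: Path, applied_versions: set[int]) -> list[Path]:
    files = sorted(migration_dir.glob("[0-9][0-9][0-9]_*.sql"))
    return [f for f in files if int(f.name[:3]) not in applied_versions]

def run_migrations(migration_dir: Path, applied_versions: set[int], execute) -> list[int]:
    applied = []
    for path in pending_migrations(migration_dir, applied_versions):
        # Each file must be idempotent (CREATE TABLE IF NOT EXISTS),
        # so rerunning a migration is safe.
        execute(path.read_text())
        applied.append(int(path.name[:3]))
    return applied
```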
Ingestion
For both events and assistants_tracing_spans, the same insert mechanism is used. The differentiator is how the rows are generated.
Events
- Actions of interest happen in the frontend (studio or integrations)
- Events are generated and sent to the /events endpoint
- Events are parsed into the required data model
- Anonymization is applied if configured via the NER model
- Celery task insert_rows_into_clickhouse is called (see deepdesk-admin/backend/domains/analytics/tasks.py)
Assistants Tracing Spans
- Assistant is evaluated in the backend
- Trace is generated via OpenTelemetry standards (see deepdesk-admin/backend/domains/assistants/utils/tracing.py)
- When evaluation is complete, the trace is sent via the OpenTelemetry exporter
- Celery task process_spans is triggered for processing and anonymization via NER (see deepdesk-admin/backend/tasks.py)
- Task calls the insert_rows_into_clickhouse Celery task (see deepdesk-admin/backend/domains/analytics/tasks.py)
Benefits
The benefit of this approach is that a single task is responsible for asynchronously inserting rows into ClickHouse. All retries, timeouts, etc. can therefore be configured once and apply to every table into which data is ingested. For more details about these settings, see the docstring in the code.
Operational Details
Task Configuration:
- Queue: ingestion
- Timeout: 15s hard limit, 10s soft limit
- Retries: Unlimited with exponential backoff (max 15 min intervals)
- Batch Size: Rows are inserted per call (batch size determined by the caller: always 1 for events, always the full trace for spans)
Error Handling:
- Transient errors (auto-retry): Connection errors, timeouts, memory limits, too many queries, network errors
- Permanent errors (rejected): Schema errors, data integrity errors, programming errors
- Special case: UNKNOWN_TABLE errors only retry for known tables during scaling events
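This retry policy can be sketched as follows. The error lists and backoff base here are illustrative assumptions; the authoritative settings live in the task's docstring:

```python
# Hypothetical sketch of the ingestion task's retry policy: transient errors
# retry with exponential backoff capped at 15 minutes; permanent errors are
# rejected immediately. Error names are illustrative, not the real codes.
TRANSIENT = {"CONNECTION_ERROR", "TIMEOUT", "MEMORY_LIMIT", "TOO_MANY_QUERIES", "NETWORK_ERROR"}
PERMANENT = {"SCHEMA_ERROR", "DATA_INTEGRITY_ERROR", "PROGRAMMING_ERROR"}

def should_retry(error_code: str) -> bool:
    return error_code in TRANSIENT

def backoff_seconds(retry_count: int, base: float = 1.0, cap: float = 15 * 60) -> float:
    # Exponential backoff with a 15-minute ceiling between attempts.
    return min(base * (2 ** retry_count), cap)
```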
Query Architecture
The query architecture exists within the deepdesk-admin repository and follows Domain-Driven Design (DDD) principles. Each domain that requires realtime analytics has its own endpoints, services, repositories, and queries. This architecture emphasizes code reuse through inheritance and standardization.
Domain Structure
Each domain follows this standardized structure:
domain/
βββ endpoints/
β βββ analytics.py # Endpoint definitions
β βββ schemas.py # Query and response Pydantic models
βββ models/
β βββ model_name.py # Business domain models
βββ repositories/
β βββ sql_templates/ # SQL query templates
β β βββ query_name.sql
β βββ analytics.py # Repository functions that load SQL templates
βββ services/
βββ analytics.py # Business logic and result transformation
Key Responsibilities:
- Endpoints: Simple FastAPI endpoint definitions with dependency injection
- Schemas: Pydantic models for request validation and response serialization
- Repositories: Load SQL templates, parse query parameters, execute queries
- Services: Transform raw query results into response models, handle NaN/Infinity values
- SQL Templates: Parameterized SQL queries with placeholders for dynamic filtering
Shared Base Components
The domain-specific code is kept simple by inheriting from shared base components:
Base Repository (backend/base/clickhouse.py)
Contains BaseClickHouseAnalyticsRepository which provides:
- SQL template loading from files
- Query parameter parsing (filters, date ranges, sorting, pagination)
- WHERE clause generation from query mixins
- ORDER BY clause generation
- Parameter substitution and rendering
Query Mixins (backend/api/query.py)
Provides standardized query parameter classes that ensure consistent API structure across all analytics endpoints:
- DateTimeRangeQueryMixin: Start/end datetime filtering
- FilterQueryMixin: Dynamic field filtering with OR groups
- SortingQueryMixin: Field-based sorting with direction
- PaginationQueryMixin: Offset/limit pagination
These mixins are translated by the repository into ClickHouse SQL syntax.
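As a toy illustration of this translation (not the real BaseClickHouseAnalyticsRepository), a filter mixin's parameters could be rendered into a ClickHouse WHERE fragment like this:

```python
# Toy sketch: render a mapping of field -> allowed values into a ClickHouse
# WHERE fragment with server-side bound parameters ({name:Type} placeholders).
# Names and structure are hypothetical simplifications.
def render_where(filters: dict[str, list[str]]) -> tuple[str, dict]:
    clauses, params = [], {}
    for i, (field, values) in enumerate(sorted(filters.items())):
        key = f"p{i}"
        # ClickHouse binds {p0:Array(String)} safely on the server side.
        clauses.append(f"{field} IN {{{key}:Array(String)}}")
        params[key] = values
    where = ("AND " + " AND ".join(clauses)) if clauses else ""
    return where, params
```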
ClickHouse Client (backend/clients/clickhouse.py)
Provides a base ClickHouse client with:
- Standardized timeout settings
- Connection pooling
- Exception handling and classification
- Parameter conversion (Python types → ClickHouse types)
- Query execution with automatic retries
Exception Handling (backend/base/exceptions.py)
Contains all analytics exceptions with HTTP response codes that are automatically registered and returned by FastAPI when raised.
Design Rationale
Why this structure?
- Consistency: All analytics endpoints follow the same query parameter patterns (date ranges, filters, sorting, pagination)
- Code Reuse: Common logic (SQL parsing, parameter handling, error handling) is centralized
- Maintainability: Changes to query parsing or error handling only need to be made in one place
- Type Safety: Pydantic models ensure request/response validation
- Separation of Concerns: Clear boundaries between API layer, business logic, data access, and SQL
- Testability: Each layer can be tested independently
- Development Speed: By reusing standardized, tested components, development of new endpoints can focus solely on answering the analytical question
Development Guide
Local Setup
To start with local development, it is highly recommended to get the Deepdesk stack running locally in Docker. This can be done by cloning the deepdesk-stack repository and following the README. For pure RTA development, you don't need the Redis and Elastic components running.
By doing so, ClickHouse will be automatically spun up locally and migrations will be applied on startup of the backend-v2 pod. If you create new migrations and want to have them applied during development, just restart the backend-v2 pod to rerun the migration manager.
You can then connect to the ClickHouse instance directly via any database client or the clickhouse-client CLI application as follows:
clickhouse-client --host localhost --port 9001 --user user --password password
To test the APIs you've developed, you can access them via the browser at https://localhost:8443/api/v2/docs
To test the frontend integrations (if already available), you can access the locally hosted frontend at https://localhost:8443/admin/dd
Any changes made are immediately reflected because the local stack hot reloads.
Querying Staging/Production Environments
To query live environments, you can access a web-based query interface as long as you are connected to the VPN. The URLs and credentials for this can be found in 1Password in the "Engineering" vault. Search for "clickhouse" and all relevant environments will show up. These accounts are set up to be read-only.
Development Process for a New Endpoint
In general, development follows these steps in order:
- Define Use Case: Refine a use case/question that can be answered with analytics in the studio
- Design Response Format: Refine with frontend how to display this and what response format the data should have
- Develop SQL Query: Develop a working SQL query against a prod/staging environment that provides the insights required
- Create SQL Template: Generalize this SQL query into a parameterized
sql_template - Define Schemas: Develop the required Query and Response Model for the endpoint
- Implement Repository: Develop the repository logic
- Implement Service: Develop the service logic
- Implement Endpoint: Develop the endpoint
- Add Testing: Develop the required testing
- Merge & Notify: Merge to main and notify frontend if required
- Iterate: Iterate if required
If no other RTA endpoints exist in the domain yet, the RTA infrastructure for that domain (as explained above) has to be set up first.
Of course, the example below is a relatively simple endpoint, but this process applies in all cases. For example, when a bug is reported, go through these steps and figure out where the bug originates. Is it query logic, filtering logic, API logic, etc.? Then apply the fix where needed.
In some cases, new functionality is required for an endpoint. An example from the past was the need to select the previous time range. For instance, if I get the data of the last 7 days, I also want the data of the 7 days before to make a comparison.
In these cases, it's good to think about how this logic (which could be relevant in more cases) should be integrated into the base framework, either by altering a Mixin, Base class, or creating a new Mixin. This specific example was solved by adding an additional function triggered by an optional parameter in the DateTimeRangeMixin.
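The previous-time-range idea itself is simple to express (a sketch of the concept, not the actual DateTimeRangeMixin code):

```python
from datetime import datetime

# Sketch: given a selected [start, end) range, the comparison period is the
# equally long range immediately before it.
def previous_range(start: datetime, end: datetime) -> tuple[datetime, datetime]:
    delta = end - start
    return start - delta, start
```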
A Practical Example
Here the process above is explained practically by going through the development of the assisted-text-per-source endpoint, which can be found in deepdesk-admin/backend/domains/recommendations/endpoints/endpoints.py.
Step 1: Define Use Case
We want to move reporting currently in Looker to the studio (Looker report). It is the "Assisting Deepdesk Source" graph. This graph shows from which source assisted characters in a timeframe come. This can be things like personal suggestions, search, text generation, etc.
Step 2: Design Response Format
Frontend needs an endpoint that can be filtered on:
- A datetime range
- A profile_code
It needs a response in the format of a Material UI charts series, where there is a series for each source for which there are suggestions. This is already an existing response format called SeriesResponse, which we can reuse. This model can be found in backend/domains/recommendations/endpoints/schemas.py.
It will use this response to generate a stacked bar chart.
Step 3: Develop SQL Query
SELECT
-- We want aggregated metrics for a certain timeframe which can be dynamic (e.g., Year, Month, Week, Day, Hour, etc.)
dateTrunc('DAY', time_utc) AS timestamp,
-- We apply categorization, this categorization was provided by product
CASE
WHEN source::String = 'text_recommendation' THEN 'reply-suggestion'
WHEN source::String IN ('rec','semantic','trie','trie-with-typos','mixed') THEN 'autocomplete'
WHEN source::String = 'sticky-messages' THEN 'pinned-messages'
WHEN source::String IN ('studio-search','search-preview','search-autocomplete','search-default','search-personal','search-stickymessage') THEN 'search'
WHEN source::String = 'gpt' THEN 'text-generation'
WHEN source::String IN ('url_recommendation','url','search-url') THEN 'url-suggestion'
ELSE source::String
END AS source_category,
sum(len::Int32) AS value
FROM vodafoneziggo_analytics.events
LEFT ARRAY JOIN
data.suggestions[].source AS source,
data.suggestions[].len_text AS len
-- Only interested in assisted text which only happens in Send Message events
-- event is part of the order by key from the events table, prewhere allows for efficient filtering on fields in that order key
PREWHERE event = 'Send Message'
-- Select a relevant date range
WHERE
time_utc BETWEEN '2026-02-01 00:00:00' AND '2026-02-07 23:59:59'
-- Remove noise
AND len::Int32 > 0
-- A profile code filter
AND profile_code = 'vodafone'
-- Group by the aggregations
GROUP BY
timestamp, source_category
-- Order already in the database, more performant, this is how frontend will display it by default
ORDER BY
-- It's a timeseries so first by date
timestamp ASC,
-- Then most logical to display the largest values first, at the bottom of a stacked bar chart
value ASC
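The CASE expression above is a plain source→category lookup. Expressed in Python for clarity (mirroring the SQL, with unknown sources falling through unchanged):

```python
# The CASE expression's categorization as a lookup table; this mirrors the
# SQL above, and unknown sources fall through unchanged (the ELSE branch).
SOURCE_CATEGORIES = {
    "text_recommendation": "reply-suggestion",
    "rec": "autocomplete", "semantic": "autocomplete", "trie": "autocomplete",
    "trie-with-typos": "autocomplete", "mixed": "autocomplete",
    "sticky-messages": "pinned-messages",
    "studio-search": "search", "search-preview": "search",
    "search-autocomplete": "search", "search-default": "search",
    "search-personal": "search", "search-stickymessage": "search",
    "gpt": "text-generation",
    "url_recommendation": "url-suggestion", "url": "url-suggestion",
    "search-url": "url-suggestion",
}

def categorize(source: str) -> str:
    return SOURCE_CATEGORIES.get(source, source)
```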
Step 4: Create SQL Template
Add new file backend/domains/recommendations/repositories/sql_templates/assisted_text_per_source.sql
SELECT
-- Parameterize the datetime aggregation
dateTrunc({aggregation_level:String}, time_utc) AS timestamp,
CASE
WHEN source::String = 'text_recommendation' THEN 'reply-suggestion'
WHEN source::String IN ('rec','semantic','trie','trie-with-typos','mixed') THEN 'autocomplete'
WHEN source::String = 'sticky-messages' THEN 'pinned-messages'
WHEN source::String IN ('studio-search','search-preview','search-autocomplete','search-default','search-personal','search-stickymessage') THEN 'search'
WHEN source::String = 'gpt' THEN 'text-generation'
WHEN source::String IN ('url_recommendation','url','search-url') THEN 'url-suggestion'
ELSE source::String
END AS source_category,
sum(len::Int32) AS value
FROM {analytics_db}.events
LEFT ARRAY JOIN
data.suggestions[].source AS source,
data.suggestions[].len_text AS len
PREWHERE event = 'Send Message'
WHERE
-- Parameterize the datetime range selection
time_utc BETWEEN {datetime_from:DateTime64(3, 'UTC')} AND {datetime_to:DateTime64(3, 'UTC')}
AND len::Int32 > 0
-- Turned out that there are sometimes events without a profile_code ingested. This should not happen.
-- The number of these events was very low, so we decided to filter them out. (This was discovered later and added as a bugfix)
AND profile_code IS NOT NULL
-- Add a dynamic where clause, which allows for filtering by profile_code, but it's not necessary
{where_clause}
GROUP BY
timestamp, source_category
ORDER BY
timestamp ASC,
value ASC
Step 5: Define Schemas
Add new model in backend/domains/recommendations/endpoints/schemas.py
# Selected the relevant Mixins
# DateTimeRange for datetime selection
# FilterQuery for the profile_code filtering
# AdditionalParameters for aggregation level
class AssistedTextPerSourceQuery(DateTimeRangeQueryMixin, FilterQueryMixin, AdditionalParametersQueryMixin):
"""Query parameters for retrieving assisted text per source."""
# Add aggregation level field to the endpoint
aggregation_level: AggregationLevel = Field()
# Add a profile_code(s) filter to the endpoint
profile_codes: list[str] | None = Field(default=None, description="List of profile codes")
# Overwrite the mixin query with the actual filters
def get_filter_parameters(self) -> list[QueryParameter]:
params: list[QueryParameter] = []
if self.profile_codes:
params.append(ArrayParameter(name="profile_code", type=SupportedDatatypes.STRING, value=self.profile_codes))
return params
# Overwrite the mixin query with the actual additional parameters
def get_additional_parameters(self) -> list[QueryParameter]:
params: list[QueryParameter] = []
params.append(
ValueParameter(name="aggregation_level", type=SupportedDatatypes.STRING, value=self.aggregation_level)
)
return params
Response format already exists so no development needed.
Step 6: Implement Repository
Add new function in backend/domains/recommendations/repositories/analytics.py
async def get_assisted_text_per_source(self, query: AssistedTextPerSourceQuery) -> list[dict[str, Any]]:
# We rely on the predefined execute_template_query in the base repository shared across all domains
return await self._execute_template_query("assisted_text_per_source.sql", query)
Step 7: Implement Service
Add new function in backend/domains/recommendations/services/analytics.py
async def get_assisted_text_per_source(self, query: AssistedTextPerSourceQuery) -> SeriesResponse:
rows = await self.repository.get_assisted_text_per_source(query=query)
# We can reuse the existing function that parses into a series response, exception handling is already done in this function
return self._parse_charts_series_response(rows, series_field="source_category")
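A toy version of such a series parser (hypothetical; the real _parse_charts_series_response also handles error cases) could look like:

```python
import math

# Toy sketch: pivot flat rows (timestamp, series_field, value) into one
# series per distinct series_field value, as a charts-style response.
# Names are hypothetical simplifications, not the real implementation.
def parse_series(rows: list[dict], series_field: str) -> dict:
    timestamps = sorted({r["timestamp"] for r in rows})
    index = {t: i for i, t in enumerate(timestamps)}
    series: dict[str, list] = {}
    for row in rows:
        label = row[series_field]
        values = series.setdefault(label, [0] * len(timestamps))
        v = row["value"]
        # Replace non-finite values so the response stays JSON-serializable.
        values[index[row["timestamp"]]] = None if isinstance(v, float) and not math.isfinite(v) else v
    return {"timestamps": timestamps, "series": series}
```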
Step 8: Implement Endpoint
Add new function in backend/domains/recommendations/endpoints/endpoints.py
@router.api_route(
# Set url, method and security scope
"/assisted-text-per-source", methods=["GET"], dependencies=[Security(jwt_or_oauth, scopes=["recommendations:read"])]
)
async def assisted_text_per_source(
analytics_service: Annotated[AnalyticsService, Depends(get_analytics_service)],
query: Annotated[AssistedTextPerSourceQuery, Query()],
) -> SeriesResponse:
# Because the service will raise an analytics exception when relevant, no exception handling needed here
response = await analytics_service.get_assisted_text_per_source(query=query)
return response
Step 9: Add Testing
Add testing around all added functions, see the repository for examples.
Step 10: Merge & Notify
Merge to main and notify frontend that the endpoint is ready for integration.