Realtime Analytics

This document describes the architecture and role of the RTA (Realtime Analytics) Service in the Deepdesk platform. RTA provides near-instantaneous analytics on user interactions, assistant performance, and system metrics in the studio using ClickHouse as the storage backend.

Key Components:

  • ClickHouse: Storage and query engine
  • Ingestion: Events (user interactions) and Spans (assistant traces)
  • Query Architecture: Repositories, services, and API endpoints
  • Dashboards: Recommendations, Assistants, Voice Agents analytics

High-Level Flow​

The main flow for consuming realtime analytics data is as follows:

  1. Frontend calls analytics API endpoints (e.g., /recommendations/charts/series)
  2. API Endpoint triggers the analytics service layer
  3. Service Layer calls the analytics repository with query parameters
  4. Repository Layer performs query parsing and SQL template rendering
  5. ClickHouse Client executes the query against the distributed table
  6. Service Layer parses and transforms the results (handles NaN/Infinity, formats response)
  7. API Endpoint returns the formatted data to the frontend
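Step 6 deserves a note: ClickHouse aggregations can produce NaN or Infinity (e.g., from division by zero), which are not valid JSON. A minimal sketch of the kind of sanitization the service layer performs (the function name `sanitize_row` is illustrative, not the actual implementation):

```python
import math
from typing import Any


def sanitize_row(row: dict[str, Any]) -> dict[str, Any]:
    """Replace non-finite floats (NaN, +/-Infinity) with None so the row serializes to valid JSON."""
    return {
        key: None if isinstance(value, float) and not math.isfinite(value) else value
        for key, value in row.items()
    }
```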

ClickHouse​

Setup​

ClickHouse runs in a replicated, non-sharded setup, currently with exactly two replicas in every environment. Replication provides fault tolerance in our Kubernetes setup, where pods can scale up or down.

The benefits of this local + distributed table pattern are:

  • Automatic load balancing: Queries distributed across healthy replicas
  • Fault tolerance: If one replica fails, queries automatically route to others
  • Transparent scaling: Add replicas without changing application code
  • Consistent interface: Application always queries the distributed table

For every Google project we have a separate ClickHouse instance, and for every account within a project there are two separate databases within the instance. All databases follow the same naming conventions:

  • account_name_analytics
  • account_name_admin_db

The analytics database is a regular ClickHouse database in which all tables are deployed. The admin_db appears as a database within the ClickHouse instance but is actually a connection to the operational PostgreSQL database. SELECT access is allowed only on specific tables whitelisted by DevOps. This allows analytical data to be enriched with operational data.

For example, an assistant's name can change over time when it is renamed. This name is only stored in the operational database; the events and spans in the analytics database store only the assistant_code. By combining these two data sources, we ensure that the analytics always show the correct data.
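Conceptually, the enrichment is a join of analytics rows on assistant_code against operational data. A simplified Python sketch (in practice this is a SQL JOIN against the whitelisted admin_db tables; the function and field names here are illustrative):

```python
from typing import Any


def enrich_with_names(events: list[dict[str, Any]], assistants: dict[str, str]) -> list[dict[str, Any]]:
    """Attach the current assistant name from operational data to each analytics row."""
    return [
        {**event, "assistant_name": assistants.get(event["assistant_code"])}
        for event in events
    ]
```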

Table creation pattern​

Due to the replicated nature of our setup, it's important that we use replicated table engines within ClickHouse, as they are designed to work in this configuration. In practice, this means that a table is deployed as follows:

-- Step 1: Create the local table on the cluster named "main" (the cluster the migration client connects to)
CREATE TABLE IF NOT EXISTS events_local ON CLUSTER main
(
    event_id UUID,
    time_utc DateTime64(6, 'UTC') CODEC (Delta(8), ZSTD(1)),
    event LowCardinality(String),
    profile_code LowCardinality(Nullable(String)),
    platform_session_id Nullable(String),
    platform LowCardinality(Nullable(String)),
    agent_id LowCardinality(Nullable(String)),
    agent_email LowCardinality(Nullable(String)),
    agent_name LowCardinality(Nullable(String)),
    visitor_id Nullable(String),
    conversation_id Nullable(String),
    data JSON
)
-- The replicated version of the MergeTree engine is used
-- All variables within the curly braces are automatically substituted by ClickHouse during execution
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/{database}/events_local', '{replica}')
-- ORDER BY defines the primary index and sort order (optimizes filtering on these columns)
ORDER BY (event, time_utc)
-- PARTITION BY splits data into monthly partitions (enables efficient data management and pruning)
PARTITION BY toYYYYMM(time_utc);

-- Step 2: A distributed table is created on top of the local table. Queries are executed against this distributed table.
-- On query execution, ClickHouse automatically chooses the replica to execute against based on load, availability, etc.
CREATE TABLE IF NOT EXISTS events ON CLUSTER main AS events_local
ENGINE = Distributed(main, %(analytics_db_name)s, events_local, rand());

Sharding is not yet necessary: our concurrent query load is relatively small, so horizontal partitioning of the data is not required at this time.

Hardware Configuration​

  • CPU: Varies by account (typically 2)
  • RAM: Varies by account (typically ~8GB per replica)
  • Storage: Varies by account (typically 50-500GB)
  • Network: Low latency between replicas required

The hardware configuration is close to the minimal requirements for most environments (around 2 CPU cores and 8GB RAM). All hardware sizing is done with guidance from the ClickHouse sizing and hardware recommendations.

DevOps has Grafana dashboards set up specifically for ClickHouse metrics. When performance degrades, these can be consulted to make informed scaling decisions, e.g., Dashboard

Data model​

The data model is intentionally generic and simple. There are only two tables:

  1. events
  2. assistants_tracing_spans

The events schema contains the fields shared by all events plus a JSON field called data, which holds the specifics of each event type.

The assistants_tracing_spans table contains all spans generated by assistants. The schema used here is based on the OpenTelemetry tracing schema.

Events Table Schema​
| Field | Type | Description |
| --- | --- | --- |
| event_id | UUID | Unique identifier for the event |
| time_utc | DateTime64(6, 'UTC') | Timestamp of when the event occurred (microsecond precision) |
| event | LowCardinality(String) | Event type (e.g., "recommendation_accepted", "message_sent") |
| profile_code | LowCardinality(Nullable(String)) | Assistant profile code associated with the event |
| platform_session_id | Nullable(String) | Session identifier from the platform |
| platform | LowCardinality(Nullable(String)) | Platform where the event originated (e.g., "genesys", "zendesk") |
| agent_id | LowCardinality(Nullable(String)) | ID of the agent who triggered the event |
| agent_email | LowCardinality(Nullable(String)) | Email of the agent |
| agent_name | LowCardinality(Nullable(String)) | Name of the agent |
| visitor_id | Nullable(String) | ID of the visitor/customer in the conversation |
| conversation_id | Nullable(String) | ID of the conversation |
| data | JSON | Event-specific data (flexible schema per event type) |

Table Engine: ReplicatedMergeTree
Order By: (event, time_utc) - Optimized for filtering by event type and time range
Partition By: toYYYYMM(time_utc) - Monthly partitions for efficient data management
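To make the partitioning concrete: toYYYYMM maps every timestamp to a year-month integer, so a query with a time_utc range only has to scan the partitions whose key falls in that range. A Python equivalent, for illustration only:

```python
from datetime import datetime


def to_yyyymm(ts: datetime) -> int:
    """Python equivalent of ClickHouse's toYYYYMM(): 2026-02-15 -> 202602."""
    return ts.year * 100 + ts.month
```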

Assistants Tracing Spans Table Schema​
| Field | Type | Description |
| --- | --- | --- |
| name | LowCardinality(String) | Name of the span (e.g., "llm_call", "tool_execution") |
| span_id | String | Unique identifier for this span |
| trace_id | String | Trace ID linking related spans together |
| parent_id | String | ID of the parent span (for nested operations) |
| conversation_id | String | ID of the conversation this span belongs to |
| kind | LowCardinality(String) | Span kind (e.g., "INTERNAL", "CLIENT") |
| start_time | DateTime64(6, 'UTC') | When the span started |
| end_time | DateTime64(6, 'UTC') | When the span ended |
| span_start_time | Nullable(DateTime64(6, 'UTC')) | Alternative start time field |
| span_end_time | Nullable(DateTime64(6, 'UTC')) | Alternative end time field |
| status_code | LowCardinality(String) | Status code (e.g., "OK", "ERROR") |
| status_description | String | Human-readable status description |
| agent_id | String | ID of the agent |
| assistant_code | String | Code of the assistant that generated this span |
| evaluation_id | String | ID linking to evaluation data |
| event_type | LowCardinality(String) | Type of event that triggered this span |
| model | LowCardinality(String) | LLM model used (e.g., "gpt-4", "claude-3") |
| model_temperature | Nullable(Float32) | Temperature parameter used for the LLM |
| input | String | Input text/prompt sent to the model |
| output | String | Output text/response from the model |
| input_tokens | Nullable(UInt32) | Number of input tokens |
| output_tokens | Nullable(UInt32) | Number of output tokens |
| attributes | JSON | OpenTelemetry span attributes |
| context | JSON | Span context information |
| status | JSON | Detailed status information |
| events | Array(JSON) | Array of events within the span |
| links | Array(JSON) | Links to other spans |
| resource | JSON | Resource information |

Table Engine: ReplicatedMergeTree
Order By: (start_time, event_type, trace_id, span_id) - Optimized for time-based queries and trace lookups
Partition By: toYYYYMM(start_time) - Monthly partitions for efficient data management

Migration management​

The deployment of ClickHouse is split into two phases:

  1. Infrastructure Layer (DevOps via Terraform): Deploying the infrastructure, networking, databases, users and permissions
  2. Application Layer (Application startup): Deploying all objects within the database (tables, views, dictionaries, etc.)

The first part is managed by the DevOps team via Terraform and is separately documented. This document focuses on the application layer - how database objects are deployed and managed.

The code for the migration client lives in deepdesk-admin/backend/domains/analytics/migrations/manager.py, and all migrations are stored as SQL files in the migrations folder. Each file is prefixed with a three-digit number, numerically increasing from 000 (the migration-history table file).

For a more detailed technical explanation see the docstring for the ClickHouseMigrationManager in: deepdesk-admin/backend/domains/analytics/migrations/manager.py

Flow​

  1. During startup of the application in Kubernetes, the deepdesk-admin/scripts/migrate_analytics.py script is run.
  2. This script initializes the migration client.
  3. The migration client checks which migrations have already been applied by querying the _migration_history table (which stores version numbers and timestamps).
  4. It then applies all migrations which have not been applied. If the table does not exist, it runs all migrations including the first one that deploys the _migration_history table.
warning

If any migration fails, the application will not start. This ensures the database schema is always in a consistent state, which matters because, through the studio integration, the analytics are a core part of the application.

File Naming Convention​

  • Pattern: NNN_descriptive_name.sql (e.g., 001_create_table_events.sql)
  • Must be idempotent (safe to run multiple times): use CREATE TABLE IF NOT EXISTS, not plain CREATE TABLE
  • Can use parameter substitution: %(analytics_db_name)s, %(admin_db_name)s
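The selection logic of the migration client can be sketched as follows (a simplification for illustration; the real ClickHouseMigrationManager also records timestamps and applies each migration transactionally):

```python
def pending_migrations(files: list[str], applied_versions: set[int]) -> list[str]:
    """Return migration files not yet applied, in numeric order of their NNN prefix."""
    ordered = sorted(files, key=lambda name: int(name.split("_", 1)[0]))
    return [name for name in ordered if int(name.split("_", 1)[0]) not in applied_versions]
```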

Ingestion​

For both events and assistants_tracing_spans, the same insert mechanism is used. The differentiator is how the rows are generated.

Events​

  1. Actions of interest happen in the frontend (studio or integrations)
  2. Events are generated and sent to the /events endpoint
  3. Events are parsed into the required data model
  4. Anonymization is applied if configured via the NER model
  5. Celery task insert_rows_into_clickhouse is called (see deepdesk-admin/backend/domains/analytics/tasks.py)

Assistants Tracing Spans​

  1. Assistant is evaluated in the backend
  2. Trace is generated via OpenTelemetry standards (see deepdesk-admin/backend/domains/assistants/utils/tracing.py)
  3. When evaluation is complete, trace is sent via OpenTelemetry exporter
  4. Celery task process_spans is triggered for processing and anonymization via NER (see deepdesk-admin/backend/tasks.py)
  5. Task calls insert_rows_into_clickhouse Celery task (see deepdesk-admin/backend/domains/analytics/tasks.py)

Benefits​

The benefit of this approach is that a single task is responsible for asynchronously inserting rows into ClickHouse. This means that retries, timeouts, etc. are configured once and apply to every table into which data is ingested. For more details about these settings, see the docstring in the code.

Operational Details​

Task Configuration:

  • Queue: ingestion
  • Timeout: 15s hard limit, 10s soft limit
  • Retries: Unlimited with exponential backoff (max 15 min intervals)
  • Batch Size: Determined by the caller per insert call (for events always 1, for spans always the full trace)
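The retry schedule implied by "unlimited retries with exponential backoff, capped at 15 minutes" looks roughly like this (a sketch; the exact base delay and jitter depend on the Celery configuration in the code):

```python
def retry_delay_seconds(retry_count: int, backoff_max: int = 900) -> int:
    """Exponential backoff (1s, 2s, 4s, ...) capped at backoff_max seconds (15 minutes)."""
    return min(2 ** retry_count, backoff_max)
```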

Error Handling:

  • Transient errors (auto-retry): Connection errors, timeouts, memory limits, too many queries, network errors
  • Permanent errors (rejected): Schema errors, data integrity errors, programming errors
  • Special case: UNKNOWN_TABLE errors only retry for known tables during scaling events
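Purely as an illustration of the transient/permanent split, a keyword-based classifier is sketched below. The real implementation classifies by exception class and ClickHouse error code, not string matching; this sketch only shows the decision the retry logic makes:

```python
# Illustrative keywords only; the actual classification uses exception types and error codes
TRANSIENT_KEYWORDS = ("connection", "timeout", "memory limit", "too many", "network")


def is_transient(error_message: str) -> bool:
    """Transient errors are retried with backoff; permanent errors are rejected outright."""
    msg = error_message.lower()
    return any(keyword in msg for keyword in TRANSIENT_KEYWORDS)
```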

Query Architecture​

The query architecture exists within the deepdesk-admin repository and follows Domain-Driven Design (DDD) principles. Each domain that requires realtime analytics has its own endpoints, services, repositories, and queries. This architecture emphasizes code reuse through inheritance and standardization.

Domain Structure​

Each domain follows this standardized structure:

domain/
├── endpoints/
│   ├── analytics.py       # Endpoint definitions
│   └── schemas.py         # Query and response Pydantic models
├── models/
│   └── model_name.py      # Business domain models
├── repositories/
│   ├── sql_templates/     # SQL query templates
│   │   └── query_name.sql
│   └── analytics.py       # Repository functions that load SQL templates
└── services/
    └── analytics.py       # Business logic and result transformation

Key Responsibilities:

  • Endpoints: Simple FastAPI endpoint definitions with dependency injection
  • Schemas: Pydantic models for request validation and response serialization
  • Repositories: Load SQL templates, parse query parameters, execute queries
  • Services: Transform raw query results into response models, handle NaN/Infinity values
  • SQL Templates: Parameterized SQL queries with placeholders for dynamic filtering

Shared Base Components​

The domain-specific code is kept simple by inheriting from shared base components:

Base Repository (backend/base/clickhouse.py)​

Contains BaseClickHouseAnalyticsRepository which provides:

  • SQL template loading from files
  • Query parameter parsing (filters, date ranges, sorting, pagination)
  • WHERE clause generation from query mixins
  • ORDER BY clause generation
  • Parameter substitution and rendering

Query Mixins (backend/api/query.py)​

Provides standardized query parameter classes that ensure consistent API structure across all analytics endpoints:

  • DateTimeRangeQueryMixin: Start/end datetime filtering
  • FilterQueryMixin: Dynamic field filtering with OR groups
  • SortingQueryMixin: Field-based sorting with direction
  • PaginationQueryMixin: Offset/limit pagination

These mixins are translated by the repository into ClickHouse SQL syntax.
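To illustrate what "translated into ClickHouse SQL syntax" means, here is a simplified sketch of rendering filter parameters into a WHERE fragment with ClickHouse-style typed placeholders (the real BaseClickHouseAnalyticsRepository is more general; the function name is an assumption):

```python
def build_where_clause(filters: dict[str, list[str]]) -> str:
    """Render field -> allowed-values filters into parameterized IN clauses.

    Values are never interpolated directly; only typed placeholders are emitted,
    and the actual values are passed to the client as query parameters.
    """
    clauses = [
        f"AND {field} IN ({{{field}:Array(String)}})"
        for field in sorted(filters)
        if filters[field]
    ]
    return "\n".join(clauses)
```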

ClickHouse Client (backend/clients/clickhouse.py)​

Provides a base ClickHouse client with:

  • Standardized timeout settings
  • Connection pooling
  • Exception handling and classification
  • Parameter conversion (Python types β†’ ClickHouse types)
  • Query execution with automatic retries

Exception Handling (backend/base/exceptions.py)​

Contains all analytics exceptions with HTTP response codes that are automatically registered and returned by FastAPI when raised.

Design Rationale​

Why this structure?

  1. Consistency: All analytics endpoints follow the same query parameter patterns (date ranges, filters, sorting, pagination)
  2. Code Reuse: Common logic (SQL parsing, parameter handling, error handling) is centralized
  3. Maintainability: Changes to query parsing or error handling only need to be made in one place
  4. Type Safety: Pydantic models ensure request/response validation
  5. Separation of Concerns: Clear boundaries between API layer, business logic, data access, and SQL
  6. Testability: Each layer can be tested independently
  7. Development Speed: By reusing standardized, tested components, development of a new endpoint can focus solely on answering the analytical question

Development Guide​

Local Setup​

To start with local development, it is highly recommended to get the Deepdesk stack running locally in Docker. This can be done by cloning the deepdesk-stack repository and following the README. For pure RTA development, you don't need the Redis and Elastic components running.

By doing so, ClickHouse will be automatically spun up locally and migrations will be applied on startup of the backend-v2 pod. If you create new migrations and want to have them applied during development, just restart the backend-v2 pod to rerun the migration manager.

You can then connect to the ClickHouse instance directly via any database client or the clickhouse-client CLI application as follows:

clickhouse-client --host localhost --port 9001 --user user --password password

To test the APIs you've developed, you can access them via the browser at https://localhost:8443/api/v2/docs

To test the frontend integrations (if already available), you can access the locally hosted frontend at https://localhost:8443/admin/dd

Any changes made are immediately reflected because the local stack hot reloads.

Querying Staging/Production Environments​

To query live environments, you can access a web-based query interface as long as you are connected to the VPN. The URLs and credentials for this can be found in 1Password in the "Engineering" vault. Search for "clickhouse" and all relevant environments will show up. These accounts are set up to be read-only.

Development Process for a New Endpoint​

In general, development follows these steps in order:

  1. Define Use Case: Refine a use case/question that can be answered with analytics in the studio
  2. Design Response Format: Refine with frontend how to display this and what response format the data should have
  3. Develop SQL Query: Develop a working SQL query against a prod/staging environment that provides the insights required
  4. Create SQL Template: Generalize this SQL query into a parameterized sql_template
  5. Define Schemas: Develop the required Query and Response Model for the endpoint
  6. Implement Repository: Develop the repository logic
  7. Implement Service: Develop the service logic
  8. Implement Endpoint: Develop the endpoint
  9. Add Testing: Develop the required testing
  10. Merge & Notify: Merge to main and notify frontend if required
  11. Iterate: Iterate if required

If no other RTA endpoints exist in this domain, the RTA infrastructure for the domain as explained above will have to be set up first.

Of course, the example below is a relatively simple endpoint, but this process applies in all cases. For example, when a bug is reported, go through these steps and figure out where the bug originates. Is it query logic, filtering logic, API logic, etc.? Then apply the fix where needed.

In some cases, new functionality is required for an endpoint. A past example was the need to select the previous time range: when fetching data for the last 7 days, also fetch the 7 days before that to make a comparison.

In these cases, think about how this logic (which could be relevant in more cases) should be integrated into the base framework, either by altering a mixin or base class, or by creating a new mixin. This specific example was solved by adding an additional function, triggered by an optional parameter, in the DateTimeRangeQueryMixin.
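The previous-period computation itself is simple: shift the selected range back by its own length. A sketch (the mixin's actual method name and signature may differ):

```python
from datetime import datetime


def previous_period(start: datetime, end: datetime) -> tuple[datetime, datetime]:
    """Given a selected range, return the equally long range immediately before it."""
    return (start - (end - start), start)
```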

A Practical Example​

Here the process above is explained practically by going through the development of the assisted-text-per-source endpoint, which can be found in deepdesk-admin/backend/domains/recommendations/endpoints/endpoints.py.

Step 1: Define Use Case​

We want to move reporting that currently lives in Looker to the studio (Looker report). The graph in question is "Assisting Deepdesk Source", which shows which sources the assisted characters in a timeframe come from: personal suggestions, search, text generation, etc.

Step 2: Design Response Format​

Frontend needs an endpoint that can be filtered on:

  • A datetime range
  • A profile_code

It needs a response in the format of a Material UI charts series, where there is a series for each source for which there are suggestions. This is already an existing response format called SeriesResponse, which we can reuse. This model can be found in backend/domains/recommendations/endpoints/schemas.py.

It will use this response to generate a stacked bar chart.
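The transformation from flat query rows into per-source chart series looks conceptually like this (a sketch; the actual SeriesResponse model and parsing helper live in the codebase, and the field names here are assumptions):

```python
from collections import defaultdict
from typing import Any


def rows_to_series(rows: list[dict[str, Any]], series_field: str) -> dict[str, list[dict[str, Any]]]:
    """Group flat (timestamp, category, value) rows into one series per category."""
    series: dict[str, list[dict[str, Any]]] = defaultdict(list)
    for row in rows:
        series[row[series_field]].append({"timestamp": row["timestamp"], "value": row["value"]})
    return dict(series)
```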

Step 3: Develop SQL Query​

SELECT
    -- We want aggregated metrics for a timeframe that can be dynamic (e.g., Year, Month, Week, Day, Hour, etc.)
    dateTrunc('DAY', time_utc) AS timestamp,
    -- We apply categorization; this categorization was provided by product
    CASE
        WHEN source::String = 'text_recommendation' THEN 'reply-suggestion'
        WHEN source::String IN ('rec','semantic','trie','trie-with-typos','mixed') THEN 'autocomplete'
        WHEN source::String = 'sticky-messages' THEN 'pinned-messages'
        WHEN source::String IN ('studio-search','search-preview','search-autocomplete','search-default','search-personal','search-stickymessage') THEN 'search'
        WHEN source::String = 'gpt' THEN 'text-generation'
        WHEN source::String IN ('url_recommendation','url','search-url') THEN 'url-suggestion'
        ELSE source::String
    END AS source_category,
    sum(len::Int32) AS value
FROM vodafoneziggo_analytics.events
LEFT ARRAY JOIN
    data.suggestions[].source AS source,
    data.suggestions[].len_text AS len
-- Only interested in assisted text, which only occurs in Send Message events
-- event is part of the ORDER BY key of the events table; PREWHERE allows efficient filtering on fields in that key
PREWHERE event = 'Send Message'
WHERE
    -- Select a relevant date range
    time_utc BETWEEN '2026-02-01 00:00:00' AND '2026-02-07 23:59:59'
    -- Remove noise
    AND len::Int32 > 0
    -- A profile code filter
    AND profile_code = 'vodafone'
-- Group by the aggregations
GROUP BY
    timestamp, source_category
-- Ordering in the database is more performant, and this is how the frontend will display it by default
ORDER BY
    -- It's a timeseries, so first by date
    timestamp ASC,
    -- Then most logical to display the largest values first, at the bottom of a stacked bar chart
    value ASC

Step 4: Create SQL Template​

Add new file backend/domains/recommendations/repositories/sql_templates/assisted_text_per_source.sql

SELECT
    -- Parameterize the datetime aggregation
    dateTrunc({aggregation_level:String}, time_utc) AS timestamp,
    CASE
        WHEN source::String = 'text_recommendation' THEN 'reply-suggestion'
        WHEN source::String IN ('rec','semantic','trie','trie-with-typos','mixed') THEN 'autocomplete'
        WHEN source::String = 'sticky-messages' THEN 'pinned-messages'
        WHEN source::String IN ('studio-search','search-preview','search-autocomplete','search-default','search-personal','search-stickymessage') THEN 'search'
        WHEN source::String = 'gpt' THEN 'text-generation'
        WHEN source::String IN ('url_recommendation','url','search-url') THEN 'url-suggestion'
        ELSE source::String
    END AS source_category,
    sum(len::Int32) AS value
FROM {analytics_db}.events
LEFT ARRAY JOIN
    data.suggestions[].source AS source,
    data.suggestions[].len_text AS len
PREWHERE event = 'Send Message'
WHERE
    -- Parameterize the datetime range selection
    time_utc BETWEEN {datetime_from:DateTime64(3, 'UTC')} AND {datetime_to:DateTime64(3, 'UTC')}
    AND len::Int32 > 0
    -- It turned out that events without a profile_code are sometimes ingested. This should not happen.
    -- The number of these events was very low, so we decided to filter them out. (This was discovered later and added as a bugfix)
    AND profile_code IS NOT NULL
    -- Add a dynamic WHERE clause, which allows filtering by profile_code but is optional
    {where_clause}
GROUP BY
    timestamp, source_category
ORDER BY
    timestamp ASC,
    value ASC

Step 5: Define Schemas​

Add new model in backend/domains/recommendations/endpoints/schemas.py

# Select the relevant mixins:
# - DateTimeRangeQueryMixin for datetime selection
# - FilterQueryMixin for the profile_code filtering
# - AdditionalParametersQueryMixin for the aggregation level
class AssistedTextPerSourceQuery(DateTimeRangeQueryMixin, FilterQueryMixin, AdditionalParametersQueryMixin):
    """Query parameters for retrieving assisted text per source."""

    # Add aggregation level field to the endpoint
    aggregation_level: AggregationLevel = Field()
    # Add a profile_code(s) filter to the endpoint
    profile_codes: list[str] | None = Field(default=None, description="List of profile codes")

    # Override the mixin method with the actual filters
    def get_filter_parameters(self) -> list[QueryParameter]:
        params: list[QueryParameter] = []
        if self.profile_codes:
            params.append(ArrayParameter(name="profile_code", type=SupportedDatatypes.STRING, value=self.profile_codes))
        return params

    # Override the mixin method with the actual additional parameters
    def get_additional_parameters(self) -> list[QueryParameter]:
        params: list[QueryParameter] = []
        params.append(
            ValueParameter(name="aggregation_level", type=SupportedDatatypes.STRING, value=self.aggregation_level)
        )
        return params

The response format already exists, so no development is needed there.

Step 6: Implement Repository​

Add new function in backend/domains/recommendations/repositories/analytics.py

async def get_assisted_text_per_source(self, query: AssistedTextPerSourceQuery) -> list[dict[str, Any]]:
    # We rely on the predefined _execute_template_query in the base repository shared across all domains
    return await self._execute_template_query("assisted_text_per_source.sql", query)

Step 7: Implement Service​

Add new function in backend/domains/recommendations/services/analytics.py

async def get_assisted_text_per_source(self, query: AssistedTextPerSourceQuery) -> SeriesResponse:
    rows = await self.repository.get_assisted_text_per_source(query=query)
    # Reuse the existing function that parses rows into a series response; exception handling is already done there
    return self._parse_charts_series_response(rows, series_field="source_category")

Step 8: Implement Endpoint​

Add new function in backend/domains/recommendations/endpoints/endpoints.py

@router.api_route(
    # Set URL, method and security scope
    "/assisted-text-per-source", methods=["GET"], dependencies=[Security(jwt_or_oauth, scopes=["recommendations:read"])]
)
async def assisted_text_per_source(
    analytics_service: Annotated[AnalyticsService, Depends(get_analytics_service)],
    query: Annotated[AssistedTextPerSourceQuery, Query()],
) -> SeriesResponse:
    # Because the service raises an analytics exception when relevant, no exception handling is needed here
    response = await analytics_service.get_assisted_text_per_source(query=query)
    return response

Step 9: Add Testing​

Add testing around all added functions, see the repository for examples.

Step 10: Merge & Notify​

Merge to main and notify frontend that the endpoint is ready for integration.