Chapter 22 - Ad Click Event Aggregation


Digital advertising has grown into a huge industry with the rise of platforms such as Facebook, YouTube and TikTok.

Hence, tracking ad click events is important. In this chapter, we explore how to design an ad click event aggregation system at Facebook/Google scale.

Digital advertising has a process called real-time bidding (RTB), where digital advertising inventory is bought and sold: digital-advertising-example

The speed of RTB is important, as the whole bidding process usually completes within a second. Data accuracy is also very important, as it impacts how much money advertisers pay.

Based on ad click event aggregations, advertisers can make decisions such as adjusting their target audience and keywords.

Step 1 - Understand the Problem and Establish Design Scope

Functional requirements

Non-functional requirements

Back-of-the-envelope estimation

Step 2 - Propose High-Level Design and Get Buy-In

In this section, we discuss query API design, data model and high-level design.

Query API Design

The API is a contract between the client and the server. In our case, the client is the dashboard user - data scientist/analyst, advertiser, etc.

Recall our functional requirements - aggregating the number of clicks of ad_id in the last M minutes, returning the top N most clicked ad_ids in the last M minutes, and supporting filtering by different attributes.

We need two endpoints to achieve those requirements. Filtering can be done via query parameters on one of them.

Aggregate number of clicks of ad_id in the last M minutes:

GET /v1/ads/{:ad_id}/aggregated_count

Query parameters:

Response:

Return top N most clicked ad_ids in the last M minutes

GET /v1/ads/popular_ads

Query parameters:

Response:
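
To make the contract concrete, here is a minimal sketch of how a dashboard client might call these endpoints using Python's requests library. The host, the query parameter names (from, to, filter, count, window) and the response shapes are assumptions for illustration, not a specification.

```python
import requests

BASE_URL = "https://ads.example.com"  # hypothetical host

# Aggregate number of clicks of ad_id in the last M minutes.
# Parameter names ("from", "to", "filter") are illustrative assumptions.
resp = requests.get(
    f"{BASE_URL}/v1/ads/ad001/aggregated_count",
    params={"from": "202101010000", "to": "202101010005", "filter": "0012"},
    timeout=5,
)
print(resp.json())  # e.g. {"ad_id": "ad001", "count": 12}

# Top N most clicked ads in the last M minutes.
resp = requests.get(
    f"{BASE_URL}/v1/ads/popular_ads",
    params={"count": 10, "window": 5},  # N = 10, M = 5 minutes (assumed names)
    timeout=5,
)
print(resp.json())  # e.g. {"ad_ids": ["ad001", "ad002", ...]}
```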

Data model

In our system, we have raw and aggregated data.

Raw data looks like this:

[AdClickEvent] ad001, 2021-01-01 00:00:01, user1, 207.148.22.22, USA

Here’s an example in a structured format:

| ad_id | click_timestamp | user | ip | country |
|-------|-----------------|------|----|---------|
| ad001 | 2021-01-01 00:00:01 | user1 | 207.148.22.22 | USA |
| ad001 | 2021-01-01 00:00:02 | user1 | 207.148.22.22 | USA |
| ad002 | 2021-01-01 00:00:02 | user2 | 209.153.56.11 | USA |

Here’s the aggregated version:

| ad_id | click_minute | filter_id | count |
|-------|--------------|-----------|-------|
| ad001 | 202101010000 | 0012 | 2 |
| ad001 | 202101010000 | 0023 | 3 |
| ad001 | 202101010001 | 0012 | 1 |
| ad001 | 202101010001 | 0023 | 6 |

The filter_id helps us achieve our filtering requirements.

| filter_id | region | IP | user_id |
|-----------|--------|----|---------|
| 0012 | US | * | * |
| 0013 | * | 123.1.2.3 | * |

To support quickly returning top N most clicked ads in the last M minutes, we’ll also maintain this structure:

most_clicked_ads

| Field | Type | Description |
|-------|------|-------------|
| window_size | integer | The aggregation window size (M) in minutes |
| update_time_minute | timestamp | Last updated timestamp (in 1-minute granularity) |
| most_clicked_ads | array | List of ad IDs in JSON format |
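
As a rough sketch, the three structures above could be modelled as plain records like this; the field names follow the tables, while the Python types themselves are illustrative assumptions:

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class AdClickEvent:
    """Raw data: one row per click."""
    ad_id: str
    click_timestamp: str   # e.g. "2021-01-01 00:00:01"
    user_id: str
    ip: str
    country: str


@dataclass
class AggregatedAdClicks:
    """Aggregated data: clicks per ad, per minute, per filter."""
    ad_id: str
    click_minute: str      # e.g. "202101010000" (1-minute granularity)
    filter_id: str
    count: int


@dataclass
class MostClickedAds:
    """Top N most clicked ads per aggregation window."""
    window_size: int                # the aggregation window size (M) in minutes
    update_time_minute: str         # last updated timestamp, 1-minute granularity
    most_clicked_ads: List[str] = field(default_factory=list)
```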

What are the pros and cons of storing raw data versus storing aggregated data? Raw data gives us the full dataset to fall back on and recompute from when something goes wrong, but it is much larger and slower to query; aggregated data is compact and fast to query, but detail is lost.

In our design, we'll use a combination of both approaches: raw data as a backup and source of truth, and aggregated data for serving queries.

When it comes to the database, there are several factors to take into consideration:

For the raw data, we can see that the average QPS is 10k and peak QPS is 50k, so the system is write-heavy. On the other hand, read traffic is low as raw data is mostly used as backup if anything goes wrong.

Relational databases can do the job, but scaling the writes can be challenging. Alternatively, we can use Cassandra or InfluxDB, which have better native support for heavy write loads.

Another option is to use Amazon S3 with a columnar data format like ORC, Parquet or Avro. Since that setup is less familiar to us, we'll stick with Cassandra.

For aggregated data, the workload is both read-heavy and write-heavy: aggregated data is constantly queried for dashboards and alerts, and the aggregation service writes new results every minute. Hence, we'll use the same data store (Cassandra) here as well.
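
For illustration, here is one possible Cassandra layout sketched with the cassandra-driver client. The keyspace name, the table definitions and the choice of partition/clustering keys are assumptions, not a prescribed schema:

```python
from cassandra.cluster import Cluster  # pip install cassandra-driver

cluster = Cluster(["127.0.0.1"])       # assumed local Cassandra node
session = cluster.connect()

session.execute("""
    CREATE KEYSPACE IF NOT EXISTS ad_analytics
    WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3}
""")

# Raw events: write-heavy, partitioned by ad_id, ordered by click time.
session.execute("""
    CREATE TABLE IF NOT EXISTS ad_analytics.raw_ad_click_events (
        ad_id text,
        click_timestamp timestamp,
        user_id text,
        ip text,
        country text,
        PRIMARY KEY ((ad_id), click_timestamp, user_id)
    )
""")

# Aggregated counts: one row per (ad_id, click_minute, filter_id);
# click_count corresponds to "count" in the data model above.
session.execute("""
    CREATE TABLE IF NOT EXISTS ad_analytics.aggregated_ad_clicks (
        ad_id text,
        click_minute text,
        filter_id text,
        click_count bigint,
        PRIMARY KEY ((ad_id), click_minute, filter_id)
    )
""")
```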

High-level design

Here’s what our system looks like: high-level-design-1

Data flows into and out of the system as unbounded data streams.

In order to avoid having a synchronous sink, where a consumer crashing can cause the whole system to stall, we’ll leverage asynchronous processing using message queues (Kafka) to decouple consumers and producers. high-level-design-2

The first message queue stores ad click event data:

ad_id | click_timestamp | user_id | ip | country

The second message queue contains ad click counts, aggregated per-minute:

ad_id | click_minute | count

As well as top N clicked ads aggregated per minute:

update_time_minute | most_clicked_ads

The second message queue is there to achieve end-to-end exactly-once (atomic commit) semantics: Kafka achieves atomic commit by layering a lightweight two-phase-commit protocol on top of its normal log-append operations. atomic-commit
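
For illustration, here is how ad click events might be published to the first message queue using the kafka-python client; the broker address, topic name and JSON serialization are assumptions:

```python
import json
from kafka import KafkaProducer  # pip install kafka-python

producer = KafkaProducer(
    bootstrap_servers="localhost:9092",              # assumed broker address
    key_serializer=lambda k: k.encode("utf-8"),
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

event = {
    "ad_id": "ad001",
    "click_timestamp": "2021-01-01 00:00:01",
    "user_id": "user1",
    "ip": "207.148.22.22",
    "country": "USA",
}

# Keying by ad_id keeps events for the same ad on the same partition.
producer.send("ad_click_events", key=event["ad_id"], value=event)
producer.flush()
```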

For the aggregation service, using the MapReduce framework is a good option: ad-count-map-reduce top-100-map-reduce

Each node is responsible for a single task and sends its processing result to the downstream node.

The map node is responsible for reading from the data source, then filtering and transforming the data.

For example, the map node can allocate data across different aggregation nodes based on the ad_id: map-node

Alternatively, we could distribute ads across Kafka partitions and let the aggregation nodes subscribe directly within a consumer group. However, the map node lets us sanitize or transform the data before subsequent processing.

Another reason is that we might not have control over how the data is produced, so events related to the same ad_id could land on different partitions.

The aggregate node counts ad click events by ad_id in-memory every minute.

The reduce node collects aggregated results from the aggregate nodes and produces the final result: reduce-node

This DAG model uses the MapReduce paradigm. It takes big data and leverages parallel distributed computing to turn it into regular-sized data.

In the DAG model, intermediate data is stored in-memory and different nodes communicate with each other using TCP or shared memory.
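
As a minimal sketch of what an aggregate node does, here is in-memory counting of clicks per ad_id within each one-minute bucket; the event format follows the first queue's schema, everything else is illustrative:

```python
from collections import Counter
from datetime import datetime


def minute_of(click_timestamp: str) -> str:
    """Truncate '2021-01-01 00:00:01' to the minute bucket '202101010000'."""
    ts = datetime.strptime(click_timestamp, "%Y-%m-%d %H:%M:%S")
    return ts.strftime("%Y%m%d%H%M")


def aggregate(events):
    """Count clicks per (ad_id, click_minute) in memory."""
    counts = Counter()
    for event in events:
        counts[(event["ad_id"], minute_of(event["click_timestamp"]))] += 1
    return counts


events = [
    {"ad_id": "ad001", "click_timestamp": "2021-01-01 00:00:01"},
    {"ad_id": "ad001", "click_timestamp": "2021-01-01 00:00:02"},
    {"ad_id": "ad002", "click_timestamp": "2021-01-01 00:00:02"},
]
print(aggregate(events))
# Counter({('ad001', '202101010000'): 2, ('ad002', '202101010000'): 1})
```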

Let’s explore how this model helps us achieve our various use-cases.

Use-case 1 - aggregate the number of clicks: use-case-1

Use-case 2 - return top N most clicked ads: use-case-2

Use-case 3 - data filtering: To support fast data filtering, we can predefine filtering criteria and pre-aggregate based on them:

| ad_id | click_minute | country | count |
|-------|--------------|---------|-------|
| ad001 | 202101010001 | USA | 100 |
| ad001 | 202101010001 | GPB | 200 |
| ad001 | 202101010001 | others | 3000 |
| ad002 | 202101010001 | USA | 10 |
| ad002 | 202101010001 | GPB | 25 |
| ad002 | 202101010001 | others | 12 |

This technique is called the star schema and is widely used in data warehouses. The filtering fields are called dimensions.

This approach has the following benefits:

A limitation of this approach is that it creates many more buckets and records, especially when there are many filtering criteria.
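
As a sketch of the filtering use-case above, here is pre-aggregation with a country dimension and a filtered query served from those buckets; the dimension set and bucket naming are assumptions:

```python
from collections import defaultdict


def pre_aggregate(events):
    """Aggregate clicks per (ad_id, click_minute, country) - a star-schema style fact table."""
    buckets = defaultdict(int)
    for e in events:
        buckets[(e["ad_id"], e["click_minute"], e["country"])] += 1
    return buckets


def filtered_count(buckets, ad_id, minutes, country=None):
    """Clicks for ad_id over the given minutes, optionally filtered by country."""
    return sum(
        count
        for (a, m, c), count in buckets.items()
        if a == ad_id and m in minutes and (country is None or c == country)
    )


buckets = {
    ("ad001", "202101010001", "USA"): 100,
    ("ad001", "202101010001", "GPB"): 200,
    ("ad001", "202101010001", "others"): 3000,
}
print(filtered_count(buckets, "ad001", {"202101010001"}, country="USA"))  # 100
print(filtered_count(buckets, "ad001", {"202101010001"}))                 # 3300
```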

Step 3 - Design Deep Dive

Let’s dive deeper into some of the more interesting topics.

Streaming vs. Batching

The high-level architecture we proposed is a type of stream processing system. Here’s a comparison between three types of systems:

| | Services (online system) | Batch system (offline system) | Streaming system (near real-time system) |
|---|---|---|---|
| Responsiveness | Respond to the client quickly | No response to the client needed | No response to the client needed |
| Input | User requests | Bounded input with finite size. A large amount of data | Input has no boundary (infinite streams) |
| Output | Responses to clients | Materialized views, aggregated metrics, etc. | Materialized views, aggregated metrics, etc. |
| Performance measurement | Availability, latency | Throughput | Throughput, latency |
| Example | Online shopping | MapReduce | Flink [13] |

In our design, we used a mixture of batching and streaming.

We used streaming to process data as it arrives and generate aggregated results in near real-time, and batching for historical data backup.

An architecture that contains two simultaneous processing paths - batch and streaming - is called the lambda architecture. A disadvantage is that you have two processing paths with two different codebases to maintain.

Kappa is an alternative architecture, which combines batch and stream processing in one processing path. The key idea is to use a single stream processing engine.

Lambda architecture: lambda-architecture

Kappa architecture: kappa-architecture

Our high-level design uses the Kappa architecture, as reprocessing of historical data also goes through the aggregation service.

Whenever we have to recalculate aggregated data, e.g. due to a major bug in the aggregation logic, we can recalculate it from the raw data we store.

Time

We need a timestamp to perform aggregation. It can be generated in two places: event time (when the click happens on the client) and processing time (when the server processes the event).

Due to the use of async processing (message queues) and network delays, there can be a significant difference between event time and processing time.

There is no perfect solution; we need to weigh the trade-offs:

| | Pros | Cons |
|---|------|------|
| Event time | Aggregation results are more accurate | Clients might have the wrong time, or the timestamp might be generated by malicious users |
| Processing time | Server timestamp is more reliable | The timestamp is not accurate if the event arrives late |

Since data accuracy is important, we’ll use the event time for aggregation.

To mitigate the issue of delayed events, a technique called “watermark” can be leveraged. Notice that the watermark technique does not handle events that have long delays.

In the example below, event 2 misses the window where it needs to be aggregated: watermark-technique

However, if we purposefully extend the aggregation window, we can reduce the likelihood of missed events. The extended part of a window is called a “watermark”: watermark-2

There is always some likelihood of missed events regardless of the watermark’s size, but there is little value in optimizing for such low-probability events.

We can instead resolve such inconsistencies by doing end-of-day reconciliation.
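
Here is a minimal sketch of the watermark idea: a one-minute window is only finalized after an extra watermark delay has passed, so slightly late events still land in the right bucket, while events arriving later than that are dropped and left to end-of-day reconciliation. The 15-second watermark and the in-memory state are arbitrary assumptions.

```python
from collections import defaultdict

WINDOW_SECONDS = 60
WATERMARK_SECONDS = 15   # assumed extra wait before a window is finalized

open_windows = defaultdict(int)   # (ad_id, window_start) -> click count
closed_windows = set()            # window_start values that are already finalized


def on_event(ad_id: str, event_time: int) -> bool:
    """Assign a click to its one-minute window, unless that window is already closed."""
    window_start = event_time - (event_time % WINDOW_SECONDS)
    if window_start in closed_windows:
        return False   # too late for the watermark; handled by end-of-day reconciliation
    open_windows[(ad_id, window_start)] += 1
    return True


def finalize_windows(now: int):
    """Emit and close windows whose end time plus the watermark has passed."""
    for (ad_id, window_start), count in list(open_windows.items()):
        if now >= window_start + WINDOW_SECONDS + WATERMARK_SECONDS:
            closed_windows.add(window_start)
            del open_windows[(ad_id, window_start)]
            yield ad_id, window_start, count


on_event("ad001", event_time=60)             # lands in window [60, 120)
on_event("ad001", event_time=119)            # same window, slightly late is fine
print(list(finalize_windows(now=140)))       # [('ad001', 60, 2)]
print(on_event("ad001", event_time=70))      # False - window already closed
```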

Aggregation window

There are four types of window functions:

In our design, we leverage a tumbling window for ad click aggregations: tumbling-window

As well as a sliding window for the top N clicked ads in M minutes aggregation: sliding-window
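
A quick sketch of the two window types we use: a tumbling (fixed) window key for per-minute click counts, and a sliding window over the last M one-minute buckets for the top-N query. The heapq-based ranking and the minute-index bookkeeping are illustrative assumptions.

```python
import heapq
from collections import Counter


def tumbling_window_key(epoch_seconds: int, size_seconds: int = 60) -> int:
    """Non-overlapping fixed windows: every event maps to exactly one bucket."""
    return epoch_seconds - (epoch_seconds % size_seconds)


def top_n_last_m_minutes(per_minute_counts, now_minute, m, n):
    """Sliding window: sum the last M one-minute buckets, then rank the ads."""
    totals = Counter()
    for minute in range(now_minute - m + 1, now_minute + 1):
        for ad_id, count in per_minute_counts.get(minute, {}).items():
            totals[ad_id] += count
    return heapq.nlargest(n, totals.items(), key=lambda kv: kv[1])


per_minute_counts = {
    100: {"ad001": 5, "ad002": 9},
    101: {"ad001": 7, "ad003": 2},
}
print(top_n_last_m_minutes(per_minute_counts, now_minute=101, m=2, n=2))
# [('ad001', 12), ('ad002', 9)]
```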

Delivery guarantees

Since the data we’re aggregating is going to be used for billing, data accuracy is a priority.

Hence, we need to discuss:

There are three delivery guarantees we can use - at-most-once, at-least-once and exactly once.

In most circumstances, at-least-once is sufficient when a small number of duplicates is acceptable. This is not the case for our system, though, as even a small percentage difference can result in discrepancies of millions of dollars. Hence, we need exactly-once delivery semantics.

Data deduplication

One of the most common data quality issues is duplicated data.

It can come from a wide range of sources - for example, clients retrying or maliciously resending events, and aggregation servers failing mid-processing.

Here’s an example of data duplication occurring due to failure to acknowledge an event on the last hop: data-duplication-example

In this example, offset 100 will be processed and sent downstream multiple times.

One option to mitigate this is to store the last seen offset in HDFS/S3 before sending the result downstream, but if the node crashes after persisting the offset, the result may never reach downstream: data-duplication-example-2

Finally, we can store the offset atomically together with the downstream write. To achieve this, we need to implement a distributed transaction: data-duplication-example-3

Personal side-note: Alternatively, if the downstream system handles the aggregation result idempotently, there is no need for a distributed transaction.
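
To illustrate the idea of committing the consumer offset atomically with the downstream write, here is a sketch that uses a single local SQLite transaction as a stand-in for the distributed transaction; the table layout is an assumption, and the upsert also shows the idempotency point from the side-note.

```python
import sqlite3

db = sqlite3.connect("aggregations.db")
db.execute("""CREATE TABLE IF NOT EXISTS aggregated_clicks (
                  ad_id TEXT, click_minute TEXT, click_count INTEGER,
                  PRIMARY KEY (ad_id, click_minute))""")
db.execute("""CREATE TABLE IF NOT EXISTS consumer_offsets (
                  partition_id INTEGER PRIMARY KEY, last_offset INTEGER)""")


def commit_result(partition_id, offset, ad_id, click_minute, click_count):
    """Persist the aggregation result and the consumer offset in one transaction.

    If the node crashes before COMMIT, neither is persisted and the batch is simply
    reprocessed; the upsert also makes redelivery of the same minute idempotent.
    """
    with db:  # opens a transaction; commits on success, rolls back on exception
        db.execute(
            "INSERT INTO aggregated_clicks (ad_id, click_minute, click_count) VALUES (?, ?, ?) "
            "ON CONFLICT(ad_id, click_minute) DO UPDATE SET click_count = excluded.click_count",
            (ad_id, click_minute, click_count),
        )
        db.execute(
            "INSERT INTO consumer_offsets (partition_id, last_offset) VALUES (?, ?) "
            "ON CONFLICT(partition_id) DO UPDATE SET last_offset = excluded.last_offset",
            (partition_id, offset),
        )


commit_result(partition_id=0, offset=100, ad_id="ad001",
              click_minute="202101010000", click_count=2)
```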

Scale the system

Let’s discuss how we scale the system as it grows.

We have three independent components - message queue, aggregation service and database. Since they are decoupled, we can scale them independently.

How do we scale the message queue:

How do we scale the aggregation service: aggregation-service-scaling

How do we scale the database:

Another scalability issue to consider is the hotspot issue - what if an ad is more popular and gets more attention than others? hotspot-issue

Alternative, more sophisticated ways to handle the hotspot problem:

Fault Tolerance

Within the aggregation nodes, we are processing data in-memory. If a node goes down, the processed data is lost.

We can leverage consumer offsets in Kafka to continue from where we left off once another node picks up the slack. However, there is additional intermediate state we need to maintain, as we’re aggregating the top N ads in M minutes.

We can make snapshots at a particular minute for the on-going aggregation: fault-tolerance-example

If a node goes down, the new node can read the latest committed consumer offset, as well as the latest snapshot to continue the job: fault-tolerance-recovery-example
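
Here is a sketch of the snapshotting idea: periodically persist the consumer offset together with the in-flight aggregation state, so a replacement node can reload the snapshot and resume from that offset. The JSON file is a stand-in assumption for whatever durable store is actually used.

```python
import json
import os

SNAPSHOT_PATH = "aggregator_snapshot.json"  # stand-in for a durable store


def save_snapshot(offset, partial_counts):
    """Atomically persist {consumer offset, partial aggregation state}."""
    tmp_path = SNAPSHOT_PATH + ".tmp"
    with open(tmp_path, "w") as f:
        json.dump({"offset": offset, "partial_counts": partial_counts}, f)
    os.replace(tmp_path, SNAPSHOT_PATH)  # atomic rename


def restore_snapshot():
    """Return (offset, partial_counts) to resume from, or a fresh state."""
    if not os.path.exists(SNAPSHOT_PATH):
        return 0, {}
    with open(SNAPSHOT_PATH) as f:
        snapshot = json.load(f)
    return snapshot["offset"], snapshot["partial_counts"]


save_snapshot(offset=1337, partial_counts={"ad001": 12, "ad002": 9})
print(restore_snapshot())  # (1337, {'ad001': 12, 'ad002': 9})
```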

Data monitoring and correctness

As the data we’re aggregating is used for billing, it is very important to have rigorous monitoring in place to ensure correctness.

Some metrics we might want to monitor:

We also need to implement a reconciliation flow which is a batch job, running at the end of the day. It calculates the aggregated results from the raw data and compares them against the actual data stored in the aggregation database: reconciliation-flow
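
A minimal sketch of that reconciliation batch job: recompute per-minute counts from the raw events and compare them with what the aggregation database holds, flagging any mismatches. Data access is faked with in-memory structures here; in practice the inputs would come from the raw-data store and the aggregation database.

```python
from collections import Counter


def recompute_from_raw(raw_events):
    """Recompute (ad_id, click_minute) counts directly from raw click events."""
    counts = Counter()
    for e in raw_events:
        counts[(e["ad_id"], e["click_minute"])] += 1
    return counts


def reconcile(raw_events, stored_aggregates):
    """Return the keys where recomputed counts disagree with stored aggregates."""
    recomputed = recompute_from_raw(raw_events)
    keys = set(recomputed) | set(stored_aggregates)
    return {
        key: (recomputed.get(key, 0), stored_aggregates.get(key, 0))
        for key in keys
        if recomputed.get(key, 0) != stored_aggregates.get(key, 0)
    }


raw = [
    {"ad_id": "ad001", "click_minute": "202101010000"},
    {"ad_id": "ad001", "click_minute": "202101010000"},
]
stored = {("ad001", "202101010000"): 3}   # off by one -> flagged
print(reconcile(raw, stored))             # {('ad001', '202101010000'): (2, 3)}
```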

Alternative design

In a generalist system design interview, you are not expected to know the internals of specialized software used in big data processing.

Explaining the thought process and discussing trade-offs is more important than knowing specific tools, which is why the chapter covers a generic solution.

An alternative design, which leverages off-the-shelf tooling, is to store ad click data in Hive, with an Elasticsearch layer on top for faster queries.

Aggregation is typically done in OLAP databases such as ClickHouse or Druid. alternative-design

Step 4 - Wrap up

Things we covered:

Ad click event aggregation is a typical big data processing system.

It would be easier to understand and design it if you have prior knowledge of related technologies: