Our team owns and manages a user-behavior and product analytics pipeline that processes and stores a large volume of data every day. When we redesigned the pipeline, we chose BigQuery as the data warehouse because it offers fast SQL queries and interactive analysis of massive datasets. BigQuery supports both batch loading and streaming ingestion.
With batch loading, you load the source data into a BigQuery table in a single batch operation. For example, the data source could be a CSV or Parquet file. Traditional extract, transform, and load (ETL) jobs fall into this category.
With streaming, you send the data one record at a time or in micro-batches (a small group of records per request) to the streaming endpoint, using the streaming APIs provided by Google.
PoC to Stream Massive Amount of Data to BQ
I ran this PoC before deciding to adopt streaming ingestion for our pipeline. Two goals motivated it:
- Evaluate whether we can publish our peak traffic to BigQuery. Our pipeline handles 50+ billion events every day, of which 25 billion go to a single table. I used this table for the PoC.
- Uncover challenges caused by remote streaming. The data that I want to stream is on-premise, and we have an interconnect from the on-premise data center to Google Cloud.
Because this was a PoC, I wanted to finish it quickly, so I used Spark Structured Streaming to consume events from Kafka and stream them to BigQuery. However, Spark had no BigQuery connector that writes through the streaming APIs. All the connectors I evaluated wrote to a GCS bucket and then performed a batch load into BigQuery. So I decided to write a BigQuery streaming sink for Spark and use it for the PoC.
With the custom Spark sink, I could write a simple streaming ETL job that consumes the data, transforms it, and writes it into BigQuery.
Daily Events : 25 Billion
Peak Traffic : 420K Events (1.25 GB) per second
On-Premise to Cloud network Interconnect Usage : 20 Gbps
I started with 10% of the traffic, gradually ramped up to 100%, and completed the PoC successfully. Some of what I learned along the way is documented below.
The BigQuery streaming API lets you batch multiple rows into a single request to the BigQuery endpoint. I tried various batch sizes for my use case: with 500 records per batch (1.25 MB), latency was around 450 to 650 ms, and with 1000 records per batch it was around 1100 to 1500 ms. I chose a batch size of 500 because it gave better latency for our use case.
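To make the batching step concrete, here is a minimal sketch of splitting a list of rows into batches of 500 before sending them. RowBatcher and its split method are hypothetical names used only for illustration; they are not part of the BigQuery client or of the custom Spark sink, whose code is not shown in this post.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration; not part of the BigQuery client library.
final class RowBatcher {
    // Batch size that gave the better latency in the PoC.
    static final int BATCH_SIZE = 500;

    // Splits rows into consecutive batches of at most batchSize elements.
    static <T> List<List<T>> split(List<T> rows, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < rows.size(); start += batchSize) {
            int end = Math.min(start + batchSize, rows.size());
            batches.add(new ArrayList<>(rows.subList(start, end)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> rows = new ArrayList<>();
        for (int i = 0; i < 1200; i++) {
            rows.add(i);
        }
        // Each batch would become one streaming insert request.
        List<List<Integer>> batches = split(rows, BATCH_SIZE);
        System.out.println(batches.size() + " batches, last has "
                + batches.get(batches.size() - 1).size() + " rows");
    }
}
```

At the peak rate of 420K events per second, a batch size of 500 works out to 840 insert requests per second across all writers.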
Streaming Insertion Quota Limit:
The default streaming quota for maximum bytes per second is 1 GB per GCP project. Any ingestion above this limit results in a BigQueryException with a quotaExceeded error. When I ramped our ingestion traffic to about 60%, I hit this limit and requests started to fail with that exception.
There are two parts to handling this error. First, raise the project-level insertion quota to what you need by opening a ticket with Google. Second, implement retry logic that resends the same request when it fails, so that the failed records are not lost from the final table.
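The retry logic can be sketched as a small wrapper around the call that sends one batch. This is a sketch under assumptions, not the code from the PoC: RetryingSender, the attempt cap, and the doubling delay between attempts are all illustrative choices. The request is modeled as a plain Supplier so the example runs without the BigQuery client; in a real sink it would wrap the insert call, whose BigQueryException is a RuntimeException.

```java
import java.util.function.Supplier;

// Hypothetical helper for illustration; not part of the BigQuery client library.
final class RetryingSender {
    // Runs the request, retrying on RuntimeException with a doubling delay,
    // and rethrows the last failure once maxAttempts is exhausted.
    static <T> T callWithRetry(Supplier<T> request, int maxAttempts, long initialDelayMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        long delayMillis = initialDelayMillis;
        for (int attempt = 1; ; attempt++) {
            try {
                return request.get();
            } catch (RuntimeException e) {
                if (attempt >= maxAttempts) {
                    throw e;
                }
                try {
                    Thread.sleep(delayMillis);
                } catch (InterruptedException interrupted) {
                    // Preserve the interrupt flag and give up on retrying.
                    Thread.currentThread().interrupt();
                    throw e;
                }
                delayMillis *= 2;
            }
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated request that fails twice before succeeding.
        String result = callWithRetry(() -> {
            calls[0]++;
            if (calls[0] < 3) {
                throw new IllegalStateException("simulated quotaExceeded");
            }
            return "sent";
        }, 5, 10L);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

Capping the attempts keeps a persistent failure from blocking the stream forever; what to do with a batch that still fails (for example, parking it for later replay) is left out of this sketch.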
You can monitor quota usage with the BigQuery SQL below and raise the quota well before you reach the limit.
SELECT
  start_timestamp,
  SUM(total_requests) AS total_requests,
  SUM(total_rows) AS total_rows,
  SUM(total_input_bytes) AS total_input_bytes,
  SUM(IF(error_code IN ("QUOTA_EXCEEDED", "RATE_LIMIT_EXCEEDED"), total_requests, 0)) AS quota_error,
  SUM(IF(error_code IN ("INVALID_VALUE", "NOT_FOUND", "SCHEMA_INCOMPATIBLE", "BILLING_NOT_ENABLED", "ACCESS_DENIED", "UNAUTHENTICATED"), total_requests, 0)) AS user_error,
  SUM(IF(error_code IN ("CONNECTION_ERROR", "INTERNAL_ERROR"), total_requests, 0)) AS server_error,
  SUM(IF(error_code IS NULL, 0, total_requests)) AS total_error
FROM `region-us`.INFORMATION_SCHEMA.STREAMING_TIMELINE_BY_PROJECT
GROUP BY start_timestamp
ORDER BY 1 DESC
The quota is not calculated from your actual row payload alone but from the payload plus metadata. Your payload is sent inside a JSON request, and the quota is computed from the bytes received at the BigQuery end, so the additional JSON keys and structure also count toward the quota.
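To get a rough feel for this overhead, the sketch below compares the bytes of the field values alone with the bytes of the same row wrapped in JSON. It is only an approximation under assumptions: PayloadOverhead is a hypothetical helper, the row and its column names are made up, the JSON is hand-built in the general shape of a streaming insert row (an insertId plus a json object) without escaping, and the exact bytes BigQuery counts may differ.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration; the byte counts are an approximation only.
final class PayloadOverhead {
    // Sum of the UTF-8 bytes of the field values only.
    static int rawValueBytes(Map<String, String> row) {
        int total = 0;
        for (String value : row.values()) {
            total += value.getBytes(StandardCharsets.UTF_8).length;
        }
        return total;
    }

    // UTF-8 bytes of a simplified JSON encoding of one row.
    // Assumes keys and values contain no characters that need escaping.
    static int jsonBytes(String insertId, Map<String, String> row) {
        StringBuilder json = new StringBuilder();
        json.append("{\"insertId\":\"").append(insertId).append("\",\"json\":{");
        boolean first = true;
        for (Map.Entry<String, String> field : row.entrySet()) {
            if (!first) {
                json.append(',');
            }
            json.append('"').append(field.getKey()).append("\":\"")
                .append(field.getValue()).append('"');
            first = false;
        }
        json.append("}}");
        return json.toString().getBytes(StandardCharsets.UTF_8).length;
    }

    public static void main(String[] args) {
        // Made-up row; the column names are not from the real pipeline.
        Map<String, String> row = new LinkedHashMap<>();
        row.put("user_id", "u-12345");
        row.put("event_type", "page_view");
        row.put("page", "/home");
        int raw = rawValueBytes(row);
        int encoded = jsonBytes("events-0-42", row);
        System.out.println("raw=" + raw + " bytes, json=" + encoded + " bytes");
    }
}
```

The smaller the values are relative to the column names, the larger the share of the quota that goes to JSON structure rather than data.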
In my PoC, I observed that streaming ingestion duplicates some data, and duplicate records show up in the final table.
BigQuery also provides a way to enable deduplication. There are two ways to create a “RowToInsert” object with the BigQuery client: one without deduplication and one with deduplication enabled.
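import com.google.cloud.bigquery.InsertAllRequest;
import com.google.cloud.bigquery.InsertAllRequest.RowToInsert;

// 1. Insertion without dedupe
RowToInsert rowWithoutId = InsertAllRequest.RowToInsert.of(content);
// 2. Insertion with dedupe: uniqueInsertId is the ID BigQuery uses to detect duplicates
RowToInsert rowWithId = InsertAllRequest.RowToInsert.of(uniqueInsertId, content);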
When I ingested without enabling “dedupe”, one record was duplicated for every 500,000 records ingested.
When I enabled “dedupe”, only one record was duplicated for every 5 million records ingested. Enabling deduplication does not guarantee that all duplicates are removed; it is only a best-effort mechanism.
BigQuery uses the “uniqueInsertId” attached to each record to identify and remove duplicates. This deduplication is performed against the records received in the last 10 minutes.
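For deduplication to help on retries, the same record has to get the same insert ID every time it is sent. One common way to do that when the source is Kafka is to derive the ID from the record's topic, partition, and offset. The sketch below assumes that approach; InsertIds is a hypothetical helper, and this post does not say how the PoC actually generated its IDs.

```java
// Hypothetical helper for illustration; the ID scheme is an assumption.
final class InsertIds {
    // Builds a deterministic ID from the Kafka coordinates of a record,
    // so a resent record carries the same ID as the original attempt.
    static String fromKafkaCoordinates(String topic, int partition, long offset) {
        return topic + "-" + partition + "-" + offset;
    }

    public static void main(String[] args) {
        String uniqueInsertId = fromKafkaCoordinates("events", 3, 1024L);
        // The ID would then be passed as the first argument of
        // InsertAllRequest.RowToInsert.of(uniqueInsertId, content).
        System.out.println(uniqueInsertId);
    }
}
```

A random UUID generated at send time would not work the same way, because a retried record would get a new ID and look like a new row.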
Caching on Streaming Ingestion Table:
BigQuery provides basic caching for batch-ingested tables. When you run a duplicate query, BigQuery attempts to reuse cached results. To retrieve data from the cache, the duplicate query text must be the same as the original query.
However, if the data is ingested into the table through streaming, BigQuery does not provide any caching for it.
Higher Latency Querying Streaming Table:
I observed higher latency when querying the streaming table than when querying the batch-ingested table, even when the amount of data scanned from the streaming table was smaller.
The higher latency comes from having to scan the streaming buffer. The streaming buffer is where ingested records are stored initially, before they are flushed to BigQuery's permanent Capacitor columnar storage.
Capacitor columnar storage is optimized for read-heavy operations, while the BigQuery streaming buffer is optimized for write-heavy operations. When you read from a streaming table, you end up reading data from the write-optimized streaming buffer, and that is the reason for the higher latency. The same is reflected in the above picture: totalPartitionsProcessed is 2 for the streaming table but 1 for the batch-ingested table. You shouldn’t worry about this additional latency, since it is the cost of including the latest data from the streaming buffer in your results. The additional latency can be 15–30 seconds depending on the amount of data that you are querying.
The streaming buffer can take 10–90 minutes to flush the data to permanent columnar storage. In the latest version of the BigQuery backend (v2), this is reduced to 2–3 minutes. At the time of writing, the default version of the BigQuery backend is v1.