pandasanjay opened a new pull request, #35197: URL: https://github.com/apache/beam/pull/35197
**Description:**

This pull request introduces a new `EnrichmentSourceHandler` for Apache Beam, `BigQueryStorageEnrichmentHandler`, which uses the Google Cloud BigQuery Storage Read API for data enrichment. The handler is intended as a higher-performance alternative to SQL-based BigQuery lookups within Beam pipelines.

**Motivation and Context:**

Enriching data by joining PCollection elements with data stored in BigQuery is a common use case. Existing methods often rely on executing SQL queries, whereas the BigQuery Storage Read API offers a more direct and typically faster way to retrieve data, especially for large volumes or when fine-grained row-level access is needed. This handler aims to:

* Improve the performance of BigQuery enrichments.
* Reduce the BigQuery query costs associated with SQL execution for enrichment tasks.
* Provide more flexible, programmatic control over data fetching and filtering.

**Key Features and Improvements:**

The `BigQueryStorageEnrichmentHandler` offers several enhancements:

* **Efficient Data Retrieval:** Uses the BigQuery Storage Read API, which is typically faster than SQL queries for bulk lookups. Data is read in Apache Arrow format, minimizing serialization/deserialization overhead.
* **Flexible Filtering:**
    * Supports static filter templates via `row_restriction_template`.
    * Allows dynamic, per-element filter string generation using `row_restriction_template_fn`.
* **Advanced Keying and Value Extraction:**
    * `fields`: Specifies the input `beam.Row` fields used to generate join keys and to fill filter templates.
    * `additional_condition_fields`: Allows input fields to be used for filtering *without* including them in the join key.
    * `condition_value_fn`: Provides complete control over generating the dictionary of values used for both filtering and join key creation.
* **Field Renaming/Aliasing:** Supports aliasing of selected BigQuery columns (e.g., `original_col as alias_col` in `column_names`) to prevent naming conflicts in the enriched `beam.Row`.
* **Batching Support:** Groups multiple input elements to make fewer `CreateReadSession` calls, reducing API overhead. Batch size and duration are configurable (`min_batch_size`, `max_batch_size`, `max_batch_duration_secs`).
* **Parallel Stream Reading:** (Experimental) Uses a `ThreadPoolExecutor` to read from multiple streams of a BigQuery Read Session in parallel, potentially improving data fetching throughput. Concurrency is configurable via `max_parallel_streams`.
* **Custom Row Selection:** Includes a `latest_value_selector` callback that lets users define custom logic for choosing a row when multiple BigQuery rows match a single input request (e.g., picking the record with the most recent timestamp). `primary_keys` can be used by this selector.
* **Automatic Client Management:** Manages the lifecycle of the `BigQueryReadClient`.

**Advantages over Traditional SQL-based BigQuery Enrichment:**

* **Performance:** Direct access to table storage via the Storage Read API typically bypasses the SQL query processing engine, leading to lower latency and higher throughput, especially when fetching many individual rows or large data segments.
* **Cost Efficiency:** Reading data via the Storage API can be more cost-effective than running many small SQL queries, as Storage API pricing is based on data scanned, while query pricing involves slots and scanned data.
* **Scalability:** The streaming nature of the Storage Read API is well-suited to scalable data processing in Beam.
* **Reduced Query Complexity:** For simple lookups, it avoids the need to construct and manage SQL query strings dynamically.
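The filtering, aliasing, and row-selection features listed under Key Features can be illustrated with a small pure-Python sketch. This is not the handler's code: the helper names (`parse_column_names`, `render_row_restriction`, `select_latest`, `apply_aliases`) and the `updated_at` timestamp column are hypothetical, and the sketch assumes that filter templates use `str.format`-style placeholders, which the description does not confirm.

```python
from typing import Any, Dict, List, Mapping, Optional, Sequence, Tuple


def parse_column_names(
    column_names: Sequence[str],
) -> Tuple[List[str], Dict[str, str]]:
    """Split entries such as 'original_col as alias_col' into the columns to
    select and a mapping from original column name to alias."""
    selected: List[str] = []
    aliases: Dict[str, str] = {}
    for entry in column_names:
        parts = entry.split()
        if len(parts) == 3 and parts[1].lower() == "as":
            selected.append(parts[0])
            aliases[parts[0]] = parts[2]
        else:
            selected.append(entry.strip())
    return selected, aliases


def render_row_restriction(template: str, values: Mapping[str, Any]) -> str:
    """Fill a filter template with per-element values.

    Values are substituted verbatim, so the template itself must quote
    string literals (assumption: str.format-style placeholders).
    """
    return template.format(**values)


def select_latest(
    rows: Sequence[Mapping[str, Any]],
    timestamp_field: str = "updated_at",  # hypothetical column name
) -> Optional[Mapping[str, Any]]:
    """Pick the row with the greatest timestamp, or None if nothing matched."""
    if not rows:
        return None
    return max(rows, key=lambda row: row[timestamp_field])


def apply_aliases(
    row: Mapping[str, Any], aliases: Mapping[str, str]
) -> Dict[str, Any]:
    """Rename columns in a fetched row according to the alias mapping."""
    return {aliases.get(name, name): value for name, value in row.items()}
```

A callback shaped like `select_latest` is the kind of function one might pass as `latest_value_selector`; the exact signature the handler expects is defined in the PR itself and may differ from this sketch.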
**Documentation:**

Comprehensive documentation for this handler, including usage examples, parameter descriptions, features, and limitations, has been added in [`docs/bigquery_storage_enrichment_handler.md`](https://github.com/pandasanjay/beam-custom-implementation/blob/main/docs/bigquery_storage_enrichment_handler.md).

**Implementation Details:**

The handler (`sdk/python/transforms/enrichment_handlers/bigquery_storage_read.py`) manages `BigQueryReadClient` instances, constructs `ReadSession` requests with the appropriate row restrictions and selected fields, and processes the resulting Arrow record batches. It integrates with Beam's `Enrichment` transform, providing batching and cache-key generation.

**Testing Considerations:**

* Unit tests for key generation, filter construction, and data processing logic.
* Integration tests against a live BigQuery instance.
* Performance benchmarks comparing against SQL-based handlers.

This handler provides a powerful and efficient way to enrich data in Apache Beam pipelines using BigQuery.
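To make the batching and key-generation ideas from the Implementation Details more concrete, here is a minimal sketch of one way a batch of requests could share a single read and then be matched back to their rows. It is an assumption about the general approach, not the handler's actual logic; `make_key`, `combine_restrictions`, and `match_batch` are hypothetical helpers, and rows are modelled as plain dictionaries rather than Arrow record batches.

```python
from collections import defaultdict
from typing import Any, Dict, Iterable, List, Mapping, Sequence, Tuple

Key = Tuple[Any, ...]


def make_key(values: Mapping[str, Any], fields: Sequence[str]) -> Key:
    """Build a hashable join key from the configured key fields."""
    return tuple(values[field] for field in fields)


def combine_restrictions(restrictions: Iterable[str]) -> str:
    """OR together per-element filters so one read covers the whole batch.

    Duplicate filters are dropped while preserving first-seen order.
    """
    unique = list(dict.fromkeys(restrictions))
    return " OR ".join(f"({restriction})" for restriction in unique)


def match_batch(
    requests: Sequence[Mapping[str, Any]],
    rows: Iterable[Mapping[str, Any]],
    fields: Sequence[str],
) -> List[Tuple[Mapping[str, Any], List[Mapping[str, Any]]]]:
    """Pair each request with the fetched rows that share its join key."""
    grouped: Dict[Key, List[Mapping[str, Any]]] = defaultdict(list)
    for row in rows:
        grouped[make_key(row, fields)].append(row)
    # .get() avoids inserting empty entries for requests with no match.
    return [(req, grouped.get(make_key(req, fields), [])) for req in requests]
```

Grouping fetched rows by key once per batch keeps the matching step linear in the number of rows, and the same key tuple could plausibly serve as a cache key, though how the handler actually derives its cache keys is described only in the PR's code.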
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at: us...@infra.apache.org