pandasanjay opened a new pull request, #35197:
URL: https://github.com/apache/beam/pull/35197

   **Description:**
   
   This pull request introduces a new `EnrichmentSourceHandler` for Apache 
Beam, `BigQueryStorageEnrichmentHandler`, designed to leverage the Google Cloud 
BigQuery Storage Read API for efficient data enrichment. This handler provides 
a high-performance alternative to traditional SQL-based BigQuery lookups within 
Beam pipelines.
   
   **Motivation and Context:**
   
   Enriching data by joining PCollection elements with data stored in BigQuery 
is a common use case. While existing methods often rely on executing SQL 
queries, the BigQuery Storage Read API offers a more direct and typically 
faster way to retrieve data, especially for large volumes or when fine-grained 
row-level access is needed. This handler aims to:
   
   *   Improve the performance of BigQuery enrichments.
   *   Reduce BigQuery query costs associated with SQL execution for enrichment 
tasks.
   *   Provide more flexible and programmatic control over data fetching and 
filtering.
   
   **Key Features and Improvements:**
   
   The `BigQueryStorageEnrichmentHandler` offers several enhancements (a usage sketch follows this list):
   
   *   **Efficient Data Retrieval:** Utilizes the BigQuery Storage Read API for 
significantly faster data reads compared to SQL queries, especially for bulk 
lookups. Data is read in Apache Arrow format, minimizing 
serialization/deserialization overhead.
   *   **Flexible Filtering:**
       *   Supports static filter templates via `row_restriction_template`.
       *   Allows dynamic, per-element filter string generation using 
`row_restriction_template_fn`.
   *   **Advanced Keying and Value Extraction:**
       *   `fields`: Specifies input `beam.Row` fields for generating join keys 
and for use in filter templates.
       *   `additional_condition_fields`: Allows using input fields for 
filtering *without* including them in the join key.
       *   `condition_value_fn`: Provides complete control over generating the 
dictionary of values used for both filtering and join key creation.
   *   **Field Renaming/Aliasing:** Supports aliasing of selected BigQuery 
columns (e.g., `original_col as alias_col` in `column_names`) to prevent naming 
conflicts in the enriched `beam.Row`.
   *   **Batching Support:** Groups multiple input elements to make fewer 
`CreateReadSession` calls, reducing API overhead. Batch size and duration are 
configurable (`min_batch_size`, `max_batch_size`, `max_batch_duration_secs`).
   *   **Parallel Stream Reading:** (Experimental) Employs a 
`ThreadPoolExecutor` to read data from multiple streams of a BigQuery Read 
Session in parallel, potentially improving data fetching throughput. 
Concurrency is configurable via `max_parallel_streams`.
   *   **Custom Row Selection:** Includes a `latest_value_selector` callback 
that allows users to define custom logic for selecting the desired row when 
multiple BigQuery rows match a single input request (e.g., picking the record 
with the most recent timestamp). `primary_keys` can be used by this selector.
   *   **Automatic Client Management:** Manages the lifecycle of the 
`BigQueryReadClient`.
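   
   A minimal usage sketch under stated assumptions: the import path is inferred 
from the file location mentioned later in this description, the `project` and 
`table_name` constructor arguments and all table, column, and field names are 
hypothetical, and only the parameters listed above are taken from this PR.
   
   ```python
   import apache_beam as beam
   from apache_beam.transforms.enrichment import Enrichment
   # Assumed import path; the final module location may change during review.
   from apache_beam.transforms.enrichment_handlers.bigquery_storage_read import (
       BigQueryStorageEnrichmentHandler)
   
   # Hypothetical project, table, and column names, for illustration only.
   handler = BigQueryStorageEnrichmentHandler(
       project="my-project",
       table_name="my-project.my_dataset.customers",
       fields=["customer_id"],  # input Row fields used to build the join key
       column_names=["customer_id", "name as customer_name"],  # with aliasing
       row_restriction_template="customer_id = '{customer_id}'",
       min_batch_size=10,
       max_batch_size=100,
       max_batch_duration_secs=5,
   )
   
   with beam.Pipeline() as p:
       _ = (
           p
           | beam.Create([beam.Row(customer_id="c1", amount=12.5)])
           | Enrichment(handler)
           | beam.Map(print))
   ```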
   
   **Advantages over Traditional SQL-based BigQuery Enrichment:**
   
   *   **Performance:** Direct access to table storage via the Storage Read API 
typically bypasses the SQL query processing engine, leading to lower latency 
and higher throughput, especially for fetching many individual rows or large 
data segments.
   *   **Cost Efficiency:** Reading data via the Storage Read API can be more 
cost-effective than issuing many small SQL queries: the Storage Read API is 
billed on bytes read, whereas on-demand query pricing is based on bytes scanned 
and flat-rate pricing consumes slot capacity.
   *   **Scalability:** The streaming nature of the Storage Read API is 
well-suited for scalable data processing in Beam.
   *   **Reduced Query Complexity:** For simple lookups, it avoids the need to 
construct and manage SQL query strings dynamically.
   
   **Documentation:**
   
   Comprehensive documentation for this handler, including usage examples, 
parameter descriptions, features, and limitations, has been added in [ 
`docs/bigquery_storage_enrichment_handler.md`](https://github.com/pandasanjay/beam-custom-implementation/blob/main/docs/bigquery_storage_enrichment_handler.md).
   
   **Implementation Details:**
   
   The handler 
(`sdk/python/transforms/enrichment_handlers/bigquery_storage_read.py`) manages 
`BigQueryReadClient` instances, constructs `ReadSession` requests with 
appropriate row restrictions and selected fields, and processes the resulting 
Arrow record batches. It integrates with Beam's `Enrichment` transform, 
providing batching and cache-key generation.
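   
   For reference, a standalone sketch of the underlying Storage Read API calls 
(using `google-cloud-bigquery-storage` 2.x directly, not the handler's actual 
code) looks roughly like this; project, table, and column names are 
hypothetical:
   
   ```python
   from google.cloud import bigquery_storage_v1
   from google.cloud.bigquery_storage_v1 import types
   
   client = bigquery_storage_v1.BigQueryReadClient()
   
   # One ReadSession per batch: the selected columns plus a row restriction
   # that covers all join keys in the batch (hypothetical names).
   requested_session = types.ReadSession(
       table="projects/my-project/datasets/my_dataset/tables/customers",
       data_format=types.DataFormat.ARROW,
       read_options=types.ReadSession.TableReadOptions(
           selected_fields=["customer_id", "name"],
           row_restriction="customer_id IN ('c1', 'c2', 'c3')",
       ),
   )
   session = client.create_read_session(
       parent="projects/my-project",
       read_session=requested_session,
       max_stream_count=1,
   )
   
   # Read the stream and materialize the Arrow record batches.
   reader = client.read_rows(session.streams[0].name)
   arrow_table = reader.to_arrow()  # pyarrow.Table
   print(arrow_table.to_pylist())
   ```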
   
   **Testing Considerations:**
   
   *   Unit tests for key generation, filter construction, and data processing 
logic (a minimal sketch follows this list).
   *   Integration tests against a live BigQuery instance.
   *   Performance benchmarks comparing against SQL-based handlers.
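   
   The filter-construction checks could look roughly like the pytest-style 
sketch below; `build_restriction` is a hypothetical stand-in for the handler's 
internal logic, whose helper names may differ:
   
   ```python
   # Hypothetical helper standing in for the handler's filter construction.
   def build_restriction(template: str, values: dict) -> str:
       """Fill a row-restriction template with per-element values."""
       return template.format(**values)
   
   
   def test_row_restriction_from_template():
       template = "customer_id = '{customer_id}' AND region = '{region}'"
       values = {"customer_id": "c1", "region": "EU"}
       assert (build_restriction(template, values)
               == "customer_id = 'c1' AND region = 'EU'")
   ```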
   
   This handler provides a powerful and efficient way to enrich data in Apache 
Beam pipelines using BigQuery.
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us 
incorporate your contribution quickly and easily:
   
    - [ ] Mention the appropriate issue in your description (for example: 
`addresses #123`), if applicable. This will automatically add a link to the 
pull request in the issue. If you would like the issue to automatically close 
on merging the pull request, comment `fixes #<ISSUE NUMBER>` instead.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more 
tips on [how to make review process 
smoother](https://github.com/apache/beam/blob/master/CONTRIBUTING.md#make-the-reviewers-job-easier).
   
   To check the build health, please visit 
[https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md](https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md)
   
   GitHub Actions Tests Status (on master branch)
   
------------------------------------------------------------------------------------------------
   [![Build python source distribution and 
wheels](https://github.com/apache/beam/actions/workflows/build_wheels.yml/badge.svg?event=schedule&&?branch=master)](https://github.com/apache/beam/actions?query=workflow%3A%22Build+python+source+distribution+and+wheels%22+branch%3Amaster+event%3Aschedule)
   [![Python 
tests](https://github.com/apache/beam/actions/workflows/python_tests.yml/badge.svg?event=schedule&&?branch=master)](https://github.com/apache/beam/actions?query=workflow%3A%22Python+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Java 
tests](https://github.com/apache/beam/actions/workflows/java_tests.yml/badge.svg?event=schedule&&?branch=master)](https://github.com/apache/beam/actions?query=workflow%3A%22Java+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Go 
tests](https://github.com/apache/beam/actions/workflows/go_tests.yml/badge.svg?event=schedule&&?branch=master)](https://github.com/apache/beam/actions?query=workflow%3A%22Go+tests%22+branch%3Amaster+event%3Aschedule)
   
   See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more 
information about GitHub Actions CI or the [workflows 
README](https://github.com/apache/beam/blob/master/.github/workflows/README.md) 
to see a list of phrases to trigger workflows.
   

