This is an automated email from the ASF dual-hosted git repository.

riteshghorse pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 3329edba79f [Python] Add feast feature store handler for enrichment 
transform (#30957)
3329edba79f is described below

commit 3329edba79f7b99af983b22cf44ab6e77a8ee987
Author: Ritesh Ghorse <riteshgho...@gmail.com>
AuthorDate: Fri Apr 26 14:31:28 2024 -0400

    [Python] Add feast feature store handler for enrichment transform (#30957)
    
    * add feast feature store handler
    
    * add changes, unit test
    
    * remove duplicate test, add doc
    
    * correct string formatting
    
    * add lambda, use filesystems, start test
    
    * update pydoc
---
 .github/trigger_files/beam_PostCommit_Python.json  |   1 -
 CHANGES.md                                         |  34 ++++
 .../enrichment_handlers/feast_feature_store.py     | 200 +++++++++++++++++++++
 .../feast_feature_store_it_test.py                 | 121 +++++++++++++
 .../feast_feature_store_test.py                    |  67 +++++++
 .../feast_tests_requirements.txt                   |  18 ++
 sdks/python/pytest.ini                             |   1 +
 sdks/python/scripts/generate_pydoc.sh              |   2 +-
 sdks/python/test-suites/direct/common.gradle       |  28 +++
 9 files changed, 470 insertions(+), 2 deletions(-)

diff --git a/.github/trigger_files/beam_PostCommit_Python.json 
b/.github/trigger_files/beam_PostCommit_Python.json
index 63bd5651def..c4edaa85a89 100644
--- a/.github/trigger_files/beam_PostCommit_Python.json
+++ b/.github/trigger_files/beam_PostCommit_Python.json
@@ -1,4 +1,3 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to 
run"
 }
- 
diff --git a/CHANGES.md b/CHANGES.md
index 97894b08374..ec7fbe45668 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -53,6 +53,40 @@
 * ([#X](https://github.com/apache/beam/issues/X)).
 -->
 
+# [2.57.0] - Unreleased
+
+## Highlights
+
+* New highly anticipated feature X added to Python SDK 
([#X](https://github.com/apache/beam/issues/X)).
+* New highly anticipated feature Y added to Java SDK 
([#Y](https://github.com/apache/beam/issues/Y)).
+
+## I/Os
+
+* Support for X source added (Java/Python) 
([#X](https://github.com/apache/beam/issues/X)).
+
+## New Features / Improvements
+
+* Added Feast feature store handler for enrichment transform (Python) 
([#30957](https://github.com/apache/beam/issues/30964)).
+
+## Breaking Changes
+
+* X behavior was changed ([#X](https://github.com/apache/beam/issues/X)).
+
+## Deprecations
+
+* X behavior is deprecated and will be removed in X versions 
([#X](https://github.com/apache/beam/issues/X)).
+
+## Bugfixes
+
+* Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
+
+## Security Fixes
+* Fixed (CVE-YYYY-NNNN)[https://www.cve.org/CVERecord?id=CVE-YYYY-NNNN] 
(Java/Python/Go) ([#X](https://github.com/apache/beam/issues/X)).
+
+## Known Issues
+
+* ([#X](https://github.com/apache/beam/issues/X)).
+
 # [2.56.0] - Unreleased
 
 ## Highlights
diff --git 
a/sdks/python/apache_beam/transforms/enrichment_handlers/feast_feature_store.py 
b/sdks/python/apache_beam/transforms/enrichment_handlers/feast_feature_store.py
new file mode 100644
index 00000000000..dc2a71786f6
--- /dev/null
+++ 
b/sdks/python/apache_beam/transforms/enrichment_handlers/feast_feature_store.py
@@ -0,0 +1,200 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import logging
+import tempfile
+from pathlib import Path
+from typing import Any
+from typing import Callable
+from typing import List
+from typing import Mapping
+from typing import Optional
+
+import apache_beam as beam
+from apache_beam.io.filesystems import FileSystems
+from apache_beam.transforms.enrichment import EnrichmentSourceHandler
+from apache_beam.transforms.enrichment_handlers.utils import ExceptionLevel
+from feast import FeatureStore
+
+__all__ = [
+    'FeastFeatureStoreEnrichmentHandler',
+]
+
+EntityRowFn = Callable[[beam.Row], Mapping[str, Any]]
+
+_LOGGER = logging.getLogger(__name__)
+
+LOCAL_FEATURE_STORE_YAML_FILENAME = 'fs_yaml_file.yaml'
+
+
+def download_fs_yaml_file(gcs_fs_yaml_file: str):
+  """Download the feature store config file for Feast."""
+  try:
+    with FileSystems.open(gcs_fs_yaml_file, 'r') as gcs_file:
+      with 
tempfile.NamedTemporaryFile(suffix=LOCAL_FEATURE_STORE_YAML_FILENAME,
+                                       delete=False) as local_file:
+        local_file.write(gcs_file.read())
+        return Path(local_file.name)
+  except Exception:
+    raise RuntimeError(
+        'error downloading the file %s locally to load the '
+        'Feast feature store.' % gcs_fs_yaml_file)
+
+
+def _validate_feature_names(feature_names, feature_service_name):
+  """Check if one of `feature_names` or `feature_service_name` is provided."""
+  if ((not feature_names and not feature_service_name) or
+      bool(feature_names and feature_service_name)):
+    raise ValueError(
+        'Please provide exactly one of a list of feature names to fetch '
+        'from online store (`feature_names`) or a feature service name for '
+        'the Feast online feature store (`feature_service_name`).')
+
+
+def _validate_feature_store_yaml_path_exists(fs_yaml_file):
+  """Check if the feature store yaml path exists."""
+  if not FileSystems.exists(fs_yaml_file):
+    raise ValueError(
+        'The feature store yaml path (%s) does not exist.' % fs_yaml_file)
+
+
+def _validate_entity_key_exists(entity_id, entity_row_fn):
+  """Checks if the entity key or a lambda to build entity key exists."""
+  if ((not entity_row_fn and not entity_id) or
+      bool(entity_row_fn and entity_id)):
+    raise ValueError(
+        "Please specify exactly one of a `entity_id` or a lambda "
+        "function with `entity_row_fn` to extract the entity id "
+        "from the input row.")
+
+
+class FeastFeatureStoreEnrichmentHandler(EnrichmentSourceHandler[beam.Row,
+                                                                 beam.Row]):
+  """Enrichment handler to interact with the Feast feature store.
+
+  To specify the features to fetch from Feast online store,
+  please specify exactly one of `feature_names` or `feature_service_name`.
+
+  Use this handler with :class:`apache_beam.transforms.enrichment.Enrichment`
+  transform. To filter the features to enrich, use the `join_fn` param in
+  :class:`apache_beam.transforms.enrichment.Enrichment`.
+  """
+  def __init__(
+      self,
+      feature_store_yaml_path: str,
+      feature_names: Optional[List[str]] = None,
+      feature_service_name: Optional[str] = "",
+      full_feature_names: Optional[bool] = False,
+      entity_id: str = "",
+      *,
+      entity_row_fn: Optional[EntityRowFn] = None,
+      exception_level: ExceptionLevel = ExceptionLevel.WARN,
+  ):
+    """Initializes an instance of `FeastFeatureStoreEnrichmentHandler`.
+
+    Args:
+      feature_store_yaml_path (str): The path to a YAML configuration file for
+        the Feast feature store. See
+        https://docs.feast.dev/reference/feature-repository/feature-store-yaml
+        for configuration options supported by Feast.
+      feature_names: A list of feature names to be retrieved from the online
+        Feast feature store.
+      feature_service_name (str): The name of the feature service containing
+        the features to fetch from the online Feast feature store.
+      full_feature_names (bool): Whether to use full feature names
+        (including namespaces, etc.). Defaults to False.
+      entity_id (str): entity name for the entity associated with the features.
+        The `entity_id` is used to extract the entity value from the input row.
+        Please provide exactly one of `entity_id` or `entity_row_fn`.
+      entity_row_fn: a lambda function that takes an input `beam.Row` and
+        returns a dictionary with a mapping from the entity key column name to
+        entity key value. It is used to build/extract the entity dict for
+        feature retrieval. Please provide exactly one of `entity_id` or
+        `entity_row_fn`.
+        See https://docs.feast.dev/getting-started/concepts/feature-retrieval
+        for more information.
+      exception_level: a `enum.Enum` value from
+        `apache_beam.transforms.enrichment_handlers.utils.ExceptionLevel`
+        to set the level when `None` feature values are fetched from the
+        online Feast store. Defaults to `ExceptionLevel.WARN`.
+    """
+    self.entity_id = entity_id
+    self.feature_store_yaml_path = feature_store_yaml_path
+    self.feature_names = feature_names
+    self.feature_service_name = feature_service_name
+    self.full_feature_names = full_feature_names
+    self.entity_row_fn = entity_row_fn
+    self._exception_level = exception_level
+    _validate_entity_key_exists(self.entity_id, self.entity_row_fn)
+    _validate_feature_store_yaml_path_exists(self.feature_store_yaml_path)
+    _validate_feature_names(self.feature_names, self.feature_service_name)
+
+  def __enter__(self):
+    """Connect with the Feast feature store."""
+    local_repo_path = download_fs_yaml_file(self.feature_store_yaml_path)
+    try:
+      self.store = FeatureStore(fs_yaml_file=local_repo_path)
+    except Exception:
+      raise RuntimeError(
+          'Invalid feature store yaml file provided. Make sure '
+          'the %s contains the valid configuration for Feast feature store.' %
+          self.feature_store_yaml_path)
+    if self.feature_service_name:
+      try:
+        self.features = self.store.get_feature_service(
+            self.feature_service_name)
+      except Exception:
+        raise RuntimeError(
+            'Could not find the feature service %s for the feature '
+            'store configured in %s.' %
+            (self.feature_service_name, self.feature_store_yaml_path))
+    else:
+      self.features = self.feature_names
+
+  def __call__(self, request: beam.Row, *args, **kwargs):
+    """Fetches feature values for an entity-id from the Feast feature store.
+
+    Args:
+      request: the input `beam.Row` to enrich.
+    """
+    if self.entity_row_fn:
+      entity_dict = self.entity_row_fn(request)
+    else:
+      request_dict = request._asdict()
+      entity_dict = {self.entity_id: request_dict[self.entity_id]}
+    feature_values = self.store.get_online_features(
+        features=self.features,
+        entity_rows=[entity_dict],
+        full_feature_names=self.full_feature_names).to_dict()
+    # get_online_features() returns a list of feature values per entity-id.
+    # Since we do this per entity, the list of feature values only contain
+    # a single element at position 0.
+    response_dict = {k: v[0] for k, v in feature_values.items()}
+    return request, beam.Row(**response_dict)
+
+  def __exit__(self, exc_type, exc_val, exc_tb):
+    """Clean the instantiated Feast feature store client."""
+    self.store = None
+
+  def get_cache_key(self, request: beam.Row) -> str:
+    """Returns a string formatted with unique entity-id for the feature values.
+    """
+    if self.entity_row_fn:
+      entity_dict = self.entity_row_fn(request)
+      entity_id = list(entity_dict.keys())[0]
+    else:
+      entity_id = self.entity_id
+    return 'entity_id: %s' % request._asdict()[entity_id]
diff --git 
a/sdks/python/apache_beam/transforms/enrichment_handlers/feast_feature_store_it_test.py
 
b/sdks/python/apache_beam/transforms/enrichment_handlers/feast_feature_store_it_test.py
new file mode 100644
index 00000000000..89cb39c2c19
--- /dev/null
+++ 
b/sdks/python/apache_beam/transforms/enrichment_handlers/feast_feature_store_it_test.py
@@ -0,0 +1,121 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Tests Feast feature store enrichment handler for enrichment transform.
+
+See https://s.apache.org/feast-enrichment-test-setup
+to set up test feast feature repository.
+"""
+
+import unittest
+from typing import Any
+from typing import Mapping
+
+import pytest
+
+import apache_beam as beam
+from apache_beam.testing.test_pipeline import TestPipeline
+
+# pylint: disable=ungrouped-imports
+try:
+  from apache_beam.transforms.enrichment import Enrichment
+  from apache_beam.transforms.enrichment_handlers.feast_feature_store import \
+    FeastFeatureStoreEnrichmentHandler
+  from 
apache_beam.transforms.enrichment_handlers.vertex_ai_feature_store_it_test 
import ValidateResponse  # pylint: disable=line-too-long
+except ImportError:
+  raise unittest.SkipTest(
+      'Feast feature store test dependencies are not installed.')
+
+
+def _entity_row_fn(request: beam.Row) -> Mapping[str, Any]:
+  entity_value = request.user_id  # type: ignore[attr-defined]
+  return {'user_id': entity_value}
+
+
+@pytest.mark.uses_feast
+class TestFeastEnrichmentHandler(unittest.TestCase):
+  def setUp(self) -> None:
+    self.feature_store_yaml_file = (
+        'gs://apache-beam-testing-enrichment/'
+        'feast-feature-store/repos/ecommerce/'
+        'feature_repo/feature_store.yaml')
+    self.feature_service_name = 'demograph_service'
+
+  def test_feast_enrichment(self):
+    requests = [
+        beam.Row(user_id=2, product_id=1),
+        beam.Row(user_id=6, product_id=2),
+        beam.Row(user_id=9, product_id=3),
+    ]
+    expected_fields = [
+        'user_id', 'product_id', 'state', 'country', 'gender', 'age'
+    ]
+    handler = FeastFeatureStoreEnrichmentHandler(
+        entity_id='user_id',
+        feature_store_yaml_path=self.feature_store_yaml_file,
+        feature_service_name=self.feature_service_name,
+    )
+
+    with TestPipeline(is_integration_test=True) as test_pipeline:
+      _ = (
+          test_pipeline
+          | beam.Create(requests)
+          | Enrichment(handler)
+          | beam.ParDo(ValidateResponse(expected_fields)))
+
+  def test_feast_enrichment_bad_feature_service_name(self):
+    """Test raising an error when a bad feature service name is given."""
+    requests = [
+        beam.Row(user_id=1, product_id=1),
+    ]
+    handler = FeastFeatureStoreEnrichmentHandler(
+        entity_id='user_id',
+        feature_store_yaml_path=self.feature_store_yaml_file,
+        feature_service_name="bad_name",
+    )
+
+    with self.assertRaises(RuntimeError):
+      test_pipeline = beam.Pipeline()
+      _ = (test_pipeline | beam.Create(requests) | Enrichment(handler))
+      res = test_pipeline.run()
+      res.wait_until_finish()
+
+  def test_feast_enrichment_with_lambda(self):
+    requests = [
+        beam.Row(user_id=2, product_id=1),
+        beam.Row(user_id=6, product_id=2),
+        beam.Row(user_id=9, product_id=3),
+    ]
+    expected_fields = [
+        'user_id', 'product_id', 'state', 'country', 'gender', 'age'
+    ]
+    handler = FeastFeatureStoreEnrichmentHandler(
+        feature_store_yaml_path=self.feature_store_yaml_file,
+        feature_service_name=self.feature_service_name,
+        entity_row_fn=_entity_row_fn,
+    )
+
+    with TestPipeline(is_integration_test=True) as test_pipeline:
+      _ = (
+          test_pipeline
+          | beam.Create(requests)
+          | Enrichment(handler)
+          | beam.ParDo(ValidateResponse(expected_fields)))
+
+
+if __name__ == '__main__':
+  unittest.main()
diff --git 
a/sdks/python/apache_beam/transforms/enrichment_handlers/feast_feature_store_test.py
 
b/sdks/python/apache_beam/transforms/enrichment_handlers/feast_feature_store_test.py
new file mode 100644
index 00000000000..764086ab2c9
--- /dev/null
+++ 
b/sdks/python/apache_beam/transforms/enrichment_handlers/feast_feature_store_test.py
@@ -0,0 +1,67 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import unittest
+
+from parameterized import parameterized
+
+try:
+  from apache_beam.transforms.enrichment_handlers.feast_feature_store import \
+    FeastFeatureStoreEnrichmentHandler
+  from apache_beam.transforms.enrichment_handlers.feast_feature_store_it_test \
+    import _entity_row_fn
+except ImportError:
+  raise unittest.SkipTest(
+      'Feast feature store test dependencies are not installed.')
+
+
+class TestFeastFeatureStoreHandler(unittest.TestCase):
+  def setUp(self) -> None:
+    self.feature_store_yaml_file = (
+        'gs://apache-beam-testing-enrichment/'
+        'feast-feature-store/repos/ecommerce/'
+        'feature_repo/feature_store.yaml')
+    self.feature_service_name = 'demograph_service'
+
+  def test_feature_store_yaml_path_exists(self):
+    feature_store_yaml_path = 
'gs://apache-beam-testing-enrichment/invalid.yaml'
+    with self.assertRaises(ValueError):
+      _ = FeastFeatureStoreEnrichmentHandler(
+          entity_id='user_id',
+          feature_store_yaml_path=feature_store_yaml_path,
+          feature_service_name=self.feature_service_name,
+      )
+
+  def test_feast_enrichment_no_feature_service(self):
+    """Test raising an error in case of no feature service name."""
+    with self.assertRaises(ValueError):
+      _ = FeastFeatureStoreEnrichmentHandler(
+          entity_id='user_id',
+          feature_store_yaml_path=self.feature_store_yaml_file,
+      )
+
+  @parameterized.expand([('user_id', _entity_row_fn), ('', None)])
+  def test_feast_enrichment_invalid_args(self, entity_id, entity_row_fn):
+    with self.assertRaises(ValueError):
+      _ = FeastFeatureStoreEnrichmentHandler(
+          feature_store_yaml_path=self.feature_store_yaml_file,
+          entity_id=entity_id,
+          entity_row_fn=entity_row_fn,
+      )
+
+
+if __name__ == '__main__':
+  unittest.main()
diff --git 
a/sdks/python/apache_beam/transforms/enrichment_handlers/feast_tests_requirements.txt
 
b/sdks/python/apache_beam/transforms/enrichment_handlers/feast_tests_requirements.txt
new file mode 100644
index 00000000000..3e0c1f50bd7
--- /dev/null
+++ 
b/sdks/python/apache_beam/transforms/enrichment_handlers/feast_tests_requirements.txt
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+feast[gcp]
\ No newline at end of file
diff --git a/sdks/python/pytest.ini b/sdks/python/pytest.ini
index 8df74adbc62..b10acaac71c 100644
--- a/sdks/python/pytest.ini
+++ b/sdks/python/pytest.ini
@@ -67,6 +67,7 @@ markers =
     vertex_ai_postcommit: vertex ai postcommits that need additional deps.
     uses_testcontainer: tests that use testcontainers.
     uses_mock_api: tests that uses the mock API cluster.
+    uses_feast: tests that uses feast in some way
 
 # Default timeout intended for unit tests.
 # If certain tests need a different value, please see the docs on how to
diff --git a/sdks/python/scripts/generate_pydoc.sh 
b/sdks/python/scripts/generate_pydoc.sh
index 48b5e01d55a..c0955aa183b 100755
--- a/sdks/python/scripts/generate_pydoc.sh
+++ b/sdks/python/scripts/generate_pydoc.sh
@@ -135,7 +135,7 @@ autodoc_member_order = 'bysource'
 autodoc_mock_imports = ["tensorrt", "cuda", "torch",
     "onnxruntime", "onnx", "tensorflow", "tensorflow_hub",
     "tensorflow_transform", "tensorflow_metadata", "transformers", "xgboost", 
"datatable", "transformers",
-    "sentence_transformers", "redis", "tensorflow_text",
+    "sentence_transformers", "redis", "tensorflow_text", "feast",
     ]
 
 # Allow a special section for documenting DataFrame API
diff --git a/sdks/python/test-suites/direct/common.gradle 
b/sdks/python/test-suites/direct/common.gradle
index 1851a4d9cd0..c79c5f66abb 100644
--- a/sdks/python/test-suites/direct/common.gradle
+++ b/sdks/python/test-suites/direct/common.gradle
@@ -392,6 +392,33 @@ task testcontainersTest {
     }
 }
 
+// Integration tests that uses feast
+task feastIntegrationTest {
+  dependsOn 'installGcpTest'
+  dependsOn ':sdks:python:sdist'
+  def requirementsFile = 
"${rootDir}/sdks/python/apache_beam/transforms/enrichment_handlers/feast_tests_requirements.txt"
+  doFirst {
+      exec {
+        executable 'sh'
+        args '-c', ". ${envdir}/bin/activate && pip install -r 
$requirementsFile"
+      }
+    }
+  doLast {
+      def testOpts = basicTestOpts
+      def argMap = [
+          "test_opts": testOpts,
+          "suite": "postCommitIT-direct-py${pythonVersionSuffix}",
+          "collect": "uses_feast",
+          "runner": "TestDirectRunner"
+      ]
+      def cmdArgs = mapToArgString(argMap)
+      exec {
+        executable 'sh'
+        args '-c', ". ${envdir}/bin/activate && 
${runScriptsDir}/run_integration_test.sh $cmdArgs"
+      }
+    }
+}
+
 // Add all the RunInference framework IT tests to this gradle task that runs 
on Direct Runner Post commit suite.
 project.tasks.register("inferencePostCommitIT") {
   dependsOn = [
@@ -401,6 +428,7 @@ project.tasks.register("inferencePostCommitIT") {
   'xgboostInferenceTest',
   'transformersInferenceTest',
   'testcontainersTest',
+  'feastIntegrationTest',
   // (TODO) https://github.com/apache/beam/issues/25799
    // uncomment tfx bsl tests once tfx supports protobuf 4.x
   // 'tfxInferenceTest',

Reply via email to