damccorm commented on code in PR #33313:
URL: https://github.com/apache/beam/pull/33313#discussion_r1876233005
##########
sdks/python/apache_beam/transforms/enrichment.py:
##########
@@ -165,8 +171,9 @@ def expand(self,
# EnrichmentSourceHandler returns a tuple of (request,response).
return (
fetched_data
- | "enrichment_join" >>
- beam.Map(lambda x: self._join_fn(x[0]._asdict(), x[1]._asdict())))
+ | "enrichment_join" >> beam.Map(
+ lambda x: self._join_fn(x[0]._asdict(), x[1]._asdict())
+ if not self._use_custom_types else self._join_fn(x[0], x[1])))
Review Comment:
Does it make sense to expose this to a user? Wouldn't we always want an
enrichment handler to specify this?
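
To illustrate, a minimal sketch of handler-declared behavior (the `use_custom_types` attribute is hypothetical, not an existing Enrichment API):

```python
from apache_beam.transforms.enrichment import EnrichmentSourceHandler


class MyCustomTypeHandler(EnrichmentSourceHandler[dict, tuple]):
  # Hypothetical class attribute: the handler itself declares that it emits
  # custom types, so pipeline authors never have to pass a flag.
  use_custom_types = True

  def __call__(self, request: dict, *args, **kwargs) -> tuple:
    # Toy lookup: echo the request back with a canned response.
    return request, {"enriched": True}


# Enrichment.expand() could then branch on the handler rather than on a
# user-supplied flag, e.g.:
#   if getattr(self._source_handler, "use_custom_types", False):
#     join = lambda x: self._join_fn(x[0], x[1])
#   else:
#     join = lambda x: self._join_fn(x[0]._asdict(), x[1]._asdict())
```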
##########
sdks/python/apache_beam/ml/rag/embeddings/huggingface.py:
##########
@@ -0,0 +1,70 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""RAG-specific embedding implementations using HuggingFace models."""
+
+from typing import Optional
+import apache_beam as beam
+from apache_beam.ml.transforms.embeddings.huggingface import (
+ SentenceTransformer, _SentenceTransformerModelHandler)
+from apache_beam.ml.transforms.base import EmbeddingsManager, _TextEmbeddingHandler
+from apache_beam.ml.inference.base import RunInference
+from apache_beam.ml.rag.embeddings.base import create_rag_adapter
+
+
+class HuggingfaceTextEmbeddings(EmbeddingsManager):
+ """SentenceTransformer embeddings for RAG pipeline.
+
+ Extends EmbeddingsManager to work with RAG-specific types:
+ - Input: Chunk objects containing text to embed
+ - Output: Embedding objects containing vector representations
+
+ The adapter automatically:
+ - Extracts text from Chunk.content.text
+ - Preserves Chunk.id in Embedding.id
+ - Copies Chunk.metadata to Embedding.metadata
+ - Converts model output to Embedding.dense_embedding
+ """
+ def __init__(
+      self, model_name: str, *, max_seq_length: Optional[int] = None, **kwargs):
+ """Initialize RAG embeddings.
+
+ Args:
+ model_name: Name of the sentence-transformers model to use
+ max_seq_length: Maximum sequence length for the model
+ **kwargs: Additional arguments passed to parent
+ """
+ super().__init__(type_adapter=create_rag_adapter(), **kwargs)
+ self.model_name = model_name
+ self.max_seq_length = max_seq_length
+ self.model_class = SentenceTransformer
+
+ def get_model_handler(self):
+ """Returns model handler configured with RAG adapter."""
+ return _SentenceTransformerModelHandler(
+ model_class=self.model_class,
+ max_seq_length=self.max_seq_length,
+ model_name=self.model_name,
+ load_model_args=self.load_model_args,
+ min_batch_size=self.min_batch_size,
+ max_batch_size=self.max_batch_size,
+ large_model=self.large_model)
+
+ def get_ptransform_for_processing(self, **kwargs) -> beam.PTransform:
Review Comment:
Dropping this here, but it is generally applicable - there are a lot of
inputs/outputs which could use tighter type annotations. In general, I think
this is true of a lot of the embeddings/MLTransform code, but it would be
great to improve on that where we can.
For this one, it would be nice for this to return
`beam.PTransform[str, List[float]]` (I think).
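
To illustrate, one way that annotation could look on a standalone stand-in for `get_ptransform_for_processing` (a sketch; the string annotation keeps the hint static, sidestepping whether `PTransform` is subscriptable at runtime in the Beam version at hand, and the `PCollection[...]` wrappers may or may not be the right granularity):

```python
from typing import List

import apache_beam as beam


def embeddings_ptransform(
) -> 'beam.PTransform[beam.PCollection[str], beam.PCollection[List[float]]]':
  # Placeholder body: the real method would return the configured
  # RunInference transform built from the model handler.
  return beam.Map(lambda text: [0.0] * 8)
```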
##########
sdks/python/apache_beam/ml/rag/embeddings/base.py:
##########
@@ -0,0 +1,44 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from apache_beam.ml.transforms.base import EmbeddingTypeAdapter
+from apache_beam.ml.rag.types import Embedding
+
+
+def create_rag_adapter() -> EmbeddingTypeAdapter:
+ """Creates adapter for converting between Chunk and Embedding types.
+
+ The adapter:
+ - Extracts text from Chunk.content.text for embedding
+ - Creates Embedding objects from model output
+ - Preserves Chunk.id and metadata in Embedding
+ - Sets sparse_embedding to None (dense embeddings only)
Review Comment:
I think that all we need to support sparse embeddings is to flip
dense/sparse in `dense_embedding=embeddings, sparse_embedding=None`, right? At
least in terms of this adapter? Should we just parameterize this?
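
For example (untested sketch; it assumes the `EmbeddingTypeAdapter(input_fn=..., output_fn=...)` constructor and the `Embedding` fields referenced above):

```python
from apache_beam.ml.rag.types import Embedding
from apache_beam.ml.transforms.base import EmbeddingTypeAdapter


def create_rag_adapter(dense: bool = True) -> EmbeddingTypeAdapter:
  """Sketch: parameterize which Embedding field receives the model output."""
  def output_fn(chunks, embeddings):
    return [
        Embedding(
            id=chunk.id,
            dense_embedding=emb if dense else None,
            sparse_embedding=None if dense else emb,
            metadata=chunk.metadata)
        for chunk, emb in zip(chunks, embeddings)
    ]

  return EmbeddingTypeAdapter(
      input_fn=lambda chunks: [chunk.content.text for chunk in chunks],
      output_fn=output_fn)
```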
##########
sdks/python/apache_beam/ml/rag/embeddings/base.py:
##########
Review Comment:
Another thought - if we enabled that use case, it would be hard to generate
both dense and sparse embeddings in the same pipeline (because we're expecting
a `Chunk`, but as soon as you do this once it generates an `Embedding`). It
would be nice if this optionally accepted either a Chunk or an embedding.
I actually don't think we'd need to change any of the below code other than
naming to make it obvious that this can happen.
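
A sketch of the type-aware text extraction that would enable (assuming the `input_fn`-style adapter above; the `Embedding.text` attribute is a placeholder - an `Embedding` would need to retain its source text somewhere for a second embedding pass to work):

```python
from typing import List, Union

from apache_beam.ml.rag.types import Chunk, Embedding


def _extract_text(item: Union[Chunk, Embedding]) -> str:
  if isinstance(item, Chunk):
    return item.content.text
  # Hypothetical: assumes Embedding retains the original text so a later
  # (e.g. sparse) embedding stage can re-embed the same content.
  return item.text


def input_fn(items: List[Union[Chunk, Embedding]]) -> List[str]:
  return [_extract_text(item) for item in items]
```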