This is an automated email from the ASF dual-hosted git repository.

ningk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 7acadb6  [BEAM-10708] Support streaming cache in beam_sql magic
     new a871a49  Merge pull request #15446 from KevinGG/ib-ts-coder
7acadb6 is described below

commit 7acadb6334e9bb24c1033e57d904e3522e495f52
Author: KevinGG <kawai...@gmail.com>
AuthorDate: Wed Sep 1 15:11:51 2021 -0700

    [BEAM-10708] Support streaming cache in beam_sql magic
    
    Updated the query source to always mark their element_type so that
    pickled Python coders are not introduced to the Java expansion service
    for SqlTransforms.
---
 sdks/python/apache_beam/runners/interactive/cache_manager.py        | 2 --
 .../apache_beam/runners/interactive/caching/streaming_cache.py      | 2 --
 sdks/python/apache_beam/runners/interactive/sql/beam_sql_magics.py  | 6 +++++-
 3 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/sdks/python/apache_beam/runners/interactive/cache_manager.py 
b/sdks/python/apache_beam/runners/interactive/cache_manager.py
index 886e56e..9ed0b25 100644
--- a/sdks/python/apache_beam/runners/interactive/cache_manager.py
+++ b/sdks/python/apache_beam/runners/interactive/cache_manager.py
@@ -208,8 +208,6 @@ class FileBasedCacheManager(CacheManager):
 
   def load_pcoder(self, *labels):
     saved_pcoder = self._saved_pcoders.get(self._path(*labels), None)
-    # TODO(BEAM-12506): Get rid of the SafeFastPrimitivesCoder for
-    # WindowedValueHolder.
     if saved_pcoder is None or isinstance(saved_pcoder,
                                           coders.FastPrimitivesCoder):
       return self._default_pcoder
diff --git 
a/sdks/python/apache_beam/runners/interactive/caching/streaming_cache.py 
b/sdks/python/apache_beam/runners/interactive/caching/streaming_cache.py
index 054c9a6..fc8a8aa 100644
--- a/sdks/python/apache_beam/runners/interactive/caching/streaming_cache.py
+++ b/sdks/python/apache_beam/runners/interactive/caching/streaming_cache.py
@@ -390,8 +390,6 @@ class StreamingCache(CacheManager):
   def load_pcoder(self, *labels):
     saved_pcoder = self._saved_pcoders.get(
         os.path.join(self._cache_dir, *labels), None)
-    # TODO(BEAM-12506): Get rid of the SafeFastPrimitivesCoder for
-    # WindowedValueHolder.
     if saved_pcoder is None or isinstance(saved_pcoder,
                                           coders.FastPrimitivesCoder):
       return self._default_pcoder
diff --git a/sdks/python/apache_beam/runners/interactive/sql/beam_sql_magics.py 
b/sdks/python/apache_beam/runners/interactive/sql/beam_sql_magics.py
index cee3d34..1dc42e0 100644
--- a/sdks/python/apache_beam/runners/interactive/sql/beam_sql_magics.py
+++ b/sdks/python/apache_beam/runners/interactive/sql/beam_sql_magics.py
@@ -227,7 +227,11 @@ def pcolls_from_streaming_cache(
       endpoint=test_stream_service.endpoint)
   sql_source = {}
   for tag, output in output_pcolls.items():
-    sql_source[tag_to_name[tag]] = output
+    name = tag_to_name[tag]
+    # Must mark the element_type to avoid introducing pickled Python coder
+    # to the Java expansion service.
+    output.element_type = name_to_pcoll[name].element_type
+    sql_source[name] = output
   return sql_source
 
 

Reply via email to