This is an automated email from the ASF dual-hosted git repository. ningk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 7acadb6 [BEAM-10708] Support streaming cache in beam_sql magic new a871a49 Merge pull request #15446 from KevinGG/ib-ts-coder 7acadb6 is described below commit 7acadb6334e9bb24c1033e57d904e3522e495f52 Author: KevinGG <kawai...@gmail.com> AuthorDate: Wed Sep 1 15:11:51 2021 -0700 [BEAM-10708] Support streaming cache in beam_sql magic Updated the query source to always mark their element_type so that pickled Python coders are not introduced to the Java expansion service for SqlTransforms. --- sdks/python/apache_beam/runners/interactive/cache_manager.py | 2 -- .../apache_beam/runners/interactive/caching/streaming_cache.py | 2 -- sdks/python/apache_beam/runners/interactive/sql/beam_sql_magics.py | 6 +++++- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/sdks/python/apache_beam/runners/interactive/cache_manager.py b/sdks/python/apache_beam/runners/interactive/cache_manager.py index 886e56e..9ed0b25 100644 --- a/sdks/python/apache_beam/runners/interactive/cache_manager.py +++ b/sdks/python/apache_beam/runners/interactive/cache_manager.py @@ -208,8 +208,6 @@ class FileBasedCacheManager(CacheManager): def load_pcoder(self, *labels): saved_pcoder = self._saved_pcoders.get(self._path(*labels), None) - # TODO(BEAM-12506): Get rid of the SafeFastPrimitivesCoder for - # WindowedValueHolder. if saved_pcoder is None or isinstance(saved_pcoder, coders.FastPrimitivesCoder): return self._default_pcoder diff --git a/sdks/python/apache_beam/runners/interactive/caching/streaming_cache.py b/sdks/python/apache_beam/runners/interactive/caching/streaming_cache.py index 054c9a6..fc8a8aa 100644 --- a/sdks/python/apache_beam/runners/interactive/caching/streaming_cache.py +++ b/sdks/python/apache_beam/runners/interactive/caching/streaming_cache.py @@ -390,8 +390,6 @@ class StreamingCache(CacheManager): def load_pcoder(self, *labels): saved_pcoder = self._saved_pcoders.get( os.path.join(self._cache_dir, *labels), None) - # TODO(BEAM-12506): Get rid of the SafeFastPrimitivesCoder for - # WindowedValueHolder. if saved_pcoder is None or isinstance(saved_pcoder, coders.FastPrimitivesCoder): return self._default_pcoder diff --git a/sdks/python/apache_beam/runners/interactive/sql/beam_sql_magics.py b/sdks/python/apache_beam/runners/interactive/sql/beam_sql_magics.py index cee3d34..1dc42e0 100644 --- a/sdks/python/apache_beam/runners/interactive/sql/beam_sql_magics.py +++ b/sdks/python/apache_beam/runners/interactive/sql/beam_sql_magics.py @@ -227,7 +227,11 @@ def pcolls_from_streaming_cache( endpoint=test_stream_service.endpoint) sql_source = {} for tag, output in output_pcolls.items(): - sql_source[tag_to_name[tag]] = output + name = tag_to_name[tag] + # Must mark the element_type to avoid introducing pickled Python coder + # to the Java expansion service. + output.element_type = name_to_pcoll[name].element_type + sql_source[name] = output return sql_source