[beam] branch master updated (177c134 -> 43ce5ca)
This is an automated email from the ASF dual-hosted git repository. lostluck pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git. from 177c134 [BEAM-7389] Add code snippet for CoGroupByKey (#9791) add 43ce5ca [Go SDK] Fix post commits: runtime error in logger No new revisions were added by this update. Summary of changes: sdks/go/pkg/beam/core/runtime/harness/logging.go | 2 +- .../pkg/beam/core/runtime/harness/logging_test.go | 51 ++ 2 files changed, 52 insertions(+), 1 deletion(-) create mode 100644 sdks/go/pkg/beam/core/runtime/harness/logging_test.go
[beam] branch master updated (177c134 -> 43ce5ca)
This is an automated email from the ASF dual-hosted git repository. lostluck pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git. from 177c134 [BEAM-7389] Add code snippet for CoGroupByKey (#9791) add 43ce5ca [Go SDK] Fix post commits: runtime error in logger No new revisions were added by this update. Summary of changes: sdks/go/pkg/beam/core/runtime/harness/logging.go | 2 +- .../pkg/beam/core/runtime/harness/logging_test.go | 51 ++ 2 files changed, 52 insertions(+), 1 deletion(-) create mode 100644 sdks/go/pkg/beam/core/runtime/harness/logging_test.go
[beam] branch master updated (a9cace3 -> 177c134)
This is an automated email from the ASF dual-hosted git repository. altay pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git. from a9cace3 [BEAM-8341]: basic bundling support for portable runner (#9777) add 177c134 [BEAM-7389] Add code snippet for CoGroupByKey (#9791) No new revisions were added by this update. Summary of changes: .../snippets/transforms/aggregation}/__init__.py | 0 .../values.py => aggregation/cogroupbykey.py} | 31 +- .../cogroupbykey_test.py} | 29 .../transforms/elementwise/partition_test.py | 10 +++ sdks/python/apache_beam/examples/snippets/util.py | 10 --- .../apache_beam/examples/snippets/util_test.py | 11 6 files changed, 60 insertions(+), 31 deletions(-) copy sdks/python/apache_beam/{io/gcp/tests => examples/snippets/transforms/aggregation}/__init__.py (100%) copy sdks/python/apache_beam/examples/snippets/transforms/{elementwise/values.py => aggregation/cogroupbykey.py} (64%) copy sdks/python/apache_beam/examples/snippets/transforms/{elementwise/values_test.py => aggregation/cogroupbykey_test.py} (61%)
[beam] branch revert-9854-BEAM-8457 created (now 1d59072)
This is an automated email from the ASF dual-hosted git repository. altay pushed a change to branch revert-9854-BEAM-8457 in repository https://gitbox.apache.org/repos/asf/beam.git. at 1d59072 Revert "Merge pull request #9854 from [BEAM-8457] Label Dataflow jobs from Notebook" This branch includes the following new commits: new 1d59072 Revert "Merge pull request #9854 from [BEAM-8457] Label Dataflow jobs from Notebook" The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference.
[beam] 01/01: Revert "Merge pull request #9854 from [BEAM-8457] Label Dataflow jobs from Notebook"
This is an automated email from the ASF dual-hosted git repository. altay pushed a commit to branch revert-9854-BEAM-8457 in repository https://gitbox.apache.org/repos/asf/beam.git commit 1d59072b835283c48e5d047074e67e2db2911171 Author: Ahmet Altay AuthorDate: Thu Oct 24 17:25:22 2019 -0700 Revert "Merge pull request #9854 from [BEAM-8457] Label Dataflow jobs from Notebook" This reverts commit 1a8391da9222ab8d0493b0007bd60bdbeeb5e275. --- sdks/python/apache_beam/pipeline.py| 48 -- .../runners/dataflow/dataflow_runner.py| 10 - .../runners/interactive/interactive_runner.py | 2 +- 3 files changed, 9 insertions(+), 51 deletions(-) diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py index 5574a82..a776d30 100644 --- a/sdks/python/apache_beam/pipeline.py +++ b/sdks/python/apache_beam/pipeline.py @@ -171,10 +171,6 @@ class Pipeline(object): # If a transform is applied and the full label is already in the set # then the transform will have to be cloned with a new label. self.applied_labels = set() -# A boolean value indicating whether the pipeline is created in an -# interactive environment such as interactive notebooks. Initialized as -# None. The value is set ad hoc when `pipeline.run()` is invoked. -self.interactive = None @property @deprecated(since='First stable release', @@ -399,56 +395,28 @@ class Pipeline(object): for override in replacements: self._check_replacement(override) - def run(self, test_runner_api=True, runner=None, options=None, - interactive=None): -"""Runs the pipeline. Returns whatever our runner returns after running. - -If another runner instance and options are provided, that runner will -execute the pipeline with the given options. If either of them is not set, -a ValueError is raised. The usage is similar to directly invoking -`runner.run_pipeline(pipeline, options)`. -Additionally, an interactive field can be set to override the pipeline's -self.interactive field to mark current pipeline as being initiated from an -interactive environment. -""" -from apache_beam.runners.interactive import interactive_runner -if interactive: - self.interactive = interactive -elif isinstance(self.runner, interactive_runner.InteractiveRunner): - self.interactive = True -else: - self.interactive = False -runner_in_use = self.runner -options_in_use = self._options -if runner and options: - runner_in_use = runner - options_in_use = options -elif not runner and options: - raise ValueError('Parameter runner is not given when parameter options ' - 'is given.') -elif not options and runner: - raise ValueError('Parameter options is not given when parameter runner ' - 'is given.') + def run(self, test_runner_api=True): +"""Runs the pipeline. Returns whatever our runner returns after running.""" + # When possible, invoke a round trip through the runner API. if test_runner_api and self._verify_runner_api_compatible(): return Pipeline.from_runner_api( self.to_runner_api(use_fake_coders=True), - runner_in_use, - options_in_use).run(test_runner_api=False, - interactive=self.interactive) + self.runner, + self._options).run(False) -if options_in_use.view_as(TypeOptions).runtime_type_check: +if self._options.view_as(TypeOptions).runtime_type_check: from apache_beam.typehints import typecheck self.visit(typecheck.TypeCheckVisitor()) -if options_in_use.view_as(SetupOptions).save_main_session: +if self._options.view_as(SetupOptions).save_main_session: # If this option is chosen, verify we can pickle the main session early. tmpdir = tempfile.mkdtemp() try: pickler.dump_session(os.path.join(tmpdir, 'main_session.pickle')) finally: shutil.rmtree(tmpdir) -return runner_in_use.run_pipeline(self, options_in_use) +return self.runner.run_pipeline(self, self._options) def __enter__(self): return self diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py index f57be74..4928550 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py @@ -364,16 +364,6 @@ class DataflowRunner(PipelineRunner): def run_pipeline(self, pipeline, options): """Remotely executes entire pipeline or parts reachable from node.""" -# Label goog-dataflow-notebook if pipeline is initiated from interactive -# runner. -if pipeline.interactive: - notebook_version = ('goog-dataflow-notebook=' + - beam.version.__version__.replace('.', '_')) -
[beam] branch pabloem-patch-1 updated (1e56cbd -> ae7c6d6)
This is an automated email from the ASF dual-hosted git repository. pabloem pushed a change to branch pabloem-patch-1 in repository https://gitbox.apache.org/repos/asf/beam.git. from 1e56cbd Encouraging the use of user@ and SO for questions. add ae7c6d6 Adding contact us No new revisions were added by this update. Summary of changes: website/src/_includes/footer.html | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-)
[beam] branch master updated (f67e3d8 -> a9cace3)
This is an automated email from the ASF dual-hosted git repository. xinyu pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git. from f67e3d8 Merge pull request #8496 from [BEAM-5967] Add handling of DynamicMessage in ProtoCoder add a9cace3 [BEAM-8341]: basic bundling support for portable runner (#9777) No new revisions were added by this update. Summary of changes: .../beam/runners/samza/SamzaPipelineOptions.java | 12 ++ .../samza/SamzaPipelineOptionsValidator.java | 41 +- .../org/apache/beam/runners/samza/SamzaRunner.java | 7 +- .../apache/beam/runners/samza/runtime/DoFnOp.java | 141 ++--- .../beam/runners/samza/runtime/GroupByKeyOp.java | 6 +- .../org/apache/beam/runners/samza/runtime/Op.java | 2 +- .../beam/runners/samza/runtime/OpAdapter.java | 2 +- .../runners/samza/runtime/SamzaDoFnRunners.java| 74 --- .../translation/ParDoBoundMultiTranslator.java | 2 + .../samza/translation/SamzaPipelineTranslator.java | 4 + 10 files changed, 243 insertions(+), 48 deletions(-)
[beam] branch pabloem-patch-1 updated (16363f8 -> 1e56cbd)
This is an automated email from the ASF dual-hosted git repository. pabloem pushed a change to branch pabloem-patch-1 in repository https://gitbox.apache.org/repos/asf/beam.git. from 16363f8 Point Support to Contact Us, and add Events link. add 1e56cbd Encouraging the use of user@ and SO for questions. No new revisions were added by this update. Summary of changes: website/src/community/contact-us.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-)
[beam] branch pabloem-patch-1 created (now 16363f8)
This is an automated email from the ASF dual-hosted git repository. pabloem pushed a change to branch pabloem-patch-1 in repository https://gitbox.apache.org/repos/asf/beam.git. at 16363f8 Point Support to Contact Us, and add Events link. No new revisions were added by this update.
[beam] branch master updated (f21f417 -> f67e3d8)
This is an automated email from the ASF dual-hosted git repository. pabloem pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git. from f21f417 Moving to 2.18.0-SNAPSHOT on master branch. new 25054b8 [BEAM-5967] Add handling of DynamicMessage in ProtoCoder new ba4a121 [BEAM-5967] Add Nullable annotations and corrected JavaDoc after review new cb77473 Backported ProtoDomainTest for schema aware PR new 64829e1 Split the ProtoCoder into ProtoCoder and DynamicProtoCoder. new f67e3d8 Merge pull request #8496 from [BEAM-5967] Add handling of DynamicMessage in ProtoCoder The 23594 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../apache/beam/sdk/testing/CoderProperties.java | 11 + .../sdk/extensions/protobuf/DynamicProtoCoder.java | 205 + .../beam/sdk/extensions/protobuf/ProtoCoder.java | 63 +++--- .../beam/sdk/extensions/protobuf/ProtoDomain.java | 248 + .../extensions/protobuf/DynamicProtoCoderTest.java | 92 .../extensions/protobuf/IsDynamicMessageEqual.java | 69 ++ .../sdk/extensions/protobuf/ProtoCoderTest.java| 7 + .../sdk/extensions/protobuf/ProtoDomainTest.java | 55 + 8 files changed, 722 insertions(+), 28 deletions(-) create mode 100644 sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/DynamicProtoCoder.java create mode 100644 sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoDomain.java create mode 100644 sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/DynamicProtoCoderTest.java create mode 100644 sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/IsDynamicMessageEqual.java create mode 100644 sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoDomainTest.java
[beam] branch master updated (f21f417 -> f67e3d8)
This is an automated email from the ASF dual-hosted git repository. pabloem pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git. from f21f417 Moving to 2.18.0-SNAPSHOT on master branch. new 25054b8 [BEAM-5967] Add handling of DynamicMessage in ProtoCoder new ba4a121 [BEAM-5967] Add Nullable annotations and corrected JavaDoc after review new cb77473 Backported ProtoDomainTest for schema aware PR new 64829e1 Split the ProtoCoder into ProtoCoder and DynamicProtoCoder. new f67e3d8 Merge pull request #8496 from [BEAM-5967] Add handling of DynamicMessage in ProtoCoder The 23594 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../apache/beam/sdk/testing/CoderProperties.java | 11 + .../sdk/extensions/protobuf/DynamicProtoCoder.java | 205 + .../beam/sdk/extensions/protobuf/ProtoCoder.java | 63 +++--- .../beam/sdk/extensions/protobuf/ProtoDomain.java | 248 + .../extensions/protobuf/DynamicProtoCoderTest.java | 92 .../extensions/protobuf/IsDynamicMessageEqual.java | 69 ++ .../sdk/extensions/protobuf/ProtoCoderTest.java| 7 + .../sdk/extensions/protobuf/ProtoDomainTest.java | 55 + 8 files changed, 722 insertions(+), 28 deletions(-) create mode 100644 sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/DynamicProtoCoder.java create mode 100644 sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoDomain.java create mode 100644 sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/DynamicProtoCoderTest.java create mode 100644 sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/IsDynamicMessageEqual.java create mode 100644 sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoDomainTest.java
[beam] 01/01: Create release branch for version 2.17.0.
This is an automated email from the ASF dual-hosted git repository. mikhail pushed a commit to branch release-2.17.0 in repository https://gitbox.apache.org/repos/asf/beam.git commit bccb3bd3e376b21b0b4932628103d9e7c05a0843 Author: Mikhail Gryzykhin AuthorDate: Thu Oct 24 10:58:28 2019 -0700 Create release branch for version 2.17.0. --- gradle.properties | 2 +- runners/google-cloud-dataflow-java/build.gradle | 2 +- sdks/python/apache_beam/version.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/gradle.properties b/gradle.properties index 412f5a1..8a0fae7 100644 --- a/gradle.properties +++ b/gradle.properties @@ -24,4 +24,4 @@ signing.gnupg.executable=gpg signing.gnupg.useLegacyGpg=true version=2.17.0-SNAPSHOT -python_sdk_version=2.17.0.dev +python_sdk_version=2.17.0 diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle index 1569d29..a76cd26 100644 --- a/runners/google-cloud-dataflow-java/build.gradle +++ b/runners/google-cloud-dataflow-java/build.gradle @@ -39,7 +39,7 @@ processResources { filter org.apache.tools.ant.filters.ReplaceTokens, tokens: [ 'dataflow.legacy_environment_major_version' : '7', 'dataflow.fnapi_environment_major_version' : '7', -'dataflow.container_version' : 'beam-master-20190829' +'dataflow.container_version' : 'beam-2.17.0' ] } diff --git a/sdks/python/apache_beam/version.py b/sdks/python/apache_beam/version.py index 1365114..10772d2 100644 --- a/sdks/python/apache_beam/version.py +++ b/sdks/python/apache_beam/version.py @@ -18,4 +18,4 @@ """Apache Beam SDK version information and utilities.""" -__version__ = '2.17.0.dev' +__version__ = '2.17.0'
[beam] branch release-2.17.0 created (now bccb3bd)
This is an automated email from the ASF dual-hosted git repository. mikhail pushed a change to branch release-2.17.0 in repository https://gitbox.apache.org/repos/asf/beam.git. at bccb3bd Create release branch for version 2.17.0. This branch includes the following new commits: new bccb3bd Create release branch for version 2.17.0. The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference.
[beam] branch master updated: Moving to 2.18.0-SNAPSHOT on master branch.
This is an automated email from the ASF dual-hosted git repository. mikhail pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new f21f417 Moving to 2.18.0-SNAPSHOT on master branch. f21f417 is described below commit f21f41724f1d9bb07b49644499240f7e9bfe Author: Mikhail Gryzykhin AuthorDate: Thu Oct 24 10:57:28 2019 -0700 Moving to 2.18.0-SNAPSHOT on master branch. --- .../src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy| 2 +- gradle.properties | 4 ++-- sdks/python/apache_beam/version.py| 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index b824535..3f7a4aa 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -300,7 +300,7 @@ class BeamModulePlugin implements Plugin { // Automatically use the official release version if we are performing a release // otherwise append '-SNAPSHOT' -project.version = '2.17.0' +project.version = '2.18.0' if (!isRelease(project)) { project.version += '-SNAPSHOT' } diff --git a/gradle.properties b/gradle.properties index 412f5a1..6eb4913 100644 --- a/gradle.properties +++ b/gradle.properties @@ -23,5 +23,5 @@ offlineRepositoryRoot=offline-repository signing.gnupg.executable=gpg signing.gnupg.useLegacyGpg=true -version=2.17.0-SNAPSHOT -python_sdk_version=2.17.0.dev +version=2.18.0-SNAPSHOT +python_sdk_version=2.18.0.dev diff --git a/sdks/python/apache_beam/version.py b/sdks/python/apache_beam/version.py index 1365114..f32561a 100644 --- a/sdks/python/apache_beam/version.py +++ b/sdks/python/apache_beam/version.py @@ -18,4 +18,4 @@ """Apache Beam SDK version information and utilities.""" -__version__ = '2.17.0.dev' +__version__ = '2.18.0.dev'
[beam] branch master updated: Revert "Moving to 2.18.0-SNAPSHOT on master branch."
This is an automated email from the ASF dual-hosted git repository. mikhail pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 7f93150 Revert "Moving to 2.18.0-SNAPSHOT on master branch." 7f93150 is described below commit 7f93150fe9bc9e8a5777196e43d90864072a94e7 Author: Mikhail Gryzykhin AuthorDate: Thu Oct 24 10:51:46 2019 -0700 Revert "Moving to 2.18.0-SNAPSHOT on master branch." This reverts commit 814222826b7f2ce5962226ddbd04245aada7c154. --- .../src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy| 2 +- gradle.properties | 4 ++-- sdks/python/apache_beam/version.py| 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 3f7a4aa..b824535 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -300,7 +300,7 @@ class BeamModulePlugin implements Plugin { // Automatically use the official release version if we are performing a release // otherwise append '-SNAPSHOT' -project.version = '2.18.0' +project.version = '2.17.0' if (!isRelease(project)) { project.version += '-SNAPSHOT' } diff --git a/gradle.properties b/gradle.properties index 6eb4913..412f5a1 100644 --- a/gradle.properties +++ b/gradle.properties @@ -23,5 +23,5 @@ offlineRepositoryRoot=offline-repository signing.gnupg.executable=gpg signing.gnupg.useLegacyGpg=true -version=2.18.0-SNAPSHOT -python_sdk_version=2.18.0.dev +version=2.17.0-SNAPSHOT +python_sdk_version=2.17.0.dev diff --git a/sdks/python/apache_beam/version.py b/sdks/python/apache_beam/version.py index f32561a..1365114 100644 --- a/sdks/python/apache_beam/version.py +++ b/sdks/python/apache_beam/version.py @@ -18,4 +18,4 @@ """Apache Beam SDK version information and utilities.""" -__version__ = '2.18.0.dev' +__version__ = '2.17.0.dev'
[beam] branch master updated: Moving to 2.18.0-SNAPSHOT on master branch.
This is an automated email from the ASF dual-hosted git repository. mikhail pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 8142228 Moving to 2.18.0-SNAPSHOT on master branch. 8142228 is described below commit 814222826b7f2ce5962226ddbd04245aada7c154 Author: Mikhail Gryzykhin AuthorDate: Thu Oct 24 10:11:24 2019 -0700 Moving to 2.18.0-SNAPSHOT on master branch. --- .../src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy| 2 +- gradle.properties | 4 ++-- sdks/python/apache_beam/version.py| 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index b824535..3f7a4aa 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -300,7 +300,7 @@ class BeamModulePlugin implements Plugin { // Automatically use the official release version if we are performing a release // otherwise append '-SNAPSHOT' -project.version = '2.17.0' +project.version = '2.18.0' if (!isRelease(project)) { project.version += '-SNAPSHOT' } diff --git a/gradle.properties b/gradle.properties index 412f5a1..6eb4913 100644 --- a/gradle.properties +++ b/gradle.properties @@ -23,5 +23,5 @@ offlineRepositoryRoot=offline-repository signing.gnupg.executable=gpg signing.gnupg.useLegacyGpg=true -version=2.17.0-SNAPSHOT -python_sdk_version=2.17.0.dev +version=2.18.0-SNAPSHOT +python_sdk_version=2.18.0.dev diff --git a/sdks/python/apache_beam/version.py b/sdks/python/apache_beam/version.py index 1365114..f32561a 100644 --- a/sdks/python/apache_beam/version.py +++ b/sdks/python/apache_beam/version.py @@ -18,4 +18,4 @@ """Apache Beam SDK version information and utilities.""" -__version__ = '2.17.0.dev' +__version__ = '2.18.0.dev'
[beam] branch master updated (20b6913 -> 19799cd)
This is an automated email from the ASF dual-hosted git repository. kenn pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git. from 20b6913 Merge pull request #9807 from echauchot/update_website_with_sequence_diagrams add 64263a5 [BEAM-8393] Fix Java BigQueryIO clustering support for multiple partitions add 19799cd Merge pull request #9784: [BEAM-8393] Fix Java BigQueryIO clustering support for multiple partitions No new revisions were added by this update. Summary of changes: .../apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java| 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-)
[beam] branch master updated (20b6913 -> 19799cd)
This is an automated email from the ASF dual-hosted git repository. kenn pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git. from 20b6913 Merge pull request #9807 from echauchot/update_website_with_sequence_diagrams add 64263a5 [BEAM-8393] Fix Java BigQueryIO clustering support for multiple partitions add 19799cd Merge pull request #9784: [BEAM-8393] Fix Java BigQueryIO clustering support for multiple partitions No new revisions were added by this update. Summary of changes: .../apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java| 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-)
[beam] branch spark-runner_structured-streaming updated (620a27a -> a0e6ca4)
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a change to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git. discard 620a27a Remove Encoders based on kryo now that we call Beam coders in the runner discard 824b344 Fix: Remove generic hack of using object. Use actual Coder encodedType in Encoders discard 27ef6de Remove unneeded cast discard 6a27839 Use beam encoders also in the output of the source translation discard 62a87b6 Apply spotless, fix typo and javadoc discard c5e78a0 Apply new Encoders to Pardo. Replace Tuple2Coder with MultiOutputCoder to deal with multiple output to use in Spark Encoder for DoFnRunner discard 039f58a Apply new Encoders to GroupByKey discard 21accab Create a Tuple2Coder to encode scala tuple2 discard 29f7e93 Apply new Encoders to AggregatorCombiner discard 7f1060a Apply new Encoders to Window assign translation discard c48d032 Ignore long time failing test: SparkMetricsSinkTest discard 68d3d67 Improve performance of source: the mapper already calls windowedValueCoder.decode, no need to call it also in the Spark encoder discard 3cc256e Apply new Encoders to Read source discard 7d456b4 Apply new Encoders to CombinePerKey discard c33fdda Catch Exception instead of IOException because some coders to not throw Exceptions at all (e.g.VoidCoder) discard c8bfcf3 Put Encoders expressions serializable discard 72c267c Wrap exceptions in UserCoderExceptions discard c6f2ac9 Apply spotless and checkstyle and add javadocs discard 78b2d22 Add an assert of equality in the encoders test discard 34e8aa8 Fix generated code: uniform exceptions catching, fix parenthesis and variable declarations discard f48067b Fix equal and hashcode discard ca01777 Remove example code discard 50060a8 Remove lazy init of beam coder because there is no generic way on instanciating a beam coder discard 0cf2c87 Fix beam coder lazy init using reflexion: use .clas + try catch + cast discard d7c9a4a Fix getting the output value in code generation discard 8b07ec8 Fix ExpressionEncoder generated code: typos, try catch, fqcn discard fdba22d Fix warning in coder construction by reflexion discard e6b68a8 Lazy init coder because coder instance cannot be interpolated by catalyst discard d5645ff Fix code generation in Beam coder wrapper discard e4478ff Add a simple spark native test to test Beam coders wrapping into Spark Encoders discard 2aaf07a Conform to spark ExpressionEncoders: pass classTags, implement scala Product, pass children from within the ExpressionEncoder, fix visibilities discard fff5092 Fix scala Product in Encoders to avoid StackEverflow discard c9e3534 type erasure: spark encoders require a Class, pass Object and cast to Class discard 5fa6331 Wrap Beam Coders into Spark Encoders using ExpressionEncoder: deserialization part discard a5c7da3 Wrap Beam Coders into Spark Encoders using ExpressionEncoder: serialization part discard 20d5bbd Use "sparkMaster" in local mode to obtain number of shuffle partitions + spotless apply discard 22d6466 Improve Pardo translation performance: avoid calling a filter transform when there is only one output tag discard 93d425a After testing performance and correctness, launch pipeline with dataset.foreach(). Make both test mode and production mode use foreach for uniformity. Move dataset print as a utility method discard 61f487f Remove no more needed AggregatorCombinerPerKey (there is only AggregatorCombiner) discard f8a5046 fixup! Add PipelineResults to Spark structured streaming. discard 8aafd50 Print number of leaf datasets discard ec43374 Add spark execution plans extended debug messages. discard 3b15128 Update log4j configuration discard cde225a Add PipelineResults to Spark structured streaming. discard 0e36b19 Make spotless happy discard 4aaf456 Added metrics sinks and tests discard dc939c8 Persist all output Dataset if there are multiple outputs in pipeline Enabled Use*Metrics tests discard dab3c2e Add a test to check that CombineGlobally preserves windowing discard 6e9ccdd Fix accumulators initialization in Combine that prevented CombineGlobally to work. discard a797884 Fix javadoc discard 476bc20 Add setEnableSparkMetricSinks() method discard 51ca79a Add missing dependencies to run Spark Structured Streaming Runner on Nexmark discard 5ed3e03 Add metrics support in DoFn discard d29c64e Ignore for now not working test testCombineGlobally discard ff50ccb Add a test that combine per key preserves windowing discard 6638522 Clean groupByKeyTest discard 8c499a5 add comment in combine globally test discard f68ed7a Fixed immutable list bug discard 9601649 Fix javadoc of AggregatorCombiner discard 7784f30 Clean not more needed WindowingHelpers discard 3bd95df Clean not more needed RowHelpers discard 4b5af9c Clean no more needed KVHelpers discard 5
[beam] branch spark-runner_structured-streaming updated (620a27a -> a0e6ca4)
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a change to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git. discard 620a27a Remove Encoders based on kryo now that we call Beam coders in the runner discard 824b344 Fix: Remove generic hack of using object. Use actual Coder encodedType in Encoders discard 27ef6de Remove unneeded cast discard 6a27839 Use beam encoders also in the output of the source translation discard 62a87b6 Apply spotless, fix typo and javadoc discard c5e78a0 Apply new Encoders to Pardo. Replace Tuple2Coder with MultiOutputCoder to deal with multiple output to use in Spark Encoder for DoFnRunner discard 039f58a Apply new Encoders to GroupByKey discard 21accab Create a Tuple2Coder to encode scala tuple2 discard 29f7e93 Apply new Encoders to AggregatorCombiner discard 7f1060a Apply new Encoders to Window assign translation discard c48d032 Ignore long time failing test: SparkMetricsSinkTest discard 68d3d67 Improve performance of source: the mapper already calls windowedValueCoder.decode, no need to call it also in the Spark encoder discard 3cc256e Apply new Encoders to Read source discard 7d456b4 Apply new Encoders to CombinePerKey discard c33fdda Catch Exception instead of IOException because some coders to not throw Exceptions at all (e.g.VoidCoder) discard c8bfcf3 Put Encoders expressions serializable discard 72c267c Wrap exceptions in UserCoderExceptions discard c6f2ac9 Apply spotless and checkstyle and add javadocs discard 78b2d22 Add an assert of equality in the encoders test discard 34e8aa8 Fix generated code: uniform exceptions catching, fix parenthesis and variable declarations discard f48067b Fix equal and hashcode discard ca01777 Remove example code discard 50060a8 Remove lazy init of beam coder because there is no generic way on instanciating a beam coder discard 0cf2c87 Fix beam coder lazy init using reflexion: use .clas + try catch + cast discard d7c9a4a Fix getting the output value in code generation discard 8b07ec8 Fix ExpressionEncoder generated code: typos, try catch, fqcn discard fdba22d Fix warning in coder construction by reflexion discard e6b68a8 Lazy init coder because coder instance cannot be interpolated by catalyst discard d5645ff Fix code generation in Beam coder wrapper discard e4478ff Add a simple spark native test to test Beam coders wrapping into Spark Encoders discard 2aaf07a Conform to spark ExpressionEncoders: pass classTags, implement scala Product, pass children from within the ExpressionEncoder, fix visibilities discard fff5092 Fix scala Product in Encoders to avoid StackEverflow discard c9e3534 type erasure: spark encoders require a Class, pass Object and cast to Class discard 5fa6331 Wrap Beam Coders into Spark Encoders using ExpressionEncoder: deserialization part discard a5c7da3 Wrap Beam Coders into Spark Encoders using ExpressionEncoder: serialization part discard 20d5bbd Use "sparkMaster" in local mode to obtain number of shuffle partitions + spotless apply discard 22d6466 Improve Pardo translation performance: avoid calling a filter transform when there is only one output tag discard 93d425a After testing performance and correctness, launch pipeline with dataset.foreach(). Make both test mode and production mode use foreach for uniformity. Move dataset print as a utility method discard 61f487f Remove no more needed AggregatorCombinerPerKey (there is only AggregatorCombiner) discard f8a5046 fixup! Add PipelineResults to Spark structured streaming. discard 8aafd50 Print number of leaf datasets discard ec43374 Add spark execution plans extended debug messages. discard 3b15128 Update log4j configuration discard cde225a Add PipelineResults to Spark structured streaming. discard 0e36b19 Make spotless happy discard 4aaf456 Added metrics sinks and tests discard dc939c8 Persist all output Dataset if there are multiple outputs in pipeline Enabled Use*Metrics tests discard dab3c2e Add a test to check that CombineGlobally preserves windowing discard 6e9ccdd Fix accumulators initialization in Combine that prevented CombineGlobally to work. discard a797884 Fix javadoc discard 476bc20 Add setEnableSparkMetricSinks() method discard 51ca79a Add missing dependencies to run Spark Structured Streaming Runner on Nexmark discard 5ed3e03 Add metrics support in DoFn discard d29c64e Ignore for now not working test testCombineGlobally discard ff50ccb Add a test that combine per key preserves windowing discard 6638522 Clean groupByKeyTest discard 8c499a5 add comment in combine globally test discard f68ed7a Fixed immutable list bug discard 9601649 Fix javadoc of AggregatorCombiner discard 7784f30 Clean not more needed WindowingHelpers discard 3bd95df Clean not more needed RowHelpers discard 4b5af9c Clean no more needed KVHelpers discard 5
[beam] branch spark-runner_structured-streaming updated (620a27a -> a0e6ca4)
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a change to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git. discard 620a27a Remove Encoders based on kryo now that we call Beam coders in the runner discard 824b344 Fix: Remove generic hack of using object. Use actual Coder encodedType in Encoders discard 27ef6de Remove unneeded cast discard 6a27839 Use beam encoders also in the output of the source translation discard 62a87b6 Apply spotless, fix typo and javadoc discard c5e78a0 Apply new Encoders to Pardo. Replace Tuple2Coder with MultiOutputCoder to deal with multiple output to use in Spark Encoder for DoFnRunner discard 039f58a Apply new Encoders to GroupByKey discard 21accab Create a Tuple2Coder to encode scala tuple2 discard 29f7e93 Apply new Encoders to AggregatorCombiner discard 7f1060a Apply new Encoders to Window assign translation discard c48d032 Ignore long time failing test: SparkMetricsSinkTest discard 68d3d67 Improve performance of source: the mapper already calls windowedValueCoder.decode, no need to call it also in the Spark encoder discard 3cc256e Apply new Encoders to Read source discard 7d456b4 Apply new Encoders to CombinePerKey discard c33fdda Catch Exception instead of IOException because some coders to not throw Exceptions at all (e.g.VoidCoder) discard c8bfcf3 Put Encoders expressions serializable discard 72c267c Wrap exceptions in UserCoderExceptions discard c6f2ac9 Apply spotless and checkstyle and add javadocs discard 78b2d22 Add an assert of equality in the encoders test discard 34e8aa8 Fix generated code: uniform exceptions catching, fix parenthesis and variable declarations discard f48067b Fix equal and hashcode discard ca01777 Remove example code discard 50060a8 Remove lazy init of beam coder because there is no generic way on instanciating a beam coder discard 0cf2c87 Fix beam coder lazy init using reflexion: use .clas + try catch + cast discard d7c9a4a Fix getting the output value in code generation discard 8b07ec8 Fix ExpressionEncoder generated code: typos, try catch, fqcn discard fdba22d Fix warning in coder construction by reflexion discard e6b68a8 Lazy init coder because coder instance cannot be interpolated by catalyst discard d5645ff Fix code generation in Beam coder wrapper discard e4478ff Add a simple spark native test to test Beam coders wrapping into Spark Encoders discard 2aaf07a Conform to spark ExpressionEncoders: pass classTags, implement scala Product, pass children from within the ExpressionEncoder, fix visibilities discard fff5092 Fix scala Product in Encoders to avoid StackEverflow discard c9e3534 type erasure: spark encoders require a Class, pass Object and cast to Class discard 5fa6331 Wrap Beam Coders into Spark Encoders using ExpressionEncoder: deserialization part discard a5c7da3 Wrap Beam Coders into Spark Encoders using ExpressionEncoder: serialization part discard 20d5bbd Use "sparkMaster" in local mode to obtain number of shuffle partitions + spotless apply discard 22d6466 Improve Pardo translation performance: avoid calling a filter transform when there is only one output tag discard 93d425a After testing performance and correctness, launch pipeline with dataset.foreach(). Make both test mode and production mode use foreach for uniformity. Move dataset print as a utility method discard 61f487f Remove no more needed AggregatorCombinerPerKey (there is only AggregatorCombiner) discard f8a5046 fixup! Add PipelineResults to Spark structured streaming. discard 8aafd50 Print number of leaf datasets discard ec43374 Add spark execution plans extended debug messages. discard 3b15128 Update log4j configuration discard cde225a Add PipelineResults to Spark structured streaming. discard 0e36b19 Make spotless happy discard 4aaf456 Added metrics sinks and tests discard dc939c8 Persist all output Dataset if there are multiple outputs in pipeline Enabled Use*Metrics tests discard dab3c2e Add a test to check that CombineGlobally preserves windowing discard 6e9ccdd Fix accumulators initialization in Combine that prevented CombineGlobally to work. discard a797884 Fix javadoc discard 476bc20 Add setEnableSparkMetricSinks() method discard 51ca79a Add missing dependencies to run Spark Structured Streaming Runner on Nexmark discard 5ed3e03 Add metrics support in DoFn discard d29c64e Ignore for now not working test testCombineGlobally discard ff50ccb Add a test that combine per key preserves windowing discard 6638522 Clean groupByKeyTest discard 8c499a5 add comment in combine globally test discard f68ed7a Fixed immutable list bug discard 9601649 Fix javadoc of AggregatorCombiner discard 7784f30 Clean not more needed WindowingHelpers discard 3bd95df Clean not more needed RowHelpers discard 4b5af9c Clean no more needed KVHelpers discard 5
[beam] branch spark-runner_structured-streaming updated (620a27a -> a0e6ca4)
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a change to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git. discard 620a27a Remove Encoders based on kryo now that we call Beam coders in the runner discard 824b344 Fix: Remove generic hack of using object. Use actual Coder encodedType in Encoders discard 27ef6de Remove unneeded cast discard 6a27839 Use beam encoders also in the output of the source translation discard 62a87b6 Apply spotless, fix typo and javadoc discard c5e78a0 Apply new Encoders to Pardo. Replace Tuple2Coder with MultiOutputCoder to deal with multiple output to use in Spark Encoder for DoFnRunner discard 039f58a Apply new Encoders to GroupByKey discard 21accab Create a Tuple2Coder to encode scala tuple2 discard 29f7e93 Apply new Encoders to AggregatorCombiner discard 7f1060a Apply new Encoders to Window assign translation discard c48d032 Ignore long time failing test: SparkMetricsSinkTest discard 68d3d67 Improve performance of source: the mapper already calls windowedValueCoder.decode, no need to call it also in the Spark encoder discard 3cc256e Apply new Encoders to Read source discard 7d456b4 Apply new Encoders to CombinePerKey discard c33fdda Catch Exception instead of IOException because some coders to not throw Exceptions at all (e.g.VoidCoder) discard c8bfcf3 Put Encoders expressions serializable discard 72c267c Wrap exceptions in UserCoderExceptions discard c6f2ac9 Apply spotless and checkstyle and add javadocs discard 78b2d22 Add an assert of equality in the encoders test discard 34e8aa8 Fix generated code: uniform exceptions catching, fix parenthesis and variable declarations discard f48067b Fix equal and hashcode discard ca01777 Remove example code discard 50060a8 Remove lazy init of beam coder because there is no generic way on instanciating a beam coder discard 0cf2c87 Fix beam coder lazy init using reflexion: use .clas + try catch + cast discard d7c9a4a Fix getting the output value in code generation discard 8b07ec8 Fix ExpressionEncoder generated code: typos, try catch, fqcn discard fdba22d Fix warning in coder construction by reflexion discard e6b68a8 Lazy init coder because coder instance cannot be interpolated by catalyst discard d5645ff Fix code generation in Beam coder wrapper discard e4478ff Add a simple spark native test to test Beam coders wrapping into Spark Encoders discard 2aaf07a Conform to spark ExpressionEncoders: pass classTags, implement scala Product, pass children from within the ExpressionEncoder, fix visibilities discard fff5092 Fix scala Product in Encoders to avoid StackEverflow discard c9e3534 type erasure: spark encoders require a Class, pass Object and cast to Class discard 5fa6331 Wrap Beam Coders into Spark Encoders using ExpressionEncoder: deserialization part discard a5c7da3 Wrap Beam Coders into Spark Encoders using ExpressionEncoder: serialization part discard 20d5bbd Use "sparkMaster" in local mode to obtain number of shuffle partitions + spotless apply discard 22d6466 Improve Pardo translation performance: avoid calling a filter transform when there is only one output tag discard 93d425a After testing performance and correctness, launch pipeline with dataset.foreach(). Make both test mode and production mode use foreach for uniformity. Move dataset print as a utility method discard 61f487f Remove no more needed AggregatorCombinerPerKey (there is only AggregatorCombiner) discard f8a5046 fixup! Add PipelineResults to Spark structured streaming. discard 8aafd50 Print number of leaf datasets discard ec43374 Add spark execution plans extended debug messages. discard 3b15128 Update log4j configuration discard cde225a Add PipelineResults to Spark structured streaming. discard 0e36b19 Make spotless happy discard 4aaf456 Added metrics sinks and tests discard dc939c8 Persist all output Dataset if there are multiple outputs in pipeline Enabled Use*Metrics tests discard dab3c2e Add a test to check that CombineGlobally preserves windowing discard 6e9ccdd Fix accumulators initialization in Combine that prevented CombineGlobally to work. discard a797884 Fix javadoc discard 476bc20 Add setEnableSparkMetricSinks() method discard 51ca79a Add missing dependencies to run Spark Structured Streaming Runner on Nexmark discard 5ed3e03 Add metrics support in DoFn discard d29c64e Ignore for now not working test testCombineGlobally discard ff50ccb Add a test that combine per key preserves windowing discard 6638522 Clean groupByKeyTest discard 8c499a5 add comment in combine globally test discard f68ed7a Fixed immutable list bug discard 9601649 Fix javadoc of AggregatorCombiner discard 7784f30 Clean not more needed WindowingHelpers discard 3bd95df Clean not more needed RowHelpers discard 4b5af9c Clean no more needed KVHelpers discard 5
[beam] branch spark-runner_structured-streaming updated (620a27a -> a0e6ca4)
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a change to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git. discard 620a27a Remove Encoders based on kryo now that we call Beam coders in the runner discard 824b344 Fix: Remove generic hack of using object. Use actual Coder encodedType in Encoders discard 27ef6de Remove unneeded cast discard 6a27839 Use beam encoders also in the output of the source translation discard 62a87b6 Apply spotless, fix typo and javadoc discard c5e78a0 Apply new Encoders to Pardo. Replace Tuple2Coder with MultiOutputCoder to deal with multiple output to use in Spark Encoder for DoFnRunner discard 039f58a Apply new Encoders to GroupByKey discard 21accab Create a Tuple2Coder to encode scala tuple2 discard 29f7e93 Apply new Encoders to AggregatorCombiner discard 7f1060a Apply new Encoders to Window assign translation discard c48d032 Ignore long time failing test: SparkMetricsSinkTest discard 68d3d67 Improve performance of source: the mapper already calls windowedValueCoder.decode, no need to call it also in the Spark encoder discard 3cc256e Apply new Encoders to Read source discard 7d456b4 Apply new Encoders to CombinePerKey discard c33fdda Catch Exception instead of IOException because some coders to not throw Exceptions at all (e.g.VoidCoder) discard c8bfcf3 Put Encoders expressions serializable discard 72c267c Wrap exceptions in UserCoderExceptions discard c6f2ac9 Apply spotless and checkstyle and add javadocs discard 78b2d22 Add an assert of equality in the encoders test discard 34e8aa8 Fix generated code: uniform exceptions catching, fix parenthesis and variable declarations discard f48067b Fix equal and hashcode discard ca01777 Remove example code discard 50060a8 Remove lazy init of beam coder because there is no generic way on instanciating a beam coder discard 0cf2c87 Fix beam coder lazy init using reflexion: use .clas + try catch + cast discard d7c9a4a Fix getting the output value in code generation discard 8b07ec8 Fix ExpressionEncoder generated code: typos, try catch, fqcn discard fdba22d Fix warning in coder construction by reflexion discard e6b68a8 Lazy init coder because coder instance cannot be interpolated by catalyst discard d5645ff Fix code generation in Beam coder wrapper discard e4478ff Add a simple spark native test to test Beam coders wrapping into Spark Encoders discard 2aaf07a Conform to spark ExpressionEncoders: pass classTags, implement scala Product, pass children from within the ExpressionEncoder, fix visibilities discard fff5092 Fix scala Product in Encoders to avoid StackEverflow discard c9e3534 type erasure: spark encoders require a Class, pass Object and cast to Class discard 5fa6331 Wrap Beam Coders into Spark Encoders using ExpressionEncoder: deserialization part discard a5c7da3 Wrap Beam Coders into Spark Encoders using ExpressionEncoder: serialization part discard 20d5bbd Use "sparkMaster" in local mode to obtain number of shuffle partitions + spotless apply discard 22d6466 Improve Pardo translation performance: avoid calling a filter transform when there is only one output tag discard 93d425a After testing performance and correctness, launch pipeline with dataset.foreach(). Make both test mode and production mode use foreach for uniformity. Move dataset print as a utility method discard 61f487f Remove no more needed AggregatorCombinerPerKey (there is only AggregatorCombiner) discard f8a5046 fixup! Add PipelineResults to Spark structured streaming. discard 8aafd50 Print number of leaf datasets discard ec43374 Add spark execution plans extended debug messages. discard 3b15128 Update log4j configuration discard cde225a Add PipelineResults to Spark structured streaming. discard 0e36b19 Make spotless happy discard 4aaf456 Added metrics sinks and tests discard dc939c8 Persist all output Dataset if there are multiple outputs in pipeline Enabled Use*Metrics tests discard dab3c2e Add a test to check that CombineGlobally preserves windowing discard 6e9ccdd Fix accumulators initialization in Combine that prevented CombineGlobally to work. discard a797884 Fix javadoc discard 476bc20 Add setEnableSparkMetricSinks() method discard 51ca79a Add missing dependencies to run Spark Structured Streaming Runner on Nexmark discard 5ed3e03 Add metrics support in DoFn discard d29c64e Ignore for now not working test testCombineGlobally discard ff50ccb Add a test that combine per key preserves windowing discard 6638522 Clean groupByKeyTest discard 8c499a5 add comment in combine globally test discard f68ed7a Fixed immutable list bug discard 9601649 Fix javadoc of AggregatorCombiner discard 7784f30 Clean not more needed WindowingHelpers discard 3bd95df Clean not more needed RowHelpers discard 4b5af9c Clean no more needed KVHelpers discard 5
[beam] 35/37: Remove unneeded cast
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit 27ef6de3fa90db6d59027f9a6fa792fc5787f6e9 Author: Etienne Chauchot AuthorDate: Wed Oct 23 11:51:49 2019 +0200 Remove unneeded cast --- .../spark/structuredstreaming/translation/helpers/KVHelpers.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/KVHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/KVHelpers.java index 1983eaa..2fa4b1a 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/KVHelpers.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/KVHelpers.java @@ -25,7 +25,7 @@ import org.apache.spark.api.java.function.MapFunction; public final class KVHelpers { /** A Spark {@link MapFunction} for extracting the key out of a {@link KV} for GBK for example. */ - public static MapFunction>, K> extractKey() { -return (MapFunction>, K>) wv -> wv.getValue().getKey(); + public static MapFunction>, K> extractKey() { +return wv -> wv.getValue().getKey(); } }
[beam] 18/37: Fix generated code: uniform exceptions catching, fix parenthesis and variable declarations
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit 34e8aa8c31a561684eea2e2496757f9f3cae35d0 Author: Etienne Chauchot AuthorDate: Thu Sep 5 15:14:32 2019 +0200 Fix generated code: uniform exceptions catching, fix parenthesis and variable declarations --- .../translation/helpers/EncoderHelpers.java| 18 ++ 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java index 91aaaf9..c9ab435 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java @@ -227,32 +227,34 @@ public class EncoderHelpers { /* CODE GENERATED: + final $javaType ${ev.value} try { - final $javaType ${ev.value} = + ${ev.value} = ${input.isNull} ? ${CodeGenerator.defaultValue(dataType)} : ($javaType) $beamCoder.decode(new java.io.ByteArrayInputStream(${input.value})); - } catch (IOException e) { + } catch (Exception e) { throw new RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e)); } */ List parts = new ArrayList<>(); - parts.add("try { final "); + parts.add("final "); parts.add(" "); - parts.add(" ="); - parts.add("?"); - parts.add(":"); - parts.add("("); + parts.add(";try { "); + parts.add(" = "); + parts.add("? "); + parts.add(": ("); parts.add(") "); parts.add(".decode(new java.io.ByteArrayInputStream("); - parts.add(")); } catch (IOException e) {throw new RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e));}"); + parts.add(")); } catch (Exception e) {throw new RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e));}"); StringContext sc = new StringContext(JavaConversions.collectionAsScalaIterable(parts).toSeq()); List args = new ArrayList<>(); args.add(javaType); args.add(ev.value()); + args.add(ev.value()); args.add(input.isNull()); args.add(CodeGenerator.defaultValue(dataType(), false)); args.add(javaType);
[beam] 07/37: Conform to spark ExpressionEncoders: pass classTags, implement scala Product, pass children from within the ExpressionEncoder, fix visibilities
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit 2aaf07a41155f35ab36bda4c3c02a7ffa7bd66db Author: Etienne Chauchot AuthorDate: Thu Aug 29 15:10:40 2019 +0200 Conform to spark ExpressionEncoders: pass classTags, implement scala Product, pass children from within the ExpressionEncoder, fix visibilities --- .../translation/helpers/EncoderHelpers.java| 64 +- 1 file changed, 38 insertions(+), 26 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java index 8a4f1de..0765c78 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java @@ -100,13 +100,13 @@ public class EncoderHelpers { List serialiserList = new ArrayList<>(); Class claz = (Class) Object.class; -serialiserList.add(new EncodeUsingBeamCoder<>(claz, coder)); +serialiserList.add(new EncodeUsingBeamCoder<>(new BoundReference(0, new ObjectType(claz), true), coder)); ClassTag classTag = ClassTag$.MODULE$.apply(claz); return new ExpressionEncoder<>( SchemaHelpers.binarySchema(), false, JavaConversions.collectionAsScalaIterable(serialiserList).toSeq(), -new DecodeUsingBeamCoder<>(claz, coder), +new DecodeUsingBeamCoder<>(new Cast(new GetColumnByOrdinal(0, BinaryType), BinaryType), classTag, coder), classTag); /* @@ -126,16 +126,14 @@ public class EncoderHelpers { */ } - private static class EncodeUsingBeamCoder extends UnaryExpression implements NonSQLExpression { + public static class EncodeUsingBeamCoder extends UnaryExpression implements NonSQLExpression { -private Class claz; -private Coder beamCoder; private Expression child; +private Coder beamCoder; -private EncodeUsingBeamCoder( Class claz, Coder beamCoder) { - this.claz = claz; +public EncodeUsingBeamCoder(Expression child, Coder beamCoder) { + this.child = child; this.beamCoder = beamCoder; - this.child = new BoundReference(0, new ObjectType(claz), true); } @Override public Expression child() { @@ -175,11 +173,18 @@ public class EncoderHelpers { } @Override public Object productElement(int n) { - return null; + switch (n) { +case 0: + return child; +case 1: + return beamCoder; +default: + throw new ArrayIndexOutOfBoundsException("productElement out of bounds"); + } } @Override public int productArity() { - return 0; + return 2; } @Override public boolean canEqual(Object that) { @@ -194,11 +199,11 @@ public class EncoderHelpers { return false; } EncodeUsingBeamCoder that = (EncodeUsingBeamCoder) o; - return claz.equals(that.claz) && beamCoder.equals(that.beamCoder); + return beamCoder.equals(that.beamCoder); } @Override public int hashCode() { - return Objects.hash(super.hashCode(), claz, beamCoder); + return Objects.hash(super.hashCode(), beamCoder); } } @@ -226,16 +231,16 @@ public class EncoderHelpers { override def dataType: DataType = BinaryType }*/ - private static class DecodeUsingBeamCoder extends UnaryExpression implements NonSQLExpression{ + public static class DecodeUsingBeamCoder extends UnaryExpression implements NonSQLExpression{ -private Class claz; -private Coder beamCoder; private Expression child; +private ClassTag classTag; +private Coder beamCoder; -private DecodeUsingBeamCoder(Class claz, Coder beamCoder) { - this.claz = claz; +public DecodeUsingBeamCoder(Expression child, ClassTag classTag, Coder beamCoder) { + this.child = child; + this.classTag = classTag; this.beamCoder = beamCoder; - this.child = new Cast(new GetColumnByOrdinal(0, BinaryType), BinaryType); } @Override public Expression child() { @@ -267,7 +272,7 @@ public class EncoderHelpers { args.add(new VariableValue("deserialize", String.class)); Block code = blockHelper.code(JavaConversions.collectionAsScalaIterable(args).toSeq()); - return ev.copy(input.code().$plus(code), input.isNull(), new VariableValue("output", claz)); + return ev.copy(input.code().$plus(code), input.isNull(), new VariableValue("output", classTag.runtimeClass())); } @@ -280,17 +285,24 @@ public class EncoderHelpers { } @Override public DataType dataType() { -// return new ObjectTy
[beam] 08/37: Add a simple spark native test to test Beam coders wrapping into Spark Encoders
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit e4478ffc2a9fd35d76080ff8f33cc8d3340cba1c Author: Etienne Chauchot AuthorDate: Fri Aug 30 17:34:13 2019 +0200 Add a simple spark native test to test Beam coders wrapping into Spark Encoders --- .../structuredstreaming/utils/EncodersTest.java| 29 ++ 1 file changed, 29 insertions(+) diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/utils/EncodersTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/utils/EncodersTest.java new file mode 100644 index 000..490e3dc --- /dev/null +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/utils/EncodersTest.java @@ -0,0 +1,29 @@ +package org.apache.beam.runners.spark.structuredstreaming.utils; + +import java.util.ArrayList; +import java.util.List; +import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers; +import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.spark.sql.SparkSession; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +/** + * Test of the wrapping of Beam Coders as Spark ExpressionEncoders. + */ +public class EncodersTest { + + @Test + public void beamCoderToSparkEncoderTest() { +SparkSession sparkSession = SparkSession.builder().appName("beamCoderToSparkEncoderTest") +.master("local[4]").getOrCreate(); +List data = new ArrayList<>(); +data.add(1); +data.add(2); +data.add(3); +//sparkSession.createDataset(data, EncoderHelpers.fromBeamCoder(VarIntCoder.of())); +sparkSession.createDataset(data, EncoderHelpers.genericEncoder()); + } +}
[beam] 32/37: Apply new Encoders to Pardo. Replace Tuple2Coder with MultiOutputCoder to deal with multiple output to use in Spark Encoder for DoFnRunner
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit c5e78a0f4552a094ba3914ef490629e136ac1beb Author: Etienne Chauchot AuthorDate: Tue Oct 1 17:52:32 2019 +0200 Apply new Encoders to Pardo. Replace Tuple2Coder with MultiOutputCoder to deal with multiple output to use in Spark Encoder for DoFnRunner --- .../translation/batch/ParDoTranslatorBatch.java| 42 +-- .../translation/helpers/EncoderHelpers.java| 6 ++- .../translation/helpers/MultiOuputCoder.java | 49 + .../translation/helpers/Tuple2Coder.java | 62 -- 4 files changed, 81 insertions(+), 78 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java index 255adc8..f5a109e 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java @@ -31,8 +31,10 @@ import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTr import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext; import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.CoderHelpers; import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers; +import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.MultiOuputCoder; import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.SideInputBroadcast; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.SerializableCoder; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.DoFnSchemaInformation; import org.apache.beam.sdk.transforms.PTransform; @@ -84,12 +86,15 @@ class ParDoTranslatorBatch ParDoTranslation.getSchemaInformation(context.getCurrentTransform()); // Init main variables -Dataset> inputDataSet = context.getDataset(context.getInput()); +PValue input = context.getInput(); +Dataset> inputDataSet = context.getDataset(input); Map, PValue> outputs = context.getOutputs(); TupleTag mainOutputTag = getTupleTag(context); List> outputTags = new ArrayList<>(outputs.keySet()); WindowingStrategy windowingStrategy = -((PCollection) context.getInput()).getWindowingStrategy(); +((PCollection) input).getWindowingStrategy(); +Coder inputCoder = ((PCollection) input).getCoder(); +Coder windowCoder = windowingStrategy.getWindowFn().windowCoder(); // construct a map from side input to WindowingStrategy so that // the DoFn runner can map main-input windows to side input windows @@ -102,8 +107,6 @@ class ParDoTranslatorBatch SideInputBroadcast broadcastStateData = createBroadcastSideInputs(sideInputs, context); Map, Coder> outputCoderMap = context.getOutputCoders(); -Coder inputCoder = ((PCollection) context.getInput()).getCoder(); - MetricsContainerStepMapAccumulator metricsAccum = MetricsAccumulator.getInstance(); List> additionalOutputTags = new ArrayList<>(); @@ -129,19 +132,25 @@ class ParDoTranslatorBatch broadcastStateData, doFnSchemaInformation); +MultiOuputCoder multipleOutputCoder = MultiOuputCoder +.of(SerializableCoder.of(TupleTag.class), outputCoderMap, windowCoder); Dataset, WindowedValue>> allOutputs = -inputDataSet.mapPartitions(doFnWrapper, EncoderHelpers.tuple2Encoder()); +inputDataSet.mapPartitions(doFnWrapper, EncoderHelpers.fromBeamCoder(multipleOutputCoder)); if (outputs.entrySet().size() > 1) { allOutputs.persist(); for (Map.Entry, PValue> output : outputs.entrySet()) { -pruneOutputFilteredByTag(context, allOutputs, output); +pruneOutputFilteredByTag(context, allOutputs, output, windowCoder); } } else { + Coder outputCoder = ((PCollection) outputs.get(mainOutputTag)).getCoder(); + Coder> windowedValueCoder = + (Coder>) + (Coder) WindowedValue.getFullCoder(outputCoder, windowCoder); Dataset> outputDataset = allOutputs.map( (MapFunction, WindowedValue>, WindowedValue>) value -> value._2, - EncoderHelpers.windowedValueEncoder()); + EncoderHelpers.fromBeamCoder(windowedValueCoder)); context.putDatasetWildcard(outputs.entrySet().iterator().next().getValue(), outputDataset); } } @@ -152,14 +161,14 @@ class ParDoTranslatorBatch JavaSparkConte
[beam] 34/37: Use beam encoders also in the output of the source translation
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit 6a278395d77b3578da10a9621c85883a2d6f2ded Author: Etienne Chauchot AuthorDate: Wed Oct 23 11:45:39 2019 +0200 Use beam encoders also in the output of the source translation --- .../translation/batch/ReadSourceTranslatorBatch.java | 4 +--- .../translation/streaming/ReadSourceTranslatorStreaming.java | 4 +--- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java index ceb87cf..6af7f55 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java @@ -77,9 +77,7 @@ class ReadSourceTranslatorBatch Dataset> dataset = rowDataset.map( RowHelpers.extractWindowedValueFromRowMapFunction(windowedValueCoder), -// using kryo bytes serialization because the mapper already calls -// windowedValueCoder.decode, no need to call it also in the Spark encoder -EncoderHelpers.windowedValueEncoder()); +EncoderHelpers.fromBeamCoder(windowedValueCoder)); PCollection output = (PCollection) context.getOutput(); context.putDataset(output, dataset); diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java index 9f1e34d..ea10272 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java @@ -77,9 +77,7 @@ class ReadSourceTranslatorStreaming Dataset> dataset = rowDataset.map( RowHelpers.extractWindowedValueFromRowMapFunction(windowedValueCoder), -// using kryo bytes serialization because the mapper already calls -// windowedValueCoder.decode, no need to call it also in the Spark encoder -EncoderHelpers.windowedValueEncoder()); +EncoderHelpers.fromBeamCoder(windowedValueCoder)); PCollection output = (PCollection) context.getOutput(); context.putDataset(output, dataset);
[beam] 31/37: Apply new Encoders to GroupByKey
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit 039f58a6a07e567bb8c5636caecebc61dec9129e Author: Etienne Chauchot AuthorDate: Mon Sep 30 12:13:25 2019 +0200 Apply new Encoders to GroupByKey --- .../batch/GroupByKeyTranslatorBatch.java | 25 -- 1 file changed, 19 insertions(+), 6 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java index 3e203a8..2970aa7 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java @@ -29,6 +29,8 @@ import org.apache.beam.runners.spark.structuredstreaming.translation.Translation import org.apache.beam.runners.spark.structuredstreaming.translation.batch.functions.GroupAlsoByWindowViaOutputBufferFn; import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers; import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.KVHelpers; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.util.WindowedValue; @@ -54,11 +56,21 @@ class GroupByKeyTranslatorBatch Dataset>> input = context.getDataset(inputPCollection); +WindowingStrategy windowingStrategy = inputPCollection.getWindowingStrategy(); +KvCoder kvCoder = (KvCoder) inputPCollection.getCoder(); + // group by key only +Coder keyCoder = kvCoder.getKeyCoder(); KeyValueGroupedDataset>> groupByKeyOnly = -input.groupByKey(KVHelpers.extractKey(), EncoderHelpers.genericEncoder()); +input.groupByKey(KVHelpers.extractKey(), EncoderHelpers.fromBeamCoder( +keyCoder)); // Materialize groupByKeyOnly values, potential OOM because of creation of new iterable +Coder valueCoder = kvCoder.getValueCoder(); +WindowedValue.WindowedValueCoder wvCoder = +WindowedValue.FullWindowedValueCoder.of( +valueCoder, inputPCollection.getWindowingStrategy().getWindowFn().windowCoder()); +IterableCoder> iterableCoder = IterableCoder.of(wvCoder); Dataset>>> materialized = groupByKeyOnly.mapGroups( (MapGroupsFunction>, KV>>>) @@ -77,19 +89,20 @@ class GroupByKeyTranslatorBatch KV.of(key, Iterables.unmodifiableIterable(values)); return kv; }, -EncoderHelpers.kvEncoder()); +EncoderHelpers.fromBeamCoder(KvCoder.of(keyCoder, iterableCoder))); -WindowingStrategy windowingStrategy = inputPCollection.getWindowingStrategy(); -KvCoder coder = (KvCoder) inputPCollection.getCoder(); // group also by windows +WindowedValue.FullWindowedValueCoder>> outputCoder = WindowedValue.FullWindowedValueCoder +.of(KvCoder.of(keyCoder, IterableCoder.of(valueCoder)), +windowingStrategy.getWindowFn().windowCoder()); Dataset>>> output = materialized.flatMap( new GroupAlsoByWindowViaOutputBufferFn<>( windowingStrategy, new InMemoryStateInternalsFactory<>(), -SystemReduceFn.buffering(coder.getValueCoder()), +SystemReduceFn.buffering(valueCoder), context.getSerializableOptions()), -EncoderHelpers.windowedValueEncoder()); +EncoderHelpers.fromBeamCoder(outputCoder)); context.putDataset(context.getOutput(), output); }
[beam] 29/37: Apply new Encoders to AggregatorCombiner
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit 29f7e93c954cc26425a052c0f1c19ec6e6c9fe66 Author: Etienne Chauchot AuthorDate: Fri Sep 27 11:55:20 2019 +0200 Apply new Encoders to AggregatorCombiner --- .../translation/batch/AggregatorCombiner.java | 22 +- .../batch/CombinePerKeyTranslatorBatch.java| 20 2 files changed, 33 insertions(+), 9 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombiner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombiner.java index 0e3229e..d14569a 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombiner.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombiner.java @@ -27,6 +27,8 @@ import java.util.Map; import java.util.Set; import java.util.stream.Collectors; import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; @@ -52,13 +54,25 @@ class AggregatorCombiner private final Combine.CombineFn combineFn; private WindowingStrategy windowingStrategy; private TimestampCombiner timestampCombiner; + private IterableCoder> accumulatorCoder; + private IterableCoder> outputCoder; public AggregatorCombiner( Combine.CombineFn combineFn, - WindowingStrategy windowingStrategy) { + WindowingStrategy windowingStrategy, + Coder accumulatorCoder, + Coder outputCoder) { this.combineFn = combineFn; this.windowingStrategy = (WindowingStrategy) windowingStrategy; this.timestampCombiner = windowingStrategy.getTimestampCombiner(); +this.accumulatorCoder = +IterableCoder.of( +WindowedValue.FullWindowedValueCoder.of( +accumulatorCoder, windowingStrategy.getWindowFn().windowCoder())); +this.outputCoder = +IterableCoder.of( +WindowedValue.FullWindowedValueCoder.of( +outputCoder, windowingStrategy.getWindowFn().windowCoder())); } @Override @@ -142,14 +156,12 @@ class AggregatorCombiner @Override public Encoder>> bufferEncoder() { -// TODO replace with accumulatorCoder if possible -return EncoderHelpers.genericEncoder(); +return EncoderHelpers.fromBeamCoder(accumulatorCoder); } @Override public Encoder>> outputEncoder() { -// TODO replace with outputCoder if possible -return EncoderHelpers.genericEncoder(); +return EncoderHelpers.fromBeamCoder(outputCoder); } private Set collectAccumulatorsWindows(Iterable> accumulators) { diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java index 33b037a..be238b5 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java @@ -23,6 +23,7 @@ import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTr import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext; import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers; import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.KVHelpers; +import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.transforms.Combine; @@ -58,20 +59,31 @@ class CombinePerKeyTranslatorBatch Dataset>> inputDataset = context.getDataset(input); -Coder keyCoder = (Coder) input.getCoder().getCoderArguments().get(0); -Coder outputTCoder = (Coder) output.getCoder().getCoderArguments().get(1); +KvCoder inputCoder = (KvCoder) input.getCoder(); +Coder keyCoder = inputCoder.getKeyCoder(); +KvCoder outputKVCoder = (KvCoder) output.getCoder(); +Coder outputCoder = outputKVCoder.getValueCoder(); KeyValueGroupedDataset>> groupedDataset = inputDataset.groupByKey(KVHelpers.extractKey(), EncoderHelpers.fromBeamCoder(keyCoder)); +Coder accumulatorC
[beam] 23/37: Catch Exception instead of IOException because some coders to not throw Exceptions at all (e.g.VoidCoder)
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit c33fddadcc3f38474e0aeb440c0d3fac718ee5a6 Author: Etienne Chauchot AuthorDate: Fri Sep 6 10:42:00 2019 +0200 Catch Exception instead of IOException because some coders to not throw Exceptions at all (e.g.VoidCoder) --- .../structuredstreaming/translation/helpers/EncoderHelpers.java | 8 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java index e7c5bb7..218dc0a 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java @@ -149,7 +149,7 @@ public class EncoderHelpers { $beamCoder.encode(${input.value}, baos); ${ev.value} = baos.toByteArray(); } -} catch (java.io.IOException e) { +} catch (Exception e) { throw org.apache.beam.sdk.util.UserCodeException.wrap(e); } */ @@ -162,7 +162,7 @@ public class EncoderHelpers { parts.add(".encode("); parts.add(", baos); "); parts.add( - " = baos.toByteArray();}} catch (java.io.IOException e) {throw org.apache.beam.sdk.util.UserCodeException.wrap(e);}"); + " = baos.toByteArray();}} catch (Exception e) {throw org.apache.beam.sdk.util.UserCodeException.wrap(e);}"); StringContext sc = new StringContext(JavaConversions.collectionAsScalaIterable(parts).toSeq()); @@ -265,7 +265,7 @@ public class EncoderHelpers { ${input.isNull} ? ${CodeGenerator.defaultValue(dataType)} : ($javaType) $beamCoder.decode(new java.io.ByteArrayInputStream(${input.value})); - } catch (java.io.IOException e) { + } catch (Exception e) { throw org.apache.beam.sdk.util.UserCodeException.wrap(e); } */ @@ -280,7 +280,7 @@ public class EncoderHelpers { parts.add(") "); parts.add(".decode(new java.io.ByteArrayInputStream("); parts.add( - ")); } catch (java.io.IOException e) {throw org.apache.beam.sdk.util.UserCodeException.wrap(e);}"); + ")); } catch (Exception e) {throw org.apache.beam.sdk.util.UserCodeException.wrap(e);}"); StringContext sc = new StringContext(JavaConversions.collectionAsScalaIterable(parts).toSeq());
[beam] 04/37: Wrap Beam Coders into Spark Encoders using ExpressionEncoder: deserialization part
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit 5fa6331e0356953870e6ed614b0ce5e5c801fab1 Author: Etienne Chauchot AuthorDate: Mon Aug 26 15:22:12 2019 +0200 Wrap Beam Coders into Spark Encoders using ExpressionEncoder: deserialization part + Fix EncoderHelpers.fromBeamCoder() visibility --- .../translation/helpers/EncoderHelpers.java| 64 ++ 1 file changed, 52 insertions(+), 12 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java index b072803..ab24e37 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java @@ -19,6 +19,8 @@ package org.apache.beam.runners.spark.structuredstreaming.translation.helpers; import static org.apache.spark.sql.types.DataTypes.BinaryType; +import java.io.ByteArrayInputStream; +import java.io.IOException; import java.lang.reflect.Array; import java.util.ArrayList; import java.util.List; @@ -94,7 +96,7 @@ public class EncoderHelpers { */ /** A way to construct encoders using generic serializers. */ - private Encoder fromBeamCoder(Coder coder, Class claz){ + public static Encoder fromBeamCoder(Coder coder, Class claz){ List serialiserList = new ArrayList<>(); serialiserList.add(new EncodeUsingBeamCoder<>(claz, coder)); @@ -103,7 +105,8 @@ public class EncoderHelpers { SchemaHelpers.binarySchema(), false, JavaConversions.collectionAsScalaIterable(serialiserList).toSeq(), -new DecodeUsingBeamCoder<>(classTag, coder), classTag); +new DecodeUsingBeamCoder<>(claz, coder), +classTag); /* ExpressionEncoder[T]( @@ -150,8 +153,8 @@ public class EncoderHelpers { List instructions = new ArrayList<>(); instructions.add(outside); - Seq parts = JavaConversions.collectionAsScalaIterable(instructions).toSeq(); + StringContext stringContext = new StringContext(parts); Block.BlockHelper blockHelper = new Block.BlockHelper(stringContext); List args = new ArrayList<>(); @@ -160,7 +163,7 @@ public class EncoderHelpers { args.add(new VariableValue("javaType", String.class)); args.add(new SimpleExprValue("input.isNull", Boolean.class)); args.add(new SimpleExprValue("CodeGenerator.defaultValue(dataType)", String.class)); - args.add(new VariableValue("$serialize", String.class)); + args.add(new VariableValue("serialize", String.class)); Block code = blockHelper.code(JavaConversions.collectionAsScalaIterable(args).toSeq()); return ev.copy(input.code().$plus(code), input.isNull(), new VariableValue("output", Array.class)); @@ -229,24 +232,61 @@ public class EncoderHelpers { private static class DecodeUsingBeamCoder extends UnaryExpression implements NonSQLExpression{ -private ClassTag classTag; +private Class claz; private Coder beamCoder; +private Expression child; -private DecodeUsingBeamCoder(ClassTag classTag, Coder beamCoder) { - this.classTag = classTag; +private DecodeUsingBeamCoder(Class claz, Coder beamCoder) { + this.claz = claz; this.beamCoder = beamCoder; + this.child = new Cast(new GetColumnByOrdinal(0, BinaryType), BinaryType); } @Override public Expression child() { - return new Cast(new GetColumnByOrdinal(0, BinaryType), BinaryType); + return child; } @Override public ExprCode doGenCode(CodegenContext ctx, ExprCode ev) { - return null; + // Code to deserialize. + ExprCode input = child.genCode(ctx); + String javaType = CodeGenerator.javaType(dataType()); + + String inputStream = "ByteArrayInputStream bais = new ByteArrayInputStream(${input.value});"; + String deserialize = inputStream + "($javaType) $beamCoder.decode(bais);"; + + String outside = "final $javaType output = ${input.isNull} ? ${CodeGenerator.defaultValue(dataType)} : $deserialize;"; + + List instructions = new ArrayList<>(); + instructions.add(outside); + Seq parts = JavaConversions.collectionAsScalaIterable(instructions).toSeq(); + + StringContext stringContext = new StringContext(parts); + Block.BlockHelper blockHelper = new Block.BlockHelper(stringContext); + List args = new ArrayList<>(); + args.add(new SimpleExprValue("input.value", ExprValue.class)); + args.add(new VariableValue("javaType", String.class)); + args.add(new VariableValue("beamCoder", Coder.class));
[beam] branch spark-runner_structured-streaming updated (46ed555 -> 620a27a)
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a change to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git. discard 46ed555 Remove Encoders based on kryo now that we call Beam coders in the runner discard 25d0401 Fix: Remove generic hack of using object. Use actual Coder encodedType in Encoders discard ebc53fd Remove unneeded cast discard ad29daf Use beam encoders also in the output of the source translation discard 507bbd8 Fix javadoc discard c980d4c Fix typo discard fb3aa34 Add missing windowedValue Encoder call in Pardo discard ee2c0e6 Apply spotless discard 31c91a9 Apply new Encoders to Pardo. Replace Tuple2Coder with MultiOutputCoder to deal with multiple output to use in Spark Encoder for DoFnRunner discard 868204f Apply new Encoders to GroupByKey discard 30c662a Create a Tuple2Coder to encode scale tuple2 discard d093ffe Apply spotless discard 6edcfa2 Apply new Encoders to AggregatorCombiner discard 5beb435 Apply new Encoders to Window assign translation discard ab7d24c Ignore long time failing test: SparkMetricsSinkTest discard 3ac3c71 Improve performance of source: the mapper already calls windowedValueCoder.decode, no need to call it also in the Spark encoder discard bcbb697 Apply new Encoders to Read source discard aa25e85 Apply new Encoders to CombinePerKey discard f0f2078 Catch Exception instead of IOException because some coders to not throw Exceptions at all (e.g.VoidCoder) discard 3a333fb Put Encoders expressions serializable discard cfdf4a4 Improve exceptions catching discard b879123 Apply spotless and checkstyle and add javadocs discard 0fe6f9b Add an assert of equality in the encoders test discard d8b8b42 Fix generated code: uniform exceptions catching, fix parenthesis and variable declarations discard ef69410 Fix equal and hashcode discard 4351304 Remove example code discard c4a4464 Remove lazy init of beam coder because there is no generic way on instanciating a beam coder discard 91e923c Cast coder instanciated by reflection discard 723c004 Add try catch around reflexion call in lazy init of beam coder discard 8bbf991 Fix beam coder lazy init using reflexion discard 959664f Fix getting the output value in code generation discard 668227b Fix ExpressionEncoder generated code: typos, try catch, fqcn discard cbd7c2b Fix warning in coder construction by reflexion discard 2c94eef Fix call to scala Fucntion1 in coder lazy init discard a758985 Lazy init coder because coder instance cannot be interpolated by catalyst discard b11e100 Fix code generation in Beam coder wrapper discard 2bf4cd9 Add a simple spark native test to test Beam coders wrapping into Spark Encoders discard e96af88 Fix visibility of serializer and deserializer discard 23735e4 Conform to spark ExpressionEncoders: pass classTags, implement scala Product, pass children from within the ExpressionEncoder discard a5d49f5 Fix scala Product in Encoders to avoid StackEverflow discard 95fd272 type erasure: spark encoders require a Class, pass Object and cast to Class discard 84f2cbd9 Fix EncoderHelpers.fromBeamCoder() visibility discard d613d6b Wrap Beam Coders into Spark Encoders using ExpressionEncoder: deserialization part discard 031754c Wrap Beam Coders into Spark Encoders using ExpressionEncoder: serialization part discard c350188 Use "sparkMaster" in local mode to obtain number of shuffle partitions + spotless apply discard a524036 Improve Pardo translation performance: avoid calling a filter transform when there is only one output tag discard 0cedc7a Add a TODO on perf improvement of Pardo translation new 22d6466 Improve Pardo translation performance: avoid calling a filter transform when there is only one output tag new 20d5bbd Use "sparkMaster" in local mode to obtain number of shuffle partitions + spotless apply new a5c7da3 Wrap Beam Coders into Spark Encoders using ExpressionEncoder: serialization part new 5fa6331 Wrap Beam Coders into Spark Encoders using ExpressionEncoder: deserialization part new c9e3534 type erasure: spark encoders require a Class, pass Object and cast to Class new fff5092 Fix scala Product in Encoders to avoid StackEverflow new 2aaf07a Conform to spark ExpressionEncoders: pass classTags, implement scala Product, pass children from within the ExpressionEncoder, fix visibilities new e4478ff Add a simple spark native test to test Beam coders wrapping into Spark Encoders new d5645ff Fix code generation in Beam coder wrapper new e6b68a8 Lazy init coder because coder instance cannot be interpolated by catalyst new fdba22d Fix warning in coder construction by reflexion new 8b07ec8 Fix ExpressionEncoder generated code: typos, try catch, fqcn new d7c9a4a Fix getting the output value in code generation new 0cf2c87 Fix bea
[beam] 26/37: Improve performance of source: the mapper already calls windowedValueCoder.decode, no need to call it also in the Spark encoder
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit 68d3d6798950888590fca915782d5288fe2d1e5a Author: Etienne Chauchot AuthorDate: Thu Sep 19 17:20:31 2019 +0200 Improve performance of source: the mapper already calls windowedValueCoder.decode, no need to call it also in the Spark encoder --- .../translation/batch/ReadSourceTranslatorBatch.java | 9 ++--- .../translation/streaming/ReadSourceTranslatorStreaming.java | 9 ++--- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java index 2dcf66f..ceb87cf 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java @@ -70,13 +70,16 @@ class ReadSourceTranslatorBatch .load(); // extract windowedValue from Row -WindowedValue.FullWindowedValueCoder windowedValueCoder = WindowedValue.FullWindowedValueCoder -.of(source.getOutputCoder(), GlobalWindow.Coder.INSTANCE); +WindowedValue.FullWindowedValueCoder windowedValueCoder = +WindowedValue.FullWindowedValueCoder.of( +source.getOutputCoder(), GlobalWindow.Coder.INSTANCE); Dataset> dataset = rowDataset.map( RowHelpers.extractWindowedValueFromRowMapFunction(windowedValueCoder), -EncoderHelpers.fromBeamCoder(windowedValueCoder)); +// using kryo bytes serialization because the mapper already calls +// windowedValueCoder.decode, no need to call it also in the Spark encoder +EncoderHelpers.windowedValueEncoder()); PCollection output = (PCollection) context.getOutput(); context.putDataset(output, dataset); diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java index 9e03d96..9f1e34d 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java @@ -71,12 +71,15 @@ class ReadSourceTranslatorStreaming .load(); // extract windowedValue from Row -WindowedValue.FullWindowedValueCoder windowedValueCoder = WindowedValue.FullWindowedValueCoder -.of(source.getOutputCoder(), GlobalWindow.Coder.INSTANCE); +WindowedValue.FullWindowedValueCoder windowedValueCoder = +WindowedValue.FullWindowedValueCoder.of( +source.getOutputCoder(), GlobalWindow.Coder.INSTANCE); Dataset> dataset = rowDataset.map( RowHelpers.extractWindowedValueFromRowMapFunction(windowedValueCoder), -EncoderHelpers.fromBeamCoder(windowedValueCoder)); +// using kryo bytes serialization because the mapper already calls +// windowedValueCoder.decode, no need to call it also in the Spark encoder +EncoderHelpers.windowedValueEncoder()); PCollection output = (PCollection) context.getOutput(); context.putDataset(output, dataset);
[beam] 13/37: Fix getting the output value in code generation
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit d7c9a4a59768687ff051ab0f28462e6376648e43 Author: Etienne Chauchot AuthorDate: Wed Sep 4 16:50:17 2019 +0200 Fix getting the output value in code generation --- .../translation/helpers/EncoderHelpers.java| 37 +- 1 file changed, 22 insertions(+), 15 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java index dff308a..a452da0 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java @@ -21,7 +21,6 @@ import static org.apache.spark.sql.types.DataTypes.BinaryType; import static scala.compat.java8.JFunction.func; import java.io.ByteArrayInputStream; -import java.lang.reflect.Array; import java.util.ArrayList; import java.util.List; import java.util.Objects; @@ -42,7 +41,6 @@ import org.apache.spark.sql.catalyst.expressions.codegen.Block; import org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator; import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext; import org.apache.spark.sql.catalyst.expressions.codegen.ExprCode; -import org.apache.spark.sql.catalyst.expressions.codegen.VariableValue; import org.apache.spark.sql.types.DataType; import org.apache.spark.sql.types.ObjectType; import scala.StringContext; @@ -144,34 +142,42 @@ public class EncoderHelpers { /* CODE GENERATED +byte[] ${ev.value}; try { java.io.ByteArrayOutputStream baos = new java.io.ByteArrayOutputStream(); -final byte[] output; if ({input.isNull}) -output = null; -else -output = $beamCoder.encode(${input.value}, baos); baos.toByteArray(); +${ev.value} = null; +else{ +$beamCoder.encode(${input.value}, baos); +${ev.value} = baos.toByteArray(); +} } catch (Exception e) { throw new RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e)); } */ List parts = new ArrayList<>(); - parts.add("try {java.io.ByteArrayOutputStream baos = new java.io.ByteArrayOutputStream(); final byte[] output; if ("); - parts.add(") output = null; else output ="); + parts.add("byte[] "); + parts.add(";try { java.io.ByteArrayOutputStream baos = new java.io.ByteArrayOutputStream(); if ("); + parts.add(") "); + parts.add(" = null; else{"); parts.add(".encode("); - parts.add(", baos); baos.toByteArray();} catch (Exception e) {throw new RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e));}"); + parts.add(", baos); "); + parts.add(" = baos.toByteArray();}} catch (Exception e) {throw new RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e));}"); StringContext sc = new StringContext(JavaConversions.collectionAsScalaIterable(parts).toSeq()); List args = new ArrayList<>(); + + args.add(ev.value()); args.add(input.isNull()); + args.add(ev.value()); args.add(beamCoder); args.add(input.value()); + args.add(ev.value()); Block code = (new Block.BlockHelper(sc)) .code(JavaConversions.collectionAsScalaIterable(args).toSeq()); - return ev.copy(input.code().$plus(code), input.isNull(), - new VariableValue("output", Array.class)); + return ev.copy(input.code().$plus(code), input.isNull(),ev.value()); } @@ -263,7 +269,7 @@ public class EncoderHelpers { /* CODE GENERATED: try { - final $javaType output = + final $javaType ${ev.value} = ${input.isNull} ? ${CodeGenerator.defaultValue(dataType)} : ($javaType) $beamCoder.decode(new java.io.ByteArrayInputStream(${input.value})); @@ -274,7 +280,8 @@ public class EncoderHelpers { List parts = new ArrayList<>(); parts.add("try { final "); - parts.add(" output ="); + parts.add(" "); + parts.add(" ="); parts.add("?"); parts.add(":"); parts.add("("); @@ -286,6 +293,7 @@ public class EncoderHelpers { List args = new ArrayList<>(); args.add(javaType); + args.add(ev.value()); args.add(input.isNull()); args.add(CodeGenerator.defaultValue(dataType(), false)); args.add(javaType); @@ -294,8 +302,7 @@ public class EncoderHelpers { Block code = (new Block.BlockHelper(sc)) .code(JavaConversions.collectionAsScalaIterable(args).toSeq()); -
[beam] 17/37: Fix equal and hashcode
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit f48067b87be26773de91d076c4ad249f54890db0 Author: Etienne Chauchot AuthorDate: Thu Sep 5 14:49:37 2019 +0200 Fix equal and hashcode --- .../structuredstreaming/translation/helpers/EncoderHelpers.java | 8 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java index 83243b3..91aaaf9 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java @@ -195,11 +195,11 @@ public class EncoderHelpers { return false; } EncodeUsingBeamCoder that = (EncodeUsingBeamCoder) o; - return beamCoder.equals(that.beamCoder); + return beamCoder.equals(that.beamCoder) && child.equals(that.child); } @Override public int hashCode() { - return Objects.hash(super.hashCode(), beamCoder); + return Objects.hash(super.hashCode(), child, beamCoder); } } @@ -306,11 +306,11 @@ public class EncoderHelpers { return false; } DecodeUsingBeamCoder that = (DecodeUsingBeamCoder) o; - return classTag.equals(that.classTag) && beamCoder.equals(that.beamCoder); + return child.equals(that.child) && classTag.equals(that.classTag) && beamCoder.equals(that.beamCoder); } @Override public int hashCode() { - return Objects.hash(super.hashCode(), classTag, beamCoder); + return Objects.hash(super.hashCode(), child, classTag, beamCoder); } } }
[beam] 20/37: Apply spotless and checkstyle and add javadocs
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit c6f2ac9b21f7cfb9e1e81675cdf7f511b794559d Author: Etienne Chauchot AuthorDate: Thu Sep 5 15:35:34 2019 +0200 Apply spotless and checkstyle and add javadocs --- .../translation/helpers/EncoderHelpers.java| 137 + .../structuredstreaming/utils/EncodersTest.java| 32 +++-- 2 files changed, 113 insertions(+), 56 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java index c9ab435..f990121 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java @@ -89,21 +89,31 @@ public class EncoderHelpers { - Bridges from Beam Coders to Spark Encoders */ - /** A way to construct encoders using generic serializers. */ - public static Encoder fromBeamCoder(Coder beamCoder/*, Class claz*/){ + /** + * Wrap a Beam coder into a Spark Encoder using Catalyst Expression Encoders (which uses java code + * generation). + */ + public static Encoder fromBeamCoder(Coder beamCoder) { List serialiserList = new ArrayList<>(); Class claz = (Class) Object.class; -serialiserList.add(new EncodeUsingBeamCoder<>(new BoundReference(0, new ObjectType(claz), true), beamCoder)); +serialiserList.add( +new EncodeUsingBeamCoder<>(new BoundReference(0, new ObjectType(claz), true), beamCoder)); ClassTag classTag = ClassTag$.MODULE$.apply(claz); return new ExpressionEncoder<>( SchemaHelpers.binarySchema(), false, JavaConversions.collectionAsScalaIterable(serialiserList).toSeq(), -new DecodeUsingBeamCoder<>(new Cast(new GetColumnByOrdinal(0, BinaryType), BinaryType), classTag, beamCoder), +new DecodeUsingBeamCoder<>( +new Cast(new GetColumnByOrdinal(0, BinaryType), BinaryType), classTag, beamCoder), classTag); } + /** + * Catalyst Expression that serializes elements using Beam {@link Coder}. + * + * @param : Type of elements ot be serialized. + */ public static class EncodeUsingBeamCoder extends UnaryExpression implements NonSQLExpression { private Expression child; @@ -114,13 +124,16 @@ public class EncoderHelpers { this.beamCoder = beamCoder; } -@Override public Expression child() { +@Override +public Expression child() { return child; } -@Override public ExprCode doGenCode(CodegenContext ctx, ExprCode ev) { +@Override +public ExprCode doGenCode(CodegenContext ctx, ExprCode ev) { // Code to serialize. - String accessCode = ctx.addReferenceObj("beamCoder", beamCoder, beamCoder.getClass().getName()); + String accessCode = + ctx.addReferenceObj("beamCoder", beamCoder, beamCoder.getClass().getName()); ExprCode input = child.genCode(ctx); /* @@ -140,14 +153,17 @@ public class EncoderHelpers { */ List parts = new ArrayList<>(); parts.add("byte[] "); - parts.add(";try { java.io.ByteArrayOutputStream baos = new java.io.ByteArrayOutputStream(); if ("); + parts.add( + ";try { java.io.ByteArrayOutputStream baos = new java.io.ByteArrayOutputStream(); if ("); parts.add(") "); parts.add(" = null; else{"); parts.add(".encode("); parts.add(", baos); "); - parts.add(" = baos.toByteArray();}} catch (Exception e) {throw new RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e));}"); + parts.add( + " = baos.toByteArray();}} catch (Exception e) {throw new RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e));}"); - StringContext sc = new StringContext(JavaConversions.collectionAsScalaIterable(parts).toSeq()); + StringContext sc = + new StringContext(JavaConversions.collectionAsScalaIterable(parts).toSeq()); List args = new ArrayList<>(); @@ -157,18 +173,19 @@ public class EncoderHelpers { args.add(accessCode); args.add(input.value()); args.add(ev.value()); - Block code = (new Block.BlockHelper(sc)) - .code(JavaConversions.collectionAsScalaIterable(args).toSeq()); + Block code = + (new Block.BlockHelper(sc)).code(JavaConversions.collectionAsScalaIterable(args).toSeq()); - return ev.copy(input.code().$plus(code), input.isNull(),ev.value()); + return ev.copy(input.code().$plus(code), input.isNull(), ev.value()); } - -@Override public DataType dataType() { +
[beam] 06/37: Fix scala Product in Encoders to avoid StackEverflow
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit fff509246b4ed9810c137ba2c9bd7811e3d95079 Author: Etienne Chauchot AuthorDate: Thu Aug 29 10:58:32 2019 +0200 Fix scala Product in Encoders to avoid StackEverflow --- .../translation/helpers/EncoderHelpers.java| 18 -- 1 file changed, 4 insertions(+), 14 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java index 9cb8f29..8a4f1de 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java @@ -175,16 +175,11 @@ public class EncoderHelpers { } @Override public Object productElement(int n) { - if (n == 0) { -return this; - } else { -throw new IndexOutOfBoundsException(String.valueOf(n)); - } + return null; } @Override public int productArity() { - //TODO test with spark Encoders if the arity of 1 is ok - return 1; + return 0; } @Override public boolean canEqual(Object that) { @@ -291,16 +286,11 @@ public class EncoderHelpers { } @Override public Object productElement(int n) { - if (n == 0) { -return this; - } else { -throw new IndexOutOfBoundsException(String.valueOf(n)); - } + return null; } @Override public int productArity() { - //TODO test with spark Encoders if the arity of 1 is ok - return 1; + return 0; } @Override public boolean canEqual(Object that) {
[beam] 22/37: Put Encoders expressions serializable
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit c8bfcf367c6a4ac855fa2b9d549fa26c39b8be81 Author: Etienne Chauchot AuthorDate: Fri Sep 6 10:31:36 2019 +0200 Put Encoders expressions serializable --- .../structuredstreaming/translation/helpers/EncoderHelpers.java| 7 +-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java index f4ea6fa..e7c5bb7 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java @@ -20,6 +20,7 @@ package org.apache.beam.runners.spark.structuredstreaming.translation.helpers; import static org.apache.spark.sql.types.DataTypes.BinaryType; import java.io.ByteArrayInputStream; +import java.io.Serializable; import java.util.ArrayList; import java.util.List; import java.util.Objects; @@ -114,7 +115,8 @@ public class EncoderHelpers { * * @param : Type of elements ot be serialized. */ - public static class EncodeUsingBeamCoder extends UnaryExpression implements NonSQLExpression { + public static class EncodeUsingBeamCoder extends UnaryExpression + implements NonSQLExpression, Serializable { private Expression child; private Coder beamCoder; @@ -229,7 +231,8 @@ public class EncoderHelpers { * * @param : Type of elements ot be serialized. */ - public static class DecodeUsingBeamCoder extends UnaryExpression implements NonSQLExpression { + public static class DecodeUsingBeamCoder extends UnaryExpression + implements NonSQLExpression, Serializable { private Expression child; private ClassTag classTag;
[beam] 03/37: Wrap Beam Coders into Spark Encoders using ExpressionEncoder: serialization part
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit a5c7da32d46d74ab4b79ebb34dcad4842f225c62 Author: Etienne Chauchot AuthorDate: Mon Aug 26 14:32:17 2019 +0200 Wrap Beam Coders into Spark Encoders using ExpressionEncoder: serialization part --- .../translation/helpers/EncoderHelpers.java| 245 + 1 file changed, 245 insertions(+) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java index d44fe27..b072803 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java @@ -17,11 +17,40 @@ */ package org.apache.beam.runners.spark.structuredstreaming.translation.helpers; +import static org.apache.spark.sql.types.DataTypes.BinaryType; + +import java.lang.reflect.Array; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import org.apache.beam.runners.spark.structuredstreaming.translation.SchemaHelpers; +import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; import org.apache.spark.sql.Encoder; import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.catalyst.analysis.GetColumnByOrdinal; +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder; +import org.apache.spark.sql.catalyst.expressions.BoundReference; +import org.apache.spark.sql.catalyst.expressions.Cast; +import org.apache.spark.sql.catalyst.expressions.Expression; +import org.apache.spark.sql.catalyst.expressions.NonSQLExpression; +import org.apache.spark.sql.catalyst.expressions.UnaryExpression; +import org.apache.spark.sql.catalyst.expressions.codegen.Block; +import org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator; +import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext; +import org.apache.spark.sql.catalyst.expressions.codegen.ExprCode; +import org.apache.spark.sql.catalyst.expressions.codegen.ExprValue; +import org.apache.spark.sql.catalyst.expressions.codegen.SimpleExprValue; +import org.apache.spark.sql.catalyst.expressions.codegen.VariableValue; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.ObjectType; +import scala.StringContext; import scala.Tuple2; +import scala.collection.JavaConversions; +import scala.collection.Seq; +import scala.reflect.ClassTag; +import scala.reflect.ClassTag$; /** {@link Encoders} utility class. */ public class EncoderHelpers { @@ -64,4 +93,220 @@ public class EncoderHelpers { - Bridges from Beam Coders to Spark Encoders */ + /** A way to construct encoders using generic serializers. */ + private Encoder fromBeamCoder(Coder coder, Class claz){ + +List serialiserList = new ArrayList<>(); +serialiserList.add(new EncodeUsingBeamCoder<>(claz, coder)); +ClassTag classTag = ClassTag$.MODULE$.apply(claz); +return new ExpressionEncoder<>( +SchemaHelpers.binarySchema(), +false, +JavaConversions.collectionAsScalaIterable(serialiserList).toSeq(), +new DecodeUsingBeamCoder<>(classTag, coder), classTag); + +/* +ExpressionEncoder[T]( +schema = new StructType().add("value", BinaryType), +flat = true, +serializer = Seq( +EncodeUsingSerializer( +BoundReference(0, ObjectType(classOf[AnyRef]), nullable = true), kryo = useKryo)), +deserializer = +DecodeUsingSerializer[T]( +Cast(GetColumnByOrdinal(0, BinaryType), BinaryType), +classTag[T], +kryo = useKryo), +clsTag = classTag[T] +) +*/ + } + + private static class EncodeUsingBeamCoder extends UnaryExpression implements NonSQLExpression { + +private Class claz; +private Coder beamCoder; +private Expression child; + +private EncodeUsingBeamCoder( Class claz, Coder beamCoder) { + this.claz = claz; + this.beamCoder = beamCoder; + this.child = new BoundReference(0, new ObjectType(claz), true); +} + +@Override public Expression child() { + return child; +} + +@Override public ExprCode doGenCode(CodegenContext ctx, ExprCode ev) { + // Code to serialize. + ExprCode input = child.genCode(ctx); + String javaType = CodeGenerator.javaType(dataType()); + String outputStream = "ByteArrayOutputStream baos = new ByteArrayOutputStream();"; + + String serialize = outputStream + "$beamCoder.encode(${input.value}, baos); baos.toByteArray();"; + + String outside = "final $javaType o
[beam] 01/37: Improve Pardo translation performance: avoid calling a filter transform when there is only one output tag
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit 22d6466cae94cf482f8151a5fe6e7dde68d28d58 Author: Etienne Chauchot AuthorDate: Thu Jul 18 10:58:35 2019 +0200 Improve Pardo translation performance: avoid calling a filter transform when there is only one output tag --- .../translation/batch/ParDoTranslatorBatch.java | 12 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java index 46808b7..742c1b0 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java @@ -133,10 +133,14 @@ class ParDoTranslatorBatch inputDataSet.mapPartitions(doFnWrapper, EncoderHelpers.tuple2Encoder()); if (outputs.entrySet().size() > 1) { allOutputs.persist(); -} - -for (Map.Entry, PValue> output : outputs.entrySet()) { - pruneOutputFilteredByTag(context, allOutputs, output); + for (Map.Entry, PValue> output : outputs.entrySet()) { +pruneOutputFilteredByTag(context, allOutputs, output); + } +} else { + Dataset> outputDataset = allOutputs.map( + (MapFunction, WindowedValue>, WindowedValue>) value -> value._2, + EncoderHelpers.windowedValueEncoder()); + context.putDatasetWildcard(outputs.entrySet().iterator().next().getValue(), outputDataset); } }
[beam] 11/37: Fix warning in coder construction by reflexion
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit fdba22d33205db9b039e82204e6e95f9c0e69d50 Author: Etienne Chauchot AuthorDate: Wed Sep 4 14:55:32 2019 +0200 Fix warning in coder construction by reflexion --- .../structuredstreaming/translation/helpers/EncoderHelpers.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java index 694bc24..1d89101 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java @@ -293,7 +293,7 @@ public class EncoderHelpers { @Override public Object nullSafeEval(Object input) { try { -Coder beamCoder = coderClass.newInstance(); +Coder beamCoder = coderClass.getDeclaredConstructor().newInstance(); return beamCoder.decode(new ByteArrayInputStream((byte[]) input)); } catch (Exception e) { throw new IllegalStateException("Error decoding bytes for coder: " + coderClass, e); @@ -373,13 +373,13 @@ case class DecodeUsingSerializer[T](child: Expression, tag: ClassTag[T], kryo: B ctx.addImmutableStateIfNotExists(coderClass.getName(), beamCoderInstance, func(v1 -> { /* CODE GENERATED -v = (coderClass) coderClass.newInstance(); +v = (coderClass) coderClass.getDeclaredConstructor().newInstance(); */ List parts = new ArrayList<>(); parts.add(""); parts.add(" = ("); parts.add(") "); -parts.add(".newInstance();"); +parts.add(".getDeclaredConstructor().newInstance();"); StringContext sc = new StringContext(JavaConversions.collectionAsScalaIterable(parts).toSeq()); List args = new ArrayList<>(); args.add(v1);
[beam] 21/37: Wrap exceptions in UserCoderExceptions
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit 72c267cc91f75a446a949825a216d4101bbca37d Author: Etienne Chauchot AuthorDate: Thu Sep 5 15:53:10 2019 +0200 Wrap exceptions in UserCoderExceptions --- .../translation/helpers/EncoderHelpers.java | 12 ++-- 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java index f990121..f4ea6fa 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java @@ -147,8 +147,8 @@ public class EncoderHelpers { $beamCoder.encode(${input.value}, baos); ${ev.value} = baos.toByteArray(); } -} catch (Exception e) { - throw new RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e)); +} catch (java.io.IOException e) { + throw org.apache.beam.sdk.util.UserCodeException.wrap(e); } */ List parts = new ArrayList<>(); @@ -160,7 +160,7 @@ public class EncoderHelpers { parts.add(".encode("); parts.add(", baos); "); parts.add( - " = baos.toByteArray();}} catch (Exception e) {throw new RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e));}"); + " = baos.toByteArray();}} catch (java.io.IOException e) {throw org.apache.beam.sdk.util.UserCodeException.wrap(e);}"); StringContext sc = new StringContext(JavaConversions.collectionAsScalaIterable(parts).toSeq()); @@ -262,8 +262,8 @@ public class EncoderHelpers { ${input.isNull} ? ${CodeGenerator.defaultValue(dataType)} : ($javaType) $beamCoder.decode(new java.io.ByteArrayInputStream(${input.value})); - } catch (Exception e) { -throw new RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e)); + } catch (java.io.IOException e) { +throw org.apache.beam.sdk.util.UserCodeException.wrap(e); } */ @@ -277,7 +277,7 @@ public class EncoderHelpers { parts.add(") "); parts.add(".decode(new java.io.ByteArrayInputStream("); parts.add( - ")); } catch (Exception e) {throw new RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e));}"); + ")); } catch (java.io.IOException e) {throw org.apache.beam.sdk.util.UserCodeException.wrap(e);}"); StringContext sc = new StringContext(JavaConversions.collectionAsScalaIterable(parts).toSeq());
[beam] 36/37: Fix: Remove generic hack of using object. Use actual Coder encodedType in Encoders
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit 824b3445e99a0fc084b612b790c7d458689a4fd4 Author: Etienne Chauchot AuthorDate: Wed Oct 23 11:52:14 2019 +0200 Fix: Remove generic hack of using object. Use actual Coder encodedType in Encoders --- .../spark/structuredstreaming/translation/helpers/EncoderHelpers.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java index 2f3bced..c07c9dd 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java @@ -101,7 +101,8 @@ public class EncoderHelpers { public static Encoder fromBeamCoder(Coder beamCoder) { List serialiserList = new ArrayList<>(); -Class claz = (Class) Object.class; +Class claz = beamCoder.getEncodedTypeDescriptor().getRawType(); + serialiserList.add( new EncodeUsingBeamCoder<>(new BoundReference(0, new ObjectType(claz), true), beamCoder)); ClassTag classTag = ClassTag$.MODULE$.apply(claz);
[beam] 05/37: type erasure: spark encoders require a Class, pass Object and cast to Class
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit c9e3534029811aabc00d09471ec78f943ba34028 Author: Etienne Chauchot AuthorDate: Thu Aug 29 10:57:53 2019 +0200 type erasure: spark encoders require a Class, pass Object and cast to Class --- .../spark/structuredstreaming/translation/helpers/EncoderHelpers.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java index ab24e37..9cb8f29 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java @@ -96,9 +96,10 @@ public class EncoderHelpers { */ /** A way to construct encoders using generic serializers. */ - public static Encoder fromBeamCoder(Coder coder, Class claz){ + public static Encoder fromBeamCoder(Coder coder/*, Class claz*/){ List serialiserList = new ArrayList<>(); +Class claz = (Class) Object.class; serialiserList.add(new EncodeUsingBeamCoder<>(claz, coder)); ClassTag classTag = ClassTag$.MODULE$.apply(claz); return new ExpressionEncoder<>(
[beam] 10/37: Lazy init coder because coder instance cannot be interpolated by catalyst
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit e6b68a8f21aba2adcb7543eae806d71e08c0bff3 Author: Etienne Chauchot AuthorDate: Mon Sep 2 17:55:24 2019 +0200 Lazy init coder because coder instance cannot be interpolated by catalyst --- runners/spark/build.gradle | 1 + .../translation/helpers/EncoderHelpers.java| 63 +++--- .../structuredstreaming/utils/EncodersTest.java| 3 +- 3 files changed, 47 insertions(+), 20 deletions(-) diff --git a/runners/spark/build.gradle b/runners/spark/build.gradle index 73a710b..a948ef1 100644 --- a/runners/spark/build.gradle +++ b/runners/spark/build.gradle @@ -77,6 +77,7 @@ dependencies { provided "com.esotericsoftware.kryo:kryo:2.21" runtimeOnly library.java.jackson_module_scala runtimeOnly "org.scala-lang:scala-library:2.11.8" + compile "org.scala-lang.modules:scala-java8-compat_2.11:0.9.0" testCompile project(":sdks:java:io:kafka") testCompile project(path: ":sdks:java:core", configuration: "shadowTest") // SparkStateInternalsTest extends abstract StateInternalsTest diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java index cc862cd..694bc24 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java @@ -18,9 +18,9 @@ package org.apache.beam.runners.spark.structuredstreaming.translation.helpers; import static org.apache.spark.sql.types.DataTypes.BinaryType; +import static scala.compat.java8.JFunction.func; import java.io.ByteArrayInputStream; -import java.io.IOException; import java.lang.reflect.Array; import java.util.ArrayList; import java.util.List; @@ -45,7 +45,6 @@ import org.apache.spark.sql.catalyst.expressions.codegen.ExprCode; import org.apache.spark.sql.catalyst.expressions.codegen.VariableValue; import org.apache.spark.sql.types.DataType; import org.apache.spark.sql.types.ObjectType; -import scala.Function1; import scala.StringContext; import scala.Tuple2; import scala.collection.JavaConversions; @@ -94,17 +93,17 @@ public class EncoderHelpers { */ /** A way to construct encoders using generic serializers. */ - public static Encoder fromBeamCoder(Coder coder/*, Class claz*/){ + public static Encoder fromBeamCoder(Class> coderClass/*, Class claz*/){ List serialiserList = new ArrayList<>(); Class claz = (Class) Object.class; -serialiserList.add(new EncodeUsingBeamCoder<>(new BoundReference(0, new ObjectType(claz), true), coder)); +serialiserList.add(new EncodeUsingBeamCoder<>(new BoundReference(0, new ObjectType(claz), true), (Class>)coderClass)); ClassTag classTag = ClassTag$.MODULE$.apply(claz); return new ExpressionEncoder<>( SchemaHelpers.binarySchema(), false, JavaConversions.collectionAsScalaIterable(serialiserList).toSeq(), -new DecodeUsingBeamCoder<>(new Cast(new GetColumnByOrdinal(0, BinaryType), BinaryType), classTag, coder), +new DecodeUsingBeamCoder<>(new Cast(new GetColumnByOrdinal(0, BinaryType), BinaryType), classTag, (Class>)coderClass), classTag); /* @@ -127,11 +126,11 @@ public class EncoderHelpers { public static class EncodeUsingBeamCoder extends UnaryExpression implements NonSQLExpression { private Expression child; -private Coder beamCoder; +private Class> coderClass; -public EncodeUsingBeamCoder(Expression child, Coder beamCoder) { +public EncodeUsingBeamCoder(Expression child, Class> coderClass) { this.child = child; - this.beamCoder = beamCoder; + this.coderClass = coderClass; } @Override public Expression child() { @@ -140,6 +139,7 @@ public class EncoderHelpers { @Override public ExprCode doGenCode(CodegenContext ctx, ExprCode ev) { // Code to serialize. + String beamCoder = lazyInitBeamCoder(ctx, coderClass); ExprCode input = child.genCode(ctx); /* @@ -170,6 +170,7 @@ public class EncoderHelpers { new VariableValue("output", Array.class)); } + @Override public DataType dataType() { return BinaryType; } @@ -179,7 +180,7 @@ public class EncoderHelpers { case 0: return child; case 1: - return beamCoder; + return coderClass; default: throw new ArrayIndexOutOfBoundsException("productElement out of bounds"); } @@ -201,11 +202,11 @@ public class EncoderHelpers { return false; } E
[beam] 27/37: Ignore long time failing test: SparkMetricsSinkTest
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit c48d03213e5848aec8217d9b136ecc153d1d1d3c Author: Etienne Chauchot AuthorDate: Fri Sep 27 10:41:55 2019 +0200 Ignore long time failing test: SparkMetricsSinkTest --- .../aggregators/metrics/sink/SparkMetricsSinkTest.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/aggregators/metrics/sink/SparkMetricsSinkTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/aggregators/metrics/sink/SparkMetricsSinkTest.java index dd23c05..9d56f0c 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/aggregators/metrics/sink/SparkMetricsSinkTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/aggregators/metrics/sink/SparkMetricsSinkTest.java @@ -30,6 +30,7 @@ import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet; +import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExternalResource; @@ -40,6 +41,7 @@ import org.junit.rules.ExternalResource; * A test that verifies Beam metrics are reported to Spark's metrics sink in both batch and * streaming modes. */ +@Ignore ("Has been failing since at least c350188ef7a8704c7336f3c20a1ab2144abbcd4a") public class SparkMetricsSinkTest { @Rule public ExternalResource inMemoryMetricsSink = new InMemoryMetricsSinkRule(); @Rule public final TestPipeline pipeline = TestPipeline.create();
[beam] 25/37: Apply new Encoders to Read source
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit 3cc256e5f81616d8b4126cef6ae8d049fb03460f Author: Etienne Chauchot AuthorDate: Fri Sep 6 17:49:10 2019 +0200 Apply new Encoders to Read source --- .../translation/batch/ReadSourceTranslatorBatch.java | 8 ++-- .../spark/structuredstreaming/translation/helpers/RowHelpers.java | 4 +--- .../translation/streaming/ReadSourceTranslatorStreaming.java | 7 +-- 3 files changed, 12 insertions(+), 7 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java index 6ae6646..2dcf66f 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java @@ -27,6 +27,7 @@ import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.Row import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; @@ -69,10 +70,13 @@ class ReadSourceTranslatorBatch .load(); // extract windowedValue from Row +WindowedValue.FullWindowedValueCoder windowedValueCoder = WindowedValue.FullWindowedValueCoder +.of(source.getOutputCoder(), GlobalWindow.Coder.INSTANCE); + Dataset> dataset = rowDataset.map( - RowHelpers.extractWindowedValueFromRowMapFunction(source.getOutputCoder()), -EncoderHelpers.windowedValueEncoder()); + RowHelpers.extractWindowedValueFromRowMapFunction(windowedValueCoder), +EncoderHelpers.fromBeamCoder(windowedValueCoder)); PCollection output = (PCollection) context.getOutput(); context.putDataset(output, dataset); diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/RowHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/RowHelpers.java index 6ee0e07..ac74c29 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/RowHelpers.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/RowHelpers.java @@ -43,13 +43,11 @@ public final class RowHelpers { * @return A {@link MapFunction} that accepts a {@link Row} and returns its {@link WindowedValue}. */ public static MapFunction> extractWindowedValueFromRowMapFunction( - Coder coder) { + WindowedValue.WindowedValueCoder windowedValueCoder) { return (MapFunction>) value -> { // there is only one value put in each Row by the InputPartitionReader byte[] bytes = (byte[]) value.get(0); - WindowedValue.FullWindowedValueCoder windowedValueCoder = - WindowedValue.FullWindowedValueCoder.of(coder, GlobalWindow.Coder.INSTANCE); return windowedValueCoder.decode(new ByteArrayInputStream(bytes)); }; } diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java index c3d07ff..9e03d96 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java @@ -27,6 +27,7 @@ import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.Row import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; @@ -70,10 +71,12 @@ class ReadSourceTranslatorStreaming .load(); // extract windowedValue from Row +WindowedValue.FullWindowedValueCoder windowedValueCoder = WindowedValue.FullWindowedValueCoder +.of(source.getOutputCoder(), GlobalWind
[beam] 02/37: Use "sparkMaster" in local mode to obtain number of shuffle partitions + spotless apply
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit 20d5bbd4e8d2d7d7b4dc9639d716b1e3403f91eb Author: Alexey Romanenko AuthorDate: Fri Jul 19 15:48:32 2019 +0200 Use "sparkMaster" in local mode to obtain number of shuffle partitions + spotless apply --- .../translation/TranslationContext.java | 15 +++ .../translation/batch/ParDoTranslatorBatch.java | 8 +--- 2 files changed, 20 insertions(+), 3 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java index f1bafd33..75f3ddf 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java @@ -78,6 +78,21 @@ public class TranslationContext { sparkConf.setJars(options.getFilesToStage().toArray(new String[0])); } +// By default, Spark defines 200 as a number of sql partitions. This seems too much for local +// mode, so try to align with value of "sparkMaster" option in this case. +// We should not overwrite this value (or any user-defined spark configuration value) if the +// user has already configured it. +String sparkMaster = options.getSparkMaster(); +if (sparkMaster != null +&& sparkMaster.startsWith("local[") +&& System.getProperty("spark.sql.shuffle.partitions") == null) { + int numPartitions = + Integer.parseInt(sparkMaster.substring("local[".length(), sparkMaster.length() - 1)); + if (numPartitions > 0) { +sparkConf.set("spark.sql.shuffle.partitions", String.valueOf(numPartitions)); + } +} + this.sparkSession = SparkSession.builder().config(sparkConf).getOrCreate(); this.serializablePipelineOptions = new SerializablePipelineOptions(options); this.datasets = new HashMap<>(); diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java index 742c1b0..255adc8 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java @@ -137,9 +137,11 @@ class ParDoTranslatorBatch pruneOutputFilteredByTag(context, allOutputs, output); } } else { - Dataset> outputDataset = allOutputs.map( - (MapFunction, WindowedValue>, WindowedValue>) value -> value._2, - EncoderHelpers.windowedValueEncoder()); + Dataset> outputDataset = + allOutputs.map( + (MapFunction, WindowedValue>, WindowedValue>) + value -> value._2, + EncoderHelpers.windowedValueEncoder()); context.putDatasetWildcard(outputs.entrySet().iterator().next().getValue(), outputDataset); } }
[beam] 09/37: Fix code generation in Beam coder wrapper
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit d5645ff60aa99608a9ee3b8a5be6c58f9ac3903b Author: Etienne Chauchot AuthorDate: Mon Sep 2 15:45:24 2019 +0200 Fix code generation in Beam coder wrapper --- .../translation/helpers/EncoderHelpers.java| 93 -- .../structuredstreaming/utils/EncodersTest.java| 4 +- 2 files changed, 55 insertions(+), 42 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java index 0765c78..cc862cd 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java @@ -42,15 +42,13 @@ import org.apache.spark.sql.catalyst.expressions.codegen.Block; import org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator; import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext; import org.apache.spark.sql.catalyst.expressions.codegen.ExprCode; -import org.apache.spark.sql.catalyst.expressions.codegen.ExprValue; -import org.apache.spark.sql.catalyst.expressions.codegen.SimpleExprValue; import org.apache.spark.sql.catalyst.expressions.codegen.VariableValue; import org.apache.spark.sql.types.DataType; import org.apache.spark.sql.types.ObjectType; +import scala.Function1; import scala.StringContext; import scala.Tuple2; import scala.collection.JavaConversions; -import scala.collection.Seq; import scala.reflect.ClassTag; import scala.reflect.ClassTag$; @@ -143,29 +141,33 @@ public class EncoderHelpers { @Override public ExprCode doGenCode(CodegenContext ctx, ExprCode ev) { // Code to serialize. ExprCode input = child.genCode(ctx); - String javaType = CodeGenerator.javaType(dataType()); - String outputStream = "ByteArrayOutputStream baos = new ByteArrayOutputStream();"; - - String serialize = outputStream + "$beamCoder.encode(${input.value}, baos); baos.toByteArray();"; - - String outside = "final $javaType output = ${input.isNull} ? ${CodeGenerator.defaultValue(dataType)} : $serialize;"; - List instructions = new ArrayList<>(); - instructions.add(outside); - Seq parts = JavaConversions.collectionAsScalaIterable(instructions).toSeq(); + /* +CODE GENERATED + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + final bytes[] output; + if ({input.isNull}) + output = null; + else + output = $beamCoder.encode(${input.value}, baos); baos.toByteArray(); + */ + List parts = new ArrayList<>(); + parts.add("ByteArrayOutputStream baos = new ByteArrayOutputStream(); final bytes[] output; if ("); + parts.add(") output = null; else output ="); + parts.add(".encode("); + parts.add(", baos); baos.toByteArray();"); + + StringContext sc = new StringContext(JavaConversions.collectionAsScalaIterable(parts).toSeq()); - StringContext stringContext = new StringContext(parts); - Block.BlockHelper blockHelper = new Block.BlockHelper(stringContext); List args = new ArrayList<>(); - args.add(new VariableValue("beamCoder", Coder.class)); - args.add(new SimpleExprValue("input.value", ExprValue.class)); - args.add(new VariableValue("javaType", String.class)); - args.add(new SimpleExprValue("input.isNull", Boolean.class)); - args.add(new SimpleExprValue("CodeGenerator.defaultValue(dataType)", String.class)); - args.add(new VariableValue("serialize", String.class)); - Block code = blockHelper.code(JavaConversions.collectionAsScalaIterable(args).toSeq()); - - return ev.copy(input.code().$plus(code), input.isNull(), new VariableValue("output", Array.class)); + args.add(input.isNull()); + args.add(beamCoder); + args.add(input.value()); + Block code = (new Block.BlockHelper(sc)) + .code(JavaConversions.collectionAsScalaIterable(args).toSeq()); + + return ev.copy(input.code().$plus(code), input.isNull(), + new VariableValue("output", Array.class)); } @Override public DataType dataType() { @@ -252,27 +254,38 @@ public class EncoderHelpers { ExprCode input = child.genCode(ctx); String javaType = CodeGenerator.javaType(dataType()); - String inputStream = "ByteArrayInputStream bais = new ByteArrayInputStream(${input.value});"; - String deserialize = inputStream + "($javaType) $beamCoder.decode(bais);"; +/* + CODE GENERATED: + final $javaType output = + ${input.isNull} ? + ${CodeGenerator.defaultValue(d
[beam] 14/37: Fix beam coder lazy init using reflexion: use .clas + try catch + cast
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit 0cf2c8759a64c81c1d4f83f74a759ae3dafd1f83 Author: Etienne Chauchot AuthorDate: Thu Sep 5 10:07:32 2019 +0200 Fix beam coder lazy init using reflexion: use .clas + try catch + cast --- .../translation/helpers/EncoderHelpers.java | 19 --- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java index a452da0..0751c4c 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.Objects; import org.apache.beam.runners.spark.structuredstreaming.translation.SchemaHelpers; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; import org.apache.spark.sql.Encoder; @@ -388,18 +389,22 @@ case class DecodeUsingSerializer[T](child: Expression, tag: ClassTag[T], kryo: B ctx.addImmutableStateIfNotExists(coderClass.getName(), beamCoderInstance, func(v1 -> { /* CODE GENERATED -v = (coderClass) coderClass.getDeclaredConstructor().newInstance(); +try { +v1 = coderClass.class.getDeclaredConstructor().newInstance(); +} catch (Exception e) { + throw new RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e)); +} */ -List parts = new ArrayList<>(); -parts.add(""); + List parts = new ArrayList<>(); +parts.add("try {"); parts.add(" = ("); -parts.add(") "); -parts.add(".getDeclaredConstructor().newInstance();"); + parts.add(") "); + parts.add(".class.getDeclaredConstructor().newInstance();} catch (Exception e) {throw new RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e));}"); StringContext sc = new StringContext(JavaConversions.collectionAsScalaIterable(parts).toSeq()); List args = new ArrayList<>(); args.add(v1); -args.add(coderClass.getName()); -args.add(coderClass.getName()); + args.add(coderClass.getName()); + args.add(coderClass.getName()); return sc.s(JavaConversions.collectionAsScalaIterable(args).toSeq()); })); return beamCoderInstance;
[beam] 30/37: Create a Tuple2Coder to encode scala tuple2
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit 21accab89a4333b32003121269ab31b436e0dd2c Author: Etienne Chauchot AuthorDate: Mon Sep 30 11:25:04 2019 +0200 Create a Tuple2Coder to encode scala tuple2 --- .../translation/helpers/Tuple2Coder.java | 62 ++ 1 file changed, 62 insertions(+) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/Tuple2Coder.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/Tuple2Coder.java new file mode 100644 index 000..1743a01 --- /dev/null +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/Tuple2Coder.java @@ -0,0 +1,62 @@ +package org.apache.beam.runners.spark.structuredstreaming.translation.helpers; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Arrays; +import java.util.List; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.StructuredCoder; +import scala.Tuple2; + +/** + * Beam coder to encode/decode Tuple2 scala types. + * @param first field type parameter + * @param second field type parameter + */ +public class Tuple2Coder extends StructuredCoder> { + private final Coder firstFieldCoder; + private final Coder secondFieldCoder; + + public static Tuple2Coder of(Coder firstFieldCoder, Coder secondFieldCoder) { +return new Tuple2Coder<>(firstFieldCoder, secondFieldCoder); + } + + private Tuple2Coder(Coder firstFieldCoder, Coder secondFieldCoder) { +this.firstFieldCoder = firstFieldCoder; +this.secondFieldCoder = secondFieldCoder; + } + + + @Override public void encode(Tuple2 value, OutputStream outStream) + throws IOException { +firstFieldCoder.encode(value._1(), outStream); +secondFieldCoder.encode(value._2(), outStream); + } + + @Override public Tuple2 decode(InputStream inStream) throws IOException { +T1 firstField = firstFieldCoder.decode(inStream); +T2 secondField = secondFieldCoder.decode(inStream); +return Tuple2.apply(firstField, secondField); + } + + @Override public List> getCoderArguments() { +return Arrays.asList(firstFieldCoder, secondFieldCoder); + } + + @Override + public void verifyDeterministic() throws NonDeterministicException { +verifyDeterministic(this, "First field coder must be deterministic", firstFieldCoder); +verifyDeterministic(this, "Second field coder must be deterministic", secondFieldCoder); + } + + /** Returns the coder for first field. */ + public Coder getFirstFieldCoder() { +return firstFieldCoder; + } + + /** Returns the coder for second field. */ + public Coder getSecondFieldCoder() { +return secondFieldCoder; + } +}
[beam] 16/37: Remove example code
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit ca01777b5bd593c7caa5a6be6136abe662b8a4e5 Author: Etienne Chauchot AuthorDate: Thu Sep 5 14:33:23 2019 +0200 Remove example code --- .../translation/helpers/EncoderHelpers.java| 69 -- 1 file changed, 69 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java index 3f7c102..83243b3 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java @@ -102,22 +102,6 @@ public class EncoderHelpers { JavaConversions.collectionAsScalaIterable(serialiserList).toSeq(), new DecodeUsingBeamCoder<>(new Cast(new GetColumnByOrdinal(0, BinaryType), BinaryType), classTag, beamCoder), classTag); - -/* -ExpressionEncoder[T]( -schema = new StructType().add("value", BinaryType), -flat = true, -serializer = Seq( -EncodeUsingSerializer( -BoundReference(0, ObjectType(classOf[AnyRef]), nullable = true), kryo = useKryo)), -deserializer = -DecodeUsingSerializer[T]( -Cast(GetColumnByOrdinal(0, BinaryType), BinaryType), -classTag[T], -kryo = useKryo), -clsTag = classTag[T] -) -*/ } public static class EncodeUsingBeamCoder extends UnaryExpression implements NonSQLExpression { @@ -219,30 +203,6 @@ public class EncoderHelpers { } } - /*case class EncodeUsingSerializer(child: Expression, kryo: Boolean) - extends UnaryExpression with NonSQLExpression with SerializerSupport { - -override def nullSafeEval(input: Any): Any = { -serializerInstance.serialize(input).array() -} - -override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { -val serializer = addImmutableScodererializerIfNeeded(ctx) -// Code to serialize. -val input = child.genCode(ctx) -val javaType = CodeGenerator.javaType(dataType) -val serialize = s"$serializer.serialize(${input.value}, null).array()" - -val code = input.code + code""" -final $javaType ${ev.value} = -${input.isNull} ? ${CodeGenerator.defaultValue(dataType)} : $serialize; -""" -ev.copy(code = code, isNull = input.isNull) - } - -override def dataType: DataType = BinaryType - }*/ - public static class DecodeUsingBeamCoder extends UnaryExpression implements NonSQLExpression{ private Expression child; @@ -353,33 +313,4 @@ public class EncoderHelpers { return Objects.hash(super.hashCode(), classTag, beamCoder); } } -/* -case class DecodeUsingSerializer[T](child: Expression, tag: ClassTag[T], kryo: Boolean) - extends UnaryExpression with NonSQLExpression with SerializerSupport { - -override def nullSafeEval(input: Any): Any = { -val inputBytes = java.nio.ByteBuffer.wrap(input.asInstanceOf[Array[Byte]]) -serializerInstance.deserialize(inputBytes) -} - -override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { -val serializer = addImmutableSerializerIfNeeded(ctx) -// Code to deserialize. -val input = child.genCode(ctx) -val javaType = CodeGenerator.javaType(dataType) -val deserialize = -s"($javaType) $serializer.deserialize(java.nio.ByteBuffer.wrap(${input.value}), null)" - -val code = input.code + code""" -final $javaType ${ev.value} = -${input.isNull} ? ${CodeGenerator.defaultValue(dataType)} : $deserialize; -""" -ev.copy(code = code, isNull = input.isNull) - } - -override def dataType: DataType = ObjectType(tag.runtimeClass) - } -*/ - - }
[beam] 28/37: Apply new Encoders to Window assign translation
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit 7f1060aa189a625400a1fbcfc2503d3e721ade8f Author: Etienne Chauchot AuthorDate: Fri Sep 27 11:22:15 2019 +0200 Apply new Encoders to Window assign translation --- .../translation/batch/WindowAssignTranslatorBatch.java| 8 ++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java index fb37f97..576b914 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java @@ -23,6 +23,7 @@ import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.Enc import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.WindowingHelpers; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PCollection; import org.apache.spark.sql.Dataset; @@ -44,10 +45,13 @@ class WindowAssignTranslatorBatch if (WindowingHelpers.skipAssignWindows(assignTransform, context)) { context.putDataset(output, inputDataset); } else { + WindowFn windowFn = assignTransform.getWindowFn(); + WindowedValue.FullWindowedValueCoder windoweVdalueCoder = WindowedValue.FullWindowedValueCoder + .of(input.getCoder(), windowFn.windowCoder()); Dataset> outputDataset = inputDataset.map( - WindowingHelpers.assignWindowsMapFunction(assignTransform.getWindowFn()), - EncoderHelpers.windowedValueEncoder()); + WindowingHelpers.assignWindowsMapFunction(windowFn), + EncoderHelpers.fromBeamCoder(windoweVdalueCoder)); context.putDataset(output, outputDataset); } }
[beam] 15/37: Remove lazy init of beam coder because there is no generic way on instanciating a beam coder
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit 50060a804d95ed1006db98d1fd2c4243ba1fc532 Author: Etienne Chauchot AuthorDate: Thu Sep 5 14:20:30 2019 +0200 Remove lazy init of beam coder because there is no generic way on instanciating a beam coder --- .../translation/helpers/EncoderHelpers.java| 68 +++--- .../structuredstreaming/utils/EncodersTest.java| 2 +- 2 files changed, 21 insertions(+), 49 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java index 0751c4c..3f7c102 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java @@ -18,7 +18,6 @@ package org.apache.beam.runners.spark.structuredstreaming.translation.helpers; import static org.apache.spark.sql.types.DataTypes.BinaryType; -import static scala.compat.java8.JFunction.func; import java.io.ByteArrayInputStream; import java.util.ArrayList; @@ -26,7 +25,6 @@ import java.util.List; import java.util.Objects; import org.apache.beam.runners.spark.structuredstreaming.translation.SchemaHelpers; import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; import org.apache.spark.sql.Encoder; @@ -92,17 +90,17 @@ public class EncoderHelpers { */ /** A way to construct encoders using generic serializers. */ - public static Encoder fromBeamCoder(Class> coderClass/*, Class claz*/){ + public static Encoder fromBeamCoder(Coder beamCoder/*, Class claz*/){ List serialiserList = new ArrayList<>(); Class claz = (Class) Object.class; -serialiserList.add(new EncodeUsingBeamCoder<>(new BoundReference(0, new ObjectType(claz), true), (Class>)coderClass)); +serialiserList.add(new EncodeUsingBeamCoder<>(new BoundReference(0, new ObjectType(claz), true), beamCoder)); ClassTag classTag = ClassTag$.MODULE$.apply(claz); return new ExpressionEncoder<>( SchemaHelpers.binarySchema(), false, JavaConversions.collectionAsScalaIterable(serialiserList).toSeq(), -new DecodeUsingBeamCoder<>(new Cast(new GetColumnByOrdinal(0, BinaryType), BinaryType), classTag, (Class>)coderClass), +new DecodeUsingBeamCoder<>(new Cast(new GetColumnByOrdinal(0, BinaryType), BinaryType), classTag, beamCoder), classTag); /* @@ -125,11 +123,11 @@ public class EncoderHelpers { public static class EncodeUsingBeamCoder extends UnaryExpression implements NonSQLExpression { private Expression child; -private Class> coderClass; +private Coder beamCoder; -public EncodeUsingBeamCoder(Expression child, Class> coderClass) { +public EncodeUsingBeamCoder(Expression child, Coder beamCoder) { this.child = child; - this.coderClass = coderClass; + this.beamCoder = beamCoder; } @Override public Expression child() { @@ -138,7 +136,7 @@ public class EncoderHelpers { @Override public ExprCode doGenCode(CodegenContext ctx, ExprCode ev) { // Code to serialize. - String beamCoder = lazyInitBeamCoder(ctx, coderClass); + String accessCode = ctx.addReferenceObj("beamCoder", beamCoder, beamCoder.getClass().getName()); ExprCode input = child.genCode(ctx); /* @@ -172,7 +170,7 @@ public class EncoderHelpers { args.add(ev.value()); args.add(input.isNull()); args.add(ev.value()); - args.add(beamCoder); + args.add(accessCode); args.add(input.value()); args.add(ev.value()); Block code = (new Block.BlockHelper(sc)) @@ -191,7 +189,7 @@ public class EncoderHelpers { case 0: return child; case 1: - return coderClass; + return beamCoder; default: throw new ArrayIndexOutOfBoundsException("productElement out of bounds"); } @@ -213,11 +211,11 @@ public class EncoderHelpers { return false; } EncodeUsingBeamCoder that = (EncodeUsingBeamCoder) o; - return coderClass.equals(that.coderClass); + return beamCoder.equals(that.beamCoder); } @Override public int hashCode() { - return Objects.hash(super.hashCode(), coderClass); + return Objects.hash(super.hashCode(), beamCoder); } } @@ -249,12 +247,12 @@ public class EncoderHelpers { private Expression child; private ClassTag classTag; -private Class> coderClass; +private Coder be
[beam] 19/37: Add an assert of equality in the encoders test
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit 78b2d2243f0732dd802d9e6f855607d2c2f06e59 Author: Etienne Chauchot AuthorDate: Thu Sep 5 15:28:05 2019 +0200 Add an assert of equality in the encoders test --- .../runners/spark/structuredstreaming/utils/EncodersTest.java | 10 +++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/utils/EncodersTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/utils/EncodersTest.java index b3a6273..c6b8631 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/utils/EncodersTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/utils/EncodersTest.java @@ -1,10 +1,12 @@ package org.apache.beam.runners.spark.structuredstreaming.utils; +import static org.junit.Assert.assertEquals; + import java.util.ArrayList; import java.util.List; import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers; -import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.spark.sql.Dataset; import org.apache.spark.sql.SparkSession; import org.junit.Test; import org.junit.runner.RunWith; @@ -24,7 +26,9 @@ public class EncodersTest { data.add(1); data.add(2); data.add(3); -sparkSession.createDataset(data, EncoderHelpers.fromBeamCoder(VarIntCoder.of())); -//sparkSession.createDataset(data, EncoderHelpers.genericEncoder()); +Dataset dataset = sparkSession +.createDataset(data, EncoderHelpers.fromBeamCoder(VarIntCoder.of())); +List results = dataset.collectAsList(); +assertEquals(data, results); } }
[beam] 12/37: Fix ExpressionEncoder generated code: typos, try catch, fqcn
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit 8b07ec8ad0a22732aa6096c24135d942c3928787 Author: Etienne Chauchot AuthorDate: Wed Sep 4 15:38:41 2019 +0200 Fix ExpressionEncoder generated code: typos, try catch, fqcn --- .../translation/helpers/EncoderHelpers.java| 38 +- 1 file changed, 23 insertions(+), 15 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java index 1d89101..dff308a 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java @@ -144,18 +144,22 @@ public class EncoderHelpers { /* CODE GENERATED - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - final bytes[] output; - if ({input.isNull}) - output = null; - else - output = $beamCoder.encode(${input.value}, baos); baos.toByteArray(); + try { +java.io.ByteArrayOutputStream baos = new java.io.ByteArrayOutputStream(); +final byte[] output; +if ({input.isNull}) +output = null; +else +output = $beamCoder.encode(${input.value}, baos); baos.toByteArray(); +} catch (Exception e) { + throw new RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e)); +} */ List parts = new ArrayList<>(); - parts.add("ByteArrayOutputStream baos = new ByteArrayOutputStream(); final bytes[] output; if ("); + parts.add("try {java.io.ByteArrayOutputStream baos = new java.io.ByteArrayOutputStream(); final byte[] output; if ("); parts.add(") output = null; else output ="); parts.add(".encode("); - parts.add(", baos); baos.toByteArray();"); + parts.add(", baos); baos.toByteArray();} catch (Exception e) {throw new RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e));}"); StringContext sc = new StringContext(JavaConversions.collectionAsScalaIterable(parts).toSeq()); @@ -258,21 +262,25 @@ public class EncoderHelpers { /* CODE GENERATED: - final $javaType output = - ${input.isNull} ? - ${CodeGenerator.defaultValue(dataType)} : - ($javaType) $beamCoder.decode(new ByteArrayInputStream(${input.value})); + try { + final $javaType output = + ${input.isNull} ? + ${CodeGenerator.defaultValue(dataType)} : + ($javaType) $beamCoder.decode(new java.io.ByteArrayInputStream(${input.value})); + } catch (IOException e) { + throw new RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e)); + } */ List parts = new ArrayList<>(); - parts.add("final "); + parts.add("try { final "); parts.add(" output ="); parts.add("?"); parts.add(":"); parts.add("("); parts.add(") "); - parts.add(".decode(new ByteArrayInputStream("); - parts.add("));"); + parts.add(".decode(new java.io.ByteArrayInputStream("); + parts.add(")); } catch (IOException e) {throw new RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e));}"); StringContext sc = new StringContext(JavaConversions.collectionAsScalaIterable(parts).toSeq());
[beam] 37/37: Remove Encoders based on kryo now that we call Beam coders in the runner
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit 620a27a06b61fce5b3f5f15a62e05ffe3153b2ab Author: Etienne Chauchot AuthorDate: Wed Oct 23 14:11:14 2019 +0200 Remove Encoders based on kryo now that we call Beam coders in the runner --- .../translation/helpers/EncoderHelpers.java| 41 +- 1 file changed, 1 insertion(+), 40 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java index c07c9dd..704b6fe 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java @@ -51,46 +51,7 @@ import scala.reflect.ClassTag$; /** {@link Encoders} utility class. */ public class EncoderHelpers { - // 1. use actual class and not object to avoid Spark fallback to GenericRowWithSchema. - // 2. use raw class because only raw classes can be used with kryo. Cast to Class to allow - // the type inference mechanism to infer for ex Encoder> to get back the type - // checking - - /* - - Encoders for internal spark runner objects - */ - - /** - * Get a bytes {@link Encoder} for {@link WindowedValue}. Bytes serialisation is issued by Kryo - */ - @SuppressWarnings("unchecked") - public static Encoder windowedValueEncoder() { -return Encoders.kryo((Class) WindowedValue.class); - } - - /** Get a bytes {@link Encoder} for {@link KV}. Bytes serialisation is issued by Kryo */ - @SuppressWarnings("unchecked") - public static Encoder kvEncoder() { -return Encoders.kryo((Class) KV.class); - } - - /** Get a bytes {@link Encoder} for {@code T}. Bytes serialisation is issued by Kryo */ - @SuppressWarnings("unchecked") - public static Encoder genericEncoder() { -return Encoders.kryo((Class) Object.class); - } - - /* - */ - /** Get a bytes {@link Encoder} for {@link Tuple2}. Bytes serialisation is issued by Kryo */ - /* - -public static Encoder> tuple2Encoder() { - return Encoders.tuple(EncoderHelpers.genericEncoder(), EncoderHelpers.genericEncoder()); -} - */ - - /* +/* - Bridges from Beam Coders to Spark Encoders */
[beam] 24/37: Apply new Encoders to CombinePerKey
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit 7d456b42c1bafef6eab281dc2ed2dd098f8bda6a Author: Etienne Chauchot AuthorDate: Fri Sep 6 13:24:18 2019 +0200 Apply new Encoders to CombinePerKey --- .../translation/batch/CombinePerKeyTranslatorBatch.java | 13 +++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java index e0e80dd..33b037a 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java @@ -23,6 +23,8 @@ import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTr import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext; import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers; import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.KVHelpers; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -56,8 +58,11 @@ class CombinePerKeyTranslatorBatch Dataset>> inputDataset = context.getDataset(input); +Coder keyCoder = (Coder) input.getCoder().getCoderArguments().get(0); +Coder outputTCoder = (Coder) output.getCoder().getCoderArguments().get(1); + KeyValueGroupedDataset>> groupedDataset = -inputDataset.groupByKey(KVHelpers.extractKey(), EncoderHelpers.genericEncoder()); +inputDataset.groupByKey(KVHelpers.extractKey(), EncoderHelpers.fromBeamCoder(keyCoder)); Dataset>>> combinedDataset = groupedDataset.agg( @@ -66,6 +71,10 @@ class CombinePerKeyTranslatorBatch .toColumn()); // expand the list into separate elements and put the key back into the elements +Coder> kvCoder = KvCoder.of(keyCoder, outputTCoder); +WindowedValue.WindowedValueCoder> wvCoder = +WindowedValue.FullWindowedValueCoder.of( +kvCoder, input.getWindowingStrategy().getWindowFn().windowCoder()); Dataset>> outputDataset = combinedDataset.flatMap( (FlatMapFunction< @@ -85,7 +94,7 @@ class CombinePerKeyTranslatorBatch } return result.iterator(); }, -EncoderHelpers.windowedValueEncoder()); +EncoderHelpers.fromBeamCoder(wvCoder)); context.putDataset(output, outputDataset); } }
[beam] 33/37: Apply spotless, fix typo and javadoc
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit 62a87b62a953221ccb465ce83dc2ab095d9d49a4 Author: Etienne Chauchot AuthorDate: Thu Oct 24 11:58:01 2019 +0200 Apply spotless, fix typo and javadoc --- .../batch/GroupByKeyTranslatorBatch.java | 8 ++-- .../batch/WindowAssignTranslatorBatch.java | 6 +-- .../translation/helpers/EncoderHelpers.java| 16 +++ .../translation/helpers/MultiOuputCoder.java | 51 +- .../translation/helpers/RowHelpers.java| 2 +- .../metrics/sink/SparkMetricsSinkTest.java | 2 +- 6 files changed, 58 insertions(+), 27 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java index 2970aa7..3ebe477 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java @@ -62,8 +62,7 @@ class GroupByKeyTranslatorBatch // group by key only Coder keyCoder = kvCoder.getKeyCoder(); KeyValueGroupedDataset>> groupByKeyOnly = -input.groupByKey(KVHelpers.extractKey(), EncoderHelpers.fromBeamCoder( -keyCoder)); +input.groupByKey(KVHelpers.extractKey(), EncoderHelpers.fromBeamCoder(keyCoder)); // Materialize groupByKeyOnly values, potential OOM because of creation of new iterable Coder valueCoder = kvCoder.getValueCoder(); @@ -92,8 +91,9 @@ class GroupByKeyTranslatorBatch EncoderHelpers.fromBeamCoder(KvCoder.of(keyCoder, iterableCoder))); // group also by windows -WindowedValue.FullWindowedValueCoder>> outputCoder = WindowedValue.FullWindowedValueCoder -.of(KvCoder.of(keyCoder, IterableCoder.of(valueCoder)), +WindowedValue.FullWindowedValueCoder>> outputCoder = +WindowedValue.FullWindowedValueCoder.of( +KvCoder.of(keyCoder, IterableCoder.of(valueCoder)), windowingStrategy.getWindowFn().windowCoder()); Dataset>>> output = materialized.flatMap( diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java index 576b914..4ac8a3f 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java @@ -46,12 +46,12 @@ class WindowAssignTranslatorBatch context.putDataset(output, inputDataset); } else { WindowFn windowFn = assignTransform.getWindowFn(); - WindowedValue.FullWindowedValueCoder windoweVdalueCoder = WindowedValue.FullWindowedValueCoder - .of(input.getCoder(), windowFn.windowCoder()); + WindowedValue.FullWindowedValueCoder windowedValueCoder = + WindowedValue.FullWindowedValueCoder.of(input.getCoder(), windowFn.windowCoder()); Dataset> outputDataset = inputDataset.map( WindowingHelpers.assignWindowsMapFunction(windowFn), - EncoderHelpers.fromBeamCoder(windoweVdalueCoder)); + EncoderHelpers.fromBeamCoder(windowedValueCoder)); context.putDataset(output, outputDataset); } } diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java index a4f0320..2f3bced 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java @@ -44,7 +44,6 @@ import org.apache.spark.sql.catalyst.expressions.codegen.ExprCode; import org.apache.spark.sql.types.DataType; import org.apache.spark.sql.types.ObjectType; import scala.StringContext; -import scala.Tuple2; import scala.collection.JavaConversions; import scala.reflect.ClassTag; import scala.reflect.ClassTag$; @@ -81,14 +80,15 @@ public class EncoderHelpers { return Encoders.kryo((Class) Object.class); } -/* - */ -/** Get a bytes {@link Encoder} for {@link Tuple2}. Bytes serialisation is issued by Kryo *//* + /* + */ + /** Ge
[beam] 19/37: Add an assert of equality in the encoders test
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit 78b2d2243f0732dd802d9e6f855607d2c2f06e59 Author: Etienne Chauchot AuthorDate: Thu Sep 5 15:28:05 2019 +0200 Add an assert of equality in the encoders test --- .../runners/spark/structuredstreaming/utils/EncodersTest.java | 10 +++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/utils/EncodersTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/utils/EncodersTest.java index b3a6273..c6b8631 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/utils/EncodersTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/utils/EncodersTest.java @@ -1,10 +1,12 @@ package org.apache.beam.runners.spark.structuredstreaming.utils; +import static org.junit.Assert.assertEquals; + import java.util.ArrayList; import java.util.List; import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers; -import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.spark.sql.Dataset; import org.apache.spark.sql.SparkSession; import org.junit.Test; import org.junit.runner.RunWith; @@ -24,7 +26,9 @@ public class EncodersTest { data.add(1); data.add(2); data.add(3); -sparkSession.createDataset(data, EncoderHelpers.fromBeamCoder(VarIntCoder.of())); -//sparkSession.createDataset(data, EncoderHelpers.genericEncoder()); +Dataset dataset = sparkSession +.createDataset(data, EncoderHelpers.fromBeamCoder(VarIntCoder.of())); +List results = dataset.collectAsList(); +assertEquals(data, results); } }
[beam] 30/37: Create a Tuple2Coder to encode scala tuple2
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit 21accab89a4333b32003121269ab31b436e0dd2c Author: Etienne Chauchot AuthorDate: Mon Sep 30 11:25:04 2019 +0200 Create a Tuple2Coder to encode scala tuple2 --- .../translation/helpers/Tuple2Coder.java | 62 ++ 1 file changed, 62 insertions(+) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/Tuple2Coder.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/Tuple2Coder.java new file mode 100644 index 000..1743a01 --- /dev/null +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/Tuple2Coder.java @@ -0,0 +1,62 @@ +package org.apache.beam.runners.spark.structuredstreaming.translation.helpers; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Arrays; +import java.util.List; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.StructuredCoder; +import scala.Tuple2; + +/** + * Beam coder to encode/decode Tuple2 scala types. + * @param first field type parameter + * @param second field type parameter + */ +public class Tuple2Coder extends StructuredCoder> { + private final Coder firstFieldCoder; + private final Coder secondFieldCoder; + + public static Tuple2Coder of(Coder firstFieldCoder, Coder secondFieldCoder) { +return new Tuple2Coder<>(firstFieldCoder, secondFieldCoder); + } + + private Tuple2Coder(Coder firstFieldCoder, Coder secondFieldCoder) { +this.firstFieldCoder = firstFieldCoder; +this.secondFieldCoder = secondFieldCoder; + } + + + @Override public void encode(Tuple2 value, OutputStream outStream) + throws IOException { +firstFieldCoder.encode(value._1(), outStream); +secondFieldCoder.encode(value._2(), outStream); + } + + @Override public Tuple2 decode(InputStream inStream) throws IOException { +T1 firstField = firstFieldCoder.decode(inStream); +T2 secondField = secondFieldCoder.decode(inStream); +return Tuple2.apply(firstField, secondField); + } + + @Override public List> getCoderArguments() { +return Arrays.asList(firstFieldCoder, secondFieldCoder); + } + + @Override + public void verifyDeterministic() throws NonDeterministicException { +verifyDeterministic(this, "First field coder must be deterministic", firstFieldCoder); +verifyDeterministic(this, "Second field coder must be deterministic", secondFieldCoder); + } + + /** Returns the coder for first field. */ + public Coder getFirstFieldCoder() { +return firstFieldCoder; + } + + /** Returns the coder for second field. */ + public Coder getSecondFieldCoder() { +return secondFieldCoder; + } +}
[beam] 02/37: Use "sparkMaster" in local mode to obtain number of shuffle partitions + spotless apply
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit 20d5bbd4e8d2d7d7b4dc9639d716b1e3403f91eb Author: Alexey Romanenko AuthorDate: Fri Jul 19 15:48:32 2019 +0200 Use "sparkMaster" in local mode to obtain number of shuffle partitions + spotless apply --- .../translation/TranslationContext.java | 15 +++ .../translation/batch/ParDoTranslatorBatch.java | 8 +--- 2 files changed, 20 insertions(+), 3 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java index f1bafd33..75f3ddf 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java @@ -78,6 +78,21 @@ public class TranslationContext { sparkConf.setJars(options.getFilesToStage().toArray(new String[0])); } +// By default, Spark defines 200 as a number of sql partitions. This seems too much for local +// mode, so try to align with value of "sparkMaster" option in this case. +// We should not overwrite this value (or any user-defined spark configuration value) if the +// user has already configured it. +String sparkMaster = options.getSparkMaster(); +if (sparkMaster != null +&& sparkMaster.startsWith("local[") +&& System.getProperty("spark.sql.shuffle.partitions") == null) { + int numPartitions = + Integer.parseInt(sparkMaster.substring("local[".length(), sparkMaster.length() - 1)); + if (numPartitions > 0) { +sparkConf.set("spark.sql.shuffle.partitions", String.valueOf(numPartitions)); + } +} + this.sparkSession = SparkSession.builder().config(sparkConf).getOrCreate(); this.serializablePipelineOptions = new SerializablePipelineOptions(options); this.datasets = new HashMap<>(); diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java index 742c1b0..255adc8 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java @@ -137,9 +137,11 @@ class ParDoTranslatorBatch pruneOutputFilteredByTag(context, allOutputs, output); } } else { - Dataset> outputDataset = allOutputs.map( - (MapFunction, WindowedValue>, WindowedValue>) value -> value._2, - EncoderHelpers.windowedValueEncoder()); + Dataset> outputDataset = + allOutputs.map( + (MapFunction, WindowedValue>, WindowedValue>) + value -> value._2, + EncoderHelpers.windowedValueEncoder()); context.putDatasetWildcard(outputs.entrySet().iterator().next().getValue(), outputDataset); } }
[beam] 11/37: Fix warning in coder construction by reflexion
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit fdba22d33205db9b039e82204e6e95f9c0e69d50 Author: Etienne Chauchot AuthorDate: Wed Sep 4 14:55:32 2019 +0200 Fix warning in coder construction by reflexion --- .../structuredstreaming/translation/helpers/EncoderHelpers.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java index 694bc24..1d89101 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java @@ -293,7 +293,7 @@ public class EncoderHelpers { @Override public Object nullSafeEval(Object input) { try { -Coder beamCoder = coderClass.newInstance(); +Coder beamCoder = coderClass.getDeclaredConstructor().newInstance(); return beamCoder.decode(new ByteArrayInputStream((byte[]) input)); } catch (Exception e) { throw new IllegalStateException("Error decoding bytes for coder: " + coderClass, e); @@ -373,13 +373,13 @@ case class DecodeUsingSerializer[T](child: Expression, tag: ClassTag[T], kryo: B ctx.addImmutableStateIfNotExists(coderClass.getName(), beamCoderInstance, func(v1 -> { /* CODE GENERATED -v = (coderClass) coderClass.newInstance(); +v = (coderClass) coderClass.getDeclaredConstructor().newInstance(); */ List parts = new ArrayList<>(); parts.add(""); parts.add(" = ("); parts.add(") "); -parts.add(".newInstance();"); +parts.add(".getDeclaredConstructor().newInstance();"); StringContext sc = new StringContext(JavaConversions.collectionAsScalaIterable(parts).toSeq()); List args = new ArrayList<>(); args.add(v1);
[beam] 13/37: Fix getting the output value in code generation
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit d7c9a4a59768687ff051ab0f28462e6376648e43 Author: Etienne Chauchot AuthorDate: Wed Sep 4 16:50:17 2019 +0200 Fix getting the output value in code generation --- .../translation/helpers/EncoderHelpers.java| 37 +- 1 file changed, 22 insertions(+), 15 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java index dff308a..a452da0 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java @@ -21,7 +21,6 @@ import static org.apache.spark.sql.types.DataTypes.BinaryType; import static scala.compat.java8.JFunction.func; import java.io.ByteArrayInputStream; -import java.lang.reflect.Array; import java.util.ArrayList; import java.util.List; import java.util.Objects; @@ -42,7 +41,6 @@ import org.apache.spark.sql.catalyst.expressions.codegen.Block; import org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator; import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext; import org.apache.spark.sql.catalyst.expressions.codegen.ExprCode; -import org.apache.spark.sql.catalyst.expressions.codegen.VariableValue; import org.apache.spark.sql.types.DataType; import org.apache.spark.sql.types.ObjectType; import scala.StringContext; @@ -144,34 +142,42 @@ public class EncoderHelpers { /* CODE GENERATED +byte[] ${ev.value}; try { java.io.ByteArrayOutputStream baos = new java.io.ByteArrayOutputStream(); -final byte[] output; if ({input.isNull}) -output = null; -else -output = $beamCoder.encode(${input.value}, baos); baos.toByteArray(); +${ev.value} = null; +else{ +$beamCoder.encode(${input.value}, baos); +${ev.value} = baos.toByteArray(); +} } catch (Exception e) { throw new RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e)); } */ List parts = new ArrayList<>(); - parts.add("try {java.io.ByteArrayOutputStream baos = new java.io.ByteArrayOutputStream(); final byte[] output; if ("); - parts.add(") output = null; else output ="); + parts.add("byte[] "); + parts.add(";try { java.io.ByteArrayOutputStream baos = new java.io.ByteArrayOutputStream(); if ("); + parts.add(") "); + parts.add(" = null; else{"); parts.add(".encode("); - parts.add(", baos); baos.toByteArray();} catch (Exception e) {throw new RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e));}"); + parts.add(", baos); "); + parts.add(" = baos.toByteArray();}} catch (Exception e) {throw new RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e));}"); StringContext sc = new StringContext(JavaConversions.collectionAsScalaIterable(parts).toSeq()); List args = new ArrayList<>(); + + args.add(ev.value()); args.add(input.isNull()); + args.add(ev.value()); args.add(beamCoder); args.add(input.value()); + args.add(ev.value()); Block code = (new Block.BlockHelper(sc)) .code(JavaConversions.collectionAsScalaIterable(args).toSeq()); - return ev.copy(input.code().$plus(code), input.isNull(), - new VariableValue("output", Array.class)); + return ev.copy(input.code().$plus(code), input.isNull(),ev.value()); } @@ -263,7 +269,7 @@ public class EncoderHelpers { /* CODE GENERATED: try { - final $javaType output = + final $javaType ${ev.value} = ${input.isNull} ? ${CodeGenerator.defaultValue(dataType)} : ($javaType) $beamCoder.decode(new java.io.ByteArrayInputStream(${input.value})); @@ -274,7 +280,8 @@ public class EncoderHelpers { List parts = new ArrayList<>(); parts.add("try { final "); - parts.add(" output ="); + parts.add(" "); + parts.add(" ="); parts.add("?"); parts.add(":"); parts.add("("); @@ -286,6 +293,7 @@ public class EncoderHelpers { List args = new ArrayList<>(); args.add(javaType); + args.add(ev.value()); args.add(input.isNull()); args.add(CodeGenerator.defaultValue(dataType(), false)); args.add(javaType); @@ -294,8 +302,7 @@ public class EncoderHelpers { Block code = (new Block.BlockHelper(sc)) .code(JavaConversions.collectionAsScalaIterable(args).toSeq()); -
[beam] 23/37: Catch Exception instead of IOException because some coders to not throw Exceptions at all (e.g.VoidCoder)
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit c33fddadcc3f38474e0aeb440c0d3fac718ee5a6 Author: Etienne Chauchot AuthorDate: Fri Sep 6 10:42:00 2019 +0200 Catch Exception instead of IOException because some coders to not throw Exceptions at all (e.g.VoidCoder) --- .../structuredstreaming/translation/helpers/EncoderHelpers.java | 8 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java index e7c5bb7..218dc0a 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java @@ -149,7 +149,7 @@ public class EncoderHelpers { $beamCoder.encode(${input.value}, baos); ${ev.value} = baos.toByteArray(); } -} catch (java.io.IOException e) { +} catch (Exception e) { throw org.apache.beam.sdk.util.UserCodeException.wrap(e); } */ @@ -162,7 +162,7 @@ public class EncoderHelpers { parts.add(".encode("); parts.add(", baos); "); parts.add( - " = baos.toByteArray();}} catch (java.io.IOException e) {throw org.apache.beam.sdk.util.UserCodeException.wrap(e);}"); + " = baos.toByteArray();}} catch (Exception e) {throw org.apache.beam.sdk.util.UserCodeException.wrap(e);}"); StringContext sc = new StringContext(JavaConversions.collectionAsScalaIterable(parts).toSeq()); @@ -265,7 +265,7 @@ public class EncoderHelpers { ${input.isNull} ? ${CodeGenerator.defaultValue(dataType)} : ($javaType) $beamCoder.decode(new java.io.ByteArrayInputStream(${input.value})); - } catch (java.io.IOException e) { + } catch (Exception e) { throw org.apache.beam.sdk.util.UserCodeException.wrap(e); } */ @@ -280,7 +280,7 @@ public class EncoderHelpers { parts.add(") "); parts.add(".decode(new java.io.ByteArrayInputStream("); parts.add( - ")); } catch (java.io.IOException e) {throw org.apache.beam.sdk.util.UserCodeException.wrap(e);}"); + ")); } catch (Exception e) {throw org.apache.beam.sdk.util.UserCodeException.wrap(e);}"); StringContext sc = new StringContext(JavaConversions.collectionAsScalaIterable(parts).toSeq());
[beam] 08/37: Add a simple spark native test to test Beam coders wrapping into Spark Encoders
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit e4478ffc2a9fd35d76080ff8f33cc8d3340cba1c Author: Etienne Chauchot AuthorDate: Fri Aug 30 17:34:13 2019 +0200 Add a simple spark native test to test Beam coders wrapping into Spark Encoders --- .../structuredstreaming/utils/EncodersTest.java| 29 ++ 1 file changed, 29 insertions(+) diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/utils/EncodersTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/utils/EncodersTest.java new file mode 100644 index 000..490e3dc --- /dev/null +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/utils/EncodersTest.java @@ -0,0 +1,29 @@ +package org.apache.beam.runners.spark.structuredstreaming.utils; + +import java.util.ArrayList; +import java.util.List; +import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers; +import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.spark.sql.SparkSession; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +/** + * Test of the wrapping of Beam Coders as Spark ExpressionEncoders. + */ +public class EncodersTest { + + @Test + public void beamCoderToSparkEncoderTest() { +SparkSession sparkSession = SparkSession.builder().appName("beamCoderToSparkEncoderTest") +.master("local[4]").getOrCreate(); +List data = new ArrayList<>(); +data.add(1); +data.add(2); +data.add(3); +//sparkSession.createDataset(data, EncoderHelpers.fromBeamCoder(VarIntCoder.of())); +sparkSession.createDataset(data, EncoderHelpers.genericEncoder()); + } +}
[beam] 29/37: Apply new Encoders to AggregatorCombiner
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit 29f7e93c954cc26425a052c0f1c19ec6e6c9fe66 Author: Etienne Chauchot AuthorDate: Fri Sep 27 11:55:20 2019 +0200 Apply new Encoders to AggregatorCombiner --- .../translation/batch/AggregatorCombiner.java | 22 +- .../batch/CombinePerKeyTranslatorBatch.java| 20 2 files changed, 33 insertions(+), 9 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombiner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombiner.java index 0e3229e..d14569a 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombiner.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombiner.java @@ -27,6 +27,8 @@ import java.util.Map; import java.util.Set; import java.util.stream.Collectors; import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; @@ -52,13 +54,25 @@ class AggregatorCombiner private final Combine.CombineFn combineFn; private WindowingStrategy windowingStrategy; private TimestampCombiner timestampCombiner; + private IterableCoder> accumulatorCoder; + private IterableCoder> outputCoder; public AggregatorCombiner( Combine.CombineFn combineFn, - WindowingStrategy windowingStrategy) { + WindowingStrategy windowingStrategy, + Coder accumulatorCoder, + Coder outputCoder) { this.combineFn = combineFn; this.windowingStrategy = (WindowingStrategy) windowingStrategy; this.timestampCombiner = windowingStrategy.getTimestampCombiner(); +this.accumulatorCoder = +IterableCoder.of( +WindowedValue.FullWindowedValueCoder.of( +accumulatorCoder, windowingStrategy.getWindowFn().windowCoder())); +this.outputCoder = +IterableCoder.of( +WindowedValue.FullWindowedValueCoder.of( +outputCoder, windowingStrategy.getWindowFn().windowCoder())); } @Override @@ -142,14 +156,12 @@ class AggregatorCombiner @Override public Encoder>> bufferEncoder() { -// TODO replace with accumulatorCoder if possible -return EncoderHelpers.genericEncoder(); +return EncoderHelpers.fromBeamCoder(accumulatorCoder); } @Override public Encoder>> outputEncoder() { -// TODO replace with outputCoder if possible -return EncoderHelpers.genericEncoder(); +return EncoderHelpers.fromBeamCoder(outputCoder); } private Set collectAccumulatorsWindows(Iterable> accumulators) { diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java index 33b037a..be238b5 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java @@ -23,6 +23,7 @@ import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTr import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext; import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers; import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.KVHelpers; +import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.transforms.Combine; @@ -58,20 +59,31 @@ class CombinePerKeyTranslatorBatch Dataset>> inputDataset = context.getDataset(input); -Coder keyCoder = (Coder) input.getCoder().getCoderArguments().get(0); -Coder outputTCoder = (Coder) output.getCoder().getCoderArguments().get(1); +KvCoder inputCoder = (KvCoder) input.getCoder(); +Coder keyCoder = inputCoder.getKeyCoder(); +KvCoder outputKVCoder = (KvCoder) output.getCoder(); +Coder outputCoder = outputKVCoder.getValueCoder(); KeyValueGroupedDataset>> groupedDataset = inputDataset.groupByKey(KVHelpers.extractKey(), EncoderHelpers.fromBeamCoder(keyCoder)); +Coder accumulatorC
[beam] 07/37: Conform to spark ExpressionEncoders: pass classTags, implement scala Product, pass children from within the ExpressionEncoder, fix visibilities
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit 2aaf07a41155f35ab36bda4c3c02a7ffa7bd66db Author: Etienne Chauchot AuthorDate: Thu Aug 29 15:10:40 2019 +0200 Conform to spark ExpressionEncoders: pass classTags, implement scala Product, pass children from within the ExpressionEncoder, fix visibilities --- .../translation/helpers/EncoderHelpers.java| 64 +- 1 file changed, 38 insertions(+), 26 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java index 8a4f1de..0765c78 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java @@ -100,13 +100,13 @@ public class EncoderHelpers { List serialiserList = new ArrayList<>(); Class claz = (Class) Object.class; -serialiserList.add(new EncodeUsingBeamCoder<>(claz, coder)); +serialiserList.add(new EncodeUsingBeamCoder<>(new BoundReference(0, new ObjectType(claz), true), coder)); ClassTag classTag = ClassTag$.MODULE$.apply(claz); return new ExpressionEncoder<>( SchemaHelpers.binarySchema(), false, JavaConversions.collectionAsScalaIterable(serialiserList).toSeq(), -new DecodeUsingBeamCoder<>(claz, coder), +new DecodeUsingBeamCoder<>(new Cast(new GetColumnByOrdinal(0, BinaryType), BinaryType), classTag, coder), classTag); /* @@ -126,16 +126,14 @@ public class EncoderHelpers { */ } - private static class EncodeUsingBeamCoder extends UnaryExpression implements NonSQLExpression { + public static class EncodeUsingBeamCoder extends UnaryExpression implements NonSQLExpression { -private Class claz; -private Coder beamCoder; private Expression child; +private Coder beamCoder; -private EncodeUsingBeamCoder( Class claz, Coder beamCoder) { - this.claz = claz; +public EncodeUsingBeamCoder(Expression child, Coder beamCoder) { + this.child = child; this.beamCoder = beamCoder; - this.child = new BoundReference(0, new ObjectType(claz), true); } @Override public Expression child() { @@ -175,11 +173,18 @@ public class EncoderHelpers { } @Override public Object productElement(int n) { - return null; + switch (n) { +case 0: + return child; +case 1: + return beamCoder; +default: + throw new ArrayIndexOutOfBoundsException("productElement out of bounds"); + } } @Override public int productArity() { - return 0; + return 2; } @Override public boolean canEqual(Object that) { @@ -194,11 +199,11 @@ public class EncoderHelpers { return false; } EncodeUsingBeamCoder that = (EncodeUsingBeamCoder) o; - return claz.equals(that.claz) && beamCoder.equals(that.beamCoder); + return beamCoder.equals(that.beamCoder); } @Override public int hashCode() { - return Objects.hash(super.hashCode(), claz, beamCoder); + return Objects.hash(super.hashCode(), beamCoder); } } @@ -226,16 +231,16 @@ public class EncoderHelpers { override def dataType: DataType = BinaryType }*/ - private static class DecodeUsingBeamCoder extends UnaryExpression implements NonSQLExpression{ + public static class DecodeUsingBeamCoder extends UnaryExpression implements NonSQLExpression{ -private Class claz; -private Coder beamCoder; private Expression child; +private ClassTag classTag; +private Coder beamCoder; -private DecodeUsingBeamCoder(Class claz, Coder beamCoder) { - this.claz = claz; +public DecodeUsingBeamCoder(Expression child, ClassTag classTag, Coder beamCoder) { + this.child = child; + this.classTag = classTag; this.beamCoder = beamCoder; - this.child = new Cast(new GetColumnByOrdinal(0, BinaryType), BinaryType); } @Override public Expression child() { @@ -267,7 +272,7 @@ public class EncoderHelpers { args.add(new VariableValue("deserialize", String.class)); Block code = blockHelper.code(JavaConversions.collectionAsScalaIterable(args).toSeq()); - return ev.copy(input.code().$plus(code), input.isNull(), new VariableValue("output", claz)); + return ev.copy(input.code().$plus(code), input.isNull(), new VariableValue("output", classTag.runtimeClass())); } @@ -280,17 +285,24 @@ public class EncoderHelpers { } @Override public DataType dataType() { -// return new ObjectTy
[beam] 37/37: Remove Encoders based on kryo now that we call Beam coders in the runner
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit 620a27a06b61fce5b3f5f15a62e05ffe3153b2ab Author: Etienne Chauchot AuthorDate: Wed Oct 23 14:11:14 2019 +0200 Remove Encoders based on kryo now that we call Beam coders in the runner --- .../translation/helpers/EncoderHelpers.java| 41 +- 1 file changed, 1 insertion(+), 40 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java index c07c9dd..704b6fe 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java @@ -51,46 +51,7 @@ import scala.reflect.ClassTag$; /** {@link Encoders} utility class. */ public class EncoderHelpers { - // 1. use actual class and not object to avoid Spark fallback to GenericRowWithSchema. - // 2. use raw class because only raw classes can be used with kryo. Cast to Class to allow - // the type inference mechanism to infer for ex Encoder> to get back the type - // checking - - /* - - Encoders for internal spark runner objects - */ - - /** - * Get a bytes {@link Encoder} for {@link WindowedValue}. Bytes serialisation is issued by Kryo - */ - @SuppressWarnings("unchecked") - public static Encoder windowedValueEncoder() { -return Encoders.kryo((Class) WindowedValue.class); - } - - /** Get a bytes {@link Encoder} for {@link KV}. Bytes serialisation is issued by Kryo */ - @SuppressWarnings("unchecked") - public static Encoder kvEncoder() { -return Encoders.kryo((Class) KV.class); - } - - /** Get a bytes {@link Encoder} for {@code T}. Bytes serialisation is issued by Kryo */ - @SuppressWarnings("unchecked") - public static Encoder genericEncoder() { -return Encoders.kryo((Class) Object.class); - } - - /* - */ - /** Get a bytes {@link Encoder} for {@link Tuple2}. Bytes serialisation is issued by Kryo */ - /* - -public static Encoder> tuple2Encoder() { - return Encoders.tuple(EncoderHelpers.genericEncoder(), EncoderHelpers.genericEncoder()); -} - */ - - /* +/* - Bridges from Beam Coders to Spark Encoders */
[beam] 06/37: Fix scala Product in Encoders to avoid StackEverflow
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit fff509246b4ed9810c137ba2c9bd7811e3d95079 Author: Etienne Chauchot AuthorDate: Thu Aug 29 10:58:32 2019 +0200 Fix scala Product in Encoders to avoid StackEverflow --- .../translation/helpers/EncoderHelpers.java| 18 -- 1 file changed, 4 insertions(+), 14 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java index 9cb8f29..8a4f1de 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java @@ -175,16 +175,11 @@ public class EncoderHelpers { } @Override public Object productElement(int n) { - if (n == 0) { -return this; - } else { -throw new IndexOutOfBoundsException(String.valueOf(n)); - } + return null; } @Override public int productArity() { - //TODO test with spark Encoders if the arity of 1 is ok - return 1; + return 0; } @Override public boolean canEqual(Object that) { @@ -291,16 +286,11 @@ public class EncoderHelpers { } @Override public Object productElement(int n) { - if (n == 0) { -return this; - } else { -throw new IndexOutOfBoundsException(String.valueOf(n)); - } + return null; } @Override public int productArity() { - //TODO test with spark Encoders if the arity of 1 is ok - return 1; + return 0; } @Override public boolean canEqual(Object that) {
[beam] 26/37: Improve performance of source: the mapper already calls windowedValueCoder.decode, no need to call it also in the Spark encoder
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit 68d3d6798950888590fca915782d5288fe2d1e5a Author: Etienne Chauchot AuthorDate: Thu Sep 19 17:20:31 2019 +0200 Improve performance of source: the mapper already calls windowedValueCoder.decode, no need to call it also in the Spark encoder --- .../translation/batch/ReadSourceTranslatorBatch.java | 9 ++--- .../translation/streaming/ReadSourceTranslatorStreaming.java | 9 ++--- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java index 2dcf66f..ceb87cf 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java @@ -70,13 +70,16 @@ class ReadSourceTranslatorBatch .load(); // extract windowedValue from Row -WindowedValue.FullWindowedValueCoder windowedValueCoder = WindowedValue.FullWindowedValueCoder -.of(source.getOutputCoder(), GlobalWindow.Coder.INSTANCE); +WindowedValue.FullWindowedValueCoder windowedValueCoder = +WindowedValue.FullWindowedValueCoder.of( +source.getOutputCoder(), GlobalWindow.Coder.INSTANCE); Dataset> dataset = rowDataset.map( RowHelpers.extractWindowedValueFromRowMapFunction(windowedValueCoder), -EncoderHelpers.fromBeamCoder(windowedValueCoder)); +// using kryo bytes serialization because the mapper already calls +// windowedValueCoder.decode, no need to call it also in the Spark encoder +EncoderHelpers.windowedValueEncoder()); PCollection output = (PCollection) context.getOutput(); context.putDataset(output, dataset); diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java index 9e03d96..9f1e34d 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java @@ -71,12 +71,15 @@ class ReadSourceTranslatorStreaming .load(); // extract windowedValue from Row -WindowedValue.FullWindowedValueCoder windowedValueCoder = WindowedValue.FullWindowedValueCoder -.of(source.getOutputCoder(), GlobalWindow.Coder.INSTANCE); +WindowedValue.FullWindowedValueCoder windowedValueCoder = +WindowedValue.FullWindowedValueCoder.of( +source.getOutputCoder(), GlobalWindow.Coder.INSTANCE); Dataset> dataset = rowDataset.map( RowHelpers.extractWindowedValueFromRowMapFunction(windowedValueCoder), -EncoderHelpers.fromBeamCoder(windowedValueCoder)); +// using kryo bytes serialization because the mapper already calls +// windowedValueCoder.decode, no need to call it also in the Spark encoder +EncoderHelpers.windowedValueEncoder()); PCollection output = (PCollection) context.getOutput(); context.putDataset(output, dataset);
[beam] 34/37: Use beam encoders also in the output of the source translation
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit 6a278395d77b3578da10a9621c85883a2d6f2ded Author: Etienne Chauchot AuthorDate: Wed Oct 23 11:45:39 2019 +0200 Use beam encoders also in the output of the source translation --- .../translation/batch/ReadSourceTranslatorBatch.java | 4 +--- .../translation/streaming/ReadSourceTranslatorStreaming.java | 4 +--- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java index ceb87cf..6af7f55 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java @@ -77,9 +77,7 @@ class ReadSourceTranslatorBatch Dataset> dataset = rowDataset.map( RowHelpers.extractWindowedValueFromRowMapFunction(windowedValueCoder), -// using kryo bytes serialization because the mapper already calls -// windowedValueCoder.decode, no need to call it also in the Spark encoder -EncoderHelpers.windowedValueEncoder()); +EncoderHelpers.fromBeamCoder(windowedValueCoder)); PCollection output = (PCollection) context.getOutput(); context.putDataset(output, dataset); diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java index 9f1e34d..ea10272 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java @@ -77,9 +77,7 @@ class ReadSourceTranslatorStreaming Dataset> dataset = rowDataset.map( RowHelpers.extractWindowedValueFromRowMapFunction(windowedValueCoder), -// using kryo bytes serialization because the mapper already calls -// windowedValueCoder.decode, no need to call it also in the Spark encoder -EncoderHelpers.windowedValueEncoder()); +EncoderHelpers.fromBeamCoder(windowedValueCoder)); PCollection output = (PCollection) context.getOutput(); context.putDataset(output, dataset);
[beam] 04/37: Wrap Beam Coders into Spark Encoders using ExpressionEncoder: deserialization part
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit 5fa6331e0356953870e6ed614b0ce5e5c801fab1 Author: Etienne Chauchot AuthorDate: Mon Aug 26 15:22:12 2019 +0200 Wrap Beam Coders into Spark Encoders using ExpressionEncoder: deserialization part + Fix EncoderHelpers.fromBeamCoder() visibility --- .../translation/helpers/EncoderHelpers.java| 64 ++ 1 file changed, 52 insertions(+), 12 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java index b072803..ab24e37 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java @@ -19,6 +19,8 @@ package org.apache.beam.runners.spark.structuredstreaming.translation.helpers; import static org.apache.spark.sql.types.DataTypes.BinaryType; +import java.io.ByteArrayInputStream; +import java.io.IOException; import java.lang.reflect.Array; import java.util.ArrayList; import java.util.List; @@ -94,7 +96,7 @@ public class EncoderHelpers { */ /** A way to construct encoders using generic serializers. */ - private Encoder fromBeamCoder(Coder coder, Class claz){ + public static Encoder fromBeamCoder(Coder coder, Class claz){ List serialiserList = new ArrayList<>(); serialiserList.add(new EncodeUsingBeamCoder<>(claz, coder)); @@ -103,7 +105,8 @@ public class EncoderHelpers { SchemaHelpers.binarySchema(), false, JavaConversions.collectionAsScalaIterable(serialiserList).toSeq(), -new DecodeUsingBeamCoder<>(classTag, coder), classTag); +new DecodeUsingBeamCoder<>(claz, coder), +classTag); /* ExpressionEncoder[T]( @@ -150,8 +153,8 @@ public class EncoderHelpers { List instructions = new ArrayList<>(); instructions.add(outside); - Seq parts = JavaConversions.collectionAsScalaIterable(instructions).toSeq(); + StringContext stringContext = new StringContext(parts); Block.BlockHelper blockHelper = new Block.BlockHelper(stringContext); List args = new ArrayList<>(); @@ -160,7 +163,7 @@ public class EncoderHelpers { args.add(new VariableValue("javaType", String.class)); args.add(new SimpleExprValue("input.isNull", Boolean.class)); args.add(new SimpleExprValue("CodeGenerator.defaultValue(dataType)", String.class)); - args.add(new VariableValue("$serialize", String.class)); + args.add(new VariableValue("serialize", String.class)); Block code = blockHelper.code(JavaConversions.collectionAsScalaIterable(args).toSeq()); return ev.copy(input.code().$plus(code), input.isNull(), new VariableValue("output", Array.class)); @@ -229,24 +232,61 @@ public class EncoderHelpers { private static class DecodeUsingBeamCoder extends UnaryExpression implements NonSQLExpression{ -private ClassTag classTag; +private Class claz; private Coder beamCoder; +private Expression child; -private DecodeUsingBeamCoder(ClassTag classTag, Coder beamCoder) { - this.classTag = classTag; +private DecodeUsingBeamCoder(Class claz, Coder beamCoder) { + this.claz = claz; this.beamCoder = beamCoder; + this.child = new Cast(new GetColumnByOrdinal(0, BinaryType), BinaryType); } @Override public Expression child() { - return new Cast(new GetColumnByOrdinal(0, BinaryType), BinaryType); + return child; } @Override public ExprCode doGenCode(CodegenContext ctx, ExprCode ev) { - return null; + // Code to deserialize. + ExprCode input = child.genCode(ctx); + String javaType = CodeGenerator.javaType(dataType()); + + String inputStream = "ByteArrayInputStream bais = new ByteArrayInputStream(${input.value});"; + String deserialize = inputStream + "($javaType) $beamCoder.decode(bais);"; + + String outside = "final $javaType output = ${input.isNull} ? ${CodeGenerator.defaultValue(dataType)} : $deserialize;"; + + List instructions = new ArrayList<>(); + instructions.add(outside); + Seq parts = JavaConversions.collectionAsScalaIterable(instructions).toSeq(); + + StringContext stringContext = new StringContext(parts); + Block.BlockHelper blockHelper = new Block.BlockHelper(stringContext); + List args = new ArrayList<>(); + args.add(new SimpleExprValue("input.value", ExprValue.class)); + args.add(new VariableValue("javaType", String.class)); + args.add(new VariableValue("beamCoder", Coder.class));
[beam] 18/37: Fix generated code: uniform exceptions catching, fix parenthesis and variable declarations
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit 34e8aa8c31a561684eea2e2496757f9f3cae35d0 Author: Etienne Chauchot AuthorDate: Thu Sep 5 15:14:32 2019 +0200 Fix generated code: uniform exceptions catching, fix parenthesis and variable declarations --- .../translation/helpers/EncoderHelpers.java| 18 ++ 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java index 91aaaf9..c9ab435 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java @@ -227,32 +227,34 @@ public class EncoderHelpers { /* CODE GENERATED: + final $javaType ${ev.value} try { - final $javaType ${ev.value} = + ${ev.value} = ${input.isNull} ? ${CodeGenerator.defaultValue(dataType)} : ($javaType) $beamCoder.decode(new java.io.ByteArrayInputStream(${input.value})); - } catch (IOException e) { + } catch (Exception e) { throw new RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e)); } */ List parts = new ArrayList<>(); - parts.add("try { final "); + parts.add("final "); parts.add(" "); - parts.add(" ="); - parts.add("?"); - parts.add(":"); - parts.add("("); + parts.add(";try { "); + parts.add(" = "); + parts.add("? "); + parts.add(": ("); parts.add(") "); parts.add(".decode(new java.io.ByteArrayInputStream("); - parts.add(")); } catch (IOException e) {throw new RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e));}"); + parts.add(")); } catch (Exception e) {throw new RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e));}"); StringContext sc = new StringContext(JavaConversions.collectionAsScalaIterable(parts).toSeq()); List args = new ArrayList<>(); args.add(javaType); args.add(ev.value()); + args.add(ev.value()); args.add(input.isNull()); args.add(CodeGenerator.defaultValue(dataType(), false)); args.add(javaType);
[beam] 28/37: Apply new Encoders to Window assign translation
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit 7f1060aa189a625400a1fbcfc2503d3e721ade8f Author: Etienne Chauchot AuthorDate: Fri Sep 27 11:22:15 2019 +0200 Apply new Encoders to Window assign translation --- .../translation/batch/WindowAssignTranslatorBatch.java| 8 ++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java index fb37f97..576b914 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java @@ -23,6 +23,7 @@ import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.Enc import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.WindowingHelpers; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PCollection; import org.apache.spark.sql.Dataset; @@ -44,10 +45,13 @@ class WindowAssignTranslatorBatch if (WindowingHelpers.skipAssignWindows(assignTransform, context)) { context.putDataset(output, inputDataset); } else { + WindowFn windowFn = assignTransform.getWindowFn(); + WindowedValue.FullWindowedValueCoder windoweVdalueCoder = WindowedValue.FullWindowedValueCoder + .of(input.getCoder(), windowFn.windowCoder()); Dataset> outputDataset = inputDataset.map( - WindowingHelpers.assignWindowsMapFunction(assignTransform.getWindowFn()), - EncoderHelpers.windowedValueEncoder()); + WindowingHelpers.assignWindowsMapFunction(windowFn), + EncoderHelpers.fromBeamCoder(windoweVdalueCoder)); context.putDataset(output, outputDataset); } }
[beam] 22/37: Put Encoders expressions serializable
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit c8bfcf367c6a4ac855fa2b9d549fa26c39b8be81 Author: Etienne Chauchot AuthorDate: Fri Sep 6 10:31:36 2019 +0200 Put Encoders expressions serializable --- .../structuredstreaming/translation/helpers/EncoderHelpers.java| 7 +-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java index f4ea6fa..e7c5bb7 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java @@ -20,6 +20,7 @@ package org.apache.beam.runners.spark.structuredstreaming.translation.helpers; import static org.apache.spark.sql.types.DataTypes.BinaryType; import java.io.ByteArrayInputStream; +import java.io.Serializable; import java.util.ArrayList; import java.util.List; import java.util.Objects; @@ -114,7 +115,8 @@ public class EncoderHelpers { * * @param : Type of elements ot be serialized. */ - public static class EncodeUsingBeamCoder extends UnaryExpression implements NonSQLExpression { + public static class EncodeUsingBeamCoder extends UnaryExpression + implements NonSQLExpression, Serializable { private Expression child; private Coder beamCoder; @@ -229,7 +231,8 @@ public class EncoderHelpers { * * @param : Type of elements ot be serialized. */ - public static class DecodeUsingBeamCoder extends UnaryExpression implements NonSQLExpression { + public static class DecodeUsingBeamCoder extends UnaryExpression + implements NonSQLExpression, Serializable { private Expression child; private ClassTag classTag;
[beam] 12/37: Fix ExpressionEncoder generated code: typos, try catch, fqcn
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit 8b07ec8ad0a22732aa6096c24135d942c3928787 Author: Etienne Chauchot AuthorDate: Wed Sep 4 15:38:41 2019 +0200 Fix ExpressionEncoder generated code: typos, try catch, fqcn --- .../translation/helpers/EncoderHelpers.java| 38 +- 1 file changed, 23 insertions(+), 15 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java index 1d89101..dff308a 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java @@ -144,18 +144,22 @@ public class EncoderHelpers { /* CODE GENERATED - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - final bytes[] output; - if ({input.isNull}) - output = null; - else - output = $beamCoder.encode(${input.value}, baos); baos.toByteArray(); + try { +java.io.ByteArrayOutputStream baos = new java.io.ByteArrayOutputStream(); +final byte[] output; +if ({input.isNull}) +output = null; +else +output = $beamCoder.encode(${input.value}, baos); baos.toByteArray(); +} catch (Exception e) { + throw new RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e)); +} */ List parts = new ArrayList<>(); - parts.add("ByteArrayOutputStream baos = new ByteArrayOutputStream(); final bytes[] output; if ("); + parts.add("try {java.io.ByteArrayOutputStream baos = new java.io.ByteArrayOutputStream(); final byte[] output; if ("); parts.add(") output = null; else output ="); parts.add(".encode("); - parts.add(", baos); baos.toByteArray();"); + parts.add(", baos); baos.toByteArray();} catch (Exception e) {throw new RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e));}"); StringContext sc = new StringContext(JavaConversions.collectionAsScalaIterable(parts).toSeq()); @@ -258,21 +262,25 @@ public class EncoderHelpers { /* CODE GENERATED: - final $javaType output = - ${input.isNull} ? - ${CodeGenerator.defaultValue(dataType)} : - ($javaType) $beamCoder.decode(new ByteArrayInputStream(${input.value})); + try { + final $javaType output = + ${input.isNull} ? + ${CodeGenerator.defaultValue(dataType)} : + ($javaType) $beamCoder.decode(new java.io.ByteArrayInputStream(${input.value})); + } catch (IOException e) { + throw new RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e)); + } */ List parts = new ArrayList<>(); - parts.add("final "); + parts.add("try { final "); parts.add(" output ="); parts.add("?"); parts.add(":"); parts.add("("); parts.add(") "); - parts.add(".decode(new ByteArrayInputStream("); - parts.add("));"); + parts.add(".decode(new java.io.ByteArrayInputStream("); + parts.add(")); } catch (IOException e) {throw new RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e));}"); StringContext sc = new StringContext(JavaConversions.collectionAsScalaIterable(parts).toSeq());
[beam] 33/37: Apply spotless, fix typo and javadoc
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit 62a87b62a953221ccb465ce83dc2ab095d9d49a4 Author: Etienne Chauchot AuthorDate: Thu Oct 24 11:58:01 2019 +0200 Apply spotless, fix typo and javadoc --- .../batch/GroupByKeyTranslatorBatch.java | 8 ++-- .../batch/WindowAssignTranslatorBatch.java | 6 +-- .../translation/helpers/EncoderHelpers.java| 16 +++ .../translation/helpers/MultiOuputCoder.java | 51 +- .../translation/helpers/RowHelpers.java| 2 +- .../metrics/sink/SparkMetricsSinkTest.java | 2 +- 6 files changed, 58 insertions(+), 27 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java index 2970aa7..3ebe477 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java @@ -62,8 +62,7 @@ class GroupByKeyTranslatorBatch // group by key only Coder keyCoder = kvCoder.getKeyCoder(); KeyValueGroupedDataset>> groupByKeyOnly = -input.groupByKey(KVHelpers.extractKey(), EncoderHelpers.fromBeamCoder( -keyCoder)); +input.groupByKey(KVHelpers.extractKey(), EncoderHelpers.fromBeamCoder(keyCoder)); // Materialize groupByKeyOnly values, potential OOM because of creation of new iterable Coder valueCoder = kvCoder.getValueCoder(); @@ -92,8 +91,9 @@ class GroupByKeyTranslatorBatch EncoderHelpers.fromBeamCoder(KvCoder.of(keyCoder, iterableCoder))); // group also by windows -WindowedValue.FullWindowedValueCoder>> outputCoder = WindowedValue.FullWindowedValueCoder -.of(KvCoder.of(keyCoder, IterableCoder.of(valueCoder)), +WindowedValue.FullWindowedValueCoder>> outputCoder = +WindowedValue.FullWindowedValueCoder.of( +KvCoder.of(keyCoder, IterableCoder.of(valueCoder)), windowingStrategy.getWindowFn().windowCoder()); Dataset>>> output = materialized.flatMap( diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java index 576b914..4ac8a3f 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java @@ -46,12 +46,12 @@ class WindowAssignTranslatorBatch context.putDataset(output, inputDataset); } else { WindowFn windowFn = assignTransform.getWindowFn(); - WindowedValue.FullWindowedValueCoder windoweVdalueCoder = WindowedValue.FullWindowedValueCoder - .of(input.getCoder(), windowFn.windowCoder()); + WindowedValue.FullWindowedValueCoder windowedValueCoder = + WindowedValue.FullWindowedValueCoder.of(input.getCoder(), windowFn.windowCoder()); Dataset> outputDataset = inputDataset.map( WindowingHelpers.assignWindowsMapFunction(windowFn), - EncoderHelpers.fromBeamCoder(windoweVdalueCoder)); + EncoderHelpers.fromBeamCoder(windowedValueCoder)); context.putDataset(output, outputDataset); } } diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java index a4f0320..2f3bced 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java @@ -44,7 +44,6 @@ import org.apache.spark.sql.catalyst.expressions.codegen.ExprCode; import org.apache.spark.sql.types.DataType; import org.apache.spark.sql.types.ObjectType; import scala.StringContext; -import scala.Tuple2; import scala.collection.JavaConversions; import scala.reflect.ClassTag; import scala.reflect.ClassTag$; @@ -81,14 +80,15 @@ public class EncoderHelpers { return Encoders.kryo((Class) Object.class); } -/* - */ -/** Get a bytes {@link Encoder} for {@link Tuple2}. Bytes serialisation is issued by Kryo *//* + /* + */ + /** Ge
[beam] 25/37: Apply new Encoders to Read source
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit 3cc256e5f81616d8b4126cef6ae8d049fb03460f Author: Etienne Chauchot AuthorDate: Fri Sep 6 17:49:10 2019 +0200 Apply new Encoders to Read source --- .../translation/batch/ReadSourceTranslatorBatch.java | 8 ++-- .../spark/structuredstreaming/translation/helpers/RowHelpers.java | 4 +--- .../translation/streaming/ReadSourceTranslatorStreaming.java | 7 +-- 3 files changed, 12 insertions(+), 7 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java index 6ae6646..2dcf66f 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java @@ -27,6 +27,7 @@ import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.Row import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; @@ -69,10 +70,13 @@ class ReadSourceTranslatorBatch .load(); // extract windowedValue from Row +WindowedValue.FullWindowedValueCoder windowedValueCoder = WindowedValue.FullWindowedValueCoder +.of(source.getOutputCoder(), GlobalWindow.Coder.INSTANCE); + Dataset> dataset = rowDataset.map( - RowHelpers.extractWindowedValueFromRowMapFunction(source.getOutputCoder()), -EncoderHelpers.windowedValueEncoder()); + RowHelpers.extractWindowedValueFromRowMapFunction(windowedValueCoder), +EncoderHelpers.fromBeamCoder(windowedValueCoder)); PCollection output = (PCollection) context.getOutput(); context.putDataset(output, dataset); diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/RowHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/RowHelpers.java index 6ee0e07..ac74c29 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/RowHelpers.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/RowHelpers.java @@ -43,13 +43,11 @@ public final class RowHelpers { * @return A {@link MapFunction} that accepts a {@link Row} and returns its {@link WindowedValue}. */ public static MapFunction> extractWindowedValueFromRowMapFunction( - Coder coder) { + WindowedValue.WindowedValueCoder windowedValueCoder) { return (MapFunction>) value -> { // there is only one value put in each Row by the InputPartitionReader byte[] bytes = (byte[]) value.get(0); - WindowedValue.FullWindowedValueCoder windowedValueCoder = - WindowedValue.FullWindowedValueCoder.of(coder, GlobalWindow.Coder.INSTANCE); return windowedValueCoder.decode(new ByteArrayInputStream(bytes)); }; } diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java index c3d07ff..9e03d96 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java @@ -27,6 +27,7 @@ import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.Row import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; @@ -70,10 +71,12 @@ class ReadSourceTranslatorStreaming .load(); // extract windowedValue from Row +WindowedValue.FullWindowedValueCoder windowedValueCoder = WindowedValue.FullWindowedValueCoder +.of(source.getOutputCoder(), GlobalWind
[beam] 03/37: Wrap Beam Coders into Spark Encoders using ExpressionEncoder: serialization part
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit a5c7da32d46d74ab4b79ebb34dcad4842f225c62 Author: Etienne Chauchot AuthorDate: Mon Aug 26 14:32:17 2019 +0200 Wrap Beam Coders into Spark Encoders using ExpressionEncoder: serialization part --- .../translation/helpers/EncoderHelpers.java| 245 + 1 file changed, 245 insertions(+) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java index d44fe27..b072803 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java @@ -17,11 +17,40 @@ */ package org.apache.beam.runners.spark.structuredstreaming.translation.helpers; +import static org.apache.spark.sql.types.DataTypes.BinaryType; + +import java.lang.reflect.Array; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import org.apache.beam.runners.spark.structuredstreaming.translation.SchemaHelpers; +import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; import org.apache.spark.sql.Encoder; import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.catalyst.analysis.GetColumnByOrdinal; +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder; +import org.apache.spark.sql.catalyst.expressions.BoundReference; +import org.apache.spark.sql.catalyst.expressions.Cast; +import org.apache.spark.sql.catalyst.expressions.Expression; +import org.apache.spark.sql.catalyst.expressions.NonSQLExpression; +import org.apache.spark.sql.catalyst.expressions.UnaryExpression; +import org.apache.spark.sql.catalyst.expressions.codegen.Block; +import org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator; +import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext; +import org.apache.spark.sql.catalyst.expressions.codegen.ExprCode; +import org.apache.spark.sql.catalyst.expressions.codegen.ExprValue; +import org.apache.spark.sql.catalyst.expressions.codegen.SimpleExprValue; +import org.apache.spark.sql.catalyst.expressions.codegen.VariableValue; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.ObjectType; +import scala.StringContext; import scala.Tuple2; +import scala.collection.JavaConversions; +import scala.collection.Seq; +import scala.reflect.ClassTag; +import scala.reflect.ClassTag$; /** {@link Encoders} utility class. */ public class EncoderHelpers { @@ -64,4 +93,220 @@ public class EncoderHelpers { - Bridges from Beam Coders to Spark Encoders */ + /** A way to construct encoders using generic serializers. */ + private Encoder fromBeamCoder(Coder coder, Class claz){ + +List serialiserList = new ArrayList<>(); +serialiserList.add(new EncodeUsingBeamCoder<>(claz, coder)); +ClassTag classTag = ClassTag$.MODULE$.apply(claz); +return new ExpressionEncoder<>( +SchemaHelpers.binarySchema(), +false, +JavaConversions.collectionAsScalaIterable(serialiserList).toSeq(), +new DecodeUsingBeamCoder<>(classTag, coder), classTag); + +/* +ExpressionEncoder[T]( +schema = new StructType().add("value", BinaryType), +flat = true, +serializer = Seq( +EncodeUsingSerializer( +BoundReference(0, ObjectType(classOf[AnyRef]), nullable = true), kryo = useKryo)), +deserializer = +DecodeUsingSerializer[T]( +Cast(GetColumnByOrdinal(0, BinaryType), BinaryType), +classTag[T], +kryo = useKryo), +clsTag = classTag[T] +) +*/ + } + + private static class EncodeUsingBeamCoder extends UnaryExpression implements NonSQLExpression { + +private Class claz; +private Coder beamCoder; +private Expression child; + +private EncodeUsingBeamCoder( Class claz, Coder beamCoder) { + this.claz = claz; + this.beamCoder = beamCoder; + this.child = new BoundReference(0, new ObjectType(claz), true); +} + +@Override public Expression child() { + return child; +} + +@Override public ExprCode doGenCode(CodegenContext ctx, ExprCode ev) { + // Code to serialize. + ExprCode input = child.genCode(ctx); + String javaType = CodeGenerator.javaType(dataType()); + String outputStream = "ByteArrayOutputStream baos = new ByteArrayOutputStream();"; + + String serialize = outputStream + "$beamCoder.encode(${input.value}, baos); baos.toByteArray();"; + + String outside = "final $javaType o
[beam] 27/37: Ignore long time failing test: SparkMetricsSinkTest
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit c48d03213e5848aec8217d9b136ecc153d1d1d3c Author: Etienne Chauchot AuthorDate: Fri Sep 27 10:41:55 2019 +0200 Ignore long time failing test: SparkMetricsSinkTest --- .../aggregators/metrics/sink/SparkMetricsSinkTest.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/aggregators/metrics/sink/SparkMetricsSinkTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/aggregators/metrics/sink/SparkMetricsSinkTest.java index dd23c05..9d56f0c 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/aggregators/metrics/sink/SparkMetricsSinkTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/aggregators/metrics/sink/SparkMetricsSinkTest.java @@ -30,6 +30,7 @@ import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet; +import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExternalResource; @@ -40,6 +41,7 @@ import org.junit.rules.ExternalResource; * A test that verifies Beam metrics are reported to Spark's metrics sink in both batch and * streaming modes. */ +@Ignore ("Has been failing since at least c350188ef7a8704c7336f3c20a1ab2144abbcd4a") public class SparkMetricsSinkTest { @Rule public ExternalResource inMemoryMetricsSink = new InMemoryMetricsSinkRule(); @Rule public final TestPipeline pipeline = TestPipeline.create();
[beam] 35/37: Remove unneeded cast
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit 27ef6de3fa90db6d59027f9a6fa792fc5787f6e9 Author: Etienne Chauchot AuthorDate: Wed Oct 23 11:51:49 2019 +0200 Remove unneeded cast --- .../spark/structuredstreaming/translation/helpers/KVHelpers.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/KVHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/KVHelpers.java index 1983eaa..2fa4b1a 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/KVHelpers.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/KVHelpers.java @@ -25,7 +25,7 @@ import org.apache.spark.api.java.function.MapFunction; public final class KVHelpers { /** A Spark {@link MapFunction} for extracting the key out of a {@link KV} for GBK for example. */ - public static MapFunction>, K> extractKey() { -return (MapFunction>, K>) wv -> wv.getValue().getKey(); + public static MapFunction>, K> extractKey() { +return wv -> wv.getValue().getKey(); } }
[beam] 09/37: Fix code generation in Beam coder wrapper
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit d5645ff60aa99608a9ee3b8a5be6c58f9ac3903b Author: Etienne Chauchot AuthorDate: Mon Sep 2 15:45:24 2019 +0200 Fix code generation in Beam coder wrapper --- .../translation/helpers/EncoderHelpers.java| 93 -- .../structuredstreaming/utils/EncodersTest.java| 4 +- 2 files changed, 55 insertions(+), 42 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java index 0765c78..cc862cd 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java @@ -42,15 +42,13 @@ import org.apache.spark.sql.catalyst.expressions.codegen.Block; import org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator; import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext; import org.apache.spark.sql.catalyst.expressions.codegen.ExprCode; -import org.apache.spark.sql.catalyst.expressions.codegen.ExprValue; -import org.apache.spark.sql.catalyst.expressions.codegen.SimpleExprValue; import org.apache.spark.sql.catalyst.expressions.codegen.VariableValue; import org.apache.spark.sql.types.DataType; import org.apache.spark.sql.types.ObjectType; +import scala.Function1; import scala.StringContext; import scala.Tuple2; import scala.collection.JavaConversions; -import scala.collection.Seq; import scala.reflect.ClassTag; import scala.reflect.ClassTag$; @@ -143,29 +141,33 @@ public class EncoderHelpers { @Override public ExprCode doGenCode(CodegenContext ctx, ExprCode ev) { // Code to serialize. ExprCode input = child.genCode(ctx); - String javaType = CodeGenerator.javaType(dataType()); - String outputStream = "ByteArrayOutputStream baos = new ByteArrayOutputStream();"; - - String serialize = outputStream + "$beamCoder.encode(${input.value}, baos); baos.toByteArray();"; - - String outside = "final $javaType output = ${input.isNull} ? ${CodeGenerator.defaultValue(dataType)} : $serialize;"; - List instructions = new ArrayList<>(); - instructions.add(outside); - Seq parts = JavaConversions.collectionAsScalaIterable(instructions).toSeq(); + /* +CODE GENERATED + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + final bytes[] output; + if ({input.isNull}) + output = null; + else + output = $beamCoder.encode(${input.value}, baos); baos.toByteArray(); + */ + List parts = new ArrayList<>(); + parts.add("ByteArrayOutputStream baos = new ByteArrayOutputStream(); final bytes[] output; if ("); + parts.add(") output = null; else output ="); + parts.add(".encode("); + parts.add(", baos); baos.toByteArray();"); + + StringContext sc = new StringContext(JavaConversions.collectionAsScalaIterable(parts).toSeq()); - StringContext stringContext = new StringContext(parts); - Block.BlockHelper blockHelper = new Block.BlockHelper(stringContext); List args = new ArrayList<>(); - args.add(new VariableValue("beamCoder", Coder.class)); - args.add(new SimpleExprValue("input.value", ExprValue.class)); - args.add(new VariableValue("javaType", String.class)); - args.add(new SimpleExprValue("input.isNull", Boolean.class)); - args.add(new SimpleExprValue("CodeGenerator.defaultValue(dataType)", String.class)); - args.add(new VariableValue("serialize", String.class)); - Block code = blockHelper.code(JavaConversions.collectionAsScalaIterable(args).toSeq()); - - return ev.copy(input.code().$plus(code), input.isNull(), new VariableValue("output", Array.class)); + args.add(input.isNull()); + args.add(beamCoder); + args.add(input.value()); + Block code = (new Block.BlockHelper(sc)) + .code(JavaConversions.collectionAsScalaIterable(args).toSeq()); + + return ev.copy(input.code().$plus(code), input.isNull(), + new VariableValue("output", Array.class)); } @Override public DataType dataType() { @@ -252,27 +254,38 @@ public class EncoderHelpers { ExprCode input = child.genCode(ctx); String javaType = CodeGenerator.javaType(dataType()); - String inputStream = "ByteArrayInputStream bais = new ByteArrayInputStream(${input.value});"; - String deserialize = inputStream + "($javaType) $beamCoder.decode(bais);"; +/* + CODE GENERATED: + final $javaType output = + ${input.isNull} ? + ${CodeGenerator.defaultValue(d
[beam] 15/37: Remove lazy init of beam coder because there is no generic way on instanciating a beam coder
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit 50060a804d95ed1006db98d1fd2c4243ba1fc532 Author: Etienne Chauchot AuthorDate: Thu Sep 5 14:20:30 2019 +0200 Remove lazy init of beam coder because there is no generic way on instanciating a beam coder --- .../translation/helpers/EncoderHelpers.java| 68 +++--- .../structuredstreaming/utils/EncodersTest.java| 2 +- 2 files changed, 21 insertions(+), 49 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java index 0751c4c..3f7c102 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java @@ -18,7 +18,6 @@ package org.apache.beam.runners.spark.structuredstreaming.translation.helpers; import static org.apache.spark.sql.types.DataTypes.BinaryType; -import static scala.compat.java8.JFunction.func; import java.io.ByteArrayInputStream; import java.util.ArrayList; @@ -26,7 +25,6 @@ import java.util.List; import java.util.Objects; import org.apache.beam.runners.spark.structuredstreaming.translation.SchemaHelpers; import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; import org.apache.spark.sql.Encoder; @@ -92,17 +90,17 @@ public class EncoderHelpers { */ /** A way to construct encoders using generic serializers. */ - public static Encoder fromBeamCoder(Class> coderClass/*, Class claz*/){ + public static Encoder fromBeamCoder(Coder beamCoder/*, Class claz*/){ List serialiserList = new ArrayList<>(); Class claz = (Class) Object.class; -serialiserList.add(new EncodeUsingBeamCoder<>(new BoundReference(0, new ObjectType(claz), true), (Class>)coderClass)); +serialiserList.add(new EncodeUsingBeamCoder<>(new BoundReference(0, new ObjectType(claz), true), beamCoder)); ClassTag classTag = ClassTag$.MODULE$.apply(claz); return new ExpressionEncoder<>( SchemaHelpers.binarySchema(), false, JavaConversions.collectionAsScalaIterable(serialiserList).toSeq(), -new DecodeUsingBeamCoder<>(new Cast(new GetColumnByOrdinal(0, BinaryType), BinaryType), classTag, (Class>)coderClass), +new DecodeUsingBeamCoder<>(new Cast(new GetColumnByOrdinal(0, BinaryType), BinaryType), classTag, beamCoder), classTag); /* @@ -125,11 +123,11 @@ public class EncoderHelpers { public static class EncodeUsingBeamCoder extends UnaryExpression implements NonSQLExpression { private Expression child; -private Class> coderClass; +private Coder beamCoder; -public EncodeUsingBeamCoder(Expression child, Class> coderClass) { +public EncodeUsingBeamCoder(Expression child, Coder beamCoder) { this.child = child; - this.coderClass = coderClass; + this.beamCoder = beamCoder; } @Override public Expression child() { @@ -138,7 +136,7 @@ public class EncoderHelpers { @Override public ExprCode doGenCode(CodegenContext ctx, ExprCode ev) { // Code to serialize. - String beamCoder = lazyInitBeamCoder(ctx, coderClass); + String accessCode = ctx.addReferenceObj("beamCoder", beamCoder, beamCoder.getClass().getName()); ExprCode input = child.genCode(ctx); /* @@ -172,7 +170,7 @@ public class EncoderHelpers { args.add(ev.value()); args.add(input.isNull()); args.add(ev.value()); - args.add(beamCoder); + args.add(accessCode); args.add(input.value()); args.add(ev.value()); Block code = (new Block.BlockHelper(sc)) @@ -191,7 +189,7 @@ public class EncoderHelpers { case 0: return child; case 1: - return coderClass; + return beamCoder; default: throw new ArrayIndexOutOfBoundsException("productElement out of bounds"); } @@ -213,11 +211,11 @@ public class EncoderHelpers { return false; } EncodeUsingBeamCoder that = (EncodeUsingBeamCoder) o; - return coderClass.equals(that.coderClass); + return beamCoder.equals(that.beamCoder); } @Override public int hashCode() { - return Objects.hash(super.hashCode(), coderClass); + return Objects.hash(super.hashCode(), beamCoder); } } @@ -249,12 +247,12 @@ public class EncoderHelpers { private Expression child; private ClassTag classTag; -private Class> coderClass; +private Coder be
[beam] 32/37: Apply new Encoders to Pardo. Replace Tuple2Coder with MultiOutputCoder to deal with multiple output to use in Spark Encoder for DoFnRunner
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit c5e78a0f4552a094ba3914ef490629e136ac1beb Author: Etienne Chauchot AuthorDate: Tue Oct 1 17:52:32 2019 +0200 Apply new Encoders to Pardo. Replace Tuple2Coder with MultiOutputCoder to deal with multiple output to use in Spark Encoder for DoFnRunner --- .../translation/batch/ParDoTranslatorBatch.java| 42 +-- .../translation/helpers/EncoderHelpers.java| 6 ++- .../translation/helpers/MultiOuputCoder.java | 49 + .../translation/helpers/Tuple2Coder.java | 62 -- 4 files changed, 81 insertions(+), 78 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java index 255adc8..f5a109e 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java @@ -31,8 +31,10 @@ import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTr import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext; import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.CoderHelpers; import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers; +import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.MultiOuputCoder; import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.SideInputBroadcast; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.SerializableCoder; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.DoFnSchemaInformation; import org.apache.beam.sdk.transforms.PTransform; @@ -84,12 +86,15 @@ class ParDoTranslatorBatch ParDoTranslation.getSchemaInformation(context.getCurrentTransform()); // Init main variables -Dataset> inputDataSet = context.getDataset(context.getInput()); +PValue input = context.getInput(); +Dataset> inputDataSet = context.getDataset(input); Map, PValue> outputs = context.getOutputs(); TupleTag mainOutputTag = getTupleTag(context); List> outputTags = new ArrayList<>(outputs.keySet()); WindowingStrategy windowingStrategy = -((PCollection) context.getInput()).getWindowingStrategy(); +((PCollection) input).getWindowingStrategy(); +Coder inputCoder = ((PCollection) input).getCoder(); +Coder windowCoder = windowingStrategy.getWindowFn().windowCoder(); // construct a map from side input to WindowingStrategy so that // the DoFn runner can map main-input windows to side input windows @@ -102,8 +107,6 @@ class ParDoTranslatorBatch SideInputBroadcast broadcastStateData = createBroadcastSideInputs(sideInputs, context); Map, Coder> outputCoderMap = context.getOutputCoders(); -Coder inputCoder = ((PCollection) context.getInput()).getCoder(); - MetricsContainerStepMapAccumulator metricsAccum = MetricsAccumulator.getInstance(); List> additionalOutputTags = new ArrayList<>(); @@ -129,19 +132,25 @@ class ParDoTranslatorBatch broadcastStateData, doFnSchemaInformation); +MultiOuputCoder multipleOutputCoder = MultiOuputCoder +.of(SerializableCoder.of(TupleTag.class), outputCoderMap, windowCoder); Dataset, WindowedValue>> allOutputs = -inputDataSet.mapPartitions(doFnWrapper, EncoderHelpers.tuple2Encoder()); +inputDataSet.mapPartitions(doFnWrapper, EncoderHelpers.fromBeamCoder(multipleOutputCoder)); if (outputs.entrySet().size() > 1) { allOutputs.persist(); for (Map.Entry, PValue> output : outputs.entrySet()) { -pruneOutputFilteredByTag(context, allOutputs, output); +pruneOutputFilteredByTag(context, allOutputs, output, windowCoder); } } else { + Coder outputCoder = ((PCollection) outputs.get(mainOutputTag)).getCoder(); + Coder> windowedValueCoder = + (Coder>) + (Coder) WindowedValue.getFullCoder(outputCoder, windowCoder); Dataset> outputDataset = allOutputs.map( (MapFunction, WindowedValue>, WindowedValue>) value -> value._2, - EncoderHelpers.windowedValueEncoder()); + EncoderHelpers.fromBeamCoder(windowedValueCoder)); context.putDatasetWildcard(outputs.entrySet().iterator().next().getValue(), outputDataset); } } @@ -152,14 +161,14 @@ class ParDoTranslatorBatch JavaSparkConte
[beam] 31/37: Apply new Encoders to GroupByKey
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit 039f58a6a07e567bb8c5636caecebc61dec9129e Author: Etienne Chauchot AuthorDate: Mon Sep 30 12:13:25 2019 +0200 Apply new Encoders to GroupByKey --- .../batch/GroupByKeyTranslatorBatch.java | 25 -- 1 file changed, 19 insertions(+), 6 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java index 3e203a8..2970aa7 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java @@ -29,6 +29,8 @@ import org.apache.beam.runners.spark.structuredstreaming.translation.Translation import org.apache.beam.runners.spark.structuredstreaming.translation.batch.functions.GroupAlsoByWindowViaOutputBufferFn; import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers; import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.KVHelpers; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.util.WindowedValue; @@ -54,11 +56,21 @@ class GroupByKeyTranslatorBatch Dataset>> input = context.getDataset(inputPCollection); +WindowingStrategy windowingStrategy = inputPCollection.getWindowingStrategy(); +KvCoder kvCoder = (KvCoder) inputPCollection.getCoder(); + // group by key only +Coder keyCoder = kvCoder.getKeyCoder(); KeyValueGroupedDataset>> groupByKeyOnly = -input.groupByKey(KVHelpers.extractKey(), EncoderHelpers.genericEncoder()); +input.groupByKey(KVHelpers.extractKey(), EncoderHelpers.fromBeamCoder( +keyCoder)); // Materialize groupByKeyOnly values, potential OOM because of creation of new iterable +Coder valueCoder = kvCoder.getValueCoder(); +WindowedValue.WindowedValueCoder wvCoder = +WindowedValue.FullWindowedValueCoder.of( +valueCoder, inputPCollection.getWindowingStrategy().getWindowFn().windowCoder()); +IterableCoder> iterableCoder = IterableCoder.of(wvCoder); Dataset>>> materialized = groupByKeyOnly.mapGroups( (MapGroupsFunction>, KV>>>) @@ -77,19 +89,20 @@ class GroupByKeyTranslatorBatch KV.of(key, Iterables.unmodifiableIterable(values)); return kv; }, -EncoderHelpers.kvEncoder()); +EncoderHelpers.fromBeamCoder(KvCoder.of(keyCoder, iterableCoder))); -WindowingStrategy windowingStrategy = inputPCollection.getWindowingStrategy(); -KvCoder coder = (KvCoder) inputPCollection.getCoder(); // group also by windows +WindowedValue.FullWindowedValueCoder>> outputCoder = WindowedValue.FullWindowedValueCoder +.of(KvCoder.of(keyCoder, IterableCoder.of(valueCoder)), +windowingStrategy.getWindowFn().windowCoder()); Dataset>>> output = materialized.flatMap( new GroupAlsoByWindowViaOutputBufferFn<>( windowingStrategy, new InMemoryStateInternalsFactory<>(), -SystemReduceFn.buffering(coder.getValueCoder()), +SystemReduceFn.buffering(valueCoder), context.getSerializableOptions()), -EncoderHelpers.windowedValueEncoder()); +EncoderHelpers.fromBeamCoder(outputCoder)); context.putDataset(context.getOutput(), output); }
[beam] 36/37: Fix: Remove generic hack of using object. Use actual Coder encodedType in Encoders
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit 824b3445e99a0fc084b612b790c7d458689a4fd4 Author: Etienne Chauchot AuthorDate: Wed Oct 23 11:52:14 2019 +0200 Fix: Remove generic hack of using object. Use actual Coder encodedType in Encoders --- .../spark/structuredstreaming/translation/helpers/EncoderHelpers.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java index 2f3bced..c07c9dd 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java @@ -101,7 +101,8 @@ public class EncoderHelpers { public static Encoder fromBeamCoder(Coder beamCoder) { List serialiserList = new ArrayList<>(); -Class claz = (Class) Object.class; +Class claz = beamCoder.getEncodedTypeDescriptor().getRawType(); + serialiserList.add( new EncodeUsingBeamCoder<>(new BoundReference(0, new ObjectType(claz), true), beamCoder)); ClassTag classTag = ClassTag$.MODULE$.apply(claz);
[beam] branch spark-runner_structured-streaming updated (46ed555 -> 620a27a)
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a change to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git. discard 46ed555 Remove Encoders based on kryo now that we call Beam coders in the runner discard 25d0401 Fix: Remove generic hack of using object. Use actual Coder encodedType in Encoders discard ebc53fd Remove unneeded cast discard ad29daf Use beam encoders also in the output of the source translation discard 507bbd8 Fix javadoc discard c980d4c Fix typo discard fb3aa34 Add missing windowedValue Encoder call in Pardo discard ee2c0e6 Apply spotless discard 31c91a9 Apply new Encoders to Pardo. Replace Tuple2Coder with MultiOutputCoder to deal with multiple output to use in Spark Encoder for DoFnRunner discard 868204f Apply new Encoders to GroupByKey discard 30c662a Create a Tuple2Coder to encode scale tuple2 discard d093ffe Apply spotless discard 6edcfa2 Apply new Encoders to AggregatorCombiner discard 5beb435 Apply new Encoders to Window assign translation discard ab7d24c Ignore long time failing test: SparkMetricsSinkTest discard 3ac3c71 Improve performance of source: the mapper already calls windowedValueCoder.decode, no need to call it also in the Spark encoder discard bcbb697 Apply new Encoders to Read source discard aa25e85 Apply new Encoders to CombinePerKey discard f0f2078 Catch Exception instead of IOException because some coders to not throw Exceptions at all (e.g.VoidCoder) discard 3a333fb Put Encoders expressions serializable discard cfdf4a4 Improve exceptions catching discard b879123 Apply spotless and checkstyle and add javadocs discard 0fe6f9b Add an assert of equality in the encoders test discard d8b8b42 Fix generated code: uniform exceptions catching, fix parenthesis and variable declarations discard ef69410 Fix equal and hashcode discard 4351304 Remove example code discard c4a4464 Remove lazy init of beam coder because there is no generic way on instanciating a beam coder discard 91e923c Cast coder instanciated by reflection discard 723c004 Add try catch around reflexion call in lazy init of beam coder discard 8bbf991 Fix beam coder lazy init using reflexion discard 959664f Fix getting the output value in code generation discard 668227b Fix ExpressionEncoder generated code: typos, try catch, fqcn discard cbd7c2b Fix warning in coder construction by reflexion discard 2c94eef Fix call to scala Fucntion1 in coder lazy init discard a758985 Lazy init coder because coder instance cannot be interpolated by catalyst discard b11e100 Fix code generation in Beam coder wrapper discard 2bf4cd9 Add a simple spark native test to test Beam coders wrapping into Spark Encoders discard e96af88 Fix visibility of serializer and deserializer discard 23735e4 Conform to spark ExpressionEncoders: pass classTags, implement scala Product, pass children from within the ExpressionEncoder discard a5d49f5 Fix scala Product in Encoders to avoid StackEverflow discard 95fd272 type erasure: spark encoders require a Class, pass Object and cast to Class discard 84f2cbd9 Fix EncoderHelpers.fromBeamCoder() visibility discard d613d6b Wrap Beam Coders into Spark Encoders using ExpressionEncoder: deserialization part discard 031754c Wrap Beam Coders into Spark Encoders using ExpressionEncoder: serialization part discard c350188 Use "sparkMaster" in local mode to obtain number of shuffle partitions + spotless apply discard a524036 Improve Pardo translation performance: avoid calling a filter transform when there is only one output tag discard 0cedc7a Add a TODO on perf improvement of Pardo translation new 22d6466 Improve Pardo translation performance: avoid calling a filter transform when there is only one output tag new 20d5bbd Use "sparkMaster" in local mode to obtain number of shuffle partitions + spotless apply new a5c7da3 Wrap Beam Coders into Spark Encoders using ExpressionEncoder: serialization part new 5fa6331 Wrap Beam Coders into Spark Encoders using ExpressionEncoder: deserialization part new c9e3534 type erasure: spark encoders require a Class, pass Object and cast to Class new fff5092 Fix scala Product in Encoders to avoid StackEverflow new 2aaf07a Conform to spark ExpressionEncoders: pass classTags, implement scala Product, pass children from within the ExpressionEncoder, fix visibilities new e4478ff Add a simple spark native test to test Beam coders wrapping into Spark Encoders new d5645ff Fix code generation in Beam coder wrapper new e6b68a8 Lazy init coder because coder instance cannot be interpolated by catalyst new fdba22d Fix warning in coder construction by reflexion new 8b07ec8 Fix ExpressionEncoder generated code: typos, try catch, fqcn new d7c9a4a Fix getting the output value in code generation new 0cf2c87 Fix bea
[beam] 14/37: Fix beam coder lazy init using reflexion: use .clas + try catch + cast
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit 0cf2c8759a64c81c1d4f83f74a759ae3dafd1f83 Author: Etienne Chauchot AuthorDate: Thu Sep 5 10:07:32 2019 +0200 Fix beam coder lazy init using reflexion: use .clas + try catch + cast --- .../translation/helpers/EncoderHelpers.java | 19 --- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java index a452da0..0751c4c 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.Objects; import org.apache.beam.runners.spark.structuredstreaming.translation.SchemaHelpers; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; import org.apache.spark.sql.Encoder; @@ -388,18 +389,22 @@ case class DecodeUsingSerializer[T](child: Expression, tag: ClassTag[T], kryo: B ctx.addImmutableStateIfNotExists(coderClass.getName(), beamCoderInstance, func(v1 -> { /* CODE GENERATED -v = (coderClass) coderClass.getDeclaredConstructor().newInstance(); +try { +v1 = coderClass.class.getDeclaredConstructor().newInstance(); +} catch (Exception e) { + throw new RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e)); +} */ -List parts = new ArrayList<>(); -parts.add(""); + List parts = new ArrayList<>(); +parts.add("try {"); parts.add(" = ("); -parts.add(") "); -parts.add(".getDeclaredConstructor().newInstance();"); + parts.add(") "); + parts.add(".class.getDeclaredConstructor().newInstance();} catch (Exception e) {throw new RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e));}"); StringContext sc = new StringContext(JavaConversions.collectionAsScalaIterable(parts).toSeq()); List args = new ArrayList<>(); args.add(v1); -args.add(coderClass.getName()); -args.add(coderClass.getName()); + args.add(coderClass.getName()); + args.add(coderClass.getName()); return sc.s(JavaConversions.collectionAsScalaIterable(args).toSeq()); })); return beamCoderInstance;
[beam] 21/37: Wrap exceptions in UserCoderExceptions
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit 72c267cc91f75a446a949825a216d4101bbca37d Author: Etienne Chauchot AuthorDate: Thu Sep 5 15:53:10 2019 +0200 Wrap exceptions in UserCoderExceptions --- .../translation/helpers/EncoderHelpers.java | 12 ++-- 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java index f990121..f4ea6fa 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java @@ -147,8 +147,8 @@ public class EncoderHelpers { $beamCoder.encode(${input.value}, baos); ${ev.value} = baos.toByteArray(); } -} catch (Exception e) { - throw new RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e)); +} catch (java.io.IOException e) { + throw org.apache.beam.sdk.util.UserCodeException.wrap(e); } */ List parts = new ArrayList<>(); @@ -160,7 +160,7 @@ public class EncoderHelpers { parts.add(".encode("); parts.add(", baos); "); parts.add( - " = baos.toByteArray();}} catch (Exception e) {throw new RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e));}"); + " = baos.toByteArray();}} catch (java.io.IOException e) {throw org.apache.beam.sdk.util.UserCodeException.wrap(e);}"); StringContext sc = new StringContext(JavaConversions.collectionAsScalaIterable(parts).toSeq()); @@ -262,8 +262,8 @@ public class EncoderHelpers { ${input.isNull} ? ${CodeGenerator.defaultValue(dataType)} : ($javaType) $beamCoder.decode(new java.io.ByteArrayInputStream(${input.value})); - } catch (Exception e) { -throw new RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e)); + } catch (java.io.IOException e) { +throw org.apache.beam.sdk.util.UserCodeException.wrap(e); } */ @@ -277,7 +277,7 @@ public class EncoderHelpers { parts.add(") "); parts.add(".decode(new java.io.ByteArrayInputStream("); parts.add( - ")); } catch (Exception e) {throw new RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e));}"); + ")); } catch (java.io.IOException e) {throw org.apache.beam.sdk.util.UserCodeException.wrap(e);}"); StringContext sc = new StringContext(JavaConversions.collectionAsScalaIterable(parts).toSeq());
[beam] 16/37: Remove example code
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit ca01777b5bd593c7caa5a6be6136abe662b8a4e5 Author: Etienne Chauchot AuthorDate: Thu Sep 5 14:33:23 2019 +0200 Remove example code --- .../translation/helpers/EncoderHelpers.java| 69 -- 1 file changed, 69 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java index 3f7c102..83243b3 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java @@ -102,22 +102,6 @@ public class EncoderHelpers { JavaConversions.collectionAsScalaIterable(serialiserList).toSeq(), new DecodeUsingBeamCoder<>(new Cast(new GetColumnByOrdinal(0, BinaryType), BinaryType), classTag, beamCoder), classTag); - -/* -ExpressionEncoder[T]( -schema = new StructType().add("value", BinaryType), -flat = true, -serializer = Seq( -EncodeUsingSerializer( -BoundReference(0, ObjectType(classOf[AnyRef]), nullable = true), kryo = useKryo)), -deserializer = -DecodeUsingSerializer[T]( -Cast(GetColumnByOrdinal(0, BinaryType), BinaryType), -classTag[T], -kryo = useKryo), -clsTag = classTag[T] -) -*/ } public static class EncodeUsingBeamCoder extends UnaryExpression implements NonSQLExpression { @@ -219,30 +203,6 @@ public class EncoderHelpers { } } - /*case class EncodeUsingSerializer(child: Expression, kryo: Boolean) - extends UnaryExpression with NonSQLExpression with SerializerSupport { - -override def nullSafeEval(input: Any): Any = { -serializerInstance.serialize(input).array() -} - -override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { -val serializer = addImmutableScodererializerIfNeeded(ctx) -// Code to serialize. -val input = child.genCode(ctx) -val javaType = CodeGenerator.javaType(dataType) -val serialize = s"$serializer.serialize(${input.value}, null).array()" - -val code = input.code + code""" -final $javaType ${ev.value} = -${input.isNull} ? ${CodeGenerator.defaultValue(dataType)} : $serialize; -""" -ev.copy(code = code, isNull = input.isNull) - } - -override def dataType: DataType = BinaryType - }*/ - public static class DecodeUsingBeamCoder extends UnaryExpression implements NonSQLExpression{ private Expression child; @@ -353,33 +313,4 @@ public class EncoderHelpers { return Objects.hash(super.hashCode(), classTag, beamCoder); } } -/* -case class DecodeUsingSerializer[T](child: Expression, tag: ClassTag[T], kryo: Boolean) - extends UnaryExpression with NonSQLExpression with SerializerSupport { - -override def nullSafeEval(input: Any): Any = { -val inputBytes = java.nio.ByteBuffer.wrap(input.asInstanceOf[Array[Byte]]) -serializerInstance.deserialize(inputBytes) -} - -override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { -val serializer = addImmutableSerializerIfNeeded(ctx) -// Code to deserialize. -val input = child.genCode(ctx) -val javaType = CodeGenerator.javaType(dataType) -val deserialize = -s"($javaType) $serializer.deserialize(java.nio.ByteBuffer.wrap(${input.value}), null)" - -val code = input.code + code""" -final $javaType ${ev.value} = -${input.isNull} ? ${CodeGenerator.defaultValue(dataType)} : $deserialize; -""" -ev.copy(code = code, isNull = input.isNull) - } - -override def dataType: DataType = ObjectType(tag.runtimeClass) - } -*/ - - }
[beam] 01/37: Improve Pardo translation performance: avoid calling a filter transform when there is only one output tag
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit 22d6466cae94cf482f8151a5fe6e7dde68d28d58 Author: Etienne Chauchot AuthorDate: Thu Jul 18 10:58:35 2019 +0200 Improve Pardo translation performance: avoid calling a filter transform when there is only one output tag --- .../translation/batch/ParDoTranslatorBatch.java | 12 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java index 46808b7..742c1b0 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java @@ -133,10 +133,14 @@ class ParDoTranslatorBatch inputDataSet.mapPartitions(doFnWrapper, EncoderHelpers.tuple2Encoder()); if (outputs.entrySet().size() > 1) { allOutputs.persist(); -} - -for (Map.Entry, PValue> output : outputs.entrySet()) { - pruneOutputFilteredByTag(context, allOutputs, output); + for (Map.Entry, PValue> output : outputs.entrySet()) { +pruneOutputFilteredByTag(context, allOutputs, output); + } +} else { + Dataset> outputDataset = allOutputs.map( + (MapFunction, WindowedValue>, WindowedValue>) value -> value._2, + EncoderHelpers.windowedValueEncoder()); + context.putDatasetWildcard(outputs.entrySet().iterator().next().getValue(), outputDataset); } }
[beam] 05/37: type erasure: spark encoders require a Class, pass Object and cast to Class
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit c9e3534029811aabc00d09471ec78f943ba34028 Author: Etienne Chauchot AuthorDate: Thu Aug 29 10:57:53 2019 +0200 type erasure: spark encoders require a Class, pass Object and cast to Class --- .../spark/structuredstreaming/translation/helpers/EncoderHelpers.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java index ab24e37..9cb8f29 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java @@ -96,9 +96,10 @@ public class EncoderHelpers { */ /** A way to construct encoders using generic serializers. */ - public static Encoder fromBeamCoder(Coder coder, Class claz){ + public static Encoder fromBeamCoder(Coder coder/*, Class claz*/){ List serialiserList = new ArrayList<>(); +Class claz = (Class) Object.class; serialiserList.add(new EncodeUsingBeamCoder<>(claz, coder)); ClassTag classTag = ClassTag$.MODULE$.apply(claz); return new ExpressionEncoder<>(
[beam] 24/37: Apply new Encoders to CombinePerKey
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit 7d456b42c1bafef6eab281dc2ed2dd098f8bda6a Author: Etienne Chauchot AuthorDate: Fri Sep 6 13:24:18 2019 +0200 Apply new Encoders to CombinePerKey --- .../translation/batch/CombinePerKeyTranslatorBatch.java | 13 +++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java index e0e80dd..33b037a 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java @@ -23,6 +23,8 @@ import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTr import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext; import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers; import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.KVHelpers; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -56,8 +58,11 @@ class CombinePerKeyTranslatorBatch Dataset>> inputDataset = context.getDataset(input); +Coder keyCoder = (Coder) input.getCoder().getCoderArguments().get(0); +Coder outputTCoder = (Coder) output.getCoder().getCoderArguments().get(1); + KeyValueGroupedDataset>> groupedDataset = -inputDataset.groupByKey(KVHelpers.extractKey(), EncoderHelpers.genericEncoder()); +inputDataset.groupByKey(KVHelpers.extractKey(), EncoderHelpers.fromBeamCoder(keyCoder)); Dataset>>> combinedDataset = groupedDataset.agg( @@ -66,6 +71,10 @@ class CombinePerKeyTranslatorBatch .toColumn()); // expand the list into separate elements and put the key back into the elements +Coder> kvCoder = KvCoder.of(keyCoder, outputTCoder); +WindowedValue.WindowedValueCoder> wvCoder = +WindowedValue.FullWindowedValueCoder.of( +kvCoder, input.getWindowingStrategy().getWindowFn().windowCoder()); Dataset>> outputDataset = combinedDataset.flatMap( (FlatMapFunction< @@ -85,7 +94,7 @@ class CombinePerKeyTranslatorBatch } return result.iterator(); }, -EncoderHelpers.windowedValueEncoder()); +EncoderHelpers.fromBeamCoder(wvCoder)); context.putDataset(output, outputDataset); } }
[beam] 20/37: Apply spotless and checkstyle and add javadocs
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit c6f2ac9b21f7cfb9e1e81675cdf7f511b794559d Author: Etienne Chauchot AuthorDate: Thu Sep 5 15:35:34 2019 +0200 Apply spotless and checkstyle and add javadocs --- .../translation/helpers/EncoderHelpers.java| 137 + .../structuredstreaming/utils/EncodersTest.java| 32 +++-- 2 files changed, 113 insertions(+), 56 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java index c9ab435..f990121 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java @@ -89,21 +89,31 @@ public class EncoderHelpers { - Bridges from Beam Coders to Spark Encoders */ - /** A way to construct encoders using generic serializers. */ - public static Encoder fromBeamCoder(Coder beamCoder/*, Class claz*/){ + /** + * Wrap a Beam coder into a Spark Encoder using Catalyst Expression Encoders (which uses java code + * generation). + */ + public static Encoder fromBeamCoder(Coder beamCoder) { List serialiserList = new ArrayList<>(); Class claz = (Class) Object.class; -serialiserList.add(new EncodeUsingBeamCoder<>(new BoundReference(0, new ObjectType(claz), true), beamCoder)); +serialiserList.add( +new EncodeUsingBeamCoder<>(new BoundReference(0, new ObjectType(claz), true), beamCoder)); ClassTag classTag = ClassTag$.MODULE$.apply(claz); return new ExpressionEncoder<>( SchemaHelpers.binarySchema(), false, JavaConversions.collectionAsScalaIterable(serialiserList).toSeq(), -new DecodeUsingBeamCoder<>(new Cast(new GetColumnByOrdinal(0, BinaryType), BinaryType), classTag, beamCoder), +new DecodeUsingBeamCoder<>( +new Cast(new GetColumnByOrdinal(0, BinaryType), BinaryType), classTag, beamCoder), classTag); } + /** + * Catalyst Expression that serializes elements using Beam {@link Coder}. + * + * @param : Type of elements ot be serialized. + */ public static class EncodeUsingBeamCoder extends UnaryExpression implements NonSQLExpression { private Expression child; @@ -114,13 +124,16 @@ public class EncoderHelpers { this.beamCoder = beamCoder; } -@Override public Expression child() { +@Override +public Expression child() { return child; } -@Override public ExprCode doGenCode(CodegenContext ctx, ExprCode ev) { +@Override +public ExprCode doGenCode(CodegenContext ctx, ExprCode ev) { // Code to serialize. - String accessCode = ctx.addReferenceObj("beamCoder", beamCoder, beamCoder.getClass().getName()); + String accessCode = + ctx.addReferenceObj("beamCoder", beamCoder, beamCoder.getClass().getName()); ExprCode input = child.genCode(ctx); /* @@ -140,14 +153,17 @@ public class EncoderHelpers { */ List parts = new ArrayList<>(); parts.add("byte[] "); - parts.add(";try { java.io.ByteArrayOutputStream baos = new java.io.ByteArrayOutputStream(); if ("); + parts.add( + ";try { java.io.ByteArrayOutputStream baos = new java.io.ByteArrayOutputStream(); if ("); parts.add(") "); parts.add(" = null; else{"); parts.add(".encode("); parts.add(", baos); "); - parts.add(" = baos.toByteArray();}} catch (Exception e) {throw new RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e));}"); + parts.add( + " = baos.toByteArray();}} catch (Exception e) {throw new RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e));}"); - StringContext sc = new StringContext(JavaConversions.collectionAsScalaIterable(parts).toSeq()); + StringContext sc = + new StringContext(JavaConversions.collectionAsScalaIterable(parts).toSeq()); List args = new ArrayList<>(); @@ -157,18 +173,19 @@ public class EncoderHelpers { args.add(accessCode); args.add(input.value()); args.add(ev.value()); - Block code = (new Block.BlockHelper(sc)) - .code(JavaConversions.collectionAsScalaIterable(args).toSeq()); + Block code = + (new Block.BlockHelper(sc)).code(JavaConversions.collectionAsScalaIterable(args).toSeq()); - return ev.copy(input.code().$plus(code), input.isNull(),ev.value()); + return ev.copy(input.code().$plus(code), input.isNull(), ev.value()); } - -@Override public DataType dataType() { +
[beam] 10/37: Lazy init coder because coder instance cannot be interpolated by catalyst
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit e6b68a8f21aba2adcb7543eae806d71e08c0bff3 Author: Etienne Chauchot AuthorDate: Mon Sep 2 17:55:24 2019 +0200 Lazy init coder because coder instance cannot be interpolated by catalyst --- runners/spark/build.gradle | 1 + .../translation/helpers/EncoderHelpers.java| 63 +++--- .../structuredstreaming/utils/EncodersTest.java| 3 +- 3 files changed, 47 insertions(+), 20 deletions(-) diff --git a/runners/spark/build.gradle b/runners/spark/build.gradle index 73a710b..a948ef1 100644 --- a/runners/spark/build.gradle +++ b/runners/spark/build.gradle @@ -77,6 +77,7 @@ dependencies { provided "com.esotericsoftware.kryo:kryo:2.21" runtimeOnly library.java.jackson_module_scala runtimeOnly "org.scala-lang:scala-library:2.11.8" + compile "org.scala-lang.modules:scala-java8-compat_2.11:0.9.0" testCompile project(":sdks:java:io:kafka") testCompile project(path: ":sdks:java:core", configuration: "shadowTest") // SparkStateInternalsTest extends abstract StateInternalsTest diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java index cc862cd..694bc24 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java @@ -18,9 +18,9 @@ package org.apache.beam.runners.spark.structuredstreaming.translation.helpers; import static org.apache.spark.sql.types.DataTypes.BinaryType; +import static scala.compat.java8.JFunction.func; import java.io.ByteArrayInputStream; -import java.io.IOException; import java.lang.reflect.Array; import java.util.ArrayList; import java.util.List; @@ -45,7 +45,6 @@ import org.apache.spark.sql.catalyst.expressions.codegen.ExprCode; import org.apache.spark.sql.catalyst.expressions.codegen.VariableValue; import org.apache.spark.sql.types.DataType; import org.apache.spark.sql.types.ObjectType; -import scala.Function1; import scala.StringContext; import scala.Tuple2; import scala.collection.JavaConversions; @@ -94,17 +93,17 @@ public class EncoderHelpers { */ /** A way to construct encoders using generic serializers. */ - public static Encoder fromBeamCoder(Coder coder/*, Class claz*/){ + public static Encoder fromBeamCoder(Class> coderClass/*, Class claz*/){ List serialiserList = new ArrayList<>(); Class claz = (Class) Object.class; -serialiserList.add(new EncodeUsingBeamCoder<>(new BoundReference(0, new ObjectType(claz), true), coder)); +serialiserList.add(new EncodeUsingBeamCoder<>(new BoundReference(0, new ObjectType(claz), true), (Class>)coderClass)); ClassTag classTag = ClassTag$.MODULE$.apply(claz); return new ExpressionEncoder<>( SchemaHelpers.binarySchema(), false, JavaConversions.collectionAsScalaIterable(serialiserList).toSeq(), -new DecodeUsingBeamCoder<>(new Cast(new GetColumnByOrdinal(0, BinaryType), BinaryType), classTag, coder), +new DecodeUsingBeamCoder<>(new Cast(new GetColumnByOrdinal(0, BinaryType), BinaryType), classTag, (Class>)coderClass), classTag); /* @@ -127,11 +126,11 @@ public class EncoderHelpers { public static class EncodeUsingBeamCoder extends UnaryExpression implements NonSQLExpression { private Expression child; -private Coder beamCoder; +private Class> coderClass; -public EncodeUsingBeamCoder(Expression child, Coder beamCoder) { +public EncodeUsingBeamCoder(Expression child, Class> coderClass) { this.child = child; - this.beamCoder = beamCoder; + this.coderClass = coderClass; } @Override public Expression child() { @@ -140,6 +139,7 @@ public class EncoderHelpers { @Override public ExprCode doGenCode(CodegenContext ctx, ExprCode ev) { // Code to serialize. + String beamCoder = lazyInitBeamCoder(ctx, coderClass); ExprCode input = child.genCode(ctx); /* @@ -170,6 +170,7 @@ public class EncoderHelpers { new VariableValue("output", Array.class)); } + @Override public DataType dataType() { return BinaryType; } @@ -179,7 +180,7 @@ public class EncoderHelpers { case 0: return child; case 1: - return beamCoder; + return coderClass; default: throw new ArrayIndexOutOfBoundsException("productElement out of bounds"); } @@ -201,11 +202,11 @@ public class EncoderHelpers { return false; } E
[beam] 17/37: Fix equal and hashcode
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit f48067b87be26773de91d076c4ad249f54890db0 Author: Etienne Chauchot AuthorDate: Thu Sep 5 14:49:37 2019 +0200 Fix equal and hashcode --- .../structuredstreaming/translation/helpers/EncoderHelpers.java | 8 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java index 83243b3..91aaaf9 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java @@ -195,11 +195,11 @@ public class EncoderHelpers { return false; } EncodeUsingBeamCoder that = (EncodeUsingBeamCoder) o; - return beamCoder.equals(that.beamCoder); + return beamCoder.equals(that.beamCoder) && child.equals(that.child); } @Override public int hashCode() { - return Objects.hash(super.hashCode(), beamCoder); + return Objects.hash(super.hashCode(), child, beamCoder); } } @@ -306,11 +306,11 @@ public class EncoderHelpers { return false; } DecodeUsingBeamCoder that = (DecodeUsingBeamCoder) o; - return classTag.equals(that.classTag) && beamCoder.equals(that.beamCoder); + return child.equals(that.child) && classTag.equals(that.classTag) && beamCoder.equals(that.beamCoder); } @Override public int hashCode() { - return Objects.hash(super.hashCode(), classTag, beamCoder); + return Objects.hash(super.hashCode(), child, classTag, beamCoder); } } }
[beam] 36/37: Fix: Remove generic hack of using object. Use actual Coder encodedType in Encoders
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit 824b3445e99a0fc084b612b790c7d458689a4fd4 Author: Etienne Chauchot AuthorDate: Wed Oct 23 11:52:14 2019 +0200 Fix: Remove generic hack of using object. Use actual Coder encodedType in Encoders --- .../spark/structuredstreaming/translation/helpers/EncoderHelpers.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java index 2f3bced..c07c9dd 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java @@ -101,7 +101,8 @@ public class EncoderHelpers { public static Encoder fromBeamCoder(Coder beamCoder) { List serialiserList = new ArrayList<>(); -Class claz = (Class) Object.class; +Class claz = beamCoder.getEncodedTypeDescriptor().getRawType(); + serialiserList.add( new EncodeUsingBeamCoder<>(new BoundReference(0, new ObjectType(claz), true), beamCoder)); ClassTag classTag = ClassTag$.MODULE$.apply(claz);