[beam] branch master updated (0256f38 -> 7627c82)
chamikara pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git.

  from 0256f38  [BEAM-8314] Add aggregation logic to beam_fn_api metric counter updat… (#9679)
   add c5bbb51  Adds a pipeline option to Python SDK for controlling the number of threads per worker.
   add 7627c82  Merge pull request #9675: [BEAM-8318] Adds a pipeline option to Python SDK for controlling the number of threads per worker.

No new revisions were added by this update.

Summary of changes:
 sdks/python/apache_beam/options/pipeline_options.py         | 10 ++
 .../apache_beam/runners/dataflow/internal/apiclient.py      |  3 +++
 .../apache_beam/runners/dataflow/internal/apiclient_test.py | 12
 3 files changed, 25 insertions(+)
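For context, setting such an option from user code would look roughly like the sketch below. The flag name --number_of_worker_harness_threads is an assumption based on BEAM-8318/PR #9675, not confirmed by this email; check pipeline_options.py for the actual spelling.

    # A minimal sketch, assuming the new flag is spelled
    # --number_of_worker_harness_threads (assumed name, see BEAM-8318).
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    # Cap the number of harness threads each worker runs (assumed flag).
    options = PipelineOptions(['--number_of_worker_harness_threads=12'])

    with beam.Pipeline(options=options) as p:
        _ = p | beam.Create(['a', 'b']) | beam.Map(print)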
[beam] branch master updated (19f8812 -> 0256f38)
goenka pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git.

  from 19f8812  Merge pull request #9361: [BEAM-7990] Add ability to read parquet files into PCollection
   add 0256f38  [BEAM-8314] Add aggregation logic to beam_fn_api metric counter updat… (#9679)

No new revisions were added by this update.

Summary of changes:
 .../dataflow/worker/DataflowOperationContext.java  |  3 +-
 .../worker/MetricsToCounterUpdateConverter.java    |  1 +
 .../dataflow/worker/StreamingDataflowWorker.java   | 43 +-
 .../CounterUpdateAggregator.java}                  | 23 +++---
 .../worker/counters/CounterUpdateAggregators.java  | 75 +
 .../DistributionCounterUpdateAggregator.java       | 65 +++
 .../counters/MeanCounterUpdateAggregator.java      | 55 +
 .../counters/SumCounterUpdateAggregator.java       | 47 +++
 ...ntMonitoringInfoToCounterUpdateTransformer.java |  3 +-
 ...ecMonitoringInfoToCounterUpdateTransformer.java |  3 +-
 ...ntMonitoringInfoToCounterUpdateTransformer.java |  3 +-
 ...onMonitoringInfoToCounterUpdateTransformer.java |  3 +-
 ...erMonitoringInfoToCounterUpdateTransformer.java |  3 +-
 .../worker/BatchModeExecutionContextTest.java      | 11 +--
 .../dataflow/worker/IsmSideInputReaderTest.java    |  3 +-
 .../worker/StreamingModeExecutionContextTest.java  |  5 +-
 .../worker/StreamingStepMetricsContainerTest.java  |  2 +-
 .../dataflow/worker/WorkItemStatusClientTest.java  |  9 +-
 .../counters/CounterUpdateAggregatorsTest.java     | 96 ++
 .../DistributionCounterUpdateAggregatorTest.java   | 72
 .../counters/MeanCounterUpdateAggregatorTest.java  | 66 +++
 .../counters/SumCounterUpdateAggregatorTest.java   | 62 ++
 22 files changed, 622 insertions(+), 31 deletions(-)
 copy runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/{fn/control/MonitoringInfoToCounterUpdateTransformer.java => counters/CounterUpdateAggregator.java} (56%)
 create mode 100644 runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/counters/CounterUpdateAggregators.java
 create mode 100644 runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/counters/DistributionCounterUpdateAggregator.java
 create mode 100644 runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/counters/MeanCounterUpdateAggregator.java
 create mode 100644 runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/counters/SumCounterUpdateAggregator.java
 create mode 100644 runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/counters/CounterUpdateAggregatorsTest.java
 create mode 100644 runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/counters/DistributionCounterUpdateAggregatorTest.java
 create mode 100644 runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/counters/MeanCounterUpdateAggregatorTest.java
 create mode 100644 runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/counters/SumCounterUpdateAggregatorTest.java
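For intuition, the file list above shows the change adds sum, mean, and distribution aggregators that collapse many per-counter updates into a single update before they are reported. An illustrative sketch of the sum case follows, in Python rather than the commit's Java, with hypothetical names:

    # Illustrative only -- not the commit's code. Collapses repeated
    # (counter_name, value) updates into one summed update per counter.
    from collections import defaultdict

    def aggregate_sum_updates(updates):
        totals = defaultdict(int)
        for name, value in updates:
            totals[name] += value
        return sorted(totals.items())

    print(aggregate_sum_updates([('bytes_read', 10), ('bytes_read', 32), ('elements', 5)]))
    # -> [('bytes_read', 42), ('elements', 5)]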
[GitHub] [beam-wheels] aaltay merged pull request #14: Update user guide with helpful links
aaltay merged pull request #14: Update user guide with helpful links
URL: https://github.com/apache/beam-wheels/pull/14
[beam-wheels] branch master updated: Update user guide with helpful links (#14)
altay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam-wheels.git

The following commit(s) were added to refs/heads/master by this push:
     new bafcd5d  Update user guide with helpful links (#14)
bafcd5d is described below

commit bafcd5defde22e90bdc26448f041ad7714654ccd
Author:     Mark Liu
AuthorDate: Fri Sep 27 18:08:17 2019 -0700

    Update user guide with helpful links (#14)

    Update user guide with helpful links (#14)
---
 README.md | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/README.md b/README.md
index af9c16c..4bdab68 100644
--- a/README.md
+++ b/README.md
@@ -38,6 +38,6 @@ There are 2 major parts in this repository.
 ## User Guide
-* Create and push a new branch(e.g. release-2.6.0) into the beam-wheels repository, which will trigger the travis build of that version.
+* Create and push a new branch(e.g. release-2.6.0) into the beam-wheels repository, which will trigger the travis build of that version. Found your build in https://travis-ci.org/apache/beam-wheels.
-* Confirm that build successful and wheels get staged in beam-wheels-staging gcs bucket.
+* Confirm that build successful and wheels get staged in `beam-wheels-staging` gcs bucket ([link](https://console.cloud.google.com/storage/browser/beam-wheels-staging?project=apache-beam-testing)).
[beam] branch master updated (1572ce0 -> 19f8812)
apilloud pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git.

  from 1572ce0  [BEAM-7919] Add MongoDB IO integration test for py3.5 (#9639)
   add b2ed0da  Add ReadFromParquetBatched and ReadAllFromParquetBatched
   add 19f8812  Merge pull request #9361: [BEAM-7990] Add ability to read parquet files into PCollection

No new revisions were added by this update.

Summary of changes:
 sdks/python/apache_beam/io/parquetio.py      | 119
 sdks/python/apache_beam/io/parquetio_test.py | 133 ---
 sdks/python/apache_beam/testing/util.py      |  12 ++-
 3 files changed, 208 insertions(+), 56 deletions(-)
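For context, using the new batched read would look roughly as follows. The transform names come from the commit message; the element type (one pyarrow table per Parquet row group) and the file path are assumptions:

    # Sketch of the batched parquet read from PR #9361. ReadFromParquetBatched is
    # named in the commit; the path and per-table handling here are assumptions.
    import apache_beam as beam
    from apache_beam.io.parquetio import ReadFromParquetBatched

    with beam.Pipeline() as p:
        # Each element is assumed to be a pyarrow.Table batch rather than a dict per record.
        batches = p | ReadFromParquetBatched('/path/to/data*.parquet')
        _ = batches | beam.Map(lambda table: table.num_rows) | beam.Map(print)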
svn commit: r36101 - /dev/beam/2.16.0/python/
Author: markliu
Date: Fri Sep 27 21:38:45 2019
New Revision: 36101

Log:
Sign and hash Python wheels for 2.16.0

Added:
    dev/beam/2.16.0/python/apache_beam-2.16.0-cp27-cp27m-macosx_10_6_intel.macosx_10_9_intel.macosx_10_9_x86_64.macosx_10_10_intel.macosx_10_10_x86_64.whl   (with props)
    dev/beam/2.16.0/python/apache_beam-2.16.0-cp27-cp27m-macosx_10_6_intel.macosx_10_9_intel.macosx_10_9_x86_64.macosx_10_10_intel.macosx_10_10_x86_64.whl.asc
    dev/beam/2.16.0/python/apache_beam-2.16.0-cp27-cp27m-macosx_10_6_intel.macosx_10_9_intel.macosx_10_9_x86_64.macosx_10_10_intel.macosx_10_10_x86_64.whl.sha512
    dev/beam/2.16.0/python/apache_beam-2.16.0-cp27-cp27m-manylinux1_i686.whl   (with props)
    dev/beam/2.16.0/python/apache_beam-2.16.0-cp27-cp27m-manylinux1_i686.whl.asc
    dev/beam/2.16.0/python/apache_beam-2.16.0-cp27-cp27m-manylinux1_i686.whl.sha512
    dev/beam/2.16.0/python/apache_beam-2.16.0-cp27-cp27m-manylinux1_x86_64.whl   (with props)
    dev/beam/2.16.0/python/apache_beam-2.16.0-cp27-cp27m-manylinux1_x86_64.whl.asc
    dev/beam/2.16.0/python/apache_beam-2.16.0-cp27-cp27m-manylinux1_x86_64.whl.sha512
    dev/beam/2.16.0/python/apache_beam-2.16.0-cp27-cp27mu-manylinux1_i686.whl   (with props)
    dev/beam/2.16.0/python/apache_beam-2.16.0-cp27-cp27mu-manylinux1_i686.whl.asc
    dev/beam/2.16.0/python/apache_beam-2.16.0-cp27-cp27mu-manylinux1_i686.whl.sha512
    dev/beam/2.16.0/python/apache_beam-2.16.0-cp27-cp27mu-manylinux1_x86_64.whl   (with props)
    dev/beam/2.16.0/python/apache_beam-2.16.0-cp27-cp27mu-manylinux1_x86_64.whl.asc
    dev/beam/2.16.0/python/apache_beam-2.16.0-cp27-cp27mu-manylinux1_x86_64.whl.sha512
    dev/beam/2.16.0/python/apache_beam-2.16.0-cp35-cp35m-macosx_10_6_intel.macosx_10_9_intel.macosx_10_9_x86_64.macosx_10_10_intel.macosx_10_10_x86_64.whl   (with props)
    dev/beam/2.16.0/python/apache_beam-2.16.0-cp35-cp35m-macosx_10_6_intel.macosx_10_9_intel.macosx_10_9_x86_64.macosx_10_10_intel.macosx_10_10_x86_64.whl.asc
    dev/beam/2.16.0/python/apache_beam-2.16.0-cp35-cp35m-macosx_10_6_intel.macosx_10_9_intel.macosx_10_9_x86_64.macosx_10_10_intel.macosx_10_10_x86_64.whl.sha512
    dev/beam/2.16.0/python/apache_beam-2.16.0-cp35-cp35m-manylinux1_i686.whl   (with props)
    dev/beam/2.16.0/python/apache_beam-2.16.0-cp35-cp35m-manylinux1_i686.whl.asc
    dev/beam/2.16.0/python/apache_beam-2.16.0-cp35-cp35m-manylinux1_i686.whl.sha512
    dev/beam/2.16.0/python/apache_beam-2.16.0-cp35-cp35m-manylinux1_x86_64.whl   (with props)
    dev/beam/2.16.0/python/apache_beam-2.16.0-cp35-cp35m-manylinux1_x86_64.whl.asc
    dev/beam/2.16.0/python/apache_beam-2.16.0-cp35-cp35m-manylinux1_x86_64.whl.sha512
    dev/beam/2.16.0/python/apache_beam-2.16.0-cp36-cp36m-macosx_10_6_intel.macosx_10_9_intel.macosx_10_9_x86_64.macosx_10_10_intel.macosx_10_10_x86_64.whl   (with props)
    dev/beam/2.16.0/python/apache_beam-2.16.0-cp36-cp36m-macosx_10_6_intel.macosx_10_9_intel.macosx_10_9_x86_64.macosx_10_10_intel.macosx_10_10_x86_64.whl.asc
    dev/beam/2.16.0/python/apache_beam-2.16.0-cp36-cp36m-macosx_10_6_intel.macosx_10_9_intel.macosx_10_9_x86_64.macosx_10_10_intel.macosx_10_10_x86_64.whl.sha512
    dev/beam/2.16.0/python/apache_beam-2.16.0-cp36-cp36m-manylinux1_i686.whl   (with props)
    dev/beam/2.16.0/python/apache_beam-2.16.0-cp36-cp36m-manylinux1_i686.whl.asc
    dev/beam/2.16.0/python/apache_beam-2.16.0-cp36-cp36m-manylinux1_i686.whl.sha512
    dev/beam/2.16.0/python/apache_beam-2.16.0-cp36-cp36m-manylinux1_x86_64.whl   (with props)
    dev/beam/2.16.0/python/apache_beam-2.16.0-cp36-cp36m-manylinux1_x86_64.whl.asc
    dev/beam/2.16.0/python/apache_beam-2.16.0-cp36-cp36m-manylinux1_x86_64.whl.sha512
    dev/beam/2.16.0/python/apache_beam-2.16.0-cp37-cp37m-macosx_10_6_intel.macosx_10_9_intel.macosx_10_9_x86_64.macosx_10_10_intel.macosx_10_10_x86_64.whl   (with props)
    dev/beam/2.16.0/python/apache_beam-2.16.0-cp37-cp37m-macosx_10_6_intel.macosx_10_9_intel.macosx_10_9_x86_64.macosx_10_10_intel.macosx_10_10_x86_64.whl.asc
    dev/beam/2.16.0/python/apache_beam-2.16.0-cp37-cp37m-macosx_10_6_intel.macosx_10_9_intel.macosx_10_9_x86_64.macosx_10_10_intel.macosx_10_10_x86_64.whl.sha512
    dev/beam/2.16.0/python/apache_beam-2.16.0-cp37-cp37m-manylinux1_i686.whl   (with props)
    dev/beam/2.16.0/python/apache_beam-2.16.0-cp37-cp37m-manylinux1_i686.whl.asc
    dev/beam/2.16.0/python/apache_beam-2.16.0-cp37-cp37m-manylinux1_i686.whl.sha512
    dev/beam/2.16.0/python/apache_beam-2.16.0-cp37-cp37m-manylinux1_x86_64.whl   (with props)
    dev/beam/2.16.0/python/apache_beam-2.16.0-cp37-cp37m-manylinux1_x86_64.whl.asc
    dev/beam/2.16.0/python/apache_beam-2.16.0-cp37-cp37m-manylinux1_x86_64.whl.sha512

Added: dev/beam/2.16.0/python/apache_beam-2.16.0-cp27-cp27m-macosx_10_6_intel.macosx_10_9_intel.macosx_10_9_x86_64.macosx_10_10_intel.macosx_10_10_x86_64.whl
==============================================================================
Binary file - no diff available.
[GitHub] [beam-wheels] markflyhigh opened a new pull request #14: Update user guide with helpful links
markflyhigh opened a new pull request #14: Update user guide with helpful links
URL: https://github.com/apache/beam-wheels/pull/14

+R: @aaltay @apilloud
[beam-wheels] branch master updated (875a998 -> 64dfc5c)
markliu pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam-wheels.git.

  from 875a998  Merge pull request #11: Update build process to not require personal travis repo
   add d175b38  Enable logging for gcs deploy
   new 64dfc5c  Merge pull request #13: Enable logging for gcs deploy

The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference.

Summary of changes:
 .travis.yml | 2 ++
 1 file changed, 2 insertions(+)
[GitHub] [beam-wheels] markflyhigh merged pull request #13: Enable logging for gcs deploy
markflyhigh merged pull request #13: Enable logging for gcs deploy
URL: https://github.com/apache/beam-wheels/pull/13
[beam-wheels] 01/01: Merge pull request #13: Enable logging for gcs deploy
markliu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam-wheels.git

commit 64dfc5c1c195a0086a97fb436f29143d1ccd6520
Merge: 875a998 d175b38
Author:     Mark Liu
AuthorDate: Fri Sep 27 14:08:45 2019 -0700

    Merge pull request #13: Enable logging for gcs deploy

 .travis.yml | 2 ++
 1 file changed, 2 insertions(+)
[GitHub] [beam-wheels] apilloud commented on issue #13: Enable logging for gcs deploy
apilloud commented on issue #13: Enable logging for gcs deploy
URL: https://github.com/apache/beam-wheels/pull/13#issuecomment-536094533

LGTM.
[GitHub] [beam-wheels] markflyhigh opened a new pull request #13: Enable logging for gcs deploy
markflyhigh opened a new pull request #13: Enable logging for gcs deploy
URL: https://github.com/apache/beam-wheels/pull/13

This will surface any error during GCS upload and fail the build if the upload failed.

Example build after this change (build failed): https://travis-ci.org/apache/beam-wheels/builds/590549047
Example build before this change (build silently passed but gcs staging actually failed): https://travis-ci.org/apache/beam-wheels/builds/590136362

+R: @aaltay
[beam] branch markflyhigh-patch-1 deleted (was 8ea4755)
markliu pushed a change to branch markflyhigh-patch-1 in repository https://gitbox.apache.org/repos/asf/beam.git.

 was 8ea4755  Update verify release branch in release-guide.md

The revisions that were on this branch are still contained in other references; therefore, this change does not discard any commits from the repository.
[GitHub] [beam-wheels] markflyhigh closed pull request #12: Test PR to stage wheels for 2.16.0
markflyhigh closed pull request #12: Test PR to stage wheels for 2.16.0
URL: https://github.com/apache/beam-wheels/pull/12
[beam] branch master updated (469618c -> 1572ce0)
tvalentyn pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git.

  from 469618c  Merge pull request #9649: [BEAM-8157] Increase parallelism for Python PVR tests
   add 1572ce0  [BEAM-7919] Add MongoDB IO integration test for py3.5 (#9639)

No new revisions were added by this update.

Summary of changes:
 .../job_PostCommit_Python_MongoDBIO_IT.groovy    |  1 +
 sdks/python/test-suites/direct/py35/build.gradle | 31 ++
 2 files changed, 32 insertions(+)
[beam-wheels] branch release-2.16.0 updated (875a998 -> af493c9)
markliu pushed a change to branch release-2.16.0 in repository https://gitbox.apache.org/repos/asf/beam-wheels.git.

  from 875a998  Merge pull request #11: Update build process to not require personal travis repo
   add af493c9  Use Travis master branch to should logging activity

No new revisions were added by this update.

Summary of changes:
 .travis.yml | 2 ++
 1 file changed, 2 insertions(+)
[GitHub] [beam-wheels] markflyhigh opened a new pull request #12: Test PR to stage wheels for 2.16.0
markflyhigh opened a new pull request #12: Test PR to stage wheels for 2.16.0
URL: https://github.com/apache/beam-wheels/pull/12
[beam] branch master updated (e78943c -> 469618c)
thw pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git.

  from e78943c  Merge pull request #9672: [BEAM-8317] Add (skipped) test for aggregating after a filter
   add 42cae0a  [BEAM-8157] Increase parallelism for Python PVR tests
   add 469618c  Merge pull request #9649: [BEAM-8157] Increase parallelism for Python PVR tests

No new revisions were added by this update.

Summary of changes:
 .../runners/portability/flink_runner_test.py | 26 +++---
 1 file changed, 13 insertions(+), 13 deletions(-)
[beam] 02/03: Apply new Encoders to AggregatorCombiner
echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git

commit 6edcfa2dd2e00a43dd3961d87a783fdb195cdf37
Author:     Etienne Chauchot
AuthorDate: Fri Sep 27 11:55:20 2019 +0200

    Apply new Encoders to AggregatorCombiner
---
 .../translation/batch/AggregatorCombiner.java | 22 +-
 .../batch/CombinePerKeyTranslatorBatch.java   | 20
 2 files changed, 33 insertions(+), 9 deletions(-)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombiner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombiner.java
index 0e3229e..d14569a 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombiner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombiner.java
@@ -27,6 +27,8 @@ import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
@@ -52,13 +54,25 @@ class AggregatorCombiner
   private final Combine.CombineFn combineFn;
   private WindowingStrategy windowingStrategy;
   private TimestampCombiner timestampCombiner;
+  private IterableCoder> accumulatorCoder;
+  private IterableCoder> outputCoder;

   public AggregatorCombiner(
       Combine.CombineFn combineFn,
-      WindowingStrategy windowingStrategy) {
+      WindowingStrategy windowingStrategy,
+      Coder accumulatorCoder,
+      Coder outputCoder) {
     this.combineFn = combineFn;
     this.windowingStrategy = (WindowingStrategy) windowingStrategy;
     this.timestampCombiner = windowingStrategy.getTimestampCombiner();
+    this.accumulatorCoder =
+        IterableCoder.of(
+            WindowedValue.FullWindowedValueCoder.of(
+                accumulatorCoder, windowingStrategy.getWindowFn().windowCoder()));
+    this.outputCoder =
+        IterableCoder.of(
+            WindowedValue.FullWindowedValueCoder.of(
+                outputCoder, windowingStrategy.getWindowFn().windowCoder()));
   }

   @Override
@@ -142,14 +156,12 @@ class AggregatorCombiner
   @Override
   public Encoder>> bufferEncoder() {
-    // TODO replace with accumulatorCoder if possible
-    return EncoderHelpers.genericEncoder();
+    return EncoderHelpers.fromBeamCoder(accumulatorCoder);
   }

   @Override
   public Encoder>> outputEncoder() {
-    // TODO replace with outputCoder if possible
-    return EncoderHelpers.genericEncoder();
+    return EncoderHelpers.fromBeamCoder(outputCoder);
   }

   private Set collectAccumulatorsWindows(Iterable> accumulators) {
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java
index 33b037a..be238b5 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java
@@ -23,6 +23,7 @@ import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTr
 import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
 import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers;
 import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.KVHelpers;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.transforms.Combine;
@@ -58,20 +59,31 @@ class CombinePerKeyTranslatorBatch
     Dataset>> inputDataset = context.getDataset(input);

-    Coder keyCoder = (Coder) input.getCoder().getCoderArguments().get(0);
-    Coder outputTCoder = (Coder) output.getCoder().getCoderArguments().get(1);
+    KvCoder inputCoder = (KvCoder) input.getCoder();
+    Coder keyCoder = inputCoder.getKeyCoder();
+    KvCoder outputKVCoder = (KvCoder) output.getCoder();
+    Coder outputCoder = outputKVCoder.getValueCoder();

     KeyValueGroupedDataset>> groupedDataset =
         inputDataset.groupByKey(KVHelpers.extractKey(), EncoderHelpers.fromBeamCoder(keyCoder));

     Coder accumulatorC
[beam] 01/03: Apply new Encoders to Window assign translation
echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git

commit 5beb435f46ea82ed0380e11e7751bdb6fbbbcee4
Author:     Etienne Chauchot
AuthorDate: Fri Sep 27 11:22:15 2019 +0200

    Apply new Encoders to Window assign translation
---
 .../translation/batch/WindowAssignTranslatorBatch.java | 8 ++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java
index fb37f97..576b914 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java
@@ -23,6 +23,7 @@ import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.Enc
 import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.WindowingHelpers;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.spark.sql.Dataset;
@@ -44,10 +45,13 @@ class WindowAssignTranslatorBatch
     if (WindowingHelpers.skipAssignWindows(assignTransform, context)) {
       context.putDataset(output, inputDataset);
     } else {
+      WindowFn windowFn = assignTransform.getWindowFn();
+      WindowedValue.FullWindowedValueCoder windoweVdalueCoder = WindowedValue.FullWindowedValueCoder
+          .of(input.getCoder(), windowFn.windowCoder());
       Dataset> outputDataset =
           inputDataset.map(
-              WindowingHelpers.assignWindowsMapFunction(assignTransform.getWindowFn()),
-              EncoderHelpers.windowedValueEncoder());
+              WindowingHelpers.assignWindowsMapFunction(windowFn),
+              EncoderHelpers.fromBeamCoder(windoweVdalueCoder));
       context.putDataset(output, outputDataset);
     }
   }
[beam] 03/03: Apply spotless
echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git

commit d093ffedcd38f5a00cf2e9dd3aee65b430a15dbd
Author:     Etienne Chauchot
AuthorDate: Fri Sep 27 11:55:43 2019 +0200

    Apply spotless
---
 .../translation/batch/WindowAssignTranslatorBatch.java | 4 ++--
 .../aggregators/metrics/sink/SparkMetricsSinkTest.java | 2 +-
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java
index 576b914..59cc32a 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java
@@ -46,8 +46,8 @@ class WindowAssignTranslatorBatch
       context.putDataset(output, inputDataset);
     } else {
       WindowFn windowFn = assignTransform.getWindowFn();
-      WindowedValue.FullWindowedValueCoder windoweVdalueCoder = WindowedValue.FullWindowedValueCoder
-          .of(input.getCoder(), windowFn.windowCoder());
+      WindowedValue.FullWindowedValueCoder windoweVdalueCoder =
+          WindowedValue.FullWindowedValueCoder.of(input.getCoder(), windowFn.windowCoder());
       Dataset> outputDataset =
           inputDataset.map(
               WindowingHelpers.assignWindowsMapFunction(windowFn),
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/aggregators/metrics/sink/SparkMetricsSinkTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/aggregators/metrics/sink/SparkMetricsSinkTest.java
index 9d56f0c..de405a4 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/aggregators/metrics/sink/SparkMetricsSinkTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/aggregators/metrics/sink/SparkMetricsSinkTest.java
@@ -41,7 +41,7 @@ import org.junit.rules.ExternalResource;
  * A test that verifies Beam metrics are reported to Spark's metrics sink in both batch and
  * streaming modes.
  */
-@Ignore ("Has been failing since at least c350188ef7a8704c7336f3c20a1ab2144abbcd4a")
+@Ignore("Has been failing since at least c350188ef7a8704c7336f3c20a1ab2144abbcd4a")
 public class SparkMetricsSinkTest {
   @Rule public ExternalResource inMemoryMetricsSink = new InMemoryMetricsSinkRule();
   @Rule public final TestPipeline pipeline = TestPipeline.create();
[beam] branch spark-runner_structured-streaming updated (ab7d24c -> d093ffe)
echauchot pushed a change to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git.

  from ab7d24c  Ignore long time failing test: SparkMetricsSinkTest
   new 5beb435  Apply new Encoders to Window assign translation
   new 6edcfa2  Apply new Encoders to AggregatorCombiner
   new d093ffe  Apply spotless

The 3 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference.

Summary of changes:
 .../translation/batch/AggregatorCombiner.java | 22 +-
 .../batch/CombinePerKeyTranslatorBatch.java   | 20
 .../batch/WindowAssignTranslatorBatch.java    |  8 ++--
 .../metrics/sink/SparkMetricsSinkTest.java    |  2 +-
 4 files changed, 40 insertions(+), 12 deletions(-)
[beam] branch spark-runner_structured-streaming updated (c6cca7d -> ab7d24c)
echauchot pushed a change to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git.

 discard c6cca7d  Ignore long time failing test: SparkMetricsSinkTest
 discard 97e8a19  Improve performance of source: the mapper already calls windowedValueCoder.decode, no need to call it also in the Spark encoder
     new 3ac3c71  Improve performance of source: the mapper already calls windowedValueCoder.decode, no need to call it also in the Spark encoder
     new ab7d24c  Ignore long time failing test: SparkMetricsSinkTest

This update added new revisions after undoing existing revisions. That is to say, some revisions that were in the old version of the branch are not in the new version. This situation occurs when a user --force pushes a change and generates a repository containing something like this:

 * -- * -- B -- O -- O -- O   (c6cca7d)
            \
             N -- N -- N   refs/heads/spark-runner_structured-streaming (ab7d24c)

You should already have received notification emails for all of the O revisions, and so the following emails describe only the N revisions from the common base, B.

Any revisions marked "omit" are not gone; other references still refer to them. Any revisions marked "discard" are gone forever.

The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference.

Summary of changes:
 .../translation/batch/ReadSourceTranslatorBatch.java         | 2 ++
 .../translation/streaming/ReadSourceTranslatorStreaming.java | 4 +++-
 2 files changed, 5 insertions(+), 1 deletion(-)
[beam] 02/02: Ignore long time failing test: SparkMetricsSinkTest
echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git

commit ab7d24c6b4a8bcbb9e90a99fd7e96940efb83122
Author:     Etienne Chauchot
AuthorDate: Fri Sep 27 10:41:55 2019 +0200

    Ignore long time failing test: SparkMetricsSinkTest
---
 .../aggregators/metrics/sink/SparkMetricsSinkTest.java | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/aggregators/metrics/sink/SparkMetricsSinkTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/aggregators/metrics/sink/SparkMetricsSinkTest.java
index dd23c05..9d56f0c 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/aggregators/metrics/sink/SparkMetricsSinkTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/aggregators/metrics/sink/SparkMetricsSinkTest.java
@@ -30,6 +30,7 @@ import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExternalResource;
@@ -40,6 +41,7 @@ import org.junit.rules.ExternalResource;
  * A test that verifies Beam metrics are reported to Spark's metrics sink in both batch and
  * streaming modes.
  */
+@Ignore ("Has been failing since at least c350188ef7a8704c7336f3c20a1ab2144abbcd4a")
 public class SparkMetricsSinkTest {
   @Rule public ExternalResource inMemoryMetricsSink = new InMemoryMetricsSinkRule();
   @Rule public final TestPipeline pipeline = TestPipeline.create();
[beam] 01/02: Improve performance of source: the mapper already calls windowedValueCoder.decode, no need to call it also in the Spark encoder
echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git

commit 3ac3c717ad11248211fa0e2a0b077b1ea2602287
Author:     Etienne Chauchot
AuthorDate: Thu Sep 19 17:20:31 2019 +0200

    Improve performance of source: the mapper already calls windowedValueCoder.decode, no need to call it also in the Spark encoder
---
 .../translation/batch/ReadSourceTranslatorBatch.java         | 9 ++---
 .../translation/streaming/ReadSourceTranslatorStreaming.java | 9 ++---
 2 files changed, 12 insertions(+), 6 deletions(-)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
index 2dcf66f..ceb87cf 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
@@ -70,13 +70,16 @@ class ReadSourceTranslatorBatch
         .load();

     // extract windowedValue from Row
-    WindowedValue.FullWindowedValueCoder windowedValueCoder = WindowedValue.FullWindowedValueCoder
-        .of(source.getOutputCoder(), GlobalWindow.Coder.INSTANCE);
+    WindowedValue.FullWindowedValueCoder windowedValueCoder =
+        WindowedValue.FullWindowedValueCoder.of(
+            source.getOutputCoder(), GlobalWindow.Coder.INSTANCE);

     Dataset> dataset =
         rowDataset.map(
             RowHelpers.extractWindowedValueFromRowMapFunction(windowedValueCoder),
-            EncoderHelpers.fromBeamCoder(windowedValueCoder));
+            // using kryo bytes serialization because the mapper already calls
+            // windowedValueCoder.decode, no need to call it also in the Spark encoder
+            EncoderHelpers.windowedValueEncoder());

     PCollection output = (PCollection) context.getOutput();
     context.putDataset(output, dataset);
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java
index 9e03d96..9f1e34d 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java
@@ -71,12 +71,15 @@ class ReadSourceTranslatorStreaming
         .load();

     // extract windowedValue from Row
-    WindowedValue.FullWindowedValueCoder windowedValueCoder = WindowedValue.FullWindowedValueCoder
-        .of(source.getOutputCoder(), GlobalWindow.Coder.INSTANCE);
+    WindowedValue.FullWindowedValueCoder windowedValueCoder =
+        WindowedValue.FullWindowedValueCoder.of(
+            source.getOutputCoder(), GlobalWindow.Coder.INSTANCE);

     Dataset> dataset =
         rowDataset.map(
             RowHelpers.extractWindowedValueFromRowMapFunction(windowedValueCoder),
-            EncoderHelpers.fromBeamCoder(windowedValueCoder));
+            // using kryo bytes serialization because the mapper already calls
+            // windowedValueCoder.decode, no need to call it also in the Spark encoder
+            EncoderHelpers.windowedValueEncoder());

     PCollection output = (PCollection) context.getOutput();
     context.putDataset(output, dataset);
[beam] 02/03: Improve performance of source: the mapper already calls windowedValueCoder.decode, no need to call it also in the Spark encoder
echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git

commit 97e8a19ef1c2c8d868a580d8a97ae41acaec2978
Author:     Etienne Chauchot
AuthorDate: Thu Sep 19 17:20:31 2019 +0200

    Improve performance of source: the mapper already calls windowedValueCoder.decode, no need to call it also in the Spark encoder
---
 .../translation/batch/ReadSourceTranslatorBatch.java         | 7 ---
 .../translation/streaming/ReadSourceTranslatorStreaming.java | 5 +++--
 2 files changed, 7 insertions(+), 5 deletions(-)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
index 2dcf66f..c9a69d4 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
@@ -70,13 +70,14 @@ class ReadSourceTranslatorBatch
         .load();

     // extract windowedValue from Row
-    WindowedValue.FullWindowedValueCoder windowedValueCoder = WindowedValue.FullWindowedValueCoder
-        .of(source.getOutputCoder(), GlobalWindow.Coder.INSTANCE);
+    WindowedValue.FullWindowedValueCoder windowedValueCoder =
+        WindowedValue.FullWindowedValueCoder.of(
+            source.getOutputCoder(), GlobalWindow.Coder.INSTANCE);

     Dataset> dataset =
         rowDataset.map(
             RowHelpers.extractWindowedValueFromRowMapFunction(windowedValueCoder),
-            EncoderHelpers.fromBeamCoder(windowedValueCoder));
+            EncoderHelpers.windowedValueEncoder());

     PCollection output = (PCollection) context.getOutput();
     context.putDataset(output, dataset);
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java
index 9e03d96..ea10272 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java
@@ -71,8 +71,9 @@ class ReadSourceTranslatorStreaming
         .load();

     // extract windowedValue from Row
-    WindowedValue.FullWindowedValueCoder windowedValueCoder = WindowedValue.FullWindowedValueCoder
-        .of(source.getOutputCoder(), GlobalWindow.Coder.INSTANCE);
+    WindowedValue.FullWindowedValueCoder windowedValueCoder =
+        WindowedValue.FullWindowedValueCoder.of(
+            source.getOutputCoder(), GlobalWindow.Coder.INSTANCE);

     Dataset> dataset =
         rowDataset.map(
             RowHelpers.extractWindowedValueFromRowMapFunction(windowedValueCoder),
[beam] 03/03: Ignore long time failing test: SparkMetricsSinkTest
echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git

commit c6cca7dcad367e2479d0cb292892473d56a205da
Author:     Etienne Chauchot
AuthorDate: Fri Sep 27 10:41:55 2019 +0200

    Ignore long time failing test: SparkMetricsSinkTest
---
 .../aggregators/metrics/sink/SparkMetricsSinkTest.java | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/aggregators/metrics/sink/SparkMetricsSinkTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/aggregators/metrics/sink/SparkMetricsSinkTest.java
index dd23c05..9d56f0c 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/aggregators/metrics/sink/SparkMetricsSinkTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/aggregators/metrics/sink/SparkMetricsSinkTest.java
@@ -30,6 +30,7 @@ import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExternalResource;
@@ -40,6 +41,7 @@ import org.junit.rules.ExternalResource;
  * A test that verifies Beam metrics are reported to Spark's metrics sink in both batch and
  * streaming modes.
  */
+@Ignore ("Has been failing since at least c350188ef7a8704c7336f3c20a1ab2144abbcd4a")
 public class SparkMetricsSinkTest {
   @Rule public ExternalResource inMemoryMetricsSink = new InMemoryMetricsSinkRule();
   @Rule public final TestPipeline pipeline = TestPipeline.create();
[beam] branch spark-runner_structured-streaming updated (aa25e85 -> c6cca7d)
echauchot pushed a change to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git.

  from aa25e85  Apply new Encoders to CombinePerKey
   new bcbb697  Apply new Encoders to Read source
   new 97e8a19  Improve performance of source: the mapper already calls windowedValueCoder.decode, no need to call it also in the Spark encoder
   new c6cca7d  Ignore long time failing test: SparkMetricsSinkTest

The 3 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference.

Summary of changes:
 .../translation/batch/ReadSourceTranslatorBatch.java              | 7 ++-
 .../spark/structuredstreaming/translation/helpers/RowHelpers.java | 4 +---
 .../translation/streaming/ReadSourceTranslatorStreaming.java      | 8 ++--
 .../aggregators/metrics/sink/SparkMetricsSinkTest.java            | 2 ++
 4 files changed, 15 insertions(+), 6 deletions(-)
[beam] 01/03: Apply new Encoders to Read source
echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git

commit bcbb69785106dba375414291eae956276e74b0fe
Author:     Etienne Chauchot
AuthorDate: Fri Sep 6 17:49:10 2019 +0200

    Apply new Encoders to Read source
---
 .../translation/batch/ReadSourceTranslatorBatch.java              | 8 ++--
 .../spark/structuredstreaming/translation/helpers/RowHelpers.java | 4 +---
 .../translation/streaming/ReadSourceTranslatorStreaming.java      | 7 +--
 3 files changed, 12 insertions(+), 7 deletions(-)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
index 6ae6646..2dcf66f 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
@@ -27,6 +27,7 @@ import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.Row
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
@@ -69,10 +70,13 @@ class ReadSourceTranslatorBatch
         .load();

     // extract windowedValue from Row
+    WindowedValue.FullWindowedValueCoder windowedValueCoder = WindowedValue.FullWindowedValueCoder
+        .of(source.getOutputCoder(), GlobalWindow.Coder.INSTANCE);
+
     Dataset> dataset =
         rowDataset.map(
-            RowHelpers.extractWindowedValueFromRowMapFunction(source.getOutputCoder()),
-            EncoderHelpers.windowedValueEncoder());
+            RowHelpers.extractWindowedValueFromRowMapFunction(windowedValueCoder),
+            EncoderHelpers.fromBeamCoder(windowedValueCoder));

     PCollection output = (PCollection) context.getOutput();
     context.putDataset(output, dataset);
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/RowHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/RowHelpers.java
index 6ee0e07..ac74c29 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/RowHelpers.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/RowHelpers.java
@@ -43,13 +43,11 @@ public final class RowHelpers {
    * @return A {@link MapFunction} that accepts a {@link Row} and returns its {@link WindowedValue}.
    */
   public static MapFunction> extractWindowedValueFromRowMapFunction(
-      Coder coder) {
+      WindowedValue.WindowedValueCoder windowedValueCoder) {
     return (MapFunction>)
         value -> {
           // there is only one value put in each Row by the InputPartitionReader
           byte[] bytes = (byte[]) value.get(0);
-          WindowedValue.FullWindowedValueCoder windowedValueCoder =
-              WindowedValue.FullWindowedValueCoder.of(coder, GlobalWindow.Coder.INSTANCE);
           return windowedValueCoder.decode(new ByteArrayInputStream(bytes));
         };
   }
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java
index c3d07ff..9e03d96 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java
@@ -27,6 +27,7 @@ import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.Row
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
@@ -70,10 +71,12 @@ class ReadSourceTranslatorStreaming
         .load();

     // extract windowedValue from Row
+    WindowedValue.FullWindowedValueCoder windowedValueCoder = WindowedValue.FullWindowedValueCoder
+        .of(source.getOutputCoder(), GlobalWind