[beam] branch master updated (0256f38 -> 7627c82)

2019-09-27 Thread chamikara
This is an automated email from the ASF dual-hosted git repository.

chamikara pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.


from 0256f38  [BEAM-8314] Add aggregation logic to beam_fn_api metric counter updat… (#9679)
 add c5bbb51  Adds a pipeline option to Python SDK for controlling the number of threads per worker.
 add 7627c82  Merge pull request #9675: [BEAM-8318] Adds a pipeline option to Python SDK for controlling the number of threads per worker.

No new revisions were added by this update.

Summary of changes:
 sdks/python/apache_beam/options/pipeline_options.py  | 10 ++
 .../apache_beam/runners/dataflow/internal/apiclient.py   |  3 +++
 .../apache_beam/runners/dataflow/internal/apiclient_test.py  | 12 
 3 files changed, 25 insertions(+)
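For context, here is a minimal sketch of how such an option is set from user code. The flag name number_of_worker_harness_threads is our reading of the option this PR adds to pipeline_options.py and should be treated as an assumption; the value is only an example.

    # Hedged sketch: setting the per-worker thread count via pipeline options.
    from apache_beam.options.pipeline_options import PipelineOptions

    # '--number_of_worker_harness_threads' is assumed to be the flag added
    # by this commit; '12' is an arbitrary example value.
    options = PipelineOptions(['--number_of_worker_harness_threads', '12'])
    print(options.get_all_options().get('number_of_worker_harness_threads'))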



[beam] branch master updated (19f8812 -> 0256f38)

2019-09-27 Thread goenka
This is an automated email from the ASF dual-hosted git repository.

goenka pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.


from 19f8812  Merge pull request #9361: [BEAM-7990] Add ability to read parquet files into PCollection
 add 0256f38  [BEAM-8314] Add aggregation logic to beam_fn_api metric counter updat… (#9679)

No new revisions were added by this update.

Summary of changes:
 .../dataflow/worker/DataflowOperationContext.java  |  3 +-
 .../worker/MetricsToCounterUpdateConverter.java|  1 +
 .../dataflow/worker/StreamingDataflowWorker.java   | 43 +-
 .../CounterUpdateAggregator.java}  | 23 +++---
 .../worker/counters/CounterUpdateAggregators.java  | 75 +
 .../DistributionCounterUpdateAggregator.java   | 65 +++
 .../counters/MeanCounterUpdateAggregator.java  | 55 +
 .../counters/SumCounterUpdateAggregator.java   | 47 +++
 ...ntMonitoringInfoToCounterUpdateTransformer.java |  3 +-
 ...ecMonitoringInfoToCounterUpdateTransformer.java |  3 +-
 ...ntMonitoringInfoToCounterUpdateTransformer.java |  3 +-
 ...onMonitoringInfoToCounterUpdateTransformer.java |  3 +-
 ...erMonitoringInfoToCounterUpdateTransformer.java |  3 +-
 .../worker/BatchModeExecutionContextTest.java  | 11 +--
 .../dataflow/worker/IsmSideInputReaderTest.java|  3 +-
 .../worker/StreamingModeExecutionContextTest.java  |  5 +-
 .../worker/StreamingStepMetricsContainerTest.java  |  2 +-
 .../dataflow/worker/WorkItemStatusClientTest.java  |  9 +-
 .../counters/CounterUpdateAggregatorsTest.java | 96 ++
 .../DistributionCounterUpdateAggregatorTest.java   | 72 
 .../counters/MeanCounterUpdateAggregatorTest.java  | 66 +++
 .../counters/SumCounterUpdateAggregatorTest.java   | 62 ++
 22 files changed, 622 insertions(+), 31 deletions(-)
 copy runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/{fn/control/MonitoringInfoToCounterUpdateTransformer.java => counters/CounterUpdateAggregator.java} (56%)
 create mode 100644 runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/counters/CounterUpdateAggregators.java
 create mode 100644 runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/counters/DistributionCounterUpdateAggregator.java
 create mode 100644 runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/counters/MeanCounterUpdateAggregator.java
 create mode 100644 runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/counters/SumCounterUpdateAggregator.java
 create mode 100644 runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/counters/CounterUpdateAggregatorsTest.java
 create mode 100644 runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/counters/DistributionCounterUpdateAggregatorTest.java
 create mode 100644 runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/counters/MeanCounterUpdateAggregatorTest.java
 create mode 100644 runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/counters/SumCounterUpdateAggregatorTest.java
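The new aggregators collapse many counter updates with the same name into a single update before they are reported, which is the aggregation logic BEAM-8314 refers to. The real implementation is the Java CounterUpdateAggregators above; the following Python sketch only mirrors the idea for the "sum" kind, with illustrative names throughout.

    # Hedged sketch of pre-aggregating (name, value) counter updates by name.
    from collections import defaultdict

    def aggregate_sum_counter_updates(updates):
        """Collapse repeated sum-counter updates into one update per name."""
        totals = defaultdict(int)
        for name, value in updates:
            totals[name] += value
        return sorted(totals.items())

    print(aggregate_sum_counter_updates(
        [('bytes_read', 10), ('bytes_read', 32), ('records', 5)]))
    # [('bytes_read', 42), ('records', 5)]

The mean and distribution kinds combine analogously, merging (sum, count) pairs or (sum, count, min, max) tuples instead of plain sums.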



[GitHub] [beam-wheels] aaltay merged pull request #14: Update user guide with helpful links

2019-09-27 Thread GitBox
aaltay merged pull request #14: Update user guide with helpful links
URL: https://github.com/apache/beam-wheels/pull/14
 
 
   




[beam-wheels] branch master updated: Update user guide with helpful links (#14)

2019-09-27 Thread altay
This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam-wheels.git


The following commit(s) were added to refs/heads/master by this push:
 new bafcd5d  Update user guide with helpful links (#14)
bafcd5d is described below

commit bafcd5defde22e90bdc26448f041ad7714654ccd
Author: Mark Liu 
AuthorDate: Fri Sep 27 18:08:17 2019 -0700

Update user guide with helpful links (#14)

Update user guide with helpful links (#14)
---
 README.md | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/README.md b/README.md
index af9c16c..4bdab68 100644
--- a/README.md
+++ b/README.md
@@ -38,6 +38,6 @@ There are 2 major parts in this repository.
 
 ## User Guide
 
-* Create and push a new branch(e.g. release-2.6.0) into the beam-wheels repository, which will trigger the travis build of that version.
+* Create and push a new branch(e.g. release-2.6.0) into the beam-wheels repository, which will trigger the travis build of that version. Found your build in https://travis-ci.org/apache/beam-wheels.
 
-* Confirm that build successful and wheels get staged in beam-wheels-staging gcs bucket.
+* Confirm that build successful and wheels get staged in `beam-wheels-staging` gcs bucket ([link](https://console.cloud.google.com/storage/browser/beam-wheels-staging?project=apache-beam-testing)).



[beam] branch master updated (1572ce0 -> 19f8812)

2019-09-27 Thread apilloud
This is an automated email from the ASF dual-hosted git repository.

apilloud pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.


from 1572ce0  [BEAM-7919] Add MongoDB IO integration test for py3.5 (#9639)
 add b2ed0da  Add ReadFromParquetBatched and ReadAllFromParquetBatched
 add 19f8812  Merge pull request #9361: [BEAM-7990] Add ability to read parquet files into PCollection

No new revisions were added by this update.

Summary of changes:
 sdks/python/apache_beam/io/parquetio.py  | 119 
 sdks/python/apache_beam/io/parquetio_test.py | 133 ---
 sdks/python/apache_beam/testing/util.py  |  12 ++-
 3 files changed, 208 insertions(+), 56 deletions(-)
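A minimal usage sketch of the new batched read (the file pattern and pipeline are illustrative; per the PR, ReadFromParquetBatched emits pyarrow.Table batches rather than individual records):

    # Hedged sketch: read Parquet row groups as Arrow tables.
    import apache_beam as beam
    from apache_beam.io.parquetio import ReadFromParquetBatched

    with beam.Pipeline() as p:
        # Each element is a pyarrow.Table, so downstream transforms can work
        # on whole batches instead of record-at-a-time.
        _ = (
            p
            | ReadFromParquetBatched('/tmp/input-*.parquet')  # hypothetical path
            | beam.Map(lambda table: table.num_rows)
            | beam.Map(print))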



svn commit: r36101 - /dev/beam/2.16.0/python/

2019-09-27 Thread markliu
Author: markliu
Date: Fri Sep 27 21:38:45 2019
New Revision: 36101

Log:
Sign and hash Python wheels for 2.16.0


Added:
    dev/beam/2.16.0/python/apache_beam-2.16.0-cp27-cp27m-macosx_10_6_intel.macosx_10_9_intel.macosx_10_9_x86_64.macosx_10_10_intel.macosx_10_10_x86_64.whl   (with props)
    dev/beam/2.16.0/python/apache_beam-2.16.0-cp27-cp27m-macosx_10_6_intel.macosx_10_9_intel.macosx_10_9_x86_64.macosx_10_10_intel.macosx_10_10_x86_64.whl.asc
    dev/beam/2.16.0/python/apache_beam-2.16.0-cp27-cp27m-macosx_10_6_intel.macosx_10_9_intel.macosx_10_9_x86_64.macosx_10_10_intel.macosx_10_10_x86_64.whl.sha512
    dev/beam/2.16.0/python/apache_beam-2.16.0-cp27-cp27m-manylinux1_i686.whl   (with props)
    dev/beam/2.16.0/python/apache_beam-2.16.0-cp27-cp27m-manylinux1_i686.whl.asc
    dev/beam/2.16.0/python/apache_beam-2.16.0-cp27-cp27m-manylinux1_i686.whl.sha512
    dev/beam/2.16.0/python/apache_beam-2.16.0-cp27-cp27m-manylinux1_x86_64.whl   (with props)
    dev/beam/2.16.0/python/apache_beam-2.16.0-cp27-cp27m-manylinux1_x86_64.whl.asc
    dev/beam/2.16.0/python/apache_beam-2.16.0-cp27-cp27m-manylinux1_x86_64.whl.sha512
    dev/beam/2.16.0/python/apache_beam-2.16.0-cp27-cp27mu-manylinux1_i686.whl   (with props)
    dev/beam/2.16.0/python/apache_beam-2.16.0-cp27-cp27mu-manylinux1_i686.whl.asc
    dev/beam/2.16.0/python/apache_beam-2.16.0-cp27-cp27mu-manylinux1_i686.whl.sha512
    dev/beam/2.16.0/python/apache_beam-2.16.0-cp27-cp27mu-manylinux1_x86_64.whl   (with props)
    dev/beam/2.16.0/python/apache_beam-2.16.0-cp27-cp27mu-manylinux1_x86_64.whl.asc
    dev/beam/2.16.0/python/apache_beam-2.16.0-cp27-cp27mu-manylinux1_x86_64.whl.sha512
    dev/beam/2.16.0/python/apache_beam-2.16.0-cp35-cp35m-macosx_10_6_intel.macosx_10_9_intel.macosx_10_9_x86_64.macosx_10_10_intel.macosx_10_10_x86_64.whl   (with props)
    dev/beam/2.16.0/python/apache_beam-2.16.0-cp35-cp35m-macosx_10_6_intel.macosx_10_9_intel.macosx_10_9_x86_64.macosx_10_10_intel.macosx_10_10_x86_64.whl.asc
    dev/beam/2.16.0/python/apache_beam-2.16.0-cp35-cp35m-macosx_10_6_intel.macosx_10_9_intel.macosx_10_9_x86_64.macosx_10_10_intel.macosx_10_10_x86_64.whl.sha512
    dev/beam/2.16.0/python/apache_beam-2.16.0-cp35-cp35m-manylinux1_i686.whl   (with props)
    dev/beam/2.16.0/python/apache_beam-2.16.0-cp35-cp35m-manylinux1_i686.whl.asc
    dev/beam/2.16.0/python/apache_beam-2.16.0-cp35-cp35m-manylinux1_i686.whl.sha512
    dev/beam/2.16.0/python/apache_beam-2.16.0-cp35-cp35m-manylinux1_x86_64.whl   (with props)
    dev/beam/2.16.0/python/apache_beam-2.16.0-cp35-cp35m-manylinux1_x86_64.whl.asc
    dev/beam/2.16.0/python/apache_beam-2.16.0-cp35-cp35m-manylinux1_x86_64.whl.sha512
    dev/beam/2.16.0/python/apache_beam-2.16.0-cp36-cp36m-macosx_10_6_intel.macosx_10_9_intel.macosx_10_9_x86_64.macosx_10_10_intel.macosx_10_10_x86_64.whl   (with props)
    dev/beam/2.16.0/python/apache_beam-2.16.0-cp36-cp36m-macosx_10_6_intel.macosx_10_9_intel.macosx_10_9_x86_64.macosx_10_10_intel.macosx_10_10_x86_64.whl.asc
    dev/beam/2.16.0/python/apache_beam-2.16.0-cp36-cp36m-macosx_10_6_intel.macosx_10_9_intel.macosx_10_9_x86_64.macosx_10_10_intel.macosx_10_10_x86_64.whl.sha512
    dev/beam/2.16.0/python/apache_beam-2.16.0-cp36-cp36m-manylinux1_i686.whl   (with props)
    dev/beam/2.16.0/python/apache_beam-2.16.0-cp36-cp36m-manylinux1_i686.whl.asc
    dev/beam/2.16.0/python/apache_beam-2.16.0-cp36-cp36m-manylinux1_i686.whl.sha512
    dev/beam/2.16.0/python/apache_beam-2.16.0-cp36-cp36m-manylinux1_x86_64.whl   (with props)
    dev/beam/2.16.0/python/apache_beam-2.16.0-cp36-cp36m-manylinux1_x86_64.whl.asc
    dev/beam/2.16.0/python/apache_beam-2.16.0-cp36-cp36m-manylinux1_x86_64.whl.sha512
    dev/beam/2.16.0/python/apache_beam-2.16.0-cp37-cp37m-macosx_10_6_intel.macosx_10_9_intel.macosx_10_9_x86_64.macosx_10_10_intel.macosx_10_10_x86_64.whl   (with props)
    dev/beam/2.16.0/python/apache_beam-2.16.0-cp37-cp37m-macosx_10_6_intel.macosx_10_9_intel.macosx_10_9_x86_64.macosx_10_10_intel.macosx_10_10_x86_64.whl.asc
    dev/beam/2.16.0/python/apache_beam-2.16.0-cp37-cp37m-macosx_10_6_intel.macosx_10_9_intel.macosx_10_9_x86_64.macosx_10_10_intel.macosx_10_10_x86_64.whl.sha512
    dev/beam/2.16.0/python/apache_beam-2.16.0-cp37-cp37m-manylinux1_i686.whl   (with props)
    dev/beam/2.16.0/python/apache_beam-2.16.0-cp37-cp37m-manylinux1_i686.whl.asc
    dev/beam/2.16.0/python/apache_beam-2.16.0-cp37-cp37m-manylinux1_i686.whl.sha512
    dev/beam/2.16.0/python/apache_beam-2.16.0-cp37-cp37m-manylinux1_x86_64.whl   (with props)
    dev/beam/2.16.0/python/apache_beam-2.16.0-cp37-cp37m-manylinux1_x86_64.whl.asc
    dev/beam/2.16.0/python/apache_beam-2.16.0-cp37-cp37m-manylinux1_x86_64.whl.sha512

Added: dev/beam/2.16.0/python/apache_beam-2.16.0-cp27-cp27m-macosx_10_6_intel.macosx_10_9_intel.macosx_10_9_x86_64.macosx_10_10_intel.macosx_10_10_x86_64.whl
==============================================================================
Binary file - no diff available
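The hash half of the "sign and hash" step for wheels like these is easy to reproduce; a minimal Python sketch, assuming the conventional "<digest>  <filename>" layout for .sha512 files (the .asc signatures come from GPG signing, which is not shown):

    # Hedged sketch: produce a .sha512 digest file for one wheel.
    import hashlib

    def write_sha512(wheel_path):
        """Hash the wheel and write '<digest>  <filename>' next to it."""
        digest = hashlib.sha512()
        with open(wheel_path, 'rb') as f:
            for chunk in iter(lambda: f.read(1 << 20), b''):
                digest.update(chunk)
        with open(wheel_path + '.sha512', 'w') as out:
            out.write('%s  %s\n' % (digest.hexdigest(), wheel_path))

    # write_sha512('apache_beam-2.16.0-cp37-cp37m-manylinux1_x86_64.whl')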

[GitHub] [beam-wheels] markflyhigh opened a new pull request #14: Update user guide with helpful links

2019-09-27 Thread GitBox
markflyhigh opened a new pull request #14: Update user guide with helpful links
URL: https://github.com/apache/beam-wheels/pull/14
 
 
   +R: @aaltay @apilloud 




[beam-wheels] branch master updated (875a998 -> 64dfc5c)

2019-09-27 Thread markliu
This is an automated email from the ASF dual-hosted git repository.

markliu pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam-wheels.git.


from 875a998  Merge pull request #11: Update build process to not require personal travis repo
 add d175b38  Enable logging for gcs deploy
 new 64dfc5c  Merge pull request #13: Enable logging for gcs deploy

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .travis.yml | 2 ++
 1 file changed, 2 insertions(+)



[GitHub] [beam-wheels] markflyhigh merged pull request #13: Enable logging for gcs deploy

2019-09-27 Thread GitBox
markflyhigh merged pull request #13: Enable logging for gcs deploy
URL: https://github.com/apache/beam-wheels/pull/13
 
 
   




[beam-wheels] 01/01: Merge pull request #13: Enable logging for gcs deploy

2019-09-27 Thread markliu
This is an automated email from the ASF dual-hosted git repository.

markliu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam-wheels.git

commit 64dfc5c1c195a0086a97fb436f29143d1ccd6520
Merge: 875a998 d175b38
Author: Mark Liu 
AuthorDate: Fri Sep 27 14:08:45 2019 -0700

Merge pull request #13: Enable logging for gcs deploy

 .travis.yml | 2 ++
 1 file changed, 2 insertions(+)



[GitHub] [beam-wheels] apilloud commented on issue #13: Enable logging for gcs deploy

2019-09-27 Thread GitBox
apilloud commented on issue #13: Enable logging for gcs deploy
URL: https://github.com/apache/beam-wheels/pull/13#issuecomment-536094533
 
 
   LGTM.




[GitHub] [beam-wheels] markflyhigh opened a new pull request #13: Enable logging for gcs deploy

2019-09-27 Thread GitBox
markflyhigh opened a new pull request #13: Enable logging for gcs deploy
URL: https://github.com/apache/beam-wheels/pull/13
 
 
   This will surface any error during GCS upload and fail the test if the upload failed.
   
   Example build after this change: (build failed)
   https://travis-ci.org/apache/beam-wheels/builds/590549047
   
   Example build before this change: (build silently passed but GCS staging actually failed)
   https://travis-ci.org/apache/beam-wheels/builds/590136362
   
   +R: @aaltay 
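The pattern the PR describes, surfacing upload errors instead of letting the build pass silently, looks roughly like this in Python (a sketch under the assumption that the deploy step shells out to gsutil; the actual change is two lines of .travis.yml, and the bucket path is hypothetical):

    # Hedged sketch: propagate a failed GCS upload as a build failure.
    import subprocess
    import sys

    def stage_wheels(local_glob, gcs_dir):
        """Upload wheels; exit nonzero if gsutil reports any error."""
        try:
            # check=True raises CalledProcessError on a nonzero exit code,
            # which is exactly the "fail loudly" behavior the PR wants.
            subprocess.run(['gsutil', 'cp', local_glob, gcs_dir], check=True)
        except subprocess.CalledProcessError as e:
            print('GCS staging failed: %s' % e, file=sys.stderr)
            sys.exit(e.returncode)

    # stage_wheels('dist/*.whl', 'gs://beam-wheels-staging/2.16.0/')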




[beam] branch markflyhigh-patch-1 deleted (was 8ea4755)

2019-09-27 Thread markliu
This is an automated email from the ASF dual-hosted git repository.

markliu pushed a change to branch markflyhigh-patch-1
in repository https://gitbox.apache.org/repos/asf/beam.git.


 was 8ea4755  Update verify release branch in release-guide.md

The revisions that were on this branch are still contained in
other references; therefore, this change does not discard any commits
from the repository.



[GitHub] [beam-wheels] markflyhigh closed pull request #12: Test PR to stage wheels for 2.16.0

2019-09-27 Thread GitBox
markflyhigh closed pull request #12: Test PR to stage wheels for 2.16.0
URL: https://github.com/apache/beam-wheels/pull/12
 
 
   




[beam] branch master updated (469618c -> 1572ce0)

2019-09-27 Thread tvalentyn
This is an automated email from the ASF dual-hosted git repository.

tvalentyn pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.


from 469618c  Merge pull request #9649: [BEAM-8157] Increase parallelism for Python PVR tests
 add 1572ce0  [BEAM-7919] Add MongoDB IO integration test for py3.5 (#9639)

No new revisions were added by this update.

Summary of changes:
 .../job_PostCommit_Python_MongoDBIO_IT.groovy  |  1 +
 sdks/python/test-suites/direct/py35/build.gradle   | 31 ++
 2 files changed, 32 insertions(+)
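The integration test exercises the Python MongoDB connector on Python 3.5; a minimal read sketch for orientation (the connection details are placeholders, and the keyword names follow our reading of apache_beam.io.mongodbio):

    # Hedged sketch: reading documents with the MongoDB IO connector.
    import apache_beam as beam
    from apache_beam.io.mongodbio import ReadFromMongoDB

    with beam.Pipeline() as p:
        _ = (
            p
            | ReadFromMongoDB(
                uri='mongodb://localhost:27017',  # placeholder endpoint
                db='test_db',
                coll='test_collection')
            | beam.Map(print))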



[beam-wheels] branch release-2.16.0 updated (875a998 -> af493c9)

2019-09-27 Thread markliu
This is an automated email from the ASF dual-hosted git repository.

markliu pushed a change to branch release-2.16.0
in repository https://gitbox.apache.org/repos/asf/beam-wheels.git.


from 875a998  Merge pull request #11: Update build process to not require personal travis repo
 add af493c9  Use Travis master branch to show logging activity

No new revisions were added by this update.

Summary of changes:
 .travis.yml | 2 ++
 1 file changed, 2 insertions(+)



[GitHub] [beam-wheels] markflyhigh opened a new pull request #12: Test PR to stage wheels for 2.16.0

2019-09-27 Thread GitBox
markflyhigh opened a new pull request #12: Test PR to stage wheels for 2.16.0
URL: https://github.com/apache/beam-wheels/pull/12
 
 
   




[beam] branch master updated (e78943c -> 469618c)

2019-09-27 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.


from e78943c  Merge pull request #9672: [BEAM-8317] Add (skipped) test for aggregating after a filter
 add 42cae0a  [BEAM-8157] Increase parallelism for Python PVR tests
 add 469618c  Merge pull request #9649: [BEAM-8157] Increase parallelism for Python PVR tests

No new revisions were added by this update.

Summary of changes:
 .../runners/portability/flink_runner_test.py   | 26 +++---
 1 file changed, 13 insertions(+), 13 deletions(-)
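As a point of reference, parallelism for a portable runner is normally driven by a pipeline option rather than the test code itself; a minimal sketch, assuming a '--parallelism' flag as the knob (both the flag and its value are assumptions, not taken from the diff):

    # Hedged sketch: requesting explicit parallelism for a portable run.
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    options = PipelineOptions([
        '--runner=PortableRunner',
        '--parallelism=2',  # assumed flag; the knob these tests tune
    ])
    # The pipeline would then pick the options up as usual:
    # with beam.Pipeline(options=options) as p:
    #     _ = p | beam.Create([1, 2, 3]) | beam.Map(print)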



[beam] 02/03: Apply new Encoders to AggregatorCombiner

2019-09-27 Thread echauchot
This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 6edcfa2dd2e00a43dd3961d87a783fdb195cdf37
Author: Etienne Chauchot 
AuthorDate: Fri Sep 27 11:55:20 2019 +0200

Apply new Encoders to AggregatorCombiner
---
 .../translation/batch/AggregatorCombiner.java  | 22 +-
 .../batch/CombinePerKeyTranslatorBatch.java| 20 
 2 files changed, 33 insertions(+), 9 deletions(-)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombiner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombiner.java
index 0e3229e..d14569a 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombiner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombiner.java
@@ -27,6 +27,8 @@ import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
@@ -52,13 +54,25 @@ class AggregatorCombiner
   private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn;
   private WindowingStrategy<InputT, W> windowingStrategy;
   private TimestampCombiner timestampCombiner;
+  private IterableCoder<WindowedValue<AccumT>> accumulatorCoder;
+  private IterableCoder<WindowedValue<OutputT>> outputCoder;
 
   public AggregatorCombiner(
       Combine.CombineFn<InputT, AccumT, OutputT> combineFn,
-      WindowingStrategy<?, ?> windowingStrategy) {
+      WindowingStrategy<?, ?> windowingStrategy,
+      Coder<AccumT> accumulatorCoder,
+      Coder<OutputT> outputCoder) {
     this.combineFn = combineFn;
     this.windowingStrategy = (WindowingStrategy<InputT, W>) windowingStrategy;
     this.timestampCombiner = windowingStrategy.getTimestampCombiner();
+    this.accumulatorCoder =
+        IterableCoder.of(
+            WindowedValue.FullWindowedValueCoder.of(
+                accumulatorCoder, windowingStrategy.getWindowFn().windowCoder()));
+    this.outputCoder =
+        IterableCoder.of(
+            WindowedValue.FullWindowedValueCoder.of(
+                outputCoder, windowingStrategy.getWindowFn().windowCoder()));
   }
 
   @Override
@@ -142,14 +156,12 @@ class AggregatorCombiner
 
   @Override
   public Encoder<Iterable<WindowedValue<AccumT>>> bufferEncoder() {
-    // TODO replace with accumulatorCoder if possible
-    return EncoderHelpers.genericEncoder();
+    return EncoderHelpers.fromBeamCoder(accumulatorCoder);
   }
 
   @Override
   public Encoder<Iterable<WindowedValue<OutputT>>> outputEncoder() {
-    // TODO replace with outputCoder if possible
-    return EncoderHelpers.genericEncoder();
+    return EncoderHelpers.fromBeamCoder(outputCoder);
   }
 
   private Set<W> collectAccumulatorsWindows(Iterable<WindowedValue<AccumT>> accumulators) {
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java
index 33b037a..be238b5 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java
@@ -23,6 +23,7 @@ import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTr
 import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
 import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers;
 import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.KVHelpers;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.transforms.Combine;
@@ -58,20 +59,31 @@ class CombinePerKeyTranslatorBatch
 
     Dataset<WindowedValue<KV<K, InputT>>> inputDataset = context.getDataset(input);
 
-    Coder<K> keyCoder = (Coder<K>) input.getCoder().getCoderArguments().get(0);
-    Coder<OutputT> outputTCoder = (Coder<OutputT>) output.getCoder().getCoderArguments().get(1);
+    KvCoder<K, InputT> inputCoder = (KvCoder<K, InputT>) input.getCoder();
+    Coder<K> keyCoder = inputCoder.getKeyCoder();
+    KvCoder<K, OutputT> outputKVCoder = (KvCoder<K, OutputT>) output.getCoder();
+    Coder<OutputT> outputCoder = outputKVCoder.getValueCoder();
 
     KeyValueGroupedDataset<K, WindowedValue<KV<K, InputT>>> groupedDataset =
         inputDataset.groupByKey(KVHelpers.extractKey(), EncoderHelpers.fromBeamCoder(keyCoder));
 
+    Coder<AccumT> accumulatorC
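The constructor in the hunk above composes IterableCoder over FullWindowedValueCoder to describe the combiner's buffer and output types. The Python SDK exposes the same composable coders, which allows a compact cross-language illustration (a sketch only; the runner code above is Java):

    # Hedged sketch: composing Beam coders the way the Java combiner does.
    from apache_beam.coders import coders
    from apache_beam.transforms.window import GlobalWindows

    # An iterable of windowed ints, mirroring
    # IterableCoder.of(FullWindowedValueCoder.of(valueCoder, windowCoder)).
    wv_coder = coders.WindowedValueCoder(
        coders.VarIntCoder(), coders.GlobalWindowCoder())
    iterable_coder = coders.IterableCoder(wv_coder)

    value = [GlobalWindows.windowed_value(42)]
    assert iterable_coder.decode(iterable_coder.encode(value)) == value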

[beam] 01/03: Apply new Encoders to Window assign translation

2019-09-27 Thread echauchot
This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 5beb435f46ea82ed0380e11e7751bdb6fbbbcee4
Author: Etienne Chauchot 
AuthorDate: Fri Sep 27 11:22:15 2019 +0200

Apply new Encoders to Window assign translation
---
 .../translation/batch/WindowAssignTranslatorBatch.java| 8 ++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java
index fb37f97..576b914 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java
@@ -23,6 +23,7 @@ import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.Enc
 import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.WindowingHelpers;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.spark.sql.Dataset;
@@ -44,10 +45,13 @@ class WindowAssignTranslatorBatch
     if (WindowingHelpers.skipAssignWindows(assignTransform, context)) {
       context.putDataset(output, inputDataset);
     } else {
+      WindowFn<T, ?> windowFn = assignTransform.getWindowFn();
+      WindowedValue.FullWindowedValueCoder<T> windoweVdalueCoder = WindowedValue.FullWindowedValueCoder
+          .of(input.getCoder(), windowFn.windowCoder());
       Dataset<WindowedValue<T>> outputDataset =
           inputDataset.map(
-              WindowingHelpers.assignWindowsMapFunction(assignTransform.getWindowFn()),
-              EncoderHelpers.windowedValueEncoder());
+              WindowingHelpers.assignWindowsMapFunction(windowFn),
+              EncoderHelpers.fromBeamCoder(windoweVdalueCoder));
       context.putDataset(output, outputDataset);
     }
   }
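Both commits replace Spark's generic Kryo-backed encoders with encoders derived from Beam coders via EncoderHelpers.fromBeamCoder. What makes that derivation safe is the coder's encode/decode round-trip contract, shown here with the Python SDK's coders for brevity (an illustration only; the runner code is Java):

    # Hedged sketch: the round-trip contract a Beam coder guarantees, which is
    # what lets a runner build a typed serializer directly from the coder.
    from apache_beam.coders import coders

    coder = coders.IterableCoder(coders.VarIntCoder())
    encoded = coder.encode([1, 2, 3])
    assert coder.decode(encoded) == [1, 2, 3]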



[beam] 03/03: Apply spotless

2019-09-27 Thread echauchot
This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit d093ffedcd38f5a00cf2e9dd3aee65b430a15dbd
Author: Etienne Chauchot 
AuthorDate: Fri Sep 27 11:55:43 2019 +0200

Apply spotless
---
 .../translation/batch/WindowAssignTranslatorBatch.java| 4 ++--
 .../aggregators/metrics/sink/SparkMetricsSinkTest.java| 2 +-
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java
index 576b914..59cc32a 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java
@@ -46,8 +46,8 @@ class WindowAssignTranslatorBatch
       context.putDataset(output, inputDataset);
     } else {
       WindowFn<T, ?> windowFn = assignTransform.getWindowFn();
-      WindowedValue.FullWindowedValueCoder<T> windoweVdalueCoder = WindowedValue.FullWindowedValueCoder
-          .of(input.getCoder(), windowFn.windowCoder());
+      WindowedValue.FullWindowedValueCoder<T> windoweVdalueCoder =
+          WindowedValue.FullWindowedValueCoder.of(input.getCoder(), windowFn.windowCoder());
       Dataset<WindowedValue<T>> outputDataset =
           inputDataset.map(
               WindowingHelpers.assignWindowsMapFunction(windowFn),
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/aggregators/metrics/sink/SparkMetricsSinkTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/aggregators/metrics/sink/SparkMetricsSinkTest.java
index 9d56f0c..de405a4 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/aggregators/metrics/sink/SparkMetricsSinkTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/aggregators/metrics/sink/SparkMetricsSinkTest.java
@@ -41,7 +41,7 @@ import org.junit.rules.ExternalResource;
  * A test that verifies Beam metrics are reported to Spark's metrics sink in both batch and
  * streaming modes.
  */
-@Ignore ("Has been failing since at least c350188ef7a8704c7336f3c20a1ab2144abbcd4a")
+@Ignore("Has been failing since at least c350188ef7a8704c7336f3c20a1ab2144abbcd4a")
 public class SparkMetricsSinkTest {
   @Rule public ExternalResource inMemoryMetricsSink = new 
InMemoryMetricsSinkRule();
   @Rule public final TestPipeline pipeline = TestPipeline.create();



[beam] branch spark-runner_structured-streaming updated (ab7d24c -> d093ffe)

2019-09-27 Thread echauchot
This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a change to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git.


from ab7d24c  Ignore long time failing test: SparkMetricsSinkTest
 new 5beb435  Apply new Encoders to Window assign translation
 new 6edcfa2  Apply new Encoders to AggregatorCombiner
 new d093ffe  Apply spotless

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../translation/batch/AggregatorCombiner.java  | 22 +-
 .../batch/CombinePerKeyTranslatorBatch.java| 20 
 .../batch/WindowAssignTranslatorBatch.java |  8 ++--
 .../metrics/sink/SparkMetricsSinkTest.java |  2 +-
 4 files changed, 40 insertions(+), 12 deletions(-)



[beam] branch spark-runner_structured-streaming updated (c6cca7d -> ab7d24c)

2019-09-27 Thread echauchot
This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a change to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git.


 discard c6cca7d  Ignore long time failing test: SparkMetricsSinkTest
 discard 97e8a19  Improve performance of source: the mapper already calls windowedValueCoder.decode, no need to call it also in the Spark encoder
 new 3ac3c71  Improve performance of source: the mapper already calls windowedValueCoder.decode, no need to call it also in the Spark encoder
 new ab7d24c  Ignore long time failing test: SparkMetricsSinkTest

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (c6cca7d)
            \
             N -- N -- N   refs/heads/spark-runner_structured-streaming (ab7d24c)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../translation/batch/ReadSourceTranslatorBatch.java  | 2 ++
 .../translation/streaming/ReadSourceTranslatorStreaming.java  | 4 +++-
 2 files changed, 5 insertions(+), 1 deletion(-)



[beam] 02/02: Ignore long time failing test: SparkMetricsSinkTest

2019-09-27 Thread echauchot
This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit ab7d24c6b4a8bcbb9e90a99fd7e96940efb83122
Author: Etienne Chauchot 
AuthorDate: Fri Sep 27 10:41:55 2019 +0200

Ignore long time failing test: SparkMetricsSinkTest
---
 .../aggregators/metrics/sink/SparkMetricsSinkTest.java  | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/aggregators/metrics/sink/SparkMetricsSinkTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/aggregators/metrics/sink/SparkMetricsSinkTest.java
index dd23c05..9d56f0c 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/aggregators/metrics/sink/SparkMetricsSinkTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/aggregators/metrics/sink/SparkMetricsSinkTest.java
@@ -30,6 +30,7 @@ import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExternalResource;
@@ -40,6 +41,7 @@ import org.junit.rules.ExternalResource;
  * A test that verifies Beam metrics are reported to Spark's metrics sink in both batch and
  * streaming modes.
  */
+@Ignore ("Has been failing since at least c350188ef7a8704c7336f3c20a1ab2144abbcd4a")
 public class SparkMetricsSinkTest {
   @Rule public ExternalResource inMemoryMetricsSink = new 
InMemoryMetricsSinkRule();
   @Rule public final TestPipeline pipeline = TestPipeline.create();



[beam] 01/02: Improve performance of source: the mapper already calls windowedValueCoder.decode, no need to call it also in the Spark encoder

2019-09-27 Thread echauchot
This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 3ac3c717ad11248211fa0e2a0b077b1ea2602287
Author: Etienne Chauchot 
AuthorDate: Thu Sep 19 17:20:31 2019 +0200

Improve performance of source: the mapper already calls windowedValueCoder.decode, no need to call it also in the Spark encoder
---
 .../translation/batch/ReadSourceTranslatorBatch.java | 9 ++---
 .../translation/streaming/ReadSourceTranslatorStreaming.java | 9 ++---
 2 files changed, 12 insertions(+), 6 deletions(-)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
index 2dcf66f..ceb87cf 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
@@ -70,13 +70,16 @@ class ReadSourceTranslatorBatch
         .load();
 
     // extract windowedValue from Row
-    WindowedValue.FullWindowedValueCoder<T> windowedValueCoder = WindowedValue.FullWindowedValueCoder
-        .of(source.getOutputCoder(), GlobalWindow.Coder.INSTANCE);
+    WindowedValue.FullWindowedValueCoder<T> windowedValueCoder =
+        WindowedValue.FullWindowedValueCoder.of(
+            source.getOutputCoder(), GlobalWindow.Coder.INSTANCE);
 
     Dataset<WindowedValue<T>> dataset =
         rowDataset.map(
             RowHelpers.extractWindowedValueFromRowMapFunction(windowedValueCoder),
-            EncoderHelpers.fromBeamCoder(windowedValueCoder));
+            // using kryo bytes serialization because the mapper already calls
+            // windowedValueCoder.decode, no need to call it also in the Spark encoder
+            EncoderHelpers.windowedValueEncoder());
 
     PCollection<T> output = (PCollection<T>) context.getOutput();
     context.putDataset(output, dataset);
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java
index 9e03d96..9f1e34d 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java
@@ -71,12 +71,15 @@ class ReadSourceTranslatorStreaming
         .load();
 
     // extract windowedValue from Row
-    WindowedValue.FullWindowedValueCoder<T> windowedValueCoder = WindowedValue.FullWindowedValueCoder
-        .of(source.getOutputCoder(), GlobalWindow.Coder.INSTANCE);
+    WindowedValue.FullWindowedValueCoder<T> windowedValueCoder =
+        WindowedValue.FullWindowedValueCoder.of(
+            source.getOutputCoder(), GlobalWindow.Coder.INSTANCE);
     Dataset<WindowedValue<T>> dataset =
         rowDataset.map(
             RowHelpers.extractWindowedValueFromRowMapFunction(windowedValueCoder),
-            EncoderHelpers.fromBeamCoder(windowedValueCoder));
+            // using kryo bytes serialization because the mapper already calls
+            // windowedValueCoder.decode, no need to call it also in the Spark encoder
+            EncoderHelpers.windowedValueEncoder());
 
     PCollection<T> output = (PCollection<T>) context.getOutput();
     context.putDataset(output, dataset);



[beam] 02/03: Improve performance of source: the mapper already calls windowedValueCoder.decode, no need to call it also in the Spark encoder

2019-09-27 Thread echauchot
This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 97e8a19ef1c2c8d868a580d8a97ae41acaec2978
Author: Etienne Chauchot 
AuthorDate: Thu Sep 19 17:20:31 2019 +0200

Improve performance of source: the mapper already calls windowedValueCoder.decode, no need to call it also in the Spark encoder
---
 .../translation/batch/ReadSourceTranslatorBatch.java   | 7 ---
 .../translation/streaming/ReadSourceTranslatorStreaming.java   | 5 +++--
 2 files changed, 7 insertions(+), 5 deletions(-)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
index 2dcf66f..c9a69d4 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
@@ -70,13 +70,14 @@ class ReadSourceTranslatorBatch
         .load();
 
     // extract windowedValue from Row
-    WindowedValue.FullWindowedValueCoder<T> windowedValueCoder = WindowedValue.FullWindowedValueCoder
-        .of(source.getOutputCoder(), GlobalWindow.Coder.INSTANCE);
+    WindowedValue.FullWindowedValueCoder<T> windowedValueCoder =
+        WindowedValue.FullWindowedValueCoder.of(
+            source.getOutputCoder(), GlobalWindow.Coder.INSTANCE);
 
     Dataset<WindowedValue<T>> dataset =
         rowDataset.map(
             RowHelpers.extractWindowedValueFromRowMapFunction(windowedValueCoder),
-            EncoderHelpers.fromBeamCoder(windowedValueCoder));
+            EncoderHelpers.windowedValueEncoder());
 
     PCollection<T> output = (PCollection<T>) context.getOutput();
     context.putDataset(output, dataset);
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java
index 9e03d96..ea10272 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java
@@ -71,8 +71,9 @@ class ReadSourceTranslatorStreaming
         .load();
 
     // extract windowedValue from Row
-    WindowedValue.FullWindowedValueCoder<T> windowedValueCoder = WindowedValue.FullWindowedValueCoder
-        .of(source.getOutputCoder(), GlobalWindow.Coder.INSTANCE);
+    WindowedValue.FullWindowedValueCoder<T> windowedValueCoder =
+        WindowedValue.FullWindowedValueCoder.of(
+            source.getOutputCoder(), GlobalWindow.Coder.INSTANCE);
     Dataset<WindowedValue<T>> dataset =
         rowDataset.map(
             RowHelpers.extractWindowedValueFromRowMapFunction(windowedValueCoder),



[beam] 03/03: Ignore long time failing test: SparkMetricsSinkTest

2019-09-27 Thread echauchot
This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit c6cca7dcad367e2479d0cb292892473d56a205da
Author: Etienne Chauchot 
AuthorDate: Fri Sep 27 10:41:55 2019 +0200

Ignore long time failing test: SparkMetricsSinkTest
---
 .../aggregators/metrics/sink/SparkMetricsSinkTest.java  | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/aggregators/metrics/sink/SparkMetricsSinkTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/aggregators/metrics/sink/SparkMetricsSinkTest.java
index dd23c05..9d56f0c 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/aggregators/metrics/sink/SparkMetricsSinkTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/aggregators/metrics/sink/SparkMetricsSinkTest.java
@@ -30,6 +30,7 @@ import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExternalResource;
@@ -40,6 +41,7 @@ import org.junit.rules.ExternalResource;
  * A test that verifies Beam metrics are reported to Spark's metrics sink in both batch and
  * streaming modes.
  */
+@Ignore ("Has been failing since at least c350188ef7a8704c7336f3c20a1ab2144abbcd4a")
 public class SparkMetricsSinkTest {
   @Rule public ExternalResource inMemoryMetricsSink = new 
InMemoryMetricsSinkRule();
   @Rule public final TestPipeline pipeline = TestPipeline.create();



[beam] branch spark-runner_structured-streaming updated (aa25e85 -> c6cca7d)

2019-09-27 Thread echauchot
This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a change to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git.


from aa25e85  Apply new Encoders to CombinePerKey
 new bcbb697  Apply new Encoders to Read source
 new 97e8a19  Improve performance of source: the mapper already calls windowedValueCoder.decode, no need to call it also in the Spark encoder
 new c6cca7d  Ignore long time failing test: SparkMetricsSinkTest

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../translation/batch/ReadSourceTranslatorBatch.java  | 7 ++-
 .../spark/structuredstreaming/translation/helpers/RowHelpers.java | 4 +---
 .../translation/streaming/ReadSourceTranslatorStreaming.java  | 8 ++--
 .../aggregators/metrics/sink/SparkMetricsSinkTest.java| 2 ++
 4 files changed, 15 insertions(+), 6 deletions(-)



[beam] 01/03: Apply new Encoders to Read source

2019-09-27 Thread echauchot
This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit bcbb69785106dba375414291eae956276e74b0fe
Author: Etienne Chauchot 
AuthorDate: Fri Sep 6 17:49:10 2019 +0200

Apply new Encoders to Read source
---
 .../translation/batch/ReadSourceTranslatorBatch.java  | 8 ++--
 .../spark/structuredstreaming/translation/helpers/RowHelpers.java | 4 +---
 .../translation/streaming/ReadSourceTranslatorStreaming.java  | 7 +--
 3 files changed, 12 insertions(+), 7 deletions(-)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
index 6ae6646..2dcf66f 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
@@ -27,6 +27,7 @@ import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.Row
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
@@ -69,10 +70,13 @@ class ReadSourceTranslatorBatch
         .load();
 
     // extract windowedValue from Row
+    WindowedValue.FullWindowedValueCoder<T> windowedValueCoder = WindowedValue.FullWindowedValueCoder
+        .of(source.getOutputCoder(), GlobalWindow.Coder.INSTANCE);
+
     Dataset<WindowedValue<T>> dataset =
         rowDataset.map(
-            RowHelpers.extractWindowedValueFromRowMapFunction(source.getOutputCoder()),
-            EncoderHelpers.windowedValueEncoder());
+            RowHelpers.extractWindowedValueFromRowMapFunction(windowedValueCoder),
+            EncoderHelpers.fromBeamCoder(windowedValueCoder));
 
     PCollection<T> output = (PCollection<T>) context.getOutput();
     context.putDataset(output, dataset);
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/RowHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/RowHelpers.java
index 6ee0e07..ac74c29 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/RowHelpers.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/RowHelpers.java
@@ -43,13 +43,11 @@ public final class RowHelpers {
    * @return A {@link MapFunction} that accepts a {@link Row} and returns its {@link WindowedValue}.
    */
   public static <T> MapFunction<Row, WindowedValue<T>> extractWindowedValueFromRowMapFunction(
-      Coder<T> coder) {
+      WindowedValue.WindowedValueCoder<T> windowedValueCoder) {
     return (MapFunction<Row, WindowedValue<T>>)
         value -> {
           // there is only one value put in each Row by the InputPartitionReader
           byte[] bytes = (byte[]) value.get(0);
-          WindowedValue.FullWindowedValueCoder<T> windowedValueCoder =
-              WindowedValue.FullWindowedValueCoder.of(coder, GlobalWindow.Coder.INSTANCE);
           return windowedValueCoder.decode(new ByteArrayInputStream(bytes));
         };
   }
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java
index c3d07ff..9e03d96 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java
@@ -27,6 +27,7 @@ import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.Row
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
@@ -70,10 +71,12 @@ class ReadSourceTranslatorStreaming
         .load();
 
     // extract windowedValue from Row
+    WindowedValue.FullWindowedValueCoder<T> windowedValueCoder = WindowedValue.FullWindowedValueCoder
+        .of(source.getOutputCoder(), GlobalWind