[beam] branch master updated (85b9fb5 -> a4aee0b)
This is an automated email from the ASF dual-hosted git repository.

aromanenko pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.

    from 85b9fb5  Merge pull request #5669: [BEAM-4325] Enforce ErrorProne analysis in the SQL project
     add 7a87a5f  Removing some null checks, where we already know that the variable in question is non-null
     new a4aee0b  Merge pull request #5575: Removing some null checks, where we already know that the variable in question is non-null

The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference.

Summary of changes:
 .../beam/examples/subprocess/kernel/SubProcessKernel.java      | 6 ++
 .../org/apache/beam/runners/core/MergingActiveWindowSet.java   | 10 --
 .../sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java | 2 +-
 .../beam/sdk/extensions/sql/impl/rule/BeamAggregationRule.java | 4 +---
 4 files changed, 8 insertions(+), 14 deletions(-)
[beam] 01/01: Merge pull request #5575: Removing some null checks, where we already know that the variable in question is non-null
This is an automated email from the ASF dual-hosted git repository.

aromanenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit a4aee0bb4eae03ae33d7cf983736130a01afc016
Merge: 85b9fb5 7a87a5f
Author: Alexey Romanenko <33895511+aromanenko-...@users.noreply.github.com>
AuthorDate: Thu Jun 21 13:31:02 2018 +0200

    Merge pull request #5575: Removing some null checks, where we already know that the variable in question is non-null

 .../beam/examples/subprocess/kernel/SubProcessKernel.java      | 6 ++
 .../org/apache/beam/runners/core/MergingActiveWindowSet.java   | 10 --
 .../sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java | 2 +-
 .../beam/sdk/extensions/sql/impl/rule/BeamAggregationRule.java | 4 +---
 4 files changed, 8 insertions(+), 14 deletions(-)
[beam] 01/01: Merge pull request #6200: [BEAM-5147] Expose document metadata in ElasticsearchIO read
This is an automated email from the ASF dual-hosted git repository.

aromanenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit fa8e9f737541c2a5ffa4f7d99b6548af225cc6e5
Merge: a7ebcf3 30e7f23
Author: Alexey Romanenko <33895511+aromanenko-...@users.noreply.github.com>
AuthorDate: Wed Aug 15 13:46:36 2018 +0200

    Merge pull request #6200: [BEAM-5147] Expose document metadata in ElasticsearchIO read

 .../sdk/io/elasticsearch/ElasticsearchIOTest.java  | 6
 .../sdk/io/elasticsearch/ElasticsearchIOTest.java  | 6
 .../elasticsearch/ElasticsearchIOTestCommon.java   | 42 ++
 .../beam/sdk/io/elasticsearch/ElasticsearchIO.java | 22 ++--
 4 files changed, 74 insertions(+), 2 deletions(-)
[beam] branch master updated (a7ebcf3 -> fa8e9f7)
This is an automated email from the ASF dual-hosted git repository.

aromanenko pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.

    from a7ebcf3  Merge pull request #6226 from qinyeli/master
     add 30e7f23  [BEAM-5147] Java elasticsearch client support for reading entire documents with metadata
     new fa8e9f7  Merge pull request #6200: [BEAM-5147] Expose document metadata in ElasticsearchIO read

The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference.

Summary of changes:
 .../sdk/io/elasticsearch/ElasticsearchIOTest.java  | 6
 .../sdk/io/elasticsearch/ElasticsearchIOTest.java  | 6
 .../elasticsearch/ElasticsearchIOTestCommon.java   | 42 ++
 .../beam/sdk/io/elasticsearch/ElasticsearchIO.java | 22 ++--
 4 files changed, 74 insertions(+), 2 deletions(-)
[beam] 01/01: Merge pull request #6194: [BEAM-3654] Port FilterExamplesTest off DoFnTester
This is an automated email from the ASF dual-hosted git repository.

aromanenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 6255ce1de063bd90700eb1d0c50bd92698a19cd1
Merge: 90ca21e c288be9
Author: Alexey Romanenko <33895511+aromanenko-...@users.noreply.github.com>
AuthorDate: Wed Aug 22 19:05:58 2018 +0200

    Merge pull request #6194: [BEAM-3654] Port FilterExamplesTest off DoFnTester

    [BEAM-3654] Port FilterExamplesTest off DoFnTester

 .../beam/examples/cookbook/FilterExamplesTest.java | 45 +-
 1 file changed, 26 insertions(+), 19 deletions(-)
[beam] branch master updated (90ca21e -> 6255ce1)
This is an automated email from the ASF dual-hosted git repository.

aromanenko pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.

    from 90ca21e  Merge pull request #6232: [BEAM-5196] Add MD5 consistency check on S3 uploads (writes)
     add c288be9  [BEAM-3654] Port FilterExamplesTest off DoFnTester
     new 6255ce1  Merge pull request #6194: [BEAM-3654] Port FilterExamplesTest off DoFnTester

The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference.

Summary of changes:
 .../beam/examples/cookbook/FilterExamplesTest.java | 45 +-
 1 file changed, 26 insertions(+), 19 deletions(-)
[beam] branch master updated (7f8e67d -> 03c84dd)
This is an automated email from the ASF dual-hosted git repository.

aromanenko pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.

    from 7f8e67d  [BEAM-4800] send PutArtifactResponse in BeamFileSystemArtifactStagingService
     add 97cdb12  [BEAM-4622] Makes required to call Beam SQL expressions validation
     add 7cf8f6c  Check number of arguments at first
     new 03c84dd  Merge pull request #5912: [BEAM-4622] Makes required to call Beam SQL expressions validation

The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference.

Summary of changes:
 .../sql/impl/interpreter/BeamSqlFnExecutor.java    | 73 ++
 .../operator/map/BeamSqlMapExpression.java         | 10 ++-
 .../reinterpret/BeamSqlReinterpretExpression.java  | 14 -
 .../beam/sdk/extensions/sql/BeamSqlMapTest.java    | 23 +++
 4 files changed, 91 insertions(+), 29 deletions(-)
[beam] 01/01: Merge pull request #5912: [BEAM-4622] Makes required to call Beam SQL expressions validation
This is an automated email from the ASF dual-hosted git repository.

aromanenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 03c84dd3f26b30ea6c7c00957654db8c45c78525
Merge: 7f8e67d 7cf8f6c
Author: Alexey Romanenko <33895511+aromanenko-...@users.noreply.github.com>
AuthorDate: Tue Jul 17 10:57:25 2018 +0200

    Merge pull request #5912: [BEAM-4622] Makes required to call Beam SQL expressions validation

 .../sql/impl/interpreter/BeamSqlFnExecutor.java    | 73 ++
 .../operator/map/BeamSqlMapExpression.java         | 10 ++-
 .../reinterpret/BeamSqlReinterpretExpression.java  | 14 -
 .../beam/sdk/extensions/sql/BeamSqlMapTest.java    | 23 +++
 4 files changed, 91 insertions(+), 29 deletions(-)
[beam] 01/01: Merge pull request #6569: [BEAM-4553] Implement graphite sink for MetricsPusher and refactor MetricsHttpSink test
This is an automated email from the ASF dual-hosted git repository.

aromanenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 1d36dff31cdbeefa456824cf68148c705160d19f
Merge: e6d7dfe d5b32d4
Author: Alexey Romanenko <33895511+aromanenko-...@users.noreply.github.com>
AuthorDate: Tue Oct 16 18:26:06 2018 +0200

    Merge pull request #6569: [BEAM-4553] Implement graphite sink for MetricsPusher and refactor MetricsHttpSink test

 .../extensions/metrics/MetricsGraphiteSink.java    | 328 +
 .../extensions/metrics/MetricsHttpSink.java        | 4 +-
 .../metrics/CustomMetricQueryResults.java          | 140 +
 .../metrics/MetricsGraphiteSinkTest.java           | 112 +++
 .../extensions/metrics/MetricsHttpSinkTest.java    | 210 +
 .../extensions/metrics/NetworkMockServer.java      | 112 +++
 .../apache/beam/sdk/options/PipelineOptions.java   | 11 +
 7 files changed, 782 insertions(+), 135 deletions(-)
[beam] branch master updated (e6d7dfe -> 1d36dff)
This is an automated email from the ASF dual-hosted git repository.

aromanenko pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.

    from e6d7dfe  Merge pull request #6705: Inform contributors to tag PRs with R: @reviewer
     add 0abb8bf  [BEAM-4553] Implement graphite sink for MetricsPusher
     add 5505196  [BEAM-3310] Extract CustomQueryResults to be used in all metrics sinks tests.
     add 416b946  [BEAM-3310] Introduce NetworkMockServer to be used for all metrics sinks tests
     add 8de693e  [BEAM-4553] Test Graphite metrics sink
     add 9a3bc3c  [BEAM-3310] improve http sink test
     add ed19748  [BEAM-4553] support runners that do not support committed metrics
     add bd3b353  [BEAM-4553] Refactor MetricsGraphiteSink for code deduplication.
     add d5b32d4  [BEAM-3310] Replace Thread.sleep with a CountDownLatch in all metrics sinks tests to avoid flakiness of the tests.
     new 1d36dff  Merge pull request #6569: [BEAM-4553] Implement graphite sink for MetricsPusher and refactor MetricsHttpSink test

The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference.

Summary of changes:
 .../extensions/metrics/MetricsGraphiteSink.java    | 328 +
 .../extensions/metrics/MetricsHttpSink.java        | 4 +-
 .../metrics/CustomMetricQueryResults.java          | 140 +
 .../metrics/MetricsGraphiteSinkTest.java           | 112 +++
 .../extensions/metrics/MetricsHttpSinkTest.java    | 210 +
 .../extensions/metrics/NetworkMockServer.java      | 112 +++
 .../apache/beam/sdk/options/PipelineOptions.java   | 11 +
 7 files changed, 782 insertions(+), 135 deletions(-)
 create mode 100644 runners/extensions-java/metrics/src/main/java/org/apache/beam/runners/extensions/metrics/MetricsGraphiteSink.java
 create mode 100644 runners/extensions-java/metrics/src/test/java/org/apache/beam/runners/extensions/metrics/CustomMetricQueryResults.java
 create mode 100644 runners/extensions-java/metrics/src/test/java/org/apache/beam/runners/extensions/metrics/MetricsGraphiteSinkTest.java
 create mode 100644 runners/extensions-java/metrics/src/test/java/org/apache/beam/runners/extensions/metrics/NetworkMockServer.java
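One of the commits listed above replaces Thread.sleep with a CountDownLatch in the metrics sink tests to avoid flakiness. A minimal, Beam-free sketch of that synchronization pattern (class and method names here are illustrative, not taken from the Beam test code): the test thread blocks on a latch that the delivering thread counts down, so the test proceeds as soon as the expected events arrive instead of sleeping a fixed, hope-it-is-enough duration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class LatchSketch {
  /**
   * Waits for {@code expected} deliveries with a bounded timeout instead of
   * Thread.sleep. Returns true once the latch reaches zero, false on timeout.
   */
  public static boolean waitForMessages(int expected, Runnable sender) {
    CountDownLatch latch = new CountDownLatch(expected);
    // The "sink" thread runs the sender and counts down once per delivery.
    Thread sink = new Thread(() -> {
      sender.run();
      for (int i = 0; i < expected; i++) {
        latch.countDown();
      }
    });
    sink.start();
    try {
      // Bounded wait: unblocks as soon as every expected message arrived.
      return latch.await(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }
}
```

Unlike a fixed sleep, this is both faster on the happy path (no idle waiting) and more robust on slow CI machines (the timeout is an upper bound, not an exact schedule).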
[beam] 01/01: Merge pull request #6964: [BEAM-6010] Deprecate KafkaIO withTimestampFn().
This is an automated email from the ASF dual-hosted git repository.

aromanenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit ba5bc60c7da3693a076344d47ffa4629cd696768
Merge: 39e8214 38a07ec
Author: Alexey Romanenko <33895511+aromanenko-...@users.noreply.github.com>
AuthorDate: Wed Nov 7 15:17:31 2018 +0100

    Merge pull request #6964: [BEAM-6010] Deprecate KafkaIO withTimestampFn().

 .../org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.java | 9 -
 1 file changed, 8 insertions(+), 1 deletion(-)
[beam] branch master updated (39e8214 -> ba5bc60)
This is an automated email from the ASF dual-hosted git repository.

aromanenko pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.

    from 39e8214  Merge pull request #6962: [BEAM-5981] Par do load test
     add 38a07ec  Deprecate KafkaIO withTimestampFn().
     new ba5bc60  Merge pull request #6964: [BEAM-6010] Deprecate KafkaIO withTimestampFn().

The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference.

Summary of changes:
 .../org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.java | 9 -
 1 file changed, 8 insertions(+), 1 deletion(-)
[beam] branch master updated (e05dcef -> 372afba)
This is an automated email from the ASF dual-hosted git repository.

aromanenko pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.

    from e05dcef  Merge pull request #6551: [BEAM-5613] Snapshot of Python depedency and add it to nightly snapshot job
     add 3d8d0b8  [BEAM-3655] Port MaxPerKeyExamplesTest off DoFnTester
     add 372afba  Merge pull request #6542: [BEAM-3655] Port MaxPerKeyExamplesTest off DoFnTester

No new revisions were added by this update.

Summary of changes:
 .../examples/cookbook/MaxPerKeyExamplesTest.java | 38 ++
 1 file changed, 24 insertions(+), 14 deletions(-)
[beam] branch master updated (b0be69f -> 4943306)
This is an automated email from the ASF dual-hosted git repository.

aromanenko pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.

    from b0be69f  Merge pull request #6481: [BEAM-5487] ByteKeyRangeTracker restrictions do not cover the entire interval because of incorrect next key
     add 3ab0073  [BEAM-3651] Port BigQueryTornadoesTest off DoFnTester
     new 4943306  Merge pull request #6525: [BEAM-3651] Port BigQueryTornadoesTest off DoFnTester

The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference.

Summary of changes:
 .../examples/cookbook/BigQueryTornadoesTest.java | 69 ++
 1 file changed, 44 insertions(+), 25 deletions(-)
[beam] 01/01: Merge pull request #6525: [BEAM-3651] Port BigQueryTornadoesTest off DoFnTester
This is an automated email from the ASF dual-hosted git repository.

aromanenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 4943306e9b27766c60d362fa80f241105291f44d
Merge: b0be69f 3ab0073
Author: Alexey Romanenko <33895511+aromanenko-...@users.noreply.github.com>
AuthorDate: Mon Oct 1 18:22:20 2018 +0200

    Merge pull request #6525: [BEAM-3651] Port BigQueryTornadoesTest off DoFnTester

 .../examples/cookbook/BigQueryTornadoesTest.java | 69 ++
 1 file changed, 44 insertions(+), 25 deletions(-)
[beam] branch master updated (853758c -> e79918c)
This is an automated email from the ASF dual-hosted git repository.

aromanenko pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.

    from 853758c  [BEAM-4496] Fix #2 for branch fetch failure on job_PostCommit_Website_Publish (#6531)
     add fcff44f  [BEAM-3371] Allow running integration tests on Spark
     add 08d0ad6  [BEAM-3371] Move common code for pre-stage resources preparation to one place
     add d502e70  [BEAM-3371] Fix directories not being staged to classpath issue
     add dd0d74b  [BEAM-3370] Make preparing resources to stage condition more explicit in Flink runner. Add unit tests
     add e79918c  Merge pull request #6244: [BEAM-3371] Enable running integration tests on Spark

No new revisions were added by this update.

Summary of changes:
 .../org/apache/beam/gradle/BeamModulePlugin.groovy | 11 +++
 .../core/construction/PipelineResources.java       | 68 ++
 .../core/construction/PipelineResourcesTest.java   | 46
 .../flink/FlinkPipelineExecutionEnvironment.java   | 83 +-
 .../FlinkPipelineExecutionEnvironmentTest.java     | 74 +++
 .../org/apache/beam/runners/spark/SparkRunner.java | 16 +
 6 files changed, 231 insertions(+), 67 deletions(-)
[beam] 01/01: Merge pull request #6486: [BEAM-5289] Upgrade assertj dependecny to latest (3.11.1)
This is an automated email from the ASF dual-hosted git repository.

aromanenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 2e323dadc73fe399a8ccd79f682a31eb03df1dd5
Merge: e2ce403 29c9d9c
Author: Alexey Romanenko <33895511+aromanenko-...@users.noreply.github.com>
AuthorDate: Tue Sep 25 17:44:08 2018 +0200

    Merge pull request #6486: [BEAM-5289] Upgrade assertj dependecny to latest (3.11.1)

 sdks/java/io/kinesis/build.gradle | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
[beam] branch master updated (e2ce403 -> 2e323da)
This is an automated email from the ASF dual-hosted git repository.

aromanenko pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.

    from e2ce403  Merge pull request #6469: Add tests for port supplier methods in ServerFactory
     add 29c9d9c  Upgrade assertj dependecny to latest (3.11.1)
     new 2e323da  Merge pull request #6486: [BEAM-5289] Upgrade assertj dependecny to latest (3.11.1)

The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference.

Summary of changes:
 sdks/java/io/kinesis/build.gradle | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
[beam] branch master updated: [BEAM-6334] Add throwing exception in case of invalid state or timeout
This is an automated email from the ASF dual-hosted git repository.

aromanenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

The following commit(s) were added to refs/heads/master by this push:
     new d116214  [BEAM-6334] Add throwing exception in case of invalid state or timeout
d116214 is described below

commit d1162143f79a80b0e4b3aacd67fa227036b34322
Author: Łukasz Gajowy
AuthorDate: Thu Jan 17 18:26:36 2019 +0100

    [BEAM-6334] Add throwing exception in case of invalid state or timeout
---
 .../org/apache/beam/sdk/loadtests/JobFailure.java  | 93 ++
 .../org/apache/beam/sdk/loadtests/LoadTest.java    | 15 ++--
 .../apache/beam/sdk/loadtests/LoadTestOptions.java | 6 ++
 3 files changed, 109 insertions(+), 5 deletions(-)

diff --git a/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/JobFailure.java b/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/JobFailure.java
new file mode 100644
index 000..aa498ea
--- /dev/null
+++ b/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/JobFailure.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.loadtests;
+
+import static java.lang.String.format;
+import static java.util.Optional.empty;
+import static java.util.Optional.of;
+
+import java.io.IOException;
+import java.util.Optional;
+import org.apache.beam.sdk.PipelineResult;
+import org.joda.time.Duration;
+
+/** Class for detecting failures after {@link PipelineResult#waitUntilFinish(Duration)} unblocks. */
+class JobFailure {
+
+  private String cause;
+
+  private boolean requiresCancelling;
+
+  JobFailure(String cause, boolean requiresCancelling) {
+    this.cause = cause;
+    this.requiresCancelling = requiresCancelling;
+  }
+
+  static void handleFailure(final PipelineResult pipelineResult, final LoadTestResult testResult)
+      throws IOException {
+    Optional<JobFailure> failure = lookForFailure(pipelineResult, testResult);
+
+    if (failure.isPresent()) {
+      JobFailure jobFailure = failure.get();
+
+      if (jobFailure.requiresCancelling) {
+        pipelineResult.cancel();
+      }
+
+      throw new RuntimeException(jobFailure.cause);
+    }
+  }
+
+  private static Optional<JobFailure> lookForFailure(
+      PipelineResult pipelineResult, LoadTestResult testResult) {
+    PipelineResult.State state = pipelineResult.getState();
+
+    Optional<JobFailure> stateRelatedFailure = lookForInvalidState(state);
+
+    if (stateRelatedFailure.isPresent()) {
+      return stateRelatedFailure;
+    } else {
+      return lookForMetricResultFailure(testResult);
+    }
+  }
+
+  private static Optional<JobFailure> lookForMetricResultFailure(LoadTestResult testResult) {
+    if (testResult.getRuntime() == -1 || testResult.getTotalBytesCount() == -1) {
+      return of(new JobFailure("Invalid test results", false));
+    } else {
+      return empty();
+    }
+  }
+
+  private static Optional<JobFailure> lookForInvalidState(PipelineResult.State state) {
+    switch (state) {
+      case RUNNING:
+      case UNKNOWN:
+        return of(new JobFailure("Job timeout.", true));
+
+      case CANCELLED:
+      case FAILED:
+      case STOPPED:
+      case UPDATED:
+        return of(new JobFailure(format("Invalid job state: %s.", state.toString()), false));
+
+      default:
+        return empty();
+    }
+  }
+}
diff --git a/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/LoadTest.java b/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/LoadTest.java
index 61cb3a2..2f783d2 100644
--- a/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/LoadTest.java
+++ b/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/LoadTest.java
@@ -18,6 +18,7 @@ package org.apache.beam.sdk.loadtests;

 import static org.apache.beam.sdk.io.synthetic.SyntheticOptions.fromJsonString;
+import static org.apache.beam.sdk.loadtests.JobFailure.handleFailure;

 import java.io.IOException;
 import java.util.Optional;
@@ -37,6 +38,7 @@ import org.apache.beam.sdk.values.PBegin; impo
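The new JobFailure class in the commit above maps a pipeline's final state to a failure description. A self-contained sketch of that state check (the State enum below is a local stand-in for Beam's PipelineResult.State, and the method mirrors lookForInvalidState; it is an illustration, not the Beam source):

```java
import java.util.Optional;

public class JobFailureSketch {
  // Stand-in for org.apache.beam.sdk.PipelineResult.State.
  public enum State { RUNNING, UNKNOWN, CANCELLED, FAILED, STOPPED, UPDATED, DONE }

  /**
   * A job still RUNNING/UNKNOWN after waitUntilFinish(timeout) unblocks means a
   * timeout; any terminal state other than DONE is an invalid outcome.
   * Returns the failure cause, or empty if the state is acceptable.
   */
  public static Optional<String> lookForInvalidState(State state) {
    switch (state) {
      case RUNNING:
      case UNKNOWN:
        return Optional.of("Job timeout.");
      case CANCELLED:
      case FAILED:
      case STOPPED:
      case UPDATED:
        return Optional.of("Invalid job state: " + state + ".");
      default:
        return Optional.empty();
    }
  }
}
```

In the real class, the timeout case additionally flags that the still-running job requires cancelling before the test throws.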
[beam] branch spark-runner_structured-streaming updated: Address minor review notes
This is an automated email from the ASF dual-hosted git repository.

aromanenko pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

The following commit(s) were added to refs/heads/spark-runner_structured-streaming by this push:
     new 16ac4a0  Address minor review notes
16ac4a0 is described below

commit 16ac4a026dc17dcdf6bb5fe2250af6bf06e1cfbc
Author: Alexey Romanenko
AuthorDate: Fri Jan 18 19:13:34 2019 +0100

    Address minor review notes
---
 .../translation/TranslationContext.java                  | 4
 .../translation/batch/DoFnFunction.java                  | 16
 .../translation/batch/ParDoTranslatorBatch.java          | 12 ++--
 .../{SparkProcessContext.java => ProcessContext.java}    | 12 ++--
 .../{SparkNoOpStepContext.java => NoOpStepContext.java}  | 2 +-
 5 files changed, 25 insertions(+), 21 deletions(-)

diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
index 75b470e..ca69261 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
@@ -109,6 +109,10 @@ public class TranslationContext {
     return (Dataset>) dataset;
   }

+  /**
+   * TODO: All these 3 methods (putDataset*) are temporary and they are used only for generics type
+   * checking. We should unify them in the future.
+   */
   public void putDatasetWildcard(PValue value, Dataset> dataset) {
     if (!datasets.containsKey(value)) {
       datasets.put(value, dataset);
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnFunction.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnFunction.java
index 35204bc..62629ee 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnFunction.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnFunction.java
@@ -26,7 +26,7 @@ import com.google.common.collect.Multimap;
 import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.runners.core.DoFnRunners;
 import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
-import org.apache.beam.runners.spark.structuredstreaming.translation.batch.functions.SparkNoOpStepContext;
+import org.apache.beam.runners.spark.structuredstreaming.translation.batch.functions.NoOpStepContext;
 import org.apache.beam.runners.spark.structuredstreaming.translation.batch.functions.SparkSideInputReader;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -45,6 +45,14 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;

+/**
+ * Encapsulates a {@link DoFn} inside a Spark {@link
+ * org.apache.spark.api.java.function.MapPartitionsFunction}.
+ *
+ * We get a mapping from {@link org.apache.beam.sdk.values.TupleTag} to output index and must tag
+ * all outputs with the output number. Afterwards a filter will filter out those elements that are
+ * not to be in a specific output.
+ */
 public class DoFnFunction
     implements MapPartitionsFunction, Tuple2, WindowedValue>> {
@@ -98,18 +106,18 @@ public class DoFnFunction
             outputManager,
             mainOutputTag,
             additionalOutputTags,
-            new SparkNoOpStepContext(),
+            new NoOpStepContext(),
             inputCoder,
             outputCoderMap,
             windowingStrategy);

-    return new SparkProcessContext<>(doFn, doFnRunner, outputManager, Collections.emptyIterator())
+    return new ProcessContext<>(doFn, doFnRunner, outputManager, Collections.emptyIterator())
         .processPartition(iter)
         .iterator();
   }

   private class DoFnOutputManager
-      implements SparkProcessContext.SparkOutputManager, WindowedValue>> {
+      implements ProcessContext.SparkOutputManager, WindowedValue>> {

     private final Multimap, WindowedValue> outputs = LinkedListMultimap.create();
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
index 9cbde5a..f80db9a 100644
--- a/run
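The new DoFnFunction javadoc in the diff above describes a multi-output scheme: every emitted element is tagged with its output index, and a downstream filter keeps only the elements belonging to one output. A plain-Java sketch of that tag-then-filter idea (no Spark or Beam types; the even/odd-length routing rule below is invented purely for illustration of the tag mapping):

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class TaggedOutputSketch {
  /**
   * Tags each element with an output index (stand-in for the
   * TupleTag -> output-index mapping): even-length strings go to
   * output 0, odd-length strings to output 1.
   */
  public static List<Map.Entry<Integer, String>> tagAll(List<String> elements) {
    return elements.stream()
        .map(e -> new SimpleEntry<>(e.length() % 2, e))
        .collect(Collectors.toList());
  }

  /** Downstream filter: keep only the elements tagged for one output. */
  public static List<String> output(int index, List<Map.Entry<Integer, String>> tagged) {
    return tagged.stream()
        .filter(en -> en.getKey() == index)
        .map(Map.Entry::getValue)
        .collect(Collectors.toList());
  }
}
```

The trade-off mirrored here is that one pass produces a single tagged stream, and each logical output is materialized by an independent filter over it.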
[beam] branch spark-runner_structured-streaming updated: Fail in case of having SideInouts or State/Timers
This is an automated email from the ASF dual-hosted git repository.

aromanenko pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

The following commit(s) were added to refs/heads/spark-runner_structured-streaming by this push:
     new ff7a24f  Fail in case of having SideInouts or State/Timers
ff7a24f is described below

commit ff7a24fadd7bbd5d53e935e138fe97a62328dc58
Author: Alexey Romanenko
AuthorDate: Wed Jan 23 18:02:18 2019 +0100

    Fail in case of having SideInouts or State/Timers
---
 .../translation/TranslationContext.java          | 5 -
 .../translation/batch/ParDoTranslatorBatch.java  | 16 ++--
 2 files changed, 14 insertions(+), 7 deletions(-)

diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
index 2837125..bf7053d 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
@@ -136,11 +136,6 @@ public class TranslationContext {
   }

   @SuppressWarnings("unchecked")
-  public T getInput(PTransform transform) {
-    return (T) Iterables.getOnlyElement(TransformInputs.nonAdditionalInputs(currentTransform));
-  }
-
-  @SuppressWarnings("unchecked")
   public Map, PValue> getInputs() {
     return currentTransform.getInputs();
   }
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
index f80db9a..0b39b8b 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
@@ -29,6 +29,7 @@ import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.join.RawUnionValue;
 import org.apache.beam.sdk.transforms.join.UnionCoder;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.*;
@@ -47,7 +48,8 @@ import java.util.Map;
 import static com.google.common.base.Preconditions.checkState;

 /**
- * TODO: Add support of state and timers TODO: Add support of side inputs
+ * TODO: Add support of state and timers
+ * TODO: Add support of side inputs
  *
  * @param
  * @param
@@ -59,12 +61,19 @@ class ParDoTranslatorBatch
   public void translateTransform(
       PTransform, PCollectionTuple> transform, TranslationContext context) {

+    // TODO: add support of Splittable DoFn
     DoFn doFn = getDoFn(context);
     checkState(
         !DoFnSignatures.signatureForDoFn(doFn).processElement().isSplittable(),
         "Not expected to directly translate splittable DoFn, should have been overridden: %s",
         doFn);

+    // TODO: add support of states and timers
+    DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
+    boolean stateful =
+        signature.stateDeclarations().size() > 0 || signature.timerDeclarations().size() > 0;
+    checkState(!stateful, "States and timers are not supported for the moment.");
+
     Dataset> inputDataSet = context.getDataset(context.getInput());
     Map, PValue> outputs = context.getOutputs();
     TupleTag mainOutputTag = getTupleTag(context);
@@ -109,6 +118,9 @@ class ParDoTranslatorBatch
     UnionCoder unionCoder = UnionCoder.of(outputCoders);

     List> sideInputs = getSideInputs(context);
+    final boolean hasSideInputs = sideInputs != null && sideInputs.size() > 0;
+    // TODO: add support of SideInputs
+    checkState(!hasSideInputs, "SideInputs are not supported for the moment.");

     // construct a map from side input to WindowingStrategy so that
     // the DoFn runner can map main-input windows to side input windows
@@ -128,7 +140,7 @@ class ParDoTranslatorBatch
             context.getOptions(),
             outputTags,
             mainOutputTag,
-            context.getInput(transform).getCoder(),
+            ((PCollection)context.getInput()).getCoder(),
             outputCoderMap);

     Dataset, WindowedValue>> allOutputsDataset =
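The commit above rejects not-yet-supported features (state/timers, side inputs) eagerly with checkState rather than failing obscurely at runtime. A minimal stand-alone sketch of that guard pattern (checkState here is a local stand-in for Guava's Preconditions.checkState, and the integer counts stand in for DoFnSignature's state/timer declarations; names are illustrative):

```java
public class GuardSketch {
  /** Local stand-in for com.google.common.base.Preconditions.checkState. */
  static void checkState(boolean condition, String message) {
    if (!condition) {
      throw new IllegalStateException(message);
    }
  }

  /** True when the (mock) signature declares no state and no timers. */
  public static boolean isSupported(int stateDeclarations, int timerDeclarations) {
    return stateDeclarations == 0 && timerDeclarations == 0;
  }

  /** Mirrors the translator: refuse stateful DoFns until they are supported. */
  public static void translate(int stateDeclarations, int timerDeclarations) {
    checkState(
        isSupported(stateDeclarations, timerDeclarations),
        "States and timers are not supported for the moment.");
    // ... translation would proceed here ...
  }
}
```

Failing fast at translation time turns a silent wrong-result bug into an immediate, well-labeled IllegalStateException.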
[beam] branch master updated (ec55000 -> 289d2b2)
This is an automated email from the ASF dual-hosted git repository.

aromanenko pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.

    from ec55000  Merge pull request #7078 from [BEAM-6094] External portable BeamPython.
     add 9853bd7  [BEAM-6213] Fix matching of glob patterns in windows local filesystem
     new 289d2b2  Merge pull request #7258: [BEAM-6213] Fix matching of glob patterns in windows local filesystem

The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference.

Summary of changes:
 .../core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java | 6 +-
 1 file changed, 5 insertions(+), 1 deletion(-)
[beam] 01/01: Merge pull request #7258: [BEAM-6213] Fix matching of glob patterns in windows local filesystem
This is an automated email from the ASF dual-hosted git repository.

aromanenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 289d2b2d95748140479dfd8552fcfd10d740af9f
Merge: ec55000 9853bd7
Author: Alexey Romanenko <33895511+aromanenko-...@users.noreply.github.com>
AuthorDate: Wed Dec 12 15:30:32 2018 +0100

    Merge pull request #7258: [BEAM-6213] Fix matching of glob patterns in windows local filesystem

 .../core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java | 6 +-
 1 file changed, 5 insertions(+), 1 deletion(-)
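The fix above concerns Beam's own glob handling in LocalFileSystem on Windows. For comparison, the JDK exposes the same glob semantics through java.nio PathMatcher, where `*` stops at path separators and `**` crosses them (this sketch uses the JDK API, not Beam's LocalFileSystem):

```java
import java.nio.file.FileSystems;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;

public class GlobSketch {
  /** Matches a path against a glob using the default filesystem's rules. */
  public static boolean matches(String glob, String path) {
    PathMatcher m = FileSystems.getDefault().getPathMatcher("glob:" + glob);
    // Paths.get normalizes to the platform separator, so the same glob
    // works against "/" on Unix and "\" on Windows.
    return m.matches(Paths.get(path));
  }
}
```

For example, `*.java` matches `Foo.java` but not `src/Foo.java` (the `*` wildcard does not cross a directory separator), while `**.java` matches both.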
[beam] branch master updated (ac8c956 -> e636294)
This is an automated email from the ASF dual-hosted git repository. aromanenko pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git. from ac8c956 Merge pull request #7287: [BEAM-6239] Add session side input join to Nexmark add 0ad4a5d [BEAM-6244] Restore updateProducerProperties add 9b0d8fb [BEAM-6244] Restore validate new e636294 Merge pull request #7343: [BEAM-6244] KafkaIO: keep KafkaIO.Write compatibility with 2.9.0 The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 13 + .../test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java | 4 +++- 2 files changed, 16 insertions(+), 1 deletion(-)
[beam] 01/01: Merge pull request #7343: [BEAM-6244] KafkaIO: keep KafkaIO.Write compatibility with 2.9.0
This is an automated email from the ASF dual-hosted git repository. aromanenko pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git commit e6362941530de92ffa1600641a42a000d793986a Merge: ac8c956 9b0d8fb Author: Alexey Romanenko <33895511+aromanenko-...@users.noreply.github.com> AuthorDate: Wed Dec 26 11:37:44 2018 +0100 Merge pull request #7343: [BEAM-6244] KafkaIO: keep KafkaIO.Write compatibility with 2.9.0 .../src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 13 + .../test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java | 4 +++- 2 files changed, 16 insertions(+), 1 deletion(-)
[beam] branch master updated (a68f209 -> 67d0f78)
This is an automated email from the ASF dual-hosted git repository. aromanenko pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git. from a68f209 Merge pull request #4760: [BEAM-2873] Setting number of shards for writes with runner determined sharding add a6d4345 [BEAM-6200] Deprecate old HadoopInputFormatIO, move it into new HadoopFormatIO add 67d0f78 Merge pull request #7263: [BEAM-6200] Deprecate old HadoopInputFormatIO, move it into new HadoopFormatIO No new revisions were added by this update. Summary of changes: sdks/java/io/hadoop-format/build.gradle| 40 +- .../beam/sdk/io/hadoop/format/HadoopFormatIO.java | 822 ++- .../format}/ConfigurableEmployeeInputFormat.java | 4 +- .../beam/sdk/io/hadoop/format}/Employee.java | 4 +- .../sdk/io/hadoop/format}/EmployeeInputFormat.java | 6 +- .../sdk/io/hadoop/format/EmployeeOutputFormat.java | 1 - .../hadoop/format/HadoopFormatIOCassandraIT.java} | 30 +- .../format/HadoopFormatIOCassandraTest.java} | 12 +- .../io/hadoop/format/HadoopFormatIOElasticIT.java} | 24 +- .../hadoop/format/HadoopFormatIOElasticTest.java} | 12 +- .../sdk/io/hadoop/format/HadoopFormatIOIT.java | 37 +- .../io/hadoop/format/HadoopFormatIOReadTest.java} | 151 ++-- .../hadoop/format/HadoopFormatIOTestOptions.java} | 6 +- ...matIOTest.java => HadoopFormatIOWriteTest.java} | 6 +- .../format}/ReuseObjectsEmployeeInputFormat.java | 6 +- .../sdk/io/hadoop/format}/TestEmployeeDataSet.java | 2 +- .../sdk/io/hadoop/format}/TestRowDBWritable.java | 4 +- .../src/test/resources/cassandra.yaml | 0 sdks/java/io/hadoop-input-format/build.gradle | 1 + .../io/hadoop/inputformat/HadoopInputFormatIO.java | 871 +++-- .../io/hadoop/inputformat/EmployeeInputFormat.java | 2 +- .../inputformat/HadoopInputFormatIOTest.java | 462 --- 22 files changed, 1116 insertions(+), 1387 deletions(-) copy sdks/java/io/{hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat => 
hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format}/ConfigurableEmployeeInputFormat.java (96%) copy sdks/java/io/{hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat => hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format}/Employee.java (95%) copy sdks/java/io/{hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat => hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format}/EmployeeInputFormat.java (95%) copy sdks/java/io/{hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFIOCassandraIT.java => hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOCassandraIT.java} (87%) copy sdks/java/io/{hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFIOWithEmbeddedCassandraTest.java => hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOCassandraTest.java} (95%) copy sdks/java/io/{hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFIOElasticIT.java => hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOElasticIT.java} (91%) copy sdks/java/io/{hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFIOWithElasticTest.java => hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOElasticTest.java} (96%) copy sdks/java/io/{hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOTest.java => hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOReadTest.java} (85%) copy sdks/java/io/{hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFITestOptions.java => hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOTestOptions.java} (92%) rename sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/{HadoopFormatIOTest.java => HadoopFormatIOWriteTest.java} (98%) copy 
sdks/java/io/{hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat => hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format}/ReuseObjectsEmployeeInputFormat.java (96%) copy sdks/java/io/{hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat => hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format}/TestEmployeeDataSet.java (98%) copy sdks/java/io/{hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat => hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format}/TestRowDBWritable.java (94%) copy sdks/java/io/{hadoop-input-format => hadoop-format}/src/test/resources/cassandra.yaml (100%)
[beam] 01/01: Merge pull request #7126: [BEAM-3659] Port UserScoreTest off DoFnTester
This is an automated email from the ASF dual-hosted git repository. aromanenko pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git commit bd81e8b14f27ebb75f8683b8f03fd748ccbdefdd Merge: b06b8e5 4ca0e89 Author: Alexey Romanenko <33895511+aromanenko-...@users.noreply.github.com> AuthorDate: Fri Nov 30 18:20:09 2018 +0100 Merge pull request #7126: [BEAM-3659] Port UserScoreTest off DoFnTester .../beam/examples/complete/game/UserScore.java | 36 -- .../beam/examples/complete/game/UserScoreTest.java | 31 +-- 2 files changed, 55 insertions(+), 12 deletions(-)
[beam] branch master updated (b06b8e5 -> bd81e8b)
This is an automated email from the ASF dual-hosted git repository. aromanenko pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git. from b06b8e5 Merge pull request #6954: [BEAM-6146] Add portable WordCount to Python PreCommit add 4ca0e89 [BEAM-3659] Port UserScoreTest off DoFnTester new bd81e8b Merge pull request #7126: [BEAM-3659] Port UserScoreTest off DoFnTester The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../beam/examples/complete/game/UserScore.java | 36 -- .../beam/examples/complete/game/UserScoreTest.java | 31 +-- 2 files changed, 55 insertions(+), 12 deletions(-)
[beam] branch spark-runner_structured-streaming updated: Add primitive GroupByKeyTranslatorBatch implementation
This is an automated email from the ASF dual-hosted git repository. aromanenko pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/spark-runner_structured-streaming by this push: new 7b00f7c Add primitive GroupByKeyTranslatorBatch implementation 7b00f7c is described below commit 7b00f7c6a8ef37e42d741b6954a6e9b87ea8fea7 Author: Alexey Romanenko AuthorDate: Fri Dec 7 10:54:12 2018 +0100 Add primitive GroupByKeyTranslatorBatch implementation --- ...KeyTranslatorBatch.java => EncoderHelpers.java} | 22 -- .../translation/TranslationContext.java| 4 +- .../batch/GroupByKeyTranslatorBatch.java | 49 -- 3 files changed, 56 insertions(+), 19 deletions(-) diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/EncoderHelpers.java similarity index 56% copy from runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java copy to runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/EncoderHelpers.java index 4ee77fb..4c56922 100644 --- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java +++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/EncoderHelpers.java @@ -15,20 +15,16 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.beam.runners.spark.structuredstreaming.translation.batch; +package org.apache.beam.runners.spark.structuredstreaming.translation; -import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator; -import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; +import org.apache.spark.sql.Encoder; +import org.apache.spark.sql.Encoders; -class GroupByKeyTranslatorBatch -implements TransformTranslator< -PTransform>, PCollection>>>> { +/** {@link Encoders} utility class. */ +public class EncoderHelpers { - @Override - public void translateTransform( - PTransform>, PCollection>>> transform, - TranslationContext context) {} + @SuppressWarnings("unchecked") + public static Encoder encoder() { +return Encoders.kryo((Class) Object.class); + } } diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java index 3c29867..e66bc90 100644 --- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java +++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java @@ -46,9 +46,9 @@ import org.apache.spark.sql.streaming.StreamingQueryException; */ public class TranslationContext { - /** All the datasets of the DAG */ + /** All the datasets of the DAG. */ private final Map> datasets; - /** datasets that are not used as input to other datasets (leaves of the DAG) */ + /** datasets that are not used as input to other datasets (leaves of the DAG). 
*/ private final Set> leaves; private final SparkPipelineOptions options; diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java index 4ee77fb..7f2d7fa 100644 --- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java +++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java @@ -17,18 +17,59 @@ */ package org.apache.beam.runners.spark.structuredstreaming.translation.batch; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import java.util.List; +import org.apache.beam.runners.spark.struct
[beam] branch master updated (9448dba -> aec6d82)
This is an automated email from the ASF dual-hosted git repository. aromanenko pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git. from 9448dba [BEAM-6182] Disable conscrypt by default (#7203) add 8f15b88 [BEAM-3912] Add HadoopOutputFormatIO support add 406f8d7 [BEAM-3912] Remove useless dep add 86f723e [BEAM-3912] Add HadoopOutputFormatIO support add 9863c79 [BEAM-3912] Remove useless dep add fa9cc4a Fix typo in test name add 757b71e [BEAM-3912] Implement HadoopFormatIO.Write add 20e3e24 [BEAM-5309] Add streaming support for HadoopFormatIO add 4adc254 [BEAM-5309] Add streaming support for HadoopFormatIO add aec6d82 Merge pull request #6691: [BEAM-5309] Add streaming support for HadoopFormatIO No new revisions were added by this update. Summary of changes: runners/spark/build.gradle |6 +- sdks/java/io/hadoop-format/build.gradle| 96 ++ .../io/hadoop/format/ExternalSynchronization.java | 62 + .../sdk/io/hadoop/format/HDFSSynchronization.java | 186 +++ .../beam/sdk/io/hadoop/format/HadoopFormatIO.java | 1181 .../beam/sdk/io/hadoop/format/HadoopFormats.java | 243 .../sdk/io/hadoop/format/IterableCombinerFn.java | 140 +++ .../beam/sdk/io/hadoop/format}/package-info.java |8 +- .../sdk/io/hadoop/format/EmployeeOutputFormat.java | 74 ++ .../io/hadoop/format/HDFSSynchronizationTest.java | 173 +++ .../sdk/io/hadoop/format/HadoopFormatIOIT.java}| 97 +- .../format/HadoopFormatIOSequenceFileTest.java | 372 ++ .../sdk/io/hadoop/format/HadoopFormatIOTest.java | 316 ++ .../sdk/io/hadoop/format/IterableCombinerTest.java | 98 ++ .../io/hadoop/inputformat/TestEmployeeDataSet.java |2 +- .../io/hadoop/inputformat/TestRowDBWritable.java |9 +- sdks/java/javadoc/build.gradle |1 + settings.gradle|6 + 18 files changed, 3023 insertions(+), 47 deletions(-) create mode 100644 sdks/java/io/hadoop-format/build.gradle create mode 100644 sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/ExternalSynchronization.java create mode 
100644 sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HDFSSynchronization.java create mode 100644 sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIO.java create mode 100644 sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormats.java create mode 100644 sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/IterableCombinerFn.java copy sdks/java/{extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf => io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format}/package-info.java (77%) create mode 100644 sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/EmployeeOutputFormat.java create mode 100644 sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HDFSSynchronizationTest.java copy sdks/java/io/{hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOIT.java => hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOIT.java} (59%) create mode 100644 sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOSequenceFileTest.java create mode 100644 sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOTest.java create mode 100644 sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/IterableCombinerTest.java
[beam] branch spark-runner_structured-streaming updated: Use Iterators.transform() to return Iterable
This is an automated email from the ASF dual-hosted git repository.

aromanenko pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

The following commit(s) were added to refs/heads/spark-runner_structured-streaming by this push:
     new b2d37bf  Use Iterators.transform() to return Iterable
b2d37bf is described below

commit b2d37bf36d0e4360a40a59f497e9ca6db3741668
Author: Alexey Romanenko
AuthorDate: Mon Dec 10 10:52:19 2018 +0100

    Use Iterators.transform() to return Iterable
---
 .../translation/batch/GroupByKeyTranslatorBatch.java | 13 ++---
 1 file changed, 2 insertions(+), 11 deletions(-)

diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java
index 7f2d7fa..0ff0750 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java
@@ -17,9 +17,7 @@
  */
 package org.apache.beam.runners.spark.structuredstreaming.translation.batch;

-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import java.util.List;
+import com.google.common.collect.Iterators;
 import org.apache.beam.runners.spark.structuredstreaming.translation.EncoderHelpers;
 import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator;
 import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
@@ -54,14 +52,7 @@ class GroupByKeyTranslatorBatch
 Dataset>> materialized =
 grouped.mapGroups(
 (MapGroupsFunction, KV>>)
-(key, iterator) -> {
-  // TODO: can we use here just "Iterable iterable = () -> iterator;" ?
-  List values = Lists.newArrayList();
-  while (iterator.hasNext()) {
-values.add(iterator.next().getValue());
-  }
-  return KV.of(key, Iterables.unmodifiableIterable(values));
-},
+(key, iterator) -> KV.of(key, () -> Iterators.transform(iterator, KV::getValue)),
 EncoderHelpers.encoder());
 Dataset>>> output =
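The commit replaces an eager copy of each group's values with a lazily transformed view: `() -> Iterators.transform(iterator, KV::getValue)`. A plain-Java sketch of the same trick (the real code uses Guava's `Iterators`; the generic helper and `Map.Entry` stand-in below are illustrative) — with the caveat the deleted TODO hints at: the result wraps a one-shot iterator, so it can only be traversed once:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class LazyIterableDemo {

    // Wraps an already-open iterator as an Iterable, applying a transform
    // lazily. No intermediate list is materialized -- but the result is
    // single-pass: iterating it a second time yields nothing.
    static <A, B> Iterable<B> transform(Iterator<A> source, Function<A, B> fn) {
        return () -> new Iterator<B>() {
            @Override public boolean hasNext() { return source.hasNext(); }
            @Override public B next() { return fn.apply(source.next()); }
        };
    }

    public static void main(String[] args) {
        Iterator<Map.Entry<String, Integer>> grouped =
            List.of(Map.entry("a", 1), Map.entry("a", 2)).iterator();

        // Analogue of KV.of(key, () -> Iterators.transform(iterator, KV::getValue))
        Iterable<Integer> values = transform(grouped, Map.Entry::getValue);

        List<Integer> seen = new ArrayList<>();
        for (int v : values) {
            seen.add(v);
        }
        System.out.println(seen); // [1, 2]
    }
}
```

The trade-off: O(1) memory per group instead of buffering all values, in exchange for losing re-iterability — acceptable here because Spark consumes each group exactly once.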
[beam] branch master updated (e3c79e2 -> 7eba171)
This is an automated email from the ASF dual-hosted git repository. aromanenko pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git. from e3c79e2 Merge pull request #7114: [BEAM-5817] Add Nexmark SqlBoundedSideInputJoin add d762c8f Switch to using java.nio.file.Files instead of Apache Commons IO FileUtils new 7eba171 Merge pull request #6944: [BEAM-6123] Switch to using java.nio.file.Files The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../main/java/org/apache/beam/runners/apex/ApexYarnLauncher.java| 5 ++--- .../java/org/apache/beam/runners/apex/ApexYarnLauncherTest.java | 4 ++-- .../java/org/apache/beam/runners/apex/examples/WordCountTest.java | 6 +++--- 3 files changed, 7 insertions(+), 8 deletions(-)
[beam] 01/01: Merge pull request #6944: [BEAM-6123] Switch to using java.nio.file.Files
This is an automated email from the ASF dual-hosted git repository. aromanenko pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git commit 7eba1718d936b6739e114867ac1fa178b62da841 Merge: e3c79e2 d762c8f Author: Alexey Romanenko <33895511+aromanenko-...@users.noreply.github.com> AuthorDate: Mon Nov 26 11:04:09 2018 +0100 Merge pull request #6944: [BEAM-6123] Switch to using java.nio.file.Files .../main/java/org/apache/beam/runners/apex/ApexYarnLauncher.java| 5 ++--- .../java/org/apache/beam/runners/apex/ApexYarnLauncherTest.java | 4 ++-- .../java/org/apache/beam/runners/apex/examples/WordCountTest.java | 6 +++--- 3 files changed, 7 insertions(+), 8 deletions(-)
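This PR swaps Apache Commons IO `FileUtils` for the JDK's `java.nio.file.Files`. A sketch of the typical one-to-one replacements (the Commons method names in the comments are from its public API; the exact ApexYarnLauncher call sites are not reproduced here, and `writeString`/`readString` require JDK 11+):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class NioFilesDemo {
    public static void main(String[] args) throws IOException {
        // FileUtils.getTempDirectory() + mkdirs  ->  Files.createTempDirectory
        Path dir = Files.createTempDirectory("nio-demo");

        // FileUtils.writeStringToFile(file, s, charset)  ->  Files.writeString
        Path file = dir.resolve("hello.txt");
        Files.writeString(file, "hello nio");

        // FileUtils.readFileToString(file, charset)  ->  Files.readString
        String content = Files.readString(file);
        System.out.println(content); // hello nio

        // FileUtils.forceDelete(file)  ->  Files.delete / Files.deleteIfExists
        Files.delete(file);
        Files.delete(dir);
        System.out.println(Files.exists(dir)); // false
    }
}
```

Dropping the Commons IO dependency in favor of `java.nio.file` removes a third-party jar from the classpath and gains explicit charset and exception behavior.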
[beam] branch spark-runner_structured-streaming updated: Added SparkRunnerRegistrar
This is an automated email from the ASF dual-hosted git repository. aromanenko pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/spark-runner_structured-streaming by this push: new 463178b Added SparkRunnerRegistrar 463178b is described below commit 463178b7331b1eed7955df48e8f0fe54bdd048c3 Author: Alexey Romanenko AuthorDate: Tue Nov 27 18:19:46 2018 +0100 Added SparkRunnerRegistrar --- .../structuredstreaming/SparkRunnerRegistrar.java | 54 ++ 1 file changed, 54 insertions(+) diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkRunnerRegistrar.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkRunnerRegistrar.java new file mode 100644 index 000..e1f930b --- /dev/null +++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkRunnerRegistrar.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.spark.structuredstreaming; + +import com.google.auto.service.AutoService; +import com.google.common.collect.ImmutableList; +import org.apache.beam.sdk.PipelineRunner; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsRegistrar; +import org.apache.beam.sdk.runners.PipelineRunnerRegistrar; + +/** + * Contains the {@link PipelineRunnerRegistrar} and {@link PipelineOptionsRegistrar} for the {@link + * SparkRunner}. + * + * {@link AutoService} will register Spark's implementations of the {@link PipelineRunner} and + * {@link PipelineOptions} as available pipeline runner services. + */ +public final class SparkRunnerRegistrar { + private SparkRunnerRegistrar() {} + + /** Registers the {@link SparkRunner}. */ + @AutoService(PipelineRunnerRegistrar.class) + public static class Runner implements PipelineRunnerRegistrar { +@Override +public Iterable>> getPipelineRunners() { + return ImmutableList.of(SparkRunner.class); +} + } + + /** Registers the {@link SparkPipelineOptions}. */ + @AutoService(PipelineOptionsRegistrar.class) + public static class Options implements PipelineOptionsRegistrar { +@Override +public Iterable> getPipelineOptions() { + return ImmutableList.of(SparkPipelineOptions.class); +} + } +}
[beam] branch master updated (2dc3ffe -> 50e4e0d)
This is an automated email from the ASF dual-hosted git repository. aromanenko pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git. from 2dc3ffe Merge pull request #7033 from Ensure global top is non-empty. add bf70df5 [BEAM-6099] Add PFADD instruction to RedisIO new 50e4e0d Merge pull request #7093: [BEAM-6099] RedisIO support for PFADD operation The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../java/org/apache/beam/sdk/io/redis/RedisIO.java | 14 ++- .../org/apache/beam/sdk/io/redis/RedisIOTest.java | 28 ++ 2 files changed, 41 insertions(+), 1 deletion(-)
[beam] 01/01: Merge pull request #7093: [BEAM-6099] RedisIO support for PFADD operation
This is an automated email from the ASF dual-hosted git repository. aromanenko pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git commit 50e4e0d93f0b79d0963a0c61d7bc0dcf1ee349bc Merge: 2dc3ffe bf70df5 Author: Alexey Romanenko <33895511+aromanenko-...@users.noreply.github.com> AuthorDate: Wed Nov 21 17:35:23 2018 +0100 Merge pull request #7093: [BEAM-6099] RedisIO support for PFADD operation .../java/org/apache/beam/sdk/io/redis/RedisIO.java | 14 ++- .../org/apache/beam/sdk/io/redis/RedisIOTest.java | 28 ++ 2 files changed, 41 insertions(+), 1 deletion(-)
[beam] branch master updated (054b88c -> 28353e8)
This is an automated email from the ASF dual-hosted git repository. aromanenko pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git. from 054b88c Merge pull request #7006: [BEAM-5964] Add ClickHouseIO.Write add d802843 BEAM-6378 - Updating Tika new 28353e8 Merge pull request #7387: [BEAM-6378] Updating Tika The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: sdks/java/io/tika/build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-)
[beam] 01/01: Merge pull request #7387: [BEAM-6378] Updating Tika
This is an automated email from the ASF dual-hosted git repository. aromanenko pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git commit 28353e84e4b73e7cd50275ef2974f73bdf084c16 Merge: 054b88c d802843 Author: Alexey Romanenko <33895511+aromanenko-...@users.noreply.github.com> AuthorDate: Mon Jan 7 17:31:29 2019 +0100 Merge pull request #7387: [BEAM-6378] Updating Tika sdks/java/io/tika/build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-)
[beam] branch spark-runner_structured-streaming updated (d5f235d -> c6618c5)
This is an automated email from the ASF dual-hosted git repository.

aromanenko pushed a change to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git.

 discard  d5f235d   Enable gradle build scan
 discard  2acdf67   Enable test mode
 discard  a7d2328   Put all transform translators Serializable
 discard  9fad3d4   Simplify beam reader creation as it created once the source as already been partitioned
 discard  3be7f2d   Fix SourceTest
 discard  002f0b4   Move SourceTest to same package as tested class
 discard  47c20c2   Add serialization test
 discard  43c737b   Fix SerializationDebugger
 discard  5c9fcd3   Add SerializationDebugger
 discard  bab9027   Fix serialization issues
     new  c6618c5   First attempt for ParDo primitive implementation

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (d5f235d)
            \
             N -- N -- N   refs/heads/spark-runner_structured-streaming (c6618c5)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
Summary of changes: runners/spark-structured-streaming/build.gradle| 4 - .../spark/structuredstreaming/SparkRunner.java | 2 +- .../translation/TransformTranslator.java | 3 +- .../translation/TranslationContext.java| 23 ++- .../translation/batch/DatasetSourceBatch.java | 80 +- .../translation/batch/DoFnFunction.java| 137 .../translation/batch/ParDoTranslatorBatch.java| 174 - .../translation/batch}/SparkProcessContext.java| 32 +--- .../batch/functions/SparkNoOpStepContext.java} | 6 +- .../batch/functions/SparkSideInputReader.java} | 45 +++--- .../spark/structuredstreaming/SourceTest.java | 29 .../translation/batch/SourceTest.java | 79 -- .../utils/SerializationDebugger.java | 131 .../structuredstreaming/utils/package-info.java| 20 --- 14 files changed, 427 insertions(+), 338 deletions(-) create mode 100644 runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnFunction.java copy runners/{spark/src/main/java/org/apache/beam/runners/spark/translation => spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch}/SparkProcessContext.java (88%) copy runners/{flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java => spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/functions/SparkNoOpStepContext.java} (85%) copy runners/{google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/LazilyInitializedSideInputReader.java => spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/functions/SparkSideInputReader.java} (55%) create mode 100644 runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/SourceTest.java delete mode 100644 
runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/SourceTest.java delete mode 100644 runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/utils/SerializationDebugger.java delete mode 100644 runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/utils/package-info.java
[beam] 01/01: First attempt for ParDo primitive implementation
This is an automated email from the ASF dual-hosted git repository. aromanenko pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit c6618c5d4099e3fa2e7f15d7f8388d2b6b9905b0 Author: Alexey Romanenko AuthorDate: Mon Jan 7 10:47:04 2019 +0100 First attempt for ParDo primitive implementation --- .../translation/TranslationContext.java| 12 ++ .../translation/batch/DoFnFunction.java| 137 .../translation/batch/ParDoTranslatorBatch.java| 174 - .../translation/batch/SparkProcessContext.java | 149 ++ .../SparkNoOpStepContext.java} | 24 +-- .../batch/functions/SparkSideInputReader.java | 62 6 files changed, 545 insertions(+), 13 deletions(-) diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java index e40bb85..ab136dc 100644 --- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java +++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java @@ -108,6 +108,13 @@ public class TranslationContext { return (Dataset>) dataset; } + public void putDatasetWildcard(PValue value, Dataset> dataset) { +if (!datasets.containsKey(value)) { + datasets.put(value, dataset); + leaves.add(dataset); +} + } + public void putDataset(PValue value, Dataset> dataset) { if (!datasets.containsKey(value)) { datasets.put(value, dataset); @@ -131,6 +138,11 @@ public class TranslationContext { } @SuppressWarnings("unchecked") + public T getInput(PTransform transform) { +return (T) Iterables.getOnlyElement(TransformInputs.nonAdditionalInputs(currentTransform)); + } + + @SuppressWarnings("unchecked") public Map, PValue> getInputs() { return 
currentTransform.getInputs(); } diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnFunction.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnFunction.java new file mode 100644 index 000..35204bc --- /dev/null +++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnFunction.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.runners.spark.structuredstreaming.translation.batch; + +import com.google.common.base.Function; +import com.google.common.collect.Iterators; +import com.google.common.collect.LinkedListMultimap; +import com.google.common.collect.Lists; +import com.google.common.collect.Multimap; +import org.apache.beam.runners.core.DoFnRunner; +import org.apache.beam.runners.core.DoFnRunners; +import org.apache.beam.runners.core.construction.SerializablePipelineOptions; +import org.apache.beam.runners.spark.structuredstreaming.translation.batch.functions.SparkNoOpStepContext; +import org.apache.beam.runners.spark.structuredstreaming.translation.batch.functions.SparkSideInputReader; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.reflect.DoFnInvoker; +import org.apache.beam.sdk.util.WindowedValue; + +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.WindowingStrategy; +import org.apache.spark.api.java.function.MapPartitionsFunction; +import scala.Tuple2; + +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +public class DoFnFunction +implements MapPartitionsFun
[beam] branch spark-runner_structured-streaming updated: Fix spotlessJava issues
This is an automated email from the ASF dual-hosted git repository. aromanenko pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/spark-runner_structured-streaming by this push: new 3533779 Fix spotlessJava issues 3533779 is described below commit 353377986e9b0f006772de7639616fa58cca995c Author: Alexey Romanenko AuthorDate: Fri Dec 28 15:59:34 2018 +0100 Fix spotlessJava issues --- .../translation/TranslationContext.java| 10 ++--- .../translation/batch/DatasetSourceBatch.java | 9 +++-- .../translation/batch/DatasetSourceMockBatch.java | 44 -- .../translation/batch/FlattenTranslatorBatch.java | 2 +- .../batch/ReadSourceTranslatorBatch.java | 18 + .../batch/ReadSourceTranslatorMockBatch.java | 18 + .../streaming/DatasetSourceStreaming.java | 16 +--- .../streaming/ReadSourceTranslatorStreaming.java | 22 +-- .../spark/structuredstreaming/SourceTest.java | 19 +- 9 files changed, 88 insertions(+), 70 deletions(-) diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java index acc49f4..5606886 100644 --- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java +++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java @@ -115,12 +115,12 @@ public class TranslationContext { } } -public void putDatasetRaw(PValue value, Dataset dataset) { - if (!datasets.containsKey(value)) { -datasets.put(value, dataset); -leaves.add(dataset); - } + public void putDatasetRaw(PValue value, Dataset dataset) { +if (!datasets.containsKey(value)) { + datasets.put(value, dataset); + 
leaves.add(dataset); } + } // // PCollections methods diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java index f4cd885..7726ad7 100644 --- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java +++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java @@ -50,12 +50,13 @@ public class DatasetSourceBatch implements DataSourceV2, ReadSupport { private TranslationContext context; private BoundedSource source; - - @Override public DataSourceReader createReader(DataSourceOptions options) { + @Override + public DataSourceReader createReader(DataSourceOptions options) { this.numPartitions = context.getSparkSession().sparkContext().defaultParallelism(); checkArgument(this.numPartitions > 0, "Number of partitions must be greater than zero."); this.bundleSize = context.getOptions().getBundleSize(); -return new DatasetReader(); } +return new DatasetReader(); + } /** This class can be mapped to Beam {@link BoundedSource}. */ private class DatasetReader implements DataSourceReader { @@ -106,7 +107,7 @@ public class DatasetSourceBatch implements DataSourceV2, ReadSupport { } } - /** This class can be mapped to Beam {@link BoundedReader} */ + /** This class can be mapped to Beam {@link BoundedReader}. 
*/ private class DatasetPartitionReader implements InputPartitionReader { BoundedReader reader; diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceMockBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceMockBatch.java index b616a6f..5485257 100644 --- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceMockBatch.java +++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceMockBatch.java @@ -34,58 +34,64 @@ import org.apache.spark.sql.sources.v2.reader.InputPartitionReader; import org.apache.spark.sql.types.StructType; import org.joda.time.Instant; -/** - * This is a mock source that gives values between 0 and 999. - */ +/** This is
[beam] branch spark-runner_structured-streaming updated: First attempt for ParDo primitive implementation
This is an automated email from the ASF dual-hosted git repository. aromanenko pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/spark-runner_structured-streaming by this push: new 16c57c3 First attempt for ParDo primitive implementation 16c57c3 is described below commit 16c57c30f52f0d1b76423a68b4321ee602c1e7c0 Author: Alexey Romanenko AuthorDate: Mon Jan 7 10:47:04 2019 +0100 First attempt for ParDo primitive implementation --- .../translation/TranslationContext.java| 12 ++ .../translation/batch/DoFnFunction.java| 137 .../translation/batch/ParDoTranslatorBatch.java| 174 - .../translation/batch/SparkProcessContext.java | 149 ++ .../SparkNoOpStepContext.java} | 24 +-- .../batch/functions/SparkSideInputReader.java | 62 6 files changed, 545 insertions(+), 13 deletions(-) diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java index 9a3330a..33706bd 100644 --- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java +++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java @@ -108,6 +108,13 @@ public class TranslationContext { return (Dataset>) dataset; } + public void putDatasetWildcard(PValue value, Dataset> dataset) { +if (!datasets.containsKey(value)) { + datasets.put(value, dataset); + leaves.add(dataset); +} + } + public void putDataset(PValue value, Dataset> dataset) { if (!datasets.containsKey(value)) { datasets.put(value, dataset); @@ -131,6 +138,11 @@ public class TranslationContext { } @SuppressWarnings("unchecked") + public T getInput(PTransform transform) 
{ +return (T) Iterables.getOnlyElement(TransformInputs.nonAdditionalInputs(currentTransform)); + } + + @SuppressWarnings("unchecked") public Map, PValue> getInputs() { return currentTransform.getInputs(); } diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnFunction.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnFunction.java new file mode 100644 index 000..35204bc --- /dev/null +++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnFunction.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.runners.spark.structuredstreaming.translation.batch; + +import com.google.common.base.Function; +import com.google.common.collect.Iterators; +import com.google.common.collect.LinkedListMultimap; +import com.google.common.collect.Lists; +import com.google.common.collect.Multimap; +import org.apache.beam.runners.core.DoFnRunner; +import org.apache.beam.runners.core.DoFnRunners; +import org.apache.beam.runners.core.construction.SerializablePipelineOptions; +import org.apache.beam.runners.spark.structuredstreaming.translation.batch.functions.SparkNoOpStepContext; +import org.apache.beam.runners.spark.structuredstreaming.translation.batch.functions.SparkSideInputReader; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.reflect.DoFnInvoker; +import org.apache.beam.sdk.util.WindowedValue; + +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.WindowingStrategy; +import org.apache.spark.api.java.function.MapPartitionsFunction; +import sc
[beam] branch master updated: [BEAM-4904] Update embedded Mongo to version 2.2.0
This is an automated email from the ASF dual-hosted git repository.

aromanenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

The following commit(s) were added to refs/heads/master by this push:
     new 135a04b  [BEAM-4904] Update embedded Mongo to version 2.2.0
     new 72c7f38  Merge pull request #7647: [BEAM-4904] Update embedded Mongo to version 2.2.0
135a04b is described below

commit 135a04b3edd3b1dcd7e620c37370f993051b22dc
Author: Ismaël Mejía
AuthorDate: Mon Jan 28 10:31:56 2019 +0100

    [BEAM-4904] Update embedded Mongo to version 2.2.0
---
 sdks/java/io/mongodb/build.gradle | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sdks/java/io/mongodb/build.gradle b/sdks/java/io/mongodb/build.gradle
index 9b0cd93..0efb1d5 100644
--- a/sdks/java/io/mongodb/build.gradle
+++ b/sdks/java/io/mongodb/build.gradle
@@ -37,6 +37,6 @@ dependencies {
   testCompile library.java.hamcrest_library
   testCompile project(path: ":beam-sdks-java-io-common", configuration: "shadow")
   testCompile project(path: ":beam-sdks-java-io-common", configuration: "shadowTest")
-  testCompile "de.flapdoodle.embed:de.flapdoodle.embed.mongo:2.1.1"
-  testCompile "de.flapdoodle.embed:de.flapdoodle.embed.process:2.0.5"
+  testCompile "de.flapdoodle.embed:de.flapdoodle.embed.mongo:2.2.0"
+  testCompile "de.flapdoodle.embed:de.flapdoodle.embed.process:2.1.2"
 }
[beam] branch master updated: Add a guideline for commits history
This is an automated email from the ASF dual-hosted git repository.

aromanenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

The following commit(s) were added to refs/heads/master by this push:
     new c890643  Add a guideline for commits history
     new 1eb2164  Merge pull request #8116: Update a guideline for commits history
c890643 is described below

commit c8906435795f92af6a3450c9b4a1d0780ca2d35b
Author: Etienne Chauchot
AuthorDate: Fri Mar 22 14:35:29 2019 +0100

    Add a guideline for commits history
---
 website/src/contribute/committer-guide.md | 1 +
 1 file changed, 1 insertion(+)

diff --git a/website/src/contribute/committer-guide.md b/website/src/contribute/committer-guide.md
index 4a2ee71..4a79432 100644
--- a/website/src/contribute/committer-guide.md
+++ b/website/src/contribute/committer-guide.md
@@ -41,6 +41,7 @@ Granularity of changes:
 * It is OK to keep separate commits for different logical pieces of the code, if they make reviewing and revisiting code easier
 * Making commits isolated is a good practice, authors should be able to relatively easily split the PR upon reviewer's request
 * Generally, every commit should compile and pass tests.
+* Avoid keeping in history formatting messages such as checkstyle or spotless fixes. Squash such commits with previous one.

 ## Always get to LGTM ("Looks good to me!")
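The guideline added above (drop formatting-only commits such as checkstyle or spotless fixes from history by squashing them into the preceding commit) can be sketched with plain git. The repository, author identity, and commit messages below are invented for illustration; interactively you would normally use `git rebase -i` and mark the fix-up commit as `fixup`, but the non-interactive `git reset --soft` equivalent is shown here:

```shell
set -e

# Throwaway repository standing in for a real PR branch (hypothetical example).
repo=$(mktemp -d)
cd "$repo"
git init -q
git config user.email "dev@example.com"
git config user.name "Dev"

git commit -q --allow-empty -m "Initial commit"
git commit -q --allow-empty -m "Add feature X"
git commit -q --allow-empty -m "Fix spotlessJava issues"  # formatting-only fix-up

# Squash the formatting commit into the commit that introduced the code:
# move HEAD back over both commits, keeping their changes staged, then
# re-commit under the original message.
git reset --soft HEAD~2
git commit -q --allow-empty -m "Add feature X"

git log --oneline   # two commits remain; no "Fix spotlessJava issues" in history
```

After this, the PR history contains only "Initial commit" and "Add feature X", which is what the reviewer sees at merge time.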
[beam] branch spark-runner_structured-streaming updated: Don't use Reshuffle translation
This is an automated email from the ASF dual-hosted git repository.

aromanenko pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

The following commit(s) were added to refs/heads/spark-runner_structured-streaming by this push:
     new a4483c6  Don't use Reshuffle translation
a4483c6 is described below

commit a4483c623cec334b4a24b252f9e559a2cc15de9a
Author: Alexey Romanenko
AuthorDate: Thu Mar 21 11:09:28 2019 +0100

    Don't use Reshuffle translation
---
 .../translation/batch/PipelineTranslatorBatch.java | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/PipelineTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/PipelineTranslatorBatch.java
index a111ea4..f75df21 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/PipelineTranslatorBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/PipelineTranslatorBatch.java
@@ -32,7 +32,6 @@ import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.Reshuffle;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.windowing.Window;
@@ -60,7 +59,9 @@ public class PipelineTranslatorBatch extends PipelineTranslator {
     TRANSFORM_TRANSLATORS.put(Combine.PerKey.class, new CombinePerKeyTranslatorBatch());
     TRANSFORM_TRANSLATORS.put(Combine.Globally.class, new CombineGloballyTranslatorBatch());
     TRANSFORM_TRANSLATORS.put(GroupByKey.class, new GroupByKeyTranslatorBatch());
-    TRANSFORM_TRANSLATORS.put(Reshuffle.class, new ReshuffleTranslatorBatch());
+
+    // TODO: Do we need to have a dedicated translator for {@code Reshuffle} if it's deprecated?
+    //TRANSFORM_TRANSLATORS.put(Reshuffle.class, new ReshuffleTranslatorBatch());
     TRANSFORM_TRANSLATORS.put(Flatten.PCollections.class, new FlattenTranslatorBatch());
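The hunk above shows the runner dispatching each transform through a static class-to-translator map; commenting out the `Reshuffle` registration simply leaves that transform without a translator. A minimal, stdlib-only sketch of that registry pattern (class and translator names here are invented stand-ins, not the actual Beam types):

```java
import java.util.HashMap;
import java.util.Map;

public class TranslatorRegistry {
  /** Stand-in for a per-transform translator (hypothetical interface). */
  interface TransformTranslator {
    String translate();
  }

  /** Stand-ins for transform classes. */
  static class Flatten {}
  static class Reshuffle {}

  private static final Map<Class<?>, TransformTranslator> TRANSFORM_TRANSLATORS = new HashMap<>();

  static {
    TRANSFORM_TRANSLATORS.put(Flatten.class, () -> "FlattenTranslatorBatch");
    // Deliberately not registered, mirroring the commented-out Reshuffle line
    // in the commit above: an unregistered transform has no translator, so
    // lookup returns null and the pipeline translator must handle that case.
    // TRANSFORM_TRANSLATORS.put(Reshuffle.class, () -> "ReshuffleTranslatorBatch");
  }

  public static TransformTranslator getTranslator(Object transform) {
    return TRANSFORM_TRANSLATORS.get(transform.getClass());
  }

  public static void main(String[] args) {
    System.out.println(getTranslator(new Flatten()).translate()); // prints FlattenTranslatorBatch
    System.out.println(getTranslator(new Reshuffle()) == null);   // prints true
  }
}
```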
[beam] branch master updated: [BEAM-6976] Fix incorrect doc of HadoopFormatIO on partitioner
This is an automated email from the ASF dual-hosted git repository.

aromanenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

The following commit(s) were added to refs/heads/master by this push:
     new 8af8929  [BEAM-6976] Fix incorrect doc of HadoopFormatIO on partitioner
     new 9521fcb  Merge pull request: [BEAM-6976] Fix incorrect doc of HadoopFormatIO on partitioner
8af8929 is described below

commit 8af89294935a32be1fe9c783292300b7f3b77453
Author: JohnZZGithub
AuthorDate: Tue Apr 2 12:10:52 2019 -0700

    [BEAM-6976] Fix incorrect doc of HadoopFormatIO on partitioner

    wq#   modified: sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIO.java
---
 .../main/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIO.java | 2 +-
 website/src/documentation/io/built-in-hadoop.md                        | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIO.java b/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIO.java
index 1d7fc32..8819024 100644
--- a/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIO.java
+++ b/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIO.java
@@ -247,7 +247,7 @@ import org.slf4j.LoggerFactory;
  *    MyDbOutputFormatKeyClass, Object.class);
  * myHadoopConfiguration.setClass(mapreduce.job.output.value.class,
  *    MyDbOutputFormatValueClass, Object.class);
- * myHadoopConfiguration.setClass(mapreduce.job.output.value.class,
+ * myHadoopConfiguration.setClass(mapreduce.job.partitioner.class,
  *    MyPartitionerClass, Object.class);
  * myHadoopConfiguration.setInt(mapreduce.job.reduces, 2);
  * }
diff --git a/website/src/documentation/io/built-in-hadoop.md b/website/src/documentation/io/built-in-hadoop.md
index 37ae66f..fd330ec 100644
--- a/website/src/documentation/io/built-in-hadoop.md
+++ b/website/src/documentation/io/built-in-hadoop.md
@@ -376,7 +376,7 @@ myHadoopConfiguration.setClass("mapreduce.job.output.key.class",
   MyDbOutputFormatKeyClass, Object.class);
 myHadoopConfiguration.setClass("mapreduce.job.output.value.class",
   MyDbOutputFormatValueClass, Object.class);
-myHadoopConfiguration.setClass("mapreduce.job.output.value.class",
+myHadoopConfiguration.setClass("mapreduce.job.partitioner.class",
   MyPartitionerClass, Object.class);
 myHadoopConfiguration.setInt("mapreduce.job.reduces", 2);
 ```
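The bug fixed above is easy to miss in diff form: the documentation example set `mapreduce.job.output.value.class` twice, so the partitioner class was never configured under its own key. Collected as a properties-style summary (the class names on the right are the documentation's own placeholders, not real classes):

```properties
# Hadoop MapReduce job configuration keys from the corrected example
mapreduce.job.output.key.class   = MyDbOutputFormatKeyClass
mapreduce.job.output.value.class = MyDbOutputFormatValueClass
mapreduce.job.partitioner.class  = MyPartitionerClass
mapreduce.job.reduces            = 2
```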
[beam] branch master updated: [BEAM-6268] Adjust Cassandra ports
This is an automated email from the ASF dual-hosted git repository.

aromanenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

The following commit(s) were added to refs/heads/master by this push:
     new 0fd297e  [BEAM-6268] Adjust Cassandra ports
     new ce64ad5  Merge pull request #7317: [BEAM-6268] Adjust Cassandra ports
0fd297e is described below

commit 0fd297ed628ca94bbc342621aceaca0f9ba71172
Author: Alexey Romanenko
AuthorDate: Wed Dec 19 16:39:01 2018 +0100

    [BEAM-6268] Adjust Cassandra ports
---
 .../beam/sdk/io/hadoop/format/HadoopFormatIOCassandraTest.java    | 6 --
 .../sdk/io/hadoop/inputformat/HIFIOWithEmbeddedCassandraTest.java | 6 +-
 .../java/io/hadoop-input-format/src/test/resources/cassandra.yaml | 8
 3 files changed, 13 insertions(+), 7 deletions(-)

diff --git a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOCassandraTest.java b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOCassandraTest.java
index 4b88e32..6000c79 100644
--- a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOCassandraTest.java
+++ b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOCassandraTest.java
@@ -38,20 +38,19 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;

 /** Tests to validate HadoopFormatIO for embedded Cassandra instance. */
-@Ignore("Ignored because of BEAM-6268")
 @RunWith(JUnit4.class)
 public class HadoopFormatIOCassandraTest implements Serializable {
   private static final long serialVersionUID = 1L;
   private static final String CASSANDRA_KEYSPACE = "beamdb";
   private static final String CASSANDRA_HOST = "127.0.0.1";
   private static final String CASSANDRA_TABLE = "scientists";
+  private static final String CASSANDRA_NATIVE_PORT_PROPERTY = "cassandra.input.native.port";
   private static final String CASSANDRA_THRIFT_PORT_PROPERTY = "cassandra.input.thrift.port";
   private static final String CASSANDRA_THRIFT_ADDRESS_PROPERTY = "cassandra.input.thrift.address";
   private static final String CASSANDRA_PARTITIONER_CLASS_PROPERTY =
@@ -60,6 +59,7 @@ public class HadoopFormatIOCassandraTest implements Serializable {
   private static final String CASSANDRA_KEYSPACE_PROPERTY = "cassandra.input.keyspace";
   private static final String CASSANDRA_COLUMNFAMILY_PROPERTY = "cassandra.input.columnfamily";
   private static final String CASSANDRA_PORT = "9061";
+  private static final String CASSANDRA_NATIVE_PORT = "9042";
   private static transient Cluster cluster;
   private static transient Session session;
   private static final long TEST_DATA_ROW_COUNT = 10L;
@@ -140,6 +140,7 @@ public class HadoopFormatIOCassandraTest implements Serializable {
    */
   private Configuration getConfiguration() {
     Configuration conf = new Configuration();
+    conf.set(CASSANDRA_NATIVE_PORT_PROPERTY, CASSANDRA_NATIVE_PORT);
     conf.set(CASSANDRA_THRIFT_PORT_PROPERTY, CASSANDRA_PORT);
     conf.set(CASSANDRA_THRIFT_ADDRESS_PROPERTY, CASSANDRA_HOST);
     conf.set(CASSANDRA_PARTITIONER_CLASS_PROPERTY, CASSANDRA_PARTITIONER_CLASS_VALUE);
@@ -189,6 +190,7 @@ public class HadoopFormatIOCassandraTest implements Serializable {
         .addContactPoint(CASSANDRA_HOST)
         .withClusterName("beam")
         .withSocketOptions(socketOptions)
+        .withPort(Integer.valueOf(CASSANDRA_NATIVE_PORT))
         .build();
     session = cluster.connect();
     createCassandraData();
diff --git a/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFIOWithEmbeddedCassandraTest.java b/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFIOWithEmbeddedCassandraTest.java
index 0273261..c8d7d8d 100644
--- a/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFIOWithEmbeddedCassandraTest.java
+++ b/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFIOWithEmbeddedCassandraTest.java
@@ -50,6 +50,7 @@ public class HIFIOWithEmbeddedCassandraTest implements Serializable {
   private static final String CASSANDRA_KEYSPACE = "beamdb";
   private static final String CASSANDRA_HOST = "127.0.0.1";
   private static final String CASSANDRA_TABLE = "scientists";
+  private static final String CASSANDRA_NATIVE_PORT_PROPERTY = "cassandra.input.native.port";
   private static final String CASS
[beam] branch master updated: [BEAM-5115] Make ValueProvider API consistent between XmlIO and XmlSource
This is an automated email from the ASF dual-hosted git repository.

aromanenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

The following commit(s) were added to refs/heads/master by this push:
     new 7ac0790  [BEAM-5115] Make ValueProvider API consistent between XmlIO and XmlSource
     new 3d6315b  Merge pull request #8068: [BEAM-5115] Make ValueProvider API consistent between XmlIO and XmlSource
7ac0790 is described below

commit 7ac0790b1d592cbd9bbefe62f8c7048ff8084fe3
Author: Ismaël Mejía
AuthorDate: Fri Mar 15 14:23:43 2019 +0100

    [BEAM-5115] Make ValueProvider API consistent between XmlIO and XmlSource
---
 .../src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java | 17 -
 .../test/java/org/apache/beam/sdk/io/xml/XmlIOTest.java |  4 +++-
 2 files changed, 15 insertions(+), 6 deletions(-)

diff --git a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java
index 5b2dbc7..17fdcf4 100644
--- a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java
+++ b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java
@@ -41,6 +41,7 @@ import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.io.OffsetBasedSource;
 import org.apache.beam.sdk.io.ReadAllViaFileBasedSource;
 import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -232,7 +233,7 @@ public class XmlIO {
     abstract MappingConfiguration getConfiguration();

     @Nullable
-    abstract String getFileOrPatternSpec();
+    abstract ValueProvider getFileOrPatternSpec();

     abstract Compression getCompression();
@@ -244,7 +245,7 @@ public class XmlIO {
     abstract static class Builder {
       abstract Builder setConfiguration(MappingConfiguration configuration);

-      abstract Builder setFileOrPatternSpec(String fileOrPatternSpec);
+      abstract Builder setFileOrPatternSpec(ValueProvider fileOrPatternSpec);

       abstract Builder setCompression(Compression compression);
@@ -291,6 +292,14 @@ public class XmlIO {
      * file should be of the form defined in {@link #read}.
      */
     public Read from(String fileOrPatternSpec) {
+      return from(StaticValueProvider.of(fileOrPatternSpec));
+    }
+
+    /**
+     * Reads a single XML file or a set of XML files defined by a Java "glob" file pattern. Each XML
+     * file should be of the form defined in {@link #read}. Using ValueProviders.
+     */
+    public Read from(ValueProvider fileOrPatternSpec) {
       return toBuilder().setFileOrPatternSpec(fileOrPatternSpec).build();
     }
@@ -371,9 +380,7 @@ public class XmlIO {
     @VisibleForTesting
     BoundedSource createSource() {
-      return CompressedSource.from(
-          new XmlSource<>(
-              StaticValueProvider.of(getFileOrPatternSpec()), getConfiguration(), 1L))
+      return CompressedSource.from(new XmlSource<>(getFileOrPatternSpec(), getConfiguration(), 1L))
           .withCompression(getCompression());
     }
diff --git a/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlIOTest.java b/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlIOTest.java
index 102b38a..ba778a2 100644
--- a/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlIOTest.java
+++ b/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlIOTest.java
@@ -128,7 +128,9 @@ public class XmlIOTest {
     PCollection readBack =
         readPipeline.apply(
             XmlIO.read()
-                .from(new File(tmpFolder.getRoot(), "birds").getAbsolutePath() + "*")
+                .from(
+                    readPipeline.newProvider(
+                        new File(tmpFolder.getRoot(), "birds").getAbsolutePath() + "*"))
                 .withRecordClass(Bird.class)
                 .withRootElement("birds")
                 .withRecordElement("bird")
[beam] branch spark-runner_structured-streaming updated: Added SideInput support
This is an automated email from the ASF dual-hosted git repository. aromanenko pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/spark-runner_structured-streaming by this push: new 49ab275 Added SideInput support 49ab275 is described below commit 49ab27554bc6fc44f5f5f23c5d0a6535fb4a158d Author: Alexey Romanenko AuthorDate: Tue Mar 19 19:33:11 2019 +0100 Added SideInput support --- .../translation/TranslationContext.java| 5 + .../translation/batch/DoFnFunction.java| 11 +- .../translation/batch/ParDoTranslatorBatch.java| 48 +-- .../batch/functions/NoOpSideInputReader.java | 56 .../batch/functions/SparkSideInputReader.java | 148 + .../translation/helpers/CoderHelpers.java | 47 +++ .../translation/helpers/SideInputBroadcast.java| 28 .../translation/batch/ParDoTest.java | 80 +-- 8 files changed, 339 insertions(+), 84 deletions(-) diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java index 013ef75..d2ace25 100644 --- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java +++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java @@ -139,6 +139,11 @@ public class TranslationContext { } } + @SuppressWarnings("unchecked") + public Dataset getSideInputDataSet(PCollectionView value) { +return (Dataset) broadcastDataSets.get(value); + } + // // PCollections methods // diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnFunction.java 
b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnFunction.java index 0409a79..4449082 100644 --- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnFunction.java +++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnFunction.java @@ -28,11 +28,11 @@ import java.util.Map; import org.apache.beam.runners.core.DoFnRunner; import org.apache.beam.runners.core.DoFnRunners; import org.apache.beam.runners.core.construction.SerializablePipelineOptions; -import org.apache.beam.runners.spark.structuredstreaming.translation.batch.functions.NoOpSideInputReader; +import org.apache.beam.runners.spark.structuredstreaming.translation.batch.functions.SparkSideInputReader; import org.apache.beam.runners.spark.structuredstreaming.translation.batch.functions.NoOpStepContext; +import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.SideInputBroadcast; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.reflect.DoFnInvoker; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; @@ -62,6 +62,7 @@ public class DoFnFunction private final TupleTag mainOutputTag; private final Coder inputCoder; private final Map, Coder> outputCoderMap; + private final SideInputBroadcast broadcastStateData; public DoFnFunction( DoFn doFn, @@ -71,7 +72,8 @@ public class DoFnFunction List> additionalOutputTags, TupleTag mainOutputTag, Coder inputCoder, - Map, Coder> outputCoderMap) { + Map, Coder> outputCoderMap, + SideInputBroadcast broadcastStateData) { this.doFn = doFn; this.sideInputs = sideInputs; @@ -81,6 +83,7 @@ public class DoFnFunction this.mainOutputTag = mainOutputTag; this.inputCoder = inputCoder; 
this.outputCoderMap = outputCoderMap; +this.broadcastStateData = broadcastStateData; } @Override @@ -93,7 +96,7 @@ public class DoFnFunction DoFnRunners.simpleRunner( serializableOptions.get(), doFn, -new NoOpSideInputReader(sideInputs), +new SparkSideInputReader(sideInputs, broadcastStateData), outputManager, mainOutputTag, additionalOutputTags, diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstrea
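The `getSideInputDataSet` method added to `TranslationContext` above is essentially a broadcast-map lookup with an unchecked cast back to the caller's expected type. A minimal, framework-free sketch of that lookup pattern (class and method names here are illustrative, not Beam's API):

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the broadcast lookup in TranslationContext:
// side-input contents are registered once per view, then fetched by view
// and cast back to the caller's type.
public class SideInputLookup {
  private final Map<String, Object> broadcastDataSets = new HashMap<>();

  public void broadcast(String viewId, Object materialized) {
    broadcastDataSets.put(viewId, materialized);
  }

  // Mirrors the shape of getSideInputDataSet: an unchecked cast, which is
  // why the real method carries @SuppressWarnings("unchecked").
  @SuppressWarnings("unchecked")
  public <T> T lookup(String viewId) {
    return (T) broadcastDataSets.get(viewId);
  }

  public static void main(String[] args) {
    SideInputLookup reader = new SideInputLookup();
    reader.broadcast("view-1", Arrays.asList(1, 2, 3));
    List<Integer> values = reader.lookup("view-1");
    System.out.println(values.size()); // prints 3
  }
}
```

The unchecked cast is the cost of storing heterogeneous datasets in one map; the caller is trusted to request the type it stored.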
[beam] branch spark-runner_structured-streaming updated: Fix CheckStyle violations
This is an automated email from the ASF dual-hosted git repository. aromanenko pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/spark-runner_structured-streaming by this push: new 6a78af7 Fix CheckStyle violations 6a78af7 is described below commit 6a78af72a66b4175c9659c59a35923df5d75aabf Author: Alexey Romanenko AuthorDate: Wed Mar 20 12:11:24 2019 +0100 Fix CheckStyle violations --- .../translation/SchemaHelpers.java | 1 + .../batch/AggregatorCombinerGlobally.java| 1 + .../batch/CreatePCollectionViewTranslatorBatch.java | 3 +-- .../translation/batch/DatasetSourceBatch.java| 1 - .../translation/batch/DoFnFunction.java | 2 +- .../translation/batch/ParDoTranslatorBatch.java | 8 +--- .../translation/batch/PipelineTranslatorBatch.java | 1 - .../translation/batch/ProcessContext.java| 3 ++- .../batch/functions/AggregatorCombinerPerKey.java| 1 + .../batch/functions/SparkSideInputReader.java| 18 -- .../functions/package-info.java} | 20 ++-- .../translation/helpers/CoderHelpers.java| 3 +-- .../translation/helpers/RowHelpers.java | 7 +++ .../translation/helpers/SideInputBroadcast.java | 5 +++-- .../package-info.java} | 20 ++-- .../translation/batch/ParDoTest.java | 2 +- .../translation/batch/WindowAssignTest.java | 4 .../utils/SerializationDebugger.java | 1 + 18 files changed, 33 insertions(+), 68 deletions(-) diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/SchemaHelpers.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/SchemaHelpers.java index 4efb28e..13fbfb8 100644 --- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/SchemaHelpers.java +++ 
b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/SchemaHelpers.java @@ -22,6 +22,7 @@ import org.apache.spark.sql.types.Metadata; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; +/** A {@link SchemaHelpers} for the Spark Batch Runner. */ public class SchemaHelpers { public static StructType binarySchema() { // we use a binary schema for now because: diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombinerGlobally.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombinerGlobally.java index 92aeea5..539f502 100644 --- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombinerGlobally.java +++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombinerGlobally.java @@ -25,6 +25,7 @@ import org.apache.spark.sql.Encoder; import org.apache.spark.sql.Row; import org.apache.spark.sql.expressions.Aggregator; +/** An {@link Aggregator} for the Spark Batch Runner. 
*/ public class AggregatorCombinerGlobally extends Aggregator { diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CreatePCollectionViewTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CreatePCollectionViewTranslatorBatch.java index df4d252..d412b86 100644 --- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CreatePCollectionViewTranslatorBatch.java +++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CreatePCollectionViewTranslatorBatch.java @@ -1,5 +1,6 @@ package org.apache.beam.runners.spark.structuredstreaming.translation.batch; +import java.io.IOException; import org.apache.beam.runners.core.construction.CreatePCollectionViewTranslation; import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator; import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext; @@ -10,8 +11,6 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; import org.apache.spark.sql.Dataset; -import java.io.IOException; - class CreatePCollectionViewTranslatorBatch implements TransformTranslator, PCollection>> {
[beam] branch master updated: [BEAM-6854] Update amazon-web-services aws-sdk dependency to version 1.11.519
This is an automated email from the ASF dual-hosted git repository. aromanenko pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new f60832b [BEAM-6854] Update amazon-web-services aws-sdk dependency to version 1.11.519 new 1d08260 Merge pull request #8083: [BEAM-6854] Update amazon-web-services aws-sdk dependency to version 1.11.519 f60832b is described below commit f60832b368b9bb6bd98a13069a265a82bf32655b Author: Ismaël Mejía AuthorDate: Mon Mar 18 18:46:35 2019 +0100 [BEAM-6854] Update amazon-web-services aws-sdk dependency to version 1.11.519 It also unifies AWS SDK versions for the kinesis and amazon-web-services modules --- .../org/apache/beam/gradle/BeamModulePlugin.groovy | 33 +- sdks/java/io/amazon-web-services/build.gradle | 12 .../org/apache/beam/sdk/io/aws/sqs/SqsIOTest.java | 1 - sdks/java/io/kinesis/build.gradle | 8 ++ 4 files changed, 28 insertions(+), 26 deletions(-) diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 8ef5246..df2ff37 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -340,30 +340,31 @@ class BeamModulePlugin implements Plugin { // These versions are defined here because they represent // a dependency version which should match across multiple // Maven artifacts. 
+def apex_core_version = "3.7.0" +def apex_malhar_version = "3.4.0" +def aws_java_sdk_version = "1.11.519" +def cassandra_driver_version = "3.6.0" def generated_grpc_beta_version = "0.44.0" def generated_grpc_ga_version = "1.43.0" +def google_auth_version = "0.12.0" +def google_clients_version = "1.27.0" def google_cloud_bigdataoss_version = "1.9.16" +def google_cloud_core_version = "1.61.0" def google_cloud_spanner_version = "1.6.0" -def google_clients_version = "1.27.0" -def google_auth_version = "0.12.0" def grpc_version = "1.17.1" -def protobuf_version = "3.6.0" def guava_version = "20.0" -def netty_version = "4.1.30.Final" -def proto_google_common_protos_version = "1.12.0" -def hamcrest_version = "1.3" def hadoop_version = "2.7.3" +def hamcrest_version = "1.3" def jackson_version = "2.9.8" -def spark_version = "2.4.0" -def nemo_version = "0.1" -def apex_core_version = "3.7.0" -def apex_malhar_version = "3.4.0" -def postgres_version = "42.2.2" def jaxb_api_version = "2.2.12" def kafka_version = "1.0.0" +def nemo_version = "0.1" +def netty_version = "4.1.30.Final" +def postgres_version = "42.2.2" +def proto_google_common_protos_version = "1.12.0" +def protobuf_version = "3.6.0" def quickcheck_version = "0.8" -def google_cloud_core_version = "1.61.0" -def cassandra_driver_version = "3.6.0" +def spark_version = "2.4.0" // A map of maps containing common libraries used per language. 
To use: // dependencies { @@ -385,6 +386,12 @@ class BeamModulePlugin implements Plugin { args4j : "args4j:args4j:2.33", avro: "org.apache.avro:avro:1.8.2", avro_tests : "org.apache.avro:avro:1.8.2:tests", +aws_java_sdk_cloudwatch : "com.amazonaws:aws-java-sdk-cloudwatch:$aws_java_sdk_version", +aws_java_sdk_core : "com.amazonaws:aws-java-sdk-core:$aws_java_sdk_version", +aws_java_sdk_kinesis: "com.amazonaws:aws-java-sdk-kinesis:$aws_java_sdk_version", +aws_java_sdk_s3 : "com.amazonaws:aws-java-sdk-s3:$aws_java_sdk_version", +aws_java_sdk_sns: "com.amazonaws:aws-java-sdk-sns:$aws_java_sdk_version", +aws_java_sdk_sqs: "com.amazonaws:aws-java-sdk-sqs:$aws_java_sdk_version", bigdataoss_gcsio: "com.google.cloud.bigdataoss:gcsio:$google_cloud_bigdataoss_version", bigdataoss_util : "com.google.cloud.bigdataoss:util:$google_cloud_
[beam] branch spark-runner_structured-streaming updated (a4483c6 -> e5921c5)
This is an automated email from the ASF dual-hosted git repository. aromanenko pushed a change to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git. from a4483c6 Don't use Reshuffle translation new 319f360 Added using CachedSideInputReader new e5921c5 Added TODO comment for ReshuffleTranslatorBatch The 19447 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../translation/batch/DoFnFunction.java| 3 +- .../batch/ReshuffleTranslatorBatch.java| 1 + .../translation/utils}/CachedSideInputReader.java | 76 +--- .../translation/utils/SideInputStorage.java} | 82 +++--- .../translation/{ => utils}/package-info.java | 4 +- 5 files changed, 78 insertions(+), 88 deletions(-) copy runners/{spark/src/main/java/org/apache/beam/runners/spark/util => spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/utils}/CachedSideInputReader.java (59%) copy runners/{spark/src/main/java/org/apache/beam/runners/spark/util/CachedSideInputReader.java => spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/utils/SideInputStorage.java} (54%) copy runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/{ => utils}/package-info.java (91%)
[beam] 01/01: Merge pull request #8108: Correct spark version used by the spark runner in the website
This is an automated email from the ASF dual-hosted git repository. aromanenko pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git commit 133c56d225e2c8d976208c675d607f8a0344862f Merge: c961139 1a839ef Author: Alexey Romanenko <33895511+aromanenko-...@users.noreply.github.com> AuthorDate: Thu Mar 21 16:30:53 2019 +0100 Merge pull request #8108: Correct spark version used by the spark runner in the website website/src/documentation/runners/spark.md | 16 1 file changed, 8 insertions(+), 8 deletions(-)
[beam] branch master updated (c961139 -> 133c56d)
This is an automated email from the ASF dual-hosted git repository. aromanenko pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git. from c961139 Merge pull request #8086 [BEAM-6865] DefaultJobBundleFactory and environmentFactoryProviderMap add 1a839ef Correct spark version used by the spark runner in the website new 133c56d Merge pull request #8108: Correct spark version used by the spark runner in the website The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: website/src/documentation/runners/spark.md | 16 1 file changed, 8 insertions(+), 8 deletions(-)
[beam] branch master updated: [BEAM-6483] Add support for SADD operation to RedisIO.Write
This is an automated email from the ASF dual-hosted git repository. aromanenko pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 3e2f06a [BEAM-6483] Add support for SADD operation to RedisIO.Write new 664e5ab Merge pull request #7587: [BEAM-6483] Add support for SADD operation to RedisIO.Write 3e2f06a is described below commit 3e2f06a8f1634220e87649969e16d5b5d47aac12 Author: Kengo Seki AuthorDate: Tue Jan 22 10:11:10 2019 -0800 [BEAM-6483] Add support for SADD operation to RedisIO.Write For now, RedisIO.Write supports write methods for string (APPEND, SET), list (LPUSH, RPUSH) and HyperLogLog (PFADD), but not for set (SADD). This PR adds it. In addition, I did the following refactoring in this: * make the input value check for port number stricter * replace a magic number indicating the end of a loop with a constant * remove an unnecessary argument from writeUsingHLLCommand, which is a private method used only internally --- .../java/org/apache/beam/sdk/io/redis/RedisIO.java | 20 .../org/apache/beam/sdk/io/redis/RedisIOTest.java | 28 ++ 2 files changed, 44 insertions(+), 4 deletions(-) diff --git a/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java b/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java index 8d6b0be..06ba187 100644 --- a/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java +++ b/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java @@ -167,7 +167,7 @@ public class RedisIO { public Read withEndpoint(String host, int port) { checkArgument(host != null, "host can not be null"); - checkArgument(port > 0, "port can not be negative or 0"); + checkArgument(0 < port && port < 65536, "port must be a positive integer less than 65536"); return builder() .setConnectionConfiguration(connectionConfiguration().withHost(host).withPort(port)) .build(); @@ -320,7 
+320,7 @@ public class RedisIO { processContext.output(k); } cursor = scanResult.getStringCursor(); -if ("0".equals(cursor)) { +if (cursor.equals(ScanParams.SCAN_POINTER_START)) { finished = true; } } @@ -446,6 +446,9 @@ public class RedisIO { */ RPUSH, + /** Use SADD command. Insert value in a set. Duplicated values are ignored. */ + SADD, + /** Use PFADD command. Insert value in a HLL structure. Create key if it doesn't exist */ PFADD } @@ -570,8 +573,10 @@ public class RedisIO { writeUsingSetCommand(record, expireTime); } else if (Method.LPUSH == method || Method.RPUSH == method) { writeUsingListCommand(record, method, expireTime); +} else if (Method.SADD == method) { + writeUsingSaddCommand(record, expireTime); } else if (Method.PFADD == method) { - writeUsingHLLCommand(record, method, expireTime); + writeUsingHLLCommand(record, expireTime); } } @@ -610,7 +615,14 @@ public class RedisIO { setExpireTimeWhenRequired(key, expireTime); } - private void writeUsingHLLCommand(KV record, Method method, Long expireTime) { + private void writeUsingSaddCommand(KV record, Long expireTime) { +String key = record.getKey(); +String value = record.getValue(); + +pipeline.sadd(key, value); + } + + private void writeUsingHLLCommand(KV record, Long expireTime) { String key = record.getKey(); String value = record.getValue(); diff --git a/sdks/java/io/redis/src/test/java/org/apache/beam/sdk/io/redis/RedisIOTest.java b/sdks/java/io/redis/src/test/java/org/apache/beam/sdk/io/redis/RedisIOTest.java index 529a854..c33355c 100644 --- a/sdks/java/io/redis/src/test/java/org/apache/beam/sdk/io/redis/RedisIOTest.java +++ b/sdks/java/io/redis/src/test/java/org/apache/beam/sdk/io/redis/RedisIOTest.java @@ -20,8 +20,10 @@ package org.apache.beam.sdk.io.redis; import java.io.IOException; import java.net.ServerSocket; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Set; import 
org.apache.beam.sdk.io.redis.RedisIO.Write.Method; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; @@ -29,6 +31,8 @@ import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists; +import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Sets; import org.junit.
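One of the refactorings in the commit above tightens the port check in `RedisIO.Read#withEndpoint` from `port > 0` to `0 < port && port < 65536`. A standalone sketch of that predicate (the class and method names are illustrative):

```java
public class PortCheck {
  // Mirrors the stricter validation added to RedisIO: a TCP port must be
  // a positive integer strictly below 65536.
  public static boolean isValidPort(int port) {
    return 0 < port && port < 65536;
  }

  public static void main(String[] args) {
    if (!isValidPort(6379) || isValidPort(0) || isValidPort(65536)) {
      throw new IllegalStateException("port validation sketch is wrong");
    }
    System.out.println("ok"); // prints "ok"
  }
}
```

The old check accepted out-of-range values such as 70000; the new one rejects anything that cannot be a real TCP port.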
[beam] branch spark-runner_structured-streaming updated: Fix kryo issue in GBK translator with a workaround
This is an automated email from the ASF dual-hosted git repository. aromanenko pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/spark-runner_structured-streaming by this push: new c0be696 Fix kryo issue in GBK translator with a workaround c0be696 is described below commit c0be696c50be0468f2a82d7720e3004dbff30ead Author: Alexey Romanenko AuthorDate: Wed Feb 6 18:53:40 2019 +0100 Fix kryo issue in GBK translator with a workaround --- .../batch/GroupByKeyTranslatorBatch.java | 25 +++--- .../translation/batch/GroupByKeyTest.java | 4 2 files changed, 17 insertions(+), 12 deletions(-) diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java index 9ecda56..3626181 100644 --- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java +++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java @@ -17,7 +17,8 @@ */ package org.apache.beam.runners.spark.structuredstreaming.translation.batch; -import com.google.common.collect.Iterators; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; import org.apache.beam.runners.spark.structuredstreaming.translation.EncoderHelpers; import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator; import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext; @@ -30,6 +31,8 @@ import org.apache.spark.api.java.function.MapGroupsFunction; import org.apache.spark.sql.Dataset; import 
org.apache.spark.sql.KeyValueGroupedDataset; +import java.util.List; + class GroupByKeyTranslatorBatch implements TransformTranslator< PTransform>, PCollection>>>> { @@ -41,24 +44,30 @@ class GroupByKeyTranslatorBatch Dataset>> input = context.getDataset(context.getInput()); +// Extract key to group by key only. KeyValueGroupedDataset> grouped = input -// extact KV from WindowedValue .map( (MapFunction>, KV>) WindowedValue::getValue, EncoderHelpers.kvEncoder()) -// apply the actual GBK providing a way to extract the K -.groupByKey((MapFunction, K>) KV::getKey, EncoderHelpers.genericEncoder()); +.groupByKey((MapFunction, K>) KV::getKey, EncoderHelpers.genericEncoder()); +// Materialize grouped values, potential OOM because of creation of new iterable Dataset>> materialized = -// create KV> grouped.mapGroups( (MapGroupsFunction, KV>>) -(key, iterator) -> KV.of(key, () -> Iterators.transform(iterator, KV::getValue)), +// TODO: We need to improve this part and avoid creating of new List (potential OOM) +// (key, iterator) -> KV.of(key, () -> Iterators.transform(iterator, KV::getValue)), +(key, iterator) -> { + List values = Lists.newArrayList(); + while (iterator.hasNext()) { +values.add(iterator.next().getValue()); + } + return KV.of(key, Iterables.unmodifiableIterable(values)); +}, EncoderHelpers.kvEncoder()); -// wrap inside a WindowedValue -//TODO fix: serialization issue +// Window the result into global window. 
Dataset>>> output = materialized.map( (MapFunction>, WindowedValue>>>) diff --git a/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTest.java b/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTest.java index 58a14dc..a069534 100644 --- a/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTest.java +++ b/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTest.java @@ -48,10 +48,6 @@ public class GroupByKeyTest implements Serializable { pipeline = Pipeline.create(options); } - @Ignore( - "fails with Unable to create serializer " - + "\"com.esotericsoftware.kryo.serializers.FieldSerializer\" for class: " - + "worker.org.gradle.internal.UncheckedException in last map step") @Test public void testGroupByKey() { Map elems = new HashMap<>();
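The kryo workaround above swaps a lazy `Iterators.transform` view for an eagerly materialized list inside `mapGroups`. A framework-free sketch of that materialization step (using `Map.Entry` in place of Beam's `KV`):

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

public class MaterializeGroup {
  // Mirrors the workaround's mapGroups lambda: instead of returning a lazy
  // transformed view of the iterator (which tripped kryo serialization),
  // drain the per-key iterator eagerly into an unmodifiable list. As the
  // TODO in the commit notes, this can OOM on very large groups.
  public static <K, V> List<V> materializeValues(Iterator<Map.Entry<K, V>> iterator) {
    List<V> values = new ArrayList<>();
    while (iterator.hasNext()) {
      values.add(iterator.next().getValue());
    }
    return Collections.unmodifiableList(values);
  }

  public static void main(String[] args) {
    List<Map.Entry<Integer, Integer>> group =
        Arrays.asList(new SimpleEntry<>(1, 2), new SimpleEntry<>(1, 4), new SimpleEntry<>(1, 6));
    System.out.println(materializeValues(group.iterator())); // prints [2, 4, 6]
  }
}
```

Eager materialization trades memory for serializability, which is why the commit leaves a TODO to avoid the intermediate list later.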
[beam] branch spark-runner_structured-streaming updated: Fix for test elements container in GroupByKeyTest
This is an automated email from the ASF dual-hosted git repository. aromanenko pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/spark-runner_structured-streaming by this push: new dff0931 Fix for test elements container in GroupByKeyTest dff0931 is described below commit dff0931950c04f086d3e2d2d7bf33d3d54958c79 Author: Alexey Romanenko AuthorDate: Fri Feb 8 18:44:06 2019 +0100 Fix for test elements container in GroupByKeyTest --- .../translation/batch/GroupByKeyTest.java| 20 ++-- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTest.java b/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTest.java index a069534..b772070 100644 --- a/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTest.java +++ b/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTest.java @@ -18,8 +18,8 @@ package org.apache.beam.runners.spark.structuredstreaming.translation.batch; import java.io.Serializable; -import java.util.HashMap; -import java.util.Map; +import java.util.ArrayList; +import java.util.List; import org.apache.beam.runners.spark.structuredstreaming.SparkPipelineOptions; import org.apache.beam.runners.spark.structuredstreaming.SparkRunner; import org.apache.beam.sdk.Pipeline; @@ -31,7 +31,6 @@ import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.junit.BeforeClass; -import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -50,13 +49,14 @@ public class 
GroupByKeyTest implements Serializable { @Test public void testGroupByKey() { -Map elems = new HashMap<>(); -elems.put(1, 1); -elems.put(1, 3); -elems.put(1, 5); -elems.put(2, 2); -elems.put(2, 4); -elems.put(2, 6); +List> elems = new ArrayList<>(); +elems.add(KV.of(1, 1)); +elems.add(KV.of(1, 3)); +elems.add(KV.of(1, 5)); +elems.add(KV.of(2, 2)); +elems.add(KV.of(2, 4)); +elems.add(KV.of(2, 6)); + PCollection> input = pipeline.apply(Create.of(elems)); input.apply(GroupByKey.create()); pipeline.run();
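The container change in this test fix matters because a `Map` collapses duplicate keys: three `put(1, …)` calls leave a single entry, so the old `GroupByKey` input silently lost elements, while a list of key/value pairs (Beam's `KV`; `Map.Entry` in this plain-Java sketch) keeps all of them:

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DuplicateKeys {
  public static int elementsViaMap() {
    Map<Integer, Integer> elems = new HashMap<>();
    elems.put(1, 1);
    elems.put(1, 3); // overwrites (1, 1)
    elems.put(1, 5); // overwrites (1, 3)
    return elems.size();
  }

  public static int elementsViaList() {
    List<Map.Entry<Integer, Integer>> elems = new ArrayList<>();
    elems.add(new SimpleEntry<>(1, 1));
    elems.add(new SimpleEntry<>(1, 3));
    elems.add(new SimpleEntry<>(1, 5));
    return elems.size();
  }

  public static void main(String[] args) {
    // The map retains 1 element, the list retains all 3.
    System.out.println(elementsViaMap() + " vs " + elementsViaList()); // prints "1 vs 3"
  }
}
```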
[beam] branch spark-runner_structured-streaming updated: Added "testTwoPardoInRow"
This is an automated email from the ASF dual-hosted git repository. aromanenko pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/spark-runner_structured-streaming by this push: new c0061ab Added "testTwoPardoInRow" c0061ab is described below commit c0061ab8ac8c896af5635a7ecca94fd255ec4aae Author: Alexey Romanenko AuthorDate: Tue Feb 12 17:06:10 2019 +0100 Added "testTwoPardoInRow" --- .../translation/batch/ParDoTest.java | 27 ++ 1 file changed, 27 insertions(+) diff --git a/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTest.java b/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTest.java index 88e862f..48350df 100644 --- a/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTest.java +++ b/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTest.java @@ -56,4 +56,31 @@ public class ParDoTest implements Serializable { })); pipeline.run(); } + + @Test + public void testTwoPardoInRow() { +PCollection input = pipeline.apply(Create.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)); +input +.apply( +ParDo.of( +new DoFn() { + @ProcessElement + public void processElement(ProcessContext context) { +Integer val = context.element() + 1; +context.output(val); +System.out.println("ParDo1: val = " + val); + } +})) +.apply( +ParDo.of( +new DoFn() { + @ProcessElement + public void processElement(ProcessContext context) { +Integer val = context.element() + 1; +context.output(val); +System.out.println("ParDo2: val = " + val); + } +})); +pipeline.run(); + } }
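What "testTwoPardoInRow" exercises can be sketched without the framework: each `ParDo` in the test is an element-wise function over the collection, and chaining two of them composes the functions (here, two `+1` steps yield `+2` overall; names below are illustrative):

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

public class TwoParDosInRow {
  // A stand-in for an element-wise ParDo: apply fn to every element.
  static <A, B> List<B> parDo(List<A> input, Function<A, B> fn) {
    return input.stream().map(fn).collect(Collectors.toList());
  }

  public static List<Integer> run(List<Integer> input) {
    List<Integer> afterFirst = parDo(input, x -> x + 1); // "ParDo1": +1
    return parDo(afterFirst, x -> x + 1);                // "ParDo2": +1
  }

  public static void main(String[] args) {
    System.out.println(run(Arrays.asList(1, 2, 3))); // prints [3, 4, 5]
  }
}
```

The point of the runner test is that the translation handles two consecutive `ParDo` transforms, not just a single one.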
[beam] branch master updated: Remove gcemd and gcsproxy projects
This is an automated email from the ASF dual-hosted git repository. aromanenko pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 5e1720e Remove gcemd and gcsproxy projects new 34fa661 Merge pull request #7820: Remove gcemd and gcsproxy projects 5e1720e is described below commit 5e1720e6cbefe51633b2a020ae4803a65770c8b6 Author: Alexey Romanenko AuthorDate: Tue Feb 12 14:48:05 2019 +0100 Remove gcemd and gcsproxy projects --- settings.gradle | 4 1 file changed, 4 deletions(-) diff --git a/settings.gradle b/settings.gradle index f62eae3..1b36e48 100644 --- a/settings.gradle +++ b/settings.gradle @@ -61,10 +61,6 @@ project(":beam-runners-flink-1.7-job-server").dir = file("runners/flink/1.7/job- include "beam-runners-flink-1.7-job-server-container" project(":beam-runners-flink-1.7-job-server-container").dir = file("runners/flink/1.7/job-server-container") /* End Flink Runner related settings */ -include "beam-runners-gcp-gcemd" -project(":beam-runners-gcp-gcemd").dir = file("runners/gcp/gcemd") -include "beam-runners-gcp-gcsproxy" -project(":beam-runners-gcp-gcsproxy").dir = file("runners/gcp/gcsproxy") include "beam-runners-gearpump" project(":beam-runners-gearpump").dir = file("runners/gearpump") include "beam-runners-google-cloud-dataflow-java"
[beam] branch master updated (34fa661 -> 5b71fd9)
This is an automated email from the ASF dual-hosted git repository. aromanenko pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git. from 34fa661 Merge pull request #7820: Remove gcemd and gcsproxy projects new 4a1b983 [BEAM-5392] GroupByKey optimized for non-merging windows new 023f31d [BEAM-5392] small CR corrections new 6283f3d [BEAM-5392] GroupNonMergingWindowsFunctions.groupByKeyAndWindow description new 5b71fd9 Merge pull request #7601: [BEAM-5392] GroupByKey optimised for non-merging windows The 20135 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../spark/coders/BeamSparkRunnerRegistrator.java | 2 + .../spark/translation/EvaluationContext.java | 52 - .../GroupNonMergingWindowsFunctions.java | 252 + .../spark/translation/TransformTranslator.java | 43 ++-- .../org/apache/beam/runners/spark/CacheTest.java | 33 ++- .../GroupNonMergingWindowsFunctionsTest.java | 133 +++ 6 files changed, 477 insertions(+), 38 deletions(-) create mode 100644 runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupNonMergingWindowsFunctions.java create mode 100644 runners/spark/src/test/java/org/apache/beam/runners/spark/translation/GroupNonMergingWindowsFunctionsTest.java
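The idea behind the [BEAM-5392] optimization above: when windows never merge, an element's window is fixed at assignment time, so the runner can group by a composite (key, window) pair in a single pass instead of grouping and then re-partitioning per window. A hedged, framework-free sketch, using a `"key@window"` string as the composite (purely illustrative; the real code uses typed keys and windows):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class GroupByKeyAndWindow {
  // Group values by the composite "key@window" string. With non-merging
  // windows this composite is stable, so one grouping pass suffices.
  public static Map<String, List<Integer>> group(List<String[]> elements) {
    Map<String, List<Integer>> grouped = new HashMap<>();
    for (String[] e : elements) { // e = {key, window, value}
      String composite = e[0] + "@" + e[1];
      grouped.computeIfAbsent(composite, k -> new ArrayList<>()).add(Integer.valueOf(e[2]));
    }
    return grouped;
  }

  public static void main(String[] args) {
    List<String[]> elems = Arrays.asList(
        new String[] {"a", "w1", "1"},
        new String[] {"a", "w1", "2"},
        new String[] {"a", "w2", "3"});
    System.out.println(group(elems).get("a@w1")); // prints [1, 2]
  }
}
```

Merging windows (e.g. sessions) break this assumption, because the window of an element can change after grouping starts; that is why the optimization is restricted to non-merging windows.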
[beam] branch master updated: [BEAM-6568] Updated Hadoop Input/Output Format IO documentation
This is an automated email from the ASF dual-hosted git repository. aromanenko pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 5bdeaf8 [BEAM-6568] Updated Hadoop Input/Output Format IO documentation new 347b186 Merge pull request #7692: [BEAM-6568] Updated Hadoop Input/Output Format IO documentation 5bdeaf8 is described below commit 5bdeaf89576dd11485ee7cf0c8ca634290a8fb18 Author: Alexey Romanenko AuthorDate: Thu Jan 31 18:07:44 2019 +0100 [BEAM-6568] Updated Hadoop Input/Output Format IO documentation --- website/src/documentation/io/built-in-hadoop.md | 153 ++-- website/src/documentation/io/built-in.md| 2 +- 2 files changed, 119 insertions(+), 36 deletions(-) diff --git a/website/src/documentation/io/built-in-hadoop.md b/website/src/documentation/io/built-in-hadoop.md index f136e59..37ae66f 100644 --- a/website/src/documentation/io/built-in-hadoop.md +++ b/website/src/documentation/io/built-in-hadoop.md @@ -1,6 +1,6 @@ --- layout: section -title: "Apache Hadoop InputFormat IO" +title: "Apache Hadoop Input/Output Format IO" section_menu: section-menu/documentation.html permalink: /documentation/io/built-in/hadoop/ --- @@ -18,11 +18,17 @@ See the License for the specific language governing permissions and limitations under the License. --> -# Hadoop InputFormat IO +# Hadoop Input/Output Format IO -A `HadoopInputFormatIO` is a transform for reading data from any source that implements Hadoop's `InputFormat`. For example, Cassandra, Elasticsearch, HBase, Redis, Postgres, etc. +> **IMPORTANT!** Previous implementation of Hadoop Input Format IO, called `HadoopInputFormatIO`, is deprecated starting from *Apache Beam 2.10*. Please, use current `HadoopFormatIO` which supports both `InputFormat` and `OutputFormat`. -`HadoopInputFormatIO` allows you to connect to many data sources that do not yet have a Beam IO transform. 
However, `HadoopInputFormatIO` has to make several performance trade-offs in connecting to `InputFormat`. So, if there is another Beam IO transform for connecting specifically to your data source of choice, we recommend you use that one. +A `HadoopFormatIO` is a transform for reading data from any source or writing data to any sink that implements Hadoop's `InputFormat` or `OutputFormat` accordingly. For example, Cassandra, Elasticsearch, HBase, Redis, Postgres, etc. + +`HadoopFormatIO` allows you to connect to many data sources/sinks that do not yet have a Beam IO transform. However, `HadoopFormatIO` has to make several performance trade-offs in connecting to `InputFormat` or `OutputFormat`. So, if there is another Beam IO transform for connecting specifically to your data source/sink of choice, we recommend you use that one. + + + +### Reading using HadoopFormatIO You will need to pass a Hadoop `Configuration` with parameters specifying how the read will occur. Many properties of the `Configuration` are optional and some are required for certain `InputFormat` classes, but the following properties must be set for all `InputFormat` classes: @@ -41,7 +47,7 @@ myHadoopConfiguration.setClass("value.class", InputFormatValueClass, Object.clas ``` ```py - # The Beam SDK for Python does not support Hadoop InputFormat IO. + # The Beam SDK for Python does not support Hadoop Input/Output Format IO. ``` You will need to check if the `Key` and `Value` classes output by the `InputFormat` have a Beam `Coder` available. If not, you can use `withKeyTranslation` or `withValueTranslation` to specify a method transforming instances of those classes into another class that is supported by a Beam `Coder`. These settings are optional and you don't need to specify translation for both key and value. @@ -63,21 +69,19 @@ new SimpleFunction() { ``` ```py - # The Beam SDK for Python does not support Hadoop InputFormat IO.
+ # The Beam SDK for Python does not support Hadoop Input/Output Format IO. ``` -### Reading using Hadoop InputFormat IO - Read data only with Hadoop configuration. ```java p.apply("read", - HadoopInputFormatIO.read() + HadoopFormatIO.read() .withConfiguration(myHadoopConfiguration); ``` ```py - # The Beam SDK for Python does not support Hadoop InputFormat IO. + # The Beam SDK for Python does not support Hadoop Input/Output Format IO. ``` Read data with configuration and key translation @@ -86,13 +90,13 @@ For example, a Beam `Coder` is not available for `Key` class, so key translation ```java p.apply("read", - HadoopInputFormatIO.read() + HadoopFormatIO.read() .withConfiguration(myHadoopConfiguration) .withKeyTranslation(myOutputKeyType); ``` ```py - # The Beam SDK for Python does not support Hadoop InputFormat IO. + # The Beam SDK for Python does not suppo
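The documentation change above says that certain `Configuration` properties must be set for every `InputFormat` read. A minimal sketch of checking such a configuration before submitting a read, using a plain `Map` as a stand-in for a Hadoop `Configuration`; the property name `mapreduce.job.inputformat.class` is an assumption (only `key.class` and `value.class` appear in the excerpt), and the class and method names are illustrative, not part of Beam.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class HadoopReadConfigCheck {
    // Property names required for every InputFormat read per the docs above;
    // "mapreduce.job.inputformat.class" is an assumed name, not quoted in the excerpt.
    static final List<String> REQUIRED =
        Arrays.asList("mapreduce.job.inputformat.class", "key.class", "value.class");

    /** Returns the required properties that are missing from the configuration map. */
    static List<String> missingProperties(Map<String, String> conf) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED) {
            if (!conf.containsKey(key)) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put("key.class", "org.apache.hadoop.io.Text");
        conf.put("value.class", "org.apache.hadoop.io.LongWritable");
        // The InputFormat class is still missing, so the check reports it.
        System.out.println(missingProperties(conf));
    }
}
```

In a real pipeline the analogous check is done by `HadoopFormatIO` itself when the transform is expanded; the sketch only illustrates the "required vs. optional properties" rule stated in the docs.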
[beam] branch master updated: [BEAM-6426] Fix unvendored guava imports
This is an automated email from the ASF dual-hosted git repository. aromanenko pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 000615d [BEAM-6426] Fix unvendored guava imports new 6ac29ac Merge pull request #7771: [BEAM-6426] Fix unvendored guava imports 000615d is described below commit 000615def0705d7d63fed6fef094c99f062a3d07 Author: Ismaël Mejía AuthorDate: Thu Feb 7 10:39:44 2019 +0100 [BEAM-6426] Fix unvendored guava imports --- .../src/test/java/org/apache/beam/sdk/util/GcsUtilIT.java | 2 +- .../src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsKmsKeyIT.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/GcsUtilIT.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/GcsUtilIT.java index 60e4ca1..e428d7b 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/GcsUtilIT.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/GcsUtilIT.java @@ -21,7 +21,6 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.junit.Assert.assertNotNull; -import com.google.common.collect.Lists; import java.io.IOException; import java.util.Date; import java.util.concurrent.atomic.AtomicInteger; @@ -29,6 +28,7 @@ import org.apache.beam.sdk.extensions.gcp.options.GcsOptions; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.testing.TestPipelineOptions; import org.apache.beam.sdk.util.gcsfs.GcsPath; +import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsKmsKeyIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsKmsKeyIT.java index 7130195..2f64b1f 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsKmsKeyIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsKmsKeyIT.java @@ -22,7 +22,6 @@ import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.CoreMatchers.startsWith; import static org.hamcrest.MatcherAssert.assertThat; -import com.google.common.collect.Iterables; import java.io.IOException; import java.util.Collections; import java.util.Date; @@ -42,6 +41,7 @@ import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.testing.TestPipelineOptions; import org.apache.beam.sdk.util.GcsUtil; import org.apache.beam.sdk.util.gcsfs.GcsPath; +import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables; import org.junit.BeforeClass; import org.junit.Test; import org.junit.runner.RunWith;
[beam] branch master updated: [BEAM-6426] Fix regexp to enforce ban of non-vendored Guava
This is an automated email from the ASF dual-hosted git repository. aromanenko pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new bd1eed1 [BEAM-6426] Fix regexp to enforce ban of non-vendored Guava new c654b57 Merge pull request #7701: [BEAM-6426] Fix regexp to enforce ban of non-vendored Guava bd1eed1 is described below commit bd1eed1353ba6bc1a0096747c8ea071d6cf95887 Author: Ismaël Mejía AuthorDate: Wed Feb 6 18:09:40 2019 +0100 [BEAM-6426] Fix regexp to enforce ban of non-vendored Guava Fix the matching uses of non-vendored guava and add suppressions for cases when guava is part of the components API. --- .../construction/graph/SideInputReference.java | 2 +- .../beam/runners/direct/CloningBundleFactory.java | 2 +- .../runners/direct/ImmutableListBundleFactory.java | 2 +- .../org/apache/beam/runners/flink/FlinkRunner.java | 2 +- .../FlinkPipelineExecutionEnvironmentTest.java | 2 +- .../beam/runners/flink/FlinkSavepointTest.java | 2 +- ...piMonitoringInfoToCounterUpdateTransformer.java | 2 +- ...ecMonitoringInfoToCounterUpdateTransformer.java | 2 +- .../runners/samza/runtime/SamzaDoFnRunners.java| 2 +- .../translation/FlattenPCollectionsTranslator.java | 2 +- .../samza/translation/GroupByKeyTranslator.java| 2 +- .../samza/translation/ImpulseTranslator.java | 2 +- .../translation/PortableTranslationContext.java| 2 +- .../SamzaPortablePipelineTranslator.java | 2 +- .../samza/util/SamzaPipelineTranslatorUtils.java | 2 +- .../SparkGroupAlsoByWindowViaWindowSet.java| 4 +- .../runners/spark/stateful/StateSpecFunctions.java | 3 +- .../runners/spark/translation/BoundedDataset.java | 4 +- .../spark/translation/GroupCombineFunctions.java | 6 +- .../spark/translation/TransformTranslator.java | 2 +- .../spark/translation/WindowingHelpers.java| 68 -- .../streaming/StreamingTransformTranslator.java| 3 +- .../src/main/resources/beam/checkstyle.xml | 4 +- 
.../src/main/resources/beam/suppressions.xml | 8 ++- .../splittabledofn/RestrictionTracker.java | 2 +- .../sdk/options/PipelineOptionsFactoryTest.java| 2 +- .../beam/sdk/schemas/transforms/SelectTest.java| 4 +- .../translate/BroadcastHashJoinTranslator.java | 6 +- .../sql/impl/BeamCalciteSchemaFactory.java | 2 +- .../sdk/extensions/sql/impl/JdbcConnection.java| 2 +- .../extensions/sql/BeamSqlMultipleSchemasTest.java | 2 +- .../sdk/io/gcp/bigquery/FakeDatasetService.java| 10 ++-- .../beam/sdk/io/gcp/bigquery/FakeJobService.java | 22 --- .../io/gcp/bigtable/BigtableServiceImplTest.java | 2 +- .../org/apache/beam/sdk/io/kinesis/KinesisIO.java | 2 +- .../beam/sdk/io/kinesis/KinesisProducerMock.java | 5 +- 36 files changed, 68 insertions(+), 125 deletions(-) diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/SideInputReference.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/SideInputReference.java index e329743..d3ce0e0 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/SideInputReference.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/SideInputReference.java @@ -18,13 +18,13 @@ package org.apache.beam.runners.core.construction.graph; import com.google.auto.value.AutoValue; -import com.google.common.base.MoreObjects; import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload.SideInputId; import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection; import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform; import org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode; import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode; +import org.apache.beam.vendor.guava.v20_0.com.google.common.base.MoreObjects; /** * A reference to a side input. 
This includes the PTransform that references the side input as well diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CloningBundleFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CloningBundleFactory.java index 25571da..9f1605d 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CloningBundleFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CloningBundleFactory.java @@ -17,7 +17,6 @@ */ package org.apache.beam.runners.direct; -import com.google.common.base.MoreObjects; import org.apache.beam.runners.local.StructuralKey; import org.apache.beam.sdk.coders.Coder; import
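The two commits above migrate direct `com.google.common` imports to Beam's vendored namespace (`org.apache.beam.vendor.guava.v20_0.com.google.common`) and fix the checkstyle regexp that enforces the ban. A minimal sketch of such an import check; the pattern is illustrative and is not Beam's actual checkstyle rule.

```java
import java.util.regex.Pattern;

public class GuavaImportCheck {
    // Flags imports of Guava that bypass Beam's vendored namespace. Vendored
    // imports start with "org.apache.beam.vendor...", so they never match the
    // leading "import [static] com.google.common." anchor.
    static final Pattern NON_VENDORED =
        Pattern.compile("^import\\s+(static\\s+)?com\\.google\\.common\\.");

    static boolean isBanned(String importLine) {
        return NON_VENDORED.matcher(importLine).find();
    }

    public static void main(String[] args) {
        System.out.println(isBanned("import com.google.common.collect.Lists;"));
        System.out.println(isBanned(
            "import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists;"));
    }
}
```

Vendoring (relocating a dependency under a project-owned package) avoids classpath clashes when users bring their own, possibly incompatible, Guava version.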
[beam] branch spark-runner_structured-streaming updated: Fix getSideInputs
This is an automated email from the ASF dual-hosted git repository. aromanenko pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/spark-runner_structured-streaming by this push: new 9fcc955 Fix getSideInputs 9fcc955 is described below commit 9fcc955f5722dcc7899f6ec91b9432444a8dd46c Author: Alexey Romanenko AuthorDate: Tue Feb 19 17:01:04 2019 +0100 Fix getSideInputs --- .../translation/TranslationContext.java| 11 ++ .../CreatePCollectionViewTranslatorBatch.java | 40 ++ .../translation/batch/ParDoTranslatorBatch.java| 1 + .../translation/batch/PipelineTranslatorBatch.java | 4 +++ .../translation/batch/ParDoTest.java | 27 +++ 5 files changed, 83 insertions(+) diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java index 6711b1c..013ef75 100644 --- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java +++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java @@ -33,6 +33,7 @@ import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.PValue; import org.apache.beam.sdk.values.TupleTag; import org.apache.spark.SparkConf; @@ -61,6 +62,8 @@ public class TranslationContext { @SuppressFBWarnings("URF_UNREAD_FIELD") // make findbug happy private SparkSession sparkSession; + private final Map, Dataset> broadcastDataSets; + public 
TranslationContext(SparkPipelineOptions options) { SparkConf sparkConf = new SparkConf(); sparkConf.setMaster(options.getSparkMaster()); @@ -73,6 +76,7 @@ public class TranslationContext { this.serializablePipelineOptions = new SerializablePipelineOptions(options); this.datasets = new HashMap<>(); this.leaves = new HashSet<>(); +this.broadcastDataSets = new HashMap<>(); } public SparkSession getSparkSession() { @@ -128,6 +132,13 @@ public class TranslationContext { } } + public void setSideInputDataset( + PCollectionView value, Dataset> set) { +if (!broadcastDataSets.containsKey(value)) { + broadcastDataSets.put(value, set); +} + } + // // PCollections methods // diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CreatePCollectionViewTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CreatePCollectionViewTranslatorBatch.java new file mode 100644 index 000..df4d252 --- /dev/null +++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CreatePCollectionViewTranslatorBatch.java @@ -0,0 +1,40 @@ +package org.apache.beam.runners.spark.structuredstreaming.translation.batch; + +import org.apache.beam.runners.core.construction.CreatePCollectionViewTranslation; +import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator; +import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext; +import org.apache.beam.sdk.runners.AppliedPTransform; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.spark.sql.Dataset; + +import java.io.IOException; + +class CreatePCollectionViewTranslatorBatch +implements TransformTranslator, 
PCollection>> { + + @Override + public void translateTransform( + PTransform, PCollection> transform, TranslationContext context) { + +Dataset> inputDataSet = context.getDataset(context.getInput()); + +@SuppressWarnings("unchecked") AppliedPTransform< +PCollection, PCollection, +PTransform, PCollection>> +application = +(AppliedPTransform< +PCollection, PCollection, +PTransform, PCollection>>) +context.getCurrentTransform(); +PCollectionView input; +
[beam] branch master updated: [BEAM-6305] Update Cassandra java driver to version 3.6.0
This is an automated email from the ASF dual-hosted git repository. aromanenko pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new c30eefa [BEAM-6305] Update Cassandra java driver to version 3.6.0 new efb83e6 Merge pull request #7658: [BEAM-6305] Update Cassandra java driver to version 3.6.0 c30eefa is described below commit c30eefaa8f64981e9df0f286e9d58dd7e5fdc473 Author: Ismaël Mejía AuthorDate: Tue Jan 29 10:09:30 2019 +0100 [BEAM-6305] Update Cassandra java driver to version 3.6.0 --- .../src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy | 3 +++ sdks/java/io/cassandra/build.gradle | 6 ++ sdks/java/io/hadoop-format/build.gradle | 6 ++ sdks/java/io/hadoop-input-format/build.gradle | 6 ++ 4 files changed, 9 insertions(+), 12 deletions(-) diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index d5a8e82..c9ffd24 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -351,6 +351,7 @@ class BeamModulePlugin implements Plugin { def kafka_version = "1.0.0" def quickcheck_version = "0.8" def google_cloud_core_version = "1.36.0" +def cassandra_driver_version = "3.6.0" // A map of maps containing common libraries used per language. 
To use: // dependencies { @@ -377,6 +378,8 @@ class BeamModulePlugin implements Plugin { bigtable_client_core: "com.google.cloud.bigtable:bigtable-client-core:$bigtable_version", bigtable_protos : "com.google.api.grpc:grpc-google-cloud-bigtable-v2:$generated_grpc_beta_version", byte_buddy : "net.bytebuddy:byte-buddy:1.9.3", +cassandra_driver_core : "com.datastax.cassandra:cassandra-driver-core:$cassandra_driver_version", +cassandra_driver_mapping: "com.datastax.cassandra:cassandra-driver-mapping:$cassandra_driver_version", commons_compress: "org.apache.commons:commons-compress:1.16.1", commons_csv : "org.apache.commons:commons-csv:1.4", commons_io_1x : "commons-io:commons-io:1.3.2", diff --git a/sdks/java/io/cassandra/build.gradle b/sdks/java/io/cassandra/build.gradle index 3494147..298758c 100644 --- a/sdks/java/io/cassandra/build.gradle +++ b/sdks/java/io/cassandra/build.gradle @@ -29,14 +29,12 @@ enableJavaPerformanceTesting() description = "Apache Beam :: SDKs :: Java :: IO :: Cassandra" ext.summary = "IO to read and write with Apache Cassandra database" -def cassandra_version = "3.5.0" - dependencies { shadow library.java.vendored_guava_20_0 shadow project(path: ":beam-sdks-java-core", configuration: "shadow") shadow library.java.slf4j_api - shadow "com.datastax.cassandra:cassandra-driver-core:$cassandra_version" - shadow "com.datastax.cassandra:cassandra-driver-mapping:$cassandra_version" + shadow library.java.cassandra_driver_core + shadow library.java.cassandra_driver_mapping testCompile project(path: ":beam-runners-direct-java", configuration: "shadow") testCompile project(path: ":beam-sdks-java-io-common", configuration: "shadowTest") testCompile library.java.junit diff --git a/sdks/java/io/hadoop-format/build.gradle b/sdks/java/io/hadoop-format/build.gradle index 8db1b5d..5dabf80 100644 --- a/sdks/java/io/hadoop-format/build.gradle +++ b/sdks/java/io/hadoop-format/build.gradle @@ -28,8 +28,6 @@ ext.summary = "IO to read data from sources and to write 
data to sinks that impl def log4j_version = "2.6.2" def elastic_search_version = "5.0.0" -// Migrate to using a version of the driver compatible with Guava 20 -def cassandra_driver = "3.2.0" configurations.create("sparkRunner") configurations.sparkRunner { @@ -76,9 +74,9 @@ dependencies { exclude group: "org.apache.spark", module: "spark-sql_2.10" exclude group: "org.apache.storm", module: "storm-core" } - testCompile "com.datastax.cassandra:cassandra-driver-mapping:$cassandra_driver" + testCompile library.java.cassandra_driver_core + testCompile library.java.cassandra_driver_mapping testCompile "org.apache.cassandra:cassandra-all:3.11.3" - testCompile "
[beam] branch master updated: [BEAM-6285] add parameters for offsetConsumer in KafkaIO.read()
This is an automated email from the ASF dual-hosted git repository. aromanenko pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 7f17201 [BEAM-6285] add parameters for offsetConsumer in KafkaIO.read() 7f17201 is described below commit 7f17201640881e7f4bbf85c1d337735ba66168d6 Author: XuMingmin AuthorDate: Wed Jan 30 01:28:14 2019 -0800 [BEAM-6285] add parameters for offsetConsumer in KafkaIO.read() --- .../java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 23 + .../beam/sdk/io/kafka/KafkaUnboundedReader.java| 56 +- .../org/apache/beam/sdk/io/kafka/KafkaIOTest.java | 43 + 3 files changed, 100 insertions(+), 22 deletions(-) diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java index 8b3218b..f27ec68 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java @@ -345,6 +345,9 @@ public class KafkaIO { abstract TimestampPolicyFactory getTimestampPolicyFactory(); +@Nullable +abstract Map getOffsetConsumerConfig(); + abstract Builder toBuilder(); @AutoValue.Builder @@ -380,6 +383,8 @@ public class KafkaIO { abstract Builder setTimestampPolicyFactory( TimestampPolicyFactory timestampPolicyFactory); + abstract Builder setOffsetConsumerConfig(Map offsetConsumerConfig); + abstract Read build(); } @@ -656,6 +661,24 @@ public class KafkaIO { return toBuilder().setCommitOffsetsInFinalizeEnabled(true).build(); } +/** + * Set additional configuration for the backend offset consumer. It may be required for a + * secured Kafka cluster, especially when you see similar WARN log message 'exception while + * fetching latest offset for partition {}. will be retried'. 
+ * + * In {@link KafkaIO#read()}, there are actually two consumers running in the backend: + * 1. the main consumer, which reads data from Kafka; + * 2. the secondary offset consumer, which is used to estimate backlog, by fetching latest + * offset; + * + * By default, offset consumer inherits the configuration from main consumer, with an + * auto-generated {@link ConsumerConfig#GROUP_ID_CONFIG}. This may not work in a secured Kafka + * which requires more configurations. + */ +public Read withOffsetConsumerConfigOverrides(Map offsetConsumerConfig) { + return toBuilder().setOffsetConsumerConfig(offsetConsumerConfig).build(); +} + /** Returns a {@link PTransform} for PCollection of {@link KV}, dropping Kafka metadata. */ public PTransform>> withoutMetadata() { return new TypedWithoutMetadata<>(this); } diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java index ee058aa..580b0bc 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java @@ -141,28 +141,7 @@ class KafkaUnboundedReader extends UnboundedReader> { consumerPollThread.submit(this::consumerPollLoop); // offsetConsumer setup : - -Object groupId = spec.getConsumerConfig().get(ConsumerConfig.GROUP_ID_CONFIG); -// override group_id and disable auto_commit so that it does not interfere with main consumer -String offsetGroupId = -String.format( -"%s_offset_consumer_%d_%s", -name, (new Random()).nextInt(Integer.MAX_VALUE), (groupId == null ?
"none" : groupId)); -Map offsetConsumerConfig = new HashMap<>(spec.getConsumerConfig()); -offsetConsumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, offsetGroupId); -offsetConsumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); -// Force read isolation level to 'read_uncommitted' for offset consumer. This consumer -// fetches latest offset for two reasons : (a) to calculate backlog (number of records -// yet to be consumed) (b) to advance watermark if the backlog is zero. The right thing to do -// for (a) is to leave this config unchanged from the main config (i.e. if there are records -// that can't be read because of uncommitted records before them, they shouldn't -// ideally count towards backlog when "read_committed" is enabled. But (b) -// requires finding out if there are any records left to be read (committed or uncommitted). -// Rather than using two separate consumers we will go with better support for (b). If we do -// hit a case where a
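The removed lines above show how the offset consumer's group id used to be derived inline: the reader name plus a random component plus the main consumer's group id. A self-contained sketch of that derivation; the standalone method and its seam for injecting `Random` are illustrative, in Beam this logic lives inline in `KafkaUnboundedReader`.

```java
import java.util.Random;

public class OffsetConsumerGroupId {
    /**
     * Derives an auto-generated group id for the backlog-estimating offset
     * consumer. The random component keeps it from colliding with (and
     * committing offsets for) the main consumer's group.
     */
    static String offsetGroupId(String name, String mainGroupId, Random random) {
        return String.format(
            "%s_offset_consumer_%d_%s",
            name,
            random.nextInt(Integer.MAX_VALUE),
            mainGroupId == null ? "none" : mainGroupId);
    }

    public static void main(String[] args) {
        System.out.println(offsetGroupId("reader-0", "my-group", new Random()));
    }
}
```

With the new `withOffsetConsumerConfigOverrides`, users on secured clusters can bypass this auto-generation and supply an explicit `group.id` together with the required security settings.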
[beam] branch master updated: [BEAM-3667] Make embedded MongoDB instance per class and test code more robust
This is an automated email from the ASF dual-hosted git repository. aromanenko pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new f1a3967 [BEAM-3667] Make embedded MongoDB instance per class and test code more robust new 3496b64 Merge pull request #7649: [BEAM-3667] Make embedded MongoDB instance per class and test code more robust f1a3967 is described below commit f1a3967f2f7253bcd573f82870e0a2c7c574fc0b Author: Ismaël Mejía AuthorDate: Mon Jan 28 09:35:54 2019 +0100 [BEAM-3667] Make embedded MongoDB instance per class and test code more robust --- .../beam/sdk/io/mongodb/MongoDBGridFSIOTest.java | 39 ++-- .../apache/beam/sdk/io/mongodb/MongoDbIOTest.java | 244 - 2 files changed, 108 insertions(+), 175 deletions(-) diff --git a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java index d8a5bf3..51beb5e 100644 --- a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java +++ b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java @@ -34,18 +34,15 @@ import de.flapdoodle.embed.mongo.config.MongodConfigBuilder; import de.flapdoodle.embed.mongo.config.Net; import de.flapdoodle.embed.mongo.config.Storage; import de.flapdoodle.embed.mongo.distribution.Version; -import de.flapdoodle.embed.process.io.file.Files; import de.flapdoodle.embed.process.runtime.Network; import java.io.BufferedReader; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.DataInputStream; -import java.io.File; import java.io.InputStream; import java.io.InputStreamReader; import java.io.OutputStream; import java.io.OutputStreamWriter; -import java.io.Serializable; import java.net.ServerSocket; import java.nio.charset.StandardCharsets; import 
java.util.ArrayList; @@ -72,44 +69,40 @@ import org.joda.time.Duration; import org.joda.time.Instant; import org.junit.AfterClass; import org.junit.BeforeClass; +import org.junit.ClassRule; import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** Test on the MongoDbGridFSIO. */ -public class MongoDBGridFSIOTest implements Serializable { +public class MongoDBGridFSIOTest { private static final Logger LOG = LoggerFactory.getLogger(MongoDBGridFSIOTest.class); - private static final String MONGODB_LOCATION = "target/mongodb"; + @ClassRule public static final TemporaryFolder MONGODB_LOCATION = new TemporaryFolder(); private static final String DATABASE = "gridfs"; - private static final transient MongodStarter mongodStarter = MongodStarter.getDefaultInstance(); - - private static transient MongodExecutable mongodExecutable; - private static transient MongodProcess mongodProcess; + private static final MongodStarter mongodStarter = MongodStarter.getDefaultInstance(); + private static MongodExecutable mongodExecutable; + private static MongodProcess mongodProcess; private static int port; - @Rule public final transient TestPipeline pipeline = TestPipeline.create(); + @Rule public final TestPipeline pipeline = TestPipeline.create(); @BeforeClass - public static void setup() throws Exception { + public static void start() throws Exception { try (ServerSocket serverSocket = new ServerSocket(0)) { port = serverSocket.getLocalPort(); } -LOG.info("Starting MongoDB embedded instance on {}", port); -try { - Files.forceDelete(new File(MONGODB_LOCATION)); -} catch (Exception e) { -} -new File(MONGODB_LOCATION).mkdirs(); +LOG.info("Starting MongoDB embedded instance on {}", port); IMongodConfig mongodConfig = new MongodConfigBuilder() .version(Version.Main.PRODUCTION) .configServer(false) -.replication(new Storage(MONGODB_LOCATION, null, 0)) +.replication(new 
Storage(MONGODB_LOCATION.getRoot().getPath(), null, 0)) .net(new Net("localhost", port, Network.localhostIsIPv6())) .cmdOptions( new MongoCmdOptionsBuilder() @@ -117,6 +110,7 @@ public class MongoDBGridFSIOTest implements Serializable { .useNoPrealloc(true) .useSmallFiles(true) .useNoJournal(true) +.verbose(false) .build()) .build(); mongodExecutable = mongodStarter.prepare(mongodConfig); @@ -180,15 +174,14 @@ public class MongoDBGridFSIOTest implements Serializable { } @AfterClass - public static void stop() throws Exception { + public static void stop() { LOG.info(&qu
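The test setup above picks a free port for the embedded MongoDB instance by binding a `ServerSocket` to port 0 and reading back the OS-assigned local port. That trick is plain JDK and works standalone:

```java
import java.io.IOException;
import java.net.ServerSocket;

public class FreePort {
    /**
     * Asks the OS for an unused TCP port, as the embedded-MongoDB test setup
     * does. Binding to port 0 makes the kernel choose an ephemeral port; the
     * try-with-resources releases it immediately so the test can reuse it.
     */
    static int freePort() throws IOException {
        try (ServerSocket serverSocket = new ServerSocket(0)) {
            return serverSocket.getLocalPort();
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("Embedded instance could start on port " + freePort());
    }
}
```

There is a small inherent race (another process could grab the port between close and reuse), which is generally acceptable in test setup code like this.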
[beam] branch spark-runner_structured-streaming updated: Fixed Javadoc error
This is an automated email from the ASF dual-hosted git repository. aromanenko pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/spark-runner_structured-streaming by this push: new f0f0138 Fixed Javadoc error f0f0138 is described below commit f0f01389dee96e93b0f9b8a86a78551b9d6bb828 Author: Alexey Romanenko AuthorDate: Fri Jan 25 11:16:37 2019 +0100 Fixed Javadoc error --- .../runners/spark/structuredstreaming/translation/EncoderHelpers.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/EncoderHelpers.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/EncoderHelpers.java index 90797c1..2912e3c 100644 --- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/EncoderHelpers.java +++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/EncoderHelpers.java @@ -31,7 +31,7 @@ public class EncoderHelpers { // the type inference mechanism to infer Encoder> to get back the type checking /** - * Get a bytes {@link Encoder} for {@link WindowedValue}. Bytes serialisation is issued by Kryo + * Get a bytes {@link Encoder} for {@link WindowedValue}. Bytes serialisation is issued by Kryo */ @SuppressWarnings("unchecked") public static Encoder windowedValueEncoder() {
[beam] branch spark-runner_structured-streaming updated: Rename SparkSideInputReader class
This is an automated email from the ASF dual-hosted git repository. aromanenko pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/spark-runner_structured-streaming by this push: new a98c17d Rename SparkSideInputReader class a98c17d is described below commit a98c17d34d1b9103c6faa03eff844530b13aa913 Author: Alexey Romanenko AuthorDate: Fri Jan 25 17:02:50 2019 +0100 Rename SparkSideInputReader class --- .../spark/structuredstreaming/translation/batch/DoFnFunction.java | 4 ++-- .../functions/{SparkSideInputReader.java => NoOpSideInputReader.java} | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnFunction.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnFunction.java index 6067045..8ce98a8 100644 --- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnFunction.java +++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnFunction.java @@ -30,7 +30,7 @@ import org.apache.beam.runners.core.DoFnRunner; import org.apache.beam.runners.core.DoFnRunners; import org.apache.beam.runners.core.construction.SerializablePipelineOptions; import org.apache.beam.runners.spark.structuredstreaming.translation.batch.functions.NoOpStepContext; -import org.apache.beam.runners.spark.structuredstreaming.translation.batch.functions.SparkSideInputReader; +import org.apache.beam.runners.spark.structuredstreaming.translation.batch.functions.NoOpSideInputReader; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.DoFn; @@ -99,7 +99,7 @@ public class 
DoFnFunction DoFnRunners.simpleRunner( serializedOptions.get(), doFn, -new SparkSideInputReader(sideInputs), +new NoOpSideInputReader(sideInputs), outputManager, mainOutputTag, additionalOutputTags, diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/functions/SparkSideInputReader.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/functions/NoOpSideInputReader.java similarity index 91% rename from runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/functions/SparkSideInputReader.java rename to runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/functions/NoOpSideInputReader.java index 91e5385..eca9d95 100644 --- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/functions/SparkSideInputReader.java +++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/functions/NoOpSideInputReader.java @@ -31,10 +31,10 @@ import org.apache.beam.sdk.values.WindowingStrategy; * * A {@link SideInputReader} for the Spark Batch Runner. */ -public class SparkSideInputReader implements SideInputReader { +public class NoOpSideInputReader implements SideInputReader { private final Map, WindowingStrategy> sideInputs; - public SparkSideInputReader(Map, WindowingStrategy> indexByView) { + public NoOpSideInputReader(Map, WindowingStrategy> indexByView) { sideInputs = new HashMap<>(); }
[beam] branch spark-runner_structured-streaming updated: Rename pruneOutput() to pruneOutputFilteredByTag()
This is an automated email from the ASF dual-hosted git repository. aromanenko pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/spark-runner_structured-streaming by this push: new c077daa Rename pruneOutput() to pruneOutputFilteredByTag() c077daa is described below commit c077daa6bd7b91c86a6c2de2e875cb9a1b9d1279 Author: Alexey Romanenko AuthorDate: Fri Jan 25 17:31:44 2019 +0100 Rename pruneOutput() to pruneOutputFilteredByTag() --- .../structuredstreaming/translation/batch/ParDoTranslatorBatch.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java index 93f9da0..a984615 100644 --- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java +++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java @@ -147,7 +147,7 @@ class ParDoTranslatorBatch inputDataSet.mapPartitions(doFnWrapper, EncoderHelpers.tuple2Encoder()); for (Map.Entry, PValue> output : outputs.entrySet()) { - pruneOutput(context, allOutputsDataset, output); + pruneOutputFilteredByTag(context, allOutputsDataset, output); } } @@ -182,7 +182,7 @@ class ParDoTranslatorBatch return doFn; } - private void pruneOutput( + private void pruneOutputFilteredByTag( TranslationContext context, Dataset, WindowedValue>> tmpDataset, Map.Entry, PValue> output) {
[beam] branch master updated: Update design-documents.md
This is an automated email from the ASF dual-hosted git repository. aromanenko pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 948b862 Update design-documents.md new f6c47df Merge pull request #7694: Update design-documents.md with link to Get Metrics API 948b862 is described below commit 948b86299f0d0fb850f51a0a9d11003e6c50c283 Author: Alex Amato AuthorDate: Thu Jan 31 13:47:40 2019 -0800 Update design-documents.md --- website/src/contribute/design-documents.md | 1 + 1 file changed, 1 insertion(+) diff --git a/website/src/contribute/design-documents.md b/website/src/contribute/design-documents.md index 1cc0c81..c483f16 100644 --- a/website/src/contribute/design-documents.md +++ b/website/src/contribute/design-documents.md @@ -67,6 +67,7 @@ This is a collection of documents that may or may not be up to date. - Exactly-once Kafka sink [[doc](https://lists.apache.org/thread.html/fb394e576e6e858205307b033c5a5c6cc3923a17606814a54036c570@%3Cdev.beam.apache.org%3E)] ### Metrics +- Get Metrics API: Metric Extraction via proto RPC API. [[doc](https://s.apache.org/get-metrics-api)] - Metrics API [[doc](http://s.apache.org/beam-metrics-api)] - I/O Metrics [[doc](https://s.apache.org/standard-io-metrics)] - Metrics extraction independent from runners / execution engines [[doc](https://s.apache.org/runner_independent_metrics_extraction)]
[beam] branch master updated: Update design-your-pipeline.md
This is an automated email from the ASF dual-hosted git repository. aromanenko pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 2af3bee Update design-your-pipeline.md new 2f997b7 Merge pull request #7436: Update design-your-pipeline.md 2af3bee is described below commit 2af3bee72b21af50a40266c88b6c7a9fe5023db5 Author: J Ross Thomson <39315853+jrossthom...@users.noreply.github.com> AuthorDate: Tue Jan 8 08:21:08 2019 -0500 Update design-your-pipeline.md Seems like this should be the output of the KeyedPCollectionTuple. --- website/src/documentation/pipelines/design-your-pipeline.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/website/src/documentation/pipelines/design-your-pipeline.md b/website/src/documentation/pipelines/design-your-pipeline.md index 02e5fb1..c04df7f 100644 --- a/website/src/documentation/pipelines/design-your-pipeline.md +++ b/website/src/documentation/pipelines/design-your-pipeline.md @@ -214,7 +214,7 @@ PCollection> joinedCollection = .and(orderTag, userOrder) .apply(CoGroupByKey.create()); -coGbkResultCollection.apply(...); +joinedCollection.apply(...); ``` ## What's next
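[Editor's note] The doc fix above corrects the snippet so the pipeline continues from `joinedCollection` — the result of applying `CoGroupByKey` to the `KeyedPCollectionTuple` — rather than from a stale variable name. The sketch below models what that join produces, in plain Java with hypothetical types; Beam's real API uses `KeyedPCollectionTuple`, `CoGroupByKey`, and `CoGbkResult`.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

public class CoGroupByKeyDemo {

    // Per-key join result: the values each input contributed for that key
    // (a much-simplified stand-in for Beam's CoGbkResult).
    record JoinedRow(List<String> addresses, List<String> orders) {}

    // Co-group two keyed collections: every key from either side appears once.
    static Map<String, JoinedRow> coGroupByKey(
            Map<String, List<String>> userAddress, Map<String, List<String>> userOrder) {
        Set<String> keys = new TreeSet<>(userAddress.keySet());
        keys.addAll(userOrder.keySet());
        Map<String, JoinedRow> joined = new TreeMap<>();
        for (String key : keys) {
            joined.put(key, new JoinedRow(
                    userAddress.getOrDefault(key, List.of()),
                    userOrder.getOrDefault(key, List.of())));
        }
        return joined;
    }

    public static void main(String[] args) {
        Map<String, List<String>> addresses = Map.of("alice", List.of("1 Main St"));
        Map<String, List<String>> orders =
                Map.of("alice", List.of("order-7"), "bob", List.of("order-9"));
        // The joined collection is what downstream transforms consume — the point
        // of the one-line documentation fix above.
        System.out.println(coGroupByKey(addresses, orders));
    }
}
```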
[beam] branch spark-runner_structured-streaming updated: Don't use deprecated sideInput.getWindowingStrategyInternal()
This is an automated email from the ASF dual-hosted git repository. aromanenko pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/spark-runner_structured-streaming by this push: new e65815e Don't use deprecated sideInput.getWindowingStrategyInternal() e65815e is described below commit e65815e97b5056a839b0d9e2cce5543a7231d158 Author: Alexey Romanenko AuthorDate: Fri Feb 1 18:50:00 2019 +0100 Don't use deprecated sideInput.getWindowingStrategyInternal() --- .../translation/batch/ParDoTranslatorBatch.java| 22 +- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java index a984615..fbb6649 100644 --- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java +++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java @@ -61,6 +61,7 @@ class ParDoTranslatorBatch public void translateTransform( PTransform, PCollectionTuple> transform, TranslationContext context) { +// Check for not-supported advanced features // TODO: add support of Splittable DoFn DoFn doFn = getDoFn(context); checkState( @@ -74,10 +75,16 @@ class ParDoTranslatorBatch signature.stateDeclarations().size() > 0 || signature.timerDeclarations().size() > 0; checkState(!stateful, "States and timers are not supported for the moment."); +// TODO: add support of SideInputs +List> sideInputs = getSideInputs(context); +final boolean hasSideInputs = sideInputs != null && sideInputs.size() > 0; +checkState(!hasSideInputs, "SideInputs are 
not supported for the moment."); + + +// Init main variables Dataset> inputDataSet = context.getDataset(context.getInput()); Map, PValue> outputs = context.getOutputs(); TupleTag mainOutputTag = getTupleTag(context); - Map, Integer> outputTags = Maps.newHashMap(); outputTags.put(mainOutputTag, 0); @@ -98,7 +105,7 @@ class ParDoTranslatorBatch WindowingStrategy windowingStrategy = null; // collect all output Coders and create a UnionCoder for our tagged outputs -List> outputCoders = Lists.newArrayList(); +//List> outputCoders = Lists.newArrayList(); for (TupleTag tag : indexMap.values()) { PValue taggedValue = outputs.get(tag); checkState( @@ -107,7 +114,7 @@ class ParDoTranslatorBatch taggedValue, taggedValue.getClass().getSimpleName()); PCollection coll = (PCollection) taggedValue; - outputCoders.add(coll.getCoder()); +// outputCoders.add(coll.getCoder()); windowingStrategy = coll.getWindowingStrategy(); } @@ -115,18 +122,15 @@ class ParDoTranslatorBatch throw new IllegalStateException("No outputs defined."); } -UnionCoder unionCoder = UnionCoder.of(outputCoders); +//UnionCoder unionCoder = UnionCoder.of(outputCoders); + -List> sideInputs = getSideInputs(context); -final boolean hasSideInputs = sideInputs != null && sideInputs.size() > 0; -// TODO: add support of SideInputs -checkState(!hasSideInputs, "SideInputs are not supported for the moment."); // construct a map from side input to WindowingStrategy so that // the DoFn runner can map main-input windows to side input windows Map, WindowingStrategy> sideInputStrategies = new HashMap<>(); for (PCollectionView sideInput : sideInputs) { - sideInputStrategies.put(sideInput, sideInput.getWindowingStrategyInternal()); + sideInputStrategies.put(sideInput, sideInput.getPCollection().getWindowingStrategy()); } Map, Coder> outputCoderMap = context.getOutputCoders();
[beam] branch master updated: [BEAM-6520] Deprecate MongoDb `withKeepAlive`
This is an automated email from the ASF dual-hosted git repository. aromanenko pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new f1fe880 [BEAM-6520] Deprecate MongoDb `withKeepAlive` new a37615a Merge pull request #7648: [BEAM-6520] Deprecate MongoDb `withKeepAlive` f1fe880 is described below commit f1fe880c749810eb1dc2ae4c9c71e33dd23aa793 Author: Ismaël Mejía AuthorDate: Mon Jan 28 11:50:30 2019 +0100 [BEAM-6520] Deprecate MongoDb `withKeepAlive` This is done because it is also deprecated in the Mongo driver. --- .../org/apache/beam/sdk/io/mongodb/MongoDbIO.java | 57 -- 1 file changed, 41 insertions(+), 16 deletions(-) diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java index 4f9d33f..e9694f8 100644 --- a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java +++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java @@ -133,6 +133,10 @@ public class MongoDbIO { @Nullable abstract String uri(); +/** + * @deprecated This is deprecated in the MongoDB API and will be removed in a future version. + */ +@Deprecated abstract boolean keepAlive(); abstract int maxConnectionIdleTime(); @@ -162,7 +166,10 @@ public class MongoDbIO { @AutoValue.Builder abstract static class Builder { abstract Builder setUri(String uri); - + /** + * @deprecated This is deprecated in the MongoDB API and will be removed in a future version. + */ + @Deprecated abstract Builder setKeepAlive(boolean keepAlive); abstract Builder setMaxConnectionIdleTime(int maxConnectionIdleTime); @@ -226,7 +233,13 @@ public class MongoDbIO { return builder().setUri(uri).build(); } -/** Sets whether socket keep alive is enabled. */ +/** + * Sets whether socket keep alive is enabled. 
+ * + * @deprecated configuring keep-alive has been deprecated in the MongoDB Java API. It now + * defaults to true and disabling it is not recommended. + */ +@Deprecated public Read withKeepAlive(boolean keepAlive) { return builder().setKeepAlive(keepAlive).build(); } @@ -246,7 +259,7 @@ public class MongoDbIO { return builder().setSslInvalidHostNameAllowed(invalidHostNameAllowed).build(); } -/** Enable ignoreSSLCertificate for ssl for connection (allow for self signed ceritificates). */ +/** Enable ignoreSSLCertificate for ssl for connection (allow for self signed certificates). */ public Read withIgnoreSSLCertificate(boolean ignoreSSLCertificate) { return builder().setIgnoreSSLCertificate(ignoreSSLCertificate).build(); } @@ -329,7 +342,7 @@ public class MongoDbIO { /** A MongoDB {@link BoundedSource} reading {@link Document} from a given instance. */ @VisibleForTesting static class BoundedMongoDbSource extends BoundedSource { -private Read spec; +private final Read spec; private BoundedMongoDbSource(Read spec) { this.spec = spec; @@ -403,7 +416,7 @@ public class MongoDbIO { // the desired batch size is small, using default chunk size of 1MB if (desiredBundleSizeBytes < 1024L * 1024L) { - desiredBundleSizeBytes = 1L * 1024L * 1024L; + desiredBundleSizeBytes = 1024L * 1024L; } // now we have the batch size (provided by user or provided by the runner) @@ -522,7 +535,7 @@ public class MongoDbIO { private MongoCursor cursor; private Document current; -public BoundedMongoDbReader(BoundedMongoDbSource source) { +BoundedMongoDbReader(BoundedMongoDbSource source) { this.source = source; } @@ -604,7 +617,10 @@ public class MongoDbIO { @Nullable abstract String uri(); - +/** + * @deprecated This is deprecated in the MongoDB API and will be removed in a future version. 
+ */ +@Deprecated abstract boolean keepAlive(); abstract int maxConnectionIdleTime(); @@ -630,7 +646,10 @@ public class MongoDbIO { @AutoValue.Builder abstract static class Builder { abstract Builder setUri(String uri); - + /** + * @deprecated This is deprecated in the MongoDB API and will be removed in a future version. + */ + @Deprecated abstract Builder setKeepAlive(boolean keepAlive); abstract Builder setMaxConnectionIdleTime(int maxConnectionIdleTime); @@ -692,7 +711,13 @@ public class MongoDbIO { return builder().setUri(uri).build(); } -/** Sets whether socket keep alive is enabled. */ +/** + * Sets whether socket keep alive is enabled. + * + * @deprecated configuring keep-alive has been deprecated in the Mong
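[Editor's note] The MongoDbIO change keeps `withKeepAlive` working but marks it deprecated at every layer — the abstract getter, the builder setter, and the public fluent setter — each with both the `@Deprecated` annotation and a `@deprecated` javadoc tag explaining why. A minimal sketch of the same pattern on a hypothetical fluent config class (not the actual MongoDbIO code):

```java
public class DeprecationDemo {

    // Hypothetical immutable config mirroring the MongoDbIO pattern: the option
    // still functions, but callers are warned at compile time and in javadoc.
    static final class Read {
        private final boolean keepAlive;

        private Read(boolean keepAlive) { this.keepAlive = keepAlive; }

        static Read create() { return new Read(true); } // upstream default is now true

        /**
         * Sets whether socket keep-alive is enabled.
         *
         * @deprecated configuring keep-alive is deprecated upstream; it defaults
         *     to true and disabling it is not recommended.
         */
        @Deprecated
        Read withKeepAlive(boolean keepAlive) { return new Read(keepAlive); }

        boolean keepAlive() { return keepAlive; }
    }

    @SuppressWarnings("deprecation")
    public static void main(String[] args) {
        // Callers get a deprecation warning but identical behavior.
        System.out.println(Read.create().withKeepAlive(false).keepAlive()); // false
    }
}
```

Deprecating rather than deleting gives users a release cycle to migrate, which is why the commit touches javadoc and annotations instead of removing the methods.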
[beam] branch master updated: [BEAM-4890] Update cassandra-all dependency to version 3.11.3
This is an automated email from the ASF dual-hosted git repository. aromanenko pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 24c0d0d [BEAM-4890] Update cassandra-all dependency to version 3.11.3 new 5275f5f Merge pull request #765: [BEAM-4890] Update cassandra-all dependency to version 3.11.3 24c0d0d is described below commit 24c0d0db7ad73b9613d593003484aac1f29ba81a Author: Ismaël Mejía AuthorDate: Tue Jan 29 10:13:43 2019 +0100 [BEAM-4890] Update cassandra-all dependency to version 3.11.3 --- sdks/java/io/hadoop-format/build.gradle | 2 +- sdks/java/io/hadoop-input-format/build.gradle | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/java/io/hadoop-format/build.gradle b/sdks/java/io/hadoop-format/build.gradle index 90bb9e7..8db1b5d 100644 --- a/sdks/java/io/hadoop-format/build.gradle +++ b/sdks/java/io/hadoop-format/build.gradle @@ -77,7 +77,7 @@ dependencies { exclude group: "org.apache.storm", module: "storm-core" } testCompile "com.datastax.cassandra:cassandra-driver-mapping:$cassandra_driver" - testCompile "org.apache.cassandra:cassandra-all:3.9" + testCompile "org.apache.cassandra:cassandra-all:3.11.3" testCompile "com.datastax.cassandra:cassandra-driver-core:$cassandra_driver" testCompile library.java.postgres testCompile "org.apache.logging.log4j:log4j-core:$log4j_version" diff --git a/sdks/java/io/hadoop-input-format/build.gradle b/sdks/java/io/hadoop-input-format/build.gradle index 1b0c7cd..82ffe04 100644 --- a/sdks/java/io/hadoop-input-format/build.gradle +++ b/sdks/java/io/hadoop-input-format/build.gradle @@ -67,7 +67,7 @@ dependencies { exclude group: "org.apache.storm", module: "storm-core" } testCompile "com.datastax.cassandra:cassandra-driver-mapping:$cassandra_driver" - testCompile "org.apache.cassandra:cassandra-all:3.9" + testCompile "org.apache.cassandra:cassandra-all:3.11.3" testCompile 
"com.datastax.cassandra:cassandra-driver-core:$cassandra_driver" testCompile project(path: ":beam-sdks-java-io-jdbc", configuration: "shadow") testCompile library.java.postgres
[beam] branch spark-runner_structured-streaming updated: Simplify logic of ParDo translator
This is an automated email from the ASF dual-hosted git repository. aromanenko pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/spark-runner_structured-streaming by this push: new 16cf3c2 Simplify logic of ParDo translator 16cf3c2 is described below commit 16cf3c2ca6e5a82f1959ce2976a330badd6e6c44 Author: Alexey Romanenko AuthorDate: Mon Feb 4 11:22:10 2019 +0100 Simplify logic of ParDo translator --- .../translation/batch/DoFnFunction.java| 9 ++-- .../translation/batch/ParDoTranslatorBatch.java| 59 -- 2 files changed, 13 insertions(+), 55 deletions(-) diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnFunction.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnFunction.java index 8ce98a8..2989d0d 100644 --- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnFunction.java +++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnFunction.java @@ -20,7 +20,6 @@ package org.apache.beam.runners.spark.structuredstreaming.translation.batch; import com.google.common.base.Function; import com.google.common.collect.Iterators; import com.google.common.collect.LinkedListMultimap; -import com.google.common.collect.Lists; import com.google.common.collect.Multimap; import java.util.Collections; import java.util.Iterator; @@ -60,7 +59,7 @@ public class DoFnFunction private final WindowingStrategy windowingStrategy; - private final Map, Integer> outputMap; + private final List> additionalOutputTags; private final TupleTag mainOutputTag; private final Coder inputCoder; private final Map, Coder> outputCoderMap; @@ -72,7 +71,7 @@ public class DoFnFunction 
WindowingStrategy windowingStrategy, Map, WindowingStrategy> sideInputs, PipelineOptions options, - Map, Integer> outputMap, + List> additionalOutputTags, TupleTag mainOutputTag, Coder inputCoder, Map, Coder> outputCoderMap) { @@ -81,7 +80,7 @@ public class DoFnFunction this.sideInputs = sideInputs; this.serializedOptions = new SerializablePipelineOptions(options); this.windowingStrategy = windowingStrategy; -this.outputMap = outputMap; +this.additionalOutputTags = additionalOutputTags; this.mainOutputTag = mainOutputTag; this.inputCoder = inputCoder; this.outputCoderMap = outputCoderMap; @@ -93,8 +92,6 @@ public class DoFnFunction DoFnOutputManager outputManager = new DoFnOutputManager(); -List> additionalOutputTags = Lists.newArrayList(outputMap.keySet()); - DoFnRunner doFnRunner = DoFnRunners.simpleRunner( serializedOptions.get(), diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java index fbb6649..5c9cb16 100644 --- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java +++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java @@ -20,7 +20,6 @@ package org.apache.beam.runners.spark.structuredstreaming.translation.batch; import static com.google.common.base.Preconditions.checkState; import com.google.common.collect.Lists; -import com.google.common.collect.Maps; import java.io.IOException; import java.util.HashMap; import java.util.List; @@ -32,7 +31,6 @@ import org.apache.beam.runners.spark.structuredstreaming.translation.Translation import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.transforms.DoFn; import 
org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.join.UnionCoder; import org.apache.beam.sdk.transforms.reflect.DoFnSignature; import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; import org.apache.beam.sdk.util.WindowedValue; @@ -61,7 +59,7 @@ class ParDoTranslatorBatch public void translateTransform( PTransform, PCollectionTuple> transform, TranslationContext context) { -// Check for not-supported advanced features +// Check for not supported advanced features // TODO: add support of Splittable DoFn DoFn doFn = getDoFn(context); checkState( @@ -80,51 +78,13 @@ class ParDoTranslatorBatch final boolean hasSideInputs = sideInputs != null && sideInputs.size()
[beam] branch master updated: [BEAM-7041] Let the user control if he wants to wrap the provided DataSource as a poolable one or not
This is an automated email from the ASF dual-hosted git repository. aromanenko pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new bc9aa73 [BEAM-7041] Let the user control if he wants to wrap the provided DataSource as a poolable one or not new 702df1b Merge pull request #8257: [BEAM-7041] Let the user control if he wants to wrap the provided DataSource as a poolable one or not bc9aa73 is described below commit bc9aa730009909d9c632fce669bff5ce25d9d81a Author: Jean-Baptiste Onofré AuthorDate: Tue Apr 9 17:15:21 2019 +0200 [BEAM-7041] Let the user control if he wants to wrap the provided DataSource as a poolable one or not --- .../java/org/apache/beam/sdk/io/jdbc/JdbcIO.java | 45 ++ .../org/apache/beam/sdk/io/jdbc/JdbcIOTest.java| 14 +++ 2 files changed, 44 insertions(+), 15 deletions(-) diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java index e6f2699..8c824a8 100644 --- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java @@ -242,6 +242,8 @@ public class JdbcIO { @Nullable abstract DataSource getDataSource(); +abstract boolean isPoolingDataSource(); + abstract Builder builder(); @AutoValue.Builder @@ -258,14 +260,22 @@ public class JdbcIO { abstract Builder setDataSource(DataSource dataSource); + abstract Builder setPoolingDataSource(boolean poolingDataSource); + abstract DataSourceConfiguration build(); } public static DataSourceConfiguration create(DataSource dataSource) { + return create(dataSource, true); +} + +public static DataSourceConfiguration create( +DataSource dataSource, boolean isPoolingDataSource) { checkArgument(dataSource != null, "dataSource can not be null"); checkArgument(dataSource instanceof Serializable, "dataSource must be 
Serializable"); return new AutoValue_JdbcIO_DataSourceConfiguration.Builder() .setDataSource(dataSource) + .setPoolingDataSource(isPoolingDataSource) .build(); } @@ -284,6 +294,7 @@ public class JdbcIO { return new AutoValue_JdbcIO_DataSourceConfiguration.Builder() .setDriverClassName(driverClassName) .setUrl(url) + .setPoolingDataSource(true) .build(); } @@ -356,21 +367,25 @@ public class JdbcIO { current = basicDataSource; } - // wrapping the datasource as a pooling datasource - DataSourceConnectionFactory connectionFactory = new DataSourceConnectionFactory(current); - PoolableConnectionFactory poolableConnectionFactory = - new PoolableConnectionFactory(connectionFactory, null); - GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig(); - poolConfig.setMaxTotal(1); - poolConfig.setMinIdle(0); - poolConfig.setMinEvictableIdleTimeMillis(1); - poolConfig.setSoftMinEvictableIdleTimeMillis(3); - GenericObjectPool connectionPool = - new GenericObjectPool(poolableConnectionFactory, poolConfig); - poolableConnectionFactory.setPool(connectionPool); - poolableConnectionFactory.setDefaultAutoCommit(false); - poolableConnectionFactory.setDefaultReadOnly(false); - return new PoolingDataSource(connectionPool); + if (isPoolingDataSource()) { +// wrapping the datasource as a pooling datasource +DataSourceConnectionFactory connectionFactory = new DataSourceConnectionFactory(current); +PoolableConnectionFactory poolableConnectionFactory = +new PoolableConnectionFactory(connectionFactory, null); +GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig(); +poolConfig.setMaxTotal(1); +poolConfig.setMinIdle(0); +poolConfig.setMinEvictableIdleTimeMillis(1); +poolConfig.setSoftMinEvictableIdleTimeMillis(3); +GenericObjectPool connectionPool = +new GenericObjectPool(poolableConnectionFactory, poolConfig); +poolableConnectionFactory.setPool(connectionPool); +poolableConnectionFactory.setDefaultAutoCommit(false); +poolableConnectionFactory.setDefaultReadOnly(false); 
+return new PoolingDataSource(connectionPool); + } else { +return current; + } } } diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java index 0e9127a..3e45363 100644 --- a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java +++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest
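[Editor's note] The JdbcIO change turns the previously unconditional pooling wrap into an opt-out: `DataSourceConfiguration.create(dataSource, false)` skips the dbcp2 wrapping entirely. The shape of that change — conditionally decorating a resource — is sketched below without the commons-dbcp2 classes; `DataSource` and `PoolingDataSource` here are simplified stand-ins, not `javax.sql` or dbcp2 types.

```java
public class PoolingToggleDemo {

    interface DataSource { String describe(); }

    // Hypothetical decorator standing in for dbcp2's PoolingDataSource.
    static final class PoolingDataSource implements DataSource {
        private final DataSource delegate;
        PoolingDataSource(DataSource delegate) { this.delegate = delegate; }
        @Override public String describe() { return "pooled(" + delegate.describe() + ")"; }
    }

    // Mirrors the new buildDatasource() logic: wrap only when pooling is requested,
    // otherwise hand back the user's DataSource untouched.
    static DataSource buildDataSource(DataSource current, boolean isPoolingDataSource) {
        return isPoolingDataSource ? new PoolingDataSource(current) : current;
    }

    public static void main(String[] args) {
        DataSource raw = () -> "raw";
        System.out.println(buildDataSource(raw, true).describe());  // pooled(raw)
        System.out.println(buildDataSource(raw, false).describe()); // raw
    }
}
```

The opt-out matters when the provided `DataSource` is already pooled (e.g. by an application container): wrapping it again would stack two pools and defeat the outer one's tuning.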
[beam] branch master updated: [BEAM-7056] Include partition keys in beam schema resolution
This is an automated email from the ASF dual-hosted git repository. aromanenko pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new f3b0ac7 [BEAM-7056] Include partition keys in beam schema resolution new 020a820 Merge pull request #8276: [BEAM-7056] Include partition keys in beam schema resolution f3b0ac7 is described below commit f3b0ac7caddd5935a1e2bd92522287ea17cb6ad0 Author: Jozef Vilcek AuthorDate: Thu Apr 11 11:44:07 2019 +0200 [BEAM-7056] Include partition keys in beam schema resolution --- .../beam/sdk/io/hcatalog/HCatalogBeamSchema.java | 7 ++- .../sdk/io/hcatalog/HCatalogBeamSchemaTest.java| 24 ++ 2 files changed, 30 insertions(+), 1 deletion(-) diff --git a/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogBeamSchema.java b/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogBeamSchema.java index 1b1705c..b43bc9e 100644 --- a/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogBeamSchema.java +++ b/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogBeamSchema.java @@ -18,13 +18,16 @@ package org.apache.beam.sdk.io.hcatalog; import com.sun.istack.Nullable; +import java.util.List; import java.util.Map; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Optional; +import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; /** @@ -75,7 +78,9 @@ public class HCatalogBeamSchema { public Optional getTableSchema(String db, String table) { try { 
org.apache.hadoop.hive.metastore.api.Table metastoreTable = metastore.getTable(db, table); - Schema schema = SchemaUtils.toBeamSchema(metastoreTable.getSd().getCols()); + List fields = Lists.newArrayList(metastoreTable.getSd().getCols()); + fields.addAll(metastoreTable.getPartitionKeys()); + Schema schema = SchemaUtils.toBeamSchema(fields); return Optional.of(schema); } catch (NoSuchObjectException e) { return Optional.absent(); diff --git a/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogBeamSchemaTest.java b/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogBeamSchemaTest.java index f993740..8342fba 100644 --- a/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogBeamSchemaTest.java +++ b/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogBeamSchemaTest.java @@ -38,6 +38,8 @@ import org.junit.rules.TemporaryFolder; /** Unit tests for {@link HCatalogBeamSchema}. */ public class HCatalogBeamSchemaTest implements Serializable { + + private static final String TEST_TABLE_PARTITIONED = TEST_TABLE + "_partitioned"; @ClassRule public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder(); private static EmbeddedMetastoreService service; @@ -87,6 +89,22 @@ public class HCatalogBeamSchemaTest implements Serializable { } @Test + public void testGetTableSchemaForPartitionedTable() throws Exception { +HCatalogBeamSchema hcatSchema = HCatalogBeamSchema.create(service.getHiveConfAsMap()); +Schema schema = hcatSchema.getTableSchema(TEST_DATABASE, TEST_TABLE_PARTITIONED).get(); + +Schema expectedSchema = +Schema.builder() +.addNullableField("mycol1", Schema.FieldType.STRING) +.addNullableField("mycol2", Schema.FieldType.INT32) +.addNullableField("part1", Schema.FieldType.STRING) +.addNullableField("part2", Schema.FieldType.INT32) +.build(); + +assertEquals(expectedSchema, schema); + } + + @Test public void testDoesntHaveTable() throws Exception { HCatalogBeamSchema 
hcatSchema = HCatalogBeamSchema.create(service.getHiveConfAsMap()); assertFalse(hcatSchema.getTableSchema(TEST_DATABASE, "non-existent-table").isPresent()); @@ -99,6 +117,12 @@ public class HCatalogBeamSchemaTest implements Serializable { private void reCreateTestTable() throws CommandNeedRetryException { service.executeQuery("drop table " + TEST_TABLE); +service.executeQuery("drop table " + TEST_TABLE_PARTITIONED); service.executeQuery("create table " + TEST_TABLE + "(mycol1 string, mycol2 int)"); +service.executeQuery( +"create table " ++ TEST_TABLE_PARTITIONED ++ "(mycol1 string, mycol2 int) " ++ "partitioned by (part1 string, part2 int)"); } }
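[Editor's note] The fix above is a two-line concatenation: the Beam schema is built from the table's storage-descriptor columns *followed by* its partition keys, because Hive stores partition columns outside `getSd().getCols()`. A plain-Java sketch of that field merge; `FieldSchema` here is a stand-in for Hive's `org.apache.hadoop.hive.metastore.api.FieldSchema`.

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionKeysDemo {

    // Stand-in for Hive metastore's FieldSchema (name + type).
    record FieldSchema(String name, String type) {}

    // Mirrors the fix: schema fields = data columns, then partition keys.
    static List<FieldSchema> schemaFields(List<FieldSchema> cols, List<FieldSchema> partitionKeys) {
        List<FieldSchema> fields = new ArrayList<>(cols);
        fields.addAll(partitionKeys);
        return fields;
    }

    public static void main(String[] args) {
        List<FieldSchema> cols = List.of(
                new FieldSchema("mycol1", "string"), new FieldSchema("mycol2", "int"));
        List<FieldSchema> parts = List.of(
                new FieldSchema("part1", "string"), new FieldSchema("part2", "int"));
        // Before the fix only the first two fields were resolved; partitioned
        // tables lost their partition columns from the Beam schema.
        System.out.println(schemaFields(cols, parts).size()); // 4
    }
}
```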
[beam] branch master updated (0e9ab8d -> 8db095d)
This is an automated email from the ASF dual-hosted git repository. aromanenko pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git. from 0e9ab8d Merge pull request #8304: [BEAM-7075] Create Redis embedded server on @BeforeClass and simplify tests new 3602c27 [BEAM-7076] Update Spark runner to use spark version 2.4.1 new 6f3d85f [BEAM-7076] Multiple static analysis fixes on Spark runner new 8db095d Merge pull request #8305: [BEAM-7076] Update Spark runner to use spark version 2.4.1 The 20982 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../org/apache/beam/gradle/BeamModulePlugin.groovy | 2 +- .../runners/spark/SparkNativePipelineVisitor.java | 5 ++-- .../beam/runners/spark/SparkPipelineResult.java| 6 ++--- .../org/apache/beam/runners/spark/SparkRunner.java | 6 ++--- .../runners/spark/SparkTransformOverrides.java | 2 +- .../apache/beam/runners/spark/TestSparkRunner.java | 2 +- .../runners/spark/aggregators/AggAccumParam.java | 2 +- .../spark/coders/StatelessJavaSerializer.java | 2 +- .../beam/runners/spark/examples/WordCount.java | 2 +- .../apache/beam/runners/spark/io/SourceRDD.java| 2 +- .../runners/spark/io/SparkUnboundedSource.java | 2 +- .../metrics/SparkMetricsContainerStepMap.java | 2 +- .../SparkGroupAlsoByWindowViaWindowSet.java| 2 +- .../spark/stateful/SparkStateInternals.java| 2 +- .../runners/spark/stateful/StateSpecFunctions.java | 2 +- .../spark/translation/EvaluationContext.java | 7 +++--- .../spark/translation/MultiDoFnFunction.java | 3 +-- .../ReifyTimestampsAndWindowsFunction.java | 2 +- .../spark/translation/SparkAbstractCombineFn.java | 18 +++--- .../spark/translation/SparkAssignWindowFn.java | 2 +- .../translation/SparkExecutableStageFunction.java | 2 +- .../spark/translation/SparkGlobalCombineFn.java| 3 +-- 
.../SparkGroupAlsoByWindowViaOutputBufferFn.java | 2 +- .../spark/translation/SparkKeyedCombineFn.java | 1 - .../spark/translation/SparkPCollectionView.java| 2 +- .../spark/translation/SparkProcessContext.java | 2 +- .../spark/translation/TransformTranslator.java | 3 --- .../spark/translation/streaming/Checkpoint.java| 4 ++-- .../streaming/StreamingTransformTranslator.java| 2 -- .../runners/spark/util/GlobalWatermarkHolder.java | 4 +--- .../runners/spark/util/SparkSideInputReader.java | 28 ++ .../beam/runners/spark/ClearWatermarksRule.java| 2 +- .../runners/spark/ProvidedSparkContextTest.java| 18 +++--- .../beam/runners/spark/SparkPipelineStateTest.java | 2 +- .../ResumeFromCheckpointStreamingTest.java | 5 ++-- .../streaming/utils/EmbeddedKafkaCluster.java | 9 +++ 36 files changed, 66 insertions(+), 96 deletions(-)
[beam] branch master updated: [BEAM-7080] Remove unused class KinesisUploader from KinesisIO
This is an automated email from the ASF dual-hosted git repository.

aromanenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

The following commit(s) were added to refs/heads/master by this push:
     new 3a5b70f  [BEAM-7080] Remove unused class KinesisUploader from KinesisIO
     new 1a9faf5  Merge pull request #8308: [BEAM-7080] Remove unused class KinesisUploader from KinesisIO tests
3a5b70f is described below

commit 3a5b70ffa3b873e4f04ba0bccb22b07fc4e4fec3
Author: Ismaël Mejía
AuthorDate: Mon Apr 15 15:02:40 2019 +0200

    [BEAM-7080] Remove unused class KinesisUploader from KinesisIO
---
 .../beam/sdk/io/kinesis/KinesisUploader.java | 77 --
 1 file changed, 77 deletions(-)

diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisUploader.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisUploader.java
deleted file mode 100644
index f33815c..0000000
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisUploader.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.kinesis;
-
-import com.amazonaws.auth.AWSStaticCredentialsProvider;
-import com.amazonaws.auth.BasicAWSCredentials;
-import com.amazonaws.services.kinesis.AmazonKinesis;
-import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
-import com.amazonaws.services.kinesis.model.PutRecordsRequest;
-import com.amazonaws.services.kinesis.model.PutRecordsRequestEntry;
-import com.amazonaws.services.kinesis.model.PutRecordsResult;
-import com.amazonaws.services.kinesis.model.PutRecordsResultEntry;
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists;
-
-/** Sends records to Kinesis in reliable way. */
-public class KinesisUploader {
-
-  public static final int MAX_NUMBER_OF_RECORDS_IN_BATCH = 499;
-
-  public static void uploadAll(List<String> data, KinesisTestOptions options) {
-    AmazonKinesis client =
-        AmazonKinesisClientBuilder.standard()
-            .withCredentials(
-                new AWSStaticCredentialsProvider(
-                    new BasicAWSCredentials(options.getAwsAccessKey(), options.getAwsSecretKey())))
-            .withRegion(options.getAwsKinesisRegion())
-            .build();
-
-    List<List<String>> partitions = Lists.partition(data, MAX_NUMBER_OF_RECORDS_IN_BATCH);
-    for (List<String> partition : partitions) {
-      List<PutRecordsRequestEntry> allRecords = new ArrayList<>();
-      for (String row : partition) {
-        allRecords.add(
-            new PutRecordsRequestEntry()
-                .withData(ByteBuffer.wrap(row.getBytes(StandardCharsets.UTF_8)))
-                .withPartitionKey(Integer.toString(row.hashCode())));
-      }
-
-      PutRecordsResult result;
-      do {
-        result =
-            client.putRecords(
-                new PutRecordsRequest()
-                    .withStreamName(options.getAwsKinesisStream())
-                    .withRecords(allRecords));
-        List<PutRecordsRequestEntry> failedRecords = new ArrayList<>();
-        int i = 0;
-        for (PutRecordsResultEntry row : result.getRecords()) {
-          if (row.getErrorCode() != null) {
-            failedRecords.add(allRecords.get(i));
-          }
-          ++i;
-        }
-        allRecords = failedRecords;
-      } while (result.getFailedRecordCount() > 0);
-    }
-  }
-}
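The deleted KinesisUploader shows a pattern worth noting: a Kinesis PutRecords call can partially fail, so the loop resubmits only the entries whose result carries an error code, until the failed count drops to zero. Below is a stripped-down, Kinesis-free sketch of that "retry only the failures" loop; all class and method names are illustrative, not Beam or AWS API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

public class RetryFailedEntries {

  /** Simulated flaky sink: each record fails a fixed number of times before being accepted. */
  static class FlakySink implements Predicate<String> {
    private final Map<String, Integer> failuresLeft = new HashMap<>();

    FlakySink(Map<String, Integer> failures) {
      failuresLeft.putAll(failures);
    }

    @Override
    public boolean test(String record) {
      int left = failuresLeft.getOrDefault(record, 0);
      if (left > 0) {
        failuresLeft.put(record, left - 1);
        return true; // still failing
      }
      return false; // accepted
    }
  }

  /** Resubmits only the failed records until the whole batch is accepted; returns the round count. */
  static int uploadAll(List<String> records, Predicate<String> failed) {
    int rounds = 0;
    List<String> pending = new ArrayList<>(records);
    while (!pending.isEmpty()) {
      rounds++;
      List<String> failures = new ArrayList<>();
      for (String record : pending) {
        if (failed.test(record)) {
          failures.add(record); // carry only failures into the next round
        }
      }
      pending = failures;
    }
    return rounds;
  }

  public static void main(String[] args) {
    Map<String, Integer> flaky = new HashMap<>();
    flaky.put("b", 2); // "b" fails twice before being accepted
    System.out.println(uploadAll(List.of("a", "b", "c"), new FlakySink(flaky)));
  }
}
```

A production version would add backoff between retry rounds; the deleted test helper simply looped until the service accepted every record.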
[beam] branch master updated (0447bf9 -> f8d59e4)
This is an automated email from the ASF dual-hosted git repository. aromanenko pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git. from 0447bf9 Merge pull request #8344: [BEAM-7106] Mention Spark on portability webpage new 975a407 [BEAM-7097] Upgrade MqttIO to use fusesource mqtt-client 1.15 new f8d59e4 Merge pull request #8333: [BEAM-7097] Upgrade MqttIO to use fusesource mqtt-client 1.15 The 21049 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: sdks/java/io/mqtt/build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-)
[beam] branch master updated: [BEAM-7090] Upgrade JdbcIO to use Commons DBCP 2.6.0
This is an automated email from the ASF dual-hosted git repository.

aromanenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

The following commit(s) were added to refs/heads/master by this push:
     new 5684a6d  [BEAM-7090] Upgrade JdbcIO to use Commons DBCP 2.6.0
     new 64ba213  Merge pull request #8326: [BEAM-7090] Upgrade JdbcIO to use Commons DBCP 2.6.0
5684a6d is described below

commit 5684a6d56a46a034961e6660bf439bd5305f423e
Author: Jean-Baptiste Onofré
AuthorDate: Wed Apr 17 09:42:32 2019 +0200

    [BEAM-7090] Upgrade JdbcIO to use Commons DBCP 2.6.0
---
 sdks/java/io/jdbc/build.gradle | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sdks/java/io/jdbc/build.gradle b/sdks/java/io/jdbc/build.gradle
index 0747541..dc47d89 100644
--- a/sdks/java/io/jdbc/build.gradle
+++ b/sdks/java/io/jdbc/build.gradle
@@ -27,7 +27,7 @@ ext.summary = "IO to read and write on JDBC datasource."
 dependencies {
   shadow library.java.vendored_guava_20_0
   shadow project(path: ":beam-sdks-java-core", configuration: "shadow")
-  shadow "org.apache.commons:commons-dbcp2:2.1.1"
+  shadow "org.apache.commons:commons-dbcp2:2.6.0"
   testCompile project(path: ":beam-sdks-java-core", configuration: "shadowTest")
   testCompile project(path: ":beam-runners-direct-java", configuration: "shadow")
   testCompile project(path: ":beam-sdks-java-io-common", configuration: "shadow")
[beam] branch master updated (207f34e -> 14879ed)
This is an automated email from the ASF dual-hosted git repository. aromanenko pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git. from 207f34e Merge pull request #8715: [BEAM-7442][BEAM-5650] Read sequentially from bounded sources in UnboundedSourceWrapper add fb51440 [BEAM-7357] KinesisIO: fix too many checks that writing stream exists. add 14879ed Merge pull request #8730: [BEAM-7357] KinesisIO: fix too many checks that writing stream exists. No new revisions were added by this update. Summary of changes: .../java/org/apache/beam/sdk/io/kinesis/KinesisIO.java | 13 - .../apache/beam/sdk/io/kinesis/KinesisMockWriteTest.java | 16 +++- 2 files changed, 15 insertions(+), 14 deletions(-)
[beam] branch master updated (a16a5b7 -> 9cc27c3)
This is an automated email from the ASF dual-hosted git repository. aromanenko pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git. from a16a5b7 Merge pull request #8400: [BEAM-6627] Added byte and item counting metrics to integration tests add 0acb798 [BEAM-6627] Add item and byte counters to HadoopFormatIOIT add 9cc27c3 Merge pull request #8561: [BEAM-6627] Add item and byte counters to HadoopFormatIOIT No new revisions were added by this update. Summary of changes: .../apache/beam/sdk/io/hadoop/format/HadoopFormatIOIT.java | 14 ++ 1 file changed, 14 insertions(+)
[beam] branch master updated: [BEAM-7359] Fix static analysis issues for HadoopFormatIO (#8616)
This is an automated email from the ASF dual-hosted git repository.

aromanenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

The following commit(s) were added to refs/heads/master by this push:
     new a06b7a6  [BEAM-7359] Fix static analysis issues for HadoopFormatIO (#8616)
a06b7a6 is described below

commit a06b7a6cbad2386b7bee37d3be0d7f3f5b8bfaff
Author: Ismaël Mejía
AuthorDate: Mon May 20 10:52:52 2019 +0200

    [BEAM-7359] Fix static analysis issues for HadoopFormatIO (#8616)
---
 .../sdk/io/hadoop/format/HDFSSynchronization.java  |   2 +-
 .../beam/sdk/io/hadoop/format/HadoopFormatIO.java  | 122 +
 .../beam/sdk/io/hadoop/format/HadoopFormats.java   |   4 +-
 .../format/ConfigurableEmployeeInputFormat.java    |   2 +-
 .../apache/beam/sdk/io/hadoop/format/Employee.java |  10 +-
 .../io/hadoop/format/HDFSSynchronizationTest.java  |   7 +-
 .../hadoop/format/HadoopFormatIOCassandraTest.java |   2 -
 .../hadoop/format/HadoopFormatIOElasticTest.java   |   6 +-
 .../sdk/io/hadoop/format/HadoopFormatIOIT.java     |   2 +-
 .../io/hadoop/format/HadoopFormatIOReadTest.java   |   4 -
 .../format/HadoopFormatIOSequenceFileTest.java     |  18 +--
 .../format/ReuseObjectsEmployeeInputFormat.java    |   2 +-
 .../sdk/io/hadoop/format/TestEmployeeDataSet.java  |   2 +-
 .../sdk/io/hadoop/format/TestRowDBWritable.java    |   5 +-
 14 files changed, 72 insertions(+), 116 deletions(-)

diff --git a/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HDFSSynchronization.java b/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HDFSSynchronization.java
index 5a73ea1..60f60b2 100644
--- a/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HDFSSynchronization.java
+++ b/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HDFSSynchronization.java
@@ -180,7 +180,7 @@ public class HDFSSynchronization implements ExternalSynchronization {
    * @param <X> exception type
    */
   @FunctionalInterface
-  public interface ThrowingFunction<T1, T2, X extends Exception> extends Serializable {
+  interface ThrowingFunction<T1, T2, X extends Exception> extends Serializable {
     T2 apply(T1 value) throws X;
   }
 }
diff --git a/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIO.java b/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIO.java
index 8819024..b39ec80 100644
--- a/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIO.java
+++ b/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIO.java
@@ -226,7 +226,7 @@ import org.slf4j.LoggerFactory;
  *       {@code mapreduce.job.output.value.class}: The value class passed to the {@link
  *       OutputFormat} in {@code mapreduce.job.outputformat.class}.
  *       {@code mapreduce.job.reduces}: Number of reduce tasks. Value is equal to number of write
- *       tasks which will be genarated. This property is not required for {@link
+ *       tasks which will be generated. This property is not required for {@link
  *       Write.PartitionedWriterBuilder#withoutPartitioning()} write.
  *       {@code mapreduce.job.partitioner.class}: Hadoop partitioner class which will be used for
  *       distributing of records among partitions. This property is not required for {@link
@@ -296,7 +296,7 @@ import org.slf4j.LoggerFactory;
  */
 @Experimental(Experimental.Kind.SOURCE_SINK)
 public class HadoopFormatIO {
-  private static final Logger LOGGER = LoggerFactory.getLogger(HadoopFormatIO.class);
+  private static final Logger LOG = LoggerFactory.getLogger(HadoopFormatIO.class);

   /** {@link MRJobConfig#OUTPUT_FORMAT_CLASS_ATTR}. */
   public static final String OUTPUT_FORMAT_CLASS_ATTR = MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR;
@@ -439,7 +439,7 @@ public class HadoopFormatIO {
       // Sets key class to key translation function's output class type.
       return toBuilder()
           .setKeyTranslationFunction(function)
-          .setKeyTypeDescriptor((TypeDescriptor) function.getOutputTypeDescriptor())
+          .setKeyTypeDescriptor(function.getOutputTypeDescriptor())
           .build();
     }

@@ -449,7 +449,7 @@ public class HadoopFormatIO {
       // Sets value class to value translation function's output class type.
       return toBuilder()
           .setValueTranslationFunction(function)
-          .setValueTypeDescriptor((TypeDescriptor) function.getOutputTypeDescriptor())
+          .setValueTypeDescriptor(function.getOutputTypeDescriptor())
           .build();
     }

@@ -633,25 +633,24 @@ public class HadoopFormatIO {
       // desiredBundleSizeBytes is not being considered as splitting based on this
       // value is not supported by inputFormat getSplits() method.
       if (inputSplit != null) {
-        LOGGER.info("Not splitting source {} because source is already
[beam] branch master updated: [BEAM-7360] Fix static analysis issues for HCatalogIO (#8617)
This is an automated email from the ASF dual-hosted git repository.

aromanenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

The following commit(s) were added to refs/heads/master by this push:
     new 099fad8  [BEAM-7360] Fix static analysis issues for HCatalogIO (#8617)
099fad8 is described below

commit 099fad8c47279d71712bcf74d3d2f191c1888a43
Author: Ismaël Mejía
AuthorDate: Mon May 20 11:10:30 2019 +0200

    [BEAM-7360] Fix static analysis issues for HCatalogIO (#8617)
---
 .../java/org/apache/beam/sdk/io/hcatalog/HCatToRow.java  |  2 +-
 .../apache/beam/sdk/io/hcatalog/HCatalogBeamSchema.java  |  2 +-
 .../org/apache/beam/sdk/io/hcatalog/HCatalogIO.java      |  6 +++---
 .../beam/sdk/io/hcatalog/HCatalogBeamSchemaTest.java     | 13 ++---
 .../org/apache/beam/sdk/io/hcatalog/HCatalogIOTest.java  | 17 -
 5 files changed, 19 insertions(+), 21 deletions(-)

diff --git a/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatToRow.java b/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatToRow.java
index 3afc3a0..f76b559 100644
--- a/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatToRow.java
+++ b/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatToRow.java
@@ -36,7 +36,7 @@ public class HCatToRow {
    * If there is a mismatch between the schema specified here and actual record schema, or
    * internal representation and schema, then runtime errors will happen.
    */
-  public static PTransform<PCollection<HCatRecord>, PCollection<Row>> forSchema(
+  private static PTransform<PCollection<HCatRecord>, PCollection<Row>> forSchema(
       Schema schema) {
     return ParDo.of(new HCatToRowFn(schema));
   }
diff --git a/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogBeamSchema.java b/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogBeamSchema.java
index b43bc9e..9aaad6d 100644
--- a/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogBeamSchema.java
+++ b/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogBeamSchema.java
@@ -41,7 +41,7 @@ import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 @Experimental
 public class HCatalogBeamSchema {

-  private @Nullable IMetaStoreClient metastore;
+  private @Nullable final IMetaStoreClient metastore;

   private HCatalogBeamSchema(IMetaStoreClient metastore) {
     this.metastore = metastore;
diff --git a/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogIO.java b/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogIO.java
index c742185..73518f6 100644
--- a/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogIO.java
+++ b/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogIO.java
@@ -312,7 +312,7 @@ public class HCatalogIO {
     private HCatRecord current;
     private Iterator<HCatRecord> hcatIterator;

-    public BoundedHCatalogReader(BoundedHCatalogSource source) {
+    BoundedHCatalogReader(BoundedHCatalogSource source) {
       this.source = source;
     }
@@ -432,7 +432,7 @@ public class HCatalogIO {
     private HCatWriter masterWriter;
     private List<HCatRecord> hCatRecordsBatch;

-    public WriteFn(Write spec) {
+    WriteFn(Write spec) {
       this.spec = spec;
     }
@@ -495,7 +495,7 @@ public class HCatalogIO {
     }

     @Teardown
-    public void tearDown() throws Exception {
+    public void tearDown() {
       if (slaveWriter != null) {
         slaveWriter = null;
       }
diff --git a/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogBeamSchemaTest.java b/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogBeamSchemaTest.java
index 3fbb016..18d25ae 100644
--- a/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogBeamSchemaTest.java
+++ b/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogBeamSchemaTest.java
@@ -28,7 +28,6 @@ import java.io.IOException;
 import java.io.Serializable;
 import org.apache.beam.sdk.io.hcatalog.test.EmbeddedMetastoreService;
 import org.apache.beam.sdk.schemas.Schema;
-import org.apache.hadoop.hive.ql.CommandNeedRetryException;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -66,19 +65,19 @@ public class HCatalogBeamSchemaTest implements Serializable {
   }

   @Test
-  public void testHasDB() throws Exception {
+  public void testHasDB() {
     HCatalogBeamSchema hcatSchema = HCatalogBeamSchema.create(service.getHiveConfAsMap());
     assertTrue(hcatSchema.hasDatabase(TEST_DATABASE));
   }

   @Test
-  public void testDoesntHaveDB() throws Exception {
+  public void testDoesntHaveDB() {
     HCatalogBeamSchema hcatSchema = HCatalogBeamSchema.create(service.getHiveConfAsMap());
     asser
[beam] 01/01: Merge pull request #8548: [BEAM-7265] Update Spark runner to use spark version 2.4.3
This is an automated email from the ASF dual-hosted git repository. aromanenko pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git commit 6679b00138a5b82a6a55e7bc94c453957cea501c Merge: 765fe3b ac48aab Author: Alexey Romanenko <33895511+aromanenko-...@users.noreply.github.com> AuthorDate: Fri May 10 11:40:27 2019 +0200 Merge pull request #8548: [BEAM-7265] Update Spark runner to use spark version 2.4.3 buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-)
[beam] branch master updated (765fe3b -> 6679b00)
This is an automated email from the ASF dual-hosted git repository. aromanenko pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git. from 765fe3b Merge pull request #8547 from iemejia/BEAM-7263-jdbc-deprecations add ac48aab [BEAM-7265] Update Spark runner to use spark version 2.4.3 new 6679b00 Merge pull request #8548: [BEAM-7265] Update Spark runner to use spark version 2.4.3 The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-)
[beam] branch master updated (8da20c2 -> 74899d6)
This is an automated email from the ASF dual-hosted git repository. aromanenko pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git. from 8da20c2 Merge pull request #8632 from youngoli/beam7373 add cabbce6 [BEAM-7286] RedisIO support for INCRBY/DECRBY operations new 74899d6 Merge pull request: [BEAM-7286] RedisIO support for INCRBY/DECRBY operations The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../java/org/apache/beam/sdk/io/redis/RedisIO.java | 26 +- .../org/apache/beam/sdk/io/redis/RedisIOTest.java | 31 ++ 2 files changed, 56 insertions(+), 1 deletion(-)
[beam] 01/01: Merge pull request: [BEAM-7286] RedisIO support for INCRBY/DECRBY operations
This is an automated email from the ASF dual-hosted git repository. aromanenko pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git commit 74899d6c762bc7f7cd6ddf6561d3acb24966a9df Merge: 8da20c2 cabbce6 Author: Alexey Romanenko <33895511+aromanenko-...@users.noreply.github.com> AuthorDate: Tue May 21 12:07:53 2019 +0200 Merge pull request: [BEAM-7286] RedisIO support for INCRBY/DECRBY operations .../java/org/apache/beam/sdk/io/redis/RedisIO.java | 26 +- .../org/apache/beam/sdk/io/redis/RedisIOTest.java | 31 ++ 2 files changed, 56 insertions(+), 1 deletion(-)
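For readers unfamiliar with the Redis commands behind this RedisIO change: INCRBY adds a delta to the integer value stored at a key (a missing key counts as 0) and DECRBY subtracts one. A minimal in-memory model of those semantics, illustrative only and not the RedisIO API:

```java
import java.util.HashMap;
import java.util.Map;

/** Minimal in-memory model of Redis INCRBY/DECRBY counter semantics. */
public class CounterStore {
  private final Map<String, Long> store = new HashMap<>();

  /** INCRBY: add delta to the key's value, treating a missing key as 0; returns the new value. */
  public long incrBy(String key, long delta) {
    return store.merge(key, delta, Long::sum);
  }

  /** DECRBY is INCRBY with a negated delta. */
  public long decrBy(String key, long delta) {
    return incrBy(key, -delta);
  }

  public long get(String key) {
    return store.getOrDefault(key, 0L);
  }
}
```

Because the operations are commutative additions, they are safe to retry or apply per-element from parallel workers, which is what makes them a good fit for a write transform.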
[beam] branch master updated (71759cb -> 3e976e6)
This is an automated email from the ASF dual-hosted git repository. aromanenko pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git. from 71759cb Merge pull request #8633: [BEAM-7378] Fix Python PVR tests by adding a Kafka client dependency add 385f842 [BEAM-7378] Add trigger to Python Flink PVR tests for Kafka module new 3e976e6 Merge pull request #8634: [BEAM-7378] Add trigger to Python Flink PVR tests for Kafka module The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .test-infra/jenkins/job_PreCommit_Python_ValidatesRunner_Flink.groovy | 2 ++ 1 file changed, 2 insertions(+)
[beam] 01/01: Merge pull request #8634: [BEAM-7378] Add trigger to Python Flink PVR tests for Kafka module
This is an automated email from the ASF dual-hosted git repository. aromanenko pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git commit 3e976e6207fe34efde4595930bb29678eef12c7e Merge: 71759cb 385f842 Author: Alexey Romanenko <33895511+aromanenko-...@users.noreply.github.com> AuthorDate: Tue May 21 14:20:02 2019 +0200 Merge pull request #8634: [BEAM-7378] Add trigger to Python Flink PVR tests for Kafka module .test-infra/jenkins/job_PreCommit_Python_ValidatesRunner_Flink.groovy | 2 ++ 1 file changed, 2 insertions(+)
[beam] branch master updated (bb046df -> 6ab90e9)
This is an automated email from the ASF dual-hosted git repository. aromanenko pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git. from bb046df Merge pull request #8610 [BEAM-7154] Fixing small error bug. add dabf9c4 [BEAM-7367] Add Kafka clients jar as a Docker dependency new 6ab90e9 Merge pull request #8625: [BEAM-7367] Add Kafka clients jar as a Docker dependency The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: sdks/java/container/build.gradle | 2 ++ 1 file changed, 2 insertions(+)
[beam] 01/01: Merge pull request #8625: [BEAM-7367] Add Kafka clients jar as a Docker dependency
This is an automated email from the ASF dual-hosted git repository. aromanenko pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git commit 6ab90e9a28132d1f0115815fdedb7573d3eca16d Merge: bb046df dabf9c4 Author: Alexey Romanenko <33895511+aromanenko-...@users.noreply.github.com> AuthorDate: Mon May 20 17:14:53 2019 +0200 Merge pull request #8625: [BEAM-7367] Add Kafka clients jar as a Docker dependency sdks/java/container/build.gradle | 2 ++ 1 file changed, 2 insertions(+)
[beam] 01/01: Merge pull request #8513: [BEAM-7240] Kinesis IO Watermark Computation Improvements
This is an automated email from the ASF dual-hosted git repository. aromanenko pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git commit 288c95fdd1ac49e1c6cc336145b5f3a9cc4bf395 Merge: 2fbb1ee 9fe0a03 Author: Alexey Romanenko <33895511+aromanenko-...@users.noreply.github.com> AuthorDate: Thu May 16 17:42:13 2019 +0200 Merge pull request #8513: [BEAM-7240] Kinesis IO Watermark Computation Improvements sdks/java/io/kinesis/build.gradle | 1 + .../org/apache/beam/sdk/io/kinesis/KinesisIO.java | 89 .../apache/beam/sdk/io/kinesis/KinesisReader.java | 21 ++- .../apache/beam/sdk/io/kinesis/KinesisSource.java | 7 + .../beam/sdk/io/kinesis/KinesisWatermark.java | 116 .../beam/sdk/io/kinesis/ShardReadersPool.java | 27 ++-- .../beam/sdk/io/kinesis/ShardRecordsIterator.java | 24 +++- .../beam/sdk/io/kinesis/WatermarkParameters.java | 98 + .../beam/sdk/io/kinesis/WatermarkPolicy.java | 29 .../sdk/io/kinesis/WatermarkPolicyFactory.java | 152 + .../beam/sdk/io/kinesis/KinesisMockReadTest.java | 1 + .../beam/sdk/io/kinesis/KinesisReaderTest.java | 41 ++ .../beam/sdk/io/kinesis/KinesisWatermarkTest.java | 138 --- .../beam/sdk/io/kinesis/ShardReadersPoolTest.java | 61 + .../sdk/io/kinesis/ShardRecordsIteratorTest.java | 16 ++- .../beam/sdk/io/kinesis/WatermarkPolicyTest.java | 152 + 16 files changed, 639 insertions(+), 334 deletions(-)
[beam] branch master updated (2fbb1ee -> 288c95f)
This is an automated email from the ASF dual-hosted git repository.

aromanenko pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.

    from 2fbb1ee  [BEAM-7287] Change JAVA_HOME to match new Jenkins agents (#8584)
     add 9fe0a03  Modified KinesisIO to use WatermarkPolicyFactory for watermark computation. Added tests
     new 288c95f  Merge pull request #8513: [BEAM-7240] Kinesis IO Watermark Computation Improvements

The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference.

Summary of changes:
 sdks/java/io/kinesis/build.gradle                  |   1 +
 .../org/apache/beam/sdk/io/kinesis/KinesisIO.java  |  89 
 .../apache/beam/sdk/io/kinesis/KinesisReader.java  |  21 ++-
 .../apache/beam/sdk/io/kinesis/KinesisSource.java  |   7 +
 .../beam/sdk/io/kinesis/KinesisWatermark.java      | 116 
 .../beam/sdk/io/kinesis/ShardReadersPool.java      |  27 ++--
 .../beam/sdk/io/kinesis/ShardRecordsIterator.java  |  24 +++-
 .../beam/sdk/io/kinesis/WatermarkParameters.java   |  98 +
 ...inesisPartitioner.java => WatermarkPolicy.java} |  10 +-
 .../sdk/io/kinesis/WatermarkPolicyFactory.java     | 152 +
 .../beam/sdk/io/kinesis/KinesisMockReadTest.java   |   1 +
 .../beam/sdk/io/kinesis/KinesisReaderTest.java     |  41 ++
 .../beam/sdk/io/kinesis/KinesisWatermarkTest.java  | 138 ---
 .../beam/sdk/io/kinesis/ShardReadersPoolTest.java  |  61 +
 .../sdk/io/kinesis/ShardRecordsIteratorTest.java   |  16 ++-
 .../beam/sdk/io/kinesis/WatermarkPolicyTest.java   | 152 +
 16 files changed, 616 insertions(+), 338 deletions(-)
 delete mode 100644 sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisWatermark.java
 create mode 100644 sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/WatermarkParameters.java
 copy sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/{KinesisPartitioner.java => WatermarkPolicy.java} (78%)
 create mode 100644 sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/WatermarkPolicyFactory.java
 delete mode 100644 sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisWatermarkTest.java
 create mode 100644 sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/WatermarkPolicyTest.java
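The refactoring above replaces the single KinesisWatermark class with pluggable WatermarkPolicy implementations. As a rough illustration of what such a policy can do, the sketch below tracks the latest record arrival time and lets the watermark advance when a shard stays idle longer than a threshold. This is a simplified stand-in, not Beam's actual WatermarkPolicyFactory code.

```java
import java.time.Duration;
import java.time.Instant;

/** Simplified arrival-time watermark with idle-timeout advancement. */
public class ArrivalTimeWatermark {
  private final Duration idleThreshold;
  private Instant lastRecordArrival; // arrival time of the most recent record
  private Instant watermark = Instant.EPOCH;

  public ArrivalTimeWatermark(Duration idleThreshold, Instant start) {
    this.idleThreshold = idleThreshold;
    this.lastRecordArrival = start;
  }

  /** A record was observed: the watermark tracks the latest arrival time, monotonically. */
  public void update(Instant arrivalTime) {
    lastRecordArrival = arrivalTime;
    if (arrivalTime.isAfter(watermark)) {
      watermark = arrivalTime;
    }
  }

  /** If the shard has been idle longer than the threshold, let the watermark advance anyway. */
  public Instant getWatermark(Instant now) {
    Instant idleBound = now.minus(idleThreshold);
    if (lastRecordArrival.isBefore(idleBound) && idleBound.isAfter(watermark)) {
      return idleBound;
    }
    return watermark;
  }
}
```

The idle-advance step matters for low-traffic shards: without it, one quiet shard would hold back the watermark, and with it, downstream windows can still fire while the shard produces nothing.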
[beam] branch master updated (288c95f -> 07d8d5c)
This is an automated email from the ASF dual-hosted git repository. aromanenko pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git. from 288c95f Merge pull request #8513: [BEAM-7240] Kinesis IO Watermark Computation Improvements add f3dc7dd [website] Add design doc on portable representation of schemas new 07d8d5c Merge pull request #8591: [website] Add design doc on Portable Beam Schemas The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: website/src/contribute/design-documents.md | 1 + 1 file changed, 1 insertion(+)
[beam] 01/01: Merge pull request #8591: [website] Add design doc on Portable Beam Schemas
This is an automated email from the ASF dual-hosted git repository. aromanenko pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git commit 07d8d5c60a64bd9aed713d25879534d4d291a96e Merge: 288c95f f3dc7dd Author: Alexey Romanenko <33895511+aromanenko-...@users.noreply.github.com> AuthorDate: Thu May 16 17:46:59 2019 +0200 Merge pull request #8591: [website] Add design doc on Portable Beam Schemas website/src/contribute/design-documents.md | 1 + 1 file changed, 1 insertion(+)
[beam] 01/01: Merge pull request #8595: [BEAM-7230] Add javadoc on provided poolable data sources on JdbcIO
This is an automated email from the ASF dual-hosted git repository. aromanenko pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git commit b57084215539862e0567690ad4fda9b3ec6eaf55 Merge: 07d8d5c feaec61 Author: Alexey Romanenko <33895511+aromanenko-...@users.noreply.github.com> AuthorDate: Thu May 16 18:01:07 2019 +0200 Merge pull request #8595: [BEAM-7230] Add javadoc on provided poolable data sources on JdbcIO .../io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java| 5 + 1 file changed, 5 insertions(+)
[beam] branch master updated (07d8d5c -> b570842)
This is an automated email from the ASF dual-hosted git repository. aromanenko pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git. from 07d8d5c Merge pull request #8591: [website] Add design doc on Portable Beam Schemas add feaec61 [BEAM-7230] Add javadoc on provided poolable data sources on JdbcIO new b570842 Merge pull request #8595: [BEAM-7230] Add javadoc on provided poolable data sources on JdbcIO The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java| 5 + 1 file changed, 5 insertions(+)