[beam] branch master updated: [BEAM-12128] replace usage of snippets_test_py3.py to snippets_test.py (#14488)
This is an automated email from the ASF dual-hosted git repository.

tvalentyn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new f49e097  [BEAM-12128] replace usage of snippets_test_py3.py to snippets_test.py (#14488)
f49e097 is described below

commit f49e09791bbb372667da6217257a35d61ff1585e
Author: yoshiki.obata <1285728+lazyl...@users.noreply.github.com>
AuthorDate: Fri Apr 9 13:31:27 2021 +0900

    [BEAM-12128] replace usage of snippets_test_py3.py to snippets_test.py (#14488)
---
 .../www/site/content/en/documentation/sdks/python-type-safety.md | 8
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/website/www/site/content/en/documentation/sdks/python-type-safety.md b/website/www/site/content/en/documentation/sdks/python-type-safety.md
index c4602a1..e4f0b50 100644
--- a/website/www/site/content/en/documentation/sdks/python-type-safety.md
+++ b/website/www/site/content/en/documentation/sdks/python-type-safety.md
@@ -95,7 +95,7 @@ Annotations are currently supported on:
 The following code declares an `int` input and a `str` output type hint on the `to_id` transform, using annotations on `my_fn`.
 
 {{< highlight py >}}
-{{< code_sample "sdks/python/apache_beam/examples/snippets/snippets_test_py3.py" type_hints_map_annotations >}}
+{{< code_sample "sdks/python/apache_beam/examples/snippets/snippets_test.py" type_hints_map_annotations >}}
 {{< /highlight >}}
 
 The following code demonstrates how to use annotations on `PTransform` subclasses.
@@ -104,7 +104,7 @@ The following code declares typehints on a custom PTransform, that takes a `PCol
 and outputs a `PCollection[str]`, using annotations.
 
 {{< highlight py >}}
-{{< code_sample "sdks/python/apache_beam/examples/snippets/snippets_test_py3.py" type_hints_ptransforms >}}
+{{< code_sample "sdks/python/apache_beam/examples/snippets/snippets_test.py" type_hints_ptransforms >}}
 {{< /highlight >}}
 
 The following code declares `int` input and output type hints on `filter_evens`, using annotations on `FilterEvensDoFn.process`.
@@ -114,7 +114,7 @@ It is an error to have a non-iterable return type annotation for these functions
 Other supported iterable types include: `Iterator`, `Generator`, `Tuple`, `List`.
 
 {{< highlight py >}}
-{{< code_sample "sdks/python/apache_beam/examples/snippets/snippets_test_py3.py" type_hints_do_fn_annotations >}}
+{{< code_sample "sdks/python/apache_beam/examples/snippets/snippets_test.py" type_hints_do_fn_annotations >}}
 {{< /highlight >}}
 
 The following code declares `int` input and output type hints on `double_evens`, using annotations on `FilterEvensDoubleDoFn.process`.
@@ -122,7 +122,7 @@ Since `process` returns a `list` or `None`, the output type is annotated as `Opt
 Beam will also remove the outer `Optional` and (as above) the outer iterable of the return type, only on the `DoFn.process` method and functions passed to `FlatMap`.
 
 {{< highlight py >}}
-{{< code_sample "sdks/python/apache_beam/examples/snippets/snippets_test_py3.py" type_hints_do_fn_annotations_optional >}}
+{{< code_sample "sdks/python/apache_beam/examples/snippets/snippets_test.py" type_hints_do_fn_annotations_optional >}}
 {{< /highlight >}}
 
 ### Declaring Type Hints Inline
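The type-safety page edited in this commit says that, for `DoFn.process` and functions passed to `FlatMap`, Beam removes an outer `Optional` and then the outer iterable from the return annotation to get the element type, and that a non-iterable return annotation is an error there. The following is a minimal, stdlib-only sketch of that stripping step. It assumes Python 3.8+ (`typing.get_origin` / `typing.get_args`). `element_output_type`, `filter_evens`, `double_evens`, `pairs`, and `map_style_fn` are hypothetical names written for illustration. They are not Beam's internal type-inference code, and they are not the snippets in `snippets_test.py`.

```python
import collections.abc
import typing
from typing import Iterable, List, Optional, Tuple, Union

# Return-annotation origins this sketch treats as "an iterable of elements".
_ITERABLE_ORIGINS = (
    collections.abc.Iterable,
    collections.abc.Iterator,
    collections.abc.Generator,
    list,
    tuple,
)


def element_output_type(fn):
    """Hypothetical helper: per-element output type of a process/FlatMap fn.

    Strips an outer Optional[...] and then the outer iterable from the return
    annotation. Illustrative only; not Beam's implementation.
    """
    hint = typing.get_type_hints(fn)['return']
    # Optional[X] is Union[X, None]: drop NoneType when one member remains.
    if typing.get_origin(hint) is Union:
        members = [a for a in typing.get_args(hint) if a is not type(None)]
        if len(members) == 1:
            hint = members[0]
    origin = typing.get_origin(hint)
    if origin not in _ITERABLE_ORIGINS:
        raise TypeError('non-iterable return annotation: %r' % (hint,))
    args = typing.get_args(hint)
    if origin is tuple and not (len(args) == 2 and args[1] is Ellipsis):
        # Heterogeneous Tuple[A, B] is out of scope for this sketch.
        raise TypeError('only Tuple[X, ...] is handled here: %r' % (hint,))
    return args[0]


# Hypothetical functions with the annotation shapes the page describes.
def filter_evens(element: int) -> Iterable[int]:
    if element % 2 == 0:
        yield element


def double_evens(element: int) -> Optional[List[int]]:
    return [element * 2] if element % 2 == 0 else None


def pairs(element: int) -> Tuple[str, ...]:
    return (str(element), str(element))


def map_style_fn(element: int) -> str:
    # Fine for Map, but not an iterable, so the helper rejects it.
    return str(element)
```

Under these assumptions, `element_output_type(filter_evens)` and `element_output_type(double_evens)` both yield `int`, and `map_style_fn` raises `TypeError`. This matches the page's rule that only `process`/`FlatMap` return types are unwrapped.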
[beam] tag nightly-master updated (e4e39e4 -> a48abeb)
This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a change to tag nightly-master
in repository https://gitbox.apache.org/repos/asf/beam.git.


*** WARNING: tag nightly-master was modified! ***

    from e4e39e4 (commit)
      to a48abeb (commit)
    from e4e39e4  Merge pull request #14368 from KevinGG/portable_pin_2
     add 572fef9  [BEAM-7372] remove usage of future package and unnecessary builtins import from internal and metrics (#14445)
     add cb31b7b  [BEAM-7372] cleanup codes for py2 compatibility from apache_beam/examples/snippets/*.py and apache_beam/examples/*.py (#1)
     add 05b3fd3  [BEAM-11948] Drop support for Flink 1.8 and 1.9
     add 572a99b  Merge pull request #14203: [BEAM-11948] Drop support for Flink 1.8 and 1.9
     add 9601bde  [BEAM-11227] Try reverting #14295: Moving from vendored gRPC 1.26 to 1.36 (#14466)
     add 2cca8f1  [BEAM-12092] Bump jedis to 3.5.2
     add e6767c1  Merge pull request #14471: [BEAM-12092] Bump jedis to 3.5.2
     add 27739f9  [BEAM-10925] Refactor ZetaSqlJavaUdfTypeTest.
     add 0f955b4  Merge pull request #14462 from ibzib/java-udf-types
     add 2dcb7da  [BEAM-12126] Fix DirectRunner not respecting use_deprecated_reads
     add 58bd73c  Merge pull request #14469 from [BEAM-12126] Fix DirectRunner not respecting use_deprecated_reads
     add 961789e  SDF bounded wrapper returns None when any exception happen in the calculation.
     add dada0f9  Merge pull request #14439 from boyuanzz/fix_py
     add a696836  Merge pull request #14446 from [BEAM-10854] Fix PeriodicImpulse for default values
     add c557567  [BEAM-12012] Add API key & token authentication in ElasticsearchIO
     add 850e4af  Simplify arguments checks
     add 747e3a9  Merge pull request #14345 from fcaylus/12012-elasticsearch_io_api_key_and_token_auth
     add b5a8b54  Turn on mpyp checks for filesystem (#14425)
     add c1035ab  [BEAM-12112] Disable streaming mode for PORTABILITY_BATCH (#14452)
     add fd42d77  [BEAM-9547] Implementations for a few more DataFrame operations (#14362)
     add 5820268  Complex Type Passthrough Test
     add f5960f4  Don't use base types in BeamCalcRel
     add 450bbb1  Use correct schema geters, enforce types
     add 891b731  Rename functions, add comments
     add 567cf8b  Merge pull request #13930: [BEAM-9379] Simplify BeamCalcRel inputs
     add c472530  Change PubSubSource and PubSubSink translation to avoid special transform overrides.
     add a48abeb  Merge pull request #14384 from [BEAM-10861] Change PubSubSource and PubSubSink translation to avoid special transform overrides

No new revisions were added by this update.

Summary of changes:
 CHANGES.md | 1 +
 .../org/apache/beam/gradle/BeamModulePlugin.groovy | 10 +-
 ...g_1_36_0.groovy => GrpcVendoring_1_26_0.groovy} | 75 --
 examples/java/build.gradle | 2 +-
 .../transforms/DataProtectors.java | 6 +-
 .../kafkatopubsub/kafka/consumer/Utils.java | 4 +-
 gradle.properties | 2 +-
 .../pipeline/src/main/proto/beam_runner_api.proto | 4 +
 runners/core-construction-java/build.gradle | 2 +-
 .../beam/runners/core/construction/BeamUrns.java | 2 +-
 .../core/construction/CoderTranslation.java | 2 +-
 .../core/construction/CoderTranslators.java | 2 +-
 .../core/construction/CombineTranslation.java | 2 +-
 .../CreatePCollectionViewTranslation.java | 2 +-
 .../DefaultExpansionServiceClientFactory.java | 2 +-
 .../core/construction/DisplayDataTranslation.java | 2 +-
 .../runners/core/construction/Environments.java | 4 +-
 .../beam/runners/core/construction/External.java | 6 +-
 .../runners/core/construction/ModelCoders.java | 2 +-
 .../construction/PCollectionViewTranslation.java | 2 +-
 .../core/construction/ParDoTranslation.java | 4 +-
 .../construction/PipelineOptionsTranslation.java | 6 +-
 .../runners/core/construction/ReadTranslation.java | 4 +-
 .../core/construction/TestStreamTranslation.java | 2 +-
 .../core/construction/WindowIntoTranslation.java | 2 +-
 .../construction/WindowingStrategyTranslation.java | 8 +-
 .../core/construction/WriteFilesTranslation.java | 2 +-
 .../graph/GreedyPCollectionFusers.java | 2 +-
 .../core/construction/graph/QueryablePipeline.java | 2 +-
 .../runners/core/construction/CommonCoderTest.java | 2 +-
 .../PipelineOptionsTranslationTest.java | 6 +-
 .../core/construction/ValidateRunnerXlangTest.java | 8 +-
 .../construction/WindowIntoTranslationTest.java | 2 +-
 .../construction/graph/ProtoOverridesTest.java | 2 +-
 runners/core-java/build.gradle | 2 +-
 .../core/metrics/MetricsContainerStepMap.java | 4 +-
 .../core/metrics/MonitoringInfoEncodings.java | 2 +-
[beam] branch master updated: Change PubSubSource and PubSubSink translation to avoid special transform overrides.
This is an automated email from the ASF dual-hosted git repository.

boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new c472530  Change PubSubSource and PubSubSink translation to avoid special transform overrides.
     new a48abeb  Merge pull request #14384 from [BEAM-10861] Change PubSubSource and PubSubSink translation to avoid special transform overrides
c472530 is described below

commit c4725301da8f1fbc3982bca986f4d9a1b9a4ce19
Author: Boyuan Zhang
AuthorDate: Tue Mar 30 18:00:59 2021 -0700

    Change PubSubSource and PubSubSink translation to avoid special transform overrides.
---
 .../pipeline/src/main/proto/beam_runner_api.proto | 4 +
 .../beam/runners/dataflow/DataflowRunner.java | 180 +---
 sdks/java/io/google-cloud-platform/build.gradle | 3 +-
 .../io/gcp/pubsub/PubSubPayloadTranslation.java | 159 +
 .../sdk/io/gcp/pubsub/PubsubUnboundedSink.java | 78 ++---
 .../sdk/io/gcp/pubsub/PubsubUnboundedSource.java | 73 +---
 .../sdk/io/gcp/pubsub/RunnerImplementedSink.java | 68 
 .../pubsub/RunnerImplementedSinkTranslation.java | 87 --
 .../sdk/io/gcp/pubsub/RunnerImplementedSource.java | 83 -
 .../pubsub/RunnerImplementedSourceTranslation.java | 102 ---
 java => PubSubReadPayloadTranslationTest.java} | 189 +++--
 ...java => PubSubWritePayloadTranslationTest.java} | 37 ++--
 .../sdk/io/gcp/pubsub/PubsubIOExternalTest.java | 12 +-
 .../io/gcp/pubsub/PubsubUnboundedSourceTest.java | 80 +++--
 14 files changed, 450 insertions(+), 705 deletions(-)

diff --git a/model/pipeline/src/main/proto/beam_runner_api.proto b/model/pipeline/src/main/proto/beam_runner_api.proto
index 134fcb6..138e352 100644
--- a/model/pipeline/src/main/proto/beam_runner_api.proto
+++ b/model/pipeline/src/main/proto/beam_runner_api.proto
@@ -696,6 +696,8 @@ message WriteFilesPayload {
 // with a native implementation.
 // The SDK should guarantee that only one of topic, subscription,
 // topic_runtime_overridden and subscription_runtime_overridden is set.
+// The output of PubSubReadPayload should be bytes of serialized PubsubMessage
+// proto if with_attributes == true. Otherwise, the bytes is the raw payload.
 message PubSubReadPayload {
 
   // Topic to read from. Exactly one of topic or subscription should be set.
@@ -727,6 +729,8 @@ message PubSubReadPayload {
 // with a native implementation.
 // The SDK should guarantee that only one of topic and topic_runtime_overridden
 // is set.
+// The output of PubSubWritePayload should be bytes if serialized PubsubMessage
+// proto.
 message PubSubWritePayload {
 
   // Topic to write to.
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index a06951f..cdb7e67 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -95,7 +95,6 @@ import org.apache.beam.sdk.Pipeline.PipelineVisitor;
 import org.apache.beam.sdk.PipelineResult.State;
 import org.apache.beam.sdk.PipelineRunner;
 import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
 import org.apache.beam.sdk.coders.KvCoder;
@@ -113,12 +112,8 @@ import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesAndMessageIdCoder;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesCoder;
-import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessages;
-import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessages.DeserializeBytesIntoPubsubMessagePayloadOnly;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSink;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource;
-import org.apache.beam.sdk.io.gcp.pubsub.RunnerImplementedSink;
-import org.apache.beam.sdk.io.gcp.pubsub.RunnerImplementedSource;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsValidator;
 import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
@@ -137,11 +132,9 @@ import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.GroupIntoBatches;
 import org.apache.beam.sdk.transforms.Impulse;
-import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import
[beam] branch master updated (fd42d77 -> 567cf8b)
This is an automated email from the ASF dual-hosted git repository.

apilloud pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.


    from fd42d77  [BEAM-9547] Implementations for a few more DataFrame operations (#14362)
     add 5820268  Complex Type Passthrough Test
     add f5960f4  Don't use base types in BeamCalcRel
     add 450bbb1  Use correct schema geters, enforce types
     add 891b731  Rename functions, add comments
     add 567cf8b  Merge pull request #13930: [BEAM-9379] Simplify BeamCalcRel inputs

No new revisions were added by this update.

Summary of changes:
 .../sdk/extensions/sql/impl/rel/BeamCalcRel.java | 268 ++---
 .../sdk/extensions/sql/BeamComplexTypeTest.java | 45 +++-
 2 files changed, 213 insertions(+), 100 deletions(-)
[beam] branch master updated (c1035ab -> fd42d77)
This is an automated email from the ASF dual-hosted git repository.

bhulette pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.


    from c1035ab  [BEAM-12112] Disable streaming mode for PORTABILITY_BATCH (#14452)
     add fd42d77  [BEAM-9547] Implementations for a few more DataFrame operations (#14362)

No new revisions were added by this update.

Summary of changes:
 sdks/python/apache_beam/dataframe/frames.py | 44 --
 sdks/python/apache_beam/dataframe/frames_test.py | 40 
 .../apache_beam/dataframe/pandas_doctests_test.py | 4 +-
 3 files changed, 81 insertions(+), 7 deletions(-)
[beam] branch master updated (b5a8b54 -> c1035ab)
This is an automated email from the ASF dual-hosted git repository.

amaliujia pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.


    from b5a8b54  Turn on mpyp checks for filesystem (#14425)
     add c1035ab  [BEAM-12112] Disable streaming mode for PORTABILITY_BATCH (#14452)

No new revisions were added by this update.

Summary of changes:
 .../src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java | 6 ++
 1 file changed, 6 insertions(+)
[beam] branch master updated (747e3a9 -> b5a8b54)
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.


    from 747e3a9  Merge pull request #14345 from fcaylus/12012-elasticsearch_io_api_key_and_token_auth
     add b5a8b54  Turn on mpyp checks for filesystem (#14425)

No new revisions were added by this update.

Summary of changes:
 sdks/python/apache_beam/io/filesystem.py | 18 --
 sdks/python/mypy.ini | 3 +++
 2 files changed, 11 insertions(+), 10 deletions(-)
[beam] branch master updated (a696836 -> 747e3a9)
This is an automated email from the ASF dual-hosted git repository.

heejong pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.


    from a696836  Merge pull request #14446 from [BEAM-10854] Fix PeriodicImpulse for default values
     new c557567  [BEAM-12012] Add API key & token authentication in ElasticsearchIO
     new 850e4af  Simplify arguments checks
     new 747e3a9  Merge pull request #14345 from fcaylus/12012-elasticsearch_io_api_key_and_token_auth

The 31448 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../beam/sdk/io/elasticsearch/ElasticsearchIO.java | 47 +-
 1 file changed, 45 insertions(+), 2 deletions(-)
[beam] branch release-2.29.0 updated: [BEAM-11932] Rename service_options to dataflow_service_options.
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch release-2.29.0
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/release-2.29.0 by this push:
     new cbcf5ac  [BEAM-11932] Rename service_options to dataflow_service_options.
     new 35010c1  Merge pull request #14461 from [BEAM-11932] Rename service_options to dataflow_service_options.
cbcf5ac is described below

commit cbcf5ac3a44d7f74323b8b7aed5b2016e30f09ca
Author: Robert Bradshaw
AuthorDate: Fri Apr 2 10:13:50 2021 -0700

    [BEAM-11932] Rename service_options to dataflow_service_options.
---
 .../dataflow/DataflowPipelineTranslator.java | 2 +-
 .../dataflow/options/DataflowPipelineOptions.java | 4 ++--
 .../dataflow/DataflowPipelineTranslatorTest.java | 8 +++
 .../python/apache_beam/options/pipeline_options.py | 6 +++---
 .../apache_beam/options/pipeline_options_test.py | 25 +++---
 .../runners/dataflow/internal/apiclient.py | 4 ++--
 .../runners/dataflow/internal/apiclient_test.py | 4 ++--
 7 files changed, 31 insertions(+), 22 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index 0fd8d80..c096692 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -334,7 +334,7 @@ public class DataflowPipelineTranslator {
       Environment environment = new Environment();
       job.setEnvironment(environment);
 
-      job.getEnvironment().setServiceOptions(options.getServiceOptions());
+      job.getEnvironment().setServiceOptions(options.getDataflowServiceOptions());
 
       WorkerPool workerPool = new WorkerPool();
 
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
index 8434ff7..7d3be45 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
@@ -115,9 +115,9 @@ public interface DataflowPipelineOptions
   @Description(
       "Service options are set by the user and configure the service. This "
           + "decouples service side feature availability from the Apache Beam release cycle.")
-  List<String> getServiceOptions();
+  List<String> getDataflowServiceOptions();
 
-  void setServiceOptions(List<String> options);
+  void setDataflowServiceOptions(List<String> options);
 
   /** Run the job as a specific service account, instead of the default GCE robot. */
   @Hidden
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index 8015ab3..b296f6e 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -1293,12 +1293,12 @@ public class DataflowPipelineTranslatorTest implements Serializable {
   }
 
   @Test
-  public void testServiceOptionsSet() throws IOException {
-    final List<String> serviceOptions =
+  public void testDataflowServiceOptionsSet() throws IOException {
+    final List<String> dataflowServiceOptions =
         Stream.of("whizz=bang", "foo=bar").collect(Collectors.toList());
 
     DataflowPipelineOptions options = buildPipelineOptions();
-    options.setServiceOptions(serviceOptions);
+    options.setDataflowServiceOptions(dataflowServiceOptions);
 
     Pipeline p = buildPipeline(options);
     p.traverseTopologically(new RecordingPipelineVisitor());
@@ -1314,7 +1314,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
                 Collections.emptyList())
             .getJob();
 
-    assertEquals(serviceOptions, job.getEnvironment().getServiceOptions());
+    assertEquals(dataflowServiceOptions, job.getEnvironment().getServiceOptions());
   }
 
   private static void assertAllStepOutputsHaveUniqueIds(Job job) throws Exception {
diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py
index 181b536..ebc5f40 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++
[beam] branch master updated (dada0f9 -> a696836)
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.


    from dada0f9  Merge pull request #14439 from boyuanzz/fix_py
     add a696836  Merge pull request #14446 from [BEAM-10854] Fix PeriodicImpulse for default values

No new revisions were added by this update.

Summary of changes:
 .../apache_beam/transforms/periodicsequence.py | 8 
 .../apache_beam/transforms/periodicsequence_test.py | 20 
 2 files changed, 28 insertions(+)
[beam] branch master updated: SDF bounded wrapper returns None when any exception happen in the calculation.
This is an automated email from the ASF dual-hosted git repository.

boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 961789e  SDF bounded wrapper returns None when any exception happen in the calculation.
     new dada0f9  Merge pull request #14439 from boyuanzz/fix_py
961789e is described below

commit 961789eded3fad003f8b8d5b3d16d88892d33a40
Author: Boyuan Zhang
AuthorDate: Mon Apr 5 18:53:21 2021 -0700

    SDF bounded wrapper returns None when any exception happen in the calculation.
---
 sdks/python/apache_beam/io/iobase.py | 60 +--
 sdks/python/apache_beam/io/iobase_test.py | 13 +++
 2 files changed, 46 insertions(+), 27 deletions(-)

diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py
index 521da25..71d8037 100644
--- a/sdks/python/apache_beam/io/iobase.py
+++ b/sdks/python/apache_beam/io/iobase.py
@@ -1499,33 +1499,39 @@ class _SDFBoundedSourceRestriction(object):
     return self._source_bundle.source
 
   def try_split(self, fraction_of_remainder):
-    consumed_fraction = self.range_tracker().fraction_consumed()
-    fraction = (
-        consumed_fraction + (1 - consumed_fraction) * fraction_of_remainder)
-    position = self.range_tracker().position_at_fraction(fraction)
-    # Need to stash current stop_pos before splitting since
-    # range_tracker.split will update its stop_pos if splits
-    # successfully.
-    stop_pos = self._source_bundle.stop_position
-    split_result = self.range_tracker().try_split(position)
-    if split_result:
-      split_pos, split_fraction = split_result
-      primary_weight = self._source_bundle.weight * split_fraction
-      residual_weight = self._source_bundle.weight - primary_weight
-      # Update self to primary weight and end position.
-      self._source_bundle = SourceBundle(
-          primary_weight,
-          self._source_bundle.source,
-          self._source_bundle.start_position,
-          split_pos)
-      return (
-          self,
-          _SDFBoundedSourceRestriction(
-              SourceBundle(
-                  residual_weight,
-                  self._source_bundle.source,
-                  split_pos,
-                  stop_pos)))
+    try:
+      consumed_fraction = self.range_tracker().fraction_consumed()
+      fraction = (
+          consumed_fraction + (1 - consumed_fraction) * fraction_of_remainder)
+      position = self.range_tracker().position_at_fraction(fraction)
+      # Need to stash current stop_pos before splitting since
+      # range_tracker.split will update its stop_pos if splits
+      # successfully.
+      stop_pos = self._source_bundle.stop_position
+      split_result = self.range_tracker().try_split(position)
+      if split_result:
+        split_pos, split_fraction = split_result
+        primary_weight = self._source_bundle.weight * split_fraction
+        residual_weight = self._source_bundle.weight - primary_weight
+        # Update self to primary weight and end position.
+        self._source_bundle = SourceBundle(
+            primary_weight,
+            self._source_bundle.source,
+            self._source_bundle.start_position,
+            split_pos)
+        return (
+            self,
+            _SDFBoundedSourceRestriction(
+                SourceBundle(
+                    residual_weight,
+                    self._source_bundle.source,
+                    split_pos,
+                    stop_pos)))
+    except Exception:
+      # For any exceptions from underlying trySplit calls, the wrapper will
+      # think that the source refuses to split at this point. In this case,
+      # no split happens at the wrapper level.
+      return None
 
 
 class _SDFBoundedSourceRestrictionTracker(RestrictionTracker):
diff --git a/sdks/python/apache_beam/io/iobase_test.py b/sdks/python/apache_beam/io/iobase_test.py
index 303cb68..bde0566 100644
--- a/sdks/python/apache_beam/io/iobase_test.py
+++ b/sdks/python/apache_beam/io/iobase_test.py
@@ -27,6 +27,7 @@ import apache_beam as beam
 from apache_beam.io.concat_source import ConcatSource
 from apache_beam.io.concat_source_test import RangeSource
 from apache_beam.io import iobase
+from apache_beam.io import range_trackers
 from apache_beam.io.iobase import SourceBundle
 from apache_beam.options.pipeline_options import DebugOptions
 from apache_beam.testing.util import assert_that
@@ -181,6 +182,18 @@ class SDFBoundedSourceRestrictionTrackerTest(unittest.TestCase):
         actual_primary._source_bundle.weight,
         self.sdf_restriction_tracker.current_restriction().weight())
 
+  def test_try_split_with_any_exception(self):
+    source_bundle = SourceBundle(
+        range_trackers.OffsetRangeTracker.OFFSET_INFINITY,
+        RangeSource(0, range_trackers.OffsetRangeTracker.OFFSET_INFINITY),
+        0,
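The new `try_split` body targets the consumed fraction plus `fraction_of_remainder` of what is left. It then divides the bundle weight in proportion to the split fraction the tracker reports, and now treats any exception as a refusal to split. The following is a self-contained sketch of that arithmetic and fallback, using a toy offset tracker in place of Beam's `RangeTracker`. `Bundle`, `ToyRangeTracker`, and `try_split_bundle` are hypothetical names. Unlike the real method, which reassigns `self._source_bundle`, this sketch returns fresh primary and residual objects.

```python
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass
class Bundle:
    """Hypothetical stand-in for iobase.SourceBundle: weight plus [start, stop)."""
    weight: float
    start: int
    stop: int


class ToyRangeTracker:
    """Hypothetical offset tracker; not Beam's OffsetRangeTracker."""

    def __init__(self, start: int, stop: int, consumed_to: int):
        self.start, self.stop, self.consumed_to = start, stop, consumed_to

    def fraction_consumed(self) -> float:
        # An empty range raises ZeroDivisionError, standing in for any
        # exception an underlying tracker might throw.
        return (self.consumed_to - self.start) / (self.stop - self.start)

    def position_at_fraction(self, fraction: float) -> int:
        return self.start + int(fraction * (self.stop - self.start))

    def try_split(self, position: int) -> Optional[Tuple[int, float]]:
        # Refuse positions that are already consumed or at/after the end.
        if position <= self.consumed_to or position >= self.stop:
            return None
        # Fraction of the whole [start, stop) range that precedes the split.
        split_fraction = (position - self.start) / (self.stop - self.start)
        self.stop = position  # the tracker shrinks to the primary range
        return position, split_fraction


def try_split_bundle(
        bundle: Bundle, tracker: ToyRangeTracker,
        fraction_of_remainder: float) -> Optional[Tuple[Bundle, Bundle]]:
    try:
        consumed = tracker.fraction_consumed()
        # Target: the consumed part plus the requested share of what remains.
        fraction = consumed + (1 - consumed) * fraction_of_remainder
        position = tracker.position_at_fraction(fraction)
        split_result = tracker.try_split(position)
        if not split_result:
            return None
        split_pos, split_fraction = split_result
        primary_weight = bundle.weight * split_fraction
        primary = Bundle(primary_weight, bundle.start, split_pos)
        residual = Bundle(bundle.weight - primary_weight, split_pos, bundle.stop)
        return primary, residual
    except Exception:
        # As in the commit: any failure means "no split at this point".
        return None
```

For example, take a bundle of weight 10 over offsets [0, 100) with 50 offsets consumed. A request with `fraction_of_remainder=0.5` targets 0.5 + 0.5 * 0.5 = 0.75, so the primary is [0, 75) with weight 7.5 and the residual is [75, 100) with weight 2.5. An empty range instead raises inside the tracker, and the wrapper returns `None` rather than propagating the error.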
[beam] branch master updated: [BEAM-12126] Fix DirectRunner not respecting use_deprecated_reads
This is an automated email from the ASF dual-hosted git repository.

boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 2dcb7da  [BEAM-12126] Fix DirectRunner not respecting use_deprecated_reads
     new 58bd73c  Merge pull request #14469 from [BEAM-12126] Fix DirectRunner not respecting use_deprecated_reads
2dcb7da is described below

commit 2dcb7da3add3db01d0fbd14b9b64ea8636eda325
Author: Steve Niemitz
AuthorDate: Thu Apr 8 00:16:52 2021 -0400

    [BEAM-12126] Fix DirectRunner not respecting use_deprecated_reads
---
 .../src/main/java/org/apache/beam/runners/direct/DirectRunner.java | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index 3404fa5..a9a154e 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -184,7 +184,6 @@ public class DirectRunner extends PipelineRunner {
 
       DisplayDataValidator.validatePipeline(pipeline);
       DisplayDataValidator.validateOptions(options);
-      SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary(pipeline);
 
       ExecutorService metricsPool =
           Executors.newCachedThreadPool(
@@ -253,6 +252,8 @@ public class DirectRunner extends PipelineRunner {
 
     // The last set of overrides includes GBK overrides used in WriteView
     pipeline.replaceAll(groupByKeyOverrides());
+
+    SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary(pipeline);
   }
 
   @SuppressWarnings("rawtypes")
[beam] branch master updated: [BEAM-10925] Refactor ZetaSqlJavaUdfTypeTest.
This is an automated email from the ASF dual-hosted git repository.

ibzib pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 27739f9  [BEAM-10925] Refactor ZetaSqlJavaUdfTypeTest.
     new 0f955b4  Merge pull request #14462 from ibzib/java-udf-types
27739f9 is described below

commit 27739f959c86018f496841bfc047b8d9e721108e
Author: Kyle Weaver
AuthorDate: Wed Apr 7 10:19:09 2021 -0700

    [BEAM-10925] Refactor ZetaSqlJavaUdfTypeTest.

    Remove unnecessary throws clause and register all UDFs up front.
---
 .../sql/zetasql/ZetaSqlJavaUdfTypeTest.java | 277 ++---
 1 file changed, 126 insertions(+), 151 deletions(-)

diff --git a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlJavaUdfTypeTest.java b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlJavaUdfTypeTest.java
index e45b832..d9a9ae4 100644
--- a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlJavaUdfTypeTest.java
+++ b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlJavaUdfTypeTest.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.sdk.extensions.sql.zetasql;
 
-import java.lang.reflect.Method;
 import org.apache.beam.sdk.extensions.sql.BeamSqlUdf;
 import org.apache.beam.sdk.extensions.sql.impl.JdbcConnection;
 import org.apache.beam.sdk.extensions.sql.impl.JdbcDriver;
@@ -32,6 +31,7 @@ import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.schema.SchemaPlus;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.tools.Frameworks;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.tools.RuleSet;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
@@ -51,12 +51,6 @@ public class ZetaSqlJavaUdfTypeTest extends ZetaSqlTestBase {
   @Rule public transient TestPipeline pipeline = TestPipeline.create();
   @Rule public ExpectedException thrown = ExpectedException.none();
 
-  private Method boolUdf;
-  private Method longUdf;
-  private Method stringUdf;
-  private Method bytesUdf;
-  private Method doubleUdf;
-
   private static final TestBoundedTable table =
       TestBoundedTable.of(
           Schema.builder()
@@ -111,21 +105,39 @@ public class ZetaSqlJavaUdfTypeTest extends ZetaSqlTestBase {
   public void setUp() throws NoSuchMethodException {
     initialize();
 
-    // Add BeamJavaUdfCalcRule to planner to enable UDFs.
+    // Register test table.
+    JdbcConnection jdbcConnection =
+        JdbcDriver.connect(
+            new ReadOnlyTableProvider("table_provider", ImmutableMap.of("table", table)),
+            PipelineOptionsFactory.create());
+
+    // Register UDFs.
+    SchemaPlus schema = jdbcConnection.getCurrentSchemaPlus();
+    schema.add(
+        "test_boolean",
+        ScalarFunctionImpl.create(BooleanIdentityFn.class.getMethod("eval", Boolean.class)));
+    schema.add(
+        "test_int64",
+        ScalarFunctionImpl.create(Int64IdentityFn.class.getMethod("eval", Long.class)));
+    schema.add(
+        "test_string",
+        ScalarFunctionImpl.create(StringIdentityFn.class.getMethod("eval", String.class)));
+    schema.add(
+        "test_bytes",
+        ScalarFunctionImpl.create(BytesIdentityFn.class.getMethod("eval", byte[].class)));
+    schema.add(
+        "test_float64",
+        ScalarFunctionImpl.create(DoubleIdentityFn.class.getMethod("eval", Double.class)));
+
     this.config =
         Frameworks.newConfigBuilder(config)
+            .defaultSchema(schema)
+            // Add BeamJavaUdfCalcRule to planner to enable UDFs.
             .ruleSets(
                 ZetaSQLQueryPlanner.getZetaSqlRuleSets(
                         ImmutableList.of(BeamJavaUdfCalcRule.INSTANCE))
                     .toArray(new RuleSet[0]))
             .build();
-
-    // Look up UDF methods.
-    this.boolUdf = BooleanIdentityFn.class.getMethod("eval", Boolean.class);
-    this.longUdf = Int64IdentityFn.class.getMethod("eval", Long.class);
-    this.stringUdf = StringIdentityFn.class.getMethod("eval", String.class);
-    this.bytesUdf = BytesIdentityFn.class.getMethod("eval", byte[].class);
-    this.doubleUdf = DoubleIdentityFn.class.getMethod("eval", Double.class);
   }
 
   public static class BooleanIdentityFn implements BeamSqlUdf {
@@ -158,19 +170,7 @@ public class ZetaSqlJavaUdfTypeTest extends ZetaSqlTestBase {
     }
   }
 
-  private void runUdfTypeTest(String query, Object result, Schema.TypeName typeName, Method udf)
-      throws NoSuchMethodException {
-    // Add UDF to Calcite schema.
-
[beam] branch master updated (9601bde -> e6767c1)
This is an automated email from the ASF dual-hosted git repository.

iemejia pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.

    from 9601bde  [BEAM-11227] Try reverting #14295: Moving from vendored gRPC 1.26 to 1.36 (#14466)
     add 2cca8f1  [BEAM-12092] Bump jedis to 3.5.2
     add e6767c1  Merge pull request #14471: [BEAM-12092] Bump jedis to 3.5.2

No new revisions were added by this update.

Summary of changes:
 sdks/java/io/redis/build.gradle | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
[beam] branch master updated (572a99b -> 9601bde)
This is an automated email from the ASF dual-hosted git repository.

suztomo pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.

    from 572a99b  Merge pull request #14203: [BEAM-11948] Drop support for Flink 1.8 and 1.9
     add 9601bde  [BEAM-11227] Try reverting #14295: Moving from vendored gRPC 1.26 to 1.36 (#14466)

No new revisions were added by this update.

Summary of changes:
 .../org/apache/beam/gradle/BeamModulePlugin.groovy | 10 +--
 ...g_1_36_0.groovy => GrpcVendoring_1_26_0.groovy} | 75 ++
 examples/java/build.gradle | 2 +-
 .../transforms/DataProtectors.java | 6 +-
 .../kafkatopubsub/kafka/consumer/Utils.java | 4 +-
 runners/core-construction-java/build.gradle | 2 +-
 .../beam/runners/core/construction/BeamUrns.java | 2 +-
 .../core/construction/CoderTranslation.java | 2 +-
 .../core/construction/CoderTranslators.java | 2 +-
 .../core/construction/CombineTranslation.java | 2 +-
 .../CreatePCollectionViewTranslation.java | 2 +-
 .../DefaultExpansionServiceClientFactory.java | 2 +-
 .../core/construction/DisplayDataTranslation.java | 2 +-
 .../runners/core/construction/Environments.java | 4 +-
 .../beam/runners/core/construction/External.java | 6 +-
 .../runners/core/construction/ModelCoders.java | 2 +-
 .../construction/PCollectionViewTranslation.java | 2 +-
 .../core/construction/ParDoTranslation.java | 4 +-
 .../construction/PipelineOptionsTranslation.java | 6 +-
 .../runners/core/construction/ReadTranslation.java | 4 +-
 .../core/construction/TestStreamTranslation.java | 2 +-
 .../core/construction/WindowIntoTranslation.java | 2 +-
 .../construction/WindowingStrategyTranslation.java | 8 +--
 .../core/construction/WriteFilesTranslation.java | 2 +-
 .../graph/GreedyPCollectionFusers.java | 2 +-
 .../core/construction/graph/QueryablePipeline.java | 2 +-
 .../runners/core/construction/CommonCoderTest.java | 2 +-
 .../PipelineOptionsTranslationTest.java | 6 +-
 .../core/construction/ValidateRunnerXlangTest.java | 8 +--
 .../construction/WindowIntoTranslationTest.java | 2 +-
 .../construction/graph/ProtoOverridesTest.java | 2 +-
 runners/core-java/build.gradle | 2 +-
 .../core/metrics/MetricsContainerStepMap.java | 4 +-
 .../core/metrics/MonitoringInfoEncodings.java | 2 +-
 .../core/metrics/MonitoringInfoEncodingsTest.java | 2 +-
 runners/direct-java/build.gradle | 4 +-
 runners/flink/flink_runner.gradle | 2 +-
 .../FlinkBatchPortablePipelineTranslator.java | 2 +-
 .../apache/beam/runners/flink/FlinkJobInvoker.java | 2 +-
 .../beam/runners/flink/FlinkPipelineRunner.java | 2 +-
 .../FlinkStreamingPortablePipelineTranslator.java | 2 +-
 .../utils/FlinkPortableRunnerUtils.java | 2 +-
 .../streaming/ExecutableStageDoFnOperator.java | 4 +-
 .../wrappers/streaming/FlinkKeyUtils.java | 2 +-
 .../FlinkExecutableStageFunctionTest.java | 2 +-
 .../streaming/ExecutableStageDoFnOperatorTest.java | 4 +-
 .../wrappers/streaming/FlinkKeyUtilsTest.java | 2 +-
 runners/google-cloud-dataflow-java/build.gradle | 2 +-
 .../dataflow/DataflowPipelineTranslator.java | 2 +-
 .../beam/runners/dataflow/DataflowRunner.java | 4 +-
 .../beam/runners/dataflow/DataflowRunnerTest.java | 2 +-
 .../google-cloud-dataflow-java/worker/build.gradle | 2 +-
 .../worker/legacy-worker/build.gradle | 4 +-
 .../runners/dataflow/worker/ByteStringCoder.java | 2 +-
 .../dataflow/worker/DataflowRunnerHarness.java | 2 +-
 .../worker/DataflowWorkerHarnessHelper.java | 2 +-
 .../worker/GroupAlsoByWindowParDoFnFactory.java | 2 +-
 .../beam/runners/dataflow/worker/PubsubSink.java | 2 +-
 .../beam/runners/dataflow/worker/StateFetcher.java | 2 +-
 .../dataflow/worker/StreamingDataflowWorker.java | 4 +-
 .../worker/StreamingModeExecutionContext.java | 2 +-
 .../dataflow/worker/StreamingSideInputFetcher.java | 4 +-
 .../dataflow/worker/WindmillComputationKey.java | 4 +-
 .../dataflow/worker/WindmillNamespacePrefix.java | 2 +-
 .../beam/runners/dataflow/worker/WindmillSink.java | 2 +-
 .../dataflow/worker/WindmillStateCache.java | 2 +-
 .../dataflow/worker/WindmillStateInternals.java | 2 +-
 .../dataflow/worker/WindmillStateReader.java | 2 +-
 .../dataflow/worker/WindmillTimerInternals.java | 2 +-
 .../dataflow/worker/WorkerCustomSources.java | 2 +-
 .../dataflow/worker/fn/BeamFnControlService.java | 6 +-
 .../control/RegisterAndProcessBundleOperation.java | 4 +-
 .../worker/fn/data/BeamFnDataGrpcService.java | 4 +-
 .../worker/fn/logging/BeamFnLoggingService.java | 4 +-
 .../fn/stream/ServerStreamObserverFactory.java | 6 +-