[beam] branch master updated: [BEAM-12524] Ensure that failed BundleProcessor objects are not re-added to the cache.
This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 727844e [BEAM-12524] Ensure that failed BundleProcessor objects are not re-added to the cache. new ed76a98 Merge pull request #15062 from [BEAM-12524] Ensure that failed BundleProcessor objects are not re-added to the cache. 727844e is described below commit 727844eeac9dbfb1824a1c6cde73ac8f0ccbd984 Author: Luke Cwik AuthorDate: Tue Jun 22 10:49:23 2021 -0700 [BEAM-12524] Ensure that failed BundleProcessor objects are not re-added to the cache. We had tests to cover this but the usage of ExpectedException prevented the assertion that the cache was "empty" from running since the #processBundle method was throwing an exception. --- .../fn/harness/control/ProcessBundleHandler.java | 30 +++--- .../harness/control/ProcessBundleHandlerTest.java | 109 - 2 files changed, 83 insertions(+), 56 deletions(-) diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java index 32bc803..1f6471e 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java @@ -309,12 +309,13 @@ public class ProcessBundleHandler { throw new RuntimeException(e); } }); -PTransformFunctionRegistry startFunctionRegistry = bundleProcessor.getStartFunctionRegistry(); -PTransformFunctionRegistry finishFunctionRegistry = bundleProcessor.getFinishFunctionRegistry(); -ExecutionStateTracker stateTracker = bundleProcessor.getStateTracker(); -QueueingBeamFnDataClient queueingClient = bundleProcessor.getQueueingClient(); - try { + PTransformFunctionRegistry startFunctionRegistry = bundleProcessor.getStartFunctionRegistry(); + PTransformFunctionRegistry finishFunctionRegistry = + bundleProcessor.getFinishFunctionRegistry(); + ExecutionStateTracker stateTracker = bundleProcessor.getStateTracker(); + QueueingBeamFnDataClient queueingClient = bundleProcessor.getQueueingClient(); + try (HandleStateCallsForBundle beamFnStateClient = bundleProcessor.getBeamFnStateClient()) { try (Closeable closeTracker = stateTracker.activate()) { // Already in reverse topological order so we don't need to do anything. @@ -354,14 +355,16 @@ public class ProcessBundleHandler { response.setRequiresFinalization(true); } } + + // Mark the bundle processor as re-usable. bundleProcessorCache.release( request.getProcessBundle().getProcessBundleDescriptorId(), bundleProcessor); + return BeamFnApi.InstructionResponse.newBuilder().setProcessBundle(response); } catch (Exception e) { - bundleProcessorCache.release( - request.getProcessBundle().getProcessBundleDescriptorId(), bundleProcessor); + // Make sure we clean-up from the active set of bundle processors. + bundleProcessorCache.discard(bundleProcessor); throw e; } -return BeamFnApi.InstructionResponse.newBuilder().setProcessBundle(response); } public BeamFnApi.InstructionResponse.Builder progress(BeamFnApi.InstructionRequest request) @@ -648,13 +651,12 @@ public class ProcessBundleHandler { } /** - * Add a {@link BundleProcessor} to cache. The {@link BundleProcessor} will be reset before - * being added to the cache and will be marked as inactive. + * Add a {@link BundleProcessor} to cache. 
The {@link BundleProcessor} will be marked as + * inactive and reset before being added to the cache. */ void release(String bundleDescriptorId, BundleProcessor bundleProcessor) { activeBundleProcessors.remove(bundleProcessor.getInstructionId()); try { -bundleProcessor.setInstructionId(null); bundleProcessor.reset(); cachedBundleProcessors.get(bundleDescriptorId).add(bundleProcessor); } catch (Exception e) { @@ -665,6 +667,11 @@ public class ProcessBundleHandler { } } +/** Discard an active {@link BundleProcessor} instead of being re-used. */ +void discard(BundleProcessor bundleProcessor) { + activeBundleProcessors.remove(bundleProcessor.getInstructionId()); +} + /** Shutdown all the cached {@link BundleProcessor}s, running the tearDown() functions. */ void shutdown() throws Exception { cachedBundleProcessors.invalidateAll(); @@ -742,6 +749,7 @@ public class ProcessBundleHandler { } void reset() throws Exception { + this.instructionId = null; getStartF
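The pattern behind this fix is worth spelling out: release (reset and re-cache) a processor only on success, and on failure merely drop it from the active set. Below is a minimal, self-contained sketch of that pattern; `Processor` and `ProcessorCache` are hypothetical stand-ins, not Beam's actual `BundleProcessor`/`BundleProcessorCache` classes.

```java
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical stand-ins for Beam's BundleProcessor / BundleProcessorCache.
class Processor {
  String instructionId;

  void reset() throws Exception {
    instructionId = null; // clear all per-bundle state before reuse
  }
}

class ProcessorCache {
  private final Map<String, Processor> active = new ConcurrentHashMap<>();
  private final Map<String, Queue<Processor>> cached = new ConcurrentHashMap<>();

  /** Mark a processor reusable: drop it from the active set, reset it, re-cache it. */
  void release(String descriptorId, Processor p) {
    if (p.instructionId != null) {
      active.remove(p.instructionId);
    }
    try {
      p.reset();
      cached.computeIfAbsent(descriptorId, id -> new ConcurrentLinkedQueue<>()).add(p);
    } catch (Exception e) {
      // A processor whose reset failed is simply not re-cached.
    }
  }

  /** A processor that failed mid-bundle leaves the active set but is never re-cached. */
  void discard(Processor p) {
    if (p.instructionId != null) {
      active.remove(p.instructionId);
    }
  }
}
```

The caller then pairs the two paths exactly as the diff above does: `release(...)` at the end of the `try`, `discard(...)` in the `catch` before rethrowing.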
[beam] branch master updated (b0b7578 -> cf8ffe6)
This is an automated email from the ASF dual-hosted git repository.

boyuanz pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.

    from b0b7578  [BEAM-12421] Migrates ElasticsearchIOTest to use testContainers (#14933)
     new 91e4d8c  [BEAM-12475] Reject splits for previous bundles.
     new c02f776  mypy
     new cf8ffe6  Merge pull request #15042 from [BEAM-12475] Reject splits for previous bundles.

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.

Summary of changes:
 sdks/python/apache_beam/runners/worker/bundle_processor.py | 13 ++---
 1 file changed, 10 insertions(+), 3 deletions(-)
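The Python change guards split requests by instruction id so that a request aimed at an already-finished bundle is ignored. A hypothetical sketch of that guard (illustrative names, not the actual `bundle_processor.py` code):

```java
/** Hypothetical sketch (not Beam's API): reject splits aimed at a previous bundle. */
class BundleSplitGuard {
  private String currentInstructionId; // id of the bundle now being processed

  synchronized void startBundle(String instructionId) {
    this.currentInstructionId = instructionId;
  }

  /** Returns true only if the split request targets the active bundle. */
  synchronized boolean shouldAcceptSplit(String requestInstructionId) {
    // A request carrying any other id refers to a bundle that already finished;
    // acting on it would split state that is now being reused by a newer bundle.
    return requestInstructionId != null
        && requestInstructionId.equals(currentInstructionId);
  }
}
```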
[beam] branch master updated: Fix Kafka expansion issues introduced by PR14801
This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new c097d84 Fix Kafka expansion issues introduced by PR14801 new 2ad332b Merge pull request #14849 from [BEAM-10670] Fix Kafka expansion issues introduced by PR14801 c097d84 is described below commit c097d8424678813901ac46f2fe674cfe71e67430 Author: Boyuan Zhang AuthorDate: Thu May 20 11:29:42 2021 -0700 Fix Kafka expansion issues introduced by PR14801 --- .../org/apache/beam/runners/direct/DirectRunner.java | 10 -- .../org/apache/beam/runners/flink/FlinkRunner.java | 7 --- .../org/apache/beam/runners/samza/SamzaRunner.java | 12 .../SparkStructuredStreamingRunner.java| 13 - .../org/apache/beam/runners/spark/SparkRunner.java | 9 ++--- .../apache/beam/runners/spark/SparkRunnerDebugger.java | 9 ++--- .../apache/beam/runners/twister2/Twister2Runner.java | 9 ++--- .../java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 18 +- 8 files changed, 31 insertions(+), 56 deletions(-) diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java index b073840..4e7178b 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java @@ -39,7 +39,6 @@ import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.PipelineRunner; import org.apache.beam.sdk.metrics.MetricResults; import org.apache.beam.sdk.metrics.MetricsEnvironment; -import org.apache.beam.sdk.options.ExperimentalOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.runners.PTransformOverride; import org.apache.beam.sdk.transforms.PTransform; @@ -173,15 +172,6 @@ public class DirectRunner extends PipelineRunner { "PipelineOptions specified failed to serialize to JSON.", e); } -// TODO(BEAM-10670): Remove additional experiments when we address performance issue. -if (!ExperimentalOptions.hasExperiment(pipeline.getOptions(), "use_sdf_read")) { - // Populate experiments directly to avoid direct-runner <-> kafka circular dependency. - ExperimentalOptions.addExperiment( - pipeline.getOptions().as(ExperimentalOptions.class), "beam_fn_api_use_deprecated_read"); - ExperimentalOptions.addExperiment( - pipeline.getOptions().as(ExperimentalOptions.class), "use_deprecated_read"); -} - performRewrites(pipeline); MetricsEnvironment.setMetricsSupported(true); try { diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java index 6d9e56b..1470a2c 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java @@ -78,13 +78,6 @@ public class FlinkRunner extends PipelineRunner { // Portable flink only support SDF as read. // TODO(BEAM-10670): Use SDF read as default when we address performance issue. if (!ExperimentalOptions.hasExperiment(pipeline.getOptions(), "beam_fn_api")) { - if (!ExperimentalOptions.hasExperiment(pipeline.getOptions(), "use_sdf_read")) { -// Populate experiments directly to have Kafka use legacy read. 
-ExperimentalOptions.addExperiment( -pipeline.getOptions().as(ExperimentalOptions.class), "beam_fn_api_use_deprecated_read"); -ExperimentalOptions.addExperiment( -pipeline.getOptions().as(ExperimentalOptions.class), "use_deprecated_read"); - } SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary(pipeline); } diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java index dd53a49..42fbed0 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java @@ -112,15 +112,11 @@ public class SamzaRunner extends PipelineRunner { @Override public SamzaPipelineResult run(Pipeline pipeline) { -// TODO(BEAM-10670): Use SDF read as default when we address performance issue. -if (!ExperimentalOptions.hasExperiment(pipeline.getOptions(), "use_sdf_read")) { - // Populate experiments directly to have Kafka use legacy read. - ExperimentalOptions.addExperiment( -
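For context on the code being deleted: runners had been toggling experiments at pipeline-construction time with Beam's `ExperimentalOptions` helpers. A minimal sketch of that (now-removed) guard, using the same two static methods the diff shows:

```java
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class ExperimentToggleSketch {
  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.create();
    // This mirrors the guard the commit removes from the runners: only force
    // the legacy read path when the user has not opted into SDF reads.
    if (!ExperimentalOptions.hasExperiment(options, "use_sdf_read")) {
      ExperimentalOptions.addExperiment(
          options.as(ExperimentalOptions.class), "use_deprecated_read");
    }
  }
}
```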
[beam] branch master updated (660fe11 -> 2d37477)
This is an automated email from the ASF dual-hosted git repository.

boyuanz pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.

    from 660fe11  [BEAM-9615] Enable Schemas for struct coding. (#14826)
     add 99b1a92  [BEAM-10670] Use non-SDF based translation for Read by default on all runners except Dataflow
     add 2d37477  Merge pull request #14801 from [BEAM-10670] Use non-SDF based translation for Read by default on all runners except Dataflow

No new revisions were added by this update.

Summary of changes:
 .../runners/core/construction/SplittableParDo.java   |  4 +++-
 .../org/apache/beam/runners/direct/DirectRunner.java | 11 +++
 .../org/apache/beam/runners/flink/FlinkRunner.java   | 15 ++-
 .../org/apache/beam/runners/samza/SamzaRunner.java   | 10 ++
 .../SparkStructuredStreamingRunner.java              | 12
 runners/spark/spark_runner.gradle                    |  1 -
 .../org/apache/beam/runners/spark/SparkRunner.java   | 11 +++
 .../beam/runners/spark/SparkRunnerDebugger.java      | 11 +++
 .../apache/beam/runners/twister2/Twister2Runner.java | 20
 .../test/java/org/apache/beam/sdk/io/ReadTest.java   |  4
 .../beam/sdk/transforms/GroupIntoBatchesTest.java    |  5 -
 11 files changed, 88 insertions(+), 16 deletions(-)
[beam] branch master updated: [BEAM-12353] Sickbay new added failing test for Dataflow runner v2.
This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 20076b3 [BEAM-12353] Sickbay new added failing test for Dataflow runner v2. new ee1ca77 Merge pull request #14830 from [BEAM-12353] Sickbay new added failing test for Dataflow runner v2. 20076b3 is described below commit 20076b3e00988e089f88c4375ecf059610084f6d Author: Boyuan Zhang AuthorDate: Mon May 17 17:44:53 2021 -0700 [BEAM-12353] Sickbay new added failing test for Dataflow runner v2. --- runners/google-cloud-dataflow-java/build.gradle | 5 + 1 file changed, 5 insertions(+) diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle index e8cb290..d48b391 100644 --- a/runners/google-cloud-dataflow-java/build.gradle +++ b/runners/google-cloud-dataflow-java/build.gradle @@ -393,6 +393,8 @@ task validatesRunnerV2 { 'org.apache.beam.sdk.transforms.GroupByKeyTest$BasicTests.testCombiningAccumulatingProcessingTime', 'org.apache.beam.sdk.transforms.GroupByKeyTest$BasicTests.testLargeKeys100MB', 'org.apache.beam.sdk.transforms.GroupByKeyTest$BasicTests.testLargeKeys10MB', + // TODO(BEAM-12353): Identify whether it's bug or a feature gap. + 'org.apache.beam.sdk.transforms.GroupByKeyTest$WindowTests.testRewindowWithTimestampCombiner', 'org.apache.beam.sdk.transforms.FlattenTest.testFlattenWithDifferentInputAndOutputCoders2', ] @@ -476,6 +478,9 @@ task validatesRunnerV2Streaming { "org.apache.beam.sdk.transforms.FlattenTest.testFlattenPCollectionsEmpty", 'org.apache.beam.sdk.io.CountingSourceTest.testBoundedSourceSplits', + + // TODO(BEAM-12353): Identify whether it's bug or a feature gap. + 'org.apache.beam.sdk.transforms.GroupByKeyTest$WindowTests.testRewindowWithTimestampCombiner', ] )) }
[beam] branch master updated: Revert "Merge pull request #14691 from Add PatchResources to FhirIO."
This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 2ea61b7 Revert "Merge pull request #14691 from Add PatchResources to FhirIO." new d1cede0 Merge pull request #14816 from [BEAM-12310] Revert "Merge pull request #14691 from Add PatchResources to FhirIO." 2ea61b7 is described below commit 2ea61b722894f56eab15aca1cd428ca8f3c41be6 Author: Boyuan Zhang AuthorDate: Fri May 14 12:25:18 2021 -0700 Revert "Merge pull request #14691 from Add PatchResources to FhirIO." This reverts commit e37bedea --- .../apache/beam/sdk/io/gcp/healthcare/FhirIO.java | 129 +--- .../sdk/io/gcp/healthcare/FhirSearchParameter.java | 15 -- .../sdk/io/gcp/healthcare/HealthcareApiClient.java | 18 +-- .../io/gcp/healthcare/HttpHealthcareApiClient.java | 63 +--- .../beam/sdk/io/gcp/healthcare/FhirIOPatchIT.java | 171 - .../beam/sdk/io/gcp/healthcare/FhirIOTest.java | 23 --- 6 files changed, 9 insertions(+), 410 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java index 7fd44ce..5bc73ce 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java @@ -28,7 +28,6 @@ import com.google.auto.value.AutoValue; import com.google.gson.Gson; import com.google.gson.JsonArray; import java.io.IOException; -import java.io.Serializable; import java.nio.ByteBuffer; import java.nio.channels.WritableByteChannel; import java.nio.charset.StandardCharsets; @@ -59,7 +58,6 @@ import org.apache.beam.sdk.io.fs.MoveOptions.StandardMoveOptions; import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions; import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.beam.sdk.io.fs.ResourceIdCoder; -import org.apache.beam.sdk.io.gcp.healthcare.FhirIO.PatchResources.Input; import org.apache.beam.sdk.io.gcp.healthcare.HttpHealthcareApiClient.HealthcareHttpException; import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO; import org.apache.beam.sdk.metrics.Counter; @@ -67,8 +65,6 @@ import org.apache.beam.sdk.metrics.Distribution; import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; -import org.apache.beam.sdk.schemas.AutoValueSchema; -import org.apache.beam.sdk.schemas.annotations.DefaultSchema; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.GroupIntoBatches; @@ -105,19 +101,19 @@ import org.slf4j.LoggerFactory; * Reading * * FHIR resources can be read with {@link FhirIO.Read}, which supports use cases where you have a - * ${@link PCollection} of FHIR resource names in the format of projects/{p}/locations/{l}/datasets/{d}/fhirStores/{f}/fhir/{resourceType}/{id}. This is appropriate for reading the Fhir notifications from + * ${@link PCollection} of message IDs. This is appropriate for reading the Fhir notifications from * a Pub/Sub subscription with {@link PubsubIO#readStrings()} or in cases where you have a manually * prepared list of messages that you need to process (e.g. in a text file read with {@link * org.apache.beam.sdk.io.TextIO}*) . 
* - * Fetch Resource contents from Fhir Store based on the {@link PCollection} of FHIR resource name strings + * Fetch Resource contents from Fhir Store based on the {@link PCollection} of message ID strings * {@link FhirIO.Read.Result} where one can call {@link Read.Result#getResources()} to retrieve a - * {@link PCollection} containing the successfully fetched json resources as {@link String}s and/or {@link + * {@link PCollection} containing the successfully fetched {@link String}s and/or {@link * FhirIO.Read.Result#getFailedReads()}* to retrieve a {@link PCollection} of {@link - * HealthcareIOError}* containing the resources that could not be fetched and the exception as a + * HealthcareIOError}* containing the resource ID that could not be fetched and the exception as a * {@link HealthcareIOError}, this can be used to write to the dead letter storage system of your * choosing. This error handling is mainly to transparently surface errors where the upstream {@link - * PCollection}* contains FHIR resources that are not valid or are not reachable due to permissions issues. + * PCollection}* contains IDs that are not valid or are not reachable due to permissions issues. * * Writing * @@ -387,16 +383,6 @@ public class FhirIO { } /** -
[beam] branch master updated: [BEAM-12192] Have WatchKafkaTopicPartitionDoFn respects topics given by KafkaIO
This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 2f3b188 [BEAM-12192] Have WatchKafkaTopicPartitionDoFn respects topics given by KafkaIO new 455dd10 Merge pull request #14580 from [BEAM-12192] Have WatchKafkaTopicPartitionDoFn respects topics given by KafkaIO 2f3b188 is described below commit 2f3b1849369c06abd38f4b2feb73f1d6b2d8 Author: Boyuan Zhang AuthorDate: Mon Apr 19 15:06:05 2021 -0700 [BEAM-12192] Have WatchKafkaTopicPartitionDoFn respects topics given by KafkaIO --- .../java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 28 +++--- .../sdk/io/kafka/WatchKafkaTopicPartitionDoFn.java | 25 .../io/kafka/WatchKafkaTopicPartitionDoFnTest.java | 45 ++ 3 files changed, 77 insertions(+), 21 deletions(-) diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java index 8b6058c..29d8bac 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java @@ -29,10 +29,12 @@ import java.lang.reflect.Method; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.stream.Collectors; import org.apache.beam.runners.core.construction.PTransformMatchers; import org.apache.beam.runners.core.construction.ReplacementOutputs; import org.apache.beam.sdk.annotations.Experimental; @@ -581,9 +583,9 @@ public class KafkaIO { extends PTransform>> { abstract Map getConsumerConfig(); -abstract List getTopics(); +abstract @Nullable List getTopics(); -abstract List getTopicPartitions(); +abstract @Nullable List getTopicPartitions(); abstract @Nullable Coder getKeyCoder(); @@ -839,7 +841,8 @@ public class KafkaIO { */ public Read withTopics(List topics) { checkState( - getTopicPartitions().isEmpty(), "Only topics or topicPartitions can be set, not both"); + getTopicPartitions() == null || getTopicPartitions().isEmpty(), + "Only topics or topicPartitions can be set, not both"); return toBuilder().setTopics(ImmutableList.copyOf(topics)).build(); } @@ -851,7 +854,9 @@ public class KafkaIO { * partitions are distributed among the splits. */ public Read withTopicPartitions(List topicPartitions) { - checkState(getTopics().isEmpty(), "Only topics or topicPartitions can be set, not both"); + checkState( + getTopics() == null || getTopics().isEmpty(), + "Only topics or topicPartitions can be set, not both"); return toBuilder().setTopicPartitions(ImmutableList.copyOf(topicPartitions)).build(); } @@ -1170,7 +1175,8 @@ public class KafkaIO { // construction time. But it requires enabling beam_fn_api. 
if (!isDynamicRead()) { checkArgument( -getTopics().size() > 0 || getTopicPartitions().size() > 0, +(getTopics() != null && getTopics().size() > 0) +|| (getTopicPartitions() != null && getTopicPartitions().size() > 0), "Either withTopic(), withTopics() or withTopicPartitions() is required"); } else { checkArgument( @@ -1327,6 +1333,15 @@ public class KafkaIO { } PCollection output; if (kafkaRead.isDynamicRead()) { + Set topics = new HashSet<>(); + if (kafkaRead.getTopics() != null && kafkaRead.getTopics().size() > 0) { +topics.addAll(kafkaRead.getTopics()); + } + if (kafkaRead.getTopicPartitions() != null && kafkaRead.getTopicPartitions().size() > 0) { +for (TopicPartition topicPartition : kafkaRead.getTopicPartitions()) { + topics.add(topicPartition.topic()); +} + } output = input .getPipeline() @@ -1343,7 +1358,8 @@ public class KafkaIO { kafkaRead.getConsumerFactoryFn(), kafkaRead.getCheckStopReadingFn(), kafkaRead.getConsumerConfig(), - kafkaRead.getStartReadTime(; + kafkaRead.getStartReadTime(), + topics.stream().collect(Collectors.toList(); } else { output = diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/WatchKafkaTopicPartitionDoFn.java
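The heart of the fix is collecting topic names from whichever of the two settings is populated before handing them to `WatchKafkaTopicPartitionDoFn`. A simplified restatement of that expansion-time logic (not the exact KafkaIO code):

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.kafka.common.TopicPartition;

public class TopicUnionSketch {
  /** Collect the topic names a dynamic read should watch, from either setting. */
  static List<String> topicsToWatch(
      List<String> topics, List<TopicPartition> topicPartitions) {
    Set<String> result = new HashSet<>();
    if (topics != null) {
      result.addAll(topics);
    }
    if (topicPartitions != null) {
      // Only the topic name matters for discovery; partitions are re-derived
      // by the watching DoFn against the live cluster.
      for (TopicPartition tp : topicPartitions) {
        result.add(tp.topic());
      }
    }
    return new ArrayList<>(result);
  }
}
```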
[beam] branch master updated: [BEAM-12193] Add user metrics to show founded TopicPartition
This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 5fd434a [BEAM-12193] Add user metrics to show founded TopicPartition new baa106e Merge pull request #14570 from [BEAM-12193] Add user metrics to show founded TopicPartition 5fd434a is described below commit 5fd434a4e18089148efa86e63573ee455d940be2 Author: Boyuan Zhang AuthorDate: Fri Apr 16 21:30:50 2021 -0700 [BEAM-12193] Add user metrics to show founded TopicPartition --- .../apache/beam/sdk/io/kafka/TopicPartitionCoder.java | 3 ++- .../sdk/io/kafka/WatchKafkaTopicPartitionDoFn.java | 16 ++-- .../beam/sdk/io/kafka/TopicPartitionCoderTest.java | 6 ++ .../sdk/io/kafka/WatchKafkaTopicPartitionDoFnTest.java | 18 ++ 4 files changed, 28 insertions(+), 15 deletions(-) diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TopicPartitionCoder.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TopicPartitionCoder.java index f11e8ca..4868dc2 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TopicPartitionCoder.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TopicPartitionCoder.java @@ -26,6 +26,7 @@ import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.StructuredCoder; import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; import org.apache.kafka.common.TopicPartition; /** The {@link Coder} for encoding and decoding {@link TopicPartition} in Beam. */ @@ -48,7 +49,7 @@ public class TopicPartitionCoder extends StructuredCoder { @Override public List> getCoderArguments() { -return null; +return ImmutableList.of(StringUtf8Coder.of(), VarIntCoder.of()); } @Override diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/WatchKafkaTopicPartitionDoFn.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/WatchKafkaTopicPartitionDoFn.java index d82bfcf..fc9cc62 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/WatchKafkaTopicPartitionDoFn.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/WatchKafkaTopicPartitionDoFn.java @@ -22,6 +22,8 @@ import java.util.List; import java.util.Map; import java.util.Set; import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.state.BagState; import org.apache.beam.sdk.state.StateSpec; import org.apache.beam.sdk.state.StateSpecs; @@ -62,6 +64,8 @@ class WatchKafkaTopicPartitionDoFn extends DoFn, KafkaSourceD private final Map kafkaConsumerConfig; private final Instant startReadTime; + private static final String COUNTER_NAMESPACE = "watch_kafka_topic_partition"; + WatchKafkaTopicPartitionDoFn( Duration checkDuration, SerializableFunction, Consumer> kafkaConsumerFactoryFn, @@ -85,6 +89,7 @@ class WatchKafkaTopicPartitionDoFn extends DoFn, KafkaSourceD @VisibleForTesting Set getAllTopicPartitions() { Set current = new HashSet<>(); +// TODO(BEAM-12192): Respect given topics from KafkaIO. 
try (Consumer kafkaConsumer = kafkaConsumerFactoryFn.apply(kafkaConsumerConfig)) { for (Map.Entry> topicInfo : @@ -107,13 +112,17 @@ class WatchKafkaTopicPartitionDoFn extends DoFn, KafkaSourceD current.forEach( topicPartition -> { if (checkStopReadingFn == null || !checkStopReadingFn.apply(topicPartition)) { +Counter foundedTopicPartition = +Metrics.counter(COUNTER_NAMESPACE, topicPartition.toString()); +foundedTopicPartition.inc(); existingTopicPartitions.add(topicPartition); outputReceiver.output( KafkaSourceDescriptor.of(topicPartition, null, startReadTime, null)); } }); -timer.set(Instant.now().plus(checkDuration.getMillis())); +timer.offset(checkDuration).setRelative(); +; } @OnTimer(TIMER_ID) @@ -130,13 +139,16 @@ class WatchKafkaTopicPartitionDoFn extends DoFn, KafkaSourceD }); existingTopicPartitions.clear(); -Set currentAll = getAllTopicPartitions(); +Set currentAll = this.getAllTopicPartitions(); // Emit new added TopicPartitions. Set newAdded = Sets.difference(currentAll, readingTopicPartitions); newAdded.forEach( topicPartition -> { if (checkStopReadingFn == null || !checkStopReadingFn.apply(topicPartition)) { +Counter foundedTopicPartition = +Me
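The metric pattern this commit adds is a per-partition counter created on the fly with `Metrics.counter`. A minimal sketch of the same pattern in a standalone `DoFn` (simplified; the real code lives inside `WatchKafkaTopicPartitionDoFn`):

```java
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.kafka.common.TopicPartition;

/** Sketch of the user-metric pattern: one counter per discovered partition. */
class PartitionMetricsSketch extends DoFn<TopicPartition, TopicPartition> {
  private static final String COUNTER_NAMESPACE = "watch_kafka_topic_partition";

  @ProcessElement
  public void processElement(
      @Element TopicPartition partition, OutputReceiver<TopicPartition> out) {
    // Metrics.counter is cheap to call; the counter is identified by
    // (namespace, name), here using the partition's string form as the name.
    Counter found = Metrics.counter(COUNTER_NAMESPACE, partition.toString());
    found.inc();
    out.output(partition);
  }
}
```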
[beam] branch master updated: add sdkContainerImage to Java WorkerPool PipelineOptions
This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 2300f45 add sdkContainerImage to Java WorkerPool PipelineOptions new 018f5e7 Merge pull request #14575 from [BEAM-12212] Adds --sdkContainerImage as new Java Dataflow PipelineOption 2300f45 is described below commit 2300f4547bbdc5c0e1cdcafa537d1a605bedc522 Author: Emily Ye AuthorDate: Sun Apr 18 23:58:24 2021 -0700 add sdkContainerImage to Java WorkerPool PipelineOptions --- .../org/apache/beam/gradle/BeamModulePlugin.groovy | 4 +- runners/google-cloud-dataflow-java/build.gradle| 3 +- .../examples-streaming/build.gradle| 4 +- .../beam/runners/dataflow/DataflowRunner.java | 85 ++ .../beam/runners/dataflow/DataflowRunnerInfo.java | 10 ++- .../options/DataflowPipelineWorkerPoolOptions.java | 38 +- .../beam/runners/dataflow/dataflow.properties | 1 + .../dataflow/DataflowPipelineTranslatorTest.java | 46 ++-- .../runners/dataflow/DataflowRunnerInfoTest.java | 15 +++- .../beam/runners/dataflow/DataflowRunnerTest.java | 68 ++--- 10 files changed, 214 insertions(+), 60 deletions(-) diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 465061d..4c4d41b 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -1583,13 +1583,15 @@ class BeamModulePlugin implements Plugin { if (pipelineOptionsString.contains('use_runner_v2')) { def dockerImageName = project.project(':runners:google-cloud-dataflow-java').ext.dockerImageName allOptionsList.addAll([ -"--workerHarnessContainerImage=${dockerImageName}", +"--sdkContainerImage=${dockerImageName}", "--region=${dataflowRegion}" ]) } else { def dataflowWorkerJar = project.findProperty('dataflowWorkerJar') ?: project.project(":runners:google-cloud-dataflow-java:worker:legacy-worker").shadowJar.archivePath allOptionsList.addAll([ +// Keep as legacy flag to ensure via test this flag works for +// legacy pipeline. '--workerHarnessContainerImage=', "--dataflowWorkerJar=${dataflowWorkerJar}", "--region=${dataflowRegion}" diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle index 80941db..a3db5a2 100644 --- a/runners/google-cloud-dataflow-java/build.gradle +++ b/runners/google-cloud-dataflow-java/build.gradle @@ -46,6 +46,7 @@ processResources { 'dataflow.legacy_environment_major_version' : '8', 'dataflow.fnapi_environment_major_version' : '8', 'dataflow.container_version' : 'beam-master-20210419', +'dataflow.container_base_repository' : 'gcr.io/cloud-dataflow/v1beta3', ] } @@ -147,7 +148,7 @@ def runnerV2PipelineOptions = [ "--project=${dataflowProject}", "--region=${dataflowRegion}", "--tempRoot=${dataflowValidatesTempRoot}", - "--workerHarnessContainerImage=${dockerImageContainer}:${dockerTag}", + "--sdkContainerImage=${dockerImageContainer}:${dockerTag}", // TODO(BEAM-11779) remove shuffle_mode=appliance with runner v2 once issue is resolved. 
"--experiments=beam_fn_api,use_unified_worker,use_runner_v2,shuffle_mode=appliance", ] diff --git a/runners/google-cloud-dataflow-java/examples-streaming/build.gradle b/runners/google-cloud-dataflow-java/examples-streaming/build.gradle index fd8705d..bc5f584d 100644 --- a/runners/google-cloud-dataflow-java/examples-streaming/build.gradle +++ b/runners/google-cloud-dataflow-java/examples-streaming/build.gradle @@ -40,8 +40,8 @@ task windmillPreCommit(type: Test) { dependsOn ":runners:google-cloud-dataflow-java:worker:legacy-worker:shadowJar" def dataflowWorkerJar = project.findProperty('dataflowWorkerJar') ?: project(":runners:google-cloud-dataflow-java:worker:legacy-worker").shadowJar.archivePath - // Set workerHarnessContainerImage to empty to make Dataflow pick up the non-versioned container - // image, which handles a staged worker jar. + // Set workerHarnessContainerImage to empty to make Dataflow pick up the + // non-versioned container image, which handles a staged worker jar. def preCommitBeamTestPipelineOptions
[beam] branch master updated: [BEAM-12114] Dataflow should apply KAFKA_READ_OVERRIDE when it's not with runner_v2
This is an automated email from the ASF dual-hosted git repository.

boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

The following commit(s) were added to refs/heads/master by this push:
     new 1fdfef5  [BEAM-12114] Dataflow should apply KAFKA_READ_OVERRIDE when it's not with runner_v2
     new 4439170  Merge pull request #14608 from [BEAM-12114] Dataflow should apply KAFKA_READ_OVERRIDE when it's not with runner_v2

1fdfef5 is described below

commit 1fdfef5298b64aa8b253e02e2df286cd78bcfe38
Author: Boyuan Zhang
AuthorDate: Wed Apr 21 11:40:58 2021 -0700

    [BEAM-12114] Dataflow should apply KAFKA_READ_OVERRIDE when it's not with runner_v2
---
 .../src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 0a79cd9..42d463e 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -492,7 +492,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
             new StreamingPubsubIOWriteOverrideFactory(this)));
       }
     }
-    if (useUnifiedWorker(options)) {
+    if (!fnApiEnabled) {
       overridesBuilder.add(KafkaIO.Read.KAFKA_READ_OVERRIDE);
     }
     overridesBuilder.add(
[beam] branch master updated: Eliminate beam_fn_api from KafkaIO expansion
This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new f2cc926 Eliminate beam_fn_api from KafkaIO expansion new 3b85447 Merge pull request #14419 from [BEAM-12114] Eliminate beam_fn_api from KafkaIO expansion f2cc926 is described below commit f2cc92663ad8ae685183e076cdb652d8fc3ba4e0 Author: Boyuan Zhang AuthorDate: Fri Apr 2 15:20:53 2021 -0700 Eliminate beam_fn_api from KafkaIO expansion --- runners/google-cloud-dataflow-java/build.gradle| 1 + .../beam/runners/dataflow/DataflowRunner.java | 4 + .../SparkStructuredStreamingRunner.java| 3 + runners/spark/spark_runner.gradle | 1 + .../org/apache/beam/runners/spark/SparkRunner.java | 3 + .../beam/runners/spark/SparkRunnerDebugger.java| 3 + sdks/java/io/kafka/build.gradle| 1 + .../java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 176 +++-- .../beam/sdk/io/kafka/KafkaIOExternalTest.java | 4 +- 9 files changed, 147 insertions(+), 49 deletions(-) diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle index 290ea94..476e8c5 100644 --- a/runners/google-cloud-dataflow-java/build.gradle +++ b/runners/google-cloud-dataflow-java/build.gradle @@ -72,6 +72,7 @@ dependencies { compile project(path: ":model:pipeline", configuration: "shadow") compile project(path: ":sdks:java:core", configuration: "shadow") compile project(":sdks:java:extensions:google-cloud-platform-core") + compile project(":sdks:java:io:kafka") compile project(":sdks:java:io:google-cloud-platform") compile project(":runners:core-construction-java") compile library.java.avro diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index c81631e..0a79cd9 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -115,6 +115,7 @@ import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesAndMessageId import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesCoder; import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSink; import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource; +import org.apache.beam.sdk.io.kafka.KafkaIO; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsValidator; import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider; @@ -491,6 +492,9 @@ public class DataflowRunner extends PipelineRunner { new StreamingPubsubIOWriteOverrideFactory(this))); } } + if (useUnifiedWorker(options)) { +overridesBuilder.add(KafkaIO.Read.KAFKA_READ_OVERRIDE); + } overridesBuilder.add( PTransformOverride.of( PTransformMatchers.writeWithRunnerDeterminedSharding(), diff --git a/runners/spark/2/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingRunner.java b/runners/spark/2/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingRunner.java index f08c36b..5d8230e 100644 --- a/runners/spark/2/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingRunner.java +++ 
b/runners/spark/2/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingRunner.java @@ -37,12 +37,14 @@ import org.apache.beam.runners.spark.structuredstreaming.translation.batch.Pipel import org.apache.beam.runners.spark.structuredstreaming.translation.streaming.PipelineTranslatorStreaming; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineRunner; +import org.apache.beam.sdk.io.kafka.KafkaIO; import org.apache.beam.sdk.metrics.MetricsEnvironment; import org.apache.beam.sdk.metrics.MetricsOptions; import org.apache.beam.sdk.options.ExperimentalOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.PipelineOptionsValidator; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; import org.apache.spark.SparkEnv$; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.metrics.MetricsSystem; @@ -193,6 +195,7 @@ public final class SparkStructuredStreamingRunner || ExperimentalOptions.hasExperiment( pipeline.getOptions(),
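Mechanically, replacing an experiment flag with a transform override means each runner now rewrites the Kafka read during `run()`. A condensed sketch of that pattern, using the override constant and `Pipeline.replaceAll` call that appear in these diffs:

```java
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.runners.PTransformOverride;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;

/**
 * Sketch of the pattern the commit adds to each runner: swap KafkaIO's
 * expansion via a transform override instead of a global experiment flag.
 */
class KafkaOverrideSketch {
  static void applyOverrides(Pipeline pipeline) {
    // The runner decides, per its own capabilities, which overrides to apply.
    List<PTransformOverride> overrides =
        ImmutableList.of(KafkaIO.Read.KAFKA_READ_OVERRIDE);
    pipeline.replaceAll(overrides);
  }
}
```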
[beam] branch master updated: Fix PubsubReader to populate message id correctly
This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new c1fee04 Fix PubsubReader to populate message id correctly new c18e3cf Merge pull request #14503 from [BEAM-12143] Fix PubsubReader to populate message id correctly c1fee04 is described below commit c1fee0404a6c4fca4f86b0bedda00e5b4e2beeed Author: Boyuan Zhang AuthorDate: Fri Apr 9 15:42:24 2021 -0700 Fix PubsubReader to populate message id correctly --- .../beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java | 19 +-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java index bd5e868..c5b7f3f 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java @@ -893,7 +893,9 @@ public class PubsubUnboundedSource extends PTransform messageCoder; +if (getNeedsMessageId()) { + messageCoder = + getNeedsAttributes() + ? PubsubMessageWithAttributesAndMessageIdCoder.of() + : PubsubMessageWithMessageIdCoder.of(); +} else { + messageCoder = + getNeedsAttributes() + ? PubsubMessageWithAttributesCoder.of() + : PubsubMessagePayloadOnlyCoder.of(); +} PCollection messages = input .getPipeline() @@ -1347,7 +1361,8 @@ public class PubsubUnboundedSource extends PTransform
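The fix selects a message coder matching exactly the fields the read needs, so the message id survives encoding. The selection logic from the diff, restated as a standalone helper (all four coder classes appear in the diff above):

```java
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessagePayloadOnlyCoder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesAndMessageIdCoder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesCoder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithMessageIdCoder;

class PubsubCoderSelection {
  /** Pick the coder that round-trips exactly the fields the read was asked for. */
  static Coder<PubsubMessage> coderFor(boolean needsMessageId, boolean needsAttributes) {
    if (needsMessageId) {
      return needsAttributes
          ? PubsubMessageWithAttributesAndMessageIdCoder.of()
          : PubsubMessageWithMessageIdCoder.of();
    }
    return needsAttributes
        ? PubsubMessageWithAttributesCoder.of()
        : PubsubMessagePayloadOnlyCoder.of();
  }
}
```

Before this fix, a coder that dropped the message id was used even when the id was needed, which is why the populated id never reached downstream consumers.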
[beam] branch master updated: Change PubSubSource and PubSubSink translation to avoid special transform overrides.
This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new c472530 Change PubSubSource and PubSubSink translation to avoid special transform overrides. new a48abeb Merge pull request #14384 from [BEAM-10861] Change PubSubSource and PubSubSink translation to avoid special transform overrides c472530 is described below commit c4725301da8f1fbc3982bca986f4d9a1b9a4ce19 Author: Boyuan Zhang AuthorDate: Tue Mar 30 18:00:59 2021 -0700 Change PubSubSource and PubSubSink translation to avoid special transform overrides. --- .../pipeline/src/main/proto/beam_runner_api.proto | 4 + .../beam/runners/dataflow/DataflowRunner.java | 180 +--- sdks/java/io/google-cloud-platform/build.gradle| 3 +- .../io/gcp/pubsub/PubSubPayloadTranslation.java| 159 + .../sdk/io/gcp/pubsub/PubsubUnboundedSink.java | 78 ++--- .../sdk/io/gcp/pubsub/PubsubUnboundedSource.java | 73 +--- .../sdk/io/gcp/pubsub/RunnerImplementedSink.java | 68 .../pubsub/RunnerImplementedSinkTranslation.java | 87 -- .../sdk/io/gcp/pubsub/RunnerImplementedSource.java | 83 - .../pubsub/RunnerImplementedSourceTranslation.java | 102 --- java => PubSubReadPayloadTranslationTest.java} | 189 +++-- ...java => PubSubWritePayloadTranslationTest.java} | 37 ++-- .../sdk/io/gcp/pubsub/PubsubIOExternalTest.java| 12 +- .../io/gcp/pubsub/PubsubUnboundedSourceTest.java | 80 +++-- 14 files changed, 450 insertions(+), 705 deletions(-) diff --git a/model/pipeline/src/main/proto/beam_runner_api.proto b/model/pipeline/src/main/proto/beam_runner_api.proto index 134fcb6..138e352 100644 --- a/model/pipeline/src/main/proto/beam_runner_api.proto +++ b/model/pipeline/src/main/proto/beam_runner_api.proto @@ -696,6 +696,8 @@ message WriteFilesPayload { // with a native implementation. // The SDK should guarantee that only one of topic, subscription, // topic_runtime_overridden and subscription_runtime_overridden is set. +// The output of PubSubReadPayload should be bytes of serialized PubsubMessage +// proto if with_attributes == true. Otherwise, the bytes is the raw payload. message PubSubReadPayload { // Topic to read from. Exactly one of topic or subscription should be set. @@ -727,6 +729,8 @@ message PubSubReadPayload { // with a native implementation. // The SDK should guarantee that only one of topic and topic_runtime_overridden // is set. +// The output of PubSubWritePayload should be bytes if serialized PubsubMessage +// proto. message PubSubWritePayload { // Topic to write to. 
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index a06951f..cdb7e67 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -95,7 +95,6 @@ import org.apache.beam.sdk.Pipeline.PipelineVisitor; import org.apache.beam.sdk.PipelineResult.State; import org.apache.beam.sdk.PipelineRunner; import org.apache.beam.sdk.annotations.Experimental; -import org.apache.beam.sdk.coders.ByteArrayCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.Coder.NonDeterministicException; import org.apache.beam.sdk.coders.KvCoder; @@ -113,12 +112,8 @@ import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage; import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesAndMessageIdCoder; import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesCoder; -import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessages; -import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessages.DeserializeBytesIntoPubsubMessagePayloadOnly; import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSink; import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource; -import org.apache.beam.sdk.io.gcp.pubsub.RunnerImplementedSink; -import org.apache.beam.sdk.io.gcp.pubsub.RunnerImplementedSource; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsValidator; import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider; @@ -137,11 +132,9 @@ import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.GroupIntoBatches; import org.apache.beam.sdk.transforms.Impulse; -import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo;
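To make the payload contract concrete: a runner-facing SDK serializes the read's configuration into the `PubSubReadPayload` proto shown above. A small sketch of building one from the generated Java proto classes; the builder method names are inferred from the proto field names, and the subscription string is a placeholder (per the proto comment, set exactly one of the topic/subscription variants):

```java
import org.apache.beam.model.pipeline.v1.RunnerApi.PubSubReadPayload;

class ReadPayloadSketch {
  static PubSubReadPayload samplePayload() {
    // with_attributes == true means the output bytes are a serialized
    // PubsubMessage proto rather than the raw payload.
    return PubSubReadPayload.newBuilder()
        .setSubscription("projects/my-project/subscriptions/my-sub") // placeholder
        .setWithAttributes(true)
        .build();
  }
}
```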
[beam] branch master updated: SDF bounded wrapper returns None when any exception happen in the calculation.
This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 961789e SDF bounded wrapper returns None when any exception happen in the calculation. new dada0f9 Merge pull request #14439 from boyuanzz/fix_py 961789e is described below commit 961789eded3fad003f8b8d5b3d16d88892d33a40 Author: Boyuan Zhang AuthorDate: Mon Apr 5 18:53:21 2021 -0700 SDF bounded wrapper returns None when any exception happen in the calculation. --- sdks/python/apache_beam/io/iobase.py | 60 +-- sdks/python/apache_beam/io/iobase_test.py | 13 +++ 2 files changed, 46 insertions(+), 27 deletions(-) diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py index 521da25..71d8037 100644 --- a/sdks/python/apache_beam/io/iobase.py +++ b/sdks/python/apache_beam/io/iobase.py @@ -1499,33 +1499,39 @@ class _SDFBoundedSourceRestriction(object): return self._source_bundle.source def try_split(self, fraction_of_remainder): -consumed_fraction = self.range_tracker().fraction_consumed() -fraction = ( -consumed_fraction + (1 - consumed_fraction) * fraction_of_remainder) -position = self.range_tracker().position_at_fraction(fraction) -# Need to stash current stop_pos before splitting since -# range_tracker.split will update its stop_pos if splits -# successfully. -stop_pos = self._source_bundle.stop_position -split_result = self.range_tracker().try_split(position) -if split_result: - split_pos, split_fraction = split_result - primary_weight = self._source_bundle.weight * split_fraction - residual_weight = self._source_bundle.weight - primary_weight - # Update self to primary weight and end position. - self._source_bundle = SourceBundle( - primary_weight, - self._source_bundle.source, - self._source_bundle.start_position, - split_pos) - return ( - self, - _SDFBoundedSourceRestriction( - SourceBundle( - residual_weight, - self._source_bundle.source, - split_pos, - stop_pos))) +try: + consumed_fraction = self.range_tracker().fraction_consumed() + fraction = ( + consumed_fraction + (1 - consumed_fraction) * fraction_of_remainder) + position = self.range_tracker().position_at_fraction(fraction) + # Need to stash current stop_pos before splitting since + # range_tracker.split will update its stop_pos if splits + # successfully. + stop_pos = self._source_bundle.stop_position + split_result = self.range_tracker().try_split(position) + if split_result: +split_pos, split_fraction = split_result +primary_weight = self._source_bundle.weight * split_fraction +residual_weight = self._source_bundle.weight - primary_weight +# Update self to primary weight and end position. +self._source_bundle = SourceBundle( +primary_weight, +self._source_bundle.source, +self._source_bundle.start_position, +split_pos) +return ( +self, +_SDFBoundedSourceRestriction( +SourceBundle( +residual_weight, +self._source_bundle.source, +split_pos, +stop_pos))) +except Exception: + # For any exceptions from underlying trySplit calls, the wrapper will + # think that the source refuses to split at this point. In this case, + # no split happens at the wrapper level. 
+ return None class _SDFBoundedSourceRestrictionTracker(RestrictionTracker): diff --git a/sdks/python/apache_beam/io/iobase_test.py b/sdks/python/apache_beam/io/iobase_test.py index 303cb68..bde0566 100644 --- a/sdks/python/apache_beam/io/iobase_test.py +++ b/sdks/python/apache_beam/io/iobase_test.py @@ -27,6 +27,7 @@ import apache_beam as beam from apache_beam.io.concat_source import ConcatSource from apache_beam.io.concat_source_test import RangeSource from apache_beam.io import iobase +from apache_beam.io import range_trackers from apache_beam.io.iobase import SourceBundle from apache_beam.options.pipeline_options import DebugOptions from apache_beam.testing.util import assert_that @@ -181,6 +182,18 @@ class SDFBoundedSourceRestrictionTrackerTest(unittest.TestCase): actual_primary._source_bundle.weight, self.sdf_restriction_tracker.current_restriction().weight()) + def test_try_split_with_any_exception(self): +source_bundle = SourceBundle( +range_trackers.OffsetRangeTracker.OFFSET_INFINITY, +RangeSource(0, range_trackers.OffsetRangeTracker.OFFSET_INFINITY), +0
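The same defensive idea expressed against Beam's Java splittable-DoFn types, as a sketch (the actual fix above is Python): any exception from the underlying split computation is reported as a refusal to split, which the harness handles gracefully.

```java
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;

/** Sketch: wrap a delegate tracker so split failures degrade to "no split". */
abstract class DefensiveTracker<R, P> extends RestrictionTracker<R, P> {
  private final RestrictionTracker<R, P> delegate;

  DefensiveTracker(RestrictionTracker<R, P> delegate) {
    this.delegate = delegate;
  }

  @Override
  public SplitResult<R> trySplit(double fractionOfRemainder) {
    try {
      return delegate.trySplit(fractionOfRemainder);
    } catch (RuntimeException e) {
      // Returning null tells the harness the source refused to split here;
      // the bundle simply continues unsplit instead of failing.
      return null;
    }
  }
}
```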
[beam] branch master updated: [BEAM-12126] Fix DirectRunner not respecting use_deprecated_reads
This is an automated email from the ASF dual-hosted git repository.

boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

The following commit(s) were added to refs/heads/master by this push:
     new 2dcb7da  [BEAM-12126] Fix DirectRunner not respecting use_deprecated_reads
     new 58bd73c  Merge pull request #14469 from [BEAM-12126] Fix DirectRunner not respecting use_deprecated_reads

2dcb7da is described below

commit 2dcb7da3add3db01d0fbd14b9b64ea8636eda325
Author: Steve Niemitz
AuthorDate: Thu Apr 8 00:16:52 2021 -0400

    [BEAM-12126] Fix DirectRunner not respecting use_deprecated_reads
---
 .../src/main/java/org/apache/beam/runners/direct/DirectRunner.java | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index 3404fa5..a9a154e 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -184,7 +184,6 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> {
     DisplayDataValidator.validatePipeline(pipeline);
     DisplayDataValidator.validateOptions(options);
 
-    SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary(pipeline);
     ExecutorService metricsPool =
         Executors.newCachedThreadPool(
@@ -253,6 +252,8 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> {
     // The last set of overrides includes GBK overrides used in WriteView
     pipeline.replaceAll(groupByKeyOverrides());
+
+    SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary(pipeline);
   }
 
   @SuppressWarnings("rawtypes")
[beam] branch master updated: Optimize reservoir sampling calculation
This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 6ddf0c2 Optimize reservoir sampling calculation new bcced0c Merge pull request #14406 from [BEAM-11836] Optimize reservoir sampling calculation in PCollectionConsumerRegistry 6ddf0c2 is described below commit 6ddf0c2ff6706883771ec9cb13309101b34b80c4 Author: kileys AuthorDate: Fri Apr 2 00:57:38 2021 + Optimize reservoir sampling calculation --- .../harness/data/PCollectionConsumerRegistry.java | 49 -- 1 file changed, 36 insertions(+), 13 deletions(-) diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java index 14d245dc..457cbe8 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java @@ -377,23 +377,46 @@ public class PCollectionConsumerRegistry { } } -// Lowest sampling probability: 0.001%. -private static final int SAMPLING_TOKEN_UPPER_BOUND = 100; -private static final int SAMPLING_CUTOFF = 10; -private int samplingToken = 0; +private static final int RESERVOIR_SIZE = 10; +private static final int SAMPLING_THRESHOLD = 30; +private long samplingToken = 0; +private long nextSamplingToken = 0; private Random randomGenerator = new Random(); -// TODO(BEAM-11836): Implement fast approximation for reservoir sampling. private boolean shouldSampleElement() { // Sampling probability decreases as the element count is increasing. - // We unconditionally sample the first samplingCutoff elements. For the - // next samplingCutoff elements, the sampling probability drops from 100% - // to 50%. The probability of sampling the Nth element is: - // min(1, samplingCutoff / N), with an additional lower bound of - // samplingCutoff / samplingTokenUpperBound. This algorithm may be refined - // later. - samplingToken = Math.min(samplingToken + 1, SAMPLING_TOKEN_UPPER_BOUND); - return randomGenerator.nextInt(samplingToken) < SAMPLING_CUTOFF; + // We unconditionally sample the first samplingCutoff elements. Calculating + // nextInt(samplingToken) for each element is expensive, so after a threshold, calculate the + // gap to next sample. + // https://erikerlandson.github.io/blog/2015/11/20/very-fast-reservoir-sampling/ + + // Reset samplingToken if it's going to exceed the max value. + if (samplingToken + 1 == Long.MAX_VALUE) { +samplingToken = 0; +nextSamplingToken = getNextSamplingToken(samplingToken); + } + + samplingToken++; + // Use traditional sampling until the threshold of 30 + if (nextSamplingToken == 0) { +if (randomGenerator.nextInt((int) samplingToken) <= RESERVOIR_SIZE) { + if (samplingToken > SAMPLING_THRESHOLD) { +nextSamplingToken = getNextSamplingToken(samplingToken); + } + return true; +} + } else if (samplingToken >= nextSamplingToken) { +nextSamplingToken = getNextSamplingToken(samplingToken); +return true; + } + return false; +} + +private long getNextSamplingToken(long samplingToken) { + double gap = + Math.log(1.0 - randomGenerator.nextDouble()) + / Math.log(1.0 - RESERVOIR_SIZE / (double) samplingToken); + return samplingToken + (int) gap; } } }
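The algorithm is Erlandson's gap-sampling refinement of reservoir sampling: sample each element with the classic k/i probability until a threshold, then skip ahead by geometrically distributed gaps instead of drawing a random number per element. A standalone restatement (simplified; the committed code above differs in small details such as its boundary comparison and the `Long.MAX_VALUE` reset):

```java
import java.util.Random;

/** Standalone sketch of the gap-based reservoir sampling decision above. */
class GapSamplingSketch {
  private static final int RESERVOIR_SIZE = 10;
  private static final int SAMPLING_THRESHOLD = 30;
  private final Random random = new Random();
  private long seen = 0;       // elements observed so far
  private long nextSample = 0; // index of the next element to sample (0 = classic mode)

  boolean shouldSample() {
    seen++;
    if (nextSample == 0) {
      // Classic reservoir step: sample the i-th element with probability k/i.
      if (random.nextInt((int) seen) < RESERVOIR_SIZE) {
        if (seen > SAMPLING_THRESHOLD) {
          nextSample = nextSampleIndex(seen);
        }
        return true;
      }
      return false;
    }
    if (seen >= nextSample) {
      nextSample = nextSampleIndex(seen);
      return true;
    }
    return false;
  }

  // Geometric skip: the expected gap between samples grows roughly as seen/k,
  // so the per-element cost of random number generation is amortized away.
  private long nextSampleIndex(long seen) {
    double gap =
        Math.log(1.0 - random.nextDouble())
            / Math.log(1.0 - RESERVOIR_SIZE / (double) seen);
    return seen + (long) gap;
  }
}
```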
[beam] branch master updated (6151674 -> e5b3333)
This is an automated email from the ASF dual-hosted git repository.

boyuanz pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.

    from 6151674  [BEAM-12079] Enforce callable destination arg type for WriteToBigQuery schema argument (#14415)
     add 19a4ac0  Re-enable CrossLanguageKafkaIOTest
     add e5b3333  Merge pull request #14414 [BEAM-10663] Re-enable CrossLanguageKafkaIOTest

No new revisions were added by this update.

Summary of changes:
 .../io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 4 +++-
 sdks/python/apache_beam/io/external/xlang_kafkaio_it_test.py         | 1 -
 sdks/python/test-suites/portable/common.gradle                       | 1 +
 3 files changed, 4 insertions(+), 2 deletions(-)
[beam] branch master updated: Remove @Hidden and @Experimental annotation of CreateFromSnapshot pipeline option.
This is an automated email from the ASF dual-hosted git repository.

boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

The following commit(s) were added to refs/heads/master by this push:
     new a954e6f  Remove @Hidden and @Experimental annotation of CreateFromSnapshot pipeline option.
     new 38c9caa  Merge pull request #14296 from andyxiexu/java-snapshot

a954e6f is described below

commit a954e6f5e03c7c9110c9d2b116e7774edbcda4fe
Author: Andy Xu
AuthorDate: Mon Mar 22 09:23:34 2021 -0700

    Remove @Hidden and @Experimental annotation of CreateFromSnapshot pipeline option.
---
 .../apache/beam/runners/dataflow/options/DataflowPipelineOptions.java | 2 --
 1 file changed, 2 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
index e8c3708..8434ff7 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
@@ -95,8 +95,6 @@ public interface DataflowPipelineOptions
   void setUpdate(boolean value);
 
   /** If set, the snapshot from which the job should be created. */
-  @Hidden
-  @Experimental
   @Description("If set, the snapshot from which the job should be created.")
   String getCreateFromSnapshot();
[beam] branch master updated: Better error for missing java.
This is an automated email from the ASF dual-hosted git repository.

boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

The following commit(s) were added to refs/heads/master by this push:
     new ee29766  Better error for missing java.
     new 6e799fa  Merge pull request #14266 from robertwb/java-error

ee29766 is described below

commit ee29766db0f55b9ed3176e4c2cd5f889e8d0cfe7
Author: Robert Bradshaw
AuthorDate: Wed Mar 17 18:10:58 2021 -0700

    Better error for missing java.
---
 sdks/python/apache_beam/utils/subprocess_server.py | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/sdks/python/apache_beam/utils/subprocess_server.py b/sdks/python/apache_beam/utils/subprocess_server.py
index 018a837..a7b623e 100644
--- a/sdks/python/apache_beam/utils/subprocess_server.py
+++ b/sdks/python/apache_beam/utils/subprocess_server.py
@@ -161,6 +161,9 @@ class JavaJarServer(SubprocessServer):
       dict(__init__=lambda self: setattr(self, 'replacements', {})))()
 
   def __init__(self, stub_class, path_to_jar, java_arguments):
+    if not shutil.which('java'):
+      raise RuntimeError(
+          'Java must be installed on this system to use this transform/runner.')
     super(JavaJarServer, self).__init__(
         stub_class, ['java', '-jar', path_to_jar] + list(java_arguments))
     self._existing_service = path_to_jar if _is_service_endpoint(
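The same fail-fast pre-flight check, sketched in Java for comparison (a simplified PATH scan; Python's `shutil.which` additionally handles platform details such as Windows executable extensions):

```java
import java.io.File;

/**
 * Sketch: fail fast with a clear message when a required executable
 * is missing from PATH, instead of a confusing launch failure later.
 */
class ExecutableCheck {
  static void requireOnPath(String executable) {
    String path = System.getenv("PATH");
    if (path != null) {
      for (String dir : path.split(File.pathSeparator)) {
        if (new File(dir, executable).canExecute()) {
          return; // found an executable candidate
        }
      }
    }
    throw new IllegalStateException(
        executable + " must be installed on this system to use this transform/runner.");
  }
}
```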
[beam] branch master updated: Add more comments to describe PubSubReadPayload and PubSubWritePayload.
This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 86c55f4 Add more comments to describe PubSubReadPayload and PubSubWritePayload. new 5afe677 Merge pull request #14204 from Add more comments to describe PubSubReadPayload and PubSubWritePayload. 86c55f4 is described below commit 86c55f41da7e9fb1c9fd92560a97f2d0f172ffdb Author: Boyuan Zhang AuthorDate: Thu Mar 11 11:04:51 2021 -0800 Add more comments to describe PubSubReadPayload and PubSubWritePayload. --- model/pipeline/src/main/proto/beam_runner_api.proto | 4 1 file changed, 4 insertions(+) diff --git a/model/pipeline/src/main/proto/beam_runner_api.proto b/model/pipeline/src/main/proto/beam_runner_api.proto index f609c77..134fcb6 100644 --- a/model/pipeline/src/main/proto/beam_runner_api.proto +++ b/model/pipeline/src/main/proto/beam_runner_api.proto @@ -694,6 +694,8 @@ message WriteFilesPayload { // Payload used by Google Cloud Pub/Sub read transform. // This can be used by runners that wish to override Beam Pub/Sub read transform // with a native implementation. +// The SDK should guarantee that only one of topic, subscription, +// topic_runtime_overridden and subscription_runtime_overridden is set. message PubSubReadPayload { // Topic to read from. Exactly one of topic or subscription should be set. @@ -723,6 +725,8 @@ message PubSubReadPayload { // Payload used by Google Cloud Pub/Sub write transform. // This can be used by runners that wish to override Beam Pub/Sub write transform // with a native implementation. +// The SDK should guarantee that only one of topic and topic_runtime_overridden +// is set. message PubSubWritePayload { // Topic to write to.
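The invariant these comments describe — exactly one of the four source fields populated — can be checked mechanically. Below is a minimal validation sketch, assuming the standard protobuf-generated Java getters for these fields (including the runtime-overridden fields added in the related proto change); the helper itself is hypothetical, not Beam API:

```
import static com.google.common.base.Preconditions.checkArgument;

import java.util.stream.Stream;
import org.apache.beam.model.pipeline.v1.RunnerApi.PubSubReadPayload;

class PubSubReadPayloadValidation {
  // Hypothetical guard for the documented invariant: exactly one of topic,
  // subscription, topic_runtime_overridden and subscription_runtime_overridden
  // may be populated on a read payload.
  static void validate(PubSubReadPayload payload) {
    long populated =
        Stream.of(
                payload.getTopic(),
                payload.getSubscription(),
                payload.getTopicRuntimeOverridden(),
                payload.getSubscriptionRuntimeOverridden())
            .filter(field -> !field.isEmpty())
            .count();
    checkArgument(
        populated == 1,
        "Exactly one of topic, subscription, or their runtime-overridden variants must be set");
  }
}
```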
[beam] branch master updated: [BEAM-11952] Clean up merged window result in MergingViaWindowFnRunner to avoid accumulating unnecessary state
This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 51bb63f [BEAM-11952] Clean up merged window result in MergingViaWindowFnRunner to avoid accumulating unnecessary state new 06ce34e Merge pull request #14182 from [BEAM-11952] Clean up merged window result in MergingViaWindowFnRunner to avoid accumulating unnecessary state 51bb63f is described below commit 51bb63fc05441c7e9208407ecd5e172c009a269f Author: Yichi Zhang AuthorDate: Tue Mar 9 20:32:34 2021 -0800 [BEAM-11952] Clean up merged window result in MergingViaWindowFnRunner to avoid accumulating unnecessary state --- .../beam/fn/harness/WindowMergingFnRunner.java | 9 - .../beam/fn/harness/WindowMergingFnRunnerTest.java | 22 ++ 2 files changed, 30 insertions(+), 1 deletion(-) diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/WindowMergingFnRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/WindowMergingFnRunner.java index edf0e0a..e7b169e 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/WindowMergingFnRunner.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/WindowMergingFnRunner.java @@ -34,6 +34,7 @@ import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.transforms.windowing.WindowFn.MergeContext; import org.apache.beam.sdk.values.KV; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets; /** @@ -154,7 +155,13 @@ public abstract class WindowMergingFnRunner { for (KV> mergedWindow : mergedWindows) { currentWindows.removeAll(mergedWindow.getValue()); } - return KV.of(windowsToMerge.getKey(), KV.of(currentWindows, (Iterable) mergedWindows)); + KV, Iterable>>>> result = + KV.of( + windowsToMerge.getKey(), + KV.of(Sets.newHashSet(currentWindows), (Iterable) Lists.newArrayList(mergedWindows))); + currentWindows.clear(); + mergedWindows.clear(); + return result; } } } diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/WindowMergingFnRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/WindowMergingFnRunnerTest.java index 9816ed6..359ea98 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/WindowMergingFnRunnerTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/WindowMergingFnRunnerTest.java @@ -105,6 +105,28 @@ public class WindowMergingFnRunnerTest { Iterables.getOnlyElement(output.getValue().getValue()); assertEquals(new IntervalWindow(new Instant(7L), new Instant(11L)), mergedOutput.getKey()); assertThat(mergedOutput.getValue(), containsInAnyOrder(expectedToBeMerged)); + +// Process a new group of windows, make sure that previous result has been cleaned up. 
+BoundedWindow[] expectedToBeMergedGroup2 = +new BoundedWindow[] { + new IntervalWindow(new Instant(15L), new Instant(17L)), + new IntervalWindow(new Instant(16L), new Instant(18L)) +}; + +input = +KV.of( +"abc", +ImmutableList.builder() +.add(expectedToBeMergedGroup2) +.addAll(expectedToBeUnmerged) +.build()); + +output = mapFunction.apply(input); +assertEquals(input.getKey(), output.getKey()); +assertEquals(expectedToBeUnmerged, output.getValue().getKey()); +mergedOutput = Iterables.getOnlyElement(output.getValue().getValue()); +assertEquals(new IntervalWindow(new Instant(15L), new Instant(18L)), mergedOutput.getKey()); +assertThat(mergedOutput.getValue(), containsInAnyOrder(expectedToBeMergedGroup2)); } private static RunnerApi.PTransform createMergeTransformForWindowFn(
[beam] branch master updated (1ad764b -> 48f9f66)
This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git. from 1ad764b Merge pull request #14190 from tysonjh/snapshot-fix-fix-fix new 745dbe63 Add Pubsub RunnerImplementedSource and RunnerImplementedSink which can be translated into PubsubReadPayload and PubsubWritePayload over fnapi new 48f9f66 Merge pull request #14141 from [BEAM-10861] Add RunnerImplementedSource and RunnerImplementedSink to Pubsub The 31006 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../pipeline/src/main/proto/beam_runner_api.proto | 4 +- .../core/construction/PTransformTranslation.java | 4 + .../beam/runners/dataflow/DataflowRunner.java | 171 +-- sdks/java/io/google-cloud-platform/build.gradle| 7 +- .../beam/sdk/io/gcp/pubsub/PubsubClient.java | 4 +- .../beam/sdk/io/gcp/pubsub/PubsubMessages.java | 10 + .../sdk/io/gcp/pubsub/RunnerImplementedSink.java | 68 ++ .../pubsub/RunnerImplementedSinkTranslation.java | 87 .../sdk/io/gcp/pubsub/RunnerImplementedSource.java | 83 .../pubsub/RunnerImplementedSourceTranslation.java | 102 + .../beam/sdk/io/gcp/pubsub/PubsubClientTest.java | 4 +- .../RunnerImplementedSinkTranslationTest.java | 120 +++ .../RunnerImplementedSourceTranslationTest.java| 237 + 13 files changed, 781 insertions(+), 120 deletions(-) create mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/RunnerImplementedSink.java create mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/RunnerImplementedSinkTranslation.java create mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/RunnerImplementedSource.java create mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/RunnerImplementedSourceTranslation.java create mode 100644 sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/RunnerImplementedSinkTranslationTest.java create mode 100644 sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/RunnerImplementedSourceTranslationTest.java
[beam] branch master updated: [BEAM-11932] Add Dataflow ServiceOptions.
This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 73963a5 [BEAM-11932] Add Dataflow ServiceOptions. new 734c0a8 Merge pull request #14159 from [BEAM-11932] Add Dataflow ServiceOptions. 73963a5 is described below commit 73963a52341edfed25df2328d4502b9ba40caabc Author: Tyson Hamilton AuthorDate: Mon Mar 8 22:20:40 2021 + [BEAM-11932] Add Dataflow ServiceOptions. Introduce service options for Dataflow. The opaque option decouples service side feature availability from the Apache Beam release cycle. The sole purpose of these options is to allow retroactively exposing a service side feature on previously released SDK versions. This also includes bumping client library versions. --- .../org/apache/beam/gradle/BeamModulePlugin.groovy | 18 +++ .../dataflow/DataflowPipelineTranslator.java | 1 + .../dataflow/options/DataflowPipelineOptions.java | 12 ++ .../dataflow/DataflowPipelineTranslatorTest.java | 27 ++ .../io/gcp/healthcare/HttpHealthcareApiClient.java | 13 +-- 5 files changed, 55 insertions(+), 16 deletions(-) diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 84cf4ac..5843660 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -423,7 +423,7 @@ class BeamModulePlugin implements Plugin { def checkerframework_version = "3.10.0" def classgraph_version = "4.8.65" def errorprone_version = "2.3.4" -def google_clients_version = "1.30.10" +def google_clients_version = "1.31.0" def google_cloud_bigdataoss_version = "2.1.6" def google_cloud_pubsublite_version = "0.7.0" def google_code_gson_version = "2.8.6" @@ -501,17 +501,17 @@ class BeamModulePlugin implements Plugin { error_prone_annotations : "com.google.errorprone:error_prone_annotations:$errorprone_version", gax : "com.google.api:gax", // google_cloud_platform_libraries_bom sets version gax_grpc: "com.google.api:gax-grpc", // google_cloud_platform_libraries_bom sets version -google_api_client : "com.google.api-client:google-api-client:$google_clients_version", +google_api_client : "com.google.api-client:google-api-client:1.31.1", // 1.31.1 is required to run 1.31.0 of google_clients_version below.
google_api_client_jackson2 : "com.google.api-client:google-api-client-jackson2:$google_clients_version", google_api_client_java6 : "com.google.api-client:google-api-client-java6:$google_clients_version", google_api_common : "com.google.api:api-common", // google_cloud_platform_libraries_bom sets version -google_api_services_bigquery: "com.google.apis:google-api-services-bigquery:v2-rev20201030-$google_clients_version", -google_api_services_clouddebugger : "com.google.apis:google-api-services-clouddebugger:v2-rev20200501-$google_clients_version", -google_api_services_cloudresourcemanager: "com.google.apis:google-api-services-cloudresourcemanager:v1-rev20200720-$google_clients_version", -google_api_services_dataflow: "com.google.apis:google-api-services-dataflow:v1b3-rev20200713-$google_clients_version", -google_api_services_healthcare : "com.google.apis:google-api-services-healthcare:v1beta1-rev20200713-$google_clients_version", -google_api_services_pubsub : "com.google.apis:google-api-services-pubsub:v1-rev20200713-$google_clients_version", -google_api_services_storage : "com.google.apis:google-api-services-storage:v1-rev20200611-$google_clients_version", +google_api_services_bigquery: "com.google.apis:google-api-services-bigquery:v2-rev20210219-$google_clients_version", +google_api_services_clouddebugger : "com.google.apis:google-api-services-clouddebugger:v2-rev20200807-$google_clients_version", +google_api_services_cloudresourcemanager: "com.google.apis:google-api-services-cloudresourcemanager:v1-rev20210222-$google_clients_version", +
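A service option is an opaque string that the Dataflow service, not the SDK, interprets. A minimal usage sketch, assuming the getDataflowServiceOptions/setDataflowServiceOptions accessors this commit adds to DataflowPipelineOptions; the option name itself is a made-up placeholder:

```
import java.util.Arrays;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class ServiceOptionsExample {
  public static void main(String[] args) {
    DataflowPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
    // "enable_fancy_feature" is a placeholder: concrete values are defined by
    // the Dataflow service rather than the SDK, which is what lets a feature
    // ship without a new Beam release.
    options.setDataflowServiceOptions(Arrays.asList("enable_fancy_feature"));
  }
}
```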
[beam] branch master updated: Remove experiment --enable_streaming_auto_sharding
This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new c47ce74 Remove experiment --enable_streaming_auto_sharding new 4390b70 Merge pull request #14133 from [BEAM-11772, BEAM-11408] Remove Dataflow experiment --enable_streaming_auto_sharding c47ce74 is described below commit c47ce74c4b831c4209928278424d2c36bae2dad3 Author: sychen AuthorDate: Tue Mar 2 14:56:23 2021 -0800 Remove experiment --enable_streaming_auto_sharding --- .../beam/runners/dataflow/DataflowRunner.java | 7 +--- .../dataflow/DataflowPipelineTranslatorTest.java | 46 ++ .../beam/runners/dataflow/DataflowRunnerTest.java | 1 - .../runners/dataflow/dataflow_runner_test.py | 43 ++-- .../runners/dataflow/ptransform_overrides.py | 15 --- 5 files changed, 46 insertions(+), 66 deletions(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 946f020..cc0708f 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -1393,11 +1393,8 @@ public class DataflowRunner extends PipelineRunner { checkArgument( options.isEnableStreamingEngine(), "Runner determined sharding not available in Dataflow for GroupIntoBatches for" -+ " non-Streaming-Engine jobs."); -checkArgument( -hasExperiment(options, "enable_streaming_auto_sharding"), -"Runner determined sharding not enabled in Dataflow for GroupIntoBatches." -+ " Try adding the experiment: --experiments=enable_streaming_auto_sharding."); ++ " non-Streaming-Engine jobs.
In order to use runner determined sharding, please use" ++ " --streaming --enable_streaming_engine"); pcollectionsRequiringAutoSharding.add(pcol); } diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java index c9c08c3..d0781e4 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java @@ -1150,7 +1150,7 @@ public class DataflowPipelineTranslatorTest implements Serializable { assertEquals("CollectionToSingleton", collectionToSingletonStep.getKind()); } - private JobSpecification runGroupIntoBatchesAndGetJobSpec( + private JobSpecification runStreamingGroupIntoBatchesAndGetJobSpec( Boolean withShardedKey, List experiments) throws IOException { DataflowPipelineOptions options = buildPipelineOptions(); options.setExperiments(experiments); @@ -1179,10 +1179,8 @@ public class DataflowPipelineTranslatorTest implements Serializable { List experiments = new ArrayList<>( ImmutableList.of( -"enable_streaming_auto_sharding", -GcpOptions.STREAMING_ENGINE_EXPERIMENT, -GcpOptions.WINDMILL_SERVICE_EXPERIMENT)); -JobSpecification jobSpec = runGroupIntoBatchesAndGetJobSpec(false, experiments); +GcpOptions.STREAMING_ENGINE_EXPERIMENT, GcpOptions.WINDMILL_SERVICE_EXPERIMENT)); +JobSpecification jobSpec = runStreamingGroupIntoBatchesAndGetJobSpec(false, experiments); List steps = jobSpec.getJob().getSteps(); Step shardedStateStep = steps.get(steps.size() - 1); Map properties = shardedStateStep.getProperties(); @@ -1197,10 +1195,8 @@ public class DataflowPipelineTranslatorTest implements Serializable { List experiments = new ArrayList<>( ImmutableList.of( -"enable_streaming_auto_sharding", -GcpOptions.STREAMING_ENGINE_EXPERIMENT, -GcpOptions.WINDMILL_SERVICE_EXPERIMENT)); -JobSpecification jobSpec = runGroupIntoBatchesAndGetJobSpec(true, experiments); +GcpOptions.STREAMING_ENGINE_EXPERIMENT, GcpOptions.WINDMILL_SERVICE_EXPERIMENT)); +JobSpecification jobSpec = runStreamingGroupIntoBatchesAndGetJobSpec(true, experiments); List steps = jobSpec.getJob().getSteps(); Step shardedStateStep = steps.get(steps.size() - 1); Map properties = shardedStateStep.getProperties(); @@ -1217,11 +1213,1
[beam] branch master updated: Add topic and subscription overridden field into pubsub proto.
This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 14e2ad4 Add topic and subscription overridden field into pubsub proto. new d9fd515 Merge pull request #14143 from Add topic and subscription overridden field into pubsub proto. 14e2ad4 is described below commit 14e2ad4e256212f44fa94ad22799da03a52e9c9d Author: Boyuan Zhang AuthorDate: Wed Mar 3 15:45:17 2021 -0800 Add topic and subscription overridden field into pubsub proto. --- model/pipeline/src/main/proto/beam_runner_api.proto | 12 1 file changed, 12 insertions(+) diff --git a/model/pipeline/src/main/proto/beam_runner_api.proto b/model/pipeline/src/main/proto/beam_runner_api.proto index 61b92be..8cbc932 100644 --- a/model/pipeline/src/main/proto/beam_runner_api.proto +++ b/model/pipeline/src/main/proto/beam_runner_api.proto @@ -697,9 +697,11 @@ message WriteFilesPayload { message PubSubReadPayload { // Topic to read from. Exactly one of topic or subscription should be set. + // Topic format is: /topics/project_id/subscription_name string topic = 1; // Subscription to read from. Exactly one of topic or subscription should be set. + // Subscription format is: /subscriptions/project_id/subscription_name string subscription = 2; // Attribute that provides element timestamps. @@ -710,6 +712,12 @@ message PubSubReadPayload { // If true, reads Pub/Sub payload as well as attributes. If false, reads only the payload. bool with_attributes = 5; + + // If set, the topic that is expected to be provided during runtime. + string topic_runtime_overridden = 6; + + // If set, the subscription that is expected to be provided during runtime. + string subscription_runtime_overridden = 7; } // Payload used by Google Cloud Pub/Sub write transform. @@ -718,6 +726,7 @@ message PubSubReadPayload { message PubSubWritePayload { // Topic to write to. + // Topic format is: /topics/project_id/subscription_name string topic = 1; // Attribute that provides element timestamps. @@ -725,6 +734,9 @@ message PubSubWritePayload { // Attribute that uniquely identify messages. string id_attribute = 3; + + // If set, the topic that is expected to be provided during runtime. + string topic_runtime_overridden = 4; } // Payload for GroupIntoBatches composite transform.
[beam] branch master updated: Fix typos for excluding testMergingCustomWindowsWithoutCustomWindowTypes
This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 0b7904e Fix typos for excluding testMergingCustomWindowsWithoutCustomWindowTypes new 527be6e Merge pull request #14130 from Fix typos for excluding testMergingCustomWindowsWithoutCustomWindowTypes 0b7904e is described below commit 0b7904ef8e4f59a1d82f4390183bd861e850c039 Author: Yichi Zhang AuthorDate: Tue Mar 2 12:31:05 2021 -0800 Fix typos for excluding testMergingCustomWindowsWithoutCustomWindowTypes --- runners/google-cloud-dataflow-java/build.gradle | 4 ++-- runners/portability/java/build.gradle | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle index 8577194..d8fe8ff 100644 --- a/runners/google-cloud-dataflow-java/build.gradle +++ b/runners/google-cloud-dataflow-java/build.gradle @@ -367,7 +367,7 @@ task validatesRunnerV2 { 'org.apache.beam.sdk.transforms.windowing.WindowingTest.testNonPartitioningWindowing', 'org.apache.beam.sdk.transforms.windowing.WindowTest.testMergingCustomWindowsKeyedCollection', 'org.apache.beam.sdk.transforms.windowing.WindowTest.testMergingCustomWindows', - 'org.apache.beam.sdk.transforms.windowing.WindowTest.testMergingCustomWindowsWitoutCustomWindowTypes', + 'org.apache.beam.sdk.transforms.windowing.WindowTest.testMergingCustomWindowsWithoutCustomWindowTypes', 'org.apache.beam.sdk.transforms.ReshuffleTest.testReshuffleWithTimestampsStreaming', @@ -418,7 +418,7 @@ task validatesRunnerV2Streaming { 'org.apache.beam.sdk.transforms.ParDoTest$TimerTests.testEventTimeTimerAlignBounded', 'org.apache.beam.sdk.transforms.windowing.WindowTest.testMergingCustomWindows', 'org.apache.beam.sdk.transforms.windowing.WindowTest.testMergingCustomWindowsKeyedCollection', - 'org.apache.beam.sdk.transforms.windowing.WindowTest.testMergingCustomWindowsWithougCustomWindowTypes', + 'org.apache.beam.sdk.transforms.windowing.WindowTest.testMergingCustomWindowsWithoutCustomWindowTypes', 'org.apache.beam.examples.complete.TopWikipediaSessionsTest.testComputeTopUsers', 'org.apache.beam.sdk.transforms.windowing.WindowingTest.testNonPartitioningWindowing', diff --git a/runners/portability/java/build.gradle b/runners/portability/java/build.gradle index 0f94e42..fd2d8c0 100644 --- a/runners/portability/java/build.gradle +++ b/runners/portability/java/build.gradle @@ -175,7 +175,7 @@ def createUlrValidatesRunnerTask = { name, environmentType, dockerImageTask = "" // https://issues.apache.org/jira/browse/BEAM-10448 excludeTestsMatching 'org.apache.beam.sdk.transforms.windowing.WindowTest.testMergingCustomWindows' excludeTestsMatching 'org.apache.beam.sdk.transforms.windowing.WindowTest.testMergingCustomWindowsKeyedCollection' - excludeTestsMatching 'org.apache.beam.sdk.transforms.windowing.WindowTest.testMergingCustomWindowsWitoutCustomWindowTypes' + excludeTestsMatching 'org.apache.beam.sdk.transforms.windowing.WindowTest.testMergingCustomWindowsWithoutCustomWindowTypes' excludeTestsMatching 'org.apache.beam.sdk.transforms.windowing.WindowingTest.testMergingWindowing' excludeTestsMatching 'org.apache.beam.sdk.transforms.windowing.WindowingTest.testNonPartitioningWindowing' excludeTestsMatching 'org.apache.beam.sdk.transforms.GroupByKeyTest$WindowTests.testGroupByKeyMergingWindows'
[beam] branch master updated: Specify the time resolution for TestStreamPayload.
This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 2d9c666 Specify the time resolution for TestStreamPayload. new 7e5e932 Merge pull request #14128 from Specify the time resolution for TestStreamPayload. 2d9c666 is described below commit 2d9c66647ecbc9aca6a8618168d44e4c815cea19 Author: Boyuan Zhang AuthorDate: Tue Mar 2 10:13:34 2021 -0800 Specify the time resolution for TestStreamPayload. --- model/pipeline/src/main/proto/beam_runner_api.proto | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/model/pipeline/src/main/proto/beam_runner_api.proto b/model/pipeline/src/main/proto/beam_runner_api.proto index 9a2075c..61b92be 100644 --- a/model/pipeline/src/main/proto/beam_runner_api.proto +++ b/model/pipeline/src/main/proto/beam_runner_api.proto @@ -624,7 +624,7 @@ message TestStreamPayload { // Advances the watermark to the specified timestamp. message AdvanceWatermark { - // (Required) The watermark to advance to. + // (Required) The watermark in millisecond to advance to. int64 new_watermark = 1; // (Optional) The output watermark tag for a PCollection. If unspecified @@ -635,7 +635,7 @@ message TestStreamPayload { // Advances the processing time clock by the specified amount. message AdvanceProcessingTime { - // (Required) The duration to advance by. + // (Required) The duration in millisecond to advance by. int64 advance_duration = 1; } @@ -657,7 +657,7 @@ message TestStreamPayload { // encoding primitives. bytes encoded_element = 1; -// (Required) The event timestamp of this element. +// (Required) The event timestamp in millisecond of this element. int64 timestamp = 2; } }
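On the Java SDK side, these millisecond values surface as Instants when building a TestStream. A small sketch using the standard TestStream builder API, with all times expressed in epoch milliseconds:

```
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Instant;

public class TestStreamMillisExample {
  public static void main(String[] args) {
    TestStream<String> events =
        TestStream.create(StringUtf8Coder.of())
            // Element timestamp: 1000 ms past the epoch.
            .addElements(TimestampedValue.of("a", new Instant(1000L)))
            // Watermark positions are the same millisecond instants.
            .advanceWatermarkTo(new Instant(2000L))
            .advanceWatermarkToInfinity();
  }
}
```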
[beam] branch master updated: [BEAM-11740] Estimate PCollection byte size
This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new d807210 [BEAM-11740] Estimate PCollection byte size new 9150193 Merge pull request #13924 from [BEAM-11740] Estimate PCollection byte size d807210 is described below commit d807210c3aa28f34c13b89f3f16bc104051532b0 Author: kileys AuthorDate: Tue Feb 2 20:03:47 2021 + [BEAM-11740] Estimate PCollection byte size --- .../beam/runners/core/metrics/LabeledMetrics.java | 6 + .../beam/sdk/metrics/DelegatingDistribution.java | 66 +++ .../beam/fn/harness/BeamFnDataWriteRunner.java | 4 +- .../org/apache/beam/fn/harness/CombineRunners.java | 4 +- .../org/apache/beam/fn/harness/FlattenRunner.java | 29 - .../apache/beam/fn/harness/FnApiDoFnRunner.java| 5 +- .../org/apache/beam/fn/harness/MapFnRunners.java | 29 - .../harness/data/PCollectionConsumerRegistry.java | 119 --- .../beam/fn/harness/AssignWindowsRunnerTest.java | 15 ++- .../beam/fn/harness/BeamFnDataReadRunnerTest.java | 5 +- .../apache/beam/fn/harness/CombineRunnersTest.java | 108 ++--- .../apache/beam/fn/harness/FlattenRunnerTest.java | 47 ++-- .../beam/fn/harness/FnApiDoFnRunnerTest.java | 129 + .../apache/beam/fn/harness/MapFnRunnersTest.java | 32 +++-- .../data/PCollectionConsumerRegistryTest.java | 55 ++--- 15 files changed, 547 insertions(+), 106 deletions(-) diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/LabeledMetrics.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/LabeledMetrics.java index 2df2de9..f3d2793 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/LabeledMetrics.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/LabeledMetrics.java @@ -19,7 +19,9 @@ package org.apache.beam.runners.core.metrics; import org.apache.beam.sdk.metrics.Counter; import org.apache.beam.sdk.metrics.DelegatingCounter; +import org.apache.beam.sdk.metrics.DelegatingDistribution; import org.apache.beam.sdk.metrics.DelegatingHistogram; +import org.apache.beam.sdk.metrics.Distribution; import org.apache.beam.sdk.metrics.Histogram; import org.apache.beam.sdk.util.HistogramData; @@ -40,6 +42,10 @@ public class LabeledMetrics { return new DelegatingCounter(metricName, processWideContainer); } + public static Distribution distribution(MonitoringInfoMetricName metricName) { +return new DelegatingDistribution(metricName); + } + public static Histogram histogram( MonitoringInfoMetricName metricName, HistogramData.BucketType bucketType, diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DelegatingDistribution.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DelegatingDistribution.java new file mode 100644 index 000..6cfe98e --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DelegatingDistribution.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.metrics; + +import java.io.Serializable; +import org.apache.beam.sdk.annotations.Internal; + +/** + * Implementation of {@link Distribution} that delegates to the instance for the current context. + */ +@Internal +public class DelegatingDistribution implements Metric, Distribution, Serializable { + private final MetricName name; + private final boolean processWideContainer; + + public DelegatingDistribution(MetricName name) { +this(name, false); + } + + public DelegatingDistribution(MetricName name, boolean processWideContainer) { +this.name = name; +this.processWideContainer = processWideContainer; + } + + @Override + public void update(long value) { +MetricsContainer container = +this.processWideContainer +? MetricsEnvironment.getProcessWideContainer() +: MetricsEnvironment.getCurrentContainer(); +i
[beam] branch master updated: [BEAM-11833] Fix reported watermark after restriction split in UnboundedSourceAsSDFRestrictionTracker
This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new db6dd9b [BEAM-11833] Fix reported watermark after restriction split in UnboundedSourceAsSDFRestrictionTracker new 990de63 Merge pull request #14013 from [BEAM-11833] Fix reported watermark after restriction split in UnboundedSourceAsSDFRestrictionTracker db6dd9b is described below commit db6dd9bf0cfd27e26a8f29446b43ba33f47e5b47 Author: Jan Lukavsky AuthorDate: Thu Feb 18 14:37:10 2021 +0100 [BEAM-11833] Fix reported watermark after restriction split in UnboundedSourceAsSDFRestrictionTracker --- .../src/main/java/org/apache/beam/sdk/io/Read.java | 40 +++-- .../test/java/org/apache/beam/sdk/io/ReadTest.java | 195 - 2 files changed, 223 insertions(+), 12 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java index 4982066..09464db 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java @@ -62,6 +62,7 @@ import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.beam.sdk.values.ValueWithRecordId; import org.apache.beam.sdk.values.ValueWithRecordId.StripIdsDoFn; import org.apache.beam.sdk.values.ValueWithRecordId.ValueWithRecordIdCoder; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.Cache; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.RemovalListener; @@ -175,7 +176,8 @@ public class Read { public static class Unbounded extends PTransform> { private final UnboundedSource source; -private Unbounded(@Nullable String name, UnboundedSource source) { +@VisibleForTesting +Unbounded(@Nullable String name, UnboundedSource source) { super(name); this.source = (UnboundedSource) SerializableUtils.ensureSerializable(source); @@ -214,10 +216,7 @@ public class Read { .apply(ParDo.of(new OutputSingleSource<>(source))) .setCoder( SerializableCoder.of(new TypeDescriptor>() {})) - .apply( - ParDo.of( - new UnboundedSourceAsSDFWrapperFn<>( - (Coder) source.getCheckpointMarkCoder( + .apply(ParDo.of(createUnboundedSdfWrapper())) .setCoder(ValueWithRecordIdCoder.of(source.getOutputCoder())); if (source.requiresDeduping()) { @@ -229,6 +228,11 @@ public class Read { return outputWithIds.apply(ParDo.of(new StripIdsDoFn<>())); } +@VisibleForTesting +UnboundedSourceAsSDFWrapperFn createUnboundedSdfWrapper() { + return new UnboundedSourceAsSDFWrapperFn<>(source.getCheckpointMarkCoder()); +} + /** Returns the {@code UnboundedSource} used to create this {@code Read} {@code PTransform}. 
*/ public UnboundedSource getSource() { return source; @@ -447,7 +451,8 @@ public class Read { private Cache> cachedReaders; private Coder> restrictionCoder; -private UnboundedSourceAsSDFWrapperFn(Coder checkpointCoder) { +@VisibleForTesting +UnboundedSourceAsSDFWrapperFn(Coder checkpointCoder) { this.checkpointCoder = checkpointCoder; } @@ -535,6 +540,7 @@ public class Read { UnboundedSourceValue[] out = new UnboundedSourceValue[1]; while (tracker.tryClaim(out) && out[0] != null) { +watermarkEstimator.setWatermark(out[0].getWatermark()); receiver.outputWithTimestamp( new ValueWithRecordId<>(out[0].getValue(), out[0].getId()), out[0].getTimestamp()); } @@ -542,8 +548,11 @@ public class Read { UnboundedSourceRestriction currentRestriction = tracker.currentRestriction(); - // Advance the watermark even if zero elements may have been output. - watermarkEstimator.setWatermark(currentRestriction.getWatermark()); + // Advance the watermark even if zero elements may have been output, if we have not + // split the restriction + if (!currentRestriction.isSplit()) { +watermarkEstimator.setWatermark(currentRestriction.getWatermark()); + } // Add the checkpoint mark to be finalized if the checkpoint mark isn't trivial and is not // the initial restriction. The initial restriction would have been finalized as part of @@ -602,9 +611,11 @@ public class Read { @AutoValue abstract static class UnboundedSourceValue { - public static UnboundedSourceValue create(byte[]
[beam] branch master updated (31825af -> 3a8993b)
This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git. from 31825af Merge pull request #13862: [BEAM-11707] Change WindmillStateCache cache invalidation to be based… new f48af32 Add validate runner test for testing custom merging windows fn without custom window types new 9f5ded6 Fix up! formatting new 3a8993b Merge pull request #14096 from Add validate runner test for testing custom merging windows fn without custom window types The 30862 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: runners/google-cloud-dataflow-java/build.gradle| 2 + runners/portability/java/build.gradle | 1 + .../beam/sdk/transforms/windowing/WindowTest.java | 91 ++ 3 files changed, 94 insertions(+)
[beam] branch master updated: Do not stage dataflow worker jar when use runner_v2.
This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new a31e004 Do not stage dataflow worker jar when use runner_v2. new 11be7aa Merge pull request #14100 from Do not stage dataflow worker jar when use runner_v2. a31e004 is described below commit a31e00433449fdbdf63a053cedec60d060d5ab9c Author: Boyuan Zhang AuthorDate: Thu Feb 25 17:24:07 2021 -0800 Do not stage dataflow worker jar when use runner_v2. --- .../main/java/org/apache/beam/runners/dataflow/DataflowRunner.java| 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 97a4329..f03b40c 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -901,7 +901,7 @@ public class DataflowRunner extends PipelineRunner { String windmillBinary = options.as(DataflowPipelineDebugOptions.class).getOverrideWindmillBinary(); String dataflowWorkerJar = options.getDataflowWorkerJar(); -if (dataflowWorkerJar != null && !dataflowWorkerJar.isEmpty()) { +if (dataflowWorkerJar != null && !dataflowWorkerJar.isEmpty() && !useUnifiedWorker(options)) { // Put the user specified worker jar at the start of the classpath, to be consistent with the // built in worker order. pathsToStageBuilder.add("dataflow-worker.jar=" + dataflowWorkerJar); @@ -992,7 +992,7 @@ public class DataflowRunner extends PipelineRunner { options.getStager().stageToFile(serializedProtoPipeline, PIPELINE_FILE_NAME); dataflowOptions.setPipelineUrl(stagedPipeline.getLocation()); -if (!isNullOrEmpty(dataflowOptions.getDataflowWorkerJar())) { +if (!isNullOrEmpty(dataflowOptions.getDataflowWorkerJar()) && !useUnifiedWorker(options)) { List experiments = dataflowOptions.getExperiments() == null ? new ArrayList<>()
[beam] branch master updated (1a4d9a2 -> 643ad9e)
This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git. from 1a4d9a2 [BEAM-10961] enable strict dependency checking for sdks/java/io/parquet (#14062) new 805dabf [BEAM-10114] Fix PerSubscriptionPartitionSdf to not rely on the presence of BundleFinalizer new 7f8a315 [BEAM-10114] Fix PerSubscriptionPartitionSdf to not rely on the presence of BundleFinalizer new 643ad9e Merge pull request #14069 from [BEAM-10114] Fix PerSubscriptionPartitionSdf to not rely on the presence of BundleFinalizer The 30799 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../pubsublite/PerSubscriptionPartitionSdf.java| 32 ++ .../PerSubscriptionPartitionSdfTest.java | 23 ++-- 2 files changed, 23 insertions(+), 32 deletions(-)
[beam] branch master updated: [BEAM-11377] Disable mvn http keepalive.
This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new e2bdd6d [BEAM-11377] Disable mvn http keepalive. new 6e03cfd Merge pull request #13952 from [BEAM-11377] Disable mvn http keepalive. e2bdd6d is described below commit e2bdd6d607a4c7d143753be81703cfeb90054023 Author: Tyson Hamilton AuthorDate: Wed Feb 10 20:10:31 2021 + [BEAM-11377] Disable mvn http keepalive. --- release/src/main/groovy/MobileGamingCommands.groovy | 1 + release/src/main/groovy/quickstart-java-dataflow.groovy | 1 + release/src/main/groovy/quickstart-java-direct.groovy | 1 + release/src/main/groovy/quickstart-java-flinklocal.groovy | 1 + release/src/main/groovy/quickstart-java-twister2.groovy | 1 + 5 files changed, 5 insertions(+) diff --git a/release/src/main/groovy/MobileGamingCommands.groovy b/release/src/main/groovy/MobileGamingCommands.groovy index 9a651f9..e3bacec 100644 --- a/release/src/main/groovy/MobileGamingCommands.groovy +++ b/release/src/main/groovy/MobileGamingCommands.groovy @@ -72,6 +72,7 @@ class MobileGamingCommands { return """mvn compile exec:java -q \ -Dmaven.wagon.http.retryHandler.class=default \ -Dmaven.wagon.http.retryHandler.count=5 \ + -Dhttp.keepAlive=false \ -Dexec.mainClass=org.apache.beam.examples.complete.game.${className} \ -Dexec.args=\"${getArgs(exampleName, runner, jobName)}\" \ -P${RUNNERS[runner]}""" diff --git a/release/src/main/groovy/quickstart-java-dataflow.groovy b/release/src/main/groovy/quickstart-java-dataflow.groovy index 5e59521..355b5e5 100644 --- a/release/src/main/groovy/quickstart-java-dataflow.groovy +++ b/release/src/main/groovy/quickstart-java-dataflow.groovy @@ -37,6 +37,7 @@ t.describe 'Run Apache Beam Java SDK Quickstart - Dataflow' t.run """mvn compile exec:java -q \ -Dmaven.wagon.http.retryHandler.class=default \ -Dmaven.wagon.http.retryHandler.count=5 \ + -Dhttp.keepAlive=false \ -Dexec.mainClass=org.apache.beam.examples.WordCount \ -Dexec.args="--runner=DataflowRunner \ --project=${t.gcpProject()} \ diff --git a/release/src/main/groovy/quickstart-java-direct.groovy b/release/src/main/groovy/quickstart-java-direct.groovy index 0956624..3d0cbdf 100644 --- a/release/src/main/groovy/quickstart-java-direct.groovy +++ b/release/src/main/groovy/quickstart-java-direct.groovy @@ -33,6 +33,7 @@ t.describe 'Run Apache Beam Java SDK Quickstart - Direct' t.run """mvn compile exec:java -q \ -Dmaven.wagon.http.retryHandler.class=default \ -Dmaven.wagon.http.retryHandler.count=5 \ + -Dhttp.keepAlive=false \ -Dexec.mainClass=org.apache.beam.examples.WordCount \ -Dexec.args="--inputFile=pom.xml --output=counts" \ -Pdirect-runner""" diff --git a/release/src/main/groovy/quickstart-java-flinklocal.groovy b/release/src/main/groovy/quickstart-java-flinklocal.groovy index fd32ef0..c52928f 100644 --- a/release/src/main/groovy/quickstart-java-flinklocal.groovy +++ b/release/src/main/groovy/quickstart-java-flinklocal.groovy @@ -33,6 +33,7 @@ t.describe 'Run Apache Beam Java SDK Quickstart - Flink Local' t.run """mvn compile exec:java -q \ -Dmaven.wagon.http.retryHandler.class=default \ -Dmaven.wagon.http.retryHandler.count=5 \ + -Dhttp.keepAlive=false \ -Dexec.mainClass=org.apache.beam.examples.WordCount \ -Dexec.args="--inputFile=pom.xml --output=counts \ --runner=FlinkRunner" -Pflink-runner""" diff --git a/release/src/main/groovy/quickstart-java-twister2.groovy 
b/release/src/main/groovy/quickstart-java-twister2.groovy index 053051e..cd6fcbd 100644 --- a/release/src/main/groovy/quickstart-java-twister2.groovy +++ b/release/src/main/groovy/quickstart-java-twister2.groovy @@ -33,6 +33,7 @@ t.describe 'Run Apache Beam Java SDK Quickstart - Twister2' t.run """mvn compile exec:java -q \ -Dmaven.wagon.http.retryHandler.class=default \ -Dmaven.wagon.http.retryHandler.count=5 \ + -Dhttp.keepAlive=false \ -Dexec.mainClass=org.apache.beam.examples.WordCount \ -Dexec.args="--inputFile=pom.xml --output=counts \ --runner=Twister2Runner" -Ptwister2-runner"""
[beam] branch master updated: Explicitly set dataflow runner v2 test shuffle mode to appliance
This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new ee4ccef Explicitly set dataflow runner v2 test shuffle mode to appliance new 5adf500 Merge pull request #13928 from [BEAM-11779] Explicitly set dataflow runner v2 test shuffle mode to appliance ee4ccef is described below commit ee4ccefd633f9f2a80c3a15948ab96a6dce8c244 Author: Yichi Zhang AuthorDate: Mon Feb 8 12:38:00 2021 -0800 Explicitly set dataflow runner v2 test shuffle mode to appliance --- runners/google-cloud-dataflow-java/build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle index 227a8b3..e808e4d 100644 --- a/runners/google-cloud-dataflow-java/build.gradle +++ b/runners/google-cloud-dataflow-java/build.gradle @@ -144,7 +144,7 @@ def runnerV2PipelineOptions = [ "--region=${dataflowRegion}", "--tempRoot=${dataflowValidatesTempRoot}", "--workerHarnessContainerImage=${dockerImageContainer}:${dockerTag}", - "--experiments=beam_fn_api,use_unified_worker,use_runner_v2", + "--experiments=beam_fn_api,use_unified_worker,use_runner_v2,shuffle_mode=appliance", ] def commonLegacyExcludeCategories = [
[beam] branch master updated: Using LoadingCache instead of Map to cache BundleProcessor
This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 804bc28 Using LoadingCache instead of Map to cache BundleProcessor new 022a2ab Merge pull request #13893 from [BEAM-11752] Using LoadingCache instead of Map to cache BundleProcessor 804bc28 is described below commit 804bc28310a6093ae06fa1848aa9d5c317a6fbf8 Author: Boyuan Zhang AuthorDate: Wed Feb 3 17:19:40 2021 -0800 Using LoadingCache instead of Map to cache BundleProcessor --- .../java/org/apache/beam/sdk/transforms/DoFn.java | 5 +- .../fn/harness/control/ProcessBundleHandler.java | 58 -- 2 files changed, 46 insertions(+), 17 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java index 7e27b5f..502add9 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java @@ -872,7 +872,10 @@ public abstract class DoFn Thus, all work that depends on input elements, and all externally important side effects, * must be performed in the {@link ProcessElement} or {@link FinishBundle} methods. diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java index 4ecb5f5..7181420 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java @@ -20,6 +20,7 @@ package org.apache.beam.fn.harness.control; import com.google.auto.value.AutoValue; import java.io.Closeable; import java.io.IOException; +import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -74,11 +75,13 @@ import org.apache.beam.sdk.util.common.ReflectHelpers; import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.Message; import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.TextFormat; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheLoader; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LoadingCache; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.HashMultimap; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.SetMultimap; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets; import org.joda.time.Instant; @@ -538,7 +541,8 @@ public class ProcessBundleHandler { /** A cache for {@link BundleProcessor}s.
*/ public static class BundleProcessorCache { -private final Map> cachedBundleProcessors; +private final LoadingCache> +cachedBundleProcessors; private final Map activeBundleProcessors; @Override @@ -547,14 +551,32 @@ public class ProcessBundleHandler { } BundleProcessorCache() { - this.cachedBundleProcessors = Maps.newConcurrentMap(); + this.cachedBundleProcessors = + CacheBuilder.newBuilder() + .expireAfterAccess(Duration.ofMinutes(1L)) + .removalListener( + removalNotification -> { +((ConcurrentLinkedQueue) removalNotification.getValue()) +.forEach( +bundleProcessor -> { + bundleProcessor.shutdown(); +}); + }) + .build( + new CacheLoader>() { +@Override +public ConcurrentLinkedQueue load(String s) throws Exception { + return new ConcurrentLinkedQueue<>(); +} + }); // We specifically use a weak hash map so that references will automatically go out of scope // and not need to be freed explicitly from the cache. this.activeBundleProcessors = Collections.synchronizedMap(new WeakHashMap<>()); } +@VisibleForTesting Map> getCachedBundleProcessors() { - return cachedBundleProcessors; +
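Stripped of the harness specifics, the pattern in this change is a Guava LoadingCache that lazily creates one queue per bundle descriptor id, expires idle entries after a minute, and tears down evicted processors in a removal listener. A self-contained sketch of the same pattern, using plain Guava types instead of Beam's vendored ones and a print statement standing in for BundleProcessor#shutdown:

```
import java.time.Duration;
import java.util.concurrent.ConcurrentLinkedQueue;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.RemovalListener;

public class ProcessorCacheSketch {
  public static void main(String[] args) {
    // Stand-in for BundleProcessor#shutdown: evicted entries get cleaned up.
    RemovalListener<String, ConcurrentLinkedQueue<String>> onEvict =
        notification ->
            notification.getValue().forEach(p -> System.out.println("shutdown " + p));

    LoadingCache<String, ConcurrentLinkedQueue<String>> cache =
        CacheBuilder.newBuilder()
            // Idle descriptor ids fall out of the cache after one minute...
            .expireAfterAccess(Duration.ofMinutes(1L))
            // ...and their parked processors are shut down on the way out.
            .removalListener(onEvict)
            .build(
                new CacheLoader<String, ConcurrentLinkedQueue<String>>() {
                  @Override
                  public ConcurrentLinkedQueue<String> load(String key) {
                    return new ConcurrentLinkedQueue<>();
                  }
                });

    // getUnchecked materializes the per-key queue lazily on first access.
    cache.getUnchecked("descriptor-1").add("processor-A");
  }
}
```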
[beam] branch master updated: [BEAM-11325] Support KafkaIO dynamic read
This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 0fbb21f [BEAM-11325] Support KafkaIO dynamic read new 47d3326 Merge pull request #13750 from [BEAM-11325] Kafka Dynamic Read 0fbb21f is described below commit 0fbb21fd13b0f1ac1a28e4c839b8ebbe9420e9d3 Author: Boyuan Zhang AuthorDate: Fri Jan 8 10:31:34 2021 -0800 [BEAM-11325] Support KafkaIO dynamic read --- .../java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 156 +++- .../beam/sdk/io/kafka/TopicPartitionCoder.java | 56 +++ .../sdk/io/kafka/WatchKafkaTopicPartitionDoFn.java | 156  .../beam/sdk/io/kafka/TopicPartitionCoderTest.java | 39 ++ .../io/kafka/WatchKafkaTopicPartitionDoFnTest.java | 422 + 5 files changed, 821 insertions(+), 8 deletions(-) diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java index 04b48fa..10aac4a 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java @@ -73,6 +73,7 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PDone; import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.sdk.values.TypeDescriptors; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Joiner; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; @@ -163,6 +164,88 @@ import org.slf4j.LoggerFactory; * Read#withValueDeserializerAndCoder(Class, Coder)}. Note that Kafka messages are interpreted using * key and value deserializers. * + * Read From Kafka Dynamically + * + * For a given kafka bootstrap_server, KafkaIO is also able to detect and read from available {@link + * TopicPartition} dynamically and stop reading from unavailable ones. KafkaIO uses {@link + * WatchKafkaTopicPartitionDoFn} to emit any new added {@link TopicPartition} and uses {@link + * ReadFromKafkaDoFn} to read from each {@link KafkaSourceDescriptor}. Dynamic read is able to solve + * 2 scenarios: + * + * + * Certain topic or partition is added/deleted. + * Certain topic or partition is added, then removed but added back again + * + * + * Within providing {@code checkStopReadingFn}, there are 2 more cases that dynamic read can handle: + * + * + * Certain topic or partition is stopped + * Certain topic or partition is added, then stopped but added back again + * + * + * Race conditions may happen under 2 supported cases: + * + * + * A TopicPartition is removed, but added backed again + * A TopicPartition is stopped, then want to read it again + * + * + * When race condition happens, it will result in the stopped/removed TopicPartition failing to be + * emitted to ReadFromKafkaDoFn again. Or ReadFromKafkaDoFn will output replicated records. The + * major cause for such race condition is that both {@link WatchKafkaTopicPartitionDoFn} and {@link + * ReadFromKafkaDoFn} react to the signal from removed/stopped {@link TopicPartition} but we cannot + * guarantee that both DoFns perform related actions at the same time.
+ * + * Here is one example for failing to emit new added {@link TopicPartition}: + * + * + * A {@link WatchKafkaTopicPartitionDoFn} is configured with updating the current tracking set + * every 1 hour. + * One TopicPartition A is tracked by the {@link WatchKafkaTopicPartitionDoFn} at 10:00AM and + * {@link ReadFromKafkaDoFn} starts to read from TopicPartition A immediately. + * At 10:30AM, the {@link WatchKafkaTopicPartitionDoFn} notices that the {@link + * TopicPartition} has been stopped/removed, so it stops reading from it and returns {@code + * ProcessContinuation.stop()}. + * At 10:45 the pipeline author wants to read from TopicPartition A again. + * At 11:00AM when {@link WatchKafkaTopicPartitionDoFn} is invoked by firing timer, it doesn’t + * know that TopicPartition A has been stopped/removed. All it knows is that TopicPartition A + * is still an active TopicPartition and it will not emit TopicPartition A again. + * + * + * Another race condition example for producing duplicate records: + * + * + * At 10:00AM, {@link ReadFromKafkaDoFn} is processing TopicPartition A + * At 10:05AM, {@link ReadFromKafkaDoFn} starts to process other TopicPartitions(sdf-initiated + * checkpoint or runner-issued checkpoint happens) + * At 10:10AM, {@link WatchKafkaTopicPartitionDoFn} knows that TopicPartition A is + * stopped/removed + * At 10:15AM, {@link WatchKafkaTopicPartitionDoFn
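Opting into dynamic reads is a single option on the read transform. A hedged sketch, assuming the withDynamicRead(Duration) option this PR wires up; the one-hour interval mirrors the javadoc's example of how often the tracked TopicPartition set is refreshed:

```
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.joda.time.Duration;

public class DynamicKafkaRead {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
    p.apply(
        KafkaIO.<byte[], byte[]>read()
            .withBootstrapServers("broker-1:9092")
            .withTopic("my-topic")
            .withKeyDeserializer(ByteArrayDeserializer.class)
            .withValueDeserializer(ByteArrayDeserializer.class)
            // Re-scan for added/removed TopicPartitions once per hour.
            .withDynamicRead(Duration.standardHours(1)));
  }
}
```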
[beam] branch master updated: Fix the check on maxBufferingDuration
This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new ebd2a3a Fix the check on maxBufferingDuration new 5c31997 Merge pull request #13894 from [BEAM-11746] Fix test flakiness: GroupIntoBatches.testInGlobalWindow ebd2a3a is described below commit ebd2a3ae2f25f821fde471971be0db8d159df995 Author: sychen AuthorDate: Wed Feb 3 17:17:18 2021 -0800 Fix the check on maxBufferingDuration --- .../src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java index 18a0341..243dd87 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java @@ -354,7 +354,7 @@ public class GroupIntoBatches // buffering timer (if not null) since the state is empty now. It'll be extended again if a // new element arrives prior to the expiration time set here. // TODO(BEAM-10887): Use clear() when it's available. - if (bufferingTimer != null && maxBufferingDuration != null) { + if (bufferingTimer != null && maxBufferingDuration.isLongerThan(Duration.ZERO)) { bufferingTimer.offset(maxBufferingDuration).setRelative(); } }
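For reference, the code path being fixed only runs when a max buffering duration is configured; the change implies the unset default is Duration.ZERO rather than null. Typical usage of the knob, as a minimal sketch against the standard Java SDK API:

```
import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

public class BatchingExample {
  // Batches of up to 100 values per key, flushed after at most five seconds
  // even when a batch is still short of 100 elements.
  static PCollection<KV<String, Iterable<Integer>>> batch(
      PCollection<KV<String, Integer>> input) {
    return input.apply(
        GroupIntoBatches.<String, Integer>ofSize(100)
            .withMaxBufferingDuration(Duration.standardSeconds(5)));
  }
}
```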
[beam] branch master updated: Bump Dataflow worker container version.
This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 9cbef09 Bump Dataflow worker container version. new 4be9e20 Merge pull request #13890 from Bump Dataflow worker container version. 9cbef09 is described below commit 9cbef09f646d3ae3d1c7f3298eabf64cb94d5e6c Author: Tyson Hamilton AuthorDate: Wed Feb 3 18:03:51 2021 + Bump Dataflow worker container version. --- runners/google-cloud-dataflow-java/build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle index 9fb2a58..227a8b3 100644 --- a/runners/google-cloud-dataflow-java/build.gradle +++ b/runners/google-cloud-dataflow-java/build.gradle @@ -45,7 +45,7 @@ processResources { filter org.apache.tools.ant.filters.ReplaceTokens, tokens: [ 'dataflow.legacy_environment_major_version' : '8', 'dataflow.fnapi_environment_major_version' : '8', -'dataflow.container_version' : 'beam-master-20200629' +'dataflow.container_version' : 'beam-master-20201116' ] }
[beam] branch master updated (489ab66 -> b56b61a)
This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git. from 489ab66 [BEAM-11531] Allow pandas <1.3.0 (#13681) new 816f297 Add transform translator for GroupIntoBatches in Java. new 9b92f41 Fix checkStyle error; add a condition to disable autosharding for JRH new b4a9a99 Fail jobs that would otherwise fall back to the default implementation of GIB new 4a10c01 Fix failed test; update checkArgument logs. new b56b61a Merge pull request #13805 from [BEAM-10475] Add transform translator for GroupIntoBatches in Java. The 30470 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: runners/core-construction-java/build.gradle| 1 + .../construction/GroupIntoBatchesTranslation.java | 100 ++ .../core/construction/PTransformTranslation.java | 5 + .../GroupIntoBatchesTranslationTest.java | 106 +++ .../beam/runners/dataflow/DataflowRunner.java | 31 ++--- .../runners/dataflow/GroupIntoBatchesOverride.java | 5 +- .../dataflow/DataflowPipelineTranslatorTest.java | 146 + .../beam/runners/dataflow/DataflowRunnerTest.java | 11 +- .../beam/sdk/transforms/GroupIntoBatches.java | 66 ++ 9 files changed, 403 insertions(+), 68 deletions(-) create mode 100644 runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/GroupIntoBatchesTranslation.java create mode 100644 runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/GroupIntoBatchesTranslationTest.java
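The commits above add translation for GroupIntoBatches and guard its auto-sharded variant; opting into that variant looks roughly like this, assuming the GroupIntoBatches.WithShardedKey surface in the Java SDK (note the output key type changes):

```
import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.util.ShardedKey;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

public class ShardedBatchingExample {
  // With withShardedKey(), the runner may fan a hot key out across shards,
  // so the output key type becomes ShardedKey<K>.
  static PCollection<KV<ShardedKey<String>, Iterable<Integer>>> batch(
      PCollection<KV<String, Integer>> input) {
    return input.apply(GroupIntoBatches.<String, Integer>ofSize(10).withShardedKey());
  }
}
```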
[beam] branch master updated: Add tag to docker push
This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 453b115 Add tag to docker push new e24892b Merge pull request #13880 from [BEAM-11739] Add tag to docker image when pushing to container registry 453b115 is described below commit 453b115b608744f11351c25f4e20e67240dfea5c Author: kileys AuthorDate: Tue Feb 2 19:25:03 2021 + Add tag to docker push --- runners/google-cloud-dataflow-java/build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle index 73a3882..9fb2a58 100644 --- a/runners/google-cloud-dataflow-java/build.gradle +++ b/runners/google-cloud-dataflow-java/build.gradle @@ -259,7 +259,7 @@ task buildAndPushDockerContainer() { commandLine "docker", "tag", "${defaultDockerImageName}", "${dockerImageName}" } exec { - commandLine "gcloud", "docker", "--", "push", "${dockerImageContainer}" + commandLine "gcloud", "docker", "--", "push", "${dockerImageName}" } } }
[beam] branch master updated: [BEAM-11581] Minor fix to skip ExecutionStateSampler.reset() in ProcessBundleHandler
This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 2237c69 [BEAM-11581] Minor fix to skip ExecutionStateSampler.reset() in ProcessBundleHandler new 0e91f1f Merge pull request #13876 from [BEAM-11581] Minor fix to skip ExecutionStateSampler.reset() in ProcessBundleHandler 2237c69 is described below commit 2237c69b9be739fde89b083a6a8bb4507d429659 Author: Yichi Zhang AuthorDate: Mon Feb 1 16:37:12 2021 -0800 [BEAM-11581] Minor fix to skip ExecutionStateSampler.reset() in ProcessBundleHandler --- .../java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java| 1 - 1 file changed, 1 deletion(-) diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java index 4ecb5f5..fbcd71e 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java @@ -699,7 +699,6 @@ public class ProcessBundleHandler { getpCollectionConsumerRegistry().reset(); getMetricsContainerRegistry().reset(); getStateTracker().reset(); - ExecutionStateSampler.instance().reset(); getBundleFinalizationCallbackRegistrations().clear(); for (ThrowingRunnable resetFunction : getResetFunctions()) { resetFunction.run();
[beam] branch master updated: [BEAM-10120] Add dynamic timer support to portable Flink.
This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new b8c8bf3 [BEAM-10120] Add dynamic timer support to portable Flink. new 73731ec Merge pull request #13783 from [BEAM-10120] Add dynamic timer support to portable Flink b8c8bf3 is described below commit b8c8bf3644b449fc009ebe0a2a07cb155bba0989 Author: Boyuan Zhang AuthorDate: Wed Jan 20 19:09:38 2021 -0800 [BEAM-10120] Add dynamic timer support to portable Flink. --- runners/flink/job-server/flink_job_server.gradle | 1 - .../translation/wrappers/streaming/DoFnOperator.java | 19 --- .../streaming/ExecutableStageDoFnOperator.java| 7 +++ .../streaming/ExecutableStageDoFnOperatorTest.java| 4 +++- .../fnexecution/control/TimerReceiverFactory.java | 1 + .../translation/PipelineTranslatorUtils.java | 4 ++-- .../runners/portability/flink_runner_test.py | 3 --- 7 files changed, 25 insertions(+), 14 deletions(-) diff --git a/runners/flink/job-server/flink_job_server.gradle b/runners/flink/job-server/flink_job_server.gradle index 130dd97..aa68411 100644 --- a/runners/flink/job-server/flink_job_server.gradle +++ b/runners/flink/job-server/flink_job_server.gradle @@ -163,7 +163,6 @@ def portableValidatesRunnerTask(String name, Boolean streaming, Boolean checkpoi excludeCategories 'org.apache.beam.sdk.testing.UsesSetState' excludeCategories 'org.apache.beam.sdk.testing.UsesOrderedListState' excludeCategories 'org.apache.beam.sdk.testing.UsesStrictTimerOrdering' -excludeCategories 'org.apache.beam.sdk.testing.UsesTimerMap' excludeCategories 'org.apache.beam.sdk.testing.UsesOnWindowExpiration' excludeCategories 'org.apache.beam.sdk.testing.UsesBundleFinalizer' excludeCategories 'org.apache.beam.sdk.testing.UsesOrderedListState' diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java index 9c1e437..b463db26 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -1412,6 +1412,10 @@ public class DoFnOperator return timer.getOutputTimestamp().isBefore(timer.getTimestamp()); } +private String constructTimerId(String timerFamilyId, String timerId) { + return timerFamilyId + "+" + timerId; +} + @Override public void setTimer( StateNamespace namespace, @@ -1437,7 +1441,10 @@ public class DoFnOperator timer.getTimerId(), timer.getTimestamp().getMillis(), timer.getOutputTimestamp().getMillis()); -String contextTimerId = getContextTimerId(timer.getTimerId(), timer.getNamespace()); +String contextTimerId = +getContextTimerId( +constructTimerId(timer.getTimerFamilyId(), timer.getTimerId()), +timer.getNamespace()); @Nullable final TimerData oldTimer = pendingTimersById.get(contextTimerId); if (!timer.equals(oldTimer)) { // Only one timer can exist at a time for a given timer id and context. 
@@ -1500,7 +1507,10 @@ public class DoFnOperator */ void onFiredOrDeletedTimer(TimerData timer) { try { -pendingTimersById.remove(getContextTimerId(timer.getTimerId(), timer.getNamespace())); +pendingTimersById.remove( +getContextTimerId( +constructTimerId(timer.getTimerFamilyId(), timer.getTimerId()), +timer.getNamespace())); if (timer.getDomain() == TimeDomain.EVENT_TIME || StateAndTimerBundleCheckpointHandler.isSdfTimer(timer.getTimerId())) { if (timerUsesOutputTimestamp(timer)) { @@ -1532,7 +1542,10 @@ public class DoFnOperator @Override @Deprecated public void deleteTimer(TimerData timer) { - deleteTimer(timer.getNamespace(), timer.getTimerId(), timer.getDomain()); + deleteTimer( + timer.getNamespace(), + constructTimerId(timer.getTimerFamilyId(), timer.getTimerId()), + timer.getDomain()); } void deleteTimerInternal(TimerData timer) { diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java index d479a3c..42949c0 100644 --- a/runners/flink/src/main/java/org/apache/be
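The commit above wires dynamic timers (timer families) through the portable Flink runner; since many timers can now share one family, the operator keys pending timers by `timerFamilyId + "+" + timerId`. For reference, a hedged sketch of the user-facing `@TimerFamily` API this enables (the family name and the choice of timer ids are illustrative):

```java
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.TimerMap;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Instant;

class DynamicTimerFn extends DoFn<KV<String, String>, String> {
  @TimerFamily("actions")
  private final TimerSpec actionsSpec = TimerSpecs.timerMap(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void process(
      @Element KV<String, String> element,
      @Timestamp Instant ts,
      @TimerFamily("actions") TimerMap timers) {
    // Timer ids are chosen at runtime: one timer per distinct action value.
    timers.set(element.getValue(), ts);
  }

  @OnTimerFamily("actions")
  public void onActionTimer(@TimerId String timerId, OutputReceiver<String> out) {
    out.output("fired: " + timerId);
  }
}
```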
[beam] branch master updated (9a4505e -> ebc3bc3)
This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git. from 9a4505e Merge pull request #13762: [BEAM-11643] Add BigDecimal support for SpannerIO new a9d8612 [BEAM-11679] Override PubsubUnboundedSource transform for dataflow runner v2 new 56cebda Populate PubsubMessage message id new 02ec09c Enable tests new 197d338 Exclude failing FhirIO ITs new bc7930e Include runner v2 IT tests in java post commit new ebc3bc3 Merge pull request #13797 from [BEAM-11679] Override PubsubUnboundedSource transform for dataflow runner v2 The 30389 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: build.gradle | 1 + runners/google-cloud-dataflow-java/build.gradle| 13 +- .../beam/runners/dataflow/DataflowRunner.java | 207 - .../beam/runners/dataflow/TestDataflowRunner.java | 6 +- .../beam/sdk/testing/TestPipelineOptions.java | 10 + .../beam/sdk/io/gcp/pubsub/PubsubMessages.java | 7 +- .../beam/sdk/io/gcp/pubsub/PubsubReadIT.java | 6 +- 7 files changed, 185 insertions(+), 65 deletions(-)
[beam] branch master updated (e1b42bf -> 29b9542)
This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git. from e1b42bf Merge pull request #12647 from [BEAM-10378] Deserializing Azure Credentials new 3d1f25a [BEAM-10112] Add more state and timer python examples to website new 89a5e3d Address comments new 3a2a351 Render py new 29b9542 Merge pull request #13591 from [BEAM-10112] Add more state and timer python examples to website The 30326 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../content/en/documentation/programming-guide.md | 150 +++-- 1 file changed, 139 insertions(+), 11 deletions(-)
[beam] branch master updated: [BEAM-11581] Start ExecutionStateSampler in Java SDK harness before dispatch loop.
This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new ae60a7b [BEAM-11581] Start ExecutionStateSampler in Java SDK harness before dispatch loop. new 390fb7e Merge pull request #13695 from [BEAM-11581] Start ExecutionStateSampler in Java SDK harness before dispatch loop ae60a7b is described below commit ae60a7bd89a9917920b9bd07bce798848e5f6fa4 Author: Yichi Zhang AuthorDate: Wed Jan 6 10:02:56 2021 -0800 [BEAM-11581] Start ExecutionStateSampler in Java SDK harness before dispatch loop. --- .../harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java | 4 1 file changed, 4 insertions(+) diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java index 89b0a7c..3a7b741 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java @@ -36,6 +36,7 @@ import org.apache.beam.model.fnexecution.v1.BeamFnApi.InstructionResponse.Builde import org.apache.beam.model.fnexecution.v1.BeamFnControlGrpc; import org.apache.beam.model.pipeline.v1.Endpoints; import org.apache.beam.runners.core.construction.PipelineOptionsTranslation; +import org.apache.beam.runners.core.metrics.ExecutionStateSampler; import org.apache.beam.sdk.extensions.gcp.options.GcsOptions; import org.apache.beam.sdk.fn.IdGenerator; import org.apache.beam.sdk.fn.IdGenerators; @@ -251,11 +252,14 @@ public class FnHarness { JvmInitializers.runBeforeProcessing(options); + ExecutionStateSampler.instance().start(); + LOG.info("Entering instruction processing loop"); control.processInstructionRequests(executorService); processBundleHandler.shutdown(); } finally { System.out.println("Shutting SDK harness down."); + ExecutionStateSampler.instance().stop(); executorService.shutdown(); } }
[beam] branch master updated: feat: Allow Pub/Sub Lite Sink to support increasing partitions
This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new a77818a feat: Allow Pub/Sub Lite Sink to support increasing partitions new aa98bcf Merge pull request #13787 from [BEAM-10114] Allow Pub/Sub Lite Sink to support increasing partitions a77818a is described below commit a77818a7e75d13d7174b4cde5cd418080975e87d Author: Evan Palmer AuthorDate: Thu Jan 21 14:19:49 2021 -0500 feat: Allow Pub/Sub Lite Sink to support increasing partitions --- .../beam/sdk/io/gcp/pubsublite/Publishers.java | 24 -- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/Publishers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/Publishers.java index c44c3df..a0a075e 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/Publishers.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/Publishers.java @@ -22,9 +22,10 @@ import static com.google.cloud.pubsublite.internal.UncheckedApiPreconditions.che import com.google.api.gax.rpc.ApiException; import com.google.cloud.pubsublite.PublishMetadata; import com.google.cloud.pubsublite.internal.Publisher; +import com.google.cloud.pubsublite.internal.wire.PartitionCountWatchingPublisher; +import com.google.cloud.pubsublite.internal.wire.PartitionCountWatchingPublisherSettings; import com.google.cloud.pubsublite.internal.wire.PubsubContext; import com.google.cloud.pubsublite.internal.wire.PubsubContext.Framework; -import com.google.cloud.pubsublite.internal.wire.RoutingPublisherBuilder; import com.google.cloud.pubsublite.internal.wire.SinglePartitionPublisherBuilder; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeToken; @@ -42,15 +43,16 @@ class Publishers { checkArgument(token.isSupertypeOf(supplied.getClass())); return (Publisher) supplied; } -return RoutingPublisherBuilder.newBuilder() -.setTopic(options.topicPath()) -.setPublisherFactory( -partition -> -SinglePartitionPublisherBuilder.newBuilder() -.setTopic(options.topicPath()) -.setPartition(partition) -.setContext(PubsubContext.of(FRAMEWORK)) -.build()) -.build(); +return new PartitionCountWatchingPublisher( +PartitionCountWatchingPublisherSettings.newBuilder() +.setTopic(options.topicPath()) +.setPublisherFactory( +partition -> +SinglePartitionPublisherBuilder.newBuilder() +.setTopic(options.topicPath()) +.setPartition(partition) +.setContext(PubsubContext.of(FRAMEWORK)) +.build()) +.build()); } }
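The swap above replaces the fixed `RoutingPublisherBuilder` with a `PartitionCountWatchingPublisher`, so a topic whose partition count grows after pipeline start should have messages routed to the new partitions as well. The change is internal to the publisher factory; purely for orientation, a rough sketch of the write path it backs (the topic path string is a placeholder, and `messages` is assumed to be an existing `PCollection<PubSubMessage>`):

```java
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.proto.PubSubMessage;
import org.apache.beam.sdk.io.gcp.pubsublite.PublisherOptions;
import org.apache.beam.sdk.io.gcp.pubsublite.PubsubLiteIO;
import org.apache.beam.sdk.values.PCollection;

// Sketch only: write a stream of PubSubMessage protos to a Pub/Sub Lite topic.
void writeToPubsubLite(PCollection<PubSubMessage> messages) {
  messages.apply(
      PubsubLiteIO.write(
          PublisherOptions.newBuilder()
              .setTopicPath(TopicPath.parse(
                  "projects/123/locations/us-central1-a/topics/my-topic"))
              .build()));
}
```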
[beam] branch master updated: re-remove finalizedBy cleanup for docker container test
This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new a35d31f re-remove finalizedBy cleanup for docker container test new 0dd6980 Merge pull request #13796 from [BEAM-11678] Fix Java DataflowRunner test container cleanup a35d31f is described below commit a35d31f49257c7513818f2a2b284ac6ad82855a1 Author: emil...@google.com AuthorDate: Fri Jan 22 14:27:32 2021 -0800 re-remove finalizedBy cleanup for docker container test --- runners/google-cloud-dataflow-java/build.gradle | 1 - 1 file changed, 1 deletion(-) diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle index 197eff8..13ebafb 100644 --- a/runners/google-cloud-dataflow-java/build.gradle +++ b/runners/google-cloud-dataflow-java/build.gradle @@ -250,7 +250,6 @@ def createRunnerV2ValidatesRunnerTest = { Map args -> task buildAndPushDockerContainer() { def javaVer = project.hasProperty('compileAndRunTestsWithJava11') ? "java11" : "java8" dependsOn ":sdks:java:container:${javaVer}:docker" - finalizedBy 'cleanUpDockerImages' def defaultDockerImageName = containerImageName( name: "${project.docker_image_default_repo_prefix}${javaVer}_sdk", root: "apache",
[beam] branch master updated: Expose commit_offset_in_finalize and timestamp_policy to ReadFromKafka.
This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 04ae01c Expose commit_offset_in_finalize and timestamp_policy to ReadFromKafka. new 8b759d1 Merge pull request #13779 from [BEAM-11677] Expose commit_offset_in_finalize and timestamp_policy to ReadFromKafka 04ae01c is described below commit 04ae01ca97e0258d831d80bc5216ec160605a981 Author: Boyuan Zhang AuthorDate: Tue Jan 19 15:49:34 2021 -0800 Expose commit_offset_in_finalize and timestamp_policy to ReadFromKafka. --- .../java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 29 -- .../beam/sdk/io/kafka/KafkaIOExternalTest.java | 6 +++- sdks/python/apache_beam/io/kafka.py| 34 +++--- .../runners/portability/flink_runner_test.py | 5 +++- 4 files changed, 60 insertions(+), 14 deletions(-) diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java index 60608e0..d77d4fa 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java @@ -586,8 +586,23 @@ public class KafkaIO { setMaxReadTime(Duration.standardSeconds(config.maxReadTime)); } setMaxNumRecords(config.maxNumRecords == null ? Long.MAX_VALUE : config.maxNumRecords); -setCommitOffsetsInFinalizeEnabled(false); -setTimestampPolicyFactory(TimestampPolicyFactory.withProcessingTime()); + +// Set committing offset configuration. +setCommitOffsetsInFinalizeEnabled(config.commitOffsetInFinalize); + +// Set timestamp policy with built-in types. +String timestampPolicy = config.timestampPolicy; +if (timestampPolicy.equals("ProcessingTime")) { + setTimestampPolicyFactory(TimestampPolicyFactory.withProcessingTime()); +} else if (timestampPolicy.equals("CreateTime")) { + setTimestampPolicyFactory(TimestampPolicyFactory.withCreateTime(Duration.ZERO)); +} else if (timestampPolicy.equals("LogAppendTime")) { + setTimestampPolicyFactory(TimestampPolicyFactory.withLogAppendTime()); +} else { + throw new IllegalArgumentException( + "timestampPolicy should be one of (ProcessingTime, CreateTime, LogAppendTime)"); +} + if (config.startReadTime != null) { setStartReadTime(Instant.ofEpochMilli(config.startReadTime)); } @@ -645,6 +660,8 @@ public class KafkaIO { private Long startReadTime; private Long maxNumRecords; private Long maxReadTime; +private Boolean commitOffsetInFinalize; +private String timestampPolicy; public void setConsumerConfig(Map consumerConfig) { this.consumerConfig = consumerConfig; @@ -673,6 +690,14 @@ public class KafkaIO { public void setMaxReadTime(Long maxReadTime) { this.maxReadTime = maxReadTime; } + +public void setCommitOffsetInFinalize(Boolean commitOffsetInFinalize) { + this.commitOffsetInFinalize = commitOffsetInFinalize; +} + +public void setTimestampPolicy(String timestampPolicy) { + this.timestampPolicy = timestampPolicy; +} } } diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java index 82c67eb..39b2981 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java @@ -89,12 +89,16 @@ public class KafkaIOExternalTest { 
"consumer_config", FieldType.map(FieldType.STRING, FieldType.STRING)), Field.of("key_deserializer", FieldType.STRING), Field.of("value_deserializer", FieldType.STRING), -Field.of("start_read_time", FieldType.INT64))) +Field.of("start_read_time", FieldType.INT64), +Field.of("commit_offset_in_finalize", FieldType.BOOLEAN), +Field.of("timestamp_policy", FieldType.STRING))) .withFieldValue("topics", topics) .withFieldValue("consumer_config", consumerConfig) .withFieldValue("key_deserializer", keyDeserializer) .withFieldValue("value_deserializer", valueDeserializer) .
[beam] branch master updated: [BEAM-10402] Add nullability annotations to SplitResult.of
This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new aa57b51 [BEAM-10402] Add nullability annotations to SplitResult.of new 8194761 Merge pull request #13769 from [BEAM-10402] Add nullability annotations to SplitResult.of aa57b51 is described below commit aa57b51cad5d73beffd6d687b7ef9d7721ed8b7b Author: Jan Lukavsky AuthorDate: Tue Jan 19 15:09:24 2021 +0100 [BEAM-10402] Add nullability annotations to SplitResult.of --- .../java/org/apache/beam/sdk/transforms/splittabledofn/SplitResult.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/SplitResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/SplitResult.java index 8c44ae4..dd02101 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/SplitResult.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/SplitResult.java @@ -28,7 +28,7 @@ import org.checkerframework.checker.nullness.qual.Nullable; public abstract class SplitResult { /** Returns a {@link SplitResult} for the specified primary and residual restrictions. */ public static SplitResult of( - RestrictionT primary, RestrictionT residual) { + @Nullable RestrictionT primary, @Nullable RestrictionT residual) { return new AutoValue_SplitResult(primary, residual); }
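`SplitResult.of` is what a `RestrictionTracker#trySplit` implementation hands back; the added annotations make explicit that either side may be absent (for example, a tracker that defers all of its work at checkpoint time may report no primary). An illustrative helper, not from the commit, splitting an `OffsetRange` restriction at a position:

```java
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;

// Illustrative only: the primary keeps [from, splitPos) and the residual gets
// [splitPos, to); with the annotations, either argument may legally be null.
static SplitResult<OffsetRange> splitAt(OffsetRange range, long splitPos) {
  return SplitResult.of(
      new OffsetRange(range.getFrom(), splitPos),
      new OffsetRange(splitPos, range.getTo()));
}
```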
[beam] branch master updated: [BEAM-11325] ReadFromKafkaDoFn should stop reading when topic/partition is removed or marked as stopped.
This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new e8eed0b [BEAM-11325] ReadFromKafkaDoFn should stop reading when topic/partition is removed or marked as stopped. new d2e1f69 Merge pull request #13710 from [BEAM-11325] ReadFromKafkaDoFn should stop reading when topic/partition is removed or marked as stopped e8eed0b is described below commit e8eed0bf70c334fe59327f0d70453302935410ee Author: Boyuan Zhang AuthorDate: Fri Jan 8 13:43:06 2021 -0800 [BEAM-11325] ReadFromKafkaDoFn should stop reading when topic/partition is removed or marked as stopped. --- .../java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 36 ++- .../beam/sdk/io/kafka/ReadFromKafkaDoFn.java | 43 ++- .../beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java | 344 + 3 files changed, 421 insertions(+), 2 deletions(-) diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java index 60608e0..93759e6 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java @@ -141,6 +141,11 @@ import org.slf4j.LoggerFactory; * // offset consumed by the pipeline can be committed back. * .commitOffsetsInFinalize() * + * // Specified a serializable function which can determine whether to stop reading from given + * // TopicPartition during runtime. Note that only {@link ReadFromKafkaDoFn} respect the + * // signal. + * .withCheckStopReadingFn(new SerializedFunction() {}) + * * // finally, if you don't need Kafka metadata, you can drop it.g * .withoutMetadata() // PCollection> * ) @@ -514,6 +519,8 @@ public class KafkaIO { abstract @Nullable DeserializerProvider getValueDeserializerProvider(); +abstract @Nullable SerializableFunction getCheckStopReadingFn(); + abstract Builder toBuilder(); @Experimental(Kind.PORTABILITY) @@ -553,6 +560,9 @@ public class KafkaIO { abstract Builder setValueDeserializerProvider( DeserializerProvider deserializerProvider); + abstract Builder setCheckStopReadingFn( + SerializableFunction checkStopReadingFn); + abstract Read build(); @Override @@ -998,6 +1008,15 @@ public class KafkaIO { return toBuilder().setConsumerConfig(config).build(); } +/** + * A custom {@link SerializableFunction} that determines whether the {@link ReadFromKafkaDoFn} + * should stop reading from the given {@link TopicPartition}. + */ +public Read withCheckStopReadingFn( +SerializableFunction checkStopReadingFn) { + return toBuilder().setCheckStopReadingFn(checkStopReadingFn).build(); +} + /** Returns a {@link PTransform} for PCollection of {@link KV}, dropping Kafka metatdata. 
*/ public PTransform>> withoutMetadata() { return new TypedWithoutMetadata<>(this); @@ -1080,7 +1099,8 @@ public class KafkaIO { .withKeyDeserializerProvider(getKeyDeserializerProvider()) .withValueDeserializerProvider(getValueDeserializerProvider()) .withManualWatermarkEstimator() - .withTimestampPolicyFactory(getTimestampPolicyFactory()); + .withTimestampPolicyFactory(getTimestampPolicyFactory()) + .withCheckStopReadingFn(getCheckStopReadingFn()); if (isCommitOffsetsInFinalizeEnabled()) { readTransform = readTransform.commitOffsets(); } @@ -1267,6 +1287,8 @@ public class KafkaIO { abstract SerializableFunction, Consumer> getConsumerFactoryFn(); +abstract @Nullable SerializableFunction getCheckStopReadingFn(); + abstract @Nullable SerializableFunction, Instant> getExtractOutputTimestampFn(); @@ -1289,6 +1311,9 @@ public class KafkaIO { abstract ReadSourceDescriptors.Builder setConsumerFactoryFn( SerializableFunction, Consumer> consumerFactoryFn); + abstract ReadSourceDescriptors.Builder setCheckStopReadingFn( + SerializableFunction checkStopReadingFn); + abstract ReadSourceDescriptors.Builder setKeyDeserializerProvider( DeserializerProvider deserializerProvider); @@ -1403,6 +1428,15 @@ public class KafkaIO { } /** + * A custom {@link SerializableFunction} that determines whether the {@link ReadFromKafkaDoFn} + * should stop reading from the given {@link TopicPartition}. + */ +public ReadSourceDescriptors withCheckStopReadingFn( +SerializableFunction checkStopReadingFn) { + return toBuilder().setCheckStopReadingFn(checkStopReadingFn).build(); +} + +/** * Updat
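From the pipeline author's side, the new hook is a serializable predicate over `TopicPartition` that `ReadFromKafkaDoFn` consults while the pipeline runs; returning true stops consumption of that partition. A minimal sketch (broker, topic, and the stop condition are illustrative):

```java
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

// Stop reading partition 0 of "events"; a real predicate would typically
// consult an external signal (flag file, config service, etc.).
pipeline.apply(
    KafkaIO.<Long, String>read()
        .withBootstrapServers("broker:9092")
        .withTopic("events")
        .withKeyDeserializer(LongDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        .withCheckStopReadingFn(
            (TopicPartition tp) -> tp.partition() == 0));
```

Note, per the javadoc added above, only the `ReadFromKafkaDoFn` (SDF) read path respects the signal.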
[beam] branch master updated (4bdafec -> 343a847)
This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git. from 4bdafec Merge pull request #13734 from [BEAM-11497] Avoid zerodivision error in custom BQ source new c9004b2 feat: Add partition increase handling to PubsubLiteIO new b109e21 fix: Ensure manually set partitions are not ignored. new 3ca4360 fix: Format new 1db9ec3 fix: Format new 49c5ed0 fix: Remove guava usage. new 6a70085 fix: docs new 343a847 Merge pull request #13567 from [BEAM-10114] Add partition increase handling to PubsubLiteIO The 30242 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../sdk/io/gcp/pubsublite/SubscribeTransform.java | 40 ++-- .../sdk/io/gcp/pubsublite/SubscriberOptions.java | 30 +- .../pubsublite/SubscriptionPartitionLoader.java| 102 + .../SubscriptionPartitionLoaderTest.java | 84 + 4 files changed, 221 insertions(+), 35 deletions(-) create mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriptionPartitionLoader.java create mode 100644 sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriptionPartitionLoaderTest.java
[beam] branch master updated (dd71c9a -> 6c9da02)
This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git. from dd71c9a Update doc/examples: BigQuerySource to ReadFromBigQuery (#13239) new 4c8f0b0 [BEAM-11474] Set log entry transform id with best effort new 327ec71 Address comment new 6c9da02 Merge pull request #13702 from [BEAM-11474] Set log entry transform id with best effort in Java SDK harness The 30200 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../runners/core/metrics/ExecutionStateTracker.java | 20 .../beam/fn/harness/logging/BeamFnLoggingClient.java | 15 +++ 2 files changed, 31 insertions(+), 4 deletions(-)
[beam] branch release.2.24.0 created (now 9f9dcd6)
This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a change to branch release.2.24.0 in repository https://gitbox.apache.org/repos/asf/beam.git. at 9f9dcd6 [BEAM-9980] do not hardcode Python version for dataflow validate runner tests and make it switchable (#13330) No new revisions were added by this update.
[beam] branch release.2-24.0 created (now 9b43fad)
This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a change to branch release.2-24.0 in repository https://gitbox.apache.org/repos/asf/beam.git. at 9b43fad [Gradle Release Plugin] - new version commit: 'v2.22.0-SNAPSHOT'. No new revisions were added by this update.
[beam] branch master updated: Cache UnboundedReader per CheckpointMark in SDF Wrapper DoFn.
This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 98ee1f1 Cache UnboundedReader per CheckpointMark in SDF Wrapper DoFn. new b6243e7 Merge pull request #13592 from [BEAM-11403] Cache UnboundedReader per UnboundedSourceRestriction in SDF Wrapper DoFn. 98ee1f1 is described below commit 98ee1f178a9e80f4694f86775c06a54ecf82abb8 Author: Boyuan Zhang AuthorDate: Mon Dec 21 15:13:32 2020 -0800 Cache UnboundedReader per CheckpointMark in SDF Wrapper DoFn. --- .../src/main/java/org/apache/beam/sdk/io/Read.java | 96 +++ .../org/apache/beam/sdk/testing/TestPipeline.java | 45 +++ .../test/java/org/apache/beam/sdk/io/ReadTest.java | 130 + 3 files changed, 247 insertions(+), 24 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java index e2f7a8f..4982066 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.io; +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull; import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState; import com.google.auto.value.AutoValue; @@ -27,6 +28,7 @@ import java.io.Serializable; import java.util.Arrays; import java.util.List; import java.util.NoSuchElementException; +import java.util.concurrent.TimeUnit; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.InstantCoder; @@ -60,6 +62,9 @@ import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.beam.sdk.values.ValueWithRecordId; import org.apache.beam.sdk.values.ValueWithRecordId.StripIdsDoFn; import org.apache.beam.sdk.values.ValueWithRecordId.ValueWithRecordIdCoder; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.Cache; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.RemovalListener; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Duration; import org.joda.time.Instant; @@ -439,6 +444,8 @@ public class Read { private static final Logger LOG = LoggerFactory.getLogger(UnboundedSourceAsSDFWrapperFn.class); private static final int DEFAULT_BUNDLE_FINALIZATION_LIMIT_MINS = 10; private final Coder checkpointCoder; +private Cache> cachedReaders; +private Coder> restrictionCoder; private UnboundedSourceAsSDFWrapperFn(Coder checkpointCoder) { this.checkpointCoder = checkpointCoder; @@ -450,6 +457,27 @@ public class Read { return UnboundedSourceRestriction.create(element, null, BoundedWindow.TIMESTAMP_MIN_VALUE); } +@Setup +public void setUp() throws Exception { + restrictionCoder = restrictionCoder(); + cachedReaders = + CacheBuilder.newBuilder() + .expireAfterWrite(1, TimeUnit.MINUTES) + .maximumSize(100) + .removalListener( + (RemovalListener) + removalNotification -> { +if (removalNotification.wasEvicted()) { + try { +removalNotification.getValue().close(); + } catch (IOException e) { +LOG.warn("Failed to close UnboundedReader.", e); + } +} + }) + .build(); +} + @SplitRestriction public void splitRestriction( @Restriction UnboundedSourceRestriction restriction, @@ -488,7 
+516,10 @@ public class Read { restrictionTracker( @Restriction UnboundedSourceRestriction restriction, PipelineOptions pipelineOptions) { - return new UnboundedSourceAsSDFRestrictionTracker(restriction, pipelineOptions); + checkNotNull(restrictionCoder); + checkNotNull(cachedReaders); + return new UnboundedSourceAsSDFRestrictionTracker( + restriction, pipelineOptions, cachedReaders, restrictionCoder); } @ProcessElement @@ -756,22 +787,47 @@ public class Read { private final PipelineOptions pipelineOptions; private UnboundedSource.UnboundedReader currentReader; private boolean readerHasBeenStarted; + private Cache> cachedReaders; + private Coder> restrictionCoder; UnboundedSourceAsSDFRestrictionTracker( UnboundedSourceRestriction initialRestriction, - PipelineOptions pipelineOptions) { +
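The core of the change is a Guava cache that keeps an `UnboundedReader` alive across bundles (keyed by the encoded restriction) and closes readers as they are evicted, instead of re-creating a reader for every bundle. The pattern in isolation, as a hedged standalone sketch using plain (non-vendored) Guava:

```java
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.TimeUnit;

// Entries idle for a minute, or beyond the size cap, are evicted and closed,
// so a paused restriction does not leak its underlying connection.
Cache<String, Closeable> readers =
    CacheBuilder.newBuilder()
        .expireAfterWrite(1, TimeUnit.MINUTES)
        .maximumSize(100)
        .removalListener(
            (RemovalListener<String, Closeable>)
                notification -> {
                  if (notification.wasEvicted()) {
                    try {
                      notification.getValue().close();
                    } catch (IOException e) {
                      // Log and move on; the reader is already unreachable.
                    }
                  }
                })
        .build();
```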
[beam] branch master updated: Remove usages of WriteStringsToPubSub in examples
This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new feb2b84 Remove usages of WriteStringsToPubSub in examples new b0e3d36 Merge pull request #13615 from [BEAM-11524] Remove usages of WriteStringsToPubSub in examples feb2b84 is described below commit feb2b84c6448eaa2750bf3166c4c0d109ed8c69e Author: Brian Hulette AuthorDate: Thu Dec 24 08:24:39 2020 -0800 Remove usages of WriteStringsToPubSub in examples --- sdks/python/apache_beam/examples/snippets/snippets.py | 6 +++--- sdks/python/apache_beam/examples/snippets/snippets_test.py | 10 +- sdks/python/apache_beam/examples/sql_taxi.py | 3 ++- 3 files changed, 10 insertions(+), 9 deletions(-) diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py index a5d2b7b..42e5886 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets.py +++ b/sdks/python/apache_beam/examples/snippets/snippets.py @@ -697,12 +697,12 @@ def examples_wordcount_streaming(argv): | 'Group' >> beam.GroupByKey() | 'Sum' >> beam.Map(lambda word_ones: (word_ones[0], sum(word_ones[1]))) -| -'Format' >> beam.Map(lambda word_and_count: '%s: %d' % word_and_count)) +| 'Format' >> +beam.MapTuple(lambda word, count: f'{word}: {count}'.encode('utf-8'))) # [START example_wordcount_streaming_write] # Write to Pub/Sub -output | beam.io.WriteStringsToPubSub(known_args.output_topic) +output | beam.io.WriteToPubSub(known_args.output_topic) # [END example_wordcount_streaming_write] diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py index 238ecc8..35c60e7 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets_test.py +++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py @@ -748,7 +748,7 @@ class SnippetsTest(unittest.TestCase): @unittest.skipIf(pubsub is None, 'GCP dependencies are not installed') @mock.patch('apache_beam.io.ReadFromPubSub') - @mock.patch('apache_beam.io.WriteStringsToPubSub') + @mock.patch('apache_beam.io.WriteToPubSub') def test_examples_wordcount_streaming(self, *unused_mocks): def FakeReadFromPubSub(topic=None, subscription=None, values=None): expected_topic = topic @@ -768,7 +768,7 @@ class SnippetsTest(unittest.TestCase): def expand(self, pcoll): assert_that(pcoll, self.matcher) -def FakeWriteStringsToPubSub(topic=None, values=None): +def FakeWriteToPubSub(topic=None, values=None): expected_topic = topic def _inner(topic=None, subscription=None): @@ -785,11 +785,11 @@ class SnippetsTest(unittest.TestCase): TimestampedValue(b'a b c c c', 20) ] output_topic = 'projects/fake-beam-test-project/topic/outtopic' -output_values = ['a: 1', 'a: 2', 'b: 1', 'b: 3', 'c: 3'] +output_values = [b'a: 1', b'a: 2', b'b: 1', b'b: 3', b'c: 3'] beam.io.ReadFromPubSub = ( FakeReadFromPubSub(topic=input_topic, values=input_values)) -beam.io.WriteStringsToPubSub = ( -FakeWriteStringsToPubSub(topic=output_topic, values=output_values)) +beam.io.WriteToPubSub = ( +FakeWriteToPubSub(topic=output_topic, values=output_values)) snippets.examples_wordcount_streaming([ '--input_topic', 'projects/fake-beam-test-project/topic/intopic', diff --git a/sdks/python/apache_beam/examples/sql_taxi.py b/sdks/python/apache_beam/examples/sql_taxi.py index 607dea1..32fd80a 100644 --- a/sdks/python/apache_beam/examples/sql_taxi.py +++ 
b/sdks/python/apache_beam/examples/sql_taxi.py @@ -80,7 +80,8 @@ def run(output_topic, pipeline_args): "window_end": window.end.to_rfc3339() }) | "Convert to JSON" >> beam.Map(json.dumps) -| beam.io.WriteStringsToPubSub(topic=output_topic)) +| "UTF-8 encode" >> beam.Map(lambda s: s.encode("utf-8")) +| beam.io.WriteToPubSub(topic=output_topic)) if __name__ == '__main__':
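The migration pattern, as the diffs show, is mechanical: `WriteToPubSub` consumes `bytes`, so pipelines that relied on `WriteStringsToPubSub` accepting unicode strings add an explicit UTF-8 encode step:

```python
import apache_beam as beam

# Before (removed): output | beam.io.WriteStringsToPubSub(output_topic)
# After: encode to bytes, then write. 'output' is a PCollection of str.
(
    output
    | 'UTF-8 encode' >> beam.Map(lambda s: s.encode('utf-8'))
    | beam.io.WriteToPubSub(topic=output_topic))
```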
[beam] branch master updated: Release bundle processor when any exceptions during processing.
This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new e9fa621 Release bundle processor when any exceptions during processing. new 060b083 Merge pull request #13568 from [BEAM-3245] Release bundle processor when any exceptions during processing. e9fa621 is described below commit e9fa621a10ce6a84179fba1d54b413535a1a46a7 Author: Boyuan Zhang AuthorDate: Wed Dec 16 10:44:57 2020 -0800 Release bundle processor when any exceptions during processing. --- .../fn/harness/control/ProcessBundleHandler.java | 87 -- 1 file changed, 46 insertions(+), 41 deletions(-) diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java index e93c67b..4ecb5f5 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java @@ -293,55 +293,60 @@ public class ProcessBundleHandler { ExecutionStateTracker stateTracker = bundleProcessor.getStateTracker(); QueueingBeamFnDataClient queueingClient = bundleProcessor.getQueueingClient(); -try (HandleStateCallsForBundle beamFnStateClient = bundleProcessor.getBeamFnStateClient()) { - try (Closeable closeTracker = stateTracker.activate()) { -// Already in reverse topological order so we don't need to do anything. -for (ThrowingRunnable startFunction : startFunctionRegistry.getFunctions()) { - LOG.debug("Starting function {}", startFunction); - startFunction.run(); -} +try { + try (HandleStateCallsForBundle beamFnStateClient = bundleProcessor.getBeamFnStateClient()) { +try (Closeable closeTracker = stateTracker.activate()) { + // Already in reverse topological order so we don't need to do anything. + for (ThrowingRunnable startFunction : startFunctionRegistry.getFunctions()) { +LOG.debug("Starting function {}", startFunction); +startFunction.run(); + } -queueingClient.drainAndBlock(); + queueingClient.drainAndBlock(); -// Need to reverse this since we want to call finish in topological order. -for (ThrowingRunnable finishFunction : -Lists.reverse(finishFunctionRegistry.getFunctions())) { - LOG.debug("Finishing function {}", finishFunction); - finishFunction.run(); + // Need to reverse this since we want to call finish in topological order. + for (ThrowingRunnable finishFunction : + Lists.reverse(finishFunctionRegistry.getFunctions())) { +LOG.debug("Finishing function {}", finishFunction); +finishFunction.run(); + } } - } - // Add all checkpointed residuals to the response. - response.addAllResidualRoots(bundleProcessor.getSplitListener().getResidualRoots()); - - // TODO(BEAM-6597): This should be reporting monitoring infos using the short id system. - // Get start bundle Execution Time Metrics. - response.addAllMonitoringInfos( - bundleProcessor.getStartFunctionRegistry().getExecutionTimeMonitoringInfos()); - // Get process bundle Execution Time Metrics. - response.addAllMonitoringInfos( - bundleProcessor.getpCollectionConsumerRegistry().getExecutionTimeMonitoringInfos()); - // Get finish bundle Execution Time Metrics. - response.addAllMonitoringInfos( - bundleProcessor.getFinishFunctionRegistry().getExecutionTimeMonitoringInfos()); - // Extract MonitoringInfos that come from the metrics container registry. 
- response.addAllMonitoringInfos( - bundleProcessor.getMetricsContainerRegistry().getMonitoringInfos()); - // Add any additional monitoring infos that the "runners" report explicitly. - for (ProgressRequestCallback progressRequestCallback : - bundleProcessor.getProgressRequestCallbacks()) { - response.addAllMonitoringInfos(progressRequestCallback.getMonitoringInfos()); - } +// Add all checkpointed residuals to the response. + response.addAllResidualRoots(bundleProcessor.getSplitListener().getResidualRoots()); + +// TODO(BEAM-6597): This should be reporting monitoring infos using the short id system. +// Get start bundle Execution Time Metrics. +response.addAllMonitoringInfos( + bundleProcessor.getStartFunctionRegistry().getExecutionTimeMonitoringInfos()); +// Get process bundle Execution Time Metrics. +response.addAllMonitoringInfos(
[beam] branch master updated (8de3288 -> 6869dfa)
This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git. from 8de3288 [BEAM-10925] Add interface for SQL Java scalar UDF. (#13305) new 554d254 Track transform processing thread in Java SDK harness and set log entry transform id field. new a6ba301 Add javadoc new 23b2007 Add tracking to other ProcessElement methods new 945e794 Use LoadingCache instead of ConcurrentHashMap to limit the size for thread tracker new 857d41f Address comment new c8d9d1d Fix checkstyle new 6869dfa Merge pull request #13533 from [BEAM-11474] Track transform processing thread in Java SDK harness and set log entry field The 3 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../apache/beam/fn/harness/FnApiDoFnRunner.java| 27 .../harness/TransformProcessingThreadTracker.java | 49 ++ .../fn/harness/logging/BeamFnLoggingClient.java| 8 3 files changed, 84 insertions(+) create mode 100644 sdks/java/harness/src/main/java/org/apache/beam/fn/harness/TransformProcessingThreadTracker.java
[beam] branch master updated: fix PerPartitionSubscriptionSdf methods with incorrect visibility
This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 97ad770 fix PerPartitionSubscriptionSdf methods with incorrect visibility new 92209d1 Merge pull request #13566 from fix PerPartitionSubscriptionSdf methods with incorrect visibility 97ad770 is described below commit 97ad7702ea1d3d01a1b6e52d87dec3fc75a6a886 Author: dpcollins-google <40498610+dpcollins-goo...@users.noreply.github.com> AuthorDate: Wed Dec 16 13:00:50 2020 -0500 fix PerPartitionSubscriptionSdf methods with incorrect visibility --- .../beam/sdk/io/gcp/pubsublite/PerSubscriptionPartitionSdf.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PerSubscriptionPartitionSdf.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PerSubscriptionPartitionSdf.java index 6c47908..c9e1e69 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PerSubscriptionPartitionSdf.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PerSubscriptionPartitionSdf.java @@ -57,12 +57,12 @@ class PerSubscriptionPartitionSdf extends DoFn
[beam] branch master updated (f137246 -> f87c984)
This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git. from f137246 Merge pull request #13556 [BEAM-11472] Support SDF on non-splitting runners such as legacy Dataflow. new 31dcd53 [BEAM-10114] Convert PubsubLiteIO read to use SplittableDoFn. new ec3efe2 Fix test. new d3336fe fix: Testing issues new 8892c1c [BEAM-10114] Convert PubsubLiteIO to use an SDF for reads. new f5c6108 [BEAM-10114] Add byte and time limiting new 6d8c0a9 [BEAM-10114] Add byte and time limiting new 762b7ba [BEAM-10114] Add byte and time limiting new 65b2c42 [BEAM-10114] Add byte and time limiting new 418b3ac [BEAM-10114] Add byte and time limiting new 843fc4a [BEAM-10114] Add byte and time limiting new fa3fc2d fix: Linter issues new ea33f9e fix: linters new 88a3021 fix: linters new b731da8 fix: Remove unnecessary shuffle. new a432e8f fix: Address comments new f87c984 Merge pull request #13470 from [BEAM-10114] Convert PubsubLiteIO read to use SplittableDoFn. The 29980 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: ...zableSupplier.java => InitialOffsetReader.java} | 11 +- .../io/gcp/pubsublite/InitialOffsetReaderImpl.java | 54 .../gcp/pubsublite/LimitingTopicBacklogReader.java | 81 + ...ffsetFinalizer.java => OffsetByteProgress.java} | 17 +- .../io/gcp/pubsublite/OffsetByteRangeTracker.java | 172 +++ .../io/gcp/pubsublite/OffsetCheckpointMark.java| 74 - .../pubsublite/PerSubscriptionPartitionSdf.java| 110 +++ .../beam/sdk/io/gcp/pubsublite/PublisherCache.java | 2 +- .../beam/sdk/io/gcp/pubsublite/PubsubLiteIO.java | 14 +- .../gcp/pubsublite/PubsubLiteUnboundedReader.java | 333 - .../gcp/pubsublite/PubsubLiteUnboundedSource.java | 126 ...zer.java => SerializableSubscriberFactory.java} | 17 +- .../io/gcp/pubsublite/SerializableSupplier.java| 2 +- .../sdk/io/gcp/pubsublite/SubscribeTransform.java | 124 .../sdk/io/gcp/pubsublite/SubscriberOptions.java | 42 ++- ...etFinalizer.java => SubscriptionPartition.java} | 19 +- ...dCoder.java => SubscriptionPartitionCoder.java} | 36 ++- ...er.java => SubscriptionPartitionProcessor.java} | 14 +- ... 
=> SubscriptionPartitionProcessorFactory.java} | 14 +- .../SubscriptionPartitionProcessorImpl.java| 152 ++ .../sdk/io/gcp/pubsublite/TopicBacklogReader.java | 33 +- .../io/gcp/pubsublite/TopicBacklogReaderImpl.java | 80 ++--- .../gcp/pubsublite/TopicBacklogReaderSettings.java | 31 +- .../gcp/pubsublite/TranslatingPullSubscriber.java | 55 .../beam/sdk/io/gcp/pubsublite/UuidCoder.java | 7 +- .../gcp/pubsublite/OffsetByteRangeTrackerTest.java | 158 ++ .../gcp/pubsublite/OffsetCheckpointMarkTest.java | 81 - .../PerSubscriptionPartitionSdfTest.java | 189 .../pubsublite/PubsubLiteUnboundedReaderTest.java | 330 .../SubscriptionPartitionProcessorImplTest.java| 227 ++ .../gcp/pubsublite/TopicBacklogReaderImplTest.java | 129 ++-- 31 files changed, 1493 insertions(+), 1241 deletions(-) copy sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/{SerializableSupplier.java => InitialOffsetReader.java} (80%) create mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/InitialOffsetReaderImpl.java create mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/LimitingTopicBacklogReader.java copy sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/{OffsetFinalizer.java => OffsetByteProgress.java} (68%) create mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/OffsetByteRangeTracker.java delete mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/OffsetCheckpointMark.java create mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PerSubscriptionPartitionSdf.java delete mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteUnboundedReader.java delete mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteUnboundedSource.java copy sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/{OffsetFinalizer.java => SerializableSubscriberFactory.java} (65%) create mode 1
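The rewrite above replaces the `UnboundedSource`-based reader (`PubsubLiteUnboundedSource`/`PubsubLiteUnboundedReader`) with an SDF pipeline: `SubscribeTransform` expands a subscription into `SubscriptionPartition`s, and `PerSubscriptionPartitionSdf` reads each one under an `OffsetByteRangeTracker`. The user-facing entry point appears unchanged; a rough usage sketch (the subscription path is a placeholder):

```java
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.proto.SequencedMessage;
import org.apache.beam.sdk.io.gcp.pubsublite.PubsubLiteIO;
import org.apache.beam.sdk.io.gcp.pubsublite.SubscriberOptions;
import org.apache.beam.sdk.values.PCollection;

// Sketch only: read a stream of SequencedMessage protos from Pub/Sub Lite.
PCollection<SequencedMessage> messages =
    pipeline.apply(
        PubsubLiteIO.read(
            SubscriberOptions.newBuilder()
                .setSubscriptionPath(SubscriptionPath.parse(
                    "projects/123/locations/us-central1-a/subscriptions/my-sub"))
                .build()));
```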
[beam] branch master updated: Bump up python container versions
This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 3d62aab Bump up python container versions new 3710357 Merge pull request #13552 from Bump up python container versions 3d62aab is described below commit 3d62aabbce08801e75d64a288497f5223062ee8c Author: Yichi Zhang AuthorDate: Mon Dec 14 18:32:45 2020 -0800 Bump up python container versions --- sdks/python/apache_beam/runners/dataflow/internal/names.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/runners/dataflow/internal/names.py b/sdks/python/apache_beam/runners/dataflow/internal/names.py index 79ef7c1..b3d7831 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/names.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/names.py @@ -41,10 +41,10 @@ SERIALIZED_SOURCE_KEY = 'serialized_source' # Update this version to the next version whenever there is a change that will # require changes to legacy Dataflow worker execution environment. -BEAM_CONTAINER_VERSION = 'beam-master-20201116' +BEAM_CONTAINER_VERSION = 'beam-master-20201214' # Update this version to the next version whenever there is a change that # requires changes to SDK harness container or SDK harness launcher. -BEAM_FNAPI_CONTAINER_VERSION = 'beam-master-20201116' +BEAM_FNAPI_CONTAINER_VERSION = 'beam-master-20201214' DATAFLOW_CONTAINER_IMAGE_REPOSITORY = 'gcr.io/cloud-dataflow/v1beta3'
[beam] branch master updated: Skip dynamic timer test in portable spark test
This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 7e09870 Skip dynamic timer test in portable spark test new 5878cc5 Merge pull request #13553 from Skip dynamic timer test in portable spark test 7e09870 is described below commit 7e09870a7e8ef10579f8b2681f61d57435253311 Author: Yichi Zhang AuthorDate: Tue Dec 15 10:42:03 2020 -0800 Skip dynamic timer test in portable spark test --- sdks/python/apache_beam/runners/portability/spark_runner_test.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/sdks/python/apache_beam/runners/portability/spark_runner_test.py b/sdks/python/apache_beam/runners/portability/spark_runner_test.py index 4cf7e4b..062e06f 100644 --- a/sdks/python/apache_beam/runners/portability/spark_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/spark_runner_test.py @@ -172,6 +172,9 @@ class SparkRunnerTest(portable_runner_test.PortableRunnerTest): # Skip until Spark runner supports SDF and self-checkpoint. raise unittest.SkipTest("BEAM-7222") + def test_pardo_dynamic_timer(self): +raise unittest.SkipTest("BEAM-9912") + def test_flattened_side_input(self): # Blocked on support for transcoding # https://jira.apache.org/jira/browse/BEAM-7236
[beam] branch master updated: Enable more tests on Java + Python FnRunner
This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 447d9ae Enable more tests on Java + Python FnRunner new 531cded Merge pull request #13547 from [BEAM-10450][BEAM-10449][BEAM-10453][BEAM-10995] Enable more Java tests on Python FnRunner 447d9ae is described below commit 447d9ae65bea59719491301fa1464dd940ff Author: Boyuan Zhang AuthorDate: Mon Dec 14 11:24:40 2020 -0800 Enable more tests on Java + Python FnRunner --- runners/portability/java/build.gradle | 24 1 file changed, 24 deletions(-) diff --git a/runners/portability/java/build.gradle b/runners/portability/java/build.gradle index 32f64d6..137769b 100644 --- a/runners/portability/java/build.gradle +++ b/runners/portability/java/build.gradle @@ -140,7 +140,6 @@ def createUlrValidatesRunnerTask = { name, environmentType, dockerImageTask = "" includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner' excludeCategories 'org.apache.beam.sdk.testing.UsesGaugeMetrics' excludeCategories 'org.apache.beam.sdk.testing.UsesOnWindowExpiration' - excludeCategories 'org.apache.beam.sdk.testing.UsesBundleFinalizer' excludeCategories 'org.apache.beam.sdk.testing.UsesMapState' excludeCategories 'org.apache.beam.sdk.testing.UsesSetState' excludeCategories 'org.apache.beam.sdk.testing.UsesOrderedListState' @@ -154,9 +153,6 @@ def createUlrValidatesRunnerTask = { name, environmentType, dockerImageTask = "" // https://issues.apache.org/jira/browse/BEAM-10446 excludeTestsMatching 'org.apache.beam.sdk.metrics.MetricsTest$CommittedMetricTests.testCommittedDistributionMetrics' - // This test seems erroneously labeled ValidatesRunner - excludeTestsMatching 'org.apache.beam.sdk.schemas.AvroSchemaTest.testAvroPipelineGroupBy' - // Teardown not called in exceptions // https://issues.apache.org/jira/browse/BEAM-10447 excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInFinishBundle' @@ -176,23 +172,6 @@ def createUlrValidatesRunnerTask = { name, environmentType, dockerImageTask = "" excludeTestsMatching 'org.apache.beam.sdk.transforms.windowing.WindowingTest.testNonPartitioningWindowing' excludeTestsMatching 'org.apache.beam.sdk.transforms.GroupByKeyTest$WindowTests.testGroupByKeyMergingWindows' - // Flatten with empty PCollections hangs - // https://issues.apache.org/jira/browse/BEAM-10450 - excludeTestsMatching 'org.apache.beam.sdk.transforms.FlattenTest.testEmptyFlattenAsSideInput' - excludeTestsMatching 'org.apache.beam.sdk.transforms.FlattenTest.testFlattenPCollectionsEmpty' - excludeTestsMatching 'org.apache.beam.sdk.transforms.FlattenTest.testFlattenPCollectionsEmptyThenParDo' - - // Empty side inputs hang - // https://issues.apache.org/jira/browse/BEAM-10449 - excludeTestsMatching 'org.apache.beam.sdk.transforms.ViewTest.testWindowedSideInputFixedToFixedWithDefault' - excludeTestsMatching 'org.apache.beam.sdk.transforms.ViewTest.testEmptyIterableSideInput' - excludeTestsMatching 'org.apache.beam.sdk.transforms.ViewTest.testEmptySingletonSideInput' - excludeTestsMatching 'org.apache.beam.sdk.transforms.ViewTest.testEmptyListSideInput' - excludeTestsMatching 'org.apache.beam.sdk.transforms.ViewTest.testEmptyMultimapSideInput' - excludeTestsMatching 'org.apache.beam.sdk.transforms.ViewTest.testEmptyMultimapSideInputWithNonDeterministicKeyCoder' - excludeTestsMatching 
'org.apache.beam.sdk.transforms.ViewTest.testEmptyMapSideInput' - excludeTestsMatching 'org.apache.beam.sdk.transforms.ViewTest.testEmptyMapSideInputWithNonDeterministicKeyCoder' - // Misc failures // https://issues.apache.org/jira/browse/BEAM-10451 excludeTestsMatching 'org.apache.beam.sdk.transforms.CombineTest$WindowingTests.testGlobalCombineWithDefaultsAndTriggers' @@ -202,9 +181,6 @@ def createUlrValidatesRunnerTask = { name, environmentType, dockerImageTask = "" // https://issues.apache.org/jira/browse/BEAM-10454 excludeTestsMatching 'org.apache.beam.sdk.testing.PAssertTest.testWindowedIsEqualTo' - // https://issues.apache.org/jira/browse/BEAM-10453 - excludeTestsMatching 'org.apache.beam.sdk.transforms.ReshuffleTest.testReshuffleWithTimestampsStreaming' - // https://issues.apache.org/jira/browse/BEAM-10452 excludeTestsMatching 'org.apache.beam.sdk.transforms.CombineTest$BasicTests.testHotKeyCombiningWithAccumulationMode'
[beam] branch master updated (77683be -> 8cf88c3)
This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git. from 77683be [BEAM-11439] Fix Snowflake streaming docs (#13528) new a679bab [BEAM-9602] Add timer family support for python SDK new 8cf88c3 Merge pull request #13421 from [BEAM-9602] Add timer family support for python SDK The 29923 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: sdks/python/apache_beam/runners/common.pxd | 1 + sdks/python/apache_beam/runners/common.py | 28 ++- .../apache_beam/runners/direct/direct_userstate.py | 35 ++-- .../runners/direct/transform_evaluator.py | 8 +- sdks/python/apache_beam/runners/direct/util.py | 18 +- .../runners/direct/watermark_manager.py| 10 +- .../runners/portability/flink_runner_test.py | 3 + .../portability/fn_api_runner/fn_runner_test.py| 24 +++ .../apache_beam/runners/worker/bundle_processor.py | 41 ++-- .../apache_beam/runners/worker/operations.py | 6 +- sdks/python/apache_beam/transforms/core.py | 3 +- sdks/python/apache_beam/transforms/trigger.py | 39 ++-- sdks/python/apache_beam/transforms/trigger_test.py | 10 +- sdks/python/apache_beam/transforms/userstate.py| 55 ++ .../apache_beam/transforms/userstate_test.py | 216 - 15 files changed, 385 insertions(+), 112 deletions(-)
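The summary above only lists the touched files. For readers new to timer families: a timer family lets a single DoFn set many independently addressable timers at runtime instead of declaring each timer statically. The Java SDK already exposes this model, which the Python change mirrors; a minimal sketch follows (class and family names are illustrative, not from this commit):

```java
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.TimerMap;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Instant;

// One timer *family* provides many dynamically tagged timers per key.
class ActionTimerFn extends DoFn<KV<String, Instant>, String> {

  @TimerFamily("actionTimers")
  private final TimerSpec timerFamily = TimerSpecs.timerMap(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void process(
      @Element KV<String, Instant> element,
      @TimerFamily("actionTimers") TimerMap timers) {
    // The element's key doubles as the dynamic timer tag.
    timers.set(element.getKey(), element.getValue());
  }

  @OnTimerFamily("actionTimers")
  public void onTimer(@TimerId String timerId, OutputReceiver<String> out) {
    out.output("fired: " + timerId);
  }
}
```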
[beam] branch master updated: Add a small announcement for Splittable DoFn.
This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 379d796 Add a small announcement for Splittable DoFn. new c4af7f9 Merge pull request #13456 from [BEAM-10480] Add a small announcement for Splittable DoFn. 379d796 is described below commit 379d7967444db0b7f5ab932fe331f9e0dea83d63 Author: Boyuan Zhang AuthorDate: Tue Dec 1 17:42:26 2020 -0800 Add a small announcement for Splittable DoFn. --- .../en/blog/splittable-do-fn-is-available.md | 91 ++ 1 file changed, 91 insertions(+) diff --git a/website/www/site/content/en/blog/splittable-do-fn-is-available.md b/website/www/site/content/en/blog/splittable-do-fn-is-available.md new file mode 100644 index 000..5e73a8c --- /dev/null +++ b/website/www/site/content/en/blog/splittable-do-fn-is-available.md @@ -0,0 +1,91 @@ +--- +title: "Splittable DoFn in Apache Beam is Ready to Use" +date: 2020-12-14 00:00:01 -0800 +categories: + - blog +aliases: + - /blog/2020/12/14/splittable-do-fn-is-available.html +authors: + - boyuanzz +--- + + +We are pleased to announce that Splittable DoFn (SDF) is ready for use in the Beam Python, Java, +and Go SDKs for versions 2.25.0 and later. + +In 2017, [Splittable DoFn Blog Post](https://beam.apache.org/blog/splittable-do-fn/) proposed +to build [Splittable DoFn](https://s.apache.org/splittable-do-fn) APIs as the new recommended way of +building I/O connectors. Splittable DoFn is a generalization of `DoFn` that gives it the core +capabilities of `Source` while retaining `DoFn`'s syntax, flexibility, modularity, and ease of +coding. Thus, it becomes much easier to develop complex I/O connectors with simpler and reusable +code. + +SDF has three advantages over the existing `UnboundedSource` and `BoundedSource`: +* SDF provides a unified set of APIs to handle both unbounded and bounded cases. +* SDF enables reading from source descriptors dynamically. + - Taking KafkaIO as an example, within `UnboundedSource`/`BoundedSource` API, you must specify + the topic and partition you want to read from during pipeline construction time. There is no way + for `UnboundedSource`/`BoundedSource` to accept topics and partitions as inputs during execution + time. But it's built-in to SDF. +* SDF fits in as any node on a pipeline freely with the ability of splitting. + - `UnboundedSource`/`BoundedSource` has to be the root node of the pipeline to gain performance + benefits from splitting strategies, which limits many real-world usages. This is no longer a limit + for an SDF. + +As SDF is now ready to use with all the mentioned improvements, it is the recommended +way to build the new I/O connectors. Try out building your own Splittable DoFn by following the +[programming guide](https://beam.apache.org/documentation/programming-guide/#splittable-dofns). We +have provided tonnes of common utility classes such as common types of `RestrictionTracker` and +`WatermarkEstimator` in Beam SDK, which will help you onboard easily. As for the existing I/O +connectors, we have wrapped `UnboundedSource` and `BoundedSource` implementations into Splittable +DoFns, yet we still encourage developers to convert `UnboundedSource`/`BoundedSource` into actual +Splittable DoFn implementation to gain more performance benefits. + +Many thanks to every contributor who brought this highly anticipated design into the data processing +world. 
We are really excited to see that users benefit from SDF. + +Below are some real-world SDF examples for you to explore. + +## Real world Splittable DoFn examples + +**Java Examples** + +* [Kafka](https://github.com/apache/beam/blob/571338b0cc96e2e80f23620fe86de5c92dffaccc/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java#L118): +An I/O connector for [Apache Kafka](https://kafka.apache.org/) +(an open-source distributed event streaming platform). +* [Watch](https://github.com/apache/beam/blob/571338b0cc96e2e80f23620fe86de5c92dffaccc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java#L787): +Uses a polling function producing a growing set of outputs for each input until a per-input +termination condition is met. +* [Parquet](https://github.com/apache/beam/blob/571338b0cc96e2e80f23620fe86de5c92dffaccc/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java#L365): +An I/O connector for [Apache Parquet](https://parquet.apache.org/) +(an open-source columnar storage format). +* [HL7v2](https://github.com/apache/beam/blob/6fdde4f4eab72b49b10a8bb1cb3be263c5c416b5/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java#L493): +An I/O connector for HL7v2 messages (a clinical messaging format that provides data about events +
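To make the announcement concrete, here is a minimal sketch of a bounded Splittable DoFn in Java, assuming only the Beam Java SDK (the class name is illustrative). It pairs each input element with an offset restriction and claims offsets one by one, which is what lets a runner split or checkpoint the remaining work:

```java
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;

// Emits the numbers [0, size) for each input element. Because every offset is
// claimed through the tracker, the runner may split the remaining range at
// any time without duplicating or dropping output.
class EmitOffsetsFn extends DoFn<Long, Long> {

  @GetInitialRestriction
  public OffsetRange initialRestriction(@Element Long size) {
    return new OffsetRange(0, size);
  }

  @ProcessElement
  public void process(
      RestrictionTracker<OffsetRange, Long> tracker, OutputReceiver<Long> out) {
    for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); ++i) {
      out.output(i);
    }
  }
}
```

No `@NewTracker` method is needed here because `OffsetRange` supplies a default tracker.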
[beam] branch master updated: Add SDF capability_matrix.
This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 5e1c544 Add SDF capability_matrix. new 1481977 Merge pull request #13368 from [BEAM-10480] Add SDF capability_matrix. 5e1c544 is described below commit 5e1c544c46789c7bb9a75f6d29e84d2f7fbc5126 Author: Boyuan Zhang AuthorDate: Tue Nov 17 17:03:22 2020 -0800 Add SDF capability_matrix. --- website/www/site/data/capability_matrix.yaml | 714 +++-- .../www/site/data/capability_matrix_snapshot.yaml | 195 ++ 2 files changed, 862 insertions(+), 47 deletions(-) diff --git a/website/www/site/data/capability_matrix.yaml b/website/www/site/data/capability_matrix.yaml index 8c2b79f..dacddee 100644 --- a/website/www/site/data/capability_matrix.yaml +++ b/website/www/site/data/capability_matrix.yaml @@ -36,6 +36,10 @@ capability-matrix: name: Hazelcast Jet - class: twister2 name: Twister2 +- class: python direct + name: Python Direct FnRunner +- class: go direct + name: Go Direct Runner categories: - description: What is being computed? @@ -95,6 +99,14 @@ capability-matrix: l1: "Yes" l2: fully supported l3: "" +- class: python direct + l1: "" + l2: + l3: "" +- class: go direct + l1: "" + l2: + l3: "" - name: GroupByKey values: - class: model @@ -145,6 +157,14 @@ capability-matrix: l1: "Yes" l2: fully supported l3: "" +- class: python direct + l1: "" + l2: + l3: "" +- class: go direct + l1: "" + l2: + l3: "" - name: Flatten values: - class: model @@ -195,6 +215,14 @@ capability-matrix: l1: "Yes" l2: fully supported l3: "" +- class: python direct + l1: "" + l2: + l3: "" +- class: go direct + l1: "" + l2: + l3: "" - name: Combine values: - class: model @@ -245,6 +273,14 @@ capability-matrix: l1: "Yes" l2: fully supported l3: "" +- class: python direct + l1: "" + l2: + l3: "" +- class: go direct + l1: "" + l2: + l3: "" - name: Composite Transforms values: - class: model @@ -295,6 +331,14 @@ capability-matrix: l1: "Partially" l2: supported via inlining l3: "" +- class: python direct + l1: "" + l2: + l3: "" +- class: go direct + l1: "" + l2: + l3: "" - name: Side Inputs values: - class: model @@ -345,6 +389,14 @@ capability-matrix: l1: "Yes" l2: fully supported l3: "" +- class: python direct + l1: "" + l2: + l3: "" +- class: go direct + l1: "" + l2: + l3: "" - name: Source API values: - class: model @@ -395,55 +447,13 @@ capability-matrix: l1: "Yes" l2: fully supported l3: "" -- name: Splittable DoFn (SDF) - values: -- class: model - l1: "Partially" - l2: DoFn where processing of each element can be split for parallelism, or suspended and resumed - l3: Allows users to develop DoFn's that process a single element in portions ("restrictions"), executed in parallel or sequentially. This supersedes the unbounded and bounded `Source` APIs by supporting all of their features on a per-element basis. See http://s.apache.org/splittable-do-fn. Design is in progress on achieving parity with Source API regarding progress signals. -- class: dataflow - l1: "Yes" - l2: - l3: Does not yet support autotuning features of the Source API. -- class: flink -
[beam] branch master updated (310a710 -> bc0bac7)
This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git. from 310a710 Merge pull request #13502: [BEAM-11415] Setup and bugfixes for Go ValidatesRunner framework. new b942279 [BEAM-10114] Bump Pub/Sub lite version and clean up settings classes. new c1d9794 [BEAM-10114] Fix lint errors new bc0bac7 Merge pull request #13509 from [BEAM-10114] Bump Pub/Sub lite version and clean up settings classes. The 29897 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../org/apache/beam/gradle/BeamModulePlugin.groovy | 2 +- .../sdk/io/gcp/pubsublite/CloudPubsubChecks.java | 21 ++-- .../io/gcp/pubsublite/OffsetCheckpointMark.java| 12 +-- .../sdk/io/gcp/pubsublite/OffsetFinalizer.java | 4 +- .../beam/sdk/io/gcp/pubsublite/PublisherCache.java | 8 +- .../sdk/io/gcp/pubsublite/PublisherOptions.java| 47 +++-- .../sdk/io/gcp/pubsublite/PublisherOrError.java| 6 +- .../beam/sdk/io/gcp/pubsublite/Publishers.java | 56 ++ .../beam/sdk/io/gcp/pubsublite/PubsubLiteSink.java | 18 ++-- .../gcp/pubsublite/PubsubLiteUnboundedReader.java | 38 +++ .../gcp/pubsublite/PubsubLiteUnboundedSource.java | 42 +++- .../gcp/pubsublite/SerializableStatusFunction.java | 26 - .../io/gcp/pubsublite/SerializableSupplier.java| 3 +- .../sdk/io/gcp/pubsublite/SubscriberOptions.java | 117 - .../sdk/io/gcp/pubsublite/TopicBacklogReader.java | 3 +- .../gcp/pubsublite/TopicBacklogReaderSettings.java | 27 ++--- .../gcp/pubsublite/TranslatingPullSubscriber.java | 11 +- .../gcp/pubsublite/UuidDeduplicationOptions.java | 15 +-- .../cloud/spanner/FakeBatchTransactionId.java | 1 + .../sdk/io/gcp/pubsublite/PubsubLiteSinkTest.java | 47 - .../pubsublite/PubsubLiteUnboundedReaderTest.java | 2 +- .../gcp/pubsublite/TopicBacklogReaderImplTest.java | 17 ++- .../pubsublite/UuidDeduplicationTransformTest.java | 13 +-- 23 files changed, 234 insertions(+), 302 deletions(-) create mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/Publishers.java delete mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SerializableStatusFunction.java
[beam] branch master updated: Add GroupIntoBatches to runner API; add Dataflow override in Python SDK
This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 05c8471 Add GroupIntoBatches to runner API; add Dataflow override in Python SDK new 2c96aeb Merge pull request #13405 from [BEAM-10703, BEAM-10475] Add GroupIntoBatches to runner API; add Dataflow override in Python SDK 05c8471 is described below commit 05c8471b27e03e5611a2a13137c4a785f2d17fc9 Author: sychen AuthorDate: Mon Nov 9 21:16:50 2020 -0800 Add GroupIntoBatches to runner API; add Dataflow override in Python SDK --- .../pipeline/src/main/proto/beam_runner_api.proto | 18 sdks/python/apache_beam/portability/common_urns.py | 1 + .../runners/dataflow/dataflow_runner.py| 32 +- .../runners/dataflow/dataflow_runner_test.py | 62 .../apache_beam/runners/dataflow/internal/names.py | 5 + .../runners/dataflow/ptransform_overrides.py | 45 + sdks/python/apache_beam/transforms/util.py | 111 - sdks/python/apache_beam/transforms/util_test.py| 41 8 files changed, 288 insertions(+), 27 deletions(-) diff --git a/model/pipeline/src/main/proto/beam_runner_api.proto b/model/pipeline/src/main/proto/beam_runner_api.proto index cbfd817..ce561d3 100644 --- a/model/pipeline/src/main/proto/beam_runner_api.proto +++ b/model/pipeline/src/main/proto/beam_runner_api.proto @@ -344,6 +344,10 @@ message StandardPTransforms { // Payload: PubSubWritePayload. PUBSUB_WRITE = 5 [(beam_urn) = "beam:transform:pubsub_write:v1"]; + +// Represents the GroupIntoBatches.WithShardedKey operation. +// Payload: GroupIntoBatchesPayload +GROUP_INTO_BATCHES_WITH_SHARDED_KEY = 6 [(beam_urn) = "beam:transform:group_into_batches_with_sharded_key:v1"]; } // Payload for all of these: CombinePayload enum CombineComponents { @@ -414,6 +418,10 @@ message StandardPTransforms { // Output: KV(KV(element, restriction), size). TRUNCATE_SIZED_RESTRICTION = 3 [(beam_urn) = "beam:transform:sdf_truncate_sized_restrictions:v1"]; } + // Payload for all of these: GroupIntoBatchesPayload + enum GroupIntoBatchesComponents { +GROUP_INTO_BATCHES = 0 [(beam_urn) = "beam:transform:group_into_batches:v1"]; + } } message StandardSideInputTypes { @@ -706,6 +714,16 @@ message PubSubWritePayload { string id_attribute = 3; } +// Payload for GroupIntoBatches composite transform. +message GroupIntoBatchesPayload { + + // (Required) Max size of a batch. + int64 batch_size = 1; + + // (Optional) Max duration a batch is allowed to be cached in states. + int64 max_buffering_duration_millis = 2; +} + // A coder, the binary format for serialization and deserialization of data in // a pipeline. 
message Coder { diff --git a/sdks/python/apache_beam/portability/common_urns.py b/sdks/python/apache_beam/portability/common_urns.py index 819264d..e356319 100644 --- a/sdks/python/apache_beam/portability/common_urns.py +++ b/sdks/python/apache_beam/portability/common_urns.py @@ -42,6 +42,7 @@ deprecated_primitives = StandardPTransforms.DeprecatedPrimitives composites = StandardPTransforms.Composites combine_components = StandardPTransforms.CombineComponents sdf_components = StandardPTransforms.SplittableParDoComponents +group_into_batches_components = StandardPTransforms.GroupIntoBatchesComponents side_inputs = StandardSideInputTypes.Enum coders = StandardCoders.Enum diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py index 06853f4..92d9c19 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py @@ -505,7 +505,11 @@ class DataflowRunner(PipelineRunner): pipeline.replace_all(DataflowRunner._PTRANSFORM_OVERRIDES) from apache_beam.runners.dataflow.ptransform_overrides import WriteToBigQueryPTransformOverride -pipeline.replace_all([WriteToBigQueryPTransformOverride(pipeline, options)]) +from apache_beam.runners.dataflow.ptransform_overrides import GroupIntoBatchesWithShardedKeyPTransformOverride +pipeline.replace_all([ +WriteToBigQueryPTransformOverride(pipeline, options), +GroupIntoBatchesWithShardedKeyPTransformOverride(self, options) +]) if use_fnapi and not apiclient._use_unified_worker(options): pipeline.replace_all(DataflowRunner._JRH_PTRANSFORM_OVERRIDES) @@ -727,6 +731,18 @@ class DataflowRunner(PipelineRunner): window_coder = None return self._get_typehint_based_encoding(element_type, window_coder) + def get_pcoll_with_auto_sharding(self): +if not hasattr(self, '_pcoll_with_auto_sharding'): + return set() +return self._pcoll_with_auto_s
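For context, the payload defined above describes the user-facing `GroupIntoBatches` transform. A minimal Java usage sketch, assuming a keyed input (batch size and duration are illustrative; the sharded-key variant is reached via `withShardedKey()`, and the Python transform mirrors the same parameters):

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

public class GroupIntoBatchesSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create();
    PCollection<KV<String, Iterable<String>>> batched =
        p.apply(Create.of(KV.of("k", "a"), KV.of("k", "b"), KV.of("k", "c")))
            // batch_size in the payload above; flush partial batches after 5s
            // (max_buffering_duration_millis).
            .apply(
                GroupIntoBatches.<String, String>ofSize(2)
                    .withMaxBufferingDuration(Duration.standardSeconds(5)));
    p.run().waitUntilFinish();
  }
}
```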
[beam] branch master updated: [BEAM-11417] Use Cache with time eviction policy for commitCallbacks
This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 80eff6c [BEAM-11417] Use Cache with time eviction policy for commitCallbacks new a9a1c8e Merge pull request #13507 from [BEAM-11417] Use Cache with time eviction policy for commitCallbacks 80eff6c is described below commit 80eff6c571fd34c9a4b1024ff4201c1e2dd23dad Author: Boyuan Zhang AuthorDate: Tue Dec 8 11:00:20 2020 -0800 [BEAM-11417] Use Cache with time eviction policy for commitCallbacks --- .../runners/dataflow/worker/StreamingDataflowWorker.java | 12 +--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java index 6c127c8..6771e04 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java @@ -138,6 +138,8 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Optional; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Splitter; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.Cache; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.EvictingQueue; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables; @@ -397,8 +399,11 @@ public class StreamingDataflowWorker { new WeightedBoundedQueue<>( MAX_COMMIT_QUEUE_BYTES, commit -> Math.min(MAX_COMMIT_QUEUE_BYTES, commit.getSize())); - // Map of tokens to commit callbacks. - private final ConcurrentMap commitCallbacks = new ConcurrentHashMap<>(); + // Cache of tokens to commit callbacks. + // Using Cache with time eviction policy helps us to prevent memory leak when callback ids are + // discarded by Dataflow service and calling commitCallback is best-effort. + private final Cache commitCallbacks = + CacheBuilder.newBuilder().expireAfterWrite(5L, TimeUnit.MINUTES).build(); // Map of user state names to system state names. // TODO(drieber): obsolete stateNameMap. Use transformUserNameToStateFamily in @@ -1200,12 +1205,13 @@ public class StreamingDataflowWorker { private void callFinalizeCallbacks(Windmill.WorkItem work) { for (Long callbackId : work.getSourceState().getFinalizeIdsList()) { - final Runnable callback = commitCallbacks.remove(callbackId); + final Runnable callback = commitCallbacks.getIfPresent(callbackId); // NOTE: It is possible the same callback id may be removed twice if // windmill restarts. // TODO: It is also possible for an earlier finalized id to be lost. // We should automatically discard all older callbacks for the same computation and key. if (callback != null) { +commitCallbacks.invalidate(callbackId); workUnitExecutor.forceExecute( () -> { try {
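The fix above relies on Guava's time-based eviction rather than an unbounded map. A self-contained sketch of the same pattern, written against plain (non-vendored) Guava with illustrative names:

```java
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.util.concurrent.TimeUnit;

public class CallbackCacheSketch {
  // Entries silently expire five minutes after insertion, bounding memory use
  // when the service never hands a callback id back to us.
  private final Cache<Long, Runnable> callbacks =
      CacheBuilder.newBuilder().expireAfterWrite(5L, TimeUnit.MINUTES).build();

  void register(long id, Runnable callback) {
    callbacks.put(id, callback);
  }

  void fire(long id) {
    Runnable callback = callbacks.getIfPresent(id);
    if (callback != null) {
      callbacks.invalidate(id); // remove before running: fire at most once
      callback.run();
    }
  }
}
```

Looking up with `getIfPresent` and invalidating before running preserves the original remove-then-run, at-most-once semantics, while unclaimed entries simply age out.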
[beam] branch master updated: Remove Experimental annotation from SDF-related APIs.
This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new c3367f7 Remove Experimental annotation from SDF related APIs. new 2462fe9 Merge pull request #13199 from [BEAM-11132] Remove Experimental annotation from SDF related APIs. c3367f7 is described below commit c3367f7db1c6e8ca2bc26f827dbb8a2c2f32c002 Author: Boyuan Zhang AuthorDate: Mon Oct 26 12:30:47 2020 -0700 Remove Experimental annotation from SDF related APIs. --- .../java/org/apache/beam/sdk/annotations/Experimental.java | 7 --- .../core/src/main/java/org/apache/beam/sdk/io/AvroIO.java | 7 ++- .../core/src/main/java/org/apache/beam/sdk/io/FileIO.java | 5 ++--- .../core/src/main/java/org/apache/beam/sdk/io/TextIO.java | 5 ++--- .../java/org/apache/beam/sdk/transforms/Deduplicate.java| 3 --- .../src/main/java/org/apache/beam/sdk/transforms/DoFn.java | 13 - .../org/apache/beam/sdk/transforms/PeriodicSequence.java| 3 --- .../src/main/java/org/apache/beam/sdk/transforms/Watch.java | 3 --- .../sdk/transforms/splittabledofn/ByteKeyRangeTracker.java | 3 --- .../splittabledofn/GrowableOffsetRangeTracker.java | 3 --- .../sdk/transforms/splittabledofn/HasDefaultTracker.java| 4 .../splittabledofn/HasDefaultWatermarkEstimator.java| 3 --- .../transforms/splittabledofn/ManualWatermarkEstimator.java | 3 --- .../sdk/transforms/splittabledofn/OffsetRangeTracker.java | 3 --- .../sdk/transforms/splittabledofn/RestrictionTracker.java | 3 --- .../beam/sdk/transforms/splittabledofn/SplitResult.java | 3 --- .../TimestampObservingWatermarkEstimator.java | 3 --- .../sdk/transforms/splittabledofn/WatermarkEstimator.java | 3 --- .../sdk/transforms/splittabledofn/WatermarkEstimators.java | 3 --- .../beam/sdk/transforms/splittabledofn/package-info.java| 3 --- .../src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java | 1 - sdks/python/apache_beam/io/iobase.py| 7 +-- 22 files changed, 7 insertions(+), 84 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java index b9a5a49..48ff40d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java @@ -74,13 +74,6 @@ public @interface Experimental { /** Experimental APIs related to customizing the output time for computed values. */ OUTPUT_TIME, -/** - * https://s.apache.org/splittable-do-fn";>Splittable DoFn. See https://beam.apache.org/documentation/runners/capability-matrix/";>capability matrix - * for runner support. - */ -SPLITTABLE_DO_FN, - /** Metrics-related experimental APIs. */ METRICS, diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java index 97fd464..8fb4186 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java @@ -652,9 +652,9 @@ public class AvroIO { * interval, until the given termination condition is reached. The returned {@link PCollection} * is unbounded. * - * This works only in runners supporting {@link Kind#SPLITTABLE_DO_FN}. + * This works only in runners supporting splittable {@link + * org.apache.beam.sdk.transforms.DoFn}. 
*/ -@Experimental(Kind.SPLITTABLE_DO_FN) public Read watchForNewFiles( Duration pollInterval, TerminationCondition terminationCondition) { return withMatchConfiguration( @@ -877,7 +877,6 @@ public class AvroIO { } /** Like {@link Read#watchForNewFiles}. */ -@Experimental(Kind.SPLITTABLE_DO_FN) public ReadAll watchForNewFiles( Duration pollInterval, TerminationCondition terminationCondition) { return withMatchConfiguration( @@ -1009,7 +1008,6 @@ public class AvroIO { } /** Like {@link Read#watchForNewFiles}. */ -@Experimental(Kind.SPLITTABLE_DO_FN) public Parse watchForNewFiles( Duration pollInterval, TerminationCondition terminationCondition) { return withMatchConfiguration( @@ -1189,7 +1187,6 @@ public class AvroIO { } /** Like {@link Read#watchForNewFiles}. */ -@Experimental(Kind.SPLITTABLE_DO_FN) public ParseAll watchForNewFiles( Duration pollInterval, TerminationCondition terminationCondition) { return withMatchConfiguration( diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/F
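The javadoc being edited above points users at `watchForNewFiles`; for orientation, a small usage sketch, shown here with `TextIO` (the `AvroIO` variants are analogous; the bucket path and timings are illustrative):

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.Watch;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

class WatchForNewFilesSketch {
  // Re-match the glob every 30s; stop watching once no new files have appeared
  // for an hour. Requires a runner with splittable DoFn support, per the
  // javadoc above.
  static PCollection<String> watchLogs(Pipeline pipeline) {
    return pipeline.apply(
        TextIO.read()
            .from("gs://my-bucket/logs/*.txt")
            .watchForNewFiles(
                Duration.standardSeconds(30),
                Watch.Growth.afterTimeSinceNewOutput(Duration.standardHours(1))));
  }
}
```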
[beam] branch master updated: Use EventRecorder instead of relying on class var.
This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new e4f9054 Use EventRecorder instead of relying on class var. new 46ac3bc Merge pull request #13478 from [BEAM-11070]Use EventRecorder instead of relying on class var. e4f9054 is described below commit e4f90544a881df2116bc32f7c42987e4f3e01c9d Author: Boyuan Zhang AuthorDate: Thu Dec 3 12:01:51 2020 -0800 Use EventRecorder instead of relying on class var. --- .../portability/fn_api_runner/fn_runner_test.py| 59 ++ 1 file changed, 50 insertions(+), 9 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py index 4f7fc27..05bd0d2 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py @@ -23,6 +23,7 @@ import collections import logging import os import random +import shutil import sys import tempfile import threading @@ -881,21 +882,19 @@ class FnApiRunnerTest(unittest.TestCase): assert_that(res, equal_to(['1', '2'])) def test_register_finalizations(self): -class FinalizableSplittableDoFn(beam.DoFn): - was_finalized = False - - def set_finalized(self): -self.was_finalized = True +event_recorder = EventRecorder(tempfile.gettempdir()) +class FinalizableSplittableDoFn(beam.DoFn): def process( self, element, bundle_finalizer=beam.DoFn.BundleFinalizerParam, restriction_tracker=beam.DoFn.RestrictionParam( - OffsetRangeProvider(use_bounded_offset_range=True))): + OffsetRangeProvider( + use_bounded_offset_range=True, checkpoint_only=True))): # We use SDF to enforce finalization call happens by using # self-initiated checkpoint. -if self.was_finalized: +if 'finalized' in event_recorder.events(): restriction_tracker.try_claim( restriction_tracker.current_restriction().start) yield element @@ -903,7 +902,7 @@ class FnApiRunnerTest(unittest.TestCase): return if restriction_tracker.try_claim( restriction_tracker.current_restriction().start): - bundle_finalizer.register(lambda: self.set_finalized()) + bundle_finalizer.register(lambda: event_recorder.record('finalized')) # We sleep here instead of setting a resume time since the resume time # doesn't need to be honored. time.sleep(1) @@ -917,6 +916,8 @@ class FnApiRunnerTest(unittest.TestCase): | beam.ParDo(FinalizableSplittableDoFn())) assert_that(res, equal_to([max_retries])) +event_recorder.cleanup() + def test_sdf_synthetic_source(self): common_attrs = { 'key_size': 1, @@ -1763,6 +1764,36 @@ def _unpickle_element_counter(name): return _pickled_element_counters[name] +class EventRecorder(object): + """Used to be registered as a callback in bundle finalization. + + The reason why records are written into a tmp file is, the in-memory dataset + cannot keep callback records when passing into one DoFn. 
+ """ + def __init__(self, tmp_dir): +self.tmp_dir = os.path.join(tmp_dir, uuid.uuid4().hex) +os.mkdir(self.tmp_dir) + + def record(self, content): +file_path = os.path.join(self.tmp_dir, uuid.uuid4().hex + '.txt') +with open(file_path, 'w') as f: + f.write(content) + + def events(self): +content = [] +record_files = [ +f for f in os.listdir(self.tmp_dir) +if os.path.isfile(os.path.join(self.tmp_dir, f)) +] +for file in record_files: + with open(os.path.join(self.tmp_dir, file), 'r') as f: +content.append(f.read()) +return sorted(content) + + def cleanup(self): +shutil.rmtree(self.tmp_dir) + + class ExpandStringsProvider(beam.transforms.core.RestrictionProvider): """A RestrictionProvider that used for sdf related tests.""" def initial_restriction(self, element): @@ -1786,13 +1817,23 @@ class UnboundedOffsetRestrictionTracker( class OffsetRangeProvider(beam.transforms.core.RestrictionProvider): - def __init__(self, use_bounded_offset_range): + def __init__(self, use_bounded_offset_range, checkpoint_only=False): self.use_bounded_offset_range = use_bounded_offset_range +self.checkpoint_only = checkpoint_only def initial_restriction(self, element): return restriction_trackers.OffsetRange(0, element) def create_tracker(self, restriction): +if self.checkpoi
[beam] branch master updated: Add Splittable DoFn as the recommended way of building connectors.
This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 4c51569 Add splittable dofn as the recommended way of building connectors. new 3de140f Merge pull request #13227 from [BEAM-10480] Add splittable dofn as the recommended way of building connectors. 4c51569 is described below commit 4c51569d3b972e2271efcc520a48ccb1bd20c9be Author: Boyuan Zhang AuthorDate: Thu Oct 29 15:13:35 2020 -0700 Add splittable dofn as the recommended way of building connectors. --- .../en/documentation/io/developing-io-java.md | 3 + .../en/documentation/io/developing-io-overview.md | 80 +- .../en/documentation/io/developing-io-python.md| 3 + 3 files changed, 53 insertions(+), 33 deletions(-) diff --git a/website/www/site/content/en/documentation/io/developing-io-java.md b/website/www/site/content/en/documentation/io/developing-io-java.md index 7de2024..7836a3c 100644 --- a/website/www/site/content/en/documentation/io/developing-io-java.md +++ b/website/www/site/content/en/documentation/io/developing-io-java.md @@ -17,6 +17,9 @@ limitations under the License. --> # Developing I/O connectors for Java +**IMPORTANT:** Use ``Splittable DoFn`` to develop your new I/O. For more details, read the +[new I/O connector overview](/documentation/io/developing-io-overview/). + To connect to a data store that isn’t supported by Beam’s existing I/O connectors, you must create a custom I/O connector that usually consist of a source and a sink. All Beam sources and sinks are composite transforms; however, diff --git a/website/www/site/content/en/documentation/io/developing-io-overview.md b/website/www/site/content/en/documentation/io/developing-io-overview.md index 0ea507f..c8e0482 100644 --- a/website/www/site/content/en/documentation/io/developing-io-overview.md +++ b/website/www/site/content/en/documentation/io/developing-io-overview.md @@ -46,33 +46,32 @@ are the recommended steps to get started: For **bounded (batch) sources**, there are currently two options for creating a Beam source: +1. Use `Splittable DoFn`. + 1. Use `ParDo` and `GroupByKey`. -1. Use the `Source` interface and extend the `BoundedSource` abstract subclass. -`ParDo` is the recommended option, as implementing a `Source` can be tricky. See -[When to use the Source interface](#when-to-use-source) for a list of some use -cases where you might want to use a `Source` (such as -[dynamic work rebalancing](/blog/2016/05/18/splitAtFraction-method.html)). +`Splittable DoFn` is the recommended option, as it's the most recent source framework for both +bounded and unbounded sources. This is meant to replace the `Source` APIs( +[BoundedSource](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/BoundedSource.html) and +[UnboundedSource](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/UnboundedSource.html)) +in the new system. Read +[Splittable DoFn Programming Guide](/learn/programming-guide/#splittable-dofns) for how to write one +Splittable DoFn. For more information, see the +[roadmap for multi-SDK connector efforts](/roadmap/connectors-multi-sdk/). -(Java only) For **unbounded (streaming) sources**, you must use the `Source` -interface and extend the `UnboundedSource` abstract subclass. `UnboundedSource` -supports features that are useful for streaming pipelines, such as -checkpointing. 
+For Java and Python **unbounded (streaming) sources**, you must use the `Splittable DoFn`, which +supports features that are useful for streaming pipelines, including checkpointing, controlling +watermark, and tracking backlog. -Splittable DoFn is a new sources framework that is under development and will -replace the other options for developing bounded and unbounded sources. For more -information, see the -[roadmap for multi-SDK connector efforts](/roadmap/connectors-multi-sdk/). -### When to use the Source interface {#when-to-use-source} +### When to use the Splittable DoFn interface {#when-to-use-splittable-dofn} -If you are not sure whether to use `Source`, feel free to email the [Beam dev -mailing list](/get-started/support) and we can discuss the -specific pros and cons of your case. +If you are not sure whether to use `Splittable DoFn`, feel free to email the +[Beam dev mailing list](/get-started/support) and we can discuss the specific pros and cons of your +case. -In some cases, implementing a `Source` might be necessary or result in better -performance: +In some cases, implementing a `Splittable DoFn` might be necessary or result in better performance: * **Unbounded sources:** `ParDo` does not work for reading from unbounded sources. `ParDo` does not support checkpointing or mechanisms like de-duping @@ -90,22 +89,40 @@ performance: jobs. Depe
[beam] branch master updated: Revert "Add use_unified_worker to pipeline options."
This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 14b4889 Revert "Add use_unified_worker to pipeline options." new eb11a3f Merge pull request #13446 from Revert "Add use_unified_worker to pipeline options." 14b4889 is described below commit 14b48899f15b524e608c23e1cf62ddcf1aa4083c Author: Boyuan Zhang AuthorDate: Mon Nov 30 15:02:12 2020 -0800 Revert "Add use_unified_worker to pipeline options." This reverts commit ab2e94d3 --- sdks/python/apache_beam/examples/kafkataxi/README.md | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/examples/kafkataxi/README.md b/sdks/python/apache_beam/examples/kafkataxi/README.md index 674dbd8..e8e144d 100644 --- a/sdks/python/apache_beam/examples/kafkataxi/README.md +++ b/sdks/python/apache_beam/examples/kafkataxi/README.md @@ -185,7 +185,5 @@ python -m apache_beam.examples.kafkataxi.kafka_taxi \ --job_name $JOB_NAME \ --bootstrap_servers $BOOTSTRAP_SERVER \ --sdk_harness_container_image_overrides ".*java.*,${DOCKER_ROOT}/beam_java8_sdk:latest" \ - --experiments=use_runner_v2 \ - --experiments=use_unified_worker \ - --experiments=beam_fn_api + --experiments=use_runner_v2 ```
[beam] branch master updated (b002530 -> 682f2ea)
This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git. from b002530 Merge pull request #13417 from ibzib/BEAM-10278 new 72b8894 [BEAM-10962] Add Multiple PubSub reader to Python SDK new 50aaf77 [BEAM-10962] Add Multiple PubSub reader to Python SDK new 296f3a4 Merge branch 'master' into multi-pubsub-reader new d0263c7 Merge remote-tracking branch 'origin/multi-pubsub-reader' into multi-pubsub-reader new 34b92bf [BEAM-10962] Add Multiple PubSub reader to Python SDK new e046a63 [BEAM-10962] Add Multiple PubSub reader to Python SDK new 660cdf5 [BEAM-10962] Add Multiple PubSub reader to Python SDK new dfc57ee [BEAM-10962] Add Multiple PubSub reader to Python SDK new 0a0dc6b [BEAM-10962] Add Multiple PubSub reader to Python SDK new c33e97f [BEAM-10962] Add Multiple PubSub reader to Python SDK new 2508a4f [BEAM-10962] Add Multiple PubSub reader to Python SDK new 2e7bc4a [BEAM-10962] Add Multiple PubSub reader to Python SDK new a279c7d [BEAM-10962] Add Multiple PubSub reader to Python SDK new 17af76c Merge branch 'master' into multi-pubsub-reader new 682f2ea Merge pull request #12930 from [BEAM-10962] Add Multiple PubSub reader to Python SDK The 29784 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: sdks/python/apache_beam/io/gcp/pubsub.py | 114 sdks/python/apache_beam/io/gcp/pubsub_test.py | 123 ++ 2 files changed, 237 insertions(+)
[beam] branch master updated: [BEAM-11070] Use self-checkpoint to enforce that finalization happens.
This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new b29102e [BEAM-11070] Use self-checkpoint to enfore finalization happens. new 9b51d4b Merge pull request #13338 from [BEAM-11070] Use self-checkpoint to enforce finalization happens. b29102e is described below commit b29102ed4c4186964cd281edf7851898535cfe02 Author: Boyuan Zhang AuthorDate: Fri Nov 13 12:06:20 2020 -0800 [BEAM-11070] Use self-checkpoint to enfore finalization happens. --- .../runners/portability/flink_runner_test.py | 3 +- .../portability/fn_api_runner/fn_runner_test.py| 81 ++ 2 files changed, 39 insertions(+), 45 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/flink_runner_test.py b/sdks/python/apache_beam/runners/portability/flink_runner_test.py index 303fe96..12ed1b8 100644 --- a/sdks/python/apache_beam/runners/portability/flink_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/flink_runner_test.py @@ -442,7 +442,8 @@ class FlinkRunnerTestStreaming(FlinkRunnerTest): super(FlinkRunnerTest, self).test_callbacks_with_exception() def test_register_finalizations(self): -raise unittest.SkipTest("BEAM-11070") +self.enable_commit = True +super(FlinkRunnerTest, self).test_register_finalizations() if __name__ == '__main__': diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py index 91343b5..18a30e9 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py @@ -23,7 +23,6 @@ import collections import logging import os import random -import shutil import sys import tempfile import threading @@ -835,23 +834,41 @@ class FnApiRunnerTest(unittest.TestCase): assert_that(res, equal_to(['1', '2'])) def test_register_finalizations(self): -event_recorder = EventRecorder(tempfile.gettempdir()) -elements_list = ['2', '1'] +class FinalizableSplittableDoFn(beam.DoFn): + was_finalized = False + + def set_finalized(self): +self.was_finalized = True -class FinalizableDoFn(beam.DoFn): def process( - self, element, bundle_finalizer=beam.DoFn.BundleFinalizerParam): -bundle_finalizer.register(lambda: event_recorder.record(element)) -yield element + self, + element, + bundle_finalizer=beam.DoFn.BundleFinalizerParam, + restriction_tracker=beam.DoFn.RestrictionParam( + OffsetRangeProvider(use_bounded_offset_range=True))): +# We use SDF to enforce finalization call happens by using +# self-initiated checkpoint. +if self.was_finalized: + restriction_tracker.try_claim( + restriction_tracker.current_restriction().start) + yield element + restriction_tracker.try_claim(element) + return +if restriction_tracker.try_claim( +restriction_tracker.current_restriction().start): + bundle_finalizer.register(lambda: self.set_finalized()) + # We sleep here instead of setting a resume time since the resume time + # doesn't need to be honored. 
+ time.sleep(1) + restriction_tracker.defer_remainder() with self.create_pipeline() as p: - res = (p | beam.Create(elements_list) | beam.ParDo(FinalizableDoFn())) - - assert_that(res, equal_to(elements_list)) - -results = event_recorder.events() -event_recorder.cleanup() -self.assertEqual(results, sorted(elements_list)) + max_retries = 100 + res = ( + p + | beam.Create([max_retries]) + | beam.ParDo(FinalizableSplittableDoFn())) + assert_that(res, equal_to([max_retries])) def test_sdf_synthetic_source(self): common_attrs = { @@ -1331,6 +1348,9 @@ class FnApiRunnerTestWithMultiWorkers(FnApiRunnerTest): def test_sdf_with_watermark_tracking(self): raise unittest.SkipTest("This test is for a single worker only.") + def test_register_finalizations(self): +raise unittest.SkipTest("This test is for a single worker only.") + class FnApiRunnerTestWithGrpcAndMultiWorkers(FnApiRunnerTest): def create_pipeline(self, is_drain=False): @@ -1355,6 +1375,9 @@ class FnApiRunnerTestWithGrpcAndMultiWorkers(FnApiRunnerTest): def test_sdf_with_watermark_tracking(self): raise unittest.SkipTest("This test is for a single worker only.") + def test_register_finalizations(self): +raise unittest.SkipTest("This test is for a single worker only.")
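The Python test above forces a self-initiated checkpoint so that the runner commits the bundle and then invokes the registered finalization callback. For reference, a sketch of the corresponding Java hook (class name, expiry, and message are illustrative):

```java
import org.apache.beam.sdk.transforms.DoFn;
import org.joda.time.Duration;
import org.joda.time.Instant;

// Registers a callback that runs after the runner durably commits the bundle,
// e.g. to ack messages pulled from an external system.
class AckOnCommitFn extends DoFn<String, String> {
  @ProcessElement
  public void process(
      @Element String element,
      OutputReceiver<String> out,
      BundleFinalizer finalizer) {
    finalizer.afterBundleCommit(
        Instant.now().plus(Duration.standardMinutes(5)),
        () -> System.out.println("bundle containing " + element + " committed"));
    out.output(element);
  }
}
```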
[beam] branch master updated: Enable more runner v2 tests.
This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 07cbed5 Enable more runner v2 tests. new 208690c Merge pull request #13372 from boyuanzz/enable_test 07cbed5 is described below commit 07cbed52ec185c3c4b1eeff7d237a35a6da107fd Author: Boyuan Zhang AuthorDate: Tue Nov 17 20:20:54 2020 -0800 Enable more runner v2 tests. --- runners/google-cloud-dataflow-java/build.gradle | 24 1 file changed, 8 insertions(+), 16 deletions(-) diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle index 9e5f085..197eff8 100644 --- a/runners/google-cloud-dataflow-java/build.gradle +++ b/runners/google-cloud-dataflow-java/build.gradle @@ -171,6 +171,7 @@ def commonRunnerV2ExcludeCategories = [ 'org.apache.beam.sdk.testing.UsesOrderedListState', 'org.apache.beam.sdk.testing.UsesTestStream', 'org.apache.beam.sdk.testing.UsesTestStreamWithProcessingTime', +'org.apache.beam.sdk.testing.UsesRequiresTimeSortedInput', ] // For the following test tasks using legacy worker, set workerHarnessContainerImage to empty to @@ -358,26 +359,16 @@ task validatesRunnerV2 { dependsOn(createRunnerV2ValidatesRunnerTest( name: 'validatesRunnerV2Test', excludedCategories: [ - 'org.apache.beam.sdk.testing.UsesRequiresTimeSortedInput', + 'org.apache.beam.sdk.testing.UsesUnboundedPCollections', + 'org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo', ], excludedTests: [ 'org.apache.beam.sdk.transforms.windowing.WindowingTest.testNonPartitioningWindowing', 'org.apache.beam.sdk.transforms.windowing.WindowTest.testMergingCustomWindowsKeyedCollection', 'org.apache.beam.sdk.transforms.windowing.WindowTest.testMergingCustomWindows', - 'org.apache.beam.sdk.transforms.SplittableDoFnTest.testBundleFinalizationOccursOnUnboundedSplittableDoFn', - 'org.apache.beam.sdk.transforms.SplittableDoFnTest.testOutputAfterCheckpointBounded', - 'org.apache.beam.sdk.transforms.SplittableDoFnTest.testOutputAfterCheckpointUnbounded', - 'org.apache.beam.sdk.transforms.SplittableDoFnTest.testPairWithIndexBasicBounded', - 'org.apache.beam.sdk.transforms.SplittableDoFnTest.testPairWithIndexBasicUnbounded', - 'org.apache.beam.sdk.transforms.SplittableDoFnTest.testPairWithIndexWindowedTimestampedBounded', - 'org.apache.beam.sdk.transforms.SplittableDoFnTest.testWindowedSideInputWithCheckpointsBounded', - 'org.apache.beam.sdk.transforms.ReshuffleTest.testReshuffleWithTimestampsStreaming', - 'org.apache.beam.sdk.transforms.ParDoTest$BundleFinalizationTests.testBundleFinalization', - 'org.apache.beam.sdk.transforms.ParDoTest$BundleFinalizationTests.testBundleFinalizationWithSideInputs', - 'org.apache.beam.sdk.transforms.ParDoTest$BundleFinalizationTests.testBundleFinalizationWithState', 'org.apache.beam.sdk.transforms.ParDoTest$OnWindowExpirationTests.testOnWindowExpirationSimpleBounded', 'org.apache.beam.sdk.transforms.ParDoTest$OnWindowExpirationTests.testOnWindowExpirationSimpleUnbounded', 'org.apache.beam.sdk.transforms.ParDoTest$StateTests.testValueStateTaggedOutput', @@ -418,10 +409,6 @@ task validatesRunnerV2Streaming { name: 'validatesRunnerV2TestStreaming', pipelineOptions: runnerV2PipelineOptions + ['--streaming=true'], excludedCategories: [ - // Not supported in Dataflow streaming mode. - 'org.apache.beam.sdk.testing.UsesRequiresTimeSortedInput', - // Timeouts, need investigation. 
- 'org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo', 'org.apache.beam.sdk.testing.UsesBoundedSplittableParDo', 'org.apache.beam.sdk.testing.LargeKeys$Above10KB', ], @@ -446,6 +433,11 @@ task validatesRunnerV2Streaming { 'org.apache.beam.sdk.transforms.GroupByKeyTest.testCombiningAccumulatingProcessingTime', + // TODO(BEAM-11306): Pipeline is hanging for these 3 tests. + 'org.apache.beam.sdk.transforms.SplittableDoFnTest.testPairWithIndexBasicUnbounded', + 'org.apache.beam.sdk.transforms.SplittableDoFnTest.testOutputAfterCheckpointUnbounded', + 'org.apache.beam.sdk.transforms.SplittableDoFnTest.testBundleFinalizationOccursOnUnboundedSplittableDoFn', + // TODO(BEAM-8543): streaming timers are not strictly time ordered 'org.apache.beam.sdk.transforms.ParDoTest$TimerTests.testEventTimeTimerOrderingWithCreate', 'org.apache.beam.sdk.transforms.ParDoTest$TimerTests.testTwoTimersSettingEachOther',
[beam] branch master updated: [BEAM-11310] Exclude org.apache.beam.sdk.transforms.ViewTest.testWindowedSideInputNotPresent from the Flink ValidatesRunner test
This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new fa5c512 [BEAM-113100] Exclude org.apache.beam.sdk.transforms.ViewTest.testWindowedSideInputNotPresent from flink validates runner test new 2fed655 Merge pull request #13392 from [BEAM-11310] Exclude org.apache.beam.sdk.transforms.ViewTest.testWin fa5c512 is described below commit fa5c5127dc9b255ee893a9ee49b42c9251629957 Author: Sam Whittle AuthorDate: Fri Nov 20 02:10:48 2020 -0800 [BEAM-113100] Exclude org.apache.beam.sdk.transforms.ViewTest.testWindowedSideInputNotPresent from flink validates runner test --- runners/flink/job-server/flink_job_server.gradle | 2 ++ 1 file changed, 2 insertions(+) diff --git a/runners/flink/job-server/flink_job_server.gradle b/runners/flink/job-server/flink_job_server.gradle index 1ca09a1..6fc9068 100644 --- a/runners/flink/job-server/flink_job_server.gradle +++ b/runners/flink/job-server/flink_job_server.gradle @@ -182,6 +182,8 @@ def portableValidatesRunnerTask(String name, Boolean streaming, Boolean checkpoi testFilter: { // TODO(BEAM-10016) excludeTestsMatching 'org.apache.beam.sdk.transforms.FlattenTest.testFlattenWithDifferentInputAndOutputCoders2' + // TODO(BEAM-11310) + excludeTestsMatching 'org.apache.beam.sdk.transforms.ViewTest.testWindowedSideInputNotPresent' }, ) }
[beam] branch master updated: Update SDF programming guide.
This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new fc52b63 Update SDF programming guide. new 774185b Merge pull request #13326 from [BEAM-10480] Update SDF programming guide fc52b63 is described below commit fc52b633a2c1fe58d98290d131dd9dcb373165a8 Author: Boyuan Zhang AuthorDate: Thu Nov 12 14:54:42 2020 -0800 Update SDF programming guide. --- .../content/en/documentation/programming-guide.md | 85 ++ 1 file changed, 72 insertions(+), 13 deletions(-) diff --git a/website/www/site/content/en/documentation/programming-guide.md b/website/www/site/content/en/documentation/programming-guide.md index abd682e..d8b3ec0 100644 --- a/website/www/site/content/en/documentation/programming-guide.md +++ b/website/www/site/content/en/documentation/programming-guide.md @@ -5188,16 +5188,70 @@ restriction pairs. 12.1.1. A basic SDF {#a-basic-sdf} A basic SDF is composed of three parts: a restriction, a restriction provider, and a -restriction tracker. The restriction is used to represent a subset of work for a given element. -The restriction provider lets SDF authors override default implementations for splitting, sizing, -watermark estimation, and so forth. In [Java](https://github.com/apache/beam/blob/f4c2734261396858e388ebef2eef50e7d48231a8/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L92) +restriction tracker. If you want to control the watermark, especially in a streaming +pipeline, two more components are needed: a watermark estimator provider and a watermark estimator. + +The restriction is a user-defined object that is used to represent a subset of +work for a given element. For example, we defined `OffsetRange` as a restriction to represent offset +positions in [Java](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/range/OffsetRange.html) +and [Python](https://beam.apache.org/releases/pydoc/current/apache_beam.io.restriction_trackers.html#apache_beam.io.restriction_trackers.OffsetRange). + +The restriction provider lets SDF authors override default implementations, including the ones for +splitting and sizing. In [Java](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/transforms/DoFn.ProcessElement.html) and [Go](https://github.com/apache/beam/blob/0f466e6bcd4ac8677c2bd9ecc8e6af3836b7f3b8/sdks/go/pkg/beam/pardo.go#L226), -this is the `DoFn`. [Python](https://github.com/apache/beam/blob/f4c2734261396858e388ebef2eef50e7d48231a8/sdks/python/apache_beam/transforms/core.py#L213) -has a dedicated RestrictionProvider type. The restriction tracker is responsible for tracking -what subset of the restriction has been completed during processing. +this is the `DoFn`. [Python](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.core.html#apache_beam.transforms.core.RestrictionProvider) +has a dedicated `RestrictionProvider` type. + +The restriction tracker is responsible for tracking which subset of the restriction has been +completed during processing. For APIs details, read the [Java](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.html) +and [Python](https://beam.apache.org/releases/pydoc/current/apache_beam.io.iobase.html#apache_beam.io.iobase.RestrictionTracker) +reference documentation. 
+ +There are some built-in `RestrictionTracker` implementations defined in Java: +1. [OffsetRangeTracker](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.html) +2. [GrowableOffsetRangeTracker](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/transforms/splittabledofn/GrowableOffsetRangeTracker.html) +3. [ByteKeyRangeTracker](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/transforms/splittabledofn/ByteKeyRangeTracker.html) + +The SDF also has a built-in `RestrictionTracker` implementation in Python: +1. [OffsetRangeTracker](https://beam.apache.org/releases/pydoc/current/apache_beam.io.restriction_trackers.html#apache_beam.io.restriction_trackers.OffsetRestrictionTracker) + +The watermark state is a user-defined object which is used to create a `WatermarkEstimator` from a +`WatermarkEstimatorProvider`. The simplest watermark state could be a `timestamp`. + +The watermark estimator provider lets SDF authors define how to initialize the watermark state and +create a watermark estimator. In [Java](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/transforms/DoFn.ProcessElement.html) +this is the `DoFn`. [Python](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.core.html#apache_beam.transforms.core.WatermarkEstimatorProvider) +has a dedicated `WatermarkEstimatorProvider` type
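To ground the new guide text, here is a compact Java sketch wiring the watermark-estimation hooks into an SDF (names are illustrative and the restriction side is kept minimal; a manual estimator is shown, with the watermark state being a plain timestamp):

```java
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.joda.time.Instant;

class WatermarkingFn extends DoFn<Long, Long> {

  @GetInitialRestriction
  public OffsetRange initialRestriction(@Element Long size) {
    return new OffsetRange(0, size);
  }

  // The watermark "state" here is simply the last reported timestamp.
  @GetInitialWatermarkEstimatorState
  public Instant initialWatermarkState() {
    return BoundedWindow.TIMESTAMP_MIN_VALUE;
  }

  @NewWatermarkEstimator
  public ManualWatermarkEstimator<Instant> newWatermarkEstimator(
      @WatermarkEstimatorState Instant state) {
    return new WatermarkEstimators.Manual(state);
  }

  @ProcessElement
  public void process(
      RestrictionTracker<OffsetRange, Long> tracker,
      ManualWatermarkEstimator<Instant> watermarkEstimator,
      OutputReceiver<Long> out) {
    for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); ++i) {
      out.output(i);
      // Advance the watermark monotonically as progress is made (illustrative).
      watermarkEstimator.setWatermark(new Instant(i));
    }
  }
}
```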
[beam] branch master updated: Make UsesTestStream extend UsesUnboundedPCollections for exclusion in batch-only runners
This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new d4c82ec Make UsesTestStream extend UsesUnboundedPCollections for exclusion in batch only runners new 434bce5 Merge pull request #13376 from Make UsesTestStream extend UsesUnboundedPCollections for exclusion in batch-only runners d4c82ec is described below commit d4c82ecb6723123dd2a2d3422dcc25e5009db48e Author: Kenneth Knowles AuthorDate: Wed Nov 18 09:43:55 2020 -0800 Make UsesTestStream extend UsesUnboundedPCollections for exclusion in batch only runners --- .../core/src/main/java/org/apache/beam/sdk/testing/UsesTestStream.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesTestStream.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesTestStream.java index 4cd0920..f3e8302 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesTestStream.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesTestStream.java @@ -24,4 +24,4 @@ import org.apache.beam.sdk.annotations.Internal; * special feature currently only implemented by the direct runner and the Flink Runner (streaming). */ @Internal -public interface UsesTestStream {} +public interface UsesTestStream extends UsesUnboundedPCollections {}
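The one-line change above matters because runners exclude tests by JUnit category, and category exclusion follows interface inheritance. A sketch of both sides (the test tagging, and the Gradle exclusion noted in a comment, since batch-only runner builds already exclude the unbounded category):

```java
import org.apache.beam.sdk.testing.UsesTestStream;
import org.apache.beam.sdk.testing.ValidatesRunner;
import org.junit.Test;
import org.junit.experimental.categories.Category;

public class MyStreamingTest {
  // Because UsesTestStream now extends UsesUnboundedPCollections, a batch-only
  // runner whose build excludes that category, e.g. via
  //   excludeCategories 'org.apache.beam.sdk.testing.UsesUnboundedPCollections'
  // automatically skips this test as well.
  @Category({ValidatesRunner.class, UsesTestStream.class})
  @Test
  public void testWithTestStream() {
    // ... build and run a TestStream-based pipeline ...
  }
}
```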
[beam] branch master updated: Upgrade Conscrypt to 2.5.1 in GrpcVendoring_1_26_0
This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new a95dbc8 Upgrade Conscrypt to 2.5.1 in GrpcVendoring_1_26_0 new ed09e96 Merge pull request #13150 from [BEAM-5748] Upgrade Conscrypt to 2.5.1 in GrpcVendoring_1_26_0 a95dbc8 is described below commit a95dbc8ad88dc0bb6624039e06c11eb1ab5b961a Author: Esun Kim AuthorDate: Wed Nov 11 10:42:28 2020 -0800 Upgrade Conscrypt to 2.5.1 in GrpcVendoring_1_26_0 --- .../src/main/groovy/org/apache/beam/gradle/GrpcVendoring_1_26_0.groovy | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/GrpcVendoring_1_26_0.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/GrpcVendoring_1_26_0.groovy index e0ce931..898faa2 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/GrpcVendoring_1_26_0.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/GrpcVendoring_1_26_0.groovy @@ -38,7 +38,7 @@ class GrpcVendoring_1_26_0 { static def compress_lzf_version = "1.0.3" static def lz4_version = "1.3.0" static def bouncycastle_version = "1.54" - static def conscrypt_version = "1.3.0" + static def conscrypt_version = "2.5.1" static def alpn_api_version = "1.1.2.v20150522" static def npn_api_version = "1.1.1.v20141010" static def jboss_marshalling_version = "1.4.11.Final"
[beam] branch master updated: Fix NPE in CountingSource
This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new a872aca Fix NPE in CountingSource new fc41a9d Merge pull request #13361 from Fix NPE in CountingSource a872aca is described below commit a872aca93099cf43429626baf0e526c2f5ed856b Author: Boyuan Zhang AuthorDate: Mon Nov 16 20:10:29 2020 -0800 Fix NPE in CountingSource --- .../java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java index c1601e3..3fbd0e6 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java @@ -435,7 +435,8 @@ public class CountingSource { } private long expectedValue() { - if (source.period.getMillis() == 0L) { + // Within the SDF unbounded wrapper, we will query the initial size before we start to read. + if (source.period.getMillis() == 0L || firstStarted == null) { return Long.MAX_VALUE; } double periodsElapsed =
[beam] branch master updated: [BEAM-11270] Dataflow Java on runner v2 tests are failing because sdk docker container is cleaned up incorrectly
This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 9a3d3ea [BEAM-11270] Dataflow Java on runner v2 tests are failing because sdk docker container is cleaned up incorrectly new 39b95c4 Merge pull request #13356 from [BEAM-11270] Fix container clean-up logic 9a3d3ea is described below commit 9a3d3eaf3d57c66a245291febc16b6e5623d394d Author: Heejong Lee AuthorDate: Mon Nov 16 14:17:08 2020 -0800 [BEAM-11270] Dataflow Java on runner v2 tests are failing because sdk docker container is cleaned up incorrectly --- runners/google-cloud-dataflow-java/build.gradle | 6 +- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle index 03e57df..6973cbe 100644 --- a/runners/google-cloud-dataflow-java/build.gradle +++ b/runners/google-cloud-dataflow-java/build.gradle @@ -315,8 +315,12 @@ task validatesRunnerStreaming { )) } +task setupXVR { + dependsOn buildAndPushDockerContainer +} + createCrossLanguageValidatesRunnerTask( - startJobServer: buildAndPushDockerContainer, + startJobServer: setupXVR, cleanupJobServer: cleanUpDockerImages, classpath: configurations.validatesRunner, numParallelTests: Integer.MAX_VALUE,
[beam] branch master updated: [BEAM-11263] Java cleanUpDockerImages now force removes container images.
This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 78b9d05 [BEAM-11263] Java cleanUpDockerImages now force removes container images. new 7ac82a8 Merge pull request #13344 from [BEAM-11263] Java cleanUpDockerImages now force removes container images. 78b9d05 is described below commit 78b9d0575f9146296027d7c02ea48cd70a7bcd31 Author: Tyson Hamilton AuthorDate: Fri Nov 13 22:56:00 2020 + [BEAM-11263] Java cleanUpDockerImages now force removes container images. --- runners/google-cloud-dataflow-java/build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle index c791a20..95d2ba9 100644 --- a/runners/google-cloud-dataflow-java/build.gradle +++ b/runners/google-cloud-dataflow-java/build.gradle @@ -251,7 +251,7 @@ task buildAndPushDockerContainer() { task cleanUpDockerImages() { doLast { exec { - commandLine "docker", "rmi", "${dockerImageName}" + commandLine "docker", "rmi", "--force", "${dockerImageName}" } exec { commandLine "gcloud", "--quiet", "container", "images", "delete", "--force-delete-tags", "${dockerImageName}"
[beam] branch master updated: [BEAM-11192] Add use_runner_v2 to experiments.
This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 656c21b [BEAM-11192] Add use_runner_v2 to experiments. new 43e8b99 Merge pull request #13339 from [BEAM-11192] Add use_runner_v2 to experiments. 656c21b is described below commit 656c21bb1d3b1b2a75398d8f585515ed092d686e Author: Boyuan Zhang AuthorDate: Fri Nov 13 12:42:56 2020 -0800 [BEAM-11192] Add use_runner_v2 to experiments. --- runners/google-cloud-dataflow-java/build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle index 3a6f8e1..c791a20 100644 --- a/runners/google-cloud-dataflow-java/build.gradle +++ b/runners/google-cloud-dataflow-java/build.gradle @@ -143,7 +143,7 @@ def runnerV2PipelineOptions = [ "--region=${dataflowRegion}", "--tempRoot=${dataflowValidatesTempRoot}", "--workerHarnessContainerImage=${dockerImageContainer}:${dockerTag}", - "--experiments=beam_fn_api,use_unified_worker", + "--experiments=beam_fn_api,use_unified_worker,use_runner_v2", ] def commonLegacyExcludeCategories = [
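The same experiments can be supplied programmatically; a minimal sketch with the Beam Java options API, mirroring the Gradle change above (assuming DataflowPipelineOptions exposes setExperiments via ExperimentalOptions):

  import java.util.Arrays;
  import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
  import org.apache.beam.sdk.options.PipelineOptionsFactory;

  DataflowPipelineOptions options =
      PipelineOptionsFactory.as(DataflowPipelineOptions.class);
  options.setExperiments(
      Arrays.asList("beam_fn_api", "use_unified_worker", "use_runner_v2"));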
[beam] branch master updated: Update SDF related documentation.
This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new a8337b8 Update SDF related documentation. new d2ff41e Merge pull request #13313 from [BEAM-10480] Update SDF related documentation. a8337b8 is described below commit a8337b8a6b822ec8867d0bdd00bfa1b32e7278f3 Author: Boyuan Zhang AuthorDate: Wed Nov 11 17:03:51 2020 -0800 Update SDF related documentation. --- .../java/org/apache/beam/sdk/transforms/DoFn.java | 4 +++- .../splittabledofn/RestrictionTracker.java | 23 +- sdks/python/apache_beam/io/iobase.py | 12 ++ sdks/python/apache_beam/transforms/core.py | 28 +++--- 4 files changed, 52 insertions(+), 15 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java index 78b6ef7..115b480 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java @@ -670,7 +670,9 @@ public abstract class DoFnIt should define a {@link SplitRestriction} method. This method enables runners to - * perform bulk splitting initially allowing for a rapid increase in parallelism. See {@link + * perform bulk splitting initially allowing for a rapid increase in parallelism. If it is + * not defined, there is no initial split happening by default. Note that initial split is a + * different concept from the split during element processing time. See {@link * RestrictionTracker#trySplit} for details about splitting when the current element and * restriction are actively being processed. * It may define a {@link TruncateRestriction} method to choose how to truncate a diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java index 9ce33bf..77a0592 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java @@ -27,6 +27,9 @@ import org.checkerframework.checker.nullness.qual.Nullable; * Manages access to the restriction and keeps track of its claimed part for a https://s.apache.org/splittable-do-fn";>splittable {@link DoFn}. * + * The restriction may be modified by different threads, however the system will ensure + * sufficient locking such that no methods on the restriction tracker will be called concurrently. + * * {@link RestrictionTracker}s should implement {@link HasProgress} otherwise poor auto-scaling * of workers and/or splitting may result if the progress is an inaccurate representation of the * known amount of completed and remaining work. @@ -49,12 +52,20 @@ public abstract class RestrictionTracker { * call to this method). * {@link RestrictionTracker#checkDone} MUST succeed. * + * + * This method is required to be implemented. */ public abstract boolean tryClaim(PositionT position); /** * Returns a restriction accurately describing the full range of work the current {@link * DoFn.ProcessElement} call will do, including already completed work. 
+ * + * The current restriction returned by method may be updated dynamically due to due to + * concurrent invocation of other methods of the {@link RestrictionTracker}, For example, {@link + * RestrictionTracker#trySplit(double)}. + * + * This method is required to be implemented. */ public abstract RestrictionT currentRestriction(); @@ -83,6 +94,9 @@ public abstract class RestrictionTracker { * The API is recommended to be implemented for a batch pipeline to improve parallel processing * performance. * + * The API is recommended to be implemented for batch pipeline given that it is very important + * for pipeline scaling and end to end pipeline execution. + * * The API is required to be implemented for a streaming pipeline. * * @param fractionOfRemainder A hint as to the fraction of work the primary restriction should @@ -94,10 +108,15 @@ public abstract class RestrictionTracker { public abstract @Nullable SplitResult trySplit(double fractionOfRemainder); /** - * Called by the runner after {@link DoFn.ProcessElement} returns. + * Checks whether the restriction has been fully processed. + * + * Called by the SDK harness after {@link DoFn.ProcessElement} returns. * * Must throw an exception with an informative error message, if there is still any unclaimed * work remainin
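Taken together, the updated Javadoc describes a contract that is easiest to see in a toy implementation. The following is a hedged sketch over a [from, to) offset restriction, not Beam's own OffsetRangeTracker; it shows the required methods (tryClaim, currentRestriction, trySplit, checkDone) plus isBounded, and the split arithmetic is illustrative:

  import org.apache.beam.sdk.io.range.OffsetRange;
  import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
  import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;

  class ToyOffsetTracker extends RestrictionTracker<OffsetRange, Long> {
    private OffsetRange range;
    private long lastClaimed = Long.MIN_VALUE;

    ToyOffsetTracker(OffsetRange range) { this.range = range; }

    @Override
    public boolean tryClaim(Long position) {
      // Returning false tells the harness the restriction is exhausted.
      if (position >= range.getTo()) { return false; }
      lastClaimed = position;
      return true;
    }

    @Override
    public OffsetRange currentRestriction() {
      // May change under a concurrent trySplit, but per the Javadoc above the
      // methods themselves are never invoked concurrently.
      return range;
    }

    @Override
    public SplitResult<OffsetRange> trySplit(double fractionOfRemainder) {
      long unstarted = Math.max(lastClaimed + 1, range.getFrom());
      long splitPos =
          unstarted + (long) ((range.getTo() - unstarted) * fractionOfRemainder);
      if (splitPos >= range.getTo()) { return null; } // nothing to hand back
      OffsetRange primary = new OffsetRange(range.getFrom(), splitPos);
      OffsetRange residual = new OffsetRange(splitPos, range.getTo());
      this.range = primary;
      return SplitResult.of(primary, residual);
    }

    @Override
    public void checkDone() {
      if (Math.max(lastClaimed + 1, range.getFrom()) < range.getTo()) {
        throw new IllegalStateException("Unclaimed work remaining in " + range);
      }
    }

    @Override
    public IsBounded isBounded() { return IsBounded.BOUNDED; }
  }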
[beam] branch master updated (6649ccc -> 2ab6c8a)
This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git. from 6649ccc Merge pull request #13283 from ihji/BEAM-11142 new 8cc104c fix: Fix split() logic for PubsubliteIO to split to `desiredNumSplits` groups instead of groups of size `desiredNumSplits` new c2b8c24 fix: Fix nit new 4a378bf fix: run spotless new 2ab6c8a Merge pull request #13161 from dpcollins-google/fix-lite-2 The 29630 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../sdk/io/gcp/pubsublite/PubsubLiteUnboundedSource.java | 16 +--- 1 file changed, 13 insertions(+), 3 deletions(-)
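The fix addresses an easy-to-make interpretation bug: treating desiredNumSplits as a group size yields roughly numPartitions / desiredNumSplits splits instead of desiredNumSplits of them. A hedged sketch of the corrected shape (a generic helper, not the actual PubsubLiteUnboundedSource code):

  import java.util.ArrayList;
  import java.util.List;

  // Distribute inputs round-robin into at most desiredNumSplits groups.
  static <T> List<List<T>> splitInto(List<T> partitions, int desiredNumSplits) {
    int numGroups = Math.max(1, Math.min(desiredNumSplits, partitions.size()));
    List<List<T>> groups = new ArrayList<>();
    for (int i = 0; i < numGroups; i++) {
      groups.add(new ArrayList<>());
    }
    for (int i = 0; i < partitions.size(); i++) {
      groups.get(i % numGroups).add(partitions.get(i));
    }
    return groups;
  }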
[beam] branch master updated: Add sdf initiated checkpoint support to portable Flink.
This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 28914c2 Add sdf initiated checkpoint support to portable Flink. new cf6fd1c Merge pull request #13105 from [BEAM-10940] Add sdf initiated checkpoint support to portable Flink. 28914c2 is described below commit 28914c2679feaae8bf00955229f64bb46d3970cd Author: Boyuan Zhang AuthorDate: Tue Oct 13 19:37:26 2020 -0700 Add sdf initiated checkpoint support to portable Flink. --- runners/flink/job-server/flink_job_server.gradle | 10 +- .../FlinkBatchPortablePipelineTranslator.java | 3 +- .../FlinkStreamingPortablePipelineTranslator.java | 43 ++- .../functions/FlinkExecutableStageFunction.java| 84 +- .../wrappers/streaming/DoFnOperator.java | 31 ++- .../streaming/ExecutableStageDoFnOperator.java | 288 ++--- .../streaming/SdfByteBufferKeySelector.java| 61 + .../FlinkExecutableStageFunctionTest.java | 14 +- .../streaming/ExecutableStageDoFnOperatorTest.java | 18 +- .../control/BundleCheckpointHandlers.java | 142 ++ .../control/DefaultJobBundleFactory.java | 6 +- .../fnexecution/control/SdkHarnessClient.java | 17 +- .../SingleEnvironmentInstanceJobBundleFactory.java | 6 +- .../fnexecution/control/StageBundleFactory.java| 35 ++- .../fnexecution/control/RemoteExecutionTest.java | 1 + .../SparkExecutableStageFunctionTest.java | 4 +- .../runners/portability/flink_runner_test.py | 3 - 17 files changed, 673 insertions(+), 93 deletions(-) diff --git a/runners/flink/job-server/flink_job_server.gradle b/runners/flink/job-server/flink_job_server.gradle index 24b0838..1ca09a1 100644 --- a/runners/flink/job-server/flink_job_server.gradle +++ b/runners/flink/job-server/flink_job_server.gradle @@ -146,8 +146,6 @@ def portableValidatesRunnerTask(String name, Boolean streaming, Boolean checkpoi if (streaming && checkpointing) { includeCategories 'org.apache.beam.sdk.testing.UsesBundleFinalizer' excludeCategories 'org.apache.beam.sdk.testing.UsesBoundedSplittableParDo' -// TODO(BEAM-10940): Enable this test suite once we have support. 
-excludeCategories 'org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo' // TestStreamSource does not support checkpointing excludeCategories 'org.apache.beam.sdk.testing.UsesTestStream' } else { @@ -169,18 +167,16 @@ def portableValidatesRunnerTask(String name, Boolean streaming, Boolean checkpoi excludeCategories 'org.apache.beam.sdk.testing.UsesBundleFinalizer' excludeCategories 'org.apache.beam.sdk.testing.UsesOrderedListState' if (streaming) { + excludeCategories 'org.apache.beam.sdk.testing.UsesBoundedSplittableParDo' excludeCategories 'org.apache.beam.sdk.testing.UsesTestStreamWithProcessingTime' excludeCategories 'org.apache.beam.sdk.testing.UsesTestStreamWithMultipleStages' excludeCategories 'org.apache.beam.sdk.testing.UsesTestStreamWithOutputTimestamp' } else { + excludeCategories 'org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo' + excludeCategories 'org.apache.beam.sdk.testing.UsesSplittableParDoWithWindowedSideInputs' excludeCategories 'org.apache.beam.sdk.testing.UsesUnboundedPCollections' excludeCategories 'org.apache.beam.sdk.testing.UsesTestStream' } -//SplitableDoFnTests -excludeCategories 'org.apache.beam.sdk.testing.UsesBoundedSplittableParDo' -excludeCategories 'org.apache.beam.sdk.testing.UsesSplittableParDoWithWindowedSideInputs' -excludeCategories 'org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo' - } }, testFilter: { diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java index 25a0ed0..32726f3 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java @@ -342,7 +342,8 @@ public class FlinkBatchPortablePipelineTranslator context.getJobInfo(), outputMap, FlinkExecutableStageContextFactory.getInstance(), -getWindowingStrategy(inputPCollectionId, components).getWindowFn().windowCoder()); +getWindowingStrategy(inputPCollectionId, compo
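An "SDF initiated checkpoint" happens when a splittable DoFn stops processing early and asks to be resumed later by returning a ProcessContinuation; the runner must persist the residual restriction and schedule the resume, which is the support this commit adds to portable Flink streaming. A hedged sketch of the DoFn side of that contract (the polling predicate is hypothetical, and the other required SDF methods such as @GetInitialRestriction are omitted):

  // Illustrative self-checkpointing splittable DoFn.
  class TailingSdf extends DoFn<String, String> {
    @ProcessElement
    public ProcessContinuation process(
        ProcessContext c, RestrictionTracker<OffsetRange, Long> tracker) {
      for (long pos = tracker.currentRestriction().getFrom();; pos++) {
        if (!tracker.tryClaim(pos)) {
          return ProcessContinuation.stop(); // restriction exhausted
        }
        c.output("record-" + pos);
        if (noMoreDataForNow()) {
          // Checkpoint here; the runner persists the residual and resumes later.
          return ProcessContinuation.resume()
              .withResumeDelay(Duration.standardSeconds(5));
        }
      }
    }

    private boolean noMoreDataForNow() { return true; } // hypothetical predicate
  }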
[beam] branch master updated: Fix GroupIntoBathces.test_buffering_timer_in_fixed_window_streaming
This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new a2f22df Fix GroupIntoBathces.test_buffering_timer_in_fixed_window_streaming new d535e6d Merge pull request #13253 from nehsyc/gib_timeout_py a2f22df is described below commit a2f22dfd2c72a9ca152a8d44907fc7344361b499 Author: sychen AuthorDate: Tue Nov 3 15:00:26 2020 -0800 Fix GroupIntoBathces.test_buffering_timer_in_fixed_window_streaming --- sdks/python/apache_beam/transforms/util_test.py | 19 ++- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/sdks/python/apache_beam/transforms/util_test.py b/sdks/python/apache_beam/transforms/util_test.py index cbca2a1..49598f7 100644 --- a/sdks/python/apache_beam/transforms/util_test.py +++ b/sdks/python/apache_beam/transforms/util_test.py @@ -675,13 +675,14 @@ class GroupIntoBatchesTest(unittest.TestCase): max_buffering_duration_secs = 100 start_time = timestamp.Timestamp(0) -test_stream = TestStream().add_elements( -[TimestampedValue(value, start_time + i) - for i, value in enumerate(GroupIntoBatchesTest._create_test_data())]) \ - .advance_watermark_to( -start_time + GroupIntoBatchesTest.NUM_ELEMENTS + 1) \ - .advance_processing_time(100) \ - .advance_watermark_to_infinity() +test_stream = ( +TestStream().add_elements([ +TimestampedValue(value, start_time + i) for i, +value in enumerate(GroupIntoBatchesTest._create_test_data()) +]).advance_processing_time(150).advance_watermark_to( +start_time + window_duration).advance_watermark_to( +start_time + window_duration + +1).advance_watermark_to_infinity()) with TestPipeline(options=StandardOptions(streaming=True)) as pipeline: # To trigger the processing time timer, use a fake clock with start time @@ -704,10 +705,10 @@ class GroupIntoBatchesTest(unittest.TestCase): # should be 5 (flush because of batch size reached). expected_0 = 5 # There is only one element left in the window so batch size - # should be 1 (flush because of end of window reached). + # should be 1 (flush because of max buffering duration reached). expected_1 = 1 # Collection has 10 elements, there are only 4 left, so batch size should - # be 4 (flush because of max buffering duration reached). + # be 4 (flush because of end of window reached). expected_2 = 4 assert_that( num_elements_per_batch,
[beam] branch master updated: Exclude SDF test suite because it requires support of self-checkpoint.
This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 0dc4afe Exclude SDF test suite because it requires support of self-checkpoint. new f2002c6 Merge pull request #13250 from [BEAM-11187] Exclude SDF test suite because it requires support of self-checkpoint. 0dc4afe is described below commit 0dc4afe168a53c474650279d393615a3f146b7cb Author: Boyuan Zhang AuthorDate: Tue Nov 3 14:00:42 2020 -0800 Exclude SDF test suite because it requires support of self-checkpoint. --- ...Commit_Java_PortableValidatesRunner_Flink_Streaming.groovy | 11 ++- runners/flink/job-server/flink_job_server.gradle | 3 +++ 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/.test-infra/jenkins/job_PostCommit_Java_PortableValidatesRunner_Flink_Streaming.groovy b/.test-infra/jenkins/job_PostCommit_Java_PortableValidatesRunner_Flink_Streaming.groovy index 5707a5b..e3d427e 100644 --- a/.test-infra/jenkins/job_PostCommit_Java_PortableValidatesRunner_Flink_Streaming.groovy +++ b/.test-infra/jenkins/job_PostCommit_Java_PortableValidatesRunner_Flink_Streaming.groovy @@ -39,10 +39,11 @@ PostcommitJobBuilder.postCommitJob('beam_PostCommit_Java_PVR_Flink_Streaming', tasks(':runners:flink:1.10:job-server:validatesPortableRunnerStreaming') commonJobProperties.setGradleSwitches(delegate) } -gradle { - rootBuildScriptDir(commonJobProperties.checkoutDir) - tasks(':runners:flink:1.10:job-server:validatesPortableRunnerStreamingCheckpoint') - commonJobProperties.setGradleSwitches(delegate) -} +// TODO(BEAM-10940): Enable this test suite once we have support. +//gradle { +// rootBuildScriptDir(commonJobProperties.checkoutDir) +// tasks(':runners:flink:1.10:job-server:validatesPortableRunnerStreamingCheckpoint') +// commonJobProperties.setGradleSwitches(delegate) +//} } } diff --git a/runners/flink/job-server/flink_job_server.gradle b/runners/flink/job-server/flink_job_server.gradle index c43865d..24b0838 100644 --- a/runners/flink/job-server/flink_job_server.gradle +++ b/runners/flink/job-server/flink_job_server.gradle @@ -145,6 +145,9 @@ def portableValidatesRunnerTask(String name, Boolean streaming, Boolean checkpoi testCategories: { if (streaming && checkpointing) { includeCategories 'org.apache.beam.sdk.testing.UsesBundleFinalizer' +excludeCategories 'org.apache.beam.sdk.testing.UsesBoundedSplittableParDo' +// TODO(BEAM-10940): Enable this test suite once we have support. +excludeCategories 'org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo' // TestStreamSource does not support checkpointing excludeCategories 'org.apache.beam.sdk.testing.UsesTestStream' } else {
[beam] branch master updated: [BEAM-10869] Make WriteToPubsub output serialized PubsubMessage proto bytes when using runner v2
This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 7a309c2 [BEAM-10869] Make WriteToPubsub output serialized PubsubMessage proto bytes when using runner v2 new c9e2580 Merge pull request #12806 from [BEAM-10869] Make WriteToPubsub output serialized PubsubMessage proto bytes when using runner v2 7a309c2 is described below commit 7a309c2ce602fba67351f36067f9c65b150b86a8 Author: Boyuan Zhang AuthorDate: Wed Sep 9 21:05:02 2020 -0700 [BEAM-10869] Make WriteToPubsub output serialized PubsubMessage proto bytes when using runner v2 --- .../beam/runners/dataflow/DataflowRunner.java | 150 ++--- .../beam/sdk/io/gcp/pubsub/ExternalRead.java | 20 +-- .../beam/sdk/io/gcp/pubsub/ExternalWrite.java | 19 +-- .../beam/sdk/io/gcp/pubsub/PubsubMessages.java | 58 sdks/python/apache_beam/io/gcp/pubsub.py | 46 --- sdks/python/apache_beam/io/gcp/pubsub_test.py | 5 +- .../runners/dataflow/dataflow_runner.py| 8 +- .../apache_beam/runners/direct/direct_runner.py| 19 ++- 8 files changed, 240 insertions(+), 85 deletions(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 8931143..1e0415d 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -79,6 +79,7 @@ import org.apache.beam.runners.core.construction.UnconsumedReads; import org.apache.beam.runners.core.construction.WriteFilesTranslation; import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.JobSpecification; import org.apache.beam.runners.dataflow.StreamingViewOverrides.StreamingCreatePCollectionViewFactory; +import org.apache.beam.runners.dataflow.TransformTranslator.StepTranslationContext; import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions; @@ -92,6 +93,7 @@ import org.apache.beam.sdk.Pipeline.PipelineVisitor; import org.apache.beam.sdk.PipelineResult.State; import org.apache.beam.sdk.PipelineRunner; import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.coders.ByteArrayCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.Coder.NonDeterministicException; import org.apache.beam.sdk.coders.KvCoder; @@ -109,6 +111,7 @@ import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage; import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesAndMessageIdCoder; import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesCoder; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessages; import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSink; import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource; import org.apache.beam.sdk.options.PipelineOptions; @@ -129,6 +132,7 @@ import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.GroupIntoBatches; import org.apache.beam.sdk.transforms.Impulse; +import org.apache.beam.sdk.transforms.MapElements; import 
org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.Reshuffle; @@ -155,6 +159,7 @@ import org.apache.beam.sdk.values.PDone; import org.apache.beam.sdk.values.PInput; import org.apache.beam.sdk.values.PValue; import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.beam.sdk.values.TypeDescriptors; import org.apache.beam.sdk.values.ValueWithRecordId; import org.apache.beam.sdk.values.WindowingStrategy; @@ -476,10 +481,18 @@ public class DataflowRunner extends PipelineRunner { new StreamingPubsubIOReadOverrideFactory())); } if (!hasExperiment(options, "enable_custom_pubsub_sink")) { -overridesBuilder.add( -PTransformOverride.of( -PTransformMatchers.classEqualTo(PubsubUnboundedSink.class), -new StreamingPubsubIOWriteOverrideFactory(this))); +if (hasExperiment(options, "use_runner_v2") +|| hasExperiment(options, "use_unified_worker")) { + overridesBuilder.add( + PTransformOverride.of( + PTransformMatchers.classEqualTo(PubsubUnboundedSink.class), + new DataflowWriteT
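The heart of the Java-side change is rerouting WriteToPubsub on runner v2 so the sink receives serialized com.google.pubsub.v1.PubsubMessage protos rather than bare payload bytes. A hedged sketch of that conversion (the helper name is illustrative; the commit's real helpers live in the new PubsubMessages.java):

  import com.google.protobuf.ByteString;
  import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;

  // Serialize the SDK-level message as a Pub/Sub proto; illustrative only.
  static byte[] toProtoBytes(PubsubMessage message) {
    com.google.pubsub.v1.PubsubMessage.Builder proto =
        com.google.pubsub.v1.PubsubMessage.newBuilder()
            .setData(ByteString.copyFrom(message.getPayload()));
    if (message.getAttributeMap() != null) {
      proto.putAllAttributes(message.getAttributeMap());
    }
    return proto.build().toByteArray();
  }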
[beam] branch master updated: Add max buffering duration for GroupIntoBatches in Python SDK
This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 578d6f6 Add max buffering duration for GroupIntoBatches in Python SDK new 5620419 Merge pull request #13144 from [BEAM-10475] Add max buffering duration option for GroupIntoBatches transform in Python 578d6f6 is described below commit 578d6f6816311d3a649608a5ec33d40d174d7e7b Author: sychen AuthorDate: Mon Oct 19 16:19:49 2020 -0700 Add max buffering duration for GroupIntoBatches in Python SDK --- .../apache_beam/runners/direct/direct_runner.py| 7 ++ sdks/python/apache_beam/transforms/util.py | 76 sdks/python/apache_beam/transforms/util_test.py| 132 +++-- 3 files changed, 157 insertions(+), 58 deletions(-) diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py index 8f221aa..914cebd 100644 --- a/sdks/python/apache_beam/runners/direct/direct_runner.py +++ b/sdks/python/apache_beam/runners/direct/direct_runner.py @@ -46,11 +46,13 @@ from apache_beam.runners.direct.clock import TestClock from apache_beam.runners.runner import PipelineResult from apache_beam.runners.runner import PipelineRunner from apache_beam.runners.runner import PipelineState +from apache_beam.transforms import userstate from apache_beam.transforms.core import CombinePerKey from apache_beam.transforms.core import CombineValuesDoFn from apache_beam.transforms.core import DoFn from apache_beam.transforms.core import ParDo from apache_beam.transforms.ptransform import PTransform +from apache_beam.transforms.timeutil import TimeDomain from apache_beam.typehints import trivial_inference # Note that the BundleBasedDirectRunner and SwitchingDirectRunner names are @@ -107,6 +109,11 @@ class SwitchingDirectRunner(PipelineRunner): if any(isinstance(arg, ArgumentPlaceholder) for arg in args_to_check): self.supported_by_fnapi_runner = False + if userstate.is_stateful_dofn(dofn): +_, timer_specs = userstate.get_dofn_specs(dofn) +for timer in timer_specs: + if timer.time_domain == TimeDomain.REAL_TIME: +self.supported_by_fnapi_runner = False # Check whether all transforms used in the pipeline are supported by the # FnApiRunner, and the pipeline was not meant to be run as streaming. diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py index e6662f0..12f9c8a 100644 --- a/sdks/python/apache_beam/transforms/util.py +++ b/sdks/python/apache_beam/transforms/util.py @@ -751,24 +751,42 @@ class GroupIntoBatches(PTransform): GroupIntoBatches is experimental. Its use case will depend on the runner if it has support of States and Timers. """ - def __init__(self, batch_size): + def __init__( + self, batch_size, max_buffering_duration_secs=None, clock=time.time): """Create a new GroupIntoBatches with batch size. Arguments: batch_size: (required) How many elements should be in a batch + max_buffering_duration_secs: (optional) How long in seconds at most an +incomplete batch of elements is allowed to be buffered in the states. +The duration must be a positive second duration and should be given as +an int or float. 
+ clock: (optional) an alternative to time.time (mostly for testing) """ self.batch_size = batch_size +if max_buffering_duration_secs is not None: + assert max_buffering_duration_secs > 0, ( + 'max buffering duration should be a positive value') +self.max_buffering_duration_secs = max_buffering_duration_secs +self.clock = clock + def expand(self, pcoll): input_coder = coders.registry.get_coder(pcoll) return pcoll | ParDo( -_pardo_group_into_batches(self.batch_size, input_coder)) +_pardo_group_into_batches( +input_coder, +self.batch_size, +self.max_buffering_duration_secs, +self.clock)) -def _pardo_group_into_batches(batch_size, input_coder): +def _pardo_group_into_batches( +input_coder, batch_size, max_buffering_duration_secs, clock=time.time): ELEMENT_STATE = BagStateSpec('values', input_coder) COUNT_STATE = CombiningValueStateSpec('count', input_coder, CountCombineFn()) - EXPIRY_TIMER = TimerSpec('expiry', TimeDomain.WATERMARK) + WINDOW_TIMER = TimerSpec('window_end', TimeDomain.WATERMARK) + BUFFERING_TIMER = TimerSpec('buffering_end', TimeDomain.REAL_TIME) class _GroupIntoBatchesDoFn(DoFn): def process( @@ -777,33 +795,47 @@ def _pardo_group_into_batches(batc
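For readers more familiar with the Java SDK, the same knob exists there; a hedged usage sketch, assuming Java's GroupIntoBatches.withMaxBufferingDuration, which mirrors the Python option added here:

  // Batches of up to 10 values per key; an incomplete batch is flushed after
  // 100s of buffering or at the end of the window, whichever fires first.
  PCollection<KV<String, Iterable<String>>> batched =
      input.apply(
          GroupIntoBatches.<String, String>ofSize(10)
              .withMaxBufferingDuration(Duration.standardSeconds(100)));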
[beam] branch master updated: Lengthprefix any input coder for an ProcessBundleDescriptor.
This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new bc955de Lengthprefix any input coder for an ProcessBundleDescriptor. new 5419c3b Merge pull request #13120 from [BEAM-10940] Lengthprefix any input coder for an ProcessBundleDescriptor. bc955de is described below commit bc955ded10e0a054d437adf5c7117004de978d46 Author: Boyuan Zhang AuthorDate: Wed Oct 14 13:47:55 2020 -0700 Lengthprefix any input coder for an ProcessBundleDescriptor. --- .../control/ProcessBundleDescriptors.java | 36 +++- .../control/ProcessBundleDescriptorsTest.java | 101 + 2 files changed, 113 insertions(+), 24 deletions(-) diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ProcessBundleDescriptors.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ProcessBundleDescriptors.java index e76c130..ac3b882 100644 --- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ProcessBundleDescriptors.java +++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ProcessBundleDescriptors.java @@ -37,7 +37,6 @@ import org.apache.beam.model.pipeline.v1.RunnerApi.Components; import org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload.WireCoderSetting; import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection; import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform; -import org.apache.beam.runners.core.construction.ModelCoders; import org.apache.beam.runners.core.construction.RehydratedComponents; import org.apache.beam.runners.core.construction.Timer; import org.apache.beam.runners.core.construction.graph.ExecutableStage; @@ -59,7 +58,6 @@ import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder; import org.apache.beam.sdk.values.KV; import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.InvalidProtocolBufferException; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableTable; @@ -141,9 +139,7 @@ public class ProcessBundleDescriptors { Map> timerSpecs = forTimerSpecs(stage, components); -if (bagUserStateSpecs.size() > 0 || timerSpecs.size() > 0) { - lengthPrefixKeyCoder(stage.getInputPCollection().getId(), components); -} +lengthPrefixAnyInputCoder(stage.getInputPCollection().getId(), components); // Copy data from components to ProcessBundleDescriptor. ProcessBundleDescriptor.Builder bundleDescriptorBuilder = @@ -174,26 +170,18 @@ public class ProcessBundleDescriptors { } /** - * Patches the input coder of a stateful transform to ensure that the byte representation of a key - * used to partition the input element at the Runner, matches the key byte representation received - * for state requests and timers from the SDK Harness. Stateful transforms always have a KvCoder - * as input. + * Patches the input coder of the transform to ensure that the byte representation of input used + * at the Runner, matches the byte representation received from the SDK Harness. 
*/ - private static void lengthPrefixKeyCoder( - String inputColId, Components.Builder componentsBuilder) { -RunnerApi.PCollection pcollection = componentsBuilder.getPcollectionsOrThrow(inputColId); -RunnerApi.Coder kvCoder = componentsBuilder.getCodersOrThrow(pcollection.getCoderId()); -Preconditions.checkState( -ModelCoders.KV_CODER_URN.equals(kvCoder.getSpec().getUrn()), -"Stateful executable stages must use a KV coder, but is: %s", -kvCoder.getSpec().getUrn()); -String keyCoderId = ModelCoders.getKvCoderComponents(kvCoder).keyCoderId(); -// Retain the original coder, but wrap in LengthPrefixCoder -String newKeyCoderId = -LengthPrefixUnknownCoders.addLengthPrefixedCoder(keyCoderId, componentsBuilder, false); -// Replace old key coder with LengthPrefixCoder -kvCoder = kvCoder.toBuilder().setComponentCoderIds(0, newKeyCoderId).build(); -componentsBuilder.putCoders(pcollection.getCoderId(), kvCoder); + private static void lengthPrefixAnyInputCoder( + String inputPCollectionId, Components.Builder componentsBuilder) { +RunnerApi.PCollection pcollection = +componentsBuilder.getPcollectionsOrThrow(inputPCollectionId); +String newInputCoderId = +LengthPrefixUnknownCoders.addLengthPrefixedCoder( +pco
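Length-prefixing is what lets the runner treat elements as opaque, self-delimiting byte strings while still agreeing with the SDK harness on element boundaries. A minimal sketch with Beam's LengthPrefixCoder (the string payload is chosen arbitrarily):

  import java.io.ByteArrayOutputStream;
  import java.nio.charset.StandardCharsets;
  import org.apache.beam.sdk.coders.ByteArrayCoder;
  import org.apache.beam.sdk.coders.Coder;
  import org.apache.beam.sdk.coders.LengthPrefixCoder;

  static void demo() throws Exception {
    Coder<byte[]> prefixed = LengthPrefixCoder.of(ByteArrayCoder.of());
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    prefixed.encode("hello".getBytes(StandardCharsets.UTF_8), out);
    // out now starts with a varint length header, so a reader that cannot
    // interpret the inner coder can still find where the element ends.
  }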
[beam] branch master updated: Add unsupported BundleFinalizationHandler to portable batch Flink.
This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push:
     new 6b00b0b  Add unsupported BundleFinalizationHandler to portable batch Flink.
     new c0755cd  Merge pull request #13054 from [BEAM-11021] Add unsupported BundleFinalizationHandler to portable batch Flink.
6b00b0b is described below
commit 6b00b0bcd83dac89bf178cca3657e3d95305c5b5
Author: Boyuan Zhang
AuthorDate: Thu Oct 8 16:41:58 2020 -0700

    Add unsupported BundleFinalizationHandler to portable batch Flink.
---
 .../functions/FlinkExecutableStageFunction.java    | 11 -
 .../FlinkExecutableStageFunctionTest.java          | 27 +++---
 .../fnexecution/control/SdkHarnessClient.java      |  5 ++--
 .../fnexecution/control/StageBundleFactory.java    | 10
 .../SparkExecutableStageFunctionTest.java          |  8 ---
 .../runners/portability/flink_runner_test.py       |  4 ++--
 6 files changed, 54 insertions(+), 11 deletions(-)

diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
index f28d526..5081951 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
@@ -33,6 +33,7 @@ import org.apache.beam.runners.core.construction.Timer;
 import org.apache.beam.runners.core.construction.graph.ExecutableStage;
 import org.apache.beam.runners.flink.FlinkPipelineOptions;
 import org.apache.beam.runners.flink.metrics.FlinkMetricContainer;
+import org.apache.beam.runners.fnexecution.control.BundleFinalizationHandler;
 import org.apache.beam.runners.fnexecution.control.BundleProgressHandler;
 import org.apache.beam.runners.fnexecution.control.ExecutableStageContext;
 import org.apache.beam.runners.fnexecution.control.OutputReceiverFactory;
@@ -101,6 +102,7 @@ public class FlinkExecutableStageFunction extends AbstractRichFunction
   private transient ExecutableStageContext stageContext;
   private transient StageBundleFactory stageBundleFactory;
   private transient BundleProgressHandler progressHandler;
+  private transient BundleFinalizationHandler finalizationHandler;
   // Only initialized when the ExecutableStage is stateful
   private transient InMemoryBagUserStateFactory bagUserStateHandlerFactory;
   private transient ExecutableStage executableStage;
@@ -153,6 +155,12 @@ public class FlinkExecutableStageFunction extends AbstractRichFunction
           metricContainer.updateMetrics(stepName, response.getMonitoringInfosList());
         }
       };
+    // TODO(BEAM-11021): Support bundle finalization in portable batch.
+    finalizationHandler =
+        bundleId -> {
+          throw new UnsupportedOperationException(
+              "Portable Flink runner doesn't support bundle finalization in batch mode. For more details, please refer to https://issues.apache.org/jira/browse/BEAM-11021.");
+        };
   }

   private StateRequestHandler getStateRequestHandler(
@@ -199,7 +207,8 @@ public class FlinkExecutableStageFunction extends AbstractRichFunction
     ReceiverFactory receiverFactory = new ReceiverFactory(collector, outputMap);
     try (RemoteBundle bundle =
-        stageBundleFactory.getBundle(receiverFactory, stateRequestHandler, progressHandler)) {
+        stageBundleFactory.getBundle(
+            receiverFactory, stateRequestHandler, progressHandler, finalizationHandler)) {
       processElements(iterable, bundle);
     }
   }
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunctionTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunctionTest.java
index 3142e1c..20f6f64 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunctionTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunctionTest.java
@@ -115,7 +115,18 @@ public class FlinkExecutableStageFunctionTest {
     when(runtimeContext.getDistributedCache()).thenReturn(distributedCache);
     when(stageContext.getStageBundleFactory(any())).thenReturn(stageBundleFactory);
     RemoteBundle remoteBundle = Mockito.mock(RemoteBundle.class);
-    when(stageBundleFactory.getBundle(any(), any(), any())).thenReturn(remoteBundle);
+    when(stageBundleFactory.getBundle(
+            any(),
+            any(StateRequestHandler.class),
+            any(BundleProgressHandler.class)
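The handler added by this commit only throws; it exists so the portable batch path compiles against the new four-argument getBundle signature. For contrast, a runner that does support finalization would record the bundle ids and replay them to the SDK harness once the bundle's output is durably committed. A minimal sketch under that assumption; only the bundleId -> ... lambda shape comes from the diff, and the queue and replay step are illustrative, not Beam API:

    // Sketch: a finalization handler that records requests instead of throwing.
    java.util.Queue<String> bundlesToFinalize =
        new java.util.concurrent.ConcurrentLinkedQueue<>();
    BundleFinalizationHandler finalizationHandler = bundlesToFinalize::add;

    // After the runner durably commits the bundle's output, it would issue a
    // FinalizeBundle instruction for each recorded id (hypothetical helper):
    // for (String id : bundlesToFinalize) { sendFinalizeBundle(id); }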
[beam] branch master updated (eb75286 -> 9a1dc84)
This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git.
 from eb75286  Merge pull request #13065: Remove stability history from Dataflow streaming VR job
  add a10ccb7  [BEAM-11053] Set projectId in BigQuery service.
  new 9a1dc84  Merge pull request #13068 [BEAM-11053] Set projectId in BigQuerySamplesIT BQ service.
The 1 revision listed above as "new" is entirely new to this repository and will be described in a separate email. The revisions listed as "add" were already present in the repository and have only been added to this reference.
Summary of changes:
 .../snippets/transforms/io/gcp/bigquery/BigQuerySamplesIT.java | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
[beam] 01/01: Merge pull request #13068 [BEAM-11053] Set projectId in BigQuerySamplesIT BQ service.
This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
commit 9a1dc845483d44929214158e5b33116588c40464
Merge: eb75286 a10ccb7
Author: Boyuan Zhang <36090911+boyua...@users.noreply.github.com>
AuthorDate: Mon Oct 12 10:54:05 2020 -0700

    Merge pull request #13068 [BEAM-11053] Set projectId in BigQuerySamplesIT BQ service.

    [BEAM-11053] Set projectId in BigQuerySamplesIT BQ service.

 .../snippets/transforms/io/gcp/bigquery/BigQuerySamplesIT.java | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
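The two-line change itself isn't reproduced in this digest. A plausible sketch of the fix, assuming the IT builds its client from the google-cloud-bigquery options builder and reads the project from the test configuration; the variable names here are illustrative, not a quote of the commit:

    // Before (sketch): the client fell back to the ambient default project.
    // BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();

    // After (sketch): pin the project id explicitly so the integration test
    // always targets the project configured for the test run.
    BigQuery bigquery =
        BigQueryOptions.newBuilder().setProjectId(project).build().getService();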