This is an automated email from the ASF dual-hosted git repository.

kenn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
     new e95ef975a3c  DataflowRunner: Experiment added to disable unbounded PCollection checks, allowing batch execution over unbounded PCollections (#16773)
e95ef975a3c is described below

commit e95ef975a3cb2f2562493ff2e0943d3e000376dc
Author: Balázs Németh <nb...@users.noreply.github.com>
AuthorDate: Tue Jun 7 21:48:44 2022 +0200

    DataflowRunner: Experiment added to disable unbounded PCollection checks, allowing batch execution over unbounded PCollections (#16773)
---
 .../beam/runners/dataflow/DataflowRunner.java      | 45 ++++++++++++--
 .../java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 15 ++++-
 .../beam/sdk/io/kafka/ReadFromKafkaDoFn.java       | 28 ++++++++-
 .../beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java   | 69 +++++++++++++++++++++-
 4 files changed, 144 insertions(+), 13 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index fbef0b8000c..459ad9340c9 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -199,6 +199,10 @@ import org.slf4j.LoggerFactory;
 })
 public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
+  /** Experiment to "unsafely attempt to process unbounded data in batch mode". */
+  public static final String UNSAFELY_ATTEMPT_TO_PROCESS_UNBOUNDED_DATA_IN_BATCH_MODE =
+      "unsafely_attempt_to_process_unbounded_data_in_batch_mode";
+
   private static final Logger LOG = LoggerFactory.getLogger(DataflowRunner.class);
 
   /** Provided configuration options. */
   private final DataflowPipelineOptions options;
@@ -1054,7 +1058,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     }
 
     logWarningIfPCollectionViewHasNonDeterministicKeyCoder(pipeline);
-    if (containsUnboundedPCollection(pipeline)) {
+    if (shouldActAsStreaming(pipeline)) {
       options.setStreaming(true);
     }
 
@@ -1479,29 +1483,58 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
   // setup overrides.
   @VisibleForTesting
   protected void replaceV1Transforms(Pipeline pipeline) {
-    boolean streaming = options.isStreaming() || containsUnboundedPCollection(pipeline);
+    boolean streaming = shouldActAsStreaming(pipeline);
     // Ensure all outputs of all reads are consumed before potentially replacing any
     // Read PTransforms
     UnconsumedReads.ensureAllReadsConsumed(pipeline);
     pipeline.replaceAll(getOverrides(streaming));
   }
 
-  private boolean containsUnboundedPCollection(Pipeline p) {
+  private boolean shouldActAsStreaming(Pipeline p) {
     class BoundednessVisitor extends PipelineVisitor.Defaults {
-      IsBounded boundedness = IsBounded.BOUNDED;
+      final List<PCollection> unboundedPCollections = new ArrayList<>();
 
       @Override
       public void visitValue(PValue value, Node producer) {
         if (value instanceof PCollection) {
-          boundedness = boundedness.and(((PCollection) value).isBounded());
+          PCollection pc = (PCollection) value;
+          if (pc.isBounded() == IsBounded.UNBOUNDED) {
+            unboundedPCollections.add(pc);
+          }
         }
       }
     }
 
     BoundednessVisitor visitor = new BoundednessVisitor();
     p.traverseTopologically(visitor);
-    return visitor.boundedness == IsBounded.UNBOUNDED;
+    if (visitor.unboundedPCollections.isEmpty()) {
+      if (options.isStreaming()) {
+        LOG.warn(
+            "No unbounded PCollection(s) found in a streaming pipeline! "
+                + "You might consider using 'streaming=false'!");
+        return true;
+      } else {
+        return false;
+      }
+    } else {
+      if (options.isStreaming()) {
+        return true;
+      } else if (hasExperiment(options, UNSAFELY_ATTEMPT_TO_PROCESS_UNBOUNDED_DATA_IN_BATCH_MODE)) {
+        LOG.info(
+            "Turning a batch pipeline into streaming due to unbounded PCollection(s) has been avoided! "
+                + "Unbounded PCollection(s): {}",
+            visitor.unboundedPCollections);
+        return false;
+      } else {
+        LOG.warn(
+            "Unbounded PCollection(s) found in a batch pipeline! "
+                + "You might consider using 'streaming=true'! "
+                + "Unbounded PCollection(s): {}",
+            visitor.unboundedPCollections);
+        return true;
+      }
+    }
   };
 
   /** Returns the DataflowPipelineTranslator associated with this object. */
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 509aa17103e..82a6f30fcfa 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -1445,6 +1445,9 @@ public class KafkaIO {
       if (kafkaRead.isCommitOffsetsInFinalizeEnabled()) {
         readTransform = readTransform.commitOffsets();
       }
+      if (kafkaRead.getStopReadTime() != null) {
+        readTransform = readTransform.withBounded();
+      }
       PCollection<KafkaSourceDescriptor> output;
       if (kafkaRead.isDynamicRead()) {
         Set<String> topics = new HashSet<>();
@@ -1843,6 +1846,8 @@ public class KafkaIO {
 
     abstract @Nullable TimestampPolicyFactory<K, V> getTimestampPolicyFactory();
 
+    abstract boolean isBounded();
+
     abstract ReadSourceDescriptors.Builder<K, V> toBuilder();
 
     @AutoValue.Builder
@@ -1880,6 +1885,8 @@ public class KafkaIO {
       abstract ReadSourceDescriptors.Builder<K, V> setTimestampPolicyFactory(
           TimestampPolicyFactory<K, V> policy);
 
+      abstract ReadSourceDescriptors.Builder<K, V> setBounded(boolean bounded);
+
       abstract ReadSourceDescriptors<K, V> build();
     }
 
@@ -1888,6 +1895,7 @@ public class KafkaIO {
           .setConsumerFactoryFn(KafkaIOUtils.KAFKA_CONSUMER_FACTORY_FN)
          .setConsumerConfig(KafkaIOUtils.DEFAULT_CONSUMER_PROPERTIES)
          .setCommitOffsetEnabled(false)
+          .setBounded(false)
          .build()
          .withProcessingTime()
          .withMonotonicallyIncreasingWatermarkEstimator();
@@ -2180,6 +2188,11 @@ public class KafkaIO {
           .withManualWatermarkEstimator();
     }
 
+    /** Enable treating the Kafka sources as bounded as opposed to the unbounded default. */
+    ReadSourceDescriptors<K, V> withBounded() {
+      return toBuilder().setBounded(true).build();
+    }
+
     @Override
     public PCollection<KafkaRecord<K, V>> expand(PCollection<KafkaSourceDescriptor> input) {
       checkArgument(getKeyDeserializerProvider() != null, "withKeyDeserializer() is required");
@@ -2214,7 +2227,7 @@ public class KafkaIO {
       try {
         PCollection<KV<KafkaSourceDescriptor, KafkaRecord<K, V>>> outputWithDescriptor =
             input
-                .apply(ParDo.of(new ReadFromKafkaDoFn<K, V>(this)))
+                .apply(ParDo.of(ReadFromKafkaDoFn.<K, V>create(this)))
                 .setCoder(
                     KvCoder.of(
                         input
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
index 311a03351ab..b898af1ac6b 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
@@ -32,6 +32,7 @@ import org.apache.beam.sdk.io.kafka.KafkaIOUtils.MovingAvg;
 import org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.TimestampPolicyContext;
 import org.apache.beam.sdk.io.range.OffsetRange;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.BoundedPerElement;
 import org.apache.beam.sdk.transforms.DoFn.UnboundedPerElement;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.splittabledofn.GrowableOffsetRangeTracker;
@@ -137,15 +138,36 @@ import org.slf4j.LoggerFactory;
  * stopping reading from removed {@link TopicPartition}, the stopping reading may not happens
  * immediately.
  */
-@UnboundedPerElement
 @SuppressWarnings({
   "rawtypes", // TODO(https://issues.apache.org/jira/browse/BEAM-10556)
   "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
 })
-class ReadFromKafkaDoFn<K, V>
+abstract class ReadFromKafkaDoFn<K, V>
     extends DoFn<KafkaSourceDescriptor, KV<KafkaSourceDescriptor, KafkaRecord<K, V>>> {
 
-  ReadFromKafkaDoFn(ReadSourceDescriptors transform) {
+  static <K, V> ReadFromKafkaDoFn<K, V> create(ReadSourceDescriptors transform) {
+    if (transform.isBounded()) {
+      return new Bounded<K, V>(transform);
+    } else {
+      return new Unbounded<K, V>(transform);
+    }
+  }
+
+  @UnboundedPerElement
+  private static class Unbounded<K, V> extends ReadFromKafkaDoFn<K, V> {
+    Unbounded(ReadSourceDescriptors transform) {
+      super(transform);
+    }
+  }
+
+  @BoundedPerElement
+  private static class Bounded<K, V> extends ReadFromKafkaDoFn<K, V> {
+    Bounded(ReadSourceDescriptors transform) {
+      super(transform);
+    }
+  }
+
+  private ReadFromKafkaDoFn(ReadSourceDescriptors transform) {
     this.consumerConfig = transform.getConsumerConfig();
     this.offsetConsumerConfig = transform.getOffsetConsumerConfig();
     this.keyDeserializerProvider = transform.getKeyDeserializerProvider();
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java
index 0c0dc00b48c..80b1fac21b7 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java
@@ -25,14 +25,27 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Function;
 import java.util.stream.Collectors;
+import org.apache.beam.sdk.Pipeline.PipelineVisitor;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.kafka.KafkaIO.ReadSourceDescriptors;
 import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.runners.TransformHierarchy.Node;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
+import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
 import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollection.IsBounded;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
@@ -52,6 +65,7 @@ import org.checkerframework.checker.initialization.qual.Initialized;
 import org.checkerframework.checker.nullness.qual.NonNull;
 import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
 import org.joda.time.Instant;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -70,13 +84,13 @@ public class ReadFromKafkaDoFnTest {
       new SimpleMockKafkaConsumer(OffsetResetStrategy.NONE, topicPartition);
 
   private final ReadFromKafkaDoFn<String, String> dofnInstance =
-      new ReadFromKafkaDoFn(makeReadSourceDescriptor(consumer));
+      ReadFromKafkaDoFn.create(makeReadSourceDescriptor(consumer));
 
   private final ExceptionMockKafkaConsumer exceptionConsumer =
       new ExceptionMockKafkaConsumer(OffsetResetStrategy.NONE, topicPartition);
 
   private final ReadFromKafkaDoFn<String, String> exceptionDofnInstance =
-      new ReadFromKafkaDoFn<>(makeReadSourceDescriptor(exceptionConsumer));
+      ReadFromKafkaDoFn.create(makeReadSourceDescriptor(exceptionConsumer));
 
   private ReadSourceDescriptors<String, String> makeReadSourceDescriptor(
       Consumer kafkaMockConsumer) {
@@ -417,7 +431,7 @@ public class ReadFromKafkaDoFnTest {
   public void testProcessElementWhenTopicPartitionIsStopped() throws Exception {
     MockOutputReceiver receiver = new MockOutputReceiver();
     ReadFromKafkaDoFn<String, String> instance =
-        new ReadFromKafkaDoFn(
+        ReadFromKafkaDoFn.create(
             makeReadSourceDescriptor(consumer)
                 .toBuilder()
                 .setCheckStopReadingFn(
@@ -454,4 +468,53 @@ public class ReadFromKafkaDoFnTest {
         null,
         (OutputReceiver) receiver);
   }
+
+  private static final TypeDescriptor<KafkaSourceDescriptor>
+      KAFKA_SOURCE_DESCRIPTOR_TYPE_DESCRIPTOR = new TypeDescriptor<KafkaSourceDescriptor>() {};
+
+  @Test
+  public void testBounded() {
+    BoundednessVisitor visitor = testBoundedness(rsd -> rsd.withBounded());
+    Assert.assertEquals(0, visitor.unboundedPCollections.size());
+  }
+
+  @Test
+  public void testUnbounded() {
+    BoundednessVisitor visitor = testBoundedness(rsd -> rsd);
+    Assert.assertNotEquals(0, visitor.unboundedPCollections.size());
+  }
+
+  private BoundednessVisitor testBoundedness(
+      Function<ReadSourceDescriptors<String, String>, ReadSourceDescriptors<String, String>>
+          readSourceDescriptorsDecorator) {
+    TestPipeline p = TestPipeline.create();
+    p.apply(Create.empty(KAFKA_SOURCE_DESCRIPTOR_TYPE_DESCRIPTOR))
+        .apply(
+            ParDo.of(
+                ReadFromKafkaDoFn.<String, String>create(
+                    readSourceDescriptorsDecorator.apply(makeReadSourceDescriptor(consumer)))))
+        .setCoder(
+            KvCoder.of(
+                SerializableCoder.of(KafkaSourceDescriptor.class),
+                org.apache.beam.sdk.io.kafka.KafkaRecordCoder.of(
+                    StringUtf8Coder.of(), StringUtf8Coder.of())));
+
+    BoundednessVisitor visitor = new BoundednessVisitor();
+    p.traverseTopologically(visitor);
+    return visitor;
+  }
+
+  static class BoundednessVisitor extends PipelineVisitor.Defaults {
+    final List<PCollection> unboundedPCollections = new ArrayList<>();
+
+    @Override
+    public void visitValue(PValue value, Node producer) {
+      if (value instanceof PCollection) {
+        PCollection pc = (PCollection) value;
+        if (pc.isBounded() == IsBounded.UNBOUNDED) {
+          unboundedPCollections.add(pc);
+        }
+      }
+    }
+  }
 }
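
For context, a minimal sketch of how a pipeline author might opt in to the new behavior (illustrative only, not part of this commit): the experiment constant and the hasExperiment(...) check come from the DataflowRunner diff above, while the example class name and the surrounding option wiring are assumptions.

import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

// Illustrative example class, not part of this commit.
public class UnboundedInBatchExample {
  public static void main(String[] args) {
    DataflowPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(DataflowPipelineOptions.class);
    options.setRunner(DataflowRunner.class);
    // Keep batch mode explicitly; without the experiment, any unbounded PCollection
    // in the pipeline would make shouldActAsStreaming(...) return true.
    options.setStreaming(false);
    // Opt in to the experiment added by this commit so the unbounded PCollection
    // check is skipped and the job stays in batch mode.
    ExperimentalOptions.addExperiment(
        options, DataflowRunner.UNSAFELY_ATTEMPT_TO_PROCESS_UNBOUNDED_DATA_IN_BATCH_MODE);

    Pipeline pipeline = Pipeline.create(options);
    // Build the pipeline here; e.g. a KafkaIO read with a stop read time set
    // (getStopReadTime() != null) now expands through the bounded ReadFromKafkaDoFn
    // variant added above, so the whole pipeline can stay bounded.
    pipeline.run();
  }
}

Passing --experiments=unsafely_attempt_to_process_unbounded_data_in_batch_mode on the command line should have the same effect, since the runner only checks hasExperiment(options, ...).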