This is an automated email from the ASF dual-hosted git repository.

kenn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new e95ef975a3c DataflowRunner: Experiment added to disable unbounded
PCollection checks, allowing batch execution over unbounded PCollections (#16773)
e95ef975a3c is described below

commit e95ef975a3cb2f2562493ff2e0943d3e000376dc
Author: Balázs Németh <nb...@users.noreply.github.com>
AuthorDate: Tue Jun 7 21:48:44 2022 +0200

    DataflowRunner: Experiment added to disable unbounded PCollection checks,
allowing batch execution over unbounded PCollections (#16773)
---
 .../beam/runners/dataflow/DataflowRunner.java      | 45 ++++++++++++--
 .../java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 15 ++++-
 .../beam/sdk/io/kafka/ReadFromKafkaDoFn.java       | 28 ++++++++-
 .../beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java   | 69 +++++++++++++++++++++-
 4 files changed, 144 insertions(+), 13 deletions(-)
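
For context, a minimal sketch (not part of this patch) of how a pipeline author
might opt in to the new experiment so that a pipeline containing unbounded
PCollections is kept in batch mode. The experiment string and the
UNSAFELY_ATTEMPT_TO_PROCESS_UNBOUNDED_DATA_IN_BATCH_MODE constant come from the
diff below; the surrounding options wiring is illustrative only.

    // Sketch only: enabling the experiment added in this commit. Without it,
    // DataflowRunner turns any pipeline containing an unbounded PCollection
    // into a streaming job; with it, the pipeline stays in batch mode.
    import org.apache.beam.runners.dataflow.DataflowRunner;
    import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.ExperimentalOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class BatchOverUnboundedSketch {
      public static void main(String[] args) {
        DataflowPipelineOptions options =
            PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
        options.setRunner(DataflowRunner.class);
        options.setStreaming(false); // stay in batch mode
        // Experiment name as defined in DataflowRunner below.
        ExperimentalOptions.addExperiment(
            options, DataflowRunner.UNSAFELY_ATTEMPT_TO_PROCESS_UNBOUNDED_DATA_IN_BATCH_MODE);

        Pipeline p = Pipeline.create(options);
        // ... build a pipeline whose unbounded sources are effectively bounded
        // (for example KafkaIO with a stop read time), then run it:
        p.run();
      }
    }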

diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index fbef0b8000c..459ad9340c9 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -199,6 +199,10 @@ import org.slf4j.LoggerFactory;
 })
 public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
 
+  /** Experiment to "unsafely attempt to process unbounded data in batch mode". */
+  public static final String UNSAFELY_ATTEMPT_TO_PROCESS_UNBOUNDED_DATA_IN_BATCH_MODE =
+      "unsafely_attempt_to_process_unbounded_data_in_batch_mode";
+
   private static final Logger LOG = LoggerFactory.getLogger(DataflowRunner.class);
   /** Provided configuration options. */
   private final DataflowPipelineOptions options;
@@ -1054,7 +1058,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     }
 
     logWarningIfPCollectionViewHasNonDeterministicKeyCoder(pipeline);
-    if (containsUnboundedPCollection(pipeline)) {
+    if (shouldActAsStreaming(pipeline)) {
       options.setStreaming(true);
     }
 
@@ -1479,29 +1483,58 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
   // setup overrides.
   @VisibleForTesting
   protected void replaceV1Transforms(Pipeline pipeline) {
-    boolean streaming = options.isStreaming() || containsUnboundedPCollection(pipeline);
+    boolean streaming = shouldActAsStreaming(pipeline);
     // Ensure all outputs of all reads are consumed before potentially replacing any
     // Read PTransforms
     UnconsumedReads.ensureAllReadsConsumed(pipeline);
     pipeline.replaceAll(getOverrides(streaming));
   }
 
-  private boolean containsUnboundedPCollection(Pipeline p) {
+  private boolean shouldActAsStreaming(Pipeline p) {
     class BoundednessVisitor extends PipelineVisitor.Defaults {
 
-      IsBounded boundedness = IsBounded.BOUNDED;
+      final List<PCollection> unboundedPCollections = new ArrayList<>();
 
       @Override
       public void visitValue(PValue value, Node producer) {
         if (value instanceof PCollection) {
-          boundedness = boundedness.and(((PCollection) value).isBounded());
+          PCollection pc = (PCollection) value;
+          if (pc.isBounded() == IsBounded.UNBOUNDED) {
+            unboundedPCollections.add(pc);
+          }
         }
       }
     }
 
     BoundednessVisitor visitor = new BoundednessVisitor();
     p.traverseTopologically(visitor);
-    return visitor.boundedness == IsBounded.UNBOUNDED;
+    if (visitor.unboundedPCollections.isEmpty()) {
+      if (options.isStreaming()) {
+        LOG.warn(
+            "No unbounded PCollection(s) found in a streaming pipeline! "
+                + "You might consider using 'streaming=false'!");
+        return true;
+      } else {
+        return false;
+      }
+    } else {
+      if (options.isStreaming()) {
+        return true;
+      } else if (hasExperiment(options, UNSAFELY_ATTEMPT_TO_PROCESS_UNBOUNDED_DATA_IN_BATCH_MODE)) {
+        LOG.info(
+            "Turning a batch pipeline into streaming due to unbounded 
PCollection(s) has been avoided! "
+                + "Unbounded PCollection(s): {}",
+            visitor.unboundedPCollections);
+        return false;
+      } else {
+        LOG.warn(
+            "Unbounded PCollection(s) found in a batch pipeline! "
+                + "You might consider using 'streaming=true'! "
+                + "Unbounded PCollection(s): {}",
+            visitor.unboundedPCollections);
+        return true;
+      }
+    }
   };
 
   /** Returns the DataflowPipelineTranslator associated with this object. */
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 509aa17103e..82a6f30fcfa 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -1445,6 +1445,9 @@ public class KafkaIO {
         if (kafkaRead.isCommitOffsetsInFinalizeEnabled()) {
           readTransform = readTransform.commitOffsets();
         }
+        if (kafkaRead.getStopReadTime() != null) {
+          readTransform = readTransform.withBounded();
+        }
         PCollection<KafkaSourceDescriptor> output;
         if (kafkaRead.isDynamicRead()) {
           Set<String> topics = new HashSet<>();
@@ -1843,6 +1846,8 @@ public class KafkaIO {
 
     abstract @Nullable TimestampPolicyFactory<K, V> getTimestampPolicyFactory();
 
+    abstract boolean isBounded();
+
     abstract ReadSourceDescriptors.Builder<K, V> toBuilder();
 
     @AutoValue.Builder
@@ -1880,6 +1885,8 @@ public class KafkaIO {
       abstract ReadSourceDescriptors.Builder<K, V> setTimestampPolicyFactory(
           TimestampPolicyFactory<K, V> policy);
 
+      abstract ReadSourceDescriptors.Builder<K, V> setBounded(boolean bounded);
+
       abstract ReadSourceDescriptors<K, V> build();
     }
 
@@ -1888,6 +1895,7 @@ public class KafkaIO {
           .setConsumerFactoryFn(KafkaIOUtils.KAFKA_CONSUMER_FACTORY_FN)
           .setConsumerConfig(KafkaIOUtils.DEFAULT_CONSUMER_PROPERTIES)
           .setCommitOffsetEnabled(false)
+          .setBounded(false)
           .build()
           .withProcessingTime()
           .withMonotonicallyIncreasingWatermarkEstimator();
@@ -2180,6 +2188,11 @@ public class KafkaIO {
           .withManualWatermarkEstimator();
     }
 
+    /** Enable treating the Kafka sources as bounded as opposed to the unbounded default. */
+    ReadSourceDescriptors<K, V> withBounded() {
+      return toBuilder().setBounded(true).build();
+    }
+
     @Override
     public PCollection<KafkaRecord<K, V>> expand(PCollection<KafkaSourceDescriptor> input) {
       checkArgument(getKeyDeserializerProvider() != null, "withKeyDeserializer() is required");
@@ -2214,7 +2227,7 @@ public class KafkaIO {
       try {
         PCollection<KV<KafkaSourceDescriptor, KafkaRecord<K, V>>> outputWithDescriptor =
             input
-                .apply(ParDo.of(new ReadFromKafkaDoFn<K, V>(this)))
+                .apply(ParDo.of(ReadFromKafkaDoFn.<K, V>create(this)))
                 .setCoder(
                     KvCoder.of(
                         input
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
index 311a03351ab..b898af1ac6b 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
@@ -32,6 +32,7 @@ import org.apache.beam.sdk.io.kafka.KafkaIOUtils.MovingAvg;
 import org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.TimestampPolicyContext;
 import org.apache.beam.sdk.io.range.OffsetRange;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.BoundedPerElement;
 import org.apache.beam.sdk.transforms.DoFn.UnboundedPerElement;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.splittabledofn.GrowableOffsetRangeTracker;
@@ -137,15 +138,36 @@ import org.slf4j.LoggerFactory;
  * stopping reading from removed {@link TopicPartition}, the stopping reading may not happens
  * immediately.
  */
-@UnboundedPerElement
 @SuppressWarnings({
   "rawtypes", // TODO(https://issues.apache.org/jira/browse/BEAM-10556)
   "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
 })
-class ReadFromKafkaDoFn<K, V>
+abstract class ReadFromKafkaDoFn<K, V>
     extends DoFn<KafkaSourceDescriptor, KV<KafkaSourceDescriptor, KafkaRecord<K, V>>> {
 
-  ReadFromKafkaDoFn(ReadSourceDescriptors transform) {
+  static <K, V> ReadFromKafkaDoFn<K, V> create(ReadSourceDescriptors transform) {
+    if (transform.isBounded()) {
+      return new Bounded<K, V>(transform);
+    } else {
+      return new Unbounded<K, V>(transform);
+    }
+  }
+
+  @UnboundedPerElement
+  private static class Unbounded<K, V> extends ReadFromKafkaDoFn<K, V> {
+    Unbounded(ReadSourceDescriptors transform) {
+      super(transform);
+    }
+  }
+
+  @BoundedPerElement
+  private static class Bounded<K, V> extends ReadFromKafkaDoFn<K, V> {
+    Bounded(ReadSourceDescriptors transform) {
+      super(transform);
+    }
+  }
+
+  private ReadFromKafkaDoFn(ReadSourceDescriptors transform) {
     this.consumerConfig = transform.getConsumerConfig();
     this.offsetConsumerConfig = transform.getOffsetConsumerConfig();
     this.keyDeserializerProvider = transform.getKeyDeserializerProvider();
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java
index 0c0dc00b48c..80b1fac21b7 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java
@@ -25,14 +25,27 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Function;
 import java.util.stream.Collectors;
+import org.apache.beam.sdk.Pipeline.PipelineVisitor;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.kafka.KafkaIO.ReadSourceDescriptors;
 import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.runners.TransformHierarchy.Node;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
+import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
 import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollection.IsBounded;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
@@ -52,6 +65,7 @@ import org.checkerframework.checker.initialization.qual.Initialized;
 import org.checkerframework.checker.nullness.qual.NonNull;
 import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
 import org.joda.time.Instant;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -70,13 +84,13 @@ public class ReadFromKafkaDoFnTest {
       new SimpleMockKafkaConsumer(OffsetResetStrategy.NONE, topicPartition);
 
   private final ReadFromKafkaDoFn<String, String> dofnInstance =
-      new ReadFromKafkaDoFn(makeReadSourceDescriptor(consumer));
+      ReadFromKafkaDoFn.create(makeReadSourceDescriptor(consumer));
 
   private final ExceptionMockKafkaConsumer exceptionConsumer =
       new ExceptionMockKafkaConsumer(OffsetResetStrategy.NONE, topicPartition);
 
   private final ReadFromKafkaDoFn<String, String> exceptionDofnInstance =
-      new ReadFromKafkaDoFn<>(makeReadSourceDescriptor(exceptionConsumer));
+      ReadFromKafkaDoFn.create(makeReadSourceDescriptor(exceptionConsumer));
 
   private ReadSourceDescriptors<String, String> makeReadSourceDescriptor(
       Consumer kafkaMockConsumer) {
@@ -417,7 +431,7 @@ public class ReadFromKafkaDoFnTest {
   public void testProcessElementWhenTopicPartitionIsStopped() throws Exception {
     MockOutputReceiver receiver = new MockOutputReceiver();
     ReadFromKafkaDoFn<String, String> instance =
-        new ReadFromKafkaDoFn(
+        ReadFromKafkaDoFn.create(
             makeReadSourceDescriptor(consumer)
                 .toBuilder()
                 .setCheckStopReadingFn(
@@ -454,4 +468,53 @@ public class ReadFromKafkaDoFnTest {
         null,
         (OutputReceiver) receiver);
   }
+
+  private static final TypeDescriptor<KafkaSourceDescriptor>
+      KAFKA_SOURCE_DESCRIPTOR_TYPE_DESCRIPTOR = new TypeDescriptor<KafkaSourceDescriptor>() {};
+
+  @Test
+  public void testBounded() {
+    BoundednessVisitor visitor = testBoundedness(rsd -> rsd.withBounded());
+    Assert.assertEquals(0, visitor.unboundedPCollections.size());
+  }
+
+  @Test
+  public void testUnbounded() {
+    BoundednessVisitor visitor = testBoundedness(rsd -> rsd);
+    Assert.assertNotEquals(0, visitor.unboundedPCollections.size());
+  }
+
+  private BoundednessVisitor testBoundedness(
+      Function<ReadSourceDescriptors<String, String>, ReadSourceDescriptors<String, String>>
+          readSourceDescriptorsDecorator) {
+    TestPipeline p = TestPipeline.create();
+    p.apply(Create.empty(KAFKA_SOURCE_DESCRIPTOR_TYPE_DESCRIPTOR))
+        .apply(
+            ParDo.of(
+                ReadFromKafkaDoFn.<String, String>create(
+                    readSourceDescriptorsDecorator.apply(makeReadSourceDescriptor(consumer)))))
+        .setCoder(
+            KvCoder.of(
+                SerializableCoder.of(KafkaSourceDescriptor.class),
+                org.apache.beam.sdk.io.kafka.KafkaRecordCoder.of(
+                    StringUtf8Coder.of(), StringUtf8Coder.of())));
+
+    BoundednessVisitor visitor = new BoundednessVisitor();
+    p.traverseTopologically(visitor);
+    return visitor;
+  }
+
+  static class BoundednessVisitor extends PipelineVisitor.Defaults {
+    final List<PCollection> unboundedPCollections = new ArrayList<>();
+
+    @Override
+    public void visitValue(PValue value, Node producer) {
+      if (value instanceof PCollection) {
+        PCollection pc = (PCollection) value;
+        if (pc.isBounded() == IsBounded.UNBOUNDED) {
+          unboundedPCollections.add(pc);
+        }
+      }
+    }
+  }
 }
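
For context, a rough sketch (not part of this patch) of the KafkaIO side: with the
change above, a read that carries a stop read time is expanded through
ReadSourceDescriptors.withBounded(), so ReadFromKafkaDoFn runs as a
@BoundedPerElement splittable DoFn and the resulting PCollection is bounded. The
sketch assumes KafkaIO.Read's withStopReadTime(...) setter backing the
getStopReadTime() check in the diff; broker, topic and timestamp are placeholders.

    // Sketch: a Kafka read treated as bounded by the expansion in this commit,
    // which lets the pipeline run on Dataflow in batch mode.
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.joda.time.Instant;

    public class BoundedKafkaReadSketch {
      static PCollection<KV<String, String>> readBounded(Pipeline p) {
        return p.apply(
            KafkaIO.<String, String>read()
                .withBootstrapServers("broker:9092") // placeholder
                .withTopic("my-topic")               // placeholder
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                // A non-null stop read time makes the SDF expansion call
                // ReadSourceDescriptors.withBounded(), per the KafkaIO diff above.
                .withStopReadTime(Instant.parse("2022-06-07T00:00:00Z"))
                .withoutMetadata());
      }
    }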
