This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new d6759cf7dd6 [Dataflow Streaming] Add a pipeline option to skip input elements that cannot be decoded successfully (#37762)
d6759cf7dd6 is described below

commit d6759cf7dd6d27192cb46d3c7d4cfaa535cae603
Author: Sam Whittle <[email protected]>
AuthorDate: Tue Mar 10 13:49:43 2026 +0000

    [Dataflow Streaming] Add a pipeline option to skip input elements that cannot be decoded successfully (#37762)
    
    An error is logged for each such message, which is then discarded.
    Update PaneInfoCoder to throw a CoderException instead of an ArrayIndexOutOfBoundsException.
---
 .../options/DataflowStreamingPipelineOptions.java  |   7 ++
 .../beam/runners/dataflow/worker/PubsubReader.java |  42 ++++++---
 .../dataflow/worker/UngroupedWindmillReader.java   |  41 +++++---
 .../dataflow/worker/WindmillKeyedWorkItem.java     | 105 ++++++++++++++-------
 .../worker/WindmillReaderIteratorBase.java         |  59 ++++++++----
 .../beam/runners/dataflow/worker/WindmillSink.java |  12 ++-
 .../dataflow/worker/WindowingWindmillReader.java   |  55 +++++++----
 .../worker/StreamingDataflowWorkerTest.java        |  68 ++++++++++++-
 .../dataflow/worker/WindmillKeyedWorkItemTest.java |  97 +++++++++++++++++++
 .../worker/WindmillReaderIteratorBaseTest.java     |  42 ++++++++-
 .../beam/sdk/transforms/windowing/PaneInfo.java    |   6 +-
 11 files changed, 418 insertions(+), 116 deletions(-)

diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java
index ffb2e27e55b..9cc98276f2d 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java
@@ -23,6 +23,7 @@ import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.ExperimentalOptions;
 import org.apache.beam.sdk.options.Hidden;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.ValueProvider;
 import org.joda.time.Duration;
 
 /** [Internal] Options for configuring StreamingDataflowWorker. */
@@ -226,6 +227,12 @@ public interface DataflowStreamingPipelineOptions extends 
PipelineOptions {
 
   void setWindmillServiceStreamMaxBackoffMillis(int value);
 
+  @Description(
+      "If true, log and skip input elements that are unable to successfully 
decode from the streaming backend.")
+  ValueProvider<Boolean> getSkipInputElementsWithDecodingExceptions();
+
+  void setSkipInputElementsWithDecodingExceptions(ValueProvider<Boolean> 
value);
+
   @Description("Enables direct path mode for streaming engine.")
   @Default.InstanceFactory(EnableWindmillServiceDirectPathFactory.class)
   boolean getIsWindmillServiceDirectPathEnabled();
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PubsubReader.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PubsubReader.java
index 024b790e8ca..b60cb84415f 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PubsubReader.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PubsubReader.java
@@ -24,6 +24,7 @@ import com.google.auto.service.AutoService;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.Map;
+import 
org.apache.beam.runners.dataflow.options.DataflowStreamingPipelineOptions;
 import org.apache.beam.runners.dataflow.util.CloudObject;
 import org.apache.beam.runners.dataflow.util.PropertyNames;
 import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader;
@@ -32,6 +33,7 @@ import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.values.WindowedValue;
@@ -41,26 +43,24 @@ import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Immuta
 import org.checkerframework.checker.nullness.qual.Nullable;
 
 /** A Reader that receives elements from Pubsub, via a Windmill server. */
-@SuppressWarnings({
-  "rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
-  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
-})
 class PubsubReader<T> extends NativeReader<WindowedValue<T>> {
   private final Coder<T> coder;
   private final StreamingModeExecutionContext context;
   // Function used to parse Windmill data.
   // If non-null, data from Windmill is expected to be a PubsubMessage 
protobuf.
-  private final SimpleFunction<PubsubMessage, T> parseFn;
+  private final @Nullable SimpleFunction<PubsubMessage, T> parseFn;
+  private final ValueProvider<Boolean> skipUndecodableElements;
 
   PubsubReader(
       Coder<WindowedValue<T>> coder,
       StreamingModeExecutionContext context,
-      SimpleFunction<PubsubMessage, T> parseFn) {
-    @SuppressWarnings({"unchecked", "rawtypes"})
-    WindowedValueCoder<T> windowedCoder = (WindowedValueCoder) coder;
+      @Nullable SimpleFunction<PubsubMessage, T> parseFn,
+      ValueProvider<Boolean> skipUndecodableElements) {
+    WindowedValueCoder<T> windowedCoder = (WindowedValueCoder<T>) coder;
     this.coder = windowedCoder.getValueCoder();
     this.context = context;
     this.parseFn = parseFn;
+    this.skipUndecodableElements = skipUndecodableElements;
   }
 
   /** A {@link ReaderFactory.Registrar} for pubsub sources. */
@@ -75,19 +75,19 @@ class PubsubReader<T> extends 
NativeReader<WindowedValue<T>> {
     }
   }
 
+  @SuppressWarnings({"unchecked", "rawtypes"})
   static class Factory implements ReaderFactory {
     @Override
     public NativeReader<?> create(
         CloudObject cloudSourceSpec,
-        Coder<?> coder,
+        @Nullable Coder<?> coder,
         @Nullable PipelineOptions options,
         @Nullable DataflowExecutionContext executionContext,
         DataflowOperationContext operationContext)
         throws Exception {
-      coder = checkArgumentNotNull(coder);
-      @SuppressWarnings("unchecked")
-      Coder<WindowedValue<Object>> typedCoder = (Coder<WindowedValue<Object>>) 
coder;
-      SimpleFunction<PubsubMessage, Object> parseFn = null;
+      Coder<WindowedValue<Object>> typedCoder =
+          (Coder<WindowedValue<Object>>) checkArgumentNotNull(coder);
+      @Nullable SimpleFunction<PubsubMessage, Object> parseFn = null;
       byte[] attributesFnBytes =
           getBytes(cloudSourceSpec, 
PropertyNames.PUBSUB_SERIALIZED_ATTRIBUTES_FN, null);
       // If attributesFnBytes is set, Pubsub data will be in PubsubMessage 
protobuf format. The
@@ -98,8 +98,20 @@ class PubsubReader<T> extends NativeReader<WindowedValue<T>> 
{
             (SimpleFunction<PubsubMessage, Object>)
                 SerializableUtils.deserializeFromByteArray(attributesFnBytes, 
"serialized fn info");
       }
+      @Nullable
+      ValueProvider<Boolean> skipUndecodableElements =
+          (options != null)
+              ? options
+                  .as(DataflowStreamingPipelineOptions.class)
+                  .getSkipInputElementsWithDecodingExceptions()
+              : null;
       return new PubsubReader<>(
-          typedCoder, (StreamingModeExecutionContext) executionContext, 
parseFn);
+          typedCoder,
+          (StreamingModeExecutionContext) 
checkArgumentNotNull(executionContext),
+          parseFn,
+          skipUndecodableElements != null
+              ? skipUndecodableElements
+              : ValueProvider.StaticValueProvider.of(false));
     }
   }
 
@@ -110,7 +122,7 @@ class PubsubReader<T> extends 
NativeReader<WindowedValue<T>> {
 
   class PubsubReaderIterator extends WindmillReaderIteratorBase<T> {
     protected PubsubReaderIterator(Windmill.WorkItem work) {
-      super(work);
+      super(work, skipUndecodableElements);
     }
 
     @Override
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/UngroupedWindmillReader.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/UngroupedWindmillReader.java
index 625dc590d24..2347529cf4a 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/UngroupedWindmillReader.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/UngroupedWindmillReader.java
@@ -18,6 +18,7 @@
 package org.apache.beam.runners.dataflow.worker;
 
 import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
+import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
 
 import com.google.auto.service.AutoService;
 import java.io.IOException;
@@ -25,12 +26,14 @@ import java.io.InputStream;
 import java.util.Collection;
 import java.util.Map;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import 
org.apache.beam.runners.dataflow.options.DataflowStreamingPipelineOptions;
 import org.apache.beam.runners.dataflow.util.CloudObject;
 import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader;
 import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.values.CausedByDrain;
@@ -45,20 +48,21 @@ import org.joda.time.Instant;
 /**
  * A Reader that receives input data from a Windmill server, and returns it as 
individual elements.
  */
-@SuppressWarnings({
-  "rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
-  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
-})
 class UngroupedWindmillReader<T> extends NativeReader<WindowedValue<T>> {
   private final Coder<T> valueCoder;
   private final Coder<Collection<? extends BoundedWindow>> windowsCoder;
-  private StreamingModeExecutionContext context;
+  private final StreamingModeExecutionContext context;
+  private final ValueProvider<Boolean> skipUndecodableElements;
 
-  UngroupedWindmillReader(Coder<WindowedValue<T>> coder, 
StreamingModeExecutionContext context) {
+  UngroupedWindmillReader(
+      Coder<WindowedValue<T>> coder,
+      StreamingModeExecutionContext context,
+      ValueProvider<Boolean> skipUndecodableElements) {
     FullWindowedValueCoder<T> inputCoder = (FullWindowedValueCoder<T>) coder;
     this.valueCoder = inputCoder.getValueCoder();
     this.windowsCoder = inputCoder.getWindowsCoder();
     this.context = context;
+    this.skipUndecodableElements = skipUndecodableElements;
   }
 
   /** A {@link ReaderFactory.Registrar} for ungrouped windmill sources. */
@@ -75,6 +79,7 @@ class UngroupedWindmillReader<T> extends 
NativeReader<WindowedValue<T>> {
     }
   }
 
+  @SuppressWarnings({"unchecked", "rawtypes"})
   static class Factory implements ReaderFactory {
     @Override
     public NativeReader<?> create(
@@ -84,11 +89,21 @@ class UngroupedWindmillReader<T> extends 
NativeReader<WindowedValue<T>> {
         @Nullable DataflowExecutionContext executionContext,
         DataflowOperationContext operationContext)
         throws Exception {
-      coder = checkArgumentNotNull(coder);
-      @SuppressWarnings("unchecked")
-      Coder<WindowedValue<Object>> typedCoder = (Coder<WindowedValue<Object>>) 
coder;
+      Coder<WindowedValue<Object>> typedCoder =
+          (Coder<WindowedValue<Object>>) checkArgumentNotNull(coder);
+      @Nullable
+      ValueProvider<Boolean> skipUndecodableElements =
+          options != null
+              ? options
+                  .as(DataflowStreamingPipelineOptions.class)
+                  .getSkipInputElementsWithDecodingExceptions()
+              : null;
       return new UngroupedWindmillReader<>(
-          typedCoder, (StreamingModeExecutionContext) executionContext);
+          typedCoder,
+          (StreamingModeExecutionContext) 
checkArgumentNotNull(executionContext),
+          skipUndecodableElements != null
+              ? skipUndecodableElements
+              : ValueProvider.StaticValueProvider.of(false));
     }
   }
 
@@ -97,9 +112,9 @@ class UngroupedWindmillReader<T> extends 
NativeReader<WindowedValue<T>> {
     return new UngroupedWindmillReaderIterator(context.getWorkItem());
   }
 
-  class UngroupedWindmillReaderIterator extends WindmillReaderIteratorBase {
+  class UngroupedWindmillReaderIterator extends WindmillReaderIteratorBase<T> {
     UngroupedWindmillReaderIterator(Windmill.WorkItem work) {
-      super(work);
+      super(work, skipUndecodableElements);
     }
 
     @Override
@@ -134,7 +149,7 @@ class UngroupedWindmillReader<T> extends 
NativeReader<WindowedValue<T>> {
       }
       if (valueCoder instanceof KvCoder) {
         KvCoder<?, ?> kvCoder = (KvCoder<?, ?>) valueCoder;
-        InputStream key = context.getSerializedKey().newInput();
+        InputStream key = checkNotNull(context.getSerializedKey()).newInput();
         notifyElementRead(key.available() + data.available() + 
metadata.available());
 
         @SuppressWarnings("unchecked")
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItem.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItem.java
index 59489babf0b..c328719bfb5 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItem.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItem.java
@@ -34,6 +34,7 @@ import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill;
 import org.apache.beam.runners.dataflow.worker.windmill.Windmill.Timer;
 import 
org.apache.beam.runners.dataflow.worker.windmill.state.WindmillTagEncoding;
 import 
org.apache.beam.runners.dataflow.worker.windmill.state.WindmillTimerData;
+import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StructuredCoder;
@@ -49,6 +50,8 @@ import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Fluent
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * An implementation of {@link KeyedWorkItem} that wraps around a {@code 
Windmill.WorkItem}.
@@ -56,14 +59,13 @@ import org.joda.time.Instant;
  * @param <K> the key type
  * @param <ElemT> the element type
  */
-@SuppressWarnings({
-  "rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
-  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
-})
+@Internal
 public class WindmillKeyedWorkItem<K, ElemT> implements KeyedWorkItem<K, 
ElemT> {
   private static final Predicate<Timer> IS_WATERMARK =
       input -> input.getType() == Timer.Type.WATERMARK;
 
+  private static final Logger LOG = 
LoggerFactory.getLogger(WindmillKeyedWorkItem.class);
+
   private final Windmill.WorkItem workItem;
   private final K key;
   // used to inform that timer was caused by drain
@@ -73,6 +75,7 @@ public class WindmillKeyedWorkItem<K, ElemT> implements 
KeyedWorkItem<K, ElemT>
   private final transient Coder<Collection<? extends BoundedWindow>> 
windowsCoder;
   private final transient Coder<ElemT> valueCoder;
   private final WindmillTagEncoding windmillTagEncoding;
+  private final boolean skipUndecodableElements;
 
   public WindmillKeyedWorkItem(
       K key,
@@ -82,6 +85,26 @@ public class WindmillKeyedWorkItem<K, ElemT> implements 
KeyedWorkItem<K, ElemT>
       Coder<ElemT> valueCoder,
       WindmillTagEncoding windmillTagEncoding,
       boolean drainMode) {
+    this(
+        key,
+        workItem,
+        windowCoder,
+        windowsCoder,
+        valueCoder,
+        windmillTagEncoding,
+        drainMode,
+        false);
+  }
+
+  public WindmillKeyedWorkItem(
+      K key,
+      Windmill.WorkItem workItem,
+      Coder<? extends BoundedWindow> windowCoder,
+      Coder<Collection<? extends BoundedWindow>> windowsCoder,
+      Coder<ElemT> valueCoder,
+      WindmillTagEncoding windmillTagEncoding,
+      boolean drainMode,
+      boolean skipUndecodableElements) {
     this.key = key;
     this.workItem = workItem;
     this.windowCoder = windowCoder;
@@ -89,6 +112,7 @@ public class WindmillKeyedWorkItem<K, ElemT> implements 
KeyedWorkItem<K, ElemT>
     this.valueCoder = valueCoder;
     this.windmillTagEncoding = windmillTagEncoding;
     this.drainMode = drainMode;
+    this.skipUndecodableElements = skipUndecodableElements;
   }
 
   @Override
@@ -113,39 +137,49 @@ public class WindmillKeyedWorkItem<K, ElemT> implements 
KeyedWorkItem<K, ElemT>
             });
   }
 
+  private @Nullable WindowedValue<ElemT> parseElem(Windmill.Message message) {
+    try {
+      Instant timestamp = 
WindmillTimeUtils.windmillToHarnessTimestamp(message.getTimestamp());
+      Collection<? extends BoundedWindow> windows =
+          WindmillSink.decodeMetadataWindows(windowsCoder, 
message.getMetadata());
+      PaneInfo paneInfo = 
WindmillSink.decodeMetadataPane(message.getMetadata());
+      /**
+       * https://s.apache.org/beam-drain-mode - propagate drain bit if 
aggregation/expiry induced by
+       * drain happened upstream
+       */
+      CausedByDrain drainingValueFromUpstream = CausedByDrain.NORMAL;
+      if (WindowedValues.WindowedValueCoder.isMetadataSupported()) {
+        BeamFnApi.Elements.ElementMetadata elementMetadata =
+            WindmillSink.decodeAdditionalMetadata(windowsCoder, 
message.getMetadata());
+        drainingValueFromUpstream =
+            elementMetadata.getDrain() == 
BeamFnApi.Elements.DrainMode.Enum.DRAINING
+                ? CausedByDrain.CAUSED_BY_DRAIN
+                : CausedByDrain.NORMAL;
+      }
+      InputStream inputStream = message.getData().newInput();
+      ElemT value = valueCoder.decode(inputStream, Coder.Context.OUTER);
+      return WindowedValues.of(
+          value, timestamp, windows, paneInfo, null, null, 
drainingValueFromUpstream);
+    } catch (RuntimeException | IOException e) {
+      if (!skipUndecodableElements) {
+        throw new RuntimeException(e);
+      }
+      LOG.error(
+          "Skipping input element for work token {} on sharding key {} due to 
decoding error",
+          workItem.getWorkToken(),
+          workItem.getShardingKey(),
+          e);
+      return null;
+    }
+  }
+
   @Override
+  @SuppressWarnings("nullness")
   public Iterable<WindowedValue<ElemT>> elementsIterable() {
     return FluentIterable.from(workItem.getMessageBundlesList())
         .transformAndConcat(Windmill.InputMessageBundle::getMessagesList)
-        .transform(
-            message -> {
-              try {
-                Instant timestamp =
-                    
WindmillTimeUtils.windmillToHarnessTimestamp(message.getTimestamp());
-                Collection<? extends BoundedWindow> windows =
-                    WindmillSink.decodeMetadataWindows(windowsCoder, 
message.getMetadata());
-                PaneInfo paneInfo = 
WindmillSink.decodeMetadataPane(message.getMetadata());
-                /**
-                 * https://s.apache.org/beam-drain-mode - propagate drain bit 
if aggregation/expiry
-                 * induced by drain happened upstream
-                 */
-                CausedByDrain drainingValueFromUpstream = CausedByDrain.NORMAL;
-                if (WindowedValues.WindowedValueCoder.isMetadataSupported()) {
-                  BeamFnApi.Elements.ElementMetadata elementMetadata =
-                      WindmillSink.decodeAdditionalMetadata(windowsCoder, 
message.getMetadata());
-                  drainingValueFromUpstream =
-                      elementMetadata.getDrain() == 
BeamFnApi.Elements.DrainMode.Enum.DRAINING
-                          ? CausedByDrain.CAUSED_BY_DRAIN
-                          : CausedByDrain.NORMAL;
-                }
-                InputStream inputStream = message.getData().newInput();
-                ElemT value = valueCoder.decode(inputStream, 
Coder.Context.OUTER);
-                return WindowedValues.of(
-                    value, timestamp, windows, paneInfo, null, null, 
drainingValueFromUpstream);
-              } catch (IOException e) {
-                throw new RuntimeException(e);
-              }
-            });
+        .transform(this::parseElem)
+        .filter(Objects::nonNull);
   }
 
   @Override
@@ -237,12 +271,13 @@ public class WindmillKeyedWorkItem<K, ElemT> implements 
KeyedWorkItem<K, ElemT>
       return kvCoder.getValueCoder();
     }
 
+    @SuppressWarnings("unchecked")
     protected FakeKeyedWorkItemCoder(Coder<?> elemCoder) {
       if (elemCoder instanceof KeyedWorkItemCoder) {
-        KeyedWorkItemCoder kwiCoder = (KeyedWorkItemCoder) elemCoder;
+        KeyedWorkItemCoder<K, ElemT> kwiCoder = (KeyedWorkItemCoder<K, ElemT>) 
elemCoder;
         this.kvCoder = KvCoder.of(kwiCoder.getKeyCoder(), 
kwiCoder.getElementCoder());
       } else if (elemCoder instanceof KvCoder) {
-        this.kvCoder = ((KvCoder) elemCoder);
+        this.kvCoder = (KvCoder<K, ElemT>) elemCoder;
       } else {
         throw new IllegalArgumentException(
             "FakeKeyedWorkItemCoder only works with KeyedWorkItemCoder or 
KvCoder; was: "
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillReaderIteratorBase.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillReaderIteratorBase.java
index 5e68641bf66..7e6508a4788 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillReaderIteratorBase.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillReaderIteratorBase.java
@@ -17,27 +17,33 @@
  */
 package org.apache.beam.runners.dataflow.worker;
 
+import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
+
 import java.io.IOException;
 import java.util.NoSuchElementException;
+import javax.annotation.Nullable;
 import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader;
 import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
+import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.values.WindowedValue;
-import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Optional;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Base class for iterators that decode messages from bundles inside a {@link 
Windmill.WorkItem}.
  */
-@SuppressWarnings({
-  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
-})
 public abstract class WindmillReaderIteratorBase<T>
     extends NativeReader.NativeReaderIterator<WindowedValue<T>> {
-  private Windmill.WorkItem work;
+  private final Windmill.WorkItem work;
   private int bundleIndex = 0;
   private int messageIndex = -1;
-  private Optional<WindowedValue<T>> current;
+  private @Nullable WindowedValue<T> current = null;
+  private final ValueProvider<Boolean> skipUndecodableElements;
+  private static final Logger LOG = 
LoggerFactory.getLogger(WindmillReaderIteratorBase.class);
 
-  protected WindmillReaderIteratorBase(Windmill.WorkItem work) {
+  protected WindmillReaderIteratorBase(
+      Windmill.WorkItem work, ValueProvider<Boolean> skipUndecodableElements) {
+    this.skipUndecodableElements = skipUndecodableElements;
     this.work = work;
   }
 
@@ -48,30 +54,43 @@ public abstract class WindmillReaderIteratorBase<T>
 
   @Override
   public boolean advance() throws IOException {
-    if (bundleIndex == work.getMessageBundlesCount()
-        || messageIndex == 
work.getMessageBundles(bundleIndex).getMessagesCount()) {
-      current = Optional.absent();
-      return false;
-    }
-    ++messageIndex;
-    for (; bundleIndex < work.getMessageBundlesCount(); ++bundleIndex, 
messageIndex = 0) {
+    while (true) {
+      if (bundleIndex >= work.getMessageBundlesCount()) {
+        current = null;
+        return false;
+      }
       Windmill.InputMessageBundle bundle = work.getMessageBundles(bundleIndex);
-      if (messageIndex < bundle.getMessagesCount()) {
-        current = Optional.of(decodeMessage(bundle.getMessages(messageIndex)));
+      ++messageIndex;
+      if (messageIndex >= bundle.getMessagesCount()) {
+        messageIndex = -1;
+        ++bundleIndex;
+        continue;
+      }
+      try {
+        current = 
checkNotNull(decodeMessage(bundle.getMessages(messageIndex)));
         return true;
+      } catch (RuntimeException | IOException e) {
+        if (skipUndecodableElements.isAccessible()
+            && Boolean.TRUE.equals(skipUndecodableElements.get())) {
+          LOG.error(
+              "Skipping input element for work token {} on sharding key {} due 
to decoding error",
+              work.getWorkToken(),
+              work.getShardingKey(),
+              e);
+          continue;
+        }
+        throw e;
       }
     }
-    current = Optional.absent();
-    return false;
   }
 
   protected abstract WindowedValue<T> decodeMessage(Windmill.Message message) 
throws IOException;
 
   @Override
   public WindowedValue<T> getCurrent() throws NoSuchElementException {
-    if (!current.isPresent()) {
+    if (current == null) {
       throw new NoSuchElementException();
     }
-    return current.get();
+    return current;
   }
 }
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java
index 2ed29125bd4..abee9a33df2 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java
@@ -214,7 +214,7 @@ class WindmillSink<T> extends Sink<WindowedValue<T>> {
     }
 
     @Override
-    @SuppressWarnings({"rawtypes", "NestedInstanceOfConditions"})
+    @SuppressWarnings("rawtypes")
     public long add(WindowedValue<T> data) throws IOException {
       ByteString key, value;
       ByteString id = ByteString.EMPTY;
@@ -228,16 +228,18 @@ class WindmillSink<T> extends Sink<WindowedValue<T>> {
         KvCoder kvCoder = (KvCoder) valueCoder;
         KV kv = checkNotNull((KV) data.getValue());
         key = encode(kvCoder.getKeyCoder(), kv.getKey());
-        Coder valueCoder = kvCoder.getValueCoder();
+        Coder nestedValueCoder = kvCoder.getValueCoder();
         // If ids are explicitly provided, use that instead of the 
windmill-generated id.
         // This is used when reading an UnboundedSource to deduplicate records.
-        if (valueCoder instanceof ValueWithRecordId.ValueWithRecordIdCoder) {
+        if (nestedValueCoder instanceof 
ValueWithRecordId.ValueWithRecordIdCoder) {
           ValueWithRecordId valueAndId = checkNotNull((ValueWithRecordId) 
kv.getValue());
           value =
-              encode(((ValueWithRecordIdCoder) valueCoder).getValueCoder(), 
valueAndId.getValue());
+              encode(
+                  ((ValueWithRecordIdCoder) nestedValueCoder).getValueCoder(),
+                  valueAndId.getValue());
           id = ByteString.copyFrom(valueAndId.getId());
         } else {
-          value = encode(valueCoder, kv.getValue());
+          value = encode(nestedValueCoder, kv.getValue());
         }
       } else {
         key = checkNotNull(context.getSerializedKey());
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindowingWindmillReader.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindowingWindmillReader.java
index 7dd55d91211..173b254f639 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindowingWindmillReader.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindowingWindmillReader.java
@@ -18,6 +18,7 @@
 package org.apache.beam.runners.dataflow.worker;
 
 import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
 
 import com.google.auto.service.AutoService;
 import java.io.IOException;
@@ -25,12 +26,15 @@ import java.util.Collection;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import org.apache.beam.runners.core.KeyedWorkItem;
+import 
org.apache.beam.runners.dataflow.options.DataflowStreamingPipelineOptions;
 import org.apache.beam.runners.dataflow.util.CloudObject;
 import org.apache.beam.runners.dataflow.worker.util.ValueInEmptyWindows;
 import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader;
 import org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItem;
+import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.WindowedValue;
 import org.apache.beam.sdk.values.WindowedValues.FullWindowedValueCoder;
@@ -42,10 +46,7 @@ import org.checkerframework.checker.nullness.qual.Nullable;
  * A Reader that receives input data from a Windmill server, and returns a 
singleton iterable
  * containing the work item.
  */
-@SuppressWarnings({
-  "rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
-  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
-})
+@Internal
 class WindowingWindmillReader<K, T> extends 
NativeReader<WindowedValue<KeyedWorkItem<K, T>>> {
 
   private final Coder<K> keyCoder;
@@ -53,19 +54,22 @@ class WindowingWindmillReader<K, T> extends 
NativeReader<WindowedValue<KeyedWork
   private final Coder<? extends BoundedWindow> windowCoder;
   private final Coder<Collection<? extends BoundedWindow>> windowsCoder;
   private StreamingModeExecutionContext context;
+  private final ValueProvider<Boolean> skipUndecodableElements;
 
   WindowingWindmillReader(
-      Coder<WindowedValue<KeyedWorkItem<K, T>>> coder, 
StreamingModeExecutionContext context) {
+      Coder<WindowedValue<KeyedWorkItem<K, T>>> coder,
+      StreamingModeExecutionContext context,
+      ValueProvider<Boolean> skipUndecodableElements) {
     FullWindowedValueCoder<KeyedWorkItem<K, T>> inputCoder =
         (FullWindowedValueCoder<KeyedWorkItem<K, T>>) coder;
     this.windowsCoder = inputCoder.getWindowsCoder();
     this.windowCoder = inputCoder.getWindowCoder();
-    @SuppressWarnings("unchecked")
     WindmillKeyedWorkItem.FakeKeyedWorkItemCoder<K, T> keyedWorkItemCoder =
         (WindmillKeyedWorkItem.FakeKeyedWorkItemCoder<K, T>) 
inputCoder.getValueCoder();
     this.keyCoder = keyedWorkItemCoder.getKeyCoder();
     this.valueCoder = keyedWorkItemCoder.getElementCoder();
     this.context = context;
+    this.skipUndecodableElements = skipUndecodableElements;
   }
 
   /** A {@link ReaderFactory.Registrar} for grouping windmill sources. */
@@ -87,6 +91,7 @@ class WindowingWindmillReader<K, T> extends 
NativeReader<WindowedValue<KeyedWork
 
   static class Factory implements ReaderFactory {
     @Override
+    @SuppressWarnings("rawtypes")
     public NativeReader<?> create(
         CloudObject spec,
         @Nullable Coder<?> coder,
@@ -94,14 +99,22 @@ class WindowingWindmillReader<K, T> extends 
NativeReader<WindowedValue<KeyedWork
         @Nullable DataflowExecutionContext context,
         DataflowOperationContext operationContext)
         throws Exception {
-      coder = checkArgumentNotNull(coder);
-      @SuppressWarnings({
-        "rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
-        "unchecked"
-      })
+      @SuppressWarnings("unchecked")
       Coder<WindowedValue<KeyedWorkItem<Object, Object>>> typedCoder =
-          (Coder<WindowedValue<KeyedWorkItem<Object, Object>>>) coder;
-      return WindowingWindmillReader.create(typedCoder, 
(StreamingModeExecutionContext) context);
+          (Coder<WindowedValue<KeyedWorkItem<Object, Object>>>) 
checkArgumentNotNull(coder);
+      @Nullable
+      ValueProvider<Boolean> skipUndecodableElements =
+          (options != null)
+              ? options
+                  .as(DataflowStreamingPipelineOptions.class)
+                  .getSkipInputElementsWithDecodingExceptions()
+              : null;
+      return WindowingWindmillReader.create(
+          typedCoder,
+          (StreamingModeExecutionContext) checkArgumentNotNull(context),
+          skipUndecodableElements != null
+              ? skipUndecodableElements
+              : ValueProvider.StaticValueProvider.of(false));
     }
   }
 
@@ -110,13 +123,17 @@ class WindowingWindmillReader<K, T> extends 
NativeReader<WindowedValue<KeyedWork
    * StreamingModeExecutionContext}.
    */
   public static <K, T> WindowingWindmillReader<K, T> create(
-      Coder<WindowedValue<KeyedWorkItem<K, T>>> coder, 
StreamingModeExecutionContext context) {
-    return new WindowingWindmillReader<K, T>(coder, context);
+      Coder<WindowedValue<KeyedWorkItem<K, T>>> coder,
+      StreamingModeExecutionContext context,
+      ValueProvider<Boolean> skipUndecodableElements) {
+    return new WindowingWindmillReader<>(coder, context, 
skipUndecodableElements);
   }
 
   @Override
   public NativeReaderIterator<WindowedValue<KeyedWorkItem<K, T>>> iterator() 
throws IOException {
-    final K key = keyCoder.decode(context.getSerializedKey().newInput(), 
Coder.Context.OUTER);
+    final K key =
+        keyCoder.decode(
+            checkStateNotNull(context.getSerializedKey()).newInput(), 
Coder.Context.OUTER);
     final WorkItem workItem = context.getWorkItem();
     KeyedWorkItem<K, T> keyedWorkItem =
         new WindmillKeyedWorkItem<>(
@@ -126,7 +143,9 @@ class WindowingWindmillReader<K, T> extends 
NativeReader<WindowedValue<KeyedWork
             windowsCoder,
             valueCoder,
             context.getWindmillTagEncoding(),
-            context.getDrainMode());
+            context.getDrainMode(),
+            skipUndecodableElements.isAccessible()
+                && Boolean.TRUE.equals(skipUndecodableElements.get()));
     final boolean isEmptyWorkItem =
         (Iterables.isEmpty(keyedWorkItem.timersIterable())
             && Iterables.isEmpty(keyedWorkItem.elementsIterable()));
@@ -152,7 +171,7 @@ class WindowingWindmillReader<K, T> extends 
NativeReader<WindowedValue<KeyedWork
       };
     } else {
       return new NativeReaderIterator<WindowedValue<KeyedWorkItem<K, T>>>() {
-        private WindowedValue<KeyedWorkItem<K, T>> current;
+        private @Nullable WindowedValue<KeyedWorkItem<K, T>> current = null;
 
         @Override
         public boolean start() throws IOException {
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
index 15304e5cb9f..fafa27f98fc 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
@@ -148,6 +148,7 @@ import org.apache.beam.sdk.coders.CollectionCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.TextualIntegerCoder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.extensions.gcp.util.Transport;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -223,9 +224,7 @@ import org.slf4j.LoggerFactory;
 
 /** Unit tests for {@link StreamingDataflowWorker}. */
 @RunWith(Parameterized.class)
-// TODO(https://github.com/apache/beam/issues/21230): Remove when new version 
of errorprone is
-// released (2.11.0)
-@SuppressWarnings({"unused", "deprecation"})
+@SuppressWarnings("deprecation")
 public class StreamingDataflowWorkerTest {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(StreamingDataflowWorkerTest.class);
@@ -857,6 +856,7 @@ public class StreamingDataflowWorkerTest {
     if (streamingEngine) {
       argsList.add("--experiments=enable_streaming_engine");
     }
+    LOG.info("Running with args {}", argsList);
     DataflowWorkerHarnessOptions options =
         PipelineOptionsFactory.fromArgs(argsList.toArray(new String[0]))
             .as(DataflowWorkerHarnessOptions.class);
@@ -870,7 +870,6 @@ public class StreamingDataflowWorkerTest {
     if (options.getActiveWorkRefreshPeriodMillis() == 10000) {
       options.setActiveWorkRefreshPeriodMillis(0);
     }
-
     return options;
   }
 
@@ -4318,6 +4317,67 @@ public class StreamingDataflowWorkerTest {
     }
   }
 
+  @Test
+  public void testSkipInputElementsWithDecodingExceptions() throws Exception {
+    KvCoder<String, Integer> kvCoder = KvCoder.of(StringUtf8Coder.of(), 
TextualIntegerCoder.of());
+    List<ParallelInstruction> instructions =
+        Arrays.asList(makeSourceInstruction(kvCoder), 
makeSinkInstruction(kvCoder, 0));
+
+    StreamingDataflowWorker worker =
+        makeWorker(
+            
defaultWorkerParams("--skipInputElementsWithDecodingExceptions=true")
+                .setInstructions(instructions)
+                .publishCounters()
+                .build());
+    worker.start();
+
+    // One work item with a single bundle: a decodable message, then an undecodable one.
+    Windmill.GetWorkResponse.Builder builder = 
Windmill.GetWorkResponse.newBuilder();
+    Windmill.ComputationWorkItems.Builder computationBuilder =
+        
builder.addWorkBuilder().setComputationId(DEFAULT_COMPUTATION_ID).setInputDataWatermark(1);
+    Windmill.WorkItem.Builder workItemBuilder =
+        computationBuilder
+            .addWorkBuilder()
+            .setKey(DEFAULT_KEY_BYTES)
+            .setShardingKey(DEFAULT_SHARDING_KEY)
+            .setWorkToken(1)
+            .setCacheToken(2);
+
+    Windmill.InputMessageBundle.Builder bundleBuilder =
+        workItemBuilder
+            .addMessageBundlesBuilder()
+            .setSourceComputationId(DEFAULT_SOURCE_COMPUTATION_ID);
+
+    // Decodable: "12345" is a well-formed TextualIntegerCoder value.
+    bundleBuilder
+        .addMessagesBuilder()
+        .setTimestamp(0)
+        .setData(ByteString.copyFromUtf8("12345"))
+        .setMetadata(addPaneTag(PaneInfo.NO_FIRING, 
intervalWindowBytes(DEFAULT_WINDOW)));
+
+    // Undecodable: not a textual integer, so decoding this value is expected to throw.
+    bundleBuilder
+        .addMessagesBuilder()
+        .setTimestamp(1000)
+        .setData(ByteString.copyFromUtf8("54321corrupted data"))
+        .setMetadata(addPaneTag(PaneInfo.NO_FIRING, 
intervalWindowBytes(DEFAULT_WINDOW)));
+
+    server.whenGetWorkCalled().thenReturn(builder.build());
+
+    Map<Long, Windmill.WorkItemCommitRequest> result = 
server.waitForAndGetCommits(1);
+    worker.stop();
+
+    assertTrue(result.containsKey(1L));
+    Windmill.WorkItemCommitRequest commit = result.get(1L);
+
+    // The work item still commits, and only the decodable message reaches the sink.
+    assertEquals(1, commit.getOutputMessagesCount());
+    assertEquals(1, 
commit.getOutputMessages(0).getBundles(0).getMessagesCount());
+    assertEquals("key", 
commit.getOutputMessages(0).getBundles(0).getKey().toStringUtf8());
+    assertEquals(
+        "12345", 
commit.getOutputMessages(0).getBundles(0).getMessages(0).getData().toStringUtf8());
+  }
+
   static class BlockingFn extends DoFn<String, String> implements TestRule {
 
     public static AtomicReference<CountDownLatch> blocker =
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItemTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItemTest.java
index 69aa4d0d69a..c1568058435 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItemTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItemTest.java
@@ -118,6 +118,93 @@ public class WindmillKeyedWorkItemTest {
             WindowedValues.of("earth", new Instant(6), WINDOW_1, 
paneInfo(1))));
   }
 
+  @Test
+  public void testElementIterationWithSkipEnabled() throws Exception {
+    Windmill.WorkItem.Builder workItem =
+        Windmill.WorkItem.newBuilder().setKey(SERIALIZED_KEY).setWorkToken(17);
+    Windmill.InputMessageBundle.Builder chunk1 = 
workItem.addMessageBundlesBuilder();
+    chunk1.setSourceComputationId("computation");
+    addElement(chunk1, 5, "hello", WINDOW_1, paneInfo(0));
+    addElement(chunk1, 7, "world", WINDOW_2, paneInfo(2));
+    Windmill.InputMessageBundle.Builder chunk2 = 
workItem.addMessageBundlesBuilder();
+    chunk2.setSourceComputationId("computation");
+    addElement(chunk2, 6, "earth", WINDOW_1, paneInfo(1));
+
+    KeyedWorkItem<String, String> keyedWorkItem =
+        new WindmillKeyedWorkItem<>(
+            KEY,
+            workItem.build(),
+            WINDOW_CODER,
+            WINDOWS_CODER,
+            VALUE_CODER,
+            windmillTagEncoding,
+            false,
+            true);
+
+    assertThat(
+        keyedWorkItem.elementsIterable(),
+        Matchers.contains(
+            WindowedValues.of("hello", new Instant(5), WINDOW_1, paneInfo(0)),
+            WindowedValues.of("world", new Instant(7), WINDOW_2, paneInfo(2)),
+            WindowedValues.of("earth", new Instant(6), WINDOW_1, 
paneInfo(1))));
+  }
+
+  @Test
+  public void testElementIterationSkips() throws Exception {
+    Windmill.WorkItem.Builder workItem =
+        Windmill.WorkItem.newBuilder().setKey(SERIALIZED_KEY).setWorkToken(17);
+    Windmill.InputMessageBundle.Builder chunk1 = 
workItem.addMessageBundlesBuilder();
+    chunk1.setSourceComputationId("computation");
+    addElement(chunk1, 5, "hello", WINDOW_1, paneInfo(0));
+    addCorruptedElement(chunk1);
+    Windmill.InputMessageBundle.Builder chunk2 = 
workItem.addMessageBundlesBuilder();
+    chunk2.setSourceComputationId("computation");
+    addElement(chunk2, 6, "earth", WINDOW_1, paneInfo(1));
+
+    KeyedWorkItem<String, String> keyedWorkItem =
+        new WindmillKeyedWorkItem<>(
+            KEY,
+            workItem.build(),
+            WINDOW_CODER,
+            WINDOWS_CODER,
+            VALUE_CODER,
+            windmillTagEncoding,
+            false,
+            true);
+
+    assertThat(
+        keyedWorkItem.elementsIterable(),
+        Matchers.contains(
+            WindowedValues.of("hello", new Instant(5), WINDOW_1, paneInfo(0)),
+            WindowedValues.of("earth", new Instant(6), WINDOW_1, 
paneInfo(1))));
+  }
+
+  @Test
+  public void testElementIterationAllSkips() throws Exception {
+    Windmill.WorkItem.Builder workItem =
+        Windmill.WorkItem.newBuilder().setKey(SERIALIZED_KEY).setWorkToken(17);
+    Windmill.InputMessageBundle.Builder chunk1 = 
workItem.addMessageBundlesBuilder();
+    chunk1.setSourceComputationId("computation");
+    addCorruptedElement(chunk1);
+    addCorruptedElement(chunk1);
+    Windmill.InputMessageBundle.Builder chunk2 = 
workItem.addMessageBundlesBuilder();
+    chunk2.setSourceComputationId("computation");
+    addCorruptedElement(chunk2);
+
+    KeyedWorkItem<String, String> keyedWorkItem =
+        new WindmillKeyedWorkItem<>(
+            KEY,
+            workItem.build(),
+            WINDOW_CODER,
+            WINDOWS_CODER,
+            VALUE_CODER,
+            windmillTagEncoding,
+            false,
+            true);
+
+    assertThat(keyedWorkItem.elementsIterable(), Matchers.emptyIterable());
+  }
+
   private void addElement(
       Windmill.InputMessageBundle.Builder chunk,
       long timestamp,
@@ -156,6 +243,14 @@ public class WindmillKeyedWorkItemTest {
         .setMetadata(encodedMetadata);
   }
 
+  private void addCorruptedElement(Windmill.InputMessageBundle.Builder chunk) {
+    chunk
+        .addMessagesBuilder()
+        .setTimestamp(1)
+        .setData(ByteString.copyFromUtf8("bad data"))
+        .setMetadata(ByteString.copyFromUtf8("bad metadata"));
+  }
+
   private PaneInfo paneInfo(int index) {
     return PaneInfo.createPane(false, false, Timing.EARLY, index, -1);
   }
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillReaderIteratorBaseTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillReaderIteratorBaseTest.java
index b6a4cb86c68..61e2f4250d0 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillReaderIteratorBaseTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillReaderIteratorBaseTest.java
@@ -24,7 +24,9 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.values.WindowedValue;
 import org.apache.beam.sdk.values.WindowedValues;
 import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString;
@@ -36,12 +38,16 @@ import org.junit.runners.JUnit4;
 @RunWith(JUnit4.class)
 public class WindmillReaderIteratorBaseTest {
   private static class TestWindmillReaderIterator extends 
WindmillReaderIteratorBase<Long> {
-    protected TestWindmillReaderIterator(Windmill.WorkItem work) {
-      super(work);
+    protected TestWindmillReaderIterator(
+        Windmill.WorkItem work, ValueProvider<Boolean> 
skipUndecodableElements) {
+      super(work, skipUndecodableElements);
     }
 
     @Override
-    protected WindowedValue<Long> decodeMessage(Windmill.Message message) {
+    protected WindowedValue<Long> decodeMessage(Windmill.Message message) 
throws CoderException {
+      if (message.getTimestamp() < 0) {
+        throw new CoderException("Injected decoding error to test skipping.");
+      }
       return WindowedValues.valueInGlobalWindow(message.getTimestamp());
     }
   }
@@ -60,7 +66,26 @@ public class WindmillReaderIteratorBaseTest {
     testForMessageBundleCounts(0, 0, 1, 3, 0, 1, 0, 0, 0, 0);
   }
 
+  @Test
+  public void testSkipErrors() throws IOException {
+    testForMessageBundleCounts(true);
+    testForMessageBundleCounts(true, 0);
+    testForMessageBundleCounts(true, 0, 0);
+    testForMessageBundleCounts(true, 1);
+    testForMessageBundleCounts(true, 2);
+    testForMessageBundleCounts(true, 1, 1);
+    testForMessageBundleCounts(true, 0, 1);
+    testForMessageBundleCounts(true, 1, 0);
+    testForMessageBundleCounts(true, 0, 0, 1, 3, 0, 1, 0, 0, 0, 1);
+    testForMessageBundleCounts(true, 0, 0, 1, 3, 0, 1, 0, 0, 0, 0);
+  }
+
   private void testForMessageBundleCounts(int... messageBundleCounts) throws 
IOException {
+    testForMessageBundleCounts(false, messageBundleCounts);
+  }
+
+  private void testForMessageBundleCounts(boolean skipErrors, int... 
messageBundleCounts)
+      throws IOException {
     List<Windmill.InputMessageBundle> bundles = new ArrayList<>();
     long numTotalMessages = 0;
     for (int count : messageBundleCounts) {
@@ -73,6 +98,11 @@ public class WindmillReaderIteratorBaseTest {
                 .setData(ByteString.EMPTY)
                 .build());
       }
+      // Deterministically corrupt every other bundle so failures are reproducible.
+      if (skipErrors && bundles.size() % 2 == 0) {
+        bundle.addMessages(
+            
Windmill.Message.newBuilder().setTimestamp(-10).setData(ByteString.EMPTY).build());
+      }
       bundles.add(bundle.build());
     }
     Windmill.WorkItem workItem =
@@ -81,7 +111,9 @@ public class WindmillReaderIteratorBaseTest {
             .setWorkToken(0L)
             .addAllMessageBundles(bundles)
             .build();
-    try (TestWindmillReaderIterator iter = new 
TestWindmillReaderIterator(workItem)) {
+    try (TestWindmillReaderIterator iter =
+        new TestWindmillReaderIterator(
+            workItem, ValueProvider.StaticValueProvider.of(skipErrors))) {
       List<Long> actual =
           ReaderTestUtils.windowedValuesToValues(
               ReaderUtils.readRemainingFromIterator(iter, false));
@@ -90,7 +122,7 @@ public class WindmillReaderIteratorBaseTest {
       for (int i = 0; i < numTotalMessages; ++i) {
         expected.add((long) i);
       }
-      assertEquals(Arrays.toString(messageBundleCounts), expected, actual);
+      assertEquals(Arrays.toString(messageBundleCounts) + skipErrors, 
expected, actual);
     }
   }
 }
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java
index f253d179483..4805122035a 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java
@@ -341,7 +341,18 @@ public final class PaneInfo {
         tag = (byte) (ordinal() << 4);
       }
 
-      public static Encoding fromTag(byte b) {
-        return Encoding.values()[b >> 4];
+      /**
+       * Decodes the {@link Encoding} stored in the high nibble of a pane tag byte.
+       *
+       * @throws CoderException if the high nibble does not name a known encoding
+       */
+      public static Encoding fromTag(byte b) throws CoderException {
+        Encoding[] encodings = Encoding.values();
+        // Arithmetic shift: a tag byte with its sign bit set yields a negative index.
+        int index = b >> 4;
+        if (index < 0 || index >= encodings.length) {
+          throw new CoderException("Invalid pane encoding " + index);
+        }
+        return encodings[index];
       }
     }

Reply via email to