arunpandianp commented on code in PR #37762:
URL: https://github.com/apache/beam/pull/37762#discussion_r2908424149
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java:
##########
@@ -341,7 +341,11 @@ private enum Encoding {
tag = (byte) (ordinal() << 4);
}
- public static Encoding fromTag(byte b) {
+ public static Encoding fromTag(byte b) throws CoderException {
+ int index = b >> 4;
+ if (index < 0 || index > values().length) {
Review Comment:
```suggestion
if (index < 0 || index >= values().length) {
```
##########
runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java:
##########
@@ -226,6 +227,12 @@ public interface DataflowStreamingPipelineOptions extends
PipelineOptions {
void setWindmillServiceStreamMaxBackoffMillis(int value);
+ @Description(
+ "If true, log and skip input elements that are unable to successfully
decode from the streaming backend.")
+ ValueProvider<Boolean> getSkipInputElementsWithDecodingExceptions();
Review Comment:
It is not clear why this needs to be a `ValueProvider<Boolean>` rather than a
plain `Boolean`. Every place where it is used appears to be instantiated at
runtime, when the pipeline options are already available.
I'm not familiar with how ValueProviders work — does this actually need to be a
ValueProvider?
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillReaderIteratorBase.java:
##########
@@ -48,30 +54,38 @@ public boolean start() throws IOException {
@Override
public boolean advance() throws IOException {
- if (bundleIndex == work.getMessageBundlesCount()
- || messageIndex ==
work.getMessageBundles(bundleIndex).getMessagesCount()) {
- current = Optional.absent();
- return false;
- }
- ++messageIndex;
- for (; bundleIndex < work.getMessageBundlesCount(); ++bundleIndex,
messageIndex = 0) {
+ while (true) {
+ if (bundleIndex >= work.getMessageBundlesCount()) {
+ current = null;
+ return false;
+ }
Windmill.InputMessageBundle bundle = work.getMessageBundles(bundleIndex);
- if (messageIndex < bundle.getMessagesCount()) {
- current = Optional.of(decodeMessage(bundle.getMessages(messageIndex)));
+ ++messageIndex;
+ if (messageIndex >= bundle.getMessagesCount()) {
+ messageIndex = -1;
+ ++bundleIndex;
+ continue;
+ }
+ try {
+ current =
checkNotNull(decodeMessage(bundle.getMessages(messageIndex)));
return true;
+ } catch (RuntimeException | IOException e) {
+ if (Boolean.TRUE.equals(skipUndecodableElements.get())) {
+ LOG.error("Skipping input element due to decoding error", e);
Review Comment:
Can we include the sharding_key and work_token in this log message?
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillReaderIteratorBase.java:
##########
@@ -48,30 +54,38 @@ public boolean start() throws IOException {
@Override
public boolean advance() throws IOException {
- if (bundleIndex == work.getMessageBundlesCount()
- || messageIndex ==
work.getMessageBundles(bundleIndex).getMessagesCount()) {
- current = Optional.absent();
- return false;
- }
- ++messageIndex;
- for (; bundleIndex < work.getMessageBundlesCount(); ++bundleIndex,
messageIndex = 0) {
+ while (true) {
+ if (bundleIndex >= work.getMessageBundlesCount()) {
+ current = null;
+ return false;
+ }
Windmill.InputMessageBundle bundle = work.getMessageBundles(bundleIndex);
- if (messageIndex < bundle.getMessagesCount()) {
- current = Optional.of(decodeMessage(bundle.getMessages(messageIndex)));
+ ++messageIndex;
+ if (messageIndex >= bundle.getMessagesCount()) {
+ messageIndex = -1;
+ ++bundleIndex;
+ continue;
+ }
+ try {
+ current =
checkNotNull(decodeMessage(bundle.getMessages(messageIndex)));
return true;
+ } catch (RuntimeException | IOException e) {
+ if (Boolean.TRUE.equals(skipUndecodableElements.get())) {
Review Comment:
Do we need a `skipUndecodableElements.isAccessible()` check before calling `get()` here?
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItem.java:
##########
@@ -113,39 +137,45 @@ public Iterable<TimerData> timersIterable() {
});
}
+ private @Nullable WindowedValue<ElemT> parseElem(Windmill.Message message) {
+ try {
+ Instant timestamp =
WindmillTimeUtils.windmillToHarnessTimestamp(message.getTimestamp());
+ Collection<? extends BoundedWindow> windows =
+ WindmillSink.decodeMetadataWindows(windowsCoder,
message.getMetadata());
+ PaneInfo paneInfo =
WindmillSink.decodeMetadataPane(message.getMetadata());
+ /**
+ * https://s.apache.org/beam-drain-mode - propagate drain bit if
aggregation/expiry induced by
+ * drain happened upstream
+ */
+ CausedByDrain drainingValueFromUpstream = CausedByDrain.NORMAL;
+ if (WindowedValues.WindowedValueCoder.isMetadataSupported()) {
+ BeamFnApi.Elements.ElementMetadata elementMetadata =
+ WindmillSink.decodeAdditionalMetadata(windowsCoder,
message.getMetadata());
+ drainingValueFromUpstream =
+ elementMetadata.getDrain() ==
BeamFnApi.Elements.DrainMode.Enum.DRAINING
+ ? CausedByDrain.CAUSED_BY_DRAIN
+ : CausedByDrain.NORMAL;
+ }
+ InputStream inputStream = message.getData().newInput();
+ ElemT value = valueCoder.decode(inputStream, Coder.Context.OUTER);
+ return WindowedValues.of(
+ value, timestamp, windows, paneInfo, null, null,
drainingValueFromUpstream);
+ } catch (RuntimeException | IOException e) {
+ if (!skipUndecodableElements) {
+ throw new RuntimeException(e);
+ }
+ LOG.error("Skipping input element due to decoding error", e);
Review Comment:
Can we include the sharding_key and work_token in this log message?
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindowingWindmillReader.java:
##########
@@ -126,7 +143,8 @@ public NativeReaderIterator<WindowedValue<KeyedWorkItem<K,
T>>> iterator() throw
windowsCoder,
valueCoder,
context.getWindmillTagEncoding(),
- context.getDrainMode());
+ context.getDrainMode(),
+ Boolean.TRUE.equals(skipDecodingExceptions.get()));
Review Comment:
Do we need a `skipDecodingExceptions.isAccessible()` check before calling `get()` here?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]