gemini-code-assist[bot] commented on code in PR #38980:
URL: https://github.com/apache/beam/pull/38980#discussion_r3423298503


##########
runners/flink/2.2/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java:
##########
@@ -0,0 +1,1786 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.locks.Lock;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.beam.runners.core.DoFnRunner;
+import org.apache.beam.runners.core.DoFnRunners;
+import org.apache.beam.runners.core.InMemoryBundleFinalizer;
+import org.apache.beam.runners.core.NullSideInputReader;
+import org.apache.beam.runners.core.ProcessFnRunner;
+import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
+import org.apache.beam.runners.core.SideInputHandler;
+import org.apache.beam.runners.core.SideInputReader;
+import org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner;
+import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems;
+import org.apache.beam.runners.core.StateInternals;
+import org.apache.beam.runners.core.StateNamespace;
+import org.apache.beam.runners.core.StateNamespaces.WindowNamespace;
+import org.apache.beam.runners.core.StatefulDoFnRunner;
+import org.apache.beam.runners.core.StepContext;
+import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.runners.core.TimerInternals.TimerData;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
+import org.apache.beam.runners.flink.FlinkPipelineOptions;
+import org.apache.beam.runners.flink.adapter.FlinkKey;
+import org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate;
+import org.apache.beam.runners.flink.metrics.FlinkMetricContainer;
+import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer;
+import org.apache.beam.runners.flink.translation.utils.CheckpointStats;
+import org.apache.beam.runners.flink.translation.utils.Workarounds;
+import 
org.apache.beam.runners.flink.translation.wrappers.streaming.stableinput.BufferingDoFnRunner;
+import 
org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkBroadcastStateInternals;
+import 
org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StructuredCoder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.BundleFinalizer;
+import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
+import org.apache.beam.sdk.transforms.join.RawUnionValue;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.util.NoopLock;
+import org.apache.beam.sdk.util.WindowedValueMultiReceiver;
+import org.apache.beam.sdk.util.WindowedValueReceiver;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowedValue;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Joiner;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import 
org.apache.flink.api.common.operators.ProcessingTimeService.ProcessingTimeCallback;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.core.execution.CheckpointingMode;
+import org.apache.flink.runtime.state.InternalPriorityQueue;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.InternalTimerServiceImpl;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import 
org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionInternalTimeService;
+import 
org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionInternalTimeServiceManager;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.util.OutputTag;
+import org.apache.flink.util.function.BiConsumerWithException;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Flink operator for executing {@link DoFn DoFns}.
+ *
+ * @param <InputT> the input type of the {@link DoFn}
+ * @param <OutputT> the output type of the {@link DoFn}
+ */
+// We use Flink's lifecycle methods to initialize transient fields
+@SuppressFBWarnings("SE_TRANSIENT_FIELD_NOT_RESTORED")
+@SuppressWarnings({
+  "rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
+  "keyfor",
+  "nullness"
+}) // TODO(https://github.com/apache/beam/issues/20497)
+public class DoFnOperator<PreInputT, InputT, OutputT>
+    extends AbstractStreamOperator<WindowedValue<OutputT>>
+    implements OneInputStreamOperator<WindowedValue<PreInputT>, 
WindowedValue<OutputT>>,
+        TwoInputStreamOperator<WindowedValue<PreInputT>, RawUnionValue, 
WindowedValue<OutputT>>,
+        Triggerable<FlinkKey, TimerData> {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(DoFnOperator.class);
+  private final boolean isStreaming;
+
+  protected DoFn<InputT, OutputT> doFn;
+
+  protected final SerializablePipelineOptions serializedOptions;
+
+  protected final TupleTag<OutputT> mainOutputTag;
+  protected final List<TupleTag<?>> additionalOutputTags;
+
+  protected final Collection<PCollectionView<?>> sideInputs;
+  protected final Map<Integer, PCollectionView<?>> sideInputTagMapping;
+
+  protected final WindowingStrategy<?, ?> windowingStrategy;
+
+  protected final OutputManagerFactory<OutputT> outputManagerFactory;
+
+  protected transient DoFnRunner<InputT, OutputT> doFnRunner;
+  protected transient PushbackSideInputDoFnRunner<InputT, OutputT> 
pushbackDoFnRunner;
+  protected transient BufferingDoFnRunner<InputT, OutputT> bufferingDoFnRunner;
+
+  protected transient SideInputHandler sideInputHandler;
+
+  protected transient SideInputReader sideInputReader;
+
+  protected transient BufferedOutputManager<OutputT> outputManager;
+
+  private transient DoFnInvoker<InputT, OutputT> doFnInvoker;
+
+  protected transient FlinkStateInternals<?> keyedStateInternals;
+  protected transient FlinkTimerInternals timerInternals;
+
+  protected final String stepName;
+
+  final Coder<WindowedValue<InputT>> windowedInputCoder;
+
+  final Map<TupleTag<?>, Coder<?>> outputCoders;
+
+  final Coder<?> keyCoder;
+
+  final KeySelector<WindowedValue<InputT>, ?> keySelector;
+
+  final TimerInternals.TimerDataCoderV2 timerCoder;
+
+  /** Max number of elements to include in a bundle. */
+  private final long maxBundleSize;
+  /** Max duration of a bundle. */
+  private final long maxBundleTimeMills;
+
+  private final DoFnSchemaInformation doFnSchemaInformation;
+
+  private final Map<String, PCollectionView<?>> sideInputMapping;
+
+  /** If true, we must process elements only after a checkpoint is finished. */
+  final boolean requiresStableInput;
+
+  /**
+   * If both requiresStableInput and this parameter are true, we must flush 
the buffer during drain
+   * operation.
+   */
+  final boolean enableStableInputDrain;
+
+  final int numConcurrentCheckpoints;
+
+  private final boolean usesOnWindowExpiration;
+
+  private final boolean finishBundleBeforeCheckpointing;
+
+  /** Stores new finalizations being gathered. */
+  private transient InMemoryBundleFinalizer bundleFinalizer;
+  /** Pending bundle finalizations which have not been acknowledged yet. */
+  private transient LinkedHashMap<Long, 
List<InMemoryBundleFinalizer.Finalization>>
+      pendingFinalizations;
+  /**
+   * Keep a maximum of 32 bundle finalizations for {@link
+   * BundleFinalizer.Callback#onBundleSuccess()}.
+   */
+  private static final int MAX_NUMBER_PENDING_BUNDLE_FINALIZATIONS = 32;
+
+  protected transient InternalTimerService<TimerData> timerService;
+  // Flink 1.20 moved timeServiceManager to protected scope. No longer need 
delegate
+  // private transient InternalTimeServiceManager<?> timeServiceManager;
+
+  private transient PushedBackElementsHandler<WindowedValue<InputT>> 
pushedBackElementsHandler;
+
+  /** Metrics container for reporting Beam metrics to Flink (null if metrics 
are disabled). */
+  transient @Nullable FlinkMetricContainer flinkMetricContainer;
+
+  /** Helper class to report the checkpoint duration. */
+  private transient @Nullable CheckpointStats checkpointStats;
+
+  /** A timer that finishes the current bundle after a fixed amount of time. */
+  private transient ScheduledFuture<?> checkFinishBundleTimer;
+
+  /**
+   * This and the below fields need to be volatile because we use multiple 
threads to access these.
+   * (a) the main processing thread (b) a timer thread to finish bundles by a 
timeout instead of the
+   * number of element However, we do not need a lock because Flink makes sure 
to acquire the
+   * "checkpointing" lock for the main processing but also for timer set via 
its {@code
+   * timerService}.
+   *
+   * <p>The volatile flag can be removed once 
https://issues.apache.org/jira/browse/FLINK-12481 has
+   * been addressed.
+   */
+  private transient volatile boolean bundleStarted;
+  /** Number of processed elements in the current bundle. */
+  private transient volatile long elementCount;
+  /** Time that the last bundle was finished (to set the timer). */
+  private transient volatile long lastFinishBundleTime;
+  /** Callback to be executed before the current bundle is started. */
+  private transient volatile Runnable preBundleCallback;
+  /** Callback to be executed after the current bundle was finished. */
+  private transient volatile Runnable bundleFinishedCallback;
+
+  // Watermark state.
+  // Volatile because these can be set in two mutually exclusive threads (see 
above).
+  private transient volatile long currentInputWatermark;
+  private transient volatile long currentSideInputWatermark;
+  private transient volatile long currentOutputWatermark;
+  private transient volatile long pushedBackWatermark;
+
+  /** Constructor for DoFnOperator. */
+  public DoFnOperator(
+      @Nullable DoFn<InputT, OutputT> doFn,
+      String stepName,
+      Coder<WindowedValue<InputT>> inputWindowedCoder,
+      Map<TupleTag<?>, Coder<?>> outputCoders,
+      TupleTag<OutputT> mainOutputTag,
+      List<TupleTag<?>> additionalOutputTags,
+      OutputManagerFactory<OutputT> outputManagerFactory,
+      WindowingStrategy<?, ?> windowingStrategy,
+      Map<Integer, PCollectionView<?>> sideInputTagMapping,
+      Collection<PCollectionView<?>> sideInputs,
+      PipelineOptions options,
+      @Nullable Coder<?> keyCoder,
+      @Nullable KeySelector<WindowedValue<InputT>, ?> keySelector,
+      DoFnSchemaInformation doFnSchemaInformation,
+      Map<String, PCollectionView<?>> sideInputMapping) {
+    this.doFn = doFn;
+    this.stepName = stepName;
+    this.windowedInputCoder = inputWindowedCoder;
+    this.outputCoders = outputCoders;
+    this.mainOutputTag = mainOutputTag;
+    this.additionalOutputTags = additionalOutputTags;
+    this.sideInputTagMapping = sideInputTagMapping;
+    this.sideInputs = sideInputs;
+    this.serializedOptions = new SerializablePipelineOptions(options);
+    this.isStreaming = 
serializedOptions.get().as(FlinkPipelineOptions.class).isStreaming();
+    this.windowingStrategy = windowingStrategy;
+    this.outputManagerFactory = outputManagerFactory;
+
+    // API removed in Flink 2.0. setChainingStrategy is now set internally.
+    // setChainingStrategy(ChainingStrategy.ALWAYS);
+
+    this.keyCoder = keyCoder;
+    this.keySelector = keySelector;
+
+    this.timerCoder =
+        
TimerInternals.TimerDataCoderV2.of(windowingStrategy.getWindowFn().windowCoder());
+
+    FlinkPipelineOptions flinkOptions = options.as(FlinkPipelineOptions.class);
+
+    this.maxBundleSize = flinkOptions.getMaxBundleSize();
+    Preconditions.checkArgument(maxBundleSize > 0, "Bundle size must be at 
least 1");
+    this.maxBundleTimeMills = flinkOptions.getMaxBundleTimeMills();
+    Preconditions.checkArgument(maxBundleTimeMills > 0, "Bundle time must be 
at least 1");
+    this.doFnSchemaInformation = doFnSchemaInformation;
+    this.sideInputMapping = sideInputMapping;
+
+    this.requiresStableInput = isRequiresStableInput(doFn);
+
+    this.usesOnWindowExpiration =
+        doFn != null && 
DoFnSignatures.getSignature(doFn.getClass()).onWindowExpiration() != null;
+
+    if (requiresStableInput) {
+      Preconditions.checkState(
+          CheckpointingMode.valueOf(flinkOptions.getCheckpointingMode())
+              == CheckpointingMode.EXACTLY_ONCE,
+          "Checkpointing mode is not set to exactly once but 
@RequiresStableInput is used.");
+      Preconditions.checkState(
+          flinkOptions.getCheckpointingInterval() > 0,
+          "No checkpointing configured but pipeline uses 
@RequiresStableInput");
+      LOG.warn(
+          "Enabling stable input for transform {}. Will only process elements 
at most every {} milliseconds.",
+          stepName,
+          flinkOptions.getCheckpointingInterval()
+              + Math.max(0, flinkOptions.getMinPauseBetweenCheckpoints()));
+    }
+
+    this.enableStableInputDrain = flinkOptions.getEnableStableInputDrain();
+
+    this.numConcurrentCheckpoints = flinkOptions.getNumConcurrentCheckpoints();
+
+    this.finishBundleBeforeCheckpointing = 
flinkOptions.getFinishBundleBeforeCheckpointing();
+  }
+
+  private boolean isRequiresStableInput(DoFn<InputT, OutputT> doFn) {
+    // WindowDoFnOperator does not use a DoFn
+    return doFn != null
+        && 
DoFnSignatures.getSignature(doFn.getClass()).processElement().requiresStableInput();
+  }
+
+  @VisibleForTesting
+  boolean getRequiresStableInput() {
+    return requiresStableInput;
+  }
+
+  // allow overriding this in WindowDoFnOperator because this one dynamically 
creates
+  // the DoFn
+  protected DoFn<InputT, OutputT> getDoFn() {
+    return doFn;
+  }
+
+  protected Iterable<WindowedValue<InputT>> 
preProcess(WindowedValue<PreInputT> input) {
+    // Assume Input is PreInputT
+    return Collections.singletonList((WindowedValue<InputT>) input);
+  }
+
+  // allow overriding this, for example SplittableDoFnOperator will not create 
a
+  // stateful DoFn runner because ProcessFn, which is used for executing a 
Splittable DoFn
+  // doesn't play by the normal DoFn rules and WindowDoFnOperator uses 
LateDataDroppingDoFnRunner
+  protected DoFnRunner<InputT, OutputT> createWrappingDoFnRunner(
+      DoFnRunner<InputT, OutputT> wrappedRunner, StepContext stepContext) {
+
+    if (keyCoder != null) {
+      StatefulDoFnRunner.CleanupTimer<InputT> cleanupTimer =
+          new StatefulDoFnRunner.TimeInternalsCleanupTimer<InputT>(
+              timerInternals, windowingStrategy) {
+            @Override
+            public void setForWindow(InputT input, BoundedWindow window) {
+              if (!window.equals(GlobalWindow.INSTANCE) || 
usesOnWindowExpiration) {
+                // Skip setting a cleanup timer for the global window as these 
timers
+                // lead to potentially unbounded state growth in the runner, 
depending on key
+                // cardinality. Cleanup for global window will be performed 
upon arrival of the
+                // final watermark.
+                // In the case of OnWindowExpiration, we still set the timer.
+                super.setForWindow(input, window);
+              }
+            }
+          };
+
+      // we don't know the window type
+      //      @SuppressWarnings({"unchecked", "rawtypes"})
+      Coder windowCoder = windowingStrategy.getWindowFn().windowCoder();
+
+      @SuppressWarnings({"unchecked"})
+      StatefulDoFnRunner.StateCleaner<?> stateCleaner =
+          new StatefulDoFnRunner.StateInternalsStateCleaner<>(
+              doFn, keyedStateInternals, windowCoder);
+
+      return DoFnRunners.defaultStatefulDoFnRunner(
+          doFn,
+          getInputCoder(),
+          wrappedRunner,
+          stepContext,
+          windowingStrategy,
+          cleanupTimer,
+          stateCleaner,
+          true /* requiresTimeSortedInput is supported */);
+
+    } else {
+      return doFnRunner;
+    }
+  }
+
+  @Override
+  public void setup(
+      StreamTask<?, ?> containingTask,
+      StreamConfig config,
+      Output<StreamRecord<WindowedValue<OutputT>>> output) {
+
+    // make sure that FileSystems is initialized correctly
+    FileSystems.setDefaultPipelineOptions(serializedOptions.get());
+
+    super.setup(containingTask, config, output);
+  }
+
+  protected boolean shoudBundleElements() {
+    return isStreaming;
+  }
+
+  @Override
+  public void initializeState(StateInitializationContext context) throws 
Exception {
+    super.initializeState(context);
+
+    ListStateDescriptor<WindowedValue<InputT>> pushedBackStateDescriptor =
+        new ListStateDescriptor<>(
+            "pushed-back-elements",
+            new CoderTypeSerializer<>(windowedInputCoder, serializedOptions));
+
+    if (keySelector != null) {
+      pushedBackElementsHandler =
+          KeyedPushedBackElementsHandler.create(
+              keySelector, getKeyedStateBackend(), pushedBackStateDescriptor);
+    } else {
+      ListState<WindowedValue<InputT>> listState =
+          getOperatorStateBackend().getListState(pushedBackStateDescriptor);
+      pushedBackElementsHandler = 
NonKeyedPushedBackElementsHandler.create(listState);
+    }
+
+    currentInputWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis();
+    currentSideInputWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis();
+    currentOutputWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis();
+
+    sideInputReader = NullSideInputReader.of(sideInputs);
+
+    if (!sideInputs.isEmpty()) {
+
+      FlinkBroadcastStateInternals sideInputStateInternals =
+          new FlinkBroadcastStateInternals<>(
+              getContainingTask().getIndexInSubtaskGroup(),
+              getOperatorStateBackend(),
+              serializedOptions);
+
+      sideInputHandler = new SideInputHandler(sideInputs, 
sideInputStateInternals);
+      sideInputReader = sideInputHandler;
+
+      Stream<WindowedValue<InputT>> pushedBack = 
pushedBackElementsHandler.getElements();
+      long min =
+          pushedBack.map(v -> 
v.getTimestamp().getMillis()).reduce(Long.MAX_VALUE, Math::min);
+      pushedBackWatermark = min;
+    } else {
+      pushedBackWatermark = Long.MAX_VALUE;
+    }
+
+    // StatefulPardo or WindowDoFn
+    if (keyCoder != null) {
+      keyedStateInternals =
+          new FlinkStateInternals<>(
+              (KeyedStateBackend) getKeyedStateBackend(),
+              keyCoder,
+              windowingStrategy.getWindowFn().windowCoder(),
+              serializedOptions);
+
+      if (timerService == null) {
+        timerService =
+            getInternalTimerService(
+                "beam-timer", new CoderTypeSerializer<>(timerCoder, 
serializedOptions), this);
+      }
+
+      timerInternals = new FlinkTimerInternals(timerService);
+      Preconditions.checkNotNull(getTimeServiceManager(), "Time service 
manager is not set.");
+    }
+
+    outputManager =
+        outputManagerFactory.create(
+            output, getLockToAcquireForStateAccessDuringBundles(), 
getOperatorStateBackend());
+  }
+
+  /**
+   * Subclasses may provide a lock to ensure that the state backend is not 
accessed concurrently
+   * during bundle execution.
+   */
+  protected Lock getLockToAcquireForStateAccessDuringBundles() {
+    return NoopLock.get();
+  }
+
+  @Override
+  public void open() throws Exception {
+    // WindowDoFnOperator need use state and timer to get DoFn.
+    // So must wait StateInternals and TimerInternals ready.
+    // This will be called after initializeState()
+    this.doFn = getDoFn();
+
+    FlinkPipelineOptions options = 
serializedOptions.get().as(FlinkPipelineOptions.class);
+    doFnInvoker = DoFnInvokers.tryInvokeSetupFor(doFn, options);
+
+    StepContext stepContext = new FlinkStepContext();
+    doFnRunner =
+        DoFnRunners.simpleRunner(
+            options,
+            doFn,
+            sideInputReader,
+            outputManager,
+            mainOutputTag,
+            additionalOutputTags,
+            stepContext,
+            getInputCoder(),
+            outputCoders,
+            windowingStrategy,
+            doFnSchemaInformation,
+            sideInputMapping);
+
+    doFnRunner =
+        createBufferingDoFnRunnerIfNeeded(createWrappingDoFnRunner(doFnRunner, 
stepContext));
+    earlyBindStateIfNeeded();
+
+    if (!options.getDisableMetrics()) {
+      flinkMetricContainer = new FlinkMetricContainer(getRuntimeContext());
+      doFnRunner = new DoFnRunnerWithMetricsUpdate<>(stepName, doFnRunner, 
flinkMetricContainer);
+      String checkpointMetricNamespace = options.getReportCheckpointDuration();
+      if (checkpointMetricNamespace != null) {
+        MetricName checkpointMetric =
+            MetricName.named(checkpointMetricNamespace, "checkpoint_duration");
+        checkpointStats =
+            new CheckpointStats(
+                () ->
+                    flinkMetricContainer
+                        .getMetricsContainer(stepName)
+                        .getDistribution(checkpointMetric));
+      }
+    }
+
+    elementCount = 0L;
+    lastFinishBundleTime = 
getProcessingTimeService().getCurrentProcessingTime();
+
+    // Schedule timer to check timeout of finish bundle.
+    long bundleCheckPeriod = Math.max(maxBundleTimeMills / 2, 1);
+    checkFinishBundleTimer =
+        getProcessingTimeService()
+            .scheduleAtFixedRate(
+                timestamp -> checkInvokeFinishBundleByTime(), 
bundleCheckPeriod, bundleCheckPeriod);
+
+    if (doFn instanceof SplittableParDoViaKeyedWorkItems.ProcessFn) {
+      pushbackDoFnRunner =
+          new ProcessFnRunner<>((DoFnRunner) doFnRunner, sideInputs, 
sideInputHandler);
+    } else {
+      pushbackDoFnRunner =
+          SimplePushbackSideInputDoFnRunner.create(doFnRunner, sideInputs, 
sideInputHandler);
+    }
+
+    bundleFinalizer = new InMemoryBundleFinalizer();
+    pendingFinalizations = new LinkedHashMap<>();
+  }
+
+  DoFnRunner<InputT, OutputT> createBufferingDoFnRunnerIfNeeded(
+      DoFnRunner<InputT, OutputT> wrappedRunner) throws Exception {
+
+    if (requiresStableInput) {
+      // put this in front of the root FnRunner before any additional wrappers
+      return this.bufferingDoFnRunner =
+          BufferingDoFnRunner.create(
+              wrappedRunner,
+              "stable-input-buffer",
+              windowedInputCoder,
+              windowingStrategy.getWindowFn().windowCoder(),
+              getOperatorStateBackend(),
+              getBufferingKeyedStateBackend(),
+              numConcurrentCheckpoints,
+              serializedOptions);
+    }
+    return wrappedRunner;
+  }
+
+  /**
+   * Retrieve a keyed state backend that should be used to buffer elements for
+   * {@code @RequiresStableInput} functionality. By default this is the 
default keyed backend, but
+   * can be override in {@link ExecutableStageDoFnOperator}.
+   *
+   * @return the keyed backend to use for element buffering
+   */
+  <K> @Nullable KeyedStateBackend<K> getBufferingKeyedStateBackend() {
+    return getKeyedStateBackend();
+  }
+
+  private void earlyBindStateIfNeeded() throws IllegalArgumentException, 
IllegalAccessException {
+    if (keyCoder != null) {
+      if (doFn != null) {
+        DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
+        FlinkStateInternals.EarlyBinder earlyBinder =
+            new FlinkStateInternals.EarlyBinder(
+                getKeyedStateBackend(),
+                serializedOptions,
+                windowingStrategy.getWindowFn().windowCoder());
+        for (DoFnSignature.StateDeclaration value : 
signature.stateDeclarations().values()) {
+          StateSpec<?> spec =
+              (StateSpec<?>) 
signature.stateDeclarations().get(value.id()).field().get(doFn);
+          spec.bind(value.id(), earlyBinder);
+        }
+        if (doFnRunner instanceof StatefulDoFnRunner) {
+          ((StatefulDoFnRunner<InputT, OutputT, BoundedWindow>) doFnRunner)
+              .getSystemStateTags()
+              .forEach(tag -> tag.getSpec().bind(tag.getId(), earlyBinder));
+        }
+      }
+    }
+  }
+
+  void cleanUp() throws Exception {
+    Optional.ofNullable(flinkMetricContainer)
+        .ifPresent(FlinkMetricContainer::registerMetricsForPipelineResult);
+    Optional.ofNullable(checkFinishBundleTimer).ifPresent(timer -> 
timer.cancel(true));
+    Workarounds.deleteStaticCaches();
+    Optional.ofNullable(doFnInvoker).ifPresent(DoFnInvoker::invokeTeardown);
+  }
+
+  void flushData() throws Exception {
+    // This is our last change to block shutdown of this operator while
+    // there are still remaining processing-time timers. Flink will ignore 
pending
+    // processing-time timers when upstream operators have shut down and will 
also
+    // shut down this operator with pending processing-time timers.
+    if (numProcessingTimeTimers() > 0) {
+      timerInternals.processPendingProcessingTimeTimers();
+    }
+    if (numProcessingTimeTimers() > 0) {
+      throw new RuntimeException(
+          "There are still "
+              + numProcessingTimeTimers()
+              + " processing-time timers left, this indicates a bug");
+    }
+    // make sure we send a +Inf watermark downstream. It can happen that we 
receive +Inf
+    // in processWatermark*() but have holds, so we have to re-evaluate here.
+    processWatermark(new Watermark(Long.MAX_VALUE));
+    // Make sure to finish the current bundle
+    while (bundleStarted) {
+      invokeFinishBundle();
+    }
+    if (requiresStableInput && enableStableInputDrain) {
+      // Flush any buffered events here before draining the pipeline. Note 
that this is best-effort
+      // and requiresStableInput contract might be violated in cases where 
buffer processing fails.
+      bufferingDoFnRunner.checkpointCompleted(Long.MAX_VALUE);
+      updateOutputWatermark();
+    }
+    if (currentOutputWatermark < Long.MAX_VALUE) {
+      throw new RuntimeException(
+          String.format(
+              "There are still watermark holds left when terminating operator 
%s Watermark held %d",
+              getOperatorName(), currentOutputWatermark));
+    }
+
+    // sanity check: these should have been flushed out by +Inf watermarks
+    if (!sideInputs.isEmpty()) {
+
+      List<WindowedValue<InputT>> pushedBackElements =
+          pushedBackElementsHandler.getElements().collect(Collectors.toList());
+
+      if (pushedBackElements.size() > 0) {
+        String pushedBackString = Joiner.on(",").join(pushedBackElements);
+        throw new RuntimeException(
+            "Leftover pushed-back data: " + pushedBackString + ". This 
indicates a bug.");
+      }
+    }
+  }
+
+  @Override
+  public void finish() throws Exception {
+    try {
+      flushData();
+    } finally {
+      super.finish();
+    }
+  }
+
+  @Override
+  public void close() throws Exception {
+    try {
+      cleanUp();
+    } finally {
+      super.close();
+    }
+  }
+
+  protected int numProcessingTimeTimers() {
+    return getTimeServiceManager()
+        .map(
+            manager -> {
+              if (timeServiceManager instanceof 
InternalTimeServiceManagerImpl) {
+                final InternalTimeServiceManagerImpl<?> cast =
+                    (InternalTimeServiceManagerImpl<?>) timeServiceManager;
+                return cast.numProcessingTimeTimers();
+              } else if (timeServiceManager instanceof 
BatchExecutionInternalTimeServiceManager) {
+                return 0;
+              } else {
+                throw new IllegalStateException(
+                    String.format(
+                        "Unknown implementation of 
InternalTimerServiceManager. %s",
+                        timeServiceManager));
+              }
+            })
+        .orElse(0);

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   Inside the `.map(...)` lambda, the code directly references the class field 
`timeServiceManager` instead of using the lambda parameter `manager`. Since 
`getTimeServiceManager()` returns an `Optional` containing the active manager, 
you should use the `manager` parameter to avoid potential issues if the field 
is null or shadowed.
   
   ```suggestion
       return getTimeServiceManager()
           .map(
               manager -> {
                 if (manager instanceof InternalTimeServiceManagerImpl) {
                   final InternalTimeServiceManagerImpl<?> cast =
                       (InternalTimeServiceManagerImpl<?>) manager;
                   return cast.numProcessingTimeTimers();
                 } else if (manager instanceof 
BatchExecutionInternalTimeServiceManager) {
                   return 0;
                 } else {
                   throw new IllegalStateException(
                       String.format(
                           "Unknown implementation of 
InternalTimerServiceManager. %s",
                           manager));
                 }
               })
           .orElse(0);
   ```



##########
runners/flink/2.2/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java:
##########
@@ -0,0 +1,507 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import static 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getDefaultLocalParallelism;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Streams;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.serialization.SerializerConfigImpl;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.configuration.ExternalizedCheckpointRetention;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.RestartStrategyOptions;
+import org.apache.flink.configuration.StateBackendOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.execution.CheckpointingMode;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
+import org.apache.flink.streaming.api.environment.RemoteStreamEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Utilities for Flink execution environments. */
+@SuppressWarnings({
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+public class FlinkExecutionEnvironments {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(FlinkExecutionEnvironments.class);
+
+  private static final ObjectMapper mapper = new ObjectMapper();
+
+  /**
+   * If the submitted job is a batch processing job, this method creates the 
adequate Flink {@link
+   * org.apache.flink.streaming.api.environment.StreamExecutionEnvironment} 
depending on the
+   * user-specified options.
+   */
+  public static StreamExecutionEnvironment createBatchExecutionEnvironment(
+      FlinkPipelineOptions options) {
+    return createBatchExecutionEnvironment(
+        options,
+        MoreObjects.firstNonNull(options.getFilesToStage(), 
Collections.emptyList()),
+        options.getFlinkConfDir());
+  }
+
+  static StreamExecutionEnvironment createBatchExecutionEnvironment(
+      FlinkPipelineOptions options, List<String> filesToStage, @Nullable 
String confDir) {
+
+    LOG.info("Creating a Batch Execution Environment.");
+
+    // Although Flink uses Rest, it expects the address not to contain a http 
scheme
+    String flinkMasterHostPort = stripHttpSchema(options.getFlinkMaster());
+    Configuration flinkConfiguration = getFlinkConfiguration(confDir);
+    StreamExecutionEnvironment flinkBatchEnv;
+
+    // depending on the master, create the right environment.
+    if ("[local]".equals(flinkMasterHostPort)) {
+      setManagedMemoryByFraction(flinkConfiguration);
+      disableClassLoaderLeakCheck(flinkConfiguration);
+      flinkBatchEnv = 
StreamExecutionEnvironment.createLocalEnvironment(flinkConfiguration);
+      if (!options.getAttachedMode()) {
+        LOG.warn("Detached mode is only supported in RemoteStreamEnvironment");
+      }
+    } else if ("[collection]".equals(flinkMasterHostPort)) {
+      throw new UnsupportedOperationException(
+          "CollectionEnvironment has been removed in Flink 2. Use [local] 
instead.");
+    } else if ("[auto]".equals(flinkMasterHostPort)) {
+      flinkBatchEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+      if (flinkBatchEnv instanceof LocalStreamEnvironment) {
+        disableClassLoaderLeakCheck(flinkConfiguration);
+        flinkBatchEnv = 
StreamExecutionEnvironment.createLocalEnvironment(flinkConfiguration);
+        flinkBatchEnv.setParallelism(getDefaultLocalParallelism());
+      }
+      if (!options.getAttachedMode()) {
+        LOG.warn("Detached mode is not supported in [auto].");
+      }
+    } else {
+      int defaultPort = flinkConfiguration.get(RestOptions.PORT);
+      HostAndPort hostAndPort =
+          
HostAndPort.fromString(flinkMasterHostPort).withDefaultPort(defaultPort);
+      flinkConfiguration.set(RestOptions.PORT, hostAndPort.getPort());
+      if (!options.getAttachedMode()) {
+        flinkConfiguration.set(DeploymentOptions.ATTACHED, 
options.getAttachedMode());
+      }
+      flinkBatchEnv =
+          StreamExecutionEnvironment.createRemoteEnvironment(
+              hostAndPort.getHost(),
+              hostAndPort.getPort(),
+              flinkConfiguration,
+              filesToStage.toArray(new String[filesToStage.size()]));
+      LOG.info("Using Flink Master URL {}:{}.", hostAndPort.getHost(), 
hostAndPort.getPort());
+    }
+
+    // Set the execution mode for data exchange.
+    flinkBatchEnv.setRuntimeMode(RuntimeExecutionMode.BATCH);
+
+    // set the correct parallelism.
+    if (options.getParallelism() != -1) {
+      flinkBatchEnv.setParallelism(options.getParallelism());
+    }
+
+    // Set the correct parallelism, required by UnboundedSourceWrapper to 
generate consistent
+    // splits.
+    final int parallelism =
+        determineParallelism(
+            options.getParallelism(), flinkBatchEnv.getParallelism(), 
flinkConfiguration);
+
+    flinkBatchEnv.setParallelism(parallelism);
+    // set parallelism in the options (required by some execution code)
+    options.setParallelism(parallelism);
+
+    if (options.getObjectReuse()) {
+      flinkBatchEnv.getConfig().enableObjectReuse();
+    } else {
+      flinkBatchEnv.getConfig().disableObjectReuse();
+    }
+
+    applyLatencyTrackingInterval(flinkBatchEnv.getConfig(), options);
+
+    configureWebUIOptions(flinkBatchEnv.getConfig(), 
options.as(PipelineOptions.class));
+
+    return flinkBatchEnv;
+  }
+
+  @VisibleForTesting
+  static StreamExecutionEnvironment 
createStreamExecutionEnvironment(FlinkPipelineOptions options) {
+    return createStreamExecutionEnvironment(
+        options,
+        MoreObjects.firstNonNull(options.getFilesToStage(), 
Collections.emptyList()),
+        options.getFlinkConfDir());
+  }
+
+  /**
+   * If the submitted job is a stream processing job, this method creates the 
adequate Flink {@link
+   * org.apache.flink.streaming.api.environment.StreamExecutionEnvironment} 
depending on the
+   * user-specified options.
+   */
+  public static StreamExecutionEnvironment createStreamExecutionEnvironment(
+      FlinkPipelineOptions options, List<String> filesToStage, @Nullable 
String confDir) {
+
+    LOG.info("Creating a Streaming Environment.");
+
+    // Although Flink uses Rest, it expects the address not to contain a http 
scheme
+    String masterUrl = stripHttpSchema(options.getFlinkMaster());
+    Configuration flinkConfiguration = getFlinkConfiguration(confDir);
+    configureRestartStrategy(options, flinkConfiguration);
+    configureStateBackend(options, flinkConfiguration);
+    StreamExecutionEnvironment flinkStreamEnv;
+
+    // depending on the master, create the right environment.
+    if ("[local]".equals(masterUrl)) {
+      setManagedMemoryByFraction(flinkConfiguration);
+      disableClassLoaderLeakCheck(flinkConfiguration);
+      flinkStreamEnv =
+          StreamExecutionEnvironment.createLocalEnvironment(
+              getDefaultLocalParallelism(), flinkConfiguration);
+      if (!options.getAttachedMode()) {
+        LOG.warn("Detached mode is only supported in RemoteStreamEnvironment");
+      }
+    } else if ("[auto]".equals(masterUrl)) {
+
+      flinkStreamEnv = 
StreamExecutionEnvironment.getExecutionEnvironment(flinkConfiguration);
+      if (flinkStreamEnv instanceof LocalStreamEnvironment) {
+        disableClassLoaderLeakCheck(flinkConfiguration);
+        flinkStreamEnv =
+            StreamExecutionEnvironment.createLocalEnvironment(
+                getDefaultLocalParallelism(), flinkConfiguration);
+      }
+      if (!options.getAttachedMode()) {
+        LOG.warn("Detached mode is not only supported in [auto]");
+      }
+    } else {
+      int defaultPort = flinkConfiguration.get(RestOptions.PORT);
+      HostAndPort hostAndPort = 
HostAndPort.fromString(masterUrl).withDefaultPort(defaultPort);
+      flinkConfiguration.set(RestOptions.PORT, hostAndPort.getPort());
+      final SavepointRestoreSettings savepointRestoreSettings;
+      if (options.getSavepointPath() != null) {
+        savepointRestoreSettings =
+            SavepointRestoreSettings.forPath(
+                options.getSavepointPath(), 
options.getAllowNonRestoredState());
+      } else {
+        savepointRestoreSettings = SavepointRestoreSettings.none();
+      }
+      if (!options.getAttachedMode()) {
+        flinkConfiguration.set(DeploymentOptions.ATTACHED, 
options.getAttachedMode());
+      }
+      flinkStreamEnv =
+          new RemoteStreamEnvironment(
+              hostAndPort.getHost(),
+              hostAndPort.getPort(),
+              flinkConfiguration,
+              filesToStage.toArray(new String[filesToStage.size()]),
+              null,
+              savepointRestoreSettings);
+      LOG.info("Using Flink Master URL {}:{}.", hostAndPort.getHost(), 
hostAndPort.getPort());
+    }
+
+    // Set the parallelism, required by UnboundedSourceWrapper to generate 
consistent splits.
+    final int parallelism =
+        determineParallelism(
+            options.getParallelism(), flinkStreamEnv.getParallelism(), 
flinkConfiguration);
+    flinkStreamEnv.setParallelism(parallelism);
+    if (options.getMaxParallelism() > 0) {
+      flinkStreamEnv.setMaxParallelism(options.getMaxParallelism());
+    } else if (!options.isStreaming()) {
+      // In Flink maxParallelism defines the number of keyGroups.
+      // (see
+      // 
https://github.com/apache/flink/blob/e9dd4683f758b463d0b5ee18e49cecef6a70c5cf/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java#L76)
+      // The default value (parallelism * 1.5)
+      // (see
+      // 
https://github.com/apache/flink/blob/e9dd4683f758b463d0b5ee18e49cecef6a70c5cf/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java#L137-L147)
+      // create a lot of skew so we force maxParallelism = parallelism in 
Batch mode.
+      LOG.info("Setting maxParallelism to {}", parallelism);
+      flinkStreamEnv.setMaxParallelism(parallelism);
+    }
+    // set parallelism in the options (required by some execution code)
+    options.setParallelism(parallelism);
+
+    if (options.getObjectReuse()) {
+      flinkStreamEnv.getConfig().enableObjectReuse();
+    } else {
+      flinkStreamEnv.getConfig().disableObjectReuse();
+    }
+
+    if (!options.getOperatorChaining()) {
+      flinkStreamEnv.disableOperatorChaining();
+    }
+
+    configureCheckpointing(options, flinkStreamEnv);
+
+    applyLatencyTrackingInterval(flinkStreamEnv.getConfig(), options);
+
+    if (options.getAutoWatermarkInterval() != null) {
+      
flinkStreamEnv.getConfig().setAutoWatermarkInterval(options.getAutoWatermarkInterval());
+    }
+    configureWebUIOptions(flinkStreamEnv.getConfig(), 
options.as(PipelineOptions.class));
+    configureCustomKryoSerializers(flinkStreamEnv.getConfig());
+
+    return flinkStreamEnv;
+  }
+
+  private static void configureWebUIOptions(
+      ExecutionConfig config, org.apache.beam.sdk.options.PipelineOptions 
options) {
+    SerializablePipelineOptions serializablePipelineOptions =
+        new SerializablePipelineOptions(options);
+    String optionsAsString = serializablePipelineOptions.toString();
+
+    try {
+      JsonNode node = mapper.readTree(optionsAsString);
+      JsonNode optionsNode = node.get("options");
+      Map<String, String> output =
+          Streams.stream(optionsNode.fields())
+              .filter(entry -> !entry.getValue().isNull())
+              .collect(Collectors.toMap(e -> e.getKey(), e -> 
e.getValue().asText()));
+
+      config.setGlobalJobParameters(new GlobalJobParametersImpl(output));

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   If `optionsNode` is null (for example, if the serialized options JSON does 
not contain an `"options"` field), calling `optionsNode.fields()` will throw a 
`NullPointerException`. Although this is caught by the `catch (Exception e)` 
block, it is better to avoid throwing preventable exceptions by adding a null 
check.
   
   ```java
         JsonNode node = mapper.readTree(optionsAsString);
         JsonNode optionsNode = node.get("options");
         if (optionsNode != null) {
           Map<String, String> output =
               Streams.stream(optionsNode.fields())
                   .filter(entry -> !entry.getValue().isNull())
                   .collect(Collectors.toMap(e -> e.getKey(), e -> 
e.getValue().asText()));
   
           config.setGlobalJobParameters(new GlobalJobParametersImpl(output));
         }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to