mananmangal commented on code in PR #38980:
URL: https://github.com/apache/beam/pull/38980#discussion_r3423360711


##########
runners/flink/2.2/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java:
##########
@@ -0,0 +1,507 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import static 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getDefaultLocalParallelism;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Streams;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.serialization.SerializerConfigImpl;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.configuration.ExternalizedCheckpointRetention;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.RestartStrategyOptions;
+import org.apache.flink.configuration.StateBackendOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.execution.CheckpointingMode;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
+import org.apache.flink.streaming.api.environment.RemoteStreamEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Utilities for Flink execution environments. */
+@SuppressWarnings({
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+public class FlinkExecutionEnvironments {
+
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkExecutionEnvironments.class);
+
+  /** Jackson mapper used to flatten the serialized pipeline options into Flink job parameters. */
+  private static final ObjectMapper mapper = new ObjectMapper();
+
+  /**
+   * If the submitted job is a batch processing job, this method creates the 
adequate Flink {@link
+   * org.apache.flink.streaming.api.environment.StreamExecutionEnvironment} 
depending on the
+   * user-specified options.
+   */
+  public static StreamExecutionEnvironment createBatchExecutionEnvironment(
+      FlinkPipelineOptions options) {
+    return createBatchExecutionEnvironment(
+        options,
+        MoreObjects.firstNonNull(options.getFilesToStage(), 
Collections.emptyList()),
+        options.getFlinkConfDir());
+  }
+
+  /**
+   * Creates a {@link StreamExecutionEnvironment} running in {@link RuntimeExecutionMode#BATCH}.
+   *
+   * <p>The master address selects the environment:
+   *
+   * <ul>
+   *   <li>{@code [local]} starts an in-process environment.
+   *   <li>{@code [auto]} uses the environment Flink provides for the current context; a local
+   *       one is replaced with a local environment built from the loaded configuration.
+   *   <li>{@code [collection]} is rejected.
+   *   <li>Any other value is treated as {@code host[:port]} of a remote cluster.
+   * </ul>
+   *
+   * @param options the pipeline options; its parallelism is updated to the value actually used
+   * @param filesToStage files handed to the remote environment for staging
+   * @param confDir optional directory holding the Flink configuration
+   * @return the configured batch environment
+   * @throws UnsupportedOperationException if the master is {@code [collection]}
+   */
+  static StreamExecutionEnvironment createBatchExecutionEnvironment(
+      FlinkPipelineOptions options, List<String> filesToStage, @Nullable String confDir) {
+
+    LOG.info("Creating a Batch Execution Environment.");
+
+    // Flink talks to the master over REST, yet wants the address without an http scheme.
+    final String masterAddress = stripHttpSchema(options.getFlinkMaster());
+    final Configuration config = getFlinkConfiguration(confDir);
+    final boolean detached = !options.getAttachedMode();
+    StreamExecutionEnvironment env;
+
+    if ("[local]".equals(masterAddress)) {
+      setManagedMemoryByFraction(config);
+      disableClassLoaderLeakCheck(config);
+      env = StreamExecutionEnvironment.createLocalEnvironment(config);
+      if (detached) {
+        LOG.warn("Detached mode is only supported in RemoteStreamEnvironment");
+      }
+    } else if ("[collection]".equals(masterAddress)) {
+      throw new UnsupportedOperationException(
+          "CollectionEnvironment has been removed in Flink 2. Use [local] instead.");
+    } else if ("[auto]".equals(masterAddress)) {
+      env = StreamExecutionEnvironment.getExecutionEnvironment();
+      if (env instanceof LocalStreamEnvironment) {
+        // Swap the context-provided local environment for one built from our configuration.
+        disableClassLoaderLeakCheck(config);
+        env = StreamExecutionEnvironment.createLocalEnvironment(config);
+        env.setParallelism(getDefaultLocalParallelism());
+      }
+      if (detached) {
+        LOG.warn("Detached mode is not supported in [auto].");
+      }
+    } else {
+      final int fallbackPort = config.get(RestOptions.PORT);
+      final HostAndPort remote =
+          HostAndPort.fromString(masterAddress).withDefaultPort(fallbackPort);
+      config.set(RestOptions.PORT, remote.getPort());
+      if (detached) {
+        // Only override the configured value when detached submission is requested.
+        config.set(DeploymentOptions.ATTACHED, false);
+      }
+      final String[] stagedJars = filesToStage.toArray(new String[0]);
+      env =
+          StreamExecutionEnvironment.createRemoteEnvironment(
+              remote.getHost(), remote.getPort(), config, stagedJars);
+      LOG.info("Using Flink Master URL {}:{}.", remote.getHost(), remote.getPort());
+    }
+
+    // Batch pipelines run on the DataStream API in BATCH runtime mode.
+    env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+
+    final int requestedParallelism = options.getParallelism();
+    if (requestedParallelism != -1) {
+      env.setParallelism(requestedParallelism);
+    }
+
+    // Resolve the effective parallelism; UnboundedSourceWrapper relies on it to produce
+    // consistent splits.
+    final int parallelism =
+        determineParallelism(requestedParallelism, env.getParallelism(), config);
+    env.setParallelism(parallelism);
+    // Some execution code reads the parallelism back from the options, so keep them in sync.
+    options.setParallelism(parallelism);
+
+    final ExecutionConfig executionConfig = env.getConfig();
+    if (options.getObjectReuse()) {
+      executionConfig.enableObjectReuse();
+    } else {
+      executionConfig.disableObjectReuse();
+    }
+    applyLatencyTrackingInterval(executionConfig, options);
+    configureWebUIOptions(executionConfig, options.as(PipelineOptions.class));
+
+    return env;
+  }
+
+  /**
+   * Convenience overload that reads the files to stage (an empty list when unset) and the Flink
+   * configuration directory from {@code options}.
+   *
+   * @param options the pipeline options
+   * @return the configured environment
+   */
+  @VisibleForTesting
+  static StreamExecutionEnvironment createStreamExecutionEnvironment(
+      FlinkPipelineOptions options) {
+    List<String> stagedFiles =
+        MoreObjects.firstNonNull(options.getFilesToStage(), Collections.emptyList());
+    String flinkConfDir = options.getFlinkConfDir();
+    return createStreamExecutionEnvironment(options, stagedFiles, flinkConfDir);
+  }
+
+  /**
+   * Creates a DataStream {@link StreamExecutionEnvironment} for the pipeline, configured from the
+   * user-specified options.
+   *
+   * <p>The restart strategy and state backend are applied to the Flink configuration before the
+   * environment is created. The master address then selects the environment:
+   *
+   * <ul>
+   *   <li>{@code [local]} starts an in-process environment.
+   *   <li>{@code [auto]} uses the environment Flink provides for the current context; a local
+   *       one is replaced with a local environment built from the loaded configuration.
+   *   <li>Any other value is treated as {@code host[:port]} of a remote cluster, optionally
+   *       restoring from {@link FlinkPipelineOptions#getSavepointPath()}.
+   * </ul>
+   *
+   * <p>When no max parallelism is configured and the pipeline is not streaming, max parallelism is
+   * pinned to the parallelism to avoid key-group skew.
+   *
+   * @param options the pipeline options; its parallelism is updated to the value actually used
+   * @param filesToStage files handed to the remote environment for staging
+   * @param confDir optional directory holding the Flink configuration
+   * @return the configured environment
+   */
+  public static StreamExecutionEnvironment createStreamExecutionEnvironment(
+      FlinkPipelineOptions options, List<String> filesToStage, @Nullable String confDir) {
+
+    LOG.info("Creating a Streaming Environment.");
+
+    // Although Flink uses REST, it expects the address not to contain an http scheme.
+    String masterUrl = stripHttpSchema(options.getFlinkMaster());
+    Configuration flinkConfiguration = getFlinkConfiguration(confDir);
+    configureRestartStrategy(options, flinkConfiguration);
+    configureStateBackend(options, flinkConfiguration);
+    StreamExecutionEnvironment flinkStreamEnv;
+
+    // Depending on the master, create the right environment.
+    if ("[local]".equals(masterUrl)) {
+      setManagedMemoryByFraction(flinkConfiguration);
+      disableClassLoaderLeakCheck(flinkConfiguration);
+      flinkStreamEnv =
+          StreamExecutionEnvironment.createLocalEnvironment(
+              getDefaultLocalParallelism(), flinkConfiguration);
+      if (!options.getAttachedMode()) {
+        LOG.warn("Detached mode is only supported in RemoteStreamEnvironment");
+      }
+    } else if ("[auto]".equals(masterUrl)) {
+      flinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment(flinkConfiguration);
+      if (flinkStreamEnv instanceof LocalStreamEnvironment) {
+        // Replace the context-provided local environment with one built from our configuration.
+        disableClassLoaderLeakCheck(flinkConfiguration);
+        flinkStreamEnv =
+            StreamExecutionEnvironment.createLocalEnvironment(
+                getDefaultLocalParallelism(), flinkConfiguration);
+      }
+      if (!options.getAttachedMode()) {
+        LOG.warn("Detached mode is not supported in [auto].");
+      }
+    } else {
+      int defaultPort = flinkConfiguration.get(RestOptions.PORT);
+      HostAndPort hostAndPort = HostAndPort.fromString(masterUrl).withDefaultPort(defaultPort);
+      flinkConfiguration.set(RestOptions.PORT, hostAndPort.getPort());
+      final SavepointRestoreSettings savepointRestoreSettings;
+      if (options.getSavepointPath() != null) {
+        savepointRestoreSettings =
+            SavepointRestoreSettings.forPath(
+                options.getSavepointPath(), options.getAllowNonRestoredState());
+      } else {
+        savepointRestoreSettings = SavepointRestoreSettings.none();
+      }
+      if (!options.getAttachedMode()) {
+        // Only override the configured value when detached submission is requested.
+        flinkConfiguration.set(DeploymentOptions.ATTACHED, false);
+      }
+      flinkStreamEnv =
+          new RemoteStreamEnvironment(
+              hostAndPort.getHost(),
+              hostAndPort.getPort(),
+              flinkConfiguration,
+              filesToStage.toArray(new String[0]),
+              /* globalClasspaths= */ null,
+              savepointRestoreSettings);
+      LOG.info("Using Flink Master URL {}:{}.", hostAndPort.getHost(), hostAndPort.getPort());
+    }
+
+    // Set the parallelism, required by UnboundedSourceWrapper to generate consistent splits.
+    final int parallelism =
+        determineParallelism(
+            options.getParallelism(), flinkStreamEnv.getParallelism(), flinkConfiguration);
+    flinkStreamEnv.setParallelism(parallelism);
+    if (options.getMaxParallelism() > 0) {
+      flinkStreamEnv.setMaxParallelism(options.getMaxParallelism());
+    } else if (!options.isStreaming()) {
+      // In Flink, maxParallelism defines the number of key groups (see
+      // https://github.com/apache/flink/blob/e9dd4683f758b463d0b5ee18e49cecef6a70c5cf/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java#L76).
+      // Its default value (parallelism * 1.5, see
+      // https://github.com/apache/flink/blob/e9dd4683f758b463d0b5ee18e49cecef6a70c5cf/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java#L137-L147)
+      // creates a lot of skew, so in batch mode we force maxParallelism = parallelism.
+      LOG.info("Setting maxParallelism to {}", parallelism);
+      flinkStreamEnv.setMaxParallelism(parallelism);
+    }
+    // Set parallelism in the options (required by some execution code).
+    options.setParallelism(parallelism);
+
+    if (options.getObjectReuse()) {
+      flinkStreamEnv.getConfig().enableObjectReuse();
+    } else {
+      flinkStreamEnv.getConfig().disableObjectReuse();
+    }
+
+    if (!options.getOperatorChaining()) {
+      flinkStreamEnv.disableOperatorChaining();
+    }
+
+    configureCheckpointing(options, flinkStreamEnv);
+
+    applyLatencyTrackingInterval(flinkStreamEnv.getConfig(), options);
+
+    if (options.getAutoWatermarkInterval() != null) {
+      flinkStreamEnv.getConfig().setAutoWatermarkInterval(options.getAutoWatermarkInterval());
+    }
+    configureWebUIOptions(flinkStreamEnv.getConfig(), options.as(PipelineOptions.class));
+    configureCustomKryoSerializers(flinkStreamEnv.getConfig());
+
+    return flinkStreamEnv;
+  }
+
+  private static void configureWebUIOptions(
+      ExecutionConfig config, org.apache.beam.sdk.options.PipelineOptions 
options) {
+    SerializablePipelineOptions serializablePipelineOptions =
+        new SerializablePipelineOptions(options);
+    String optionsAsString = serializablePipelineOptions.toString();
+
+    try {
+      JsonNode node = mapper.readTree(optionsAsString);
+      JsonNode optionsNode = node.get("options");
+      Map<String, String> output =
+          Streams.stream(optionsNode.fields())
+              .filter(entry -> !entry.getValue().isNull())
+              .collect(Collectors.toMap(e -> e.getKey(), e -> 
e.getValue().asText()));
+
+      config.setGlobalJobParameters(new GlobalJobParametersImpl(output));

Review Comment:
   Same — this is a verbatim copy from runners/flink/2.0 with only an import update. The
   potential NPE is already caught by the surrounding `catch (Exception e)`, so I'm not
   addressing it here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to