mananmangal commented on code in PR #38980: URL: https://github.com/apache/beam/pull/38980#discussion_r3423360711
########## runners/flink/2.2/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java: ########## @@ -0,0 +1,507 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink; + +import static org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getDefaultLocalParallelism; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; +import org.apache.beam.runners.core.construction.SerializablePipelineOptions; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Streams; +import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.RuntimeExecutionMode; +import org.apache.flink.api.common.serialization.SerializerConfigImpl; +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.CoreOptions; +import org.apache.flink.configuration.DeploymentOptions; +import org.apache.flink.configuration.ExternalizedCheckpointRetention; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.RestOptions; +import org.apache.flink.configuration.RestartStrategyOptions; +import org.apache.flink.configuration.StateBackendOptions; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.core.execution.CheckpointingMode; +import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; +import org.apache.flink.runtime.util.EnvironmentInformation; +import org.apache.flink.streaming.api.environment.LocalStreamEnvironment; +import org.apache.flink.streaming.api.environment.RemoteStreamEnvironment; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Utilities for Flink execution environments. */ +@SuppressWarnings({ + "nullness" // TODO(https://github.com/apache/beam/issues/20497) +}) +public class FlinkExecutionEnvironments { + + private static final Logger LOG = LoggerFactory.getLogger(FlinkExecutionEnvironments.class); + + private static final ObjectMapper mapper = new ObjectMapper(); + + /** + * If the submitted job is a batch processing job, this method creates the adequate Flink {@link + * org.apache.flink.streaming.api.environment.StreamExecutionEnvironment} depending on the + * user-specified options. 
+ */ + public static StreamExecutionEnvironment createBatchExecutionEnvironment( + FlinkPipelineOptions options) { + return createBatchExecutionEnvironment( + options, + MoreObjects.firstNonNull(options.getFilesToStage(), Collections.emptyList()), + options.getFlinkConfDir()); + } + + static StreamExecutionEnvironment createBatchExecutionEnvironment( + FlinkPipelineOptions options, List<String> filesToStage, @Nullable String confDir) { + + LOG.info("Creating a Batch Execution Environment."); + + // Although Flink uses Rest, it expects the address not to contain a http scheme + String flinkMasterHostPort = stripHttpSchema(options.getFlinkMaster()); + Configuration flinkConfiguration = getFlinkConfiguration(confDir); + StreamExecutionEnvironment flinkBatchEnv; + + // depending on the master, create the right environment. + if ("[local]".equals(flinkMasterHostPort)) { + setManagedMemoryByFraction(flinkConfiguration); + disableClassLoaderLeakCheck(flinkConfiguration); + flinkBatchEnv = StreamExecutionEnvironment.createLocalEnvironment(flinkConfiguration); + if (!options.getAttachedMode()) { + LOG.warn("Detached mode is only supported in RemoteStreamEnvironment"); + } + } else if ("[collection]".equals(flinkMasterHostPort)) { + throw new UnsupportedOperationException( + "CollectionEnvironment has been removed in Flink 2. 
Use [local] instead."); + } else if ("[auto]".equals(flinkMasterHostPort)) { + flinkBatchEnv = StreamExecutionEnvironment.getExecutionEnvironment(); + if (flinkBatchEnv instanceof LocalStreamEnvironment) { + disableClassLoaderLeakCheck(flinkConfiguration); + flinkBatchEnv = StreamExecutionEnvironment.createLocalEnvironment(flinkConfiguration); + flinkBatchEnv.setParallelism(getDefaultLocalParallelism()); + } + if (!options.getAttachedMode()) { + LOG.warn("Detached mode is not supported in [auto]."); + } + } else { + int defaultPort = flinkConfiguration.get(RestOptions.PORT); + HostAndPort hostAndPort = + HostAndPort.fromString(flinkMasterHostPort).withDefaultPort(defaultPort); + flinkConfiguration.set(RestOptions.PORT, hostAndPort.getPort()); + if (!options.getAttachedMode()) { + flinkConfiguration.set(DeploymentOptions.ATTACHED, options.getAttachedMode()); + } + flinkBatchEnv = + StreamExecutionEnvironment.createRemoteEnvironment( + hostAndPort.getHost(), + hostAndPort.getPort(), + flinkConfiguration, + filesToStage.toArray(new String[filesToStage.size()])); + LOG.info("Using Flink Master URL {}:{}.", hostAndPort.getHost(), hostAndPort.getPort()); + } + + // Set the execution mode for data exchange. + flinkBatchEnv.setRuntimeMode(RuntimeExecutionMode.BATCH); + + // set the correct parallelism. + if (options.getParallelism() != -1) { + flinkBatchEnv.setParallelism(options.getParallelism()); + } + + // Set the correct parallelism, required by UnboundedSourceWrapper to generate consistent + // splits. 
+ final int parallelism = + determineParallelism( + options.getParallelism(), flinkBatchEnv.getParallelism(), flinkConfiguration); + + flinkBatchEnv.setParallelism(parallelism); + // set parallelism in the options (required by some execution code) + options.setParallelism(parallelism); + + if (options.getObjectReuse()) { + flinkBatchEnv.getConfig().enableObjectReuse(); + } else { + flinkBatchEnv.getConfig().disableObjectReuse(); + } + + applyLatencyTrackingInterval(flinkBatchEnv.getConfig(), options); + + configureWebUIOptions(flinkBatchEnv.getConfig(), options.as(PipelineOptions.class)); + + return flinkBatchEnv; + } + + @VisibleForTesting + static StreamExecutionEnvironment createStreamExecutionEnvironment(FlinkPipelineOptions options) { + return createStreamExecutionEnvironment( + options, + MoreObjects.firstNonNull(options.getFilesToStage(), Collections.emptyList()), + options.getFlinkConfDir()); + } + + /** + * If the submitted job is a stream processing job, this method creates the adequate Flink {@link + * org.apache.flink.streaming.api.environment.StreamExecutionEnvironment} depending on the + * user-specified options. + */ + public static StreamExecutionEnvironment createStreamExecutionEnvironment( + FlinkPipelineOptions options, List<String> filesToStage, @Nullable String confDir) { + + LOG.info("Creating a Streaming Environment."); + + // Although Flink uses Rest, it expects the address not to contain a http scheme + String masterUrl = stripHttpSchema(options.getFlinkMaster()); + Configuration flinkConfiguration = getFlinkConfiguration(confDir); + configureRestartStrategy(options, flinkConfiguration); + configureStateBackend(options, flinkConfiguration); + StreamExecutionEnvironment flinkStreamEnv; + + // depending on the master, create the right environment. 
+ if ("[local]".equals(masterUrl)) { + setManagedMemoryByFraction(flinkConfiguration); + disableClassLoaderLeakCheck(flinkConfiguration); + flinkStreamEnv = + StreamExecutionEnvironment.createLocalEnvironment( + getDefaultLocalParallelism(), flinkConfiguration); + if (!options.getAttachedMode()) { + LOG.warn("Detached mode is only supported in RemoteStreamEnvironment"); + } + } else if ("[auto]".equals(masterUrl)) { + + flinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment(flinkConfiguration); + if (flinkStreamEnv instanceof LocalStreamEnvironment) { + disableClassLoaderLeakCheck(flinkConfiguration); + flinkStreamEnv = + StreamExecutionEnvironment.createLocalEnvironment( + getDefaultLocalParallelism(), flinkConfiguration); + } + if (!options.getAttachedMode()) { + LOG.warn("Detached mode is not only supported in [auto]"); + } + } else { + int defaultPort = flinkConfiguration.get(RestOptions.PORT); + HostAndPort hostAndPort = HostAndPort.fromString(masterUrl).withDefaultPort(defaultPort); + flinkConfiguration.set(RestOptions.PORT, hostAndPort.getPort()); + final SavepointRestoreSettings savepointRestoreSettings; + if (options.getSavepointPath() != null) { + savepointRestoreSettings = + SavepointRestoreSettings.forPath( + options.getSavepointPath(), options.getAllowNonRestoredState()); + } else { + savepointRestoreSettings = SavepointRestoreSettings.none(); + } + if (!options.getAttachedMode()) { + flinkConfiguration.set(DeploymentOptions.ATTACHED, options.getAttachedMode()); + } + flinkStreamEnv = + new RemoteStreamEnvironment( + hostAndPort.getHost(), + hostAndPort.getPort(), + flinkConfiguration, + filesToStage.toArray(new String[filesToStage.size()]), + null, + savepointRestoreSettings); + LOG.info("Using Flink Master URL {}:{}.", hostAndPort.getHost(), hostAndPort.getPort()); + } + + // Set the parallelism, required by UnboundedSourceWrapper to generate consistent splits. 
+ final int parallelism = + determineParallelism( + options.getParallelism(), flinkStreamEnv.getParallelism(), flinkConfiguration); + flinkStreamEnv.setParallelism(parallelism); + if (options.getMaxParallelism() > 0) { + flinkStreamEnv.setMaxParallelism(options.getMaxParallelism()); + } else if (!options.isStreaming()) { + // In Flink maxParallelism defines the number of keyGroups. + // (see + // https://github.com/apache/flink/blob/e9dd4683f758b463d0b5ee18e49cecef6a70c5cf/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java#L76) + // The default value (parallelism * 1.5) + // (see + // https://github.com/apache/flink/blob/e9dd4683f758b463d0b5ee18e49cecef6a70c5cf/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java#L137-L147) + // create a lot of skew so we force maxParallelism = parallelism in Batch mode. + LOG.info("Setting maxParallelism to {}", parallelism); + flinkStreamEnv.setMaxParallelism(parallelism); + } + // set parallelism in the options (required by some execution code) + options.setParallelism(parallelism); + + if (options.getObjectReuse()) { + flinkStreamEnv.getConfig().enableObjectReuse(); + } else { + flinkStreamEnv.getConfig().disableObjectReuse(); + } + + if (!options.getOperatorChaining()) { + flinkStreamEnv.disableOperatorChaining(); + } + + configureCheckpointing(options, flinkStreamEnv); + + applyLatencyTrackingInterval(flinkStreamEnv.getConfig(), options); + + if (options.getAutoWatermarkInterval() != null) { + flinkStreamEnv.getConfig().setAutoWatermarkInterval(options.getAutoWatermarkInterval()); + } + configureWebUIOptions(flinkStreamEnv.getConfig(), options.as(PipelineOptions.class)); + configureCustomKryoSerializers(flinkStreamEnv.getConfig()); + + return flinkStreamEnv; + } + + private static void configureWebUIOptions( + ExecutionConfig config, org.apache.beam.sdk.options.PipelineOptions options) { + SerializablePipelineOptions serializablePipelineOptions = + new 
SerializablePipelineOptions(options); + String optionsAsString = serializablePipelineOptions.toString(); + + try { + JsonNode node = mapper.readTree(optionsAsString); + JsonNode optionsNode = node.get("options"); + Map<String, String> output = + Streams.stream(optionsNode.fields()) + .filter(entry -> !entry.getValue().isNull()) + .collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue().asText())); + + config.setGlobalJobParameters(new GlobalJobParametersImpl(output)); Review Comment: Same — this is a verbatim copy from runners/flink/2.0 with only an import update. The NPE is already caught by the surrounding catch (Exception e), so I am not addressing it here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
