This is an automated email from the ASF dual-hosted git repository.

damccorm pushed a commit to branch release-2.56.0
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/release-2.56.0 by this push:
     new 50b73de3336 [runners-flink] remove 1.12 and 1.13 runners (#31020)
50b73de3336 is described below

commit 50b73de333614888db174140f72371db4cdbdbe5
Author: Jan Lukavský <je...@seznam.cz>
AuthorDate: Wed Apr 17 21:56:06 2024 +0200

    [runners-flink] remove 1.12 and 1.13 runners (#31020)
---
 gradle.properties                                  |  2 +-
 release/build.gradle.kts                           |  2 +-
 runners/flink/1.12/build.gradle                    | 25 ------
 .../flink/1.12/job-server-container/build.gradle   | 26 ------
 runners/flink/1.12/job-server/build.gradle         | 31 -------
 .../streaming/AbstractStreamOperatorCompat.java    | 94 ----------------------
 .../io/source/compat/FlinkSourceCompat.java        | 31 -------
 .../io/source/compat/SplitEnumeratorCompat.java    | 27 -------
 .../beam/runners/flink/RemoteMiniClusterImpl.java  | 68 ----------------
 .../runners/flink/metrics/MetricGroupWrapper.java  | 31 -------
 .../runners/flink/streaming/StreamSources.java     | 72 -----------------
 .../streaming/io/source/SourceTestCompat.java      | 75 -----------------
 runners/flink/1.13/build.gradle                    | 25 ------
 .../flink/1.13/job-server-container/build.gradle   | 26 ------
 runners/flink/1.13/job-server/build.gradle         | 31 -------
 .../translation/types/CoderTypeSerializer.java     |  0
 .../streaming/ProcessingTimeCallbackCompat.java    |  0
 .../io/source/compat/SplitEnumeratorCompat.java    |  0
 .../streaming/io/source/compat/package-info.java   |  0
 .../beam/runners/flink/MiniClusterCompat.java      |  0
 .../translation/types/CoderTypeSerializerTest.java |  0
 runners/flink/flink_runner.gradle                  | 17 ++--
 .../streaming/io/source/FlinkSourceReaderBase.java |  2 -
 sdks/go/examples/stringsplit/stringsplit.go        |  2 +-
 sdks/java/testing/nexmark/build.gradle             |  2 +-
 sdks/java/testing/tpcds/build.gradle               |  2 +-
 settings.gradle.kts                                |  8 --
 .../content/en/documentation/dsls/sql/shell.md     |  6 +-
 .../shortcodes/flink_java_pipeline_options.html    |  8 +-
 .../shortcodes/flink_python_pipeline_options.html  |  8 +-
 30 files changed, 22 insertions(+), 599 deletions(-)

diff --git a/gradle.properties b/gradle.properties
index 5b07dd6c2b4..8fd8e9b4742 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -39,6 +39,6 @@ docker_image_default_repo_root=apache
 docker_image_default_repo_prefix=beam_
 
 # supported flink versions
-flink_versions=1.12,1.13,1.14,1.15,1.16,1.17
+flink_versions=1.14,1.15,1.16,1.17
 # supported python versions
 python_versions=3.8,3.9,3.10,3.11
diff --git a/release/build.gradle.kts b/release/build.gradle.kts
index abb34d8605a..4e4586f666f 100644
--- a/release/build.gradle.kts
+++ b/release/build.gradle.kts
@@ -39,7 +39,7 @@ task("runJavaExamplesValidationTask") {
   dependsOn(":runners:direct-java:runQuickstartJavaDirect")
   dependsOn(":runners:google-cloud-dataflow-java:runQuickstartJavaDataflow")
   dependsOn(":runners:spark:3:runQuickstartJavaSpark")
-  dependsOn(":runners:flink:1.13:runQuickstartJavaFlinkLocal")
+  dependsOn(":runners:flink:1.17:runQuickstartJavaFlinkLocal")
   dependsOn(":runners:direct-java:runMobileGamingJavaDirect")
   dependsOn(":runners:google-cloud-dataflow-java:runMobileGamingJavaDataflow")
   dependsOn(":runners:twister2:runQuickstartJavaTwister2")
diff --git a/runners/flink/1.12/build.gradle b/runners/flink/1.12/build.gradle
deleted file mode 100644
index 2acee16c6e8..00000000000
--- a/runners/flink/1.12/build.gradle
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * License); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an AS IS BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-project.ext {
-  flink_major = '1.12'
-  flink_version = '1.12.7'
-}
-
-// Load the main build script which contains all build logic.
-apply from: "../flink_runner.gradle"
diff --git a/runners/flink/1.12/job-server-container/build.gradle b/runners/flink/1.12/job-server-container/build.gradle
deleted file mode 100644
index afdb68a0fc9..00000000000
--- a/runners/flink/1.12/job-server-container/build.gradle
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * License); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an AS IS BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-def basePath = '../../job-server-container'
-
-project.ext {
-  resource_path = basePath
-}
-
-// Load the main build script which contains all build logic.
-apply from: "$basePath/flink_job_server_container.gradle"
diff --git a/runners/flink/1.12/job-server/build.gradle b/runners/flink/1.12/job-server/build.gradle
deleted file mode 100644
index 890082042c0..00000000000
--- a/runners/flink/1.12/job-server/build.gradle
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * License); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an AS IS BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-def basePath = '../../job-server'
-
-project.ext {
-  // Look for the source code in the parent module
-  main_source_dirs = ["$basePath/src/main/java"]
-  test_source_dirs = ["$basePath/src/test/java"]
-  main_resources_dirs = ["$basePath/src/main/resources"]
-  test_resources_dirs = ["$basePath/src/test/resources"]
-  archives_base_name = 'beam-runners-flink-1.12-job-server'
-}
-
-// Load the main build script which contains all build logic.
-apply from: "$basePath/flink_job_server.gradle"
diff --git a/runners/flink/1.12/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/AbstractStreamOperatorCompat.java b/runners/flink/1.12/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/AbstractStreamOperatorCompat.java
deleted file mode 100644
index 5072e6b2459..00000000000
--- a/runners/flink/1.12/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/AbstractStreamOperatorCompat.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.wrappers.streaming;
-
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
-import org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl;
-import org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionInternalTimeServiceManager;
-
-/** Compatibility layer for {@link AbstractStreamOperator} breaking changes. */
-public abstract class AbstractStreamOperatorCompat<OutputT>
-    extends AbstractStreamOperator<OutputT> {
-
-  /**
-   * Getter for timeServiceManager, which has been made private in Flink 1.11.
-   *
-   * @return Time service manager.
-   */
-  protected InternalTimeServiceManager<?> getTimeServiceManagerCompat() {
-    return getTimeServiceManager()
-        .orElseThrow(() -> new IllegalStateException("Time service manager is not set."));
-  }
-
-  /**
-   * This call has been removed from {@link AbstractStreamOperator} in Flink 1.12.
-   *
-   * <p>{@link InternalTimeServiceManagerImpl#numProcessingTimeTimers()}
-   */
-  protected int numProcessingTimeTimers() {
-    return getTimeServiceManager()
-        .map(
-            manager -> {
-              InternalTimeServiceManager<?> tsm = getTimeServiceManagerCompat();
-              if (tsm instanceof InternalTimeServiceManagerImpl) {
-                final InternalTimeServiceManagerImpl<?> cast =
-                    (InternalTimeServiceManagerImpl<?>) getTimeServiceManagerCompat();
-                return cast.numProcessingTimeTimers();
-              } else if (tsm instanceof BatchExecutionInternalTimeServiceManager) {
-                return 0;
-              } else {
-                throw new IllegalStateException(
-                    String.format(
-                        "Unknown implementation of InternalTimerServiceManager. %s", tsm));
-              }
-            })
-        .orElse(0);
-  }
-
-  /** Release all of the operator's resources. */
-  abstract void cleanUp() throws Exception;
-
-  /** Flush all remaining buffered data. */
-  abstract void flushData() throws Exception;
-
-  // Prior to Flink 1.14, dispose() releases the operator's resources, while close() flushes
-  // remaining data and then releases the operator's resources.
-  // https://issues.apache.org/jira/browse/FLINK-22972
-
-  @Override
-  public void dispose() throws Exception {
-    try {
-      cleanUp();
-    } finally {
-      // This releases all task's resources. We need to call this last
-      // to ensure that state, timers, or output buffers can still be
-      // accessed during finishing the bundle.
-      super.dispose();
-    }
-  }
-
-  @Override
-  public void close() throws Exception {
-    try {
-      flushData();
-    } finally {
-      super.close();
-    }
-  }
-}
diff --git a/runners/flink/1.12/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/compat/FlinkSourceCompat.java b/runners/flink/1.12/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/compat/FlinkSourceCompat.java
deleted file mode 100644
index 2f9e69fe4f7..00000000000
--- a/runners/flink/1.12/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/compat/FlinkSourceCompat.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.compat;
-
-import org.apache.flink.api.connector.source.SourceReaderContext;
-import org.apache.flink.metrics.Counter;
-import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
-
-public class FlinkSourceCompat {
-
-  public static Counter getNumRecordsInCounter(SourceReaderContext context) {
-    return ((OperatorMetricGroup) context.metricGroup())
-        .getIOMetricGroup()
-        .getNumRecordsInCounter();
-  }
-}
diff --git a/runners/flink/1.12/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/compat/SplitEnumeratorCompat.java b/runners/flink/1.12/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/compat/SplitEnumeratorCompat.java
deleted file mode 100644
index d6bed940470..00000000000
--- a/runners/flink/1.12/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/compat/SplitEnumeratorCompat.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.compat;
-
-import org.apache.flink.api.connector.source.SourceSplit;
-import org.apache.flink.api.connector.source.SplitEnumerator;
-
-public interface SplitEnumeratorCompat<SplitT extends SourceSplit, CheckpointT>
-    extends SplitEnumerator<SplitT, CheckpointT> {
-
-  CheckpointT snapshotState(long checkpointId) throws Exception;
-}
diff --git a/runners/flink/1.12/src/test/java/org/apache/beam/runners/flink/RemoteMiniClusterImpl.java b/runners/flink/1.12/src/test/java/org/apache/beam/runners/flink/RemoteMiniClusterImpl.java
deleted file mode 100644
index 5d8bebd6dff..00000000000
--- a/runners/flink/1.12/src/test/java/org/apache/beam/runners/flink/RemoteMiniClusterImpl.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.minicluster.MiniCluster;
-import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
-import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
-
-/** A {@link MiniCluster} which allows remote connections for the end-to-end test. */
-public class RemoteMiniClusterImpl extends RemoteMiniCluster {
-
-  private String jobManagerBindAddress;
-  private int port;
-
-  public RemoteMiniClusterImpl(MiniClusterConfiguration miniClusterConfiguration) {
-    super(miniClusterConfiguration);
-    jobManagerBindAddress = miniClusterConfiguration.getJobManagerBindAddress();
-  }
-
-  @Override
-  protected RpcService createLocalRpcService(Configuration configuration) throws Exception {
-    // Enable remote connections to the mini cluster which are disabled by default
-    final RpcService rpcService =
-        AkkaRpcServiceUtils.remoteServiceBuilder(
-                configuration, jobManagerBindAddress, String.valueOf(0))
-            .withBindAddress(jobManagerBindAddress)
-            .withBindPort(0)
-            .withCustomConfig(AkkaUtils.testDispatcherConfig())
-            .createAndStart();
-    this.port = rpcService.getPort();
-
-    return rpcService;
-  }
-
-  @Override
-  public int getClusterPort() {
-    Preconditions.checkState(port > 0, "Port not yet initialized. Start the cluster first.");
-    return port;
-  }
-
-  @Override
-  public int getRestPort() {
-    try {
-      return getRestAddress().get().getPort();
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-  }
-}
diff --git a/runners/flink/1.12/src/test/java/org/apache/beam/runners/flink/metrics/MetricGroupWrapper.java b/runners/flink/1.12/src/test/java/org/apache/beam/runners/flink/metrics/MetricGroupWrapper.java
deleted file mode 100644
index 797c09b0929..00000000000
--- a/runners/flink/1.12/src/test/java/org/apache/beam/runners/flink/metrics/MetricGroupWrapper.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.metrics;
-
-import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
-
-/**
- * Compatibility layer for {@link MetricGroup} breaking changes made in Flink 1.14
- * (https://issues.apache.org/jira/browse/FLINK-23652).
- */
-public interface MetricGroupWrapper extends MetricGroup {
-  static MetricGroup createUnregisteredMetricGroup() {
-    return new UnregisteredMetricsGroup();
-  }
-}
diff --git a/runners/flink/1.12/src/test/java/org/apache/beam/runners/flink/streaming/StreamSources.java b/runners/flink/1.12/src/test/java/org/apache/beam/runners/flink/streaming/StreamSources.java
deleted file mode 100644
index ba5940653cd..00000000000
--- a/runners/flink/1.12/src/test/java/org/apache/beam/runners/flink/streaming/StreamSources.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.streaming;
-
-import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.api.operators.StreamSource;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
-import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
-import org.apache.flink.streaming.runtime.tasks.OperatorChain;
-import org.apache.flink.streaming.runtime.tasks.StreamTask;
-
-/** {@link StreamSource} utilities, that bridge incompatibilities between Flink releases. */
-public class StreamSources {
-
-  public static <OutT, SrcT extends SourceFunction<OutT>> void run(
-      StreamSource<OutT, SrcT> streamSource,
-      Object lockingObject,
-      Output<StreamRecord<OutT>> collector)
-      throws Exception {
-    streamSource.run(
-        lockingObject,
-        new TestStreamStatusMaintainer(),
-        collector,
-        createOperatorChain(streamSource));
-  }
-
-  private static OperatorChain<?, ?> createOperatorChain(AbstractStreamOperator<?> operator) {
-    return new OperatorChain<>(
-        operator.getContainingTask(),
-        StreamTask.createRecordWriterDelegate(
-            operator.getOperatorConfig(), new MockEnvironmentBuilder().build()));
-  }
-
-  /** StreamStatusMaintainer was removed in Flink 1.14. */
-  private static final class TestStreamStatusMaintainer implements StreamStatusMaintainer {
-    StreamStatus currentStreamStatus = StreamStatus.ACTIVE;
-
-    @Override
-    public void toggleStreamStatus(StreamStatus streamStatus) {
-      if (!currentStreamStatus.equals(streamStatus)) {
-        currentStreamStatus = streamStatus;
-      }
-    }
-
-    @Override
-    public StreamStatus getStreamStatus() {
-      return currentStreamStatus;
-    }
-  }
-
-  /** The emitWatermarkStatus method was added in Flink 1.14, so we need to wrap Output. */
-  public interface OutputWrapper<T> extends Output<T> {}
-}
diff --git a/runners/flink/1.12/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/SourceTestCompat.java b/runners/flink/1.12/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/SourceTestCompat.java
deleted file mode 100644
index 0b9ca07f99a..00000000000
--- a/runners/flink/1.12/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/SourceTestCompat.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source;
-
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.flink.api.connector.source.ReaderOutput;
-import org.apache.flink.api.connector.source.SourceOutput;
-import org.apache.flink.metrics.Counter;
-import org.apache.flink.metrics.Gauge;
-import org.apache.flink.metrics.SimpleCounter;
-import org.apache.flink.runtime.metrics.groups.OperatorIOMetricGroup;
-import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
-
-public class SourceTestCompat {
-
-  /** A MetricGroup implementation which records the registered gauge. */
-  public static class TestMetricGroup
-      extends UnregisteredMetricGroups.UnregisteredOperatorMetricGroup {
-    public final Map<String, Gauge<?>> registeredGauge = new HashMap<>();
-    public final Map<String, Counter> registeredCounter = new HashMap<>();
-    public final Counter numRecordsInCounter = new SimpleCounter();
-
-    @Override
-    public <T, GaugeT extends Gauge<T>> GaugeT gauge(String name, GaugeT gauge) {
-      registeredGauge.put(name, gauge);
-      return gauge;
-    }
-
-    @Override
-    public Counter counter(String name) {
-      // The OperatorIOMetricsGroup will register some IO metrics in the constructor.
-      // At that time, the construction of this class has not finihsed yet, so we
-      // need to delegate the call to the parent class.
-      if (registeredCounter != null) {
-        return registeredCounter.computeIfAbsent(name, ignored -> super.counter(name));
-      } else {
-        return super.counter(name);
-      }
-    }
-
-    @Override
-    public OperatorIOMetricGroup getIOMetricGroup() {
-      return new OperatorIOMetricGroup(this) {
-        @Override
-        public Counter getNumRecordsInCounter() {
-          return numRecordsInCounter;
-        }
-      };
-    }
-  }
-
-  public interface ReaderOutputCompat<T> extends ReaderOutput<T> {
-    void markActive();
-  }
-
-  public interface SourceOutputCompat<T> extends SourceOutput<T> {
-    void markActive();
-  }
-}
diff --git a/runners/flink/1.13/build.gradle b/runners/flink/1.13/build.gradle
deleted file mode 100644
index 1aa19e6f071..00000000000
--- a/runners/flink/1.13/build.gradle
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * License); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an AS IS BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-project.ext {
-  flink_major = '1.13'
-  flink_version = '1.13.5'
-}
-
-// Load the main build script which contains all build logic.
-apply from: "../flink_runner.gradle"
diff --git a/runners/flink/1.13/job-server-container/build.gradle b/runners/flink/1.13/job-server-container/build.gradle
deleted file mode 100644
index afdb68a0fc9..00000000000
--- a/runners/flink/1.13/job-server-container/build.gradle
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * License); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an AS IS BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-def basePath = '../../job-server-container'
-
-project.ext {
-  resource_path = basePath
-}
-
-// Load the main build script which contains all build logic.
-apply from: "$basePath/flink_job_server_container.gradle"
diff --git a/runners/flink/1.13/job-server/build.gradle b/runners/flink/1.13/job-server/build.gradle
deleted file mode 100644
index a7e6fd6eb59..00000000000
--- a/runners/flink/1.13/job-server/build.gradle
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * License); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an AS IS BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-def basePath = '../../job-server'
-
-project.ext {
-  // Look for the source code in the parent module
-  main_source_dirs = ["$basePath/src/main/java"]
-  test_source_dirs = ["$basePath/src/test/java"]
-  main_resources_dirs = ["$basePath/src/main/resources"]
-  test_resources_dirs = ["$basePath/src/test/resources"]
-  archives_base_name = 'beam-runners-flink-1.13-job-server'
-}
-
-// Load the main build script which contains all build logic.
-apply from: "$basePath/flink_job_server.gradle"
diff --git a/runners/flink/1.12/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java b/runners/flink/1.14/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
similarity index 100%
rename from runners/flink/1.12/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
rename to runners/flink/1.14/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
diff --git a/runners/flink/1.12/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ProcessingTimeCallbackCompat.java b/runners/flink/1.14/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ProcessingTimeCallbackCompat.java
similarity index 100%
rename from runners/flink/1.12/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ProcessingTimeCallbackCompat.java
rename to runners/flink/1.14/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ProcessingTimeCallbackCompat.java
diff --git a/runners/flink/1.13/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/compat/SplitEnumeratorCompat.java b/runners/flink/1.14/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/compat/SplitEnumeratorCompat.java
similarity index 100%
rename from runners/flink/1.13/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/compat/SplitEnumeratorCompat.java
rename to runners/flink/1.14/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/compat/SplitEnumeratorCompat.java
diff --git a/runners/flink/1.12/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/compat/package-info.java b/runners/flink/1.14/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/compat/package-info.java
similarity index 100%
rename from runners/flink/1.12/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/compat/package-info.java
rename to runners/flink/1.14/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/compat/package-info.java
diff --git a/runners/flink/1.12/src/test/java/org/apache/beam/runners/flink/MiniClusterCompat.java b/runners/flink/1.14/src/test/java/org/apache/beam/runners/flink/MiniClusterCompat.java
similarity index 100%
rename from runners/flink/1.12/src/test/java/org/apache/beam/runners/flink/MiniClusterCompat.java
rename to runners/flink/1.14/src/test/java/org/apache/beam/runners/flink/MiniClusterCompat.java
diff --git a/runners/flink/1.12/src/test/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializerTest.java b/runners/flink/1.14/src/test/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializerTest.java
similarity index 100%
rename from runners/flink/1.12/src/test/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializerTest.java
rename to runners/flink/1.14/src/test/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializerTest.java
diff --git a/runners/flink/flink_runner.gradle b/runners/flink/flink_runner.gradle
index f786bcf8d88..6849334aea0 100644
--- a/runners/flink/flink_runner.gradle
+++ b/runners/flink/flink_runner.gradle
@@ -31,7 +31,7 @@ def overrides(versions, type, base_path) {
   versions.collect { "${base_path}/${it}/src/${type}/java" } + 
["./src/${type}/java"]
 }
 
-def all_versions = ['1.12', '1.13', '1.14', '1.15', '1.16', '1.17']
+def all_versions = flink_versions.split(",")
 
 def previous_versions = all_versions.findAll { it < flink_major }
 
@@ -208,15 +208,10 @@ dependencies {
   implementation "org.apache.flink:flink-metrics-core:$flink_version"
   implementation "org.apache.flink:flink-java:$flink_version"
 
-  if (flink_version.compareTo("1.14") >= 0) {
-    implementation "org.apache.flink:flink-runtime:$flink_version"
-    implementation "org.apache.flink:flink-metrics-core:$flink_version"
-    testImplementation "org.apache.flink:flink-runtime:$flink_version:tests"
-    testImplementation "org.apache.flink:flink-rpc-akka:$flink_version"
-  } else {
-    implementation "org.apache.flink:flink-runtime_2.12:$flink_version"
-    testImplementation 
"org.apache.flink:flink-runtime_2.12:$flink_version:tests"
-  }
+  implementation "org.apache.flink:flink-runtime:$flink_version"
+  implementation "org.apache.flink:flink-metrics-core:$flink_version"
+  testImplementation "org.apache.flink:flink-runtime:$flink_version:tests"
+  testImplementation "org.apache.flink:flink-rpc-akka:$flink_version"
   testImplementation project(path: ":sdks:java:core", configuration: 
"shadowTest")
   // FlinkStateInternalsTest extends abstract StateInternalsTest
   testImplementation project(path: ":runners:core-java", configuration: 
"testRuntimeMigration")
@@ -378,7 +373,7 @@ tasks.register("validatesRunnerSickbay", Test) {
   }
 }
 
-// Generates :runners:flink:1.13:runQuickstartJavaFlinkLocal
+// Generates :runners:flink:1.17:runQuickstartJavaFlinkLocal
 createJavaExamplesArchetypeValidationTask(type: 'Quickstart', runner: 
'FlinkLocal')
 
 tasks.register("examplesIntegrationTest", Test) {
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceReaderBase.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceReaderBase.java
index 288f1f9511b..1f4f31f90eb 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceReaderBase.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceReaderBase.java
@@ -124,8 +124,6 @@ public abstract class FlinkSourceReaderBase<T, OutputT>
         
pipelineOptions.as(FlinkPipelineOptions.class).getShutdownSourcesAfterIdleMs();
     this.idleTimeoutFuture = new CompletableFuture<>();
     this.idleTimeoutCountingDown = false;
-    // TODO: Remove the casting and use SourceReaderMetricGroup after minimum 
FLink version is
-    // upgraded to 1.14 and above.
     this.numRecordsInCounter = 
FlinkSourceCompat.getNumRecordsInCounter(context);
     FlinkMetricContainerWithoutAccumulator metricsContainer =
         new FlinkMetricContainerWithoutAccumulator(context.metricGroup());
diff --git a/sdks/go/examples/stringsplit/stringsplit.go 
b/sdks/go/examples/stringsplit/stringsplit.go
index 3698bd3cc2b..28d79815607 100644
--- a/sdks/go/examples/stringsplit/stringsplit.go
+++ b/sdks/go/examples/stringsplit/stringsplit.go
@@ -21,7 +21,7 @@
 // 1. From a command line, navigate to the top-level beam/ directory and run
 // the Flink job server:
 //
-//     ./gradlew :runners:flink:1.13:job-server:runShadow -Djob-host=localhost 
-Dflink-master=local
+//     ./gradlew :runners:flink:1.17:job-server:runShadow -Djob-host=localhost 
-Dflink-master=local
 //
 // 2. The job server is ready to receive jobs once it outputs a log like the
 // following: `JobService started on localhost:8099`. Take note of the endpoint
diff --git a/sdks/java/testing/nexmark/build.gradle 
b/sdks/java/testing/nexmark/build.gradle
index 942dfb4c1e9..b9e6a0d2266 100644
--- a/sdks/java/testing/nexmark/build.gradle
+++ b/sdks/java/testing/nexmark/build.gradle
@@ -144,7 +144,7 @@ def getNexmarkArgs = {
 //
 // Parameters:
 //   -Pnexmark.runner
-//       Specify a runner subproject, such as ":runners:spark:3" or 
":runners:flink:1.13"
+//       Specify a runner subproject, such as ":runners:spark:3" or 
":runners:flink:1.17"
 //       Defaults to ":runners:direct-java"
 //
 //   -Pnexmark.args
diff --git a/sdks/java/testing/tpcds/build.gradle 
b/sdks/java/testing/tpcds/build.gradle
index 387dda89824..da4a317d320 100644
--- a/sdks/java/testing/tpcds/build.gradle
+++ b/sdks/java/testing/tpcds/build.gradle
@@ -94,7 +94,7 @@ if (isSpark) {
 //
 // Parameters:
 //   -Ptpcds.runner
-//       Specify a runner subproject, such as ":runners:spark:3" or 
":runners:flink:1.13"
+//       Specify a runner subproject, such as ":runners:spark:3" or 
":runners:flink:1.17"
 //       Defaults to ":runners:direct-java"
 //
 //   -Ptpcds.args
diff --git a/settings.gradle.kts b/settings.gradle.kts
index c2dd9bac1ba..8c86a8a5e55 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -121,14 +121,6 @@ include(":runners:core-java")
 include(":runners:direct-java")
 include(":runners:extensions-java:metrics")
 /* Begin Flink Runner related settings */
-// Flink 1.12
-include(":runners:flink:1.12")
-include(":runners:flink:1.12:job-server")
-include(":runners:flink:1.12:job-server-container")
-// Flink 1.13
-include(":runners:flink:1.13")
-include(":runners:flink:1.13:job-server")
-include(":runners:flink:1.13:job-server-container")
 // Flink 1.14
 include(":runners:flink:1.14")
 include(":runners:flink:1.14:job-server")
diff --git a/website/www/site/content/en/documentation/dsls/sql/shell.md 
b/website/www/site/content/en/documentation/dsls/sql/shell.md
index 7c7b31bac49..3676bfa59f7 100644
--- a/website/www/site/content/en/documentation/dsls/sql/shell.md
+++ b/website/www/site/content/en/documentation/dsls/sql/shell.md
@@ -29,7 +29,7 @@ This page describes how to work with the shell, but does not 
focus on specific f
 To use Beam SQL shell, you must first clone the [Beam SDK 
repository](https://github.com/apache/beam). Then, from the root of the 
repository clone, execute the following commands to run the shell:
 
 ```
-./gradlew -p sdks/java/extensions/sql/shell 
-Pbeam.sql.shell.bundled=':runners:flink:1.13,:sdks:java:io:kafka' installDist
+./gradlew -p sdks/java/extensions/sql/shell 
-Pbeam.sql.shell.bundled=':runners:flink:1.17,:sdks:java:io:kafka' installDist
 
 ./sdks/java/extensions/sql/shell/build/install/shell/bin/shell
 ```
@@ -117,7 +117,7 @@ By default, Beam uses the `DirectRunner` to run the 
pipeline on the machine wher
 1.  Make sure the SQL shell includes the desired runner. Add the corresponding 
project id to the `-Pbeam.sql.shell.bundled` parameter of the Gradle invocation 
([source 
code](https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/shell/build.gradle),
 [project 
ids](https://github.com/apache/beam/blob/master/settings.gradle.kts)). For 
example, use the following command to include Flink runner and KafkaIO:
 
     ```
-    ./gradlew -p sdks/java/extensions/sql/shell 
-Pbeam.sql.shell.bundled=':runners:flink:1.13,:sdks:java:io:kafka' installDist
+    ./gradlew -p sdks/java/extensions/sql/shell 
-Pbeam.sql.shell.bundled=':runners:flink:1.17,:sdks:java:io:kafka' installDist
     ```
 
     _Note: You can bundle multiple runners (using a comma-separated list) or 
other additional components in the same manner. For example, you can add 
support for more I/Os._
@@ -143,7 +143,7 @@ To configure the runner, you must specify `PipelineOptions` 
by using the `SET` c
 You can also build your own standalone package for SQL shell using `distZip` 
or `distTar` tasks. For example:
 
 ```
-./gradlew -p sdks/java/extensions/sql/shell 
-Pbeam.sql.shell.bundled=':runners:flink:1.13,:sdks:java:io:kafka' distZip
+./gradlew -p sdks/java/extensions/sql/shell 
-Pbeam.sql.shell.bundled=':runners:flink:1.17,:sdks:java:io:kafka' distZip
 
 ls ./sdks/java/extensions/sql/shell/build/distributions/
 beam-sdks-java-extensions-sql-shell-2.6.0-SNAPSHOT.tar 
beam-sdks-java-extensions-sql-shell-2.6.0-SNAPSHOT.zip
diff --git 
a/website/www/site/layouts/shortcodes/flink_java_pipeline_options.html 
b/website/www/site/layouts/shortcodes/flink_java_pipeline_options.html
index a6b7ed28f40..939c64ed9c4 100644
--- a/website/www/site/layouts/shortcodes/flink_java_pipeline_options.html
+++ b/website/www/site/layouts/shortcodes/flink_java_pipeline_options.html
@@ -119,13 +119,13 @@ Should be called before running the tests.
 </tr>
 <tr>
   <td><code>maxBundleSize</code></td>
-  <td>The maximum number of elements in a bundle.</td>
-  <td>Default: <code>1000</code></td>
+  <td>The maximum number of elements in a bundle. Default values are 1000 for 
a streaming job and 1,000,000 for batch</td>
+  <td>Default: <code>MaxBundleSizeFactory</code></td>
 </tr>
 <tr>
   <td><code>maxBundleTimeMills</code></td>
-  <td>The maximum time to wait before finalising a bundle (in 
milliseconds).</td>
-  <td>Default: <code>1000</code></td>
+  <td>The maximum time to wait before finalising a bundle (in milliseconds). 
Default values are 1000 for streaming and 10,000 for batch.</td>
+  <td>Default: <code>MaxBundleTimeFactory</code></td>
 </tr>
 <tr>
   <td><code>maxParallelism</code></td>
diff --git 
a/website/www/site/layouts/shortcodes/flink_python_pipeline_options.html 
b/website/www/site/layouts/shortcodes/flink_python_pipeline_options.html
index 494b01bc1d0..eb5c525d78b 100644
--- a/website/www/site/layouts/shortcodes/flink_python_pipeline_options.html
+++ b/website/www/site/layouts/shortcodes/flink_python_pipeline_options.html
@@ -119,13 +119,13 @@ Should be called before running the tests.
 </tr>
 <tr>
   <td><code>max_bundle_size</code></td>
-  <td>The maximum number of elements in a bundle.</td>
-  <td>Default: <code>1000</code></td>
+  <td>The maximum number of elements in a bundle. Default values are 1000 for 
a streaming job and 1,000,000 for batch</td>
+  <td>Default: <code>MaxBundleSizeFactory</code></td>
 </tr>
 <tr>
   <td><code>max_bundle_time_mills</code></td>
-  <td>The maximum time to wait before finalising a bundle (in 
milliseconds).</td>
-  <td>Default: <code>1000</code></td>
+  <td>The maximum time to wait before finalising a bundle (in milliseconds). 
Default values are 1000 for streaming and 10,000 for batch.</td>
+  <td>Default: <code>MaxBundleTimeFactory</code></td>
 </tr>
 <tr>
   <td><code>max_parallelism</code></td>


Reply via email to