This is an automated email from the ASF dual-hosted git repository.
Abacn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 1c182507444 Add support for Apache Flink 2.1.3 (#38961)
1c182507444 is described below
commit 1c182507444b5d5cb5e376d5a2fd9db1afd5dab0
Author: ddebowczyk92 <[email protected]>
AuthorDate: Wed Jun 17 20:21:19 2026 +0200
Add support for Apache Flink 2.1.3 (#38961)
* [runners-flink] Add support for Apache Flink 2.1.3
* Improve lz4-java dependency resolution strategy
Explicitly prefer Flink's at.yawk.lz4:lz4-java over org.lz4:lz4-java
to ensure the Flink-compatible version is always selected, regardless
of version numbers.
This is more robust than selectHighestVersion() which could
theoretically select org.lz4 if it had a higher version number.
* Fix lz4-java capability conflict for examples:java flinkRunnerPreCommit
Flink 2.1.3 uses at.yawk.lz4:lz4-java:1.10.3 while Kafka uses
org.lz4:lz4-java:1.6.0, causing capability conflicts in configurations
that depend on both (like examples:java flinkRunnerPreCommit).
Add a capability resolution strategy that prefers Flink's at.yawk.lz4
artifact and falls back to the highest version, consistent with the
approach in runners/flink/2.1/build.gradle.
* Remove unnecessary test resources for Flink 2.1
Flink 2.1 inherits test resources from parent versions via Beam's
resource layering mechanism. No version-specific config needed.
---
.../test-properties.json | 2 +-
.../run_rc_validation_java_quickstart.yml | 2 +-
.test-infra/validate-runner/build.gradle | 13 +++++
examples/java/common.gradle | 10 ++++
gradle.properties | 2 +-
runners/flink/2.1/build.gradle | 56 ++++++++++++++++++++++
.../flink/2.1/job-server-container/build.gradle | 26 ++++++++++
runners/flink/2.1/job-server/build.gradle | 44 +++++++++++++++++
sdks/go/examples/wasm/README.md | 6 +--
sdks/go/test/build.gradle | 2 +-
.../python/apache_beam/options/pipeline_options.py | 2 +-
sdks/typescript/src/apache_beam/runners/flink.ts | 2 +-
12 files changed, 158 insertions(+), 9 deletions(-)
diff --git a/.github/actions/setup-default-test-properties/test-properties.json
b/.github/actions/setup-default-test-properties/test-properties.json
index f06de5174e6..7a3f9890a29 100644
--- a/.github/actions/setup-default-test-properties/test-properties.json
+++ b/.github/actions/setup-default-test-properties/test-properties.json
@@ -14,7 +14,7 @@
},
"JavaTestProperties": {
"SUPPORTED_VERSIONS": ["8", "11", "17", "21", "25"],
- "FLINK_VERSIONS": ["1.17", "1.18", "1.19", "1.20", "2.0"],
+ "FLINK_VERSIONS": ["1.17", "1.18", "1.19", "1.20", "2.0", "2.1"],
"SPARK_VERSIONS": ["3"]
},
"GoTestProperties": {
diff --git a/.github/workflows/run_rc_validation_java_quickstart.yml
b/.github/workflows/run_rc_validation_java_quickstart.yml
index dce9b7f3fed..41a6991d14e 100644
--- a/.github/workflows/run_rc_validation_java_quickstart.yml
+++ b/.github/workflows/run_rc_validation_java_quickstart.yml
@@ -88,7 +88,7 @@ jobs:
- name: Run QuickStart Java Flink Runner
uses: ./.github/actions/gradle-command-self-hosted-action
with:
- gradle-command: :runners:flink:2.0:runQuickstartJavaFlinkLocal
+ gradle-command: :runners:flink:2.1:runQuickstartJavaFlinkLocal
arguments: |
-Prepourl=${{ env.APACHE_REPO_URL }} \
-Pver=${{ env.RELEASE_VERSION }}
diff --git a/.test-infra/validate-runner/build.gradle
b/.test-infra/validate-runner/build.gradle
index 1817d7014a6..3992abb24dd 100644
--- a/.test-infra/validate-runner/build.gradle
+++ b/.test-infra/validate-runner/build.gradle
@@ -31,6 +31,19 @@ repositories {
}
}
+// Flink 2.1+ uses at.yawk.lz4:lz4-java while Spark uses org.lz4:lz4-java
+// Resolve capability conflict by preferring Flink's version when both are present
+configurations.all {
+ resolutionStrategy.capabilitiesResolution.withCapability('org.lz4:lz4-java')
{
+ def candidate = candidates.find { it.id.toString().contains('at.yawk.lz4')
}
+ if (candidate != null) {
+ select(candidate)
+ } else {
+ selectHighestVersion()
+ }
+ }
+}
+
dependencies {
implementation 'com.offbytwo.jenkins:jenkins-client:0.3.8'
implementation library.java.jackson_databind
diff --git a/examples/java/common.gradle b/examples/java/common.gradle
index 10ea43628bc..0e800e7cf98 100644
--- a/examples/java/common.gradle
+++ b/examples/java/common.gradle
@@ -35,6 +35,16 @@ configurations.sparkRunnerPreCommit {
exclude group: "org.slf4j", module: "jul-to-slf4j"
exclude group: "org.slf4j", module: "slf4j-jdk14"
}
+configurations.flinkRunnerPreCommit {
+ resolutionStrategy.capabilitiesResolution.withCapability("org.lz4:lz4-java")
{
+ def candidate = candidates.find { it.id.toString().contains('at.yawk.lz4')
}
+ if (candidate != null) {
+ select(candidate)
+ } else {
+ selectHighestVersion()
+ }
+ }
+}
dependencies {
directRunnerPreCommit project(path: ":runners:direct-java", configuration:
"shadow")
diff --git a/gradle.properties b/gradle.properties
index 95e50105a49..0289d02722b 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -39,7 +39,7 @@ docker_image_default_repo_root=apache
docker_image_default_repo_prefix=beam_
# supported flink versions
-flink_versions=1.17,1.18,1.19,1.20,2.0
+flink_versions=1.17,1.18,1.19,1.20,2.0,2.1
# supported spark versions
spark_versions=3,4
# supported python versions
diff --git a/runners/flink/2.1/build.gradle b/runners/flink/2.1/build.gradle
new file mode 100644
index 00000000000..e9092c2977f
--- /dev/null
+++ b/runners/flink/2.1/build.gradle
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+project.ext {
+ flink_major = '2.1'
+ flink_version = '2.1.3'
+ excluded_files = [
+ 'main': [
+ // Used by DataSet API only
+ "org/apache/beam/runners/flink/adapter/BeamFlinkDataSetAdapter.java",
+ "org/apache/beam/runners/flink/FlinkBatchPipelineTranslator.java",
+
"org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java",
+ "org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java",
+
"org/apache/beam/runners/flink/translation/functions/FlinkNonMergingReduceFunction.java",
+ // Moved to org.apache.flink.runtime.state.StateBackendFactory
+ "org/apache/beam/runners/flink/FlinkStateBackendFactory.java",
+ ],
+ 'test': [
+ // Used by DataSet API only
+
"org/apache/beam/runners/flink/adapter/BeamFlinkDataSetAdapterTest.java",
+ "org/apache/beam/runners/flink/batch/NonMergingGroupByKeyTest.java",
+ "org/apache/beam/runners/flink/batch/ReshuffleTest.java",
+ ]
+ ]
+}
+
+// Load the main build script which contains all build logic.
+apply from: "../flink_runner.gradle"
+
+// Flink 2.1 uses at.yawk.lz4:lz4-java instead of org.lz4:lz4-java
+// Explicitly prefer Flink's at.yawk.lz4 version to resolve capability conflict
+configurations.all {
+ resolutionStrategy.capabilitiesResolution.withCapability('org.lz4:lz4-java')
{
+ def candidate = candidates.find { it.id.toString().contains('at.yawk.lz4')
}
+ if (candidate != null) {
+ select(candidate)
+ } else {
+ selectHighestVersion()
+ }
+ }
+}
diff --git a/runners/flink/2.1/job-server-container/build.gradle
b/runners/flink/2.1/job-server-container/build.gradle
new file mode 100644
index 00000000000..afdb68a0fc9
--- /dev/null
+++ b/runners/flink/2.1/job-server-container/build.gradle
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+def basePath = '../../job-server-container'
+
+project.ext {
+ resource_path = basePath
+}
+
+// Load the main build script which contains all build logic.
+apply from: "$basePath/flink_job_server_container.gradle"
diff --git a/runners/flink/2.1/job-server/build.gradle
b/runners/flink/2.1/job-server/build.gradle
new file mode 100644
index 00000000000..277ddc07fda
--- /dev/null
+++ b/runners/flink/2.1/job-server/build.gradle
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+def basePath = '../../job-server'
+
+project.ext {
+ // Look for the source code in the parent module
+ main_source_dirs = ["$basePath/src/main/java"]
+ test_source_dirs = ["$basePath/src/test/java"]
+ main_resources_dirs = ["$basePath/src/main/resources"]
+ test_resources_dirs = ["$basePath/src/test/resources"]
+ archives_base_name = 'beam-runners-flink-2.1-job-server'
+}
+
+// Load the main build script which contains all build logic.
+apply from: "$basePath/flink_job_server.gradle"
+
+// Flink 2.1 uses at.yawk.lz4:lz4-java instead of org.lz4:lz4-java
+// Explicitly prefer Flink's at.yawk.lz4 version to resolve capability conflict
+configurations.all {
+ resolutionStrategy.capabilitiesResolution.withCapability('org.lz4:lz4-java')
{
+ def candidate = candidates.find { it.id.toString().contains('at.yawk.lz4')
}
+ if (candidate != null) {
+ select(candidate)
+ } else {
+ selectHighestVersion()
+ }
+ }
+}
diff --git a/sdks/go/examples/wasm/README.md b/sdks/go/examples/wasm/README.md
index e4ab54d4a3e..30fd22f624b 100644
--- a/sdks/go/examples/wasm/README.md
+++ b/sdks/go/examples/wasm/README.md
@@ -68,13 +68,13 @@ cd $BEAM_HOME
Expected output should include the following, from which you acquire the
latest flink runner version.
```shell
-'flink_versions: 1.17,1.18,1.19,1.20'
+'flink_versions: 1.17,1.18,1.19,1.20,2.0,2.1'
```
-#### 2. Set to the latest flink runner version i.e. 1.16
+#### 2. Set to the latest flink runner version i.e. 2.1
```shell
-FLINK_VERSION=1.16
+FLINK_VERSION=2.1
```
#### 3. In a separate terminal, start the flink runner (It should take a few minutes on the first execution)
diff --git a/sdks/go/test/build.gradle b/sdks/go/test/build.gradle
index 8fcb0916614..3437ba12f2c 100644
--- a/sdks/go/test/build.gradle
+++ b/sdks/go/test/build.gradle
@@ -92,7 +92,7 @@ task flinkValidatesRunner {
doFirst {
// Copy Flink conf file
copy {
- from
"${project.rootDir}/runners/flink/2.0/src/test/resources/flink-test-config.yaml"
+ from
"${project.rootDir}/runners/flink/${flinkVersion}/src/test/resources/flink-test-config.yaml"
into "${project.buildDir}/flink-conf"
// Rename the file during the copy process
diff --git a/sdks/python/apache_beam/options/pipeline_options.py
b/sdks/python/apache_beam/options/pipeline_options.py
index c813939d53f..978a79bf617 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -2106,7 +2106,7 @@ class JobServerOptions(PipelineOptions):
class FlinkRunnerOptions(PipelineOptions):
# These should stay in sync with gradle.properties.
- PUBLISHED_FLINK_VERSIONS = ['1.17', '1.18', '1.19', '1.20', '2.0']
+ PUBLISHED_FLINK_VERSIONS = ['1.17', '1.18', '1.19', '1.20', '2.0', '2.1']
@classmethod
def _add_argparse_args(cls, parser):
diff --git a/sdks/typescript/src/apache_beam/runners/flink.ts
b/sdks/typescript/src/apache_beam/runners/flink.ts
index 8f80b971da2..c8f8f57eb08 100644
--- a/sdks/typescript/src/apache_beam/runners/flink.ts
+++ b/sdks/typescript/src/apache_beam/runners/flink.ts
@@ -28,7 +28,7 @@ import { JavaJarService } from "../utils/service";
const MAGIC_HOST_NAMES = ["[local]", "[auto]"];
// These should stay in sync with gradle.properties.
-const PUBLISHED_FLINK_VERSIONS = ["1.17", "1.18", "1.19", "1.20", "2.0"];
+const PUBLISHED_FLINK_VERSIONS = ["1.17", "1.18", "1.19", "1.20", "2.0",
"2.1"];
const defaultOptions = {
flinkMaster: "[local]",