This is an automated email from the ASF dual-hosted git repository.

Abacn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 1c182507444 Add support for Apache Flink 2.1.3 (#38961)
1c182507444 is described below

commit 1c182507444b5d5cb5e376d5a2fd9db1afd5dab0
Author: ddebowczyk92 <[email protected]>
AuthorDate: Wed Jun 17 20:21:19 2026 +0200

    Add support for Apache Flink 2.1.3 (#38961)
    
    * [runners-flink] Add support for Apache Flink 2.1.3
    
    * Improve lz4-java dependency resolution strategy
    
    Explicitly prefer Flink's at.yawk.lz4:lz4-java over org.lz4:lz4-java
    to ensure the Flink-compatible version is always selected, regardless
    of version numbers.
    
    This is more robust than selectHighestVersion() which could
    theoretically select org.lz4 if it had a higher version number.
    
    * Fix lz4-java capability conflict for examples:java flinkRunnerPreCommit
    
    Flink 2.1.3 uses at.yawk.lz4:lz4-java:1.10.3 while Kafka uses
    org.lz4:lz4-java:1.6.0, causing capability conflicts in configurations
    that depend on both (like examples:java flinkRunnerPreCommit).
    
    Add capability resolution strategy to select the highest version,
    consistent with the approach in runners/flink/2.1/build.gradle.
    
    * Remove unnecessary test resources for Flink 2.1
    
    Flink 2.1 inherits test resources from parent versions via Beam's
    resource layering mechanism. No version-specific config needed.
---
 .../test-properties.json                           |  2 +-
 .../run_rc_validation_java_quickstart.yml          |  2 +-
 .test-infra/validate-runner/build.gradle           | 13 +++++
 examples/java/common.gradle                        | 10 ++++
 gradle.properties                                  |  2 +-
 runners/flink/2.1/build.gradle                     | 56 ++++++++++++++++++++++
 .../flink/2.1/job-server-container/build.gradle    | 26 ++++++++++
 runners/flink/2.1/job-server/build.gradle          | 44 +++++++++++++++++
 sdks/go/examples/wasm/README.md                    |  6 +--
 sdks/go/test/build.gradle                          |  2 +-
 .../python/apache_beam/options/pipeline_options.py |  2 +-
 sdks/typescript/src/apache_beam/runners/flink.ts   |  2 +-
 12 files changed, 158 insertions(+), 9 deletions(-)

diff --git a/.github/actions/setup-default-test-properties/test-properties.json b/.github/actions/setup-default-test-properties/test-properties.json
index f06de5174e6..7a3f9890a29 100644
--- a/.github/actions/setup-default-test-properties/test-properties.json
+++ b/.github/actions/setup-default-test-properties/test-properties.json
@@ -14,7 +14,7 @@
   },
   "JavaTestProperties": {
     "SUPPORTED_VERSIONS": ["8", "11", "17", "21", "25"],
-    "FLINK_VERSIONS": ["1.17", "1.18", "1.19", "1.20", "2.0"],
+    "FLINK_VERSIONS": ["1.17", "1.18", "1.19", "1.20", "2.0", "2.1"],
     "SPARK_VERSIONS": ["3"]
   },
   "GoTestProperties": {
diff --git a/.github/workflows/run_rc_validation_java_quickstart.yml b/.github/workflows/run_rc_validation_java_quickstart.yml
index dce9b7f3fed..41a6991d14e 100644
--- a/.github/workflows/run_rc_validation_java_quickstart.yml
+++ b/.github/workflows/run_rc_validation_java_quickstart.yml
@@ -88,7 +88,7 @@ jobs:
       - name: Run QuickStart Java Flink Runner
         uses: ./.github/actions/gradle-command-self-hosted-action
         with:
-          gradle-command: :runners:flink:2.0:runQuickstartJavaFlinkLocal
+          gradle-command: :runners:flink:2.1:runQuickstartJavaFlinkLocal
           arguments: |
             -Prepourl=${{ env.APACHE_REPO_URL }} \
             -Pver=${{ env.RELEASE_VERSION }}
diff --git a/.test-infra/validate-runner/build.gradle b/.test-infra/validate-runner/build.gradle
index 1817d7014a6..3992abb24dd 100644
--- a/.test-infra/validate-runner/build.gradle
+++ b/.test-infra/validate-runner/build.gradle
@@ -31,6 +31,19 @@ repositories {
     }
 }
 
+// Flink 2.1+ uses at.yawk.lz4:lz4-java while Spark uses org.lz4:lz4-java
+// Resolve capability conflict by preferring Flink's version when both are present
+configurations.all {
+  resolutionStrategy.capabilitiesResolution.withCapability('org.lz4:lz4-java') {
+    def candidate = candidates.find { it.id.toString().contains('at.yawk.lz4') }
+    if (candidate != null) {
+      select(candidate)
+    } else {
+      selectHighestVersion()
+    }
+  }
+}
+
 dependencies {
     implementation 'com.offbytwo.jenkins:jenkins-client:0.3.8'
     implementation library.java.jackson_databind
diff --git a/examples/java/common.gradle b/examples/java/common.gradle
index 10ea43628bc..0e800e7cf98 100644
--- a/examples/java/common.gradle
+++ b/examples/java/common.gradle
@@ -35,6 +35,16 @@ configurations.sparkRunnerPreCommit {
   exclude group: "org.slf4j", module: "jul-to-slf4j"
   exclude group: "org.slf4j", module: "slf4j-jdk14"
 }
+configurations.flinkRunnerPreCommit {
+  resolutionStrategy.capabilitiesResolution.withCapability("org.lz4:lz4-java") {
+    def candidate = candidates.find { it.id.toString().contains('at.yawk.lz4') }
+    if (candidate != null) {
+      select(candidate)
+    } else {
+      selectHighestVersion()
+    }
+  }
+}
 
 dependencies {
   directRunnerPreCommit project(path: ":runners:direct-java", configuration: "shadow")
diff --git a/gradle.properties b/gradle.properties
index 95e50105a49..0289d02722b 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -39,7 +39,7 @@ docker_image_default_repo_root=apache
 docker_image_default_repo_prefix=beam_
 
 # supported flink versions
-flink_versions=1.17,1.18,1.19,1.20,2.0
+flink_versions=1.17,1.18,1.19,1.20,2.0,2.1
 # supported spark versions
 spark_versions=3,4
 # supported python versions
diff --git a/runners/flink/2.1/build.gradle b/runners/flink/2.1/build.gradle
new file mode 100644
index 00000000000..e9092c2977f
--- /dev/null
+++ b/runners/flink/2.1/build.gradle
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+project.ext {
+  flink_major = '2.1'
+  flink_version = '2.1.3'
+  excluded_files = [
+    'main': [
+        // Used by DataSet API only
+        "org/apache/beam/runners/flink/adapter/BeamFlinkDataSetAdapter.java",
+        "org/apache/beam/runners/flink/FlinkBatchPipelineTranslator.java",
+        "org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java",
+        "org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java",
+        "org/apache/beam/runners/flink/translation/functions/FlinkNonMergingReduceFunction.java",
+        // Moved to org.apache.flink.runtime.state.StateBackendFactory
+        "org/apache/beam/runners/flink/FlinkStateBackendFactory.java",
+    ],
+    'test': [
+        // Used by DataSet API only
+        "org/apache/beam/runners/flink/adapter/BeamFlinkDataSetAdapterTest.java",
+        "org/apache/beam/runners/flink/batch/NonMergingGroupByKeyTest.java",
+        "org/apache/beam/runners/flink/batch/ReshuffleTest.java",
+    ]
+  ]
+}
+
+// Load the main build script which contains all build logic.
+apply from: "../flink_runner.gradle"
+
+// Flink 2.1 uses at.yawk.lz4:lz4-java instead of org.lz4:lz4-java
+// Explicitly prefer Flink's at.yawk.lz4 version to resolve capability conflict
+configurations.all {
+  resolutionStrategy.capabilitiesResolution.withCapability('org.lz4:lz4-java') {
+    def candidate = candidates.find { it.id.toString().contains('at.yawk.lz4') }
+    if (candidate != null) {
+      select(candidate)
+    } else {
+      selectHighestVersion()
+    }
+  }
+}
diff --git a/runners/flink/2.1/job-server-container/build.gradle b/runners/flink/2.1/job-server-container/build.gradle
new file mode 100644
index 00000000000..afdb68a0fc9
--- /dev/null
+++ b/runners/flink/2.1/job-server-container/build.gradle
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+def basePath = '../../job-server-container'
+
+project.ext {
+  resource_path = basePath
+}
+
+// Load the main build script which contains all build logic.
+apply from: "$basePath/flink_job_server_container.gradle"
diff --git a/runners/flink/2.1/job-server/build.gradle b/runners/flink/2.1/job-server/build.gradle
new file mode 100644
index 00000000000..277ddc07fda
--- /dev/null
+++ b/runners/flink/2.1/job-server/build.gradle
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+def basePath = '../../job-server'
+
+project.ext {
+  // Look for the source code in the parent module
+  main_source_dirs = ["$basePath/src/main/java"]
+  test_source_dirs = ["$basePath/src/test/java"]
+  main_resources_dirs = ["$basePath/src/main/resources"]
+  test_resources_dirs = ["$basePath/src/test/resources"]
+  archives_base_name = 'beam-runners-flink-2.1-job-server'
+}
+
+// Load the main build script which contains all build logic.
+apply from: "$basePath/flink_job_server.gradle"
+
+// Flink 2.1 uses at.yawk.lz4:lz4-java instead of org.lz4:lz4-java
+// Explicitly prefer Flink's at.yawk.lz4 version to resolve capability conflict
+configurations.all {
+  resolutionStrategy.capabilitiesResolution.withCapability('org.lz4:lz4-java') {
+    def candidate = candidates.find { it.id.toString().contains('at.yawk.lz4') }
+    if (candidate != null) {
+      select(candidate)
+    } else {
+      selectHighestVersion()
+    }
+  }
+}
diff --git a/sdks/go/examples/wasm/README.md b/sdks/go/examples/wasm/README.md
index e4ab54d4a3e..30fd22f624b 100644
--- a/sdks/go/examples/wasm/README.md
+++ b/sdks/go/examples/wasm/README.md
@@ -68,13 +68,13 @@ cd $BEAM_HOME
 Expected output should include the following, from which you acquire the latest flink runner version.
 
 ```shell
-'flink_versions: 1.17,1.18,1.19,1.20'
+'flink_versions: 1.17,1.18,1.19,1.20,2.0,2.1'
 ```
 
-#### 2. Set to the latest flink runner version i.e. 1.16
+#### 2. Set to the latest flink runner version i.e. 2.1
 
 ```shell
-FLINK_VERSION=1.16
+FLINK_VERSION=2.1
 ```
 
 #### 3. In a separate terminal, start the flink runner (It should take a few minutes on the first execution)
diff --git a/sdks/go/test/build.gradle b/sdks/go/test/build.gradle
index 8fcb0916614..3437ba12f2c 100644
--- a/sdks/go/test/build.gradle
+++ b/sdks/go/test/build.gradle
@@ -92,7 +92,7 @@ task flinkValidatesRunner {
   doFirst {
     // Copy Flink conf file
     copy {
-      from "${project.rootDir}/runners/flink/2.0/src/test/resources/flink-test-config.yaml"
+      from "${project.rootDir}/runners/flink/${flinkVersion}/src/test/resources/flink-test-config.yaml"
       into "${project.buildDir}/flink-conf"
 
       // Rename the file during the copy process
diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py
index c813939d53f..978a79bf617 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -2106,7 +2106,7 @@ class JobServerOptions(PipelineOptions):
 class FlinkRunnerOptions(PipelineOptions):
 
   # These should stay in sync with gradle.properties.
-  PUBLISHED_FLINK_VERSIONS = ['1.17', '1.18', '1.19', '1.20', '2.0']
+  PUBLISHED_FLINK_VERSIONS = ['1.17', '1.18', '1.19', '1.20', '2.0', '2.1']
 
   @classmethod
   def _add_argparse_args(cls, parser):
diff --git a/sdks/typescript/src/apache_beam/runners/flink.ts b/sdks/typescript/src/apache_beam/runners/flink.ts
index 8f80b971da2..c8f8f57eb08 100644
--- a/sdks/typescript/src/apache_beam/runners/flink.ts
+++ b/sdks/typescript/src/apache_beam/runners/flink.ts
@@ -28,7 +28,7 @@ import { JavaJarService } from "../utils/service";
 const MAGIC_HOST_NAMES = ["[local]", "[auto]"];
 
 // These should stay in sync with gradle.properties.
-const PUBLISHED_FLINK_VERSIONS = ["1.17", "1.18", "1.19", "1.20", "2.0"];
+const PUBLISHED_FLINK_VERSIONS = ["1.17", "1.18", "1.19", "1.20", "2.0", "2.1"];
 
 const defaultOptions = {
   flinkMaster: "[local]",

Reply via email to