This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new d55cf588e58 Flink 2 support prerequisites (#37133)
d55cf588e58 is described below
commit d55cf588e586e0b04716c9dbca52874c54899a3c
Author: Yi Hu <[email protected]>
AuthorDate: Tue Jan 6 20:41:40 2026 -0500
Flink 2 support prerequisites (#37133)
* Honor getUseDataStreamForBatch pipeline option for Flink portable runner
* Refactor gradle scripts in preparation for Flink 2 support
* Create a PostCommit run validate runner tests on legacy DataSet
---
.github/workflows/README.md | 1 +
.../beam_PostCommit_Java_PVR_Flink_Batch.yml | 106 +++++++++++++++++
runners/flink/flink_runner.gradle | 125 ++++++++++++++-------
.../flink_job_server_container.gradle | 10 +-
runners/flink/job-server/flink_job_server.gradle | 31 +++--
.../beam/runners/flink/FlinkPipelineRunner.java | 4 +-
.../wrappers/streaming/DoFnOperator.java | 0
.../flink/streaming/MemoryStateBackendWrapper.java | 0
.../runners/flink/streaming/StreamSources.java | 0
9 files changed, 222 insertions(+), 55 deletions(-)
diff --git a/.github/workflows/README.md b/.github/workflows/README.md
index f01d2a1257b..283be9c2b1f 100644
--- a/.github/workflows/README.md
+++ b/.github/workflows/README.md
@@ -344,6 +344,7 @@ PostCommit Jobs run in a schedule against master branch and
generally do not get
| [ PostCommit Java Nexmark Direct
](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_Nexmark_Direct.yml)
| N/A |`beam_PostCommit_Java_Nexmark_Direct.json`|
[](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_Nexmark_Direct.yml?query=event%3Aschedule)
|
| [ PostCommit Java Nexmark Flink
](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_Nexmark_Flink.yml)
| N/A |`beam_PostCommit_Java_Nexmark_Flink.json`|
[](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_Nexmark_Flink.yml?query=event%3Aschedule)
|
| [ PostCommit Java Nexmark Spark
](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_Nexmark_Spark.yml)
| N/A |`beam_PostCommit_Java_Nexmark_Spark.json`|
[](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_Nexmark_Spark.yml?query=event%3Aschedule)
|
+| [ PostCommit Java PVR Flink Batch
](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_PVR_Flink_Batch.yml)
| N/A |`beam_PostCommit_Java_PVR_Flink_Batch.json`|
[](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_PVR_Flink_Batch.yml?query=event%3Aschedule)
|
| [ PostCommit Java PVR Flink Streaming
](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_PVR_Flink_Streaming.yml)
| N/A |`beam_PostCommit_Java_PVR_Flink_Streaming.json`|
[](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_PVR_Flink_Streaming.yml?query=event%3Asch
[...]
| [ PostCommit Java PVR Samza
](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_PVR_Samza.yml)
| N/A |`beam_PostCommit_Java_PVR_Samza.json`|
[](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_PVR_Samza.yml?query=event%3Aschedule)
|
| [ PostCommit Java SingleStoreIO IT
](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_SingleStoreIO_IT.yml)
| N/A |`beam_PostCommit_Java_SingleStoreIO_IT.json`|
[](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_SingleStoreIO_IT.yml?query=event%3Aschedule)
|
diff --git a/.github/workflows/beam_PostCommit_Java_PVR_Flink_Batch.yml
b/.github/workflows/beam_PostCommit_Java_PVR_Flink_Batch.yml
new file mode 100644
index 00000000000..0a808f2f861
--- /dev/null
+++ b/.github/workflows/beam_PostCommit_Java_PVR_Flink_Batch.yml
@@ -0,0 +1,106 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: PostCommit Java PVR Flink Batch
+
+on:
+ push:
+ tags: ['v*']
+ branches: ['master', 'release-*']
+ paths:
+ - 'runners/flink/**'
+ - 'runners/java-fn-execution/**'
+ - 'sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/**'
+ - '.github/workflows/beam_PostCommit_Java_PVR_Flink_Batch.yml'
+ pull_request_target:
+ branches: ['master', 'release-*']
+ paths:
+ - 'release/trigger_all_tests.json'
+ - '.github/trigger_files/beam_PostCommit_Java_PVR_Flink_Batch.json'
+ schedule:
+ - cron: '15 2/6 * * *'
+ workflow_dispatch:
+
+# This allows a subsequently queued workflow run to interrupt previous runs
+concurrency:
+ group: '${{ github.workflow }} @ ${{ github.event.issue.number ||
github.event.pull_request.head.label || github.sha || github.head_ref ||
github.ref }}-${{ github.event.schedule || github.event.comment.id ||
github.event.sender.login }}'
+ cancel-in-progress: true
+
+#Setting explicit permissions for the action to avoid the default permissions
which are `write-all` in case of pull_request_target event
+permissions:
+ actions: write
+ pull-requests: write
+ checks: write
+ contents: read
+ deployments: read
+ id-token: none
+ issues: write
+ discussions: read
+ packages: read
+ pages: read
+ repository-projects: read
+ security-events: read
+ statuses: read
+
+env:
+ DEVELOCITY_ACCESS_KEY: ${{ secrets.DEVELOCITY_ACCESS_KEY }}
+ GRADLE_ENTERPRISE_CACHE_USERNAME: ${{ secrets.GE_CACHE_USERNAME }}
+ GRADLE_ENTERPRISE_CACHE_PASSWORD: ${{ secrets.GE_CACHE_PASSWORD }}
+
+jobs:
+ beam_PostCommit_Java_PVR_Flink_Batch:
+ name: ${{ matrix.job_name }} (${{ matrix.job_phrase }})
+ strategy:
+ matrix:
+ job_name: ["beam_PostCommit_Java_PVR_Flink_Batch"]
+ job_phrase: ["Run Java_PVR_Flink_Batch PostCommit"]
+ timeout-minutes: 240
+ runs-on: [self-hosted, ubuntu-20.04, highmem]
+ if: |
+ github.event_name == 'push' ||
+ github.event_name == 'pull_request_target' ||
+ (github.event_name == 'schedule' && github.repository == 'apache/beam')
||
+ github.event_name == 'workflow_dispatch' ||
+ github.event.comment.body == 'Run Java_PVR_Flink_Batch PostCommit'
+ steps:
+ - uses: actions/checkout@v4
+ - name: Setup repository
+ uses: ./.github/actions/setup-action
+ with:
+ comment_phrase: ${{ matrix.job_phrase }}
+ github_token: ${{ secrets.GITHUB_TOKEN }}
+ github_job: ${{ matrix.job_name }} (${{ matrix.job_phrase }})
+ - name: Setup environment
+ uses: ./.github/actions/setup-environment-action
+ - name: run validatesPortableRunnerBatch script
+ uses: ./.github/actions/gradle-command-self-hosted-action
+ with:
+ gradle-command:
:runners:flink:1.20:job-server:validatesPortableRunnerBatchDataSet
+ env:
+ CLOUDSDK_CONFIG: ${{ env.KUBELET_GCLOUD_CONFIG_PATH }}
+ - name: Archive JUnit Test Results
+ uses: actions/upload-artifact@v4
+ if: ${{ !success() }}
+ with:
+ name: JUnit Test Results
+ path: "**/build/reports/tests/"
+ - name: Upload test report
+ uses: actions/upload-artifact@v4
+ with:
+ name: java-code-coverage-report
+ path: "**/build/test-results/**/*.xml"
+# TODO: Investigate 'Max retries exceeded' issue with
EnricoMi/publish-unit-test-result-action@v2.
diff --git a/runners/flink/flink_runner.gradle
b/runners/flink/flink_runner.gradle
index 52f9631f455..af90c22cfb0 100644
--- a/runners/flink/flink_runner.gradle
+++ b/runners/flink/flink_runner.gradle
@@ -28,7 +28,8 @@ import groovy.json.JsonOutput
def base_path = ".."
def overrides(versions, type, base_path) {
- versions.collect { "${base_path}/${it}/src/${type}/java" } +
["./src/${type}/java"]
+ // order is important
+ ["${base_path}/src/${type}/java"] + versions.collect {
"${base_path}/${it}/src/${type}/java" } + ["./src/${type}/java"]
}
def all_versions = flink_versions.split(",")
@@ -49,7 +50,8 @@ applyJavaNature(
automaticModuleName: 'org.apache.beam.runners.flink',
archivesBaseName: archivesBaseName,
// flink runner jars are in same package name. Publish javadoc once.
- exportJavadoc: project.ext.flink_version.startsWith(all_versions.first())
+ exportJavadoc: project.ext.flink_version.startsWith(all_versions.first()),
+ requireJavaVersion: project.ext.flink_major.startsWith('2') ?
JavaVersion.VERSION_11 : null
)
description = "Apache Beam :: Runners :: Flink $flink_version"
@@ -68,10 +70,16 @@ evaluationDependsOn(":examples:java")
*/
def sourceOverridesBase =
project.layout.buildDirectory.dir('source-overrides/src').get()
-def copySourceOverrides = tasks.register('copySourceOverrides', Copy) {
- it.from main_source_overrides
- it.into "${sourceOverridesBase}/main/java"
- it.duplicatesStrategy DuplicatesStrategy.INCLUDE
+def copySourceOverrides = tasks.register('copySourceOverrides', Copy) {
copyTask ->
+ copyTask.from main_source_overrides
+ copyTask.into "${sourceOverridesBase}/main/java"
+ copyTask.duplicatesStrategy DuplicatesStrategy.INCLUDE
+
+ if (project.ext.has('excluded_files') &&
project.ext.excluded_files.containsKey('main')) {
+ project.ext.excluded_files.main.each { file ->
+ copyTask.exclude "**/${file}"
+ }
+ }
}
def copyResourcesOverrides = tasks.register('copyResourcesOverrides', Copy) {
@@ -80,10 +88,16 @@ def copyResourcesOverrides =
tasks.register('copyResourcesOverrides', Copy) {
it.duplicatesStrategy DuplicatesStrategy.INCLUDE
}
-def copyTestSourceOverrides = tasks.register('copyTestSourceOverrides', Copy) {
- it.from test_source_overrides
- it.into "${sourceOverridesBase}/test/java"
- it.duplicatesStrategy DuplicatesStrategy.INCLUDE
+def copyTestSourceOverrides = tasks.register('copyTestSourceOverrides', Copy)
{ copyTask ->
+ copyTask.from test_source_overrides
+ copyTask.into "${sourceOverridesBase}/test/java"
+ copyTask.duplicatesStrategy DuplicatesStrategy.INCLUDE
+
+ if (project.ext.has('excluded_files') &&
project.ext.excluded_files.containsKey('test')) {
+ project.ext.excluded_files.test.each { file ->
+ copyTask.exclude "**/${file}"
+ }
+ }
}
def copyTestResourcesOverrides = tasks.register('copyTestResourcesOverrides',
Copy) {
@@ -92,45 +106,69 @@ def copyTestResourcesOverrides =
tasks.register('copyTestResourcesOverrides', Co
it.duplicatesStrategy DuplicatesStrategy.INCLUDE
}
-// add dependency to gradle Java plugin defined tasks
-compileJava.dependsOn copySourceOverrides
-processResources.dependsOn copyResourcesOverrides
-compileTestJava.dependsOn copyTestSourceOverrides
-processTestResources.dependsOn copyTestResourcesOverrides
-
-// add dependency BeamModulePlugin defined custom tasks
-// they are defined only when certain flags are provided (e.g. -Prelease;
-Ppublishing, etc)
-def sourcesJar = project.tasks.findByName('sourcesJar')
-if (sourcesJar != null) {
- sourcesJar.dependsOn copySourceOverrides
- sourcesJar.dependsOn copyResourcesOverrides
-}
-def testSourcesJar = project.tasks.findByName('testSourcesJar')
-if (testSourcesJar != null) {
- testSourcesJar.dependsOn copyTestSourceOverrides
- testSourcesJar.dependsOn copyTestResourcesOverrides
-}
+def use_override = (flink_major != all_versions.first())
+def sourceBase = "${project.projectDir}/../src"
-/*
+if (use_override) {
+ // Copy original+version specific sources to a tmp dir and use it as
sourceSet
+ // add dependency to gradle Java plugin defined tasks
+ compileJava.dependsOn copySourceOverrides
+ processResources.dependsOn copyResourcesOverrides
+ compileTestJava.dependsOn copyTestSourceOverrides
+ processTestResources.dependsOn copyTestResourcesOverrides
+
+ // add dependency BeamModulePlugin defined custom tasks
+ // they are defined only when certain flags are provided (e.g. -Prelease;
-Ppublishing, etc)
+ def sourcesJar = project.tasks.findByName('sourcesJar')
+ if (sourcesJar != null) {
+ sourcesJar.dependsOn copySourceOverrides
+ sourcesJar.dependsOn copyResourcesOverrides
+ }
+ def testSourcesJar = project.tasks.findByName('testSourcesJar')
+ if (testSourcesJar != null) {
+ testSourcesJar.dependsOn copyTestSourceOverrides
+ testSourcesJar.dependsOn copyTestResourcesOverrides
+ }
+ /*
* We have to explicitly set all directories here to make sure each
* version of Flink has the correct overrides set.
*/
-def sourceBase = "${project.projectDir}/../src"
-sourceSets {
- main {
- java {
- srcDirs = ["${sourceBase}/main/java", "${sourceOverridesBase}/main/java"]
+ sourceSets {
+ main {
+ java {
+ srcDirs = ["${sourceOverridesBase}/main/java"]
+ }
+ resources {
+ srcDirs = ["${sourceBase}/main/resources",
"${sourceOverridesBase}/main/resources"]
+ }
}
- resources {
- srcDirs = ["${sourceBase}/main/resources",
"${sourceOverridesBase}/main/resources"]
+ test {
+ java {
+ srcDirs = ["${sourceOverridesBase}/test/java"]
+ }
+ resources {
+ srcDirs = ["${sourceBase}/test/resources",
"${sourceOverridesBase}/test/resources"]
+ }
}
}
- test {
- java {
- srcDirs = ["${sourceBase}/test/java", "${sourceOverridesBase}/test/java"]
+} else {
+ // Use the original sources directly for the lowest supported Flink version.
+ sourceSets {
+ main {
+ java {
+ srcDirs = ["${sourceBase}/main/java"]
+ }
+ resources {
+ srcDirs = ["${sourceBase}/main/resources"]
+ }
}
- resources {
- srcDirs = ["${sourceBase}/test/resources",
"${sourceOverridesBase}/test/resources"]
+ test {
+ java {
+ srcDirs = ["${sourceBase}/test/java"]
+ }
+ resources {
+ srcDirs = ["${sourceBase}/test/resources"]
+ }
}
}
}
@@ -196,7 +234,10 @@ dependencies {
implementation "org.apache.flink:flink-core:$flink_version"
implementation "org.apache.flink:flink-metrics-core:$flink_version"
- implementation "org.apache.flink:flink-java:$flink_version"
+ if (project.ext.flink_major.startsWith('1')) {
+ // FLINK-36336: dataset API removed in Flink 2
+ implementation "org.apache.flink:flink-java:$flink_version"
+ }
implementation "org.apache.flink:flink-runtime:$flink_version"
implementation "org.apache.flink:flink-metrics-core:$flink_version"
diff --git
a/runners/flink/job-server-container/flink_job_server_container.gradle
b/runners/flink/job-server-container/flink_job_server_container.gradle
index 3f30a1aac1f..cf492b46929 100644
--- a/runners/flink/job-server-container/flink_job_server_container.gradle
+++ b/runners/flink/job-server-container/flink_job_server_container.gradle
@@ -53,15 +53,19 @@ task copyDockerfileDependencies(type: Copy) {
}
def pushContainers = project.rootProject.hasProperty(["isRelease"]) ||
project.rootProject.hasProperty("push-containers")
+def containerName = project.parent.name.startsWith("2") ? "flink_job_server" :
"flink${project.parent.name}_job_server"
+def containerTag = project.rootProject.hasProperty(["docker-tag"]) ?
project.rootProject["docker-tag"] : project.sdk_version
+if (project.parent.name.startsWith("2")) {
+ containerTag += "-flink${project.parent.name}"
+}
docker {
name containerImageName(
- name: project.docker_image_default_repo_prefix +
"flink${project.parent.name}_job_server",
+ name: project.docker_image_default_repo_prefix + containerName,
root: project.rootProject.hasProperty(["docker-repository-root"]) ?
project.rootProject["docker-repository-root"] :
project.docker_image_default_repo_root,
- tag: project.rootProject.hasProperty(["docker-tag"]) ?
- project.rootProject["docker-tag"] : project.sdk_version)
+ tag: containerTag)
// tags used by dockerTag task
tags containerImageTags()
files "./build/"
diff --git a/runners/flink/job-server/flink_job_server.gradle
b/runners/flink/job-server/flink_job_server.gradle
index d8a818ff84c..b85f8fc98aa 100644
--- a/runners/flink/job-server/flink_job_server.gradle
+++ b/runners/flink/job-server/flink_job_server.gradle
@@ -29,6 +29,11 @@ apply plugin: 'application'
// we need to set mainClassName before applying shadow plugin
mainClassName = "org.apache.beam.runners.flink.FlinkJobServerDriver"
+// Resolve the Flink project name (and version) the job-server is based on
+def flinkRunnerProject = "${project.path.replace(":job-server", "")}"
+evaluationDependsOn(flinkRunnerProject)
+boolean isFlink2 = project(flinkRunnerProject).ext.flink_major.startsWith('2')
+
applyJavaNature(
automaticModuleName: 'org.apache.beam.runners.flink.jobserver',
archivesBaseName: project.hasProperty('archives_base_name') ?
archives_base_name : archivesBaseName,
@@ -37,11 +42,9 @@ applyJavaNature(
shadowClosure: {
append "reference.conf"
},
+ requireJavaVersion: isFlink2 ? JavaVersion.VERSION_11 : null
)
-// Resolve the Flink project name (and version) the job-server is based on
-def flinkRunnerProject = "${project.path.replace(":job-server", "")}"
-
description = project(flinkRunnerProject).description + " :: Job Server"
/*
@@ -126,11 +129,12 @@ runShadow {
jvmArgs +=
["-Dorg.slf4j.simpleLogger.defaultLogLevel=${project.property('logLevel')}"]
}
-def portableValidatesRunnerTask(String name, boolean streaming, boolean
checkpointing, boolean docker) {
+def portableValidatesRunnerTask(String name, String mode, boolean
checkpointing, boolean docker) {
def pipelineOptions = [
// Limit resource consumption via parallelism
"--parallelism=2",
]
+ boolean streaming = (mode == "streaming")
if (streaming) {
pipelineOptions += "--streaming"
if (checkpointing) {
@@ -138,6 +142,9 @@ def portableValidatesRunnerTask(String name, boolean
streaming, boolean checkpoi
pipelineOptions += "--shutdownSourcesAfterIdleMs=60000"
}
}
+ if (mode == "batch") {
+ pipelineOptions += "--useDataStreamForBatch=true"
+ }
createPortableValidatesRunnerTask(
name: "validatesPortableRunner${name}",
jobServerDriver: "org.apache.beam.runners.flink.FlinkJobServerDriver",
@@ -186,7 +193,9 @@ def portableValidatesRunnerTask(String name, boolean
streaming, boolean checkpoi
excludeCategories
'org.apache.beam.sdk.testing.UsesTriggeredSideInputs'
return
}
-
+ if (mode == "batch") {
+ excludeCategories
'org.apache.beam.sdk.testing.UsesTriggeredSideInputs'
+ }
excludeCategories
'org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo'
excludeCategories
'org.apache.beam.sdk.testing.UsesUnboundedPCollections'
excludeCategories 'org.apache.beam.sdk.testing.UsesTestStream'
@@ -214,13 +223,17 @@ def portableValidatesRunnerTask(String name, boolean
streaming, boolean checkpoi
)
}
-project.ext.validatesPortableRunnerDocker =
portableValidatesRunnerTask("Docker", false, false, true)
-project.ext.validatesPortableRunnerBatch =
portableValidatesRunnerTask("Batch", false, false, false)
-project.ext.validatesPortableRunnerStreaming =
portableValidatesRunnerTask("Streaming", true, false, false)
-project.ext.validatesPortableRunnerStreamingCheckpoint =
portableValidatesRunnerTask("StreamingCheckpointing", true, true, false)
+project.ext.validatesPortableRunnerDocker =
portableValidatesRunnerTask("Docker", "batch", false, true)
+project.ext.validatesPortableRunnerBatchDataSet =
portableValidatesRunnerTask("BatchDataSet", "batch-dataset", false, false)
+project.ext.validatesPortableRunnerBatch =
portableValidatesRunnerTask("Batch", "batch", false, false)
+project.ext.validatesPortableRunnerStreaming =
portableValidatesRunnerTask("Streaming", "streaming", false, false)
+project.ext.validatesPortableRunnerStreamingCheckpoint =
portableValidatesRunnerTask("StreamingCheckpointing", "streaming", true, false)
tasks.register("validatesPortableRunner") {
dependsOn validatesPortableRunnerDocker
+ if (!isFlink2) {
+ dependsOn validatesPortableRunnerBatchDataSet
+ }
dependsOn validatesPortableRunnerBatch
dependsOn validatesPortableRunnerStreaming
dependsOn validatesPortableRunnerStreamingCheckpoint
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java
index c9559a39270..11175129d7e 100644
---
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java
@@ -84,7 +84,9 @@ public class FlinkPipelineRunner implements
PortablePipelineRunner {
SdkHarnessOptions.getConfiguredLoggerFromOptions(pipelineOptions.as(SdkHarnessOptions.class));
FlinkPortablePipelineTranslator<?> translator;
- if (!pipelineOptions.isStreaming() && !hasUnboundedPCollections(pipeline))
{
+ if (!pipelineOptions.getUseDataStreamForBatch()
+ && !pipelineOptions.isStreaming()
+ && !hasUnboundedPCollections(pipeline)) {
// TODO: Do we need to inspect for unbounded sources before fusing?
translator = FlinkBatchPortablePipelineTranslator.createTranslator();
} else {
diff --git
a/runners/flink/1.17/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
similarity index 100%
rename from
runners/flink/1.17/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
rename to
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
diff --git
a/runners/flink/1.17/src/test/java/org/apache/beam/runners/flink/streaming/MemoryStateBackendWrapper.java
b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/MemoryStateBackendWrapper.java
similarity index 100%
rename from
runners/flink/1.17/src/test/java/org/apache/beam/runners/flink/streaming/MemoryStateBackendWrapper.java
rename to
runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/MemoryStateBackendWrapper.java
diff --git
a/runners/flink/1.17/src/test/java/org/apache/beam/runners/flink/streaming/StreamSources.java
b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/StreamSources.java
similarity index 100%
rename from
runners/flink/1.17/src/test/java/org/apache/beam/runners/flink/streaming/StreamSources.java
rename to
runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/StreamSources.java