This is an automated email from the ASF dual-hosted git repository.
rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new efa22a493 [CELEBORN-1105][FLINK] Support Flink 1.18
efa22a493 is described below
commit efa22a4936335d98195ed99d8cabe1fd89d3a3a6
Author: sychen <[email protected]>
AuthorDate: Mon Nov 6 15:53:39 2023 +0800
[CELEBORN-1105][FLINK] Support Flink 1.18
### What changes were proposed in this pull request?
Add a Flink 1.18 client module (`client-flink/flink-1.18` plus a shaded variant) and adapt the common `BufferPacker` to Flink 1.18's `Buffer#setRecycler` API via reflection.
### Why are the changes needed?
Running a batch job on Flink 1.18.0 with the current plugin fails with a `NoSuchMethodError`:
```bash
flink-1.18.0
./bin/start-cluster.sh
./bin/flink run examples/streaming/WordCount.jar --execution-mode BATCH
```
```java
Caused by: java.lang.NoSuchMethodError: org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.<init>(Ljava/lang/String;ILorg/apache/flink/runtime/jobgraph/IntermediateDataSetID;Lorg/apache/flink/runtime/io/network/partition/ResultPartitionType;Lorg/apache/flink/runtime/executiongraph/IndexRange;ILorg/apache/flink/runtime/io/network/partition/PartitionProducerStateProvider;Lorg/apache/flink/util/function/SupplierWithException;Lorg/apache/flink/runtime/io/network/buffer
[...]
	at org.apache.celeborn.plugin.flink.RemoteShuffleInputGate$FakedRemoteInputChannel.<init>(RemoteShuffleInputGate.java:225)
	at org.apache.celeborn.plugin.flink.RemoteShuffleInputGate.getChannel(RemoteShuffleInputGate.java:179)
	at org.apache.flink.runtime.io.network.partition.consumer.InputGate.setChannelStateWriter(InputGate.java:90)
	at org.apache.flink.runtime.taskmanager.InputGateWithMetrics.setChannelStateWriter(InputGateWithMetrics.java:120)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.injectChannelStateWriterIntoChannels(StreamTask.java:524)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:496)
```
Flink 1.18.0 release announcement: https://flink.apache.org/2023/10/24/announcing-the-release-of-apache-flink-1.18/
The `org.apache.flink.runtime.io.network.buffer.Buffer` interface adds a `setRecycler` method.
[[FLINK-32549](https://issues.apache.org/jira/browse/FLINK-32549)][network] Tiered storage memory manager supports ownership transfer for buffers
The `org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate` constructor takes additional parameters.
[[FLINK-31638](https://issues.apache.org/jira/browse/FLINK-31638)][network] Introduce the TieredStorageConsumerClient to SingleInputGate
[[FLINK-31642](https://issues.apache.org/jira/browse/FLINK-31642)][network] Introduce the MemoryTierConsumerAgent to TieredStorageConsumerClient
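The version skew above is what this PR handles with Celeborn's `DynMethods` reflection helper. The same idea, as a minimal sketch in plain `java.lang.reflect` — `NewBuffer` and `OldBuffer` are hypothetical stand-ins for the post- and pre-1.18 `Buffer` shapes, not Celeborn or Flink classes:

```java
import java.lang.reflect.Method;

public class SetRecyclerCompat {

    public interface Recycler { void recycle(); }

    // Models Flink 1.18's Buffer, which declares setRecycler (illustrative only).
    public static class NewBuffer {
        Recycler recycler;
        public void setRecycler(Recycler r) { this.recycler = r; }
    }

    // Models a pre-1.18 Buffer, which has no setRecycler (illustrative only).
    public static class OldBuffer { }

    /**
     * Calls setRecycler reflectively if (and only if) the buffer's class
     * declares it; returns whether the call was actually made.
     */
    public static boolean trySetRecycler(Object buffer, Recycler r) {
        try {
            Method m = buffer.getClass().getMethod("setRecycler", Recycler.class);
            m.invoke(buffer, r);
            return true;
        } catch (NoSuchMethodException e) {
            return false; // older Flink: method absent, nothing to do
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Recycler noop = () -> { };
        System.out.println(trySetRecycler(new NewBuffer(), noop)); // prints true
        System.out.println(trySetRecycler(new OldBuffer(), noop)); // prints false
    }
}
```

Resolving the method by name at runtime keeps one compiled artifact binary-compatible with both Flink API generations, which is why the `BufferPacker` change below avoids a direct `setRecycler` call.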
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
```bash
flink-1.18.0
./bin/flink run examples/streaming/WordCount.jar --execution-mode BATCH
Executing example with default input data.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
Job has been submitted with JobID d7fc5f0ca018a54e9453c4d35f7c598a
Program execution finished
Job with JobID d7fc5f0ca018a54e9453c4d35f7c598a has finished.
Job Runtime: 1635 ms
```
<img width="1297" alt="image" src="https://github.com/apache/incubator-celeborn/assets/3898450/6a5266bf-2386-4386-b98b-a60d2570fa99">
Closes #2063 from cxzl25/CELEBORN-1105.
Authored-by: sychen <[email protected]>
Signed-off-by: Shuang <[email protected]>
---
.github/workflows/deps.yml | 2 +
.github/workflows/maven.yml | 1 +
.github/workflows/sbt.yml | 1 +
.github/workflows/style.yml | 1 +
README.md | 5 +-
build/make-distribution.sh | 2 +
build/release/release.sh | 8 +
.../celeborn/plugin/flink/buffer/BufferPacker.java | 11 +
client-flink/flink-1.18-shaded/pom.xml | 136 +++++
.../src/main/resources/META-INF/LICENSE | 247 ++++++++
.../src/main/resources/META-INF/NOTICE | 13 +
.../META-INF/licenses/LICENSE-protobuf.txt | 42 ++
client-flink/flink-1.18/pom.xml | 69 +++
.../plugin/flink/RemoteShuffleEnvironment.java | 82 +++
.../plugin/flink/RemoteShuffleInputGate.java | 301 ++++++++++
.../flink/RemoteShuffleInputGateFactory.java | 55 ++
.../plugin/flink/RemoteShuffleResultPartition.java | 225 ++++++++
.../flink/RemoteShuffleResultPartitionFactory.java | 79 +++
.../plugin/flink/RemoteShuffleServiceFactory.java | 60 ++
.../plugin/flink/SimpleResultPartitionAdapter.java | 27 +
.../plugin/flink/RemoteShuffleMasterTest.java | 290 ++++++++++
.../flink/RemoteShuffleResultPartitionSuiteJ.java | 621 +++++++++++++++++++++
.../flink/RemoteShuffleServiceFactorySuitJ.java | 58 ++
dev/dependencies.sh | 4 +
dev/deps/dependencies-client-flink-1.18 | 78 +++
dev/reformat | 1 +
docs/README.md | 2 +-
docs/developers/sbt.md | 1 +
pom.xml | 20 +
project/CelebornBuild.scala | 11 +
30 files changed, 2450 insertions(+), 3 deletions(-)
diff --git a/.github/workflows/deps.yml b/.github/workflows/deps.yml
index 32d551ce1..a1318d009 100644
--- a/.github/workflows/deps.yml
+++ b/.github/workflows/deps.yml
@@ -50,6 +50,7 @@ jobs:
- 'flink-1.14'
- 'flink-1.15'
- 'flink-1.17'
+ - 'flink-1.18'
- 'mr'
steps:
- uses: actions/checkout@v2
@@ -80,6 +81,7 @@ jobs:
- 'flink-1.14'
- 'flink-1.15'
- 'flink-1.17'
+ - 'flink-1.18'
- 'mr'
steps:
- uses: actions/checkout@v2
diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml
index 1080b615c..43e602d63 100644
--- a/.github/workflows/maven.yml
+++ b/.github/workflows/maven.yml
@@ -154,6 +154,7 @@ jobs:
- '1.14'
- '1.15'
- '1.17'
+ - '1.18'
steps:
- uses: actions/checkout@v2
- name: Setup JDK ${{ matrix.java }}
diff --git a/.github/workflows/sbt.yml b/.github/workflows/sbt.yml
index e778f70be..740e6bdf7 100644
--- a/.github/workflows/sbt.yml
+++ b/.github/workflows/sbt.yml
@@ -203,6 +203,7 @@ jobs:
- '1.14'
- '1.15'
- '1.17'
+ - '1.18'
steps:
- uses: actions/checkout@v2
- name: Setup JDK ${{ matrix.java }}
diff --git a/.github/workflows/style.yml b/.github/workflows/style.yml
index 377cf7822..55c885952 100644
--- a/.github/workflows/style.yml
+++ b/.github/workflows/style.yml
@@ -50,6 +50,7 @@ jobs:
build/mvn spotless:check -Pgoogle-mirror,flink-1.14
build/mvn spotless:check -Pgoogle-mirror,flink-1.15
build/mvn spotless:check -Pgoogle-mirror,flink-1.17
+ build/mvn spotless:check -Pgoogle-mirror,flink-1.18
build/mvn spotless:check -Pgoogle-mirror,spark-2.4
build/mvn spotless:check -Pgoogle-mirror,spark-3.3
build/mvn spotless:check -Pgoogle-mirror,mr
diff --git a/README.md b/README.md
index 2472ce4c1..1820734be 100644
--- a/README.md
+++ b/README.md
@@ -41,12 +41,12 @@ Celeborn Worker's slot count is decided by `total usable disk size / average shu
 Celeborn worker's slot count decreases when a partition is allocated and increments when a partition is freed.
## Build
-1. Celeborn supports Spark 2.4/3.0/3.1/3.2/3.3/3.4/3.5, Flink 1.14/1.15/1.17 and Hadoop MapReduce 2/3.
+1. Celeborn supports Spark 2.4/3.0/3.1/3.2/3.3/3.4/3.5, Flink 1.14/1.15/1.17/1.18 and Hadoop MapReduce 2/3.
2. Celeborn tested under Scala 2.11/2.12/2.13 and Java 8/11/17 environment.
Build Celeborn
```shell
-./build/make-distribution.sh -Pspark-2.4/-Pspark-3.0/-Pspark-3.1/-Pspark-3.2/-Pspark-3.3/-Pspark-3.4/-Pflink-1.14/-Pflink-1.15/-Pflink-1.17/-Pmr
+./build/make-distribution.sh -Pspark-2.4/-Pspark-3.0/-Pspark-3.1/-Pspark-3.2/-Pspark-3.3/-Pspark-3.4/-Pflink-1.14/-Pflink-1.15/-Pflink-1.17/-Pflink-1.18/-Pmr
```
package apache-celeborn-${project.version}-bin.tgz will be generated.
@@ -65,6 +65,7 @@ package apache-celeborn-${project.version}-bin.tgz will be generated.
 | Flink 1.14 | ❌ | ✔ | ✔ | ❌ | ❌ | ❌ | ❌ |
 | Flink 1.15 | ❌ | ✔ | ✔ | ❌ | ❌ | ❌ | ❌ |
 | Flink 1.17 | ❌ | ✔ | ✔ | ❌ | ❌ | ❌ | ❌ |
+| Flink 1.18 | ❌ | ✔ | ✔ | ❌ | ❌ | ❌ | ❌ |
### Package Details
Build procedure will create a compressed package.
diff --git a/build/make-distribution.sh b/build/make-distribution.sh
index 6dd19e249..7aa4fc9c2 100755
--- a/build/make-distribution.sh
+++ b/build/make-distribution.sh
@@ -309,6 +309,7 @@ if [ "$SBT_ENABLED" == "true" ]; then
sbt_build_client -Pflink-1.14
sbt_build_client -Pflink-1.15
sbt_build_client -Pflink-1.17
+ sbt_build_client -Pflink-1.18
sbt_build_client -Pmr
else
echo "build client with $@"
@@ -339,6 +340,7 @@ else
build_flink_client -Pflink-1.14
build_flink_client -Pflink-1.15
build_flink_client -Pflink-1.17
+ build_flink_client -Pflink-1.18
build_mr_client -Pmr
else
## build release package on demand
diff --git a/build/release/release.sh b/build/release/release.sh
index ce89a981b..a87d2d31c 100755
--- a/build/release/release.sh
+++ b/build/release/release.sh
@@ -136,6 +136,14 @@ upload_nexus_staging() {
${PROJECT_DIR}/build/mvn deploy -DskipTests -Papache-release,flink-1.17 \
-s "${PROJECT_DIR}/build/release/asf-settings.xml" \
-pl :celeborn-client-flink-1.17-shaded_2.12
+
+ echo "Deploying celeborn-client-flink-1.18-shaded_2.12"
+  ${PROJECT_DIR}/build/mvn clean install -DskipTests -Papache-release,flink-1.18 \
+ -s "${PROJECT_DIR}/build/release/asf-settings.xml" \
+ -pl :celeborn-client-flink-1.18-shaded_2.12 -am
+ ${PROJECT_DIR}/build/mvn deploy -DskipTests -Papache-release,flink-1.18 \
+ -s "${PROJECT_DIR}/build/release/asf-settings.xml" \
+ -pl :celeborn-client-flink-1.18-shaded_2.12
}
finalize_svn() {
diff --git a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/BufferPacker.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/BufferPacker.java
index 53c887d42..c0176e111 100644
--- a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/BufferPacker.java
+++ b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/BufferPacker.java
@@ -31,6 +31,7 @@ import org.slf4j.LoggerFactory;
import org.apache.celeborn.plugin.flink.utils.BufferUtils;
import org.apache.celeborn.plugin.flink.utils.Utils;
+import org.apache.celeborn.reflect.DynMethods;
/** Harness used to pack multiple partial buffers together as a full one. */
public class BufferPacker {
@@ -216,6 +217,16 @@ public class BufferPacker {
return buffer.getRecycler();
}
+ // Flink 1.18.0
+  // [FLINK-32549][network] Tiered storage memory manager supports ownership transfer for buffers
+ public void setRecycler(BufferRecycler bufferRecycler) {
+ DynMethods.builder("setRecycler")
+ .impl(buffer.getClass().getName(), BufferRecycler.class)
+ .build()
+ .bind(buffer)
+ .invoke(bufferRecycler);
+ }
+
@Override
public void recycleBuffer() {
buffer.recycleBuffer();
diff --git a/client-flink/flink-1.18-shaded/pom.xml b/client-flink/flink-1.18-shaded/pom.xml
new file mode 100644
index 000000000..b03e83029
--- /dev/null
+++ b/client-flink/flink-1.18-shaded/pom.xml
@@ -0,0 +1,136 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+~ Licensed to the Apache Software Foundation (ASF) under one or more
+~ contributor license agreements. See the NOTICE file distributed with
+~ this work for additional information regarding copyright ownership.
+~ The ASF licenses this file to You under the Apache License, Version 2.0
+~ (the "License"); you may not use this file except in compliance with
+~ the License. You may obtain a copy of the License at
+~
+~ http://www.apache.org/licenses/LICENSE-2.0
+~
+~ Unless required by applicable law or agreed to in writing, software
+~ distributed under the License is distributed on an "AS IS" BASIS,
+~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+~ See the License for the specific language governing permissions and
+~ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.celeborn</groupId>
+ <artifactId>celeborn-parent_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+  <artifactId>celeborn-client-flink-1.18-shaded_${scala.binary.version}</artifactId>
+ <packaging>jar</packaging>
+ <name>Celeborn Shaded Client for Flink 1.18</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.celeborn</groupId>
+      <artifactId>celeborn-client-flink-1.18_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <configuration>
+ <relocations>
+ <relocation>
+ <pattern>com.google.protobuf</pattern>
+              <shadedPattern>${shading.prefix}.com.google.protobuf</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>com.google.common</pattern>
+              <shadedPattern>${shading.prefix}.com.google.common</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>io.netty</pattern>
+ <shadedPattern>${shading.prefix}.io.netty</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.apache.commons</pattern>
+              <shadedPattern>${shading.prefix}.org.apache.commons</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.roaringbitmap</pattern>
+              <shadedPattern>${shading.prefix}.org.roaringbitmap</shadedPattern>
+ </relocation>
+ </relocations>
+ <artifactSet>
+ <includes>
+ <include>org.apache.celeborn:*</include>
+ <include>com.google.protobuf:protobuf-java</include>
+ <include>com.google.guava:guava</include>
+ <include>io.netty:*</include>
+ <include>org.apache.commons:commons-lang3</include>
+ <include>org.roaringbitmap:RoaringBitmap</include>
+ </includes>
+ </artifactSet>
+ <filters>
+ <filter>
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>**/*.proto</exclude>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ <exclude>**/log4j.properties</exclude>
+ <exclude>META-INF/LICENSE.txt</exclude>
+ <exclude>META-INF/NOTICE.txt</exclude>
+ <exclude>LICENSE.txt</exclude>
+ <exclude>NOTICE.txt</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ <transformers>
+            <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"></transformer>
+ </transformers>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-antrun-plugin</artifactId>
+ <version>${maven.plugin.antrun.version}</version>
+ <executions>
+ <execution>
+ <id>rename-native-library</id>
+ <goals>
+ <goal>run</goal>
+ </goals>
+ <phase>package</phase>
+ <configuration>
+ <target>
+ <echo message="unpacking netty jar"></echo>
+              <unzip dest="${project.build.directory}/unpacked/" src="${project.build.directory}/${artifactId}-${version}.jar"></unzip>
+              <echo message="renaming native epoll library"></echo>
+              <move includeemptydirs="false" todir="${project.build.directory}/unpacked/META-INF/native">
+                <fileset dir="${project.build.directory}/unpacked/META-INF/native"></fileset>
+                <mapper from="libnetty_transport_native_epoll_x86_64.so" to="liborg_apache_celeborn_shaded_netty_transport_native_epoll_x86_64.so" type="glob"></mapper>
+              </move>
+              <move includeemptydirs="false" todir="${project.build.directory}/unpacked/META-INF/native">
+                <fileset dir="${project.build.directory}/unpacked/META-INF/native"></fileset>
+                <mapper from="libnetty_transport_native_epoll_aarch_64.so" to="liborg_apache_celeborn_shaded_netty_transport_native_epoll_aarch_64.so" type="glob"></mapper>
+              </move>
+              <echo message="deleting native kqueue library"></echo>
+              <delete file="${project.build.directory}/unpacked/META-INF/native/libnetty_transport_native_kqueue_x86_64.jnilib"></delete>
+              <delete file="${project.build.directory}/unpacked/META-INF/native/libnetty_transport_native_kqueue_aarch_64.jnilib"></delete>
+              <delete file="${project.build.directory}/unpacked/META-INF/native/libnetty_resolver_dns_native_macos_aarch_64.jnilib"></delete>
+              <delete file="${project.build.directory}/unpacked/META-INF/native/libnetty_resolver_dns_native_macos_x86_64.jnilib"></delete>
+              <echo message="repackaging netty jar"></echo>
+              <jar basedir="${project.build.directory}/unpacked" destfile="${project.build.directory}/${artifactId}-${version}.jar"></jar>
+ </target>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/client-flink/flink-1.18-shaded/src/main/resources/META-INF/LICENSE b/client-flink/flink-1.18-shaded/src/main/resources/META-INF/LICENSE
new file mode 100644
index 000000000..0441af2ac
--- /dev/null
+++ b/client-flink/flink-1.18-shaded/src/main/resources/META-INF/LICENSE
@@ -0,0 +1,247 @@
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright [yyyy] [name of copyright owner]
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+
+------------------------------------------------------------------------------------
+This project bundles the following dependencies under the Apache License 2.0 (http://www.apache.org/licenses/LICENSE-2.0.txt):
+
+
+Apache License 2.0
+--------------------------------------
+
+org.apache.commons:commons-lang3
+io.netty:netty-all
+io.netty:netty-buffer
+io.netty:netty-codec
+io.netty:netty-codec-dns
+io.netty:netty-codec-haproxy
+io.netty:netty-codec-http
+io.netty:netty-codec-http2
+io.netty:netty-codec-memcache
+io.netty:netty-codec-mqtt
+io.netty:netty-codec-redis
+io.netty:netty-codec-smtp
+io.netty:netty-codec-socks
+io.netty:netty-codec-stomp
+io.netty:netty-codec-xml
+io.netty:netty-common
+io.netty:netty-handler
+io.netty:netty-transport-native-unix-common
+io.netty:netty-handler-proxy
+io.netty:netty-resolver
+io.netty:netty-resolver-dns
+io.netty:netty-transport
+io.netty:netty-transport-rxtx
+io.netty:netty-transport-sctp
+io.netty:netty-transport-udt
+io.netty:netty-transport-classes-epoll
+io.netty:netty-transport-classes-kqueue
+io.netty:netty-transport-native-epoll
+io.netty:netty-transport-native-kqueue
+com.google.guava:guava
+org.roaringbitmap:RoaringBitmap
+
+
+BSD 3-clause
+------------
+See license/LICENSE-protobuf.txt for details.
+com.google.protobuf:protobuf-java
diff --git a/client-flink/flink-1.18-shaded/src/main/resources/META-INF/NOTICE b/client-flink/flink-1.18-shaded/src/main/resources/META-INF/NOTICE
new file mode 100644
index 000000000..a4001a2ce
--- /dev/null
+++ b/client-flink/flink-1.18-shaded/src/main/resources/META-INF/NOTICE
@@ -0,0 +1,13 @@
+
+Apache Celeborn (Incubating)
+Copyright 2022-2023 The Apache Software Foundation.
+
+This product includes software developed at
+The Apache Software Foundation (https://www.apache.org/).
+
+=============================================================================
+= NOTICE file corresponding to section 4d of the Apache License Version 2.0 =
+=============================================================================
+
+Apache Commons Lang
+Copyright 2001-2021 The Apache Software Foundation
diff --git a/client-flink/flink-1.18-shaded/src/main/resources/META-INF/licenses/LICENSE-protobuf.txt b/client-flink/flink-1.18-shaded/src/main/resources/META-INF/licenses/LICENSE-protobuf.txt
new file mode 100644
index 000000000..b4350ec83
--- /dev/null
+++ b/client-flink/flink-1.18-shaded/src/main/resources/META-INF/licenses/LICENSE-protobuf.txt
@@ -0,0 +1,42 @@
+This license applies to all parts of Protocol Buffers except the following:
+
+ - Atomicops support for generic gcc, located in
+ src/google/protobuf/stubs/atomicops_internals_generic_gcc.h.
+ This file is copyrighted by Red Hat Inc.
+
+ - Atomicops support for AIX/POWER, located in
+ src/google/protobuf/stubs/atomicops_internals_aix.h.
+ This file is copyrighted by Bloomberg Finance LP.
+
+Copyright 2014, Google Inc. All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are
+met:
+
+ * Redistributions of source code must retain the above copyright
+notice, this list of conditions and the following disclaimer.
+ * Redistributions in binary form must reproduce the above
+copyright notice, this list of conditions and the following disclaimer
+in the documentation and/or other materials provided with the
+distribution.
+ * Neither the name of Google Inc. nor the names of its
+contributors may be used to endorse or promote products derived from
+this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+Code generated by the Protocol Buffer compiler is owned by the owner
+of the input file used when generating it. This code is not
+standalone and requires a support library to be linked with it. This
+support library is itself covered by the above license.
\ No newline at end of file
diff --git a/client-flink/flink-1.18/pom.xml b/client-flink/flink-1.18/pom.xml
new file mode 100644
index 000000000..1d51f8dc8
--- /dev/null
+++ b/client-flink/flink-1.18/pom.xml
@@ -0,0 +1,69 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+~ Licensed to the Apache Software Foundation (ASF) under one or more
+~ contributor license agreements. See the NOTICE file distributed with
+~ this work for additional information regarding copyright ownership.
+~ The ASF licenses this file to You under the Apache License, Version 2.0
+~ (the "License"); you may not use this file except in compliance with
+~ the License. You may obtain a copy of the License at
+~
+~ http://www.apache.org/licenses/LICENSE-2.0
+~
+~ Unless required by applicable law or agreed to in writing, software
+~ distributed under the License is distributed on an "AS IS" BASIS,
+~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+~ See the License for the specific language governing permissions and
+~ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.celeborn</groupId>
+ <artifactId>celeborn-parent_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>celeborn-client-flink-1.18_${scala.binary.version}</artifactId>
+ <packaging>jar</packaging>
+ <name>Celeborn Client for Flink 1.18</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.celeborn</groupId>
+ <artifactId>celeborn-common_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.celeborn</groupId>
+ <artifactId>celeborn-client_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.celeborn</groupId>
+      <artifactId>celeborn-client-flink-common_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-runtime</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <!-- Test dependencies -->
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-slf4j-impl</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-1.2-api</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleEnvironment.java b/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleEnvironment.java
new file mode 100644
index 000000000..01740f1f3
--- /dev/null
+++ b/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleEnvironment.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.plugin.flink;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
+import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
+import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
+import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
+
+import org.apache.celeborn.common.CelebornConf;
+
+/**
+ * The implementation of {@link ShuffleEnvironment} based on the remote shuffle service, providing
+ * the shuffle environment on the Flink TM side.
+ */
+public class RemoteShuffleEnvironment extends AbstractRemoteShuffleEnvironment
+ implements ShuffleEnvironment<ResultPartitionWriter, IndexedInputGate> {
+
+ /** Factory class to create {@link RemoteShuffleResultPartition}. */
+ private final RemoteShuffleResultPartitionFactory resultPartitionFactory;
+
+ private final RemoteShuffleInputGateFactory inputGateFactory;
+
+  /**
+   * @param networkBufferPool Network buffer pool for shuffle read and shuffle write.
+   * @param resultPartitionManager A trivial {@link ResultPartitionManager}.
+   * @param resultPartitionFactory Factory class to create {@link RemoteShuffleResultPartition}.
+   * @param inputGateFactory Factory class to create {@link RemoteShuffleInputGate}.
+   */
+ public RemoteShuffleEnvironment(
+ NetworkBufferPool networkBufferPool,
+ ResultPartitionManager resultPartitionManager,
+ RemoteShuffleResultPartitionFactory resultPartitionFactory,
+ RemoteShuffleInputGateFactory inputGateFactory,
+ CelebornConf conf) {
+
+ super(networkBufferPool, resultPartitionManager, conf);
+ this.resultPartitionFactory = resultPartitionFactory;
+ this.inputGateFactory = inputGateFactory;
+ }
+
+ @Override
+ public ResultPartitionWriter createResultPartitionWriterInternal(
+ ShuffleIOOwnerContext ownerContext,
+ int index,
+ ResultPartitionDeploymentDescriptor resultPartitionDeploymentDescriptor,
+ CelebornConf conf) {
+    return resultPartitionFactory.create(
+        ownerContext.getOwnerName(), index, resultPartitionDeploymentDescriptor, conf);
+ }
+
+ @Override
+ IndexedInputGate createInputGateInternal(
+      ShuffleIOOwnerContext ownerContext, int gateIndex, InputGateDeploymentDescriptor igdd) {
+    return inputGateFactory.create(ownerContext.getOwnerName(), gateIndex, igdd);
+ }
+
+ @VisibleForTesting
+ RemoteShuffleResultPartitionFactory getResultPartitionFactory() {
+ return resultPartitionFactory;
+ }
+}
diff --git a/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java b/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
new file mode 100644
index 000000000..3e73ae91e
--- /dev/null
+++ b/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.plugin.flink;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentProvider;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
+import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
+import org.apache.flink.runtime.checkpoint.channel.ResultSubpartitionInfo;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.event.TaskEvent;
+import org.apache.flink.runtime.executiongraph.IndexRange;
+import org.apache.flink.runtime.io.network.ConnectionID;
+import org.apache.flink.runtime.io.network.LocalConnectionManager;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferDecompressor;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
+import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.throughput.ThroughputCalculator;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.clock.SystemClock;
+import org.apache.flink.util.function.SupplierWithException;
+
+import org.apache.celeborn.common.CelebornConf;
+
+/** An {@link IndexedInputGate} which ingests data from remote shuffle workers. */
+public class RemoteShuffleInputGate extends IndexedInputGate {
+
+ private final RemoteShuffleInputGateDelegation inputGateDelegation;
+
+ public RemoteShuffleInputGate(
+ CelebornConf celebornConf,
+ String taskName,
+ int gateIndex,
+ InputGateDeploymentDescriptor gateDescriptor,
+ SupplierWithException<BufferPool, IOException> bufferPoolFactory,
+ BufferDecompressor bufferDecompressor,
+ int numConcurrentReading) {
+
+ inputGateDelegation =
+ new RemoteShuffleInputGateDelegation(
+ celebornConf,
+ taskName,
+ gateIndex,
+ gateDescriptor,
+ bufferPoolFactory,
+ bufferDecompressor,
+ numConcurrentReading,
+ availabilityHelper,
+ gateDescriptor.getConsumedSubpartitionIndexRange().getStartIndex(),
+ gateDescriptor.getConsumedSubpartitionIndexRange().getEndIndex());
+ }
+
+ /** Setup gate and build network connections. */
+ @Override
+ public void setup() throws IOException {
+ inputGateDelegation.setup();
+ }
+
+ /** Index of the gate of the corresponding computing task. */
+ @Override
+ public int getGateIndex() {
+ return inputGateDelegation.getGateIndex();
+ }
+
+  /** Get number of input channels. A channel is a data flow from one shuffle worker. */
+ @Override
+ public int getNumberOfInputChannels() {
+ return inputGateDelegation.getBufferReaders().size();
+ }
+
+  /** Whether reading is finished -- all channels are finished and cached buffers are drained. */
+ @Override
+ public boolean isFinished() {
+ return inputGateDelegation.isFinished();
+ }
+
+ @Override
+ public Optional<BufferOrEvent> getNext() {
+    throw new UnsupportedOperationException("Not implemented (DataSet API is not supported).");
+ }
+
+ /** Poll a received {@link BufferOrEvent}. */
+ @Override
+ public Optional<BufferOrEvent> pollNext() throws IOException {
+ return inputGateDelegation.pollNext();
+ }
+
+ /** Close all reading channels inside this {@link RemoteShuffleInputGate}. */
+ @Override
+ public void close() throws Exception {
+ inputGateDelegation.close();
+ }
+
+ /** Get {@link InputChannelInfo}s of this {@link RemoteShuffleInputGate}. */
+ @Override
+ public List<InputChannelInfo> getChannelInfos() {
+ return inputGateDelegation.getChannelsInfo();
+ }
+
+ @Override
+ public void requestPartitions() {
+ // do-nothing
+ }
+
+ @Override
+ public void checkpointStarted(CheckpointBarrier barrier) {
+ // do-nothing.
+ }
+
+ @Override
+ public void checkpointStopped(long cancelledCheckpointId) {
+ // do-nothing.
+ }
+
+ @Override
+ public void triggerDebloating() {
+ // do-nothing.
+ }
+
+ @Override
+ public List<InputChannelInfo> getUnfinishedChannels() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public EndOfDataStatus hasReceivedEndOfData() {
+ if (inputGateDelegation.getPendingEndOfDataEvents() > 0) {
+ return EndOfDataStatus.NOT_END_OF_DATA;
+ } else {
+ // Keep compatibility with streaming mode.
+ return EndOfDataStatus.DRAINED;
+ }
+ }
+
+ @Override
+ public void finishReadRecoveredState() {
+ // do-nothing.
+ }
+
+ @Override
+ public InputChannel getChannel(int channelIndex) {
+ return new FakedRemoteInputChannel(channelIndex);
+ }
+
+ @Override
+ public void sendTaskEvent(TaskEvent event) {
+ throw new FlinkRuntimeException("Method should not be called.");
+ }
+
+ @Override
+ public void resumeConsumption(InputChannelInfo channelInfo) {
+ throw new FlinkRuntimeException("Method should not be called.");
+ }
+
+ @Override
+  public void acknowledgeAllRecordsProcessed(InputChannelInfo inputChannelInfo) {}
+
+ @Override
+ public CompletableFuture<Void> getStateConsumedFuture() {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override
+ public String toString() {
+ return String.format(
+ "ReadGate [owning task: %s, gate index: %d, descriptor: %s]",
+ inputGateDelegation.getTaskName(),
+ inputGateDelegation.getGateIndex(),
+ inputGateDelegation.getGateDescriptor().toString());
+ }
+
+  /** Accommodation for the incompleteness of Flink pluggable shuffle service. */
+ private class FakedRemoteInputChannel extends RemoteInputChannel {
+ FakedRemoteInputChannel(int channelIndex) {
+ // Flink 1.18.0
+      // [FLINK-31638][network] Introduce the TieredStorageConsumerClient to SingleInputGate
+      // [FLINK-31642][network] Introduce the MemoryTierConsumerAgent to TieredStorageConsumerClient
+ super(
+ new SingleInputGate(
+ "",
+ inputGateDelegation.getGateIndex(),
+ new IntermediateDataSetID(),
+ ResultPartitionType.BLOCKING,
+ new IndexRange(0, 0),
+ 1,
+ (a, b, c) -> {},
+ () -> null,
+ null,
+ new FakedMemorySegmentProvider(),
+ 0,
+ new ThroughputCalculator(SystemClock.getInstance()),
+ null,
+ null,
+ null,
+ null),
+ channelIndex,
+ new ResultPartitionID(),
+ 0,
+ new ConnectionID(
+              new TaskManagerLocation(ResourceID.generate(), InetAddress.getLoopbackAddress(), 1),
+ 0),
+ new LocalConnectionManager(),
+ 0,
+ 0,
+ 0,
+ new SimpleCounter(),
+ new SimpleCounter(),
+ new FakedChannelStateWriter());
+ }
+ }
+
+  /** Accommodation for the incompleteness of Flink pluggable shuffle service. */
+  private static class FakedMemorySegmentProvider implements MemorySegmentProvider {
+
+ @Override
+    public Collection<MemorySegment> requestUnpooledMemorySegments(int i) throws IOException {
+ return null;
+ }
+
+ @Override
+    public void recycleUnpooledMemorySegments(Collection<MemorySegment> collection)
+        throws IOException {}
+ }
+
+  /** Accommodation for the incompleteness of Flink pluggable shuffle service. */
+ private static class FakedChannelStateWriter implements ChannelStateWriter {
+
+ @Override
+ public void start(long cpId, CheckpointOptions checkpointOptions) {}
+
+ @Override
+ public void addInputData(
+        long cpId, InputChannelInfo info, int startSeqNum, CloseableIterator<Buffer> data) {}
+
+ @Override
+ public void addOutputData(
+        long cpId, ResultSubpartitionInfo info, int startSeqNum, Buffer... data) {}
+
+ @Override
+ public void addOutputDataFuture(
+ long l,
+ ResultSubpartitionInfo resultSubpartitionInfo,
+ int i,
+ CompletableFuture<List<Buffer>> completableFuture)
+ throws IllegalArgumentException {}
+
+ @Override
+ public void finishInput(long checkpointId) {}
+
+ @Override
+ public void finishOutput(long checkpointId) {}
+
+ @Override
+ public void abort(long checkpointId, Throwable cause, boolean cleanup) {}
+
+ @Override
+ public ChannelStateWriteResult getAndRemoveWriteResult(long checkpointId) {
+ return null;
+ }
+
+ @Override
+ public void close() {}
+ }
+}
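Almost everything version-independent in the gate above is pushed down into RemoteShuffleInputGateDelegation; only thin Flink-1.18-specific overrides remain, such as hasReceivedEndOfData(), which keeps batch jobs compatible with the streaming runtime's end-of-data handshake. A standalone sketch of that decision (the enum values mirror Flink's EndOfDataStatus, but this class is illustrative only, not Celeborn or Flink code):

```java
public class EndOfDataStatusSketch {

  // Local stand-in for Flink's EndOfDataStatus enum.
  enum EndOfDataStatus {
    NOT_END_OF_DATA,
    DRAINED,
    STOPPED
  }

  // While any channel still owes an EndOfData event, the gate is not done.
  // Once the pending counter reaches zero it reports DRAINED, which is what
  // the streaming runtime expects from a normally finishing job.
  static EndOfDataStatus hasReceivedEndOfData(int pendingEndOfDataEvents) {
    if (pendingEndOfDataEvents > 0) {
      return EndOfDataStatus.NOT_END_OF_DATA;
    }
    return EndOfDataStatus.DRAINED;
  }

  public static void main(String[] args) {
    if (hasReceivedEndOfData(3) != EndOfDataStatus.NOT_END_OF_DATA) throw new AssertionError();
    if (hasReceivedEndOfData(0) != EndOfDataStatus.DRAINED) throw new AssertionError();
  }
}
```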
diff --git a/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java b/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
new file mode 100644
index 000000000..d13613023
--- /dev/null
+++ b/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.plugin.flink;
+
+import java.io.IOException;
+
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.io.network.buffer.BufferDecompressor;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.util.function.SupplierWithException;
+
+import org.apache.celeborn.common.CelebornConf;
+
+/** Factory class to create {@link RemoteShuffleInputGate}. */
+public class RemoteShuffleInputGateFactory extends AbstractRemoteShuffleInputGateFactory {
+
+ public RemoteShuffleInputGateFactory(
+      CelebornConf conf, NetworkBufferPool networkBufferPool, int networkBufferSize) {
+ super(conf, networkBufferPool, networkBufferSize);
+ }
+
+ // For testing.
+ protected RemoteShuffleInputGate createInputGate(
+ String owningTaskName,
+ int gateIndex,
+ InputGateDeploymentDescriptor igdd,
+ SupplierWithException<BufferPool, IOException> bufferPoolFactory,
+ BufferDecompressor bufferDecompressor) {
+ return new RemoteShuffleInputGate(
+ this.celebornConf,
+ owningTaskName,
+ gateIndex,
+ igdd,
+ bufferPoolFactory,
+ bufferDecompressor,
+ numConcurrentReading);
+ }
+}
diff --git a/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java b/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
new file mode 100644
index 000000000..820e456db
--- /dev/null
+++ b/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.plugin.flink;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.CompletableFuture;
+
+import javax.annotation.Nullable;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.event.AbstractEvent;
+import org.apache.flink.runtime.io.network.api.EndOfData;
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
+import org.apache.flink.runtime.io.network.api.StopMode;
+import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.Buffer.DataType;
+import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.partition.*;
+import org.apache.flink.util.function.SupplierWithException;
+
+import org.apache.celeborn.plugin.flink.buffer.SortBuffer;
+import org.apache.celeborn.plugin.flink.utils.BufferUtils;
+import org.apache.celeborn.plugin.flink.utils.Utils;
+
+/**
+ * A {@link ResultPartition} which appends records and events to {@link SortBuffer} and after the
+ * {@link SortBuffer} is full, all data in the {@link SortBuffer} will be copied and spilled to the
+ * remote shuffle service in subpartition index order sequentially. Large records that can not be
+ * appended to an empty {@link SortBuffer} will be spilled directly.
+ */
+public class RemoteShuffleResultPartition extends ResultPartition {
+
+ private final RemoteShuffleResultPartitionDelegation delegation;
+
+ public RemoteShuffleResultPartition(
+ String owningTaskName,
+ int partitionIndex,
+ ResultPartitionID partitionId,
+ ResultPartitionType partitionType,
+ int numSubpartitions,
+ int numTargetKeyGroups,
+ int networkBufferSize,
+ ResultPartitionManager partitionManager,
+ @Nullable BufferCompressor bufferCompressor,
+ SupplierWithException<BufferPool, IOException> bufferPoolFactory,
+ RemoteShuffleOutputGate outputGate) {
+
+ super(
+ owningTaskName,
+ partitionIndex,
+ partitionId,
+ partitionType,
+ numSubpartitions,
+ numTargetKeyGroups,
+ partitionManager,
+ bufferCompressor,
+ bufferPoolFactory);
+
+ delegation =
+ new RemoteShuffleResultPartitionDelegation(
+ networkBufferSize,
+ outputGate,
+            (bufferWithChannel, isBroadcast) -> updateStatistics(bufferWithChannel, isBroadcast),
+ numSubpartitions);
+ }
+
+ @Override
+ public void setup() throws IOException {
+ super.setup();
+ BufferUtils.reserveNumRequiredBuffers(bufferPool, 1);
+ delegation.setup(
+ bufferPool,
+ bufferCompressor,
+ buffer -> canBeCompressed(buffer),
+ () -> checkInProduceState());
+ }
+
+ @Override
+ protected void setupInternal() {
+ // do not need to implement
+ }
+
+ @Override
+  public void emitRecord(ByteBuffer record, int targetSubpartition) throws IOException {
+ delegation.emit(record, targetSubpartition, DataType.DATA_BUFFER, false);
+ }
+
+ @Override
+ public void broadcastRecord(ByteBuffer record) throws IOException {
+ delegation.broadcast(record, DataType.DATA_BUFFER);
+ }
+
+ @Override
+  public void broadcastEvent(AbstractEvent event, boolean isPriorityEvent) throws IOException {
+ Buffer buffer = EventSerializer.toBuffer(event, isPriorityEvent);
+ try {
+ ByteBuffer serializedEvent = buffer.getNioBufferReadable();
+ delegation.broadcast(serializedEvent, buffer.getDataType());
+ } finally {
+ buffer.recycleBuffer();
+ }
+ }
+
+ @Override
+ public void alignedBarrierTimeout(long l) {}
+
+ @Override
+ public void abortCheckpoint(long l, CheckpointException e) {}
+
+ @Override
+ public void finish() throws IOException {
+ Utils.checkState(!isReleased(), "Result partition is already released.");
+ broadcastEvent(EndOfPartitionEvent.INSTANCE, false);
+ delegation.finish();
+ super.finish();
+ }
+
+ @Override
+ public synchronized void close() {
+ delegation.close(() -> super.close());
+ }
+
+ @Override
+ protected void releaseInternal() {
+ // no-op
+ }
+
+ @Override
+ public void flushAll() {
+ delegation.flushAll();
+ }
+
+ @Override
+ public void flush(int subpartitionIndex) {
+ flushAll();
+ }
+
+ @Override
+ public CompletableFuture<?> getAvailableFuture() {
+ return AVAILABLE;
+ }
+
+ @Override
+ public int getNumberOfQueuedBuffers() {
+ return 0;
+ }
+
+ @Override
+ public long getSizeOfQueuedBuffersUnsafe() {
+ return 0;
+ }
+
+ @Override
+ public int getNumberOfQueuedBuffers(int targetSubpartition) {
+ return 0;
+ }
+
+ @Override
+ public ResultSubpartitionView createSubpartitionView(
+ int index, BufferAvailabilityListener availabilityListener) {
+ throw new UnsupportedOperationException("Not supported.");
+ }
+
+ @Override
+ public void notifyEndOfData(StopMode mode) throws IOException {
+ if (!delegation.isEndOfDataNotified()) {
+ broadcastEvent(new EndOfData(mode), false);
+ delegation.setEndOfDataNotified(true);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Void> getAllDataProcessedFuture() {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override
+ public String toString() {
+ return "ResultPartition "
+ + partitionId.toString()
+ + " ["
+ + partitionType
+ + ", "
+ + numSubpartitions
+ + " subpartitions, shuffle-descriptor: "
+ + delegation.getOutputGate().getShuffleDesc()
+ + "]";
+ }
+
+ @VisibleForTesting
+ public RemoteShuffleResultPartitionDelegation getDelegation() {
+ return delegation;
+ }
+
+ public void updateStatistics(
+ SortBuffer.BufferWithChannel bufferWithChannel, boolean isBroadcast) {
+ numBuffersOut.inc(isBroadcast ? numSubpartitions : 1);
+    long readableBytes = bufferWithChannel.getBuffer().readableBytes() - BufferUtils.HEADER_LENGTH;
+ if (isBroadcast) {
+ resultPartitionBytes.incAll(readableBytes);
+ } else {
+      resultPartitionBytes.inc(bufferWithChannel.getChannelIndex(), readableBytes);
+ }
+    numBytesOut.inc(isBroadcast ? readableBytes * numSubpartitions : readableBytes);
+ }
+}
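updateStatistics() above deducts Celeborn's buffer header from each spilled buffer's readable bytes, and for a broadcast credits that payload once per subpartition (since the buffer is logically delivered to every one). A standalone sketch of that accounting (HEADER_LENGTH is an assumed placeholder value here; the real constant is BufferUtils.HEADER_LENGTH):

```java
public class ShuffleByteAccounting {

  // Assumed placeholder; in Celeborn this comes from BufferUtils.HEADER_LENGTH.
  static final int HEADER_LENGTH = 16;

  /** Bytes credited to numBytesOut for one spilled buffer. */
  static long bytesOut(int bufferReadableBytes, int numSubpartitions, boolean isBroadcast) {
    // Strip the Celeborn buffer header: only payload bytes are counted.
    long readableBytes = bufferReadableBytes - HEADER_LENGTH;
    // A broadcast buffer reaches every subpartition, so count it once each.
    return isBroadcast ? readableBytes * numSubpartitions : readableBytes;
  }

  public static void main(String[] args) {
    if (bytesOut(116, 4, false) != 100L) throw new AssertionError();
    if (bytesOut(116, 4, true) != 400L) throw new AssertionError();
  }
}
```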
diff --git a/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java b/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java
new file mode 100644
index 000000000..b75435b92
--- /dev/null
+++ b/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.plugin.flink;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.BufferPoolFactory;
+import org.apache.flink.runtime.io.network.partition.ResultPartition;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.util.function.SupplierWithException;
+
+import org.apache.celeborn.common.CelebornConf;
+
+/** Factory class to create {@link RemoteShuffleResultPartition}. */
+public class RemoteShuffleResultPartitionFactory
+ extends AbstractRemoteShuffleResultPartitionFactory {
+
+ public RemoteShuffleResultPartitionFactory(
+ CelebornConf celebornConf,
+ ResultPartitionManager partitionManager,
+ BufferPoolFactory bufferPoolFactory,
+ int networkBufferSize) {
+
+    super(celebornConf, partitionManager, bufferPoolFactory, networkBufferSize);
+ }
+
+ @Override
+ ResultPartition createRemoteShuffleResultPartitionInternal(
+ String taskNameWithSubtaskAndId,
+ int partitionIndex,
+ ResultPartitionID id,
+ ResultPartitionType type,
+ int numSubpartitions,
+ int maxParallelism,
+ List<SupplierWithException<BufferPool, IOException>> bufferPoolFactories,
+ CelebornConf celebornConf,
+ int numMappers,
+ BufferCompressor bufferCompressor,
+ RemoteShuffleDescriptor rsd) {
+ return new RemoteShuffleResultPartition(
+ taskNameWithSubtaskAndId,
+ partitionIndex,
+ id,
+ type,
+ numSubpartitions,
+ maxParallelism,
+ networkBufferSize,
+ partitionManager,
+ bufferCompressor,
+ bufferPoolFactories.get(0),
+ new RemoteShuffleOutputGate(
+ rsd,
+ numSubpartitions,
+ networkBufferSize,
+ bufferPoolFactories.get(1),
+ celebornConf,
+ numMappers));
+ }
+}
diff --git a/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleServiceFactory.java b/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleServiceFactory.java
new file mode 100644
index 000000000..bf95317bc
--- /dev/null
+++ b/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleServiceFactory.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.plugin.flink;
+
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
+import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
+import org.apache.flink.runtime.shuffle.ShuffleEnvironmentContext;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.runtime.shuffle.ShuffleMasterContext;
+import org.apache.flink.runtime.shuffle.ShuffleServiceFactory;
+
+public class RemoteShuffleServiceFactory extends AbstractRemoteShuffleServiceFactory
+ implements ShuffleServiceFactory<
+ RemoteShuffleDescriptor, ResultPartitionWriter, IndexedInputGate> {
+
+ @Override
+ public ShuffleMaster<RemoteShuffleDescriptor> createShuffleMaster(
+ ShuffleMasterContext shuffleMasterContext) {
+    return new RemoteShuffleMaster(shuffleMasterContext, new SimpleResultPartitionAdapter());
+ }
+
+ @Override
+  public ShuffleEnvironment<ResultPartitionWriter, IndexedInputGate> createShuffleEnvironment(
+ ShuffleEnvironmentContext shuffleEnvironmentContext) {
+ AbstractRemoteShuffleServiceParameters parameters =
+ initializePreCreateShuffleEnvironment(shuffleEnvironmentContext);
+ RemoteShuffleResultPartitionFactory resultPartitionFactory =
+ new RemoteShuffleResultPartitionFactory(
+ parameters.celebornConf,
+ parameters.resultPartitionManager,
+ parameters.networkBufferPool,
+ parameters.bufferSize);
+ RemoteShuffleInputGateFactory inputGateFactory =
+ new RemoteShuffleInputGateFactory(
+            parameters.celebornConf, parameters.networkBufferPool, parameters.bufferSize);
+
+ return new RemoteShuffleEnvironment(
+ parameters.networkBufferPool,
+ parameters.resultPartitionManager,
+ resultPartitionFactory,
+ inputGateFactory,
+ parameters.celebornConf);
+ }
+}
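Note: Flink resolves the shuffle service factory reflectively from its `shuffle-service-factory.class` option, so wiring Celeborn in amounts to pointing that option at `RemoteShuffleServiceFactory`. A minimal, self-contained sketch of that kind of reflective lookup (illustrative only, not Flink's actual loader):

```java
public class ShuffleFactoryLookup {
  // Illustrative sketch: resolve a configured factory class name and
  // instantiate it through its no-arg constructor, the way a pluggable
  // shuffle service is typically bootstrapped. Not Flink's real loader.
  public static Object loadFactory(String className) throws Exception {
    Class<?> clazz = Class.forName(className);
    return clazz.getDeclaredConstructor().newInstance();
  }
}
```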
diff --git a/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/SimpleResultPartitionAdapter.java b/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/SimpleResultPartitionAdapter.java
new file mode 100644
index 000000000..3476b8fff
--- /dev/null
+++ b/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/SimpleResultPartitionAdapter.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.plugin.flink;
+
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+
+public class SimpleResultPartitionAdapter implements ResultPartitionAdapter {
+ @Override
+ public boolean isBlockingResultPartition(ResultPartitionType partitionType) {
+ return partitionType.isBlockingOrBlockingPersistentResultPartition();
+ }
+}
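The adapter above exists because the blocking-partition check differs across Flink versions; in the 1.18 client it delegates to `isBlockingOrBlockingPersistentResultPartition()`. A self-contained sketch of the intended semantics, using a hypothetical enum in place of Flink's `ResultPartitionType`:

```java
public class PartitionTypeCheck {
  // Hypothetical stand-in for Flink's ResultPartitionType: the adapter
  // should report true only for blocking-style partitions, never for
  // pipelined ones.
  enum PartitionType {
    BLOCKING,
    BLOCKING_PERSISTENT,
    PIPELINED;
  }

  public static boolean isBlockingResultPartition(PartitionType type) {
    return type == PartitionType.BLOCKING || type == PartitionType.BLOCKING_PERSISTENT;
  }
}
```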
diff --git a/client-flink/flink-1.18/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterTest.java b/client-flink/flink-1.18/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterTest.java
new file mode 100644
index 000000000..8548ca8af
--- /dev/null
+++ b/client-flink/flink-1.18/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterTest.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.plugin.flink;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionGraphID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.shuffle.JobShuffleContext;
+import org.apache.flink.runtime.shuffle.PartitionDescriptor;
+import org.apache.flink.runtime.shuffle.ProducerDescriptor;
+import org.apache.flink.runtime.shuffle.ShuffleMasterContext;
+import org.apache.flink.runtime.shuffle.TaskInputsOutputsDescriptor;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.plugin.flink.utils.FlinkUtils;
+
+public class RemoteShuffleMasterTest {
+
+  private static final Logger LOG = LoggerFactory.getLogger(RemoteShuffleMasterTest.class);
+ private RemoteShuffleMaster remoteShuffleMaster;
+ private Configuration configuration;
+
+ @Before
+ public void setUp() {
+ configuration = new Configuration();
+ remoteShuffleMaster = createShuffleMaster(configuration);
+ }
+
+ @Test
+ public void testRegisterJob() {
+    JobShuffleContext jobShuffleContext = createJobShuffleContext(JobID.generate());
+ remoteShuffleMaster.registerJob(jobShuffleContext);
+
+    // re-registering the same job is expected to fail
+    try {
+      remoteShuffleMaster.registerJob(jobShuffleContext);
+      Assert.fail("Duplicate job registration should throw.");
+    } catch (Exception e) {
+      // expected
+    }
+
+ // unRegister job
+ remoteShuffleMaster.unregisterJob(jobShuffleContext.getJobId());
+ remoteShuffleMaster.registerJob(jobShuffleContext);
+ }
+
+ @Test
+ public void testRegisterPartitionWithProducer()
+ throws UnknownHostException, ExecutionException, InterruptedException {
+ JobID jobID = JobID.generate();
+ JobShuffleContext jobShuffleContext = createJobShuffleContext(jobID);
+ remoteShuffleMaster.registerJob(jobShuffleContext);
+
+ IntermediateDataSetID intermediateDataSetID = new IntermediateDataSetID();
+    PartitionDescriptor partitionDescriptor = createPartitionDescriptor(intermediateDataSetID, 0);
+ ProducerDescriptor producerDescriptor = createProducerDescriptor();
+ RemoteShuffleDescriptor remoteShuffleDescriptor =
+ remoteShuffleMaster
+            .registerPartitionWithProducer(jobID, partitionDescriptor, producerDescriptor)
+ .get();
+    ShuffleResource shuffleResource = remoteShuffleDescriptor.getShuffleResource();
+ ShuffleResourceDescriptor mapPartitionShuffleDescriptor =
+ shuffleResource.getMapPartitionShuffleDescriptor();
+
+ LOG.info("remoteShuffleDescriptor:{}", remoteShuffleDescriptor);
+ Assert.assertEquals(0, mapPartitionShuffleDescriptor.getShuffleId());
+ Assert.assertEquals(0, mapPartitionShuffleDescriptor.getPartitionId());
+ Assert.assertEquals(0, mapPartitionShuffleDescriptor.getAttemptId());
+ Assert.assertEquals(0, mapPartitionShuffleDescriptor.getMapId());
+
+ // use same dataset id
+ partitionDescriptor = createPartitionDescriptor(intermediateDataSetID, 1);
+ remoteShuffleDescriptor =
+ remoteShuffleMaster
+            .registerPartitionWithProducer(jobID, partitionDescriptor, producerDescriptor)
+ .get();
+ mapPartitionShuffleDescriptor =
+        remoteShuffleDescriptor.getShuffleResource().getMapPartitionShuffleDescriptor();
+ Assert.assertEquals(0, mapPartitionShuffleDescriptor.getShuffleId());
+ Assert.assertEquals(1, mapPartitionShuffleDescriptor.getMapId());
+
+ // use another attemptId
+ producerDescriptor = createProducerDescriptor();
+ remoteShuffleDescriptor =
+ remoteShuffleMaster
+            .registerPartitionWithProducer(jobID, partitionDescriptor, producerDescriptor)
+ .get();
+ mapPartitionShuffleDescriptor =
+        remoteShuffleDescriptor.getShuffleResource().getMapPartitionShuffleDescriptor();
+ Assert.assertEquals(0, mapPartitionShuffleDescriptor.getShuffleId());
+ Assert.assertEquals(1, mapPartitionShuffleDescriptor.getAttemptId());
+ Assert.assertEquals(1, mapPartitionShuffleDescriptor.getMapId());
+ }
+
+ @Test
+ public void testRegisterMultipleJobs()
+ throws UnknownHostException, ExecutionException, InterruptedException {
+ JobID jobID1 = JobID.generate();
+ JobShuffleContext jobShuffleContext1 = createJobShuffleContext(jobID1);
+ remoteShuffleMaster.registerJob(jobShuffleContext1);
+
+ JobID jobID2 = JobID.generate();
+ JobShuffleContext jobShuffleContext2 = createJobShuffleContext(jobID2);
+ remoteShuffleMaster.registerJob(jobShuffleContext2);
+
+ IntermediateDataSetID intermediateDataSetID = new IntermediateDataSetID();
+    PartitionDescriptor partitionDescriptor = createPartitionDescriptor(intermediateDataSetID, 0);
+ ProducerDescriptor producerDescriptor = createProducerDescriptor();
+ RemoteShuffleDescriptor remoteShuffleDescriptor1 =
+ remoteShuffleMaster
+            .registerPartitionWithProducer(jobID1, partitionDescriptor, producerDescriptor)
+ .get();
+
+ // use same datasetId but different jobId
+ RemoteShuffleDescriptor remoteShuffleDescriptor2 =
+ remoteShuffleMaster
+            .registerPartitionWithProducer(jobID2, partitionDescriptor, producerDescriptor)
+ .get();
+
+    Assert.assertEquals(
+        0,
+        remoteShuffleDescriptor1
+            .getShuffleResource()
+            .getMapPartitionShuffleDescriptor()
+            .getShuffleId());
+    Assert.assertEquals(
+        1,
+        remoteShuffleDescriptor2
+            .getShuffleResource()
+            .getMapPartitionShuffleDescriptor()
+            .getShuffleId());
+ }
+
+ @Test
+ public void testShuffleMemoryAnnouncing() {
+    Map<IntermediateDataSetID, Integer> numberOfInputGateChannels = new HashMap<>();
+    Map<IntermediateDataSetID, Integer> numbersOfResultSubpartitions = new HashMap<>();
+ Map<IntermediateDataSetID, Integer> subPartitionNums = new HashMap<>();
+    Map<IntermediateDataSetID, ResultPartitionType> inputPartitionTypes = new HashMap<>();
+    Map<IntermediateDataSetID, ResultPartitionType> resultPartitionTypes = new HashMap<>();
+ IntermediateDataSetID inputDataSetID0 = new IntermediateDataSetID();
+ IntermediateDataSetID inputDataSetID1 = new IntermediateDataSetID();
+ IntermediateDataSetID outputDataSetID0 = new IntermediateDataSetID();
+ IntermediateDataSetID outputDataSetID1 = new IntermediateDataSetID();
+ IntermediateDataSetID outputDataSetID2 = new IntermediateDataSetID();
+ Random random = new Random();
+ numberOfInputGateChannels.put(inputDataSetID0, random.nextInt(1000));
+ numberOfInputGateChannels.put(inputDataSetID1, random.nextInt(1000));
+ int subPartitionNum1 = random.nextInt(1000);
+ int subPartitionNum2 = random.nextInt(1000);
+ int subPartitionNum3 = random.nextInt(1000);
+ numbersOfResultSubpartitions.put(outputDataSetID0, subPartitionNum1);
+ numbersOfResultSubpartitions.put(outputDataSetID1, subPartitionNum2);
+ numbersOfResultSubpartitions.put(outputDataSetID2, subPartitionNum3);
+ subPartitionNums.put(outputDataSetID0, subPartitionNum1);
+ subPartitionNums.put(outputDataSetID1, subPartitionNum2);
+ subPartitionNums.put(outputDataSetID2, subPartitionNum3);
+ inputPartitionTypes.put(inputDataSetID0, ResultPartitionType.BLOCKING);
+ inputPartitionTypes.put(inputDataSetID1, ResultPartitionType.BLOCKING);
+ resultPartitionTypes.put(outputDataSetID0, ResultPartitionType.BLOCKING);
+ resultPartitionTypes.put(outputDataSetID1, ResultPartitionType.BLOCKING);
+ resultPartitionTypes.put(outputDataSetID2, ResultPartitionType.BLOCKING);
+ MemorySize calculated =
+ remoteShuffleMaster.computeShuffleMemorySizeForTask(
+ TaskInputsOutputsDescriptor.from(
+ 3,
+ numberOfInputGateChannels,
+ numbersOfResultSubpartitions,
+ subPartitionNums,
+ inputPartitionTypes,
+ resultPartitionTypes));
+
+ CelebornConf celebornConf = FlinkUtils.toCelebornConf(configuration);
+
+ long numBytesPerGate = celebornConf.clientFlinkMemoryPerInputGate();
+ long expectedInput = 2 * numBytesPerGate;
+
+    long numBytesPerResultPartition = celebornConf.clientFlinkMemoryPerResultPartition();
+ long expectedOutput = 3 * numBytesPerResultPartition;
+ MemorySize expected = new MemorySize(expectedInput + expectedOutput);
+
+ Assert.assertEquals(expected, calculated);
+ }
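The expected value in `testShuffleMemoryAnnouncing` is linear in the number of distinct input gates and result partitions, not in channel or subpartition counts. A small sketch of that arithmetic, with the per-gate and per-partition byte sizes as plain parameters (in the real code they come from `CelebornConf`):

```java
public class ShuffleMemoryEstimate {
  // Mirrors the test's expectation:
  // total = gates * bytesPerGate + partitions * bytesPerPartition.
  public static long estimate(
      int numInputGates, int numResultPartitions, long bytesPerGate, long bytesPerPartition) {
    return numInputGates * bytesPerGate + numResultPartitions * bytesPerPartition;
  }
}
```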
+
+ @After
+ public void tearDown() {
+ if (remoteShuffleMaster != null) {
+ try {
+ remoteShuffleMaster.close();
+ } catch (Exception e) {
+ LOG.warn(e.getMessage(), e);
+ }
+ }
+ }
+
+ public RemoteShuffleMaster createShuffleMaster(Configuration configuration) {
+ remoteShuffleMaster =
+ new RemoteShuffleMaster(
+ new ShuffleMasterContext() {
+ @Override
+ public Configuration getConfiguration() {
+ return configuration;
+ }
+
+ @Override
+ public void onFatalError(Throwable throwable) {
+ System.exit(-1);
+ }
+ },
+ new SimpleResultPartitionAdapter());
+
+ return remoteShuffleMaster;
+ }
+
+ public JobShuffleContext createJobShuffleContext(JobID jobId) {
+ return new JobShuffleContext() {
+ @Override
+ public org.apache.flink.api.common.JobID getJobId() {
+ return jobId;
+ }
+
+ @Override
+ public CompletableFuture<?> stopTrackingAndReleasePartitions(
+ Collection<ResultPartitionID> collection) {
+ return CompletableFuture.completedFuture(null);
+ }
+ };
+ }
+
+ public PartitionDescriptor createPartitionDescriptor(
+ IntermediateDataSetID intermediateDataSetId, int partitionNum) {
+ IntermediateResultPartitionID intermediateResultPartitionId =
+ new IntermediateResultPartitionID(intermediateDataSetId, partitionNum);
+ return new PartitionDescriptor(
+ intermediateDataSetId,
+ 10,
+ intermediateResultPartitionId,
+ ResultPartitionType.BLOCKING,
+ 5,
+ 1,
+ false,
+ false);
+ }
+
+  public ProducerDescriptor createProducerDescriptor() throws UnknownHostException {
+ ExecutionAttemptID executionAttemptId =
+ new ExecutionAttemptID(
+            new ExecutionGraphID(), new ExecutionVertexID(new JobVertexID(0, 0), 0), 0);
+ return new ProducerDescriptor(
+        ResourceID.generate(), executionAttemptId, InetAddress.getLocalHost(), 100);
+ }
+}
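The shuffle-id assertions in the tests above follow a simple pattern: each (job, data set) pair gets its own monotonically increasing Celeborn shuffle id, so re-registering the same data set under the same job reuses the id, while a second job registering the same data set gets a fresh one. A hypothetical sketch of that mapping (not the `RemoteShuffleMaster` implementation):

```java
import java.util.HashMap;
import java.util.Map;

public class ShuffleIdAllocator {
  // Hypothetical sketch: allocate one shuffle id per (jobId, dataSetId)
  // pair, increasing in registration order.
  private final Map<String, Integer> ids = new HashMap<>();
  private int nextId = 0;

  public synchronized int shuffleIdFor(String jobId, String dataSetId) {
    return ids.computeIfAbsent(jobId + "#" + dataSetId, key -> nextId++);
  }
}
```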
diff --git a/client-flink/flink-1.18/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java b/client-flink/flink-1.18/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
new file mode 100644
index 000000000..622c0eea7
--- /dev/null
+++ b/client-flink/flink-1.18/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
@@ -0,0 +1,621 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.plugin.flink;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Random;
+import java.util.Set;
+import java.util.stream.IntStream;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
+import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
+import org.apache.flink.runtime.io.network.buffer.BufferDecompressor;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.util.function.SupplierWithException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.plugin.flink.buffer.BufferPacker;
+import org.apache.celeborn.plugin.flink.buffer.SortBuffer;
+import org.apache.celeborn.plugin.flink.readclient.FlinkShuffleClientImpl;
+import org.apache.celeborn.plugin.flink.utils.BufferUtils;
+
+public class RemoteShuffleResultPartitionSuiteJ {
+ private final int networkBufferSize = 32 * 1024;
+  private BufferCompressor bufferCompressor = new BufferCompressor(networkBufferSize, "lz4");
+  private RemoteShuffleOutputGate remoteShuffleOutputGate = mock(RemoteShuffleOutputGate.class);
+ private final String compressCodec = "LZ4";
+ private final CelebornConf conf = new CelebornConf();
+  BufferDecompressor bufferDecompressor = new BufferDecompressor(networkBufferSize, "LZ4");
+
+ private static final int totalBuffers = 1000;
+
+ private static final int bufferSize = 1024;
+
+ private NetworkBufferPool globalBufferPool;
+
+ private BufferPool sortBufferPool;
+
+ private BufferPool nettyBufferPool;
+
+ private RemoteShuffleResultPartition partitionWriter;
+
+ private FakedRemoteShuffleOutputGate outputGate;
+
+ @Before
+ public void setup() {
+ globalBufferPool = new NetworkBufferPool(totalBuffers, bufferSize);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ if (outputGate != null) {
+ outputGate.release();
+ }
+
+ if (sortBufferPool != null) {
+ sortBufferPool.lazyDestroy();
+ }
+ if (nettyBufferPool != null) {
+ nettyBufferPool.lazyDestroy();
+ }
+    assertEquals(totalBuffers, globalBufferPool.getNumberOfAvailableMemorySegments());
+ globalBufferPool.destroy();
+ }
+
+ @Test
+  public void testSimpleFlush() throws IOException, InterruptedException {
+    List<SupplierWithException<BufferPool, IOException>> bufferPool = createBufferPoolFactory();
+ RemoteShuffleResultPartition remoteShuffleResultPartition =
+ new RemoteShuffleResultPartition(
+ "test",
+ 0,
+ new ResultPartitionID(),
+ ResultPartitionType.BLOCKING,
+ 2,
+ 2,
+ 32 * 1024,
+ new ResultPartitionManager(),
+ bufferCompressor,
+ bufferPool.get(0),
+ remoteShuffleOutputGate);
+ remoteShuffleResultPartition.setup();
+ doNothing().when(remoteShuffleOutputGate).regionStart(anyBoolean());
+ doNothing().when(remoteShuffleOutputGate).regionFinish();
+    when(remoteShuffleOutputGate.getBufferPool()).thenReturn(bufferPool.get(1).get());
+    SortBuffer sortBuffer = remoteShuffleResultPartition.getDelegation().getUnicastSortBuffer();
+ ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[] {1, 2, 3});
+ sortBuffer.append(byteBuffer, 0, Buffer.DataType.DATA_BUFFER);
+    remoteShuffleResultPartition.getDelegation().flushSortBuffer(sortBuffer, true);
+ }
+
+  private List<SupplierWithException<BufferPool, IOException>> createBufferPoolFactory() {
+ NetworkBufferPool networkBufferPool =
+ new NetworkBufferPool(256 * 8, 32 * 1024, Duration.ofMillis(1000));
+
+ int numBuffersPerPartition = 64 * 1024 / 32;
+ int numForResultPartition = numBuffersPerPartition * 7 / 8;
+ int numForOutputGate = numBuffersPerPartition - numForResultPartition;
+
+    List<SupplierWithException<BufferPool, IOException>> factories = new ArrayList<>();
+    factories.add(
+        () -> networkBufferPool.createBufferPool(numForResultPartition, numForResultPartition));
+    factories.add(() -> networkBufferPool.createBufferPool(numForOutputGate, numForOutputGate));
+ return factories;
+ }
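`createBufferPoolFactory` above splits a per-partition budget of 64 * 1024 / 32 = 2048 buffers, giving 7/8 to the result partition and the remainder to the output gate. The split arithmetic, isolated:

```java
public class BufferSplit {
  // Same 7/8 split as the factory above: returns
  // {buffers for the result partition, buffers for the output gate}.
  public static int[] split(int numBuffersPerPartition) {
    int forResultPartition = numBuffersPerPartition * 7 / 8;
    int forOutputGate = numBuffersPerPartition - forResultPartition;
    return new int[] {forResultPartition, forOutputGate};
  }
}
```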
+
+ @Test
+ public void testWriteNormalRecordWithCompressionEnabled() throws Exception {
+ testWriteNormalRecord(true);
+ }
+
+ @Test
+ public void testWriteNormalRecordWithCompressionDisabled() throws Exception {
+ testWriteNormalRecord(false);
+ }
+
+ @Test
+ public void testWriteLargeRecord() throws Exception {
+ int numSubpartitions = 2;
+ int numBuffers = 100;
+ initResultPartitionWriter(numSubpartitions, 10, 200, false, conf, 10);
+
+ partitionWriter.setup();
+
+ byte[] dataWritten = new byte[bufferSize * numBuffers];
+ Random random = new Random();
+ random.nextBytes(dataWritten);
+ ByteBuffer recordWritten = ByteBuffer.wrap(dataWritten);
+ partitionWriter.emitRecord(recordWritten, 0);
+ assertEquals(0, sortBufferPool.bestEffortGetNumOfUsedBuffers());
+
+ partitionWriter.finish();
+ partitionWriter.close();
+
+ List<Buffer> receivedBuffers = outputGate.getReceivedBuffers()[0];
+
+ ByteBuffer recordRead = ByteBuffer.allocate(bufferSize * numBuffers);
+ for (Buffer buffer : receivedBuffers) {
+ if (buffer.isBuffer()) {
+ recordRead.put(
+ buffer.getNioBuffer(
+                BufferUtils.HEADER_LENGTH, buffer.readableBytes() - BufferUtils.HEADER_LENGTH));
+ }
+ }
+ recordWritten.rewind();
+ recordRead.flip();
+ assertEquals(recordWritten, recordRead);
+ }
+
+ @Test
+ public void testBroadcastLargeRecord() throws Exception {
+ int numSubpartitions = 2;
+ int numBuffers = 100;
+ initResultPartitionWriter(numSubpartitions, 10, 200, false, conf, 10);
+
+ partitionWriter.setup();
+
+ byte[] dataWritten = new byte[bufferSize * numBuffers];
+ Random random = new Random();
+ random.nextBytes(dataWritten);
+ ByteBuffer recordWritten = ByteBuffer.wrap(dataWritten);
+ partitionWriter.broadcastRecord(recordWritten);
+ assertEquals(0, sortBufferPool.bestEffortGetNumOfUsedBuffers());
+
+ partitionWriter.finish();
+ partitionWriter.close();
+
+ ByteBuffer recordRead0 = ByteBuffer.allocate(bufferSize * numBuffers);
+ for (Buffer buffer : outputGate.getReceivedBuffers()[0]) {
+ if (buffer.isBuffer()) {
+ recordRead0.put(
+ buffer.getNioBuffer(
+                BufferUtils.HEADER_LENGTH, buffer.readableBytes() - BufferUtils.HEADER_LENGTH));
+ }
+ }
+ recordWritten.rewind();
+ recordRead0.flip();
+ assertEquals(recordWritten, recordRead0);
+
+ ByteBuffer recordRead1 = ByteBuffer.allocate(bufferSize * numBuffers);
+ for (Buffer buffer : outputGate.getReceivedBuffers()[1]) {
+ if (buffer.isBuffer()) {
+ recordRead1.put(
+ buffer.getNioBuffer(
+                BufferUtils.HEADER_LENGTH, buffer.readableBytes() - BufferUtils.HEADER_LENGTH));
+ }
+ }
+ recordWritten.rewind();
+ recordRead1.flip();
+    assertEquals(recordWritten, recordRead1);
+ }
+
+ @Test
+ public void testFlush() throws Exception {
+ int numSubpartitions = 10;
+
+ initResultPartitionWriter(numSubpartitions, 10, 20, false, conf, 10);
+ partitionWriter.setup();
+
+ partitionWriter.emitRecord(ByteBuffer.allocate(bufferSize), 0);
+ partitionWriter.emitRecord(ByteBuffer.allocate(bufferSize), 1);
+ assertEquals(3, sortBufferPool.bestEffortGetNumOfUsedBuffers());
+
+ partitionWriter.broadcastRecord(ByteBuffer.allocate(bufferSize));
+ assertEquals(2, sortBufferPool.bestEffortGetNumOfUsedBuffers());
+
+ partitionWriter.flush(0);
+ assertEquals(0, sortBufferPool.bestEffortGetNumOfUsedBuffers());
+
+ partitionWriter.emitRecord(ByteBuffer.allocate(bufferSize), 2);
+ partitionWriter.emitRecord(ByteBuffer.allocate(bufferSize), 3);
+ assertEquals(3, sortBufferPool.bestEffortGetNumOfUsedBuffers());
+
+ partitionWriter.flushAll();
+ assertEquals(0, sortBufferPool.bestEffortGetNumOfUsedBuffers());
+
+ partitionWriter.finish();
+ partitionWriter.close();
+ }
+
+  private void testWriteNormalRecord(boolean compressionEnabled) throws Exception {
+ int numSubpartitions = 4;
+ int numRecords = 100;
+ Random random = new Random();
+
+    initResultPartitionWriter(numSubpartitions, 100, 500, compressionEnabled, conf, 10);
+ partitionWriter.setup();
+ assertTrue(outputGate.isSetup());
+
+ Queue<DataAndType>[] dataWritten = new Queue[numSubpartitions];
+    IntStream.range(0, numSubpartitions).forEach(i -> dataWritten[i] = new ArrayDeque<>());
+ int[] numBytesWritten = new int[numSubpartitions];
+ Arrays.fill(numBytesWritten, 0);
+
+ for (int i = 0; i < numRecords; i++) {
+ byte[] data = new byte[random.nextInt(2 * bufferSize) + 1];
+ if (compressionEnabled) {
+ byte randomByte = (byte) random.nextInt();
+ Arrays.fill(data, randomByte);
+ } else {
+ random.nextBytes(data);
+ }
+ ByteBuffer record = ByteBuffer.wrap(data);
+ boolean isBroadCast = random.nextBoolean();
+
+ if (isBroadCast) {
+ partitionWriter.broadcastRecord(record);
+ IntStream.range(0, numSubpartitions)
+ .forEach(
+ subpartition ->
+ recordDataWritten(
+ record,
+ Buffer.DataType.DATA_BUFFER,
+ subpartition,
+ dataWritten,
+ numBytesWritten));
+ } else {
+ int subpartition = random.nextInt(numSubpartitions);
+ partitionWriter.emitRecord(record, subpartition);
+ recordDataWritten(
+            record, Buffer.DataType.DATA_BUFFER, subpartition, dataWritten, numBytesWritten);
+ }
+ }
+
+ partitionWriter.finish();
+ assertTrue(outputGate.isFinished());
+ partitionWriter.close();
+ assertTrue(outputGate.isClosed());
+
+    for (int subpartition = 0; subpartition < numSubpartitions; ++subpartition) {
+      ByteBuffer record = EventSerializer.toSerializedEvent(EndOfPartitionEvent.INSTANCE);
+ recordDataWritten(
+          record, Buffer.DataType.EVENT_BUFFER, subpartition, dataWritten, numBytesWritten);
+ }
+
+ outputGate
+ .getFinishedRegions()
+ .forEach(
+            regionIndex -> assertTrue(outputGate.getNumBuffersByRegion().containsKey(regionIndex)));
+
+ int[] numBytesRead = new int[numSubpartitions];
+ List<Buffer>[] receivedBuffers = outputGate.getReceivedBuffers();
+ List<Buffer>[] validateTarget = new List[numSubpartitions];
+ Arrays.fill(numBytesRead, 0);
+ for (int i = 0; i < numSubpartitions; i++) {
+ validateTarget[i] = new ArrayList<>();
+ for (Buffer buffer : receivedBuffers[i]) {
+ for (Buffer unpackedBuffer : BufferPacker.unpack(buffer.asByteBuf())) {
+ if (compressionEnabled && unpackedBuffer.isCompressed()) {
+ Buffer decompressedBuffer =
+                bufferDecompressor.decompressToIntermediateBuffer(unpackedBuffer);
+            ByteBuffer decompressed = decompressedBuffer.getNioBufferReadable();
+ int numBytes = decompressed.remaining();
+            MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(numBytes);
+ segment.put(0, decompressed, numBytes);
+ decompressedBuffer.recycleBuffer();
+ validateTarget[i].add(
+                new NetworkBuffer(segment, buf -> {}, unpackedBuffer.getDataType(), numBytes));
+ numBytesRead[i] += numBytes;
+ } else {
+ numBytesRead[i] += buffer.readableBytes();
+ validateTarget[i].add(buffer);
+ }
+ }
+ }
+ }
+    checkWriteReadResult(
+        numSubpartitions, numBytesWritten, numBytesRead, dataWritten, validateTarget);
+ }
+
+ private void initResultPartitionWriter(
+ int numSubpartitions,
+ int sortBufferPoolSize,
+ int nettyBufferPoolSize,
+ boolean compressionEnabled,
+ CelebornConf conf,
+ int numMappers)
+ throws Exception {
+
+    sortBufferPool = globalBufferPool.createBufferPool(sortBufferPoolSize, sortBufferPoolSize);
+    nettyBufferPool = globalBufferPool.createBufferPool(nettyBufferPoolSize, nettyBufferPoolSize);
+
+ outputGate =
+ new FakedRemoteShuffleOutputGate(
+            getShuffleDescriptor(), numSubpartitions, () -> nettyBufferPool, conf, numMappers);
+ outputGate.setup();
+
+ if (compressionEnabled) {
+ partitionWriter =
+ new RemoteShuffleResultPartition(
+ "RemoteShuffleResultPartitionWriterTest",
+ 0,
+ new ResultPartitionID(),
+ ResultPartitionType.BLOCKING,
+ numSubpartitions,
+ numSubpartitions,
+ bufferSize,
+ new ResultPartitionManager(),
+ bufferCompressor,
+ () -> sortBufferPool,
+ outputGate);
+ } else {
+ partitionWriter =
+ new RemoteShuffleResultPartition(
+ "RemoteShuffleResultPartitionWriterTest",
+ 0,
+ new ResultPartitionID(),
+ ResultPartitionType.BLOCKING,
+ numSubpartitions,
+ numSubpartitions,
+ bufferSize,
+ new ResultPartitionManager(),
+ null,
+ () -> sortBufferPool,
+ outputGate);
+ }
+ }
+
+ private void recordDataWritten(
+ ByteBuffer record,
+ Buffer.DataType dataType,
+ int subpartition,
+ Queue<DataAndType>[] dataWritten,
+ int[] numBytesWritten) {
+
+ record.rewind();
+ dataWritten[subpartition].add(new DataAndType(record, dataType));
+ numBytesWritten[subpartition] += record.remaining();
+ }
+
+  private static class FakedRemoteShuffleOutputGate extends RemoteShuffleOutputGate {
+
+ private boolean isSetup;
+ private boolean isFinished;
+ private boolean isClosed;
+ private final List<Buffer>[] receivedBuffers;
+ private final Map<Integer, Integer> numBuffersByRegion;
+ private final Set<Integer> finishedRegions;
+ private int currentRegionIndex;
+ private boolean currentIsBroadcast;
+
+ FakedRemoteShuffleOutputGate(
+ RemoteShuffleDescriptor shuffleDescriptor,
+ int numSubpartitions,
+ SupplierWithException<BufferPool, IOException> bufferPoolFactory,
+ CelebornConf celebornConf,
+ int numMappers) {
+
+ super(
+ shuffleDescriptor,
+ numSubpartitions,
+ bufferSize,
+ bufferPoolFactory,
+ celebornConf,
+ numMappers);
+ isSetup = false;
+ isFinished = false;
+ isClosed = false;
+ numBuffersByRegion = new HashMap<>();
+ finishedRegions = new HashSet<>();
+ currentRegionIndex = -1;
+ receivedBuffers = new ArrayList[numSubpartitions];
+      IntStream.range(0, numSubpartitions).forEach(i -> receivedBuffers[i] = new ArrayList<>());
+ currentIsBroadcast = false;
+ }
+
+ @Override
+ FlinkShuffleClientImpl getShuffleClient() {
+ FlinkShuffleClientImpl client = mock(FlinkShuffleClientImpl.class);
+ doNothing().when(client).cleanup(anyInt(), anyInt(), anyInt());
+ return client;
+ }
+
+ @Override
+ public void setup() throws IOException, InterruptedException {
+ bufferPool = bufferPoolFactory.get();
+ isSetup = true;
+ }
+
+ @Override
+ public void write(Buffer buffer, int subIdx) {
+ if (currentIsBroadcast) {
+ assertEquals(0, subIdx);
+ ByteBuffer byteBuffer = buffer.getNioBufferReadable();
+ for (int i = 0; i < numSubs; i++) {
+ int numBytes = buffer.readableBytes();
+          MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(numBytes);
+ byteBuffer.rewind();
+ segment.put(0, byteBuffer, numBytes);
+ receivedBuffers[i].add(
+ new NetworkBuffer(
+                  segment, buf -> {}, buffer.getDataType(), buffer.isCompressed(), numBytes));
+ }
+ buffer.recycleBuffer();
+ } else {
+ receivedBuffers[subIdx].add(buffer);
+ }
+ if (numBuffersByRegion.containsKey(currentRegionIndex)) {
+ int prev = numBuffersByRegion.get(currentRegionIndex);
+ numBuffersByRegion.put(currentRegionIndex, prev + 1);
+ } else {
+ numBuffersByRegion.put(currentRegionIndex, 1);
+ }
+ }
+
+ @Override
+ public void regionStart(boolean isBroadcast) {
+ currentIsBroadcast = isBroadcast;
+ currentRegionIndex++;
+ }
+
+ @Override
+ public void regionFinish() {
+ if (finishedRegions.contains(currentRegionIndex)) {
+        throw new IllegalStateException("Unexpected region: " + currentRegionIndex);
+ }
+ finishedRegions.add(currentRegionIndex);
+ }
+
+ @Override
+ public void finish() throws InterruptedException {
+ isFinished = true;
+ }
+
+ @Override
+ public void close() {
+ isClosed = true;
+ }
+
+ public List<Buffer>[] getReceivedBuffers() {
+ return receivedBuffers;
+ }
+
+ public Map<Integer, Integer> getNumBuffersByRegion() {
+ return numBuffersByRegion;
+ }
+
+ public Set<Integer> getFinishedRegions() {
+ return finishedRegions;
+ }
+
+ public boolean isSetup() {
+ return isSetup;
+ }
+
+ public boolean isFinished() {
+ return isFinished;
+ }
+
+ public boolean isClosed() {
+ return isClosed;
+ }
+
+ public void release() throws Exception {
+ IntStream.range(0, numSubs)
+ .forEach(
+ subpartitionIndex -> {
+ receivedBuffers[subpartitionIndex].forEach(Buffer::recycleBuffer);
+ receivedBuffers[subpartitionIndex].clear();
+ });
+ numBuffersByRegion.clear();
+ finishedRegions.clear();
+ super.close();
+ }
+ }
+
+ private RemoteShuffleDescriptor getShuffleDescriptor() throws Exception {
+ Random random = new Random();
+ byte[] bytes = new byte[16];
+ random.nextBytes(bytes);
+ return new RemoteShuffleDescriptor(
+ new JobID(bytes).toString(),
+ new JobID(bytes),
+ new JobID(bytes).toString(),
+ new ResultPartitionID(),
+ new RemoteShuffleResource(
+ "1", 2, System.currentTimeMillis(), new
ShuffleResourceDescriptor(1, 1, 1, 0)));
+ }
+
+ /** Data written and its {@link Buffer.DataType}. */
+ public static class DataAndType {
+ private final ByteBuffer data;
+ private final Buffer.DataType dataType;
+
+ DataAndType(ByteBuffer data, Buffer.DataType dataType) {
+ this.data = data;
+ this.dataType = dataType;
+ }
+ }
+
+ public static void checkWriteReadResult(
+ int numSubpartitions,
+ int[] numBytesWritten,
+ int[] numBytesRead,
+ Queue<DataAndType>[] dataWritten,
+ Collection<Buffer>[] buffersRead) {
+ for (int subpartitionIndex = 0; subpartitionIndex < numSubpartitions; ++subpartitionIndex) {
+ assertEquals(numBytesWritten[subpartitionIndex], numBytesRead[subpartitionIndex]);
+
+ List<DataAndType> eventsWritten = new ArrayList<>();
+ List<Buffer> eventsRead = new ArrayList<>();
+
+ ByteBuffer subpartitionDataWritten = ByteBuffer.allocate(numBytesWritten[subpartitionIndex]);
+ for (DataAndType dataAndType : dataWritten[subpartitionIndex]) {
+ subpartitionDataWritten.put(dataAndType.data);
+ dataAndType.data.rewind();
+ if (dataAndType.dataType.isEvent()) {
+ eventsWritten.add(dataAndType);
+ }
+ }
+
+ ByteBuffer subpartitionDataRead = ByteBuffer.allocate(numBytesRead[subpartitionIndex]);
+ for (Buffer buffer : buffersRead[subpartitionIndex]) {
+ subpartitionDataRead.put(buffer.getNioBufferReadable());
+ if (!buffer.isBuffer()) {
+ eventsRead.add(buffer);
+ }
+ }
+
+ subpartitionDataWritten.flip();
+ subpartitionDataRead.flip();
+ assertEquals(subpartitionDataWritten, subpartitionDataRead);
+
+ assertEquals(eventsWritten.size(), eventsRead.size());
+ for (int i = 0; i < eventsWritten.size(); ++i) {
+ assertEquals(eventsWritten.get(i).dataType, eventsRead.get(i).getDataType());
+ assertEquals(eventsWritten.get(i).data, eventsRead.get(i).getNioBufferReadable());
+ }
+ }
+ }
+}
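As an aside for readers following the test logic: the core invariant `checkWriteReadResult` verifies is that the concatenation of all written bytes equals the concatenation of all read bytes, regardless of how either side was chunked into buffers. That invariant can be sketched standalone (class and helper names here are illustrative, not part of the patch):

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

public class RoundTripCheck {
    // Concatenate chunks into one buffer, flipped so it is ready for comparison.
    static ByteBuffer concat(List<byte[]> chunks, int totalBytes) {
        ByteBuffer merged = ByteBuffer.allocate(totalBytes);
        for (byte[] chunk : chunks) {
            merged.put(chunk);
        }
        merged.flip();
        return merged;
    }

    public static void main(String[] args) {
        // The same five bytes, chunked differently on the write and read sides.
        List<byte[]> written = new ArrayList<>();
        written.add(new byte[] {1, 2, 3});
        written.add(new byte[] {4, 5});
        List<byte[]> read = new ArrayList<>();
        read.add(new byte[] {1, 2, 3, 4});
        read.add(new byte[] {5});
        // ByteBuffer.equals compares remaining content, so equal total bytes
        // compare equal no matter how they were split into buffers.
        System.out.println(concat(written, 5).equals(concat(read, 5))); // true
    }
}
```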
diff --git a/client-flink/flink-1.18/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleServiceFactorySuitJ.java b/client-flink/flink-1.18/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleServiceFactorySuitJ.java
new file mode 100644
index 000000000..73dbb6a9d
--- /dev/null
+++ b/client-flink/flink-1.18/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleServiceFactorySuitJ.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.plugin.flink;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
+import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
+import org.apache.flink.runtime.shuffle.ShuffleEnvironmentContext;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class RemoteShuffleServiceFactorySuitJ {
+ @Test
+ public void testCreateShuffleEnvironment() {
+ RemoteShuffleServiceFactory remoteShuffleServiceFactory = new RemoteShuffleServiceFactory();
+ ShuffleEnvironmentContext shuffleEnvironmentContext = mock(ShuffleEnvironmentContext.class);
+ when(shuffleEnvironmentContext.getConfiguration()).thenReturn(new Configuration());
+ when(shuffleEnvironmentContext.getNetworkMemorySize())
+ .thenReturn(new MemorySize(64 * 1024 * 1024));
+ MetricGroup parentMetric = mock(MetricGroup.class);
+ when(shuffleEnvironmentContext.getParentMetricGroup()).thenReturn(parentMetric);
+ MetricGroup childGroup = mock(MetricGroup.class);
+ MetricGroup childChildGroup = mock(MetricGroup.class);
+ when(parentMetric.addGroup(anyString())).thenReturn(childGroup);
+ when(childGroup.addGroup(any())).thenReturn(childChildGroup);
+ when(childChildGroup.gauge(any(), any())).thenReturn(null);
+ ShuffleEnvironment<ResultPartitionWriter, IndexedInputGate> shuffleEnvironment =
+ remoteShuffleServiceFactory.createShuffleEnvironment(shuffleEnvironmentContext);
+ Assert.assertEquals(
+ 32 * 1024,
+ ((RemoteShuffleEnvironment) shuffleEnvironment)
+ .getResultPartitionFactory()
+ .getNetworkBufferSize());
+ }
+}
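For context on the `32 * 1024` asserted above: that is Flink's default network buffer (memory segment) size, while the mocked context supplies a 64 MiB network memory budget. The relationship between the two values (a sketch of the arithmetic only; the test itself asserts just the buffer size) works out as follows:

```java
public class BufferSizing {
    public static void main(String[] args) {
        long networkMemoryBytes = 64L * 1024 * 1024; // MemorySize handed to the mocked context
        int networkBufferSize = 32 * 1024;           // default segment size asserted by the test
        // Number of network buffers the budget can be carved into.
        long numSegments = networkMemoryBytes / networkBufferSize;
        System.out.println(numSegments); // 2048
    }
}
```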
diff --git a/dev/dependencies.sh b/dev/dependencies.sh
index 5824c0ba5..73c6c41c4 100755
--- a/dev/dependencies.sh
+++ b/dev/dependencies.sh
@@ -186,6 +186,10 @@ case "$MODULE" in
MVN_MODULES="client-flink/flink-1.17"
SBT_PROJECT="celeborn-client-flink-1_17"
;;
+ "flink-1.18")
+ MVN_MODULES="client-flink/flink-1.18"
+ SBT_PROJECT="celeborn-client-flink-1_18"
+ ;;
"mr")
MVN_MODULES="client-mr/mr"
SBT_PROJECT="celeborn-client-mr"
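The hunk above extends the module dispatch in `dev/dependencies.sh`; in isolation, the pattern it adds behaves like this (a minimal sketch mirroring the script's variable names, not the full script):

```shell
# The module name selects the Maven module path and the SBT project name.
MODULE="flink-1.18"
case "$MODULE" in
  "flink-1.18")
    MVN_MODULES="client-flink/flink-1.18"
    SBT_PROJECT="celeborn-client-flink-1_18"
    ;;
esac
echo "$MVN_MODULES $SBT_PROJECT"
```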
diff --git a/dev/deps/dependencies-client-flink-1.18 b/dev/deps/dependencies-client-flink-1.18
new file mode 100644
index 000000000..7e8c870ff
--- /dev/null
+++ b/dev/deps/dependencies-client-flink-1.18
@@ -0,0 +1,78 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+RoaringBitmap/0.9.32//RoaringBitmap-0.9.32.jar
+commons-crypto/1.0.0//commons-crypto-1.0.0.jar
+commons-io/2.13.0//commons-io-2.13.0.jar
+commons-lang3/3.12.0//commons-lang3-3.12.0.jar
+commons-logging/1.1.3//commons-logging-1.1.3.jar
+guava/14.0.1//guava-14.0.1.jar
+hadoop-client-api/3.2.4//hadoop-client-api-3.2.4.jar
+hadoop-client-runtime/3.2.4//hadoop-client-runtime-3.2.4.jar
+htrace-core4/4.1.0-incubating//htrace-core4-4.1.0-incubating.jar
+jcl-over-slf4j/1.7.36//jcl-over-slf4j-1.7.36.jar
+jsr305/1.3.9//jsr305-1.3.9.jar
+jul-to-slf4j/1.7.36//jul-to-slf4j-1.7.36.jar
+leveldbjni-all/1.8//leveldbjni-all-1.8.jar
+lz4-java/1.8.0//lz4-java-1.8.0.jar
+metrics-core/3.2.6//metrics-core-3.2.6.jar
+metrics-graphite/3.2.6//metrics-graphite-3.2.6.jar
+metrics-jvm/3.2.6//metrics-jvm-3.2.6.jar
+netty-all/4.1.93.Final//netty-all-4.1.93.Final.jar
+netty-buffer/4.1.93.Final//netty-buffer-4.1.93.Final.jar
+netty-codec-dns/4.1.93.Final//netty-codec-dns-4.1.93.Final.jar
+netty-codec-haproxy/4.1.93.Final//netty-codec-haproxy-4.1.93.Final.jar
+netty-codec-http/4.1.93.Final//netty-codec-http-4.1.93.Final.jar
+netty-codec-http2/4.1.93.Final//netty-codec-http2-4.1.93.Final.jar
+netty-codec-memcache/4.1.93.Final//netty-codec-memcache-4.1.93.Final.jar
+netty-codec-mqtt/4.1.93.Final//netty-codec-mqtt-4.1.93.Final.jar
+netty-codec-redis/4.1.93.Final//netty-codec-redis-4.1.93.Final.jar
+netty-codec-smtp/4.1.93.Final//netty-codec-smtp-4.1.93.Final.jar
+netty-codec-socks/4.1.93.Final//netty-codec-socks-4.1.93.Final.jar
+netty-codec-stomp/4.1.93.Final//netty-codec-stomp-4.1.93.Final.jar
+netty-codec-xml/4.1.93.Final//netty-codec-xml-4.1.93.Final.jar
+netty-codec/4.1.93.Final//netty-codec-4.1.93.Final.jar
+netty-common/4.1.93.Final//netty-common-4.1.93.Final.jar
+netty-handler-proxy/4.1.93.Final//netty-handler-proxy-4.1.93.Final.jar
+netty-handler/4.1.93.Final//netty-handler-4.1.93.Final.jar
+netty-resolver-dns-classes-macos/4.1.93.Final//netty-resolver-dns-classes-macos-4.1.93.Final.jar
+netty-resolver-dns-native-macos/4.1.93.Final/osx-aarch_64/netty-resolver-dns-native-macos-4.1.93.Final-osx-aarch_64.jar
+netty-resolver-dns-native-macos/4.1.93.Final/osx-x86_64/netty-resolver-dns-native-macos-4.1.93.Final-osx-x86_64.jar
+netty-resolver-dns/4.1.93.Final//netty-resolver-dns-4.1.93.Final.jar
+netty-resolver/4.1.93.Final//netty-resolver-4.1.93.Final.jar
+netty-transport-classes-epoll/4.1.93.Final//netty-transport-classes-epoll-4.1.93.Final.jar
+netty-transport-classes-kqueue/4.1.93.Final//netty-transport-classes-kqueue-4.1.93.Final.jar
+netty-transport-native-epoll/4.1.93.Final/linux-aarch_64/netty-transport-native-epoll-4.1.93.Final-linux-aarch_64.jar
+netty-transport-native-epoll/4.1.93.Final/linux-x86_64/netty-transport-native-epoll-4.1.93.Final-linux-x86_64.jar
+netty-transport-native-kqueue/4.1.93.Final/osx-aarch_64/netty-transport-native-kqueue-4.1.93.Final-osx-aarch_64.jar
+netty-transport-native-kqueue/4.1.93.Final/osx-x86_64/netty-transport-native-kqueue-4.1.93.Final-osx-x86_64.jar
+netty-transport-native-unix-common/4.1.93.Final//netty-transport-native-unix-common-4.1.93.Final.jar
+netty-transport-rxtx/4.1.93.Final//netty-transport-rxtx-4.1.93.Final.jar
+netty-transport-sctp/4.1.93.Final//netty-transport-sctp-4.1.93.Final.jar
+netty-transport-udt/4.1.93.Final//netty-transport-udt-4.1.93.Final.jar
+netty-transport/4.1.93.Final//netty-transport-4.1.93.Final.jar
+protobuf-java/3.19.2//protobuf-java-3.19.2.jar
+ratis-client/2.5.1//ratis-client-2.5.1.jar
+ratis-common/2.5.1//ratis-common-2.5.1.jar
+ratis-proto/2.5.1//ratis-proto-2.5.1.jar
+ratis-thirdparty-misc/1.0.4//ratis-thirdparty-misc-1.0.4.jar
+scala-library/2.12.15//scala-library-2.12.15.jar
+scala-reflect/2.12.15//scala-reflect-2.12.15.jar
+shims/0.9.32//shims-0.9.32.jar
+slf4j-api/1.7.36//slf4j-api-1.7.36.jar
+snakeyaml/1.33//snakeyaml-1.33.jar
+zstd-jni/1.5.2-1//zstd-jni-1.5.2-1.jar
diff --git a/dev/reformat b/dev/reformat
index 11b6e6775..0b30a3776 100755
--- a/dev/reformat
+++ b/dev/reformat
@@ -22,6 +22,7 @@ PROJECT_DIR="$(cd "`dirname "$0"`/.."; pwd)"
${PROJECT_DIR}/build/mvn spotless:apply -Pflink-1.14
${PROJECT_DIR}/build/mvn spotless:apply -Pflink-1.15
${PROJECT_DIR}/build/mvn spotless:apply -Pflink-1.17
+${PROJECT_DIR}/build/mvn spotless:apply -Pflink-1.18
${PROJECT_DIR}/build/mvn spotless:apply -Pspark-2.4
${PROJECT_DIR}/build/mvn spotless:apply -Pspark-3.3
${PROJECT_DIR}/build/mvn spotless:apply -Pmr
diff --git a/docs/README.md b/docs/README.md
index b026f5581..7cd683bc9 100644
--- a/docs/README.md
+++ b/docs/README.md
@@ -119,7 +119,7 @@ INFO [async-reply] Controller: CommitFiles for local-1690000152711-0 success wit
## Start Flink with Celeborn
#### Copy Celeborn Client to Flink's lib
-Celeborn release binary contains clients for Flink 1.14.x, Flink 1.15.x and Flink 1.17.x, copy the corresponding client jar into Flink's
+Celeborn release binary contains clients for Flink 1.14.x, Flink 1.15.x, Flink 1.17.x and Flink 1.18.x, copy the corresponding client jar into Flink's
`lib/` directory:
```shell
cp $CELEBORN_HOME/flink/<Celeborn Client Jar> $FLINK_HOME/lib/
diff --git a/docs/developers/sbt.md b/docs/developers/sbt.md
index 484241f68..35b324d3c 100644
--- a/docs/developers/sbt.md
+++ b/docs/developers/sbt.md
@@ -38,6 +38,7 @@ The following table indicates the compatibility of Celeborn Spark and Flink clie
 | Flink 1.14 | ❌ | ✔ | ✔ | ❌ | ❌ | ❌ | ❌ |
 | Flink 1.15 | ❌ | ✔ | ✔ | ❌ | ❌ | ❌ | ❌ |
 | Flink 1.17 | ❌ | ✔ | ✔ | ❌ | ❌ | ❌ | ❌ |
+| Flink 1.18 | ❌ | ✔ | ✔ | ❌ | ❌ | ❌ | ❌ |
## Useful SBT commands
diff --git a/pom.xml b/pom.xml
index 0510f59c4..8ba97fbff 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1265,6 +1265,26 @@
</properties>
</profile>
+ <profile>
+ <id>flink-1.18</id>
+ <modules>
+ <module>client-flink/common</module>
+ <module>client-flink/flink-1.18</module>
+ <module>client-flink/flink-1.18-shaded</module>
+ <module>tests/flink-it</module>
+ </modules>
+ <properties>
+ <flink.version>1.18.0</flink.version>
+ <flink.binary.version>1.18</flink.binary.version>
+ <scala.binary.version>2.12</scala.binary.version>
+ <celeborn.flink.plugin.artifact>celeborn-client-flink-1.18_${scala.binary.version}</celeborn.flink.plugin.artifact>
+ <flink.streamig.artifact>flink-streaming-java</flink.streamig.artifact>
+ <flink.clients.artifact>flink-clients</flink.clients.artifact>
+ <flink.scala.artifact>flink-scala_${scala.binary.version}</flink.scala.artifact>
+ <flink.runtime.web.artifact>flink-runtime-web</flink.runtime.web.artifact>
+ </properties>
+ </profile>
+
<profile>
<id>mr</id>
<modules>
diff --git a/project/CelebornBuild.scala b/project/CelebornBuild.scala
index 708e86ba1..168a6f3bd 100644
--- a/project/CelebornBuild.scala
+++ b/project/CelebornBuild.scala
@@ -275,6 +275,7 @@ object Utils {
case Some("flink-1.14") => Some(Flink114)
case Some("flink-1.15") => Some(Flink115)
case Some("flink-1.17") => Some(Flink117)
+ case Some("flink-1.18") => Some(Flink118)
case _ => None
}
@@ -749,6 +750,16 @@ object Flink117 extends FlinkClientProjects {
 val flinkClientShadedProjectName: String = "celeborn-client-flink-1_17-shaded"
}
+object Flink118 extends FlinkClientProjects {
+ val flinkVersion = "1.18.0"
+
+ // note that SBT does not allow using the period symbol (.) in project names.
+ val flinkClientProjectPath = "client-flink/flink-1.18"
+ val flinkClientProjectName = "celeborn-client-flink-1_18"
+ val flinkClientShadedProjectPath: String = "client-flink/flink-1.18-shaded"
+ val flinkClientShadedProjectName: String = "celeborn-client-flink-1_18-shaded"
+}
+
trait FlinkClientProjects {
val flinkVersion: String