This is an automated email from the ASF dual-hosted git repository.

rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new efa22a493 [CELEBORN-1105][FLINK] Support Flink 1.18
efa22a493 is described below

commit efa22a4936335d98195ed99d8cabe1fd89d3a3a6
Author: sychen <[email protected]>
AuthorDate: Mon Nov 6 15:53:39 2023 +0800

    [CELEBORN-1105][FLINK] Support Flink 1.18
    
    ### What changes were proposed in this pull request?
    
    ### Why are the changes needed?
    
    ```bash
    flink-1.18.0
    ./bin/start-cluster.sh
    ./bin/flink run examples/streaming/WordCount.jar --execution-mode BATCH
    ```
    
    ```java
    Caused by: java.lang.NoSuchMethodError: 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.<init>(Ljava/lang/String;ILorg/apache/flink/runtime/jobgraph/IntermediateDataSetID;Lorg/apache/flink/runtime/io/network/partition/ResultPartitionType;Lorg/apache/flink/runtime/executiongraph/IndexRange;ILorg/apache/flink/runtime/io/network/partition/PartitionProducerStateProvider;Lorg/apache/flink/util/function/SupplierWithException;Lorg/apache/flink/runtime/io/network/buffer
 [...]
            at 
org.apache.celeborn.plugin.flink.RemoteShuffleInputGate$FakedRemoteInputChannel.<init>(RemoteShuffleInputGate.java:225)
            at 
org.apache.celeborn.plugin.flink.RemoteShuffleInputGate.getChannel(RemoteShuffleInputGate.java:179)
            at 
org.apache.flink.runtime.io.network.partition.consumer.InputGate.setChannelStateWriter(InputGate.java:90)
            at 
org.apache.flink.runtime.taskmanager.InputGateWithMetrics.setChannelStateWriter(InputGateWithMetrics.java:120)
            at 
org.apache.flink.streaming.runtime.tasks.StreamTask.injectChannelStateWriterIntoChannels(StreamTask.java:524)
            at 
org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:496)
    ```
    
    Flink 1.18.0 release
    
https://flink.apache.org/2023/10/24/announcing-the-release-of-apache-flink-1.18/
    
    Interface `org.apache.flink.runtime.io.network.buffer.Buffer` adds 
`setRecycler` method.
    [[FLINK-32549](https://issues.apache.org/jira/browse/FLINK-32549)][network] 
Tiered storage memory manager supports ownership transfer for buffers
    
    `org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate` 
constructor adds parameters.
    [[FLINK-31638](https://issues.apache.org/jira/browse/FLINK-31638)][network] 
Introduce the TieredStorageConsumerClient to SingleInputGate
    [[FLINK-31642](https://issues.apache.org/jira/browse/FLINK-31642)][network] 
Introduce the MemoryTierConsumerAgent to TieredStorageConsumerClient
    
    ### Does this PR introduce _any_ user-facing change?
    
    ### How was this patch tested?
    ```bash
    flink-1.18.0 ./bin/flink run examples/streaming/WordCount.jar 
--execution-mode BATCH
    Executing example with default input data.
    Use --input to specify file input.
    Printing result to stdout. Use --output to specify output path.
    Job has been submitted with JobID d7fc5f0ca018a54e9453c4d35f7c598a
    Program execution finished
    Job with JobID d7fc5f0ca018a54e9453c4d35f7c598a has finished.
    Job Runtime: 1635 ms
    ```
    
    <img width="1297" alt="image" 
src="https://github.com/apache/incubator-celeborn/assets/3898450/6a5266bf-2386-4386-b98b-a60d2570fa99">
    
    Closes #2063 from cxzl25/CELEBORN-1105.
    
    Authored-by: sychen <[email protected]>
    Signed-off-by: Shuang <[email protected]>
---
 .github/workflows/deps.yml                         |   2 +
 .github/workflows/maven.yml                        |   1 +
 .github/workflows/sbt.yml                          |   1 +
 .github/workflows/style.yml                        |   1 +
 README.md                                          |   5 +-
 build/make-distribution.sh                         |   2 +
 build/release/release.sh                           |   8 +
 .../celeborn/plugin/flink/buffer/BufferPacker.java |  11 +
 client-flink/flink-1.18-shaded/pom.xml             | 136 +++++
 .../src/main/resources/META-INF/LICENSE            | 247 ++++++++
 .../src/main/resources/META-INF/NOTICE             |  13 +
 .../META-INF/licenses/LICENSE-protobuf.txt         |  42 ++
 client-flink/flink-1.18/pom.xml                    |  69 +++
 .../plugin/flink/RemoteShuffleEnvironment.java     |  82 +++
 .../plugin/flink/RemoteShuffleInputGate.java       | 301 ++++++++++
 .../flink/RemoteShuffleInputGateFactory.java       |  55 ++
 .../plugin/flink/RemoteShuffleResultPartition.java | 225 ++++++++
 .../flink/RemoteShuffleResultPartitionFactory.java |  79 +++
 .../plugin/flink/RemoteShuffleServiceFactory.java  |  60 ++
 .../plugin/flink/SimpleResultPartitionAdapter.java |  27 +
 .../plugin/flink/RemoteShuffleMasterTest.java      | 290 ++++++++++
 .../flink/RemoteShuffleResultPartitionSuiteJ.java  | 621 +++++++++++++++++++++
 .../flink/RemoteShuffleServiceFactorySuitJ.java    |  58 ++
 dev/dependencies.sh                                |   4 +
 dev/deps/dependencies-client-flink-1.18            |  78 +++
 dev/reformat                                       |   1 +
 docs/README.md                                     |   2 +-
 docs/developers/sbt.md                             |   1 +
 pom.xml                                            |  20 +
 project/CelebornBuild.scala                        |  11 +
 30 files changed, 2450 insertions(+), 3 deletions(-)

diff --git a/.github/workflows/deps.yml b/.github/workflows/deps.yml
index 32d551ce1..a1318d009 100644
--- a/.github/workflows/deps.yml
+++ b/.github/workflows/deps.yml
@@ -50,6 +50,7 @@ jobs:
           - 'flink-1.14'
           - 'flink-1.15'
           - 'flink-1.17'
+          - 'flink-1.18'
           - 'mr'
     steps:
     - uses: actions/checkout@v2
@@ -80,6 +81,7 @@ jobs:
           - 'flink-1.14'
           - 'flink-1.15'
           - 'flink-1.17'
+          - 'flink-1.18'
           - 'mr'
     steps:
     - uses: actions/checkout@v2
diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml
index 1080b615c..43e602d63 100644
--- a/.github/workflows/maven.yml
+++ b/.github/workflows/maven.yml
@@ -154,6 +154,7 @@ jobs:
           - '1.14'
           - '1.15'
           - '1.17'
+          - '1.18'
     steps:
       - uses: actions/checkout@v2
       - name: Setup JDK ${{ matrix.java }}
diff --git a/.github/workflows/sbt.yml b/.github/workflows/sbt.yml
index e778f70be..740e6bdf7 100644
--- a/.github/workflows/sbt.yml
+++ b/.github/workflows/sbt.yml
@@ -203,6 +203,7 @@ jobs:
           - '1.14'
           - '1.15'
           - '1.17'
+          - '1.18'
     steps:
       - uses: actions/checkout@v2
       - name: Setup JDK ${{ matrix.java }}
diff --git a/.github/workflows/style.yml b/.github/workflows/style.yml
index 377cf7822..55c885952 100644
--- a/.github/workflows/style.yml
+++ b/.github/workflows/style.yml
@@ -50,6 +50,7 @@ jobs:
           build/mvn spotless:check -Pgoogle-mirror,flink-1.14
           build/mvn spotless:check -Pgoogle-mirror,flink-1.15
           build/mvn spotless:check -Pgoogle-mirror,flink-1.17
+          build/mvn spotless:check -Pgoogle-mirror,flink-1.18
           build/mvn spotless:check -Pgoogle-mirror,spark-2.4
           build/mvn spotless:check -Pgoogle-mirror,spark-3.3
           build/mvn spotless:check -Pgoogle-mirror,mr
diff --git a/README.md b/README.md
index 2472ce4c1..1820734be 100644
--- a/README.md
+++ b/README.md
@@ -41,12 +41,12 @@ Celeborn Worker's slot count is decided by `total usable 
disk size / average shu
 Celeborn worker's slot count decreases when a partition is allocated and 
increments when a partition is freed.
 
 ## Build
-1. Celeborn supports Spark 2.4/3.0/3.1/3.2/3.3/3.4/3.5, Flink 1.14/1.15/1.17 
and Hadoop MapReduce 2/3.
+1. Celeborn supports Spark 2.4/3.0/3.1/3.2/3.3/3.4/3.5, Flink 
1.14/1.15/1.17/1.18 and Hadoop MapReduce 2/3.
 2. Celeborn tested under Scala 2.11/2.12/2.13 and Java 8/11/17 environment.
 
 Build Celeborn
 ```shell
-./build/make-distribution.sh 
-Pspark-2.4/-Pspark-3.0/-Pspark-3.1/-Pspark-3.2/-Pspark-3.3/-Pspark-3.4/-Pflink-1.14/-Pflink-1.15/-Pflink-1.17/-Pmr
+./build/make-distribution.sh 
-Pspark-2.4/-Pspark-3.0/-Pspark-3.1/-Pspark-3.2/-Pspark-3.3/-Pspark-3.4/-Pflink-1.14/-Pflink-1.15/-Pflink-1.17/-Pflink-1.18/-Pmr
 ```
 
 package apache-celeborn-${project.version}-bin.tgz will be generated.
@@ -65,6 +65,7 @@ package apache-celeborn-${project.version}-bin.tgz will be 
generated.
 | Flink 1.14 | &#x274C;          | &#10004;          | &#10004;           | 
&#x274C;           | &#x274C;          | &#x274C;           | &#x274C;          
 |
 | Flink 1.15 | &#x274C;          | &#10004;          | &#10004;           | 
&#x274C;           | &#x274C;          | &#x274C;           | &#x274C;          
 |
 | Flink 1.17 | &#x274C;          | &#10004;          | &#10004;           | 
&#x274C;           | &#x274C;          | &#x274C;           | &#x274C;          
 |
+| Flink 1.18 | &#x274C;          | &#10004;          | &#10004;           | 
&#x274C;           | &#x274C;          | &#x274C;           | &#x274C;          
 |
 
 ### Package Details
 Build procedure will create a compressed package.
diff --git a/build/make-distribution.sh b/build/make-distribution.sh
index 6dd19e249..7aa4fc9c2 100755
--- a/build/make-distribution.sh
+++ b/build/make-distribution.sh
@@ -309,6 +309,7 @@ if [ "$SBT_ENABLED" == "true" ]; then
     sbt_build_client -Pflink-1.14
     sbt_build_client -Pflink-1.15
     sbt_build_client -Pflink-1.17
+    sbt_build_client -Pflink-1.18
     sbt_build_client -Pmr
   else
     echo "build client with $@"
@@ -339,6 +340,7 @@ else
     build_flink_client -Pflink-1.14
     build_flink_client -Pflink-1.15
     build_flink_client -Pflink-1.17
+    build_flink_client -Pflink-1.18
     build_mr_client -Pmr
   else
     ## build release package on demand
diff --git a/build/release/release.sh b/build/release/release.sh
index ce89a981b..a87d2d31c 100755
--- a/build/release/release.sh
+++ b/build/release/release.sh
@@ -136,6 +136,14 @@ upload_nexus_staging() {
   ${PROJECT_DIR}/build/mvn deploy -DskipTests -Papache-release,flink-1.17 \
     -s "${PROJECT_DIR}/build/release/asf-settings.xml" \
     -pl :celeborn-client-flink-1.17-shaded_2.12
+
+  echo "Deploying celeborn-client-flink-1.18-shaded_2.12"
+  ${PROJECT_DIR}/build/mvn clean install -DskipTests 
-Papache-release,flink-1.18 \
+    -s "${PROJECT_DIR}/build/release/asf-settings.xml" \
+    -pl :celeborn-client-flink-1.18-shaded_2.12 -am
+  ${PROJECT_DIR}/build/mvn deploy -DskipTests -Papache-release,flink-1.18 \
+    -s "${PROJECT_DIR}/build/release/asf-settings.xml" \
+    -pl :celeborn-client-flink-1.18-shaded_2.12
 }
 
 finalize_svn() {
diff --git 
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/BufferPacker.java
 
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/BufferPacker.java
index 53c887d42..c0176e111 100644
--- 
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/BufferPacker.java
+++ 
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/BufferPacker.java
@@ -31,6 +31,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.celeborn.plugin.flink.utils.BufferUtils;
 import org.apache.celeborn.plugin.flink.utils.Utils;
+import org.apache.celeborn.reflect.DynMethods;
 
 /** Harness used to pack multiple partial buffers together as a full one. */
 public class BufferPacker {
@@ -216,6 +217,16 @@ public class BufferPacker {
       return buffer.getRecycler();
     }
 
+    // Flink 1.18.0 added Buffer#setRecycler; resolved by name via DynMethods on the wrapped buffer's runtime class
+    // [FLINK-32549][network] Tiered storage memory manager supports ownership 
transfer for buffers
+    public void setRecycler(BufferRecycler bufferRecycler) {
+      DynMethods.builder("setRecycler")
+          .impl(buffer.getClass().getName(), BufferRecycler.class)
+          .build()
+          .bind(buffer)
+          .invoke(bufferRecycler);
+    }
+
     @Override
     public void recycleBuffer() {
       buffer.recycleBuffer();
diff --git a/client-flink/flink-1.18-shaded/pom.xml 
b/client-flink/flink-1.18-shaded/pom.xml
new file mode 100644
index 000000000..b03e83029
--- /dev/null
+++ b/client-flink/flink-1.18-shaded/pom.xml
@@ -0,0 +1,136 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+~ Licensed to the Apache Software Foundation (ASF) under one or more
+~ contributor license agreements.  See the NOTICE file distributed with
+~ this work for additional information regarding copyright ownership.
+~ The ASF licenses this file to You under the Apache License, Version 2.0
+~ (the "License"); you may not use this file except in compliance with
+~ the License.  You may obtain a copy of the License at
+~
+~    http://www.apache.org/licenses/LICENSE-2.0
+~
+~ Unless required by applicable law or agreed to in writing, software
+~ distributed under the License is distributed on an "AS IS" BASIS,
+~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+~ See the License for the specific language governing permissions and
+~ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.celeborn</groupId>
+    <artifactId>celeborn-parent_${scala.binary.version}</artifactId>
+    <version>${project.version}</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+
+  
<artifactId>celeborn-client-flink-1.18-shaded_${scala.binary.version}</artifactId>
+  <packaging>jar</packaging>
+  <name>Celeborn Shaded Client for Flink 1.18</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.celeborn</groupId>
+      
<artifactId>celeborn-client-flink-1.18_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-shade-plugin</artifactId>
+        <configuration>
+          <relocations>
+            <relocation>
+              <pattern>com.google.protobuf</pattern>
+              
<shadedPattern>${shading.prefix}.com.google.protobuf</shadedPattern>
+            </relocation>
+            <relocation>
+              <pattern>com.google.common</pattern>
+              
<shadedPattern>${shading.prefix}.com.google.common</shadedPattern>
+            </relocation>
+            <relocation>
+              <pattern>io.netty</pattern>
+              <shadedPattern>${shading.prefix}.io.netty</shadedPattern>
+            </relocation>
+            <relocation>
+              <pattern>org.apache.commons</pattern>
+              
<shadedPattern>${shading.prefix}.org.apache.commons</shadedPattern>
+            </relocation>
+            <relocation>
+              <pattern>org.roaringbitmap</pattern>
+              
<shadedPattern>${shading.prefix}.org.roaringbitmap</shadedPattern>
+            </relocation>
+          </relocations>
+          <artifactSet>
+            <includes>
+              <include>org.apache.celeborn:*</include>
+              <include>com.google.protobuf:protobuf-java</include>
+              <include>com.google.guava:guava</include>
+              <include>io.netty:*</include>
+              <include>org.apache.commons:commons-lang3</include>
+              <include>org.roaringbitmap:RoaringBitmap</include>
+            </includes>
+          </artifactSet>
+          <filters>
+            <filter>
+              <artifact>*:*</artifact>
+              <excludes>
+                <exclude>**/*.proto</exclude>
+                <exclude>META-INF/*.SF</exclude>
+                <exclude>META-INF/*.DSA</exclude>
+                <exclude>META-INF/*.RSA</exclude>
+                <exclude>**/log4j.properties</exclude>
+                <exclude>META-INF/LICENSE.txt</exclude>
+                <exclude>META-INF/NOTICE.txt</exclude>
+                <exclude>LICENSE.txt</exclude>
+                <exclude>NOTICE.txt</exclude>
+              </excludes>
+            </filter>
+          </filters>
+          <transformers>
+            <transformer 
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"></transformer>
+          </transformers>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-antrun-plugin</artifactId>
+        <version>${maven.plugin.antrun.version}</version>
+        <executions>
+          <execution>
+            <id>rename-native-library</id>
+            <goals>
+              <goal>run</goal>
+            </goals>
+            <phase>package</phase>
+            <configuration>
+              <target>
+                <echo message="unpacking netty jar"></echo>
+                <unzip dest="${project.build.directory}/unpacked/" 
src="${project.build.directory}/${artifactId}-${version}.jar"></unzip>
+                <echo message="renaming native epoll library"></echo>
+                <move includeemptydirs="false" 
todir="${project.build.directory}/unpacked/META-INF/native">
+                  <fileset 
dir="${project.build.directory}/unpacked/META-INF/native"></fileset>
+                  <mapper from="libnetty_transport_native_epoll_x86_64.so" 
to="liborg_apache_celeborn_shaded_netty_transport_native_epoll_x86_64.so" 
type="glob"></mapper>
+                </move>
+                <move includeemptydirs="false" 
todir="${project.build.directory}/unpacked/META-INF/native">
+                  <fileset 
dir="${project.build.directory}/unpacked/META-INF/native"></fileset>
+                  <mapper from="libnetty_transport_native_epoll_aarch_64.so" 
to="liborg_apache_celeborn_shaded_netty_transport_native_epoll_aarch_64.so" 
type="glob"></mapper>
+                </move>
+                <echo message="deleting native kqueue library"></echo>
+                <delete 
file="${project.build.directory}/unpacked/META-INF/native/libnetty_transport_native_kqueue_x86_64.jnilib"></delete>
+                <delete 
file="${project.build.directory}/unpacked/META-INF/native/libnetty_transport_native_kqueue_aarch_64.jnilib"></delete>
+                <delete 
file="${project.build.directory}/unpacked/META-INF/native/libnetty_resolver_dns_native_macos_aarch_64.jnilib"></delete>
+                <delete 
file="${project.build.directory}/unpacked/META-INF/native/libnetty_resolver_dns_native_macos_x86_64.jnilib"></delete>
+                <echo message="repackaging netty jar"></echo>
+                <jar basedir="${project.build.directory}/unpacked" 
destfile="${project.build.directory}/${artifactId}-${version}.jar"></jar>
+              </target>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git a/client-flink/flink-1.18-shaded/src/main/resources/META-INF/LICENSE 
b/client-flink/flink-1.18-shaded/src/main/resources/META-INF/LICENSE
new file mode 100644
index 000000000..0441af2ac
--- /dev/null
+++ b/client-flink/flink-1.18-shaded/src/main/resources/META-INF/LICENSE
@@ -0,0 +1,247 @@
+                                 Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "[]"
+      replaced with your own identifying information. (Don't include
+      the brackets!)  The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright [yyyy] [name of copyright owner]
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+
+
+------------------------------------------------------------------------------------
+This project bundles the following dependencies under the Apache License 2.0 
(http://www.apache.org/licenses/LICENSE-2.0.txt):
+
+
+Apache License 2.0
+--------------------------------------
+
+org.apache.commons:commons-lang3
+io.netty:netty-all
+io.netty:netty-buffer
+io.netty:netty-codec
+io.netty:netty-codec-dns
+io.netty:netty-codec-haproxy
+io.netty:netty-codec-http
+io.netty:netty-codec-http2
+io.netty:netty-codec-memcache
+io.netty:netty-codec-mqtt
+io.netty:netty-codec-redis
+io.netty:netty-codec-smtp
+io.netty:netty-codec-socks
+io.netty:netty-codec-stomp
+io.netty:netty-codec-xml
+io.netty:netty-common
+io.netty:netty-handler
+io.netty:netty-transport-native-unix-common
+io.netty:netty-handler-proxy
+io.netty:netty-resolver
+io.netty:netty-resolver-dns
+io.netty:netty-transport
+io.netty:netty-transport-rxtx
+io.netty:netty-transport-sctp
+io.netty:netty-transport-udt
+io.netty:netty-transport-classes-epoll
+io.netty:netty-transport-classes-kqueue
+io.netty:netty-transport-native-epoll
+io.netty:netty-transport-native-kqueue
+com.google.guava:guava
+org.roaringbitmap:RoaringBitmap
+
+
+BSD 3-clause
+------------
+See licenses/LICENSE-protobuf.txt for details.
+com.google.protobuf:protobuf-java
diff --git a/client-flink/flink-1.18-shaded/src/main/resources/META-INF/NOTICE 
b/client-flink/flink-1.18-shaded/src/main/resources/META-INF/NOTICE
new file mode 100644
index 000000000..a4001a2ce
--- /dev/null
+++ b/client-flink/flink-1.18-shaded/src/main/resources/META-INF/NOTICE
@@ -0,0 +1,13 @@
+
+Apache Celeborn (Incubating)
+Copyright 2022-2023 The Apache Software Foundation.
+
+This product includes software developed at
+The Apache Software Foundation (https://www.apache.org/).
+
+=============================================================================
+= NOTICE file corresponding to section 4d of the Apache License Version 2.0 =
+=============================================================================
+
+Apache Commons Lang
+Copyright 2001-2021 The Apache Software Foundation
diff --git a/client-flink/flink-1.18-shaded/src/main/resources/META-INF/licenses/LICENSE-protobuf.txt b/client-flink/flink-1.18-shaded/src/main/resources/META-INF/licenses/LICENSE-protobuf.txt
new file mode 100644
index 000000000..b4350ec83
--- /dev/null
+++ b/client-flink/flink-1.18-shaded/src/main/resources/META-INF/licenses/LICENSE-protobuf.txt
@@ -0,0 +1,42 @@
+This license applies to all parts of Protocol Buffers except the following:
+
+  - Atomicops support for generic gcc, located in
+    src/google/protobuf/stubs/atomicops_internals_generic_gcc.h.
+    This file is copyrighted by Red Hat Inc.
+
+  - Atomicops support for AIX/POWER, located in
+    src/google/protobuf/stubs/atomicops_internals_aix.h.
+    This file is copyrighted by Bloomberg Finance LP.
+
+Copyright 2014, Google Inc.  All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are
+met:
+
+    * Redistributions of source code must retain the above copyright
+notice, this list of conditions and the following disclaimer.
+    * Redistributions in binary form must reproduce the above
+copyright notice, this list of conditions and the following disclaimer
+in the documentation and/or other materials provided with the
+distribution.
+    * Neither the name of Google Inc. nor the names of its
+contributors may be used to endorse or promote products derived from
+this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+Code generated by the Protocol Buffer compiler is owned by the owner
+of the input file used when generating it.  This code is not
+standalone and requires a support library to be linked with it.  This
+support library is itself covered by the above license.
\ No newline at end of file
diff --git a/client-flink/flink-1.18/pom.xml b/client-flink/flink-1.18/pom.xml
new file mode 100644
index 000000000..1d51f8dc8
--- /dev/null
+++ b/client-flink/flink-1.18/pom.xml
@@ -0,0 +1,69 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+~ Licensed to the Apache Software Foundation (ASF) under one or more
+~ contributor license agreements.  See the NOTICE file distributed with
+~ this work for additional information regarding copyright ownership.
+~ The ASF licenses this file to You under the Apache License, Version 2.0
+~ (the "License"); you may not use this file except in compliance with
+~ the License.  You may obtain a copy of the License at
+~
+~    http://www.apache.org/licenses/LICENSE-2.0
+~
+~ Unless required by applicable law or agreed to in writing, software
+~ distributed under the License is distributed on an "AS IS" BASIS,
+~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+~ See the License for the specific language governing permissions and
+~ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.celeborn</groupId>
+    <artifactId>celeborn-parent_${scala.binary.version}</artifactId>
+    <version>${project.version}</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>celeborn-client-flink-1.18_${scala.binary.version}</artifactId>
+  <packaging>jar</packaging>
+  <name>Celeborn Client for Flink 1.18</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.celeborn</groupId>
+      <artifactId>celeborn-common_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.celeborn</groupId>
+      <artifactId>celeborn-client_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.celeborn</groupId>
+      <artifactId>celeborn-client-flink-common_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-runtime</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <!-- Test dependencies -->
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-slf4j-impl</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-1.2-api</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+</project>
diff --git a/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleEnvironment.java b/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleEnvironment.java
new file mode 100644
index 000000000..01740f1f3
--- /dev/null
+++ b/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleEnvironment.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.plugin.flink;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
+import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
+import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
+import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
+
+import org.apache.celeborn.common.CelebornConf;
+
+/**
+ * The implementation of {@link ShuffleEnvironment} based on the remote shuffle service, providing
+ * shuffle environment on flink TM side.
+ */
+public class RemoteShuffleEnvironment extends AbstractRemoteShuffleEnvironment
+    implements ShuffleEnvironment<ResultPartitionWriter, IndexedInputGate> {
+
+  /** Factory class to create {@link RemoteShuffleResultPartition}. */
+  private final RemoteShuffleResultPartitionFactory resultPartitionFactory;
+
+  private final RemoteShuffleInputGateFactory inputGateFactory;
+
+  /**
+   * @param networkBufferPool Network buffer pool for shuffle read and shuffle write.
+   * @param resultPartitionManager A trivial {@link ResultPartitionManager}.
+   * @param resultPartitionFactory Factory class to create {@link RemoteShuffleResultPartition}.
+   * @param inputGateFactory Factory class to create {@link RemoteShuffleInputGate}.
+   */
+  public RemoteShuffleEnvironment(
+      NetworkBufferPool networkBufferPool,
+      ResultPartitionManager resultPartitionManager,
+      RemoteShuffleResultPartitionFactory resultPartitionFactory,
+      RemoteShuffleInputGateFactory inputGateFactory,
+      CelebornConf conf) {
+
+    super(networkBufferPool, resultPartitionManager, conf);
+    this.resultPartitionFactory = resultPartitionFactory;
+    this.inputGateFactory = inputGateFactory;
+  }
+
+  @Override
+  public ResultPartitionWriter createResultPartitionWriterInternal(
+      ShuffleIOOwnerContext ownerContext,
+      int index,
+      ResultPartitionDeploymentDescriptor resultPartitionDeploymentDescriptor,
+      CelebornConf conf) {
+    return resultPartitionFactory.create(
+        ownerContext.getOwnerName(), index, 
resultPartitionDeploymentDescriptor, conf);
+  }
+
+  @Override
+  IndexedInputGate createInputGateInternal(
+      ShuffleIOOwnerContext ownerContext, int gateIndex, 
InputGateDeploymentDescriptor igdd) {
+    return inputGateFactory.create(ownerContext.getOwnerName(), gateIndex, 
igdd);
+  }
+
+  @VisibleForTesting
+  RemoteShuffleResultPartitionFactory getResultPartitionFactory() {
+    return resultPartitionFactory;
+  }
+}
diff --git a/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java b/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
new file mode 100644
index 000000000..3e73ae91e
--- /dev/null
+++ b/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.plugin.flink;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentProvider;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
+import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
+import org.apache.flink.runtime.checkpoint.channel.ResultSubpartitionInfo;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.event.TaskEvent;
+import org.apache.flink.runtime.executiongraph.IndexRange;
+import org.apache.flink.runtime.io.network.ConnectionID;
+import org.apache.flink.runtime.io.network.LocalConnectionManager;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferDecompressor;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
+import 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.throughput.ThroughputCalculator;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.clock.SystemClock;
+import org.apache.flink.util.function.SupplierWithException;
+
+import org.apache.celeborn.common.CelebornConf;
+
+/** An {@link IndexedInputGate} which ingests data from remote shuffle workers. */
+public class RemoteShuffleInputGate extends IndexedInputGate {
+
+  private final RemoteShuffleInputGateDelegation inputGateDelegation;
+
+  public RemoteShuffleInputGate(
+      CelebornConf celebornConf,
+      String taskName,
+      int gateIndex,
+      InputGateDeploymentDescriptor gateDescriptor,
+      SupplierWithException<BufferPool, IOException> bufferPoolFactory,
+      BufferDecompressor bufferDecompressor,
+      int numConcurrentReading) {
+
+    inputGateDelegation =
+        new RemoteShuffleInputGateDelegation(
+            celebornConf,
+            taskName,
+            gateIndex,
+            gateDescriptor,
+            bufferPoolFactory,
+            bufferDecompressor,
+            numConcurrentReading,
+            availabilityHelper,
+            gateDescriptor.getConsumedSubpartitionIndexRange().getStartIndex(),
+            gateDescriptor.getConsumedSubpartitionIndexRange().getEndIndex());
+  }
+
+  /** Setup gate and build network connections. */
+  @Override
+  public void setup() throws IOException {
+    inputGateDelegation.setup();
+  }
+
+  /** Index of the gate of the corresponding computing task. */
+  @Override
+  public int getGateIndex() {
+    return inputGateDelegation.getGateIndex();
+  }
+
+  /** Get number of input channels. A channel is a data flow from one shuffle worker. */
+  @Override
+  public int getNumberOfInputChannels() {
+    return inputGateDelegation.getBufferReaders().size();
+  }
+
+  /** Whether reading is finished -- all channels are finished and cached buffers are drained. */
+  @Override
+  public boolean isFinished() {
+    return inputGateDelegation.isFinished();
+  }
+
+  @Override
+  public Optional<BufferOrEvent> getNext() {
+    throw new UnsupportedOperationException("Not implemented (DataSet API is 
not supported).");
+  }
+
+  /** Poll a received {@link BufferOrEvent}. */
+  @Override
+  public Optional<BufferOrEvent> pollNext() throws IOException {
+    return inputGateDelegation.pollNext();
+  }
+
+  /** Close all reading channels inside this {@link RemoteShuffleInputGate}. */
+  @Override
+  public void close() throws Exception {
+    inputGateDelegation.close();
+  }
+
+  /** Get {@link InputChannelInfo}s of this {@link RemoteShuffleInputGate}. */
+  @Override
+  public List<InputChannelInfo> getChannelInfos() {
+    return inputGateDelegation.getChannelsInfo();
+  }
+
+  @Override
+  public void requestPartitions() {
+    // do-nothing
+  }
+
+  @Override
+  public void checkpointStarted(CheckpointBarrier barrier) {
+    // do-nothing.
+  }
+
+  @Override
+  public void checkpointStopped(long cancelledCheckpointId) {
+    // do-nothing.
+  }
+
+  @Override
+  public void triggerDebloating() {
+    // do-nothing.
+  }
+
+  @Override
+  public List<InputChannelInfo> getUnfinishedChannels() {
+    return Collections.emptyList();
+  }
+
+  @Override
+  public EndOfDataStatus hasReceivedEndOfData() {
+    if (inputGateDelegation.getPendingEndOfDataEvents() > 0) {
+      return EndOfDataStatus.NOT_END_OF_DATA;
+    } else {
+      // Keep compatibility with streaming mode.
+      return EndOfDataStatus.DRAINED;
+    }
+  }
+
+  @Override
+  public void finishReadRecoveredState() {
+    // do-nothing.
+  }
+
+  @Override
+  public InputChannel getChannel(int channelIndex) {
+    return new FakedRemoteInputChannel(channelIndex);
+  }
+
+  @Override
+  public void sendTaskEvent(TaskEvent event) {
+    throw new FlinkRuntimeException("Method should not be called.");
+  }
+
+  @Override
+  public void resumeConsumption(InputChannelInfo channelInfo) {
+    throw new FlinkRuntimeException("Method should not be called.");
+  }
+
+  @Override
+  public void acknowledgeAllRecordsProcessed(InputChannelInfo 
inputChannelInfo) {}
+
+  @Override
+  public CompletableFuture<Void> getStateConsumedFuture() {
+    return CompletableFuture.completedFuture(null);
+  }
+
+  @Override
+  public String toString() {
+    return String.format(
+        "ReadGate [owning task: %s, gate index: %d, descriptor: %s]",
+        inputGateDelegation.getTaskName(),
+        inputGateDelegation.getGateIndex(),
+        inputGateDelegation.getGateDescriptor().toString());
+  }
+
+  /** Accommodation for the incompleteness of Flink pluggable shuffle service. */
+  private class FakedRemoteInputChannel extends RemoteInputChannel {
+    FakedRemoteInputChannel(int channelIndex) {
+      // Flink 1.18.0
+      // [FLINK-31638][network] Introduce the TieredStorageConsumerClient to SingleInputGate
+      // [FLINK-31642][network] Introduce the MemoryTierConsumerAgent to TieredStorageConsumerClient
+      super(
+          new SingleInputGate(
+              "",
+              inputGateDelegation.getGateIndex(),
+              new IntermediateDataSetID(),
+              ResultPartitionType.BLOCKING,
+              new IndexRange(0, 0),
+              1,
+              (a, b, c) -> {},
+              () -> null,
+              null,
+              new FakedMemorySegmentProvider(),
+              0,
+              new ThroughputCalculator(SystemClock.getInstance()),
+              null,
+              null,
+              null,
+              null),
+          channelIndex,
+          new ResultPartitionID(),
+          0,
+          new ConnectionID(
+              new TaskManagerLocation(ResourceID.generate(), 
InetAddress.getLoopbackAddress(), 1),
+              0),
+          new LocalConnectionManager(),
+          0,
+          0,
+          0,
+          new SimpleCounter(),
+          new SimpleCounter(),
+          new FakedChannelStateWriter());
+    }
+  }
+
+  /** Accommodation for the incompleteness of Flink pluggable shuffle service. */
+  private static class FakedMemorySegmentProvider implements 
MemorySegmentProvider {
+
+    @Override
+    public Collection<MemorySegment> requestUnpooledMemorySegments(int i) 
throws IOException {
+      return null;
+    }
+
+    @Override
+    public void recycleUnpooledMemorySegments(Collection<MemorySegment> 
collection)
+        throws IOException {}
+  }
+
+  /** Accommodation for the incompleteness of Flink pluggable shuffle service. */
+  private static class FakedChannelStateWriter implements ChannelStateWriter {
+
+    @Override
+    public void start(long cpId, CheckpointOptions checkpointOptions) {}
+
+    @Override
+    public void addInputData(
+        long cpId, InputChannelInfo info, int startSeqNum, 
CloseableIterator<Buffer> data) {}
+
+    @Override
+    public void addOutputData(
+        long cpId, ResultSubpartitionInfo info, int startSeqNum, Buffer... 
data) {}
+
+    @Override
+    public void addOutputDataFuture(
+        long l,
+        ResultSubpartitionInfo resultSubpartitionInfo,
+        int i,
+        CompletableFuture<List<Buffer>> completableFuture)
+        throws IllegalArgumentException {}
+
+    @Override
+    public void finishInput(long checkpointId) {}
+
+    @Override
+    public void finishOutput(long checkpointId) {}
+
+    @Override
+    public void abort(long checkpointId, Throwable cause, boolean cleanup) {}
+
+    @Override
+    public ChannelStateWriteResult getAndRemoveWriteResult(long checkpointId) {
+      return null;
+    }
+
+    @Override
+    public void close() {}
+  }
+}
diff --git a/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java b/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
new file mode 100644
index 000000000..d13613023
--- /dev/null
+++ b/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.plugin.flink;
+
+import java.io.IOException;
+
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.io.network.buffer.BufferDecompressor;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.util.function.SupplierWithException;
+
+import org.apache.celeborn.common.CelebornConf;
+
+/** Factory class to create {@link RemoteShuffleInputGate}. */
+public class RemoteShuffleInputGateFactory extends 
AbstractRemoteShuffleInputGateFactory {
+
+  public RemoteShuffleInputGateFactory(
+      CelebornConf conf, NetworkBufferPool networkBufferPool, int 
networkBufferSize) {
+    super(conf, networkBufferPool, networkBufferSize);
+  }
+
+  // For testing.
+  protected RemoteShuffleInputGate createInputGate(
+      String owningTaskName,
+      int gateIndex,
+      InputGateDeploymentDescriptor igdd,
+      SupplierWithException<BufferPool, IOException> bufferPoolFactory,
+      BufferDecompressor bufferDecompressor) {
+    return new RemoteShuffleInputGate(
+        this.celebornConf,
+        owningTaskName,
+        gateIndex,
+        igdd,
+        bufferPoolFactory,
+        bufferDecompressor,
+        numConcurrentReading);
+  }
+}
diff --git a/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java b/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
new file mode 100644
index 000000000..820e456db
--- /dev/null
+++ b/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.plugin.flink;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.CompletableFuture;
+
+import javax.annotation.Nullable;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.event.AbstractEvent;
+import org.apache.flink.runtime.io.network.api.EndOfData;
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
+import org.apache.flink.runtime.io.network.api.StopMode;
+import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.Buffer.DataType;
+import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.partition.*;
+import org.apache.flink.util.function.SupplierWithException;
+
+import org.apache.celeborn.plugin.flink.buffer.SortBuffer;
+import org.apache.celeborn.plugin.flink.utils.BufferUtils;
+import org.apache.celeborn.plugin.flink.utils.Utils;
+
+/**
+ * A {@link ResultPartition} which appends records and events to {@link SortBuffer} and after the
+ * {@link SortBuffer} is full, all data in the {@link SortBuffer} will be copied and spilled to the
+ * remote shuffle service in subpartition index order sequentially. Large records that can not be
+ * appended to an empty {@link SortBuffer} will be spilled directly.
+ */
+public class RemoteShuffleResultPartition extends ResultPartition {
+
+  private final RemoteShuffleResultPartitionDelegation delegation;
+
+  public RemoteShuffleResultPartition(
+      String owningTaskName,
+      int partitionIndex,
+      ResultPartitionID partitionId,
+      ResultPartitionType partitionType,
+      int numSubpartitions,
+      int numTargetKeyGroups,
+      int networkBufferSize,
+      ResultPartitionManager partitionManager,
+      @Nullable BufferCompressor bufferCompressor,
+      SupplierWithException<BufferPool, IOException> bufferPoolFactory,
+      RemoteShuffleOutputGate outputGate) {
+
+    super(
+        owningTaskName,
+        partitionIndex,
+        partitionId,
+        partitionType,
+        numSubpartitions,
+        numTargetKeyGroups,
+        partitionManager,
+        bufferCompressor,
+        bufferPoolFactory);
+
+    delegation =
+        new RemoteShuffleResultPartitionDelegation(
+            networkBufferSize,
+            outputGate,
+            (bufferWithChannel, isBroadcast) -> 
updateStatistics(bufferWithChannel, isBroadcast),
+            numSubpartitions);
+  }
+
+  @Override
+  public void setup() throws IOException {
+    super.setup();
+    BufferUtils.reserveNumRequiredBuffers(bufferPool, 1);
+    delegation.setup(
+        bufferPool,
+        bufferCompressor,
+        buffer -> canBeCompressed(buffer),
+        () -> checkInProduceState());
+  }
+
+  @Override
+  protected void setupInternal() {
+    // do not need to implement
+  }
+
+  @Override
+  public void emitRecord(ByteBuffer record, int targetSubpartition) throws 
IOException {
+    delegation.emit(record, targetSubpartition, DataType.DATA_BUFFER, false);
+  }
+
+  @Override
+  public void broadcastRecord(ByteBuffer record) throws IOException {
+    delegation.broadcast(record, DataType.DATA_BUFFER);
+  }
+
+  @Override
+  public void broadcastEvent(AbstractEvent event, boolean isPriorityEvent) 
throws IOException {
+    Buffer buffer = EventSerializer.toBuffer(event, isPriorityEvent);
+    try {
+      ByteBuffer serializedEvent = buffer.getNioBufferReadable();
+      delegation.broadcast(serializedEvent, buffer.getDataType());
+    } finally {
+      buffer.recycleBuffer();
+    }
+  }
+
+  @Override
+  public void alignedBarrierTimeout(long l) {}
+
+  @Override
+  public void abortCheckpoint(long l, CheckpointException e) {}
+
+  @Override
+  public void finish() throws IOException {
+    Utils.checkState(!isReleased(), "Result partition is already released.");
+    broadcastEvent(EndOfPartitionEvent.INSTANCE, false);
+    delegation.finish();
+    super.finish();
+  }
+
+  @Override
+  public synchronized void close() {
+    delegation.close(() -> super.close());
+  }
+
+  @Override
+  protected void releaseInternal() {
+    // no-op
+  }
+
+  @Override
+  public void flushAll() {
+    delegation.flushAll();
+  }
+
+  @Override
+  public void flush(int subpartitionIndex) {
+    flushAll();
+  }
+
+  @Override
+  public CompletableFuture<?> getAvailableFuture() {
+    return AVAILABLE;
+  }
+
+  @Override
+  public int getNumberOfQueuedBuffers() {
+    return 0;
+  }
+
+  @Override
+  public long getSizeOfQueuedBuffersUnsafe() {
+    return 0;
+  }
+
+  @Override
+  public int getNumberOfQueuedBuffers(int targetSubpartition) {
+    return 0;
+  }
+
+  @Override
+  public ResultSubpartitionView createSubpartitionView(
+      int index, BufferAvailabilityListener availabilityListener) {
+    throw new UnsupportedOperationException("Not supported.");
+  }
+
+  @Override
+  public void notifyEndOfData(StopMode mode) throws IOException {
+    if (!delegation.isEndOfDataNotified()) {
+      broadcastEvent(new EndOfData(mode), false);
+      delegation.setEndOfDataNotified(true);
+    }
+  }
+
+  @Override
+  public CompletableFuture<Void> getAllDataProcessedFuture() {
+    return CompletableFuture.completedFuture(null);
+  }
+
+  @Override
+  public String toString() {
+    return "ResultPartition "
+        + partitionId.toString()
+        + " ["
+        + partitionType
+        + ", "
+        + numSubpartitions
+        + " subpartitions, shuffle-descriptor: "
+        + delegation.getOutputGate().getShuffleDesc()
+        + "]";
+  }
+
+  @VisibleForTesting
+  public RemoteShuffleResultPartitionDelegation getDelegation() {
+    return delegation;
+  }
+
+  public void updateStatistics(
+      SortBuffer.BufferWithChannel bufferWithChannel, boolean isBroadcast) {
+    numBuffersOut.inc(isBroadcast ? numSubpartitions : 1);
+    long readableBytes = bufferWithChannel.getBuffer().readableBytes() - 
BufferUtils.HEADER_LENGTH;
+    if (isBroadcast) {
+      resultPartitionBytes.incAll(readableBytes);
+    } else {
+      resultPartitionBytes.inc(bufferWithChannel.getChannelIndex(), 
readableBytes);
+    }
+    numBytesOut.inc(isBroadcast ? readableBytes * numSubpartitions : 
readableBytes);
+  }
+}
diff --git a/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java b/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java
new file mode 100644
index 000000000..b75435b92
--- /dev/null
+++ b/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.plugin.flink;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.BufferPoolFactory;
+import org.apache.flink.runtime.io.network.partition.ResultPartition;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.util.function.SupplierWithException;
+
+import org.apache.celeborn.common.CelebornConf;
+
+/**
+ * Factory that builds {@link RemoteShuffleResultPartition} instances, each paired with a newly
+ * created {@link RemoteShuffleOutputGate}.
+ */
+public class RemoteShuffleResultPartitionFactory
+    extends AbstractRemoteShuffleResultPartitionFactory {
+
+  /** Passes all arguments straight through to the abstract base factory. */
+  public RemoteShuffleResultPartitionFactory(
+      CelebornConf celebornConf,
+      ResultPartitionManager partitionManager,
+      BufferPoolFactory bufferPoolFactory,
+      int networkBufferSize) {
+    super(celebornConf, partitionManager, bufferPoolFactory, networkBufferSize);
+  }
+
+  /**
+   * Creates the result partition. The first entry of {@code bufferPoolFactories} is handed to
+   * the partition and the second entry to its output gate.
+   */
+  @Override
+  ResultPartition createRemoteShuffleResultPartitionInternal(
+      String taskNameWithSubtaskAndId,
+      int partitionIndex,
+      ResultPartitionID id,
+      ResultPartitionType type,
+      int numSubpartitions,
+      int maxParallelism,
+      List<SupplierWithException<BufferPool, IOException>> bufferPoolFactories,
+      CelebornConf celebornConf,
+      int numMappers,
+      BufferCompressor bufferCompressor,
+      RemoteShuffleDescriptor rsd) {
+    final SupplierWithException<BufferPool, IOException> partitionPoolFactory =
+        bufferPoolFactories.get(0);
+    final SupplierWithException<BufferPool, IOException> gatePoolFactory =
+        bufferPoolFactories.get(1);
+
+    final RemoteShuffleOutputGate outputGate =
+        new RemoteShuffleOutputGate(
+            rsd, numSubpartitions, networkBufferSize, gatePoolFactory, celebornConf, numMappers);
+
+    return new RemoteShuffleResultPartition(
+        taskNameWithSubtaskAndId,
+        partitionIndex,
+        id,
+        type,
+        numSubpartitions,
+        maxParallelism,
+        networkBufferSize,
+        partitionManager,
+        bufferCompressor,
+        partitionPoolFactory,
+        outputGate);
+  }
+}
diff --git 
a/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleServiceFactory.java
 
b/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleServiceFactory.java
new file mode 100644
index 000000000..bf95317bc
--- /dev/null
+++ 
b/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleServiceFactory.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.plugin.flink;
+
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
+import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
+import org.apache.flink.runtime.shuffle.ShuffleEnvironmentContext;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.runtime.shuffle.ShuffleMasterContext;
+import org.apache.flink.runtime.shuffle.ShuffleServiceFactory;
+
+/**
+ * {@link ShuffleServiceFactory} implementation that creates the {@link RemoteShuffleMaster} and
+ * the {@link RemoteShuffleEnvironment}.
+ */
+public class RemoteShuffleServiceFactory extends AbstractRemoteShuffleServiceFactory
+    implements ShuffleServiceFactory<
+        RemoteShuffleDescriptor, ResultPartitionWriter, IndexedInputGate> {
+
+  /** Builds a shuffle master backed by a {@link SimpleResultPartitionAdapter}. */
+  @Override
+  public ShuffleMaster<RemoteShuffleDescriptor> createShuffleMaster(
+      ShuffleMasterContext shuffleMasterContext) {
+    final SimpleResultPartitionAdapter partitionAdapter = new SimpleResultPartitionAdapter();
+    return new RemoteShuffleMaster(shuffleMasterContext, partitionAdapter);
+  }
+
+  /**
+   * Builds the shuffle environment from the parameters produced by {@code
+   * initializePreCreateShuffleEnvironment}, creating the result-partition and input-gate
+   * factories along the way.
+   */
+  @Override
+  public ShuffleEnvironment<ResultPartitionWriter, IndexedInputGate> createShuffleEnvironment(
+      ShuffleEnvironmentContext shuffleEnvironmentContext) {
+    final AbstractRemoteShuffleServiceParameters params =
+        initializePreCreateShuffleEnvironment(shuffleEnvironmentContext);
+
+    final RemoteShuffleResultPartitionFactory partitionFactory =
+        new RemoteShuffleResultPartitionFactory(
+            params.celebornConf,
+            params.resultPartitionManager,
+            params.networkBufferPool,
+            params.bufferSize);
+
+    final RemoteShuffleInputGateFactory gateFactory =
+        new RemoteShuffleInputGateFactory(
+            params.celebornConf, params.networkBufferPool, params.bufferSize);
+
+    return new RemoteShuffleEnvironment(
+        params.networkBufferPool,
+        params.resultPartitionManager,
+        partitionFactory,
+        gateFactory,
+        params.celebornConf);
+  }
+}
diff --git 
a/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/SimpleResultPartitionAdapter.java
 
b/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/SimpleResultPartitionAdapter.java
new file mode 100644
index 000000000..3476b8fff
--- /dev/null
+++ 
b/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/SimpleResultPartitionAdapter.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.plugin.flink;
+
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+
+/** {@link ResultPartitionAdapter} that defers to the partition type's own blocking check. */
+public class SimpleResultPartitionAdapter implements ResultPartitionAdapter {
+
+  /**
+   * @param partitionType partition type to inspect
+   * @return whatever {@code partitionType.isBlockingOrBlockingPersistentResultPartition()} reports
+   */
+  @Override
+  public boolean isBlockingResultPartition(ResultPartitionType partitionType) {
+    final boolean blocking = partitionType.isBlockingOrBlockingPersistentResultPartition();
+    return blocking;
+  }
+}
diff --git 
a/client-flink/flink-1.18/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterTest.java
 
b/client-flink/flink-1.18/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterTest.java
new file mode 100644
index 000000000..8548ca8af
--- /dev/null
+++ 
b/client-flink/flink-1.18/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterTest.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.plugin.flink;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionGraphID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.shuffle.JobShuffleContext;
+import org.apache.flink.runtime.shuffle.PartitionDescriptor;
+import org.apache.flink.runtime.shuffle.ProducerDescriptor;
+import org.apache.flink.runtime.shuffle.ShuffleMasterContext;
+import org.apache.flink.runtime.shuffle.TaskInputsOutputsDescriptor;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.plugin.flink.utils.FlinkUtils;
+
+public class RemoteShuffleMasterTest {
+
+  // Logger used for diagnostic output from the tests.
+  private static final Logger LOG = 
LoggerFactory.getLogger(RemoteShuffleMasterTest.class);
+  // Shuffle master under test; created in setUp() and closed in tearDown().
+  private RemoteShuffleMaster remoteShuffleMaster;
+  // Flink configuration the shuffle master is created from; re-created in setUp().
+  private Configuration configuration;
+
+  /** Creates a fresh default configuration and builds the shuffle master under test from it. */
+  @Before
+  public void setUp() {
+    this.configuration = new Configuration();
+    this.remoteShuffleMaster = createShuffleMaster(this.configuration);
+  }
+
+  /**
+   * Registers a job, registers it a second time (tolerating an exception), then unregisters it
+   * and registers it once more.
+   */
+  @Test
+  public void testRegisterJob() {
+    final JobID jobId = JobID.generate();
+    final JobShuffleContext context = createJobShuffleContext(jobId);
+    remoteShuffleMaster.registerJob(context);
+
+    // Second registration of the same job: an exception here is tolerated.
+    // NOTE(review): nothing fails the test when no exception is thrown - confirm whether a
+    // duplicate registration is expected to be rejected.
+    try {
+      remoteShuffleMaster.registerJob(context);
+    } catch (Exception tolerated) {
+      Assert.assertTrue(true);
+    }
+
+    // After unregistering, the same job can be registered again.
+    remoteShuffleMaster.unregisterJob(context.getJobId());
+    remoteShuffleMaster.registerJob(context);
+  }
+
+  /**
+   * Registers partitions of one data set with the shuffle master and checks the shuffle, map,
+   * partition and attempt ids reported in the returned descriptors.
+   */
+  @Test
+  public void testRegisterPartitionWithProducer()
+      throws UnknownHostException, ExecutionException, InterruptedException {
+    final JobID jobId = JobID.generate();
+    final JobShuffleContext context = createJobShuffleContext(jobId);
+    remoteShuffleMaster.registerJob(context);
+
+    final IntermediateDataSetID dataSetId = new IntermediateDataSetID();
+
+    // First registration: partition number 0 of the data set.
+    PartitionDescriptor partition = createPartitionDescriptor(dataSetId, 0);
+    ProducerDescriptor producer = createProducerDescriptor();
+    RemoteShuffleDescriptor descriptor =
+        remoteShuffleMaster.registerPartitionWithProducer(jobId, partition, producer).get();
+    ShuffleResourceDescriptor resource =
+        descriptor.getShuffleResource().getMapPartitionShuffleDescriptor();
+
+    LOG.info("remoteShuffleDescriptor:{}", descriptor);
+    Assert.assertEquals(0, resource.getShuffleId());
+    Assert.assertEquals(0, resource.getPartitionId());
+    Assert.assertEquals(0, resource.getAttemptId());
+    Assert.assertEquals(0, resource.getMapId());
+
+    // Same data set id, partition number 1: shuffle id stays 0, map id becomes 1.
+    partition = createPartitionDescriptor(dataSetId, 1);
+    descriptor =
+        remoteShuffleMaster.registerPartitionWithProducer(jobId, partition, producer).get();
+    resource = descriptor.getShuffleResource().getMapPartitionShuffleDescriptor();
+    Assert.assertEquals(0, resource.getShuffleId());
+    Assert.assertEquals(1, resource.getMapId());
+
+    // Same partition registered with a newly created producer descriptor: attempt id becomes 1.
+    producer = createProducerDescriptor();
+    descriptor =
+        remoteShuffleMaster.registerPartitionWithProducer(jobId, partition, producer).get();
+    resource = descriptor.getShuffleResource().getMapPartitionShuffleDescriptor();
+    Assert.assertEquals(0, resource.getShuffleId());
+    Assert.assertEquals(1, resource.getAttemptId());
+    Assert.assertEquals(1, resource.getMapId());
+  }
+
+  /**
+   * Registers the same partition descriptor (same data set id) under two different jobs and
+   * checks that the first registration gets shuffle id 0 and the second gets shuffle id 1.
+   */
+  @Test
+  public void testRegisterMultipleJobs()
+      throws UnknownHostException, ExecutionException, InterruptedException {
+    JobID jobID1 = JobID.generate();
+    JobShuffleContext jobShuffleContext1 = createJobShuffleContext(jobID1);
+    remoteShuffleMaster.registerJob(jobShuffleContext1);
+
+    JobID jobID2 = JobID.generate();
+    JobShuffleContext jobShuffleContext2 = createJobShuffleContext(jobID2);
+    remoteShuffleMaster.registerJob(jobShuffleContext2);
+
+    IntermediateDataSetID intermediateDataSetID = new IntermediateDataSetID();
+    PartitionDescriptor partitionDescriptor = createPartitionDescriptor(intermediateDataSetID, 0);
+    ProducerDescriptor producerDescriptor = createProducerDescriptor();
+    RemoteShuffleDescriptor remoteShuffleDescriptor1 =
+        remoteShuffleMaster
+            .registerPartitionWithProducer(jobID1, partitionDescriptor, producerDescriptor)
+            .get();
+
+    // use same datasetId but different jobId
+    RemoteShuffleDescriptor remoteShuffleDescriptor2 =
+        remoteShuffleMaster
+            .registerPartitionWithProducer(jobID2, partitionDescriptor, producerDescriptor)
+            .get();
+
+    // assertEquals takes (expected, actual); with the arguments in that order a failure message
+    // reports the right value as "expected".
+    Assert.assertEquals(
+        0,
+        remoteShuffleDescriptor1
+            .getShuffleResource()
+            .getMapPartitionShuffleDescriptor()
+            .getShuffleId());
+    Assert.assertEquals(
+        1,
+        remoteShuffleDescriptor2
+            .getShuffleResource()
+            .getMapPartitionShuffleDescriptor()
+            .getShuffleId());
+  }
+
+  /**
+   * Checks that the shuffle memory computed for a task with two input data sets and three output
+   * data sets equals two times the configured per-input-gate memory plus three times the
+   * configured per-result-partition memory. Channel and subpartition counts are random.
+   */
+  @Test
+  public void testShuffleMemoryAnnouncing() {
+    final IntermediateDataSetID[] inputs = {
+      new IntermediateDataSetID(), new IntermediateDataSetID()
+    };
+    final IntermediateDataSetID[] outputs = {
+      new IntermediateDataSetID(), new IntermediateDataSetID(), new IntermediateDataSetID()
+    };
+    final Random rnd = new Random();
+
+    // Inputs: a random channel count and the BLOCKING type per data set.
+    Map<IntermediateDataSetID, Integer> channelsPerInputGate = new HashMap<>();
+    Map<IntermediateDataSetID, ResultPartitionType> typesOfInputs = new HashMap<>();
+    for (IntermediateDataSetID input : inputs) {
+      channelsPerInputGate.put(input, rnd.nextInt(1000));
+      typesOfInputs.put(input, ResultPartitionType.BLOCKING);
+    }
+
+    // Outputs: the same random count goes into both count maps, plus the BLOCKING type.
+    Map<IntermediateDataSetID, Integer> subpartitionsPerResult = new HashMap<>();
+    Map<IntermediateDataSetID, Integer> subPartitionNums = new HashMap<>();
+    Map<IntermediateDataSetID, ResultPartitionType> typesOfResults = new HashMap<>();
+    for (IntermediateDataSetID output : outputs) {
+      int subpartitions = rnd.nextInt(1000);
+      subpartitionsPerResult.put(output, subpartitions);
+      subPartitionNums.put(output, subpartitions);
+      typesOfResults.put(output, ResultPartitionType.BLOCKING);
+    }
+
+    TaskInputsOutputsDescriptor taskDescriptor =
+        TaskInputsOutputsDescriptor.from(
+            3,
+            channelsPerInputGate,
+            subpartitionsPerResult,
+            subPartitionNums,
+            typesOfInputs,
+            typesOfResults);
+    MemorySize actual = remoteShuffleMaster.computeShuffleMemorySizeForTask(taskDescriptor);
+
+    CelebornConf celebornConf = FlinkUtils.toCelebornConf(configuration);
+    long inputBytes = 2 * celebornConf.clientFlinkMemoryPerInputGate();
+    long outputBytes = 3 * celebornConf.clientFlinkMemoryPerResultPartition();
+
+    Assert.assertEquals(new MemorySize(inputBytes + outputBytes), actual);
+  }
+
+  /** Closes the shuffle master if one was created; a failure to close is logged, not rethrown. */
+  @After
+  public void tearDown() {
+    if (remoteShuffleMaster == null) {
+      return;
+    }
+    try {
+      remoteShuffleMaster.close();
+    } catch (Exception closeFailure) {
+      LOG.warn(closeFailure.getMessage(), closeFailure);
+    }
+  }
+
+  /**
+   * Builds a {@link RemoteShuffleMaster} whose context returns {@code configuration} and exits
+   * the JVM with status -1 on a fatal error. The new master is also stored in the {@code
+   * remoteShuffleMaster} field.
+   *
+   * @param configuration configuration exposed through the shuffle master context
+   * @return the newly created shuffle master
+   */
+  public RemoteShuffleMaster createShuffleMaster(Configuration configuration) {
+    final ShuffleMasterContext masterContext =
+        new ShuffleMasterContext() {
+          @Override
+          public Configuration getConfiguration() {
+            return configuration;
+          }
+
+          @Override
+          public void onFatalError(Throwable throwable) {
+            System.exit(-1);
+          }
+        };
+
+    this.remoteShuffleMaster =
+        new RemoteShuffleMaster(masterContext, new SimpleResultPartitionAdapter());
+    return this.remoteShuffleMaster;
+  }
+
+  /**
+   * Builds a minimal {@link JobShuffleContext} for {@code jobId}; its {@code
+   * stopTrackingAndReleasePartitions} returns an already completed future.
+   *
+   * @param jobId id returned by the context's {@code getJobId()}
+   */
+  public JobShuffleContext createJobShuffleContext(JobID jobId) {
+    final JobShuffleContext context =
+        new JobShuffleContext() {
+          @Override
+          public JobID getJobId() {
+            return jobId;
+          }
+
+          @Override
+          public CompletableFuture<?> stopTrackingAndReleasePartitions(
+              Collection<ResultPartitionID> partitionIds) {
+            return CompletableFuture.completedFuture(null);
+          }
+        };
+    return context;
+  }
+
+  /**
+   * Builds a BLOCKING {@link PartitionDescriptor} for the given data set, using a partition id
+   * derived from {@code intermediateDataSetId} and {@code partitionNum}. The remaining
+   * constructor arguments are fixed test values.
+   *
+   * @param intermediateDataSetId data set the partition belongs to
+   * @param partitionNum partition number used to build the partition id
+   */
+  public PartitionDescriptor createPartitionDescriptor(
+      IntermediateDataSetID intermediateDataSetId, int partitionNum) {
+    return new PartitionDescriptor(
+        intermediateDataSetId,
+        10,
+        new IntermediateResultPartitionID(intermediateDataSetId, partitionNum),
+        ResultPartitionType.BLOCKING,
+        5,
+        1,
+        false,
+        false);
+  }
+
+  /**
+   * Builds a {@link ProducerDescriptor} with a generated resource id, a new execution attempt id,
+   * the local host address and the fixed value 100 as the last argument.
+   *
+   * @throws UnknownHostException if the local host address cannot be resolved
+   */
+  public ProducerDescriptor createProducerDescriptor() throws UnknownHostException {
+    final ExecutionGraphID graphId = new ExecutionGraphID();
+    final ExecutionVertexID vertexId = new ExecutionVertexID(new JobVertexID(0, 0), 0);
+    final ExecutionAttemptID attemptId = new ExecutionAttemptID(graphId, vertexId, 0);
+
+    final ResourceID producerLocation = ResourceID.generate();
+    final InetAddress localHost = InetAddress.getLocalHost();
+    return new ProducerDescriptor(producerLocation, attemptId, localHost, 100);
+  }
+}
diff --git 
a/client-flink/flink-1.18/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
 
b/client-flink/flink-1.18/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
new file mode 100644
index 000000000..622c0eea7
--- /dev/null
+++ 
b/client-flink/flink-1.18/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
@@ -0,0 +1,621 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.plugin.flink;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Random;
+import java.util.Set;
+import java.util.stream.IntStream;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
+import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
+import org.apache.flink.runtime.io.network.buffer.BufferDecompressor;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.util.function.SupplierWithException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.plugin.flink.buffer.BufferPacker;
+import org.apache.celeborn.plugin.flink.buffer.SortBuffer;
+import org.apache.celeborn.plugin.flink.readclient.FlinkShuffleClientImpl;
+import org.apache.celeborn.plugin.flink.utils.BufferUtils;
+
+public class RemoteShuffleResultPartitionSuiteJ {
+  // Buffer size, in bytes, passed to the compressor and decompressor below.
+  private final int networkBufferSize = 32 * 1024;
+  private BufferCompressor bufferCompressor = new 
BufferCompressor(networkBufferSize, "lz4");
+  // Mockito mock of the output gate, used by tesSimpleFlush().
+  private RemoteShuffleOutputGate remoteShuffleOutputGate = 
mock(RemoteShuffleOutputGate.class);
+  private final String compressCodec = "LZ4";
+  private final CelebornConf conf = new CelebornConf();
+  BufferDecompressor bufferDecompressor = new 
BufferDecompressor(networkBufferSize, "LZ4");
+
+  // First argument of the global NetworkBufferPool created in setup(); tearDown() asserts that
+  // this many memory segments are available again after each test.
+  private static final int totalBuffers = 1000;
+
+  // Second argument of the global NetworkBufferPool created in setup(); the tests also use it to
+  // size the records they write.
+  private static final int bufferSize = 1024;
+
+  // Created in setup() and destroyed in tearDown().
+  private NetworkBufferPool globalBufferPool;
+
+  // Lazily destroyed in tearDown() when non-null; not assigned in the visible part of this
+  // class - presumably set by initResultPartitionWriter (TODO confirm).
+  private BufferPool sortBufferPool;
+
+  // Lazily destroyed in tearDown() when non-null; presumably set by initResultPartitionWriter
+  // (TODO confirm).
+  private BufferPool nettyBufferPool;
+
+  // Partition under test in the write/flush tests; presumably set by initResultPartitionWriter
+  // (TODO confirm).
+  private RemoteShuffleResultPartition partitionWriter;
+
+  // Fake output gate whose received buffers the tests inspect; released in tearDown() when
+  // non-null.
+  private FakedRemoteShuffleOutputGate outputGate;
+
+  /** Creates the global network buffer pool used by each test. */
+  @Before
+  public void setup() {
+    this.globalBufferPool = new NetworkBufferPool(totalBuffers, bufferSize);
+  }
+
+  /**
+   * Releases the output gate and lazily destroys the local buffer pools (each only when set),
+   * asserts that the global pool has all {@code totalBuffers} segments available again, and
+   * then destroys the global pool.
+   */
+  @After
+  public void tearDown() throws Exception {
+    if (null != outputGate) {
+      outputGate.release();
+    }
+
+    // Sort pool first, then netty pool.
+    for (BufferPool localPool : new BufferPool[] {sortBufferPool, nettyBufferPool}) {
+      if (null != localPool) {
+        localPool.lazyDestroy();
+      }
+    }
+
+    final int availableSegments = globalBufferPool.getNumberOfAvailableMemorySegments();
+    assertEquals(totalBuffers, availableSegments);
+    globalBufferPool.destroy();
+  }
+
+  /**
+   * Smoke test: builds a {@link RemoteShuffleResultPartition} around the mocked output gate,
+   * appends three bytes to its unicast sort buffer and flushes that buffer. There are no
+   * assertions, so the test passes as long as nothing throws.
+   */
+  @Test
+  public void tesSimpleFlush() throws IOException, InterruptedException {
+    List<SupplierWithException<BufferPool, IOException>> bufferPool = 
createBufferPoolFactory();
+    RemoteShuffleResultPartition remoteShuffleResultPartition =
+        new RemoteShuffleResultPartition(
+            "test",
+            0,
+            new ResultPartitionID(),
+            ResultPartitionType.BLOCKING,
+            2,
+            2,
+            32 * 1024,
+            new ResultPartitionManager(),
+            bufferCompressor,
+            bufferPool.get(0),
+            remoteShuffleOutputGate);
+    remoteShuffleResultPartition.setup();
+    // Stub the mocked gate: region start/finish do nothing, and getBufferPool() returns a pool
+    // created from the second factory.
+    doNothing().when(remoteShuffleOutputGate).regionStart(anyBoolean());
+    doNothing().when(remoteShuffleOutputGate).regionFinish();
+    
when(remoteShuffleOutputGate.getBufferPool()).thenReturn(bufferPool.get(1).get());
+    SortBuffer sortBuffer = 
remoteShuffleResultPartition.getDelegation().getUnicastSortBuffer();
+    ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[] {1, 2, 3});
+    sortBuffer.append(byteBuffer, 0, Buffer.DataType.DATA_BUFFER);
+    remoteShuffleResultPartition.getDelegation().flushSortBuffer(sortBuffer, 
true);
+  }
+
+  @Test
+  public void test123() throws IOException, InterruptedException {}
+
+  private List<SupplierWithException<BufferPool, IOException>> 
createBufferPoolFactory() {
+    NetworkBufferPool networkBufferPool =
+        new NetworkBufferPool(256 * 8, 32 * 1024, Duration.ofMillis(1000));
+
+    int numBuffersPerPartition = 64 * 1024 / 32;
+    int numForResultPartition = numBuffersPerPartition * 7 / 8;
+    int numForOutputGate = numBuffersPerPartition - numForResultPartition;
+
+    List<SupplierWithException<BufferPool, IOException>> factories = new 
ArrayList<>();
+    factories.add(
+        () -> networkBufferPool.createBufferPool(numForResultPartition, 
numForResultPartition));
+    factories.add(() -> networkBufferPool.createBufferPool(numForOutputGate, 
numForOutputGate));
+    return factories;
+  }
+
+  /** Runs the shared normal-record write scenario with compression enabled. */
+  @Test
+  public void testWriteNormalRecordWithCompressionEnabled() throws Exception {
+    final boolean compressionEnabled = true;
+    testWriteNormalRecord(compressionEnabled);
+  }
+
+  /** Runs the shared normal-record write scenario with compression disabled. */
+  @Test
+  public void testWriteNormalRecordWithCompressionDisabled() throws Exception {
+    final boolean compressionEnabled = false;
+    testWriteNormalRecord(compressionEnabled);
+  }
+
+  /**
+   * Emits one record of {@code bufferSize * 100} random bytes to subpartition 0 and checks that
+   * the data buffers received for that subpartition, minus their headers, reproduce the record.
+   */
+  @Test
+  public void testWriteLargeRecord() throws Exception {
+    final int subpartitionCount = 2;
+    final int bufferCount = 100;
+    initResultPartitionWriter(subpartitionCount, 10, 200, false, conf, 10);
+
+    partitionWriter.setup();
+
+    final byte[] payload = new byte[bufferSize * bufferCount];
+    new Random().nextBytes(payload);
+    final ByteBuffer written = ByteBuffer.wrap(payload);
+    partitionWriter.emitRecord(written, 0);
+    assertEquals(0, sortBufferPool.bestEffortGetNumOfUsedBuffers());
+
+    partitionWriter.finish();
+    partitionWriter.close();
+
+    final List<Buffer> received = outputGate.getReceivedBuffers()[0];
+    final ByteBuffer readBack = ByteBuffer.allocate(bufferSize * bufferCount);
+    for (Buffer chunk : received) {
+      if (!chunk.isBuffer()) {
+        continue;
+      }
+      // Copy everything after the first HEADER_LENGTH bytes.
+      readBack.put(
+          chunk.getNioBuffer(
+              BufferUtils.HEADER_LENGTH, chunk.readableBytes() - BufferUtils.HEADER_LENGTH));
+    }
+
+    written.rewind();
+    readBack.flip();
+    assertEquals(written, readBack);
+  }
+
+  /**
+   * Broadcasts one record of {@code bufferSize * 100} random bytes and verifies, for every
+   * subpartition, that the data buffers it received (minus their headers) reproduce the record.
+   */
+  @Test
+  public void testBroadcastLargeRecord() throws Exception {
+    int numSubpartitions = 2;
+    int numBuffers = 100;
+    initResultPartitionWriter(numSubpartitions, 10, 200, false, conf, 10);
+
+    partitionWriter.setup();
+
+    byte[] dataWritten = new byte[bufferSize * numBuffers];
+    Random random = new Random();
+    random.nextBytes(dataWritten);
+    ByteBuffer recordWritten = ByteBuffer.wrap(dataWritten);
+    partitionWriter.broadcastRecord(recordWritten);
+    assertEquals(0, sortBufferPool.bestEffortGetNumOfUsedBuffers());
+
+    partitionWriter.finish();
+    partitionWriter.close();
+
+    // Each subpartition is compared against its own reassembled copy. The earlier version
+    // compared subpartition 0 twice, so the data read for subpartition 1 was never verified.
+    List<Buffer>[] receivedBuffers = outputGate.getReceivedBuffers();
+    for (int subpartition = 0; subpartition < numSubpartitions; subpartition++) {
+      ByteBuffer recordRead = ByteBuffer.allocate(bufferSize * numBuffers);
+      for (Buffer buffer : receivedBuffers[subpartition]) {
+        if (buffer.isBuffer()) {
+          // Copy everything after the first HEADER_LENGTH bytes.
+          recordRead.put(
+              buffer.getNioBuffer(
+                  BufferUtils.HEADER_LENGTH, buffer.readableBytes() - BufferUtils.HEADER_LENGTH));
+        }
+      }
+      recordWritten.rewind();
+      recordRead.flip();
+      assertEquals("subpartition " + subpartition, recordWritten, recordRead);
+    }
+  }
+
+  /**
+   * Interleaves unicast emits, a broadcast, a single-subpartition flush and a flush-all, and
+   * after each step asserts the number of buffers in use in the sort buffer pool.
+   */
+  @Test
+  public void testFlush() throws Exception {
+    int numSubpartitions = 10;
+
+    initResultPartitionWriter(numSubpartitions, 10, 20, false, conf, 10);
+    partitionWriter.setup();
+
+    // Two unicast records of bufferSize bytes each: three pool buffers are expected in use.
+    partitionWriter.emitRecord(ByteBuffer.allocate(bufferSize), 0);
+    partitionWriter.emitRecord(ByteBuffer.allocate(bufferSize), 1);
+    assertEquals(3, sortBufferPool.bestEffortGetNumOfUsedBuffers());
+
+    // After a broadcast record the expected usage is two buffers.
+    // NOTE(review): presumably the unicast data is flushed when switching to broadcast - confirm.
+    partitionWriter.broadcastRecord(ByteBuffer.allocate(bufferSize));
+    assertEquals(2, sortBufferPool.bestEffortGetNumOfUsedBuffers());
+
+    // flush(0) is expected to leave no pool buffers in use.
+    partitionWriter.flush(0);
+    assertEquals(0, sortBufferPool.bestEffortGetNumOfUsedBuffers());
+
+    partitionWriter.emitRecord(ByteBuffer.allocate(bufferSize), 2);
+    partitionWriter.emitRecord(ByteBuffer.allocate(bufferSize), 3);
+    assertEquals(3, sortBufferPool.bestEffortGetNumOfUsedBuffers());
+
+    // flushAll() is likewise expected to leave no pool buffers in use.
+    partitionWriter.flushAll();
+    assertEquals(0, sortBufferPool.bestEffortGetNumOfUsedBuffers());
+
+    partitionWriter.finish();
+    partitionWriter.close();
+  }
+
+  /**
+   * Writes 100 random records (a random mix of broadcast and unicast) to 4 subpartitions, then
+   * unpacks what the faked output gate received and compares it with what was written.
+   *
+   * <p>NOTE(review): no caller of this private helper is visible in this part of the file --
+   * confirm that it is actually exercised by a test.
+   *
+   * @param compressionEnabled whether the partition writer is created with the buffer compressor
+   */
+  private void testWriteNormalRecord(boolean compressionEnabled) throws 
Exception {
+    int numSubpartitions = 4;
+    int numRecords = 100;
+    Random random = new Random();
+
+    initResultPartitionWriter(numSubpartitions, 100, 500, compressionEnabled, 
conf, 10);
+    partitionWriter.setup();
+    assertTrue(outputGate.isSetup());
+
+    // Expected data and expected byte count, tracked per subpartition.
+    Queue<DataAndType>[] dataWritten = new Queue[numSubpartitions];
+    IntStream.range(0, numSubpartitions).forEach(i -> dataWritten[i] = new 
ArrayDeque<>());
+    int[] numBytesWritten = new int[numSubpartitions];
+    Arrays.fill(numBytesWritten, 0);
+
+    for (int i = 0; i < numRecords; i++) {
+      // Record length is between 1 and 2 * bufferSize bytes.
+      byte[] data = new byte[random.nextInt(2 * bufferSize) + 1];
+      if (compressionEnabled) {
+        // A single repeated byte -- presumably so that the payload is compressible.
+        byte randomByte = (byte) random.nextInt();
+        Arrays.fill(data, randomByte);
+      } else {
+        random.nextBytes(data);
+      }
+      ByteBuffer record = ByteBuffer.wrap(data);
+      boolean isBroadCast = random.nextBoolean();
+
+      if (isBroadCast) {
+        // A broadcast record is expected in every subpartition.
+        partitionWriter.broadcastRecord(record);
+        IntStream.range(0, numSubpartitions)
+            .forEach(
+                subpartition ->
+                    recordDataWritten(
+                        record,
+                        Buffer.DataType.DATA_BUFFER,
+                        subpartition,
+                        dataWritten,
+                        numBytesWritten));
+      } else {
+        int subpartition = random.nextInt(numSubpartitions);
+        partitionWriter.emitRecord(record, subpartition);
+        recordDataWritten(
+            record, Buffer.DataType.DATA_BUFFER, subpartition, dataWritten, 
numBytesWritten);
+      }
+    }
+
+    partitionWriter.finish();
+    assertTrue(outputGate.isFinished());
+    partitionWriter.close();
+    assertTrue(outputGate.isClosed());
+
+    // Append a serialized EndOfPartitionEvent to the expected data of every subpartition.
+    for (int subpartition = 0; subpartition < numSubpartitions; 
++subpartition) {
+      ByteBuffer record = 
EventSerializer.toSerializedEvent(EndOfPartitionEvent.INSTANCE);
+      recordDataWritten(
+          record, Buffer.DataType.EVENT_BUFFER, subpartition, dataWritten, 
numBytesWritten);
+    }
+
+    // Every finished region must have a buffer count recorded by the faked gate.
+    outputGate
+        .getFinishedRegions()
+        .forEach(
+            regionIndex -> 
assertTrue(outputGate.getNumBuffersByRegion().containsKey(regionIndex)));
+
+    int[] numBytesRead = new int[numSubpartitions];
+    List<Buffer>[] receivedBuffers = outputGate.getReceivedBuffers();
+    List<Buffer>[] validateTarget = new List[numSubpartitions];
+    Arrays.fill(numBytesRead, 0);
+    for (int i = 0; i < numSubpartitions; i++) {
+      validateTarget[i] = new ArrayList<>();
+      for (Buffer buffer : receivedBuffers[i]) {
+        for (Buffer unpackedBuffer : BufferPacker.unpack(buffer.asByteBuf())) {
+          if (compressionEnabled && unpackedBuffer.isCompressed()) {
+            // Decompress, copy the bytes into an unpooled segment, and recycle the
+            // intermediate buffer before validating the copy.
+            Buffer decompressedBuffer =
+                
bufferDecompressor.decompressToIntermediateBuffer(unpackedBuffer);
+            ByteBuffer decompressed = 
decompressedBuffer.getNioBufferReadable();
+            int numBytes = decompressed.remaining();
+            MemorySegment segment = 
MemorySegmentFactory.allocateUnpooledSegment(numBytes);
+            segment.put(0, decompressed, numBytes);
+            decompressedBuffer.recycleBuffer();
+            validateTarget[i].add(
+                new NetworkBuffer(segment, buf -> {}, 
unpackedBuffer.getDataType(), numBytes));
+            numBytesRead[i] += numBytes;
+          } else {
+            // NOTE(review): this branch counts and stores the outer packed `buffer` once per
+            // unpacked buffer, not `unpackedBuffer` -- looks unintended; confirm.
+            numBytesRead[i] += buffer.readableBytes();
+            validateTarget[i].add(buffer);
+          }
+        }
+      }
+    }
+    // NOTE(review): the stream below has an empty body and does nothing.
+    IntStream.range(0, numSubpartitions).forEach(subpartitions -> {});
+    // NOTE(review): numBytesWritten is passed for both the written and the read counts, so
+    // numBytesRead computed above is never checked and the byte-count assertion inside
+    // checkWriteReadResult compares the array with itself -- confirm whether this is intended.
+    checkWriteReadResult(
+        numSubpartitions, numBytesWritten, numBytesWritten, dataWritten, 
validateTarget);
+  }
+
+  /**
+   * Creates the sort and netty buffer pools, the faked output gate and the partition writer
+   * under test, and stores them in the corresponding fields.
+   *
+   * @param numSubpartitions number of subpartitions for the gate and the partition
+   * @param sortBufferPoolSize value passed for both size arguments of the sort buffer pool
+   * @param nettyBufferPoolSize value passed for both size arguments of the netty buffer pool
+   * @param compressionEnabled if true the partition is created with {@code bufferCompressor},
+   *     otherwise with a {@code null} compressor
+   * @param conf Celeborn configuration handed to the faked output gate
+   * @param numMappers number of mappers handed to the faked output gate
+   */
+  private void initResultPartitionWriter(
+      int numSubpartitions,
+      int sortBufferPoolSize,
+      int nettyBufferPoolSize,
+      boolean compressionEnabled,
+      CelebornConf conf,
+      int numMappers)
+      throws Exception {
+
+    sortBufferPool = globalBufferPool.createBufferPool(sortBufferPoolSize, 
sortBufferPoolSize);
+    nettyBufferPool = globalBufferPool.createBufferPool(nettyBufferPoolSize, 
nettyBufferPoolSize);
+
+    // The gate receives the netty pool through a supplier and is set up immediately.
+    outputGate =
+        new FakedRemoteShuffleOutputGate(
+            getShuffleDescriptor(), numSubpartitions, () -> nettyBufferPool, 
conf, numMappers);
+    outputGate.setup();
+
+    // The two branches differ only in the compressor argument (bufferCompressor vs. null).
+    if (compressionEnabled) {
+      partitionWriter =
+          new RemoteShuffleResultPartition(
+              "RemoteShuffleResultPartitionWriterTest",
+              0,
+              new ResultPartitionID(),
+              ResultPartitionType.BLOCKING,
+              numSubpartitions,
+              numSubpartitions,
+              bufferSize,
+              new ResultPartitionManager(),
+              bufferCompressor,
+              () -> sortBufferPool,
+              outputGate);
+    } else {
+      partitionWriter =
+          new RemoteShuffleResultPartition(
+              "RemoteShuffleResultPartitionWriterTest",
+              0,
+              new ResultPartitionID(),
+              ResultPartitionType.BLOCKING,
+              numSubpartitions,
+              numSubpartitions,
+              bufferSize,
+              new ResultPartitionManager(),
+              null,
+              () -> sortBufferPool,
+              outputGate);
+    }
+  }
+
+  /**
+   * Records a written record as expected data for one subpartition.
+   *
+   * <p>The record is rewound, appended (by reference, not copied) to the subpartition's queue,
+   * and its remaining byte count is added to the subpartition's written-bytes counter.
+   *
+   * @param record the record that was written
+   * @param dataType data type to store alongside the record
+   * @param subpartition index of the subpartition the record belongs to
+   * @param dataWritten per-subpartition queues of expected data, updated in place
+   * @param numBytesWritten per-subpartition byte counters, updated in place
+   */
+  private void recordDataWritten(
+      ByteBuffer record,
+      Buffer.DataType dataType,
+      int subpartition,
+      Queue<DataAndType>[] dataWritten,
+      int[] numBytesWritten) {
+
+    // Rewind first so that remaining() below covers the record from position 0.
+    record.rewind();
+    dataWritten[subpartition].add(new DataAndType(record, dataType));
+    numBytesWritten[subpartition] += record.remaining();
+  }
+
+  /**
+   * Test double for {@link RemoteShuffleOutputGate} that keeps every written buffer in memory,
+   * per subpartition, and tracks region and lifecycle state so tests can inspect it.
+   */
+  private static class FakedRemoteShuffleOutputGate extends 
RemoteShuffleOutputGate {
+
+    // Lifecycle flags set by setup(), finish() and close().
+    private boolean isSetup;
+    private boolean isFinished;
+    private boolean isClosed;
+    // Buffers received through write(), indexed by subpartition.
+    private final List<Buffer>[] receivedBuffers;
+    // Number of write() calls seen per region index.
+    private final Map<Integer, Integer> numBuffersByRegion;
+    // Region indices for which regionFinish() has been called.
+    private final Set<Integer> finishedRegions;
+    // Index of the current region; -1 until the first regionStart().
+    private int currentRegionIndex;
+    // Whether the current region was started as a broadcast region.
+    private boolean currentIsBroadcast;
+
+    FakedRemoteShuffleOutputGate(
+        RemoteShuffleDescriptor shuffleDescriptor,
+        int numSubpartitions,
+        SupplierWithException<BufferPool, IOException> bufferPoolFactory,
+        CelebornConf celebornConf,
+        int numMappers) {
+
+      super(
+          shuffleDescriptor,
+          numSubpartitions,
+          bufferSize,
+          bufferPoolFactory,
+          celebornConf,
+          numMappers);
+      isSetup = false;
+      isFinished = false;
+      isClosed = false;
+      numBuffersByRegion = new HashMap<>();
+      finishedRegions = new HashSet<>();
+      currentRegionIndex = -1;
+      receivedBuffers = new ArrayList[numSubpartitions];
+      IntStream.range(0, numSubpartitions).forEach(i -> receivedBuffers[i] = 
new ArrayList<>());
+      currentIsBroadcast = false;
+    }
+
+    /** Returns a Mockito mock client whose {@code cleanup} call is stubbed to do nothing. */
+    @Override
+    FlinkShuffleClientImpl getShuffleClient() {
+      FlinkShuffleClientImpl client = mock(FlinkShuffleClientImpl.class);
+      doNothing().when(client).cleanup(anyInt(), anyInt(), anyInt());
+      return client;
+    }
+
+    /** Obtains the buffer pool from the factory and marks the gate as set up. */
+    @Override
+    public void setup() throws IOException, InterruptedException {
+      bufferPool = bufferPoolFactory.get();
+      isSetup = true;
+    }
+
+    /**
+     * Stores the buffer for later inspection and counts it against the current region.
+     *
+     * <p>In a broadcast region the buffer must target subpartition 0; its readable bytes are
+     * copied into a fresh unpooled buffer for every subpartition and the original is recycled.
+     * Otherwise the buffer itself is stored under {@code subIdx}.
+     */
+    @Override
+    public void write(Buffer buffer, int subIdx) {
+      if (currentIsBroadcast) {
+        assertEquals(0, subIdx);
+        ByteBuffer byteBuffer = buffer.getNioBufferReadable();
+        for (int i = 0; i < numSubs; i++) {
+          int numBytes = buffer.readableBytes();
+          MemorySegment segment = 
MemorySegmentFactory.allocateUnpooledSegment(numBytes);
+          // Rewind before each copy so every subpartition gets the data from the start.
+          byteBuffer.rewind();
+          segment.put(0, byteBuffer, numBytes);
+          receivedBuffers[i].add(
+              new NetworkBuffer(
+                  segment, buf -> {}, buffer.getDataType(), 
buffer.isCompressed(), numBytes));
+        }
+        buffer.recycleBuffer();
+      } else {
+        receivedBuffers[subIdx].add(buffer);
+      }
+      // One write() call counts as one buffer for the region, also in broadcast mode.
+      if (numBuffersByRegion.containsKey(currentRegionIndex)) {
+        int prev = numBuffersByRegion.get(currentRegionIndex);
+        numBuffersByRegion.put(currentRegionIndex, prev + 1);
+      } else {
+        numBuffersByRegion.put(currentRegionIndex, 1);
+      }
+    }
+
+    /** Starts a new region: remembers the broadcast flag and advances the region index. */
+    @Override
+    public void regionStart(boolean isBroadcast) {
+      currentIsBroadcast = isBroadcast;
+      currentRegionIndex++;
+    }
+
+    /**
+     * Marks the current region as finished.
+     *
+     * @throws IllegalStateException if the current region was already finished
+     */
+    @Override
+    public void regionFinish() {
+      if (finishedRegions.contains(currentRegionIndex)) {
+        throw new IllegalStateException("Unexpected region: " + 
currentRegionIndex);
+      }
+      finishedRegions.add(currentRegionIndex);
+    }
+
+    /** Only records that finish() was called. */
+    @Override
+    public void finish() throws InterruptedException {
+      isFinished = true;
+    }
+
+    /** Only records that close() was called; received buffers are kept (see release()). */
+    @Override
+    public void close() {
+      isClosed = true;
+    }
+
+    public List<Buffer>[] getReceivedBuffers() {
+      return receivedBuffers;
+    }
+
+    public Map<Integer, Integer> getNumBuffersByRegion() {
+      return numBuffersByRegion;
+    }
+
+    public Set<Integer> getFinishedRegions() {
+      return finishedRegions;
+    }
+
+    public boolean isSetup() {
+      return isSetup;
+    }
+
+    public boolean isFinished() {
+      return isFinished;
+    }
+
+    public boolean isClosed() {
+      return isClosed;
+    }
+
+    /**
+     * Recycles and drops all received buffers, clears the region bookkeeping and then calls
+     * the superclass {@code close()}.
+     */
+    public void release() throws Exception {
+      IntStream.range(0, numSubs)
+          .forEach(
+              subpartitionIndex -> {
+                
receivedBuffers[subpartitionIndex].forEach(Buffer::recycleBuffer);
+                receivedBuffers[subpartitionIndex].clear();
+              });
+      numBuffersByRegion.clear();
+      finishedRegions.clear();
+      super.close();
+    }
+  }
+
+  /**
+   * Builds a {@link RemoteShuffleDescriptor} for tests. The same 16 random bytes are used to
+   * create the {@code JobID} for the first three constructor arguments; the shuffle resource is
+   * built from fixed values plus the current time.
+   */
+  private RemoteShuffleDescriptor getShuffleDescriptor() throws Exception {
+    Random random = new Random();
+    byte[] bytes = new byte[16];
+    random.nextBytes(bytes);
+    // NOTE(review): the meaning of the individual RemoteShuffleResource and
+    // ShuffleResourceDescriptor arguments is not visible here -- check their constructors.
+    return new RemoteShuffleDescriptor(
+        new JobID(bytes).toString(),
+        new JobID(bytes),
+        new JobID(bytes).toString(),
+        new ResultPartitionID(),
+        new RemoteShuffleResource(
+            "1", 2, System.currentTimeMillis(), new 
ShuffleResourceDescriptor(1, 1, 1, 0)));
+  }
+
+  /**
+   * Data written and its {@link Buffer.DataType}. The buffer is held by reference, not copied.
+   */
+  public static class DataAndType {
+    // The written bytes; read again later when comparing against the data that was read.
+    private final ByteBuffer data;
+    // Data type recorded for the written bytes.
+    private final Buffer.DataType dataType;
+
+    DataAndType(ByteBuffer data, Buffer.DataType dataType) {
+      this.data = data;
+      this.dataType = dataType;
+    }
+  }
+
+  /**
+   * Asserts, for every subpartition, that the data read equals the data written.
+   *
+   * <p>Per subpartition it checks that the byte counts match, that the concatenation of all
+   * written records equals the concatenation of all read buffers, and that the events written
+   * and read agree in number, order, data type and content.
+   *
+   * @param numSubpartitions number of subpartitions to check
+   * @param numBytesWritten bytes written per subpartition; also sizes the written-data buffer
+   * @param numBytesRead bytes read per subpartition; also sizes the read-data buffer
+   * @param dataWritten expected records per subpartition, in write order
+   * @param buffersRead buffers read per subpartition, in read order
+   */
+  public static void checkWriteReadResult(
+      int numSubpartitions,
+      int[] numBytesWritten,
+      int[] numBytesRead,
+      Queue<DataAndType>[] dataWritten,
+      Collection<Buffer>[] buffersRead) {
+    for (int subpartitionIndex = 0; subpartitionIndex < numSubpartitions; 
++subpartitionIndex) {
+      assertEquals(numBytesWritten[subpartitionIndex], 
numBytesRead[subpartitionIndex]);
+
+      List<DataAndType> eventsWritten = new ArrayList<>();
+      List<Buffer> eventsRead = new ArrayList<>();
+
+      ByteBuffer subpartitionDataWritten = 
ByteBuffer.allocate(numBytesWritten[subpartitionIndex]);
+      for (DataAndType dataAndType : dataWritten[subpartitionIndex]) {
+        subpartitionDataWritten.put(dataAndType.data);
+        // put() consumed the record; rewind so it can be compared again in the event check.
+        dataAndType.data.rewind();
+        if (dataAndType.dataType.isEvent()) {
+          eventsWritten.add(dataAndType);
+        }
+      }
+
+      ByteBuffer subpartitionDataRead = 
ByteBuffer.allocate(numBytesRead[subpartitionIndex]);
+      for (Buffer buffer : buffersRead[subpartitionIndex]) {
+        subpartitionDataRead.put(buffer.getNioBufferReadable());
+        if (!buffer.isBuffer()) {
+          eventsRead.add(buffer);
+        }
+      }
+
+      // Flip both accumulators so equals() compares the bytes that were put into them.
+      subpartitionDataWritten.flip();
+      subpartitionDataRead.flip();
+      assertEquals(subpartitionDataWritten, subpartitionDataRead);
+
+      assertEquals(eventsWritten.size(), eventsRead.size());
+      for (int i = 0; i < eventsWritten.size(); ++i) {
+        assertEquals(eventsWritten.get(i).dataType, 
eventsRead.get(i).getDataType());
+        assertEquals(eventsWritten.get(i).data, 
eventsRead.get(i).getNioBufferReadable());
+      }
+    }
+  }
+}
diff --git 
a/client-flink/flink-1.18/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleServiceFactorySuitJ.java
 
b/client-flink/flink-1.18/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleServiceFactorySuitJ.java
new file mode 100644
index 000000000..73dbb6a9d
--- /dev/null
+++ 
b/client-flink/flink-1.18/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleServiceFactorySuitJ.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.plugin.flink;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
+import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
+import org.apache.flink.runtime.shuffle.ShuffleEnvironmentContext;
+import org.junit.Assert;
+import org.junit.Test;
+
+/** Tests creating a shuffle environment through {@link RemoteShuffleServiceFactory}. */
+public class RemoteShuffleServiceFactorySuitJ {
+  /**
+   * Creates a shuffle environment from a mocked context (empty configuration, 64 MiB of network
+   * memory, mocked metric groups) and checks that the result partition factory reports a
+   * network buffer size of 32 KiB.
+   */
+  @Test
+  public void testCreateShuffleEnvironment() {
+    RemoteShuffleServiceFactory remoteShuffleServiceFactory = new 
RemoteShuffleServiceFactory();
+    ShuffleEnvironmentContext shuffleEnvironmentContext = 
mock(ShuffleEnvironmentContext.class);
+    when(shuffleEnvironmentContext.getConfiguration()).thenReturn(new 
Configuration());
+    when(shuffleEnvironmentContext.getNetworkMemorySize())
+        .thenReturn(new MemorySize(64 * 1024 * 1024));
+    // Stub a three-level metric group chain: parent -> child -> grandchild.
+    MetricGroup parentMeric = mock(MetricGroup.class);
+    
when(shuffleEnvironmentContext.getParentMetricGroup()).thenReturn(parentMeric);
+    MetricGroup childGroup = mock(MetricGroup.class);
+    MetricGroup childChildGroup = mock(MetricGroup.class);
+    when(parentMeric.addGroup(anyString())).thenReturn(childGroup);
+    when(childGroup.addGroup(any())).thenReturn(childChildGroup);
+    when(childChildGroup.gauge(any(), any())).thenReturn(null);
+    ShuffleEnvironment<ResultPartitionWriter, IndexedInputGate> 
shuffleEnvironment =
+        
remoteShuffleServiceFactory.createShuffleEnvironment(shuffleEnvironmentContext);
+    // NOTE(review): 32 KiB is presumably the default buffer size for an empty
+    // Configuration -- confirm against the plugin's option defaults.
+    Assert.assertEquals(
+        32 * 1024,
+        ((RemoteShuffleEnvironment) shuffleEnvironment)
+            .getResultPartitionFactory()
+            .getNetworkBufferSize());
+  }
+}
diff --git a/dev/dependencies.sh b/dev/dependencies.sh
index 5824c0ba5..73c6c41c4 100755
--- a/dev/dependencies.sh
+++ b/dev/dependencies.sh
@@ -186,6 +186,10 @@ case "$MODULE" in
     MVN_MODULES="client-flink/flink-1.17"
     SBT_PROJECT="celeborn-client-flink-1_17"
     ;;
+  "flink-1.18")
+    MVN_MODULES="client-flink/flink-1.18"
+    SBT_PROJECT="celeborn-client-flink-1_18"
+    ;;
   "mr")
     MVN_MODULES="client-mr/mr"
     SBT_PROJECT="celeborn-client-mr"
diff --git a/dev/deps/dependencies-client-flink-1.18 
b/dev/deps/dependencies-client-flink-1.18
new file mode 100644
index 000000000..7e8c870ff
--- /dev/null
+++ b/dev/deps/dependencies-client-flink-1.18
@@ -0,0 +1,78 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+RoaringBitmap/0.9.32//RoaringBitmap-0.9.32.jar
+commons-crypto/1.0.0//commons-crypto-1.0.0.jar
+commons-io/2.13.0//commons-io-2.13.0.jar
+commons-lang3/3.12.0//commons-lang3-3.12.0.jar
+commons-logging/1.1.3//commons-logging-1.1.3.jar
+guava/14.0.1//guava-14.0.1.jar
+hadoop-client-api/3.2.4//hadoop-client-api-3.2.4.jar
+hadoop-client-runtime/3.2.4//hadoop-client-runtime-3.2.4.jar
+htrace-core4/4.1.0-incubating//htrace-core4-4.1.0-incubating.jar
+jcl-over-slf4j/1.7.36//jcl-over-slf4j-1.7.36.jar
+jsr305/1.3.9//jsr305-1.3.9.jar
+jul-to-slf4j/1.7.36//jul-to-slf4j-1.7.36.jar
+leveldbjni-all/1.8//leveldbjni-all-1.8.jar
+lz4-java/1.8.0//lz4-java-1.8.0.jar
+metrics-core/3.2.6//metrics-core-3.2.6.jar
+metrics-graphite/3.2.6//metrics-graphite-3.2.6.jar
+metrics-jvm/3.2.6//metrics-jvm-3.2.6.jar
+netty-all/4.1.93.Final//netty-all-4.1.93.Final.jar
+netty-buffer/4.1.93.Final//netty-buffer-4.1.93.Final.jar
+netty-codec-dns/4.1.93.Final//netty-codec-dns-4.1.93.Final.jar
+netty-codec-haproxy/4.1.93.Final//netty-codec-haproxy-4.1.93.Final.jar
+netty-codec-http/4.1.93.Final//netty-codec-http-4.1.93.Final.jar
+netty-codec-http2/4.1.93.Final//netty-codec-http2-4.1.93.Final.jar
+netty-codec-memcache/4.1.93.Final//netty-codec-memcache-4.1.93.Final.jar
+netty-codec-mqtt/4.1.93.Final//netty-codec-mqtt-4.1.93.Final.jar
+netty-codec-redis/4.1.93.Final//netty-codec-redis-4.1.93.Final.jar
+netty-codec-smtp/4.1.93.Final//netty-codec-smtp-4.1.93.Final.jar
+netty-codec-socks/4.1.93.Final//netty-codec-socks-4.1.93.Final.jar
+netty-codec-stomp/4.1.93.Final//netty-codec-stomp-4.1.93.Final.jar
+netty-codec-xml/4.1.93.Final//netty-codec-xml-4.1.93.Final.jar
+netty-codec/4.1.93.Final//netty-codec-4.1.93.Final.jar
+netty-common/4.1.93.Final//netty-common-4.1.93.Final.jar
+netty-handler-proxy/4.1.93.Final//netty-handler-proxy-4.1.93.Final.jar
+netty-handler/4.1.93.Final//netty-handler-4.1.93.Final.jar
+netty-resolver-dns-classes-macos/4.1.93.Final//netty-resolver-dns-classes-macos-4.1.93.Final.jar
+netty-resolver-dns-native-macos/4.1.93.Final/osx-aarch_64/netty-resolver-dns-native-macos-4.1.93.Final-osx-aarch_64.jar
+netty-resolver-dns-native-macos/4.1.93.Final/osx-x86_64/netty-resolver-dns-native-macos-4.1.93.Final-osx-x86_64.jar
+netty-resolver-dns/4.1.93.Final//netty-resolver-dns-4.1.93.Final.jar
+netty-resolver/4.1.93.Final//netty-resolver-4.1.93.Final.jar
+netty-transport-classes-epoll/4.1.93.Final//netty-transport-classes-epoll-4.1.93.Final.jar
+netty-transport-classes-kqueue/4.1.93.Final//netty-transport-classes-kqueue-4.1.93.Final.jar
+netty-transport-native-epoll/4.1.93.Final/linux-aarch_64/netty-transport-native-epoll-4.1.93.Final-linux-aarch_64.jar
+netty-transport-native-epoll/4.1.93.Final/linux-x86_64/netty-transport-native-epoll-4.1.93.Final-linux-x86_64.jar
+netty-transport-native-kqueue/4.1.93.Final/osx-aarch_64/netty-transport-native-kqueue-4.1.93.Final-osx-aarch_64.jar
+netty-transport-native-kqueue/4.1.93.Final/osx-x86_64/netty-transport-native-kqueue-4.1.93.Final-osx-x86_64.jar
+netty-transport-native-unix-common/4.1.93.Final//netty-transport-native-unix-common-4.1.93.Final.jar
+netty-transport-rxtx/4.1.93.Final//netty-transport-rxtx-4.1.93.Final.jar
+netty-transport-sctp/4.1.93.Final//netty-transport-sctp-4.1.93.Final.jar
+netty-transport-udt/4.1.93.Final//netty-transport-udt-4.1.93.Final.jar
+netty-transport/4.1.93.Final//netty-transport-4.1.93.Final.jar
+protobuf-java/3.19.2//protobuf-java-3.19.2.jar
+ratis-client/2.5.1//ratis-client-2.5.1.jar
+ratis-common/2.5.1//ratis-common-2.5.1.jar
+ratis-proto/2.5.1//ratis-proto-2.5.1.jar
+ratis-thirdparty-misc/1.0.4//ratis-thirdparty-misc-1.0.4.jar
+scala-library/2.12.15//scala-library-2.12.15.jar
+scala-reflect/2.12.15//scala-reflect-2.12.15.jar
+shims/0.9.32//shims-0.9.32.jar
+slf4j-api/1.7.36//slf4j-api-1.7.36.jar
+snakeyaml/1.33//snakeyaml-1.33.jar
+zstd-jni/1.5.2-1//zstd-jni-1.5.2-1.jar
diff --git a/dev/reformat b/dev/reformat
index 11b6e6775..0b30a3776 100755
--- a/dev/reformat
+++ b/dev/reformat
@@ -22,6 +22,7 @@ PROJECT_DIR="$(cd "`dirname "$0"`/.."; pwd)"
 ${PROJECT_DIR}/build/mvn spotless:apply -Pflink-1.14
 ${PROJECT_DIR}/build/mvn spotless:apply -Pflink-1.15
 ${PROJECT_DIR}/build/mvn spotless:apply -Pflink-1.17
+${PROJECT_DIR}/build/mvn spotless:apply -Pflink-1.18
 ${PROJECT_DIR}/build/mvn spotless:apply -Pspark-2.4
 ${PROJECT_DIR}/build/mvn spotless:apply -Pspark-3.3
 ${PROJECT_DIR}/build/mvn spotless:apply -Pmr
diff --git a/docs/README.md b/docs/README.md
index b026f5581..7cd683bc9 100644
--- a/docs/README.md
+++ b/docs/README.md
@@ -119,7 +119,7 @@ INFO [async-reply] Controller: CommitFiles for 
local-1690000152711-0 success wit
 
 ## Start Flink with Celeborn
 #### Copy Celeborn Client to Flink's lib
-Celeborn release binary contains clients for Flink 1.14.x, Flink 1.15.x and 
Flink 1.17.x, copy the corresponding client jar into Flink's
+The Celeborn release binary contains clients for Flink 1.14.x, Flink 1.15.x, Flink 
1.17.x and Flink 1.18.x. Copy the corresponding client jar into Flink's
 `lib/` directory:
 ```shell
 cp $CELEBORN_HOME/flink/<Celeborn Client Jar> $FLINK_HOME/lib/
diff --git a/docs/developers/sbt.md b/docs/developers/sbt.md
index 484241f68..35b324d3c 100644
--- a/docs/developers/sbt.md
+++ b/docs/developers/sbt.md
@@ -38,6 +38,7 @@ The following table indicates the compatibility of Celeborn 
Spark and Flink clie
 | Flink 1.14 | &#x274C;          | &#10004;          | &#10004;           | 
&#x274C;           | &#x274C;          | &#x274C;           | &#x274C;          
 |
 | Flink 1.15 | &#x274C;          | &#10004;          | &#10004;           | 
&#x274C;           | &#x274C;          | &#x274C;           | &#x274C;          
 |
 | Flink 1.17 | &#x274C;          | &#10004;          | &#10004;           | 
&#x274C;           | &#x274C;          | &#x274C;           | &#x274C;          
 |
+| Flink 1.18 | &#x274C;          | &#10004;          | &#10004;           | 
&#x274C;           | &#x274C;          | &#x274C;           | &#x274C;          
 |
 
 ## Useful SBT commands
 
diff --git a/pom.xml b/pom.xml
index 0510f59c4..8ba97fbff 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1265,6 +1265,26 @@
       </properties>
     </profile>
 
+    <profile>
+      <!-- Build profile for the Flink 1.18 client: enables the common Flink client module,
+           the flink-1.18 client and its shaded module, and the Flink integration tests,
+           and pins Flink to 1.18.0 with Scala binary version 2.12. -->
+      <id>flink-1.18</id>
+      <modules>
+        <module>client-flink/common</module>
+        <module>client-flink/flink-1.18</module>
+        <module>client-flink/flink-1.18-shaded</module>
+        <module>tests/flink-it</module>
+      </modules>
+      <properties>
+        <flink.version>1.18.0</flink.version>
+        <flink.binary.version>1.18</flink.binary.version>
+        <scala.binary.version>2.12</scala.binary.version>
+        
<celeborn.flink.plugin.artifact>celeborn-client-flink-1.18_${scala.binary.version}</celeborn.flink.plugin.artifact>
+        <!-- NOTE(review): the property name is spelled "streamig"; presumably it matches
+             the name referenced elsewhere in the build - verify before renaming. -->
+        <flink.streamig.artifact>flink-streaming-java</flink.streamig.artifact>
+        <flink.clients.artifact>flink-clients</flink.clients.artifact>
+        
<flink.scala.artifact>flink-scala_${scala.binary.version}</flink.scala.artifact>
+        
<flink.runtime.web.artifact>flink-runtime-web</flink.runtime.web.artifact>
+      </properties>
+    </profile>
+
     <profile>
       <id>mr</id>
       <modules>
diff --git a/project/CelebornBuild.scala b/project/CelebornBuild.scala
index 708e86ba1..168a6f3bd 100644
--- a/project/CelebornBuild.scala
+++ b/project/CelebornBuild.scala
@@ -275,6 +275,7 @@ object Utils {
     case Some("flink-1.14") => Some(Flink114)
     case Some("flink-1.15") => Some(Flink115)
     case Some("flink-1.17") => Some(Flink117)
+    case Some("flink-1.18") => Some(Flink118)
     case _ => None
   }
 
@@ -749,6 +750,16 @@ object Flink117 extends FlinkClientProjects {
   val flinkClientShadedProjectName: String = 
"celeborn-client-flink-1_17-shaded"
 }
 
+/** SBT project settings (Flink version, project paths and names) for the Flink 1.18 client. */
+object Flink118 extends FlinkClientProjects {
+  val flinkVersion = "1.18.0"
+
+  // Note: SBT does not allow the period symbol (.) in project names, so the project names
+  // below use "1_18" while the project paths keep "1.18".
+  val flinkClientProjectPath = "client-flink/flink-1.18"
+  val flinkClientProjectName = "celeborn-client-flink-1_18"
+  val flinkClientShadedProjectPath: String = "client-flink/flink-1.18-shaded"
+  val flinkClientShadedProjectName: String = 
"celeborn-client-flink-1_18-shaded"
+}
+
 trait FlinkClientProjects {
 
   val flinkVersion: String

Reply via email to