This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 76a42beab [CELEBORN-610][FLINK] Eliminate pluginconf and merge its
content to CelebornConf
76a42beab is described below
commit 76a42beab0888c2996f2bd9eb1ccee8912d18a9e
Author: Ethan Feng <[email protected]>
AuthorDate: Mon Jun 5 14:08:53 2023 +0800
[CELEBORN-610][FLINK] Eliminate PluginConf and merge its content into
CelebornConf
### What changes were proposed in this pull request?
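Remove the Flink plugin's separate PluginConf class and move its options into CelebornConf. Having a second configuration class makes it hard for users to understand why Celeborn needs it.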
### Why are the changes needed?
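Consolidating the Flink plugin options into CelebornConf gives a single place to define, parse, and document client configuration.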
### Does this PR introduce _any_ user-facing change?
NO.
### How was this patch tested?
UT.
Closes #1524 from FMX/CELEBORN-610.
Authored-by: Ethan Feng <[email protected]>
Signed-off-by: Ethan Feng <[email protected]>
---
.../AbstractRemoteShuffleInputGateFactory.java | 28 +++----
...bstractRemoteShuffleResultPartitionFactory.java | 21 ++----
.../celeborn/plugin/flink/RemoteShuffleMaster.java | 18 +----
.../celeborn/plugin/flink/config/PluginConf.java | 55 --------------
.../celeborn/plugin/flink/utils/FlinkUtils.java | 31 +++++---
.../celeborn/plugin/flink/PluginConfSuiteJ.java | 63 ----------------
.../plugin/flink/PluginSideConfSuiteJ.java | 59 +++++++++++++++
.../flink/RemoteShuffleInputGateFactory.java | 8 +-
.../flink/RemoteShuffleResultPartitionFactory.java | 4 +-
.../plugin/flink/RemoteShuffleServiceFactory.java | 12 +--
.../plugin/flink/RemoteShuffleMasterTest.java | 16 ++--
.../flink/RemoteShuffleInputGateFactory.java | 8 +-
.../flink/RemoteShuffleResultPartitionFactory.java | 4 +-
.../plugin/flink/RemoteShuffleServiceFactory.java | 12 +--
.../plugin/flink/RemoteShuffleMasterTest.java | 16 ++--
.../flink/RemoteShuffleInputGateFactory.java | 8 +-
.../flink/RemoteShuffleResultPartitionFactory.java | 4 +-
.../plugin/flink/RemoteShuffleServiceFactory.java | 6 +-
.../plugin/flink/RemoteShuffleMasterTest.java | 10 +--
.../org/apache/celeborn/common/CelebornConf.scala | 85 ++++++++++++++++++++++
docs/configuration/client.md | 8 ++
21 files changed, 221 insertions(+), 255 deletions(-)
diff --git
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleInputGateFactory.java
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleInputGateFactory.java
index 597bd8994..c08d4e4de 100644
---
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleInputGateFactory.java
+++
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleInputGateFactory.java
@@ -20,7 +20,6 @@ package org.apache.celeborn.plugin.flink;
import java.io.IOException;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.io.network.buffer.BufferDecompressor;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
@@ -32,8 +31,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.celeborn.common.CelebornConf;
-import org.apache.celeborn.plugin.flink.config.PluginConf;
-import org.apache.celeborn.plugin.flink.utils.FlinkUtils;
import org.apache.celeborn.plugin.flink.utils.Utils;
/** Factory class to create RemoteShuffleInputGate. */
@@ -67,38 +64,31 @@ public abstract class AbstractRemoteShuffleInputGateFactory
{
protected CelebornConf celebornConf;
public AbstractRemoteShuffleInputGateFactory(
- Configuration flinkConf,
- CelebornConf conf,
- NetworkBufferPool networkBufferPool,
- int networkBufferSize) {
+ CelebornConf conf, NetworkBufferPool networkBufferPool, int
networkBufferSize) {
this.celebornConf = conf;
- long configuredMemorySize =
- FlinkUtils.byteStringValueAsBytes(flinkConf,
PluginConf.MEMORY_PER_INPUT_GATE);
- long minConfiguredMemorySize =
- FlinkUtils.byteStringValueAsBytes(flinkConf,
PluginConf.MIN_MEMORY_PER_GATE);
+ long configuredMemorySize = celebornConf.clientFlinkMemoryPerInputGate();
+ long minConfiguredMemorySize =
celebornConf.clientFlinkMemoryPerInputGateMin();
if (configuredMemorySize < minConfiguredMemorySize) {
throw new IllegalArgumentException(
String.format(
"Insufficient network memory per input gate, please increase %s
to at " + "least %s.",
- PluginConf.MEMORY_PER_INPUT_GATE.name,
- PluginConf.getValue(flinkConf, PluginConf.MIN_MEMORY_PER_GATE)));
+ CelebornConf.CLIENT_MEMORY_PER_INPUT_GATE().key(),
+ minConfiguredMemorySize));
}
this.numBuffersPerGate = Utils.checkedDownCast(configuredMemorySize /
networkBufferSize);
- this.supportFloatingBuffers =
- FlinkUtils.stringValueAsBoolean(
- flinkConf, PluginConf.SUPPORT_FLOATING_BUFFER_PER_INPUT_GATE);
+ this.supportFloatingBuffers =
celebornConf.clientFlinkInputGateSupportFloatingBuffer();
if (numBuffersPerGate < MIN_BUFFERS_PER_GATE) {
throw new IllegalArgumentException(
String.format(
"Insufficient network memory per input gate, please increase %s
to at "
+ "least %d bytes.",
- PluginConf.MEMORY_PER_INPUT_GATE.name, networkBufferSize *
MIN_BUFFERS_PER_GATE));
+ CelebornConf.CLIENT_MEMORY_PER_INPUT_GATE().key(),
+ networkBufferSize * MIN_BUFFERS_PER_GATE));
}
this.networkBufferSize = networkBufferSize;
- this.numConcurrentReading =
- Integer.valueOf(PluginConf.getValue(flinkConf,
PluginConf.NUM_CONCURRENT_READINGS));
+ this.numConcurrentReading = celebornConf.clientFlinkNumConcurrentReading();
this.networkBufferPool = networkBufferPool;
}
diff --git
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleResultPartitionFactory.java
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleResultPartitionFactory.java
index 80e3c5dbe..37491c997 100644
---
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleResultPartitionFactory.java
+++
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleResultPartitionFactory.java
@@ -22,7 +22,6 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
@@ -38,8 +37,6 @@ import org.slf4j.LoggerFactory;
import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.common.protocol.CompressionCodec;
-import org.apache.celeborn.plugin.flink.config.PluginConf;
-import org.apache.celeborn.plugin.flink.utils.FlinkUtils;
import org.apache.celeborn.plugin.flink.utils.Utils;
/** Factory class to create {@link RemoteShuffleResultPartition}. */
@@ -70,33 +67,28 @@ public abstract class
AbstractRemoteShuffleResultPartitionFactory {
protected String compressionCodec;
public AbstractRemoteShuffleResultPartitionFactory(
- Configuration flinkConf,
CelebornConf celebornConf,
ResultPartitionManager partitionManager,
BufferPoolFactory bufferPoolFactory,
int networkBufferSize) {
- long configuredMemorySize =
- FlinkUtils.byteStringValueAsBytes(flinkConf,
PluginConf.MEMORY_PER_RESULT_PARTITION);
- long minConfiguredMemorySize =
- FlinkUtils.byteStringValueAsBytes(flinkConf,
PluginConf.MIN_MEMORY_PER_PARTITION);
+ long configuredMemorySize =
celebornConf.clientFlinkMemoryPerResultPartition();
+ long minConfiguredMemorySize =
celebornConf.clientFlinkMemoryPerResultPartitionMin();
if (configuredMemorySize < minConfiguredMemorySize) {
throw new IllegalArgumentException(
String.format(
"Insufficient network memory per result partition, please
increase %s "
+ "to at least %s.",
- PluginConf.MEMORY_PER_RESULT_PARTITION.name,
minConfiguredMemorySize));
+ CelebornConf.CLIENT_MEMORY_PER_RESULT_PARTITION().key(),
minConfiguredMemorySize));
}
this.numBuffersPerPartition = Utils.checkedDownCast(configuredMemorySize /
networkBufferSize);
- this.supportFloatingBuffers =
- FlinkUtils.stringValueAsBoolean(
- flinkConf, PluginConf.SUPPORT_FLOATING_BUFFER_PER_OUTPUT_GATE);
+ this.supportFloatingBuffers =
celebornConf.clientFlinkResultPartitionSupportFloatingBuffer();
if (numBuffersPerPartition < MIN_BUFFERS_PER_PARTITION) {
throw new IllegalArgumentException(
String.format(
"Insufficient network memory per partition, please increase %s
to at "
+ "least %d bytes.",
- PluginConf.MEMORY_PER_RESULT_PARTITION.name,
+ CelebornConf.CLIENT_MEMORY_PER_RESULT_PARTITION().key(),
networkBufferSize * MIN_BUFFERS_PER_PARTITION));
}
@@ -104,8 +96,7 @@ public abstract class
AbstractRemoteShuffleResultPartitionFactory {
this.bufferPoolFactory = bufferPoolFactory;
this.networkBufferSize = networkBufferSize;
- this.compressionCodec =
- PluginConf.getValue(flinkConf,
PluginConf.REMOTE_SHUFFLE_COMPRESSION_CODEC);
+ this.compressionCodec = celebornConf.shuffleCompressionCodec().name();
}
public ResultPartition create(
diff --git
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleMaster.java
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleMaster.java
index d50a0e625..4323a5d5c 100644
---
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleMaster.java
+++
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleMaster.java
@@ -26,20 +26,13 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
-import org.apache.flink.runtime.shuffle.JobShuffleContext;
-import org.apache.flink.runtime.shuffle.PartitionDescriptor;
-import org.apache.flink.runtime.shuffle.ProducerDescriptor;
-import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
-import org.apache.flink.runtime.shuffle.ShuffleMaster;
-import org.apache.flink.runtime.shuffle.ShuffleMasterContext;
-import org.apache.flink.runtime.shuffle.TaskInputsOutputsDescriptor;
+import org.apache.flink.runtime.shuffle.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.celeborn.client.LifecycleManager;
import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.common.util.JavaUtils;
-import org.apache.celeborn.plugin.flink.config.PluginConf;
import org.apache.celeborn.plugin.flink.utils.FlinkUtils;
import org.apache.celeborn.plugin.flink.utils.ThreadUtils;
@@ -214,15 +207,12 @@ public class RemoteShuffleMaster implements
ShuffleMaster<RemoteShuffleDescripto
}
int numResultPartitions =
taskInputsOutputsDescriptor.getSubpartitionNums().size();
- long numBytesPerPartition =
- FlinkUtils.byteStringValueAsBytes(
- shuffleMasterContext.getConfiguration(),
PluginConf.MEMORY_PER_RESULT_PARTITION);
+ CelebornConf conf =
FlinkUtils.toCelebornConf(shuffleMasterContext.getConfiguration());
+ long numBytesPerPartition = conf.clientFlinkMemoryPerResultPartition();
long numBytesForOutput = numBytesPerPartition * numResultPartitions;
int numInputGates =
taskInputsOutputsDescriptor.getInputChannelNums().size();
- long numBytesPerGate =
- FlinkUtils.byteStringValueAsBytes(
- shuffleMasterContext.getConfiguration(),
PluginConf.MEMORY_PER_INPUT_GATE);
+ long numBytesPerGate = conf.clientFlinkMemoryPerInputGate();
long numBytesForInput = numBytesPerGate * numInputGates;
LOG.debug(
diff --git
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/config/PluginConf.java
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/config/PluginConf.java
deleted file mode 100644
index ddc0b5060..000000000
---
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/config/PluginConf.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.celeborn.plugin.flink.config;
-
-import org.apache.flink.configuration.Configuration;
-
-import org.apache.celeborn.common.CelebornConf;
-
-public enum PluginConf {
- MIN_MEMORY_PER_PARTITION("remote-shuffle.job.min.memory-per-partition", "",
"8m"),
- MIN_MEMORY_PER_GATE("remote-shuffle.job.min.memory-per-gate", "", "8m"),
- NUM_CONCURRENT_READINGS(
- "remote-shuffle.job.concurrent-readings-per-gate", "",
String.valueOf(Integer.MAX_VALUE)),
- MEMORY_PER_RESULT_PARTITION("remote-shuffle.job.memory-per-partition", "",
"64m"),
- MEMORY_PER_INPUT_GATE("remote-shuffle.job.memory-per-gate", "", "32m"),
- SUPPORT_FLOATING_BUFFER_PER_INPUT_GATE(
- "remote-shuffle.job.support-floating-buffer-per-input-gate", "", "true"),
- ENABLE_DATA_COMPRESSION("remote-shuffle.job.enable-data-compression", "",
"true"),
- SUPPORT_FLOATING_BUFFER_PER_OUTPUT_GATE(
- "remote-shuffle.job.support-floating-buffer-per-output-gate", "",
"true"),
- REMOTE_SHUFFLE_COMPRESSION_CODEC(
- "remote-shuffle.job.compression.codec",
- CelebornConf.SHUFFLE_COMPRESSION_CODEC().key(),
- "LZ4"),
- ;
-
- public String name;
- public String alterName;
- public String defaultValue;
-
- PluginConf(String name, String alterName, String defaultValue) {
- this.name = name;
- this.alterName = alterName;
- this.defaultValue = defaultValue;
- }
-
- public static String getValue(Configuration flinkConf, PluginConf conf) {
- return flinkConf.getString(conf.name, conf.defaultValue);
- }
-}
diff --git
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/utils/FlinkUtils.java
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/utils/FlinkUtils.java
index 740c21ec3..2bcde62ab 100644
---
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/utils/FlinkUtils.java
+++
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/utils/FlinkUtils.java
@@ -17,7 +17,9 @@
package org.apache.celeborn.plugin.flink.utils;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
@@ -25,18 +27,31 @@ import
org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.celeborn.common.CelebornConf;
-import org.apache.celeborn.common.util.Utils;
-import org.apache.celeborn.plugin.flink.config.PluginConf;
public class FlinkUtils {
+ public static final Set<String> pluginConfNames =
+ new HashSet<String>() {
+ {
+ add("remote-shuffle.job.min.memory-per-partition");
+ add("remote-shuffle.job.min.memory-per-gate");
+ add("remote-shuffle.job.concurrent-readings-per-gate");
+ add("remote-shuffle.job.memory-per-partition");
+ add("remote-shuffle.job.memory-per-gate");
+ add("remote-shuffle.job.support-floating-buffer-per-input-gate");
+ add("remote-shuffle.job.enable-data-compression");
+ add("remote-shuffle.job.support-floating-buffer-per-output-gate");
+ add("remote-shuffle.job.compression.codec");
+ }
+ };
+
public static CelebornConf toCelebornConf(Configuration configuration) {
CelebornConf tmpCelebornConf = new CelebornConf();
Map<String, String> confMap = configuration.toMap();
for (Map.Entry<String, String> entry : confMap.entrySet()) {
String key = entry.getKey();
- if (key.startsWith("celeborn.")) {
- tmpCelebornConf.set(entry.getKey(), entry.getValue());
+ if (key.startsWith("celeborn.") || pluginConfNames.contains(key)) {
+ tmpCelebornConf.set(key, entry.getValue());
}
}
@@ -54,12 +69,4 @@ public class FlinkUtils {
public static String toAttemptId(ExecutionAttemptID attemptID) {
return attemptID.toString();
}
-
- public static long byteStringValueAsBytes(Configuration flinkConf,
PluginConf pluginConf) {
- return Utils.byteStringAsBytes(PluginConf.getValue(flinkConf, pluginConf));
- }
-
- public static boolean stringValueAsBoolean(Configuration flinkConf,
PluginConf pluginConf) {
- return Boolean.valueOf(PluginConf.getValue(flinkConf, pluginConf));
- }
}
diff --git
a/client-flink/common/src/test/java/org/apache/celeborn/plugin/flink/PluginConfSuiteJ.java
b/client-flink/common/src/test/java/org/apache/celeborn/plugin/flink/PluginConfSuiteJ.java
deleted file mode 100644
index 871027255..000000000
---
a/client-flink/common/src/test/java/org/apache/celeborn/plugin/flink/PluginConfSuiteJ.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.celeborn.plugin.flink;
-
-import org.apache.flink.configuration.Configuration;
-import org.junit.Assert;
-import org.junit.Test;
-
-import org.apache.celeborn.plugin.flink.config.PluginConf;
-
-public class PluginConfSuiteJ {
- @Test
- public void testCoalesce() {
- Configuration flinkConf = new Configuration();
- Assert.assertEquals("8m", PluginConf.getValue(flinkConf,
PluginConf.MIN_MEMORY_PER_PARTITION));
- Assert.assertEquals("8m", PluginConf.getValue(flinkConf,
PluginConf.MIN_MEMORY_PER_GATE));
- Assert.assertTrue(
- Integer.MAX_VALUE
- == Integer.valueOf(PluginConf.getValue(flinkConf,
PluginConf.NUM_CONCURRENT_READINGS)));
- Assert.assertEquals(
- "64m", PluginConf.getValue(flinkConf,
PluginConf.MEMORY_PER_RESULT_PARTITION));
- Assert.assertEquals("32m", PluginConf.getValue(flinkConf,
PluginConf.MEMORY_PER_INPUT_GATE));
-
- Assert.assertEquals("true", PluginConf.getValue(flinkConf,
PluginConf.ENABLE_DATA_COMPRESSION));
- Assert.assertEquals(
- "LZ4", PluginConf.getValue(flinkConf,
PluginConf.REMOTE_SHUFFLE_COMPRESSION_CODEC));
-
- flinkConf.setString(PluginConf.MIN_MEMORY_PER_PARTITION.name, "16m");
- flinkConf.setString(PluginConf.MIN_MEMORY_PER_GATE.name, "17m");
- flinkConf.setString(PluginConf.NUM_CONCURRENT_READINGS.name, "12323");
- flinkConf.setString(PluginConf.MEMORY_PER_RESULT_PARTITION.name, "1888m");
- flinkConf.setString(PluginConf.MEMORY_PER_INPUT_GATE.name, "176m");
- flinkConf.setString(PluginConf.ENABLE_DATA_COMPRESSION.name, "false");
- flinkConf.setString(PluginConf.REMOTE_SHUFFLE_COMPRESSION_CODEC.name,
"lz423");
- Assert.assertEquals("16m", PluginConf.getValue(flinkConf,
PluginConf.MIN_MEMORY_PER_PARTITION));
- Assert.assertEquals("17m", PluginConf.getValue(flinkConf,
PluginConf.MIN_MEMORY_PER_GATE));
- Assert.assertTrue(
- 12323
- == Integer.valueOf(PluginConf.getValue(flinkConf,
PluginConf.NUM_CONCURRENT_READINGS)));
- Assert.assertEquals(
- "1888m", PluginConf.getValue(flinkConf,
PluginConf.MEMORY_PER_RESULT_PARTITION));
- Assert.assertEquals("176m", PluginConf.getValue(flinkConf,
PluginConf.MEMORY_PER_INPUT_GATE));
- Assert.assertEquals(
- "false", PluginConf.getValue(flinkConf,
PluginConf.ENABLE_DATA_COMPRESSION));
- Assert.assertEquals(
- "lz423", PluginConf.getValue(flinkConf,
PluginConf.REMOTE_SHUFFLE_COMPRESSION_CODEC));
- }
-}
diff --git
a/client-flink/common/src/test/java/org/apache/celeborn/plugin/flink/PluginSideConfSuiteJ.java
b/client-flink/common/src/test/java/org/apache/celeborn/plugin/flink/PluginSideConfSuiteJ.java
new file mode 100644
index 000000000..03568c3f4
--- /dev/null
+++
b/client-flink/common/src/test/java/org/apache/celeborn/plugin/flink/PluginSideConfSuiteJ.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.plugin.flink;
+
+import org.apache.flink.configuration.Configuration;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.plugin.flink.utils.FlinkUtils;
+
+public class PluginSideConfSuiteJ {
+ @Test
+ public void testCoalesce() {
+ Configuration flinkConf = new Configuration();
+ CelebornConf celebornConf = FlinkUtils.toCelebornConf(flinkConf);
+ Assert.assertEquals(8 * 1024 * 1024,
celebornConf.clientFlinkMemoryPerResultPartitionMin());
+ Assert.assertEquals(8 * 1024 * 1024,
celebornConf.clientFlinkMemoryPerInputGateMin());
+ Assert.assertTrue(Integer.MAX_VALUE ==
celebornConf.clientFlinkNumConcurrentReading());
+ Assert.assertEquals(64 * 1024 * 1024,
celebornConf.clientFlinkMemoryPerResultPartition());
+ Assert.assertEquals(32 * 1024 * 1024,
celebornConf.clientFlinkMemoryPerInputGate());
+
+ Assert.assertEquals(true,
celebornConf.clientFlinkDataCompressionEnabled());
+ Assert.assertEquals("LZ4", celebornConf.shuffleCompressionCodec().name());
+
+ flinkConf.setString("remote-shuffle.job.min.memory-per-partition", "16m");
+ flinkConf.setString("remote-shuffle.job.min.memory-per-gate", "17m");
+ flinkConf.setString("remote-shuffle.job.concurrent-readings-per-gate",
"12323");
+ flinkConf.setString("remote-shuffle.job.memory-per-partition", "1888m");
+ flinkConf.setString("remote-shuffle.job.memory-per-gate", "176m");
+ flinkConf.setString("remote-shuffle.job.enable-data-compression", "false");
+
flinkConf.setString("remote-shuffle.job.support-floating-buffer-per-input-gate",
"false");
+ flinkConf.setString("remote-shuffle.job.compression.codec", "ZSTD");
+
+ celebornConf = FlinkUtils.toCelebornConf(flinkConf);
+ Assert.assertEquals(16 * 1024 * 1024,
celebornConf.clientFlinkMemoryPerResultPartitionMin());
+ Assert.assertEquals(17 * 1024 * 1024,
celebornConf.clientFlinkMemoryPerInputGateMin());
+ Assert.assertTrue(12323 == celebornConf.clientFlinkNumConcurrentReading());
+ Assert.assertEquals(1888 * 1024 * 1024,
celebornConf.clientFlinkMemoryPerResultPartition());
+ Assert.assertEquals(176 * 1024 * 1024,
celebornConf.clientFlinkMemoryPerInputGate());
+ Assert.assertEquals(false,
celebornConf.clientFlinkDataCompressionEnabled());
+ Assert.assertEquals("ZSTD", celebornConf.shuffleCompressionCodec().name());
+ }
+}
diff --git
a/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
b/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
index 3a4b2bd20..7e53e2585 100644
---
a/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
+++
b/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
@@ -20,7 +20,6 @@ package org.apache.celeborn.plugin.flink;
import java.io.IOException;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.io.network.buffer.BufferDecompressor;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
@@ -33,11 +32,8 @@ import org.apache.celeborn.common.CelebornConf;
public class RemoteShuffleInputGateFactory extends
AbstractRemoteShuffleInputGateFactory {
public RemoteShuffleInputGateFactory(
- Configuration flinkConf,
- CelebornConf conf,
- NetworkBufferPool networkBufferPool,
- int networkBufferSize) {
- super(flinkConf, conf, networkBufferPool, networkBufferSize);
+ CelebornConf conf, NetworkBufferPool networkBufferPool, int
networkBufferSize) {
+ super(conf, networkBufferPool, networkBufferSize);
}
protected RemoteShuffleInputGate createInputGate(
diff --git
a/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java
b/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java
index 59011c51a..bd3549878 100644
---
a/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java
+++
b/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java
@@ -20,7 +20,6 @@ package org.apache.celeborn.plugin.flink;
import java.io.IOException;
import java.util.List;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.BufferPoolFactory;
@@ -37,12 +36,11 @@ public class RemoteShuffleResultPartitionFactory
extends AbstractRemoteShuffleResultPartitionFactory {
public RemoteShuffleResultPartitionFactory(
- Configuration flinkConf,
CelebornConf celebornConf,
ResultPartitionManager partitionManager,
BufferPoolFactory bufferPoolFactory,
int networkBufferSize) {
- super(flinkConf, celebornConf, partitionManager, bufferPoolFactory,
networkBufferSize);
+ super(celebornConf, partitionManager, bufferPoolFactory,
networkBufferSize);
}
@Override
diff --git
a/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleServiceFactory.java
b/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleServiceFactory.java
index 7776572cc..81f6ca5a6 100644
---
a/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleServiceFactory.java
+++
b/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleServiceFactory.java
@@ -19,11 +19,7 @@ package org.apache.celeborn.plugin.flink;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
-import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
-import org.apache.flink.runtime.shuffle.ShuffleEnvironmentContext;
-import org.apache.flink.runtime.shuffle.ShuffleMaster;
-import org.apache.flink.runtime.shuffle.ShuffleMasterContext;
-import org.apache.flink.runtime.shuffle.ShuffleServiceFactory;
+import org.apache.flink.runtime.shuffle.*;
public class RemoteShuffleServiceFactory extends
AbstractRemoteShuffleServiceFactory
implements ShuffleServiceFactory<
@@ -42,17 +38,13 @@ public class RemoteShuffleServiceFactory extends
AbstractRemoteShuffleServiceFac
initializePreCreateShuffleEnvironment(shuffleEnvironmentContext);
RemoteShuffleResultPartitionFactory resultPartitionFactory =
new RemoteShuffleResultPartitionFactory(
- parameters.configuration,
parameters.celebornConf,
parameters.resultPartitionManager,
parameters.networkBufferPool,
parameters.bufferSize);
RemoteShuffleInputGateFactory inputGateFactory =
new RemoteShuffleInputGateFactory(
- parameters.configuration,
- parameters.celebornConf,
- parameters.networkBufferPool,
- parameters.bufferSize);
+ parameters.celebornConf, parameters.networkBufferPool,
parameters.bufferSize);
return new RemoteShuffleEnvironment(
parameters.networkBufferPool,
diff --git
a/client-flink/flink-1.14/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterTest.java
b/client-flink/flink-1.14/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterTest.java
index a4904e516..6768cedb1 100644
---
a/client-flink/flink-1.14/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterTest.java
+++
b/client-flink/flink-1.14/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterTest.java
@@ -35,11 +35,7 @@ import
org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
-import org.apache.flink.runtime.shuffle.JobShuffleContext;
-import org.apache.flink.runtime.shuffle.PartitionDescriptor;
-import org.apache.flink.runtime.shuffle.ProducerDescriptor;
-import org.apache.flink.runtime.shuffle.ShuffleMasterContext;
-import org.apache.flink.runtime.shuffle.TaskInputsOutputsDescriptor;
+import org.apache.flink.runtime.shuffle.*;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -47,7 +43,7 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.celeborn.plugin.flink.config.PluginConf;
+import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.plugin.flink.utils.FlinkUtils;
public class RemoteShuffleMasterTest {
@@ -190,12 +186,12 @@ public class RemoteShuffleMasterTest {
TaskInputsOutputsDescriptor.from(
numberOfInputGateChannels, numbersOfResultSubpartitions,
resultPartitionTypes));
- long numBytesPerGate =
- FlinkUtils.byteStringValueAsBytes(configuration,
PluginConf.MEMORY_PER_INPUT_GATE);
+ CelebornConf conf = FlinkUtils.toCelebornConf(configuration);
+
+ long numBytesPerGate = conf.clientFlinkMemoryPerInputGate();
long expectedInput = 2 * numBytesPerGate;
- long numBytesPerResultPartition =
- FlinkUtils.byteStringValueAsBytes(configuration,
PluginConf.MEMORY_PER_RESULT_PARTITION);
+ long numBytesPerResultPartition =
conf.clientFlinkMemoryPerResultPartition();
long expectedOutput = 3 * numBytesPerResultPartition;
MemorySize expected = new MemorySize(expectedInput + expectedOutput);
diff --git
a/client-flink/flink-1.15/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
b/client-flink/flink-1.15/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
index 2abed4291..a8cf5e645 100644
---
a/client-flink/flink-1.15/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
+++
b/client-flink/flink-1.15/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
@@ -20,7 +20,6 @@ package org.apache.celeborn.plugin.flink;
import java.io.IOException;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.io.network.buffer.BufferDecompressor;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
@@ -33,11 +32,8 @@ import org.apache.celeborn.common.CelebornConf;
public class RemoteShuffleInputGateFactory extends
AbstractRemoteShuffleInputGateFactory {
public RemoteShuffleInputGateFactory(
- Configuration flinkConf,
- CelebornConf conf,
- NetworkBufferPool networkBufferPool,
- int networkBufferSize) {
- super(flinkConf, conf, networkBufferPool, networkBufferSize);
+ CelebornConf conf, NetworkBufferPool networkBufferPool, int
networkBufferSize) {
+ super(conf, networkBufferPool, networkBufferSize);
}
protected RemoteShuffleInputGate createInputGate(
diff --git a/client-flink/flink-1.15/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java b/client-flink/flink-1.15/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java
index 50e349154..33ded3672 100644
--- a/client-flink/flink-1.15/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java
+++ b/client-flink/flink-1.15/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java
@@ -20,7 +20,6 @@ package org.apache.celeborn.plugin.flink;
import java.io.IOException;
import java.util.List;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.BufferPoolFactory;
@@ -37,13 +36,12 @@ public class RemoteShuffleResultPartitionFactory
extends AbstractRemoteShuffleResultPartitionFactory {
public RemoteShuffleResultPartitionFactory(
- Configuration flinkConf,
CelebornConf celebornConf,
ResultPartitionManager partitionManager,
BufferPoolFactory bufferPoolFactory,
int networkBufferSize) {
- super(flinkConf, celebornConf, partitionManager, bufferPoolFactory,
networkBufferSize);
+ super(celebornConf, partitionManager, bufferPoolFactory,
networkBufferSize);
}
@Override
diff --git a/client-flink/flink-1.15/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleServiceFactory.java b/client-flink/flink-1.15/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleServiceFactory.java
index 7776572cc..81f6ca5a6 100644
--- a/client-flink/flink-1.15/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleServiceFactory.java
+++ b/client-flink/flink-1.15/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleServiceFactory.java
@@ -19,11 +19,7 @@ package org.apache.celeborn.plugin.flink;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
-import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
-import org.apache.flink.runtime.shuffle.ShuffleEnvironmentContext;
-import org.apache.flink.runtime.shuffle.ShuffleMaster;
-import org.apache.flink.runtime.shuffle.ShuffleMasterContext;
-import org.apache.flink.runtime.shuffle.ShuffleServiceFactory;
+import org.apache.flink.runtime.shuffle.*;
public class RemoteShuffleServiceFactory extends
AbstractRemoteShuffleServiceFactory
implements ShuffleServiceFactory<
@@ -42,17 +38,13 @@ public class RemoteShuffleServiceFactory extends AbstractRemoteShuffleServiceFac
initializePreCreateShuffleEnvironment(shuffleEnvironmentContext);
RemoteShuffleResultPartitionFactory resultPartitionFactory =
new RemoteShuffleResultPartitionFactory(
- parameters.configuration,
parameters.celebornConf,
parameters.resultPartitionManager,
parameters.networkBufferPool,
parameters.bufferSize);
RemoteShuffleInputGateFactory inputGateFactory =
new RemoteShuffleInputGateFactory(
- parameters.configuration,
- parameters.celebornConf,
- parameters.networkBufferPool,
- parameters.bufferSize);
+ parameters.celebornConf, parameters.networkBufferPool,
parameters.bufferSize);
return new RemoteShuffleEnvironment(
parameters.networkBufferPool,
diff --git a/client-flink/flink-1.15/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterTest.java b/client-flink/flink-1.15/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterTest.java
index a9b80b0c5..a8607a6d8 100644
--- a/client-flink/flink-1.15/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterTest.java
+++ b/client-flink/flink-1.15/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterTest.java
@@ -35,11 +35,7 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
-import org.apache.flink.runtime.shuffle.JobShuffleContext;
-import org.apache.flink.runtime.shuffle.PartitionDescriptor;
-import org.apache.flink.runtime.shuffle.ProducerDescriptor;
-import org.apache.flink.runtime.shuffle.ShuffleMasterContext;
-import org.apache.flink.runtime.shuffle.TaskInputsOutputsDescriptor;
+import org.apache.flink.runtime.shuffle.*;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -47,7 +43,7 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.celeborn.plugin.flink.config.PluginConf;
+import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.plugin.flink.utils.FlinkUtils;
public class RemoteShuffleMasterTest {
@@ -190,12 +186,12 @@ public class RemoteShuffleMasterTest {
TaskInputsOutputsDescriptor.from(
numberOfInputGateChannels, numbersOfResultSubpartitions,
resultPartitionTypes));
- long numBytesPerGate =
- FlinkUtils.byteStringValueAsBytes(configuration,
PluginConf.MEMORY_PER_INPUT_GATE);
+ CelebornConf conf = FlinkUtils.toCelebornConf(configuration);
+
+ long numBytesPerGate = conf.clientFlinkMemoryPerInputGate();
long expectedInput = 2 * numBytesPerGate;
- long numBytesPerResultPartition =
- FlinkUtils.byteStringValueAsBytes(configuration,
PluginConf.MEMORY_PER_RESULT_PARTITION);
+ long numBytesPerResultPartition =
conf.clientFlinkMemoryPerResultPartition();
long expectedOutput = 3 * numBytesPerResultPartition;
MemorySize expected = new MemorySize(expectedInput + expectedOutput);
diff --git a/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java b/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
index 89eba2b1a..d13613023 100644
--- a/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
+++ b/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
@@ -20,7 +20,6 @@ package org.apache.celeborn.plugin.flink;
import java.io.IOException;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.io.network.buffer.BufferDecompressor;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
@@ -33,11 +32,8 @@ import org.apache.celeborn.common.CelebornConf;
public class RemoteShuffleInputGateFactory extends
AbstractRemoteShuffleInputGateFactory {
public RemoteShuffleInputGateFactory(
- Configuration flinkConf,
- CelebornConf conf,
- NetworkBufferPool networkBufferPool,
- int networkBufferSize) {
- super(flinkConf, conf, networkBufferPool, networkBufferSize);
+ CelebornConf conf, NetworkBufferPool networkBufferPool, int
networkBufferSize) {
+ super(conf, networkBufferPool, networkBufferSize);
}
// For testing.
diff --git a/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java b/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java
index b5584b9f3..b75435b92 100644
--- a/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java
+++ b/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java
@@ -20,7 +20,6 @@ package org.apache.celeborn.plugin.flink;
import java.io.IOException;
import java.util.List;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.BufferPoolFactory;
@@ -37,13 +36,12 @@ public class RemoteShuffleResultPartitionFactory
extends AbstractRemoteShuffleResultPartitionFactory {
public RemoteShuffleResultPartitionFactory(
- Configuration flinkConf,
CelebornConf celebornConf,
ResultPartitionManager partitionManager,
BufferPoolFactory bufferPoolFactory,
int networkBufferSize) {
- super(flinkConf, celebornConf, partitionManager, bufferPoolFactory,
networkBufferSize);
+ super(celebornConf, partitionManager, bufferPoolFactory,
networkBufferSize);
}
@Override
diff --git a/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleServiceFactory.java b/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleServiceFactory.java
index 7776572cc..bf95317bc 100644
--- a/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleServiceFactory.java
+++ b/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleServiceFactory.java
@@ -42,17 +42,13 @@ public class RemoteShuffleServiceFactory extends AbstractRemoteShuffleServiceFac
initializePreCreateShuffleEnvironment(shuffleEnvironmentContext);
RemoteShuffleResultPartitionFactory resultPartitionFactory =
new RemoteShuffleResultPartitionFactory(
- parameters.configuration,
parameters.celebornConf,
parameters.resultPartitionManager,
parameters.networkBufferPool,
parameters.bufferSize);
RemoteShuffleInputGateFactory inputGateFactory =
new RemoteShuffleInputGateFactory(
- parameters.configuration,
- parameters.celebornConf,
- parameters.networkBufferPool,
- parameters.bufferSize);
+ parameters.celebornConf, parameters.networkBufferPool,
parameters.bufferSize);
return new RemoteShuffleEnvironment(
parameters.networkBufferPool,
diff --git a/client-flink/flink-1.17/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterTest.java b/client-flink/flink-1.17/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterTest.java
index 9a9a242d3..8548ca8af 100644
--- a/client-flink/flink-1.17/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterTest.java
+++ b/client-flink/flink-1.17/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterTest.java
@@ -50,7 +50,7 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.celeborn.plugin.flink.config.PluginConf;
+import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.plugin.flink.utils.FlinkUtils;
public class RemoteShuffleMasterTest {
@@ -208,12 +208,12 @@ public class RemoteShuffleMasterTest {
inputPartitionTypes,
resultPartitionTypes));
- long numBytesPerGate =
- FlinkUtils.byteStringValueAsBytes(configuration,
PluginConf.MEMORY_PER_INPUT_GATE);
+ CelebornConf celebornConf = FlinkUtils.toCelebornConf(configuration);
+
+ long numBytesPerGate = celebornConf.clientFlinkMemoryPerInputGate();
long expectedInput = 2 * numBytesPerGate;
- long numBytesPerResultPartition =
- FlinkUtils.byteStringValueAsBytes(configuration,
PluginConf.MEMORY_PER_RESULT_PARTITION);
+ long numBytesPerResultPartition =
celebornConf.clientFlinkMemoryPerResultPartition();
long expectedOutput = 3 * numBytesPerResultPartition;
MemorySize expected = new MemorySize(expectedInput + expectedOutput);
diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 74144a39f..b77d1f895 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -945,6 +945,16 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
def testPushSlaveDataTimeout: Boolean =
get(TEST_WORKER_PUSH_SLAVE_DATA_TIMEOUT)
def testRetryRevive: Boolean = get(TEST_CLIENT_RETRY_REVIVE)
def testAlternative: String = get(TEST_ALTERNATIVE.key, "celeborn")
+ def clientFlinkMemoryPerResultPartitionMin: Long =
get(CLIENT_MEMORY_PER_RESULT_PARTITION_MIN)
+ def clientFlinkMemoryPerResultPartition: Long =
get(CLIENT_MEMORY_PER_RESULT_PARTITION)
+ def clientFlinkMemoryPerInputGateMin: Long =
get(CLIENT_MEMORY_PER_INPUT_GATE_MIN)
+ def clientFlinkMemoryPerInputGate: Long = get(CLIENT_MEMORY_PER_INPUT_GATE)
+ def clientFlinkNumConcurrentReading: Int =
get(CLIENT_NUM_CONCURRENT_READINGS)
+ def clientFlinkInputGateSupportFloatingBuffer: Boolean =
+ get(CLIENT_INPUT_GATE_SUPPORT_FLOATING_BUFFER)
+ def clientFlinkResultPartitionSupportFloatingBuffer: Boolean =
+ get(CLIENT_RESULT_PARTITION_SUPPORT_FLOATING_BUFFER)
+ def clientFlinkDataCompressionEnabled: Boolean =
get(CLIENT_DATA_COMPRESSION_ENABLED)
}
object CelebornConf extends Logging {
@@ -2877,6 +2887,7 @@ object CelebornConf extends Logging {
buildConf("celeborn.client.shuffle.compression.codec")
.withAlternative("celeborn.shuffle.compression.codec")
.withAlternative("rss.client.compression.codec")
+ .withAlternative("remote-shuffle.job.compression.codec")
.categories("client")
.doc("The codec used to compress shuffle data. By default, Celeborn
provides two codecs: `lz4` and `zstd`.")
.version("0.3.0")
@@ -3488,4 +3499,78 @@ object CelebornConf extends Logging {
.doc("Whether to use codegen for columnar-based shuffle.")
.booleanConf
.createWithDefault(false)
+
+ // Flink specific client configurations.
+ val CLIENT_MEMORY_PER_RESULT_PARTITION_MIN: ConfigEntry[Long] =
+ buildConf("celeborn.client.flink.resultPartition.minMemory")
+ .withAlternative("remote-shuffle.job.min.memory-per-partition")
+ .categories("client")
+ .version("0.3.0")
+ .doc("Min memory reserved for a result partition.")
+ .bytesConf(ByteUnit.BYTE)
+ .createWithDefaultString("8m")
+
+ val CLIENT_MEMORY_PER_INPUT_GATE_MIN: ConfigEntry[Long] =
+ buildConf("celeborn.client.flink.inputGate.minMemory")
+ .withAlternative("remote-shuffle.job.min.memory-per-gate")
+ .categories("client")
+ .doc("Min memory reserved for a input gate.")
+ .version("0.3.0")
+ .bytesConf(ByteUnit.BYTE)
+ .createWithDefaultString("8m")
+
+ val CLIENT_NUM_CONCURRENT_READINGS: ConfigEntry[Int] =
+ buildConf("celeborn.client.flink.inputGate.concurrentReadings")
+ .withAlternative("remote-shuffle.job.concurrent-readings-per-gate")
+ .categories("client")
+ .version("0.3.0")
+ .doc("Max concurrent reading channels for a input gate.")
+ .intConf
+ .createWithDefault(Int.MaxValue)
+
+ val CLIENT_MEMORY_PER_RESULT_PARTITION: ConfigEntry[Long] =
+ buildConf("celeborn.client.flink.resultPartition.memory")
+ .withAlternative("remote-shuffle.job.memory-per-partition")
+ .categories("client")
+ .version("0.3.0")
+ .doc("Memory reserved for a result partition.")
+ .bytesConf(ByteUnit.BYTE)
+ .createWithDefaultString("64m")
+
+ val CLIENT_MEMORY_PER_INPUT_GATE: ConfigEntry[Long] =
+ buildConf("celeborn.client.flink.inputGate.memory")
+ .withAlternative("remote-shuffle.job.memory-per-gate")
+ .categories("client")
+ .version("0.3.0")
+ .doc("Memory reserved for a input gate.")
+ .bytesConf(ByteUnit.BYTE)
+ .createWithDefaultString("32m")
+
+ val CLIENT_INPUT_GATE_SUPPORT_FLOATING_BUFFER: ConfigEntry[Boolean] =
+ buildConf("celeborn.client.flink.inputGate.supportFloatingBuffer")
+
.withAlternative("remote-shuffle.job.support-floating-buffer-per-input-gate")
+ .categories("client")
+ .version("0.3.0")
+ .doc("Whether to support floating buffer in Flink input gates.")
+ .booleanConf
+ .createWithDefault(true)
+
+ val CLIENT_DATA_COMPRESSION_ENABLED: ConfigEntry[Boolean] =
+ buildConf("celeborn.client.flink.compression.enabled")
+ .withAlternative("remote-shuffle.job.enable-data-compression")
+ .categories("client")
+ .version("0.3.0")
+ .doc("Whether to compress data in Flink plugin.")
+ .booleanConf
+ .createWithDefault(true)
+
+ val CLIENT_RESULT_PARTITION_SUPPORT_FLOATING_BUFFER: ConfigEntry[Boolean] =
+ buildConf("celeborn.client.flink.resultPartition.supportFloatingBuffer")
+
.withAlternative("remote-shuffle.job.support-floating-buffer-per-output-gate")
+ .categories("client")
+ .version("0.3.0")
+ .doc("Whether to support floating buffer for result partitions.")
+ .booleanConf
+ .createWithDefault(true)
+
}
diff --git a/docs/configuration/client.md b/docs/configuration/client.md
index b4caf617f..b60fe2243 100644
--- a/docs/configuration/client.md
+++ b/docs/configuration/client.md
@@ -26,6 +26,14 @@ license: |
| celeborn.client.fetch.maxReqsInFlight | 3 | Amount of in-flight chunk fetch request. | 0.3.0 |
| celeborn.client.fetch.maxRetriesForEachReplica | 3 | Max retry times of fetch chunk on each replica | 0.3.0 |
| celeborn.client.fetch.timeout | 30s | Timeout for a task to fetch chunk. | 0.3.0 |
+| celeborn.client.flink.compression.enabled | true | Whether to compress data in Flink plugin. | 0.3.0 |
+| celeborn.client.flink.inputGate.concurrentReadings | 2147483647 | Max concurrent reading channels for a input gate. | 0.3.0 |
+| celeborn.client.flink.inputGate.memory | 32m | Memory reserved for a input gate. | 0.3.0 |
+| celeborn.client.flink.inputGate.minMemory | 8m | Min memory reserved for a input gate. | 0.3.0 |
+| celeborn.client.flink.inputGate.supportFloatingBuffer | true | Whether to support floating buffer in Flink input gates. | 0.3.0 |
+| celeborn.client.flink.resultPartition.memory | 64m | Memory reserved for a result partition. | 0.3.0 |
+| celeborn.client.flink.resultPartition.minMemory | 8m | Min memory reserved for a result partition. | 0.3.0 |
+| celeborn.client.flink.resultPartition.supportFloatingBuffer | true | Whether to support floating buffer for result partitions. | 0.3.0 |
| celeborn.client.push.blacklist.enabled | false | Whether to enable shuffle client-side push blacklist of workers. | 0.3.0 |
| celeborn.client.push.buffer.initial.size | 8k | | 0.3.0 |
| celeborn.client.push.buffer.max.size | 64k | Max size of reducer partition buffer memory for shuffle hash writer. The pushed data will be buffered in memory before sending to Celeborn worker. For performance consideration keep this buffer size higher than 32K. Example: If reducer amount is 2000, buffer size is 64K, then each task will consume up to `64KiB * 2000 = 125MiB` heap memory. | 0.3.0 |