This is an automated email from the ASF dual-hosted git repository.
xianjin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new d1a3bd14 [#647] fix: Multiple coordinators produce conflicts when they
detect the same file (#648)
d1a3bd14 is described below
commit d1a3bd14b3428385bc03fac79118afb85fa2be3f
Author: roryqi <[email protected]>
AuthorDate: Thu Feb 23 09:35:06 2023 +0800
[#647] fix: Multiple coordinators produce conflicts when they detect the
same file (#648)
### What changes were proposed in this pull request?
Add the coordinator's id to the name of the file which each Coordinator detects.
### Why are the changes needed?
Fix #647
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
UT
---
.../java/org/apache/uniffle/coordinator/CoordinatorServer.java | 3 +++
.../strategy/storage/AbstractSelectStorageStrategy.java | 8 ++++++++
.../strategy/storage/AppBalanceSelectStorageStrategy.java | 2 +-
.../strategy/storage/LowestIOSampleCostSelectStorageStrategy.java | 2 +-
.../org/apache/uniffle/coordinator/util/CoordinatorUtils.java | 2 ++
.../strategy/storage/AppBalanceSelectStorageStrategyTest.java | 4 ++++
.../storage/LowestIOSampleCostSelectStorageStrategyTest.java | 4 ++++
7 files changed, 23 insertions(+), 2 deletions(-)
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java
index 671055e7..a946062a 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java
@@ -40,6 +40,7 @@ import
org.apache.uniffle.coordinator.metric.CoordinatorGrpcMetrics;
import org.apache.uniffle.coordinator.metric.CoordinatorMetrics;
import org.apache.uniffle.coordinator.strategy.assignment.AssignmentStrategy;
import
org.apache.uniffle.coordinator.strategy.assignment.AssignmentStrategyFactory;
+import org.apache.uniffle.coordinator.util.CoordinatorUtils;
import static
org.apache.uniffle.common.config.RssBaseConf.RSS_SECURITY_HADOOP_KERBEROS_ENABLE;
import static
org.apache.uniffle.common.config.RssBaseConf.RSS_SECURITY_HADOOP_KERBEROS_KEYTAB_FILE;
@@ -161,6 +162,8 @@ public class CoordinatorServer extends ReconfigurableBase {
}
SecurityContextFactory.get().init(securityConfig);
+ coordinatorConf.setString(CoordinatorUtils.COORDINATOR_ID, id);
+
// load default hadoop configuration
Configuration hadoopConf = new Configuration();
ClusterManagerFactory clusterManagerFactory = new
ClusterManagerFactory(coordinatorConf, hadoopConf);
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/storage/AbstractSelectStorageStrategy.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/storage/AbstractSelectStorageStrategy.java
index a6564ce2..b4f7c699 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/storage/AbstractSelectStorageStrategy.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/storage/AbstractSelectStorageStrategy.java
@@ -19,6 +19,7 @@ package org.apache.uniffle.coordinator.strategy.storage;
import java.io.IOException;
import java.util.Map;
+import java.util.UUID;
import org.apache.commons.lang3.RandomUtils;
import org.apache.hadoop.fs.FSDataInputStream;
@@ -28,6 +29,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.coordinator.CoordinatorConf;
+import org.apache.uniffle.coordinator.util.CoordinatorUtils;
/**
* This is a simple implementation class, which provides some methods to check
whether the path is normal
@@ -38,12 +40,14 @@ public abstract class AbstractSelectStorageStrategy
implements SelectStorageStra
*/
protected final Map<String, RankValue> remoteStoragePathRankValue;
protected final int fileSize;
+ private final String coordinatorId;
public AbstractSelectStorageStrategy(
Map<String, RankValue> remoteStoragePathRankValue,
CoordinatorConf conf) {
this.remoteStoragePathRankValue = remoteStoragePathRankValue;
fileSize =
conf.getInteger(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SCHEDULE_FILE_SIZE);
+ this.coordinatorId = conf.getString(CoordinatorUtils.COORDINATOR_ID,
UUID.randomUUID().toString());
}
public void readAndWriteHdfsStorage(FileSystem fs, Path testPath,
@@ -71,4 +75,8 @@ public abstract class AbstractSelectStorageStrategy
implements SelectStorageStra
} while (readBytes != -1);
}
}
+
+ String getCoordinatorId() {
+ return coordinatorId;
+ }
}
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/storage/AppBalanceSelectStorageStrategy.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/storage/AppBalanceSelectStorageStrategy.java
index 94e421cf..d2932256 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/storage/AppBalanceSelectStorageStrategy.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/storage/AppBalanceSelectStorageStrategy.java
@@ -89,7 +89,7 @@ public class AppBalanceSelectStorageStrategy extends
AbstractSelectStorageStrate
RankValue rankValue = remoteStoragePathRankValue.get(uri.getKey());
rankValue.setHealthy(new AtomicBoolean(true));
Path remotePath = new Path(uri.getKey());
- String rssTest = uri.getKey() + "/rssTest";
+ String rssTest = uri.getKey() + "/rssTest-" + getCoordinatorId();
Path testPath = new Path(rssTest);
try {
FileSystem fs = HadoopFilesystemProvider.getFilesystem(remotePath,
hdfsConf);
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/storage/LowestIOSampleCostSelectStorageStrategy.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/storage/LowestIOSampleCostSelectStorageStrategy.java
index f3d9d61f..7414e4b2 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/storage/LowestIOSampleCostSelectStorageStrategy.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/storage/LowestIOSampleCostSelectStorageStrategy.java
@@ -102,7 +102,7 @@ public class LowestIOSampleCostSelectStorageStrategy
extends AbstractSelectStora
for (Map.Entry<String, RankValue> uri : uris) {
if
(uri.getKey().startsWith(ApplicationManager.getPathSchema().get(0))) {
Path remotePath = new Path(uri.getKey());
- String rssTest = uri.getKey() + "/rssTest";
+ String rssTest = uri.getKey() + "/rssTest-" + getCoordinatorId();
Path testPath = new Path(rssTest);
RankValue rankValue = remoteStoragePathRankValue.get(uri.getKey());
rankValue.setHealthy(new AtomicBoolean(true));
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/util/CoordinatorUtils.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/util/CoordinatorUtils.java
index c1247e30..ce18e805 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/util/CoordinatorUtils.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/util/CoordinatorUtils.java
@@ -38,6 +38,8 @@ public class CoordinatorUtils {
private static final Logger LOG =
LoggerFactory.getLogger(CoordinatorUtils.class);
+ public static final String COORDINATOR_ID = "coordinator.id";
+
public static GetShuffleAssignmentsResponse toGetShuffleAssignmentsResponse(
PartitionRangeAssignment pra) {
List<RssProtos.PartitionRangeAssignment> praList =
pra.convertToGrpcProto();
diff --git
a/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/storage/AppBalanceSelectStorageStrategyTest.java
b/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/storage/AppBalanceSelectStorageStrategyTest.java
index d5a8ac1d..e30ef2e6 100644
---
a/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/storage/AppBalanceSelectStorageStrategyTest.java
+++
b/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/storage/AppBalanceSelectStorageStrategyTest.java
@@ -30,6 +30,7 @@ import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.coordinator.ApplicationManager;
import org.apache.uniffle.coordinator.CoordinatorConf;
import org.apache.uniffle.coordinator.metric.CoordinatorMetrics;
+import org.apache.uniffle.coordinator.util.CoordinatorUtils;
import static
org.apache.uniffle.coordinator.ApplicationManager.StrategyName.APP_BALANCE;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -60,6 +61,7 @@ public class AppBalanceSelectStorageStrategyTest {
conf.set(CoordinatorConf.COORDINATOR_APP_EXPIRED, appExpiredTime);
conf.set(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SELECT_STRATEGY,
APP_BALANCE);
conf.setLong(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SCHEDULE_TIME,
1000);
+ conf.setString(CoordinatorUtils.COORDINATOR_ID, "TESTXXX");
applicationManager = new ApplicationManager(conf);
// to ensure that the reading and writing of hdfs can be controlled
applicationManager.closeDetectStorageScheduler();
@@ -174,5 +176,7 @@ public class AppBalanceSelectStorageStrategyTest {
assertEquals(0, applicationManager.getAvailableRemoteStorageInfo().size());
assertEquals(0, applicationManager.getRemoteStoragePathRankValue().size());
assertFalse(applicationManager.hasErrorInStatusCheck());
+ assertEquals("TESTXXX",
+
((AppBalanceSelectStorageStrategy)applicationManager.getSelectStorageStrategy()).getCoordinatorId());
}
}
diff --git
a/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/storage/LowestIOSampleCostSelectStorageStrategyTest.java
b/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/storage/LowestIOSampleCostSelectStorageStrategyTest.java
index 5dae0b4c..0a383b1c 100644
---
a/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/storage/LowestIOSampleCostSelectStorageStrategyTest.java
+++
b/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/storage/LowestIOSampleCostSelectStorageStrategyTest.java
@@ -30,6 +30,7 @@ import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.coordinator.ApplicationManager;
import org.apache.uniffle.coordinator.CoordinatorConf;
import org.apache.uniffle.coordinator.metric.CoordinatorMetrics;
+import org.apache.uniffle.coordinator.util.CoordinatorUtils;
import static
org.apache.uniffle.coordinator.ApplicationManager.StrategyName.IO_SAMPLE;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -63,6 +64,7 @@ public class LowestIOSampleCostSelectStorageStrategyTest {
conf.set(CoordinatorConf.COORDINATOR_APP_EXPIRED, appExpiredTime);
conf.setLong(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SCHEDULE_TIME,
1000);
conf.set(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SELECT_STRATEGY,
IO_SAMPLE);
+ conf.setString(CoordinatorUtils.COORDINATOR_ID, "TESTXXX");
applicationManager = new ApplicationManager(conf);
selectStorageStrategy = (LowestIOSampleCostSelectStorageStrategy)
applicationManager.getSelectStorageStrategy();
// to ensure that the reading and writing of hdfs can be controlled
@@ -191,5 +193,7 @@ public class LowestIOSampleCostSelectStorageStrategyTest {
assertEquals(0, applicationManager.getAvailableRemoteStorageInfo().size());
assertEquals(0, applicationManager.getRemoteStoragePathRankValue().size());
assertFalse(applicationManager.hasErrorInStatusCheck());
+ assertEquals("TESTXXX",
+
((LowestIOSampleCostSelectStorageStrategy)applicationManager.getSelectStorageStrategy()).getCoordinatorId());
}
}