This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 1b534553 [BUG][AQE][LocalOrder] Fix potential bug when merging
continuous segments (#318)
1b534553 is described below
commit 1b534553d49514080ed7e1149214382b1c9e4e41
Author: Junfan Zhang <[email protected]>
AuthorDate: Sun Nov 13 19:10:29 2022 +0800
[BUG][AQE][LocalOrder] Fix potential bug when merging continuous segments
(#318)
### What changes were proposed in this pull request?
1. Fix a potential bug when merging continuous segments
2. Log more details when discontinuous blocks are detected, to make debugging easier
### Why are the changes needed?
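In the current codebase, when the expected task id is 230 and the task ids in
the index file are (229, 230, 221, 229, 230), splitting fails with a
discontinuous-blocks exception. The unexpected, smaller task id 221 does not
close the pending segment, so the second block of task 230 is treated as
discontinuous.
This PR fixes that by also closing the pending segment when a smaller task id
that is not expected is encountered.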
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
1. Unit tests (new cases 4 and 5 in LocalOrderSegmentSplitterTest)
---
.../common/segment/LocalOrderSegmentSplitter.java | 26 ++++++++++++++-
.../segment/LocalOrderSegmentSplitterTest.java | 39 ++++++++++++++++++++++
2 files changed, 64 insertions(+), 1 deletion(-)
diff --git
a/common/src/main/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitter.java
b/common/src/main/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitter.java
index 0a4a669d..fd5f9091 100644
---
a/common/src/main/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitter.java
+++
b/common/src/main/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitter.java
@@ -19,10 +19,14 @@ package org.apache.uniffle.common.segment;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.List;
import com.google.common.collect.Lists;
+import org.roaringbitmap.longlong.LongIterator;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.uniffle.common.BufferSegment;
import org.apache.uniffle.common.ShuffleDataSegment;
@@ -43,6 +47,7 @@ import org.apache.uniffle.common.exception.RssException;
* the shuffle server.
*/
public class LocalOrderSegmentSplitter implements SegmentSplitter {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(LocalOrderSegmentSplitter.class);
private Roaring64NavigableMap expectTaskIds;
private int readBufferSize;
@@ -72,6 +77,8 @@ public class LocalOrderSegmentSplitter implements
SegmentSplitter {
long lastTaskAttemptId = -1;
long lastExpectedBlockIndex = -1;
+ List<Long> indexTaskIds = new ArrayList<>();
+
/**
* One ShuffleDataSegment should meet following requirements:
*
@@ -90,6 +97,8 @@ public class LocalOrderSegmentSplitter implements
SegmentSplitter {
long blockId = byteBuffer.getLong();
long taskAttemptId = byteBuffer.getLong();
+ indexTaskIds.add(taskAttemptId);
+
if (lastTaskAttemptId == -1) {
lastTaskAttemptId = taskAttemptId;
}
@@ -100,7 +109,9 @@ public class LocalOrderSegmentSplitter implements
SegmentSplitter {
break;
}
- if ((taskAttemptId < lastTaskAttemptId && bufferSegments.size() > 0 &&
index - lastExpectedBlockIndex != 1)
+ if ((taskAttemptId < lastTaskAttemptId
+ && bufferSegments.size() > 0
+ && (expectTaskIds.contains(taskAttemptId) ? index -
lastExpectedBlockIndex != 1 : true))
|| bufferOffset >= readBufferSize) {
ShuffleDataSegment sds = new ShuffleDataSegment(fileOffset,
bufferOffset, bufferSegments);
dataFileSegments.add(sds);
@@ -111,6 +122,10 @@ public class LocalOrderSegmentSplitter implements
SegmentSplitter {
if (expectTaskIds.contains(taskAttemptId)) {
if (bufferOffset != 0 && index - lastExpectedBlockIndex > 1) {
+ List<Long> expectedTaskIds = getExpectedTaskIds(expectTaskIds);
+ LOGGER.error("There are discontinuous blocks, all task ids in
index file: {}, all expected task ids: {}, "
+ + "current expected task id: {}, last unexpected task id:
{}, current data segment size: {}",
+ indexTaskIds, expectedTaskIds, taskAttemptId,
lastTaskAttemptId, dataFileSegments.size());
throw new RssException("There are discontinuous blocks which
should not happen when using LOCAL_ORDER.");
}
@@ -135,4 +150,13 @@ public class LocalOrderSegmentSplitter implements
SegmentSplitter {
}
return dataFileSegments;
}
+
+  private List<Long> getExpectedTaskIds(Roaring64NavigableMap expectTaskIds) {
+    // Copies every id in the bitmap into a List so it can be printed by the discontinuous-blocks error log.
+    List<Long> collected = new ArrayList<>();
+    for (LongIterator ids = expectTaskIds.getLongIterator(); ids.hasNext(); ) {
+      collected.add(ids.next());
+    }
+    return collected;
+  }
}
diff --git
a/common/src/test/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitterTest.java
b/common/src/test/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitterTest.java
index 4df956d6..77b3e2cf 100644
---
a/common/src/test/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitterTest.java
+++
b/common/src/test/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitterTest.java
@@ -196,6 +196,45 @@ public class LocalOrderSegmentSplitterTest {
assertEquals(74, dataSegments.get(1).getOffset());
assertEquals(6, dataSegments.get(1).getLength());
+
+ /**
+ * case4
+ */
+ data = generateData(
+ Pair.of(16, 229),
+ Pair.of(16, 230),
+ Pair.of(16, 221),
+ Pair.of(16, 229),
+ Pair.of(16, 230)
+ );
+ taskIds = Roaring64NavigableMap.bitmapOf(230);
+ dataSegments = new LocalOrderSegmentSplitter(taskIds, 10000).split(new
ShuffleIndexResult(data, -1));
+ assertEquals(2, dataSegments.size());
+ assertEquals(16, dataSegments.get(0).getOffset());
+ assertEquals(16, dataSegments.get(0).getLength());
+ assertEquals(64, dataSegments.get(1).getOffset());
+ assertEquals(16, dataSegments.get(1).getLength());
+
+ /**
+ * case5
+ */
+ data = generateData(
+ Pair.of(1, 2),
+ Pair.of(1, 3),
+ Pair.of(1, 4),
+ Pair.of(1, 5),
+ Pair.of(1, 6),
+ Pair.of(1, 4),
+ Pair.of(1, 5),
+ Pair.of(1, 6)
+ );
+ taskIds = Roaring64NavigableMap.bitmapOf(2, 3, 4);
+ dataSegments = new LocalOrderSegmentSplitter(taskIds, 10000).split(new
ShuffleIndexResult(data, -1));
+ assertEquals(2, dataSegments.size());
+ assertEquals(0, dataSegments.get(0).getOffset());
+ assertEquals(3, dataSegments.get(0).getLength());
+ assertEquals(5, dataSegments.get(1).getOffset());
+ assertEquals(1, dataSegments.get(1).getLength());
}
public static byte[] generateData(Pair<Integer, Integer>... configEntries) {