This is an automated email from the ASF dual-hosted git repository.
cgivre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new ccc9b1a29d DRILL-8482: Assigning regions throws an exception when some regions
are deployed on affinity nodes and some on non-affinity nodes (#2885)
ccc9b1a29d is described below
commit ccc9b1a29d4e15e721382073f90b831777215ae7
Author: shfshihuafeng <[email protected]>
AuthorDate: Sun Mar 3 08:30:50 2024 +0800
DRILL-8482: Assigning regions throws an exception when some regions are deployed on
affinity nodes and some on non-affinity nodes (#2885)
---
.../drill/exec/store/hbase/HBaseGroupScan.java | 2 +-
.../hbase/TestHBaseRegionScanAssignments.java | 28 ++++++++++++++++++++++
2 files changed, 29 insertions(+), 1 deletion(-)
diff --git
a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
index 1ad879f7eb..cec24bf0ad 100644
---
a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
+++
b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
@@ -267,7 +267,7 @@ public class HBaseGroupScan extends AbstractGroupScan
implements DrillHBaseConst
PriorityQueue<List<HBaseSubScanSpec>> minHeap = new
PriorityQueue<>(numSlots, LIST_SIZE_COMPARATOR);
PriorityQueue<List<HBaseSubScanSpec>> maxHeap = new
PriorityQueue<>(numSlots, LIST_SIZE_COMPARATOR_REV);
for(List<HBaseSubScanSpec> listOfScan : endpointFragmentMapping.values()) {
- if (listOfScan.size() < minPerEndpointSlot) {
+ if (listOfScan.size() < maxPerEndpointSlot) {
minHeap.offer(listOfScan);
} else if (listOfScan.size() > minPerEndpointSlot) {
maxHeap.offer(listOfScan);
diff --git
a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseRegionScanAssignments.java
b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseRegionScanAssignments.java
index 216eb9104c..877bf581c0 100644
---
a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseRegionScanAssignments.java
+++
b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseRegionScanAssignments.java
@@ -188,6 +188,34 @@ public class TestHBaseRegionScanAssignments extends
BaseHBaseTest {
testParallelizationWidth(scan, endpoints.size());
}
+ @Test
+ public void testHBaseGroupScanAssignmentSomeAfinedAndSomeWithOrphans()
throws Exception {
+ NavigableMap<HRegionInfo,ServerName> regionsToScan = Maps.newTreeMap();
+ regionsToScan.put(new HRegionInfo(TABLE_NAME, splits[0], splits[1]),
SERVER_A);
+ regionsToScan.put(new HRegionInfo(TABLE_NAME, splits[1], splits[2]),
SERVER_A);
+ regionsToScan.put(new HRegionInfo(TABLE_NAME, splits[2], splits[3]),
SERVER_B);
+ regionsToScan.put(new HRegionInfo(TABLE_NAME, splits[3], splits[4]),
SERVER_B);
+ regionsToScan.put(new HRegionInfo(TABLE_NAME, splits[6], splits[7]),
SERVER_D);
+ regionsToScan.put(new HRegionInfo(TABLE_NAME, splits[7], splits[8]),
SERVER_D);
+ regionsToScan.put(new HRegionInfo(TABLE_NAME, splits[8], splits[9]),
SERVER_D);
+ regionsToScan.put(new HRegionInfo(TABLE_NAME, splits[9], splits[10]),
SERVER_D);
+ final List<DrillbitEndpoint> endpoints = Lists.newArrayList();
+
endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_A).setControlPort(1234).build());
+
endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_B).setControlPort(1234).build());
+
endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_C).setControlPort(1234).build());
+
+ HBaseGroupScan scan = new HBaseGroupScan();
+ scan.setRegionsToScan(regionsToScan);
+ scan.setHBaseScanSpec(new HBaseScanSpec(TABLE_NAME_STR, splits[0],
splits[0], null));
+ scan.applyAssignments(endpoints);
+
+ int i = 0;
+ assertEquals(3, scan.getSpecificScan(i++).getRegionScanSpecList().size());
// 'A'
+ assertEquals(2, scan.getSpecificScan(i++).getRegionScanSpecList().size());
// 'B'
+ assertEquals(3, scan.getSpecificScan(i++).getRegionScanSpecList().size());
// 'C'
+ testParallelizationWidth(scan, i);
+ }
+
@Test
public void testHBaseGroupScanAssignmentOneEach() throws Exception {
NavigableMap<HRegionInfo,ServerName> regionsToScan = Maps.newTreeMap();