This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new fb3b0afb85d [opt](task-assignment) use consistent hash as default task
assigner and cache the consistent hash ring (#28522)
fb3b0afb85d is described below
commit fb3b0afb85dccc9ee7964d16ecba9b01d09586b8
Author: Mingyu Chen <[email protected]>
AuthorDate: Tue Dec 19 22:29:35 2023 +0800
[opt](task-assignment) use consistent hash as default task assigner and
cache the consistent hash ring (#28522)
1. Use the consistent hash algorithm as the default assigner for the file query scan node.
A consistent assignment makes better use of each BE node's page cache, because the same scan range is assigned to the same BE across different queries.
2. Cache the consistent hash ring.
Initializing a consistent hash ring is time-consuming because thousands
of virtual nodes need to be added, so the ring is cached for better performance.
---
.../planner/external/FederationBackendPolicy.java | 63 ++++++++++++++++++++--
.../doris/planner/external/FileQueryScanNode.java | 11 ++--
.../doris/planner/FederationBackendPolicyTest.java | 45 +++++++++++++++-
3 files changed, 107 insertions(+), 12 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FederationBackendPolicy.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FederationBackendPolicy.java
index 9e23463235f..d1d7e90d35a 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FederationBackendPolicy.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FederationBackendPolicy.java
@@ -30,6 +30,9 @@ import org.apache.doris.thrift.TFileRangeDesc;
import org.apache.doris.thrift.TScanRangeLocations;
import com.google.common.base.Preconditions;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
@@ -46,7 +49,9 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
+import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
public class FederationBackendPolicy {
@@ -59,6 +64,53 @@ public class FederationBackendPolicy {
private int nextBe = 0;
private boolean initialized = false;
+ // Building a ConsistentHash ring is expensive (thousands of virtual nodes may
+ // be added), so rings are cached here and shared by all FederationBackendPolicy
+ // instances, keyed by the sorted set of backend ids (see HashCacheKey).
+ // NOTE(review): this field is assigned exactly once, in the static initializer;
+ // it could be declared `final`.
+ private static LoadingCache<HashCacheKey,
ConsistentHash<TScanRangeLocations, Backend>> consistentHashCache;
+
+ // Initializes the shared ring cache. It is bounded to 5 entries, i.e. at most
+ // 5 distinct backend-id sets keep a prebuilt ring at any time.
+ static {
+ consistentHashCache = CacheBuilder.newBuilder().maximumSize(5)
+ .build(new CacheLoader<HashCacheKey,
ConsistentHash<TScanRangeLocations, Backend>>() {
+ // Called on a cache miss. The ring is built from key.bes, the Backend
+ // objects carried by the key that caused the miss. Later hits for the
+ // same id set reuse this ring and therefore those same Backend objects.
+ @Override
+ public ConsistentHash<TScanRangeLocations, Backend>
load(HashCacheKey key) {
+ return new ConsistentHash<>(Hashing.murmur3_128(), new
ScanRangeHash(),
+ new BackendHash(), key.bes,
Config.virtual_node_number);
+ }
+ });
+ }
+
+ // Key for consistentHashCache. Two keys are equal when they hold the same
+ // backend ids. The ids are sorted, so the order of the input list does not matter.
+ // NOTE(review): `bes` is excluded from equals/hashCode, so a cache hit returns a
+ // ring built from the Backend objects of whichever caller first loaded this id
+ // set. If a Backend with an unchanged id can change (host, port, liveness, ...),
+ // the cached ring keeps handing out the old instances. Confirm whether the
+ // cache should be invalidated when backend metadata changes.
+ private static class HashCacheKey {
+ // sorted backend ids; the only state used by equals/hashCode
+ private List<Long> beIds;
+ // backends are not part of the key; they are carried along only so the
+ // cache loader can build the ring from them
+ private List<Backend> bes;
+
+ // Keeps a reference to `backends` (no copy) and derives the sorted id list.
+ HashCacheKey(List<Backend> backends) {
+ this.bes = backends;
+ this.beIds = backends.stream().map(b ->
b.getId()).sorted().collect(Collectors.toList());
+ }
+
+ // Equality is defined by the sorted backend ids only.
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (!(obj instanceof HashCacheKey)) {
+ return false;
+ }
+ return Objects.equals(beIds, ((HashCacheKey) obj).beIds);
+ }
+
+ // Consistent with equals: hashes the sorted backend ids only.
+ @Override
+ public int hashCode() {
+ return Objects.hash(beIds);
+ }
+
+ // NOTE(review): the label reads HashCache rather than the class name HashCacheKey.
+ @Override
+ public String toString() {
+ return "HashCache{" + "beIds=" + beIds + '}';
+ }
+ }
+
public void init() throws UserException {
if (!initialized) {
init(Collections.emptyList());
@@ -96,8 +148,11 @@ public class FederationBackendPolicy {
throw new UserException("No available backends");
}
backendMap.putAll(backends.stream().collect(Collectors.groupingBy(Backend::getHost)));
- consistentHash = new ConsistentHash<>(Hashing.murmur3_128(), new
ScanRangeHash(),
- new BackendHash(), backends, Config.virtual_node_number);
+ try {
+ consistentHash = consistentHashCache.get(new
HashCacheKey(backends));
+ } catch (ExecutionException e) {
+ throw new UserException("failed to get consistent hash", e);
+ }
}
public Backend getNextBe() {
@@ -111,7 +166,7 @@ public class FederationBackendPolicy {
}
// Try to find a local BE, if not exists, use `getNextBe` instead
- public Backend getNextLocalBe(List<String> hosts) {
+ public Backend getNextLocalBe(List<String> hosts, TScanRangeLocations
scanRangeLocations) {
List<Backend> candidateBackends =
Lists.newArrayListWithCapacity(hosts.size());
for (String host : hosts) {
List<Backend> backends = backendMap.get(host);
@@ -121,7 +176,7 @@ public class FederationBackendPolicy {
}
return CollectionUtils.isEmpty(candidateBackends)
- ? getNextBe()
+ ? getNextConsistentBe(scanRangeLocations)
:
candidateBackends.get(random.nextInt(candidateBackends.size()));
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
index ada9f1fda61..658ac49659b 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
@@ -311,7 +311,6 @@ public abstract class FileQueryScanNode extends
FileScanNode {
params.setProperties(locationProperties);
}
- boolean enableSqlCache =
ConnectContext.get().getSessionVariable().enableFileCache;
boolean enableShortCircuitRead =
HdfsResource.enableShortCircuitRead(locationProperties);
List<String> pathPartitionKeys = getPathPartitionKeys();
for (Split split : inputSplits) {
@@ -369,14 +368,12 @@ public abstract class FileQueryScanNode extends
FileScanNode {
curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
TScanRangeLocation location = new TScanRangeLocation();
Backend selectedBackend;
- if (enableSqlCache) {
- // Use consistent hash to assign the same scan range into the
same backend among different queries
- selectedBackend =
backendPolicy.getNextConsistentBe(curLocations);
- } else if (enableShortCircuitRead) {
+ if (enableShortCircuitRead) {
// Try to find a local BE if enable hdfs short circuit read
- selectedBackend =
backendPolicy.getNextLocalBe(Arrays.asList(fileSplit.getHosts()));
+ selectedBackend =
backendPolicy.getNextLocalBe(Arrays.asList(fileSplit.getHosts()), curLocations);
} else {
- selectedBackend = backendPolicy.getNextBe();
+ // Use consistent hash to assign the same scan range into the
same backend among different queries
+ selectedBackend =
backendPolicy.getNextConsistentBe(curLocations);
}
setLocationPropertiesIfNecessary(selectedBackend, locationType,
locationProperties);
location.setBackendId(selectedBackend.getId());
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java
b/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java
index 05a7f6985df..ef65d1b6165 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java
@@ -22,6 +22,11 @@ import org.apache.doris.common.UserException;
import org.apache.doris.planner.external.FederationBackendPolicy;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
+import org.apache.doris.thrift.TExternalScanRange;
+import org.apache.doris.thrift.TFileRangeDesc;
+import org.apache.doris.thrift.TFileScanRange;
+import org.apache.doris.thrift.TScanRange;
+import org.apache.doris.thrift.TScanRangeLocations;
import com.google.common.base.Stopwatch;
import mockit.Mock;
@@ -93,12 +98,50 @@ public class FederationBackendPolicyTest {
int invokeTimes = 1000000;
Assertions.assertEquals(policy.numBackends(), backendNum);
List<String> localHosts = Arrays.asList("192.168.1.0", "192.168.1.1",
"192.168.1.2");
+ TScanRangeLocations scanRangeLocations =
getScanRangeLocations("path1", 0, 100);
Stopwatch sw = Stopwatch.createStarted();
for (int i = 0; i < invokeTimes; i++) {
-
Assertions.assertTrue(localHosts.contains(policy.getNextLocalBe(localHosts).getHost()));
+
Assertions.assertTrue(localHosts.contains(policy.getNextLocalBe(localHosts,
scanRangeLocations).getHost()));
}
sw.stop();
System.out.println("Invoke getNextLocalBe() " + invokeTimes
+ " times cost [" + sw.elapsed(TimeUnit.MILLISECONDS) + "]
ms");
}
+
+ // Checks that consistent-hash assignment is deterministic: a fixed scan range
+ // always maps to the same backend id.
+ // NOTE(review): the expected ids (39, 78) are pinned to the current ring setup
+ // (murmur3_128, Config.virtual_node_number, and the 200-backend set prepared
+ // for this test). They must be updated if any of those change.
+ @Test
+ public void testConsistentHash() throws UserException {
+ FederationBackendPolicy policy = new FederationBackendPolicy();
+ policy.init();
+ int backendNum = 200;
+ // NOTE(review): JUnit's order is (expected, actual); the arguments here are
+ // swapped, which only affects the failure message.
+ Assertions.assertEquals(policy.numBackends(), backendNum);
+
+ TScanRangeLocations scanRangeLocations =
getScanRangeLocations("path1", 0, 100);
+ Assertions.assertEquals(39,
policy.getNextConsistentBe(scanRangeLocations).getId());
+
+ // A different path must hash to its own (also stable) backend.
+ scanRangeLocations = getScanRangeLocations("path2", 0, 100);
+ Assertions.assertEquals(78,
policy.getNextConsistentBe(scanRangeLocations).getId());
+ }
+
+ // Builds a TScanRangeLocations that wraps one file scan range containing a
+ // single range desc for (path, startOffset, size). No backend locations are set.
+ private TScanRangeLocations getScanRangeLocations(String path, long
startOffset, long size) {
+ // Build one file scan range
+ TFileScanRange fileScanRange = new TFileScanRange();
+ // Wrap it as an external scan range inside a generic scan range
+ TExternalScanRange externalScanRange = new TExternalScanRange();
+ externalScanRange.setFileScanRange(fileScanRange);
+ TScanRange scanRange = new TScanRange();
+ scanRange.setExtScanRange(externalScanRange);
+
scanRange.getExtScanRange().getFileScanRange().addToRanges(createRangeDesc(path,
startOffset, size));
+ // Locations container holding the scan range
+ TScanRangeLocations locations = new TScanRangeLocations();
+ locations.setScanRange(scanRange);
+ return locations;
+ }
+
+ // Creates a TFileRangeDesc with only path, start offset and size populated.
+ private TFileRangeDesc createRangeDesc(String path, long startOffset, long
size) {
+ TFileRangeDesc rangeDesc = new TFileRangeDesc();
+ rangeDesc.setPath(path);
+ rangeDesc.setStartOffset(startOffset);
+ rangeDesc.setSize(size);
+ return rangeDesc;
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]