This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 140ab60a74 [Enhancement](multi-catalog) add a BE selection strategy
for hdfs short-circuit-read. (#22697)
140ab60a74 is described below
commit 140ab60a749614c739851ad23391c25d3c277378
Author: Xiangyu Wang <[email protected]>
AuthorDate: Tue Aug 15 15:34:39 2023 +0800
[Enhancement](multi-catalog) add a BE selection strategy for hdfs
short-circuit-read. (#22697)
BEs are sometimes deployed on the same nodes as HDFS DataNodes. In that case
we can use a BE selection policy that prefers a BE local to the data, so that
HDFS short-circuit read is used as much as possible.
---
.../org/apache/doris/catalog/HdfsResource.java | 7 +-
.../planner/external/FederationBackendPolicy.java | 28 +++++-
.../doris/planner/external/FileQueryScanNode.java | 18 +++-
.../doris/planner/FederationBackendPolicyTest.java | 104 +++++++++++++++++++++
4 files changed, 151 insertions(+), 6 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/HdfsResource.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/HdfsResource.java
index 9735f2f059..1f87f93e9d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/HdfsResource.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/HdfsResource.java
@@ -75,7 +75,7 @@ public class HdfsResource extends Resource {
protected void setProperties(Map<String, String> properties) throws
DdlException {
// `dfs.client.read.shortcircuit` and `dfs.domain.socket.path` should
be both set to enable short circuit read.
// We should disable short circuit read if they are not both set
because it will cause performance down.
- if (!properties.containsKey(HADOOP_SHORT_CIRCUIT) ||
!properties.containsKey(HADOOP_SOCKET_PATH)) {
+ if (!(enableShortCircuitRead(properties))) {
properties.put(HADOOP_SHORT_CIRCUIT, "false");
}
this.properties = properties;
@@ -94,6 +94,11 @@ public class HdfsResource extends Resource {
}
}
+ public static boolean enableShortCircuitRead(Map<String, String>
properties) {
+ return
"true".equalsIgnoreCase(properties.getOrDefault(HADOOP_SHORT_CIRCUIT, "false"))
+ && properties.containsKey(HADOOP_SOCKET_PATH);
+ }
+
// Will be removed after BE unified storage params
public static THdfsParams generateHdfsParam(Map<String, String>
properties) {
THdfsParams tHdfsParams = new THdfsParams();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FederationBackendPolicy.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FederationBackendPolicy.java
index daa1b151ea..eb2545affd 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FederationBackendPolicy.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FederationBackendPolicy.java
@@ -31,21 +31,29 @@ import org.apache.doris.thrift.TScanRangeLocations;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.hash.Funnel;
import com.google.common.hash.Hashing;
import com.google.common.hash.PrimitiveSink;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.nio.charset.StandardCharsets;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
+import java.util.Random;
import java.util.Set;
+import java.util.stream.Collectors;
public class FederationBackendPolicy {
private static final Logger LOG =
LogManager.getLogger(FederationBackendPolicy.class);
private final List<Backend> backends = Lists.newArrayList();
+ private final Map<String, List<Backend>> backendMap = Maps.newHashMap();
+ private final Random random = new Random(System.currentTimeMillis());
private ConsistentHash<TScanRangeLocations, Backend> consistentHash;
private int nextBe = 0;
@@ -87,6 +95,7 @@ public class FederationBackendPolicy {
if (backends.isEmpty()) {
throw new UserException("No available backends");
}
+
backendMap.putAll(backends.stream().collect(Collectors.groupingBy(Backend::getHost)));
int virtualNumber = Math.max(Math.min(512 / backends.size(), 32), 2);
consistentHash = new ConsistentHash<>(Hashing.murmur3_128(), new
ScanRangeHash(),
new BackendHash(), backends, virtualNumber);
@@ -102,12 +111,27 @@ public class FederationBackendPolicy {
return consistentHash.getNode(scanRangeLocations);
}
+ // Try to find a local BE, if not exists, use `getNextBe` instead
+ public Backend getNextLocalBe(List<String> hosts) {
+ List<Backend> candidateBackends =
Lists.newArrayListWithCapacity(hosts.size());
+ for (String host : hosts) {
+ List<Backend> backends = backendMap.get(host);
+ if (CollectionUtils.isNotEmpty(backends)) {
+
candidateBackends.add(backends.get(random.nextInt(backends.size())));
+ }
+ }
+
+ return CollectionUtils.isEmpty(candidateBackends)
+ ? getNextBe()
+ :
candidateBackends.get(random.nextInt(candidateBackends.size()));
+ }
+
public int numBackends() {
return backends.size();
}
- public List<Backend> getBackends() {
- return backends;
+ public Collection<Backend> getBackends() {
+ return CollectionUtils.unmodifiableCollection(backends);
}
private static class BackendHash implements Funnel<Backend> {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
index db8f835abb..adbbbfaeaf 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
@@ -74,6 +74,7 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -271,6 +272,8 @@ public abstract class FileQueryScanNode extends
FileScanNode {
params.setProperties(locationProperties);
}
+ boolean enableSqlCache =
ConnectContext.get().getSessionVariable().enableFileCache;
+ boolean enableShortCircuitRead =
HdfsResource.enableShortCircuitRead(locationProperties);
List<String> pathPartitionKeys = getPathPartitionKeys();
for (Split split : inputSplits) {
FileSplit fileSplit = (FileSplit) split;
@@ -312,6 +315,7 @@ public abstract class FileQueryScanNode extends
FileScanNode {
tableFormatFileDesc.setTransactionalHiveParams(transactionalHiveDesc);
rangeDesc.setTableFormatParams(tableFormatFileDesc);
}
+
// external data lake table
if (fileSplit instanceof IcebergSplit) {
// TODO: extract all data lake split to factory
@@ -322,11 +326,19 @@ public abstract class FileQueryScanNode extends
FileScanNode {
HudiScanNode.setHudiParams(rangeDesc, (HudiSplit) fileSplit);
}
+ Backend selectedBackend;
+ if (enableSqlCache) {
+ // Use consistent hash to assign the same scan range into the
same backend among different queries
+ selectedBackend =
backendPolicy.getNextConsistentBe(curLocations);
+ } else if (enableShortCircuitRead) {
+ // Try to find a local BE if enable hdfs short circuit read
+ selectedBackend =
backendPolicy.getNextLocalBe(Arrays.asList(fileSplit.getHosts()));
+ } else {
+ selectedBackend = backendPolicy.getNextBe();
+ }
+
curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
TScanRangeLocation location = new TScanRangeLocation();
- // Use consistent hash to assign the same scan range into the same
backend among different queries
- Backend selectedBackend =
ConnectContext.get().getSessionVariable().enableFileCache
- ? backendPolicy.getNextConsistentBe(curLocations) :
backendPolicy.getNextBe();
location.setBackendId(selectedBackend.getId());
location.setServer(new TNetworkAddress(selectedBackend.getHost(),
selectedBackend.getBePort()));
curLocations.addToLocations(location);
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java
b/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java
new file mode 100644
index 0000000000..05a7f6985d
--- /dev/null
+++
b/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java
@@ -0,0 +1,104 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.UserException;
+import org.apache.doris.planner.external.FederationBackendPolicy;
+import org.apache.doris.system.Backend;
+import org.apache.doris.system.SystemInfoService;
+
+import com.google.common.base.Stopwatch;
+import mockit.Mock;
+import mockit.MockUp;
+import mockit.Mocked;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+public class FederationBackendPolicyTest {
+ @Mocked
+ private Env env;
+
+ @Before
+ public void setUp() {
+
+ SystemInfoService service = new SystemInfoService();
+
+ for (int i = 0; i < 190; i++) {
+ Backend backend = new Backend(Long.valueOf(i), "192.168.1." + i,
9050);
+ backend.setAlive(true);
+ service.addBackend(backend);
+ }
+ for (int i = 0; i < 10; i++) {
+ Backend backend = new Backend(Long.valueOf(190 + i), "192.168.1."
+ i, 9051);
+ backend.setAlive(true);
+ service.addBackend(backend);
+ }
+ for (int i = 0; i < 10; i++) {
+ Backend backend = new Backend(Long.valueOf(200 + i), "192.168.2."
+ i, 9050);
+ backend.setAlive(false);
+ service.addBackend(backend);
+ }
+
+ new MockUp<Env>() {
+ @Mock
+ public SystemInfoService getCurrentSystemInfo() {
+ return service;
+ }
+ };
+
+ }
+
+ @Test
+ public void testGetNextBe() throws UserException {
+ FederationBackendPolicy policy = new FederationBackendPolicy();
+ policy.init();
+ int backendNum = 200;
+ int invokeTimes = 1000000;
+ Assertions.assertEquals(policy.numBackends(), backendNum);
+ Stopwatch sw = Stopwatch.createStarted();
+ for (int i = 0; i < invokeTimes; i++) {
+
Assertions.assertFalse(policy.getNextBe().getHost().contains("192.168.2."));
+ }
+ sw.stop();
+ System.out.println("Invoke getNextBe() " + invokeTimes
+ + " times cost [" + sw.elapsed(TimeUnit.MILLISECONDS) + "]
ms");
+ }
+
+ @Test
+ public void testGetNextLocalBe() throws UserException {
+ FederationBackendPolicy policy = new FederationBackendPolicy();
+ policy.init();
+ int backendNum = 200;
+ int invokeTimes = 1000000;
+ Assertions.assertEquals(policy.numBackends(), backendNum);
+ List<String> localHosts = Arrays.asList("192.168.1.0", "192.168.1.1",
"192.168.1.2");
+ Stopwatch sw = Stopwatch.createStarted();
+ for (int i = 0; i < invokeTimes; i++) {
+
Assertions.assertTrue(localHosts.contains(policy.getNextLocalBe(localHosts).getHost()));
+ }
+ sw.stop();
+ System.out.println("Invoke getNextLocalBe() " + invokeTimes
+ + " times cost [" + sw.elapsed(TimeUnit.MILLISECONDS) + "]
ms");
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]