This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new da9b818b915 [enhancement](mc) MaxCompute catalog: add network config (#44194)
da9b818b915 is described below
commit da9b818b9156e09b19db02b28521310e56704c58
Author: daidai <[email protected]>
AuthorDate: Fri Dec 6 10:33:19 2024 +0800
[enhancement](mc) MaxCompute catalog: add network config (#44194)
### What problem does this PR solve?
Network problems can occur when accessing MaxCompute. To provide a better
user experience, this PR introduces parameters that let users tune the
connection to their network conditions.
You can add these parameters when creating a catalog:
`mc.connect_timeout`: connection timeout in seconds; default is 10.
`mc.read_timeout`: read timeout in seconds; default is 120.
`mc.retry_count`: number of retries after a failure; default is 4.
All three values must be positive integers.
### Release note
The MaxCompute catalog adds three parameters, `mc.connect_timeout`,
`mc.read_timeout`, and `mc.retry_count`, to help users adapt to the
network between MaxCompute and Doris.
---
.../exec/format/table/max_compute_jni_reader.cpp | 6 ++-
.../doris/maxcompute/MaxComputeJniScanner.java | 47 +++++++++++++++---
.../maxcompute/MaxComputeExternalCatalog.java | 57 ++++++++++++++++++++++
.../maxcompute/source/MaxComputeScanNode.java | 13 +++++
.../property/constants/MCProperties.java | 8 +++
gensrc/thrift/PlanNodes.thrift | 4 ++
.../test_external_catalog_maxcompute.groovy | 5 +-
7 files changed, 131 insertions(+), 9 deletions(-)
diff --git a/be/src/vec/exec/format/table/max_compute_jni_reader.cpp
b/be/src/vec/exec/format/table/max_compute_jni_reader.cpp
index d1a71fd1a2f..665e19b6bce 100644
--- a/be/src/vec/exec/format/table/max_compute_jni_reader.cpp
+++ b/be/src/vec/exec/format/table/max_compute_jni_reader.cpp
@@ -77,7 +77,11 @@ MaxComputeJniReader::MaxComputeJniReader(const
MaxComputeTableDescriptor* mc_des
{"start_offset", std::to_string(_range.start_offset)},
{"split_size", std::to_string(_range.size)},
{"required_fields", required_fields.str()},
- {"columns_types", columns_types.str()}};
+ {"columns_types", columns_types.str()},
+
+ {"connect_timeout",
std::to_string(_max_compute_params.connect_timeout)},
+ {"read_timeout", std::to_string(_max_compute_params.read_timeout)},
+ {"retry_count", std::to_string(_max_compute_params.retry_times)}};
_jni_connector = std::make_unique<JniConnector>(
"org/apache/doris/maxcompute/MaxComputeJniScanner", params,
column_names);
}
diff --git
a/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java
b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java
index 6cbed70adc7..d6325bdae46 100644
---
a/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java
+++
b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java
@@ -25,6 +25,7 @@ import com.aliyun.odps.account.Account;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.table.configuration.CompressionCodec;
import com.aliyun.odps.table.configuration.ReaderOptions;
+import com.aliyun.odps.table.configuration.RestOptions;
import com.aliyun.odps.table.enviroment.Credentials;
import com.aliyun.odps.table.enviroment.EnvironmentSettings;
import com.aliyun.odps.table.read.SplitReader;
@@ -67,6 +68,10 @@ public class MaxComputeJniScanner extends JniScanner {
private static final String SCAN_SERIALIZER = "scan_serializer";
private static final String TIME_ZONE = "time_zone";
+ private static final String CONNECT_TIMEOUT = "connect_timeout";
+ private static final String READ_TIMEOUT = "read_timeout";
+ private static final String RETRY_COUNT = "retry_count";
+
private enum SplitType {
BYTE_SIZE,
ROW_OFFSET
@@ -136,16 +141,40 @@ public class MaxComputeJniScanner extends JniScanner {
Credentials credentials =
Credentials.newBuilder().withAccount(odps.getAccount())
.withAppAccount(odps.getAppAccount()).build();
+
+ int connectTimeout = 10; // 10s
+ if (!Strings.isNullOrEmpty(params.get(CONNECT_TIMEOUT))) {
+ connectTimeout = Integer.parseInt(params.get(CONNECT_TIMEOUT));
+ }
+
+ int readTimeout = 120; // 120s
+ if (!Strings.isNullOrEmpty(params.get(READ_TIMEOUT))) {
+ readTimeout = Integer.parseInt(params.get(READ_TIMEOUT));
+ }
+
+ int retryTimes = 4; // 4 times
+ if (!Strings.isNullOrEmpty(params.get(RETRY_COUNT))) {
+ retryTimes = Integer.parseInt(params.get(RETRY_COUNT));
+ }
+
+ RestOptions restOptions = RestOptions.newBuilder()
+ .withConnectTimeout(connectTimeout)
+ .withReadTimeout(readTimeout)
+ .withRetryTimes(retryTimes).build();
+
settings = EnvironmentSettings.newBuilder()
.withCredentials(credentials)
.withServiceEndpoint(odps.getEndpoint())
.withQuotaName(quota)
+ .withRestOptions(restOptions)
.build();
try {
scan = (TableBatchReadSession) deserialize(scanSerializer);
} catch (Exception e) {
- LOG.info("deserialize TableBatchReadSession failed.", e);
+ String errorMsg = "Failed to deserialize table batch read
session.";
+ LOG.warn(errorMsg, e);
+ throw new IllegalArgumentException(errorMsg, e);
}
}
@@ -176,11 +205,11 @@ public class MaxComputeJniScanner extends JniScanner {
.withReuseBatch(true)
.build());
- } catch (IOException e) {
- LOG.info("createArrowReader failed.", e);
} catch (Exception e) {
+ String errorMsg = "MaxComputeJniScanner Failed to open table batch
read session.";
+ LOG.warn(errorMsg, e);
close();
- throw new IOException(e.getMessage(), e);
+ throw new IOException(errorMsg, e);
}
}
@@ -215,8 +244,9 @@ public class MaxComputeJniScanner extends JniScanner {
break;
}
} catch (Exception e) {
- LOG.info("currentSplitReader hasNext fail", e);
- break;
+ String errorMsg = "MaxComputeJniScanner readVectors hasNext
fail";
+ LOG.warn(errorMsg, e);
+ throw new IOException(e.getMessage(), e);
}
try {
@@ -241,7 +271,10 @@ public class MaxComputeJniScanner extends JniScanner {
}
curReadRows += batchRows;
} catch (Exception e) {
- throw new RuntimeException("Fail to read arrow data, reason: "
+ e.getMessage(), e);
+ String errorMsg = String.format("MaxComputeJniScanner Fail to
read arrow data. "
+ + "curReadRows = {}, expectedRows = {}", curReadRows,
expectedRows);
+ LOG.warn(errorMsg, e);
+ throw new RuntimeException(errorMsg, e);
}
}
return curReadRows;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalCatalog.java
index e6cd77103db..06c1e55dcf6 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalCatalog.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalCatalog.java
@@ -33,6 +33,7 @@ import com.aliyun.odps.Project;
import com.aliyun.odps.account.Account;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.security.SecurityManager;
+import com.aliyun.odps.table.configuration.RestOptions;
import com.aliyun.odps.table.configuration.SplitOptions;
import com.aliyun.odps.table.enviroment.Credentials;
import com.aliyun.odps.table.enviroment.EnvironmentSettings;
@@ -71,6 +72,10 @@ public class MaxComputeExternalCatalog extends
ExternalCatalog {
private long splitRowCount;
private long splitByteSize;
+ private int connectTimeout;
+ private int readTimeout;
+ private int retryTimes;
+
private static final Map<String, ZoneId> REGION_ZONE_MAP;
private static final List<String> REQUIRED_PROPERTIES = ImmutableList.of(
MCProperties.PROJECT,
@@ -178,6 +183,17 @@ public class MaxComputeExternalCatalog extends
ExternalCatalog {
.build();
}
+ connectTimeout = Integer.parseInt(
+ props.getOrDefault(MCProperties.CONNECT_TIMEOUT,
MCProperties.DEFAULT_CONNECT_TIMEOUT));
+ readTimeout = Integer.parseInt(
+ props.getOrDefault(MCProperties.READ_TIMEOUT,
MCProperties.DEFAULT_READ_TIMEOUT));
+ retryTimes = Integer.parseInt(
+ props.getOrDefault(MCProperties.RETRY_COUNT,
MCProperties.DEFAULT_RETRY_COUNT));
+
+ RestOptions restOptions = RestOptions.newBuilder()
+ .withConnectTimeout(connectTimeout)
+ .withReadTimeout(readTimeout)
+ .withRetryTimes(retryTimes).build();
CloudCredential credential = MCProperties.getCredential(props);
accessKey = credential.getAccessKey();
@@ -196,6 +212,7 @@ public class MaxComputeExternalCatalog extends
ExternalCatalog {
.withCredentials(credentials)
.withServiceEndpoint(odps.getEndpoint())
.withQuotaName(quota)
+ .withRestOptions(restOptions)
.build();
}
@@ -304,6 +321,21 @@ public class MaxComputeExternalCatalog extends
ExternalCatalog {
return defaultProject;
}
+ public int getRetryTimes() {
+ makeSureInitialized();
+ return retryTimes;
+ }
+
+ public int getConnectTimeout() {
+ makeSureInitialized();
+ return connectTimeout;
+ }
+
+ public int getReadTimeout() {
+ makeSureInitialized();
+ return readTimeout;
+ }
+
public ZoneId getProjectDateTimeZone() {
makeSureInitialized();
@@ -385,6 +417,31 @@ public class MaxComputeExternalCatalog extends
ExternalCatalog {
+ MCProperties.SPLIT_ROW_COUNT + "must be an integer");
}
+
+ try {
+ connectTimeout = Integer.parseInt(
+ props.getOrDefault(MCProperties.CONNECT_TIMEOUT,
MCProperties.DEFAULT_CONNECT_TIMEOUT));
+ readTimeout = Integer.parseInt(
+ props.getOrDefault(MCProperties.READ_TIMEOUT,
MCProperties.DEFAULT_READ_TIMEOUT));
+ retryTimes = Integer.parseInt(
+ props.getOrDefault(MCProperties.RETRY_COUNT,
MCProperties.DEFAULT_RETRY_COUNT));
+ if (connectTimeout <= 0) {
+ throw new DdlException(MCProperties.CONNECT_TIMEOUT + " must
be greater than 0");
+ }
+
+ if (readTimeout <= 0) {
+ throw new DdlException(MCProperties.READ_TIMEOUT + " must be
greater than 0");
+ }
+
+ if (retryTimes <= 0) {
+ throw new DdlException(MCProperties.RETRY_COUNT + " must be
greater than 0");
+ }
+
+ } catch (NumberFormatException e) {
+ throw new DdlException("property " + MCProperties.CONNECT_TIMEOUT
+ "/"
+ + MCProperties.READ_TIMEOUT + "/" +
MCProperties.RETRY_COUNT + "must be an integer");
+ }
+
CloudCredential credential = MCProperties.getCredential(props);
if (!credential.isWhole()) {
throw new DdlException("Max-Compute credential properties '"
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNode.java
index e177e9d8b7c..4ad971a5c64 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNode.java
@@ -89,6 +89,10 @@ public class MaxComputeScanNode extends FileQueryScanNode {
private static final LocationPath ROW_OFFSET_PATH = new
LocationPath("/row_offset", Maps.newHashMap());
private static final LocationPath BYTE_SIZE_PATH = new
LocationPath("/byte_size", Maps.newHashMap());
+ private int connectTimeout;
+ private int readTimeout;
+ private int retryTimes;
+
@Setter
private SelectedPartitions selectedPartitions = null;
@@ -127,6 +131,11 @@ public class MaxComputeScanNode extends FileQueryScanNode {
fileDesc.setPartitionSpec("deprecated");
fileDesc.setTableBatchReadSession(maxComputeSplit.scanSerialize);
fileDesc.setSessionId(maxComputeSplit.getSessionId());
+
+ fileDesc.setReadTimeout(readTimeout);
+ fileDesc.setConnectTimeout(connectTimeout);
+ fileDesc.setRetryTimes(retryTimes);
+
tableFormatFileDesc.setMaxComputeParams(fileDesc);
rangeDesc.setTableFormatParams(tableFormatFileDesc);
rangeDesc.setPath("[ " + maxComputeSplit.getStart() + " , " +
maxComputeSplit.getLength() + " ]");
@@ -477,6 +486,10 @@ public class MaxComputeScanNode extends FileQueryScanNode {
MaxComputeExternalCatalog mcCatalog = (MaxComputeExternalCatalog)
table.getCatalog();
+ readTimeout = mcCatalog.getReadTimeout();
+ connectTimeout = mcCatalog.getConnectTimeout();
+ retryTimes = mcCatalog.getRetryTimes();
+
if
(mcCatalog.getSplitStrategy().equals(MCProperties.SPLIT_BY_BYTE_SIZE_STRATEGY))
{
for (com.aliyun.odps.table.read.split.InputSplit split :
assigner.getAllSplits()) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/MCProperties.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/MCProperties.java
index 20a77574fc7..efbd01c1477 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/MCProperties.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/MCProperties.java
@@ -56,6 +56,14 @@ public class MCProperties extends BaseProperties {
public static final String SPLIT_ROW_COUNT = "mc.split_row_count";
public static final String DEFAULT_SPLIT_ROW_COUNT = "1048576"; // 256 *
4096
+ public static final String CONNECT_TIMEOUT = "mc.connect_timeout";
+ public static final String READ_TIMEOUT = "mc.read_timeout";
+ public static final String RETRY_COUNT = "mc.retry_count";
+
+ public static final String DEFAULT_CONNECT_TIMEOUT = "10"; // 10s
+ public static final String DEFAULT_READ_TIMEOUT = "120"; // 120s
+ public static final String DEFAULT_RETRY_COUNT = "4"; // 4 times
+
public static CloudCredential getCredential(Map<String, String> props) {
return getCloudCredential(props, ACCESS_KEY, SECRET_KEY,
SESSION_TOKEN);
}
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 77d9c0ef672..bd8c43622d1 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -353,6 +353,10 @@ struct TMaxComputeFileDesc {
1: optional string partition_spec // deprecated
2: optional string session_id
3: optional string table_batch_read_session
+ // for mc network configuration
+ 4: optional i32 connect_timeout
+ 5: optional i32 read_timeout
+ 6: optional i32 retry_times
}
struct THudiFileDesc {
diff --git
a/regression-test/suites/external_table_p2/maxcompute/test_external_catalog_maxcompute.groovy
b/regression-test/suites/external_table_p2/maxcompute/test_external_catalog_maxcompute.groovy
index 3f0929b59ea..81133270fb6 100644
---
a/regression-test/suites/external_table_p2/maxcompute/test_external_catalog_maxcompute.groovy
+++
b/regression-test/suites/external_table_p2/maxcompute/test_external_catalog_maxcompute.groovy
@@ -388,7 +388,10 @@ suite("test_external_catalog_maxcompute",
"p2,external,maxcompute,external_remot
order_qt_multi_partition_q6 """ select max(pt), yy, mm from
multi_partitions where yy = '2023' and mm='08' group by yy, mm order by yy, mm;
"""
order_qt_multi_partition_q7 """ select count(*) from multi_partitions
where yy < '2023' or dd < '03'; """
order_qt_multi_partition_q8 """ select count(*) from multi_partitions
where pt>=3; """
- order_qt_multi_partition_q9 """ select
city,mnt,gender,finished_time,order_rate,cut_date,create_time,pt, yy, mm, dd
from multi_partitions where pt >= 2 and pt < 4 and finished_time is not null;
"""
+
+ //`finished_time is not null` => com.aliyun.odps.OdpsException:
ODPS-0010000:System internal error - fuxi job failed, caused by: timestamp_ntz
+ // order_qt_multi_partition_q9 """ select
city,mnt,gender,finished_time,order_rate,cut_date,create_time,pt, yy, mm, dd
from multi_partitions where pt >= 2 and pt < 4 and finished_time is not null;
"""
+
order_qt_multi_partition_q10 """ select pt, yy, mm, dd from
multi_partitions where pt >= 2 and create_time > '2023-08-03 03:11:00' order by
pt, yy, mm, dd; """
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]