This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new da9b818b915 [enhancement](mc) mc catalog: append network config (#44194)
da9b818b915 is described below

commit da9b818b9156e09b19db02b28521310e56704c58
Author: daidai <changyu...@selectdb.com>
AuthorDate: Fri Dec 6 10:33:19 2024 +0800

    [enhancement](mc) mc catalog: append network config (#44194)
    
    ### What problem does this PR solve?
    
    Accessing MaxCompute can run into network problems. To provide a better
    user experience, this PR introduces several parameters that help users
    adapt the MaxCompute client to their network.
    
    You can set these parameters when creating a catalog:
    `mc.connect_timeout`: connection timeout in seconds, default 10.
    `mc.read_timeout`: read timeout in seconds, default 120.
    `mc.retry_count`: number of retries after a failure, default 4.
    All three values must be positive integers.
    
    ### Release note
    
    The MaxCompute catalog adds three parameters, `mc.connect_timeout`,
    `mc.read_timeout`, and `mc.retry_count`, to help users adapt to the
    network between MaxCompute and Doris.
---
 .../exec/format/table/max_compute_jni_reader.cpp   |  6 ++-
 .../doris/maxcompute/MaxComputeJniScanner.java     | 47 +++++++++++++++---
 .../maxcompute/MaxComputeExternalCatalog.java      | 57 ++++++++++++++++++++++
 .../maxcompute/source/MaxComputeScanNode.java      | 13 +++++
 .../property/constants/MCProperties.java           |  8 +++
 gensrc/thrift/PlanNodes.thrift                     |  4 ++
 .../test_external_catalog_maxcompute.groovy        |  5 +-
 7 files changed, 131 insertions(+), 9 deletions(-)

diff --git a/be/src/vec/exec/format/table/max_compute_jni_reader.cpp 
b/be/src/vec/exec/format/table/max_compute_jni_reader.cpp
index d1a71fd1a2f..665e19b6bce 100644
--- a/be/src/vec/exec/format/table/max_compute_jni_reader.cpp
+++ b/be/src/vec/exec/format/table/max_compute_jni_reader.cpp
@@ -77,7 +77,11 @@ MaxComputeJniReader::MaxComputeJniReader(const 
MaxComputeTableDescriptor* mc_des
             {"start_offset", std::to_string(_range.start_offset)},
             {"split_size", std::to_string(_range.size)},
             {"required_fields", required_fields.str()},
-            {"columns_types", columns_types.str()}};
+            {"columns_types", columns_types.str()},
+
+            {"connect_timeout", 
std::to_string(_max_compute_params.connect_timeout)},
+            {"read_timeout", std::to_string(_max_compute_params.read_timeout)},
+            {"retry_count", std::to_string(_max_compute_params.retry_times)}};
     _jni_connector = std::make_unique<JniConnector>(
             "org/apache/doris/maxcompute/MaxComputeJniScanner", params, 
column_names);
 }
diff --git 
a/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java
 
b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java
index 6cbed70adc7..d6325bdae46 100644
--- 
a/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java
+++ 
b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java
@@ -25,6 +25,7 @@ import com.aliyun.odps.account.Account;
 import com.aliyun.odps.account.AliyunAccount;
 import com.aliyun.odps.table.configuration.CompressionCodec;
 import com.aliyun.odps.table.configuration.ReaderOptions;
+import com.aliyun.odps.table.configuration.RestOptions;
 import com.aliyun.odps.table.enviroment.Credentials;
 import com.aliyun.odps.table.enviroment.EnvironmentSettings;
 import com.aliyun.odps.table.read.SplitReader;
@@ -67,6 +68,10 @@ public class MaxComputeJniScanner extends JniScanner {
     private static final String SCAN_SERIALIZER = "scan_serializer";
     private static final String TIME_ZONE = "time_zone";
 
+    private static final String CONNECT_TIMEOUT = "connect_timeout";
+    private static final String READ_TIMEOUT = "read_timeout";
+    private static final String RETRY_COUNT  = "retry_count";
+
     private enum SplitType {
         BYTE_SIZE,
         ROW_OFFSET
@@ -136,16 +141,40 @@ public class MaxComputeJniScanner extends JniScanner {
         Credentials credentials = 
Credentials.newBuilder().withAccount(odps.getAccount())
                 .withAppAccount(odps.getAppAccount()).build();
 
+
+        int connectTimeout = 10; // 10s
+        if (!Strings.isNullOrEmpty(params.get(CONNECT_TIMEOUT))) {
+            connectTimeout = Integer.parseInt(params.get(CONNECT_TIMEOUT));
+        }
+
+        int readTimeout = 120; // 120s
+        if (!Strings.isNullOrEmpty(params.get(READ_TIMEOUT))) {
+            readTimeout =  Integer.parseInt(params.get(READ_TIMEOUT));
+        }
+
+        int retryTimes = 4; // 4 times
+        if (!Strings.isNullOrEmpty(params.get(RETRY_COUNT))) {
+            retryTimes = Integer.parseInt(params.get(RETRY_COUNT));
+        }
+
+        RestOptions restOptions = RestOptions.newBuilder()
+                .withConnectTimeout(connectTimeout)
+                .withReadTimeout(readTimeout)
+                .withRetryTimes(retryTimes).build();
+
         settings = EnvironmentSettings.newBuilder()
                 .withCredentials(credentials)
                 .withServiceEndpoint(odps.getEndpoint())
                 .withQuotaName(quota)
+                .withRestOptions(restOptions)
                 .build();
 
         try {
             scan = (TableBatchReadSession) deserialize(scanSerializer);
         } catch (Exception e) {
-            LOG.info("deserialize TableBatchReadSession failed.", e);
+            String errorMsg = "Failed to deserialize table batch read 
session.";
+            LOG.warn(errorMsg, e);
+            throw new IllegalArgumentException(errorMsg, e);
         }
     }
 
@@ -176,11 +205,11 @@ public class MaxComputeJniScanner extends JniScanner {
                     .withReuseBatch(true)
                     .build());
 
-        } catch (IOException e) {
-            LOG.info("createArrowReader failed.", e);
         } catch (Exception e) {
+            String errorMsg = "MaxComputeJniScanner Failed to open table batch 
read session.";
+            LOG.warn(errorMsg, e);
             close();
-            throw new IOException(e.getMessage(), e);
+            throw new IOException(errorMsg, e);
         }
     }
 
@@ -215,8 +244,9 @@ public class MaxComputeJniScanner extends JniScanner {
                     break;
                 }
             } catch (Exception e) {
-                LOG.info("currentSplitReader hasNext fail", e);
-                break;
+                String errorMsg = "MaxComputeJniScanner readVectors hasNext 
fail";
+                LOG.warn(errorMsg, e);
+                throw new IOException(e.getMessage(), e);
             }
 
             try {
@@ -241,7 +271,10 @@ public class MaxComputeJniScanner extends JniScanner {
                 }
                 curReadRows += batchRows;
             } catch (Exception e) {
-                throw new RuntimeException("Fail to read arrow data, reason: " 
+ e.getMessage(), e);
+                String errorMsg = String.format("MaxComputeJniScanner Fail to 
read arrow data. "
+                        + "curReadRows = {}, expectedRows = {}", curReadRows, 
expectedRows);
+                LOG.warn(errorMsg, e);
+                throw new RuntimeException(errorMsg, e);
             }
         }
         return curReadRows;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalCatalog.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalCatalog.java
index e6cd77103db..06c1e55dcf6 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalCatalog.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalCatalog.java
@@ -33,6 +33,7 @@ import com.aliyun.odps.Project;
 import com.aliyun.odps.account.Account;
 import com.aliyun.odps.account.AliyunAccount;
 import com.aliyun.odps.security.SecurityManager;
+import com.aliyun.odps.table.configuration.RestOptions;
 import com.aliyun.odps.table.configuration.SplitOptions;
 import com.aliyun.odps.table.enviroment.Credentials;
 import com.aliyun.odps.table.enviroment.EnvironmentSettings;
@@ -71,6 +72,10 @@ public class MaxComputeExternalCatalog extends 
ExternalCatalog {
     private long splitRowCount;
     private long splitByteSize;
 
+    private int connectTimeout;
+    private int readTimeout;
+    private int retryTimes;
+
     private static final Map<String, ZoneId> REGION_ZONE_MAP;
     private static final List<String> REQUIRED_PROPERTIES = ImmutableList.of(
             MCProperties.PROJECT,
@@ -178,6 +183,17 @@ public class MaxComputeExternalCatalog extends 
ExternalCatalog {
                     .build();
         }
 
+        connectTimeout = Integer.parseInt(
+                props.getOrDefault(MCProperties.CONNECT_TIMEOUT, 
MCProperties.DEFAULT_CONNECT_TIMEOUT));
+        readTimeout = Integer.parseInt(
+                props.getOrDefault(MCProperties.READ_TIMEOUT, 
MCProperties.DEFAULT_READ_TIMEOUT));
+        retryTimes = Integer.parseInt(
+                props.getOrDefault(MCProperties.RETRY_COUNT, 
MCProperties.DEFAULT_RETRY_COUNT));
+
+        RestOptions restOptions = RestOptions.newBuilder()
+                .withConnectTimeout(connectTimeout)
+                .withReadTimeout(readTimeout)
+                .withRetryTimes(retryTimes).build();
 
         CloudCredential credential = MCProperties.getCredential(props);
         accessKey = credential.getAccessKey();
@@ -196,6 +212,7 @@ public class MaxComputeExternalCatalog extends 
ExternalCatalog {
                 .withCredentials(credentials)
                 .withServiceEndpoint(odps.getEndpoint())
                 .withQuotaName(quota)
+                .withRestOptions(restOptions)
                 .build();
     }
 
@@ -304,6 +321,21 @@ public class MaxComputeExternalCatalog extends 
ExternalCatalog {
         return defaultProject;
     }
 
+    public int getRetryTimes() {
+        makeSureInitialized();
+        return retryTimes;
+    }
+
+    public int getConnectTimeout() {
+        makeSureInitialized();
+        return connectTimeout;
+    }
+
+    public int getReadTimeout() {
+        makeSureInitialized();
+        return readTimeout;
+    }
+
     public ZoneId getProjectDateTimeZone() {
         makeSureInitialized();
 
@@ -385,6 +417,31 @@ public class MaxComputeExternalCatalog extends 
ExternalCatalog {
                     + MCProperties.SPLIT_ROW_COUNT + "must be an integer");
         }
 
+
+        try {
+            connectTimeout = Integer.parseInt(
+                    props.getOrDefault(MCProperties.CONNECT_TIMEOUT, 
MCProperties.DEFAULT_CONNECT_TIMEOUT));
+            readTimeout = Integer.parseInt(
+                    props.getOrDefault(MCProperties.READ_TIMEOUT, 
MCProperties.DEFAULT_READ_TIMEOUT));
+            retryTimes = Integer.parseInt(
+                    props.getOrDefault(MCProperties.RETRY_COUNT, 
MCProperties.DEFAULT_RETRY_COUNT));
+            if (connectTimeout <= 0) {
+                throw new DdlException(MCProperties.CONNECT_TIMEOUT + " must 
be greater than 0");
+            }
+
+            if (readTimeout <= 0) {
+                throw new DdlException(MCProperties.READ_TIMEOUT + " must be 
greater than 0");
+            }
+
+            if (retryTimes <= 0) {
+                throw new DdlException(MCProperties.RETRY_COUNT + " must be 
greater than 0");
+            }
+
+        } catch (NumberFormatException e) {
+            throw new DdlException("property " + MCProperties.CONNECT_TIMEOUT 
+ "/"
+                    + MCProperties.READ_TIMEOUT + "/" + 
MCProperties.RETRY_COUNT + "must be an integer");
+        }
+
         CloudCredential credential = MCProperties.getCredential(props);
         if (!credential.isWhole()) {
             throw new DdlException("Max-Compute credential properties '"
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNode.java
index e177e9d8b7c..4ad971a5c64 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNode.java
@@ -89,6 +89,10 @@ public class MaxComputeScanNode extends FileQueryScanNode {
     private static final LocationPath ROW_OFFSET_PATH = new 
LocationPath("/row_offset", Maps.newHashMap());
     private static final LocationPath BYTE_SIZE_PATH = new 
LocationPath("/byte_size", Maps.newHashMap());
 
+    private int connectTimeout;
+    private int readTimeout;
+    private int retryTimes;
+
     @Setter
     private SelectedPartitions selectedPartitions = null;
 
@@ -127,6 +131,11 @@ public class MaxComputeScanNode extends FileQueryScanNode {
         fileDesc.setPartitionSpec("deprecated");
         fileDesc.setTableBatchReadSession(maxComputeSplit.scanSerialize);
         fileDesc.setSessionId(maxComputeSplit.getSessionId());
+
+        fileDesc.setReadTimeout(readTimeout);
+        fileDesc.setConnectTimeout(connectTimeout);
+        fileDesc.setRetryTimes(retryTimes);
+
         tableFormatFileDesc.setMaxComputeParams(fileDesc);
         rangeDesc.setTableFormatParams(tableFormatFileDesc);
         rangeDesc.setPath("[ " + maxComputeSplit.getStart() + " , " + 
maxComputeSplit.getLength() + " ]");
@@ -477,6 +486,10 @@ public class MaxComputeScanNode extends FileQueryScanNode {
 
             MaxComputeExternalCatalog mcCatalog = (MaxComputeExternalCatalog) 
table.getCatalog();
 
+            readTimeout = mcCatalog.getReadTimeout();
+            connectTimeout = mcCatalog.getConnectTimeout();
+            retryTimes = mcCatalog.getRetryTimes();
+
             if 
(mcCatalog.getSplitStrategy().equals(MCProperties.SPLIT_BY_BYTE_SIZE_STRATEGY)) 
{
 
                 for (com.aliyun.odps.table.read.split.InputSplit split : 
assigner.getAllSplits()) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/MCProperties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/MCProperties.java
index 20a77574fc7..efbd01c1477 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/MCProperties.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/MCProperties.java
@@ -56,6 +56,14 @@ public class MCProperties extends BaseProperties {
     public static final String SPLIT_ROW_COUNT = "mc.split_row_count";
     public static final String DEFAULT_SPLIT_ROW_COUNT = "1048576"; // 256 * 
4096
 
+    public static final String CONNECT_TIMEOUT = "mc.connect_timeout";
+    public static final String READ_TIMEOUT = "mc.read_timeout";
+    public static final String RETRY_COUNT = "mc.retry_count";
+
+    public static final String DEFAULT_CONNECT_TIMEOUT = "10"; // 10s
+    public static final String DEFAULT_READ_TIMEOUT = "120"; // 120s
+    public static final String DEFAULT_RETRY_COUNT = "4"; // 4 times
+
     public static CloudCredential getCredential(Map<String, String> props) {
         return getCloudCredential(props, ACCESS_KEY, SECRET_KEY, 
SESSION_TOKEN);
     }
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 77d9c0ef672..bd8c43622d1 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -353,6 +353,10 @@ struct TMaxComputeFileDesc {
     1: optional string partition_spec // deprecated 
     2: optional string session_id 
     3: optional string table_batch_read_session
+    // for mc network configuration
+    4: optional i32 connect_timeout
+    5: optional i32 read_timeout
+    6: optional i32 retry_times
 }
 
 struct THudiFileDesc {
diff --git 
a/regression-test/suites/external_table_p2/maxcompute/test_external_catalog_maxcompute.groovy
 
b/regression-test/suites/external_table_p2/maxcompute/test_external_catalog_maxcompute.groovy
index 3f0929b59ea..81133270fb6 100644
--- 
a/regression-test/suites/external_table_p2/maxcompute/test_external_catalog_maxcompute.groovy
+++ 
b/regression-test/suites/external_table_p2/maxcompute/test_external_catalog_maxcompute.groovy
@@ -388,7 +388,10 @@ suite("test_external_catalog_maxcompute", 
"p2,external,maxcompute,external_remot
         order_qt_multi_partition_q6 """ select max(pt), yy, mm from 
multi_partitions where yy = '2023' and mm='08' group by yy, mm order by yy, mm; 
"""
         order_qt_multi_partition_q7 """ select count(*) from multi_partitions 
where yy < '2023' or dd < '03'; """
         order_qt_multi_partition_q8 """ select count(*) from multi_partitions 
where pt>=3; """
-        order_qt_multi_partition_q9 """ select 
city,mnt,gender,finished_time,order_rate,cut_date,create_time,pt, yy, mm, dd 
from multi_partitions where pt >= 2 and pt < 4 and finished_time is not null; 
"""
+        
+        //`finished_time is not null` => com.aliyun.odps.OdpsException: 
ODPS-0010000:System internal error - fuxi job failed, caused by: timestamp_ntz
+        // order_qt_multi_partition_q9 """ select 
city,mnt,gender,finished_time,order_rate,cut_date,create_time,pt, yy, mm, dd 
from multi_partitions where pt >= 2 and pt < 4 and finished_time is not null; 
"""
+
         order_qt_multi_partition_q10 """ select pt, yy, mm, dd from 
multi_partitions where pt >= 2 and create_time > '2023-08-03 03:11:00' order by 
pt, yy, mm, dd; """
 
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to