This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 6e5082eee21a5a045a09ee50cbbc8cf8fa0f763f
Author: slothever <[email protected]>
AuthorDate: Tue Oct 17 22:36:10 2023 +0800

    [fix](multi-catalog)fix maxcompute partition filter and session creation 
(#24911)
    
    add maxcompute partition support
    fix maxcompute partition filter
    modify maxcompute session create method
---
 be/src/runtime/descriptors.cpp                     |   1 +
 be/src/runtime/descriptors.h                       |   2 +
 .../exec/format/table/max_compute_jni_reader.cpp   |   1 +
 .../doris/maxcompute/MaxComputeJniScanner.java     |  39 +++++-
 .../doris/maxcompute/MaxComputePartitionValue.java | 137 +++++++++++++++++++++
 .../doris/maxcompute/MaxComputeTableScan.java      |  30 +++--
 .../catalog/external/MaxComputeExternalTable.java  |  80 ++++++++++++
 .../datasource/MaxComputeExternalCatalog.java      |  12 +-
 .../doris/planner/external/MaxComputeScanNode.java |   4 +-
 gensrc/thrift/Descriptors.thrift                   |   1 +
 regression-test/conf/regression-conf.groovy        |   4 +
 .../test_external_catalog_maxcompute.out           |  24 ++++
 .../test_external_catalog_maxcompute.groovy        |  60 +++++++++
 13 files changed, 379 insertions(+), 16 deletions(-)

diff --git a/be/src/runtime/descriptors.cpp b/be/src/runtime/descriptors.cpp
index 1855f5d58d7..be582616522 100644
--- a/be/src/runtime/descriptors.cpp
+++ b/be/src/runtime/descriptors.cpp
@@ -186,6 +186,7 @@ MaxComputeTableDescriptor::MaxComputeTableDescriptor(const 
TTableDescriptor& tde
           _table(tdesc.mcTable.table),
           _access_key(tdesc.mcTable.access_key),
           _secret_key(tdesc.mcTable.secret_key),
+          _partition_spec(tdesc.mcTable.partition_spec),
           _public_access(tdesc.mcTable.public_access) {}
 
 MaxComputeTableDescriptor::~MaxComputeTableDescriptor() = default;
diff --git a/be/src/runtime/descriptors.h b/be/src/runtime/descriptors.h
index ad1199f3f16..f3c3ea33c4d 100644
--- a/be/src/runtime/descriptors.h
+++ b/be/src/runtime/descriptors.h
@@ -229,6 +229,7 @@ public:
     const std::string table() const { return _table; }
     const std::string access_key() const { return _access_key; }
     const std::string secret_key() const { return _secret_key; }
+    const std::string partition_spec() const { return _partition_spec; }
     const std::string public_access() const { return _public_access; }
 
 private:
@@ -237,6 +238,7 @@ private:
     std::string _table;
     std::string _access_key;
     std::string _secret_key;
+    std::string _partition_spec;
     std::string _public_access;
 };
 
diff --git a/be/src/vec/exec/format/table/max_compute_jni_reader.cpp 
b/be/src/vec/exec/format/table/max_compute_jni_reader.cpp
index 9edd8bfc514..34db6a1df4d 100644
--- a/be/src/vec/exec/format/table/max_compute_jni_reader.cpp
+++ b/be/src/vec/exec/format/table/max_compute_jni_reader.cpp
@@ -64,6 +64,7 @@ MaxComputeJniReader::MaxComputeJniReader(const 
MaxComputeTableDescriptor* mc_des
                                        {"access_key", 
_table_desc->access_key()},
                                        {"secret_key", 
_table_desc->secret_key()},
                                        {"project", _table_desc->project()},
+                                       {"partition_spec", 
_table_desc->partition_spec()},
                                        {"table", _table_desc->table()},
                                        {"public_access", 
_table_desc->public_access()},
                                        {"start_offset", 
std::to_string(_range.start_offset)},
diff --git 
a/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java
 
b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java
index 8f9b903afdc..5f4125ec4ed 100644
--- 
a/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java
+++ 
b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java
@@ -23,6 +23,7 @@ import org.apache.doris.common.jni.vec.ScanPredicate;
 
 import com.aliyun.odps.Column;
 import com.aliyun.odps.OdpsType;
+import com.aliyun.odps.PartitionSpec;
 import com.aliyun.odps.data.ArrowRecordReader;
 import com.aliyun.odps.tunnel.TableTunnel;
 import com.aliyun.odps.type.TypeInfo;
@@ -31,6 +32,7 @@ import com.google.common.base.Strings;
 import org.apache.arrow.memory.RootAllocator;
 import org.apache.arrow.vector.FieldVector;
 import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.log4j.Logger;
 
 import java.io.IOException;
@@ -40,7 +42,9 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
 
 /**
  * MaxComputeJ JniScanner. BE will read data from the scanner object.
@@ -49,16 +53,19 @@ public class MaxComputeJniScanner extends JniScanner {
     private static final Logger LOG = 
Logger.getLogger(MaxComputeJniScanner.class);
     private static final String REGION = "region";
     private static final String PROJECT = "project";
+    private static final String PARTITION_SPEC = "partition_spec";
     private static final String TABLE = "table";
     private static final String ACCESS_KEY = "access_key";
     private static final String SECRET_KEY = "secret_key";
     private static final String START_OFFSET = "start_offset";
     private static final String SPLIT_SIZE = "split_size";
     private static final String PUBLIC_ACCESS = "public_access";
-    private static final Map<String, MaxComputeTableScan> tableScans = new 
ConcurrentHashMap<>();
+    private final Map<String, MaxComputeTableScan> tableScans = new 
ConcurrentHashMap<>();
     private final String region;
     private final String project;
     private final String table;
+    private PartitionSpec partitionSpec;
+    private Set<String> partitionColumns;
     private final MaxComputeTableScan curTableScan;
     private MaxComputeColumnValue columnValue;
     private long remainBatchRows = 0;
@@ -76,7 +83,10 @@ public class MaxComputeJniScanner extends JniScanner {
         table = Objects.requireNonNull(params.get(TABLE), "required property 
'" + TABLE + "'.");
         tableScans.putIfAbsent(tableUniqKey(), newTableScan(params));
         curTableScan = tableScans.get(tableUniqKey());
-
+        String partitionSpec = params.get(PARTITION_SPEC);
+        if (StringUtils.isNotEmpty(partitionSpec)) {
+            this.partitionSpec = new PartitionSpec(partitionSpec);
+        }
         String[] requiredFields = params.get("required_fields").split(",");
         String[] types = params.get("columns_types").split("#");
         ColumnType[] columnTypes = new ColumnType[types.length];
@@ -124,6 +134,7 @@ public class MaxComputeJniScanner extends JniScanner {
         }
         // reorder columns
         List<Column> columnList = curTableScan.getSchema().getColumns();
+        columnList.addAll(curTableScan.getSchema().getPartitionColumns());
         Map<String, Integer> columnRank = new HashMap<>();
         for (int i = 0; i < columnList.size(); i++) {
             columnRank.put(columnList.get(i).getName(), i);
@@ -139,13 +150,23 @@ public class MaxComputeJniScanner extends JniScanner {
             return;
         }
         try {
-            TableTunnel.DownloadSession session = curTableScan.getSession();
+            TableTunnel.DownloadSession session;
+            if (partitionSpec != null) {
+                session = curTableScan.openDownLoadSession(partitionSpec);
+            } else {
+                session = curTableScan.openDownLoadSession();
+            }
             long start = startOffset == -1L ? 0 : startOffset;
             long recordCount = session.getRecordCount();
             totalRows = splitSize > 0 ? Math.min(splitSize, recordCount) : 
recordCount;
 
             arrowAllocator = new RootAllocator(Long.MAX_VALUE);
-            curReader = session.openArrowRecordReader(start, totalRows, 
readColumns, arrowAllocator);
+            partitionColumns = 
session.getSchema().getPartitionColumns().stream()
+                    .map(Column::getName)
+                    .collect(Collectors.toSet());
+            List<Column> maxComputeColumns = new ArrayList<>(readColumns);
+            maxComputeColumns.removeIf(e -> 
partitionColumns.contains(e.getName()));
+            curReader = session.openArrowRecordReader(start, totalRows, 
maxComputeColumns, arrowAllocator);
         } catch (Exception e) {
             close();
             throw new IOException(e);
@@ -252,6 +273,16 @@ public class MaxComputeJniScanner extends JniScanner {
                         appendData(readColumnsToId.get(column.getName()), 
columnValue);
                     }
                 }
+                if (partitionSpec != null) {
+                    for (String partitionColumn : partitionColumns) {
+                        String partitionValue = 
partitionSpec.get(partitionColumn);
+                        Integer readColumnId = 
readColumnsToId.get(partitionColumn);
+                        if (readColumnId != null && partitionValue != null) {
+                            MaxComputePartitionValue value = new 
MaxComputePartitionValue(partitionValue);
+                            appendData(readColumnId, value);
+                        }
+                    }
+                }
                 curReadRows += batchRows;
             } finally {
                 batch.close();
diff --git 
a/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputePartitionValue.java
 
b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputePartitionValue.java
new file mode 100644
index 00000000000..cb76447e589
--- /dev/null
+++ 
b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputePartitionValue.java
@@ -0,0 +1,137 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.maxcompute;
+
+import org.apache.doris.common.jni.vec.ColumnValue;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.charset.StandardCharsets;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.List;
+
+/**
+ * MaxCompute partition value, wrapping the partition's string value as a ColumnValue
+ */
+public class MaxComputePartitionValue implements ColumnValue {
+    private String partitionValue;
+
+    public MaxComputePartitionValue(String partitionValue) {
+        reset(partitionValue);
+    }
+
+    public void reset(String partitionValue) {
+        this.partitionValue = partitionValue;
+    }
+
+    @Override
+    public boolean canGetStringAsBytes() {
+        return false;
+    }
+
+    @Override
+    public boolean isNull() {
+        return false;
+    }
+
+    @Override
+    public boolean getBoolean() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public byte getByte() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public short getShort() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public int getInt() {
+        return Integer.parseInt(partitionValue);
+    }
+
+    @Override
+    public float getFloat() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public long getLong() {
+        return Long.parseLong(partitionValue);
+    }
+
+    @Override
+    public double getDouble() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public BigInteger getBigInteger() {
+        return BigInteger.valueOf(getLong());
+    }
+
+    @Override
+    public BigDecimal getDecimal() {
+        return BigDecimal.valueOf(getDouble());
+    }
+
+    @Override
+    public String getString() {
+        return partitionValue;
+    }
+
+    @Override
+    public byte[] getStringAsBytes() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public LocalDate getDate() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public LocalDateTime getDateTime() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public byte[] getBytes() {
+        return partitionValue.getBytes(StandardCharsets.UTF_8);
+    }
+
+    @Override
+    public void unpackArray(List<ColumnValue> values) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void unpackMap(List<ColumnValue> keys, List<ColumnValue> values) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void unpackStruct(List<Integer> structFieldIndex, List<ColumnValue> 
values) {
+        throw new UnsupportedOperationException();
+    }
+}
diff --git 
a/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeTableScan.java
 
b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeTableScan.java
index da67196a3a2..c0fa40dae46 100644
--- 
a/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeTableScan.java
+++ 
b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeTableScan.java
@@ -18,6 +18,7 @@
 package org.apache.doris.maxcompute;
 
 import com.aliyun.odps.Odps;
+import com.aliyun.odps.PartitionSpec;
 import com.aliyun.odps.TableSchema;
 import com.aliyun.odps.account.AliyunAccount;
 import com.aliyun.odps.tunnel.TableTunnel;
@@ -35,8 +36,8 @@ public class MaxComputeTableScan {
     private final TableTunnel tunnel;
     private final String project;
     private final String table;
-    private volatile TableTunnel.DownloadSession tableSession;
     private volatile long readRows = 0;
+    private long totalRows = 0;
 
     public MaxComputeTableScan(String region, String project, String table,
                                String accessKey, String secretKey, boolean 
enablePublicAccess) {
@@ -59,13 +60,24 @@ public class MaxComputeTableScan {
         return odps.tables().get(table).getSchema();
     }
 
-    public synchronized TableTunnel.DownloadSession getSession() throws 
IOException {
-        if (tableSession == null) {
-            try {
-                tableSession = tunnel.createDownloadSession(project, table);
-            } catch (TunnelException e) {
-                throw new IOException(e);
-            }
+    public TableTunnel.DownloadSession openDownLoadSession() throws 
IOException {
+        TableTunnel.DownloadSession tableSession;
+        try {
+            tableSession = tunnel.getDownloadSession(project, table, null);
+            totalRows = tableSession.getRecordCount();
+        } catch (TunnelException e) {
+            throw new IOException(e);
+        }
+        return tableSession;
+    }
+
+    public TableTunnel.DownloadSession openDownLoadSession(PartitionSpec 
partitionSpec) throws IOException {
+        TableTunnel.DownloadSession tableSession;
+        try {
+            tableSession = tunnel.getDownloadSession(project, table, 
partitionSpec, null);
+            totalRows = tableSession.getRecordCount();
+        } catch (TunnelException e) {
+            throw new IOException(e);
         }
         return tableSession;
     }
@@ -76,6 +88,6 @@ public class MaxComputeTableScan {
     }
 
     public boolean endOfScan() {
-        return readRows >= tableSession.getRecordCount();
+        return readRows >= totalRows;
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/MaxComputeExternalTable.java
 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/MaxComputeExternalTable.java
index 012693bccd6..3c2f3bada03 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/MaxComputeExternalTable.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/MaxComputeExternalTable.java
@@ -17,6 +17,11 @@
 
 package org.apache.doris.catalog.external;
 
+import org.apache.doris.analysis.BinaryPredicate;
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.InPredicate;
+import org.apache.doris.analysis.Predicate;
+import org.apache.doris.analysis.SlotRef;
 import org.apache.doris.catalog.ArrayType;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.MapType;
@@ -39,9 +44,16 @@ import com.aliyun.odps.type.StructTypeInfo;
 import com.aliyun.odps.type.TypeInfo;
 import com.aliyun.odps.type.VarcharTypeInfo;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 
 import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.StringJoiner;
 
 /**
  * MaxCompute external table.
@@ -49,6 +61,8 @@ import java.util.List;
 public class MaxComputeExternalTable extends ExternalTable {
 
     private Table odpsTable;
+    private Set<String> partitionKeys;
+    private String partitionSpec;
 
     public MaxComputeExternalTable(long id, String name, String dbName, 
MaxComputeExternalCatalog catalog) {
         super(id, name, catalog, dbName, TableType.MAX_COMPUTE_EXTERNAL_TABLE);
@@ -72,9 +86,74 @@ public class MaxComputeExternalTable extends ExternalTable {
             result.add(new Column(field.getName(), 
mcTypeToDorisType(field.getTypeInfo()), true, null,
                     true, field.getComment(), true, -1));
         }
+        List<com.aliyun.odps.Column> partitionColumns = 
odpsTable.getSchema().getPartitionColumns();
+        partitionKeys = new HashSet<>();
+        for (com.aliyun.odps.Column partColumn : partitionColumns) {
+            result.add(new Column(partColumn.getName(), 
mcTypeToDorisType(partColumn.getTypeInfo()), true, null,
+                    true, partColumn.getComment(), true, -1));
+            partitionKeys.add(partColumn.getName());
+        }
         return result;
     }
 
+    public Optional<String> getPartitionSpec(List<Expr> conjuncts) {
+        if (!partitionKeys.isEmpty()) {
+            if (conjuncts.isEmpty()) {
+                throw new IllegalArgumentException("Max Compute partition 
table need partition predicate.");
+            }
+            // rebuild partitionSpec on every call, since the conjuncts may have changed.
+            List<String> partitionConjuncts = 
parsePartitionConjuncts(conjuncts, partitionKeys);
+            StringJoiner partitionSpec = new StringJoiner(",");
+            partitionConjuncts.forEach(partitionSpec::add);
+            this.partitionSpec = partitionSpec.toString();
+            return Optional.of(this.partitionSpec);
+        }
+        return Optional.empty();
+    }
+
+    private static List<String> parsePartitionConjuncts(List<Expr> conjuncts, 
Set<String> partitionKeys) {
+        List<String> partitionConjuncts = new ArrayList<>();
+        Set<Predicate> predicates = Sets.newHashSet();
+        for (Expr conjunct : conjuncts) {
+            // collect the binary predicates and the IN predicates
+            conjunct.collect(BinaryPredicate.class, predicates);
+            conjunct.collect(InPredicate.class, predicates);
+        }
+        Map<String, Predicate> slotToConjuncts = new HashMap<>();
+        for (Predicate predicate : predicates) {
+            List<SlotRef> slotRefs = new ArrayList<>();
+            if (predicate instanceof BinaryPredicate) {
+                if (((BinaryPredicate) predicate).getOp() != 
BinaryPredicate.Operator.EQ) {
+                    // MaxCompute only supports the EQ operator: pt='pt-value'
+                    continue;
+                }
+                // a BinaryPredicate has one slotRef on its left side; the partition value is not a slotRef
+                predicate.collect(SlotRef.class, slotRefs);
+                slotToConjuncts.put(slotRefs.get(0).getColumnName(), 
predicate);
+            } else if (predicate instanceof InPredicate) {
+                predicate.collect(SlotRef.class, slotRefs);
+                slotToConjuncts.put(slotRefs.get(0).getColumnName(), 
predicate);
+            }
+        }
+        for (String partitionKey : partitionKeys) {
+            Predicate partitionPredicate = slotToConjuncts.get(partitionKey);
+            if (partitionPredicate == null) {
+                continue;
+            }
+            if (partitionPredicate instanceof InPredicate) {
+                List<Expr> inList = ((InPredicate) 
partitionPredicate).getListChildren();
+                for (Expr expr : inList) {
+                    String partitionConjunct = partitionKey + "=" + 
expr.toSql();
+                    partitionConjuncts.add(partitionConjunct.replace("`", ""));
+                }
+            } else {
+                String partitionConjunct = partitionPredicate.toSql();
+                partitionConjuncts.add(partitionConjunct.replace("`", ""));
+            }
+        }
+        return partitionConjuncts;
+    }
+
     private Type mcTypeToDorisType(TypeInfo typeInfo) {
         OdpsType odpsType = typeInfo.getOdpsType();
         switch (odpsType) {
@@ -166,6 +245,7 @@ public class MaxComputeExternalTable extends ExternalTable {
         tMcTable.setRegion(mcCatalog.getRegion());
         tMcTable.setAccessKey(mcCatalog.getAccessKey());
         tMcTable.setSecretKey(mcCatalog.getSecretKey());
+        tMcTable.setPartitionSpec(this.partitionSpec);
         
tMcTable.setPublicAccess(String.valueOf(mcCatalog.enablePublicAccess()));
         // use mc project as dbName
         tMcTable.setProject(dbName);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/MaxComputeExternalCatalog.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/MaxComputeExternalCatalog.java
index 5c5e4ded0c1..0cd99678bad 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/MaxComputeExternalCatalog.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/MaxComputeExternalCatalog.java
@@ -24,6 +24,7 @@ import 
org.apache.doris.datasource.property.constants.MCProperties;
 
 import com.aliyun.odps.Odps;
 import com.aliyun.odps.OdpsException;
+import com.aliyun.odps.PartitionSpec;
 import com.aliyun.odps.account.Account;
 import com.aliyun.odps.account.AliyunAccount;
 import com.aliyun.odps.tunnel.TableTunnel;
@@ -35,6 +36,7 @@ import com.google.gson.annotations.SerializedName;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 public class MaxComputeExternalCatalog extends ExternalCatalog {
     private Odps odps;
@@ -93,15 +95,21 @@ public class MaxComputeExternalCatalog extends 
ExternalCatalog {
         odps.setDefaultProject(defaultProject);
     }
 
-    public long getTotalRows(String project, String table) throws 
TunnelException {
+    public long getTotalRows(String project, String table, Optional<String> 
partitionSpec) throws TunnelException {
         makeSureInitialized();
         TableTunnel tunnel = new TableTunnel(odps);
         String tunnelUrl = tunnelUrlTemplate.replace("{}", region);
         if (enablePublicAccess) {
             tunnelUrl = tunnelUrl.replace("-inc", "");
         }
+        TableTunnel.DownloadSession downloadSession;
         tunnel.setEndpoint(tunnelUrl);
-        return tunnel.createDownloadSession(project, table).getRecordCount();
+        if (!partitionSpec.isPresent()) {
+            downloadSession = tunnel.getDownloadSession(project, table, null);
+        } else {
+            downloadSession = tunnel.getDownloadSession(project, table, new 
PartitionSpec(partitionSpec.get()), null);
+        }
+        return downloadSession.getRecordCount();
     }
 
     public Odps getClient() {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/MaxComputeScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/MaxComputeScanNode.java
index 1030a67a30a..d7f8d599a61 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/MaxComputeScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/MaxComputeScanNode.java
@@ -37,6 +37,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 public class MaxComputeScanNode extends FileQueryScanNode {
 
@@ -96,7 +97,8 @@ public class MaxComputeScanNode extends FileQueryScanNode {
         }
         try {
             List<Pair<Long, Long>> sliceRange = new ArrayList<>();
-            long totalRows = catalog.getTotalRows(table.getDbName(), 
table.getName());
+            Optional<String> partitionSpec = table.getPartitionSpec(conjuncts);
+            long totalRows = catalog.getTotalRows(table.getDbName(), 
table.getName(), partitionSpec);
             long fileNum = odpsTable.getFileNum();
             long start = 0;
             long splitSize = (long) Math.ceil((double) totalRows / fileNum);
diff --git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift
index 326a5bd0d0c..168e448dd86 100644
--- a/gensrc/thrift/Descriptors.thrift
+++ b/gensrc/thrift/Descriptors.thrift
@@ -323,6 +323,7 @@ struct TMCTable {
   4: optional string access_key
   5: optional string secret_key
   6: optional string public_access
+  7: optional string partition_spec
 }
 
 // "Union" of all table types.
diff --git a/regression-test/conf/regression-conf.groovy 
b/regression-test/conf/regression-conf.groovy
index be5971448ae..d6001476392 100644
--- a/regression-test/conf/regression-conf.groovy
+++ b/regression-test/conf/regression-conf.groovy
@@ -150,6 +150,10 @@ extEsPort = 9200
 extEsUser = "*******"
 extEsPassword = "***********"
 
+enableMaxComputeTest=false
+aliYunAk="***********"
+aliYunSk="***********"
+
 s3Endpoint = "cos.ap-hongkong.myqcloud.com"
 s3BucketName = "doris-build-hk-1308700295"
 s3Region = "ap-hongkong"
diff --git 
a/regression-test/data/external_table_p2/maxcompute/test_external_catalog_maxcompute.out
 
b/regression-test/data/external_table_p2/maxcompute/test_external_catalog_maxcompute.out
new file mode 100644
index 00000000000..5fc7ade4894
--- /dev/null
+++ 
b/regression-test/data/external_table_p2/maxcompute/test_external_catalog_maxcompute.out
@@ -0,0 +1,24 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !q1 --
+8639377
+
+-- !q2 --
+1      2                       2000-08-15      2000-08-16      t       3       
4       5       6       7       8       9       10      11      12              
                                        13              14      15              
16              17      18      19      20              21      22.22   23.23
+
+-- !q3 --
+false  2       44      423432
+true   77      8920    182239402452
+
+-- !q4 --
+6223   maxam   2020-09-21
+9601   qewtoll 2020-09-21
+
+-- !q5 --
+1633   siwtow  2021-08-21
+
+-- !q6 --
+9601   qewtoll 2020-09-21
+
+-- !q7 --
+6223   maxam   2020-09-21
+9601   qewtoll 2020-09-21
diff --git 
a/regression-test/suites/external_table_p2/maxcompute/test_external_catalog_maxcompute.groovy
 
b/regression-test/suites/external_table_p2/maxcompute/test_external_catalog_maxcompute.groovy
new file mode 100644
index 00000000000..6b050e277a8
--- /dev/null
+++ 
b/regression-test/suites/external_table_p2/maxcompute/test_external_catalog_maxcompute.groovy
@@ -0,0 +1,60 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_external_catalog_maxcompute", 
"p2,external,maxcompute,external_remote,external_remote_maxcompute") {
+    String enabled = context.config.otherConfigs.get("enableMaxComputeTest")
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        String ak = context.config.otherConfigs.get("aliYunAk")
+        String sk = context.config.otherConfigs.get("aliYunSk");
+        String mc_db = "jz_datalake"
+        String mc_catalog_name = "test_external_mc_catalog"
+
+        sql """drop catalog if exists ${mc_catalog_name};"""
+        sql """
+            create catalog if not exists ${mc_catalog_name} properties (
+                "type" = "max_compute",
+                "mc.region" = "cn-beijing",
+                "mc.default.project" = "${mc_db}",
+                "mc.access_key" = "${ak}",
+                "mc.secret_key" = "${sk}",
+                "mc.public_access" = "true"
+            );
+        """
+        
+        // query data test
+        def q01 = {
+            qt_q1 """ select count(*) from store_sales """
+        }
+        // data type test
+        def q02 = {
+            qt_q2 """ select * from web_site where web_site_id=2 order by 
web_site_id """ // test char,date,varchar,double,decimal
+            qt_q3 """ select * from int_types order by mc_boolean limit 2 """ 
// test bool,tinyint,int,bigint
+        }
+        // test partition table filter
+        def q03 = {
+            qt_q4 """ select * from mc_parts where dt = '2020-09-21' """
+            qt_q5 """ select * from mc_parts where dt = '2021-08-21' """
+            qt_q6 """ select * from mc_parts where dt = '2020-09-21' and 
mc_bigint > 6223 """
+            qt_q7 """ select * from mc_parts where dt = '2020-09-21' or 
mc_bigint > 0 """
+        }
+        sql """ switch `${mc_catalog_name}`; """
+        sql """ use `${mc_db}`; """
+        q01()
+        q02()
+        q03()
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to