This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 855e9a508c6 [fix](catalog) opt the count pushdown rule for
iceberg/paimon/hive scan node (#44038) (#45564)
855e9a508c6 is described below
commit 855e9a508c6fd4a51b55e167eb15e29bb42a3351
Author: Mingyu Chen (Rayner) <[email protected]>
AuthorDate: Tue Dec 17 17:54:56 2024 -0800
[fix](catalog) opt the count pushdown rule for iceberg/paimon/hive scan
node (#44038) (#45564)
bp #44038
---
be/src/vec/exec/format/table/iceberg_reader.cpp | 18 ++--
be/src/vec/exec/format/table/iceberg_reader.h | 4 +-
be/src/vec/exec/format/table/paimon_reader.cpp | 3 +
be/src/vec/exec/scan/vfile_scanner.cpp | 4 +
.../doris/analysis/TableValuedFunctionRef.java | 5 +-
.../apache/doris/datasource/FileQueryScanNode.java | 27 +++++-
.../org/apache/doris/datasource/FileScanNode.java | 79 ----------------
.../org/apache/doris/datasource/FileSplit.java | 5 +-
.../org/apache/doris/datasource/FileSplitter.java | 104 +++++++++++++++++++++
.../apache/doris/datasource/SplitAssignment.java | 2 +-
.../org/apache/doris/datasource/SplitCreator.java | 2 +-
.../apache/doris/datasource/SplitGenerator.java | 5 +-
.../doris/datasource/hive/source/HiveScanNode.java | 52 ++++++++---
.../doris/datasource/hive/source/HiveSplit.java | 6 +-
.../doris/datasource/hudi/source/HudiScanNode.java | 8 +-
.../datasource/iceberg/source/IcebergScanNode.java | 42 +++++----
.../datasource/iceberg/source/IcebergSplit.java | 11 +--
.../maxcompute/source/MaxComputeScanNode.java | 28 +++---
.../datasource/paimon/source/PaimonScanNode.java | 65 +++++++------
.../datasource/paimon/source/PaimonSplit.java | 7 +-
.../doris/datasource/tvf/source/TVFScanNode.java | 22 ++++-
.../glue/translator/PhysicalPlanTranslator.java | 16 ++--
.../apache/doris/planner/SingleNodePlanner.java | 20 ++--
.../tablefunction/DataGenTableValuedFunction.java | 3 +-
.../ExternalFileTableValuedFunction.java | 5 +-
.../GroupCommitTableValuedFunction.java | 3 +-
.../tablefunction/JdbcQueryTableValueFunction.java | 3 +-
.../tablefunction/MetadataTableValuedFunction.java | 3 +-
.../tablefunction/QueryTableValueFunction.java | 3 +-
.../doris/tablefunction/TableValuedFunctionIf.java | 3 +-
.../paimon/test_paimon_catalog.out | 80 ++++++++++++++++
.../iceberg/test_iceberg_optimize_count.groovy | 2 +-
.../paimon/test_paimon_catalog.groovy | 14 +++
33 files changed, 436 insertions(+), 218 deletions(-)
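For orientation: the count pushdown optimized here lets a plain "select count(*)" over an Iceberg/Paimon/Hive table be answered from metadata (e.g. the Iceberg snapshot row count) instead of scanning data files. On the BE side the table-level row count is then emitted in batch-sized chunks; the sketch below mirrors that loop from IcebergTableReader::get_next_block in Java purely for illustration, with hypothetical values.

    // Illustrative only: mirrors the batch-wise count emission in
    // IcebergTableReader::get_next_block; the numbers are hypothetical.
    public class CountEmissionSketch {
        public static void main(String[] args) {
            long remainingTableLevelRowCount = 25_000; // e.g. snapshot row count assigned to this reader
            int batchSize = 4_064;                     // e.g. query option batch_size
            int blocks = 0;
            while (remainingTableLevelRowCount > 0) {
                long rows = Math.min(remainingTableLevelRowCount, batchSize);
                remainingTableLevelRowCount -= rows;   // each block carries `rows` placeholder rows
                blocks++;
            }
            System.out.println("emitted " + blocks + " blocks"); // 7 blocks for these numbers
        }
    }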
diff --git a/be/src/vec/exec/format/table/iceberg_reader.cpp
b/be/src/vec/exec/format/table/iceberg_reader.cpp
index 8f130ca6002..837269b0bb3 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.cpp
+++ b/be/src/vec/exec/format/table/iceberg_reader.cpp
@@ -96,25 +96,25 @@
IcebergTableReader::IcebergTableReader(std::unique_ptr<GenericReader> file_forma
_iceberg_profile.delete_rows_sort_time =
ADD_CHILD_TIMER(_profile, "DeleteRowsSortTime", iceberg_profile);
if (range.table_format_params.iceberg_params.__isset.row_count) {
- _remaining_push_down_count =
range.table_format_params.iceberg_params.row_count;
+ _remaining_table_level_row_count =
range.table_format_params.iceberg_params.row_count;
} else {
- _remaining_push_down_count = -1;
+ _remaining_table_level_row_count = -1;
}
}
Status IcebergTableReader::get_next_block(Block* block, size_t* read_rows,
bool* eof) {
// already get rows from be
- if (_push_down_agg_type == TPushAggOp::type::COUNT &&
_remaining_push_down_count > 0) {
- auto rows =
- std::min(_remaining_push_down_count,
(int64_t)_state->query_options().batch_size);
- _remaining_push_down_count -= rows;
+ if (_push_down_agg_type == TPushAggOp::type::COUNT &&
_remaining_table_level_row_count > 0) {
+ auto rows = std::min(_remaining_table_level_row_count,
+ (int64_t)_state->query_options().batch_size);
+ _remaining_table_level_row_count -= rows;
auto mutate_columns = block->mutate_columns();
for (auto& col : mutate_columns) {
col->resize(rows);
}
block->set_columns(std::move(mutate_columns));
*read_rows = rows;
- if (_remaining_push_down_count == 0) {
+ if (_remaining_table_level_row_count == 0) {
*eof = true;
}
@@ -164,7 +164,7 @@ Status IcebergTableReader::get_columns(
Status IcebergTableReader::init_row_filters(const TFileRangeDesc& range,
io::IOContext* io_ctx) {
// We get the count value by doris's be, so we don't need to read the
delete file
- if (_push_down_agg_type == TPushAggOp::type::COUNT &&
_remaining_push_down_count > 0) {
+ if (_push_down_agg_type == TPushAggOp::type::COUNT &&
_remaining_table_level_row_count > 0) {
return Status::OK();
}
@@ -187,9 +187,11 @@ Status IcebergTableReader::init_row_filters(const
TFileRangeDesc& range, io::IOC
if (position_delete_files.size() > 0) {
RETURN_IF_ERROR(
_position_delete_base(table_desc.original_file_path,
position_delete_files));
+ _file_format_reader->set_push_down_agg_type(TPushAggOp::NONE);
}
if (equality_delete_files.size() > 0) {
RETURN_IF_ERROR(_equality_delete_base(equality_delete_files));
+ _file_format_reader->set_push_down_agg_type(TPushAggOp::NONE);
}
COUNTER_UPDATE(_iceberg_profile.num_delete_files,
table_desc.delete_files.size());
diff --git a/be/src/vec/exec/format/table/iceberg_reader.h
b/be/src/vec/exec/format/table/iceberg_reader.h
index 2e240f465b6..ee7dcdd68d2 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.h
+++ b/be/src/vec/exec/format/table/iceberg_reader.h
@@ -167,7 +167,9 @@ protected:
bool _has_schema_change = false;
bool _has_iceberg_schema = false;
- int64_t _remaining_push_down_count;
+    // the table-level row count for optimizing queries like:
+ // select count(*) from table;
+ int64_t _remaining_table_level_row_count;
Fileformat _file_format = Fileformat::NONE;
const int64_t MIN_SUPPORT_DELETE_FILES_VERSION = 2;
diff --git a/be/src/vec/exec/format/table/paimon_reader.cpp
b/be/src/vec/exec/format/table/paimon_reader.cpp
index 47a956fd601..01c21ad7db9 100644
--- a/be/src/vec/exec/format/table/paimon_reader.cpp
+++ b/be/src/vec/exec/format/table/paimon_reader.cpp
@@ -44,6 +44,9 @@ Status PaimonReader::init_row_filters(const TFileRangeDesc&
range, io::IOContext
return Status::OK();
}
+ // set push down agg type to NONE because we can not do count push down opt
+ // if there are delete files.
+ _file_format_reader->set_push_down_agg_type(TPushAggOp::NONE);
const auto& deletion_file = table_desc.deletion_file;
io::FileSystemProperties properties = {
.system_type = _params.file_type,
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp
b/be/src/vec/exec/scan/vfile_scanner.cpp
index 331b49b2082..e63bc99d41b 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -829,6 +829,9 @@ Status VFileScanner::_get_next_reader() {
_should_enable_file_meta_cache() ?
ExecEnv::GetInstance()->file_meta_cache()
: nullptr,
_state->query_options().enable_parquet_lazy_mat);
+ // ATTN: the push down agg type may be set back to NONE,
+ // see IcebergTableReader::init_row_filters for example.
+ parquet_reader->set_push_down_agg_type(_get_push_down_agg_type());
{
SCOPED_TIMER(_open_reader_timer);
RETURN_IF_ERROR(parquet_reader->open());
@@ -894,6 +897,7 @@ Status VFileScanner::_get_next_reader() {
_profile, _state, *_params, range,
_state->query_options().batch_size,
_state->timezone(), _io_ctx.get(),
_state->query_options().enable_orc_lazy_mat,
unsupported_pushdown_types);
+ orc_reader->set_push_down_agg_type(_get_push_down_agg_type());
if (push_down_predicates) {
RETURN_IF_ERROR(_process_late_arrival_conjuncts());
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/TableValuedFunctionRef.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/TableValuedFunctionRef.java
index b1e7c7c89e9..9eacb8a0422 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/analysis/TableValuedFunctionRef.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/analysis/TableValuedFunctionRef.java
@@ -26,6 +26,7 @@ import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.SessionVariable;
import org.apache.doris.tablefunction.BackendsTableValuedFunction;
import org.apache.doris.tablefunction.LocalTableValuedFunction;
import org.apache.doris.tablefunction.TableValuedFunctionIf;
@@ -119,8 +120,8 @@ public class TableValuedFunctionRef extends TableRef {
analyzeJoin(analyzer);
}
- public ScanNode getScanNode(PlanNodeId id) {
- return tableFunction.getScanNode(id, desc);
+ public ScanNode getScanNode(PlanNodeId id, SessionVariable sv) {
+ return tableFunction.getScanNode(id, desc, sv);
}
public TableValuedFunctionIf getTableFunction() {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
index c4fcaebbb19..1f625d50f53 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
@@ -39,6 +39,7 @@ import org.apache.doris.datasource.hive.source.HiveScanNode;
import org.apache.doris.datasource.hive.source.HiveSplit;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.SessionVariable;
import org.apache.doris.spi.Split;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.system.Backend;
@@ -94,6 +95,9 @@ public abstract class FileQueryScanNode extends FileScanNode {
protected String brokerName;
protected TableSnapshot tableSnapshot;
+    // Save a reference to the session variable, so that we don't need to get it
from the connection context.
+    // The connection context is a thread local variable; it is not available if
running in another thread.
+ protected SessionVariable sessionVariable;
/**
* External file scan node for Query hms table
@@ -102,8 +106,10 @@ public abstract class FileQueryScanNode extends
FileScanNode {
* These scan nodes do not have corresponding catalog/database/table info,
so no need to do priv check
*/
public FileQueryScanNode(PlanNodeId id, TupleDescriptor desc, String
planNodeName,
- StatisticalType statisticalType, boolean
needCheckColumnPriv) {
+ StatisticalType statisticalType, boolean needCheckColumnPriv,
+ SessionVariable sv) {
super(id, desc, planNodeName, statisticalType, needCheckColumnPriv);
+ this.sessionVariable = sv;
}
@Override
@@ -112,7 +118,6 @@ public abstract class FileQueryScanNode extends
FileScanNode {
ConnectContext.get().getExecutor().getSummaryProfile().setInitScanNodeStartTime();
}
super.init(analyzer);
- initFileSplitSize();
doInitialize();
if (ConnectContext.get().getExecutor() != null) {
ConnectContext.get().getExecutor().getSummaryProfile().setInitScanNodeFinishTime();
@@ -313,6 +318,7 @@ public abstract class FileQueryScanNode extends
FileScanNode {
params.setProperties(locationProperties);
}
+ int numBackends = backendPolicy.numBackends();
List<String> pathPartitionKeys = getPathPartitionKeys();
if (isBatchMode()) {
// File splits are generated lazily, and fetched by backends while
scanning.
@@ -354,7 +360,7 @@ public abstract class FileQueryScanNode extends
FileScanNode {
setLocationPropertiesIfNecessary(backend, locationType,
locationProperties);
}
} else {
- List<Split> inputSplits = getSplits();
+ List<Split> inputSplits = getSplits(numBackends);
if (ConnectContext.get().getExecutor() != null) {
ConnectContext.get().getExecutor().getSummaryProfile().setGetSplitsFinishTime();
}
@@ -603,4 +609,19 @@ public abstract class FileQueryScanNode extends
FileScanNode {
}
return this.tableSnapshot;
}
+
+ /**
+ * The real file split size is determined by:
+     * 1. If the user specifies the split size via the session variable
`file_split_size`, use the user-specified value.
+     * 2. Otherwise, use the max value of DEFAULT_SPLIT_SIZE and the block size.
+     * @param blockSize the block size obtained from the file system, e.g. HDFS
+ * @return the real file split size
+ */
+ protected long getRealFileSplitSize(long blockSize) {
+ long realSplitSize = sessionVariable.getFileSplitSize();
+ if (realSplitSize <= 0) {
+ realSplitSize = Math.max(DEFAULT_SPLIT_SIZE, blockSize);
+ }
+ return realSplitSize;
+ }
}
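A minimal sketch of how the new getRealFileSplitSize rule resolves, assuming the session variable defaults to 0 and DEFAULT_SPLIT_SIZE is 128 MB (the constant's value is an assumption here, not taken from this patch):

    // Sketch of the split-size rule above; the DEFAULT_SPLIT_SIZE value is assumed.
    public class SplitSizeRuleSketch {
        static final long DEFAULT_SPLIT_SIZE = 128L * 1024 * 1024; // assumed 128 MB

        static long getRealFileSplitSize(long sessionFileSplitSize, long blockSize) {
            // a positive `file_split_size` session variable always wins
            return sessionFileSplitSize > 0
                    ? sessionFileSplitSize
                    : Math.max(DEFAULT_SPLIT_SIZE, blockSize);
        }

        public static void main(String[] args) {
            // session variable unset: the larger of DEFAULT_SPLIT_SIZE and the block size
            System.out.println(getRealFileSplitSize(0, 256L * 1024 * 1024));                 // 268435456 (256 MB)
            // session variable set: it overrides both
            System.out.println(getRealFileSplitSize(64L * 1024 * 1024, 256L * 1024 * 1024)); // 67108864 (64 MB)
        }
    }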
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
index 1b2b15c0ec5..2f346376955 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
@@ -25,15 +25,10 @@ import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.UserException;
-import org.apache.doris.common.util.LocationPath;
-import org.apache.doris.common.util.Util;
import org.apache.doris.planner.PlanNodeId;
-import org.apache.doris.qe.ConnectContext;
-import org.apache.doris.spi.Split;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.thrift.TExplainLevel;
import org.apache.doris.thrift.TExpr;
-import org.apache.doris.thrift.TFileCompressType;
import org.apache.doris.thrift.TFileRangeDesc;
import org.apache.doris.thrift.TFileScanNode;
import org.apache.doris.thrift.TFileScanRangeParams;
@@ -46,11 +41,9 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
-import org.apache.hadoop.fs.BlockLocation;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-import java.io.IOException;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
@@ -67,8 +60,6 @@ public abstract class FileScanNode extends ExternalScanNode {
// For explain
protected long totalFileSize = 0;
protected long totalPartitionNum = 0;
- protected long fileSplitSize;
- protected boolean isSplitSizeSetBySession = false;
public FileScanNode(PlanNodeId id, TupleDescriptor desc, String
planNodeName, StatisticalType statisticalType,
boolean needCheckColumnPriv) {
@@ -76,19 +67,6 @@ public abstract class FileScanNode extends ExternalScanNode {
this.needCheckColumnPriv = needCheckColumnPriv;
}
- @Override
- public void init() throws UserException {
- initFileSplitSize();
- }
-
- protected void initFileSplitSize() {
- this.fileSplitSize =
ConnectContext.get().getSessionVariable().getFileSplitSize();
- this.isSplitSizeSetBySession = this.fileSplitSize > 0;
- if (this.fileSplitSize <= 0) {
- this.fileSplitSize = DEFAULT_SPLIT_SIZE;
- }
- }
-
@Override
protected void toThrift(TPlanNode planNode) {
planNode.setPushDownAggTypeOpt(pushDownAggNoGroupingOp);
@@ -248,61 +226,4 @@ public abstract class FileScanNode extends
ExternalScanNode {
}
}
}
-
- protected List<Split> splitFile(LocationPath path, long blockSize,
BlockLocation[] blockLocations, long length,
- long modificationTime, boolean splittable, List<String>
partitionValues, SplitCreator splitCreator)
- throws IOException {
- if (blockLocations == null) {
- blockLocations = new BlockLocation[0];
- }
- List<Split> result = Lists.newArrayList();
- TFileCompressType compressType =
Util.inferFileCompressTypeByPath(path.get());
- if (!splittable || compressType != TFileCompressType.PLAIN) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Path {} is not splittable.", path);
- }
- String[] hosts = blockLocations.length == 0 ? null :
blockLocations[0].getHosts();
- result.add(splitCreator.create(path, 0, length, length,
modificationTime, hosts, partitionValues));
- return result;
- }
- // if file split size is set by session variable, use session variable.
- // Otherwise, use max(file split size, block size)
- if (!isSplitSizeSetBySession) {
- fileSplitSize = Math.max(fileSplitSize, blockSize);
- }
- long bytesRemaining;
- for (bytesRemaining = length; (double) bytesRemaining / (double)
fileSplitSize > 1.1D;
- bytesRemaining -= fileSplitSize) {
- int location = getBlockIndex(blockLocations, length -
bytesRemaining);
- String[] hosts = location == -1 ? null :
blockLocations[location].getHosts();
- result.add(splitCreator.create(path, length - bytesRemaining,
fileSplitSize,
- length, modificationTime, hosts, partitionValues));
- }
- if (bytesRemaining != 0L) {
- int location = getBlockIndex(blockLocations, length -
bytesRemaining);
- String[] hosts = location == -1 ? null :
blockLocations[location].getHosts();
- result.add(splitCreator.create(path, length - bytesRemaining,
bytesRemaining,
- length, modificationTime, hosts, partitionValues));
- }
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Path {} includes {} splits.", path, result.size());
- }
- return result;
- }
-
- protected int getBlockIndex(BlockLocation[] blkLocations, long offset) {
- if (blkLocations == null || blkLocations.length == 0) {
- return -1;
- }
- for (int i = 0; i < blkLocations.length; ++i) {
- if (blkLocations[i].getOffset() <= offset
- && offset < blkLocations[i].getOffset() +
blkLocations[i].getLength()) {
- return i;
- }
- }
- BlockLocation last = blkLocations[blkLocations.length - 1];
- long fileLength = last.getOffset() + last.getLength() - 1L;
- throw new IllegalArgumentException(String.format("Offset %d is outside
of file (0..%d)", offset, fileLength));
- }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileSplit.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileSplit.java
index 1ebb390e904..37e66c7056f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileSplit.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileSplit.java
@@ -87,9 +87,12 @@ public class FileSplit implements Split {
@Override
public Split create(LocationPath path, long start, long length, long
fileLength,
+ long fileSplitSize,
long modificationTime, String[] hosts,
List<String> partitionValues) {
- return new FileSplit(path, start, length, fileLength,
modificationTime, hosts, partitionValues);
+ FileSplit split = new FileSplit(path, start, length, fileLength,
modificationTime, hosts, partitionValues);
+ split.setTargetSplitSize(fileSplitSize);
+ return split;
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileSplitter.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileSplitter.java
new file mode 100644
index 00000000000..b923c87d3ac
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileSplitter.java
@@ -0,0 +1,104 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource;
+
+import org.apache.doris.common.util.LocationPath;
+import org.apache.doris.common.util.Util;
+import org.apache.doris.spi.Split;
+import org.apache.doris.thrift.TFileCompressType;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.IOException;
+import java.util.List;
+
+public class FileSplitter {
+ private static final Logger LOG = LogManager.getLogger(FileSplitter.class);
+
+ // If the number of files is larger than parallel instances * num of
backends,
+ // we don't need to split the file.
+ // Otherwise, split the file to avoid local shuffle.
+ public static boolean needSplitForCountPushdown(int parallelism, int
numBackends, long totalFileNum) {
+ return totalFileNum < parallelism * numBackends;
+ }
+
+ public static List<Split> splitFile(
+ LocationPath path,
+ long fileSplitSize,
+ BlockLocation[] blockLocations,
+ long length,
+ long modificationTime,
+ boolean splittable,
+ List<String> partitionValues,
+ SplitCreator splitCreator)
+ throws IOException {
+ if (blockLocations == null) {
+ blockLocations = new BlockLocation[0];
+ }
+ List<Split> result = Lists.newArrayList();
+ TFileCompressType compressType =
Util.inferFileCompressTypeByPath(path.get());
+ if (!splittable || compressType != TFileCompressType.PLAIN) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Path {} is not splittable.", path);
+ }
+ String[] hosts = blockLocations.length == 0 ? null :
blockLocations[0].getHosts();
+ result.add(splitCreator.create(path, 0, length, length,
fileSplitSize,
+ modificationTime, hosts, partitionValues));
+ return result;
+ }
+ long bytesRemaining;
+ for (bytesRemaining = length; (double) bytesRemaining / (double)
fileSplitSize > 1.1D;
+ bytesRemaining -= fileSplitSize) {
+ int location = getBlockIndex(blockLocations, length -
bytesRemaining);
+ String[] hosts = location == -1 ? null :
blockLocations[location].getHosts();
+ result.add(splitCreator.create(path, length - bytesRemaining,
fileSplitSize,
+ length, fileSplitSize, modificationTime, hosts,
partitionValues));
+ }
+ if (bytesRemaining != 0L) {
+ int location = getBlockIndex(blockLocations, length -
bytesRemaining);
+ String[] hosts = location == -1 ? null :
blockLocations[location].getHosts();
+ result.add(splitCreator.create(path, length - bytesRemaining,
bytesRemaining,
+ length, fileSplitSize, modificationTime, hosts,
partitionValues));
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Path {} includes {} splits.", path, result.size());
+ }
+ return result;
+ }
+
+ private static int getBlockIndex(BlockLocation[] blkLocations, long
offset) {
+ if (blkLocations == null || blkLocations.length == 0) {
+ return -1;
+ }
+ for (int i = 0; i < blkLocations.length; ++i) {
+ if (blkLocations[i].getOffset() <= offset
+ && offset < blkLocations[i].getOffset() +
blkLocations[i].getLength()) {
+ return i;
+ }
+ }
+ BlockLocation last = blkLocations[blkLocations.length - 1];
+ long fileLength = last.getOffset() + last.getLength() - 1L;
+ throw new IllegalArgumentException(String.format("Offset %d is outside
of file (0..%d)", offset, fileLength));
+ }
+
+
+}
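A small usage sketch of the needSplitForCountPushdown rule defined above, with hypothetical numbers: 8 parallel instances and 3 backends give 24 "slots", so fewer than 24 files are split to keep every instance busy, while 24 or more files are left whole.

    // Hypothetical numbers illustrating FileSplitter.needSplitForCountPushdown.
    public class CountPushdownSplitSketch {
        static boolean needSplitForCountPushdown(int parallelism, int numBackends, long totalFileNum) {
            return totalFileNum < parallelism * numBackends;
        }

        public static void main(String[] args) {
            // 10 files vs. 8 instances * 3 backends = 24 slots -> split to keep all instances busy
            System.out.println(needSplitForCountPushdown(8, 3, 10));  // true
            // 100 files already exceed the 24 slots -> no need to split each file
            System.out.println(needSplitForCountPushdown(8, 3, 100)); // false
        }
    }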
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitAssignment.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitAssignment.java
index 928854b91d1..a26abc7fc5e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitAssignment.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitAssignment.java
@@ -69,7 +69,7 @@ public class SplitAssignment {
}
public void init() throws UserException {
- splitGenerator.startSplit();
+ splitGenerator.startSplit(backendPolicy.numBackends());
synchronized (assignLock) {
while (sampleSplit == null && waitFirstSplit()) {
try {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitCreator.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitCreator.java
index 4df30459db7..6df84d2f0f5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitCreator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitCreator.java
@@ -23,6 +23,6 @@ import org.apache.doris.spi.Split;
import java.util.List;
public interface SplitCreator {
- Split create(LocationPath path, long start, long length, long fileLength,
+ Split create(LocationPath path, long start, long length, long fileLength,
long fileSplitSize,
long modificationTime, String[] hosts, List<String>
partitionValues);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitGenerator.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitGenerator.java
index c4a373bc85b..34ff3911445 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitGenerator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitGenerator.java
@@ -30,8 +30,9 @@ import java.util.List;
public interface SplitGenerator {
/**
* Get all file splits if the producer doesn't support batch mode.
+     * @param numBackends the number of backends; useful when determining the
number of splits.
*/
- default List<Split> getSplits() throws UserException {
+ default List<Split> getSplits(int numBackends) throws UserException {
// todo: remove this interface if batch mode is stable
throw new NotImplementedException("Not implement");
}
@@ -51,7 +52,7 @@ public interface SplitGenerator {
return -1;
}
- default void startSplit() {
+ default void startSplit(int numBackends) {
}
/**
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
index 84ead8fe43c..88dfb8f3575 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
@@ -32,6 +32,7 @@ import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.FileQueryScanNode;
import org.apache.doris.datasource.FileSplit;
+import org.apache.doris.datasource.FileSplitter;
import org.apache.doris.datasource.hive.HMSExternalCatalog;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.datasource.hive.HiveMetaStoreCache;
@@ -44,12 +45,14 @@ import
org.apache.doris.datasource.hive.source.HiveSplit.HiveSplitCreator;
import
org.apache.doris.nereids.trees.plans.logical.LogicalFileScan.SelectedPartitions;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.SessionVariable;
import org.apache.doris.spi.Split;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.thrift.TFileAttributes;
import org.apache.doris.thrift.TFileCompressType;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileTextScanRangeParams;
+import org.apache.doris.thrift.TPushAggOp;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
@@ -98,15 +101,13 @@ public class HiveScanNode extends FileQueryScanNode {
* eg: s3 tvf
* These scan nodes do not have corresponding catalog/database/table info,
so no need to do priv check
*/
- public HiveScanNode(PlanNodeId id, TupleDescriptor desc, boolean
needCheckColumnPriv) {
- super(id, desc, "HIVE_SCAN_NODE", StatisticalType.HIVE_SCAN_NODE,
needCheckColumnPriv);
- hmsTable = (HMSExternalTable) desc.getTable();
- brokerName = hmsTable.getCatalog().bindBrokerName();
+ public HiveScanNode(PlanNodeId id, TupleDescriptor desc, boolean
needCheckColumnPriv, SessionVariable sv) {
+ this(id, desc, "HIVE_SCAN_NODE", StatisticalType.HIVE_SCAN_NODE,
needCheckColumnPriv, sv);
}
public HiveScanNode(PlanNodeId id, TupleDescriptor desc, String
planNodeName,
- StatisticalType statisticalType, boolean
needCheckColumnPriv) {
- super(id, desc, planNodeName, statisticalType, needCheckColumnPriv);
+ StatisticalType statisticalType, boolean needCheckColumnPriv,
SessionVariable sv) {
+ super(id, desc, planNodeName, statisticalType, needCheckColumnPriv,
sv);
hmsTable = (HMSExternalTable) desc.getTable();
brokerName = hmsTable.getCatalog().bindBrokerName();
}
@@ -163,7 +164,7 @@ public class HiveScanNode extends FileQueryScanNode {
}
@Override
- public List<Split> getSplits() throws UserException {
+ public List<Split> getSplits(int numBackends) throws UserException {
long start = System.currentTimeMillis();
try {
if (!partitionInit) {
@@ -174,7 +175,7 @@ public class HiveScanNode extends FileQueryScanNode {
.getMetaStoreCache((HMSExternalCatalog)
hmsTable.getCatalog());
String bindBrokerName = hmsTable.getCatalog().bindBrokerName();
List<Split> allFiles = Lists.newArrayList();
- getFileSplitByPartitions(cache, prunedPartitions, allFiles,
bindBrokerName);
+ getFileSplitByPartitions(cache, prunedPartitions, allFiles,
bindBrokerName, numBackends);
if (ConnectContext.get().getExecutor() != null) {
ConnectContext.get().getExecutor().getSummaryProfile().setGetPartitionFilesFinishTime();
}
@@ -193,7 +194,7 @@ public class HiveScanNode extends FileQueryScanNode {
}
@Override
- public void startSplit() {
+ public void startSplit(int numBackends) {
if (prunedPartitions.isEmpty()) {
splitAssignment.finishSchedule();
return;
@@ -214,12 +215,12 @@ public class HiveScanNode extends FileQueryScanNode {
try {
List<Split> allFiles = Lists.newArrayList();
getFileSplitByPartitions(
- cache,
Collections.singletonList(partition), allFiles, bindBrokerName);
+ cache,
Collections.singletonList(partition), allFiles, bindBrokerName, numBackends);
if (allFiles.size() > numSplitsPerPartition.get())
{
numSplitsPerPartition.set(allFiles.size());
}
splitAssignment.addToQueue(allFiles);
- } catch (IOException e) {
+ } catch (Exception e) {
batchException.set(new
UserException(e.getMessage(), e));
} finally {
splittersOnFlight.release();
@@ -263,7 +264,7 @@ public class HiveScanNode extends FileQueryScanNode {
}
private void getFileSplitByPartitions(HiveMetaStoreCache cache,
List<HivePartition> partitions,
- List<Split> allFiles, String
bindBrokerName) throws IOException {
+ List<Split> allFiles, String bindBrokerName, int numBackends)
throws IOException, UserException {
List<FileCacheValue> fileCaches;
if (hiveTransaction != null) {
fileCaches = getFileSplitByTransaction(cache, partitions,
bindBrokerName);
@@ -276,11 +277,34 @@ public class HiveScanNode extends FileQueryScanNode {
splitAllFiles(allFiles, hiveFileStatuses);
return;
}
+
+ /**
+         * If the push down aggregation operator is COUNT,
+         * we don't need to split the file, because for the parquet/orc formats
only metadata is read.
+         * If we split the file, we would read the metadata of a file multiple
times, which is not efficient.
+         *
+         * - Hive Transactional Tables may need merge on read, so do not apply
this optimization.
+         * - If the file format is not parquet/orc, e.g. text, we need to split
the file to increase the parallelism.
+ */
+ boolean needSplit = true;
+ if (getPushDownAggNoGroupingOp() == TPushAggOp.COUNT
+ && hiveTransaction != null) {
+ int totalFileNum = 0;
+ for (FileCacheValue fileCacheValue : fileCaches) {
+ if (fileCacheValue.getFiles() != null) {
+ totalFileNum += fileCacheValue.getFiles().size();
+ }
+ }
+ int parallelNum = sessionVariable.getParallelExecInstanceNum();
+ needSplit = FileSplitter.needSplitForCountPushdown(parallelNum,
numBackends, totalFileNum);
+ }
for (HiveMetaStoreCache.FileCacheValue fileCacheValue : fileCaches) {
if (fileCacheValue.getFiles() != null) {
boolean isSplittable = fileCacheValue.isSplittable();
for (HiveMetaStoreCache.HiveFileStatus status :
fileCacheValue.getFiles()) {
- allFiles.addAll(splitFile(status.getPath(),
status.getBlockSize(),
+ allFiles.addAll(FileSplitter.splitFile(status.getPath(),
+ // set block size to Long.MAX_VALUE to avoid
splitting the file.
+ getRealFileSplitSize(needSplit ?
status.getBlockSize() : Long.MAX_VALUE),
status.getBlockLocations(), status.getLength(),
status.getModificationTime(),
isSplittable, fileCacheValue.getPartitionValues(),
new
HiveSplitCreator(fileCacheValue.getAcidInfo())));
@@ -292,7 +316,7 @@ public class HiveScanNode extends FileQueryScanNode {
private void splitAllFiles(List<Split> allFiles,
List<HiveMetaStoreCache.HiveFileStatus>
hiveFileStatuses) throws IOException {
for (HiveMetaStoreCache.HiveFileStatus status : hiveFileStatuses) {
- allFiles.addAll(splitFile(status.getPath(), status.getBlockSize(),
+ allFiles.addAll(FileSplitter.splitFile(status.getPath(),
getRealFileSplitSize(status.getBlockSize()),
status.getBlockLocations(), status.getLength(),
status.getModificationTime(),
status.isSplittable(), status.getPartitionValues(),
new HiveSplitCreator(status.getAcidInfo())));
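The interaction between the two pieces above: when the COUNT pushdown applies, HiveScanNode passes Long.MAX_VALUE as the block size, so getRealFileSplitSize (with the session variable unset) resolves to a split size no file can exceed and each file stays as a single split. A tiny sketch under the same assumed DEFAULT_SPLIT_SIZE as before:

    // Sketch: a Long.MAX_VALUE block size effectively disables splitting,
    // because the resolved split size is always >= the file length.
    public class NoSplitForCountSketch {
        static final long DEFAULT_SPLIT_SIZE = 128L * 1024 * 1024; // assumed value

        static long getRealFileSplitSize(long sessionFileSplitSize, long blockSize) {
            return sessionFileSplitSize > 0 ? sessionFileSplitSize : Math.max(DEFAULT_SPLIT_SIZE, blockSize);
        }

        public static void main(String[] args) {
            long fileLength = 2L * 1024 * 1024 * 1024;                // a hypothetical 2 GB file
            long splitSize = getRealFileSplitSize(0, Long.MAX_VALUE); // == Long.MAX_VALUE
            // FileSplitter.splitFile only cuts when length / splitSize > 1.1, so this file stays whole.
            System.out.println((double) fileLength / (double) splitSize > 1.1D); // false -> one split
        }
    }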
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveSplit.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveSplit.java
index 5dd63e734c9..58bfb32e617 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveSplit.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveSplit.java
@@ -54,9 +54,13 @@ public class HiveSplit extends FileSplit {
@Override
public Split create(LocationPath path, long start, long length, long
fileLength,
+ long fileSplitSize,
long modificationTime, String[] hosts,
List<String> partitionValues) {
- return new HiveSplit(path, start, length, fileLength,
modificationTime, hosts, partitionValues, acidInfo);
+ HiveSplit split = new HiveSplit(path, start, length, fileLength,
modificationTime,
+ hosts, partitionValues, acidInfo);
+ split.setTargetSplitSize(fileSplitSize);
+ return split;
}
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
index b2cad8ab710..e1dfaa40aef 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
@@ -125,8 +125,8 @@ public class HudiScanNode extends HiveScanNode {
*/
public HudiScanNode(PlanNodeId id, TupleDescriptor desc, boolean
needCheckColumnPriv,
Optional<TableScanParams> scanParams,
Optional<IncrementalRelation> incrementalRelation,
- SessionVariable sessionVariable) {
- super(id, desc, "HUDI_SCAN_NODE", StatisticalType.HUDI_SCAN_NODE,
needCheckColumnPriv);
+ SessionVariable sv) {
+ super(id, desc, "HUDI_SCAN_NODE", StatisticalType.HUDI_SCAN_NODE,
needCheckColumnPriv, sv);
isCowTable = hmsTable.isHoodieCowTable();
if (LOG.isDebugEnabled()) {
if (isCowTable) {
@@ -390,7 +390,7 @@ public class HudiScanNode extends HiveScanNode {
}
@Override
- public List<Split> getSplits() throws UserException {
+ public List<Split> getSplits(int numBackends) throws UserException {
if (incrementalRead && !incrementalRelation.fallbackFullTableScan()) {
return getIncrementalSplits();
}
@@ -406,7 +406,7 @@ public class HudiScanNode extends HiveScanNode {
}
@Override
- public void startSplit() {
+ public void startSplit(int numBackends) {
if (prunedPartitions.isEmpty()) {
splitAssignment.finishSchedule();
return;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
index f7b58158d1a..c78140b9d3c 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
@@ -36,7 +36,7 @@ import
org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
import org.apache.doris.datasource.iceberg.IcebergExternalTable;
import org.apache.doris.datasource.iceberg.IcebergUtils;
import org.apache.doris.planner.PlanNodeId;
-import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.SessionVariable;
import org.apache.doris.spi.Split;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.thrift.TExplainLevel;
@@ -87,7 +87,13 @@ public class IcebergScanNode extends FileQueryScanNode {
private IcebergSource source;
private Table icebergTable;
private List<String> pushdownIcebergPredicates = Lists.newArrayList();
- private boolean pushDownCount = false;
+    // If tableLevelPushDownCount is true, it means we can do count push down opt
at the table level,
+    // which means no split has position/equality delete files,
+    // so for a query like "select count(*) from ice_tbl", we can get the count from
the snapshot row count info directly.
+    // If tableLevelPushDownCount is false, it means we can't do count push down
opt at the table level.
+    // But for the splits that have no position/equality delete files, we
can still do count push down opt.
+    // For split-level count push down opt, the flag is set in each split.
+ private boolean tableLevelPushDownCount = false;
private static final long COUNT_WITH_PARALLEL_SPLITS = 10000;
/**
@@ -96,8 +102,8 @@ public class IcebergScanNode extends FileQueryScanNode {
* eg: s3 tvf
* These scan nodes do not have corresponding catalog/database/table info,
so no need to do priv check
*/
- public IcebergScanNode(PlanNodeId id, TupleDescriptor desc, boolean
needCheckColumnPriv) {
- super(id, desc, "ICEBERG_SCAN_NODE",
StatisticalType.ICEBERG_SCAN_NODE, needCheckColumnPriv);
+ public IcebergScanNode(PlanNodeId id, TupleDescriptor desc, boolean
needCheckColumnPriv, SessionVariable sv) {
+ super(id, desc, "ICEBERG_SCAN_NODE",
StatisticalType.ICEBERG_SCAN_NODE, needCheckColumnPriv, sv);
ExternalTable table = (ExternalTable) desc.getTable();
if (table instanceof HMSExternalTable) {
@@ -140,8 +146,8 @@ public class IcebergScanNode extends FileQueryScanNode {
int formatVersion = icebergSplit.getFormatVersion();
fileDesc.setFormatVersion(formatVersion);
fileDesc.setOriginalFilePath(icebergSplit.getOriginalPath());
- if (pushDownCount) {
- fileDesc.setRowCount(icebergSplit.getRowCount());
+ if (tableLevelPushDownCount) {
+ fileDesc.setRowCount(icebergSplit.getTableLevelRowCount());
}
if (formatVersion < MIN_DELETE_FILE_SUPPORT_VERSION) {
fileDesc.setContent(FileContent.DATA.id());
@@ -177,11 +183,12 @@ public class IcebergScanNode extends FileQueryScanNode {
}
@Override
- public List<Split> getSplits() throws UserException {
- return
HiveMetaStoreClientHelper.ugiDoAs(source.getCatalog().getConfiguration(),
this::doGetSplits);
+ public List<Split> getSplits(int numBackends) throws UserException {
+ return
HiveMetaStoreClientHelper.ugiDoAs(source.getCatalog().getConfiguration(),
+ () -> doGetSplits(numBackends));
}
- private List<Split> doGetSplits() throws UserException {
+ private List<Split> doGetSplits(int numBackends) throws UserException {
TableScan scan = icebergTable.newScan();
// set snapshot
@@ -209,9 +216,10 @@ public class IcebergScanNode extends FileQueryScanNode {
HashSet<String> partitionPathSet = new HashSet<>();
boolean isPartitionedTable = icebergTable.spec().isPartitioned();
- CloseableIterable<FileScanTask> fileScanTasks =
TableScanUtil.splitFiles(scan.planFiles(), fileSplitSize);
+ long realFileSplitSize = getRealFileSplitSize(0);
+ CloseableIterable<FileScanTask> fileScanTasks =
TableScanUtil.splitFiles(scan.planFiles(), realFileSplitSize);
try (CloseableIterable<CombinedScanTask> combinedScanTasks =
- TableScanUtil.planTasks(fileScanTasks, fileSplitSize, 1, 0)) {
+ TableScanUtil.planTasks(fileScanTasks, realFileSplitSize, 1,
0)) {
combinedScanTasks.forEach(taskGrp ->
taskGrp.files().forEach(splitTask -> {
List<String> partitionValues = new ArrayList<>();
if (isPartitionedTable) {
@@ -250,6 +258,7 @@ public class IcebergScanNode extends FileQueryScanNode {
source.getCatalog().getProperties(),
partitionValues,
originalPath);
+ split.setTargetSplitSize(realFileSplitSize);
if (formatVersion >= MIN_DELETE_FILE_SUPPORT_VERSION) {
split.setDeleteFileFilters(getDeleteFileFilters(splitTask));
}
@@ -268,11 +277,11 @@ public class IcebergScanNode extends FileQueryScanNode {
}
long countFromSnapshot = getCountFromSnapshot();
if (countFromSnapshot >= 0) {
- pushDownCount = true;
+ tableLevelPushDownCount = true;
List<Split> pushDownCountSplits;
if (countFromSnapshot > COUNT_WITH_PARALLEL_SPLITS) {
- int parallelNum =
ConnectContext.get().getSessionVariable().getParallelExecInstanceNum();
- pushDownCountSplits = splits.subList(0,
Math.min(splits.size(), parallelNum));
+ int minSplits =
sessionVariable.getParallelExecInstanceNum() * numBackends;
+ pushDownCountSplits = splits.subList(0,
Math.min(splits.size(), minSplits));
} else {
pushDownCountSplits =
Collections.singletonList(splits.get(0));
}
@@ -282,7 +291,6 @@ public class IcebergScanNode extends FileQueryScanNode {
}
selectedPartitionNum = partitionPathSet.size();
- splits.forEach(s -> s.setTargetSplitSize(fileSplitSize));
return splits;
}
@@ -422,8 +430,8 @@ public class IcebergScanNode extends FileQueryScanNode {
int size = splits.size();
long countPerSplit = totalCount / size;
for (int i = 0; i < size - 1; i++) {
- ((IcebergSplit) splits.get(i)).setRowCount(countPerSplit);
+ ((IcebergSplit)
splits.get(i)).setTableLevelRowCount(countPerSplit);
}
- ((IcebergSplit) splits.get(size - 1)).setRowCount(countPerSplit +
totalCount % size);
+ ((IcebergSplit) splits.get(size -
1)).setTableLevelRowCount(countPerSplit + totalCount % size);
}
}
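A minimal sketch of the count distribution done by the Iceberg changes above when the snapshot count exceeds COUNT_WITH_PARALLEL_SPLITS: each retained split is assigned totalCount / size rows and the last split also absorbs the remainder (the totals below are hypothetical).

    // Hypothetical totals illustrating how the snapshot row count is spread over the retained splits.
    public class AssignCountSketch {
        public static void main(String[] args) {
            long totalCount = 100_003;          // e.g. row count from the Iceberg snapshot
            int size = 4;                       // number of retained splits
            long countPerSplit = totalCount / size;
            long[] perSplit = new long[size];
            for (int i = 0; i < size - 1; i++) {
                perSplit[i] = countPerSplit;
            }
            perSplit[size - 1] = countPerSplit + totalCount % size; // last split takes the remainder
            for (long c : perSplit) {
                System.out.println(c);          // 25000, 25000, 25000, 25003 -> sums to 100003
            }
        }
    }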
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSplit.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSplit.java
index 580d3cf1bb2..0520612935a 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSplit.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSplit.java
@@ -37,7 +37,8 @@ public class IcebergSplit extends FileSplit {
private Integer formatVersion;
private List<IcebergDeleteFileFilter> deleteFileFilters;
private Map<String, String> config;
- private long rowCount = -1;
+    // tableLevelRowCount will be set only if table-level count push down opt is
available.
+ private long tableLevelRowCount = -1;
// File path will be changed if the file is modified, so there's no need
to get modification time.
public IcebergSplit(LocationPath file, long start, long length, long
fileLength, String[] hosts,
@@ -50,14 +51,6 @@ public class IcebergSplit extends FileSplit {
this.selfSplitWeight = length;
}
- public long getRowCount() {
- return rowCount;
- }
-
- public void setRowCount(long rowCount) {
- this.rowCount = rowCount;
- }
-
public void setDeleteFileFilters(List<IcebergDeleteFileFilter>
deleteFileFilters) {
this.deleteFileFilters = deleteFileFilters;
this.selfSplitWeight +=
deleteFileFilters.stream().mapToLong(IcebergDeleteFileFilter::getFilesize).sum();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNode.java
index 9fa22a0fffa..e4bb8b5e9dc 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNode.java
@@ -44,7 +44,7 @@ import
org.apache.doris.datasource.property.constants.MCProperties;
import
org.apache.doris.nereids.trees.plans.logical.LogicalFileScan.SelectedPartitions;
import org.apache.doris.nereids.util.DateUtils;
import org.apache.doris.planner.PlanNodeId;
-import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.SessionVariable;
import org.apache.doris.spi.Split;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.thrift.TFileFormatType;
@@ -107,21 +107,23 @@ public class MaxComputeScanNode extends FileQueryScanNode
{
// For new planner
public MaxComputeScanNode(PlanNodeId id, TupleDescriptor desc,
- SelectedPartitions selectedPartitions, boolean
needCheckColumnPriv) {
+ SelectedPartitions selectedPartitions, boolean needCheckColumnPriv,
+ SessionVariable sv) {
this(id, desc, "MCScanNode", StatisticalType.MAX_COMPUTE_SCAN_NODE,
- selectedPartitions, needCheckColumnPriv);
+ selectedPartitions, needCheckColumnPriv, sv);
}
// For old planner
- public MaxComputeScanNode(PlanNodeId id, TupleDescriptor desc, boolean
needCheckColumnPriv) {
+ public MaxComputeScanNode(PlanNodeId id, TupleDescriptor desc, boolean
needCheckColumnPriv,
+ SessionVariable sv) {
this(id, desc, "MCScanNode", StatisticalType.MAX_COMPUTE_SCAN_NODE,
- SelectedPartitions.NOT_PRUNED, needCheckColumnPriv);
+ SelectedPartitions.NOT_PRUNED, needCheckColumnPriv, sv);
}
private MaxComputeScanNode(PlanNodeId id, TupleDescriptor desc, String
planNodeName,
StatisticalType statisticalType, SelectedPartitions
selectedPartitions,
- boolean needCheckColumnPriv) {
- super(id, desc, planNodeName, statisticalType, needCheckColumnPriv);
+ boolean needCheckColumnPriv, SessionVariable sv) {
+ super(id, desc, planNodeName, statisticalType, needCheckColumnPriv,
sv);
table = (MaxComputeExternalTable) desc.getTable();
this.selectedPartitions = selectedPartitions;
}
@@ -214,7 +216,7 @@ public class MaxComputeScanNode extends FileQueryScanNode {
return false;
}
- int numPartitions =
ConnectContext.get().getSessionVariable().getNumPartitionsInBatchMode();
+ int numPartitions = sessionVariable.getNumPartitionsInBatchMode();
return numPartitions > 0
&& selectedPartitions != SelectedPartitions.NOT_PRUNED
&& selectedPartitions.selectedPartitions.size() >=
numPartitions;
@@ -226,7 +228,7 @@ public class MaxComputeScanNode extends FileQueryScanNode {
}
@Override
- public void startSplit() {
+ public void startSplit(int numBackends) {
this.totalPartitionNum = selectedPartitions.totalPartitionNum;
this.selectedPartitionNum =
selectedPartitions.selectedPartitions.size();
@@ -241,8 +243,7 @@ public class MaxComputeScanNode extends FileQueryScanNode {
(key, value) -> requiredPartitionSpecs.add(new
PartitionSpec(key))
);
-
- int batchNumPartitions =
ConnectContext.get().getSessionVariable().getNumPartitionsInBatchMode();
+ int batchNumPartitions = sessionVariable.getNumPartitionsInBatchMode();
Executor scheduleExecutor =
Env.getCurrentEnv().getExtMetaCacheMgr().getScheduleExecutor();
AtomicReference<UserException> batchException = new
AtomicReference<>(null);
@@ -546,7 +547,7 @@ public class MaxComputeScanNode extends FileQueryScanNode {
return new HashMap<>();
}
- List<Split> getSplitByTableSession(TableBatchReadSession
tableBatchReadSession) throws java.io.IOException {
+ private List<Split> getSplitByTableSession(TableBatchReadSession
tableBatchReadSession) throws IOException {
List<Split> result = new ArrayList<>();
String scanSessionSerialize = serializeSession(tableBatchReadSession);
InputSplitAssigner assigner =
tableBatchReadSession.getInputSplitAssigner();
@@ -595,9 +596,8 @@ public class MaxComputeScanNode extends FileQueryScanNode {
return result;
}
-
@Override
- public List<Split> getSplits() throws UserException {
+ public List<Split> getSplits(int numBackends) throws UserException {
List<Split> result = new ArrayList<>();
com.aliyun.odps.Table odpsTable = table.getOdpsTable();
if (desc.getSlots().isEmpty() || odpsTable.getFileNum() <= 0) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
index 5009ec3c904..28efbc58f51 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
@@ -25,6 +25,7 @@ import org.apache.doris.common.UserException;
import org.apache.doris.common.util.FileFormatUtils;
import org.apache.doris.common.util.LocationPath;
import org.apache.doris.datasource.FileQueryScanNode;
+import org.apache.doris.datasource.FileSplitter;
import org.apache.doris.datasource.paimon.PaimonExternalCatalog;
import org.apache.doris.datasource.paimon.PaimonExternalTable;
import org.apache.doris.planner.PlanNodeId;
@@ -36,6 +37,7 @@ import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileRangeDesc;
import org.apache.doris.thrift.TPaimonDeletionFileDesc;
import org.apache.doris.thrift.TPaimonFileDesc;
+import org.apache.doris.thrift.TPushAggOp;
import org.apache.doris.thrift.TTableFormatFileDesc;
import com.google.common.base.Preconditions;
@@ -101,15 +103,16 @@ public class PaimonScanNode extends FileQueryScanNode {
private int rawFileSplitNum = 0;
private int paimonSplitNum = 0;
private List<SplitStat> splitStats = new ArrayList<>();
- private SessionVariable sessionVariable;
private String serializedTable;
+ private boolean pushDownCount = false;
+ private static final long COUNT_WITH_PARALLEL_SPLITS = 10000;
+
public PaimonScanNode(PlanNodeId id,
TupleDescriptor desc,
boolean needCheckColumnPriv,
- SessionVariable sessionVariable) {
- super(id, desc, "PAIMON_SCAN_NODE", StatisticalType.PAIMON_SCAN_NODE,
needCheckColumnPriv);
- this.sessionVariable = sessionVariable;
+ SessionVariable sv) {
+ super(id, desc, "PAIMON_SCAN_NODE", StatisticalType.PAIMON_SCAN_NODE,
needCheckColumnPriv, sv);
}
@Override
@@ -199,7 +202,7 @@ public class PaimonScanNode extends FileQueryScanNode {
}
@Override
- public List<Split> getSplits() throws UserException {
+ public List<Split> getSplits(int numBackends) throws UserException {
boolean forceJniScanner = sessionVariable.isForceJniScanner();
SessionVariable.IgnoreSplitType ignoreSplitType =
SessionVariable.IgnoreSplitType
.valueOf(sessionVariable.getIgnoreSplitType());
@@ -211,6 +214,8 @@ public class PaimonScanNode extends FileQueryScanNode {
List<org.apache.paimon.table.source.Split> paimonSplits =
readBuilder.withFilter(predicates)
.withProjection(projected)
.newScan().plan().splits();
+
+ boolean applyCountPushdown = getPushDownAggNoGroupingOp() ==
TPushAggOp.COUNT;
// Just for counting the number of selected partitions for this paimon
table
Set<BinaryRow> selectedPartitionValues = Sets.newHashSet();
for (org.apache.paimon.table.source.Split split : paimonSplits) {
@@ -238,9 +243,9 @@ public class PaimonScanNode extends FileQueryScanNode {
LocationPath locationPath = new
LocationPath(file.path(),
source.getCatalog().getProperties());
try {
- List<Split> dorisSplits = splitFile(
+ List<Split> dorisSplits =
FileSplitter.splitFile(
locationPath,
- 0,
+ getRealFileSplitSize(0),
null,
file.length(),
-1,
@@ -261,25 +266,7 @@ public class PaimonScanNode extends FileQueryScanNode {
}
}
} else {
- for (RawFile file : rawFiles) {
- LocationPath locationPath = new
LocationPath(file.path(),
- source.getCatalog().getProperties());
- try {
- splits.addAll(
- splitFile(
- locationPath,
- 0,
- null,
- file.length(),
- -1,
- true,
- null,
-
PaimonSplit.PaimonSplitCreator.DEFAULT));
- ++rawFileSplitNum;
- } catch (IOException e) {
- throw new UserException("Paimon error to split
file: " + e.getMessage(), e);
- }
- }
+ createRawFileSplits(rawFiles, splits,
applyCountPushdown ? Long.MAX_VALUE : 0);
}
} else {
if (ignoreSplitType ==
SessionVariable.IgnoreSplitType.IGNORE_JNI) {
@@ -297,14 +284,34 @@ public class PaimonScanNode extends FileQueryScanNode {
}
splitStats.add(splitStat);
}
+
this.selectedPartitionNum = selectedPartitionValues.size();
// TODO: get total partition number
- // We should set fileSplitSize at the end because fileSplitSize may be
modified
- // in splitFile.
- splits.forEach(s -> s.setTargetSplitSize(fileSplitSize));
return splits;
}
+ private void createRawFileSplits(List<RawFile> rawFiles, List<Split>
splits, long blockSize) throws UserException {
+ for (RawFile file : rawFiles) {
+ LocationPath locationPath = new LocationPath(file.path(),
+ source.getCatalog().getProperties());
+ try {
+ splits.addAll(
+ FileSplitter.splitFile(
+ locationPath,
+ getRealFileSplitSize(blockSize),
+ null,
+ file.length(),
+ -1,
+ true,
+ null,
+ PaimonSplit.PaimonSplitCreator.DEFAULT));
+ ++rawFileSplitNum;
+ } catch (IOException e) {
+ throw new UserException("Paimon error to split file: " +
e.getMessage(), e);
+ }
+ }
+ }
+
private String getFileFormat(String path) {
return
FileFormatUtils.getFileFormatBySuffix(path).orElse(source.getFileFormatFromTableProperties());
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSplit.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSplit.java
index 3ab38c7db28..988f043ad0e 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSplit.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSplit.java
@@ -38,7 +38,6 @@ public class PaimonSplit extends FileSplit {
private TableFormatType tableFormatType;
private Optional<DeletionFile> optDeletionFile;
-
public PaimonSplit(Split split) {
super(DUMMY_PATH, 0, 0, 0, 0, null, null);
this.split = split;
@@ -100,10 +99,14 @@ public class PaimonSplit extends FileSplit {
long start,
long length,
long fileLength,
+ long fileSplitSize,
long modificationTime,
String[] hosts,
List<String> partitionValues) {
- return new PaimonSplit(path, start, length, fileLength,
modificationTime, hosts, partitionValues);
+ PaimonSplit split = new PaimonSplit(path, start, length,
fileLength,
+ modificationTime, hosts, partitionValues);
+ split.setTargetSplitSize(fileSplitSize);
+ return split;
}
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java
index b0f0406c215..5e650930365 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java
@@ -29,7 +29,9 @@ import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.FileQueryScanNode;
import org.apache.doris.datasource.FileSplit;
import org.apache.doris.datasource.FileSplit.FileSplitCreator;
+import org.apache.doris.datasource.FileSplitter;
import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.qe.SessionVariable;
import org.apache.doris.spi.Split;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.system.Backend;
@@ -40,6 +42,7 @@ import org.apache.doris.thrift.TFileAttributes;
import org.apache.doris.thrift.TFileCompressType;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileType;
+import org.apache.doris.thrift.TPushAggOp;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -63,8 +66,8 @@ public class TVFScanNode extends FileQueryScanNode {
* eg: s3 tvf
* These scan nodes do not have corresponding catalog/database/table info,
so no need to do priv check
*/
- public TVFScanNode(PlanNodeId id, TupleDescriptor desc, boolean
needCheckColumnPriv) {
- super(id, desc, "TVF_SCAN_NODE", StatisticalType.TVF_SCAN_NODE,
needCheckColumnPriv);
+ public TVFScanNode(PlanNodeId id, TupleDescriptor desc, boolean
needCheckColumnPriv, SessionVariable sv) {
+ super(id, desc, "TVF_SCAN_NODE", StatisticalType.TVF_SCAN_NODE,
needCheckColumnPriv, sv);
table = (FunctionGenTable) this.desc.getTable();
tableValuedFunction = (ExternalFileTableValuedFunction) table.getTvf();
}
@@ -126,16 +129,27 @@ public class TVFScanNode extends FileQueryScanNode {
}
@Override
- public List<Split> getSplits() throws UserException {
+ public List<Split> getSplits(int numBackends) throws UserException {
List<Split> splits = Lists.newArrayList();
if (tableValuedFunction.getTFileType() == TFileType.FILE_STREAM) {
return splits;
}
+
List<TBrokerFileStatus> fileStatuses =
tableValuedFunction.getFileStatuses();
+
+ // Push down count optimization.
+ boolean needSplit = true;
+ if (getPushDownAggNoGroupingOp() == TPushAggOp.COUNT) {
+ int parallelNum = sessionVariable.getParallelExecInstanceNum();
+ int totalFileNum = fileStatuses.size();
+ needSplit = FileSplitter.needSplitForCountPushdown(parallelNum,
numBackends, totalFileNum);
+ }
+
for (TBrokerFileStatus fileStatus : fileStatuses) {
Map<String, String> prop = Maps.newHashMap();
try {
- splits.addAll(splitFile(new LocationPath(fileStatus.getPath(),
prop), fileStatus.getBlockSize(),
+ splits.addAll(FileSplitter.splitFile(new
LocationPath(fileStatus.getPath(), prop),
+ getRealFileSplitSize(needSplit ?
fileStatus.getBlockSize() : Long.MAX_VALUE),
null, fileStatus.getSize(),
fileStatus.getModificationTime(),
fileStatus.isSplitable, null,
FileSplitCreator.DEFAULT));
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index e5ab9dae411..92bc509955d 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -192,6 +192,7 @@ import org.apache.doris.planner.SortNode;
import org.apache.doris.planner.TableFunctionNode;
import org.apache.doris.planner.UnionNode;
import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.SessionVariable;
import org.apache.doris.statistics.StatisticConstants;
import org.apache.doris.tablefunction.TableValuedFunctionIf;
import org.apache.doris.thrift.TFetchOption;
@@ -584,15 +585,16 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
ExternalTable table = fileScan.getTable();
TupleDescriptor tupleDescriptor = generateTupleDesc(slots, table,
context);
+ SessionVariable sv = ConnectContext.get().getSessionVariable();
// TODO(cmy): determine the needCheckColumnPriv param
ScanNode scanNode;
if (table instanceof HMSExternalTable) {
switch (((HMSExternalTable) table).getDlaType()) {
case ICEBERG:
- scanNode = new IcebergScanNode(context.nextPlanNodeId(),
tupleDescriptor, false);
+ scanNode = new IcebergScanNode(context.nextPlanNodeId(),
tupleDescriptor, false, sv);
break;
case HIVE:
- scanNode = new HiveScanNode(context.nextPlanNodeId(),
tupleDescriptor, false);
+ scanNode = new HiveScanNode(context.nextPlanNodeId(),
tupleDescriptor, false, sv);
HiveScanNode hiveScanNode = (HiveScanNode) scanNode;
hiveScanNode.setSelectedPartitions(fileScan.getSelectedPartitions());
if (fileScan.getTableSample().isPresent()) {
@@ -604,13 +606,12 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
throw new RuntimeException("do not support DLA type " +
((HMSExternalTable) table).getDlaType());
}
} else if (table instanceof IcebergExternalTable) {
- scanNode = new IcebergScanNode(context.nextPlanNodeId(),
tupleDescriptor, false);
+ scanNode = new IcebergScanNode(context.nextPlanNodeId(),
tupleDescriptor, false, sv);
} else if (table instanceof PaimonExternalTable) {
- scanNode = new PaimonScanNode(context.nextPlanNodeId(),
tupleDescriptor, false,
- ConnectContext.get().getSessionVariable());
+ scanNode = new PaimonScanNode(context.nextPlanNodeId(),
tupleDescriptor, false, sv);
} else if (table instanceof MaxComputeExternalTable) {
scanNode = new MaxComputeScanNode(context.nextPlanNodeId(),
tupleDescriptor,
- fileScan.getSelectedPartitions(), false);
+ fileScan.getSelectedPartitions(), false, sv);
} else {
throw new RuntimeException("do not support table type " +
table.getType());
}
@@ -934,7 +935,8 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
TupleDescriptor tupleDescriptor = generateTupleDesc(slots,
tvfRelation.getFunction().getTable(), context);
TableValuedFunctionIf catalogFunction =
tvfRelation.getFunction().getCatalogFunction();
- ScanNode scanNode =
catalogFunction.getScanNode(context.nextPlanNodeId(), tupleDescriptor);
+ SessionVariable sv = ConnectContext.get().getSessionVariable();
+ ScanNode scanNode =
catalogFunction.getScanNode(context.nextPlanNodeId(), tupleDescriptor, sv);
scanNode.setNereidsId(tvfRelation.getId());
Utils.execWithUncheckedException(scanNode::init);
context.getRuntimeTranslator().ifPresent(
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
index 0acdfd67bce..d37f6c729f7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
@@ -73,6 +73,7 @@ import
org.apache.doris.datasource.maxcompute.source.MaxComputeScanNode;
import org.apache.doris.datasource.odbc.source.OdbcScanNode;
import org.apache.doris.datasource.paimon.source.PaimonScanNode;
import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.SessionVariable;
import org.apache.doris.rewrite.mvrewrite.MVSelectFailedException;
import org.apache.doris.thrift.TPushAggOp;
@@ -1917,8 +1918,8 @@ public class SingleNodePlanner {
*/
private PlanNode createScanNode(Analyzer analyzer, TableRef tblRef,
SelectStmt selectStmt)
throws UserException {
- ScanNode scanNode = null;
-
+ SessionVariable sv = ConnectContext.get().getSessionVariable();
+ ScanNode scanNode;
switch (tblRef.getTable().getType()) {
case OLAP:
case MATERIALIZED_VIEW:
@@ -1956,7 +1957,7 @@ public class SingleNodePlanner {
scanNode = new JdbcScanNode(ctx.getNextNodeId(),
tblRef.getDesc(), false);
break;
case TABLE_VALUED_FUNCTION:
- scanNode = ((TableValuedFunctionRef)
tblRef).getScanNode(ctx.getNextNodeId());
+ scanNode = ((TableValuedFunctionRef)
tblRef).getScanNode(ctx.getNextNodeId(), sv);
break;
case HMS_EXTERNAL_TABLE:
TableIf table = tblRef.getDesc().getTable();
@@ -1969,13 +1970,13 @@ public class SingleNodePlanner {
+ "please set enable_nereids_planner =
true to enable new optimizer");
}
scanNode = new HudiScanNode(ctx.getNextNodeId(),
tblRef.getDesc(), true,
- Optional.empty(), Optional.empty(),
ConnectContext.get().getSessionVariable());
+ Optional.empty(), Optional.empty(), sv);
break;
case ICEBERG:
- scanNode = new IcebergScanNode(ctx.getNextNodeId(),
tblRef.getDesc(), true);
+ scanNode = new IcebergScanNode(ctx.getNextNodeId(),
tblRef.getDesc(), true, sv);
break;
case HIVE:
- scanNode = new HiveScanNode(ctx.getNextNodeId(),
tblRef.getDesc(), true);
+ scanNode = new HiveScanNode(ctx.getNextNodeId(),
tblRef.getDesc(), true, sv);
((HiveScanNode)
scanNode).setTableSample(tblRef.getTableSample());
break;
default:
@@ -1983,14 +1984,13 @@ public class SingleNodePlanner {
}
break;
case ICEBERG_EXTERNAL_TABLE:
- scanNode = new IcebergScanNode(ctx.getNextNodeId(),
tblRef.getDesc(), true);
+ scanNode = new IcebergScanNode(ctx.getNextNodeId(),
tblRef.getDesc(), true, sv);
break;
case PAIMON_EXTERNAL_TABLE:
- scanNode = new PaimonScanNode(ctx.getNextNodeId(),
tblRef.getDesc(), true,
- ConnectContext.get().getSessionVariable());
+ scanNode = new PaimonScanNode(ctx.getNextNodeId(),
tblRef.getDesc(), true, sv);
break;
case MAX_COMPUTE_EXTERNAL_TABLE:
- scanNode = new MaxComputeScanNode(ctx.getNextNodeId(),
tblRef.getDesc(), true);
+ scanNode = new MaxComputeScanNode(ctx.getNextNodeId(),
tblRef.getDesc(), true, sv);
break;
case ES_EXTERNAL_TABLE:
scanNode = new EsScanNode(ctx.getNextNodeId(),
tblRef.getDesc(), true);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/DataGenTableValuedFunction.java
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/DataGenTableValuedFunction.java
index fc2a6d6dd3e..7fc7e5cc5e0 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/DataGenTableValuedFunction.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/DataGenTableValuedFunction.java
@@ -22,6 +22,7 @@ import org.apache.doris.common.AnalysisException;
import org.apache.doris.planner.DataGenScanNode;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.planner.ScanNode;
+import org.apache.doris.qe.SessionVariable;
import org.apache.doris.thrift.TDataGenFunctionName;
import java.util.List;
@@ -32,7 +33,7 @@ public abstract class DataGenTableValuedFunction extends
TableValuedFunctionIf {
public abstract TDataGenFunctionName getDataGenFunctionName();
@Override
- public ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc) {
+ public ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc,
SessionVariable sv) {
return new DataGenScanNode(id, desc, this);
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
index e4c52971813..967838d34f0 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
@@ -52,6 +52,7 @@ import org.apache.doris.proto.Types.PStructField;
import org.apache.doris.proto.Types.PTypeDesc;
import org.apache.doris.proto.Types.PTypeNode;
import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.SessionVariable;
import org.apache.doris.rpc.BackendServiceProxy;
import org.apache.doris.rpc.RpcException;
import org.apache.doris.system.Backend;
@@ -300,8 +301,8 @@ public abstract class ExternalFileTableValuedFunction
extends TableValuedFunctio
}
@Override
- public ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc) {
- return new TVFScanNode(id, desc, false);
+ public ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc,
SessionVariable sv) {
+ return new TVFScanNode(id, desc, false, sv);
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/GroupCommitTableValuedFunction.java
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/GroupCommitTableValuedFunction.java
index 3bd262f467d..324e17d4f24 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/GroupCommitTableValuedFunction.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/GroupCommitTableValuedFunction.java
@@ -30,6 +30,7 @@ import org.apache.doris.planner.GroupCommitScanNode;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.SessionVariable;
import org.apache.doris.thrift.TFileType;
import java.util.ArrayList;
@@ -83,7 +84,7 @@ public class GroupCommitTableValuedFunction extends
ExternalFileTableValuedFunct
}
@Override
- public ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc) {
+ public ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc,
SessionVariable sv) {
return new GroupCommitScanNode(id, desc, tableId);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/JdbcQueryTableValueFunction.java
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/JdbcQueryTableValueFunction.java
index b884dab3882..a9847c7eadd 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/JdbcQueryTableValueFunction.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/JdbcQueryTableValueFunction.java
@@ -26,6 +26,7 @@ import org.apache.doris.datasource.jdbc.JdbcExternalCatalog;
import org.apache.doris.datasource.jdbc.source.JdbcScanNode;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.planner.ScanNode;
+import org.apache.doris.qe.SessionVariable;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -47,7 +48,7 @@ public class JdbcQueryTableValueFunction extends
QueryTableValueFunction {
}
@Override
- public ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc) {
+ public ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc,
SessionVariable sv) {
JdbcExternalCatalog catalog = (JdbcExternalCatalog) catalogIf;
JdbcTable jdbcTable = new JdbcTable(1, desc.getTable().getName(),
desc.getTable().getFullSchema(),
TableType.JDBC);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java
index a7e25bc7f82..7bd28f363e7 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java
@@ -22,6 +22,7 @@ import org.apache.doris.common.AnalysisException;
import org.apache.doris.datasource.tvf.source.MetadataScanNode;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.planner.ScanNode;
+import org.apache.doris.qe.SessionVariable;
import org.apache.doris.thrift.TMetaScanRange;
import org.apache.doris.thrift.TMetadataTableRequestParams;
import org.apache.doris.thrift.TMetadataType;
@@ -60,7 +61,7 @@ public abstract class MetadataTableValuedFunction extends
TableValuedFunctionIf
public abstract TMetaScanRange getMetaScanRange();
@Override
- public ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc) {
+ public ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc,
SessionVariable sv) {
return new MetadataScanNode(id, desc, this);
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/QueryTableValueFunction.java
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/QueryTableValueFunction.java
index cb0f5100229..07a125836b7 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/QueryTableValueFunction.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/QueryTableValueFunction.java
@@ -28,6 +28,7 @@ import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.SessionVariable;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -87,5 +88,5 @@ public abstract class QueryTableValueFunction extends
TableValuedFunctionIf {
public abstract List<Column> getTableColumns() throws AnalysisException;
@Override
- public abstract ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc);
+ public abstract ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc,
SessionVariable sv);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java
index d4faa460195..eb323a76672 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java
@@ -25,6 +25,7 @@ import org.apache.doris.common.AnalysisException;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.SessionVariable;
import java.util.List;
import java.util.Map;
@@ -88,7 +89,7 @@ public abstract class TableValuedFunctionIf {
public abstract List<Column> getTableColumns() throws AnalysisException;
- public abstract ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc);
+ public abstract ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc,
SessionVariable sv);
public void checkAuth(ConnectContext ctx) {
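
Every TableValuedFunctionIf subclass now receives the planner-resolved SessionVariable through getScanNode: file-backed TVFs forward it to TVFScanNode (see ExternalFileTableValuedFunction above), while the DataGen, GroupCommit, Metadata and JDBC-query TVFs simply ignore the extra parameter. Below is a minimal sketch of how a custom TVF would adopt the widened signature; MyTvf and MyScanNode are hypothetical names, and the remaining abstract members of TableValuedFunctionIf are omitted for brevity, so this is illustrative rather than a complete implementation.

    // Hypothetical example, not part of this patch.
    public class MyTvf extends TableValuedFunctionIf {
        @Override
        public ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc, SessionVariable sv) {
            // Forward the session variable resolved once by the planner instead of
            // reading ConnectContext again inside the scan node.
            return new MyScanNode(id, desc, /* needCheckColumnPriv */ false, sv);
        }
        // getTableColumns() and the other abstract members are omitted for brevity.
    }
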
diff --git
a/regression-test/data/external_table_p0/paimon/test_paimon_catalog.out
b/regression-test/data/external_table_p0/paimon/test_paimon_catalog.out
index f3b44964915..a394836625d 100644
--- a/regression-test/data/external_table_p0/paimon/test_paimon_catalog.out
+++ b/regression-test/data/external_table_p0/paimon/test_paimon_catalog.out
@@ -578,6 +578,26 @@ bbb
-- !c109 --
+-- !c110 --
+3
+
+-- !c111 --
+3
+
+-- !c112 --
+2
+
+-- !c113 --
+2
+
+-- !c114 --
+3 3_1
+4 4_1
+
+-- !c115 --
+3 3_1
+4 4_1
+
-- !all --
1 2 3 4 5 6 7 8 9.1 10.1
11.10 2020-02-02 13str 14varchar a true aaaa
2023-08-13T09:32:38.530
10 20 30 40 50 60 70 80 90.1 100.1
110.10 2020-03-02 130str 140varchar b false bbbb
2023-08-14T08:32:52.821
@@ -1157,6 +1177,26 @@ bbb
-- !c109 --
+-- !c110 --
+3
+
+-- !c111 --
+3
+
+-- !c112 --
+2
+
+-- !c113 --
+2
+
+-- !c114 --
+3 3_1
+4 4_1
+
+-- !c115 --
+3 3_1
+4 4_1
+
-- !all --
1 2 3 4 5 6 7 8 9.1 10.1
11.10 2020-02-02 13str 14varchar a true aaaa
2023-08-13T09:32:38.530
10 20 30 40 50 60 70 80 90.1 100.1
110.10 2020-03-02 130str 140varchar b false bbbb
2023-08-14T08:32:52.821
@@ -1736,6 +1776,26 @@ bbb
-- !c109 --
+-- !c110 --
+3
+
+-- !c111 --
+3
+
+-- !c112 --
+2
+
+-- !c113 --
+2
+
+-- !c114 --
+3 3_1
+4 4_1
+
+-- !c115 --
+3 3_1
+4 4_1
+
-- !all --
1 2 3 4 5 6 7 8 9.1 10.1
11.10 2020-02-02 13str 14varchar a true aaaa
2023-08-13T09:32:38.530
10 20 30 40 50 60 70 80 90.1 100.1
110.10 2020-03-02 130str 140varchar b false bbbb
2023-08-14T08:32:52.821
@@ -2315,3 +2375,23 @@ bbb
-- !c109 --
+-- !c110 --
+3
+
+-- !c111 --
+3
+
+-- !c112 --
+2
+
+-- !c113 --
+2
+
+-- !c114 --
+3 3_1
+4 4_1
+
+-- !c115 --
+3 3_1
+4 4_1
+
diff --git
a/regression-test/suites/external_table_p0/iceberg/test_iceberg_optimize_count.groovy
b/regression-test/suites/external_table_p0/iceberg/test_iceberg_optimize_count.groovy
index 4d74e1406e7..31235b5278f 100644
---
a/regression-test/suites/external_table_p0/iceberg/test_iceberg_optimize_count.groovy
+++
b/regression-test/suites/external_table_p0/iceberg/test_iceberg_optimize_count.groovy
@@ -111,6 +111,6 @@ suite("test_iceberg_optimize_count",
"p0,external,doris,external_docker,external
} finally {
sql """ set enable_count_push_down_for_external_table=true; """
- sql """drop catalog if exists ${catalog_name}"""
+ // sql """drop catalog if exists ${catalog_name}"""
}
}
diff --git
a/regression-test/suites/external_table_p0/paimon/test_paimon_catalog.groovy
b/regression-test/suites/external_table_p0/paimon/test_paimon_catalog.groovy
index 41afb02e0f9..9668cbb0950 100644
--- a/regression-test/suites/external_table_p0/paimon/test_paimon_catalog.groovy
+++ b/regression-test/suites/external_table_p0/paimon/test_paimon_catalog.groovy
@@ -181,6 +181,13 @@ suite("test_paimon_catalog",
"p0,external,doris,external_docker,external_docker_
def c108= """ select id from tb_with_upper_case where id = 1 """
def c109= """ select id from tb_with_upper_case where id < 1 """
+ def c110 = """select count(*) from deletion_vector_orc;"""
+ def c111 = """select count(*) from deletion_vector_parquet;"""
+ def c112 = """select count(*) from deletion_vector_orc where id >
2;"""
+ def c113 = """select count(*) from deletion_vector_parquet where
id > 2;"""
+ def c114 = """select * from deletion_vector_orc where id > 2;"""
+ def c115 = """select * from deletion_vector_parquet where id >
2;"""
+
String hdfs_port = context.config.otherConfigs.get("hive2HdfsPort")
String catalog_name = "ctl_test_paimon_catalog"
String externalEnvIp =
context.config.otherConfigs.get("externalEnvIp")
@@ -289,6 +296,13 @@ suite("test_paimon_catalog",
"p0,external,doris,external_docker,external_docker_
qt_c107 c107
qt_c108 c108
qt_c109 c109
+
+ qt_c110 c110
+ qt_c111 c111
+ qt_c112 c112
+ qt_c113 c113
+ qt_c114 c114
+ qt_c115 c115
}
test_cases("false", "false")