This is an automated email from the ASF dual-hosted git repository.
eldenmoon pushed a commit to branch short-circuit-in
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/short-circuit-in by this push:
new e00b9400562 [Enhancement](Short Circuit) short circuit query supports
`IN` (#39468)
e00b9400562 is described below
commit e00b9400562a23bdf8ee416f47366b53534ce2c0
Author: Xr Ling <[email protected]>
AuthorDate: Wed Jan 1 22:19:08 2025 +0800
[Enhancement](Short Circuit) short circuit query supports `IN` (#39468)
## Proposed changes
short circuit query support `IN`
<!--Describe your changes.-->
---------
Co-authored-by: lihangyu <[email protected]>
Co-authored-by: Liuyushiii <[email protected]>
Co-authored-by: eldenmoon <[email protected]>
---
be/src/olap/base_tablet.cpp | 1 -
be/src/service/internal_service.cpp | 29 +
be/src/service/internal_service.h | 5 +
be/src/service/point_query_executor.cpp | 5 +
.../org/apache/doris/catalog/PartitionKey.java | 23 +
.../LogicalResultSinkToShortCircuitPointQuery.java | 3 +-
.../doris/planner/HashDistributionPruner.java | 30 +-
.../org/apache/doris/planner/OlapScanNode.java | 81 ++-
.../PartitionPruneV2ForShortCircuitPlan.java | 41 +-
.../doris/planner/PartitionPrunerV2Base.java | 52 +-
.../doris/planner/RangePartitionPrunerV2.java | 66 ++-
.../org/apache/doris/qe/PointQueryExecutor.java | 599 +++++++++++++++++----
.../org/apache/doris/rpc/BackendServiceClient.java | 5 +
.../org/apache/doris/rpc/BackendServiceProxy.java | 12 +
gensrc/proto/internal_service.proto | 11 +
.../data/point_query_p0/test_point_IN_query.out | 27 +
.../data/point_query_p0/test_point_query.out | 179 ++++++
.../point_query_p0/test_point_query_partition.out | 25 +
.../data/point_query_p0/test_rowstore.out | 30 ++
.../data/point_query_p0/test_rowstore_query.out | 6 +-
.../prepared_stmt_p0/prepared_stmt_in_list.out | 6 +
.../org/apache/doris/regression/suite/Suite.groovy | 6 +-
.../test_row_store_page_size.groovy | 4 +-
.../test_dynamic_partition_point_query.groovy | 122 +++++
.../point_query_p0/test_point_IN_query.groovy | 85 +++
.../suites/point_query_p0/test_point_query.groovy | 400 ++++++++++++++
.../point_query_p0/test_point_query_ck.groovy | 80 ++-
.../test_point_query_partition.groovy | 33 +-
.../suites/point_query_p0/test_rowstore.groovy | 70 +++
.../point_query_p0/test_rowstore_query.groovy | 3 +-
.../prepared_stmt_p0/prepared_stmt_in_list.groovy | 39 ++
31 files changed, 1910 insertions(+), 168 deletions(-)
diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp
index 40c5443477d..dc3b50688b6 100644
--- a/be/src/olap/base_tablet.cpp
+++ b/be/src/olap/base_tablet.cpp
@@ -495,7 +495,6 @@ Status BaseTablet::lookup_row_key(const Slice& encoded_key,
TabletSchema* latest
}
auto& segments = segment_caches[i]->get_segments();
DCHECK_EQ(segments.size(), num_segments);
-
for (auto id : picked_segments) {
Status s = segments[id]->lookup_row_key(encoded_key, schema,
with_seq_col, with_rowid,
&loc, stats,
encoded_seq_value);
diff --git a/be/src/service/internal_service.cpp
b/be/src/service/internal_service.cpp
index fb0b2f090bc..b58b697036d 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -948,6 +948,35 @@ void
PInternalService::tablet_fetch_data(google::protobuf::RpcController* contro
}
}
+void
PInternalService::tablet_batch_fetch_data(google::protobuf::RpcController*
controller,
+ const
PTabletBatchKeyLookupRequest* batchRequest,
+ PTabletBatchKeyLookupResponse*
batchResponse,
+ google::protobuf::Closure*
done) {
+ int request_count = batchRequest->sub_key_lookup_req_size();
+ batchResponse->mutable_sub_key_lookup_res()->Reserve(request_count);
+ [[maybe_unused]] auto* cntl = static_cast<brpc::Controller*>(controller);
+ bool ret =
+ _light_work_pool.try_offer([this, batchRequest, batchResponse,
done, request_count]() {
+ Status st = Status::OK();
+ brpc::ClosureGuard guard(done);
+ for (int i = 0; i < request_count; ++i) {
+ batchResponse->add_sub_key_lookup_res();
+ const PTabletKeyLookupRequest* request =
&batchRequest->sub_key_lookup_req(i);
+ PTabletKeyLookupResponse* response =
+ batchResponse->mutable_sub_key_lookup_res(i);
+ Status status = _tablet_fetch_data(request, response);
+ status.to_protobuf(response->mutable_status());
+ if (!status.ok()) {
+ st = status;
+ }
+ }
+ st.to_protobuf(batchResponse->mutable_status());
+ });
+ if (!ret) {
+ offer_failed(batchResponse, done, _light_work_pool);
+ }
+}
+
void PInternalService::test_jdbc_connection(google::protobuf::RpcController*
controller,
const PJdbcTestConnectionRequest*
request,
PJdbcTestConnectionResult* result,
diff --git a/be/src/service/internal_service.h
b/be/src/service/internal_service.h
index 66a0f867393..0c683ccc3ff 100644
--- a/be/src/service/internal_service.h
+++ b/be/src/service/internal_service.h
@@ -226,6 +226,11 @@ public:
PTabletKeyLookupResponse* response,
google::protobuf::Closure* done) override;
+ void tablet_batch_fetch_data(google::protobuf::RpcController* controller,
+ const PTabletBatchKeyLookupRequest*
batchRequest,
+ PTabletBatchKeyLookupResponse* batchResponse,
+ google::protobuf::Closure* done) override;
+
void test_jdbc_connection(google::protobuf::RpcController* controller,
const PJdbcTestConnectionRequest* request,
PJdbcTestConnectionResult* result,
diff --git a/be/src/service/point_query_executor.cpp
b/be/src/service/point_query_executor.cpp
index ea991e158a1..3f77a9f2d2b 100644
--- a/be/src/service/point_query_executor.cpp
+++ b/be/src/service/point_query_executor.cpp
@@ -262,6 +262,11 @@ Status PointQueryExecutor::init(const
PTabletKeyLookupRequest* request,
auto cache_handle = LookupConnectionCache::instance()->get(uuid);
_binary_row_format = request->is_binary_row();
_tablet = DORIS_TRY(ExecEnv::get_tablet(request->tablet_id()));
+
+ if (_tablet->tablet_meta()->replica_id() != request->replica_id()) {
+ return Status::OK();
+ }
+
if (cache_handle != nullptr) {
_reusable = cache_handle;
_profile_metrics.hit_lookup_cache = true;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionKey.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionKey.java
index 29bfda8b201..854cdd27f50 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionKey.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionKey.java
@@ -58,6 +58,7 @@ import java.io.IOException;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.time.LocalDateTime;
+import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.zip.CRC32;
@@ -79,6 +80,24 @@ public class PartitionKey implements
Comparable<PartitionKey>, Writable {
types = Lists.newArrayList();
}
+ private PartitionKey(PartitionKey other) {
+ this.keys = new ArrayList<>(other.keys.size());
+ for (LiteralExpr expr : other.keys) {
+ try {
+ String value = expr.getStringValue();
+ if ("null".equalsIgnoreCase(value)) {
+ this.keys.add(NullLiteral.create(expr.getType()));
+ } else {
+ this.keys.add(LiteralExpr.create(value, expr.getType()));
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("Create partition key failed: " +
e.getMessage());
+ }
+ }
+ this.originHiveKeys = new ArrayList<>(other.originHiveKeys);
+ this.types = new ArrayList<>(other.types);
+ }
+
public void setDefaultListPartition(boolean isDefaultListPartitionKey) {
this.isDefaultListPartitionKey = isDefaultListPartitionKey;
}
@@ -205,6 +224,10 @@ public class PartitionKey implements
Comparable<PartitionKey>, Writable {
return createListPartitionKeyWithTypes(values, types, false);
}
+ public static PartitionKey clone(PartitionKey other) {
+ return new PartitionKey(other);
+ }
+
public void pushColumn(LiteralExpr keyValue, PrimitiveType keyType) {
keys.add(keyValue);
types.add(keyType);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/LogicalResultSinkToShortCircuitPointQuery.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/LogicalResultSinkToShortCircuitPointQuery.java
index c087dcbb37b..56103f863d5 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/LogicalResultSinkToShortCircuitPointQuery.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/LogicalResultSinkToShortCircuitPointQuery.java
@@ -25,6 +25,7 @@ import org.apache.doris.nereids.rules.RuleType;
import org.apache.doris.nereids.trees.expressions.Cast;
import org.apache.doris.nereids.trees.expressions.EqualTo;
import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.InPredicate;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
@@ -53,7 +54,7 @@ public class LogicalResultSinkToShortCircuitPointQuery
implements RewriteRuleFac
private boolean
filterMatchShortCircuitCondition(LogicalFilter<LogicalOlapScan> filter) {
return filter.getConjuncts().stream().allMatch(
// all conjuncts match with pattern `key = ?`
- expression -> (expression instanceof EqualTo)
+ expression -> ((expression instanceof EqualTo) || expression
instanceof InPredicate)
&&
(removeCast(expression.child(0)).isKeyColumnFromTable()
|| (expression.child(0) instanceof SlotReference
&& ((SlotReference)
expression.child(0)).getName().equals(Column.DELETE_SIGN)))
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/HashDistributionPruner.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/HashDistributionPruner.java
index 2a60e4029a6..38bfe6820fd 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/HashDistributionPruner.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/HashDistributionPruner.java
@@ -26,9 +26,8 @@ import org.apache.doris.catalog.PartitionKey;
import org.apache.doris.common.Config;
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
import java.util.Collection;
import java.util.List;
@@ -49,8 +48,6 @@ import java.util.Set;
* If depth is larger than 'max_distribution_pruner_recursion_depth', all
buckets will be return without pruning.
*/
public class HashDistributionPruner implements DistributionPruner {
- private static final Logger LOG =
LogManager.getLogger(HashDistributionPruner.class);
-
// partition list, sort by the hash code
private List<Long> bucketsList;
// partition columns
@@ -61,6 +58,18 @@ public class HashDistributionPruner implements
DistributionPruner {
private boolean isBaseIndexSelected;
+ /*
+ * This map maintains a relationship between distribution keys and their
corresponding tablet IDs.
+ * For example, if the distribution columns are (k1, k2, k3),
+ * and the tuple (1, 2, 3) is hashed into bucket 1001,
+ * then the `distributionKey2TabletIDs` would map the key (1, 2, 3) to the
tablet ID 1001.
+ * (1, 2, 3) -> 1001
+ * Map structure:
+ * - Key: PartitionKey, representing a specific combination of
distribution columns (e.g., k1, k2, k3).
+ * - Value: Set<Long>, containing the tablet IDs associated with the
corresponding distribution key.
+ */
+ private Map<PartitionKey, Set<Long>> distributionKey2TabletIDs =
Maps.newHashMap();
+
public HashDistributionPruner(List<Long> bucketsList, List<Column> columns,
Map<String, PartitionColumnFilter> filters, int
hashMod, boolean isBaseIndexSelected) {
this.bucketsList = bucketsList;
@@ -70,13 +79,21 @@ public class HashDistributionPruner implements
DistributionPruner {
this.isBaseIndexSelected = isBaseIndexSelected;
}
+ public Map<PartitionKey, Set<Long>> getDistributionKeysTabletIDs() {
+ return distributionKey2TabletIDs;
+ }
+
// columnId: which column to compute
// hashKey: the key which to compute hash value
public Collection<Long> prune(int columnId, PartitionKey hashKey, int
complex) {
if (columnId == distributionColumns.size()) {
// compute Hash Key
long hashValue = hashKey.getHashValue();
- return Lists.newArrayList(bucketsList.get((int) ((hashValue &
0xffffffff) % hashMod)));
+ List<Long> result =
+ Lists.newArrayList(bucketsList.get((int) ((hashValue &
0xffffffff) % hashMod)));
+
distributionKey2TabletIDs.computeIfAbsent(PartitionKey.clone(hashKey),
+ k ->
Sets.newHashSet(result)).addAll(result);
+ return result;
}
Column keyColumn = distributionColumns.get(columnId);
String columnName = isBaseIndexSelected ? keyColumn.getName()
@@ -119,9 +136,6 @@ public class HashDistributionPruner implements
DistributionPruner {
Collection<Long> subList = prune(columnId + 1, hashKey,
newComplex);
resultSet.addAll(subList);
hashKey.popColumn();
- if (resultSet.size() >= bucketsList.size()) {
- break;
- }
}
return resultSet;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
index 92175523f22..3756dc985c6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
@@ -27,7 +27,6 @@ import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.FunctionCallExpr;
import org.apache.doris.analysis.InPredicate;
import org.apache.doris.analysis.IntLiteral;
-import org.apache.doris.analysis.LiteralExpr;
import org.apache.doris.analysis.PartitionNames;
import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.SlotId;
@@ -52,6 +51,7 @@ import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.Partition.PartitionState;
import org.apache.doris.catalog.PartitionInfo;
import org.apache.doris.catalog.PartitionItem;
+import org.apache.doris.catalog.PartitionKey;
import org.apache.doris.catalog.PartitionType;
import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.ScalarType;
@@ -98,9 +98,13 @@ import com.google.common.base.Joiner;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import com.google.common.collect.Range;
+import com.google.common.collect.RangeMap;
import com.google.common.collect.Sets;
+import com.google.common.collect.Table;
import org.apache.commons.collections.CollectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -192,7 +196,21 @@ public class OlapScanNode extends ScanNode {
private Set<Long> sampleTabletIds = Sets.newHashSet();
private TableSample tableSample;
+ private HashSet<Long> scanBackendIds = new HashSet<>();
+
+ private PartitionPruner partitionPruner = null;
+
private Map<Long, Integer> tabletId2BucketSeq = Maps.newHashMap();
+
+ // Maps partition column names to a RangeMap that associates ColumnBound
ranges with lists of partition IDs,
+ // similar to the implementation in PartitionPrunerV2Base.
+ private Map<String, RangeMap<ColumnBound, List<Long>>>
partitionCol2PartitionID = Maps.newHashMap();
+
+ private Map<PartitionKey, Set<Long>> distributionKeys2TabletID =
Maps.newHashMap();
+
+ /// tablet id -> (backend id -> replica)
+ private Table<Long, Long, Replica> scanBackendReplicaTable =
HashBasedTable.create();
+
// a bucket seq may map to many tablets, and each tablet has a
// TScanRangeLocations.
public ArrayListMultimap<Integer, TScanRangeLocations> bucketSeq2locations
= ArrayListMultimap.create();
@@ -257,6 +275,14 @@ public class OlapScanNode extends ScanNode {
return scanBackendIds;
}
+ public Map<String, RangeMap<ColumnBound, List<Long>>>
getPartitionCol2PartitionID() {
+ return partitionCol2PartitionID;
+ }
+
+ public Map<PartitionKey, Set<Long>> getDistributionKeys2TabletID() {
+ return distributionKeys2TabletID;
+ }
+
public void setSampleTabletIds(List<Long> sampleTablets) {
if (sampleTablets != null) {
this.sampleTabletIds.addAll(sampleTablets);
@@ -292,6 +318,10 @@ public class OlapScanNode extends ScanNode {
return scanTabletIds;
}
+ public Table<Long, Long, Replica> getScanBackendReplicaTable() {
+ return scanBackendReplicaTable;
+ }
+
public void setForceOpenPreAgg(boolean forceOpenPreAgg) {
this.forceOpenPreAgg = forceOpenPreAgg;
}
@@ -656,9 +686,9 @@ public class OlapScanNode extends ScanNode {
cardinality = (long) statsDeriveResult.getRowCount();
}
+ // get the pruned partition IDs
private Collection<Long> partitionPrune(PartitionInfo partitionInfo,
PartitionNames partitionNames) throws AnalysisException {
- PartitionPruner partitionPruner = null;
Map<Long, PartitionItem> keyItemMap;
if (partitionNames != null) {
keyItemMap = Maps.newHashMap();
@@ -675,13 +705,12 @@ public class OlapScanNode extends ScanNode {
if (partitionInfo.getType() == PartitionType.RANGE) {
if (isPointQuery() && partitionInfo.getPartitionColumns().size()
== 1) {
// short circuit, a quick path to find partition
- ColumnRange filterRange =
columnNameToRange.get(partitionInfo.getPartitionColumns().get(0).getName());
- LiteralExpr lowerBound =
filterRange.getRangeSet().get().asRanges().stream()
- .findFirst().get().lowerEndpoint().getValue();
- LiteralExpr upperBound =
filterRange.getRangeSet().get().asRanges().stream()
- .findFirst().get().upperEndpoint().getValue();
+ Column col = partitionInfo.getPartitionColumns().get(0);
+ // todo: support range query
+ Set<Range<ColumnBound>> filterRanges =
+
columnNameToRange.get(col.getName()).getRangeSet().get().asRanges();
cachedPartitionPruner.update(keyItemMap);
- return cachedPartitionPruner.prune(lowerBound, upperBound);
+ return cachedPartitionPruner.prune(filterRanges,
col.getName(), partitionCol2PartitionID);
}
partitionPruner = new RangePartitionPrunerV2(keyItemMap,
partitionInfo.getPartitionColumns(), columnNameToRange);
@@ -699,12 +728,22 @@ public class OlapScanNode extends ScanNode {
switch (distributionInfo.getType()) {
case HASH: {
HashDistributionInfo info = (HashDistributionInfo)
distributionInfo;
- distributionPruner = new
HashDistributionPruner(table.getTabletIdsInOrder(),
+ distributionPruner =
+ new HashDistributionPruner(table.getTabletIdsInOrder(),
info.getDistributionColumns(),
columnFilters,
info.getBucketNum(),
getSelectedIndexId() == olapTable.getBaseIndexId());
- return distributionPruner.prune();
+ HashDistributionPruner hashPruner = (HashDistributionPruner)
distributionPruner;
+ Collection<Long> resultIDs = hashPruner.prune();
+ Map<PartitionKey, Set<Long>> newPrunedIDs =
hashPruner.getDistributionKeysTabletIDs();
+ for (Map.Entry<PartitionKey, Set<Long>> entry :
newPrunedIDs.entrySet()) {
+ distributionKeys2TabletID.merge(entry.getKey(),
entry.getValue(), (existingSet, newSet) -> {
+ existingSet.addAll(newSet);
+ return existingSet;
+ });
+ }
+ return resultIDs;
}
case RANDOM: {
return null;
@@ -960,6 +999,7 @@ public class OlapScanNode extends ScanNode {
collectedStat = true;
}
scanBackendIds.add(backend.getId());
+ scanBackendReplicaTable.put(tabletId, backend.getId(),
replica);
// For skipping missing version of tablet, we only select the
backend with the highest last
// success version replica to save as much data as possible.
if (skipMissingVersion) {
@@ -1030,10 +1070,20 @@ public class OlapScanNode extends ScanNode {
// Step1: compute partition ids
PartitionNames partitionNames = ((BaseTableRef)
desc.getRef()).getPartitionNames();
PartitionInfo partitionInfo = olapTable.getPartitionInfo();
- if (partitionInfo.getType() == PartitionType.RANGE ||
partitionInfo.getType() == PartitionType.LIST) {
- selectedPartitionIds = partitionPrune(partitionInfo,
partitionNames);
- } else {
- selectedPartitionIds = olapTable.getPartitionIds();
+ switch (partitionInfo.getType()) {
+ case RANGE:
+ selectedPartitionIds = partitionPrune(partitionInfo,
partitionNames);
+ if (isPointQuery() && partitionPruner instanceof
RangePartitionPrunerV2) {
+ RangePartitionPrunerV2 rangePartitionPruner =
(RangePartitionPrunerV2) partitionPruner;
+ this.partitionCol2PartitionID =
rangePartitionPruner.getPartitionCol2PartitionID();
+ }
+ break;
+ case LIST:
+ selectedPartitionIds = partitionPrune(partitionInfo,
partitionNames);
+ break;
+ default:
+ selectedPartitionIds = olapTable.getPartitionIds();
+ break;
}
selectedPartitionIds =
olapTable.selectNonEmptyPartitionIds(selectedPartitionIds);
selectedPartitionNum = selectedPartitionIds.size();
@@ -1336,6 +1386,8 @@ public class OlapScanNode extends ScanNode {
// Lazy evaluation
selectedIndexId = olapTable.getBaseIndexId();
// Only key columns
+ distributionKeys2TabletID.clear();
+ partitionCol2PartitionID.clear();
computeColumnsFilter(olapTable.getBaseSchemaKeyColumns(),
olapTable.getPartitionInfo());
computePartitionInfo();
scanBackendIds.clear();
@@ -1343,6 +1395,7 @@ public class OlapScanNode extends ScanNode {
bucketSeq2locations.clear();
scanReplicaIds.clear();
sampleTabletIds.clear();
+ scanBackendReplicaTable.clear();
try {
createScanRangeLocations();
} catch (AnalysisException e) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/PartitionPruneV2ForShortCircuitPlan.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/PartitionPruneV2ForShortCircuitPlan.java
index 0b98cac9724..fe6c7474bc7 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/PartitionPruneV2ForShortCircuitPlan.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/PartitionPruneV2ForShortCircuitPlan.java
@@ -23,18 +23,24 @@ import org.apache.doris.catalog.PartitionItem;
import org.apache.doris.catalog.PartitionKey;
import org.apache.doris.common.AnalysisException;
+import com.google.common.collect.Lists;
import com.google.common.collect.Range;
+import com.google.common.collect.RangeMap;
+import com.google.common.collect.Sets;
+import com.google.common.collect.TreeRangeMap;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Collection;
+import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Set;
public class PartitionPruneV2ForShortCircuitPlan extends PartitionPrunerV2Base
{
private static final Logger LOG =
LogManager.getLogger(PartitionPruneV2ForShortCircuitPlan.class);
// map to record literal range to find specific partition
- private RangeMap<LiteralExpr, Long> partitionRangeMapByLiteral = new
RangeMap<>();
+ private RangeMap<ColumnBound, List<Long>> partitionRangeMap =
TreeRangeMap.create();
// last timestamp partitionRangeMapByLiteral updated
private long lastPartitionRangeMapUpdateTimestampMs = 0;
@@ -42,19 +48,36 @@ public class PartitionPruneV2ForShortCircuitPlan extends
PartitionPrunerV2Base {
super();
}
+ public static <C extends Comparable<C>, V> Set<V>
+ getOverlappingRangeValues(RangeMap<C, List<V>> partRangeMap,
Set<Range<C>> ranges) {
+ Set<V> partitionIds = Sets.newHashSet();
+ for (Range<C> range : ranges) {
+ Map<Range<C>, List<V>> overlappingRanges =
partRangeMap.subRangeMap(range).asMapOfRanges();
+ for (Map.Entry<Range<C>, List<V>> entry :
overlappingRanges.entrySet()) {
+ partitionIds.addAll(entry.getValue());
+ }
+ }
+ return partitionIds;
+ }
+
+ public RangeMap<ColumnBound, List<Long>>
getPartitionColValue2PartitionID() {
+ return partitionRangeMap;
+ }
+
public boolean update(Map<Long, PartitionItem> keyItemMap) {
// interval to update partitionRangeMapByLiteral
long partitionRangeMapUpdateIntervalS = 10;
if (System.currentTimeMillis() - lastPartitionRangeMapUpdateTimestampMs
> partitionRangeMapUpdateIntervalS * 1000) {
- partitionRangeMapByLiteral = new RangeMap<>();
+ partitionRangeMap = TreeRangeMap.create();
// recalculate map
for (Entry<Long, PartitionItem> entry : keyItemMap.entrySet()) {
Range<PartitionKey> range = entry.getValue().getItems();
LiteralExpr partitionLowerBound = (LiteralExpr)
range.lowerEndpoint().getKeys().get(0);
LiteralExpr partitionUpperBound = (LiteralExpr)
range.upperEndpoint().getKeys().get(0);
- Range<LiteralExpr> partitionRange =
Range.closedOpen(partitionLowerBound, partitionUpperBound);
- partitionRangeMapByLiteral.put(partitionRange, entry.getKey());
+ Range<ColumnBound> partitionRange =
+ Range.closedOpen(ColumnBound.of(partitionLowerBound),
ColumnBound.of(partitionUpperBound));
+ partitionRangeMap.put(partitionRange,
Lists.newArrayList(entry.getKey()));
}
if (LOG.isDebugEnabled()) {
LOG.debug("update partitionRangeMapByLiteral");
@@ -65,9 +88,13 @@ public class PartitionPruneV2ForShortCircuitPlan extends
PartitionPrunerV2Base {
return false;
}
- public Collection<Long> prune(LiteralExpr lowerBound, LiteralExpr
upperBound) throws AnalysisException {
- Range<LiteralExpr> filterRangeValue = Range.closed(lowerBound,
upperBound);
- return
partitionRangeMapByLiteral.getOverlappingRangeValues(filterRangeValue);
+ public Collection<Long> prune(Set<Range<ColumnBound>> partitionColumnRange,
+ String partitionColName,
+ Map<String, RangeMap<ColumnBound,
List<Long>>> partitionCol2PartitionID) {
+ Set<Long> overlappingRangeValues =
getOverlappingRangeValues(partitionRangeMap, partitionColumnRange);
+ partitionCol2PartitionID.putIfAbsent(
+ partitionColName, partitionRangeMap);
+ return overlappingRangeValues;
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/PartitionPrunerV2Base.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/PartitionPrunerV2Base.java
index 1d9f163ca80..60fcc523c38 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/PartitionPrunerV2Base.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/PartitionPrunerV2Base.java
@@ -28,6 +28,8 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Range;
import com.google.common.collect.RangeMap;
+import com.google.common.collect.Sets;
+import com.google.common.collect.TreeRangeMap;
import java.util.Collection;
import java.util.Collections;
@@ -48,6 +50,19 @@ public abstract class PartitionPrunerV2Base implements
PartitionPruner {
// currently only used for list partition
private Map.Entry<Long, PartitionItem> defaultPartition;
+ /*
+ * This map maintains the relationship between partition columns and their
corresponding partition IDs.
+ * For example, if the partition columns are (k1, k2, k3), and there is a
partition `p0` with the range
+ * [(1, 5, 10), (5, 10, 20)), then the `partitionCol2PartitionID` map
should be:
+ * k1 -> [1, 5) -> p0
+ * Map structure:
+ * - Key: String, representing the name of a partition column (e.g., k1).
+ * - Value: RangeMap<ColumnBound, List<Long>>, where each range of column
bounds is mapped to a list of
+ * partition IDs. For instance, the range [1, 5) for column `k1`
would map to partition ID `p0`.
+ */
+ // todo: `List<Long>` is not neccessary, `Long` is enough
+ protected Map<String, RangeMap<ColumnBound, List<Long>>>
partitionCol2PartitionID = Maps.newHashMap();
+
// Only called in PartitionPruneV2ByShortCircuitPlan constructor
PartitionPrunerV2Base() {
this.idToPartitionItem = null;
@@ -76,6 +91,10 @@ public abstract class PartitionPrunerV2Base implements
PartitionPruner {
findDefaultPartition(idToPartitionItem);
}
+ public Map<String, RangeMap<ColumnBound, List<Long>>>
getPartitionCol2PartitionID() {
+ return partitionCol2PartitionID;
+ }
+
private Collection<Long> handleDefaultPartition(Collection<Long> result) {
if (this.defaultPartition != null) {
Set<Long> r = result.stream().collect(Collectors.toSet());
@@ -107,11 +126,12 @@ public abstract class PartitionPrunerV2Base implements
PartitionPruner {
@Override
public Collection<Long> prune() throws AnalysisException {
Map<Column, FinalFilters> columnToFilters = Maps.newHashMap();
- for (Column column : partitionColumns) {
+ for (Column column : partitionColumns) { // partition col is key
ColumnRange columnRange = columnNameToRange.get(column.getName());
if (columnRange == null) {
columnToFilters.put(column, FinalFilters.noFilters());
} else {
+ // add the partiton&key col
columnToFilters.put(column, getFinalFilters(columnRange,
column));
}
}
@@ -160,22 +180,32 @@ public abstract class PartitionPrunerV2Base implements
PartitionPruner {
* partitions.
*/
private Collection<Long> pruneSingleColumnPartition(Map<Column,
FinalFilters> columnToFilters) {
- FinalFilters finalFilters =
columnToFilters.get(partitionColumns.get(0));
+ Column partitionCol = partitionColumns.get(0);
+ FinalFilters finalFilters = columnToFilters.get(partitionCol);
switch (finalFilters.type) {
case CONSTANT_FALSE_FILTERS:
return Collections.emptySet();
case HAVE_FILTERS:
genSingleColumnRangeMap();
Preconditions.checkNotNull(singleColumnRangeMap);
- return finalFilters.filters.stream()
- .map(filter -> {
- RangeMap<ColumnBound, UniqueId> filtered =
singleColumnRangeMap.subRangeMap(filter);
- return filtered.asMapOfRanges().values().stream()
- .map(UniqueId::getPartitionId)
- .collect(Collectors.toSet());
- })
- .flatMap(Set::stream)
- .collect(Collectors.toSet());
+ partitionCol2PartitionID.put(partitionCol.getName(),
TreeRangeMap.create());
+ Set<Long> resultPartID = Sets.newHashSet();
+ finalFilters.filters.forEach(filter -> {
+ RangeMap<ColumnBound, UniqueId> filtered =
singleColumnRangeMap.subRangeMap(filter);
+
+ filtered.asMapOfRanges().forEach((range, partID) -> {
+ RangeMap<ColumnBound, List<Long>> rangeMap =
+
partitionCol2PartitionID.get(partitionCol.getName());
+ List<Long> partitionIds =
rangeMap.get(range.lowerEndpoint());
+ if (partitionIds == null) {
+ partitionIds = Lists.newArrayList();
+ rangeMap.put(range, partitionIds);
+ }
+ partitionIds.add(partID.getPartitionId());
+ resultPartID.add(partID.getPartitionId());
+ });
+ });
+ return resultPartID;
case NO_FILTERS:
default:
return idToPartitionItem.keySet();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/RangePartitionPrunerV2.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/RangePartitionPrunerV2.java
index 8acba72f156..5d523b01bfb 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/RangePartitionPrunerV2.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/RangePartitionPrunerV2.java
@@ -57,13 +57,43 @@ public class RangePartitionPrunerV2 extends
PartitionPrunerV2Base {
public static RangeMap<ColumnBound, UniqueId>
genSingleColumnRangeMap(Map<Long, PartitionItem> idToPartitionItem) {
RangeMap<ColumnBound, UniqueId> candidate = TreeRangeMap.create();
- idToPartitionItem.forEach((id, item) -> {
+ idToPartitionItem.forEach((id, item) -> { // partition id , key range
Range<PartitionKey> range = item.getItems();
candidate.put(mapPartitionKeyRange(range, 0), new
RangePartitionUniqueId(id));
});
return candidate;
}
+ public static <C extends Comparable<C>> Range<C> buildRange(C lowerBound,
BoundType lowerBoundType,
+ C upperBound,
BoundType upperBoundType) {
+ if (lowerBound.compareTo(upperBound) > 0) {
+ // swap
+ C temp = lowerBound;
+ lowerBound = upperBound;
+ upperBound = temp;
+
+ BoundType tempBoundType = lowerBoundType;
+ lowerBoundType = upperBoundType;
+ upperBoundType = tempBoundType;
+ }
+ if (lowerBoundType == BoundType.CLOSED && upperBoundType ==
BoundType.OPEN) {
+ // [lowerBound, upperBound)
+ return Range.closedOpen(lowerBound, upperBound);
+ } else if (lowerBoundType == BoundType.OPEN && upperBoundType ==
BoundType.CLOSED) {
+ // (lowerBound, upperBound]
+ return Range.openClosed(lowerBound, upperBound);
+ } else if (lowerBoundType == BoundType.CLOSED && upperBoundType ==
BoundType.CLOSED) {
+ // [lowerBound, upperBound]
+ return Range.closed(lowerBound, upperBound);
+ } else if (lowerBoundType == BoundType.OPEN && upperBoundType ==
BoundType.OPEN) {
+ // (lowerBound, upperBound)
+ return Range.open(lowerBound, upperBound);
+ } else {
+ throw new IllegalArgumentException("Unsupported BoundType
combination: "
+ + lowerBoundType + " and " + upperBoundType);
+ }
+ }
+
/**
* This is just like the logic in v1 version, but we support disjunctive
predicates here.
*/
@@ -154,8 +184,38 @@ public class RangePartitionPrunerV2 extends
PartitionPrunerV2Base {
&& filter.hasUpperBound() &&
filter.upperBoundType() == BoundType.CLOSED
&& filter.lowerEndpoint() ==
filter.upperEndpoint()) {
// Equal to predicate, e.g., col=1, the filter range
is [1, 1].
- minKey.pushColumn(filter.lowerEndpoint().getValue(),
column.getDataType());
- maxKey.pushColumn(filter.upperEndpoint().getValue(),
column.getDataType());
+ ColumnBound lowerFilter = filter.lowerEndpoint();
+ ColumnBound upperFilter = filter.upperEndpoint();
+ minKey.pushColumn(lowerFilter.getValue(),
column.getDataType());
+ maxKey.pushColumn(upperFilter.getValue(),
column.getDataType());
+
+ // Locate the partition to which the filter belongs
+ List<Long> partID = Lists.newArrayList();
+ for (Map.Entry<Range<PartitionKey>, Long>
rangeMapEntry : rangeMap.asMapOfRanges().entrySet()) {
+ Range<PartitionKey> partitionColRange =
rangeMapEntry.getKey();
+ PartitionKey upperPartitionKeys =
partitionColRange.upperEndpoint();
+ int partitionLess = upperPartitionKeys.getKeys()
+
.get(columnIdx).compareTo(lowerFilter.getValue());
+ if (partitionLess < 0) {
+ continue;
+ }
+ PartitionKey lowerPartitionKeys =
partitionColRange.lowerEndpoint();
+ int partitionGreater = lowerPartitionKeys.getKeys()
+
.get(columnIdx).compareTo(upperFilter.getValue());
+ if (partitionGreater > 0) {
+ break;
+ }
+ Range<ColumnBound> keyColRange = buildRange(
+
ColumnBound.of(lowerPartitionKeys.getKeys().get(columnIdx)),
+ partitionColRange.lowerBoundType(),
+
ColumnBound.of(upperPartitionKeys.getKeys().get(columnIdx)),
+ partitionColRange.upperBoundType());
+ if (filter.isConnected(keyColRange)) {
+ partID.add(rangeMapEntry.getValue());
+ }
+ }
+ partitionCol2PartitionID.putIfAbsent(column.getName(),
TreeRangeMap.create());
+
partitionCol2PartitionID.get(column.getName()).put(filter, partID);
result.addAll(doPruneMulti(columnToFilters, rangeMap,
columnIdx + 1, minKey, maxKey));
minKey.popColumn();
maxKey.popColumn();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExecutor.java
index 9e4030b768b..d65c6032b86 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExecutor.java
@@ -19,19 +19,26 @@ package org.apache.doris.qe;
import org.apache.doris.analysis.BinaryPredicate;
import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.InPredicate;
import org.apache.doris.analysis.LiteralExpr;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.PartitionKey;
+import org.apache.doris.catalog.Replica;
import org.apache.doris.cloud.catalog.CloudPartition;
import org.apache.doris.common.Config;
+import org.apache.doris.common.Pair;
import org.apache.doris.common.Status;
import org.apache.doris.common.UserException;
import org.apache.doris.mysql.MysqlCommand;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.trees.expressions.literal.Literal;
+import org.apache.doris.planner.ColumnBound;
import org.apache.doris.planner.OlapScanNode;
import org.apache.doris.proto.InternalService;
import org.apache.doris.proto.InternalService.KeyTuple;
@@ -48,6 +55,9 @@ import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import com.google.common.collect.RangeMap;
+import com.google.common.collect.Sets;
+import com.google.common.collect.Table;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.TDeserializer;
@@ -55,31 +65,78 @@ import org.apache.thrift.TException;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
public class PointQueryExecutor implements CoordInterface {
private static final Logger LOG =
LogManager.getLogger(PointQueryExecutor.class);
- private long tabletID = 0;
+ private static final ConcurrentHashMap<Class<? extends Expr>,
ConjunctHandler> handlers = new ConcurrentHashMap<>();
+
+ private class RoundRobinScheduler<T> {
+ private final List<T> list;
+ private AtomicInteger currentIndex;
+
+ public RoundRobinScheduler(List<T> list) {
+ this.list = list;
+ this.currentIndex = new AtomicInteger(0);
+ }
+
+ public T next() {
+ if (list.isEmpty()) {
+ return null;
+ }
+ int index = currentIndex.getAndUpdate(i -> (i + 1) % list.size());
+ return list.get(index);
+ }
+ }
+
+ private List<Long> scanTabletIDs = new ArrayList<>();
private long timeoutMs = Config.point_query_timeout_ms; // default 10s
private boolean isCancel = false;
private List<Backend> candidateBackends;
+
+ // key: tablet id, value: backend and replica id
+ private HashMap<Long, Pair<Backend, Long>> pickedCandidateReplicas =
Maps.newHashMap();
+
+ RoundRobinScheduler<Backend> roundRobinscheduler = null;
+
+ // tablet id -> backend id -> replica
+ Table<Long, Long, Replica> replicaMetaTable = null;
+
private final int maxMsgSizeOfResultReceiver;
// used for snapshot read in cloud mode
- private List<Long> snapshotVisibleVersions;
+ // Key: cloud partition id, Value: snapshot visible version
+ private HashMap<CloudPartition, Long> snapshotVisibleVersions;
private final ShortCircuitQueryContext shortCircuitQueryContext;
+ Map<PartitionKey, Set<Long>> distributionKeys2TabletID = null;
+
+ // Maps partition column names to a RangeMap that associates ColumnBound
ranges with lists of partition IDs,
+ // similar to the implementation in PartitionPrunerV2Base.
+ Map<String, RangeMap<ColumnBound, List<Long>>> partitionCol2PartitionID =
null;
+
+ List<Set<Long>> keyTupleIndex2TabletID = null;
+
+ List<List<String>> allKeyTuples = null;
+
+ private List<Integer> distributionKeyColumns = Lists.newArrayList();
+
+ private List<Integer> partitionKeyColumns = Lists.newArrayList();
+
public PointQueryExecutor(ShortCircuitQueryContext ctx, int
maxMessageSize) {
ctx.sanitize();
this.shortCircuitQueryContext = ctx;
@@ -97,10 +154,13 @@ public class PointQueryExecutor implements CoordInterface {
partitions.add((CloudPartition) table.getPartition(id));
}
}
- snapshotVisibleVersions =
CloudPartition.getSnapshotVisibleVersion(partitions);
- // Only support single partition at present
- Preconditions.checkState(snapshotVisibleVersions.size() == 1);
- LOG.debug("set cloud version {}", snapshotVisibleVersions.get(0));
+ snapshotVisibleVersions = (snapshotVisibleVersions == null) ?
Maps.newHashMap() : snapshotVisibleVersions;
+ List<Long> versionList =
CloudPartition.getSnapshotVisibleVersion(partitions);
+ for (int i = 0; i < versionList.size(); ++i) {
+ snapshotVisibleVersions.put(partitions.get(i), versionList.get(i));
+ }
+
+ LOG.debug("set cloud version {}", snapshotVisibleVersions);
}
void setScanRangeLocations() throws Exception {
@@ -111,8 +171,9 @@ public class PointQueryExecutor implements CoordInterface {
if (scanNode.getScanTabletIds().isEmpty()) {
return;
}
- Preconditions.checkState(scanNode.getScanTabletIds().size() == 1);
- this.tabletID = scanNode.getScanTabletIds().get(0);
+ this.distributionKeys2TabletID =
scanNode.getDistributionKeys2TabletID();
+ this.partitionCol2PartitionID = scanNode.getPartitionCol2PartitionID();
+ this.scanTabletIDs = scanNode.getScanTabletIds();
// update partition version if cloud mode
if (Config.isCloudMode()
@@ -128,10 +189,9 @@ public class PointQueryExecutor implements CoordInterface {
candidateBackends.add(backend);
}
}
- // Random read replicas
- Collections.shuffle(this.candidateBackends);
+
if (LOG.isDebugEnabled()) {
- LOG.debug("set scan locations, backend ids {}, tablet id {}",
candidateBackends, tabletID);
+ LOG.debug("set scan locations, backend ids {}, tablet ids {}",
candidateBackends, scanTabletIDs);
}
}
@@ -159,40 +219,260 @@ public class PointQueryExecutor implements
CoordInterface {
.getMysqlChannel(), null, null);
}
- private static void updateScanNodeConjuncts(OlapScanNode scanNode,
List<Expr> conjunctVals) {
- for (int i = 0; i < conjunctVals.size(); ++i) {
- BinaryPredicate binaryPredicate = (BinaryPredicate)
scanNode.getConjuncts().get(i);
- if (binaryPredicate.getChild(0) instanceof LiteralExpr) {
- binaryPredicate.setChild(0, conjunctVals.get(i));
- } else if (binaryPredicate.getChild(1) instanceof LiteralExpr) {
- binaryPredicate.setChild(1, conjunctVals.get(i));
+ /*
+ * Interface for handling different conjunct types
+ */
+ private interface ConjunctHandler {
+ int handle(Expr expr, List<Expr> conjunctVals, int
handledConjunctVals) throws AnalysisException;
+ }
+
+ private static class InPredicateHandler implements ConjunctHandler {
+ public static final InPredicateHandler INSTANCE = new
InPredicateHandler();
+
+ private InPredicateHandler() {
+ }
+
+ @Override
+ public int handle(Expr expr, List<Expr> conjunctVals, int
handledConjunctVals) throws AnalysisException {
+ InPredicate inPredicate = (InPredicate) expr;
+ if (inPredicate.isNotIn()) {
+ throw new AnalysisException("Not support NOT IN predicate in
point query");
+ }
+ for (int j = 1; j < inPredicate.getChildren().size(); ++j) {
+ if (inPredicate.getChild(j) instanceof LiteralExpr) {
+ inPredicate.setChild(j,
conjunctVals.get(handledConjunctVals++));
+ } else {
+ Preconditions.checkState(false, "Should contains literal
in " + inPredicate.toSqlImpl());
+ }
+ }
+ return handledConjunctVals;
+ }
+ }
+
+ private static class BinaryPredicateHandler implements ConjunctHandler {
+ public static final BinaryPredicateHandler INSTANCE = new
BinaryPredicateHandler();
+
+ private BinaryPredicateHandler() {
+ }
+
+ @Override
+ public int handle(Expr expr, List<Expr> conjunctVals, int
handledConjunctVals) throws AnalysisException {
+ BinaryPredicate binaryPredicate = (BinaryPredicate) expr;
+ Expr left = binaryPredicate.getChild(0);
+ Expr right = binaryPredicate.getChild(1);
+
+ if (isDeleteSign(left) || isDeleteSign(right)) {
+ return handledConjunctVals;
+ }
+
+ if (isLiteralExpr(left)) {
+ binaryPredicate.setChild(0,
conjunctVals.get(handledConjunctVals++));
+ } else if (isLiteralExpr(right)) {
+ binaryPredicate.setChild(1,
conjunctVals.get(handledConjunctVals++));
} else {
Preconditions.checkState(false, "Should contains literal in "
+ binaryPredicate.toSqlImpl());
}
+ return handledConjunctVals;
+ }
+
+ private boolean isLiteralExpr(Expr expr) {
+ return expr instanceof LiteralExpr;
+ }
+
+ private boolean isDeleteSign(Expr expr) {
+ return expr instanceof SlotRef && ((SlotRef)
expr).getColumnName().equalsIgnoreCase(Column.DELETE_SIGN);
+ }
+ }
+
+ private static void initHandler() {
+ handlers.put(InPredicate.class, InPredicateHandler.INSTANCE);
+ handlers.put(BinaryPredicate.class, BinaryPredicateHandler.INSTANCE);
+ }
+
+ private static void updateScanNodeConjuncts(OlapScanNode scanNode,
List<Expr> conjunctVals) {
+ List<Expr> conjuncts = scanNode.getConjuncts();
+ if (handlers.isEmpty()) {
+ initHandler();
+ }
+ int handledConjunctVals = 0;
+ for (Expr expr : conjuncts) {
+ ConjunctHandler handler = handlers.get(expr.getClass());
+ if (handler == null) {
+ throw new AnalysisException("Not support conjunct type " +
expr.getClass().getName());
+ }
+ handledConjunctVals = handler.handle(expr, conjunctVals,
handledConjunctVals);
}
+
+ Preconditions.checkState(handledConjunctVals == conjunctVals.size());
}
public void setTimeout(long timeoutMs) {
this.timeoutMs = timeoutMs;
}
- void addKeyTuples(
- InternalService.PTabletKeyLookupRequest.Builder requestBuilder) {
- // TODO handle IN predicates
- Map<String, Expr> columnExpr = Maps.newHashMap();
- KeyTuple.Builder kBuilder = KeyTuple.newBuilder();
- for (Expr expr : shortCircuitQueryContext.scanNode.getConjuncts()) {
+ /*
+ * According to the current ordered key tuple, based on the leftmost
partition principle,
+ * get the partition ID to which it belongs
+ */
+ private Set<Long> getLeftMostPartitionIDs(List<String> orderedKeyTuple,
+ List<Column> keyColumns) {
+ Set<Long> leftMostPartitionIDs = Sets.newHashSet();
+ for (int i = 0; i < partitionKeyColumns.size(); ++i) {
+ int colIdx = partitionKeyColumns.get(i);
+ String partitionColName = keyColumns.get(colIdx).getName();
+ try {
+ ColumnBound partitionKey =
ColumnBound.of(LiteralExpr.create(orderedKeyTuple.get(colIdx),
+ keyColumns.get(colIdx).getType()));
+ List<Long> partitionIDs = Lists.newArrayList(
+ Optional.ofNullable(
+
partitionCol2PartitionID.get(partitionColName).get(partitionKey))
+ .orElse(Collections.emptyList()));
+ // Add the first partition column directly
+ if (i == 0) {
+ leftMostPartitionIDs.addAll(partitionIDs);
+ continue;
+ }
+ if (leftMostPartitionIDs.isEmpty() || partitionIDs == null) {
+ break;
+ }
+ partitionIDs.retainAll(leftMostPartitionIDs);
+ if (partitionIDs.isEmpty()) {
+ break;
+ }
+ } catch (Exception e) {
+ throw new AnalysisException("Failed to create partition key
for key tuple: " + orderedKeyTuple);
+ }
+ }
+ return leftMostPartitionIDs;
+ }
+
+ private void pickCandidateBackends() {
+ for (Long tabletID : scanTabletIDs) {
+ roundRobinPickReplica(tabletID);
+ }
+ }
+
+ private void roundRobinPickReplica(Long tabletID) {
+ while (true) {
+ Backend backend = roundRobinscheduler.next();
+ if (backend == null) {
+ break;
+ }
+ Map<Long, Replica> beWithReplica = replicaMetaTable.row(tabletID);
+ if (!beWithReplica.containsKey(backend.getId())) {
+ continue;
+ }
+ pickedCandidateReplicas
+ .putIfAbsent(tabletID,
+ Pair.of(backend,
beWithReplica.get(backend.getId()).getId()));
+ break;
+ }
+ }
+
+ // Use the leftmost matching partitions to filter out tablets that do not
belong to these partitions
+ private void addTabletIDsForKeyTuple(List<String> orderedKeyTuple,
List<Column> keyColumns,
+ OlapTable olapTable, Set<Long>
leftMostPartitionIDs) {
+ // get part of the key tuple using distribution columns
+ List<String> keyTupleForDistributionPrune = Lists.newArrayList();
+ for (Integer idx : distributionKeyColumns) {
+ keyTupleForDistributionPrune.add(orderedKeyTuple.get(idx));
+ }
+ Set<Long> tabletIDs = Sets.newHashSet();
+ for (PartitionKey key : distributionKeys2TabletID.keySet()) {
+ List<String> distributionKeys = Lists.newArrayList();
+ for (LiteralExpr expr : key.getKeys()) {
+ distributionKeys.add(expr.getStringValue());
+ }
+ if (distributionKeys.equals(keyTupleForDistributionPrune)) {
+ Set<Long> originTabletIDs =
Sets.newHashSet(distributionKeys2TabletID.get(key));
+ // If partitions are not explicitly created, this condition
holds true
+ if (leftMostPartitionIDs.isEmpty()) {
+ tabletIDs.addAll(originTabletIDs);
+ } else {
+ Set<Long> prunedTabletIDs = Sets.newHashSet();
+ for (Long partitionID : leftMostPartitionIDs) {
+ Partition partition =
olapTable.getPartition(partitionID);
+ MaterializedIndex selectedTable =
+
partition.getIndex(shortCircuitQueryContext.scanNode.getSelectedIndexId());
+ // filter out tablets that do not belong to this
partition
+ selectedTable.getTablets().forEach(tablet -> {
+ if (originTabletIDs.contains(tablet.getId())) {
+ prunedTabletIDs.add(tablet.getId());
+ }
+ });
+ tabletIDs.addAll(prunedTabletIDs);
+ }
+ }
+ break;
+ }
+ }
+ keyTupleIndex2TabletID.add(tabletIDs.isEmpty() ? null : tabletIDs);
+ }
+
+ // Get all possible key tuple combinations
+ void getAllKeyTupleCombination(List<Expr> conjuncts, int index,
+ List<String> currentKeyTuple,
+ List<List<String>> result,
+ List<String> columnExpr,
+ List<Column> keyColumns) {
+ if (index == conjuncts.size()) {
+ List<String> orderedKeyTuple = new
ArrayList<>(currentKeyTuple.size());
+ OlapTable olapTable =
shortCircuitQueryContext.scanNode.getOlapTable();
+
+ // add key tuple in keys order
+ for (Column column : keyColumns) {
+ int colIdx = columnExpr.indexOf(column.getName());
+ String currentKey = currentKeyTuple.get(colIdx);
+ orderedKeyTuple.add(currentKey);
+ }
+ result.add(Lists.newArrayList(orderedKeyTuple));
+ Set<Long> leftMostPartitionIDs =
getLeftMostPartitionIDs(orderedKeyTuple, keyColumns);
+ addTabletIDsForKeyTuple(orderedKeyTuple, keyColumns, olapTable,
leftMostPartitionIDs);
+ return;
+ }
+
+ Expr expr = conjuncts.get(index);
+ if (expr instanceof BinaryPredicate) {
BinaryPredicate predicate = (BinaryPredicate) expr;
Expr left = predicate.getChild(0);
Expr right = predicate.getChild(1);
SlotRef columnSlot = left.unwrapSlotRef();
- columnExpr.put(columnSlot.getColumnName(), right);
+ if (left instanceof SlotRef && ((SlotRef)
left).getColumnName().equalsIgnoreCase(Column.DELETE_SIGN)) {
+ getAllKeyTupleCombination(conjuncts, index + 1,
currentKeyTuple, result, columnExpr, keyColumns);
+ return;
+ }
+ columnExpr.add(columnSlot.getColumnName());
+ currentKeyTuple.add(right.getStringValue());
+ getAllKeyTupleCombination(conjuncts, index + 1, currentKeyTuple,
result, columnExpr, keyColumns);
+ currentKeyTuple.remove(currentKeyTuple.size() - 1);
+ columnExpr.remove(columnExpr.size() - 1);
+ } else if (expr instanceof InPredicate) {
+ InPredicate inPredicate = (InPredicate) expr;
+ if (inPredicate.isNotIn()) {
+ throw new AnalysisException("Not support NOT IN predicate in
point query");
+ }
+ SlotRef columnSlot = inPredicate.getChild(0).unwrapSlotRef();
+ columnExpr.add(columnSlot.getColumnName());
+ for (int i = 1; i < inPredicate.getChildren().size(); ++i) {
+ currentKeyTuple.add(inPredicate.getChild(i).getStringValue());
+ getAllKeyTupleCombination(conjuncts, index + 1,
currentKeyTuple, result, columnExpr, keyColumns);
+ currentKeyTuple.remove(currentKeyTuple.size() - 1);
+ }
+ columnExpr.remove(columnExpr.size() - 1);
+ } else {
+ throw new AnalysisException("Not support conjunct type " +
expr.getClass().getName());
}
- // add key tuple in keys order
- for (Column column :
shortCircuitQueryContext.scanNode.getOlapTable().getBaseSchemaKeyColumns()) {
-
kBuilder.addKeyColumnRep(columnExpr.get(column.getName()).getStringValue());
+ }
+
+ List<List<String>> addAllKeyTuples(List<Column> keyColumns) {
+ List<Expr> conjuncts =
shortCircuitQueryContext.scanNode.getConjuncts();
+ List<List<String>> keyTuples = Lists.newArrayList();
+ if (keyTupleIndex2TabletID == null) {
+ keyTupleIndex2TabletID = Lists.newArrayList();
}
- requestBuilder.addKeyTuples(kBuilder);
+ getAllKeyTupleCombination(conjuncts, 0, new ArrayList<>(), keyTuples,
Lists.newArrayList(), keyColumns);
+ Preconditions.checkState(keyTuples.size() ==
keyTupleIndex2TabletID.size());
+ return keyTuples;
}
@Override
@@ -208,21 +488,71 @@ public class PointQueryExecutor implements CoordInterface
{
if (candidateBackends == null || candidateBackends.isEmpty()) {
return new RowBatch();
}
- Iterator<Backend> backendIter = candidateBackends.iterator();
- RowBatch rowBatch = null;
- int tryCount = 0;
- int maxTry = Math.min(Config.max_point_query_retry_time,
candidateBackends.size());
+
+ // pick candidate backends
+ OlapScanNode scanNode = shortCircuitQueryContext.scanNode;
+ replicaMetaTable = scanNode.getScanBackendReplicaTable();
+ roundRobinscheduler = new RoundRobinScheduler<>(candidateBackends);
+ pickCandidateBackends();
+
+ OlapTable olapTable = scanNode.getOlapTable();
+ List<Column> keyColumns = olapTable.getBaseSchemaKeyColumns();
+ for (int i = 0; i < keyColumns.size(); ++i) {
+ Column column = keyColumns.get(i);
+ if (olapTable.isPartitionColumn(column.getName())) {
+ partitionKeyColumns.add(i);
+ }
+ if (olapTable.isDistributionColumn(column.getName())) {
+ distributionKeyColumns.add(i);
+ }
+ }
+ RowBatch rowBatch = new RowBatch();
Status status = new Status();
- do {
- Backend backend = backendIter.next();
- rowBatch = getNextInternal(status, backend);
- if (rowBatch != null) {
+ this.allKeyTuples = addAllKeyTuples(keyColumns);
+ TResultBatch resultBatch = new TResultBatch();
+ resultBatch.setRows(Lists.newArrayList());
+ List<byte[]> batchSerialResult = Lists.newArrayList();
+ Map<Backend, InternalService.PTabletBatchKeyLookupRequest.Builder>
batchRequestBuilders =
+ buildBatchRequest(status);
+ // send batch request
+ if (batchRequestBuilders.isEmpty()) {
+ status.updateStatus(TStatusCode.OK, "");
+ rowBatch.setEos(true);
+ return rowBatch;
+ }
+ for (Map.Entry<Backend,
InternalService.PTabletBatchKeyLookupRequest.Builder> entry :
+ batchRequestBuilders.entrySet()) {
+ List<byte[]> subSerialResult = batchGetNext(status,
entry.getKey(), entry.getValue());
+
+ if (!status.ok()) {
break;
}
- if (++tryCount >= maxTry) {
- break;
+ batchSerialResult.addAll(subSerialResult);
+ }
+
+ // todo: maybe there is a better way
+ if (!batchSerialResult.isEmpty()) {
+ TDeserializer deserializer = new TDeserializer(
+ new
TCustomProtocolFactory(this.maxMsgSizeOfResultReceiver));
+ for (byte[] serialResult : batchSerialResult) {
+ TResultBatch tmpResultBatch = new TResultBatch();
+ try {
+ deserializer.deserialize(tmpResultBatch, serialResult);
+ tmpResultBatch.getRows().forEach(row -> {
+ resultBatch.addToRows(row);
+ });
+ } catch (TException e) {
+ if (e.getMessage().contains("MaxMessageSize reached")) {
+ throw new TException("MaxMessageSize reached, try
increase max_msg_size_of_result_receiver");
+ } else {
+ throw e;
+ }
+ }
}
- } while (true);
+ rowBatch.setBatch(resultBatch);
+ }
+ rowBatch.setEos(true);
+
// handle status code
if (!status.ok()) {
if (Strings.isNullOrEmpty(status.getErrorMsg())) {
@@ -250,45 +580,106 @@ public class PointQueryExecutor implements
CoordInterface {
// only handles in getNext()
}
- private RowBatch getNextInternal(Status status, Backend backend) throws
TException {
- long timeoutTs = System.currentTimeMillis() + timeoutMs;
- RowBatch rowBatch = new RowBatch();
- InternalService.PTabletKeyLookupResponse pResult = null;
- try {
-
Preconditions.checkNotNull(shortCircuitQueryContext.serializedDescTable);
-
- InternalService.PTabletKeyLookupRequest.Builder requestBuilder
- = InternalService.PTabletKeyLookupRequest.newBuilder()
- .setTabletId(tabletID)
- .setDescTbl(shortCircuitQueryContext.serializedDescTable)
-
.setOutputExpr(shortCircuitQueryContext.serializedOutputExpr)
-
.setQueryOptions(shortCircuitQueryContext.serializedQueryOptions)
- .setIsBinaryRow(ConnectContext.get().command ==
MysqlCommand.COM_STMT_EXECUTE);
- if (snapshotVisibleVersions != null &&
!snapshotVisibleVersions.isEmpty()) {
- requestBuilder.setVersion(snapshotVisibleVersions.get(0));
- }
- // Only set cacheID for prepared statement excute phase,
- // otherwise leading to many redundant cost in BE side
- if (shortCircuitQueryContext.cacheID != null
- && ConnectContext.get().command ==
MysqlCommand.COM_STMT_EXECUTE) {
- InternalService.UUID.Builder uuidBuilder =
InternalService.UUID.newBuilder();
-
uuidBuilder.setUuidHigh(shortCircuitQueryContext.cacheID.getMostSignificantBits());
-
uuidBuilder.setUuidLow(shortCircuitQueryContext.cacheID.getLeastSignificantBits());
- requestBuilder.setUuid(uuidBuilder);
+ private void collectBatchRequests(
+ Map<Backend, InternalService.PTabletBatchKeyLookupRequest.Builder>
batchRequests,
+ KeyTuple.Builder kBuilder, Set<Long> tabletIDsOfKeyTuple) {
+ // check containsKey
+ for (Long tabletID : tabletIDsOfKeyTuple) {
+
Preconditions.checkState(pickedCandidateReplicas.containsKey(tabletID));
+ Pair<Backend, Long> beWithReplicaID =
pickedCandidateReplicas.get(tabletID);
+ Backend candidate = beWithReplicaID.first;
+ batchRequests.putIfAbsent(
+ candidate,
+ InternalService.PTabletBatchKeyLookupRequest.newBuilder());
+ buildSubRequest(tabletID, kBuilder, beWithReplicaID.second,
+ batchRequests.get(candidate));
+ }
+ }
+
+ // Find the tabletID, backend, and replica corresponding to each keyTuple,
+ // and then add them to batchRequests. Each backend corresponds to a
batchRequest.
+ private Map<Backend, InternalService.PTabletBatchKeyLookupRequest.Builder>
+ buildBatchRequest(Status status) throws TException {
+ Map<Backend, InternalService.PTabletBatchKeyLookupRequest.Builder>
batchRequestBuilders =
+ Maps.newHashMap();
+ for (int i = 0; i < keyTupleIndex2TabletID.size(); ++i) {
+ if (keyTupleIndex2TabletID.get(i) == null) {
+ continue;
+ }
+ KeyTuple.Builder kBuilder = KeyTuple.newBuilder();
+ for (String key : this.allKeyTuples.get(i)) {
+ kBuilder.addKeyColumnRep(key);
+ }
+ collectBatchRequests(batchRequestBuilders, kBuilder,
keyTupleIndex2TabletID.get(i));
+ }
+ return batchRequestBuilders;
+ }
+
+ // Build a request about a keyTuple, that is, SubRequest
+ private void buildSubRequest(
+ Long prunedTabletIdsOfBe, KeyTuple.Builder kBuilder, Long
replicaID,
+ InternalService.PTabletBatchKeyLookupRequest.Builder
pBatchRequestBuilder) {
+ InternalService.PTabletKeyLookupRequest.Builder requestBuilder
+ = InternalService.PTabletKeyLookupRequest.newBuilder()
+ .setDescTbl(shortCircuitQueryContext.serializedDescTable)
+ .setOutputExpr(shortCircuitQueryContext.serializedOutputExpr)
+
.setQueryOptions(shortCircuitQueryContext.serializedQueryOptions)
+ .setIsBinaryRow(ConnectContext.get().command ==
MysqlCommand.COM_STMT_EXECUTE);
+
+ // TODO: optimize me
+ if (snapshotVisibleVersions != null &&
!snapshotVisibleVersions.isEmpty()) {
+ Long versionToSet = -1L;
+ for (Map.Entry<CloudPartition, Long> entry :
snapshotVisibleVersions.entrySet()) {
+ MaterializedIndex selectedTable =
+
entry.getKey().getIndex(shortCircuitQueryContext.scanNode.getSelectedIndexId());
+ if
(selectedTable.getTabletIdsInOrder().contains(prunedTabletIdsOfBe)) {
+ versionToSet = entry.getValue();
+ break;
+ }
}
- addKeyTuples(requestBuilder);
+ requestBuilder.setVersion(versionToSet);
+ }
+ // Only set cacheID for prepared statement excute phase,
+ // otherwise leading to many redundant cost in BE side
+ if (shortCircuitQueryContext.cacheID != null
+ && ConnectContext.get().command ==
MysqlCommand.COM_STMT_EXECUTE) {
+ InternalService.UUID.Builder uuidBuilder =
InternalService.UUID.newBuilder();
+
uuidBuilder.setUuidHigh(shortCircuitQueryContext.cacheID.getMostSignificantBits());
+
uuidBuilder.setUuidLow(shortCircuitQueryContext.cacheID.getLeastSignificantBits());
+ requestBuilder.setUuid(uuidBuilder);
+ }
+ requestBuilder.addKeyTuples(kBuilder);
+ requestBuilder.setTabletId(prunedTabletIdsOfBe);
+ requestBuilder.setReplicaId(replicaID);
+ pBatchRequestBuilder.addSubKeyLookupReq(requestBuilder);
+ }
+
+ private List<byte[]> batchGetNext(
+ Status status, Backend backend,
+ InternalService.PTabletBatchKeyLookupRequest.Builder
pBatchRequestBuilder) throws TException {
+ TResultBatch resultBatch = new TResultBatch();
+ List<byte[]> result = Lists.newArrayList();
+ resultBatch.setRows(Lists.newArrayList());
+
+
Preconditions.checkState(pBatchRequestBuilder.getSubKeyLookupReqCount() > 0);
- InternalService.PTabletKeyLookupRequest request =
requestBuilder.build();
- Future<InternalService.PTabletKeyLookupResponse> futureResponse =
-
BackendServiceProxy.getInstance().fetchTabletDataAsync(backend.getBrpcAddress(),
request);
+ // batch fetch data
+ InternalService.PTabletBatchKeyLookupRequest pBatchRequest =
pBatchRequestBuilder.build();
+ long timeoutTs = System.currentTimeMillis() + timeoutMs;
+ InternalService.PTabletBatchKeyLookupResponse pBatchResult = null;
+ try {
+ Future<InternalService.PTabletBatchKeyLookupResponse>
futureBatchResponse =
+ BackendServiceProxy.getInstance()
+ .batchFetchTabletDataAsync(backend.getBrpcAddress(),
pBatchRequest);
long currentTs = System.currentTimeMillis();
if (currentTs >= timeoutTs) {
- LOG.warn("fetch result timeout {}", backend.getBrpcAddress());
+ LOG.warn("batch fetch result timeout {}",
backend.getBrpcAddress());
status.updateStatus(TStatusCode.INTERNAL_ERROR, "query request
timeout");
return null;
}
try {
- pResult = futureResponse.get(timeoutTs - currentTs,
TimeUnit.MILLISECONDS);
+ // todo: get the result asynchrously
+ pBatchResult = futureBatchResponse.get(timeoutTs - currentTs,
TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
// continue to get result
LOG.warn("future get interrupted Exception");
@@ -297,18 +688,18 @@ public class PointQueryExecutor implements CoordInterface
{
return null;
}
} catch (TimeoutException e) {
- futureResponse.cancel(true);
+ futureBatchResponse.cancel(true);
LOG.warn("fetch result timeout {}, addr {}", timeoutTs -
currentTs, backend.getBrpcAddress());
status.updateStatus(TStatusCode.INTERNAL_ERROR, "query fetch
result timeout");
return null;
}
} catch (RpcException e) {
- LOG.warn("query fetch rpc exception {}, e {}",
backend.getBrpcAddress(), e);
+ LOG.warn("query batch fetch rpc exception {}, e {}",
backend.getBrpcAddress(), e);
status.updateStatus(TStatusCode.THRIFT_RPC_ERROR, e.getMessage());
SimpleScheduler.addToBlacklist(backend.getId(), e.getMessage());
return null;
} catch (ExecutionException e) {
- LOG.warn("query fetch execution exception {}, addr {}", e,
backend.getBrpcAddress());
+ LOG.warn("query batch fetch execution exception {}, addr {}", e,
backend.getBrpcAddress());
if (e.getMessage().contains("time out")) {
// if timeout, we set error code to TIMEOUT, and it will not
retry querying.
status.updateStatus(TStatusCode.TIMEOUT, e.getMessage());
@@ -318,43 +709,37 @@ public class PointQueryExecutor implements CoordInterface
{
}
return null;
}
- Status resultStatus = new Status(pResult.getStatus());
- if (resultStatus.getErrorCode() != TStatusCode.OK) {
- status.updateStatus(resultStatus.getErrorCode(),
resultStatus.getErrorMsg());
- return null;
- }
- if (pResult.hasEmptyBatch() && pResult.getEmptyBatch()) {
- LOG.debug("get empty rowbatch");
- rowBatch.setEos(true);
- status.updateStatus(TStatusCode.OK, "");
- return rowBatch;
- } else if (pResult.hasRowBatch() && pResult.getRowBatch().size() > 0) {
- byte[] serialResult = pResult.getRowBatch().toByteArray();
- TResultBatch resultBatch = new TResultBatch();
- TDeserializer deserializer = new TDeserializer(
- new
TCustomProtocolFactory(this.maxMsgSizeOfResultReceiver));
- try {
- deserializer.deserialize(resultBatch, serialResult);
- } catch (TException e) {
- if (e.getMessage().contains("MaxMessageSize reached")) {
- throw new TException("MaxMessageSize reached, try increase
max_msg_size_of_result_receiver");
- } else {
- throw e;
- }
+ // handle the response
+ boolean isOK = true;
+ for (InternalService.PTabletKeyLookupResponse subResponse :
pBatchResult.getSubKeyLookupResList()) {
+ Status resultStatus = new Status(subResponse.getStatus());
+ if (resultStatus.getErrorCode() != TStatusCode.OK) {
+ status.updateStatus(resultStatus.getErrorCode(),
resultStatus.getErrorMsg());
+ return null;
}
- rowBatch.setBatch(resultBatch);
- rowBatch.setEos(true);
+ if (subResponse.hasEmptyBatch() && subResponse.getEmptyBatch()) {
+ LOG.debug("get empty rowbatch");
+ continue;
+ } else if (subResponse.hasRowBatch() &&
subResponse.getRowBatch().size() > 0) {
+ byte[] serialResult = subResponse.getRowBatch().toByteArray();
+ result.add(serialResult);
+ continue;
+ } else {
+ Preconditions.checkState(false, "No row batch or empty batch
found");
+ }
+
+ if (isCancel) {
+ status.updateStatus(TStatusCode.CANCELLED, "cancelled");
+ isOK = false;
+ break;
+ }
+ }
+ if (isOK) {
status.updateStatus(TStatusCode.OK, "");
- return rowBatch;
- } else {
- Preconditions.checkState(false, "No row batch or empty batch
found");
}
- if (isCancel) {
- status.updateStatus(TStatusCode.CANCELLED, "cancelled");
- }
- return rowBatch;
+ return result;
}
public void cancel() {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java
b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java
index 54c5e68144c..9cde1897654 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java
@@ -98,6 +98,11 @@ public class BackendServiceClient {
return stub.tabletFetchData(request);
}
+ public Future<InternalService.PTabletBatchKeyLookupResponse>
batchFetchTabletDataAsync(
+ InternalService.PTabletBatchKeyLookupRequest batchRequest) {
+ return stub.tabletBatchFetchData(batchRequest);
+ }
+
public InternalService.PFetchDataResult
fetchDataSync(InternalService.PFetchDataRequest request) {
return blockingStub.fetchData(request);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
index 053a7428b52..5fe13a82d0b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
@@ -307,6 +307,18 @@ public class BackendServiceProxy {
}
}
+ public Future<InternalService.PTabletBatchKeyLookupResponse>
batchFetchTabletDataAsync(
+ TNetworkAddress address,
InternalService.PTabletBatchKeyLookupRequest batchRequest) throws RpcException {
+ try {
+ final BackendServiceClient client = getProxy(address);
+ return client.batchFetchTabletDataAsync(batchRequest);
+ } catch (Throwable e) {
+ LOG.warn("batch fetch tablet data catch a exception,
address={}:{}",
+ address.getHostname(), address.getPort(), e);
+ throw new RpcException(address.hostname, e.getMessage());
+ }
+ }
+
public InternalService.PFetchDataResult fetchDataSync(
TNetworkAddress address, InternalService.PFetchDataRequest
request) throws RpcException {
try {
diff --git a/gensrc/proto/internal_service.proto
b/gensrc/proto/internal_service.proto
index 547b2588168..867c0679dac 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -339,6 +339,11 @@ message PTabletKeyLookupRequest {
optional int64 version = 7;
// serilized from TQueryOptions
optional bytes query_options = 8;
+ optional int64 replica_id = 9;
+}
+
+message PTabletBatchKeyLookupRequest {
+ repeated PTabletKeyLookupRequest sub_key_lookup_req = 1;
}
message PTabletKeyLookupResponse {
@@ -347,6 +352,11 @@ message PTabletKeyLookupResponse {
optional bool empty_batch = 3;
}
+message PTabletBatchKeyLookupResponse {
+ optional PStatus status = 1;
+ repeated PTabletKeyLookupResponse sub_key_lookup_res = 2;
+}
+
//Add message definition to fetch and update cache
enum PCacheStatus {
DEFAULT = 0;
@@ -1030,6 +1040,7 @@ service PBackendService {
rpc multiget_data(PMultiGetRequest) returns (PMultiGetResponse);
rpc get_file_cache_meta_by_tablet_id(PGetFileCacheMetaRequest) returns
(PGetFileCacheMetaResponse);
rpc tablet_fetch_data(PTabletKeyLookupRequest) returns
(PTabletKeyLookupResponse);
+ rpc tablet_batch_fetch_data(PTabletBatchKeyLookupRequest) returns
(PTabletBatchKeyLookupResponse);
rpc get_column_ids_by_tablet_ids(PFetchColIdsRequest) returns
(PFetchColIdsResponse);
rpc get_tablet_rowset_versions(PGetTabletVersionsRequest) returns
(PGetTabletVersionsResponse);
rpc report_stream_load_status(PReportStreamLoadStatusRequest) returns
(PReportStreamLoadStatusResponse);
diff --git a/regression-test/data/point_query_p0/test_point_IN_query.out
b/regression-test/data/point_query_p0/test_point_IN_query.out
new file mode 100644
index 00000000000..6f1f4ae7b33
--- /dev/null
+++ b/regression-test/data/point_query_p0/test_point_IN_query.out
@@ -0,0 +1,27 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !1_EQ_sql --
+123 100 110 zxcd
+
+-- !1_EQ_sql --
+6333 2642 480 zxc
+
+-- !1_EQ_sql --
+1231 1220 210 zxc
+
+-- !2_EQ_sql --
+222 100 115 zxc
+
+-- !2_EQ_sql --
+1231 1220 210 zxc
+
+-- !2_EQ_sql --
+1231 1220 210 zxc
+
+-- !3_EQ_sql --
+323 49 240 zxc
+
+-- !0_EQ_sql --
+12 12 120 zxc
+123 100 110 zxcd
+222 100 115 zxc
+
diff --git a/regression-test/data/point_query_p0/test_point_query.out
b/regression-test/data/point_query_p0/test_point_query.out
index 1cc4142e39f..f22bf31942e 100644
--- a/regression-test/data/point_query_p0/test_point_query.out
+++ b/regression-test/data/point_query_p0/test_point_query.out
@@ -160,3 +160,182 @@
-- !sql --
-10 20 aabc update val
+-- !case_1_sql --
+123 132 a
+
+-- !case_1_sql --
+123 132 a
+123 222 b
+
+-- !case_1_sql --
+1 1 d
+123 132 a
+123 222 b
+
+-- !case_2_sql --
+
+-- !case_2_sql --
+222 150 c
+
+-- !case_2_sql --
+123 120 a
+
+-- !case_2_sql --
+400 250 g
+400 260 f
+
+-- !case_2_sql --
+222 150 c
+400 250 g
+400 260 f
+
+-- !case_3_sql --
+123 100 a
+
+-- !case_3_sql --
+
+-- !case_3_sql --
+100 100 aaa
+100 120 aaaaa
+
+-- !case_3_sql --
+123 100 a
+
+-- !case_3_sql --
+400 250 d
+400 280 e
+
+-- !case_3_sql --
+123 100 a
+350 200 c
+400 250 d
+
+-- !case_4_sql --
+123 100 110 a
+
+-- !case_4_sql --
+
+-- !case_4_sql --
+1231 1220 210 d
+
+-- !case_4_sql --
+123 100 110 a
+
+-- !case_4_sql --
+123 100 110 a
+1231 1220 210 d
+222 100 115 b
+
+-- !case_5_sql --
+123 100 110 a
+
+-- !case_5_sql --
+
+-- !case_5_sql --
+1231 1220 210 d
+
+-- !case_5_sql --
+123 100 110 a
+
+-- !case_5_sql --
+123 100 110 a
+1231 1220 210 d
+222 100 115 b
+633 2642 480 g
+6333 2642 480 h
+
+-- !case_6_sql --
+123 132 a
+
+-- !case_6_sql --
+123 132 a
+123 222 b
+
+-- !case_6_sql --
+1 1 d
+123 132 a
+123 222 b
+2 2 e
+3 3 f
+4 4 i
+
+-- !case_7_sql --
+123 100 110 a
+
+-- !case_7_sql --
+
+-- !case_7_sql --
+1231 1220 210 d
+
+-- !case_7_sql --
+123 100 110 a
+
+-- !case_7_sql --
+123 100 110 a
+1231 1220 210 d
+222 100 115 b
+633 2642 480 g
+6333 2642 480 h
+
+-- !case_8_sql --
+123 100 a
+
+-- !case_8_sql --
+
+-- !case_8_sql --
+123 100 a
+
+-- !case_8_sql --
+400 250 d
+400 280 e
+
+-- !case_8_sql --
+123 100 a
+350 200 c
+400 250 d
+
+-- !case_9_sql --
+123 100 110 zxcd
+
+-- !case_9_sql --
+222 100 115 zxc
+
+-- !case_9_sql --
+323 49 240 zxc
+
+-- !case_9_sql --
+123 100 110 zxcd
+
+-- !case_9_sql --
+1231 1220 210 zxc
+
+-- !case_9_sql --
+6333 2642 480 zxc
+
+-- !case_9_sql --
+633 2642 480 zxc
+
+-- !case_9_sql --
+123 100 110 zxcd
+
+-- !case_9_sql --
+1231 1220 210 zxc
+222 100 115 zxc
+
+-- !case_9_sql --
+12 12 120 zxc
+123 100 110 zxcd
+222 100 115 zxc
+
+-- !case_9_sql --
+1231 1220 210 zxc
+
+-- !case_9_sql --
+123 100 110 zxcd
+222 100 115 zxc
+
+-- !case_9_sql --
+123 100 110 zxcd
+1231 1220 210 zxc
+222 100 115 zxc
+
diff --git a/regression-test/data/point_query_p0/test_point_query_partition.out
b/regression-test/data/point_query_p0/test_point_query_partition.out
index bef064984c8..5c20cde01d9 100644
--- a/regression-test/data/point_query_p0/test_point_query_partition.out
+++ b/regression-test/data/point_query_p0/test_point_query_partition.out
@@ -31,6 +31,15 @@
-- !point_select --
+-- !point_in_select --
+-1 c
+1 a
+11 d
+2 b
+33 f
+45 g
+999 h
+
-- !point_selectxxx --
686612 686612 686612 \N \N \N \N \N \N \N
\N
@@ -46,3 +55,19 @@
-- !point_selecteee --
686613 686613 686613 \N \N \N \N \N \N \N
\N
+-- !point_in_selectxxx --
+686612 686612 686612 \N \N \N \N \N \N \N
\N
+686613 686613 686613 \N \N \N \N \N \N \N
\N
+
+-- !point_in_selectyyy --
+686612 686612 686612 \N \N \N \N \N \N \N
\N
+686613 686613 686613 \N \N \N \N \N \N \N
\N
+
+-- !point_in_selectmmm --
+686612 686612 686612 \N \N \N \N \N \N \N
\N
+686613 686613 686613 \N \N \N \N \N \N \N
\N
+
+-- !point_in_selecteee --
+686612 686612 686612 \N \N \N \N \N \N \N
\N
+686613 686613 686613 \N \N \N \N \N \N \N
\N
+
diff --git a/regression-test/data/point_query_p0/test_rowstore.out
b/regression-test/data/point_query_p0/test_rowstore.out
index 34e40867d6a..f0558d616c0 100644
--- a/regression-test/data/point_query_p0/test_rowstore.out
+++ b/regression-test/data/point_query_p0/test_rowstore.out
@@ -28,6 +28,11 @@
-- !point_select --
33333333333333333333333333333333 3
+-- !point_in_select --
+11111111111111111111111111111111111111 3
+222222222222222222222222222222222 3
+33333333333333333333333333333333 3
+
-- !point_select --
3
@@ -37,6 +42,11 @@
-- !point_select --
3
+-- !point_in_select --
+3
+3
+3
+
-- !point_select --
33333333333333333333333333333333
@@ -55,6 +65,11 @@
-- !point_select --
3
+-- !point_in_select --
+3
+3
+3
+
-- !point_select --
2021-02-01T11:11:11
@@ -64,6 +79,11 @@
-- !point_select --
2023-02-01T11:11:11
+-- !point_in_select --
+2021-02-01T11:11:11
+2022-02-01T11:11:11
+2023-02-01T11:11:11
+
-- !point_select --
2017-10-01T11:11:11.021 2017-10-01T11:11:11.170
2017-10-01T11:11:11.110111 30
@@ -88,6 +108,16 @@
-- !point_select --
2017-10-01T11:11:11.028 \N \N 34
+-- !point_in_select --
+2017-10-01T11:11:11.021 2017-10-01T11:11:11.170
2017-10-01T11:11:11.110111 30
+2017-10-01T11:11:11.022 2017-10-01T11:11:11.160
2017-10-01T11:11:11.100111 31
+2017-10-01T11:11:11.023 2017-10-01T11:11:11.150
2017-10-01T11:11:11.130111 31
+2017-10-01T11:11:11.024 2017-10-01T11:11:11.140
2017-10-01T11:11:11.120111 32
+2017-10-01T11:11:11.025 2017-10-01T11:11:11.100
2017-10-01T11:11:11.140111 32
+2017-10-01T11:11:11.026 2017-10-01T11:11:11.110
2017-10-01T11:11:11.150111 33
+2017-10-01T11:11:11.027 \N \N 34
+2017-10-01T11:11:11.028 \N \N 34
+
-- !sql --
1 abc 1111919.123456789190000000
diff --git a/regression-test/data/point_query_p0/test_rowstore_query.out
b/regression-test/data/point_query_p0/test_rowstore_query.out
index b43e0263960..e4656d631a7 100644
--- a/regression-test/data/point_query_p0/test_rowstore_query.out
+++ b/regression-test/data/point_query_p0/test_rowstore_query.out
@@ -2,6 +2,10 @@
-- !sql --
1 abc 1111919.123456789190000000
--- !sql --
+-- !point_sql --
+2 def 1111919.123456789190000000
+
+-- !point_in_sql --
+1 abc 1111919.123456789190000000
2 def 1111919.123456789190000000
diff --git a/regression-test/data/prepared_stmt_p0/prepared_stmt_in_list.out
b/regression-test/data/prepared_stmt_p0/prepared_stmt_in_list.out
index fa90a56523c..0d640ff2b06 100644
--- a/regression-test/data/prepared_stmt_p0/prepared_stmt_in_list.out
+++ b/regression-test/data/prepared_stmt_p0/prepared_stmt_in_list.out
@@ -127,3 +127,9 @@
5 1304 36548425 15229335116 991129292901.111380000 dd
2120-01-02 2024-01-01T12:36:38 652.692 5022-01-01T11:30:38
5022-01-01
6 1305 56054803 18031831909 100320.111390000 haha
abcd 2220-01-02 2025-01-01T12:36:38 2.7692 6022-01-01T11:30:38
6022-01-01
+-- !stmt_read11_1 --
+1 1300 55356821
+2 1301 56052706
+3 1302 55702967
+4 1303 56054326
+
diff --git
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
index 4f9409308d5..85feb96380c 100644
---
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
+++
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
@@ -1355,9 +1355,9 @@ class Suite implements GroovyInterceptable {
quickRunTest(tag, sql, isOrder)
}
- void quickExecute(String tag, PreparedStatement stmt) {
+ void quickExecute(String tag, PreparedStatement stmt, boolean isOrder =
false) {
logger.info("Execute tag: ${tag}, sql: ${stmt}".toString())
- quickRunTest(tag, stmt)
+ quickRunTest(tag, stmt, isOrder)
}
@Override
@@ -1369,6 +1369,8 @@ class Suite implements GroovyInterceptable {
return quickTest(name.substring("order_qt_".length()), (args as
Object[])[0] as String, true)
} else if (name.startsWith("qe_")) {
return quickExecute(name.substring("qe_".length()), (args as
Object[])[0] as PreparedStatement)
+ } else if (name.startsWith("order_qe_")) {
+ return quickExecute(name.substring("order_qe_".length()), (args as
Object[])[0] as PreparedStatement, true)
} else if (name.startsWith("assert") && name.length() >
"assert".length()) {
// delegate to junit Assertions dynamically
return Assertions."$name"(*args) // *args: spread-dot
diff --git
a/regression-test/suites/cloud_p0/row_store_page_size/test_row_store_page_size.groovy
b/regression-test/suites/cloud_p0/row_store_page_size/test_row_store_page_size.groovy
index 4be53ff17f6..4ac5b7f5ec9 100644
---
a/regression-test/suites/cloud_p0/row_store_page_size/test_row_store_page_size.groovy
+++
b/regression-test/suites/cloud_p0/row_store_page_size/test_row_store_page_size.groovy
@@ -46,7 +46,7 @@ suite ("test_row_store_page_size_cloud") {
explain {
sql("select * from ps_table_1 where k1=1 and k2=1;")
- contains("SHORT")
+ contains("SHORT-CIRCUIT")
}
qt_select_star "select * from ps_table_1 where k1=1 and k2=1;"
@@ -81,7 +81,7 @@ suite ("test_row_store_page_size_cloud") {
explain {
sql("select * from ps_table_2 where k1=1 and k2=1;")
- contains("SHORT")
+ contains("SHORT-CIRCUIT")
}
qt_select_star "select * from ps_table_2 where k1=1 and k2=1;"
diff --git
a/regression-test/suites/point_query_p0/test_dynamic_partition_point_query.groovy
b/regression-test/suites/point_query_p0/test_dynamic_partition_point_query.groovy
new file mode 100644
index 00000000000..29cc084c3bd
--- /dev/null
+++
b/regression-test/suites/point_query_p0/test_dynamic_partition_point_query.groovy
@@ -0,0 +1,122 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import java.time.LocalDate
+import java.time.format.DateTimeFormatter
+
+suite("test_dynamic_partition_point_query") {
+ sql "drop table if exists dy_par_pq"
+ sql """
+ CREATE TABLE IF NOT EXISTS dy_par_pq ( k1 date NOT NULL, k2
varchar(20) NOT NULL, k3 int NOT NULL )
+ UNIQUE KEY(k1)
+ PARTITION BY RANGE(k1) ( )
+ DISTRIBUTED BY HASH(k1) BUCKETS 3
+ PROPERTIES (
+ "dynamic_partition.enable"="true",
+ "dynamic_partition.end"="3",
+ "dynamic_partition.buckets"="10",
+ "dynamic_partition.start"="-3",
+ "dynamic_partition.prefix"="p",
+ "dynamic_partition.time_unit"="DAY",
+ "dynamic_partition.create_history_partition"="true",
+ "dynamic_partition.replication_allocation" =
"tag.location.default: 1",
+ "replication_allocation" = "tag.location.default: 1",
+ "store_row_column" = "true")
+ """
+ def result = sql "show tables like 'dy_par_pq'"
+ logger.info("${result}")
+ assertEquals(result.size(), 1)
+ result = sql_return_maparray "show partitions from dy_par_pq"
+ assertEquals(result.get(0).Buckets.toInteger(), 10)
+ def currentDate = LocalDate.now()
+ def formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd")
+ def currenteDay = currentDate.format(formatter)
+ sql "insert into dy_par_pq values ('${currenteDay}', 'a', 1);"
+ def previous1Day = currentDate.minusDays(1).format(formatter)
+ sql "insert into dy_par_pq values ('${previous1Day}', 'b', 2);"
+ def previous2Day = currentDate.minusDays(2).format(formatter)
+ sql "insert into dy_par_pq values ('${previous2Day}', 'c', 3);"
+ def previous3Day = currentDate.minusDays(3).format(formatter)
+ sql "insert into dy_par_pq values ('${previous3Day}', 'd', 4);"
+ def next1Day = currentDate.plusDays(1).format(formatter)
+ sql "insert into dy_par_pq values ('${next1Day}', 'e', 5);"
+ def next2Day = currentDate.plusDays(2).format(formatter)
+ sql "insert into dy_par_pq values ('${next2Day}', 'f', 6);"
+ def next3Day = currentDate.plusDays(3).format(formatter)
+ sql "insert into dy_par_pq values ('${next3Day}', 'g', 7);"
+
+ result = sql """
+ select
+ *
+ from
+ dy_par_pq
+ where
+ k1 in (
+ '${currenteDay}', '${previous1Day}', '${previous2Day}',
'${previous3Day}',
+ '${next1Day}', '${next2Day}', '${next3Day}') ;
+ """
+ assertEquals(result.size(), 7)
+ explain {
+ sql """
+ select
+ *
+ from
+ dy_par_pq
+ where
+ k1 in (
+ '${currenteDay}', '${previous1Day}', '${previous2Day}',
'${previous3Day}',
+ '${next1Day}', '${next2Day}', '${next3Day}') ;
+ """
+ contains "SHORT-CIRCUIT"
+ }
+
+ def previous4Day = currentDate.minusDays(4).format(formatter)
+ def next4Day = currentDate.plusDays(4).format(formatter)
+ result = sql """
+ select
+ *
+ from
+ dy_par_pq
+ where
+ k1 in (
+ '${currenteDay}', '${previous4Day}', '${next4Day}') ;
+ """
+ assertEquals(result.size(), 1)
+
+ result = sql """
+ select
+ *
+ from
+ dy_par_pq
+ where
+ k1 = '${currenteDay}' ;
+ """
+ assertEquals(result.size(), 1)
+
+ result = sql """
+ select
+ *
+ from
+ dy_par_pq
+ where
+ k1 = '${next4Day}' ;
+ """
+ assertEquals(result.size(), 0)
+
+ sql "drop table dy_par_pq"
+
+}
diff --git a/regression-test/suites/point_query_p0/test_point_IN_query.groovy
b/regression-test/suites/point_query_p0/test_point_IN_query.groovy
new file mode 100644
index 00000000000..29e33787470
--- /dev/null
+++ b/regression-test/suites/point_query_p0/test_point_IN_query.groovy
@@ -0,0 +1,85 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import java.math.BigDecimal;
+
+suite("test_point_IN_query", "p0") {
+ def tableName = "rs_in_table"
+ sql """DROP TABLE IF EXISTS ${tableName}"""
+ sql """
+ create table ${tableName} (
+ a int not null,
+ b int not null,
+ c int not null,
+ d string not null
+ )
+ unique key(a, b, c)
+ partition by RANGE(a, b, c)
+ (
+ partition p values [(1, 1, 1), (100, 100, 100)),
+ partition p0 values [(100, 100, 100), (200, 210, 220)),
+ partition p1 values [(200, 210, 220), (300, 250, 290)),
+ partition p2 values [(300, 250, 290), (350, 290, 310)),
+ partition p3 values [(350, 290, 310), (400, 350, 390)),
+ partition p4 values [(400, 350, 390), (800, 400, 450)),
+ partition p5 values [(800, 400, 450), (2000, 500, 500)),
+ partition p6 values [(2000, 500, 500), (5000, 600, 600)),
+ partition p7 values [(5000, 600, 600), (9999, 9999, 9999))
+ )
+ distributed by hash(a, c)
+ buckets 16
+ PROPERTIES(
+ "replication_num" = "1",
+ "store_row_column" = "true"
+ );
+ """
+ sql """
+ insert into ${tableName} values(123, 100, 110, "zxcd");
+ insert into ${tableName} values(222, 100, 115, "zxc");
+ insert into ${tableName} values(12, 12, 120, "zxc");
+ insert into ${tableName} values(1231, 1220, 210, "zxc");
+ insert into ${tableName} values(323, 49, 240, "zxc");
+ insert into ${tableName} values(843, 7342, 370, "zxcde");
+ insert into ${tableName} values(633, 2642, 480, "zxc");
+ insert into ${tableName} values(6333, 2642, 480, "zxc");
+ """
+
+ order_qt_1_EQ_sql "select * from ${tableName} where a = 123 and b in (100,
12, 1220, 7342, 999, 2642) and c in (110, 115, 120, 480);"
+ order_qt_1_EQ_sql "select * from ${tableName} where a in (12, 222, 1231,
6333) and b = 2642 and c in (210, 110, 115, 210, 480);"
+ order_qt_1_EQ_sql "select * from ${tableName} where a in (123, 1, 222,
1231, 420, 500) and b in (132, 100, 222, 1220, 300) and c = 210;"
+ explain {
+ sql "select * from ${tableName} where a in (12, 222, 1231, 6333) and b
= 2642 and c in (210, 110, 115, 210, 480);"
+ contains "SHORT-CIRCUIT"
+ }
+
+ order_qt_2_EQ_sql "select * from ${tableName} where a = 222 and b = 100
and c in (110, 115, 120, 480);"
+ order_qt_2_EQ_sql "select * from ${tableName} where a = 1231 and b in
(100, 12, 1220, 7342, 999, 2642) and c = 210;"
+ order_qt_2_EQ_sql "select * from ${tableName} where a in (12, 222, 1231,
6333) and b = 1220 and c = 210;"
+ explain {
+ sql "select * from ${tableName} where a in (12, 222, 1231, 6333) and
b = 1220 and c = 210;"
+ contains "SHORT-CIRCUIT"
+ }
+
+ order_qt_3_EQ_sql "select * from ${tableName} where a = 323 and b = 49 and
c = 240;"
+ order_qt_0_EQ_sql "select * from ${tableName} where a in (123, 222, 12)
and b in (100, 12) and c in (110, 115, 120, 210);"
+ explain {
+ sql "select * from ${tableName} where a in (123, 222, 12) and b in
(100, 12) and c in (110, 115, 120, 210);"
+ contains "SHORT-CIRCUIT"
+ }
+
+ sql """DROP TABLE IF EXISTS ${tableName}"""
+}
\ No newline at end of file
diff --git a/regression-test/suites/point_query_p0/test_point_query.groovy
b/regression-test/suites/point_query_p0/test_point_query.groovy
index 99998a24ed6..ab71187e7e3 100644
--- a/regression-test/suites/point_query_p0/test_point_query.groovy
+++ b/regression-test/suites/point_query_p0/test_point_query.groovy
@@ -341,4 +341,404 @@ suite("test_point_query", "nonConcurrent") {
qt_sql "select * from table_3821461 where col1 = 10 and col2 = 20 and loc3
= 'aabc';"
sql "update table_3821461 set value = 'update value' where col1 = -10 or
col1 = 20;"
qt_sql """select * from table_3821461 where col1 = -10 and col2 = 20 and
loc3 = 'aabc'"""
+
+ tableName = "in_table_1"
+ sql """DROP TABLE IF EXISTS ${tableName}"""
+ // Case 1: Default partitioning, part of the primary key is a bucket column
+ sql """
+ create table ${tableName} (
+ a int not null,
+ b int not null,
+ c string not null
+ )
+ unique key(a, b)
+ distributed by hash(a) buckets 16
+ PROPERTIES(
+ "replication_num" = "1",
+ "store_row_column" = "true"
+ );
+ """
+ sql """
+ insert into ${tableName} values(123, 132, "a");
+ insert into ${tableName} values(123, 222, "b");
+ insert into ${tableName} values(22, 2, "c");
+ insert into ${tableName} values(1, 1, "d");
+ insert into ${tableName} values(2, 2, "e");
+ insert into ${tableName} values(3, 3, "f");
+ insert into ${tableName} values(4, 4, "i");
+ """
+ qt_case_1_sql "select * from ${tableName} where a = 123 and b = 132;"
+ explain {
+ sql("select * from ${tableName} where a = 123 and b in (132, 1, 222,
333);")
+ contains "SHORT-CIRCUIT"
+ }
+ order_qt_case_1_sql "select * from ${tableName} where a = 123 and b in
(132, 1, 222, 333);"
+ explain {
+ sql("select * from ${tableName} where a in (123, 1, 222) and b in
(132, 1, 222, 333);")
+ contains "SHORT-CIRCUIT"
+ }
+ order_qt_case_1_sql "select * from ${tableName} where a in (123, 1, 222)
and b in (132, 1, 222, 333);"
+
+ sql """DROP TABLE IF EXISTS ${tableName}"""
+
+ // Case 2: Partition columns, bucket columns, and primary keys are the same
+ tableName = "in_table_2"
+ sql """DROP TABLE IF EXISTS ${tableName}"""
+ sql """
+ create table ${tableName} (
+ a int not null,
+ b int not null,
+ c string not null
+ )
+ unique key(a, b)
+ partition by RANGE(a, b)
+ (
+ partition p0 values [(100, 100), (200, 140)),
+ partition p1 values [(200, 140), (300, 170)),
+ partition p2 values [(300, 170), (400, 250)),
+ partition p3 values [(400, 250), (420, 300)),
+ partition p4 values [(420, 300), (500, 400))
+ )
+ distributed by hash(a, b) buckets 16
+ PROPERTIES(
+ "replication_num" = "1",
+ "store_row_column" = "true"
+ );
+ """
+ sql """
+ insert into ${tableName} values(123, 120, "a");
+ insert into ${tableName} values(150, 120, "b");
+ insert into ${tableName} values(222, 150, "c");
+ insert into ${tableName} values(333, 200, "e");
+ insert into ${tableName} values(400, 260, "f");
+ insert into ${tableName} values(400, 250, "g");
+ insert into ${tableName} values(440, 350, "h");
+ insert into ${tableName} values(450, 320, "i");
+ """
+
+ qt_case_2_sql "select * from ${tableName} where a = 123 and b = 100;"
+ qt_case_2_sql "select * from ${tableName} where a = 222 and b = 150;"
+ order_qt_case_2_sql "select * from ${tableName} where a = 123 and b in
(132, 120, 222, 333);"
+ order_qt_case_2_sql "select * from ${tableName} where a = 400 and b in
(260, 250, 300);"
+ order_qt_case_2_sql "select * from ${tableName} where a in (400, 222, 100)
and b in (260, 250, 100, 150);"
+
+ sql """DROP TABLE IF EXISTS ${tableName}"""
+
+ tableName = "in_table_3"
+ sql """DROP TABLE IF EXISTS ${tableName}"""
+ // Case 3: The partition column is the same as the primary key, and the
bucket column is part of the primary key.
+ sql """
+ create table ${tableName} (
+ a int not null,
+ b int not null,
+ c string not null
+ )
+ unique key(a, b)
+ partition by RANGE(a, b)
+ (
+ partition p0 values [(100, 100), (100, 140)),
+ partition p1 values [(100, 140), (200, 140)),
+ partition p2 values [(200, 140), (300, 170)),
+ partition p3 values [(300, 170), (400, 250)),
+ partition p4 values [(400, 250), (420, 300)),
+ partition p5 values [(420, 300), (500, 400))
+ )
+ distributed by hash(a)
+ buckets 16
+ PROPERTIES(
+ "replication_num" = "1",
+ "store_row_column" = "true"
+ );
+ """
+ sql """
+ insert into ${tableName} values(100, 100, "aaa");
+ insert into ${tableName} values(100, 120, "aaaaa");
+ insert into ${tableName} values(123, 100, "a");
+ insert into ${tableName} values(150, 100, "b");
+ insert into ${tableName} values(350, 200, "c");
+ insert into ${tableName} values(400, 250, "d");
+ insert into ${tableName} values(400, 280, "e");
+ insert into ${tableName} values(450, 350, "f");
+ """
+ qt_case_3_sql "select * from ${tableName} where a = 123 and b = 100;"
+ qt_case_3_sql "select * from ${tableName} where a = 222 and b = 100;"
+ order_qt_case_3_sql "select * from ${tableName} where a = 100 and b in
(132, 100, 222, 120);"
+ order_qt_case_3_sql "select * from ${tableName} where a = 123 and b in
(132, 100, 222, 333);"
+ order_qt_case_3_sql "select * from ${tableName} where a = 400 and b in
(250, 280, 300);"
+ order_qt_case_3_sql "select * from ${tableName} where a in (123, 1, 350,
400, 420, 500, 1000) and b in (132, 100, 222, 200, 350, 250);"
+
+ sql """DROP TABLE IF EXISTS ${tableName}"""
+
+ tableName = "in_table_4"
+ sql """DROP TABLE IF EXISTS ${tableName}"""
+ // Case 4: Bucket columns and partition columns are both partial primary
keys,
+ // and there is no overlap between bucket columns and partition columns
+ sql """
+ create table ${tableName} (
+ a int not null,
+ b int not null,
+ c int not null,
+ d string not null
+ )
+ unique key(a, b, c)
+ partition by RANGE(c)
+ (
+ partition p0 values [(100), (200)),
+ partition p1 values [(200), (300)),
+ partition p2 values [(300), (400)),
+ partition p3 values [(400), (500))
+ )
+ distributed by hash(a, b)
+ buckets 16
+ PROPERTIES(
+ "replication_num" = "1",
+ "store_row_column" = "true"
+ );
+ """
+
+ sql """
+ insert into ${tableName} values(123, 100, 110, "a");
+ insert into ${tableName} values(222, 100, 115, "b");
+ insert into ${tableName} values(12, 12, 120, "c");
+ insert into ${tableName} values(1231, 1220, 210, "d");
+ insert into ${tableName} values(323, 49, 240, "e");
+ insert into ${tableName} values(843, 7342, 370, "f");
+ insert into ${tableName} values(633, 2642, 480, "g");
+ insert into ${tableName} values(6333, 2642, 480, "h");
+ """
+
+ qt_case_4_sql "select * from ${tableName} where a = 123 and b = 100 and c
= 110;"
+ qt_case_4_sql "select * from ${tableName} where a = 123 and b = 101 and c
= 110;"
+ qt_case_4_sql "select * from ${tableName} where a = 1231 and b = 1220 and
c = 210;"
+ order_qt_case_4_sql "select * from ${tableName} where a = 123 and b in
(132, 100, 222, 333) and c in (110, 115, 120);"
+ order_qt_case_4_sql "select * from ${tableName} where a in (123, 1, 222,
1231, 420, 500) and b in (132, 100, 222, 1220, 300) and c in (210, 110, 115,
210);"
+
+ sql """DROP TABLE IF EXISTS ${tableName}"""
+
+ tableName = "in_table_5"
+ sql """DROP TABLE IF EXISTS ${tableName}"""
+ // Case 5: Bucket columns and partition columns are both partial primary
keys,
+ // and bucket columns and partition columns overlap
+ sql """
+ create table ${tableName} (
+ a int not null,
+ b int not null,
+ c int not null,
+ d string not null
+ )
+ unique key(a, b, c)
+ partition by RANGE(a)
+ (
+ partition p0 values [(0), (100)),
+ partition p1 values [(100), (200)),
+ partition p2 values [(200), (300)),
+ partition p3 values [(300), (400)),
+ partition p4 values [(400), (500)),
+ partition p5 values [(500), (900)),
+ partition p6 values [(900), (1200)),
+ partition p7 values [(1200), (9000))
+ )
+ distributed by hash(a, b)
+ buckets 16
+ PROPERTIES(
+ "replication_num" = "1",
+ "store_row_column" = "true"
+ );
+ """
+
+ sql """
+ insert into ${tableName} values(123, 100, 110, "a");
+ insert into ${tableName} values(222, 100, 115, "b");
+ insert into ${tableName} values(12, 12, 120, "c");
+ insert into ${tableName} values(1231, 1220, 210, "d");
+ insert into ${tableName} values(323, 49, 240, "e");
+ insert into ${tableName} values(843, 7342, 370, "f");
+ insert into ${tableName} values(633, 2642, 480, "g");
+ insert into ${tableName} values(6333, 2642, 480, "h");
+ """
+
+ qt_case_5_sql "select * from ${tableName} where a = 123 and b = 100 and c
= 110;"
+ qt_case_5_sql "select * from ${tableName} where a = 123 and b = 101 and c
= 110;"
+ qt_case_5_sql "select * from ${tableName} where a = 1231 and b = 1220 and
c = 210;"
+ order_qt_case_5_sql "select * from ${tableName} where a = 123 and b in
(132, 100, 222, 333) and c in (110, 115, 120);"
+ order_qt_case_5_sql "select * from ${tableName} where a in (123, 12, 222,
1231, 420, 500, 6333, 633, 843) and b in (132, 100, 222, 1220, 300, 2642) and c
in (210, 110, 115, 210, 480);"
+
+ sql """DROP TABLE IF EXISTS ${tableName}"""
+
+ tableName = "in_table_6"
+ sql """DROP TABLE IF EXISTS ${tableName}"""
+ // Case 6: Default partitioning, primary keys are all bucket columns
+ sql """
+ create table ${tableName} (
+ a int not null,
+ b int not null,
+ c string not null
+ )
+ unique key(a, b)
+ distributed by hash(a, b) buckets 16
+ PROPERTIES(
+ "replication_num" = "1",
+ "store_row_column" = "true"
+ );
+ """
+ sql """
+ insert into ${tableName} values(123, 132, "a");
+ insert into ${tableName} values(123, 222, "b");
+ insert into ${tableName} values(22, 2, "c");
+ insert into ${tableName} values(1, 1, "d");
+ insert into ${tableName} values(2, 2, "e");
+ insert into ${tableName} values(3, 3, "f");
+ insert into ${tableName} values(4, 4, "i");
+ """
+ qt_case_6_sql "select * from ${tableName} where a = 123 and b = 132;"
+ order_qt_case_6_sql "select * from ${tableName} where a = 123 and b in
(132, 1, 222, 333);"
+ order_qt_case_6_sql "select * from ${tableName} where a in (123, 1, 222,
2, 3, 4) and b in (132, 1, 222, 333, 2, 3, 4);"
+
+ sql """DROP TABLE IF EXISTS ${tableName}"""
+
+ tableName = "in_table_7"
+ sql """DROP TABLE IF EXISTS ${tableName}"""
+ // Case 7: Partition and bucket columns are the same, but only part of the
primary key
+ sql """
+ create table ${tableName} (
+ a int not null,
+ b int not null,
+ c int not null,
+ d string not null
+ )
+ unique key(a, b, c)
+ partition by RANGE(a)
+ (
+ partition p0 values [(0), (100)),
+ partition p1 values [(100), (200)),
+ partition p2 values [(200), (300)),
+ partition p3 values [(300), (400)),
+ partition p4 values [(400), (500)),
+ partition p5 values [(500), (900)),
+ partition p6 values [(900), (1200)),
+ partition p7 values [(1200), (9000))
+ )
+ distributed by hash(a)
+ buckets 16
+ PROPERTIES(
+ "replication_num" = "1",
+ "store_row_column" = "true"
+ );
+ """
+
+ sql """
+ insert into ${tableName} values(123, 100, 110, "a");
+ insert into ${tableName} values(222, 100, 115, "b");
+ insert into ${tableName} values(12, 12, 120, "c");
+ insert into ${tableName} values(1231, 1220, 210, "d");
+ insert into ${tableName} values(323, 49, 240, "e");
+ insert into ${tableName} values(843, 7342, 370, "f");
+ insert into ${tableName} values(633, 2642, 480, "g");
+ insert into ${tableName} values(6333, 2642, 480, "h");
+ """
+
+ qt_case_7_sql "select * from ${tableName} where a = 123 and b = 100 and c
= 110;"
+ qt_case_7_sql "select * from ${tableName} where a = 123 and b = 101 and c
= 110;"
+ qt_case_7_sql "select * from ${tableName} where a = 1231 and b = 1220 and
c = 210;"
+ order_qt_case_7_sql "select * from ${tableName} where a = 123 and b in
(132, 100, 222, 333) and c in (110, 115, 120);"
+ order_qt_case_7_sql "select * from ${tableName} where a in (123, 12, 222,
1231, 420, 500, 6333, 633, 843) and b in (132, 100, 222, 1220, 300, 2642) and c
in (210, 110, 115, 210, 480);"
+
+ sql """DROP TABLE IF EXISTS ${tableName}"""
+
+ tableName = "in_table_8"
+ sql """DROP TABLE IF EXISTS ${tableName}"""
+ // Case 8: The bucket column is the same as the primary key, and the
partition column is part of the primary key.
+ sql """
+ create table ${tableName} (
+ a int not null,
+ b int not null,
+ c string not null
+ )
+ unique key(a, b)
+ partition by RANGE(a)
+ (
+ partition p0 values [(100), (200)),
+ partition p1 values [(200), (300)),
+ partition p2 values [(300), (400)),
+ partition p3 values [(400), (420)),
+ partition p4 values [(420), (500))
+ )
+ distributed by hash(a, b)
+ buckets 16
+ PROPERTIES(
+ "replication_num" = "1",
+ "store_row_column" = "true"
+ );
+ """
+ sql """
+ insert into ${tableName} values(123, 100, "a");
+ insert into ${tableName} values(150, 100, "b");
+ insert into ${tableName} values(350, 200, "c");
+ insert into ${tableName} values(400, 250, "d");
+ insert into ${tableName} values(400, 280, "e");
+ insert into ${tableName} values(450, 350, "f");
+ """
+ qt_case_8_sql "select * from ${tableName} where a = 123 and b = 100;"
+ qt_case_8_sql "select * from ${tableName} where a = 222 and b = 100;"
+ order_qt_case_8_sql "select * from ${tableName} where a = 123 and b in
(132, 100, 222, 333);"
+ order_qt_case_8_sql "select * from ${tableName} where a = 400 and b in
(250, 280, 300);"
+ order_qt_case_8_sql "select * from ${tableName} where a in (123, 1, 350,
400, 420, 500, 1000) and b in (132, 100, 222, 200, 350, 250);"
+
+ sql """DROP TABLE IF EXISTS ${tableName}"""
+
+ tableName = "in_table_9"
+ sql """DROP TABLE IF EXISTS ${tableName}"""
+ // Case 9: Partition leftmost match
+ sql """
+ create table ${tableName} (
+ a int not null,
+ b int not null,
+ c int not null,
+ d string not null
+ )
+ unique key(a, b, c)
+ partition by RANGE(a, b, c)
+ (
+ partition p values [(1, 1, 1), (100, 100, 100)),
+ partition p0 values [(100, 100, 100), (200, 210, 220)),
+ partition p1 values [(200, 210, 220), (300, 250, 290)),
+ partition p2 values [(300, 250, 290), (350, 290, 310)),
+ partition p3 values [(350, 290, 310), (400, 350, 390)),
+ partition p4 values [(400, 350, 390), (800, 400, 450)),
+ partition p5 values [(800, 400, 450), (2000, 500, 500)),
+ partition p6 values [(2000, 500, 500), (5000, 600, 600)),
+ partition p7 values [(5000, 600, 600), (9999, 9999, 9999))
+ )
+ distributed by hash(a, c)
+ buckets 16
+ PROPERTIES(
+ "replication_num" = "1",
+ "store_row_column" = "true"
+ );
+ """
+ sql """
+ insert into ${tableName} values(123, 100, 110, "zxcd");
+ insert into ${tableName} values(222, 100, 115, "zxc");
+ insert into ${tableName} values(12, 12, 120, "zxc");
+ insert into ${tableName} values(1231, 1220, 210, "zxc");
+ insert into ${tableName} values(323, 49, 240, "zxc");
+ insert into ${tableName} values(843, 7342, 370, "zxcde");
+ insert into ${tableName} values(633, 2642, 480, "zxc");
+ insert into ${tableName} values(6333, 2642, 480, "zxc");
+ """
+
+ order_qt_case_9_sql "select * from ${tableName} where a=123 and b=100 and
c=110;"
+ order_qt_case_9_sql "select * from ${tableName} where a=222 and b=100 and
c=115;"
+ order_qt_case_9_sql "select * from ${tableName} where a=323 and b=49 and
c=240;"
+ order_qt_case_9_sql "select * from ${tableName} where b=100 and a=123 and
c=110;"
+ order_qt_case_9_sql "select * from ${tableName} where a=1231 and b=1220
and c=210;"
+ order_qt_case_9_sql "select * from ${tableName} where a=6333 and b=2642
and c=480;"
+ order_qt_case_9_sql "select * from ${tableName} where a=633 and b=2642 and
c=480;"
+ order_qt_case_9_sql "select * from ${tableName} where a=123 and b in
(132,100,222,333) and c in (110, 115, 120);"
+ order_qt_case_9_sql "select * from ${tableName} where a in (222,1231) and
b in (100,1220,2642) and c in (115,210,480);"
+ order_qt_case_9_sql "select * from ${tableName} where a in (123,222,12)
and b in (100,12) and c in (110,115,120,210);"
+ order_qt_case_9_sql "select * from ${tableName} where a=1231 and b in
(20490,1220,300) and c = 210;"
+ order_qt_case_9_sql "select * from ${tableName} where a in (123,1,222,
400,420, 500) and b in (132,100,222, 200,300) and c in (110,115,210);"
+ order_qt_case_9_sql "select * from ${tableName} where a in (123,1,222,
1231,420, 500) and b in (132,100,222, 1220,300) and c in (210,110,115,210);"
+ sql """DROP TABLE IF EXISTS ${tableName}"""
}
\ No newline at end of file
diff --git a/regression-test/suites/point_query_p0/test_point_query_ck.groovy
b/regression-test/suites/point_query_p0/test_point_query_ck.groovy
index f7c53c7207e..189c0f22d1a 100644
--- a/regression-test/suites/point_query_p0/test_point_query_ck.groovy
+++ b/regression-test/suites/point_query_p0/test_point_query_ck.groovy
@@ -182,6 +182,32 @@ suite("test_point_query_ck") {
stmt.setString(3, "dd")
qe_point_select stmt
+ // IN query
+ def stmt_in = prepareStatement "SELECT /*+
SET_VAR(enable_nereids_planner=true) */ * FROM ${tableName} WHERE k1 IN (?, ?,
?, ?) AND k2 IN (?, ?, ?) AND k3 IN (?, ?, ?, ?)"
+ assertEquals(stmt_in.class,
com.mysql.cj.jdbc.ServerPreparedStatement);
+ stmt_in.setInt(1, 1231)
+ stmt_in.setInt(2, 1237)
+ stmt_in.setInt(3, 251)
+ stmt_in.setInt(4, 252)
+ stmt_in.setBigDecimal(5, new BigDecimal("119291.11"))
+ stmt_in.setBigDecimal(6, new BigDecimal("120939.11130"))
+ stmt_in.setBigDecimal(7, new BigDecimal("12222.99121135"))
+ stmt_in.setString(8, "ddd")
+ stmt_in.setString(9, 'xxx')
+ stmt_in.setString(10, generateString(251))
+ stmt_in.setString(11, generateString(252))
+ order_qe_point_in_select stmt_in
+ stmt_in.close()
+
+ stmt_in = prepareStatement "SELECT /*+
SET_VAR(enable_nereids_planner=true) */ * FROM ${tableName} WHERE k1 = ? AND k2
IN (?, ?) AND k3 IN (?, ?)"
+ assertEquals(stmt_in.class,
com.mysql.cj.jdbc.ServerPreparedStatement);
+ stmt_in.setInt(1, 1235)
+ stmt_in.setBigDecimal(2, new BigDecimal("991129292901.11138"))
+ stmt_in.setBigDecimal(3, new BigDecimal("120939.11130"))
+ stmt_in.setString(4, "dd")
+ stmt_in.setString(5, "a ddd")
+ order_qe_point_in_select stmt_in
+
def stmt_fn = prepareStatement "select /*+
SET_VAR(enable_nereids_planner=true) */ hex(k3), hex(k4) from ${tableName}
where k1 = ? and k2 =? and k3 = ?"
assertEquals(stmt_fn.class,
com.mysql.cj.jdbc.ServerPreparedStatement);
stmt_fn.setInt(1, 1231)
@@ -191,6 +217,19 @@ suite("test_point_query_ck") {
qe_point_select stmt_fn
qe_point_select stmt_fn
+ // IN query
+ def stmt_fn_in = prepareStatement "SELECT /*+
SET_VAR(enable_nereids_planner=true) */ hex(k3), hex(k4) FROM ${tableName}
WHERE k1 IN (?, ?) AND k2 IN (?, ?) AND k3 IN (?, ?)"
+ assertEquals(stmt_fn_in.class,
com.mysql.cj.jdbc.ServerPreparedStatement);
+ stmt_fn_in.setInt(1, 1235)
+ stmt_fn_in.setInt(2, 1231)
+ stmt_fn_in.setBigDecimal(3, new BigDecimal("119291.11"))
+ stmt_fn_in.setBigDecimal(4, new
BigDecimal("991129292901.11138"))
+ stmt_fn_in.setString(5, "dd")
+ stmt_fn_in.setString(6, "ddd")
+ order_qe_point_in_select stmt_fn_in
+ order_qe_point_in_select stmt_fn_in
+ order_qe_point_in_select stmt_fn_in
+
nprep_sql """
ALTER table ${tableName} ADD COLUMN new_column0 INT default
"0";
"""
@@ -201,34 +240,53 @@ suite("test_point_query_ck") {
stmt.setString(3, "a ddd")
qe_point_select stmt
qe_point_select stmt
+ order_qe_point_in_select stmt_in
+ order_qe_point_in_select stmt_in
// invalidate cache
// sql "sync"
nprep_sql """ INSERT INTO ${tableName} VALUES(1235,
120939.11130, "a ddd", "xxxxxx", "2030-01-02", "2020-01-01 12:36:38",
22.822, "7022-01-01 11:30:38", 0, 1929111.1111,[119291.19291], ["111", "222",
"333"], 2) """
qe_point_select stmt
qe_point_select stmt
qe_point_select stmt
+ order_qe_point_in_select stmt_in
+ order_qe_point_in_select stmt_in
+ order_qe_point_in_select stmt_in
nprep_sql """
- ALTER table ${tableName} ADD COLUMN new_column1 INT default
"0";
+ ALTER table ${tableName} ADD COLUMN new_column1 INT
default "0";
"""
qe_point_select stmt
qe_point_select stmt
+ order_qe_point_in_select stmt_in
+ order_qe_point_in_select stmt_in
nprep_sql """
- ALTER table ${tableName} DROP COLUMN new_column1;
+ ALTER table ${tableName} DROP COLUMN new_column1;
"""
qe_point_select stmt
qe_point_select stmt
-
- nprep_sql """
- ALTER table ${tableName} ADD COLUMN new_column1 INT default
"0";
+ order_qe_point_in_select stmt_in
+ order_qe_point_in_select stmt_in
+ sql """
+ ALTER table ${tableName} ADD COLUMN new_column1 INT
default "0";
"""
- sql "select 1"
- qe_point_select stmt
+ qe_point_select stmt
+ order_qe_point_in_select stmt_in
}
+
// disable useServerPrepStmts
def result2 = connect(user, password, context.config.jdbcUrl) {
qt_sql """select /*+ SET_VAR(enable_nereids_planner=true) */ *
from ${tableName} where k1 = 1231 and k2 = 119291.11 and k3 = 'ddd'"""
qt_sql """select /*+ SET_VAR(enable_nereids_planner=true) */ *
from ${tableName} where k1 = 1237 and k2 = 120939.11130 and k3 = 'a ddd'"""
qt_sql """select /*+ SET_VAR(enable_nereids_planner=true) */
hex(k3), hex(k4), k7 + 10.1 from ${tableName} where k1 = 1237 and k2 =
120939.11130 and k3 = 'a ddd'"""
+ order_qt_in_sql """
+ SELECT
+ /*+ SET_VAR(enable_nereids_planner=true) */ *
+ FROM
+ ${tableName}
+ WHERE
+ k1 IN (1231, 1237) AND
+ k2 IN (119291.11, 120939.11130) AND
+ k3 IN ('ddd', 'a ddd')
+ """
// prepared text
// sql """ prepare stmt1 from select * from ${tableName}
where k1 = % and k2 = % and k3 = % """
// qt_sql """execute stmt1 using (1231, 119291.11, 'ddd')"""
@@ -257,8 +315,14 @@ suite("test_point_query_ck") {
"enable_unique_key_merge_on_write" = "true",
"disable_auto_compaction" = "false"
);"""
- sql """insert into ${tableName} values (0, "1", "2", "3")"""
+ sql """
+ insert into ${tableName} values (0, "1", "2", "3");
+ insert into ${tableName} values (1, "2", "3", "4");
+ insert into ${tableName} values (2, "3", "4", "5");
+ insert into ${tableName} values (3, "4", "5", "6");
+ """
qt_sql """select /*+ SET_VAR(enable_nereids_planner=true) */ *
from ${tableName} where customer_key = 0"""
+ order_qt_in_sql """select /*+
SET_VAR(enable_nereids_planner=true) */ * from ${tableName} where customer_key
in (0, 1, 2, 3, 4, 5, 6)"""
}
}
tableName = "test_ODS_EBA_LLREPORT_ck"
diff --git
a/regression-test/suites/point_query_p0/test_point_query_partition.groovy
b/regression-test/suites/point_query_p0/test_point_query_partition.groovy
index 848729b4423..a98ccff0393 100644
--- a/regression-test/suites/point_query_p0/test_point_query_partition.groovy
+++ b/regression-test/suites/point_query_p0/test_point_query_partition.groovy
@@ -114,6 +114,24 @@ suite("test_point_query_partition") {
qe_point_select stmt
}
+ // IN query
+ def result2 = connect(user=user, password=password, url=prepare_url) {
+ def stmt = prepareStatement "SELECT * FROM ${tableName} WHERE k1 IN
(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
+ assertEquals(stmt.class, com.mysql.cj.jdbc.ServerPreparedStatement);
+ stmt.setInt(1, 1)
+ stmt.setInt(2, 2)
+ stmt.setInt(3, 11)
+ stmt.setInt(4, -1)
+ stmt.setInt(5, 12)
+ stmt.setInt(6, 34)
+ stmt.setInt(7, 33)
+ stmt.setInt(8, 45)
+ stmt.setInt(9, 666)
+ stmt.setInt(10, 999)
+ stmt.setInt(11, 1000)
+ order_qe_point_in_select stmt
+ }
+
sql "DROP TABLE IF EXISTS regression_test_serving_p0.customer";
sql """
CREATE TABLE regression_test_serving_p0.customer (
@@ -135,8 +153,8 @@ suite("test_point_query_partition") {
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"store_row_column" = "true"
- );
- """
+ );
+ """
sql """insert into regression_test_serving_p0.customer(customer_key,
customer_value_0, customer_value_1) values(686612, "686612", "686612")"""
sql """insert into regression_test_serving_p0.customer(customer_key,
customer_value_0, customer_value_1) values(686613, "686613", "686613")"""
def result3 = connect(user, password, prepare_url) {
@@ -149,4 +167,15 @@ suite("test_point_query_partition") {
qe_point_selectmmm stmt
qe_point_selecteee stmt
}
+
+ // IN query
+ def result4 = connect(user=user, password=password, url=prepare_url) {
+ def stmt = prepareStatement "SELECT /*+
SET_VAR(enable_nereids_planner=true) */ * FROM
regression_test_serving_p0.customer WHERE customer_key IN (?, ?)"
+ stmt.setInt(1, 686612)
+ stmt.setInt(2, 686613)
+ order_qe_point_in_selectxxx stmt
+ order_qe_point_in_selectyyy stmt
+ order_qe_point_in_selectmmm stmt
+ order_qe_point_in_selecteee stmt
+ }
}
\ No newline at end of file
diff --git a/regression-test/suites/point_query_p0/test_rowstore.groovy
b/regression-test/suites/point_query_p0/test_rowstore.groovy
index 13279e3ce87..8d943f2cee8 100644
--- a/regression-test/suites/point_query_p0/test_rowstore.groovy
+++ b/regression-test/suites/point_query_p0/test_rowstore.groovy
@@ -204,14 +204,26 @@ suite("test_rowstore", "p0,nonConcurrent") {
assertEquals(stmt.class,
com.mysql.cj.jdbc.ServerPreparedStatement);
qe_point_select stmt
}
+ def prep_in_sql = { sql_str, in_list ->
+ def stmt = prepareStatement sql_str
+ for (int i = 0; i < in_list.size(); i++) {
+ stmt.setInt(i + 1, in_list[i])
+ }
+ assertEquals(stmt.class,
com.mysql.cj.jdbc.ServerPreparedStatement);
+ order_qe_point_in_select stmt
+ }
def sql_str = "select v1, v2 from table_with_column_group where k1 = ?"
+ def sql_in_str = "select v1, v2 from table_with_column_group where k1
in (?, ?, ?, ?)"
prep_sql sql_str, 1
prep_sql sql_str, 2
prep_sql sql_str, 3
+ prep_in_sql sql_in_str, [1, 2, 3, 10]
sql_str = "select v2 from table_with_column_group where k1 = ?"
+ sql_in_str = "select v2 from table_with_column_group where k1 in (?,
?, ?, ?)"
prep_sql sql_str, 1
prep_sql sql_str, 2
prep_sql sql_str, 3
+ prep_in_sql sql_in_str, [1, 2, 3, 10]
sql_str = "select v1 from table_with_column_group where k1 = ?"
prep_sql sql_str, 3
sql_str = "select v2, v1 from table_with_column_group where k1 = ?"
@@ -222,14 +234,18 @@ suite("test_rowstore", "p0,nonConcurrent") {
prep_sql sql_str, 1
sql_str = "select v2 from table_with_column_group2 where k1 = ?"
+ sql_in_str = "select v2 from table_with_column_group2 where k1 in (?,
?, ?, ?)"
prep_sql sql_str, 1
prep_sql sql_str, 2
prep_sql sql_str, 3
+ prep_in_sql sql_in_str, [1, 2, 3, 10]
sql_str = "select v4 from table_with_column_group3 where k1 = ?"
+ sql_in_str = "select v4 from table_with_column_group3 where k1 in (?,
?, ?, ?)"
prep_sql sql_str, 1
prep_sql sql_str, 2
prep_sql sql_str, 3
+ prep_in_sql sql_in_str, [1, 2, 3, 10]
def setPrepareStmtArgs = {stmt, user_id, date, datev2, datetimev2_1,
datetimev2_2, city, age, sex ->
java.text.SimpleDateFormat formater = new
java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SS");
@@ -242,6 +258,39 @@ suite("test_rowstore", "p0,nonConcurrent") {
stmt.setInt(7, age)
stmt.setInt(8, sex)
}
+ def setInPrepareStmtArgs = {stmt, user_id_list, date_list, datev2_list,
+ datetimev2_1_list, datetimev2_2_list,
city_list, age_list, sex_list ->
+ java.text.SimpleDateFormat formater = new
java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SS");
+ def total_size = user_id_list.size() + date_list.size() +
datev2_list.size() +
+ datetimev2_1_list.size() +
datetimev2_2_list.size() + city_list.size() +
+ age_list.size() + sex_list.size()
+ def idx = 0
+ for (int i = 0; i < user_id_list.size(); i++) {
+ stmt.setInt(++idx, user_id_list[i])
+ }
+ for (int i = 0; i < date_list.size(); i++) {
+ stmt.setDate(++idx, java.sql.Date.valueOf(date_list[i]))
+ }
+ for (int i = 0; i < datev2_list.size(); i++) {
+ stmt.setDate(++idx, java.sql.Date.valueOf(datev2_list[i]))
+ }
+ for (int i = 0; i < datetimev2_1_list.size(); i++) {
+ stmt.setTimestamp(++idx, new
java.sql.Timestamp(formater.parse(datetimev2_1_list[i]).getTime()))
+ }
+ for (int i = 0; i < datetimev2_2_list.size(); i++) {
+ stmt.setTimestamp(++idx, new
java.sql.Timestamp(formater.parse(datetimev2_2_list[i]).getTime()))
+ }
+ for (int i = 0; i < city_list.size(); i++) {
+ stmt.setString(++idx, city_list[i])
+ }
+ for (int i = 0; i < age_list.size(); i++) {
+ stmt.setInt(++idx, age_list[i])
+ }
+ for (int i = 0; i < sex_list.size(); i++) {
+ stmt.setInt(++idx, sex_list[i])
+ }
+ assertEquals(idx, total_size)
+ }
def stmt = prepareStatement """ SELECT
datetimev2_1,datetime_val1,datetime_val2,max_dwell_time FROM
table_with_column_group_xxx t where user_id = ? and date = ? and datev2 = ? and
datetimev2_1 = ? and datetimev2_2 = ? and city = ? and age = ? and sex = ?; """
setPrepareStmtArgs stmt, 1, '2017-10-01', '2017-10-01', '2017-10-01
11:11:11.21', '2017-10-01 11:11:11.11', 'Beijing', 10, 1
@@ -260,6 +309,27 @@ suite("test_rowstore", "p0,nonConcurrent") {
qe_point_select stmt
setPrepareStmtArgs stmt, 4, '2017-10-01', '2017-10-01', '2017-10-01
11:11:11.28', '2017-10-01 11:11:11.18', 'Beijing', 10, 1
qe_point_select stmt
+
+ def in_stmt = prepareStatement """
+ SELECT
+ datetimev2_1,datetime_val1,datetime_val2,max_dwell_time
+ FROM
+ table_with_column_group_xxx t
+ WHERE
+ user_id IN (?, ?, ?, ?, ?) AND
+ date IN (?, ?) AND
+ datev2 IN (?, ?) AND
+ datetimev2_1 IN (?, ?, ?, ?, ?, ?, ?, ?) AND
+ datetimev2_2 IN (?, ?, ?, ?, ?, ?, ?, ?) AND
+ city IN (?, ?) AND
+ age IN (?, ?, ?, ?, ?) AND
+ sex IN (?, ?);
+ """
+ setInPrepareStmtArgs in_stmt , [1, 2, 3, 4, 5], ['2017-10-01',
'2017-10-02'], ['2017-10-01', '2017-10-03'],
+ ['2017-10-01 11:11:11.21', '2017-10-01 11:11:11.22', '2017-10-01
11:11:11.23', '2017-10-01 11:11:11.24', '2017-10-01 11:11:11.25', '2017-10-01
11:11:11.26', '2017-10-01 11:11:11.27', '2017-10-01 11:11:11.28'],
+ ['2017-10-01 11:11:11.11', '2017-10-01 11:11:11.12', '2017-10-01
11:11:11.13', '2017-10-01 11:11:11.14', '2017-10-01 11:11:11.15', '2017-10-01
11:11:11.16', '2017-10-01 11:11:11.17', '2017-10-01 11:11:11.18'],
+ ['Beijing', 'Shanghai'], [10, 11, 12, 13, 14], [0, 1]
+ order_qe_point_in_select in_stmt
}
sql "DROP TABLE IF EXISTS table_with_column_group4"
diff --git a/regression-test/suites/point_query_p0/test_rowstore_query.groovy
b/regression-test/suites/point_query_p0/test_rowstore_query.groovy
index db5f74f3f61..bd4d51ab29b 100644
--- a/regression-test/suites/point_query_p0/test_rowstore_query.groovy
+++ b/regression-test/suites/point_query_p0/test_rowstore_query.groovy
@@ -39,5 +39,6 @@ suite("test_rowstore", "p0") {
sql """insert into ${tableName} values (1, 'abc', 1111919.12345678919)"""
qt_sql """select * from ${tableName}"""
sql """insert into ${tableName} values (2, 'def', 1111919.12345678919)"""
- qt_sql """select * from ${tableName} where k1 = 2"""
+ qt_point_sql """select * from ${tableName} where k1 = 2"""
+ qt_point_in_sql """select * from ${tableName} where k1 in (1, 2, 3, 4,
5)"""
}
\ No newline at end of file
diff --git
a/regression-test/suites/prepared_stmt_p0/prepared_stmt_in_list.groovy
b/regression-test/suites/prepared_stmt_p0/prepared_stmt_in_list.groovy
index 54ec1efa4b3..e6c161a1a56 100644
--- a/regression-test/suites/prepared_stmt_p0/prepared_stmt_in_list.groovy
+++ b/regression-test/suites/prepared_stmt_p0/prepared_stmt_in_list.groovy
@@ -190,5 +190,44 @@ suite("test_prepared_stmt_in_list", "nonConcurrent") {
stmt_read10.setString(4, '5022-01-01 11:30:38')
stmt_read10.setString(5, '6022-01-01 11:30:38')
qe_stmt_read10_2 stmt_read10
+
+ table_name = "tbl_prepared_stmt_in_list2"
+ sql """DROP TABLE IF EXISTS ${tableName} """
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName} (
+ `k1` tinyint NULL COMMENT "",
+ `k2` smallint NULL COMMENT "",
+ `k3` int NULL COMMENT ""
+ ) ENGINE=OLAP
+ UNIQUE KEY(`k1`, `k2`)
+ DISTRIBUTED BY HASH(`k1`, `k2`) BUCKETS 16
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1",
+ "light_schema_change" = "true",
+ "storage_format" = "V2",
+ "store_row_column" = "true"
+ )
+ """
+
+ sql """ INSERT INTO ${tableName} VALUES(1, 1300, 55356821) """
+ sql """ INSERT INTO ${tableName} VALUES(2, 1301, 56052706) """
+ sql """ INSERT INTO ${tableName} VALUES(3, 1302, 55702967) """
+ sql """ INSERT INTO ${tableName} VALUES(4, 1303, 56054326) """
+ sql """ INSERT INTO ${tableName} VALUES(5, 1304, 36548425) """
+ sql """ INSERT INTO ${tableName} VALUES(6, 1305, 56054803) """
+ sql """ INSERT INTO ${tableName} VALUES(7, 1306, 56055112) """
+ sql """sync"""
+
+ def stmt_read11 = prepareStatement "select * from ${tableName} WHERE
`k1` IN (?, ?, ?, ?, ?) AND `k2` IN (?, ?, ?, ?)"
+ stmt_read11.setByte(1, (byte) 1)
+ stmt_read11.setByte(2, (byte) 2)
+ stmt_read11.setByte(3, (byte) 3)
+ stmt_read11.setByte(4, (byte) 4)
+ stmt_read11.setByte(5, (byte) 5)
+ stmt_read11.setShort(6, (short) 1300)
+ stmt_read11.setShort(7, (short) 1301)
+ stmt_read11.setShort(8, (short) 1302)
+ stmt_read11.setShort(9, (short) 1303)
+ order_qe_stmt_read11_1 stmt_read11
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]