This is an automated email from the ASF dual-hosted git repository.
eldenmoon pushed a commit to branch variant
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/variant by this push:
new a0c28dc7d6 [improve](variant)Desc variant (#23111)
a0c28dc7d6 is described below
commit a0c28dc7d61711e23fa7fa264c009f430b307aa2
Author: Chenyang Sun <[email protected]>
AuthorDate: Thu Aug 17 17:32:59 2023 +0800
[improve](variant)Desc variant (#23111)
Support DESC on tables with variant columns by fetching the merged tablet schema from the backends.
---------
Co-authored-by: eldenmoon <[email protected]>
---
be/src/olap/rowset/segment_v2/column_reader.h | 5 +-
be/src/olap/rowset/segment_v2/segment.cpp | 6 +-
be/src/service/internal_service.cpp | 106 +++++
be/src/service/internal_service.h | 5 +
fe/fe-core/src/main/cup/sql_parser.cup | 4 +-
.../org/apache/doris/analysis/DescribeStmt.java | 28 +-
.../java/org/apache/doris/catalog/OlapTable.java | 19 +
.../apache/doris/common/proc/IndexInfoProcDir.java | 3 +
.../doris/common/proc/IndexSchemaProcNode.java | 8 +-
.../common/proc/RemoteIndexSchemaProcDir.java | 113 ++++++
.../common/proc/RemoteIndexSchemaProcNode.java | 72 ++++
.../common/util/FetchRemoteTabletSchemaUtil.java | 320 ++++++++++++++++
.../org/apache/doris/rpc/BackendServiceClient.java | 5 +
.../org/apache/doris/rpc/BackendServiceProxy.java | 12 +
gensrc/proto/internal_service.proto | 18 +
regression-test/data/variant_p0/desc.out | 101 +++++
regression-test/suites/variant_p0/desc.groovy | 174 +++++++++
regression-test/suites/variant_p0/load.groovy | 426 ++++++++++-----------
18 files changed, 1200 insertions(+), 225 deletions(-)
diff --git a/be/src/olap/rowset/segment_v2/column_reader.h
b/be/src/olap/rowset/segment_v2/column_reader.h
index 1d21c2ac91..c27daa43ec 100644
--- a/be/src/olap/rowset/segment_v2/column_reader.h
+++ b/be/src/olap/rowset/segment_v2/column_reader.h
@@ -493,8 +493,8 @@ private:
// Cache the sub column iterators and columns to reduce data read amplification
class CachedStreamIterator : public ColumnIterator {
public:
- CachedStreamIterator(SubstreamCache* stream_cache, const
vectorized::PathInData& path)
- : _stream_cache(stream_cache), _path(path) {}
+    // Single-argument constructor: explicit so a PathInData never converts implicitly into an
+    // iterator.
+    explicit CachedStreamIterator(const vectorized::PathInData& path) : _path(path) {}
~CachedStreamIterator() override = default;
@@ -527,7 +527,6 @@ private:
_rows_read += nrows;
return Status::OK();
}
- SubstreamCache* _stream_cache;
vectorized::PathInData _path;
size_t _rows_read = 0;
// could duplicate with nodes under _path, since node_func is idempotent
diff --git a/be/src/olap/rowset/segment_v2/segment.cpp
b/be/src/olap/rowset/segment_v2/segment.cpp
index eef524bfa3..3aa0ce2e65 100644
--- a/be/src/olap/rowset/segment_v2/segment.cpp
+++ b/be/src/olap/rowset/segment_v2/segment.cpp
@@ -416,12 +416,12 @@ Status Segment::new_iterator_with_path(const
TabletColumn& tablet_column,
if (node != nullptr && node->is_scalar() && node->children.empty()) {
// Direct read extracted columns
const auto* node =
_sub_column_tree.find_leaf(tablet_column.path_info());
- auto cache_iter = new CachedStreamIterator(stream_cache,
tablet_column.path_info());
+ auto cache_iter = new CachedStreamIterator(tablet_column.path_info());
RETURN_IF_ERROR(add_stream(cache_iter, node));
iter->reset(cache_iter);
} else if (node != nullptr && !node->children.empty()) {
// None leave node need merge with root
- auto* stream_iter = new CachedStreamIterator(stream_cache,
tablet_column.path_info());
+ auto* stream_iter = new
CachedStreamIterator(tablet_column.path_info());
std::vector<const SubcolumnColumnReaders::Node*> leaves;
vectorized::PathsInData leaves_paths;
SubcolumnColumnReaders::get_leaves_of_node(node, leaves, leaves_paths);
@@ -446,7 +446,7 @@ Status Segment::new_iterator_with_path(const TabletColumn&
tablet_column,
RETURN_IF_ERROR(new_default_iterator(tablet_column, iter));
return Status::OK();
}
- auto cache_iter = new CachedStreamIterator(stream_cache,
tablet_column.path_info());
+ auto cache_iter = new CachedStreamIterator(tablet_column.path_info());
RETURN_IF_ERROR(add_stream(cache_iter, node));
iter->reset(cache_iter);
}
diff --git a/be/src/service/internal_service.cpp
b/be/src/service/internal_service.cpp
index 610c45aa4d..ac692472fa 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -25,6 +25,7 @@
#include <butil/errno.h>
#include <butil/iobuf.h>
#include <fcntl.h>
+#include <gen_cpp/MasterService_types.h>
#include <gen_cpp/PaloInternalService_types.h>
#include <gen_cpp/PlanNodes_types.h>
#include <gen_cpp/Status_types.h>
@@ -93,6 +94,7 @@
#include "runtime/stream_load/stream_load_context.h"
#include "runtime/thread_context.h"
#include "runtime/types.h"
+#include "service/backend_options.h"
#include "service/point_query_executor.h"
#include "util/async_io.h"
#include "util/brpc_client_cache.h"
@@ -112,6 +114,7 @@
#include "util/uid_util.h"
#include "vec/columns/column.h"
#include "vec/columns/column_string.h"
+#include "vec/common/schema_util.h"
#include "vec/core/block.h"
#include "vec/core/column_with_type_and_name.h"
#include "vec/data_types/data_type.h"
@@ -720,6 +723,109 @@ void PInternalServiceImpl::_get_column_ids_by_tablet_ids(
response->mutable_status()->set_status_code(TStatusCode::OK);
}
+// Per-call state of one asynchronous brpc request. brpc writes into both members until the call
+// is joined, so a context must outlive brpc::Join on its call id.
+template <class RPCResponse>
+struct AsyncRPCContext {
+    RPCResponse response;
+    brpc::Controller cntl;
+};
+
+// Returns the merged tablet schema used by DESC on tables with variant columns.
+//
+// Coordinator (request->is_coordinator()): forwards the request, with is_coordinator cleared, to
+// every backend in request->tablet_location(), joins all of those calls and merges the schemas
+// they return. Non-coordinator: merges the schemas of the tablets listed for this backend only.
+// The first error seen is reported in response->status; the coordinator fills merged_schema
+// only when every peer succeeded.
+// NOTE(review): the coordinator blocks a _heavy_work_pool thread while waiting for its peers, one
+// of which may be this backend itself; confirm pool size and rpc timeouts make this safe.
+void PInternalServiceImpl::fetch_remote_tablet_schema(google::protobuf::RpcController* controller,
+                                                      const PFetchRemoteSchemaRequest* request,
+                                                      PFetchRemoteSchemaResponse* response,
+                                                      google::protobuf::Closure* done) {
+    bool ret = _heavy_work_pool.try_offer([request, response, done]() {
+        auto merge_schema =
+                [](const std::vector<TabletSchemaSPtr>& input_schema) -> TabletSchemaSPtr {
+            TabletSchemaSPtr merged_schema = std::make_shared<TabletSchema>();
+            vectorized::schema_util::get_least_common_schema(input_schema, merged_schema);
+            return merged_schema;
+        };
+
+        brpc::ClosureGuard closure_guard(done);
+        Status st = Status::OK();
+        if (request->is_coordinator()) {
+            // Fan the request out to the non-coordinator nodes, then merge their results.
+            PFetchRemoteSchemaRequest remote_request(*request);
+            // Receivers must only merge their own tablets instead of fanning out again.
+            remote_request.set_is_coordinator(false);
+            using PFetchRemoteTabletSchemaRpcContext = AsyncRPCContext<PFetchRemoteSchemaResponse>;
+            std::vector<PFetchRemoteTabletSchemaRpcContext> rpc_contexts(
+                    request->tablet_location_size());
+            // Only the first `issued` contexts have an rpc in flight.
+            int issued = 0;
+            for (; issued < request->tablet_location_size(); ++issued) {
+                const auto& location = request->tablet_location(issued);
+                std::shared_ptr<PBackendService_Stub> stub =
+                        ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client(
+                                location.host(), location.brpc_port());
+                if (stub == nullptr) {
+                    st = Status::InternalError("fetch_remote_tablet_schema get client failed: {}:{}",
+                                               location.host(), location.brpc_port());
+                    break;
+                }
+                auto& rpc_context = rpc_contexts[issued];
+                stub->fetch_remote_tablet_schema(&rpc_context.cntl, &remote_request,
+                                                 &rpc_context.response, brpc::DoNothing());
+            }
+            std::vector<TabletSchemaSPtr> schemas;
+            // Join every issued call, even after an error: the controllers and responses are
+            // destroyed together with rpc_contexts, so no call may still be in flight by then.
+            for (int i = 0; i < issued; ++i) {
+                auto& rpc_context = rpc_contexts[i];
+                brpc::Join(rpc_context.cntl.call_id());
+                if (rpc_context.cntl.Failed()) {
+                    LOG(WARNING) << "fetch_remote_tablet_schema rpc err:"
+                                 << rpc_context.cntl.ErrorText();
+                    ExecEnv::GetInstance()->brpc_internal_client_cache()->erase(
+                            rpc_context.cntl.remote_side());
+                    if (st.ok()) {
+                        st = Status::InternalError("fetch_remote_tablet_schema rpc err: {}",
+                                                   rpc_context.cntl.ErrorText());
+                    }
+                } else if (rpc_context.response.status().status_code() != 0) {
+                    if (st.ok()) {
+                        st = Status::create(rpc_context.response.status());
+                    }
+                } else if (st.ok() && rpc_context.response.has_merged_schema()) {
+                    TabletSchemaSPtr schema = std::make_shared<TabletSchema>();
+                    schema->init_from_pb(rpc_context.response.merged_schema());
+                    schemas.push_back(std::move(schema));
+                }
+            }
+            if (st.ok() && !schemas.empty()) {
+                TabletSchemaSPtr merged_schema = merge_schema(schemas);
+                merged_schema->to_schema_pb(response->mutable_merged_schema());
+            }
+            st.to_protobuf(response->mutable_status());
+            return;
+        }
+
+        // Not a coordinator: merge the schemas of the tablets listed for this backend.
+        const auto local_backend = BackendOptions::get_local_backend();
+        std::vector<int64_t> target_tablets;
+        for (int i = 0; i < request->tablet_location_size(); ++i) {
+            const auto& location = request->tablet_location(i);
+            if (local_backend.host == location.host() &&
+                config::brpc_port == location.brpc_port()) {
+                target_tablets.assign(location.tablet_id().begin(), location.tablet_id().end());
+                break;
+            }
+        }
+        std::vector<TabletSchemaSPtr> tablet_schemas;
+        for (int64_t tablet_id : target_tablets) {
+            TabletSharedPtr tablet =
+                    StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id, false);
+            if (tablet == nullptr) {
+                // Tablet not found on this backend; skip it.
+                continue;
+            }
+            tablet_schemas.push_back(tablet->tablet_schema());
+        }
+        // Leave merged_schema unset when nothing was found, so the coordinator skips this node.
+        if (!tablet_schemas.empty()) {
+            TabletSchemaSPtr merged_schema = merge_schema(tablet_schemas);
+            merged_schema->to_schema_pb(response->mutable_merged_schema());
+        }
+        st.to_protobuf(response->mutable_status());
+    });
+    if (!ret) {
+        offer_failed(response, done, _heavy_work_pool);
+    }
+}
+
void
PInternalServiceImpl::report_stream_load_status(google::protobuf::RpcController*
controller,
const
PReportStreamLoadStatusRequest* request,
PReportStreamLoadStatusResponse* response,
diff --git a/be/src/service/internal_service.h
b/be/src/service/internal_service.h
index 55b51cf40a..799038a834 100644
--- a/be/src/service/internal_service.h
+++ b/be/src/service/internal_service.h
@@ -189,6 +189,11 @@ public:
void glob(google::protobuf::RpcController* controller, const PGlobRequest*
request,
PGlobResponse* response, google::protobuf::Closure* done)
override;
+ // Returns the merged tablet schema for the tablets in request->tablet_location().
+ // With is_coordinator set, the request is forwarded to every listed backend and the
+ // replies are merged; otherwise only this backend's listed tablets are merged.
+ void fetch_remote_tablet_schema(google::protobuf::RpcController*
controller,
+ const PFetchRemoteSchemaRequest* request,
+ PFetchRemoteSchemaResponse* response,
+ google::protobuf::Closure* done) override;
+
private:
void _exec_plan_fragment_in_pthread(google::protobuf::RpcController*
controller,
const PExecPlanFragmentRequest*
request,
diff --git a/fe/fe-core/src/main/cup/sql_parser.cup
b/fe/fe-core/src/main/cup/sql_parser.cup
index add419fa14..5466ca7f80 100644
--- a/fe/fe-core/src/main/cup/sql_parser.cup
+++ b/fe/fe-core/src/main/cup/sql_parser.cup
@@ -4423,9 +4423,9 @@ opt_explain_options ::=
// Describe statement
describe_stmt ::=
- describe_command table_name:table
+ describe_command table_name:table opt_partition_names:partitionNames
{:
- RESULT = new DescribeStmt(table, false);
+ RESULT = new DescribeStmt(table, false, partitionNames);
:}
| KW_SHOW KW_FIELDS KW_FROM table_name:table
{:
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/DescribeStmt.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/DescribeStmt.java
index f632c00f9c..2aa6b47c46 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DescribeStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DescribeStmt.java
@@ -92,6 +92,7 @@ public class DescribeStmt extends ShowStmt {
private TableName dbTableName;
private ProcNodeInterface node;
+ private PartitionNames partitionNames;
List<List<String>> totalRows = new LinkedList<List<String>>();
@@ -106,6 +107,12 @@ public class DescribeStmt extends ShowStmt {
this.isAllTables = isAllTables;
}
+    /**
+     * Creates a DESCRIBE statement, optionally restricted to some partitions.
+     *
+     * @param partitionNames partitions whose schema should be described, or null for the whole table
+     */
+    public DescribeStmt(TableName dbTableName, boolean isAllTables, PartitionNames partitionNames) {
+        this.partitionNames = partitionNames;
+        this.dbTableName = dbTableName;
+        this.isAllTables = isAllTables;
+    }
+
public DescribeStmt(TableValuedFunctionRef tableValuedFunctionRef) {
this.tableValuedFunctionRef = tableValuedFunctionRef;
this.isTableValuedFunction = true;
@@ -156,6 +163,13 @@ public class DescribeStmt extends ShowStmt {
return;
}
+ if (partitionNames != null) {
+ partitionNames.analyze(analyzer);
+ if (partitionNames.isTemp()) {
+ throw new AnalysisException("Do not support temp partitions");
+ }
+ }
+
dbTableName.analyze(analyzer);
if (!Env.getCurrentEnv().getAccessManager()
@@ -178,9 +192,21 @@ public class DescribeStmt extends ShowStmt {
if (table.getType() == TableType.OLAP) {
procString += ((OlapTable) table).getBaseIndexId();
} else {
+ if (partitionNames != null) {
+ throw new AnalysisException("Describe table[" +
dbTableName.getTbl() + "] failed");
+ }
procString += table.getId();
}
-
+ if (partitionNames != null) {
+ procString += "/";
+ StringBuilder builder = new StringBuilder();
+ for (String str : partitionNames.getPartitionNames()) {
+ builder.append(str);
+ builder.append(",");
+ }
+ builder.deleteCharAt(builder.length() - 1);
+ procString += builder.toString();
+ }
node = ProcService.getInstance().open(procString);
if (node == null) {
throw new AnalysisException("Describe table[" +
dbTableName.getTbl() + "] failed");
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
index aec2c4d997..1735df7358 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -2276,4 +2276,23 @@ public class OlapTable extends Table {
}
}
}
+
+    /**
+     * Returns the tablets of the base index of every partition of this table.
+     */
+    public List<Tablet> getAllTablets() throws AnalysisException {
+        List<Tablet> result = Lists.newArrayList();
+        for (Partition partition : getPartitions()) {
+            result.addAll(partition.getBaseIndex().getTablets());
+        }
+        return result;
+    }
+
+    /**
+     * Returns true if at least one column of the base schema has a variant type.
+     */
+    public boolean hasVariantColumns() {
+        return getBaseSchema().stream().anyMatch(column -> column.getType().isVariantType());
+    }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/IndexInfoProcDir.java
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/IndexInfoProcDir.java
index 4775ba23ef..24882a3e97 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/IndexInfoProcDir.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/IndexInfoProcDir.java
@@ -128,6 +128,9 @@ public class IndexInfoProcDir implements ProcDirInterface {
throw new AnalysisException("Index " + idxId + " does not
exist");
}
bfColumns = olapTable.getCopiedBfColumns();
+ if (olapTable.hasVariantColumns()) {
+ return new RemoteIndexSchemaProcDir(table, schema,
bfColumns);
+ }
} else {
schema = table.getBaseSchema();
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/IndexSchemaProcNode.java
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/IndexSchemaProcNode.java
index 47da7a9d53..6f125217ee 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/IndexSchemaProcNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/IndexSchemaProcNode.java
@@ -49,10 +49,8 @@ public class IndexSchemaProcNode implements
ProcNodeInterface {
this.bfColumns = bfColumns;
}
- @Override
- public ProcResult fetchResult() throws AnalysisException {
+ public static ProcResult createResult(List<Column> schema, Set<String>
bfColumns) throws AnalysisException {
Preconditions.checkNotNull(schema);
-
BaseProcResult result = new BaseProcResult();
result.setNames(TITLE_NAMES);
@@ -105,4 +103,8 @@ public class IndexSchemaProcNode implements
ProcNodeInterface {
return result;
}
+    /**
+     * Formats this node's schema through the shared static helper.
+     */
+    @Override
+    public ProcResult fetchResult() throws AnalysisException {
+        final ProcResult result = IndexSchemaProcNode.createResult(schema, bfColumns);
+        return result;
+    }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/RemoteIndexSchemaProcDir.java
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/RemoteIndexSchemaProcDir.java
new file mode 100644
index 0000000000..43dfe89d73
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/RemoteIndexSchemaProcDir.java
@@ -0,0 +1,113 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common.proc;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.catalog.TableIf.TableType;
+import org.apache.doris.catalog.Tablet;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.util.FetchRemoteTabletSchemaUtil;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+
+/*
+ * SHOW PROC /dbs/dbId/tableId/index_schema/indexId
+ * Shows the index schema of an OLAP table with variant columns; the schema is fetched from the
+ * backends instead of being read from the catalog. A child lookup narrows it to some partitions.
+ */
+public class RemoteIndexSchemaProcDir implements ProcDirInterface {
+    public static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>()
+            .add("Field").add("Type").add("Null").add("Key")
+            .add("Default").add("Extra")
+            .build();
+
+    private List<Column> schema;
+    private Set<String> bfColumns;
+    private TableIf table;
+
+    public RemoteIndexSchemaProcDir(TableIf table, List<Column> schema, Set<String> bfColumns) {
+        this.table = table;
+        this.schema = schema;
+        this.bfColumns = bfColumns;
+    }
+
+    /**
+     * Fetches the merged schema of all base-index tablets of the table from the backends.
+     *
+     * @throws AnalysisException if the remote fetch returns no columns
+     */
+    @Override
+    public ProcResult fetchResult() throws AnalysisException {
+        Preconditions.checkNotNull(table);
+        Preconditions.checkNotNull(schema);
+        List<Tablet> tablets = null;
+        table.readLock();
+        try {
+            OlapTable olapTable = (OlapTable) table;
+            tablets = olapTable.getAllTablets();
+        } finally {
+            table.readUnlock();
+        }
+        List<Column> remoteSchema = new FetchRemoteTabletSchemaUtil(tablets).fetch();
+        if (remoteSchema == null || remoteSchema.isEmpty()) {
+            throw new AnalysisException("fetch remote tablet schema failed");
+        }
+        this.schema = remoteSchema;
+        return IndexSchemaProcNode.createResult(this.schema, this.bfColumns);
+    }
+
+    @Override
+    public boolean register(String name, ProcNodeInterface node) {
+        return false;
+    }
+
+    /**
+     * Resolves a comma separated partition list (e.g. "p1,p2") into a node that describes only
+     * those partitions.
+     *
+     * @throws AnalysisException if the list is empty, the table is not OLAP, or a partition is missing
+     */
+    @Override
+    public ProcNodeInterface lookup(String partitionString) throws AnalysisException {
+        Preconditions.checkNotNull(table);
+
+        List<String> partitionNameList = new ArrayList<String>(Arrays.asList(partitionString.split(",")));
+        if (partitionNameList.isEmpty()) {
+            throw new AnalysisException("Describe table[" + table.getName() + "] failed");
+        }
+        List<Partition> partitions = Lists.newArrayList();
+        table.readLock();
+        try {
+            if (table.getType() != TableType.OLAP) {
+                throw new AnalysisException("Describe table[" + table.getName() + "] failed");
+            }
+            OlapTable olapTable = (OlapTable) table;
+            for (String partitionName : partitionNameList) {
+                Partition partition = olapTable.getPartition(partitionName);
+                if (partition == null) {
+                    // Propagated unchanged so the user can see which partition is missing.
+                    throw new AnalysisException("Partition " + partitionName + " does not exist");
+                }
+                partitions.add(partition);
+            }
+        } finally {
+            table.readUnlock();
+        }
+        return new RemoteIndexSchemaProcNode(partitions, this.schema, this.bfColumns);
+    }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/RemoteIndexSchemaProcNode.java
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/RemoteIndexSchemaProcNode.java
new file mode 100644
index 0000000000..f3a5760ade
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/RemoteIndexSchemaProcNode.java
@@ -0,0 +1,72 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common.proc;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.MaterializedIndex;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.Tablet;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.util.FetchRemoteTabletSchemaUtil;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+import java.util.List;
+import java.util.Set;
+
+/*
+ * SHOW PROC /dbs/dbId/tableId/index_schema/indexId/partitionName
+ * Shows the index schema fetched from the backends, restricted to the given partitions.
+ */
+public class RemoteIndexSchemaProcNode implements ProcNodeInterface {
+    public static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>()
+            .add("Field").add("Type").add("Null").add("Key")
+            .add("Default").add("Extra")
+            .build();
+
+    private List<Partition> partitions;
+    private List<Column> schema;
+    private Set<String> bfColumns;
+
+    public RemoteIndexSchemaProcNode(List<Partition> partitions, List<Column> schema, Set<String> bfColumns) {
+        this.partitions = partitions;
+        this.schema = schema;
+        this.bfColumns = bfColumns;
+    }
+
+    /**
+     * Collects the base-index tablets of the selected partitions, fetches their merged schema from
+     * the backends and formats it like a regular index schema.
+     *
+     * @throws AnalysisException if the remote fetch returns no columns
+     */
+    @Override
+    public ProcResult fetchResult() throws AnalysisException {
+        Preconditions.checkNotNull(schema);
+        Preconditions.checkNotNull(partitions);
+        // NOTE(review): no table lock is held here while reading the partitions' tablets.
+        List<Tablet> baseIndexTablets = Lists.newArrayList();
+        for (Partition partition : partitions) {
+            MaterializedIndex baseIndex = partition.getBaseIndex();
+            baseIndexTablets.addAll(baseIndex.getTablets());
+        }
+        List<Column> fetchedSchema = new FetchRemoteTabletSchemaUtil(baseIndexTablets).fetch();
+        boolean fetchFailed = fetchedSchema == null || fetchedSchema.isEmpty();
+        if (fetchFailed) {
+            throw new AnalysisException("fetch remote tablet schema failed");
+        }
+        schema = fetchedSchema;
+        return IndexSchemaProcNode.createResult(schema, bfColumns);
+    }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/util/FetchRemoteTabletSchemaUtil.java
b/fe/fe-core/src/main/java/org/apache/doris/common/util/FetchRemoteTabletSchemaUtil.java
new file mode 100644
index 0000000000..7145ac1acf
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/common/util/FetchRemoteTabletSchemaUtil.java
@@ -0,0 +1,320 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common.util;
+
+import org.apache.doris.catalog.AggregateType;
+import org.apache.doris.catalog.ArrayType;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.MapType;
+import org.apache.doris.catalog.Replica;
+import org.apache.doris.catalog.StructType;
+import org.apache.doris.catalog.Tablet;
+import org.apache.doris.catalog.Type;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.proto.InternalService.PFetchRemoteSchemaRequest;
+import org.apache.doris.proto.InternalService.PFetchRemoteSchemaResponse;
+import org.apache.doris.proto.InternalService.PTabletsLocation;
+import org.apache.doris.proto.OlapFile.ColumnPB;
+import org.apache.doris.proto.OlapFile.TabletSchemaPB;
+import org.apache.doris.rpc.BackendServiceProxy;
+import org.apache.doris.rpc.RpcException;
+import org.apache.doris.system.Backend;
+import org.apache.doris.thrift.TStatusCode;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+
+/**
+ * Fetches the schema of a set of tablets from the backends hosting their replicas.
+ *
+ * <p>One backend acts as coordinator: it collects and merges the tablet schemas of all listed
+ * backends and returns a single merged schema, which is converted here into {@link Column}s.
+ */
+public class FetchRemoteTabletSchemaUtil {
+    private static final Logger LOG = LogManager.getLogger(FetchRemoteTabletSchemaUtil.class);
+
+    // How many distinct backends are tried as coordinator before giving up.
+    private static final int MAX_COORDINATOR_ATTEMPTS = 2;
+    // How long to wait for one coordinator's response.
+    private static final long FETCH_TIMEOUT_SECONDS = 60;
+
+    private List<Tablet> tablets;
+    private List<Column> columns;
+
+    public FetchRemoteTabletSchemaUtil(List<Tablet> tablets) {
+        this.tablets = tablets;
+        this.columns = Lists.newArrayList();
+    }
+
+    /**
+     * Fetches the merged schema of the tablets.
+     *
+     * <p>Every known backend holding a replica is listed in the request. Up to
+     * {@link #MAX_COORDINATOR_ATTEMPTS} of them are asked in turn to act as coordinator, and the
+     * first successful response wins.
+     *
+     * @return the fetched columns; an empty list when every attempt failed
+     */
+    public List<Column> fetch() {
+        Preconditions.checkNotNull(tablets);
+        // Group tablet ids by the backend holding one of their replicas.
+        Map<Long, Set<Long>> beIdToTabletId = Maps.newHashMap();
+        for (Tablet tablet : tablets) {
+            for (Replica replica : tablet.getReplicas()) {
+                beIdToTabletId.computeIfAbsent(replica.getBackendId(), k -> Sets.newHashSet())
+                        .add(tablet.getId());
+            }
+        }
+
+        // Build one PTabletsLocation per known backend; the first few also serve as coordinators.
+        List<PTabletsLocation> locations = Lists.newArrayList();
+        List<Backend> coordinatorBackend = Lists.newArrayList();
+        for (Map.Entry<Long, Set<Long>> entry : beIdToTabletId.entrySet()) {
+            Long backendId = entry.getKey();
+            Backend backend = Env.getCurrentEnv().getCurrentSystemInfo().getBackend(backendId);
+            if (backend == null) {
+                // The backend is not registered any more, so it cannot be asked for its tablets.
+                LOG.warn("backend {} not found, skip it when fetching remote tablet schema", backendId);
+                continue;
+            }
+            if (coordinatorBackend.size() < MAX_COORDINATOR_ATTEMPTS) {
+                coordinatorBackend.add(backend);
+            }
+            locations.add(PTabletsLocation.newBuilder()
+                    .setHost(backend.getHost())
+                    .setBrpcPort(backend.getBrpcPort())
+                    .addAllTabletId(entry.getValue())
+                    .build());
+        }
+
+        // The request is identical for every coordinator candidate, so build it once.
+        PFetchRemoteSchemaRequest request = PFetchRemoteSchemaRequest.newBuilder()
+                .addAllTabletLocation(locations)
+                .setIsCoordinator(true)
+                .build();
+        // Send the request to the coordinator candidates until one succeeds.
+        for (Backend be : coordinatorBackend) {
+            Future<PFetchRemoteSchemaResponse> future = null;
+            try {
+                future = BackendServiceProxy.getInstance()
+                        .fetchRemoteTabletSchemaAsync(be.getBrpcAdress(), request);
+                PFetchRemoteSchemaResponse response = future.get(FETCH_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+                TStatusCode code = TStatusCode.findByValue(response.getStatus().getStatusCode());
+                if (code != TStatusCode.OK) {
+                    String errMsg;
+                    if (!response.getStatus().getErrorMsgsList().isEmpty()) {
+                        errMsg = response.getStatus().getErrorMsgsList().get(0);
+                    } else {
+                        errMsg = "fetchRemoteTabletSchemaAsync failed. backend address: "
+                                + be.getHost() + " : " + be.getBrpcPort();
+                    }
+                    throw new AnalysisException(errMsg);
+                }
+                fillColumns(response);
+                break;
+            } catch (AnalysisException e) {
+                // try the next coordinator candidate
+                LOG.warn(e);
+            } catch (InterruptedException e) {
+                LOG.warn("fetch remote schema future get interrupted Exception");
+                // Restore the interrupt flag and stop: any further future.get would be interrupted too.
+                Thread.currentThread().interrupt();
+                break;
+            } catch (TimeoutException e) {
+                future.cancel(true);
+                // try the next coordinator candidate
+                LOG.warn("fetch remote schema result timeout, addr {}", be.getBrpcAdress());
+            } catch (RpcException e) {
+                LOG.warn("fetch remote schema result rpc exception {}, e {}", be.getBrpcAdress(), e);
+            } catch (ExecutionException e) {
+                LOG.warn("fetch remote schema result execution exception {}, addr {}", e, be.getBrpcAdress());
+            }
+        }
+        return columns;
+    }
+
+    /**
+     * Converts the merged schema in {@code response} into {@link #columns}.
+     *
+     * <p>Columns after the first variant column are sorted by name. The result is built in a fresh
+     * list and published only on success, so a failed attempt leaves no partial columns behind for
+     * the next coordinator candidate.
+     */
+    private void fillColumns(PFetchRemoteSchemaResponse response) throws AnalysisException {
+        TabletSchemaPB schemaPB = response.getMergedSchema();
+        List<Column> parsed = Lists.newArrayList();
+        for (ColumnPB columnPB : schemaPB.getColumnList()) {
+            parsed.add(initColumnFromPB(columnPB));
+        }
+        int firstSortedIdx = parsed.size();
+        for (int i = 0; i < parsed.size(); i++) {
+            if (parsed.get(i).getType().isVariantType()) {
+                firstSortedIdx = i + 1;
+                break;
+            }
+        }
+        if (firstSortedIdx < parsed.size()) {
+            Collections.sort(parsed.subList(firstSortedIdx, parsed.size()),
+                    Comparator.comparing(Column::getName));
+        }
+        columns = parsed;
+    }
+
+    /**
+     * Converts one {@link ColumnPB} into a {@link Column}. Array/map/struct types are rebuilt
+     * recursively from their children.
+     *
+     * @throws AnalysisException if the column (or one of its children) cannot be converted
+     */
+    private Column initColumnFromPB(ColumnPB column) throws AnalysisException {
+        try {
+            AggregateType aggType = getAggTypeFromAggName(column.getAggregation());
+            Type type = getTypeFromTypeName(column.getType());
+            String columnName = column.getName();
+            boolean isKey = column.getIsKey();
+            boolean isNullable = column.getIsNullable();
+            // toStringUtf8 cannot throw, unlike toString("UTF-8").
+            String defaultValue = column.getDefaultValue().toStringUtf8();
+            if (defaultValue.isEmpty()) {
+                defaultValue = "NULL";
+            }
+            // When the child count does not match, keep the generic type from getTypeFromTypeName.
+            List<ColumnPB> children = column.getChildrenColumnsList();
+            if (type.isArrayType() && children.size() == 1) {
+                type = new ArrayType(initColumnFromPB(children.get(0)).getType());
+            } else if (type.isMapType() && children.size() == 2) {
+                type = new MapType(initColumnFromPB(children.get(0)).getType(),
+                        initColumnFromPB(children.get(1)).getType());
+            } else if (type.isStructType()) {
+                List<Type> childTypes = Lists.newArrayList();
+                for (ColumnPB childPB : children) {
+                    childTypes.add(initColumnFromPB(childPB).getType());
+                }
+                type = new StructType(childTypes);
+            }
+            return new Column(columnName, type, isKey, aggType, isNullable, defaultValue, "remote schema");
+        } catch (AnalysisException e) {
+            // already describes the failing child column
+            throw e;
+        } catch (Exception e) {
+            throw new AnalysisException("init column " + column.getName()
+                    + " from remote schema failed: " + e.getMessage());
+        }
+    }
+
+    /**
+     * Maps a backend type name to an FE {@link Type}; unknown names map to UNSUPPORTED.
+     */
+    private Type getTypeFromTypeName(String typeName) {
+        switch (typeName) {
+            case "TINYINT":
+                return Type.TINYINT;
+            case "SMALLINT":
+                return Type.SMALLINT;
+            case "INT":
+                return Type.INT;
+            case "BIGINT":
+                return Type.BIGINT;
+            case "LARGEINT":
+                return Type.LARGEINT;
+            case "UNSIGNED_TINYINT":
+                // NOTE(review): BIGINT here while the other unsigned types are UNSUPPORTED; confirm.
+                return Type.BIGINT;
+            case "FLOAT":
+                return Type.FLOAT;
+            case "DISCRETE_DOUBLE":
+            case "DOUBLE":
+                return Type.DOUBLE;
+            case "CHAR":
+                return Type.CHAR;
+            case "DATE":
+                return Type.DATE;
+            case "DATEV2":
+                return Type.DATEV2;
+            case "DATETIMEV2":
+                return Type.DATETIMEV2;
+            case "DATETIME":
+                return Type.DATETIME;
+            case "DECIMAL32":
+                return Type.DECIMAL32;
+            case "DECIMAL64":
+                return Type.DECIMAL64;
+            case "DECIMAL128I":
+                return Type.DECIMAL128;
+            case "DECIMAL":
+                return Type.DECIMALV2;
+            case "VARCHAR":
+                return Type.VARCHAR;
+            case "STRING":
+                return Type.STRING;
+            case "JSONB":
+                return Type.JSONB;
+            case "VARIANT":
+                return Type.VARIANT;
+            case "BOOLEAN":
+                return Type.BOOLEAN;
+            case "HLL":
+                return Type.HLL;
+            case "STRUCT":
+                return Type.STRUCT;
+            case "MAP":
+                return Type.MAP;
+            case "ARRAY":
+                return Type.ARRAY;
+            case "QUANTILE_STATE":
+                return Type.QUANTILE_STATE;
+            case "AGG_STATE":
+                return Type.AGG_STATE;
+            case "UNSIGNED_SMALLINT":
+            case "UNSIGNED_INT":
+            case "UNSIGNED_BIGINT":
+            case "LIST":
+            case "OBJECT":
+            default:
+                return Type.UNSUPPORTED;
+        }
+    }
+
+    /**
+     * Maps a backend aggregation name to an {@link AggregateType}. An empty name means NONE; any
+     * other unknown name maps to GENERIC_AGGREGATION.
+     */
+    private AggregateType getAggTypeFromAggName(String aggName) {
+        switch (aggName) {
+            case "NONE":
+                return AggregateType.NONE;
+            case "SUM":
+                return AggregateType.SUM;
+            case "MIN":
+                return AggregateType.MIN;
+            case "MAX":
+                return AggregateType.MAX;
+            case "REPLACE":
+                return AggregateType.REPLACE;
+            case "REPLACE_IF_NOT_NULL":
+                return AggregateType.REPLACE_IF_NOT_NULL;
+            case "HLL_UNION":
+                return AggregateType.HLL_UNION;
+            case "BITMAP_UNION":
+                return AggregateType.BITMAP_UNION;
+            case "QUANTILE_UNION":
+                return AggregateType.QUANTILE_UNION;
+            default:
+                return aggName.isEmpty() ? AggregateType.NONE : AggregateType.GENERIC_AGGREGATION;
+        }
+    }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java
b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java
index d012e757ef..a0a4fbaa83 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java
@@ -147,6 +147,11 @@ public class BackendServiceClient {
return stub.reportStreamLoadStatus(request);
}
+ public Future<InternalService.PFetchRemoteSchemaResponse>
fetchRemoteTabletSchemaAsync(
+ InternalService.PFetchRemoteSchemaRequest request) {
+ return stub.fetchRemoteTabletSchema(request);
+ }
+
public Future<InternalService.PGlobResponse>
glob(InternalService.PGlobRequest request) {
return stub.glob(request);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
index d9fae1daf9..dedc832879 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
@@ -411,4 +411,16 @@ public class BackendServiceProxy {
}
}
+ public Future<InternalService.PFetchRemoteSchemaResponse>
fetchRemoteTabletSchemaAsync(
+ TNetworkAddress address, InternalService.PFetchRemoteSchemaRequest
request) throws RpcException {
+ try {
+ final BackendServiceClient client = getProxy(address);
+ return client.fetchRemoteTabletSchemaAsync(request);
+ } catch (Throwable e) {
+ LOG.warn("fetch remote tablet schema catch a exception,
address={}:{}",
+ address.getHostname(), address.getPort(), e);
+ throw new RpcException(address.hostname, e.getMessage());
+ }
+ }
+
}
diff --git a/gensrc/proto/internal_service.proto
b/gensrc/proto/internal_service.proto
index 830ed3c41a..819f835a90 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -692,6 +692,23 @@ message PGlobResponse {
repeated PFileInfo files = 2;
}
+message PTabletsLocation {
+ required string host = 1;
+ required int32 brpc_port = 2;
+ repeated int64 tablet_id = 3;
+}
+
+message PFetchRemoteSchemaRequest {
+ repeated PTabletsLocation tablet_location = 1;
+ required bool is_coordinator = 2;
+}
+
+message PFetchRemoteSchemaResponse {
+ optional PStatus status = 1;
+ // intermediate merged schema
+ optional TabletSchemaPB merged_schema = 2;
+}
+
service PBackendService {
rpc transmit_data(PTransmitDataParams) returns (PTransmitDataResult);
rpc transmit_data_by_http(PEmptyRequest) returns (PTransmitDataResult);
@@ -731,5 +748,6 @@ service PBackendService {
rpc get_tablet_rowset_versions(PGetTabletVersionsRequest) returns
(PGetTabletVersionsResponse);
rpc report_stream_load_status(PReportStreamLoadStatusRequest) returns
(PReportStreamLoadStatusResponse);
rpc glob(PGlobRequest) returns (PGlobResponse);
+ rpc fetch_remote_tablet_schema(PFetchRemoteSchemaRequest) returns
(PFetchRemoteSchemaResponse);
};
diff --git a/regression-test/data/variant_p0/desc.out
b/regression-test/data/variant_p0/desc.out
new file mode 100644
index 0000000000..91a6f95609
--- /dev/null
+++ b/regression-test/data/variant_p0/desc.out
@@ -0,0 +1,101 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !sql_1 --
+k BIGINT Yes true NULL NONE
+v VARIANT Yes false NULL NONE
+v.a SMALLINT Yes false NULL NONE
+v.xxxx TEXT Yes false NULL NONE
+
+-- !sql_2 --
+k BIGINT Yes true NULL NONE
+v VARIANT Yes false NULL NONE
+v.a SMALLINT Yes false NULL NONE
+v.ddd.aaa TINYINT Yes false NULL NONE
+v.ddd.mxmxm JSON Yes false NULL NONE
+v.xxxx TEXT Yes false NULL NONE
+
+-- !sql_3 --
+k BIGINT Yes true NULL NONE
+v VARIANT Yes false NULL NONE
+v.a SMALLINT Yes false NULL NONE
+v.b JSON Yes false NULL NONE
+v.c.c SMALLINT Yes false NULL NONE
+v.c.e DOUBLE Yes false NULL NONE
+v.xxxx TEXT Yes false NULL NONE
+
+-- !sql_5 --
+k BIGINT Yes true NULL NONE
+v VARIANT Yes false NULL NONE
+
+-- !sql_6_1 --
+k BIGINT Yes true NULL NONE
+v VARIANT Yes false NULL NONE
+v.a SMALLINT Yes false NULL NONE
+v.ddd.aaa TINYINT Yes false NULL NONE
+v.ddd.mxmxm JSON Yes false NULL NONE
+v.xxxx TEXT Yes false NULL NONE
+
+-- !sql_6_2 --
+k BIGINT Yes true NULL NONE
+v VARIANT Yes false NULL NONE
+v.a SMALLINT Yes false NULL NONE
+v.xxxx TEXT Yes false NULL NONE
+
+-- !sql_6_3 --
+k BIGINT Yes true NULL NONE
+v VARIANT Yes false NULL NONE
+v.a SMALLINT Yes false NULL NONE
+v.b JSON Yes false NULL NONE
+v.c.c SMALLINT Yes false NULL NONE
+v.c.e DOUBLE Yes false NULL NONE
+
+-- !sql_6 --
+k BIGINT Yes true NULL NONE
+v VARIANT Yes false NULL NONE
+v.a SMALLINT Yes false NULL NONE
+v.b JSON Yes false NULL NONE
+v.c.c SMALLINT Yes false NULL NONE
+v.c.e DOUBLE Yes false NULL NONE
+v.ddd.aaa TINYINT Yes false NULL NONE
+v.ddd.mxmxm JSON Yes false NULL NONE
+v.xxxx TEXT Yes false NULL NONE
+
+-- !sql_7 --
+k BIGINT Yes true NULL NONE
+v VARIANT Yes false NULL NONE
+v.a SMALLINT Yes false NULL NONE
+v.b JSON Yes false NULL NONE
+v.c.c SMALLINT Yes false NULL NONE
+v.c.e DOUBLE Yes false NULL NONE
+v.xxxx TEXT Yes false NULL NONE
+
+-- !sql_7_1 --
+k BIGINT Yes true NULL NONE
+v VARIANT Yes false NULL NONE
+v.a SMALLINT Yes false NULL NONE
+v.xxxx TEXT Yes false NULL NONE
+
+-- !sql_7_2 --
+k BIGINT Yes true NULL NONE
+v VARIANT Yes false NULL NONE
+v.a SMALLINT Yes false NULL NONE
+v.b JSON Yes false NULL NONE
+v.c.c SMALLINT Yes false NULL NONE
+v.c.e DOUBLE Yes false NULL NONE
+
+-- !sql_8 --
+k BIGINT Yes true NULL NONE
+v1 VARIANT Yes false NULL NONE
+v1.a SMALLINT Yes false NULL NONE
+v1.b JSON Yes false NULL NONE
+v1.c.c SMALLINT Yes false NULL NONE
+v1.c.e DOUBLE Yes false NULL NONE
+v1.oooo.xxxx.xxx TINYINT Yes false NULL NONE
+v2 VARIANT Yes false NULL NONE
+v2.a SMALLINT Yes false NULL NONE
+v2.xxxx TEXT Yes false NULL NONE
+v3 VARIANT Yes false NULL NONE
+v3.a SMALLINT Yes false NULL NONE
+v3.b JSON Yes false NULL NONE
+v3.c.c SMALLINT Yes false NULL NONE
+v3.c.e DOUBLE Yes false NULL NONE
+
diff --git a/regression-test/suites/variant_p0/desc.groovy
b/regression-test/suites/variant_p0/desc.groovy
new file mode 100644
index 0000000000..1058390f6e
--- /dev/null
+++ b/regression-test/suites/variant_p0/desc.groovy
@@ -0,0 +1,174 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("regression_test_variant_desc", "variant_type_desc"){
+
+ def load_json_data = {table_name, file_name ->
+ // load the json data
+ streamLoad {
+ table "${table_name}"
+
+ // set http request header params
+ set 'read_json_by_line', 'true'
+ set 'format', 'json'
+ set 'max_filter_ratio', '0.1'
+ file file_name // import json file
+ time 10000 // limit inflight 10s
+
+ // if declared a check callback, the default check condition will
ignore.
+ // So you must check all condition
+
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ logger.info("Stream load ${file_name} result:
${result}".toString())
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ // assertEquals(json.NumberTotalRows, json.NumberLoadedRows +
json.NumberUnselectedRows)
+ assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
+ }
+ }
+ }
+
+ def create_table = { table_name, buckets="auto" ->
+ sql "DROP TABLE IF EXISTS ${table_name}"
+ sql """
+ CREATE TABLE IF NOT EXISTS ${table_name} (
+ k bigint,
+ v variant
+ )
+ DUPLICATE KEY(`k`)
+ DISTRIBUTED BY HASH(k) BUCKETS ${buckets}
+ properties("replication_num" = "1", "disable_auto_compaction" =
"false");
+ """
+ }
+
+ def create_table_partition = { table_name, buckets="auto" ->
+ sql "DROP TABLE IF EXISTS ${table_name}"
+ sql """
+ CREATE TABLE IF NOT EXISTS ${table_name} (
+ k bigint,
+ v variant
+ )
+ DUPLICATE KEY(`k`)
+ PARTITION BY RANGE(k)
+ (
+ PARTITION p1 VALUES LESS THAN (3000),
+ PARTITION p2 VALUES LESS THAN (50000),
+ PARTITION p3 VALUES LESS THAN (100000)
+ )
+ DISTRIBUTED BY HASH(k) BUCKETS ${buckets}
+ properties("replication_num" = "1", "disable_auto_compaction" =
"false");
+ """
+ }
+
+ def set_be_config = { key, value ->
+ String backend_id;
+ def backendId_to_backendIP = [:]
+ def backendId_to_backendHttpPort = [:]
+ getBackendIpHttpPort(backendId_to_backendIP,
backendId_to_backendHttpPort);
+
+ backend_id = backendId_to_backendIP.keySet()[0]
+ def (code, out, err) =
update_be_config(backendId_to_backendIP.get(backend_id),
backendId_to_backendHttpPort.get(backend_id), key, value)
+ logger.info("update config: code=" + code + ", out=" + out + ", err="
+ err)
+ }
+
+ try {
+ // sparse columns
+ def table_name = "sparse_columns"
+ create_table table_name
+ set_be_config.call("ratio_of_defaults_as_sparse_column", "0.95")
+ sql """insert into sparse_columns select 0, '{"a": 11245, "b" : [123,
{"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}}' as json_str
+ union all select 0, '{"a": 1123}' as json_str union all select 0,
'{"a" : 1234, "xxxx" : "kaana"}' as json_str from numbers("number" = "4096")
limit 4096 ;"""
+ qt_sql_1 """desc ${table_name}"""
+ sql "truncate table sparse_columns"
+ sql """insert into sparse_columns select 0, '{"a": 1123, "b" : [123,
{"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}, "zzz" : null, "oooo" :
{"akakaka" : null, "xxxx" : {"xxx" : 123}}}' as json_str
+ union all select 0, '{"a" : 1234, "xxxx" : "kaana", "ddd" :
{"aaa" : 123, "mxmxm" : [456, "789"]}}' as json_str from numbers("number" =
"4096") limit 4096 ;"""
+ qt_sql_2 """desc ${table_name}"""
+ sql "truncate table sparse_columns"
+
+ // no sparse columns
+ table_name = "no_sparse_columns"
+ create_table.call(table_name, "4")
+ sql "set enable_two_phase_read_opt = false;"
+ set_be_config.call("ratio_of_defaults_as_sparse_column", "1")
+ sql """insert into ${table_name} select 0, '{"a": 11245, "b" : [123,
{"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}}' as json_str
+ union all select 0, '{"a": 1123}' as json_str union all select 0,
'{"a" : 1234, "xxxx" : "kaana"}' as json_str from numbers("number" = "4096")
limit 4096 ;"""
+ qt_sql_3 """desc ${table_name}"""
+ sql "truncate table ${table_name}"
+
+ // always sparse column
+ set_be_config.call("ratio_of_defaults_as_sparse_column", "0")
+ sql """insert into ${table_name} select 0, '{"a": 1123, "b" : [123,
{"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}, "zzz" : null, "oooo" :
{"akakaka" : null, "xxxx" : {"xxx" : 123}}}' as json_str
+ union all select 0, '{"a" : 1234, "xxxx" : "kaana", "ddd" :
{"aaa" : 123, "mxmxm" : [456, "789"]}}' as json_str from numbers("number" =
"4096") limit 4096 ;"""
+ qt_sql_5 """desc ${table_name}"""
+ sql "truncate table ${table_name}"
+
+ // partititon
+ table_name = "partition_data"
+ create_table_partition.call(table_name, "4")
+ sql "set enable_two_phase_read_opt = false;"
+ set_be_config.call("ratio_of_defaults_as_sparse_column", "0.95")
+ sql """insert into ${table_name} select 2500, '{"a": 1123, "b" :
[123, {"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}, "zzz" : null,
"oooo" : {"akakaka" : null, "xxxx" : {"xxx" : 123}}}' as json_str
+ union all select 2500, '{"a" : 1234, "xxxx" : "kaana", "ddd" :
{"aaa" : 123, "mxmxm" : [456, "789"]}}' as json_str from numbers("number" =
"4096") limit 4096 ;"""
+ sql """insert into ${table_name} select 45000, '{"a": 11245, "b" :
[123, {"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}}' as json_str
+ union all select 45000, '{"a": 1123}' as json_str union all
select 45000, '{"a" : 1234, "xxxx" : "kaana"}' as json_str from
numbers("number" = "4096") limit 4096 ;"""
+ sql """insert into ${table_name} values(95000, '{"a": 11245, "b" :
[123, {"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}}')"""
+ qt_sql_6_1 """desc ${table_name} partition p1"""
+ qt_sql_6_2 """desc ${table_name} partition p2"""
+ qt_sql_6_3 """desc ${table_name} partition p3"""
+ qt_sql_6 """desc ${table_name}"""
+ sql "truncate table ${table_name}"
+
+ // drop partition
+ table_name = "drop_partition"
+ create_table_partition.call(table_name, "4")
+ // insert into partition p1
+ sql """insert into ${table_name} values(2500, '{"a": 11245, "b" :
[123, {"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}}')"""
+ // insert into partition p2
+ sql """insert into ${table_name} values(45000, '{"a": 11245, "xxxx" :
"kaana"}')"""
+ // insert into partition p3
+ sql """insert into ${table_name} values(95000, '{"a": 11245, "b" :
[123, {"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}}')"""
+ // drop p1
+ sql """alter table ${table_name} drop partition p1"""
+ qt_sql_7 """desc ${table_name}"""
+ qt_sql_7_1 """desc ${table_name} partition p2"""
+ qt_sql_7_2 """desc ${table_name} partition p3"""
+ sql "truncate table ${table_name}"
+
+ // more variant
+ table_name = "more_variant_table"
+ sql """
+ CREATE TABLE IF NOT EXISTS ${table_name} (
+ k bigint,
+ v1 variant,
+ v2 variant,
+ v3 variant
+ )
+ DUPLICATE KEY(`k`)
+ DISTRIBUTED BY HASH(k) BUCKETS 5
+ properties("replication_num" = "1", "disable_auto_compaction" =
"false");
+ """
+ sql """ insert into ${table_name} values (0, '{"a": 1123, "b" : [123,
{"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}, "zzz" : null, "oooo" :
{"akakaka" : null, "xxxx" : {"xxx" : 123}}}', '{"a": 11245, "xxxx" : "kaana"}',
'{"a": 11245, "b" : [123, {"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" :
7.111}}')"""
+ qt_sql_8 """desc ${table_name}"""
+ sql "truncate table ${table_name}"
+ } finally {
+ // reset flags
+ set_be_config.call("ratio_of_defaults_as_sparse_column", "0.95")
+ }
+}
diff --git a/regression-test/suites/variant_p0/load.groovy
b/regression-test/suites/variant_p0/load.groovy
index 472cdcbcdc..67165e1371 100644
--- a/regression-test/suites/variant_p0/load.groovy
+++ b/regression-test/suites/variant_p0/load.groovy
@@ -77,222 +77,222 @@ suite("regression_test_variant", "variant_type"){
try {
def table_name = "simple_variant"
// // // 1. simple cases
+ create_table table_name
+ sql """insert into ${table_name} values (1, '[1]'),(1, '{"a" :
1}');"""
+ sql """insert into ${table_name} values (2, '[2]'),(1, '{"a" :
[[[1]]]}');"""
+ sql """insert into ${table_name} values (3, '3'),(1, '{"a" : 1}'),
(1, '{"a" : [1]}');"""
+ sql """insert into ${table_name} values (4, '"4"'),(1, '{"a" :
"1223"}');"""
+ sql """insert into ${table_name} values (5, '5.0'),(1, '{"a" :
[1]}');"""
+ sql """insert into ${table_name} values (6, '"[6]"'),(1, '{"a" :
["1", 2, 1.1]}');"""
+ sql """insert into ${table_name} values (7, '7'),(1, '{"a" : 1, "b"
: {"c" : 1}}');"""
+ sql """insert into ${table_name} values (8, '8.11111'),(1, '{"a" :
1, "b" : {"c" : [{"a" : 1}]}}');"""
+ sql """insert into ${table_name} values (9, '"9999"'),(1, '{"a" : 1,
"b" : {"c" : [{"a" : 1}]}}');"""
+ sql """insert into ${table_name} values (10, '1000000'),(1, '{"a" :
1, "b" : {"c" : [{"a" : 1}]}}');"""
+ sql """insert into ${table_name} values (11, '[123.0]'),(1, '{"a" :
1, "b" : {"c" : 1}}'),(1, '{"a" : 1, "b" : 10}');"""
+ sql """insert into ${table_name} values (12, '[123.2]'),(1, '{"a" :
1, "b" : 10}'),(1, '{"a" : 1, "b" : {"c" : 1}}');"""
+ qt_sql_1 "select k, v from simple_variant order by k, cast(v as
string)"
+ qt_sql_1_1 "select k, v, cast(v:b as string) from simple_variant where
length(cast(v:b as string)) > 4 order by k, cast(v as string)"
+ verify table_name
+
+ // 2. type confilct cases
+ table_name = "type_conflict_resolution"
+ create_table table_name
+ sql """insert into ${table_name} values (1, '{"c" : "123"}');"""
+ sql """insert into ${table_name} values (2, '{"c" : 123}');"""
+ sql """insert into ${table_name} values (3, '{"cc" : [123]}');"""
+ sql """insert into ${table_name} values (4, '{"cc" : [123.1]}');"""
+ sql """insert into ${table_name} values (5, '{"ccc" : 123}');"""
+ sql """insert into ${table_name} values (6, '{"ccc" : 123321}');"""
+ sql """insert into ${table_name} values (7, '{"cccc" : 123.0}');"""
+ sql """insert into ${table_name} values (8, '{"cccc" : 123.11}');"""
+ sql """insert into ${table_name} values (9, '{"ccccc" : [123]}');"""
+ sql """insert into ${table_name} values (10, '{"ccccc" :
[123456789]}');"""
+ sql """insert into ${table_name} values (11, '{"b" :
1111111111111111}');"""
+ sql """insert into ${table_name} values (12, '{"b" : 1.222222}');"""
+ sql """insert into ${table_name} values (13, '{"bb" : 1}');"""
+ sql """insert into ${table_name} values (14, '{"bb" :
214748364711}');"""
+ sql """insert into ${table_name} values (15, '{"A" : 1}');"""
+ qt_sql """select v from type_conflict_resolution order by k;"""
+ verify table_name
+
+ // 3. simple variant sub column select
+ table_name = "simple_select_variant"
+ create_table table_name
+ sql """insert into ${table_name} values (1, '{"A" : 123}');"""
+ sql """insert into ${table_name} values (2, '{"A" : 1}');"""
+ sql """insert into ${table_name} values (4, '{"A" : 123456}');"""
+ sql """insert into ${table_name} values (8, '{"A" :
123456789101112}');"""
+ qt_sql_2 "select v:A from ${table_name} order by cast(v:A as int)"
+ sql """insert into ${table_name} values (12, '{"AA" : [123456]}');"""
+ sql """insert into ${table_name} values (14, '{"AA" :
[123456789101112]}');"""
+ // qt_sql_3 "select v:AA from ${table_name} where size(v:AA) > 0 order
by k"
+ qt_sql_4 "select v:A, v:AA, v from ${table_name} order by k"
+ qt_sql_5 "select v:A, v:AA, v, v from ${table_name} where cast(v:A as
bigint) > 123 order by k"
+
+ sql """insert into ${table_name} values (16, '{"a" : 123.0, "A" :
191191, "c": 123}');"""
+ sql """insert into ${table_name} values (18, '{"a" : "123", "c" :
123456}');"""
+ sql """insert into ${table_name} values (20, '{"a" : 1.10111, "A" :
1800, "c" : [12345]}');"""
+ // sql """insert into ${table_name} values (12, '{"a" : [123]}, "c":
"123456"');"""
+ sql """insert into ${table_name} values (22, '{"a" : 1.1111, "A" :
17211, "c" : 111111}');"""
+ sql "sync"
+ qt_sql_6 "select v:a, v:A from ${table_name} order by cast(v:A as
bigint), k"
+ qt_sql_7 "select k, v:A from ${table_name} where cast(v:A as bigint)
>= 1 order by cast(v:A as bigint), k"
+
+ // TODO: if not cast, then v:a could return "123" or 123 which is none
determinately
+ qt_sql_8 "select cast(v:a as string), v:A from ${table_name} where
cast(v:a as json) is null order by k"
+ // qt_sql_9 "select cast(v:a as string), v:A from ${table_name} where
cast(v:A as json) is null order by k"
+
+ // !!! Not found cast function String to Float64
+ // qt_sql_10 "select v:a, v:A from ${table_name} where cast(v:a as
double) > 0 order by k"
+ qt_sql_11 "select v:A from ${table_name} where cast(v:A as bigint) > 1
order by k"
+
+ // ----%%----
+ qt_sql_12 "select v:A, v from ${table_name} where cast(v:A as bigint)
> 1 order by k"
+ // ----%%----
+ qt_sql_13 "select v:a, v:A from simple_select_variant where 1=1 and
cast(v:a as json) is null and cast(v:A as bigint) >= 1 order by k;"
+ qt_sql_14 """select v:a, v:A, v from simple_select_variant where
cast(v:A as bigint) > 0 and cast(v:A as bigint) = 123456 limit 1;"""
+
+ // !!! Not found cast function String to Float64
+ // qt_sql_15 "select v:a, v:A from ${table_name} where 1=1 and
cast(v:a as double) > 0 and v:A is not null order by k"
+ // qt_sql_16 "select v:a, v:A, v:c from ${table_name} where 1=1 and
cast(v:a as double) > 0 and v:A is not null order by k"
+
+ // TODO: if not cast, then v:a could return "123" or 123 which is none
determinately
+ qt_sql_17 "select cast(v:a as json), v:A, v, v:AA from
simple_select_variant where cast(v:A as bigint) is null order by k;"
+
+ sql """insert into simple_select_variant values (12, '{"oamama":
1.1}')"""
+ qt_sql_18 "select v:a, v:A, v, v:oamama from simple_select_variant
where cast(v:oamama as double) is null order by k;"
+ qt_sql_19 """select v:a, v:A, v, v:oamama from simple_select_variant
where cast(v:oamama as double) is not null order by k"""
+ qt_sql_20 """select v:A from simple_select_variant where cast(v:A as
bigint) > 0 and cast(v:A as bigint) = 123456 limit 1;"""
+
+ // !!! Not found cast function String to Float64
+ // qt_sql_21 """select v:A, v:a, v from simple_select_variant where
cast(v:A as bigint) > 0 and cast(v:a as double) > 1 order by cast(v:A as
bigint);"""
+
+ sql "truncate table simple_select_variant"
+ sql """insert into simple_select_variant values (11, '{"x":
[123456]}');"""
+ sql """insert into simple_select_variant values (12, '{"x":
[123456789101112]}');"""
+ sql """insert into simple_select_variant values (12, '{"xxx" : 123,
"yyy" : 456}');"""
+ qt_sql_21_1 """select * from simple_select_variant where cast(v:x as
json) is null"""
+ qt_sql_21_2 """select cast(v:x as json) from simple_select_variant
where cast(v:x as json) is not null order by k;"""
+
+ // 4. multi variant in single table
+ table_name = "multi_variant"
+ sql "DROP TABLE IF EXISTS ${table_name}"
+ sql """
+ CREATE TABLE IF NOT EXISTS ${table_name} (
+ k bigint,
+ v1 variant,
+ v2 variant,
+ v3 variant
+
+ )
+ DUPLICATE KEY(`k`)
+ DISTRIBUTED BY RANDOM BUCKETS 5
+ properties("replication_num" = "1", "disable_auto_compaction"
= "false");
+ """
+ sql """insert into ${table_name} values (1, '{"A" : 123}', '{"B" :
123}', '{"C" : 456}');"""
+ sql """insert into ${table_name} values (2, '{"C" : "123"}', '{"D" :
[123]}', '{"E" : 789}');"""
+ sql """insert into ${table_name} values (3, '{"C" : "123"}', '{"C" :
[123]}', '{"E" : "789"}');"""
+ sql "sync"
+ verify table_name
+ qt_sql_22 "select v1:A from multi_variant order by k;"
+ qt_sql_23 "select v2:D from multi_variant order by k;"
+ qt_sql_24 "select v2:C from multi_variant order by k;"
+
+ // 5. multi tablets concurrent load
+ table_name = "t_json_parallel"
+ create_table table_name
+ sql """INSERT INTO t_json_parallel SELECT *, '{"k1":1, "k2": "some",
"k3" : [1234], "k4" : 1.10000, "k5" : [[123]]}' FROM numbers("number" =
"50000");"""
+ qt_sql_25 """ SELECT sum(cast(v:k1 as int)), sum(cast(v:k4 as
double)), sum(cast(json_extract(v:k5, "\$.[0].[0]") as int)) from
t_json_parallel; """
+ //50000 61700000 55000.00000000374 6150000
+ // 7. gh data
+ table_name = "ghdata"
+ create_table table_name
+ load_json_data.call(table_name, """${getS3Url() +
'/load/ghdata_sample.json'}""")
+ qt_sql_26 "select count() from ${table_name}"
+
+ // 8. json empty string
+ // table_name = "empty_string"
// create_table table_name
- // sql """insert into ${table_name} values (1, '[1]'),(1, '{"a" :
1}');"""
- // sql """insert into ${table_name} values (2, '[2]'),(1, '{"a" :
[[[1]]]}');"""
- // sql """insert into ${table_name} values (3, '3'),(1, '{"a" :
1}'), (1, '{"a" : [1]}');"""
- // sql """insert into ${table_name} values (4, '"4"'),(1, '{"a" :
"1223"}');"""
- // sql """insert into ${table_name} values (5, '5.0'),(1, '{"a" :
[1]}');"""
- // sql """insert into ${table_name} values (6, '"[6]"'),(1, '{"a" :
["1", 2, 1.1]}');"""
- // sql """insert into ${table_name} values (7, '7'),(1, '{"a" : 1,
"b" : {"c" : 1}}');"""
- // sql """insert into ${table_name} values (8, '8.11111'),(1, '{"a"
: 1, "b" : {"c" : [{"a" : 1}]}}');"""
- // sql """insert into ${table_name} values (9, '"9999"'),(1, '{"a" :
1, "b" : {"c" : [{"a" : 1}]}}');"""
- // sql """insert into ${table_name} values (10, '1000000'),(1, '{"a"
: 1, "b" : {"c" : [{"a" : 1}]}}');"""
- // sql """insert into ${table_name} values (11, '[123.0]'),(1, '{"a"
: 1, "b" : {"c" : 1}}'),(1, '{"a" : 1, "b" : 10}');"""
- // sql """insert into ${table_name} values (12, '[123.2]'),(1, '{"a"
: 1, "b" : 10}'),(1, '{"a" : 1, "b" : {"c" : 1}}');"""
- // qt_sql_1 "select k, v from simple_variant order by k, cast(v as
string)"
- // qt_sql_1_1 "select k, v, cast(v:b as string) from simple_variant
where length(cast(v:b as string)) > 4 order by k, cast(v as string)"
- // verify table_name
-
- // // 2. type confilct cases
- // table_name = "type_conflict_resolution"
- // create_table table_name
- // sql """insert into ${table_name} values (1, '{"c" : "123"}');"""
- // sql """insert into ${table_name} values (2, '{"c" : 123}');"""
- // sql """insert into ${table_name} values (3, '{"cc" : [123]}');"""
- // sql """insert into ${table_name} values (4, '{"cc" : [123.1]}');"""
- // sql """insert into ${table_name} values (5, '{"ccc" : 123}');"""
- // sql """insert into ${table_name} values (6, '{"ccc" : 123321}');"""
- // sql """insert into ${table_name} values (7, '{"cccc" : 123.0}');"""
- // sql """insert into ${table_name} values (8, '{"cccc" : 123.11}');"""
- // sql """insert into ${table_name} values (9, '{"ccccc" : [123]}');"""
- // sql """insert into ${table_name} values (10, '{"ccccc" :
[123456789]}');"""
- // sql """insert into ${table_name} values (11, '{"b" :
1111111111111111}');"""
- // sql """insert into ${table_name} values (12, '{"b" : 1.222222}');"""
- // sql """insert into ${table_name} values (13, '{"bb" : 1}');"""
- // sql """insert into ${table_name} values (14, '{"bb" :
214748364711}');"""
- // sql """insert into ${table_name} values (15, '{"A" : 1}');"""
- // qt_sql """select v from type_conflict_resolution order by k;"""
- // verify table_name
-
- // // 3. simple variant sub column select
- // table_name = "simple_select_variant"
- // create_table table_name
- // sql """insert into ${table_name} values (1, '{"A" : 123}');"""
- // sql """insert into ${table_name} values (2, '{"A" : 1}');"""
- // sql """insert into ${table_name} values (4, '{"A" : 123456}');"""
- // sql """insert into ${table_name} values (8, '{"A" :
123456789101112}');"""
- // qt_sql_2 "select v:A from ${table_name} order by cast(v:A as int)"
- // sql """insert into ${table_name} values (12, '{"AA" :
[123456]}');"""
- // sql """insert into ${table_name} values (14, '{"AA" :
[123456789101112]}');"""
- // // qt_sql_3 "select v:AA from ${table_name} where size(v:AA) > 0
order by k"
- // qt_sql_4 "select v:A, v:AA, v from ${table_name} order by k"
- // qt_sql_5 "select v:A, v:AA, v, v from ${table_name} where cast(v:A
as bigint) > 123 order by k"
-
- // sql """insert into ${table_name} values (16, '{"a" : 123.0, "A" :
191191, "c": 123}');"""
- // sql """insert into ${table_name} values (18, '{"a" : "123", "c" :
123456}');"""
- // sql """insert into ${table_name} values (20, '{"a" : 1.10111, "A"
: 1800, "c" : [12345]}');"""
- // // sql """insert into ${table_name} values (12, '{"a" : [123]},
"c": "123456"');"""
- // sql """insert into ${table_name} values (22, '{"a" : 1.1111, "A" :
17211, "c" : 111111}');"""
- // sql "sync"
- // qt_sql_6 "select v:a, v:A from ${table_name} order by cast(v:A as
bigint), k"
- // qt_sql_7 "select k, v:A from ${table_name} where cast(v:A as
bigint) >= 1 order by cast(v:A as bigint), k"
-
- // // TODO: if not cast, then v:a could return "123" or 123 which is
none determinately
- // qt_sql_8 "select cast(v:a as string), v:A from ${table_name} where
cast(v:a as json) is null order by k"
- // // qt_sql_9 "select cast(v:a as string), v:A from ${table_name}
where cast(v:A as json) is null order by k"
-
- // // !!! Not found cast function String to Float64
- // // qt_sql_10 "select v:a, v:A from ${table_name} where cast(v:a as
double) > 0 order by k"
- // qt_sql_11 "select v:A from ${table_name} where cast(v:A as bigint)
> 1 order by k"
-
- // // ----%%----
- // qt_sql_12 "select v:A, v from ${table_name} where cast(v:A as
bigint) > 1 order by k"
- // // ----%%----
- // qt_sql_13 "select v:a, v:A from simple_select_variant where 1=1 and
cast(v:a as json) is null and cast(v:A as bigint) >= 1 order by k;"
- // qt_sql_14 """select v:a, v:A, v from simple_select_variant where
cast(v:A as bigint) > 0 and cast(v:A as bigint) = 123456 limit 1;"""
-
- // // !!! Not found cast function String to Float64
- // // qt_sql_15 "select v:a, v:A from ${table_name} where 1=1 and
cast(v:a as double) > 0 and v:A is not null order by k"
- // // qt_sql_16 "select v:a, v:A, v:c from ${table_name} where 1=1 and
cast(v:a as double) > 0 and v:A is not null order by k"
-
- // // TODO: if not cast, then v:a could return "123" or 123 which is
none determinately
- // qt_sql_17 "select cast(v:a as json), v:A, v, v:AA from
simple_select_variant where cast(v:A as bigint) is null order by k;"
-
- // sql """insert into simple_select_variant values (12, '{"oamama":
1.1}')"""
- // qt_sql_18 "select v:a, v:A, v, v:oamama from simple_select_variant
where cast(v:oamama as double) is null order by k;"
- // qt_sql_19 """select v:a, v:A, v, v:oamama from
simple_select_variant where cast(v:oamama as double) is not null order by k"""
- // qt_sql_20 """select v:A from simple_select_variant where cast(v:A
as bigint) > 0 and cast(v:A as bigint) = 123456 limit 1;"""
-
- // // !!! Not found cast function String to Float64
- // // qt_sql_21 """select v:A, v:a, v from simple_select_variant where
cast(v:A as bigint) > 0 and cast(v:a as double) > 1 order by cast(v:A as
bigint);"""
-
- // sql "truncate table simple_select_variant"
- // sql """insert into simple_select_variant values (11, '{"x":
[123456]}');"""
- // sql """insert into simple_select_variant values (12, '{"x":
[123456789101112]}');"""
- // sql """insert into simple_select_variant values (12, '{"xxx" : 123,
"yyy" : 456}');"""
- // qt_sql_21_1 """select * from simple_select_variant where cast(v:x
as json) is null"""
- // qt_sql_21_2 """select cast(v:x as json) from
simple_select_variant where cast(v:x as json) is not null order by k;"""
-
- // // 4. multi variant in single table
- // table_name = "multi_variant"
- // sql "DROP TABLE IF EXISTS ${table_name}"
- // sql """
- // CREATE TABLE IF NOT EXISTS ${table_name} (
- // k bigint,
- // v1 variant,
- // v2 variant,
- // v3 variant
- //
- // )
- // DUPLICATE KEY(`k`)
- // DISTRIBUTED BY RANDOM BUCKETS 5
- // properties("replication_num" = "1",
"disable_auto_compaction" = "false");
- // """
- // sql """insert into ${table_name} values (1, '{"A" : 123}', '{"B" :
123}', '{"C" : 456}');"""
- // sql """insert into ${table_name} values (2, '{"C" : "123"}', '{"D"
: [123]}', '{"E" : 789}');"""
- // sql """insert into ${table_name} values (3, '{"C" : "123"}', '{"C"
: [123]}', '{"E" : "789"}');"""
- // sql "sync"
- // verify table_name
- // qt_sql_22 "select v1:A from multi_variant order by k;"
- // qt_sql_23 "select v2:D from multi_variant order by k;"
- // qt_sql_24 "select v2:C from multi_variant order by k;"
-
- // // 5. multi tablets concurrent load
- // table_name = "t_json_parallel"
- // create_table table_name
- // sql """INSERT INTO t_json_parallel SELECT *, '{"k1":1, "k2":
"some", "k3" : [1234], "k4" : 1.10000, "k5" : [[123]]}' FROM numbers("number" =
"50000");"""
- // qt_sql_25 """ SELECT sum(cast(v:k1 as int)), sum(cast(v:k4 as
double)), sum(cast(json_extract(v:k5, "\$.[0].[0]") as int)) from
t_json_parallel; """
- // //50000 61700000 55000.00000000374 6150000
- // // 7. gh data
- // table_name = "ghdata"
- // create_table table_name
- // load_json_data.call(table_name, """${getS3Url() +
'/load/ghdata_sample.json'}""")
- // qt_sql_26 "select count() from ${table_name}"
-
- // // 8. json empty string
- // // table_name = "empty_string"
- // // create_table table_name
- // // sql """INSERT INTO empty_string VALUES (1, ''), (2, '{"k1": 1,
"k2": "v1"}'), (3, '{}'), (4, '{"k1": 2}');"""
- // // sql """INSERT INTO empty_string VALUES (3, null), (4, '{"k1": 1,
"k2": "v1"}'), (3, '{}'), (4, '{"k1": 2}');"""
- // // qt_sql_27 "SELECT * FROM ${table_name} ORDER BY k;"
-
- // // // 9. btc data
- // // table_name = "btcdata"
- // // create_table table_name
- // // load_json_data.call(table_name, """${getS3Url() +
'/load/btc_transactions.json'}""")
- // // qt_sql_28 "select count() from ${table_name}"
-
- // // 10. alter add variant
- // table_name = "alter_variant"
- // create_table table_name
- // sql """INSERT INTO ${table_name} VALUES (1, ''), (1, '{"k1": 1,
"k2": "v1"}'), (1, '{}'), (1, '{"k1": 2}');"""
- // sql "alter table ${table_name} add column v2 variant default null"
- // sql """INSERT INTO ${table_name} VALUES (1, '{"kyyyy" : "123"}',
'{"kxkxkxkx" : [123]}'), (1, '{"kxxxx" : 123}', '{"xxxxyyyy": 123}');"""
- // qt_sql_29_1 """select * from alter_variant where length(cast(v2 as
string)) > 2 order by k, cast(v as string), cast(v2 as string);"""
- // verify table_name
-
- // // 11. boolean values
- // table_name = "boolean_values"
- // create_table table_name
- // sql """INSERT INTO ${table_name} VALUES (1, ''), (2, '{"k1": true,
"k2": false}'), (3, '{}'), (4, '{"k1": false}');"""
- // verify table_name
+ // sql """INSERT INTO empty_string VALUES (1, ''), (2, '{"k1": 1,
"k2": "v1"}'), (3, '{}'), (4, '{"k1": 2}');"""
+ // sql """INSERT INTO empty_string VALUES (3, null), (4, '{"k1": 1,
"k2": "v1"}'), (3, '{}'), (4, '{"k1": 2}');"""
+ // qt_sql_27 "SELECT * FROM ${table_name} ORDER BY k;"
- // // 12. jsonb values
- // table_name = "jsonb_values"
- // create_table table_name
- // sql """insert into ${table_name} values (1, '{"a" : ["123", 123,
[123]]}')"""
- // sql """insert into ${table_name} values (2, '{"a" : ["123"]}')"""
- // sql """insert into ${table_name} values (3, '{"a" : "123"}')"""
- // sql """insert into ${table_name} values (4, '{"a" : 123456}')"""
- // sql """insert into ${table_name} values (5, '{"a" : [123, "123",
1.11111]}')"""
- // sql """insert into ${table_name} values (6, '{"a" : [123, 1.11,
"123"]}')"""
- // sql """insert into ${table_name} values(7, '{"a" : [123, {"xx" :
1}], "b" : {"c" : 456, "d" : null, "e" : 7.111}}')"""
- // // TODO data bellow is invalid at present
- // // sql """insert into ${table_name} values (8, '{"a" : [123,
111........]}')"""
- // sql """insert into ${table_name} values (9, '{"a" : [123, {"a" :
1}]}')"""
- // sql """insert into ${table_name} values (10, '{"a" : [{"a" : 1},
123]}')"""
- // qt_sql_29 "select v:a from ${table_name} order by k"
- // // b? 7.111 [123,{"xx":1}] {"b":{"c":456,"e":7.111}} 456
- // qt_sql_30 "select v:b.e, v:a, v:b, v:b.c from jsonb_values where
cast(v:b.e as double) > 1;"
-
- // // 13. sparse columns
- // table_name = "sparse_columns"
+ // // 9. btc data
+ // table_name = "btcdata"
// create_table table_name
- // sql """insert into sparse_columns select 0, '{"a": 11245, "b" :
[123, {"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}}' as json_str
- // union all select 0, '{"a": 1123}' as json_str union all select
0, '{"a" : 1234, "xxxx" : "kaana"}' as json_str from numbers("number" = "4096")
limit 4096 ;"""
- // qt_sql_30 """ select v from sparse_columns where v is not null and
json_extract(v, "\$") != "{}" order by cast(v as string) limit 10"""
- // sql "truncate table sparse_columns"
- // sql """insert into sparse_columns select 0, '{"a": 1123, "b" :
[123, {"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}, "zzz" : null,
"oooo" : {"akakaka" : null, "xxxx" : {"xxx" : 123}}}' as json_str
- // union all select 0, '{"a" : 1234, "xxxx" : "kaana", "ddd" :
{"aaa" : 123, "mxmxm" : [456, "789"]}}' as json_str from numbers("number" =
"4096") limit 4096 ;"""
- // qt_sql_31 """ select v from sparse_columns where v is not null and
json_extract(v, "\$") != "{}" order by cast(v as string) limit 10"""
- // sql "truncate table sparse_columns"
-
- // // 12. streamload remote file
- // table_name = "logdata"
- // create_table.call(table_name, "4")
- // sql "set enable_two_phase_read_opt = false;"
- // // no sparse columns
- // set_be_config.call("ratio_of_defaults_as_sparse_column", "1")
- // load_json_data.call(table_name, """${getS3Url() +
'/load/logdata.json'}""")
- // qt_sql_32 """ select v->"\$.json.parseFailed" from logdata where
v->"\$.json.parseFailed" != 'null' order by k limit 10;"""
- // qt_sql_32_1 """select v:json.parseFailed from logdata where
cast(v:json.parseFailed as string) is not null and k = 162;"""
- // sql "truncate table ${table_name}"
-
- // // 0.95 default ratio
- // set_be_config.call("ratio_of_defaults_as_sparse_column", "0.95")
- // load_json_data.call(table_name, """${getS3Url() +
'/load/logdata.json'}""")
- // qt_sql_33 """ select v->"\$.json.parseFailed" from logdata where
v->"\$.json.parseFailed" != 'null' order by k limit 10;"""
- // qt_sql_33_1 """select v:json.parseFailed from logdata where
cast(v:json.parseFailed as string) is not null and k = 162;"""
- // sql "truncate table ${table_name}"
-
- // // always sparse column
- // set_be_config.call("ratio_of_defaults_as_sparse_column", "0")
- // load_json_data.call(table_name, """${getS3Url() +
'/load/logdata.json'}""")
- // qt_sql_34 """ select v->"\$.json.parseFailed" from logdata where
v->"\$.json.parseFailed" != 'null' order by k limit 10;"""
- // sql "truncate table ${table_name}"
- // qt_sql_35 """select v->"\$.json.parseFailed" from logdata where k
= 162 and v->"\$.json.parseFailed" != 'null';"""
- // qt_sql_35_1 """select v:json.parseFailed from logdata where
cast(v:json.parseFailed as string) is not null and k = 162;"""
+ // load_json_data.call(table_name, """${getS3Url() +
'/load/btc_transactions.json'}""")
+ // qt_sql_28 "select count() from ${table_name}"
+
+ // 10. alter add variant
+ table_name = "alter_variant"
+ create_table table_name
+ sql """INSERT INTO ${table_name} VALUES (1, ''), (1, '{"k1": 1, "k2":
"v1"}'), (1, '{}'), (1, '{"k1": 2}');"""
+ sql "alter table ${table_name} add column v2 variant default null"
+ sql """INSERT INTO ${table_name} VALUES (1, '{"kyyyy" : "123"}',
'{"kxkxkxkx" : [123]}'), (1, '{"kxxxx" : 123}', '{"xxxxyyyy": 123}');"""
+ qt_sql_29_1 """select * from alter_variant where length(cast(v2 as
string)) > 2 order by k, cast(v as string), cast(v2 as string);"""
+ verify table_name
+
+ // 11. boolean values
+ table_name = "boolean_values"
+ create_table table_name
+ sql """INSERT INTO ${table_name} VALUES (1, ''), (2, '{"k1": true,
"k2": false}'), (3, '{}'), (4, '{"k1": false}');"""
+ verify table_name
+
+ // 12. jsonb values
+ table_name = "jsonb_values"
+ create_table table_name
+ sql """insert into ${table_name} values (1, '{"a" : ["123", 123,
[123]]}')"""
+ sql """insert into ${table_name} values (2, '{"a" : ["123"]}')"""
+ sql """insert into ${table_name} values (3, '{"a" : "123"}')"""
+ sql """insert into ${table_name} values (4, '{"a" : 123456}')"""
+ sql """insert into ${table_name} values (5, '{"a" : [123, "123",
1.11111]}')"""
+ sql """insert into ${table_name} values (6, '{"a" : [123, 1.11,
"123"]}')"""
+ sql """insert into ${table_name} values(7, '{"a" : [123, {"xx" : 1}],
"b" : {"c" : 456, "d" : null, "e" : 7.111}}')"""
+ // TODO data bellow is invalid at present
+ // sql """insert into ${table_name} values (8, '{"a" : [123,
111........]}')"""
+ sql """insert into ${table_name} values (9, '{"a" : [123, {"a" :
1}]}')"""
+ sql """insert into ${table_name} values (10, '{"a" : [{"a" : 1},
123]}')"""
+ qt_sql_29 "select v:a from ${table_name} order by k"
+ // b? 7.111 [123,{"xx":1}] {"b":{"c":456,"e":7.111}} 456
+ qt_sql_30 "select v:b.e, v:a, v:b, v:b.c from jsonb_values where
cast(v:b.e as double) > 1;"
+
+ // 13. sparse columns
+ table_name = "sparse_columns"
+ create_table table_name
+ sql """insert into sparse_columns select 0, '{"a": 11245, "b" : [123,
{"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}}' as json_str
+ union all select 0, '{"a": 1123}' as json_str union all select 0,
'{"a" : 1234, "xxxx" : "kaana"}' as json_str from numbers("number" = "4096")
limit 4096 ;"""
+ qt_sql_30 """ select v from sparse_columns where v is not null and
json_extract(v, "\$") != "{}" order by cast(v as string) limit 10"""
+ sql "truncate table sparse_columns"
+ sql """insert into sparse_columns select 0, '{"a": 1123, "b" : [123,
{"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}, "zzz" : null, "oooo" :
{"akakaka" : null, "xxxx" : {"xxx" : 123}}}' as json_str
+ union all select 0, '{"a" : 1234, "xxxx" : "kaana", "ddd" :
{"aaa" : 123, "mxmxm" : [456, "789"]}}' as json_str from numbers("number" =
"4096") limit 4096 ;"""
+ qt_sql_31 """ select v from sparse_columns where v is not null and
json_extract(v, "\$") != "{}" order by cast(v as string) limit 10"""
+ sql "truncate table sparse_columns"
+
+ // 12. streamload remote file
+ table_name = "logdata"
+ create_table.call(table_name, "4")
+ sql "set enable_two_phase_read_opt = false;"
+ // no sparse columns
+ set_be_config.call("ratio_of_defaults_as_sparse_column", "1")
+ load_json_data.call(table_name, """${getS3Url() +
'/load/logdata.json'}""")
+ qt_sql_32 """ select v->"\$.json.parseFailed" from logdata where
v->"\$.json.parseFailed" != 'null' order by k limit 10;"""
+ qt_sql_32_1 """select v:json.parseFailed from logdata where
cast(v:json.parseFailed as string) is not null and k = 162;"""
+ sql "truncate table ${table_name}"
+
+ // 0.95 default ratio
+ set_be_config.call("ratio_of_defaults_as_sparse_column", "0.95")
+ load_json_data.call(table_name, """${getS3Url() +
'/load/logdata.json'}""")
+ qt_sql_33 """ select v->"\$.json.parseFailed" from logdata where
v->"\$.json.parseFailed" != 'null' order by k limit 10;"""
+ qt_sql_33_1 """select v:json.parseFailed from logdata where
cast(v:json.parseFailed as string) is not null and k = 162;"""
+ sql "truncate table ${table_name}"
+
+ // always sparse column
+ set_be_config.call("ratio_of_defaults_as_sparse_column", "0")
+ load_json_data.call(table_name, """${getS3Url() +
'/load/logdata.json'}""")
+ qt_sql_34 """ select v->"\$.json.parseFailed" from logdata where
v->"\$.json.parseFailed" != 'null' order by k limit 10;"""
+ sql "truncate table ${table_name}"
+ qt_sql_35 """select v->"\$.json.parseFailed" from logdata where k =
162 and v->"\$.json.parseFailed" != 'null';"""
+ qt_sql_35_1 """select v:json.parseFailed from logdata where
cast(v:json.parseFailed as string) is not null and k = 162;"""
// load gharchive
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]