morningman commented on code in PR #13959:
URL: https://github.com/apache/doris/pull/13959#discussion_r1013701332
##########
fe/fe-core/src/main/java/org/apache/doris/planner/DataGenScanNode.java:
##########
@@ -40,15 +40,15 @@
/**
* This scan node is used for table valued function.
Review Comment:
Change the comment
##########
be/src/service/internal_service.cpp:
##########
@@ -407,6 +410,78 @@ void
PInternalServiceImpl::fetch_data(google::protobuf::RpcController* cntl_base
_exec_env->result_mgr()->fetch_data(request->finst_id(), ctx);
}
+void PInternalServiceImpl::fetch_table_schema(google::protobuf::RpcController*
controller,
+ const PFetchTableSchemaRequest*
request,
+ PFetchTableSchemaResult* result,
+ google::protobuf::Closure* done)
{
+ VLOG_RPC << "fetch table schema";
+ brpc::ClosureGuard closure_guard(done);
+ TFileScanRange file_scan_range;
+ Status st = Status::OK();
+ {
+ const uint8_t* buf = (const
uint8_t*)(request->file_scan_range().data());
+ uint32_t len = request->file_scan_range().size();
+ st = deserialize_thrift_msg(buf, &len, false, &file_scan_range);
+ if (!st.ok()) {
+ LOG(WARNING) << "fetch table schema failed, errmsg=" <<
st.get_error_msg();
+ st.to_protobuf(result->mutable_status());
+ return;
+ }
+ }
+
+ if (file_scan_range.__isset.ranges == false) {
+ st = Status::InternalError("can not get TFileRangeDesc.");
+ st.to_protobuf(result->mutable_status());
+ return;
+ }
+ if (file_scan_range.__isset.params == false) {
+ st = Status::InternalError("can not get TFileScanRangeParams.");
+ st.to_protobuf(result->mutable_status());
+ return;
+ }
+ const TFileRangeDesc& range = file_scan_range.ranges.at(0);
+ const TFileScanRangeParams& params = file_scan_range.params;
+ // file_slots is no use
+ std::vector<SlotDescriptor*> file_slots;
+ std::unique_ptr<vectorized::GenericReader> reader(nullptr);
+ std::unique_ptr<RuntimeProfile> profile(new
RuntimeProfile("FetchTableSchema"));
+ switch (params.format_type) {
+ case TFileFormatType::FORMAT_CSV_PLAIN:
+ case TFileFormatType::FORMAT_CSV_GZ:
+ case TFileFormatType::FORMAT_CSV_BZ2:
+ case TFileFormatType::FORMAT_CSV_LZ4FRAME:
+ case TFileFormatType::FORMAT_CSV_LZOP:
+ case TFileFormatType::FORMAT_CSV_DEFLATE: {
+ reader.reset(new vectorized::CsvReader(profile.get(), params, range,
file_slots));
+ break;
+ }
+ default:
+ st = Status::InternalError("Not supported file format in fetch table
schema: {}",
+ params.format_type);
+ st.to_protobuf(result->mutable_status());
+ return;
+ }
+ std::unordered_map<std::string, TypeDescriptor> name_to_col_type;
+ std::vector<std::string> col_names;
+ std::vector<TypeDescriptor> col_types;
+ st = reader->get_parsered_schema(&col_names, &col_types);
+ if (!st.ok()) {
+ LOG(WARNING) << "fetch table schema failed, errmsg=" <<
st.get_error_msg();
+ st.to_protobuf(result->mutable_status());
+ return;
+ }
+ result->set_column_nums(col_names.size());
+ for (size_t idx = 0; idx < col_names.size(); ++idx) {
+ result->add_column_names(col_names[idx]);
+ }
+ for (size_t idx = 0; idx < col_types.size(); ++idx) {
+ PTypeDesc* type_desc = result->add_column_types();
+ col_types[idx].to_protobuf(type_desc);
+ }
+ LOG(INFO) << "complete parse, status: " << st;
Review Comment:
Remove this log; it is meaningless.
##########
be/src/service/internal_service.cpp:
##########
@@ -407,6 +410,78 @@ void
PInternalServiceImpl::fetch_data(google::protobuf::RpcController* cntl_base
_exec_env->result_mgr()->fetch_data(request->finst_id(), ctx);
}
+void PInternalServiceImpl::fetch_table_schema(google::protobuf::RpcController*
controller,
+ const PFetchTableSchemaRequest*
request,
+ PFetchTableSchemaResult* result,
+ google::protobuf::Closure* done)
{
+ VLOG_RPC << "fetch table schema";
+ brpc::ClosureGuard closure_guard(done);
+ TFileScanRange file_scan_range;
+ Status st = Status::OK();
+ {
+ const uint8_t* buf = (const
uint8_t*)(request->file_scan_range().data());
+ uint32_t len = request->file_scan_range().size();
+ st = deserialize_thrift_msg(buf, &len, false, &file_scan_range);
+ if (!st.ok()) {
+ LOG(WARNING) << "fetch table schema failed, errmsg=" <<
st.get_error_msg();
+ st.to_protobuf(result->mutable_status());
+ return;
+ }
+ }
+
+ if (file_scan_range.__isset.ranges == false) {
+ st = Status::InternalError("can not get TFileRangeDesc.");
+ st.to_protobuf(result->mutable_status());
+ return;
+ }
+ if (file_scan_range.__isset.params == false) {
+ st = Status::InternalError("can not get TFileScanRangeParams.");
+ st.to_protobuf(result->mutable_status());
+ return;
+ }
+ const TFileRangeDesc& range = file_scan_range.ranges.at(0);
+ const TFileScanRangeParams& params = file_scan_range.params;
+ // file_slots is no use
+ std::vector<SlotDescriptor*> file_slots;
+ std::unique_ptr<vectorized::GenericReader> reader(nullptr);
+ std::unique_ptr<RuntimeProfile> profile(new
RuntimeProfile("FetchTableSchema"));
+ switch (params.format_type) {
+ case TFileFormatType::FORMAT_CSV_PLAIN:
+ case TFileFormatType::FORMAT_CSV_GZ:
+ case TFileFormatType::FORMAT_CSV_BZ2:
+ case TFileFormatType::FORMAT_CSV_LZ4FRAME:
+ case TFileFormatType::FORMAT_CSV_LZOP:
+ case TFileFormatType::FORMAT_CSV_DEFLATE: {
Review Comment:
Missing Parquet and ORC?
##########
fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java:
##########
@@ -202,6 +187,39 @@ public void init(Analyzer analyzer) throws UserException {
initParamCreateContexts(analyzer);
}
+ private void initHMSExternalTable(HMSExternalTable hmsTable) throws
UserException {
+ Preconditions.checkNotNull(hmsTable);
+
+ if (hmsTable.isView()) {
+ throw new AnalysisException(
+ String.format("Querying external view '[%s].%s.%s' is not
supported", hmsTable.getDlaType(),
+ hmsTable.getDbName(), hmsTable.getName()));
+ }
+
+ FileScanProviderIf scanProvider;
+ switch (hmsTable.getDlaType()) {
+ case HUDI:
+ scanProvider = new HudiScanProvider(hmsTable, desc);
+ break;
+ case ICEBERG:
+ scanProvider = new IcebergScanProvider(hmsTable, desc);
+ break;
+ case HIVE:
+ scanProvider = new HiveScanProvider(hmsTable, desc);
+ break;
+ default:
+ throw new UserException("Unknown table type: " +
hmsTable.getDlaType());
+ }
+ this.scanProviders.add(scanProvider);
+ }
+
+ private void initFunctionGenTable(FunctionGenTable table,
ExternalFileTableValuedFunction tvf) {
+ Preconditions.checkNotNull(table);
+ FileScanProviderIf scanProvider;
Review Comment:
```suggestion
FileScanProviderIf scanProvider = new TVFScanProvider(table, desc,
tvf);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]