This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 65b382c17d4 [Fix](Outfile) Fixed the problem that the concurrent
Outfile wrote multiple Success files (#33016)
65b382c17d4 is described below
commit 65b382c17d4077fdeb76901c6d2fcb006176d7a3
Author: Tiewei Fang <[email protected]>
AuthorDate: Tue Apr 9 14:51:44 2024 +0800
[Fix](Outfile) Fixed the problem that the concurrent Outfile wrote multiple
Success files (#33016)
**Problem:**
When concurrent `Outfile` is enabled and `success_file_name` is specified, a
SUCCESS file is written by each BE instance, which is not what we expect.
**Solution:**
We added a new RPC: when the Outfile has completed, the FE sends an RPC
to a BE, asking it to write a single Success file.
---
be/src/service/internal_service.cpp | 80 ++++++++++++++++++++++
be/src/service/internal_service.h | 5 ++
be/src/vec/sink/writer/vfile_result_writer.cpp | 35 ----------
be/src/vec/sink/writer/vfile_result_writer.h | 2 -
.../org/apache/doris/analysis/OutFileClause.java | 8 +++
.../doris/nereids/glue/LogicalPlanAdapter.java | 17 +++++
.../main/java/org/apache/doris/qe/Coordinator.java | 8 +++
.../java/org/apache/doris/qe/StmtExecutor.java | 78 +++++++++++++++++++++
.../org/apache/doris/rpc/BackendServiceClient.java | 5 ++
.../org/apache/doris/rpc/BackendServiceProxy.java | 12 ++++
gensrc/proto/internal_service.proto | 12 ++++
.../suites/export_p0/test_outfile.groovy | 5 +-
.../suites/nereids_p0/outfile/test_outfile.groovy | 6 +-
13 files changed, 230 insertions(+), 43 deletions(-)
diff --git a/be/src/service/internal_service.cpp
b/be/src/service/internal_service.cpp
index ab8b2f9cb66..adcd81689c1 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -26,6 +26,7 @@
#include <butil/iobuf.h>
#include <fcntl.h>
#include <fmt/core.h>
+#include <gen_cpp/DataSinks_types.h>
#include <gen_cpp/MasterService_types.h>
#include <gen_cpp/PaloInternalService_types.h>
#include <gen_cpp/PlanNodes_types.h>
@@ -615,6 +616,85 @@ void
PInternalService::fetch_data(google::protobuf::RpcController* controller,
}
}
+void PInternalService::outfile_write_success(google::protobuf::RpcController*
controller,
+ const
POutfileWriteSuccessRequest* request,
+ POutfileWriteSuccessResult*
result,
+ google::protobuf::Closure* done) {
+ bool ret = _heavy_work_pool.try_offer([request, result, done]() {
+ VLOG_RPC << "outfile write success file";
+ brpc::ClosureGuard closure_guard(done);
+ TResultFileSink result_file_sink;
+ Status st = Status::OK();
+ {
+ const uint8_t* buf = (const
uint8_t*)(request->result_file_sink().data());
+ uint32_t len = request->result_file_sink().size();
+ st = deserialize_thrift_msg(buf, &len, false, &result_file_sink);
+ if (!st.ok()) {
+ LOG(WARNING) << "outfile write success filefailed, errmsg=" <<
st;
+ st.to_protobuf(result->mutable_status());
+ return;
+ }
+ }
+
+ TResultFileSinkOptions file_options = result_file_sink.file_options;
+ std::stringstream ss;
+ ss << file_options.file_path << file_options.success_file_name;
+ std::string file_name = ss.str();
+ if (result_file_sink.storage_backend_type ==
TStorageBackendType::LOCAL) {
+ // For local file writer, the file_path is a local dir.
+ // Here we do a simple security verification by checking whether
the file exists.
+ // Because the file path is currently arbitrarily specified by the
user,
+ // Doris is not responsible for ensuring the correctness of the
path.
+ // This is just to prevent overwriting the existing file.
+ bool exists = true;
+ st = io::global_local_filesystem()->exists(file_name, &exists);
+ if (!st.ok()) {
+ LOG(WARNING) << "outfile write success filefailed, errmsg=" <<
st;
+ st.to_protobuf(result->mutable_status());
+ return;
+ }
+ if (exists) {
+ st = Status::InternalError("File already exists: {}",
file_name);
+ }
+ if (!st.ok()) {
+ LOG(WARNING) << "outfile write success filefailed, errmsg=" <<
st;
+ st.to_protobuf(result->mutable_status());
+ return;
+ }
+ }
+
+ auto&& res = FileFactory::create_file_writer(
+
FileFactory::convert_storage_type(result_file_sink.storage_backend_type),
+ ExecEnv::GetInstance(), file_options.broker_addresses,
+ file_options.broker_properties, file_name);
+ using T = std::decay_t<decltype(res)>;
+ if (!res.has_value()) [[unlikely]] {
+ st = std::forward<T>(res).error();
+ st.to_protobuf(result->mutable_status());
+ return;
+ }
+
+ std::unique_ptr<doris::io::FileWriter> _file_writer_impl =
std::forward<T>(res).value();
+ // must write somthing because s3 file writer can not writer empty file
+ st = _file_writer_impl->append({"success"});
+ if (!st.ok()) {
+ LOG(WARNING) << "outfile write success filefailed, errmsg=" << st;
+ st.to_protobuf(result->mutable_status());
+ return;
+ }
+ st = _file_writer_impl->close();
+ if (!st.ok()) {
+ LOG(WARNING) << "outfile write success filefailed, errmsg=" << st;
+ st.to_protobuf(result->mutable_status());
+ return;
+ }
+ });
+ if (!ret) {
+ offer_failed(result, done, _heavy_work_pool);
+ return;
+ }
+}
+
void PInternalService::fetch_table_schema(google::protobuf::RpcController*
controller,
const PFetchTableSchemaRequest*
request,
PFetchTableSchemaResult* result,
diff --git a/be/src/service/internal_service.h
b/be/src/service/internal_service.h
index bdb91a0bdf7..fdf3a183d96 100644
--- a/be/src/service/internal_service.h
+++ b/be/src/service/internal_service.h
@@ -96,6 +96,11 @@ public:
void fetch_data(google::protobuf::RpcController* controller, const
PFetchDataRequest* request,
PFetchDataResult* result, google::protobuf::Closure* done)
override;
+ void outfile_write_success(google::protobuf::RpcController* controller,
+ const POutfileWriteSuccessRequest* request,
+ POutfileWriteSuccessResult* result,
+ google::protobuf::Closure* done) override;
+
void fetch_table_schema(google::protobuf::RpcController* controller,
const PFetchTableSchemaRequest* request,
PFetchTableSchemaResult* result,
diff --git a/be/src/vec/sink/writer/vfile_result_writer.cpp
b/be/src/vec/sink/writer/vfile_result_writer.cpp
index f61f65b998f..811658afa4d 100644
--- a/be/src/vec/sink/writer/vfile_result_writer.cpp
+++ b/be/src/vec/sink/writer/vfile_result_writer.cpp
@@ -99,37 +99,6 @@ void VFileResultWriter::_init_profile(RuntimeProfile*
parent_profile) {
_written_data_bytes = ADD_COUNTER(profile, "WrittenDataBytes",
TUnit::BYTES);
}
-Status VFileResultWriter::_create_success_file() {
- std::string file_name;
- RETURN_IF_ERROR(_get_success_file_name(&file_name));
- _file_writer_impl = DORIS_TRY(FileFactory::create_file_writer(
- FileFactory::convert_storage_type(_storage_type),
_state->exec_env(),
- _file_opts->broker_addresses, _file_opts->broker_properties,
file_name));
- // must write somthing because s3 file writer can not writer empty file
- RETURN_IF_ERROR(_file_writer_impl->append({"success"}));
- return _file_writer_impl->close();
-}
-
-Status VFileResultWriter::_get_success_file_name(std::string* file_name) {
- std::stringstream ss;
- ss << _file_opts->file_path << _file_opts->success_file_name;
- *file_name = ss.str();
- if (_storage_type == TStorageBackendType::LOCAL) {
- // For local file writer, the file_path is a local dir.
- // Here we do a simple security verification by checking whether the
file exists.
- // Because the file path is currently arbitrarily specified by the
user,
- // Doris is not responsible for ensuring the correctness of the path.
- // This is just to prevent overwriting the existing file.
- bool exists = true;
- RETURN_IF_ERROR(io::global_local_filesystem()->exists(*file_name,
&exists));
- if (exists) {
- return Status::InternalError("File already exists: {}",
*file_name);
- }
- }
-
- return Status::OK();
-}
-
Status VFileResultWriter::_create_next_file_writer() {
std::string file_name;
RETURN_IF_ERROR(_get_next_file_name(&file_name));
@@ -275,10 +244,6 @@ Status VFileResultWriter::_close_file_writer(bool done) {
RETURN_IF_ERROR(_create_next_file_writer());
} else {
// All data is written to file, send statistic result
- if (_file_opts->success_file_name != "") {
- // write success file, just need to touch an empty file
- RETURN_IF_ERROR(_create_success_file());
- }
if (_output_block == nullptr) {
RETURN_IF_ERROR(_send_result());
} else {
diff --git a/be/src/vec/sink/writer/vfile_result_writer.h
b/be/src/vec/sink/writer/vfile_result_writer.h
index 29fd6d89cd3..72ba90cd015 100644
--- a/be/src/vec/sink/writer/vfile_result_writer.h
+++ b/be/src/vec/sink/writer/vfile_result_writer.h
@@ -79,10 +79,8 @@ private:
Status _create_file_writer(const std::string& file_name);
Status _create_next_file_writer();
- Status _create_success_file();
// get next export file name
Status _get_next_file_name(std::string* file_name);
- Status _get_success_file_name(std::string* file_name);
void _get_file_url(std::string* file_url);
std::string _file_format_to_name();
// close file writer, and if !done, it will create new writer for next
file.
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
index 8c975d4ea0d..0314b225e67 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
@@ -778,6 +778,14 @@ public class OutFileClause {
return fileFormatType == TFileFormatType.FORMAT_ORC;
}
+ public String getFilePath() {
+ return filePath;
+ }
+
+ public String getSuccessFileName() {
+ return successFileName;
+ }
+
@Override
public OutFileClause clone() {
return new OutFileClause(this);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/LogicalPlanAdapter.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/LogicalPlanAdapter.java
index 72dd6c11e9b..8fdcbb1198d 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/LogicalPlanAdapter.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/LogicalPlanAdapter.java
@@ -24,11 +24,14 @@ import org.apache.doris.analysis.Queriable;
import org.apache.doris.analysis.RedirectStatus;
import org.apache.doris.analysis.StatementBase;
import org.apache.doris.nereids.StatementContext;
+import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.commands.ExplainCommand;
import org.apache.doris.nereids.trees.plans.logical.LogicalFileSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
+import com.google.common.collect.Lists;
+
import java.util.ArrayList;
import java.util.List;
@@ -66,6 +69,20 @@ public class LogicalPlanAdapter extends StatementBase
implements Queriable {
@Override
public OutFileClause getOutFileClause() {
+ if (logicalPlan instanceof LogicalFileSink) {
+ LogicalFileSink fileSink = (LogicalFileSink) logicalPlan;
+ OutFileClause outFile = new OutFileClause(
+ fileSink.getFilePath(),
+ fileSink.getFormat(),
+ fileSink.getProperties()
+ );
+ try {
+ outFile.analyze(null, Lists.newArrayList(),
Lists.newArrayList());
+ } catch (Exception e) {
+ throw new AnalysisException(e.getMessage(), e.getCause());
+ }
+ return outFile;
+ }
return null;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 40400844675..82b7cef3607 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -4086,6 +4086,14 @@ public class Coordinator implements CoordInterface {
return result;
}
+ public Map<PlanFragmentId, FragmentExecParams> getFragmentExecParamsMap() {
+ return fragmentExecParamsMap;
+ }
+
+ public List<PlanFragment> getFragments() {
+ return fragments;
+ }
+
// Runtime filter target fragment instance param
static class FRuntimeFilterTargetParam {
public TUniqueId targetFragmentInstanceId;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index ac5e3221355..5bbe01ef793 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -65,6 +65,8 @@ import org.apache.doris.analysis.SqlParser;
import org.apache.doris.analysis.SqlScanner;
import org.apache.doris.analysis.StatementBase;
import org.apache.doris.analysis.StmtRewriter;
+import org.apache.doris.analysis.StorageBackend;
+import org.apache.doris.analysis.StorageBackend.StorageType;
import org.apache.doris.analysis.StringLiteral;
import org.apache.doris.analysis.SwitchStmt;
import org.apache.doris.analysis.TableName;
@@ -82,6 +84,7 @@ import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.EnvFactory;
+import org.apache.doris.catalog.FsBroker;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.ScalarType;
@@ -112,6 +115,7 @@ import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.common.util.DebugPointUtil.DebugPoint;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.MetaLockUtils;
+import org.apache.doris.common.util.NetUtils;
import org.apache.doris.common.util.ProfileManager.ProfileType;
import org.apache.doris.common.util.SqlParserUtils;
import org.apache.doris.common.util.TimeUtils;
@@ -148,19 +152,26 @@ import
org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableComma
import
org.apache.doris.nereids.trees.plans.commands.insert.InsertOverwriteTableCommand;
import org.apache.doris.nereids.trees.plans.commands.insert.OlapInsertExecutor;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
+import org.apache.doris.planner.DataSink;
import org.apache.doris.planner.GroupCommitPlanner;
import org.apache.doris.planner.GroupCommitScanNode;
import org.apache.doris.planner.OlapScanNode;
import org.apache.doris.planner.OriginalPlanner;
+import org.apache.doris.planner.PlanFragment;
+import org.apache.doris.planner.PlanFragmentId;
import org.apache.doris.planner.PlanNode;
import org.apache.doris.planner.Planner;
+import org.apache.doris.planner.ResultFileSink;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.proto.Data;
import org.apache.doris.proto.InternalService;
import org.apache.doris.proto.InternalService.PGroupCommitInsertResponse;
+import org.apache.doris.proto.InternalService.POutfileWriteSuccessRequest;
+import org.apache.doris.proto.InternalService.POutfileWriteSuccessResult;
import org.apache.doris.proto.Types;
import org.apache.doris.qe.CommonResultSet.CommonResultSetMetaData;
import org.apache.doris.qe.ConnectContext.ConnectType;
+import org.apache.doris.qe.Coordinator.FragmentExecParams;
import org.apache.doris.qe.QeProcessorImpl.QueryInfo;
import org.apache.doris.qe.QueryState.MysqlStateType;
import org.apache.doris.qe.cache.Cache;
@@ -168,6 +179,7 @@ import org.apache.doris.qe.cache.CacheAnalyzer;
import org.apache.doris.qe.cache.CacheAnalyzer.CacheMode;
import org.apache.doris.rewrite.ExprRewriter;
import org.apache.doris.rewrite.mvrewrite.MVSelectFailedException;
+import org.apache.doris.rpc.BackendServiceProxy;
import org.apache.doris.rpc.RpcException;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.statistics.ResultRow;
@@ -185,6 +197,8 @@ import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TQueryOptions;
import org.apache.doris.thrift.TQueryType;
import org.apache.doris.thrift.TResultBatch;
+import org.apache.doris.thrift.TResultFileSink;
+import org.apache.doris.thrift.TResultFileSinkOptions;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TStreamLoadPutRequest;
import org.apache.doris.thrift.TSyncLoadForTabletsRequest;
@@ -207,6 +221,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.TException;
+import org.apache.thrift.TSerializer;
import java.io.IOException;
import java.io.StringReader;
@@ -220,6 +235,7 @@ import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Matcher;
@@ -1800,6 +1816,9 @@ public class StmtExecutor {
if (!isOutfileQuery) {
sendFields(queryStmt.getColLabels(),
exprToType(queryStmt.getResultExprs()));
} else {
+ if
(!Strings.isNullOrEmpty(queryStmt.getOutFileClause().getSuccessFileName())) {
+
outfileWriteSuccess(queryStmt.getOutFileClause());
+ }
sendFields(OutFileClause.RESULT_COL_NAMES,
OutFileClause.RESULT_COL_TYPES);
}
isSendFields = true;
@@ -1857,6 +1876,65 @@ public class StmtExecutor {
}
}
+ private void outfileWriteSuccess(OutFileClause outFileClause) throws
Exception {
+ // 1. set TResultFileSinkOptions
+ TResultFileSinkOptions sinkOptions = outFileClause.toSinkOptions();
+
+ // 2. set brokerNetAddress
+ List<PlanFragment> fragments = coord.getFragments();
+ Map<PlanFragmentId, FragmentExecParams> fragmentExecParamsMap =
coord.getFragmentExecParamsMap();
+ PlanFragmentId topId = fragments.get(0).getFragmentId();
+ FragmentExecParams topParams = fragmentExecParamsMap.get(topId);
+ DataSink topDataSink = topParams.fragment.getSink();
+ TNetworkAddress execBeAddr = topParams.instanceExecParams.get(0).host;
+ if (topDataSink instanceof ResultFileSink
+ && ((ResultFileSink) topDataSink).getStorageType() ==
StorageBackend.StorageType.BROKER) {
+ // set the broker address for OUTFILE sink
+ ResultFileSink topResultFileSink = (ResultFileSink) topDataSink;
+ FsBroker broker = Env.getCurrentEnv().getBrokerMgr()
+ .getBroker(topResultFileSink.getBrokerName(),
execBeAddr.getHostname());
+ sinkOptions.setBrokerAddresses(Lists.newArrayList(new
TNetworkAddress(broker.host, broker.port)));
+ }
+
+ // 3. set TResultFileSink properties
+ TResultFileSink sink = new TResultFileSink();
+ sink.setFileOptions(sinkOptions);
+ StorageType storageType = outFileClause.getBrokerDesc() == null
+ ? StorageBackend.StorageType.LOCAL :
outFileClause.getBrokerDesc().getStorageType();
+ sink.setStorageBackendType(storageType.toThrift());
+
+ // 4. get BE
+ TNetworkAddress address = null;
+ for (Backend be :
Env.getCurrentSystemInfo().getIdToBackend().values()) {
+ if (be.isAlive()) {
+ address = new TNetworkAddress(be.getHost(), be.getBrpcPort());
+ break;
+ }
+ }
+ if (address == null) {
+ throw new AnalysisException("No Alive backends");
+ }
+
+ // 5. send rpc to BE
+ POutfileWriteSuccessRequest request =
POutfileWriteSuccessRequest.newBuilder()
+ .setResultFileSink(ByteString.copyFrom(new
TSerializer().serialize(sink))).build();
+ Future<POutfileWriteSuccessResult> future =
BackendServiceProxy.getInstance()
+ .outfileWriteSuccessAsync(address, request);
+ InternalService.POutfileWriteSuccessResult result = future.get();
+ TStatusCode code =
TStatusCode.findByValue(result.getStatus().getStatusCode());
+ String errMsg;
+ if (code != TStatusCode.OK) {
+ if (!result.getStatus().getErrorMsgsList().isEmpty()) {
+ errMsg = result.getStatus().getErrorMsgsList().get(0);
+ } else {
+ errMsg = "Outfile write success file failed. backend address: "
+ + NetUtils
+ .getHostPortInAccessibleFormat(address.getHostname(),
address.getPort());
+ }
+ throw new AnalysisException(errMsg);
+ }
+ }
+
private void handleTransactionStmt() throws Exception {
if (context.getConnectType() == ConnectType.MYSQL) {
// Every time set no send flag and clean all data in buffer
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java
b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java
index 3f4bb846767..50afc7c96bb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java
@@ -106,6 +106,11 @@ public class BackendServiceClient {
return stub.fetchArrowFlightSchema(request);
}
+ public Future<InternalService.POutfileWriteSuccessResult>
outfileWriteSuccessAsync(
+ InternalService.POutfileWriteSuccessRequest request) {
+ return stub.outfileWriteSuccess(request);
+ }
+
public Future<InternalService.PFetchTableSchemaResult>
fetchTableStructureAsync(
InternalService.PFetchTableSchemaRequest request) {
return stub.fetchTableSchema(request);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
index 5c8251f5e12..e541b0eb689 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
@@ -325,6 +325,18 @@ public class BackendServiceProxy {
}
}
+ public Future<InternalService.POutfileWriteSuccessResult>
outfileWriteSuccessAsync(TNetworkAddress address,
+ InternalService.POutfileWriteSuccessRequest request) throws
RpcException {
+ try {
+ final BackendServiceClient client = getProxy(address);
+ return client.outfileWriteSuccessAsync(request);
+ } catch (Throwable e) {
+ LOG.warn("outfile write success file catch a exception,
address={}:{}",
+ address.getHostname(), address.getPort(), e);
+ throw new RpcException(address.hostname, e.getMessage());
+ }
+ }
+
public Future<InternalService.PFetchTableSchemaResult>
fetchTableStructureAsync(
TNetworkAddress address, InternalService.PFetchTableSchemaRequest
request) throws RpcException {
try {
diff --git a/gensrc/proto/internal_service.proto
b/gensrc/proto/internal_service.proto
index b6579323aae..1a6dad5521b 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -691,6 +691,17 @@ message PFetchTableSchemaResult {
repeated PTypeDesc column_types = 4;
}
+message POutfileWriteSuccessRequest {
+ // optional string file_path = 1;
+ // optional string success_file_name = 2;
+ // map<string, string> broker_properties = 4; // only for remote file
+ optional bytes result_file_sink = 1;
+}
+
+message POutfileWriteSuccessResult {
+ optional PStatus status = 1;
+}
+
message PJdbcTestConnectionRequest {
optional bytes jdbc_table = 1;
optional int32 jdbc_table_type = 2;
@@ -954,6 +965,7 @@ service PBackendService {
rpc hand_shake(PHandShakeRequest) returns (PHandShakeResponse);
rpc request_slave_tablet_pull_rowset(PTabletWriteSlaveRequest) returns
(PTabletWriteSlaveResult);
rpc response_slave_tablet_pull_rowset(PTabletWriteSlaveDoneRequest)
returns (PTabletWriteSlaveDoneResult);
+ rpc outfile_write_success(POutfileWriteSuccessRequest) returns
(POutfileWriteSuccessResult);
rpc fetch_table_schema(PFetchTableSchemaRequest) returns
(PFetchTableSchemaResult);
rpc multiget_data(PMultiGetRequest) returns (PMultiGetResponse);
rpc get_file_cache_meta_by_tablet_id(PGetFileCacheMetaRequest) returns
(PGetFileCacheMetaResponse);
diff --git a/regression-test/suites/export_p0/test_outfile.groovy
b/regression-test/suites/export_p0/test_outfile.groovy
index 76c5bb688c4..8b60803e185 100644
--- a/regression-test/suites/export_p0/test_outfile.groovy
+++ b/regression-test/suites/export_p0/test_outfile.groovy
@@ -210,9 +210,8 @@ suite("test_outfile") {
(100, "aa"), (111, "bb"), (123, "cc"), (222, "dd");"""
sql "set enable_parallel_outfile = true;"
sql """select * from select_into_file into outfile
"file://${outFilePath}/";"""
- // TODO: parallel outfile is not compatible with success_file_name.
remove this case temporary
- // sql "set enable_parallel_outfile = true;"
- // sql """select * from select_into_file into outfile
"file://${outFilePath}/" properties("success_file_name" = "SUCCESS");"""
+
+ sql """select * from select_into_file into outfile
"file://${outFilePath}/" properties("success_file_name" = "SUCCESS");"""
} finally {
try_sql("DROP TABLE IF EXISTS select_into_file")
File path = new File(outFilePath)
diff --git a/regression-test/suites/nereids_p0/outfile/test_outfile.groovy
b/regression-test/suites/nereids_p0/outfile/test_outfile.groovy
index 1cfdd7b62ce..f256df91809 100644
--- a/regression-test/suites/nereids_p0/outfile/test_outfile.groovy
+++ b/regression-test/suites/nereids_p0/outfile/test_outfile.groovy
@@ -236,9 +236,9 @@ suite("test_outfile") {
(100, "aa"), (111, "bb"), (123, "cc"), (222, "dd");"""
sql "set enable_parallel_outfile = true;"
sql """select * from select_into_file into outfile
"file://${outFilePath}/";"""
- // TODO: parallel outfile is not compatible with success_file_name.
remove this case temporary
- // sql "set enable_parallel_outfile = true;"
- // sql """select * from select_into_file into outfile
"file://${outFilePath}/" properties("success_file_name" = "SUCCESS");"""
+
+ sql "set enable_parallel_outfile = true;"
+ sql """select * from select_into_file into outfile
"file://${outFilePath}/" properties("success_file_name" = "SUCCESS");"""
} finally {
try_sql("DROP TABLE IF EXISTS select_into_file")
File path = new File(outFilePath)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]