zhangstar333 commented on code in PR #60719:
URL: https://github.com/apache/doris/pull/60719#discussion_r2802127017
##########
fe/fe-core/src/main/java/org/apache/doris/planner/TVFTableSink.java:
##########
@@ -0,0 +1,196 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.util.Util;
+import org.apache.doris.datasource.property.fileformat.CsvFileFormatProperties;
+import org.apache.doris.datasource.property.fileformat.FileFormatProperties;
+import org.apache.doris.datasource.property.fileformat.OrcFileFormatProperties;
+import org.apache.doris.datasource.property.storage.StorageProperties;
+import org.apache.doris.thrift.TColumn;
+import org.apache.doris.thrift.TDataSink;
+import org.apache.doris.thrift.TDataSinkType;
+import org.apache.doris.thrift.TExplainLevel;
+import org.apache.doris.thrift.TFileFormatType;
+import org.apache.doris.thrift.TFileType;
+import org.apache.doris.thrift.TTVFTableSink;
+
+import com.google.common.collect.Maps;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * TVFTableSink is used for INSERT INTO tvf_name(properties) SELECT ...
+ * It writes query results to files via TVF (local/s3/hdfs).
+ *
+ * Property parsing reuses the same StorageProperties and FileFormatProperties
+ * infrastructure as the read-side TVF (SELECT * FROM s3/hdfs/local(...)).
+ */
+public class TVFTableSink extends DataSink {
+    private final PlanNodeId exchNodeId;
+    private final String tvfName;
+    private final Map<String, String> properties;
+    private final List<Column> cols;
+    private TDataSink tDataSink;
+
+    public TVFTableSink(PlanNodeId exchNodeId, String tvfName, Map<String, String> properties, List<Column> cols) {
+        this.exchNodeId = exchNodeId;
+        this.tvfName = tvfName;
+        this.properties = properties;
+        this.cols = cols;
+    }
+
+    public void bindDataSink() throws AnalysisException {
+        TTVFTableSink tSink = new TTVFTableSink();
+        tSink.setTvfName(tvfName);
+
+        // --- 1. Parse file format properties (reuse read-side FileFormatProperties) ---
+        // Make a mutable copy; FileFormatProperties.analyzeFileFormatProperties removes consumed keys.
+        Map<String, String> propsCopy = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
+        propsCopy.putAll(properties);
+
+        String formatStr = propsCopy.getOrDefault("format", "csv").toLowerCase();
+        propsCopy.remove("format");
+
+        // Also consume "compression_type" as alias for "compress_type" (write-side convention)
+        if (propsCopy.containsKey("compression_type") && !propsCopy.containsKey("compress_type")) {
+            propsCopy.put("compress_type", propsCopy.remove("compression_type"));
+        }
+
+        FileFormatProperties fileFormatProps = FileFormatProperties.createFileFormatProperties(formatStr);
+        fileFormatProps.analyzeFileFormatProperties(propsCopy, true);
+
+        TFileFormatType formatType = fileFormatProps.getFileFormatType();
+        if (!Util.isCsvFormat(formatType) && formatType != TFileFormatType.FORMAT_PARQUET
+                && formatType != TFileFormatType.FORMAT_ORC) {
+            throw new AnalysisException("Unsupported format: " + formatType.name());
+        }
+        tSink.setFileFormat(formatType);
+
+        // Set file type based on TVF name
+        TFileType fileType = getFileType(tvfName);
+        tSink.setFileType(fileType);
+
+        // --- 2. Parse storage/connection properties (reuse read-side StorageProperties) ---
+        Map<String, String> backendConnectProps;
+        if (tvfName.equals("local")) {
+            // Local TVF: pass properties as-is (same as LocalProperties.getBackendConfigProperties)
+            backendConnectProps = new java.util.HashMap<>(propsCopy);
+        } else {
+            // S3/HDFS: use StorageProperties to normalize connection property keys
+            // (e.g. "s3.endpoint" -> "AWS_ENDPOINT", "hadoop.username" -> hadoop config)
+            try {
+                StorageProperties storageProps = StorageProperties.createPrimary(propsCopy);
+                backendConnectProps = storageProps.getBackendConfigProperties();
+            } catch (Exception e) {
+                throw new AnalysisException("Failed to parse storage properties: " + e.getMessage(), e);
+            }
+        }
+
+        String filePath = properties.get("file_path");
+        tSink.setFilePath(filePath);
+
+        // Set normalized properties for BE
+        tSink.setProperties(backendConnectProps);
+
+        // Set columns
+        List<TColumn> tColumns = new ArrayList<>();
+        for (Column col : cols) {
+            tColumns.add(col.toThrift());
+        }
+        tSink.setColumns(tColumns);
+
+        // --- 3. Set format-specific sink options ---
+        if (fileFormatProps instanceof CsvFileFormatProperties) {
+            CsvFileFormatProperties csvProps = (CsvFileFormatProperties) fileFormatProps;
+            csvProps.checkSupportedCompressionType(true);
+            tSink.setColumnSeparator(csvProps.getColumnSeparator());
+            tSink.setLineDelimiter(csvProps.getLineDelimiter());
+            tSink.setCompressionType(csvProps.getCompressionType());
+        } else if (fileFormatProps instanceof OrcFileFormatProperties) {
+            tSink.setCompressionType(((OrcFileFormatProperties) fileFormatProps).getOrcCompressionType());
+        }
+        // Parquet compression is handled by BE via parquet writer options
+
+        // --- 4. Set sink-specific options ---
+        // Max file size
+        String maxFileSizeStr = properties.get("max_file_size");
+        if (maxFileSizeStr != null) {
+            tSink.setMaxFileSizeBytes(Long.parseLong(maxFileSizeStr));
+        }
+
+        // Delete existing files is handled by FE (InsertIntoTVFCommand), always tell BE not to delete
+        tSink.setDeleteExistingFiles(false);
+
+        // Backend id for local TVF
+        String backendIdStr = properties.get("backend_id");
+        if (backendIdStr != null) {
+            tSink.setBackendId(Long.parseLong(backendIdStr));
+        }
+
+        // Set hadoop config for hdfs/s3 (BE uses this for file writer creation)
+        if (!tvfName.equals("local")) {
+            tSink.setHadoopConfig(backendConnectProps);

Review Comment:
   tSink.setProperties(backendConnectProps);
   It seems the properties field is also set to backendConnectProps, so the same map is passed to the BE twice?
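To make the point concrete, here is a minimal, runnable sketch of the duplication the comment is asking about: for non-local TVFs the same normalized connection map ends up in both thrift fields. TTvfTableSinkStandIn is a hypothetical stand-in for the generated TTVFTableSink class, and the key/value used are purely illustrative.

import java.util.Map;
import java.util.TreeMap;

public class TvfSinkPropertiesDemo {
    // Hypothetical stand-in for the generated TTVFTableSink thrift struct;
    // only the two fields relevant to this comment are modeled.
    static class TTvfTableSinkStandIn {
        Map<String, String> properties;
        Map<String, String> hadoopConfig;
    }

    public static void main(String[] args) {
        // Normalized connection map, as produced by
        // StorageProperties.getBackendConfigProperties() in the patch
        // (key and value below are purely illustrative).
        Map<String, String> backendConnectProps = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
        backendConnectProps.put("AWS_ENDPOINT", "http://example.invalid:9000");

        TTvfTableSinkStandIn tSink = new TTvfTableSinkStandIn();
        tSink.properties = backendConnectProps;   // tSink.setProperties(backendConnectProps)
        tSink.hadoopConfig = backendConnectProps; // tSink.setHadoopConfig(backendConnectProps)

        // Both thrift fields now carry the identical map, which is the
        // duplication the review comment asks about.
        System.out.println(tSink.properties == tSink.hadoopConfig); // prints: true
    }
}

Whether one of the two assignments can be dropped depends on which field the BE actually reads for s3/hdfs, which is outside this hunk.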
##########
be/src/vec/sink/writer/vtvf_table_writer.cpp:
##########
@@ -0,0 +1,165 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/sink/writer/vtvf_table_writer.h"
+
+#include <fmt/format.h>
+
+#include "common/status.h"
+#include "io/file_factory.h"
+#include "runtime/runtime_state.h"
+#include "vec/core/block.h"
+#include "vec/exprs/vexpr.h"
+#include "vec/exprs/vexpr_context.h"
+
+namespace doris::vectorized {
+
+VTVFTableWriter::VTVFTableWriter(const TDataSink& t_sink, const VExprContextSPtrs& output_exprs,
+                                 std::shared_ptr<pipeline::Dependency> dep,
+                                 std::shared_ptr<pipeline::Dependency> fin_dep)
+        : AsyncResultWriter(output_exprs, dep, fin_dep) {
+    _tvf_sink = t_sink.tvf_table_sink;
+}
+
+Status VTVFTableWriter::open(RuntimeState* state, RuntimeProfile* profile) {
+    _state = state;
+
+    // Init profile counters
+    RuntimeProfile* writer_profile = profile->create_child("VTVFTableWriter", true, true);
+    _written_rows_counter = ADD_COUNTER(writer_profile, "NumWrittenRows", TUnit::UNIT);
+    _written_data_bytes = ADD_COUNTER(writer_profile, "WrittenDataBytes", TUnit::BYTES);
+    _file_write_timer = ADD_TIMER(writer_profile, "FileWriteTime");
+    _writer_close_timer = ADD_TIMER(writer_profile, "FileWriterCloseTime");
+
+    _file_path = _tvf_sink.file_path;
+    _max_file_size_bytes =
+            _tvf_sink.__isset.max_file_size_bytes ? _tvf_sink.max_file_size_bytes : 0;
+
+    VLOG_DEBUG << "TVF table writer open, query_id=" << print_id(_state->query_id())
+               << ", tvf_name=" << _tvf_sink.tvf_name << ", file_path=" << _tvf_sink.file_path
+               << ", file_format=" << _tvf_sink.file_format << ", file_type=" << _tvf_sink.file_type
+               << ", max_file_size_bytes=" << _max_file_size_bytes
+               << ", columns_count=" << (_tvf_sink.__isset.columns ? _tvf_sink.columns.size() : 0);
+
+    return _create_next_file_writer();
+}
+
+Status VTVFTableWriter::write(RuntimeState* state, vectorized::Block& block) {
+    COUNTER_UPDATE(_written_rows_counter, block.rows());
+    state->update_num_rows_load_total(block.rows());
+
+    {
+        SCOPED_TIMER(_file_write_timer);
+        RETURN_IF_ERROR(_vfile_writer->write(block));
+    }
+
+    _current_written_bytes = _vfile_writer->written_len();
+
+    // Auto-split if max file size is set
+    if (_max_file_size_bytes > 0) {

Review Comment:
   It seems this if check is not needed; the size check is already done in the _create_new_file_if_exceed_size() function.
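For reference, a small, self-contained sketch of the suggested simplification, assuming _create_new_file_if_exceed_size() already guards on _max_file_size_bytes internally (its body is not part of this hunk, so that guard is an assumption). The members and the rollover logic below are simplified stand-ins for the real writer:

#include <cstdint>
#include <iostream>

// Hypothetical, simplified stand-in for VTVFTableWriter; only the size-split
// logic relevant to the review comment is modeled here.
struct TvfWriterSketch {
    int64_t _max_file_size_bytes = 0;   // 0 means auto-split is disabled
    int64_t _current_written_bytes = 0;
    int _file_index = 0;

    // Assumed shape of _create_new_file_if_exceed_size(): the size guard
    // lives inside the helper, so callers do not need their own check.
    void _create_new_file_if_exceed_size() {
        if (_max_file_size_bytes <= 0 || _current_written_bytes < _max_file_size_bytes) {
            return; // nothing to do: splitting disabled or limit not reached
        }
        ++_file_index;               // stand-in for _create_next_file_writer()
        _current_written_bytes = 0;  // start counting the new file from zero
    }

    void write(int64_t bytes) {
        _current_written_bytes += bytes;
        // With the guard inside the helper, the outer
        // `if (_max_file_size_bytes > 0)` from the hunk becomes redundant.
        _create_new_file_if_exceed_size();
    }
};

int main() {
    TvfWriterSketch w;
    w._max_file_size_bytes = 100;
    w.write(60);
    w.write(60); // exceeds 100, so the writer rolls over to a second file
    std::cout << "files created: " << (w._file_index + 1) << "\n"; // prints: 2
}

With the guard kept inside the helper, write() can call it unconditionally, which appears to be what the comment suggests.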
-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]