github-actions[bot] commented on code in PR #61963:
URL: https://github.com/apache/doris/pull/61963#discussion_r3068448141


##########
fe/fe-core/src/main/java/org/apache/doris/qe/runtime/LoadProcessor.java:
##########
@@ -187,13 +188,19 @@ protected void 
doProcessReportExecStatus(TReportExecStatusParams params, SingleF
             }
         }
 
+        LoadContext loadContext = 
coordinatorContext.asLoadProcessor().loadContext;
+        long txnId = loadContext.getTransactionId();

Review Comment:
   `txnId` is read from `loadContext` before this method applies 
`params.getTxnId()` to the context. On the first terminal report from a 
fragment, `loadContext.getTransactionId()` is therefore still unset, so the 
`paimonCommitMessages` are dropped here and never reach `PaimonTransaction`. 
`PaimonTransaction.commit()` then sees an empty list and returns without 
publishing anything. Please derive the transaction id from `params` first (or 
move this block after the `params.isSetTxnId()` update) so that the first 
report carrying the commit payload is not lost.



##########
fe/fe-core/src/main/java/org/apache/doris/planner/PaimonTableSink.java:
##########
@@ -0,0 +1,288 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.datasource.mvcc.MvccUtil;
+import org.apache.doris.datasource.paimon.PaimonExternalCatalog;
+import org.apache.doris.datasource.paimon.PaimonExternalTable;
+import 
org.apache.doris.nereids.trees.plans.commands.insert.BaseExternalTableInsertCommandContext;
+import 
org.apache.doris.nereids.trees.plans.commands.insert.InsertCommandContext;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.thrift.TDataSink;
+import org.apache.doris.thrift.TDataSinkType;
+import org.apache.doris.thrift.TExplainLevel;
+import org.apache.doris.thrift.TFileFormatType;
+import org.apache.doris.thrift.TPaimonTableSink;
+import org.apache.doris.thrift.TPaimonWriteShuffleMode;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.paimon.options.CatalogOptions;
+import org.apache.paimon.utils.InstantiationUtil;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * Paimon table sink
+ *
+ * This class materializes the TPaimonTableSink payload consumed by BE,
+ * including table location, write options, partition keys, bucket keys,
+ * shuffle mode and sink column names.
+ */
+public class PaimonTableSink extends BaseExternalTableDataSink {
+    private static final Logger LOG = 
LogManager.getLogger(PaimonTableSink.class);
+    private final PaimonExternalTable targetTable;
+    private static final Base64.Encoder BASE64_ENCODER = 
java.util.Base64.getUrlEncoder().withoutPadding();
+    private static final HashSet<TFileFormatType> supportedTypes = new 
HashSet<TFileFormatType>() {{
+            add(TFileFormatType.FORMAT_ORC);
+            add(TFileFormatType.FORMAT_PARQUET);
+        }};
+
+    public PaimonTableSink(PaimonExternalTable targetTable) {
+        super();
+        this.targetTable = targetTable;
+    }
+
+    // List of columns to be written to the sink, used to populate columnNames 
in Thrift
+    private List<Column> cols;
+
+    public void setCols(List<Column> cols) {
+        this.cols = cols;
+    }
+
+    @Override
+    public String getExplainString(String prefix, TExplainLevel explainLevel) {
+        StringBuilder strBuilder = new StringBuilder();
+        strBuilder.append(prefix).append("PAIMON TABLE SINK\n");
+        if (explainLevel == TExplainLevel.BRIEF) {
+            return strBuilder.toString();
+        }
+        return strBuilder.toString();
+    }
+
+    @Override
+    protected Set<TFileFormatType> supportedFileFormatTypes() {
+        return supportedTypes;
+    }
+
+    @Override
+    public void bindDataSink(Optional<InsertCommandContext> insertCtx) throws 
AnalysisException {
+        TPaimonTableSink tSink = new TPaimonTableSink();
+
+        // basic identifiers
+        tSink.setCatalogName(targetTable.getCatalog().getName());
+        tSink.setDbName(targetTable.getDbName());
+        tSink.setTbName(targetTable.getName());
+
+        Map<String, String> catalogProps = 
targetTable.getCatalog().getCatalogProperty().getHadoopProperties();
+        Map<String, String> options = new HashMap<>();
+        options.putAll(catalogProps);
+        String warehouse = ((PaimonExternalCatalog) 
targetTable.getCatalog()).getPaimonOptionsMap()
+                .get(CatalogOptions.WAREHOUSE.key());
+        String defaultFsName = resolveDefaultFsName(warehouse);
+        if (defaultFsName != null && !defaultFsName.isEmpty()) {
+            String currentDefaultFs = options.get("fs.defaultFS");
+            if (currentDefaultFs == null || currentDefaultFs.isEmpty() || 
currentDefaultFs.startsWith("file:/")) {
+                options.put("fs.defaultFS", defaultFsName);
+            }
+        }
+        if (insertCtx.isPresent() && insertCtx.get() instanceof 
BaseExternalTableInsertCommandContext) {
+            BaseExternalTableInsertCommandContext ctx = 
(BaseExternalTableInsertCommandContext) insertCtx.get();
+            if (ctx.getTxnId() > 0) {
+                options.put("doris.commit_identifier", 
String.valueOf(ctx.getTxnId()));
+            }
+            if (ctx.getCommitUser() != null && !ctx.getCommitUser().isEmpty()) 
{
+                options.put("doris.commit_user", ctx.getCommitUser());
+            }
+        }
+
+        if (ConnectContext.get() != null) {
+            options.put("target-file-size",
+                    
String.valueOf(ConnectContext.get().getSessionVariable().paimonTargetFileSize));
+            options.put("write-buffer-size",
+                    
String.valueOf(ConnectContext.get().getSessionVariable().paimonWriteBufferSize));
+            String hadoopUser = options.get("hadoop.username");
+            if (hadoopUser == null || hadoopUser.isEmpty()) {
+                hadoopUser = options.get("hadoop.user.name");
+            }
+            if (hadoopUser == null || hadoopUser.isEmpty()) {
+                hadoopUser = "hadoop";
+            }
+            options.put("hadoop.user.name", hadoopUser);
+            options.put("hadoop.username", hadoopUser);
+        }
+
+        String tableLocation = null;
+        org.apache.paimon.table.Table paimonTable =
+                
targetTable.getPaimonTable(MvccUtil.getSnapshotFromContext(targetTable));
+        if (paimonTable instanceof org.apache.paimon.table.FileStoreTable) {
+            tableLocation = ((org.apache.paimon.table.FileStoreTable) 
paimonTable).location().toString();
+        }
+        tableLocation = normalizeTableLocation(tableLocation);
+        if (tableLocation == null || tableLocation.isEmpty()) {
+            if (warehouse != null && !warehouse.isEmpty()) {
+                String base = warehouse.endsWith("/") ? warehouse : warehouse 
+ "/";
+                tableLocation = base + targetTable.getDbName() + ".db/" + 
targetTable.getName() + "/";
+            }
+        } else if (defaultFsName != null && !defaultFsName.isEmpty() && 
tableLocation.startsWith("hdfs://")) {

Review Comment:
   This rewrites every explicit `hdfs://<authority>/...` table location to 
`defaultFsName + path`, which discards the authority returned by the Paimon 
metadata. If the warehouse authority differs from the table's authority, FE 
will direct BE to write to the wrong filesystem location. 
`FileStoreTable.location()` is already authoritative here; only the case where 
the authority is missing should be normalized, not the fully-qualified 
`hdfs://...` case.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to