This is an automated email from the ASF dual-hosted git repository.

zhangstar333 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 7a6ceedd1f5 [fix](BE) SIGSEGV in MergeSorterState when writing large sorted Iceberg tables (#64126)
7a6ceedd1f5 is described below

commit 7a6ceedd1f5abced04a21e41a19b519426a156aa
Author: nsivarajan <[email protected]>
AuthorDate: Thu Jun 11 07:56:57 2026 +0530

    [fix](BE) SIGSEGV in MergeSorterState when writing large sorted Iceberg tables (#64126)
    
    ### What problem does this PR solve?
    
    Issue Number: close #64124
    
    Related PR: #xxx
    
    Problem Summary:
    
    BE crashes with SIGSEGV when inserting into an Iceberg table with a sort
    order (`ORDER BY`) if the data volume requires more than one file flush
    within a single write task.
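    
    A simplified model of the write path shows why a single flush is not
    enough to trigger the crash. The sketch below is hypothetical: the names
    `SortWriterModel`, `flush()` and `flushes_after_reset` are illustrative
    and are not the real `VIcebergSortWriter` API. Each `write()` appends a
    block. Once the buffered size reaches the target file size, the model
    flushes one file and resets its sorter state. Only a flush that runs
    after an earlier `reset()` can observe state that `reset()` failed to
    clear.
    
    ```cpp
    #include <cassert>
    #include <cstddef>
    
    // Hypothetical model of the sorted write path (not the real Doris classes).
    // write() appends a block; when the buffered bytes reach the target file
    // size, the model "writes" one file and resets its sorter state.
    struct SortWriterModel {
        std::size_t target_file_size;
        std::size_t buffered_bytes = 0;
        int flushes = 0;              // files written so far
        int flushes_after_reset = 0;  // flushes that reuse state left by reset()
        bool was_reset = false;
    
        void write(std::size_t block_bytes) {
            buffered_bytes += block_bytes;  // stands in for append_block()
            if (buffered_bytes >= target_file_size) {
                flush();                    // stands in for _flush_to_file()
            }
        }
    
        void flush() {
            if (was_reset) {
                ++flushes_after_reset;      // the path that crashed before the fix
            }
            ++flushes;
            buffered_bytes = 0;             // stands in for reset()
            was_reset = true;
        }
    };
    
    // Usage: three ~90 KB blocks against a 51200-byte target each trigger a
    // flush, so the second and third flushes run on reset state.
    inline void sort_writer_model_demo() {
        SortWriterModel w{51200};
        for (int i = 0; i < 3; ++i) {
            w.write(90 * 1024);
        }
        assert(w.flushes == 3);
        assert(w.flushes_after_reset == 2);
    }
    ```
    
    The 51200-byte target and the ~90 KB block size come from the regression
    test in this commit. With a target larger than the total input, no flush
    happens during `write()`, and the stale state is never reached.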
    
    **Root cause:** `MergeSorterState::reset()` did not clear `_queue`
    between flush cycles. After the first file was flushed and the sorter
    was reset, exhausted cursors from the previous merge tree remained in
    the queue. On the next flush, `update_batch_size()` dereferenced those
    stale cursor pointers, causing a null pointer dereference (SIGSEGV).
    
    Two additional bugs fixed in the same path:
    - `_num_rows` was not reset, causing row counts to accumulate across
    flush cycles
    - `_update_spill_block_batch_row_count()` was called after
    `append_block()`, which clears the source block after copying, so spill
    batch sizing was always computed from an empty block
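    
    The ordering bug can be reproduced in isolation. In the sketch below,
    `BlockModel`, `append_block()` and `batch_row_count()` are hypothetical
    stand-ins. The real `_update_spill_block_batch_row_count()` formula is
    not part of this commit, so a simple bytes-per-row heuristic is assumed.
    Because `append_block()` clears its argument after copying, any sample
    taken afterwards sees zero rows.
    
    ```cpp
    #include <cassert>
    #include <cstddef>
    #include <cstdint>
    #include <vector>
    
    // Hypothetical block holding fixed-width 8-byte rows.
    struct BlockModel {
        std::vector<std::int64_t> rows;
        std::size_t bytes() const { return rows.size() * sizeof(std::int64_t); }
    };
    
    // Mirrors the behavior described above: copy the rows, then clear the source.
    void append_block(std::vector<BlockModel>& buffer, BlockModel& block) {
        buffer.push_back(block);
        block.rows.clear();
    }
    
    // Assumed heuristic: rows per spill batch = byte budget / average row size.
    // An empty sample yields 0, i.e. no usable sizing information.
    std::size_t batch_row_count(const BlockModel& sample, std::size_t budget_bytes) {
        if (sample.rows.empty()) {
            return 0;
        }
        return budget_bytes / (sample.bytes() / sample.rows.size());
    }
    
    // Pre-fix order: append first, then sample the (now empty) block.
    std::size_t write_buggy(std::vector<BlockModel>& buffer, BlockModel& block,
                            std::size_t budget_bytes) {
        append_block(buffer, block);
        return batch_row_count(block, budget_bytes);
    }
    
    // Post-fix order: sample first, then append.
    std::size_t write_fixed(std::vector<BlockModel>& buffer, BlockModel& block,
                            std::size_t budget_bytes) {
        std::size_t n = batch_row_count(block, budget_bytes);
        append_block(buffer, block);
        return n;
    }
    ```
    
    With a 1024-byte budget and 8-byte rows, the fixed order yields 128 rows
    per batch. The pre-fix order yields 0 in this model regardless of the
    input, because the sample is always empty.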
    
    Stack trace:
    
    ```
    2026-05-04 10:17:37 INFO  [Thread-7] c.a.j.j.NativeLogger:34 - JdoAliyunMetaClient.cpp:236] Successfully get Secrets with AccessKeyId: STS.NZfRo9uGMPd4VqBKyrkcNDYkh from http://100.100.100.200/latest/meta-data/ram/security-credentials/dev-eval-decoupled-dev-role
    2026-05-04 10:17:37 INFO  [Thread-7] c.a.j.j.NativeLogger:34 - JdoAuthStsCredentialsProvider.cpp:136] Update auth retry 0: key updated = 1, secret updated = 1, token updated = 1
    2026-05-04 10:17:37 INFO  [Thread-7] c.a.j.j.NativeLogger:34 - JdoAuthStsCredentialsProvider.cpp:162] Auth updated, current time: 1777904257734, updated time: 1777904257734, force update: 1, accessKeyId: STS.NZfRo9uGMPd4VqBKyrkcNDYkh, time elapsed: 7.702743MS
    2026-05-04 10:17:37 INFO  [Thread-20] c.a.j.c.FsStats:18 - cmd=open, src=oss://doris-eval-coupled-dev/iceberg/warehouse/iceberg_tt_test.db/store_sales_tt/metadata/b0469cad-c8dd-4ff7-9601-49a82905457b-m3.avro, dst=null, size=0, parameter=hasGetFileLength:true,readProfile:columnar, time-in-ms=556, version=6.10.4-nextarch
    2026-05-04 10:17:37 INFO  [Thread-20] c.a.j.c.FsStats:18 - cmd=read, src=oss://doris-eval-coupled-dev/iceberg/warehouse/iceberg_tt_test.db/store_sales_tt/metadata/b0469cad-c8dd-4ff7-9601-49a82905457b-m3.avro, dst=null, size=93145, parameter=byteReaded:93145,byteNeeded:93145,readTimes:15,BackendRequestCountTotal:0,uuid:6583b276-d663-4cc7-ae9b-7eceb6effcda, time-in-ms=65, version=6.10.4-nextarch
    2026-05-04 10:18:02 INFO  [Thread-22] c.a.j.o.a.HadoopLoginUserInfo:49 - User: hadoop, authMethod: SIMPLE, ugi: hadoop (auth:SIMPLE)
    2026-05-04 10:18:02 INFO  [Thread-22] c.a.j.c.JindoHadoopSystem:257 - Initialized native file system: true, userName: hadoop, authMethod: SIMPLE
    2026-05-04 10:18:02 INFO  [Thread-22] c.a.j.c.FsStats:18 - cmd=open, src=oss://doris-eval-coupled-dev/iceberg/warehouse/iceberg_tt_test.db/store_sales_tt/metadata/b0469cad-c8dd-4ff7-9601-49a82905457b-m0.avro, dst=null, size=0, parameter=hasGetFileLength:true,readProfile:columnar, time-in-ms=7, version=6.10.4-nextarch
    2026-05-04 10:18:02 INFO  [Thread-22] c.a.j.c.FsStats:18 - cmd=read, src=oss://doris-eval-coupled-dev/iceberg/warehouse/iceberg_tt_test.db/store_sales_tt/metadata/b0469cad-c8dd-4ff7-9601-49a82905457b-m0.avro, dst=null, size=253833, parameter=byteReaded:253833,byteNeeded:253833,readTimes:40,BackendRequestCountTotal:0,uuid:c820f3c3-fbd2-4618-91ae-b60b6a0659f7, time-in-ms=10, version=6.10.4-nextarch
    *** Query id: c3db8feece39405d-8ac88ac72a7487d5 ***
    *** is nereids: 1 ***
    *** tablet id: 0 ***
    *** Aborted at 1778338941 (unix time) try "date -d @1778338941" if you are using GNU date ***
    *** Current BE git commitID: 635a6e1c302 ***
    *** SIGSEGV unknown detail explain (@0x0) received by PID 203408 (TID 463290 OR 0x6fe5e7d95700) from PID 0; stack trace: ***
     0# doris::signal::(anonymous namespace)::FailureSignalHandler(int, siginfo_t*, void*) at /home/zcp/repo_center/doris_tmp-branch-4.1.0-oss-native-sdk/doris/be/src/common/signal_handler.h:420
     1# PosixSignals::chained_handler(int, siginfo_t*, void*) [clone .part.10] in /usr/java/applejdk-17.0.9.9.1/lib/server/libjvm.so
     2# JVM_handle_linux_signal in /usr/java/applejdk-17.0.9.9.1/lib/server/libjvm.so
     3# 0x0000700258ADBCF0 in /lib64/libpthread.so.0
     4# doris::SortingQueueBatch<doris::MergeSortCursor>::update_batch_size() at /home/zcp/repo_center/doris_tmp-branch-4.1.0-oss-native-sdk/doris/be/src/exec/sort/sort_cursor.h:430
     5# doris::MergeSorterState::_merge_sort_read_impl(int, doris::Block*, bool*) at /home/zcp/repo_center/doris_tmp-branch-4.1.0-oss-native-sdk/doris/be/src/exec/sort/sorter.cpp:123
     6# doris::FullSorter::get_next(doris::RuntimeState*, doris::Block*, bool*) at /home/zcp/repo_center/doris_tmp-branch-4.1.0-oss-native-sdk/doris/be/src/exec/sort/sorter.cpp:270
     7# doris::VIcebergSortWriter::_write_sorted_data() at /home/zcp/repo_center/doris_tmp-branch-4.1.0-oss-native-sdk/doris/be/src/exec/sink/writer/iceberg/viceberg_sort_writer.cpp:184
     8# doris::VIcebergSortWriter::_flush_to_file() at /home/zcp/repo_center/doris_tmp-branch-4.1.0-oss-native-sdk/doris/be/src/exec/sink/writer/iceberg/viceberg_sort_writer.cpp:170
     9# doris::VIcebergSortWriter::write(doris::Block&) at /home/zcp/repo_center/doris_tmp-branch-4.1.0-oss-native-sdk/doris/be/src/exec/sink/writer/iceberg/viceberg_sort_writer.cpp:72
    10# doris::VIcebergTableWriter::_write_prepared_block(doris::Block&) at /home/zcp/repo_center/doris_tmp-branch-4.1.0-oss-native-sdk/doris/be/src/exec/sink/writer/iceberg/viceberg_table_writer.cpp:329
    11# doris::VIcebergTableWriter::write(doris::RuntimeState*, doris::Block&) at /home/zcp/repo_center/doris_tmp-branch-4.1.0-oss-native-sdk/doris/be/src/exec/sink/writer/iceberg/viceberg_table_writer.cpp:213
    12# doris::AsyncResultWriter::process_block(doris::RuntimeState*, doris::RuntimeProfile*) in /ngs/app/doris/doris-current/be/lib/doris_be
    13# std::_Function_handler<void (), doris::AsyncResultWriter::start_writer(doris::RuntimeState*, doris::RuntimeProfile*)::$_0>::_M_invoke(std::_Any_data const&) at /usr/local/ldb-toolchain-v0.26/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/bits/std_function.h:292
    14# doris::ThreadPool::dispatch_thread() at /home/zcp/repo_center/doris_tmp-branch-4.1.0-oss-native-sdk/doris/be/src/util/threadpool.cpp:623
    15# doris::Thread::supervise_thread(void*) at /home/zcp/repo_center/doris_tmp-branch-4.1.0-oss-native-sdk/doris/be/src/util/thread.cpp:461
    16# start_thread in /lib64/libpthread.so.0
    17# __clone in /lib64/libc.so.6
    ```
    
    ### Release note
    
    Fix BE crash (SIGSEGV) when writing to a sorted Iceberg table with data
    large enough to produce more than one output file.
    
    Co-authored-by: Sivarajan Narayanan <[email protected]>
---
 .../sink/writer/iceberg/viceberg_sort_writer.cpp   |   4 +-
 be/src/exec/sort/sorter.cpp                        |   6 +-
 be/test/exec/sort/merge_sorter_state.cpp           |  84 ++++++++++++++++
 .../write/test_iceberg_write_sorted_order.groovy   | 109 +++++++++++++++++++++
 4 files changed, 198 insertions(+), 5 deletions(-)

diff --git a/be/src/exec/sink/writer/iceberg/viceberg_sort_writer.cpp b/be/src/exec/sink/writer/iceberg/viceberg_sort_writer.cpp
index 3827cb6d925..6081166777f 100644
--- a/be/src/exec/sink/writer/iceberg/viceberg_sort_writer.cpp
+++ b/be/src/exec/sink/writer/iceberg/viceberg_sort_writer.cpp
@@ -56,10 +56,10 @@ Status VIcebergSortWriter::open(RuntimeState* state, RuntimeProfile* profile,
 
 Status VIcebergSortWriter::write(Block& block) {
     std::lock_guard<std::mutex> lock(_sorter_mutex);
-
+    // Sample row size before append_block clears the block.
+    _update_spill_block_batch_row_count(block);
     // Append incoming block data to the sorter's internal buffer
     RETURN_IF_ERROR(_sorter->append_block(&block));
-    _update_spill_block_batch_row_count(block);
 
     // When accumulated data size reaches the target file size threshold,
     // sort the data in memory and flush it directly to a Parquet/ORC file.
diff --git a/be/src/exec/sort/sorter.cpp b/be/src/exec/sort/sorter.cpp
index 9b2ecd60953..2d9304adfa2 100644
--- a/be/src/exec/sort/sorter.cpp
+++ b/be/src/exec/sort/sorter.cpp
@@ -57,11 +57,11 @@ namespace doris {
 //
 
 void MergeSorterState::reset() {
-    std::vector<std::shared_ptr<MergeSortCursorImpl>> empty_cursors(0);
-    std::vector<std::shared_ptr<Block>> empty_blocks(0);
-    _sorted_blocks.swap(empty_blocks);
+    _sorted_blocks.clear();      // replaces swap-with-empty idiom
+    _queue = MergeSorterQueue(); // release stale cursors from prior flush cycle
     unsorted_block() = Block::create_unique(unsorted_block()->clone_empty());
     _in_mem_sorted_bocks_size = 0;
+    _num_rows = 0;
 }
 
 void MergeSorterState::add_sorted_block(std::shared_ptr<Block> block) {
diff --git a/be/test/exec/sort/merge_sorter_state.cpp b/be/test/exec/sort/merge_sorter_state.cpp
index 7af89e7cbdf..cd500a47056 100644
--- a/be/test/exec/sort/merge_sorter_state.cpp
+++ b/be/test/exec/sort/merge_sorter_state.cpp
@@ -168,4 +168,88 @@ TEST_F(MergeSorterStateTest, whole_block_fast_path_allows_smaller_than_batch) {
         EXPECT_EQ(block.rows(), 0);
     }
 }
+
+TEST_F(MergeSorterStateTest, test_reset_clears_all_state) {
+    state.reset(new MergeSorterState(*row_desc, 0));
+
+    // Add sorted blocks and build merge tree (simulates a sort-write cycle)
+    state->add_sorted_block(create_block({1, 3, 5}));
+    state->add_sorted_block(create_block({2, 4, 6}));
+    EXPECT_EQ(state->num_rows(), 6);
+
+    SortDescription desc {SortColumnDescription {0, 1, -1}};
+    ASSERT_TRUE(state->build_merge_tree(desc));
+    EXPECT_EQ(state->get_queue().size(), 2);
+
+    // Drain the queue (simulates _write_sorted_data completing)
+    Block block;
+    bool eos = false;
+    while (!eos) {
+        ASSERT_TRUE(state->merge_sort_read(&block, 10, &eos).ok());
+        block.clear_column_data();
+    }
+    EXPECT_EQ(state->get_queue().size(), 0);
+
+    // reset() must clear all state for the next batch
+    state->reset();
+    EXPECT_EQ(state->get_sorted_block().size(), 0); // _sorted_blocks cleared
+    EXPECT_EQ(state->get_queue().size(), 0);        // _queue cleared
+    EXPECT_EQ(state->num_rows(), 0);                // _num_rows reset
+    EXPECT_EQ(state->data_size(), 0);               // no accumulated data
+
+    // Verify the sorter is fully reusable after reset
+    state->add_sorted_block(create_block({10, 20}));
+    EXPECT_EQ(state->num_rows(), 2);
+    ASSERT_TRUE(state->build_merge_tree(desc));
+    EXPECT_EQ(state->get_queue().size(), 1);
+
+    Block block2;
+    bool eos2 = false;
+    ASSERT_TRUE(state->merge_sort_read(&block2, 10, &eos2).ok());
+    EXPECT_TRUE(
+            ColumnHelper::block_equal(block2, ColumnHelper::create_block<DataTypeInt64>({10, 20})));
+}
+
+TEST_F(MergeSorterStateTest, test_reset_with_partial_drain) {
+    state.reset(new MergeSorterState(*row_desc, 0));
+
+    state->add_sorted_block(create_block({1, 2, 3}));
+    state->add_sorted_block(create_block({4, 5, 6}));
+
+    SortDescription desc {SortColumnDescription {0, 1, -1}};
+    ASSERT_TRUE(state->build_merge_tree(desc));
+    EXPECT_EQ(state->get_queue().size(), 2);
+
+    // Read only part of the data — queue is NOT fully drained
+    Block block;
+    bool eos = false;
+    ASSERT_TRUE(state->merge_sort_read(&block, 2, &eos).ok());
+    EXPECT_FALSE(eos);
+    EXPECT_GT(state->get_queue().size(), 0);
+
+    // reset() must cleanly discard the in-flight queue
+    state->reset();
+    EXPECT_EQ(state->get_queue().size(), 0);
+    EXPECT_EQ(state->num_rows(), 0);
+
+    // Sorter must be fully usable after mid-cycle reset
+    state->add_sorted_block(create_block({7, 8}));
+    ASSERT_TRUE(state->build_merge_tree(desc));
+    Block block2;
+    bool eos2 = false;
+    ASSERT_TRUE(state->merge_sort_read(&block2, 10, &eos2).ok());
+    EXPECT_TRUE(
+            ColumnHelper::block_equal(block2, ColumnHelper::create_block<DataTypeInt64>({7, 8})));
+}
+
+TEST_F(MergeSorterStateTest, test_reset_on_fresh_state) {
+    state.reset(new MergeSorterState(*row_desc, 0));
+
+    // reset() on a state that has never had data must not crash
+    state->reset();
+    EXPECT_EQ(state->get_sorted_block().size(), 0);
+    EXPECT_EQ(state->get_queue().size(), 0);
+    EXPECT_EQ(state->num_rows(), 0);
+    EXPECT_EQ(state->data_size(), 0);
+}
 } // namespace doris
\ No newline at end of file
diff --git a/regression-test/suites/external_table_p0/iceberg/write/test_iceberg_write_sorted_order.groovy b/regression-test/suites/external_table_p0/iceberg/write/test_iceberg_write_sorted_order.groovy
new file mode 100644
index 00000000000..0d806afcacf
--- /dev/null
+++ b/regression-test/suites/external_table_p0/iceberg/write/test_iceberg_write_sorted_order.groovy
@@ -0,0 +1,109 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Regression test for: BE SIGSEGV in MergeSorterState when writing to a
+// sorted Iceberg table with data large enough to trigger multiple file flushes.
+// MergeSorterState::reset() was not clearing _queue between flush cycles,
+// leaving stale cursors that caused a null pointer dereference in
+// update_batch_size() on the next flush.
+
+suite("test_iceberg_write_sorted_order", "p0,external") {
+
+    def test_sorted_write_multi_flush = { String catalog_name ->
+        sql """ DROP TABLE IF EXISTS test_iceberg_sorted_write """
+        sql """
+            CREATE TABLE test_iceberg_sorted_write (
+                id     INT,
+                name   STRING,
+                value  DOUBLE
+            ) ENGINE = iceberg
+            ORDER BY (id ASC)
+            PROPERTIES (
+                "compression-codec" = "zstd",
+                "write-format" = "parquet"
+            )
+        """
+
+        // Lower the target file size so each pipeline block (~90 KB) exceeds the
+        // threshold, forcing _flush_to_file() → reset() on every write() call.
+        // With 10 000 rows this produces 2+ flushes from write() — the minimum
+        // needed to hit the stale-cursor crash on the second cycle.
+        sql """ SET iceberg_write_target_file_size_bytes = 51200 """
+
+        // 10 000 rows via the built-in numbers() generator — no external data needed.
+        sql """
+            INSERT INTO test_iceberg_sorted_write
+            SELECT number,
+                   concat('name_', cast(number AS STRING)),
+                   number * 1.5
+            FROM numbers("number" = "10000")
+        """
+
+        // Verify via Iceberg $files metadata table:
+        // - multiple files were created (proves multiple flush cycles happened)
+        // - total committed row count matches what was inserted
+        def files = sql """ SELECT count(*), sum(record_count)
+                            FROM test_iceberg_sorted_write\$files """
+        def file_count  = files[0][0].toLong()
+        def total_rows  = files[0][1].toLong()
+
+        assertTrue(file_count > 1L,  "Expected multiple files from multi-flush write, got ${file_count}")
+        assertEquals(10000L, total_rows)
+
+        sql """ DROP TABLE IF EXISTS test_iceberg_sorted_write """
+        sql """ SET iceberg_write_target_file_size_bytes = 0 """
+    }
+
+    String enabled = context.config.otherConfigs.get("enableHiveTest")
+    if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+        logger.info("disable Hive test.")
+        return
+    }
+
+    for (String hivePrefix : ["hive2", "hive3"]) {
+        setHivePrefix(hivePrefix)
+        try {
+            String hms_port      = context.config.otherConfigs.get(hivePrefix + "HmsPort")
+            String hdfs_port     = context.config.otherConfigs.get(hivePrefix + "HdfsPort")
+            String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+            String catalog_name  = "test_iceberg_sorted_write_${hivePrefix}"
+
+            sql """ DROP CATALOG IF EXISTS ${catalog_name} """
+            sql """
+                CREATE CATALOG IF NOT EXISTS ${catalog_name} PROPERTIES (
+                    'type'                 = 'iceberg',
+                    'iceberg.catalog.type' = 'hms',
+                    'hive.metastore.uris'  = 'thrift://${externalEnvIp}:${hms_port}',
+                    'fs.defaultFS'         = 'hdfs://${externalEnvIp}:${hdfs_port}',
+                    'use_meta_cache'       = 'true'
+                )
+            """
+
+            sql """ DROP DATABASE IF EXISTS `${catalog_name}`.`sorted_write_test` """
+            sql """ CREATE DATABASE `${catalog_name}`.`sorted_write_test` """
+            sql """ USE `${catalog_name}`.`sorted_write_test` """
+
+            sql """ SET enable_fallback_to_original_planner = false """
+
+            test_sorted_write_multi_flush(catalog_name)
+
+            sql """ DROP DATABASE IF EXISTS `${catalog_name}`.`sorted_write_test` """
+            sql """ DROP CATALOG IF EXISTS ${catalog_name} """
+        } finally {
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to