skywalker0618 opened a new issue, #18424:
URL: https://github.com/apache/hudi/issues/18424

   ### Bug Description
   
   We found that the Hudi 1.2 Flink append-only sink fails to flush data to GCS when the checkpoint barrier arrives. Please see the Logs section for the stack trace.
   
   I did some investigation, and it seems there is a bug in the way we close the disruptor thread. In the Hudi 1.2 Flink sink, we added a disruptor queue and thread to decouple the parquet write from the main Flink thread. The flow:
   
   <img width="704" height="306" alt="Image" 
src="https://github.com/user-attachments/assets/15da3972-43fc-4921-bd64-45be267f5091";
 />
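   As a rough illustration of this flow, here is a minimal standalone sketch with a `BlockingQueue` standing in for the disruptor ring buffer; the class and method names are hypothetical, not Hudi's actual API:

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.concurrent.ArrayBlockingQueue;
   import java.util.concurrent.BlockingQueue;

   // Hypothetical sketch: the main Flink task thread enqueues records and
   // returns fast; a dedicated "disruptor" thread drains the queue and does
   // the (slow) parquet write into an in-memory buffer.
   public class DecoupledSink {
       private static final String EOF = "__EOF__";            // shutdown marker
       private final BlockingQueue<String> ring = new ArrayBlockingQueue<>(1024);
       private final List<String> parquetBuffer = new ArrayList<>(); // stand-in for the 2nd (parquet) buffer
       private final Thread disruptorThread;

       public DecoupledSink() {
           disruptorThread = new Thread(() -> {
               try {
                   String rec;
                   while (!(rec = ring.take()).equals(EOF)) {
                       parquetBuffer.add(rec);                 // "write to parquet"
                   }
               } catch (InterruptedException ignored) { }
           });
           disruptorThread.start();
       }

       // Called from the main Flink thread; only blocks if the ring is full.
       public void emit(String record) throws InterruptedException {
           ring.put(record);
       }

       // Drain the queue AND terminate the writer thread (the part that
       // causes the problem described below).
       public List<String> close() throws InterruptedException {
           ring.put(EOF);
           disruptorThread.join();   // queue fully drained once the thread exits
           return parquetBuffer;
       }
   }
   ```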
   
   When the checkpoint barrier arrives, the snapshotState function 
([code](https://github.com/apache/hudi/blob/master/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctionWithDisruptorBufferSort.java#L141))
 is called, where it calls:
   1. flushDisruptor()
   2. super.snapshotState() 
   
   In step 1, flushDisruptor() calls disruptorQueue.close(), which drains the ring buffer AND kills the disruptor thread. For the parquet in-memory buffer (the 2nd buffer), the disruptor thread is the writer side and the GCS upload thread is the reader side. The reader thread checks every second whether the writer thread is still alive:
   ```java
   // java.io.PipedInputStream.read() - JDK standard library, line ~321
   while (in < 0) {                                     // pipe buffer empty, waiting for data
       if (writeSide != null && !writeSide.isAlive()) { // writeSide.isAlive() is false once the disruptor thread is dead
           throw new IOException("Pipe broken");        // the upload thread throws
       }
       try { wait(1000); } catch (InterruptedException e) { ... }
   }
   ```
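   This JDK behavior can be reproduced in isolation with plain piped streams, independent of Hudi's classes: write from a short-lived thread, let it die without closing the pipe, and the next blocked read eventually fails (typically with "Pipe broken"):

   ```java
   import java.io.IOException;
   import java.io.PipedInputStream;
   import java.io.PipedOutputStream;

   public class PipeBrokenDemo {
       // Returns the IOException message the reader sees once the writer
       // thread has died without closing the pipe.
       static String readAfterWriterDies() throws Exception {
           PipedOutputStream out = new PipedOutputStream();
           PipedInputStream in = new PipedInputStream(out);

           // Stand-in for the disruptor thread: write one byte, stay alive
           // briefly, then terminate WITHOUT closing the stream.
           Thread writer = new Thread(() -> {
               try {
                   out.write(42);
                   Thread.sleep(500);
               } catch (Exception ignored) { }
           });
           writer.start();

           in.read();       // data written while the writer was alive reads fine
           try {
               in.read();   // blocks; the writer dies while we wait -> IOException
               return null;
           } catch (IOException e) {
               return e.getMessage();
           }
       }

       public static void main(String[] args) throws Exception {
           System.out.println("reader saw: " + readAfterWriterDies());
       }
   }
   ```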
   
   So when the disruptor thread is terminated, the GCS upload thread detects that the write side is no longer alive and errors out as well.
   Hence in step 2, when super.snapshotState() tries to flush the parquet in-memory buffer to GCS, the job errors out because the upload thread is already dead.
   
   To fix this, we could change flushDisruptor() NOT to call disruptorQueue.close() directly, but instead to spin-wait until the disruptor buffer is fully consumed. In other words, keep the "spin-wait" part but drop the "terminate thread" part.
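   A minimal sketch of that idea, with atomic counters standing in for the disruptor's publish cursor and consumer sequence (all names here are hypothetical, not the actual Hudi or LMAX Disruptor API):

   ```java
   import java.util.concurrent.atomic.AtomicLong;

   // Hypothetical replacement for disruptorQueue.close(): wait until the
   // consumer has caught up with everything published, but leave the
   // consumer thread alive so the piped write side stays healthy.
   public class DrainWait {
       private final AtomicLong published = new AtomicLong(-1); // stand-in for the ring buffer cursor
       private final AtomicLong consumed = new AtomicLong(-1);  // stand-in for the consumer sequence

       public void publish()      { published.incrementAndGet(); }
       public void markConsumed() { consumed.incrementAndGet(); }

       // Spin-wait until every published event has been consumed;
       // crucially, no thread is terminated here.
       public void awaitDrained() throws InterruptedException {
           while (consumed.get() < published.get()) {
               Thread.sleep(1);   // or Thread.onSpinWait() / LockSupport.parkNanos
           }
       }
   }
   ```

   After awaitDrained() returns, super.snapshotState() could flush the parquet buffer while the disruptor thread is still alive; the thread would only be terminated in close()/dispose of the operator.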
   
   Does this make sense?
   
   ### Environment
   
   **Hudi version:** 1.2
   **Query engine:** Flink
   **Relevant configs:**
   
   
   ### Logs and Stack Trace
   
   ```
   Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 2 for operator hoodie_append_write: default_database.kafka_hp_storeindex_query_nodedup (188/512)#0. Failure reason: Checkpoint was declined.
       at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:269)
       at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:173)
       at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:336)
       at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.checkpointStreamOperator(RegularOperatorChain.java:228)
       at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.buildOperatorSnapshotFutures(RegularOperatorChain.java:213)
       at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.snapshotState(RegularOperatorChain.java:192)
       at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:718)
       at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:351)
       at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$15(StreamTask.java:1318)
       at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
       at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1306)
       at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1263)
       ... 16 common frames omitted
   Caused by: org.apache.hudi.exception.HoodieException: Error collect the write status for task [187]
       at org.apache.hudi.sink.bulk.BulkInsertWriterHelper.getWriteStatuses(BulkInsertWriterHelper.java:204)
       at org.apache.hudi.sink.append.AppendWriteFunction.flushData(AppendWriteFunction.java:145)
       at org.apache.hudi.sink.append.AppendWriteFunction.snapshotState(AppendWriteFunction.java:97)
       at org.apache.hudi.sink.append.AppendWriteFunctionWithDisruptorBufferSort.snapshotState(AppendWriteFunctionWithDisruptorBufferSort.java:147)
       at org.apache.hudi.sink.common.AbstractStreamWriteFunction.snapshotState(AbstractStreamWriteFunction.java:181)
       at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
       at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
       at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:88)
       at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:222)
       ... 27 common frames omitted
   Caused by: java.io.IOException: Upload failed for 'gs://uber-staging-kbb9a/zyf5m/team/flink_ingestion_test/kafka_hp_storeindex_query_nodedup/2026/03/27/aef1bbf1-a85b-4395-a350-366deaeac34c-0_187-512-0_20260327175848521.parquet'. details=java.io.IOException: Pipe broken
   ```

