hudi-agent commented on code in PR #18701:
URL: https://github.com/apache/hudi/pull/18701#discussion_r3210349685


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/ParquetSplitRecordIterator.java:
##########
@@ -32,15 +32,37 @@
 public final class ParquetSplitRecordIterator implements 
ClosableIterator<RowData> {
   private final ParquetColumnarRowSplitReader reader;
 
+  // Cached end-of-stream signal once the underlying Hadoop input stream is 
closed externally
+  // (see hasNext() for the rationale). Avoids re-invoking the reader after 
we've already
+  // observed the close.
+  private boolean drained = false;
+
   public ParquetSplitRecordIterator(ParquetColumnarRowSplitReader reader) {
     this.reader = reader;
   }
 
   @Override
   public boolean hasNext() {
+    if (drained) {
+      return false;
+    }
     try {
       return !reader.reachedEnd();
     } catch (IOException e) {
+      // The underlying Hadoop FSDataInputStream can be closed externally 
during streaming
+      // source teardown: the SourceV2 SplitFetcher thread can close the 
stream after enqueueing
+      // a BatchRecords, then the mailbox thread polls and tries to read 
another row group on
+      // the now-closed stream. The well-known stable Hadoop signal for this is
+      // BufferedFSInputStream / FSInputChecker throwing IOException("Stream 
is closed!"). Since
+      // the streaming source runs with 
restart-strategy.maxNumberRestartAttempts=0, surfacing
+      // this as a fatal HoodieIOException permanently fails the job and turns
+      // ITTestHoodieDataSource#testStreamReadFromSpecifiedCommitWithChangelog 
into a flake.
+      // Treat it as end-of-stream and cache the state so subsequent hasNext() 
calls on the
+      // now-broken reader short-circuit instead of re-throwing on every poll.
+      if (isStreamClosedSignal(e)) {

Review Comment:
   🤖 This iterator is wired into batch readers too (`CopyOnWriteInputFormat`, 
`MergeOnReadInputFormat`, `HoodieRowDataParquetReader`), not just the SourceV2 
streaming path. In a batch read, an `IOException("Stream is closed!")` from a 
partially-read split would silently truncate the result instead of failing. 
Could the suppression be scoped to the SourceV2/CDC caller (e.g. a flag on the 
iterator, or done inside `HoodieCdcSplitReaderFunction`) so batch reads still 
surface the error? @danny0405 could you weigh in on whether this is the right 
layer for the fix?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/ParquetSplitRecordIterator.java:
##########
@@ -32,15 +32,37 @@
 public final class ParquetSplitRecordIterator implements 
ClosableIterator<RowData> {
   private final ParquetColumnarRowSplitReader reader;
 
+  // Cached end-of-stream signal once the underlying Hadoop input stream is 
closed externally
+  // (see hasNext() for the rationale). Avoids re-invoking the reader after 
we've already
+  // observed the close.
+  private boolean drained = false;
+
   public ParquetSplitRecordIterator(ParquetColumnarRowSplitReader reader) {
     this.reader = reader;
   }
 
   @Override
   public boolean hasNext() {
+    if (drained) {
+      return false;
+    }
     try {
       return !reader.reachedEnd();

Review Comment:
   🤖 If `reader.reachedEnd()` throws "Stream is closed!" mid-split (rather than 
at the boundary between the last consumed batch and the next), how do we know 
the remaining rows in the current already-buffered batch have actually been 
delivered to the consumer? The fix assumes the BatchRecords already-fetched 
buffer is complete, but `reachedEnd()` calls `ensureBatch()` which can throw 
both when refilling AND when the current batch index is past the last buffered 
row — is there a way to verify we haven't dropped the tail of an in-flight 
batch?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to