codope commented on code in PR #7238:
URL: https://github.com/apache/hudi/pull/7238#discussion_r1037838713


##########
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java:
##########
@@ -149,6 +153,7 @@ public void close() throws IOException {
 
   @Override
   public float getProgress() throws IOException {
-    return Math.min(parquetReader.getProgress(), 
logRecordScanner.getProgress());
+    // TODO fix to reflect scanner progress

Review Comment:
   Why would we need that for the "unmerged" record reader?



##########
packaging/hudi-flink-bundle/pom.xml:
##########
@@ -107,6 +107,7 @@
                   
<include>com.fasterxml.jackson.core:jackson-databind</include>
                   <include>com.fasterxml.jackson.core:jackson-core</include>
 
+                  <include>com.lmax:disruptor</include>

Review Comment:
   Should it be relocated as well?



##########
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java:
##########
@@ -76,42 +72,50 @@ public RealtimeUnmergedRecordReader(RealtimeSplit split, 
JobConf job,
     this.parquetReader = new SafeParquetRecordReaderWrapper(realReader);
     // Iterator for consuming records from parquet file
     this.parquetRecordsIterator = new 
RecordReaderValueIterator<>(this.parquetReader);
+
+    HoodieUnMergedLogRecordScanner.Builder scannerBuilder =
+        HoodieUnMergedLogRecordScanner.newBuilder()
+          .withFileSystem(FSUtils.getFs(split.getPath().toString(), 
this.jobConf))
+          .withBasePath(split.getBasePath())
+          .withLogFilePaths(split.getDeltaLogPaths())
+          .withReaderSchema(getReaderSchema())
+          .withLatestInstantTime(split.getMaxCommitTime())
+          
.withReadBlocksLazily(Boolean.parseBoolean(this.jobConf.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP,
 HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED)))
+          .withReverseReader(false)
+          
.withBufferSize(this.jobConf.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP,
 HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE));
+
     this.executor = new BoundedInMemoryExecutor<>(
-        
HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes(jobConf), 
getParallelProducers(),
+        
HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes(jobConf), 
getParallelProducers(scannerBuilder),
         Option.empty(), Function.identity(), new DefaultSizeEstimator<>(), 
Functions.noop());
     // Consumer of this record reader
-    this.iterator = this.executor.getQueue().iterator();
-    this.logRecordScanner = HoodieUnMergedLogRecordScanner.newBuilder()
-        .withFileSystem(FSUtils.getFs(split.getPath().toString(), 
this.jobConf))
-        .withBasePath(split.getBasePath())
-        .withLogFilePaths(split.getDeltaLogPaths())
-        .withReaderSchema(getReaderSchema())
-        .withLatestInstantTime(split.getMaxCommitTime())
-        
.withReadBlocksLazily(Boolean.parseBoolean(this.jobConf.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP,
 HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED)))
-        .withReverseReader(false)
-        
.withBufferSize(this.jobConf.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP,
 HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE))
-        .withLogRecordScannerCallback(record -> {
-          // convert Hoodie log record to Hadoop AvroWritable and buffer
-          GenericRecord rec = (GenericRecord) 
record.getData().getInsertValue(getReaderSchema(), payloadProps).get();
-          ArrayWritable aWritable = (ArrayWritable) 
HoodieRealtimeRecordReaderUtils.avroToArrayWritable(rec, getHiveSchema());
-          this.executor.getQueue().insertRecord(aWritable);
-        })
-        .build();
+    this.iterator = this.executor.getRecordIterator();
+
     // Start reading and buffering
-    this.executor.startProducers();
+    this.executor.startProducingAsync();
   }
 
   /**
    * Setup log and parquet reading in parallel. Both write to central buffer.
    */
-  private List<HoodieProducer<ArrayWritable>> getParallelProducers() {
-    List<HoodieProducer<ArrayWritable>> producers = new ArrayList<>();
-    producers.add(new FunctionBasedQueueProducer<>(buffer -> {
-      logRecordScanner.scan();
-      return null;
-    }));
-    producers.add(new IteratorBasedQueueProducer<>(parquetRecordsIterator));
-    return producers;
+  private List<HoodieProducer<ArrayWritable>> getParallelProducers(
+      HoodieUnMergedLogRecordScanner.Builder scannerBuilder
+  ) {
+    return Arrays.asList(
+        new FunctionBasedQueueProducer<>(queue -> {
+          HoodieUnMergedLogRecordScanner scanner =
+              scannerBuilder.withLogRecordScannerCallback(record -> {
+                    // convert Hoodie log record to Hadoop AvroWritable and 
buffer
+                    GenericRecord rec = (GenericRecord) 
record.getData().getInsertValue(getReaderSchema(), payloadProps).get();
+                    ArrayWritable aWritable = (ArrayWritable) 
HoodieRealtimeRecordReaderUtils.avroToArrayWritable(rec, getHiveSchema());
+                    queue.insertRecord(aWritable);
+                  })
+                  .build();
+          // Scan all the delta-log files, filling in the queue
+          scanner.scan();

Review Comment:
   Maybe assign the scanner progress here, so that it can be used in L157.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to