This is an automated email from the ASF dual-hosted git repository.

forwardxu pushed a commit to branch release-0.12.1
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit f2256ec94ccf7c6fc68b49ddd351f5b2aaa8fc35
Author: shaoxiong.zhan <shaoxiong0...@gmail.com>
AuthorDate: Tue Dec 6 20:15:49 2022 +0800

    exclude hudi-kafka-connect & add some APIs to support the FLIP-27 source
---
 .../apache/hudi/configuration/FlinkOptions.java    |  6 ++++
 .../org/apache/hudi/table/HoodieTableSource.java   | 42 +++++++++++++---------
 .../table/format/mor/MergeOnReadInputFormat.java   | 29 +++++++++++++++
 .../table/format/mor/MergeOnReadInputSplit.java    |  8 ++++-
 4 files changed, 68 insertions(+), 17 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index aa1e3297bd..df2c96c8a9 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -200,6 +200,12 @@ public class FlinkOptions extends HoodieConfig {
       .noDefaultValue()
       .withDescription("Parallelism of tasks that do actual read, default is the parallelism of the execution environment");
 
+  public static final ConfigOption<Integer> NUM_RECORDS_PER_BATCH = ConfigOptions
+      .key("num.records_per.batch")
+      .intType()
+      .defaultValue(10000)
+      .withDescription("Number of records to read per batch from a single input split");
+
   public static final ConfigOption<String> SOURCE_AVRO_SCHEMA_PATH = ConfigOptions
       .key("source.avro-schema.path")
       .stringType()
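
The new option is read back by MergeOnReadInputFormat via conf.get(FlinkOptions.NUM_RECORDS_PER_BATCH) (see the hunk further down). A minimal sketch of setting it on the Flink Configuration handed to the source; the value 2048 is only an illustrative choice, not a recommendation:

    import org.apache.flink.configuration.Configuration;
    import org.apache.hudi.configuration.FlinkOptions;

    final class BatchSizeExample {
      // Build the source configuration and cap how many records
      // MergeOnReadInputFormat#readBatch() collects per call.
      static Configuration sourceConf() {
        Configuration conf = new Configuration();
        conf.set(FlinkOptions.NUM_RECORDS_PER_BATCH, 2048);
        return conf;
      }
    }
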
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index 6fac5e4b88..ab270f89b0 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -115,8 +115,9 @@ public class HoodieTableSource implements
   private final transient HoodieTableMetaClient metaClient;
   private final long maxCompactionMemoryInBytes;
 
-  private final ResolvedSchema schema;
   private final RowType tableRowType;
+  private final String[] schemaFieldNames;
+  private final DataType[] schemaTypes;
   private final Path path;
   private final List<String> partitionKeys;
   private final String defaultPartName;
@@ -135,34 +136,43 @@ public class HoodieTableSource implements
       List<String> partitionKeys,
       String defaultPartName,
       Configuration conf) {
-    this(schema, path, partitionKeys, defaultPartName, conf, null, null, null, null);
+
+    this(schema.getColumnNames().toArray(new String[0]),
+        schema.getColumnDataTypes().toArray(new DataType[0]),
+        (RowType) schema.toPhysicalRowDataType().notNull().getLogicalType(),
+        path, partitionKeys, defaultPartName, conf, null, null, null, null, null);
   }
 
   public HoodieTableSource(
-      ResolvedSchema schema,
+      String[] schemaFieldNames,
+      DataType[] schemaTypes,
+      RowType rowType,
       Path path,
       List<String> partitionKeys,
       String defaultPartName,
       Configuration conf,
+      @Nullable FileIndex fileIndex,
       @Nullable List<Map<String, String>> requiredPartitions,
       @Nullable int[] requiredPos,
       @Nullable Long limit,
-      @Nullable List<ResolvedExpression> filters) {
-    this.schema = schema;
-    this.tableRowType = (RowType) schema.toPhysicalRowDataType().notNull().getLogicalType();
+      @Nullable HoodieTableMetaClient metaClient) {
+    this.schemaFieldNames = schemaFieldNames;
+    this.schemaTypes = schemaTypes;
+    this.tableRowType = rowType;
     this.path = path;
     this.partitionKeys = partitionKeys;
     this.defaultPartName = defaultPartName;
     this.conf = conf;
+    this.fileIndex = fileIndex == null
+        ? FileIndex.instance(this.path, this.conf, this.tableRowType)
+        : fileIndex;
     this.requiredPartitions = requiredPartitions;
     this.requiredPos = requiredPos == null
         ? IntStream.range(0, this.tableRowType.getFieldCount()).toArray()
         : requiredPos;
     this.limit = limit == null ? NO_LIMIT_CONSTANT : limit;
-    this.filters = filters == null ? Collections.emptyList() : filters;
     this.hadoopConf = HadoopConfigurations.getHadoopConf(conf);
-    this.metaClient = StreamerUtil.metaClientForReader(conf, hadoopConf);
-    this.fileIndex = FileIndex.instance(this.path, this.conf, this.tableRowType);
+    this.metaClient = metaClient == null ? StreamerUtil.metaClientForReader(conf, hadoopConf) : metaClient;
     this.maxCompactionMemoryInBytes = StreamerUtil.getMaxCompactionMemoryInBytes(conf);
   }
 
@@ -210,8 +220,8 @@ public class HoodieTableSource implements
 
   @Override
   public DynamicTableSource copy() {
-    return new HoodieTableSource(schema, path, partitionKeys, defaultPartName,
-        conf, requiredPartitions, requiredPos, limit, filters);
+    return new HoodieTableSource(schemaFieldNames, schemaTypes, tableRowType, path, partitionKeys, defaultPartName,
+        conf, fileIndex, requiredPartitions, requiredPos, limit, metaClient);
   }
 
   @Override
@@ -256,8 +266,8 @@ public class HoodieTableSource implements
   }
 
   private DataType getProducedDataType() {
-    String[] schemaFieldNames = this.schema.getColumnNames().toArray(new String[0]);
-    DataType[] schemaTypes = this.schema.getColumnDataTypes().toArray(new DataType[0]);
+    String[] schemaFieldNames = this.schemaFieldNames;
+    DataType[] schemaTypes = this.schemaTypes;
 
     return DataTypes.ROW(Arrays.stream(this.requiredPos)
             .mapToObj(i -> DataTypes.FIELD(schemaFieldNames[i], schemaTypes[i]))
@@ -266,7 +276,7 @@ public class HoodieTableSource implements
   }
 
   private String getSourceOperatorName(String operatorName) {
-    String[] schemaFieldNames = this.schema.getColumnNames().toArray(new String[0]);
+    String[] schemaFieldNames = this.schemaFieldNames;
     List<String> fields = Arrays.stream(this.requiredPos)
         .mapToObj(i -> schemaFieldNames[i])
         .collect(Collectors.toList());
@@ -450,8 +460,8 @@ public class HoodieTableSource implements
 
     return new CopyOnWriteInputFormat(
         FilePathUtils.toFlinkPaths(paths),
-        this.schema.getColumnNames().toArray(new String[0]),
-        this.schema.getColumnDataTypes().toArray(new DataType[0]),
+        this.schemaFieldNames,
+        this.schemaTypes,
         this.requiredPos,
         this.conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME),
         this.limit == NO_LIMIT_CONSTANT ? Long.MAX_VALUE : this.limit, // ParquetInputFormat always uses the limit value
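
The source now carries the flattened schema pieces (field names, data types, physical RowType) instead of a ResolvedSchema, and copy() threads the FileIndex and HoodieTableMetaClient through so a copied source does not rebuild them. The sketch below merely restates the conversion that the new delegating constructor performs, for code that needs to call the long constructor directly; it introduces nothing beyond what the diff shows:

    import org.apache.flink.table.catalog.ResolvedSchema;
    import org.apache.flink.table.types.DataType;
    import org.apache.flink.table.types.logical.RowType;

    final class SchemaFlattening {
      // Derive the three values that replace the single ResolvedSchema field.
      static RowType flatten(ResolvedSchema schema) {
        String[] schemaFieldNames = schema.getColumnNames().toArray(new String[0]);
        DataType[] schemaTypes = schema.getColumnDataTypes().toArray(new DataType[0]);
        // schemaFieldNames and schemaTypes go to the long constructor together
        // with the physical row type returned here.
        return (RowType) schema.toPhysicalRowDataType().notNull().getLogicalType();
      }
    }
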
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
index c9b6561bde..a0026d54ca 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
@@ -59,6 +59,7 @@ import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.types.RowKind;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -132,6 +133,8 @@ public class MergeOnReadInputFormat
    */
   private boolean emitDelete;
 
+  private final int numRecordsPerBatch;
+
   /**
    * Flag saying whether the input format has been closed.
    */
@@ -153,6 +156,7 @@ public class MergeOnReadInputFormat
     // because we need to
     this.requiredPos = tableState.getRequiredPositions();
     this.limit = limit;
+    this.numRecordsPerBatch = conf.get(FlinkOptions.NUM_RECORDS_PER_BATCH);
     this.emitDelete = emitDelete;
   }
 
@@ -408,6 +412,31 @@ public class MergeOnReadInputFormat
     };
   }
 
+  public Iterator<RowData> readBatch() throws IOException {
+    List<RowData> result = new ArrayList<>(this.numRecordsPerBatch);
+    int remaining = this.numRecordsPerBatch;
+    RowData next = null;
+    while (!this.isClosed() && remaining-- > 0) {
+      if (!this.reachedEnd()) {
+        next = nextRecord(null);
+        result.add(next);
+      } else {
+        close();
+        break;
+      }
+    }
+
+    if (result.isEmpty()) {
+      return null;
+    }
+    return result.iterator();
+  }
+
+  public MergeOnReadInputFormat copy() {
+    return new MergeOnReadInputFormat(this.conf, this.tableState, this.fieldTypes,
+        this.defaultPartName, this.limit, this.emitDelete);
+  }
+
   private ClosableIterator<RowData> getUnMergedLogFileIterator(MergeOnReadInputSplit split) {
     final Schema tableSchema = new Schema.Parser().parse(tableState.getAvroSchema());
     final Schema requiredSchema = new Schema.Parser().parse(tableState.getRequiredAvroSchema());
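
readBatch() and copy() give a FLIP-27 style reader a pull-based way to drain a split. The loop below is a hypothetical consumption sketch, not part of this commit; emit stands in for whatever forwards records to the reader output:

    import java.io.IOException;
    import java.util.Iterator;
    import java.util.function.Consumer;

    import org.apache.flink.table.data.RowData;
    import org.apache.hudi.table.format.mor.MergeOnReadInputFormat;
    import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;

    final class SplitReadSketch {
      static void readSplit(MergeOnReadInputFormat sharedFormat,
                            MergeOnReadInputSplit split,
                            Consumer<RowData> emit) throws IOException {
        MergeOnReadInputFormat format = sharedFormat.copy(); // fresh per-split state
        format.open(split);
        Iterator<RowData> batch;
        // readBatch() returns null once the split is exhausted and the format has closed itself.
        while ((batch = format.readBatch()) != null) {
          batch.forEachRemaining(emit);
        }
      }
    }
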
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputSplit.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputSplit.java
index cde646e41f..f3bc3361a6 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputSplit.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputSplit.java
@@ -21,6 +21,7 @@ package org.apache.hudi.table.format.mor;
 import org.apache.hudi.common.table.log.InstantRange;
 import org.apache.hudi.common.util.Option;
 
+import org.apache.flink.api.connector.source.SourceSplit;
 import org.apache.flink.core.io.InputSplit;
 
 import javax.annotation.Nullable;
@@ -30,7 +31,7 @@ import java.util.List;
 /**
  * Represents an input split of source, actually a data bucket.
  */
-public class MergeOnReadInputSplit implements InputSplit {
+public class MergeOnReadInputSplit implements InputSplit, SourceSplit {
   private static final long serialVersionUID = 1L;
 
   private static final long NUM_NO_CONSUMPTION = 0L;
@@ -78,6 +79,11 @@ public class MergeOnReadInputSplit implements InputSplit {
     this.fileId = fileId;
   }
 
+  @Override
+  public String splitId() {
+    return getFileId();
+  }
+
   public Option<String> getBasePath() {
     return basePath;
   }
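
Implementing SourceSplit with splitId() backed by the file id gives FLIP-27 components a stable string key for per-split bookkeeping. A small hypothetical example (SplitRegistry is not part of this commit):

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;

    final class SplitRegistry {
      private final Map<String, MergeOnReadInputSplit> inFlight = new HashMap<>();

      void register(MergeOnReadInputSplit split) {
        // splitId() delegates to getFileId(), so there is one entry per data bucket.
        inFlight.put(split.splitId(), split);
      }

      MergeOnReadInputSplit complete(String splitId) {
        return inFlight.remove(splitId);
      }
    }
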
