HuangZhenQiu commented on code in PR #18022:
URL: https://github.com/apache/hudi/pull/18022#discussion_r2744297006


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java:
##########
@@ -221,32 +231,117 @@ public DataStream<RowData> 
produceDataStream(StreamExecutionEnvironment execEnv)
         TypeInformation<RowData> typeInfo =
             (TypeInformation<RowData>) 
TypeInfoDataTypeConverter.fromDataTypeToTypeInfo(getProducedDataType());
         OptionsInference.setupSourceTasks(conf, execEnv.getParallelism());
-        if (conf.get(FlinkOptions.READ_AS_STREAMING)) {
-          StreamReadMonitoringFunction monitoringFunction = new 
StreamReadMonitoringFunction(
-              conf, FilePathUtils.toFlinkPath(path), tableRowType, 
maxCompactionMemoryInBytes, partitionPruner);
-          InputFormat<RowData, ?> inputFormat = getInputFormat(true);
-          OneInputStreamOperatorFactory<MergeOnReadInputSplit, RowData> 
factory = StreamReadOperator.factory((MergeOnReadInputFormat) inputFormat);
-          SingleOutputStreamOperator<MergeOnReadInputSplit> 
monitorOperatorStream = execEnv.addSource(monitoringFunction, 
getSourceOperatorName("split_monitor"))
-              .uid(Pipelines.opUID("split_monitor", conf))
-              .setParallelism(1)
-              .setMaxParallelism(1);
-
-          DataStream<MergeOnReadInputSplit> sourceWithKey = 
addFileDistributionStrategy(monitorOperatorStream);
-
-          SingleOutputStreamOperator<RowData> streamReadSource = sourceWithKey
-              .transform("split_reader", typeInfo, factory)
-              .uid(Pipelines.opUID("split_reader", conf))
-              .setParallelism(conf.get(FlinkOptions.READ_TASKS));
-          return new DataStreamSource<>(streamReadSource);
+
+        if (conf.get(FlinkOptions.READ_SOURCE_V2_ENABLED)) {
+          return produceNewSourceDataStream(execEnv);
         } else {
-          InputFormatSourceFunctionAdapter<RowData> func = new 
InputFormatSourceFunctionAdapter<>(getInputFormat(), typeInfo);
-          DataStreamSource<RowData> source = execEnv.addSource(func, 
asSummaryString(), typeInfo);
-          return 
source.name(getSourceOperatorName("bounded_source")).setParallelism(conf.get(FlinkOptions.READ_TASKS));
+          return produceLegacySourceDataStream(execEnv, typeInfo);
         }
       }
     };
   }
 
+  /**
+   * Produces a DataStream using the new FLIP-27 HoodieSource.
+   *
+   * @param execEnv the stream execution environment
+   * @return the configured DataStream
+   */
+  private DataStream<RowData> 
produceNewSourceDataStream(StreamExecutionEnvironment execEnv) {
+    HoodieSource<RowData> hoodieSource = createHoodieSource();
+    DataStreamSource<RowData> source = execEnv.fromSource(
+        hoodieSource, WatermarkStrategy.noWatermarks(), "hudi_source");
+    return 
source.name(getSourceOperatorName("hudi_source")).uid(Pipelines.opUID("hudi_source",
 conf)).setParallelism(conf.get(FlinkOptions.READ_TASKS));
+  }
+
+  /**
+   * Produces a DataStream using the legacy source implementation.
+   *
+   * @param execEnv the stream execution environment
+   * @param typeInfo type information for RowData
+   * @return the configured DataStream
+   */
+  private DataStream<RowData> produceLegacySourceDataStream(
+      StreamExecutionEnvironment execEnv,
+      TypeInformation<RowData> typeInfo) {
+    if (conf.get(FlinkOptions.READ_AS_STREAMING)) {
+      StreamReadMonitoringFunction monitoringFunction = new 
StreamReadMonitoringFunction(
+          conf, FilePathUtils.toFlinkPath(path), tableRowType, 
maxCompactionMemoryInBytes, partitionPruner);
+      InputFormat<RowData, ?> inputFormat = getInputFormat(true);
+      OneInputStreamOperatorFactory<MergeOnReadInputSplit, RowData> factory = 
StreamReadOperator.factory((MergeOnReadInputFormat) inputFormat);
+      SingleOutputStreamOperator<MergeOnReadInputSplit> monitorOperatorStream 
= execEnv.addSource(monitoringFunction, getSourceOperatorName("split_monitor"))
+          .uid(Pipelines.opUID("split_monitor", conf))
+          .setParallelism(1)
+          .setMaxParallelism(1);
+
+      DataStream<MergeOnReadInputSplit> sourceWithKey = 
addFileDistributionStrategy(monitorOperatorStream);

Review Comment:
   It can't be handled in this PR. Will think about the improvement as a follow-up.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to