cshuo commented on code in PR #18022:
URL: https://github.com/apache/hudi/pull/18022#discussion_r2739665834
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java:
##########
@@ -440,6 +440,12 @@ public class FlinkOptions extends HoodieConfig {
+ "the avg read splits number per-second would be
'read.splits.limit'/'read.streaming.check-interval', by "
+ "default no limit");
+ public static final ConfigOption<Boolean> READ_NEW_SOURCE = ConfigOptions
+ .key("read.new.source")
Review Comment:
Consider renaming this option to `read.new-source.enabled` or `read.flip27-source.enabled`, so the boolean flag follows the `*.enabled` naming convention used by other switch-style options.
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java:
##########
@@ -221,32 +231,117 @@ public DataStream<RowData>
produceDataStream(StreamExecutionEnvironment execEnv)
TypeInformation<RowData> typeInfo =
(TypeInformation<RowData>)
TypeInfoDataTypeConverter.fromDataTypeToTypeInfo(getProducedDataType());
OptionsInference.setupSourceTasks(conf, execEnv.getParallelism());
- if (conf.get(FlinkOptions.READ_AS_STREAMING)) {
- StreamReadMonitoringFunction monitoringFunction = new
StreamReadMonitoringFunction(
- conf, FilePathUtils.toFlinkPath(path), tableRowType,
maxCompactionMemoryInBytes, partitionPruner);
- InputFormat<RowData, ?> inputFormat = getInputFormat(true);
- OneInputStreamOperatorFactory<MergeOnReadInputSplit, RowData>
factory = StreamReadOperator.factory((MergeOnReadInputFormat) inputFormat);
- SingleOutputStreamOperator<MergeOnReadInputSplit>
monitorOperatorStream = execEnv.addSource(monitoringFunction,
getSourceOperatorName("split_monitor"))
- .uid(Pipelines.opUID("split_monitor", conf))
- .setParallelism(1)
- .setMaxParallelism(1);
-
- DataStream<MergeOnReadInputSplit> sourceWithKey =
addFileDistributionStrategy(monitorOperatorStream);
-
- SingleOutputStreamOperator<RowData> streamReadSource = sourceWithKey
- .transform("split_reader", typeInfo, factory)
- .uid(Pipelines.opUID("split_reader", conf))
- .setParallelism(conf.get(FlinkOptions.READ_TASKS));
- return new DataStreamSource<>(streamReadSource);
+
+ if (conf.get(FlinkOptions.READ_NEW_SOURCE)) {
+ return produceNewSourceDataStream(execEnv);
} else {
- InputFormatSourceFunctionAdapter<RowData> func = new
InputFormatSourceFunctionAdapter<>(getInputFormat(), typeInfo);
- DataStreamSource<RowData> source = execEnv.addSource(func,
asSummaryString(), typeInfo);
- return
source.name(getSourceOperatorName("bounded_source")).setParallelism(conf.get(FlinkOptions.READ_TASKS));
+ return produceLegacySourceDataStream(execEnv, typeInfo);
}
}
};
}
+ /**
+ * Produces a DataStream using the new FLIP-27 HoodieSource.
+ *
+ * @param execEnv the stream execution environment
+ * @return the configured DataStream
+ */
+ private DataStream<RowData>
produceNewSourceDataStream(StreamExecutionEnvironment execEnv) {
+ HoodieSource<RowData> hoodieSource = createHoodieSource();
+ DataStreamSource<RowData> source = execEnv.fromSource(
+ hoodieSource, WatermarkStrategy.noWatermarks(), "hudi_source");
+ return
source.name(getSourceOperatorName("hudi_source")).setParallelism(conf.get(FlinkOptions.READ_TASKS));
+ }
+
+ /**
+ * Produces a DataStream using the legacy source implementation.
+ *
+ * @param execEnv the stream execution environment
+ * @param typeInfo type information for RowData
+ * @return the configured DataStream
+ */
+ private DataStream<RowData> produceLegacySourceDataStream(
+ StreamExecutionEnvironment execEnv,
+ TypeInformation<RowData> typeInfo) {
+ if (conf.get(FlinkOptions.READ_AS_STREAMING)) {
+ StreamReadMonitoringFunction monitoringFunction = new
StreamReadMonitoringFunction(
+ conf, FilePathUtils.toFlinkPath(path), tableRowType,
maxCompactionMemoryInBytes, partitionPruner);
+ InputFormat<RowData, ?> inputFormat = getInputFormat(true);
+ OneInputStreamOperatorFactory<MergeOnReadInputSplit, RowData> factory =
StreamReadOperator.factory((MergeOnReadInputFormat) inputFormat);
+ SingleOutputStreamOperator<MergeOnReadInputSplit> monitorOperatorStream
= execEnv.addSource(monitoringFunction, getSourceOperatorName("split_monitor"))
+ .uid(Pipelines.opUID("split_monitor", conf))
+ .setParallelism(1)
+ .setMaxParallelism(1);
+
+ DataStream<MergeOnReadInputSplit> sourceWithKey =
addFileDistributionStrategy(monitorOperatorStream);
+
+ SingleOutputStreamOperator<RowData> streamReadSource = sourceWithKey
+ .transform("split_reader", typeInfo, factory)
+ .uid(Pipelines.opUID("split_reader", conf))
+ .setParallelism(conf.get(FlinkOptions.READ_TASKS));
+ return new DataStreamSource<>(streamReadSource);
+ } else {
+ InputFormatSourceFunctionAdapter<RowData> func = new
InputFormatSourceFunctionAdapter<>(getInputFormat(), typeInfo);
+ DataStreamSource<RowData> source = execEnv.addSource(func,
asSummaryString(), typeInfo);
+ return
source.name(getSourceOperatorName("bounded_source")).setParallelism(conf.get(FlinkOptions.READ_TASKS));
+ }
+ }
+
+ /**
+ * Creates a new Flink FLIP-27 HoodieSource for reading data from the Hudi
table.
+ *
+ * @return the configured HoodieSource instance
+ */
+ private HoodieSource<RowData> createHoodieSource() {
+ ValidationUtils.checkState(metaClient != null, "MetaClient must be
initialized before creating HoodieSource");
+ ValidationUtils.checkState(hadoopConf != null, "Hadoop configuration must
be initialized");
+ ValidationUtils.checkState(conf != null, "Configuration must be
initialized");
+
+ HoodieSchema tableSchema = (this.metaClient == null || !tableDataExists())
? inferSchemaFromDdl() : getTableSchema();
Review Comment:
The `this.metaClient == null` condition is redundant here — the non-null check was already performed above via `ValidationUtils.checkState`.
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java:
##########
@@ -221,32 +231,117 @@ public DataStream<RowData>
produceDataStream(StreamExecutionEnvironment execEnv)
TypeInformation<RowData> typeInfo =
(TypeInformation<RowData>)
TypeInfoDataTypeConverter.fromDataTypeToTypeInfo(getProducedDataType());
OptionsInference.setupSourceTasks(conf, execEnv.getParallelism());
- if (conf.get(FlinkOptions.READ_AS_STREAMING)) {
- StreamReadMonitoringFunction monitoringFunction = new
StreamReadMonitoringFunction(
- conf, FilePathUtils.toFlinkPath(path), tableRowType,
maxCompactionMemoryInBytes, partitionPruner);
- InputFormat<RowData, ?> inputFormat = getInputFormat(true);
- OneInputStreamOperatorFactory<MergeOnReadInputSplit, RowData>
factory = StreamReadOperator.factory((MergeOnReadInputFormat) inputFormat);
- SingleOutputStreamOperator<MergeOnReadInputSplit>
monitorOperatorStream = execEnv.addSource(monitoringFunction,
getSourceOperatorName("split_monitor"))
- .uid(Pipelines.opUID("split_monitor", conf))
- .setParallelism(1)
- .setMaxParallelism(1);
-
- DataStream<MergeOnReadInputSplit> sourceWithKey =
addFileDistributionStrategy(monitorOperatorStream);
-
- SingleOutputStreamOperator<RowData> streamReadSource = sourceWithKey
- .transform("split_reader", typeInfo, factory)
- .uid(Pipelines.opUID("split_reader", conf))
- .setParallelism(conf.get(FlinkOptions.READ_TASKS));
- return new DataStreamSource<>(streamReadSource);
+
+ if (conf.get(FlinkOptions.READ_NEW_SOURCE)) {
+ return produceNewSourceDataStream(execEnv);
} else {
- InputFormatSourceFunctionAdapter<RowData> func = new
InputFormatSourceFunctionAdapter<>(getInputFormat(), typeInfo);
- DataStreamSource<RowData> source = execEnv.addSource(func,
asSummaryString(), typeInfo);
- return
source.name(getSourceOperatorName("bounded_source")).setParallelism(conf.get(FlinkOptions.READ_TASKS));
+ return produceLegacySourceDataStream(execEnv, typeInfo);
}
}
};
}
+ /**
+ * Produces a DataStream using the new FLIP-27 HoodieSource.
+ *
+ * @param execEnv the stream execution environment
+ * @return the configured DataStream
+ */
+ private DataStream<RowData>
produceNewSourceDataStream(StreamExecutionEnvironment execEnv) {
+ HoodieSource<RowData> hoodieSource = createHoodieSource();
+ DataStreamSource<RowData> source = execEnv.fromSource(
+ hoodieSource, WatermarkStrategy.noWatermarks(), "hudi_source");
+ return
source.name(getSourceOperatorName("hudi_source")).setParallelism(conf.get(FlinkOptions.READ_TASKS));
Review Comment:
Please set an explicit uid on the new FLIP-27 source operator (e.g. via `Pipelines.opUID`), the same way the legacy source path does, so state is restorable across job restarts.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]