stevenzwu commented on code in PR #7109:
URL: https://github.com/apache/iceberg/pull/7109#discussion_r1142850082
##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java:
##########
@@ -291,7 +294,25 @@ public DataStream<RowData> build() {
       if (env.getMaxParallelism() > 0) {
         parallelism = Math.min(parallelism, env.getMaxParallelism());
       }
-      return env.createInput(format, typeInfo).setParallelism(parallelism);
+
+      DataStreamSource<RowData> source =

Review Comment:
This covers one scenario. There are two other scenarios. I am wondering if we should fix it at the lower level, in the Flink readers (avro, parquet, orc).

1) Using `FlinkInputFormat` directly, e.g. in `StreamingReaderOperator`.
```
private void processSplits() throws IOException {
  FlinkInputSplit split = splits.poll();
  if (split == null) {
    currentSplitState = SplitState.IDLE;
    return;
  }

  format.open(split);
  try {
    RowData nextElement = null;
    while (!format.reachedEnd()) {
      nextElement = format.nextRecord(nextElement);
      sourceContext.collect(nextElement);
    }
  } finally {
    currentSplitState = SplitState.IDLE;
    format.close();
  }

  // Re-schedule to process the next split.
  enqueueProcessSplits();
}
```

2) The new Flink FLIP-27 `IcebergSource`. Here is an example from `IcebergTableSource` that shows how users can construct the DataStream. We can fix it in `IcebergTableSource`, but we can't control users' code and make it add the filter to the `DataStream`.
```
private DataStreamSource<RowData> createFLIP27Stream(StreamExecutionEnvironment env) {
  SplitAssignerType assignerType =
      readableConfig.get(FlinkConfigOptions.TABLE_EXEC_SPLIT_ASSIGNER_TYPE);
  IcebergSource<RowData> source =
      IcebergSource.forRowData()
          .tableLoader(loader)
          .assignerFactory(assignerType.factory())
          .properties(properties)
          .project(getProjectedSchema())
          .limit(limit)
          .filters(filters)
          .flinkConfig(readableConfig)
          .build();
  DataStreamSource stream =
      env.fromSource(
          source,
          WatermarkStrategy.noWatermarks(),
          source.name(),
          TypeInformation.of(RowData.class));
  return stream;
}
```

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

---------------------------------------------------------------------
For additional commands, e-mail: issues-h...@iceberg.apache.org
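
Illustrative note on the review comment above: the suggestion to fix this "at the lower level" can be pictured as wrapping the record reader itself, so that every consumer (a `FlinkInputFormat` loop, a FLIP-27 source, or user-built `DataStream` code) only ever sees rows that already passed the filter. The following is a minimal sketch in plain Java, not Iceberg or Flink code. `FilteringIterator` is a hypothetical name, and the predicate stands in for whatever row filter the PR applies.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Predicate;

/**
 * Hypothetical reader-level wrapper (an assumption for illustration only):
 * rows that fail the predicate are dropped before any caller sees them.
 */
final class FilteringIterator<T> implements Iterator<T> {
  private final Iterator<T> delegate;
  private final Predicate<? super T> filter;
  private T nextRow;
  private boolean hasNextRow;

  FilteringIterator(Iterator<T> delegate, Predicate<? super T> filter) {
    this.delegate = delegate;
    this.filter = filter;
    advance();
  }

  // Pull from the underlying reader until a row matches or input is exhausted.
  private void advance() {
    hasNextRow = false;
    nextRow = null;
    while (delegate.hasNext()) {
      T candidate = delegate.next();
      if (filter.test(candidate)) {
        nextRow = candidate;
        hasNextRow = true;
        return;
      }
    }
  }

  @Override
  public boolean hasNext() {
    return hasNextRow;
  }

  @Override
  public T next() {
    if (!hasNextRow) {
      throw new NoSuchElementException();
    }
    T result = nextRow;
    advance();
    return result;
  }
}
```

With a wrapper of this kind placed inside the reader, a consuming loop such as the `processSplits()` example would need no change, which is the appeal of fixing the readers rather than each call site. Whether the real avro, parquet, and orc readers can be wrapped this cheaply is not something this sketch establishes.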