[ 
https://issues.apache.org/jira/browse/FLINK-27827?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17543800#comment-17543800
 ] 

Yun Gao edited comment on FLINK-27827 at 5/30/22 7:31 AM:
----------------------------------------------------------

Hi [~ahailu], sorry, I'm also not sure whether there were initially concerns 
about supporting explicit boundedness on legacy sources. As an alternative, 
though, a bounded legacy source could be created as in 
[https://github.com/apache/flink/blob/e7834d8b56a7b4b1c2674ab399032e21e76d9e63/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/BatchExecutionFileSinkITCase.java#L60]

without maintaining the patch. For an input format, the 
InputFormatSourceFunction could be used. 

In the long run, the community still hopes to migrate to the new source 
API and deprecate the legacy ones. 


was (Author: gaoyunhaii):
Hi [~ahailu], sorry, I'm also not sure whether there were initially concerns 
about supporting explicit boundedness on legacy sources. As an alternative, 
though, a bounded legacy source could be created as in 
[https://github.com/apache/flink/blob/e7834d8b56a7b4b1c2674ab399032e21e76d9e63/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/BatchExecutionFileSinkITCase.java#L60]

without maintaining the patch. For an input format, the 
InputFormatSourceFunction could be used. 

Might cc [~jqin] 

> StreamExecutionEnvironment method supporting explicit Boundedness
> -----------------------------------------------------------------
>
>                 Key: FLINK-27827
>                 URL: https://issues.apache.org/jira/browse/FLINK-27827
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / DataStream
>            Reporter: Andreas Hailu
>            Priority: Minor
>
> When creating a {{DataStreamSource}}, an explicitly bounded input is only 
> returned if the provided {{InputFormat}} is a {{FileInputFormat}}. 
> This results in runtime exceptions when trying to run applications in 
> Batch execution mode while using non-{{FileInputFormat}} inputs, e.g. Apache 
> Iceberg [1] or Flink's Hadoop MapReduce compatibility API [2].
> I understand there is a Data Source API [3] that supports specifying the 
> boundedness of an input, but that would require all connectors to revise 
> their APIs to leverage it, which would take some time.
> My organization is in the middle of migrating from the {{DataSet}} API to the 
> {{DataStream}} API, and we've found this to be a challenge, as nearly all 
> of our inputs are determined to be unbounded because we use {{InputFormat}}s 
> that are not {{FileInputFormat}}s.
> Our workaround was to apply a local patch to 
> {{StreamExecutionEnvironment}} that adds a method supporting explicitly 
> bounded inputs.
> As this helped us implement a Batch {{DataStream}} solution, perhaps this is 
> something that may be helpful for others?
>  
> [1] [https://iceberg.apache.org/docs/latest/flink/#reading-with-datastream]
> [2] [https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/dataset/hadoop_map_reduce/]
> [3] [https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/sources/#the-data-source-api]



