Hi all,

I’m converting several batch Flink workflows to streaming, with bounded sources.

Some of our sources are reading Hadoop sequence files via 
StreamExecutionEnvironment.createInput(HadoopInputFormat).

The problem is that StreamGraphGenerator.existsUnboundedSource is returning 
true, because the LegacySourceTransformation for this source says it’s 
CONTINUOUS_UNBOUNDED. So the workflow fails to run, because I’ve set the 
execution mode to batch.

The root cause is that StreamExecutionEnvironment.createInput() checks if the 
input format extends FileInputFormat, and only sets up a bounded source if it 
does. HadoopInputFormat doesn’t extend FileInputFormat, so boundedness gets set 
to CONTINUOUS_UNBOUNDED, which is wrong.
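
To make the behaviour concrete, here is a minimal, self-contained sketch of the logic as I understand it. It is not Flink's actual code: the class BoundednessSketch, its nested stand-in types, and the methods boundednessFor() and canRunInBatchMode() are hypothetical and only borrow the Flink names to model the class check and the batch-mode validation described above.

```java
// Simplified model of the behaviour described in this message.
// None of these types are Flink's real classes; they only mirror the names.
class BoundednessSketch {

    enum Boundedness { BOUNDED, CONTINUOUS_UNBOUNDED }

    // Stand-ins for the input format hierarchy.
    interface InputFormat {}
    static class FileInputFormat implements InputFormat {}
    // Like the Hadoop wrapper, this stand-in does not extend FileInputFormat.
    static class HadoopInputFormat implements InputFormat {}

    // Models the class check: only FileInputFormat subclasses count as bounded.
    static Boundedness boundednessFor(InputFormat format) {
        return (format instanceof FileInputFormat)
                ? Boundedness.BOUNDED
                : Boundedness.CONTINUOUS_UNBOUNDED;
    }

    // Models the batch-mode validation: a single unbounded source rejects the job.
    static boolean canRunInBatchMode(InputFormat... sources) {
        for (InputFormat source : sources) {
            if (boundednessFor(source) == Boundedness.CONTINUOUS_UNBOUNDED) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(boundednessFor(new FileInputFormat()));
        System.out.println(boundednessFor(new HadoopInputFormat()));
        System.out.println(canRunInBatchMode(new FileInputFormat(), new HadoopInputFormat()));
    }
}
```

In this model the Hadoop-style format is classified as unbounded purely because of its position in the class hierarchy, even though the data it reads is finite, and that one classification is enough to make the whole job ineligible for batch mode.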

This looks like a bug in StreamExecutionEnvironment.createInput(), though I’m not 
sure how best to fix it. Relying on class checks feels brittle.

Regards,

— Ken

--------------------------
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch


