Arvid, Thanks, Can you show me an example of how the source is tied to the ExecutionEnivornment.
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRuntimeMode(RuntimeExecutionMode.STREAMING); KafkaSource<String> source = KafkaSource .<String>builder() .setBootstrapServers("kafka-headless:9092") .setTopics(Arrays.asList("log-input")) .setDeserializer(KafkaRecordDeserializer.valueOnly(StringDeserializer.class)) .setUnbounded(OffsetsInitializer.latest()) .build(); env.addSource(source); On Tue, Jan 5, 2021 at 7:28 AM Arvid Heise <ar...@ververica.com> wrote: > Hi Robert, > > you basically just (re)write your application with DataStream API, use the > new KafkaSource, and then let the automatic batch detection mode work [1]. > The most important part is that all your sources need to be bounded. > Assuming that you just have a Kafka source, you need to use setBounded > with the appropriate end offset/timestamp. > > Note that the rewritten Kafka source still has a couple of issues that > should be addressed by the first bugfix release of 1.12 in this month. So > while it's safe to use for development, I'd wait for 1.12.1 to roll it out > on production. > > If you have specific questions on the migration from DataSet and > DataStream, please let me know. > > [1] > https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/datastream_execution_mode.html > > On Mon, Jan 4, 2021 at 7:34 PM Robert Cullen <cinquate...@gmail.com> > wrote: > >> I have a Kafka source that I would like to run a batch job on. Since >> Version 1.12.0 is now soft deprecating the DataSet API in favor of the >> DataStream API, can someone show me an example of this? (Using DataStream) >> >> thanks >> -- >> Robert Cullen >> 240-475-4490 >> > > > -- > > Arvid Heise | Senior Java Developer > > <https://www.ververica.com/> > > Follow us @VervericaData > > -- > > Join Flink Forward <https://flink-forward.org/> - The Apache Flink > Conference > > Stream Processing | Event Driven | Real Time > > -- > > Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany > > -- > Ververica GmbH > Registered at Amtsgericht Charlottenburg: HRB 158244 B > Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji > (Toni) Cheng > -- Robert Cullen 240-475-4490