Arvid,

Thanks, Can you show me an example of how the source is tied to the
ExecutionEnivornment.

final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);

KafkaSource<String> source = KafkaSource
        .<String>builder()
        .setBootstrapServers("kafka-headless:9092")
        .setTopics(Arrays.asList("log-input"))
        
.setDeserializer(KafkaRecordDeserializer.valueOnly(StringDeserializer.class))
        .setUnbounded(OffsetsInitializer.latest())
        .build();

env.addSource(source);


On Tue, Jan 5, 2021 at 7:28 AM Arvid Heise <ar...@ververica.com> wrote:

> Hi Robert,
>
> you basically just (re)write your application with DataStream API, use the
> new KafkaSource, and then let the automatic batch detection mode work [1].
> The most important part is that all your sources need to be bounded.
> Assuming that you just have a Kafka source, you need to use setBounded
> with the appropriate end offset/timestamp.
>
> Note that the rewritten Kafka source still has a couple of issues that
> should be addressed by the first bugfix release of 1.12 in this month. So
> while it's safe to use for development, I'd wait for 1.12.1 to roll it out
> on production.
>
> If you have specific questions on the migration from DataSet and
> DataStream, please let me know.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/datastream_execution_mode.html
>
> On Mon, Jan 4, 2021 at 7:34 PM Robert Cullen <cinquate...@gmail.com>
> wrote:
>
>> I have a Kafka source that I would like to run a batch job on.  Since
>> Version 1.12.0 is now soft deprecating the DataSet API in favor of the
>> DataStream API, can someone show me an example of this? (Using DataStream)
>>
>> thanks
>> --
>> Robert Cullen
>> 240-475-4490
>>
>
>
> --
>
> Arvid Heise | Senior Java Developer
>
> <https://www.ververica.com/>
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
> (Toni) Cheng
>


-- 
Robert Cullen
240-475-4490

Reply via email to