I have an unbounded Kafka source that receives new records every
second. Instead of waiting for new messages, the job closes.
How do I keep the stream open?

KafkaSource<FluentdMessage> dataSource = KafkaSource
        .<FluentdMessage>builder()
        .setBootstrapServers(kafkaServer)
        .setTopics(Arrays.asList("fluentd"))
        .setGroupId("")
        .setDeserializer(new FluentdRecordDeserializer())
        //.setStartingOffsets(OffsetsInitializer.earliest())
        //.setBounded(OffsetsInitializer.latest())
        .setUnbounded(OffsetsInitializer.latest())
        .build();
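
One thing worth checking, going by the KafkaSourceBuilder Javadoc: the argument to setUnbounded() is a stopping-offsets initializer, not a starting one. With OffsetsInitializer.latest(), the source would stop once it reaches the offsets that were latest when the job started. The variant below is a sketch under that reading, not a confirmed fix. It reuses the names from the snippet above (kafkaServer, FluentdMessage, FluentdRecordDeserializer) and assumes the intent is to start from the newest records and never stop.

```java
// Sketch only: same builder as above, minus the stopping-offsets call.
// Imports assumed:
//   org.apache.flink.connector.kafka.source.KafkaSource
//   org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
//   java.util.Arrays
KafkaSource<FluentdMessage> dataSource = KafkaSource
        .<FluentdMessage>builder()
        .setBootstrapServers(kafkaServer)
        .setTopics(Arrays.asList("fluentd"))
        .setGroupId("")
        .setDeserializer(new FluentdRecordDeserializer())
        // Where reading begins: the newest records at job start.
        .setStartingOffsets(OffsetsInitializer.latest())
        // No setBounded()/setUnbounded() call: the builder default is a
        // continuous, unbounded source with no stopping offsets.
        .build();
```

If the job still closes with this configuration, the cause is likely elsewhere, for example in how the job is built or executed.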




-- 
Robert Cullen
240-475-4490
