I am trying to write a quick sample of streaming word count using Beam APIs and 
The problem I am running into is that
apply("Write to Kafka", Write.to(UnboundedFlinkSink.of(kafkaSink)))
does not work this way: it assumes a bounded stream, and mine is unbounded.

I have not found an unbounded equivalent of Write, so I tried to implement a
custom ParDo function:

/**
 * Write content to Kafka.
 */
static class WriteToKafkaFn extends DoFn<Tuple2<String, Integer>, Tuple2<String, Integer>> {

    private FlinkKafkaProducer09<Tuple2<String, Integer>> kafkaSink;
    private boolean opened = false;

    public WriteToKafkaFn(FlinkKafkaProducer09<Tuple2<String, Integer>> kafkaSink) {
        this.kafkaSink = kafkaSink;
    }

    public void processElement(ProcessContext c) throws Exception {
        // Open the sink lazily, once, on the first element.
        if (!opened) {
            kafkaSink.open(new Configuration());
            opened = true;
        }
        Tuple2<String, Integer> record = c.element();
        try {
            // Hand the record to the Flink sink.
            kafkaSink.invoke(record);
        } catch (Throwable t) {
            System.out.println("Error writing record " + record + " to Kafka");
        }
    }
}

The problem with this approach is that the ParDo is not initialized with the
streaming runtime context that FlinkKafkaProducer09 relies on, so open() fails.
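
To make the failure mode concrete, here is a minimal, self-contained sketch. It
uses no Flink or Beam classes: StreamingContext, ContextBoundSink and
LifecycleDemo are hypothetical stand-ins. It assumes only that the sink expects
the engine to hand it a runtime context before open() is called.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the runtime context a streaming engine gives its sinks.
final class StreamingContext {
    final String taskName;

    StreamingContext(String taskName) {
        this.taskName = taskName;
    }
}

// Hypothetical stand-in for a rich sink: open() needs a context set beforehand.
final class ContextBoundSink {
    private StreamingContext context; // normally set by the engine, not by user code
    private boolean opened = false;
    final List<String> written = new ArrayList<>();

    void setRuntimeContext(StreamingContext context) {
        this.context = context;
    }

    void open() {
        if (context == null) {
            throw new IllegalStateException("The runtime context has not been initialized.");
        }
        opened = true;
    }

    void invoke(String record) {
        if (!opened) {
            throw new IllegalStateException("Sink is not open.");
        }
        written.add(record);
    }
}

final class LifecycleDemo {
    // Mimics calling open() from inside a ParDo: nothing supplied a context first.
    static boolean openWithoutContextFails() {
        ContextBoundSink sink = new ContextBoundSink();
        try {
            sink.open();
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    // Mimics the engine-managed path: context first, then open(), then invoke().
    static List<String> writeWithContext(List<String> records) {
        ContextBoundSink sink = new ContextBoundSink();
        sink.setRuntimeContext(new StreamingContext("write-to-kafka"));
        sink.open();
        for (String record : records) {
            sink.invoke(record);
        }
        return sink.written;
    }

    public static void main(String[] args) {
        System.out.println(openWithoutContextFails());
        System.out.println(writeWithContext(Arrays.asList("a=1", "b=2")));
    }
}
```

In this sketch the first path fails for the same kind of reason as my DoFn: the
sink is constructed and opened by user code, so the step that would normally
provide the context never runs.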

Any suggestions? 
