I am trying to write a quick sample of streaming word count using the Beam APIs and
the FlinkBeamRunner.
The problem I am running into is that

apply("Write to Kafka", Write.to(UnboundedFlinkSink.of(kafkaSink)))

does not work this way: Write assumes a bounded collection, and mine is unbounded.

I have not found an unbounded equivalent of Write, so I tried to implement a
custom ParDo function:

/**
 * Writes elements to Kafka by driving a Flink sink function directly.
 */
static class WriteToKafkaFn extends DoFn<Tuple2<String, Integer>,
        Tuple2<String, Integer>> {

    private final FlinkKafkaProducer09<Tuple2<String, Integer>> kafkaSink;
    private boolean opened = false;

    public WriteToKafkaFn(FlinkKafkaProducer09<Tuple2<String, Integer>> kafkaSink) {
        this.kafkaSink = kafkaSink;
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
        // Lazily open the sink when the first element arrives.
        if (!opened) {
            kafkaSink.open(new Configuration());
            opened = true;
        }
        Tuple2<String, Integer> record = c.element();
        try {
            kafkaSink.invoke(record);
        } catch (Throwable t) {
            System.out.println("Error writing record " + record + " to Kafka");
            t.printStackTrace();
        }
    }
}
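For clarity, the lazy-open pattern the DoFn uses can be sketched on its own, with a plain interface standing in for FlinkKafkaProducer09 (RecordSink and LazySinkWriter here are hypothetical names, for illustration only):

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for a Flink sink function: one-time setup plus a per-record write.
interface RecordSink<T> {
    void open();            // expensive one-time setup (e.g. connect to brokers)
    void invoke(T record);  // write a single record
}

// Mirrors the DoFn above: open the sink on the first element only.
class LazySinkWriter<T> {
    private final RecordSink<T> sink;
    private boolean opened = false;

    LazySinkWriter(RecordSink<T> sink) {
        this.sink = sink;
    }

    void process(T record) {
        if (!opened) {
            sink.open();
            opened = true;
        }
        sink.invoke(record);
    }
}

public class Main {
    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        RecordSink<String> sink = new RecordSink<String>() {
            public void open() { log.add("open"); }
            public void invoke(String r) { log.add("write:" + r); }
        };
        LazySinkWriter<String> writer = new LazySinkWriter<>(sink);
        writer.process("a");
        writer.process("b");
        System.out.println(log);  // open happens once, before the first write
    }
}
```

The catch, as described below, is that open() is exactly the step that fails when the real sink expects a runtime context the ParDo never provides.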


The problem with this approach is that the ParDo is not initialized with the
streaming runtime context that FlinkKafkaProducer relies upon, so open() fails.


Any suggestions? 
