Hi All,

     I am new to Apache Beam and Spring Cloud Data Flow, and I am trying to
integrate Apache Beam into Spring Cloud Data Flow. How can I consume a
spring-kafka message as a source in a Beam pipeline? How can I add
spring-kafka as a sink in a Beam pipeline? I also want the pipeline to run
forever (i.e. waitUntilFinish on an unbounded source). Please suggest how
I can integrate the two.
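From the KafkaIO javadoc I think the Kafka side might look roughly like the
sketch below. This is only my guess, not verified: the bootstrap server
address and the topic names "INPUT"/"OUTPUT" are placeholders, and I do not
know yet how they map to the Spring Cloud Data Flow channel topics.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Values;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class KafkaBeamSketch {
    public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
        Pipeline p = Pipeline.create(options);

        p.apply(KafkaIO.<String, String>read()
                .withBootstrapServers("localhost:9092")    // placeholder broker address
                .withTopic("INPUT")                        // placeholder: SCDF input channel topic?
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                .withoutMetadata())                        // drop Kafka metadata -> KV<String, String>
         .apply(Values.<String>create())                   // keep only the message payloads
         // ... word count transforms would go here ...
         .apply(KafkaIO.<Void, String>write()
                .withBootstrapServers("localhost:9092")    // placeholder broker address
                .withTopic("OUTPUT")                       // placeholder: SCDF output channel topic?
                .withValueSerializer(StringSerializer.class)
                .values());                                // write values only, no keys

        // Kafka is an unbounded source, so this keeps the pipeline running.
        p.run().waitUntilFinish();
    }
}
```

Is this the right direction, or is there a more direct bridge between the
SCDF message channels and a Beam pipeline?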

Example (word count):

PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);

// Instead of TextIO.read().from(...) I want to trigger from the
// message channel INPUT in Spring Cloud Data Flow.
p.apply(TextIO.read().from("gs://apache-beam-samples/shakespeare/*"))
 .apply(FlatMapElements
     .into(TypeDescriptors.strings())
     .via((String word) -> Arrays.asList(word.split("[^\\p{L}]+"))))
 .apply(Filter.by((String word) -> !word.isEmpty()))
 .apply(Count.<String>perElement())
 .apply(MapElements
     .into(TypeDescriptors.strings())
     .via((KV<String, Long> wordCount) ->
         wordCount.getKey() + ": " + wordCount.getValue()))
 // Instead of TextIO.write().to(...) I want to send the result to the
 // message channel OUTPUT.
 .apply(TextIO.write().to("gs://YOUR_OUTPUT_BUCKET/AND_OUTPUT_PREFIX"));

p.run().waitUntilFinish();
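Independently of the IO question, I checked the tokenise-and-count logic with
plain Java collections to be sure the split pattern does what I expect (the
class name WordCountCheck and the sample sentence are just mine, not part of
the pipeline):

```java
import java.util.Arrays;
import java.util.Map;
import java.util.stream.Collectors;

public class WordCountCheck {
    // Same tokenisation as the Beam pipeline: split on runs of non-letters,
    // drop empty tokens, then count occurrences per word.
    static Map<String, Long> countWords(String line) {
        return Arrays.stream(line.split("[^\\p{L}]+"))
                .filter(w -> !w.isEmpty())
                .collect(Collectors.groupingBy(w -> w, Collectors.counting()));
    }

    public static void main(String[] args) {
        Map<String, Long> counts = countWords("to be, or not to be");
        System.out.println(counts.get("to"));  // 2
        System.out.println(counts.get("be"));  // 2
    }
}
```

So the transforms themselves seem fine; my question is only about replacing
TextIO with the SCDF message channels.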

Thanks,
Shankara
