Your on the right track. Now you want to apply the KafkaIO PTransform and then run the pipeline.
Pipeline pipeline = ... List<String> li = // ArrayList of single element PCollection<String> p = Create.of(li); p.apply(KafkaIO.<Void, String>write() .withBootstrapServers("broker_1:9092,broker_2:9092") .withTopic("results") .withValueSerializer(StringSerializer.class) .values()); PipelineResult result = pipeline.run(); result.waitUntilFinish(); You'll specify the necessary configuration parameters to the KafkaIO.Write PTransform. You can see more details and examples of using Kafka in the javadoc[1]. 1: https://beam.apache.org/documentation/sdks/javadoc/2.6.0/org/apache/beam/sdk/io/kafka/KafkaIO.html On Wed, Sep 12, 2018 at 9:40 AM Mahesh Vangala <vangalamahe...@gmail.com> wrote: > Hi Lukasz - > > I am trying to send a kafka message one per element in process_element > function. > As per your suggestion, I am trying to create a single element PCollection. > However, how can I have a PCollection by self? > > I was hoping this would work ... > > List<String> li = // ArrayList of single element > PCollection<String> p = Create.of(li); // But, this creates PValues. > > Any help would be appreciated. > Thank you. > > > > *--* > *Mahesh Vangala* > *(Ph) 443-326-1957* > *(web) mvangala.com <http://mvangala.com>* > > > On Tue, Sep 11, 2018 at 6:26 PM Mahesh Vangala <vangalamahe...@gmail.com> > wrote: > >> Thanks, Lukasz. >> Appreciate your advice. >> >> *--* >> *Mahesh Vangala* >> *(Ph) 443-326-1957* >> *(web) mvangala.com <http://mvangala.com>* >> >> >> On Tue, Sep 11, 2018 at 6:19 PM Lukasz Cwik <lc...@google.com> wrote: >> >>> A PCollection is a bag of elements. PCollections can be empty, have only >>> one element or have many. It is up to you to choose how many elements are >>> emitted into the PCollection by the upstream transforms. >>> >>> If you can limit the number of elements to the PCollection that you >>> applied KafkaIO to to only one element you will have achieved your goal. >>> >>> On Tue, Sep 11, 2018 at 3:11 PM Mahesh Vangala <vangalamahe...@gmail.com> >>> wrote: >>> >>>> Hello - >>>> >>>> I'd like to write a single record to kafka topic through beam. >>>> However, I only see examples that work with PCollection. >>>> Any thoughts about how I can approach to this? >>>> Thank you. >>>> >>>> Regards, >>>> Mahesh >>>> >>>> *--* >>>> *Mahesh Vangala* >>>> *(Ph) 443-326-1957* >>>> *(web) mvangala.com <http://mvangala.com>* >>>> >>>