You're on the right track. Now you want to apply the KafkaIO PTransform to
that PCollection and then run the pipeline.

Pipeline pipeline = ...
List<String> li = // ArrayList of single element
// Create.of(li) is a PTransform; apply it to the pipeline to get a PCollection.
PCollection<String> p = pipeline.apply(Create.of(li));
p.apply(KafkaIO.<Void, String>write()
     .withBootstrapServers("broker_1:9092,broker_2:9092")
     .withTopic("results")
     .withValueSerializer(StringSerializer.class)
     .values());
PipelineResult result = pipeline.run();
result.waitUntilFinish();

You pass the necessary configuration parameters (bootstrap servers, topic,
and serializers) to the KafkaIO.Write PTransform. You can find more details
and examples of using Kafka in the javadoc[1].

1:
https://beam.apache.org/documentation/sdks/javadoc/2.6.0/org/apache/beam/sdk/io/kafka/KafkaIO.html
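
For reference, here is a minimal end-to-end sketch of the snippet above as a
complete program. It is only an illustration under some assumptions: the class
name SingleRecordToKafka, the helper singleRecord, and the message text are
made up for this example, the broker addresses and topic are the placeholders
from above, and it assumes beam-sdks-java-core, beam-sdks-java-io-kafka,
kafka-clients, and a runner (for example the direct runner) are on the
classpath.

```java
import java.util.Collections;
import java.util.List;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.StringSerializer;

public class SingleRecordToKafka {

  // Hypothetical helper: wraps one record in an immutable single-element list.
  static List<String> singleRecord(String value) {
    return Collections.singletonList(value);
  }

  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Create.of(...) is a PTransform; applying it to the pipeline yields
    // a PCollection that holds exactly one element.
    PCollection<String> records =
        pipeline.apply(Create.of(singleRecord("hello kafka")));

    // Write only the values (no keys) to the placeholder topic "results".
    records.apply(
        KafkaIO.<Void, String>write()
            .withBootstrapServers("broker_1:9092,broker_2:9092")
            .withTopic("results")
            .withValueSerializer(StringSerializer.class)
            .values());

    PipelineResult result = pipeline.run();
    result.waitUntilFinish();
  }
}
```

Running main needs reachable brokers at the placeholder addresses; without
them the write step will fail or block while trying to connect.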


On Wed, Sep 12, 2018 at 9:40 AM Mahesh Vangala <vangalamahe...@gmail.com>
wrote:

> Hi Lukasz -
>
> I am trying to send one Kafka message per element in the process_element
> function.
> As per your suggestion, I am trying to create a single-element PCollection.
> However, how can I create a PCollection by itself?
>
> I was hoping this would work ...
>
> List<String> li = // ArrayList of single element
> PCollection<String> p = Create.of(li); // But, this creates PValues.
>
> Any help would be appreciated.
> Thank you.
>
>
>
> *--*
> *Mahesh Vangala*
> *(Ph) 443-326-1957*
> *(web) mvangala.com <http://mvangala.com>*
>
>
> On Tue, Sep 11, 2018 at 6:26 PM Mahesh Vangala <vangalamahe...@gmail.com>
> wrote:
>
>> Thanks, Lukasz.
>> Appreciate your advice.
>>
>>
>> On Tue, Sep 11, 2018 at 6:19 PM Lukasz Cwik <lc...@google.com> wrote:
>>
>>> A PCollection is a bag of elements. PCollections can be empty, have only
>>> one element or have many. It is up to you to choose how many elements are
>>> emitted into the PCollection by the upstream transforms.
>>>
>>> If you can limit the PCollection that you apply KafkaIO to so that it
>>> holds only one element, you will have achieved your goal.
>>>
>>> On Tue, Sep 11, 2018 at 3:11 PM Mahesh Vangala <vangalamahe...@gmail.com>
>>> wrote:
>>>
>>>> Hello -
>>>>
>>>> I'd like to write a single record to a Kafka topic through Beam.
>>>> However, I only see examples that work with a PCollection.
>>>> Any thoughts on how I can approach this?
>>>> Thank you.
>>>>
>>>> Regards,
>>>> Mahesh
>>>>
>>>
