Adding the Beam user list in case other folks might run into similar issues.

Hi Phani,

PulsarMessage's getMessageRecord() returns the message from the Pulsar
client itself. I haven't really used Pulsar myself but it looks like under
normal circumstances the Pulsar client would handle the translation from a
byte array to a Java object for you such as this
<https://github.com/streamnative/examples/blob/master/clients/schema/src/main/java/io/streamnative/examples/schema/avro/AvroSchemaConsumerExample.java>
example parsing Avro messages. Looking at the current Beam implementation
however, it would appear that the Pulsar client is configured to simply
take messages as a raw byte array and not use Pulsar's built-in
translation. This would mean that in your code you would probably want
something along the lines of
c.element().getMessageRecord().getMessage().getData() to get at the raw
bytes which you would then have to parse according to however they were
encoded.

Best,
B


On Mon, Jan 23, 2023 at 8:56 AM phani geeth <bvpgeeth0...@gmail.com> wrote:

> Hi Byron,
>
> Hope you are doing good.
>
> Need your help in Apache Beam PulsarIO again, finally able to read
> messages from pulsar server and able to pass to the next transform but not
> able to see the message it getting displayed as some encoded format not
> sure how to retrieve string message from that.
>
>
> Custom transform
> class MessagePrint extends DoFn<PulsarMessage, String>{
>     @ProcessElement
>     public void processElement(ProcessContext c)throws IOException{
>         System.out.println("printing message");
>         System.out.println(c.element().getMessageRecord());
>     }
> }
> Pipeline to read messages from pulsar topic
> PCollection<PulsarMessage> records=p.apply("read from pulsar", PulsarIO.
> read().withTopic(topic_name)
>         .withPublishTime().withClientUrl(client_url).withAdminUrl
> (admin_url));
>
>         records.apply("print message",ParDo.of(new MessagePrint()));
> c.element().getMessageRecord() is returning as "
> org.apache.pulsar.client.impl.MessageImpl@6cb47e3"
>
> Any help on this would be highly appreciated.
>
>
>
> Thanks for all the help so far.
> Regards,
> Phani Geeth
>
>
> On Mon, Oct 10, 2022 at 10:39 PM Phani Geeth <bvpgeeth0...@gmail.com>
> wrote:
>
>> Hi Byron,
>>
>>
>>
>> I am running the pipeline in eclipse workspace able to connect to
>> PulsarIO topic and getting message received status, but message is not able
>> to move to next PTransform. My use case is to read from Pulsar and write to
>> PubSub.
>>
>> Any help on this would be highly appreciated.
>>
>>
>>
>> Thanks for all the help so far.
>>
>>
>>
>> Regards,
>>
>> Phani Geeth
>>
>> Sent from Mail <https://go.microsoft.com/fwlink/?LinkId=550986> for
>> Windows
>>
>>
>>
>> *From: *Byron Ellis <byronel...@google.com>
>> *Sent: *07 October 2022 19:38
>> *To: *phani geeth <bvpgeeth0...@gmail.com>
>> *Cc: *user@beam.apache.org
>> *Subject: *Re: PulsarIO not connecting
>>
>>
>>
>> There could be any number of reasons for that, it's hard to say without
>> knowing how you're running the pipeline. There's a pretty good chance the
>> message is indeed being printed, but not on a console you can see easily.
>> Personally for this kind of testing I tend to use metrics to get quick
>> feedback rather than logging output (if you do log I'd recommend switching
>> to a logging library rather than using println for a variety of reasons)
>>
>>
>>
>> Best,
>>
>> B
>>
>>
>>
>> On Fri, Oct 7, 2022 at 2:18 AM phani geeth <bvpgeeth0...@gmail.com>
>> wrote:
>>
>> Thanks Byron for the quick response it worked by creating Serializable
>> Function and generating client inside it.
>>
>>
>>
>> One more help is how to display pulsar message while adding DoFn in
>> pipeline it's not printing messages, able to see pipeline is recieving
>> messages but not able to display.
>>
>>
>>
>> Class MessagePrint extends DoFn<PulsarMessage,String>{
>>
>> @ProcessElement
>>
>> public void processElement(ProcessContext c){
>>
>> System.out.println(c.element);
>>
>> System.out.println(c.element().getMessageRecord());
>>
>> }}
>>
>>
>>
>>
>>
>> Added this DoFn in pipeline after PulsarIO but not able to print any
>> message.
>>
>>
>>
>>
>>
>> Regards,
>>
>> Phani Geeth
>>
>>
>>
>> On Fri, 7 Oct, 2022, 2:29 am Byron Ellis via user, <user@beam.apache.org>
>> wrote:
>>
>> Hi Phani,
>>
>>
>>
>> I believe what you want to do here is construct the PulsarClient object
>> within an implementation of SerializableFunction so that it can be executed
>> remotely rather than constructing the client in your main function. That
>> will mean ensuring that you have access to your certificate files from
>> those remote resources as well.
>>
>>
>>
>> Best,
>>
>> B
>>
>>
>>
>> On Thu, Oct 6, 2022 at 3:19 AM Phani Geeth <bvpgeeth0...@gmail.com>
>> wrote:
>>
>> Hi Team,
>>
>>
>>
>> I am using native PulsarIO to connect existing pulsar server with ssl
>> certification. But while adding withPulsarClient in pipeline I am getting
>> cast error.
>>
>>
>>
>> More details and code is posted in below stackoverflow link
>>
>>
>> https://stackoverflow.com/questions/73937922/not-able-to-connect-to-pulsario-using-apache-beam-java-sdk
>>
>>
>>
>>
>>
>>
>>
>> Regards,
>>
>>
>>
>> Phani Geeth
>>
>>
>>
>>
>>
>> Sent from Mail <https://go.microsoft.com/fwlink/?LinkId=550986> for
>> Windows
>>
>>
>>
>>
>>
>

Reply via email to