Adding the Beam user list in case other folks might run into similar issues.

Hi Phani,

PulsarMessage's getMessageRecord() returns the message from the Pulsar client itself. I haven't really used Pulsar myself, but it looks like under normal circumstances the Pulsar client handles the translation from a byte array to a Java object for you, as in this example that parses Avro messages:
<https://github.com/streamnative/examples/blob/master/clients/schema/src/main/java/io/streamnative/examples/schema/avro/AvroSchemaConsumerExample.java>

Looking at the current Beam implementation, however, it appears that the Pulsar client is configured to take messages as a raw byte array and not use Pulsar's built-in translation. This would mean that in your code you would probably want something along the lines of c.element().getMessageRecord().getMessage().getData() to get at the raw bytes, which you would then have to parse according to however they were encoded.

Best,
B

On Mon, Jan 23, 2023 at 8:56 AM phani geeth <bvpgeeth0...@gmail.com> wrote:

> Hi Byron,
>
> Hope you are doing good.
>
> I need your help with Apache Beam PulsarIO again. I am finally able to read messages from the Pulsar server and pass them to the next transform, but the message is displayed in some encoded format and I am not sure how to retrieve the string message from it.
>
> Custom transform
>
> class MessagePrint extends DoFn<PulsarMessage, String> {
>   @ProcessElement
>   public void processElement(ProcessContext c) throws IOException {
>     System.out.println("printing message");
>     System.out.println(c.element().getMessageRecord());
>   }
> }
>
> Pipeline to read messages from pulsar topic
>
> PCollection<PulsarMessage> records = p.apply("read from pulsar",
>     PulsarIO.read()
>         .withTopic(topic_name)
>         .withPublishTime()
>         .withClientUrl(client_url)
>         .withAdminUrl(admin_url));
>
> records.apply("print message", ParDo.of(new MessagePrint()));
>
> c.element().getMessageRecord() is returning "org.apache.pulsar.client.impl.MessageImpl@6cb47e3"
>
> Any help on this would be highly appreciated.
>
> Thanks for all the help so far.
>
> Regards,
> Phani Geeth
>
> On Mon, Oct 10, 2022 at 10:39 PM Phani Geeth <bvpgeeth0...@gmail.com> wrote:
>
>> Hi Byron,
>>
>> I am running the pipeline in an Eclipse workspace. It connects to the PulsarIO topic and I see the message-received status, but the message does not move to the next PTransform. My use case is to read from Pulsar and write to PubSub.
>>
>> Any help on this would be highly appreciated.
>>
>> Thanks for all the help so far.
>>
>> Regards,
>> Phani Geeth
>>
>> *From: *Byron Ellis <byronel...@google.com>
>> *Sent: *07 October 2022 19:38
>> *To: *phani geeth <bvpgeeth0...@gmail.com>
>> *Cc: *user@beam.apache.org
>> *Subject: *Re: PulsarIO not connecting
>>
>> There could be any number of reasons for that; it's hard to say without knowing how you're running the pipeline. There's a pretty good chance the message is indeed being printed, but not on a console you can see easily. Personally, for this kind of testing I tend to use metrics to get quick feedback rather than logging output. (If you do log, I'd recommend switching to a logging library rather than using println, for a variety of reasons.)
>>
>> Best,
>> B
>>
>> On Fri, Oct 7, 2022 at 2:18 AM phani geeth <bvpgeeth0...@gmail.com> wrote:
>>
>> Thanks Byron for the quick response. It worked by creating a SerializableFunction and generating the client inside it.
>>
>> One more request: how do I display the Pulsar message? After adding a DoFn to the pipeline it does not print messages. I can see the pipeline is receiving messages, but I am not able to display them.
>>
>> class MessagePrint extends DoFn<PulsarMessage, String> {
>>   @ProcessElement
>>   public void processElement(ProcessContext c) {
>>     System.out.println(c.element());
>>     System.out.println(c.element().getMessageRecord());
>>   }
>> }
>>
>> I added this DoFn to the pipeline after PulsarIO, but it does not print any message.
>>
>> Regards,
>> Phani Geeth
>>
>> On Fri, 7 Oct, 2022, 2:29 am Byron Ellis via user, <user@beam.apache.org> wrote:
>>
>> Hi Phani,
>>
>> I believe what you want to do here is construct the PulsarClient object within an implementation of SerializableFunction so that it can be executed remotely, rather than constructing the client in your main function. That will mean ensuring that you have access to your certificate files from those remote resources as well.
>>
>> Best,
>> B
>>
>> On Thu, Oct 6, 2022 at 3:19 AM Phani Geeth <bvpgeeth0...@gmail.com> wrote:
>>
>> Hi Team,
>>
>> I am using native PulsarIO to connect to an existing Pulsar server with SSL certification, but while adding withPulsarClient in the pipeline I am getting a cast error.
>>
>> More details and code are posted in the Stack Overflow link below:
>>
>> https://stackoverflow.com/questions/73937922/not-able-to-connect-to-pulsario-using-apache-beam-java-sdk
>>
>> Regards,
>> Phani Geeth
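
For anyone landing on this thread later, here is a minimal sketch of the "parse the raw bytes" step from the most recent reply. The "MessageImpl@6cb47e3" output has the look of the default Object.toString() form (class name plus hash), so it is the message object being printed, not its payload. The sketch assumes the payload is UTF-8 text, which the thread never states; Avro or other binary payloads would need their own decoder. PayloadDecoder and decodeUtf8 are hypothetical names, not part of Beam or Pulsar, and the accessor chain mentioned in the comments is the one suggested in the thread, not something verified against the Beam API.

```java
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;

// Hypothetical helper (not part of Beam or Pulsar): turns raw payload bytes
// into a String, ASSUMING the producer wrote UTF-8 text.
final class PayloadDecoder {
  private PayloadDecoder() {}

  /**
   * Strictly decodes the given bytes as UTF-8.
   * Unlike new String(bytes, UTF_8), which silently substitutes replacement
   * characters, this fails loudly when the payload is not valid UTF-8
   * (for example, when it is actually a binary encoding).
   */
  static String decodeUtf8(byte[] data) {
    try {
      return StandardCharsets.UTF_8
          .newDecoder()
          .onMalformedInput(CodingErrorAction.REPORT)
          .onUnmappableCharacter(CodingErrorAction.REPORT)
          .decode(ByteBuffer.wrap(data))
          .toString();
    } catch (CharacterCodingException e) {
      throw new IllegalArgumentException("payload is not valid UTF-8", e);
    }
  }

  public static void main(String[] args) {
    // Stand-in for the bytes that would come from the Pulsar message,
    // e.g. via the getData() call suggested in the thread.
    byte[] raw = "hello pulsar".getBytes(StandardCharsets.UTF_8);
    System.out.println(decodeUtf8(raw));
  }
}
```

Inside the DoFn, the idea would be to pass the byte array obtained from the message record to decodeUtf8 and emit the result with c.output(...), rather than printing the message object itself.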