Pyflink with FlinkTypeInfo failing for Byte Array Deserializations

2022-02-26 Thread Ananth Gundabattula
Hello All,


I have a pyflink script that connects to Pulsar and streams data from a topic 
that has a BytesSchema (a Pulsar schema type). The producer writing these 
messages is a protobuf producer that writes messages as byte[] (I am not 
setting any schema type on the producer, hence it defaults to Pulsar's 
BytesSchema).

To deserialize the bytes from the topic, I first used a SimpleStringSchema as 
the deserializer in my script. This hit a dead end: the protobuf parser keeps 
failing because it cannot parse the string produced by the Pulsar source. 
(Perhaps the UTF-8 conversion from raw bytes is the issue here.)
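(For illustration, a minimal sketch of why a UTF-8 round trip can corrupt a 
protobuf payload; the byte values below are hypothetical:)

# Decoding arbitrary protobuf bytes as UTF-8 is lossy: invalid sequences
# are replaced, so re-encoding the string does not reproduce the payload.
raw = b"\x08\x96\x01\x12\x03\xff\xfe\xfd"  # hypothetical protobuf bytes

decoded = raw.decode("utf-8", errors="replace")
round_tripped = decoded.encode("utf-8")

print(raw == round_tripped)  # False: the payload has been corrupted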

I am now trying to use the Flink TypeInformation based approach to deserialize 
my stream. However, I am getting an “EOFException”. The Flink type info has 
been set to “Types.PRIMITIVE_ARRAY(Types.BYTE())” in the call to 
PulsarDeserializationSchema.flink_type_info().
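(A minimal sketch of the approach described, assuming flink_type_info takes 
the type information plus an optional execution config, matching the 
commented-out flink_type_info(Types.STRING(), None) line in Wong's script 
further down this page:)

from pyflink.common.typeinfo import Types
from pyflink.datastream.connectors import PulsarDeserializationSchema

# Deserialize each Pulsar message as a raw byte[] so the protobuf parsing
# can happen downstream in a Python map function.
deserialization_schema = PulsarDeserializationSchema.flink_type_info(
    Types.PRIMITIVE_ARRAY(Types.BYTE()), None)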

Please find attached both the source code and the relevant exception log where 
the script is failing.

Could you please provide any pointers to help me get past this issue?

Regards,
Ananth


FlinkTypeInfoException.log.gz
Description: FlinkTypeInfoException.log.gz


test_pulsar.py.gz
Description: test_pulsar.py.gz


Re: Pulsar connector 2.9.1 failing job submission in standalone.

2022-02-21 Thread Ananth Gundabattula
Thanks a lot Yufei and Wong.

I was able to get a version working by combining the aspects mentioned in 
each of your responses.


  1.  Trying the sample code base that Wong mentioned below resulted in no response from the JobManager. I had to use the non-sql connector jar in my python script to get around this.
  2.  I still had to copy the flink-sql-connector-pulsar jar into the lib folder of FLINK_HOME and had to add the flink-connector-pulsar jar on the client side. In my previous tests, I was not doing both at the same time. Specifying the flink-connector-pulsar jar on the client side overcomes the serialization issue that Yufei hypothesised as the root cause.
  3.  Not adding the flink-sql-connector-pulsar jar to FLINK_HOME/lib resulted in an exception, because the static field flinkSchema (in the Pulsar connector Java code base) does not seem to be initialized when the jar is only added as an env jar on the client side.
  4.  If I add just the flink-connector-pulsar jar (the non-sql one) to FLINK_HOME/lib, there are exceptions about missing Pulsar classes (not under the Flink code base). I guess using the flink-sql-connector-pulsar jar gives me the advantage of having the Pulsar classes (Flink and non-Flink) resolvable via a single “uber” jar. A sketch of this wiring follows this list.
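(A minimal sketch of the client-side wiring, assuming the 
flink-sql-connector-pulsar uber jar already sits in $FLINK_HOME/lib as 
described above; the client-side jar path is hypothetical:)

from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()

# Client side: register the non-sql connector jar so the PyFlink client
# can build and serialize the Pulsar source before submission. The path
# below is hypothetical.
env.add_jars('file:///path/to/flink-connector-pulsar-1.15-SNAPSHOT.jar')

# Server side (outside this script): flink-sql-connector-pulsar-1.15-SNAPSHOT.jar
# is copied into $FLINK_HOME/lib so the JobManager and TaskManagers can
# resolve the shaded Pulsar classes at runtime.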


A point to note is that when the subscription type is set to “Shared”, the 
client seems to “hang” when running in standalone mode against a topic that 
has a single partition. Perhaps this is because there is only one partition in 
my topic. I will test with more partitions to confirm whether this is indeed a 
bug.



Regards,
Ananth

From: Luning Wong 
Date: Monday, 21 February 2022 at 10:45 pm
To: user@flink.apache.org, Ananth Gundabattula
Subject: Re: Pulsar connector 2.9.1 failing job submission in standalone.


Luning Wong <gfen...@gmail.com> wrote on Mon, 21 Feb 2022 at 19:38:
import logging
import sys

from pyflink.common import SimpleStringSchema, WatermarkStrategy
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import PulsarSource, PulsarDeserializationSchema, SubscriptionType
from pyflink.common.typeinfo import Types


def foo():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)
    env.add_jars('file:///Users/a/src/me/flink/flink-connectors/flink-sql-connector-pulsar/target/flink-sql-connector-pulsar-1.15-SNAPSHOT.jar')
    deserialization_schema = PulsarDeserializationSchema.flink_schema(SimpleStringSchema())
    # deserialization_schema = PulsarDeserializationSchema.flink_type_info(Types.STRING(), None)

    ps = PulsarSource.builder() \
        .set_deserialization_schema(deserialization_schema) \
        .set_service_url('pulsar://localhost:6650') \
        .set_admin_url('http://localhost:8080') \
        .set_topics('ada') \
        .set_subscription_name('axcsdas') \
        .set_subscription_type(SubscriptionType.Exclusive) \
        .build()

    pulsar_stream = env.from_source(
        source=ps,
        watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
        source_name="pulsar_source"
    )
    pulsar_stream.print()
    env.execute('pulsar_source')


if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
    foo()

The above is my test script. It submits a job to a standalone cluster 
successfully.

Best,
Wong
Yufei Zhang <affei...@gmail.com> wrote on Mon, 21 Feb 2022 at 18:33:
Hi Ananth,


From the steps you described, you are using 
`flink-sql-connector-pulsar-1.15-SNAPSHOT.jar`. However, to my knowledge the 
Pulsar connector does not support the Table API yet, so would you mind trying 
`flink-connector-pulsar-1.14.jar` instead (without sql, though the classes 
should be the same; 1.14 is also the stable version)? Since the job failed to 
submit, I'm wildly guessing that a class-not-found issue prevented the 
serialization before submission.

Also, you mentioned "Get a “transactions not enabled” error in spite of 
enabling transactions in 2.8.0 broker"; this is interesting. To use 
transactions, we need to enable them not only in the broker but also in the 
pulsar source connector. Please refer to 
PulsarOptions.PULSAR_ENABLE_TRANSACTION for more details. (Generally, a call 
to PulsarSourceBuilder#setConfig(PulsarOptions.PULSAR_ENABLE_TRANSACTION, true) 
suffices; see the sketch below.)
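(A PyFlink-side sketch of the same configuration, under stated assumptions: 
the set_config method and the string key are assumed to mirror the Java 
PulsarSourceBuilder#setConfig and PulsarOptions.PULSAR_ENABLE_TRANSACTION; the 
Java call above is the authoritative reference.)

from pyflink.datastream.connectors import PulsarSource, SubscriptionType

# Sketch only: a Shared subscription with transactions enabled on the
# source side, in addition to enabling them on the broker.
builder = PulsarSource.builder() \
    .set_service_url('pulsar://localhost:6650') \
    .set_admin_url('http://localhost:8080') \
    .set_topics('ada') \
    .set_subscription_name('txn-sub') \
    .set_subscription_type(SubscriptionType.Shared)

# Assumed API and key: mirrors setConfig(PULSAR_ENABLE_TRANSACTION, true).
builder.set_config('pulsar.client.enableTransaction', True)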


Thank you for your report. Since you have these detailed steps to reproduce, 
I'd recommend submitting a JIRA ticket, and we'll try to reproduce the issue 
you described in the coming days to find the exact cause. Thank you so much 
for the precise reproduction steps.

Cheers,
Yufei.

On Mon, Feb 21, 2022 at 5:47 PM Ananth Gundabattula 
<agundabatt...@darwinium.com> wrote:
Thanks Guowei.

A small correction to the telnet result below. I had a typo in the telnet 
command earlier (did not separate the port from the host name). Issuing the 
proper telnet command resolved the JobManager's host properly.

Re: Pulsar connector 2.9.1 failing job submission in standalone.

2022-02-21 Thread Ananth Gundabattula
Thanks Guowei.

A small correction to the telnet result below. I had a typo in the telnet 
command earlier (did not separate the port from the host name). Issuing the 
proper telnet command resolved the JobManager's host properly.

Regards,
Ananth

From: Guowei Ma 
Date: Monday, 21 February 2022 at 8:42 pm
To: Ananth Gundabattula 
Cc: user@flink.apache.org, affei...@gmail.com
Subject: Re: Pulsar connector 2.9.1 failing job submission in standalone.
Thanks Ananth for your clarification. But I am not an expert on Pulsar, so I 
would cc the author of the connector to have a look. Would Yufei like to give 
some insight?

Best,
Guowei


On Mon, Feb 21, 2022 at 2:10 PM Ananth Gundabattula 
<agundabatt...@darwinium.com> wrote:
Thanks for the response Guowei.


  *   Tried a telnet to the jobmanager host:port and I get “127.0.0.1:8086: 
nodename nor servname provided, or not known”, which suggests that the network 
access is fine?
  *   I resubmitted the word count example and it ran fine to completion.

For the pulsar script, I have also tried localhost and the local LAN IPs as 
the jobmanager host configuration in conf/flink-conf.yaml, and all of them end 
with the same result. I have also tried this with Pulsar 2.8.0, and it did 
have issues with the “Shared” subscription type (I get a “transactions not 
enabled” error in spite of enabling transactions on the 2.8.0 broker). When I 
change the subscription type to “Exclusive”, it exhibits the same behavior as 
with Pulsar 2.9.1, i.e. the job submission fails (with both Pulsar 2.8.0 and 
2.9.1).

Regards,
Ananth

From: Guowei Ma <guowei@gmail.com>
Date: Monday, 21 February 2022 at 4:57 pm
To: Ananth Gundabattula <agundabatt...@darwinium.com>
Cc: user@flink.apache.org
Subject: Re: Pulsar connector 2.9.1 failing job submission in standalone.
Hi, Ananth

I don't see any error logs on the server side, so it's hard to tell what the 
specific problem is. From the current logs, there are two things to try first:

1. From the client's log, it is a 5-minute timeout, so you can telnet 
127.0.0.1:8086 to see if there is a problem with the local network.
2. From the log on the server side, there is no job submission at all. You can 
try to submit the wordcount example again when submitting the pulsar example 
fails, so as to rule out whether the session cluster is inherently problematic.
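(For what it's worth, the first check as a minimal Python sketch, using the 
host and port from the client log; purely illustrative:)

import socket

# Probe the JobManager endpoint the client is timing out against; raises
# if the TCP connection cannot be established within the timeout.
with socket.create_connection(("127.0.0.1", 8086), timeout=5):
    print("TCP connect to 127.0.0.1:8086 succeeded")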

Best,
Guowei


On Mon, Feb 21, 2022 at 9:48 AM Ananth Gundabattula 
<agundabatt...@darwinium.com> wrote:
Hello All,

I have a Pyflink script that needs to read from Pulsar and process the data.

I have done the following to implement a prototype.


  1.  Since I need a PyFlink way to connect to Pulsar, I checked out the code from the master branch as advised in a different thread. (The PyFlink Pulsar connector seems to be slated for the 1.15 release.)
  2.  I built the Flink source.
  3.  I am using the following location as FLINK_HOME under the source: flink-dist/target/flink-1.15-SNAPSHOT-bin/flink-1.15-SNAPSHOT
  4.  The pyflink python wheels have been installed in the right python conda environment.
  5.  I copied the flink-sql-connector-pulsar-1.15-SNAPSHOT.jar into the $FLINK_HOME/lib folder.
  6.  I started the standalone cluster by running bin/start-cluster.sh
  7.  I submit my test script using bin/flink run --python …
  8.  If I launch the word_count example from the Flink documentation, everything runs fine and completes successfully.
  9.  However, if the script involves the Pulsar connector, the logs show that the Flink client is not able to submit the job to the JobManager.
  10. It ultimately dies with a Channel Idle exception (seen in DEBUG mode of the logs). I am attaching the logs for reference.

I am trying this on OS X. Please note that the classic word_count script works 
fine without any issues, and I see the job submission failures on the client 
only when the pulsar source connector is in the script. I have also added the 
logs for the standalone session job manager, and I am attaching the script for 
reference.

Could you please advise what I can do to resolve the issue? (I will raise a 
JIRA issue if someone thinks it is a bug.)

Regards,
Ananth



Re: Pulsar connector 2.9.1 failing job submission in standalone.

2022-02-20 Thread Ananth Gundabattula
Thanks for the response Guowei.


  *   Tried a telnet to the jobmanager host:port and I get “127.0.0.1:8086: 
nodename nor servname provided, or not known”, which suggests that the network 
access is fine?
  *   I resubmitted the word count example and it ran fine to completion.

For the pulsar script, I have also tried localhost and the local LAN IPs as 
the jobmanager host configuration in conf/flink-conf.yaml, and all of them end 
with the same result. I have also tried this with Pulsar 2.8.0, and it did 
have issues with the “Shared” subscription type (I get a “transactions not 
enabled” error in spite of enabling transactions on the 2.8.0 broker). When I 
change the subscription type to “Exclusive”, it exhibits the same behavior as 
with Pulsar 2.9.1, i.e. the job submission fails (with both Pulsar 2.8.0 and 
2.9.1).

Regards,
Ananth

From: Guowei Ma 
Date: Monday, 21 February 2022 at 4:57 pm
To: Ananth Gundabattula 
Cc: user@flink.apache.org 
Subject: Re: Pulsar connector 2.9.1 failing job submission in standalone.
Hi, Ananth

I don't see any error logs on the server side, so it's hard to tell what the 
specific problem is. From the current logs, there are two things to try first:

1. From the client's log, it is a 5-minute timeout, so you can telnet 
127.0.0.1:8086 to see if there is a problem with the local network.
2. From the log on the server side, there is no job submission at all. You can 
try to submit the wordcount example again when submitting the pulsar example 
fails, so as to rule out whether the session cluster is inherently problematic.

Best,
Guowei


On Mon, Feb 21, 2022 at 9:48 AM Ananth Gundabattula 
<agundabatt...@darwinium.com> wrote:
Hello All,

I have a Pyflink script that needs to read from Pulsar and process the data.

I have done the following to implement a prototype.


  1.  Since I need a PyFlink way to connect to Pulsar, I checked out the code from the master branch as advised in a different thread. (The PyFlink Pulsar connector seems to be slated for the 1.15 release.)
  2.  I built the Flink source.
  3.  I am using the following location as FLINK_HOME under the source: flink-dist/target/flink-1.15-SNAPSHOT-bin/flink-1.15-SNAPSHOT
  4.  The pyflink python wheels have been installed in the right python conda environment.
  5.  I copied the flink-sql-connector-pulsar-1.15-SNAPSHOT.jar into the $FLINK_HOME/lib folder.
  6.  I started the standalone cluster by running bin/start-cluster.sh
  7.  I submit my test script using bin/flink run --python …
  8.  If I launch the word_count example from the Flink documentation, everything runs fine and completes successfully.
  9.  However, if the script involves the Pulsar connector, the logs show that the Flink client is not able to submit the job to the JobManager.
  10. It ultimately dies with a Channel Idle exception (seen in DEBUG mode of the logs). I am attaching the logs for reference.

I am trying this on OS X. Please note that the classic word_count script works 
fine without any issues, and I see the job submission failures on the client 
only when the pulsar source connector is in the script. I have also added the 
logs for the standalone session job manager, and I am attaching the script for 
reference.

Could you please advise what I can do to resolve the issue? (I will raise a 
JIRA issue if someone thinks it is a bug.)

Regards,
Ananth



Pulsar connector 2.9.1 failing job submission in standalone.

2022-02-20 Thread Ananth Gundabattula
Hello All,

I have a Pyflink script that needs to read from Pulsar and process the data.

I have done the following to implement a prototype.


  1.  Since I need a PyFlink way to connect to Pulsar, I checked out the code from the master branch as advised in a different thread. (The PyFlink Pulsar connector seems to be slated for the 1.15 release.)
  2.  I built the Flink source.
  3.  I am using the following location as FLINK_HOME under the source: flink-dist/target/flink-1.15-SNAPSHOT-bin/flink-1.15-SNAPSHOT
  4.  The pyflink python wheels have been installed in the right python conda environment.
  5.  I copied the flink-sql-connector-pulsar-1.15-SNAPSHOT.jar into the $FLINK_HOME/lib folder.
  6.  I started the standalone cluster by running bin/start-cluster.sh
  7.  I submit my test script using bin/flink run --python …
  8.  If I launch the word_count example from the Flink documentation, everything runs fine and completes successfully.
  9.  However, if the script involves the Pulsar connector, the logs show that the Flink client is not able to submit the job to the JobManager.
  10. It ultimately dies with a Channel Idle exception (seen in DEBUG mode of the logs). I am attaching the logs for reference.

I am trying this on OS X. Please note that the classic word_count script works 
fine without any issues, and I see the job submission failures on the client 
only when the pulsar source connector is in the script. I have also added the 
logs for the standalone session job manager, and I am attaching the script for 
reference.

Could you please advise what I can do to resolve the issue? (I will raise a 
JIRA issue if someone thinks it is a bug.)

Regards,
Ananth



flink-ananth-client-Ananths-MacBook-Pro.local.log.gz
Description: flink-ananth-client-Ananths-MacBook-Pro.local.log.gz


flink-ananth-standalonesession-0-Ananths-MacBook-Pro.local.log.gz
Description:  flink-ananth-standalonesession-0-Ananths-MacBook-Pro.local.log.gz


test_pulsar.py.gz
Description: test_pulsar.py.gz


Re: Pyflink with pulsar

2022-02-19 Thread Ananth Gundabattula
Thanks a lot Wong.

I was wondering what the timelines are for the 1.15 release (to plan things 
around our CI pipeline). The information below really helps, and thanks for 
the contribution.

Regards,
Ananth

From: Luning Wong 
Date: Friday, 18 February 2022 at 6:12 pm
To: user@flink.apache.org, Ananth Gundabattula
Subject: Re: Pyflink with pulsar
The Pulsar python source connector will be released in the 1.15 version. If 
you want to use it right now, you could compile the master branch. When I 
completed the python connector code, I only tested the native pulsar protocol, 
without KOP.

Usage examples are in the comments of the PulsarSource class and in the test 
cases of the FlinkPulsarTest class.
You can find them in the following PR link.
https://github.com/apache/flink/pull/18388

Best regards,

Wong

Xingbo Huang wrote on Fri, 18 Feb 2022 at 14:32:
>
>
>
> -- Forwarded message -
> From: Ananth Gundabattula
> Date: Thu, 17 Feb 2022 16:57
> Subject: Pyflink with pulsar
> To: user@flink.apache.org
>
>
> Hello All,
>
> I am trying to build a pyflink application, and I currently have a pulsar 
> instance that I need to connect to and start streaming messages from.
>
> I was wondering if there is any advice regarding pulsar as a source connector 
> available via python?
>
> Alternatively, Pulsar seems to have a Kafka protocol handler (KOP), and I was 
> wondering if anyone has built a pyflink application streaming from pulsar 
> using the Kafka protocol (by using the pyflink kafka consumer)? If yes, could 
> you please share your experiences.
>
>
> Regards,
> Ananth
>
>


Pyflink with pulsar

2022-02-17 Thread Ananth Gundabattula
Hello All,

I am trying to build a pyflink application, and I currently have a pulsar 
instance that I need to connect to and start streaming messages from.

I was wondering if there is any advice regarding pulsar as a source connector 
available via python?

Alternatively, Pulsar seems to have a Kafka protocol handler (KOP), and I was 
wondering if anyone has built a pyflink application streaming from pulsar 
using the Kafka protocol (by using the pyflink kafka consumer)? If yes, could 
you please share your experiences.


Regards,
Ananth




Re: [ANNOUNCE] New Flink PMC member Thomas Weise

2019-02-12 Thread Ananth, Gundabattula
Congratulations Thomas.

Regards,
Ananth

From: zhijiang 
Reply-To: zhijiang 
Date: Wednesday, 13 February 2019 at 2:38 am
To: Jark Wu, Kostas Kloudas
Cc: Hequn Cheng, Stefan Richter, user
Subject: Re: [ANNOUNCE] New Flink PMC member Thomas Weise

Congrats Thomas!


Best,
Zhijiang
--
From:Kostas Kloudas 
Send Time: Tuesday, 12 February 2019 22:46
To:Jark Wu 
Cc:Hequn Cheng ; Stefan Richter 
; user 
Subject:Re: [ANNOUNCE] New Flink PMC member Thomas Weise

Congratulations Thomas!

Best,
Kostas

On Tue, Feb 12, 2019 at 12:39 PM Jark Wu <imj...@gmail.com> wrote:
Congrats Thomas!

On Tue, 12 Feb 2019 at 18:58, Hequn Cheng <chenghe...@gmail.com> wrote:
Congrats Thomas!

Best, Hequn


On Tue, Feb 12, 2019 at 6:53 PM Stefan Richter 
<s.rich...@data-artisans.com> wrote:
Congrats Thomas!

Best,
Stefan

On 12.02.2019 at 11:20, Stephen Connolly 
<stephen.alan.conno...@gmail.com> wrote:

Congratulations to Thomas. I see that this is not his first time in the PMC 
rodeo... also somebody needs to update LDAP as he's not on 
https://people.apache.org/phonebook.html?pmc=flink yet!

-stephenc

On Tue, 12 Feb 2019 at 09:59, Fabian Hueske <fhue...@apache.org> wrote:
Hi everyone,

On behalf of the Flink PMC I am happy to announce Thomas Weise as a new member 
of the Apache Flink PMC.

Thomas is a long-time contributor and member of our community.
He is starting and participating in lots of discussions on our mailing lists, 
working on topics of joint interest to Flink and Beam, and giving talks on 
Flink at many events.

Please join me in welcoming and congratulating Thomas!

Best,
Fabian



--

Kostas Kloudas | Software Engineer





Follow us @VervericaData

--

Join Flink Forward - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Data Artisans GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen
