Topic assignment across Flink Kafka Consumer

2021-07-19 Thread Prasanna kumar
Hi,

We have a Flink job reading from multiple Kafka topics based on a regex
pattern.
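For reference, a minimal sketch of that kind of regex-based subscription (the topic
pattern, properties, and parallelism below are placeholders, not our actual job):

import java.util.Properties;
import java.util.regex.Pattern;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class RegexTopicJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "regex-consumer");
        // Periodically look for new topics/partitions matching the pattern.
        props.setProperty("flink.partition-discovery.interval-millis", "30000");

        // Subscribes to every topic whose name matches the regex.
        FlinkKafkaConsumer<String> source = new FlinkKafkaConsumer<>(
                Pattern.compile("events-.*"),
                new SimpleStringSchema(),
                props);

        // 4 parallel consumer subtasks; the connector distributes partitions
        // (not whole topics) over these subtasks.
        env.addSource(source).setParallelism(4).print();

        env.execute("regex-topic-job");
    }
}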

What we have found is that the topics are not distributed evenly among the
Kafka consumers.

For example, with 8 topics and 4 Kafka consumer operators: one consumer is
assigned 6 topics, two consumers are assigned 1 topic each, and the last
consumer is not assigned any topic at all.

This leads to uneven use of the available resources.

I could not find any setting/configuration that would make the assignment as
even as possible.

Please let me know if there is a way to achieve this.

Thanks,
Prasanna.


Some question of RocksDB state backend on ARM os

2021-07-19 Thread Wanghui (HiCampus)
Hi all:
   When I use RocksDB as the state backend on an aarch64 system, the following 
error occurs:

1.  Does aarch64 not support RocksDB?

2.  If not, is there a plan to support it in later versions of Flink?
Caused by: java.lang.Exception: Exception while creating 
StreamOperatorStateContext.
 at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:222)
 at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$1(StreamTask.java:506)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:526)
 at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
 at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state 
backend for KeyedProcessOperator_ae33e81d863e4093619373d1e1f77012_(1/1) from 
any of the 1 provided restore options.
 at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
 at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:335)
 at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:148)
 ... 9 more
Caused by: java.io.IOException: Could not load the native RocksDB library
 at 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.ensureRocksDBIsLoaded(RocksDBStateBackend.java:948)
 at 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:489)
 at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:319)
 at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
 at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
 ... 11 more
Caused by: java.lang.UnsatisfiedLinkError: 
/tmp/rocksdb-lib-bd8659305e92a27fac27481baf57897b/librocksdbjni-linux64.so: 
/tmp/rocksdb-lib-bd8659305e92a27fac27481baf57897b/librocksdbjni-linux64.so: 
cannot open shared object file: No such file or directory (Possible cause: 
can't load AMD 64-bit .so on a AARCH64-bit platform)
 at java.lang.ClassLoader$NativeLibrary.load(Native Method)
 at java.lang.ClassLoader.loadLibrary0(ClassLoader.java:1934)
 at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1817)
 at java.lang.Runtime.load0(Runtime.java:810)
 at java.lang.System.load(System.java:1088)
 at 
org.rocksdb.NativeLibraryLoader.loadLibraryFromJar(NativeLibraryLoader.java:78)
 at 
org.rocksdb.NativeLibraryLoader.loadLibrary(NativeLibraryLoader.java:56)
 at 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.ensureRocksDBIsLoaded(RocksDBStateBackend.java:923)
 ... 15 more
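For context, a minimal sketch (the checkpoint path is a placeholder) of the
state-backend setup that triggers the native-library load shown in the trace:

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDbJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // On first use, the backend extracts librocksdbjni-*.so into a temp
        // directory and loads it (ensureRocksDBIsLoaded in the stack trace above).
        env.setStateBackend(new RocksDBStateBackend("file:///tmp/flink-checkpoints", true));

        env.fromElements(1, 2, 3)
                .keyBy(v -> v)              // keyed state is what ends up in RocksDB
                .reduce(Integer::sum)
                .print();

        env.execute("rocksdb-job");
    }
}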
Best regards
Hui Wang


Re: Can we share state between different keys in the same window?

2021-07-19 Thread JING ZHANG
Hi sweta,
State for different keys is isolated. This means you can read/write the state of
the current key in a `ProcessFunction`/`KeyedProcessFunction`/`ProcessWindowFunction`,
but it is not possible to read/write the state of other keys.

Could you please describe your business requirement? Let's see if there is another
way to satisfy it.
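If the goal is to reuse the result of a database lookup across keys, one common
workaround is an operator-scoped cache held as a plain field rather than as keyed
state. A minimal sketch (the types, cache policy, and lookup below are illustrative
assumptions, using Guava for the cache):

import java.util.concurrent.TimeUnit;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Input is keyed by customer id (String); lookupInDatabase() is a hypothetical
// blocking lookup whose result we want to reuse across keys of the same subtask.
public class EnrichingFunction extends KeyedProcessFunction<String, String, String> {

    // A plain instance field, NOT Flink keyed state: it exists once per parallel
    // subtask, so it is shared by all keys handled by that subtask. It is not
    // checkpointed and is simply rebuilt from the database after a restart.
    private transient Cache<String, String> cache;

    @Override
    public void open(Configuration parameters) {
        cache = CacheBuilder.newBuilder()
                .maximumSize(10_000)
                .expireAfterWrite(10, TimeUnit.MINUTES)
                .build();
    }

    @Override
    public void processElement(String customerId, Context ctx, Collector<String> out) throws Exception {
        String info = cache.get(customerId, () -> lookupInDatabase(customerId));
        out.collect(customerId + " -> " + info);
    }

    private String lookupInDatabase(String customerId) {
        // hypothetical JDBC/REST call
        return "info-for-" + customerId;
    }
}

If the lookups dominate latency, Flink's Async I/O operator is another option worth
considering.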

Best,
JING ZHANG

Sweta Kalakuntla wrote on Tue, Jul 20, 2021 at 5:04 AM:

> Hi,
>
> I need to query the database(not a source but for additional information)
> in ProcessFunction. I want to save the results in a state or some other way
> so that I can use the data for other keys in the same window. What are my
> options?
>
> Thanks,
> sweta
>


RE: Datastream api implementation of a third party pyflink connector

2021-07-19 Thread Wang, Zhongle
Hi Xingbo,

Thanks for the reassurance.

PS: The Java implementation of the Pravega connector is at 
https://github.com/pravega/flink-connectors/packages/

Best,
Zhongle Wang

From: Xingbo Huang 
Sent: Tuesday, July 20, 2021 9:58 AM
To: Wang, Zhongle
Cc: user@flink.apache.org
Subject: Re: Datastream api implementation of a third party pyflink connector

Hi Zhongle Wang,

Your understanding is correct. First, you need to provide an implementation 
of a Java connector, then add this jar as a dependency [1], and finally add a 
Python connector wrapper.

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/python/faq/#adding-jar-files

Best,
Xingbo

Wang, Zhongle wrote on Mon, Jul 19, 2021 at 5:43 PM:
Hi,

I’m working on a pyflink datastream connector for Pravega and wish to use a 
datasource other than Kafka.

Currently the Kafka connector for the Python DataStream API is implemented 
using a `get_gateway` function which creates a binding to the Java 
`FlinkKafkaConsumer`.

So if I want to create a `FlinkPrevegaReader` that consumes another datasource, 
such as Pravega, is it recommended to do it the same way? (We have a Java 
reader/consumer implementation.)
Or might the gateway mechanism change in the future?

PS: In this 
post (https://stackoverflow.com/questions/65009292/is-there-a-kinesis-connector-for-pyflink) 
Xingbo suggested that Kinesis and other connectors will be added soon, but 
I’m not sure whether they use the same technique mentioned above.

Thanks,
Zhongle Wang





Re: Datastream api implementation of a third party pyflink connector

2021-07-19 Thread Xingbo Huang
Hi Zhongle Wang,

Your understanding is correct. First, you need to provide an
implementation of a Java connector, then add this jar as a dependency [1],
and finally add a Python connector wrapper.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/python/faq/#adding-jar-files

Best,
Xingbo

Wang, Zhongle wrote on Mon, Jul 19, 2021 at 5:43 PM:

> Hi,
>
>
>
> I’m working on a pyflink datastream connector for Pravega and wish to use
> a datasource other than Kafka.
>
>
>
> Currently the Kafka connector for the python datastream api is implemented
> using a ` get_gateway` function which creates a binding to java in `
> FlinkKafkaConsumer`.
>
>
>
> So if I want to create a `FlinkPrevegaReader` that consumes other
> datasource like the Pravega, is it recommended to do in the same way? (we
> have a java reader/consumer implementation)
>
> Or the gateway thing might be changed in the future?
>
>
>
> PS: In this post(
> https://stackoverflow.com/questions/65009292/is-there-a-kinesis-connector-for-pyflink)
> Xingbo suggested that the Kinesis and other connectors will be added soon,
> but I’m not sure whether it uses the same technique mentioned above.
>
>
>
> Thanks,
>
> Zhongle Wang
>
>
>


Stateful Functions Status

2021-07-19 Thread Omid Bakhshandeh
Hi,

We are evaluating Flink Stateful Functions in our company and we are trying
to see if it fits our needs. I'm hoping to get some help from the
community as we do this.

There are a couple of primary questions that can speed up our process:

1- It seems that in version 2.2.0 of the Python SDK it was possible to have
messages with a specific type, because everything was Protobuf, but in 3.0.0
that is no longer possible and there is always some boilerplate to convert
messages.

@functions.bind("showcase/messaging")
def messaging(context: Context, message: Message):

vs

def greet(context, greet_request: GreetRequest):


Is that right?


2- Are gRPC and perhaps other, more efficient protocols part of the roadmap for
the near future?

3- In all of the examples I found for the Python SDK, the functions are
written in a single file with no specific structure (e.g. implementing an
API or ...). Is there a better way to create Functions in a more structured
way? How can one share these functions across teams and other projects? It
would be great if something like gRPC services and APIs existed for functions
so other users can get into the dev cycle.

4- Is there any Knative example?

I hope these questions make sense.
Thanks,
-- 

Omid


Can we share state between different keys in the same window?

2021-07-19 Thread Sweta Kalakuntla
Hi,

I need to query the database (not a source, but for additional information)
in a ProcessFunction. I want to save the results in state or some other way
so that I can use the data for other keys in the same window. What are my
options?

Thanks,
sweta


Re: Kafka Consumer Retries Failing

2021-07-19 Thread Piotr Nowojski
Ok, thanks for the update. Great that you managed to resolve this issue :)

Best,
Piotrek

On Mon, 19 Jul 2021 at 17:13, Rahul Patwari wrote:

> Hi Piotrek,
>
> I was just about to update.
> You are right. The issue is because of a stalled task manager due to High
> Heap Usage. And the High Heap Usage is because of a Memory Leak in a
> library we are using.
>
> Thanks for your help.
>
> On Mon, Jul 19, 2021 at 8:31 PM Piotr Nowojski 
> wrote:
>
>> Thanks for the update.
>>
>> > Could the backpressure timeout and heartbeat timeout be because of
>> Heap Usage close to Max configured?
>>
>> Could be. This is one of the things I had in mind under overloaded in:
>>
>> > might be related to one another via some different deeper problem
>> (broken network environment, something being overloaded)
>>
>> You can easily diagnose it. Just attach a memory profiler or check gc
>> logs, just as you would normally do when debugging a non-Flink standalone
>> Java application.
>>
>> It can also be a symptom of a failing network environment. I would first
>> check for GC pauses/stops/gaps in the logs that would indicate stalled JVM
>> caused those RPC timeouts. If that doesn't bring you closer to a solution I
>> would then check for the network environment in your cluster/cloud. Both of
>> those might be a reason behind your Kafka issues. Hard to tell. Definitely
>> you shouldn't have heartbeat timeouts in your cluster, so something IS
>> wrong with your setup.
>>
>> Best,
>> Piotrek
>>
>> On Thu, 15 Jul 2021 at 17:17, Rahul Patwari wrote:
>>
>>> Thanks for the feedback Piotrek.
>>>
>>> We have observed the issue again today. As we are using Flink 1.11.1, I
>>> tried to check the backpressure of Kafka source tasks from the
>>> Jobmanager UI.
>>> The backpressure request was canceled due to Timeout and "No Data" was
>>> displayed in UI. Here are the respective logs:
>>>
>>> java.util.concurrent.TimeoutException: Invocation of public abstract
>>> java.util.concurrent.CompletableFuture
>>> org.apache.flink.runtime.taskexecutor.TaskExecutorGateway.requestTaskBackPressure(org.apache.flink.runtime.executiongraph.ExecutionAttemptID,int,org.apache.flink.api.common.time.Time)
>>> timed out.
>>> at
>>> org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway.requestTaskBackPressure(RpcTaskManagerGateway.java:67)
>>> .
>>> Caused by: akka.pattern.AskTimeoutException: Ask timed out on
>>> [Actor[akka.tcp://flink@xX.X.X.X:X/user/rpc/taskmanager_0#-1457664622]]
>>> after [15000 ms]. Message of type
>>> [org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation]. A typical
>>> reason for `AskTimeoutException` is that the recipient actor didn't send a
>>> reply.
>>> at
>>> akka.pattern.PromiseActorRef$.$anonfun$defaultOnTimeout$1(AskSupport.scala:635)
>>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>> .
>>>
>>> During this time, the heartbeat of one of the Taskmanager to the
>>> Jobmanager timed out. Here are the respective logs:
>>>
>>> java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id
>>> bead57c15b447eac08531693ec91edc4 timed out. at
>>> org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1193)
>>> ..
>>>
>>> Because of heartbeat timeout, there was an internal restart of Flink
>>> and the Kafka consumption rate recovered after the restart.
>>>
>>> Could the backpressure timeout and heartbeat timeout be because of Heap
>>> Usage close to Max configured?
>>>
>>> On Wed, Jul 14, 2021 at 6:29 PM Piotr Nowojski 
>>> wrote:
>>>
 Hi Rahul,

 I would highly doubt that you are hitting the network bottleneck case.
 It would require either a broken environment/network or throughputs in
 orders of GB/second. More likely you are seeing empty input pool and you
 haven't checked the documentation [1]:

 > inPoolUsage - An estimate of the input buffers usage. (ignores
 LocalInputChannels)

 If local channels are backpressured, inPoolUsage will be 0. You can
 check downstream task's inputQueueLength or isBackPressured metrics.
 Besides that, I would highly recommend upgrading to Flink 1.13.x if you are
 investigating backpressure problems as described in the blog post.

 > 1. Can the backpressure Cause "DisconnectException", "Error Sending
 Fetch Request to node ..." and other Kafka Consumer logs mentioned above?

 No, I don't think it's possible. Those two might be related to one
 another via some different deeper problem (broken network environment,
 something being overloaded), but I don't see a way how one could cause the
 other.

 Piotrek

 [1]
 https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/metrics/#default-shuffle-service

 On Wed, 14 Jul 2021 at 14:18, Rahul Patwari wrote:

> Thanks, Piotrek.
>
> We have two Kafka sources. We are facing

Re: Apache Flink Kafka Connector not found Error

2021-07-19 Thread Taimoor Bhatti
THANKS...

This was it!! I did:

CTRL+SHIFT+A and typed "Reload All Maven Projects"

Building the project didn't result in errors. I don't think I could've resolved 
this...

Thanks again Yun!!!

From: Yun Gao 
Sent: Monday, July 19, 2021 5:23 PM
To: Taimoor Bhatti ; user@flink.apache.org 

Subject: Re: Apache Flink Kafka Connector not found Error

Hi Taimoor,

It seems that sometimes IntelliJ does not index the project well; perhaps
you could choose mvn -> Reimport Project from the context menu.
If it still does not work, you might try removing the .idea and .iml
files and re-opening the project.

Best,
Yun

--
From:Taimoor Bhatti 
Send Time:2021 Jul. 19 (Mon.) 23:03
To:user@flink.apache.org ; Yun Gao 
Subject:Re: Apache Flink Kafka Connector not found Error

Hello Yun,
Many thanks for the reply...

For some reason I'm not able to import org.apache.flink.streaming.connectors 
within the IDE...

I get the following errors:

object connectors is not a member of package org.apache.flink.streaming
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer


"mvn clean compile" works however... (Thanks...)

Do you know why IntelliJ doesn't see the import??

Best,
Taimoor


From: Yun Gao 
Sent: Monday, July 19, 2021 3:25 PM
To: Taimoor Bhatti ; user@flink.apache.org 

Subject: Re: Apache Flink Kafka Connector not found Error

Hi Taimoor,

I think that is right regarding the provided dependency: we need to manually
include it in the classpath via the IDEA options.

And regarding the FlinkKafkaConsumer issue, I tried locally and it seems
to work after adding the import, namely:
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

Best,
Yun


--
From:Taimoor Bhatti 
Send Time:2021 Jul. 19 (Mon.) 18:43
To:user@flink.apache.org 
Subject:Apache Flink Kafka Connector not found Error

I'm having some trouble with using the Flink DataStream API with the Kafka 
Connector. There don't seem to be great resources on the internet which can 
explain the issue I'm having.


My project is here: 
https://github.com/sysarcher/flink-scala-tests

I'm unable to use FlinkKafkaConsumer (link), which I want to try out.

I'm using IntelliJ Idea. The project was generated from the tutorial on Flink's 
website

  *   The First problem seemed to be the provided scope as suggested here: 
https://stackoverflow.com/a/63667067/3760442 ... Now, DataStream API (and the 
example) seem to work.
  *   The current problem is that I'm not able to use the Kafka connector which I'm 
looking to try out.

The following link was used to generate the project: 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/try-flink/datastream/#how-to-follow-along

Re: Apache Flink Kafka Connector not found Error

2021-07-19 Thread Yun Gao
Hi Taimoor,

It seems that sometimes IntelliJ does not index the project well; perhaps
you could choose mvn -> Reimport Project from the context menu.
If it still does not work, you might try removing the .idea and .iml
files and re-opening the project.

Best,
Yun


--
From:Taimoor Bhatti 
Send Time:2021 Jul. 19 (Mon.) 23:03
To:user@flink.apache.org ; Yun Gao 
Subject:Re: Apache Flink Kafka Connector not found Error

 Hello Yun,
 Many thanks for the reply...

 For some reason I'm not able to import org.apache.flink.streaming.connectors 
within the IDE...

 I get the following errors:

 object connectors is not a member of package org.apache.flink.streaming
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer


 "mvn clean compile" works however... (Thanks...)

 Do you know why IntelliJ doesn't see the import??

 Best, 
Taimoor

From: Yun Gao 
Sent: Monday, July 19, 2021 3:25 PM
To: Taimoor Bhatti ; user@flink.apache.org 

Subject: Re: Apache Flink Kafka Connector not found Error
Hi Taimoor,

I think that is right regarding the provided dependency: we need to manually
include it in the classpath via the IDEA options.

And regarding the FlinkKafkaConsumer issue, I tried locally and it seems
to work after adding the import, namely:
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

Best,
Yun


--
From:Taimoor Bhatti 
Send Time:2021 Jul. 19 (Mon.) 18:43
To:user@flink.apache.org 
Subject:Apache Flink Kafka Connector not found Error

I'm having some trouble with using the Flink DataStream API with the Kafka 
Connector. There don't seem to be great resources on the internet which can 
explain the issue I'm having. 

My project is here:  https://github.com/sysarcher/flink-scala-tests
I'm unable to use FlinkKafkaConsumer (link), which I want to try out.
I'm using IntelliJ Idea. The project was generated from the  tutorial on 
Flink's website
The First problem seemed to be the provided scope as suggested here:  
https://stackoverflow.com/a/63667067/3760442 ... Now, DataStream API (and the 
example) seem to work.
The current problem is that I'm not able to use the Kafka connector which I'm 
looking to try out.
The following link was used to generate the project:  
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/try-flink/datastream/#how-to-follow-along


 This same question was originally posted here:  
https://stackoverflow.com/q/68437215/3760442

Thanks in advance 



Re: Kafka Consumer Retries Failing

2021-07-19 Thread Rahul Patwari
Hi Piotrek,

I was just about to update.
You are right. The issue is because of a stalled task manager due to High
Heap Usage. And the High Heap Usage is because of a Memory Leak in a
library we are using.

Thanks for your help.

On Mon, Jul 19, 2021 at 8:31 PM Piotr Nowojski  wrote:

> Thanks for the update.
>
> > Could the backpressure timeout and heartbeat timeout be because of Heap
> Usage close to Max configured?
>
> Could be. This is one of the things I had in mind under overloaded in:
>
> > might be related to one another via some different deeper problem
> (broken network environment, something being overloaded)
>
> You can easily diagnose it. Just attach a memory profiler or check gc
> logs, just as you would normally do when debugging a non-Flink standalone
> Java application.
>
> It can also be a symptom of a failing network environment. I would first
> check for GC pauses/stops/gaps in the logs that would indicate stalled JVM
> caused those RPC timeouts. If that doesn't bring you closer to a solution I
> would then check for the network environment in your cluster/cloud. Both of
> those might be a reason behind your Kafka issues. Hard to tell. Definitely
> you shouldn't have heartbeat timeouts in your cluster, so something IS
> wrong with your setup.
>
> Best,
> Piotrek
>
> On Thu, 15 Jul 2021 at 17:17, Rahul Patwari wrote:
>
>> Thanks for the feedback Piotrek.
>>
>> We have observed the issue again today. As we are using Flink 1.11.1, I
>> tried to check the backpressure of Kafka source tasks from the
>> Jobmanager UI.
>> The backpressure request was canceled due to Timeout and "No Data" was
>> displayed in UI. Here are the respective logs:
>>
>> java.util.concurrent.TimeoutException: Invocation of public abstract
>> java.util.concurrent.CompletableFuture
>> org.apache.flink.runtime.taskexecutor.TaskExecutorGateway.requestTaskBackPressure(org.apache.flink.runtime.executiongraph.ExecutionAttemptID,int,org.apache.flink.api.common.time.Time)
>> timed out.
>> at
>> org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway.requestTaskBackPressure(RpcTaskManagerGateway.java:67)
>> .
>> Caused by: akka.pattern.AskTimeoutException: Ask timed out on
>> [Actor[akka.tcp://flink@xX.X.X.X:X/user/rpc/taskmanager_0#-1457664622]]
>> after [15000 ms]. Message of type
>> [org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation]. A typical
>> reason for `AskTimeoutException` is that the recipient actor didn't send a
>> reply.
>> at
>> akka.pattern.PromiseActorRef$.$anonfun$defaultOnTimeout$1(AskSupport.scala:635)
>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>> .
>>
>> During this time, the heartbeat of one of the Taskmanager to the
>> Jobmanager timed out. Here are the respective logs:
>>
>> java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id
>> bead57c15b447eac08531693ec91edc4 timed out. at
>> org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1193)
>> ..
>>
>> Because of heartbeat timeout, there was an internal restart of Flink and
>> the Kafka consumption rate recovered after the restart.
>>
>> Could the backpressure timeout and heartbeat timeout be because of Heap
>> Usage close to Max configured?
>>
>> On Wed, Jul 14, 2021 at 6:29 PM Piotr Nowojski 
>> wrote:
>>
>>> Hi Rahul,
>>>
>>> I would highly doubt that you are hitting the network bottleneck case.
>>> It would require either a broken environment/network or throughputs in
>>> orders of GB/second. More likely you are seeing empty input pool and you
>>> haven't checked the documentation [1]:
>>>
>>> > inPoolUsage - An estimate of the input buffers usage. (ignores
>>> LocalInputChannels)
>>>
>>> If local channels are backpressured, inPoolUsage will be 0. You can
>>> check downstream task's inputQueueLength or isBackPressured metrics.
>>> Besides that, I would highly recommend upgrading to Flink 1.13.x if you are
>>> investigating backpressure problems as described in the blog post.
>>>
>>> > 1. Can the backpressure Cause "DisconnectException", "Error Sending
>>> Fetch Request to node ..." and other Kafka Consumer logs mentioned above?
>>>
>>> No, I don't think it's possible. Those two might be related to one
>>> another via some different deeper problem (broken network environment,
>>> something being overloaded), but I don't see a way how one could cause the
>>> other.
>>>
>>> Piotrek
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/metrics/#default-shuffle-service
>>>
>>> On Wed, 14 Jul 2021 at 14:18, Rahul Patwari wrote:
>>>
 Thanks, Piotrek.

 We have two Kafka sources. We are facing this issue for both of them.
 The downstream tasks with the sources form two independent directed acyclic
 graphs, running within the same Streaming Job.

 For Example:
 source1 -> task1 -> sink1
 source2 -> task2 -> sink2

 There is bac

Re: Apache Flink Kafka Connector not found Error

2021-07-19 Thread Taimoor Bhatti
Hello Yun,
Many thanks for the reply...

For some reason I'm not able to import org.apache.flink.streaming.connectors 
within the IDE...

I get the following errors:

object connectors is not a member of package org.apache.flink.streaming
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer


"mvn clean compile" works however... (Thanks...)

Do you know why IntelliJ doesn't see the import??

Best,
Taimoor


From: Yun Gao 
Sent: Monday, July 19, 2021 3:25 PM
To: Taimoor Bhatti ; user@flink.apache.org 

Subject: Re: Apache Flink Kafka Connector not found Error

Hi Taimoor,

I think that is right regarding the provided dependency: we need to manually
include it in the classpath via the IDEA options.

And regarding the FlinkKafkaConsumer issue, I tried locally and it seems
to work after adding the import, namely:
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

Best,
Yun


--
From:Taimoor Bhatti 
Send Time:2021 Jul. 19 (Mon.) 18:43
To:user@flink.apache.org 
Subject:Apache Flink Kafka Connector not found Error

I'm having some trouble with using the Flink DataStream API with the Kafka 
Connector. There don't seem to be great resources on the internet which can 
explain the issue I'm having.


My project is here: 
https://github.com/sysarcher/flink-scala-tests

I'm unable to use FlinkKafkaConsumer (link), which I want to try out.

I'm using IntelliJ Idea. The project was generated from the tutorial on Flink's 
website

  *   The First problem seemed to be the provided scope as suggested here: 
https://stackoverflow.com/a/63667067/3760442
 ... Now, DataStream API (and the example) seem to work.
  *   The current problem is that I'm not able to use the Kafka connector which 
I'm looking to try out.

The following link was used to generate the project: 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/try-flink/datastream/#how-to-follow-along


This same question was originally posted here: 
https://stackoverflow.com/q/68437215/3760442

Thanks in advance


Re: Kafka Consumer Retries Failing

2021-07-19 Thread Piotr Nowojski
Thanks for the update.

> Could the backpressure timeout and heartbeat timeout be because of Heap
Usage close to Max configured?

Could be. This is one of the things I had in mind under overloaded in:

> might be related to one another via some different deeper problem (broken
network environment, something being overloaded)

You can easily diagnose it. Just attach a memory profiler or check gc logs,
just as you would normally do when debugging a non-Flink standalone Java
application.
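As a quick first check, a small sketch (standard JMX beans only; these are the
same beans that back Flink's Status.JVM.GarbageCollector metrics) for dumping
collector counts and times from a JVM:

import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;

public class GcProbe {
    public static void main(String[] args) {
        // Prints, per collector, how many collections have run and how long
        // they took in total; exploding values point at GC-induced stalls.
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            System.out.printf("%s: count=%d, time=%dms%n",
                    gc.getName(), gc.getCollectionCount(), gc.getCollectionTime());
        }
    }
}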

It can also be a symptom of a failing network environment. I would first
check for GC pauses/stops/gaps in the logs that would indicate stalled JVM
caused those RPC timeouts. If that doesn't bring you closer to a solution I
would then check for the network environment in your cluster/cloud. Both of
those might be a reason behind your Kafka issues. Hard to tell. Definitely
you shouldn't have heartbeat timeouts in your cluster, so something IS
wrong with your setup.

Best,
Piotrek

On Thu, 15 Jul 2021 at 17:17, Rahul Patwari wrote:

> Thanks for the feedback Piotrek.
>
> We have observed the issue again today. As we are using Flink 1.11.1, I
> tried to check the backpressure of Kafka source tasks from the
> Jobmanager UI.
> The backpressure request was canceled due to Timeout and "No Data" was
> displayed in UI. Here are the respective logs:
>
> java.util.concurrent.TimeoutException: Invocation of public abstract
> java.util.concurrent.CompletableFuture
> org.apache.flink.runtime.taskexecutor.TaskExecutorGateway.requestTaskBackPressure(org.apache.flink.runtime.executiongraph.ExecutionAttemptID,int,org.apache.flink.api.common.time.Time)
> timed out.
> at
> org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway.requestTaskBackPressure(RpcTaskManagerGateway.java:67)
> .
> Caused by: akka.pattern.AskTimeoutException: Ask timed out on
> [Actor[akka.tcp://flink@xX.X.X.X:X/user/rpc/taskmanager_0#-1457664622]]
> after [15000 ms]. Message of type
> [org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation]. A typical
> reason for `AskTimeoutException` is that the recipient actor didn't send a
> reply.
> at
> akka.pattern.PromiseActorRef$.$anonfun$defaultOnTimeout$1(AskSupport.scala:635)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> .
>
> During this time, the heartbeat of one of the Taskmanager to the
> Jobmanager timed out. Here are the respective logs:
>
> java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id
> bead57c15b447eac08531693ec91edc4 timed out. at
> org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1193)
> ..
>
> Because of heartbeat timeout, there was an internal restart of Flink and
> the Kafka consumption rate recovered after the restart.
>
> Could the backpressure timeout and heartbeat timeout be because of Heap
> Usage close to Max configured?
>
> On Wed, Jul 14, 2021 at 6:29 PM Piotr Nowojski 
> wrote:
>
>> Hi Rahul,
>>
>> I would highly doubt that you are hitting the network bottleneck case. It
>> would require either a broken environment/network or throughputs in orders
>> of GB/second. More likely you are seeing empty input pool and you haven't
>> checked the documentation [1]:
>>
>> > inPoolUsage - An estimate of the input buffers usage. (ignores
>> LocalInputChannels)
>>
>> If local channels are backpressured, inPoolUsage will be 0. You can check
>> downstream task's inputQueueLength or isBackPressured metrics. Besides
>> that, I would highly recommend upgrading to Flink 1.13.x if you are
>> investigating backpressure problems as described in the blog post.
>>
>> > 1. Can the backpressure Cause "DisconnectException", "Error Sending
>> Fetch Request to node ..." and other Kafka Consumer logs mentioned above?
>>
>> No, I don't think it's possible. Those two might be related to one
>> another via some different deeper problem (broken network environment,
>> something being overloaded), but I don't see a way how one could cause the
>> other.
>>
>> Piotrek
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/metrics/#default-shuffle-service
>>
>> On Wed, 14 Jul 2021 at 14:18, Rahul Patwari wrote:
>>
>>> Thanks, Piotrek.
>>>
>>> We have two Kafka sources. We are facing this issue for both of them.
>>> The downstream tasks with the sources form two independent directed acyclic
>>> graphs, running within the same Streaming Job.
>>>
>>> For Example:
>>> source1 -> task1 -> sink1
>>> source2 -> task2 -> sink2
>>>
>>> There is backpressure in both sources. Verified using the
>>> "isBackPressured" metric.
>>> For one of the sources, "outPoolUsage" is high whereas "inPoolUsage" of
>>> immediate downstream task is ~ 0. I think we are observing the rare case
>>> mentioned at the end in [1].
>>>
>>> I have a couple of questions:
>>>
>>>1. Can the backpressure Cause "DisconnectException", "Error Sending
>>>Fetch Request to node ..." and ot

Re: Subpar performance of temporal joins with RocksDB backend

2021-07-19 Thread Adrian Bednarz
Thanks Maciej, I think this has helped a bit. We are now at 2k/3k eps on a
single node. Now, I just wonder if this isn't too slow for a single node
and such a simple query.
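For anyone following along, a minimal sketch (assuming Flink 1.12+; on a real
cluster the same keys would go into flink-conf.yaml instead) of setting the two
options Maciek suggested below programmatically:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDbTuning {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setString("state.backend.rocksdb.predefined-options", "SPINNING_DISK_OPTIMIZED_HIGH_MEM");
        conf.setString("state.backend.rocksdb.memory.partitioned-index-filters", "true");

        // The configuration is only picked up like this for local/embedded execution.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);

        // ... build the temporal-join pipeline on env and execute it ...
    }
}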

On Sat, Jul 10, 2021 at 9:28 AM Maciej Bryński  wrote:

> Could you please set 2 configuration options:
> - state.backend.rocksdb.predefined-options =
> SPINNING_DISK_OPTIMIZED_HIGH_MEM
> - state.backend.rocksdb.memory.partitioned-index-filters = true
>
> Regards,
> Maciek
>
> On Sat, 10 Jul 2021 at 08:54, Adrian Bednarz wrote:
> >
> > I didn’t tweak any RocksDB knobs. The only thing we did was to increase
> managed memory to 12gb which was supposed to help RocksDB according to the
> documentation. The rest stays at the defaults. Incremental checkpointing
> was enabled as well but it made no difference in performance if we disabled
> it.
> >
> > On Fri, 9 Jul 2021 at 20:43, Maciej Bryński  wrote:
> >>
> >> Hi Adrian,
> >> Could you share your state backend configuration ?
> >>
> >> Regards,
> >> Maciek
> >>
> >> On Fri, 9 Jul 2021 at 19:09, Adrian Bednarz wrote:
> >> >
> >> > Hello,
> >> >
> >> > We are experimenting with lookup joins in Flink 1.13.0.
> Unfortunately, we unexpectedly hit significant performance degradation when
> changing the state backend to RocksDB.
> >> >
> >> > We performed tests with two tables: fact table TXN and dimension
> table CUSTOMER with the following schemas:
> >> >
> >> > TXN:
> >> >  |-- PROD_ID: BIGINT
> >> >  |-- CUST_ID: BIGINT
> >> >  |-- TYPE: BIGINT
> >> >  |-- AMOUNT: BIGINT
> >> >  |-- ITEMS: BIGINT
> >> >  |-- TS: TIMESTAMP(3) **rowtime**
> >> >  |-- WATERMARK FOR TS: TIMESTAMP(3) AS `R` - INTERVAL '0' SECONDS
> >> >
> >> > CUSTOMER:
> >> >  |-- ID: BIGINT
> >> >  |-- STATE: BIGINT
> >> >  |-- AGE: BIGINT
> >> >  |-- SCORE: DOUBLE
> >> >  |-- PRIMARY KEY: ID
> >> >
> >> > And the following query:
> >> > select state, sum(amount) from txn t JOIN customer FOR SYSTEM_TIME AS
> OF t.ts ON t.cust_id = customer.id group by state, TUMBLE(t.ts, INTERVAL
> '1' SECOND)
> >> >
> >> > In our catalog, we reconfigured the customer table so that the
> watermark is set to infinity on that side of the join. We generate data in
> a round robin fashion (except for timestamp that grows with a step of 1 ms).
> >> >
> >> > We performed our experiments on a single c5.4xlarge machine with heap
> and managed memory size set to 12gb with a blackhole sink. With 2 000 000
> fact records and 100 000 dimension records, a job with heap backend
> finishes in 5 seconds whereas RocksDB executes in 1h 24m. For 400 000
> dimension records it doesn't grow significantly but goes up to 1h 36m (the
> job processes more records after all).
> >> >
> >> > We also checked what would happen if we reduced the amount of
> customer ids to 1. Our expectation was that RocksDB will not offload
> anything to disk anymore so the performance should be comparable with heap
> backend. It was executed in 10 minutes.
> >> >
> >> > Is this something anybody experienced or something to be expected? Of
> course, we assumed RocksDB to perform slower but 300 eps is below our
> expectations.
> >> >
> >> > Thanks,
> >> > Adrian
> >>
> >>
> >>
> >> --
> >> Maciek Bryński
>
>
>
> --
> Maciek Bryński
>


Re: Set job specific resources in one StreamTableEnvironment

2021-07-19 Thread Yun Gao
Hi Paul,

For parallelism, it should be possible to set it with 
`table.exec.resource.default-parallelism` [1],
and an example of setting the parameter is in the first few paragraphs of that page. 
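For example, a minimal sketch (the parallelism value is arbitrary) of setting it per
job on the table environment before submitting:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class PerJobParallelism {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());

        // Applies to jobs submitted from this environment after this point.
        tEnv.getConfig().getConfiguration()
                .setInteger("table.exec.resource.default-parallelism", 8);

        // tEnv.executeSql("INSERT INTO sink_table SELECT ... FROM source_table");
    }
}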

But regarding the total process memory, I think it can only be set at the 
cluster level, since 
it is a per-cluster option: the TM uses the option on startup, before the 
job is submitted.

Best,
Yun



[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/config/#table-exec-resource-default-parallelism




--
From:Paul Lam 
Send Time:2021 Jul. 16 (Fri.) 12:21
To:user 
Subject:Set job specific resources in one StreamTableEnvironment

Hi,
I’m reusing the same StreamTableEnvironment to submit multiple Table/SQL jobs 
to a session cluster, 
but I couldn’t find a proper way to specify job resources for each job (like 
parallelism and total process 
memory), and they all use the cluster defaults. 

I have considered overriding the resource specs of all nodes in the StreamGraph, 
but it’s problematic 
because some nodes have a parallelism limit (e.g. can’t be greater than 1).
I think I might be missing something and there should be a better way to do 
this. Please give me some 
pointers. Thanks a lot!
Best,
Paul Lam



Re: Apache Flink Kafka Connector not found Error

2021-07-19 Thread Yun Gao
Hi Taimoor,

I think that is right regarding the provided dependency: we need to manually
include it in the classpath via the IDEA options.

And regarding the FlinkKafkaConsumer issue, I tried locally and it seems
to work after adding the import, namely:
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

Best,
Yun



--
From:Taimoor Bhatti 
Send Time:2021 Jul. 19 (Mon.) 18:43
To:user@flink.apache.org 
Subject:Apache Flink Kafka Connector not found Error

   I'm having some trouble with using the Flink DataStream API with the Kafka 
Connector. There don't seem to be great resources on the internet which can 
explain the issue I'm having. 

My project is here:  https://github.com/sysarcher/flink-scala-tests
I'm unable to use FlinkKafkaConsumer (link), which I want to try out.
I'm using IntelliJ Idea. The project was generated from the  tutorial on 
Flink's website 
The First problem seemed to be the provided scope as suggested here:  
https://stackoverflow.com/a/63667067/3760442 ... Now, DataStream API (and the 
example) seem to work.
The current problem is that I'm not able to use the Kafka connector which I'm 
looking to try out.
The following link was used to generate the project:  
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/try-flink/datastream/#how-to-follow-along
 


 This same question was originally posted here:  
https://stackoverflow.com/q/68437215/3760442

Thanks in advance 


Re: Delay data elements in pipeline by X minutes

2021-07-19 Thread Jan Lukavský
I don't want to speak for Apache Flink - I'm using it via Apache Beam 
only - but generally speaking, each key will have to be held in state up 
to the moment when it can be garbage collected. That moment is defined 
(at least in the Apache Beam case) as the timestamp of the end of the window + 
allowed lateness. So, in the case of the global window, it is (practically) 
forever in the future, yes.


You can clean the state manually, though. If you use the UUID (or 
similar) approach, you would set a timer for the 15-minute (relative) 
interval, and after you emit the data you can clear the 
timer and the value state, which should clear the complete state of the 
window (please someone correct me if I'm wrong).
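A minimal sketch of that pattern (a processing-time variant with String elements;
the names are illustrative, not taken from Dario's code):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Hold each element for DELAY_MS, emit it, then clear the per-key state so
// nothing is retained for the (random) key after the element has been emitted.
public class DelayFunction extends KeyedProcessFunction<String, String, String> {

    private static final long DELAY_MS = 15 * 60 * 1000L;

    private transient ValueState<String> pending;

    @Override
    public void open(Configuration parameters) {
        pending = getRuntimeContext().getState(new ValueStateDescriptor<>("pending", String.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        pending.update(value);
        ctx.timerService().registerProcessingTimeTimer(
                ctx.timerService().currentProcessingTime() + DELAY_MS);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        out.collect(pending.value());
        pending.clear(); // releases the state for this key once the element is out
    }
}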


Alternative approach would be to use session windows and a 
GroupByKey-like operation, which would hold and emit element at the end 
of the session, which is exactly what you need. The state of the session 
window will be cleared in this case as well.


 Jan

On 7/19/21 2:00 PM, Dario Heinisch wrote:


Hey Jan,

No it isn't a logical constraint. Reason is there are different kind 
of users, some who pay for live data while other want a cheaper 
version but where the data is delayed.


But what happens if I add a random key ( lets say a uuid ) isn't that 
bad for performance? Then for every Object that is being processed I 
would have a state which is only being used once but I assume Flink 
wouldn't clean that state up, wouldn't it? What happens to the 
ValueState? Is that still being kept in memory? Because I thought that 
for every key Flink encounters it would keep a state.


But I think this could be solved with a TTL: 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/fault-tolerance/state/#state-time-to-live-ttl, 
guess I will test that at some point this week! :)


For reference, this would be the code:

[...]
.keyBy(t -> UUID.randomUUID())
.process(new DelayedProcessor<>(NAME, CLAZZ))

public abstract class Timestamper { public abstract long executedAt(); }

public class DelayedProcessor<T extends Timestamper>
        extends KeyedProcessFunction<UUID, T, T> implements ResultTypeQueryable<T> {

    private final String stateName;
    private final Class<T> clazz;

    private ValueState<T> state;

    private static long TIMEOUT = TimeUnit.MINUTES.toMillis(15);

    public DelayedProcessor(String stateName, Class<T> clazz) {
        this.stateName = stateName;
        this.clazz = clazz;
    }

    @Override
    public void open(Configuration parameters) {

        StateTtlConfig ttlConfig = StateTtlConfig
                .newBuilder(Time.minutes(15))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp)
                .build();

        ValueStateDescriptor<T> desc = new ValueStateDescriptor<>(stateName, clazz);

        desc.enableTimeToLive(ttlConfig);
        state = getRuntimeContext().getState(desc);
    }

    @Override
    public void processElement(T t, Context ctx, Collector<T> collector) throws Exception {

        this.state.update(t);

        long now = System.currentTimeMillis();

        long timeout = (now + TIMEOUT) - t.executedAt();

        ctx.timerService().registerEventTimeTimer(timeout);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<T> out) throws Exception {

        out.collect(this.state.value());
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return TypeInformation.of(clazz);
    }
}


Best regards,

Dario



On 18.07.21 19:12, Jan Lukavský wrote:


Hi Dario,

out of curiosity, could you briefly describe the driving use-case? 
What is the (logical) constraint, that drives the requirement? I'd 
guess, that it could be related to waiting for some (external) 
condition? Or maybe related to late data? I think that there might be 
better approaches, than (unconditionally) delay data in pipeline. On 
the other hand, if that is really the best approach, then adding a 
random key to create a keyed stream should work in all cases, right?


 Jan

On 7/18/21 3:52 PM, Dario Heinisch wrote:


Hey Kiran,

Yeah was thinking of another solution, so I have one posgresql sink 
& one kafka sink.


So I can just process the data in real time and insert them in the 
DB. Then I would just select the latest row where created_at >= 
NOW() - interval '15 minutes' and for any kafka consumer I would 
just do:


let msg = get_next_kafka_msg();
let diff = created_at + 15min - now();
if diff > 0 {
    sleep(diff)
}
// do something
// 
kafka_commit();

And then run some cron job to delete obsolete rows from the db which 
are not required anymore.


Best regards

Dario

On 18.07.21 15:29, Kiran Japannavar wrote:

Hi Dario,

Did you explore other options? If your use case (apart from 
delaying sink writes) can be solved via spark streaming. Then maybe 
spark streaming with a micro-batch of 15 mins would help.




On Sat, Jul 17, 2021 at 10:17 PM Dario Heinisc

Re: Delay data elements in pipeline by X minutes

2021-07-19 Thread Dario Heinisch

Hey Jan,

No, it isn't a logical constraint. The reason is that there are different kinds of 
users: some pay for live data, while others want a cheaper version where the data 
is delayed.


But what happens if I add a random key (let's say a UUID)? Isn't that bad for 
performance? Then for every object that is processed I would have a state which 
is only used once, and I assume Flink wouldn't clean that state up, would it? 
What happens to the ValueState? Is it still kept in memory? Because I thought 
that Flink keeps a state for every key it encounters.


But I think this could be solved with a TTL: 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/fault-tolerance/state/#state-time-to-live-ttl, 
guess I will test that at some point this week! :)


For reference, this would be the code:

[...]
.keyBy(t -> UUID.randomUUID())
.process(new DelayedProcessor<>(NAME, CLAZZ))

public abstract class Timestamper { public abstract long executedAt(); }

public class DelayedProcessor<T extends Timestamper>
        extends KeyedProcessFunction<UUID, T, T> implements ResultTypeQueryable<T> {

    private final String stateName;
    private final Class<T> clazz;

    private ValueState<T> state;

    private static long TIMEOUT = TimeUnit.MINUTES.toMillis(15);

    public DelayedProcessor(String stateName, Class<T> clazz) {
        this.stateName = stateName;
        this.clazz = clazz;
    }

    @Override
    public void open(Configuration parameters) {

        StateTtlConfig ttlConfig = StateTtlConfig
                .newBuilder(Time.minutes(15))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp)
                .build();

        ValueStateDescriptor<T> desc = new ValueStateDescriptor<>(stateName, clazz);

        desc.enableTimeToLive(ttlConfig);
        state = getRuntimeContext().getState(desc);
    }

    @Override
    public void processElement(T t, Context ctx, Collector<T> collector) throws Exception {

        this.state.update(t);

        long now = System.currentTimeMillis();

        long timeout = (now + TIMEOUT) - t.executedAt();

        ctx.timerService().registerEventTimeTimer(timeout);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<T> out) throws Exception {

        out.collect(this.state.value());
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return TypeInformation.of(clazz);
    }
}


Best regards,

Dario



On 18.07.21 19:12, Jan Lukavský wrote:


Hi Dario,

out of curiosity, could you briefly describe the driving use-case? 
What is the (logical) constraint, that drives the requirement? I'd 
guess, that it could be related to waiting for some (external) 
condition? Or maybe related to late data? I think that there might be 
better approaches, than (unconditionally) delay data in pipeline. On 
the other hand, if that is really the best approach, then adding a 
random key to create a keyed stream should work in all cases, right?


 Jan

On 7/18/21 3:52 PM, Dario Heinisch wrote:


Hey Kiran,

Yeah was thinking of another solution, so I have one posgresql sink & 
one kafka sink.


So I can just process the data in real time and insert them in the 
DB. Then I would just select the latest row where created_at >= NOW() 
- interval '15 minutes' and for any kafka consumer I would just do:


let msg = get_next_kafka_msg();
let diff = created_at + 15min - now();
if diff > 0 {
    sleep(diff)
}
// do something
// 
kafka_commit();

And then run some cron job to delete obsolete rows from the db which 
are not required anymore.


Best regards

Dario

On 18.07.21 15:29, Kiran Japannavar wrote:

Hi Dario,

Did you explore other options? If your use case (apart from delaying 
sink writes) can be solved via spark streaming. Then maybe spark 
streaming with a micro-batch of 15 mins would help.




On Sat, Jul 17, 2021 at 10:17 PM Dario Heinisch wrote:


Hey there,

Hope all is well!

I would like to delay the time by 15minutes before my data
arrives at my
sinks:

stream()
.map()
[]
.
.print()

I tried implementing my own ProcessFunction where TimeStamper is a
custom Interface:

public abstract class Timestamper {
 public abstract long executedAt();
}

public class DelayedProcessor extends
ProcessFunction {

 private final String stateName;
 private final Class clazz;

 // TODO: Should we do ListState as this is being preferred for
serialization
 //  or should we do Value but this may impact
serialization.
 private ListState state;

 private static long TIMEOUT = TimeUnit.MINUTES.toMillis(15);

 public DelayedProcessor(String stateName, Class clazz) {
 this.stateName = stateName;
 this.clazz = clazz;
 }

 @Override
 public void open(C

Apache Flink Kafka Connector not found Error

2021-07-19 Thread Taimoor Bhatti
I'm having some trouble with using the Flink DataStream API with the Kafka 
Connector. There don't seem to be great resources on the internet which can 
explain the issue I'm having.


My project is here: https://github.com/sysarcher/flink-scala-tests

I'm unable to use FlinkKafkaConsumer (link), which I want to try out.

I'm using IntelliJ Idea. The project was generated from the tutorial on Flink's 
website

  *   The First problem seemed to be the provided scope as suggested here: 
https://stackoverflow.com/a/63667067/3760442 ... Now, DataStream API (and the 
example) seem to work.
  *   The current problem is that I'm not able to use the Kafka connector which 
I'm looking to try out.

The following link was used to generate the project: 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/try-flink/datastream/#how-to-follow-along


This same question was originally posted here: 
https://stackoverflow.com/q/68437215/3760442

Thanks in advance


Datastream api implementation of a third party pyflink connector

2021-07-19 Thread Wang, Zhongle
Hi,

I'm working on a pyflink datastream connector for Pravega and wish to use a 
datasource other than Kafka.

Currently the Kafka connector for the Python DataStream API is implemented 
using a `get_gateway` function which creates a binding to the Java 
`FlinkKafkaConsumer`.

So if I want to create a `FlinkPrevegaReader` that consumes another datasource, 
such as Pravega, is it recommended to do it the same way? (We have a Java 
reader/consumer implementation.)
Or might the gateway mechanism change in the future?

PS: In this 
post(https://stackoverflow.com/questions/65009292/is-there-a-kinesis-connector-for-pyflink)
 Xingbo suggested that the Kinesis and other connectors will be added soon, but 
I'm not sure whether it uses the same technique mentioned above.

Thanks,
Zhongle Wang