Chaining 2 flink jobs through a KAFKA topic with checkpoint enabled

2020-12-27 Thread Daniel Peled
Hello,

We have 2 flink jobs that communicate with each other through a KAFKA topic.
Both jobs use checkpoints with EXACTLY ONCE semantic.

We have seen the following behaviour and want to ask whether this is the
expected behaviour or a bug.

When the first job produces a message to KAFKA, the message is consumed by
the second job with a latency that depends on the *first* job's *checkpoint
interval*.

We are able to read the message using the kafka tool or using another kafka
consumer, but NOT with a Flink Kafka consumer, which again depends on the
checkpoint interval of the first job.

How come the consumer of the second job depends on the producer transaction
commit time of the first job?

BR,
Danny


Re: Chaining 2 flink jobs through a KAFKA topic with checkpoint enabled

2021-01-05 Thread Daniel Peled
Thank you for your answers.
We ended up changing the isolation level to read_uncommitted, which solves
the latency problem between the two jobs, understanding that we may get
duplicates in the second job if the first job fails and rolls back.
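
For reference, a minimal sketch of the consumer side after the change (the broker
address, group id and topic name are placeholders, and this assumes the
FlinkKafkaConsumer connector we were on at the time):

Properties props = new Properties();
props.setProperty("bootstrap.servers", "kafka:9092");  // placeholder
props.setProperty("group.id", "second-job");            // placeholder
// read records as soon as the first job produces them, accepting possible
// duplicates if the first job later fails and rolls back its transaction
props.setProperty("isolation.level", "read_uncommitted");
FlinkKafkaConsumer<String> consumer =
        new FlinkKafkaConsumer<>("topic-between-jobs", new SimpleStringSchema(), props);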

On Tue, Jan 5, 2021 at 15:23, 赵一旦 wrote:

> I think what you need is
> http://kafka.apache.org/documentation/#consumerconfigs_isolation.level .
>
> The isolation.level setting's default value is read_uncommitted, so maybe
> you are not using the default setting?
>
> On Tue, Jan 5, 2021 at 9:10 PM, 赵一旦 wrote:
>
>> I do not have this problem, so I guess it is related to the configuration of
>> your Kafka producer and consumer, and maybe also to the Kafka topic or broker
>> properties.
>>
>> On Tue, Jan 5, 2021 at 6:47 PM, Arvid Heise wrote:
>>
>>> Hi Daniel,
>>>
>>> Flink commits transactions on checkpoints, while Kafka Streams/Connect
>>> usually commits per record. This is the typical tradeoff between throughput
>>> and latency. By decreasing the checkpoint interval in Flink, you can reach
>>> latency comparable to Kafka Streams.
>>>
>>> If you have two exactly-once jobs, the second job may only read data that
>>> has been committed (no dirty reads, as Chesnay said). If the second job were
>>> to consume uncommitted data, it would produce duplicates whenever the first
>>> job fails and rolls back.
>>>
>>> You can configure the read behaviour with isolation.level. If you want
>>> exactly-once behaviour end to end, you also need to set that level in your
>>> other Kafka consumers [1]. Also compare what Kafka Streams sets if you want
>>> to go exactly-once [2].
>>>
>>> If you really want low latency, please also double-check whether you
>>> really need exactly-once.
>>>
>>> [1]
>>> https://kafka.apache.org/documentation/#consumerconfigs_isolation.level
>>> [2]
>>> https://kafka.apache.org/documentation/streams/developer-guide/config-streams.html#processing-guarantee
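>>>
>>> For example, a sketch of shortening the checkpoint interval while keeping
>>> exactly-once mode (the interval value is just an example):
>>>
>>> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>> // checkpoint every second: shorter intervals mean lower end-to-end latency for
>>> // downstream read_committed consumers, at the cost of more checkpointing overhead
>>> env.enableCheckpointing(1_000L, CheckpointingMode.EXACTLY_ONCE);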
>>>
>>> On Mon, Dec 28, 2020 at 12:22 PM Chesnay Schepler 
>>> wrote:
>>>
>>>> I don't know our Kafka connector particularly well, but it sounds like a
>>>> matter of whether a given consumer does dirty reads.
>>>> Flink does not, whereas the other tools you're using do.
>>>>
>>>> On 12/28/2020 7:57 AM, Daniel Peled wrote:
>>>>
>>>> Hello,
>>>>
>>>> We have 2 flink jobs that communicate with each other through a KAFKA
>>>> topic.
>>>> Both jobs use checkpoints with EXACTLY ONCE semantic.
>>>>
>>>> We have seen the following behaviour and want to ask whether this is the
>>>> expected behaviour or a bug.
>>>>
>>>> When the first job produces a message to KAFKA, the message is consumed
>>>> by the second job with a latency that depends on the *first* job's
>>>> *checkpoint interval*.
>>>>
>>>> We are able to read the message using the kafka tool or using another
>>>> kafka consumer, but NOT with a Flink Kafka consumer, which again depends on
>>>> the checkpoint interval of the first job.
>>>>
>>>> How come the consumer of the second job depends on the producer
>>>> transaction commit time of the first job?
>>>>
>>>> BR,
>>>> Danny
>>>>
>>>>
>>>>
>>>
>>> --
>>>
>>> Arvid Heise | Senior Java Developer
>>>
>>> <https://www.ververica.com/>
>>>
>>> Follow us @VervericaData
>>>
>>> --
>>>
>>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
>>> Conference
>>>
>>> Stream Processing | Event Driven | Real Time
>>>
>>> --
>>>
>>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>>
>>> --
>>> Ververica GmbH
>>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
>>> (Toni) Cheng
>>>
>>


Cannot access state from an empty taskmanager - using kubernetes

2021-01-28 Thread Daniel Peled
Hi,

We have followed the instructions under "Enabling Queryable State" with
Kubernetes in the following link:
https://ci.apache.org/projects/flink/flink-docs-stable/deployment/resource-providers/standalone/kubernetes.html#enabling-queryable-state
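
For reference, this is roughly how we enabled it (a sketch; the
flink-queryable-state-runtime jar from opt/ also has to be on the lib/ classpath
of the image, and the proxy port below is just the documented default):

queryable-state.enable: true
# optional, this is the default proxy port
queryable-state.proxy.ports: 9069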



When the replica count of the task-manager pods is 1 we get *no error*. But
when the replica count is greater than 1, for example 7, we get the following
error when trying to access Flink state (we think it might be related to JIRA
issue *FLINK-10225*, which has been abandoned):

java.util.concurrent.ExecutionException: java.lang.RuntimeException: Failed
request 1.
 Caused by:
org.apache.flink.queryablestate.exceptions.UnknownLocationException: Could
not retrieve location of state=stopJobValueState of
job=d5d14923157f5c3d3c4b2e1b7c02a942. Potential reasons are: i) the state
is not ready, or ii) the job does not exist.

at
org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.getKvStateLookupInfo(KvStateClientProxyHandler.java:247)
at
org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.getState(KvStateClientProxyHandler.java:164)
at
org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.executeActionAsync(KvStateClientProxyHandler.java:131)
at
org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.handleRequest(KvStateClientProxyHandler.java:121)
at
org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.handleRequest(KvStateClientProxyHandler.java:63)
at
org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:258)
at
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
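
For context, this is roughly how we look the state up from the client side (a
sketch; host, port, key and the Boolean value type are placeholders/assumptions
rather than our exact setup):

ValueState<Boolean> queryStopFlag(String key) throws Exception {
    QueryableStateClient client = new QueryableStateClient("state-proxy-host", 9069);
    ValueStateDescriptor<Boolean> descriptor =
            new ValueStateDescriptor<>("stopJobValueState", Types.BOOLEAN);
    // asks the proxy on a task manager to locate and read the registered state
    return client.getKvState(
                    JobID.fromHexString("d5d14923157f5c3d3c4b2e1b7c02a942"),
                    "stopJobValueState",
                    key,
                    Types.STRING,
                    descriptor)
            .get();
}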


BR,
Danny


Flink 1.14.2/3 - KafkaSink vs deprecated FlinkKafkaProducer

2022-01-31 Thread Daniel Peled
Hi everyone,

Has anyone encountered any problems with the new KafkaSink used in
Flink 1.14?

When running our jobs, the *sinks* of some of our jobs are stuck in
INITIALIZING for more than an hour.
The only thing that helps is deleting the topic *__transaction_state*.
After deleting this topic, all sinks are immediately released and move to
RUNNING status.
The problem is quite random and each time appears in a different job.
There are times when all jobs start running without any problems.
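
For reference, the sink is built roughly like this (a sketch with EXACTLY_ONCE
delivery; the bootstrap servers, topic, serializer and transactional id prefix
below are placeholders, not our real values):

KafkaSink<String> sink =
        KafkaSink.<String>builder()
                .setBootstrapServers("kafka:9092")
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.builder()
                                .setTopic("output-topic")
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .build())
                // exactly-once makes the sink open a Kafka transaction per checkpoint
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .setTransactionalIdPrefix("our-job-sink")
                .build();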

Unfortunately we had to go back to the deprecated FlinkKafkaProducer.

*We didn't have these problems with Flink 1.13 and FlinkKafkaProducer*

Any ideas on what to do?
What are we doing wrong?

BR,
Daniel


Entering a busy loop when adding a new sink to the graph + checkpoint enabled

2024-02-04 Thread Daniel Peled
Hello,

We have noticed that when we add a *new Kafka sink* operator to the graph *and
start from the last savepoint*, the operator is 100% busy for several minutes
and *sometimes even half an hour to an hour*!

The problematic code seems to be the following for-loop in the
getTransactionalProducer() method:

*org.apache.flink.connector.kafka.sink.KafkaWriter#getTransactionalProducer*

private FlinkKafkaInternalProducer<byte[], byte[]> getTransactionalProducer(long checkpointId) {
    checkState(
            checkpointId > lastCheckpointId,
            "Expected %s > %s",
            checkpointId,
            lastCheckpointId);
    FlinkKafkaInternalProducer<byte[], byte[]> producer = null;
    // in case checkpoints have been aborted, Flink would create non-consecutive transaction ids
    // this loop ensures that all gaps are filled with initialized (empty) transactions
    for (long id = lastCheckpointId + 1; id <= checkpointId; id++) {
        String transactionalId =
                TransactionalIdFactory.buildTransactionalId(
                        transactionalIdPrefix, kafkaSinkContext.getParallelInstanceId(), id);
        producer = getOrCreateTransactionalProducer(transactionalId);
    }
    this.lastCheckpointId = checkpointId;
    assert producer != null;
    LOG.info("Created new transactional producer {}", producer.getTransactionalId());
    return producer;
}


Since we added a new sink operator, its lastCheckpointId starts at 1, while the
checkpointId passed in comes from the job's checkpoint counter, which after
restoring from the savepoint can already be in the thousands. The loop is then
executed once for every checkpoint id in that gap, creating and initializing a
transactional producer each time!
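
To illustrate the cost (the numbers below are hypothetical, only to show the
arithmetic of the gap-filling loop):

// hypothetical numbers for a newly added sink restored into a long-running job
long lastCheckpointId = 1L;           // initial value for the new sink operator
long restoredCheckpointId = 20_000L;  // the job's next checkpoint id after the savepoint
// each id in the gap gets its own transactional producer, i.e. a broker round trip
long producersToInitialize = restoredCheckpointId - lastCheckpointId;
System.out.println(producersToInitialize + " producers to create and initialize");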


We have several questions:
1. Is this behaviour expected?
2. Are we doing something wrong?
3. Is there a way to avoid this behaviour?

Best Regards,
Danny


Re: Entering a busy loop when adding a new sink to the graph + checkpoint enabled

2024-02-07 Thread Daniel Peled
Good morning,

Any updates/progress on this issue?

BR,
Danny

On Sun, Feb 4, 2024 at 13:20, Daniel Peled <daniel.peled.w...@gmail.com> wrote:

> Hello,
>
> We have noticed that when we add a *new Kafka sink* operator to the graph
> *and start from the last savepoint*, the operator is 100% busy for several
> minutes and *sometimes even half an hour to an hour*!
>
> The problematic code seems to be the following for-loop in the
> getTransactionalProducer() method:
>
>
> *org.apache.flink.connector.kafka.sink.KafkaWriter#getTransactionalProducer*
>
> private FlinkKafkaInternalProducer<byte[], byte[]> getTransactionalProducer(long checkpointId) {
>     checkState(
>             checkpointId > lastCheckpointId,
>             "Expected %s > %s",
>             checkpointId,
>             lastCheckpointId);
>     FlinkKafkaInternalProducer<byte[], byte[]> producer = null;
>     // in case checkpoints have been aborted, Flink would create non-consecutive transaction ids
>     // this loop ensures that all gaps are filled with initialized (empty) transactions
>     for (long id = lastCheckpointId + 1; id <= checkpointId; id++) {
>         String transactionalId =
>                 TransactionalIdFactory.buildTransactionalId(
>                         transactionalIdPrefix, kafkaSinkContext.getParallelInstanceId(), id);
>         producer = getOrCreateTransactionalProducer(transactionalId);
>     }
>     this.lastCheckpointId = checkpointId;
>     assert producer != null;
>     LOG.info("Created new transactional producer {}", producer.getTransactionalId());
>     return producer;
> }
>
>
> Since we added a new sink operator, its lastCheckpointId starts at 1, while
> the checkpointId passed in comes from the job's checkpoint counter, which
> after restoring from the savepoint can already be in the thousands. The loop
> is then executed once for every checkpoint id in that gap, creating and
> initializing a transactional producer each time!
>
>
> We have several questions:
> 1. Is this behaviour expected?
> 2. Are we doing something wrong?
> 3. Is there a way to avoid this behaviour?
>
> Best Regards,
> Danny
>


Entering a busy loop when adding a new sink to the graph + checkpoint enabled

2024-02-20 Thread Daniel Peled
Hello Guys,

Can someone please assist us with the following issue?

We have noticed that when we add a *new Kafka sink* operator to the graph *and
start from the last savepoint*, the operator is 100% busy for several minutes
and *sometimes even half an hour to an hour*!

The problematic code seems to be the following for-loop in the
getTransactionalProducer() method:

*org.apache.flink.connector.kafka.sink.KafkaWriter#getTransactionalProducer*

private FlinkKafkaInternalProducer<byte[], byte[]> getTransactionalProducer(long checkpointId) {
    checkState(
            checkpointId > lastCheckpointId,
            "Expected %s > %s",
            checkpointId,
            lastCheckpointId);
    FlinkKafkaInternalProducer<byte[], byte[]> producer = null;
    // in case checkpoints have been aborted, Flink would create non-consecutive transaction ids
    // this loop ensures that all gaps are filled with initialized (empty) transactions
    for (long id = lastCheckpointId + 1; id <= checkpointId; id++) {
        String transactionalId =
                TransactionalIdFactory.buildTransactionalId(
                        transactionalIdPrefix, kafkaSinkContext.getParallelInstanceId(), id);
        producer = getOrCreateTransactionalProducer(transactionalId);
    }
    this.lastCheckpointId = checkpointId;
    assert producer != null;
    LOG.info("Created new transactional producer {}", producer.getTransactionalId());
    return producer;
}


Since we added a new sink operator, its lastCheckpointId starts at 1, while the
checkpointId passed in comes from the job's checkpoint counter, which after
restoring from the savepoint can already be in the thousands. The loop is then
executed once for every checkpoint id in that gap, creating and initializing a
transactional producer each time!


We have several questions:
1. Is this behaviour expected?
2. Are we doing something wrong?
3. Is there a way to avoid this behaviour?

Best Regards,
Danny