Entering a busy loop when adding a new sink to the graph

2024-02-11 Thread nick toker
Hello,

We have noticed that when we add a *new Kafka sink* operator to the graph, *and
start from the last savepoint*, the operator is 100% busy for several
minutes, and sometimes *even 30-60 minutes*!

The problematic code seems to be the following for-loop in the
getTransactionalProducer() method:

*org.apache.flink.connector.kafka.sink.KafkaWriter#getTransactionalProducer*

private FlinkKafkaInternalProducer<byte[], byte[]> getTransactionalProducer(long checkpointId) {
    checkState(
            checkpointId > lastCheckpointId,
            "Expected %s > %s",
            checkpointId,
            lastCheckpointId);
    FlinkKafkaInternalProducer<byte[], byte[]> producer = null;
    // in case checkpoints have been aborted, Flink would create non-consecutive transaction ids
    // this loop ensures that all gaps are filled with initialized (empty) transactions
    for (long id = lastCheckpointId + 1; id <= checkpointId; id++) {
        String transactionalId =
                TransactionalIdFactory.buildTransactionalId(
                        transactionalIdPrefix, kafkaSinkContext.getParallelInstanceId(), id);
        producer = getOrCreateTransactionalProducer(transactionalId);
    }
    this.lastCheckpointId = checkpointId;
    assert producer != null;
    LOG.info("Created new transactional producer {}", producer.getTransactionalId());
    return producer;
}


Since we added a new sink operator, its lastCheckpointId is 1, while the
checkpointId restored from the savepoint can be much larger, so the loop is
executed once for every checkpoint id in between (checkpointId -
lastCheckpointId iterations)!


We have several questions:
1. Is this behaviour expected?
2. Are we doing something wrong?
3. Is there a way to avoid this behaviour?
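
For context, the new sink is configured roughly as follows (a simplified
sketch; the broker address, topic, transactional id prefix and String
serializer below are placeholders, not our production values):

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

KafkaSink<String> newSink = KafkaSink.<String>builder()
        .setBootstrapServers("broker:9092")                      // placeholder
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic("new-output-topic")                    // placeholder
                .setValueSerializationSchema(new SimpleStringSchema())
                .build())
        .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
        .setTransactionalIdPrefix("new-sink-prefix")             // unique per sink
        .build();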

Best Regards,
nick


Re: Re: failed when job graph change

2024-01-24 Thread nick toker
Hi,
I am adding a new sink that did not exist in the graph before:
1. stop with a savepoint
2. run the new graph with the new sink operator (from the savepoint)

In this case the job is stuck in INITIALIZING forever because it can't complete
the transaction (on the new operator's Kafka topic).

I don't understand how changing the uid would help, since it is already a new
operator with a new uid.
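
For reference, the new sink is attached roughly like this (a simplified
sketch; the stream variable, sink instance and uid string are illustrative,
not our real names):

// `events` is the existing upstream DataStream<String> and `newKafkaSink`
// the newly added KafkaSink<String>; both names are placeholders.
events.sinkTo(newKafkaSink)
        .uid("new-kafka-sink")   // a brand-new uid that never existed in the savepoint
        .name("new kafka sink");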

nick

On Wed, Jan 24, 2024 at 7:30 PM Feng Jin <jinfeng1...@gmail.com> wrote:

> Hi nick
>
> If you want to modify the sink operator , I think you can modify the uid
> of the operator to avoid restoring the state that does not belong to it.
>
>
> Best,
> Feng
>
>
> On Thu, Jan 25, 2024 at 1:19 AM nick toker 
> wrote:
>
>> hi
>>
>> i didn't found anything in the log
>> but i found that it happened when i add a new sink operator
>> and because i work with checkpoints  the flink can't finish the
>> transaction ( the new topic in kafka not part of the transaction before i
>> added the new sink operator)
>>
>> so i must cancel the job to make it work
>>
>> How can I solve this issue?
>>
>>
>> nick
>>
>> On Mon, Dec 4, 2023 at 10:27 AM Xuyang <xyzhong...@163.com> wrote:
>>
>>> Hi,
>>> Can you attach the log about the exception when job failed?
>>>
>>> --
>>> Best!
>>> Xuyang
>>>
>>>
>>> On 2023-12-04 15:56:04, "nick toker" wrote:
>>>
>>> Hi
>>>
>>> restart the job it's ok and i do that , but i must cancel the job and
>>> submit a new one and i dont want the data from the state
>>> forget to  mention that i use the parameter "-allowNonRestoredState"
>>>
>>>
>>> my steps:
>>> 1. stop the job with savepoint
>>> 2. run the updated job ( update job graph) from savepoint
>>>
>>> expect it to run
>>>
>>> currently the result is the the job fail
>>>
>>> nick
>>>
>>>
>>>
>>> On Mon, Dec 4, 2023 at 8:41 AM Xuyang <xyzhong...@163.com> wrote:
>>>
>>>> Hi, nick.
>>>>
>>>> > using savepoint i must cancel the job to be able run the new graph
>>>>
>>>> Do you mean that you need cancel and start the job using the new flink
>>>> job graph in 1.17.1,
>>>> and in the past, it was able to make the changes to the new operator
>>>> effective without restarting the job?
>>>>
>>>> I think in order for the new job graph to take effect, it is necessary
>>>> to restart the job.
>>>>
>>>> --
>>>> Best!
>>>> Xuyang
>>>>
>>>>
>>>> At 2023-12-03 21:49:23, "nick toker"  wrote:
>>>>
>>>> Hi
>>>>
>>>> when i add or remove an operator in the job graph , using savepoint i
>>>> must cancel the job to be able run the new graph
>>>>
>>>> e.g. by adding or removing operator (like new sink target)
>>>> it was working in the past
>>>> i using flink 1.17.1
>>>>
>>>> 1. is it a known bug? if so when planned to be fix
>>>>
>>>> 2. do i need to do something to make it work?
>>>>
>>>>
>>>> nick
>>>>
>>>>


Re: Re: failed when job graph change

2024-01-24 Thread nick toker
hi,

I didn't find anything in the log,
but I found that it happens when I add a new sink operator.
Because I work with checkpoints, Flink can't finish the transaction
(the new Kafka topic was not part of the transaction before I added the new
sink operator).

So I must cancel the job to make it work.

How can I solve this issue?


nick

On Mon, Dec 4, 2023 at 10:27 AM Xuyang <xyzhong...@163.com> wrote:

> Hi,
> Can you attach the log about the exception when job failed?
>
> --
> Best!
> Xuyang
>
>
> On 2023-12-04 15:56:04, "nick toker" wrote:
>
> Hi
>
> restart the job it's ok and i do that , but i must cancel the job and
> submit a new one and i dont want the data from the state
> forget to  mention that i use the parameter "-allowNonRestoredState"
>
>
> my steps:
> 1. stop the job with savepoint
> 2. run the updated job ( update job graph) from savepoint
>
> expect it to run
>
> currently the result is the the job fail
>
> nick
>
>
>
>> On Mon, Dec 4, 2023 at 8:41 AM Xuyang <xyzhong...@163.com> wrote:
>
>> Hi, nick.
>>
>> > using savepoint i must cancel the job to be able run the new graph
>>
>> Do you mean that you need cancel and start the job using the new flink
>> job graph in 1.17.1,
>> and in the past, it was able to make the changes to the new operator
>> effective without restarting the job?
>>
>> I think in order for the new job graph to take effect, it is necessary to
>> restart the job.
>>
>> --
>> Best!
>> Xuyang
>>
>>
>> At 2023-12-03 21:49:23, "nick toker"  wrote:
>>
>> Hi
>>
>> when i add or remove an operator in the job graph , using savepoint i
>> must cancel the job to be able run the new graph
>>
>> e.g. by adding or removing operator (like new sink target)
>> it was working in the past
>> i using flink 1.17.1
>>
>> 1. is it a known bug? if so when planned to be fix
>>
>> 2. do i need to do something to make it work?
>>
>>
>> nick
>>
>>


Re: failed when job graph change

2023-12-03 Thread nick toker
Hi

Restarting the job is fine and I do that, but then I must cancel the job and
submit a new one, and I don't want to lose the data from the state.
I forgot to mention that I use the parameter "-allowNonRestoredState".


My steps:
1. stop the job with a savepoint
2. run the updated job (updated job graph) from the savepoint

I expect it to run;

currently the result is that the job fails.

nick



On Mon, Dec 4, 2023 at 8:41 AM Xuyang <xyzhong...@163.com> wrote:

> Hi, nick.
>
> > using savepoint i must cancel the job to be able run the new graph
>
> Do you mean that you need cancel and start the job using the new flink job
> graph in 1.17.1,
> and in the past, it was able to make the changes to the new operator
> effective without restarting the job?
>
> I think in order for the new job graph to take effect, it is necessary to
> restart the job.
>
> --
> Best!
> Xuyang
>
>
> At 2023-12-03 21:49:23, "nick toker"  wrote:
>
> Hi
>
> when i add or remove an operator in the job graph , using savepoint i must
> cancel the job to be able run the new graph
>
> e.g. by adding or removing operator (like new sink target)
> it was working in the past
> i using flink 1.17.1
>
> 1. is it a known bug? if so when planned to be fix
>
> 2. do i need to do something to make it work?
>
>
> nick
>
>


failed when job graph change

2023-12-03 Thread nick toker
Hi

When I add or remove an operator in the job graph, using a savepoint I must
cancel the job to be able to run the new graph,

e.g. by adding or removing an operator (like a new sink target).
It was working in the past.
I am using Flink 1.17.1.

1. Is it a known bug? If so, when is it planned to be fixed?

2. Do I need to do something to make it work?


nick


kafka duplicate messages

2023-09-07 Thread nick toker
Hi

I am configured with exactly-once.
I see that the Flink producer sends duplicate messages (sometimes a few copies)

that are later consumed only once by the other application.

How can I avoid the duplications?
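
For reference, the duplicates on the topic are most likely records from Flink
transactions that were aborted and then retried; a downstream consumer that
reads with read_committed isolation (sketched below with the plain Kafka Java
client and placeholder settings) skips the aborted copies, which would explain
why the other application sees each message only once.

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");   // placeholder
props.put(ConsumerConfig.GROUP_ID_CONFIG, "downstream-app");         // placeholder
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// Only committed transactional records are delivered; aborted ones are filtered out.
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);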

regards,
nick


Re: kafka sink

2023-07-30 Thread nick toker
e{handleName=
> 'file:/opt/flink/shared/savepoints/savepoint-00-fcc29062a521/66a3e58e-0f8c-4c09-9fc3-04be0d3388dc',
> dataBytes=291}}]}, operatorStateFromStream=StateObjectCollection{[]},
> keyedStateFromBackend=StateObjectCollection{[]}, keyedStateFromStream=
> StateObjectCollection{[]}, inputChannelState=StateObjectCollection{[]},
> resultSubpartitionState=StateObjectCollection{[]}, stateSize=291,
> checkpointedSize=291} from job manager and local state alternatives []
> from local state store org.apache.flink.runtime.state.
> NoOpTaskLocalStateStoreImpl@5c241b41.
> 2023-07-24 08:41:30,382 DEBUG org.apache.flink.streaming.api.operators.
> BackendRestorerProcedure [] - Creating operator state backend for
> SinkWriterOperator_b2a2e541ae5a67fa81a2d8b387225849_(5/20) and restoring
> with state from alternative (1/1).
> 2023-07-24 08:41:30,382 DEBUG org.apache.flink.runtime.metrics.
> MetricRegistryImpl [] - Registering metric
> taskmanager.job.task.operator.numRecordsOutErrors.
> 2023-07-24 08:41:30,382 DEBUG org.apache.flink.runtime.metrics.
> MetricRegistryImpl [] - Registering metric
> taskmanager.job.task.operator.numRecordsOutErrors.
> 2023-07-24 08:41:30,383 DEBUG org.apache.flink.runtime.metrics.
> MetricRegistryImpl [] - Registering metric
> taskmanager.job.task.operator.numRecordsSendErrors.
> 2023-07-24 08:41:30,383 DEBUG org.apache.flink.runtime.metrics.
> MetricRegistryImpl [] - Registering metric
> taskmanager.job.task.operator.numRecordsOutErrors.
> 2023-07-24 08:41:30,383 DEBUG org.apache.flink.runtime.metrics.
> MetricRegistryImpl [] - Registering metric
> taskmanager.job.task.operator.numRecordsSendErrors.
> 2023-07-24 08:41:30,383 DEBUG org.apache.flink.runtime.metrics.
> MetricRegistryImpl [] - Registering metric
> taskmanager.job.task.operator.numRecordsSend.
> 2023-07-24 08:41:30,383 DEBUG org.apache.flink.runtime.metrics.
> MetricRegistryImpl [] - Registering metric
> taskmanager.job.task.operator.numRecordsSend.
> 2023-07-24 08:41:30,384 DEBUG org.apache.flink.runtime.metrics.
> MetricRegistryImpl [] - Registering metric
> taskmanager.job.task.operator.numBytesSend.
> 2023-07-24 08:41:30,384 DEBUG org.apache.flink.runtime.metrics.
> MetricRegistryImpl [] - Registering metric
> taskmanager.job.task.operator.numRecordsSendErrors.
> 2023-07-24 08:41:30,384 DEBUG org.apache.flink.runtime.metrics.
> MetricRegistryImpl [] - Registering metric
> taskmanager.job.task.operator.numRecordsSend.
> 2023-07-24 08:41:30,384 DEBUG org.apache.flink.runtime.metrics.
> MetricRegistryImpl [] - Registering metric
> taskmanager.job.task.operator.numBytesSend.
> 2023-07-24 08:41:30,385 ERROR org.apache.flink.connector.kafka.sink.
> KafkaCommitter [] - Unable to commit transaction
> (org.apache.flink.streaming.runtime.operators.sink.committables.
> CommitRequestImpl@47835f95) because it's in an invalid state. Most likely
> the transaction has been aborted for some reason. Please check the Kafka
> logs for more details.
> org.apache.kafka.common.errors.InvalidTxnStateException: The producer
> attempted a transactional operation in an invalid state.
> 2023-07-24 08:41:30,385 DEBUG org.apache.flink.runtime.metrics.
> MetricRegistryImpl [] - Registering metric
> taskmanager.job.task.operator.numBytesSend.
> 2023-07-24 08:41:30,385 DEBUG org.apache.flink.runtime.state.
> TaskStateManagerImpl [] - Operator b2a2e541ae5a67fa81a2d8b387225849 has
> remote state SubtaskState{operatorStateFromBackend=StateObjectCollection{[
> OperatorStateHandle{stateNameToPartitionOffsets={writer_raw_states=
> StateMetaInfo{offsets=[233], distributionMode=SPLIT_DISTRIBUTE}},
> delegateStateHandle=ByteStreamStateHandle{handleName=
> 'file:/opt/flink/shared/savepoints/savepoint-00-fcc29062a521/e4aa5b34-3b1f-4f6b-b9a0-6dacb4d5408b',
> dataBytes=291}}]}, operatorStateFromStream=StateObjectCollection{[]},
> keyedStateFromBackend=StateObjectCollection{[]}, keyedStateFromStream=
> StateObjectCollection{[]}, inputChannelState=StateObjectCollection{[]},
> resultSubpartitionState=StateObjectCollection{[]}, stateSize=291,
> checkpointedSize=291} from job manager and local state alternatives []
> from local state store org.apache.flink.runtime.state.
> NoOpTaskLocalStateStoreImpl@157a2949.
>
>
> On Mon, Jul 24, 2023 at 5:31 AM Shammon FY <zjur...@gmail.com> wrote:
>
>> Hi nick,
>>
>> Is there any error log? That may help to analyze the root cause.
>>
>> On Sun, Jul 23, 2023 at 9:53 PM nick toker 
>> wrote:
>>
>>> hello
>>>
>>>
>>> we replaced the deprecated Kafka producer with the Kafka sink,
>>> and from time to time when we submit a job it gets stuck for 5 minutes in
>>> INITIALIZING (on the sink operators).
>>> we verified that the transactional id prefix is unique
>>>
>>> it's not happened when we use kafka producer
>>>
>>> What can be the reason?
>>>
>>>


Re: kafka sink

2023-07-24 Thread nick toker
 org.apache.flink.runtime.state.NoOpTaskLocalStateStoreImpl
@5c241b41.
2023-07-24 08:41:30,382 DEBUG org.apache.flink.streaming.api.operators.
BackendRestorerProcedure [] - Creating operator state backend for
SinkWriterOperator_b2a2e541ae5a67fa81a2d8b387225849_(5/20) and restoring
with state from alternative (1/1).
2023-07-24 08:41:30,382 DEBUG org.apache.flink.runtime.metrics.
MetricRegistryImpl [] - Registering metric
taskmanager.job.task.operator.numRecordsOutErrors.
2023-07-24 08:41:30,382 DEBUG org.apache.flink.runtime.metrics.
MetricRegistryImpl [] - Registering metric
taskmanager.job.task.operator.numRecordsOutErrors.
2023-07-24 08:41:30,383 DEBUG org.apache.flink.runtime.metrics.
MetricRegistryImpl [] - Registering metric
taskmanager.job.task.operator.numRecordsSendErrors.
2023-07-24 08:41:30,383 DEBUG org.apache.flink.runtime.metrics.
MetricRegistryImpl [] - Registering metric
taskmanager.job.task.operator.numRecordsOutErrors.
2023-07-24 08:41:30,383 DEBUG org.apache.flink.runtime.metrics.
MetricRegistryImpl [] - Registering metric
taskmanager.job.task.operator.numRecordsSendErrors.
2023-07-24 08:41:30,383 DEBUG org.apache.flink.runtime.metrics.
MetricRegistryImpl [] - Registering metric
taskmanager.job.task.operator.numRecordsSend.
2023-07-24 08:41:30,383 DEBUG org.apache.flink.runtime.metrics.
MetricRegistryImpl [] - Registering metric
taskmanager.job.task.operator.numRecordsSend.
2023-07-24 08:41:30,384 DEBUG org.apache.flink.runtime.metrics.
MetricRegistryImpl [] - Registering metric
taskmanager.job.task.operator.numBytesSend.
2023-07-24 08:41:30,384 DEBUG org.apache.flink.runtime.metrics.
MetricRegistryImpl [] - Registering metric
taskmanager.job.task.operator.numRecordsSendErrors.
2023-07-24 08:41:30,384 DEBUG org.apache.flink.runtime.metrics.
MetricRegistryImpl [] - Registering metric
taskmanager.job.task.operator.numRecordsSend.
2023-07-24 08:41:30,384 DEBUG org.apache.flink.runtime.metrics.
MetricRegistryImpl [] - Registering metric
taskmanager.job.task.operator.numBytesSend.
2023-07-24 08:41:30,385 ERROR org.apache.flink.connector.kafka.sink.
KafkaCommitter [] - Unable to commit transaction
(org.apache.flink.streaming.runtime.operators.sink.committables.
CommitRequestImpl@47835f95) because it's in an invalid state. Most likely
the transaction has been aborted for some reason. Please check the Kafka
logs for more details.
org.apache.kafka.common.errors.InvalidTxnStateException: The producer
attempted a transactional operation in an invalid state.
2023-07-24 08:41:30,385 DEBUG org.apache.flink.runtime.metrics.
MetricRegistryImpl [] - Registering metric
taskmanager.job.task.operator.numBytesSend.
2023-07-24 08:41:30,385 DEBUG org.apache.flink.runtime.state.
TaskStateManagerImpl [] - Operator b2a2e541ae5a67fa81a2d8b387225849 has
remote state SubtaskState{operatorStateFromBackend=StateObjectCollection{[
OperatorStateHandle{stateNameToPartitionOffsets={writer_raw_states=
StateMetaInfo{offsets=[233], distributionMode=SPLIT_DISTRIBUTE}},
delegateStateHandle=ByteStreamStateHandle{handleName=
'file:/opt/flink/shared/savepoints/savepoint-00-fcc29062a521/e4aa5b34-3b1f-4f6b-b9a0-6dacb4d5408b',
dataBytes=291}}]}, operatorStateFromStream=StateObjectCollection{[]},
keyedStateFromBackend=StateObjectCollection{[]}, keyedStateFromStream=
StateObjectCollection{[]}, inputChannelState=StateObjectCollection{[]},
resultSubpartitionState=StateObjectCollection{[]}, stateSize=291,
checkpointedSize=291} from job manager and local state alternatives [] from
local state store org.apache.flink.runtime.state.NoOpTaskLocalStateStoreImpl
@157a2949.


On Mon, Jul 24, 2023 at 5:31 AM Shammon FY <zjur...@gmail.com> wrote:

> Hi nick,
>
> Is there any error log? That may help to analyze the root cause.
>
> On Sun, Jul 23, 2023 at 9:53 PM nick toker 
> wrote:
>
>> hello
>>
>>
>> we replaced the deprecated Kafka producer with the Kafka sink,
>> and from time to time when we submit a job it gets stuck for 5 minutes in
>> INITIALIZING (on the sink operators).
>> we verified that the transactional id prefix is unique
>>
>> it's not happened when we use kafka producer
>>
>> What can be the reason?
>>
>>


kafka sink

2023-07-23 Thread nick toker
hello


We replaced the deprecated Kafka producer with the Kafka sink,
and from time to time when we submit a job it gets stuck for 5 minutes in
INITIALIZING (on the sink operators).
We verified that the transactional id prefix is unique.

This does not happen when we use the Kafka producer.

What can be the reason?


Re: Re: checkpoint delay consume message

2020-12-26 Thread nick toker
Hi,
We think we are using the default values unless we are missing
something.
So this doesn't explain the problem we are facing.
Could you please tell us how to choose between synchronous and asynchronous
checkpoints, just to be sure we are using the correct configuration?
BR,
Nick
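
For anyone looking at the same question later: this is roughly where that
choice lives in our version (1.11.x); a minimal sketch, assuming the
filesystem state backend with a placeholder path. The RocksDB backend always
snapshots asynchronously.

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// second argument: true = asynchronous snapshots (the default), false = synchronous
env.setStateBackend(new FsStateBackend("file:///tmp/checkpoints", true));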

On Thu, Dec 24, 2020 at 3:36 AM lec ssmi <shicheng31...@gmail.com> wrote:

> Checkpoint can be done synchronously and  asynchronously,  the latter is
> the default .
> If you chooese  the synchronous way , it may cause this problem.
>
> On Wed, Dec 23, 2020 at 3:53 PM nick toker wrote:
>
>> Hi Yun,
>>
>> Sorry but we didn't understand your questions.
>> The delay we are experiencing is on the *read* side.
>> The message is written to kafka topic and consumed by flink with a delay
>> that depends on the checkpoints interval
>> When we disabled the checkpoints the messages are immediately consumed
>> We use the EXACTLY-ONCE semantic.
>>
>> Please advise.
>> BR,
>> Nick
>>
>> On Tue, Dec 22, 2020 at 9:32 AM Yun Gao <yungao...@aliyun.com> wrote:
>>
>>> Hi nick,
>>>
>>>Sorry I initially think that the data is also write into Kafka with
>>> flink . So it could be ensured that there is no delay in the write side,
>>> right ? Does the delay in the read side keeps existing ?
>>>
>>> Best,
>>>  Yun
>>>
>>>
>>>
>>> --Original Mail --
>>> *Sender:*nick toker 
>>> *Send Date:*Tue Dec 22 01:43:50 2020
>>> *Recipients:*Yun Gao 
>>> *CC:*user 
>>> *Subject:*Re: checkpoint delay consume message
>>>
>>>> hi
>>>>
>>>> i am confused
>>>>
>>>> the delay in in the source when reading message not on the sink
>>>>
>>>> nick
>>>>
>>>> On Mon, Dec 21, 2020 at 6:12 PM Yun Gao <yungao...@aliyun.com> wrote:
>>>>
>>>>>  Hi Nick,
>>>>>
>>>>> Are you using EXACTLY_ONCE semantics ? If so the sink would use
>>>>> transactions, and only commit the transaction on checkpoint complete to
>>>>> ensure end-to-end exactly-once. A detailed description could be find in 
>>>>> [1]
>>>>>
>>>>>
>>>>> Best,
>>>>>  Yun
>>>>>
>>>>>
>>>>> [1]
>>>>> https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html
>>>>>
>>>>> --
>>>>> Sender:nick toker
>>>>> Date:2020/12/21 23:52:34
>>>>> Recipient:user
>>>>> Theme:checkpoint delay consume message
>>>>>
>>>>> Hello,
>>>>>
>>>>> We noticed the following behavior:
>>>>> If we enable the flink checkpoints, we saw that there is a delay
>>>>> between the time we write a message to the KAFKA topic and the time the
>>>>> flink kafka connector consumes this message.
>>>>> The delay is closely related to checkpointInterval and/or
>>>>> minPauseBetweenCheckpoints meening that the MAX delay when consuming a
>>>>> message from KAFKA will be one of these parameters.
>>>>>
>>>>> Could you please advise how we can remove/control this delay?
>>>>>
>>>>> we use flink 1.11.2
>>>>>
>>>>> BR
>>>>> nick
>>>>>
>>>>>


Re: Long latency when consuming a message from KAFKA and checkpoint is enabled

2020-12-26 Thread nick toker
Hi

Any ideas?
Is it a bug?


regards,
nick

On Wed, Dec 23, 2020 at 11:10 AM nick toker <nick.toker@gmail.com> wrote:

> Hello
>
> We noticed the following behavior:
> If we enable the flink checkpoints, we saw that there is a delay between
> the time we write a message to the KAFKA topic and the time the flink kafka
> connector consumes this message.
> The delay is closely related to checkpointInterval and/or
> minPauseBetweenCheckpoints meening that the MAX delay when consuming a
> message from KAFKA will be one of these parameters
>
> If we disable the checkpoints, the message is immediately consumed
> We work with the EXACTLY_ONCE semantic
> Please note that we inject only one message
>
> Could you please advise how we can remove/control this delay?
>
> Please see the attached code of AbstractFetcher and KafkaFetcher (as a png
> file)
> (For example emitRecordsWithTimestamps() use a lock on checkpointLock).
> Could this explain the behaviour ?
>
>
> BR
>


Re: Re: checkpoint delay consume message

2020-12-22 Thread nick toker
Hi Yun,

Sorry, but we didn't understand your questions.
The delay we are experiencing is on the *read* side.
The message is written to the Kafka topic and consumed by Flink with a delay
that depends on the checkpoint interval.
When we disable the checkpoints, the messages are consumed immediately.
We use the EXACTLY_ONCE semantic.

Please advise.
BR,
Nick

On Tue, Dec 22, 2020 at 9:32 AM Yun Gao <yungao...@aliyun.com> wrote:

> Hi nick,
>
>Sorry I initially think that the data is also write into Kafka with
> flink . So it could be ensured that there is no delay in the write side,
> right ? Does the delay in the read side keeps existing ?
>
> Best,
>  Yun
>
>
>
> --Original Mail --
> *Sender:*nick toker 
> *Send Date:*Tue Dec 22 01:43:50 2020
> *Recipients:*Yun Gao 
> *CC:*user 
> *Subject:*Re: checkpoint delay consume message
>
>> hi
>>
>> i am confused
>>
>> the delay in in the source when reading message not on the sink
>>
>> nick
>>
>> On Mon, Dec 21, 2020 at 6:12 PM Yun Gao <yungao...@aliyun.com> wrote:
>>
>>>  Hi Nick,
>>>
>>> Are you using EXACTLY_ONCE semantics ? If so the sink would use
>>> transactions, and only commit the transaction on checkpoint complete to
>>> ensure end-to-end exactly-once. A detailed description could be find in [1]
>>>
>>>
>>> Best,
>>>  Yun
>>>
>>>
>>> [1]
>>> https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html
>>>
>>> --
>>> Sender:nick toker
>>> Date:2020/12/21 23:52:34
>>> Recipient:user
>>> Theme:checkpoint delay consume message
>>>
>>> Hello,
>>>
>>> We noticed the following behavior:
>>> If we enable the flink checkpoints, we saw that there is a delay between
>>> the time we write a message to the KAFKA topic and the time the flink kafka
>>> connector consumes this message.
>>> The delay is closely related to checkpointInterval and/or
>>> minPauseBetweenCheckpoints meening that the MAX delay when consuming a
>>> message from KAFKA will be one of these parameters.
>>>
>>> Could you please advise how we can remove/control this delay?
>>>
>>> we use flink 1.11.2
>>>
>>> BR
>>> nick
>>>
>>>


Re: checkpoint delay consume message

2020-12-21 Thread nick toker
hi,

I am confused.

The delay is in the source when reading the message, not on the sink.

nick

On Mon, Dec 21, 2020 at 6:12 PM Yun Gao <yungao...@aliyun.com> wrote:

>  Hi Nick,
>
> Are you using EXACTLY_ONCE semantics ? If so the sink would use
> transactions, and only commit the transaction on checkpoint complete to
> ensure end-to-end exactly-once. A detailed description could be find in [1]
>
>
> Best,
>  Yun
>
>
> [1]
> https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html
>
> --
> Sender:nick toker
> Date:2020/12/21 23:52:34
> Recipient:user
> Theme:checkpoint delay consume message
>
> Hello,
>
> We noticed the following behavior:
> If we enable the flink checkpoints, we saw that there is a delay between
> the time we write a message to the KAFKA topic and the time the flink kafka
> connector consumes this message.
> The delay is closely related to checkpointInterval and/or
> minPauseBetweenCheckpoints meening that the MAX delay when consuming a
> message from KAFKA will be one of these parameters.
>
> Could you please advise how we can remove/control this delay?
>
> we use flink 1.11.2
>
> BR
> nick
>
>


checkpoint delay consume message

2020-12-21 Thread nick toker
Hello,

We noticed the following behavior:
If we enable Flink checkpoints, there is a delay between
the time we write a message to the Kafka topic and the time the Flink Kafka
connector consumes this message.
The delay is closely related to checkpointInterval and/or
minPauseBetweenCheckpoints, meaning that the MAX delay when consuming a
message from Kafka will be one of these parameters.

Could you please advise how we can remove/control this delay?

We use Flink 1.11.2.

BR
nick
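
For reference, the two parameters mentioned above are set roughly like this
(a sketch; the interval and pause values are illustrative, not our production
settings):

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// With EXACTLY_ONCE, transactional output only becomes visible to read_committed
// consumers after the checkpoint completes, so the interval bounds the observed delay.
env.enableCheckpointing(10_000L, CheckpointingMode.EXACTLY_ONCE);   // checkpointInterval
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5_000L);    // minPauseBetweenCheckpoints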


Re: [ANNOUNCE] Apache Flink 1.12.0 released

2020-12-10 Thread nick toker
Hi,
First of all, good job and thank you!

I can't find the new version 1.12 on Docker Hub.

When will it be there?

nick

On Thu, Dec 10, 2020 at 2:17 PM Robert Metzger <rmetz...@apache.org> wrote:

> The Apache Flink community is very happy to announce the release of Apache
> Flink 1.12.0, which is the latest major release.
>
> Apache Flink® is an open-source stream processing framework for
> distributed, high-performing, always-available, and accurate data streaming
> applications.
>
> The release is available for download at:
> https://flink.apache.org/downloads.html
>
> Please check out the release blog post for an overview of the improvements
> for this bugfix release:
> https://flink.apache.org/news/2020/12/10/release-1.12.0.html
>
> The full release notes are available in Jira:
>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12348263
>
> We would like to thank all contributors of the Apache Flink community who
> made this release possible!
>
> Regards,
> Dian & Robert
>
>


Re: Improved performance when using incremental checkpoints

2020-06-16 Thread nick toker
Hi,

We used both Flink versions 1.9.1 and 1.10.1.
We used the default RocksDB configuration.
The streaming pipeline is very simple:

1. Kafka consumer
2. Process function
3. Kafka producer

The code of the process function is listed below:

// NOTE: String is only a placeholder for the actual key/value types.
private transient MapState<String, String> testMapState;

@Override
public void processElement(Map<String, String> value, Context ctx,
        Collector<Map<String, String>> out) throws Exception {

    if (testMapState.isEmpty()) {
        testMapState.putAll(value);
        out.collect(value);
        testMapState.clear();
    }
}

We used the same code with ValueState and observed the same results.
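
For completeness, checkpointing was switched on with the RocksDB backend
roughly like this (a sketch; the path and interval are placeholders, the
second constructor argument is what enables incremental checkpoints, and the
constructor declares IOException):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000L);   // illustrative interval
// true = incremental checkpoints; the constructor may throw IOException
env.setStateBackend(new RocksDBStateBackend("file:///tmp/checkpoints", true));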


BR,

Nick


On Tue, Jun 16, 2020 at 11:56 AM Yun Tang <myas...@live.com> wrote:

> Hi Nick
>
> It's really strange that performance could improve when checkpoint is
> enabled.
> In general, enable checkpoint might bring a bit performance downside to
> the whole job.
>
> Could you give more details e.g. Flink version, configurations of RocksDB
> and simple code which could reproduce this problem.
>
> Best
> Yun Tang
> ------
> *From:* nick toker 
> *Sent:* Tuesday, June 16, 2020 15:44
> *To:* user@flink.apache.org 
> *Subject:* Improved performance when using incremental checkpoints
>
> Hello,
>
> We are using RocksDB as the backend state.
> At first we didn't enable the checkpoints mechanism.
>
> We observed the following behaviour and we are wondering why ?
>
> When using the rocksDB *without* checkpoint the performance was very
> extremely bad.
> And when we enabled the checkpoint the performance was improved by a*
> factor of 10*.
>
> Could you please explain if this behaviour is expected ?
> Could you please explain why enabling the checkpoint significantly
> improves the performance ?
>
> BR,
> Nick
>


Re: MapState bad performance

2020-06-16 Thread nick toker
Hi,

We are using Flink version 1.10.1.
The task manager memory is 16 GB.
The number of slots is 32, but the job parallelism is 1.
We used the default configuration for RocksDB.
We checked the disk speed on the machine running the task manager: write
300 MB/s and read 1 GB/s.

BR,
Nick

On Tue, Jun 16, 2020 at 12:12 PM Yun Tang <myas...@live.com> wrote:

> Hi Nick
>
> As you might know, RocksDB suffers not so good performance for
> iterator-like operations due to it needs to merge sort for multi levels. [1]
>
> Unfortunately, rocksDBMapState.isEmpty() needs to call iterator and seek
> operations over rocksDB [2], and rocksDBMapState.clear() needs to iterator
> over state and remove entry [3].
> However, even these operations behaves not so good, I don't think they
> would behave extremely bad in general case. From our experience on SSD, the
> latency of seek should be less than 100us
> and could go up to hundreds of us, did you use SSD disk?
>
>1. What is the Flink version, taskmanager memory, number of slots and
>RocksDB related configurations?
>2. Have you checked the IOPS, disk util for those machines which
>containing task manager running RocksDB?
>
>
> [1] https://github.com/facebook/rocksdb/wiki/Iterator-Implementation
> [2]
> https://github.com/apache/flink/blob/efd497410ced3386b955a92b731a8e758223045f/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java#L241
> [3]
> https://github.com/apache/flink/blob/efd497410ced3386b955a92b731a8e758223045f/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java#L254
>
> Best
> Yun Tang
>
> --
> *From:* nick toker 
> *Sent:* Tuesday, June 16, 2020 15:35
> *To:* user@flink.apache.org 
> *Subject:* MapState bad performance
>
> Hello,
>
> We wrote a very simple streaming pipeline containing:
> 1. Kafka consumer
> 2. Process function
> 3. Kafka producer
>
> The code of the process function is listed below:
>
> private transient MapState testMapState;
>
> @Override
> public void processElement(Map value, Context ctx, 
> Collector> out) throws Exception {
>
> if (testMapState.isEmpty()) {
>
> testMapState.putAll(value);
>
> out.collect(value);
>
> testMapState.clear();
> }
> }
>
> We faced very bad performance and then we made some tests using jprofiler.
> Using jprofiler, we saw that the hot spots are 2 functions of the MapState:
> 1. isEmpty() - around 7 ms
> 2. clear() - around 4 ms
>
> We had to change and use ValueState instead.
>
> Are we using the MapState in the correct way or are we doing something
> wrong ?
> Is this behaviour expected because flink  recommendations are to use
> MapState and NOT ValueState ?
>
> BR,
> Nick
>


Improved performance when using incremental checkpoints

2020-06-16 Thread nick toker
Hello,

We are using RocksDB as the state backend.
At first we didn't enable the checkpointing mechanism.

We observed the following behaviour and we are wondering why:

When using RocksDB *without* checkpoints the performance was extremely bad,
and when we enabled checkpoints the performance improved by a *factor of 10*.

Could you please explain whether this behaviour is expected?
Could you please explain why enabling checkpoints significantly improves
the performance?

BR,
Nick


MapState bad performance

2020-06-16 Thread nick toker
Hello,

We wrote a very simple streaming pipeline containing:
1. Kafka consumer
2. Process function
3. Kafka producer

The code of the process function is listed below:

// NOTE: String is only a placeholder for the actual key/value types.
private transient MapState<String, String> testMapState;

@Override
public void processElement(Map<String, String> value, Context ctx,
        Collector<Map<String, String>> out) throws Exception {

    if (testMapState.isEmpty()) {
        testMapState.putAll(value);
        out.collect(value);
        testMapState.clear();
    }
}

We faced very bad performance, so we ran some tests with JProfiler.
Using JProfiler, we saw that the hot spots are two functions of the MapState:
1. isEmpty() - around 7 ms
2. clear() - around 4 ms

We had to change and use ValueState instead.
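
For reference, the ValueState variant looks roughly like this (a sketch; as
above, String stands in for our real key and value types):

// Same toy logic as above, but the whole map is kept in a single ValueState.
private transient ValueState<Map<String, String>> testValueState;

@Override
public void processElement(Map<String, String> value, Context ctx,
        Collector<Map<String, String>> out) throws Exception {

    if (testValueState.value() == null) {   // "empty" check: no map stored yet
        testValueState.update(value);       // store the incoming map
        out.collect(value);
        testValueState.clear();             // drop it again, as in the MapState version
    }
}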

Are we using the MapState in the correct way, or are we doing something
wrong?
Is this behaviour expected, given that the Flink recommendation is to use
MapState and NOT ValueState?

BR,
Nick


stuck job on failover

2019-11-26 Thread Nick Toker
Hi,
I have a standalone cluster with 3 nodes and the RocksDB backend.
When one task manager fails (the process is killed),
it takes a very long time until the job is fully cancelled and a new job is
resubmitted.
I see that all slots on all nodes are being cancelled except for the slots
of the dead task manager; it takes about 30-40 seconds for the job to fully
shut down.
Is there something I can do to reduce this time, or is there a plan for a fix
(if so, when)?

regards,
nick