CoProcessFunction processElement1 and processElement2

2020-10-31 Thread Tan, Min
Hi,

Can someone confirm that the processElement1 and processElement2 are run 
sequentially?  not  in paradelle.

Thank you very much in advance.

Regards,

Min

E-mails can involve SUBSTANTIAL RISKS, e.g. lack of confidentiality, potential 
manipulation of contents and/or sender's address, incorrect recipient 
(misdirection), viruses etc. Based on previous e-mail correspondence with you 
and/or an agreement reached with you, UBS considers itself authorized to 
contact you via e-mail. UBS assumes no responsibility for any loss or damage 
resulting from the use of e-mails. 
The recipient is aware of and accepts the inherent risks of using e-mails, in 
particular the risk that the banking relationship and confidential information 
relating thereto are disclosed to third parties.
UBS reserves the right to retain and monitor all messages. Messages are 
protected and accessed only in legally justified cases.
For information on how UBS uses and discloses personal data, how long we retain 
it, how we keep it secure and your data protection rights, please see our 
Privacy Notice http://www.ubs.com/privacy-statement

latency related to the checkpointing mode EXACTLY ONCE

2021-02-18 Thread Tan, Min
Hi,

We use the checkpointing mode EXACTLY ONCE for some of our flink jobs.

I wonder how the checkpoint configurations specially its checkpoint interval 
are related to the end to end latency.

We need to setup read_commit true for the kafak consumers.

Does this lead a latency from one flink job is greater than that of checkpoint 
interval?

Thank you very much for your help in advance.

Min

E-mails can involve SUBSTANTIAL RISKS, e.g. lack of confidentiality, potential 
manipulation of contents and/or sender's address, incorrect recipient 
(misdirection), viruses etc. Based on previous e-mail correspondence with you 
and/or an agreement reached with you, UBS considers itself authorized to 
contact you via e-mail. UBS assumes no responsibility for any loss or damage 
resulting from the use of e-mails. 
The recipient is aware of and accepts the inherent risks of using e-mails, in 
particular the risk that the banking relationship and confidential information 
relating thereto are disclosed to third parties.
UBS reserves the right to retain and monitor all messages. Messages are 
protected and accessed only in legally justified cases.
For information on how UBS uses and discloses personal data, how long we retain 
it, how we keep it secure and your data protection rights, please see our 
Privacy Notice http://www.ubs.com/privacy-statement

RE: latency related to the checkpointing mode EXACTLY ONCE

2021-02-19 Thread Tan, Min
Many thanks for your quick response.

The config read_commit for the kafka consumers is required by the exactly once 
(EOS)?
No exactly once if we read un committed messages?

Regards,
Min

From: Chesnay Schepler 
Sent: Thursday, February 18, 2021 8:27 PM
To: Tan, Min ; user 
Subject: [External] Re: latency related to the checkpointing mode EXACTLY ONCE

Yes, if you are only reading committed data than it will take least the 
checkpoint interval for the data to be available to downstream consumers.

On 2/18/2021 6:17 PM, Tan, Min wrote:
Hi,

We use the checkpointing mode EXACTLY ONCE for some of our flink jobs.

I wonder how the checkpoint configurations specially its checkpoint interval 
are related to the end to end latency.

We need to setup read_commit true for the kafak consumers.

Does this lead a latency from one flink job is greater than that of checkpoint 
interval?

Thank you very much for your help in advance.

Min



E-mails can involve SUBSTANTIAL RISKS, e.g. lack of confidentiality, potential 
manipulation of contents and/or sender's address, incorrect recipient 
(misdirection), viruses etc. Based on previous e-mail correspondence with you 
and/or an agreement reached with you, UBS considers itself authorized to 
contact you via e-mail. UBS assumes no responsibility for any loss or damage 
resulting from the use of e-mails. 
The recipient is aware of and accepts the inherent risks of using e-mails, in 
particular the risk that the banking relationship and confidential information 
relating thereto are disclosed to third parties.
UBS reserves the right to retain and monitor all messages. Messages are 
protected and accessed only in legally justified cases.
For information on how UBS uses and discloses personal data, how long we retain 
it, how we keep it secure and your data protection rights, please see our 
Privacy Notice http://www.ubs.com/privacy-statement

Reading Flink states from svaepoint uning State Processor API

2021-05-31 Thread Tan, Min
Hi,

I am using Flink 1.10.1 and try to read the flink states from a savepoint using 
Flink state processor API.
It works well when state types are the normal Java type or Java POJOs.

When Avro generated Java classes are used as the state type, it does not read 
any states anymore.

Are any additional customer serializers required in this situation?

Regards,
Min



E-mails can involve SUBSTANTIAL RISKS, e.g. lack of confidentiality, potential 
manipulation of contents and/or sender's address, incorrect recipient 
(misdirection), viruses etc. Based on previous e-mail correspondence with you 
and/or an agreement reached with you, UBS considers itself authorized to 
contact you via e-mail. UBS assumes no responsibility for any loss or damage 
resulting from the use of e-mails. 
The recipient is aware of and accepts the inherent risks of using e-mails, in 
particular the risk that the banking relationship and confidential information 
relating thereto are disclosed to third parties.
UBS reserves the right to retain and monitor all messages. Messages are 
protected and accessed only in legally justified cases.
For information on how UBS uses and discloses personal data, how long we retain 
it, how we keep it secure and your data protection rights, please see our 
Privacy Notice http://www.ubs.com/privacy-statement

Write Kafka message header using FlinkKafkaProducer

2021-06-21 Thread Tan, Min
Hi,

I would like to add some meta data in the headers of kafka messages using 
FlinkKakfkaProducer.
I googled a bit and have not found an example.
Which Flink Serialization Schema should I use? Any suggestions?

Thank you very much for your help in advance.

Regards,
Min


E-mails can involve SUBSTANTIAL RISKS, e.g. lack of confidentiality, potential 
manipulation of contents and/or sender's address, incorrect recipient 
(misdirection), viruses etc. Based on previous e-mail correspondence with you 
and/or an agreement reached with you, UBS considers itself authorized to 
contact you via e-mail. UBS assumes no responsibility for any loss or damage 
resulting from the use of e-mails. 
The recipient is aware of and accepts the inherent risks of using e-mails, in 
particular the risk that the banking relationship and confidential information 
relating thereto are disclosed to third parties.
UBS reserves the right to retain and monitor all messages. Messages are 
protected and accessed only in legally justified cases.
For information on how UBS uses and discloses personal data, how long we retain 
it, how we keep it secure and your data protection rights, please see our 
Privacy Notice http://www.ubs.com/privacy-statement

RE: State Processor API and existing state

2021-07-07 Thread Tan, Min
Hi,

I have followed the steps below in restarting a Flink job with newly modified 
savepoints.

I can re start a job with new savepoints as long as the Flink states are 
expressed in Java primitives.
When the flink states are expressed in a POJO, my job does not get restarted. I 
have the following exceptions.

Any ideas? Do I need to redefine any serializers?

Thank you very much for your help in advance.
Regards,

Min


-Flink
 exceptions---
2021-07-07 19:02:58
java.lang.Exception: Exception while creating StreamOperatorStateContext.
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:989)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:453)
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:448)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state 
backend for LegacyKeyedCoProcessOperator_086b0e2b116638fe57d2d20eb2517b22_(1/1) 
from any of the 1 provided restore options.
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when 
trying to restore heap backend
at 
org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:116)
at 
org.apache.flink.runtime.state.filesystem.FsStateBackend.createKeyedStateBackend(FsStateBackend.java:529)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
... 11 more
Caused by: java.lang.IllegalStateException: Missing value for the key 
'org.apache.flink.state.api.output.TaggedOperatorSubtaskState'
at 
org.apache.flink.util.LinkedOptionalMap.unwrapOptionals(LinkedOptionalMap.java:190)
at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializerSnapshot.restoreSerializer(KryoSerializerSnapshot.java:86)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at 
java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
at 
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
at 
java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
at 
org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate.snapshotsToRestoreSerializers(NestedSerializersSnapshotDelegate.java:225)
at 
org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate.getRestoredNestedSerializers(NestedSerializersSnapshotDelegate.java:83)
at 
org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot.restoreSerializer(CompositeTypeSerializerSnapshot.java:194)
at 
org.apache.flink.runtime.state.StateSerializerProvider.previousSchemaSerializer(StateSerializerProvider.java:189)
at 
org.apache.flink.runtime.state.StateSerializerProvider.currentSchemaSerializer(StateSerializerProvider.java:164)
at 
org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo.getStateSerializer(RegisteredKeyValueStateBackendMetaInfo.java:136)
at 
org.apache.flink.runtime.state.heap.StateTable.getStateSerializer(StateTable.java:315)
at 
org.apache.flink.runtime.state.heap.CopyOnWriteSt

RE: State Processor API and existing state

2021-07-08 Thread Tan, Min
Many thanks for your prompt reply.

Yes. They are the same operators.
What I did is just modifying the content of POJO .e.g., doubling amount fields.

I am not able to send the production code, but I will do a separate mocked 
project to reproduce the issue. Send you the mocked code later.

Regards,
Min

From: JING ZHANG 
Sent: 08 July 2021 04:45
To: Tan, Min 
Cc: Marco Villalobos ; user 
Subject: [External] Re: State Processor API and existing state

Hi min,
Is the POJO state in an existed operator or a new added operator?
BTW, that would be great if you would like to give the code to reproduce the 
exception. I need more debug to find out the reason based on the code.


Tan, Min mailto:min@ubs.com>> 于2021年7月8日周四 上午2:56写道:
Hi,

I have followed the steps below in restarting a Flink job with newly modified 
savepoints.

I can re start a job with new savepoints as long as the Flink states are 
expressed in Java primitives.
When the flink states are expressed in a POJO, my job does not get restarted. I 
have the following exceptions.

Any ideas? Do I need to redefine any serializers?

Thank you very much for your help in advance.
Regards,

Min


-Flink
 exceptions---
2021-07-07 19:02:58
java.lang.Exception: Exception while creating StreamOperatorStateContext.
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:989)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:453)
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:448)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state 
backend for LegacyKeyedCoProcessOperator_086b0e2b116638fe57d2d20eb2517b22_(1/1) 
from any of the 1 provided restore options.
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when 
trying to restore heap backend
at 
org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:116)
at 
org.apache.flink.runtime.state.filesystem.FsStateBackend.createKeyedStateBackend(FsStateBackend.java:529)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
... 11 more
Caused by: java.lang.IllegalStateException: Missing value for the key 
'org.apache.flink.state.api.output.TaggedOperatorSubtaskState'
at 
org.apache.flink.util.LinkedOptionalMap.unwrapOptionals(LinkedOptionalMap.java:190)
at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializerSnapshot.restoreSerializer(KryoSerializerSnapshot.java:86)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at 
java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
at 
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
at 
java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
at 
org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate.snapshotsToRestoreSerializers(NestedSerializersSnapshotDelegate.