CoProcessFunction processElement1 and processElement2
Hi, Can someone confirm that the processElement1 and processElement2 are run sequentially? not in paradelle. Thank you very much in advance. Regards, Min E-mails can involve SUBSTANTIAL RISKS, e.g. lack of confidentiality, potential manipulation of contents and/or sender's address, incorrect recipient (misdirection), viruses etc. Based on previous e-mail correspondence with you and/or an agreement reached with you, UBS considers itself authorized to contact you via e-mail. UBS assumes no responsibility for any loss or damage resulting from the use of e-mails. The recipient is aware of and accepts the inherent risks of using e-mails, in particular the risk that the banking relationship and confidential information relating thereto are disclosed to third parties. UBS reserves the right to retain and monitor all messages. Messages are protected and accessed only in legally justified cases. For information on how UBS uses and discloses personal data, how long we retain it, how we keep it secure and your data protection rights, please see our Privacy Notice http://www.ubs.com/privacy-statement
latency related to the checkpointing mode EXACTLY ONCE
Hi, We use the checkpointing mode EXACTLY ONCE for some of our flink jobs. I wonder how the checkpoint configurations specially its checkpoint interval are related to the end to end latency. We need to setup read_commit true for the kafak consumers. Does this lead a latency from one flink job is greater than that of checkpoint interval? Thank you very much for your help in advance. Min E-mails can involve SUBSTANTIAL RISKS, e.g. lack of confidentiality, potential manipulation of contents and/or sender's address, incorrect recipient (misdirection), viruses etc. Based on previous e-mail correspondence with you and/or an agreement reached with you, UBS considers itself authorized to contact you via e-mail. UBS assumes no responsibility for any loss or damage resulting from the use of e-mails. The recipient is aware of and accepts the inherent risks of using e-mails, in particular the risk that the banking relationship and confidential information relating thereto are disclosed to third parties. UBS reserves the right to retain and monitor all messages. Messages are protected and accessed only in legally justified cases. For information on how UBS uses and discloses personal data, how long we retain it, how we keep it secure and your data protection rights, please see our Privacy Notice http://www.ubs.com/privacy-statement
RE: latency related to the checkpointing mode EXACTLY ONCE
Many thanks for your quick response. The config read_commit for the kafka consumers is required by the exactly once (EOS)? No exactly once if we read un committed messages? Regards, Min From: Chesnay Schepler Sent: Thursday, February 18, 2021 8:27 PM To: Tan, Min ; user Subject: [External] Re: latency related to the checkpointing mode EXACTLY ONCE Yes, if you are only reading committed data than it will take least the checkpoint interval for the data to be available to downstream consumers. On 2/18/2021 6:17 PM, Tan, Min wrote: Hi, We use the checkpointing mode EXACTLY ONCE for some of our flink jobs. I wonder how the checkpoint configurations specially its checkpoint interval are related to the end to end latency. We need to setup read_commit true for the kafak consumers. Does this lead a latency from one flink job is greater than that of checkpoint interval? Thank you very much for your help in advance. Min E-mails can involve SUBSTANTIAL RISKS, e.g. lack of confidentiality, potential manipulation of contents and/or sender's address, incorrect recipient (misdirection), viruses etc. Based on previous e-mail correspondence with you and/or an agreement reached with you, UBS considers itself authorized to contact you via e-mail. UBS assumes no responsibility for any loss or damage resulting from the use of e-mails. The recipient is aware of and accepts the inherent risks of using e-mails, in particular the risk that the banking relationship and confidential information relating thereto are disclosed to third parties. UBS reserves the right to retain and monitor all messages. Messages are protected and accessed only in legally justified cases. For information on how UBS uses and discloses personal data, how long we retain it, how we keep it secure and your data protection rights, please see our Privacy Notice http://www.ubs.com/privacy-statement
Reading Flink states from svaepoint uning State Processor API
Hi, I am using Flink 1.10.1 and try to read the flink states from a savepoint using Flink state processor API. It works well when state types are the normal Java type or Java POJOs. When Avro generated Java classes are used as the state type, it does not read any states anymore. Are any additional customer serializers required in this situation? Regards, Min E-mails can involve SUBSTANTIAL RISKS, e.g. lack of confidentiality, potential manipulation of contents and/or sender's address, incorrect recipient (misdirection), viruses etc. Based on previous e-mail correspondence with you and/or an agreement reached with you, UBS considers itself authorized to contact you via e-mail. UBS assumes no responsibility for any loss or damage resulting from the use of e-mails. The recipient is aware of and accepts the inherent risks of using e-mails, in particular the risk that the banking relationship and confidential information relating thereto are disclosed to third parties. UBS reserves the right to retain and monitor all messages. Messages are protected and accessed only in legally justified cases. For information on how UBS uses and discloses personal data, how long we retain it, how we keep it secure and your data protection rights, please see our Privacy Notice http://www.ubs.com/privacy-statement
Write Kafka message header using FlinkKafkaProducer
Hi, I would like to add some meta data in the headers of kafka messages using FlinkKakfkaProducer. I googled a bit and have not found an example. Which Flink Serialization Schema should I use? Any suggestions? Thank you very much for your help in advance. Regards, Min E-mails can involve SUBSTANTIAL RISKS, e.g. lack of confidentiality, potential manipulation of contents and/or sender's address, incorrect recipient (misdirection), viruses etc. Based on previous e-mail correspondence with you and/or an agreement reached with you, UBS considers itself authorized to contact you via e-mail. UBS assumes no responsibility for any loss or damage resulting from the use of e-mails. The recipient is aware of and accepts the inherent risks of using e-mails, in particular the risk that the banking relationship and confidential information relating thereto are disclosed to third parties. UBS reserves the right to retain and monitor all messages. Messages are protected and accessed only in legally justified cases. For information on how UBS uses and discloses personal data, how long we retain it, how we keep it secure and your data protection rights, please see our Privacy Notice http://www.ubs.com/privacy-statement
RE: State Processor API and existing state
Hi, I have followed the steps below in restarting a Flink job with newly modified savepoints. I can re start a job with new savepoints as long as the Flink states are expressed in Java primitives. When the flink states are expressed in a POJO, my job does not get restarted. I have the following exceptions. Any ideas? Do I need to redefine any serializers? Thank you very much for your help in advance. Regards, Min -Flink exceptions--- 2021-07-07 19:02:58 java.lang.Exception: Exception while creating StreamOperatorStateContext. at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255) at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:989) at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:453) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94) at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:448) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for LegacyKeyedCoProcessOperator_086b0e2b116638fe57d2d20eb2517b22_(1/1) from any of the 1 provided restore options. at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131) ... 9 more Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore heap backend at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:116) at org.apache.flink.runtime.state.filesystem.FsStateBackend.createKeyedStateBackend(FsStateBackend.java:529) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288) at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142) at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121) ... 11 more Caused by: java.lang.IllegalStateException: Missing value for the key 'org.apache.flink.state.api.output.TaggedOperatorSubtaskState' at org.apache.flink.util.LinkedOptionalMap.unwrapOptionals(LinkedOptionalMap.java:190) at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializerSnapshot.restoreSerializer(KryoSerializerSnapshot.java:86) at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948) at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546) at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260) at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505) at org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate.snapshotsToRestoreSerializers(NestedSerializersSnapshotDelegate.java:225) at org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate.getRestoredNestedSerializers(NestedSerializersSnapshotDelegate.java:83) at org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot.restoreSerializer(CompositeTypeSerializerSnapshot.java:194) at org.apache.flink.runtime.state.StateSerializerProvider.previousSchemaSerializer(StateSerializerProvider.java:189) at org.apache.flink.runtime.state.StateSerializerProvider.currentSchemaSerializer(StateSerializerProvider.java:164) at org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo.getStateSerializer(RegisteredKeyValueStateBackendMetaInfo.java:136) at org.apache.flink.runtime.state.heap.StateTable.getStateSerializer(StateTable.java:315) at org.apache.flink.runtime.state.heap.CopyOnWriteSt
RE: State Processor API and existing state
Many thanks for your prompt reply. Yes. They are the same operators. What I did is just modifying the content of POJO .e.g., doubling amount fields. I am not able to send the production code, but I will do a separate mocked project to reproduce the issue. Send you the mocked code later. Regards, Min From: JING ZHANG Sent: 08 July 2021 04:45 To: Tan, Min Cc: Marco Villalobos ; user Subject: [External] Re: State Processor API and existing state Hi min, Is the POJO state in an existed operator or a new added operator? BTW, that would be great if you would like to give the code to reproduce the exception. I need more debug to find out the reason based on the code. Tan, Min mailto:min@ubs.com>> 于2021年7月8日周四 上午2:56写道: Hi, I have followed the steps below in restarting a Flink job with newly modified savepoints. I can re start a job with new savepoints as long as the Flink states are expressed in Java primitives. When the flink states are expressed in a POJO, my job does not get restarted. I have the following exceptions. Any ideas? Do I need to redefine any serializers? Thank you very much for your help in advance. Regards, Min -Flink exceptions--- 2021-07-07 19:02:58 java.lang.Exception: Exception while creating StreamOperatorStateContext. at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255) at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:989) at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:453) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94) at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:448) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for LegacyKeyedCoProcessOperator_086b0e2b116638fe57d2d20eb2517b22_(1/1) from any of the 1 provided restore options. at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131) ... 9 more Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore heap backend at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:116) at org.apache.flink.runtime.state.filesystem.FsStateBackend.createKeyedStateBackend(FsStateBackend.java:529) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288) at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142) at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121) ... 11 more Caused by: java.lang.IllegalStateException: Missing value for the key 'org.apache.flink.state.api.output.TaggedOperatorSubtaskState' at org.apache.flink.util.LinkedOptionalMap.unwrapOptionals(LinkedOptionalMap.java:190) at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializerSnapshot.restoreSerializer(KryoSerializerSnapshot.java:86) at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948) at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546) at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260) at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505) at org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate.snapshotsToRestoreSerializers(NestedSerializersSnapshotDelegate.