Re: Updating Scala package names while preserving state
My conclusions. First, I think it would be good to clarify the background. The class for which I changed the package/namespace is a POJO class that is part of the application's state. The official Flink documentation on state evolution is quite clear on this point: the class name of a POJO type cannot change, including the namespace of the class.

I also investigated using the State Processor API to create a new savepoint with the updated namespace for the class, but that is not trivial, so I gave up on it since the state of the application in question was not considered that important. Handling a namespace change with the State Processor API would require both the moved and the original class to be on the Java classpath when running the state processor, to enable both deserializing the old state (old namespace) and serializing the new state (new namespace). So it could have been done that way, I guess.

I did not find any other option for migrating the state across a namespace/package name change. Performing a text replace with sed does not work.

On 7 Feb 2023, at 12:03, Thomas Eckestad <thomas.eckes...@niradynamics.se> wrote:

Hi,

I would like to change the package name of our Scala code from com.company.foo.something to com.company.bar.something while preserving the state. How can I make a savepoint from an application built with com.company.foo.something and make that savepoint compatible with new code built from com.company.bar.something?

In a savepoint directory from one of our Flink jobs, running egrep com\.company\.foo produces a lot of hits. Could it be expected to work to just replace the strings with sed? Or is there binary, non-text data as well that needs to be updated? We are currently on Flink 1.13.6.
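One reason a plain sed replace fails is worth spelling out. Savepoint metadata records class names through serialization machinery that writes strings with a byte-length prefix (as DataOutputStream.writeUTF does), and the default serialVersionUID of a Java-serialized class is itself derived from the class name. The sketch below is not the real savepoint format; it uses writeUTF as a stand-in, with made-up class names, and deliberately renames to a *longer* package so the stale length prefix becomes visible. Note that even a same-length rename such as foo to bar can break, because recorded serializer snapshots and serialVersionUIDs still reference the old name.

```java
import java.io.*;
import java.nio.charset.StandardCharsets;

public class SedOnSavepointDemo {
    // Serialize a class name the way DataOutputStream.writeUTF does:
    // a 2-byte length prefix followed by the string bytes.
    static byte[] writeName(String name) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        new DataOutputStream(bos).writeUTF(name);
        return bos.toByteArray();
    }

    // A sed-style byte-level substitution: rewrites the string bytes but
    // knows nothing about the length prefix sitting in front of them.
    static byte[] sedReplace(byte[] data, String from, String to) {
        String raw = new String(data, StandardCharsets.ISO_8859_1);
        return raw.replace(from, to).getBytes(StandardCharsets.ISO_8859_1);
    }

    static String readName(byte[] data) throws IOException {
        return new DataInputStream(new ByteArrayInputStream(data)).readUTF();
    }

    public static void main(String[] args) throws IOException {
        byte[] meta = writeName("com.company.foo.something.MyPojo");
        // The old length prefix (32) survives the rename, so the
        // read-back name is silently truncated to 32 characters:
        byte[] patched = sedReplace(meta, "com.company.foo", "com.company.barbaz");
        System.out.println(readName(patched)); // com.company.barbaz.something.MyP
    }
}
```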
Thanks, Thomas

Thomas Eckestad
Systems Engineer Development RSI
NIRA Dynamics AB
Wallenbergs gata 4
58330 Linköping, Sweden
Mobile: +46 701 447 279
thomas.eckes...@niradynamics.se
www.niradynamics.se
Updating Scala package names while preserving state
Hi,

I would like to change the package name of our Scala code from com.company.foo.something to com.company.bar.something while preserving the state. How can I make a savepoint from an application built with com.company.foo.something and make that savepoint compatible with new code built from com.company.bar.something?

In a savepoint directory from one of our Flink jobs, running egrep com\.company\.foo produces a lot of hits. Could it be expected to work to just replace the strings with sed? Or is there binary, non-text data as well that needs to be updated? We are currently on Flink 1.13.6.

Thanks, Thomas

Thomas Eckestad
Systems Engineer Development RSI
NIRA Dynamics AB
Wallenbergs gata 4
58330 Linköping, Sweden
Mobile: +46 701 447 279
thomas.eckes...@niradynamics.se
www.niradynamics.se
Re: Stop vs Cancel with savepoint
OK, thank you for validating my thoughts =) I created https://issues.apache.org/jira/browse/FLINK-21666

Thanks, Thomas

On 3 Mar 2021, at 22:02, Chesnay Schepler <ches...@apache.org> wrote:

Your understanding of cancel vs stop(-with-savepoint) is correct. I agree that we should update the REST API documentation and have a section outlining the problems with cancel-with-savepoint. Would you like to open a ticket yourself?

On 3/3/2021 11:16 AM, Thomas Eckestad wrote:

Hi!

Cancel with savepoint is marked as deprecated in the CLI documentation, but it is not marked as deprecated in the REST API documentation. Is that a mistake? At least some recommendation regarding stop vs cancel would be appropriate to include in the API docs.

As I understand it, stop cancels each operator in the job DAG gracefully, starting at the sources. Conceptually: first the sources are canceled, then, when the operators directly downstream of the sources have drained all pending input, those are canceled as well, and so on until the sinks are done. Or, maybe more to the point: the checkpoint barrier triggered for the savepoint will not be followed by any more input data; the sources stop consuming new data until the savepoint is complete and the job exits. Is the above understanding correct?

In that case, for some streaming jobs without exactly-once sinks, cancel with savepoint might produce duplicates. That should be OK, of course, since the job needs to handle a restart anyway, but it might be beneficial not to generate duplicated output for this specific use case if there is a choice where the alternatives have the same implementation cost...

Is my understanding of cancel vs stop correct? If not, what is the real practical difference between stop and cancel with savepoint?
To me it feels like cancel-with-savepoint should be deprecated in both the REST API and the CLI, and there should also be a text that explains why it is deprecated and why its usage is discouraged.

Thanks, Thomas

Thomas Eckestad
Systems Engineer Road Perception
NIRA Dynamics AB
Wallenbergs gata 4
58330 Linköping, Sweden
Mobile: +46 738 453 937
thomas.eckes...@niradynamics.se
www.niradynamics.se
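For reference, the two operations discussed in this thread look roughly like this on the CLI and REST API of recent Flink versions (the job id and savepoint target are placeholders; check the docs for your exact version):

```shell
# Graceful stop: drains the pipeline behind a final savepoint barrier,
# then the job exits cleanly.
flink stop -p s3://bucket/savepoints <jobId>

# Deprecated cancel-with-savepoint: takes a savepoint, then hard-cancels,
# so records behind the savepoint barrier may still reach the sinks
# (possible duplicates on non-exactly-once sinks).
flink cancel -s s3://bucket/savepoints <jobId>

# REST equivalents (request bodies abbreviated):
# POST /jobs/<jobId>/stop        {"targetDirectory": "...", "drain": false}
# POST /jobs/<jobId>/savepoints  {"target-directory": "...", "cancel-job": true}
```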
Stop vs Cancel with savepoint
Hi!

Cancel with savepoint is marked as deprecated in the CLI documentation, but it is not marked as deprecated in the REST API documentation. Is that a mistake? At least some recommendation regarding stop vs cancel would be appropriate to include in the API docs.

As I understand it, stop cancels each operator in the job DAG gracefully, starting at the sources. Conceptually: first the sources are canceled, then, when the operators directly downstream of the sources have drained all pending input, those are canceled as well, and so on until the sinks are done. Or, maybe more to the point: the checkpoint barrier triggered for the savepoint will not be followed by any more input data; the sources stop consuming new data until the savepoint is complete and the job exits. Is the above understanding correct?

In that case, for some streaming jobs without exactly-once sinks, cancel with savepoint might produce duplicates. That should be OK, of course, since the job needs to handle a restart anyway, but it might be beneficial not to generate duplicated output for this specific use case if there is a choice where the alternatives have the same implementation cost...

Is my understanding of cancel vs stop correct? If not, what is the real practical difference between stop and cancel with savepoint? To me it feels like cancel-with-savepoint should be deprecated in both the REST API and the CLI, and there should also be a text that explains why it is deprecated and why its usage is discouraged.

Thanks, Thomas

Thomas Eckestad
Systems Engineer Road Perception
NIRA Dynamics AB
Wallenbergs gata 4
58330 Linköping, Sweden
Mobile: +46 738 453 937
thomas.eckes...@niradynamics.se
www.niradynamics.se
Re: Strange behaviour when using RMQSource in Flink 1.11.2
Hi Andrey,

Thank you for your response. I created https://issues.apache.org/jira/browse/FLINK-20244.

Best Regards, Thomas

From: Andrey Zagrebin
Sent: Thursday, November 19, 2020 8:41
To: Thomas Eckestad
Cc: user@flink.apache.org
Subject: Re: Strange behaviour when using RMQSource in Flink 1.11.2

Hi Thomas,

I am not an expert on the RMQSource connector, but your concerns look valid. Could you file a Jira issue in the Flink issue tracker? [1] I cannot immediately refer to a committer who could help with this, but let's hope that the issue gets attention. If you want to contribute an improvement for this in Flink, you can write your suggestion there as well, and once there is positive feedback from a committer, a GitHub PR can be opened.

Best, Andrey

[1] https://issues.apache.org/jira/projects/FLINK

On Wed, Nov 18, 2020 at 3:49 PM Thomas Eckestad <thomas.eckes...@verisure.com> wrote:

Hi,

we are using the RabbitMQ source connector with exactly-once guarantees. For this to work, according to the official Flink documentation, we supply correlation IDs with each published message and use a parallelism of one, with the Flink job being the single/only consumer of the queue in question (and checkpointing enabled).

The following behavior of the RMQSource seems strange to us. When a job is restarted from a checkpoint and there are unacked messages on the RabbitMQ queue for messages processed in the previous checkpoint interval, those messages stay unacked until the job either finishes or is restarted again. When the connection to RabbitMQ is later closed (the job finished or was restarted), the unacked messages are requeued for resend and sent when the next connection is established.
When looking at the source code, messages are ACKed by the RMQSource after a checkpoint is complete (MessageAcknowledgingSourceBase::notifyCheckpointComplete). Also, looking at the source code in RMQSource::setMessageIdentifier() (on the master branch; the ACK semantics do not seem to have changed since 1.11.2), it is clear that if an RMQ message carries a correlation ID which has already been handled, that message is skipped and not further processed. It is also clear that skipped messages are not added to the sessionIds list of messages that are targeted for ACK to RMQ. I believe all successfully consumed RMQ messages should be ACKed; it is irrelevant whether the message is ignored or processed by Flink. RMQ needs to know that the consumer considers the message handled OK.

The following code is from RMQSource::setMessageIdentifier(). Note the return before sessionIds.add():

. . .
    if (!addId(correlationId)) {
        // we have already processed this message
        return false;
    }
}
sessionIds.add(deliveryTag);
. . .

Directly related to the above, I also notice that RMQ connections are leaked at internal job restart. From the Flink log (this stack trace is from 1.11.2):

2020-11-18 10:08:25,118 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask [] - Error during disposal of stream operator.
com.rabbitmq.client.AlreadyClosedException: connection is already closed due to connection error; protocol method: #method<connection.close>(reply-code=320, reply-text=CONNECTION_FORCED - Closed via management plugin, class-id=0, method-id=0)
at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:228) ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
at com.rabbitmq.client.impl.AMQChannel.rpc(AMQChannel.java:303) ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
at com.rabbitmq.client.impl.ChannelN.basicCancel(ChannelN.java:1294) ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicCancel(AutorecoveringChannel.java:482) ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
at org.apache.flink.streaming.connectors.rabbitmq.RMQSource.close(RMQSource.java:192) ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]

AlreadyClosedException is not handled by RMQSource::close(). This results in an RMQ connection thread somewhere being left behind. I triggered three restarts of the job in a row and noticed one new connection added to the pile of connections for each restart. I triggered the restarts by killing the active connection to RMQ using the RMQ admin GUI (management plugin, see above exception details). I also tried to kill one of the leaked connections, but a new one is instantly created when doing so. The traceback when doing this (1.11.2):

2020-11-18 10:27:51,715 ERROR com.rabbitmq.client.impl.Forgivin
Strange behaviour when using RMQSource in Flink 1.11.2
-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:47) [blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:581) [blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
at java.lang.Thread.run(Unknown Source) [?:?]

I have verified that com/rabbitmq/client/AMQP$Connection$CloseOk$Builder is included in the job jar:

less | egrep 'AMQP\$Connection\$CloseOk\$Builder'
-rw 2.0 fat 818 bl defN 20-Nov-11 16:17 com/rabbitmq/client/AMQP$Connection$CloseOk$Builder.class

So, to sum up: it looks like there is a bug regarding ACKs when using correlation IDs. This breaks the exactly-once guarantee of the RMQSource, since unacked messages are requeued after re-connect to RMQ and thus might be processed more than once. Also, the clean-up logic of the RMQSource seems buggy. Does my reasoning make sense to you?

Best Regards,
Thomas Eckestad
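The fix suggested in this thread can be reduced to a minimal sketch. The names below are simplified stand-ins for the real RMQSource fields, not Flink code: the point is that the delivery tag is recorded for acknowledgement *before* the duplicate check, so every consumed message is ACKed at the next checkpoint even when Flink skips it as an already-processed duplicate.

```java
import java.util.*;

public class RmqAckSketch {
    // Stand-ins for RMQSource's dedup set and per-checkpoint ACK list.
    final Set<String> seenCorrelationIds = new HashSet<>();
    final List<Long> sessionIds = new ArrayList<>();

    // Returns true when the message should be processed by the job.
    // Unlike the 1.11.2 code quoted above, the delivery tag is recorded
    // unconditionally, so duplicates no longer stay unacked.
    boolean setMessageIdentifier(String correlationId, long deliveryTag) {
        sessionIds.add(deliveryTag); // always ACK at checkpoint completion
        return seenCorrelationIds.add(correlationId); // false => duplicate, skip
    }

    public static void main(String[] args) {
        RmqAckSketch src = new RmqAckSketch();
        System.out.println(src.setMessageIdentifier("corr-1", 1L)); // true: process
        System.out.println(src.setMessageIdentifier("corr-1", 2L)); // false: skip, but still ACK
        System.out.println(src.sessionIds); // [1, 2] - both tags queued for ACK
    }
}
```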
Two Kubernetes clusters and one Flink cluster?
Hi,

I'm working with two separate Kubernetes clusters located in different regions (hosted in proprietary data centers). The distance between the regions introduces a fairly high (~50 ms) latency between the clusters, so communication should not go cross-site unless necessary. I would like to use one Flink cluster spanning both K8s clusters. I use RocksDB as the state backend. What would be an appropriate Flink setup for a K8s topology such as this? The current stream source is RabbitMQ (not hosted within K8s).

My first naive thought is an active-passive setup with one separate JobManager in each cluster and separate TaskManagers for each JobManager, i.e. Flink would only be actively processing data in one cluster at a time. I'm guessing that ZooKeeper would be necessary to enable leader election between the two clusters?

Any ideas or real-world experience with similar setups? Does this sound like a viable thing to do, or is it just not meant to be done this way?

Thanks, Thomas
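For anyone experimenting with the ZooKeeper-based leader election mentioned above, these are the standard flink-conf.yaml keys (quorum addresses and storage paths below are placeholders). Note this provides JobManager failover within one Flink cluster; a cross-datacenter active-passive deployment is not a built-in, supported mode, so latency and split-brain behavior would need careful testing.

```yaml
high-availability: zookeeper
high-availability.zookeeper.quorum: zk-a.example.com:2181,zk-b.example.com:2181
high-availability.storageDir: s3://flink-ha/recovery
high-availability.cluster-id: /my-flink-cluster
```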
Re: Running single Flink job in a job cluster, problem starting JobManager
I have investigated this further. During normal operation, without Spring Boot, OptimizerPlanEnvironment.ProgramAbortException is thrown by Flink from StreamPlanEnvironment::execute():70. This is caught by PackagedProgram::callMainMethod():537, where it is re-thrown as an Error. That Error is caught in OptimizerPlanEnvironment::getOptimizedPlan():88, which checks whether the optimizerPlan field is != null: if so it returns the value of that field, else it re-throws. Since the optimizerPlan IS != null, the exception stops there and the job is executed as expected. I.e., the Flink control flow relies on throwing (and handling) ProgramAbortException.

When using Spring Boot, the execution fails due to the OptimizerPlanEnvironment.ProgramAbortException mentioned above. In that case Spring logic gets between PackagedProgram::callMainMethod() and the invocation of the method where the Flink ExecutionEnvironment is built and executed. Spring catches any Throwable, interprets it as a failure, and exits. I guess when deploying the Spring Boot Flink job to a session cluster, which I mentioned above works fine, the Flink job does not rely on passing exceptions between Flink bootstrap code and the Flink job?

/Thomas

From: Chesnay Schepler
Sent: Sunday, February 10, 2019 10:30:54 AM
To: Thomas Eckestad; user@flink.apache.org
Subject: Re: Running single Flink job in a job cluster, problem starting JobManager

I'm afraid we haven't had much experience with Spring Boot Flink applications. It is indeed strange that the job ends up using a StreamPlanEnvironment. As a debugging step I would look into all calls to ExecutionEnvironment#initializeContextEnvironment(). This is how specific execution environments are injected into (Stream)ExecutionEnvironment#getEnvironment().

On 08.02.2019 15:17, Thomas Eckestad wrote:

Hi again, when removing Spring Boot from the application it works. I would really like to mix Spring Boot and Flink. It does work with Spring Boot when submitting jobs to a session cluster, as stated before.

/Thomas

From: Thomas Eckestad <thomas.eckes...@verisure.com>
Sent: Friday, February 8, 2019 12:14 PM
To: user@flink.apache.org
Subject: Running single Flink job in a job cluster, problem starting JobManager

Hi,

I am trying to run a Flink job cluster in K8s. As a first step I have created a Docker image according to:
https://github.com/apache/flink/blob/release-1.7/flink-container/docker/README.md

When I try to run the image:

docker run --name=flink-job-manager flink-image:latest job-cluster --job-classname com.foo.bar.FlinkTest -Djobmanager.rpc.address=flink-job-cluster -Dparallelism.default=1 -Dblob.server.port=6124 -Dqueryable-state.server.ports=6125

the execution fails with the following exception:

org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'MyFlinkJob': Invocation of init method failed; nested exception is org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor.postProcessBeforeInitialization(InitDestroyAnnotationBeanPostProcessor.java:139)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.applyBeanPostProcessorsBeforeInitialization(AbstractAutowireCapableBeanFactory.java:419)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1737)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:576)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:498)
at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:320)
at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:222)
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:318)
at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:199)
at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:846)
at org.springframework.context.support.AbstractApplicationContext.finishBea
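The control-flow-by-exception described in this thread can be reduced to a self-contained sketch. The exception class below is a stand-in, not Flink's actual ProgramAbortException: a bootstrap layer that swallows every Throwable, as Spring's bean initialization does, turns the deliberate abort marker into a startup failure, while a bootstrap that expects the marker proceeds normally.

```java
public class AbortMarkerSketch {
    // Stand-in for OptimizerPlanEnvironment$ProgramAbortException: thrown by
    // execute() purely to unwind the user's main() once the plan is captured.
    static class ProgramAbort extends Error {}

    // The user job: in plan-extraction mode, execute() throws the marker.
    static void jobMain() {
        throw new ProgramAbort();
    }

    // Spring-style bootstrap: any Throwable during bean init means failure.
    static String springBootstrap() {
        try {
            jobMain();
            return "ok";
        } catch (Throwable t) {
            return "startup failure";
        }
    }

    // Flink-style bootstrap: the marker is expected and handled.
    static String flinkBootstrap() {
        try {
            jobMain();
            return "ok";
        } catch (ProgramAbort expected) {
            return "plan extracted";
        }
    }

    public static void main(String[] args) {
        System.out.println(springBootstrap()); // startup failure
        System.out.println(flinkBootstrap());  // plan extracted
    }
}
```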
Re: Running single Flink job in a job cluster, problem starting JobManager
Hi again, when removing Spring Boot from the application it works. I would really like to mix Spring Boot and Flink. It does work with Spring Boot when submitting jobs to a session cluster, as stated before.

/Thomas

From: Thomas Eckestad
Sent: Friday, February 8, 2019 12:14 PM
To: user@flink.apache.org
Subject: Running single Flink job in a job cluster, problem starting JobManager

Hi,

I am trying to run a Flink job cluster in K8s. As a first step I have created a Docker image according to:
https://github.com/apache/flink/blob/release-1.7/flink-container/docker/README.md

When I try to run the image:

docker run --name=flink-job-manager flink-image:latest job-cluster --job-classname com.foo.bar.FlinkTest -Djobmanager.rpc.address=flink-job-cluster -Dparallelism.default=1 -Dblob.server.port=6124 -Dqueryable-state.server.ports=6125

the execution fails with the following exception:

org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'MyFlinkJob': Invocation of init method failed; nested exception is org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor.postProcessBeforeInitialization(InitDestroyAnnotationBeanPostProcessor.java:139)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.applyBeanPostProcessorsBeforeInitialization(AbstractAutowireCapableBeanFactory.java:419)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1737)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:576)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:498)
at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:320)
at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:222)
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:318)
at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:199)
at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:846)
at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:863)
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:546)
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:775)
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:316)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1260)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1248)
at com.foo.bar.FlinkTest.main(FlinkTest.java:10)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.boot.devtools.restart.RestartLauncher.run(RestartLauncher.java:49)
Caused by: org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
at org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:70)
at com.foo.bar.FlinkJob.MyFlinkJob.init(MyFlinkJob.java:59)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleElement.invoke(InitDestroyAnnotationBeanPostProcessor.java:363)
at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleMetadata.invokeInitMethods(InitDestroyAnnotationBeanPostProcessor.java:307)
at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor.postProcessBeforeInitialization(InitDestroyAnnotationBeanPostProcessor.java:136)
... 22 more

I can successfully run the same job.jar on a session cluster (start-cluster.sh; flink run job.jar). Any ideas? Feels like I am missing something obvious? At MyFlinkJob.java:59 I do: streamExecutionEnvironment.execute("MyFlinkJob"); It feels strange that the
Running single Flink job in a job cluster, problem starting JobManager
StreamExecutionEnvironment {

I am using https://archive.apache.org/dist/flink/flink-1.7.1/flink-1.7.1-bin-scala_2.11.tgz (I have also tried 1.6.3 and 1.7.0, no difference in behavior).

* docker --version -> Docker version 1.13.1
* uname -a -> Linux SEOSTL0069.SEC.INTRA 4.20.4-200.fc29.x86_64 #1 SMP Wed Jan 23 16:11:28 UTC 2019 x86_64 x86_64 x86_64 GNU/Linux

Thank you, Thomas Eckestad