S3 scheme for jar location?
When will the Flink Operator support URI schemes other than `local` for application-deployment jar files? I just tried Flink Operator 1.9 and it still does not work with `s3` locations. If S3 is good enough for savepoints and checkpoints, why can't the jar also live on S3?

Thanks,
Maxim

COGILITY SOFTWARE CORPORATION LEGAL DISCLAIMER: The information in this email is confidential and is intended solely for the addressee. Access to this email by anyone else is unauthorized. If you are not the intended recipient, any disclosure, copying, distribution or any action taken or omitted to be taken in reliance on it, is prohibited and may be unlawful.
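For context, a minimal sketch of the two CRD variants involved (names like `my-app` and the bucket path are hypothetical; verify the exact behavior against the operator docs for your version): in application mode the jar must currently be addressable with `local://` inside the image, while `FlinkSessionJob` has an artifact-fetch step that can pull a remote jar.

```yaml
# Application mode: the jar ships inside the container image, so only local:// works here.
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: my-app                        # hypothetical name
spec:
  job:
    jarURI: local:///opt/flink/usrlib/my-job.jar
    upgradeMode: savepoint
---
# Session mode: FlinkSessionJob downloads the artifact, so a remote URI
# (e.g. s3://, assuming the S3 filesystem plugin is available to the operator)
# may be an option.
apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:
  name: my-session-job                # hypothetical name
spec:
  deploymentName: my-session-cluster  # hypothetical session cluster
  job:
    jarURI: s3://my-bucket/jars/my-job.jar   # assumption: requires the s3 plugin
```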
Operator/Autoscaler/Autotuner tuning behavior question
Hello. I have some questions about memory autotuning in the Operator.

1. Does the autotuner try to redeploy the job with more memory if it intercepts an OutOfMemoryError? Say I initially provided too little memory in the TM `resource` block - will the job fail and stay stuck initializing, or will the operator increase memory after it catches the OutOfMemoryError?

2. When the autoscaler scales TMs from, say, 1 to 2 instances with higher parallelism, does it also double the memory and CPU requested in each upscaling iteration, or has it already measured RAM requirements while collecting stats, so that it requests just what it thinks is needed? I see there is a config option that increases memory when scaling down, so that the job doesn't fail when fewer TMs do the work that more TMs used to do - I was wondering whether the opposite applies when scaling up.

3. Will the autotuner ever increase requested memory beyond what was initially requested in the TM `resource` block of the Deployment CRD? Same question for CPU.

4. Does the operator take available Kubernetes resources into account, or does it just make an "optimistic" request and hope it will be granted? What happens if it requests more than is available? Keep retrying? Stay in Pending waiting for resources? Time out? Exit? Is there a rollback-and-retry with a smaller request if the larger one fails? I see there is an option that, when enabled, periodically refreshes information about available resources, which should prevent or reduce inadvertently greedy requests. But what is the operator's strategy if the request is already too large to satisfy?

Thanks a lot!
/Maxim
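The knobs being asked about live under the deployment's `spec.flinkConfiguration`. A sketch of how they are wired up - the option names below are my recollection of the operator 1.8 autoscaler/autotuner docs and should be treated as assumptions to verify against your operator version:

```yaml
spec:
  flinkConfiguration:
    job.autoscaler.enabled: "true"
    # Autotuner: adjusts TM memory based on observed usage
    # (assumption: option names as in the operator 1.8 docs).
    job.autoscaler.memory.tuning.enabled: "true"
    # Fractional headroom kept on top of the measured memory need.
    job.autoscaler.memory.tuning.overhead: "0.2"
    # Hand freed-up heap to managed memory (e.g. RocksDB) instead of
    # shrinking the pod.
    job.autoscaler.memory.tuning.maximize-managed-memory: "true"
```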
Re: [External] Regarding java.lang.IllegalStateException
My guess is that it's a major known issue that needs a workaround: https://issues.apache.org/jira/browse/FLINK-32212

/Maxim

From: prashant parbhane
Date: Tuesday, April 23, 2024 at 11:09 PM
To: user@flink.apache.org
Subject: [External] Regarding java.lang.IllegalStateException

Hello,

We have been facing this weird issue, where we are getting the below exception and the job is getting restarted with new task managers. We are using Flink 1.17. The same job works fine with a lower number of task managers (<10).

java.lang.IllegalStateException: The library registration references a different set of library BLOBs than previous registrations for this job:
old:[p-ecf88f3c5c35842d7bca235cfd7592c53f9fdbe0-e06122dd51cccba866932318dc031d68]
new:[p-ecf88f3c5c35842d7bca235cfd7592c53f9fdbe0-7651f69109c915ac830aa42ce2ab67f0]
    at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$ResolvedClassLoader.verifyClassLoader(BlobLibraryCacheManager.java:419)
    at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$ResolvedClassLoader.access$500(BlobLibraryCacheManager.java:359)
    at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$LibraryCacheEntry.getOrResolveClassLoader(BlobLibraryCacheManager.java:235)
    at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$LibraryCacheEntry.access$1100(BlobLibraryCacheManager.java:202)
    at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$DefaultClassLoaderLease.getOrResolveClassLoader(BlobLibraryCacheManager.java:336)
    at org.apache.flink.runtime.taskmanager.Task.createUserCodeClassloader(Task.java:1041)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:624)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
    at java.base/java.lang.Thread.run(Unknown Source)

Thanks,
Prashant
Re: [External] Exception during autoscaling operation - Flink 1.18/Operator 1.8.0
c093-5d8a-b5f5-2b66b4547bf6] Deleting cluster with Foreground propagation
[INFO ][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] Scaling JobManager Deployment to zero with 300 seconds timeout...
[INFO ][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] Completed Scaling JobManager Deployment to zero
[INFO ][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] Deleting JobManager Deployment with 298 seconds timeout...
[INFO ][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] Completed Deleting JobManager Deployment
[INFO ][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] Deleting Kubernetes HA metadata

Any ideas?

Thanks,
Maxim

From: Gyula Fóra
Date: Friday, April 26, 2024 at 1:10 AM
To: Maxim Senin
Cc: Maxim Senin via user
Subject: Re: [External] Exception during autoscaling operation - Flink 1.18/Operator 1.8.0

Hi Maxim!

Regarding the status update error, it could be related to a problem that we have discovered recently with the Flink Operator HA, where during a namespace change both leader and follower instances would start processing. It has been fixed in the current master by updating the JOSDK version to the one containing the fix. For details you can check:
https://github.com/operator-framework/java-operator-sdk/issues/2341
https://github.com/apache/flink-kubernetes-operator/commit/29a4aa5adf101920cbe1a3a9a178ff16f52c746d

To resolve the issue (if it's caused by this), you could either cherry-pick the fix internally to the operator or reduce the replicas to 1 if you are using HA.

Cheers,
Gyula

On Fri, Apr 26, 2024 at 3:48 AM Maxim Senin via user <user@flink.apache.org> wrote:

I have also seen this exception:

o.a.f.k.o.o.JobStatusObserver [ERROR][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] Job d0ac9da5959d8cc9a82645eeef6751a5 failed with error: java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: java.lang.UnsupportedOperationException: Cannot rescale the given pointwise partitioner. Did you change the partitioner to forward or rescale? It may also help to add an explicit shuffle().
    at org.apache.flink.util.concurrent.FutureUtils.lambda$switchExecutor$23(FutureUtils.java:1305)
    at java.base/java.util.concurrent.CompletableFuture.uniHandle(Unknown Source)
    at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(Unknown Source)
    at java.base/java.util.concurrent.CompletableFuture$Completion.run(Unknown Source)
    at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRunAsync$4(PekkoRpcActor.java:451)
    at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
    at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRunAsync(PekkoRpcActor.java:451)
    at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:218)
    at org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85)
    at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168)
    at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
    at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
    at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)

I can't find any information on how to interpret this. Please advise.

Cheers,
Maxim

From: Maxim Senin via user <user@flink.apache.org>
Date: Thursday, April 25, 2024 at 12:01 PM
To: Maxim Senin via user <user@flink.apache.org>
Subject: [External] Exception during autoscaling operation - Flink 1.18/Operator 1.8.0

Hi. I already asked before but never got an answer. My observation is that the operator, after collecting some stats, tries to restart one of the deployments. This includes taking a savepoint (`takeSavepointOnUpgrade: true`, `upgradeMode: savepoint`) and "gracefully" shutting down the JobManager by "scaling it to zero" (setting replicas = 0 in the newly generated config). However, the deployment never comes back up, apparently due to this exception:

2024-04-25 17:20:52,920 mi.j.o.p.e.ReconciliationDispatcher [ERROR][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] Error during error status handling. org.apache.flink.kubernetes.operator.exception.StatusConflictException: Status have been modified externally in version 50607043 Previous: {"jobStatus":{"jobName":"autoscaling test:attack-surface","jobId":"be93ad9b152c1f11696e971e6a638b63","state":"FINISHEDINFO\\n\"},\"mode\":\"native\"},\"resource_metadata\&
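On the "Cannot rescale the given pointwise partitioner" error: the message refers to forward/rescale edges in the job graph, which the autoscaler cannot rescale in place. A minimal, non-compilable sketch of the workaround the message itself suggests - the pipeline and operator names here are hypothetical, not taken from the original job:

```java
// Hypothetical DataStream pipeline: a forward/rescale edge between two operators
// cannot be rescaled independently; an explicit redistribution breaks the
// pointwise connection so the autoscaler can change parallelism across it.
DataStream<String> events =
    env.fromSource(source, WatermarkStrategy.noWatermarks(), "events");

events
    .rebalance()                 // or .shuffle(): forces a non-pointwise partitioner
    .map(new EnrichFunction())   // hypothetical operator
    .sinkTo(sink);
```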
Re: Regarding java.lang.IllegalStateException
We are also seeing something similar:

2024-04-26 16:30:44,401 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Power Consumption:power_consumption -> Ingest Power Consumption -> PopSysFields -> WindowingWatermarkPreCheck (1/1) (cb8c425b6463b1ade9b8359c0514386b_28bc590bb7896e1df191c306d7cb6d23_0_11) switched from DEPLOYING to FAILED on f-a9ad4438-cddf-512f-94c6-c5f921f66078-taskmanager-1-1 @ 10.111.164.30 (dataPort=42621).
java.lang.IllegalStateException: The library registration references a different set of library BLOBs than previous registrations for this job:
old:[p-1b01ac6374a137939ffa18432714b7c9af30dc3f-522999dc4412a76d33728a97225de573]
new:[p-1b01ac6374a137939ffa18432714b7c9af30dc3f-819e2ecffa2f22c5e0f4d88ef5789421]
    at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$ResolvedClassLoader.verifyClassLoader(BlobLibraryCacheManager.java:437) ~[flink-dist-1.19.0.jar:1.19.0]
    at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$ResolvedClassLoader.access$600(BlobLibraryCacheManager.java:373) ~[flink-dist-1.19.0.jar:1.19.0]
    at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$LibraryCacheEntry.getOrResolveClassLoader(BlobLibraryCacheManager.java:249) ~[flink-dist-1.19.0.jar:1.19.0]
    at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$LibraryCacheEntry.access$1200(BlobLibraryCacheManager.java:210) ~[flink-dist-1.19.0.jar:1.19.0]
    at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$DefaultClassLoaderLease.getOrResolveClassLoader(BlobLibraryCacheManager.java:350) ~[flink-dist-1.19.0.jar:1.19.0]
    at org.apache.flink.runtime.taskmanager.Task.createUserCodeClassloader(Task.java:1047) ~[flink-dist-1.19.0.jar:1.19.0]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:628) ~[flink-dist-1.19.0.jar:1.19.0]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) ~[flink-dist-1.19.0.jar:1.19.0]
    at java.lang.Thread.run(Unknown Source) ~[?:?]

2024-04-26 16:30:44,402 INFO org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler [] - Restarting job.
java.lang.IllegalStateException: The library registration references a different set of library BLOBs than previous registrations for this job:
old:[p-1b01ac6374a137939ffa18432714b7c9af30dc3f-522999dc4412a76d33728a97225de573]
new:[p-1b01ac6374a137939ffa18432714b7c9af30dc3f-819e2ecffa2f22c5e0f4d88ef5789421]
    at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$ResolvedClassLoader.verifyClassLoader(BlobLibraryCacheManager.java:437) ~[flink-dist-1.19.0.jar:1.19.0]
    at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$ResolvedClassLoader.access$600(BlobLibraryCacheManager.java:373) ~[flink-dist-1.19.0.jar:1.19.0]
    at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$LibraryCacheEntry.getOrResolveClassLoader(BlobLibraryCacheManager.java:249) ~[flink-dist-1.19.0.jar:1.19.0]
    at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$LibraryCacheEntry.access$1200(BlobLibraryCacheManager.java:210) ~[flink-dist-1.19.0.jar:1.19.0]
    at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$DefaultClassLoaderLease.getOrResolveClassLoader(BlobLibraryCacheManager.java:350) ~[flink-dist-1.19.0.jar:1.19.0]
    at org.apache.flink.runtime.taskmanager.Task.createUserCodeClassloader(Task.java:1047) ~[flink-dist-1.19.0.jar:1.19.0]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:628) ~[flink-dist-1.19.0.jar:1.19.0]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) ~[flink-dist-1.19.0.jar:1.19.0]
    at java.lang.Thread.run(Unknown Source) ~[?:?]

Flink 1.19, operator 1.9.0

Maxim Senin
Senior Backend Engineer
COGILITY<http://cogility.com/>

From: prashant parbhane
Date: Tuesday, April 23, 2024 at 11:09 PM
To: user@flink.apache.org
Subject: Regarding java.lang.IllegalStateException

Hello,

We have been facing this weird issue, where we are getting the below exception and the job is getting restarted with new task managers. We are using Flink 1.17. The same job works fine with a lower number of task managers (<10).

java.lang.IllegalStateException: The library registration references a different set of library BLOBs than previous registrations for this job:
old:[p-ecf88f3c5c35842d7bca235cfd7592c53f9fdbe0-e06122dd51cccba866932318dc031d68]
new:[p-ecf88f3c5c35842d7bca235cfd7592c53f9fdbe0-7651f69109c915ac830aa42ce2ab67f0]
    at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$ResolvedClassLoader.verifyClassLoader(BlobLibraryCacheManager.java:419)
    at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$ResolvedClassLoader.access$500(BlobLibraryCac
Re: [External] Exception during autoscaling operation - Flink 1.18/Operator 1.8.0
Hi, Gyula. Thanks for the tips.

All jobs are deployed in a single namespace, "flink". Which replicas? The JM replicas are already 1; I tried with TM replicas set to 1, but the same exception happens. We have only one instance of the operator (replicas=1) in this environment.

The only workarounds I have discovered are either a) disabling autoscaling for the failing job (the autoscaler scales the job to zero to "gracefully" stop it and then never starts it again), or b) for some jobs that keep restarting, disabling HA for that job.

And `Cannot rescale the given pointwise partitioner.` is also still a mystery.

Thanks,
Maxim
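The per-job workaround in (a) can be expressed in the deployment's Flink configuration. A sketch - `job.autoscaler.enabled` is the operator's autoscaler switch; the resource name is hypothetical:

```yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: failing-job                    # hypothetical name
spec:
  flinkConfiguration:
    # Workaround (a): disable autoscaling for just this job.
    job.autoscaler.enabled: "false"
    # Workaround (b) would instead remove/override the high-availability
    # settings for this one deployment.
```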
Re: [External] Exception during autoscaling operation - Flink 1.18/Operator 1.8.0
I have also seen this exception:

o.a.f.k.o.o.JobStatusObserver [ERROR][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] Job d0ac9da5959d8cc9a82645eeef6751a5 failed with error: java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: java.lang.UnsupportedOperationException: Cannot rescale the given pointwise partitioner. Did you change the partitioner to forward or rescale? It may also help to add an explicit shuffle().
    at org.apache.flink.util.concurrent.FutureUtils.lambda$switchExecutor$23(FutureUtils.java:1305)
    at java.base/java.util.concurrent.CompletableFuture.uniHandle(Unknown Source)
    at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(Unknown Source)
    at java.base/java.util.concurrent.CompletableFuture$Completion.run(Unknown Source)
    at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRunAsync$4(PekkoRpcActor.java:451)
    at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
    at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRunAsync(PekkoRpcActor.java:451)
    at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:218)
    at org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85)
    at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168)
    at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
    at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
    at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)

I can't find any information on how to interpret this. Please advise.

Cheers,
Maxim
Exception during autoscaling operation - Flink 1.18/Operator 1.8.0
Hi. I already asked before but never got an answer. My observation is that the operator, after collecting some stats, tries to restart one of the deployments. This includes taking a savepoint (`takeSavepointOnUpgrade: true`, `upgradeMode: savepoint`) and "gracefully" shutting down the JobManager by "scaling it to zero" (setting replicas = 0 in the newly generated config). However, the deployment never comes back up, apparently due to this exception:

2024-04-25 17:20:52,920 mi.j.o.p.e.ReconciliationDispatcher [ERROR][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] Error during error status handling. org.apache.flink.kubernetes.operator.exception.StatusConflictException: Status have been modified externally in version 50607043 Previous: {"jobStatus":{"jobName":"autoscaling test:attack-surface","jobId":"be93ad9b152c1f11696e971e6a638b63","state":"FINISHEDINFO\\n\"},\"mode\":\"native\"},\"resource_metadata\":…
    at org.apache.flink.kubernetes.operator.utils.StatusRecorder.replaceStatus(StatusRecorder.java:161)
    at org.apache.flink.kubernetes.operator.utils.StatusRecorder.patchAndCacheStatus(StatusRecorder.java:97)
    at org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils.toErrorStatusUpdateControl(ReconciliationUtils.java:438)
    at org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.updateErrorStatus(FlinkDeploymentController.java:209)
    at org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.updateErrorStatus(FlinkDeploymentController.java:57)
    at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleErrorStatusHandler(ReconciliationDispatcher.java:194)
    at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleReconcile(ReconciliationDispatcher.java:123)
    at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleDispatch(ReconciliationDispatcher.java:91)
    at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleExecution(ReconciliationDispatcher.java:64)
    at io.javaoperatorsdk.operator.processing.event.EventProcessor$ReconcilerExecutor.run(EventProcessor.java:417)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)

2024-04-25 17:20:52,925 mi.j.o.p.e.ReconciliationDispatcher [ERROR][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] Error during event processing ExecutionScope{ resource id: ResourceID{name='f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6', namespace='flink'}, version: 50606957} failed. org.apache.flink.kubernetes.operator.exception.ReconciliationException: org.apache.flink.kubernetes.operator.exception.StatusConflictException: Status have been modified externally in version 50607043 Previous: {"jobStatus":{"jobName":"autoscaling test:attack-surface","jobId":"be93ad9b152c1f11696e971e6a638b63","state":"FINISHED",
Caused by: org.apache.flink.kubernetes.operator.exception.StatusConflictException: Status have been modified externally in version 50607043 Previous: {"jobStatus":{"jobName":"autoscaling test:attack-surface","jobId":"be93ad9b152c1f11696e971e6a638b63","state":"FINISHED
    at org.apache.flink.kubernetes.operator.utils.StatusRecorder.replaceStatus(StatusRecorder.java:161)
    at org.apache.flink.kubernetes.operator.utils.StatusRecorder.patchAndCacheStatus(StatusRecorder.java:97)
    at org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler.deploy(ApplicationReconciler.java:175)
    at org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler.deploy(ApplicationReconciler.java:63)
    at org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler.restoreJob(AbstractJobReconciler.java:279)
    at org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler.reconcileSpecChange(AbstractJobReconciler.java:156)
    at org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler.reconcile(AbstractFlinkResourceReconciler.java:171)
    at org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:145)
    ... 13 more

How can this be fixed? Why is the deployment not coming back up after this exception? Is there a configuration property to set the number of retries?

Thanks,
Maxim
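On the retry question: the operator has reconciliation-retry settings in its own configuration (e.g. the operator's default configuration in the Helm chart). A sketch with example values - the option names are my recollection of the operator docs and should be verified for your version:

```yaml
# Operator-level settings (not per-FlinkDeployment); example values, not defaults.
kubernetes.operator.retry.initial.interval: 5 s
kubernetes.operator.retry.interval.multiplier: 2
kubernetes.operator.retry.max.attempts: 15
# How often the operator re-reconciles each resource.
kubernetes.operator.reconcile.interval: 60 s
```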
Job goes into FINISHED state after rescaling - Flink operator
Hi. My FlinkDeployment is set to use savepoints for upgrades and to take a savepoint before stopping. When rescaling happens, for some reason it scales the JobManager to zero ("Scaling JobManager Deployment to zero with 300 seconds timeout") and the job goes into FINISHED state. It doesn't seem to be able to continue. Any ideas why it is deleting itself? The savepoints are stored on S3. I can restart the job from a savepoint manually, but at the next rescaling operation it deletes itself again.

Thanks!

Log:

2024-04-22 23:39:17,016 o.a.f.k.o.o.d.ApplicationObserver [INFO ][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] Observing JobManager deployment. Previous status: DEPLOYING
2024-04-22 23:39:17,024 o.a.f.k.o.o.d.ApplicationObserver [INFO ][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] JobManager is being deployed
2024-04-22 23:39:17,045 o.a.f.a.JobAutoScalerImpl [INFO ][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] Cleaning up autoscaling meta data
2024-04-22 23:39:17,046 o.a.f.k.o.s.AbstractFlinkService [INFO ][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] Deleting cluster with Foreground propagation
2024-04-22 23:39:17,046 o.a.f.k.o.s.AbstractFlinkService [INFO ][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] Scaling JobManager Deployment to zero with 300 seconds timeout...
2024-04-22 23:39:18,941 o.a.f.r.u.c.m.ProcessMemoryUtils [INFO ][flink/f-cdda7a8d-8259-5137-8397-8125b212e556] The derived from fraction jvm overhead memory (102.400mb (107374184 bytes)) is less than its min value 192.000mb (201326592 bytes), min value will be used instead
2024-04-22 23:39:18,982 o.a.f.a.ScalingMetricCollector [WARN ][flink/f-cdda7a8d-8259-5137-8397-8125b212e556] pendingRecords metric for e22861b1943d40ca6f5a40ae6332d42b could not be found. Either a legacy source or an idle source. Assuming no pending records.
2024-04-22 23:39:18,984 o.a.f.a.ScalingMetricCollector [WARN ][flink/f-cdda7a8d-8259-5137-8397-8125b212e556] pendingRecords metric for e503bd1c0fb6799d36f0c4b786e69fd9 could not be found. Either a legacy source or an idle source. Assuming no pending records.
2024-04-22 23:39:18,985 o.a.f.a.ScalingMetricCollector [WARN ][flink/f-cdda7a8d-8259-5137-8397-8125b212e556] pendingRecords metric for de478b666343f04920f1cd3e6e65548c could not be found. Either a legacy source or an idle source. Assuming no pending records.
2024-04-22 23:39:18,987 o.a.f.a.ScalingMetricCollector [WARN ][flink/f-cdda7a8d-8259-5137-8397-8125b212e556] pendingRecords metric for 0956ec3e5721a3bc416c208d690e220a could not be found. Either a legacy source or an idle source. Assuming no pending records.
2024-04-22 23:39:18,988 o.a.f.a.ScalingMetricCollector [WARN ][flink/f-cdda7a8d-8259-5137-8397-8125b212e556] pendingRecords metric for 2a0e1e1304dc61d997d3e6f7025df9b3 could not be found. Either a legacy source or an idle source. Assuming no pending records.
2024-04-22 23:39:18,989 o.a.f.a.ScalingMetricCollector [WARN ][flink/f-cdda7a8d-8259-5137-8397-8125b212e556] pendingRecords metric for 09152f8c760b2503dd4174abf81781b6 could not be found. Either a legacy source or an idle source. Assuming no pending records.
2024-04-22 23:39:18,991 o.a.f.a.ScalingMetricCollector [WARN ][flink/f-cdda7a8d-8259-5137-8397-8125b212e556] pendingRecords metric for cbe38acc7ac41cc91794215391eedc28 could not be found. Either a legacy source or an idle source. Assuming no pending records.
2024-04-22 23:39:18,992 o.a.f.a.ScalingMetricCollector [WARN ][flink/f-cdda7a8d-8259-5137-8397-8125b212e556] pendingRecords metric for cc2707c5c5b5cdd319d28980a6ad99d0 could not be found. Either a legacy source or an idle source. Assuming no pending records.
2024-04-22 23:39:18,994 o.a.f.a.ScalingMetricCollector [WARN ][flink/f-cdda7a8d-8259-5137-8397-8125b212e556] pendingRecords metric for d2fd710697779b81072031f8924b5967 could not be found. Either a legacy source or an idle source. Assuming no pending records.
2024-04-22 23:39:18,995 o.a.f.a.ScalingMetricCollector [WARN ][flink/f-cdda7a8d-8259-5137-8397-8125b212e556] pendingRecords metric for 9892bf1d00288ef951498dd18f18dd24 could not be found. Either a legacy source or an idle source. Assuming no pending records.
2024-04-22 23:39:18,997 o.a.f.a.ScalingMetricCollector [WARN ][flink/f-cdda7a8d-8259-5137-8397-8125b212e556] pendingRecords metric for acf3064b984e12010bfeccbe4e28d9a5 could not be found. Either a legacy source or an idle source. Assuming no pending records.
2024-04-22 23:39:18,998 o.a.f.a.ScalingMetricCollector [WARN ][flink/f-cdda7a8d-8259-5137-8397-8125b212e556] pendingRecords metric for 8737aeacfef5f24708290203c9de47e1 could not be found. Either a legacy source or an idle source. Assuming no pending records.
2024-04-22 23:39:18,999 o.a.f.a.ScalingMetricCollector [WARN ][flink/f-cdda7a8d-8259-5137-8397-8125b212e556] pendingRecords metric for 45aa9886e32677c3f13caa2aa54ec3ad could not be found. Either a legacy source or an idle source. Assuming no pending records.
Parallelism for auto-scaling, memory for auto-tuning - Flink operator
Hi. Does it make sense to specify `parallelism` for the task managers or the `job`, and, similarly, to specify a memory amount for the task managers, or is it better to leave it to the autoscaler and autotuner to pick the best values? How many times would the autoscaler need to restart task managers before it picks the right values? Does `pekko.ask.timeout` need to be large enough for the task managers to reach the running state through all the restarts? Cheers, Maxim COGILITY SOFTWARE CORPORATION LEGAL DISCLAIMER: The information in this email is confidential and is intended solely for the addressee. Access to this email by anyone else is unauthorized. If you are not the intended recipient, any disclosure, copying, distribution or any action taken or omitted to be taken in reliance on it, is prohibited and may be unlawful.
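For what it's worth, a common pattern is to set a reasonable starting `parallelism` and TM `resource` and let the autoscaler adjust from there. A hypothetical FlinkDeployment fragment — treat the exact `job.autoscaler.*` keys and values below as assumptions to check against the operator docs for your version:

```yaml
# Hypothetical sketch: explicit starting resources, autoscaler tunes from there.
spec:
  taskManager:
    resource:
      memory: "2048m"    # starting point only; the autotuner may redistribute it
      cpu: 1
  job:
    parallelism: 2       # initial value; the autoscaler overrides it at runtime
  flinkConfiguration:
    job.autoscaler.enabled: "true"
    job.autoscaler.stabilization.interval: "5m"
    job.autoscaler.metrics.window: "15m"
```

As I understand it, with the autoscaler enabled the declared `parallelism` is only the starting point, so it mainly matters for the first deployment and the very first scaling decisions.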
Re: Long blocking call in UserFunction triggers HA leader lost?
Hi Theo, We had a very similar problem with one of our Spark streaming jobs. The best solution was to create a custom source that keeps all external records in a cache, periodically re-reads the external data and compares it to the cache. All changed records were then broadcast to the task managers. We tried to implement background loading in a separate thread, but that solution was more complicated: we needed to create a shadow copy of the cache and then quickly switch them, and with Spark streaming there were additional problems. Hope this helps, Maxim.
Re: Flink streaming job logging reserves space
Hi Yang, Thanks for your advice, now I have a good reason to upgrade to 1.11. Regards, Maxim. On Tue, Aug 4, 2020 at 9:39 AM Yang Wang wrote: > AFAIK, there is no way to roll the *.out/err files except we hijack the > stdout/stderr in Flink code. However, it is a temporary hack. > > A good way is to write your logs to other separate files that could roll > via log4j. If you want to access them in the Flink webUI, > upgrade to the 1.11 version. Then you will find a "Log List" tab under > JobManager sidebar. > > > Best, > Yang > > Maxim Parkachov 于2020年8月4日周二 下午2:52写道: > >> Hi Yang, >> >> you are right. Since then, I looked for open files and found *.out/*.err >> files on that partition and as you mentioned they don't roll. >> I could implement a workaround to restart the streaming job every week or >> so, but I really don't want to go this way. >> >> I tried to forward logs to files and then I could roll them, but then I >> don't see logs in the GUI. >> >> So my question would be, how to make them roll ? >> >> Regards, >> Maxim. >> >> On Tue, Aug 4, 2020 at 4:48 AM Yang Wang wrote: >> >>> Hi Maxim, >>> >>> First, i want to confirm with you that do you have checked all the >>> "yarn.nodemanager.log-dirs". If you >>> could access the logs in Flink webUI, the log files(e.g. >>> taskmanager.log, taskmanager.out, taskmanager.err) >>> should exist. I suggest to double check the multiple log-dirs. >>> >>> Since the *.out/err files do not roll, if you print some user logs to >>> the stdout/stderr, the two files will increase >>> over time. >>> >>> When you stop the Flink application, Yarn will clean up all the jars and >>> logs, so you find that the disk space get back. >>> >>> >>> Best, >>> Yang >>> >>> Maxim Parkachov 于2020年7月30日周四 下午10:00写道: >>> >>>> Hi everyone, >>>> >>>> I have a strange issue with flink logging. I use pretty much standard >>>> log4 config, which is writing to standard output in order to see it in >>>> Flink GUI. Deployment is on YARN with job mode. 
I can see logs in UI, no >>>> problem. On the servers, where Flink YARN containers are running, there is >>>> disk quota on the partition where YARN normally creates logs. I see no >>>> specific files in the application_xx directory, but space on the disk is >>>> actually decreasing with time. After several weeks we eventually hit quota. >>>> It seems like some file or pipe is created but not closed, but still >>>> reserves the space. After I restart Flink job, space is >>>> immediately returned back. I'm sure that flink job is the problem, I have >>>> re-produces issue on a cluster where only 1 filnk job was running. Below is >>>> my log4 config. Any help or idea is appreciated. >>>> >>>> Thanks in advance, >>>> Maxim. >>>> --- >>>> # This affects logging for both user code and Flink >>>> log4j.rootLogger=INFO, file, stderr >>>> >>>> # Uncomment this if you want to _only_ change Flink's logging >>>> #log4j.logger.org.apache.flink=INFO >>>> >>>> # The following lines keep the log level of common libraries/connectors >>>> on >>>> # log level INFO. The root logger does not override this. You have to >>>> manually >>>> # change the log levels here. >>>> log4j.logger.akka=INFO >>>> log4j.logger.org.apache.kafka=INFO >>>> log4j.logger.org.apache.hadoop=INFO >>>> log4j.logger.org.apache.zookeeper=INFO >>>> >>>> # Log all infos in the given file >>>> log4j.appender.file=org.apache.log4j.FileAppender >>>> log4j.appender.file.file=${log.file} >>>> log4j.appender.file.append=false >>>> log4j.appender.file.layout=org.apache.log4j.PatternLayout >>>> log4j.appender.file.layout.ConversionPattern=%d{-MM-dd >>>> HH:mm:ss,SSS} %-5p %-60c %x - %m%n >>>> >>>> # Suppress the irrelevant (wrong) warnings from the Netty channel >>>> handler >>>> log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, >>>> file >>>> >>>>
Re: Flink streaming job logging reserves space
Hi Yang, you are right. Since then, I looked for open files and found *.out/*.err files on that partition and as you mentioned they don't roll. I could implement a workaround to restart the streaming job every week or so, but I really don't want to go this way. I tried to forward logs to files and then I could roll them, but then I don't see logs in the GUI. So my question would be, how to make them roll ? Regards, Maxim. On Tue, Aug 4, 2020 at 4:48 AM Yang Wang wrote: > Hi Maxim, > > First, i want to confirm with you that do you have checked all the > "yarn.nodemanager.log-dirs". If you > could access the logs in Flink webUI, the log files(e.g. taskmanager.log, > taskmanager.out, taskmanager.err) > should exist. I suggest to double check the multiple log-dirs. > > Since the *.out/err files do not roll, if you print some user logs to the > stdout/stderr, the two files will increase > over time. > > When you stop the Flink application, Yarn will clean up all the jars and > logs, so you find that the disk space get back. > > > Best, > Yang > > Maxim Parkachov 于2020年7月30日周四 下午10:00写道: > >> Hi everyone, >> >> I have a strange issue with flink logging. I use pretty much standard >> log4 config, which is writing to standard output in order to see it in >> Flink GUI. Deployment is on YARN with job mode. I can see logs in UI, no >> problem. On the servers, where Flink YARN containers are running, there is >> disk quota on the partition where YARN normally creates logs. I see no >> specific files in the application_xx directory, but space on the disk is >> actually decreasing with time. After several weeks we eventually hit quota. >> It seems like some file or pipe is created but not closed, but still >> reserves the space. After I restart Flink job, space is >> immediately returned back. I'm sure that flink job is the problem, I have >> re-produces issue on a cluster where only 1 filnk job was running. Below is >> my log4 config. Any help or idea is appreciated. 
>> >> Thanks in advance, >> Maxim. >> --- >> # This affects logging for both user code and Flink >> log4j.rootLogger=INFO, file, stderr >> >> # Uncomment this if you want to _only_ change Flink's logging >> #log4j.logger.org.apache.flink=INFO >> >> # The following lines keep the log level of common libraries/connectors on >> # log level INFO. The root logger does not override this. You have to >> manually >> # change the log levels here. >> log4j.logger.akka=INFO >> log4j.logger.org.apache.kafka=INFO >> log4j.logger.org.apache.hadoop=INFO >> log4j.logger.org.apache.zookeeper=INFO >> >> # Log all infos in the given file >> log4j.appender.file=org.apache.log4j.FileAppender >> log4j.appender.file.file=${log.file} >> log4j.appender.file.append=false >> log4j.appender.file.layout=org.apache.log4j.PatternLayout >> log4j.appender.file.layout.ConversionPattern=%d{-MM-dd HH:mm:ss,SSS} >> %-5p %-60c %x - %m%n >> >> # Suppress the irrelevant (wrong) warnings from the Netty channel handler >> log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, >> file >> >>
Flink streaming job logging reserves space
Hi everyone, I have a strange issue with Flink logging. I use a pretty much standard log4j config, which writes to standard output so that I can see it in the Flink GUI. Deployment is on YARN in job mode. I can see logs in the UI, no problem. On the servers where the Flink YARN containers are running, there is a disk quota on the partition where YARN normally creates logs. I see no specific files in the application_xx directory, but space on the disk is actually decreasing over time. After several weeks we eventually hit the quota. It seems like some file or pipe is created but not closed, yet still reserves the space. After I restart the Flink job, the space is immediately returned. I'm sure the Flink job is the problem; I have reproduced the issue on a cluster where only one Flink job was running. Below is my log4j config. Any help or idea is appreciated. Thanks in advance, Maxim.
---
# This affects logging for both user code and Flink
log4j.rootLogger=INFO, file, stderr

# Uncomment this if you want to _only_ change Flink's logging
#log4j.logger.org.apache.flink=INFO

# The following lines keep the log level of common libraries/connectors on
# log level INFO. The root logger does not override this. You have to manually
# change the log levels here.
log4j.logger.akka=INFO
log4j.logger.org.apache.kafka=INFO
log4j.logger.org.apache.hadoop=INFO
log4j.logger.org.apache.zookeeper=INFO

# Log all infos in the given file
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.file=${log.file}
log4j.appender.file.append=false
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

# Suppress the irrelevant (wrong) warnings from the Netty channel handler
log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, file
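Yang's suggestion in the replies above — route user logs to a separate file that rolls via log4j — could look roughly like this. A log4j 1.x sketch only; the logger name, file suffix and size limits are illustrative, not taken from the thread:

```properties
# Hypothetical fragment: send user-code logs to their own rolling file
# instead of stdout, so disk space is reclaimed as files roll over.
log4j.logger.com.mycompany.myjob=INFO, userfile
log4j.additivity.com.mycompany.myjob=false

log4j.appender.userfile=org.apache.log4j.RollingFileAppender
log4j.appender.userfile.file=${log.file}.user
log4j.appender.userfile.MaxFileSize=100MB
log4j.appender.userfile.MaxBackupIndex=5
log4j.appender.userfile.layout=org.apache.log4j.PatternLayout
log4j.appender.userfile.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
```

As noted in the thread, the container *.out/*.err files themselves cannot be rolled this way; only output routed through log4j appenders rolls.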
Re: Wait for cancellation event with CEP
Hi Till, thank you for the very detailed answer, now it is absolutely clear. Regards, Maxim. On Thu, Apr 30, 2020 at 7:19 PM Till Rohrmann wrote: > Hi Maxim, > > I think your problem should be solvable with the CEP library: > > So what we are doing here is to define a pattern forward followed by any > cancel. Moreover we say that it must happen within 1 minute. If this does > not happen then we will see a timeout where we can create the follow-up > event. > > Cheers, > Till > >> >>
Wait for cancellation event with CEP
Hi everyone, I need to implement the following functionality. There is a Kafka topic where "forward" events arrive, and in the same topic there are "cancel" events. For each "forward" event I need to wait 1 minute for a possible "cancel" event; I can uniquely match both events. If a "cancel" event comes within this minute I need to drop the "forward" event, otherwise pass the "forward" event on to another Kafka topic. "Cancel" events older than 1 minute can be ignored. I was thinking of implementing a ProcessFunction that puts "forward" events into state with a timer; if a "cancel" event comes, I delete the "forward" event from state. My question: is there a simpler way to implement the same logic, possibly with CEP? Thanks, Maxim.
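The ProcessFunction idea above — buffer each "forward", arm a one-minute timer, drop it if a matching "cancel" arrives first — can be modeled in a few lines. This is a pure-Python sketch of the logic only, not Flink code; in a real job this would be keyed state plus an event-time timer:

```python
# Sketch (illustrative names): events are (kind, key, timestamp) tuples,
# iterated in event-time order.
WAIT = 60  # seconds to wait for a possible "cancel"

def match_forwards(events):
    """Return the forwards that were not cancelled within WAIT seconds."""
    pending = {}   # key -> (buffered forward event, timer deadline)
    emitted = []

    def flush(now):
        # Fire all "timers" that are due at time `now`.
        for k in [k for k, (_, dl) in pending.items() if dl <= now]:
            emitted.append(pending.pop(k)[0])

    for kind, key, ts in events:
        flush(ts)  # fire timers due before this event
        if kind == "forward":
            pending[key] = ((kind, key, ts), ts + WAIT)
        elif kind == "cancel":
            pending.pop(key, None)  # drop the buffered forward, if any
    flush(float("inf"))  # end of stream: emit everything still pending
    return emitted
```

A late "cancel" whose forward has already been emitted is simply ignored, matching the "cancel events older than 1 minute can be ignored" requirement.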
Re: New kafka producer on each checkpoint
Hi Yun, thanks for the answer. I have now increased the checkpoint interval, but I still don't understand the reason for creating a new producer and re-connecting to the Kafka broker each time. According to the documentation:

Note: Semantic.EXACTLY_ONCE mode uses a fixed size pool of KafkaProducers per each FlinkKafkaProducer011 instance. One of each of those producers is used per one checkpoint. If the number of concurrent checkpoints exceeds the pool size, FlinkKafkaProducer011 will throw an exception and will fail the whole application. Please configure max pool size and max number of concurrent checkpoints accordingly.

I assumed that this is also true for post-011 producers as well. I expected to have 5 (default) producers created and reused without re-instantiating a producer each time. In my case the checkpoint is so fast that I will never have concurrent checkpoints. Regards, Maxim. On Wed, Apr 8, 2020 at 4:52 AM Yun Tang wrote: > Hi Maxim > > If you use the EXACTLY_ONCE semantic (instead of AT_LEAST_ONCE or NONE) > for flink kafka producer. It will create new producer when every new > checkpoint comes [1]. This is by design and from my point of view, the > checkpoint interval of 10 seconds might be a bit too often. In general I > think interval of 3 minutes should be enough. If you cannot offer the > source rewind time after failover, you could turn the interval more often. > > > [1] > https://github.com/apache/flink/blob/980e31dcc29ec6cc60ed59569f1f1cb7c47747b7/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java#L871 > > Best > Yun Tang > -- > *From:* Maxim Parkachov > *Sent:* Monday, April 6, 2020 23:16 > *To:* user@flink.apache.org > *Subject:* New kafka producer on each checkpoint > > Hi everyone, > > I'm trying to test exactly once functionality with my job under production > load. The job is reading from kafka, using kafka timestamp as event time, > aggregates every minute and outputs to other kafka topic. 
I use checkpoint > interval 10 seconds. > > Everything seems to be working fine, but when I look to the log on INFO > level, I see that with each checkpoint, new kafka producer is created and > then closed again. > > 1. Is this how it is supposed to work ? > 2. Is checkpoint interval 10 second too often ? > > Thanks, > Maxim. >
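The pool behavior quoted from the documentation above can be sketched abstractly. This is a toy Python model of the documented semantics, not the actual FlinkKafkaProducer code — the class and producer names are illustrative:

```python
from collections import deque

class ProducerPool:
    """Toy model of a fixed-size transactional-producer pool: each
    checkpoint takes a producer and returns it when committed; if more
    checkpoints run concurrently than the pool holds, the job fails."""
    def __init__(self, size=5):
        self.free = deque(f"producer-{i}" for i in range(size))
        self.in_use = {}

    def begin_checkpoint(self, cp_id):
        if not self.free:
            raise RuntimeError("pool exhausted: too many concurrent checkpoints")
        self.in_use[cp_id] = self.free.popleft()
        return self.in_use[cp_id]

    def commit_checkpoint(self, cp_id):
        self.free.append(self.in_use.pop(cp_id))
```

As Yun's linked source shows, the real connector actually instantiates a new producer per checkpoint by design, so the observed reconnects do not contradict correctness — only the pooling expectation.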
New kafka producer on each checkpoint
Hi everyone, I'm trying to test exactly-once functionality with my job under production load. The job reads from Kafka, uses the Kafka timestamp as event time, aggregates every minute and outputs to another Kafka topic. I use a checkpoint interval of 10 seconds. Everything seems to be working fine, but when I look at the log on INFO level, I see that with each checkpoint a new Kafka producer is created and then closed again. 1. Is this how it is supposed to work? 2. Is a checkpoint interval of 10 seconds too often? Thanks, Maxim.
Re: [Flink 1.10] Classpath doesn't include custom files in lib/
Hi Yang, I've just tried your suggestions, but unfortunately, in YARN per-job mode it doesn't work; both calls return null. I double-checked that the file is shipped to the YARN container, but I suspect that happens later in the process. For the moment I'm reading the file through the File interface instead of getting it as a resource, which is what I do in local mode. Regards, Maxim. On Mon, Feb 17, 2020 at 3:03 PM Yang Wang wrote: > Hi Maxim, > > I have verified that the following two ways could both work. > > getClass().getClassLoader().getResource("lib/job.properties") > getClass().getClassLoader().getResource("job.properties") > > > Best, > Yang > > Maxim Parkachov wrote on Mon, Feb 17, 2020, 6:47 PM: > >> Hi Yang, >> >> thanks, this explains why classpath behavior changed, but now I struggle >> to >> understand how I could overwrite resource, which is already shipped in >> job jar. >> >> Before I had job.properties files in JAR in under >> resources/lib/job.properties >> for local development and deploying on cluster it was overwritten >> with environment specific settings in lib/job.properties of flink >> distribution. >> Now this doesn't seem to work. I'm using: >> >> getClass.getClassLoader.getResource("lib/job.properties") >> >> to get file. Could it be the problem ? >> >> Thanks, >> Maxim. >> >> On Mon, Feb 17, 2020 at 4:12 AM Yang Wang wrote: >> >>> Hi Maxim Parkachov, >>> >>> The users files also have been shipped to JobManager and TaskManager. >>> However, it >>> is not directly added to the classpath. Instead, the parent directory is >>> added to the >>> classpath. This changes are to make resource classloading work. You >>> could check more >>> information here[1]. >>> >>> >>> [1]. https://issues.apache.org/jira/browse/FLINK-13127 >>> >>> >>> Best, >>> Yang >>> >>> Maxim Parkachov wrote on Sat, Feb 15, 2020, 12:58 AM: >>> >>>> Hi everyone, >>>> >>>> I'm trying to run my job with flink 1.10 with YARN cluster per-job >>>> mode. 
In the previous versions all files in lib/ folder were automatically >>>> included in classpath. Now, with 1.10 I see only *.jar files are included >>>> in classpath. but not "other" files. Is this deliberate change or bug ? >>>> >>>> Generally, what is recommended way to include custom files in classpath >>>> and ship it during start to all containers ? >>>> >>>> Thank >>>> >>>
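The fallback Maxim describes — try the classpath-style resource lookup first, then read the shipped file directly — can be sketched like this. A pure-Python illustration of the lookup order only; the directory names are assumptions, not Flink behavior:

```python
import os

def find_config(name="job.properties", search_dirs=("lib", ".")):
    """Return the first on-disk location of `name`, searching the shipped
    lib/ directory before the working directory; None if not found.
    (A stand-in for Java's getResource + File fallback.)"""
    for d in search_dirs:
        path = os.path.join(d, name)
        if os.path.isfile(path):
            return path
    return None
```

The point of the ordering is that an environment-specific file shipped next to the distribution wins over a default bundled in the jar.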
Re: [Flink 1.10] Classpath doesn't include custom files in lib/
Hi Yang, thanks, this explains why the classpath behavior changed, but now I struggle to understand how I can override a resource that is already shipped in the job jar. Before, I had the job.properties file in the JAR under resources/lib/job.properties for local development, and when deploying on the cluster it was overridden by environment-specific settings in lib/job.properties of the Flink distribution. Now this doesn't seem to work. I'm using: getClass.getClassLoader.getResource("lib/job.properties") to get the file. Could that be the problem? Thanks, Maxim. On Mon, Feb 17, 2020 at 4:12 AM Yang Wang wrote: > Hi Maxim Parkachov, > > The users files also have been shipped to JobManager and TaskManager. > However, it > is not directly added to the classpath. Instead, the parent directory is > added to the > classpath. This changes are to make resource classloading work. You could > check more > information here[1]. > > > [1]. https://issues.apache.org/jira/browse/FLINK-13127 > > > Best, > Yang > > Maxim Parkachov wrote on Sat, Feb 15, 2020, 12:58 AM: > >> Hi everyone, >> >> I'm trying to run my job with flink 1.10 with YARN cluster per-job mode. >> In the previous versions all files in lib/ folder were automatically >> included in classpath. Now, with 1.10 I see only *.jar files are included >> in classpath. but not "other" files. Is this deliberate change or bug ? >> >> Generally, what is recommended way to include custom files in classpath >> and ship it during start to all containers ? >> >> Thank >> >
[Flink 1.10] Classpath doesn't include custom files in lib/
Hi everyone, I'm trying to run my job on Flink 1.10 in YARN per-job cluster mode. In previous versions, all files in the lib/ folder were automatically included in the classpath. Now, with 1.10, I see that only *.jar files are included in the classpath, but not "other" files. Is this a deliberate change or a bug? Generally, what is the recommended way to include custom files in the classpath and ship them to all containers during startup? Thanks
Re: Flink 1.10 on MapR secure cluster with high availability
Hi Chesnay, I managed to re-compile with the MapR zookeeper and can confirm that it works with HA as well. Still, I find it strange that HA uses a shaded version of zookeeper instead of the version from the classpath, the way it is done for Hadoop. Thanks, Maxim. On Wed, Feb 5, 2020 at 3:43 PM Chesnay Schepler wrote: > No, since a) HA will never use classes from the user-jar and b) zookeeper > is relocated to a different package (to avoid conflicts) and hence any > replacement has to follow the same relocation convention. > > On 05/02/2020 15:38, Maxim Parkachov wrote: > > Hi Chesnay, > > thanks for advise. Will it work if I include MapR specific zookeeper in > job dependencies and still use out-of-box Flink binary distribution ? > > Regards, > Maxim. > > On Wed, Feb 5, 2020 at 3:25 PM Chesnay Schepler > wrote: > >> You must rebuild Flink while overriding zookeeper.version property to >> match your MapR setup. >> For example: mvn clean package -Dzookeeper.version=3.4.5-mapr-1604 >> Note that you will also have to configure the MapR repository in your >> local setup as described here >> <https://maven.apache.org/guides/mini/guide-multiple-repositories.html>. >> >> On 05/02/2020 15:12, Maxim Parkachov wrote: >> >> Hi everyone, >> >> I have already written about issue with Flink 1.9 on secure MapR cluster >> and high availability. The issue was resolved with custom compiled Flink >> with vendor mapr repositories enabled. The history could be found >> https://www.mail-archive.com/user@flink.apache.org/msg28235.html >> >> Unfortunately, in current 1.10 RC vendor repositories were removed and >> I'm failing to get working configuration. Current situation with 1.10 RC >> and secure MapR cluster: >> >> 1. Without HA, Flink uses class path provided zookeeper jar (mapr >> specific) and everything works fine. >> >> 2. 
With HA enabled, Flink uses shaded zookeeper >> (org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn) >> which doesn't have MapR specific changes and fails to authenticate. >> >> I would really appreciate any help in resolving this issue.I'm ready to >> provide any required details. >> >> Regards, >> Maxim. >> >> >> >
Re: Flink 1.10 on MapR secure cluster with high availability
Hi Chesnay, thanks for the advice. Will it work if I include the MapR-specific zookeeper in the job dependencies and still use the out-of-the-box Flink binary distribution? Regards, Maxim. On Wed, Feb 5, 2020 at 3:25 PM Chesnay Schepler wrote: > You must rebuild Flink while overriding zookeeper.version property to > match your MapR setup. > For example: mvn clean package -Dzookeeper.version=3.4.5-mapr-1604 > Note that you will also have to configure the MapR repository in your > local setup as described here > <https://maven.apache.org/guides/mini/guide-multiple-repositories.html>. > > On 05/02/2020 15:12, Maxim Parkachov wrote: > > Hi everyone, > > I have already written about issue with Flink 1.9 on secure MapR cluster > and high availability. The issue was resolved with custom compiled Flink > with vendor mapr repositories enabled. The history could be found > https://www.mail-archive.com/user@flink.apache.org/msg28235.html > > Unfortunately, in current 1.10 RC vendor repositories were removed and I'm > failing to get working configuration. Current situation with 1.10 RC and > secure MapR cluster: > > 1. Without HA, Flink uses class path provided zookeeper jar (mapr > specific) and everything works fine. > > 2. With HA enabled, Flink uses shaded zookeeper > (org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn) > which doesn't have MapR specific changes and fails to authenticate. > > I would really appreciate any help in resolving this issue.I'm ready to > provide any required details. > > Regards, > Maxim. > > >
Flink 1.10 on MapR secure cluster with high availability
Hi everyone, I have already written about an issue with Flink 1.9 on a secure MapR cluster with high availability. The issue was resolved with a custom-compiled Flink with the vendor MapR repositories enabled; the history can be found at https://www.mail-archive.com/user@flink.apache.org/msg28235.html Unfortunately, in the current 1.10 RC the vendor repositories were removed and I'm failing to get a working configuration. Current situation with the 1.10 RC and a secure MapR cluster: 1. Without HA, Flink uses the classpath-provided zookeeper jar (MapR-specific) and everything works fine. 2. With HA enabled, Flink uses the shaded zookeeper (org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn), which doesn't have the MapR-specific changes and fails to authenticate. I would really appreciate any help in resolving this issue. I'm ready to provide any required details. Regards, Maxim.
Re: Initialization of broadcast state before processing main stream
Hi Vasily, unfortunately, this is a known issue with Flink; you can read the discussion under https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API . So far I have seen 3 solutions for this issue: 1. Buffer the fact stream in local state until the broadcast is completely read. 2. Create a custom source for the fact stream and, in its open method, wait until the broadcast stream is completely read. 3. With the latest Flink version, you can pre-populate state with the dimension data and start the Flink job from that existing state. You need to take care of setting the correct Kafka offsets for the dimension stream, though, otherwise you will get a gap between the pre-populated state and the moment the job is started. The first 2 solutions need to know when the broadcast stream is "completely read". I created a workaround for this with a custom source for the dimension events: it creates a "stop file" on a shared file system, reads the Kafka end offsets for the dimension topic via the admin interface, starts processing all messages from the beginning, and clears the "stop file" once the message offsets reach the end offsets for all partitions. Instead of a "stop file" you could use a shared lock in ZooKeeper. Hope this helps, Maxim. On Thu, Nov 14, 2019 at 7:42 AM vino yang wrote: > Hi Vasily, > > Currently, Flink did not do the coordination between a general stream and > broadcast stream, they are both streams. Your scene of using the broadcast > state is a special one. In a more general scene, the states need to be > broadcasted is an unbounded stream, the state events may be broadcasted to > the downstream at any time. So it can not be wait to be done before playing > the usual stream events. > > For your scene: > > >- you can change your storage about dimension table, e.g. Redis or >MySQL and so on to do the stream and dimension table join; >- you can inject some control event in your broadcast stream to mark >the stream is end and let the fact stream wait until receiving the control >event. 
Or you can introduce a third-party coordinator e.g. ZooKeeper to >coordinate them, however, it would make your solution more complex. > > Best, > Vino > > > Vasily Melnik wrote on Thu, Nov 14, 2019, 1:28 PM: > >> Hi all. >> >> In our task we have two Kafka topics: >> - one with fact stream (web traffic) >> - one with dimension >> >> We would like to put dimension data into broadcast state and lookup on >> it with facts. But we see that not all dimension records are put into >> state before first fact record is processed, so lookup gives no data. >> >> The question is: how could we read fact topic with some "delay" to give >> dimension enough time to initialize state? >> >> >> Best regards, >> Vasily Melnik >> >
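The "read the dimension topic until the end offsets captured at startup" workaround from Maxim's reply can be sketched as follows. Pure Python, no Kafka client; `poll` and the offset bookkeeping are simplified stand-ins for the consumer/admin API:

```python
def read_until_caught_up(end_offsets, poll):
    """Consume from the beginning until the position in every partition
    reaches the end offset captured at startup, then return the records.
    `end_offsets`: partition -> next offset to be written at startup.
    `poll()`: yields one (partition, offset, record) tuple per call."""
    positions = {p: -1 for p in end_offsets}
    records = []
    # end_offsets[p] - 1 is the last offset that existed at startup
    while any(positions[p] < end_offsets[p] - 1 for p in end_offsets):
        part, off, rec = poll()
        positions[part] = off
        records.append(rec)
    return records  # broadcast state can now be considered initialized
```

Only once this returns would the source clear its "stop file" (or release the shared lock) and let the fact stream proceed.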
Re: Flink 1.9, MapR secure cluster, high availability
Hi Stephan, sorry for the late answer, I didn't have access to the cluster. Here are the log and stacktrace. Hope this helps, Maxim.
-
2019-09-16 18:00:31,804 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
2019-09-16 18:00:31,806 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Starting YarnSessionClusterEntrypoint (Version: 1.9.0, Rev:9c32ed9, Date:19.08.2019 @ 16:16:55 UTC)
2019-09-16 18:00:31,806 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - OS current user: run
2019-09-16 18:00:32,285 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Current Hadoop/Kerberos user: run
2019-09-16 18:00:32,285 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - JVM: Java HotSpot(TM) 64-Bit Server VM - Oracle Corporation - 1.8/25.152-b16
2019-09-16 18:00:32,285 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Maximum heap size: 161 MiBytes
2019-09-16 18:00:32,285 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - JAVA_HOME: /pb/apps/java/jdk1.8.0_152/
2019-09-16 18:00:32,286 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Hadoop version: 2.7.0-mapr-1808
2019-09-16 18:00:32,286 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - JVM Options:
2019-09-16 18:00:32,286 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - -Xms168m
2019-09-16 18:00:32,286 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - -Xmx168m
2019-09-16 18:00:32,286 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - -Dlog.file=/opt/mapr/hadoop/hadoop-2.7.0/logs/userlogs/application_1568125115996_13508/container_e64_1568125115996_13508_01_01/jobmanager.log
2019-09-16 18:00:32,286 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - -Dlog4j.configuration=file:log4j.properties
2019-09-16 18:00:32,286 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Program Arguments: (none)
2019-09-16 18:00:32,287 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Classpath: 
log4j.properties:flink.jar:flink-conf.yaml::/opt/mapr/hadoop/hadoop-2.7.0/etc/hadoop:/opt/mapr/hadoop/hadoop-2.7.0/share/hadoop/common/hadoop-common-2.7.0-mapr-1808.jar: ... [classpath listing truncated in the archive: dozens of further MapR Hadoop 2.7.0 jars under /opt/mapr/hadoop/hadoop-2.7.0/share/hadoop/]
Re: Flink 1.9, MapR secure cluster, high availability
Hi Stephan, With previous versions (I tried around 1.7) I always had to compile MapR Hadoop to get it working. With 1.9 I took the Hadoop-less Flink, which worked with MapR FS until I switched on HA, so it is hard to say whether this is a regression or not. The error happens when Flink tries to initialize BLOB storage on MapR FS. Without HA it takes ZooKeeper from the classpath (MapR org.apache.zookeeper) and with HA it takes the shaded one. After fixing a couple of issues with the pom, I was able to compile Flink with MapR ZooKeeper, and now when I start in HA mode it uses the shaded ZooKeeper (which is now MapR) to initialize the BLOB storage and org.apache.zookeeper (which is MapR as well) for HA recovery. It works, but I was expecting it to work without compiling MapR dependencies. Hope this helps, Maxim. On Thu, Aug 29, 2019 at 7:00 PM Stephan Ewen wrote: > Hi Maxim! > > The change of the MapR dependency should not have an impact on that. > Do you know if the same thing worked in prior Flink versions? Is that a > regression in 1.9? > > The exception that you report, is that from Flink's HA services trying to > connect to ZK, or from the MapR FS client trying to connect to ZK? > > Best, > Stephan > > > On Tue, Aug 27, 2019 at 11:03 AM Maxim Parkachov > wrote: > >> Hi everyone, >> >> I'm testing release 1.9 on MapR secure cluster. I took flink binaries >> from download page and trying to start Yarn session cluster. All MapR >> specific libraries and configs are added according to documentation. >> >> When I start yarn-session without high availability, it uses zookeeper >> from MapR distribution (org.apache.zookeeper) and correctly connects to >> cluster and access to maprfs works as expected. 
>> >> But if I add zookeeper as high-avalability option, instead of MapR >> zookeeper it tries to use shaded zookeeper and this one could not connect >> with mapr credentials: >> >> 2019-08-27 10:42:45,240 ERROR >> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.client.ZooKeeperSaslClient >> - An error: (java.security.PrivilegedActionException: >> javax.security.sasl.SaslException: GSS initiate failed [Caused by >> GSSException: No valid credentials provided (Mechanism level: Failed to find >> any Kerberos tgt)]) occurred when evaluating Zookeeper Quorum Member's >> received SASL token. Zookeeper Client will go to AUTH_FAILED state. >> 2019-08-27 10:42:45,240 ERROR >> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - SASL >> authentication with Zookeeper Quorum member failed: >> javax.security.sasl.SaslException: An error: >> (java.security.PrivilegedActionException: javax.security.sasl.SaslException: >> GSS initiate failed [Caused by GSSException: No valid credentials provided >> (Mechanism level: Failed to find any Kerberos tgt)]) occurred when >> evaluating Zookeeper Quorum Member's received SASL token. Zookeeper Client >> will go to AUTH_FAILED state. >> I tried to use separate zookeeper cluster for HA, but maprfs still doesn't >> work. >> >> Is this related to removal of MapR specific settings in Release 1.9 ? >> Should I still compile custom version of Flink with MapR dependencies ? >> (trying to do now, but getting some errors during compilation). >> >> Can I somehow force flink to use MapR zookeeper even with HA mode ? >> >> Thanks in advance, >> Maxim. >> >>
Flink 1.9, MapR secure cluster, high availability
Hi everyone, I'm testing release 1.9 on a MapR secure cluster. I took the Flink binaries from the download page and am trying to start a YARN session cluster. All MapR-specific libraries and configs are added according to the documentation. When I start the yarn-session without high availability, it uses ZooKeeper from the MapR distribution (org.apache.zookeeper), correctly connects to the cluster, and access to maprfs works as expected. But if I add ZooKeeper as the high-availability option, instead of the MapR ZooKeeper it tries to use the shaded ZooKeeper, and this one cannot connect with MapR credentials: 2019-08-27 10:42:45,240 ERROR org.apache.flink.shaded.zookeeper.org.apache.zookeeper.client.ZooKeeperSaslClient - An error: (java.security.PrivilegedActionException: javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]) occurred when evaluating Zookeeper Quorum Member's received SASL token. Zookeeper Client will go to AUTH_FAILED state. 2019-08-27 10:42:45,240 ERROR org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - SASL authentication with Zookeeper Quorum member failed: javax.security.sasl.SaslException: An error: (java.security.PrivilegedActionException: javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]) occurred when evaluating Zookeeper Quorum Member's received SASL token. Zookeeper Client will go to AUTH_FAILED state. I tried to use a separate ZooKeeper cluster for HA, but maprfs still doesn't work. Is this related to the removal of MapR-specific settings in release 1.9? Should I still compile a custom version of Flink with MapR dependencies? (Trying to do that now, but getting some errors during compilation.) Can I somehow force Flink to use the MapR ZooKeeper even in HA mode? Thanks in advance, Maxim.
Re: Exactly-once semantics in Flink Kafka Producer
Hi Vasily, as far as I know, by default the console consumer reads uncommitted messages. Try setting isolation.level to read_committed in the console-consumer properties. Hope this helps, Maxim. On Fri, Aug 2, 2019 at 12:42 PM Vasily Melnik < vasily.mel...@glowbyteconsulting.com> wrote: > Hi, Eduardo. > Maybe I should describe the experiment design precisely: > 1) I run Flink on YARN (YARN Session method). > 2) I do not stop/cancel the application, I just kill the TaskManager process. > 3) After that YARN creates another TaskManager process and an automatic checkpoint > restore from HDFS happens. > > That's why I expect to see a correct restore. > > Best regards, > Vasily Melnik > > *Glow**Byte Consulting* <http://www.gbconsulting.ru/> > > === > > Mobile: +7 (903) 101-43-71 > vasily.mel...@glowbyteconsulting.com > > > On Fri, 2 Aug 2019 at 13:04, Eduardo Winpenny Tejedor < > eduardo.winpe...@gmail.com> wrote: > >> Hi Vasily, >> >> You're probably executing this from your IDE or from a local Flink >> cluster without starting your job from a checkpoint. >> >> When you start your Flink job for the second time you need to specify the >> path to the latest checkpoint as an argument, otherwise Flink will start >> from scratch. >> >> You're probably thinking that's not great, ideally Flink should be able >> to automatically continue from the last produced checkpoint, and actually >> that's what the docs say! Well, that's only when you're running in a proper >> cluster environment. Flink is able to recover using checkpoints when only >> part of the cluster fails, not when the whole job is stopped. For full >> stops you need to specify the checkpoint manually. >> >> Hope that helps! >> >> >> On Fri, 2 Aug 2019, 10:05 Vasily Melnik, < >> vasily.mel...@glowbyteconsulting.com> wrote: >> >>> I'm trying to test Flink 1.8.0 exactly-once semantics with Kafka Source >>> and Sink: >>> >>>1. 
Run a Flink app, simply transferring messages from one topic to >>>another with parallelism=1, checkpoint interval 20 seconds >>>2. Generate messages with incrementing integer numbers using a Python >>>script every 2 seconds. >>>3. Read the output topic with the console consumer in read_committed >>>isolation level. >>>4. Manually kill the TaskManager. >>> >>> I expect to see monotonically increasing integers in the output topic >>> regardless of TaskManager killing and recovery. >>> >>> But actually I see something unexpected in the console-consumer output: >>> >>> 32 >>> 33 >>> 34 >>> 35 >>> 36 >>> 37 >>> 38 >>> 39 >>> 40 >>> -- TaskManager Killed >>> 32 >>> 34 >>> 35 >>> 36 >>> 40 >>> 41 >>> 46 >>> 31 >>> 33 >>> 37 >>> 38 >>> 39 >>> 42 >>> 43 >>> 44 >>> 45 >>> >>> Looks like all messages between checkpoints were replayed to the output >>> topic. Also I expected to see results in the output topic only after >>> checkpointing, i.e. every 20 seconds, but messages appeared in the output >>> immediately as they were sent to the input. >>> Is this supposed to be correct behaviour or am I doing something wrong? >>> >>> Kafka version 1.0.1 from Cloudera parcels. I tested the Kafka transactional >>> producer and a read_committed console consumer with custom code and it worked >>> perfectly well, reading messages only after commitTransaction on the producer. 
>>> >>> My Flink code: >>> >>> StreamExecutionEnvironment env = >>> StreamExecutionEnvironment.getExecutionEnvironment(); >>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); >>> env.getConfig().setAutoWatermarkInterval(1000); >>> env.enableCheckpointing(20000, CheckpointingMode.EXACTLY_ONCE); >>> env.setStateBackend(new >>> RocksDBStateBackend("hdfs:///checkpoints-data")); >>> >>> Properties producerProperty = new Properties(); >>> producerProperty.setProperty("bootstrap.servers", ...); >>> producerProperty.setProperty("zookeeper.connect", ...); >>> >>> producerProperty.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,"1"); >>> >>> producerProperty.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"my-transaction"); >>> >>>
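To make Maxim's suggestion concrete, here is a minimal sketch of the consumer settings needed to hide uncommitted records. It uses plain java.util.Properties with the standard Kafka config key names (string literals so the snippet has no kafka-clients dependency); with the real client you would pass these Properties to a KafkaConsumer, and the assumed broker address is a placeholder:

```java
import java.util.Properties;

// Sketch: consumer configuration for verifying exactly-once output.
// The keys are Kafka's standard consumer config names; "localhost:9092"
// is a placeholder for your broker list.
public class ReadCommittedConfig {
    public static Properties consumerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Only read records whose transactions were committed; without this
        // (the default is read_uncommitted) replayed, uncommitted records
        // between checkpoints are visible, exactly as observed above.
        props.setProperty("isolation.level", "read_committed");
        props.setProperty("auto.offset.reset", "earliest");
        return props;
    }

    public static void main(String[] args) {
        Properties p = consumerProps("localhost:9092");
        System.out.println(p.getProperty("isolation.level"));
    }
}
```

For a quick check from the shell, the same setting can be passed to the console consumer via `--consumer-property isolation.level=read_committed`.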
Re: Providing external files to flink classpath
Hi Vishwas, it took me some time to find this out as well. If you have your properties file under lib, the following will work: val kafkaPropertiesInputStream = getClass.getClassLoader.getResourceAsStream("lib/config/kafka.properties") Hope this helps, Maxim. On Wed, Jul 17, 2019 at 7:23 PM Vishwas Siravara wrote: > Does the -yt option work for a standalone cluster without a dedicated resource > manager? This property file is read by one of the dependencies inside > my application as a file, so I can't really use ParameterTool to parse the > config file. > > Thanks, > Vishwas > > On Fri, Jun 28, 2019 at 11:08 PM Yun Tang wrote: > >> Hi Vishwas >> >> >>1. You could use '-yt' to ship specified files to the class path, >>please refer to [1] for more details. >>2. If the properties are only loaded on the client side before executing >>the application, you could let your application just read the local >>property data. Flink supports loading properties within the >>ParameterTool [2]. >> >> [1] >> https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/cli.html#usage >> [2] >> https://github.com/apache/flink/blob/f1721293b0701d584d42bd68671181e332d2ad04/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java#L120 >> >> Best >> Yun Tang >> >> -- >> *From:* Vishwas Siravara >> *Sent:* Saturday, June 29, 2019 0:43 >> *To:* user >> *Subject:* Providing external files to flink classpath >> >> Hi , >> I am trying to add external property files to the flink classpath for >> my application. These files are not a part of the fat jar. I put them >> under the lib folder but Flink can't find them. How can I manage >> external property files that need to be read by Flink? >> >> Thanks, >> Vishwas >> >
Re: yarn-session vs cluster per job for streaming jobs
Hi Haibo, thanks for the tip, I almost forgot about max-attempts. I understood the implication of running with one AM. Maybe my question was incorrect, but what would be faster (with regard to the downtime of each job): 1. In case of yarn-session: cancel all jobs with savepoints in parallel, restart the yarn-session, start all jobs from savepoints in parallel. 2. In case of per-job mode: cancel all jobs with savepoints in parallel, start all jobs from savepoints in parallel. I want to optimise the standard situation where I deploy a new version of all my jobs. My current impression is that jobs start faster in yarn-session mode. Thanks, Maxim. On Thu, Jul 18, 2019 at 4:57 AM Haibo Sun wrote: > Hi, Maxim > > Regarding the concern in the first point: > If HA and checkpointing are enabled, the AM (application master, that is > the job manager you mentioned) will be restarted by YARN after it dies, and then > the dispatcher will try to restore all the previously running jobs > correctly. Note that the number of attempts is decided by the > configurations "yarn.resourcemanager.am.max-attempts" and > "yarn.application-attempts". The obvious difference between the session and > per-job modes is that if a fatal error occurs in the AM, it will affect all > jobs running in it, while in per-job mode it will only affect one job. > > You can look at this document to see how to configure HA for a Flink > cluster on YARN: > https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/jobmanager_high_availability.html#yarn-cluster-high-availability > . > > Best, > Haibo > > At 2019-07-17 23:53:15, "Maxim Parkachov" wrote: > > Hi, > > I'm looking for advice on how to run flink streaming jobs on Yarn cluster > in production environment. I tried in testing environment both approaches > with HA mode, namely yarn session + multiple jobs vs cluster per job, both > seems to work for my cases, with slight preference of yarn session mode to > centrally manage credentials. 
I'm looking to run about 10 streaming jobs > mostly reading/writing from Kafka + Cassandra with the following restrictions: > 1. YARN nodes will be hard rebooted quite often, roughly every 2 weeks. I > have a concern about what happens when the Job Manager dies in session mode. > 2. There are often network interruptions/slowdowns. > 3. I'm trying to minimise the time to restart a job to have as much continuous > processing as possible. > > Thanks in advance, > Maxim. > >
yarn-session vs cluster per job for streaming jobs
Hi, I'm looking for advice on how to run Flink streaming jobs on a YARN cluster in a production environment. I tried both approaches with HA mode in a testing environment, namely yarn session + multiple jobs vs cluster per job; both seem to work for my cases, with a slight preference for yarn-session mode to centrally manage credentials. I'm looking to run about 10 streaming jobs, mostly reading/writing from Kafka + Cassandra, with the following restrictions: 1. YARN nodes will be hard rebooted quite often, roughly every 2 weeks. I have a concern about what happens when the Job Manager dies in session mode. 2. There are often network interruptions/slowdowns. 3. I'm trying to minimise the time to restart a job to have as much continuous processing as possible. Thanks in advance, Maxim.
Re: Automatic deployment of new version of streaming stateful job
Hi Marc, thanks a lot for the tool. Unfortunately, I could not directly use it, but I will take a couple of ideas and implement my own script. Nevertheless, I'm really surprised that such functionality doesn't exist out of the box. Regards, Maxim. On Tue, Jul 16, 2019 at 9:22 AM Marc Rooding wrote: > Hi Maxim > > You could write a script yourself which triggers the cancel with savepoint > and then starts a new version using the savepoint that was created during > the cancel. > > However, I’ve built a tool that allows you to perform these steps more > easily: https://github.com/ing-bank/flink-deployer. The deployer will > allow you to deploy or upgrade your jobs. All you need to do is integrate > it into your CI/CD. > > Kind regards > > Marc > On 16 Jul 2019, 02:46 +0200, Maxim Parkachov , > wrote: > > Hi, > > I'm trying to bring my first stateful streaming Flink job to production > and have trouble understanding how to integrate it with CI/CD pipeline. I > can cancel the job with savepoint, but in order to start new version of > application I need to specify savepoint path manually ? > > So, basically my question, what is best practice of automatically > restarting or deploying new version of stateful streaming application ? > Every tip is greatly appreciated. > > Thanks, > Maxim. > >
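Marc's "write a script yourself" suggestion can be sketched roughly as below. This is only a sketch under stated assumptions: the job ID, savepoint directory and jar path are placeholders, and the exact text the Flink CLI prints on cancel varies between versions, so the path-extraction line may need adjusting:

```shell
#!/usr/bin/env bash
# Sketch of a cancel-with-savepoint redeploy, as discussed above.
# job_id, savepoint_dir and jar are placeholders for your own values.
set -euo pipefail

redeploy() {
  local job_id="$1" savepoint_dir="$2" jar="$3"
  # "flink cancel -s <dir> <jobId>" stops the job and prints the
  # savepoint path on success (output format differs across versions).
  local out
  out=$(flink cancel -s "$savepoint_dir" "$job_id")
  # Pull the savepoint path out of the CLI output.
  local sp
  sp=$(echo "$out" | grep -o "${savepoint_dir}[^ ]*" | tail -n 1)
  # Start the new build from that savepoint, detached.
  flink run -d -s "$sp" "$jar"
}
```

Wiring `redeploy <job-id> <savepoint-dir> <new-jar>` into the CI/CD pipeline then gives the automatic upgrade Maxim is after.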
Automatic deployment of new version of streaming stateful job
Hi, I'm trying to bring my first stateful streaming Flink job to production and have trouble understanding how to integrate it with a CI/CD pipeline. I can cancel the job with a savepoint, but in order to start the new version of the application I need to specify the savepoint path manually? So, basically, my question is: what is the best practice for automatically restarting or deploying a new version of a stateful streaming application? Every tip is greatly appreciated. Thanks, Maxim.
Re: How autoscaling works on Kinesis Data Analytics for Java ?
Hi, Answering myself in case someone else is interested as well. As per https://aws.amazon.com/blogs/big-data/build-and-run-streaming-applications-with-apache-flink-and-amazon-kinesis-data-analytics-for-java-applications/ it does the autoscaling itself, but in order to change parallelism it takes a snapshot and restarts the streaming job, so no magic here, but nicely automated. Regards, Maxim. On Tue, Jan 29, 2019 at 5:23 AM Maxim Parkachov wrote: > Hi, > > I had the impression that in order to change parallelism, one needs to stop a > Flink streaming job and re-start it with new settings. > > According to > https://docs.aws.amazon.com/kinesisanalytics/latest/java/how-scaling.html > auto-scaling works out of the box. Could someone with experience of running > Flink on AWS Kinesis Data Analytics for Java give a short explanation? > > Thanks in advance. > Maxim. >
How autoscaling works on Kinesis Data Analytics for Java ?
Hi, I had the impression that in order to change parallelism, one needs to stop a Flink streaming job and re-start it with new settings. According to https://docs.aws.amazon.com/kinesisanalytics/latest/java/how-scaling.html auto-scaling works out of the box. Could someone with experience of running Flink on AWS Kinesis Data Analytics for Java give a short explanation? Thanks in advance. Maxim.
Re: Ship compiled code with broadcast stream ?
Hi, This is certainly possible. What you can do is use a > BroadcastProcessFunction where you receive the rule code on the broadcast > side. > Yes, this part works, no problem. > You probably cannot send newly compiled objects this way but what you can > do is either send a reference to some compiled jars and load them with the > URLClassloader or send the actual String code and invoke the java compiler > from your function. > Compiling from source code ... actually, I can implement rules in Nashorn JavaScript, but I'm pretty sure I'll get a NotSerializableException; I'll check this. Referencing an existing jar will not solve the problem, as I would need to re-submit the job, which I want to avoid in the first place. I actually wanted exactly the first scenario: sending newly compiled objects. Regards, Maxim. >
Ship compiled code with broadcast stream ?
Hi everyone, I have a job with an event stream and a control stream delivering rules for event transformation. Rules are broadcasted and used in a flatMap-like CoProcessFunction. Rules are defined in a custom JSON format. The number of rules and their complexity rise significantly with every new feature. What I would like is to ship compiled (serialized?) code instead of JSON rules to the control stream and use these compiled classes directly, without additional transformation. This would allow more robust testing and much more complex rules. But I'm struggling to understand how to achieve this. Did someone implement a system like this? Is this possible at all? Any help is greatly appreciated, Maxim.
Multiple Async IO
Hi everyone, I'm writing a streaming job which needs to query Cassandra multiple times for each event, around 20. I would like to use Async I/O for that but am not sure which option to choose: 1. Implement one AsyncFunction with 20 queries inside. 2. Implement 20 AsyncFunctions, each with 1 query inside. Take into account that each event needs all queries; reducing the number of queries per event is not an option. In this case I would like to minimise the processing time of each event, even if throughput suffers. Any advice or consideration is greatly appreciated. Thanks, Maxim.
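Flink's AsyncFunction aside, the latency argument for option 1 can be sketched with plain CompletableFutures: issuing all 20 lookups concurrently from one function bounds the per-event latency by the slowest single query, whereas 20 chained async operators pay a hop between each stage. This is a sketch with a made-up query() standing in for a real asynchronous Cassandra call:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Sketch: fan out all lookups for one event concurrently and join the
// results, as a single AsyncFunction could do internally. query() is a
// stand-in for an async Cassandra call.
public class FanOutLookup {
    static CompletableFuture<String> query(int i, String eventKey) {
        return CompletableFuture.supplyAsync(() -> "result-" + i + "-" + eventKey);
    }

    static CompletableFuture<List<String>> enrich(String eventKey, int numQueries) {
        List<CompletableFuture<String>> futures = IntStream.range(0, numQueries)
                .mapToObj(i -> query(i, eventKey))
                .collect(Collectors.toList());
        // Completes when the slowest of the N concurrent queries completes.
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                .thenApply(v -> futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        List<String> results = enrich("event-42", 20).join();
        System.out.println(results.size()); // 20 results for one event
    }
}
```

With 20 separate AsyncFunctions the queries for one event still run one operator after another, so option 1 with an internal fan-out like this is the one that minimises per-event processing time.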
Re: Forcing consuming one stream completely prior to another starting
Hi Ron, I’m joining two streams - one is a “decoration” stream that we have in a > compacted Kafka topic, produced using a view on a MySQL table AND using > Kafka Connect; the other is the “event data” we want to decorate, coming in > over time via Kafka. These streams are keyed the same way - via an “id” > field, and we join them using CoFlatMap that attaches the data from the > decoration stream to the event data and publishes downstream. > > What I’d like to do in my CoFlatMap is wait to process the event data > until we’ve received the corresponding decoration data. How do I make that > happen? > > I have exactly the same scenario and researched this. Basically, what we need is https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API, but unfortunately it seems nobody is actively working on it. You could check the ideas there, though. Side inputs seem to be implemented in the Beam API and should work on the Flink runner, but I didn’t try it, for many reasons. I ended up with the following solution: - in the script starting the job, I create a stop file on a shared filesystem (HDFS) - I created two SourceFunctions extending the Kafka source - in the source function for the “decoration” stream, I consume all records from the compacted topic in the run method. The tricky part here is how to identify whether everything has already been consumed. I resolved it by reading the Kafka end offset directly with the Kafka admin API and checking whether I have arrived at that offset. After waiting a bit to make sure the events have propagated to the next operator, I delete the stop file on the shared file system - in the source function for the event data, I implemented the “open” method to wait until the stop file is deleted. This holds back consumption of the event data - in the pipeline I broadcast the “decoration” events and use a CoProcessFunction to store them in state and enrich the main event stream. The application is not in production yet as I need to do more testing, but it seems to work. 
Additionally, I tried to cache the decoration data in the state of the source function to recover from checkpoints more easily, but I'm still not sure whether it's better to read it from the compacted topic every time, to keep an additional cache in the source function, or whether the state in the CoProcessFunction is enough. Hope this helps, and I would be interested to hear about your experience. Regards, Maxim.
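The stop-file gate inside the event-data source's open() can be sketched like this. It is only a sketch under stated assumptions: java.nio.file stands in for the HDFS client the real job would use, and the class and method names are made up for illustration:

```java
import java.nio.file.Files;
import java.nio.file.Path;

// Sketch: block in the event source's open() until the stop file
// disappears, i.e. until the decoration stream has been fully consumed
// and broadcast. In the real job the existence check would go against
// the shared filesystem (HDFS) instead of the local one.
public class StopFileGate {
    private final Path stopFile;
    private final long pollMillis;

    public StopFileGate(Path stopFile, long pollMillis) {
        this.stopFile = stopFile;
        this.pollMillis = pollMillis;
    }

    /** Called from the event source's open(): returns only once the
     *  deploy script (or the decoration source) has deleted the file. */
    public void awaitRemoval() throws InterruptedException {
        while (Files.exists(stopFile)) {
            Thread.sleep(pollMillis);
        }
    }

    public static void main(String[] args) throws Exception {
        // With no stop file present, the gate opens immediately.
        new StopFileGate(Path.of("/tmp/does-not-exist.flag"), 10).awaitRemoval();
        System.out.println("gate open");
    }
}
```

A polling loop is the simplest choice here; HDFS offers no inotify-style wait for deletion from client code, so the decoration source deleting the file is the only signal the event source needs.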
Fwd: Initialise side input state
Hi Xingcan, On Fri, Nov 3, 2017 at 3:38 AM, Xingcan Cui <xingc...@gmail.com> wrote: > Hi Maxim, > > if I understand correctly, you actually need to JOIN the fast stream with > the slow stream. Could you please share more details about your problem? > Sure, I can explain more, with some example pseudo-code. I have an external DB with a price list with the following structure: case class PriceList(productId, price) My events are purchase events with the following structure: case class Purchase(productId, amount) I would like to get a final stream with TotalAmount = Amount * Price in a structure like this: case class PurchaseTotal(productId, totalAmount) I have 2 corresponding input streams: val prices = env.addSource(new PriceListSource).keyBy(_.productId) val purchases = env.addSource(new PurchaseSource).keyBy(_.productId) PriceListSource delivers all CHANGES to the external DB table. The calculate function looks similar to: class CalculateFunction extends CoProcessFunction[Purchase, PriceList, PurchaseTotal] { private var price: ValueState[Int] = _ override def processElement1... { out.collect(PurchaseTotal(purchase.productId, purchase.amount * price.value)) } override def processElement2... { price.update(priceList.value) } } And finally the pipeline: purchases.connect(prices).process(new CalculateFunction).print The issue is that when I start the program the price ValueState is empty, and it will not be populated for rows that are not updated in the DB. BTW, I cannot use AsyncIO to query the DB, because of several technical restrictions. 1. When you mentioned "they have the same key", did you mean all the data > get the same key or the logic should be applied with fast.key = slow.key? > I meant that the productId in a purchase event definitely exists in the external price list DB (so it is a kind of inner join). > 2. What should be done to initialize the state? > I need to read the external DB table and populate the price ValueState before processing the first purchase event. 
Hope this minimal example helps to clarify the problem. Maxim. > > Best, > Xingcan > > > On Fri, Nov 3, 2017 at 5:54 AM, Maxim Parkachov <lazy.gop...@gmail.com> > wrote: > >> Hi Flink users, >> >> I'm struggling with some basic concept and would appreciate some help. I >> have 2 Input streams, one is fast event stream and one is slow changing >> dimension. They have the same key and I use CoProcessFunction to store >> slow data in state and enrich fast data from this state. Everything >> works as expected. >> >> Before I start processing fast streams on first run, I would like to >> completely >> initialise state. I thought it could be done in open(), but I don't >> understand how it will be re-distributed across parallel operators. >> >> Another alternative would be to create custom source and push all slow >> dimension >> data downstream, but I could not find how to hold processing fast data >> until state is initialised. >> >> I realise that FLIP-17 (Side Inputs) is what I need, but is there some other >> way to implement it now ? >> >> Thanks, >> Maxim. >> >> >
Initialise side input state
Hi Flink users, I'm struggling with some basic concepts and would appreciate some help. I have 2 input streams: one is a fast event stream and one is a slowly changing dimension. They have the same key, and I use a CoProcessFunction to store the slow data in state and enrich the fast data from this state. Everything works as expected. Before I start processing the fast stream on the first run, I would like to completely initialise the state. I thought it could be done in open(), but I don't understand how it will be re-distributed across parallel operators. Another alternative would be to create a custom source and push all slow dimension data downstream, but I could not find how to hold processing of the fast data until the state is initialised. I realise that FLIP-17 (Side Inputs) is what I need, but is there some other way to implement it now? Thanks, Maxim.
Control triggering on empty window
I have the following use case: an input stream of timestamped "on" and "off" events received out of order. I need to produce an event with the time that the system was "on" every 15 minutes. Events should be produced only for intervals during which the system was "on". When a 15-minute window has at least one record it is triggered and the required aggregate is created, but when no event is received within the 15-minute period the window is not triggered and nothing is produced. I understand that it is not feasible to trigger on empty windows when the set of keys is unbounded. But it would be nice to give control over such triggering to a window function. In my case the window function could enable empty triggering for the current key when the last event in the evaluated window is "on" and disable it if it is "off". A strawman API for such a feature: public void apply(String key, TimeWindow window, Iterable input, Collector out) throws Exception { ... RuntimeContext context = this.getRuntimeContext(); if (lastEvent.isOn()) { context.enableEmptyWindowTriggering(); } else { context.disableEmptyWindowTriggering(); } } I could implement the same logic using a global window with a custom trigger and evictor, but that looks like an ugly workaround to me. Is there any better way to solve this use case? Thanks, Maxim.
Task Slots and Heterogeneous Tasks
I'm trying to understand the behavior of Flink in the case of heterogeneous operations. For example, in our pipelines some operations might accumulate large windows while others perform high-latency calls to external services. Obviously the former needs task slots with a large memory allocation, while the latter needs little memory but a high degree of parallelism. Is there any way to have different slot types and control the allocation of operations to them? Or maybe there is another way to ensure good hardware utilization? Also, from the documentation it is not clear whether the memory of a TaskManager is shared across all tasks running on it or each task gets its own quota. Could you clarify? Thanks, Maxim.
Re: How to perform this join operation?
You could simulate the Samza approach by having a RichFlatMapFunction over cogrouped streams that maintains the sliding window in its ListState. As I understand it, the drawback is that the list state is not maintained in managed memory. I'm interested to hear about the right way to implement this. On Wed, Apr 13, 2016 at 3:53 PM, Elias Levy wrote: > I am wondering how you would implement the following function in Flink. > The function takes as input two streams. One stream can be viewed as a > tuple with two values *(x, y)*, the second stream is a stream of > individual values *z*. The function keeps a time based window on the > first input (say, 24 hours). Whenever it receives an element from the > second stream, it compares the value *z* against the *x* element of each > tuple in the window, and for each match it emits *(x, y)*. You are > basically doing a join on *x=z*. Note that values from the second stream > are not windowed and they are only matched to values from the first stream > with earlier timestamps. > > This was relatively easy to implement in Samza. Consume off two topics, > the first keyed by *x* and the second by *z*. Consume both topics in a > job. Messages with the same key would be consumed by the same task. The > task could maintain a window of messages from the first stream in its local > state. Whenever a message came in via the second stream, it could look up > in the local state for matching messages, and if it found any, send them to > the output stream. Obviously, with Samza you don't have the luxury of the > system handling event time for you, but this works well and it is easy to > implement. > > I am not clear how this would be implemented in Flink. > > It is easy enough to partition by key either stream, and to window the > first stream using a sliding window, but from there out things get > complicated. > > You can join the two streams by key, but you must do so using the same > window for both streams. 
That means events from the first stream may be > matched to older events of the second stream, which is not what we want. I > suppose if both included a timestamp, you could later add a filter to > remove such events from the merged stream. But you would also have to deal > with duplicates, as the window is a sliding window and the same two > elements may match across all window panes that contain the matching > elements. So you need to dedup as well. > > coGroup seems like it would suffer from the same issues. > > Maybe the answer is connected streams, but there is scant documentation on > the semantics of ConnectedStreams. There isn't even an example that I > could find that makes use of them. > > Thoughts? > > > > >
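The Samza-style approach (and the ListState variant suggested in the reply above) boils down to a per-key buffer with time-based eviction, which avoids both the older-event matches and the duplicate emissions that a symmetric sliding-window join produces. A sketch of the per-key logic in plain Java, with simplified types standing in for Flink's keyed state (class and method names are made up for illustration):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Sketch of the per-key logic: both streams are keyed so that x == z,
// so one instance corresponds to one key. The first stream's y values
// are buffered with their timestamps; a probe z from the second stream
// matches only earlier entries that are still inside the window.
// In Flink this buffer would live in keyed ListState.
public class KeyedIntervalJoin {
    static final class Entry {
        final String y;
        final long ts;
        Entry(String y, long ts) { this.y = y; this.ts = ts; }
    }

    private final long windowMillis;
    private final Deque<Entry> buffer = new ArrayDeque<>();

    public KeyedIntervalJoin(long windowMillis) { this.windowMillis = windowMillis; }

    /** First input: buffer (x, y); x is implicit in the keying. */
    public void onFirst(String y, long ts) {
        buffer.addLast(new Entry(y, ts));
    }

    /** Second input: emit y for entries with earlier timestamps. */
    public List<String> onSecond(long ts) {
        // Evict entries that have fallen out of the 24h window.
        while (!buffer.isEmpty() && buffer.peekFirst().ts < ts - windowMillis) {
            buffer.removeFirst();
        }
        List<String> matches = new ArrayList<>();
        for (Entry e : buffer) {
            if (e.ts <= ts) {        // only earlier-or-equal timestamps
                matches.add(e.y);
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        KeyedIntervalJoin join = new KeyedIntervalJoin(24L * 60 * 60 * 1000);
        join.onFirst("y1", 1_000);
        join.onFirst("y2", 5_000);
        System.out.println(join.onSecond(3_000)); // matches only the earlier entry
    }
}
```

Because each probe scans only live entries with earlier timestamps, no timestamp filter or dedup pass is needed afterwards, which is exactly the pain point with the sliding-window join described above.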
Re: RocksDB Statebackend
Is it possible to add an option to store the state in the Java HashMap and write its content to RocksDB when checkpointing? For "hot" keys that are updated very frequently such optimization would help with performance. I know that you are also working on incremental checkpoints which would also be big win for jobs with a large number of keys. Thanks, Maxim. On Tue, Apr 12, 2016 at 10:39 AM, Stephan Ewen <se...@apache.org> wrote: > Concerning the size of RocksDB snapshots - I am wondering if RocksDB > simply does not compact for a long time, thus having a lot of stale data in > the snapshot. > > That would be especially the case, if you have a lot of changing values > for the same set of keys. > > On Tue, Apr 12, 2016 at 6:41 PM, Aljoscha Krettek <aljos...@apache.org> > wrote: > >> Hi, >> I'm going to try and respond to each point: >> >> 1. This seems strange, could you give some background on parallelism, >> number of operators with state and so on? Also, I'm assuming you are using >> the partitioned state abstraction, i.e. getState(), correct? >> >> 2. your observations are pretty much correct. The reason why RocksDB is >> slower is that the FsStateBackend basically stores the state in a Java >> HashMap and writes the contents to HDFS when checkpointing. RocksDB stores >> data in on-disk files and goes to them for every state access (of course >> there are caches, but generally it is like this). I'm actually impressed >> that it is still this fast in comparison. >> >> 3. see 1. (I think for now) >> >> 4. The checkpointing time is the time from the JobManager deciding to >> start a checkpoint until all tasks have confirmed that checkpoint. I have >> seen this before and I think it results from back pressure. The problem is >> that the checkpoint messages that we sent through the topology are sitting >> at the sources because they are also back pressured by the slow processing >> of normal records. 
You should be able to see the actual checkpointing times >> (both synchronous and asynchronous) in the log files of the task managers, >> they should be very much lower. >> >> I can go into details, I'm just writing this quickly before calling it a >> day. :-) >> >> Cheers, >> Aljoscha >> >> On Tue, 12 Apr 2016 at 18:21 Konstantin Knauf < >> konstantin.kn...@tngtech.com> wrote: >> >>> Hi everyone, >>> >>> my experience with RocksDBStatebackend have left me a little bit >>> confused. Maybe you guys can confirm that my epxierence is the expected >>> behaviour ;): >>> >>> I have run a "performancetest" twice, once with FsStateBackend and once >>> RocksDBStatebackend in comparison. In this particular test the state >>> saved is generally not large (in a production scenario it will be >>> larger.) >>> >>> These are my observations: >>> >>> 1. Minimal Checkpoint Size (no records) with RocksDB was 33MB compared >>> to <<1MB with the FSStatebackend. >>> >>> 2. Throughput dropped from 28k/s -> 18k/s on a small cluster. >>> >>> 3. Checkpoint sizes as reported in the Dashboard was ca. 1MB for >>> FsStatebackend but >100MB for RocksDbStatebackend. I hope the difference >>> gets smaller for very large state. Can you confirm? >>> >>> 4. Checkpointing Times as reported in the Dashboard were 26secs for >>> RocksDB during the test and <1 second for FsStatebackend. Does the >>> reported time correspond to the sync. + asynchronous part of the >>> checkpointing in case of RocksDB? Is there any way to tell how long the >>> synchronous part takes? >>> >>> Form these first observations RocksDB does seem to bring a large >>> overhead for state < 1GB, I guess? Is this expected? >>> >>> Cheers, >>> >>> Konstantin >>> >> >