S3 schema for jar location?

2024-08-01 Thread Maxim Senin via user
When will the Flink Operator support schemes other than `local` for application 
deployment jar files? I just tried flink operator 1.9 and it’s still not 
working with `s3` locations. If s3 is good for savepoints and checkpoints, why 
can’t the jar also be on s3?

Thanks,
Maxim





Operator/Autoscaler/Autotuner tuning behavior question

2024-05-08 Thread Maxim Senin via user
Hello.

I have some questions about memory autotuning in the Operator.

1. Does the autotuner try to upgrade the job with more memory allocated if it 
intercepts an OutOfMemoryError? Say I initially provided too little memory for the TM 
`resource` - will the job fail and stop while initializing, or will the operator 
try to increase memory after it catches the OutOfMemoryError?
2. When the autoscaler scales a TM from, say, 1 to 2 instances with higher 
parallelism, does it also double the memory and CPU requested in each iteration of 
upscaling, or would it have already measured the RAM requirements while 
collecting stats and request just what it thinks is needed? I see there’s a 
config option that increases memory when scaling down so that the job 
doesn’t fail because fewer TMs are doing the work that more TMs used to do, 
so I was wondering whether the opposite applies when scaling up.
3. Will the autotuner ever increase the requested memory beyond what was initially 
requested in the TM’s `resource` block in the Deployment CRD? Same for CPU?
4. Does the operator care about available resources in k8s, or does it just make an 
“optimistic” request and hope it will be granted? What happens if it requests 
more than is available? Keep retrying? Stay in a pending state waiting for 
resources? Time out? Exit? Is there a rollback and retry with a smaller amount of 
resources if a request with larger demands fails? I see there’s an 
option that, when enabled, can refresh information about available resources 
periodically, which should prevent or reduce inadvertent greedy requests. But 
what’s the strategy used by the operator if the request is already too large to 
handle?

Thanks a lot!
/Maxim





Re: [External] Regarding java.lang.IllegalStateException

2024-04-26 Thread Maxim Senin via user
My guess is that it’s a major known issue. A workaround is needed.

https://issues.apache.org/jira/browse/FLINK-32212

/Maxim

From: prashant parbhane 
Date: Tuesday, April 23, 2024 at 11:09 PM
To: user@flink.apache.org 
Subject: [External] Regarding java.lang.IllegalStateException
Hello,

We have been facing this weird issue, where we get the exception below and 
the job gets restarted with new task managers. We are using Flink 1.17.
The same job works fine with a lower number of task managers (<10).

java.lang.IllegalStateException: The library registration references a 
different set of library BLOBs than previous registrations for this job:
old:[p-ecf88f3c5c35842d7bca235cfd7592c53f9fdbe0-e06122dd51cccba866932318dc031d68]
new:[p-ecf88f3c5c35842d7bca235cfd7592c53f9fdbe0-7651f69109c915ac830aa42ce2ab67f0]
at 
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$ResolvedClassLoader.verifyClassLoader(BlobLibraryCacheManager.java:419)
at 
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$ResolvedClassLoader.access$500(BlobLibraryCacheManager.java:359)
at 
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$LibraryCacheEntry.getOrResolveClassLoader(BlobLibraryCacheManager.java:235)
at 
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$LibraryCacheEntry.access$1100(BlobLibraryCacheManager.java:202)
at 
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$DefaultClassLoaderLease.getOrResolveClassLoader(BlobLibraryCacheManager.java:336)
at 
org.apache.flink.runtime.taskmanager.Task.createUserCodeClassloader(Task.java:1041)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:624)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
at java.base/java.lang.Thread.run(Unknown Source)

Thanks,
Prashant





Re: [External] Exception during autoscaling operation - Flink 1.18/Operator 1.8.0

2024-04-26 Thread Maxim Senin via user
c093-5d8a-b5f5-2b66b4547bf6] Deleting cluster with 
Foreground propagation
[INFO ][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] Scaling JobManager 
Deployment to zero with 300 seconds timeout...
[INFO ][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] Completed Scaling 
JobManager Deployment to zero
[INFO ][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] Deleting JobManager 
Deployment with 298 seconds timeout...
[INFO ][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] Completed Deleting 
JobManager Deployment
[INFO ][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] Deleting Kubernetes HA 
metadata

Any ideas?

Thanks,
Maxim

From: Gyula Fóra 
Date: Friday, April 26, 2024 at 1:10 AM
To: Maxim Senin 
Cc: Maxim Senin via user 
Subject: Re: [External] Exception during autoscaling operation - Flink 
1.18/Operator 1.8.0
Hi Maxim!

Regarding the status update error, it could be related to a problem that we 
have discovered recently with the Flink Operator HA, where during a namespace 
change both leader and follower instances would start processing.
It has been fixed in the current master by updating the JOSDK version to the 
one containing the fix.

For details you can check:
https://github.com/operator-framework/java-operator-sdk/issues/2341
https://github.com/apache/flink-kubernetes-operator/commit/29a4aa5adf101920cbe1a3a9a178ff16f52c746d

To resolve the issue (if it's caused by this), you could either cherry-pick the 
fix internally to the operator or reduce the replicas to 1 if you are using HA.

Cheers,
Gyula



On Fri, Apr 26, 2024 at 3:48 AM Maxim Senin via user 
mailto:user@flink.apache.org>> wrote:
I have also seen this exception:

o.a.f.k.o.o.JobStatusObserver  
[ERROR][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] Job 
d0ac9da5959d8cc9a82645eeef6751a5 failed with error: 
java.util.concurrent.CompletionException: 
java.util.concurrent.CompletionException: 
java.lang.UnsupportedOperationException: Cannot rescale the given pointwise 
partitioner.
Did you change the partitioner to forward or rescale?
It may also help to add an explicit shuffle().
at 
org.apache.flink.util.concurrent.FutureUtils.lambda$switchExecutor$23(FutureUtils.java:1305)
at 
java.base/java.util.concurrent.CompletableFuture.uniHandle(Unknown Source)
at 
java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(Unknown 
Source)
at 
java.base/java.util.concurrent.CompletableFuture$Completion.run(Unknown Source)
at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRunAsync$4(PekkoRpcActor.java:451)
at 
org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRunAsync(PekkoRpcActor.java:451)
at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:218)
at 
org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85)
at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168)
at 
org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
at 
org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
at 
org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)

I can’t find any information on how to interpret this. Please advise.

Cheers,
Maxim
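
For reference, the exception text above itself points at a fix: replace the 
pointwise (forward/rescale) connection with an explicit redistribution. A minimal, 
self-contained sketch (a hypothetical job, not the one from this thread) of what 
that change looks like:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ExplicitShuffleSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Long> source = env.fromSequence(0, 1_000_000);

        // By default this map is connected to the source with a pointwise
        // (forward/rescale) partitioner, which is exactly what the exception above
        // reports it cannot rescale. An explicit shuffle() (rebalance() or keyBy()
        // work as well) turns the edge into an all-to-all distribution instead.
        DataStream<String> mapped = source
                .shuffle()
                .map((MapFunction<Long, String>) value -> "event-" + value);

        mapped.print();
        env.execute("explicit-shuffle-sketch");
    }
}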
From: Maxim Senin via user mailto:user@flink.apache.org>>
Date: Thursday, April 25, 2024 at 12:01 PM
To: Maxim Senin via user mailto:user@flink.apache.org>>
Subject: [External] Exception during autoscaling operation - Flink 
1.18/Operator 1.8.0
Hi.

I already asked before but never got an answer. My observation is that the 
operator, after collecting some stats, is trying to restart one of the 
deployments. This includes taking a savepoint (`takeSavepointOnUpgrade: true`, 
`upgradeMode: savepoint`) and “gracefully” shutting down the JobManager by 
“scaling it to zero” (by setting replicas = 0 in the new generated config).

However, the deployment never comes back up, apparently, due to exception:

2024-04-25 17:20:52,920 mi.j.o.p.e.ReconciliationDispatcher 
[ERROR][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] Error during error status 
handling.
org.apache.flink.kubernetes.operator.exception.StatusConflictException: Status 
have been modified externally in version 50607043 Previous: 
{"jobStatus":{"jobName":"autoscaling 
test:attack-surface","jobId":"be93ad9b152c1f11696e971e6a638b63","state":"FINISHEDINFO\\n\"},\"mode\":\"native\"},\"resource_metadata\&

Re: Regarding java.lang.IllegalStateException

2024-04-26 Thread Maxim Senin via user
We are also seeing something similar:

2024-04-26 16:30:44,401 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Source: Power 
Consumption:power_consumption -> Ingest Power Consumption -> PopSysFields -> 
WindowingWatermarkPreCheck (1/1) 
(cb8c425b6463b1ade9b8359c0514386b_28bc590bb7896e1df191c306d7cb6d23_0_11) 
switched from DEPLOYING to FAILED on 
f-a9ad4438-cddf-512f-94c6-c5f921f66078-taskmanager-1-1 @ 10.111.164.30 
(dataPort=42621).
java.lang.IllegalStateException: The library registration references a 
different set of library BLOBs than previous registrations for this job:
old:[p-1b01ac6374a137939ffa18432714b7c9af30dc3f-522999dc4412a76d33728a97225de573]
new:[p-1b01ac6374a137939ffa18432714b7c9af30dc3f-819e2ecffa2f22c5e0f4d88ef5789421]
  at 
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$ResolvedClassLoader.verifyClassLoader(BlobLibraryCacheManager.java:437)
 ~[flink-dist-1.19.0.jar:1.19.0]
  at 
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$ResolvedClassLoader.access$600(BlobLibraryCacheManager.java:373)
 ~[flink-dist-1.19.0.jar:1.19.0]
  at 
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$LibraryCacheEntry.getOrResolveClassLoader(BlobLibraryCacheManager.java:249)
 ~[flink-dist-1.19.0.jar:1.19.0]
  at 
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$LibraryCacheEntry.access$1200(BlobLibraryCacheManager.java:210)
 ~[flink-dist-1.19.0.jar:1.19.0]
  at 
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$DefaultClassLoaderLease.getOrResolveClassLoader(BlobLibraryCacheManager.java:350)
 ~[flink-dist-1.19.0.jar:1.19.0]
  at 
org.apache.flink.runtime.taskmanager.Task.createUserCodeClassloader(Task.java:1047)
 ~[flink-dist-1.19.0.jar:1.19.0]
  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:628) 
~[flink-dist-1.19.0.jar:1.19.0]
  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) 
~[flink-dist-1.19.0.jar:1.19.0]
  at java.lang.Thread.run(Unknown Source) ~[?:?]
2024-04-26 16:30:44,402 INFO  
org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler [] - Restarting 
job.
java.lang.IllegalStateException: The library registration references a 
different set of library BLOBs than previous registrations for this job:
old:[p-1b01ac6374a137939ffa18432714b7c9af30dc3f-522999dc4412a76d33728a97225de573]
new:[p-1b01ac6374a137939ffa18432714b7c9af30dc3f-819e2ecffa2f22c5e0f4d88ef5789421]
  at 
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$ResolvedClassLoader.verifyClassLoader(BlobLibraryCacheManager.java:437)
 ~[flink-dist-1.19.0.jar:1.19.0]
  at 
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$ResolvedClassLoader.access$600(BlobLibraryCacheManager.java:373)
 ~[flink-dist-1.19.0.jar:1.19.0]
  at 
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$LibraryCacheEntry.getOrResolveClassLoader(BlobLibraryCacheManager.java:249)
 ~[flink-dist-1.19.0.jar:1.19.0]
  at 
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$LibraryCacheEntry.access$1200(BlobLibraryCacheManager.java:210)
 ~[flink-dist-1.19.0.jar:1.19.0]
  at 
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$DefaultClassLoaderLease.getOrResolveClassLoader(BlobLibraryCacheManager.java:350)
 ~[flink-dist-1.19.0.jar:1.19.0]
  at 
org.apache.flink.runtime.taskmanager.Task.createUserCodeClassloader(Task.java:1047)
 ~[flink-dist-1.19.0.jar:1.19.0]
  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:628) 
~[flink-dist-1.19.0.jar:1.19.0]
  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) 
~[flink-dist-1.19.0.jar:1.19.0]
  at java.lang.Thread.run(Unknown Source) ~[?:?]

Flink 1.19, operator 1.9.0


. . . . . . . . . . . . . . . . . . . . .
  Maxim Senin
   Senior Backend Engineer
 COGILITY<http://cogility.com/>

From: prashant parbhane 
Date: Tuesday, April 23, 2024 at 11:09 PM
To: user@flink.apache.org 
Subject: Regarding java.lang.IllegalStateException
Hello,

We have been facing this weird issue, where we get the exception below and 
the job gets restarted with new task managers. We are using Flink 1.17.
The same job works fine with a lower number of task managers (<10).

java.lang.IllegalStateException: The library registration references a 
different set of library BLOBs than previous registrations for this job:
old:[p-ecf88f3c5c35842d7bca235cfd7592c53f9fdbe0-e06122dd51cccba866932318dc031d68]
new:[p-ecf88f3c5c35842d7bca235cfd7592c53f9fdbe0-7651f69109c915ac830aa42ce2ab67f0]
at 
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$ResolvedClassLoader.verifyClassLoader(BlobLibraryCacheManager.java:419)
at 
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$ResolvedClassLoader.access$500(BlobLibraryCac

Re: [External] Exception during autoscaling operation - Flink 1.18/Operator 1.8.0

2024-04-26 Thread Maxim Senin via user
Hi, Gyula. Thanks for the tips.

All jobs are deployed in a single namespace, “flink”.

Which replicas? The JM replicas are already 1, and I tried with TM replicas set to 
1, but the same exception happens. We have only 1 instance of the operator 
(replicas=1) in this environment.

The only workarounds I discovered are to either
a) disable autoscaling for the failing job (the autoscaler scales the job to zero 
to “gracefully” stop it and then never starts it again), or
b) disable HA for the job; some jobs that keep restarting can be fixed that way.

And `Cannot rescale the given pointwise partitioner.` is also still a mystery.
Thanks,
Maxim

From: Gyula Fóra 
Date: Friday, April 26, 2024 at 1:10 AM
To: Maxim Senin 
Cc: Maxim Senin via user 
Subject: Re: [External] Exception during autoscaling operation - Flink 
1.18/Operator 1.8.0
Hi Maxim!

Regarding the status update error, it could be related to a problem that we 
have discovered recently with the Flink Operator HA, where during a namespace 
change both leader and follower instances would start processing.
It has been fixed in the current master by updating the JOSDK version to the 
one containing the fix.

For details you can check:
https://github.com/operator-framework/java-operator-sdk/issues/2341
https://github.com/apache/flink-kubernetes-operator/commit/29a4aa5adf101920cbe1a3a9a178ff16f52c746d

To resolve the issue (if it's caused by this), you could either cherry-pick the 
fix internally to the operator or reduce the replicas to 1 if you are using HA.

Cheers,
Gyula



On Fri, Apr 26, 2024 at 3:48 AM Maxim Senin via user 
mailto:user@flink.apache.org>> wrote:
I have also seen this exception:

o.a.f.k.o.o.JobStatusObserver  
[ERROR][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] Job 
d0ac9da5959d8cc9a82645eeef6751a5 failed with error: 
java.util.concurrent.CompletionException: 
java.util.concurrent.CompletionException: 
java.lang.UnsupportedOperationException: Cannot rescale the given pointwise 
partitioner.
Did you change the partitioner to forward or rescale?
It may also help to add an explicit shuffle().
at 
org.apache.flink.util.concurrent.FutureUtils.lambda$switchExecutor$23(FutureUtils.java:1305)
at 
java.base/java.util.concurrent.CompletableFuture.uniHandle(Unknown Source)
at 
java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(Unknown 
Source)
at 
java.base/java.util.concurrent.CompletableFuture$Completion.run(Unknown Source)
at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRunAsync$4(PekkoRpcActor.java:451)
at 
org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRunAsync(PekkoRpcActor.java:451)
at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:218)
at 
org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85)
at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168)
at 
org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
at 
org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
at 
org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)

I can’t find any information on how to interpret this. Please advise..

Cheers,
Maxim
From: Maxim Senin via user mailto:user@flink.apache.org>>
Date: Thursday, April 25, 2024 at 12:01 PM
To: Maxim Senin via user mailto:user@flink.apache.org>>
Subject: [External] Exception during autoscaling operation - Flink 
1.18/Operator 1.8.0
Hi.

I already asked before but never got an answer. My observation is that the 
operator, after collecting some stats, is trying to restart one of the 
deployments. This includes taking a savepoint (`takeSavepointOnUpgrade: true`, 
`upgradeMode: savepoint`) and “gracefully” shutting down the JobManager by 
“scaling it to zero” (by setting replicas = 0 in the new generated config).

However, the deployment never comes back up, apparently, due to exception:

2024-04-25 17:20:52,920 mi.j.o.p.e.ReconciliationDispatcher 
[ERROR][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] Error during error status 
handling.
org.apache.flink.kubernetes.operator.exception.StatusConflictException: Status 
have been modified externally in version 50607043 Previous: 
{"jobStatus":{"jobName":"autoscaling 
test:attack-surface","jobId":"be93ad9b152c1f11696e971e6a638b63","state":"FINISHEDINFO\\n\"},\"mode\":\"native\"},\"resource_met

Re: [External] Exception during autoscaling operation - Flink 1.18/Operator 1.8.0

2024-04-25 Thread Maxim Senin via user
I have also seen this exception:

o.a.f.k.o.o.JobStatusObserver  
[ERROR][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] Job 
d0ac9da5959d8cc9a82645eeef6751a5 failed with error: 
java.util.concurrent.CompletionException: 
java.util.concurrent.CompletionException: 
java.lang.UnsupportedOperationException: Cannot rescale the given pointwise 
partitioner.
Did you change the partitioner to forward or rescale?
It may also help to add an explicit shuffle().
at 
org.apache.flink.util.concurrent.FutureUtils.lambda$switchExecutor$23(FutureUtils.java:1305)
at 
java.base/java.util.concurrent.CompletableFuture.uniHandle(Unknown Source)
at 
java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(Unknown 
Source)
at 
java.base/java.util.concurrent.CompletableFuture$Completion.run(Unknown Source)
at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRunAsync$4(PekkoRpcActor.java:451)
at 
org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRunAsync(PekkoRpcActor.java:451)
at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:218)
at 
org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85)
at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168)
at 
org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
at 
org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
at 
org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)

I can’t find any information on how to interpret this. Please advise..

Cheers,
Maxim
From: Maxim Senin via user 
Date: Thursday, April 25, 2024 at 12:01 PM
To: Maxim Senin via user 
Subject: [External] Exception during autoscaling operation - Flink 
1.18/Operator 1.8.0
Hi.

I already asked before but never got an answer. My observation is that the 
operator, after collecting some stats, is trying to restart one of the 
deployments. This includes taking a savepoint (`takeSavepointOnUpgrade: true`, 
`upgradeMode: savepoint`) and “gracefully” shutting down the JobManager by 
“scaling it to zero” (by setting replicas = 0 in the new generated config).

However, the deployment never comes back up, apparently, due to exception:

2024-04-25 17:20:52,920 mi.j.o.p.e.ReconciliationDispatcher 
[ERROR][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] Error during error status 
handling.
org.apache.flink.kubernetes.operator.exception.StatusConflictException: Status 
have been modified externally in version 50607043 Previous: 
{"jobStatus":{"jobName":"autoscaling 
test:attack-surface","jobId":"be93ad9b152c1f11696e971e6a638b63","state":"FINISHEDINFO\\n\"},\"mode\":\"native\"},\"resource_metadata\":…
at 
org.apache.flink.kubernetes.operator.utils.StatusRecorder.replaceStatus(StatusRecorder.java:161)
at 
org.apache.flink.kubernetes.operator.utils.StatusRecorder.patchAndCacheStatus(StatusRecorder.java:97)
at 
org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils.toErrorStatusUpdateControl(ReconciliationUtils.java:438)
at 
org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.updateErrorStatus(FlinkDeploymentController.java:209)
at 
org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.updateErrorStatus(FlinkDeploymentController.java:57)
at 
io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleErrorStatusHandler(ReconciliationDispatcher.java:194)
at 
io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleReconcile(ReconciliationDispatcher.java:123)
at 
io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleDispatch(ReconciliationDispatcher.java:91)
at 
io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleExecution(ReconciliationDispatcher.java:64)
at 
io.javaoperatorsdk.operator.processing.event.EventProcessor$ReconcilerExecutor.run(EventProcessor.java:417)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown 
Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown 
Source)
at java.base/java.lang.Thread.run(Unknown Source)
2024-04-25 17:20:52,925 mi.j.o.p.e.ReconciliationDispatcher 
[ERROR][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] Error during event 
processing ExecutionScope{ resource id: 
Reso

Exception during autoscaling operation - Flink 1.18/Operator 1.8.0

2024-04-25 Thread Maxim Senin via user
Hi.

I already asked before but never got an answer. My observation is that the 
operator, after collecting some stats, is trying to restart one of the 
deployments. This includes taking a savepoint (`takeSavepointOnUpgrade: true`, 
`upgradeMode: savepoint`) and “gracefully” shutting down the JobManager by 
“scaling it to zero” (by setting replicas = 0 in the newly generated config).

However, the deployment never comes back up, apparently, due to exception:

2024-04-25 17:20:52,920 mi.j.o.p.e.ReconciliationDispatcher 
[ERROR][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] Error during error status 
handling.
org.apache.flink.kubernetes.operator.exception.StatusConflictException: Status 
have been modified externally in version 50607043 Previous: 
{"jobStatus":{"jobName":"autoscaling 
test:attack-surface","jobId":"be93ad9b152c1f11696e971e6a638b63","state":"FINISHEDINFO\\n\"},\"mode\":\"native\"},\"resource_metadata\":…
at 
org.apache.flink.kubernetes.operator.utils.StatusRecorder.replaceStatus(StatusRecorder.java:161)
at 
org.apache.flink.kubernetes.operator.utils.StatusRecorder.patchAndCacheStatus(StatusRecorder.java:97)
at 
org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils.toErrorStatusUpdateControl(ReconciliationUtils.java:438)
at 
org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.updateErrorStatus(FlinkDeploymentController.java:209)
at 
org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.updateErrorStatus(FlinkDeploymentController.java:57)
at 
io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleErrorStatusHandler(ReconciliationDispatcher.java:194)
at 
io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleReconcile(ReconciliationDispatcher.java:123)
at 
io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleDispatch(ReconciliationDispatcher.java:91)
at 
io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleExecution(ReconciliationDispatcher.java:64)
at 
io.javaoperatorsdk.operator.processing.event.EventProcessor$ReconcilerExecutor.run(EventProcessor.java:417)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown 
Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown 
Source)
at java.base/java.lang.Thread.run(Unknown Source)
2024-04-25 17:20:52,925 mi.j.o.p.e.ReconciliationDispatcher 
[ERROR][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] Error during event 
processing ExecutionScope{ resource id: 
ResourceID{name='f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6', namespace='flink'}, 
version: 50606957} failed.
org.apache.flink.kubernetes.operator.exception.ReconciliationException: 
org.apache.flink.kubernetes.operator.exception.StatusConflictException: Status 
have been modified externally in version 50607043 Previous: 
{"jobStatus":{"jobName":"autoscaling 
test:attack-surface","jobId":"be93ad9b152c1f11696e971e6a638b63","state":"FINISHED",

Caused by: 
org.apache.flink.kubernetes.operator.exception.StatusConflictException: Status 
have been modified externally in version 50607043 Previous: 
{"jobStatus":{"jobName":"autoscaling 
test:attack-surface","jobId":"be93ad9b152c1f11696e971e6a638b63","state":"FINISHED
at 
org.apache.flink.kubernetes.operator.utils.StatusRecorder.replaceStatus(StatusRecorder.java:161)
at 
org.apache.flink.kubernetes.operator.utils.StatusRecorder.patchAndCacheStatus(StatusRecorder.java:97)
at 
org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler.deploy(ApplicationReconciler.java:175)
at 
org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler.deploy(ApplicationReconciler.java:63)
at 
org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler.restoreJob(AbstractJobReconciler.java:279)
at 
org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler.reconcileSpecChange(AbstractJobReconciler.java:156)
at 
org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler.reconcile(AbstractFlinkResourceReconciler.java:171)
at 
org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:145)
... 13 more

How to fix this? Why is the deployment not coming back up after this exception? 
Is there a configuration property to set the number of retries?

Thanks,
Maxim





Job goes into FINISHED state after rescaling - flink operator

2024-04-22 Thread Maxim Senin via user
Hi. My Flink Deployment is set to use savepoints for upgrades and to take a 
savepoint before stopping.
When rescaling happens, for some reason it scales the JobManager to zero 
(“Scaling JobManager Deployment to zero with 300 seconds timeout”) and the job 
goes into FINISHED state. It doesn’t seem to be able to continue. Any ideas why 
it is deleting itself?
The savepoints are stored on s3.
I can restart the job from the savepoint manually, but at the next rescaling 
operation it deletes itself again.

Thanks!

Log:
2024-04-22 23:39:17,016 o.a.f.k.o.o.d.ApplicationObserver [INFO 
][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] Observing JobManager 
deployment. Previous status: DEPLOYING
2024-04-22 23:39:17,024 o.a.f.k.o.o.d.ApplicationObserver [INFO 
][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] JobManager is being deployed
2024-04-22 23:39:17,045 o.a.f.a.JobAutoScalerImpl  [INFO 
][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] Cleaning up autoscaling meta 
data
2024-04-22 23:39:17,046 o.a.f.k.o.s.AbstractFlinkService [INFO 
][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] Deleting cluster with 
Foreground propagation
2024-04-22 23:39:17,046 o.a.f.k.o.s.AbstractFlinkService [INFO 
][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] Scaling JobManager Deployment 
to zero with 300 seconds timeout...
2024-04-22 23:39:18,941 o.a.f.r.u.c.m.ProcessMemoryUtils [INFO 
][flink/f-cdda7a8d-8259-5137-8397-8125b212e556] The derived from fraction jvm 
overhead memory (102.400mb (107374184 bytes)) is less than its min value 
192.000mb (201326592 bytes), min value will be used instead
2024-04-22 23:39:18,982 o.a.f.a.ScalingMetricCollector [WARN 
][flink/f-cdda7a8d-8259-5137-8397-8125b212e556] pendingRecords metric for 
e22861b1943d40ca6f5a40ae6332d42b could not be found. Either a legacy source or 
an idle source. Assuming no pending records.
2024-04-22 23:39:18,984 o.a.f.a.ScalingMetricCollector [WARN 
][flink/f-cdda7a8d-8259-5137-8397-8125b212e556] pendingRecords metric for 
e503bd1c0fb6799d36f0c4b786e69fd9 could not be found. Either a legacy source or 
an idle source. Assuming no pending records.
2024-04-22 23:39:18,985 o.a.f.a.ScalingMetricCollector [WARN 
][flink/f-cdda7a8d-8259-5137-8397-8125b212e556] pendingRecords metric for 
de478b666343f04920f1cd3e6e65548c could not be found. Either a legacy source or 
an idle source. Assuming no pending records.
2024-04-22 23:39:18,987 o.a.f.a.ScalingMetricCollector [WARN 
][flink/f-cdda7a8d-8259-5137-8397-8125b212e556] pendingRecords metric for 
0956ec3e5721a3bc416c208d690e220a could not be found. Either a legacy source or 
an idle source. Assuming no pending records.
2024-04-22 23:39:18,988 o.a.f.a.ScalingMetricCollector [WARN 
][flink/f-cdda7a8d-8259-5137-8397-8125b212e556] pendingRecords metric for 
2a0e1e1304dc61d997d3e6f7025df9b3 could not be found. Either a legacy source or 
an idle source. Assuming no pending records.
2024-04-22 23:39:18,989 o.a.f.a.ScalingMetricCollector [WARN 
][flink/f-cdda7a8d-8259-5137-8397-8125b212e556] pendingRecords metric for 
09152f8c760b2503dd4174abf81781b6 could not be found. Either a legacy source or 
an idle source. Assuming no pending records.
2024-04-22 23:39:18,991 o.a.f.a.ScalingMetricCollector [WARN 
][flink/f-cdda7a8d-8259-5137-8397-8125b212e556] pendingRecords metric for 
cbe38acc7ac41cc91794215391eedc28 could not be found. Either a legacy source or 
an idle source. Assuming no pending records.
2024-04-22 23:39:18,992 o.a.f.a.ScalingMetricCollector [WARN 
][flink/f-cdda7a8d-8259-5137-8397-8125b212e556] pendingRecords metric for 
cc2707c5c5b5cdd319d28980a6ad99d0 could not be found. Either a legacy source or 
an idle source. Assuming no pending records.
2024-04-22 23:39:18,994 o.a.f.a.ScalingMetricCollector [WARN 
][flink/f-cdda7a8d-8259-5137-8397-8125b212e556] pendingRecords metric for 
d2fd710697779b81072031f8924b5967 could not be found. Either a legacy source or 
an idle source. Assuming no pending records.
2024-04-22 23:39:18,995 o.a.f.a.ScalingMetricCollector [WARN 
][flink/f-cdda7a8d-8259-5137-8397-8125b212e556] pendingRecords metric for 
9892bf1d00288ef951498dd18f18dd24 could not be found. Either a legacy source or 
an idle source. Assuming no pending records.
2024-04-22 23:39:18,997 o.a.f.a.ScalingMetricCollector [WARN 
][flink/f-cdda7a8d-8259-5137-8397-8125b212e556] pendingRecords metric for 
acf3064b984e12010bfeccbe4e28d9a5 could not be found. Either a legacy source or 
an idle source. Assuming no pending records.
2024-04-22 23:39:18,998 o.a.f.a.ScalingMetricCollector [WARN 
][flink/f-cdda7a8d-8259-5137-8397-8125b212e556] pendingRecords metric for 
8737aeacfef5f24708290203c9de47e1 could not be found. Either a legacy source or 
an idle source. Assuming no pending records.
2024-04-22 23:39:18,999 o.a.f.a.ScalingMetricCollector [WARN 
][flink/f-cdda7a8d-8259-5137-8397-8125b212e556] pendingRecords metric for 
45aa9886e32677c3f13caa2aa54ec3ad could not be found. Either a legacy source or 
an idle source. Assuming no pending records.

Parallelism for auto-scaling, memory for auto-tuning - Flink operator

2024-04-17 Thread Maxim Senin via user
Hi.

Does it make sense to specify `parallelism` for the task managers or the `job`, 
and, similarly, to specify the memory amount for the task managers, or is it better 
to leave it to the autoscaler and autotuner to pick the best values? How many times 
would the autoscaler need to restart the task managers before it picks the right 
values? Does `pekko.ask.timeout` need to be sufficient for the task managers to get 
into a running state with all the restarts?

Cheers,
Maxim





Re: Long blocking call in UserFunction triggers HA leader lost?

2020-11-12 Thread Maxim Parkachov
Hi Theo,

We had a very similar problem with one of our Spark Streaming jobs. The best
solution was to create a custom source that keeps all external records in a
cache, periodically reads the external data and compares it to the cache. All
changed records were then broadcast to the task managers. We tried to
implement background loading in a separate thread, but this solution was more
complicated: we needed to create a shadow copy of the cache and then quickly
switch them. And with Spark Streaming there were additional problems.

Hope this helps,
Maxim.


Re: Flink streaming job logging reserves space

2020-08-04 Thread Maxim Parkachov
Hi Yang,

Thanks for your advice, now I have a good reason to upgrade to 1.11.

Regards,
Maxim.

On Tue, Aug 4, 2020 at 9:39 AM Yang Wang  wrote:

> AFAIK, there is no way to roll the *.out/err files except we hijack the
> stdout/stderr in Flink code. However, it is a temporary hack.
>
> A good way is to write your logs to other separate files that could roll
> via log4j. If you want to access them in the Flink webUI,
> upgrade to the 1.11 version. Then you will find a "Log List" tab under
> JobManager sidebar.
>
>
> Best,
> Yang
>
> Maxim Parkachov  于2020年8月4日周二 下午2:52写道:
>
>> Hi Yang,
>>
>> you are right. Since then, I looked for open files and found *.out/*.err
>> files on that partition and as you mentioned they don't roll.
>> I could implement a workaround to restart the streaming job every week or
>> so, but I really don't want to go this way.
>>
>> I tried to forward logs to files and then I could roll them, but then I
>> don't see logs in the GUI.
>>
>> So my question would be, how to make them roll ?
>>
>> Regards,
>> Maxim.
>>
>> On Tue, Aug 4, 2020 at 4:48 AM Yang Wang  wrote:
>>
>>> Hi Maxim,
>>>
>>> First, i want to confirm with you that do you have checked all the
>>> "yarn.nodemanager.log-dirs". If you
>>> could access the logs in Flink webUI, the log files(e.g.
>>> taskmanager.log, taskmanager.out, taskmanager.err)
>>> should exist. I suggest to double check the multiple log-dirs.
>>>
>>> Since the *.out/err files do not roll, if you print some user logs to
>>> the stdout/stderr, the two files will increase
>>> over time.
>>>
>>> When you stop the Flink application, Yarn will clean up all the jars and
>>> logs, so you find that the disk space get back.
>>>
>>>
>>> Best,
>>> Yang
>>>
>>> Maxim Parkachov  于2020年7月30日周四 下午10:00写道:
>>>
>>>> Hi everyone,
>>>>
>>>> I have a strange issue with flink logging. I use pretty much standard
>>>> log4j config, which is writing to standard output in order to see it in
>>>> Flink GUI. Deployment is on YARN with job mode. I can see logs in UI, no
>>>> problem. On the servers, where Flink YARN containers are running, there is
>>>> disk quota on the partition where YARN normally creates logs. I see no
>>>> specific files in the application_xx directory, but space on the disk is
>>>> actually decreasing with time. After several weeks we eventually hit quota.
>>>> It seems like some file or pipe is created but not closed, but still
>>>> reserves the space. After I restart Flink job, space is
>>>> immediately returned back. I'm sure that flink job is the problem, I have
>>>> reproduced the issue on a cluster where only 1 Flink job was running. Below is
>>>> my log4j config. Any help or idea is appreciated.
>>>>
>>>> Thanks in advance,
>>>> Maxim.
>>>> ---
>>>> # This affects logging for both user code and Flink
>>>> log4j.rootLogger=INFO, file, stderr
>>>>
>>>> # Uncomment this if you want to _only_ change Flink's logging
>>>> #log4j.logger.org.apache.flink=INFO
>>>>
>>>> # The following lines keep the log level of common libraries/connectors
>>>> on
>>>> # log level INFO. The root logger does not override this. You have to
>>>> manually
>>>> # change the log levels here.
>>>> log4j.logger.akka=INFO
>>>> log4j.logger.org.apache.kafka=INFO
>>>> log4j.logger.org.apache.hadoop=INFO
>>>> log4j.logger.org.apache.zookeeper=INFO
>>>>
>>>> # Log all infos in the given file
>>>> log4j.appender.file=org.apache.log4j.FileAppender
>>>> log4j.appender.file.file=${log.file}
>>>> log4j.appender.file.append=false
>>>> log4j.appender.file.layout=org.apache.log4j.PatternLayout
>>>> log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd
>>>> HH:mm:ss,SSS} %-5p %-60c %x - %m%n
>>>>
>>>> # Suppress the irrelevant (wrong) warnings from the Netty channel
>>>> handler
>>>> log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR,
>>>> file
>>>>
>>>>


Re: Flink streaming job logging reserves space

2020-08-04 Thread Maxim Parkachov
Hi Yang,

you are right. Since then, I looked for open files and found *.out/*.err
files on that partition and as you mentioned they don't roll.
I could implement a workaround to restart the streaming job every week or
so, but I really don't want to go this way.

I tried to forward logs to files and then I could roll them, but then I
don't see logs in the GUI.

So my question would be, how to make them roll ?

Regards,
Maxim.

On Tue, Aug 4, 2020 at 4:48 AM Yang Wang  wrote:

> Hi Maxim,
>
> First, i want to confirm with you that do you have checked all the
> "yarn.nodemanager.log-dirs". If you
> could access the logs in Flink webUI, the log files(e.g. taskmanager.log,
> taskmanager.out, taskmanager.err)
> should exist. I suggest to double check the multiple log-dirs.
>
> Since the *.out/err files do not roll, if you print some user logs to the
> stdout/stderr, the two files will increase
> over time.
>
> When you stop the Flink application, Yarn will clean up all the jars and
> logs, so you find that the disk space get back.
>
>
> Best,
> Yang
>
> Maxim Parkachov  于2020年7月30日周四 下午10:00写道:
>
>> Hi everyone,
>>
>> I have a strange issue with flink logging. I use pretty much standard
>> log4j config, which is writing to standard output in order to see it in
>> Flink GUI. Deployment is on YARN with job mode. I can see logs in UI, no
>> problem. On the servers, where Flink YARN containers are running, there is
>> disk quota on the partition where YARN normally creates logs. I see no
>> specific files in the application_xx directory, but space on the disk is
>> actually decreasing with time. After several weeks we eventually hit quota.
>> It seems like some file or pipe is created but not closed, but still
>> reserves the space. After I restart Flink job, space is
>> immediately returned back. I'm sure that flink job is the problem, I have
>> reproduced the issue on a cluster where only 1 Flink job was running. Below is
>> my log4j config. Any help or idea is appreciated.
>>
>> Thanks in advance,
>> Maxim.
>> ---
>> # This affects logging for both user code and Flink
>> log4j.rootLogger=INFO, file, stderr
>>
>> # Uncomment this if you want to _only_ change Flink's logging
>> #log4j.logger.org.apache.flink=INFO
>>
>> # The following lines keep the log level of common libraries/connectors on
>> # log level INFO. The root logger does not override this. You have to
>> manually
>> # change the log levels here.
>> log4j.logger.akka=INFO
>> log4j.logger.org.apache.kafka=INFO
>> log4j.logger.org.apache.hadoop=INFO
>> log4j.logger.org.apache.zookeeper=INFO
>>
>> # Log all infos in the given file
>> log4j.appender.file=org.apache.log4j.FileAppender
>> log4j.appender.file.file=${log.file}
>> log4j.appender.file.append=false
>> log4j.appender.file.layout=org.apache.log4j.PatternLayout
>> log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS}
>> %-5p %-60c %x - %m%n
>>
>> # Suppress the irrelevant (wrong) warnings from the Netty channel handler
>> log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR,
>> file
>>
>>


Flink streaming job logging reserves space

2020-07-30 Thread Maxim Parkachov
Hi everyone,

I have a strange issue with Flink logging. I use a pretty much standard log4j
config, which writes to standard output so that the logs are visible in the Flink
GUI. Deployment is on YARN in job mode. I can see the logs in the UI, no problem.
On the servers where the Flink YARN containers are running, there is a disk
quota on the partition where YARN normally creates logs. I see no specific
files in the application_xx directory, but space on the disk is actually
decreasing over time. After several weeks we eventually hit the quota. It seems
like some file or pipe is created but not closed and still reserves the
space. After I restart the Flink job, the space is immediately returned. I'm
sure the Flink job is the problem; I have reproduced the issue on a cluster
where only 1 Flink job was running. Below is my log4j config. Any help or
idea is appreciated.

Thanks in advance,
Maxim.
---
# This affects logging for both user code and Flink
log4j.rootLogger=INFO, file, stderr

# Uncomment this if you want to _only_ change Flink's logging
#log4j.logger.org.apache.flink=INFO

# The following lines keep the log level of common libraries/connectors on
# log level INFO. The root logger does not override this. You have to
manually
# change the log levels here.
log4j.logger.akka=INFO
log4j.logger.org.apache.kafka=INFO
log4j.logger.org.apache.hadoop=INFO
log4j.logger.org.apache.zookeeper=INFO

# Log all infos in the given file
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.file=${log.file}
log4j.appender.file.append=false
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS}
%-5p %-60c %x - %m%n

# Suppress the irrelevant (wrong) warnings from the Netty channel handler
log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR,
file
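
For reference, one way to follow Yang's suggestion in the replies above (route user 
logs to a separate file that rolls via log4j, instead of stdout/stderr), in the same 
log4j 1.x properties style as the config above. The logger name, file name and sizes 
are only placeholders, not part of the original config:

# Send user-code logs to a dedicated rolling file instead of stdout/stderr
log4j.logger.com.mycompany.myjob=INFO, userfile
log4j.additivity.com.mycompany.myjob=false

log4j.appender.userfile=org.apache.log4j.RollingFileAppender
log4j.appender.userfile.file=${log.file}.user
log4j.appender.userfile.MaxFileSize=50MB
log4j.appender.userfile.MaxBackupIndex=5
log4j.appender.userfile.layout=org.apache.log4j.PatternLayout
log4j.appender.userfile.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n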


Re: Wait for cancellation event with CEP

2020-05-04 Thread Maxim Parkachov
Hi Till,

thank you for the very detailed answer; now it is absolutely clear.

Regards,
Maxim.

On Thu, Apr 30, 2020 at 7:19 PM Till Rohrmann  wrote:

> Hi Maxim,
>
> I think your problem should be solvable with the CEP library:
>
> So what we are doing here is to define a pattern forward followed by any
> cancel. Moreover we say that it must happen within 1 minute. If this does
> not happen then we will see a timeout where we can create the follow-up
> event.
>
> Cheers,
> Till
>
>>
>>
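
For reference, a minimal sketch of the pattern Till describes above: a "forward"
event followed by a matching "cancel" within 1 minute, where timed-out partial
matches (no cancel arrived) are the events to pass on. The Event type and its fields
are hypothetical; completed matches are simply dropped and the surviving forwards
come out of a side output:

import java.util.List;
import java.util.Map;

import org.apache.flink.cep.CEP;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.functions.TimedOutPartialMatchHandler;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class ForwardCancelCep {

    /** Hypothetical event type: the same id links a forward event and its cancel. */
    public static class Event {
        public String id;
        public boolean cancel;
        public Event() {}
    }

    /** Returns the forward events for which no cancel arrived within 1 minute. */
    public static DataStream<Event> confirmedForwards(DataStream<Event> events) {
        Pattern<Event, ?> pattern = Pattern.<Event>begin("forward")
                .where(new SimpleCondition<Event>() {
                    @Override
                    public boolean filter(Event e) {
                        return !e.cancel;
                    }
                })
                .followedBy("cancel")
                .where(new SimpleCondition<Event>() {
                    @Override
                    public boolean filter(Event e) {
                        return e.cancel;
                    }
                })
                .within(Time.minutes(1));

        OutputTag<Event> notCancelled = new OutputTag<Event>("not-cancelled") {};

        SingleOutputStreamOperator<Event> matched =
                CEP.pattern(events.keyBy(e -> e.id), pattern)
                        .process(new DropMatchedEmitTimedOut(notCancelled));

        // Completed matches (forward + cancel) are dropped above; the side output
        // carries the forwards that timed out and should be passed on downstream.
        return matched.getSideOutput(notCancelled);
    }

    private static class DropMatchedEmitTimedOut
            extends PatternProcessFunction<Event, Event>
            implements TimedOutPartialMatchHandler<Event> {

        private final OutputTag<Event> notCancelled;

        DropMatchedEmitTimedOut(OutputTag<Event> notCancelled) {
            this.notCancelled = notCancelled;
        }

        @Override
        public void processMatch(Map<String, List<Event>> match, Context ctx, Collector<Event> out) {
            // A cancel arrived within the minute: drop the forward event (emit nothing).
        }

        @Override
        public void processTimedOutMatch(Map<String, List<Event>> match, Context ctx) {
            // Only the "forward" part matched before the 1-minute timeout: pass it on.
            ctx.output(notCancelled, match.get("forward").get(0));
        }
    }
}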


Wait for cancellation event with CEP

2020-04-30 Thread Maxim Parkachov
Hi everyone,

I need to implement the following functionality. There is a kafka topic where
"forward" events arrive, and in the same topic there are "cancel"
events. For each "forward" event I need to wait 1 minute for a possible
"cancel" event. I can uniquely match both events. If the "cancel" event comes
within this minute I need to delete the "forward" event, otherwise I pass the
"forward" event on to another kafka topic. "Cancel" events older than
1 minute can be ignored.

I was thinking of implementing a ProcessFunction that puts "forward" events
in state with a timer. If a "cancel" event comes in, I delete the "forward"
event from state.

My question: Is there a simpler way to implement the same logic, possibly
with CEP?

Thanks,
Maxim.


Re: New kafka producer on each checkpoint

2020-04-13 Thread Maxim Parkachov
Hi Yun,

thanks for the answer. I have now increased the checkpoint interval, but I still
don't understand the reason for creating a producer and re-connecting to the kafka
broker each time. According to the documentation:

Note: Semantic.EXACTLY_ONCE mode uses a fixed size pool of KafkaProducers
per each FlinkKafkaProducer011 instance. One of each of those producers is
used per one checkpoint. If the number of concurrent checkpoints exceeds
the pool size, FlinkKafkaProducer011 will throw an exception and will fail
the whole application. Please configure max pool size and max number of
concurrent checkpoints accordingly.

I assumed that this is also true for post-011 producers. I expected
5 (default) producers to be created and reused without re-instantiating a
producer each time. In my case the checkpoint is so fast that I will never have
concurrent checkpoints.

Regards,
Maxim.


On Wed, Apr 8, 2020 at 4:52 AM Yun Tang  wrote:

> Hi Maxim
>
> If you use the EXACTLY_ONCE semantic (instead of AT_LEAST_ONCE or NONE)
> for flink kafka producer. It will create new producer when every new
> checkpoint comes [1]. This is by design and from my point of view, the
> checkpoint interval of 10 seconds might be a bit too often. In general I
> think interval of 3 minutes should be enough. If you cannot offer the
> source rewind time after failover, you could turn the interval more often.
>
>
> [1]
> https://github.com/apache/flink/blob/980e31dcc29ec6cc60ed59569f1f1cb7c47747b7/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java#L871
>
> Best
> Yun Tang
> --
> *From:* Maxim Parkachov 
> *Sent:* Monday, April 6, 2020 23:16
> *To:* user@flink.apache.org 
> *Subject:* New kafka producer on each checkpoint
>
> Hi everyone,
>
> I'm trying to test exactly once functionality with my job under production
> load. The job is reading from kafka, using kafka timestamp as event time,
> aggregates every minute and outputs to other kafka topic. I use checkpoint
> interval 10 seconds.
>
> Everything seems to be working fine, but when I look to the log on INFO
> level, I see that with each checkpoint, new kafka producer is created and
> then closed again.
>
> 1. Is this how it is supposed to work ?
> 2. Is checkpoint interval 10 second too often ?
>
> Thanks,
> Maxim.
>


New kafka producer on each checkpoint

2020-04-06 Thread Maxim Parkachov
Hi everyone,

I'm trying to test exactly-once functionality with my job under production
load. The job reads from kafka, uses the kafka timestamp as event time,
aggregates every minute and outputs to another kafka topic. I use a checkpoint
interval of 10 seconds.

Everything seems to be working fine, but when I look at the log on INFO
level, I see that with each checkpoint a new kafka producer is created and
then closed again.

1. Is this how it is supposed to work?
2. Is a checkpoint interval of 10 seconds too often?

Thanks,
Maxim.
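
For reference, a minimal, self-contained sketch of the pieces discussed in this
thread and in Yun's reply above: an EXACTLY_ONCE FlinkKafkaProducer together with a
longer checkpoint interval. The topic name, broker address and transaction timeout
are placeholders, not taken from the actual job:

import java.nio.charset.StandardCharsets;
import java.util.Properties;

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ExactlyOnceSinkSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // With Semantic.EXACTLY_ONCE a transactional producer is taken/created per
        // checkpoint, so the checkpoint interval directly drives how often that
        // happens; Yun's reply above suggests minutes rather than 10 seconds.
        env.enableCheckpointing(3 * 60 * 1000L, CheckpointingMode.EXACTLY_ONCE);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092");               // placeholder
        // Kafka transactions must outlive the checkpoint interval plus recovery time.
        props.setProperty("transaction.timeout.ms", String.valueOf(15 * 60 * 1000));

        DataStream<String> aggregated = env.fromElements("a", "b", "c");     // stand-in pipeline

        KafkaSerializationSchema<String> schema = (element, timestamp) ->
                new ProducerRecord<>("output-topic", element.getBytes(StandardCharsets.UTF_8));

        aggregated.addSink(new FlinkKafkaProducer<>(
                "output-topic", schema, props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE));

        env.execute("exactly-once-sink-sketch");
    }
}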


Re: [Flink 1.10] Classpath doesn't include custom files in lib/

2020-02-17 Thread Maxim Parkachov
Hi Yang,

I've just tried your suggestions, but, unfortunately, in YARN per-job mode
it doesn't work; both calls return null.
I double-checked that the file is shipped to the YARN container, but I feel that
this happens later in the process.
At the moment I'm reading the file with the File interface instead of getting it
as a resource, which is what I do in local mode.

Regards,
Maxim.
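
For reference, a small sketch of that combination: try the classloader resource
first (both names Yang suggested) and fall back to the File interface. The fallback
path relative to the container working directory is an assumption:

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.util.Properties;

public class JobPropertiesLoader {

    public static Properties load() throws IOException {
        Properties props = new Properties();

        // 1. Try the classpath first (works locally when the file is packaged under resources/).
        URL url = JobPropertiesLoader.class.getClassLoader().getResource("lib/job.properties");
        if (url == null) {
            url = JobPropertiesLoader.class.getClassLoader().getResource("job.properties");
        }
        if (url != null) {
            try (InputStream in = url.openStream()) {
                props.load(in);
                return props;
            }
        }

        // 2. Fall back to the shipped file in the YARN container's working directory.
        File shipped = new File("lib/job.properties");
        if (shipped.exists()) {
            try (InputStream in = new FileInputStream(shipped)) {
                props.load(in);
            }
        }
        return props;
    }
}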


On Mon, Feb 17, 2020 at 3:03 PM Yang Wang  wrote:

> Hi Maxim,
>
> I have verified that the following two ways could both work.
>
> getClass().getClassLoader().getResource("lib/job.properties")
> getClass().getClassLoader().getResource("job.properties")
>
>
> Best,
> Yang
>
> Maxim Parkachov  于2020年2月17日周一 下午6:47写道:
>
>> Hi Yang,
>>
>> thanks, this explains why classpath behavior changed, but now I struggle
>> to
>> understand how I could overwrite resource, which is already shipped in
>> job jar.
>>
>> Before I had job.properties files in JAR in under
>> resources/lib/job.properties
>> for local development and deploying on cluster it was overwritten
>> with environment specific settings in  lib/job.properties of flink
>> distribution.
>> Now this doesn't seem to work.  I'm using:
>>
>> getClass.getClassLoader.getResource("lib/job.properties")
>>
>> to get file. Could it be the problem ?
>>
>> Thanks,
>> Maxim.
>>
>> On Mon, Feb 17, 2020 at 4:12 AM Yang Wang  wrote:
>>
>>> Hi Maxim Parkachov,
>>>
>>> The users files also have been shipped to JobManager and TaskManager.
>>> However, it
>>> is not directly added to the classpath. Instead, the parent directory is
>>> added to the
>>> classpath. This changes are to make resource classloading work. You
>>> could check more
>>> information here[1].
>>>
>>>
>>> [1]. https://issues.apache.org/jira/browse/FLINK-13127
>>>
>>>
>>> Best,
>>> Yang
>>>
>>> Maxim Parkachov  于2020年2月15日周六 上午12:58写道:
>>>
>>>> Hi everyone,
>>>>
>>>> I'm trying to run my job with flink 1.10 with YARN cluster per-job
>>>> mode. In the previous versions all files in lib/ folder were automatically
>>>> included in classpath. Now, with 1.10 I see only *.jar files are included
>>>> in classpath. but not "other" files. Is this deliberate change or bug ?
>>>>
>>>> Generally, what is recommended way to include custom files in classpath
>>>> and ship it during start to all containers ?
>>>>
>>>> Thank
>>>>
>>>


Re: [Flink 1.10] Classpath doesn't include custom files in lib/

2020-02-17 Thread Maxim Parkachov
Hi Yang,

thanks, this explains why the classpath behavior changed, but now I struggle to
understand how I can override a resource that is already shipped in the job
jar.

Before, I had the job.properties file in the JAR under
resources/lib/job.properties
for local development, and when deploying on the cluster it was overridden
with environment-specific settings in lib/job.properties of the Flink
distribution.
Now this doesn't seem to work. I'm using:

getClass.getClassLoader.getResource("lib/job.properties")

to get the file. Could that be the problem?

Thanks,
Maxim.

On Mon, Feb 17, 2020 at 4:12 AM Yang Wang  wrote:

> Hi Maxim Parkachov,
>
> The users files also have been shipped to JobManager and TaskManager.
> However, it
> is not directly added to the classpath. Instead, the parent directory is
> added to the
> classpath. This changes are to make resource classloading work. You could
> check more
> information here[1].
>
>
> [1]. https://issues.apache.org/jira/browse/FLINK-13127
>
>
> Best,
> Yang
>
> Maxim Parkachov  于2020年2月15日周六 上午12:58写道:
>
>> Hi everyone,
>>
>> I'm trying to run my job with flink 1.10 with YARN cluster per-job mode.
>> In the previous versions all files in lib/ folder were automatically
>> included in classpath. Now, with 1.10 I see only *.jar files are included
>> in classpath. but not "other" files. Is this deliberate change or bug ?
>>
>> Generally, what is recommended way to include custom files in classpath
>> and ship it during start to all containers ?
>>
>> Thank
>>
>


[Flink 1.10] Classpath doesn't include custom files in lib/

2020-02-14 Thread Maxim Parkachov
Hi everyone,

I'm trying to run my job with Flink 1.10 in YARN cluster per-job mode. In
the previous versions all files in the lib/ folder were automatically included
in the classpath. Now, with 1.10, I see that only *.jar files are included in
the classpath, but not "other" files. Is this a deliberate change or a bug?

Generally, what is the recommended way to include custom files in the classpath and
ship them during start to all containers?

Thanks


Re: Flink 1.10 on MapR secure cluster with high availability

2020-02-07 Thread Maxim Parkachov
Hi Chesnay,

I managed to re-compile with the MapR ZooKeeper and can confirm that it works
with HA as well.
Still, I find it strange that HA uses a shaded version of ZooKeeper instead of
the version from the classpath, as is done for Hadoop.

Thanks,
Maxim.

On Wed, Feb 5, 2020 at 3:43 PM Chesnay Schepler  wrote:

> No, since a) HA will never use classes from the user-jar and b) zookeeper
> is relocated to a different package (to avoid conflicts) and hence any
> replacement has to follow the same relocation convention.
>
> On 05/02/2020 15:38, Maxim Parkachov wrote:
>
> Hi Chesnay,
>
> thanks for the advice. Will it work if I include the MapR-specific zookeeper in
> the job dependencies and still use the out-of-the-box Flink binary distribution?
>
> Regards,
> Maxim.
>
> On Wed, Feb 5, 2020 at 3:25 PM Chesnay Schepler 
> wrote:
>
>> You must rebuild Flink while overriding zookeeper.version property to
>> match your MapR setup.
>> For example: mvn clean package -Dzookeeper.version=3.4.5-mapr-1604
>> Note that you will also have to configure the MapR repository in your
>> local setup as described here
>> <https://maven.apache.org/guides/mini/guide-multiple-repositories.html>.
>>
>> On 05/02/2020 15:12, Maxim Parkachov wrote:
>>
>> Hi everyone,
>>
>> I have already written about issue with Flink 1.9 on secure MapR cluster
>> and high availability. The issue was resolved with custom compiled Flink
>> with vendor mapr repositories enabled. The history could be found
>> https://www.mail-archive.com/user@flink.apache.org/msg28235.html
>>
>> Unfortunately, in current 1.10 RC vendor repositories were removed and
>> I'm failing to get working configuration.  Current situation with 1.10 RC
>> and secure MapR cluster:
>>
>> 1. Without HA, Flink uses class path provided zookeeper jar (mapr
>> specific) and everything works fine.
>>
>> 2. With HA enabled, Flink uses shaded zookeeper  
>> (org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn)
>> which  doesn't have MapR specific changes and fails to authenticate.
>>
>> I would really appreciate any help in resolving this issue.I'm ready to
>> provide any required details.
>>
>> Regards,
>> Maxim.
>>
>>
>>
>


Re: Flink 1.10 on MapR secure cluster with high availability

2020-02-05 Thread Maxim Parkachov
Hi Chesnay,

thanks for the advice. Will it work if I include the MapR-specific zookeeper in the
job dependencies and still use the out-of-the-box Flink binary distribution?

Regards,
Maxim.

On Wed, Feb 5, 2020 at 3:25 PM Chesnay Schepler  wrote:

> You must rebuild Flink while overriding zookeeper.version property to
> match your MapR setup.
> For example: mvn clean package -Dzookeeper.version=3.4.5-mapr-1604
> Note that you will also have to configure the MapR repository in your
> local setup as described here
> <https://maven.apache.org/guides/mini/guide-multiple-repositories.html>.
>
> On 05/02/2020 15:12, Maxim Parkachov wrote:
>
> Hi everyone,
>
> I have already written about issue with Flink 1.9 on secure MapR cluster
> and high availability. The issue was resolved with custom compiled Flink
> with vendor mapr repositories enabled. The history could be found
> https://www.mail-archive.com/user@flink.apache.org/msg28235.html
>
> Unfortunately, in current 1.10 RC vendor repositories were removed and I'm
> failing to get working configuration.  Current situation with 1.10 RC and
> secure MapR cluster:
>
> 1. Without HA, Flink uses class path provided zookeeper jar (mapr
> specific) and everything works fine.
>
> 2. With HA enabled, Flink uses shaded zookeeper  
> (org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn)
> which  doesn't have MapR specific changes and fails to authenticate.
>
> I would really appreciate any help in resolving this issue.I'm ready to
> provide any required details.
>
> Regards,
> Maxim.
>
>
>


Flink 1.10 on MapR secure cluster with high availability

2020-02-05 Thread Maxim Parkachov
Hi everyone,

I have already written about an issue with Flink 1.9 on a secure MapR cluster
with high availability. The issue was resolved with a custom-compiled Flink
with the vendor MapR repositories enabled. The history can be found at
https://www.mail-archive.com/user@flink.apache.org/msg28235.html

Unfortunately, in the current 1.10 RC the vendor repositories were removed and
I'm failing to get a working configuration. Current situation with the 1.10 RC
and a secure MapR cluster:

1. Without HA, Flink uses the ZooKeeper jar provided on the classpath
(MapR-specific) and everything works fine.

2. With HA enabled, Flink uses the shaded ZooKeeper
(org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn),
which doesn't have the MapR-specific changes and fails to authenticate.

I would really appreciate any help in resolving this issue. I'm ready to
provide any required details.

Regards,
Maxim.


Re: Initialization of broadcast state before processing main stream

2019-11-14 Thread Maxim Parkachov
Hi Vasily,

unfortunately, this is a known issue with Flink; you can read the discussion
under
https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API

At the moment I have seen 3 solutions for this issue:

1. Buffer the fact stream in local state until the broadcast is completely read.
2. Create a custom source for the fact stream and, in its open method, wait
until the broadcast stream is completely read.
3. With the latest Flink version, you can pre-populate the state with the
dimension data and start the Flink job from that existing state. You need to
take care of setting the correct Kafka offsets for the dimension stream though,
otherwise you will get a gap between the pre-populated state and the moment the
job is started.

The first two solutions need to know when the broadcast stream is "completely
read". I created a workaround for this with a custom source for the dimension
events. It creates a "stop file" on a shared file system, reads the Kafka end
offsets for the dimension topic via the admin interface, starts processing all
messages from the beginning, and clears the "stop file" once the consumed
offsets have reached the end offsets for all partitions. Instead of a "stop
file" you could use a shared lock in ZooKeeper.
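
For reference, a simplified sketch of the end-offset lookup I mean (the class
and method names are made up, and I use the plain Kafka consumer API rather
than the AdminClient just to read the end offsets once):

import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

// Look up the end offsets of the compacted dimension topic once, so the
// custom source can tell when it has caught up.
public final class EndOffsets {

    public static Map<TopicPartition, Long> fetch(String brokers, String topic) {
        Properties props = new Properties();
        props.put("bootstrap.servers", brokers);
        props.put("key.deserializer", ByteArrayDeserializer.class.getName());
        props.put("value.deserializer", ByteArrayDeserializer.class.getName());
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            List<TopicPartition> partitions = consumer.partitionsFor(topic).stream()
                .map(p -> new TopicPartition(p.topic(), p.partition()))
                .collect(Collectors.toList());
            return consumer.endOffsets(partitions);
        }
    }
}

// In the custom source: after emitting a record with offset o from partition p,
// that partition counts as caught up once o + 1 >= endOffsets.get(p); when all
// partitions are caught up, wait a bit and then delete the stop file.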

Hope this helps,
Maxim.

On Thu, Nov 14, 2019 at 7:42 AM vino yang  wrote:

> Hi Vasily,
>
> Currently, Flink did not do the coordination between a general stream and
> broadcast stream, they are both streams. Your scene of using the broadcast
> state is a special one. In a more general scene, the states need to be
> broadcasted is an unbounded stream, the state events may be broadcasted to
> the downstream at any time. So it can not be wait to be done before playing
> the usual stream events.
>
> For your scene:
>
>
>- you can change your storage about dimension table, e.g. Redis or
>MySQL and so on to do the stream and dimension table join;
>- you can inject some control event in your broadcast stream to mark
>the stream is end and let the fact stream wait until receiving the control
>event. Or you can introduce a thrid-party coordinator e.g. ZooKeeper to
>coordinate them, however, it would make your solution more complex.
>
> Best,
> Vino
>
>
> Vasily Melnik  wrote on Thu, Nov 14, 2019 at 1:28 PM:
>
>> Hi all.
>>
>> In our task we have two Kafka topics:
>> - one with fact stream (web traffic)
>> - one with dimension
>>
>> We would like to put dimension data into broadcast state and lookup on
>> int with facts. But we see that not all dimension records are put into
>> state before first fact record is processed, so lookup gives no data.
>>
>> The question is: how could we read fact topic with some "delay" to give
>> dimension enough time to initialize state?
>>
>>
>> Best regards,
>> Vasily Melnik
>>
>


Re: Flink 1.9, MapR secure cluster, high availability

2019-09-16 Thread Maxim Parkachov
Hi Stephan,

sorry for the late answer, I didn't have access to the cluster.
Here are the log and stack trace.

Hope this helps,
Maxim.

-
2019-09-16 18:00:31,804 INFO
 org.apache.flink.runtime.entrypoint.ClusterEntrypoint -

2019-09-16 18:00:31,806 INFO
 org.apache.flink.runtime.entrypoint.ClusterEntrypoint -  Starting
YarnSessionClusterEntrypoint (Version: 1.9.0, Rev:9c32ed9, Date:19.08.2019
@ 16:16:55 UTC)
2019-09-16 18:00:31,806 INFO
 org.apache.flink.runtime.entrypoint.ClusterEntrypoint -  OS
current user: run
2019-09-16 18:00:32,285 INFO
 org.apache.flink.runtime.entrypoint.ClusterEntrypoint -  Current
Hadoop/Kerberos user: run
2019-09-16 18:00:32,285 INFO
 org.apache.flink.runtime.entrypoint.ClusterEntrypoint -  JVM: Java
HotSpot(TM) 64-Bit Server VM - Oracle Corporation - 1.8/25.152-b16
2019-09-16 18:00:32,285 INFO
 org.apache.flink.runtime.entrypoint.ClusterEntrypoint -  Maximum
heap size: 161 MiBytes
2019-09-16 18:00:32,285 INFO
 org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
 JAVA_HOME: /pb/apps/java/jdk1.8.0_152/
2019-09-16 18:00:32,286 INFO
 org.apache.flink.runtime.entrypoint.ClusterEntrypoint -  Hadoop
version: 2.7.0-mapr-1808
2019-09-16 18:00:32,286 INFO
 org.apache.flink.runtime.entrypoint.ClusterEntrypoint -  JVM
Options:
2019-09-16 18:00:32,286 INFO
 org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
-Xms168m
2019-09-16 18:00:32,286 INFO
 org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
-Xmx168m
2019-09-16 18:00:32,286 INFO
 org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
-Dlog.file=/opt/mapr/hadoop/hadoop-2.7.0/logs/userlogs/application_1568125115996_13508/container_e64_1568125115996_13508_01_01/jobmanager.log
2019-09-16 18:00:32,286 INFO
 org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
-Dlog4j.configuration=file:log4j.properties
2019-09-16 18:00:32,286 INFO
 org.apache.flink.runtime.entrypoint.ClusterEntrypoint -  Program
Arguments: (none)
2019-09-16 18:00:32,287 INFO
 org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
 Classpath:
log4j.properties:flink.jar:flink-conf.yaml::/opt/mapr/hadoop/hadoop-2.7.0/etc/hadoop:/opt/mapr/hadoop/hadoop-2.7.0/share/hadoop/common/hadoop-common-2.7.0-mapr-1808-tests.jar:/opt/mapr/hadoop/hadoop-2.7.0/share/hadoop/common/hadoop-common-2.7.0-mapr-1808.jar:/opt/mapr/hadoop/hadoop-2.7.0/share/hadoop/common/hadoop-nfs-2.7.0-mapr-1808.jar:/opt/mapr/hadoop/hadoop-2.7.0/share/hadoop/common/lib/jackson-databind-2.9.5.jar:/opt/mapr/hadoop/hadoop-2.7.0/share/hadoop/common/lib/activation-1.1.jar:/opt/mapr/hadoop/hadoop-2.7.0/share/hadoop/common/lib/jsr305-3.0.0.jar:/opt/mapr/hadoop/hadoop-2.7.0/share/hadoop/common/lib/apacheds-i18n-2.0.0-M15.jar:/opt/mapr/hadoop/hadoop-2.7.0/share/hadoop/common/lib/jersey-server-1.9.jar:/opt/mapr/hadoop/hadoop-2.7.0/share/hadoop/common/lib/apacheds-kerberos-codec-2.0.0-M15.jar:/opt/mapr/hadoop/hadoop-2.7.0/share/hadoop/common/lib/junit-4.11.jar:/opt/mapr/hadoop/hadoop-2.7.0/share/hadoop/common/lib/api-asn1-api-1.0.0-M20.jar:/opt/mapr/hadoop/hadoop-2.7.0/share/hadoop/common/lib/log4j-1.2.17.jar:/opt/mapr/hadoop/hadoop-2.7.0/share/hadoop/common/lib/api-util-1.0.0-M20.jar:/opt/mapr/hadoop/hadoop-2.7.0/share/hadoop/common/lib/asm-3.2.jar:/opt/mapr/hadoop/hadoop-2.7.0/share/hadoop/common/lib/maprdb-6.1.0-mapr.jar:/opt/mapr/hadoop/hadoop-2.7.0/share/hadoop/common/lib/avro-1.7.6.jar:/opt/mapr/hadoop/hadoop-2.7.0/share/hadoop/common/lib/mockito-all-1.8.5.jar:/opt/mapr/hadoop/hadoop-2.7.0/share/hadoop/common/lib/commons-beanutils-1.7.0.jar:/opt/mapr/hadoop/hadoop-2.7.0/share/hadoop/common/lib/jettison-1.1.jar:/opt/mapr/hadoop/hadoop-2.7.0/share/hadoop/common/lib/commons-beanutils-core-1.8.0.jar:/opt/mapr/hadoop/hadoop-2.7.0/share/hadoop/common/lib/mapr-spark-yarn-shuffle.jar:/opt/mapr/hadoop/hadoop-2.7.0/share/hadoop/common/lib/commons-cli-1.2.jar:/opt/mapr/hadoop/hadoop-2.7.0/share/hadoop/common/lib/netty-3.6.2.Final.jar:/opt/mapr/hadoop/hadoop-2.7.0/share/hadoop/common/lib/commons-codec-1.4.jar:/opt/mapr/hadoop/hadoop-2.7.0/share/hadoop/common/lib/jetty-6.1.26.jar:/opt/mapr/hadoop/hadoop-2.7.0/share/hadoop/common/lib/commons-collections-3.2.2.jar:/opt/mapr/hadoop/hadoop-2.7.0/share/hadoop/common/lib/xz-1.0.jar:/opt/mapr/hadoop/hadoop-2.7.0/share/hadoop/common/lib/commons-compress-1.4.1.jar:/opt/mapr/hadoop/hadoop-2.7.0/share/hadoop/common/lib/jetty-util-6.1.26.jar:/opt/mapr/hadoop/hadoop-2.7.0/share/hadoop/common/lib/commons-configuration-1.6.jar:/opt/mapr/hadoop/hadoop-2.7.0/share/hadoop/common/lib/json-smart-1.2.jar:/opt/mapr/hadoop/hadoop-2.7.0/share/hadoop/common/lib/commons-digester-1.8.jar:/opt/mapr/hadoop/hadoop-2.7.0/share/hadoop/common/lib/maprfs-6.1.0-mapr.jar:/opt/mapr

Re: Flink 1.9, MapR secure cluster, high availability

2019-08-30 Thread Maxim Parkachov
Hi Stephan,

With previous versions (I tried around 1.7) I always had to compile Flink with
the MapR Hadoop to get it working.
With 1.9 I took the Hadoop-free Flink, which worked with MapR FS until I
switched on HA.
So it is hard to say whether this is a regression or not.

The error happens when Flink tries to initialize the BLOB storage on MapR FS.
Without HA it takes the ZooKeeper from the classpath (the MapR
org.apache.zookeeper) and with HA it takes the shaded one.

After fixing a couple of issues with the pom, I was able to compile Flink with
the MapR ZooKeeper, and now when I start in HA mode it uses the shaded
ZooKeeper (which is now MapR) to initialize the BLOB storage and
org.apache.zookeeper (which is MapR as well) for HA recovery.

It works, but I was expecting it to work without compiling the MapR
dependencies.

Hope this helps,
Maxim.

On Thu, Aug 29, 2019 at 7:00 PM Stephan Ewen  wrote:

> Hi Maxim!
>
> The change of the MapR dependency should not have an impact on that.
> Do you know if the same thing worked in prior Flink versions? Is that a
> regression in 1.9?
>
> The exception that you report, is that from Flink's HA services trying to
> connect to ZK, or from the MapR FS client trying to connect to ZK?
>
> Best,
> Stephan
>
>
> On Tue, Aug 27, 2019 at 11:03 AM Maxim Parkachov 
> wrote:
>
>> Hi everyone,
>>
>> I'm testing release 1.9 on MapR secure cluster. I took flink binaries
>> from download page and trying to start Yarn session cluster. All MapR
>> specific libraries and configs are added according to documentation.
>>
>> When I start yarn-session without high availability, it uses zookeeper
>> from MapR distribution (org.apache.zookeeper) and correctly connects to
>> cluster and access to maprfs works as expected.
>>
>> But if I add zookeeper as high-avalability option, instead of MapR
>> zookeeper it tries to use shaded zookeeper and this one could not connect
>> with mapr credentials:
>>
>> 2019-08-27 10:42:45,240 ERROR 
>> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.client.ZooKeeperSaslClient
>>   - An error: (java.security.PrivilegedActionException: 
>> javax.security.sasl.SaslException: GSS initiate failed [Caused by 
>> GSSException: No valid credentials provided (Mechanism level: Failed to find 
>> any Kerberos tgt)]) occurred when evaluating Zookeeper Quorum Member's  
>> received SASL token. Zookeeper Client will go to AUTH_FAILED state.
>> 2019-08-27 10:42:45,240 ERROR 
>> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - SASL 
>> authentication with Zookeeper Quorum member failed: 
>> javax.security.sasl.SaslException: An error: 
>> (java.security.PrivilegedActionException: javax.security.sasl.SaslException: 
>> GSS initiate failed [Caused by GSSException: No valid credentials provided 
>> (Mechanism level: Failed to find any Kerberos tgt)]) occurred when 
>> evaluating Zookeeper Quorum Member's  received SASL token. Zookeeper Client 
>> will go to AUTH_FAILED state.
>> I tried to use separate zookeeper cluster for HA, but maprfs still doesn't 
>> work.
>>
>> Is this related to removal of MapR specific settings in Release 1.9 ?
>> Should I still compile custom version of Flink with MapR dependencies ?
>> (trying to do now, but getting some errors during compilation).
>>
>> Can I somehow force flink to use MapR zookeeper even with HA mode ?
>>
>> Thanks in advance,
>> Maxim.
>>
>>


Flink 1.9, MapR secure cluster, high availability

2019-08-27 Thread Maxim Parkachov
Hi everyone,

I'm testing release 1.9 on a secure MapR cluster. I took the Flink binaries
from the download page and am trying to start a YARN session cluster. All
MapR-specific libraries and configs are added according to the documentation.

When I start the yarn-session without high availability, it uses the ZooKeeper
from the MapR distribution (org.apache.zookeeper), correctly connects to the
cluster, and access to maprfs works as expected.

But if I enable the ZooKeeper high-availability option, instead of the MapR
ZooKeeper it tries to use the shaded ZooKeeper, and that one cannot connect
with the MapR credentials:

2019-08-27 10:42:45,240 ERROR
org.apache.flink.shaded.zookeeper.org.apache.zookeeper.client.ZooKeeperSaslClient
 - An error: (java.security.PrivilegedActionException:
javax.security.sasl.SaslException: GSS initiate failed [Caused by
GSSException: No valid credentials provided (Mechanism level: Failed
to find any Kerberos tgt)]) occurred when evaluating Zookeeper Quorum
Member's  received SASL token. Zookeeper Client will go to AUTH_FAILED
state.
2019-08-27 10:42:45,240 ERROR
org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  -
SASL authentication with Zookeeper Quorum member failed:
javax.security.sasl.SaslException: An error:
(java.security.PrivilegedActionException:
javax.security.sasl.SaslException: GSS initiate failed [Caused by
GSSException: No valid credentials provided (Mechanism level: Failed
to find any Kerberos tgt)]) occurred when evaluating Zookeeper Quorum
Member's  received SASL token. Zookeeper Client will go to AUTH_FAILED
state.
I tried to use a separate ZooKeeper cluster for HA, but maprfs still doesn't work.

Is this related to the removal of the MapR-specific settings in release 1.9?
Should I still compile a custom version of Flink with the MapR dependencies?
(I'm trying to do that now, but getting some errors during compilation.)

Can I somehow force Flink to use the MapR ZooKeeper even in HA mode?

Thanks in advance,
Maxim.


Re: Exactly-once semantics in Flink Kafka Producer

2019-08-02 Thread Maxim Parkachov
Hi Vasily,

as far as I know, by default the console consumer reads uncommitted.
Try setting isolation.level to read_committed in the console consumer
properties.
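
For example, something like this should do it (a sketch; I don't remember the
exact console-consumer flags offhand, but passing a properties file via
--consumer.config definitely works):

# consumer.properties
isolation.level=read_committed

kafka-console-consumer.sh --bootstrap-server <broker:9092> \
    --topic <output-topic> --consumer.config consumer.properties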

Hope this helps,
Maxim.

On Fri, Aug 2, 2019 at 12:42 PM Vasily Melnik <
vasily.mel...@glowbyteconsulting.com> wrote:

> Hi, Eduardo.
> Maybe i should describe experiment design  precisely :
> 1) I run Flink on YARN (YARN Session method).
> 2) I do not stop/cancell application, i just kill TaskManager process
> 3) After that YARN creates another TaskManager Process and auto checkpoint
> restore from HDFS happens.
>
> That's why i expect to see correct restore.
>
> Best regards,
> Vasily Melnik
>
> *Glow**Byte Consulting* <http://www.gbconsulting.ru/>
>
> ===
>
> Mobile: +7 (903) 101-43-71
> vasily.mel...@glowbyteconsulting.com
>
>
> On Fri, 2 Aug 2019 at 13:04, Eduardo Winpenny Tejedor <
> eduardo.winpe...@gmail.com> wrote:
>
>> Hi Vasily,
>>
>> You're probably executing this from your IDE or from a local Flink
>> cluster without starting your job from a checkpoint.
>>
>> When you start your Flink job for the second time you need to specify the
>> path to the latest checkpoint as an argument, otherwise Flink will start
>> from scratch.
>>
>> You're probably thinking that's not great, ideally Flink should be able
>> to automatically continue from the last produced checkpoint, and actually
>> that's what the docs say! Well, that's only when you're running in a proper
>> cluster environment. Flink is able to recover using checkpoints when only
>> part of the cluster fails, not when the whole job is stopped. For full
>> stops you need to specify the checkpoint manually.
>>
>> Hope that helps!
>>
>>
>> On Fri, 2 Aug 2019, 10:05 Vasily Melnik, <
>> vasily.mel...@glowbyteconsulting.com> wrote:
>>
>>> I,m trying to test Flink 1.8.0 exactly-once semantics with Kafka Source
>>> and Sink:
>>>
>>>1. Run flink app, simply transferring messages from one topic to
>>>another with parallelism=1, checkpoint interval 20 seconds
>>>2. Generate messages with incrementing integer numbers using Python
>>>script each 2 seconds.
>>>3. Read output topic with console consumer in read_committed
>>>isolation level.
>>>4. Manually kill TaskManager
>>>
>>> I expect to see monotonically increasing integers in output topic
>>> regardless TaskManager killing and recovery.
>>>
>>> But actually a see something unexpected in console-consumer output:
>>>
>>> 32
>>> 33
>>> 34
>>> 35
>>> 36
>>> 37
>>> 38
>>> 39
>>> 40
>>> -- TaskManager Killed
>>> 32
>>> 34
>>> 35
>>> 36
>>> 40
>>> 41
>>> 46
>>> 31
>>> 33
>>> 37
>>> 38
>>> 39
>>> 42
>>> 43
>>> 44
>>> 45
>>>
>>> Looks like all messages between checkpoints where replayed in output
>>> topic. Also i expected to see results in output topic only after
>>> checkpointing i.e. each 20 seconds, but messages appeared in output
>>> immediately as they where send to input.
>>> Is it supposed to be correct behaviour or i do something wrong?
>>>
>>> Kafka version 1.0.1 from Cloudera parcels. I tested Kafka transactional
>>> producer and read-committed console comsumer with custom code and it worked
>>> perfectly well reading messages only after commitTransaction on producer.
>>>
>>> My Flink code:
>>>
>>> StreamExecutionEnvironment env = 
>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>> env.getConfig().setAutoWatermarkInterval(1000);
>>> env.enableCheckpointing(2, CheckpointingMode.EXACTLY_ONCE);
>>> env.setStateBackend(new 
>>> RocksDBStateBackend("hdfs:///checkpoints-data"));
>>>
>>> Properties producerProperty = new Properties();
>>> producerProperty.setProperty("bootstrap.servers", ...);
>>> producerProperty.setProperty("zookeeper.connect", ...);
>>> 
>>> producerProperty.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,"1");
>>> 
>>> producerProperty.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"my-transaction");
>>> 
>>>

Re: Providing external files to flink classpath

2019-07-18 Thread Maxim Parkachov
Hi Vishwas,

it took me some time to find out as well. If you have your properties file
under lib/, the following will work:

val kafkaPropertiesInputStream =
getClass.getClassLoader.getResourceAsStream("lib/config/kafka.properties")
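
and then you can load it into a Properties object as usual, e.g. in Java
(just a sketch, same path as above; remember to null-check and close the
stream in real code):

InputStream in = getClass().getClassLoader()
        .getResourceAsStream("lib/config/kafka.properties");
Properties kafkaProps = new Properties();
kafkaProps.load(in);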

Hope this helps,
Maxim.

On Wed, Jul 17, 2019 at 7:23 PM Vishwas Siravara 
wrote:

> Does the -yt option work for standalone cluster without dedicated resource
> manager ? So this property file is read by one of the dependencies inside
> my application as a file, so I can't really use Parameter tool to parse the
> config file.
>
> Thanks,
> Vishwas
>
> On Fri, Jun 28, 2019 at 11:08 PM Yun Tang  wrote:
>
>> Hi Vishwas
>>
>>
>>1. You could use '-yt' to ship specified files to the class path,
>>please refer to [1] for more details.
>>2. If the properties are only loaded on client side before executing
>>the application, you could let your application to just read from local
>>property data. Flink support to load properties within the
>>ParameterTool [2].
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/cli.html#usage
>> [2]
>> https://github.com/apache/flink/blob/f1721293b0701d584d42bd68671181e332d2ad04/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java#L120
>>
>> Best
>> Yun Tang
>>
>> --
>> *From:* Vishwas Siravara 
>> *Sent:* Saturday, June 29, 2019 0:43
>> *To:* user
>> *Subject:* Providing external files to flink classpath
>>
>> Hi ,
>> I am trying to add external property files to the flink classpath for
>> my application. These files are not a part of the fat jar. I put them
>> under the lib folder but flink cant find them? How can I manage
>> external property files that needs to be read by flink ?
>>
>> Thanks,
>> Vishwas
>>
>


Re: yarn-session vs cluster per job for streaming jobs

2019-07-18 Thread Maxim Parkachov
Hi Haibo,

thanks for the tip, I almost forgot about max-attempts. I understand the
implications of running with one AM.

Maybe my question was imprecise, but what would be faster (with regard to the
downtime of each job):

1. In yarn-session mode: cancel all jobs with savepoints in parallel, restart
the yarn-session, then start all jobs from their savepoints in parallel.
2. In per-job mode: cancel all jobs with savepoints in parallel, then start
all jobs from their savepoints in parallel.

I want to optimise the standard situation where I deploy a new version of all
my jobs. My current impression is that jobs start faster in yarn-session mode.

Thanks,
Maxim.


On Thu, Jul 18, 2019 at 4:57 AM Haibo Sun  wrote:

> Hi, Maxim
>
> For the concern talking on the first point:
> If HA and checkpointing are enabled, AM (the application master, that is
> the job manager you said) will be restarted by YARN after it dies, and then
> the dispatcher will try to restore all the previously running jobs
> correctly. Note that the number of attempts be decided by the
> configurations "yarn.resourcemanager.am.max-attempts" and
> "yarn.application-attempts". The obvious difference between the session and
> per-job modes is that if a fatal error occurs on AM, it will affect all
> jobs running in it, while the per-job mode will only affect one job.
>
> You can look at this document to see how to configure HA for the Flink
> cluster on YARN:
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/jobmanager_high_availability.html#yarn-cluster-high-availability
>  .
>
> Best,
> Haibo
>
> At 2019-07-17 23:53:15, "Maxim Parkachov"  wrote:
>
> Hi,
>
> I'm looking for advice on how to run flink streaming jobs on Yarn cluster
> in production environment. I tried in testing environment both approaches
> with HA mode, namely yarn session + multiple jobs vs cluster per job, both
> seems to work for my cases, with slight preference of yarn session mode to
> centrally manage credentials. I'm looking to run about 10 streaming jobs
> mostly reading/writing from kafka + cassandra with following restictions:
> 1. yarn nodes will be hard rebooted quite often, roughly every 2 weeks. I
> have a concern here what happens when Job manager dies in session mode.
> 2. there are often network interruptions/slowdowns.
> 3. I'm trying to minimise time to restart job to have as much as possible
> continious processing.
>
> Thanks in advance,
> Maxim.
>
>


yarn-session vs cluster per job for streaming jobs

2019-07-17 Thread Maxim Parkachov
Hi,

I'm looking for advice on how to run Flink streaming jobs on a YARN cluster in
a production environment. In a test environment I tried both approaches with
HA mode, namely a yarn-session with multiple jobs vs. a cluster per job; both
seem to work for my cases, with a slight preference for yarn-session mode to
centrally manage credentials. I'm looking to run about 10 streaming jobs,
mostly reading/writing from Kafka and Cassandra, with the following
restrictions:
1. YARN nodes will be hard-rebooted quite often, roughly every 2 weeks. My
concern here is what happens when the job manager dies in session mode.
2. There are often network interruptions/slowdowns.
3. I'm trying to minimise the time to restart a job, to keep processing as
continuous as possible.

Thanks in advance,
Maxim.


Re: Automatic deployment of new version of streaming stateful job

2019-07-17 Thread Maxim Parkachov
Hi Marc,

thanks a lot for the tool. Unfortunately, I could not use it directly, but I
will take a couple of ideas from it and implement my own script.

Nevertheless, I'm really surprised that such functionality doesn't exist out
of the box.

Regards,
Maxim.

On Tue, Jul 16, 2019 at 9:22 AM Marc Rooding  wrote:

> Hi Maxim
>
> You could write a script yourself which triggers the cancel with savepoint
> and then starts a new version using the savepoint that was created during
> the cancel.
>
> However, I’ve built a tool that allows you to perform these steps more
> easily: https://github.com/ing-bank/flink-deployer. The deployer will
> allow you to deploy or upgrade your jobs. All you need to do is integrate
> it into your CI/CD.
>
> Kind regards
>
> Marc
> On 16 Jul 2019, 02:46 +0200, Maxim Parkachov ,
> wrote:
>
> Hi,
>
> I'm trying to bring my first stateful streaming Flink job to production
> and have trouble understanding how to integrate it with CI/CD pipeline. I
> can cancel the job with savepoint, but in order to start new version of
> application I need to specify savepoint path manually ?
>
> So, basically my question, what is best practice of automatically
> restarting or deploying new version of stateful streaming application ?
> Every tip is greatly appreciated.
>
> Thanks,
> Maxim.
>
>


Automatic deployment of new version of streaming stateful job

2019-07-15 Thread Maxim Parkachov
Hi,

I'm trying to bring my first stateful streaming Flink job to production and
have trouble understanding how to integrate it with a CI/CD pipeline. I can
cancel the job with a savepoint, but in order to start the new version of the
application I need to specify the savepoint path manually?

So, basically, my question is: what is the best practice for automatically
restarting or deploying a new version of a stateful streaming application?
Every tip is greatly appreciated.

Thanks,
Maxim.


Re: How autoscaling works on Kinesis Data Analytics for Java ?

2019-04-22 Thread Maxim Parkachov
Hi,

Answering myself in case someone else is interested as well. As per
https://aws.amazon.com/blogs/big-data/build-and-run-streaming-applications-with-apache-flink-and-amazon-kinesis-data-analytics-for-java-applications/
it does the autoscaling itself, but in order to change parallelism it takes a
snapshot and restarts the streaming job, so there is no magic here, but it is
nicely automated.

Regards,
Maxim.

On Tue, Jan 29, 2019 at 5:23 AM Maxim Parkachov 
wrote:

> Hi,
>
> I had impression, that in order to change parallelism, one need to stop
> Flink streaming job and re-start with new settings.
>
> According to
> https://docs.aws.amazon.com/kinesisanalytics/latest/java/how-scaling.html
> auto-scaling works out of the box. Could someone with experience of running
> Flink on AWS Data Analytics for Java give a short explanation ?
>
> Thanks in advance.
> Maxim.
>


How autoscaling works on Kinesis Data Analytics for Java ?

2019-01-28 Thread Maxim Parkachov
Hi,

I had the impression that, in order to change parallelism, one needs to stop
the Flink streaming job and restart it with new settings.

According to
https://docs.aws.amazon.com/kinesisanalytics/latest/java/how-scaling.html
auto-scaling works out of the box. Could someone with experience of running
Flink on AWS Kinesis Data Analytics for Java give a short explanation?

Thanks in advance.
Maxim.


Re: Ship compiled code with broadcast stream ?

2018-10-09 Thread Maxim Parkachov
Hi,

This is certainly possible. What you can do is use a
> BroadcastProcessFunction where you receive the rule code on the broadcast
> side.
>

Yes, this part works, no problem.


> You probably cannot send newly compiled objects this way but what you can
> do is either send a reference to some compiled jars and load them with the
> URLClassloader or send the actual String code and invoke the java compiler
> from your function.
>

Compiling from source code... actually, I can implement the rules in Nashorn
JavaScript, but I'm pretty sure I'll get a not-serializable exception; I'll
check this.
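
What I might try first is to ship only the JavaScript source as a String and
(re)compile it locally inside the operator, so nothing non-serializable ever
travels over the broadcast stream. A rough, untested sketch (the class name,
the expected apply(event) function and the wiring into the process function
are all made up):

import javax.script.Invocable;
import javax.script.ScriptEngine;
import javax.script.ScriptEngineManager;

public class JsRuleEvaluator {

    // recreated on each task after (re)start, never serialized
    private transient ScriptEngine engine;

    public void updateRule(String ruleSource) throws Exception {
        if (engine == null) {
            engine = new ScriptEngineManager().getEngineByName("nashorn");
        }
        engine.eval(ruleSource);   // the script is expected to define apply(event)
    }

    public Object evaluate(Object event) throws Exception {
        return ((Invocable) engine).invokeFunction("apply", event);
    }
}

In the coProcessFunction this would live as a transient field; the control
stream only carries the rule source String, which is also what goes into state.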

Referencing an existing jar will not solve the problem, as I would need to
re-submit the job, which is what I want to avoid in the first place. I
actually wanted exactly the first scenario: sending newly compiled objects.

Regards,
Maxim.

>


Ship compiled code with broadcast stream ?

2018-10-09 Thread Maxim Parkachov
Hi everyone,

I have a job with an event stream and a control stream delivering rules for
event transformation. The rules are broadcast and used in a flatMap-like
CoProcessFunction. The rules are defined in a custom JSON format, and their
number and complexity rise significantly with every new feature.

What I would like is to ship compiled (serialized?) code instead of JSON rules
on the control stream and use these compiled classes directly, without
additional transformation. This would allow more robust testing and much more
complex rules. But I'm struggling to understand how to achieve this.

Did someone implement a system like this? Is this possible at all?

Any help is greatly appreciated,
Maxim.


Multiple Async IO

2018-04-03 Thread Maxim Parkachov
Hi everyone,

I'm writing a streaming job which needs to query Cassandra multiple times for
each event, around 20 queries. I would like to use Async I/O for that, but I
am not sure which option to choose:

1. Implement one AsyncFunction with 20 queries inside
2. Implement 20 AsyncFunctions, each with 1 query inside

Take into account that each event needs all queries; reducing the number of
queries per record is not an option.

In this case I would like to minimise the processing time per event, even if
throughput suffers. Any advice or consideration is greatly appreciated.
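
To make option 1 a bit more concrete, this is roughly what I have in mind
(untested sketch; Event, EnrichedEvent, Row, AsyncCassandraClient, queriesFor
and enrich are made-up placeholders, and the callback type is called
ResultFuture only in newer Flink versions):

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class MultiLookupFunction extends RichAsyncFunction<Event, EnrichedEvent> {

    // hypothetical non-blocking Cassandra client, created in open() (omitted)
    private transient AsyncCassandraClient client;

    @Override
    public void asyncInvoke(Event event, ResultFuture<EnrichedEvent> resultFuture) {
        // fire all ~20 lookups at once, each returning a CompletableFuture<Row>
        List<CompletableFuture<Row>> lookups = new ArrayList<>();
        for (String query : queriesFor(event)) {   // queriesFor(...) is a placeholder
            lookups.add(client.queryAsync(query));
        }
        // complete the Flink future only when every lookup has finished
        CompletableFuture.allOf(lookups.toArray(new CompletableFuture[0]))
            .whenComplete((ignored, error) -> {
                if (error != null) {
                    resultFuture.completeExceptionally(error);
                } else {
                    // enrich(...) is a placeholder combining the rows into the output
                    resultFuture.complete(
                        Collections.singleton(enrich(event, lookups)));
                }
            });
    }
}

// used e.g. as: AsyncDataStream.unorderedWait(events, new MultiLookupFunction(),
//                                             1, TimeUnit.SECONDS, 100);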

Thanks,
Maxim.


Re: Forcing consuming one stream completely prior to another starting

2018-01-20 Thread Maxim Parkachov
Hi Ron,

I’m joining two streams - one is a “decoration” stream that we have in a
> compacted Kafka topic, produced using a view on a MySQL table AND using
> Kafka Connect; the other is the “event data” we want to decorate, coming in
> over time via Kafka. These streams are keyed the same way - via an “id”
> field, and we join them using CoFlatMap that attaches the data from the
> decoration stream to the event data and publishes downstream.
>
> What I’d like to do in my CoFlatMap is wait to process the event data
> until we’ve received the corresponding decoration data. How do I make that
> happen?
>
>
I have exactly the same scenario and researched this. Basically, what we
need is
https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API,
but unfortunately it seems like nobody is actively working on it. You could
check the ideas there though.

Side inputs seem to be implemented in the Beam API and should work on the Beam
Flink runner, but I didn't try it for several reasons.

I ended up with the following solution:

- in the script starting the job, I create a stop file on a shared filesystem
(HDFS)
- I created 2 SourceFunctions extending the Kafka source
- in the source function for the "decoration" stream, in the run method, I
consume all records from the compacted topic. The tricky part is how to
identify when everything has been consumed. I resolved it by reading the Kafka
end offsets directly with the Kafka admin API and checking whether I have
arrived at those offsets. After waiting a bit, to make sure the events are
propagated to the next operator, I delete the stop file on the shared file
system
- in the source function for the event data, I implemented the "open" method
to wait until the stop file is deleted. This keeps it from consuming event
data too early (see the sketch further below)
- in the pipeline I broadcast the "decoration" events and use a
CoProcessFunction to store them in state and enrich the main event stream.

The application is not in production yet as I need to do more testing, but
it seems to work.
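
The waiting part in the event-data source is trivial; a rough sketch
(untested; the stop file path, the polling interval and the Kafka consumer
version are arbitrary, and I use the Hadoop FileSystem API for HDFS):

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class GatedKafkaSource extends FlinkKafkaConsumer011<String> {

    private static final Path STOP_FILE = new Path("/flink/locks/dimension-loading");

    public GatedKafkaSource(String topic, Properties props) {
        super(topic, new SimpleStringSchema(), props);
    }

    @Override
    public void open(Configuration configuration) throws Exception {
        // block until the dimension source has removed the stop file
        FileSystem fs = FileSystem.get(STOP_FILE.toUri(),
                new org.apache.hadoop.conf.Configuration());
        while (fs.exists(STOP_FILE)) {
            Thread.sleep(1000);
        }
        super.open(configuration);
    }
}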

Additionally, I tried to cache the decoration data in the state of the source
function to recover from a checkpoint more easily, but I'm still not sure
whether it's better to read it from the compacted topic every time, to keep an
additional cache in the source function, or whether the state in the
CoProcessFunction is enough.

Hope this helps; I would be interested to hear about your experience.

Regards,
Maxim.


Fwd: Initialise side input state

2017-11-03 Thread Maxim Parkachov
Hi Xingcan,

On Fri, Nov 3, 2017 at 3:38 AM, Xingcan Cui <xingc...@gmail.com> wrote:

> Hi Maxim,
>
> if I understand correctly, you actually need to JOIN the fast stream with
> the slow stream. Could you please share more details about your problem?
>

Sure, I can explain more with a small pseudo-code example. I have an external
DB with a price list with the following structure:

case class PriceList(productId, price)

My events are purchase events with following structure:

case class Purchase(productId, amount)

I would like to get final stream with TotalAmount = Amount*Price in
structure like this:

case class PurchaseTotal(productId, totalAmount)

I have 2 corresponding input streams:

val prices = env.addSource(new PriceListSource).keyBy(_.productId)
val purchases = env.addSource(new PurchaseSource).keyBy(_.productId)

PriceListSource delivers all CHANGES to the external DB table.

The calculate function looks similar to this:

class CalculateFunction extends CoProcessFunction[Purchase, PriceList, PurchaseTotal] {

  // keyed state: latest known price for the current productId
  private var price: ValueState[Int] = _

  override def open(parameters: Configuration): Unit = {
    price = getRuntimeContext.getState(
      new ValueStateDescriptor[Int]("price", classOf[Int]))
  }

  override def processElement1(
      purchase: Purchase,
      ctx: CoProcessFunction[Purchase, PriceList, PurchaseTotal]#Context,
      out: Collector[PurchaseTotal]): Unit = {
    // NOTE: price has no value for this key until a PriceList change event has arrived
    out.collect(PurchaseTotal(purchase.productId, purchase.amount * price.value()))
  }

  override def processElement2(
      priceList: PriceList,
      ctx: CoProcessFunction[Purchase, PriceList, PurchaseTotal]#Context,
      out: Collector[PurchaseTotal]): Unit = {
    price.update(priceList.price)
  }
}

And finally the pipeline:

purchases.connect(prices).process(new CalculateFunction).print

The issue is that when I start the program my price ValueState is empty, and
it only gets populated for products whose price is actually updated in the DB
after the job starts; prices that never change will never arrive.
BTW, I cannot use Async I/O to query the DB, because of several technical
restrictions.

1. When you mentioned "they have the same key", did you mean all the data
> get the same key or the logic should be applied with fast.key = slow.key?
>

I meant that the productId of a purchase event definitely exists in the
external price list DB (so it is a kind of inner join).


> 2. What should be done to initialize the state?
>

I need to read the external DB table and populate the price ValueState before
processing the first purchase event.

Hope this minimal example helps to make it clear.
Maxim.


>
> Best,
> Xingcan
>
>
> On Fri, Nov 3, 2017 at 5:54 AM, Maxim Parkachov <lazy.gop...@gmail.com>
> wrote:
>
>> Hi Flink users,
>>
>> I'm struggling with some basic concept and would appreciate some help. I
>> have 2 Input streams, one is fast event stream and one is slow changing
>> dimension. They have the same key and I use CoProcessFunction to store
>> slow data in state and enrich fast data from this state. Everything
>> works as expected.
>>
>> Before I start processing fast streams on first run, I would like to 
>> completely
>> initialise state. I though it could be done in open(), but I don't
>> understand how it will be re-distributed across parallel operators.
>>
>> Another alternative would be to create custom source and push all slow 
>> dimension
>> data downstream, but I could not find how to hold processing fast data
>> until state is initialised.
>>
>> I realise that FLIP-17 (Side Inputs) is what I need, but is there some other
>> way to implement it now ?
>>
>> Thanks,
>> Maxim.
>>
>>
>


Initialise side input state

2017-11-02 Thread Maxim Parkachov
Hi Flink users,

I'm struggling with a basic concept and would appreciate some help. I have 2
input streams: one is a fast event stream and one is a slowly changing
dimension. They have the same key, and I use a CoProcessFunction to store the
slow data in state and enrich the fast data from this state. Everything works
as expected.

Before I start processing the fast stream on the first run, I would like to
completely initialise the state. I thought it could be done in open(), but I
don't understand how it would be re-distributed across the parallel operators.

Another alternative would be to create a custom source and push all the slow
dimension data downstream, but I could not find a way to hold off processing
the fast data until the state is initialised.

I realise that FLIP-17 (Side Inputs) is what I need, but is there some other
way to implement this now?

Thanks,
Maxim.


Control triggering on empty window

2016-04-20 Thread Maxim
I have the following use case:

An input stream of timestamped "on" and "off" events received out of order.
Every 15 minutes I need to produce an event with the time the system was "on".
Events should be produced only for intervals in which the system was "on".

When a 15-minute window has at least one record it is triggered and the
required aggregate is created, but when no event is received within a
15-minute period the window is not triggered and nothing is produced.

I understand that it is not feasible to trigger on empty windows when the set
of keys is unbounded. But it would be nice to give control over such
triggering to the window function. In my case the window function could enable
empty-window triggering for the current key when the last event in the
evaluated window is "on" and disable it if it is "off".
A strawman API for such a feature:

public void apply(String key, TimeWindow window, Iterable input,
        Collector out) throws Exception {
    ...
    RuntimeContext context = this.getRuntimeContext();
    if (lastEvent.isOn()) {
        context.enableEmptyWindowTriggering();
    } else {
        context.disableEmptyWindowTriggering();
    }
}

I could implement the same logic using a global window with a custom trigger
and evictor, but it looks like an ugly workaround to me.
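
For illustration, this is roughly the trigger part of that workaround as I
picture it (untested sketch against a newer Trigger API; Event is a
placeholder type, and the evictor/state cleanup side is left out, so elements
in the global window would grow without bound):

import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

public class FifteenMinuteTrigger extends Trigger<Event, GlobalWindow> {

    private static final long INTERVAL = 15 * 60 * 1000L;

    @Override
    public TriggerResult onElement(Event element, long timestamp,
            GlobalWindow window, TriggerContext ctx) throws Exception {
        // make sure a timer exists for the next 15-minute boundary
        ctx.registerEventTimeTimer(timestamp - (timestamp % INTERVAL) + INTERVAL);
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, GlobalWindow window,
            TriggerContext ctx) throws Exception {
        // fire every 15 minutes and schedule the next firing; the window
        // function then checks the last on/off event and decides whether
        // to emit anything at all
        ctx.registerEventTimeTimer(time + INTERVAL);
        return TriggerResult.FIRE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, GlobalWindow window,
            TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {
        // delete timers / per-key trigger state here
    }
}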

Is there any better way to solve this use case?

Thanks,

Maxim.


Task Slots and Heterogeneous Tasks

2016-04-14 Thread Maxim
I'm trying to understand the behavior of Flink in the case of heterogeneous
operations. For example, in our pipelines some operations might accumulate
large windows while others perform high-latency calls to external services.
Obviously the former need task slots with a large memory allocation, while the
latter need little memory but a high degree of parallelism.

Is there any way to have different slot types and to control the allocation of
operations to them? Or maybe there is another way to ensure good hardware
utilization?

Also, from the documentation it is not clear whether the memory of a
TaskManager is shared across all tasks running on it or whether each task gets
its own quota. Could you clarify this?

Thanks,

Maxim.


Re: How to perform this join operation?

2016-04-13 Thread Maxim
You could simulate the Samza approach by having a RichFlatMapFunction over the
co-grouped streams that maintains the sliding window in its ListState. As I
understand it, the drawback is that the list state is not kept in managed
memory.
I'm interested to hear about the right way to implement this.
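
A rough, untested sketch of what I mean, using connected keyed streams and a
RichCoFlatMapFunction on a recent Flink version (the Tuple2/Tuple3 record
types, the class name and the 24h retention are made up; since both inputs are
keyed by the join attribute, everything in this key's state already satisfies
x = z):

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;

public class BufferedJoin
        extends RichCoFlatMapFunction<Tuple2<String, String>, String, Tuple2<String, String>> {

    private static final long WINDOW_MS = 24 * 60 * 60 * 1000L;   // 24h retention

    private transient ListState<Tuple3<Long, String, String>> buffer;

    @Override
    public void open(Configuration parameters) {
        buffer = getRuntimeContext().getListState(
            new ListStateDescriptor<Tuple3<Long, String, String>>("buffer",
                TypeInformation.of(new TypeHint<Tuple3<Long, String, String>>() {})));
    }

    @Override
    public void flatMap1(Tuple2<String, String> xy, Collector<Tuple2<String, String>> out)
            throws Exception {
        // buffer (x, y) with an arrival timestamp; pruning of expired entries omitted
        buffer.add(Tuple3.of(System.currentTimeMillis(), xy.f0, xy.f1));
    }

    @Override
    public void flatMap2(String z, Collector<Tuple2<String, String>> out) throws Exception {
        long cutoff = System.currentTimeMillis() - WINDOW_MS;
        for (Tuple3<Long, String, String> e : buffer.get()) {
            if (e.f0 >= cutoff) {
                out.collect(Tuple2.of(e.f1, e.f2));   // emit the matching (x, y)
            }
        }
    }
}

// wired up e.g. as:
// xy.keyBy(t -> t.f0).connect(z.keyBy(v -> v)).flatMap(new BufferedJoin())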

On Wed, Apr 13, 2016 at 3:53 PM, Elias Levy 
wrote:

> I am wondering how you would implement the following function in Flink.
> The function takes as input two streams.  One stream can be viewed a a
> tuple with two value *(x, y)*, the second stream is a stream of
> individual values *z*.  The function keeps a time based window on the
> first input (say, 24 hours).  Whenever it receives an element from the
> second stream, it compares the value *z* against the *x* element of each
> tuple in the window, and for each match it emits *(x, y)*.  You are
> basically doing a join on *x=z*.  Note that values from the second stream
> are not windowed and they are only matched to values from the first stream
> with an earlier timestamps.
>
> This was relatively easy to implement in Samza.  Consume off two topics,
> the first keyed by *x* and the second by *z*.  Consume both topics in a
> job.  Messages with the same key would be consumed by the same task.  The
> task could maintain a window of messages from the first stream in its local
> state,  Whenever a message came in via the second stream, it could look up
> in the local state for matching messages, and if it found any, send them to
> the output stream.  Obviously, with Samza you don't have the luxury of the
> system handling event time for you, but this work well and it is easy to
> implement.
>
> I am not clear how this would be implemented in Flink.
>
> It is easy enough to partition by key either stream, and to window the
> first stream using a sliding window, but from there out things get
> complicated.
>
> You can join the two streams by key, but you must do so using the same
> window for both streams.  That means events from the first stream may be
> matched to older events of the second stream, which is not what we want.  I
> suppose if both included a timestamp, you could later add a filter to
> remove such events from the merged stream.  But you would also have to deal
> with duplicates, as the window is a sliding window and the same two
> elements may match across all window panes that contain the matching
> elements.  So you need to dedup as well.
>
> coGroup seems like it would suffer from the same issues.
>
> Maybe the answer is connected streams, but there is scant documentation on
> the semantics of ConnectedStreams.  There isn't even an example that I
> could find that makes use of them.
>
> Thoughts?
>
>
>
>
>


Re: RocksDB Statebackend

2016-04-12 Thread Maxim
Is it possible to add an option to store the state in a Java HashMap and write
its content to RocksDB when checkpointing? For "hot" keys that are updated
very frequently, such an optimization would help with performance.

I know that you are also working on incremental checkpoints, which would also
be a big win for jobs with a large number of keys.

Thanks,

Maxim.

On Tue, Apr 12, 2016 at 10:39 AM, Stephan Ewen <se...@apache.org> wrote:

> Concerning the size of RocksDB snapshots - I am wondering if RocksDB
> simply does not compact for a long time, thus having a lot of stale data in
> the snapshot.
>
> That would be especially the case, if you have a lot of changing values
> for the same set of keys.
>
> On Tue, Apr 12, 2016 at 6:41 PM, Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
>> Hi,
>> I'm going to try and respond to each point:
>>
>> 1. This seems strange, could you give some background on parallelism,
>> number of operators with state and so on? Also, I'm assuming you are using
>> the partitioned state abstraction, i.e. getState(), correct?
>>
>> 2. your observations are pretty much correct. The reason why RocksDB is
>> slower is that the FsStateBackend basically stores the state in a Java
>> HashMap and writes the contents to HDFS when checkpointing. RocksDB stores
>> data in on-disk files and goes to them for every state access (of course
>> there are caches, but generally it is like this). I'm actually impressed
>> that it is still this fast in comparison.
>>
>> 3. see 1. (I think for now)
>>
>> 4. The checkpointing time is the time from the JobManager deciding to
>> start a checkpoint until all tasks have confirmed that checkpoint. I have
>> seen this before and I think it results from back pressure. The problem is
>> that the checkpoint messages that we sent through the topology are sitting
>> at the sources because they are also back pressured by the slow processing
>> of normal records. You should be able to see the actual checkpointing times
>> (both synchronous and asynchronous) in the log files of the task managers,
>> they should be very much lower.
>>
>> I can go into details, I'm just writing this quickly before calling it a
>> day. :-)
>>
>> Cheers,
>> Aljoscha
>>
>> On Tue, 12 Apr 2016 at 18:21 Konstantin Knauf <
>> konstantin.kn...@tngtech.com> wrote:
>>
>>> Hi everyone,
>>>
>>> my experience with RocksDBStatebackend have left me a little bit
>>> confused. Maybe you guys can confirm that my epxierence is the expected
>>> behaviour ;):
>>>
>>> I have run a "performancetest" twice, once with FsStateBackend and once
>>> RocksDBStatebackend in comparison. In this particular test the state
>>> saved is generally not large (in a production scenario it will be
>>> larger.)
>>>
>>> These are my observations:
>>>
>>> 1. Minimal Checkpoint Size (no records) with RocksDB was 33MB compared
>>> to <<1MB with the FSStatebackend.
>>>
>>> 2. Throughput dropped from 28k/s -> 18k/s on a small cluster.
>>>
>>> 3. Checkpoint sizes as reported in the Dashboard was ca. 1MB for
>>> FsStatebackend but >100MB for RocksDbStatebackend. I hope the difference
>>> gets smaller for very large state. Can you confirm?
>>>
>>> 4. Checkpointing Times as reported in the Dashboard were 26secs for
>>> RocksDB during the test and <1 second for FsStatebackend. Does the
>>> reported time correspond to the sync. + asynchronous part of the
>>> checkpointing in case of RocksDB? Is there any way to tell how long the
>>> synchronous part takes?
>>>
>>> Form these first observations RocksDB does seem to bring a large
>>> overhead for state < 1GB, I guess? Is this expected?
>>>
>>> Cheers,
>>>
>>> Konstantin
>>>
>>
>