Re: Maybe a flink bug. Job keeps in FAILING state

2019-06-24 Thread zhijiang
Thanks for opening this ticket; I will watch it.

Flink does not handle OOM issues specially. I remember we discussed a similar
issue before, but I forget what the conclusion was or whether there were other
concerns about it.
I am not sure whether it is worth fixing at the moment; maybe Till or Chesnay
could give a final decision.

Best,
Zhijiang
--
From:Joshua Fan 
Send Time: Tuesday, June 25, 2019 11:10
To:zhijiang 
Cc:Chesnay Schepler ; user ; Till 
Rohrmann 
Subject:Re: Maybe a flink bug. Job keeps in FAILING state

Hi Zhijiang

Thank you for your analysis. I agree with it. The solution may be to let the TM 
exit, as you mentioned, when any type of OOM occurs, because Flink has no 
control over a TM once an OOM occurs.

I filed a JIRA ticket earlier: https://issues.apache.org/jira/browse/FLINK-12889.

I don't know whether it is worth fixing.

Thank you all.

Yours sincerely
Joshua
On Fri, Jun 21, 2019 at 5:32 PM zhijiang  wrote:
Thanks for the reminder, @Chesnay Schepler.

I just looked through the related logs. Actually, none of the five "Source: 
ServiceLog" tasks are in a terminal state in the JM view; the relevant process 
is as follows:

1. The checkpoint in the task causes the OOM issue, which calls 
`Task#failExternally` as a result; we can see the log "Attempting to fail 
task externally" in the TM.
2. The source task transitions from RUNNING to FAILED and then starts 
a canceler thread for canceling the task; we can see the log "Triggering cancellation 
of task" in the TM.
3. When the JM starts to cancel the source tasks, the RPC call 
`Task#cancelExecution` finds that the task is already in FAILED state from 
step 2; we can see the log "Attempting to cancel task" in the TM.

In the end, none of the five source tasks reach a terminal state in the JM log. My 
guess is that step 2 might not create the canceler thread successfully: the 
root failover was caused by an OOM while creating a native thread in step 1, so 
it is possible that creating the canceler thread also fails in this unstable OOM 
situation. If so, the source task would never be interrupted and would not 
report back to the JM, even though its state had already been changed to 
FAILED.
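
To illustrate the suspected failure mode, here is a minimal Java sketch (not the 
actual Flink code; cancelTask() is a hypothetical placeholder for the cancellation logic):

try {
    // spawning the canceler thread can itself fail when the JVM is out of native threads
    new Thread(() -> cancelTask()).start();
} catch (OutOfMemoryError e) {
    // "unable to create new native thread": the task state was already set to FAILED,
    // but no cancellation runs and no terminal state is ever reported to the JM
}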

For the other vertex tasks, `Task#failExternally` is not triggered in step 1; 
they only receive the cancel RPC from the JM in step 3. I guess that by this time, 
later than the source failure, the canceler threads could be created successfully 
after some GCs, so these tasks could be canceled and reported to the JM side.

I think the key problem is that under an OOM condition some behaviors are not 
within expectations, which can cause problems. Maybe we should handle OOM errors 
in an extreme way, such as making the TM exit, to solve the potential issue.
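
A minimal sketch of that "fail fast" direction (an assumption of how it could look, not 
Flink's actual implementation; if I remember correctly there is also a related option 
along the lines of taskmanager.jvm-exit-on-oom):

public static void handleTaskError(Throwable t) {
    if (t instanceof OutOfMemoryError) {
        // after an OOM the JVM is in an unreliable state: terminate the TM process
        // immediately so the cluster framework can restart a fresh TaskManager
        Runtime.getRuntime().halt(-1);
    }
    // ... regular failover handling for other exceptions ...
}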

Best,
Zhijiang
--
From:Chesnay Schepler 
Send Time: Friday, June 21, 2019 16:34
To:zhijiang ; Joshua Fan 
Cc:user ; Till Rohrmann 
Subject:Re: Maybe a flink bug. Job keeps in FAILING state

The logs are attached to the initial mail.

 Echoing my thoughts from earlier: from the logs it looks as if the TM never 
even submits the terminal state RPC calls for several tasks to the JM.

 On 21/06/2019 10:30, zhijiang wrote:
Hi Joshua,

If the tasks (subtask 1/5, subtask 2/5, subtask 3/5, subtask 5/5) were really in 
CANCELED state on the TM side but in CANCELING state on the JM side, then it 
might indicate that the terminal-state RPC was not received by the JM. I am not 
sure whether the OOM could cause this unexpected behavior.

In addition, you mentioned these tasks were still active after the OOM and after 
they were asked to cancel, so I am not sure which period your attached TM stack 
covers. It would help if you could provide the corresponding TM log and JM log. 
From the TM log it is easy to check the tasks' final states. 

Best,
Zhijiang
--
From:Joshua Fan 
Send Time: Thursday, June 20, 2019 11:55
To:zhijiang 
Cc:user ; Till Rohrmann ; Chesnay 
Schepler 
Subject:Re: Maybe a flink bug. Job keeps in FAILING state

zhijiang 

I did not capture the job UI. The topology is in FAILING state, but the 
persistentbolt subtasks, as can be seen in the picture attached to the first mail, 
were all canceled. The parsebolt subtasks, as described before, had one 
subtask FAILED and the other subtasks CANCELED, but the source subtasks had one 
subtask (subtask 4/5) CANCELED and the other subtasks (subtask 1/5, subtask 
2/5, subtask 3/5, subtask 5/5) CANCELING, i.e. not in a terminal state.

The subtask status described above is the JM view, but in the TM view all of the 
source subtasks were FAILED; I do not know why the JM was not notified about this.

As all of the failed statuses were triggered by an OOM (the subtask could not 
create a native thread when checkpointing), I also dumped the stack of the JVM. 
It shows the four subtasks (subtask 1/5, subtask 2/5, subtask 3/5, subtask 5/5) 
are still active after they threw an OOM and were asked to cancel. I attached 
the jstack file in this email.
Re: Maybe a flink bug. Job keeps in FAILING state

2019-06-24 Thread Joshua Fan
Hi Zhijiang

Thank you for your analysis. I agree with it. The solution may be to let the TM
exit, as you mentioned, when any type of OOM occurs, because Flink has
no control over a TM once an OOM occurs.

I filed a JIRA ticket earlier: https://issues.apache.org/jira/browse/FLINK-12889.

I don't know whether it is worth fixing.

Thank you all.

Yours sincerely
Joshua

On Fri, Jun 21, 2019 at 5:32 PM zhijiang  wrote:

> Thanks for the reminding @Chesnay Schepler .
>
> I just looked throught the related logs. Actually all the five
> "Source: ServiceLog" tasks are not in terminal state on JM view, the
> relevant processes are as follows:
>
> 1. The checkpoint in task causes OOM issue which would call
> `Task#failExternally` as a result, we could see the log "Attempting to
> fail task externally" in tm.
> 2. The source task would transform state from RUNNING to FAILED and then
> starts a canceler thread for canceling task, we could see log "Triggering
> cancellation of task" in tm.
> 3. When JM starts to cancel the source tasks, the rpc call
> `Task#cancelExecution` would find the task was already in FAILED state as
> above step 2, we could see log "Attempting to cancel task" in tm.
>
> At last all the five source tasks are not in terminal states from jm log,
> I guess the step 2 might not create canceler thread successfully, because
> the root failover was caused by OOM during creating native thread in step1,
> so it might exist possibilities that createing canceler thread is not
> successful as well in OOM case which is unstable. If so, the source task
> would not been interrupted at all, then it would not report to JM as well,
> but the state is already changed to FAILED before.
>
> For the other vertex tasks, it does not trigger `Task#failExternally` in
> step 1, and only receives the cancel rpc from JM in step 3. And I guess at
> this time later than the source period, the canceler thread could be
> created succesfully after some GCs, then these tasks could be canceled as
> reported to JM side.
>
> I think the key problem is under OOM case some behaviors are not within
> expectations, so it might bring problems. Maybe we should handle OOM error
> in extreme way like making TM exit to solve the potential issue.
>
> Best,
> Zhijiang
>
> --
> From:Chesnay Schepler 
> Send Time:2019年6月21日(星期五) 16:34
> To:zhijiang ; Joshua Fan <
> joshuafat...@gmail.com>
> Cc:user ; Till Rohrmann 
> Subject:Re: Maybe a flink bug. Job keeps in FAILING state
>
> The logs are attached to the initial mail.
>
> Echoing my thoughts from earlier: from the logs it looks as if the TM
> never even submits the terminal state RPC calls for several tasks to the JM.
>
> On 21/06/2019 10:30, zhijiang wrote:
> Hi Joshua,
>
> If the tasks(subtask 1/5,subtask 2/5,subtask 3/5,subtask 5/5) were really
> in CANCELED state on TM side, but in CANCELING state on JM side, then it
> might indicates the terminal state RPC was not received by JM. I am not
> sure whether the OOM would cause this issue happen resulting in unexpected
> behavior.
>
> In addition, you mentioned these tasks are still active after OOM and was
> called to cancel, so I am not sure what is the specific periods for your
> attached TM stack. I think it might provide help if you could provide
> corresponding TM log and JM log.
> From TM log it is easy to check the task final state.
>
> Best,
> Zhijiang
> --
> From:Joshua Fan  
> Send Time:2019年6月20日(星期四) 11:55
> To:zhijiang  
> Cc:user  ; Till Rohrmann
>  ; Chesnay Schepler
>  
> Subject:Re: Maybe a flink bug. Job keeps in FAILING state
>
> zhijiang
>
> I did not capture the job ui, the topology is in FAILING state, but the
> persistentbolt subtasks as can be seen in the picture attached in first
> mail was all canceled, and the parsebolt subtasks as described before had
> one subtask FAILED, other subtasks CANCELED, but the source subtasks had
> one subtask(subtask 4/5) CANCELED, and other subtasks(subtask 1/5,subtask
> 2/5,subtask 3/5,subtask 5/5) CANCELING,  not in a terminal state.
>
> The subtask status described above is in jm view, but in tm view, all of
> the source subtask was in FAILED, do not know why jm was not notify about
> this.
>
> As all of the failed status was triggered by a oom by the subtask can not
> create native thread when checkpointing, I also dumped the stack of the
> jvm, it shows the four subtasks(subtask 1/5,subtask 2/5,subtask 3/5,subtask
> 5/5) are still active after it throwed a oom and was called to cancel . I
> attached the jstack file in this email.
>
> Yours sincerely
> Joshua
>
> On Wed, Jun 19, 2019 at 4:40 PM zhijiang 
> wrote:
> As long as one task is in CANCELING state, the job status might still be
> in CANCELING state.
>
> @Joshua Do you confirm that all of the tasks in the topology were already in
> a terminal state such as FAILED or CANCELED?
>
> Best,
> Zhijiang
> 

Re: Building some specific modules in flink

2019-06-24 Thread Yun Tang
Hi Syed

You could use 'mvn clean package -pl :flink-streaming-java_2.11 -DskipTests 
-am' to build the flink-streaming-java and flink-runtime modules. If the 'already 
built binary' means the flink-dist-*.jar package, that mvn command will not 
update the dist jar. As far as I know, a quick solution is to use the `jar uf` [1] 
command to update the dist jar with your changed classes. 
Otherwise, you need to rebuild the flink-dist module from scratch.

[1] https://docs.oracle.com/javase/tutorial/deployment/jar/update.html

Best
Yun Tang


From: syed 
Sent: Tuesday, June 25, 2019 9:14
To: user@flink.apache.org
Subject: Building some specific modules in flink

Hi;
I am trying to modify some core functionality of Flink for my thorough
understanding of Flink. I have already built Flink from source; now I am
looking to build only the few modules which I have modified. Is this possible,
or do I have to build Flink in full (all modules) every time? It takes
me about 30-35 minutes to build Flink in full.

Specifically, I have modified some classes in the *flink-streaming-java* and
*flink-runtime* modules. I am looking to build only these two modules and
integrate them into the already built Flink (all modules). I already tried the
–pl option of mvn; it installs these modules, but the changes are not reflected
in the already built binaries.
Please guide me on how I can do this.
Kind regards;
syed




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Building some specific modules in flink

2019-06-24 Thread Jeff Zhang
You need to include flink-dist in -pl as well; the flink-dist module builds the
Flink binary distribution.

syed wrote on Tue, Jun 25, 2019 at 9:14 AM:

> Hi;
> I am trying to modify some core functionalities of flink for my through
> understanding about flink.  I already build the flink from source, now I am
> looking to build only a few modules which I have modified. Is this
> possible,
> or every time I have to build the flink in full (all modules). As it takes
> me about 30-35 minutes to build the flink in full.
>
> Specifically, I have modified some classes in *flink-streaming-java* and
> *flink-runtime* modules. I am looking to build only these two modules and
> integrate these into already build flink (all modules). I alrady tried
> using
> –pl option using mvn, it installs these modules but changes are not updated
> in already build binaries.
> Please guide me how can I do this.
> Kind regards;
> syed
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


-- 
Best Regards

Jeff Zhang


Error checkpointing to S3 like FS (EMC ECS)

2019-06-24 Thread Vishwas Siravara
Hi,
I am using Flink version 1.7.2 and I am trying to use the S3-like object
storage EMC ECS (
https://www.emc.com/techpubs/ecs/ecs_s3_supported_features-1.htm). Not
all S3 APIs are supported by EMC ECS according to this document. Here
is my config:

s3.endpoint: SU73ECSG1P1d.***.COM
s3.access-key: vdna_np_user
security.ssl.rest.enabled: false
web.timeout: 1
s3.secret-key: J***

I can access this bucket from s3cmd client.

I set the state backend from my scala application
env.setStateBackend(new FsStateBackend("s3://aip-featuretoolkit/checkpoints/"))

However, when I run my application I get this exception:

ClientException: Unable to execute HTTP request:
aip-featuretoolkit.SU73ECSG1P1d.VISA.COM: Unable to execute HTTP
request: aip-featuretoolkit.SU73ECSG1P1d.VISA.COM
   1336 at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.(CheckpointCoordinator.java:255)
   1337 at
org.apache.flink.runtime.executiongraph.ExecutionGraph.enableCheckpointing(ExecutionGraph.java:498)
   1338 at
org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:345)
   1339 at
org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:100)
   1340 at
org.apache.flink.runtime.jobmaster.JobMaster.createExecutionGraph(JobMaster.java:1173)
   1341 at
org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1153)
   1342 at
org.apache.flink.runtime.jobmaster.JobMaster.(JobMaster.java:296)
   1343 at
org.apache.flink.runtime.jobmaster.JobManagerRunner.(JobManagerRunner.java:157)
   1344 ... 10 more
   1345 Caused by:
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.AWSClientIOException:
doesBucketExist on aip-featuretoolkit:
org.apache.flink.fs.s3base.shaded.co
m.amazonaws.SdkClientException: Unable to execute HTTP request:
aip-featuretoolkit.SU73ECSG1P1d.VISA.COM: Unable to execute HTTP
request: aip-featuretoolkit.SU73ECSG1P1d.VISA.COM
   1346 at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:177)
   1347 at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:111)
   1348 at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:260)
   1349 at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:317)
   1350 at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:256)
   1351 at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:231)
   1352 at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.verifyBucketExists(S3AFileSystem.java:372)
   1353 at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:308)
   1354 at
org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.create(AbstractS3FileSystemFactory.java:125)
   1355 at
org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:395)
   1356 at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:318)
   1357 at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
   1358 at
org.apache.flink.runtime.state.filesystem.FsCheckpointStorage.(FsCheckpointStorage.java:58)
   1359 at
org.apache.flink.runtime.state.filesystem.FsStateBackend.createCheckpointStorage(FsStateBackend.java:444)
   1360 at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.(CheckpointCoordinator.java:249)
   1361 ... 17 more
   1362 Caused by:
org.apache.flink.fs.s3base.shaded.com.amazonaws.SdkClientException:
Unable to execute HTTP request:
aip-featuretoolkit.SU73ECSG1P1d.VISA.COM
   1363 at
org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleRetryableException(AmazonHttpClient.java:1114)
   1364 at
org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1064)
   1365 at
org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
   1366 at
org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
   1367 at
org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
   1368 at
org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
   1369 at
org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
   1370 at
org.apache.flink.fs

Building some specific modules in flink

2019-06-24 Thread syed
Hi;
I am trying to modify some core functionality of Flink for my thorough
understanding of Flink. I have already built Flink from source; now I am
looking to build only the few modules which I have modified. Is this possible,
or do I have to build Flink in full (all modules) every time? It takes
me about 30-35 minutes to build Flink in full.

Specifically, I have modified some classes in the *flink-streaming-java* and 
*flink-runtime* modules. I am looking to build only these two modules and
integrate them into the already built Flink (all modules). I already tried the
–pl option of mvn; it installs these modules, but the changes are not reflected
in the already built binaries.
Please guide me on how I can do this.
Kind regards;
syed




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Unable to set S3 like object storage for state backend.

2019-06-24 Thread Vishwas Siravara
Hi Ken,
Thanks for reaching out. I created a compliant bucket with the name
aip-featuretoolkit. I now get the exception "Unable to execute HTTP
request: aip-featuretoolkit.SU73ECSG1P1d.***.COM: Name or service not
known" from 
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.class
 line 56. Here is my config from the flink-conf.yaml file:

s3.endpoint: SU73ECSG1P1d.***.COM
s3.access-key: vdna_np_user
security.ssl.rest.enabled: false
web.timeout: 1
s3.secret-key: J***

I have not supplied the port in the config file. Does it internally
use 9021? Also, I am running my application as a different user than the one
specified in s3.access-key. Does that matter?

Thanks,
Vishwas

On Thu, Jun 20, 2019 at 5:06 PM Ken Krugler  wrote:
>
> Hi Vishwas,
>
> It might be that you’ve got a legacy bucket name (“aip_featuretoolkit”), as 
> AWS no longer allows bucket names to contain an underscore.
>
> I’m guessing that the Hadoop S3 code is trying to treat your path as a valid 
> URI, but the bucket name doesn’t conform, and thus you get the "null uri 
> host” issue.
>
> Could you try with a compliant bucket name?
>
> — Ken
>
> On Jun 20, 2019, at 2:46 PM, Vishwas Siravara  wrote:
>
> Hi,
> I am using flink version 1.7.2 , I am trying to use S3 like object
> storage EMC ECS(
> https://www.emc.com/techpubs/ecs/ecs_s3_supported_features-1.htm) .
>
> I am using the flink-s3-fs-hadoop-1.7.2.jar file as a dependency for
> s3 filesystem and I have placed it under the lib folder and is
> available to flink in its class path.
>
> My flink-conf.yaml looks like this :
>
> s3.endpoint: SU73ECSG1P1d.***.COM:9021
> s3.access-key: vdna_np_user
> security.ssl.rest.enabled: false
> web.timeout: 1
> s3.secret-key: J***
>
> And my code for statebackend is like this :
>
> env.setStateBackend(new 
> FsStateBackend("s3://aip_featuretoolkit/checkpoints/"))
>
> I have a bucket called aip_featuretoolkit in my s3 instance. I can
> connect to s3 form s3 command line utilities. However I cannot
> checkpoint with this configuration in flink. I get the following error
> message
>
> Exception in thread "main"
> org.apache.flink.runtime.client.JobExecutionException: Could not
> retrieve JobResult.
> at 
> org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:643)
> at 
> org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
> at 
> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
> at com.visa.flink.job.DruidStreamJob$.runJob(DruidStreamJob.scala:62)
> at com.visa.flink.cli.CliFlinkDruid.run(CliFlinkDruid.scala:19)
> at com.visa.flink.cli.Main$.main(Main.scala:22)
> at com.visa.flink.cli.Main.main(Main.scala)
> Caused by: org.apache.flink.runtime.client.JobSubmissionException:
> Failed to submit job.
> at 
> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$submitJob$2(Dispatcher.java:267)
> at 
> java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
> at 
> java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
> at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
> at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:739)
> at 
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
> at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
> at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
> at 
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.RuntimeException:
> org.apache.flink.runtime.client.JobExecutionException: Could not set
> up JobManager
> at 
> org.apache.flink.util.function.CheckedSupplier.lamb

unsubscribe

2019-06-24 Thread Anton Hughes



Process Function's timers "postponing"

2019-06-24 Thread Andrea Spina
Dear Community,
I am using Flink (processing-time) timers along with a ProcessFunction.
What I would like to do is to "postpone" any previously registered timer for
the given key. I would like to do this because I might process plenty of events
in a row (think of it as a session), so that I can trigger the
computation "just after" this session somehow stops.

I thought about deleting any existing timer, but AFAIU I need to know the
previously registered timer's triggering time, which I guess is not possible for me
since I use processing-time timers.

I also read [1], but I am not really able to understand whether it helps
me; for instance, I don't understand the sentence "Since Flink maintains only one
timer per key and timestamp...". Does this imply that a new processing-time timer will
automatically overwrite a previously existing one?

Thank you for your precious help,

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#timer-coalescing
-- 
*Andrea Spina*
Head of R&D @ Radicalbit Srl
Via Giovanni Battista Pirelli 11, 20124, Milano - IT
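
For reference, the pattern commonly suggested for this is to remember the currently
registered timer timestamp in keyed state so it can be deleted and re-registered on each
new event. A minimal sketch (SessionEvent and the 10-minute gap are hypothetical, not
taken from this thread):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class PostponingTimerFunction extends KeyedProcessFunction<String, SessionEvent, String> {

    private transient ValueState<Long> currentTimer;

    @Override
    public void open(Configuration parameters) {
        currentTimer = getRuntimeContext().getState(
            new ValueStateDescriptor<>("current-timer", Long.class));
    }

    @Override
    public void processElement(SessionEvent event, Context ctx, Collector<String> out) throws Exception {
        Long registered = currentTimer.value();
        if (registered != null) {
            // "postpone": drop the timer registered by the previous event of this session
            ctx.timerService().deleteProcessingTimeTimer(registered);
        }
        long newTimer = ctx.timerService().currentProcessingTime() + 10 * 60 * 1000;
        ctx.timerService().registerProcessingTimeTimer(newTimer);
        currentTimer.update(newTimer);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        // fires only if no newer event postponed it again: the session has gone quiet
        currentTimer.clear();
        out.collect("session ended at " + timestamp);
    }
}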


Re: Linkage Error RocksDB and flink-1.6.4

2019-06-24 Thread Andrea Spina
Hi Shu Su,
the first point exactly pinpointed the issue I bumped into. I forgot to set
that dependency to "provided". Thank you!

On Mon, Jun 24, 2019 at 05:35, Shu Su wrote:

> Hi Andrea
>
> Actually it's caused by Flink's class loading: Flink's parent classloader loads
> the jar first, and then when you use it in your user code, the user-code
> classloader loads it again, which raises the error.
> There are two solutions.
> 1. Add scope "provided" to the dependency in your Maven pom.xml:
>
> <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
>     <version>${flink_version}</version>
>     <scope>provided</scope>
> </dependency>
> 2. Set classloader.resolve-order: parent-first in flink-conf.yaml
>
> Hope this will help you.
>
> Thanks,
> Simon
> On 06/24/2019 11:27,Yun Tang  wrote:
>
> Hi Andrea
>
> Since I have not written Scala for a while, I wonder why you need to
> instantiate your ColumnFamilyOptions, BlockBasedTableConfig and DBOptions
> on the JM side. As far as I can see, you could instantiate them on the TM side
> with code like this:
>
> val rocksdbConfig = new OptionsFactory() {
>   override def createDBOptions(currentOptions: DBOptions): DBOptions =
>     currentOptions.setIncreaseParallelism(properties.threadNo)
>
>   override def createColumnOptions(currentOptions: ColumnFamilyOptions): ColumnFamilyOptions =
>     currentOptions.setWriteBufferSize(MemorySize.parseBytes(properties.writeBufferSize))
> }
>
> You just need to serialize the properties via closure to TMs. Hope this could 
> help you.
>
> Best
> Yun Tang
> --
> *From:* Andrea Spina 
> *Sent:* Monday, June 24, 2019 2:20
> *To:* user
> *Subject:* Linkage Error RocksDB and flink-1.6.4
>
> Dear community,
> I am running a Flink Job backed by RocksDB, version 1.6.4 and scala 2.11.
> At the job Startp the following exception happens (it's recorded by the Job
> Manager).
>
>
>
>
>
>
>
>
>
>
>
> Caused by: java.lang.LinkageError: loader constraint violation: loader
> (instance of org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders$ChildFirstClassLoader)
> previously initiated loading for a different type with name "org/rocksdb/DBOptions"
>     at java.lang.ClassLoader.defineClass1(Native Method)
>     at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
>     at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
>     at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
>     at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
>     at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
>     at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
>     at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:126)
>
> For this job, I programmatically set some RocksDB options by using the
> code appended below. Anybody can help with this? Thank you so much,
> Andrea
>
>
> import org.apache.flink.configuration.MemorySize
> import org.apache.flink.contrib.streaming.state.{OptionsFactory, PredefinedOptions, RocksDBStateBackend}
> import org.rocksdb.{BlockBasedTableConfig, ColumnFamilyOptions, DBOptions}
>
> object ConfigurableRocksDB {
>
>   lazy val columnOptions = new ColumnFamilyOptions() with Serializable
>   lazy val tableConfig   = new BlockBasedTableConfig() with Serializable
>   lazy val dbOptions     = new DBOptions() with Serializable
>
>   def configureStateBackendRocksDB(properties: FlinkDeployment): RocksDBStateBackend = {
>     properties.threadNo.foreach(dbOptions.setIncreaseParallelism)
>
>     properties.blockSize.foreach(bs => tableConfig.setBlockSize(MemorySize.parseBytes(bs)))
>     properties.cacheSize.foreach(cs => tableConfig.setBlockCacheSize(MemorySize.parseBytes(cs)))
>     properties.cacheIndicesAndFilters.foreach(cif => if (cif) tableConfig.cacheIndexAndFilterBlocks())
>     properties.writeBufferSize.foreach(wbs => columnOptions.setWriteBufferSize(MemorySize.parseBytes(wbs)))
>
>     columnOptions.setTableFormatConfig(tableConfig)
>     properties.writeBufferToMerge.foreach(bm => columnOptions.setMinWriteBufferNumberToMerge(bm))
>     properties.writeBufferCount.foreach(bc => columnOptions.setMaxWriteBufferNumber(bc))
>     properties.optimizeFilterForHits.foreach(op => if (op) columnOptions.optimizeFiltersForHits())
>
>     val rocksdbConfig = new OptionsFactory() {
>       override def createDBOptions(currentOptions: DBOptions): DBOptions = dbOptions
>       override def createColumnOptions(currentOptions: ColumnFamilyOptions): ColumnFamilyOptions = columnOptions
>     }
>
>     val stateBE =
>       new RocksDBStateBackend(properties.checkpo

Re: Flink Kafka consumer with low latency requirement

2019-06-24 Thread xwang355
I posted my related observation here in a separated thread.

http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Does-making-synchronize-call-might-choke-the-whole-pipeline-tc28383.html



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Flink Kafka consumer with low latency requirement

2019-06-24 Thread xwang355
private static void doWork(long tid) throws InterruptedException {
    if (!sortedTid.contains(tid)) {
        sortedTid.add(tid);
    }

    // simulate a straggler: make the thread with the lowest tid a slow processor
    if (sortedTid.first() == tid) {
        if (counter++ == 1000) {
            Thread.sleep(60_000);   // sleep 60 seconds
        }
        Thread.sleep(20);
    } else {
        Thread.sleep(20);
    }
}

Just for testing purposes, the thread with the lowest tid sleeps 60s when the
counter reaches 1000. Will 'sleep' cause any issues?




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Flink Kafka consumer with low latency requirement

2019-06-24 Thread Fabian Hueske
Hi,

What kind of function do you use to implement the operator that has the
blocking call?
Did you have a look at the AsyncIO operator? It was designed for exactly
such use cases.
It issues multiple asynchronous requests to an external service and waits
for the response.
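
For illustration, a minimal sketch of that pattern (RestClient, Request and Result are
hypothetical placeholders for your service; the pool size, timeout and capacity values are
just examples, and imports from java.util.concurrent and
org.apache.flink.streaming.api.functions.async are omitted):

public class RestAsyncFunction extends RichAsyncFunction<Request, Result> {

    private transient ExecutorService executor;

    @Override
    public void open(Configuration parameters) {
        // dedicated pool so the blocking REST call never runs on the task thread
        executor = Executors.newFixedThreadPool(10);
    }

    @Override
    public void asyncInvoke(Request input, ResultFuture<Result> resultFuture) {
        CompletableFuture
            .supplyAsync(() -> RestClient.call(input), executor)
            .thenAccept(result -> resultFuture.complete(Collections.singleton(result)));
    }

    @Override
    public void close() {
        executor.shutdown();
    }
}

// Up to 100 requests per subtask can be in flight, so one slow response does not block the rest.
DataStream<Result> results = AsyncDataStream.unorderedWait(
    requests, new RestAsyncFunction(), 30, TimeUnit.SECONDS, 100);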

Best, Fabian

On Mon, Jun 24, 2019 at 17:01, xwang355 wrote:

> Fabian,
>
> Does the above stack trace looks like a deadlock?
>
> at
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:539)
>
> - locked <0x0007baf84040> (a java.util.ArrayDeque)
> at
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:508)
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Flink Kafka consumer with low latency requirement

2019-06-24 Thread xwang355
Fabian,

Does the above stack trace look like a deadlock?

at
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:539)
 
- locked <0x0007baf84040> (a java.util.ArrayDeque) 
at
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:508)
 



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Flink Kafka consumer with low latency requirement

2019-06-24 Thread xwang355
Fabian,

Thank you for replying.

If I understand your previous comment correctly, I set up a consumer with
parallelism 1 and connected a worker task with parallelism 2. 

If worker thread one makes a blocking call and is stuck for 60s, the consumer
thread should continue fetching from the partition and feeding thread two.
From my reading of the Flink documentation, if checkpointing is enabled, the
consumer should commit its own internal state back to Kafka to show progress
to external monitoring tools. 

If that's the case, during the 60s when thread one is stuck, checkpoints
should all succeed and thread two should keep chugging along merrily, even
though the highest committed offset is one less than the offset held by
thread one. After the 60s, I should see a huge jump in the monitoring tool,
because thread one has released its offset and all offsets consumed by
thread two during the 60s can be committed.

However, what I have observed is that as soon as thread one gets stuck,
checkpointing is choked, the consumer thread stops feeding thread two, and the
whole pipeline becomes stagnant.

Could you please help me understand this behavior.

Thanks again.
Ben 



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Unexpected behavior from interval join in Flink

2019-06-24 Thread Fabian Hueske
Ah, that's great!
Thanks for letting us know :-)

On Mon, Jun 24, 2019 at 11:33, Wouter Zorgdrager <w.d.zorgdra...@tudelft.nl> wrote:

> Hi Fabian,
>
> Thanks for your reply. I managed to resolve this issue. Actually this
> behavior was not so unexpected, I messed up using xStream as a 'base' while
> I needed to use yStream as a 'base'. I.e. yStream.element - 60 min <=
> xStream.element <= yStream.element + 30 min. Interchanging both datastreams
> fixed this issue.
>
> Thanks anyways.
>
> Cheers, Wouter
>
>
>
> Op ma 24 jun. 2019 om 11:22 schreef Fabian Hueske :
>
>> Hi Wouter,
>>
>> Not sure what is going wrong there, but something that you could try is
>> to use a custom watemark assigner and always return a watermark of 0.
>> When the source finished serving the watermarks, it emits a final
>> Long.MAX_VALUE watermark.
>> Hence the join should consume all events and store them in state. When
>> both sources are finished, it would start to join the data and clean up the
>> state.
>> This test would show if there are any issue with late data.
>>
>> Best, Fabian
>>
>> Am Fr., 21. Juni 2019 um 15:32 Uhr schrieb Wouter Zorgdrager <
>> w.d.zorgdra...@tudelft.nl>:
>>
>>> Anyone some leads on this issue? Have been looking into the
>>> IntervalJoinOperator code, but that didn't really help. My intuition is
>>> that it is rejected because of lateness, however that still confuses me
>>> since I'm sure that both datastreams have monotonic increasing timestamps.
>>>
>>> Thx, Wouter
>>>
>>> Op ma 17 jun. 2019 om 13:20 schreef Wouter Zorgdrager <
>>> w.d.zorgdra...@tudelft.nl>:
>>>
 Hi all,

 I'm experiencing some unexpected behavior using an interval join in
 Flink.
 I'm dealing with two data sets, lets call them X and Y. They are finite
 (10k elements) but I interpret them as a DataStream. The data needs to be
 joined for enrichment purposes. I use event time and I know (because I
 generated the data myself) that the timestamp of an element Y is always
 between -60 minutes and +30 minutes of the element with the same key in set
 X. Both datasets are in-order (in terms of timestamps), equal in size,
 share a common key and parallelism is set to 1 throughout the whole 
 program.

 The code to join looks something like this:

 xStream
   .assignAscendingTimestamps(_.date.getTime)
   .keyBy(_.commonKey)
   .intervalJoin(
 yStream
   .assignAscendingTimestamps(_.date.getTime)
   .keyBy(_.commonKey))
   .between(Time.minutes(-60), Time.minutes(30))
   .process(new ProcessJoinFunction[X, Y, String] {
 override def processElement(
 left: X,
 right: Y,
 ctx: ProcessJoinFunction[X, Y, String]#Context,
 out: Collector[String]): Unit = {

   out.collect(left + ":" + right)
 }


 However, about 30% percent of the data is not joined. Is there a proper
 way to debug this? For instance, in windows you can side-output late data.
 Is there a possibility to side-output unjoinable data?

 Thx a lot,
 Wouter





Re: Unexpected behavior from interval join in Flink

2019-06-24 Thread Wouter Zorgdrager
Hi Fabian,

Thanks for your reply. I managed to resolve this issue. Actually, this
behavior was not so unexpected: I messed up by using xStream as the 'base' while
I needed to use yStream as the 'base', i.e. yStream.element - 60 min <=
xStream.element <= yStream.element + 30 min. Interchanging both datastreams
fixed the issue.

Thanks anyways.

Cheers, Wouter



On Mon, Jun 24, 2019 at 11:22, Fabian Hueske wrote:

> Hi Wouter,
>
> Not sure what is going wrong there, but something that you could try is to
> use a custom watemark assigner and always return a watermark of 0.
> When the source finished serving the watermarks, it emits a final
> Long.MAX_VALUE watermark.
> Hence the join should consume all events and store them in state. When
> both sources are finished, it would start to join the data and clean up the
> state.
> This test would show if there are any issue with late data.
>
> Best, Fabian
>
> Am Fr., 21. Juni 2019 um 15:32 Uhr schrieb Wouter Zorgdrager <
> w.d.zorgdra...@tudelft.nl>:
>
>> Anyone some leads on this issue? Have been looking into the
>> IntervalJoinOperator code, but that didn't really help. My intuition is
>> that it is rejected because of lateness, however that still confuses me
>> since I'm sure that both datastreams have monotonic increasing timestamps.
>>
>> Thx, Wouter
>>
>> Op ma 17 jun. 2019 om 13:20 schreef Wouter Zorgdrager <
>> w.d.zorgdra...@tudelft.nl>:
>>
>>> Hi all,
>>>
>>> I'm experiencing some unexpected behavior using an interval join in
>>> Flink.
>>> I'm dealing with two data sets, lets call them X and Y. They are finite
>>> (10k elements) but I interpret them as a DataStream. The data needs to be
>>> joined for enrichment purposes. I use event time and I know (because I
>>> generated the data myself) that the timestamp of an element Y is always
>>> between -60 minutes and +30 minutes of the element with the same key in set
>>> X. Both datasets are in-order (in terms of timestamps), equal in size,
>>> share a common key and parallelism is set to 1 throughout the whole program.
>>>
>>> The code to join looks something like this:
>>>
>>> xStream
>>>   .assignAscendingTimestamps(_.date.getTime)
>>>   .keyBy(_.commonKey)
>>>   .intervalJoin(
>>> yStream
>>>   .assignAscendingTimestamps(_.date.getTime)
>>>   .keyBy(_.commonKey))
>>>   .between(Time.minutes(-60), Time.minutes(30))
>>>   .process(new ProcessJoinFunction[X, Y, String] {
>>> override def processElement(
>>> left: X,
>>> right: Y,
>>> ctx: ProcessJoinFunction[X, Y, String]#Context,
>>> out: Collector[String]): Unit = {
>>>
>>>   out.collect(left + ":" + right)
>>> }
>>>
>>>
>>> However, about 30% percent of the data is not joined. Is there a proper
>>> way to debug this? For instance, in windows you can side-output late data.
>>> Is there a possibility to side-output unjoinable data?
>>>
>>> Thx a lot,
>>> Wouter
>>>
>>>
>>>


Re: Unexpected behavior from interval join in Flink

2019-06-24 Thread Fabian Hueske
Hi Wouter,

Not sure what is going wrong there, but something that you could try is to
use a custom watermark assigner that always returns a watermark of 0.
When a source has finished serving its records, it emits a final
Long.MAX_VALUE watermark.
Hence the join should consume all events and store them in state. When both
sources are finished, it would start to join the data and clean up the
state.
This test would show whether there are any issues with late data.
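
A minimal sketch of such an assigner in Java (MyEvent and its getTime() accessor are
hypothetical stand-ins for the elements of your streams):

public class ZeroWatermarkAssigner implements AssignerWithPeriodicWatermarks<MyEvent> {

    @Override
    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        return element.getTime();
    }

    @Override
    public Watermark getCurrentWatermark() {
        // never advances, so nothing can be dropped as late; the final Long.MAX_VALUE
        // watermark is emitted automatically once the finite source finishes
        return new Watermark(0L);
    }
}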

Best, Fabian

On Fri, Jun 21, 2019 at 15:32, Wouter Zorgdrager <w.d.zorgdra...@tudelft.nl> wrote:

> Anyone some leads on this issue? Have been looking into the
> IntervalJoinOperator code, but that didn't really help. My intuition is
> that it is rejected because of lateness, however that still confuses me
> since I'm sure that both datastreams have monotonic increasing timestamps.
>
> Thx, Wouter
>
> Op ma 17 jun. 2019 om 13:20 schreef Wouter Zorgdrager <
> w.d.zorgdra...@tudelft.nl>:
>
>> Hi all,
>>
>> I'm experiencing some unexpected behavior using an interval join in Flink.
>> I'm dealing with two data sets, lets call them X and Y. They are finite
>> (10k elements) but I interpret them as a DataStream. The data needs to be
>> joined for enrichment purposes. I use event time and I know (because I
>> generated the data myself) that the timestamp of an element Y is always
>> between -60 minutes and +30 minutes of the element with the same key in set
>> X. Both datasets are in-order (in terms of timestamps), equal in size,
>> share a common key and parallelism is set to 1 throughout the whole program.
>>
>> The code to join looks something like this:
>>
>> xStream
>>   .assignAscendingTimestamps(_.date.getTime)
>>   .keyBy(_.commonKey)
>>   .intervalJoin(
>> yStream
>>   .assignAscendingTimestamps(_.date.getTime)
>>   .keyBy(_.commonKey))
>>   .between(Time.minutes(-60), Time.minutes(30))
>>   .process(new ProcessJoinFunction[X, Y, String] {
>> override def processElement(
>> left: X,
>> right: Y,
>> ctx: ProcessJoinFunction[X, Y, String]#Context,
>> out: Collector[String]): Unit = {
>>
>>   out.collect(left + ":" + right)
>> }
>>
>>
>> However, about 30% percent of the data is not joined. Is there a proper
>> way to debug this? For instance, in windows you can side-output late data.
>> Is there a possibility to side-output unjoinable data?
>>
>> Thx a lot,
>> Wouter
>>
>>
>>


Re: Flink Kafka consumer with low latency requirement

2019-06-24 Thread Fabian Hueske
Hi Ben,

Flink's Kafka consumers track their progress independently of any worker.
They keep track of the reading offsets themselves (committing progress
to Kafka is optional and only necessary to have progress monitoring in
Kafka's metrics).
As soon as a consumer reads and forwards an event, it is considered to be
read. This means the progress of the downstream worker does not influence
the progress tracking at all.

In case of a topic with a single partition, you can use a consumer with
parallelism 1 and connect a worker task with a higher parallelism to it.
The single consumer task will send the read events round-robin to the
worker tasks.
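
As a sketch of that setup (the names, connector class and schema are assumptions, not
taken from this thread):

// a single consumer subtask reads the one-partition topic ...
DataStream<String> records = env
    .addSource(new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), kafkaProps))
    .setParallelism(1);

// ... and its records are distributed round-robin across the two worker subtasks
DataStream<String> processed = records
    .map(new SlowExternalCallMapper())   // hypothetical worker doing the blocking call
    .setParallelism(2);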

Best, Fabian

On Fri, Jun 21, 2019 at 05:48, wang xuchen wrote:

>
> Dear Flink experts,
>
> I am experimenting with Flink for a use case that has tight latency
> requirements.
>
> A Stack Overflow article suggests that I can use setParallelism(n) to process
> a Kafka partition in a multi-threaded way. My understanding is that there is
> still one Kafka consumer per partition, but by using setParallelism, I can
> spin up multiple worker threads to process the messages read from the
> consumer.
>
> And according to Fabian`s comments in this link:
>
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Does-Flink-Kafka-connector-has-max-pending-offsets-concept-td28119.html
> Flink is able to manage the offset correctly (commit in the right order).
>
> Here is my question: let's say there is a Kafka topic with only one
> partition, and I set up a consumer with setParallelism(2). Hypothetically,
> the worker threads call out to a REST service which may get slow or stuck
> periodically. I want to make sure that the consumer overall keeps making
> progress even in the face of a 'slow worker'. In other words, I'd like to have
> multiple pending but uncommitted offsets from the fast worker even when the
> other worker is stuck. Is there such a knob to tune in Flink?
>
> From my own experiment, using the Kafka consumer group tool to monitor the
> offset lag, as soon as one worker thread is stuck the other cannot make any
> progress either. I really want the fast worker to still make progress to a
> certain extent. For this use case, exactly-once processing is not required.
>
> Thanks for helping.
> Ben
>
>
>