[ 
https://issues.apache.org/jira/browse/BEAM-6289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16730412#comment-16730412
 ] 

Shahar Frank commented on BEAM-6289:
------------------------------------

Thanks for the info.

I will give these a try later on.

Just a question re. "Of course this wouldn't happen with a remote Flink 
cluster." - The issue was first detected on a remote Flink cluster. I will 
recheck it with this demo code, but could this then be some other issue?

Also, can you please tell me what akka.ask.timeout value is currently being 
used? Is it very short? This entire code fails within a few seconds, well 
below 10 seconds, so it's not that long, is it?
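
For reference, the trace quoted below reports "after [10000 ms]", which would 
match Flink's documented default for akka.ask.timeout (10 s). If the timeout 
does turn out to be the limiting factor, here is a minimal sketch of raising 
it, assuming the cluster is configured through flink-conf.yaml. The 60 s value 
is only an illustration, and I have not verified that the embedded [local] 
cluster picks this file up:

{code}
# flink-conf.yaml (assumed location: the conf directory of the Flink cluster)
# Timeout for Akka ask calls; the Flink default is 10 s.
akka.ask.timeout: 60 s
{code}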

> Running a join on two Cassandra tables using FlinkRunner fails
> --------------------------------------------------------------
>
>                 Key: BEAM-6289
>                 URL: https://issues.apache.org/jira/browse/BEAM-6289
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-cassandra, runner-flink
>    Affects Versions: 2.8.0, 2.9.0
>         Environment: Tested on Ubuntu 18
> Beam 2.8
> Tested with Flink:
> 1) [local]
> 2) Cluster inside a K8S cluster on minikube
> 3) Cluster inside a K8S cluster on GCP
> Tested using Cassandra [cqlsh 5.0.1 | Cassandra 3.11.3 | CQL spec 3.4.4 | 
> Native protocol v4]:
> 1) In a local container
> 2) Cluster inside a K8S cluster on minikube
> 3) Cluster inside a K8S cluster on GCP
>            Reporter: Shahar Frank
>            Assignee: Maximilian Michels
>            Priority: Critical
>              Labels: FlinkRunner, beam, bug, cassandra, flink, join
>         Attachments: direct_runner_build.log, flink_runner_build.log
>
>
> Can't make a simple join on two Cassandra tables when using FlinkRunner.
> The same code works with DirectRunner but fails when used with FlinkRunner, 
> giving these (as well as many other) errors:
> {code:java}
> Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/dispatchere1f5abe7-6299-43ea-9182-24a2193e078f#-1757043920]] after [10000 ms]. Sender[null] sent message of type "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
>     at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)
>     at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
>     at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
>     at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
>     at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
>     at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
>     at akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
>     at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
>     at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
>     at java.lang.Thread.run(Thread.java:748)
> {code}
>  
> The code can be found [here|https://github.com/srfrnk/beam-playground]
> Steps to reproduce:
>  # Clone the repo to a Linux machine (I'm on Ubuntu 18, but any *nix system 
> would probably work, e.g. repl.it)
>  # Follow the README to set up a Cassandra container + schema
>  # Run with 
> {code}
> gradle --console=plain join-from-cassandra -Drunner=flink > output/build.log 2>&1
> {code}
> to use FlinkRunner. See error in log at ./output/build.log
>  # Run with 
> {code}
> gradle --console=plain join-from-cassandra -Drunner=direct > output/build.log 2>&1
> {code}
> to use DirectRunner. See error in log at ./output/build.log



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
