Robert,

Thanks for your thoughtful response.


  1.  I understand your concern. User code is not guaranteed to respond to 
thread interrupts. So no matter what you do, you may end up with a stuck 
thread. But I think we can improve the user experience. First, we can update 
the documentation to make it clear that the main() method will be executed 
during job submission, and that jobs should make an effort to avoid doing 
time-consuming work in that main method. Second, I still think it's in your 
best interest to decouple the job submission thread from the HTTP thread. That 
would un-hide the problem, because the end-user could see that their job 
request has been started but is not making it past a certain point (maybe it's 
in one phase/status before main() executes, and in a different status once 
main() completes). Also, if they have made (and failed or aborted) multiple 
job submission API requests, it would be obvious that those requests are still 
occupying threads. Right now, it's impossible to tell what has happened to the 
request or whether it is occupying a thread without relying on log output 
(which took us a while to get right in AWS EMR YARN) or a stack dump. Ideally, 
the UI should be able to list all the threads that are currently working on job 
submission.
  2.  I see, the main method will execute on the Application Master, right? I 
created https://issues.apache.org/jira/browse/FLINK-4069 for this. 
Unfortunately, I don't understand very well how Kafka brokers & clients 
cooperate to make sure that partitions are distributed across consumers that 
share a group id (is there documentation about that somewhere?). Also, I'm not 
sure how Flink deals with repartitioning.
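
To make point 1 more concrete, here is a minimal sketch of what I mean by decoupling submission from the HTTP thread. It is plain Java, not Flink code: the class SubmissionTracker, its Status values and the pool size of 2 are all names and numbers I made up for illustration. The HTTP handler would call submit() and return the id right away, and the UI could poll status() to show whether main() is still running.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper, not part of Flink: runs user main() code on its own
// thread pool and records a status that a UI could poll.
final class SubmissionTracker {
    enum Status { QUEUED, RUNNING_MAIN, MAIN_COMPLETED, FAILED }

    private final ExecutorService pool = Executors.newFixedThreadPool(2);
    private final Map<Long, Status> statuses = new ConcurrentHashMap<>();
    private final AtomicLong nextId = new AtomicLong();

    // Returns immediately; the caller (e.g. the HTTP thread) never blocks on userMain.
    long submit(Runnable userMain) {
        long id = nextId.incrementAndGet();
        statuses.put(id, Status.QUEUED);
        pool.execute(() -> {
            statuses.put(id, Status.RUNNING_MAIN);
            try {
                userMain.run();
                statuses.put(id, Status.MAIN_COMPLETED);
            } catch (RuntimeException e) {
                statuses.put(id, Status.FAILED);
            }
        });
        return id;
    }

    Status status(long id) {
        return statuses.get(id);
    }

    // Stops accepting work and waits (up to 5 seconds) for running submissions.
    void shutdownAndWait() {
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws Exception {
        SubmissionTracker tracker = new SubmissionTracker();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        // Simulated slow main(): blocks until released.
        long id = tracker.submit(() -> {
            started.countDown();
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        started.await();
        System.out.println("while main() blocks: " + tracker.status(id));
        release.countDown();
        tracker.shutdownAndWait();
        System.out.println("after main() returns: " + tracker.status(id));
    }
}
```

This does not solve the stuck-thread problem you describe (a main() that ignores interrupts still occupies a pool thread), but it makes the stuck submission visible instead of leaving it hidden behind a hanging HTTP request.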

-Shannon

From: Robert Metzger <rmetz...@apache.org>
Date: Thursday, June 2, 2016 at 4:19 AM
To: "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: API request to submit job takes over 1hr

Hi Shannon,

Thank you for investigating the issue further.
It's fine to keep the discussion on the user@ list. Most devs are on the user 
list as well, and we'll probably file some JIRAs.

Regarding your suggestions:
1. I'm not sure that making the job submission non-blocking is a good idea. We 
would probably need to interrupt the submitting thread after a while, which 
does not always work (in our experience, Kafka and Hadoop, for example, often 
ignore interrupts or, even worse, get stuck afterwards). This would just hide 
the problems or introduce new issues.

2. As you've correctly identified, the real issue here is that the Kafka 
consumer queries the brokers for metadata from the constructor (i.e., on the 
client) rather than from the workers in the cluster (in the open() method).
Changing this behavior is on my todo list. If you want, you can file a JIRA for 
this. If you also have time to work on it, you can of course open a pull 
request. Otherwise, contributors from the Flink community can take care of 
the implementation.
The main reasons why we do the querying centrally are: a) to avoid overloading 
the brokers, and b) to send the same list of partitions (in the same order) to 
all parallel consumers, so that the partition assignment is fixed (also across 
restarts). If we do the querying in the open() method, we need to make sure 
that all partitions are assigned, without duplicates (also after restarts in 
case of failures).
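
To illustrate the requirement in b), here is a minimal sketch of one way every parallel consumer could derive the same assignment on its own. It is an assumption for illustration only, not how the Flink Kafka consumer is implemented: each subtask sorts the partition ids it discovered and keeps those whose position in the sorted list maps to its own index. The class and method names are made up.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical helper, not Flink's actual assignment logic.
final class PartitionAssignment {
    // Returns the partitions owned by subtaskIndex (0-based) out of parallelism subtasks.
    static List<Integer> assign(List<Integer> discovered, int parallelism, int subtaskIndex) {
        List<Integer> sorted = new ArrayList<>(discovered);
        // Sorting makes the result independent of the order in which
        // each subtask happened to discover the partitions.
        Collections.sort(sorted);
        List<Integer> mine = new ArrayList<>();
        for (int i = 0; i < sorted.size(); i++) {
            if (i % parallelism == subtaskIndex) {
                mine.add(sorted.get(i));
            }
        }
        return mine;
    }

    public static void main(String[] args) {
        // Two subtasks see the same five partitions in different orders.
        System.out.println(assign(Arrays.asList(3, 0, 2, 1, 4), 2, 0)); // [0, 2, 4]
        System.out.println(assign(Arrays.asList(4, 1, 0, 3, 2), 2, 1)); // [1, 3]
    }
}
```

This only yields a complete, duplicate-free assignment if all subtasks see the same set of partitions; if a broker is unreachable from one worker, or partitions are added between restarts, the subtasks can disagree, which is exactly the problem described above.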

Regards,
Robert




On Thu, Jun 2, 2016 at 1:44 AM, Shannon Carey <sca...@expedia.com> wrote:
It looks like the problem is due to the stack trace below.

Simply put, connection failure to Kafka when using the default settings causes 
job submission to take over (flink.get-partitions.retry * tries by 
SimpleConsumer * socket.timeout.ms * # of Kafka brokers) = 
(3 * 2 * 30 * (# of Kafka brokers)) seconds. In my case, since I have 36 Kafka 
brokers, it took over 108 minutes (6480 seconds). This is beyond the maximum 
idle connection timeout of an AWS ELB of 60 minutes, and beyond the normal 
length of time most people expect an HTTP request to take. During these 108 
minutes and after, aside from examining logs & stack traces, it is not possible 
to determine what is happening with regard to the run job request. It simply 
appears to hang and then fail, typically with a 504 Gateway Timeout status.
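
The arithmetic above can be checked with a short sketch. The class and method names are made up for illustration; the constants are simply the default values quoted in this message (3 retries, 2 tries by SimpleConsumer, a 30-second socket timeout), not values read from Flink or Kafka at runtime.

```java
import java.time.Duration;

// Hypothetical helper, not part of Flink or Kafka: reproduces the
// worst-case submission delay estimated in this message.
final class SubmissionDelayEstimate {
    static final int PARTITION_RETRIES = 3;       // flink.get-partitions.retry, as quoted
    static final int SIMPLE_CONSUMER_TRIES = 2;   // tries by SimpleConsumer, as quoted
    static final int SOCKET_TIMEOUT_SECONDS = 30; // socket.timeout.ms expressed in seconds

    static Duration worstCase(int brokerCount) {
        long seconds = (long) PARTITION_RETRIES
                * SIMPLE_CONSUMER_TRIES
                * SOCKET_TIMEOUT_SECONDS
                * brokerCount;
        return Duration.ofSeconds(seconds);
    }

    public static void main(String[] args) {
        Duration d = worstCase(36);
        // Prints: 6480 s = 108 min
        System.out.println(d.getSeconds() + " s = " + d.toMinutes() + " min");
    }
}
```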

There are a couple of problems that are responsible for this situation. Let me 
know if I should move this discussion to the "devs" list: I am not a member 
there yet. I am happy to submit JIRAs and I would be able to submit a Pull 
Request for the change to FlinkKafkaConsumer08 (and 09) initialization as 
suggested below.

  1.  JarRunHandler is provided with a timeout value, but that timeout value is 
ignored when calling getJobGraphAndClassLoader(). This allows HTTP "run" 
requests to take arbitrary amounts of time during which the status of the 
request and the job is unclear. Depending on the semantics of the work that 
method does, perhaps it could be made asynchronous with a timeout?
  2.  FlinkKafkaConsumer08's constructor (as well as the Kafka 0.9 consumer's 
constructor) performs network interaction & retries that can take a long time, 
and the constructor is in the execution path beneath 
getJobGraphAndClassLoader() via the main() method of the submitted Flink job. 
It is not necessary to do that work (retrieving Kafka partition info) in the 
constructor. Instead, that work should occur when the job is asked to start, 
either by overriding the AbstractRichFunction#open() method or by adding it to 
the top of the run() method. Alternatively, though not any better, the 
signature of StreamExecutionEnvironment#addSource() could be changed to take 
some kind of Factory<SourceFunction> so that instantiation is deferred until 
necessary.
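
The change suggested in point 2 amounts to the following pattern, shown here as a minimal plain-Java sketch. LazyPartitionSource is a hypothetical stand-in, not FlinkKafkaConsumer08 and not a Flink interface; its open() method only imitates the lifecycle hook mentioned above, and the Supplier stands in for the slow broker metadata request. A real Flink source would also need its fields to be serializable, which this sketch ignores.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical stand-in for a source function; this is NOT Flink's API.
final class LazyPartitionSource {
    private final Supplier<List<String>> partitionLookup; // slow network call in real life
    private List<String> partitions;                      // filled in by open()

    LazyPartitionSource(Supplier<List<String>> partitionLookup) {
        // The constructor only stores configuration. No I/O happens here,
        // so code that builds the job in main() returns quickly.
        this.partitionLookup = partitionLookup;
    }

    // Imitates the open() lifecycle hook: the expensive lookup runs when the job starts.
    void open() {
        partitions = partitionLookup.get();
    }

    List<String> partitions() {
        if (partitions == null) {
            throw new IllegalStateException("open() has not been called");
        }
        return partitions;
    }

    public static void main(String[] args) {
        int[] lookups = {0};
        LazyPartitionSource source = new LazyPartitionSource(() -> {
            lookups[0]++;
            return Arrays.asList("topic-0", "topic-1");
        });
        System.out.println("lookups after constructor: " + lookups[0]); // 0
        source.open();
        System.out.println("lookups after open(): " + lookups[0]);      // 1
        System.out.println("partitions: " + source.partitions());
    }
}
```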

"nioEventLoopGroup-3-14" #41 prio=10 os_prio=0 tid=0x00007fd0e870b000 nid=0x167d runnable [0x00007fd0cefcb000]
   java.lang.Thread.State: RUNNABLE
at sun.nio.ch.Net.poll(Native Method)
at sun.nio.ch.SocketChannelImpl.poll(SocketChannelImpl.java:954)
- locked <0x000000076a190060> (a java.lang.Object)
at sun.nio.ch.SocketAdaptor.connect(SocketAdaptor.java:110)
- locked <0x000000076a1900f0> (a java.lang.Object)
at kafka.network.BlockingChannel.liftedTree1$1(BlockingChannel.scala:59)
at kafka.network.BlockingChannel.connect(BlockingChannel.scala:49)
- locked <0x000000076a190180> (a java.lang.Object)
at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:55)
at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:77)
at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
- locked <0x000000076a190238> (a java.lang.Object)
at kafka.consumer.SimpleConsumer.send(SimpleConsumer.scala:91)
at kafka.javaapi.consumer.SimpleConsumer.send(SimpleConsumer.scala:68)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.getPartitionsForTopic(FlinkKafkaConsumer08.java:521)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.<init>(FlinkKafkaConsumer08.java:218)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.<init>(FlinkKafkaConsumer08.java:193)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.<init>(FlinkKafkaConsumer08.java:160)
at com.expedia.www.hendrix.flinkproto.ProofOfConcept$.main(ProofOfConcept.scala:60)
at com.expedia.www.hendrix.flinkproto.ProofOfConcept.main(ProofOfConcept.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:80)
at org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:215)
at org.apache.flink.runtime.webmonitor.handlers.JarActionHandler.getJobGraphAndClassLoader(JarActionHandler.java:95)
at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.handleRequest(JarRunHandler.java:50)
at org.apache.flink.runtime.webmonitor.RuntimeMonitorHandler.respondAsLeader(RuntimeMonitorHandler.java:135)
at org.apache.flink.runtime.webmonitor.RuntimeMonitorHandler.channelRead0(RuntimeMonitorHandler.java:112)
at org.apache.flink.runtime.webmonitor.RuntimeMonitorHandler.channelRead0(RuntimeMonitorHandler.java:60)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at io.netty.handler.codec.http.router.Handler.routed(Handler.java:62)
at io.netty.handler.codec.http.router.DualAbstractHandler.channelRead0(DualAbstractHandler.java:57)
at io.netty.handler.codec.http.router.DualAbstractHandler.channelRead0(DualAbstractHandler.java:20)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at org.apache.flink.runtime.webmonitor.HttpRequestHandler.channelRead0(HttpRequestHandler.java:158)
at org.apache.flink.runtime.webmonitor.HttpRequestHandler.channelRead0(HttpRequestHandler.java:65)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)
at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:147)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
at java.lang.Thread.run(Thread.java:745)

From: Shannon Carey <shannon.ca...@orbitz.com> on behalf of Shannon Carey 
<sca...@expedia.com>
Date: Wednesday, June 1, 2016 at 12:54 PM
To: "user@flink.apache.org" <user@flink.apache.org>
Subject: API request to submit job takes over 1hr

Hi folks,

I have deployed a Flink cluster on top of YARN in an AWS EMR cluster in my test 
environment, and everything is working fine. However, I am unable to submit 
jobs to the prod cluster.

Uploading the JAR containing a Flink job succeeds. However, the request to run 
the job (UI makes API request to /jars/<jarname>/run?<params>) takes so long to 
complete that the ELB finally returns a 504 GATEWAY_TIMEOUT response. This is 
the case even if the ELB timeout is set to 1hr: the request returns 504 after 
1hr. The request also appears to fail server-side, since no jobs have ever 
shown up in the UI in any status (successful/failed/completed or 
otherwise). Shortly after the request is made, it is interesting to note that 
sometimes (but not always), other requests by the UI to the API begin to take 
longer than usual, although they do all eventually complete.

No interesting/suspicious log entries have been found. All YARN nodes appear 
healthy.

Does anyone have ideas about what the problem might be? Or ideas about 
troubleshooting steps I should take?

Also, I was wondering if 1GB is a reasonable amount of memory to use for the 
Flink Job Manager? It appears to be using only ~570MB but I am not sure if the 
Job Manager might be misbehaving due to resource constraints. The prod cluster 
is currently composed of six c3.2xlarge EC2 instances. Task memory is set to 
10496, Job Manager memory is set to 1024, and there are 8 slots set in the 
yarn-session.sh command. Are there any guidelines for memory allocation for the 
Job Manager?

Thanks very much!
Shannon Carey
