It looks like the problem is captured by the stack trace below.

Simply put, a connection failure to Kafka under the default settings causes 
job submission to take over (flink.get-partitions.retry * tries by 
SimpleConsumer * socket.timeout.ms * # of Kafka brokers) = (3 * 2 * 30 s * # of 
Kafka brokers). In my case, with 36 Kafka brokers, it took over 108 minutes. 
That is beyond the maximum idle connection timeout of an AWS ELB (60 minutes), 
and far beyond how long most people expect an HTTP request to take. During 
those 108 minutes and afterward, aside from examining logs & stack traces, it 
is not possible to determine what is happening with the run-job request. It 
simply appears to hang and then fail, typically with a 504 Gateway Timeout 
status.
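For concreteness, here is the worst-case arithmetic as a back-of-the-envelope sketch (the constants are the defaults and broker count described above; the class name is just for illustration):

```java
public class SubmissionDelay {
    public static void main(String[] args) {
        int getPartitionsRetries = 3;   // flink.get-partitions.retry default
        int triesPerSimpleConsumer = 2; // connect + reconnect per attempt
        int socketTimeoutSeconds = 30;  // socket.timeout.ms default (30000 ms)
        int kafkaBrokers = 36;          // brokers in my cluster

        int totalSeconds = getPartitionsRetries * triesPerSimpleConsumer
                * socketTimeoutSeconds * kafkaBrokers;
        System.out.println(totalSeconds + " s = " + (totalSeconds / 60) + " min");
        // 6480 s = 108 min, well past the 60-minute ELB idle timeout
    }
}
```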

There are a couple of problems responsible for this situation. Let me know if 
I should move this discussion to the "dev" list; I am not a member there yet. 
I am happy to submit JIRAs, and I could submit a pull request for the change 
to FlinkKafkaConsumer08 (and 09) initialization suggested below.

  1.  JarRunHandler is provided with a timeout value, but that value is 
ignored when calling getJobGraphAndClassLoader(). This allows HTTP "run" 
requests to take arbitrarily long, during which the status of the request and 
of the job is unclear. Depending on the semantics of the work that method 
does, perhaps it could be made asynchronous with a timeout?
  2.  FlinkKafkaConsumer08's constructor (as well as the Kafka 0.9 consumer's 
constructor) performs network interaction and retries that can take a long 
time, and the constructor sits in the execution path beneath 
getJobGraphAndClassLoader() via the main() method of the submitted Flink job. 
It is not necessary to do that work (retrieving Kafka partition info) in the 
constructor. Instead, it should happen when the job is asked to start, either 
by overriding the AbstractRichFunction#open() method or at the top of the 
run() method. Alternatively, though no better, the signature of 
StreamExecutionEnvironment#addSource() could be changed to take some kind of 
Factory<SourceFunction> so that instantiation is deferred until necessary.
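One possible shape for the fix to (1), sketched with plain java.util.concurrent; the wrapper method is hypothetical, not an existing Flink API, and the timeout value would come from the one JarRunHandler already receives:

```java
import java.util.concurrent.*;

public class TimeoutSketch {
    // Hypothetical wrapper: run a potentially slow job-graph build on a
    // separate thread and fail fast once the handler's timeout elapses.
    static <T> T callWithTimeout(Callable<T> slowCall, long timeoutMillis)
            throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<T> future = pool.submit(slowCall);
            // Instead of blocking indefinitely (the current behavior), this
            // surfaces a TimeoutException the handler can turn into a prompt
            // HTTP error response.
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        // Simulate a getJobGraphAndClassLoader() call that hangs on Kafka.
        try {
            callWithTimeout(() -> {
                Thread.sleep(60_000);
                return "job graph";
            }, 100);
        } catch (TimeoutException e) {
            System.out.println("timed out promptly instead of hanging");
        }
    }
}
```

The point is only that the caller regains control after the configured timeout, rather than the request hanging for the full 108 minutes.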
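And for (2), a minimal sketch of the suggested restructuring: the constructor only captures configuration, and the network-bound partition discovery is deferred to an open()-style method that runs when the task starts. The class and helper names here are illustrative, not Flink's actual internals:

```java
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

// Illustrative sketch: a Kafka-like source whose constructor does no I/O.
public class DeferredInitSource implements Serializable {
    private final String topic;
    private final Properties props;
    private transient List<Integer> partitions; // populated lazily on the task

    public DeferredInitSource(String topic, Properties props) {
        // The constructor runs inside the job's main() during submission,
        // so it must only record configuration -- no broker connections here.
        this.topic = topic;
        this.props = props;
    }

    // Analogous to AbstractRichFunction#open(): called when the task starts,
    // which is the right place for retries and socket timeouts to happen.
    public void open() {
        this.partitions = fetchPartitionsFromBrokers(topic, props);
    }

    // Stand-in for the real SimpleConsumer-based partition lookup.
    private List<Integer> fetchPartitionsFromBrokers(String topic, Properties props) {
        return Arrays.asList(0, 1, 2);
    }

    public List<Integer> partitions() {
        return partitions;
    }
}
```

With this shape, a broker outage delays only the affected task's startup, not the HTTP submission path through JarRunHandler.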

"nioEventLoopGroup-3-14" #41 prio=10 os_prio=0 tid=0x00007fd0e870b000 nid=0x167d runnable [0x00007fd0cefcb000]
   java.lang.Thread.State: RUNNABLE
at sun.nio.ch.Net.poll(Native Method)
at sun.nio.ch.SocketChannelImpl.poll(SocketChannelImpl.java:954)
- locked <0x000000076a190060> (a java.lang.Object)
at sun.nio.ch.SocketAdaptor.connect(SocketAdaptor.java:110)
- locked <0x000000076a1900f0> (a java.lang.Object)
at kafka.network.BlockingChannel.liftedTree1$1(BlockingChannel.scala:59)
at kafka.network.BlockingChannel.connect(BlockingChannel.scala:49)
- locked <0x000000076a190180> (a java.lang.Object)
at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:55)
at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:77)
at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
- locked <0x000000076a190238> (a java.lang.Object)
at kafka.consumer.SimpleConsumer.send(SimpleConsumer.scala:91)
at kafka.javaapi.consumer.SimpleConsumer.send(SimpleConsumer.scala:68)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.getPartitionsForTopic(FlinkKafkaConsumer08.java:521)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.<init>(FlinkKafkaConsumer08.java:218)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.<init>(FlinkKafkaConsumer08.java:193)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.<init>(FlinkKafkaConsumer08.java:160)
at com.expedia.www.hendrix.flinkproto.ProofOfConcept$.main(ProofOfConcept.scala:60)
at com.expedia.www.hendrix.flinkproto.ProofOfConcept.main(ProofOfConcept.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:80)
at org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:215)
at org.apache.flink.runtime.webmonitor.handlers.JarActionHandler.getJobGraphAndClassLoader(JarActionHandler.java:95)
at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.handleRequest(JarRunHandler.java:50)
at org.apache.flink.runtime.webmonitor.RuntimeMonitorHandler.respondAsLeader(RuntimeMonitorHandler.java:135)
at org.apache.flink.runtime.webmonitor.RuntimeMonitorHandler.channelRead0(RuntimeMonitorHandler.java:112)
at org.apache.flink.runtime.webmonitor.RuntimeMonitorHandler.channelRead0(RuntimeMonitorHandler.java:60)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at io.netty.handler.codec.http.router.Handler.routed(Handler.java:62)
at io.netty.handler.codec.http.router.DualAbstractHandler.channelRead0(DualAbstractHandler.java:57)
at io.netty.handler.codec.http.router.DualAbstractHandler.channelRead0(DualAbstractHandler.java:20)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at org.apache.flink.runtime.webmonitor.HttpRequestHandler.channelRead0(HttpRequestHandler.java:158)
at org.apache.flink.runtime.webmonitor.HttpRequestHandler.channelRead0(HttpRequestHandler.java:65)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)
at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:147)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
at java.lang.Thread.run(Thread.java:745)

From: Shannon Carey <shannon.ca...@orbitz.com> on behalf of Shannon Carey 
<sca...@expedia.com>
Date: Wednesday, June 1, 2016 at 12:54 PM
To: "user@flink.apache.org" <user@flink.apache.org>
Subject: API request to submit job takes over 1hr

Hi folks,

I have deployed a Flink cluster on top of YARN in an AWS EMR cluster in my 
test environment, and everything works fine. However, I am unable to submit 
jobs to the prod cluster.

Uploading the JAR containing a Flink job succeeds. However, the request to run 
the job (the UI makes an API request to /jars/<jarname>/run?<params>) takes so 
long to complete that the ELB eventually returns a 504 GATEWAY_TIMEOUT 
response. This is the case even with the ELB timeout set to 1 hour: the 
request returns 504 after 1 hour. The request also appears to fail 
server-side, since no job has ever shown up in the UI in any status 
(successful/failed/completed or otherwise). Interestingly, shortly after the 
request is made, other UI requests to the API sometimes (but not always) begin 
to take longer than usual, although they do all eventually complete.

No interesting/suspicious log entries have been found. All YARN nodes appear 
healthy.

Does anyone have ideas about what the problem might be? Or ideas about 
troubleshooting steps I should take?

Also, I was wondering whether 1GB is a reasonable amount of memory for the 
Flink Job Manager. It appears to be using only ~570MB, but I am not sure 
whether the Job Manager might be misbehaving due to resource constraints. The 
prod cluster is currently composed of six c3.2xlarge EC2 instances. Task 
memory is set to 10496, Job Manager memory is set to 1024, and 8 slots are set 
in the yarn-session.sh command. Are there any guidelines for memory allocation 
for the Job Manager?

Thanks very much!
Shannon Carey
