Hi Shannon,

thank you for further investigating the issue.
Its fine to keep the discussion on the user@ list. Most devs are on the
user list as well and we'll probably file some JIRAs.

Regarding your suggestions:
1. Not sure if making the job submission non-blocking is a good idea. We
would probably need to interrupt the submitting thread after a while, which
does not always work (we made the experience that Kafka and Hadoop for
example often ignore interrupts, or even worse gets stuck afterwards). This
would just hide the problems or introduce new issues.

2. As you've identified correctly, the real issue here is that the Kafka
consumer is querying the brokers for metadata from the constructor (= on
the client) not from the workers in the cluster (in the open() method).
Changing the behavior is on my todo list. If you want, you can file a JIRA
for this. If you have also time to work on this, you can of course also
open a pull request. Otherwise, some contributors from the Flink community
can take care of the implementation.
The main reason why we do the querying centrally is: a) avoid overloading
the brokers b) send the same list of partitions (in the same order) to all
parallel consumers to do a fixed partition assignments (also across
restarts). When we do the querying in the open() method, we need to make
sure that all partitions are assigned, without duplicates (also after
restarts in case of failures).

Regards,
Robert




On Thu, Jun 2, 2016 at 1:44 AM, Shannon Carey <sca...@expedia.com> wrote:

> It looks like the problem is due to the stack trace below.
>
> Simply put, connection failure to Kafka when using the default settings
> causes job submission to take over (flink.get-partitions.retry * tries by
> SimpleConsumer * socket.timeout.ms * # of Kafka brokers) = (3 * 2 * 30 *
> (# of Kafka brokers)) seconds. In my case, since I have 36 Kafka brokers,
> it took over 108 minutes. This is beyond the maximum idle connection
> timeout of an AWS ELB of 60 minutes, and beyond the normal length of time
> most people expect an HTTP request to take. During these 108 minutes and
> after, aside from examining logs & stack traces, it is not possible to
> determine what is happening with regard to the run job request. It simply
> appears to hang and then fail, typically with a 504 Gateway Timeout status.
>
> There are a couple problems that are responsible for this situation. Let
> me know if I should move this discussion to the "devs" list: I am not a
> member there yet. I am happy to submit JIRAs and I would be able to submit
> a Pull Request for the change to FlinkKafkaConsumer08 (and 09)
> initialization as suggested below.
>
>    1. JarRunHandler is provided with a timeout value, but that timeout
>    value is ignored when calling getJobGraphAndClassLoader(). This allows HTTP
>    "run" requests to take arbitrary amounts of time during which the status of
>    the request and the job is unclear. Depending on the semantics of the work
>    that method does, perhaps it could be made asynchronous with a timeout?
>    2. FlinkKafkaConsumer08's constructor (as well as the Kafka 0.9
>    consumer's constructor) performs network interaction & retries that can
>    take a long time, and the constructor is in the execution path beneath
>    getJobGraphAndClassLoader() via the main() method of the submitted Flink
>    job. It is not necessary to do that work (retrieving Kafka partition info)
>    in the constructor. Instead, that work should occur when the job is asked
>    to start, either by overriding the AbstractRichFunction#open() method or by
>    adding it to the top of the run() method. Alternatively, though not any
>    better, the signature of StreamExecutionEnvironment#addSource() could
>    be changed to take some kind of Factory<SourceFunction> so that
>    instantiation is deferred until necessary.
>
> "nioEventLoopGroup-3-14" #41 prio=10 os_prio=0 tid=0x00007fd0e870b000
> nid=0x167d runnable [0x00007fd0cefcb000]
>    java.lang.Thread.State: RUNNABLE
> at sun.nio.ch.Net.poll(Native Method)
> at sun.nio.ch.SocketChannelImpl.poll(SocketChannelImpl.java:954)
> - locked <0x000000076a190060> (a java.lang.Object)
> at sun.nio.ch.SocketAdaptor.connect(SocketAdaptor.java:110)
> - locked <0x000000076a1900f0> (a java.lang.Object)
> at kafka.network.BlockingChannel.liftedTree1$1(BlockingChannel.scala:59)
> at kafka.network.BlockingChannel.connect(BlockingChannel.scala:49)
> - locked <0x000000076a190180> (a java.lang.Object)
> at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
> at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:55)
> at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:77)
> at
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
> - locked <0x000000076a190238> (a java.lang.Object)
> at kafka.consumer.SimpleConsumer.send(SimpleConsumer.scala:91)
> at kafka.javaapi.consumer.SimpleConsumer.send(SimpleConsumer.scala:68)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.getPartitionsForTopic(FlinkKafkaConsumer08.java:521)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.<init>(FlinkKafkaConsumer08.java:218)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.<init>(FlinkKafkaConsumer08.java:193)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.<init>(FlinkKafkaConsumer08.java:160)
> at
> com.expedia.www.hendrix.flinkproto.ProofOfConcept$.main(ProofOfConcept.scala:60)
> at
> com.expedia.www.hendrix.flinkproto.ProofOfConcept.main(ProofOfConcept.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
> at
> org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:80)
> at org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:215)
> at
> org.apache.flink.runtime.webmonitor.handlers.JarActionHandler.getJobGraphAndClassLoader(JarActionHandler.java:95)
> at
> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.handleRequest(JarRunHandler.java:50)
> at
> org.apache.flink.runtime.webmonitor.RuntimeMonitorHandler.respondAsLeader(RuntimeMonitorHandler.java:135)
> at
> org.apache.flink.runtime.webmonitor.RuntimeMonitorHandler.channelRead0(RuntimeMonitorHandler.java:112)
> at
> org.apache.flink.runtime.webmonitor.RuntimeMonitorHandler.channelRead0(RuntimeMonitorHandler.java:60)
> at
> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
> at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
> at io.netty.handler.codec.http.router.Handler.routed(Handler.java:62)
> at
> io.netty.handler.codec.http.router.DualAbstractHandler.channelRead0(DualAbstractHandler.java:57)
> at
> io.netty.handler.codec.http.router.DualAbstractHandler.channelRead0(DualAbstractHandler.java:20)
> at
> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
> at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
> at
> org.apache.flink.runtime.webmonitor.HttpRequestHandler.channelRead0(HttpRequestHandler.java:158)
> at
> org.apache.flink.runtime.webmonitor.HttpRequestHandler.channelRead0(HttpRequestHandler.java:65)
> at
> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
> at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
> at
> io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)
> at
> io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:147)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
> at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
> at
> io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
> at
> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
> at
> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
> at
> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
> at
> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
> at
> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
> at
> io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
> at java.lang.Thread.run(Thread.java:745)
>
> From: Shannon Carey <shannon.ca...@orbitz.com> on behalf of Shannon Carey
> <sca...@expedia.com>
> Date: Wednesday, June 1, 2016 at 12:54 PM
> To: "user@flink.apache.org" <user@flink.apache.org>
> Subject: API request to submit job takes over 1hr
>
> Hi folks,
>
> I have deployed a Flink cluster on top of YARN in an AWS EMR cluster in my
> test environment, and everything is working fine. However,  I am unable to
> submit jobs to the prod cluster.
>
> Uploading the JAR containing a Flink job succeeds. However, the request to
> run the job (UI makes API request to /jars/<jarname>/run?<params>) takes so
> long to complete that the ELB finally returns a 504 GATEWAY_TIMEOUT
> response. This is the case even if the ELB timeout is set to 1hr: the
> request returns 504 after 1hr. The request appears to fail server-side,
> also, since no jobs have ever showed up in the UI as being in any status
> (successful/failed/completed or otherwise). Shortly after the request is
> made, it is interesting to note that sometimes (but not always), other
> requests by the UI to the API begin to take longer than usual, although
> they do all eventually complete.
>
> No interesting/suspicious log entries have been found. All YARN nodes
> appear healthy.
>
> Does anyone have ideas about what the problem might be? Or ideas about
> troubleshooting steps I should take?
>
> Also, I was wondering if 1GB is a reasonable amount of memory to use for
> the Flink Job Manager? It appears to be using only ~570MB but I am not sure
> if the Job Manager might be misbehaving due to resource constraints. The
> prod cluster is currently composed of six c3.2xlarge EC2 instances. Task
> memory is set to 10496, Job Manager memory is set to 1024, and there are 8
> slots set in the yarn-session.sh command. Are there any guidelines for
> memory allocation for the Job Manager?
>
> Thanks very much!
> Shannon Carey
>

Reply via email to