Hi Debasish,

In which case does the exception occur? Does it occur when you submit one
job at a time, or when multiple jobs are submitted at the same time? I'm
asking because I noticed that you used a Future to execute the job in a
non-blocking way. I guess ThreadLocal doesn't work well in that case.
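For context on why that matters, here is a minimal, self-contained sketch (the names are hypothetical, not Flink's actual internals) of the failure mode: a ThreadLocal set on the submitting thread is simply not visible on the pool thread that executes a Future body.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object ThreadLocalDemo extends App {
  // Stands in for a thread-local "context environment" slot.
  val contextEnv = new ThreadLocal[String]

  contextEnv.set("StreamPlanEnvironment") // set on the submitting thread

  // Run the "job" on a different thread, as a Future would.
  val pool = Executors.newSingleThreadExecutor()
  implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)

  val seenByFuture = Await.result(Future(Option(contextEnv.get)), 5.seconds)

  println(s"submitting thread sees: ${Option(contextEnv.get)}") // Some(StreamPlanEnvironment)
  println(s"Future body sees:       $seenByFuture")             // None

  pool.shutdown()
}
```

If the environment factory is stashed in a ThreadLocal on one thread and the job is compiled or executed on another, the second thread falls back to whatever default it finds, which would explain a "wrong" environment showing up.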
Regards,
Dian

> On Sep 23, 2019, at 11:57 PM, Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>
> Hi tison -
>
> Please find my responses below in >>.
>
> regards.
>
> On Mon, Sep 23, 2019 at 6:20 PM Zili Chen <wander4...@gmail.com> wrote:
> Hi Debasish,
>
> The OptimizerPlanEnvironment.ProgramAbortException should be caught at
> OptimizerPlanEnvironment#getOptimizedPlan in its catch (Throwable t) branch.
>
> >> True, but what I get is a StreamPlanEnvironment. In my code I am only
> >> doing val env = StreamExecutionEnvironment.getExecutionEnvironment.
>
> It should always throw a ProgramInvocationException instead of
> OptimizerPlanEnvironment.ProgramAbortException if any exception is thrown
> in the main method of your code.
>
> Another important question is how the code is executed (setting the
> context environment should be another Flink-internal operation), but given
> that you submit the job via the Flink k8s operator, it might take some
> time to look at the k8s operator implementation.
>
> >> We submit the code through the Kubernetes Flink Operator, which uses
> >> the REST API to submit the job to the Job Manager.
>
> However, given that we catch Throwable in the place this exception is
> thrown, I highly doubt whether it is executed by an official Flink release.
>
> >> It is an official Flink release, 1.9.0.
>
> A complete version of the code and the submission process would be
> helpful. Besides, what is the return type of buildExecutionGraph? I think
> it is not ExecutionGraph in Flink...
>
> >> buildExecutionGraph is our function which returns Unit. It's not
> >> ExecutionGraph. It builds the DataStreams by reading from Kafka and
> >> finally writes to Kafka.
>
> Best,
> tison.
>
>
> On Mon, Sep 23, 2019 at 8:21 PM, Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
> This is the complete stack trace which we get from execution on Kubernetes
> using the Flink Kubernetes operator ..
> The boxed error comes from the fact that we complete a Promise with
> Success when it returns a JobExecutionResult and with Failure when we get
> an exception. And here we are getting an exception, so the real stack
> trace we have is the one below in Caused by.
>
> java.util.concurrent.ExecutionException: Boxed Error
>   at scala.concurrent.impl.Promise$.resolver(Promise.scala:87)
>   at scala.concurrent.impl.Promise$.scala$concurrent$impl$Promise$$resolveTry(Promise.scala:79)
>   at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
>   at scala.concurrent.Promise.tryFailure(Promise.scala:112)
>   at scala.concurrent.Promise.tryFailure$(Promise.scala:112)
>   at scala.concurrent.impl.Promise$DefaultPromise.tryFailure(Promise.scala:187)
>   at pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.$anonfun$execute$3(FlinkStreamlet.scala:186)
>   at pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.$anonfun$execute$3$adapted(FlinkStreamlet.scala:186)
>   at scala.util.Failure.fold(Try.scala:240)
>   at pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.execute(FlinkStreamlet.scala:187)
>   at pipelines.flink.FlinkStreamlet.run(FlinkStreamlet.scala:153)
>   at pipelines.runner.Runner$.$anonfun$run$3(Runner.scala:44)
>   at scala.util.Try$.apply(Try.scala:213)
>   at pipelines.runner.Runner$.run(Runner.scala:43)
>   at pipelines.runner.Runner$.main(Runner.scala:30)
>   at pipelines.runner.Runner.main(Runner.scala)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
>   at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
>   at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
>   at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
>   at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:126)
>   at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:142)
>   at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
>   at org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
>   at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
>   at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
>   at pipelines.flink.FlinkStreamletLogic.executeStreamingQueries(FlinkStreamlet.scala:320)
>   at pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.$anonfun$execute$2(FlinkStreamlet.scala:184)
>   at scala.util.Try$.apply(Try.scala:213)
>   at pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.execute(FlinkStreamlet.scala:184)
>   ... 20 more
>
> regards.
>
> On Mon, Sep 23, 2019 at 5:36 PM Dian Fu <dian0511...@gmail.com> wrote:
> Regarding the code you pasted, personally I think nothing is wrong. The
> problem is how it's executed. As you can see from the implementation of
> StreamExecutionEnvironment.getExecutionEnvironment, it may create
> different StreamExecutionEnvironment implementations under different
> scenarios.
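As an aside, the "Boxed Error" wrapping at the top of that trace is standard Scala Promise behavior: failing a Promise with a fatal Throwable (and ProgramAbortException extends Error) makes the resolver wrap it in an ExecutionException("Boxed Error"). A minimal sketch, using a plain java.lang.Error as a stand-in for ProgramAbortException:

```scala
import java.util.concurrent.ExecutionException
import scala.concurrent.Promise
import scala.util.Failure

object BoxedErrorDemo extends App {
  val promise = Promise[Int]()

  // Failing the promise with an Error (classified as fatal by Scala)
  // makes the resolver box it rather than storing it as-is.
  promise.tryFailure(new Error("simulated ProgramAbortException"))

  promise.future.value match {
    case Some(Failure(e: ExecutionException)) =>
      println(e.getMessage)          // Boxed Error
      println(e.getCause.getMessage) // simulated ProgramAbortException
    case other =>
      println(s"unexpected: $other")
  }
}
```

So the Boxed Error is only the Promise machinery reacting to the Error subclass; the interesting part of the trace is the Caused by section underneath it.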
> Could you paste the full exception stack if it exists? It's difficult to
> figure out what's wrong from the current stack trace.
>
> Regards,
> Dian
>
>> On Sep 23, 2019, at 6:55 PM, Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>
>> Can it be the case that the ThreadLocal logic in
>> https://github.com/apache/flink/blob/release-1.9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java#L1609
>> does not behave deterministically when we submit the job through a
>> Kubernetes Flink operator? Utils also selects the factory to create the
>> context based on either thread-local storage or a static mutable variable.
>>
>> Can these be sources of problems in our case?
>>
>> regards.
>>
>> On Mon, Sep 23, 2019 at 3:58 PM Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>> ah .. OK .. I get the Throwable part. I am using
>>
>> import org.apache.flink.streaming.api.scala._
>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>>
>> How can this lead to a wrong StreamExecutionEnvironment? Any suggestion?
>>
>> regards.
>>
>> On Mon, Sep 23, 2019 at 3:53 PM Dian Fu <dian0511...@gmail.com> wrote:
>> Hi Debasish,
>>
>> As I said before, the exception is caught in [1]. It catches Throwable,
>> so it also catches OptimizerPlanEnvironment.ProgramAbortException.
>> Regarding the cause of this exception, I have the same feeling as Tison
>> and I also think that the wrong StreamExecutionEnvironment is used.
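The selection logic Debasish describes (a thread-local factory consulted first, with a static mutable fallback) can be modeled abstractly. The sketch below is hypothetical, not Flink's actual API; it only illustrates why the environment you get depends on which thread asks:

```scala
object EnvSelectionDemo extends App {
  trait Env { def name: String }
  object DefaultEnv extends Env { val name = "LocalStreamEnvironment" }
  object PlanEnv    extends Env { val name = "StreamPlanEnvironment" }

  // The factory may be installed per-thread (thread-local) or globally
  // (static mutable variable); the thread-local one wins when present.
  val threadLocalFactory = new ThreadLocal[() => Env]
  @volatile var staticFactory: () => Env = () => DefaultEnv

  def getExecutionEnvironment: Env =
    Option(threadLocalFactory.get).getOrElse(staticFactory)()

  println(getExecutionEnvironment.name) // LocalStreamEnvironment
  threadLocalFactory.set(() => PlanEnv)
  println(getExecutionEnvironment.name) // StreamPlanEnvironment
}
```

Under a model like this, a caller on a thread where the plan-capturing factory was never installed (or where it lingers from an earlier job) quietly gets the wrong environment, which is consistent with what Dian and Tison suspect.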
>>
>> Regards,
>> Dian
>>
>> [1] https://github.com/apache/flink/blob/e2728c0dddafcfe7fac0652084be6c7fd9714d85/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java#L76
>>
>>> On Sep 23, 2019, at 6:08 PM, Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>>
>>> Hi Tison -
>>>
>>> This is the code that builds the computation graph. readStream reads
>>> from Kafka and writeStream writes to Kafka.
>>>
>>> override def buildExecutionGraph = {
>>>   val rides: DataStream[TaxiRide] =
>>>     readStream(inTaxiRide)
>>>       .filter { ride ⇒ ride.getIsStart().booleanValue }
>>>       .keyBy("rideId")
>>>
>>>   val fares: DataStream[TaxiFare] =
>>>     readStream(inTaxiFare)
>>>       .keyBy("rideId")
>>>
>>>   val processed: DataStream[TaxiRideFare] =
>>>     rides
>>>       .connect(fares)
>>>       .flatMap(new EnrichmentFunction)
>>>
>>>   writeStream(out, processed)
>>> }
>>>
>>> I also checked that my code enters this function
>>> https://github.com/apache/flink/blob/release-1.9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java#L57
>>> and then the exception is thrown. I tried to do a grep on the Flink code
>>> base to see where this exception is caught. If I take off the tests, I
>>> don't see any catch of this exception ..
>>>
>>> $ find .
>>>   -name "*.java" | xargs grep "OptimizerPlanEnvironment.ProgramAbortException"
>>> ./flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java: throw new OptimizerPlanEnvironment.ProgramAbortException();
>>> ./flink-python/src/main/java/org/apache/flink/client/python/PythonDriver.java: throw new OptimizerPlanEnvironment.ProgramAbortException();
>>> ./flink-clients/src/main/java/org/apache/flink/client/program/PreviewPlanEnvironment.java: throw new OptimizerPlanEnvironment.ProgramAbortException();
>>> ./flink-clients/src/main/java/org/apache/flink/client/program/PreviewPlanEnvironment.java: throw new OptimizerPlanEnvironment.ProgramAbortException();
>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch
>>> (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/KMeansSingleStepTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/RelationalQueryCompilerTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/ConnectedComponentsCoGroupTest.java:import org.apache.flink.client.program.OptimizerPlanEnvironment.ProgramAbortException;
>>> ./flink-tests/src/test/java/org/apache/flink/test/planning/LargePlanTest.java: @Test(expected = OptimizerPlanEnvironment.ProgramAbortException.class, timeout = 30_000)
>>>
>>> What am I missing here?
>>>
>>> regards.
>>>
>>> On Mon, Sep 23, 2019 at 7:50 AM Zili Chen <wander4...@gmail.com> wrote:
>>> Hi Debasish,
>>>
>>> As mentioned by Dian, it is an internal exception that should always be
>>> caught by Flink internally. I would suggest you share the job
>>> (abstractly). Generally it is because you use
>>> StreamPlanEnvironment/OptimizerPlanEnvironment directly.
>>>
>>> Best,
>>> tison.
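To make the grep results above concrete: ProgramAbortException is used as flow control. The plan-only environments throw it from execute() to stop the user's main method once the plan has been captured, and the caller catches it generically. A simplified model of that pattern (not Flink's actual code):

```scala
object AbortFlowDemo extends App {
  // Mirrors the real exception's key property: it extends Error, not Exception.
  final class ProgramAbortException extends Error

  // Stands in for the user's main method: building the graph succeeds,
  // then execute() aborts deliberately.
  def userMain(): Unit = throw new ProgramAbortException

  val result =
    try { userMain(); "main returned normally" }
    catch {
      case _: ProgramAbortException => "abort caught: plan was captured"
      case t: Throwable             => s"unexpected: $t"
    }

  println(result) // abort caught: plan was captured
}
```

This is why the exception rarely shows up in user-visible catch blocks: the only intended catch site is the generic catch (Throwable t) in OptimizerPlanEnvironment#getOptimizedPlan, so seeing it escape to the top level suggests the throwing and catching environments were not paired up as expected.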
>>>
>>> On Mon, Sep 23, 2019 at 5:09 AM, Austin Cawley-Edwards <austin.caw...@gmail.com> wrote:
>>> Have you reached out to the FlinkK8sOperator team on Slack? They're
>>> usually pretty active on there.
>>>
>>> Here's the link:
>>> https://join.slack.com/t/flinkk8soperator/shared_invite/enQtNzIxMjc5NDYxODkxLTEwMThmN2I0M2QwYjM3ZDljYTFhMGRiNDUzM2FjZGYzNTRjYWNmYTE1NzNlNWM2YWM5NzNiNGFhMTkxZjA4OGU
>>>
>>> Best,
>>> Austin
>>>
>>> On Sun, Sep 22, 2019 at 12:38 PM Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>> The problem is that I am submitting Flink jobs to a Kubernetes cluster
>>> using a Flink operator, so it's difficult to debug in the traditional
>>> sense of the term. And all I get is the exception that I reported ..
>>>
>>> Caused by: org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
>>>   at org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
>>>   at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
>>>   at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
>>>
>>> I am thinking that this exception must be coming from some other
>>> exception, which is not reported, BTW. I expected a Caused by portion in
>>> the stack trace. Any clue as to which area I should look into to debug
>>> this?
>>>
>>> regards.
>>>
>>> On Sat, Sep 21, 2019 at 8:10 AM Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>> Thanks for the pointer .. I will try debugging. I am getting this
>>> exception running my application on Kubernetes using the Flink operator
>>> from Lyft.
>>>
>>> regards.
>>>
>>> On Sat, 21 Sep 2019 at 6:44 AM, Dian Fu <dian0511...@gmail.com> wrote:
>>> This exception is used internally to get the plan of a job before
>>> submitting it for execution. It's thrown with a special purpose and will
>>> be caught internally in [1]; it will not usually be thrown to end users.
>>>
>>> You could check the following places to find out the cause of this problem:
>>> 1. Check the execution environment you used.
>>> 2. If you can debug, set a breakpoint at [2] to see if the type of the
>>> env wrapped in StreamPlanEnvironment is OptimizerPlanEnvironment.
>>> Usually it should be.
>>>
>>> Regards,
>>> Dian
>>>
>>> [1] https://github.com/apache/flink/blob/e2728c0dddafcfe7fac0652084be6c7fd9714d85/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java#L76
>>> [2] https://github.com/apache/flink/blob/15a7f978a2e591451c194c41408a12e64d04e9ba/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java#L57
>>>
>>>> On Sep 21, 2019, at 4:14 AM, Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>>>
>>>> Hi -
>>>>
>>>> When you get an exception stack trace like this ..
>>>>
>>>> Caused by: org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
>>>>   at org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
>>>>   at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
>>>>   at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
>>>>
>>>> what is the recommended approach to debugging? I mean, what kinds of
>>>> errors can potentially lead to such a stack trace? In my case it starts
>>>> from env.execute(..) but does not give any information as to what can
>>>> go wrong.
>>>>
>>>> Any help will be appreciated.
>>>>
>>>> regards.
>>>>
>>>> --
>>>> Debasish Ghosh
>>>> http://manning.com/ghosh2
>>>> http://manning.com/ghosh
>>>>
>>>> Twttr: @debasishg
>>>> Blog: http://debasishg.blogspot.com
>>>> Code: http://github.com/debasishg