Hi Debasish,

In which case does the exception occur? Does it occur when you submit one job 
at a time, or when multiple jobs are submitted at the same time? I'm asking 
because I noticed that you use a Future to execute the job without blocking. I 
suspect that ThreadLocal doesn't work well in this case. 
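
To illustrate the concern, here is a minimal, hypothetical sketch (not Flink code). A value stored in a plain ThreadLocal is visible only to the thread that set it, so code that runs inside a Future on a pool thread sees the slot as unset. The class name ThreadLocalFutureDemo, the CONTEXT slot, and the "plan"/"default" values are made up for this example; whether this is what actually happens in your job is not established here.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ThreadLocalFutureDemo {
    // Hypothetical stand-in for a per-thread "context environment" slot.
    private static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();

    /** Reads the slot on the calling thread; falls back to "default" when it is unset. */
    static String currentContext() {
        String value = CONTEXT.get();
        return value == null ? "default" : value;
    }

    /** Sets the slot on the calling thread, then reads it from a separate pool thread. */
    static String readFromOtherThread() throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            CONTEXT.set("plan");
            // The supplier runs on the pool thread, which has its own (empty) slot.
            return CompletableFuture
                    .supplyAsync(ThreadLocalFutureDemo::currentContext, pool)
                    .get();
        } finally {
            CONTEXT.remove();
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        CONTEXT.set("plan");
        System.out.println("same thread:  " + currentContext());
        CONTEXT.remove();
        System.out.println("other thread: " + readFromOtherThread());
    }
}
```

Running main prints "plan" for the same-thread read and "default" for the read made on the pool thread.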

Regards,
Dian

> On Sep 23, 2019, at 11:57 PM, Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
> 
> Hi tison -
> 
> Please find my response below in >>.
> 
> regards.
> 
> On Mon, Sep 23, 2019 at 6:20 PM Zili Chen <wander4...@gmail.com> wrote:
> Hi Debasish,
> 
> The OptimizerPlanEnvironment.ProgramAbortException should be caught at 
> OptimizerPlanEnvironment#getOptimizedPlan
> in its catch (Throwable t) branch.
> 
> >> true but what I get is a StreamPlanEnvironment. From my code I am only 
> >> doing val env = StreamExecutionEnvironment.getExecutionEnvironment. 
> 
> It should always throw a ProgramInvocationException instead of 
> OptimizerPlanEnvironment.ProgramAbortException if any
> exception is thrown in the main method of your code.
> 
> Another important question is how the code is executed (setting the context 
> environment should be another Flink-internal operation),
> but given that you submit the job via the Flink k8s operator, it might take 
> some time to look at the k8s operator implementation.
> 
> >> We submit the code through Kubernetes Flink Operator which uses the REST 
> >> API to submit the job to the Job Manager
> 
> However, given that we catch Throwable at the place where this exception is 
> thrown, I strongly doubt that it is being executed by an official
> Flink release.
> 
> >> It is an official Flink release 1.9.0 
> 
> A complete version of the code and the submission process would be helpful. 
> Besides, what is the return type of buildExecutionGraph?
> I don't think it is Flink's ExecutionGraph...
> 
> >> buildExecutionGraph is our function, which returns Unit. It's not 
> >> ExecutionGraph. It builds the DataStreams by reading from Kafka and then 
> >> finally writes to Kafka. 
> 
> Best,
> tison.
> 
> 
> On Mon, Sep 23, 2019 at 8:21 PM Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
> This is the complete stack trace which we get from execution on Kubernetes 
> using the Flink Kubernetes operator. The boxed error comes from the fact 
> that we complete a Promise with Success when it returns a JobExecutionResult 
> and with Failure when we get an exception. And here we are getting an 
> exception. So the real stack trace we have is the one below in "Caused by". 
> 
> java.util.concurrent.ExecutionException: Boxed Error
>     at scala.concurrent.impl.Promise$.resolver(Promise.scala:87)
>     at scala.concurrent.impl.Promise$.scala$concurrent$impl$Promise$$resolveTry(Promise.scala:79)
>     at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
>     at scala.concurrent.Promise.tryFailure(Promise.scala:112)
>     at scala.concurrent.Promise.tryFailure$(Promise.scala:112)
>     at scala.concurrent.impl.Promise$DefaultPromise.tryFailure(Promise.scala:187)
>     at pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.$anonfun$execute$3(FlinkStreamlet.scala:186)
>     at pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.$anonfun$execute$3$adapted(FlinkStreamlet.scala:186)
>     at scala.util.Failure.fold(Try.scala:240)
>     at pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.execute(FlinkStreamlet.scala:187)
>     at pipelines.flink.FlinkStreamlet.run(FlinkStreamlet.scala:153)
>     at pipelines.runner.Runner$.$anonfun$run$3(Runner.scala:44)
>     at scala.util.Try$.apply(Try.scala:213)
>     at pipelines.runner.Runner$.run(Runner.scala:43)
>     at pipelines.runner.Runner$.main(Runner.scala:30)
>     at pipelines.runner.Runner.main(Runner.scala)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
>     at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
>     at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
>     at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:126)
>     at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:142)
>     at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
>     at org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
>     at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
>     at pipelines.flink.FlinkStreamletLogic.executeStreamingQueries(FlinkStreamlet.scala:320)
>     at pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.$anonfun$execute$2(FlinkStreamlet.scala:184)
>     at scala.util.Try$.apply(Try.scala:213)
>     at pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.execute(FlinkStreamlet.scala:184)
>     ... 20 more
> 
> regards.
> 
> On Mon, Sep 23, 2019 at 5:36 PM Dian Fu <dian0511...@gmail.com> wrote:
> Regarding the code you pasted, I personally think nothing is wrong with it. The 
> problem is how it's executed. As you can see from the implementation of 
> StreamExecutionEnvironment.getExecutionEnvironment, it may create different 
> StreamExecutionEnvironment implementations under different scenarios. Could 
> you paste the full exception stack if it exists? It's difficult to figure out 
> what's wrong with the current stack trace.
> 
> Regards,
> Dian
> 
>> On Sep 23, 2019, at 6:55 PM, Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>> 
>> Can it be the case that the threadLocal stuff in 
>> https://github.com/apache/flink/blob/release-1.9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java#L1609
>> does not behave deterministically when we submit a job through a Kubernetes 
>> Flink operator? Utils also selects the factory to create the context based 
>> on either thread-local storage or a static mutable variable. 
>> 
>> Can these be the source of problems in our case?
>> 
>> regards.
>> 
>> On Mon, Sep 23, 2019 at 3:58 PM Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>> ah .. Ok .. I get the Throwable part. I am using 
>> 
>> import org.apache.flink.streaming.api.scala._
>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>> 
>> How can this lead to a wrong StreamExecutionEnvironment ? Any suggestion ?
>> 
>> regards.
>> 
>> On Mon, Sep 23, 2019 at 3:53 PM Dian Fu <dian0511...@gmail.com> wrote:
>> Hi Debasish,
>> 
>> As I said before, the exception is caught in [1]. That code catches Throwable, 
>> so it also catches "OptimizerPlanEnvironment.ProgramAbortException". 
>> Regarding the cause of this exception, I have the same feeling as Tison: 
>> I also think that the wrong StreamExecutionEnvironment is used.
>> 
>> Regards,
>> Dian
>> 
>> [1] 
>> https://github.com/apache/flink/blob/e2728c0dddafcfe7fac0652084be6c7fd9714d85/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java#L76
>> 
>>> On Sep 23, 2019, at 6:08 PM, Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>> 
>>> Hi Tison -
>>> 
>>> This is the code that builds the computation graph. readStream reads from 
>>> Kafka and writeStream writes to Kafka.
>>> 
>>>     override def buildExecutionGraph = {
>>>       val rides: DataStream[TaxiRide] =
>>>         readStream(inTaxiRide)
>>>           .filter { ride ⇒ ride.getIsStart().booleanValue }
>>>           .keyBy("rideId")
>>> 
>>>       val fares: DataStream[TaxiFare] =
>>>         readStream(inTaxiFare)
>>>           .keyBy("rideId")
>>> 
>>>       val processed: DataStream[TaxiRideFare] =
>>>         rides
>>>           .connect(fares)
>>>           .flatMap(new EnrichmentFunction)
>>> 
>>>       writeStream(out, processed)
>>>     }
>>> 
>>> I also checked that my code enters this function 
>>> https://github.com/apache/flink/blob/release-1.9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java#L57
>>> and then the exception is thrown. I tried a grep on the Flink code 
>>> base to see where this exception is caught. Leaving out the tests, I 
>>> don't see any catch of this exception:
>>> 
>>> $ find . -name "*.java" | xargs grep "OptimizerPlanEnvironment.ProgramAbortException"
>>> ./flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java: throw new OptimizerPlanEnvironment.ProgramAbortException();
>>> ./flink-python/src/main/java/org/apache/flink/client/python/PythonDriver.java: throw new OptimizerPlanEnvironment.ProgramAbortException();
>>> ./flink-clients/src/main/java/org/apache/flink/client/program/PreviewPlanEnvironment.java: throw new OptimizerPlanEnvironment.ProgramAbortException();
>>> ./flink-clients/src/main/java/org/apache/flink/client/program/PreviewPlanEnvironment.java: throw new OptimizerPlanEnvironment.ProgramAbortException();
>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/KMeansSingleStepTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/RelationalQueryCompilerTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/ConnectedComponentsCoGroupTest.java:import org.apache.flink.client.program.OptimizerPlanEnvironment.ProgramAbortException;
>>> ./flink-tests/src/test/java/org/apache/flink/test/planning/LargePlanTest.java: @Test(expected = OptimizerPlanEnvironment.ProgramAbortException.class, timeout = 30_000)
>>> 
>>> What am I missing here ?
>>> 
>>> regards.
>>> 
>>> On Mon, Sep 23, 2019 at 7:50 AM Zili Chen <wander4...@gmail.com> wrote:
>>> Hi Debasish,
>>> 
>>> As mentioned by Dian, it is an internal exception that should always be 
>>> caught by Flink internally. I would suggest that you share the job (in 
>>> abstract form). Generally, this happens because you use 
>>> StreamPlanEnvironment/OptimizerPlanEnvironment directly.
>>> 
>>> Best,
>>> tison.
>>> 
>>> 
>>> On Mon, Sep 23, 2019 at 5:09 AM Austin Cawley-Edwards <austin.caw...@gmail.com> wrote:
>>> Have you reached out to the FlinkK8sOperator team on Slack? They’re usually 
>>> pretty active on there. 
>>> 
>>> Here’s the link:
>>> https://join.slack.com/t/flinkk8soperator/shared_invite/enQtNzIxMjc5NDYxODkxLTEwMThmN2I0M2QwYjM3ZDljYTFhMGRiNDUzM2FjZGYzNTRjYWNmYTE1NzNlNWM2YWM5NzNiNGFhMTkxZjA4OGU
>>> 
>>> 
>>> Best,
>>> Austin
>>> 
>>> On Sun, Sep 22, 2019 at 12:38 PM Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>> The problem is I am submitting Flink jobs to Kubernetes cluster using a 
>>> Flink Operator. Hence it's difficult to debug in the traditional sense of 
>>> the term. And all I get is the exception that I reported ..
>>> 
>>> Caused by: org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
>>>     at org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
>>>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
>>>     at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
>>> 
>>> I am thinking that this exception must be caused by some other 
>>> exception that is not reported. I expected a "Caused by" portion in 
>>> the stack trace. Any clue as to which area I should look into to debug this?
>>> 
>>> regards.
>>> 
>>> On Sat, Sep 21, 2019 at 8:10 AM Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>> Thanks for the pointer .. I will try debugging. I am getting this exception 
>>> running my application on Kubernetes using the Flink operator from Lyft. 
>>> 
>>> regards.
>>> 
>>> On Sat, 21 Sep 2019 at 6:44 AM, Dian Fu <dian0511...@gmail.com> wrote:
>>> This exception is used internally to get the plan of a job before 
>>> submitting it for execution. It is thrown for a special purpose, is 
>>> caught internally in [1], and is usually not propagated to end users. 
>>> 
>>> You could check the following to find the cause of this problem:
>>> 1. Check the execution environment you used.
>>> 2. If you can debug, set a breakpoint at [2] to see whether the type of the 
>>> env wrapped in StreamPlanEnvironment is OptimizerPlanEnvironment. Usually it 
>>> should be.
>>> 
>>> Regards,
>>> Dian
>>> 
>>> [1] 
>>> https://github.com/apache/flink/blob/e2728c0dddafcfe7fac0652084be6c7fd9714d85/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java#L76
>>> [2] 
>>> https://github.com/apache/flink/blob/15a7f978a2e591451c194c41408a12e64d04e9ba/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java#L57
>>> 
>>>> On Sep 21, 2019, at 4:14 AM, Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>>> 
>>>> Hi -
>>>> 
>>>> When you get an exception stack trace like this ..
>>>> 
>>>> Caused by: org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
>>>>     at org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
>>>>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
>>>>     at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
>>>> 
>>>> what is the recommended approach to debugging it? I mean, what kind of errors 
>>>> can potentially lead to such a stack trace? In my case it starts from 
>>>> env.execute(..) but does not give any information as to what can go wrong.
>>>> 
>>>> Any help will be appreciated.
>>>> 
>>>> regards.
>>>> 
>>>> -- 
>>>> Debasish Ghosh
>>>> http://manning.com/ghosh2 <http://manning.com/ghosh2>
>>>> http://manning.com/ghosh <http://manning.com/ghosh>
>>>> 
>>>> Twttr: @debasishg
>>>> Blog: http://debasishg.blogspot.com <http://debasishg.blogspot.com/>
>>>> Code: http://github.com/debasishg <http://github.com/debasishg>