Re: Running single Flink job in a job cluster, problem starting JobManager

2019-02-12 Thread Thomas Eckestad
I have investigated this further:

During normal operation, without Spring Boot, 
OptimizerPlanEnvironment.ProgramAbortException is thrown by Flink from 
StreamPlanEnvironment::execute():70. It is caught by 
PackagedProgram::callMainMethod():537, where it is re-thrown as an Error. That 
Error is caught in OptimizerPlanEnvironment::getOptimizedPlan():88, which checks 
whether the optimizerPlan field is non-null: if it is, the value of that field is 
returned, otherwise the Error is re-thrown. Since the optimizerPlan IS non-null, 
the exception stops there and the job is executed as expected. In other words, 
the Flink control flow relies on throwing (and handling) ProgramAbortException.
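
The pattern, as a simplified and self-contained Java sketch (illustration only,
with stand-in names, not the actual Flink 1.7 sources):

// Simplified sketch of the control flow described above. The plan-extraction
// environment aborts the user's main() with a special exception, and the caller
// treats that exception as "plan captured" rather than as a failure.
public class PlanExtractionSketch {

    /** Stands in for OptimizerPlanEnvironment.ProgramAbortException. */
    static class ProgramAbortException extends Error {}

    /** Stands in for the captured optimizer plan. */
    static String optimizerPlan;

    /** Stands in for StreamPlanEnvironment::execute(): records the plan, then aborts. */
    static void execute(String jobGraph) {
        optimizerPlan = jobGraph;
        throw new ProgramAbortException();   // corresponds to StreamPlanEnvironment.java:70
    }

    /** Stands in for the user's main(): builds the job and calls execute(). */
    static void userMain() {
        execute("my-job-graph");
    }

    /** Stands in for OptimizerPlanEnvironment::getOptimizedPlan(). */
    static String getOptimizedPlan() {
        try {
            userMain();                       // via PackagedProgram::callMainMethod()
        } catch (Throwable t) {
            if (optimizerPlan != null) {
                return optimizerPlan;         // expected path: the plan was captured
            }
            throw t;                          // genuine failure: re-throw
        }
        throw new IllegalStateException("main() returned without calling execute()");
    }

    public static void main(String[] args) {
        System.out.println("Extracted plan: " + getOptimizedPlan());
    }
}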

When using Spring Boot, execution fails because of the 
OptimizerPlanEnvironment.ProgramAbortException mentioned above. In that case 
Spring logic sits between PackagedProgram::callMainMethod() and the method in 
which the Flink ExecutionEnvironment is built and executed. Spring catches any 
Throwable thrown from that method, interprets it as a failure, and exits.
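
The failing init method presumably has roughly this shape (a hypothetical sketch
reconstructed from the stack trace below; the class body and names here are
assumptions, not the actual MyFlinkJob sources):

import javax.annotation.PostConstruct;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.stereotype.Component;

// Hypothetical shape of the bean named in the BeanCreationException. Spring's
// InitDestroyAnnotationBeanPostProcessor invokes the @PostConstruct method itself,
// so the ProgramAbortException thrown by execute() never propagates back to
// PackagedProgram::callMainMethod(); Spring wraps it in a BeanCreationException instead.
@Component
public class MyFlinkJob {

    @PostConstruct
    public void init() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder topology, only here to make the sketch complete.
        env.fromElements(1, 2, 3)
           .map(i -> i * 2)
           .print();

        // In the job cluster this env is a StreamPlanEnvironment, whose execute() throws
        // ProgramAbortException after capturing the plan (MyFlinkJob.java:59 in the trace).
        env.execute("MyFlinkJob");
    }
}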

I guess that when the Spring Boot Flink job is deployed to a session cluster, 
which as mentioned above works fine, the job does not rely on passing exceptions 
between the Flink bootstrap code and the Flink job?

/Thomas


From: Chesnay Schepler 
Sent: Sunday, February 10, 2019 10:30:54 AM
To: Thomas Eckestad; user@flink.apache.org
Subject: Re: Running single Flink job in a job cluster, problem starting 
JobManager

I'm afraid we haven't had much experience with Spring Boot Flink applications.

It is indeed strange that the job ends up using a StreamPlanEnvironment.
As a debugging step I would look into all calls to 
ExecutionEnvironment#initializeContextEnvironment().
This is how specific execution environments are injected into 
(Stream)ExecutionEnvironment#getExecutionEnvironment().
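
A quick way to see which concrete environment such a factory hands back inside
the container is to log it from the job itself before building the topology
(only public API is used; this is just a probe sketch):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class EnvironmentProbe {
    // Minimal probe: print which execution environment implementation the job gets.
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        System.out.println("Execution environment in use: " + env.getClass().getName());
        // In the flink-container job-cluster image this is expected to show
        // StreamPlanEnvironment while the JobGraph is extracted from the user jar;
        // flink run against a session cluster injects a different context environment.
    }
}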

On 08.02.2019 15:17, Thomas Eckestad wrote:
Hi again,

when removing Spring Boot from the application it works.

I would really like to mix Spring Boot and Flink. It does work with Spring Boot 
when submitting jobs to a session cluster, as stated before.

/Thomas

From: Thomas Eckestad <thomas.eckes...@verisure.com>
Sent: Friday, February 8, 2019 12:14 PM
To: user@flink.apache.org
Subject: Running single Flink job in a job cluster, problem starting JobManager

Hi,

I am trying to run a Flink job cluster in K8s. As a first step I have created a 
Docker image according to:

https://github.com/apache/flink/blob/release-1.7/flink-container/docker/README.md

When I try to run the image:

docker run --name=flink-job-manager flink-image:latest job-cluster 
--job-classname com.foo.bar.FlinkTest 
-Djobmanager.rpc.address=flink-job-cluster -Dparallelism.default=1 
-Dblob.server.port=6124 -Dqueryable-state.server.ports=6125

the execution fails with the following exception:

org.springframework.beans.factory.BeanCreationException: Error creating bean 
with name 'MyFlinkJob': Invocation of init method failed; nested exception is 
org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
at 
org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor.postProcessBeforeInitialization(InitDestroyAnnotationBeanPostProcessor.java:139)
at 
org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.applyBeanPostProcessorsBeforeInitialization(AbstractAutowireCapableBeanFactory.java:419)
at 
org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1737)
at 
org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:576)
at 
org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:498)
at 
org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:320)
at 
org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:222)
at 
org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:318)
at 
org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:199)
at 
org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:846)
at 
org.springframework.context.support.AbstractApplicationContext.finishBea

Re: Running single Flink job in a job cluster, problem starting JobManager

2019-02-10 Thread Chesnay Schepler
I'm afraid we haven't had much experience with Spring Boot Flink 
applications.


It is indeed strange that the job ends up using a StreamPlanEnvironment.
As a debugging step I would look into all calls to 
ExecutionEnvironment#initializeContextEnvironment().
This is how specific execution environments are injected into 
(Stream)ExecutionEnvironment#getExecutionEnvironment().


On 08.02.2019 15:17, Thomas Eckestad wrote:

Hi again,

when removing Spring Boot from the application it works.

I would really like to mix Spring Boot and Flink. It does work with 
Spring Boot when submitting jobs to a session cluster, as stated before.


/Thomas

From: Thomas Eckestad 
Sent: Friday, February 8, 2019 12:14 PM
To: user@flink.apache.org
Subject: Running single Flink job in a job cluster, problem starting 
JobManager

Hi,

I am trying to run a Flink job cluster in K8s. As a first step I have 
created a Docker image according to:


https://github.com/apache/flink/blob/release-1.7/flink-container/docker/README.md

When I try to run the image:

docker run --name=flink-job-manager flink-image:latest job-cluster 
--job-classname com.foo.bar.FlinkTest 
-Djobmanager.rpc.address=flink-job-cluster -Dparallelism.default=1 
-Dblob.server.port=6124 -Dqueryable-state.server.ports=6125


the execution fails with the following exception:

org.springframework.beans.factory.BeanCreationException: Error 
creating bean with name 'MyFlinkJob': Invocation of init method 
failed; nested exception is 
org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
at 
org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor.postProcessBeforeInitialization(InitDestroyAnnotationBeanPostProcessor.java:139)
at 
org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.applyBeanPostProcessorsBeforeInitialization(AbstractAutowireCapableBeanFactory.java:419)
at 
org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1737)
at 
org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:576)
at 
org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:498)
at 
org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:320)
at 
org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:222)
at 
org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:318)
at 
org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:199)
at 
org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:846)
at 
org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:863)
at 
org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:546)
at 
org.springframework.boot.SpringApplication.refresh(SpringApplication.java:775)
at 
org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397)
at 
org.springframework.boot.SpringApplication.run(SpringApplication.java:316)
at 
org.springframework.boot.SpringApplication.run(SpringApplication.java:1260)
at 
org.springframework.boot.SpringApplication.run(SpringApplication.java:1248)

at com.foo.bar.FlinkTest.main(FlinkTest.java:10)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.springframework.boot.devtools.restart.RestartLauncher.run(RestartLauncher.java:49)
Caused by: 
org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
at 
org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:70)

at com.foo.bar.FlinkJob.MyFlinkJob.init(MyFlinkJob.java:59)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleElement.invoke(InitDestroyAnnotationBeanPostProcessor.java:363)
at 
org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleMetadata.invokeInitMethods(InitDestroyAnnotationBeanPostPr

Re: Running single Flink job in a job cluster, problem starting JobManager

2019-02-08 Thread Thomas Eckestad
Hi again,

when removing Spring Boot from the application it works.

I would really like to mix Spring Boot and Flink. It does work with Spring Boot 
when submitting jobs to a session cluster, as stated before.

/Thomas

From: Thomas Eckestad 
Sent: Friday, February 8, 2019 12:14 PM
To: user@flink.apache.org
Subject: Running single Flink job in a job cluster, problem starting JobManager

Hi,

I am trying to run a Flink job cluster in K8s. As a first step I have created a 
Docker image according to:

https://github.com/apache/flink/blob/release-1.7/flink-container/docker/README.md

When I try to run the image:

docker run --name=flink-job-manager flink-image:latest job-cluster 
--job-classname com.foo.bar.FlinkTest 
-Djobmanager.rpc.address=flink-job-cluster -Dparallelism.default=1 
-Dblob.server.port=6124 -Dqueryable-state.server.ports=6125

the execution fails with the following exception:

org.springframework.beans.factory.BeanCreationException: Error creating bean 
with name 'MyFlinkJob': Invocation of init method failed; nested exception is 
org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
at 
org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor.postProcessBeforeInitialization(InitDestroyAnnotationBeanPostProcessor.java:139)
at 
org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.applyBeanPostProcessorsBeforeInitialization(AbstractAutowireCapableBeanFactory.java:419)
at 
org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1737)
at 
org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:576)
at 
org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:498)
at 
org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:320)
at 
org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:222)
at 
org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:318)
at 
org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:199)
at 
org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:846)
at 
org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:863)
at 
org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:546)
at 
org.springframework.boot.SpringApplication.refresh(SpringApplication.java:775)
at 
org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:316)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1260)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1248)
at com.foo.bar.FlinkTest.main(FlinkTest.java:10)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.springframework.boot.devtools.restart.RestartLauncher.run(RestartLauncher.java:49)
Caused by: 
org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
at 
org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:70)
at com.foo.bar.FlinkJob.MyFlinkJob.init(MyFlinkJob.java:59)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleElement.invoke(InitDestroyAnnotationBeanPostProcessor.java:363)
at 
org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleMetadata.invokeInitMethods(InitDestroyAnnotationBeanPostProcessor.java:307)
at 
org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor.postProcessBeforeInitialization(InitDestroyAnnotationBeanPostProcessor.java:136)
... 22 more

I can successfully run the same job.jar on a session cluster 
(start-cluster.sh; flink run job.jar). Any ideas? Feels like I am missing 
something obvious?

At MyFlinkJob.java:59 I do: streamExecutionEnvironment.execute("MyFlinkJob");

It feels strange that the 

Running single Flink job in a job cluster, problem starting JobManager

2019-02-08 Thread Thomas Eckestad
Hi,

I am trying to run a Flink job cluster in K8s. As a first step I have created a 
Docker image according to:

https://github.com/apache/flink/blob/release-1.7/flink-container/docker/README.md

When I try to run the image:

docker run --name=flink-job-manager flink-image:latest job-cluster 
--job-classname com.foo.bar.FlinkTest 
-Djobmanager.rpc.address=flink-job-cluster -Dparallelism.default=1 
-Dblob.server.port=6124 -Dqueryable-state.server.ports=6125

the execution fails with the following exception:

org.springframework.beans.factory.BeanCreationException: Error creating bean 
with name 'MyFlinkJob': Invocation of init method failed; nested exception is 
org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
at 
org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor.postProcessBeforeInitialization(InitDestroyAnnotationBeanPostProcessor.java:139)
at 
org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.applyBeanPostProcessorsBeforeInitialization(AbstractAutowireCapableBeanFactory.java:419)
at 
org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1737)
at 
org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:576)
at 
org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:498)
at 
org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:320)
at 
org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:222)
at 
org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:318)
at 
org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:199)
at 
org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:846)
at 
org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:863)
at 
org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:546)
at 
org.springframework.boot.SpringApplication.refresh(SpringApplication.java:775)
at 
org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:316)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1260)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1248)
at com.foo.bar.FlinkTest.main(FlinkTest.java:10)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.springframework.boot.devtools.restart.RestartLauncher.run(RestartLauncher.java:49)
Caused by: 
org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
at 
org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:70)
at com.foo.bar.FlinkJob.MyFlinkJob.init(MyFlinkJob.java:59)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleElement.invoke(InitDestroyAnnotationBeanPostProcessor.java:363)
at 
org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleMetadata.invokeInitMethods(InitDestroyAnnotationBeanPostProcessor.java:307)
at 
org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor.postProcessBeforeInitialization(InitDestroyAnnotationBeanPostProcessor.java:136)
... 22 more

I can successfully run the same job.jar on a session cluster 
(start-cluster.sh; flink run job.jar). Any ideas? Feels like I am missing 
something obvious?

At MyFlinkJob.java:59 I do: streamExecutionEnvironment.execute("MyFlinkJob");

It feels strange that the execution ends up in 
org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute?

From 
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java:

/**
 * A special {@link StreamExecutionEnvironment} that is used in the web frontend
 * when generating a user-inspectable graph of a streaming job.
 */
@PublicEvolving
public class StreamPlanEnvironment extends StreamExecutionEnvironment {