Re: How to get help on ClassCastException when re-submitting a job

2017-01-30 Thread Giuliano Caliari
Quick update: I've closed the issue after confirming that Yury's workaround
fixed it for us.



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/How-to-get-help-on-ClassCastException-when-re-submitting-a-job-tp10972p11374.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: How to get help on ClassCastException when re-submitting a job

2017-01-25 Thread Fabian Hueske
Thank you Giuliano!



Re: How to get help on ClassCastException when re-submitting a job

2017-01-24 Thread Giuliano Caliari
Issue reported: 

https://issues.apache.org/jira/browse/FLINK-5633

Sorry for taking so long





Re: How to get help on ClassCastException when re-submitting a job

2017-01-19 Thread Fabian Hueske
Hi Giuliano,

I think it would be good to document this behavior, though I'm not sure what
the best place would be.
It would be nice if you could open a JIRA issue and describe the problem there
(basically copy Yury's analysis).

Thank you,
Fabian



Re: How to get help on ClassCastException when re-submitting a job

2017-01-18 Thread Giuliano Caliari
Hello,

Yury's description of the issue is spot on. We are running our cluster on
YARN and using Avro for serialization, exactly as described.

@Ufuk, I'm running my cluster on YARN, with 4 Task Managers with 2 slots each,
but this particular job has parallelism 1.

@Yury, I'll test your fix as soon as I can and report back.

@Fabian, do you still want me to open the issue?

Cheers,





Re: How to get help on ClassCastException when re-submitting a job

2017-01-18 Thread Yury Ruchin
For my case, I tracked down the culprit: it was indeed Avro. I'm
providing details below, since I believe the pattern is pretty common for
such issues.

In a YARN setup, classes are loaded from several sources: the Flink lib
directory, the YARN lib directories, and user code. The first two sources
are handled by the system class loader; user code is loaded by
FlinkUserCodeClassLoader.
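
To see which of these loaders actually defined a given class, one can walk the loader's parent chain. The sketch below is only an illustration using the plain JDK API; the `ClassLoaderProbe` object and its `loaderChain` helper are made-up names, not part of Flink or Avro.

```scala
import scala.annotation.tailrec

object ClassLoaderProbe {
  /** Lists the loader class names, from the loader that defined `cls` up to the bootstrap loader. */
  def loaderChain(cls: Class[_]): List[String] = {
    @tailrec
    def walk(loader: ClassLoader, acc: List[String]): List[String] =
      if (loader == null) ("bootstrap" :: acc).reverse // null denotes the JVM bootstrap loader
      else walk(loader.getParent, loader.getClass.getName :: acc)

    walk(cls.getClassLoader, Nil)
  }
}
```

Logging `ClassLoaderProbe.loaderChain(classOf[MySpecificRecord])` from inside a task should, in the setup described here, list the user code class loader first, whereas an Avro library class would start at the system class loader.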

My streaming job parses Avro-encoded data using the SpecificRecord facility. In
essence, the job looks like this: Source -> Avro parser (Map) -> Sink.
Parallelism is 1. The job runs inside a long-lived YARN session. I have a
subclass of SpecificRecord; say its name is MySpecificRecord. From a class
loading perspective, the Avro library classes, including SpecificRecord,
are loaded by the system class loader from the YARN lib dir, so such classes
are shared across different Flink tasks within a task manager. MySpecificRecord,
on the other hand, is in the job fat jar, so it gets loaded by
FlinkUserCodeClassLoader. Upon every job restart, the task gets a new
FlinkUserCodeClassLoader instance, so classes from user code are confined
to a task instance.

Simply put, the parsing itself looks like this:

val bean = new SpecificDatumReader[MySpecificRecord](MySpecificRecord.getClassSchema).read(...)

Now, the scenario:

1. I start my job. Parsing is initiated, so SpecificDatumReader and
SpecificData get loaded by the system class loader. A new
FlinkUserCodeClassLoader is instantiated; let's denote this instance "A".
MySpecificRecord then gets loaded by A.

2. SpecificData has a singleton, SpecificData.INSTANCE, which holds a cache
that maps a string key derived from the Avro schema to the implementing
class. So during parsing, MySpecificRecord (A) gets cached there.

3. I stop the job and re-submit it. The JVM process is the same, so all
standard Avro classes, including SpecificData, remain loaded. A new task
instance is created and gets a new FlinkUserCodeClassLoader instance; let's
name it "B". A new incarnation of the MySpecificRecord class is loaded by B.
From the JVM's standpoint, MySpecificRecord (B) is different from
MySpecificRecord (A), even though their bytecode is identical.

4. The job starts parsing again. SpecificDatumReader consults the cache in
SpecificData.INSTANCE for any stashed classes and finds
MySpecificRecord (A) there.

5. SpecificDatumReader uses the cached MySpecificRecord (A) to instantiate
a bean to fill with the parsed data.

6. SpecificDatumReader hands the filled instance of MySpecificRecord (A)
back to the job.

7. The job tries to cast MySpecificRecord (A) to MySpecificRecord (B).

8. ClassCastException :^(
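
The steps above can be reproduced in miniature without Flink or Avro. The following is only a simulation under stated assumptions: `Payload` stands in for MySpecificRecord, `sharedCache` for the cache held by SpecificData.INSTANCE, and `IsolatingLoader` for a fresh FlinkUserCodeClassLoader. All of these names are hypothetical, and the sketch assumes the compiled class file is reachable as a classpath resource.

```scala
import java.io.{ByteArrayOutputStream, InputStream}
import java.util.concurrent.ConcurrentHashMap

object ResubmitSimulation {
  /** Stand-in for MySpecificRecord (user code). */
  class Payload

  /** Stand-in for the cache inside SpecificData.INSTANCE: it lives as long as the JVM. */
  val sharedCache = new ConcurrentHashMap[String, Class[_]]()

  /** Defines `target` itself instead of delegating, like a fresh user code class loader. */
  class IsolatingLoader(target: String, delegate: ClassLoader) extends ClassLoader(delegate) {
    override protected def loadClass(name: String, resolve: Boolean): Class[_] =
      if (name != target) super.loadClass(name, resolve)
      else {
        val loaded = findLoadedClass(name)
        if (loaded != null) loaded
        else {
          val in = delegate.getResourceAsStream(name.replace('.', '/') + ".class")
          require(in != null, s"class file for $name not found")
          val bytes = try readAll(in) finally in.close()
          defineClass(name, bytes, 0, bytes.length)
        }
      }
  }

  private def readAll(in: InputStream): Array[Byte] = {
    val out = new ByteArrayOutputStream()
    val buf = new Array[Byte](4096)
    var n = in.read(buf)
    while (n != -1) { out.write(buf, 0, n); n = in.read(buf) }
    out.toByteArray
  }

  /** Simulates one job submission: a brand-new loader defines its own copy of Payload. */
  def submitJob(): Class[_] = {
    val original = classOf[Payload]
    new IsolatingLoader(original.getName, original.getClassLoader).loadClass(original.getName)
  }

  /** Returns true if the second submission hits the "X cannot be cast to X" failure. */
  def secondRunFails(): Boolean = {
    val key = classOf[Payload].getName
    val classA = submitJob()               // steps 1-2: the first run caches Payload (A)
    sharedCache.putIfAbsent(key, classA)
    val classB = submitJob()               // step 3: the re-submission loads Payload (B)
    // steps 4-6: the shared cache still hands out class A, so the bean is an A
    val bean = sharedCache.get(key).getDeclaredConstructor().newInstance().asInstanceOf[AnyRef]
    try { classB.cast(bean); false }       // step 7: cast A to B
    catch { case _: ClassCastException => true } // step 8
  }
}
```

Both classes report the same name, which is why the exception message reads as "X cannot be cast to X".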

I fixed the issue by not using the SpecificData.INSTANCE singleton (even
though using it is considered common and expected practice). Instead, I feed
every parser a new instance of SpecificData. This way the class cache is
confined to a parser instance and gets recycled along with it.
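
A minimal sketch of this workaround, assuming Avro's public `SpecificData(ClassLoader)` constructor and the three-argument `SpecificDatumReader` constructor. The `PerReaderSpecificData` object, its method names, and the use of a binary decoder are illustrative assumptions, since the parsing call shown earlier elides its arguments.

```scala
import org.apache.avro.Schema
import org.apache.avro.io.DecoderFactory
import org.apache.avro.specific.{SpecificData, SpecificDatumReader, SpecificRecord}

object PerReaderSpecificData {
  /**
   * Builds a reader backed by its own SpecificData instance, so the
   * schema-to-class cache is garbage collected together with the reader.
   */
  def newReader[T <: SpecificRecord](schema: Schema, userCodeLoader: ClassLoader): SpecificDatumReader[T] = {
    val data = new SpecificData(userCodeLoader) // deliberately not SpecificData.get()
    new SpecificDatumReader[T](schema, schema, data)
  }

  /** Decodes one binary-encoded record (assumed encoding) with the given reader. */
  def parse[T <: SpecificRecord](reader: SpecificDatumReader[T], bytes: Array[Byte]): T = {
    val decoder = DecoderFactory.get().binaryDecoder(bytes, null)
    reader.read(null.asInstanceOf[T], decoder)
  }
}
```

In a Flink job, such a reader would probably be created once per task, for example in the `open()` method of a rich function, passing `getRuntimeContext.getUserCodeClassLoader` as the loader, so that a re-submitted job never sees classes cached by an earlier run.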

Hope this helps,
Yury


Re: How to get help on ClassCastException when re-submitting a job

2017-01-16 Thread Stephan Ewen
Hi!

I think Yury pointed out the correct diagnosis. Caching the classes across
multiple jobs in the same session can cause these types of issues.

For YARN single-job deployments, Flink 1.2 will not do any dynamic
classloading anymore, but will start with everything in the application
classpath.
For YARN sessions, Flink 1.2 still uses dynamic loading, to re-use hot
containers.

Best,
Stephan




Re: How to get help on ClassCastException when re-submitting a job

2017-01-16 Thread Ufuk Celebi
@Giuliano: any updates? I'm very curious to figure out what's causing
this. As Fabian said, this is most likely a class loading issue.
Judging from the stack trace, you are not running on YARN but on a
standalone cluster. Is that correct? Nothing changed between Flink 1.1
and Flink 1.2 with respect to class loading on standalone clusters.
Did you put any JARs into the lib folder of Flink before submitting
the job?

– Ufuk


Re: How to get help on ClassCastException when re-submitting a job

2017-01-12 Thread Yury Ruchin
Hi,

I'd like to chime in since I've faced the same issue running Flink 1.1.4. I
have a long-running YARN session which I use to run multiple streaming jobs
concurrently. Once, after cancelling and resubmitting a job, I saw the "X
cannot be cast to X" ClassCastException in the logs. I restarted the YARN
session, and the problem disappeared.

The class that failed to be cast was autogenerated by the Avro compiler. I know
that Avro's Java binding caches schemas in some static WeakHashMap.
I'm wondering whether that may get in the way of Flink's classloading design.

Anyway, I would be interested in watching the issue in Flink JIRA.

Giuliano, could you provide the issue number?

Thanks,
Yury


Re: How to get help on ClassCastException when re-submitting a job

2017-01-11 Thread Fabian Hueske
Hi Giuliano,

thanks for bringing up this issue.
A "ClassCastException: X cannot be cast to X" often points to a classloader
issue.
So it might actually be a bug in Flink.
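
One way to confirm that diagnosis is to compare the class loaders on both sides of the failing cast. The helper below is a hypothetical sketch using only the JDK reflection API; `CastDiagnostics` and its methods are made-up names, not Flink utilities.

```scala
object CastDiagnostics {
  /** True when two classes share a fully qualified name but were defined by different loaders. */
  def sameNameDifferentLoader(expected: Class[_], actual: Class[_]): Boolean =
    expected.getName == actual.getName && (expected.getClassLoader ne actual.getClassLoader)

  /** Describes why `value` is not an instance of `expected`, or returns None if it is one. */
  def explainMismatch(expected: Class[_], value: AnyRef): Option[String] =
    if (expected.isInstance(value)) None
    else if (sameNameDifferentLoader(expected, value.getClass))
      Some(s"${expected.getName} was loaded twice: by ${expected.getClassLoader} " +
        s"and by ${value.getClass.getClassLoader}")
    else Some(s"${value.getClass.getName} is not a ${expected.getName}")
}
```

Calling `explainMismatch` just before the cast that fails, and logging the result, would show whether the two classes really differ only by their loader.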

I assume you submit the same application (same jar file) with the same
command, right?
Did you cancel the job before resubmitting?

Can you create a JIRA issue [1] for this bug (hit the red CREATE button at
the top) and include the commit hash from which you built Flink?
It would be great if you could provide a short example program and
instructions on how to reproduce the problem.

Thank you very much,
Fabian

[1] https://issues.apache.org/jira/browse/FLINK




How to get help on ClassCastException when re-submitting a job

2017-01-10 Thread Giuliano Caliari
Hello,



I need some guidance on how to report a bug.



I’m testing version 1.2 on my local cluster. The first time I submit the
job everything works, but whenever I re-submit the same job it fails with:

org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:101)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:400)
at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:634)
at au.com.my.package.pTraitor.OneTrait.execute(Traitor.scala:147)
at au.com.my.package.pTraitor.TraitorAppOneTrait$.delayedEndpoint$au$com$my$package$pTraitor$TraitorAppOneTrait$1(TraitorApp.scala:22)
at au.com.my.package.pTraitor.TraitorAppOneTrait$delayedInit$body.apply(TraitorApp.scala:21)
at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
at scala.App$class.main(App.scala:76)
at au.com.my.package.pTraitor.TraitorAppOneTrait$.main(TraitorApp.scala:21)
at au.com.my.package.pTraitor.TraitorAppOneTrait.main(TraitorApp.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:419)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:339)
at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:831)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:256)
at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1073)
at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1120)
at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1117)
at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:29)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1116)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:900)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:415)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:397)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:749)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.collectWithTimestamp(StreamSourceContexts.java:272)
at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:261)
at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:88)
at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:157)

at