Hi Dominique,

Can you check whether the versions of the remote JobManager and
TaskManagers match the Flink version used to submit the job? The version
and commit hash are logged at the top of the JM and TM log files.

Right now, the local client optimizes the job, chooses the execution
strategies, and sends the plan to the remote JobManager. We recently added
and removed some strategies, so the strategy enums of the client and the
JobManager might have gotten out of sync.
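
To illustrate what I mean, here is a minimal sketch. It assumes the chosen strategy travels in the plan as a position in the enum (its ordinal), which I have not double-checked for this case. The two enums and the encode/decode helpers are hypothetical, shortened stand-ins, not Flink's real classes.

```java
public class StrategyOrdinalMismatch {

    // Hypothetical, shortened strategy list as the client jar sees it.
    public enum ClientStrategy { MAP, CO_GROUP, HYBRIDHASH_BUILD_FIRST, MERGE }

    // The same list on the cluster, where one extra entry sits in the middle.
    public enum ClusterStrategy { MAP, CO_GROUP, CO_GROUP_RAW, HYBRIDHASH_BUILD_FIRST, MERGE }

    // Assumption: the plan stores only the position of the strategy.
    public static int encode(ClientStrategy strategy) {
        return strategy.ordinal();
    }

    // The cluster resolves that position against its own enum version.
    public static ClusterStrategy decode(int ordinal) {
        return ClusterStrategy.values()[ordinal];
    }

    public static void main(String[] args) {
        ClientStrategy chosen = ClientStrategy.HYBRIDHASH_BUILD_FIRST;
        ClusterStrategy seen = decode(encode(chosen));
        // The client picked a hash join, but the cluster reads a different entry.
        System.out.println("client chose " + chosen + ", cluster sees " + seen);
    }
}
```

If something like this is happening, running client and cluster on the same Flink version should make the error go away.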

Cheers, Fabian

2016-02-10 7:33 GMT+01:00 Dominique Rondé <dominique.ro...@codecentric.de>:

> Hi,
>
> your guess is correct. I use Java all the time... Here is the complete
> stack trace:
>
> Exception in thread "main" org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
>     at org.apache.flink.client.program.Client.runBlocking(Client.java:367)
>     at org.apache.flink.client.program.Client.runBlocking(Client.java:345)
>     at org.apache.flink.client.program.Client.runBlocking(Client.java:312)
>     at org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:212)
>     at org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:189)
>     at org.apache.flink.api.java.RemoteEnvironment.execute(RemoteEnvironment.java:160)
>     at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:803)
>     at org.apache.flink.api.java.DataSet.collect(DataSet.java:410)
>     at org.apache.flink.api.java.DataSet.print(DataSet.java:1583)
>     at x.y.z.eventstore.processing.pmc.PmcProcessor.main(PmcProcessor.java:103)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply$mcV$sp(JobManager.scala:563)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply(JobManager.scala:509)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply(JobManager.scala:509)
>     at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>     at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>     at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.Exception: The data preparation for task 'CHAIN Join(Join at main(PmcProcessor.java:103)) -> FlatMap (collect())' , caused an error: Unsupported driver strategy for join driver: CO_GROUP_RAW
>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.Exception: Unsupported driver strategy for join driver: CO_GROUP_RAW
>     at org.apache.flink.runtime.operators.JoinDriver.prepare(JoinDriver.java:193)
>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:459)
>     ... 3 more
>
>
> On 09.02.2016 at 21:03, Fabian Hueske wrote:
>
> Hi,
> glad you could resolve the POJO issue, but the new error doesn't look
> right.
> The CO_GROUP_RAW strategy should only be used for programs that are
> implemented against the Python DataSet API.
> I guess that's not the case since all code snippets were Java so far.
>
> Can you post the full stack trace of the exception?
>
> 2016-02-09 20:13 GMT+01:00 Dominique Rondé <dominique.ro...@codecentric.de>:
>
>> Hi all,
>>
>> I finally figured out that there is a getter for a boolean field which
>> may be the source of the trouble. It seems that getBooleanField (as we use
>> it) is not the best choice. Now the plan fails with a different error. :(
>>
>> Caused by: java.lang.Exception: Unsupported driver strategy for join driver: CO_GROUP_RAW
>>
>> Is there any documentation or example code that you would recommend
>> besides the official documentation?
>>
>> But folks, thanks for your great support! A really nice community here!
>>
>> Greets
>> Dominique
>>
>>
>> On 09.02.2016 at 19:41, Till Rohrmann wrote:
>>
>> I tested the TypeExtractor with your SourceA and SourceB types (adding
>> proper setters and getters) and it correctly returned a PojoType. Thus,
>> I would suspect that you haven’t specified the proper setters and getters
>> in your implementation.
>>
>> Cheers,
>> Till
>>
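
For reference, the accessor rule Till describes can be approximated with plain reflection. This is only a rough sketch and not Flink's TypeExtractor: it assumes the rule is "every non-public field needs a public getter and setter named after the field". The `looksLikePojo` helper and the `BrokenSourceA` class are hypothetical and exist only for illustration.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

public class PojoAccessorCheck {

    // Simplified stand-in for the Parent class from this thread.
    public abstract static class Parent {
        private String sessionId;
        public Parent() { }
        public String getSessionId() { return sessionId; }
        public void setSessionId(String sessionId) { this.sessionId = sessionId; }
    }

    // Accessors are named after the field they expose.
    public static class SourceA extends Parent {
        private Boolean outboundMessage;
        public SourceA() { super(); }
        public Boolean getOutboundMessage() { return outboundMessage; }
        public void setOutboundMessage(Boolean value) { this.outboundMessage = value; }
    }

    // Hypothetical variant: same field, but the accessor names do not match it.
    public static class BrokenSourceA extends Parent {
        private Boolean outboundMessage;
        public BrokenSourceA() { super(); }
        public Boolean getBooleanField() { return outboundMessage; }
        public void setBooleanField(Boolean value) { this.outboundMessage = value; }
    }

    // True if the type has a public method with this name (ignoring case) and arity.
    public static boolean hasPublicMethod(Class<?> type, String name, int paramCount) {
        for (Method m : type.getMethods()) {
            if (m.getName().equalsIgnoreCase(name) && m.getParameterCount() == paramCount) {
                return true;
            }
        }
        return false;
    }

    // Walks the class hierarchy and checks each non-public instance field.
    public static boolean looksLikePojo(Class<?> type) {
        for (Class<?> c = type; c != null && c != Object.class; c = c.getSuperclass()) {
            for (Field f : c.getDeclaredFields()) {
                int mod = f.getModifiers();
                if (f.isSynthetic() || Modifier.isStatic(mod) || Modifier.isPublic(mod)) {
                    continue;
                }
                String n = f.getName();
                boolean getter = hasPublicMethod(type, "get" + n, 0)
                        || hasPublicMethod(type, "is" + n, 0);
                boolean setter = hasPublicMethod(type, "set" + n, 1);
                if (!getter || !setter) {
                    return false;
                }
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println("SourceA: " + looksLikePojo(SourceA.class));
        System.out.println("BrokenSourceA: " + looksLikePojo(BrokenSourceA.class));
    }
}
```

The first class passes this check and the second does not, which would match the switch from GenericType to PojoType once the accessors are renamed.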
>> On Tue, Feb 9, 2016 at 2:46 PM, Dominique Rondé <dominique.ro...@codecentric.de> wrote:
>>
>>> Here we go!
>>>
>>>   ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("xxx.xxx.xxx.xxx", 53408, "flink-job.jar");
>>>
>>>   DataSource<String> datasourceA = env.readTextFile("hdfs://dev//sourceA/");
>>>   DataSource<String> datasourceB = env.readTextFile("hdfs://dev//sourceB/");
>>>
>>>   DataSet<SourceA> sourceA = datasourceA.map(new SourceAMapper());
>>>   DataSet<SourceB> sourceB = datasourceB.map(new SourceBMapper());
>>>
>>>   sourceA.join(sourceB).where("sessionId").equalTo("sessionId").print();
>>>
>>> Thanks a lot!
>>> Dominique
>>>
>>>
>>> On 09.02.2016 at 14:36, Till Rohrmann wrote:
>>>
>>> Could you post the complete example code (Flink job including the type
>>> definitions)? For example, if the data sets are of type DataSet<Parent>,
>>> then the type will be treated as a GenericType. Judging from your pseudo
>>> code, it looks fine at first glance.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Tue, Feb 9, 2016 at 2:25 PM, Dominique Rondé <dominique.ro...@codecentric.de> wrote:
>>>
>>>> Sorry, I was out for lunch. Maybe the problem is that sessionId is a
>>>> String?
>>>>
>>>> public abstract class Parent {
>>>>   private Date eventDate;
>>>>   private EventType eventType;
>>>>   private String sessionId;
>>>>
>>>>   public Parent() { }
>>>>   // GETTER & SETTER
>>>> }
>>>>
>>>> public class SourceA extends Parent {
>>>>   private Boolean outboundMessage;
>>>>   private String soapMessage;
>>>>
>>>>   public SourceA() {
>>>>     super();
>>>>   }
>>>>   // GETTER & SETTER
>>>> }
>>>>
>>>> public class SourceB extends Parent {
>>>>   private Integer id;
>>>>   private String username;
>>>>
>>>>   public SourceB() {
>>>>     super();
>>>>   }
>>>>   // GETTER & SETTER
>>>> }
>>>>
>>>> On 09.02.2016 at 12:06, Till Rohrmann wrote:
>>>>
>>>> Could you share the code for your types SourceA and SourceB? It seems
>>>> as if Flink does not recognize them as POJOs because it assigned them
>>>> the GenericType type. Either there is something wrong with the type
>>>> extractor or your implementation does not fulfil the requirements for
>>>> POJOs, as indicated by Chiwan.
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Tue, Feb 9, 2016 at 11:53 AM, Dominique Rondé <dominique.ro...@codecentric.de> wrote:
>>>>
>>>>> The fields in SourceA and SourceB are private but have public getters
>>>>> and setters. The classes provide an empty and public constructor.
>>>>> On 09.02.2016 at 11:47, "Chiwan Park" <chiwanp...@apache.org> wrote:
>>>>>
>>>>>> Oh, the fields in SourceA have public getters. Do the fields in
>>>>>> SourceA also have public setters? SourceA needs public setters for
>>>>>> its private fields.
>>>>>>
>>>>>> Regards,
>>>>>> Chiwan Park
>>>>>>
>>>>>> > On Feb 9, 2016, at 7:45 PM, Chiwan Park <chiwanp...@apache.org> wrote:
>>>>>> >
>>>>>> > Hi Dominique,
>>>>>> >
>>>>>> > It seems that `SourceA` is not treated as a POJO. Are all fields in SourceA public? There are some requirements for POJO classes [1].
>>>>>> >
>>>>>> > [1]: https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/programming_guide.html#pojos
>>>>>> >
>>>>>> > Regards,
>>>>>> > Chiwan Park
>>>>>> >
>>>>>> >> On Feb 9, 2016, at 7:42 PM, Dominique Rondé <dominique.ro...@codecentric.de> wrote:
>>>>>> >>
>>>>>> >> Hi folks,
>>>>>> >>
>>>>>> >> I am trying to join two datasets containing some POJOs. Each POJO inherits a field "sessionId" from the parent class. The field is private but has a public getter.
>>>>>> >>
>>>>>> >> The join looks like this:
>>>>>> >> DataSet<Tuple2<SourceA,SourceB>> joinedDataSet = sourceA.join(sourceB).where("sessionId").equalTo("sessionId");
>>>>>> >>
>>>>>> >> But the result is the following exception:
>>>>>> >>
>>>>>> >> Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: This type (GenericType<x.y.z.service.eventstore.dto.SourceA>) cannot be used as key.
>>>>>> >>    at org.apache.flink.api.java.operators.Keys$ExpressionKeys.<init>(Keys.java:287)
>>>>>> >>    at org.apache.flink.api.java.operators.JoinOperator$JoinOperatorSets.where(JoinOperator.java:890)
>>>>>> >>    at x.y.z.eventstore.processing.pmc.PmcProcessor.main(PmcProcessor.java:55)
>>>>>> >>
>>>>>> >> I spent some time googling but have no idea what is wrong. I hope one of you can give me a hint...
>>>>>> >>
>>>>>> >> Greets
>>>>>> >> Dominique
>>>>>> >>
>>>>>> >
>>>>>>
>>>>>>
>>>>
>>>> --
>>>> Dominique Rondé | Senior Consultant
>>>>
>>>> codecentric AG | Kreuznacherstrasse 30 | 60486 Frankfurt | Deutschland
>>>> mobil: +49 (0) 172.7182592
>>>> www.codecentric.de | blog.codecentric.de | www.meettheexperts.de | www.more4fi.de
>>>>
>>>> Sitz der Gesellschaft: Solingen | HRB 25917| Amtsgericht Wuppertal
>>>> Vorstand: Michael Hochgürtel . Mirko Novakovic . Rainer Vehns
>>>> Aufsichtsrat: Patric Fedlmeier (Vorsitzender) . Klaus Jäger . Jürgen Schütz
>>>>
>>>>
