Hi Fabian,

Your hint was right! Maven fooled me with its dependency management. Now everything works as expected!

Many many thanks to all of you!

Greets
Dominique

On 10.02.2016 at 08:45, Fabian Hueske wrote:
Hi Dominique,

Can you check whether the remotely running JobManager and TaskManagers use the same Flink version as the client that submits the job? The version and commit hash are logged at the top of the JM and TM log files.
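One way to compare builds is to pull the version and commit hash out of the log headers. The sketch below assumes the header contains a fragment of the form `Version: <version>, Rev:<hash>` (check this against your own JobManager and TaskManager logs); `FlinkVersionCheck`, `parse` and `sameBuild` are hypothetical names, not Flink API.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper (not part of Flink): extracts "Version: ..., Rev:..." from a log
// header line and checks whether two components run the same build.
final class FlinkVersionCheck {

    // Assumed header fragment; adjust the pattern if your log lines look different.
    private static final Pattern HEADER = Pattern.compile("Version: ([^,]+), Rev:([0-9a-fA-F]+)");

    /** Returns {version, commitHash}, or null if the line carries no such header. */
    static String[] parse(String logLine) {
        Matcher m = HEADER.matcher(logLine);
        return m.find() ? new String[] {m.group(1).trim(), m.group(2)} : null;
    }

    /** True only if both lines carry a header and both version and commit hash agree. */
    static boolean sameBuild(String lineA, String lineB) {
        String[] a = parse(lineA);
        String[] b = parse(lineB);
        return a != null && b != null
                && a[0].equals(b[0])
                && a[1].equalsIgnoreCase(b[1]);
    }
}
```

Compare the JobManager header with every TaskManager header, and compare the extracted version with the Flink version declared in the client's Maven dependencies.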

Right now, the local client optimizes the job, chooses the execution strategies, and sends the plan to the remote JobManager. Recently, we added and removed some strategies, so the strategy enums of the client and the JobManager may have gotten out of sync.
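To see how out-of-sync enums can produce an error like the one in this thread, here is a self-contained sketch. It assumes the strategy travels as its enum ordinal (my understanding of how the task configuration stores it, not verified here); both enums and their constant orders are hypothetical, chosen only so that a hash-join strategy on one side decodes as CO_GROUP_RAW on the other.

```java
// Hypothetical illustration, not Flink code: the client encodes a driver strategy as its
// enum ordinal, and the cluster decodes that number with its own copy of the enum.
final class StrategyDrift {

    // Strategy enum as compiled into the client's Flink jar (hypothetical constants).
    enum ClientStrategy { NONE, MAP, HYBRIDHASH_BUILD_FIRST, HYBRIDHASH_BUILD_SECOND }

    // Strategy enum as compiled into the cluster's Flink jar: one extra constant shifts the rest.
    enum ClusterStrategy { NONE, MAP, CO_GROUP_RAW, HYBRIDHASH_BUILD_FIRST, HYBRIDHASH_BUILD_SECOND }

    // Client side: only the position of the constant is shipped with the plan.
    static int encode(ClientStrategy strategy) {
        return strategy.ordinal();
    }

    // Cluster side: the number is looked up in the cluster's own enum.
    static ClusterStrategy decode(int ordinal) {
        return ClusterStrategy.values()[ordinal];
    }

    // A join driver that only knows hash-join strategies rejects everything else.
    static void prepareJoin(ClusterStrategy strategy) {
        switch (strategy) {
            case HYBRIDHASH_BUILD_FIRST:
            case HYBRIDHASH_BUILD_SECOND:
                return;
            default:
                throw new IllegalStateException("Unsupported driver strategy for join driver: " + strategy);
        }
    }
}
```

Because only the position travels, neither side notices the mismatch until a driver receives a strategy it does not support: `HYBRIDHASH_BUILD_FIRST` is encoded as 2 on the client and decoded as `CO_GROUP_RAW` on the cluster.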

Cheers, Fabian

On 2016-02-10 at 7:33 GMT+01:00, Dominique Rondé <dominique.ro...@codecentric.de> wrote:

    Hi,

    Your guess is correct. I use Java all the time. Here is the
    complete stack trace:

    Exception in thread "main" org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
        at org.apache.flink.client.program.Client.runBlocking(Client.java:367)
        at org.apache.flink.client.program.Client.runBlocking(Client.java:345)
        at org.apache.flink.client.program.Client.runBlocking(Client.java:312)
        at org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:212)
        at org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:189)
        at org.apache.flink.api.java.RemoteEnvironment.execute(RemoteEnvironment.java:160)
        at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:803)
        at org.apache.flink.api.java.DataSet.collect(DataSet.java:410)
        at org.apache.flink.api.java.DataSet.print(DataSet.java:1583)
        at x.y.z.eventstore.processing.pmc.PmcProcessor.main(PmcProcessor.java:103)
    Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply$mcV$sp(JobManager.scala:563)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply(JobManager.scala:509)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply(JobManager.scala:509)
        at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
        at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
        at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
    Caused by: java.lang.Exception: The data preparation for task 'CHAIN Join(Join at main(PmcProcessor.java:103)) -> FlatMap (collect())' , caused an error: Unsupported driver strategy for join driver: CO_GROUP_RAW
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
        at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
        at java.lang.Thread.run(Thread.java:745)
    Caused by: java.lang.Exception: Unsupported driver strategy for join driver: CO_GROUP_RAW
        at org.apache.flink.runtime.operators.JoinDriver.prepare(JoinDriver.java:193)
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:459)
        ... 3 more


    On 09.02.2016 at 21:03, Fabian Hueske wrote:
    Hi,
    Glad you could resolve the POJO issue, but the new error doesn't
    look right. The CO_GROUP_RAW strategy should only be used for
    programs implemented against the Python DataSet API. I guess
    that's not the case, since all your code snippets so far were Java.

    Can you post the full stack trace of the exception?

    On 2016-02-09 at 20:13 GMT+01:00, Dominique Rondé
    <dominique.ro...@codecentric.de> wrote:

        Hi all,

        I finally figured out that a getter for a boolean field may
        have been the source of the trouble. It seems that
        getBooleanField (as we used it) was not a good choice. Now
        the plan gets executed, but fails with a different error. :(

        Caused by: java.lang.Exception: Unsupported driver strategy
        for join driver: CO_GROUP_RAW

        Is there any documentation or example code you would
        recommend besides the official documentation?

        But folks, thanks for your great support! A really nice
        community here!

        Greets
        Dominique
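As far as I understand, Flink's type extraction pairs a private field with its accessors by name, so a getter whose name does not spell out the field is probably ignored. The thread does not show exactly what `getBooleanField` looked like; the sketch below is a simplified approximation of name-based matching (get/is/set plus the field name, ignoring case), not Flink's TypeExtractor code, and `AccessorNames` is a hypothetical helper.

```java
import java.util.Locale;

// Simplified approximation of name-based accessor matching (not Flink's TypeExtractor code):
// a getter belongs to a field only if its name is "get"/"is" + field name, ignoring case.
final class AccessorNames {

    static boolean isGetterFor(String methodName, String fieldName) {
        String method = methodName.toLowerCase(Locale.ROOT);
        String field = fieldName.toLowerCase(Locale.ROOT);
        return method.equals("get" + field) || method.equals("is" + field);
    }

    // A setter belongs to a field only if its name is "set" + field name, ignoring case.
    static boolean isSetterFor(String methodName, String fieldName) {
        return methodName.toLowerCase(Locale.ROOT).equals("set" + fieldName.toLowerCase(Locale.ROOT));
    }
}
```

With the `outboundMessage` field from SourceA, `getOutboundMessage` and `isOutboundMessage` match, while a generic name such as `getBooleanField` does not.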


        On 09.02.2016 at 19:41, Till Rohrmann wrote:

        I tested the TypeExtractor with your SourceA and
        SourceB types (adding proper setters and getters), and it
        correctly returned a PojoType. Thus, I suspect that
        you haven’t specified the proper setters and getters in your
        implementation.
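To run a similar check without Flink on the classpath, a rough reflection-based sketch of the POJO rules discussed in this thread could look like this. It only covers the rules mentioned here (public class, public no-argument constructor, every non-public field reachable through a getter and a setter of matching type) and is not a substitute for Flink's own TypeExtractor; `PojoCheck` and the two example classes are hypothetical.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

// Simplified, hypothetical POJO check; NOT Flink's TypeExtractor.
final class PojoCheck {

    /** Returns a list of problems; an empty list means these simplified rules pass. */
    static List<String> problems(Class<?> clazz) {
        List<String> out = new ArrayList<>();
        if (!Modifier.isPublic(clazz.getModifiers())) {
            out.add("class is not public");
        }
        try {
            clazz.getConstructor(); // throws if there is no public no-argument constructor
        } catch (NoSuchMethodException e) {
            out.add("no public no-argument constructor");
        }
        // Walk up the hierarchy: inherited fields such as sessionId count too.
        for (Class<?> c = clazz; c != null && c != Object.class; c = c.getSuperclass()) {
            for (Field f : c.getDeclaredFields()) {
                int mod = f.getModifiers();
                if (f.isSynthetic() || Modifier.isStatic(mod) || Modifier.isTransient(mod) || Modifier.isPublic(mod)) {
                    continue;
                }
                if (!hasAccessor(clazz, f, "get") && !hasAccessor(clazz, f, "is")) {
                    out.add("no getter for " + f.getName());
                }
                if (!hasAccessor(clazz, f, "set")) {
                    out.add("no setter for " + f.getName());
                }
            }
        }
        return out;
    }

    // Looks for a public method named prefix + field name (ignoring case) with a matching type.
    private static boolean hasAccessor(Class<?> clazz, Field f, String prefix) {
        String wanted = (prefix + f.getName()).toLowerCase(Locale.ROOT);
        boolean setter = prefix.equals("set");
        for (Method m : clazz.getMethods()) {
            if (!m.getName().toLowerCase(Locale.ROOT).equals(wanted)) {
                continue;
            }
            if (setter && m.getParameterCount() == 1 && m.getParameterTypes()[0] == f.getType()) {
                return true;
            }
            if (!setter && m.getParameterCount() == 0 && m.getReturnType() == f.getType()) {
                return true;
            }
        }
        return false;
    }

    // Hypothetical example types for trying the check out.
    public static class WithAccessors {
        private String sessionId;
        public WithAccessors() { }
        public String getSessionId() { return sessionId; }
        public void setSessionId(String sessionId) { this.sessionId = sessionId; }
    }

    public static class GetterOnly {
        private String sessionId;
        public GetterOnly() { }
        public String getSessionId() { return sessionId; }
    }
}
```

Calling `PojoCheck.problems(SourceA.class)` on the real class would list every field that lacks a matching accessor; an empty list only means these simplified rules pass.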

        Cheers,
        Till

        On Tue, Feb 9, 2016 at 2:46 PM, Dominique Rondé
        <dominique.ro...@codecentric.de> wrote:

            Here we go!

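            import org.apache.flink.api.java.DataSet;
            import org.apache.flink.api.java.ExecutionEnvironment;
            import org.apache.flink.api.java.operators.DataSource;

            // Remote environment: JobManager host, port, and the jar with the job classes.
            ExecutionEnvironment env =
                ExecutionEnvironment.createRemoteEnvironment("xxx.xxx.xxx.xxx", 53408, "flink-job.jar");

            DataSource<String> datasourceA = env.readTextFile("hdfs://dev//sourceA/");
            DataSource<String> datasourceB = env.readTextFile("hdfs://dev//sourceB/");

            // SourceAMapper / SourceBMapper (not shown) turn each line into a SourceA / SourceB.
            DataSet<SourceA> sourceA = datasourceA.map(new SourceAMapper());
            DataSet<SourceB> sourceB = datasourceB.map(new SourceBMapper());

            // Equi-join on the inherited sessionId field; print() triggers execution of the plan.
            sourceA.join(sourceB).where("sessionId").equalTo("sessionId").print();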

            Thanks a lot!
            Dominique
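The `where("sessionId").equalTo("sessionId")` call asks for every pair of a SourceA and a SourceB element with equal session IDs. As a plain-Java sketch of those semantics (not of Flink's runtime), a build/probe hash join over two lists could look like this; the keys are passed as functions, and `EquiJoin`, `Pair` and `join` are illustrative names only.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Plain-Java sketch of equi-join semantics: emit every pair (a, b) whose keys are equal.
// It mimics a build/probe hash join; it is an illustration, not Flink's join implementation.
final class EquiJoin {

    // Minimal pair type standing in for the Tuple2 that a Flink join returns by default.
    static final class Pair<A, B> {
        final A first;
        final B second;

        Pair(A first, B second) {
            this.first = first;
            this.second = second;
        }

        @Override
        public String toString() {
            return "(" + first + ", " + second + ")";
        }
    }

    static <A, B, K> List<Pair<A, B>> join(List<A> left, List<B> right,
                                           Function<A, K> leftKey, Function<B, K> rightKey) {
        // Build phase: index every left element by its key (e.g. its sessionId).
        Map<K, List<A>> table = new HashMap<>();
        for (A a : left) {
            table.computeIfAbsent(leftKey.apply(a), k -> new ArrayList<>()).add(a);
        }
        // Probe phase: look up each right element's key and pair it with all matches.
        List<Pair<A, B>> out = new ArrayList<>();
        for (B b : right) {
            for (A a : table.getOrDefault(rightKey.apply(b), Collections.emptyList())) {
                out.add(new Pair<>(a, b));
            }
        }
        return out;
    }
}
```

Elements whose key has no partner on the other side simply produce no output, just as in an inner join.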


            On 09.02.2016 at 14:36, Till Rohrmann wrote:

            Could you post the complete example code (the Flink job,
            including the type definitions)? For example, if the
            data sets are of type DataSet<Parent>, then the element
            type will be treated as a GenericType. Judging from your
            pseudo code, it looks fine at first glance.

            Cheers,
            Till

            On Tue, Feb 9, 2016 at 2:25 PM, Dominique Rondé
            <dominique.ro...@codecentric.de> wrote:

                Sorry, I was out for lunch. Maybe the problem is
                that sessionId is a String?

                public abstract class Parent {
                    private Date eventDate;
                    private EventType eventType;
                    private String sessionId;

                    public Parent() { }
                    // GETTER & SETTER
                }

                public class SourceA extends Parent {
                    private Boolean outboundMessage;
                    private String soapMessage;

                    public SourceA() {
                        super();
                    }
                    // GETTER & SETTER
                }

                public class SourceB extends Parent {
                    private Integer id;
                    private String username;

                    public SourceB() {
                        super();
                    }
                    // GETTER & SETTER
                }
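For reference, here is one way the "GETTER & SETTER" placeholders for Parent and SourceA could be filled in so that every private field has a getter and setter whose names spell out the field. This is a hypothetical expansion (the real accessors are not shown in the thread); the classes are nested in a holder class only to keep the sketch in one file, and EventType is a stand-in enum with made-up constants.

```java
import java.util.Date;

// Hypothetical expansion of the accessor placeholders, following the JavaBeans
// naming convention (get/set + capitalized field name).
final class ThreadTypes {

    enum EventType { REQUEST, RESPONSE } // stand-in; the real enum is not shown

    public abstract static class Parent {
        private Date eventDate;
        private EventType eventType;
        private String sessionId;

        public Parent() { }

        public Date getEventDate() { return eventDate; }
        public void setEventDate(Date eventDate) { this.eventDate = eventDate; }
        public EventType getEventType() { return eventType; }
        public void setEventType(EventType eventType) { this.eventType = eventType; }
        public String getSessionId() { return sessionId; }
        public void setSessionId(String sessionId) { this.sessionId = sessionId; }
    }

    public static class SourceA extends Parent {
        private Boolean outboundMessage;
        private String soapMessage;

        public SourceA() {
            super();
        }

        // The accessor names repeat the field name, including for the Boolean field.
        public Boolean getOutboundMessage() { return outboundMessage; }
        public void setOutboundMessage(Boolean outboundMessage) { this.outboundMessage = outboundMessage; }
        public String getSoapMessage() { return soapMessage; }
        public void setSoapMessage(String soapMessage) { this.soapMessage = soapMessage; }
    }
}
```

SourceB would follow the same pattern for its `id` and `username` fields.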

                On 09.02.2016 at 12:06, Till Rohrmann wrote:

                Could you share the code for your types SourceA
                and SourceB? It seems that Flink does not
                recognize them as POJOs, because it assigned
                them the GenericType type. Either something is
                wrong with the type extractor, or your
                implementation does not fulfil the requirements
                for POJOs, as Chiwan indicated.

                Cheers,
                Till

                On Tue, Feb 9, 2016 at 11:53 AM, Dominique Rondé
                <dominique.ro...@codecentric.de> wrote:

                    The fields in SourceA and SourceB are private
                    but have public getters and setters. The
                    classes provide a public, empty constructor.

                    On 09.02.2016 at 11:47, "Chiwan Park"
                    <chiwanp...@apache.org> wrote:

                        Oh, so the fields in SourceA have public
                        getters. Do they also have public setters?
                        SourceA needs public setters for its private
                        fields.

                        Regards,
                        Chiwan Park

                        > On Feb 9, 2016, at 7:45 PM, Chiwan Park
                        > <chiwanp...@apache.org> wrote:
                        >
                        > Hi Dominique,
                        >
                        > It seems that `SourceA` is not treated as a
                        > POJO. Are all fields in SourceA public? There
                        > are some requirements for POJO classes [1].
                        >
                        > [1]: https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/programming_guide.html#pojos
                        >
                        > Regards,
                        > Chiwan Park
                        >
                        >> On Feb 9, 2016, at 7:42 PM, Dominique
                        >> Rondé <dominique.ro...@codecentric.de> wrote:
                        >>
                        >> Hi folks,
                        >>
                        >> I am trying to join two data sets containing
                        >> some POJOs. Each POJO inherits a field
                        >> "sessionId" from the parent class. The
                        >> field is private but has a public getter.
                        >>
                        >> The join looks like this:
                        >> DataSet<Tuple2<SourceA, SourceB>> joinedDataSet =
                        >>     sourceA.join(sourceB).where("sessionId").equalTo("sessionId");
                        >>
                        >> But the result is the following exception:
                        >>
                        >> Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: This type (GenericType<x.y.z.service.eventstore.dto.SourceA>) cannot be used as key.
                        >>    at org.apache.flink.api.java.operators.Keys$ExpressionKeys.<init>(Keys.java:287)
                        >>    at org.apache.flink.api.java.operators.JoinOperator$JoinOperatorSets.where(JoinOperator.java:890)
                        >>    at x.y.z.eventstore.processing.pmc.PmcProcessor.main(PmcProcessor.java:55)
                        >>
                        >> I spent some time googling, but I have no idea
                        >> what is wrong. I hope one of you can give me a
                        >> hint...
                        >>
                        >> Greets
                        >> Dominique
                        >>
                        >



                -- Dominique Rondé | Senior Consultant

                codecentric AG | Kreuznacherstrasse 30 | 60486 Frankfurt | Germany
                Mobile: +49 (0) 172.7182592
                www.codecentric.de | blog.codecentric.de | www.meettheexperts.de | www.more4fi.de
                Registered office: Solingen | HRB 25917 | District Court Wuppertal
                Management Board: Michael Hochgürtel . Mirko Novakovic . Rainer Vehns
                Supervisory Board: Patric Fedlmeier (Chairman) . Klaus Jäger . Jürgen Schütz