Btw, why are you projecting after the join? The projection should happen before the join, IMHO.
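For illustration only, a rough sketch of what projecting before the join could look like with the core function descriptors. This is not the actual WayangProjectVisitor code: the field indices are made up, and it assumes Record exposes a getField(int) accessor and a varargs constructor, as in the basic data package.

import org.apache.wayang.basic.data.Record;
import org.apache.wayang.core.function.FunctionDescriptor;

class ProjectBeforeJoinSketch {
    // Hypothetical pre-join projection: narrow each scanned Record to the
    // columns the query actually needs (here fields 0 and 2) before the join,
    // so less data flows through the join and its Tuple2 output.
    static final FunctionDescriptor.SerializableFunction<Record, Record> PROJECT =
            r -> new Record(r.getField(0), r.getField(2));

    // The join key extractor then returns the key's value from the projected
    // record (not the key's column index).
    static final FunctionDescriptor.SerializableFunction<Record, Object> KEY =
            r -> r.getField(0);
}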
> On 25 Apr 2023, at 13.30, Kristian Reitzel <[email protected]> wrote:
>
> Hmm, not entirely. I was under the impression that the join happened before
> the projection. Have I misunderstood that? At least that also seems to be the
> logical plan that is printed (2x table scan -> join -> project).
>
> Best,
> Kristian
>
> -----Original Message-----
> From: Zoi Kaoudi <[email protected]>
> Sent: Tuesday, 25 April 2023 13.23
> To: [email protected]
> Subject: Re: Problem: Connect Join to Project in the SQL API
>
> So from what I saw in the WayangProjectVisitor, the project operator will
> output a dataset of Record.
> Then, most probably, you need another map operator to convert this to a Tuple2
> that will contain the record and the key, and then use the join operator. Does
> this make sense?
>
> On Tuesday, 25 April 2023 at 01:14:45 PM CEST, Zoi Kaoudi
> <[email protected]> wrote:
>
> The keyExtractor needs to output the value, not the index. This is because a
> user can define the key in some other way if necessary (they could even do
> some processing on the input and then output the key).
> Could you maybe point us to the lines of code that seem problematic? Or paste
> them here?
>
> I will take a look at the WayangProjectVisitor class and if I see something I
> will let you know.
> Best
> --
> Zoi
>
> On Tuesday, 25 April 2023 at 01:07:51 PM CEST, Kristian Reitzel
> <[email protected]> wrote:
>
> Thank you for your response, Zoi and Jorge.
>
> I have been looking into this but haven't solved it yet.
>
> So the problem might be in WayangProjectVisitor.java in the SQL API?
>
> In WayangProjectVisitor.java, the apply function for the MapOperator will only
> handle a Record, but should it be able to take a Tuple2<Tuple2, Tuple2> and
> return a Record?
>
> Also, can you perhaps help me verify what the keyExtractor method should
> return? Is it the index of the key or the value of the key? I would assume it
> to be the index (an Integer).
>
> Best!
>
> -----Original Message-----
> From: Zoi Kaoudi <[email protected]>
> Sent: Tuesday, 25 April 2023 12.13
> To: [email protected]
> Subject: Re: Problem: Connect Join to Project in the SQL API
>
> I think Jorge is correct about the output being a Tuple2 consisting of the two
> relations' tuples that have been joined.
>
> Taking a look at JavaJoinOperatorTest.java and SparkJoinOperatorTest.java: if
> you define a join operator as SparkJoinOperator<Tuple2, Tuple2, Integer>, the
> output will be a Tuple2<Tuple2, Tuple2>.
>
> So I assume that in your case you need a join operator that receives as input
> the types <Record, Record, TypeOfKey>. Maybe the order is different in the API.
> Best
> --
> Zoi
>
> On Saturday, 22 April 2023 at 09:14:59 PM CEST, Jorge Arnulfo Quiané Ruiz
> <[email protected]> wrote:
>
> Hi guys,
>
> Please check if it is indeed tuple1 and tuple2, because I might have confused
> it and it is a key/values pair instead, where the key is the join key.
> This might affect your casting to Record.
>
> —
> Jorge
>
> On Fri, 21 Apr 2023 at 2:14 PM Kristian Reitzel <[email protected]>
> wrote:
>
>> Makes sense. We made it work with the Tuple2 as input to the MapOperator.
>> So now the connectTo works, but we still get an exception.
>>
>> We think we have located the issue: it seems to be related to when the join
>> finds two fields with the same value (i.e. the join condition is satisfied).
>> Then we get the following exception:
>>
>> Caused by: java.lang.ClassCastException: class
>> org.apache.wayang.basic.data.Tuple2 cannot be cast to class
>> org.apache.wayang.basic.data.Record (org.apache.wayang.basic.data.Tuple2 and
>> org.apache.wayang.basic.data.Record are in unnamed module of loader 'app')
>>   at org.apache.wayang.api.sql.calcite.converter.WayangProjectVisitor$MapFunctionImpl.apply(WayangProjectVisitor.java:48)
>>   at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
>>   at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
>>   at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
>>   at java.base/java.util.LinkedList$LLSpliterator.forEachRemaining(LinkedList.java:1239)
>>   at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
>>   at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
>>   at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
>>   at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
>>   at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>>   at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
>>   at java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:274)
>>   at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177)
>>   at java.base/java.util.Iterator.forEachRemaining(Iterator.java:133)
>>   at java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
>>   at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
>>   at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
>>   at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
>>   at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
>>   at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>>   at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
>>   at org.apache.wayang.java.operators.JavaLocalCallbackSink.evaluate(JavaLocalCallbackSink.java:72)
>>   at org.apache.wayang.java.execution.JavaExecutor.execute(JavaExecutor.java:82)
>>
>> -----Original Message-----
>> From: Jorge Arnulfo Quiané Ruiz <[email protected]>
>> Sent: Friday, 21 April 2023 13.58
>> To: [email protected]
>> Subject: Re: Problem: Connect Join to Project in the SQL API
>>
>> Hi Michelle and Kristian,
>>
>> Yeah, this is confusing at first glance because you would expect a joined
>> record as output; however, this would mean that the join operator has to
>> decide which kind of join to implement (i.e. a left or right join).
>> Given the following two tuples (matching on the join attribute a) from R and
>> S, respectively:
>> — <a, b, c, d>
>> — <e, a, f>
>> one could have several kinds of outputs:
>> — <a, b, c, d, e, a, f>
>> — <a, b, c, d, e, f> (without duplicating the join attribute)
>> — <a, b, c, d>
>> — <e, a, f>
>> — …
>> Therefore, in Wayang we left the decision to the developer and decided to
>> output both tuples (a Tuple2) as follows:
>> — <a, b, c, d>, <e, a, f>
>> The following operators must materialise such a join match.
>>
>> Therefore, in your case, the following MapOperator should receive a Tuple2
>> as input.
>>
>> Could you please tell us why changing the input type of the MapOperator is
>> not working?
>>
>> Best,
>> Jorge
>>
>>> On 21 Apr 2023, at 11.56, Kristian Reitzel <[email protected]> wrote:
>>>
>>> Hi Dev,
>>>
>>> We are working on the Join Operator in the SQL API.
>>>
>>> After connecting the two table scans of type Record, the output slot of the
>>> join operator is of type Tuple2.
>>>
>>> This confuses us since we would expect the result of a join to be of type
>>> Record. Furthermore, the projection (which happens after the join) throws
>>> the following exception:
>>>
>>> Exception in thread "main" java.lang.IllegalArgumentException: Cannot
>>> connect out@Join[2->1, id=23ad2d17] of DataSetType[BasicDataUnitType[Tuple2]]
>>> to in@Map[1->1, id=6bce4140] of type DataSetType[BasicDataUnitType[Record]].
>>>   at org.apache.wayang.core.plan.wayangplan.Operator.connectTo(Operator.java:206)
>>>   at org.apache.wayang.api.sql.calcite.converter.WayangProjectVisitor.visit(WayangProjectVisitor.java:42)
>>>   at org.apache.wayang.api.sql.calcite.converter.WayangRelConverter.convert(WayangRelConverter.java:16)
>>>   at org.apache.wayang.api.sql.calcite.optimizer.Optimizer.convert(Optimizer.java:202)
>>>   at org.apache.wayang.api.sql.context.SqlContext.executeSql(SqlContext.java:92)
>>>   at SqlAPI.examplePostgres(SqlAPI.java:46)
>>>   at SqlAPI.main(SqlAPI.java:77)
>>>
>>> We tried changing the Wayang MapOperator inputTypeClass to be a Tuple2, but
>>> this does not fix the problem.
>>>
>>> Can anyone explain why the JoinOperator outputs a Tuple2, and perhaps also
>>> how we then connect the output of the join to the input of the projection?
>>>
>>> Best,
>>> Michelle and Kristian
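As a postscript to the thread, a minimal, hypothetical sketch of the kind of map function Jorge and Zoi describe: it takes the Tuple2<Record, Record> that the join emits and materialises it into a single flat Record before the projection. The class name and field widths are made up, and it assumes Record exposes getField(int) and Tuple2 exposes its two components as field0/field1, as in the basic data classes.

import org.apache.wayang.basic.data.Record;
import org.apache.wayang.basic.data.Tuple2;
import org.apache.wayang.core.function.FunctionDescriptor;

class FlattenJoinSketch {
    // Hypothetical flattening step between the join and the projection: the left
    // record is assumed to have 4 fields and the right one 3, mirroring Jorge's
    // <a, b, c, d> and <e, a, f> example (the duplicated join attribute is kept).
    static final FunctionDescriptor.SerializableFunction<Tuple2<Record, Record>, Record> FLATTEN =
            pair -> new Record(
                    pair.field0.getField(0), pair.field0.getField(1),
                    pair.field0.getField(2), pair.field0.getField(3),
                    pair.field1.getField(0), pair.field1.getField(1),
                    pair.field1.getField(2));
}

Whether the join attribute gets duplicated or dropped in that flat record is exactly the decision Jorge describes leaving to the developer.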
