Yes, let's discuss this on Thursday. I am also in the process of putting some other things in order.
Best,
Kaustubh

On Tue, Apr 25, 2023, 17:15 Jorge Arnulfo Quiané Ruiz wrote:

> +1
> Perhaps a point to add to the agenda for this Thursday’s meeting ;)
>
> > On 25 Apr 2023, at 13.42, Kristian Reitzel wrote:
> >
> > Agree, that was my thinking as well. But yes, it seems that there are some design decisions to be made here.
> >
> > -----Original Message-----
> > From: Zoi Kaoudi
> > Sent: Tuesday, 25 April 2023 13.38
> > Subject: Re: Problem: Connect Join to Project in the SQL API
> >
> > Ah ok, I thought you had a project before the join.
> >
> > So I think you need to use a map operator to convert the output of the join, which is a Tuple2, into a Record and then feed it to the project operator. That is, unnest the Tuple2 with values v1 and v2 into a record with value v1+v2, where + means concatenation.
> > This should work imo.
> > Still, we have to discuss what kind of output we want the SQL API join operator to produce, meaning how the output record of the join should be formed.
> >
> > On Tuesday, 25 April 2023 at 01:31:10 p.m. CEST, Kristian Reitzel wrote:
> >
> > Hmm, not entirely. I was under the impression that the join happened before the projection. Have I misunderstood that? At least that also seems to be the logical plan that is printed (2x table scan -> join -> project).
> >
> > Best,
> > Kristian
> >
> > -----Original Message-----
> > From: Zoi Kaoudi
> > Sent: Tuesday, 25 April 2023 13.23
> > Subject: Re: Problem: Connect Join to Project in the SQL API
> >
> > So from what I saw in the WayangProjectVisitor, the project operator will output a dataset of Record.
> > Then, most probably you need another map operator to convert this to a Tuple2 that will contain the record and the key, and then use the join operator. Does this make sense?
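A minimal, self-contained Java sketch of the map step suggested above: unnesting the joined pair into a single record by concatenating v1 and v2. It does not use the real Wayang classes. `Record` and `Tuple2` here are simplified stand-ins that only mirror the names; the `field0`/`field1` fields, the `Record(Object...)` constructor and the `CONCAT` function are assumptions for illustration, not the actual Wayang API. It also assumes the join emits two records, which is the point Jorge asks to double-check further down the thread (it might instead be a key plus values).

```java
import java.util.Arrays;
import java.util.function.Function;

public class JoinToRecordSketch {

    // Simplified stand-in for a Wayang Record (assumption: NOT the real class).
    static final class Record {
        private final Object[] values;

        Record(Object... values) { this.values = values; }

        Object getField(int index) { return values[index]; }

        int size() { return values.length; }

        Object[] values() { return values.clone(); }

        @Override
        public String toString() { return Arrays.toString(values); }
    }

    // Simplified stand-in for a Wayang Tuple2 holding the two joined records.
    static final class Tuple2<A, B> {
        final A field0;
        final B field1;

        Tuple2(A field0, B field1) {
            this.field0 = field0;
            this.field1 = field1;
        }
    }

    // Hypothetical map function: accept the join's pair (instead of casting the
    // input straight to Record) and emit one Record = left values + right values.
    static final Function<Tuple2<Record, Record>, Record> CONCAT = pair -> {
        Object[] left = pair.field0.values();
        Object[] right = pair.field1.values();
        Object[] out = Arrays.copyOf(left, left.length + right.length);
        System.arraycopy(right, 0, out, left.length, right.length);
        return new Record(out);
    };

    public static void main(String[] args) {
        Record r = new Record("a", "b", "c", "d");
        Record s = new Record("e", "a", "f");
        Record joined = CONCAT.apply(new Tuple2<>(r, s));
        System.out.println(joined); // [a, b, c, d, e, a, f]
    }
}
```

This keeps the duplicated join attribute, which matches the plain v1+v2 description; whether the SQL join should emit this shape or another one is the open design question raised in the thread.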
> >
> > On Tuesday, 25 April 2023 at 01:14:45 p.m. CEST, Zoi Kaoudi wrote:
> >
> > The keyExtractor needs to output the value, not the index. This is because a user can define the key in some other way if necessary (they could even do some processing on the input and then output the key).
> > Could you maybe point us to the lines of code that seem problematic? Or paste them here?
> >
> > I will take a look at the WayangProjectVisitor class, and if I see something I will let you know.
> > Best
> > --
> > Zoi
> >
> > On Tuesday, 25 April 2023 at 01:07:51 p.m. CEST, Kristian Reitzel wrote:
> >
> > Thank you for your response, Zoi and Jorge.
> >
> > I have been looking into this, but haven't solved it yet.
> >
> > So the problem might be in WayangProjectVisitor.java in the SQL API?
> >
> > In WayangProjectVisitor.java, the apply function for the MapOperator only handles a Record, but it should be able to take a Tuple2<Tuple2, Tuple2> and return a Record?
> >
> > Also, can you perhaps help me verify what the KeyExtractor method should return? Is it the index of the key or the value of the key? I would assume it to be the index (an Integer).
> >
> > Best!
> >
> > -----Original Message-----
> > From: Zoi Kaoudi
> > Sent: Tuesday, 25 April 2023 12.13
> > Subject: Re: Problem: Connect Join to Project in the SQL API
> >
> > I think Jorge is correct about the output being a Tuple2 consisting of the two relations' tuples that have been joined.
> >
> > Taking a look at JavaJoinOperatorTest.java and SparkJoinOperatorTest.java: if you define a join operator as SparkJoinOperator<Tuple2, Tuple2, Integer>, the output will be a Tuple2<Tuple2, Tuple2>.
> >
> > So I assume in your case you need a join operator that receives as input the types <Record, Record, TypeofKey>. Maybe the order is different in the API.
> > Best
> > --
> > Zoi
> >
> > On Saturday, 22 April 2023 at 09:14:59 p.m. CEST, jorge Arnulfo Quiané Ruiz wrote:
> >
> > Hi guys,
> >
> > Please check if it is indeed tuple1 and tuple2, because I might have mixed things up and it may instead be a (key, values) pair, where the key is the join key.
> > This might affect your cast to Record.
> >
> > --
> > Jorge
> >
> > On Fri, 21 Apr 2023 at 2:14 PM Kristian Reitzel wrote:
> >
> > >> Makes sense. We made it work with the Tuple2 as input to the MapOperator.
> > >> So now the connectTo works, but we still get an exception.
> > >>
> > >> We think we have traced the issue to the case where the join finds two fields with the same value (i.e. the join condition is satisfied). Then we get the following exception:
> > >>
> > >> Caused by: java.lang.ClassCastException: class org.apache.wayang.basic.data.Tuple2 cannot be cast to class org.apache.wayang.basic.data.Record (org.apache.wayang.basic.data.Tuple2 and org.apache.wayang.basic.data.Record are in unnamed module of loader 'app')
> > >>     at org.apache.wayang.api.sql.calcite.converter.WayangProjectVisitor$MapFunctionImpl.apply(WayangProjectVisitor.java:48)
> > >>     at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
> > >>     at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
> > >>     at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
> > >>     at java.base/java.util.LinkedList$LLSpliterator.forEachRemaining(LinkedList.java:1239)
> > >>     at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
> > >>     at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
> > >>     at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
> > >>     at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
> > >>     at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> > >>     at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
> > >>     at java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:274)
> > >>     at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177)
> > >>     at java.base/java.util.Iterator.forEachRemaining(Iterator.java:133)
> > >>     at java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
> > >>     at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
> > >>     at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
> > >>     at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
> > >>     at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
> > >>     at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> > >>     at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
> > >>     at org.apache.wayang.java.operators.JavaLocalCallbackSink.evaluate(JavaLocalCallbackSink.java:72)
> > >>     at org.apache.wayang.java.execution.JavaExecutor.execute(JavaExecutor.java:82)
> > >>
> > >> -----Original Message-----
> > >> From: Jorge Arnulfo Quiané Ruiz
> > >> Sent: Friday, 21 April 2023 13.58
> > >> Subject: Re: Problem: Connect Join to Project in the SQL API
> > >>
> > >> Hi Michelle and Kristian,
> > >>
> > >> Yeah, this is confusing at first glance because you would expect a joined record as output. However, this would mean that the join operator has to decide the kind of join to implement (i.e. left or right join).
> > >> For example, given the following two tuples from R and S, respectively, matching on the join attribute a:
> > >> - <a, b, c, d>
> > >> - <e, a, f>
> > >> One could have several kinds of output:
> > >> - <a, b, c, d, e, a, f>
> > >> - <a, b, c, d, e, f> (without duplicating the join attribute)
> > >> - <a, b, c, d>
> > >> - <e, a, f>
> > >> - ...
> > >> Therefore, in Wayang we left the decision to the developer and decided to output both tuples (as a Tuple2):
> > >> - <a, b, c, d>, <e, a, f>
> > >> The operators that follow must materialise such a join match.
> > >>
> > >> Therefore, in your case, the following MapOperator should receive a Tuple2 as input.
> > >>
> > >> Could you please tell us why changing the input type of the MapOperator is not working?
> > >>
> > >> Best,
> > >> Jorge
> > >>
> > >>> On 21 Apr 2023, at 11.56, Kristian Reitzel wrote:
> > >>>
> > >>> Hi Dev,
> > >>>
> > >>> We are working on the Join Operator in the SQL API.
> > >>>
> > >>> After connecting the two table scans of type Record, the output slots of the join operator are of type Tuple2.
> > >>>
> > >>> This confuses us, since we would expect the result of a join to be of type Record.
> > >>> Furthermore, the projection (which happens after the join) throws the following exception:
> > >>>
> > >>> Exception in thread "main" java.lang.IllegalArgumentException: Cannot connect out@Join[2->1, id=23ad2d17] of DataSetType[BasicDataUnitType[Tuple2]] to in@Map[1->1, id=6bce4140] of type DataSetType[BasicDataUnitType[Record]].
> > >>>     at org.apache.wayang.core.plan.wayangplan.Operator.connectTo(Operator.java:206)
> > >>>     at org.apache.wayang.api.sql.calcite.converter.WayangProjectVisitor.visit(WayangProjectVisitor.java:42)
> > >>>     at org.apache.wayang.api.sql.calcite.converter.WayangRelConverter.convert(WayangRelConverter.java:16)
> > >>>     at org.apache.wayang.api.sql.calcite.optimizer.Optimizer.convert(Optimizer.java:202)
> > >>>     at org.apache.wayang.api.sql.context.SqlContext.executeSql(SqlContext.java:92)
> > >>>     at SqlAPI.examplePostgres(SqlAPI.java:46)
> > >>>     at SqlAPI.main(SqlAPI.java:77)
> > >>>
> > >>> We tried changing the Wayang MapOperator inputTypeClass to be a Tuple2, but this does not fix the problem.
> > >>>
> > >>> Is there anyone who can explain why the JoinOperator outputs a Tuple2, and perhaps also how we then connect the output of the join to the input of the projection?
> > >>>
> > >>> Best,
> > >>> Michelle and Kristian
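To make the keyExtractor answer and Jorge's explanation concrete, here is a small hash-join sketch in plain Java with no Wayang dependency. The key extractors are functions that return the key value of a row (column 0 of R, column 1 of S), not a column index, and the join emits both matching rows unchanged as a pair, mirroring the Tuple2 output. `Pair`, `join` and `concatDroppingRightKey` are hypothetical names, and rows are modelled as `Object[]` instead of Wayang `Record`s; this is an illustration under those assumptions, not Wayang's join implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class JoinKeySketch {

    // Hypothetical two-field pair standing in for Wayang's Tuple2.
    static final class Pair<A, B> {
        final A field0;
        final B field1;

        Pair(A field0, B field1) {
            this.field0 = field0;
            this.field1 = field1;
        }
    }

    // Equi-join on extracted key values. Each extractor returns the key VALUE of a
    // row, so a key could also be computed from the row before matching.
    static <L, R, K> List<Pair<L, R>> join(List<L> left, List<R> right,
                                           Function<L, K> leftKey, Function<R, K> rightKey) {
        // Build phase: index the right input by its key value.
        Map<K, List<R>> index = new HashMap<>();
        for (R r : right) {
            index.computeIfAbsent(rightKey.apply(r), k -> new ArrayList<>()).add(r);
        }
        // Probe phase: emit both matching rows unchanged, as a pair.
        List<Pair<L, R>> out = new ArrayList<>();
        for (L l : left) {
            for (R r : index.getOrDefault(leftKey.apply(l), List.of())) {
                out.add(new Pair<>(l, r));
            }
        }
        return out;
    }

    // One way to materialise a match: the left row followed by the right row
    // minus its join column (no duplicated join attribute).
    static Object[] concatDroppingRightKey(Pair<Object[], Object[]> match, int rightKeyIndex) {
        List<Object> values = new ArrayList<>(Arrays.asList(match.field0));
        for (int i = 0; i < match.field1.length; i++) {
            if (i != rightKeyIndex) {
                values.add(match.field1[i]);
            }
        }
        return values.toArray();
    }

    public static void main(String[] args) {
        List<Object[]> r = new ArrayList<>();
        r.add(new Object[] {"a", "b", "c", "d"});
        r.add(new Object[] {"x", "y", "z", "w"}); // no partner in S
        List<Object[]> s = new ArrayList<>();
        s.add(new Object[] {"e", "a", "f"});

        // Key extractors return the join value: column 0 of R, column 1 of S.
        List<Pair<Object[], Object[]>> matches = join(r, s, row -> row[0], row -> row[1]);
        for (Pair<Object[], Object[]> m : matches) {
            System.out.println(Arrays.toString(concatDroppingRightKey(m, 1)));
        }
    }
}
```

With the R and S tuples from Jorge's example, `main` prints `[a, b, c, d, e, f]`, the variant without the duplicated join attribute. Because the join itself only pairs rows, swapping `concatDroppingRightKey` for a plain concatenation gives the `<a, b, c, d, e, a, f>` variant without touching the join, and the resulting row is what a following projection would index into.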
