Yes, let's discuss this on Thursday. I am also in the process of putting
some other things in order.

Best,
Kaustubh

On Tue, Apr 25, 2023, 17:15 Jorge Arnulfo Quiané Ruiz <[email protected]>
wrote:

> +1
> Perhaps a point to add to the agenda for this Thursday’s meeting ;)
>
> > On 25 Apr 2023, at 13.42, Kristian Reitzel <[email protected]> wrote:
> >
> > Agree, that was my thinking as well. But yes, it seems that there are
> some design decisions to be made here.
> >
> > -----Original Message-----
> > From: Zoi Kaoudi <[email protected]>
> > Sent: Tuesday, 25 April 2023 13.38
> > To: [email protected]
> > Subject: Re: Problem: Connect Join to Project in the SQL API
> >
> > Ah ok, I thought you had a project before the join.
> >
> > So I think you need a map operator that converts the output of the join,
> > which is a Tuple2, into a Record and then feeds it to the project operator:
> > unnest the Tuple2 with values v1 and v2 into a Record with value v1+v2,
> > where + means concatenation.
> > This should work imo.
> > Still, we have to discuss what the join operator of the SQL API should
> > output, i.e. how the output record of the join should be formed.
> >
> >    On Tuesday, 25 April 2023 at 01:31:10 PM CEST, Kristian Reitzel
> > <[email protected]> wrote:
> >
> > Hmm, not entirely.. I was under the impression that the join happened
> before the projection. Have I misunderstood that? At least that also seems
> to be the logical plan that is printed (2x table scan -> join -> project).
> >
> > Best,
> > Kristian
> >
> > -----Original Message-----
> > From: Zoi Kaoudi <[email protected]>
> > Sent: Tuesday, 25 April 2023 13.23
> > To: [email protected]
> > Subject: Re: Problem: Connect Join to Project in the SQL API
> >
> > So from what I saw in the WayangProjectVisitor, the project operator
> will output a dataset of Record.
> > Then, most probably you need another map operator to convert this to a
> Tuple2 that will contain the record and the key and then use the join
> operator. Does this make sense?
> >
> >     On Tuesday, 25 April 2023 at 01:14:45 PM CEST, Zoi Kaoudi
> > <[email protected]> wrote:
> >
> >   The keyExtractor needs to output the value, not the index. This is
> because a user can define the key in some other way if necessary (could
> even do some processing on the input and then output the key).
> > Could you maybe point us to the lines of code that seem problematic? Or
> paste them here?
> >
> > I will take a look at the WayangProjectVisitor class and if I see
> > something I will let you know.
> > Best
> > --
> > Zoi
> >
> >     On Tuesday, 25 April 2023 at 01:07:51 PM CEST, Kristian Reitzel
> > <[email protected]> wrote:
> >
> > Thank you for your response, Zoi and Jorge.
> >
> > I have been looking into this, but haven't solved it yet.
> >
> > So the problem might be in WayangProjectVisitor.java in the SQL API?
> >
> > In WayangProjectVisitor.java, the apply function for the MapOperator
> > will only handle a Record, but it should be able to take a Tuple2<Tuple2,
> > Tuple2> and return a Record?
> >
> > Also, can you perhaps help me verify what the KeyExtractor method should
> return? Is it the index of the key or the value of the key? I would assume
> it to be the index (an Integer).
> >
> > Best!
> >
> > -----Original Message-----
> > From: Zoi Kaoudi <[email protected]>
> > Sent: Tuesday, 25 April 2023 12.13
> > To: [email protected]
> > Subject: Re: Problem: Connect Join to Project in the SQL API
> >
> > I think Jorge is correct about the output being a Tuple2 consisting of
> > the two tuples, one from each relation, that have been joined.
> >
> > Taking a look at JavaJoinOperatorTest.java and
> > SparkJoinOperatorTest.java, if you define a join operator as
> > SparkJoinOperator<Tuple2, Tuple2, Integer>, the output will be a
> > Tuple2<Tuple2, Tuple2>.
> >
> > So I assume in your case you need a join operator that receives as input
> the types <Record, Record, TypeofKey>. Maybe the order is different in the
> api.
> > Best
> > --
> > Zoi
> >
> >     On Saturday, 22 April 2023 at 09:14:59 PM CEST, Jorge Arnulfo Quiané
> > Ruiz <[email protected]> wrote:
> >
> > Hi guys,
> >
> > Please check if it is indeed tuple1 and tuple2, because I might have
> > confused things and it is a (key, values) pair instead, where the key is
> > the join key.
> > This might affect your casting to Record.
> >
> > —
> > Jorge
> >
> > On Fri, 21 Apr 2023 at 2:14 PM Kristian Reitzel <[email protected]>
> > wrote:
> >
> >> Makes sense. We made it work with the Tuple2 as input to the
> MapOperator.
> >> So now the connectTo works, but we still get an exception.
> >>
> >> We think that we have located the issue to be related to when the join
> >> finds two fields with the same value (i.e. the join condition is
> >> satisfied). Then we get the following exception:
> >>
> >> Caused by: java.lang.ClassCastException: class org.apache.wayang.basic.data.Tuple2 cannot be cast to class org.apache.wayang.basic.data.Record (org.apache.wayang.basic.data.Tuple2 and org.apache.wayang.basic.data.Record are in unnamed module of loader 'app')
> >>         at org.apache.wayang.api.sql.calcite.converter.WayangProjectVisitor$MapFunctionImpl.apply(WayangProjectVisitor.java:48)
> >>         at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
> >>         at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
> >>         at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
> >>         at java.base/java.util.LinkedList$LLSpliterator.forEachRemaining(LinkedList.java:1239)
> >>         at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
> >>         at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
> >>         at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
> >>         at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
> >>         at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> >>         at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
> >>         at java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:274)
> >>         at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177)
> >>         at java.base/java.util.Iterator.forEachRemaining(Iterator.java:133)
> >>         at java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
> >>         at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
> >>         at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
> >>         at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
> >>         at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
> >>         at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> >>         at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
> >>         at org.apache.wayang.java.operators.JavaLocalCallbackSink.evaluate(JavaLocalCallbackSink.java:72)
> >>         at org.apache.wayang.java.execution.JavaExecutor.execute(JavaExecutor.java:82)
> >>
> >>
> >>
> >>
> >> -----Original Message-----
> >> From: Jorge Arnulfo Quiané Ruiz <[email protected]>
> >> Sent: Friday, 21 April 2023 13.58
> >> To: [email protected]
> >> Subject: Re: Problem: Connect Join to Project in the SQL API
> >>
> >> Hi Michelle and Kristian,
> >>
> >> Yeah this is confusing at first glance because you would expect a join
> >> record as output, however this would mean that the join operator has
> >> to decide the kind of join to implement (i.e. left or right join).
> >> That is, given the following two tuples (matching on the join attribute
> >> a) from R and S, respectively:
> >> — <a, b, c, d>
> >> — <e, a, f>
> >> One could have several kinds of output:
> >> — <a, b, c, d, e, a, f>
> >> — <a, b, c, d, e, f> (without duplicating the join attribute)
> >> — <a, b, c, d>
> >> — <e, a, f>
> >> — …..
> >> Therefore, in Wayang we left the decision to the developer and decided
> >> to output both tuples (Tuple2) as follows:
> >> — <a, b, c, d>, <e, a, f>
> >> The following operators must materialise such a join match.
> >>
> >> Therefore, in your case, the following MapOperator should receive a
> >> Tuple2 as input.
> >>
> >> Could you please tell us why changing the input type of the
> >> MapOperator is not working?
> >>
> >> Best,
> >> Jorge
> >>
> >>> On 21 Apr 2023, at 11.56, Kristian Reitzel <[email protected]>
> wrote:
> >>>
> >>> Hi Dev,
> >>>
> >>> We are working on the Join Operator in the SQL API.
> >>>
> >>> After connecting the two table scans of type Record, the output
> >>> slots of the join operator are of type Tuple2.
> >>>
> >>> This confuses us since we would expect the result of a join to be of
> >>> type Record. Furthermore, the projection (which happens after the
> >>> join) throws the following exception:
> >>>
> >>> Exception in thread "main" java.lang.IllegalArgumentException: Cannot connect out@Join[2->1, id=23ad2d17] of DataSetType[BasicDataUnitType[Tuple2]] to in@Map[1->1, id=6bce4140] of type DataSetType[BasicDataUnitType[Record]].
> >>>         at org.apache.wayang.core.plan.wayangplan.Operator.connectTo(Operator.java:206)
> >>>         at org.apache.wayang.api.sql.calcite.converter.WayangProjectVisitor.visit(WayangProjectVisitor.java:42)
> >>>         at org.apache.wayang.api.sql.calcite.converter.WayangRelConverter.convert(WayangRelConverter.java:16)
> >>>         at org.apache.wayang.api.sql.calcite.optimizer.Optimizer.convert(Optimizer.java:202)
> >>>         at org.apache.wayang.api.sql.context.SqlContext.executeSql(SqlContext.java:92)
> >>>         at SqlAPI.examplePostgres(SqlAPI.java:46)
> >>>         at SqlAPI.main(SqlAPI.java:77)
> >>>
> >>> We tried changing the Wayang MapOperator inputTypeClass to be a
> >>> Tuple2, but this does not fix the problem.
> >>>
> >>> Is there anyone that can explain why the JoinOperator outputs a
> >>> Tuple2 and perhaps also how we then connect the output of the join
> >>> to the input of the projection?
> >>>
> >>> Best,
> >>> Michelle and Kristian
> >>>
> >>
> >>
> >
> >
> >
>
>
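
For reference, the unnesting step suggested in the quoted thread (a map that turns each join match into one concatenated record, v1+v2) might look roughly like the sketch below. It is deliberately independent of Wayang so that it runs on its own: `Rec` and `Pair` are hypothetical stand-ins for Record and Tuple2, and `keyOf`, `join` and `unnest` are illustrative names, not Wayang API. The key extractor returns the key value rather than the column index, as discussed above. The example data are the two tuples from Jorge's mail.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;

public class JoinUnnestSketch {

    // Hypothetical stand-in for a record: an ordered list of field values.
    static final class Rec {
        final Object[] values;
        Rec(Object... values) { this.values = values; }
        Object getField(int index) { return values[index]; }
        int size() { return values.length; }
        @Override public String toString() { return Arrays.toString(values); }
    }

    // Hypothetical stand-in for Tuple2: the pair emitted for each join match.
    static final class Pair<A, B> {
        final A field0;
        final B field1;
        Pair(A field0, B field1) { this.field0 = field0; this.field1 = field1; }
    }

    // Key extractor: returns the key VALUE found in the given column,
    // not the column index itself.
    static Function<Rec, Object> keyOf(int column) {
        return rec -> rec.getField(column);
    }

    // Naive nested-loop equi-join that emits both matching records as a pair
    // and leaves the shape of the final output to the next operator.
    static List<Pair<Rec, Rec>> join(List<Rec> left, List<Rec> right,
                                     Function<Rec, Object> leftKey,
                                     Function<Rec, Object> rightKey) {
        List<Pair<Rec, Rec>> matches = new ArrayList<>();
        for (Rec l : left) {
            for (Rec r : right) {
                if (Objects.equals(leftKey.apply(l), rightKey.apply(r))) {
                    matches.add(new Pair<>(l, r));
                }
            }
        }
        return matches;
    }

    // Map step: concatenate the fields of both sides into a single record.
    static Rec unnest(Pair<Rec, Rec> match) {
        Object[] merged = new Object[match.field0.size() + match.field1.size()];
        System.arraycopy(match.field0.values, 0, merged, 0, match.field0.size());
        System.arraycopy(match.field1.values, 0, merged, match.field0.size(), match.field1.size());
        return new Rec(merged);
    }

    public static void main(String[] args) {
        List<Rec> r = List.of(new Rec("a", "b", "c", "d")); // join attribute in column 0
        List<Rec> s = List.of(new Rec("e", "a", "f"));      // join attribute in column 1
        for (Pair<Rec, Rec> match : join(r, s, keyOf(0), keyOf(1))) {
            System.out.println(unnest(match)); // prints [a, b, c, d, e, a, f]
        }
    }
}
```

This keeps the duplicated join attribute in the output. Dropping it, or keeping only one side, would be a different `unnest`, which is exactly the design decision left open in the thread.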
