We assumed this order because of the following prints when we run the SqlAPI test:
After parsing sql query:
6:LogicalProject(id=[$0], title=[$1], genre=[$11]): rowcount = 1500.0, cumulative cost = 3200.0
5:LogicalJoin(condition=[=($0, $10)], joinType=[inner]): rowcount = 1500.0, cumulative cost = 1700.0
1:LogicalTableScan(table=[[postgres, movie]]): rowcount = 100.0, cumulative cost = 100.0
3:LogicalTableScan(table=[[postgres, movie_genre]]): rowcount = 100.0, cumulative cost = 100.0
After translating logical intermediate plan:
23:WayangProject(id=[$0], title=[$1], genre=[$11]): rowcount = 1500.0, cumulative cost = 3200.0
22:WayangJoin(condition=[=($0, $10)], joinType=[inner]): rowcount = 1500.0, cumulative cost = 1700.0
15:WayangTableScan(table=[[postgres, movie]]): rowcount = 100.0, cumulative cost = 100.0
17:WayangTableScan(table=[[postgres, movie_genre]]): rowcount = 100.0, cumulative cost = 100.0
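
For reference, the test runs a query of roughly this shape. This is a reconstruction, not the actual test code: the join column name (movie_id) is our guess from the scans and the projected fields, and the SqlContext construction and return type are assumptions on our side.

import java.util.Collection;

import org.apache.wayang.api.sql.context.SqlContext;
import org.apache.wayang.basic.data.Record;

public class SqlApiJoinExample {
    public static void main(String[] args) throws Exception {
        // Assumed: a default SqlContext and executeSql returning Collection<Record>.
        SqlContext sqlContext = new SqlContext();
        Collection<Record> rows = sqlContext.executeSql(
                "SELECT m.id, m.title, g.genre "
              + "FROM postgres.movie m "
              + "JOIN postgres.movie_genre g ON m.id = g.movie_id");   // movie_id is a guess
        rows.forEach(System.out::println);
    }
}
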
-----Original Message-----
From: Jorge Arnulfo Quiané Ruiz <[email protected]>
Sent: Tuesday, 25 April 2023 13.35
To: [email protected]
Subject: Re: Problem: Connect Join to Project in the SQL API
Btw, why are you projecting after the join? The project should happen before, IMHO.
> On 25 Apr 2023, at 13.30, Kristian Reitzel <[email protected]> wrote:
>
> Hmm, not entirely. I was under the impression that the join happened before
> the projection. Have I misunderstood that? At least that also seems to be the
> logical plan that is printed (2x table scan -> join -> project).
>
> Best,
> Kristian
>
> -----Original Message-----
> From: Zoi Kaoudi <[email protected]>
> Sent: Tuesday, 25 April 2023 13.23
> To: [email protected]
> Subject: Re: Problem: Connect Join to Project in the SQL API
>
> So from what I saw in the WayangProjectVisitor, the project operator will
> output a dataset of Record.
> Then you most probably need another map operator to convert this to a Tuple2
> that contains the record and the key, and then use the join operator. Does
> this make sense?
>
> On Tuesday, 25 April 2023 at 01:14:45 PM CEST, Zoi Kaoudi
> <[email protected]> wrote:
>
> The keyExtractor needs to output the value, not the index. This is because a
> user can define the key in some other way if necessary (could even do some
> processing on the input and then output the key).
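>
> For illustration, something along these lines (a plain java.util.function.Function
> rather than the exact Wayang descriptor type, and Record.getField(int) is assumed):
>
> import java.io.Serializable;
> import java.util.function.Function;
>
> import org.apache.wayang.basic.data.Record;
>
> // Sketch of a key extractor: it returns the *value* stored in the key column,
> // not the column index itself.
> public class JoinKeyExtractor implements Function<Record, Object>, Serializable {
>     private final int keyColumn;   // e.g. 0 for movie.id
>
>     public JoinKeyExtractor(int keyColumn) {
>         this.keyColumn = keyColumn;
>     }
>
>     @Override
>     public Object apply(Record record) {
>         return record.getField(keyColumn);   // the key value, not the index
>     }
> }
>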
> Could you maybe point us to the lines of code that seem problematic? Or paste
> them here?
>
> I will take a look at the WayangProjectVisitor class and if I see something I
> will let you know.
> Best
> --
> Zoi
>
> On Tuesday, 25 April 2023 at 01:07:51 PM CEST, Kristian Reitzel
> <[email protected]> wrote:
>
> Thank you for your response, Zoi and Jorge.
>
> I have been looking into this, but haven't solved it yet.
>
> So the problem might be in WayangProjectVisitor.java in the SQL API?
>
> In WayangProjectVisitor.java, the apply function for the MapOperator will
> only handle a Record, but should it instead be able to take a
> Tuple2<Tuple2, Tuple2> and return a Record?
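>
> For concreteness, something like this is what we have in mind (a rough sketch
> only; since our join inputs are Records we assume the pair is a
> Tuple2<Record, Record>, and we assume Tuple2.field0/field1, Record.size(),
> Record.getField(int) and the varargs Record constructor):
>
> import java.util.ArrayList;
> import java.util.List;
> import java.util.function.Function;
>
> import org.apache.wayang.basic.data.Record;
> import org.apache.wayang.basic.data.Tuple2;
>
> // Sketch: flatten the join output (a pair of the two matched records) back
> // into a single Record so the projection can index into it.
> public class JoinResultFlattener implements Function<Tuple2<Record, Record>, Record> {
>     @Override
>     public Record apply(Tuple2<Record, Record> pair) {
>         List<Object> fields = new ArrayList<>();
>         for (int i = 0; i < pair.field0.size(); i++) {
>             fields.add(pair.field0.getField(i));   // fields of the left record
>         }
>         for (int i = 0; i < pair.field1.size(); i++) {
>             fields.add(pair.field1.getField(i));   // fields of the right record
>         }
>         return new Record(fields.toArray());
>     }
> }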
>
> Also, can you perhaps help me verify what the KeyExtractor method should
> return? Is it the index of the key or the value of the key? I would assume it
> to be the index (an Integer).
>
> Best!
>
> -----Original Message-----
> From: Zoi Kaoudi <[email protected]>
> Sent: Tuesday, 25 April 2023 12.13
> To: [email protected]
> Subject: Re: Problem: Connect Join to Project in the SQL API
>
> I think Jorge is correct about the output being a Tuple2 consisting of the
> tuples of the two relations that have been joined.
>
> Taking a look at JavaJoinOperatorTest.java and SparkJoinOperatorTest.java,
> if you define a join operator as SparkJoinOperator<Tuple2, Tuple2, Integer>,
> the output will be a Tuple2<Tuple2, Tuple2>.
>
> So I assume in your case you need a join operator that receives as input the
> types <Record, Record, TypeOfKey>. Maybe the order is different in the API.
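>
> Just to make the shapes concrete, here is an illustration only (a naive
> nested-loop join, not the Wayang operator API; only the two-argument Tuple2
> constructor is assumed). With inputs of type <Record, Record, K> the matches
> would come out as Tuple2<Record, Record>.
>
> import java.util.ArrayList;
> import java.util.List;
> import java.util.function.Function;
>
> import org.apache.wayang.basic.data.Tuple2;
>
> public class JoinTypesSketch {
>     // A join over inputs of type T0 and T1 with key type K yields pairs Tuple2<T0, T1>.
>     public static <T0, T1, K> List<Tuple2<T0, T1>> join(
>             List<T0> left, List<T1> right,
>             Function<T0, K> leftKey, Function<T1, K> rightKey) {
>         List<Tuple2<T0, T1>> matches = new ArrayList<>();
>         for (T0 l : left) {
>             for (T1 r : right) {
>                 if (leftKey.apply(l).equals(rightKey.apply(r))) {
>                     matches.add(new Tuple2<>(l, r));   // the join emits the pair
>                 }
>             }
>         }
>         return matches;
>     }
> }
>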
> Best
> --
> Zoi
>
> On Saturday, 22 April 2023 at 09:14:59 PM CEST, Jorge
> Arnulfo Quiané Ruiz <[email protected]> wrote:
>
> Hi guys,
>
> Please check if it is indeed tuple1 and tuple2, because I might have confused
> it and it is a key/values pair instead, where the key is the join key.
> This might affect your casting to Record.
>
> —
> Jorge
>
> On Fri, 21 Apr 2023 at 2:14 PM Kristian Reitzel <[email protected]>
> wrote:
>
>> Makes sense. We made it work with the Tuple2 as input to the MapOperator.
>> So now the connectTo works, but we still get an exception.
>>
>> We think we have located the issue: it occurs when the join finds two
>> fields with the same value (i.e. the join condition is satisfied). Then we
>> get the following exception:
>>
>> Caused by: java.lang.ClassCastException: class org.apache.wayang.basic.data.Tuple2 cannot be cast to class org.apache.wayang.basic.data.Record (org.apache.wayang.basic.data.Tuple2 and org.apache.wayang.basic.data.Record are in unnamed module of loader 'app')
>>   at org.apache.wayang.api.sql.calcite.converter.WayangProjectVisitor$MapFunctionImpl.apply(WayangProjectVisitor.java:48)
>>   at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
>>   at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
>>   at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
>>   at java.base/java.util.LinkedList$LLSpliterator.forEachRemaining(LinkedList.java:1239)
>>   at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
>>   at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
>>   at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
>>   at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
>>   at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>>   at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
>>   at java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:274)
>>   at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177)
>>   at java.base/java.util.Iterator.forEachRemaining(Iterator.java:133)
>>   at java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
>>   at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
>>   at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
>>   at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
>>   at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
>>   at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>>   at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
>>   at org.apache.wayang.java.operators.JavaLocalCallbackSink.evaluate(JavaLocalCallbackSink.java:72)
>>   at org.apache.wayang.java.execution.JavaExecutor.execute(JavaExecutor.java:82)
>>
>>
>>
>>
>> -----Original Message-----
>> From: Jorge Arnulfo Quiané Ruiz <[email protected]>
>> Sent: Friday, 21 April 2023 13.58
>> To: [email protected]
>> Subject: Re: Problem: Connect Join to Project in the SQL API
>>
>> Hi Michelle and Kristian,
>>
>> Yeah this is confusing at first glance because you would expect a
>> join record as output; however, this would mean that the join operator
>> has to decide which kind of join to implement (i.e. left or right join).
>> That is, given the following two tuples (matching on the join attribute
>> a) from R and S, respectively:
>> — <a, b, c, d>
>> — <e, a, f>
>> one could have several kinds of output:
>> — <a, b, c, d, e, a, f>
>> — <a, b, c, d, e, f> (without duplicating the join attribute)
>> — <a, b, c, d>
>> — <e, a, f>
>> — …
>> Therefore, in Wayang we left the decision to the developer and
>> decided to output both tuples (Tuple2) as follows:
>> — <a, b, c, d>, <e, a, f>
>> The following operators must materialise such a join match.
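>>
>> For instance, here is a minimal sketch (plain Java lists standing in for the
>> tuples, written by way of illustration only) of the second variant above,
>> which a downstream map could implement:
>>
>> import java.util.ArrayList;
>> import java.util.List;
>>
>> public class JoinMatchMaterialiser {
>>     // <a, b, c, d> joined with <e, a, f> (join attribute at position 1 on the
>>     // right) becomes <a, b, c, d, e, f>: the join attribute is not duplicated.
>>     public static List<Object> concatDroppingKey(
>>             List<Object> left, List<Object> right, int rightKeyIndex) {
>>         List<Object> out = new ArrayList<>(left);
>>         for (int i = 0; i < right.size(); i++) {
>>             if (i != rightKeyIndex) {       // skip the duplicated join attribute
>>                 out.add(right.get(i));
>>             }
>>         }
>>         return out;
>>     }
>> }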
>>
>> Therefore, in your case, the following MapOperator should receive a
>> Tuple2 as input.
>>
>> Could you please tell us why changing the input type of the
>> MapOperator is not working?
>>
>> Best,
>> Jorge
>>
>>> On 21 Apr 2023, at 11.56, Kristian Reitzel <[email protected]> wrote:
>>>
>>> Hi Dev,
>>>
>>> We are working on the Join Operator in the SQL API.
>>>
>>> After connecting the two table scans of type Record, the output slots of
>>> the join operator are of type Tuple2.
>>>
>>> This confuses us since we would expect the result of a join to be of type
>>> Record. Furthermore, the projection (which happens after the join) throws
>>> the following Exception:
>>>
>>> Exception in thread "main" java.lang.IllegalArgumentException: Cannot connect out@Join[2->1, id=23ad2d17] of DataSetType[BasicDataUnitType[Tuple2]] to in@Map[1->1, id=6bce4140] of type DataSetType[BasicDataUnitType[Record]].
>>>   at org.apache.wayang.core.plan.wayangplan.Operator.connectTo(Operator.java:206)
>>>   at org.apache.wayang.api.sql.calcite.converter.WayangProjectVisitor.visit(WayangProjectVisitor.java:42)
>>>   at org.apache.wayang.api.sql.calcite.converter.WayangRelConverter.convert(WayangRelConverter.java:16)
>>>   at org.apache.wayang.api.sql.calcite.optimizer.Optimizer.convert(Optimizer.java:202)
>>>   at org.apache.wayang.api.sql.context.SqlContext.executeSql(SqlContext.java:92)
>>>   at SqlAPI.examplePostgres(SqlAPI.java:46)
>>>   at SqlAPI.main(SqlAPI.java:77)
>>>
>>> We tried changing the Wayang MapOperator inputTypeClass to be a Tuple2,
>>> but this does not fix the problem.
>>>
>>> Is there anyone who can explain why the JoinOperator outputs a Tuple2,
>>> and perhaps also how we then connect the output of the join to the
>>> input of the projection?
>>>
>>> Best,
>>> Michelle and Kristian
>>>
>>
>>
>
>