Ah ok, I thought you had a project before the join.
So I think you need a map operator that converts the output of the join, which
is a Tuple2, into a Record, and then feed that Record to the project operator.
That is, unnest a Tuple2 with values v1 and v2 into a single record v1+v2, where
+ means concatenation.
This should work imo.
Still, we have to discuss what kind of output we want the join operator of the
SQL API to produce, i.e., how the join's output record should be formed.
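A minimal sketch of the map step described above, under stated assumptions: Record and Tuple2 below are simplified stand-ins for org.apache.wayang.basic.data.Record and Tuple2 (Tuple2 mirrors the public field0/field1 fields; the Record accessors are assumptions). JoinMatchToRecord is a hypothetical name, and wiring it into a MapOperator is not shown.

```java
import java.util.Arrays;
import java.util.function.Function;

// Simplified stand-in for org.apache.wayang.basic.data.Record (assumption:
// a record is just an ordered array of field values).
final class Record {
    private final Object[] values;

    Record(Object... values) { this.values = values.clone(); }

    Object getField(int index) { return values[index]; }

    int size() { return values.length; }

    Object[] values() { return values.clone(); }

    @Override
    public String toString() { return Arrays.toString(values); }
}

// Simplified stand-in for org.apache.wayang.basic.data.Tuple2.
final class Tuple2<T0, T1> {
    final T0 field0;
    final T1 field1;

    Tuple2(T0 field0, T1 field1) {
        this.field0 = field0;
        this.field1 = field1;
    }
}

// Hypothetical map function placed between the join and the projection:
// unnests a join match <v1, v2> into a single Record v1 + v2 (concatenation).
final class JoinMatchToRecord implements Function<Tuple2<Record, Record>, Record> {
    @Override
    public Record apply(Tuple2<Record, Record> match) {
        Object[] left = match.field0.values();
        Object[] right = match.field1.values();
        Object[] merged = Arrays.copyOf(left, left.length + right.length);
        System.arraycopy(right, 0, merged, left.length, right.length);
        return new Record(merged);
    }
}
```

With this shape, the projection would address the right-hand columns at an offset equal to the number of left-hand fields; as far as I know, that matches how Calcite numbers the fields of a join's row type (left fields first, then right fields).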
On Tuesday, 25 April 2023 at 01:31:10 p.m. CEST, Kristian Reitzel
<[email protected]> wrote:
Hmm, not entirely. I was under the impression that the join happened before
the projection. Have I misunderstood that? At least that is also the logical
plan that gets printed (2x table scan -> join -> project).
Best,
Kristian
-----Original Message-----
From: Zoi Kaoudi <[email protected]>
Sent: Tuesday, 25 April 2023 13.23
To: [email protected]
Subject: Re: Problem: Connect Join to Project in the SQL API
So, from what I saw in WayangProjectVisitor, the project operator outputs a
dataset of Records.
Then you most probably need another map operator to convert each Record into a
Tuple2 containing the record and the key, and then use the join operator. Does
this make sense?
On Tuesday, 25 April 2023 at 01:14:45 p.m. CEST, Zoi Kaoudi
<[email protected]> wrote:
The keyExtractor needs to output the value, not the index. This is because a
user can define the key in other ways if necessary (they could even do some
processing on the input and then output the key).
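To illustrate "value, not index", here is a minimal sketch assuming a simplified stand-in for Wayang's Record; KeyExtractors, fieldValue and lowerCasedField are hypothetical helpers, not Wayang API. The first returns the value stored at a field position; the second shows a computed key, as described above.

```java
import java.util.Locale;
import java.util.function.Function;

// Simplified stand-in for org.apache.wayang.basic.data.Record (assumed API).
final class Record {
    private final Object[] values;

    Record(Object... values) { this.values = values.clone(); }

    Object getField(int index) { return values[index]; }
}

// Hypothetical helpers for building join key extractors.
final class KeyExtractors {
    // Returns the *value* stored at field `index`, not the index itself.
    static Function<Record, Object> fieldValue(int index) {
        return record -> record.getField(index);
    }

    // Keys may also be computed, e.g. a case-insensitive key on a string column.
    static Function<Record, Object> lowerCasedField(int index) {
        return record -> String.valueOf(record.getField(index)).toLowerCase(Locale.ROOT);
    }
}
```

Since keys from the two inputs are presumably matched via equals/hashCode, both extractors should produce keys of the same type: an Integer 42 and a Long 42L are not equal in Java.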
Could you maybe point us to the lines of code that seem problematic, or paste
them here?
I will take a look at the WayangProjectVisitor class and let you know if I see
something.
Best
--
Zoi
On Tuesday, 25 April 2023 at 01:07:51 p.m. CEST, Kristian Reitzel
<[email protected]> wrote:
Thank you for your response, Zoi and Jorge.
I have been looking into this, but haven't solved it yet.
So might the problem be in WayangProjectVisitor.java in the SQL API?
In WayangProjectVisitor.java, the apply function of the MapOperator only
handles a Record, but shouldn't it be able to take a Tuple2<Tuple2, Tuple2>
and return a Record?
Also, could you help me verify what the KeyExtractor method should return: the
index of the key or the value of the key? I would assume it to be the index
(an Integer).
Best!
-----Original Message-----
From: Zoi Kaoudi <[email protected]>
Sent: Tuesday, 25 April 2023 12.13
To: [email protected]
Subject: Re: Problem: Connect Join to Project in the SQL API
I think Jorge is correct that the output is a Tuple2 consisting of the two
relations' tuples that have been joined.
Looking at JavaJoinOperatorTest.java and SparkJoinOperatorTest.java, if you
define a join operator as SparkJoinOperator<Tuple2, Tuple2, Integer>, the
output will be a Tuple2<Tuple2, Tuple2>.
So I assume that in your case you need a join operator that receives the input
types <Record, Record, TypeOfKey>. The order might be different in the API.
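A minimal sketch of why a join typed <Record, Record, Key> naturally emits Tuple2<Record, Record>: a plain in-memory hash join that pairs matching records and leaves merging to downstream operators. This is an illustration only, not Wayang's JavaJoinOperator or SparkJoinOperator; Record and Tuple2 are simplified stand-ins, and HashJoinSketch is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Simplified stand-in for org.apache.wayang.basic.data.Record (assumed API).
final class Record {
    private final Object[] values;

    Record(Object... values) { this.values = values.clone(); }

    Object getField(int index) { return values[index]; }

    @Override
    public String toString() { return Arrays.toString(values); }
}

// Simplified stand-in for org.apache.wayang.basic.data.Tuple2.
final class Tuple2<T0, T1> {
    final T0 field0;
    final T1 field1;

    Tuple2(T0 field0, T1 field1) {
        this.field0 = field0;
        this.field1 = field1;
    }
}

final class HashJoinSketch {
    // Inner equi-join: one Tuple2<left, right> per matching pair, no merging.
    static <K> List<Tuple2<Record, Record>> join(List<Record> left, List<Record> right,
                                                 Function<Record, K> leftKey,
                                                 Function<Record, K> rightKey) {
        // Build phase: index the right input by its join key.
        Map<K, List<Record>> byKey = new HashMap<>();
        for (Record r : right) {
            byKey.computeIfAbsent(rightKey.apply(r), k -> new ArrayList<>()).add(r);
        }
        // Probe phase: emit every (left, right) pair whose keys are equal.
        List<Tuple2<Record, Record>> matches = new ArrayList<>();
        for (Record l : left) {
            for (Record r : byKey.getOrDefault(leftKey.apply(l), List.of())) {
                matches.add(new Tuple2<>(l, r));
            }
        }
        return matches;
    }
}
```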
Best
--
Zoi
On Saturday, 22 April 2023 at 09:14:59 p.m. CEST, jorge Arnulfo Quiané Ruiz
<[email protected]> wrote:
Hi guys,
Please check if it is indeed tuple1 and tuple2, because I might have confused
and it is a key, values instead, where the key is the join key.
This might affect your casting to record
—
Jorge
On Fri, 21 Apr 2023 at 2:14 PM Kristian Reitzel <[email protected]>
wrote:
> Makes sense. We made it work with the Tuple2 as input to the MapOperator.
> So now connectTo works, but we still get an exception.
>
> We think we have traced the issue to the case where the join finds two
> fields with the same value (i.e., the join condition is satisfied). Then we
> get the following exception:
>
> Caused by: java.lang.ClassCastException: class org.apache.wayang.basic.data.Tuple2 cannot be cast to class org.apache.wayang.basic.data.Record (org.apache.wayang.basic.data.Tuple2 and org.apache.wayang.basic.data.Record are in unnamed module of loader 'app')
>     at org.apache.wayang.api.sql.calcite.converter.WayangProjectVisitor$MapFunctionImpl.apply(WayangProjectVisitor.java:48)
>     at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
>     at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
>     at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
>     at java.base/java.util.LinkedList$LLSpliterator.forEachRemaining(LinkedList.java:1239)
>     at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
>     at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
>     at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
>     at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
>     at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>     at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
>     at java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:274)
>     at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177)
>     at java.base/java.util.Iterator.forEachRemaining(Iterator.java:133)
>     at java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
>     at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
>     at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
>     at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
>     at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
>     at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>     at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
>     at org.apache.wayang.java.operators.JavaLocalCallbackSink.evaluate(JavaLocalCallbackSink.java:72)
>     at org.apache.wayang.java.execution.JavaExecutor.execute(JavaExecutor.java:82)
>
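One plausible reading of this trace (an assumption, not confirmed in the thread): changing the MapOperator's declared input type lets connectTo succeed, but MapFunctionImpl itself is still a Function<Record, Record>. Because Java generics are erased, the compiler-generated bridge method apply(Object) casts its argument to Record, so a Tuple2 arriving at runtime fails exactly there. The sketch reproduces that with simplified stand-ins; MapFunctionImpl below is a hypothetical reduction of the class in the trace, not its actual code.

```java
import java.util.function.Function;

// Simplified stand-ins for Wayang's Record and Tuple2 (assumed shapes).
final class Record {
    private final Object[] values;

    Record(Object... values) { this.values = values.clone(); }
}

final class Tuple2<T0, T1> {
    final T0 field0;
    final T1 field1;

    Tuple2(T0 field0, T1 field1) {
        this.field0 = field0;
        this.field1 = field1;
    }
}

// Hypothetical reduction of WayangProjectVisitor$MapFunctionImpl: typed on Record only.
final class MapFunctionImpl implements Function<Record, Record> {
    @Override
    public Record apply(Record record) { return record; }
}

final class ErasureDemo {
    // Mimics an executor that holds the function without generic types (erased at runtime).
    @SuppressWarnings({"rawtypes", "unchecked"})
    static String applyToJoinOutput() {
        Function erased = new MapFunctionImpl();
        Object joinOutput = new Tuple2<>(new Record(1, "a"), new Record(1, "b"));
        try {
            erased.apply(joinOutput); // bridge method casts the Tuple2 to Record here
            return "ok";
        } catch (ClassCastException e) {
            return "ClassCastException";
        }
    }
}
```

If this reading is right, the function body also has to accept a Tuple2<Record, Record> and read field0/field1, not just the declared input type class.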
> -----Original Message-----
> From: Jorge Arnulfo Quiané Ruiz <[email protected]>
> Sent: Friday, 21 April 2023 13.58
> To: [email protected]
> Subject: Re: Problem: Connect Join to Project in the SQL API
>
> Hi Michelle and Kristian,
>
> Yeah, this is confusing at first glance because you would expect a joined
> record as output. However, this would mean that the join operator has to
> decide the kind of join to implement (i.e., left or right join).
> That is, given the following two tuples from R and S, respectively
> (matching on the join attribute a):
> — <a, b, c, d>
> — <e, a, f>
> One could have several kinds of outputs:
> — <a, b, c, d, e, a, f>
> — <a, b, c, d, e, f> (without duplicating the join attribute)
> — <a, b, c, d>
> — <e, a, f>
> — …
> Therefore, in Wayang we left the decision to the developer and decided
> to output both tuples (as a Tuple2):
> — <a, b, c, d>, <e, a, f>
> Downstream operators must then materialise such a join match.
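To make the output shapes listed above concrete, here is a minimal sketch of downstream "materialisers" that build three of them from the same Tuple2 match. Record and Tuple2 are simplified stand-ins, JoinShapes and its methods are hypothetical names, and the plain concatenation shape is left out since it is just the two value arrays appended.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified stand-ins for Wayang's Record and Tuple2 (assumed shapes).
final class Record {
    private final Object[] values;

    Record(Object... values) { this.values = values.clone(); }

    Object[] values() { return values.clone(); }

    @Override
    public String toString() { return Arrays.toString(values); }
}

final class Tuple2<T0, T1> {
    final T0 field0;
    final T1 field1;

    Tuple2(T0 field0, T1 field1) {
        this.field0 = field0;
        this.field1 = field1;
    }
}

// Hypothetical materialisers, one per output shape.
final class JoinShapes {
    // <a, b, c, d, e, f>: concatenate, dropping the right side's copy of the join attribute.
    static Record concatWithoutRightKey(Tuple2<Record, Record> m, int rightKeyIndex) {
        List<Object> out = new ArrayList<>(Arrays.asList(m.field0.values()));
        Object[] right = m.field1.values();
        for (int i = 0; i < right.length; i++) {
            if (i != rightKeyIndex) {
                out.add(right[i]);
            }
        }
        return new Record(out.toArray());
    }

    // <a, b, c, d>: keep only the left tuple.
    static Record leftOnly(Tuple2<Record, Record> m) { return m.field0; }

    // <e, a, f>: keep only the right tuple.
    static Record rightOnly(Tuple2<Record, Record> m) { return m.field1; }
}
```

Keeping the join output as a pair means each of these choices lives in a separate map step rather than in the join operator itself.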
>
> Therefore, in your case, the following MapOperator should receive a
> Tuple2 as input.
>
> Could you please tell us why changing the input type of the
> MapOperator is not working?
>
> Best,
> Jorge
>
> > On 21 Apr 2023, at 11.56, Kristian Reitzel <[email protected]> wrote:
> >
> > Hi Dev,
> >
> > We are working on the Join Operator in the SQL API.
> >
> > After connecting the two table scans of type Record, the output slot of
> > the join operator is of type Tuple2.
> >
> > This confuses us, since we would expect the result of a join to be of
> > type Record. Furthermore, the projection (which happens after the join)
> > throws the following exception:
> >
> > Exception in thread "main" java.lang.IllegalArgumentException: Cannot connect out@Join[2->1, id=23ad2d17] of DataSetType[BasicDataUnitType[Tuple2]] to in@Map[1->1, id=6bce4140] of type DataSetType[BasicDataUnitType[Record]].
> >     at org.apache.wayang.core.plan.wayangplan.Operator.connectTo(Operator.java:206)
> >     at org.apache.wayang.api.sql.calcite.converter.WayangProjectVisitor.visit(WayangProjectVisitor.java:42)
> >     at org.apache.wayang.api.sql.calcite.converter.WayangRelConverter.convert(WayangRelConverter.java:16)
> >     at org.apache.wayang.api.sql.calcite.optimizer.Optimizer.convert(Optimizer.java:202)
> >     at org.apache.wayang.api.sql.context.SqlContext.executeSql(SqlContext.java:92)
> >     at SqlAPI.examplePostgres(SqlAPI.java:46)
> >     at SqlAPI.main(SqlAPI.java:77)
> >
> > We tried changing the inputTypeClass of the Wayang MapOperator to Tuple2,
> > but this does not fix the problem.
> >
> > Can anyone explain why the JoinOperator outputs a Tuple2, and perhaps
> > also how we should then connect the output of the join to the input of
> > the projection?
> >
> > Best,
> > Michelle and Kristian
> >
>
>