Makes sense. We made it work with the Tuple2 as input to the MapOperator. So 
now the connectTo works, but we still get an exception.

We think the issue occurs when the join finds two fields with the same value 
(i.e. the join condition is satisfied). We then get the following exception:

Caused by: java.lang.ClassCastException: class org.apache.wayang.basic.data.Tuple2 cannot be cast to class org.apache.wayang.basic.data.Record (org.apache.wayang.basic.data.Tuple2 and org.apache.wayang.basic.data.Record are in unnamed module of loader 'app')
        at org.apache.wayang.api.sql.calcite.converter.WayangProjectVisitor$MapFunctionImpl.apply(WayangProjectVisitor.java:48)
        at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
        at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
        at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
        at java.base/java.util.LinkedList$LLSpliterator.forEachRemaining(LinkedList.java:1239)
        at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
        at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
        at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
        at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
        at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
        at java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:274)
        at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177)
        at java.base/java.util.Iterator.forEachRemaining(Iterator.java:133)
        at java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
        at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
        at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
        at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
        at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
        at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
        at org.apache.wayang.java.operators.JavaLocalCallbackSink.evaluate(JavaLocalCallbackSink.java:72)
        at org.apache.wayang.java.execution.JavaExecutor.execute(JavaExecutor.java:82)
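
A minimal sketch of how this kind of failure can arise, assuming the map function applied by the projection still treats its input as a Record even though the operator's declared input type is now Tuple2. The classes below are hypothetical stand-ins, not Wayang's actual Record and Tuple2, and PROJECT_FIRST and applyToJoinOutput are illustrative names. Because Java generics are erased at runtime, changing a declared input type does not change what the function body casts to:

```java
import java.util.function.Function;

class CastSketch {
    // Hypothetical stand-ins; NOT the real org.apache.wayang.basic.data classes.
    static final class Record {
        final Object[] values;
        Record(Object... values) { this.values = values; }
    }

    static final class Tuple2<A, B> {
        final A field0;
        final B field1;
        Tuple2(A field0, B field1) { this.field0 = field0; this.field1 = field1; }
    }

    // A projection written against Record input (assumption: this mirrors a
    // map function that was written before a join was placed in front of it).
    static final Function<Record, Record> PROJECT_FIRST = r -> new Record(r.values[0]);

    // Feeds a join-style pair into the Record-based function through a raw type,
    // which is roughly what happens once generic type information is erased.
    @SuppressWarnings({"unchecked", "rawtypes"})
    static String applyToJoinOutput() {
        Object joinMatch = new Tuple2<>(new Record("a", "b"), new Record("e", "a"));
        Function raw = PROJECT_FIRST;
        try {
            raw.apply(joinMatch);
            return "ok";
        } catch (ClassCastException e) {
            return "ClassCastException";
        }
    }

    public static void main(String[] args) {
        System.out.println(applyToJoinOutput());
    }
}
```

If that is what happens here, the function itself (not only the declared input type) would need to accept the Tuple2 and read its two sides.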




-----Original Message-----
From: Jorge Arnulfo Quiané Ruiz <[email protected]> 
Sent: Friday, 21 April 2023 13.58
To: [email protected]
Subject: Re: Problem: Connect Join to Project in the SQL API

Hi Michelle and Kristian,

Yeah, this is confusing at first glance because you would expect a join record 
as output. However, this would mean that the join operator has to decide the 
kind of join to implement (i.e. left or right join). That is, given the following 
two tuples (matching on the join attribute a) from R and S, respectively:
- <a, b, c, d>
- <e, a, f>
one could have several kinds of output:
- <a, b, c, d, e, a, f>
- <a, b, c, d, e, f> (without duplicating the join attribute)
- <a, b, c, d>
- <e, a, f>
- ...
Therefore, in Wayang we left the decision to the developer and decided to 
output both tuples (as a Tuple2) as follows:
- <a, b, c, d>, <e, a, f>
The operators that follow the join must then materialise such a join match.

So, in your case, the MapOperator that follows the join should receive a 
Tuple2 as input.
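
As an illustration of materialising a join match, the sketch below concatenates both sides of the pair into one flat record, keeping the join attribute twice (the first output option listed above). Record and Tuple2 here are simplified, hypothetical stand-ins defined inside the example, and flatten is an illustrative name; the real Wayang classes may expose different accessors.

```java
import java.util.Arrays;

class JoinFlattenSketch {
    // Hypothetical stand-ins; NOT the real org.apache.wayang.basic.data classes.
    static final class Record {
        private final Object[] values;
        Record(Object... values) { this.values = values.clone(); }
        Object getField(int index) { return values[index]; }
        int size() { return values.length; }
        @Override public String toString() { return Arrays.toString(values); }
    }

    static final class Tuple2<A, B> {
        final A field0;
        final B field1;
        Tuple2(A field0, B field1) { this.field0 = field0; this.field1 = field1; }
    }

    // Turns one join match (left record, right record) into a single flat record:
    // all left fields first, then all right fields, join attribute included twice.
    static Record flatten(Tuple2<Record, Record> match) {
        Record left = match.field0;
        Record right = match.field1;
        Object[] out = new Object[left.size() + right.size()];
        for (int i = 0; i < left.size(); i++) {
            out[i] = left.getField(i);
        }
        for (int j = 0; j < right.size(); j++) {
            out[left.size() + j] = right.getField(j);
        }
        return new Record(out);
    }

    public static void main(String[] args) {
        Tuple2<Record, Record> match =
                new Tuple2<>(new Record("a", "b", "c", "d"), new Record("e", "a", "f"));
        System.out.println(flatten(match)); // prints [a, b, c, d, e, a, f]
    }
}
```

A projection placed after such a step could then index into the flat record as it would for a single table scan; dropping the duplicated join attribute would be a variation of the same loop.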

Could you please tell us why changing the input type of the MapOperator is not 
working?

Best,
Jorge

> On 21 Apr 2023, at 11.56, Kristian Reitzel <[email protected]> wrote:
> 
> Hi Dev,
> 
> We are working on the Join Operator in the SQL API.
> 
> After connecting the two table scans of type Record, the output slots of the 
> join operator are of type Tuple2.
> 
> This confuses us since we would expect the result of a join to be of type 
> Record. Furthermore, the projection (which happens after the join) throws the 
> following Exception:
> 
> Exception in thread "main" java.lang.IllegalArgumentException: Cannot connect out@Join[2->1, id=23ad2d17] of DataSetType[BasicDataUnitType[Tuple2]] to in@Map[1->1, id=6bce4140] of type DataSetType[BasicDataUnitType[Record]].
>                at org.apache.wayang.core.plan.wayangplan.Operator.connectTo(Operator.java:206)
>                at org.apache.wayang.api.sql.calcite.converter.WayangProjectVisitor.visit(WayangProjectVisitor.java:42)
>                at org.apache.wayang.api.sql.calcite.converter.WayangRelConverter.convert(WayangRelConverter.java:16)
>                at org.apache.wayang.api.sql.calcite.optimizer.Optimizer.convert(Optimizer.java:202)
>                at org.apache.wayang.api.sql.context.SqlContext.executeSql(SqlContext.java:92)
>                at SqlAPI.examplePostgres(SqlAPI.java:46)
>                at SqlAPI.main(SqlAPI.java:77)
> 
> We tried changing the Wayang MapOperator inputTypeClass to be a Tuple2, but 
> this does not fix the problem.
> 
> Is there anyone that can explain why the JoinOperator outputs a Tuple2 and 
> perhaps also how we then connect the output of the join to the input of the 
> projection?
> 
> Best,
> Michelle and Kristian
> 
