I agree with Aljoscha.
Let's give a good error message and offer a cross operator.
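
For example, the explicit operator could look something like this
(hypothetical sketch of what the proposed API might be; it does not exist
yet):

communitiesTable.cross(weightedEdgesTable)
  .filter("nodeID = src || nodeID = target")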

2015-04-17 4:52 GMT-05:00 Aljoscha Krettek <aljos...@apache.org>:

> Yes, that is the idea, but I think in this case the user must be
> protected from an operation that can get ridiculously expensive.
>
> On Fri, Apr 17, 2015 at 10:20 AM, Felix Neutatz <neut...@googlemail.com>
> wrote:
> > I am also against the manual cross method. Isn't the idea of the Table
> > API to hide the actual implementation from the user?
> >
> > Best regards,
> > Felix
> > On Apr 17, 2015, 10:09 AM, "Till Rohrmann" <till.rohrm...@gmail.com> wrote:
> >
> >> Why not do two separate joins, union the results, and then do a
> >> distinct operation on the combined key?
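> >>
> >> Something like this (untested sketch, reusing the communities and
> >> weightedEdges DataSets from Felix's mail below):
> >>
> >> val bySrc = communities.join(weightedEdges).where("nodeID").equalTo("src")
> >> val byTarget = communities.join(weightedEdges).where("nodeID").equalTo("target")
> >> val combined = bySrc.union(byTarget).distinct()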
> >>
> >> On Fri, Apr 17, 2015 at 9:42 AM, Aljoscha Krettek <aljos...@apache.org>
> >> wrote:
> >>
> >> > The first thing is a "feature" of the Java API that removes duplicate
> >> > fields in keys: an equi-join on (0,0) with (0,1) throws an error
> >> > because one 0 is removed from the first key, leaving one key field on
> >> > one side and two on the other.
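> >> >
> >> > For illustration (hypothetical left and right inputs):
> >> >
> >> > // the duplicate 0 in the first key is dropped, so the join sees one
> >> > // key field on the left and two on the right, hence the error
> >> > left.join(right).where(0, 0).equalTo(0, 1)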
> >> >
> >> > The second thing is a feature of the Table API, where the error
> >> > message already hints at the problem:
> >> > Could not derive equi-join predicates for predicate 'nodeID === 'src
> >> > || 'nodeID === 'target
> >> >
> >> > The problem is that this would have to be executed as a cross followed
> >> > by a filter, because with the OR none of the predicates is an
> >> > equi-join predicate that must always hold. I don't want to allow this
> >> > implicitly, because a cross can be very expensive. I will add a JIRA
> >> > ticket for adding a manual cross operation to the Table API.
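> >> >
> >> > Roughly, the generated plan would have to look like this (sketch,
> >> > using the case classes from the quoted mail below):
> >> >
> >> > communities.cross(weightedEdges)
> >> >   .filter { case (c, e) => c.nodeID == e.src || c.nodeID == e.target }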
> >> >
> >> > On Thu, Apr 16, 2015 at 2:28 PM, Felix Neutatz <neut...@googlemail.com> wrote:
> >> > > Hi,
> >> > >
> >> > > I want to join two tables in the following way:
> >> > >
> >> > > case class WeightedEdge(src: Int, target: Int, weight: Double)
> >> > > case class Community(communityID: Int, nodeID: Int)
> >> > >
> >> > > case class CommunitySumTotal(communityID: Int, sumTotal: Double)
> >> > >
> >> > > val communities: DataSet[Community]
> >> > > val weightedEdges: DataSet[WeightedEdge]
> >> > >
> >> > > val communitiesTable = communities.toTable
> >> > > val weightedEdgesTable = weightedEdges.toTable
> >> > >
> >> > > val sumTotal = communitiesTable.join(weightedEdgesTable)
> >> > >  .where("nodeID = src && nodeID = target")
> >> > >  .groupBy('communityID)
> >> > >  .select("communityID, weight.sum as sumTotal").toSet[CommunitySumTotal]
> >> > >
> >> > >
> >> > > but I get this exception:
> >> > >
> >> > > Exception in thread "main"
> >> > > org.apache.flink.api.common.InvalidProgramException: The types of the
> >> > > key fields do not match: The number of specified keys is different.
> >> > >   at org.apache.flink.api.java.operators.JoinOperator.<init>(JoinOperator.java:96)
> >> > >   at org.apache.flink.api.java.operators.JoinOperator$EquiJoin.<init>(JoinOperator.java:197)
> >> > >   at org.apache.flink.api.java.table.JavaBatchTranslator.createJoin(JavaBatchTranslator.scala:310)
> >> > >   at org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:145)
> >> > >   at org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:195)
> >> > >   at org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:183)
> >> > >   at org.apache.flink.api.java.table.JavaBatchTranslator.translate(JavaBatchTranslator.scala:78)
> >> > >   at org.apache.flink.api.scala.table.ScalaBatchTranslator.translate(ScalaBatchTranslator.scala:55)
> >> > >   at org.apache.flink.api.scala.table.TableConversions.toSet(TableConversions.scala:37)
> >> > >
> >> > > Moreover, when I use the following where clause:
> >> > >
> >> > > .where("nodeID = src || nodeID = target")
> >> > >
> >> > > I get another error:
> >> > >
> >> > > Exception in thread "main"
> >> > > org.apache.flink.api.table.ExpressionException: Could not derive
> >> > > equi-join predicates for predicate 'nodeID === 'src || 'nodeID ===
> >> > > 'target.
> >> > >   at org.apache.flink.api.java.table.JavaBatchTranslator.createJoin(JavaBatchTranslator.scala:296)
> >> > >   at org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:145)
> >> > >   at org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:195)
> >> > >   at org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:183)
> >> > >   at org.apache.flink.api.java.table.JavaBatchTranslator.translate(JavaBatchTranslator.scala:78)
> >> > >   at org.apache.flink.api.scala.table.ScalaBatchTranslator.translate(ScalaBatchTranslator.scala:55)
> >> > >   at org.apache.flink.api.scala.table.TableConversions.toSet(TableConversions.scala:37)
> >> > >
> >> > >
> >> > > Apart from that, the Table API seems really promising. It's a really
> >> > > great tool.
> >> > >
> >> > > Thank you for your help,
> >> > >
> >> > > Felix
> >> >
> >>
>
