I agree with Aljoscha. Let's give a good error message and offer a cross operator.
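
To make the cost argument concrete, here is a minimal sketch of the two ways the OR predicate from the thread below could be evaluated: as a cross followed by a filter, or as Till's two equi-joins plus union and distinct. It uses plain Scala collections rather than Flink DataSets or the Table API, so `OrJoinSketch`, `crossThenFilter`, `equiJoin`, `twoJoinsUnionDistinct` and the sample data are hypothetical names for illustration only, not existing Flink calls. Only the two case classes are taken from Felix's mail.

```scala
// Plain Scala collections stand in for Flink DataSets; this is NOT Flink API code.
object OrJoinSketch {
  case class WeightedEdge(src: Int, target: Int, weight: Double)
  case class Community(communityID: Int, nodeID: Int)

  // Hypothetical sample data, including one self-loop edge (12 -> 12).
  val communities: Seq[Community] =
    Seq(Community(1, 10), Community(1, 11), Community(2, 12))
  val edges: Seq[WeightedEdge] =
    Seq(WeightedEdge(10, 11, 1.0), WeightedEdge(11, 12, 2.0), WeightedEdge(12, 12, 0.5))

  // Cross followed by a filter: every (community, edge) pair is built
  // before the OR predicate is applied, so the work grows with |cs| * |es|.
  def crossThenFilter(cs: Seq[Community], es: Seq[WeightedEdge]): Seq[(Community, WeightedEdge)] =
    for {
      c <- cs
      e <- es
      if c.nodeID == e.src || c.nodeID == e.target
    } yield (c, e)

  // Hash-based equi-join of nodeID against one edge field (hypothetical helper).
  def equiJoin(cs: Seq[Community], es: Seq[WeightedEdge])(
      edgeKey: WeightedEdge => Int): Seq[(Community, WeightedEdge)] = {
    val byKey = es.groupBy(edgeKey)
    for {
      c <- cs
      e <- byKey.getOrElse(c.nodeID, Seq.empty)
    } yield (c, e)
  }

  // Two equi-joins, union, distinct. The distinct drops pairs that
  // matched on both src and target (self-loops).
  def twoJoinsUnionDistinct(cs: Seq[Community], es: Seq[WeightedEdge]): Seq[(Community, WeightedEdge)] =
    (equiJoin(cs, es)(_.src) ++ equiJoin(cs, es)(_.target)).distinct

  def main(args: Array[String]): Unit = {
    println(crossThenFilter(communities, edges).toSet ==
      twoJoinsUnionDistinct(communities, edges).toSet)
  }
}
```

On the sample data both variants produce the same five pairs; the self-loop edge matches on src and on target and is kept once by the distinct. One caveat: a distinct over whole records would also collapse rows that are genuinely duplicated in the input, which the cross plus filter would keep.
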
2015-04-17 4:52 GMT-05:00 Aljoscha Krettek <[email protected]>:

> Yes, that is the idea, but I think in this case the user must be
> protected from an operation that can get ridiculously expensive.
>
> On Fri, Apr 17, 2015 at 10:20 AM, Felix Neutatz <[email protected]>
> wrote:
> > I am also against the manual cross method. Isn't it the idea of the table
> > API to hide the actual implementation from the user?
> >
> > Best regards,
> > Felix
> > On 17.04.2015 at 10:09 AM, "Till Rohrmann" <[email protected]> wrote:
> >
> >> Why not doing two separate joins, union the results and doing a distinct
> >> operation on the combined key?
> >>
> >> On Fri, Apr 17, 2015 at 9:42 AM, Aljoscha Krettek <[email protected]>
> >> wrote:
> >>
> >> > So, the first thing is a "feature" of the Java API that removes
> >> > duplicate fields in keys, so an equi-join on (0,0) with (0,1) would
> >> > throw an error because one 0 is removed from the first key.
> >> >
> >> > The second thing is a feature of the Table API where the error message
> >> > is hinting at the problem:
> >> > Could not derive equi-join predicates for predicate 'nodeID === 'src
> >> > || 'nodeID === 'target
> >> >
> >> > The problem is, that this would have to be executed as a cross
> >> > followed by a filter because none of the predicates are equi-join
> >> > predicates that must always be true (because of the OR relation). This
> >> > I don't want to allow, because a cross can be very expensive. I will
> >> > add a jira ticket for adding a manual cross operation to the Table
> >> > API.
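
On the first point above (duplicate key fields), a small sketch may help show why the `&&` variant fails and how it could be rephrased. The `dedupKey` function is only a model of the behaviour Aljoscha describes, not Flink source code, and `AndJoinSketch`, `joinThenFilter` and the sample data are hypothetical. The field positions assume nodeID is field 1 of Community and src/target are fields 0 and 1 of WeightedEdge, as in Felix's case classes.

```scala
// Plain Scala collections stand in for Flink DataSets; this is NOT Flink API code.
object AndJoinSketch {
  case class WeightedEdge(src: Int, target: Int, weight: Double)
  case class Community(communityID: Int, nodeID: Int)

  // Model (assumption) of the described behaviour: duplicate field
  // positions are removed from a key before both sides are compared.
  def dedupKey(fields: Seq[Int]): Seq[Int] = fields.distinct

  // "nodeID = src && nodeID = target" asks for keys (1, 1) and (0, 1).
  val leftKey: Seq[Int] = dedupKey(Seq(1, 1))  // nodeID, nodeID -> one field left
  val rightKey: Seq[Int] = dedupKey(Seq(0, 1)) // src, target    -> still two fields

  // Hypothetical sample data, including one self-loop edge (12 -> 12).
  val communities: Seq[Community] =
    Seq(Community(1, 10), Community(1, 11), Community(2, 12))
  val edges: Seq[WeightedEdge] =
    Seq(WeightedEdge(10, 11, 1.0), WeightedEdge(11, 12, 2.0), WeightedEdge(12, 12, 0.5))

  // Logically equivalent formulation: equi-join on nodeID = src only,
  // then apply src = target as a residual filter on the joined pairs.
  def joinThenFilter(cs: Seq[Community], es: Seq[WeightedEdge]): Seq[(Community, WeightedEdge)] = {
    val bySrc = es.groupBy(_.src)
    for {
      c <- cs
      e <- bySrc.getOrElse(c.nodeID, Seq.empty)
      if e.src == e.target
    } yield (c, e)
  }

  def main(args: Array[String]): Unit = {
    println(s"left key fields: ${leftKey.size}, right key fields: ${rightKey.size}")
    println(joinThenFilter(communities, edges))
  }
}
```

In this model the left key shrinks to one field while the right key keeps two, which matches the "number of specified keys is different" message. Since nodeID = src && nodeID = target implies src = target, joining on a single key pair and filtering afterwards selects the same rows without needing a duplicated key field.
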
> >> >
> >> > On Thu, Apr 16, 2015 at 2:28 PM, Felix Neutatz <[email protected]>
> >> > wrote:
> >> > > Hi,
> >> > >
> >> > > I want to join two tables in the following way:
> >> > >
> >> > > case class WeightedEdge(src: Int, target: Int, weight: Double)
> >> > > case class Community(communityID: Int, nodeID: Int)
> >> > >
> >> > > case class CommunitySumTotal(communityID: Int, sumTotal: Double)
> >> > >
> >> > > val communities: DataSet[Community]
> >> > > val weightedEdges: DataSet[WeightedEdge]
> >> > >
> >> > > val communitiesTable = communities.toTable
> >> > > val weightedEdgesTable = weightedEdges.toTable
> >> > >
> >> > > val sumTotal = communitiesTable.join(weightedEdgesTable)
> >> > >   .where("nodeID = src && nodeID = target")
> >> > >   .groupBy('communityID)
> >> > >   .select("communityID, weight.sum as sumTotal").toSet[CommunitySumTotal]
> >> > >
> >> > >
> >> > > but I get this exception:
> >> > >
> >> > > Exception in thread "main"
> >> > > org.apache.flink.api.common.InvalidProgramException: The types of the key
> >> > > fields do not match: The number of specified keys is different.
> >> > >     at org.apache.flink.api.java.operators.JoinOperator.<init>(JoinOperator.java:96)
> >> > >     at org.apache.flink.api.java.operators.JoinOperator$EquiJoin.<init>(JoinOperator.java:197)
> >> > >     at org.apache.flink.api.java.table.JavaBatchTranslator.createJoin(JavaBatchTranslator.scala:310)
> >> > >     at org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:145)
> >> > >     at org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:195)
> >> > >     at org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:183)
> >> > >     at org.apache.flink.api.java.table.JavaBatchTranslator.translate(JavaBatchTranslator.scala:78)
> >> > >     at org.apache.flink.api.scala.table.ScalaBatchTranslator.translate(ScalaBatchTranslator.scala:55)
> >> > >     at org.apache.flink.api.scala.table.TableConversions.toSet(TableConversions.scala:37)
> >> > >
> >> > > Moreover when I use the following where clause:
> >> > >
> >> > > .where("nodeID = src || nodeID = target")
> >> > >
> >> > > I get another error:
> >> > >
> >> > > Exception in thread "main"
> >> > > org.apache.flink.api.table.ExpressionException: Could not derive
> >> > > equi-join predicates for predicate 'nodeID === 'src || 'nodeID ===
> >> > > 'target.
> >> > >
> >> > >     at org.apache.flink.api.java.table.JavaBatchTranslator.createJoin(JavaBatchTranslator.scala:296)
> >> > >     at org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:145)
> >> > >     at org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:195)
> >> > >     at org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:183)
> >> > >     at org.apache.flink.api.java.table.JavaBatchTranslator.translate(JavaBatchTranslator.scala:78)
> >> > >     at org.apache.flink.api.scala.table.ScalaBatchTranslator.translate(ScalaBatchTranslator.scala:55)
> >> > >     at org.apache.flink.api.scala.table.TableConversions.toSet(TableConversions.scala:37)
> >> > >
> >> > >
> >> > > Apart from that the TableApi seems really promising. It's a really great
> >> > > tool.
> >> > >
> >> > > Thank you for your help,
> >> > >
> >> > > Felix
> >> >
