Hi, I want to join two tables in the following way:
case class WeightedEdge(src: Int, target: Int, weight: Double) case class Community(communityID: Int, nodeID: Int) case class CommunitySumTotal(communityID: Int, sumTotal: Double) val communities: DataSet[Community] val weightedEdges: DataSet[WeightedEdge] val communitiesTable = communities.toTable val weightedEdgesTable = weightedEdges.toTable val sumTotal = communitiesTable.join(weightedEdgesTable) .where("nodeID = src && nodeID = target") .groupBy('communityID) .select("communityID, weight.sum as sumTotal").toSet[CommunitySumTotal] but I get this exception: Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: The types of the key fields do not match: The number of specified keys is different. at org.apache.flink.api.java.operators.JoinOperator.<init>(JoinOperator.java:96) at org.apache.flink.api.java.operators.JoinOperator$EquiJoin.<init>(JoinOperator.java:197) at org.apache.flink.api.java.table.JavaBatchTranslator.createJoin(JavaBatchTranslator.scala:310) at org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:145) at org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:195) at org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:183) at org.apache.flink.api.java.table.JavaBatchTranslator.translate(JavaBatchTranslator.scala:78) at org.apache.flink.api.scala.table.ScalaBatchTranslator.translate(ScalaBatchTranslator.scala:55) at org.apache.flink.api.scala.table.TableConversions.toSet(TableConversions.scala:37) Moreover when I use the following where clause: .where("nodeID = src || nodeID = target") I get another error: Exception in thread "main" org.apache.flink.api.table.ExpressionException: Could not derive equi-join predicates for predicate 'nodeID === 'src || 'nodeID === 'target. at org.apache.flink.api.java.table.JavaBatchTranslator.createJoin(JavaBatchTranslator.scala:296) at org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:145) at org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:195) at org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:183) at org.apache.flink.api.java.table.JavaBatchTranslator.translate(JavaBatchTranslator.scala:78) at org.apache.flink.api.scala.table.ScalaBatchTranslator.translate(ScalaBatchTranslator.scala:55) at org.apache.flink.api.scala.table.TableConversions.toSet(TableConversions.scala:37) Apart from that the TableApi seems really promising. It's a really great tool. Thank you for your help, Felix