Hi,
I want to join two tables in the following way:
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._

case class WeightedEdge(src: Int, target: Int, weight: Double)
case class Community(communityID: Int, nodeID: Int)
case class CommunitySumTotal(communityID: Int, sumTotal: Double)

// Both inputs are created elsewhere in the program
val communities: DataSet[Community]
val weightedEdges: DataSet[WeightedEdge]

val communitiesTable = communities.toTable
val weightedEdgesTable = weightedEdges.toTable

// Join communities with edges, then sum the edge weights per community
val sumTotal = communitiesTable.join(weightedEdgesTable)
  .where("nodeID = src && nodeID = target")
  .groupBy('communityID)
  .select("communityID, weight.sum as sumTotal")
  .toSet[CommunitySumTotal]
but I get this exception:
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: The types of the key fields do not match: The number of specified keys is different.
    at org.apache.flink.api.java.operators.JoinOperator.<init>(JoinOperator.java:96)
    at org.apache.flink.api.java.operators.JoinOperator$EquiJoin.<init>(JoinOperator.java:197)
    at org.apache.flink.api.java.table.JavaBatchTranslator.createJoin(JavaBatchTranslator.scala:310)
    at org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:145)
    at org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:195)
    at org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:183)
    at org.apache.flink.api.java.table.JavaBatchTranslator.translate(JavaBatchTranslator.scala:78)
    at org.apache.flink.api.scala.table.ScalaBatchTranslator.translate(ScalaBatchTranslator.scala:55)
    at org.apache.flink.api.scala.table.TableConversions.toSet(TableConversions.scala:37)
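To make the first where clause concrete, here is a minimal sketch using plain Scala collections rather than Flink, assuming ordinary inner-join semantics. Because nodeID has to equal both src and target, only self-loop edges (src == target) can match, so the same result can also be written as a single-key equi-join on nodeID = src followed by a filter on src == target. AndPredicateSketch and its methods are hypothetical helpers for illustration, not part of the Table API.

```scala
// Plain Scala collections only; no Flink dependency.
// The case classes mirror the ones used in the Table API program.
case class WeightedEdge(src: Int, target: Int, weight: Double)
case class Community(communityID: Int, nodeID: Int)
case class CommunitySumTotal(communityID: Int, sumTotal: Double)

// Hypothetical helper object for illustration, not a Flink API.
object AndPredicateSketch {
  // Mirrors .groupBy('communityID).select("communityID, weight.sum as sumTotal").
  def sumPerCommunity(rows: Seq[(Int, Double)]): Set[CommunitySumTotal] =
    rows
      .groupBy(_._1)
      .map { case (id, rs) => CommunitySumTotal(id, rs.map(_._2).sum) }
      .toSet

  // Literal reading of where("nodeID = src && nodeID = target"):
  // test every (community, edge) pair, like a nested-loop join.
  def direct(cs: Seq[Community], es: Seq[WeightedEdge]): Set[CommunitySumTotal] =
    sumPerCommunity(for {
      c <- cs
      e <- es
      if c.nodeID == e.src && c.nodeID == e.target
    } yield (c.communityID, e.weight))

  // Equivalent single-key form: hash-join on nodeID = src, then keep only
  // self-loops, since nodeID == src && nodeID == target implies src == target.
  def singleKey(cs: Seq[Community], es: Seq[WeightedEdge]): Set[CommunitySumTotal] = {
    val edgesBySrc = es.groupBy(_.src) // plays the role of the join's hash table
    sumPerCommunity(for {
      c <- cs
      e <- edgesBySrc.getOrElse(c.nodeID, Seq.empty)
      if e.src == e.target
    } yield (c.communityID, e.weight))
  }
}
```

Grouping the edges by src stands in for the hash table of an equi-join, while the direct version is the nested-loop equivalent; both return the same per-community sums.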
Moreover, when I use the following where clause:
.where("nodeID = src || nodeID = target")
I get another error:
Exception in thread "main" org.apache.flink.api.table.ExpressionException: Could not derive equi-join predicates for predicate 'nodeID === 'src || 'nodeID === 'target.
    at org.apache.flink.api.java.table.JavaBatchTranslator.createJoin(JavaBatchTranslator.scala:296)
    at org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:145)
    at org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:195)
    at org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:183)
    at org.apache.flink.api.java.table.JavaBatchTranslator.translate(JavaBatchTranslator.scala:78)
    at org.apache.flink.api.scala.table.ScalaBatchTranslator.translate(ScalaBatchTranslator.scala:55)
    at org.apache.flink.api.scala.table.TableConversions.toSet(TableConversions.scala:37)
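The disjunction nodeID = src || nodeID = target is not a conjunction of equalities, so it cannot be turned into a single set of join keys. As a minimal sketch, again with plain Scala collections and hypothetical names (OrPredicateSketch is not a Flink API), the same rows can be produced by two equi-joins, one on nodeID = src and one on nodeID = target, whose results are concatenated. Self-loops satisfy both branches, so this sketch drops them from the second branch to count each matching (community, edge) pair exactly once.

```scala
// Plain Scala collections only; no Flink dependency.
case class WeightedEdge(src: Int, target: Int, weight: Double)
case class Community(communityID: Int, nodeID: Int)
case class CommunitySumTotal(communityID: Int, sumTotal: Double)

// Hypothetical helper object for illustration, not a Flink API.
object OrPredicateSketch {
  // Per-community sum of the joined edge weights.
  def sumPerCommunity(rows: Seq[(Int, Double)]): Set[CommunitySumTotal] =
    rows
      .groupBy(_._1)
      .map { case (id, rs) => CommunitySumTotal(id, rs.map(_._2).sum) }
      .toSet

  // Literal reading of where("nodeID = src || nodeID = target"):
  // every pair is tested, i.e. a general (theta) join, not an equi-join.
  def direct(cs: Seq[Community], es: Seq[WeightedEdge]): Set[CommunitySumTotal] =
    sumPerCommunity(for {
      c <- cs
      e <- es
      if c.nodeID == e.src || c.nodeID == e.target
    } yield (c.communityID, e.weight))

  // Rewrite as two equi-joins whose outputs are concatenated.
  // Self-loops (src == target) would match both joins, so they are
  // removed from the target-side input to avoid double counting.
  def unionOfEquiJoins(cs: Seq[Community], es: Seq[WeightedEdge]): Set[CommunitySumTotal] = {
    val bySrc = es.groupBy(_.src)
    val byTarget = es.filter(e => e.src != e.target).groupBy(_.target)
    val onSrc = for {
      c <- cs
      e <- bySrc.getOrElse(c.nodeID, Seq.empty)
    } yield (c.communityID, e.weight)
    val onTarget = for {
      c <- cs
      e <- byTarget.getOrElse(c.nodeID, Seq.empty)
    } yield (c.communityID, e.weight)
    sumPerCommunity(onSrc ++ onTarget)
  }
}
```

Without the self-loop filter, an edge such as (10, 10) would be summed twice for a community containing node 10, so the filter is what makes the union match the disjunctive predicate.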
Apart from that, the Table API seems really promising. It's a great tool.
Thank you for your help,
Felix