Hi,

I want to join two tables in the following way:

case class WeightedEdge(src: Int, target: Int, weight: Double)
case class Community(communityID: Int, nodeID: Int)

case class CommunitySumTotal(communityID: Int, sumTotal: Double)

val communities: DataSet[Community]
val weightedEdges: DataSet[WeightedEdge]

val communitiesTable = communities.toTable
val weightedEdgesTable = weightedEdges.toTable

val sumTotal = communitiesTable.join(weightedEdgesTable)
 .where("nodeID = src && nodeID = target")
 .groupBy('communityID)
 .select("communityID, weight.sum as sumTotal").toSet[CommunitySumTotal]


but I get this exception:

Exception in thread "main"
org.apache.flink.api.common.InvalidProgramException: The types of the key
fields do not match: The number of specified keys is different.
at
org.apache.flink.api.java.operators.JoinOperator.<init>(JoinOperator.java:96)
at
org.apache.flink.api.java.operators.JoinOperator$EquiJoin.<init>(JoinOperator.java:197)
at
org.apache.flink.api.java.table.JavaBatchTranslator.createJoin(JavaBatchTranslator.scala:310)
at
org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:145)
at
org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:195)
at
org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:183)
at
org.apache.flink.api.java.table.JavaBatchTranslator.translate(JavaBatchTranslator.scala:78)
at
org.apache.flink.api.scala.table.ScalaBatchTranslator.translate(ScalaBatchTranslator.scala:55)
at
org.apache.flink.api.scala.table.TableConversions.toSet(TableConversions.scala:37)
Moreover when I use the following where clause:

.where("nodeID = src || nodeID = target")

I get another error:

Exception in thread "main"
org.apache.flink.api.table.ExpressionException: Could not derive
equi-join predicates for predicate 'nodeID === 'src || 'nodeID ===
'target.

at
org.apache.flink.api.java.table.JavaBatchTranslator.createJoin(JavaBatchTranslator.scala:296)
at
org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:145)
at
org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:195)
at
org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:183)
at
org.apache.flink.api.java.table.JavaBatchTranslator.translate(JavaBatchTranslator.scala:78)
at
org.apache.flink.api.scala.table.ScalaBatchTranslator.translate(ScalaBatchTranslator.scala:55)
at
org.apache.flink.api.scala.table.TableConversions.toSet(TableConversions.scala:37)


Apart from that the TableApi seems really promising. It's a really great tool.

Thank you for your help,

Felix

Reply via email to