Thank you for your response, Nico. Below is a simple case where I’m trying to join on Row fields:

package com.github.hadronzoo.rowerror

import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, RowTypeInfo}
import org.apache.flink.api.scala._
import org.apache.flink.types.Row

object Main {
  class MakeRow extends MapFunction[(Integer, Integer), Row] with ResultTypeQueryable[Row] {
    override def map(tuple: (Integer, Integer)): Row = tuple match {
      case (value, id) => Row.of(id, value)
    }

    override def getProducedType: TypeInformation[Row] = new RowTypeInfo(
      Array[TypeInformation[_]](BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO),
      Array("id", "value")
    )
  }

  def integerTuple(intTuple: (Int, Int)): (Integer, Integer) = intTuple match {
    case (a, b) => (a, b)
  }

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.createLocalEnvironment()
    val rowFn = new MakeRow

    val ints = 0 until 1000
    val evenIntegers = (ints filter (_ % 2 == 0)).zipWithIndex.map(integerTuple)
    val oddIntegers = (ints filter (_ % 2 == 1)).zipWithIndex.map(integerTuple)

    val evenRows = env.fromCollection(evenIntegers).map(rowFn)
    val oddRows = env.fromCollection(oddIntegers).map(rowFn)

    evenRows.join(oddRows).where("id").equalTo("id").print()
  }
}

Executing the above yields the following error:

Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: This type (GenericType<org.apache.flink.types.Row>) cannot be used as key.
    at org.apache.flink.api.common.operators.Keys$ExpressionKeys.<init>(Keys.java:330)
    at org.apache.flink.api.scala.UnfinishedKeyPairOperation.where(unfinishedKeyPairOperation.scala:72)
    at com.github.hadronzoo.rowerror.Main$.main(Main.scala:36)
    at com.github.hadronzoo.rowerror.Main.main(Main.scala)

For my application I only have TypeInformation at runtime (before the execution graph is built). Is it possible to use Row fields in join operations or is there an error with my implementation?

Thanks for your help,

Joshua

> On Jul 10, 2017, at 9:09 AM, Nico Kruber <n...@data-artisans.com> wrote:
>
> Can you show a minimal example of the query you are trying to run?
> Maybe Timo or Fabian (cc'd) can help.
>
>
> Nico
>
> On Friday, 7 July 2017 23:09:09 CEST Joshua Griffith wrote:
>> Hello,
>>
>> When using nested field expressions like “Account.Id” with nested rows, I
>> get the following error, “This type
>> (GenericType<org.apache.flink.types.Row>) cannot be used as key.” Is there
>> a way to make nested field expressions work with nested rows?
>
>> Thanks,
>>
>> Joshua
>