Thank you for your response, Nico. Below is a simple case where I’m trying to
join on Row fields:

package com.github.hadronzoo.rowerror

import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, RowTypeInfo}
import org.apache.flink.api.scala._
import org.apache.flink.types.Row

object Main {

  class MakeRow extends MapFunction[(Integer, Integer), Row]
      with ResultTypeQueryable[Row] {
    override def map(tuple: (Integer, Integer)): Row = tuple match {
      case (value, id) => Row.of(id, value)
    }

    override def getProducedType: TypeInformation[Row] =
      new RowTypeInfo(
        Array[TypeInformation[_]](
          BasicTypeInfo.INT_TYPE_INFO,
          BasicTypeInfo.INT_TYPE_INFO
        ),
        Array("id", "value")
      )
  }

  def integerTuple(intTuple: (Int, Int)): (Integer, Integer) = intTuple match {
    case (a, b) => (a, b)
  }

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.createLocalEnvironment()
    val rowFn = new MakeRow

    val ints = 0 until 1000
    val evenIntegers = (ints filter (_ % 2 == 0)).zipWithIndex.map(integerTuple)
    val oddIntegers = (ints filter (_ % 2 == 1)).zipWithIndex.map(integerTuple)

    val evenRows = env.fromCollection(evenIntegers).map(rowFn)
    val oddRows = env.fromCollection(oddIntegers).map(rowFn)

    evenRows.join(oddRows).where("id").equalTo("id").print()
  }
}

Executing the above yields the following error:

Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: This type (GenericType<org.apache.flink.types.Row>) cannot be used as key.
        at org.apache.flink.api.common.operators.Keys$ExpressionKeys.<init>(Keys.java:330)
        at org.apache.flink.api.scala.UnfinishedKeyPairOperation.where(unfinishedKeyPairOperation.scala:72)
        at com.github.hadronzoo.rowerror.Main$.main(Main.scala:36)
        at com.github.hadronzoo.rowerror.Main.main(Main.scala)

For my application, I only have the TypeInformation at runtime (before the
execution graph is built). Is it possible to use Row fields in join operations,
or is there an error in my implementation?
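
One variation that may be worth trying is sketched below. It rests on an assumption, not a confirmed diagnosis: that the Scala `DataSet.map` resolves its result type from an implicit `TypeInformation[Row]` in scope and not from `ResultTypeQueryable`, so the macro-derived default ends up as a `GenericType`. The object name `ImplicitRowTypeSketch` and the value name `rowType` are made up for illustration; the RowTypeInfo is still constructed at runtime, before the execution graph is built.

```scala
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.scala._
import org.apache.flink.types.Row

// Hypothetical object name; the structure mirrors the example above.
object ImplicitRowTypeSketch {

  // Same mapping as MakeRow above, without ResultTypeQueryable.
  class MakeRow extends MapFunction[(Integer, Integer), Row] {
    override def map(tuple: (Integer, Integer)): Row = tuple match {
      case (value, id) => Row.of(id, value)
    }
  }

  def integerTuple(intTuple: (Int, Int)): (Integer, Integer) = intTuple match {
    case (a, b) => (a, b)
  }

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.createLocalEnvironment()

    // Assumption: this local implicit is picked up by map() in place of the
    // macro-generated TypeInformation[Row], which would be a GenericType.
    implicit val rowType: TypeInformation[Row] = new RowTypeInfo(
      Array[TypeInformation[_]](
        BasicTypeInfo.INT_TYPE_INFO,
        BasicTypeInfo.INT_TYPE_INFO
      ),
      Array("id", "value")
    )

    val rowFn = new MakeRow
    val ints = 0 until 1000
    val evenIntegers = (ints filter (_ % 2 == 0)).zipWithIndex.map(integerTuple)
    val oddIntegers = (ints filter (_ % 2 == 1)).zipWithIndex.map(integerTuple)

    val evenRows = env.fromCollection(evenIntegers).map(rowFn)
    val oddRows = env.fromCollection(oddIntegers).map(rowFn)

    // With a RowTypeInfo attached, "id" should be usable as a key expression.
    evenRows.join(oddRows).where("id").equalTo("id").print()
  }
}
```

If the assumption holds, the only change from the original program is the implicit value declared before the two `map` calls.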

Thanks for your help,

Joshua

> On Jul 10, 2017, at 9:09 AM, Nico Kruber <n...@data-artisans.com> wrote:
> 
> Can you show a minimal example of the query you are trying to run?
> Maybe Timo or Fabian (cc'd) can help.
> 
> 
> Nico
> 
> On Friday, 7 July 2017 23:09:09 CEST Joshua Griffith wrote:
>> Hello,
>> 
>> When using nested field expressions like “Account.Id” with nested rows, I
>> get the following error, “This type
>> (GenericType<org.apache.flink.types.Row>) cannot be used as key.” Is there
>> a way to make nested field expressions work with nested rows?
>> 
>> Thanks,
>> 
>> Joshua
> 
