Hi Joshua,

thanks for reporting this issue. Your code is fine, but IMO there is a bug in
the Scala DataSet API: it simply does not respect the type information
provided by the ResultTypeQueryable[Row] interface and defaults to a
GenericType.

I think this should be fixed. I'll open a JIRA issue for that.

As a workaround, you can explicitly declare the type with an implicit if you
put the following lines above the lines in which you apply rowFn to the
DataSet:

import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.types.Row

implicit val rowTpe: TypeInformation[Row] = new RowTypeInfo(
  Array[TypeInformation[_]](BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO),
  Array("id", "value")
)

When you do this, you can also remove the ResultTypeQueryable interface from
the MapFunction.
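For reference, a sketch of how your program could look with the workaround applied (untested; the key point is that the implicit must be in scope before the map calls, so evenRows and oddRows pick up the RowTypeInfo instead of a GenericType):

```scala
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.scala._
import org.apache.flink.types.Row

object Main {

  // MakeRow no longer implements ResultTypeQueryable; the implicit
  // TypeInformation below supplies the row type to the Scala API instead.
  class MakeRow extends MapFunction[(Integer, Integer), Row] {
    override def map(tuple: (Integer, Integer)): Row = tuple match {
      case (value, id) => Row.of(id, value)
    }
  }

  def integerTuple(intTuple: (Int, Int)): (Integer, Integer) =
    intTuple match { case (a, b) => (a, b) }

  def main(args: Array[String]): Unit = {
    // Declared before the map calls so implicit resolution finds it and
    // the resulting DataSets carry a RowTypeInfo with named fields.
    implicit val rowTpe: TypeInformation[Row] = new RowTypeInfo(
      Array[TypeInformation[_]](BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO),
      Array("id", "value")
    )

    val env = ExecutionEnvironment.createLocalEnvironment()
    val rowFn = new MakeRow

    val ints = 0 until 1000
    val evenIntegers = (ints filter (_ % 2 == 0)).zipWithIndex.map(integerTuple)
    val oddIntegers = (ints filter (_ % 2 == 1)).zipWithIndex.map(integerTuple)

    val evenRows = env.fromCollection(evenIntegers).map(rowFn)
    val oddRows = env.fromCollection(oddIntegers).map(rowFn)

    // "id" now resolves against the named fields of the RowTypeInfo.
    evenRows.join(oddRows).where("id").equalTo("id").print()
  }
}
```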

Cheers, Fabian



2017-07-10 18:10 GMT+02:00 Joshua Griffith <jgriff...@campuslabs.com>:

> Thank you for your response Nico. Below is a simple case where I’m trying
> to join on Row fields:
>
> package com.github.hadronzoo.rowerror
>
> import org.apache.flink.api.common.functions.MapFunction
> import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
> import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, RowTypeInfo}
> import org.apache.flink.api.scala._
> import org.apache.flink.types.Row
>
> object Main {
>
>   class MakeRow extends MapFunction[(Integer, Integer), Row]
>       with ResultTypeQueryable[Row] {
>     override def map(tuple: (Integer, Integer)): Row = tuple match {
>       case (value, id) => Row.of(id, value)
>     }
>
>     override def getProducedType: TypeInformation[Row] =
>       new RowTypeInfo(
>         Array[TypeInformation[_]](BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO),
>         Array("id", "value")
>       )
>   }
>
>   def integerTuple(intTuple: (Int, Int)): (Integer, Integer) =
>     intTuple match { case (a, b) => (a, b) }
>
>   def main(args: Array[String]): Unit = {
>     val env = ExecutionEnvironment.createLocalEnvironment()
>     val rowFn = new MakeRow
>
>     val ints = 0 until 1000
>     val evenIntegers = (ints filter (_ % 2 == 0)).zipWithIndex.map(integerTuple)
>     val oddIntegers = (ints filter (_ % 2 == 1)).zipWithIndex.map(integerTuple)
>
>     val evenRows = env.fromCollection(evenIntegers).map(rowFn)
>     val oddRows = env.fromCollection(oddIntegers).map(rowFn)
>
>     evenRows.join(oddRows).where("id").equalTo("id").print()
>   }
> }
>
>
> Executing the above yields the following error:
>
> Exception in thread "main" org.apache.flink.api.common.InvalidProgramException:
> This type (GenericType<org.apache.flink.types.Row>) cannot be used as key.
> at org.apache.flink.api.common.operators.Keys$ExpressionKeys.<init>(Keys.java:330)
> at org.apache.flink.api.scala.UnfinishedKeyPairOperation.where(unfinishedKeyPairOperation.scala:72)
> at com.github.hadronzoo.rowerror.Main$.main(Main.scala:36)
> at com.github.hadronzoo.rowerror.Main.main(Main.scala)
>
>
> For my application I only have TypeInformation at runtime (before the
> execution graph is built). Is it possible to use Row fields in join
> operations or is there an error with my implementation?
>
> Thanks for your help,
>
> Joshua
>
> On Jul 10, 2017, at 9:09 AM, Nico Kruber <n...@data-artisans.com> wrote:
>
> Can you show a minimal example of the query you are trying to run?
> Maybe Timo or Fabian (cc'd) can help.
>
>
> Nico
>
> On Friday, 7 July 2017 23:09:09 CEST Joshua Griffith wrote:
>
> Hello,
>
> When using nested field expressions like "Account.Id" with nested rows, I
> get the following error: "This type (GenericType<org.apache.flink.types.Row>)
> cannot be used as key." Is there a way to make nested field expressions
> work with nested rows?
>
>
> Thanks,
>
> Joshua
