Indeed that worked. Thanks!

> On Jul 10, 2017, at 11:57 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>
> Hi,
>
> You have to add the implicit value in the main() method before you call
> .map(rowFn) and not in the MapFunction.
>
> Best, Fabian
>
>
> 2017-07-10 18:54 GMT+02:00 Joshua Griffith <jgriff...@campuslabs.com>:
> Hello Fabian,
>
> Thank you for your response. I tried your recommendation but I’m getting the
> same issue. Here’s the altered MakeRow MapFunction I tried:
>
> class MakeRow extends MapFunction[(Integer, Integer), Row] {
>   implicit val rowType: TypeInformation[Row] = new RowTypeInfo(
>     Array[TypeInformation[_]](BasicTypeInfo.INT_TYPE_INFO,
>       BasicTypeInfo.INT_TYPE_INFO),
>     Array("id", "value")
>   )
>   override def map(tuple: (Integer, Integer)): Row = tuple match {
>     case (value, id) => Row.of(id, value)
>   }
> }
>
> In stepping through the code execution, it looks like the problem is that
> Row.isKeyType() returns false
> <https://github.com/apache/flink/blob/release-1.3.1-rc2/flink-core/src/main/java/org/apache/flink/api/common/operators/Keys.java#L98-L100>.
> Any recommendations?
>
> Thanks,
>
> Joshua
>
>
>> On Jul 10, 2017, at 11:42 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>> Hi Joshua,
>>
>> thanks for reporting this issue. Your code is fine but IMO there is a bug in
>> the Scala DataSet API.
>> It simply does not respect the type information provided by the
>> ResultTypeQueryable[Row] interface and defaults to a GenericType.
>>
>> I think this should be fixed. I'll open a JIRA issue for that.
>>
>> You can explicitly declare types with implicits if you put the following
>> lines above the lines in which you apply the rowFn on the DataSet.
>>
>> implicit val rowTpe: TypeInformation[Row] = new RowTypeInfo(
>>   Array[TypeInformation[_]](BasicTypeInfo.INT_TYPE_INFO,
>>     BasicTypeInfo.INT_TYPE_INFO),
>>   Array("id", "value")
>> )
>>
>> When you do this, you can also remove the ResultTypeQueryable interface
>> from the MapFunction.
>>
>> Cheers, Fabian
>>
>>
>>
>> 2017-07-10 18:10 GMT+02:00 Joshua Griffith <jgriff...@campuslabs.com>:
>> Thank you for your response Nico. Below is a simple case where I’m trying to
>> join on Row fields:
>>
>> package com.github.hadronzoo.rowerror
>>
>> import org.apache.flink.api.common.functions.MapFunction
>> import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
>> import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, RowTypeInfo}
>> import org.apache.flink.api.scala._
>> import org.apache.flink.types.Row
>>
>> object Main {
>>
>>   class MakeRow extends MapFunction[(Integer, Integer), Row] with
>>     ResultTypeQueryable[Row] {
>>     override def map(tuple: (Integer, Integer)): Row = tuple match {
>>       case (value, id) => Row.of(id, value)
>>     }
>>
>>     override def getProducedType: TypeInformation[Row] =
>>       new RowTypeInfo(
>>         Array[TypeInformation[_]](BasicTypeInfo.INT_TYPE_INFO,
>>           BasicTypeInfo.INT_TYPE_INFO),
>>         Array("id", "value")
>>       )
>>   }
>>
>>   def integerTuple(intTuple: (Int, Int)): (Integer, Integer) = intTuple
>>     match { case (a, b) => (a, b) }
>>
>>   def main(args: Array[String]): Unit = {
>>     val env = ExecutionEnvironment.createLocalEnvironment()
>>     val rowFn = new MakeRow
>>
>>     val ints = 0 until 1000
>>     val evenIntegers = (ints filter (_ % 2 ==
>>       0)).zipWithIndex.map(integerTuple)
>>     val oddIntegers = (ints filter (_ % 2 ==
>>       1)).zipWithIndex.map(integerTuple)
>>
>>     val evenRows = env.fromCollection(evenIntegers).map(rowFn)
>>     val oddRows = env.fromCollection(oddIntegers).map(rowFn)
>>
>>     evenRows.join(oddRows).where("id").equalTo("id").print()
>>   }
>> }
>>
>> Executing the above yields the following error:
>>
>> Exception in thread "main" org.apache.flink.api.common.InvalidProgramException:
>> This type (GenericType<org.apache.flink.types.Row>) cannot be used as key.
>>     at org.apache.flink.api.common.operators.Keys$ExpressionKeys.<init>(Keys.java:330)
>>     at org.apache.flink.api.scala.UnfinishedKeyPairOperation.where(unfinishedKeyPairOperation.scala:72)
>>     at com.github.hadronzoo.rowerror.Main$.main(Main.scala:36)
>>     at com.github.hadronzoo.rowerror.Main.main(Main.scala)
>>
>> For my application I only have TypeInformation at runtime (before the
>> execution graph is built).
>> Is it possible to use Row fields in join
>> operations or is there an error with my implementation?
>>
>> Thanks for your help,
>>
>> Joshua
>>
>>> On Jul 10, 2017, at 9:09 AM, Nico Kruber <n...@data-artisans.com> wrote:
>>>
>>> Can you show a minimal example of the query you are trying to run?
>>> Maybe Timo or Fabian (cc'd) can help.
>>>
>>>
>>> Nico
>>>
>>> On Friday, 7 July 2017 23:09:09 CEST Joshua Griffith wrote:
>>>> Hello,
>>>>
>>>> When using nested field expressions like “Account.Id” with nested rows, I
>>>> get the following error, “This type
>>>> (GenericType<org.apache.flink.types.Row>) cannot be used as key.” Is there
>>>> a way to make nested field expressions work with nested rows?
>>>>
>>>> Thanks,
>>>>
>>>> Joshua
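
For anyone landing on this thread later, here is a sketch of how the example program might look with the workaround applied as described above: the implicit TypeInformation[Row] is declared in main() before the first .map(rowFn) call, and ResultTypeQueryable is dropped from the MapFunction. It reuses the names from the messages in this thread and assumes the Flink 1.3 Scala DataSet API; it is an illustration of the advice and has not been re-run as part of this write-up.

```scala
package com.github.hadronzoo.rowerror

import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.scala._
import org.apache.flink.types.Row

object Main {

  // ResultTypeQueryable is omitted here: per the thread, the Scala DataSet
  // API did not pick it up, so the type is supplied via an implicit instead.
  class MakeRow extends MapFunction[(Integer, Integer), Row] {
    override def map(tuple: (Integer, Integer)): Row = tuple match {
      case (value, id) => Row.of(id, value)
    }
  }

  // Boxes a Scala (Int, Int) into (Integer, Integer) so values fit into a Row.
  def integerTuple(intTuple: (Int, Int)): (Integer, Integer) = intTuple match {
    case (a, b) => (a, b)
  }

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.createLocalEnvironment()
    val rowFn = new MakeRow

    // Declared in main(), above the .map(rowFn) calls, so that map() resolves
    // this RowTypeInfo rather than falling back to GenericType<Row>.
    implicit val rowType: TypeInformation[Row] = new RowTypeInfo(
      Array[TypeInformation[_]](BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO),
      Array("id", "value")
    )

    val ints = 0 until 1000
    val evenIntegers = ints.filter(_ % 2 == 0).zipWithIndex.map(integerTuple)
    val oddIntegers = ints.filter(_ % 2 == 1).zipWithIndex.map(integerTuple)

    val evenRows = env.fromCollection(evenIntegers).map(rowFn)
    val oddRows = env.fromCollection(oddIntegers).map(rowFn)

    // With named Row fields available, "id" can be used as a join key.
    evenRows.join(oddRows).where("id").equalTo("id").print()
  }
}
```

The only changes from the original program are the position of the implicit and the removal of getProducedType. Declaring the implicit inside the MakeRow class, as in the second attempt quoted above, does not help because the implicit has to be in scope at the call site of map().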