Hi,

I have been trying to get a case class with a generic parameter working with Flink 1.0.3 and have been having some trouble. When I compile, I get the following error:

    debug-type-bug/src/main/scala/com/example/flink/jobs/CaseClassWithGeneric.scala:40: error: could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[com.example.flink.jobs.CaseClassWithGeneric.TestGen[String]]
    [ERROR]       .apply(new AggregateOrigins)

I am importing org.apache.flink.api.scala._ and the generic type is defined as [T: TypeInformation] as suggested here:
https://ci.apache.org/projects/flink/flink-docs-master/internals/types_serialization.html

The full code for the program is as follows:

    package com.example.flink.jobs

    import java.util.{Properties}

    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema
    import org.apache.flink.util.Collector
    import org.apache.flink.streaming.api.scala.function.WindowFunction
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer08}
    import org.apache.flink.api.scala._

    object CaseClassWithGeneric {
      case class TestGen[T: TypeInformation](item: T) {}

      class AggregateOrigins extends WindowFunction[String, TestGen[String], String, TimeWindow] {
        def apply(key: String, win: TimeWindow, values: Iterable[String], col: Collector[TestGen[String]]): Unit = {
          values.foreach(x => { })
          col.collect(new TestGen[String]("Foo"))
        }
      }

      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val properties = new Properties();
        val messageStream = env.addSource(
          new FlinkKafkaConsumer08("topic", new SimpleStringSchema, properties))
          .keyBy(s => s)
          .timeWindow(Time.days(1))
          .apply(new AggregateOrigins)

        messageStream.print()
        env.execute("Simple Job")
      }
    }

When I dug into the apply() function definition I found the following:

    def apply[R: TypeInformation](
        function: WindowFunction[T, R, K, W]): DataStream[R] = {
      val cleanFunction = clean(function)
      val applyFunction = new ScalaWindowFunctionWrapper[T, R, K, W](cleanFunction)
      asScalaStream(javaStream.apply(applyFunction, implicitly[TypeInformation[R]]))
    }

As far as I can tell, TestGen[String] should correspond to [R: TypeInformation] in apply. Am I missing something, or is it not possible to define a case class with a generic parameter?

Thanks,
James Bucher