Hi,

I have been trying to get a case class with a generic parameter working with 
Filnk 1.0.3 and have been having some trouble. However when I compile I get the 
following error:
debug-type-bug/src/main/scala/com/example/flink/jobs/CaseClassWithGeneric.scala:40:
 error: could not find implicit value for evidence parameter of type 
org.apache.flink.api.common.typeinfo.TypeInformation[com.example.flink.jobs.CaseClassWithGeneric.TestGen[String]]
[ERROR]           .apply(new AggregateOrigins)

I am importing org.apache.flink.api.scala._ and the generic type is defined as 
[T: TypeInformation] as suggested here: 
https://ci.apache.org/projects/flink/flink-docs-master/internals/types_serialization.html

The full code for the program is as follows:

package com.example.flink.jobs

import java.util.{Properties}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.flink.util.Collector
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer08}
import org.apache.flink.api.scala._

object CaseClassWithGeneric {
  case class TestGen[T: TypeInformation](item: T) {}

  class AggregateOrigins extends WindowFunction[String, TestGen[String], 
String, TimeWindow] {
    def apply(key: String, win: TimeWindow, values: Iterable[String], col: 
Collector[TestGen[String]]): Unit = {
      values.foreach(x => { })
      col.collect(new TestGen[String]("Foo"))
    }
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val properties = new Properties();
    val messageStream = env.addSource(
      new FlinkKafkaConsumer08("topic", new SimpleStringSchema, properties))
          .keyBy(s => s)
          .timeWindow(Time.days(1))
          .apply(new AggregateOrigins)
    messageStream.print()
    env.execute("Simple Job")
  }
}

When I dug into the apply() function definition I found the following:

def apply[R: TypeInformation](
    function: WindowFunction[T, R, K, W]): DataStream[R] = {

  val cleanFunction = clean(function)
  val applyFunction = new ScalaWindowFunctionWrapper[T, R, K, W](cleanFunction)
  asScalaStream(javaStream.apply(applyFunction, implicitly[TypeInformation[R]]))
}

As Far as I can tell TestGen[String] should correspond to [R: TypeInformation] 
in apply. Am I missing something or is it not possible to define case class 
with a generic parameter?

Thanks,

James Bucher

Reply via email to