Hello - I have the following call to addSource, where I pass a custom SourceFunction:

    env.<Data>addSource(
        new CollectionSourceFunctionJ<Data>(data, TypeInformation.<Data>of(new TypeHint<Data>(){}))
    )

where data is a List<Data> and CollectionSourceFunctionJ is a Scala case class:

    case class CollectionSourceFunctionJ[T](data: java.util.List[T], ti: TypeInformation[T]) extends SourceFunction[T] {
      def cancel(): Unit = {}
      def run(ctx: SourceContext[T]): Unit = {
        data.asScala.foreach(d ⇒ ctx.collect(d))
      }
    }

When the following transformation runs:

    DataStream<Data> ins = readStream(in, Data.class, serdeData);
    DataStream<Simple> simples = ins.map((Data d) -> new Simple(d.getName()))
        .returns(new TypeHint<Simple>(){}.getTypeInfo());

I get the following exception in the second line:

    org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'Custom Source' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.

Initially the returns call was not there and I was getting the same exception. Now, after adding the returns call, nothing changes.

Any help will be appreciated.

regards.

--
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh
Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg
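
As background for the "type erasure" wording in the exception above, here is a minimal plain-Java sketch of the effect. It uses no Flink classes: ErasureSketch, ListSource and Hint are hypothetical names made up for illustration, with Hint only imitating the anonymous-subclass idea behind `new TypeHint<Data>(){}`. It shows that a generic instance exposes only its type variable at runtime, while an anonymous subclass records the concrete type argument in its generic superclass.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Arrays;
import java.util.List;

public class ErasureSketch {

    // Hypothetical stand-in for a generic source; not a Flink class.
    static class ListSource<T> {
        final List<T> data;
        ListSource(List<T> data) { this.data = data; }
    }

    // Hypothetical stand-in for the type-hint idea: an anonymous subclass
    // keeps its type argument in the generic superclass signature.
    static abstract class Hint<T> {
        Type captured() {
            ParameterizedType superType =
                (ParameterizedType) getClass().getGenericSuperclass();
            return superType.getActualTypeArguments()[0];
        }
    }

    // What reflection can see from a ListSource<String> instance:
    // only the declared type variable, not String.
    public static String erasedView() {
        ListSource<String> source = new ListSource<>(Arrays.asList("a", "b"));
        return source.getClass().getTypeParameters()[0].getName();
    }

    // What reflection can see from an anonymous Hint<String> subclass:
    // the concrete type argument.
    public static String hintedView() {
        Type captured = new Hint<String>() {}.captured();
        return captured.getTypeName();
    }

    public static void main(String[] args) {
        System.out.println(erasedView()); // prints "T"
        System.out.println(hintedView()); // prints "java.lang.String"
    }
}
```

The sketch only illustrates why a generic function instance cannot report its element type by itself; it does not reproduce how Flink extracts types or where the hint has to be supplied in the job above.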