Hi, I'm following the socketStream[T] function implementation from the official Spark GitHub repo:
    def socketStream[T](
        hostname: String,
        port: Int,
        converter: JFunction[InputStream, java.lang.Iterable[T]],
        storageLevel: StorageLevel
      ): JavaReceiverInputDStream[T] = {
      def fn: (InputStream) => Iterator[T] = (x: InputStream) => converter.call(x).iterator().asScala
      implicit val cmt: ClassTag[T] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
      ssc.socketStream(hostname, port, fn, storageLevel)
    }

I'm implementing a custom receiver that works great when used from Scala. Now I'm trying to use it from Java, and the createStream in MyReceiverUtils.scala is the following:

    def createStream[T](
        jssc: JavaStreamingContext,
        host: String,
        port: Int,
        address: String,
        messageConverter: Function[Message, Option[T]],
        storageLevel: StorageLevel
      ): JavaReceiverInputDStream[T] = {
      def fn: (Message) => Option[T] = (x: Message) => messageConverter.call(x)
      implicit val cmt: ClassTag[T] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
      new MyInputDStream(jssc.ssc, host, port, address, fn, storageLevel)
    }

When I try to use it I receive:

    org.apache.spark.SparkException: Job aborted due to stage failure: Failed to serialize task 465, not attempting to retry it. Exception during serialization: java.io.NotSerializableException: org.apache.spark.streaming.amqp.JavaMyReceiverStreamSuite

If I change the fn definition to something simpler, such as (x: Message) => None, the error goes away. Why is the call on messageConverter producing this problem? (A sketch of my Java call site is in the P.S. below.)

Thanks,
Paolo
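P.S. For context, here is a minimal sketch of my Java call site. The converter body, the String element type, and the surrounding variable names (jssc, host, port, address) are illustrative only; I'm also assuming Function here means org.apache.spark.api.java.function.Function and that Message is the AMQP message type from Qpid Proton:

    import org.apache.qpid.proton.message.Message;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.storage.StorageLevel;
    import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
    import scala.Option;

    // Inside the JavaMyReceiverStreamSuite test: the converter is defined as an
    // anonymous inner class, so it carries a hidden reference to the enclosing
    // suite instance.
    Function<Message, Option<String>> messageConverter =
        new Function<Message, Option<String>>() {
            @Override
            public Option<String> call(Message message) {
                // Illustrative conversion only
                return Option.apply(message.toString());
            }
        };

    JavaReceiverInputDStream<String> stream =
        MyReceiverUtils.createStream(jssc, host, port, address,
            messageConverter, StorageLevel.MEMORY_ONLY());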