Ah yes, so the Py4J conversions only apply in the driver program. Your DStream, however, is made up of RDDs of pickled objects, which is why the Scala side receives byte arrays rather than strings. If you want to do this with a transform function, using Spark SQL and transferring DataFrames back and forth between Python and Scala Spark can be much easier.
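To illustrate the point about pickled objects, here is a minimal sketch in plain Python (no Spark needed). It assumes that PySpark serializes records in batches, one pickled list per byte array, using pickle protocol 2; that is an assumption for illustration, not something confirmed in this thread, and the variable names are hypothetical. The printable part of the payload should resemble the output seen on the Scala side.

```python
import pickle

# Assumption: records are pickled in batches (a list per byte array)
# with pickle protocol 2; the real PySpark serializer settings may differ.
batch = [u"foo bar"]
payload = pickle.dumps(batch, protocol=2)

# Keep only printable ASCII, roughly what a terminal would display
# after dropping control characters.
printable = "".join(chr(b) for b in bytearray(payload) if 32 <= b < 127)

print(repr(payload))  # raw pickled bytes, as a JVM would receive them
print(printable)      # the readable fragment, including "foo bar"

# Only a Python-side unpickle recovers the original strings.
assert pickle.loads(payload) == batch
```

Calling new String(bytes) on the JVM side decodes these raw pickle bytes as text, so it cannot yield a clean string no matter which encoding is chosen.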
On Monday, September 12, 2016, Alexis Seigneurin <aseigneu...@ippon.fr> wrote:

> Hi,
>
> *TL;DR - I have what looks like a DStream of Strings in a PySpark
> application. I want to send it as a DStream[String] to a Scala library.
> Strings are not converted by Py4j, though.*
>
> I'm working on a PySpark application that pulls data from Kafka using
> Spark Streaming. My messages are strings and I would like to call a method
> in Scala code, passing it a DStream[String] instance. However, I'm unable
> to receive proper JVM strings in the Scala code. It looks to me like the
> Python strings are not converted into Java strings but, instead, are
> serialized.
>
> My question would be: how to get Java strings out of the DStream object?
>
> Here is the simplest Python code I came up with:
>
>     from pyspark.streaming import StreamingContext
>     ssc = StreamingContext(sparkContext=sc, batchDuration=int(1))
>
>     from pyspark.streaming.kafka import KafkaUtils
>     stream = KafkaUtils.createDirectStream(ssc, ["IN"],
>         {"metadata.broker.list": "localhost:9092"})
>     values = stream.map(lambda tuple: tuple[1])
>
>     ssc._jvm.com.seigneurin.MyPythonHelper.doSomething(values._jdstream)
>
>     ssc.start()
>
> I'm running this code in PySpark, passing it the path to my JAR:
>
>     pyspark --driver-class-path ~/path/to/my/lib-0.1.1-SNAPSHOT.jar
>
> On the Scala side, I have:
>
>     package com.seigneurin
>
>     import org.apache.spark.streaming.api.java.JavaDStream
>
>     object MyPythonHelper {
>       def doSomething(jdstream: JavaDStream[String]) = {
>         val dstream = jdstream.dstream
>         dstream.foreachRDD(rdd => {
>           rdd.foreach(println)
>         })
>       }
>     }
>
> Now, let's say I send some data into Kafka:
>
>     echo 'foo bar' | $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic IN
>
> The println statement in the Scala code prints something that looks like:
>
>     [B@758aa4d9
>
> I expected to get foo bar instead.
>
> Now, if I replace the simple println statement in the Scala code with the
> following:
>
>     rdd.foreach(v => println(v.getClass.getCanonicalName))
>
> I get:
>
>     java.lang.ClassCastException: [B cannot be cast to java.lang.String
>
> This suggests that the strings are actually passed as arrays of bytes.
>
> If I simply try to convert this array of bytes into a string (I know I'm
> not even specifying the encoding):
>
>     def doSomething(jdstream: JavaDStream[Array[Byte]]) = {
>       val dstream = jdstream.dstream
>       dstream.foreachRDD(rdd => {
>         rdd.foreach(bytes => println(new String(bytes)))
>       })
>     }
>
> I get something that looks like (special characters might be stripped off):
>
>     �]qXfoo barqa.
>
> This suggests the Python string was serialized (pickled?). How could I
> retrieve a proper Java string instead?
>
> Thanks,
> Alexis

--
Cell : 425-233-8271
Twitter: https://twitter.com/holdenkarau