Ah yes, the Py4J conversions only apply in the driver program - your
DStream, however, is made up of RDDs of pickled objects. If you want to
work with the data in a Scala transform function, using Spark SQL and
transferring DataFrames back and forth between Python and Scala can be
much easier.
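
Something along these lines could work (a rough, untested sketch -
doSomethingWithDF is a made-up name for a Scala method that would take a
DataFrame / Dataset[Row] instead of a DStream):

from pyspark.sql import SQLContext, Row

sql_context = SQLContext(sc)

def send_to_scala(rdd):
    if not rdd.isEmpty():
        # Each micro-batch becomes a DataFrame of real JVM rows,
        # so nothing stays pickled on the JVM side.
        df = sql_context.createDataFrame(rdd.map(lambda v: Row(value=v)))
        # df._jdf is the Java-side DataFrame; Py4J can hand it to Scala as-is.
        sc._jvm.com.seigneurin.MyPythonHelper.doSomethingWithDF(df._jdf)

values.foreachRDD(send_to_scala)

The Scala helper then just reads the strings out of the "value" column
rather than dealing with pickled bytes.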

On Monday, September 12, 2016, Alexis Seigneurin <aseigneu...@ippon.fr>
wrote:

> Hi,
>
>
> *TL;DR - I have what looks like a DStream of Strings in a PySpark
> application. I want to send it as a DStream[String] to a Scala library.
> Strings are not converted by Py4j, though.*
>
>
> I'm working on a PySpark application that pulls data from Kafka using
> Spark Streaming. My messages are strings and I would like to call a method
> in Scala code, passing it a DStream[String] instance. However, I'm unable
> to receive proper JVM strings in the Scala code. It looks to me like the
> Python strings are not converted into Java strings but, instead, are
> serialized.
>
> My question would be: how to get Java strings out of the DStream object?
>
>
> Here is the simplest Python code I came up with:
>
> from pyspark.streaming import StreamingContext
> ssc = StreamingContext(sparkContext=sc, batchDuration=int(1))
>
> from pyspark.streaming.kafka import KafkaUtils
> stream = KafkaUtils.createDirectStream(ssc, ["IN"],
> {"metadata.broker.list": "localhost:9092"})
> values = stream.map(lambda tuple: tuple[1])
>
> ssc._jvm.com.seigneurin.MyPythonHelper.doSomething(values._jdstream)
>
> ssc.start()
>
>
> I'm running this code in PySpark, passing it the path to my JAR:
>
> pyspark --driver-class-path ~/path/to/my/lib-0.1.1-SNAPSHOT.jar
>
>
> On the Scala side, I have:
>
> package com.seigneurin
>
> import org.apache.spark.streaming.api.java.JavaDStream
>
> object MyPythonHelper {
>   def doSomething(jdstream: JavaDStream[String]) = {
>     val dstream = jdstream.dstream
>     dstream.foreachRDD(rdd => {
>       rdd.foreach(println)
>     })
>   }
> }
>
>
> Now, let's say I send some data into Kafka:
>
> echo 'foo bar' | $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list
> localhost:9092 --topic IN
>
>
> The println statement in the Scala code prints something that looks like:
>
> [B@758aa4d9
>
>
> I expected to get foo bar instead.
>
> Now, if I replace the simple println statement in the Scala code with the
> following:
>
> rdd.foreach(v => println(v.getClass.getCanonicalName))
>
>
> I get:
>
> java.lang.ClassCastException: [B cannot be cast to java.lang.String
>
>
> This suggests that the strings are actually passed as arrays of bytes.
>
> If I simply try to convert this array of bytes into a string (I know I'm
> not even specifying the encoding):
>
>       def doSomething(jdstream: JavaDStream[Array[Byte]]) = {
>         val dstream = jdstream.dstream
>         dstream.foreachRDD(rdd => {
>           rdd.foreach(bytes => println(new String(bytes)))
>         })
>       }
>
>
> I get something that looks like (special characters might be stripped off):
>
> �]qXfoo barqa.
>
>
> This suggests the Python string was serialized (pickled?). How could I
> retrieve a proper Java string instead?
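>
> As a quick sanity check, pickling a one-element list locally seems to
> reproduce these bytes (assuming PySpark batches elements into lists and
> pickles them with protocol 2):
>
> import pickle
> print(pickle.dumps([u"foo bar"], protocol=2))
> # \x80\x02]q\x00X\x07\x00\x00\x00foo barq\x01a.
> # i.e. "]qXfoo barqa." once the non-printable bytes are stripped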
>
>
> Thanks,
> Alexis
>


-- 
Cell : 425-233-8271
Twitter: https://twitter.com/holdenkarau
