Hi,

*TL;DR - I have what looks like a DStream of Strings in a PySpark
application. I want to pass it as a DStream[String] to a Scala library,
but the strings are not converted by Py4J.*


I'm working on a PySpark application that pulls data from Kafka using Spark
Streaming. My messages are strings, and I would like to call a method in
Scala code, passing it a DStream[String] instance. However, I can't get
proper JVM strings on the Scala side. It looks to me as if the Python
strings are serialized instead of being converted into Java strings.

My question is: how can I get Java strings out of the DStream object?


Here is the simplest Python code I came up with:

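from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

# sc is the SparkContext created by the pyspark shell
ssc = StreamingContext(sparkContext=sc, batchDuration=1)

# Direct Kafka stream of (key, value) pairs read from topic "IN"
stream = KafkaUtils.createDirectStream(ssc, ["IN"],
                                       {"metadata.broker.list": "localhost:9092"})
# Keep only the message values
values = stream.map(lambda kv: kv[1])

# Hand the underlying Java DStream to the Scala helper
ssc._jvm.com.seigneurin.MyPythonHelper.doSomething(values._jdstream)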

ssc.start()


I'm running this code in PySpark, passing it the path to my JAR:

pyspark --driver-class-path ~/path/to/my/lib-0.1.1-SNAPSHOT.jar


On the Scala side, I have:

package com.seigneurin

import org.apache.spark.streaming.api.java.JavaDStream

object MyPythonHelper {
  def doSomething(jdstream: JavaDStream[String]) = {
    val dstream = jdstream.dstream
    dstream.foreachRDD(rdd => {
      rdd.foreach(println)
    })
  }
}


Now, let's say I send some data into Kafka:

echo 'foo bar' | $KAFKA_HOME/bin/kafka-console-producer.sh \
    --broker-list localhost:9092 --topic IN


The println statement in the Scala code prints something that looks like:

[B@758aa4d9


I expected to get foo bar instead.

Now, if I replace the simple println statement in the Scala code with the
following:

rdd.foreach(v => println(v.getClass.getCanonicalName))


I get:

java.lang.ClassCastException: [B cannot be cast to java.lang.String


Since [B is the JVM's name for byte[], this suggests that the elements are
actually passed as arrays of bytes.

If I change the signature to take byte arrays and simply convert each one
into a string (I know I'm not even specifying the encoding):

      def doSomething(jdstream: JavaDStream[Array[Byte]]) = {
        val dstream = jdstream.dstream
        dstream.foreachRDD(rdd => {
          rdd.foreach(bytes => println(new String(bytes)))
        })
      }


I get something like this (some non-printable characters may have been
stripped):

�]qXfoo barqa.
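For what it's worth, this output can be reproduced outside Spark. The sketch
below is plain Python (no Spark needed) and rests on my own assumption, not
something I verified in the PySpark source: that each JVM-side byte array is a
pickle (protocol 2) dump of a Python list of records. It pickles ['foo bar'],
decodes the bytes roughly the way new String(bytes) does with a UTF-8 default
charset (invalid bytes become U+FFFD), and drops non-printable characters.

```python
import pickle

# Assumption: one JVM-side byte[] is a pickled *list* of records,
# here holding the single Kafka message "foo bar".
payload = pickle.dumps(["foo bar"], protocol=2)

# Rough equivalent of Scala's `new String(bytes)` with a UTF-8 default
# charset: the malformed leading byte 0x80 is replaced with U+FFFD.
as_text = payload.decode("utf-8", errors="replace")

# Mimic the console/mail client dropping control characters (\x00, \x02, ...).
visible = "".join(ch for ch in as_text if ch.isprintable())

print(repr(payload))
print(visible)

# Unpickling the same bytes gives back the original Python list.
print(pickle.loads(payload))
```

If the assumption holds, the visible text matches the line printed by the
Scala code, which would mean the JVM receives pickled lists rather than strings.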


This suggests the Python string was serialized (pickled?). How could I
retrieve a proper Java string instead?
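One more detail: in pickle's opcodes, ] starts an empty list and a appends to
it, so the output above hints that records are wrapped in lists before
pickling. If PySpark groups several records per batch (again an assumption on
my part; its real serializer picks batch sizes itself), one JVM element could
hold several messages. The hypothetical helper pickle_in_batches below, with an
arbitrary batch size of 2, illustrates what that would look like.

```python
import pickle
from typing import Iterable, List


def pickle_in_batches(records: Iterable[str], batch_size: int) -> List[bytes]:
    """Hypothetical helper: group records into lists and pickle each list."""
    batches: List[bytes] = []
    current: List[str] = []
    for record in records:
        current.append(record)
        if len(current) == batch_size:
            batches.append(pickle.dumps(current, protocol=2))
            current = []
    if current:  # flush the last, possibly smaller, batch
        batches.append(pickle.dumps(current, protocol=2))
    return batches


# Three Kafka messages would become two byte arrays on the JVM side.
blobs = pickle_in_batches(["foo bar", "baz", "qux"], batch_size=2)
print(len(blobs))

# Getting strings back requires unpickling, which yields lists of records.
print([pickle.loads(b) for b in blobs])
```

If that is what happens, even a correct decoding on the Scala side would give
lists of strings per element, not one string per Kafka message.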


Thanks,
Alexis