Makes sense. Thanks, Holden.

Alexis

On Mon, Sep 12, 2016 at 5:28 PM, Holden Karau <hol...@pigscanfly.ca> wrote:

> Ah yes, so the Py4J conversions only apply in the driver program - your
> DStream, however, contains RDDs of pickled objects. If you want to work
> with a transform function, using Spark SQL to transfer DataFrames back
> and forth between Python and Scala can be much easier.
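>
> A rough sketch of what the Scala side of that could look like (the helper
> name here is hypothetical - the point is that a DataFrame crosses Py4J as
> a single JVM reference, so no per-record conversion is needed):
>
> import org.apache.spark.sql.DataFrame
>
> object MyDataFrameHelper {
>   // receives the JVM DataFrame backing the Python one, so values
>   // arrive as proper Rows rather than pickled bytes
>   def doSomething(df: DataFrame): Unit = {
>     df.foreach(row => println(row.getString(0)))
>   }
> }
>
> On the Python side you'd pass df._jdf through the gateway, which is the
> Py4J reference to that JVM DataFrame.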
>
>
> On Monday, September 12, 2016, Alexis Seigneurin <aseigneu...@ippon.fr>
> wrote:
>
>> Hi,
>>
>>
>> *TL;DR - I have what looks like a DStream of Strings in a PySpark
>> application. I want to send it as a DStream[String] to a Scala library.
>> Strings are not converted by Py4J, though.*
>>
>>
>> I'm working on a PySpark application that pulls data from Kafka using
>> Spark Streaming. My messages are strings and I would like to call a method
>> in Scala code, passing it a DStream[String] instance. However, I'm unable
>> to receive proper JVM strings in the Scala code. It looks to me like the
>> Python strings are not converted into Java strings but, instead, are
>> serialized.
>>
>> My question is: how do I get Java strings out of the DStream object?
>>
>>
>> Here is the simplest Python code I came up with:
>>
>> from pyspark.streaming import StreamingContext
>> ssc = StreamingContext(sparkContext=sc, batchDuration=1)
>>
>> from pyspark.streaming.kafka import KafkaUtils
>> # each element of the stream is a (key, value) tuple
>> stream = KafkaUtils.createDirectStream(ssc, ["IN"],
>>     {"metadata.broker.list": "localhost:9092"})
>> values = stream.map(lambda kv: kv[1])
>>
>> # _jdstream exposes the underlying JVM DStream through the Py4J gateway
>> ssc._jvm.com.seigneurin.MyPythonHelper.doSomething(values._jdstream)
>>
>> ssc.start()
>>
>>
>> I'm running this code in PySpark, passing it the path to my JAR:
>>
>> pyspark --driver-class-path ~/path/to/my/lib-0.1.1-SNAPSHOT.jar
>>
>>
>> On the Scala side, I have:
>>
>> package com.seigneurin
>>
>> import org.apache.spark.streaming.api.java.JavaDStream
>>
>> object MyPythonHelper {
>>   // note: the String type parameter is erased at runtime, so this
>>   // compiles and runs even if the elements are not actually Strings
>>   def doSomething(jdstream: JavaDStream[String]) = {
>>     val dstream = jdstream.dstream
>>     dstream.foreachRDD(rdd => {
>>       rdd.foreach(println)
>>     })
>>   }
>> }
>>
>>
>> Now, let's say I send some data into Kafka:
>>
>> echo 'foo bar' | $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list
>> localhost:9092 --topic IN
>>
>>
>> The println statement in the Scala code prints something that looks like:
>>
>> [B@758aa4d9
>>
>>
>> I expected to get foo bar instead.
>>
>> Now, if I replace the simple println statement in the Scala code with the
>> following:
>>
>> rdd.foreach(v => println(v.getClass.getCanonicalName))
>>
>>
>> I get:
>>
>> java.lang.ClassCastException: [B cannot be cast to java.lang.String
>>
>>
>> This suggests that the strings are actually passed as arrays of bytes.
>>
>> If I simply try to convert this array of bytes into a string (I know I'm
>> not even specifying the encoding):
>>
>> def doSomething(jdstream: JavaDStream[Array[Byte]]) = {
>>   val dstream = jdstream.dstream
>>   dstream.foreachRDD(rdd => {
>>     rdd.foreach(bytes => println(new String(bytes)))
>>   })
>> }
>>
>>
>> I get something that looks like this (special characters may have been
>> stripped):
>>
>> �]qXfoo barqa.
>>
>>
>> This suggests the Python string was serialized (pickled?). How could I
>> retrieve a proper Java string instead?
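>>
>> For what it's worth, the bytes above look like a pickled Python list
>> containing the string. Since Spark ships with Pyrolite
>> (net.razorvine.pickle), I'm wondering whether unpickling on the Scala
>> side would work - a sketch, assuming each element really is a pickled
>> batch (a Python list of records):
>>
>> import net.razorvine.pickle.Unpickler
>> import scala.collection.JavaConverters._
>>
>> def doSomething(jdstream: JavaDStream[Array[Byte]]) = {
>>   jdstream.dstream.foreachRDD(rdd => {
>>     rdd.foreach(bytes => {
>>       // Unpickler is not thread-safe, so create one per record
>>       val unpickled = new Unpickler().loads(bytes)
>>       // a pickled Python list arrives as a java.util.List
>>       unpickled.asInstanceOf[java.util.List[_]].asScala.foreach(println)
>>     })
>>   })
>> }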
>>
>>
>> Thanks,
>> Alexis
>>
>
>
> --
> Cell : 425-233-8271
> Twitter: https://twitter.com/holdenkarau
>
>


-- 

*Alexis Seigneurin*
*Managing Consultant*
(202) 459-1591 - LinkedIn
<http://www.linkedin.com/in/alexisseigneurin>

