Makes sense. Thanks, Holden.

Alexis
On Mon, Sep 12, 2016 at 5:28 PM, Holden Karau <hol...@pigscanfly.ca> wrote:

> Ah yes, the Py4J conversions only apply in the driver program - your
> DStream, however, is RDDs of pickled objects. If you want to use a
> transform function, it can be much easier to use Spark SQL and transfer
> DataFrames back and forth between Python and Scala Spark.
>
> On Monday, September 12, 2016, Alexis Seigneurin <aseigneu...@ippon.fr>
> wrote:
>
>> Hi,
>>
>> *TL;DR - I have what looks like a DStream of Strings in a PySpark
>> application. I want to send it as a DStream[String] to a Scala library.
>> Strings are not converted by Py4J, though.*
>>
>> I'm working on a PySpark application that pulls data from Kafka using
>> Spark Streaming. My messages are strings, and I would like to call a
>> method in Scala code, passing it a DStream[String] instance. However, I'm
>> unable to receive proper JVM strings in the Scala code. It looks to me
>> like the Python strings are not converted into Java strings but are
>> serialized instead.
>>
>> My question is: how do I get Java strings out of the DStream object?
>>
>> Here is the simplest Python code I came up with:
>>
>> from pyspark.streaming import StreamingContext
>> ssc = StreamingContext(sparkContext=sc, batchDuration=int(1))
>>
>> from pyspark.streaming.kafka import KafkaUtils
>> stream = KafkaUtils.createDirectStream(ssc, ["IN"],
>>     {"metadata.broker.list": "localhost:9092"})
>> values = stream.map(lambda tuple: tuple[1])
>>
>> ssc._jvm.com.seigneurin.MyPythonHelper.doSomething(values._jdstream)
>>
>> ssc.start()
>>
>> I'm running this code in PySpark, passing it the path to my JAR:
>>
>> pyspark --driver-class-path ~/path/to/my/lib-0.1.1-SNAPSHOT.jar
>>
>> On the Scala side, I have:
>>
>> package com.seigneurin
>>
>> import org.apache.spark.streaming.api.java.JavaDStream
>>
>> object MyPythonHelper {
>>   def doSomething(jdstream: JavaDStream[String]) = {
>>     val dstream = jdstream.dstream
>>     dstream.foreachRDD(rdd => {
>>       rdd.foreach(println)
>>     })
>>   }
>> }
>>
>> Now, let's say I send some data into Kafka:
>>
>> echo 'foo bar' | $KAFKA_HOME/bin/kafka-console-producer.sh \
>>     --broker-list localhost:9092 --topic IN
>>
>> The println statement in the Scala code prints something that looks like:
>>
>> [B@758aa4d9
>>
>> I expected to get foo bar instead.
>>
>> Now, if I replace the simple println statement in the Scala code with
>> the following:
>>
>> rdd.foreach(v => println(v.getClass.getCanonicalName))
>>
>> I get:
>>
>> java.lang.ClassCastException: [B cannot be cast to java.lang.String
>>
>> This suggests that the strings are actually passed as arrays of bytes.
>>
>> If I simply try to convert this array of bytes into a string (I know I'm
>> not even specifying the encoding):
>>
>> def doSomething(jdstream: JavaDStream[Array[Byte]]) = {
>>   val dstream = jdstream.dstream
>>   dstream.foreachRDD(rdd => {
>>     rdd.foreach(bytes => println(new String(bytes)))
>>   })
>> }
>>
>> I get something that looks like this (special characters might be
>> stripped off):
>>
>> �]qXfoo barqa.
>>
>> This suggests the Python string was serialized (pickled?). How can I
>> retrieve a proper Java string instead?
>>
>> Thanks,
>> Alexis
>
> --
> Cell: 425-233-8271
> Twitter: https://twitter.com/holdenkarau

--
Alexis Seigneurin
Managing Consultant, Ippon USA
(202) 459-1591
LinkedIn: http://www.linkedin.com/in/alexisseigneurin
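[A note on the garbled output discussed above: PySpark moves records between the Python workers and the JVM as pickled payloads, and it batches elements, so what reaches the Scala side is a pickled Python list rather than a UTF-8 string. The observed bytes can be reproduced with nothing but CPython's standard pickle module - a minimal sketch, assuming pickle protocol 2, which is what produces this particular framing:]

```python
import pickle

# PySpark serializes batches of elements, so a single string "foo bar"
# travels as a pickled one-element list, not as raw UTF-8 text.
payload = pickle.dumps(["foo bar"], protocol=2)

# The pickle framing bytes render as junk when decoded naively - the
# printable characters spell out the "]qXfoo barqa." pattern seen in
# the Scala output above.
print(payload)

# Round-tripping through pickle recovers the original list.
print(pickle.loads(payload))
```

[So the fix is either to unpickle on the JVM side, or - per Holden's suggestion - to hand the data over as a DataFrame via Spark SQL, which converts types for you.]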