[ https://issues.apache.org/jira/browse/SPARK-8337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14592354#comment-14592354 ]

Juan Rodríguez Hortalá commented on SPARK-8337:
-----------------------------------------------

Hi, 

I have made some additional experiments:

- I have replaced the dictionary with a named tuple; this is just an aesthetic 
detail. Regarding your comment, Amit: what you propose could also be a good 
option.

from collections import namedtuple

_MessageAndMetadata = namedtuple(
    "MessageAndMetadata", ["key", "value", "topic", "partition", "offset"])

so we get

-------------------------------------------
Time: 2015-06-18 20:38:46
-------------------------------------------
MessageAndMetadata(key=None, value=u'hola', topic=u'test', partition=0, offset=104L)
()
...
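
For what it's worth, here is a minimal sketch of how that conversion could look 
on the Python side, assuming the JVM side keeps shipping plain 
(key, value, topic, partition, offset) tuples; the stream usage in the trailing 
comments is hypothetical:

from collections import namedtuple

MessageAndMetadata = namedtuple(
    "MessageAndMetadata", ["key", "value", "topic", "partition", "offset"])

def to_message_and_metadata(record):
    # record is assumed to be the raw 5-tuple coming back from the JVM side
    key, value, topic, partition, offset = record
    return MessageAndMetadata(key, value, topic, partition, offset)

# hypothetical usage on the direct stream:
# stream = KafkaUtils.createDirectStream(ssc, topics, kafka_params)
# stream.map(to_message_and_metadata).pprint()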

- Regarding the message handler approach, I don't know much about py4j, but from 
http://py4j.sourceforge.net/advanced_topics.html#implementing-java-interfaces-from-python-callback
 I understand that the limited support py4j offers for calling Java interfaces 
implemented in Python cannot be used in this situation. That support would be 
needed to wrap a Python lambda into an 
org.apache.spark.api.java.function.Function with something like this:

class JFunction(object):
    def __init__(self, f):
        self._f = f

    def call(self, v):
        return self._f(v)

    class Java:
        implements = ['org.apache.spark.api.java.function.Function']
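
Purely for illustration (and not something that actually works here, for the 
reasons above), this is how such a wrapper would be used; extract_value and the 
mmd lambda are hypothetical names:

# hypothetical: wrap a Python lambda so the JVM could call back into Python
extract_value = JFunction(lambda mmd: mmd.message())
# in principle this object could then be handed to a JVM-side API expecting an
# org.apache.spark.api.java.function.Function (e.g. a messageHandler argument),
# with py4j proxying each call(...) back into the Python process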


- Another option is returning MessageAndMetadata objects directly, instead of 
encoding them as tuples and then converting those into named tuples. But that 
leads to "Unexpected element type class kafka.message.MessageAndMetadata" in 
PythonRDD:

15/06/18 20:45:50 INFO DAGScheduler: Job 9 failed: runJob at PythonRDD.scala:366, took 0,034251 s
Traceback (most recent call last):
  File "/home/juanrh/git/spark/python/pyspark/streaming/util.py", line 57, in call
    r = self.func(t, *rdds)
  File "/home/juanrh/git/spark/python/pyspark/streaming/dstream.py", line 171, in takeAndPrint
    taken = rdd.take(num + 1)
  File "/home/juanrh/git/spark/python/pyspark/rdd.py", line 1265, in take
    res = self.context.runJob(self, takeUpToNumLeft, p, True)
  File "/home/juanrh/git/spark/python/pyspark/context.py", line 891, in runJob
    allowLocal)
  File "/home/juanrh/git/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
    self.target_id, self.name)
  File "/home/juanrh/git/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
    format(target_id, '.', name), value)
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 9.0 failed 1 times, most recent failure: Lost task 0.0 in stage 9.0 (TID 9, localhost): org.apache.spark.SparkException: Unexpected element type class kafka.message.MessageAndMetadata
        at org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$write$1(PythonRDD.scala:422)
        at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:425)
        at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:425)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at org.apache.spark.util.NextIterator.foreach(NextIterator.scala:21)
        at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:425)
        at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:248)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1771)
        at org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:208)

I don't know the internals of pyspark, or why it supports so few data types 
when shipping records to Python. On the other hand, an approach based on 
HasOffsetRanges is complicated by the wrapper objects introduced when passing 
from Scala to Python, but maybe it would be possible to add an OffsetRange 
object to the __dict__ of each RDD. Again, as I don't know the design of 
pyspark and its serialization mechanism, I don't know whether that information 
would be erased along the way.
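
A rough sketch of that __dict__ idea, assuming the JVM-side RDD could somehow 
expose the offset ranges of each batch; the offsetRanges() call on the Java RDD 
below is made up and does not exist yet:

def attach_offset_ranges(rdd):
    # hypothetical: ask the underlying JVM RDD for its Kafka offset ranges and
    # stash them as an attribute on the Python RDD (i.e. in its __dict__) so
    # that user code can read them later
    rdd.offsetRanges = rdd._jrdd.offsetRanges()  # offsetRanges() is hypothetical
    return rdd

# hypothetical usage on the direct stream:
# stream.transform(attach_offset_ranges).foreachRDD(process_with_offsets)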

This is as far as I can go with my limited knowledge of pyspark. So maybe, as 
you suggest Cody, it would be better if someone who knows more about the 
internals of pyspark took the baton from here. 


> KafkaUtils.createDirectStream for python is lacking API/feature parity with 
> the Scala/Java version
> --------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-8337
>                 URL: https://issues.apache.org/jira/browse/SPARK-8337
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 1.4.0
>            Reporter: Amit Ramesh
>            Priority: Critical
>
> See the following thread for context.
> http://apache-spark-developers-list.1001551.n3.nabble.com/Re-Spark-1-4-Python-API-for-getting-Kafka-offsets-in-direct-mode-tt12714.html


