[ https://issues.apache.org/jira/browse/SPARK-10122?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Amit Ramesh updated SPARK-10122:
--------------------------------
    Description: 
SPARK-8389 added the offsetRanges interface to Kafka direct streams. However, this appears to break when further operations are chained after a transform. Below is example code that results in an error (stack trace follows). Note that if the count() operation is removed from the example, the error no longer occurs and the Kafka data is printed.

{code:title=kafka_test.py|collapse=true}
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

def attach_kafka_metadata(kafka_rdd):
    # Fails with AttributeError once count() is chained after the transform
    offset_ranges = kafka_rdd.offsetRanges()

    return kafka_rdd


if __name__ == "__main__":
    sc = SparkContext(appName='kafka-test')
    ssc = StreamingContext(sc, 10)

    # TOPIC and BROKERS are placeholders for an actual topic and broker list
    kafka_stream = KafkaUtils.createDirectStream(
        ssc,
        [TOPIC],
        kafkaParams={
            'metadata.broker.list': BROKERS,
        },
    )
    kafka_stream.transform(attach_kafka_metadata).count().pprint()

    ssc.start()
    ssc.awaitTermination()
{code}
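
For comparison, and per the note above, removing the chained count() makes the error disappear and the Kafka data is printed; the pipeline line would then read:

{code}
kafka_stream.transform(attach_kafka_metadata).pprint()
{code}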

{code:title=Stack trace|collapse=true}
Traceback (most recent call last):
  File "/home/spark/spark/python/lib/pyspark.zip/pyspark/streaming/util.py", line 62, in call
    r = self.func(t, *rdds)
  File "/home/spark/spark/python/lib/pyspark.zip/pyspark/streaming/dstream.py", line 616, in <lambda>
    self.func = lambda t, rdd: func(t, prev_func(t, rdd))
  File "/home/spark/spark/python/lib/pyspark.zip/pyspark/streaming/dstream.py", line 616, in <lambda>
    self.func = lambda t, rdd: func(t, prev_func(t, rdd))
  File "/home/spark/spark/python/lib/pyspark.zip/pyspark/streaming/dstream.py", line 616, in <lambda>
    self.func = lambda t, rdd: func(t, prev_func(t, rdd))
  File "/home/spark/spark/python/lib/pyspark.zip/pyspark/streaming/dstream.py", line 616, in <lambda>
    self.func = lambda t, rdd: func(t, prev_func(t, rdd))
  File "/home/spark/spark/python/lib/pyspark.zip/pyspark/streaming/kafka.py", line 332, in <lambda>
    func = lambda t, rdd: oldfunc(rdd)
  File "/home/spark/ad_realtime/batch/kafka_test.py", line 7, in attach_kafka_metadata
    offset_ranges = kafka_rdd.offsetRanges()
AttributeError: 'RDD' object has no attribute 'offsetRanges'
{code}
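
For reference, the Spark Streaming + Kafka integration guide obtains offsets by making transform() the first method called on the direct stream and then consuming the stored ranges inside foreachRDD(), instead of chaining further DStream operations. Below is a minimal sketch of that documented pattern; TOPIC and BROKERS remain placeholders as in the reproduction above, and whether this pattern sidesteps the failing composition path has not been verified here.

{code:title=offsets_workaround.py|collapse=true}
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

offset_ranges = []


def store_offset_ranges(rdd):
    # Runs as the first operation on the direct stream, while the RDD is
    # still a KafkaRDD and offsetRanges() is available
    global offset_ranges
    offset_ranges = rdd.offsetRanges()
    return rdd


def print_offset_ranges(rdd):
    for o in offset_ranges:
        print('%s %s %s %s' % (o.topic, o.partition, o.fromOffset, o.untilOffset))


if __name__ == "__main__":
    sc = SparkContext(appName='kafka-offsets-workaround')
    ssc = StreamingContext(sc, 10)

    # TOPIC and BROKERS are placeholders, as in kafka_test.py
    kafka_stream = KafkaUtils.createDirectStream(
        ssc,
        [TOPIC],
        kafkaParams={'metadata.broker.list': BROKERS},
    )
    kafka_stream.transform(store_offset_ranges).foreachRDD(print_offset_ranges)

    ssc.start()
    ssc.awaitTermination()
{code}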

  was: (identical to the description above, except that it cited SPARK-8337 instead of SPARK-8389)


> AttributeError: 'RDD' object has no attribute 'offsetRanges'
> ------------------------------------------------------------
>
>                 Key: SPARK-10122
>                 URL: https://issues.apache.org/jira/browse/SPARK-10122
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark, Streaming
>            Reporter: Amit Ramesh
>            Priority: Critical
>              Labels: kafka
>


