[ https://issues.apache.org/jira/browse/SPARK-10122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14704674#comment-14704674 ]
Amit Ramesh edited comment on SPARK-10122 at 8/20/15 10:51 AM:
---------------------------------------------------------------

[~srowen] as you can see in the example, offsetRanges() is being applied to the initial RDD as part of the transform operation. And the code works fine if the line 'kafka_stream.transform(attach_kafka_metadata).count().pprint()' is changed to 'kafka_stream.transform(attach_kafka_metadata).pprint()'.


was (Author: aramesh):
[~srowen] as you can see in the example, offsetRanges() is being applied to the initial RDD as part of the transform operation. And the code works file if the line 'kafka_stream.transform(attach_kafka_metadata).count().pprint()' is changed to 'kafka_stream.transform(attach_kafka_metadata).pprint()'.

> AttributeError: 'RDD' object has no attribute 'offsetRanges'
> ------------------------------------------------------------
>
>                 Key: SPARK-10122
>                 URL: https://issues.apache.org/jira/browse/SPARK-10122
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark, Streaming
>            Reporter: Amit Ramesh
>            Priority: Critical
>              Labels: kafka
>
> SPARK-8389 added the offsetRanges interface to Kafka direct streams. This
> however appears to break when chaining operations after a transform
> operation. Following is example code that would result in an error (stack
> trace below). Note that if the 'count()' operation is taken out of the
> example code then this error does not occur anymore, and the Kafka data is
> printed.
> {code:title=kafka_test.py|collapse=true}
> from pyspark import SparkContext
> from pyspark.streaming import StreamingContext
> from pyspark.streaming.kafka import KafkaUtils
>
>
> def attach_kafka_metadata(kafka_rdd):
>     offset_ranges = kafka_rdd.offsetRanges()
>     return kafka_rdd
>
>
> if __name__ == "__main__":
>     sc = SparkContext(appName='kafka-test')
>     ssc = StreamingContext(sc, 10)
>     kafka_stream = KafkaUtils.createDirectStream(
>         ssc,
>         [TOPIC],
>         kafkaParams={
>             'metadata.broker.list': BROKERS,
>         },
>     )
>     kafka_stream.transform(attach_kafka_metadata).count().pprint()
>     ssc.start()
>     ssc.awaitTermination()
> {code}
> {code:title=Stack trace|collapse=true}
> Traceback (most recent call last):
>   File "/home/spark/spark/python/lib/pyspark.zip/pyspark/streaming/util.py", line 62, in call
>     r = self.func(t, *rdds)
>   File "/home/spark/spark/python/lib/pyspark.zip/pyspark/streaming/dstream.py", line 616, in <lambda>
>     self.func = lambda t, rdd: func(t, prev_func(t, rdd))
>   File "/home/spark/spark/python/lib/pyspark.zip/pyspark/streaming/dstream.py", line 616, in <lambda>
>     self.func = lambda t, rdd: func(t, prev_func(t, rdd))
>   File "/home/spark/spark/python/lib/pyspark.zip/pyspark/streaming/dstream.py", line 616, in <lambda>
>     self.func = lambda t, rdd: func(t, prev_func(t, rdd))
>   File "/home/spark/spark/python/lib/pyspark.zip/pyspark/streaming/dstream.py", line 616, in <lambda>
>     self.func = lambda t, rdd: func(t, prev_func(t, rdd))
>   File "/home/spark/spark/python/lib/pyspark.zip/pyspark/streaming/kafka.py", line 332, in <lambda>
>     func = lambda t, rdd: oldfunc(rdd)
>   File "/home/spark/ad_realtime/batch/kafka_test.py", line 7, in attach_kafka_metadata
>     offset_ranges = kafka_rdd.offsetRanges()
> AttributeError: 'RDD' object has no attribute 'offsetRanges'
> {code}
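
For reference, the failure can be side-stepped by keeping offsetRanges() in the very first transformation on the direct stream and doing the downstream work in an output operation instead of chaining count() after the transform; the reporter's own observation that transform(...).pprint() works points the same way, since pprint() is built on foreachRDD(). Below is a minimal sketch of that offset-capture pattern. TOPIC and BROKERS are placeholders exactly as in the report above, and store_offset_ranges/print_offset_ranges are illustrative names, not Spark API.

{code:title=offset_capture_sketch.py|collapse=true}
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

# Offsets captured per batch on the driver; transform functions are
# invoked on the driver when each batch is generated.
offset_ranges = []


def store_offset_ranges(rdd):
    # The RDD handed to the first transformation on the direct stream
    # is still a KafkaRDD, so offsetRanges() is available here.
    global offset_ranges
    offset_ranges = rdd.offsetRanges()
    return rdd


def print_offset_ranges(rdd):
    # Output operation: consume the captured offsets alongside the data.
    for o in offset_ranges:
        print("%s %s %s %s" % (o.topic, o.partition, o.fromOffset, o.untilOffset))


if __name__ == "__main__":
    sc = SparkContext(appName='kafka-offset-sketch')
    ssc = StreamingContext(sc, 10)
    kafka_stream = KafkaUtils.createDirectStream(
        ssc,
        [TOPIC],  # placeholder, as in the report above
        kafkaParams={'metadata.broker.list': BROKERS},  # placeholder, as above
    )
    kafka_stream \
        .transform(store_offset_ranges) \
        .foreachRDD(print_offset_ranges)
    ssc.start()
    ssc.awaitTermination()
{code}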