[ https://issues.apache.org/jira/browse/SPARK-10122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14706186#comment-14706186 ]
Saisai Shao commented on SPARK-10122:
-------------------------------------

Hi [~aramesh], thanks a lot for pointing this out. This is indeed a bug; sorry for not covering it in the unit tests.

The problem is that PySpark compacts a chain of {{TransformedDStream}}s into a single one:

{code}
# Excerpt from TransformedDStream.__init__ in pyspark/streaming/dstream.py
if (isinstance(prev, TransformedDStream) and
        not prev.is_cached and not prev.is_checkpointed):
    prev_func = prev.func
    self.func = lambda t, rdd: func(t, prev_func(t, rdd))
    self.prev = prev.prev
{code}

Because {{KafkaTransformedDStream}} is a subclass of {{TransformedDStream}}, it is compacted as well. As {{self.prev = prev.prev}} shows, the compacted stream is attached directly to the Kafka stream's parent DStream. The RDDs handed to your function are therefore plain RDDs rather than Kafka RDDs, so calling {{offsetRanges()}} on them throws the exception you reported.

I will submit a PR to fix this, so you can try the patch to confirm that it resolves the issue.

> AttributeError: 'RDD' object has no attribute 'offsetRanges'
> ------------------------------------------------------------
>
> Key: SPARK-10122
> URL: https://issues.apache.org/jira/browse/SPARK-10122
> Project: Spark
> Issue Type: Bug
> Components: PySpark, Streaming
> Reporter: Amit Ramesh
> Labels: kafka
>
> SPARK-8389 added the offsetRanges interface to Kafka direct streams. This
> however appears to break when chaining operations after a transform
> operation. The following example code results in an error (stack trace
> below). Note that if the 'count()' operation is taken out of the example
> code, the error no longer occurs and the Kafka data is printed.
>
> {code:title=kafka_test.py|collapse=true}
> from pyspark import SparkContext
> from pyspark.streaming import StreamingContext
> from pyspark.streaming.kafka import KafkaUtils
>
>
> def attach_kafka_metadata(kafka_rdd):
>     offset_ranges = kafka_rdd.offsetRanges()
>
>     return kafka_rdd
>
>
> if __name__ == "__main__":
>     TOPIC = 'my-topic'  # placeholder: set to your Kafka topic
>     BROKERS = 'localhost:9092'  # placeholder: set to your broker list
>
>     sc = SparkContext(appName='kafka-test')
>     ssc = StreamingContext(sc, 10)
>
>     kafka_stream = KafkaUtils.createDirectStream(
>         ssc,
>         [TOPIC],
>         kafkaParams={
>             'metadata.broker.list': BROKERS,
>         },
>     )
>     kafka_stream.transform(attach_kafka_metadata).count().pprint()
>
>     ssc.start()
>     ssc.awaitTermination()
> {code}
>
> {code:title=Stack trace|collapse=true}
> Traceback (most recent call last):
>   File "/home/spark/spark/python/lib/pyspark.zip/pyspark/streaming/util.py", line 62, in call
>     r = self.func(t, *rdds)
>   File "/home/spark/spark/python/lib/pyspark.zip/pyspark/streaming/dstream.py", line 616, in <lambda>
>     self.func = lambda t, rdd: func(t, prev_func(t, rdd))
>   File "/home/spark/spark/python/lib/pyspark.zip/pyspark/streaming/dstream.py", line 616, in <lambda>
>     self.func = lambda t, rdd: func(t, prev_func(t, rdd))
>   File "/home/spark/spark/python/lib/pyspark.zip/pyspark/streaming/dstream.py", line 616, in <lambda>
>     self.func = lambda t, rdd: func(t, prev_func(t, rdd))
>   File "/home/spark/spark/python/lib/pyspark.zip/pyspark/streaming/dstream.py", line 616, in <lambda>
>     self.func = lambda t, rdd: func(t, prev_func(t, rdd))
>   File "/home/spark/spark/python/lib/pyspark.zip/pyspark/streaming/kafka.py", line 332, in <lambda>
>     func = lambda t, rdd: oldfunc(rdd)
>   File "/home/spark/ad_realtime/batch/kafka_test.py", line 7, in attach_kafka_metadata
>     offset_ranges = kafka_rdd.offsetRanges()
> AttributeError: 'RDD' object has no attribute 'offsetRanges'
> {code}
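The compaction described in the comment can be reproduced without Spark. The following is a hypothetical pure-Python model, not PySpark code: FakeRDD, FakeKafkaRDD, SourceStream, the compute() methods and the strict flag are invented for this sketch. The two stream classes reuse PySpark's names but are simplified stand-ins. With the isinstance() check from the excerpt, the downstream stream created by a count()-like operation folds the Kafka subclass away, so the user function receives a plain RDD and raises the same kind of AttributeError. The strict variant folds only streams whose exact type is TransformedDStream. It is shown as one possible fix under that assumption and is not necessarily what the submitted PR does.

```python
# Hypothetical pure-Python model of the TransformedDStream compaction behind
# SPARK-10122. FakeRDD, FakeKafkaRDD, SourceStream, compute() and the `strict`
# flag are invented for this sketch; they are not PySpark APIs.


class FakeRDD:
    """Stands in for a plain RDD: it has no offsetRanges()."""

    def __init__(self, data):
        self.data = data


class FakeKafkaRDD(FakeRDD):
    """Stands in for a Kafka RDD, which does expose offsetRanges()."""

    def offsetRanges(self):
        # (topic, partition, fromOffset, untilOffset); values are made up.
        return [("topic", 0, 0, len(self.data))]


class SourceStream:
    """Stands in for the direct Kafka stream; in this model it yields plain
    RDDs that only KafkaTransformedDStream wraps as Kafka RDDs."""

    is_cached = False
    is_checkpointed = False

    def compute(self, t):
        return FakeRDD([1, 2, 3])


class TransformedDStream:
    def __init__(self, prev, func, strict=False):
        self.is_cached = False
        self.is_checkpointed = False
        if strict:
            # Assumed candidate fix: fold only streams whose exact type is
            # TransformedDStream, never a subclass.
            foldable = type(prev) is TransformedDStream
        else:
            # Mirrors the isinstance() check in the excerpt: subclasses fold too.
            foldable = isinstance(prev, TransformedDStream)
        if foldable and not prev.is_cached and not prev.is_checkpointed:
            prev_func = prev.func
            self.func = lambda t, rdd: func(t, prev_func(t, rdd))
            self.prev = prev.prev  # skips prev, including any subclass behaviour
        else:
            self.func = func
            self.prev = prev

    def compute(self, t):
        return self.func(t, self.prev.compute(t))


class KafkaTransformedDStream(TransformedDStream):
    """Models the Kafka subclass: it wraps its parent's RDDs as Kafka RDDs."""

    def compute(self, t):
        parent = self.prev.compute(t)
        return self.func(t, FakeKafkaRDD(parent.data))


def attach_kafka_metadata(rdd):
    rdd.offsetRanges()  # raises AttributeError if rdd is a plain FakeRDD
    return rdd


def run(strict):
    kafka = KafkaTransformedDStream(
        SourceStream(), lambda t, rdd: attach_kafka_metadata(rdd))
    # Stands in for count(): a downstream transform chained after transform().
    counted = TransformedDStream(kafka, lambda t, rdd: len(rdd.data), strict=strict)
    return counted.compute(0)


if __name__ == "__main__":
    try:
        run(strict=False)
    except AttributeError as exc:
        print("isinstance check:", exc)
    print("exact type check, count =", run(strict=True))
```

With strict=False the folded stream calls SourceStream.compute() directly and never runs KafkaTransformedDStream.compute(), which mirrors the four stacked dstream.py frames in the trace. With strict=True the Kafka stream stays in the chain and the count is 3.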