[ https://issues.apache.org/jira/browse/SPARK-12002?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Shixiong Zhu resolved SPARK-12002. ---------------------------------- Resolution: Fixed Assignee: Saisai Shao Fix Version/s: 1.6.0 > offsetRanges attribute missing in Kafka RDD when resuming from checkpoint > ------------------------------------------------------------------------- > > Key: SPARK-12002 > URL: https://issues.apache.org/jira/browse/SPARK-12002 > Project: Spark > Issue Type: Bug > Components: PySpark, Streaming > Reporter: Amit Ramesh > Assignee: Saisai Shao > Fix For: 1.6.0 > > > SPARK-8389 added offsetRanges to Kafka direct streams. And SPARK-10122 fixed > the issue of not ending up with non-Kafka RDDs when chaining transforms to > Kafka RDDs. It appears that this issue remains for the case where a streaming > application using Kafka direct streams is initialized from the checkpoint > directory. The following is a representative example where everything works > as expected during the first run, but exceptions are thrown on a subsequent > run when the context is being initialized from the checkpoint directory. > {code:title=test_checkpoint.py|language=python} > from pyspark import SparkContext > > from pyspark.streaming import StreamingContext > > from pyspark.streaming.kafka import KafkaUtils > > def attach_kafka_metadata(kafka_rdd): > > offset_ranges = kafka_rdd.offsetRanges() > > > > return kafka_rdd > > > > > > def create_context(): > > sc = SparkContext(appName='kafka-test') > > ssc = StreamingContext(sc, 10) > > ssc.checkpoint(CHECKPOINT_URI) > > > > kafka_stream = KafkaUtils.createDirectStream( > > ssc, > > [TOPIC], > > kafkaParams={ > > 'metadata.broker.list': BROKERS, > > }, > > ) > > kafka_stream.transform(attach_kafka_metadata).count().pprint() > > > > return ssc > > > > > > if __name__ == "__main__": > > ssc = StreamingContext.getOrCreate(CHECKPOINT_URI, create_context) > > ssc.start() > > ssc.awaitTermination() > {code} > {code:title=Exception on resuming from checkpoint} > Traceback (most recent call last): > File "/home/spark/spark/python/lib/pyspark.zip/pyspark/streaming/util.py", > line 62, in call > r = self.func(t, *rdds) > File "/home/spark/spark/python/lib/pyspark.zip/pyspark/streaming/kafka.py", > line 344, in <lambda> > File "/home/spark/batch/test_checkpoint.py", line 12, in > attach_kafka_metadata > offset_ranges = kafka_rdd.offsetRanges() > AttributeError: 'RDD' object has no attribute 'offsetRanges' > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org