Hi Arpan,

The error indicates that new DStream operations were defined after streamingContext.start() was called. In your code, the map/filter/foreachRDD chain is declared inside the foreachRDD closure, and that closure only runs once the first batch arrives, i.e. after the context has started; all inputs, transformations and output operations must be set up before start().
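To illustrate, here is a minimal sketch of that anti-pattern (reusing the `ssc` and `stream` names from your code; the inner operation is just a placeholder):

// Anti-pattern sketch: stream.map(...) constructs a new DStream inside the
// foreachRDD closure. The closure runs when the first batch arrives, i.e.
// after ssc.start(), so DStream.validateAtInit throws the
// IllegalStateException you see in the logs.
stream.foreachRDD { rdd =>
  stream.map(record => record.value).foreachRDD(inner => inner.count()) // too late
}

ssc.start()
ssc.awaitTermination()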
A suggested pattern to manage the offsets is the following:

var offsetRanges: Array[OffsetRange] = _

// create streaming context, streams, ...

// As the first operation after the stream has been created, do:
stream.foreachRDD { rdd =>
  offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
}

// Then do the other desired operations on the streaming data:
val resultStream = stream.map(...).filter(...).transform(...)

// Materialize the resulting stream:
resultStream.foreachRDD { rdd =>
  // do stuff... write to a db, to a kafka topic, ... whatever ...

  // At the end of the process, commit the offsets (note that I use the
  // original `stream` instance, not `resultStream`):
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}

I hope this helps,
kr, Gerard.

On Fri, Oct 13, 2017 at 3:34 PM, Arpan Rajani <arpa...@gmail.com> wrote:
> Hi all,
>
> In our cluster we have Kafka 0.10.1 and Spark 2.1.0. We are trying to
> store the offsets in Kafka in order to make the streaming application
> restartable. (I already implemented this using checkpoints, but we will
> need to change the code in production, hence checkpointing won't work.)
>
> Checking the Spark Streaming documentation for the "storing offsets in
> Kafka itself" approach,
> http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#kafka-itself,
> which describes:
>
> stream.foreachRDD { rdd =>
>   val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>
>   // some time later, after outputs have completed
>   stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
> }
>
> Based on this, I modified the code like the following:
>
> val kafkaMap: Map[String, Object] = KafkaConfigs
>
> val stream: InputDStream[ConsumerRecord[String, String]] =
>   KafkaUtils.createDirectStream(ssc, PreferConsistent,
>     Subscribe[String, String](Array("topicName"), kafkaMap))
>
> stream.foreachRDD { rdd =>
>   val offsetRanges: Array[OffsetRange] =
>     rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>
>   // Filter out the records which have empty values and get a tuple of type
>   // (topicName, stringValue_read_from_kafka_topic)
>   stream.map(x => ("topicName", x.value)).filter(x =>
>     !x._2.trim.isEmpty).foreachRDD(processRDD _)
>
>   // Sometime later, after outputs have completed.
>   stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
> }
>
> def processRDD(rdd: RDD[(String, String)]) {
>   // Process further to HDFS
> }
>
> Now, when I try to start the streaming application, it does not start, and
> looking at the logs, here is what we see:
>
> java.lang.IllegalStateException: Adding new inputs, transformations, and
> output operations after starting a context is not supported
>   at org.apache.spark.streaming.dstream.DStream.validateAtInit(DStream.scala:223)
>   at org.apache.spark.streaming.dstream.DStream.<init>(DStream.scala:65)
>
> Can anyone suggest, or help us understand, what we are missing here?
>
> Regards,
> Arpan
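P.S. For completeness, here is a self-contained sketch of the pattern above. The broker address, group id, topic name and batch interval are hypothetical placeholders; replace them with your own settings.

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

object OffsetCommitExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("OffsetCommitExample")
    val ssc = new StreamingContext(conf, Seconds(10))

    // Hypothetical Kafka settings; auto-commit is disabled because we
    // commit offsets ourselves after the outputs have completed.
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "example-group",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent,
      Subscribe[String, String](Array("topicName"), kafkaParams))

    // Capture the offsets first, before any other operation on the stream.
    var offsetRanges: Array[OffsetRange] = Array.empty
    stream.foreachRDD { rdd =>
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    }

    // All transformations are declared here, BEFORE ssc.start().
    val resultStream = stream
      .map(record => ("topicName", record.value))
      .filter { case (_, value) => !value.trim.isEmpty }

    resultStream.foreachRDD { rdd =>
      // Write the batch out (stubbed here with a count).
      println(s"processed ${rdd.count()} records")
      // Commit against the original Kafka stream, not resultStream.
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}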