Hi Arpan,

The error indicates that the streaming context has already been started with
streamingContext.start() and that additional DStream operations were declared
after that point. In your snippet, the map/filter/foreachRDD chain is declared
inside the per-batch closure, which only runs once the context is already
running, hence the exception. All transformations and output operations must
be declared before start().
A suggested pattern to manage the offsets is the following:

var offsetRanges: Array[OffsetRange] = Array.empty

// create the streaming context, the stream, ...
// as the first operation after the stream has been created, do:

stream.foreachRDD { rdd =>
   offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
}
//Then do other desired operations on the streaming data
val resultStream = stream.map(...).filter(...).transform(...)
//materialize the resulting stream

resultStream.foreachRDD { rdd =>
  // do stuff... write to a db, to a kafka topic, ... whatever ...

  // at the end of the process, commit the offsets (note that I use the
  // original stream instance, not `resultStream`)
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
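
For completeness, here is a minimal self-contained sketch of that pattern.
The broker address, group id, topic name, batch interval and output path are
placeholders I made up for illustration, not your actual configuration:

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

object OffsetCommitSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("offset-commit-sketch")
    val ssc  = new StreamingContext(conf, Seconds(10))   // 10s batches, adjust as needed

    val kafkaParams = Map[String, Object](
      "bootstrap.servers"  -> "broker1:9092",            // placeholder
      "key.deserializer"   -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id"           -> "my-consumer-group",       // placeholder
      "auto.offset.reset"  -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean) // we commit ourselves
    )

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](Array("topicName"), kafkaParams))

    // capture the offset ranges as the very first operation on the stream
    var offsetRanges: Array[OffsetRange] = Array.empty
    stream.foreachRDD { rdd =>
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    }

    // all further transformations are declared here, BEFORE ssc.start()
    val resultStream = stream
      .map(record => ("topicName", record.value))
      .filter { case (_, value) => value.trim.nonEmpty }

    resultStream.foreachRDD { rdd =>
      // write the batch somewhere (the HDFS path below is just a placeholder)
      rdd.saveAsTextFile(s"/tmp/output/batch-${System.currentTimeMillis}")

      // commit only after the output succeeded, and on the original stream
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }

    ssc.start()            // no new DStream operations after this point
    ssc.awaitTermination()
  }
}

The key point is that every map/filter/foreachRDD above is *declared* before
ssc.start(); only the closures themselves run later, once per batch.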

I hope this helps,

kr, Gerard.

On Fri, Oct 13, 2017 at 3:34 PM, Arpan Rajani <arpa...@gmail.com> wrote:

> Hi all,
>
> In our cluster we have Kafka 0.10.1 and Spark 2.1.0. We are trying to
> store the offsets in Kafka in order to make the streaming application
> restartable. (I have already implemented checkpointing, but we will need to
> change the code in production, so checkpoints won't work.)
>
> Checking the Spark Streaming documentation, the "storing offsets in Kafka
> itself" approach:
>
> http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#kafka-itself
>
> which describes:
>
> stream.foreachRDD { rdd =>
>   val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>
>   // some time later, after outputs have completed
>   stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
> }
>
>
> Based on this, I modified the code as follows:
>
> val kafkaMap: Map[String, Object] = KafkaConfigs
>
> val stream: InputDStream[ConsumerRecord[String, String]] =
>   KafkaUtils.createDirectStream(ssc, PreferConsistent,
>     Subscribe[String, String](Array("topicName"), kafkaMap))
>
> stream.foreachRDD { rdd =>
>     val offsetRanges: Array[OffsetRange] =
>       rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>
>     // Filter out the records with empty values and build tuples of type
>     // (topicName, stringValue_read_from_kafka_topic)
>     stream.map(x => ("topicName", x.value))
>       .filter(x => !x._2.trim.isEmpty)
>       .foreachRDD(processRDD _)
>
>     // Sometime later, after outputs have completed.
>     stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
> }
>
>
> def processRDD(rdd: RDD[(String, String)]): Unit = {
>   // Process further and write to HDFS
> }
>
> Now, when I try to start the streaming application, it does not start;
> looking at the logs, here is what we see:
>
> java.lang.IllegalStateException: Adding new inputs, transformations, and output operations after starting a context is not supported
>     at org.apache.spark.streaming.dstream.DStream.validateAtInit(DStream.scala:223)
>     at org.apache.spark.streaming.dstream.DStream.<init>(DStream.scala:65)
>
> Can anyone suggest what we are missing here, or help us understand it?
>
>
> Regards,
> Arpan
>
