Re: Issue Storing offset in Kafka for Spark Streaming Application

2017-10-13 Thread Arpan Rajani
Hi Gerard,

Excellent, indeed your inputs helped. Thank you for the quick reply.

I modified the code based on inputs.

Now the application starts and reads from the topic. We then stream about
50,000 messages onto the Kafka topic.

After a while we terminate the application with a YARN kill and check how
many messages were written to HBase (say 9,000). Then we restart the
application and wait for it to pick up the remaining messages from the
topic, but it reads nothing from the topic, which means the *streaming
application fails to resume from the correct offset*. (This was not the
case with the checkpointing mechanism, where I could see all 50,000
messages after a restart.)

What do you think is missing in this?

Following is the improved code, based on the previous inputs:

// create the Spark Streaming context

// holds the offset ranges captured for the current batch
var offsetRanges: Array[OffsetRange] = Array.empty

val stream: InputDStream[ConsumerRecord[String, String]] =
  KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent,
    Subscribe[String, String](Array("topicName"), kafkaMap))

// Modified based on inputs: capture the offsets as the first operation
// after the stream is created
stream.foreachRDD { rdd =>
  offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
}

val resultStream = stream.map(x => ("topicName", x.value)).filter(x =>
  !x._2.trim.isEmpty)

resultStream.foreachRDD { rdd =>

  processRDD(rdd) // this stores messages in HBase

  // commit offsets using the original stream
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}

ssc.start()

ssc.awaitTermination()
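
For reference, here is a fuller, self-contained sketch of what I am running.
The bootstrap servers, group id, batch interval and the HBase write inside
processRDD are placeholders, and I am assuming enable.auto.commit is set to
false and that the same group.id is kept across restarts:

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

object OffsetCommitApp {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("kafka-offset-commit")
    val ssc  = new StreamingContext(conf, Seconds(10))

    // Placeholder Kafka config (broker address and group id are made up).
    // The group id must stay the same across restarts so the committed
    // offsets are found again, and auto-commit is disabled because we
    // commit explicitly after the HBase writes.
    val kafkaMap: Map[String, Object] = Map[String, Object](
      "bootstrap.servers"  -> "broker1:9092",
      "key.deserializer"   -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id"           -> "hbase-writer",
      "auto.offset.reset"  -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    // Holds the offset ranges captured on the driver for the current batch
    var offsetRanges: Array[OffsetRange] = Array.empty

    val stream: InputDStream[ConsumerRecord[String, String]] =
      KafkaUtils.createDirectStream[String, String](
        ssc, PreferConsistent,
        Subscribe[String, String](Array("topicName"), kafkaMap))

    // First operation on the stream: capture the offsets of each batch
    stream.foreachRDD { rdd =>
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    }

    val resultStream = stream
      .map(x => ("topicName", x.value))
      .filter(x => !x._2.trim.isEmpty)

    resultStream.foreachRDD { rdd =>
      processRDD(rdd) // stores the batch in HBase
      // commit the captured offsets on the original Kafka stream
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }

    ssc.start()
    ssc.awaitTermination()
  }

  // Placeholder for the real HBase write
  def processRDD(rdd: RDD[(String, String)]): Unit = {
    rdd.foreach(_ => ())
  }
}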

Could you please help me figure out what is missing here?

Many thanks,
Arpan



On Fri, Oct 13, 2017 at 3:27 PM, Gerard Maas  wrote:

> Hi Arpan,
>
> The error suggests that the streaming context has been started with
> streamingContext.start() and after that statement, some other
> dstream operations have been attempted.
> A suggested pattern to manage the offsets is the following:
>
> var offsetRanges: Array[OffsetRange] = _
>
> //create streaming context, streams, ...
> // as first operation after the stream has been created, do:
>
> stream.foreachRDD { rdd =>
>offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
> }
> //Then do other desired operations on the streaming data
> val resultStream = stream.map(...).filter(...).transform(...)
> //materialize the resulting stream
>
> resultStream.foreachRDD{rdd =>
> // do stuff... write to a db, to a kafka topic,... whatever,...
>
> //at the end of the process, commit the offsets (note that I use the
> original stream instance, not `resultStream`)
> stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
> }
>
> I hope this helps,
>
> kr, Gerard.
>
> On Fri, Oct 13, 2017 at 3:34 PM, Arpan Rajani  wrote:
>
>> Hi all,
>>
>> In our cluster we have Kafka 0.10.1 and Spark 2.1.0. We are trying to
>> store the offsets in Kafka in order to achieve restartability of the
>> streaming application. (I already implemented this with checkpoints, but we
>> will need to change the code in production, so checkpointing won't work.)
>>
>> Checking the Spark Streaming documentation for the "storing offsets in Kafka
>> itself" approach:
>>
>> http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#kafka-itself,
>> which describes:
>>
>> stream.foreachRDD { rdd =>
>>   val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>>
>>   // some time later, after outputs have completed
>>   stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
>> }
>>
>>
>> Based on this, I modified the code as follows:
>>
>> val kafkaMap: Map[String, Object] = KafkaConfigs
>>
>> val stream: InputDStream[ConsumerRecord[String, String]] =
>>   KafkaUtils.createDirectStream(ssc, PreferConsistent,
>>     Subscribe[String, String](Array("topicName"), kafkaMap))
>>
>> stream.foreach { rdd =>
>>   val offsetRanges: Array[OffsetRange] =
>>     rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>>
>>   // Filter out the empty values and build a tuple of type
>>   // (topicName, stringValue_read_from_kafka_topic)
>>   stream.map(x => ("topicName", x.value)).filter(x =>
>>     !x._2.trim.isEmpty).foreachRDD(processRDD _)
>>
>>   // Sometime later, after outputs have completed.
>>   stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
>> }
>>
>>
>> def processRDD(rdd: RDD[(String, String)]): Unit = {
>>   // Process further, writing to HDFS
>> }
>>
>> Now, when I try to start the streaming application, it does not start, and
>> looking at the logs, this is what we see:
>>
>> java.lang.IllegalStateException: Adding new inputs, transformations, and
>> output operations after starting a context is not supported
>>   at org.apache.spark.streaming.dstream.DStream.validateAtInit(DStream.scala:223)
>>   at org.apache.spark.streaming.dstream.DStream.<init>(DStream.scala:65)
>>
>>
>> Can anyone suggest, or help us understand, what we are missing here?
>>
>>
>> Regards,
>> Arpan
>>
>
>


Re: Issue Storing offset in Kafka for Spark Streaming Application

2017-10-13 Thread Gerard Maas
Hi Arpan,

The error suggests that the streaming context has been started with
streamingContext.start() and after that statement, some other
dstream operations have been attempted.
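
For example, something along these lines reproduces that check (assuming
`stream` was created before ssc.start()):

ssc.start()

// Creating a new transformation/output operation at this point throws:
// "Adding new inputs, transformations, and output operations after
// starting a context is not supported"
stream.map(_.value).foreachRDD(rdd => rdd.count())
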
A suggested pattern to manage the offsets is the following:

var offsetRanges: Array[OffsetRange] = _

//create streaming context, streams, ...
// as first operation after the stream has been created, do:

stream.foreachRDD { rdd =>
   offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
}
//Then do other desired operations on the streaming data
val resultStream = stream.map(...).filter(...).transform(...)
//materialize the resulting stream

resultStream.foreachRDD{rdd =>
// do stuff... write to a db, to a kafka topic,... whatever,...

//at the end of the process, commit the offsets (note that I use the
original stream instance, not `resultStream`)
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
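
For comparison, the variant in the Spark docs keeps everything inside a
single foreachRDD, which avoids the shared variable entirely (the write
step below is just a placeholder):

stream.foreachRDD { rdd =>
  // capture the offsets for exactly this batch
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // transform and write the batch (placeholder for the real output, e.g. HBase)
  rdd.map(record => ("topicName", record.value))
     .filter { case (_, v) => !v.trim.isEmpty }
     .foreachPartition(_ => ())

  // commit only after the outputs for this batch have completed
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}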

I hope this helps,

kr, Gerard.

On Fri, Oct 13, 2017 at 3:34 PM, Arpan Rajani  wrote:

> Hi all,
>
> In our cluster we have Kafka 0.10.1 and Spark 2.1.0. We are trying to
> store the offsets in Kafka in order to achieve restartability of the
> streaming application. (I already implemented this with checkpoints, but we
> will need to change the code in production, so checkpointing won't work.)
>
> Checking the Spark Streaming documentation for the "storing offsets in Kafka
> itself" approach:
>
> http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#kafka-itself,
> which describes:
>
> stream.foreachRDD { rdd =>
>   val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>
>   // some time later, after outputs have completed
>   stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
> }
>
>
> Based on this, I modified the code as follows:
>
> val kafkaMap: Map[String, Object] = KafkaConfigs
>
> val stream: InputDStream[ConsumerRecord[String, String]] =
>   KafkaUtils.createDirectStream(ssc, PreferConsistent,
>     Subscribe[String, String](Array("topicName"), kafkaMap))
>
> stream.foreach { rdd =>
>   val offsetRanges: Array[OffsetRange] =
>     rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>
>   // Filter out the empty values and build a tuple of type
>   // (topicName, stringValue_read_from_kafka_topic)
>   stream.map(x => ("topicName", x.value)).filter(x =>
>     !x._2.trim.isEmpty).foreachRDD(processRDD _)
>
>   // Sometime later, after outputs have completed.
>   stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
> }
>
>
> def processRDD(rdd: RDD[(String, String)]): Unit = {
>   // Process further, writing to HDFS
> }
>
> Now, when I try to start the streaming application, it does not start, and
> looking at the logs, this is what we see:
>
> java.lang.IllegalStateException: Adding new inputs, transformations, and
> output operations after starting a context is not supported
>   at org.apache.spark.streaming.dstream.DStream.validateAtInit(DStream.scala:223)
>   at org.apache.spark.streaming.dstream.DStream.<init>(DStream.scala:65)
>
>
> Can anyone suggest, or help us understand, what we are missing here?
>
>
> Regards,
> Arpan
>