Re: kafka + Spark Streaming with checkPointing fails to restart

2015-05-14 Thread Ankur Chauhan

Thanks everyone, that was the problem. The "create new streaming
context" function was supposed to set up the stream processing as well
as the checkpoint directory. I had missed the checkpoint setup
entirely. With that done, everything works as expected.

For the benefit of others, here is my final version of the code, which
works correctly:


object RawLogProcessor extends Logging {

  import TacomaHelper._

  val checkpointDir = "/tmp/checkpointDir_tacoma"
  var ssc: Option[StreamingContext] = None

  def createSparkConf(config: Config): SparkConf = {
    val sparkConf = new SparkConf()
    config.entrySet.asScala
      .map(kv => kv.getKey -> kv.getValue)
      .foreach { case (k, v) => sparkConf.set(s"spark.$k", unquote(v.render())) }

    sparkConf.registerKryoClasses(Array(classOf[VideoView], classOf[RawLog],
      classOf[VideoEngagement], classOf[VideoImpression]))
    sparkConf
  }

  // a function that returns a function of type: `() => StreamingContext`
  def createContext(sparkConfig: Config, kafkaConf: Config)(f: StreamingContext => StreamingContext) = () => {
    val batchDurationSecs = sparkConfig.getDuration("streaming.batch_duration", TimeUnit.SECONDS)
    val sparkConf = createSparkConf(sparkConfig)

    // create the streamingContext (and its sparkContext)
    val streamingContext = new StreamingContext(sparkConf, Durations.seconds(batchDurationSecs))
    streamingContext.checkpoint(checkpointDir)

    // apply the pipeline function to the streaming context
    f(streamingContext)
  }

  def createNewContext(sparkConf: Config, kafkaConf: Config, f: StreamingContext => StreamingContext) = {
    logInfo("Create new Spark streamingContext with provided pipeline function")
    StreamingContext.getOrCreate(
      checkpointPath = checkpointDir,
      creatingFunc = createContext(sparkConf, kafkaConf)(f),
      createOnError = true)
  }

  def apply(sparkConfig: Config, kafkaConf: Config): StreamingContext = {
    rawlogTopic = kafkaConf.getString("rawlog.topic")
    kafkaParams = kafkaConf.entrySet.asScala
      .map(kv => kv.getKey -> unquote(kv.getValue.render()))
      .toMap

    if (ssc.isEmpty) {
      ssc = Some(createNewContext(sparkConfig, kafkaConf, setupPipeline))
    }
    ssc.get
  }

  var rawlogTopic: String = "qa-rawlog"
  var kafkaParams: Map[String, String] = Map()

  def setupPipeline(streamingContext: StreamingContext): StreamingContext = {

    logInfo("Creating new kafka rawlog stream")
    // TODO: extract this and pass it around somehow
    val rawlogDStream = KafkaUtils.createDirectStream[String, Object, StringDecoder, KafkaAvroDecoder](
      streamingContext, kafkaParams, Set(rawlogTopic))

    logInfo("adding step to parse kafka stream into RawLog types (Normalizer)")
    val eventStream = rawlogDStream
      .map({
        case (key, rawlogVal) =>
          val record = rawlogVal.asInstanceOf[GenericData.Record]
          val rlog = RawLog.newBuilder()
            .setId(record.get("id").asInstanceOf[String])
            .setAccount(record.get("account").asInstanceOf[String])
            .setEvent(record.get("event").asInstanceOf[String])
            .setTimestamp(record.get("timestamp").asInstanceOf[Long])
            .setUserAgent(record.get("user_agent").asInstanceOf[String])
            .setParams(record.get("params").asInstanceOf[java.util.Map[String, String]])
            .build()
          val norm = Normalizer(rlog)
          (key, rlog.getEvent, norm)
      })

    logInfo("Adding step to filter out VideoView only events and cache them")
    val videoViewStream = eventStream
      .filter(_._2 == "video_view")
      .filter(_._3.isDefined)
      .map((z) => (z._1, z._3.get))
      .map((z) => (z._1, z._2.asInstanceOf[VideoView]))
      .cache()

    // repartition by account
    logInfo("repartition videoView by account and calculate stats")
    videoViewStream.map((v) => (v._2.getAccount, 1))
      .filter(_._1 != null)
      .window(Durations.seconds(20))
      .reduceByKey(_ + _)
      .print()

    // repartition by (deviceType, DeviceOS)
    logInfo("repartition videoView by (DeviceType, DeviceOS) and calculate stats")
    videoViewStream.map((v) => ((v._2.getDeviceType, v._2.getDeviceOs), 1))
      .reduceByKeyAndWindow(_ + _, Durations.seconds(10))
      .print()

    streamingContext
  }

}
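For completeness, wiring this up from a driver can be as simple as the
sketch below (the Main object and the "spark"/"kafka" config section names
here are illustrative placeholders, not the exact ones from my app):

import com.typesafe.config.ConfigFactory

object Main extends App {
  // Load application.conf and hand the relevant sections to the processor.
  val config = ConfigFactory.load()
  val ssc = RawLogProcessor(config.getConfig("spark"), config.getConfig("kafka"))

  // Start the (new or checkpoint-recovered) streaming context and block.
  ssc.start()
  ssc.awaitTermination()
}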

- Ankur

On 13/05/2015 23:52, NB wrote:
> The data pipeline (DAG) should not be added to the StreamingContext
> in the case of a recovery scenario. The pipeline metadata is
> recovered from the checkpoint folder. That is one thing you will
> need to fix in your code. Also, I don't think the
> ssc.checkpoint(folder) call should be made in the case of recovery.
>
> The idiom to follow is to set up the DAG in the creatingFunc and
> not outside of it. This will ensure that if a new context is being
> created (i.e. the checkpoint folder does not exist), the DAG will get
> added to it and then checkpointed. Once a recovery happens, this
> function is not invoked but everything is recreated from the
> checkpointed data.

Re: kafka + Spark Streaming with checkPointing fails to restart

2015-05-13 Thread NB
The data pipeline (DAG) should not be added to the StreamingContext in the
case of a recovery scenario. The pipeline metadata is recovered from the
checkpoint folder. That is one thing you will need to fix in your code.
Also, I don't think the ssc.checkpoint(folder) call should be made in the
case of recovery.

The idiom to follow is to set up the DAG in the creatingFunc and not outside
of it. This will ensure that if a new context is being created (i.e. the
checkpoint folder does not exist), the DAG will get added to it and then
checkpointed. Once a recovery happens, this function is not invoked but
everything is recreated from the checkpointed data.
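In code, the idiom looks roughly like this (a minimal sketch only; the
checkpoint directory, batch duration and buildPipeline body are placeholders
for your own values):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Durations, StreamingContext}

object CheckpointExample extends App {
  val checkpointDir = "/tmp/checkpoint-example"

  // All DStream creation, transformations and output operations belong here.
  def buildPipeline(ssc: StreamingContext): Unit = {
    // e.g. ssc.socketTextStream("localhost", 9999).count().print()
  }

  def createStreamingContext(): StreamingContext = {
    val conf = new SparkConf().setAppName("checkpoint-example")
    val ssc = new StreamingContext(conf, Durations.seconds(10))
    ssc.checkpoint(checkpointDir) // checkpoint dir is set only inside the creating function
    buildPipeline(ssc)            // the DAG is also set up in here, not outside
    ssc
  }

  // Fresh start: createStreamingContext() runs and the resulting DAG is checkpointed.
  // Restart: the context, DAG included, is rebuilt from checkpointDir and the
  // creating function is not invoked.
  val ssc = StreamingContext.getOrCreate(checkpointDir, createStreamingContext _)
  ssc.start()
  ssc.awaitTermination()
}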

Hope this helps,
NB






Re: kafka + Spark Streaming with checkPointing fails to restart

2015-05-13 Thread Cody Koeninger
> The relevant source is:
>
> class RawLogProcessor(ssc: StreamingContext, topic: String, kafkaParams: Map[String, String]) {
>   // create kafka stream
>   val rawlogDStream = KafkaUtils.createDirectStream[String, Object, StringDecoder, KafkaAvroDecoder](
>     ssc, kafkaParams, Set(topic))
>   //KafkaUtils.createStream[String, Object, StringDecoder, KafkaAvroDecoder](
>   //  ssc, kafkaParams, Map("qa-rawlogs" -> 10), StorageLevel.MEMORY_AND_DISK_2)
>
>   val eventStream = rawlogDStream
>     .map({
>       case (key, rawlogVal) =>
>         val record = rawlogVal.asInstanceOf[GenericData.Record]
>         val rlog = RawLog.newBuilder()
>           .setId(record.get("id").asInstanceOf[String])
>           .setAccount(record.get("account").asInstanceOf[String])
>           .setEvent(record.get("event").asInstanceOf[String])
>           .setTimestamp(record.get("timestamp").asInstanceOf[Long])
>           .setUserAgent(record.get("user_agent").asInstanceOf[String])
>           .setParams(record.get("params").asInstanceOf[java.util.Map[String, String]])
>           .build()
>         val norm = Normalizer(rlog)
>         (key, rlog.getEvent, norm)
>     })
>
>   val videoViewStream = eventStream
>     .filter(_._2 == "video_view")
>     .filter(_._3.isDefined)
>     .map((z) => (z._1, z._3.get))
>     .map((z) => (z._1, z._2.asInstanceOf[VideoView]))
>     .cache()
>
>   // repartition by (deviceType, DeviceOS)
>   val deviceTypeVideoViews = videoViewStream.map((v) => ((v._2.getDeviceType, v._2.getDeviceOs), 1))
>     .reduceByKeyAndWindow(_ + _, Durations.seconds(10))
>     .print()
> }
>
> object RawLogProcessor extends Logging {
>
>   /**
>    * If str is surrounded by quotes it returns the content between the quotes.
>    */
>   def unquote(str: String) = {
>     if (str != null && str.length >= 2 && str.charAt(0) == '\"' && str.charAt(str.length - 1) == '\"')
>       str.substring(1, str.length - 1)
>     else
>       str
>   }
>
>   val checkpointDir = "/tmp/checkpointDir_tacoma"
>   var sparkConfig: Config = _
>   var ssc: StreamingContext = _
>   var processor: Option[RawLogProcessor] = None
>
>   val createContext: () => StreamingContext = () => {
>     val batchDurationSecs = sparkConfig.getDuration("streaming.batch_duration", TimeUnit.SECONDS)
>     val sparkConf = new SparkConf()
>     sparkConf.registerKryoClasses(Array(classOf[VideoView], classOf[RawLog],
>       classOf[VideoEngagement], classOf[VideoImpression]))
>     sparkConfig.entrySet.asScala
>       .map(kv => kv.getKey -> kv.getValue)
>       .foreach {
>         case (k, v) =>
>           val value = unquote(v.render())
>           logInfo(s"spark.$k = $value")
>           sparkConf.set(s"spark.$k", value)
>       }
>
>     // calculate sparkContext and streamingContext
>     new StreamingContext(sparkConf, Durations.seconds(batchDurationSecs))
>   }
>
>   def createProcessor(sparkConf: Config, kafkaConf: Config): RawLogProcessor = {
>     sparkConfig = sparkConf
>     ssc = StreamingContext.getOrCreate(checkpointPath = checkpointDir,
>       creatingFunc = createContext, createOnError = true)
>     ssc.checkpoint(checkpointDir)
>     // kafkaProperties
>     val kafkaParams = kafkaConf.entrySet.asScala
>       .map(kv => kv.getKey -> unquote(kv.getValue.render()))
>       .toMap
>
>     logInfo(s"Initializing kafkaParams = $kafkaParams")
>     // create processor
>     new RawLogProcessor(ssc, kafkaConf.getString("rawlog.topic"), kafkaParams)
>   }
>
>   def apply(sparkConfig: Config, kafkaConf: Config) = {
>     if (processor.isEmpty) {
>       processor = Some(createProcessor(sparkConfig, kafkaConf))
>     }
>     processor.get
>   }
>
>   def start() = {
>     ssc.start()
>     ssc.awaitTermination()
>   }
>
> }
>
> Extended logs:
> https://gist.githubusercontent.com/ankurcha/f35df63f0d8a99da0be4/raw/ec96b932540ac87577e4ce8385d26699c1a7d05e/spark-console.log
>
> Could someone tell me what causes this problem? I tried looking at
> the stacktrace, but I am not familiar enough with the codebase to make
> solid assertions. Any ideas as to what may be happening here?
>
> -- Ankur Chauhan


kafka + Spark Streaming with checkPointing fails to restart

2015-05-13 Thread ankurcha
          .setParams(record.get("params").asInstanceOf[java.util.Map[String, String]])
          .build()
        val norm = Normalizer(rlog)
        (key, rlog.getEvent, norm)
    })

  val videoViewStream = eventStream
    .filter(_._2 == "video_view")
    .filter(_._3.isDefined)
    .map((z) => (z._1, z._3.get))
    .map((z) => (z._1, z._2.asInstanceOf[VideoView]))
    .cache()

  // repartition by (deviceType, DeviceOS)
  val deviceTypeVideoViews = videoViewStream.map((v) => ((v._2.getDeviceType, v._2.getDeviceOs), 1))
    .reduceByKeyAndWindow(_ + _, Durations.seconds(10))
    .print()
}

object RawLogProcessor extends Logging {

  /**
   * If str is surrounded by quotes it returns the content between the quotes.
   */
  def unquote(str: String) = {
    if (str != null && str.length >= 2 && str.charAt(0) == '\"' && str.charAt(str.length - 1) == '\"')
      str.substring(1, str.length - 1)
    else
      str
  }
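  // Note: ConfigValue.render() yields quoted strings such as "\"local[4]\"",
  // so unquote("\"local[4]\"") returns "local[4]" while an already-unquoted
  // input is returned unchanged; that is why values are unquoted before
  // being set on SparkConf below.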

  val checkpointDir = "/tmp/checkpointDir_tacoma"
  var sparkConfig: Config = _
  var ssc: StreamingContext = _
  var processor: Option[RawLogProcessor] = None

  val createContext: () => StreamingContext = () => {
    val batchDurationSecs = sparkConfig.getDuration("streaming.batch_duration", TimeUnit.SECONDS)
    val sparkConf = new SparkConf()
    sparkConf.registerKryoClasses(Array(classOf[VideoView], classOf[RawLog],
      classOf[VideoEngagement], classOf[VideoImpression]))
    sparkConfig.entrySet.asScala
      .map(kv => kv.getKey -> kv.getValue)
      .foreach {
        case (k, v) =>
          val value = unquote(v.render())
          logInfo(s"spark.$k = $value")
          sparkConf.set(s"spark.$k", value)
      }

    // calculate sparkContext and streamingContext
    new StreamingContext(sparkConf, Durations.seconds(batchDurationSecs))
  }

  def createProcessor(sparkConf: Config, kafkaConf: Config): RawLogProcessor = {
    sparkConfig = sparkConf
    ssc = StreamingContext.getOrCreate(checkpointPath = checkpointDir,
      creatingFunc = createContext, createOnError = true)
    ssc.checkpoint(checkpointDir)
    // kafkaProperties
    val kafkaParams = kafkaConf.entrySet.asScala
      .map(kv => kv.getKey -> unquote(kv.getValue.render()))
      .toMap

    logInfo(s"Initializing kafkaParams = $kafkaParams")
    // create processor
    new RawLogProcessor(ssc, kafkaConf.getString("rawlog.topic"), kafkaParams)
  }

  def apply(sparkConfig: Config, kafkaConf: Config) = {
    if (processor.isEmpty) {
      processor = Some(createProcessor(sparkConfig, kafkaConf))
    }
    processor.get
  }

  def start() = {
    ssc.start()
    ssc.awaitTermination()
  }

}

Extended logs:
https://gist.githubusercontent.com/ankurcha/f35df63f0d8a99da0be4/raw/ec96b932540ac87577e4ce8385d26699c1a7d05e/spark-console.log

Could someone tell me what causes this problem? I tried looking at
the stacktrace, but I am not familiar enough with the codebase to make
solid assertions. Any ideas as to what may be happening here?

-- Ankur Chauhan







kafka + Spark Streaming with checkPointing fails to restart

2015-05-13 Thread Ankur Chauhan

Hi,

I have a simple application which fails with the following exception
only when the application is restarted (i.e. the checkpointDir has
entries from a previous execution):

Exception in thread "main" org.apache.spark.SparkException: org.apache.spark.streaming.dstream.ShuffledDStream@2264e43c has not been initialized
    at org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:266)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
    at scala.Option.orElse(Option.scala:257)
    at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
    at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
    at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:227)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:222)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:222)
    at org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:90)
    at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:67)
    at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:512)
    at com.brightcove.analytics.tacoma.RawLogProcessor$.start(RawLogProcessor.scala:115)
    at com.brightcove.analytics.tacoma.Main$delayedInit$body.apply(Main.scala:15)
    at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
    at scala.App$$anonfun$main$1.apply(App.scala:71)
    at scala.App$$anonfun$main$1.apply(App.scala:71)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
    at scala.App$class.main(App.scala:71)
    at com.brightcove.analytics.tacoma.Main$.main(Main.scala:5)
    at com.brightcove.analytics.tacoma.Main.main(Main.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:483)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

The relevant source is:

class RawLogProcessor(ssc: StreamingContext, topic: String, kafkaParams: Map[String, String]) {
  // create kafka stream
  val rawlogDStream = KafkaUtils.createDirectStream[String, Object, StringDecoder, KafkaAvroDecoder](
    ssc, kafkaParams, Set(topic))
  //KafkaUtils.createStream[String, Object, StringDecoder, KafkaAvroDecoder](
  //  ssc, kafkaParams, Map("qa-rawlogs" -> 10), StorageLevel.MEMORY_AND_DISK_2)

  val eventStream = rawlogDStream
    .map({
      case (key, rawlogVal) =>
        val record = rawlogVal.asInstanceOf[GenericData.Record]
        val rlog = RawLog.newBuilder()
          .setId(record.get("id").asInstanceOf[String])
          .setAccount(record.get("account").asInstanceOf[String])
          .setEvent(record.get("event").asInstanceOf[String])
          .setTimestamp(record.get("timestamp").asInstanceOf[Long])
          .setUserAgent(record.get("user_agent").asInstanceOf[String])
          .setParams(record.get("params").asInstanceOf[java.util.Map[String, String]])
