Hi,
I have a simple application which fails with the following exception
only when the application is restarted (i.e. the checkpointDir has
entires from a previous execution):
Exception in thread "main" org.apache.spark.SparkException:
org.apache.spark.streaming.dstream.ShuffledDStream@2264e43c has not
been initialized
at
org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:266
)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply
(DStream.scala:287)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply
(DStream.scala:287)
at scala.Option.orElse(Option.scala:257)
at
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:28
4)
at
org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDSt
ream.scala:38)
at
org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.sc
ala:116)
at
org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.sc
ala:116)
at
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLik
e.scala:251)
at
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLik
e.scala:251)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.sca
la:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251
)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
at
org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:
116)
at
org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.app
ly(JobGenerator.scala:227)
at
org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.app
ly(JobGenerator.scala:222)
at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.s
cala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at
org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.s
cala:222)
at
org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.sca
la:90)
at
org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.sca
la:67)
at
org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala
:512)
at
com.brightcove.analytics.tacoma.RawLogProcessor$.start(RawLogProcessor.s
cala:115)
at
com.brightcove.analytics.tacoma.Main$delayedInit$body.apply(Main.scala:1
5)
at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
at
scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main$1.apply(App.scala:71)
at scala.App$$anonfun$main$1.apply(App.scala:71)
at scala.collection.immutable.List.foreach(List.scala:318)
at
scala.collection.generic.TraversableForwarder$class.foreach(TraversableF
orwarder.scala:32)
at scala.App$class.main(App.scala:71)
at com.brightcove.analytics.tacoma.Main$.main(Main.scala:5)
at com.brightcove.analytics.tacoma.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.jav
a:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessor
Impl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit
$$runMain(SparkSubmit.scala:569)
at
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
The relavant source is:
class RawLogProcessor(ssc: StreamingContext, topic: String,
kafkaParams: Map[String, String]) {
// create kafka stream
val rawlogDStream = KafkaUtils.createDirectStream[String, Object,
StringDecoder, KafkaAvroDecoder](ssc, kafkaParams, Set(topic))
//KafkaUtils.createStream[String, Object, StringDecoder,
KafkaAvroDecoder](ssc, kafkaParams, Map("qa-rawlogs" -> 10),
StorageLevel.MEMORY_AND_DISK_2)
val eventStream = rawlogDStream
.map({
case (key, rawlogVal) =>
val record = rawlogVal.asInstanceOf[GenericData.Record]
val rlog = RawLog.newBuilder()
.setId(record.get("id").asInstanceOf[String])
.setAccount(record.get("account").asInstanceOf[String])
.setEvent(record.get("event").asInstanceOf[String])
.setTimestamp(record.get("timestamp").asInstanceOf[Long])
.setUserAgent(record.get("user_agent").asInstanceOf[String])
.setParams(record.get("params").asInstanceOf[java.util.Map[String,
String]])
.build()
val norm = Normalizer(rlog)
(key, rlog.getEvent, norm)
})
val videoViewStream = eventStream
.filter(_._2 == "video_view")
.filter(_._3.isDefined)
.map((z) => (z._1, z._3.get))
.map((z) => (z._1, z._2.asInstanceOf[VideoView]))
.cache()
// repartition by (deviceType, DeviceOS)
val deviceTypeVideoViews = videoViewStream.map((v) =>
((v._2.getDeviceType, v._2.getDeviceOs), 1))
.reduceByKeyAndWindow(_ + _, Durations.seconds(10))
.print()
}
object RawLogProcessor extends Logging {
/**
* If str is surrounded by quotes it return the content between the
quotes
*/
def unquote(str: String) = {
if (str != null && str.length >= 2 && str.charAt(0) == '\"' &&
str.charAt(str.length - 1) == '\"')
str.substring(1, str.length - 1)
else
str
}
val checkpointDir = "/tmp/checkpointDir_tacoma"
var sparkConfig: Config = _
var ssc: StreamingContext = _
var processor: Option[RawLogProcessor] = None
val createContext: () => StreamingContext = () => {
val batchDurationSecs =
sparkConfig.getDuration("streaming.batch_duration", TimeUnit.SECONDS)
val sparkConf = new SparkConf()
sparkConf.registerKryoClasses(Array(classOf[VideoView],
classOf[RawLog], classOf[VideoEngagement], classOf[VideoImpression]))
sparkConfig.entrySet.asScala
.map(kv => kv.getKey -> kv.getValue)
.foreach {
case (k, v) =>
val value = unquote(v.render())
logInfo(s"spark.$k = $value")
sparkConf.set(s"spark.$k", value)
}
// calculate sparkContext and streamingContext
new StreamingContext(sparkConf, Durations.seconds(batchDurationSecs))
}
def createProcessor(sparkConf: Config, kafkaConf: Config):
RawLogProcessor = {
sparkConfig = sparkConf
ssc = StreamingContext.getOrCreate(checkpointPath = checkpointDir,
creatingFunc = createContext, createOnError = true)
ssc.checkpoint(checkpointDir)
// kafkaProperties
val kafkaParams = kafkaConf.entrySet.asScala
.map(kv => kv.getKey -> unquote(kv.getValue.render()))
.toMap
logInfo(s"Initializing kafkaParams = $kafkaParams")
// create processor
new RawLogProcessor(ssc, kafkaConf.getString("rawlog.topic"),
kafkaParams)
}
def apply(sparkConfig: Config, kafkaConf: Config) = {
if (processor.isEmpty) {
processor = Some(createProcessor(sparkConfig, kafkaConf))
}
processor.get
}
def start() = {
ssc.start()
ssc.awaitTermination()
}
}
Extended logs:
https://gist.githubusercontent.com/ankurcha/f35df63f0d8a99da0be4/raw/ec9
6b932540ac87577e4ce8385d26699c1a7d05e/spark-console.log
Could someone tell me what it causes this problem? I tried looking at
the stacktrace but I am not very familiar with the codebase to make
solid assertions.
Any ideas as to what may be happening here.
--- Ankur Chauhan
--
View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/kafka-Spark-Streaming-with-checkPointing-fails-to-restart-tp22864.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]