Any solutions for this problem, please? Sent from my iPhone
> On Jan 17, 2018, at 10:39 PM, KhajaAsmath Mohammed <mdkhajaasm...@gmail.com> > wrote: > > Hi, > > I have created a streaming object from checkpoint but it always throws up > error as stream corrupted when I restart spark streaming job. any solution > for this? > > private def createStreamingContext( > sparkCheckpointDir: String, sparkSession: SparkSession, > batchDuration: Int, config: com.typesafe.config.Config) = { > val topics = config.getString(Constants.Properties.KafkaTopics) > val topicsSet = topics.split(",").toSet > val kafkaParams = Map[String, String]("metadata.broker.list" -> > config.getString(Constants.Properties.KafkaBrokerList)) > val ssc = new StreamingContext(sparkSession.sparkContext, > Seconds(batchDuration)) > val messages = KafkaUtils.createDirectStream[String, String, > StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet) > val datapointDStream = > messages.map(_._2).map(TransformDatapoint.parseDataPointText) > lazy val sqlCont = sparkSession.sqlContext > > hiveDBInstance = config.getString("hiveDBInstance") > > TransformDatapoint.readDstreamData(sparkSession, sqlCont, > datapointDStream, runMode, includeIndex, indexNum, datapointTmpTableName, > fencedDPTmpTableName, fencedVINDPTmpTableName, hiveDBInstance) > > ssc.checkpoint(sparkCheckpointDir) > ssc > } > > > > // calling streaming context method > > val streamingContext = > StreamingContext.getOrCreate(config.getString(Constants.Properties.CheckPointDir), > () => > createStreamingContext(config.getString(Constants.Properties.CheckPointDir), > sparkSession, config.getInt(Constants.Properties.BatchInterval), config)) > > ERROR: > org.apache.spark.SparkException: Failed to read checkpoint from directory > hdfs://prodnameservice1/user/yyy1k78/KafkaCheckPointNTDSC > > java.io.IOException: Stream is corrupted > > > Thanks, > Asmath