[ https://issues.apache.org/jira/browse/SPARK-19680?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16241577#comment-16241577 ]
Serkan Taş commented on SPARK-19680:
------------------------------------

I have the same issue, on a YARN cluster: a job that had been running for 4 days was terminated with the same error:

{code:title=error.log|borderStyle=solid}
diagnostics: User class threw exception: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 18388.0 failed 4 times, most recent failure: Lost task 0.3 in stage 18388.0 (TID 21387, server, executor 1): org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {topic-0=33436703}
Exception in thread "main" org.apache.spark.SparkException: Application application_fdsfdsfsdfsdf_0001 finished with failed status
{code}

Hadoop: 2.8.0
Spark: 2.1.0
Kafka: 0.10.2.1

Configuration:

{code:title=KafkaParams.Scala|borderStyle=solid}
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "brokers",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "grp_id",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)
{code}
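A quick way to check whether retention is the culprit (a sketch under stated assumptions, not part of the original report): ask the broker which offset range it still retains for the failing partition and compare it with the offset from the error. The topic name and partition number below are read off the error message ({topic-0=33436703}), and `kafkaParams` is the map shown above.

{code:title=OffsetProbe.Scala|borderStyle=solid}
import java.util.Arrays

import scala.collection.JavaConverters._

import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

// Topic and partition taken from the error message: {topic-0=33436703}
val tp = new TopicPartition("topic", 0)
val probe = new KafkaConsumer[String, String](kafkaParams.asJava)
try {
  // Earliest and latest offsets the broker still retains (Kafka >= 0.10.1)
  val earliest = probe.beginningOffsets(Arrays.asList(tp)).get(tp)
  val latest   = probe.endOffsets(Arrays.asList(tp)).get(tp)
  // If the failing offset 33436703 is below `earliest`, retention has
  // already deleted it, and the executor-side auto.offset.reset=none
  // override turns the gap into a fatal OffsetOutOfRangeException.
  println(s"retained offset range for $tp: [$earliest, $latest)")
} finally probe.close()
{code}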
> Offsets out of range with no configured reset policy for partitions
> -------------------------------------------------------------------
>
>                 Key: SPARK-19680
>                 URL: https://issues.apache.org/jira/browse/SPARK-19680
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams
>    Affects Versions: 2.1.0
>            Reporter: Schakmann Rene
>
> I'm using Spark Streaming with Kafka to build a top list. I want to read all the messages in Kafka, so I set
>   "auto.offset.reset" -> "earliest"
> Nevertheless, when I start the job on our Spark cluster it fails with:
> Error:
> {code:title=error.log|borderStyle=solid}
> Job aborted due to stage failure: Task 2 in stage 111.0 failed 4 times, most recent failure: Lost task 2.3 in stage 111.0 (TID 1270, 194.232.55.23, executor 2): org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {SearchEvents-2=161803385}
> {code}
> This seems wrong, because I did set the auto.offset.reset property.
> Setup:
> Kafka parameters:
> {code:title=Config.Scala|borderStyle=solid}
> def getDefaultKafkaReceiverParameter(properties: Properties): Map[String, Object] = {
>   Map(
>     "bootstrap.servers" -> properties.getProperty("kafka.bootstrap.servers"),
>     "group.id" -> properties.getProperty("kafka.consumer.group"),
>     "auto.offset.reset" -> "earliest",
>     "spark.streaming.kafka.consumer.cache.enabled" -> "false",
>     "enable.auto.commit" -> "false",
>     "key.deserializer" -> classOf[StringDeserializer],
>     "value.deserializer" -> "at.willhaben.sid.DTOByteDeserializer")
> }
> {code}
> Job:
> {code:title=Job.Scala|borderStyle=solid}
> def processSearchKeyWords(stream: InputDStream[ConsumerRecord[String, Array[Byte]]], windowDuration: Int, slideDuration: Int, kafkaSink: Broadcast[KafkaSink[TopList]]): Unit = {
>   getFilteredStream(stream.map(_.value()), windowDuration, slideDuration).foreachRDD(rdd => {
>     val topList = new TopList
>     topList.setCreated(new Date())
>     topList.setTopListEntryList(rdd.take(TopListLength).toList)
>     CurrentLogger.info("TopList length: " + topList.getTopListEntryList.size().toString)
>     kafkaSink.value.send(SendToTopicName, topList)
>     CurrentLogger.info("Last Run: " + System.currentTimeMillis())
>   })
> }
>
> def getFilteredStream(result: DStream[Array[Byte]], windowDuration: Int, slideDuration: Int): DStream[TopListEntry] = {
>   val Mapper = MapperObject.readerFor[SearchEventDTO]
>   result.repartition(100).map(s => Mapper.readValue[SearchEventDTO](s))
>     .filter(s => s != null && s.getSearchRequest != null && s.getSearchRequest.getSearchParameters != null && s.getVertical == Vertical.BAP && s.getSearchRequest.getSearchParameters.containsKey(EspParameterEnum.KEYWORD.getName))
>     .map(row => {
>       val name = row.getSearchRequest.getSearchParameters.get(EspParameterEnum.KEYWORD.getName).getEspSearchParameterDTO.getValue.toLowerCase()
>       (name, new TopListEntry(name, 1, row.getResultCount))
>     })
>     .reduceByKeyAndWindow(
>       (a: TopListEntry, b: TopListEntry) => new TopListEntry(a.getKeyword, a.getSearchCount + b.getSearchCount, a.getMeanSearchHits + b.getMeanSearchHits),
>       (a: TopListEntry, b: TopListEntry) => new TopListEntry(a.getKeyword, a.getSearchCount - b.getSearchCount, a.getMeanSearchHits - b.getMeanSearchHits),
>       Minutes(windowDuration),
>       Seconds(slideDuration))
>     .filter((x: (String, TopListEntry)) => x._2.getSearchCount > 200L)
>     .map(row => (row._2.getSearchCount, row._2))
>     .transform(rdd => rdd.sortByKey(ascending = false))
>     .map(row => new TopListEntry(row._2.getKeyword, row._2.getSearchCount, row._2.getMeanSearchHits / row._2.getSearchCount))
> }
>
> def main(properties: Properties): Unit = {
>   val sparkSession = SparkUtil.getDefaultSparkSession(properties, TaskName)
>   val kafkaSink = sparkSession.sparkContext.broadcast(KafkaSinkUtil.apply[TopList](SparkUtil.getDefaultSparkProperties(properties)))
>   val kafkaParams: Map[String, Object] = SparkUtil.getDefaultKafkaReceiverParameter(properties)
>   val ssc = new StreamingContext(sparkSession.sparkContext, Seconds(30))
>   ssc.checkpoint("/home/spark/checkpoints")
>   val adEventStream =
>     KafkaUtils.createDirectStream[String, Array[Byte]](ssc, PreferConsistent, Subscribe[String, Array[Byte]](Array(ReadFromTopicName), kafkaParams))
>   processSearchKeyWords(adEventStream, SparkUtil.getWindowDuration(properties), SparkUtil.getSlideDuration(properties), kafkaSink)
>   ssc.start()
>   ssc.awaitTermination()
> }
> {code}
> As I saw in the KafkaUtils code:
> {code:title=KafkaUtils.Scala|borderStyle=solid}
> logWarning(s"overriding ${ConsumerConfig.AUTO_OFFSET_RESET_CONFIG} to none for executor")
> kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")
> {code}
> This means that as soon as one worker is assigned a Kafka partition whose offset can no longer be processed, because the offset is no longer valid due to the retention policy, the streaming job will stop working.
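A possible driver-side mitigation, sketched below rather than a definitive fix: compute safe starting offsets with a plain KafkaConsumer before creating the direct stream, so the executors (which always run with auto.offset.reset=none, per the override above) never start from an offset that retention has already deleted. `ssc`, `kafkaParams` and `ReadFromTopicName` are the names from the job above; `clampedStartingOffsets` is a hypothetical helper introduced here. Note that this only protects the job at startup: offsets that expire while the job is running would still raise OffsetOutOfRangeException.

{code:title=Workaround.Scala|borderStyle=solid}
import scala.collection.JavaConverters._

import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

// Hypothetical helper: clamp the group's committed offsets to the earliest
// offsets the broker still retains, so none of them is out of range.
def clampedStartingOffsets(kafkaParams: Map[String, Object],
                           topic: String): Map[TopicPartition, Long] = {
  val consumer = new KafkaConsumer[String, Array[Byte]](kafkaParams.asJava)
  try {
    val partitions = consumer.partitionsFor(topic).asScala
      .map(p => new TopicPartition(p.topic, p.partition))
    // Earliest offset still available per partition (Kafka >= 0.10.1)
    val earliest = consumer.beginningOffsets(partitions.asJava).asScala
      .map { case (tp, off) => tp -> off.longValue }
    partitions.map { tp =>
      // committed() returns null when the group has no offset for tp
      val committed = Option(consumer.committed(tp)).map(_.offset)
      // Fall back to, or clamp up to, the earliest retained offset
      tp -> math.max(committed.getOrElse(earliest(tp)), earliest(tp))
    }.toMap
  } finally consumer.close()
}

// Pass the explicit offsets to the Subscribe strategy, so the executors
// begin from offsets that are known to exist on the broker:
val fromOffsets = clampedStartingOffsets(kafkaParams, ReadFromTopicName)
val adEventStream = KafkaUtils.createDirectStream[String, Array[Byte]](
  ssc, PreferConsistent,
  Subscribe[String, Array[Byte]](Array(ReadFromTopicName), kafkaParams, fromOffsets))
{code}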