[ https://issues.apache.org/jira/browse/SPARK-19680?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Shixiong Zhu updated SPARK-19680:
---------------------------------
    Component/s:     (was: Structured Streaming)
                     DStreams

> Offsets out of range with no configured reset policy for partitions
> -------------------------------------------------------------------
>
>                 Key: SPARK-19680
>                 URL: https://issues.apache.org/jira/browse/SPARK-19680
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams
>    Affects Versions: 2.1.0
>            Reporter: Schakmann Rene
>
> I'm using Spark Streaming with Kafka to build a top list. I want to read all the messages in Kafka, so I set
>   "auto.offset.reset" -> "earliest"
> Nevertheless, when I start the job on our Spark cluster it fails with:
> Error:
> {code:title=error.log|borderStyle=solid}
> Job aborted due to stage failure: Task 2 in stage 111.0 failed 4 times,
> most recent failure: Lost task 2.3 in stage 111.0 (TID 1270, 194.232.55.23,
> executor 2): org.apache.kafka.clients.consumer.OffsetOutOfRangeException:
> Offsets out of range with no configured reset policy for partitions:
> {SearchEvents-2=161803385}
> {code}
> This seems wrong, because I did set the auto.offset.reset property.
> Setup:
> Kafka parameters:
> {code:title=Config.Scala|borderStyle=solid}
> def getDefaultKafkaReceiverParameter(properties: Properties): Map[String, Object] = {
>   Map(
>     "bootstrap.servers" -> properties.getProperty("kafka.bootstrap.servers"),
>     "group.id" -> properties.getProperty("kafka.consumer.group"),
>     "auto.offset.reset" -> "earliest",
>     "spark.streaming.kafka.consumer.cache.enabled" -> "false",
>     "enable.auto.commit" -> "false",
>     "key.deserializer" -> classOf[StringDeserializer],
>     "value.deserializer" -> "at.willhaben.sid.DTOByteDeserializer")
> }
> {code}
> Job:
> {code:title=Job.Scala|borderStyle=solid}
> def processSearchKeyWords(stream: InputDStream[ConsumerRecord[String, Array[Byte]]], windowDuration: Int, slideDuration: Int, kafkaSink: Broadcast[KafkaSink[TopList]]): Unit = {
>   getFilteredStream(stream.map(_.value()), windowDuration, slideDuration).foreachRDD(rdd => {
>     val topList = new TopList
>     topList.setCreated(new Date())
>     topList.setTopListEntryList(rdd.take(TopListLength).toList)
>     CurrentLogger.info("TopList length: " + topList.getTopListEntryList.size().toString)
>     kafkaSink.value.send(SendToTopicName, topList)
>     CurrentLogger.info("Last Run: " + System.currentTimeMillis())
>   })
> }
>
> def getFilteredStream(result: DStream[Array[Byte]], windowDuration: Int, slideDuration: Int): DStream[TopListEntry] = {
>   val Mapper = MapperObject.readerFor[SearchEventDTO]
>   result.repartition(100).map(s => Mapper.readValue[SearchEventDTO](s))
>     .filter(s => s != null && s.getSearchRequest != null && s.getSearchRequest.getSearchParameters != null &&
>       s.getVertical == Vertical.BAP &&
>       s.getSearchRequest.getSearchParameters.containsKey(EspParameterEnum.KEYWORD.getName))
>     .map(row => {
>       val name = row.getSearchRequest.getSearchParameters.get(EspParameterEnum.KEYWORD.getName).getEspSearchParameterDTO.getValue.toLowerCase()
>       (name, new TopListEntry(name, 1, row.getResultCount))
>     })
>     .reduceByKeyAndWindow(
>       (a: TopListEntry, b: TopListEntry) => new TopListEntry(a.getKeyword, a.getSearchCount + b.getSearchCount, a.getMeanSearchHits + b.getMeanSearchHits),
>       (a: TopListEntry, b: TopListEntry) => new TopListEntry(a.getKeyword, a.getSearchCount - b.getSearchCount, a.getMeanSearchHits - b.getMeanSearchHits),
>       Minutes(windowDuration),
>       Seconds(slideDuration))
>     .filter((x: (String, TopListEntry)) => x._2.getSearchCount > 200L)
>     .map(row => (row._2.getSearchCount, row._2))
>     .transform(rdd => rdd.sortByKey(ascending = false))
>     .map(row => new TopListEntry(row._2.getKeyword, row._2.getSearchCount, row._2.getMeanSearchHits / row._2.getSearchCount))
> }
>
> def main(properties: Properties): Unit = {
>   val sparkSession = SparkUtil.getDefaultSparkSession(properties, TaskName)
>   val kafkaSink = sparkSession.sparkContext.broadcast(KafkaSinkUtil.apply[TopList](SparkUtil.getDefaultSparkProperties(properties)))
>   val kafkaParams: Map[String, Object] = SparkUtil.getDefaultKafkaReceiverParameter(properties)
>   val ssc = new StreamingContext(sparkSession.sparkContext, Seconds(30))
>   ssc.checkpoint("/home/spark/checkpoints")
>   val adEventStream =
>     KafkaUtils.createDirectStream[String, Array[Byte]](ssc, PreferConsistent, Subscribe[String, Array[Byte]](Array(ReadFromTopicName), kafkaParams))
>   processSearchKeyWords(adEventStream, SparkUtil.getWindowDuration(properties), SparkUtil.getSlideDuration(properties), kafkaSink)
>   ssc.start()
>   ssc.awaitTermination()
> }
> {code}
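For reference, the expectation described above matches the plain Kafka consumer contract: a standalone 0.10 consumer configured with the same "auto.offset.reset" -> "earliest" setting resets an out-of-range position to the beginning of the log instead of raising OffsetOutOfRangeException. The sketch below only illustrates that expectation; the broker address, group id, and topic name are placeholders and are not taken from the job above.

{code:title=OffsetResetCheck.scala|borderStyle=solid}
import java.util.{Collections, Properties}

import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}

object OffsetResetCheck {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "broker:9092")            // placeholder broker
    props.put("group.id", "offset-reset-check")               // fresh group with no committed offsets
    props.put("auto.offset.reset", "earliest")                 // same setting as in Config.Scala
    props.put("enable.auto.commit", "false")
    props.put("key.deserializer", classOf[StringDeserializer].getName)
    props.put("value.deserializer", classOf[ByteArrayDeserializer].getName)

    val consumer = new KafkaConsumer[String, Array[Byte]](props)
    consumer.subscribe(Collections.singletonList("SearchEvents"))

    // With "earliest" in effect, an invalid or out-of-range position is reset to the
    // start of the partition rather than throwing OffsetOutOfRangeException.
    val records = consumer.poll(5000L)
    println(s"fetched ${records.count()} records")
    consumer.close()
  }
}
{code}

Running a sketch like this against the same topic and a fresh group would show the consumer silently seeking to the earliest available offset, which is the behaviour the reporter expects from the direct stream as well.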