[ https://issues.apache.org/jira/browse/SPARK-19680?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Schakmann Rene updated SPARK-19680:
-----------------------------------
    Description: 
I'm using Spark Streaming with Kafka to build a top list. I want to read all the messages in Kafka, so I set

   "auto.offset.reset" -> "earliest"

Nevertheless, when I start the job on our Spark cluster it does not work and I get:

Error:

{code:borderStyle=solid}
Job aborted due to stage failure: Task 2 in stage 111.0 failed 4 times, most recent failure: Lost task 2.3 in stage 111.0 (TID 1270, 194.232.55.23, executor 2): org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {SearchEvents-2=161803385}
{code}

This seems wrong, because I did set the auto.offset.reset property.
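
For comparison, a plain KafkaConsumer with the same setting does not fail when it has no valid position; it simply starts from the earliest retained offset. A minimal stand-alone sketch (broker, group id and the bare use of the SearchEvents topic are placeholders, not taken from our real setup):

{code:borderStyle=solid}
// Stand-alone sketch, not part of the job; broker/group values are placeholders.
import java.util.{Collections, Properties}

import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}

object EarliestSmokeTest {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")   // placeholder broker
    props.put("group.id", "toplist-debug")             // placeholder group with no committed offsets
    props.put("auto.offset.reset", "earliest")         // same setting as in the job
    props.put("enable.auto.commit", "false")
    props.put("key.deserializer", classOf[StringDeserializer].getName)
    props.put("value.deserializer", classOf[ByteArrayDeserializer].getName)

    val consumer = new KafkaConsumer[String, Array[Byte]](props)
    consumer.subscribe(Collections.singletonList("SearchEvents"))
    // With "earliest" and no valid position the consumer resets to the first retained
    // offset instead of throwing OffsetOutOfRangeException.
    val records = consumer.poll(5000L)                 // kafka-clients 0.10.x poll(timeoutMs)
    println(s"fetched ${records.count()} records")
    consumer.close()
  }
}
{code}

That is the behaviour I expected from the streaming job as well.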

Setup:

Kafka Parameter:

{code:borderStyle=solid}
  def getDefaultKafkaReceiverParameter(properties: Properties): Map[String, Object] = {
    Map(
      "bootstrap.servers" -> properties.getProperty("kafka.bootstrap.servers"),
      "group.id" -> properties.getProperty("kafka.consumer.group"),
      "auto.offset.reset" -> "earliest",
      "spark.streaming.kafka.consumer.cache.enabled" -> "false",
      "enable.auto.commit" -> "false",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> "at.willhaben.sid.DTOByteDeserializer")
  }
{code}
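
at.willhaben.sid.DTOByteDeserializer is our own class and is not shown here; it is essentially just a pass-through byte[] deserializer, roughly along these lines (a sketch for readers, not the real source):

{code:borderStyle=solid}
// Rough stand-in for at.willhaben.sid.DTOByteDeserializer: hands the raw record
// bytes through unchanged so the job can map them to the DTO itself.
import java.util

import org.apache.kafka.common.serialization.Deserializer

class DTOByteDeserializer extends Deserializer[Array[Byte]] {
  override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()
  override def deserialize(topic: String, data: Array[Byte]): Array[Byte] = data
  override def close(): Unit = ()
}
{code}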

Job:

{code:borderStyle=solid}
  // Builds the windowed top list and publishes it to Kafka on every batch.
  def processSearchKeyWords(stream: InputDStream[ConsumerRecord[String, Array[Byte]]], windowDuration: Int, slideDuration: Int, kafkaSink: Broadcast[KafkaSink[TopList]]): Unit = {
    getFilteredStream(stream.map(_.value()), windowDuration, slideDuration).foreachRDD(rdd => {
      val topList = new TopList
      topList.setCreated(new Date())
      topList.setTopListEntryList(rdd.take(TopListLength).toList)
      CurrentLogger.info("TopList length: " + topList.getTopListEntryList.size().toString)
      kafkaSink.value.send(SendToTopicName, topList)
      CurrentLogger.info("Last Run: " + System.currentTimeMillis())
    })
  }

  // Deserializes the search events, keeps keyword searches from the BAP vertical and
  // aggregates them per keyword over a sliding window.
  def getFilteredStream(result: DStream[Array[Byte]], windowDuration: Int, slideDuration: Int): DStream[TopListEntry] = {

    val Mapper = MapperObject.readerFor[SearchEventDTO]

    result.repartition(100).map(s => Mapper.readValue[SearchEventDTO](s))
      .filter(s => s != null && s.getSearchRequest != null && s.getSearchRequest.getSearchParameters != null &&
        s.getVertical == Vertical.BAP &&
        s.getSearchRequest.getSearchParameters.containsKey(EspParameterEnum.KEYWORD.getName))
      .map(row => {
        val name = row.getSearchRequest.getSearchParameters.get(EspParameterEnum.KEYWORD.getName).getEspSearchParameterDTO.getValue.toLowerCase()
        (name, new TopListEntry(name, 1, row.getResultCount))
      })
      // Incremental window aggregation with an inverse function, hence the checkpoint in main().
      .reduceByKeyAndWindow(
        (a: TopListEntry, b: TopListEntry) => new TopListEntry(a.getKeyword, a.getSearchCount + b.getSearchCount, a.getMeanSearchHits + b.getMeanSearchHits),
        (a: TopListEntry, b: TopListEntry) => new TopListEntry(a.getKeyword, a.getSearchCount - b.getSearchCount, a.getMeanSearchHits - b.getMeanSearchHits),
        Minutes(windowDuration),
        Seconds(slideDuration))
      .filter((x: (String, TopListEntry)) => x._2.getSearchCount > 200L)
      .map(row => (row._2.getSearchCount, row._2))
      .transform(rdd => rdd.sortByKey(ascending = false))
      .map(row => new TopListEntry(row._2.getKeyword, row._2.getSearchCount, row._2.getMeanSearchHits / row._2.getSearchCount))
  }

  def main(properties: Properties): Unit = {

    val sparkSession = SparkUtil.getDefaultSparkSession(properties, TaskName)
    val kafkaSink = sparkSession.sparkContext.broadcast(KafkaSinkUtil.apply[TopList](SparkUtil.getDefaultSparkProperties(properties)))
    val kafkaParams: Map[String, Object] = SparkUtil.getDefaultKafkaReceiverParameter(properties)
    val ssc = new StreamingContext(sparkSession.sparkContext, Seconds(30))

    ssc.checkpoint("/home/spark/checkpoints")

    // Direct stream using the Kafka parameters above ("auto.offset.reset" -> "earliest").
    val adEventStream =
      KafkaUtils.createDirectStream[String, Array[Byte]](ssc, PreferConsistent, Subscribe[String, Array[Byte]](Array(ReadFromTopicName), kafkaParams))

    processSearchKeyWords(adEventStream, SparkUtil.getWindowDuration(properties), SparkUtil.getSlideDuration(properties), kafkaSink)

    ssc.start()
    ssc.awaitTermination()
  }
{code}
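
To double-check whether the offset from the error message (SearchEvents-2=161803385) is even still retained on the broker, it can be compared against the partition's beginning and end offsets, for example with a small helper along these lines (sketch only; needs kafka-clients 0.10.1+, broker address is a placeholder):

{code:borderStyle=solid}
// Diagnostic sketch: prints the retained offset range of SearchEvents partition 2.
// If the offset from the error is below the beginning offset, retention has already
// deleted those records.
import java.util.{Collections, Properties}

import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.ByteArrayDeserializer

object OffsetRangeCheck {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")   // placeholder broker
    props.put("key.deserializer", classOf[ByteArrayDeserializer].getName)
    props.put("value.deserializer", classOf[ByteArrayDeserializer].getName)

    val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
    val tp = new TopicPartition("SearchEvents", 2)      // partition from the error message
    val earliest = consumer.beginningOffsets(Collections.singletonList(tp)).get(tp)
    val latest = consumer.endOffsets(Collections.singletonList(tp)).get(tp)
    println(s"SearchEvents-2 retained offsets: [$earliest, $latest)")
    consumer.close()
  }
}
{code}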




> Offsets out of range with no configured reset policy for partitions
> -------------------------------------------------------------------
>
>                 Key: SPARK-19680
>                 URL: https://issues.apache.org/jira/browse/SPARK-19680
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.1.0
>            Reporter: Schakmann Rene
>