I just noticed that the Spark version I am using (2.0.2) is built with
Scala 2.11. However, I am using Kafka 0.8.2 built with Scala 2.10. Could
this be the reason why we are getting this error?

On Mon, Feb 20, 2017 at 5:50 PM, Cody Koeninger <c...@koeninger.org> wrote:

> So there's no reason to use checkpointing at all, right?  Eliminate
> that as a possible source of problems.
>
> Probably unrelated, but this also isn't a very good way to benchmark.
> Kafka producers are threadsafe, there's no reason to create one for
> each partition.
>
> On Mon, Feb 20, 2017 at 4:43 PM, Muhammad Haseeb Javed
> <11besemja...@seecs.edu.pk> wrote:
> > This is the code I have been running that gives me this error. No
> > complicated operation is being performed on the topics as far as I can see.
> >
> > class Identity() extends BenchBase {
> >
> >   override def process(lines: DStream[(Long, String)], config: SparkBenchConfig): Unit = {
> >     val reportTopic = config.reporterTopic
> >     val brokerList = config.brokerList
> >
> >     lines.foreachRDD(rdd => rdd.foreachPartition( partLines => {
> >       val reporter = new KafkaReporter(reportTopic, brokerList)
> >       partLines.foreach{ case (inTime , content) =>
> >         val outTime = System.currentTimeMillis()
> >         reporter.report(inTime, outTime)
> >         if(config.debugMode) {
> >           println("Event: " + inTime + ", " + outTime)
> >         }
> >       }
> >     }))
> >   }
> > }
> >
> >
> > On Mon, Feb 20, 2017 at 3:10 PM, Cody Koeninger <c...@koeninger.org> wrote:
> >>
> >> That's an indication that the beginning offset for a given batch is
> >> higher than the ending offset, i.e. something is seriously wrong.
> >>
> >> Are you doing anything at all odd with topics, i.e. deleting and
> >> recreating them, using compacted topics, etc?
> >>
> >> Start off with a very basic stream over the same kafka topic that just
> >> does foreach println or similar, with no checkpointing at all, and get
> >> that working first.
> >>
> >> On Mon, Feb 20, 2017 at 12:10 PM, Muhammad Haseeb Javed
> >> <11besemja...@seecs.edu.pk> wrote:
> >> > Update: I am using Spark 2.0.2 and Kafka 0.8.2 with Scala 2.10
> >> >
> >> > On Mon, Feb 20, 2017 at 1:06 PM, Muhammad Haseeb Javed
> >> > <11besemja...@seecs.edu.pk> wrote:
> >> >>
> >> >> I am a PhD student at Ohio State working on a study to evaluate
> >> >> streaming frameworks (Spark Streaming, Storm, Flink) using the Intel
> >> >> HiBench benchmarks. But I think I am having a problem with Spark. I
> >> >> have a Spark Streaming application which I am trying to run on a
> >> >> 5 node cluster (including master). I have 2 ZooKeeper nodes and
> >> >> 4 Kafka brokers. However, whenever I run a Spark Streaming
> >> >> application I encounter the following error:
> >> >>
> >> >> java.lang.IllegalArgumentException: requirement failed: numRecords must not be negative
> >> >>         at scala.Predef$.require(Predef.scala:224)
> >> >>         at org.apache.spark.streaming.scheduler.StreamInputInfo.<init>(InputInfoTracker.scala:38)
> >> >>         at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:165)
> >> >>         at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
> >> >>         at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
> >> >>         at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
> >> >>         at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
> >> >>         at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
> >> >>         at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
> >> >>         at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
> >> >>         at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
> >> >>         at scala.Option.orElse(Option.scala:289)
> >> >>
> >> >> The application starts fine, but as soon as the Kafka producers start
> >> >> emitting the stream data I start receiving the aforementioned error
> >> >> repeatedly.
> >> >>
> >> >> I have tried removing the Spark Streaming checkpointing files, as has
> >> >> been suggested in similar posts on the internet. However, the problem
> >> >> persists even if I start a Kafka topic and its corresponding consumer
> >> >> Spark Streaming application for the first time. Also, the problem
> >> >> should not be offset related, since I am starting the topic for the
> >> >> first time.
> >> >>
> >> >> The application does seem to be processing the stream properly, as I
> >> >> can see from the benchmark numbers generated. However, the numbers
> >> >> are way off from what I got for Storm and Flink, which leads me to
> >> >> believe that there is something wrong with the pipeline and Spark is
> >> >> not able to process the stream as cleanly as it should. Any help in
> >> >> this regard would be really appreciated.
> >> >
> >> >
> >
> >
>
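
For anyone reading this thread later, here is a minimal sketch of the
condition described above: a batch whose beginning offset is higher than
its ending offset yields a negative record count. It assumes the record
count for a batch is the sum of (ending offset - beginning offset) over
the partitions. SketchOffsetRange, NumRecordsCheck and the "events" topic
are hypothetical names used only for illustration; they are not Spark's
own classes, and the offsets are made up.

```scala
// Hypothetical stand-in for a per-partition offset range (not Spark's class).
final case class SketchOffsetRange(
    topic: String,
    partition: Int,
    fromOffset: Long,
    untilOffset: Long) {
  // Number of records the range claims to cover; negative if inverted.
  def count: Long = untilOffset - fromOffset
}

object NumRecordsCheck {
  // Assumed batch record count: the sum of the per-partition counts.
  def numRecords(ranges: Seq[SketchOffsetRange]): Long =
    ranges.map(_.count).sum

  // Ranges whose beginning offset is higher than their ending offset.
  def inverted(ranges: Seq[SketchOffsetRange]): Seq[SketchOffsetRange] =
    ranges.filter(r => r.fromOffset > r.untilOffset)

  def main(args: Array[String]): Unit = {
    val batch = Seq(
      SketchOffsetRange("events", 0, 100L, 150L), // healthy range
      SketchOffsetRange("events", 1, 500L, 20L)   // inverted range
    )
    println(s"numRecords = ${numRecords(batch)}")
    inverted(batch).foreach { r =>
      println(s"inverted: ${r.topic}-${r.partition} " +
        s"from=${r.fromOffset} until=${r.untilOffset}")
    }
  }
}
```

Logging the per-partition offsets of each batch in this way, in the
basic println-only stream suggested above, would show which topic and
partition report the inverted range.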
