I just noticed that the Spark version I am using (2.0.2) is built with Scala 2.11, whereas the Kafka 0.8.2 build I am using was built with Scala 2.10. Could this be the reason why we are getting this error?
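For reference, one way to rule out a binary-version mismatch is to keep every artifact on the same Scala suffix in the build. A minimal build.sbt sketch, assuming sbt and the 0-8 Kafka integration module that the stack trace below points at (the version numbers match the prebuilt Spark 2.0.2 / Scala 2.11 distribution):

```scala
// build.sbt -- minimal sketch; the %% operator appends the _2.11 suffix
// so Spark, the Kafka integration, and your code all agree on Scala 2.11.
scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  // "provided" because the cluster supplies Spark itself at runtime
  "org.apache.spark" %% "spark-streaming" % "2.0.2" % "provided",
  // direct-stream integration for Kafka 0.8.x brokers
  "org.apache.spark" %% "spark-streaming-kafka-0-8" % "2.0.2"
)
```

Note that the broker's own Scala build generally doesn't matter to the client; what matters is that no kafka_2.10 client jar ends up on the application classpath alongside the _2.11 Spark artifacts.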
On Mon, Feb 20, 2017 at 5:50 PM, Cody Koeninger <c...@koeninger.org> wrote:
> So there's no reason to use checkpointing at all, right? Eliminate
> that as a possible source of problems.
>
> Probably unrelated, but this also isn't a very good way to benchmark.
> Kafka producers are threadsafe, there's no reason to create one for
> each partition.
>
> On Mon, Feb 20, 2017 at 4:43 PM, Muhammad Haseeb Javed
> <11besemja...@seecs.edu.pk> wrote:
> > This is the code that I have been trying, and it is giving me this
> > error. No complicated operation is being performed on the topics as
> > far as I can see.
> >
> > class Identity() extends BenchBase {
> >
> >   override def process(lines: DStream[(Long, String)],
> >                        config: SparkBenchConfig): Unit = {
> >     val reportTopic = config.reporterTopic
> >     val brokerList = config.brokerList
> >
> >     lines.foreachRDD(rdd => rdd.foreachPartition(partLines => {
> >       val reporter = new KafkaReporter(reportTopic, brokerList)
> >       partLines.foreach { case (inTime, content) =>
> >         val outTime = System.currentTimeMillis()
> >         reporter.report(inTime, outTime)
> >         if (config.debugMode) {
> >           println("Event: " + inTime + ", " + outTime)
> >         }
> >       }
> >     }))
> >   }
> > }
> >
> > On Mon, Feb 20, 2017 at 3:10 PM, Cody Koeninger <c...@koeninger.org> wrote:
> >> That's an indication that the beginning offset for a given batch is
> >> higher than the ending offset, i.e. something is seriously wrong.
> >>
> >> Are you doing anything at all odd with topics, i.e. deleting and
> >> recreating them, using compacted topics, etc?
> >>
> >> Start off with a very basic stream over the same kafka topic that just
> >> does foreach println or similar, with no checkpointing at all, and get
> >> that working first.
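The very basic stream Cody suggests could look roughly like the sketch below. It is only an illustration: it assumes the spark-streaming-kafka-0-8 direct API (which the stack trace in this thread points at), and the broker address, topic name, and batch interval are placeholders.

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

// Bare-bones direct stream over one topic: no checkpointing, no reporting,
// just print whatever arrives so the Kafka -> Spark path can be verified.
object SanityCheck {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("kafka-sanity-check")
    // note: no ssc.checkpoint(...) call, per the suggestion above
    val ssc = new StreamingContext(conf, Seconds(5))

    // placeholder broker list; use the same brokers as the failing app
    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")

    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("test-topic")) // placeholder topic

    // just print the values; if this already triggers the numRecords error,
    // the problem is in the topic/offsets, not in the benchmark logic
    stream.foreachRDD(rdd => rdd.foreach { case (_, value) => println(value) })

    ssc.start()
    ssc.awaitTermination()
  }
}
```

If this minimal job runs cleanly against the same topic, the checkpointing and reporting code can be reintroduced one piece at a time to isolate the failure.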
> >>
> >> On Mon, Feb 20, 2017 at 12:10 PM, Muhammad Haseeb Javed
> >> <11besemja...@seecs.edu.pk> wrote:
> >> > Update: I am using Spark 2.0.2 and Kafka 0.8.2 with Scala 2.10
> >> >
> >> > On Mon, Feb 20, 2017 at 1:06 PM, Muhammad Haseeb Javed
> >> > <11besemja...@seecs.edu.pk> wrote:
> >> >> I am a PhD student at Ohio State working on a study to evaluate
> >> >> streaming frameworks (Spark Streaming, Storm, Flink) using the Intel
> >> >> HiBench benchmarks, but I think I am having a problem with Spark. I
> >> >> have a Spark Streaming application which I am trying to run on a
> >> >> 5-node cluster (including master). I have 2 ZooKeeper nodes and 4
> >> >> Kafka brokers. However, whenever I run a Spark Streaming application
> >> >> I encounter the following error:
> >> >>
> >> >> java.lang.IllegalArgumentException: requirement failed: numRecords must not be negative
> >> >>     at scala.Predef$.require(Predef.scala:224)
> >> >>     at org.apache.spark.streaming.scheduler.StreamInputInfo.<init>(InputInfoTracker.scala:38)
> >> >>     at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:165)
> >> >>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
> >> >>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
> >> >>     at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
> >> >>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
> >> >>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
> >> >>     at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
> >> >>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
> >> >>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
> >> >>     at scala.Option.orElse(Option.scala:289)
> >> >>
> >> >> The application starts fine, but as soon as the Kafka producers start
> >> >> emitting the stream data I start receiving the aforementioned error
> >> >> repeatedly.
> >> >>
> >> >> I have tried removing the Spark Streaming checkpoint files, as has
> >> >> been suggested in similar posts on the internet. However, the problem
> >> >> persists even when I start a Kafka topic and its corresponding
> >> >> consumer Spark Streaming application for the first time. The problem
> >> >> also cannot be offset-related, as I start the topic for the first
> >> >> time.
> >> >>
> >> >> The application does seem to be processing the stream properly, as I
> >> >> can see from the benchmark numbers generated. However, the numbers
> >> >> are way off from what I got for Storm and Flink, leading me to
> >> >> believe that something is wrong with the pipeline and Spark is not
> >> >> able to process the stream as cleanly as it should. Any help in this
> >> >> regard would be really appreciated.
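On Cody's point about producer reuse: the benchmark code in this thread builds a new KafkaReporter per partition per batch, and that construction cost can skew benchmark numbers. Since the Kafka producer client is thread-safe, one common pattern is a lazily initialized producer shared per executor JVM. A sketch, assuming the new-style producer client shipped with Kafka 0.8.2 (KafkaProducer); the property values are illustrative:

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// One producer per executor JVM, created on first use and reused by
// every task/partition thereafter. KafkaProducer is thread-safe, so
// concurrent tasks can share it safely.
object ProducerHolder {
  private var producer: KafkaProducer[String, String] = null

  def get(brokerList: String): KafkaProducer[String, String] = synchronized {
    if (producer == null) {
      val props = new Properties()
      props.put("bootstrap.servers", brokerList)
      props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      producer = new KafkaProducer[String, String](props)
      // flush and release the client when the executor JVM shuts down
      sys.addShutdownHook { producer.close() }
    }
    producer
  }
}
```

Inside foreachPartition the reporting step would then call something like `ProducerHolder.get(brokerList).send(new ProducerRecord(reportTopic, s"$inTime,$outTime"))` instead of constructing a reporter per partition (the record format here is a stand-in; KafkaReporter's actual wire format is not shown in the thread).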