So there's no reason to use checkpointing at all, right?  Eliminate
that as a possible source of problems.
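
For reference, a stream with no checkpointing could look roughly like the sketch below. It assumes the spark-streaming-kafka-0-8 direct stream API (the one in your stack trace); the broker list, topic name, and batch interval are placeholders I made up, not values from your setup. It only prints each batch's offset ranges and record count.

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

object BasicStreamCheck {
  def main(args: Array[String]): Unit = {
    // Placeholder values (assumptions): substitute your own brokers and topic.
    val brokers = "broker1:9092,broker2:9092"
    val topics = Set("test-topic")

    // The master is expected to be supplied by spark-submit.
    val conf = new SparkConf().setAppName("BasicStreamCheck")

    // No ssc.checkpoint(...) and no StreamingContext.getOrCreate(...):
    // every run starts from a fresh context with no recovered state.
    val ssc = new StreamingContext(conf, Seconds(1))

    val kafkaParams = Map("metadata.broker.list" -> brokers)
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)

    stream.foreachRDD { rdd =>
      // Offset ranges are only available on the RDD produced directly by the stream.
      val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      ranges.foreach { r =>
        println(s"${r.topic}-${r.partition}: from=${r.fromOffset} until=${r.untilOffset}")
      }
      println(s"records in batch: ${rdd.count()}")
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```

If this runs cleanly against the same topic, add the benchmark logic back one piece at a time.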

Probably unrelated, but this also isn't a very good way to benchmark.
Kafka producers are thread-safe, so there's no reason to create one for
each partition.
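
One way to share a producer is a lazily initialized singleton, so each executor JVM creates it once and every task reuses it. The sketch below is only an illustration: SharedProducer is a hypothetical helper (not part of HiBench or Kafka), the broker list is a placeholder, and the message format is made up since I don't know what KafkaReporter actually sends. It assumes the Java producer client (org.apache.kafka.clients.producer).

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// Hypothetical helper: a Scala object is initialized once per JVM,
// so each executor ends up with a single producer shared by all its tasks.
object SharedProducer {
  // Placeholder (assumption): in the benchmark this would come from the config.
  private val brokerList = "broker1:9092,broker2:9092"

  lazy val producer: KafkaProducer[String, String] = {
    val props = new Properties()
    props.put("bootstrap.servers", brokerList)
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val p = new KafkaProducer[String, String](props)
    // Close the producer (flushing buffered records) when the executor JVM exits.
    sys.addShutdownHook(p.close())
    p
  }

  // The "inTime:outTime" payload is an invented format for illustration only.
  def report(topic: String, inTime: Long, outTime: Long): Unit =
    producer.send(new ProducerRecord[String, String](topic, s"$inTime:$outTime"))
}
```

Inside foreachPartition you would then call SharedProducer.report(reportTopic, inTime, outTime) instead of constructing a new reporter for every partition of every batch.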

On Mon, Feb 20, 2017 at 4:43 PM, Muhammad Haseeb Javed
<11besemja...@seecs.edu.pk> wrote:
> This is the code I have been trying, which is giving me this error. No
> complicated operation is being performed on the topics as far as I can see.
>
> class Identity() extends BenchBase {
>
>   override def process(lines: DStream[(Long, String)], config: SparkBenchConfig): Unit = {
>     val reportTopic = config.reporterTopic
>     val brokerList = config.brokerList
>
>     lines.foreachRDD(rdd => rdd.foreachPartition( partLines => {
>       val reporter = new KafkaReporter(reportTopic, brokerList)
>       partLines.foreach{ case (inTime , content) =>
>         val outTime = System.currentTimeMillis()
>         reporter.report(inTime, outTime)
>         if(config.debugMode) {
>           println("Event: " + inTime + ", " + outTime)
>         }
>       }
>     }))
>   }
> }
>
>
> On Mon, Feb 20, 2017 at 3:10 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>
>> That's an indication that the beginning offset for a given batch is
>> higher than the ending offset, i.e. something is seriously wrong.
>>
>> Are you doing anything at all odd with topics, i.e. deleting and
>> recreating them, using compacted topics, etc?
>>
>> Start off with a very basic stream over the same kafka topic that just
>> does foreach println or similar, with no checkpointing at all, and get
>> that working first.
>>
>> On Mon, Feb 20, 2017 at 12:10 PM, Muhammad Haseeb Javed
>> <11besemja...@seecs.edu.pk> wrote:
>> > Update: I am using Spark 2.0.2 and Kafka 0.8.2 with Scala 2.10
>> >
>> > On Mon, Feb 20, 2017 at 1:06 PM, Muhammad Haseeb Javed
>> > <11besemja...@seecs.edu.pk> wrote:
>> >>
>> >> I am a PhD student at Ohio State working on a study to evaluate streaming
>> >> frameworks (Spark Streaming, Storm, Flink) using the Intel HiBench
>> >> benchmarks, but I think I am having a problem with Spark. I have a Spark
>> >> Streaming application which I am trying to run on a 5-node cluster
>> >> (including the master). I have 2 ZooKeeper nodes and 4 Kafka brokers.
>> >> However, whenever I run a Spark Streaming application I encounter the
>> >> following error:
>> >>
>> >> java.lang.IllegalArgumentException: requirement failed: numRecords must not be negative
>> >>         at scala.Predef$.require(Predef.scala:224)
>> >>         at org.apache.spark.streaming.scheduler.StreamInputInfo.<init>(InputInfoTracker.scala:38)
>> >>         at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:165)
>> >>         at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
>> >>         at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
>> >>         at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>> >>         at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
>> >>         at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
>> >>         at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
>> >>         at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
>> >>         at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
>> >>         at scala.Option.orElse(Option.scala:289)
>> >>
>> >> The application starts fine, but as soon as the Kafka producers start
>> >> emitting the stream data I start receiving the aforementioned error
>> >> repeatedly.
>> >>
>> >> I have tried removing the Spark Streaming checkpoint files, as has been
>> >> suggested in similar posts on the internet. However, the problem
>> >> persists even if I start a Kafka topic and its corresponding consumer
>> >> Spark Streaming application for the first time. So the problem should
>> >> not be offset related, since I am starting the topic for the first time.
>> >>
>> >> The application does seem to be processing the stream, as I can see
>> >> from the benchmark numbers generated. However, the numbers are way off
>> >> from what I got for Storm and Flink, leading me to suspect that there is
>> >> something wrong with the pipeline and that Spark is not able to process
>> >> the stream as cleanly as it should. Any help in this regard would be
>> >> really appreciated.
>> >
>> >
>
>

