Here is the code snippet, starting line 365 in KafkaCluster.scala:

type Err = ArrayBuffer[Throwable]

/** If the result is right, return it, otherwise throw SparkException */
def checkErrors[T](result: Either[Err, T]): T = {
    errs => throw new SparkException(errs.mkString("Throwing this errir\n")),
    ok => ok

On Thu, Sep 24, 2015 at 3:00 PM, Sourabh Chandak <>

> I was able to get pass this issue. I was pointing the SSL port whereas
> SimpleConsumer should point to the PLAINTEXT port. But after fixing that I
> am getting the following error:
> Exception in thread "main" org.apache.spark.SparkException:
> java.nio.BufferUnderflowException
>         at
> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
>         at
> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
>         at scala.util.Either.fold(Either.scala:97)
>         at
> org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
>         at
> org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:309)
>         at
> org.ofe.weve.test.KafkaTest$.setupProcessingContext(KafkaTest.scala:36)
>         at org.ofe.weve.test.KafkaTest$.main(KafkaTest.scala:59)
>         at org.ofe.weve.test.KafkaTest.main(KafkaTest.scala)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at
> sun.reflect.NativeMethodAccessorImpl.invoke(
>         at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(
>         at java.lang.reflect.Method.invoke(
>         at
> org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
>         at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
>         at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> Thanks,
> Sourabh
> On Thu, Sep 24, 2015 at 2:04 PM, Cody Koeninger <>
> wrote:
>> That looks like the OOM is in the driver, when getting partition metadata
>> to create the direct stream.  In that case, executor memory allocation
>> doesn't matter.
>> Allocate more driver memory, or put a profiler on it to see what's taking
>> up heap.
>> On Thu, Sep 24, 2015 at 3:51 PM, Sourabh Chandak <>
>> wrote:
>>> Adding Cody and Sriharsha
>>> On Thu, Sep 24, 2015 at 1:25 PM, Sourabh Chandak <>
>>> wrote:
>>>> Hi,
>>>> I have ported receiver less spark streaming for kafka to Spark 1.2 and
>>>> am trying to run a spark streaming job to consume data form my broker, but
>>>> I am getting the following error:
>>>> 15/09/24 20:17:45 ERROR BoundedByteBufferReceive: OOME with size
>>>> 352518400
>>>> java.lang.OutOfMemoryError: Java heap space
>>>>         at java.nio.HeapByteBuffer.<init>(
>>>>         at java.nio.ByteBuffer.allocate(
>>>>         at
>>>>         at
>>>>         at
>>>>         at
>>>>         at
>>>>         at
>>>> kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:83)
>>>>         at
>>>> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:80)
>>>>         at kafka.consumer.SimpleConsumer.send(SimpleConsumer.scala:103)
>>>>         at
>>>> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$getPartitionMetadata$1.apply(KafkaCluster.scala:126)
>>>>         at
>>>> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$getPartitionMetadata$1.apply(KafkaCluster.scala:125)
>>>>         at
>>>> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:346)
>>>>         at
>>>> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:342)
>>>>         at
>>>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>>>         at
>>>> scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
>>>>         at
>>>> $apache$spark$streaming$kafka$KafkaCluster$$withBrokers(KafkaCluster.scala:342)
>>>>         at
>>>> org.apache.spark.streaming.kafka.KafkaCluster.getPartitionMetadata(KafkaCluster.scala:125)
>>>>         at
>>>> org.apache.spark.streaming.kafka.KafkaCluster.getPartitions(KafkaCluster.scala:112)
>>>>         at
>>>> org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:296)
>>>>         at
>>>> org.ofe.weve.test.KafkaTest$.setupProcessingContext(KafkaTest.scala:35)
>>>>         at org.ofe.weve.test.KafkaTest$.main(KafkaTest.scala:58)
>>>>         at org.ofe.weve.test.KafkaTest.main(KafkaTest.scala)
>>>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>         at
>>>> sun.reflect.NativeMethodAccessorImpl.invoke(
>>>>         at
>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(
>>>>         at java.lang.reflect.Method.invoke(
>>>>         at
>>>> org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
>>>>         at
>>>> org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
>>>>         at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>>> I have tried allocating 100G of memory with 1 executor but it is still
>>>> failing.
>>>> Spark version: 1.2.2
>>>> Kafka version ported: 0.8.2
>>>> Kafka server version: trunk version with SSL enabled
>>>> Can someone please help me debug this.
>>>> Thanks,
>>>> Sourabh

Reply via email to