Here is the code snippet, starting at line 365 in KafkaCluster.scala:

    type Err = ArrayBuffer[Throwable]

    /** If the result is right, return it, otherwise throw SparkException */
    def checkErrors[T](result: Either[Err, T]): T = {
      result.fold(
        errs => throw new SparkException(errs.mkString("Throwing this error\n")),
        ok => ok
      )
    }

On Thu, Sep 24, 2015 at 3:00 PM, Sourabh Chandak <sourabh3...@gmail.com> wrote:

> I was able to get past this issue. I was pointing at the SSL port, whereas
> SimpleConsumer should point to the PLAINTEXT port. But after fixing that I
> am getting the following error:
>
> Exception in thread "main" org.apache.spark.SparkException:
> java.nio.BufferUnderflowException
>         at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
>         at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
>         at scala.util.Either.fold(Either.scala:97)
>         at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
>         at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:309)
>         at org.ofe.weve.test.KafkaTest$.setupProcessingContext(KafkaTest.scala:36)
>         at org.ofe.weve.test.KafkaTest$.main(KafkaTest.scala:59)
>         at org.ofe.weve.test.KafkaTest.main(KafkaTest.scala)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:497)
>         at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
>         at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
>         at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>
> Thanks,
> Sourabh
>
> On Thu, Sep 24, 2015 at 2:04 PM, Cody Koeninger <c...@koeninger.org> wrote:
>
>> That looks like the OOM is in the driver, when getting partition metadata
>> to create the direct stream. In that case, executor memory allocation
>> doesn't matter.
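[Editor's note] The frames at KafkaCluster.scala:365-366 in the trace above are the checkErrors helper quoted at the top of the thread: it folds an Either, rethrowing every accumulated broker error as one exception. A minimal standalone sketch of that pattern, with RuntimeException standing in for SparkException so it compiles without Spark on the classpath:

```scala
import scala.collection.mutable.ArrayBuffer

// Standalone sketch of the checkErrors pattern from KafkaCluster.scala.
// RuntimeException stands in for org.apache.spark.SparkException here.
object CheckErrorsSketch {
  type Err = ArrayBuffer[Throwable]

  /** If the result is Right, return it; otherwise throw one exception
    * whose message joins all accumulated errors. */
  def checkErrors[T](result: Either[Err, T]): T =
    result.fold(
      errs => throw new RuntimeException(errs.mkString("\n")),
      ok => ok
    )
}
```

So a Right passes straight through, while a Left (e.g. every broker failing the metadata request) surfaces as a single exception wrapping the real causes, which is why the SparkException above hides the underlying BufferUnderflowException.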
>>
>> Allocate more driver memory, or put a profiler on it to see what's taking
>> up heap.
>>
>> On Thu, Sep 24, 2015 at 3:51 PM, Sourabh Chandak <sourabh3...@gmail.com> wrote:
>>
>>> Adding Cody and Sriharsha
>>>
>>> On Thu, Sep 24, 2015 at 1:25 PM, Sourabh Chandak <sourabh3...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I have ported receiver-less Spark Streaming for Kafka to Spark 1.2 and
>>>> am trying to run a Spark Streaming job to consume data from my broker, but
>>>> I am getting the following error:
>>>>
>>>> 15/09/24 20:17:45 ERROR BoundedByteBufferReceive: OOME with size 352518400
>>>> java.lang.OutOfMemoryError: Java heap space
>>>>         at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
>>>>         at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
>>>>         at kafka.network.BoundedByteBufferReceive.byteBufferAllocate(BoundedByteBufferReceive.scala:80)
>>>>         at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:63)
>>>>         at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>>>>         at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>>>>         at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
>>>>         at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:83)
>>>>         at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:80)
>>>>         at kafka.consumer.SimpleConsumer.send(SimpleConsumer.scala:103)
>>>>         at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$getPartitionMetadata$1.apply(KafkaCluster.scala:126)
>>>>         at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$getPartitionMetadata$1.apply(KafkaCluster.scala:125)
>>>>         at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:346)
>>>>         at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:342)
>>>>         at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>>>         at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
>>>>         at org.apache.spark.streaming.kafka.KafkaCluster.org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers(KafkaCluster.scala:342)
>>>>         at org.apache.spark.streaming.kafka.KafkaCluster.getPartitionMetadata(KafkaCluster.scala:125)
>>>>         at org.apache.spark.streaming.kafka.KafkaCluster.getPartitions(KafkaCluster.scala:112)
>>>>         at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:296)
>>>>         at org.ofe.weve.test.KafkaTest$.setupProcessingContext(KafkaTest.scala:35)
>>>>         at org.ofe.weve.test.KafkaTest$.main(KafkaTest.scala:58)
>>>>         at org.ofe.weve.test.KafkaTest.main(KafkaTest.scala)
>>>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>         at java.lang.reflect.Method.invoke(Method.java:497)
>>>>         at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
>>>>         at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
>>>>         at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>>>
>>>> I have tried allocating 100G of memory with 1 executor but it is still failing.
>>>>
>>>> Spark version: 1.2.2
>>>> Kafka version ported: 0.8.2
>>>> Kafka server version: trunk version with SSL enabled
>>>>
>>>> Can someone please help me debug this?
>>>>
>>>> Thanks,
>>>> Sourabh
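[Editor's note] Cody's point is that the OOM happens in the driver while fetching partition metadata, so executor memory is irrelevant. Driver heap is set at submit time; a sketch of the invocation (the jar name and the 4g figure are illustrative placeholders, not from the thread):

```shell
# Raise the driver heap for the metadata fetch done by createDirectStream.
# Executor memory (--executor-memory) does not help here, since the
# allocation fails in the driver JVM before any executors are involved.
spark-submit \
  --class org.ofe.weve.test.KafkaTest \
  --driver-memory 4g \
  kafka-test-assembly.jar
```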