Thanks Cody, I was able to figure out the issue yesterday after sending the last email.
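In case it helps anyone else hitting the same thing: the direct stream's SimpleConsumer speaks the plaintext protocol, so the broker list has to point at the PLAINTEXT listener. A minimal sketch of the setup is below — the broker host/port, topic, and app name are placeholders, and the `createDirectStream` signature shown is the upstream (Spark 1.3-era) direct stream API, which a 1.2 backport like mine is assumed to mirror:

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object DirectStreamSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("KafkaTest") // placeholder app name
    val ssc = new StreamingContext(conf, Seconds(10))

    // SimpleConsumer cannot do a TLS handshake, so metadata.broker.list must
    // point at the broker's PLAINTEXT port, not the SSL listener. Hitting the
    // SSL port makes the client misread handshake bytes as a payload size,
    // which shows up as huge allocations / buffer errors.
    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092") // placeholder host:port
    val topics = Set("my-topic") // placeholder topic

    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)

    stream.foreachRDD(rdd => println(s"batch count: ${rdd.count()}"))
    ssc.start()
    ssc.awaitTermination()
  }
}
```

This needs a running broker and a Spark cluster, so treat it as an illustration of the port wiring rather than something to paste verbatim.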
On Friday, September 25, 2015, Cody Koeninger <c...@koeninger.org> wrote:

> So you're still having a problem getting partitions or offsets from Kafka
> when creating the stream. You can try each of those Kafka operations
> individually (getPartitions / getLatestLeaderOffsets).
>
> checkErrors should be dealing with an ArrayBuffer of throwables, not just
> a single one. Is that the only error you're seeing, or are there more?
>
> You can also modify it to call printStackTrace or whatever on each
> individual error, instead of only printing the message.
>
> On Thu, Sep 24, 2015 at 5:00 PM, Sourabh Chandak <sourabh3...@gmail.com> wrote:
>
>> I was able to get past this issue. I was pointing at the SSL port, whereas
>> SimpleConsumer should point to the PLAINTEXT port. But after fixing that I
>> am getting the following error:
>>
>> Exception in thread "main" org.apache.spark.SparkException: java.nio.BufferUnderflowException
>>     at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
>>     at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
>>     at scala.util.Either.fold(Either.scala:97)
>>     at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
>>     at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:309)
>>     at org.ofe.weve.test.KafkaTest$.setupProcessingContext(KafkaTest.scala:36)
>>     at org.ofe.weve.test.KafkaTest$.main(KafkaTest.scala:59)
>>     at org.ofe.weve.test.KafkaTest.main(KafkaTest.scala)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>     at java.lang.reflect.Method.invoke(Method.java:497)
>>     at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
>>     at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
>>     at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>
>> Thanks,
>> Sourabh
>>
>> On Thu, Sep 24, 2015 at 2:04 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>
>>> That looks like the OOM is in the driver, when getting partition
>>> metadata to create the direct stream. In that case, executor memory
>>> allocation doesn't matter.
>>>
>>> Allocate more driver memory, or put a profiler on it to see what's
>>> taking up heap.
>>>
>>> On Thu, Sep 24, 2015 at 3:51 PM, Sourabh Chandak <sourabh3...@gmail.com> wrote:
>>>
>>>> Adding Cody and Sriharsha
>>>>
>>>> On Thu, Sep 24, 2015 at 1:25 PM, Sourabh Chandak <sourabh3...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I have ported receiver-less Spark Streaming for Kafka to Spark 1.2 and
>>>>> am trying to run a Spark Streaming job to consume data from my broker, but
>>>>> I am getting the following error:
>>>>>
>>>>> 15/09/24 20:17:45 ERROR BoundedByteBufferReceive: OOME with size 352518400
>>>>> java.lang.OutOfMemoryError: Java heap space
>>>>>     at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
>>>>>     at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
>>>>>     at kafka.network.BoundedByteBufferReceive.byteBufferAllocate(BoundedByteBufferReceive.scala:80)
>>>>>     at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:63)
>>>>>     at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>>>>>     at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>>>>>     at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
>>>>>     at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:83)
>>>>>     at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:80)
>>>>>     at kafka.consumer.SimpleConsumer.send(SimpleConsumer.scala:103)
>>>>>     at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$getPartitionMetadata$1.apply(KafkaCluster.scala:126)
>>>>>     at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$getPartitionMetadata$1.apply(KafkaCluster.scala:125)
>>>>>     at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:346)
>>>>>     at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:342)
>>>>>     at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>>>>     at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
>>>>>     at org.apache.spark.streaming.kafka.KafkaCluster.org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers(KafkaCluster.scala:342)
>>>>>     at org.apache.spark.streaming.kafka.KafkaCluster.getPartitionMetadata(KafkaCluster.scala:125)
>>>>>     at org.apache.spark.streaming.kafka.KafkaCluster.getPartitions(KafkaCluster.scala:112)
>>>>>     at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:296)
>>>>>     at org.ofe.weve.test.KafkaTest$.setupProcessingContext(KafkaTest.scala:35)
>>>>>     at org.ofe.weve.test.KafkaTest$.main(KafkaTest.scala:58)
>>>>>     at org.ofe.weve.test.KafkaTest.main(KafkaTest.scala)
>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>     at java.lang.reflect.Method.invoke(Method.java:497)
>>>>>     at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
>>>>>     at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
>>>>>     at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>>>>
>>>>> I have tried allocating 100G of memory with 1 executor but it is still
>>>>> failing.
>>>>>
>>>>> Spark version: 1.2.2
>>>>> Kafka version ported: 0.8.2
>>>>> Kafka server version: trunk version with SSL enabled
>>>>>
>>>>> Can someone please help me debug this?
>>>>>
>>>>> Thanks,
>>>>> Sourabh
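For the archives, here is the kind of change Cody suggested for checkErrors — dumping every accumulated throwable instead of only joining their messages, so the real root cause (e.g. the BufferUnderflowException) is visible. This is only a sketch: the real method lives in KafkaCluster (line numbers in a backport will differ), and RuntimeException stands in for SparkException so the snippet compiles without Spark on the classpath.

```scala
import scala.collection.mutable.ArrayBuffer

object CheckErrorsSketch {
  // In KafkaCluster, Err is an ArrayBuffer[Throwable] that accumulates one
  // failure per broker tried.
  type Err = ArrayBuffer[Throwable]

  def checkErrors[T](result: Either[Err, T]): T =
    result.fold(
      errs => {
        // Print the full stack trace of every error, not just the first
        // message, before failing.
        errs.foreach(_.printStackTrace())
        throw new RuntimeException(
          s"${errs.size} error(s) talking to Kafka; see stack traces above")
      },
      ok => ok
    )

  def main(args: Array[String]): Unit = {
    // Simulate the failure path with the exception seen in this thread.
    val errs: Err = ArrayBuffer(new java.nio.BufferUnderflowException)
    try checkErrors[Unit](Left(errs))
    catch { case e: RuntimeException => println(e.getMessage) }
  }
}
```

With every stack trace printed, you can see which broker/port combination each attempt actually hit, which is how the SSL-vs-PLAINTEXT mixup shows up.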