Re: kafkaDirectStream usage error

2016-02-05 Thread Cody Koeninger
2 things:

- you're only attempting to read from a single TopicAndPartition.  Since
your topic has multiple partitions, this probably isn't what you want
(see the per-partition sketch below these two points).

- you're getting an offset out of range exception because the offset you're
asking for doesn't exist in Kafka.
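
If you really do want to manage offsets yourself, you need one map entry per
partition, and every offset you pass in has to still exist on the broker. A
rough sketch (the partition count and starting offsets here are illustrative
only):

    import kafka.common.TopicAndPartition

    // one entry per partition of "request5"; each offset must be inside the
    // broker's retention window, or you'll get OffsetOutOfRangeException
    val fromOffsets: Map[TopicAndPartition, Long] =
      (0 until 8).map(p => TopicAndPartition("request5", p) -> 0L).toMap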

Use the other createDirectStream method (the one that takes a set of
topics, not a map of TopicAndPartitions to offsets), at least until you get
an understanding of how things work.
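
As a minimal sketch of that overload (the broker address is a placeholder;
adjust kafkaParams for your cluster):

    import kafka.serializer.StringDecoder
    import org.apache.spark.streaming.kafka.KafkaUtils

    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> "broker1:9092",
      // where to start when there are no stored offsets: "smallest" or "largest"
      "auto.offset.reset" -> "smallest"
    )
    // returns a DStream of (key, message) pairs across all 8 partitions,
    // so you don't need the explicit messageHandler either
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("request5"))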



On Thu, Feb 4, 2016 at 7:58 PM, Diwakar Dhanuskodi <
diwakar.dhanusk...@gmail.com> wrote:

> I am using the direct stream below to consume messages from Kafka. The topic
> has 8 partitions.
>
>  val topicAndPart = OffsetRange.create("request5", 0, 1, 10).topicAndPartition()
>  val fromOffsets = Map[kafka.common.TopicAndPartition, Long](topicAndPart -> 0)
>  val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key(), mmd.message())
>  val k1 = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler)
>
> I am getting the error below. Any idea what I am doing wrong? Please help.
>
> 16/02/04 21:04:38 WARN spark.ThrowableSerializationWrapper: Task exception
> could not be deserialized
> java.lang.ClassNotFoundException: kafka.common.OffsetOutOfRangeException
> ...

kafkaDirectStream usage error

2016-02-04 Thread Diwakar Dhanuskodi
I am using the direct stream below to consume messages from Kafka. The topic
has 8 partitions.

    val topicAndPart = OffsetRange.create("request5", 0, 1, 10).topicAndPartition()
    val fromOffsets = Map[kafka.common.TopicAndPartition, Long](topicAndPart -> 0)
    val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key(), mmd.message())
    val k1 = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler)

I am getting the error below. Any idea what I am doing wrong? Please help.

16/02/04 21:04:38 WARN scheduler.TaskSetManager: Lost task 0.1 in stage 0.0 (TID 
2, datanode3.isdp.com): UnknownReason
16/02/04 21:04:38 INFO scheduler.TaskSetManager: Starting task 0.2 in stage 0.0 
(TID 3, datanode3.isdp.com, RACK_LOCAL, 2273 bytes)
16/02/04 21:04:38 WARN spark.ThrowableSerializationWrapper: Task exception 
could not be deserialized
java.lang.ClassNotFoundException: kafka.common.OffsetOutOfRangeException
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:278)
    at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    at org.apache.spark.ThrowableSerializationWrapper.readObject(TaskEndReason.scala:167)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1897)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:72)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:98)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$2.apply$mcV$sp(TaskResultGetter.scala:108)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$2.apply(TaskResultGetter.scala:105)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$2.apply(TaskResultGetter.scala:105)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$3.run(TaskResultGetter.scala:105)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
16/02/04 21:04:38 ERROR scheduler.TaskResultGetter: Could not deserialize 
TaskEndReason: ClassNotFound with classloader 
org.apache.spark.util.MutableURLClassLoader@7202dc8c
16/02/04 21:04:38 WARN scheduler.TaskSetManager: Lost task 0.2 in stage 0.0 
(TID 3, datanode3.isdp.com): UnknownReason
16/02/04 21:04:38 INFO scheduler.TaskSetManager: Starting task 0.3 in stage 0.0 
(TID 4, datanode3.isdp.com, RACK_LOCAL, 2273 bytes)
16/02/04 21:04:38 WARN spark.ThrowableSerializationWrapper: Task exception 
could not be deserialized
java.lang.ClassNotFoundException: kafka.common.OffsetOutOfRangeException
    at