Re: kafkaDirectStream usage error
2 things:

- You're only attempting to read from a single TopicAndPartition. Since your topic has multiple partitions, this probably isn't what you want.
- You're getting an offset out of range exception because the offset you're asking for doesn't exist in Kafka.

Use the other createDirectStream method (the one that takes a set of topics, not a map of TopicAndPartitions to offsets), at least until you get an understanding of how things work; there's a sketch of it after the quoted message below.

On Thu, Feb 4, 2016 at 7:58 PM, Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com> wrote:

> I am using the direct stream below to consume messages from Kafka. The topic has 8 partitions.
>
>     val topicAndPart = OffsetRange.create("request5", 0, 1, 10).topicAndPartition()
>     val fromOffsets = Map[kafka.common.TopicAndPartition, Long](topicAndPart -> 0)
>     val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key(), mmd.message())
>     val k1 = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler)
>
> I am getting the error below. Any idea where I am going wrong? Please help.
>
>     16/02/04 21:04:38 WARN scheduler.TaskSetManager: Lost task 0.1 in stage 0.0 (TID 2, datanode3.isdp.com): UnknownReason
>     16/02/04 21:04:38 INFO scheduler.TaskSetManager: Starting task 0.2 in stage 0.0 (TID 3, datanode3.isdp.com, RACK_LOCAL, 2273 bytes)
>     16/02/04 21:04:38 WARN spark.ThrowableSerializationWrapper: Task exception could not be deserialized
>     java.lang.ClassNotFoundException: kafka.common.OffsetOutOfRangeException
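For reference, a minimal sketch of that simpler overload, assuming the Spark 1.x spark-streaming-kafka (Kafka 0.8) connector and reusing the ssc and kafkaParams from the message above; adding auto.offset.reset here is an illustrative assumption, not something from the thread:

    import kafka.serializer.StringDecoder
    import org.apache.spark.streaming.kafka.KafkaUtils

    // Subscribe to the whole topic; Spark works out the per-partition
    // offset ranges for each batch itself.
    val topics = Set("request5")

    // Assumption: start from the smallest offset Kafka still retains,
    // which avoids OffsetOutOfRangeException on expired offsets.
    val params = kafkaParams + ("auto.offset.reset" -> "smallest")

    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, params, topics)

    // Records arrive as (key, message) pairs, matching what the
    // messageHandler in the original code produced.
    stream.foreachRDD(rdd => rdd.foreach { case (k, v) => println(s"$k -> $v") })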
kafkaDirectStream usage error
I am using the direct stream below to consume messages from Kafka. The topic has 8 partitions.

    val topicAndPart = OffsetRange.create("request5", 0, 1, 10).topicAndPartition()
    val fromOffsets = Map[kafka.common.TopicAndPartition, Long](topicAndPart -> 0)
    val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key(), mmd.message())
    val k1 = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler)

I am getting the error below. Any idea where I am going wrong? Please help.

    16/02/04 21:04:38 WARN scheduler.TaskSetManager: Lost task 0.1 in stage 0.0 (TID 2, datanode3.isdp.com): UnknownReason
    16/02/04 21:04:38 INFO scheduler.TaskSetManager: Starting task 0.2 in stage 0.0 (TID 3, datanode3.isdp.com, RACK_LOCAL, 2273 bytes)
    16/02/04 21:04:38 WARN spark.ThrowableSerializationWrapper: Task exception could not be deserialized
    java.lang.ClassNotFoundException: kafka.common.OffsetOutOfRangeException
        at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:278)
        at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67)
        at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
        at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at org.apache.spark.ThrowableSerializationWrapper.readObject(TaskEndReason.scala:167)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1897)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:72)
        at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:98)
        at org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$2.apply$mcV$sp(TaskResultGetter.scala:108)
        at org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$2.apply(TaskResultGetter.scala:105)
        at org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$2.apply(TaskResultGetter.scala:105)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
        at org.apache.spark.scheduler.TaskResultGetter$$anon$3.run(TaskResultGetter.scala:105)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
    16/02/04 21:04:38 ERROR scheduler.TaskResultGetter: Could not deserialize TaskEndReason: ClassNotFound with classloader org.apache.spark.util.MutableURLClassLoader@7202dc8c
    16/02/04 21:04:38 WARN scheduler.TaskSetManager: Lost task 0.2 in stage 0.0 (TID 3, datanode3.isdp.com): UnknownReason
    16/02/04 21:04:38 INFO scheduler.TaskSetManager: Starting task 0.3 in stage 0.0 (TID 4, datanode3.isdp.com, RACK_LOCAL, 2273 bytes)
    16/02/04 21:04:38 WARN spark.ThrowableSerializationWrapper: Task exception could not be deserialized
    java.lang.ClassNotFoundException: kafka.common.OffsetOutOfRangeException
        at
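For comparison with the reply's first point, a hypothetical sketch of what the offsets overload would need in order to cover all 8 partitions; the starting offset of 0 is an assumption, and each offset must still be retained in Kafka, or the same OffsetOutOfRangeException comes back:

    import kafka.common.TopicAndPartition
    import kafka.message.MessageAndMetadata
    import kafka.serializer.StringDecoder
    import org.apache.spark.streaming.kafka.KafkaUtils

    // One entry per partition of "request5", not just partition 0.
    val fromOffsets: Map[TopicAndPartition, Long] =
      (0 until 8).map(p => TopicAndPartition("request5", p) -> 0L).toMap

    val messageHandler =
      (mmd: MessageAndMetadata[String, String]) => (mmd.key(), mmd.message())

    val k1 = KafkaUtils.createDirectStream[
      String, String, StringDecoder, StringDecoder, (String, String)](
      ssc, kafkaParams, fromOffsets, messageHandler)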