I must have been going mad :) Thanks for pointing it out. I downloaded the 1.5.0 assembly jar and added it to SPARK_CLASSPATH.
However, I am getting a new error now:

>>> kvs = KafkaUtils.createDirectStream(ssc, ['spark'], {"metadata.broker.list": 'localhost:9092'})
________________________________________________________________________________________________

  Spark Streaming's Kafka libraries not found in class path. Try one of the following.

  1. Include the Kafka library and its dependencies with in the spark-submit command as

     $ bin/spark-submit --packages org.apache.spark:spark-streaming-kafka:1.5.0 ...

  2. Download the JAR of the artifact from Maven Central http://search.maven.org/,
     Group Id = org.apache.spark, Artifact Id = spark-streaming-kafka-assembly, Version = 1.5.0.
     Then, include the jar in the spark-submit command as

     $ bin/spark-submit --jars <spark-streaming-kafka-assembly.jar> ...

________________________________________________________________________________________________

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "D:\sw\spark-1.5.0-bin-hadoop2.6\spark-1.5.0-bin-hadoop2.6\python\pyspark\streaming\kafka.py", line 130, in createDirectStream
    raise e
py4j.protocol.Py4JJavaError: An error occurred while calling o30.loadClass.
: java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper
        at java.net.URLClassLoader.findClass(Unknown Source)
        at java.lang.ClassLoader.loadClass(Unknown Source)
        at java.lang.ClassLoader.loadClass(Unknown Source)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
        at java.lang.reflect.Method.invoke(Unknown Source)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
        at py4j.Gateway.invoke(Gateway.java:259)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:207)
        at java.lang.Thread.run(Unknown Source)

>>> os.environ['SPARK_CLASSPATH']
'D:\\sw\\spark-streaming-kafka-assembly_2.10-1.5.0'
>>>

So I launched pyspark with --jars and the assembly jar, and now it is working. THANK YOU for the help.

Out of curiosity: why did adding it to SPARK_CLASSPATH not work?

Best
Ayan

On Wed, Sep 23, 2015 at 2:25 AM, Saisai Shao <sai.sai.s...@gmail.com> wrote:

> I think you're using the wrong version of the Kafka assembly jar. The
> Python API for the direct Kafka stream is not supported in Spark 1.3.0;
> you'd better change to version 1.5.0. It looks like you're using Spark
> 1.5.0, so why did you choose Kafka assembly 1.3.0?
> D:\sw\spark-streaming-kafka-assembly_2.10-1.3.0.jar
>
> On Tue, Sep 22, 2015 at 6:41 AM, ayan guha <guha.a...@gmail.com> wrote:
>
>> Hi
>>
>> I have added the Spark assembly jar to SPARK_CLASSPATH:
>>
>> >>> print os.environ['SPARK_CLASSPATH']
>> D:\sw\spark-streaming-kafka-assembly_2.10-1.3.0.jar
>>
>> Now I am facing the below issue with a test topic:
>>
>> >>> ssc = StreamingContext(sc, 2)
>> >>> kvs = KafkaUtils.createDirectStream(ssc, ['spark'], {"metadata.broker.list": 'localhost:9092'})
>> Traceback (most recent call last):
>>   File "<stdin>", line 1, in <module>
>>   File "D:\sw\spark-1.5.0-bin-hadoop2.6\spark-1.5.0-bin-hadoop2.6\python\pyspark\streaming\kafka.py", line 126, in createDirectStream
>>     jstream = helper.createDirectStream(ssc._jssc, kafkaParams, set(topics), jfromOffsets)
>>   File "D:\sw\spark-1.5.0-bin-hadoop2.6\spark-1.5.0-bin-hadoop2.6\python\lib\py4j-0.8.2.1-src.zip\py4j\java_gateway.py", line 538, in __call__
>>   File "D:\sw\spark-1.5.0-bin-hadoop2.6\spark-1.5.0-bin-hadoop2.6\python\pyspark\sql\utils.py", line 36, in deco
>>     return f(*a, **kw)
>>   File "D:\sw\spark-1.5.0-bin-hadoop2.6\spark-1.5.0-bin-hadoop2.6\python\lib\py4j-0.8.2.1-src.zip\py4j\protocol.py", line 304, in get_return_value
>> py4j.protocol.Py4JError: An error occurred while calling o22.createDirectStream.
>> Trace:
>> py4j.Py4JException: Method createDirectStream([class org.apache.spark.streaming.api.java.JavaStreamingContext,
>> class java.util.HashMap, class java.util.HashSet, class java.util.HashMap]) does not exist
>>         at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:333)
>>         at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:342)
>>         at py4j.Gateway.invoke(Gateway.java:252)
>>         at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
>>         at py4j.commands.CallCommand.execute(CallCommand.java:79)
>>         at py4j.GatewayConnection.run(GatewayConnection.java:207)
>>         at java.lang.Thread.run(Unknown Source)
>>
>> Am I doing something wrong?
>>
>> --
>> Best Regards,
>> Ayan Guha
>

--
Best Regards,
Ayan Guha