I must have gone mad :) Thanks for pointing it out. I downloaded the 1.5.0
assembly jar and added it to SPARK_CLASSPATH.

However, I am getting a new error now

>>> kvs = KafkaUtils.createDirectStream(ssc,['spark'],{"metadata.broker.list":'localhost:9092'})

________________________________________________________________________________

  Spark Streaming's Kafka libraries not found in class path. Try one of the following.

  1. Include the Kafka library and its dependencies with in the
     spark-submit command as

     $ bin/spark-submit --packages org.apache.spark:spark-streaming-kafka:1.5.0 ...

  2. Download the JAR of the artifact from Maven Central http://search.maven.org/,
     Group Id = org.apache.spark, Artifact Id = spark-streaming-kafka-assembly, Version = 1.5.0.
     Then, include the jar in the spark-submit command as

     $ bin/spark-submit --jars <spark-streaming-kafka-assembly.jar> ...

________________________________________________________________________________



Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "D:\sw\spark-1.5.0-bin-hadoop2.6\spark-1.5.0-bin-hadoop2.6\python\pyspark\streaming\kafka.py", line 130, in createDirectStream
    raise e
py4j.protocol.Py4JJavaError: An error occurred while calling o30.loadClass.
: java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper
        at java.net.URLClassLoader.findClass(Unknown Source)
        at java.lang.ClassLoader.loadClass(Unknown Source)
        at java.lang.ClassLoader.loadClass(Unknown Source)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
        at java.lang.reflect.Method.invoke(Unknown Source)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
        at py4j.Gateway.invoke(Gateway.java:259)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:207)
        at java.lang.Thread.run(Unknown Source)

>>> os.environ['SPARK_CLASSPATH']
'D:\\sw\\spark-streaming-kafka-assembly_2.10-1.5.0'
>>>


So I launched pyspark with --jars pointing at the assembly jar, and now it is working.
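For reference, the working launch can be reconstructed roughly as below (a sketch: the jar path is the one from this thread, and on Windows the script would be `bin\pyspark.cmd`; the command is echoed rather than executed since running it needs a full Spark install):

```shell
# Sketch of the launch that worked (assumption: jar path as in this thread).
KAFKA_ASSEMBLY_JAR="D:/sw/spark-streaming-kafka-assembly_2.10-1.5.0.jar"
LAUNCH_CMD="bin/pyspark --jars $KAFKA_ASSEMBLY_JAR"
# Echoed here instead of executed; on Windows use bin\pyspark.cmd.
echo "$LAUNCH_CMD"
```

Passing the jar via --jars also ships it to the executors, which a driver-only classpath entry would not do.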

Thank you for the help.

Out of curiosity: why did adding it to SPARK_CLASSPATH not work?
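One likely explanation, offered as an assumption based on the `os.environ['SPARK_CLASSPATH']` snippet above (note the value there is also missing the `.jar` extension): the JVM reads SPARK_CLASSPATH once, at launch. Changing the Python process's environment afterwards cannot alter the classpath of the JVM that py4j is already talking to. A sketch of the ordering that would have a chance of working:

```shell
# SPARK_CLASSPATH must be in the environment *before* pyspark starts,
# because the JVM classpath is fixed at launch time.
export SPARK_CLASSPATH="D:/sw/spark-streaming-kafka-assembly_2.10-1.5.0.jar"
# bin/pyspark   # launched now, the JVM would see the jar
echo "$SPARK_CLASSPATH"
```

Even then, SPARK_CLASSPATH is deprecated in recent Spark releases in favour of --jars / --driver-class-path, which is consistent with --jars being what actually worked here.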

Best
Ayan

On Wed, Sep 23, 2015 at 2:25 AM, Saisai Shao <sai.sai.s...@gmail.com> wrote:

> I think you're using the wrong version of the Kafka assembly jar. The
> Python API for the direct Kafka stream is not supported in Spark 1.3.0, so
> you should switch to version 1.5.0. It looks like you're running Spark
> 1.5.0, so why did you choose the Kafka 1.3.0 assembly?
>
>
> D:\sw\spark-streaming-kafka-assembly_2.10-1.3.0.jar
>
>
>
> On Tue, Sep 22, 2015 at 6:41 AM, ayan guha <guha.a...@gmail.com> wrote:
>
>> Hi
>>
>> I have added spark assembly jar to SPARK CLASSPATH
>>
>> >>> print os.environ['SPARK_CLASSPATH']
>> D:\sw\spark-streaming-kafka-assembly_2.10-1.3.0.jar
>>
>>
>> Now  I am facing below issue with a test topic
>>
>> >>> ssc = StreamingContext(sc, 2)
>> >>> kvs = KafkaUtils.createDirectStream(ssc,['spark'],{"metadata.broker.list":'localhost:9092'})
>> Traceback (most recent call last):
>>   File "<stdin>", line 1, in <module>
>>   File "D:\sw\spark-1.5.0-bin-hadoop2.6\spark-1.5.0-bin-hadoop2.6\python\pyspark\streaming\kafka.py", line 126, in createDirectStream
>>     jstream = helper.createDirectStream(ssc._jssc, kafkaParams, set(topics), jfromOffsets)
>>   File "D:\sw\spark-1.5.0-bin-hadoop2.6\spark-1.5.0-bin-hadoop2.6\python\lib\py4j-0.8.2.1-src.zip\py4j\java_gateway.py", line 538, in __call__
>>   File "D:\sw\spark-1.5.0-bin-hadoop2.6\spark-1.5.0-bin-hadoop2.6\python\pyspark\sql\utils.py", line 36, in deco
>>     return f(*a, **kw)
>>   File "D:\sw\spark-1.5.0-bin-hadoop2.6\spark-1.5.0-bin-hadoop2.6\python\lib\py4j-0.8.2.1-src.zip\py4j\protocol.py", line 304, in get_return_value
>> py4j.protocol.Py4JError: An error occurred while calling o22.createDirectStream.
>>  Trace:
>> py4j.Py4JException: Method createDirectStream([class org.apache.spark.streaming.api.java.JavaStreamingContext, class java.util.HashMap, class java.util.HashSet, class java.util.HashMap]) does not exist
>>         at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:333)
>>         at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:342)
>>         at py4j.Gateway.invoke(Gateway.java:252)
>>         at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
>>         at py4j.commands.CallCommand.execute(CallCommand.java:79)
>>         at py4j.GatewayConnection.run(GatewayConnection.java:207)
>>         at java.lang.Thread.run(Unknown Source)
>>
>>
>> >>>
>>
>> Am I doing something wrong?
>>
>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>
>


-- 
Best Regards,
Ayan Guha
