Thank you, Luciano, Shixiong.

I thought the "_2.11" part referred to the Kafka version - an unfortunate coincidence.

Indeed, both

   spark-submit --jars spark-streaming-kafka-assembly_2.10-1.5.2.jar \
       my_kafka_streaming_wordcount.py

and

   spark-submit --packages \
       org.apache.spark:spark-streaming-kafka_2.10:1.5.2 \
       my_kafka_streaming_wordcount.py

worked. However, one more amendment was needed:

   lines = KafkaUtils.createStream(ssc, "localhost:2181",
                                   "consumer-group", {"test": 1})

creates the DStream with no error, but

   lines = KafkaUtils.createDirectStream(ssc, ["test"],
                                         {"metadata.broker.list": "localhost:1281"})

produces a

   Py4JJavaError: An error occurred while calling o29.createDirectStream.
   : org.apache.spark.SparkException: java.nio.channels.ClosedChannelException
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
        at scala.util.Either.fold(Either.scala:97)
        at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
        at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:422)
        at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:532)
        at org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper.createDirectStream(KafkaUtils.scala:614)
   .... /more lines here ....

I am mentioning it in case anyone else lands in the list archives trying to run a simple PySpark Streaming + Kafka "Hello World".
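One more note for anyone debugging the same ClosedChannelException: in my experience it generally means Spark could not open a TCP connection to the broker address given in "metadata.broker.list". A plain-Python check can rule that out before blaming Spark itself (the helper name below is just mine, not part of any API):

```python
import socket

def broker_reachable(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False
```

If this returns False for your broker address, the broker is not listening where you think it is. Keep in mind that 2181 is typically the ZooKeeper port (used by createStream), while createDirectStream needs the Kafka broker's own listener, which is 9092 by default.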

Best Regards
Chris M.


On 12/18/2015 04:20 AM, Luciano Resende wrote:
Unless you built your own Spark distribution with Scala 2.11, you want to use the 2.10 dependency:

   --packages org.apache.spark:spark-streaming-kafka_2.10:1.5.2

On Thu, Dec 17, 2015 at 10:10 AM, Christos Mantas <[email protected]> wrote:

    Hello,

    I am trying to set up a simple example with Spark Streaming
    (Python) and Kafka on a single machine deployment.
    My Kafka broker/server is also on the same machine
    (localhost:1281) and I am using Spark Version:
    spark-1.5.2-bin-hadoop2.6

    Python code

        ...
        ssc = StreamingContext(sc, 1)
        ...
        lines = KafkaUtils.createDirectStream(ssc, ["test"],
        {"metadata.broker.list":"localhost:1281"})


    So I try

        spark-submit --jars
        spark-streaming-kafka-assembly_2.11-1.5.2.jar
        my_kafka_streaming_wordcount.py

    OR

        spark-submit --packages
        org.apache.spark:spark-streaming-kafka_2.11:1.5.2
        my_kafka_streaming_wordcount.py
        (my kafka version is 2.11-0.9.0.0)

    OR

        pyspark  --jars spark-streaming-kafka-assembly_2.11-1.5.2.jar
        .... [import stuff and type those lines]


    and I end up with:

        15/12/17 19:44:58 WARN NativeCodeLoader: Unable to load
        native-hadoop library for your platform... using builtin-java
        classes where applicable
        15/12/17 19:45:00 WARN MetricsSystem: Using default name
        DAGScheduler for source because spark.app.id is not set.
        Traceback (most recent call last):
          File "/I/edited/the/path/here/my_kafka_streaming_wordcount.py", line 80, in <module>
            lines = KafkaUtils.createDirectStream(ssc, ["test"],
        {"metadata.broker.list":"localhost:1281"})
          File "/opt/spark-1.5.2-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/streaming/kafka.py", line 130, in createDirectStream
        py4j.protocol.Py4JJavaError: An error occurred while calling o29.createDirectStream.
        : java.lang.NoSuchMethodError: scala.Predef$.ArrowAssoc(Ljava/lang/Object;)Ljava/lang/Object;
            at kafka.api.RequestKeys$.<init>(RequestKeys.scala:39)
            at kafka.api.RequestKeys$.<clinit>(RequestKeys.scala)
            at kafka.api.TopicMetadataRequest.<init>(TopicMetadataRequest.scala:53)
            at org.apache.spark.streaming.kafka.KafkaCluster.getPartitionMetadata(KafkaCluster.scala:122)
            at org.apache.spark.streaming.kafka.KafkaCluster.getPartitions(KafkaCluster.scala:112)
            at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:409)
            at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:532)
            at org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper.createDirectStream(KafkaUtils.scala:614)
            at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
            at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
            at java.lang.reflect.Method.invoke(Method.java:497)
            at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
            at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
            at py4j.Gateway.invoke(Gateway.java:259)
            at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
            at py4j.commands.CallCommand.execute(CallCommand.java:79)
            at py4j.GatewayConnection.run(GatewayConnection.java:207)
            at java.lang.Thread.run(Thread.java:745)

    Am I missing something?

    Thanks in advance
    Chris M.

--
Luciano Resende
http://people.apache.org/~lresende
http://twitter.com/lresende1975
http://lresende.blogspot.com/
