What's the Scala version of your Spark? Is it 2.10?

Best Regards,
Shixiong Zhu

2015-12-17 10:10 GMT-08:00 Christos Mantas <cman...@cslab.ece.ntua.gr>:

> Hello,
>
> I am trying to set up a simple example with Spark Streaming (Python) and
> Kafka on a single machine deployment.
> My Kafka broker/server is also on the same machine (localhost:1281) and I
> am using Spark Version: spark-1.5.2-bin-hadoop2.6
>
> Python code
>
> ...
> ssc = StreamingContext(sc, 1)
> ...
> lines = KafkaUtils.createDirectStream(ssc, ["test"],
> {"metadata.broker.list":"localhost:1281"})
>
>
> So I try
>
> spark-submit --jars spark-streaming-kafka-assembly_2.11-1.5.2.jar
> my_kafka_streaming_wordcount.py
>
> OR
>
> spark-submit --packages  org.apache.spark:spark-streaming-kafka_2.11:1.5.2
> my_kafka_streaming_wordcount.py
> (my kafka version is 2.11-0.9.0.0)
>
> OR
>
> pyspark  --jars spark-streaming-kafka-assembly_2.11-1.5.2.jar .... [import
> stuff and type those lines]
>
>
> and I end up with:
>
> 15/12/17 19:44:58 WARN NativeCodeLoader: Unable to load native-hadoop
> library for your platform... using builtin-java classes where applicable
> 15/12/17 19:45:00 WARN MetricsSystem: Using default name DAGScheduler for
> source because spark.app.id is not set.
> Traceback (most recent call last):
>   File "/I/edited/the/path/here/my_kafka_streaming_wordcount.py", line 80,
> in <module>
>     lines = KafkaUtils.createDirectStream(ssc, ["test"],
> {"metadata.broker.list":"localhost:1281"})
>   File
> "/opt/spark-1.5.2-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/streaming/kafka.py",
> line 130, in createDirectStream
> py4j.protocol.Py4JJavaError: An error occurred while calling
> o29.createDirectStream.
> : java.lang.NoSuchMethodError:
> scala.Predef$.ArrowAssoc(Ljava/lang/Object;)Ljava/lang/Object;
>     at kafka.api.RequestKeys$.<init>(RequestKeys.scala:39)
>     at kafka.api.RequestKeys$.<clinit>(RequestKeys.scala)
>     at kafka.api.TopicMetadataRequest.<init>(TopicMetadataRequest.scala:53)
>     at
> org.apache.spark.streaming.kafka.KafkaCluster.getPartitionMetadata(KafkaCluster.scala:122)
>     at
> org.apache.spark.streaming.kafka.KafkaCluster.getPartitions(KafkaCluster.scala:112)
>     at
> org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:409)
>     at
> org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:532)
>     at
> org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper.createDirectStream(KafkaUtils.scala:614)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:497)
>     at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
>     at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
>     at py4j.Gateway.invoke(Gateway.java:259)
>     at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
>     at py4j.commands.CallCommand.execute(CallCommand.java:79)
>     at py4j.GatewayConnection.run(GatewayConnection.java:207)
>     at java.lang.Thread.run(Thread.java:745)
>
> Am I missing something?
>
> Thanks in advance
> Chris M.
>
>
>
>
>
>

Reply via email to