Thank you, Luciano, Shixiong.
I thought the "_2.11" part referred to the Kafka version - an
unfortunate coincidence.
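For anyone else tripped up by the naming: the "_2.10" / "_2.11" suffix on Spark artifact IDs is the Scala binary version the jar was built against (a common Maven/SBT cross-building convention), not the Kafka version. A small illustrative sketch (the helper name is mine, not any Spark or Maven API):

```python
# Hypothetical helper: split a Spark-style Maven artifact ID into its base
# name and the Scala binary-version suffix. The "_2.10" part names the
# Scala version the artifact was compiled for, not the Kafka version.
def split_scala_suffix(artifact_id):
    base, sep, suffix = artifact_id.rpartition("_")
    # Scala binary versions look like "2.10", "2.11", ...
    if sep and suffix.replace(".", "").isdigit():
        return base, suffix
    return artifact_id, None

print(split_scala_suffix("spark-streaming-kafka_2.10"))
# -> ('spark-streaming-kafka', '2.10')
```

So spark-streaming-kafka_2.10:1.5.2 means "built for Scala 2.10", which is why it must match the Scala version of the Spark distribution itself.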
Indeed,

spark-submit --jars spark-streaming-kafka-assembly_2.10-1.5.2.jar my_kafka_streaming_wordcount.py

OR

spark-submit --packages org.apache.spark:spark-streaming-kafka_2.10:1.5.2 my_kafka_streaming_wordcount.py

worked; however, another amendment was needed:
lines = KafkaUtils.createStream(ssc, "localhost:2181", "consumer-group", {"test": 1})

creates the DStream with no error, but

lines = KafkaUtils.createDirectStream(ssc, ["test"], {"metadata.broker.list": "localhost:1281"})
produces a
Py4JJavaError: An error occurred while calling o29.createDirectStream.
: org.apache.spark.SparkException: java.nio.channels.ClosedChannelException
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
        at scala.util.Either.fold(Either.scala:97)
        at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
        at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:422)
        at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:532)
        at org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper.createDirectStream(KafkaUtils.scala:614)
        ... more lines here ...
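In my experience a ClosedChannelException out of KafkaCluster usually means Spark could not reach a broker at the address given in metadata.broker.list, so it is worth verifying the host:port first. A minimal, hedged check with plain sockets (this only tests TCP reachability, not the Kafka protocol; the function name is mine):

```python
import socket

def broker_reachable(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds.

    This only proves something is listening on that port; it does not
    verify that the listener actually speaks the Kafka protocol.
    """
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

# e.g. broker_reachable("localhost", 1281) vs. the Kafka default, 9092
```

Note that createStream working against ZooKeeper on 2181 says nothing about the broker port: createDirectStream bypasses ZooKeeper and talks to the brokers directly, so a wrong port in metadata.broker.list only shows up there.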
I am mentioning it in case anyone else lands in the list archives while trying to run a simple PySpark Streaming + Kafka "Hello World".
Best Regards
Chris M.
On 12/18/2015 04:20 AM, Luciano Resende wrote:
Unless you built your own Spark distribution with Scala 2.11, you want
to use the 2.10 dependency:
--packages org.apache.spark:spark-streaming-kafka_2.10:1.5.2
On Thu, Dec 17, 2015 at 10:10 AM, Christos Mantas
<[email protected] <mailto:[email protected]>> wrote:
Hello,
I am trying to set up a simple example with Spark Streaming
(Python) and Kafka on a single machine deployment.
My Kafka broker/server is also on the same machine
(localhost:1281) and I am using Spark Version:
spark-1.5.2-bin-hadoop2.6
Python code
...
ssc = StreamingContext(sc, 1)
...
lines = KafkaUtils.createDirectStream(ssc, ["test"], {"metadata.broker.list": "localhost:1281"})
So I try
spark-submit --jars spark-streaming-kafka-assembly_2.11-1.5.2.jar my_kafka_streaming_wordcount.py

OR

spark-submit --packages org.apache.spark:spark-streaming-kafka_2.11:1.5.2 my_kafka_streaming_wordcount.py
(my Kafka version is 2.11-0.9.0.0)
OR
pyspark --jars spark-streaming-kafka-assembly_2.11-1.5.2.jar
.... [import stuff and type those lines]
and I end up with:
15/12/17 19:44:58 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/12/17 19:45:00 WARN MetricsSystem: Using default name DAGScheduler for source because spark.app.id is not set.
Traceback (most recent call last):
  File "/I/edited/the/path/here/my_kafka_streaming_wordcount.py", line 80, in <module>
    lines = KafkaUtils.createDirectStream(ssc, ["test"], {"metadata.broker.list": "localhost:1281"})
  File "/opt/spark-1.5.2-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/streaming/kafka.py", line 130, in createDirectStream
py4j.protocol.Py4JJavaError: An error occurred while calling o29.createDirectStream.
: java.lang.NoSuchMethodError: scala.Predef$.ArrowAssoc(Ljava/lang/Object;)Ljava/lang/Object;
        at kafka.api.RequestKeys$.<init>(RequestKeys.scala:39)
        at kafka.api.RequestKeys$.<clinit>(RequestKeys.scala)
        at kafka.api.TopicMetadataRequest.<init>(TopicMetadataRequest.scala:53)
        at org.apache.spark.streaming.kafka.KafkaCluster.getPartitionMetadata(KafkaCluster.scala:122)
        at org.apache.spark.streaming.kafka.KafkaCluster.getPartitions(KafkaCluster.scala:112)
        at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:409)
        at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:532)
        at org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper.createDirectStream(KafkaUtils.scala:614)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
        at py4j.Gateway.invoke(Gateway.java:259)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:207)
        at java.lang.Thread.run(Thread.java:745)
Am I missing something?
Thanks in advance
Chris M.
--
Luciano Resende
http://people.apache.org/~lresende
http://twitter.com/lresende1975
http://lresende.blogspot.com/