Hi, I am trying to consume from Kafka topics following Approach 1 (createStream) in
http://spark.apache.org/docs/latest/streaming-kafka-integration.html. I am not able
to write the stream to a local text file using the saveAsTextFiles() function.
Below is the code:

import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.streaming import StreamingContext

ssc = StreamingContext(sc, 1)
zkQuorum, topic = 'localhost:9092', 'python-kafka'

kafka_stream = KafkaUtils.createStream(ssc, zkQuorum, None, {topic: 1})
lines = kafka_stream.map(lambda x: x[1])

kafka_stream.saveAsTextFiles('file:///home/puneett/')
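
A note on where saveAsTextFiles() writes once batches actually run: the Spark Streaming programming guide describes saveAsTextFiles(prefix, [suffix]) as writing each batch interval to a path named "prefix-TIME_IN_MS[.suffix]", and each such path is a directory of part files. The helper below, batch_output_path, is hypothetical (it is not a Spark function); it only mimics that documented naming rule in plain Python to show that a prefix ending in a slash, such as 'file:///home/puneett/', produces per-batch directories whose names start with a dash. The batch time and the "kafka-output" prefix are arbitrary example values.

```python
from typing import Optional


def batch_output_path(prefix: str, batch_time_ms: int, suffix: Optional[str] = None) -> str:
    # Hypothetical helper: mimics the documented "prefix-TIME_IN_MS[.suffix]" rule
    # used by DStream.saveAsTextFiles. Not part of the Spark API.
    path = prefix + "-" + str(batch_time_ms)
    if suffix is not None:
        path += "." + suffix
    return path


if __name__ == "__main__":
    example_batch_ms = 1470000000000  # arbitrary example batch time, in milliseconds
    # A prefix ending in "/" yields directories named like "-1470000000000".
    print(batch_output_path("file:///home/puneett/", example_batch_ms))
    # prints "file:///home/puneett/-1470000000000"
    # A named (hypothetical) prefix keeps the per-batch output easier to find.
    print(batch_output_path("file:///home/puneett/kafka-output", example_batch_ms))
    # prints "file:///home/puneett/kafka-output-1470000000000"
```
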

When I run the console consumer, I get the following output:

[puneett@gb-slo-svb-0255 ~]$ /nfs/science/shared/kafka/kafka/bin/kafka-console-consumer.sh \
    --topic python-kafka \
    --property schema.registry.url="http://localhost:9092" \
    --zookeeper localhost:2182 --from-beginning
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/nfs/science/shared/kafka/kafka/core/build/dependant-libs-2.10.6/slf4j-log4j12-1.7.21.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/nfs/science/shared/kafka/kafka/tools/build/dependant-libs-2.10.6/slf4j-log4j12-1.7.21.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/nfs/science/shared/kafka/kafka/connect/api/build/dependant-libs/slf4j-log4j12-1.7.21.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/nfs/science/shared/kafka/kafka/connect/runtime/build/dependant-libs/slf4j-log4j12-1.7.21.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/nfs/science/shared/kafka/kafka/connect/file/build/dependant-libs/slf4j-log4j12-1.7.21.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/nfs/science/shared/kafka/kafka/connect/json/build/dependant-libs/slf4j-log4j12-1.7.21.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
There are 10000 more similar kafka test message produced
There are 10000 more similar kafka test message produced
There are 10000 more similar kafka test message produced
There are 10000 more similar kafka test message produced
There are 10000 more similar kafka test message produced
There are 10000 more similar kafka test message produced
There are 10000 more similar kafka test message produced
There

Can someone please suggest what I am doing wrong?
Regards,
Puneet
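
One possible fix, sketched under assumptions: Spark 1.x/2.x with the Kafka 0.8 integration package (pyspark.streaming.kafka was removed in Spark 3.0), and ZooKeeper at localhost:2182, the address the console consumer above uses. Several things in the code above would probably keep files from appearing: the StreamingContext is never started with ssc.start() and ssc.awaitTermination(), so no batch ever runs; the second argument of createStream is the ZooKeeper quorum, but 'localhost:9092' is the usual Kafka broker port; the consumer group id is passed as None, while the Kafka 0.8 high-level consumer likely expects group.id to be set; and the raw (key, value) pairs in kafka_stream are saved instead of lines. The group id "spark-streaming-consumer" follows Spark's own kafka_wordcount.py example; the names extract_values, main, kafka_to_text.py, the app name, and the output prefix are hypothetical.

```python
import sys


def extract_values(kafka_stream):
    """createStream yields (key, value) pairs; keep only the message value."""
    return kafka_stream.map(lambda kv: kv[1])


def main(zk_quorum, group_id, topic, output_prefix):
    # Spark imports live here so extract_values can be loaded without Spark.
    # Assumption: pyspark.streaming.kafka exists only in Spark 1.x/2.x (Kafka 0.8 API).
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils

    sc = SparkContext(appName="KafkaToTextFiles")  # hypothetical app name
    ssc = StreamingContext(sc, 1)  # 1-second batches, as in the original code

    # Receiver-based stream: takes the ZooKeeper quorum (not the broker) and a group id.
    kafka_stream = KafkaUtils.createStream(ssc, zk_quorum, group_id, {topic: 1})
    lines = extract_values(kafka_stream)

    # Save the extracted values; each batch goes to "<output_prefix>-<batch time in ms>".
    lines.saveAsTextFiles(output_prefix)

    # Nothing is consumed or written until the context is started.
    ssc.start()
    ssc.awaitTermination()


if __name__ == "__main__":
    if len(sys.argv) != 5:
        print("Usage: kafka_to_text.py <zk_quorum> <group_id> <topic> <output_prefix>",
              file=sys.stderr)
    else:
        main(*sys.argv[1:])
```

Under these assumptions it would be submitted with the Kafka integration package matching your Spark and Scala versions (see the integration page linked above), for example with the arguments localhost:2182 spark-streaming-consumer python-kafka file:///home/puneett/kafka-output. On a cluster, file:// paths resolve on each executor's local disk, so an HDFS path may be easier to check.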
