Re: Sending events to Kafka from spark job
Dear Andy,

As far as I understand, the transformations are applied to the RDDs, not to the data, and I need to send the actual data to Kafka. So I think I should perform at least one action to make Spark load the data. Kindly correct me if I have not understood this correctly.

Best regards.

On 29 March 2016 at 19:40, Andy Davidson <a...@santacruzintegration.com> wrote:
> Hi Fanoos
>
> I would be careful about using collect(). You need to make sure your local
> computer has enough memory to hold your entire data set.
>
> Eventually I will need to do something similar. I have not written the code
> yet. My plan is to load the data into a data frame and then write a UDF
> that actually publishes to Kafka.
>
> If you are using RDDs you could use map() or some other transform to
> cause the data to be published.
>
> Andy
>
> From: fanooos <dev.fano...@gmail.com>
> Date: Tuesday, March 29, 2016 at 4:26 AM
> To: "user @spark" <user@spark.apache.org>
> Subject: Re: Sending events to Kafka from spark job
>
> I think I found a solution, but I have no idea how this affects the
> execution of the application.
>
> At the end of the script I added a sleep statement:
>
>     import time
>     time.sleep(1)
>
> This solved the problem.
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Sending-events-to-Kafka-from-spark-job-tp26622p26624.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org

--
Anas Rabei
Senior Software Developer
Mubasher.info
anas.ra...@mubasher.info
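The laziness Anas describes can be illustrated without Spark at all: Python's built-in map() behaves the same way as an RDD transformation. This is a sketch by analogy only, not Spark code; publish() is a hypothetical stand-in for a producer call.

```python
# Illustration only: Python's built-in map() is lazy, just like an RDD
# transformation. Nothing runs until something consumes the iterator,
# which plays the role of a Spark action. (No Spark or Kafka involved.)
sent = []

def publish(tweet):
    # Stand-in for producer.send(); just records what would be published.
    sent.append(tweet)

tweets = ["tweet1", "tweet2", "tweet3"]

pipeline = map(publish, tweets)   # lazy: nothing is published yet
assert sent == []

list(pipeline)                    # consuming the iterator is the "action"
assert sent == ["tweet1", "tweet2", "tweet3"]
```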
Re: Sending events to Kafka from spark job
Hi Fanoos

I would be careful about using collect(). You need to make sure your local computer has enough memory to hold your entire data set.

Eventually I will need to do something similar. I have not written the code yet. My plan is to load the data into a data frame and then write a UDF that actually publishes to Kafka.

If you are using RDDs you could use map() or some other transform to cause the data to be published.

Andy

From: fanooos <dev.fano...@gmail.com>
Date: Tuesday, March 29, 2016 at 4:26 AM
To: "user @spark" <user@spark.apache.org>
Subject: Re: Sending events to Kafka from spark job

> I think I found a solution, but I have no idea how this affects the
> execution of the application.
>
> At the end of the script I added a sleep statement:
>
>     import time
>     time.sleep(1)
>
> This solved the problem.
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Sending-events-to-Kafka-from-spark-job-tp26622p26624.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
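The publish-from-the-workers pattern Andy alludes to usually means creating one producer per partition and reusing it for every record in that partition. The sketch below keeps itself self-contained by using a StubProducer in place of kafka.KafkaProducer, and a plain loop in place of rdd.foreachPartition(publish_partition); all names here are hypothetical.

```python
# Sketch of per-partition publishing. StubProducer stands in for
# kafka.KafkaProducer; with real Spark you would call
# rdd.foreachPartition(publish_partition) instead of the loop below.
class StubProducer:
    def __init__(self):
        self.sent = []

    def send(self, topic, value):
        self.sent.append((topic, value))

    def flush(self):
        pass

producers = []

def publish_partition(partition):
    # One producer per partition: the client is created on the executor
    # rather than serialized from the driver (producer objects are not
    # picklable), and it is reused for every record in the partition.
    producer = StubProducer()
    producers.append(producer)        # kept only so the sketch can be checked
    for record in partition:
        producer.send("testTopic", record.encode("utf-8"))
    producer.flush()

# Two in-memory lists stand in for an RDD's partitions.
for partition in [["tweet1", "tweet2"], ["tweet3"]]:
    publish_partition(partition)

total_sent = sum(len(p.sent) for p in producers)
assert total_sent == 3
```

Unlike collect(), this never pulls the whole data set onto the driver.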
Re: Sending events to Kafka from spark job
I think I found a solution, but I have no idea how this affects the execution of the application.

At the end of the script I added a sleep statement:

    import time
    time.sleep(1)

This solved the problem.

--
View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/Sending-events-to-Kafka-from-spark-job-tp26622p26624.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
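A likely explanation for why the sleep helps: kafka-python's KafkaProducer.send() is asynchronous, so records sit in an in-memory buffer until a background thread transmits them, and a short script can exit before delivery. With the real client, calling producer.flush() before exit is the deterministic fix. BufferedProducer below is a hypothetical stub mimicking that buffering, not the real library.

```python
# Why time.sleep(1) "works": send() only queues the record; delivery
# happens later on a background thread. This stub makes the two stages
# visible. (BufferedProducer is a stand-in, not kafka.KafkaProducer.)
class BufferedProducer:
    def __init__(self):
        self.buffer = []       # queued, not yet on the wire
        self.delivered = []    # actually transmitted

    def send(self, topic, value):
        self.buffer.append((topic, value))   # returns immediately

    def flush(self):
        # Block until everything queued has been delivered.
        self.delivered.extend(self.buffer)
        del self.buffer[:]

p = BufferedProducer()
p.send("testTopic", b"hello")
assert p.delivered == []       # exiting here would lose the message

p.flush()                      # deterministic, unlike time.sleep(1)
assert p.delivered == [("testTopic", b"hello")]
```

Relying on a fixed sleep is fragile: if the broker is slow or the batch is large, one second may not be enough.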
Sending events to Kafka from spark job
I am trying to read stored tweets from a file and send them to Kafka using Spark with Python. The code is very simple but it does not work. The Spark job runs correctly but nothing is sent to Kafka.

Here is the code:

    #!/usr/bin/python
    # -*- coding: utf-8 -*-
    from pyspark import SparkContext, SparkConf
    from kafka import KafkaProducer
    import sys
    reload(sys)
    sys.setdefaultencoding('utf8')

    topic = "testTopic"

    if __name__ == "__main__":
        conf = SparkConf().setAppName("TestSparkFromPython")
        sc = SparkContext(conf=conf)

        producer = KafkaProducer(bootstrap_servers="10.62.54.111:9092")

        tweets = sc.textFile("/home/fanooos/Desktop/historical_scripts/output/1/activities_201603270430_201603270440.json")
        tweetsCollection = tweets.collect()

        for tweet in tweetsCollection:
            producer.send('testTopic', value=bytes(tweet))

--
View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/Sending-events-to-Kafka-from-spark-job-tp26622.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org