Kafka-based Spark Streaming and Vertex AI for Sentiment Analysis

2024-02-21 Thread Mich Talebzadeh
I am working on a pet project to implement a real-time sentiment analysis
system for analyzing customer reviews. It leverages Kafka for data
ingestion, Spark Structured Streaming (SSS) for real-time processing, and
Vertex AI for sentiment analysis and potential action triggers.

*Features*

   - Real-time processing of customer reviews using SSS.
   - Sentiment analysis using pre-assigned labels or Vertex AI
   models.
   - Integration with Vertex AI for model deployment and prediction serving.
   - Potential actions based on sentiment analysis results
   (e.g., notifications, database updates).


*Tech stack*

   - Kafka: distributed event-streaming platform for data ingestion.
   - SSS: real-time processing and cleansing of incoming messages.
   - Vertex AI: machine learning platform for model training and prediction serving.


I have created sample JSON data with the relevant attributes for a product review,
as shown below:

{
  "rowkey": "7de43681-0e4a-45cb-ad40-5f14f5678333",
  "product_id": "product-id-1616",
  "timereported": "2024-02-21T08:46:40",
  "description": "Easy to use and setup, perfect for beginners.",
  "price": GBP507,
  "sentiment": negative,
  "product_category": "Electronics",
  "customer_id": "customer4",
  "location": "UK",
  "rating": 6,
  "review_text": "Sleek and modern design, but lacking some features.",
  "user_feedback": "Negative",
  "review_source": "online",
  "sentiment_confidence": 0.33,
  "product_features": "user-friendly",
  "timestamp": "",
  "language": "English"
}
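
To make the pipeline concrete, below is a minimal PySpark sketch of one way to wire
up the ingestion and scoring path. It assumes a Kafka topic named product_reviews on
localhost:9092 and the spark-sql-kafka connector on the classpath; the Vertex AI
prediction call is represented by a hypothetical placeholder inside foreachBatch
rather than a real endpoint invocation.

# Minimal sketch: read reviews from Kafka, parse the JSON payload, and score
# each micro-batch. Topic, broker and the scoring helper are assumptions.
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import (StructType, StructField, StringType,
                               DoubleType, IntegerType)

spark = SparkSession.builder.appName("ReviewSentiment").getOrCreate()

# Schema mirroring the sample review record above
review_schema = StructType([
    StructField("rowkey", StringType()),
    StructField("product_id", StringType()),
    StructField("timereported", StringType()),
    StructField("description", StringType()),
    StructField("price", StringType()),
    StructField("sentiment", StringType()),
    StructField("product_category", StringType()),
    StructField("customer_id", StringType()),
    StructField("location", StringType()),
    StructField("rating", IntegerType()),
    StructField("review_text", StringType()),
    StructField("user_feedback", StringType()),
    StructField("review_source", StringType()),
    StructField("sentiment_confidence", DoubleType()),
    StructField("product_features", StringType()),
    StructField("timestamp", StringType()),
    StructField("language", StringType()),
])

# Read the raw review stream from Kafka and parse the JSON value
reviews = (spark.readStream
           .format("kafka")
           .option("kafka.bootstrap.servers", "localhost:9092")
           .option("subscribe", "product_reviews")
           .load()
           .select(from_json(col("value").cast("string"), review_schema).alias("r"))
           .select("r.*"))

def score_batch(batch_df, batch_id):
    """Per-batch scoring step: in the real pipeline this is where a deployed
    Vertex AI endpoint would be called with the review text (placeholder)."""
    rows = batch_df.select("rowkey", "review_text").collect()
    # e.g. predictions = vertex_endpoint.predict(instances=[...])  # assumed
    for row in rows:
        print(batch_id, row.rowkey, row.review_text)

query = (reviews.writeStream
         .foreachBatch(score_batch)
         .start())
query.awaitTermination()

The same foreachBatch hook is a natural place to trigger the potential actions
mentioned above (notifications, database updates) once a sentiment score comes back.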

I have also attached a high-level diagram. There has recently been demand for Gemini
usage as well. Your views are appreciated.


Thanks

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* I am an architect and not a data scientist. The information
provided is correct to the best of my knowledge but of course cannot be
guaranteed. It is essential to note that, as with any advice, "one test
result is worth one-thousand expert opinions" (Wernher von Braun).


Re: Kafka to spark streaming

2022-01-30 Thread Gourav Sengupta
Hi Amit,

before answering your question, I am just trying to understand it.

I am not exactly clear how do the Akka application, Kafka and SPARK
Streaming application sit together, and what are you exactly trying to
achieve?

Can you please elaborate?

Regards,
Gourav


On Fri, Jan 28, 2022 at 10:14 PM Amit Sharma  wrote:

> Hello everyone, we have spark streaming application. We send request to
> stream through Akka actor using Kafka topic. We wait for response as it is
> real time. Just want a suggestion is there any better option like Livy
> where we can send and receive request to spark streaming.
>
>
> Thanks
> Amit
>


Re: Kafka to spark streaming

2022-01-29 Thread Amit Sharma
Thanks Mich. The link you shared has only two options, Kafka and Socket.


Thanks
Amit

On Sat, Jan 29, 2022 at 3:49 AM Mich Talebzadeh 
wrote:

> So you have a classic architecture with spark receiving events through a
> kafka topic via kafka-spark-connector, do something with it and send data
> out to the consumer. Are you using Spark structured streaming here with
> batch streaming? check
>
>
> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#structured-streaming-programming-guide
>
> HTH
>
>view my Linkedin profile
> 
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Fri, 28 Jan 2022 at 22:14, Amit Sharma  wrote:
>
>> Hello everyone, we have spark streaming application. We send request to
>> stream through Akka actor using Kafka topic. We wait for response as it is
>> real time. Just want a suggestion is there any better option like Livy
>> where we can send and receive request to spark streaming.
>>
>>
>> Thanks
>> Amit
>>
>


Re: Kafka to spark streaming

2022-01-29 Thread Mich Talebzadeh
So you have a classic architecture: Spark receives events from a Kafka topic
via the Kafka-Spark connector, does something with them and sends the data
out to the consumer. Are you using Spark Structured Streaming here with
micro-batching? Check

https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#structured-streaming-programming-guide
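
As a minimal sketch of that read-transform-write-back loop with Structured
Streaming (the topic names, broker address and the trivial transformation below
are placeholders, not your actual request handling):

# Read requests from one Kafka topic, transform them, and write responses
# back to another topic. All names here are placeholders.
from pyspark.sql import SparkSession
from pyspark.sql.functions import upper, col

spark = SparkSession.builder.appName("KafkaRequestResponse").getOrCreate()

requests = (spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", "localhost:9092")
            .option("subscribe", "requests")
            .load())

# Stand-in transformation: echo the request payload back upper-cased
responses = (requests
             .selectExpr("CAST(key AS STRING) AS key",
                         "CAST(value AS STRING) AS value")
             .withColumn("value", upper(col("value"))))

query = (responses.writeStream
         .format("kafka")
         .option("kafka.bootstrap.servers", "localhost:9092")
         .option("topic", "responses")
         .option("checkpointLocation", "/tmp/checkpoints/req-resp")
         .start())
query.awaitTermination()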

HTH

   view my Linkedin profile




*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Fri, 28 Jan 2022 at 22:14, Amit Sharma  wrote:

> Hello everyone, we have spark streaming application. We send request to
> stream through Akka actor using Kafka topic. We wait for response as it is
> real time. Just want a suggestion is there any better option like Livy
> where we can send and receive request to spark streaming.
>
>
> Thanks
> Amit
>


Kafka to spark streaming

2022-01-28 Thread Amit Sharma
Hello everyone, we have spark streaming application. We send request to
stream through Akka actor using Kafka topic. We wait for response as it is
real time. Just want a suggestion is there any better option like Livy
where we can send and receive request to spark streaming.


Thanks
Amit


Re: Kafka with Spark Streaming work on local but it doesn't work in Standalone mode

2020-07-24 Thread Gabor Somogyi
Hi Davide,

Please see the doc:
*Note: Kafka 0.8 support is deprecated as of Spark 2.3.0.*

Have you tried the same with Structured Streaming rather than DStreams?
If you insist on DStreams, you can use the spark-streaming-kafka-0-10
connector instead.
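
For comparison, a rough Structured Streaming equivalent of your script could look
like the sketch below (same broker address and topic; it would be submitted with
the spark-sql-kafka-0-10 package instead of the 0.8 assembly):

# Structured Streaming version of the DStream script, printing each
# micro-batch to the console much like pprint() did.
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("PythonStructuredStreamingKafka")
         .getOrCreate())

stream = (spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "172.31.71.104:9092")
          .option("subscribe", "test")
          .option("startingOffsets", "earliest")  # analogous to auto.offset.reset=smallest
          .load()
          .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)"))

query = (stream.writeStream
         .format("console")
         .outputMode("append")
         .start())
query.awaitTermination()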

BR,
G


On Fri, Jul 24, 2020 at 12:08 PM Davide Curcio 
wrote:

> Hi,
>
> I'm trying to use Spark Streaming with a very simple script like this:
>
> from pyspark import SparkContext, SparkConf
> from pyspark.streaming import StreamingContext
> from pyspark.streaming.kafka import KafkaUtils
>
>
> sc = SparkContext(appName="PythonSparkStreamingKafka")
>
>
> ssc = StreamingContext(sc, 1)
> kafkaParams = {"metadata.broker.list": "172.31.71.104:9092",
>"auto.offset.reset": "smallest"}
>
> training = KafkaUtils.createDirectStream(ssc, ["test"], kafkaParams)
>
> training.pprint()
>
> ssc.start()
> ssc.awaitTermination()
>
> But although locally it works, with the cluster using Standalone mode it
> crashes. I have a cluster with 4 machines:
>
> 1 machine with Kafka Producer, 1 Broker and 1 Zookeeper
> 1 machine is the driver
> 2 machines are the workers.
>
> The strange thing is that when I had Kafka Producer, Broker and Zookeeper
> in the same machine in which I have the driver, it worked both locally and
> in the cluster. But obviously for the sake of scalability and modularity
> I'd like to use the current configuration.
>
> I'm using Spark 2.4.6, the Kafka Streaming API are
> "spark-streaming-kafka-0-8-assembly_2.11-2.4.6" and the Kafka version that
> I'm currently using is kafka_2.11-2.4.1
>
> The result is the following:
>
> 020-07-24 09:48:25,869 WARN scheduler.TaskSetManager: Lost task 0.0 in
> stage 0.0 (TID 0, 172.31.69.185, executor 0):
> java.nio.channels.ClosedChannelException
> at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
> at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78)
> at
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
> at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112)
> at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
> at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111)
> at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
> at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110)
> at
> org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:197)
> at
> org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:213)
> at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at org.apache.spark.util.NextIterator.foreach(NextIterator.scala:21)
> at
> org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224)
> at
> org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:561)
> at
> org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:346)
> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
> at
> org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:195)
>
> 2020-07-24 09:48:25,875 INFO scheduler.TaskSetManager: Starting task 0.1
> in stage 0.0 (TID 1, 172.31.69.185, executor 0, partition 0, ANY, 7785
> bytes)
> 2020-07-24 09:48:25,950 INFO scheduler.TaskSetManager: Lost task 0.1 in
> stage 0.0 (TID 1) on 172.31.69.185, executor 0:
> java.nio.channels.ClosedChannelException (null) [duplicate 1]
> 2020-07-24 09:48:25,952 INFO scheduler.TaskSetManager: Starting task 0.2
> in stage 0.0 (TID 2, 172.31.69.185, executor 0, partition 0, ANY, 7785
> bytes)
> 2020-07-24 09:48:25,984 INFO scheduler.TaskSetManager: Lost task 0.2 in
> stage 0.0 (TID 2) on 172.31.69.185, executor 0:
> java.nio.channels.ClosedChannelException (null) [duplicate 2]
> 2020-07-24 09:48:25,985 INFO scheduler.TaskSetManager: Starting task 0.3
> in stage 0.0 (TID 3, 172.31.79.221, executor 1, partition 0, ANY, 7785
> bytes)
> 2020-07-24 09:48:26,026 INFO scheduler.JobScheduler: Added jobs for time
> 1595584106000 ms
> 

Kafka with Spark Streaming work on local but it doesn't work in Standalone mode

2020-07-24 Thread Davide Curcio
Hi,

I'm trying to use Spark Streaming with a very simple script like this:

from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils


sc = SparkContext(appName="PythonSparkStreamingKafka")


ssc = StreamingContext(sc, 1)
kafkaParams = {"metadata.broker.list": "172.31.71.104:9092",
   "auto.offset.reset": "smallest"}

training = KafkaUtils.createDirectStream(ssc, ["test"], kafkaParams)

training.pprint()

ssc.start()
ssc.awaitTermination()
Although it works locally, it crashes on the cluster in standalone mode. I
have a cluster with 4 machines:

1 machine with Kafka Producer, 1 Broker and 1 Zookeeper
1 machine is the driver
2 machines are the workers.

The strange thing is that when I had the Kafka producer, broker and Zookeeper
on the same machine as the driver, it worked both locally and in the
cluster. But obviously, for the sake of scalability and modularity, I'd like to
use the current configuration.

I'm using Spark 2.4.6, the Kafka Streaming API are 
"spark-streaming-kafka-0-8-assembly_2.11-2.4.6" and the Kafka version that I'm 
currently using is kafka_2.11-2.4.1

The result is the following:

020-07-24 09:48:25,869 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 
0.0 (TID 0, 172.31.69.185, executor 0): java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78)
at 
kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112)
at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111)
at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110)
at 
org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:197)
at 
org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:213)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at org.apache.spark.util.NextIterator.foreach(NextIterator.scala:21)
at 
org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224)
at 
org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:561)
at 
org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:346)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
at 
org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:195)

2020-07-24 09:48:25,875 INFO scheduler.TaskSetManager: Starting task 0.1 in 
stage 0.0 (TID 1, 172.31.69.185, executor 0, partition 0, ANY, 7785 bytes)
2020-07-24 09:48:25,950 INFO scheduler.TaskSetManager: Lost task 0.1 in stage 
0.0 (TID 1) on 172.31.69.185, executor 0: 
java.nio.channels.ClosedChannelException (null) [duplicate 1]
2020-07-24 09:48:25,952 INFO scheduler.TaskSetManager: Starting task 0.2 in 
stage 0.0 (TID 2, 172.31.69.185, executor 0, partition 0, ANY, 7785 bytes)
2020-07-24 09:48:25,984 INFO scheduler.TaskSetManager: Lost task 0.2 in stage 
0.0 (TID 2) on 172.31.69.185, executor 0: 
java.nio.channels.ClosedChannelException (null) [duplicate 2]
2020-07-24 09:48:25,985 INFO scheduler.TaskSetManager: Starting task 0.3 in 
stage 0.0 (TID 3, 172.31.79.221, executor 1, partition 0, ANY, 7785 bytes)
2020-07-24 09:48:26,026 INFO scheduler.JobScheduler: Added jobs for time 
1595584106000 ms
2020-07-24 09:48:26,375 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 
in memory on 172.31.79.221:44371 (size: 4.0 KB, free: 366.3 MB)
2020-07-24 09:48:27,022 INFO scheduler.JobScheduler: Added jobs for time 
1595584107000 ms
2020-07-24 09:48:27,165 INFO scheduler.TaskSetManager: Lost task 0.3 in stage 
0.0 (TID 3) on 172.31.79.221, executor 1: 
java.nio.channels.ClosedChannelException (null) [duplicate 3]
2020-07-24 09:48:27,167 ERROR scheduler.TaskSetManager: Task 0 in stage 0.0 
failed 4 times; aborting job
2020-07-24 09:48:27,171 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, 
whose tasks have all completed, from pool
2020-07-24 09:48:27,172 INFO scheduler.TaskSchedulerImpl: Cancelling stage 0
2020-07-24 09:48:27,172 INFO scheduler.TaskSchedul

Re: Issue Storing offset in Kafka for Spark Streaming Application

2017-10-13 Thread Arpan Rajani
Hi Gerard,

Excellent, indeed your inputs helped. Thank you for the quick reply.

I modified the code based on inputs.

Now the application starts and reads from the topic. We then stream about
50,000 messages to the Kafka topic.

After a while we terminate the application using a YARN kill and check how
many messages were written to HBase (say 9,000).
Then we restart the application and wait for messages to get picked up from
the topic, but the application does not read anything from it, which means
*the streaming application fails to get the correct offset from where it
should start*. (This was not the case with the checkpointing mechanism,
where I could see all 50,000 messages after restart.)

What do you think is missing in this?

Following is the improved code based on the previous inputs:

// create the Spark Streaming context and the direct stream

var offsetRanges: Array[OffsetRange] = Array.empty

val stream: InputDStream[ConsumerRecord[String, String]] =
  KafkaUtil.createDirectStream(ssc, PreferConsistent,
    Subscribe[String, String](Array("topicName"), kafkaMap))

// Modified based on inputs: capture the offset ranges as the first operation

stream.foreachRDD { rdd =>

  offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

}

val resultStream = stream.map(x => ("topicName", x.value)).filter(x =>
  !x._2.trim.isEmpty)

resultStream.foreachRDD { rdd =>

  processRDD(rdd) // this stores messages in HBase

  // commit offsets using the original stream

  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)

}

ssc.start()

ssc.awaitTermination()

Could you please help me figure out what is missing here?

Many thanks,
Arpan



On Fri, Oct 13, 2017 at 3:27 PM, Gerard Maas  wrote:

> Hi Arpan,
>
> The error suggests that the streaming context has been started with
> streamingContext.start() and after that statement, some other
> dstream operations have been attempted.
> A suggested pattern to manage the offsets is the following:
>
> var offsetRanges: Array[OffsetRange] = _
>
> //create streaming context, streams, ...
> // as first operation after the stream has been created, do:
>
> stream.foreachRDD { rdd =>
>offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
> }
> //Then do other desired operations on the streaming data
> val resultStream = stream.map(...).filter(...).transform(...)
> //materialize the resulting stream
>
> resultStream.foreachRDD{rdd =>
> // do stuff... write to a db, to a kafka topic,... whatever,...
>
> //at the end of the process, commit the offsets (note that I use the
> // original stream instance, not `resultStream`)
> stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
> }
>
> I hope this helps,
>
> kr, Gerard.
>
>
>
>
>
>
>
>
>
>
>
> On Fri, Oct 13, 2017 at 3:34 PM, Arpan Rajani  wrote:
>
>> Hi all,
>>
>> In our cluster we have Kafka 0.10.1 and Spark 2.1.0. We are trying to
>> store the offsets in Kafka in order to achieve restartability of the
>> streaming application. ( Using checkpoints, I already implemented, we will
>> require to change code in production hence checkpoint won't work)
>>
>> Checking Spark Streaming documentation- Storing offsets on Kafka approach
>> :
>>
>> http://spark.apache.org/docs/latest/streaming-kafka-0-10-int
>> egration.html#kafka-itself, which describes :
>>
>> stream.foreachRDD { rdd =>
>>   val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>>
>>   // some time later, after outputs have completed
>>   stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
>> }
>>
>>
>> Based on this, I modified the code like following:
>>
>> val kafkaMap:Map[String,Object] = KakfaConfigs
>>
>> val stream:InputDStream[ConsumerRecord[String,String]] = 
>> KafkaUtil.createDirectStream(ssc, PreferConsistent, Subscribe[String,String] 
>> (Array("topicName"),kafkaMap))
>>
>> stream.foreach { rdd =>
>> val offsetRangers : Array[OffsetRanger] = 
>> rdd.asInstanceOf[HasOffsetRangers].offsetRanges
>>
>> // Filter out the values which have empty values and get the tuple of 
>> type
>> // ( topicname, stringValue_read_from_kafka_topic)
>> stream.map(x => ("topicName",x.value)).filter(x=> 
>> !x._2.trim.isEmpty).foreachRDD(processRDD _)
>>
>> // Sometime later, after outputs have completed.
>> stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
>> }
>>
>>
>> def processRDD(rdd:RDD[(String,String)]) {
>>  // Process futher to hdfs
>> }
>>
>> Now, When I try to start Streaming application, it does not start and
>> looking at the logs, here is what we see :
>>
>> java.lang.IllegalStateException: Adding new inputs, transformations, and 
>> output operations after starting a context is not supported
>> at 
>> org.apache.spark.streaming.dstream.DStream.validateAtInit(DStream.scala:223)
>> at org.apache.spark.streaming.dstream.DStream.(DStream.scala:65)
>>
>>
>> Can anyone suggest, or help to understand what are we missing here?
>>
>>
>> Regards,
>> Arpan
>>
>
>


Re: Issue Storing offset in Kafka for Spark Streaming Application

2017-10-13 Thread Gerard Maas
Hi Arpan,

The error suggests that the streaming context has been started with
streamingContext.start() and after that statement, some other
dstream operations have been attempted.
A suggested pattern to manage the offsets is the following:

var offsetRanges: Array[OffsetRange] = _

//create streaming context, streams, ...
// as first operation after the stream has been created, do:

stream.foreachRDD { rdd =>
   offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
}
//Then do other desired operations on the streaming data
val resultStream = stream.map(...).filter(...).transform(...)
//materialize the resulting stream

resultStream.foreachRDD{rdd =>
// do stuff... write to a db, to a kafka topic,... whatever,...

//at the end of the process, commit the offsets (note that I use the
//original stream instance, not `resultStream`)
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}

I hope this helps,

kr, Gerard.











On Fri, Oct 13, 2017 at 3:34 PM, Arpan Rajani  wrote:

> Hi all,
>
> In our cluster we have Kafka 0.10.1 and Spark 2.1.0. We are trying to
> store the offsets in Kafka in order to achieve restartability of the
> streaming application. ( Using checkpoints, I already implemented, we will
> require to change code in production hence checkpoint won't work)
>
> Checking Spark Streaming documentation- Storing offsets on Kafka approach
> :
>
> http://spark.apache.org/docs/latest/streaming-kafka-0-10-
> integration.html#kafka-itself, which describes :
>
> stream.foreachRDD { rdd =>
>   val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>
>   // some time later, after outputs have completed
>   stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
> }
>
>
> Based on this, I modified the code like following:
>
> val kafkaMap:Map[String,Object] = KakfaConfigs
>
> val stream:InputDStream[ConsumerRecord[String,String]] = 
> KafkaUtil.createDirectStream(ssc, PreferConsistent, Subscribe[String,String] 
> (Array("topicName"),kafkaMap))
>
> stream.foreach { rdd =>
> val offsetRangers : Array[OffsetRanger] = 
> rdd.asInstanceOf[HasOffsetRangers].offsetRanges
>
> // Filter out the values which have empty values and get the tuple of type
> // ( topicname, stringValue_read_from_kafka_topic)
> stream.map(x => ("topicName",x.value)).filter(x=> 
> !x._2.trim.isEmpty).foreachRDD(processRDD _)
>
> // Sometime later, after outputs have completed.
> stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
> }
>
>
> def processRDD(rdd:RDD[(String,String)]) {
>  // Process futher to hdfs
> }
>
> Now, When I try to start Streaming application, it does not start and
> looking at the logs, here is what we see :
>
> java.lang.IllegalStateException: Adding new inputs, transformations, and 
> output operations after starting a context is not supported
> at 
> org.apache.spark.streaming.dstream.DStream.validateAtInit(DStream.scala:223)
> at org.apache.spark.streaming.dstream.DStream.(DStream.scala:65)
>
>
> Can anyone suggest, or help to understand what are we missing here?
>
>
> Regards,
> Arpan
>


Issue Storing offset in Kafka for Spark Streaming Application

2017-10-13 Thread Arpan Rajani
Hi all,

In our cluster we have Kafka 0.10.1 and Spark 2.1.0. We are trying to store
the offsets in Kafka in order to achieve restartability of the streaming
application. ( Using checkpoints, I already implemented, we will require to
change code in production hence checkpoint won't work)

Checking Spark Streaming documentation- Storing offsets on Kafka approach :

http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#kafka-itself,
which describes :

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // some time later, after outputs have completed
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}


Based on this, I modified the code like following:

val kafkaMap:Map[String,Object] = KakfaConfigs

val stream:InputDStream[ConsumerRecord[String,String]] =
KafkaUtil.createDirectStream(ssc, PreferConsistent,
Subscribe[String,String] (Array("topicName"),kafkaMap))

stream.foreach { rdd =>
val offsetRangers : Array[OffsetRanger] =
rdd.asInstanceOf[HasOffsetRangers].offsetRanges

// Filter out the values which have empty values and get the tuple of type
// ( topicname, stringValue_read_from_kafka_topic)
stream.map(x => ("topicName",x.value)).filter(x=>
!x._2.trim.isEmpty).foreachRDD(processRDD _)

// Sometime later, after outputs have completed.
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}


def processRDD(rdd:RDD[(String,String)]) {
 // Process futher to hdfs
}

Now, When I try to start Streaming application, it does not start and
looking at the logs, here is what we see :

java.lang.IllegalStateException: Adding new inputs, transformations,
and output operations after starting a context is not supported
at 
org.apache.spark.streaming.dstream.DStream.validateAtInit(DStream.scala:223)
at org.apache.spark.streaming.dstream.DStream.(DStream.scala:65)


Can anyone suggest, or help to understand what are we missing here?


Regards,
Arpan


Re: Which streaming platform is best? Kafka or Spark Streaming?

2017-03-11 Thread Gaurav Pandya
Thank you very much guys. My question may sound little bit off but was
somewhat confused so wanted to get some expert advice on this. I will take
a look at the links mentioned in the replies. I really appreciate your
suggestions. These are the kind of answers I needed to clear my doubts.
Have a nice day ahead.

On Fri, Mar 10, 2017 at 6:18 PM, vaquar khan <vaquar.k...@gmail.com> wrote:

> Please read Spark documents at least once before asking question.
>
> http://spark.apache.org/docs/latest/streaming-programming-guide.html
>
> http://2s7gjr373w3x22jf92z99mgm5w-wpengine.netdna-ssl.com/wp-
> content/uploads/2015/11/spark-streaming-datanami.png
>
>
> Regards,
> Vaquar khan
>
>
> On Fri, Mar 10, 2017 at 6:17 AM, Sean Owen <so...@cloudera.com> wrote:
>
>> Kafka and Spark Streaming don't do the same thing. Kafka stores and
>> transports data, Spark Streaming runs computations on a stream of data.
>> Neither is itself a streaming platform in its entirety.
>>
>> It's kind of like asking whether you should build a website using just
>> MySQL, or nginx.
>>
>>
>>> On 9 Mar 2017, at 20:37, Gaurav1809 <gauravhpan...@gmail.com> wrote:
>>>
>>> Hi All, Would you please let me know which streaming platform is best.
>>> Be it
>>> server log processing, social media feeds ot any such streaming data. I
>>> want
>>> to know the comparison between Kafka & Spark Streaming.
>>>
>>>
>
>
> --
> Regards,
> Vaquar Khan
> +1 -224-436-0783
>
> IT Architect / Lead Consultant
> Greater Chicago
>


Re: Which streaming platform is best? Kafka or Spark Streaming?

2017-03-10 Thread vaquar khan
Please read the Spark documentation at least once before asking a question.

http://spark.apache.org/docs/latest/streaming-programming-guide.html

http://2s7gjr373w3x22jf92z99mgm5w-wpengine.netdna-ssl.com/wp-content/uploads/2015/11/spark-streaming-datanami.png


Regards,
Vaquar khan


On Fri, Mar 10, 2017 at 6:17 AM, Sean Owen <so...@cloudera.com> wrote:

> Kafka and Spark Streaming don't do the same thing. Kafka stores and
> transports data, Spark Streaming runs computations on a stream of data.
> Neither is itself a streaming platform in its entirety.
>
> It's kind of like asking whether you should build a website using just
> MySQL, or nginx.
>
>
>> On 9 Mar 2017, at 20:37, Gaurav1809 <gauravhpan...@gmail.com> wrote:
>>
>> Hi All, Would you please let me know which streaming platform is best. Be
>> it
>> server log processing, social media feeds ot any such streaming data. I
>> want
>> to know the comparison between Kafka & Spark Streaming.
>>
>>


-- 
Regards,
Vaquar Khan
+1 -224-436-0783

IT Architect / Lead Consultant
Greater Chicago


Re: Which streaming platform is best? Kafka or Spark Streaming?

2017-03-10 Thread Sean Owen
Kafka and Spark Streaming don't do the same thing. Kafka stores and
transports data, Spark Streaming runs computations on a stream of data.
Neither is itself a streaming platform in its entirety.

It's kind of like asking whether you should build a website using just
MySQL, or nginx.


> On 9 Mar 2017, at 20:37, Gaurav1809 <gauravhpan...@gmail.com> wrote:
>
> Hi All, Would you please let me know which streaming platform is best. Be
> it
> server log processing, social media feeds ot any such streaming data. I
> want
> to know the comparison between Kafka & Spark Streaming.
>
>


Re: Which streaming platform is best? Kafka or Spark Streaming?

2017-03-10 Thread Robin East
As Jorn says there is no best. I would start with 
https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-101. This will 
help you form some meaningful questions about what tools suit which use cases. 
Most places have a selection of tools such as spark, kafka, flink, storm, flume 
and so on. 
---
Robin East
Spark GraphX in Action Michael Malak and Robin East
Manning Publications Co.
http://www.manning.com/books/spark-graphx-in-action 
<http://www.manning.com/books/spark-graphx-in-action>





> On 9 Mar 2017, at 20:04, Jörn Franke <jornfra...@gmail.com> wrote:
> 
> I find this question strange. There is no best tool for every use case. For 
> example, both tools mentioned below are suitable for different purposes, 
> sometimes also complementary.
> 
>> On 9 Mar 2017, at 20:37, Gaurav1809 <gauravhpan...@gmail.com> wrote:
>> 
>> Hi All, Would you please let me know which streaming platform is best. Be it
>> server log processing, social media feeds ot any such streaming data. I want
>> to know the comparison between Kafka & Spark Streaming.
>> 
>> 
>> 
>> --
>> View this message in context: 
>> http://apache-spark-user-list.1001560.n3.nabble.com/Which-streaming-platform-is-best-Kafka-or-Spark-Streaming-tp28474.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>> 
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> 
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 



Re: Which streaming platform is best? Kafka or Spark Streaming?

2017-03-09 Thread Jörn Franke
I find this question strange. There is no best tool for every use case. For 
example, both tools mentioned below are suitable for different purposes, 
sometimes also complementary.

> On 9 Mar 2017, at 20:37, Gaurav1809 <gauravhpan...@gmail.com> wrote:
> 
> Hi All, Would you please let me know which streaming platform is best. Be it
> server log processing, social media feeds ot any such streaming data. I want
> to know the comparison between Kafka & Spark Streaming.
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Which-streaming-platform-is-best-Kafka-or-Spark-Streaming-tp28474.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Which streaming platform is best? Kafka or Spark Streaming?

2017-03-09 Thread Gaurav1809
Hi All, Would you please let me know which streaming platform is best, be it
server log processing, social media feeds or any such streaming data? I want
to know the comparison between Kafka & Spark Streaming.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Which-streaming-platform-is-best-Kafka-or-Spark-Streaming-tp28474.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Kafka 0.10 & Spark Streaming 2.0.2

2016-12-02 Thread Jacek Laskowski
Hi,

What's the entire spark-submit + Spark properties you're using?

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark 2.0 https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski


On Fri, Dec 2, 2016 at 6:28 PM, Gabriel Perez <gabr...@adtheorent.com> wrote:
> I had it setup with three nodes, a master and 2 slaves. Is there anything
> that would tell me it was in local mode. I am also added the –deploy-mode
> cluster flag and saw the same results.
>
>
>
> Thanks,
>
> Gabe
>
>
>
> From: Mich Talebzadeh <mich.talebza...@gmail.com>
> Date: Friday, December 2, 2016 at 12:26 PM
> To: Gabriel Perez <gabr...@adtheorent.com>
> Cc: Jacek Laskowski <ja...@japila.pl>, user <user@spark.apache.org>
>
>
> Subject: Re: Kafka 0.10 & Spark Streaming 2.0.2
>
>
>
> in this POC of yours are you running this app with spark  in Local mode by
> any chance?
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> Disclaimer: Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed. The
> author will in no case be liable for any monetary damages arising from such
> loss, damage or destruction.
>
>
>
>
>
> On 2 December 2016 at 16:54, Gabriel Perez <gabr...@adtheorent.com> wrote:
>
> Hi,
>
>
>
> The total partitions are 128 and I can tell its one executor because in the
> consumer list for kafka I see only one thread pulling and in the master
> spark UI I see the executor thread id is showing as 0 and that’s it.
>
>
>
> Thanks,
>
> Gabe
>
>
>
>
>
> From: Jacek Laskowski <ja...@japila.pl>
> Date: Friday, December 2, 2016 at 11:47 AM
> To: Gabriel Perez <gabr...@adtheorent.com>
> Cc: user <user@spark.apache.org>
> Subject: Re: Kafka 0.10 & Spark Streaming 2.0.2
>
>
>
> Hi,
>
>
>
> How many partitions does the topic have? How do you check how many executors
> read from the topic?
>
>
>
> Jacek
>
>
>
>
>
> On 2 Dec 2016 2:44 p.m., "gabrielperez2484" <gabr...@adtheorent.com> wrote:
>
> Hello,
>
> I am trying to perform a POC between Kafka 0.10 and Spark 2.0.2. Currently I
> am running into an issue, where only one executor ("kafka consumer") is
> reading from the topic. Which is causing performance to be really poor. I
> have tried adding "--num-executors 8" both in the script to execute the jar
> and in my java code. Here is the code below. Please let me know if I am
> missing something or there is a way to increase the number of consumers to
> connect to kafka.
>
>
> Thanks,
> Gabe
>
> 
> Map<String, Object> kafkaParams = new HashMap<>();
> kafkaParams.put( "bootstrap.servers", "server:9092" );
> kafkaParams.put( "key.deserializer",
> StringDeserializer.class );
> kafkaParams.put( "value.deserializer",
> StringDeserializer.class );
> kafkaParams.put( "group.id", "spark-aggregation" );
> kafkaParams.put( "auto.offset.reset", "earliest" );
> kafkaParams.put( "request.timeout.ms", "305000" );
> kafkaParams.put( "heartbeat.interval.ms", "85000" );
> kafkaParams.put( "session.timeout.ms", "9" );
>
> Collection topics = Arrays.asList( "Topic" );
>
> SparkConf sparkConf = new SparkConf().setMaster(
> "spark://server:7077" )
> .setAppName( "aggregation" ).set(
> "spark.submit.deployMode", "cluster" )
> .set( "spark.executor.instances", "16" );
>
> JavaStreamingContext javaStreamingContext = new
> JavaStreamingContext(
> sparkConf, new Duration( 5000 ) );
>
> //Creates connect to the Stream.
> final JavaInputDStream<ConsumerRecordString, String>>
> stream =
> KafkaUtils.createDirectStream(
> javaStreamingContext,
> LocationStrategies.PreferConsistent(),
> ConsumerStrategies.

Re: Kafka 0.10 & Spark Streaming 2.0.2

2016-12-02 Thread Gabriel Perez
I had it set up with three nodes, a master and 2 slaves. Is there anything that
would tell me it was in local mode? I also added the --deploy-mode cluster
flag and saw the same results.

Thanks,
Gabe

From: Mich Talebzadeh <mich.talebza...@gmail.com>
Date: Friday, December 2, 2016 at 12:26 PM
To: Gabriel Perez <gabr...@adtheorent.com>
Cc: Jacek Laskowski <ja...@japila.pl>, user <user@spark.apache.org>
Subject: Re: Kafka 0.10 & Spark Streaming 2.0.2

in this POC of yours are you running this app with spark  in Local mode by any 
chance?


Dr Mich Talebzadeh



LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction.



On 2 December 2016 at 16:54, Gabriel Perez 
<gabr...@adtheorent.com<mailto:gabr...@adtheorent.com>> wrote:
Hi,

The total partitions are 128 and I can tell its one executor because in the 
consumer list for kafka I see only one thread pulling and in the master spark 
UI I see the executor thread id is showing as 0 and that’s it.

Thanks,
Gabe


From: Jacek Laskowski <ja...@japila.pl<mailto:ja...@japila.pl>>
Date: Friday, December 2, 2016 at 11:47 AM
To: Gabriel Perez <gabr...@adtheorent.com<mailto:gabr...@adtheorent.com>>
Cc: user <user@spark.apache.org<mailto:user@spark.apache.org>>
Subject: Re: Kafka 0.10 & Spark Streaming 2.0.2

Hi,

How many partitions does the topic have? How do you check how many executors 
read from the topic?

Jacek


On 2 Dec 2016 2:44 p.m., "gabrielperez2484" 
<gabr...@adtheorent.com<mailto:gabr...@adtheorent.com>> wrote:
Hello,

I am trying to perform a POC between Kafka 0.10 and Spark 2.0.2. Currently I
am running into an issue, where only one executor ("kafka consumer") is
reading from the topic. Which is causing performance to be really poor. I
have tried adding "--num-executors 8" both in the script to execute the jar
and in my java code. Here is the code below. Please let me know if I am
missing something or there is a way to increase the number of consumers to
connect to kafka.


Thanks,
Gabe


Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put( "bootstrap.servers", "server:9092" );
kafkaParams.put( "key.deserializer", StringDeserializer.class );
kafkaParams.put( "value.deserializer", StringDeserializer.class 
);
kafkaParams.put( "group.id<http://group.id>", 
"spark-aggregation" );
kafkaParams.put( "auto.offset.reset", "earliest" );
kafkaParams.put( 
"request.timeout.ms<http://request.timeout.ms>", "305000" );
kafkaParams.put( 
"heartbeat.interval.ms<http://heartbeat.interval.ms>", "85000" );
kafkaParams.put( 
"session.timeout.ms<http://session.timeout.ms>", "9" );

Collection topics = Arrays.asList( "Topic" );

SparkConf sparkConf = new SparkConf().setMaster( 
"spark://server:7077" )
.setAppName( "aggregation" ).set( 
"spark.submit.deployMode", "cluster" )
.set( "spark.executor.instances", "16" );

JavaStreamingContext javaStreamingContext = new 
JavaStreamingContext(
sparkConf, new Duration( 5000 ) );

//Creates connect to the Stream.
final JavaInputDStream<ConsumerRecordString, String>> 
stream =
KafkaUtils.createDirectStream(
javaStreamingContext, 
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String> Subscribe( 
topics, kafkaParams ) );

//JavaPairDStream<String, String> unifiedStream =
javaStreamingContext.union(kafkaStreams.get(0), kafkaStreams.subList(1,
kafkaStreams.size()));

JavaDStream records = stream.map( new
Function<ConsumerRecordString, String>, String>() {

private static final long serialVersionUID = 1L;

@Override
/**
 * Pulling key from the stream and creating the 
aggregation key.
 */
public String call( ConsumerRecord<String, String> 
record ) {


  

Re: Kafka 0.10 & Spark Streaming 2.0.2

2016-12-02 Thread Mich Talebzadeh
in this POC of yours are you running this app with spark  in Local mode by
any chance?

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
<https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 2 December 2016 at 16:54, Gabriel Perez <gabr...@adtheorent.com> wrote:

> Hi,
>
>
>
> The total partitions are 128 and I can tell its one executor because in
> the consumer list for kafka I see only one thread pulling and in the master
> spark UI I see the executor thread id is showing as 0 and that’s it.
>
>
>
> Thanks,
>
> Gabe
>
>
>
>
>
> *From: *Jacek Laskowski <ja...@japila.pl>
> *Date: *Friday, December 2, 2016 at 11:47 AM
> *To: *Gabriel Perez <gabr...@adtheorent.com>
> *Cc: *user <user@spark.apache.org>
> *Subject: *Re: Kafka 0.10 & Spark Streaming 2.0.2
>
>
>
> Hi,
>
>
>
> How many partitions does the topic have? How do you check how many
> executors read from the topic?
>
>
>
> Jacek
>
>
>
>
>
> On 2 Dec 2016 2:44 p.m., "gabrielperez2484" <gabr...@adtheorent.com>
> wrote:
>
> Hello,
>
> I am trying to perform a POC between Kafka 0.10 and Spark 2.0.2. Currently
> I
> am running into an issue, where only one executor ("kafka consumer") is
> reading from the topic. Which is causing performance to be really poor. I
> have tried adding "--num-executors 8" both in the script to execute the jar
> and in my java code. Here is the code below. Please let me know if I am
> missing something or there is a way to increase the number of consumers to
> connect to kafka.
>
>
> Thanks,
> Gabe
>
> 
> Map<String, Object> kafkaParams = new HashMap<>();
> kafkaParams.put( "bootstrap.servers", "server:9092" );
> kafkaParams.put( "key.deserializer",
> StringDeserializer.class );
> kafkaParams.put( "value.deserializer",
> StringDeserializer.class );
> kafkaParams.put( "group.id", "spark-aggregation" );
> kafkaParams.put( "auto.offset.reset", "earliest" );
> kafkaParams.put( "request.timeout.ms", "305000" );
> kafkaParams.put( "heartbeat.interval.ms", "85000" );
> kafkaParams.put( "session.timeout.ms", "9" );
>
> Collection topics = Arrays.asList( "Topic" );
>
> SparkConf sparkConf = new SparkConf().setMaster(
> "spark://server:7077" )
> .setAppName( "aggregation" ).set(
> "spark.submit.deployMode", "cluster" )
> .set( "spark.executor.instances", "16" );
>
> JavaStreamingContext javaStreamingContext = new
> JavaStreamingContext(
> sparkConf, new Duration( 5000 ) );
>
> //Creates connect to the Stream.
> final JavaInputDStream<ConsumerRecordString, String>>
> stream =
> KafkaUtils.createDirectStream(
> javaStreamingContext, LocationStrategies.
> PreferConsistent(),
> ConsumerStrategies.<String, String>
> Subscribe( topics, kafkaParams ) );
>
> //JavaPairDStream<String, String> unifiedStream =
> javaStreamingContext.union(kafkaStreams.get(0), kafkaStreams.subList(1,
> kafkaStreams.size()));
>
> JavaDStream records = stream.map( new
> Function<ConsumerRecordString, String>, String>() {
>
> private static final long serialVersionUID = 1L;
>
> @Override
> /**
>  * Pulling key from the stream and creating the
> aggregation key.
>  */
> public String call( ConsumerRecord<String, String>
> record ) {
>
>
> return record.key();
>
> }
> } );
>
> JavaPairDStream<String, Integer> pairs =
> recor

Re: Kafka 0.10 & Spark Streaming 2.0.2

2016-12-02 Thread Gabriel Perez
We actually ended up reverting back to 0.9.0 in my testing environment because
we found other products weren't ready for 0.10 either. So I am not able
to create those screenshots. Hopefully I don't see the same issue with 0.9.0.
Thank you for your help though.

Thanks,
Gabe

From: Jacek Laskowski <ja...@japila.pl>
Date: Friday, December 2, 2016 at 12:21 PM
To: Gabriel Perez <gabr...@adtheorent.com>
Cc: user <user@spark.apache.org>
Subject: Re: Kafka 0.10 & Spark Streaming 2.0.2

Hi,

Can you post the screenshot of the Executors and Streaming tabs?

Jacek

On 2 Dec 2016 5:54 p.m., "Gabriel Perez" 
<gabr...@adtheorent.com<mailto:gabr...@adtheorent.com>> wrote:
Hi,

The total partitions are 128 and I can tell its one executor because in the 
consumer list for kafka I see only one thread pulling and in the master spark 
UI I see the executor thread id is showing as 0 and that’s it.

Thanks,
Gabe


From: Jacek Laskowski <ja...@japila.pl<mailto:ja...@japila.pl>>
Date: Friday, December 2, 2016 at 11:47 AM
To: Gabriel Perez <gabr...@adtheorent.com<mailto:gabr...@adtheorent.com>>
Cc: user <user@spark.apache.org<mailto:user@spark.apache.org>>
Subject: Re: Kafka 0.10 & Spark Streaming 2.0.2

Hi,

How many partitions does the topic have? How do you check how many executors 
read from the topic?

Jacek


On 2 Dec 2016 2:44 p.m., "gabrielperez2484" 
<gabr...@adtheorent.com<mailto:gabr...@adtheorent.com>> wrote:
Hello,

I am trying to perform a POC between Kafka 0.10 and Spark 2.0.2. Currently I
am running into an issue, where only one executor ("kafka consumer") is
reading from the topic. Which is causing performance to be really poor. I
have tried adding "--num-executors 8" both in the script to execute the jar
and in my java code. Here is the code below. Please let me know if I am
missing something or there is a way to increase the number of consumers to
connect to kafka.


Thanks,
Gabe


Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put( "bootstrap.servers", "server:9092" );
kafkaParams.put( "key.deserializer", StringDeserializer.class );
kafkaParams.put( "value.deserializer", StringDeserializer.class 
);
kafkaParams.put( "group.id<http://group.id>", 
"spark-aggregation" );
kafkaParams.put( "auto.offset.reset", "earliest" );
kafkaParams.put( 
"request.timeout.ms<http://request.timeout.ms>", "305000" );
kafkaParams.put( 
"heartbeat.interval.ms<http://heartbeat.interval.ms>", "85000" );
kafkaParams.put( 
"session.timeout.ms<http://session.timeout.ms>", "9" );

Collection topics = Arrays.asList( "Topic" );

SparkConf sparkConf = new SparkConf().setMaster( 
"spark://server:7077" )
.setAppName( "aggregation" ).set( 
"spark.submit.deployMode", "cluster" )
.set( "spark.executor.instances", "16" );

JavaStreamingContext javaStreamingContext = new 
JavaStreamingContext(
sparkConf, new Duration( 5000 ) );

//Creates connect to the Stream.
final JavaInputDStream<ConsumerRecordString, String>> 
stream =
KafkaUtils.createDirectStream(
javaStreamingContext, 
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String> Subscribe( 
topics, kafkaParams ) );

//JavaPairDStream<String, String> unifiedStream =
javaStreamingContext.union(kafkaStreams.get(0), kafkaStreams.subList(1,
kafkaStreams.size()));

JavaDStream records = stream.map( new
Function<ConsumerRecordString, String>, String>() {

private static final long serialVersionUID = 1L;

@Override
/**
 * Pulling key from the stream and creating the 
aggregation key.
 */
public String call( ConsumerRecord<String, String> 
record ) {


return record.key();

}
} );

JavaPairDStream<String, Integer> pairs = records.mapToPair( new
PairFunction<String, String, Integer>() {

private static final long serialVersionUID = 1L;

@Override
/**
 * Creating new tuple to perform calculations on.
  

Re: Kafka 0.10 & Spark Streaming 2.0.2

2016-12-02 Thread Jacek Laskowski
Hi,

Can you post the screenshot of the Executors and Streaming tabs?

Jacek

On 2 Dec 2016 5:54 p.m., "Gabriel Perez" <gabr...@adtheorent.com> wrote:

> Hi,
>
>
>
> The total partitions are 128 and I can tell its one executor because in
> the consumer list for kafka I see only one thread pulling and in the master
> spark UI I see the executor thread id is showing as 0 and that’s it.
>
>
>
> Thanks,
>
> Gabe
>
>
>
>
>
> *From: *Jacek Laskowski <ja...@japila.pl>
> *Date: *Friday, December 2, 2016 at 11:47 AM
> *To: *Gabriel Perez <gabr...@adtheorent.com>
> *Cc: *user <user@spark.apache.org>
> *Subject: *Re: Kafka 0.10 & Spark Streaming 2.0.2
>
>
>
> Hi,
>
>
>
> How many partitions does the topic have? How do you check how many
> executors read from the topic?
>
>
>
> Jacek
>
>
>
>
>
> On 2 Dec 2016 2:44 p.m., "gabrielperez2484" <gabr...@adtheorent.com>
> wrote:
>
> Hello,
>
> I am trying to perform a POC between Kafka 0.10 and Spark 2.0.2. Currently
> I
> am running into an issue, where only one executor ("kafka consumer") is
> reading from the topic. Which is causing performance to be really poor. I
> have tried adding "--num-executors 8" both in the script to execute the jar
> and in my java code. Here is the code below. Please let me know if I am
> missing something or there is a way to increase the number of consumers to
> connect to kafka.
>
>
> Thanks,
> Gabe
>
> 
> Map<String, Object> kafkaParams = new HashMap<>();
> kafkaParams.put( "bootstrap.servers", "server:9092" );
> kafkaParams.put( "key.deserializer",
> StringDeserializer.class );
> kafkaParams.put( "value.deserializer",
> StringDeserializer.class );
> kafkaParams.put( "group.id", "spark-aggregation" );
> kafkaParams.put( "auto.offset.reset", "earliest" );
> kafkaParams.put( "request.timeout.ms", "305000" );
> kafkaParams.put( "heartbeat.interval.ms", "85000" );
> kafkaParams.put( "session.timeout.ms", "9" );
>
> Collection topics = Arrays.asList( "Topic" );
>
> SparkConf sparkConf = new SparkConf().setMaster(
> "spark://server:7077" )
> .setAppName( "aggregation" ).set(
> "spark.submit.deployMode", "cluster" )
> .set( "spark.executor.instances", "16" );
>
> JavaStreamingContext javaStreamingContext = new
> JavaStreamingContext(
> sparkConf, new Duration( 5000 ) );
>
> //Creates connect to the Stream.
> final JavaInputDStream<ConsumerRecordString, String>>
> stream =
> KafkaUtils.createDirectStream(
> javaStreamingContext, LocationStrategies.
> PreferConsistent(),
> ConsumerStrategies.<String, String>
> Subscribe( topics, kafkaParams ) );
>
> //JavaPairDStream<String, String> unifiedStream =
> javaStreamingContext.union(kafkaStreams.get(0), kafkaStreams.subList(1,
> kafkaStreams.size()));
>
> JavaDStream records = stream.map( new
> Function<ConsumerRecordString, String>, String>() {
>
> private static final long serialVersionUID = 1L;
>
> @Override
> /**
>  * Pulling key from the stream and creating the
> aggregation key.
>  */
> public String call( ConsumerRecord<String, String>
> record ) {
>
>
> return record.key();
>
> }
> } );
>
> JavaPairDStream<String, Integer> pairs =
> records.mapToPair( new
> PairFunction<String, String, Integer>() {
>
> private static final long serialVersionUID = 1L;
>
> @Override
> /**
>  * Creating new tuple to perform calculations on.
>  */
> public Tuple2<String, Integer> call( String s ) {
>
> return new Tuple2<>( s, 1 );
> }
> } );
>
>   

Re: Kafka 0.10 & Spark Streaming 2.0.2

2016-12-02 Thread Gabriel Perez
Hi,

The total partitions are 128, and I can tell it's one executor because in the
consumer list for Kafka I see only one thread pulling, and in the master Spark
UI I see the executor thread id is showing as 0 and that's it.

Thanks,
Gabe


From: Jacek Laskowski <ja...@japila.pl>
Date: Friday, December 2, 2016 at 11:47 AM
To: Gabriel Perez <gabr...@adtheorent.com>
Cc: user <user@spark.apache.org>
Subject: Re: Kafka 0.10 & Spark Streaming 2.0.2

Hi,

How many partitions does the topic have? How do you check how many executors 
read from the topic?

Jacek


On 2 Dec 2016 2:44 p.m., "gabrielperez2484" 
<gabr...@adtheorent.com<mailto:gabr...@adtheorent.com>> wrote:
Hello,

I am trying to perform a POC between Kafka 0.10 and Spark 2.0.2. Currently I
am running into an issue, where only one executor ("kafka consumer") is
reading from the topic. Which is causing performance to be really poor. I
have tried adding "--num-executors 8" both in the script to execute the jar
and in my java code. Here is the code below. Please let me know if I am
missing something or there is a way to increase the number of consumers to
connect to kafka.


Thanks,
Gabe


Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put( "bootstrap.servers", "server:9092" );
kafkaParams.put( "key.deserializer", StringDeserializer.class );
kafkaParams.put( "value.deserializer", StringDeserializer.class 
);
kafkaParams.put( "group.id<http://group.id>", 
"spark-aggregation" );
kafkaParams.put( "auto.offset.reset", "earliest" );
kafkaParams.put( 
"request.timeout.ms<http://request.timeout.ms>", "305000" );
kafkaParams.put( 
"heartbeat.interval.ms<http://heartbeat.interval.ms>", "85000" );
kafkaParams.put( 
"session.timeout.ms<http://session.timeout.ms>", "9" );

Collection topics = Arrays.asList( "Topic" );

SparkConf sparkConf = new SparkConf().setMaster( 
"spark://server:7077" )
.setAppName( "aggregation" ).set( 
"spark.submit.deployMode", "cluster" )
.set( "spark.executor.instances", "16" );

JavaStreamingContext javaStreamingContext = new 
JavaStreamingContext(
sparkConf, new Duration( 5000 ) );

//Creates connect to the Stream.
final JavaInputDStream<ConsumerRecordString, String>> 
stream =
KafkaUtils.createDirectStream(
javaStreamingContext, 
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String> Subscribe( 
topics, kafkaParams ) );

//JavaPairDStream<String, String> unifiedStream =
javaStreamingContext.union(kafkaStreams.get(0), kafkaStreams.subList(1,
kafkaStreams.size()));

JavaDStream records = stream.map( new
Function<ConsumerRecordString, String>, String>() {

private static final long serialVersionUID = 1L;

@Override
/**
 * Pulling key from the stream and creating the 
aggregation key.
 */
public String call( ConsumerRecord<String, String> 
record ) {


return record.key();

}
} );

JavaPairDStream<String, Integer> pairs = records.mapToPair( new
PairFunction<String, String, Integer>() {

private static final long serialVersionUID = 1L;

@Override
/**
 * Creating new tuple to perform calculations on.
 */
public Tuple2<String, Integer> call( String s ) {

return new Tuple2<>( s, 1 );
}
} );

JavaPairDStream<String, Integer> counts = pairs.reduceByKey( new
Function2<Integer, Integer, Integer>() {

private static final long serialVersionUID = 1L;

@Override
/**
 * perform counts...
 */
public Integer call( Integer i1, Integer i2 ) {

return i1 + i2;
}
} );

stream.foreachRDD( new 
VoidFunction<JavaRDDConsumerRecordString,
String>>>() {

/**

Re: Kafka 0.10 & Spark Streaming 2.0.2

2016-12-02 Thread Jacek Laskowski
Hi,

How many partitions does the topic have? How do you check how many
executors read from the topic?

Jacek


On 2 Dec 2016 2:44 p.m., "gabrielperez2484" <gabr...@adtheorent.com> wrote:

Hello,

I am trying to perform a POC between Kafka 0.10 and Spark 2.0.2. Currently I
am running into an issue, where only one executor ("kafka consumer") is
reading from the topic. Which is causing performance to be really poor. I
have tried adding "--num-executors 8" both in the script to execute the jar
and in my java code. Here is the code below. Please let me know if I am
missing something or there is a way to increase the number of consumers to
connect to kafka.


Thanks,
Gabe


Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put( "bootstrap.servers", "server:9092" );
kafkaParams.put( "key.deserializer",
StringDeserializer.class );
kafkaParams.put( "value.deserializer",
StringDeserializer.class );
kafkaParams.put( "group.id", "spark-aggregation" );
kafkaParams.put( "auto.offset.reset", "earliest" );
kafkaParams.put( "request.timeout.ms", "305000" );
kafkaParams.put( "heartbeat.interval.ms", "85000" );
kafkaParams.put( "session.timeout.ms", "9" );

Collection<String> topics = Arrays.asList( "Topic" );

SparkConf sparkConf = new SparkConf().setMaster(
"spark://server:7077" )
.setAppName( "aggregation" ).set(
"spark.submit.deployMode", "cluster" )
.set( "spark.executor.instances", "16" );

JavaStreamingContext javaStreamingContext = new
JavaStreamingContext(
sparkConf, new Duration( 5000 ) );

//Creates connect to the Stream.
final JavaInputDStream<ConsumerRecord<String, String>>
stream =
KafkaUtils.createDirectStream(
javaStreamingContext, LocationStrategies.
PreferConsistent(),
ConsumerStrategies.<String, String>
Subscribe( topics, kafkaParams ) );

//JavaPairDStream<String, String> unifiedStream =
javaStreamingContext.union(kafkaStreams.get(0), kafkaStreams.subList(1,
kafkaStreams.size()));

JavaDStream<String> records = stream.map( new
Function<ConsumerRecord<String, String>, String>() {

private static final long serialVersionUID = 1L;

@Override
/**
 * Pulling key from the stream and creating the
aggregation key.
 */
public String call( ConsumerRecord<String, String>
record ) {


return record.key();

}
} );

JavaPairDStream<String, Integer> pairs = records.mapToPair(
new
PairFunction<String, String, Integer>() {

private static final long serialVersionUID = 1L;

@Override
/**
 * Creating new tuple to perform calculations on.
 */
public Tuple2<String, Integer> call( String s ) {

return new Tuple2<>( s, 1 );
}
} );

JavaPairDStream<String, Integer> counts =
pairs.reduceByKey( new
Function2<Integer, Integer, Integer>() {

private static final long serialVersionUID = 1L;

@Override
/**
 * perform counts...
 */
public Integer call( Integer i1, Integer i2 ) {

return i1 + i2;
}
} );

stream.foreachRDD( new
VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {

/**
*
*/
private static final long serialVersionUID = 1L;

@Override
public void call( JavaRDD<ConsumerRecord<String, String>> rdd ) {

OffsetRange[] offsetRanges = (
(HasOffsetRanges) rdd.rdd()
).offsetRanges();

// some time later, after outputs have
completed
( (CanCommitOffsets) stream.inputDStream()
).commitAsync( offsetRanges
);
}
} );




--
View this message in context: http://apache-spark-user-list.
1001560.n3.nabble.com/Kafka-0-10-Spark-Streaming-2-0-2-tp28153.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org


Kafka 0.10 & Spark Streaming 2.0.2

2016-12-02 Thread gabrielperez2484
Hello,

I am trying to perform a POC between Kafka 0.10 and Spark 2.0.2. Currently I
am running into an issue where only one executor ("kafka consumer") is
reading from the topic, which is causing performance to be really poor. I
have tried adding "--num-executors 8" both in the script that executes the jar
and in my Java code. The code is below. Please let me know if I am
missing something, or if there is a way to increase the number of consumers
connecting to Kafka.


Thanks, 
Gabe


Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put( "bootstrap.servers", "server:9092" );
kafkaParams.put( "key.deserializer", StringDeserializer.class );
kafkaParams.put( "value.deserializer", StringDeserializer.class 
);
kafkaParams.put( "group.id", "spark-aggregation" );
kafkaParams.put( "auto.offset.reset", "earliest" );
kafkaParams.put( "request.timeout.ms", "305000" );
kafkaParams.put( "heartbeat.interval.ms", "85000" );
kafkaParams.put( "session.timeout.ms", "9" );

Collection<String> topics = Arrays.asList( "Topic" );

SparkConf sparkConf = new SparkConf().setMaster( 
"spark://server:7077" )
.setAppName( "aggregation" ).set( 
"spark.submit.deployMode", "cluster" )
.set( "spark.executor.instances", "16" );

JavaStreamingContext javaStreamingContext = new 
JavaStreamingContext(
sparkConf, new Duration( 5000 ) );

//Creates connect to the Stream.
final JavaInputDStream<ConsumerRecord<String, String>> stream =
KafkaUtils.createDirectStream(
javaStreamingContext, 
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String> Subscribe( 
topics, kafkaParams ) );

//JavaPairDStream<String, String> unifiedStream =
javaStreamingContext.union(kafkaStreams.get(0), kafkaStreams.subList(1,
kafkaStreams.size()));

JavaDStream<String> records = stream.map( new
Function<ConsumerRecord<String, String>, String>() {

private static final long serialVersionUID = 1L;

@Override
/**
 * Pulling key from the stream and creating the 
aggregation key.
 */
public String call( ConsumerRecord<String, String> 
record ) {


return record.key();

}
} );

JavaPairDStream<String, Integer> pairs = records.mapToPair( new
PairFunction<String, String, Integer>() {

private static final long serialVersionUID = 1L;

@Override
/**
 * Creating new tuple to perform calculations on.
 */
public Tuple2<String, Integer> call( String s ) {

return new Tuple2<>( s, 1 );
}
} );

JavaPairDStream<String, Integer> counts = pairs.reduceByKey( new
Function2<Integer, Integer, Integer>() {

private static final long serialVersionUID = 1L;

@Override
/**
 * perform counts...
 */
public Integer call( Integer i1, Integer i2 ) {

return i1 + i2;
}
} );

stream.foreachRDD( new
VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {

/**
* 
*/
private static final long serialVersionUID = 1L;

@Override
public void call( JavaRDD<ConsumerRecord<String, String>> rdd ) {

OffsetRange[] offsetRanges = ( 
(HasOffsetRanges) rdd.rdd()
).offsetRanges();

    // some time later, after outputs have completed
( (CanCommitOffsets) stream.inputDStream() 
).commitAsync( offsetRanges
);
}
} );




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Kafka-0-10-Spark-Streaming-2-0-2-tp28153.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Number of consumers in Kafka with Spark Streaming

2016-06-21 Thread Cody Koeninger
If you're using the direct stream, and don't have speculative
execution turned on, there is one executor consumer created per
partition, plus a driver consumer for getting the latest offsets.  If
you have fewer executors than partitions, not all of those consumers
will be running at the same time.

The direct stream doesn't use consumer groups in the same way the
kafka high level consumer does, but you should be able to pass group
id in the kafka parameters.
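
For example, a minimal sketch of passing an explicit group id through the direct stream's kafka parameters (broker, group and topic names are placeholders; kafka.serializer.StringDecoder and spark-streaming-kafka are the 0.8-era classes used elsewhere on this list, and an existing StreamingContext ssc is assumed):

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

val kafkaParams = Map(
  "metadata.broker.list" -> "broker1:9092",   // placeholder broker list
  "group.id" -> "my-explicit-group")          // explicit group id passed through
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("myTopic"))           // placeholder topic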

On Tue, Jun 21, 2016 at 9:56 AM, Guillermo Ortiz <konstt2...@gmail.com> wrote:
> I use Spark Streaming with Kafka and I'd like to know how many consumers are
> generated. I guess that as many as partitions in Kafka but I'm not sure.
> Is there a way to know the name of the groupId generated in Spark to Kafka?

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Number of consumers in Kafka with Spark Streaming

2016-06-21 Thread Guillermo Ortiz
I use Spark Streaming with Kafka and I'd like to know how many consumers
are generated. I guess there are as many as there are partitions in Kafka,
but I'm not sure.
Is there a way to know the name of the groupId that Spark generates for Kafka?


RE: Handle empty kafka in Spark Streaming

2016-06-15 Thread David Newberger
Hi Yogesh,

I'm not sure if this is possible or not; I'd be interested in knowing. My gut
says it would be an anti-pattern even if it is possible to do something like this,
and that's why I handle it in either foreachRDD or foreachPartition. The
way I look at Spark Streaming is as an application which is always running and
doing something like windowed batching or micro-batching or whatever I'm trying
to accomplish. If an RDD I get from Kafka is empty then I don't run the rest of
the job. If the RDD I get from Kafka has some number of events then I'll
process the RDD further.

David Newberger

-Original Message-
From: Yogesh Vyas [mailto:informy...@gmail.com] 
Sent: Wednesday, June 15, 2016 8:30 AM
To: David Newberger
Subject: Re: Handle empty kafka in Spark Streaming

I am looking for something which checks the JavaPairReceiverInputDStream before
going on to any further operations.
For example, if I get a JavaPairReceiverInputDStream in the following
manner:

JavaPairReceiverInputDStream<String, String> message =
KafkaUtils.createStream(ssc, zkQuorum, group, topics,
StorageLevel.MEMORY_AND_DISK_SER());

Then I would like to check whether the message stream is empty or not. If it is not empty,
then go on to further operations, else wait for some data in Kafka.

On Wed, Jun 15, 2016 at 6:31 PM, David Newberger <david.newber...@wandcorp.com> 
wrote:
> If you're asking how to handle no messages in a batch window then I would add 
> an isEmpty check like:
>
> dStream.foreachRDD(rdd => {
> if (!rdd.isEmpty())
> ...
> }
>
> Or something like that.
>
>
> David Newberger
>
> -Original Message-
> From: Yogesh Vyas [mailto:informy...@gmail.com]
> Sent: Wednesday, June 15, 2016 6:31 AM
> To: user
> Subject: Handle empty kafka in Spark Streaming
>
> Hi,
>
> Does anyone knows how to handle empty Kafka while Spark Streaming job is 
> running ?
>
> Regards,
> Yogesh
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For 
> additional commands, e-mail: user-h...@spark.apache.org
>


RE: Handle empty kafka in Spark Streaming

2016-06-15 Thread David Newberger
If you're asking how to handle no messages in a batch window then I would add 
an isEmpty check like:

dStream.foreachRDD(rdd => {
if (!rdd.isEmpty()) 
...
}

Or something like that. 
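
A slightly fuller sketch of that guard, assuming the stream is a key/value pair DStream like the one returned by KafkaUtils.createStream (the processing inside the branch is just a placeholder):

dStream.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    // only run the rest of the job when the batch actually contains records
    rdd.map(_._2)              // e.g. keep just the message value
       .take(10)
       .foreach(println)       // placeholder for the real processing
  }
}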


David Newberger

-Original Message-
From: Yogesh Vyas [mailto:informy...@gmail.com] 
Sent: Wednesday, June 15, 2016 6:31 AM
To: user
Subject: Handle empty kafka in Spark Streaming

Hi,

Does anyone knows how to handle empty Kafka while Spark Streaming job is 
running ?

Regards,
Yogesh

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org



Handle empty kafka in Spark Streaming

2016-06-15 Thread Yogesh Vyas
Hi,

Does anyone know how to handle an empty Kafka topic while a Spark Streaming job
is running?

Regards,
Yogesh

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Access fields by name/index from Avro data read from Kafka through Spark Streaming

2016-02-25 Thread Harsh J
You should be able to cast the object to its real underlying type:
GenericRecord if generic (which is the default), or the actual generated
class if specific. The underlying implementation of KafkaAvroDecoder
seems to use either one of those depending on a config switch:
https://github.com/confluentinc/schema-registry/blob/master/avro-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaAvroDeserializer.java#L206-L218
 and
https://github.com/confluentinc/schema-registry/blob/master/avro-serializer/src/main/java/io/confluent/kafka/serializers/KafkaAvroDeserializerConfig.java#L27-L28

Once you have the right underlying class, extracting fields should be
simpler/direct, and would not need a mid-transformation to JSON.
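
A minimal sketch of that cast on the stream from the original post (the column name is just an example):

import org.apache.avro.generic.GenericRecord

val fieldValues = messages.map { case (_, value) =>
  val record = value.asInstanceOf[GenericRecord]   // decoder returns a GenericRecord by default
  record.get("COL_NAME").toString                  // pull the field directly, no JSON detour
}
fieldValues.print()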

On Fri, 26 Feb 2016 at 06:00 Mohammad Tariq <donta...@gmail.com> wrote:

> I got it working by using jsonRDD. This is what I had to do in order to
> make it work :
>
>   val messages = KafkaUtils.createDirectStream[Object, Object,
> KafkaAvroDecoder, KafkaAvroDecoder](ssc, kafkaParams, topicsSet)
>   val lines = messages.map(_._2.toString)
>   lines.foreachRDD(jsonRDD => {
> val sqlContext =
> SQLContextSingleton.getInstance(jsonRDD.sparkContext)
> val data = sqlContext.read.json(jsonRDD)
> data.printSchema()
> data.show()
> data.select("COL_NAME").show()
> data.groupBy("COL_NAME").count().show()
>   })
>
> Not sure though if it's the best way to achieve this.
>
>
>
> Tariq, Mohammad
> about.me/mti
>
>
> On Fri, Feb 26, 2016 at 5:21 AM, Shixiong(Ryan) Zhu <
> shixi...@databricks.com> wrote:
>
>> You can use `DStream.map` to transform objects to anything you want.
>>
>> On Thu, Feb 25, 2016 at 11:06 AM, Mohammad Tariq <donta...@gmail.com>
>> wrote:
>>
>>> Hi group,
>>>
>>> I have just started working with confluent platform and spark streaming,
>>> and was wondering if it is possible to access individual fields from an
>>> Avro object read from a kafka topic through spark streaming. As per its
>>> default behaviour *KafkaUtils.createDirectStream[Object, Object,
>>> KafkaAvroDecoder, KafkaAvroDecoder](ssc, kafkaParams, topicsSet)* return
>>> a *DStream[Object, Object]*, and don't have any schema associated with
>>> *Object*(or I am unable to figure it out). This makes it impossible to
>>> perform some operations on this DStream, for example, converting it to a
>>> Spark DataFrame.
>>>
>>> Since *KafkaAvroDecoder *doesn't allow us to have any other Class but
>>> *Object *I think I am going in the wrong direction. Any
>>> pointers/suggestions would be really helpful.
>>>
>>> *Versions used :*
>>> confluent-1.0.1
>>> spark-1.6.0-bin-hadoop2.4
>>> Scala code runner version - 2.11.6
>>>
>>> And this is the small piece of code I am using :
>>>
>>> package org.myorg.scalaexamples
>>>
>>> import org.apache.spark.rdd.RDD
>>> import org.apache.spark.SparkConf
>>> import org.apache.spark.streaming._
>>> import org.apache.spark.SparkContext
>>> import org.apache.avro.mapred.AvroKey
>>> import org.apache.spark.sql.SQLContext
>>> //import org.apache.avro.mapred.AvroValue
>>> import org.apache.spark.streaming.kafka._
>>> import org.apache.spark.storage.StorageLevel
>>> import org.apache.avro.generic.GenericRecord
>>> import org.apache.spark.streaming.dstream.DStream
>>> import io.confluent.kafka.serializers.KafkaAvroDecoder
>>> //import org.apache.hadoop.io.serializer.avro.AvroRecord
>>> //import org.apache.spark.streaming.dstream.ForEachDStream
>>> import org.apache.spark.sql.SQLContext
>>> import org.apache.kafka.common.serialization.Deserializer
>>>
>>> object DirectKafkaWordCount {
>>>   def main(args: Array[String]) {
>>> if (args.length < 2) {
>>>   System.err.println(s"""
>>> |Usage: DirectKafkaWordCount <brokers> <topics>
>>> |  <brokers> is a list of one or more Kafka brokers
>>> |  <topics> is a list of one or more kafka topics to consume from
>>> |
>>> """.stripMargin)
>>>   System.exit(1)
>>> }
>>> val Array(brokers, topics) = args
>>> val sparkConf = new
>>> SparkConf().setAppName("DirectKafkaWordCount").setMaster("local[1]")
>>>
>>> sparkConf.registerKryoClasses(Array(classOf[org.apache.av

Re: Access fields by name/index from Avro data read from Kafka through Spark Streaming

2016-02-25 Thread Mohammad Tariq
I got it working by using jsonRDD. This is what I had to do in order to
make it work :

  val messages = KafkaUtils.createDirectStream[Object, Object,
KafkaAvroDecoder, KafkaAvroDecoder](ssc, kafkaParams, topicsSet)
  val lines = messages.map(_._2.toString)
  lines.foreachRDD(jsonRDD => {
val sqlContext =
SQLContextSingleton.getInstance(jsonRDD.sparkContext)
val data = sqlContext.read.json(jsonRDD)
data.printSchema()
data.show()
data.select("COL_NAME").show()
data.groupBy("COL_NAME").count().show()
  })

Not sure though if it's the best way to achieve this.



Tariq, Mohammad
about.me/mti


On Fri, Feb 26, 2016 at 5:21 AM, Shixiong(Ryan) Zhu <shixi...@databricks.com
> wrote:

> You can use `DStream.map` to transform objects to anything you want.
>
> On Thu, Feb 25, 2016 at 11:06 AM, Mohammad Tariq <donta...@gmail.com>
> wrote:
>
>> Hi group,
>>
>> I have just started working with confluent platform and spark streaming,
>> and was wondering if it is possible to access individual fields from an
>> Avro object read from a kafka topic through spark streaming. As per its
>> default behaviour *KafkaUtils.createDirectStream[Object, Object,
>> KafkaAvroDecoder, KafkaAvroDecoder](ssc, kafkaParams, topicsSet)* return
>> a *DStream[Object, Object]*, and don't have any schema associated with
>> *Object*(or I am unable to figure it out). This makes it impossible to
>> perform some operations on this DStream, for example, converting it to a
>> Spark DataFrame.
>>
>> Since *KafkaAvroDecoder *doesn't allow us to have any other Class but
>> *Object *I think I am going in the wrong direction. Any
>> pointers/suggestions would be really helpful.
>>
>> *Versions used :*
>> confluent-1.0.1
>> spark-1.6.0-bin-hadoop2.4
>> Scala code runner version - 2.11.6
>>
>> And this is the small piece of code I am using :
>>
>> package org.myorg.scalaexamples
>>
>> import org.apache.spark.rdd.RDD
>> import org.apache.spark.SparkConf
>> import org.apache.spark.streaming._
>> import org.apache.spark.SparkContext
>> import org.apache.avro.mapred.AvroKey
>> import org.apache.spark.sql.SQLContext
>> //import org.apache.avro.mapred.AvroValue
>> import org.apache.spark.streaming.kafka._
>> import org.apache.spark.storage.StorageLevel
>> import org.apache.avro.generic.GenericRecord
>> import org.apache.spark.streaming.dstream.DStream
>> import io.confluent.kafka.serializers.KafkaAvroDecoder
>> //import org.apache.hadoop.io.serializer.avro.AvroRecord
>> //import org.apache.spark.streaming.dstream.ForEachDStream
>> import org.apache.spark.sql.SQLContext
>> import org.apache.kafka.common.serialization.Deserializer
>>
>> object DirectKafkaWordCount {
>>   def main(args: Array[String]) {
>> if (args.length < 2) {
>>   System.err.println(s"""
>> |Usage: DirectKafkaWordCount <brokers> <topics>
>> |  <brokers> is a list of one or more Kafka brokers
>> |  <topics> is a list of one or more kafka topics to consume from
>> |
>> """.stripMargin)
>>   System.exit(1)
>> }
>> val Array(brokers, topics) = args
>> val sparkConf = new
>> SparkConf().setAppName("DirectKafkaWordCount").setMaster("local[1]")
>>
>> sparkConf.registerKryoClasses(Array(classOf[org.apache.avro.mapred.AvroWrapper[GenericRecord]]))
>> val ssc = new StreamingContext(sparkConf, Seconds(5))
>> val topicsSet = topics.split(",").toSet
>> val kafkaParams = Map[String, String]("metadata.broker.list" ->
>> brokers, "group.id" -> "consumer",
>>   "zookeeper.connect" -> "localhost:2181", "schema.registry.url" -> "
>> http://localhost:8081")
>> val messages = KafkaUtils.createDirectStream[Object, Object,
>> KafkaAvroDecoder, KafkaAvroDecoder](ssc, kafkaParams, topicsSet)
>> messages.print()
>> ssc.start()
>> ssc.awaitTermination()
>>   }
>> }
>>
>> Thank you so much for your valuable time!
>>
>>
>> Tariq, Mohammad
>> about.me/mti
>>
>>
>
>


Re: Access fields by name/index from Avro data read from Kafka through Spark Streaming

2016-02-25 Thread Shixiong(Ryan) Zhu
You can use `DStream.map` to transform objects to anything you want.

On Thu, Feb 25, 2016 at 11:06 AM, Mohammad Tariq <donta...@gmail.com> wrote:

> Hi group,
>
> I have just started working with confluent platform and spark streaming,
> and was wondering if it is possible to access individual fields from an
> Avro object read from a kafka topic through spark streaming. As per its
> default behaviour *KafkaUtils.createDirectStream[Object, Object,
> KafkaAvroDecoder, KafkaAvroDecoder](ssc, kafkaParams, topicsSet)* return
> a *DStream[Object, Object]*, and don't have any schema associated with
> *Object*(or I am unable to figure it out). This makes it impossible to
> perform some operations on this DStream, for example, converting it to a
> Spark DataFrame.
>
> Since *KafkaAvroDecoder *doesn't allow us to have any other Class but
> *Object *I think I am going in the wrong direction. Any
> pointers/suggestions would be really helpful.
>
> *Versions used :*
> confluent-1.0.1
> spark-1.6.0-bin-hadoop2.4
> Scala code runner version - 2.11.6
>
> And this is the small piece of code I am using :
>
> package org.myorg.scalaexamples
>
> import org.apache.spark.rdd.RDD
> import org.apache.spark.SparkConf
> import org.apache.spark.streaming._
> import org.apache.spark.SparkContext
> import org.apache.avro.mapred.AvroKey
> import org.apache.spark.sql.SQLContext
> //import org.apache.avro.mapred.AvroValue
> import org.apache.spark.streaming.kafka._
> import org.apache.spark.storage.StorageLevel
> import org.apache.avro.generic.GenericRecord
> import org.apache.spark.streaming.dstream.DStream
> import io.confluent.kafka.serializers.KafkaAvroDecoder
> //import org.apache.hadoop.io.serializer.avro.AvroRecord
> //import org.apache.spark.streaming.dstream.ForEachDStream
> import org.apache.spark.sql.SQLContext
> import org.apache.kafka.common.serialization.Deserializer
>
> object DirectKafkaWordCount {
>   def main(args: Array[String]) {
> if (args.length < 2) {
>   System.err.println(s"""
> |Usage: DirectKafkaWordCount <brokers> <topics>
> |  <brokers> is a list of one or more Kafka brokers
> |  <topics> is a list of one or more kafka topics to consume from
> |
> """.stripMargin)
>   System.exit(1)
> }
> val Array(brokers, topics) = args
> val sparkConf = new
> SparkConf().setAppName("DirectKafkaWordCount").setMaster("local[1]")
>
> sparkConf.registerKryoClasses(Array(classOf[org.apache.avro.mapred.AvroWrapper[GenericRecord]]))
> val ssc = new StreamingContext(sparkConf, Seconds(5))
> val topicsSet = topics.split(",").toSet
> val kafkaParams = Map[String, String]("metadata.broker.list" ->
> brokers, "group.id" -> "consumer",
>   "zookeeper.connect" -> "localhost:2181", "schema.registry.url" -> "
> http://localhost:8081")
> val messages = KafkaUtils.createDirectStream[Object, Object,
> KafkaAvroDecoder, KafkaAvroDecoder](ssc, kafkaParams, topicsSet)
> messages.print()
> ssc.start()
> ssc.awaitTermination()
>   }
> }
>
> Thank you so much for your valuable time!
>
>
> Tariq, Mohammad
> about.me/mti
>
>


Access fields by name/index from Avro data read from Kafka through Spark Streaming

2016-02-25 Thread Mohammad Tariq
Hi group,

I have just started working with confluent platform and spark streaming,
and was wondering if it is possible to access individual fields from an
Avro object read from a kafka topic through spark streaming. By its
default behaviour, *KafkaUtils.createDirectStream[Object, Object,
KafkaAvroDecoder, KafkaAvroDecoder](ssc, kafkaParams, topicsSet)*
returns a *DStream[Object,
Object]*, which doesn't have any schema associated with *Object* (or I am unable
to figure it out). This makes it impossible to perform some operations on
this DStream, for example converting it to a Spark DataFrame.

Since *KafkaAvroDecoder *doesn't allow us to have any other Class but
*Object *I think I am going in the wrong direction. Any
pointers/suggestions would be really helpful.

*Versions used :*
confluent-1.0.1
spark-1.6.0-bin-hadoop2.4
Scala code runner version - 2.11.6

And this is the small piece of code I am using :

package org.myorg.scalaexamples

import org.apache.spark.rdd.RDD
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.SparkContext
import org.apache.avro.mapred.AvroKey
import org.apache.spark.sql.SQLContext
//import org.apache.avro.mapred.AvroValue
import org.apache.spark.streaming.kafka._
import org.apache.spark.storage.StorageLevel
import org.apache.avro.generic.GenericRecord
import org.apache.spark.streaming.dstream.DStream
import io.confluent.kafka.serializers.KafkaAvroDecoder
//import org.apache.hadoop.io.serializer.avro.AvroRecord
//import org.apache.spark.streaming.dstream.ForEachDStream
import org.apache.spark.sql.SQLContext
import org.apache.kafka.common.serialization.Deserializer

object DirectKafkaWordCount {
  def main(args: Array[String]) {
if (args.length < 2) {
  System.err.println(s"""
|Usage: DirectKafkaWordCount <brokers> <topics>
|  <brokers> is a list of one or more Kafka brokers
|  <topics> is a list of one or more kafka topics to consume from
|
""".stripMargin)
  System.exit(1)
}
val Array(brokers, topics) = args
val sparkConf = new
SparkConf().setAppName("DirectKafkaWordCount").setMaster("local[1]")

sparkConf.registerKryoClasses(Array(classOf[org.apache.avro.mapred.AvroWrapper[GenericRecord]]))
val ssc = new StreamingContext(sparkConf, Seconds(5))
val topicsSet = topics.split(",").toSet
val kafkaParams = Map[String, String]("metadata.broker.list" ->
brokers, "group.id" -> "consumer",
  "zookeeper.connect" -> "localhost:2181", "schema.registry.url" -> "
http://localhost:8081")
val messages = KafkaUtils.createDirectStream[Object, Object,
KafkaAvroDecoder, KafkaAvroDecoder](ssc, kafkaParams, topicsSet)
messages.print()
ssc.start()
ssc.awaitTermination()
  }
}

Thank you so much for your valuable time!


Tariq, Mohammad
about.me/mti


Re: Optimize the performance of inserting data to Cassandra with Kafka and Spark Streaming

2016-02-17 Thread radoburansky
Hi Jerry,

How do you know that only 100 messages are inserted? What is the primary
key of the "tableOfTopicA" Cassandra table? Isn't it possible that you map
more messages to the same primary key and therefore they overwrite each
other in Cassandra?

Regards

Rado

On Tue, Feb 16, 2016 at 10:29 PM, Jerry [via Apache Spark User List] <
ml-node+s1001560n26244...@n3.nabble.com> wrote:

> Hello,
>
> I have questions using Spark streaming to consume data from Kafka and
> insert to Cassandra database.
>
> 5 AWS instances (each one does have 8 cores, 30GB memory) for Spark,
> Hadoop, Cassandra
> Scala: 2.10.5
> Spark: 1.2.2
> Hadoop: 1.2.1
> Cassandra 2.0.18
>
> 3 AWS instances for Kafka cluster (each one does have 8 cores, 30GB
> memory)
> Kafka: 0.8.2.1
> Zookeeper: 3.4.6
>
> Other configurations:
> batchInterval = 6 Seconds
> blockInterval = 1500 millis
> spark.locality.wait = 500 millis
> #Consumers = 10
>
> There are two columns in the cassandra table
> keySpaceOfTopicA.tableOfTopicA, "createdtime" and "log".
>
> Here is a piece of codes,
>
> @transient val kstreams = (1 to numConsumers.toInt).map { _ =>
> KafkaUtils.createStream(ssc, zkeeper, groupId, Map("topicA"->1),
>  StorageLevel.MEMORY_AND_DISK_SER)
> .map(_._2.toString).map(Tuple1(_))
> .map{case(log) => (System.currentTimeMillis(), log)}
> }
> @transient val unifiedMessage = ssc.union(kstreams)
>
> unifiedMessage.saveToCassandra("keySpaceOfTopicA", "tableOfTopicA",
> SomeColumns("createdtime", "log"))
>
> I created a producer and send messages to Brokers (1000 messages/per time)
>
> But the Cassandra can only be inserted about 100 messages in each round of
> test.
> Can anybody give me advices why the other messages (about 900 message)
> can't be consumed?
> How do I configure and tune the parameters in order to improve the
> throughput of consumers?
>
> Thank you very much for your reading and suggestions in advances.
>
> Jerry Wong
>
> ------
> If you reply to this email, your message will be added to the discussion
> below:
>
> http://apache-spark-user-list.1001560.n3.nabble.com/Optimize-the-performance-of-inserting-data-to-Cassandra-with-Kafka-and-Spark-Streaming-tp26244.html
> To start a new topic under Apache Spark User List, email
> ml-node+s1001560n1...@n3.nabble.com
> To unsubscribe from Apache Spark User List, click here
> <http://apache-spark-user-list.1001560.n3.nabble.com/template/NamlServlet.jtp?macro=unsubscribe_by_code=1=cmFkb2J1cmFuc2t5QGdtYWlsLmNvbXwxfC03MDA2NjE5MjQ=>
> .
> NAML
> <http://apache-spark-user-list.1001560.n3.nabble.com/template/NamlServlet.jtp?macro=macro_viewer=instant_html%21nabble%3Aemail.naml=nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.view.web.template.NodeNamespace=notify_subscribers%21nabble%3Aemail.naml-instant_emails%21nabble%3Aemail.naml-send_instant_email%21nabble%3Aemail.naml>
>




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Optimize-the-performance-of-inserting-data-to-Cassandra-with-Kafka-and-Spark-Streaming-tp26244p26246.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: Optimize the performance of inserting data to Cassandra with Kafka and Spark Streaming

2016-02-17 Thread Jerry
Rado,

Yes, you are correct. A lot of messages are created at almost the same
time (even using milliseconds). I changed to use "UUID.randomUUID()", with
which all messages can be inserted into the Cassandra table without the key collisions.
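
For reference, a minimal sketch of that change applied to the snippet from the original post. It assumes the table's key column can hold the UUID string; the two connector imports are the ones saveToCassandra already needs, and the remaining vals and imports come from the original code.

import java.util.UUID
import com.datastax.spark.connector._
import com.datastax.spark.connector.streaming._

val kstreams = (1 to numConsumers.toInt).map { _ =>
  KafkaUtils.createStream(ssc, zkeeper, groupId, Map("topicA" -> 1),
      StorageLevel.MEMORY_AND_DISK_SER)
    .map(_._2.toString)
    .map(log => (UUID.randomUUID().toString, log))   // unique key per message, no collisions
}
ssc.union(kstreams).saveToCassandra("keySpaceOfTopicA", "tableOfTopicA",
  SomeColumns("createdtime", "log"))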

Thank you very much!
Jerry Wong

On Wed, Feb 17, 2016 at 1:50 AM, radoburansky [via Apache Spark User List] <
ml-node+s1001560n26246...@n3.nabble.com> wrote:

> Hi Jerry,
>
> How do you know that only 100 messages are inserted? What is the primary
> key of the "tableOfTopicA" Cassandra table? Isn't it possible that you
> map more messages to the same primamary key and therefore they overwrite
> each other in Cassandra?
>
> Regards
>
> Rado
>
> On Tue, Feb 16, 2016 at 10:29 PM, Jerry [via Apache Spark User List] <[hidden
> email] <http:///user/SendEmail.jtp?type=node=26246=0>> wrote:
>
>> Hello,
>>
>> I have questions using Spark streaming to consume data from Kafka and
>> insert to Cassandra database.
>>
>> 5 AWS instances (each one does have 8 cores, 30GB memory) for Spark,
>> Hadoop, Cassandra
>> Scala: 2.10.5
>> Spark: 1.2.2
>> Hadoop: 1.2.1
>> Cassandra 2.0.18
>>
>> 3 AWS instances for Kafka cluster (each one does have 8 cores, 30GB
>> memory)
>> Kafka: 0.8.2.1
>> Zookeeper: 3.4.6
>>
>> Other configurations:
>> batchInterval = 6 Seconds
>> blockInterval = 1500 millis
>> spark.locality.wait = 500 millis
>> #Consumers = 10
>>
>> There are two columns in the cassandra table
>> keySpaceOfTopicA.tableOfTopicA, "createdtime" and "log".
>>
>> Here is a piece of codes,
>>
>> @transient val kstreams = (1 to numConsumers.toInt).map { _ =>
>> KafkaUtils.createStream(ssc, zkeeper, groupId, Map("topicA"->1),
>>  StorageLevel.MEMORY_AND_DISK_SER)
>> .map(_._2.toString).map(Tuple1(_))
>> .map{case(log) => (System.currentTimeMillis(), log)}
>> }
>> @transient val unifiedMessage = ssc.union(kstreams)
>>
>> unifiedMessage.saveToCassandra("keySpaceOfTopicA", "tableOfTopicA",
>> SomeColumns("createdtime", "log"))
>>
>> I created a producer and send messages to Brokers (1000 messages/per
>> time)
>>
>> But the Cassandra can only be inserted about 100 messages in each round
>> of test.
>> Can anybody give me advices why the other messages (about 900 message)
>> can't be consumed?
>> How do I configure and tune the parameters in order to improve the
>> throughput of consumers?
>>
>> Thank you very much for your reading and suggestions in advances.
>>
>> Jerry Wong
>>
>> --
>> If you reply to this email, your message will be added to the discussion
>> below:
>>
>> http://apache-spark-user-list.1001560.n3.nabble.com/Optimize-the-performance-of-inserting-data-to-Cassandra-with-Kafka-and-Spark-Streaming-tp26244.html
>> To start a new topic under Apache Spark User List, email [hidden email]
>> <http:///user/SendEmail.jtp?type=node=26246=1>
>> To unsubscribe from Apache Spark User List, click here.
>> NAML
>> <http://apache-spark-user-list.1001560.n3.nabble.com/template/NamlServlet.jtp?macro=macro_viewer=instant_html%21nabble%3Aemail.naml=nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.view.web.template.NodeNamespace=notify_subscribers%21nabble%3Aemail.naml-instant_emails%21nabble%3Aemail.naml-send_instant_email%21nabble%3Aemail.naml>
>>
>
>
>
> --
> If you reply to this email, your message will be added to the discussion
> below:
>
> http://apache-spark-user-list.1001560.n3.nabble.com/Optimize-the-performance-of-inserting-data-to-Cassandra-with-Kafka-and-Spark-Streaming-tp26244p26246.html
> To start a new topic under Apache Spark User List, email
> ml-node+s1001560n1...@n3.nabble.com
> To unsubscribe from Optimize the performance of inserting data to
> Cassandra with Kafka and Spark Streaming, click here
> <http://apache-spark-user-list.1001560.n3.nabble.com/template/NamlServlet.jtp?macro=unsubscribe_by_code=26244=amVycnkua2luZzIud29uZ0BnbWFpbC5jb218MjYyNDR8MTYwMzcyMjg3MQ==>
> .
> NAML
> <http://apache-spark-user-list.1001560.n3.nabble.com/template/NamlServlet.jtp?macro=macro_viewer=instant_html%21nabble%3Aemail.naml=nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.view.web.template.NodeNamespace=notify_subscribers%21nabble%3Aemail.naml-instant_emails%21nabble%3Aemail.naml-send_instant_email%21nabble%3Aemail.naml>
>




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Optimize-the-performance-of-inserting-data-to-Cassandra-with-Kafka-and-Spark-Streaming-tp26244p26252.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Optimize the performance of inserting data to Cassandra with Kafka and Spark Streaming

2016-02-16 Thread Jerry
Hello, 

I have questions using Spark streaming to consume data from Kafka and insert
to Cassandra database.

5 AWS instances (each one does have 8 cores, 30GB memory) for Spark, Hadoop,
Cassandra
Scala: 2.10.5
Spark: 1.2.2
Hadoop: 1.2.1
Cassandra 2.0.18

3 AWS instances for Kafka cluster (each one does have 8 cores, 30GB memory)
Kafka: 0.8.2.1
Zookeeper: 3.4.6

Other configurations:
batchInterval = 6 Seconds
blockInterval = 1500 millis
spark.locality.wait = 500 millis
#Consumers = 10

There are two columns in the cassandra table keySpaceOfTopicA.tableOfTopicA,
"createdtime" and "log".

Here is a piece of codes,

@transient val kstreams = (1 to numConsumers.toInt).map { _ =>
KafkaUtils.createStream(ssc, zkeeper, groupId, Map("topicA"->1),   
StorageLevel.MEMORY_AND_DISK_SER)
.map(_._2.toString).map(Tuple1(_))
.map{case(log) => (System.currentTimeMillis(), log)}
}
@transient val unifiedMessage = ssc.union(kstreams)

unifiedMessage.saveToCassandra("keySpaceOfTopicA", "tableOfTopicA",
SomeColumns("createdtime", "log"))

I created a producer and send messages to Brokers (1000 messages/per time)

But only about 100 messages get inserted into Cassandra in each round of
testing.
Can anybody advise why the other messages (about 900 messages) aren't
being consumed?
How do I configure and tune the parameters in order to improve the
throughput of the consumers?

Thank you very much for your reading and suggestions in advances.

Jerry Wong



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Optimize-the-performance-of-inserting-data-to-Cassandra-with-Kafka-and-Spark-Streaming-tp26244.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



architecture thought experiment: what is the advantage of using kafka with spark streaming?

2015-12-10 Thread Andy Davidson
I noticed that many people are using Kafka and spark streaming. Can someone
provide a couple of use cases?

I imagine some possible use cases might be

Is the purpose of using Kafka to
1. provide some buffering?
2. implement some sort of load balancing for the overall system?
3. provide filtering/sorting of data?
4. simplify client connections? Easy for thousands of producers to connect to
kafka; probably hard to do with spark streaming.
5. ???
Kind regards

Andy




Re: architecture thought experiment: what is the advantage of using kafka with spark streaming?

2015-12-10 Thread Cody Koeninger
Kafka provides buffering, ordering, and decoupling of producers from multiple
consumers. So pretty much any time you have requirements for asynchronous
processing, fault tolerance, and/or a common view of the order of events
across multiple consumers, kafka is worth a look.

Spark provides a much richer language for processing data than what you'd
get with writing kafka consumers yourself.
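
As a small illustration of that "richer language" point, a sketch of a sliding-window count over a Kafka-fed stream; the broker, topic and CSV-style key extraction are placeholders, and an existing StreamingContext ssc is assumed. Reproducing this with hand-written consumers would take considerably more code.

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.kafka.KafkaUtils

val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")
val events = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("events"))
events.map { case (_, line) => (line.split(",")(0), 1L) }      // key on the first CSV field
  .reduceByKeyAndWindow(_ + _, Seconds(300), Seconds(30))      // 5 min window, 30 s slide
  .print()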

On Thu, Dec 10, 2015 at 8:00 PM, Andy Davidson <
a...@santacruzintegration.com> wrote:

> I noticed that many people are using Kafka and spark streaming. Can some
> one provide a couple of use case
>
> I image some possible use cases might be
>
> Is the purpose using  Kafka
>
>1. provide some buffering?
>2. implementing some sort of load balancing for the over all system?
>3. Provide filtering /sorting of data?
>4. Simplify client connection. Easy for thousands of producers to
>connect to kafka. Probably hard to do with spark streaming
>5. ???
>
> Kind regards
>
> Andy
>


Re: SSL between Kafka and Spark Streaming API

2015-08-28 Thread Cassa L
Just to confirm, is this what you are referring to? Is there any
example of how to set it? I believe it is for the 0.8.3 release?

https://cwiki.apache.org/confluence/display/KAFKA/Multiple+Listeners+for+Kafka+Brokers
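
In case a concrete example helps, a rough sketch of what that suggestion could look like: the broker exposes both listeners, and Spark simply keeps talking to the plaintext port. Hostnames, ports and topic below are placeholders, the dual-listener line in the comment is the multi-listener setting described on that wiki page, and an existing StreamingContext ssc is assumed.

// Broker side (server.properties), roughly:
//   listeners=PLAINTEXT://broker1:9092,SSL://broker1:9093
// Spark side: keep the direct stream pointed at the PLAINTEXT port only.
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")   // plaintext listener
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("someTopic"))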


On Fri, Aug 28, 2015 at 12:52 PM, Sriharsha Chintalapani ka...@harsha.io
wrote:

 You can configure PLAINTEXT listener as well with the broker and use that
 port for spark.

 --
 Harsha


 On August 28, 2015 at 12:24:45 PM, Sourabh Chandak (sourabh3...@gmail.com)
 wrote:

 Can we use the existing kafka spark streaming jar to connect to a kafka
 server running in SSL mode?

 We are fine with non SSL consumer as our kafka cluster and spark cluster
 are in the same network


 Thanks,
 Sourabh

 On Fri, Aug 28, 2015 at 12:03 PM, Gwen Shapira g...@confluent.io wrote:
 I can't speak for the Spark Community, but checking their code,
 DirectKafkaStream and KafkaRDD use the SimpleConsumer API:


 https://github.com/apache/spark/blob/master/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala

 https://github.com/apache/spark/blob/master/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala

 On Fri, Aug 28, 2015 at 11:32 AM, Cassa L lcas...@gmail.com wrote:

  Hi I am using below Spark jars with Direct Stream API.
spark-streaming-kafka_2.10
 
  When I look at its pom.xml, Kafka libraries that its pulling in is
 <groupId>org.apache.kafka</groupId>
 <artifactId>kafka_${scala.binary.version}</artifactId>
 <version>0.8.2.1</version>
 
 
  I believe this DirectStream API uses SimpleConsumer API. Can someone from
  Spark community confirm too?
 
  Thanks,
  LCassa.
 
  On Fri, Aug 28, 2015 at 11:12 AM, Sriharsha Chintalapani 
 ka...@harsha.io
  wrote:
 
   SSL is supported for new producer and consumer api and old api (simple
   consumer and high-level consumer) is not supported.
   I think spark uses simple consumer? if so its not supported.
  
   Thanks,
   Harsha
  
  
   On August 28, 2015 at 11:00:30 AM, Cassa L (lcas...@gmail.com) wrote:
  
   Hi,
   I was going through SSL setup of Kafka.
  
 
 https://cwiki.apache.org/confluence/display/KAFKA/Deploying+SSL+for+Kafka
   However, I am also using Spark-Kafka streaming to read data from Kafka.
  Is
   there a way to activate SSL for spark streaming API or not possible at
   all?
  
   Thanks,
   LCassa
  
  
 




SSL between Kafka and Spark Streaming API

2015-08-28 Thread Cassa L
Hi,
 I was going through SSL setup of Kafka.
https://cwiki.apache.org/confluence/display/KAFKA/Deploying+SSL+for+Kafka
However, I am also using Spark-Kafka streaming to read data from Kafka. Is
there a way to activate SSL for the spark streaming API, or is it not possible at all?

Thanks,
LCassa


Re: SSL between Kafka and Spark Streaming API

2015-08-28 Thread Cassa L
Hi, I am using the Spark jar below with the Direct Stream API:
  spark-streaming-kafka_2.10

When I look at its pom.xml, the Kafka library that it's pulling in is
   <groupId>org.apache.kafka</groupId>
   <artifactId>kafka_${scala.binary.version}</artifactId>
   <version>0.8.2.1</version>


I believe this DirectStream API uses SimpleConsumer API. Can someone from
Spark community confirm too?

Thanks,
LCassa.

On Fri, Aug 28, 2015 at 11:12 AM, Sriharsha Chintalapani ka...@harsha.io
wrote:

 SSL is supported for new producer and consumer api and old api (simple
 consumer and high-level consumer) is not supported.
 I think spark uses simple consumer? if so its not supported.

 Thanks,
 Harsha


 On August 28, 2015 at 11:00:30 AM, Cassa L (lcas...@gmail.com) wrote:

 Hi,
 I was going through SSL setup of Kafka.
 https://cwiki.apache.org/confluence/display/KAFKA/Deploying+SSL+for+Kafka
 However, I am also using Spark-Kafka streaming to read data from Kafka. Is
 there a way to activate SSL for spark streaming API or not possible at
 all?

 Thanks,
 LCassa




Re: SSL between Kafka and Spark Streaming API

2015-08-28 Thread Cody Koeninger
Yeah, the direct api uses the simple consumer

On Fri, Aug 28, 2015 at 1:32 PM, Cassa L lcas...@gmail.com wrote:

 Hi I am using below Spark jars with Direct Stream API.
   spark-streaming-kafka_2.10

 When I look at its pom.xml, Kafka libraries that its pulling in is
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_${scala.binary.version}</artifactId>
    <version>0.8.2.1</version>


 I believe this DirectStream API uses SimpleConsumer API. Can someone from
 Spark community confirm too?

 Thanks,
 LCassa.

 On Fri, Aug 28, 2015 at 11:12 AM, Sriharsha Chintalapani ka...@harsha.io
 wrote:

 SSL is supported for new producer and consumer api and old api (simple
 consumer and high-level consumer) is not supported.
 I think spark uses simple consumer? if so its not supported.

 Thanks,
 Harsha


 On August 28, 2015 at 11:00:30 AM, Cassa L (lcas...@gmail.com) wrote:

 Hi,
 I was going through SSL setup of Kafka.
 https://cwiki.apache.org/confluence/display/KAFKA/Deploying+SSL+for+Kafka
 However, I am also using Spark-Kafka streaming to read data from Kafka.
 Is
 there a way to activate SSL for spark streaming API or not possible at
 all?

 Thanks,
 LCassa





Re: SSL between Kafka and Spark Streaming API

2015-08-28 Thread Sourabh Chandak
Can we use the existing kafka spark streaming jar to connect to a kafka
server running in SSL mode?

We are fine with non SSL consumer as our kafka cluster and spark cluster
are in the same network


Thanks,
Sourabh

On Fri, Aug 28, 2015 at 12:03 PM, Gwen Shapira g...@confluent.io wrote:

 I can't speak for the Spark Community, but checking their code,
 DirectKafkaStream and KafkaRDD use the SimpleConsumer API:


 https://github.com/apache/spark/blob/master/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala

 https://github.com/apache/spark/blob/master/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala

 On Fri, Aug 28, 2015 at 11:32 AM, Cassa L lcas...@gmail.com wrote:

  Hi I am using below Spark jars with Direct Stream API.
spark-streaming-kafka_2.10
 
  When I look at its pom.xml, Kafka libraries that its pulling in is
 <groupId>org.apache.kafka</groupId>
 <artifactId>kafka_${scala.binary.version}</artifactId>
 <version>0.8.2.1</version>
 
 
  I believe this DirectStream API uses SimpleConsumer API. Can someone from
  Spark community confirm too?
 
  Thanks,
  LCassa.
 
  On Fri, Aug 28, 2015 at 11:12 AM, Sriharsha Chintalapani 
 ka...@harsha.io
  wrote:
 
   SSL is supported for new producer and consumer api and old api (simple
   consumer and high-level consumer) is not supported.
   I think spark uses simple consumer? if so its not supported.
  
   Thanks,
   Harsha
  
  
   On August 28, 2015 at 11:00:30 AM, Cassa L (lcas...@gmail.com) wrote:
  
   Hi,
   I was going through SSL setup of Kafka.
  
 
 https://cwiki.apache.org/confluence/display/KAFKA/Deploying+SSL+for+Kafka
   However, I am also using Spark-Kafka streaming to read data from Kafka.
  Is
   there a way to activate SSL for spark streaming API or not possible at
   all?
  
   Thanks,
   LCassa
  
  
 



Re: No Twitter Input from Kafka to Spark Streaming

2015-08-06 Thread Akhil Das
You just pasted your twitter credentials, consider changing it. :/

Thanks
Best Regards

On Wed, Aug 5, 2015 at 10:07 PM, narendra narencs...@gmail.com wrote:

 Thanks Akash for the answer. I added endpoint to the listener and now it is
 working.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/No-Twitter-Input-from-Kafka-to-Spark-Streaming-tp24131p24142.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: No Twitter Input from Kafka to Spark Streaming

2015-08-05 Thread narendra
Thanks Akash for the answer. I added an endpoint to the listener and now it is
working.
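
For anyone hitting the same thing, a minimal sketch of that missing step against the code in the original post: after registering the StatusListener, an endpoint still has to be started on the twitter4j stream, otherwise no statuses are ever delivered. The track keywords below are only examples.

twitterStream.addListener(listener)
twitterStream.sample()                                   // start the statuses/sample endpoint
// or, to track specific terms instead:
// twitterStream.filter(new FilterQuery().track("spark", "kafka"))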



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/No-Twitter-Input-from-Kafka-to-Spark-Streaming-tp24131p24142.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



No Twitter Input from Kafka to Spark Streaming

2015-08-04 Thread narendra
My application takes Twitter4j tweets and publishes them to a topic in
Kafka. Spark Streaming subscribes to that topic for processing. But in
practice, Spark Streaming is not able to receive tweet data from Kafka, so
Spark Streaming is running empty batch jobs without input and I am not able
to see any output from Spark Streaming.

The code of the application is - 

import java.util.HashMap
import java.util.Properties
import twitter4j._
import twitter4j.FilterQuery;
import twitter4j.StallWarning;
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;
import twitter4j.conf.ConfigurationBuilder;
import twitter4j.json.DataObjectFactory;
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka._
import kafka.javaapi.producer.Producer
import kafka.producer.{KeyedMessage, ProducerConfig}
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._ 

object TwitterPopularTags {
def main(args: Array[String]) {

/** Information necessary for accessing the Twitter API */
val consumerKey = "2AgtQfH8rlyUDyfjwPOCDosEQ"
val consumerSecret = "vnG8uoaan4gPmoy1rFMbz3i19396jODwGRLRqsHBPTwSlMcUIl"
val accessToken = "33807905-2vYZMjZyRjFJQrrkPVQwmiQcZCnag6m2wKujpiu4e"
val accessTokenSecret = "X880Iq3YseBsAs3e8ZoHSOaDnN431dWJ6QpeMJO6VVAzm"
val cb = new ConfigurationBuilder()
cb.setOAuthConsumerKey(consumerKey)
cb.setOAuthConsumerSecret(consumerSecret)
cb.setOAuthAccessToken(accessToken)
cb.setOAuthAccessTokenSecret(accessTokenSecret)
cb.setJSONStoreEnabled(true)
cb.setIncludeEntitiesEnabled(true)
val twitterStream = new
TwitterStreamFactory(cb.build()).getInstance()  

val KafkaTopic = "LiveTweets"
/* kafka producer properties */
val kafkaProducer = {
val props = new Properties()
props.put("metadata.broker.list", "broker2:9092,localhost:9092")
props.put("serializer.class", "kafka.serializer.StringEncoder")
props.put("request.required.acks", "1")
val config = new ProducerConfig(props)
new Producer[String, String](config)
}

/* Invoked when a new tweet comes */
val listener = new StatusListener() { 

   override def onStatus(status: Status): Unit = {
   val msg = new KeyedMessage[String,
String](KafkaTopic,DataObjectFactory.getRawJSON(status))
   kafkaProducer.send(msg)
  }
   override def onException(ex: Exception): Unit = throw ex

  // no-op for the following events
  override def onStallWarning(warning: StallWarning): Unit =
{}
  override def onDeletionNotice(statusDeletionNotice:
StatusDeletionNotice): Unit = {}
  override def onScrubGeo(userId: Long, upToStatusId: Long):
Unit = {}
  override def
onTrackLimitationNotice(numberOfLimitedStatuses: Int): Unit = {}
}

twitterStream.addListener(listener)
// Create Spark Streaming context
val sparkConf = new SparkConf().setAppName("Twitter-Kafka-Spark Streaming")
val sc = new SparkContext(sparkConf)
val ssc = new StreamingContext(sc, Seconds(2))

// Define the Kafka parameters, broker list must be specified
val kafkaParams = Map("metadata.broker.list" -> "broker2:9092,localhost:9092")
val topics = Set(KafkaTopic)

// Create the direct stream with the Kafka parameters and topics
val kafkaStream = KafkaUtils.createDirectStream[String, String,
StringDecoder, StringDecoder](ssc,kafkaParams,topics)
val lines = kafkaStream.map(_._2)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()

  }
}

Spark Streaming web UI - 

http://apache-spark-user-list.1001560.n3.nabble.com/file/n24131/streaming.png 

http://apache-spark-user-list.1001560.n3.nabble.com/file/n24131/sparkjobs.png 


Thank you.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/No-Twitter-Input-from-Kafka-to-Spark-Streaming-tp24131.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: No Twitter Input from Kafka to Spark Streaming

2015-08-04 Thread Cody Koeninger
Have you tried using the console consumer to see if anything is actually
getting published to that topic?

On Tue, Aug 4, 2015 at 11:45 AM, narendra narencs...@gmail.com wrote:

 My application takes Twitter4j tweets and publishes those to a topic in
 Kafka. Spark Streaming subscribes to that topic for processing. But in
 actual, Spark Streaming is not able to receive tweet data from Kafka so
 Spark Streaming is running empty batch jobs with out input and I am not
 able
 to see any output from Spark Streaming.

 The code of the application is -

 import java.util.HashMap
 import java.util.Properties
 import twitter4j._
 import twitter4j.FilterQuery;
 import twitter4j.StallWarning;
 import twitter4j.Status;
 import twitter4j.StatusDeletionNotice;
 import twitter4j.StatusListener;
 import twitter4j.TwitterStream;
 import twitter4j.TwitterStreamFactory;
 import twitter4j.conf.ConfigurationBuilder;
 import twitter4j.json.DataObjectFactory;
 import kafka.serializer.StringDecoder
 import org.apache.spark.streaming.kafka._
 import kafka.javaapi.producer.Producer
 import kafka.producer.{KeyedMessage, ProducerConfig}
 import org.apache.spark._
 import org.apache.spark.streaming._
 import org.apache.spark.streaming.StreamingContext._

 object TwitterPopularTags {
 def main(args: Array[String]) {

 /** Information necessary for accessing the Twitter API */
 val consumerKey= 2AgtQfH8rlyUDyfjwPOCDosEQ
 val consumerSecret=
 vnG8uoaan4gPmoy1rFMbz3i19396jODwGRLRqsHBPTwSlMcUIl
 val accessToken=
 33807905-2vYZMjZyRjFJQrrkPVQwmiQcZCnag6m2wKujpiu4e
 val accessTokenSecret =
 X880Iq3YseBsAs3e8ZoHSOaDnN431dWJ6QpeMJO6VVAzm
 val cb = new ConfigurationBuilder()
 cb.setOAuthConsumerKey(consumerKey)
 cb.setOAuthConsumerSecret(consumerSecret)
 cb.setOAuthAccessToken(accessToken)
 cb.setOAuthAccessTokenSecret(accessTokenSecret)
 cb.setJSONStoreEnabled(true)
 cb.setIncludeEntitiesEnabled(true)
 val twitterStream = new
 TwitterStreamFactory(cb.build()).getInstance()

 val KafkaTopic = LiveTweets
 /* kafka producer properties */
 val kafkaProducer = {
 val props = new Properties()
 props.put(metadata.broker.list,
 broker2:9092,localhost:9092)
 props.put(serializer.class,
 kafka.serializer.StringEncoder)
 props.put(request.required.acks, 1)
 val config = new ProducerConfig(props)
 new Producer[String, String](config)
  }

 /* Invoked when a new tweet comes */
 val listener = new StatusListener() {

override def onStatus(status: Status): Unit = {
val msg = new KeyedMessage[String,
 String](KafkaTopic,DataObjectFactory.getRawJSON(status))
kafkaProducer.send(msg)
   }
override def onException(ex: Exception): Unit = throw ex

   // no-op for the following events
   override def onStallWarning(warning: StallWarning): Unit
 =
 {}
   override def onDeletionNotice(statusDeletionNotice:
 StatusDeletionNotice): Unit = {}
   override def onScrubGeo(userId: Long, upToStatusId:
 Long):
 Unit = {}
   override def
 onTrackLimitationNotice(numberOfLimitedStatuses: Int): Unit = {}
 }

 twitterStream.addListener(listener)
 // Create Spark Streaming context
 val sparkConf = new SparkConf().setAppName(Twitter-Kafka-Spark
 Streaming)
 val sc = new SparkContext(sparkConf)
 val ssc = new StreamingContext(sc, Seconds(2))

 // Define the Kafka parameters, broker list must be specified
 val kafkaParams = Map(metadata.broker.list -
 broker2:9092,localhost:9092)
 val topics = Set(KafkaTopic)

 // Create the direct stream with the Kafka parameters and topics
 val kafkaStream = KafkaUtils.createDirectStream[String, String,
 StringDecoder, StringDecoder](ssc,kafkaParams,topics)
 val lines = kafkaStream.map(_._2)
 val words = lines.flatMap(_.split( ))
 val wordCounts = words.map(x = (x, 1L)).reduceByKey(_ + _)
 wordCounts.print()
 ssc.start()
 ssc.awaitTermination()

   }
 }

 Spark Streaming web UI -

 
 http://apache-spark-user-list.1001560.n3.nabble.com/file/n24131/streaming.png
 

 
 http://apache-spark-user-list.1001560.n3.nabble.com/file/n24131/sparkjobs.png
 


 Thank you.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/No-Twitter-Input-from-Kafka-to-Spark-Streaming-tp24131.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr

writing to kafka using spark streaming

2015-07-06 Thread Shushant Arora
I have a requirement to write to a kafka queue from a spark streaming
application.

I am using spark 1.2 streaming. Since different executors in spark are
allocated at each run, instantiating a new kafka producer at each run
seems a costly operation. Is there a way to reuse objects in the processing
executors (not in the receivers)?


Re: writing to kafka using spark streaming

2015-07-06 Thread Cody Koeninger
Use foreachPartition, and allocate whatever the costly resource is once per
partition.
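
A minimal sketch of that pattern with the old Scala/Java producer API used elsewhere on this list around that time; the broker list and output topic are placeholders, and the stream is assumed to be a DStream of key/value pairs.

import java.util.Properties
import kafka.javaapi.producer.Producer
import kafka.producer.{KeyedMessage, ProducerConfig}

stream.foreachRDD { rdd =>
  rdd.foreachPartition { records =>
    // one producer per partition, reused for every record in that partition
    val props = new Properties()
    props.put("metadata.broker.list", "broker1:9092")              // placeholder broker list
    props.put("serializer.class", "kafka.serializer.StringEncoder")
    val producer = new Producer[String, String](new ProducerConfig(props))
    records.foreach { case (_, value) =>
      producer.send(new KeyedMessage[String, String]("outTopic", value))   // placeholder topic
    }
    producer.close()
  }
}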

On Mon, Jul 6, 2015 at 6:11 AM, Shushant Arora shushantaror...@gmail.com
wrote:

 I have a requirement to write in kafka queue from a spark streaming
 application.

 I am using spark 1.2 streaming. Since different executors in spark are
 allocated at each run so instantiating a new kafka producer at each run
 seems a costly operation .Is there a way to reuse objects in processing
 executors(not in receivers)?





Re: writing to kafka using spark streaming

2015-07-06 Thread Tathagata Das
Yeah, creating a new producer at the granularity of partitions may not be
that costly.

On Mon, Jul 6, 2015 at 6:40 AM, Cody Koeninger c...@koeninger.org wrote:

 Use foreachPartition, and allocate whatever the costly resource is once
 per partition.

 On Mon, Jul 6, 2015 at 6:11 AM, Shushant Arora shushantaror...@gmail.com
 wrote:

 I have a requirement to write in kafka queue from a spark streaming
 application.

 I am using spark 1.2 streaming. Since different executors in spark are
 allocated at each run so instantiating a new kafka producer at each run
 seems a costly operation .Is there a way to reuse objects in processing
 executors(not in receivers)?






Re: writing to kafka using spark streaming

2015-07-06 Thread Shushant Arora
What's the difference between foreachPartition and mapPartitions for a
DStream? Both work at partition granularity.

One is a transformation and the other is an action, but if I call an action
after mapPartitions as well, which one is more efficient and recommended?

On Tue, Jul 7, 2015 at 12:21 AM, Tathagata Das t...@databricks.com wrote:

 Yeah, creating a new producer at the granularity of partitions may not be
 that costly.

 On Mon, Jul 6, 2015 at 6:40 AM, Cody Koeninger c...@koeninger.org wrote:

 Use foreachPartition, and allocate whatever the costly resource is once
 per partition.

 On Mon, Jul 6, 2015 at 6:11 AM, Shushant Arora shushantaror...@gmail.com
  wrote:

 I have a requirement to write in kafka queue from a spark streaming
 application.

 I am using spark 1.2 streaming. Since different executors in spark are
 allocated at each run so instantiating a new kafka producer at each run
 seems a costly operation .Is there a way to reuse objects in processing
 executors(not in receivers)?







Re: writing to kafka using spark streaming

2015-07-06 Thread Tathagata Das
Both have the same efficiency. The primary difference is that one is a
transformation (hence lazy, and requires another action to actually
execute), and the other is an action.
But it may be a slightly better design in general to have transformations
be purely functional (that is, with no external side effects) and all
non-functional stuff be actions (e.g., saveAsHadoopFile is an action).
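
A tiny illustration of the difference, assuming an RDD[String] called rdd (e.g. inside foreachRDD):

// mapPartitions is a transformation: nothing runs until an action is called
val lengths = rdd.mapPartitions(iter => iter.map(_.length))
lengths.count()                                      // this action triggers the work

// foreachPartition is an action: it runs immediately, purely for side effects
rdd.foreachPartition(iter => iter.foreach(println))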


On Mon, Jul 6, 2015 at 12:09 PM, Shushant Arora shushantaror...@gmail.com
wrote:

 whats the difference between foreachPartition vs mapPartitions for a
 Dtstream both works at partition granularity?

 One is an operation and another is action but if I call an opeartion
 afterwords mapPartitions  also, which one is more efficient and
 recommeded?

 On Tue, Jul 7, 2015 at 12:21 AM, Tathagata Das t...@databricks.com
 wrote:

 Yeah, creating a new producer at the granularity of partitions may not be
 that costly.

 On Mon, Jul 6, 2015 at 6:40 AM, Cody Koeninger c...@koeninger.org
 wrote:

 Use foreachPartition, and allocate whatever the costly resource is once
 per partition.

 On Mon, Jul 6, 2015 at 6:11 AM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 I have a requirement to write in kafka queue from a spark streaming
 application.

 I am using spark 1.2 streaming. Since different executors in spark are
 allocated at each run so instantiating a new kafka producer at each run
 seems a costly operation .Is there a way to reuse objects in processing
 executors(not in receivers)?








Re: writing to kafka using spark streaming

2015-07-06 Thread Shushant Arora
When using foreachPartition, the jobs that get created are not displayed on the driver
console but are visible on the web UI.
On the driver it prints some stage statistics of the form [Stage 2:
(0 + 2) / 5] which then disappear.

I am using foreachPartition as :

kafkaStream.foreachRDD(new Function<JavaPairRDD<byte[], byte[]>, Void>() {
public Void call(JavaPairRDD<byte[], byte[]> v1) throws Exception {
v1.foreachPartition(new VoidFunction<Iterator<Tuple2<byte[], byte[]>>>() {
public void call(Iterator<Tuple2<byte[], byte[]>> t) throws Exception {
SparkKafkaProducer producer = SparkKafkaProducer.getInstance();
while(t.hasNext()){
Tuple2<byte[], byte[]> tuple = t.next();
//create msg after processing tuple._2()
producer.sendMsg(msg);
}
}
});
return null;
}
});

1. Why are the jobs not displayed on the driver console? Is the call
function in the above code snippet executed on each worker for each
partition?

2. Why is no job displayed on the web UI either when the input source (the
Kafka queue) has no new messages? When I used mapPartitions, jobs were
created and displayed on the web UI as well as on the driver for each
batch, whether or not the input had data.

Is it the expected behaviour of foreachPartition that it ignores empty
partitions, or does it not even create partitions when the input source is
empty?
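
The SparkKafkaProducer.getInstance() call above suggests a lazily
initialized, per-JVM singleton wrapper. A minimal sketch of what such a
class might look like, with the class and method names taken from the
snippet and the producer configuration purely illustrative:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SparkKafkaProducer {

  // One producer per executor JVM, created lazily on first use.
  private static volatile SparkKafkaProducer instance;

  private final KafkaProducer<String, String> producer;

  private SparkKafkaProducer() {
    Properties props = new Properties();
    props.put("bootstrap.servers", "broker1:9092"); // illustrative brokers
    props.put("key.serializer",
        "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer",
        "org.apache.kafka.common.serialization.StringSerializer");
    producer = new KafkaProducer<String, String>(props);
  }

  public static SparkKafkaProducer getInstance() {
    if (instance == null) {
      synchronized (SparkKafkaProducer.class) {
        if (instance == null) {
          instance = new SparkKafkaProducer();
        }
      }
    }
    return instance;
  }

  public void sendMsg(String msg) {
    // illustrative target topic
    producer.send(new ProducerRecord<String, String>("out-topic", msg));
  }
}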








On Tue, Jul 7, 2015 at 12:44 AM, Tathagata Das t...@databricks.com wrote:

 Both have the same efficiency. The primary difference is that one is a
 transformation (hence lazy, requiring another action to actually execute),
 and the other is an action.
 But it may be a slightly better design in general to have transformations
 be purely functional (that is, with no external side effects) and all
 non-functional work be actions (e.g., saveAsHadoopFile is an action).


 On Mon, Jul 6, 2015 at 12:09 PM, Shushant Arora shushantaror...@gmail.com
  wrote:

 What's the difference between foreachPartition and mapPartitions for a
 DStream? Both work at partition granularity.

 One is a transformation and the other is an action, but if I also call an
 action after mapPartitions, which one is more efficient and recommended?

 On Tue, Jul 7, 2015 at 12:21 AM, Tathagata Das t...@databricks.com
 wrote:

 Yeah, creating a new producer at the granularity of partitions may not
 be that costly.

 On Mon, Jul 6, 2015 at 6:40 AM, Cody Koeninger c...@koeninger.org
 wrote:

 Use foreachPartition, and allocate whatever the costly resource is once
 per partition.

 On Mon, Jul 6, 2015 at 6:11 AM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 I have a requirement to write to a Kafka queue from a Spark Streaming
 application.

 I am using Spark 1.2 Streaming. Since different executors are allocated
 at each run, instantiating a new Kafka producer at each run seems a
 costly operation. Is there a way to reuse objects in the processing
 executors (not in receivers)?









Re: Help with publishing to Kafka from Spark Streaming?

2015-05-02 Thread Saisai Shao
Here is the pull request; you may refer to it:

https://github.com/apache/spark/pull/2994

Thanks
Jerry


2015-05-01 14:38 GMT+08:00 Pavan Sudheendra pavan0...@gmail.com:

 Link to the question:

 http://stackoverflow.com/questions/29974017/spark-kafka-producer-not-serializable-exception

 Thanks for any pointers.



Help with publishing to Kafka from Spark Streaming?

2015-05-01 Thread Pavan Sudheendra
Link to the question:
http://stackoverflow.com/questions/29974017/spark-kafka-producer-not-serializable-exception

Thanks for any pointers.


Re: How to replay consuming messages from kafka using spark streaming?

2015-01-23 Thread mykidong
Hi,

I have written a Spark Streaming Kafka receiver using the Kafka simple
consumer API:
https://github.com/mykidong/spark-kafka-simple-consumer-receiver

This Kafka receiver can be used as an alternative to the current Spark
Streaming Kafka receiver, which is written with the high-level Kafka
consumer API.

With this Kafka receiver, Kafka message offset control can be handled more
easily for receiver worker node failure and driver node failure.

- Kidong.







Re: How to replay consuming messages from kafka using spark streaming?

2015-01-14 Thread Cody Koeninger
Take a look at the implementation linked from here:

https://issues.apache.org/jira/browse/SPARK-4964

and see if that would meet your needs.
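
SPARK-4964 became the direct (receiver-less) Kafka stream that shipped in
Spark 1.3. A rough Java sketch of how it exposes the per-batch offset
ranges, so a failed batch can be replayed from stored offsets; the broker
list, topic and batch interval are illustrative, and exact signatures vary
by Spark/Kafka version:

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import kafka.serializer.StringDecoder;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.HasOffsetRanges;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.kafka.OffsetRange;

public class DirectKafkaReplayExample {
  public static void main(String[] args) throws Exception {
    SparkConf conf = new SparkConf().setAppName("direct-kafka-etl");
    JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.minutes(10));

    Map<String, String> kafkaParams = new HashMap<String, String>();
    kafkaParams.put("metadata.broker.list", "broker1:9092"); // illustrative brokers
    Set<String> topics = new HashSet<String>();
    topics.add("events");                                    // illustrative topic

    // Direct (receiver-less) stream: each batch maps 1:1 to a Kafka offset range.
    JavaPairInputDStream<String, String> stream = KafkaUtils.createDirectStream(
        jssc, String.class, String.class, StringDecoder.class, StringDecoder.class,
        kafkaParams, topics);

    stream.foreachRDD(new Function<JavaPairRDD<String, String>, Void>() {
      public Void call(JavaPairRDD<String, String> rdd) throws Exception {
        // The underlying KafkaRDD exposes the exact offsets of this batch.
        OffsetRange[] ranges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
        // ... write the batch (e.g., as Avro to HDFS) here, and persist `ranges`
        // only on success so a failed batch can be replayed from these offsets.
        for (OffsetRange r : ranges) {
          System.out.println(r.topic() + "-" + r.partition()
              + ": [" + r.fromOffset() + ", " + r.untilOffset() + ")");
        }
        return null;
      }
    });

    jssc.start();
    jssc.awaitTermination();
  }
}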

On Wed, Jan 14, 2015 at 9:58 PM, mykidong mykid...@gmail.com wrote:

 Hi,

 My Spark Streaming job is doing Kafka ETL to HDFS.
 For instance, every 10 minutes my streaming job retrieves messages from
 Kafka and saves them as Avro files onto HDFS.
 My question is: if a worker sometimes fails to write the Avro files to
 HDFS, I want to replay the consumed messages from the last succeeded
 Kafka offset again.
 I think the Spark Streaming Kafka receiver is written using the Kafka
 high-level consumer API, not the simple consumer API.

 Any idea how to replay Kafka consumption in Spark Streaming?

 - Kidong.










RE: How to replay consuming messages from kafka using spark streaming?

2015-01-14 Thread Shao, Saisai
I think there are two solutions:

1. Enable the write-ahead log in Spark Streaming if you're using Spark 1.2
(see the sketch below).
2. Use a third-party Kafka consumer
(https://github.com/dibbhatt/kafka-spark-consumer).
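
For option 1, the write-ahead log is enabled with a single configuration
flag and relies on checkpointing to a fault-tolerant filesystem; a minimal
sketch (the application name, checkpoint path and batch interval are
illustrative):

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class WalEnabledContext {
  public static void main(String[] args) throws Exception {
    SparkConf conf = new SparkConf()
        .setAppName("kafka-etl")
        // Persist received blocks to the write-ahead log before processing,
        // so receiver-based input can be recovered after a failure (Spark 1.2+).
        .set("spark.streaming.receiver.writeAheadLog.enable", "true");

    JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.minutes(10));
    // The WAL lives under the checkpoint directory, which must be on a
    // fault-tolerant filesystem such as HDFS.
    jssc.checkpoint("hdfs:///checkpoints/kafka-etl"); // illustrative path

    // ... create the receiver-based Kafka stream and the rest of the job here ...

    jssc.start();
    jssc.awaitTermination();
  }
}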

Thanks
Saisai

-Original Message-
From: mykidong [mailto:mykid...@gmail.com] 
Sent: Thursday, January 15, 2015 11:59 AM
To: user@spark.apache.org
Subject: How to replay consuming messages from kafka using spark streaming?

Hi,

My Spark Streaming job is doing Kafka ETL to HDFS.
For instance, every 10 minutes my streaming job retrieves messages from
Kafka and saves them as Avro files onto HDFS.
My question is: if a worker sometimes fails to write the Avro files to HDFS,
I want to replay the consumed messages from the last succeeded Kafka offset
again.
I think the Spark Streaming Kafka receiver is written using the Kafka
high-level consumer API, not the simple consumer API.

Any idea how to replay Kafka consumption in Spark Streaming?

- Kidong.











How to replay consuming messages from kafka using spark streaming?

2015-01-14 Thread mykidong
Hi,

My Spark Streaming job is doing Kafka ETL to HDFS.
For instance, every 10 minutes my streaming job retrieves messages from
Kafka and saves them as Avro files onto HDFS.
My question is: if a worker sometimes fails to write the Avro files to HDFS,
I want to replay the consumed messages from the last succeeded Kafka offset
again.
I think the Spark Streaming Kafka receiver is written using the Kafka
high-level consumer API, not the simple consumer API.

Any idea how to replay Kafka consumption in Spark Streaming?

- Kidong.





