Re: How to kill spark applications submitted using spark-submit reliably?

2015-11-20 Thread varun sharma
I do this in my stop script to kill the application: kill -s SIGTERM `pgrep -f StreamingApp`
To stop it forcefully: pkill -9 -f "StreamingApp"
StreamingApp is the name of the class I submitted.

I also have shutdown hook thread to stop it gracefully.

sys.ShutdownHookThread {
  logInfo("Gracefully stopping StreamingApp")
  ssc.stop(true, true)
  logInfo("StreamingApp stopped")
}
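
For anyone finding this in the archives: the two booleans passed to ssc.stop are stopSparkContext and stopGracefully. A minimal sketch of the whole pattern I use, with a placeholder source and batch interval (the stopGracefullyOnShutdown setting is optional and only available in recent Spark releases):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("StreamingApp")
      // optional: ask Spark itself to stop the context gracefully on SIGTERM
      .set("spark.streaming.stopGracefullyOnShutdown", "true")

    val ssc = new StreamingContext(conf, Seconds(10)) // placeholder batch interval

    // placeholder source; replace with the real input DStream
    val lines = ssc.socketTextStream("localhost", 9999)
    lines.print()

    // explicit hook, as above: stopSparkContext = true, stopGracefully = true
    // lets the batches already received finish before shutdown
    sys.ShutdownHookThread {
      ssc.stop(stopSparkContext = true, stopGracefully = true)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}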

I am also not able to kill the application from the Spark UI.


On Sat, Nov 21, 2015 at 11:32 AM, Vikram Kone <vikramk...@gmail.com> wrote:

> I tried adding a shutdown hook to my code but it didn't help. Still the same
> issue.
>
>
> On Fri, Nov 20, 2015 at 7:08 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>
>> Which Spark release are you using ?
>>
>> Can you pastebin the stack trace of the process running on your machine ?
>>
>> Thanks
>>
>> On Nov 20, 2015, at 6:46 PM, Vikram Kone <vikramk...@gmail.com> wrote:
>>
>> Hi,
>> I'm seeing a strange problem. I have a spark cluster in standalone mode.
>> I submit spark jobs from a remote node as follows from the terminal
>>
>> spark-submit --master spark://10.1.40.18:7077  --class com.test.Ping
>> spark-jobs.jar
>>
>> While the app is running, if I press Ctrl-C on the console terminal,
>> the process is killed and so is the app in the Spark master UI. When I
>> go to the Spark master UI, I see that this app is in state Killed under
>> Completed applications, which is what I expected to see.
>>
>> Now, I created a shell script as follows to do the same
>>
>> #!/bin/bash
>> spark-submit --master spark://10.1.40.18:7077  --class com.test.Ping
>> spark-jobs.jar
>> echo $! > my.pid
>>
>> When I execute the shell script from terminal, as follows
>>
>> $> bash myscript.sh
>>
>> The application is submitted correctly to the Spark master and I can see it
>> as one of the running apps in the Spark master UI. But when I kill the
>> process in my terminal as follows
>>
>> $> kill $(cat my.pid)
>>
>> I see that the process is killed on my machine but the Spark application
>> is still running in the Spark master! It doesn't get killed.
>>
>> I noticed one more thing: when I launch the Spark job via the shell
>> script and kill the application from the Spark master UI by clicking on "kill"
>> next to the running application, it gets killed in the Spark UI but I still see
>> the process running on my machine.
>>
>> In both cases, I would expect the remote spark app to be killed and my
>> local process to be killed.
>>
>> Why is this happening? And how can I kill a Spark app launched via a shell
>> script from the terminal, without going to the Spark master UI?
>>
>> I want to launch the Spark app via a script and log the PID so I can
>> monitor it remotely.
>>
>> thanks for the help
>>
>>
>


-- 
*VARUN SHARMA*
*Flipkart*
*Bangalore*


Re: In Spark application, how to get the passed in configuration?

2015-11-12 Thread varun sharma
You must be getting a warning at the start of the application like: Warning:
Ignoring non-spark config property: runtime.environment=passInValue.

Configs in Spark must start with the *spark.* prefix, so try something like
--conf spark.runtime.environment=passInValue.
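
A minimal sketch of the corrected version, reusing the names from the code quoted below (only the key changes, gaining the spark. prefix):

// submit with:
//   spark-submit --class myCode --conf spark.runtime.environment=passInValue my.jar
import org.apache.spark.SparkContext

object myCode {
  def main(args: Array[String]): Unit = {
    val sparkContext = new SparkContext()
    // spark-submit only forwards properties whose names start with "spark.",
    // so read the prefixed key here as well
    val runtimeEnvironment =
      sparkContext.getConf.get("spark.runtime.environment", "default")
    Console.println("load properties from runtimeEnvironment: " + runtimeEnvironment)
    sparkContext.stop()
  }
}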

Regards
Varun

On Thu, Nov 12, 2015 at 9:51 PM, java8964 <java8...@hotmail.com> wrote:

> In my Spark application, I want to access the passed-in configuration, but
> it doesn't work. How should I do that?
>
> object myCode extends Logging {
>   // starting point of the application
>   def main(args: Array[String]): Unit = {
>     val sparkContext = new SparkContext()
>     val runtimeEnvironment = sparkContext.getConf.get("runtime.environment", "default")
>     Console.println("load properties from runtimeEnvironment: " + runtimeEnvironment)
>     logInfo("load properties from runtimeEnvironment: " + runtimeEnvironment)
>     sparkContext.stop()
>   }
> }
>
>
> /opt/spark/bin/spark-submit --class myCode --conf runtime.environment=passInValue my.jar
>
> load properties from runtimeEnvironment: default
>
>
> It looks like I cannot access the dynamically passed-in value from the
> command line this way.
>
> In Hadoop, the Configuration object includes all the passed-in
> key/value pairs in the application. How do I achieve that in Spark?
>
> Thanks
>
> Yong
>
>


-- 
*VARUN SHARMA*
*Flipkart*
*Bangalore*


Re: Need more tasks in KafkaDirectStream

2015-10-29 Thread varun sharma
Cody, adding partitions to Kafka is there as a last resort; I was wondering
if I can decrease the processing time without touching my Kafka cluster.
Adrian, repartition looks like a good option; let me check if I can gain
performance.
Dibyendu, I will surely try out this consumer.

Thanks all, will share my findings..
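
For the archives, a rough sketch of the repartition approach Adrian describes below; the broker list, topic name, batch interval and partition count are placeholders:

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object RepartitionedDirectStream {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("RepartitionedDirectStream"), Seconds(5))

    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("events"))

    // one shuffle per batch, but downstream stages now run with 140 tasks
    // instead of one task per Kafka partition
    val widened = stream.map(_._2).repartition(140)

    widened.foreachRDD { rdd =>
      rdd.foreachPartition { records =>
        // replace with the real per-record processing
        records.foreach(record => ())
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}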

On Thu, Oct 29, 2015 at 7:16 PM, Cody Koeninger <c...@koeninger.org> wrote:

> Consuming from kafka is inherently limited to using a number of consumer
> nodes less than or equal to the number of kafka partitions.  If you think
> about it, you're going to be paying some network cost to repartition that
> data from a consumer to different processing nodes, regardless of what
> Spark consumer library you use.
>
> If you really need finer grained parallelism, and want to do it in a more
> efficient manner, you need to move that partitioning to the producer (i.e.
> add more partitions to kafka).
>
> On Thu, Oct 29, 2015 at 6:11 AM, Adrian Tanase <atan...@adobe.com> wrote:
>
>> You can call .repartition on the Dstream created by the Kafka direct
>> consumer. You take the one-time hit of a shuffle but gain the ability to
>> scale out processing beyond your number of partitions.
>>
>> We’re doing this to scale up from 36 partitions / topic to 140 partitions
>> (20 cores * 7 nodes) and it works great.
>>
>> -adrian
>>
>> From: varun sharma
>> Date: Thursday, October 29, 2015 at 8:27 AM
>> To: user
>> Subject: Need more tasks in KafkaDirectStream
>>
>> Right now, there is a one-to-one correspondence between Kafka partitions
>> and Spark partitions.
>> I don't have a requirement of one-to-one semantics.
>> I need more tasks to be generated in the job so that it can be
>> parallelised and the batch can be completed faster. In the previous
>> receiver-based approach the number of tasks created was independent of Kafka
>> partitions; I need something like that.
>> Is there any config available if I don't need one-to-one semantics?
>> Is there any way I can repartition without incurring any additional cost?
>>
>> Thanks
>> *VARUN SHARMA*
>>
>>
>


-- 
*VARUN SHARMA*
*Flipkart*
*Bangalore*


Need more tasks in KafkaDirectStream

2015-10-29 Thread varun sharma
Right now, there is a one-to-one correspondence between Kafka partitions and
Spark partitions.
I don't have a requirement of one-to-one semantics.
I need more tasks to be generated in the job so that it can be parallelised
and the batch can be completed faster. In the previous receiver-based approach
the number of tasks created was independent of Kafka partitions; I need
something like that.
Is there any config available if I don't need one-to-one semantics?
Is there any way I can repartition without incurring any additional cost?

Thanks
*VARUN SHARMA*


Re: correct and fast way to stop streaming application

2015-10-27 Thread varun sharma
One more thing we can try: before committing the offset, we can verify the
latest offset of that partition (in ZooKeeper) against the fromOffset in the
OffsetRange.
Just a thought...

Let me know if it works..
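
Concretely, something like this is what I have in mind; readOffsetFromZk and writeOffsetToZk are hypothetical helpers standing in for whatever ZooKeeper client code the job already has, and stream is the Kafka direct DStream:

import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

stream.foreachRDD { rdd =>
  val ranges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // ... process and save the batch first ...

  ranges.foreach { range =>
    // hypothetical helper: the offset currently stored in ZooKeeper
    val stored = readOffsetFromZk(range.topic, range.partition)
    if (stored == range.fromOffset) {
      // the batch starts exactly where the stored offset left off -> safe to commit
      writeOffsetToZk(range.topic, range.partition, range.untilOffset)
    } else {
      // a gap (or overlap) means an earlier batch was skipped or replayed -- fail loudly
      throw new IllegalStateException(
        s"Offset mismatch for ${range.topic}-${range.partition}: " +
          s"stored=$stored, batch starts at ${range.fromOffset}")
    }
  }
}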

On Tue, Oct 27, 2015 at 9:00 PM, Cody Koeninger <c...@koeninger.org> wrote:

> If you want to make sure that your offsets are increasing without gaps...
> one way to do that is to enforce that invariant when you're saving to your
> database.  That would probably mean using a real database instead of
> zookeeper though.
>
> On Tue, Oct 27, 2015 at 4:13 AM, Krot Viacheslav <
> krot.vyaches...@gmail.com> wrote:
>
>> Any ideas? This is so important because we use Kafka direct streaming and
>> save processed offsets manually as the last step in the job, so we achieve
>> at-least-once.
>> But see what happens when new batch is scheduled after a job fails:
>> - suppose we start from offset 10 loaded from zookeeper
>> - job starts with offsets 10-20
>> - job fails N times, awaitTermination notices that and stops context (or
>> even jvm with System.exit), but Scheduler has already started new job, it
>> is job for offsets 20-30, and sent it to executor.
>> - executor does all the steps (if there is only one stage) and saves
>> offset 30 to zookeeper.
>>
>> This way I lose the data in offsets 10-20.
>>
>> How should this be handled correctly?
>>
>> пн, 26 окт. 2015 г. в 18:37, varun sharma <varunsharman...@gmail.com>:
>>
>>> +1, wanted to do same.
>>>
>>> On Mon, Oct 26, 2015 at 8:58 PM, Krot Viacheslav <
>>> krot.vyaches...@gmail.com> wrote:
>>>
>>>> Hi all,
>>>>
>>>> I wonder what is the correct way to stop streaming application if some
>>>> job failed?
>>>> What I have now:
>>>>
>>>> val ssc = new StreamingContext
>>>> 
>>>> ssc.start()
>>>> try {
>>>>ssc.awaitTermination()
>>>> } catch {
>>>>case e => ssc.stop(stopSparkContext = true, stopGracefully = false)
>>>> }
>>>>
>>>> It works, but one problem still exists - after a job fails and before the
>>>> streaming context is stopped, it manages to start a job for the next batch. That
>>>> is not desirable for me.
>>>> It works like this because JobScheduler is an actor and after it
>>>> reports an error, it goes on with the next message, which starts the next batch job,
>>>> while ssc.awaitTermination() works in another thread and happens after the next
>>>> batch starts.
>>>>
>>>> Is there a way to stop before next job is submitted?
>>>>
>>>
>>>
>>>
>>> --
>>> *VARUN SHARMA*
>>> *Flipkart*
>>> *Bangalore*
>>>
>>
>


-- 
*VARUN SHARMA*
*Flipkart*
*Bangalore*


Re: correct and fast way to stop streaming application

2015-10-26 Thread varun sharma
+1, wanted to do same.

On Mon, Oct 26, 2015 at 8:58 PM, Krot Viacheslav <krot.vyaches...@gmail.com>
wrote:

> Hi all,
>
> I wonder what is the correct way to stop streaming application if some job
> failed?
> What I have now:
>
> val ssc = new StreamingContext
> 
> ssc.start()
> try {
>ssc.awaitTermination()
> } catch {
>case e => ssc.stop(stopSparkContext = true, stopGracefully = false)
> }
>
> It works, but one problem still exists - after a job fails and before the
> streaming context is stopped, it manages to start a job for the next batch. That
> is not desirable for me.
> It works like this because JobScheduler is an actor and after it reports an
> error, it goes on with the next message, which starts the next batch job, while
> ssc.awaitTermination() works in another thread and happens after the next batch
> starts.
>
> Is there a way to stop before next job is submitted?
>



-- 
*VARUN SHARMA*
*Flipkart*
*Bangalore*


Re: Kafka Streaming and Filtering > 3000 partitions

2015-10-22 Thread varun sharma
>> [...] produce back to filtered topics in Kafka.
>>
>>
>>
>> Using the receiver-less based approach with Spark 1.4.1 (described here
>> <https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md>)
>> I am able to use either KafkaUtils.createDirectStream or
>> KafkaUtils.createRDD, consume from many topics, and filter them with the
>> same filters but I can't seem to wrap my head around how to apply
>> topic-specific filters, or to finally produce to topic-specific destination
>> topics.
>>
>>
>>
>> Another point would be that I will need to checkpoint the metadata after
>> each successful batch and set starting offsets per partition back to ZK.  I
>> expect I can do that on the final RDDs after casting them accordingly, but
>> if anyone has any expertise/guidance doing that and is willing to share,
>> I'd be pretty grateful.
>>
>>
>>
>
>


-- 
*VARUN SHARMA*
*Flipkart*
*Bangalore*


Re: Issue in spark batches

2015-10-21 Thread varun sharma
Hi TD,
Is there any way in Spark I can fail/retry a batch in case of any exceptions,
or do I have to write code to explicitly keep on retrying?
Also, if some batch fails, I want to block further batches from being processed,
as it would create inconsistency in the updating of ZooKeeper offsets, and maybe
kill the job itself after, let's say, 3 retries.

Any pointers to achieve the same are appreciated.
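
What I am experimenting with in the meantime is a retry loop inside foreachRDD, reusing the helpers from my job quoted further down this thread; the retry count and back-off are arbitrary and this is only a sketch:

data_rdd.foreachRDD { rdd =>
  val stream = rdd.map(x => JsonUtility.deserialize(x))

  var attempt = 0
  var saved = false
  while (!saved && attempt < 3) {        // retry the write up to 3 times
    try {
      stream.saveToCassandra(CASSANDRA_KEY_SPACE, SIGNATURE_TABLE, StreamModel.getColumns)
      saved = true
    } catch {
      case _: java.io.IOException =>     // e.g. "Failed to open native connection to Cassandra"
        attempt += 1
        Thread.sleep(10000)              // crude back-off before retrying
    }
  }

  if (saved) {
    // commit offsets only after the data is really in Cassandra
    ZookeeperManager.updateOffsetsinZk(zkProperties, rdd)
  } else {
    // give up without committing, so this offset range can be reprocessed later
    throw new RuntimeException("Cassandra still unreachable after 3 attempts")
  }
}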

On Wed, Oct 21, 2015 at 1:15 AM, Tathagata Das <t...@databricks.com> wrote:

> That is actually a bug in the UI that got fixed in 1.5.1. The batch is
> actually completing with an exception; the UI does not update correctly.
>
> On Tue, Oct 20, 2015 at 8:38 AM, varun sharma <varunsharman...@gmail.com>
> wrote:
>
>> Also, as you can see from the timestamps in the attached image, batches coming
>> after the Cassandra server comes up (21:04) are processed, and batches which
>> are in a hung state (21:03) never get processed.
>> So how do I fail those batches so that they can be processed again?
>>
>> On Tue, Oct 20, 2015 at 9:02 PM, varun sharma <varunsharman...@gmail.com>
>> wrote:
>>
>>> Hi TD,
>>> Yes, saveToCassandra throws an exception. How do I fail that task explicitly
>>> if I catch any exceptions?
>>> Right now that batch doesn't fail and remains in a hung state. Is there any
>>> way I can fail that batch so that it can be tried again?
>>>
>>> Thanks
>>> Varun
>>>
>>> On Tue, Oct 20, 2015 at 2:50 AM, Tathagata Das <t...@databricks.com>
>>> wrote:
>>>
>>>> If cassandra is down, does saveToCassandra throw an exception? If it
>>>> does, you can catch that exception and write your own logic to retry and/or
>>>> no update. Once the foreachRDD function completes, that batch will be
>>>> internally marked as completed.
>>>>
>>>> TD
>>>>
>>>> On Mon, Oct 19, 2015 at 5:48 AM, varun sharma <
>>>> varunsharman...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>> I am facing this issue consistently in spark-cassandra-kafka *streaming
>>>>> job.*
>>>>> *Spark 1.4.0*
>>>>> *cassandra connector 1.4.0-M3*
>>>>> *Issue is:*
>>>>>
>>>>> I am reading data from *Kafka* using DirectStream, writing to
>>>>> *Cassandra* after parsing the JSON, and then subsequently updating the
>>>>> offsets in *zookeeper*.
>>>>> If the Cassandra cluster is down, it throws an exception, but the batch which
>>>>> arrives in that time window is never processed even though the offsets are
>>>>> updated in ZooKeeper.
>>>>> It results in data loss.
>>>>> Once the Cassandra cluster is up, this job processes the data normally.
>>>>> PFA the screenshots of hung batches and code.
>>>>>
>>>>> *Code:*
>>>>>
>>>>> data_rdd.foreachRDD(rdd=> {
>>>>>   val stream = rdd
>>>>> .map(x =>JsonUtility.deserialize(x))
>>>>>   stream.saveToCassandra(CASSANDRA_KEY_SPACE, SIGNATURE_TABLE, 
>>>>> StreamModel.getColumns)
>>>>>
>>>>>
>>>>>   //commit the offsets once everything is done
>>>>>   ZookeeperManager.updateOffsetsinZk(zkProperties, rdd)
>>>>> })
>>>>>
>>>>> *I have even tried this variant:*
>>>>>
>>>>> data_rdd.foreachRDD(rdd=> {
>>>>>   val stream = rdd
>>>>> .map(x =>JsonUtility.deserialize(x))
>>>>>   stream.saveToCassandra(CASSANDRA_KEY_SPACE, SIGNATURE_TABLE, 
>>>>> StreamModel.getColumns)
>>>>> })
>>>>>
>>>>> data_rdd.foreachRDD(rdd=> {
>>>>>
>>>>>   //commit the offsets once everything is done
>>>>>
>>>>>   ZookeeperManager.updateOffsetsinZk(zkProperties, rdd)
>>>>>
>>>>> }
>>>>>
>>>>> Exception when cassandra cluster is down:
>>>>> [2015-10-19 12:49:20] [JobScheduler] [ERROR]
>>>>> [org.apache.spark.streaming.scheduler.JobScheduler] - Error running job
>>>>> streaming job 144523914 ms.3
>>>>> java.io.IOException: Failed to open native connection to Cassandra at
>>>>> {..}
>>>>>
>>>>> --
>>>>> *VARUN SHARMA*
>>>>> *Flipkart*
>>>>> *Bangalore*
>>>>>
>>>>>
>>>>> -
>>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>>>
>>>>
>>>>
>>>
>>>
>>> --
>>> *VARUN SHARMA*
>>> *Flipkart*
>>> *Bangalore*
>>>
>>
>>
>>
>> --
>> *VARUN SHARMA*
>> *Flipkart*
>> *Bangalore*
>>
>
>


-- 
*VARUN SHARMA*
*Flipkart*
*Bangalore*


Re: Issue in spark batches

2015-10-20 Thread varun sharma
Hi TD,
Yes, saveToCassandra throws an exception. How do I fail that task explicitly if
I catch any exceptions?
Right now that batch doesn't fail and remains in a hung state. Is there any
way I can fail that batch so that it can be tried again?

Thanks
Varun

On Tue, Oct 20, 2015 at 2:50 AM, Tathagata Das <t...@databricks.com> wrote:

> If cassandra is down, does saveToCassandra throw an exception? If it does,
> you can catch that exception and write your own logic to retry and/or no
> update. Once the foreachRDD function completes, that batch will be
> internally marked as completed.
>
> TD
>
> On Mon, Oct 19, 2015 at 5:48 AM, varun sharma <varunsharman...@gmail.com>
> wrote:
>
>> Hi,
>> I am facing this issue consistently in spark-cassandra-kafka *streaming
>> job.*
>> *Spark 1.4.0*
>> *cassandra connector 1.4.0-M3*
>> *Issue is:*
>>
>> I am reading data from *Kafka* using DirectStream, writing to *Cassandra*
>> after parsing the JSON, and then subsequently updating the offsets in
>> *zookeeper*.
>> If the Cassandra cluster is down, it throws an exception, but the batch which
>> arrives in that time window is never processed even though the offsets are
>> updated in ZooKeeper.
>> It results in data loss.
>> Once the Cassandra cluster is up, this job processes the data normally.
>> PFA the screenshots of hung batches and code.
>>
>> *Code:*
>>
>> data_rdd.foreachRDD(rdd=> {
>>   val stream = rdd
>> .map(x =>JsonUtility.deserialize(x))
>>   stream.saveToCassandra(CASSANDRA_KEY_SPACE, SIGNATURE_TABLE, 
>> StreamModel.getColumns)
>>
>>
>>   //commit the offsets once everything is done
>>   ZookeeperManager.updateOffsetsinZk(zkProperties, rdd)
>> })
>>
>> *I have even tried this variant:*
>>
>> data_rdd.foreachRDD(rdd=> {
>>   val stream = rdd
>> .map(x =>JsonUtility.deserialize(x))
>>   stream.saveToCassandra(CASSANDRA_KEY_SPACE, SIGNATURE_TABLE, 
>> StreamModel.getColumns)
>> })
>>
>> data_rdd.foreachRDD(rdd=> {
>>
>>   //commit the offsets once everything is done
>>
>>   ZookeeperManager.updateOffsetsinZk(zkProperties, rdd)
>>
>> }
>>
>> Exception when cassandra cluster is down:
>> [2015-10-19 12:49:20] [JobScheduler] [ERROR]
>> [org.apache.spark.streaming.scheduler.JobScheduler] - Error running job
>> streaming job 144523914 ms.3
>> java.io.IOException: Failed to open native connection to Cassandra at
>> {..}
>>
>> --
>> *VARUN SHARMA*
>> *Flipkart*
>> *Bangalore*
>>
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>
>


-- 
*VARUN SHARMA*
*Flipkart*
*Bangalore*


Re: Kafka Direct Stream

2015-10-04 Thread varun sharma
I went through the story and, as I understood it, it is for saving data to
multiple keyspaces at once.
How will it work for saving data to multiple tables in the same keyspace?
I think tableName: String should also be tableName: T => String.
Let me know if I understood it incorrectly.


On Sat, Oct 3, 2015 at 9:55 PM, Gerard Maas <gerard.m...@gmail.com> wrote:

> Hi,
>
> collect(partialFunction) is equivalent to filter(x=>
> partialFunction.isDefinedAt(x)).map(partialFunction)  so it's functionally
> equivalent to your expression. I favor collect for its more compact form
> but that's a personal preference. Use what you feel reads best.
>
> Regarding performance, there will be some overhead of submitting many a
> task for every filtered RDD that gets materialized to Cassandra. That's the
> reason I proposed the ticket linked above. Have a look whether that would
> improve your particular usecase and vote for it if so :-)
>
> -kr, Gerard.
>
> On Sat, Oct 3, 2015 at 3:53 PM, varun sharma <varunsharman...@gmail.com>
> wrote:
>
>> Thanks Gerard, the code snippet you shared worked. But can you please
>> explain/point me to the usage of *collect* here? How is it
>> different (performance/readability) from *filter*?
>>
>>> *val filteredRdd = rdd.filter(x => x._1 == topic).map(_._2)*
>>
>>
>> I am doing something like this. Please tell me if I can improve the *processing
>> time* of this particular code:
>>
>> kafkaStringStream.foreachRDD { rdd =>
>>   val topics = rdd.map(_._1).distinct().collect()
>>   if (topics.length > 0) {
>>     val rdd_value = rdd.take(10).mkString("\n.\n")
>>     Log.slogger(Log.FILE.DEFAULT, INFO, BaseSLog(s"Printing all feeds\n$rdd_value"))
>>
>>     topics.foreach { topic =>
>>       //rdd.filter(x => x._1 == topic).map(_._2)
>>       val filteredRdd = rdd.collect { case (t, data) if t == topic => data }
>>       CassandraHelper.saveDataToCassandra(topic, filteredRdd)
>>     }
>>     updateOffsetsinZk(rdd)
>>   }
>> }
>>
>> On Fri, Oct 2, 2015 at 11:58 PM, Gerard Maas <gerard.m...@gmail.com>
>> wrote:
>>
>>> Something like this?
>>>
>>> I'm making the assumption that your topic name equals your keyspace for
>>> this filtering example.
>>>
>>> dstream.foreachRDD{rdd =>
>>>   val topics = rdd.map(_._1).distinct.collect
>>>   topics.foreach{topic =>
>>> val filteredRdd =  rdd.collect{case (t, data) if t == topic => data}.
>>> filteredRdd.saveToCassandra(topic, "table")  // do not confuse this
>>> collect with rdd.collect() that brings data to the driver
>>>   }
>>> }
>>>
>>>
>>> I'm wondering: would something like this (
>>> https://datastax-oss.atlassian.net/browse/SPARKC-257) better fit your
>>> purposes?
>>>
>>> -kr, Gerard.
>>>
>>> On Fri, Oct 2, 2015 at 8:12 PM, varun sharma <varunsharman...@gmail.com>
>>> wrote:
>>>
>>>> Hi Adrian,
>>>>
>>>> Can you please give an example of how to achieve this:
>>>>
>>>>> *I would also look at filtering by topic and saving as different
>>>>> Dstreams in your code*
>>>>
>>>> I have managed to get DStream[(String, String)] which is (
>>>> *topic,my_data)* tuple. Lets call it kafkaStringStream.
>>>> Now if I do kafkaStringStream.groupByKey() then I would get a
>>>> DStream[(String,Iterable[String])].
>>>> But I want a DStream instead of Iterable in order to apply
>>>> saveToCassandra for storing it.
>>>>
>>>> Please help in how to transform iterable to DStream or any other
>>>> workaround for achieving same.
>>>>
>>>>
>>>> On Thu, Oct 1, 2015 at 8:17 PM, Adrian Tanase <atan...@adobe.com>
>>>> wrote:
>>>>
>>>>> On top of that you could make the topic part of the key (e.g. keyBy in
>>>>> .transform or manually emitting a tuple) and use one of the .xxxByKey
>>>>> operators for the processing.
>>>>>
>>>>> If you have a stable, domain specific list of topics (e.g. 3-5 named
>>>>> topics) and the processing is *really* different, I would also look
>>>>> at filtering by topic and saving as different Dstreams in your code.
>>>>>
>>>>> Either way you need to start with Cody’s tip in order to extract the
>>>>> topic name.

Re: Kafka Direct Stream

2015-10-03 Thread varun sharma
Thanks Gerard, the code snippet you shared worked. But can you please
explain/point me to the usage of *collect* here? How is it
different (performance/readability) from *filter*?

> *val filteredRdd = rdd.filter(x => x._1 == topic).map(_._2)*


I am doing something like this. Please tell me if I can improve the *processing
time* of this particular code:

kafkaStringStream.foreachRDD { rdd =>
  val topics = rdd.map(_._1).distinct().collect()
  if (topics.length > 0) {
    val rdd_value = rdd.take(10).mkString("\n.\n")
    Log.slogger(Log.FILE.DEFAULT, INFO, BaseSLog(s"Printing all feeds\n$rdd_value"))

    topics.foreach { topic =>
      //rdd.filter(x => x._1 == topic).map(_._2)
      val filteredRdd = rdd.collect { case (t, data) if t == topic => data }
      CassandraHelper.saveDataToCassandra(topic, filteredRdd)
    }
    updateOffsetsinZk(rdd)
  }
}
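
And just to check my own reading of collect vs filter, a toy example of the two forms side by side (the local master and sample data are made up):

import org.apache.spark.{SparkConf, SparkContext}

object CollectVsFilter {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("CollectVsFilter").setMaster("local[2]"))
    val rdd = sc.parallelize(Seq(("topicA", "a1"), ("topicB", "b1"), ("topicA", "a2")))
    val topic = "topicA"

    // collect(partialFunction): filter and map in a single pass
    val viaCollect = rdd.collect { case (t, data) if t == topic => data }

    // the equivalent filter + map version
    val viaFilter = rdd.filter { case (t, _) => t == topic }.map(_._2)

    // both print List(a1, a2); this collect() with no arguments is the one
    // that brings results back to the driver
    println(viaCollect.collect().toList)
    println(viaFilter.collect().toList)

    sc.stop()
  }
}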

On Fri, Oct 2, 2015 at 11:58 PM, Gerard Maas <gerard.m...@gmail.com> wrote:

> Something like this?
>
> I'm making the assumption that your topic name equals your keyspace for
> this filtering example.
>
> dstream.foreachRDD { rdd =>
>   val topics = rdd.map(_._1).distinct.collect
>   topics.foreach { topic =>
>     val filteredRdd = rdd.collect { case (t, data) if t == topic => data }
>     filteredRdd.saveToCassandra(topic, "table")  // do not confuse this collect with rdd.collect() that brings data to the driver
>   }
> }
>
>
> I'm wondering: would something like this (
> https://datastax-oss.atlassian.net/browse/SPARKC-257) better fit your
> purposes?
>
> -kr, Gerard.
>
> On Fri, Oct 2, 2015 at 8:12 PM, varun sharma <varunsharman...@gmail.com>
> wrote:
>
>> Hi Adrian,
>>
>> Can you please give an example of how to achieve this:
>>
>>> *I would also look at filtering by topic and saving as different
>>> Dstreams in your code*
>>
>> I have managed to get DStream[(String, String)] which is (
>> *topic,my_data)* tuple. Lets call it kafkaStringStream.
>> Now if I do kafkaStringStream.groupByKey() then I would get a
>> DStream[(String,Iterable[String])].
>> But I want a DStream instead of Iterable in order to apply
>> saveToCassandra for storing it.
>>
>> Please help in how to transform iterable to DStream or any other
>> workaround for achieving same.
>>
>>
>> On Thu, Oct 1, 2015 at 8:17 PM, Adrian Tanase <atan...@adobe.com> wrote:
>>
>>> On top of that you could make the topic part of the key (e.g. keyBy in
>>> .transform or manually emitting a tuple) and use one of the .xxxByKey
>>> operators for the processing.
>>>
>>> If you have a stable, domain specific list of topics (e.g. 3-5 named
>>> topics) and the processing is *really* different, I would also look at
>>> filtering by topic and saving as different Dstreams in your code.
>>>
>>> Either way you need to start with Cody’s tip in order to extract the
>>> topic name.
>>>
>>> -adrian
>>>
>>> From: Cody Koeninger
>>> Date: Thursday, October 1, 2015 at 5:06 PM
>>> To: Udit Mehta
>>> Cc: user
>>> Subject: Re: Kafka Direct Stream
>>>
>>> You can get the topic for a given partition from the offset range.  You
>>> can either filter using that; or just have a single rdd and match on topic
>>> when doing mapPartitions or foreachPartition (which I think is a better
>>> idea)
>>>
>>>
>>> http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers
>>>
>>> On Wed, Sep 30, 2015 at 5:02 PM, Udit Mehta <ume...@groupon.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I am using spark direct stream to consume from multiple topics in
>>>> Kafka. I am able to consume fine but I am stuck at how to separate the data
>>>> for each topic since I need to process data differently depending on the
>>>> topic.
>>>> I basically want to split the RDD consisting on N topics into N RDD's
>>>> each having 1 topic.
>>>>
>>>> Any help would be appreciated.
>>>>
>>>> Thanks in advance,
>>>> Udit
>>>>
>>>
>>>
>>
>>
>> --
>> *VARUN SHARMA*
>> *Flipkart*
>> *Bangalore*
>>
>
>


-- 
*VARUN SHARMA*
*Flipkart*
*Bangalore*


Re: Kafka Direct Stream

2015-10-02 Thread varun sharma
Hi Adrian,

Can you please give an example of how to achieve this:

> *I would also look at filtering by topic and saving as different Dstreams
> in your code*

I have managed to get a DStream[(String, String)], which is a (*topic, my_data)*
tuple. Let's call it kafkaStringStream.
Now if I do kafkaStringStream.groupByKey() then I would get a
DStream[(String, Iterable[String])].
But I want a DStream instead of an Iterable in order to apply saveToCassandra
for storing it.

Please help with how to transform the Iterable into a DStream, or any other
workaround for achieving the same.


On Thu, Oct 1, 2015 at 8:17 PM, Adrian Tanase <atan...@adobe.com> wrote:

> On top of that you could make the topic part of the key (e.g. keyBy in
> .transform or manually emitting a tuple) and use one of the .xxxByKey
> operators for the processing.
>
> If you have a stable, domain specific list of topics (e.g. 3-5 named
> topics) and the processing is *really* different, I would also look at
> filtering by topic and saving as different Dstreams in your code.
>
> Either way you need to start with Cody’s tip in order to extract the topic
> name.
>
> -adrian
>
> From: Cody Koeninger
> Date: Thursday, October 1, 2015 at 5:06 PM
> To: Udit Mehta
> Cc: user
> Subject: Re: Kafka Direct Stream
>
> You can get the topic for a given partition from the offset range.  You
> can either filter using that; or just have a single rdd and match on topic
> when doing mapPartitions or foreachPartition (which I think is a better
> idea)
>
>
> http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers
>
> On Wed, Sep 30, 2015 at 5:02 PM, Udit Mehta <ume...@groupon.com> wrote:
>
>> Hi,
>>
>> I am using spark direct stream to consume from multiple topics in Kafka.
>> I am able to consume fine but I am stuck at how to separate the data for
>> each topic since I need to process data differently depending on the topic.
>> I basically want to split the RDD consisting on N topics into N RDD's
>> each having 1 topic.
>>
>> Any help would be appreciated.
>>
>> Thanks in advance,
>> Udit
>>
>
>


-- 
*VARUN SHARMA*
*Flipkart*
*Bangalore*


Re: Kafka Direct Stream

2015-10-02 Thread varun sharma
Hi Nicolae,

Won't creating N KafkaDirectStreams be an overhead for my streaming job
compared to a single DirectStream?
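
For reference, this is how I read Nicolae's suggestion; the topic names and Kafka params are placeholders, ssc is the usual StreamingContext, and CassandraHelper.saveDataToCassandra is the helper from my job:

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

val topics = Seq("topicA", "topicB", "topicC")

// one direct stream per topic instead of a single stream over all topics
topics.foreach { topic =>
  val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, Set(topic))

  // no filtering step needed: every record in this stream belongs to `topic`
  stream.map(_._2).foreachRDD { rdd =>
    CassandraHelper.saveDataToCassandra(topic, rdd)
  }
}

Each of those streams still schedules its own jobs every batch interval, which is exactly the overhead I am asking about.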

On Fri, Oct 2, 2015 at 1:13 AM, Nicolae Marasoiu <
nicolae.maras...@adswizz.com> wrote:

> Hi,
>
>
> If you just need processing per topic, why not generate N different Kafka
> direct streams? When creating a Kafka direct stream you have a list of
> topics - just give one.
>
>
> Then the reusable part of your computations should be extractable as
> transformations/functions and reused between the streams.
>
>
> Nicu
>
>
>
> --
> *From:* Adrian Tanase <atan...@adobe.com>
> *Sent:* Thursday, October 1, 2015 5:47 PM
> *To:* Cody Koeninger; Udit Mehta
> *Cc:* user
> *Subject:* Re: Kafka Direct Stream
>
> On top of that you could make the topic part of the key (e.g. keyBy in
> .transform or manually emitting a tuple) and use one of the .xxxByKey
> operators for the processing.
>
> If you have a stable, domain specific list of topics (e.g. 3-5 named
> topics) and the processing is *really* different, I would also look at
> filtering by topic and saving as different Dstreams in your code.
>
> Either way you need to start with Cody’s tip in order to extract the topic
> name.
>
> -adrian
>
> From: Cody Koeninger
> Date: Thursday, October 1, 2015 at 5:06 PM
> To: Udit Mehta
> Cc: user
> Subject: Re: Kafka Direct Stream
>
> You can get the topic for a given partition from the offset range.  You
> can either filter using that; or just have a single rdd and match on topic
> when doing mapPartitions or foreachPartition (which I think is a better
> idea)
>
>
> http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers
>
>
>
>
> On Wed, Sep 30, 2015 at 5:02 PM, Udit Mehta <ume...@groupon.com> wrote:
>
>> Hi,
>>
>> I am using spark direct stream to consume from multiple topics in Kafka.
>> I am able to consume fine but I am stuck at how to separate the data for
>> each topic since I need to process data differently depending on the topic.
>> I basically want to split the RDD consisting on N topics into N RDD's
>> each having 1 topic.
>>
>> Any help would be appreciated.
>>
>> Thanks in advance,
>> Udit
>>
>
>


-- 
*VARUN SHARMA*
*Flipkart*
*Bangalore*


OOM error in Spark worker

2015-10-01 Thread varun sharma
My workers are going OOM over time. I am running a streaming job in spark
1.4.0.
Here is the heap dump of the workers:

16,802 instances of "org.apache.spark.deploy.worker.ExecutorRunner", loaded by
"sun.misc.Launcher$AppClassLoader @ 0xdff94088" occupy 488,249,688 (95.80%)
bytes. These instances are referenced from one instance of "java.lang.Object[]",
loaded by ""

Keywords
org.apache.spark.deploy.worker.ExecutorRunner
java.lang.Object[]
sun.misc.Launcher$AppClassLoader @ 0xdff94088

Is this because of this bug:
http://apache-spark-developers-list.1001551.n3.nabble.com/Worker-memory-leaks-td13341.html
https://issues.apache.org/jira/browse/SPARK-9202

Also,
I am getting the below error continuously if one of the workers/executors dies on
any node in my Spark cluster.
Even if I start the worker again, the error doesn't go away. I have to force-kill my
streaming job and restart it to fix the issue. Is it some bug?
I am using Spark 1.4.0.


MY_IP in the logs is the IP of the worker node which failed.

15/09/03 11:29:11 WARN BlockManagerMaster: Failed to remove RDD 194218 - Ask timed out on
[Actor[akka.tcp://sparkExecutor@MY_IP:38223/user/BlockManagerEndpoint1#656884654]] after [12 ms]}
akka.pattern.AskTimeoutException: Ask timed out on
[Actor[akka.tcp://sparkExecutor@MY_IP:38223/user/BlockManagerEndpoint1#656884654]] after [12 ms]
    at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:333)
    at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
    at scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:694)
    at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:691)
    at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:467)
    at akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:419)
    at akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:423)
    at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:375)
    at java.lang.Thread.run(Thread.java:745)
15/09/03 11:29:11 WARN BlockManagerMaster: Failed to remove RDD 194217 - Ask timed out on
[Actor[akka.tcp://sparkExecutor@MY_IP:38223/user/BlockManagerEndpoint1#656884654]] after [12 ms]}
akka.pattern.AskTimeoutException: Ask timed out on
[Actor[akka.tcp://sparkExecutor@MY_IP:38223/user/BlockManagerEndpoint1#656884654]] after [12 ms]
    at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:333)
    at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
    at scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:694)
    at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:691)
    at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:467)
    at akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:419)
    at akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:423)
    at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:375)
    at java.lang.Thread.run(Thread.java:745)
15/09/03 11:29:11 ERROR SparkDeploySchedulerBackend: Asked to remove non-existent executor 16723
15/09/03 11:29:11 WARN BlockManagerMaster: Failed to remove RDD 194216 - Ask timed out on
[Actor[akka.tcp://sparkExecutor@MY_IP:38223/user/BlockManagerEndpoint1#656884654]] after [12 ms]}

It is easily reproducible if I manually stop a worker on one of my nodes.

15/09/03 23:52:18 ERROR SparkDeploySchedulerBackend: Asked to remove non-existent executor 329
15/09/03 23:52:18 ERROR SparkDeploySchedulerBackend: Asked to remove non-existent executor 333
15/09/03 23:52:18 ERROR SparkDeploySchedulerBackend: Asked to remove non-existent executor 334

It doesn't go away even if I start the worker again.

Follow-up question: if my streaming job has consumed some events from a Kafka
topic and they are pending to be scheduled because of a delay in processing,
will my force-killing the streaming job lose that data which is not yet
scheduled?


-- 
*VARUN SHARMA*
*Flipkart*
*Bangalore*


OOM error in Spark worker

2015-09-29 Thread varun sharma
My workers are going OOM over time. I am running a streaming job in spark
1.4.0.
Here is the heap dump of workers.

/16,802 instances of "org.apache.spark.deploy.worker.ExecutorRunner", loaded
by "sun.misc.Launcher$AppClassLoader @ 0xdff94088" occupy 488,249,688
(95.80%) bytes. These instances are referenced from one instance of
"java.lang.Object[]", loaded by ""

Keywords
org.apache.spark.deploy.worker.ExecutorRunner
java.lang.Object[]
sun.misc.Launcher$AppClassLoader @ 0xdff94088
/

I am getting the below error continuously if one of the workers/executors dies on
any node in my Spark cluster.
Even if I start the worker again, the error doesn't go away. I have to force-kill my
streaming job and restart it to fix the issue. Is it some bug?
I am using Spark 1.4.0.

MY_IP in logs is IP of worker node which failed.

/15/09/03 11:29:11 WARN BlockManagerMaster: Failed to remove RDD 194218 -
Ask timed out on
[Actor[akka.tcp://sparkExecutor@MY_IP:38223/user/BlockManagerEndpoint1#656884654]]
after [12 ms]}
akka.pattern.AskTimeoutException: Ask timed out on
[Actor[akka.tcp://sparkExecutor@MY_IP:38223/user/BlockManagerEndpoint1#656884654]]
after [12 ms]
at
akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:333)
at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
at
scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:694)
at
scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:691)
at
akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:467)
at
akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:419)
at
akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:423)
at 
akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:375)
at java.lang.Thread.run(Thread.java:745)
15/09/03 11:29:11 WARN BlockManagerMaster: Failed to remove RDD 194217 - Ask
timed out on
[Actor[akka.tcp://sparkExecutor@MY_IP:38223/user/BlockManagerEndpoint1#656884654]]
after [12 ms]}
akka.pattern.AskTimeoutException: Ask timed out on
[Actor[akka.tcp://sparkExecutor@MY_IP:38223/user/BlockManagerEndpoint1#656884654]]
after [12 ms]
at
akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:333)
at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
at
scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:694)
at
scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:691)
at
akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:467)
at
akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:419)
at
akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:423)
at 
akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:375)
at java.lang.Thread.run(Thread.java:745)
15/09/03 11:29:11 ERROR SparkDeploySchedulerBackend: Asked to remove
non-existent executor 16723
15/09/03 11:29:11 WARN BlockManagerMaster: Failed to remove RDD 194216 - Ask
timed out on
[Actor[akka.tcp://sparkExecutor@MY_IP:38223/user/BlockManagerEndpoint1#656884654]]
after [12 ms]}
/
It is easily reproducible if I manually stop a worker on one of my nodes.
/15/09/03 23:52:18 ERROR SparkDeploySchedulerBackend: Asked to remove
non-existent executor 329
15/09/03 23:52:18 ERROR SparkDeploySchedulerBackend: Asked to remove
non-existent executor 333
15/09/03 23:52:18 ERROR SparkDeploySchedulerBackend: Asked to remove
non-existent executor 334
/
It doesn't go away even if I start the worker again.

Follow-up question: if my streaming job has consumed some events from a Kafka
topic and they are pending to be scheduled because of a delay in processing, will
my force-killing the streaming job lose that data which is not yet
scheduled?

Please help ASAP.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/OOM-error-in-Spark-worker-tp24856.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Installation Maven PermGen OutOfMemoryException

2014-12-27 Thread varun sharma
This works for me:

export MAVEN_OPTS="-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m"
mvn -DskipTests clean package



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Installation-Maven-PermGen-OutOfMemoryException-tp20831p20868.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Debian package for spark?

2014-12-25 Thread varun sharma
Hi Kevin,
Were you able to build Spark with export MAVEN_OPTS="-Xmx2g -XX:MaxPermSize=512M
-XX:ReservedCodeCacheSize=512m" followed by mvn -Pdeb -DskipTests clean package?

I am getting the below error for all versions of Spark (even 1.2.0):

Failed to execute goal org.vafer:jdeb:0.11:jdeb (default) on project
spark-assembly_2.10: Failed to create debian package
/opt/spark/spark-1.2.0/assembly/target/spark_1.2.0-${buildNumber}_all.deb:
Could not create deb package: Control file descriptor keys are invalid
[Version]. The following keys are mandatory [Package, Version, Section,
Priority, Architecture, Maintainer, Description]. Please check your
pom.xml/build.xml and your control file. - [Help 1]

Thanks in advance.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Debian-package-for-spark-tp18410p20856.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org