Re: How to clean up logs-dirs and local-dirs of running spark streaming in yarn cluster mode

2018-12-26 Thread shyla deshpande
Hi Fawze,
Thank you for the link. But that is exactly what I am doing.
I think this is related to
yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage
setting.
When the disk utilization exceeds this setting, the node is marked
unhealthy.

Other than increasing the default 90%, is there anything else I can do?

Thanks
-Shyla



On Tue, Dec 25, 2018 at 7:26 PM Fawze Abujaber  wrote:

> http://shzhangji.com/blog/2015/05/31/spark-streaming-logging-configuration/
>
> On Wed, Dec 26, 2018 at 1:05 AM shyla deshpande 
> wrote:
>
>> Please point me to any documentation if available. Thanks
>>
>> On Tue, Dec 18, 2018 at 11:10 AM shyla deshpande <
>> deshpandesh...@gmail.com> wrote:
>>
>>> Is there a way to do this without stopping the streaming application in
>>> yarn cluster mode?
>>>
>>> On Mon, Dec 17, 2018 at 4:42 PM shyla deshpande <
>>> deshpandesh...@gmail.com> wrote:
>>>
>>>> I get the ERROR
>>>> 1/1 local-dirs are bad: /mnt/yarn; 1/1 log-dirs are bad:
>>>> /var/log/hadoop-yarn/containers
>>>>
>>>> Is there a way to clean up these directories while the spark streaming
>>>> application is running?
>>>>
>>>> Thanks
>>>>
>>>
>
> --
> Take Care
> Fawze Abujaber
>


Re: How to clean up logs-dirs and local-dirs of running spark streaming in yarn cluster mode

2018-12-25 Thread shyla deshpande
Please point me to any documentation if available. Thanks

On Tue, Dec 18, 2018 at 11:10 AM shyla deshpande 
wrote:

> Is there a way to do this without stopping the streaming application in
> yarn cluster mode?
>
> On Mon, Dec 17, 2018 at 4:42 PM shyla deshpande 
> wrote:
>
>> I get the ERROR
>> 1/1 local-dirs are bad: /mnt/yarn; 1/1 log-dirs are bad:
>> /var/log/hadoop-yarn/containers
>>
>> Is there a way to clean up these directories while the spark streaming
>> application is running?
>>
>> Thanks
>>
>


Re: How to clean up logs-dirs and local-dirs of running spark streaming in yarn cluster mode

2018-12-18 Thread shyla deshpande
Is there a way to do this without stopping the streaming application in
yarn cluster mode?

On Mon, Dec 17, 2018 at 4:42 PM shyla deshpande 
wrote:

> I get the ERROR
> 1/1 local-dirs are bad: /mnt/yarn; 1/1 log-dirs are bad:
> /var/log/hadoop-yarn/containers
>
> Is there a way to clean up these directories while the spark streaming
> application is running?
>
> Thanks
>


How to clean up logs-dirs and local-dirs of running spark streaming in yarn cluster mode

2018-12-17 Thread shyla deshpande
I get the ERROR
1/1 local-dirs are bad: /mnt/yarn; 1/1 log-dirs are bad:
/var/log/hadoop-yarn/containers

Is there a way to clean up these directories while the spark streaming
application is running?

Thanks


spark job error

2018-01-30 Thread shyla deshpande
I am running Zeppelin on EMR with the default settings. I am getting the
following error. Restarting the Zeppelin application fixes the problem.

What default settings do I need to override to fix this error?

org.apache.spark.SparkException: Job aborted due to stage failure: Task 71
in stage 231.0 failed 4 times, most recent failure: Lost task 71.3 in stage
231.0 Reason: Container killed by YARN for exceeding memory limits. 1.4 GB
of 1.4 GB physical memory used. Consider boosting
spark.yarn.executor.memoryOverhead.

Thanks
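
A minimal sketch of where the setting named in the error above can be raised.
The 2 GB value and the app name are placeholders, and on EMR/Zeppelin the same
key is usually set through the interpreter or spark-defaults configuration
rather than in application code:

    import org.apache.spark.sql.SparkSession

    // Give each YARN executor more off-heap headroom than the default
    // (max(384 MB, 10% of executor memory)) so containers are not killed.
    val spark = SparkSession.builder()
      .appName("zeppelin-job")
      .config("spark.yarn.executor.memoryOverhead", "2048") // in MB
      .getOrCreate()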


How do I save the dataframe data as a pdf file?

2017-12-12 Thread shyla deshpande
Hello all,

Is there a way to write the dataframe data as a pdf file?

Thanks
-Shyla


Re: Any libraries to do Complex Event Processing with spark streaming?

2017-10-03 Thread shyla deshpande
On Tue, Oct 3, 2017 at 10:50 AM, shyla deshpande 
wrote:

> Hi all,
> I have a data pipeline using Spark streaming, Kafka and Cassandra.
> Are there any libraries to help me with complex event processing using
> Spark Streaming?
>
> I appreciate your help.
>
> Thanks
>


Any libraries to do Complex Event Processing with spark streaming?

2017-10-03 Thread shyla deshpande
Hi all,
I have a data pipeline using Spark streaming, Kafka and Cassandra.
Are there any libraries to help me with complex event processing using
Spark Streaming?

I appreciate your help.

Thanks


What is the right way to stop a streaming application?

2017-08-22 Thread shyla deshpande
Hi all,

I am running a spark streaming application on an AWS EC2 cluster in standalone
mode. I am using DStreams and Spark 2.0.2.

I have set stopGracefullyOnShutdown to true. What is the right way to stop
the streaming application?

Thanks
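
A minimal sketch of the two graceful-stop options; `ssc` is assumed to be the
running StreamingContext and the app name is a placeholder:

    import org.apache.spark.SparkConf

    // Option 1: let a normal shutdown signal (e.g. SIGTERM to the driver)
    // trigger a graceful stop that finishes the in-flight batches first.
    val conf = new SparkConf()
      .setAppName("streaming-app")
      .set("spark.streaming.stopGracefullyOnShutdown", "true")

    // Option 2: stop explicitly from application code once some external
    // condition (marker file, message, HTTP endpoint, ...) says to shut down.
    // ssc.stop(stopSparkContext = true, stopGracefully = true)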


Re: How do I pass multiple cassandra hosts in spark submit?

2017-08-10 Thread shyla deshpande
Got the answer from
https://groups.google.com/a/lists.datastax.com/forum/#!topic/spark-connector-user/ETCZdCcaKq8



On Thu, Aug 10, 2017 at 11:59 AM, shyla deshpande 
wrote:

> I have a 3 node cassandra cluster. I want to pass all the 3 nodes in spark
> submit. How do I do that.
> Any code samples will help.
> Thanks
>
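
A minimal sketch, assuming the comma-separated form that the Spark Cassandra
connector accepts for spark.cassandra.connection.host. The host names are
placeholders, and the same key can equally be passed with --conf on
spark-submit:

    import org.apache.spark.sql.SparkSession

    // All three nodes are listed as contact points; the driver discovers the
    // rest of the ring from whichever one it reaches first.
    val spark = SparkSession.builder()
      .appName("cassandra-app")
      .config("spark.cassandra.connection.host", "cass-node1,cass-node2,cass-node3")
      .getOrCreate()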


How do I pass multiple cassandra hosts in spark submit?

2017-08-10 Thread shyla deshpande
I have a 3-node Cassandra cluster. I want to pass all 3 nodes in spark
submit. How do I do that?
Any code samples will help.
Thanks


Re: KafkaUtils.createRDD, How do I read all the data from kafka in a batch program for a given topic?

2017-08-10 Thread shyla deshpande
Thanks Cody.

On Wed, Aug 9, 2017 at 8:46 AM, Cody Koeninger  wrote:

> org.apache.spark.streaming.kafka.KafkaCluster has methods
> getLatestLeaderOffsets and getEarliestLeaderOffsets
>
> On Mon, Aug 7, 2017 at 11:37 PM, shyla deshpande
>  wrote:
> > Thanks TD.
> >
> > On Mon, Aug 7, 2017 at 8:59 PM, Tathagata Das <
> tathagata.das1...@gmail.com>
> > wrote:
> >>
> >> I dont think there is any easier way.
> >>
> >> On Mon, Aug 7, 2017 at 7:32 PM, shyla deshpande <
> deshpandesh...@gmail.com>
> >> wrote:
> >>>
> >>> Thanks TD for the response. I forgot to mention that I am not using
> >>> structured streaming.
> >>>
> >>> I was looking into KafkaUtils.createRDD, and looks like I need to get
> the
> >>> earliest and the latest offset for each partition to build the
> >>> Array(offsetRange). I wanted to know if there was a easier way.
> >>>
> >>> 1 reason why we are hesitating to use structured streaming is because I
> >>> need to persist the data in Cassandra database which I believe is not
> >>> production ready.
> >>>
> >>>
> >>> On Mon, Aug 7, 2017 at 6:11 PM, Tathagata Das
> >>>  wrote:
> >>>>
> >>>> Its best to use DataFrames. You can read from as streaming or as
> batch.
> >>>> More details here.
> >>>>
> >>>>
> >>>> https://spark.apache.org/docs/latest/structured-streaming-
> kafka-integration.html#creating-a-kafka-source-for-batch-queries
> >>>>
> >>>> https://databricks.com/blog/2017/04/26/processing-data-in-
> apache-kafka-with-structured-streaming-in-apache-spark-2-2.html
> >>>>
> >>>> On Mon, Aug 7, 2017 at 6:03 PM, shyla deshpande
> >>>>  wrote:
> >>>>>
> >>>>> Hi all,
> >>>>>
> >>>>> What is the easiest way to read all the data from kafka in a batch
> >>>>> program for a given topic?
> >>>>> I have 10 kafka partitions, but the data is not much. I would like to
> >>>>> read  from the earliest from all the partitions for a topic.
> >>>>>
> >>>>> I appreciate any help. Thanks
> >>>>
> >>>>
> >>>
> >>
> >
>
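
A minimal sketch of the batch read described in the Structured Streaming Kafka
docs linked above (available from Spark 2.2 onward). The broker addresses, the
topic name, and the existing SparkSession `spark` are assumptions:

    // Read the full topic as a batch DataFrame: from the earliest to the
    // latest offset of every partition, with no streaming query involved.
    val kafkaDF = spark.read
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker1:9092,broker2:9092")
      .option("subscribe", "topic1")
      .option("startingOffsets", "earliest")
      .option("endingOffsets", "latest")
      .load()

    // key and value arrive as binary columns; cast them before use.
    val records = kafkaDF.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")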


Re: KafkaUtils.createRDD, How do I read all the data from kafka in a batch program for a given topic?

2017-08-07 Thread shyla deshpande
Thanks TD.

On Mon, Aug 7, 2017 at 8:59 PM, Tathagata Das 
wrote:

> I dont think there is any easier way.
>
> On Mon, Aug 7, 2017 at 7:32 PM, shyla deshpande 
> wrote:
>
>> Thanks TD for the response. I forgot to mention that I am not using
>> structured streaming.
>>
>> I was looking into KafkaUtils.createRDD, and looks like I need to get
>> the earliest and the latest offset for each partition to build the
>> Array(offsetRange). I wanted to know if there was a easier way.
>>
>> 1 reason why we are hesitating to use structured streaming is because I
>> need to persist the data in Cassandra database which I believe is not
>> production ready.
>>
>>
>> On Mon, Aug 7, 2017 at 6:11 PM, Tathagata Das <
>> tathagata.das1...@gmail.com> wrote:
>>
>>> Its best to use DataFrames. You can read from as streaming or as batch.
>>> More details here.
>>>
>>> https://spark.apache.org/docs/latest/structured-streaming-ka
>>> fka-integration.html#creating-a-kafka-source-for-batch-queries
>>> https://databricks.com/blog/2017/04/26/processing-data-in-ap
>>> ache-kafka-with-structured-streaming-in-apache-spark-2-2.html
>>>
>>> On Mon, Aug 7, 2017 at 6:03 PM, shyla deshpande <
>>> deshpandesh...@gmail.com> wrote:
>>>
>>>> Hi all,
>>>>
>>>> What is the easiest way to read all the data from kafka in a batch
>>>> program for a given topic?
>>>> I have 10 kafka partitions, but the data is not much. I would like to
>>>> read  from the earliest from all the partitions for a topic.
>>>>
>>>> I appreciate any help. Thanks
>>>>
>>>
>>>
>>
>


Re: KafkaUtils.createRDD, How do I read all the data from kafka in a batch program for a given topic?

2017-08-07 Thread shyla deshpande
Thanks TD for the response. I forgot to mention that I am not using
structured streaming.

I was looking into KafkaUtils.createRDD, and it looks like I need to get the
earliest and the latest offset for each partition to build the
Array(offsetRange). I wanted to know if there was an easier way.

One reason we are hesitating to use structured streaming is that I need to
persist the data in a Cassandra database, which I believe is not
production ready.


On Mon, Aug 7, 2017 at 6:11 PM, Tathagata Das 
wrote:

> Its best to use DataFrames. You can read from as streaming or as batch.
> More details here.
>
> https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#creating-a-kafka-source-for-batch-queries
> https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html
>
> On Mon, Aug 7, 2017 at 6:03 PM, shyla deshpande 
> wrote:
>
>> Hi all,
>>
>> What is the easiest way to read all the data from kafka in a batch
>> program for a given topic?
>> I have 10 kafka partitions, but the data is not much. I would like to
>> read  from the earliest from all the partitions for a topic.
>>
>> I appreciate any help. Thanks
>>
>
>


KafkaUtils.createRDD, How do I read all the data from kafka in a batch program for a given topic?

2017-08-07 Thread shyla deshpande
Hi all,

What is the easiest way to read all the data from kafka in a batch program
for a given topic?
I have 10 kafka partitions, but the data is not much. I would like to read
 from the earliest from all the partitions for a topic.

I appreciate any help. Thanks


Re: kafka setting, enable.auto.commit to false is being overridden and I lose data. Please help!

2017-08-07 Thread shyla deshpande
I am doing that already for all known messy data. Thanks Cody for all your
time and input

On Mon, Aug 7, 2017 at 11:58 AM, Cody Koeninger  wrote:

> Yes
>
> On Mon, Aug 7, 2017 at 12:32 PM, shyla deshpande
>  wrote:
> > Thanks Cody again.
> >
> > No. I am doing mapping of the Kafka ConsumerRecord to be able to save it
> in
> > the Cassandra table and saveToCassandra  is an action and my data do get
> > saved into Cassandra. It is working as expected 99% of the time except
> that
> > when there is an exception, I did not want the offsets to be committed.
> >
> > By Filtering for unsuccessful attempts, do you mean filtering the bad
> > records...
> >
> >
> >
> >
> >
> >
> > On Mon, Aug 7, 2017 at 9:59 AM, Cody Koeninger 
> wrote:
> >>
> >> If literally all you are doing is rdd.map I wouldn't expect
> >> saveToCassandra to happen at all, since map is not an action.
> >>
> >> Filtering for unsuccessful attempts and collecting those back to the
> >> driver would be one way for the driver to know whether it was safe to
> >> commit.
> >>
> >> On Mon, Aug 7, 2017 at 12:31 AM, shyla deshpande
> >>  wrote:
> >> > rdd.map { record => ()}.saveToCassandra("keyspace1", "table1")  -->
> is
> >> > running on executor
> >> >
> >> > stream1.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges) -->
> is
> >> > running on driver.
> >> >
> >> > Is this the reason why kafka offsets are committed even when an
> >> > exception is
> >> > raised? If so is there a way to commit the offsets only when there are
> >> > no
> >> > exceptions?
> >> >
> >> >
> >> >
> >> > On Sun, Aug 6, 2017 at 10:23 PM, shyla deshpande
> >> > 
> >> > wrote:
> >> >>
> >> >> Thanks again Cody,
> >> >>
> >> >> My understanding is all the code inside foreachRDD is running on the
> >> >> driver except for
> >> >> rdd.map { record => ()}.saveToCassandra("keyspace1", "table1").
> >> >>
> >> >> When the exception is raised, I was thinking I won't be committing
> the
> >> >> offsets, but the offsets are committed all the time independent of
> >> >> whether
> >> >> an exception was raised or not.
> >> >>
> >> >> It will be helpful if you can explain this behavior.
> >> >>
> >> >>
> >> >> On Sun, Aug 6, 2017 at 5:19 PM, Cody Koeninger 
> >> >> wrote:
> >> >>>
> >> >>> I mean that the kafka consumers running on the executors should not
> be
> >> >>> automatically committing, because the fact that a message was read
> by
> >> >>> the consumer has no bearing on whether it was actually successfully
> >> >>> processed after reading.
> >> >>>
> >> >>> It sounds to me like you're confused about where code is running.
> >> >>> foreachRDD runs on the driver, not the executor.
> >> >>>
> >> >>>
> >> >>>
> >> >>> http://spark.apache.org/docs/latest/streaming-programming-
> guide.html#design-patterns-for-using-foreachrdd
> >> >>>
> >> >>> On Sun, Aug 6, 2017 at 12:55 PM, shyla deshpande
> >> >>>  wrote:
> >> >>> > Thanks Cody for your response.
> >> >>> >
> >> >>> > All I want to do is, commit the offsets only if I am successfully
> >> >>> > able
> >> >>> > to
> >> >>> > write to cassandra database.
> >> >>> >
> >> >>> > The line //save the rdd to Cassandra database is
> >> >>> > rdd.map { record => ()}.saveToCassandra("kayspace1", "table1")
> >> >>> >
> >> >>> > What do you mean by Executors shouldn't be auto-committing, that's
> >> >>> > why
> >> >>> > it's
> >> >>> > being overridden. It is the executors that do the mapping and
> saving
> >> >>> > to
> >> >>> > cassandra. The status of success or failure of this operation is
> >> >>> > known only on the executor and that's where I want to commit the
> >> >>> > kafka offsets. If this is not what I should be doing, then what is
> >> >>> > the right way?
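
A minimal sketch of the pattern Cody suggests above: keep per-record failures
as data on the executors, bring the failure count back to the driver, and
commit only when it is zero. `stream1` and the keyspace/table names follow the
code quoted in this thread; `parse` stands in for whatever per-record
transformation precedes the save, and the imports assume
spark-streaming-kafka-0-10 plus the DataStax connector:

    import scala.util.{Success, Try}
    import com.datastax.spark.connector._
    import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

    stream1.foreachRDD { rdd =>
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      // Run the per-record work on the executors, keeping failures as data
      // instead of letting them disappear inside the map.
      val attempts = rdd.map(record => Try(parse(record.value))).cache()

      // Persist only the records that were transformed successfully.
      attempts.collect { case Success(row) => row }
        .saveToCassandra("keyspace1", "table1")

      // Bring the failure count back to the driver so it can decide whether
      // committing this batch's offsets is safe.
      val failed = attempts.filter(_.isFailure).count()
      if (failed == 0) {
        stream1.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
      } else {
        println(s"$failed record(s) failed; offsets for this batch were not committed")
      }
      attempts.unpersist()
    }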

Re: kafka setting, enable.auto.commit to false is being overridden and I lose data. Please help!

2017-08-07 Thread shyla deshpande
Thanks Cody again.

No. I am mapping the Kafka ConsumerRecord so it can be saved in the
Cassandra table, and saveToCassandra is an action, so my data does get
saved into Cassandra. It is working as expected 99% of the time, except that
when there is an exception I do not want the offsets to be committed.

By filtering for unsuccessful attempts, do you mean filtering the bad
records...






On Mon, Aug 7, 2017 at 9:59 AM, Cody Koeninger  wrote:

> If literally all you are doing is rdd.map I wouldn't expect
> saveToCassandra to happen at all, since map is not an action.
>
> Filtering for unsuccessful attempts and collecting those back to the
> driver would be one way for the driver to know whether it was safe to
> commit.
>
> On Mon, Aug 7, 2017 at 12:31 AM, shyla deshpande
>  wrote:
> > rdd.map { record => ()}.saveToCassandra("keyspace1", "table1")  --> is
> > running on executor
> >
> > stream1.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges) --> is
> > running on driver.
> >
> > Is this the reason why kafka offsets are committed even when an
> exception is
> > raised? If so is there a way to commit the offsets only when there are no
> > exceptions?
> >
> >
> >
> > On Sun, Aug 6, 2017 at 10:23 PM, shyla deshpande <
> deshpandesh...@gmail.com>
> > wrote:
> >>
> >> Thanks again Cody,
> >>
> >> My understanding is all the code inside foreachRDD is running on the
> >> driver except for
> >> rdd.map { record => ()}.saveToCassandra("keyspace1", "table1").
> >>
> >> When the exception is raised, I was thinking I won't be committing the
> >> offsets, but the offsets are committed all the time independent of
> whether
> >> an exception was raised or not.
> >>
> >> It will be helpful if you can explain this behavior.
> >>
> >>
> >> On Sun, Aug 6, 2017 at 5:19 PM, Cody Koeninger 
> wrote:
> >>>
> >>> I mean that the kafka consumers running on the executors should not be
> >>> automatically committing, because the fact that a message was read by
> >>> the consumer has no bearing on whether it was actually successfully
> >>> processed after reading.
> >>>
> >>> It sounds to me like you're confused about where code is running.
> >>> foreachRDD runs on the driver, not the executor.
> >>>
> >>>
> >>> http://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd
> >>>
> >>> On Sun, Aug 6, 2017 at 12:55 PM, shyla deshpande
> >>>  wrote:
> >>> > Thanks Cody for your response.
> >>> >
> >>> > All I want to do is, commit the offsets only if I am successfully
> able
> >>> > to
> >>> > write to cassandra database.
> >>> >
> >>> > The line //save the rdd to Cassandra database is
> >>> > rdd.map { record => ()}.saveToCassandra("kayspace1", "table1")
> >>> >
> >>> > What do you mean by Executors shouldn't be auto-committing, that's
> why
> >>> > it's
> >>> > being overridden. It is the executors that do the mapping and saving
> to
> >>> > cassandra. The status of success or failure of this operation is
> known
> >>> > only
> >>> > on the executor and thats where I want to commit the kafka offsets.
> If
> >>> > this
> >>> > is not what I sould be doing, then  what is the right way?
> >>> >
> >>> > On Sun, Aug 6, 2017 at 9:21 AM, Cody Koeninger 
> >>> > wrote:
> >>> >>
> >>> >> If your complaint is about offsets being committed that you didn't
> >>> >> expect... auto commit being false on executors shouldn't have
> anything
> >>> >> to do with that.  Executors shouldn't be auto-committing, that's why
> >>> >> it's being overridden.
> >>> >>
> >>> >> What you've said and the code you posted isn't really enough to
> >>> >> explain what your issue is, e.g.
> >>> >>
> >>> >> is this line
> >>> >> // save the rdd to Cassandra database
> >>> >> a blocking call
> >>> >>
> >> >> are you sure that the rdd foreach isn't being retried and succeeding
> >> >> the second time around, etc

Re: kafka setting, enable.auto.commit to false is being overridden and I lose data. Please help!

2017-08-06 Thread shyla deshpande
rdd.map { record => ()}.saveToCassandra("keyspace1", "table1")  --> is
running on executor

stream1.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges) -->
is running on driver.

Is this the reason why kafka offsets are committed even when an
exception is raised? If so is there a way to commit the offsets only
when there are no exceptions?



On Sun, Aug 6, 2017 at 10:23 PM, shyla deshpande 
wrote:

> Thanks again Cody,
>
> My understanding is all the code inside foreachRDD is running on the
> driver except for
> rdd.map { record => ()}.saveToCassandra("keyspace1", "table1").
>
> When the exception is raised, I was thinking I won't be committing the
> offsets, but the offsets are committed all the time independent of whether
> an exception was raised or not.
>
> It will be helpful if you can explain this behavior.
>
>
> On Sun, Aug 6, 2017 at 5:19 PM, Cody Koeninger  wrote:
>
>> I mean that the kafka consumers running on the executors should not be
>> automatically committing, because the fact that a message was read by
>> the consumer has no bearing on whether it was actually successfully
>> processed after reading.
>>
>> It sounds to me like you're confused about where code is running.
>> foreachRDD runs on the driver, not the executor.
>>
>> http://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd
>>
>> On Sun, Aug 6, 2017 at 12:55 PM, shyla deshpande
>>  wrote:
>> > Thanks Cody for your response.
>> >
>> > All I want to do is, commit the offsets only if I am successfully able
>> to
>> > write to cassandra database.
>> >
>> > The line //save the rdd to Cassandra database is
>> > rdd.map { record => ()}.saveToCassandra("kayspace1", "table1")
>> >
>> > What do you mean by Executors shouldn't be auto-committing, that's why
>> it's
>> > being overridden. It is the executors that do the mapping and saving to
>> > cassandra. The status of success or failure of this operation is known
>> only
>> > on the executor and thats where I want to commit the kafka offsets. If
>> this
>> > is not what I sould be doing, then  what is the right way?
>> >
>> > On Sun, Aug 6, 2017 at 9:21 AM, Cody Koeninger 
>> wrote:
>> >>
>> >> If your complaint is about offsets being committed that you didn't
>> >> expect... auto commit being false on executors shouldn't have anything
>> >> to do with that.  Executors shouldn't be auto-committing, that's why
>> >> it's being overridden.
>> >>
>> >> What you've said and the code you posted isn't really enough to
>> >> explain what your issue is, e.g.
>> >>
>> >> is this line
>> >> // save the rdd to Cassandra database
>> >> a blocking call
>> >>
>> >> are you sure that the rdd foreach isn't being retried and succeeding
>> >> the second time around, etc
>> >>
>> >> On Sat, Aug 5, 2017 at 5:10 PM, shyla deshpande
>> >>  wrote:
>> >> > Hello All,
>> >> > I am using spark 2.0.2 and spark-streaming-kafka-0-10_2.11 .
>> >> >
>> >> > I am setting enable.auto.commit to false, and manually want to commit
>> >> > the
>> >> > offsets after my output operation is successful. So when a exception
>> is
>> >> > raised during during the processing I do not want the offsets to be
>> >> > committed. But looks like the offsets are automatically committed
>> even
>> >> > when
>> >> > the exception is raised and thereby I am losing data.
>> >> > In my logs I see,  WARN  overriding enable.auto.commit to false for
>> >> > executor.  But I don't want it to override. Please help.
>> >> >
>> >> > My code looks like..
>> >> >
>> >> > val kafkaParams = Map[String, Object](
>> >> >   "bootstrap.servers" -> brokers,
>> >> >   "key.deserializer" -> classOf[StringDeserializer],
>> >> >   "value.deserializer" -> classOf[StringDeserializer],
>> >> >   "group.id" -> "Group1",
>> >> >   "auto.offset.reset" -> offsetresetparameter,
>> >> >   "enable.auto.commit" -> (false: java.lang.Boolean)
>> >> > )
>> >> >
>> >> > val myTopics = Array("topic1")
>> >> > val stream1 = KafkaUtils.createDirectStream[String, String](
>> >> >   ssc,
>> >> >   PreferConsistent,
>> >> >   Subscribe[String, String](myTopics, kafkaParams)
>> >> > )
>> >> >
>> >> > stream1.foreachRDD { (rdd, time) =>
>> >> > val offsetRanges =
>> >> > rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>> >> > try {
>> >> > //save the rdd to Cassandra database
>> >> >
>> >> >
>> >> > stream1.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
>> >> > } catch {
>> >> >   case ex: Exception => {
>> >> > println(ex.toString + "!! Bad Data, Unable to persist
>> >> > into
>> >> > table !" + errorOffsetRangesToString(offsetRanges))
>> >> >   }
>> >> > }
>> >> > }
>> >> >
>> >> > ssc.start()
>> >> > ssc.awaitTermination()
>> >
>> >
>>
>
>


Re: kafka setting, enable.auto.commit to false is being overridden and I lose data. Please help!

2017-08-06 Thread shyla deshpande
Thanks again Cody,

My understanding is all the code inside foreachRDD is running on the driver
except for
rdd.map { record => ()}.saveToCassandra("keyspace1", "table1").

When the exception is raised, I was thinking I won't be committing the
offsets, but the offsets are committed all the time independent of whether
an exception was raised or not.

It will be helpful if you can explain this behavior.


On Sun, Aug 6, 2017 at 5:19 PM, Cody Koeninger  wrote:

> I mean that the kafka consumers running on the executors should not be
> automatically committing, because the fact that a message was read by
> the consumer has no bearing on whether it was actually successfully
> processed after reading.
>
> It sounds to me like you're confused about where code is running.
> foreachRDD runs on the driver, not the executor.
>
> http://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd
>
> On Sun, Aug 6, 2017 at 12:55 PM, shyla deshpande
>  wrote:
> > Thanks Cody for your response.
> >
> > All I want to do is, commit the offsets only if I am successfully able to
> > write to cassandra database.
> >
> > The line //save the rdd to Cassandra database is
> > rdd.map { record => ()}.saveToCassandra("kayspace1", "table1")
> >
> > What do you mean by Executors shouldn't be auto-committing, that's why
> it's
> > being overridden. It is the executors that do the mapping and saving to
> > cassandra. The status of success or failure of this operation is known
> only
> > on the executor and thats where I want to commit the kafka offsets. If
> this
> > is not what I sould be doing, then  what is the right way?
> >
> > On Sun, Aug 6, 2017 at 9:21 AM, Cody Koeninger 
> wrote:
> >>
> >> If your complaint is about offsets being committed that you didn't
> >> expect... auto commit being false on executors shouldn't have anything
> >> to do with that.  Executors shouldn't be auto-committing, that's why
> >> it's being overridden.
> >>
> >> What you've said and the code you posted isn't really enough to
> >> explain what your issue is, e.g.
> >>
> >> is this line
> >> // save the rdd to Cassandra database
> >> a blocking call
> >>
> >> are you sure that the rdd foreach isn't being retried and succeeding
> >> the second time around, etc
> >>
> >> On Sat, Aug 5, 2017 at 5:10 PM, shyla deshpande
> >>  wrote:
> >> > Hello All,
> >> > I am using spark 2.0.2 and spark-streaming-kafka-0-10_2.11 .
> >> >
> >> > I am setting enable.auto.commit to false, and manually want to commit
> >> > the
> >> > offsets after my output operation is successful. So when a exception
> is
> >> > raised during during the processing I do not want the offsets to be
> >> > committed. But looks like the offsets are automatically committed even
> >> > when
> >> > the exception is raised and thereby I am losing data.
> >> > In my logs I see,  WARN  overriding enable.auto.commit to false for
> >> > executor.  But I don't want it to override. Please help.
> >> >
> >> > My code looks like..
> >> >
> >> > val kafkaParams = Map[String, Object](
> >> >   "bootstrap.servers" -> brokers,
> >> >   "key.deserializer" -> classOf[StringDeserializer],
> >> >   "value.deserializer" -> classOf[StringDeserializer],
> >> >   "group.id" -> "Group1",
> >> >   "auto.offset.reset" -> offsetresetparameter,
> >> >   "enable.auto.commit" -> (false: java.lang.Boolean)
> >> > )
> >> >
> >> > val myTopics = Array("topic1")
> >> > val stream1 = KafkaUtils.createDirectStream[String, String](
> >> >   ssc,
> >> >   PreferConsistent,
> >> >   Subscribe[String, String](myTopics, kafkaParams)
> >> > )
> >> >
> >> > stream1.foreachRDD { (rdd, time) =>
> >> > val offsetRanges =
> >> > rdd.asInstanceOf[HasOffsetRanges].offsetRanges
> >> > try {
> >> > //save the rdd to Cassandra database
> >> >
> >> >
> >> > stream1.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
> >> > } catch {
> >> >   case ex: Exception => {
> >> > println(ex.toString + "!! Bad Data, Unable to persist
> >> > into
> >> > table !" + errorOffsetRangesToString(offsetRanges))
> >> >   }
> >> > }
> >> > }
> >> >
> >> > ssc.start()
> >> > ssc.awaitTermination()
> >
> >
>


Re: kafka setting, enable.auto.commit to false is being overridden and I lose data. Please help!

2017-08-06 Thread shyla deshpande
Thanks Cody for your response.

All I want to do is, commit the offsets only if I am successfully able to
write to cassandra database.

The line //save the rdd to Cassandra database is
rdd.map { record => ()}.saveToCassandra("kayspace1", "table1")

What do you mean by "Executors shouldn't be auto-committing, that's why it's
being overridden"? It is the executors that do the mapping and saving to
Cassandra. The status of success or failure of this operation is known only
on the executor, and that's where I want to commit the Kafka offsets. If this
is not what I should be doing, then what is the right way?

On Sun, Aug 6, 2017 at 9:21 AM, Cody Koeninger  wrote:

> If your complaint is about offsets being committed that you didn't
> expect... auto commit being false on executors shouldn't have anything
> to do with that.  Executors shouldn't be auto-committing, that's why
> it's being overridden.
>
> What you've said and the code you posted isn't really enough to
> explain what your issue is, e.g.
>
> is this line
> // save the rdd to Cassandra database
> a blocking call
>
> are you sure that the rdd foreach isn't being retried and succeeding
> the second time around, etc
>
> On Sat, Aug 5, 2017 at 5:10 PM, shyla deshpande
>  wrote:
> > Hello All,
> > I am using spark 2.0.2 and spark-streaming-kafka-0-10_2.11 .
> >
> > I am setting enable.auto.commit to false, and manually want to commit the
> > offsets after my output operation is successful. So when a exception is
> > raised during during the processing I do not want the offsets to be
> > committed. But looks like the offsets are automatically committed even
> when
> > the exception is raised and thereby I am losing data.
> > In my logs I see,  WARN  overriding enable.auto.commit to false for
> > executor.  But I don't want it to override. Please help.
> >
> > My code looks like..
> >
> > val kafkaParams = Map[String, Object](
> >   "bootstrap.servers" -> brokers,
> >   "key.deserializer" -> classOf[StringDeserializer],
> >   "value.deserializer" -> classOf[StringDeserializer],
> >   "group.id" -> "Group1",
> >   "auto.offset.reset" -> offsetresetparameter,
> >   "enable.auto.commit" -> (false: java.lang.Boolean)
> > )
> >
> > val myTopics = Array("topic1")
> > val stream1 = KafkaUtils.createDirectStream[String, String](
> >   ssc,
> >   PreferConsistent,
> >   Subscribe[String, String](myTopics, kafkaParams)
> > )
> >
> > stream1.foreachRDD { (rdd, time) =>
> > val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
> > try {
> > //save the rdd to Cassandra database
> >
> >   stream1.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
> > } catch {
> >   case ex: Exception => {
> > println(ex.toString + "!! Bad Data, Unable to persist into table !" + errorOffsetRangesToString(offsetRanges))
> >   }
> > }
> > }
> >
> > ssc.start()
> > ssc.awaitTermination()
>


kafka setting, enable.auto.commit to false is being overridden and I lose data. Please help!

2017-08-05 Thread shyla deshpande
Hello All,
I am using spark 2.0.2 and spark-streaming-kafka-0-10_2.11 .

I am setting enable.auto.commit to false, and I want to manually commit
the offsets after my output operation is successful. So when an
exception is raised during the processing, I do not want the
offsets to be committed. But it looks like the offsets are automatically
committed even when the exception is raised, and thereby I am losing
data.
In my logs I see:  WARN  overriding enable.auto.commit to false for
executor.  But I don't want it to be overridden. Please help.

My code looks like..

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> brokers,
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "Group1",
  "auto.offset.reset" -> offsetresetparameter,
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val myTopics = Array("topic1")
val stream1 = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](myTopics, kafkaParams)
)

stream1.foreachRDD { (rdd, time) =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  try {
    // save the rdd to Cassandra database

    stream1.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
  } catch {
    case ex: Exception => {
      println(ex.toString + "!! Bad Data, Unable to persist into table !" +
        errorOffsetRangesToString(offsetRanges))
    }
  }
}

ssc.start()
ssc.awaitTermination()


Spark streaming application is failing after running for few hours

2017-07-10 Thread shyla deshpande
My Spark streaming application is failing after running for a few hours.
After it fails, when I check the storage tab, I see that MapWithStateRDD is
less than 100%.

Is this the reason why it is failing?

What does MapWithStateRDD 90% cached mean? Does this mean I lost 10%, or is
the 10% spilled to disk and just slower?

I am attaching a screenshot of part of what I see in my storage tab.
Please help.

I am using Spark 2.0.2 .


Re: Spark streaming giving me a bunch of WARNINGS, please help me understand them

2017-07-10 Thread shyla deshpande
WARN  Use an existing SparkContext, some configuration may not take effect.
I wanted to restart the Spark streaming app, so I stopped the running one
and issued a new spark-submit. Why and how will it use an existing
SparkContext?
=> you are using checkpoint to restore the sparkcontext.
=> No, I am not using checkpoint for recovery. I need the checkpoint
because I am doing stateful streaming.

WARN  Spark is not running in local mode, therefore the checkpoint
directory must not be on the local filesystem. Directory
'file:/efs/checkpoint' appears to be on the local filesystem.
=> the CP path should be HDFS and so on. If you want to use a local path, the
cluster mode should be local.
=> I am using the AWS EFS mount for checkpoint because I am running in
standalone mode.

WARN  overriding enable.auto.commit to false for executor
=>stop the executor to commit the offset auto
=>No, I don't want the autocommit. I do the commit later after my output
operation using the commitAsync API

WARN  overriding auto.offset.reset to none for executor
=>it set the index where the executor read msg
=> I set this to none, because I want it to continue from where it left off

WARN  overriding executor group.id to spark-executor-mygroupid
=> set the groupid of consumer. If you do not set, it will set a default
and give a warning.
=> I am setting the groupid, mygroupid and it is adding spark-executor- as
a prefix...


On Mon, Jul 10, 2017 at 12:39 AM, 萝卜丝炒饭 <1427357...@qq.com> wrote:

> It seems you are usibg kafka 0.10.
> See my comments below.
>
> ---Original---
> *From:* "shyla deshpande"
> *Date:* 2017/7/10 08:17:10
> *To:* "user";
> *Subject:* Spark streaming giving me a bunch of WARNINGS, please help
> me understand them
>
> WARN  Use an existing SparkContext, some configuration may not take effect.
>  I wanted to restart the spark streaming app, so stopped the
> running and issued a new spark submit. Why and how it will use a existing
>  SparkContext?
> => you are using checkpoint to restore the sparkcontext.
> WARN  Spark is not running in local mode, therefore the checkpoint
> directory must not be on the local filesystem. Directory
> 'file:/efs/checkpoint' appears to be on the local filesystem.
> => the CP path should be HDFS and so on.
> If you want to use a local path, the cluster mode should be local.
>
>
> WARN  overriding enable.auto.commit to false for executor
> =>stop the executor to commit the offset auto.
>
> WARN  overriding auto.offset.reset to none for executor
> =>it set the index where the executor read msg
> WARN  overriding executor group.id to spark-executor-mygroupid
> => set the groupid of consumer. If you do not set, it will set a default
> and give a warning.
>
> WARN  overriding receive.buffer.bytes to 65536 see KAFKA-3135
> WARN  overriding enable.auto.commit to false for executor
> WARN  overriding auto.offset.reset to none for executor
>


Spark streaming giving me a bunch of WARNINGS, please help me understand them

2017-07-09 Thread shyla deshpande
WARN  Use an existing SparkContext, some configuration may not take effect.
I wanted to restart the Spark streaming app, so I stopped the running one
and issued a new spark-submit. Why and how will it use an existing
SparkContext?

WARN  Spark is not running in local mode, therefore the checkpoint
directory must not be on the local filesystem. Directory
'file:/efs/checkpoint' appears to be on the local filesystem.

WARN  overriding enable.auto.commit to false for executor
WARN  overriding auto.offset.reset to none for executor
WARN  overriding executor group.id to spark-executor-mygroupid
WARN  overriding receive.buffer.bytes to 65536 see KAFKA-3135
WARN  overriding enable.auto.commit to false for executor
WARN  overriding auto.offset.reset to none for executor


Spark sql with Zeppelin, Task not serializable error when I try to cache the spark sql table

2017-05-31 Thread shyla deshpande
Hello all,

I am using Zeppelin 0.7.1 with Spark 2.1.0

I am getting an org.apache.spark.SparkException: Task not serializable error
when I try to cache the Spark SQL table. I am using a UDF on a column of the
table and want to cache the resulting table. I can execute the paragraph
successfully when there is no caching.

Please help! Thanks
---Following is my code
UDF :
def fn1(res: String): Int = {
  100
}
spark.udf.register("fn1", fn1(_: String): Int)

spark
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "k", "table" -> "t"))
  .load
  .createOrReplaceTempView("t1")

val df1 = spark.sql("SELECT col1, col2, fn1(col3) from t1")

df1.createOrReplaceTempView("t2")

spark.catalog.cacheTable("t2")
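
One hedged thing to try, not a confirmed fix for the error above: registering
the UDF as a function value instead of a method reference, so the closure does
not capture the enclosing (possibly non-serializable) Zeppelin-generated
object:

    // Function value instead of a def: only the literal is captured in the closure.
    val fn1 = (res: String) => 100
    spark.udf.register("fn1", fn1)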


Re: Structured streaming and writing output to Cassandra

2017-04-08 Thread shyla deshpande
Thanks Jules. It was helpful.

On Fri, Apr 7, 2017 at 8:32 PM, Jules Damji  wrote:

> This blog shows how to write a custom sink: https://databricks.com/blog/2017/04/04/real-time-end-to-end-integration-with-apache-kafka-in-apache-sparks-structured-streaming.html
>
> Cheers
> Jules
>
> Sent from my iPhone
> Pardon the dumb thumb typos :)
>
> On Apr 7, 2017, at 11:23 AM, shyla deshpande 
> wrote:
>
> Is anyone using structured streaming and writing the results to Cassandra
> database in a production environment?
>
> I do not think I have enough expertise to write a custom sink that can be
> used in production environment. Please help!
>
>
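
A minimal structural sketch of a custom sink via ForeachWriter, in the spirit
of the blog Jules links above. The Cassandra-specific calls are left as
comments because the exact connector API used there is not shown in this
thread:

    import org.apache.spark.sql.{ForeachWriter, Row}

    class CassandraRowWriter extends ForeachWriter[Row] {
      override def open(partitionId: Long, version: Long): Boolean = {
        // acquire or look up a session for this partition; return true to process it
        true
      }
      override def process(row: Row): Unit = {
        // issue the INSERT for this row
      }
      override def close(errorOrNull: Throwable): Unit = {
        // release the session; log errorOrNull if it is non-null
      }
    }

    // Usage with a streaming DataFrame df (assumed):
    // df.writeStream.foreach(new CassandraRowWriter).start()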


Structured streaming and writing output to Cassandra

2017-04-07 Thread shyla deshpande
Is anyone using structured streaming and writing the results to Cassandra
database in a production environment?

I do not think I have enough expertise to write a custom sink that can be
used in a production environment. Please help!


Re: What is the best way to run a scheduled spark batch job on AWS EC2 ?

2017-04-07 Thread shyla deshpande
Thanks everyone for sharing your ideas. Very useful. I appreciate.

On Fri, Apr 7, 2017 at 10:40 AM, Sam Elamin  wrote:

> Definitely agree with gourav there. I wouldn't want jenkins to run my work
> flow. Seems to me that you would only be using jenkins for its scheduling
> capabilities
>
> Yes you can run tests but you wouldn't want it to run your orchestration
> of jobs
>
> What happens if Jenkins goes down for any particular reason? How do you
> have the conversation with your stakeholders that your pipeline is not
> working and they don't have data because the build server is going through
> an upgrade?
>
> However, to be fair, I understand what you are saying, Steve: if someone is
> in a place where you only have access to Jenkins and has to go through hoops
> to set up or get access to new instances, then engineers will do what they
> always do: find ways to game the system to get their work done
>
>
>
>
> On Fri, 7 Apr 2017 at 16:17, Gourav Sengupta 
> wrote:
>
>> Hi Steve,
>>
>> Why would you ever do that? You are suggesting the use of a CI tool as a
>> workflow and orchestration engine.
>>
>> Regards,
>> Gourav Sengupta
>>
>> On Fri, Apr 7, 2017 at 4:07 PM, Steve Loughran 
>> wrote:
>>
>>> If you have Jenkins set up for some CI workflow, that can do scheduled
>>> builds and tests. Works well if you can do some build test before even
>>> submitting it to a remote cluster
>>>
>>> On 7 Apr 2017, at 10:15, Sam Elamin  wrote:
>>>
>>> Hi Shyla
>>>
>>> You have multiple options really some of which have been already listed
>>> but let me try and clarify
>>>
>>> Assuming you have a spark application in a jar you have a variety of
>>> options
>>>
>>> You have to have an existing spark cluster that is either running on EMR
>>> or somewhere else.
>>>
>>> *Super simple / hacky*
>>> Cron job on EC2 that calls a simple shell script that does a spark-submit
>>> to a Spark cluster, OR create/add a step to an EMR cluster
>>>
>>> *More Elegant*
>>> Airflow/Luigi/AWS Data Pipeline (which is just cron in the UI) that will
>>> do the above step but adds scheduling, potential backfilling, and error
>>> handling (retries, alerts, etc.)
>>>
>>> AWS are coming out with glue <https://aws.amazon.com/glue/> soon that
>>> does some Spark jobs but I do not think its available worldwide just yet
>>>
>>> Hope I cleared things up
>>>
>>> Regards
>>> Sam
>>>
>>>
>>> On Fri, Apr 7, 2017 at 6:05 AM, Gourav Sengupta <
>>> gourav.sengu...@gmail.com> wrote:
>>>
>>>> Hi Shyla,
>>>>
>>>> why would you want to schedule a spark job in EC2 instead of EMR?
>>>>
>>>> Regards,
>>>> Gourav
>>>>
>>>> On Fri, Apr 7, 2017 at 1:04 AM, shyla deshpande <
>>>> deshpandesh...@gmail.com> wrote:
>>>>
>>>>> I want to run a spark batch job maybe hourly on AWS EC2 .  What is the
>>>>> easiest way to do this. Thanks
>>>>>
>>>>
>>>>
>>>
>>>
>>


What is the best way to run a scheduled spark batch job on AWS EC2 ?

2017-04-06 Thread shyla deshpande
I want to run a spark batch job maybe hourly on AWS EC2 .  What is the
easiest way to do this. Thanks




Re: dataframe filter, unable to bind variable

2017-03-30 Thread shyla deshpande
Works. Thanks Hosur.

On Thu, Mar 30, 2017 at 8:37 PM, hosur narahari  wrote:

> Try lit(fromDate) and lit(toDate). You've to import 
> org.apache.spark.sql.functions.lit
> to use it
>
> On 31 Mar 2017 7:45 a.m., "shyla deshpande" 
> wrote:
>
> The following works
>
> df.filter($"createdate".between("2017-03-20", "2017-03-22"))
>
>
> I would like to pass variables fromdate and todate to the filter
>
>  instead of constants. Unable to get the syntax right. Please help.
>
>
> Thanks
>
>
>
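
A minimal sketch of the suggestion above; the column name, the date values,
and the existing DataFrame `df` are placeholders:

    import org.apache.spark.sql.functions.{col, lit}

    val fromdate = "2017-03-20"
    val todate   = "2017-03-22"

    // lit() turns the Scala variables into Column literals the filter can use.
    val filtered = df.filter(col("createdate").between(lit(fromdate), lit(todate)))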


dataframe filter, unable to bind variable

2017-03-30 Thread shyla deshpande
The following works

df.filter($"createdate".between("2017-03-20", "2017-03-22"))


I would like to pass variables fromdate and todate to the filter instead of
constants. Unable to get the syntax right. Please help.


Thanks


Re: Will the setting for spark.default.parallelism be used for spark.sql.shuffle.output.partitions?

2017-03-30 Thread shyla deshpande
The spark version I am using is spark 2.1.

On Thu, Mar 30, 2017 at 9:58 AM, shyla deshpande 
wrote:

> Thanks
>


Will the setting for spark.default.parallelism be used for spark.sql.shuffle.output.partitions?

2017-03-30 Thread shyla deshpande
Thanks


Re: Spark SQL, dataframe join questions.

2017-03-29 Thread shyla deshpande
On Tue, Mar 28, 2017 at 2:57 PM, shyla deshpande 
wrote:

> Following are my questions. Thank you.
>
> 1. When joining dataframes is it a good idea to repartition on the key column 
> that is used in the join or
> the optimizer is too smart so forget it.
>
> 2. In RDD join, wherever possible we do reduceByKey before the join to avoid 
> a big shuffle of data. Do we need
> to do anything similar with dataframe joins, or the optimizer is too smart so 
> forget it.
>
>


Re: dataframe join questions. Appreciate your input.

2017-03-28 Thread shyla deshpande
On Tue, Mar 28, 2017 at 2:57 PM, shyla deshpande 
wrote:

> Following are my questions. Thank you.
>
> 1. When joining dataframes is it a good idea to repartition on the key column 
> that is used in the join or
> the optimizer is too smart so forget it.
>
> 2. In RDD join, wherever possible we do reduceByKey before the join to avoid 
> a big shuffle of data. Do we need
> to do anything similar with dataframe joins, or the optimizer is too smart so 
> forget it.
>
>


dataframe join questions?

2017-03-28 Thread shyla deshpande
Following are my questions. Thank you.

1. When joining dataframes, is it a good idea to repartition on the key
column that is used in the join, or is the optimizer smart enough that I
can forget about it?

2. In an RDD join, wherever possible we do a reduceByKey before the join to
avoid a big shuffle of data. Do we need to do anything similar with
dataframe joins, or is the optimizer smart enough that I can forget about it?


Re: Converting dataframe to dataset question

2017-03-23 Thread shyla deshpande
Ryan, you are right. That was the issue. It works now. Thanks.

On Thu, Mar 23, 2017 at 8:26 PM, Ryan  wrote:

> you should import either spark.implicits or sqlContext.implicits, not
> both. Otherwise the compiler will be confused about two implicit
> transformations
>
> following code works for me, spark version 2.1.0
>
> object Test {
>   def main(args: Array[String]) {
> val spark = SparkSession
>   .builder
>   .master("local")
>   .appName(getClass.getSimpleName)
>   .getOrCreate()
> import spark.implicits._
> val df = Seq(TeamUser("t1", "u1", "r1")).toDF()
> df.printSchema()
> spark.close()
>   }
> }
>
> case class TeamUser(teamId: String, userId: String, role: String)
>
>
> On Fri, Mar 24, 2017 at 5:23 AM, shyla deshpande  > wrote:
>
>> I made the code even more simpler still getting the error
>>
>> error: value toDF is not a member of Seq[com.whil.batch.Teamuser]
>> [ERROR] val df = Seq(Teamuser("t1","u1","r1")).toDF()
>>
>> object Test {
>>   def main(args: Array[String]) {
>> val spark = SparkSession
>>   .builder
>>   .appName(getClass.getSimpleName)
>>   .getOrCreate()
>> import spark.implicits._
>> val sqlContext = spark.sqlContext
>> import sqlContext.implicits._
>> val df = Seq(Teamuser("t1","u1","r1")).toDF()
>> df.printSchema()
>>   }
>> }
>> case class Teamuser(teamid:String, userid:String, role:String)
>>
>>
>>
>>
>> On Thu, Mar 23, 2017 at 1:07 PM, Yong Zhang  wrote:
>>
>>> Not sure I understand this problem, why I cannot reproduce it?
>>>
>>>
>>> scala> spark.version
>>> res22: String = 2.1.0
>>>
>>> scala> case class Teamuser(teamid: String, userid: String, role: String)
>>> defined class Teamuser
>>>
>>> scala> val df = Seq(Teamuser("t1", "u1", "role1")).toDF
>>> df: org.apache.spark.sql.DataFrame = [teamid: string, userid: string ... 1 
>>> more field]
>>>
>>> scala> df.show
>>> +--+--+-+
>>> |teamid|userid| role|
>>> +--+--+-+
>>> |t1|u1|role1|
>>> +--+--+-+
>>>
>>> scala> df.createOrReplaceTempView("teamuser")
>>>
>>> scala> val newDF = spark.sql("select teamid, userid, role from teamuser")
>>> newDF: org.apache.spark.sql.DataFrame = [teamid: string, userid: string ... 
>>> 1 more field]
>>>
>>> scala> val userDS: Dataset[Teamuser] = newDF.as[Teamuser]
>>> userDS: org.apache.spark.sql.Dataset[Teamuser] = [teamid: string, userid: 
>>> string ... 1 more field]
>>>
>>> scala> userDS.show
>>> +--+--+-+
>>> |teamid|userid| role|
>>> +--+--+-+
>>> |t1|u1|role1|
>>> +--+--+-+
>>>
>>>
>>> scala> userDS.printSchema
>>> root
>>>  |-- teamid: string (nullable = true)
>>>  |-- userid: string (nullable = true)
>>>  |-- role: string (nullable = true)
>>>
>>>
>>> Am I missing anything?
>>>
>>>
>>> Yong
>>>
>>>
>>> --
>>> *From:* shyla deshpande 
>>> *Sent:* Thursday, March 23, 2017 3:49 PM
>>> *To:* user
>>> *Subject:* Re: Converting dataframe to dataset question
>>>
>>> I realized, my case class was inside the object. It should be defined
>>> outside the scope of the object. Thanks
>>>
>>> On Wed, Mar 22, 2017 at 6:07 PM, shyla deshpande <
>>> deshpandesh...@gmail.com> wrote:
>>>
>>>> Why userDS is Dataset[Any], instead of Dataset[Teamuser]?  Appreciate your 
>>>> help. Thanks
>>>>
>>>> val spark = SparkSession
>>>>   .builder
>>>>   .config("spark.cassandra.connection.host", cassandrahost)
>>>>   .appName(getClass.getSimpleName)
>>>>   .getOrCreate()
>>>>
>>>> import spark.implicits._
>>>> val sqlContext = spark.sqlContext
>>>> import sqlContext.implicits._
>>>>
>>>> case class Teamuser(teamid:String, userid:String, role:String)
>>>> spark
>>>>   .read
>>>>   .format("org.apache.spark.sql.cassandra")
>>>>   .options(Map("keyspace" -> "test", "table" -> "teamuser"))
>>>>   .load
>>>>   .createOrReplaceTempView("teamuser")
>>>>
>>>> val userDF = spark.sql("SELECT teamid, userid, role FROM teamuser")
>>>>
>>>> userDF.show()
>>>>
>>>> val userDS:Dataset[Teamuser] = userDF.as[Teamuser]
>>>>
>>>>
>>>
>>
>


Re: Spark dataframe, UserDefinedAggregateFunction(UDAF) help!!

2017-03-23 Thread shyla deshpande
Thanks a million Yong. Great help!!! It solved my problem.

On Thu, Mar 23, 2017 at 6:00 PM, Yong Zhang  wrote:

> Change:
>
> val arrayinput = input.getAs[Array[String]](0)
>
> to:
>
> val arrayinput = input.getAs[*Seq*[String]](0)
>
>
> Yong
>
>
> ------
> *From:* shyla deshpande 
> *Sent:* Thursday, March 23, 2017 8:18 PM
> *To:* user
> *Subject:* Spark dataframe, UserDefinedAggregateFunction(UDAF) help!!
>
> This is my input data. The UDAF needs to aggregate the goals for a team
> and return a map that  gives the count for every goal in the team.
> I am getting the following error
>
> java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef
> cannot be cast to [Ljava.lang.String;
> at com.whil.common.GoalAggregator.update(GoalAggregator.scala:27)
>
> +--+--+
> |teamid|goals |
> +--+--+
> |t1|[Goal1, Goal2]|
> |t1|[Goal1, Goal3]|
> |t2|[Goal1, Goal2]|
> |t3|[Goal2, Goal3]|
> +--+--+
>
> root
>  |-- teamid: string (nullable = true)
>  |-- goals: array (nullable = true)
>  ||-- element: string (containsNull = true)
>
> /Calling the UDAF//
>
> object TestUDAF {
>   def main(args: Array[String]): Unit = {
>
> val spark = SparkSession
>   .builder
>   .getOrCreate()
>
> val sc: SparkContext = spark.sparkContext
> val sqlContext = spark.sqlContext
>
> import sqlContext.implicits._
>
> val data = Seq(
>   ("t1", Seq("Goal1", "Goal2")),
>   ("t1", Seq("Goal1", "Goal3")),
>   ("t2", Seq("Goal1", "Goal2")),
>   ("t3", Seq("Goal2", "Goal3"))).toDF("teamid","goals")
>
> data.show(truncate = false)
> data.printSchema()
>
> import spark.implicits._
>
> val sumgoals = new GoalAggregator
> val result = data.groupBy("teamid").agg(sumgoals(col("goals")))
>
> result.show(truncate = false)
>
>   }
> }
>
> ///UDAF/
>
> import org.apache.spark.sql.expressions.MutableAggregationBuffer
> import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
> import org.apache.spark.sql.Row
> import org.apache.spark.sql.types._
>
> class GoalAggregator extends UserDefinedAggregateFunction{
>
>   override def inputSchema: org.apache.spark.sql.types.StructType =
>   StructType(StructField("value", ArrayType(StringType)) :: Nil)
>
>   override def bufferSchema: StructType = StructType(
>   StructField("combined", MapType(StringType,IntegerType)) :: Nil
>   )
>
>   override def dataType: DataType = MapType(StringType,IntegerType)
>
>   override def deterministic: Boolean = true
>
>   override def initialize(buffer: MutableAggregationBuffer): Unit = {
> buffer.update(0, Map[String, Integer]())
>   }
>
>   override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
> val mapbuf = buffer.getAs[Map[String, Int]](0)
> val arrayinput = input.getAs[Array[String]](0)
> val result = mapbuf ++ arrayinput.map(goal => {
>   val cnt  = mapbuf.get(goal).getOrElse(0) + 1
>   goal -> cnt
> })
> buffer.update(0, result)
>   }
>
>   override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = 
> {
> val map1 = buffer1.getAs[Map[String, Int]](0)
> val map2 = buffer2.getAs[Map[String, Int]](0)
> val result = map1 ++ map2.map { case (k,v) =>
>   val cnt = map1.get(k).getOrElse(0) + 1
>   k -> cnt
> }
> buffer1.update(0, result)
>   }
>
>   override def evaluate(buffer: Row): Any = {
> buffer.getAs[Map[String, Int]](0)
>   }
> }
>
>
>
>
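
For reference, a consolidated sketch of the two buffer methods of the
GoalAggregator above after applying the Seq cast fix from this thread. The
second change, adding the incoming count v in merge instead of 1, is my
reading of the intended semantics and is not stated in the thread; both
overrides belong inside the GoalAggregator class:

    override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
      val mapbuf = buffer.getAs[Map[String, Int]](0)
      // Spark hands array columns back as a Seq (WrappedArray), not Array[String].
      val arrayinput = input.getAs[Seq[String]](0)
      val result = mapbuf ++ arrayinput.map { goal =>
        goal -> (mapbuf.getOrElse(goal, 0) + 1)
      }
      buffer.update(0, result)
    }

    override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
      val map1 = buffer1.getAs[Map[String, Int]](0)
      val map2 = buffer2.getAs[Map[String, Int]](0)
      // Combining two partial maps: add the count carried by map2 (v), not 1.
      val result = map1 ++ map2.map { case (k, v) =>
        k -> (map1.getOrElse(k, 0) + v)
      }
      buffer1.update(0, result)
    }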


Spark dataframe, UserDefinedAggregateFunction(UDAF) help!!

2017-03-23 Thread shyla deshpande
This is my input data. The UDAF needs to aggregate the goals for a team and
return a map that gives the count for every goal in the team.
I am getting the following error:

java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef
cannot be cast to [Ljava.lang.String;
at com.whil.common.GoalAggregator.update(GoalAggregator.scala:27)

+------+--------------+
|teamid|goals         |
+------+--------------+
|t1    |[Goal1, Goal2]|
|t1    |[Goal1, Goal3]|
|t2    |[Goal1, Goal2]|
|t3    |[Goal2, Goal3]|
+------+--------------+

root
 |-- teamid: string (nullable = true)
 |-- goals: array (nullable = true)
 |    |-- element: string (containsNull = true)

// Calling the UDAF

object TestUDAF {
  def main(args: Array[String]): Unit = {

val spark = SparkSession
  .builder
  .getOrCreate()

val sc: SparkContext = spark.sparkContext
val sqlContext = spark.sqlContext

import sqlContext.implicits._

val data = Seq(
  ("t1", Seq("Goal1", "Goal2")),
  ("t1", Seq("Goal1", "Goal3")),
  ("t2", Seq("Goal1", "Goal2")),
  ("t3", Seq("Goal2", "Goal3"))).toDF("teamid","goals")

data.show(truncate = false)
data.printSchema()

import spark.implicits._

val sumgoals = new GoalAggregator
val result = data.groupBy("teamid").agg(sumgoals(col("goals")))

result.show(truncate = false)

  }
}

// UDAF

import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

class GoalAggregator extends UserDefinedAggregateFunction{

  override def inputSchema: org.apache.spark.sql.types.StructType =
  StructType(StructField("value", ArrayType(StringType)) :: Nil)

  override def bufferSchema: StructType = StructType(
  StructField("combined", MapType(StringType,IntegerType)) :: Nil
  )

  override def dataType: DataType = MapType(StringType,IntegerType)

  override def deterministic: Boolean = true

  override def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer.update(0, Map[String, Integer]())
  }

  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
val mapbuf = buffer.getAs[Map[String, Int]](0)
val arrayinput = input.getAs[Array[String]](0)
val result = mapbuf ++ arrayinput.map(goal => {
  val cnt  = mapbuf.get(goal).getOrElse(0) + 1
  goal -> cnt
})
buffer.update(0, result)
  }

  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
val map1 = buffer1.getAs[Map[String, Int]](0)
val map2 = buffer2.getAs[Map[String, Int]](0)
val result = map1 ++ map2.map { case (k,v) =>
  val cnt = map1.get(k).getOrElse(0) + 1
  k -> cnt
}
buffer1.update(0, result)
  }

  override def evaluate(buffer: Row): Any = {
buffer.getAs[Map[String, Int]](0)
  }
}


Re: Converting dataframe to dataset question

2017-03-23 Thread shyla deshpande
I made the code even simpler and am still getting the error

error: value toDF is not a member of Seq[com.whil.batch.Teamuser]
[ERROR] val df = Seq(Teamuser("t1","u1","r1")).toDF()

object Test {
  def main(args: Array[String]) {
val spark = SparkSession
  .builder
  .appName(getClass.getSimpleName)
  .getOrCreate()
import spark.implicits._
val sqlContext = spark.sqlContext
import sqlContext.implicits._
val df = Seq(Teamuser("t1","u1","r1")).toDF()
df.printSchema()
  }
}
case class Teamuser(teamid:String, userid:String, role:String)




On Thu, Mar 23, 2017 at 1:07 PM, Yong Zhang  wrote:

> Not sure I understand this problem, why I cannot reproduce it?
>
>
> scala> spark.version
> res22: String = 2.1.0
>
> scala> case class Teamuser(teamid: String, userid: String, role: String)
> defined class Teamuser
>
> scala> val df = Seq(Teamuser("t1", "u1", "role1")).toDF
> df: org.apache.spark.sql.DataFrame = [teamid: string, userid: string ... 1 
> more field]
>
> scala> df.show
> +--+--+-+
> |teamid|userid| role|
> +--+--+-+
> |t1|u1|role1|
> +--+--+-+
>
> scala> df.createOrReplaceTempView("teamuser")
>
> scala> val newDF = spark.sql("select teamid, userid, role from teamuser")
> newDF: org.apache.spark.sql.DataFrame = [teamid: string, userid: string ... 1 
> more field]
>
> scala> val userDS: Dataset[Teamuser] = newDF.as[Teamuser]
> userDS: org.apache.spark.sql.Dataset[Teamuser] = [teamid: string, userid: 
> string ... 1 more field]
>
> scala> userDS.show
> +--+--+-+
> |teamid|userid| role|
> +--+--+-+
> |t1|u1|role1|
> +--+--+-+
>
>
> scala> userDS.printSchema
> root
>  |-- teamid: string (nullable = true)
>  |-- userid: string (nullable = true)
>  |-- role: string (nullable = true)
>
>
> Am I missing anything?
>
>
> Yong
>
>
> --
> *From:* shyla deshpande 
> *Sent:* Thursday, March 23, 2017 3:49 PM
> *To:* user
> *Subject:* Re: Converting dataframe to dataset question
>
> I realized, my case class was inside the object. It should be defined
> outside the scope of the object. Thanks
>
> On Wed, Mar 22, 2017 at 6:07 PM, shyla deshpande  > wrote:
>
>> Why userDS is Dataset[Any], instead of Dataset[Teamuser]?  Appreciate your 
>> help. Thanks
>>
>> val spark = SparkSession
>>   .builder
>>   .config("spark.cassandra.connection.host", cassandrahost)
>>   .appName(getClass.getSimpleName)
>>   .getOrCreate()
>>
>> import spark.implicits._
>> val sqlContext = spark.sqlContext
>> import sqlContext.implicits._
>>
>> case class Teamuser(teamid:String, userid:String, role:String)
>> spark
>>   .read
>>   .format("org.apache.spark.sql.cassandra")
>>   .options(Map("keyspace" -> "test", "table" -> "teamuser"))
>>   .load
>>   .createOrReplaceTempView("teamuser")
>>
>> val userDF = spark.sql("SELECT teamid, userid, role FROM teamuser")
>>
>> userDF.show()
>>
>> val userDS:Dataset[Teamuser] = userDF.as[Teamuser]
>>
>>
>
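
For reference, a minimal sketch of the working pattern this thread converges on: the case class lives at the top level (not inside an object or a method) so an Encoder can be derived for it, and spark.implicits._ is imported only once after the SparkSession is created (importing both spark.implicits._ and sqlContext.implicits._ may make the implicit conversion ambiguous, which surfaces as "toDF is not a member"). The package name and master setting below are just illustrative.

package com.example.batch  // hypothetical package

import org.apache.spark.sql.{Dataset, SparkSession}

// defined at the top level, outside any object or method,
// so that the implicit Encoder[Teamuser] can be derived
case class Teamuser(teamid: String, userid: String, role: String)

object Test {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("Test")
      .master("local[*]")   // illustrative only
      .getOrCreate()
    import spark.implicits._

    val df = Seq(Teamuser("t1", "u1", "r1")).toDF()
    df.printSchema()

    val ds: Dataset[Teamuser] = df.as[Teamuser]
    ds.show()

    spark.stop()
  }
}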


Re: Converting dataframe to dataset question

2017-03-23 Thread shyla deshpande
now I get the following error...

error: Unable to find encoder for type stored in a Dataset.  Primitive
types (Int, String, etc) and Product types (case classes) are supported by
importing spark.implicits._  Support for serializing other types will be
added in future releases.
[ERROR] val userDS:Dataset[Teamuser] = userDF.as[Teamuser]

On Thu, Mar 23, 2017 at 12:49 PM, shyla deshpande 
wrote:

> I realized, my case class was inside the object. It should be defined
> outside the scope of the object. Thanks
>
> On Wed, Mar 22, 2017 at 6:07 PM, shyla deshpande  > wrote:
>
>> Why userDS is Dataset[Any], instead of Dataset[Teamuser]?  Appreciate your 
>> help. Thanks
>>
>> val spark = SparkSession
>>   .builder
>>   .config("spark.cassandra.connection.host", cassandrahost)
>>   .appName(getClass.getSimpleName)
>>   .getOrCreate()
>>
>> import spark.implicits._
>> val sqlContext = spark.sqlContext
>> import sqlContext.implicits._
>>
>> case class Teamuser(teamid:String, userid:String, role:String)
>> spark
>>   .read
>>   .format("org.apache.spark.sql.cassandra")
>>   .options(Map("keyspace" -> "test", "table" -> "teamuser"))
>>   .load
>>   .createOrReplaceTempView("teamuser")
>>
>> val userDF = spark.sql("SELECT teamid, userid, role FROM teamuser")
>>
>> userDF.show()
>>
>> val userDS:Dataset[Teamuser] = userDF.as[Teamuser]
>>
>>
>


Re: Converting dataframe to dataset question

2017-03-23 Thread shyla deshpande
I realized, my case class was inside the object. It should be defined
outside the scope of the object. Thanks

On Wed, Mar 22, 2017 at 6:07 PM, shyla deshpande 
wrote:

> Why userDS is Dataset[Any], instead of Dataset[Teamuser]?  Appreciate your 
> help. Thanks
>
> val spark = SparkSession
>   .builder
>   .config("spark.cassandra.connection.host", cassandrahost)
>   .appName(getClass.getSimpleName)
>   .getOrCreate()
>
> import spark.implicits._
> val sqlContext = spark.sqlContext
> import sqlContext.implicits._
>
> case class Teamuser(teamid:String, userid:String, role:String)
> spark
>   .read
>   .format("org.apache.spark.sql.cassandra")
>   .options(Map("keyspace" -> "test", "table" -> "teamuser"))
>   .load
>   .createOrReplaceTempView("teamuser")
>
> val userDF = spark.sql("SELECT teamid, userid, role FROM teamuser")
>
> userDF.show()
>
> val userDS:Dataset[Teamuser] = userDF.as[Teamuser]
>
>


Converting dataframe to dataset question

2017-03-22 Thread shyla deshpande
Why userDS is Dataset[Any], instead of Dataset[Teamuser]?  Appreciate
your help. Thanks

val spark = SparkSession
  .builder
  .config("spark.cassandra.connection.host", cassandrahost)
  .appName(getClass.getSimpleName)
  .getOrCreate()

import spark.implicits._
val sqlContext = spark.sqlContext
import sqlContext.implicits._

case class Teamuser(teamid:String, userid:String, role:String)
spark
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "test", "table" -> "teamuser"))
  .load
  .createOrReplaceTempView("teamuser")

val userDF = spark.sql("SELECT teamid, userid, role FROM teamuser")

userDF.show()

val userDS:Dataset[Teamuser] = userDF.as[Teamuser]


Spark Streaming questions, just 2

2017-03-21 Thread shyla deshpande
Hello all,
I have a couple of spark streaming questions. Thanks.

1.  In the case of stateful operations, the data is, by default,
persisted in memory.
Does "in memory" here mean MEMORY_ONLY? When is it removed from memory?

2.   I do not see any documentation for spark.cleaner.ttl. Is this no
longer necessary? (SPARK-7689)


Re: spark streaming with kafka source, how many concurrent jobs?

2017-03-21 Thread shyla deshpande
Thanks TD.

On Tue, Mar 14, 2017 at 4:37 PM, Tathagata Das  wrote:

> This setting allows multiple spark jobs generated through multiple
> foreachRDD to run concurrently, even if they are across batches. So output
> op2 from batch X, can run concurrently with op1 of batch X+1
> This is not safe because it breaks the checkpointing logic in subtle ways.
> Note that this was never documented in the spark online docs.
>
> On Tue, Mar 14, 2017 at 2:29 PM, shyla deshpande  > wrote:
>
>> Thanks TD for the response. Can you please provide more explanation. I am
>>  having multiple streams in the spark streaming application (Spark 2.0.2
>> using DStreams).  I know many people using this setting. So your
>> explanation will help a lot of people.
>>
>> Thanks
>>
>> On Fri, Mar 10, 2017 at 6:24 PM, Tathagata Das 
>> wrote:
>>
>>> That config I not safe. Please do not use it.
>>>
>>> On Mar 10, 2017 10:03 AM, "shyla deshpande" 
>>> wrote:
>>>
>>>> I have a spark streaming application which processes 3 kafka streams
>>>> and has 5 output operations.
>>>>
>>>> Not sure what should be the setting for spark.streaming.concurrentJobs.
>>>>
>>>> 1. If the concurrentJobs setting is 4 does that mean 2 output
>>>> operations will be run sequentially?
>>>>
>>>> 2. If I had 6 cores what would be a ideal setting for concurrentJobs in
>>>> this situation?
>>>>
>>>> I appreciate your input. Thanks
>>>>
>>>
>>
>


Re: spark streaming with kafka source, how many concurrent jobs?

2017-03-14 Thread shyla deshpande
Thanks TD for the response. Can you please provide more explanation? I
have multiple streams in the Spark streaming application (Spark 2.0.2
using DStreams). I know many people use this setting, so your
explanation will help a lot of people.

Thanks

On Fri, Mar 10, 2017 at 6:24 PM, Tathagata Das  wrote:

> That config I not safe. Please do not use it.
>
> On Mar 10, 2017 10:03 AM, "shyla deshpande" 
> wrote:
>
>> I have a spark streaming application which processes 3 kafka streams and
>> has 5 output operations.
>>
>> Not sure what should be the setting for spark.streaming.concurrentJobs.
>>
>> 1. If the concurrentJobs setting is 4 does that mean 2 output operations
>> will be run sequentially?
>>
>> 2. If I had 6 cores what would be a ideal setting for concurrentJobs in
>> this situation?
>>
>> I appreciate your input. Thanks
>>
>


spark streaming with kafka source, how many concurrent jobs?

2017-03-10 Thread shyla deshpande
I have a spark streaming application which processes 3 kafka streams and
has 5 output operations.

Not sure what should be the setting for spark.streaming.concurrentJobs.

1. If the concurrentJobs setting is 4 does that mean 2 output operations
will be run sequentially?

2. If I had 6 cores what would be a ideal setting for concurrentJobs in
this situation?

I appreciate your input. Thanks


Re: error in kafka producer

2017-02-28 Thread shyla deshpande
*My code:*

  producer.send(message, new Callback {
override def onCompletion(metadata: RecordMetadata, exception:
Exception): Unit = {
  log.info(s"producer send callback metadata: $metadata")
  log.info(s"producer send callback exception: $exception")
}
  })

*Error Message :*

producer send callback metadata: null
producer send callback exception:
org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s)
for positionevent-1 due to 30024 ms has passed since batch creation
plus linger time



On Tue, Feb 28, 2017 at 3:15 PM, Marco Mistroni  wrote:

> This exception coming from a Spark program?
> could you share few lines of code ?
>
> kr
>  marco
>
> On Tue, Feb 28, 2017 at 10:23 PM, shyla deshpande <
> deshpandesh...@gmail.com> wrote:
>
>> producer send callback exception: 
>> org.apache.kafka.common.errors.TimeoutException:
>> Expiring 1 record(s) for positionevent-6 due to 30003 ms has passed since
>> batch creation plus linger time
>>
>
>


error in kafka producer

2017-02-28 Thread shyla deshpande
producer send callback exception:
org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for
positionevent-6 due to 30003 ms has passed since batch creation plus linger
time
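
This "Expiring 1 record(s) ... ms has passed since batch creation plus linger time" error generally means batches sat in the producer's buffer longer than the request timeout allows, often because the brokers were unreachable or overloaded. Below is a hedged sketch of the producer settings commonly adjusted while debugging it; the broker address and the values are placeholders, not recommendations.

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}
import org.apache.kafka.common.serialization.StringSerializer

val props = new Properties()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092")   // placeholder
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "60000")  // give slow brokers more time
props.put(ProducerConfig.LINGER_MS_CONFIG, "5")               // how long a batch may wait to fill
props.put(ProducerConfig.RETRIES_CONFIG, "3")                 // retry transient send failures

val producer = new KafkaProducer[String, String](props)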


Re: In Spark streaming, will saved kafka offsets become invalid if I change the number of partitions in a kafka topic?

2017-02-26 Thread shyla deshpande
Please help!

On Sat, Feb 25, 2017 at 11:10 PM, shyla deshpande 
wrote:

> I am commiting offsets to Kafka after my output has been stored, using the
> commitAsync API.
>
> My question is if I increase/decrease the number of kafka partitions, will
> the saved offsets will become invalid.
>
> Thanks
>


In Spark streaming, will saved kafka offsets become invalid if I change the number of partitions in a kafka topic?

2017-02-25 Thread shyla deshpande
I am committing offsets to Kafka after my output has been stored, using the
commitAsync API.

My question is: if I increase or decrease the number of Kafka partitions, will
the saved offsets become invalid?

Thanks
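
For reference, a sketch of the commitAsync pattern referred to above (spark-streaming-kafka-0-10): commit the batch's offset ranges back to Kafka only after the output has been stored. ssc is the surrounding app's StreamingContext, and the broker address, group id and topic name are placeholders.

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "broker1:9092",   // placeholder
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "my-group",                // placeholder
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val stream = KafkaUtils.createDirectStream[String, String](
  ssc, PreferConsistent, Subscribe[String, String](Seq("my-topic"), kafkaParams))

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // store the output first (an idempotent or transactional sink is assumed) ...
  rdd.foreachPartition { _ => /* write records to the sink here */ }
  // ... then commit the same offset ranges back to Kafka, asynchronously
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}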


Spark streaming on AWS EC2 error . Please help

2017-02-20 Thread shyla deshpande
I am running Spark streaming on AWS EC2 in standalone mode.

When I do a spark-submit, I get the following message. I am subscribing to
3 kafka topics and it is reading and processing just 2 topics. Works fine
in local mode.
Appreciate your help. Thanks

Exception in thread "pool-26-thread-132" java.lang.NullPointerException
at
org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:225)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)


Re: Spark standalone cluster on EC2 error .. Checkpoint..

2017-02-17 Thread shyla deshpande
Thanks TD and Marco for the feedback.

The directory referenced by SPARK_LOCAL_DIRS did not exist. After creating
that directory, it worked.

This was the first time I was trying to run Spark on a standalone cluster, so
I missed it.

Thanks

On Fri, Feb 17, 2017 at 12:35 PM, Tathagata Das  wrote:

> Seems like an issue with the HDFS you are using for checkpointing. Its not
> able to write data properly.
>
> On Thu, Feb 16, 2017 at 2:40 PM, shyla deshpande  > wrote:
>
>> Caused by: org.apache.hadoop.ipc.RemoteException(java.io.IOException):
>> File 
>> /checkpoint/11ea8862-122c-4614-bc7e-f761bb57ba23/rdd-347/.part-1-attempt-3
>> could only be replicated to 0 nodes instead of minReplication (=1).  There
>> are 0 datanode(s) running and no node(s) are excluded in this operation.
>>
>> This is the error I get when I run my spark streaming app on 2 node EC2
>> cluster, with 1 master and 1 worker.
>>
>> Works fine in local mode. Please help.
>>
>> Thanks
>>
>
>


Spark standalone cluster on EC2 error .. Checkpoint..

2017-02-16 Thread shyla deshpande
Caused by: org.apache.hadoop.ipc.RemoteException(java.io.IOException): File
/checkpoint/11ea8862-122c-4614-bc7e-f761bb57ba23/rdd-347/.part-1-attempt-3
could only be replicated to 0 nodes instead of minReplication (=1).  There
are 0 datanode(s) running and no node(s) are excluded in this operation.

This is the error I get when I run my spark streaming app on 2 node EC2
cluster, with 1 master and 1 worker.

Works fine in local mode. Please help.

Thanks


Re: Spark streaming question - SPARK-13758 Need to use an external RDD inside DStream processing...Please help

2017-02-07 Thread shyla deshpande
and my cached RDD is not small. If it was maybe I could materialize and
broadcast.

Thanks

On Tue, Feb 7, 2017 at 10:28 AM, shyla deshpande 
wrote:

> I have a situation similar to the following and I get SPARK-13758 
> <https://issues.apache.org/jira/browse/SPARK-13758>.
>
>
> I understand why I get this error, but I want to know what should be the 
> approach in dealing with these situations.
>
>
> Thanks
>
>
> > var cached = ssc.sparkContext.parallelize(Seq("apple, banana"))
> > val words = ssc.socketTextStream(ip, port).flatMap(_.split(" "))
> > words.foreachRDD((rdd: RDD[String]) => {
> >   val res = rdd.map(word => (word, word.length)).collect()
> >   println("words: " + res.mkString(", "))
> >   cached = cached.union(rdd)
> >   cached.checkpoint()
> >   println("cached words: " + cached.collect.mkString(", "))
> > })
>
>


Spark streaming question - SPARK-13758 Need to use an external RDD inside DStream processing...Please help

2017-02-07 Thread shyla deshpande
I have a situation similar to the following and I get SPARK-13758
.


I understand why I get this error, but I want to know what should be
the approach in dealing with these situations.


Thanks


> var cached = ssc.sparkContext.parallelize(Seq("apple, banana"))
> val words = ssc.socketTextStream(ip, port).flatMap(_.split(" "))
> words.foreachRDD((rdd: RDD[String]) => {
>   val res = rdd.map(word => (word, word.length)).collect()
>   println("words: " + res.mkString(", "))
>   cached = cached.union(rdd)
>   cached.checkpoint()
>   println("cached words: " + cached.collect.mkString(", "))
> })
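
One way around this situation, sketched below under the assumption that the goal is simply to accumulate the words seen so far: keep that accumulation in streaming state via updateStateByKey instead of unioning a driver-side RDD inside foreachRDD, so nothing outside the DStream lineage has to survive checkpoint recovery. ssc, ip and port are as in the snippet above; the seed list is hypothetical.

import org.apache.spark.streaming.dstream.DStream

// seed words provided once; a real app might load these from storage
val seed = Set("apple", "banana")

// requires ssc.checkpoint(dir) to be set, as updateStateByKey is stateful
val words: DStream[String] = ssc.socketTextStream(ip, port).flatMap(_.split(" "))

val counts: DStream[(String, Int)] = words
  .map(w => (w, 1))
  .updateStateByKey[Int]((newVals: Seq[Int], total: Option[Int]) =>
    Some(total.getOrElse(0) + newVals.sum))

counts.foreachRDD { rdd =>
  // the accumulated word set now lives in the state, not in a driver-side RDD
  val allWords = rdd.keys.collect().toSet ++ seed
  println("cached words: " + allWords.mkString(", "))
}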


Re: saveToCassandra issue. Please help

2017-02-03 Thread shyla deshpande
Thanks Fernando.  But I need to have only 1 row for a given user, date with
very low latency. So none of your options work for me.



On Fri, Feb 3, 2017 at 10:34 AM, Fernando Avalos  wrote:

> Hi Shyla,
>
> Maybe I am wrong, but I can see two options here.
>
> 1.- Do some grouping before insert to Cassandra.
> 2.- Insert to cassandra all the entries and add some logic to your
> request to get the most recent.
>
> Regards,
>
> 2017-02-03 10:26 GMT-08:00 shyla deshpande :
> > Hi All,
> >
> > I wanted to add more info ..
> > The first column is the user and the third is the period. and my key is
> > (userid, date) For a given user and date combination I want to see only 1
> > row. My problem is that PT0H10M0S is overwritten by PT0H9M30S, even
> though
> > the order of the rows in the RDD is PT0H9M30S and then PT0H10M0S.
> >
> > Appreciate your input. Thanks
> >
> > On Fri, Feb 3, 2017 at 12:45 AM, shyla deshpande <
> deshpandesh...@gmail.com>
> > wrote:
> >>
> >> Hello All,
> >>
> >> This is the content of my RDD which I am saving to Cassandra table.
> >>
> >> But looks like the 2nd row is written first and then the first row
> >> overwrites it. So I end up with bad output.
> >>
> >> (494bce4f393b474980290b8d1b6ebef9, 2017-02-01, PT0H9M30S, WEDNESDAY)
> >> (494bce4f393b474980290b8d1b6ebef9, 2017-02-01, PT0H10M0S, WEDNESDAY)
> >>
> >> Is there a way to force the order of the rows written to Cassandra.
> >>
> >> Please help.
> >>
> >> Thanks
> >
> >
>


Re: saveToCassandra issue. Please help

2017-02-03 Thread shyla deshpande
Hi All,

I wanted to add more info ..
The first column is the user and the third is the period. and my key is
(userid, date) For a given user and date combination I want to see only 1
row. My problem is that PT0H10M0S is overwritten by PT0H9M30S, even though
the order of the rows in the RDD is PT0H9M30S and then PT0H10M0S.

Appreciate your input. Thanks

On Fri, Feb 3, 2017 at 12:45 AM, shyla deshpande 
wrote:

> Hello All,
>
> This is the content of my RDD which I am saving to Cassandra table.
>
> But looks like the 2nd row is written first and then the first row overwrites 
> it. So I end up with bad output.
>
> (494bce4f393b474980290b8d1b6ebef9, 2017-02-01, PT0H9M30S, WEDNESDAY)
> (494bce4f393b474980290b8d1b6ebef9, 2017-02-01, PT0H10M0S, WEDNESDAY)
>
> Is there a way to force the order of the rows written to Cassandra.
>
> Please help.
>
> Thanks
>
>


saveToCassandra issue. Please help

2017-02-03 Thread shyla deshpande
Hello All,

This is the content of my RDD which I am saving to Cassandra table.

But looks like the 2nd row is written first and then the first row
overwrites it. So I end up with bad output.

(494bce4f393b474980290b8d1b6ebef9, 2017-02-01, PT0H9M30S, WEDNESDAY)
(494bce4f393b474980290b8d1b6ebef9, 2017-02-01, PT0H10M0S, WEDNESDAY)

Is there a way to force the order of the rows written to Cassandra.

Please help.

Thanks
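
The write order of rows inside an RDD is not something the connector promises, so a common approach (sketched below with hypothetical keyspace, table and column names, and assuming rdd is an RDD of (userid, date, isoDuration, weekday) tuples) is to reduce to a single row per primary key before calling saveToCassandra, keeping the row with the larger duration by parsing the ISO-8601 strings instead of comparing them lexically.

import java.time.Duration
import com.datastax.spark.connector._

val deduped = rdd
  .map { case (userid, date, dur, day) => ((userid, date), (dur, day)) }
  .reduceByKey { (a, b) =>
    // compare the ISO-8601 durations numerically, not as strings
    if (Duration.parse(a._1).compareTo(Duration.parse(b._1)) >= 0) a else b
  }
  .map { case ((userid, date), (dur, day)) => (userid, date, dur, day) }

deduped.saveToCassandra("my_keyspace", "user_daily_usage",
  SomeColumns("userid", "date", "duration", "weekday"))   // hypothetical names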


Re: mapWithState question

2017-01-30 Thread shyla deshpande
Thanks. Appreciate your input.

On Mon, Jan 30, 2017 at 1:41 PM, Tathagata Das 
wrote:

> If you care about the semantics of those writes to Kafka, then you should
> be aware of two things.
> 1. There are no transactional writes to Kafka.
> 2. So, when tasks get reexecuted due to any failure, your mapping function
> will also be reexecuted, and the writes to kafka can happen multiple times.
> So you may only get at least once guarantee about those Kafka writes
>
>
> On Mon, Jan 30, 2017 at 10:02 AM, shyla deshpande <
> deshpandesh...@gmail.com> wrote:
>
>> Hello,
>>
>> TD, your suggestion works great. Thanks
>>
>> I have 1 more question, I need to write to kafka from within the
>> mapWithState function. Just wanted to check if this a bad pattern in any
>> way.
>>
>> Thank you.
>>
>>
>>
>>
>>
>> On Sat, Jan 28, 2017 at 9:14 AM, shyla deshpande <
>> deshpandesh...@gmail.com> wrote:
>>
>>> Thats a great idea. I will try that. Thanks.
>>>
>>> On Sat, Jan 28, 2017 at 2:35 AM, Tathagata Das <
>>> tathagata.das1...@gmail.com> wrote:
>>>
>>>> 1 state object for each user.
>>>> union both streams into a single DStream, and apply mapWithState on it
>>>> to update the user state.
>>>>
>>>> On Sat, Jan 28, 2017 at 12:30 AM, shyla deshpande <
>>>> deshpandesh...@gmail.com> wrote:
>>>>
>>>>> Can multiple DStreams manipulate a state? I have a stream that gives
>>>>> me total minutes the user spent on a course material. I have another
>>>>> stream that gives me chapters completed and lessons completed by the 
>>>>> user. I
>>>>> want to keep track for each user total_minutes, chapters_completed and
>>>>> lessons_completed. I am not sure if I should have 1 state or 2
>>>>> states. Can I lookup the state for a given key just like a map
>>>>> outside the mapfunction?
>>>>>
>>>>> Appreciate your input. Thanks
>>>>>
>>>>
>>>>
>>>
>>
>


Re: mapWithState question

2017-01-30 Thread shyla deshpande
Hello,

TD, your suggestion works great. Thanks

I have one more question: I need to write to Kafka from within the
mapWithState function. Just wanted to check if this is a bad pattern in any
way.

Thank you.





On Sat, Jan 28, 2017 at 9:14 AM, shyla deshpande 
wrote:

> Thats a great idea. I will try that. Thanks.
>
> On Sat, Jan 28, 2017 at 2:35 AM, Tathagata Das <
> tathagata.das1...@gmail.com> wrote:
>
>> 1 state object for each user.
>> union both streams into a single DStream, and apply mapWithState on it to
>> update the user state.
>>
>> On Sat, Jan 28, 2017 at 12:30 AM, shyla deshpande <
>> deshpandesh...@gmail.com> wrote:
>>
>>> Can multiple DStreams manipulate a state? I have a stream that gives me
>>> total minutes the user spent on a course material. I have another
>>> stream that gives me chapters completed and lessons completed by the user. I
>>> want to keep track for each user total_minutes, chapters_completed and
>>> lessons_completed. I am not sure if I should have 1 state or 2 states. Can
>>> I lookup the state for a given key just like a map outside the mapfunction?
>>>
>>> Appreciate your input. Thanks
>>>
>>
>>
>


Re: mapWithState question

2017-01-28 Thread shyla deshpande
Thats a great idea. I will try that. Thanks.

On Sat, Jan 28, 2017 at 2:35 AM, Tathagata Das 
wrote:

> 1 state object for each user.
> union both streams into a single DStream, and apply mapWithState on it to
> update the user state.
>
> On Sat, Jan 28, 2017 at 12:30 AM, shyla deshpande <
> deshpandesh...@gmail.com> wrote:
>
>> Can multiple DStreams manipulate a state? I have a stream that gives me
>> total minutes the user spent on a course material. I have another stream
>> that gives me chapters completed and lessons completed by the user. I
>> want to keep track for each user total_minutes, chapters_completed and
>> lessons_completed. I am not sure if I should have 1 state or 2 states. Can
>> I lookup the state for a given key just like a map outside the mapfunction?
>>
>> Appreciate your input. Thanks
>>
>
>


mapWithState question

2017-01-28 Thread shyla deshpande
Can multiple DStreams manipulate a state? I have a stream that gives me the
total minutes the user spent on course material. I have another stream
that gives me chapters completed and lessons completed by the user. I want
to keep track, for each user, of total_minutes, chapters_completed and
lessons_completed. I am not sure if I should have 1 state or 2 states. Can
I look up the state for a given key, just like a map, outside the map function?

Appreciate your input. Thanks
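
A minimal sketch of the suggestion above: union the two streams into one DStream keyed by user and keep a single state object per user with mapWithState. The event and state case classes are hypothetical stand-ins for the real message types.

import org.apache.spark.streaming.{State, StateSpec}
import org.apache.spark.streaming.dstream.DStream

// hypothetical event and state types
sealed trait Event
case class Minutes(m: Long) extends Event
case object ChapterCompleted extends Event
case object LessonCompleted extends Event

case class UserProgress(totalMinutes: Long, chapters: Int, lessons: Int)

// both input streams are assumed to already be keyed by userid
def track(minutesStream: DStream[(String, Event)],
          progressStream: DStream[(String, Event)]): DStream[(String, UserProgress)] = {

  val events = minutesStream.union(progressStream)   // one DStream, one state per user

  val spec = StateSpec.function {
    (userId: String, event: Option[Event], state: State[UserProgress]) =>
      val current = state.getOption.getOrElse(UserProgress(0L, 0, 0))
      val updated = event match {
        case Some(Minutes(m))       => current.copy(totalMinutes = current.totalMinutes + m)
        case Some(ChapterCompleted) => current.copy(chapters = current.chapters + 1)
        case Some(LessonCompleted)  => current.copy(lessons = current.lessons + 1)
        case None                   => current
      }
      state.update(updated)
      (userId, updated)
  }

  events.mapWithState(spec)
}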


Re: where is mapWithState executed?

2017-01-25 Thread shyla deshpande
After more reading, I know the state is distributed across the cluster. But if
I need to look up a map in the update function, I need to broadcast it.

Just want to make sure I am on the right path.

Appreciate your help. Thanks

On Wed, Jan 25, 2017 at 2:33 PM, shyla deshpande 
wrote:

> Is it executed on the driver or executor.  If I need to lookup a map in
> the updatefunction, I need to broadcast it,  if mapWithState executed runs
> on executor.
>
> Thanks
>


where is mapWithState executed?

2017-01-25 Thread shyla deshpande
Is it executed on the driver or on the executors? If mapWithState runs on the
executors, then I need to broadcast any map that I look up in the update
function.

Thanks
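
A small sketch of that pattern: the broadcast is created once on the driver, and the mapping function, which runs on the executors, reads it through lookup.value. ssc and the map contents are placeholders.

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.streaming.{State, StateSpec}
import org.apache.spark.streaming.dstream.DStream

// broadcast the lookup map once from the driver; contents are hypothetical
val lookup: Broadcast[Map[String, String]] =
  ssc.sparkContext.broadcast(Map("u1" -> "teamA", "u2" -> "teamB"))

val spec = StateSpec.function { (key: String, value: Option[Int], state: State[Int]) =>
  val team = lookup.value.getOrElse(key, "unknown")       // read on the executor
  val total = state.getOption.getOrElse(0) + value.getOrElse(0)
  state.update(total)
  (key, team, total)
}

def totals(events: DStream[(String, Int)]): DStream[(String, String, Int)] =
  events.mapWithState(spec)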


Re: How to make the state in a streaming application idempotent?

2017-01-25 Thread shyla deshpande
Processing the same data more than once can happen only when the app
recovers after a failure or during an upgrade. So how do I apply your 2nd
solution only for 1-2 hours after a restart?

On Wed, Jan 25, 2017 at 12:51 PM, shyla deshpande 
wrote:

> Thanks Burak. I do want accuracy, that is why I want to make it
> idempotent.
> I will try out your 2nd solution.
>
> On Wed, Jan 25, 2017 at 12:27 PM, Burak Yavuz  wrote:
>
>> Yes you may. Depends on if you want exact values or if you're okay with
>> approximations. With Big Data, generally you would be okay with
>> approximations. Try both out, see what scales/works with your dataset.
>> Maybe you may handle the second implementation.
>>
>> On Wed, Jan 25, 2017 at 12:23 PM, shyla deshpande <
>> deshpandesh...@gmail.com> wrote:
>>
>>> Thanks Burak. But with BloomFilter, won't I be getting a false poisitve?
>>>
>>> On Wed, Jan 25, 2017 at 11:28 AM, Burak Yavuz  wrote:
>>>
>>>> I noticed that 1 wouldn't be a problem, because you'll save the
>>>> BloomFilter in the state.
>>>>
>>>> For 2, you would keep a Map of UUID's to the timestamp of when you saw
>>>> them. If the UUID exists in the map, then you wouldn't increase the count.
>>>> If the timestamp of a UUID expires, you would remove it from the map. The
>>>> reason we remove from the map is to keep a bounded amount of space. It'll
>>>> probably take a lot more space than the BloomFilter though depending on
>>>> your data volume.
>>>>
>>>> On Wed, Jan 25, 2017 at 11:24 AM, shyla deshpande <
>>>> deshpandesh...@gmail.com> wrote:
>>>>
>>>>> In the previous email you gave me 2 solutions
>>>>> 1. Bloom filter --> problem in repopulating the bloom filter on
>>>>> restarts
>>>>> 2. keeping the state of the unique ids
>>>>>
>>>>> Please elaborate on 2.
>>>>>
>>>>>
>>>>>
>>>>> On Wed, Jan 25, 2017 at 10:53 AM, Burak Yavuz 
>>>>> wrote:
>>>>>
>>>>>> I don't have any sample code, but on a high level:
>>>>>>
>>>>>> My state would be: (Long, BloomFilter[UUID])
>>>>>> In the update function, my value will be the UUID of the record,
>>>>>> since the word itself is the key.
>>>>>> I'll ask my BloomFilter if I've seen this UUID before. If not
>>>>>> increase count, also add to Filter.
>>>>>>
>>>>>> Does that make sense?
>>>>>>
>>>>>>
>>>>>> On Wed, Jan 25, 2017 at 9:28 AM, shyla deshpande <
>>>>>> deshpandesh...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Burak,
>>>>>>> Thanks for the response. Can you please elaborate on your idea of
>>>>>>> storing the state of the unique ids.
>>>>>>> Do you have any sample code or links I can refer to.
>>>>>>> Thanks
>>>>>>>
>>>>>>> On Wed, Jan 25, 2017 at 9:13 AM, Burak Yavuz 
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Off the top of my head... (Each may have it's own issues)
>>>>>>>>
>>>>>>>> If upstream you add a uniqueId to all your records, then you may
>>>>>>>> use a BloomFilter to approximate if you've seen a row before.
>>>>>>>> The problem I can see with that approach is how to repopulate the
>>>>>>>> bloom filter on restarts.
>>>>>>>>
>>>>>>>> If you are certain that you're not going to reprocess some data
>>>>>>>> after a certain time, i.e. there is no way I'm going to get the same 
>>>>>>>> data
>>>>>>>> in 2 hours, it may only happen in the last 2 hours, then you may also 
>>>>>>>> keep
>>>>>>>> the state of uniqueId's as well, and then age them out after a certain 
>>>>>>>> time.
>>>>>>>>
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Burak
>>>>>>>>
>>>>>>>> On Tue, Jan 24, 2017 at 9:53 PM, shyla deshpande <
>>>>>>>> deshpandesh...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Please share your thoughts.
>>>>>>>>>
>>>>>>>>> On Tue, Jan 24, 2017 at 4:01 PM, shyla deshpande <
>>>>>>>>> deshpandesh...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Tue, Jan 24, 2017 at 9:44 AM, shyla deshpande <
>>>>>>>>>> deshpandesh...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> My streaming application stores lot of aggregations using
>>>>>>>>>>> mapWithState.
>>>>>>>>>>>
>>>>>>>>>>> I want to know what are all the possible ways I can make it
>>>>>>>>>>> idempotent.
>>>>>>>>>>>
>>>>>>>>>>> Please share your views.
>>>>>>>>>>>
>>>>>>>>>>> Thanks
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Jan 23, 2017 at 5:41 PM, shyla deshpande <
>>>>>>>>>>> deshpandesh...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> In a Wordcount application which  stores the count of all the
>>>>>>>>>>>> words input so far using mapWithState.  How do I make sure my 
>>>>>>>>>>>> counts are
>>>>>>>>>>>> not messed up if I happen to read a line more than once?
>>>>>>>>>>>>
>>>>>>>>>>>> Appreciate your response.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>


Re: How to make the state in a streaming application idempotent?

2017-01-25 Thread shyla deshpande
Thanks Burak. I do want accuracy, that is why I want to make it idempotent.
I will try out your 2nd solution.

On Wed, Jan 25, 2017 at 12:27 PM, Burak Yavuz  wrote:

> Yes you may. Depends on if you want exact values or if you're okay with
> approximations. With Big Data, generally you would be okay with
> approximations. Try both out, see what scales/works with your dataset.
> Maybe you may handle the second implementation.
>
> On Wed, Jan 25, 2017 at 12:23 PM, shyla deshpande <
> deshpandesh...@gmail.com> wrote:
>
>> Thanks Burak. But with BloomFilter, won't I be getting a false poisitve?
>>
>> On Wed, Jan 25, 2017 at 11:28 AM, Burak Yavuz  wrote:
>>
>>> I noticed that 1 wouldn't be a problem, because you'll save the
>>> BloomFilter in the state.
>>>
>>> For 2, you would keep a Map of UUID's to the timestamp of when you saw
>>> them. If the UUID exists in the map, then you wouldn't increase the count.
>>> If the timestamp of a UUID expires, you would remove it from the map. The
>>> reason we remove from the map is to keep a bounded amount of space. It'll
>>> probably take a lot more space than the BloomFilter though depending on
>>> your data volume.
>>>
>>> On Wed, Jan 25, 2017 at 11:24 AM, shyla deshpande <
>>> deshpandesh...@gmail.com> wrote:
>>>
>>>> In the previous email you gave me 2 solutions
>>>> 1. Bloom filter --> problem in repopulating the bloom filter on
>>>> restarts
>>>> 2. keeping the state of the unique ids
>>>>
>>>> Please elaborate on 2.
>>>>
>>>>
>>>>
>>>> On Wed, Jan 25, 2017 at 10:53 AM, Burak Yavuz  wrote:
>>>>
>>>>> I don't have any sample code, but on a high level:
>>>>>
>>>>> My state would be: (Long, BloomFilter[UUID])
>>>>> In the update function, my value will be the UUID of the record, since
>>>>> the word itself is the key.
>>>>> I'll ask my BloomFilter if I've seen this UUID before. If not increase
>>>>> count, also add to Filter.
>>>>>
>>>>> Does that make sense?
>>>>>
>>>>>
>>>>> On Wed, Jan 25, 2017 at 9:28 AM, shyla deshpande <
>>>>> deshpandesh...@gmail.com> wrote:
>>>>>
>>>>>> Hi Burak,
>>>>>> Thanks for the response. Can you please elaborate on your idea of
>>>>>> storing the state of the unique ids.
>>>>>> Do you have any sample code or links I can refer to.
>>>>>> Thanks
>>>>>>
>>>>>> On Wed, Jan 25, 2017 at 9:13 AM, Burak Yavuz 
>>>>>> wrote:
>>>>>>
>>>>>>> Off the top of my head... (Each may have it's own issues)
>>>>>>>
>>>>>>> If upstream you add a uniqueId to all your records, then you may use
>>>>>>> a BloomFilter to approximate if you've seen a row before.
>>>>>>> The problem I can see with that approach is how to repopulate the
>>>>>>> bloom filter on restarts.
>>>>>>>
>>>>>>> If you are certain that you're not going to reprocess some data
>>>>>>> after a certain time, i.e. there is no way I'm going to get the same 
>>>>>>> data
>>>>>>> in 2 hours, it may only happen in the last 2 hours, then you may also 
>>>>>>> keep
>>>>>>> the state of uniqueId's as well, and then age them out after a certain 
>>>>>>> time.
>>>>>>>
>>>>>>>
>>>>>>> Best,
>>>>>>> Burak
>>>>>>>
>>>>>>> On Tue, Jan 24, 2017 at 9:53 PM, shyla deshpande <
>>>>>>> deshpandesh...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Please share your thoughts.
>>>>>>>>
>>>>>>>> On Tue, Jan 24, 2017 at 4:01 PM, shyla deshpande <
>>>>>>>> deshpandesh...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Tue, Jan 24, 2017 at 9:44 AM, shyla deshpande <
>>>>>>>>> deshpandesh...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> My streaming application stores lot of aggregations using
>>>>>>>>>> mapWithState.
>>>>>>>>>>
>>>>>>>>>> I want to know what are all the possible ways I can make it
>>>>>>>>>> idempotent.
>>>>>>>>>>
>>>>>>>>>> Please share your views.
>>>>>>>>>>
>>>>>>>>>> Thanks
>>>>>>>>>>
>>>>>>>>>> On Mon, Jan 23, 2017 at 5:41 PM, shyla deshpande <
>>>>>>>>>> deshpandesh...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> In a Wordcount application which  stores the count of all the
>>>>>>>>>>> words input so far using mapWithState.  How do I make sure my 
>>>>>>>>>>> counts are
>>>>>>>>>>> not messed up if I happen to read a line more than once?
>>>>>>>>>>>
>>>>>>>>>>> Appreciate your response.
>>>>>>>>>>>
>>>>>>>>>>> Thanks
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>


Re: How to make the state in a streaming application idempotent?

2017-01-25 Thread shyla deshpande
Thanks Burak. But with a BloomFilter, won't I be getting false positives?

On Wed, Jan 25, 2017 at 11:28 AM, Burak Yavuz  wrote:

> I noticed that 1 wouldn't be a problem, because you'll save the
> BloomFilter in the state.
>
> For 2, you would keep a Map of UUID's to the timestamp of when you saw
> them. If the UUID exists in the map, then you wouldn't increase the count.
> If the timestamp of a UUID expires, you would remove it from the map. The
> reason we remove from the map is to keep a bounded amount of space. It'll
> probably take a lot more space than the BloomFilter though depending on
> your data volume.
>
> On Wed, Jan 25, 2017 at 11:24 AM, shyla deshpande <
> deshpandesh...@gmail.com> wrote:
>
>> In the previous email you gave me 2 solutions
>> 1. Bloom filter --> problem in repopulating the bloom filter on restarts
>> 2. keeping the state of the unique ids
>>
>> Please elaborate on 2.
>>
>>
>>
>> On Wed, Jan 25, 2017 at 10:53 AM, Burak Yavuz  wrote:
>>
>>> I don't have any sample code, but on a high level:
>>>
>>> My state would be: (Long, BloomFilter[UUID])
>>> In the update function, my value will be the UUID of the record, since
>>> the word itself is the key.
>>> I'll ask my BloomFilter if I've seen this UUID before. If not increase
>>> count, also add to Filter.
>>>
>>> Does that make sense?
>>>
>>>
>>> On Wed, Jan 25, 2017 at 9:28 AM, shyla deshpande <
>>> deshpandesh...@gmail.com> wrote:
>>>
>>>> Hi Burak,
>>>> Thanks for the response. Can you please elaborate on your idea of
>>>> storing the state of the unique ids.
>>>> Do you have any sample code or links I can refer to.
>>>> Thanks
>>>>
>>>> On Wed, Jan 25, 2017 at 9:13 AM, Burak Yavuz  wrote:
>>>>
>>>>> Off the top of my head... (Each may have it's own issues)
>>>>>
>>>>> If upstream you add a uniqueId to all your records, then you may use a
>>>>> BloomFilter to approximate if you've seen a row before.
>>>>> The problem I can see with that approach is how to repopulate the
>>>>> bloom filter on restarts.
>>>>>
>>>>> If you are certain that you're not going to reprocess some data after
>>>>> a certain time, i.e. there is no way I'm going to get the same data in 2
>>>>> hours, it may only happen in the last 2 hours, then you may also keep the
>>>>> state of uniqueId's as well, and then age them out after a certain time.
>>>>>
>>>>>
>>>>> Best,
>>>>> Burak
>>>>>
>>>>> On Tue, Jan 24, 2017 at 9:53 PM, shyla deshpande <
>>>>> deshpandesh...@gmail.com> wrote:
>>>>>
>>>>>> Please share your thoughts.
>>>>>>
>>>>>> On Tue, Jan 24, 2017 at 4:01 PM, shyla deshpande <
>>>>>> deshpandesh...@gmail.com> wrote:
>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Jan 24, 2017 at 9:44 AM, shyla deshpande <
>>>>>>> deshpandesh...@gmail.com> wrote:
>>>>>>>
>>>>>>>> My streaming application stores lot of aggregations using
>>>>>>>> mapWithState.
>>>>>>>>
>>>>>>>> I want to know what are all the possible ways I can make it
>>>>>>>> idempotent.
>>>>>>>>
>>>>>>>> Please share your views.
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>>
>>>>>>>> On Mon, Jan 23, 2017 at 5:41 PM, shyla deshpande <
>>>>>>>> deshpandesh...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> In a Wordcount application which  stores the count of all the
>>>>>>>>> words input so far using mapWithState.  How do I make sure my counts 
>>>>>>>>> are
>>>>>>>>> not messed up if I happen to read a line more than once?
>>>>>>>>>
>>>>>>>>> Appreciate your response.
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>


Re: How to make the state in a streaming application idempotent?

2017-01-25 Thread shyla deshpande
In the previous email you gave me 2 solutions
1. Bloom filter --> problem in repopulating the bloom filter on restarts
2. keeping the state of the unique ids

Please elaborate on 2.



On Wed, Jan 25, 2017 at 10:53 AM, Burak Yavuz  wrote:

> I don't have any sample code, but on a high level:
>
> My state would be: (Long, BloomFilter[UUID])
> In the update function, my value will be the UUID of the record, since the
> word itself is the key.
> I'll ask my BloomFilter if I've seen this UUID before. If not increase
> count, also add to Filter.
>
> Does that make sense?
>
>
> On Wed, Jan 25, 2017 at 9:28 AM, shyla deshpande  > wrote:
>
>> Hi Burak,
>> Thanks for the response. Can you please elaborate on your idea of storing
>> the state of the unique ids.
>> Do you have any sample code or links I can refer to.
>> Thanks
>>
>> On Wed, Jan 25, 2017 at 9:13 AM, Burak Yavuz  wrote:
>>
>>> Off the top of my head... (Each may have it's own issues)
>>>
>>> If upstream you add a uniqueId to all your records, then you may use a
>>> BloomFilter to approximate if you've seen a row before.
>>> The problem I can see with that approach is how to repopulate the bloom
>>> filter on restarts.
>>>
>>> If you are certain that you're not going to reprocess some data after a
>>> certain time, i.e. there is no way I'm going to get the same data in 2
>>> hours, it may only happen in the last 2 hours, then you may also keep the
>>> state of uniqueId's as well, and then age them out after a certain time.
>>>
>>>
>>> Best,
>>> Burak
>>>
>>> On Tue, Jan 24, 2017 at 9:53 PM, shyla deshpande <
>>> deshpandesh...@gmail.com> wrote:
>>>
>>>> Please share your thoughts.
>>>>
>>>> On Tue, Jan 24, 2017 at 4:01 PM, shyla deshpande <
>>>> deshpandesh...@gmail.com> wrote:
>>>>
>>>>>
>>>>>
>>>>> On Tue, Jan 24, 2017 at 9:44 AM, shyla deshpande <
>>>>> deshpandesh...@gmail.com> wrote:
>>>>>
>>>>>> My streaming application stores lot of aggregations using
>>>>>> mapWithState.
>>>>>>
>>>>>> I want to know what are all the possible ways I can make it
>>>>>> idempotent.
>>>>>>
>>>>>> Please share your views.
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>> On Mon, Jan 23, 2017 at 5:41 PM, shyla deshpande <
>>>>>> deshpandesh...@gmail.com> wrote:
>>>>>>
>>>>>>> In a Wordcount application which  stores the count of all the words
>>>>>>> input so far using mapWithState.  How do I make sure my counts are not
>>>>>>> messed up if I happen to read a line more than once?
>>>>>>>
>>>>>>> Appreciate your response.
>>>>>>>
>>>>>>> Thanks
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>


Re: How to make the state in a streaming application idempotent?

2017-01-25 Thread shyla deshpande
Hi Burak,
Thanks for the response. Can you please elaborate on your idea of storing
the state of the unique ids.
Do you have any sample code or links I can refer to.
Thanks

On Wed, Jan 25, 2017 at 9:13 AM, Burak Yavuz  wrote:

> Off the top of my head... (Each may have it's own issues)
>
> If upstream you add a uniqueId to all your records, then you may use a
> BloomFilter to approximate if you've seen a row before.
> The problem I can see with that approach is how to repopulate the bloom
> filter on restarts.
>
> If you are certain that you're not going to reprocess some data after a
> certain time, i.e. there is no way I'm going to get the same data in 2
> hours, it may only happen in the last 2 hours, then you may also keep the
> state of uniqueId's as well, and then age them out after a certain time.
>
>
> Best,
> Burak
>
> On Tue, Jan 24, 2017 at 9:53 PM, shyla deshpande  > wrote:
>
>> Please share your thoughts.
>>
>> On Tue, Jan 24, 2017 at 4:01 PM, shyla deshpande <
>> deshpandesh...@gmail.com> wrote:
>>
>>>
>>>
>>> On Tue, Jan 24, 2017 at 9:44 AM, shyla deshpande <
>>> deshpandesh...@gmail.com> wrote:
>>>
>>>> My streaming application stores lot of aggregations using mapWithState.
>>>>
>>>> I want to know what are all the possible ways I can make it idempotent.
>>>>
>>>> Please share your views.
>>>>
>>>> Thanks
>>>>
>>>> On Mon, Jan 23, 2017 at 5:41 PM, shyla deshpande <
>>>> deshpandesh...@gmail.com> wrote:
>>>>
>>>>> In a Wordcount application which  stores the count of all the words
>>>>> input so far using mapWithState.  How do I make sure my counts are not
>>>>> messed up if I happen to read a line more than once?
>>>>>
>>>>> Appreciate your response.
>>>>>
>>>>> Thanks
>>>>>
>>>>
>>>>
>>>
>>
>


Recovering from checkpoint question

2017-01-24 Thread shyla deshpande
If I just want to resubmit the spark streaming app with different
configuration options like different --executor-memory or
--total-executor-cores,
will the checkpoint directory help me continue from where I left off?

Appreciate your response.

Thanks


Re: How to make the state in a streaming application idempotent?

2017-01-24 Thread shyla deshpande
Please share your thoughts.

On Tue, Jan 24, 2017 at 4:01 PM, shyla deshpande 
wrote:

>
>
> On Tue, Jan 24, 2017 at 9:44 AM, shyla deshpande  > wrote:
>
>> My streaming application stores lot of aggregations using mapWithState.
>>
>> I want to know what are all the possible ways I can make it idempotent.
>>
>> Please share your views.
>>
>> Thanks
>>
>> On Mon, Jan 23, 2017 at 5:41 PM, shyla deshpande <
>> deshpandesh...@gmail.com> wrote:
>>
>>> In a Wordcount application which  stores the count of all the words
>>> input so far using mapWithState.  How do I make sure my counts are not
>>> messed up if I happen to read a line more than once?
>>>
>>> Appreciate your response.
>>>
>>> Thanks
>>>
>>
>>
>


Re: How to make the state in a streaming application idempotent?

2017-01-24 Thread shyla deshpande
On Tue, Jan 24, 2017 at 9:44 AM, shyla deshpande 
wrote:

> My streaming application stores lot of aggregations using mapWithState.
>
> I want to know what are all the possible ways I can make it idempotent.
>
> Please share your views.
>
> Thanks
>
> On Mon, Jan 23, 2017 at 5:41 PM, shyla deshpande  > wrote:
>
>> In a Wordcount application which  stores the count of all the words input
>> so far using mapWithState.  How do I make sure my counts are not messed up
>> if I happen to read a line more than once?
>>
>> Appreciate your response.
>>
>> Thanks
>>
>
>


Re: How to make the state in a streaming application idempotent?

2017-01-24 Thread shyla deshpande
My streaming application stores a lot of aggregations using mapWithState.

I want to know what are all the possible ways I can make it idempotent.

Please share your views.

Thanks

On Mon, Jan 23, 2017 at 5:41 PM, shyla deshpande 
wrote:

> In a Wordcount application which  stores the count of all the words input
> so far using mapWithState.  How do I make sure my counts are not messed up
> if I happen to read a line more than once?
>
> Appreciate your response.
>
> Thanks
>


How to make the state in a streaming application idempotent?

2017-01-23 Thread shyla deshpande
In a word count application that stores the count of all the words input
so far using mapWithState, how do I make sure my counts are not messed up
if I happen to read a line more than once?

Appreciate your response.

Thanks
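
A hedged sketch of the second option discussed in this thread: the per-key state carries both the running count and a map of recently seen record ids, so replayed records do not increment the count, and old ids are aged out to bound memory. The two-hour window and the (uuid, timestampMs) value shape are assumptions.

import org.apache.spark.streaming.{State, StateSpec}

// per-word state: running count plus recently counted ids (id -> event time, ms)
case class WordState(count: Long, seen: Map[String, Long])

val maxAgeMs = 2 * 60 * 60 * 1000L   // assumption: a line can only be re-read within 2 hours

val spec = StateSpec.function {
  (word: String, record: Option[(String, Long)], state: State[WordState]) =>
    val WordState(count, seen) = state.getOption.getOrElse(WordState(0L, Map.empty[String, Long]))
    val updated = record match {
      case Some((id, ts)) if seen.contains(id) => WordState(count, seen + (id -> ts))     // duplicate: no increment
      case Some((id, ts))                      => WordState(count + 1, seen + (id -> ts)) // first time this id is seen
      case None                                => WordState(count, seen)
    }
    // drop ids old enough that duplicates can no longer arrive, to bound memory
    val cutoff = System.currentTimeMillis() - maxAgeMs
    val pruned = updated.copy(seen = updated.seen.filter { case (_, t) => t >= cutoff })
    state.update(pruned)
    (word, pruned.count)
}

// upstream must attach a unique id to each record: DStream[(word, (uuid, timestampMs))]
// val counts = keyedLines.mapWithState(spec)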


Re: Using mapWithState without a checkpoint

2017-01-23 Thread shyla deshpande
Hello spark users,

I do have the same question as Daniel.

I would like to save the state in Cassandra and on failure recover using
the initialState. If some one has already tried this, please share your
experience and sample code.

Thanks.

On Thu, Nov 17, 2016 at 9:45 AM, Daniel Haviv <
daniel.ha...@veracity-group.com> wrote:

> Hi,
> Is it possible to use mapWithState without checkpointing at all ?
> I'd rather have the whole application fail, restart and reload an
> initialState RDD than pay for checkpointing every 10 batches.
>
> Thank you,
> Daniel
>
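
A sketch of what that could look like with the Cassandra connector: load the previously saved (key, state) pairs into an RDD and pass it to StateSpec.initialState, so a restarted app rebuilds its state from Cassandra rather than from old checkpoints. Note that mapWithState itself still requires a checkpoint directory to be set. ssc is the app's StreamingContext, and the keyspace and table names are hypothetical.

import com.datastax.spark.connector._
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{State, StateSpec}

// hypothetical keyspace/table holding (word, count) pairs written by the job
val initial: RDD[(String, Long)] =
  ssc.sparkContext.cassandraTable[(String, Long)]("my_ks", "word_counts")

def update(word: String, one: Option[Long], state: State[Long]): (String, Long) = {
  val total = state.getOption.getOrElse(0L) + one.getOrElse(0L)
  state.update(total)
  (word, total)
}

// seed the state from Cassandra on (re)start instead of relying on old checkpoints
val spec = StateSpec.function(update _).initialState(initial)
// val counts = words.map((_, 1L)).mapWithState(spec)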


Re: Spark streaming app that processes Kafka DStreams produces no output and no error

2017-01-19 Thread shyla deshpande
There was an issue connecting to Kafka; once that was fixed, the Spark app
works. Hope this helps someone.
Thanks

On Mon, Jan 16, 2017 at 7:58 AM, shyla deshpande 
wrote:

> Hello,
> I checked the log file on the worker node and don't see any error there.
> This is the first time I am asked to run on such a small cluster.  I feel
> its the resources issue, but it will be great help is somebody can confirm
> this or share your experience. Thanks
>
> On Sat, Jan 14, 2017 at 4:01 PM, shyla deshpande  > wrote:
>
>> Hello,
>>
>> I want to add that,
>> I don't even see the streaming tab in the application UI on port 4040
>> when I run it on the cluster.
>> The cluster on EC2  has 1 master node and 1 worker node.
>> The cores used on the worker node is 2 of 2 and memory used is 6GB of
>> 6.3GB.
>>
>> Can I run a spark streaming job with just 2 cores?
>>
>> Appreciate your time and help.
>>
>> Thanks
>>
>>
>>
>>
>>
>> On Fri, Jan 13, 2017 at 10:46 PM, shyla deshpande <
>> deshpandesh...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> My spark streaming app that reads kafka topics and prints the DStream
>>> works fine on my laptop, but on AWS cluster it produces no output and no
>>> errors.
>>>
>>> Please help me debug.
>>>
>>> I am using Spark 2.0.2 and kafka-0-10
>>>
>>> Thanks
>>>
>>> The following is the output of the spark streaming app...
>>>
>>>
>>> 17/01/14 06:22:41 WARN NativeCodeLoader: Unable to load native-hadoop 
>>> library for your platform... using builtin-java classes where applicable
>>> 17/01/14 06:22:43 WARN Checkpoint: Checkpoint directory check1 does not 
>>> exist
>>> Creating new context
>>> 17/01/14 06:22:45 WARN SparkContext: Use an existing SparkContext, some 
>>> configuration may not take effect.
>>> 17/01/14 06:22:45 WARN KafkaUtils: overriding enable.auto.commit to false 
>>> for executor
>>> 17/01/14 06:22:45 WARN KafkaUtils: overriding auto.offset.reset to none for 
>>> executor
>>> 17/01/14 06:22:45 WARN KafkaUtils: overriding executor group.id to 
>>> spark-executor-whilDataStream
>>> 17/01/14 06:22:45 WARN KafkaUtils: overriding receive.buffer.bytes to 65536 
>>> see KAFKA-3135
>>>
>>>
>>>
>>
>


Re: Spark streaming app that processes Kafka DStreams produces no output and no error

2017-01-16 Thread shyla deshpande
Hello,
I checked the log file on the worker node and don't see any error there.
This is the first time I have been asked to run on such a small cluster. I feel
it is a resources issue, but it would be a great help if somebody can confirm
this or share your experience. Thanks

On Sat, Jan 14, 2017 at 4:01 PM, shyla deshpande 
wrote:

> Hello,
>
> I want to add that,
> I don't even see the streaming tab in the application UI on port 4040 when
> I run it on the cluster.
> The cluster on EC2  has 1 master node and 1 worker node.
> The cores used on the worker node is 2 of 2 and memory used is 6GB of
> 6.3GB.
>
> Can I run a spark streaming job with just 2 cores?
>
> Appreciate your time and help.
>
> Thanks
>
>
>
>
>
> On Fri, Jan 13, 2017 at 10:46 PM, shyla deshpande <
> deshpandesh...@gmail.com> wrote:
>
>> Hello,
>>
>> My spark streaming app that reads kafka topics and prints the DStream
>> works fine on my laptop, but on AWS cluster it produces no output and no
>> errors.
>>
>> Please help me debug.
>>
>> I am using Spark 2.0.2 and kafka-0-10
>>
>> Thanks
>>
>> The following is the output of the spark streaming app...
>>
>>
>> 17/01/14 06:22:41 WARN NativeCodeLoader: Unable to load native-hadoop 
>> library for your platform... using builtin-java classes where applicable
>> 17/01/14 06:22:43 WARN Checkpoint: Checkpoint directory check1 does not exist
>> Creating new context
>> 17/01/14 06:22:45 WARN SparkContext: Use an existing SparkContext, some 
>> configuration may not take effect.
>> 17/01/14 06:22:45 WARN KafkaUtils: overriding enable.auto.commit to false 
>> for executor
>> 17/01/14 06:22:45 WARN KafkaUtils: overriding auto.offset.reset to none for 
>> executor
>> 17/01/14 06:22:45 WARN KafkaUtils: overriding executor group.id to 
>> spark-executor-whilDataStream
>> 17/01/14 06:22:45 WARN KafkaUtils: overriding receive.buffer.bytes to 65536 
>> see KAFKA-3135
>>
>>
>>
>


Re: Spark streaming app that processes Kafka DStreams produces no output and no error

2017-01-14 Thread shyla deshpande
Hello,

I want to add that,
I don't even see the streaming tab in the application UI on port 4040 when
I run it on the cluster.
The cluster on EC2  has 1 master node and 1 worker node.
The cores used on the worker node are 2 of 2 and the memory used is 6GB of 6.3GB.

Can I run a spark streaming job with just 2 cores?

Appreciate your time and help.

Thanks





On Fri, Jan 13, 2017 at 10:46 PM, shyla deshpande 
wrote:

> Hello,
>
> My spark streaming app that reads kafka topics and prints the DStream
> works fine on my laptop, but on AWS cluster it produces no output and no
> errors.
>
> Please help me debug.
>
> I am using Spark 2.0.2 and kafka-0-10
>
> Thanks
>
> The following is the output of the spark streaming app...
>
>
> 17/01/14 06:22:41 WARN NativeCodeLoader: Unable to load native-hadoop library 
> for your platform... using builtin-java classes where applicable
> 17/01/14 06:22:43 WARN Checkpoint: Checkpoint directory check1 does not exist
> Creating new context
> 17/01/14 06:22:45 WARN SparkContext: Use an existing SparkContext, some 
> configuration may not take effect.
> 17/01/14 06:22:45 WARN KafkaUtils: overriding enable.auto.commit to false for 
> executor
> 17/01/14 06:22:45 WARN KafkaUtils: overriding auto.offset.reset to none for 
> executor
> 17/01/14 06:22:45 WARN KafkaUtils: overriding executor group.id to 
> spark-executor-whilDataStream
> 17/01/14 06:22:45 WARN KafkaUtils: overriding receive.buffer.bytes to 65536 
> see KAFKA-3135
>
>
>


Spark streaming app that processes Kafka DStreams produces no output and no error

2017-01-13 Thread shyla deshpande
Hello,

My spark streaming app that reads kafka topics and prints the DStream works
fine on my laptop, but on AWS cluster it produces no output and no errors.

Please help me debug.

I am using Spark 2.0.2 and kafka-0-10

Thanks

The following is the output of the spark streaming app...


17/01/14 06:22:41 WARN NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where
applicable
17/01/14 06:22:43 WARN Checkpoint: Checkpoint directory check1 does not exist
Creating new context
17/01/14 06:22:45 WARN SparkContext: Use an existing SparkContext,
some configuration may not take effect.
17/01/14 06:22:45 WARN KafkaUtils: overriding enable.auto.commit to
false for executor
17/01/14 06:22:45 WARN KafkaUtils: overriding auto.offset.reset to
none for executor
17/01/14 06:22:45 WARN KafkaUtils: overriding executor group.id to
spark-executor-whilDataStream
17/01/14 06:22:45 WARN KafkaUtils: overriding receive.buffer.bytes to
65536 see KAFKA-3135


Re: Docker image for Spark streaming app

2017-01-08 Thread shyla deshpande
Just want to clarify.

I want to run a single node spark cluster in a docker container. I want to
use Spark 2.0+ and scala.
Looking for a Docker image from Docker Hub. I want this setup for
development and testing purposes.

Please let me know if you know of any docker image that can help me get
started.

Thanks

On Sun, Jan 8, 2017 at 1:52 PM, shyla deshpande 
wrote:

> Thanks really appreciate.
>
> On Sun, Jan 8, 2017 at 1:02 PM, vvshvv  wrote:
>
>> Hi,
>>
>> I am running spark streaming job using spark jobserver via this image:
>>
>> https://hub.docker.com/r/depend/spark-jobserver/.
>>
>> It works well in standalone (using mesos job does not make progress).
>> Spark jobserver that supports Spark 2.0 has new API that is only suitable
>> for non-streaming jobs as for streaming you have to specify
>> StreamingContextFactory that still uses old API, so you can just use old
>> API for now.
>>
>>
>>
>> Sent from my Mi phone
>> On shyla deshpande , Jan 8, 2017 11:51 PM
>> wrote:
>>
>> I looking for a docker image that I can use from docker hub for running a
>> spark streaming app with scala and spark 2.0 +.
>>
>> I am new to docker and unable to find one image from docker hub that
>> suits my needs. Please let me know if anyone is using a docker for spark
>> streaming app and share your experience.
>>
>> Thanks
>>
>>
>


Docker image for Spark streaming app

2017-01-08 Thread shyla deshpande
I am looking for a Docker image that I can use from Docker Hub for running a
Spark streaming app with Scala and Spark 2.0+.

I am new to docker and unable to find one image from docker hub that suits
my needs. Please let me know if anyone is using a docker for spark
streaming app and share your experience.

Thanks


How do I read data in dockerized kafka from a spark streaming application

2017-01-06 Thread shyla deshpande
My Kafka is in a Docker container.

How do I read this Kafka data in my Spark streaming app?

Also, I need to write data from Spark Streaming to a Cassandra database which
is in a Docker container.

I appreciate any help.

Thanks.


Re: How many Spark streaming applications can be run at a time on a Spark cluster?

2016-12-24 Thread shyla deshpande
Hi All,

Thank you for the response.

As per

https://docs.cloud.databricks.com/docs/latest/databricks_guide/index.html#07%20Spark%20Streaming/15%20Streaming%20FAQs.html

There can be only one streaming context in a cluster which implies only one
streaming job.

So, I am still confused. Anyone having more than 1 spark streaming app in a
cluster running at the same time, please share your experience.

Thanks

On Wed, Dec 14, 2016 at 6:54 PM, Akhilesh Pathodia <
pathodia.akhil...@gmail.com> wrote:

> If you have enough cores/resources, run them separately depending on your
> use case.
>
>
> On Thursday 15 December 2016, Divya Gehlot 
> wrote:
>
>> It depends on the use case ...
>> Spark always depends on the resource availability .
>> As long as you have resource to acoomodate ,can run as many spark/spark
>> streaming  application.
>>
>>
>> Thanks,
>> Divya
>>
>> On 15 December 2016 at 08:42, shyla deshpande 
>> wrote:
>>
>>> How many Spark streaming applications can be run at a time on a Spark
>>> cluster?
>>>
>>> Is it better to have 1 spark streaming application to consume all the
>>> Kafka topics or have multiple streaming applications when possible to keep
>>> it simple?
>>>
>>> Thanks
>>>
>>>
>>


How many Spark streaming applications can be run at a time on a Spark cluster?

2016-12-14 Thread shyla deshpande
How many Spark streaming applications can be run at a time on a Spark
cluster?

Is it better to have 1 spark streaming application to consume all the Kafka
topics or have multiple streaming applications when possible to keep it
simple?

Thanks


Livy VS Spark Job Server

2016-12-12 Thread shyla deshpande
It will be helpful if someone can compare Livy and Spark Job Server.

Thanks


Wrting data from Spark streaming to AWS Redshift?

2016-12-09 Thread shyla deshpande
Hello all,

Is it possible to write data from Spark Streaming to AWS Redshift?

I came across the following article, so it looks like it works from a Spark
batch program.

https://databricks.com/blog/2015/10/19/introducing-redshift-data-source-for-spark.html

I want to write to AWS Redshift from Spark Streaming. Please share your
experience and reference docs.

Thanks
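
One hedged sketch of how this is often wired up with the spark-redshift data source described in the linked post: convert each micro-batch to a DataFrame inside foreachRDD and write it out, which stages the data in S3 and issues a Redshift COPY. The JDBC URL, table, S3 path and column names are placeholders, and frequent small COPY operations can be costly, so batching or landing data in S3 first is worth considering.

import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.streaming.dstream.DStream

// events: (userid, eventType, epochMs) -- a hypothetical record shape
def writeToRedshift(events: DStream[(String, String, Long)]): Unit =
  events.foreachRDD { rdd =>
    if (!rdd.isEmpty()) {
      val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
      import spark.implicits._

      rdd.toDF("userid", "event_type", "event_ts")
        .write
        .format("com.databricks.spark.redshift")
        .option("url", "jdbc:redshift://host:5439/db?user=u&password=p")  // placeholder
        .option("dbtable", "events")                                      // placeholder table
        .option("tempdir", "s3n://my-bucket/tmp/")                        // placeholder S3 staging dir
        .mode(SaveMode.Append)
        .save()
    }
  }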


Re: Spark 2.0.2 , using DStreams in Spark Streaming . How do I create SQLContext? Please help

2016-12-01 Thread shyla deshpande
Used SparkSession, Works now. Thanks.

On Wed, Nov 30, 2016 at 11:02 PM, Deepak Sharma 
wrote:

> In Spark > 2.0 , spark session was introduced that you can use to query
> hive as well.
> Just make sure you create spark session with enableHiveSupport() option.
>
> Thanks
> Deepak
>
> On Thu, Dec 1, 2016 at 12:27 PM, shyla deshpande  > wrote:
>
>> I am Spark 2.0.2 , using DStreams because I need Cassandra Sink.
>>
>> How do I create SQLContext? I get the error SQLContext deprecated.
>>
>>
>> *[image: Inline image 1]*
>>
>> *Thanks*
>>
>>
>
>
> --
> Thanks
> Deepak
> www.bigdatabig.com
> www.keosha.net
>


Spark 2.0.2 , using DStreams in Spark Streaming . How do I create SQLContext? Please help

2016-11-30 Thread shyla deshpande
I am on Spark 2.0.2, using DStreams because I need the Cassandra sink.

How do I create a SQLContext? I get a warning that SQLContext is deprecated.


*[image: Inline image 1]*

*Thanks*
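
A sketch of the pattern that works with DStreams on Spark 2.x: obtain the SparkSession from the RDD's configuration inside foreachRDD and use it in place of the deprecated SQLContext. The record type, view name and query are hypothetical.

import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.DStream

case class Position(userid: String, offsetSecs: Long)   // hypothetical record type

def toSql(stream: DStream[Position]): Unit =
  stream.foreachRDD { rdd =>
    // getOrCreate() returns the existing session, so this is cheap per batch
    val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
    import spark.implicits._

    val df = rdd.toDF()
    df.createOrReplaceTempView("positions")
    spark.sql("SELECT userid, max(offsetSecs) AS maxOffset FROM positions GROUP BY userid").show()
  }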


Re: updateStateByKey -- when the key is multi-column (like a composite key )

2016-11-30 Thread shyla deshpande
Thanks Miguel for the response.

Works great. I am using a tuple for my key, the values are Strings, and I
return a String from updateStateByKey.

On Wed, Nov 30, 2016 at 12:33 PM, Miguel Morales 
wrote:

> I *think* you can return a map to updateStateByKey which would include
> your fields.  Another approach would be to create a hash (like create a
> json version of the hash and return that.)
>
> On Wed, Nov 30, 2016 at 12:30 PM, shyla deshpande <
> deshpandesh...@gmail.com> wrote:
>
>> updateStateByKey - Can this be used when the key is multi-column (like a
>> composite key ) and the value is not numeric. All the examples I have come
>> across is where the key is a simple String and the Value is numeric.
>>
>> Appreciate any help.
>>
>> Thanks
>>
>
>


updateStateByKey -- when the key is multi-column (like a composite key )

2016-11-30 Thread shyla deshpande
updateStateByKey - Can this be used when the key is multi-column (like a
composite key) and the value is not numeric? All the examples I have come
across are ones where the key is a simple String and the value is numeric.

Appreciate any help.

Thanks
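
A minimal sketch confirming this: the key can be any serializable type, such as a tuple, and the state any serializable value, here just the latest status string. The Activity case class and its fields are hypothetical.

import org.apache.spark.streaming.dstream.DStream

// hypothetical input: a (teamid, userid) composite key with a non-numeric value
case class Activity(teamid: String, userid: String, status: String)

// requires ssc.checkpoint(dir) to be set, as updateStateByKey is stateful
def latestStatus(events: DStream[Activity]): DStream[((String, String), String)] = {
  val keyed = events.map(a => ((a.teamid, a.userid), a.status))

  // keep the most recent status string per composite key
  def update(newValues: Seq[String], current: Option[String]): Option[String] =
    newValues.lastOption.orElse(current)

  keyed.updateStateByKey(update _)
}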


Re: Do I have to wrap akka around spark streaming app?

2016-11-29 Thread shyla deshpande
Thanks Vincent for the feedback. I appreciate it.

On Tue, Nov 29, 2016 at 1:34 AM, vincent gromakowski <
vincent.gromakow...@gmail.com> wrote:

> You can still achieve it by implementing an actor in each partition but I
> am not sure it's a good design regarding scalability because your
> distributed actors would send a message for each event to your single app
> actor, it would be a huge load
> If you want to experiment this and because actor is thread safe you can
> use the following pattern which allows to reuse actors between micro
> batches in each partitions
> http://allegro.tech/2015/08/spark-kafka-integration.html
>
>
> 2016-11-29 2:18 GMT+01:00 shyla deshpande :
>
>> Hello All,
>>
>> I just want to make sure this is a right use case for Kafka --> Spark
>> Streaming
>>
>> Few words about my use case :
>>
>> When the user watches a video, I get the position events from the user
>> that indicates how much they have completed viewing and at a certain point,
>> I mark that Video as complete and that triggers a lot of other events. I
>> need a way to notify the app about the creation of the completion event.
>>
>> Appreciate any suggestions.
>>
>> Thanks
>>
>>
>> On Mon, Nov 28, 2016 at 2:35 PM, shyla deshpande <
>> deshpandesh...@gmail.com> wrote:
>>
>>> In this case, persisting to Cassandra is for future analytics and
>>> Visualization.
>>>
>>> I want to notify that the app of the event, so it makes the app
>>> interactive.
>>>
>>> Thanks
>>>
>>> On Mon, Nov 28, 2016 at 2:24 PM, vincent gromakowski <
>>> vincent.gromakow...@gmail.com> wrote:
>>>
>>>> Sorry I don't understand...
>>>> Is it a cassandra acknowledge to actors that you want ? Why do you want
>>>> to ack after writing to cassandra ? Your pipeline kafka=>spark=>cassandra
>>>> is supposed to be exactly once, so you don't need to wait for cassandra
>>>> ack, you can just write to kafka from actors and then notify the user ?
>>>>
>>>> 2016-11-28 23:15 GMT+01:00 shyla deshpande :
>>>>
>>>>> Thanks Vincent for the input. Not sure I understand your suggestion.
>>>>> Please clarify.
>>>>>
>>>>> Few words about my use case :
>>>>> When the user watches a video, I get the position events from the user
>>>>> that indicates how much they have completed viewing and at a certain 
>>>>> point,
>>>>> I mark that Video as complete and persist that info to cassandra.
>>>>>
>>>>> How do I notify the user that it was marked complete?
>>>>>
>>>>> Are you suggesting I write the completed events to kafka(different
>>>>> topic) and the akka consumer could read from this? There could be many
>>>>> completed events from different users in this topic. So the akka consumer
>>>>> should pretty much do what a spark streaming does to process this without
>>>>> the knowledge of the kafka offset.
>>>>>
>>>>> So not sure what you mean by kafka offsets will do the job, how will
>>>>> the akka consumer know the kafka offset?
>>>>>
>>>>> On Mon, Nov 28, 2016 at 12:52 PM, vincent gromakowski <
>>>>> vincent.gromakow...@gmail.com> wrote:
>>>>>
>>>>>> You don't need actors to do kafka=>spark processing=>kafka
>>>>>> Why do you need to notify the akka producer ? If you need to get back
>>>>>> the processed message in your producer, then implement an akka consummer 
>>>>>> in
>>>>>> your akka app and kafka offsets will do the job
>>>>>>
>>>>>> 2016-11-28 21:46 GMT+01:00 shyla deshpande 
>>>>>> :
>>>>>>
>>>>>>> Thanks Daniel for the response.
>>>>>>>
>>>>>>> I am planning to use Spark streaming to do Event Processing. I will
>>>>>>> have akka actors sending messages to kafka. I process them using Spark
>>>>>>> streaming and as a result a new events will be generated. How do I 
>>>>>>> notify
>>>>>>> the akka actor(Message producer)  that a new event has been generated?
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Nov 28, 2016 at 9:51 AM, Daniel van der Ende <
>>>>>>> daniel.vandere...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Well, I would say it depends on what you're trying to
>>>>>>>> achieve. Right now I don't know why you are considering using Akka. 
>>>>>>>> Could
>>>>>>>> you please explain your use case a bit?
>>>>>>>>
>>>>>>>> In general, there is no single correct answer to your current
>>>>>>>> question as it's quite broad.
>>>>>>>>
>>>>>>>> Daniel
>>>>>>>>
>>>>>>>> On Mon, Nov 28, 2016 at 9:11 AM, shyla deshpande <
>>>>>>>> deshpandesh...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> My data pipeline is Kafka --> Spark Streaming --> Cassandra.
>>>>>>>>>
>>>>>>>>> Can someone please explain me when would I need to wrap akka
>>>>>>>>> around the spark streaming app. My knowledge of akka and the actor 
>>>>>>>>> system
>>>>>>>>> is poor. Please help!
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Daniel
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>


Re: Do I have to wrap akka around spark streaming app?

2016-11-28 Thread shyla deshpande
Hello All,

I just want to make sure this is the right use case for Kafka --> Spark
Streaming.

A few words about my use case:

When the user watches a video, I get position events from the user that
indicate how much of the video they have watched; at a certain point, I
mark that video as complete, and that triggers a lot of other events. I need
a way to notify the app about the creation of the completion event.

Appreciate any suggestions.

Thanks


On Mon, Nov 28, 2016 at 2:35 PM, shyla deshpande 
wrote:

> In this case, persisting to Cassandra is for future analytics and
> Visualization.
>
> I want to notify that the app of the event, so it makes the app
> interactive.
>
> Thanks
>
> On Mon, Nov 28, 2016 at 2:24 PM, vincent gromakowski <
> vincent.gromakow...@gmail.com> wrote:
>
>> Sorry I don't understand...
>> Is it a cassandra acknowledge to actors that you want ? Why do you want
>> to ack after writing to cassandra ? Your pipeline kafka=>spark=>cassandra
>> is supposed to be exactly once, so you don't need to wait for cassandra
>> ack, you can just write to kafka from actors and then notify the user ?
>>
>> 2016-11-28 23:15 GMT+01:00 shyla deshpande :
>>
>>> Thanks Vincent for the input. Not sure I understand your suggestion.
>>> Please clarify.
>>>
>>> Few words about my use case :
>>> When the user watches a video, I get the position events from the user
>>> that indicates how much they have completed viewing and at a certain point,
>>> I mark that Video as complete and persist that info to cassandra.
>>>
>>> How do I notify the user that it was marked complete?
>>>
>>> Are you suggesting I write the completed events to kafka(different
>>> topic) and the akka consumer could read from this? There could be many
>>> completed events from different users in this topic. So the akka consumer
>>> should pretty much do what a spark streaming does to process this without
>>> the knowledge of the kafka offset.
>>>
>>> So not sure what you mean by kafka offsets will do the job, how will
>>> the akka consumer know the kafka offset?
>>>
>>> On Mon, Nov 28, 2016 at 12:52 PM, vincent gromakowski <
>>> vincent.gromakow...@gmail.com> wrote:
>>>
>>>> You don't need actors to do kafka=>spark processing=>kafka
>>>> Why do you need to notify the akka producer ? If you need to get back
>>>> the processed message in your producer, then implement an akka consummer in
>>>> your akka app and kafka offsets will do the job
>>>>
>>>> 2016-11-28 21:46 GMT+01:00 shyla deshpande :
>>>>
>>>>> Thanks Daniel for the response.
>>>>>
>>>>> I am planning to use Spark streaming to do Event Processing. I will
>>>>> have akka actors sending messages to kafka. I process them using Spark
>>>>> streaming and as a result a new events will be generated. How do I notify
>>>>> the akka actor(Message producer)  that a new event has been generated?
>>>>>
>>>>>
>>>>>
>>>>> On Mon, Nov 28, 2016 at 9:51 AM, Daniel van der Ende <
>>>>> daniel.vandere...@gmail.com> wrote:
>>>>>
>>>>>> Well, I would say it depends on what you're trying to achieve. Right
>>>>>> now I don't know why you are considering using Akka. Could you please
>>>>>> explain your use case a bit?
>>>>>>
>>>>>> In general, there is no single correct answer to your current
>>>>>> question as it's quite broad.
>>>>>>
>>>>>> Daniel
>>>>>>
>>>>>> On Mon, Nov 28, 2016 at 9:11 AM, shyla deshpande <
>>>>>> deshpandesh...@gmail.com> wrote:
>>>>>>
>>>>>>> My data pipeline is Kafka --> Spark Streaming --> Cassandra.
>>>>>>>
>>>>>>> Can someone please explain me when would I need to wrap akka around
>>>>>>> the spark streaming app. My knowledge of akka and the actor system is 
>>>>>>> poor.
>>>>>>> Please help!
>>>>>>>
>>>>>>> Thanks
>>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Daniel
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>


Re: Do I have to wrap akka around spark streaming app?

2016-11-28 Thread shyla deshpande
In this case, persisting to Cassandra is for future analytics and
visualization.

I want to notify the app of the event, so that the app stays interactive.

Thanks

On Mon, Nov 28, 2016 at 2:24 PM, vincent gromakowski <
vincent.gromakow...@gmail.com> wrote:

> Sorry I don't understand...
> Is it a cassandra acknowledge to actors that you want ? Why do you want to
> ack after writing to cassandra ? Your pipeline kafka=>spark=>cassandra is
> supposed to be exactly once, so you don't need to wait for cassandra ack,
> you can just write to kafka from actors and then notify the user ?
>
> 2016-11-28 23:15 GMT+01:00 shyla deshpande :
>
>> Thanks Vincent for the input. Not sure I understand your suggestion.
>> Please clarify.
>>
>> Few words about my use case :
>> When the user watches a video, I get the position events from the user
>> that indicates how much they have completed viewing and at a certain point,
>> I mark that Video as complete and persist that info to cassandra.
>>
>> How do I notify the user that it was marked complete?
>>
>> Are you suggesting I write the completed events to kafka(different topic)
>> and the akka consumer could read from this? There could be many completed
>> events from different users in this topic. So the akka consumer should
>> pretty much do what a spark streaming does to process this without the
>> knowledge of the kafka offset.
>>
>> So not sure what you mean by kafka offsets will do the job, how will the
>> akka consumer know the kafka offset?
>>
>> On Mon, Nov 28, 2016 at 12:52 PM, vincent gromakowski <
>> vincent.gromakow...@gmail.com> wrote:
>>
>>> You don't need actors to do kafka=>spark processing=>kafka
>>> Why do you need to notify the akka producer ? If you need to get back
>>> the processed message in your producer, then implement an akka consummer in
>>> your akka app and kafka offsets will do the job
>>>
>>> 2016-11-28 21:46 GMT+01:00 shyla deshpande :
>>>
>>>> Thanks Daniel for the response.
>>>>
>>>> I am planning to use Spark streaming to do Event Processing. I will
>>>> have akka actors sending messages to kafka. I process them using Spark
>>>> streaming and as a result a new events will be generated. How do I notify
>>>> the akka actor(Message producer)  that a new event has been generated?
>>>>
>>>>
>>>>
>>>> On Mon, Nov 28, 2016 at 9:51 AM, Daniel van der Ende <
>>>> daniel.vandere...@gmail.com> wrote:
>>>>
>>>>> Well, I would say it depends on what you're trying to achieve. Right
>>>>> now I don't know why you are considering using Akka. Could you please
>>>>> explain your use case a bit?
>>>>>
>>>>> In general, there is no single correct answer to your current question
>>>>> as it's quite broad.
>>>>>
>>>>> Daniel
>>>>>
>>>>> On Mon, Nov 28, 2016 at 9:11 AM, shyla deshpande <
>>>>> deshpandesh...@gmail.com> wrote:
>>>>>
>>>>>> My data pipeline is Kafka --> Spark Streaming --> Cassandra.
>>>>>>
>>>>>> Can someone please explain me when would I need to wrap akka around
>>>>>> the spark streaming app. My knowledge of akka and the actor system is 
>>>>>> poor.
>>>>>> Please help!
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Daniel
>>>>>
>>>>
>>>>
>>>
>>
>


Re: Do I have to wrap akka around spark streaming app?

2016-11-28 Thread shyla deshpande
Thanks Vincent for the input. I am not sure I understand your suggestion.
Please clarify.

A few words about my use case:
When the user watches a video, I get position events from the user that
indicate how much they have completed viewing; at a certain point, I mark
that video as complete and persist that info to Cassandra.

How do I notify the user that it was marked complete?

Are you suggesting I write the completed events to Kafka (a different topic)
and have the akka consumer read from it? There could be many completed
events from different users in this topic, so the akka consumer would pretty
much have to do what Spark Streaming does to process it, without any
knowledge of the Kafka offsets.

So I am not sure what you mean by "kafka offsets will do the job": how will
the akka consumer know the Kafka offset?
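
To check my understanding, is this (roughly) the kind of thing you mean? An
untested sketch with placeholder topic and broker names:

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.streaming.dstream.DStream

// Hypothetical completion event produced by the streaming job.
case class CompletionEvent(userId: String, videoId: String)

def publishCompletions(completions: DStream[CompletionEvent]): Unit = {
  completions.foreachRDD { rdd =>
    rdd.foreachPartition { events =>
      // One producer per partition per batch keeps the sketch simple; a pooled or
      // lazily-created singleton producer per executor would be better in production.
      val props = new Properties()
      props.put("bootstrap.servers", "kafka-broker:9092") // placeholder
      props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      val producer = new KafkaProducer[String, String](props)
      try {
        events.foreach { e =>
          producer.send(new ProducerRecord("video-completions", e.userId, s"${e.userId},${e.videoId}"))
        }
      } finally {
        producer.close()
      }
    }
  }
}

(I am guessing the akka side would then just be a normal Kafka consumer on
the "video-completions" topic, tracking its own offsets through its consumer
group, so it would not need to know anything about the offsets the Spark job
reads.)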

On Mon, Nov 28, 2016 at 12:52 PM, vincent gromakowski <
vincent.gromakow...@gmail.com> wrote:

> You don't need actors to do kafka=>spark processing=>kafka
> Why do you need to notify the akka producer ? If you need to get back the
> processed message in your producer, then implement an akka consummer in
> your akka app and kafka offsets will do the job
>
> 2016-11-28 21:46 GMT+01:00 shyla deshpande :
>
>> Thanks Daniel for the response.
>>
>> I am planning to use Spark streaming to do Event Processing. I will have
>> akka actors sending messages to kafka. I process them using Spark streaming
>> and as a result a new events will be generated. How do I notify the akka
>> actor(Message producer)  that a new event has been generated?
>>
>>
>>
>> On Mon, Nov 28, 2016 at 9:51 AM, Daniel van der Ende <
>> daniel.vandere...@gmail.com> wrote:
>>
>>> Well, I would say it depends on what you're trying to achieve. Right now
>>> I don't know why you are considering using Akka. Could you please explain
>>> your use case a bit?
>>>
>>> In general, there is no single correct answer to your current question
>>> as it's quite broad.
>>>
>>> Daniel
>>>
>>> On Mon, Nov 28, 2016 at 9:11 AM, shyla deshpande <
>>> deshpandesh...@gmail.com> wrote:
>>>
>>>> My data pipeline is Kafka --> Spark Streaming --> Cassandra.
>>>>
>>>> Can someone please explain me when would I need to wrap akka around the
>>>> spark streaming app. My knowledge of akka and the actor system is poor.
>>>> Please help!
>>>>
>>>> Thanks
>>>>
>>>
>>>
>>>
>>> --
>>> Daniel
>>>
>>
>>
>


Re: Do I have to wrap akka around spark streaming app?

2016-11-28 Thread shyla deshpande
Thanks Daniel for the response.

I am planning to use Spark Streaming to do event processing. I will have
akka actors sending messages to Kafka. I process them using Spark Streaming,
and as a result new events will be generated. How do I notify the akka actor
(the message producer) that a new event has been generated?



On Mon, Nov 28, 2016 at 9:51 AM, Daniel van der Ende <
daniel.vandere...@gmail.com> wrote:

> Well, I would say it depends on what you're trying to achieve. Right now I
> don't know why you are considering using Akka. Could you please explain
> your use case a bit?
>
> In general, there is no single correct answer to your current question as
> it's quite broad.
>
> Daniel
>
> On Mon, Nov 28, 2016 at 9:11 AM, shyla deshpande  > wrote:
>
>> My data pipeline is Kafka --> Spark Streaming --> Cassandra.
>>
>> Can someone please explain me when would I need to wrap akka around the
>> spark streaming app. My knowledge of akka and the actor system is poor.
>> Please help!
>>
>> Thanks
>>
>
>
>
> --
> Daniel
>


Re: Do I have to wrap akka around spark streaming app?

2016-11-28 Thread shyla deshpande
Anyone with experience running Spark Streaming in production, I would
appreciate your input.

Thanks
-shyla

On Mon, Nov 28, 2016 at 12:11 AM, shyla deshpande 
wrote:

> My data pipeline is Kafka --> Spark Streaming --> Cassandra.
>
> Can someone please explain me when would I need to wrap akka around the
> spark streaming app. My knowledge of akka and the actor system is poor.
> Please help!
>
> Thanks
>


Do I have to wrap akka around spark streaming app?

2016-11-28 Thread shyla deshpande
My data pipeline is Kafka --> Spark Streaming --> Cassandra.

Can someone please explain to me when I would need to wrap akka around the
Spark Streaming app? My knowledge of akka and the actor system is poor.
Please help!

Thanks


Re: How do I persist the data after I process the data with Structured streaming...

2016-11-22 Thread shyla deshpande
Has anyone written a custom sink to persist data to Cassandra from
Structured Streaming?

Please point me to any links or references. Thanks.
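
Based on the foreach sink mentioned below, this is roughly what I have in
mind: an untested sketch of a ForeachWriter that uses the plain DataStax
Java driver, with placeholder keyspace, table, and column positions. Does
this look like the right approach?

import com.datastax.driver.core.{Cluster, Session}
import org.apache.spark.sql.{ForeachWriter, Row}

class CassandraRowWriter(contactPoint: String) extends ForeachWriter[Row] {
  @transient private var cluster: Cluster = _
  @transient private var session: Session = _

  // Called once per partition and epoch; returning true means process() will run.
  override def open(partitionId: Long, version: Long): Boolean = {
    cluster = Cluster.builder().addContactPoint(contactPoint).build()
    session = cluster.connect()
    true
  }

  override def process(row: Row): Unit = {
    // Placeholder keyspace/table/columns; adjust to the actual schema.
    session.execute(
      "INSERT INTO my_keyspace.events (user_id, video_id, position) VALUES (?, ?, ?)",
      row.getString(0), row.getString(1), java.lang.Double.valueOf(row.getDouble(2)))
  }

  override def close(errorOrNull: Throwable): Unit = {
    if (session != null) session.close()
    if (cluster != null) cluster.close()
  }
}

// Usage, where df is the streaming DataFrame read from the Kafka source:
// df.writeStream.foreach(new CassandraRowWriter("cassandra-host")).start()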

On Tue, Nov 22, 2016 at 2:40 PM, Michael Armbrust 
wrote:

> Forgot the link: https://spark.apache.org/docs/latest/structured-
> streaming-programming-guide.html#using-foreach
>
> On Tue, Nov 22, 2016 at 2:40 PM, Michael Armbrust 
> wrote:
>
>> We are looking to add a native JDBC sink in Spark 2.2.  Until then you
>> can write your own connector using df.writeStream.foreach.
>>
>> On Tue, Nov 22, 2016 at 12:55 PM, shyla deshpande <
>> deshpandesh...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Structured streaming works great with Kafka source but I need to persist
>>> the data after processing in some database like Cassandra or at least
>>> Postgres.
>>>
>>> Any suggestions, help please.
>>>
>>> Thanks
>>>
>>
>>
>

