Re: How to use ManualClock with Spark streaming

2017-04-05 Thread Hemalatha A
Any updates on how I can use ManualClock, other than editing the Spark
source code?

On Wed, Mar 1, 2017 at 10:19 AM, Hemalatha A <
hemalatha.amru...@googlemail.com> wrote:

> It is certainly possible through a hack.
> I was referring to the post below, where TD says it is possible through a
> hack. I wanted to know if there is any way other than editing the Spark
> source code.
>
> https://groups.google.com/forum/#!searchin/spark-users/manualclock%7Csort:relevance/spark-users/ES8X1l_xn5s/6PvGGRDfgnMJ
>
> On Wed, Mar 1, 2017 at 7:09 AM, Saisai Shao 
> wrote:
>
>> I don't think using ManualClock is the right way to fix your problem here
>> in Spark Streaming.
>>
>> ManualClock in Spark is mainly used for unit tests; you have to manually
>> advance the time to make the unit test work. That usage is different from
>> the scenario you mentioned.
>>
>> Thanks
>> Jerry
>>
>> On Tue, Feb 28, 2017 at 10:53 PM, Hemalatha A <
>> hemalatha.amru...@googlemail.com> wrote:
>>
>>>
>>> Hi,
>>>
>>> I am running a streaming application that reads data from Kafka and
>>> performs window operations on it. I have a use case where all incoming
>>> events have a fixed latency of 10s, which means data belonging to minute
>>> 10:00:00 will arrive 10s late, at 10:00:10.
>>>
>>> I want to set the Spark clock to ManualClock and set the time behind by
>>> 10s, so that the batch calculation triggers at 10:00:10, by which time
>>> all the events for the previous minute have arrived.
>>>
>>> But I see that "spark.streaming.clock" is hardcoded to
>>> "org.apache.spark.util.SystemClock" in the code.
>>>
>>> Is there a way to easily hack this property to use ManualClock?
>>> --
>>>
>>>
>>> Regards
>>> Hemalatha
>>>
>>
>>
>
>
> --
>
>
> Regards
> Hemalatha
>



-- 


Regards
Hemalatha


How to use ManualClock with Spark streaming

2017-02-28 Thread Hemalatha A
Hi,

I am running a streaming application that reads data from Kafka and
performs window operations on it. I have a use case where all incoming
events have a fixed latency of 10s, which means data belonging to minute
10:00:00 will arrive 10s late, at 10:00:10.

I want to set the Spark clock to ManualClock and set the time behind by
10s, so that the batch calculation triggers at 10:00:10, by which time all
the events for the previous minute have arrived.

But I see that "spark.streaming.clock" is hardcoded to
"org.apache.spark.util.SystemClock" in the code.

Is there a way to easily hack this property to use ManualClock?
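
For reference, a minimal sketch of the kind of hack being discussed, assuming
a Spark build that reads "spark.streaming.clock" from the SparkConf the way
Spark's own streaming test suites do; this is an internal/test setting, not a
supported API:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Assumption: the streaming scheduler instantiates whatever clock class name
// is set here. ManualClock is a private[spark] utility intended for tests.
val conf = new SparkConf()
  .setAppName("manual-clock-sketch")
  .set("spark.streaming.clock", "org.apache.spark.util.ManualClock")

val ssc = new StreamingContext(conf, Seconds(60))

// With a ManualClock, batches only fire when the clock is advanced. The clock
// is not exposed publicly, so advancing it from application code needs a
// helper placed in the org.apache.spark namespace or a reflection hack.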
-- 


Regards
Hemalatha


Re: How does chaining of Windowed Dstreams work?

2016-09-27 Thread Hemalatha A
Hello,

Can anyone please answer the question below and help me understand the
windowing operations?

On Sun, Sep 4, 2016 at 4:42 PM, Hemalatha A <
hemalatha.amru...@googlemail.com> wrote:

> Hello,
>
> I have a set of DStreams, and I'm performing some computation on each of
> them; each DStream is windowed on another stream derived from the base
> stream, according to the order of window intervals. I want to find out the
> best stream on which to window a particular stream.
>
> Suppose I have a Spark DStream with a batch interval of 10 sec, and other
> streams windowed on base streams as below:
>
> Stream     Window   Sliding   Windowed On
> StreamA    30       10        Base Stream
> StreamB    20       20        Base Stream
> StreamC    90       20        ?
>
>
>
> Now, should I base StreamC on StreamA, since its window is a multiple of
> StreamA's, or base it on StreamB, since it has a higher sliding interval,
> the same as StreamC's? Which would be the better choice?
>
>
> Or is it the same as windowing on the base stream? How does it basically work?
>
>
> --
>
>
> Regards
> Hemalatha
>



-- 


Regards
Hemalatha


How does chaining of Windowed Dstreams work?

2016-09-04 Thread Hemalatha A
Hello,

I have a set of DStreams, and I'm performing some computation on each of
them; each DStream is windowed on another stream derived from the base
stream, according to the order of window intervals. I want to find out the
best stream on which to window a particular stream.

Suppose I have a Spark DStream with a batch interval of 10 sec, and other
streams windowed on base streams as below:

Stream     Window   Sliding   Windowed On
StreamA    30       10        Base Stream
StreamB    20       20        Base Stream
StreamC    90       20        ?



Now, should I base StreamC on StreamA, since its window is a multiple of
StreamA's, or base it on StreamB, since it has a higher sliding interval,
the same as StreamC's? Which would be the better choice?


Or is it the same as windowing on the base stream? How does it basically work?
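
For what it's worth, a minimal sketch of the two ways StreamC could be built,
assuming baseStream is the 10-sec-batch DStream from the table above (the
element type and variable names are made up for illustration):

import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.dstream.DStream

def buildWindows(baseStream: DStream[String]): Unit = {
  val streamA = baseStream.window(Seconds(30), Seconds(10))
  val streamB = baseStream.window(Seconds(20), Seconds(20))

  // Option 1: window StreamC directly on the base stream.
  val streamC1 = baseStream.window(Seconds(90), Seconds(20))

  // Option 2: window StreamC on an already-windowed stream such as StreamA.
  // window() requires both durations to be multiples of the parent stream's
  // slide interval, and here the outer window operates on StreamA's
  // overlapping 30s windows, so records can show up more than once.
  val streamC2 = streamA.window(Seconds(90), Seconds(20))
}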


-- 


Regards
Hemalatha


Exceptions during an action don't fail the Spark streaming batch in yarn-client mode

2016-08-07 Thread Hemalatha A
Hello,

I am seeing multiple exceptions in the logs during an action, but none of
them fails the Spark streaming batch in yarn-client mode, whereas in
yarn-cluster mode the same exception is thrown and the application ends.

I am trying to save a DataFrame to Cassandra, which fails with an error due
to, say, a wrong password. The job goes to the failed state, throwing the
exception below in the Jobs tab of the Spark UI, but in the Streaming tab
the corresponding batch remains in the active state forever. It doesn't fail
the streaming batch in yarn-client mode. In yarn-cluster mode, on the other
hand, the same job throws the same error and ends the application.

Why is there this difference in behaviour between the two modes? Why does
yarn-client mode behave this way?

*Exception seen in both modes:*

16/08/04 08:04:43 ERROR
org.apache.spark.streaming.scheduler.JobScheduler: Error running job
streaming job 147029788 ms.0
java.io.IOException: Failed to open native connection to Cassandra at
{172.x.x.x}:9042
at 
com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:162)
at 
com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:148)
at 
com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:148)
at 
com.datastax.spark.connector.cql.RefCountedCache.createNewValueAndKeys(RefCountedCache.scala:31)
at 
com.datastax.spark.connector.cql.RefCountedCache.acquire(RefCountedCache.scala:56)
at 
com.datastax.spark.connector.cql.CassandraConnector.openSession(CassandraConnector.scala:81)
at 
com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:109)
at 
com.datastax.spark.connector.rdd.partitioner.CassandraRDDPartitioner$.getTokenFactory(CassandraRDDPartitioner.scala:184)
at 
org.apache.spark.sql.cassandra.CassandraSourceRelation$.apply(CassandraSourceRelation.scala:267)
at 
org.apache.spark.sql.cassandra.DefaultSource.createRelation(DefaultSource.scala:84)
at 
org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:170)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:146)
at 
boccstreamingall$$anonfun$process_kv_text_stream$1.apply(bocc_spark_all.scala:249)
at 
boccstreamingall$$anonfun$process_kv_text_stream$1.apply(bocc_spark_all.scala:233)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:631)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:631)
at 
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:42)
at 
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:40)
at 
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:40)
at 
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
at 
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:40)
at 
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
at 
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:34)
at 
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:207)
at 
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:207)
at 
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:207)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)

at 
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:206)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: com.datastax.driver.core.exceptions.AuthenticationException:
Authentication error on host /172.x.x.x:9042: Username and/or password
are incorrect
at com.datastax.driver.core.Connection$8.apply(Connection.java:376)
at com.datastax.driver.core.Connection$8.apply(Connection.java:346)
at 
shadeio.common.util.concurrent.Futures$ChainingListenableFuture.run(Futures.java:861)
at 
shadeio.common.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:297)
at 
shadeio.common.util.concurrent.ExecutionList.executeListener(ExecutionList.java:156)
at 
shadeio.common.util.concurrent.ExecutionList.execut
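
For context, the write that produces a trace like the one above is roughly of
the following shape (a hedged sketch; the keyspace, table, and column names
are made up):

import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.dstream.DStream

// Sketch: write each micro-batch to Cassandra through the
// spark-cassandra-connector data source. If the connection cannot be opened
// (e.g. bad credentials), save() throws inside the foreachRDD job, which is
// what shows up in the stack trace.
def saveToCassandra(stream: DStream[(String, String)], sqlContext: SQLContext): Unit = {
  stream.foreachRDD { rdd =>
    import sqlContext.implicits._
    val df = rdd.toDF("key", "value")
    df.write
      .format("org.apache.spark.sql.cassandra")
      .options(Map("keyspace" -> "my_keyspace", "table" -> "my_table"))
      .mode("append")
      .save()
  }
}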

Re: Fail a batch in Spark Streaming forcefully based on business rules

2016-07-28 Thread Hemalatha A
Another use case for this: if Exception A is caught, I should just print it
and ignore it, but if Exception B occurs, I have to end the batch, fail it,
and stop processing it.
Is it possible to achieve this? Any hints on this would be appreciated.
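
In case it helps frame the question, a minimal sketch of what I mean,
assuming the per-batch work happens inside foreachRDD; the exception classes
and the 100-message threshold are made up for illustration:

import org.apache.spark.streaming.dstream.DStream

// Stand-ins for "Exception A" (ignorable) and "Exception B" (fatal for the batch).
class RecoverableError(msg: String) extends Exception(msg)
class FatalBatchError(msg: String) extends Exception(msg)

def process(stream: DStream[String]): Unit = {
  stream.foreachRDD { rdd =>
    try {
      // Business logic for the batch, e.g. validations and writes.
      if (rdd.count() < 100) throw new FatalBatchError("fewer than 100 messages in the batch")
    } catch {
      case e: RecoverableError =>
        // Exception A: just print it and let the batch succeed.
        println(s"Ignoring recoverable error: ${e.getMessage}")
      case e: FatalBatchError =>
        // Exception B: rethrow so the job for this batch fails.
        throw e
    }
  }
}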


On Wed, Jul 27, 2016 at 10:42 AM, Hemalatha A <
hemalatha.amru...@googlemail.com> wrote:

> Hello,
>
> I have a use case wherein I have to fail certain batches in my streaming
> application, based on my application-specific business rules.
> Ex: if in a batch of 2 seconds I don't receive 100 messages, I should fail
> the batch and move on.
>
> How to achieve this behavior?
>
> --
>
>
> Regards
> Hemalatha
>



-- 


Regards
Hemalatha


Fail a batch in Spark Streaming forcefully based on business rules

2016-07-26 Thread Hemalatha A
Hello,

I have a use case wherein I have to fail certain batches in my streaming
application, based on my application-specific business rules.
Ex: if in a batch of 2 seconds I don't receive 100 messages, I should fail
the batch and move on.

How to achieve this behavior?

-- 


Regards
Hemalatha


How to resolve Scheduling delay in Spark streaming applications?

2016-05-10 Thread Hemalatha A
Hello,

We are facing a large scheduling delay in our Spark streaming application,
and I am not sure how to debug why the delay is happening. We have done all
the tuning possible on the Spark side.

Can someone advise how to debug the cause of the delay, and share some tips
for resolving it, please?
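
A minimal sketch of measuring the per-batch scheduling and processing delays
with a StreamingListener, which at least makes the delay visible outside the
UI (assumes ssc is the StreamingContext):

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

// Logs the scheduling delay and processing time of every completed batch.
def addDelayLogger(ssc: StreamingContext): Unit = {
  ssc.addStreamingListener(new StreamingListener {
    override def onBatchCompleted(batch: StreamingListenerBatchCompleted): Unit = {
      val info = batch.batchInfo
      println(s"Batch ${info.batchTime}: " +
        s"schedulingDelay=${info.schedulingDelay.getOrElse(-1L)} ms, " +
        s"processingTime=${info.processingDelay.getOrElse(-1L)} ms")
    }
  })
}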

-- 


Regards
Hemalatha


Spark streaming batch time displayed is not current system time but it is processing current messages

2016-04-16 Thread Hemalatha A
Can anyone please help me debug this issue?


On Thu, Apr 14, 2016 at 12:24 PM, Hemalatha A <
hemalatha.amru...@googlemail.com> wrote:

> Hi,
>
> I am facing a problem in Spark streaming.
>


-------------------------------------------
Time: 1460823006000 ms
-------------------------------------------

-------------------------------------------
Time: 1460823008000 ms
-------------------------------------------




> The time displayed in the Spark streaming console, as above, is 4 days
> prior, i.e., April 10th, which is not the current system time of the
> cluster, but the job is processing current messages pushed right now, on
> April 14th.
>
> Can anyone please advise what time Spark streaming displays? Also, when
> there is a scheduling delay of, say, 8 hours, what time does Spark
> display: the current time, or 8 hours behind?
>
> --
>
>
> Regards
> Hemalatha
>



-- 


Regards
Hemalatha


Spark streaming time displayed is not current system time but it is processing current messages

2016-04-13 Thread Hemalatha A
Hi,

I am facing a problem in Spark streaming. The time displayed in the Spark
streaming console is 4 days prior, i.e., April 10th, which is not the
current system time of the cluster, but the job is processing current
messages pushed right now, on April 14th.

Can anyone please advise what time Spark streaming displays? Also, when
there is a scheduling delay of, say, 8 hours, what time does Spark display:
the current time, or 8 hours behind?
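
One quick sanity check: the value shown after "Time:" in the console output
quoted in the reply above is an epoch timestamp in milliseconds, so it can be
converted to a wall-clock date to see exactly which batch time the scheduler
is working on:

import java.util.Date

// The batch time printed as "Time: 1460823006000 ms" is epoch milliseconds.
val batchTimeMs = 1460823006000L
println(new Date(batchTimeMs))  // prints the batch time in the JVM's timezone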

-- 


Regards
Hemalatha


How is the application jar copied to worker machines?

2016-04-10 Thread Hemalatha A
Hello,

I want to know how, on doing spark-submit, the application jar is copied to
the worker machines. Who does the copying of the jars?

Similarly, who ships the DAG from the driver to the executors?

-- 


Regards
Hemalatha


[no subject]

2016-04-02 Thread Hemalatha A
Hello,

As per the Spark programming guide, "we should have 2-4 partitions for each
CPU in your cluster". In this case, how does 1 CPU core process 2-4
partitions at the same time?
Link: http://spark.apache.org/docs/latest/programming-guide.html (under the
RDD section)

Does it context-switch between tasks or run them in parallel? If it
context-switches, how is that efficient compared to a 1:1 mapping of
partitions to cores?

PS: If we are using the Kafka direct API, in which Kafka partitions = RDD
partitions, does that mean we should use 40 Kafka partitions for 10 CPU
cores?
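
To make the arithmetic in the PS concrete, a small sketch (the core count is
hypothetical, and repartition() adds a shuffle, so whether it is worth it
depends on the job):

// Hypothetical: 10 executor cores, aiming for 2-4 partitions per core.
val cores = 10
val minPartitions = 2 * cores  // 20
val maxPartitions = 4 * cores  // 40

// With the Kafka direct API, RDD partitions == Kafka topic partitions, so one
// way to hit the target is to create the topic with 20-40 partitions.
// Alternatively, an existing stream can be reshuffled (at the cost of a shuffle):
// val widened = kafkaStream.repartition(maxPartitions)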

-- 


Regards
Hemalatha


Side effects of using a var inside a class object in an RDD

2016-02-15 Thread Hemalatha A
Hello,

I want to know the cons and performance impacts of using a var inside a
class whose objects are stored in an RDD.


Here is an example:

Animal is a huge class with a large number of val-type fields (approximately
>600), but we frequently have to update the age (just one field) after some
computation. What is the best way to do it?

class Animal(age: Int, name: String) {
  var animalAge: Int = age        // the one field that gets updated
  val animalName: String = name   // immutable field
  // ... several hundred more vals
}


val animalRdd = sc.parallelize(List(new Animal(1, "XYZ"), new Animal(2, "ABC")))
...
...
animalRdd.map { ani =>
  if (ani.yearChange()) ani.animalAge += 1  // yearChange() assumed to be defined on Animal
  ani
}


Is it advisable to use a var in this case? Or can I do
ani.copy(animalAge = 2), which will reallocate the memory for the whole
animal? Please advise which is the best way to handle such cases.
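
For comparison, a sketch of the copy-based alternative, assuming Animal can
be turned into a case class; only the two fields from the example are shown,
yearChange() is a placeholder, and sc is an existing SparkContext as in the
snippet above:

// Immutable alternative: a case class plus copy() instead of mutating a var.
case class Animal(animalAge: Int, animalName: String) {
  def yearChange(): Boolean = true  // placeholder for the real check
}

val animalRdd = sc.parallelize(List(Animal(1, "XYZ"), Animal(2, "ABC")))

// copy() allocates a new Animal per updated record. With ~600 fields that is
// mostly copying references, but it is still one new object per element.
val updated = animalRdd.map { ani =>
  if (ani.yearChange()) ani.copy(animalAge = ani.animalAge + 1) else ani
}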



Regards
Hemalatha