Re: How to use ManualClock with Spark streaming
Any updates on how I can use ManualClock other than editing the Spark source code?

On Wed, Mar 1, 2017 at 10:19 AM, Hemalatha A <hemalatha.amru...@googlemail.com> wrote:
> It is certainly possible through a hack. I was referring to the post below, where TD says it is possible through a hack. I wanted to know if there is any way other than editing the Spark source code.
>
> https://groups.google.com/forum/#!searchin/spark-users/manualclock%7Csort:relevance/spark-users/ES8X1l_xn5s/6PvGGRDfgnMJ
>
> On Wed, Mar 1, 2017 at 7:09 AM, Saisai Shao wrote:
>> I don't think using ManualClock is the right way to fix your problem in Spark Streaming.
>>
>> ManualClock in Spark is mainly used for unit tests; the time has to be advanced manually to make the unit test work. That usage looks different from the scenario you mentioned.
>>
>> Thanks
>> Jerry
>>
>> On Tue, Feb 28, 2017 at 10:53 PM, Hemalatha A <hemalatha.amru...@googlemail.com> wrote:
>>> Hi,
>>>
>>> I am running a streaming application that reads data from Kafka and performs window operations on it. I have a use case where all incoming events have a fixed latency of 10s, which means data belonging to minute 10:00:00 will arrive 10s late, at 10:00:10.
>>>
>>> I want to set the Spark clock to ManualClock and set the time 10s behind, so that the batch calculation triggers at 10:00:10, by which time all the events for the previous minute have arrived.
>>>
>>> But I see that "spark.streaming.clock" is hardcoded to "org.apache.spark.util.SystemClock" in the code.
>>>
>>> Is there a way to easily hack this property to use ManualClock?
>>>
>>> --
>>> Regards
>>> Hemalatha
>>
>
> --
> Regards
> Hemalatha

--
Regards
Hemalatha
How to use ManualClock with Spark streaming
Hi,

I am running a streaming application that reads data from Kafka and performs window operations on it. I have a use case where all incoming events have a fixed latency of 10s, which means data belonging to minute 10:00:00 will arrive 10s late, at 10:00:10.

I want to set the Spark clock to ManualClock and set the time 10s behind, so that the batch calculation triggers at 10:00:10, by which time all the events for the previous minute have arrived.

But I see that "spark.streaming.clock" is hardcoded to "org.apache.spark.util.SystemClock" in the code.

Is there a way to easily hack this property to use ManualClock?

--
Regards
Hemalatha
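For reference, a minimal sketch of the hack being discussed: pointing "spark.streaming.clock" at a different clock class via SparkConf. This assumes the property is read by reflection when the StreamingContext starts, which may not hold on every Spark version (ManualClock is private[spark] in recent releases), so treat it as an experiment rather than a supported API.

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object ManualClockExperiment {
      def main(args: Array[String]): Unit = {
        // Assumption: the streaming scheduler instantiates the clock class named
        // here by reflection; ManualClock is private[spark], so this may fail at
        // startup depending on the Spark version in use.
        val conf = new SparkConf()
          .setAppName("manual-clock-experiment")
          .set("spark.streaming.clock", "org.apache.spark.util.ManualClock")

        val ssc = new StreamingContext(conf, Seconds(10))
        // ... define the Kafka input stream and windowed computations here ...
        ssc.start()
        ssc.awaitTermination()
      }
    }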
Re: How does chaining of windowed DStreams work?
Hello,

Can anyone please answer the question below and help me understand the windowing operations?

On Sun, Sep 4, 2016 at 4:42 PM, Hemalatha A <hemalatha.amru...@googlemail.com> wrote:
> Hello,
>
> I have a set of DStreams, each windowed on another stream derived from the base stream according to the order of their window intervals, and I perform some computation on each of them. I want to find out the best stream on which to window a particular stream.
>
> Suppose I have a Spark DStream with a batch interval of 10s, and the other streams are windowed on the base stream as below:
>
> Stream  | Window | Sliding | Windowed On
> StreamA | 30     | 10      | Base Stream
> StreamB | 20     | 20      | Base Stream
> StreamC | 90     | 20      | ?
>
> Now, should I base StreamC on StreamA, since its window is a multiple of StreamA's, or base it on StreamB, since it has a higher sliding interval, equal to StreamC's? Which would be the better choice?
>
> Or is it the same as windowing on the base stream? How does it basically work?
>
> --
> Regards
> Hemalatha

--
Regards
Hemalatha
How does chaining of windowed DStreams work?
Hello,

I have a set of DStreams, each windowed on another stream derived from the base stream according to the order of their window intervals, and I perform some computation on each of them. I want to find out the best stream on which to window a particular stream.

Suppose I have a Spark DStream with a batch interval of 10s, and the other streams are windowed on the base stream as below:

Stream  | Window | Sliding | Windowed On
StreamA | 30     | 10      | Base Stream
StreamB | 20     | 20      | Base Stream
StreamC | 90     | 20      | ?

Now, should I base StreamC on StreamA, since its window is a multiple of StreamA's, or base it on StreamB, since it has a higher sliding interval, equal to StreamC's? Which would be the better choice?

Or is it the same as windowing on the base stream? How does it basically work?

--
Regards
Hemalatha
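A minimal sketch of the chaining options, using a socket source and local master in place of the real setup (both are assumptions). The key constraints, as far as I understand them: a chained window's window and slide durations must be multiples of the parent stream's slide duration, and windowing an already-windowed stream unions the parent's overlapping RDDs, so records can be counted more than once.

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object WindowChainingSketch {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("window-chaining").setMaster("local[2]")
        val ssc = new StreamingContext(conf, Seconds(10))     // base batch interval: 10s

        // Stand-in for the real base stream.
        val base = ssc.socketTextStream("localhost", 9999)

        val streamA = base.window(Seconds(30), Seconds(10))   // window 30s, slide 10s
        val streamB = base.window(Seconds(20), Seconds(20))   // window 20s, slide 20s

        // Window/slide of a chained window must be multiples of the parent's slide:
        //   - on the base stream (slide 10s): 90s/20s is valid;
        //   - on StreamA (slide 10s): 90s/20s is also valid, but StreamA's windows
        //     overlap (each batch appears in 3 of its RDDs), so re-windowing it
        //     would union records multiple times;
        //   - on StreamB (slide 20s): a 90s window is not a multiple of 20s and is
        //     rejected at graph construction time.
        // So windowing StreamC directly on the base stream is the straightforward choice.
        val streamC = base.window(Seconds(90), Seconds(20))

        streamC.count().print()
        ssc.start()
        ssc.awaitTermination()
      }
    }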
An exception during an action doesn't fail the Spark streaming batch in yarn-client mode
Hello,

I am seeing multiple exceptions in the logs during an action, but none of them fails the Spark streaming batch in yarn-client mode, whereas the same exception is thrown in yarn-cluster mode and the application ends.

I am trying to save a DataFrame to Cassandra, which fails due to, say, a wrong password. The job goes to failed state, throwing the exception below in the Jobs tab of the Spark UI, but in the Streaming tab the corresponding batch remains in active state forever. It doesn't fail the streaming batch in yarn-client mode. Whereas the same works fine in yarn-cluster mode: it throws the same error and ends the application.

Why this difference in behaviour between the two modes? Why does yarn-client mode behave this way?

*Exception seen in both modes:*

16/08/04 08:04:43 ERROR org.apache.spark.streaming.scheduler.JobScheduler: Error running job streaming job 147029788 ms.0
java.io.IOException: Failed to open native connection to Cassandra at {172.x.x.x}:9042
        at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:162)
        at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:148)
        at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:148)
        at com.datastax.spark.connector.cql.RefCountedCache.createNewValueAndKeys(RefCountedCache.scala:31)
        at com.datastax.spark.connector.cql.RefCountedCache.acquire(RefCountedCache.scala:56)
        at com.datastax.spark.connector.cql.CassandraConnector.openSession(CassandraConnector.scala:81)
        at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:109)
        at com.datastax.spark.connector.rdd.partitioner.CassandraRDDPartitioner$.getTokenFactory(CassandraRDDPartitioner.scala:184)
        at org.apache.spark.sql.cassandra.CassandraSourceRelation$.apply(CassandraSourceRelation.scala:267)
        at org.apache.spark.sql.cassandra.DefaultSource.createRelation(DefaultSource.scala:84)
        at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:170)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:146)
        at boccstreamingall$$anonfun$process_kv_text_stream$1.apply(bocc_spark_all.scala:249)
        at boccstreamingall$$anonfun$process_kv_text_stream$1.apply(bocc_spark_all.scala:233)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:631)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:631)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:42)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:40)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:40)
        at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:40)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
        at scala.util.Try$.apply(Try.scala:161)
        at org.apache.spark.streaming.scheduler.Job.run(Job.scala:34)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:207)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:207)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:207)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:206)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: com.datastax.driver.core.exceptions.AuthenticationException: Authentication error on host /172.x.x.x:9042: Username and/or password are incorrect
        at com.datastax.driver.core.Connection$8.apply(Connection.java:376)
        at com.datastax.driver.core.Connection$8.apply(Connection.java:346)
        at shadeio.common.util.concurrent.Futures$ChainingListenableFuture.run(Futures.java:861)
        at shadeio.common.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:297)
        at shadeio.common.util.concurrent.ExecutionList.executeListener(ExecutionList.java:156)
        at shadeio.common.util.concurrent.ExecutionList.execut
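For context, a hedged sketch of roughly what the failing output operation looks like, based on the stack trace (the DStream element type, keyspace, and table names are assumptions, not the original job). Wrapping the write makes the choice explicit: rethrowing fails the batch job, while swallowing the exception would let the batch be marked complete even though nothing was written.

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{SQLContext, SaveMode}
    import org.apache.spark.streaming.dstream.DStream

    // Assumed shape of the output operation: a DataFrame built from each batch is
    // written to Cassandra inside foreachRDD.
    def saveToCassandra(stream: DStream[(String, String)], sqlContext: SQLContext): Unit = {
      stream.foreachRDD { (rdd: RDD[(String, String)]) =>
        import sqlContext.implicits._
        val df = rdd.toDF("key", "value")
        try {
          df.write
            .format("org.apache.spark.sql.cassandra")
            .options(Map("keyspace" -> "my_ks", "table" -> "my_table")) // assumed names
            .mode(SaveMode.Append)
            .save()
        } catch {
          case e: java.io.IOException =>
            // Log and rethrow: catching and ignoring the exception here would let the
            // batch complete even though the write to Cassandra failed.
            println(s"Cassandra write failed: ${e.getMessage}")
            throw e
        }
      }
    }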
Re: Fail a batch in Spark Streaming forcefully based on business rules
Another use case for why I need to do this: if Exception A is caught, I should just print it and ignore it, but if Exception B occurs, I have to end the batch, fail it, and stop processing it. Is it possible to achieve this? Any hints on this please.

On Wed, Jul 27, 2016 at 10:42 AM, Hemalatha A <hemalatha.amru...@googlemail.com> wrote:
> Hello,
>
> I have a use case wherein I have to fail certain batches in my streaming application, based on my application-specific business rules. For example: if in a batch of 2 seconds I don't receive 100 messages, I should fail the batch and move on.
>
> How do I achieve this behaviour?
>
> --
> Regards
> Hemalatha

--
Regards
Hemalatha
Fail a batch in Spark Streaming forcefully based on business rules
Hello,

I have a use case wherein I have to fail certain batches in my streaming application, based on my application-specific business rules. For example: if in a batch of 2 seconds I don't receive 100 messages, I should fail the batch and move on.

How do I achieve this behaviour?

--
Regards
Hemalatha
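One possible sketch, assuming the rules can be evaluated inside the output operation (the exception classes and the 100-message threshold below are placeholders): throwing from foreachRDD marks that batch's job as failed, while catching and logging lets it complete. Note that, depending on the Spark version and deploy mode, a failed streaming job may also stop the whole context when awaitTermination() rethrows the error, so whether processing really "moves on" to the next batch needs to be verified.

    import org.apache.spark.streaming.dstream.DStream

    // Illustrative stand-ins for "Exception A" and "Exception B".
    class IgnorableException(msg: String) extends RuntimeException(msg)
    class FatalBatchException(msg: String) extends RuntimeException(msg)

    def applyBusinessRules(stream: DStream[String]): Unit = {
      stream.foreachRDD { rdd =>
        try {
          // Rule from the original post: fewer than 100 messages in a 2s batch => fail it.
          val count = rdd.count()
          if (count < 100) {
            throw new FatalBatchException(s"Only $count messages in this batch, expected at least 100")
          }
          // ... rest of the per-batch processing, which may throw either exception ...
        } catch {
          case e: IgnorableException  => println(s"Ignoring: ${e.getMessage}") // batch completes
          case e: FatalBatchException => throw e                               // batch job fails
        }
      }
    }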
How to resolve Scheduling delay in Spark streaming applications?
Hello,

We are facing a large scheduling delay in our Spark streaming application and are not sure how to debug why the delay is happening. We have already applied all the tuning possible on the Spark side. Can someone advise how to debug the cause of the delay, and share some tips for resolving it, please?

--
Regards
Hemalatha
Spark streaming batch time displayed is not current system time but it is processing current messages
Can anyone help me debug this issue, please?

On Thu, Apr 14, 2016 at 12:24 PM, Hemalatha A <hemalatha.amru...@googlemail.com> wrote:
> Hi,
>
> I am facing a problem in Spark streaming. The console prints batch headers such as:
>
> ---
> Time: 1460823006000 ms
> ---
> ---
> Time: 1460823008000 ms
> ---
>
> The time displayed in the Spark streaming console, as above, is 4 days earlier, i.e. April 10th, which is not the current system time of the cluster, but the job is processing current messages that are being pushed right now, on April 14th.
>
> Can anyone please advise what time Spark streaming displays? Also, when there is a scheduling delay of, say, 8 hours, what time does Spark display: the current time or hours behind?
>
> --
> Regards
> Hemalatha

--
Regards
Hemalatha
Spark streaming time displayed is not current system time but it is processing current messages
Hi,

I am facing a problem in Spark streaming. The time displayed in the Spark streaming console is 4 days earlier, i.e. April 10th, which is not the current system time of the cluster, but the job is processing current messages that are being pushed right now, on April 14th.

Can anyone please advise what time Spark streaming displays? Also, when there is a scheduling delay of, say, 8 hours, what time does Spark display: the current time or hours behind?

--
Regards
Hemalatha
How is the application jar copied to worker machines?
Hello,

I want to know: when doing spark-submit, how is the application jar copied to the worker machines? Who does the copying of the jars? Similarly, who copies the DAG from the driver to the executors?

--
Regards
Hemalatha
[no subject]
Hello,

As per the Spark programming guide, "we should have 2-4 partitions for each CPU in your cluster." In this case, how does 1 CPU core process 2-4 partitions at the same time?

Link - http://spark.apache.org/docs/latest/programming-guide.html (under the RDD section)

Does it context-switch between tasks or run them in parallel? If it does context switching, how is that efficient compared to a 1:1 ratio of partitions to cores?

PS: We are using the Kafka direct API, in which Kafka partitions = RDD partitions. Does that mean we should use 40 Kafka partitions for 10 CPU cores?

--
Regards
Hemalatha
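A small sketch of what the guideline means in practice (the local master and the numbers are illustrative): by default each core runs one task at a time, so with 2-4 partitions per core the extra partitions simply queue as pending tasks rather than being context-switched within a core; the benefit is smaller tasks and more even scheduling. By that reading, 40 Kafka partitions for 10 cores would be within the suggested range, though it is worth benchmarking.

    import org.apache.spark.{SparkConf, SparkContext}

    object PartitionsPerCore {
      def main(args: Array[String]): Unit = {
        val cores = 10 // assumed number of cores available to the application
        val conf = new SparkConf().setAppName("partitions-per-core").setMaster(s"local[$cores]")
        val sc = new SparkContext(conf)

        // 3 partitions per core, inside the suggested 2-4 range: 30 tasks are created,
        // and each core works through its tasks one after another, not concurrently.
        val data = sc.parallelize(1 to 1000000, numSlices = cores * 3)
        println(s"partitions = ${data.getNumPartitions}, sum = ${data.map(_.toLong).sum()}")

        sc.stop()
      }
    }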
Side effects of using a var inside a class object in an RDD
Hello,

I want to know what the cons and performance impacts are of using a var inside a class object in an RDD. Here is an example: Animal is a huge class with a large number of val-type fields (approx. >600), but we will frequently have to update Age (just one field) after some computation. What is the best way to do it?

class Animal(age: Int, name: String) {
  var animalAge: Int = age
  val animalName: String = name
  // ... more vals ...
}

val animalRdd = sc.parallelize(List(new Animal(1, "XYZ"), new Animal(2, "ABC")))
...
animalRdd.map(ani => {
  if (ani.yearChange()) ani.animalAge += 1
  ani
})

Is it advisable to use a var in this case? Or can I do ani.copy(animalAge = 2) (which would require Animal to be a case class), reallocating the memory altogether for the animal? Please advise which is the best way to handle such cases.

Regards
Hemalatha
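A hedged sketch contrasting the two approaches (the case class below is a stand-in for the real Animal, and yearChange is a placeholder). copy on a case class does allocate a new object, but the untouched fields simply reference the existing values, so it is much cheaper than rebuilding everything even with hundreds of vals. Inside RDD transformations the immutable style is generally preferred, since mutating elements in place can give surprising results when the RDD is cached or a partition is recomputed.

    import org.apache.spark.{SparkConf, SparkContext}

    // Case-class stand-in for the real Animal; `copy` builds a new instance whose
    // untouched fields simply point at the existing values.
    case class Animal(animalAge: Int, animalName: String /*, ... many more vals ... */) {
      def yearChange(): Boolean = true // placeholder for the real check
    }

    object AnimalAgeUpdate {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("animal-age").setMaster("local[2]"))

        val animalRdd = sc.parallelize(List(Animal(1, "XYZ"), Animal(2, "ABC")))

        // Immutable style: produce a copy with only the age changed instead of
        // mutating the element in place.
        val aged = animalRdd.map { ani =>
          if (ani.yearChange()) ani.copy(animalAge = ani.animalAge + 1) else ani
        }

        aged.collect().foreach(println)
        sc.stop()
      }
    }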