Re: Kafka Spark Structured Streaming out of memory issue

2020-08-13 Thread Srinivas V
It depends on how much memory is available and how much data you are
processing. Please provide data size and cluster details to help.

On Fri, Aug 14, 2020 at 12:54 AM km.santanu  wrote:

> Hi
> I am using Kafka with stateless Structured Streaming. I have enabled a
> watermark of 1 hour. After running for about 2 hours, my job terminates
> automatically. Checkpointing has been enabled.
> I am computing an average over the input data.
> Can you please suggest how to avoid the out-of-memory error?
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>
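
A minimal sketch of the setup described above (a 1-hour watermark with an
average over Kafka input), with maxOffsetsPerTrigger added to bound how much
data each micro-batch pulls; the broker address, topic, schema and sink are
assumptions for illustration only:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder.appName("kafka-avg-sketch").getOrCreate()
import spark.implicits._

val events = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")   // placeholder broker
  .option("subscribe", "events")                       // placeholder topic
  .option("maxOffsetsPerTrigger", 100000L)             // cap records per micro-batch
  .load()
  .selectExpr("CAST(value AS STRING) AS value", "timestamp")
  .select($"timestamp", $"value".cast("double").as("reading"))

// The 1-hour watermark bounds how long aggregation state is retained.
val averages = events
  .withWatermark("timestamp", "1 hour")
  .groupBy(window($"timestamp", "10 minutes"))
  .agg(avg($"reading").as("avg_reading"))

averages.writeStream
  .outputMode("update")
  .format("console")
  .option("checkpointLocation", "/tmp/kafka-avg-checkpoint")
  .start()
```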


Re: Metrics Problem

2020-06-30 Thread Srinivas V
Then it should be a permission issue. What kind of cluster is it, and which
user is running it? Does that user have HDFS permissions to access the folder
where the jar file is?

On Mon, Jun 29, 2020 at 1:17 AM Bryan Jeffrey 
wrote:

> Srinivas,
>
> Interestingly, I did have the metrics jar packaged as part of my main jar.
> It worked well both on driver and locally, but not on executors.
>
> Regards,
>
> Bryan Jeffrey
>
> Get Outlook for Android <https://aka.ms/ghei36>
>
> ------
> *From:* Srinivas V 
> *Sent:* Saturday, June 27, 2020 1:23:24 AM
>
> *To:* Bryan Jeffrey 
> *Cc:* user 
> *Subject:* Re: Metrics Problem
>
> One option is to build your main jar with the metrics jar included, like a
> fat jar.
>
> On Sat, Jun 27, 2020 at 8:04 AM Bryan Jeffrey 
> wrote:
>
> Srinivas,
>
> Thanks for the insight. I had not considered a dependency issue, as the
> metrics jar works well when applied on the driver. Perhaps my main jar
> includes the Hadoop dependencies but the metrics jar does not?
>
> I am confused, as the only Hadoop dependency also exists for the built-in
> metrics providers, which appear to work.
>
> Regards,
>
> Bryan
>
> Get Outlook for Android <https://aka.ms/ghei36>
>
> --
> *From:* Srinivas V 
> *Sent:* Friday, June 26, 2020 9:47:52 PM
> *To:* Bryan Jeffrey 
> *Cc:* user 
> *Subject:* Re: Metrics Problem
>
> It should work when you give an HDFS path, as long as your jar exists at
> that path.
> I think your error is more of a security issue (Kerberos) or missing Hadoop
> dependencies; your error says:
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation
>
> On Fri, Jun 26, 2020 at 8:44 PM Bryan Jeffrey 
> wrote:
>
> It may be helpful to note that I'm running in Yarn cluster mode.  My goal
> is to avoid having to manually distribute the JAR to all of the various
> nodes as this makes versioning deployments difficult.
>
> On Thu, Jun 25, 2020 at 5:32 PM Bryan Jeffrey 
> wrote:
>
> Hello.
>
> I am running Spark 2.4.4. I have implemented a custom metrics producer. It
> works well when I run locally, or specify the metrics producer only for the
> driver. When I ask for executor metrics I run into ClassNotFoundExceptions.
>
> *Is it possible to pass a metrics JAR via --jars?  If so what am I
> missing?*
>
> Deploy driver stats via:
> --jars hdfs:///custommetricsprovider.jar
> --conf
> spark.metrics.conf.driver.sink.metrics.class=org.apache.spark.mycustommetricssink
>
> However, when I pass the JAR with the metrics provider to executors via:
> --jars hdfs:///custommetricsprovider.jar
> --conf
> spark.metrics.conf.executor.sink.metrics.class=org.apache.spark.mycustommetricssink
>
> I get ClassNotFoundException:
>
> 20/06/25 21:19:35 ERROR MetricsSystem: Sink class
> org.apache.spark.custommetricssink cannot be instantiated
> Exception in thread "main" java.lang.reflect.UndeclaredThrowableException
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1748)
> at
> org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:64)
> at
> org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:188)
> at
> org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:281)
> at
> org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala)
> Caused by: java.lang.ClassNotFoundException:
> org.apache.spark.custommetricssink
> at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:348)
> at org.apache.spark.util.Utils$.classForName(Utils.scala:238)
> at
> org.apache.spark.metrics.MetricsSystem$$anonfun$registerSinks$1.apply(MetricsSystem.scala:198)
> at
> org.apache.spark.metrics.MetricsSystem$$anonfun$registerSinks$1.apply(MetricsSystem.scala:194)
> at
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
> at
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
> at
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
> at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
> at scala.collection.mutable.HashMap.foreach(HashMap.scala:130)
> at
> org.apache.spark.metrics.MetricsSystem.registerSinks(MetricsSystem.scala:194)
> at org.apache.spark.metrics.Me

Re: Metrics Problem

2020-06-26 Thread Srinivas V
One option is to build your main jar with the metrics jar included, like a
fat jar.

On Sat, Jun 27, 2020 at 8:04 AM Bryan Jeffrey 
wrote:

> Srinivas,
>
> Thanks for the insight. I had not considered a dependency issue, as the
> metrics jar works well when applied on the driver. Perhaps my main jar
> includes the Hadoop dependencies but the metrics jar does not?
>
> I am confused, as the only Hadoop dependency also exists for the built-in
> metrics providers, which appear to work.
>
> Regards,
>
> Bryan
>
> Get Outlook for Android <https://aka.ms/ghei36>
>
> --
> *From:* Srinivas V 
> *Sent:* Friday, June 26, 2020 9:47:52 PM
> *To:* Bryan Jeffrey 
> *Cc:* user 
> *Subject:* Re: Metrics Problem
>
> It should work when you give an HDFS path, as long as your jar exists at
> that path.
> I think your error is more of a security issue (Kerberos) or missing Hadoop
> dependencies; your error says:
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation
>
> On Fri, Jun 26, 2020 at 8:44 PM Bryan Jeffrey 
> wrote:
>
> It may be helpful to note that I'm running in Yarn cluster mode.  My goal
> is to avoid having to manually distribute the JAR to all of the various
> nodes as this makes versioning deployments difficult.
>
> On Thu, Jun 25, 2020 at 5:32 PM Bryan Jeffrey 
> wrote:
>
> Hello.
>
> I am running Spark 2.4.4. I have implemented a custom metrics producer. It
> works well when I run locally, or specify the metrics producer only for the
> driver. When I ask for executor metrics I run into ClassNotFoundExceptions.
>
> *Is it possible to pass a metrics JAR via --jars?  If so what am I
> missing?*
>
> Deploy driver stats via:
> --jars hdfs:///custommetricsprovider.jar
> --conf
> spark.metrics.conf.driver.sink.metrics.class=org.apache.spark.mycustommetricssink
>
> However, when I pass the JAR with the metrics provider to executors via:
> --jars hdfs:///custommetricsprovider.jar
> --conf
> spark.metrics.conf.executor.sink.metrics.class=org.apache.spark.mycustommetricssink
>
> I get ClassNotFoundException:
>
> 20/06/25 21:19:35 ERROR MetricsSystem: Sink class
> org.apache.spark.custommetricssink cannot be instantiated
> Exception in thread "main" java.lang.reflect.UndeclaredThrowableException
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1748)
> at
> org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:64)
> at
> org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:188)
> at
> org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:281)
> at
> org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala)
> Caused by: java.lang.ClassNotFoundException:
> org.apache.spark.custommetricssink
> at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:348)
> at org.apache.spark.util.Utils$.classForName(Utils.scala:238)
> at
> org.apache.spark.metrics.MetricsSystem$$anonfun$registerSinks$1.apply(MetricsSystem.scala:198)
> at
> org.apache.spark.metrics.MetricsSystem$$anonfun$registerSinks$1.apply(MetricsSystem.scala:194)
> at
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
> at
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
> at
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
> at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
> at scala.collection.mutable.HashMap.foreach(HashMap.scala:130)
> at
> org.apache.spark.metrics.MetricsSystem.registerSinks(MetricsSystem.scala:194)
> at org.apache.spark.metrics.MetricsSystem.start(MetricsSystem.scala:102)
> at org.apache.spark.SparkEnv$.create(SparkEnv.scala:365)
> at org.apache.spark.SparkEnv$.createExecutorEnv(SparkEnv.scala:201)
> at
> org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$run$1.apply$mcV$sp(CoarseGrainedExecutorBackend.scala:221)
> at
> org.apache.spark.deploy.SparkHadoopUtil$$anon$2.run(SparkHadoopUtil.scala:65)
> at
> org.apache.spark.deploy.SparkHadoopUtil$$anon$2.run(SparkHadoopUtil.scala:64)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
> ... 4 more
>
> Is it possible to pass a metrics JAR via --jars?  If so what am I missing?
>
> Thank you,
>
> Bryan
>
>
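
A sketch of the fat-jar approach suggested in this thread, using sbt-assembly
(the plugin version, merge strategy and dependency coordinates are assumptions;
the point is simply that the custom sink class ends up inside the single
application jar rather than in a separately shipped jar):

```scala
// project/plugins.sbt  (assumed sbt-assembly version)
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.10")

// build.sbt -- Spark stays "provided"; your code plus the metrics sink get bundled.
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.4" % "provided"

assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case x                             => MergeStrategy.first
}
```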


Re: Metrics Problem

2020-06-26 Thread Srinivas V
It should work when you give an HDFS path, as long as your jar exists at that
path.
I think your error is more of a security issue (Kerberos) or missing Hadoop
dependencies; your error says:
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation

On Fri, Jun 26, 2020 at 8:44 PM Bryan Jeffrey 
wrote:

> It may be helpful to note that I'm running in Yarn cluster mode.  My goal
> is to avoid having to manually distribute the JAR to all of the various
> nodes as this makes versioning deployments difficult.
>
> On Thu, Jun 25, 2020 at 5:32 PM Bryan Jeffrey 
> wrote:
>
>> Hello.
>>
>> I am running Spark 2.4.4. I have implemented a custom metrics producer.
>> It works well when I run locally, or specify the metrics producer only for
>> the driver. When I ask for executor metrics I run into
>> ClassNotFoundExceptions.
>>
>> *Is it possible to pass a metrics JAR via --jars?  If so what am I
>> missing?*
>>
>> Deploy driver stats via:
>> --jars hdfs:///custommetricsprovider.jar
>> --conf
>> spark.metrics.conf.driver.sink.metrics.class=org.apache.spark.mycustommetricssink
>>
>> However, when I pass the JAR with the metrics provider to executors via:
>> --jars hdfs:///custommetricsprovider.jar
>> --conf
>> spark.metrics.conf.executor.sink.metrics.class=org.apache.spark.mycustommetricssink
>>
>> I get ClassNotFoundException:
>>
>> 20/06/25 21:19:35 ERROR MetricsSystem: Sink class
>> org.apache.spark.custommetricssink cannot be instantiated
>> Exception in thread "main" java.lang.reflect.UndeclaredThrowableException
>> at
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1748)
>> at
>> org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:64)
>> at
>> org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:188)
>> at
>> org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:281)
>> at
>> org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala)
>> Caused by: java.lang.ClassNotFoundException:
>> org.apache.spark.custommetricssink
>> at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
>> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
>> at java.lang.Class.forName0(Native Method)
>> at java.lang.Class.forName(Class.java:348)
>> at org.apache.spark.util.Utils$.classForName(Utils.scala:238)
>> at
>> org.apache.spark.metrics.MetricsSystem$$anonfun$registerSinks$1.apply(MetricsSystem.scala:198)
>> at
>> org.apache.spark.metrics.MetricsSystem$$anonfun$registerSinks$1.apply(MetricsSystem.scala:194)
>> at
>> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
>> at
>> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
>> at
>> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
>> at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
>> at scala.collection.mutable.HashMap.foreach(HashMap.scala:130)
>> at
>> org.apache.spark.metrics.MetricsSystem.registerSinks(MetricsSystem.scala:194)
>> at org.apache.spark.metrics.MetricsSystem.start(MetricsSystem.scala:102)
>> at org.apache.spark.SparkEnv$.create(SparkEnv.scala:365)
>> at org.apache.spark.SparkEnv$.createExecutorEnv(SparkEnv.scala:201)
>> at
>> org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$run$1.apply$mcV$sp(CoarseGrainedExecutorBackend.scala:221)
>> at
>> org.apache.spark.deploy.SparkHadoopUtil$$anon$2.run(SparkHadoopUtil.scala:65)
>> at
>> org.apache.spark.deploy.SparkHadoopUtil$$anon$2.run(SparkHadoopUtil.scala:64)
>> at java.security.AccessController.doPrivileged(Native Method)
>> at javax.security.auth.Subject.doAs(Subject.java:422)
>> at
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
>> ... 4 more
>>
>> Is it possible to pass a metrics JAR via --jars?  If so what am I missing?
>>
>> Thank you,
>>
>> Bryan
>>
>


Re: Spark Structured Streaming: “earliest” as “startingOffsets” is not working

2020-06-26 Thread Srinivas V
Cool. Are you not using a watermark?
Also, is it possible to start listening to offsets from a specific date-time?

Regards
Srini

On Sat, Jun 27, 2020 at 6:12 AM Eric Beabes 
wrote:

> My apologies...  After I set the 'maxOffsetsPerTrigger' to a value such as
> '20' it started working. Hopefully this will help someone. Thanks.
>
> On Fri, Jun 26, 2020 at 2:12 PM Something Something <
> mailinglist...@gmail.com> wrote:
>
>> My Spark Structured Streaming job works fine when I set "startingOffsets"
>> to "latest". When I simply change it to "earliest" & specify a new "check
>> point directory", the job doesn't work. The states don't get timed out
>> after 10 minutes.
>>
>> While debugging I noticed that my 'state' logic is indeed getting
>> executed but states just don't time out - as they do when I use "latest".
>> Any reason why?
>>
>> Is this a known issue?
>>
>> *Note*: I've tried this under Spark 2.3 & 2.4
>>
>
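
A minimal sketch of the fix described above: keep "earliest" but cap each
micro-batch with maxOffsetsPerTrigger so the backlog is consumed in bounded
chunks (broker and topic names are placeholders):

```scala
val stream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "events")
  .option("startingOffsets", "earliest")    // replay the full topic history
  .option("maxOffsetsPerTrigger", 20L)      // small per-batch cap, as in the fix above
  .load()
```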


[spark-structured-streaming] [stateful]

2020-06-14 Thread Srinivas V
Does stateful Structured Streaming work on a standalone Spark cluster with a
few nodes? Does it need HDFS? If not, how can it be made to work without HDFS?

Regards
Srini
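
Stateful queries keep their state under the query's checkpoint location, so
that location has to be a fault-tolerant path reachable from the driver and
every executor (HDFS, S3, or a shared mount); it does not have to be HDFS
specifically. A sketch, where statefulCounts is a placeholder streaming
Dataset:

```scala
val query = statefulCounts.writeStream
  .outputMode("update")
  .format("console")
  // Must be visible to all nodes (e.g. hdfs://, s3a://, or a shared NFS path),
  // not a directory local to a single worker.
  .option("checkpointLocation", "hdfs://namenode:8020/checkpoints/stateful-query")
  .start()
```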


Re: [spark-structured-streaming] [kafka] consume topics from multiple Kafka clusters

2020-06-09 Thread Srinivas V
OK, thanks for confirming. I will do it this way.

Regards
Srini

On Tue, Jun 9, 2020 at 11:31 PM Gerard Maas  wrote:

> Hi Srinivas,
>
> Reading from different brokers is possible but you need to connect to each
> Kafka cluster separately.
> Trying to mix connections to two different Kafka clusters in one
> subscriber is not supported. (I'm sure it would give all kinds of weird
> errors.)
> The "kafka.bootstrap.servers" option is there to list the potentially many
> brokers of the *same* Kafka cluster.
>
> The way to address this is to follow German's suggestion and create a
> subscription for each Kafka cluster you are talking to.
>
> val df_cluster1 = spark
>   .read
>   .format("kafka")
>   .option("kafka.bootstrap.servers", "cluster1_host:cluster1_port")
>   .option("subscribe", "topic1, topic2")
>  .load()
>
> val df_cluster2 = spark
>   .read
>   .format("kafka")
>   .option("kafka.bootstrap.servers", "cluster2_host:cluster2_port")
>   .option("subscribe", "topic3, topicn, topicn+1,")
>  .load()
>
> After acquiring the DataFrame, you can union them and treat all the data
> with a single process.
>
> val unifiedData = df_cluster1.union(df_cluster2)
> // apply further transformations on `unifiedData`
>
> kr, Gerard.
>
>
> :
>
>
>
> On Tue, Jun 9, 2020 at 6:30 PM Srinivas V  wrote:
>
>> Thanks for the quick reply. This may work, but I have about 5 topics to
>> listen to right now. I am trying to keep all topics in an array in a
>> properties file and read them all at once. This way it is dynamic: you have
>> one code block like the one below, and you can add or delete topics in the
>> config file without changing code. If someone confirms that it does not
>> work, I will have to do something like what you have provided.
>>
>> val df_cluster1 = spark
>>   .read
>>   .format("kafka")
>>   .option("kafka.bootstrap.servers", 
>> "cluster1_host:cluster1_port,cluster2_host:port")
>>
>> .option("subscribe", "topic1, topic2,topic3,topic4,topic5")
>>
>>
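
A sketch of the per-cluster subscription approach confirmed above, written with
readStream (one connection per Kafka cluster, then a union); host names and
topic lists are placeholders that could just as well come from a properties
file:

```scala
val cluster1 = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "cluster1-host:9092")
  .option("subscribe", "topic1,topic2")
  .load()

val cluster2 = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "cluster2-host:9092")
  .option("subscribe", "topic3,topic4,topic5")
  .load()

// Both sources expose the same Kafka schema (key, value, topic, partition, ...),
// so they can be unioned and handled by a single downstream query.
val allEvents = cluster1.union(cluster2)
```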


Re: [spark-structured-streaming] [kafka] consume topics from multiple Kafka clusters

2020-06-09 Thread Srinivas V
Thanks for the quick reply. This may work, but I have about 5 topics to
listen to right now. I am trying to keep all topics in an array in a
properties file and read them all at once. This way it is dynamic: you have
one code block like the one below, and you can add or delete topics in the
config file without changing code. If someone confirms that it does not work,
I will have to do something like what you have provided.

val df_cluster1 = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers",
"cluster1_host:cluster1_port,cluster2_host:port")

.option("subscribe", "topic1, topic2,topic3,topic4,topic5")


[spark-structured-streaming] [kafka] consume topics from multiple Kafka clusters

2020-06-09 Thread Srinivas V
Hello,
 In Structured Streaming, is it possible to have one Spark application with
one query that consumes topics from multiple Kafka clusters?

I am trying to consume two topics, each from a different Kafka cluster, but
one of the topics is reported as an unknown topic and the job keeps running
without completing in the Spark UI.

Is it not allowed in Spark 2.4.5?

Regards
Srini


Re: Using Spark Accumulators with Structured Streaming

2020-06-08 Thread Srinivas V
Ya, I had asked this question before. No one responded. By the way, what’s
your actual name “Something something” if you don’t mind me asking?

On Tue, Jun 9, 2020 at 12:27 AM Something Something <
mailinglist...@gmail.com> wrote:

> What is scary is this interface is marked as "experimental"
>
> @Experimental
> @InterfaceStability.Evolving
> public interface MapGroupsWithStateFunction<K, V, S, R> extends Serializable {
>   R call(K key, Iterator<V> values, GroupState<S> state) throws Exception;
> }
>
>
>
>
> On Mon, Jun 8, 2020 at 11:54 AM Something Something <
> mailinglist...@gmail.com> wrote:
>
>> Right, this is exactly how I've it right now. Problem is in the cluster
>> mode 'myAcc' does NOT get distributed. Try it out in the cluster mode & you
>> will see what I mean.
>>
>> I think how Zhang is using will work. Will try & revert.
>>
>> On Mon, Jun 8, 2020 at 10:58 AM Srinivas V  wrote:
>>
>>>
>>> You don't need to have a separate class. I created one because it has a lot
>>> of code and logic in my case.
>>> For you to quickly test you can use Zhang’s Scala code in this chain.
>>> Pasting it below for your quick reference:
>>>
>>> ```scala
>>> spark.streams.addListener(new StreamingQueryListener {
>>>   override def onQueryProgress(event: 
>>> StreamingQueryListener.QueryProgressEvent):
>>> Unit = {
>>> println(event.progress.id + " is on progress")
>>> println(s"My accu is ${myAcc.value} on query progress")
>>>   }
>>> ...
>>> })
>>>
>>> def mappingFunc(key: Long, values: Iterator[String], state:
>>> GroupState[Long]): ... = {
>>>   myAcc.add(1)
>>>   println(s">>> key: $key => state: ${state}")
>>> ...
>>> }
>>>
>>> val wordCounts = words
>>>   .groupByKey(v => ...)
>>>   .mapGroupsWithState(timeoutConf = 
>>> GroupStateTimeout.ProcessingTimeTimeout)(func
>>> = mappingFunc)
>>>
>>> val query = wordCounts.writeStream
>>>   .outputMode(OutputMode.Update)
>>>
>>>
>>> On Mon, Jun 8, 2020 at 11:14 AM Something Something <
>>> mailinglist...@gmail.com> wrote:
>>>
>>>> Great. I guess the trick is to use a separate class such as
>>>> 'StateUpdateTask'. I will try that. My challenge is to convert this into
>>>> Scala. Will try it out & revert. Thanks for the tips.
>>>>
>>>> On Wed, Jun 3, 2020 at 11:56 PM ZHANG Wei  wrote:
>>>>
>>>>> The following Java codes can work in my cluster environment:
>>>>> ```
>>>>> .mapGroupsWithState((MapGroupsWithStateFunction<String, String, Long, LeadingCharCount>) (key, values, state) -> {
>>>>> myAcc.add(1);
>>>>> <...>
>>>>> state.update(newState);
>>>>> return new LeadingCharCount(key, newState);
>>>>> },
>>>>> Encoders.LONG(),
>>>>> Encoders.bean(LeadingCharCount.class),
>>>>> GroupStateTimeout.ProcessingTimeTimeout())
>>>>> ```
>>>>>
>>>>> Also works fine with my `StateUpdateTask`:
>>>>> ```
>>>>> .mapGroupsWithState(
>>>>> new StateUpdateTask(myAcc),
>>>>> Encoders.LONG(),
>>>>> Encoders.bean(LeadingCharCount.class),
>>>>> GroupStateTimeout.ProcessingTimeTimeout());
>>>>>
>>>>> public class StateUpdateTask
>>>>> implements MapGroupsWithStateFunction<String, String, Long, LeadingCharCount> {
>>>>> private LongAccumulator myAccInTask;
>>>>>
>>>>> public StateUpdateTask(LongAccumulator acc) {
>>>>> this.myAccInTask = acc;
>>>>> }
>>>>>
>>>>> @Override
>>>>> public LeadingCharCount call(String key, Iterator<String>
>>>>> values, GroupState<Long> state) throws Exception {
>>>>> myAccInTask.add(1);
>>>>> <...>
>>>>> state.update(newState);
>>>>> return new LeadingCharCount(key, newState);
>>>>> }
>>>

Re: Using Spark Accumulators with Structured Streaming

2020-06-08 Thread Srinivas V
You don't need to have a separate class. I created one because it has a lot
of code and logic in my case.
For you to quickly test you can use Zhang’s Scala code in this chain.
Pasting it below for your quick reference:

```scala
spark.streams.addListener(new StreamingQueryListener {
  override def onQueryProgress(event:
StreamingQueryListener.QueryProgressEvent):
Unit = {
println(event.progress.id + " is on progress")
println(s"My accu is ${myAcc.value} on query progress")
  }
...
})

def mappingFunc(key: Long, values: Iterator[String], state:
GroupState[Long]): ... = {
  myAcc.add(1)
  println(s">>> key: $key => state: ${state}")
...
}

val wordCounts = words
  .groupByKey(v => ...)
  .mapGroupsWithState(timeoutConf =
GroupStateTimeout.ProcessingTimeTimeout)(func
= mappingFunc)

val query = wordCounts.writeStream
  .outputMode(OutputMode.Update)


On Mon, Jun 8, 2020 at 11:14 AM Something Something <
mailinglist...@gmail.com> wrote:

> Great. I guess the trick is to use a separate class such as
> 'StateUpdateTask'. I will try that. My challenge is to convert this into
> Scala. Will try it out & revert. Thanks for the tips.
>
> On Wed, Jun 3, 2020 at 11:56 PM ZHANG Wei  wrote:
>
>> The following Java codes can work in my cluster environment:
>> ```
>> .mapGroupsWithState((MapGroupsWithStateFunction<String, String, Long, LeadingCharCount>) (key, values, state) -> {
>> myAcc.add(1);
>> <...>
>> state.update(newState);
>> return new LeadingCharCount(key, newState);
>> },
>> Encoders.LONG(),
>> Encoders.bean(LeadingCharCount.class),
>> GroupStateTimeout.ProcessingTimeTimeout())
>> ```
>>
>> Also works fine with my `StateUpdateTask`:
>> ```
>> .mapGroupsWithState(
>> new StateUpdateTask(myAcc),
>> Encoders.LONG(),
>> Encoders.bean(LeadingCharCount.class),
>> GroupStateTimeout.ProcessingTimeTimeout());
>>
>> public class StateUpdateTask
>> implements MapGroupsWithStateFunction<String, String, Long, LeadingCharCount> {
>> private LongAccumulator myAccInTask;
>>
>> public StateUpdateTask(LongAccumulator acc) {
>> this.myAccInTask = acc;
>> }
>>
>> @Override
>> public LeadingCharCount call(String key, Iterator<String> values,
>> GroupState<Long> state) throws Exception {
>> myAccInTask.add(1);
>> <...>
>> state.update(newState);
>> return new LeadingCharCount(key, newState);
>> }
>> }
>> ```
>>
>> --
>> Cheers,
>> -z
>>
>> On Tue, 2 Jun 2020 10:28:36 +0800
>> ZHANG Wei  wrote:
>>
>> > Yes, verified on the cluster with 5 executors.
>> >
>> > --
>> > Cheers,
>> > -z
>> >
>> > On Fri, 29 May 2020 11:16:12 -0700
>> > Something Something  wrote:
>> >
>> > > Did you try this on the Cluster? Note: This works just fine under
>> 'Local'
>> > > mode.
>> > >
>> > > On Thu, May 28, 2020 at 9:12 PM ZHANG Wei 
>> wrote:
>> > >
>> > > > I can't reproduce the issue with my simple code:
>> > > > ```scala
>> > > > spark.streams.addListener(new StreamingQueryListener {
>> > > >   override def onQueryProgress(event:
>> > > > StreamingQueryListener.QueryProgressEvent): Unit = {
>> > > > println(event.progress.id + " is on progress")
>> > > > println(s"My accu is ${myAcc.value} on query progress")
>> > > >   }
>> > > > ...
>> > > > })
>> > > >
>> > > > def mappingFunc(key: Long, values: Iterator[String], state:
>> > > > GroupState[Long]): ... = {
>> > > >   myAcc.add(1)
>> > > >   println(s">>> key: $key => state: ${state}")
>> > > > ...
>> > > > }
>> > > >
>> > > > val wordCounts = words
>> > > >   .groupByKey(v => ...)
>> > > >   .mapGroupsWithState(timeoutConf =
>> > > > GroupStateTimeout.ProcessingTimeTimeout)(func = mappingFunc)
>> > > >
>> > > > val query = wordCounts.writeStream
>> > > >   

Re: Using Spark Accumulators with Structured Streaming

2020-05-30 Thread Srinivas V
It's in the constructor.

On Sat, May 30, 2020 at 4:15 AM Something Something <
mailinglist...@gmail.com> wrote:

> I mean... I don't see any reference to 'accumulator' in your Class
> *definition*. How can you access it in the class if it's not in your
> definition of class:
>
> public class StateUpdateTask implements MapGroupsWithStateFunction<*String,
> InputEventModel, ModelStateInfo, ModelUpdate*> {.  *--> I was expecting
> to see 'accumulator' here in the definition.*
>
> @Override
> public ModelUpdate call(String productId, Iterator<InputEventModel>
> eventsIterator, GroupState<ModelStateInfo> state) {
> }
> }
>
> On Fri, May 29, 2020 at 1:08 PM Srinivas V  wrote:
>
>>
>> Yes, accumulators are updated in the call method of StateUpdateTask, for
>> example when a state times out or when the data is pushed to the next Kafka
>> topic.
>>
>> On Fri, May 29, 2020 at 11:55 PM Something Something <
>> mailinglist...@gmail.com> wrote:
>>
>>> Thanks! I will take a look at the link. Just one question, you seem to
>>> be passing 'accumulators' in the constructor but where do you use it in the
>>> StateUpdateTask class? I am still missing that connection. Sorry, if my
>>> question is dumb. I must be missing something. Thanks for your help so far.
>>> It's been useful.
>>>
>>>
>>> On Fri, May 29, 2020 at 6:51 AM Srinivas V  wrote:
>>>
>>>> Yes, it is an application-specific class. This is how Java Spark functions
>>>> work.
>>>> You can refer to this code in the documentation:
>>>> https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredSessionization.java
>>>>
>>>> public class StateUpdateTask implements
>>>> MapGroupsWithStateFunction<String, InputEventModel, ModelStateInfo, ModelUpdate> {
>>>>
>>>> @Override
>>>> public ModelUpdate call(String productId, Iterator<InputEventModel>
>>>> eventsIterator, GroupState<ModelStateInfo> state) {
>>>> }
>>>> }
>>>>
>>>> On Thu, May 28, 2020 at 10:59 PM Something Something <
>>>> mailinglist...@gmail.com> wrote:
>>>>
>>>>> I am assuming StateUpdateTask is your application specific class. Does
>>>>> it have 'updateState' method or something? I googled but couldn't find any
>>>>> documentation about doing it this way. Can you please direct me to some
>>>>> documentation. Thanks.
>>>>>
>>>>> On Thu, May 28, 2020 at 4:43 AM Srinivas V 
>>>>> wrote:
>>>>>
>>>>>> yes, I am using stateful structured streaming. Yes similar to what
>>>>>> you do. This is in Java
>>>>>> I do it this way:
>>>>>> Dataset<ModelUpdate> productUpdates = watermarkedDS
>>>>>> .groupByKey(
>>>>>> (MapFunction<InputEventModel, String>) event
>>>>>> -> event.getId(), Encoders.STRING())
>>>>>> .mapGroupsWithState(
>>>>>> new
>>>>>> StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
>>>>>> appConfig, accumulators),
>>>>>> Encoders.bean(ModelStateInfo.class),
>>>>>> Encoders.bean(ModelUpdate.class),
>>>>>> GroupStateTimeout.ProcessingTimeTimeout());
>>>>>>
>>>>>> StateUpdateTask contains the update method.
>>>>>>
>>>>>> On Thu, May 28, 2020 at 4:41 AM Something Something <
>>>>>> mailinglist...@gmail.com> wrote:
>>>>>>
>>>>>>> Yes, that's exactly how I am creating them.
>>>>>>>
>>>>>>> Question... Are you using 'Stateful Structured Streaming' in which
>>>>>>> you've something like this?
>>>>>>>
>>>>>>> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>>>>>>> updateAcrossEvents
>>>>>>>   )
>>>>>>>
>>>>>>> And updating the Accumulator inside 'updateAcrossEvents'? We're 
>>>>>>> experiencing this only under 'Stateful Structured Streaming'. In other 
>>>>>>> streaming applications it works as expected.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> 

Re: Using Spark Accumulators with Structured Streaming

2020-05-29 Thread Srinivas V
Yes, accumulators are updated in the call method of StateUpdateTask, for
example when a state times out or when the data is pushed to the next Kafka
topic.

On Fri, May 29, 2020 at 11:55 PM Something Something <
mailinglist...@gmail.com> wrote:

> Thanks! I will take a look at the link. Just one question, you seem to be
> passing 'accumulators' in the constructor but where do you use it in the
> StateUpdateTask class? I am still missing that connection. Sorry, if my
> question is dumb. I must be missing something. Thanks for your help so far.
> It's been useful.
>
>
> On Fri, May 29, 2020 at 6:51 AM Srinivas V  wrote:
>
>> Yes, it is an application-specific class. This is how Java Spark functions
>> work.
>> You can refer to this code in the documentation:
>> https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredSessionization.java
>>
>> public class StateUpdateTask implements
>> MapGroupsWithStateFunction<String, InputEventModel, ModelStateInfo, ModelUpdate> {
>>
>> @Override
>> public ModelUpdate call(String productId, Iterator<InputEventModel>
>> eventsIterator, GroupState<ModelStateInfo> state) {
>> }
>> }
>>
>> On Thu, May 28, 2020 at 10:59 PM Something Something <
>> mailinglist...@gmail.com> wrote:
>>
>>> I am assuming StateUpdateTask is your application specific class. Does
>>> it have 'updateState' method or something? I googled but couldn't find any
>>> documentation about doing it this way. Can you please direct me to some
>>> documentation. Thanks.
>>>
>>> On Thu, May 28, 2020 at 4:43 AM Srinivas V  wrote:
>>>
>>>> yes, I am using stateful structured streaming. Yes similar to what you
>>>> do. This is in Java
>>>> I do it this way:
>>>> Dataset<ModelUpdate> productUpdates = watermarkedDS
>>>> .groupByKey(
>>>> (MapFunction<InputEventModel, String>) event ->
>>>> event.getId(), Encoders.STRING())
>>>> .mapGroupsWithState(
>>>> new
>>>> StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
>>>> appConfig, accumulators),
>>>> Encoders.bean(ModelStateInfo.class),
>>>> Encoders.bean(ModelUpdate.class),
>>>> GroupStateTimeout.ProcessingTimeTimeout());
>>>>
>>>> StateUpdateTask contains the update method.
>>>>
>>>> On Thu, May 28, 2020 at 4:41 AM Something Something <
>>>> mailinglist...@gmail.com> wrote:
>>>>
>>>>> Yes, that's exactly how I am creating them.
>>>>>
>>>>> Question... Are you using 'Stateful Structured Streaming' in which
>>>>> you've something like this?
>>>>>
>>>>> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>>>>> updateAcrossEvents
>>>>>   )
>>>>>
>>>>> And updating the Accumulator inside 'updateAcrossEvents'? We're 
>>>>> experiencing this only under 'Stateful Structured Streaming'. In other 
>>>>> streaming applications it works as expected.
>>>>>
>>>>>
>>>>>
>>>>> On Wed, May 27, 2020 at 9:01 AM Srinivas V 
>>>>> wrote:
>>>>>
>>>>>> Yes, I am talking about Application specific Accumulators. Actually I
>>>>>> am getting the values printed in my driver log as well as sent to 
>>>>>> Grafana.
>>>>>> Not sure where and when I saw 0 before. My deploy mode is “client” on a
>>>>>> yarn cluster(not local Mac) where I submit from master node. It should 
>>>>>> work
>>>>>> the same for cluster mode as well.
>>>>>> Create accumulators like this:
>>>>>> AccumulatorV2 accumulator = sparkContext.longAccumulator(name);
>>>>>>
>>>>>>
>>>>>> On Tue, May 26, 2020 at 8:42 PM Something Something <
>>>>>> mailinglist...@gmail.com> wrote:
>>>>>>
>>>>>>> Hmm... how would they go to Graphana if they are not getting
>>>>>>> computed in your code? I am talking about the Application Specific
>>>>>>> Accumulators. The other standard counters such as
>>>>>>> 'event.progress.inputRowsPerSecond' are getting populated correctly!
>>>>>>>
>>>>

Re: Using Spark Accumulators with Structured Streaming

2020-05-29 Thread Srinivas V
Yes, it is an application-specific class. This is how Java Spark functions work.
You can refer to this code in the documentation:
https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredSessionization.java

public class StateUpdateTask implements MapGroupsWithStateFunction<String, InputEventModel, ModelStateInfo, ModelUpdate> {

@Override
public ModelUpdate call(String productId, Iterator<InputEventModel>
eventsIterator, GroupState<ModelStateInfo> state) {
}
}

On Thu, May 28, 2020 at 10:59 PM Something Something <
mailinglist...@gmail.com> wrote:

> I am assuming StateUpdateTask is your application specific class. Does it
> have 'updateState' method or something? I googled but couldn't find any
> documentation about doing it this way. Can you please direct me to some
> documentation. Thanks.
>
> On Thu, May 28, 2020 at 4:43 AM Srinivas V  wrote:
>
>> yes, I am using stateful structured streaming. Yes similar to what you
>> do. This is in Java
>> I do it this way:
>> Dataset<ModelUpdate> productUpdates = watermarkedDS
>> .groupByKey(
>> (MapFunction<InputEventModel, String>) event ->
>> event.getId(), Encoders.STRING())
>> .mapGroupsWithState(
>> new
>> StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
>> appConfig, accumulators),
>> Encoders.bean(ModelStateInfo.class),
>> Encoders.bean(ModelUpdate.class),
>> GroupStateTimeout.ProcessingTimeTimeout());
>>
>> StateUpdateTask contains the update method.
>>
>> On Thu, May 28, 2020 at 4:41 AM Something Something <
>> mailinglist...@gmail.com> wrote:
>>
>>> Yes, that's exactly how I am creating them.
>>>
>>> Question... Are you using 'Stateful Structured Streaming' in which
>>> you've something like this?
>>>
>>> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>>> updateAcrossEvents
>>>   )
>>>
>>> And updating the Accumulator inside 'updateAcrossEvents'? We're 
>>> experiencing this only under 'Stateful Structured Streaming'. In other 
>>> streaming applications it works as expected.
>>>
>>>
>>>
>>> On Wed, May 27, 2020 at 9:01 AM Srinivas V  wrote:
>>>
>>>> Yes, I am talking about Application specific Accumulators. Actually I
>>>> am getting the values printed in my driver log as well as sent to Grafana.
>>>> Not sure where and when I saw 0 before. My deploy mode is “client” on a
>>>> yarn cluster(not local Mac) where I submit from master node. It should work
>>>> the same for cluster mode as well.
>>>> Create accumulators like this:
>>>> AccumulatorV2 accumulator = sparkContext.longAccumulator(name);
>>>>
>>>>
>>>> On Tue, May 26, 2020 at 8:42 PM Something Something <
>>>> mailinglist...@gmail.com> wrote:
>>>>
>>>>> Hmm... how would they go to Graphana if they are not getting computed
>>>>> in your code? I am talking about the Application Specific Accumulators. 
>>>>> The
>>>>> other standard counters such as 'event.progress.inputRowsPerSecond' are
>>>>> getting populated correctly!
>>>>>
>>>>> On Mon, May 25, 2020 at 8:39 PM Srinivas V 
>>>>> wrote:
>>>>>
>>>>>> Hello,
>>>>>> Even for me it comes as 0 when I print in OnQueryProgress. I use
>>>>>> LongAccumulator as well. Yes, it prints on my local but not on cluster.
>>>>>> But one consolation is that when I send metrics to Graphana, the
>>>>>> values are coming there.
>>>>>>
>>>>>> On Tue, May 26, 2020 at 3:10 AM Something Something <
>>>>>> mailinglist...@gmail.com> wrote:
>>>>>>
>>>>>>> No this is not working even if I use LongAccumulator.
>>>>>>>
>>>>>>> On Fri, May 15, 2020 at 9:54 PM ZHANG Wei  wrote:
>>>>>>>
>>>>>>>> There is a restriction in AccumulatorV2 API [1], the OUT type
>>>>>>>> should be atomic or thread safe. I'm wondering if the implementation 
>>>>>>>> for
>>>>>>>> `java.util.Map[T, Long]` can meet it or not. Is there any chance to 
>>>>>>>> replace
>>>>>>>> CollectionLongAccumulator by CollectionAccumulator[2] or 
>>>>

Re: Using Spark Accumulators with Structured Streaming

2020-05-28 Thread Srinivas V
Here is the code:
// accumulators is a class-level variable in the driver.

 sparkSession.streams().addListener(new StreamingQueryListener() {
@Override
public void onQueryStarted(QueryStartedEvent queryStarted) {
logger.info("Query started: " + queryStarted.id());
}
@Override
public void onQueryTerminated(QueryTerminatedEvent
queryTerminated) {
logger.info("Query terminated: " + queryTerminated.id());
}
@Override
public void onQueryProgress(QueryProgressEvent queryProgress) {

accumulators.eventsReceived(queryProgress.progress().numInputRows());
long eventsReceived = 0;
long eventsExpired = 0;
long eventSentSuccess = 0;
try {
eventsReceived =
accumulators.getLong(InstrumentationCounters.EVENTS_RECEIVED);
eventsExpired =
accumulators.getLong(InstrumentationCounters.EVENTS_STATE_EXPIRED);
eventSentSuccess =
accumulators.getLong(InstrumentationCounters.EVENTS_SENT);
} catch (MissingKeyException e) {
logger.error("Accumulator key not found due to
Exception {}", e.getMessage());
}
logger.info("Events Received:{}", eventsReceived);
logger.info("Events State Expired:{}", eventsExpired);
logger.info("Events Sent Success:{}", eventSentSuccess);
logger.info("Query made progress - batchId: {}
numInputRows:{} inputRowsPerSecond:{} processedRowsPerSecond:{}
durationMs:{}" ,
queryProgress.progress().batchId(),
queryProgress.progress().numInputRows(),
queryProgress.progress().inputRowsPerSecond(),
queryProgress.progress().processedRowsPerSecond(),
queryProgress.progress().durationMs());


On Thu, May 28, 2020 at 7:04 PM ZHANG Wei  wrote:

> May I get how the accumulator is accessed in the method
> `onQueryProgress()`?
>
> AFAICT, the accumulator is incremented well. There is a way to verify that
> in cluster like this:
> ```
> // Add the following while loop before invoking awaitTermination
> while (true) {
>   println("My acc: " + myAcc.value)
>   Thread.sleep(5 * 1000)
> }
>
> //query.awaitTermination()
> ```
>
> And the accumulator value updated can be found from driver stdout.
>
> --
> Cheers,
> -z
>
> On Thu, 28 May 2020 17:12:48 +0530
> Srinivas V  wrote:
>
> > yes, I am using stateful structured streaming. Yes similar to what you
> do.
> > This is in Java
> > I do it this way:
> > Dataset<ModelUpdate> productUpdates = watermarkedDS
> > .groupByKey(
> > (MapFunction<InputEventModel, String>) event ->
> > event.getId(), Encoders.STRING())
> > .mapGroupsWithState(
> > new
> >
> StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
> > appConfig, accumulators),
> > Encoders.bean(ModelStateInfo.class),
> > Encoders.bean(ModelUpdate.class),
> > GroupStateTimeout.ProcessingTimeTimeout());
> >
> > StateUpdateTask contains the update method.
> >
> > On Thu, May 28, 2020 at 4:41 AM Something Something <
> > mailinglist...@gmail.com> wrote:
> >
> > > Yes, that's exactly how I am creating them.
> > >
> > > Question... Are you using 'Stateful Structured Streaming' in which
> you've
> > > something like this?
> > >
> > > .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
> > > updateAcrossEvents
> > >   )
> > >
> > > And updating the Accumulator inside 'updateAcrossEvents'? We're
> experiencing this only under 'Stateful Structured Streaming'. In other
> streaming applications it works as expected.
> > >
> > >
> > >
> > > On Wed, May 27, 2020 at 9:01 AM Srinivas V 
> wrote:
> > >
> > >> Yes, I am talking about Application specific Accumulators. Actually I
> am
> > >> getting the values printed in my driver log as well as sent to
> Grafana. Not
> > >> sure where and when I saw 0 before. My deploy mode is “client” on a
> yarn
> > >> cluster(not local Mac) where I submit from master node. It should
> work the
> > >> same for cluster mode as well.
> > >> Create accumulators like this:
> > >> AccumulatorV2 accumulator = sparkContext.longAccumulator(name);
> > >>
> > >>
> 
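
A compact Scala sketch of the pattern discussed in this thread: a
LongAccumulator created on the driver, incremented inside the
mapGroupsWithState function on the executors, and read back in a
StreamingQueryListener. The socket source, key/count logic and all names are
illustrative only:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode, StreamingQueryListener}

val spark = SparkSession.builder.appName("accumulator-sketch").getOrCreate()
import spark.implicits._

// Driver-side accumulator; executors ship increments back with task results.
val eventsSeen = spark.sparkContext.longAccumulator("eventsSeen")

spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(e: StreamingQueryListener.QueryStartedEvent): Unit = ()
  override def onQueryTerminated(e: StreamingQueryListener.QueryTerminatedEvent): Unit = ()
  override def onQueryProgress(e: StreamingQueryListener.QueryProgressEvent): Unit =
    println(s"batch ${e.progress.batchId}: eventsSeen = ${eventsSeen.value}")
})

// Stateful update function: counts batches per key and bumps the accumulator.
def updateState(key: String, values: Iterator[String], state: GroupState[Long]): (String, Long) = {
  values.foreach(_ => eventsSeen.add(1))
  val newCount = state.getOption.getOrElse(0L) + 1L
  state.update(newCount)
  (key, newCount)
}

val lines = spark.readStream
  .format("socket").option("host", "localhost").option("port", 9999)
  .load().as[String]

val counts = lines
  .groupByKey(identity)
  .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(updateState)

counts.writeStream
  .outputMode(OutputMode.Update())
  .format("console")
  .option("checkpointLocation", "/tmp/accumulator-sketch-ckpt")
  .start()
```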

Re: Using Spark Accumulators with Structured Streaming

2020-05-28 Thread Srinivas V
Yes, I am using stateful Structured Streaming, similar to what you do. This
is in Java; I do it this way:
Dataset<ModelUpdate> productUpdates = watermarkedDS
.groupByKey(
(MapFunction<InputEventModel, String>) event ->
event.getId(), Encoders.STRING())
.mapGroupsWithState(
new
StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
appConfig, accumulators),
Encoders.bean(ModelStateInfo.class),
Encoders.bean(ModelUpdate.class),
GroupStateTimeout.ProcessingTimeTimeout());

StateUpdateTask contains the update method.

On Thu, May 28, 2020 at 4:41 AM Something Something <
mailinglist...@gmail.com> wrote:

> Yes, that's exactly how I am creating them.
>
> Question... Are you using 'Stateful Structured Streaming' in which you've
> something like this?
>
> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
> updateAcrossEvents
>   )
>
> And updating the Accumulator inside 'updateAcrossEvents'? We're experiencing 
> this only under 'Stateful Structured Streaming'. In other streaming 
> applications it works as expected.
>
>
>
> On Wed, May 27, 2020 at 9:01 AM Srinivas V  wrote:
>
>> Yes, I am talking about Application specific Accumulators. Actually I am
>> getting the values printed in my driver log as well as sent to Grafana. Not
>> sure where and when I saw 0 before. My deploy mode is “client” on a yarn
>> cluster(not local Mac) where I submit from master node. It should work the
>> same for cluster mode as well.
>> Create accumulators like this:
>> AccumulatorV2 accumulator = sparkContext.longAccumulator(name);
>>
>>
>> On Tue, May 26, 2020 at 8:42 PM Something Something <
>> mailinglist...@gmail.com> wrote:
>>
>>> Hmm... how would they go to Graphana if they are not getting computed in
>>> your code? I am talking about the Application Specific Accumulators. The
>>> other standard counters such as 'event.progress.inputRowsPerSecond' are
>>> getting populated correctly!
>>>
>>> On Mon, May 25, 2020 at 8:39 PM Srinivas V  wrote:
>>>
>>>> Hello,
>>>> Even for me it comes as 0 when I print in OnQueryProgress. I use
>>>> LongAccumulator as well. Yes, it prints on my local but not on cluster.
>>>> But one consolation is that when I send metrics to Graphana, the values
>>>> are coming there.
>>>>
>>>> On Tue, May 26, 2020 at 3:10 AM Something Something <
>>>> mailinglist...@gmail.com> wrote:
>>>>
>>>>> No this is not working even if I use LongAccumulator.
>>>>>
>>>>> On Fri, May 15, 2020 at 9:54 PM ZHANG Wei  wrote:
>>>>>
>>>>>> There is a restriction in AccumulatorV2 API [1], the OUT type should
>>>>>> be atomic or thread safe. I'm wondering if the implementation for
>>>>>> `java.util.Map[T, Long]` can meet it or not. Is there any chance to 
>>>>>> replace
>>>>>> CollectionLongAccumulator by CollectionAccumulator[2] or 
>>>>>> LongAccumulator[3]
>>>>>> and test if the StreamingListener and other codes are able to work?
>>>>>>
>>>>>> ---
>>>>>> Cheers,
>>>>>> -z
>>>>>> [1]
>>>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
>>>>>> [2]
>>>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
>>>>>> [3]
>>>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator
>>>>>>
>>>>>> 
>>>>>> From: Something Something 
>>>>>> Sent: Saturday, May 16, 2020 0:38
>>>>>> To: spark-user
>>>>>> Subject: Re: Using Spark Accumulators with Structured Streaming
>>>>>>
>>>>>> Can someone from Spark Development team tell me if this functionality
>>>>>> is supported and tested? I've spent a lot of time on this but can't get 
>>>>>> it
>>>>>> to work. Just to add more context, we've our own Accumulator class that
>>>>>> extends from AccumulatorV2. In this class we keep track of one or more
>>>>>> accumulators. Here's the definition:
>

Re: Using Spark Accumulators with Structured Streaming

2020-05-27 Thread Srinivas V
Yes, I am talking about application-specific accumulators. Actually, I am
getting the values printed in my driver log as well as sent to Grafana. Not
sure where and when I saw 0 before. My deploy mode is “client” on a YARN
cluster (not my local Mac), where I submit from the master node. It should
work the same for cluster mode as well.
Create accumulators like this:
AccumulatorV2 accumulator = sparkContext.longAccumulator(name);


On Tue, May 26, 2020 at 8:42 PM Something Something <
mailinglist...@gmail.com> wrote:

> Hmm... how would they go to Graphana if they are not getting computed in
> your code? I am talking about the Application Specific Accumulators. The
> other standard counters such as 'event.progress.inputRowsPerSecond' are
> getting populated correctly!
>
> On Mon, May 25, 2020 at 8:39 PM Srinivas V  wrote:
>
>> Hello,
>> Even for me it comes as 0 when I print in OnQueryProgress. I use
>> LongAccumulator as well. Yes, it prints on my local but not on cluster.
>> But one consolation is that when I send metrics to Graphana, the values
>> are coming there.
>>
>> On Tue, May 26, 2020 at 3:10 AM Something Something <
>> mailinglist...@gmail.com> wrote:
>>
>>> No this is not working even if I use LongAccumulator.
>>>
>>> On Fri, May 15, 2020 at 9:54 PM ZHANG Wei  wrote:
>>>
>>>> There is a restriction in AccumulatorV2 API [1], the OUT type should be
>>>> atomic or thread safe. I'm wondering if the implementation for
>>>> `java.util.Map[T, Long]` can meet it or not. Is there any chance to replace
>>>> CollectionLongAccumulator by CollectionAccumulator[2] or LongAccumulator[3]
>>>> and test if the StreamingListener and other codes are able to work?
>>>>
>>>> ---
>>>> Cheers,
>>>> -z
>>>> [1]
>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
>>>> [2]
>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
>>>> [3]
>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator
>>>>
>>>> 
>>>> From: Something Something 
>>>> Sent: Saturday, May 16, 2020 0:38
>>>> To: spark-user
>>>> Subject: Re: Using Spark Accumulators with Structured Streaming
>>>>
>>>> Can someone from Spark Development team tell me if this functionality
>>>> is supported and tested? I've spent a lot of time on this but can't get it
>>>> to work. Just to add more context, we've our own Accumulator class that
>>>> extends from AccumulatorV2. In this class we keep track of one or more
>>>> accumulators. Here's the definition:
>>>>
>>>>
>>>> class CollectionLongAccumulator[T]
>>>> extends AccumulatorV2[T, java.util.Map[T, Long]]
>>>>
>>>> When the job begins we register an instance of this class:
>>>>
>>>> spark.sparkContext.register(myAccumulator, "MyAccumulator")
>>>>
>>>> Is this working under Structured Streaming?
>>>>
>>>> I will keep looking for alternate approaches but any help would be
>>>> greatly appreciated. Thanks.
>>>>
>>>>
>>>>
>>>> On Thu, May 14, 2020 at 2:36 PM Something Something <
>>>> mailinglist...@gmail.com<mailto:mailinglist...@gmail.com>> wrote:
>>>>
>>>> In my structured streaming job I am updating Spark Accumulators in the
>>>> updateAcrossEvents method but they are always 0 when I try to print them in
>>>> my StreamingListener. Here's the code:
>>>>
>>>> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>>>> updateAcrossEvents
>>>>   )
>>>>
>>>>
>>>> The accumulators get incremented in 'updateAcrossEvents'. I've a
>>>> StreamingListener which writes values of the accumulators in
>>>> 'onQueryProgress' method but in this method the Accumulators are ALWAYS
>>>> ZERO!
>>>>
>>>> When I added log statements in the updateAcrossEvents, I could see that
>>>> these accumulators are getting incremented as expected.
>>>>
>>>> This only happens when I run in the 'Cluster' mode. In Local mode it
>>>> works fine which implies that the Accumulators are not getting distributed
>>>> correctly - or something like that!
>>>>
>>>> Note: I've seen quite a few answers on the Web that tell me to perform
>>>> an "Action". That's not a solution here. This is a 'Stateful Structured
>>>> Streaming' job. Yes, I am also 'registering' them in SparkContext.
>>>>
>>>>
>>>>
>>>>


Re: Using Spark Accumulators with Structured Streaming

2020-05-25 Thread Srinivas V
Hello,
Even for me it comes out as 0 when I print it in onQueryProgress. I use a
LongAccumulator as well. Yes, it prints on my local machine but not on the
cluster.
One consolation is that when I send metrics to Grafana, the values do show up
there.

On Tue, May 26, 2020 at 3:10 AM Something Something <
mailinglist...@gmail.com> wrote:

> No this is not working even if I use LongAccumulator.
>
> On Fri, May 15, 2020 at 9:54 PM ZHANG Wei  wrote:
>
>> There is a restriction in AccumulatorV2 API [1], the OUT type should be
>> atomic or thread safe. I'm wondering if the implementation for
>> `java.util.Map[T, Long]` can meet it or not. Is there any chance to replace
>> CollectionLongAccumulator by CollectionAccumulator[2] or LongAccumulator[3]
>> and test if the StreamingListener and other codes are able to work?
>>
>> ---
>> Cheers,
>> -z
>> [1]
>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
>> [2]
>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
>> [3]
>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator
>>
>> 
>> From: Something Something 
>> Sent: Saturday, May 16, 2020 0:38
>> To: spark-user
>> Subject: Re: Using Spark Accumulators with Structured Streaming
>>
>> Can someone from Spark Development team tell me if this functionality is
>> supported and tested? I've spent a lot of time on this but can't get it to
>> work. Just to add more context, we've our own Accumulator class that
>> extends from AccumulatorV2. In this class we keep track of one or more
>> accumulators. Here's the definition:
>>
>>
>> class CollectionLongAccumulator[T]
>> extends AccumulatorV2[T, java.util.Map[T, Long]]
>>
>> When the job begins we register an instance of this class:
>>
>> spark.sparkContext.register(myAccumulator, "MyAccumulator")
>>
>> Is this working under Structured Streaming?
>>
>> I will keep looking for alternate approaches but any help would be
>> greatly appreciated. Thanks.
>>
>>
>>
>> On Thu, May 14, 2020 at 2:36 PM Something Something <
>> mailinglist...@gmail.com> wrote:
>>
>> In my structured streaming job I am updating Spark Accumulators in the
>> updateAcrossEvents method but they are always 0 when I try to print them in
>> my StreamingListener. Here's the code:
>>
>> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>> updateAcrossEvents
>>   )
>>
>>
>> The accumulators get incremented in 'updateAcrossEvents'. I've a
>> StreamingListener which writes values of the accumulators in
>> 'onQueryProgress' method but in this method the Accumulators are ALWAYS
>> ZERO!
>>
>> When I added log statements in the updateAcrossEvents, I could see that
>> these accumulators are getting incremented as expected.
>>
>> This only happens when I run in the 'Cluster' mode. In Local mode it
>> works fine which implies that the Accumulators are not getting distributed
>> correctly - or something like that!
>>
>> Note: I've seen quite a few answers on the Web that tell me to perform an
>> "Action". That's not a solution here. This is a 'Stateful Structured
>> Streaming' job. Yes, I am also 'registering' them in SparkContext.
>>
>>
>>
>>


[structured streaming] [stateful] Null value appeared in non-nullable field

2020-05-23 Thread Srinivas V
Hello,
 I am listening to a Kafka topic through Spark Structured Streaming [2.4.5].
After processing messages for a few minutes, I get the NullPointerException
below. I have three beans used here: 1. Event, 2. StateInfo, 3.
SessionUpdateInfo. I suspect the problem is with StateInfo: it might be
failing when writing state to HDFS, or it could be failing while I update
accumulators. But why would it fail for some events and not for others? Once
it fails, it stops the streaming query.
When I send all fields as null except EventId in my testing, it works fine.
Any idea what could be happening?
Attaching the full stack trace as well.
This is a YARN cluster, saving state in HDFS.

Exception:

20/05/23 09:46:46 ERROR
org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask:
Aborting commit for partition 42 (task 118121, attempt 9, stage 824.0)
20/05/23 09:46:46 ERROR
org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask:
Aborted commit for partition 42 (task 118121, attempt 9, stage 824.0)
20/05/23 09:46:46 ERROR org.apache.spark.executor.Executor: Exception
in task 42.9 in stage 824.0 (TID 118121)
java.lang.NullPointerException: Null value appeared in non-nullable field:
top level input bean
If the schema is inferred from a Scala tuple/case class, or a Java
bean, please try to use scala.Option[_] or other nullable types (e.g.
java.lang.Integer instead of int/scala.Int).
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.serializefromobject_doConsume_0$(Unknown
Source)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown
Source)
at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
at 
org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:117)
at 
org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:116)
at 
org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
at 
org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:146)
at 
org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:67)
at 
org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:66)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at 
org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
20/05/23 09:47:48 ERROR
org.apache.spark.executor.CoarseGrainedExecutorBackend: RECEIVED
SIGNAL TERM



Regards

Srini V
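
The exception message itself points at the usual cause: a non-nullable
(primitive) field in one of the beans receiving a null. A hedged sketch of the
suggested fix, with illustrative field names only (not the actual Event or
StateInfo definitions): use boxed types (java.lang.Long instead of long) in the
Java beans, or Option[_] in a Scala case class, for any field that can be
missing.

```scala
// Scala version of the hint in the error message: wrap possibly-missing
// values in Option instead of using primitive, non-nullable types.
case class StateInfoSketch(
    sessionId: String,            // reference type, already nullable
    startTime: Option[Long],      // Option[Long] rather than Long
    lastEventTime: Option[Long]
)
```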


Re: GroupState limits

2020-05-12 Thread Srinivas V
If you are talking about the total number of objects the state can hold, that
depends on how much executor memory is available on your cluster, apart from
the rest of the memory required for processing. The state is stored in HDFS
and retrieved while processing the next events.
If you maintain a million objects of 20 bytes each, that is about 20 MB, which
is quite reasonable to maintain in an executor allocated a few GB of memory.
But if you need to store heavy objects, you need to do the math. There is also
a cost in transferring this data back and forth to the HDFS checkpoint
location.

Regards
Srini

On Tue, May 12, 2020 at 2:48 AM tleilaxu  wrote:

> Hi,
> I am tracking states in my Spark streaming application with
> MapGroupsWithStateFunction described here:
> https://spark.apache.org/docs/2.4.0/api/java/org/apache/spark/sql/streaming/GroupState.html
> Which are the limiting factors on the number of states a job can track at
> the same time? Is it memory? Could be a bounded data structure in the
> internal implementation? Anything else ...
> You might have valuable input on this while I am trying to setup and test
> this.
>
> Thanks,
> Arnold
>


Re: Spark structured streaming - performance tuning

2020-05-08 Thread Srinivas V
Can anyone else answer the questions below on performance tuning for Structured
Streaming?
@Jacek?

On Sun, May 3, 2020 at 12:07 AM Srinivas V  wrote:

> Hi Alex, read the book , it is a good one but i don’t see things which I
> strongly want to understand.
> You are right on the partition and tasks.
> 1.How to use coalesce with spark structured streaming ?
>
> Also I want to ask few more questions,
> 2. How to restrict the number of executors in structured streaming?
> Is --num-executors the minimum?
> To cap the max, can I use spark.dynamicAllocation.maxExecutors ?
>
> 3. Do other streaming properties hold good for structured streaming?
> Like spark.streaming.dynamicAllocation.enabled ?
> If not what are the ones it takes into consideration?
>
> 4. Does structured streaming 2.4.5 allow dynamicAllocation of executors/
> cores? In case of Kafka consumer, when the cluster has to scale down, does
> it reconfigure the mapping of executor cores to Kafka partitions?
>
> 5. Why is the Spark Structured Streaming web UI (SQL tab) not as informative
> as the streaming tab of Spark Streaming?
>
> It would be great if these questions are answered, otherwise the only
> option left would be to go through the spark code and figure out.
>
> On Sat, Apr 18, 2020 at 1:09 PM Alex Ott  wrote:
>
>> Just to clarify - I didn't write this explicitly in my answer. When you're
>> working with Kafka, every partition in Kafka is mapped into Spark
>> partition. And in Spark, every partition is mapped into task.   But you
>> can
>> use `coalesce` to decrease the number of Spark partitions, so you'll have
>> less tasks...
>>
>> Srinivas V  at "Sat, 18 Apr 2020 10:32:33 +0530" wrote:
>>  SV> Thank you Alex. I will check it out and let you know if I have any
>> questions
>>
>>  SV> On Fri, Apr 17, 2020 at 11:36 PM Alex Ott  wrote:
>>
>>  SV> http://shop.oreilly.com/product/0636920047568.do has quite good
>> information
>>  SV> on it.  For Kafka, you need to start with approximation that
>> processing of
>>  SV> each partition is a separate task that need to be executed, so
>> you need to
>>  SV> plan number of cores correspondingly.
>>  SV>
>>  SV> Srinivas V  at "Thu, 16 Apr 2020 22:49:15 +0530" wrote:
>>  SV>  SV> Hello,
>>  SV>  SV> Can someone point me to a good video or document which
>> talks about performance tuning for a structured streaming app?
>>  SV>  SV> I am looking especially at listening to Kafka topics, say 5
>> topics each with 100 partitions.
>>  SV>  SV> Trying to figure out best cluster size and number of
>> executors and cores required.
>>
>>
>> --
>> With best wishes,Alex Ott
>> http://alexott.net/
>> Twitter: alexott_en (English), alexott (Russian)
>>
>
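
A minimal sketch of the coalesce point made above, applied to a streaming Dataset read from Kafka (broker, topic and checkpoint path are placeholders, not a real job):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("coalesce-sketch").getOrCreate()

val kafkaDf = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")   // placeholder
  .option("subscribe", "topic1")                       // placeholder
  .load()

// coalesce() narrows the number of partitions -- and therefore tasks -- used by
// the downstream stages of each micro-batch, without forcing a full shuffle.
val narrowed = kafkaDf.coalesce(8)

val query = narrowed.writeStream
  .format("console")
  .option("checkpointLocation", "/tmp/coalesce-sketch")  // placeholder
  .start()
query.awaitTermination()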


Re: Spark structured streaming - performance tuning

2020-05-02 Thread Srinivas V
Hi Alex, I read the book; it is a good one, but I don't see the things I
strongly want to understand.
You are right on the partitions and tasks.
1. How to use coalesce with Spark structured streaming?

Also, I want to ask a few more questions:
2. How to restrict the number of executors in structured streaming?
Is --num-executors the minimum?
To cap the max, can I use spark.dynamicAllocation.maxExecutors?

3. Do other streaming properties hold good for structured streaming,
like spark.streaming.dynamicAllocation.enabled?
If not, which are the ones it takes into consideration?

4. Does structured streaming 2.4.5 allow dynamic allocation of executors/
cores? In the case of a Kafka consumer, when the cluster has to scale down, does
it reconfigure the mapping of executor cores to Kafka partitions?

5. Why is the Spark Structured Streaming web UI (SQL tab) not as informative
as the streaming tab of Spark Streaming?

It would be great if these questions were answered; otherwise the only
option left would be to go through the Spark code and figure it out.

On Sat, Apr 18, 2020 at 1:09 PM Alex Ott  wrote:

> Just to clarify - I didn't write this explicitly in my answer. When you're
> working with Kafka, every partition in Kafka is mapped into Spark
> partition. And in Spark, every partition is mapped into task.   But you can
> use `coalesce` to decrease the number of Spark partitions, so you'll have
> less tasks...
>
> Srinivas V  at "Sat, 18 Apr 2020 10:32:33 +0530" wrote:
>  SV> Thank you Alex. I will check it out and let you know if I have any
> questions
>
>  SV> On Fri, Apr 17, 2020 at 11:36 PM Alex Ott  wrote:
>
>  SV> http://shop.oreilly.com/product/0636920047568.do has quite good
> information
>  SV> on it.  For Kafka, you need to start with approximation that
> processing of
>  SV> each partition is a separate task that need to be executed, so
> you need to
>  SV> plan number of cores correspondingly.
>  SV>
>  SV> Srinivas V  at "Thu, 16 Apr 2020 22:49:15 +0530" wrote:
>  SV>  SV> Hello,
>  SV>  SV> Can someone point me to a good video or document which talks
> about performance tuning for a structured streaming app?
>  SV>  SV> I am looking especially at listening to Kafka topics, say 5
> topics each with 100 partitions.
>  SV>  SV> Trying to figure out best cluster size and number of
> executors and cores required.
>
>
> --
> With best wishes,Alex Ott
> http://alexott.net/
> Twitter: alexott_en (English), alexott (Russian)
>


Re: Spark structured streaming - performance tuning

2020-04-17 Thread Srinivas V
Thank you Alex. I will check it out and let you know if I have any questions

On Fri, Apr 17, 2020 at 11:36 PM Alex Ott  wrote:

> http://shop.oreilly.com/product/0636920047568.do has quite good
> information
> on it.  For Kafka, you need to start with approximation that processing of
> each partition is a separate task that need to be executed, so you need to
> plan number of cores correspondingly.
>
> Srinivas V  at "Thu, 16 Apr 2020 22:49:15 +0530" wrote:
>  SV> Hello,
>  SV> Can someone point me to a good video or document which talks about
> performance tuning for a structured streaming app?
>  SV> I am looking especially at listening to Kafka topics, say 5 topics
> each with 100 partitions.
>  SV> Trying to figure out best cluster size and number of executors and
> cores required.
>
>
> --
> With best wishes,Alex Ott
> http://alexott.net/
> Twitter: alexott_en (English), alexott (Russian)
>


Spark structured streaming - performance tuning

2020-04-16 Thread Srinivas V
Hello,
Can someone point me to a good video or document which talks about
performance tuning for a structured streaming app?
I am looking especially at listening to Kafka topics, say 5 topics each
with 100 partitions.
Trying to figure out the best cluster size and the number of executors and cores
required.

Regards
Srini


Re: Spark Streaming not working

2020-04-10 Thread Srinivas V
Check if your broker details are correct, and verify that you have network
connectivity between your client box and the Kafka broker host.

On Fri, Apr 10, 2020 at 11:04 PM Debabrata Ghosh 
wrote:

> Hi,
> I have a spark streaming application where Kafka is producing
> records but unfortunately spark streaming isn't able to consume those.
>
> I am hitting the following error:
>
> 20/04/10 17:28:04 ERROR Executor: Exception in task 0.5 in stage 0.0 (TID 24)
> java.lang.AssertionError: assertion failed: Failed to get records for 
> spark-executor-service-spark-ingestion dice-ingestion 11 0 after polling for 
> 12
>   at scala.Predef$.assert(Predef.scala:170)
>   at 
> org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:74)
>   at 
> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:223)
>   at 
> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:189)
>
>
> Would you please be able to help with a resolution.
>
> Thanks,
> Debu
>


Re: spark structured streaming GroupState returns weird values from state

2020-03-31 Thread Srinivas V
Never mind. It got resolved after I removed the two extra getter methods (used to
calculate a duration) that I had created in my state-specific Java bean
(ProductSessionInformation). But I am surprised that it caused so many
problems. I guess that when this bean is converted to a Scala class, it may not
handle methods that are not getters for the defined fields? Still, how does that
cause the state object to get so corrupted?
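
A small Scala sketch of a quick check for this kind of mismatch, along the lines suggested in the replies quoted below (it assumes the ProductSessionInformation bean from this thread is on the classpath):

import org.apache.spark.sql.Encoders

// Print the schema Spark SQL infers from the Java state bean. Extra getXxx()
// methods that have no backing field (e.g. computed duration getters) show up
// here as additional columns and can shift the field ordering.
val stateEncoder = Encoders.bean(classOf[ProductSessionInformation])
stateEncoder.schema.printTreeString()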


On Sat, Mar 28, 2020 at 7:46 PM Srinivas V  wrote:

> Ok, I will try to create some simple code to reproduce, if I can. Problem
> is that I am adding this code in an existing big project with several
> dependencies with spark streaming older version(2.2) on root level etc.
>
> Also, I observed that there is @Experimental on GroupState class. What
> state is it in now? Several people using this feature in prod?
>
>
> On Sat, Mar 28, 2020 at 6:23 PM Jungtaek Lim 
> wrote:
>
>> I haven't heard of a known issue for this - that said, this may require new
>> investigation, which is not possible or requires huge effort without a simple
>> reproducer.
>>
>> Contributors (who are basically volunteers) may not want to struggle to
>> reproduce from your partial information - I'd recommend you to spend your
>> time to help volunteers starting from simple reproducer, if you are stuck
>> at it and have to resolve it.
>>
>> Could you please get rid of the business logic which you may want to
>> redact, and provide full of source code which reproduces the bug?
>>
>> On Sat, Mar 28, 2020 at 8:11 PM Srinivas V  wrote:
>>
>>> Sorry for typos , correcting them below
>>>
>>> On Sat, Mar 28, 2020 at 4:39 PM Srinivas V  wrote:
>>>
>>>> Sorry I was just changing some names not to send exact names. Please
>>>> ignore that. I am really struggling with this since couple of days. Can
>>>> this happen due to
>>>> 1. some of the values being null or
>>>> 2.UTF8  issue ? Or some serilization/ deserilization issue ?
>>>> 3. Not enough memory ?
>>>> BTW, I am using same names in my code.
>>>>
>>>> On Sat, Mar 28, 2020 at 10:50 AM Jungtaek Lim <
>>>> kabhwan.opensou...@gmail.com> wrote:
>>>>
>>>>> Well, the code itself doesn't seem to be OK - you're using
>>>>> ProductStateInformation as the class of State whereas you provide
>>>>> ProductSessionInformation to Encoder for State.
>>>>>
>>>>> On Fri, Mar 27, 2020 at 11:14 PM Jungtaek Lim <
>>>>> kabhwan.opensou...@gmail.com> wrote:
>>>>>
>>>>>> Could you play with Encoders.bean()? You can Encoders.bean() with
>>>>>> your class, and call .schema() with the return value to see how it
>>>>>> transforms to the schema in Spark SQL. The schema must be consistent 
>>>>>> across
>>>>>> multiple JVM runs to make it work properly, but I suspect it doesn't 
>>>>>> retain
>>>>>> the order.
>>>>>>
>>>>>> On Fri, Mar 27, 2020 at 10:28 PM Srinivas V 
>>>>>> wrote:
>>>>>>
>>>>>>> I am listening to Kafka topic with a structured streaming
>>>>>>> application with Java,  testing it on my local Mac.
>>>>>>> When I retrieve back GroupState object
>>>>>>> with state.get(), it is giving some random values for the fields in the
>>>>>>> object, some are interchanging some are default and some are junk 
>>>>>>> values.
>>>>>>>
>>>>>>> See this example below:
>>>>>>> While setting I am setting:
>>>>>>> ProductSessionInformation{requestId='222112345',
>>>>>>> productId='222112345', priority='0', firstEventTimeMillis=1585312384,
>>>>>>> lastEventTimeMillis=1585312384, firstReceivedTimeMillis=1585312401693,
>>>>>>> numberOfEvents=1}
>>>>>>>
>>>>>>> When I retrieve it back, it comes like this:
>>>>>>> ProductSessionInformation{requestId='some junk characters are coming
>>>>>>> here' productId='222112345', priority='222112345',
>>>>>>> firstEventTimeMillis=1585312401693, lastEventTimeMillis=1,
>>>>>>> firstReceivedTimeMillis=1585312384, numberOfEvents=1}
>>>>>>>
>>>>>>> Any clue why it might be happening? I am stuck with this for couple
>>>>>>> of days. Immediate help is appreciated.

Re: spark structured streaming GroupState returns weird values from state

2020-03-28 Thread Srinivas V
Ok, I will try to create some simple code to reproduce it, if I can. The problem
is that I am adding this code to an existing big project with several
dependencies, with an older Spark Streaming version (2.2) at the root level, etc.

Also, I observed that there is @Experimental on the GroupState class. What
state is it in now? Are several people using this feature in prod?


On Sat, Mar 28, 2020 at 6:23 PM Jungtaek Lim 
wrote:

> I haven't heard of a known issue for this - that said, this may require new
> investigation, which is not possible or requires huge effort without a simple
> reproducer.
>
> Contributors (who are basically volunteers) may not want to struggle to
> reproduce from your partial information - I'd recommend you to spend your
> time to help volunteers starting from simple reproducer, if you are stuck
> at it and have to resolve it.
>
> Could you please get rid of the business logic which you may want to
> redact, and provide full of source code which reproduces the bug?
>
> On Sat, Mar 28, 2020 at 8:11 PM Srinivas V  wrote:
>
>> Sorry for typos , correcting them below
>>
>> On Sat, Mar 28, 2020 at 4:39 PM Srinivas V  wrote:
>>
>>> Sorry I was just changing some names not to send exact names. Please
>>> ignore that. I am really struggling with this since couple of days. Can
>>> this happen due to
>>> 1. some of the values being null or
>>> 2.UTF8  issue ? Or some serilization/ deserilization issue ?
>>> 3. Not enough memory ?
>>> BTW, I am using same names in my code.
>>>
>>> On Sat, Mar 28, 2020 at 10:50 AM Jungtaek Lim <
>>> kabhwan.opensou...@gmail.com> wrote:
>>>
>>>> Well, the code itself doesn't seem to be OK - you're using
>>>> ProductStateInformation as the class of State whereas you provide
>>>> ProductSessionInformation to Encoder for State.
>>>>
>>>> On Fri, Mar 27, 2020 at 11:14 PM Jungtaek Lim <
>>>> kabhwan.opensou...@gmail.com> wrote:
>>>>
>>>>> Could you play with Encoders.bean()? You can Encoders.bean() with your
>>>>> class, and call .schema() with the return value to see how it transforms 
>>>>> to
>>>>> the schema in Spark SQL. The schema must be consistent across multiple JVM
>>>>> runs to make it work properly, but I suspect it doesn't retain the order.
>>>>>
>>>>> On Fri, Mar 27, 2020 at 10:28 PM Srinivas V 
>>>>> wrote:
>>>>>
>>>>>> I am listening to Kafka topic with a structured streaming application
>>>>>> with Java,  testing it on my local Mac.
>>>>>> When I retrieve back GroupState object
>>>>>> with state.get(), it is giving some random values for the fields in the
>>>>>> object, some are interchanging some are default and some are junk values.
>>>>>>
>>>>>> See this example below:
>>>>>> While setting I am setting:
>>>>>> ProductSessionInformation{requestId='222112345',
>>>>>> productId='222112345', priority='0', firstEventTimeMillis=1585312384,
>>>>>> lastEventTimeMillis=1585312384, firstReceivedTimeMillis=1585312401693,
>>>>>> numberOfEvents=1}
>>>>>>
>>>>>> When I retrieve it back, it comes like this:
>>>>>> ProductSessionInformation{requestId='some junk characters are coming
>>>>>> here' productId='222112345', priority='222112345',
>>>>>> firstEventTimeMillis=1585312401693, lastEventTimeMillis=1,
>>>>>> firstReceivedTimeMillis=1585312384, numberOfEvents=1}
>>>>>>
>>>>>> Any clue why it might be happening? I am stuck with this for couple
>>>>>> of days. Immediate help is appreciated.
>>>>>>
>>>>>> code snippet:
>>>>>>
>>>>>>
>>>>>> public class StateUpdateTask implements 
>>>>>> MapGroupsWithStateFunction>>>>> ProductSessionUpdate> {
>>>>>>
>>>>>>  @Override
>>>>>> public ProductSessionUpdate call(String productId, Iterator 
>>>>>> eventsIterator, GroupState state) throws 
>>>>>> Exception {
>>>>>> {
>>>>>>
>>>>>>
>>>>>>
>>>>>>   if (state.hasTimedOut()) {
>>>>>>
>>>>>> //
>>>>>>
>>>>>> }else{
>>>>>>
>>&g

Re: spark structured streaming GroupState returns weird values from state

2020-03-28 Thread Srinivas V
Sorry for typos , correcting them below

On Sat, Mar 28, 2020 at 4:39 PM Srinivas V  wrote:

> Sorry I was just changing some names not to send exact names. Please
> ignore that. I am really struggling with this since couple of days. Can
> this happen due to
> 1. some of the values being null or
> 2. A UTF8 issue? Or some serialization/deserialization issue?
> 3. Not enough memory ?
> BTW, I am using same names in my code.
>
> On Sat, Mar 28, 2020 at 10:50 AM Jungtaek Lim <
> kabhwan.opensou...@gmail.com> wrote:
>
>> Well, the code itself doesn't seem to be OK - you're using
>> ProductStateInformation as the class of State whereas you provide
>> ProductSessionInformation to Encoder for State.
>>
>> On Fri, Mar 27, 2020 at 11:14 PM Jungtaek Lim <
>> kabhwan.opensou...@gmail.com> wrote:
>>
>>> Could you play with Encoders.bean()? You can Encoders.bean() with your
>>> class, and call .schema() with the return value to see how it transforms to
>>> the schema in Spark SQL. The schema must be consistent across multiple JVM
>>> runs to make it work properly, but I suspect it doesn't retain the order.
>>>
>>> On Fri, Mar 27, 2020 at 10:28 PM Srinivas V  wrote:
>>>
>>>> I am listening to Kafka topic with a structured streaming application
>>>> with Java,  testing it on my local Mac.
>>>> When I retrieve back GroupState object with
>>>> state.get(), it is giving some random values for the fields in the object,
>>>> some are interchanging some are default and some are junk values.
>>>>
>>>> See this example below:
>>>> While setting I am setting:
>>>> ProductSessionInformation{requestId='222112345', productId='222112345',
>>>> priority='0', firstEventTimeMillis=1585312384,
>>>> lastEventTimeMillis=1585312384, firstReceivedTimeMillis=1585312401693,
>>>> numberOfEvents=1}
>>>>
>>>> When I retrieve it back, it comes like this:
>>>> ProductSessionInformation{requestId='some junk characters are coming
>>>> here' productId='222112345', priority='222112345',
>>>> firstEventTimeMillis=1585312401693, lastEventTimeMillis=1,
>>>> firstReceivedTimeMillis=1585312384, numberOfEvents=1}
>>>>
>>>> Any clue why it might be happening? I am stuck with this for couple of
>>>> days. Immediate help is appreciated.
>>>>
>>>> code snippet:
>>>>
>>>>
>>>> public class StateUpdateTask implements MapGroupsWithStateFunction>>> Event, ProductStateInformation, ProductSessionUpdate> {
>>>>
>>>>  @Override
>>>> public ProductSessionUpdate call(String productId, Iterator 
>>>> eventsIterator, GroupState state) throws 
>>>> Exception {
>>>> {
>>>>
>>>>
>>>>
>>>>   if (state.hasTimedOut()) {
>>>>
>>>> //
>>>>
>>>> }else{
>>>>
>>>> if (state.exists()) {
>>>> ProductStateInformation oldSession = state.get();
>>>> System.out.println("State for productId:"+productId + " with old 
>>>> values "+oldSession);
>>>>
>>>> }
>>>>
>>>>
>>>> public class EventsApp implements Serializable{
>>>>
>>>> public void run(String[] args) throws Exception {
>>>>
>>>> ...
>>>>
>>>>
>>>> Dataset dataSet = sparkSession
>>>> .readStream()
>>>> .format("kafka")
>>>> .option("kafka.bootstrap.servers", "localhost")
>>>> .option("startingOffsets","latest")
>>>> .option("failOnDataLoss", "false")
>>>> .option("subscribe", "topic1,topic2")
>>>> .option("includeTimestamp", true)
>>>>
>>>> .load();
>>>>
>>>>  eventsDS.groupByKey(
>>>> new MapFunction() {
>>>> @Override public String call(Event event) {
>>>> return event.getProductId();
>>>> }
>>>> }, Encoders.STRING())
>>>> .mapGroupsWithState(
>>>> new StateUpdateTask(3),
>>>> Encoders.bean(ProductSessionInformation.class),
>>>>  

Re: spark structured streaming GroupState returns weird values from state

2020-03-28 Thread Srinivas V
Sorry, I was just changing some names so as not to send the exact names. Please ignore
that. I have really been struggling with this for a couple of days. Can this happen
due to
1. some of the values being null, or
2. a UTF8 issue? Or some serialization/deserialization issue?
3. not enough memory?
I am using the same names in my code.

On Sat, Mar 28, 2020 at 10:50 AM Jungtaek Lim 
wrote:

> Well, the code itself doesn't seem to be OK - you're using
> ProductStateInformation as the class of State whereas you provide
> ProductSessionInformation to Encoder for State.
>
> On Fri, Mar 27, 2020 at 11:14 PM Jungtaek Lim <
> kabhwan.opensou...@gmail.com> wrote:
>
>> Could you play with Encoders.bean()? You can Encoders.bean() with your
>> class, and call .schema() with the return value to see how it transforms to
>> the schema in Spark SQL. The schema must be consistent across multiple JVM
>> runs to make it work properly, but I suspect it doesn't retain the order.
>>
>> On Fri, Mar 27, 2020 at 10:28 PM Srinivas V  wrote:
>>
>>> I am listening to Kafka topic with a structured streaming application
>>> with Java,  testing it on my local Mac.
>>> When I retrieve back GroupState object with
>>> state.get(), it is giving some random values for the fields in the object,
>>> some are interchanging some are default and some are junk values.
>>>
>>> See this example below:
>>> While setting I am setting:
>>> ProductSessionInformation{requestId='222112345', productId='222112345',
>>> priority='0', firstEventTimeMillis=1585312384,
>>> lastEventTimeMillis=1585312384, firstReceivedTimeMillis=1585312401693,
>>> numberOfEvents=1}
>>>
>>> When I retrieve it back, it comes like this:
>>> ProductSessionInformation{requestId='some junk characters are coming
>>> here' productId='222112345', priority='222112345',
>>> firstEventTimeMillis=1585312401693, lastEventTimeMillis=1,
>>> firstReceivedTimeMillis=1585312384, numberOfEvents=1}
>>>
>>> Any clue why it might be happening? I am stuck with this for couple of
>>> days. Immediate help is appreciated.
>>>
>>> code snippet:
>>>
>>>
>>> public class StateUpdateTask implements MapGroupsWithStateFunction>> Event, ProductStateInformation, ProductSessionUpdate> {
>>>
>>>  @Override
>>> public ProductSessionUpdate call(String productId, Iterator 
>>> eventsIterator, GroupState state) throws Exception 
>>> {
>>> {
>>>
>>>
>>>
>>>   if (state.hasTimedOut()) {
>>>
>>> //
>>>
>>> }else{
>>>
>>> if (state.exists()) {
>>> ProductStateInformation oldSession = state.get();
>>> System.out.println("State for productId:"+productId + " with old values 
>>> "+oldSession);
>>>
>>> }
>>>
>>>
>>> public class EventsApp implements Serializable{
>>>
>>> public void run(String[] args) throws Exception {
>>>
>>> ...
>>>
>>>
>>> Dataset dataSet = sparkSession
>>> .readStream()
>>> .format("kafka")
>>> .option("kafka.bootstrap.servers", "localhost")
>>> .option("startingOffsets","latest")
>>> .option("failOnDataLoss", "false")
>>> .option("subscribe", "topic1,topic2")
>>> .option("includeTimestamp", true)
>>>
>>> .load();
>>>
>>>  eventsDS.groupByKey(
>>> new MapFunction() {
>>> @Override public String call(Event event) {
>>> return event.getProductId();
>>> }
>>> }, Encoders.STRING())
>>> .mapGroupsWithState(
>>> new StateUpdateTask(3),
>>> Encoders.bean(ProductSessionInformation.class),
>>> Encoders.bean(ProductSessionUpdate.class),
>>> GroupStateTimeout.ProcessingTimeTimeout());
>>>
>>> ...
>>>
>>>
>>> StreamingQuery query = productUpdates
>>> .writeStream()
>>> .foreach(new ForeachWriter() {
>>> @Override
>>> public boolean open(long l, long l1) {return true;}
>>>
>>> @Override
>>> public void process(ProductSessionUpdate productSessionUpdate) {
>>> logger.info("-> query process: "+ productSessionUpdate);
>>> }
>>>
>>> @Override
>>> public void close(Throwable throwable) {}
>>> })
>>> .outputMode("update")
>>> .option("checkpointLocation", checkpointDir)
>>> .start();
>>>
>>> query.awaitTermination();
>>>
>>> }
>>>
>>> public class ProductStateInformation implements Serializable {
>>>
>>> protected String requestId;
>>> protected String productId;
>>> protected String priority;
>>> protected long firstEventTimeMillis;
>>> protected long lastEventTimeMillis;
>>> protected long firstReceivedTimeMillis;
>>> protected int numberOfEvents;
>>>
>>> ...//getter setters
>>>
>>> }
>>>
>>> These are are the versions I am using:
>>>
>>> 2.3.1
>>> 2.4.3
>>>
>>> 2.6.60.10.2.0
>>>
>>> 3.0.3
>>>
>>>


spark structured streaming GroupState returns weird values from state

2020-03-27 Thread Srinivas V
I am listening to a Kafka topic with a structured streaming application in
Java, testing it on my local Mac.
When I retrieve the GroupState object back with
state.get(), it gives some random values for the fields in the object:
some are interchanged, some are defaults, and some are junk values.

See this example below:
While setting the state, I am storing:
ProductSessionInformation{requestId='222112345', productId='222112345',
priority='0', firstEventTimeMillis=1585312384,
lastEventTimeMillis=1585312384, firstReceivedTimeMillis=1585312401693,
numberOfEvents=1}

When I retrieve it back, it comes like this:
ProductSessionInformation{requestId='some junk characters are coming here'
productId='222112345', priority='222112345',
firstEventTimeMillis=1585312401693, lastEventTimeMillis=1,
firstReceivedTimeMillis=1585312384, numberOfEvents=1}

Any clue why this might be happening? I have been stuck with this for a couple of
days. Immediate help is appreciated.

code snippet:


public class StateUpdateTask implements
MapGroupsWithStateFunction<String, Event, ProductStateInformation, ProductSessionUpdate> {

 @Override
public ProductSessionUpdate call(String productId, Iterator<Event>
eventsIterator, GroupState<ProductStateInformation> state) throws
Exception {
{



  if (state.hasTimedOut()) {

//

}else{

if (state.exists()) {
ProductStateInformation oldSession = state.get();
System.out.println("State for productId:"+productId + " with old
values "+oldSession);

}


public class EventsApp implements Serializable{

public void run(String[] args) throws Exception {

...


Dataset<Row> dataSet = sparkSession
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "localhost")
.option("startingOffsets","latest")
.option("failOnDataLoss", "false")
.option("subscribe", "topic1,topic2")
.option("includeTimestamp", true)

.load();

 eventsDS.groupByKey(
new MapFunction<Event, String>() {
@Override public String call(Event event) {
return event.getProductId();
}
}, Encoders.STRING())
.mapGroupsWithState(
new StateUpdateTask(3),
Encoders.bean(ProductSessionInformation.class),
Encoders.bean(ProductSessionUpdate.class),
GroupStateTimeout.ProcessingTimeTimeout());

...


StreamingQuery query = productUpdates
.writeStream()
.foreach(new ForeachWriter<ProductSessionUpdate>() {
@Override
public boolean open(long l, long l1) {return true;}

@Override
public void process(ProductSessionUpdate productSessionUpdate) {
logger.info("-> query process: "+ productSessionUpdate);
}

@Override
public void close(Throwable throwable) {}
})
.outputMode("update")
.option("checkpointLocation", checkpointDir)
.start();

query.awaitTermination();

}

public class ProductStateInformation implements Serializable {

protected String requestId;
protected String productId;
protected String priority;
protected long firstEventTimeMillis;
protected long lastEventTimeMillis;
protected long firstReceivedTimeMillis;
protected int numberOfEvents;

...//getter setters

}

These are the versions I am using:

2.3.1
2.4.3

2.6.6
0.10.2.0

3.0.3


Re: structured streaming Kafka consumer group.id override

2020-03-19 Thread Srinivas V
1. How would a prod admin user or other engineers figure out which process this
random group id consuming a specific topic belongs to? Why is it
designed this way?
2. I don't see the group id changing all the time; it repeats across
restarts. I am not able to understand when and how it changes. I know it is
trying to get the next offset after the last consumed one, but it is failing as the
offset has expired. What is the solution for this?
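
For what it is worth, a hedged sketch of the Kafka source options that usually come up here (assuming an existing SparkSession named spark; broker and topic names are placeholders, and the kafka.group.id option is assumed to be honoured only from Spark 3.0 onwards -- on 2.x the group id is always auto-generated):

val stream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")     // placeholder
  .option("subscribe", "events")                         // placeholder
  .option("kafka.group.id", "my-app-events-consumer")    // assumed to work on Spark 3.0+ only
  .option("startingOffsets", "latest")   // only consulted when the query has no checkpoint yet
  .option("failOnDataLoss", "false")     // keep running when checkpointed offsets have expired
  .load()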

On Thu, Mar 19, 2020 at 10:53 AM lec ssmi  wrote:

> 1.Maybe  we can't use customized group  id in structured streaming.
> 2.When restarting from failure or killing , the group id changes, but the
> starting offset  will be the last one you consumed last time .
>
> Srinivas V wrote on Thursday, March 19, 2020 at 12:36 PM:
>
>> Hello,
>> 1. My Kafka consumer name is randomly being generated by spark structured
>> streaming. Can I override this?
>> 2. When testing in development, when I stop my streaming job for Kafka
>> consumer job for couple of days and try to start back again, the job keeps
>> failing for missing offsets as the offsets get expired after 4 hours. I
>> read that when restarting to consume messages SS always tries to get the
>> earliest offset but not latest offset. How to handle this problem?
>>
>> Regards
>> Srini
>>
>


structured streaming Kafka consumer group.id override

2020-03-18 Thread Srinivas V
Hello,
1. My Kafka consumer group name is randomly generated by Spark structured
streaming. Can I override this?
2. When testing in development, when I stop my Kafka consumer streaming job
for a couple of days and try to start it back again, the job keeps
failing due to missing offsets, as the offsets expire after 4 hours. I
read that when restarting to consume messages, SS always tries to get the
earliest offset, not the latest offset. How do I handle this problem?

Regards
Srini


structured streaming with mapGroupWithState

2020-03-11 Thread Srinivas V
Is anyone using this combination in prod? I am planning to use it for a use case
with 15,000 events per second from a few Kafka topics. Though the events are big,
I would just have to take the businessIds, frequency, and first and last event
timestamps and save these into mapGroupsWithState. I need to keep them for a
window of say 20 mins, then push them to an output Kafka topic. The total memory of the
state will not be more than say 50MB, as I have a limited number of
businessIds, say 1 million.
Questions:
1. I would like you to share any issues you might have faced or that I may face.
2. How do I debug if I am unable to keep up with the inflow of events and the lag is
increasing constantly?
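
A rough Scala sketch of the kind of state function described above (event and summary shapes are invented for illustration; the 20-minute window is implemented as a processing-time timeout set when the key is first seen):

import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

case class BizEvent(businessId: String, ts: Long)   // hypothetical input shape
case class BizSummary(businessId: String, count: Long, firstTs: Long, lastTs: Long)

def update(id: String, events: Iterator[BizEvent],
           state: GroupState[BizSummary]): Iterator[BizSummary] = {
  if (state.hasTimedOut) {              // ~20 minutes after the first event: emit and clear
    val out = state.get
    state.remove()
    Iterator(out)
  } else {
    val ts = events.map(_.ts).toList
    val isNew = !state.exists
    val prev = state.getOption.getOrElse(BizSummary(id, 0L, ts.min, ts.min))
    state.update(prev.copy(count = prev.count + ts.size,
                           lastTs = math.max(prev.lastTs, ts.max)))
    if (isNew) state.setTimeoutDuration("20 minutes")
    Iterator.empty                      // nothing is emitted until the window closes
  }
}

// Wiring (events: Dataset[BizEvent]); the emitted summaries can then be written to Kafka:
// events.groupByKey(_.businessId)
//   .flatMapGroupsWithState(OutputMode.Update(), GroupStateTimeout.ProcessingTimeTimeout())(update)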

Regards
Sri


[ spark-streaming ] - Data Locality issue

2020-02-04 Thread Karthik Srinivas
Hi,

I am using Spark 2.3.2 and I am facing issues due to data locality; even after
setting spark.locality.wait.rack=200, the locality level is always RACK_LOCAL.
Can someone help me with this?

Thank you


Data locality

2020-02-04 Thread Karthik Srinivas
Hi all,

I am using Spark 2.3.2 and I am facing issues due to data locality; even after
setting spark.locality.wait.rack=200, the locality level is always RACK_LOCAL.
Can someone help me with this?

Thank you


Please unsubscribe me

2016-12-05 Thread Srinivas Potluri



Re: retrieve cell value from a rowMatrix.

2016-01-21 Thread Srivathsan Srinivas
Hi Zhang,
  I am new to Scala and Spark. I am not a Java guy (more of a Python and R
guy). I just began playing with matrices in MLlib and it looks painful to do
simple things. If you can show me a small example, it would help. The apply
function is not available on RowMatrix.

For eg.,

import org.apache.spark.mllib.linalg.distributed.RowMatrix

/* retrive a cell value */
def getValue(m: RowMatrix): Double = {
   ???
}


Likewise, I have trouble adding two RowMatrices

/* add two RowMatrices */
def addRowMatrices(a: RowMatrix, b: RowMatrix): RowMatrix = {

}


From what I have read on Stack Overflow and other places, such simple
things are not exposed in MLlib, but they are heavily used in the
underlying Breeze libraries. Hence, one should convert the RowMatrix to
its Breeze equivalent, do the required operations, and convert it back to
a RowMatrix. I am still learning how to do this kind of conversion back and
forth. If you have small examples, it would be very helpful.
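
A small sketch along those lines (it assumes both matrices have the same dimensions and identically partitioned row RDDs, and that the row order of the underlying RDD is the order you care about):

import breeze.linalg.{DenseVector => BDV}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.RowMatrix

// Cell (i, j): index the rows, keep row i, read element j of that vector.
def getValue(m: RowMatrix, i: Long, j: Int): Double =
  m.rows.zipWithIndex().filter { case (_, idx) => idx == i }.first()._1(j)

// Element-wise sum of two RowMatrices via Breeze: zip the row RDDs (requires
// identical partitioning and row counts), add each pair of rows, wrap them back up.
def addRowMatrices(a: RowMatrix, b: RowMatrix): RowMatrix = {
  val summedRows = a.rows.zip(b.rows).map { case (r1, r2) =>
    Vectors.dense((BDV(r1.toArray) + BDV(r2.toArray)).toArray)
  }
  new RowMatrix(summedRows)
}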


Thanks!
Srini.

On Wed, Jan 20, 2016 at 10:08 PM, zhangjp <592426...@qq.com> wrote:

>
> use apply(i,j) function.
> Do you know how to save a matrix to a file using the Java language?
>
> -- Original Message --
> *From:* "Srivathsan Srinivas";<srivathsan.srini...@gmail.com>;
> *Sent:* Thursday, January 21, 2016, 9:04 AM
> *To:* "user"<user@spark.apache.org>;
> *Subject:* retrieve cell value from a rowMatrix.
>
> Hi,
>Is there a way to retrieve the cell value of a rowMatrix? Like m(i,j)?
> The docs say that the indices are long. Maybe I am doing something
> wrong...but, there doesn't seem to be any such direct method.
>
> Any suggestions?
>
> --
> Thanks,
> Srini. <http://csc.lsu.edu/~ssrini1/>
>



-- 
Thanks,
Srini. <http://csc.lsu.edu/%7Essrini1/>


retrieve cell value from a rowMatrix.

2016-01-20 Thread Srivathsan Srinivas
Hi,
   Is there a way to retrieve the cell value of a rowMatrix? Like m(i,j)?
The docs say that the indices are long. Maybe I am doing something
wrong...but, there doesn't seem to be any such direct method.

Any suggestions?

-- 
Thanks,
Srini. 


Re: Spray client reports Exception: akka.actor.ActorSystem.dispatcher()Lscala/concurrent/ExecutionContext

2014-11-10 Thread Srinivas Chamarthi
I am trying to use Spark with Spray and I have the dependency problem with
quasiquotes. The issue comes up only when I include the Spark dependencies. I
am not sure how this one can be excluded.

Jianshi: can you let me know what versions of Spray + Akka + Spark you are
using?

[error]org.scalamacros:quasiquotes _2.10, _2.10.3
[trace] Stack trace suppressed: run last *:update for the full output.
[error] (*:update) Conflicting cross-version suffixes in:
org.scalamacros:quasiquotes


On Thu, Oct 30, 2014 at 9:50 PM, Jianshi Huang jianshi.hu...@gmail.com
wrote:

 Hi Preshant, Chester, Mohammed,

 I switched to Spark's Akka and now it works well. Thanks for the help!

 (Need to exclude Akka from Spray dependencies, or specify it as provided)


 Jianshi


 On Thu, Oct 30, 2014 at 3:17 AM, Mohammed Guller moham...@glassbeam.com
 wrote:

  I am not sure about that.



 Can you try a Spray version built with 2.2.x along with Spark 1.1 and
 include the Akka dependencies in your project’s sbt file?



 Mohammed



 *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com]
 *Sent:* Tuesday, October 28, 2014 8:58 PM
 *To:* Mohammed Guller
 *Cc:* user
 *Subject:* Re: Spray client reports Exception:
 akka.actor.ActorSystem.dispatcher()Lscala/concurrent/ExecutionContext



 I'm using Spark built from HEAD, I think it uses modified Akka 2.3.4,
 right?



 Jianshi



 On Wed, Oct 29, 2014 at 5:53 AM, Mohammed Guller moham...@glassbeam.com
 wrote:

 Try a version built with Akka 2.2.x



 Mohammed



 *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com]
 *Sent:* Tuesday, October 28, 2014 3:03 AM
 *To:* user
 *Subject:* Spray client reports Exception:
 akka.actor.ActorSystem.dispatcher()Lscala/concurrent/ExecutionContext



 Hi,



 I got the following exceptions when using Spray client to write to
 OpenTSDB using its REST API.



   Exception in thread pool-10-thread-2 java.lang.NoSuchMethodError:
 akka.actor.ActorSystem.dispatcher()Lscala/concurrent/ExecutionContext;



 It worked locally in my Intellij but failed when I launch it from
 Spark-submit.



 Google suggested it's a compatibility issue in Akka. And I'm using latest
 Spark built from the HEAD, so the Akka used in Spark-submit is 2.3.4-spark.



 I tried both Spray 1.3.2 (built for Akka 2.3.6) and 1.3.1 (built for
 2.3.4). Both failed with the same exception.



 Anyone has idea what went wrong? Need help!



 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/





 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/




 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/



Spray with Spark-sql build fails with Incompatible dependencies

2014-11-10 Thread Srinivas Chamarthi
I am trying to use Spark with Spray and I have the dependency problem with
quasiquotes. The issue comes up only when I include the Spark dependencies. I
am not sure how this one can be excluded.

Has anyone tried this before and got it working?

[error] Modules were resolved with conflicting cross-version suffixes in
{file:/
[error]org.scalamacros:quasiquotes _2.10, _2.10.3
[trace] Stack trace suppressed: run last *:update for the full output.
[error] (*:update) Conflicting cross-version suffixes in:
org.scalamacros:quasiquotes
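
One thing commonly tried for this kind of conflicting-cross-version-suffix error is excluding the transitively pulled quasiquotes artifact from whichever dependency drags in the second suffix, e.g. in build.sbt (versions below are placeholders, and whether the exclusion is safe depends on which side actually needs quasiquotes at runtime):

// build.sbt sketch -- placeholder versions; check the resolution report (e.g. "last update"
// in the sbt shell) to see which dependency pulls which org.scalamacros:quasiquotes_* artifact.
libraryDependencies ++= Seq(
  "io.spray" %% "spray-client" % "1.3.1",
  ("org.apache.spark" %% "spark-sql" % "1.1.0")
    .excludeAll(ExclusionRule(organization = "org.scalamacros"))
)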

thx
srinivas


Re: Unresolved Attributes

2014-11-09 Thread Srinivas Chamarthi
OK, I am answering my own question here. It looks like name is a reserved keyword
or gets some special treatment; unless you use an alias, it doesn't work. So always use
an alias with the name attribute.

select a.name from business a where a.business_id = 'yy' // RIGHT
select name from business where business_id = 'yy' // doesn't work.

Not sure if there is an issue already filed and already fixed in master.

I will raise an issue if someone else also confirms it.

thx
srinivas

On Sat, Nov 8, 2014 at 3:26 PM, Srinivas Chamarthi 
srinivas.chamar...@gmail.com wrote:

 I have an exception when I am trying to run a simple where clause query. I
 can see the name attribute is present in the schema but somehow it still
 throws the exception.

 query = select name from business where business_id= + business_id

 what am I doing wrong ?

 thx
 srinivas


 Exception in thread main
 org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved
 attributes: 'name, tree:
 Project ['name]
  Filter (business_id#1 = 'Ba1hXOqb3Yhix8bhE0k_WQ)
   Subquery business
SparkLogicalPlan (ExistingRdd
 [attributes#0,business_id#1,categories#2,city#3,full_address#4,hours#5,latitude#6,longitude#7,name#8,neighborhoods#9,open#10,review_count#11,stars#12,state#13,type#14],
 MappedRDD[5] at map at JsonRDD.scala:38)



Unresolved Attributes

2014-11-08 Thread Srinivas Chamarthi
I have an exception when I am trying to run a simple where clause query. I
can see the name attribute is present in the schema but somehow it still
throws the exception.

query = "select name from business where business_id=" + business_id

what am I doing wrong ?

thx
srinivas


Exception in thread main
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved
attributes: 'name, tree:
Project ['name]
 Filter (business_id#1 = 'Ba1hXOqb3Yhix8bhE0k_WQ)
  Subquery business
   SparkLogicalPlan (ExistingRdd
[attributes#0,business_id#1,categories#2,city#3,full_address#4,hours#5,latitude#6,longitude#7,name#8,neighborhoods#9,open#10,review_count#11,stars#12,state#13,type#14],
MappedRDD[5] at map at JsonRDD.scala:38)


contains in array in Spark SQL

2014-11-08 Thread Srinivas Chamarthi
hi,

what would be the syntax to check for an attribute in an array data type
in my where clause?

select * from business where categories contains 'X' // something like
this, is this the right syntax??

attribute: categories
type: Array

thx
srinivas


RE: Data from Mysql using JdbcRDD

2014-08-01 Thread srinivas
Hi, thanks all. I have a few more questions on this.
Suppose I don't want to pass a where clause in my SQL; is there a way
I can do this?
Right now I am trying to modify the JdbcRDD class by removing all the parameters
for the lower bound and upper bound, but I am getting runtime exceptions.
Is there any workaround to run normal SQL queries with or without
a where clause, or to select rows for a particular value?
Please help
-Srini.
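
For reference, JdbcRDD expects the SQL string to contain two '?' placeholders that it binds to the lower/upper bounds, so one common workaround when no real predicate is wanted is a degenerate always-true clause. A sketch, with placeholder connection details and assuming an existing SparkContext named sc:

import java.sql.{DriverManager, ResultSet}
import org.apache.spark.rdd.JdbcRDD

val rows = new JdbcRDD(
  sc,
  () => DriverManager.getConnection("jdbc:mysql://dbhost:3306/mydb", "user", "pass"), // placeholders
  "SELECT * FROM my_table WHERE ? <= 1 AND 1 <= ?",  // degenerate bounds predicate, always true
  1, 1,   // lowerBound / upperBound, effectively unused by the predicate above
  1,      // numPartitions: a single partition, since we are not range-splitting
  (rs: ResultSet) => rs.getString(1)                 // mapRow: extract whichever columns you need
)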



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Data-from-Mysql-using-JdbcRDD-tp10994p11174.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Does spark streaming fit to our application

2014-07-21 Thread srinivas
Hi,
  Our application is required to do some aggregations on data that will be
coming as a stream for over two months. I would like to know if spark
streaming will be suitable for our requirement. After going through some
documentation and videos i think we can do aggregations on data based on
window timeframe that will be in minutes. I am not sure if we can cache that
data or we can store the data in hdfs for further calculations in spark
streaming. Please help!!

Thanks,
-Srini. 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Does-spark-streaming-fit-to-our-application-tp10345.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Spark Streaming Json file groupby function

2014-07-18 Thread srinivas
Hi 

 I am able to save to a local file the RDDs that are generated by Spark
SQL from the Spark Streaming data. If I set the streaming context to
10 sec, only the data coming in that 10 sec time window is processed by my
SQL and the data is stored in the location I specified, and for the next set of
data (streaming context) it errors out saying the save folder already exists.
So I increased my streaming context duration to 100 sec; with this, the data
that comes in the 100 sec window is processed at once, and the data is output
to several files in that folder, like 10 different files
(part-0001, part-2...), each having one or two records. But I want to save
those files as a single file.
Please let me know if there is any workaround for this.

The code that I am using:

case class Record(ID:String,name:String,score:Int,school:String)
case class OutPut(name:String,score:String)
object KafkaWordCount {
  def main(args: Array[String]) {
    if (args.length < 4) {
      System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
      System.exit(1)
    }

    //StreamingExamples.setStreamingLogLevels()
    val datenow = new Date()
    val Array(zkQuorum, group, topics, numThreads) = args
    val sparkConf = new SparkConf().setAppName("KafkaWordCount")
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(100))
    val sqlContext = new SQLContext(sc)
    val timer = Time(10)
    ssc.remember(Seconds(100))
    //val timenow = new java.util.Date
    import sqlContext._
    val topicpMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2)
    val jsonf =
      lines.map(JSON.parseFull(_)).map(_.get.asInstanceOf[scala.collection.immutable.Map[String, Any]])
    val fields =
      jsonf.map(data => Record(data("ID").toString, data("name").toString,
        data("score").toString.toInt, data("school").toString))
    fields.print()
    //fields.saveAsTextFile("/home/ubuntu/spark-1.0.0/external/jsonfile2/" + timenow)
    val results = fields.foreachRDD((recrdd, timer) => {
      recrdd.registerAsTable("table1")
      val sqlreport = sqlContext.sql("select max(score) from table1 where ID = 'math' and score > 50")
      sqlreport.map(t =>
        OutPut(t(0).toString, t(1).toString)).collect().foreach(println)
      //println(sqlreport)
      //sqlreport.foreach(println)
      sqlreport.saveAsTextFile("/home/ubuntu/spark-1.0.0/external/jsonfile2/" + datenow)
    })
    //results.print()
    ssc.start()
    ssc.awaitTermination()
  }
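
A sketch of the usual workarounds, patterned on the snippet above: write every batch into its own directory (so the "folder already exists" error never triggers) and coalesce to one partition so each batch produces a single part file.

fields.foreachRDD((recrdd, batchTime) => {
  recrdd.registerAsTable("table1")
  val sqlreport = sqlContext.sql("select max(score) from table1 where ID = 'math' and score > 50")
  sqlreport
    .coalesce(1)  // one partition -> a single part-00000 file per batch
    .saveAsTextFile("/home/ubuntu/spark-1.0.0/external/jsonfile2/batch-" + batchTime.milliseconds)
})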
Thanks,
-Srinivas



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Json-file-groupby-function-tp9618p10170.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Spark Streaming Json file groupby function

2014-07-17 Thread srinivas
Hi TD,

  Thanks for the solutions for my previous post. I am running into another
issue: I am getting data from a JSON file and I am trying to parse it and
map it to a record, given below.

 val jsonf =
   lines.map(JSON.parseFull(_)).map(_.get.asInstanceOf[scala.collection.immutable.Map[Any,Any]])
     .map(data => Record(data("ID").toString, data("name").toString, data("score").toInt, data("school").toString))



case class Record(ID:String,name:String,score:Int,school:String)


When I try to do this, I get an error:

[error]
/home/ubuntu/spark-1.0.0/external/jsonfile2/src/main/scala/jsonfile.scala:36:
value toInt is not a member of Any
[error]
lines.map(JSON.parseFull(_)).map(_.get.asInstanceOf[scala.collection.immutable.Map[Any,Any]]).map(data=Record(data(ID).toString,data(name).toString,data(score).toInt,data(school).toString))
[error]
/home/ubuntu/spark-1.0.0/external/jsonfile2/src/main/scala/jsonfile.scala:36:
value toInt is not a member of Any

I tried giving immutable.Map[Any,Int] and tried converting Int to String; my
application compiled, but I am getting an exception when I run it:

14/07/17 17:11:30 ERROR Executor: Exception in task ID 6
java.lang.ClassCastException: java.lang.String cannot be cast to
java.lang.Integer
at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:106)

Basically I am trying to do a max operation in my Spark SQL.
Please let me know if there is any workaround for this.
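
One workaround, sketched here: scala.util.parsing.json.JSON parses numbers as Double and the map values are typed Any, so neither .toInt nor a cast to Integer works directly; going through toString handles both cases.

// Convert via toString so the code works whether the JSON value arrived as 10 or "10".
val fields = jsonf.map(data => Record(
  data("ID").toString,
  data("name").toString,
  data("score").toString.toDouble.toInt,
  data("school").toString))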

Thanks,
-Srinivas.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Json-file-groupby-function-tp9618p10060.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Spark Streaming Json file groupby function

2014-07-17 Thread srinivas
Hi TD,
It Worked...Thank you so much for all your help.

Thanks,
-Srinivas.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Json-file-groupby-function-tp9618p10132.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Spark Streaming Json file groupby function

2014-07-16 Thread srinivas
)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
at scala.Option.foreach(Option.scala:236)
at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


I am trying to enter data to Kafka like:
{"type":"math","name":"srinivas","score":10,"school":"lfs"}

I am thinking something is wrong with the input RDD. Please let me know what is
causing this error.

Thanks,
-Srinivas.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Json-file-groupby-function-tp9618p9933.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Spark Streaming Json file groupby function

2014-07-15 Thread srinivas
I am still getting the error, even if I convert it to a record:
object KafkaWordCount {
  def main(args: Array[String]) {
    if (args.length < 4) {
      System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
      System.exit(1)
    }

    //StreamingExamples.setStreamingLogLevels()

    val Array(zkQuorum, group, topics, numThreads) = args
    val sparkConf = new SparkConf().setAppName("KafkaWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(10))
    val sql = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sql)
    val timer = Time(1)
    // ssc.checkpoint("checkpoint")

    //import sqlContext._
    val topicpMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2)
    val jsonf =
      lines.map(JSON.parseFull(_)).map(_.get.asInstanceOf[scala.collection.immutable.Map[String, Any]])
    case class Record(ID:String,name:String,score:String,school:String)
    val fields =
      jsonf.map(data => Record(data("type").toString, data("name").toString,
        data("score").toString, data("school").toString))
    val results = fields.transform((recrdd, tt) => {
      recrdd.registerAsTable("table1")
      val results = sql("select * from table1")
      println(results)
      results.foreach(println)
    })
    //results.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

I am getting this error:

[error]
/home/ubuntu/spark-1.0.0/external/jsonfile/src/main/scala/jsonfile.scala:36:
value registerAsTable is not a member of org.apache.spark.rdd.RDD[Record]
[error]  recrdd.registerAsTable(table1)
[error] ^
[error] one error found
[error] (compile:compile) Compilation failed


Please look into this and let me know if I am missing anything.

Thanks,
-Srinivas.





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Json-file-groupby-function-tp9618p9816.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Spark Streaming Json file groupby function

2014-07-15 Thread srinivas
Hi TD,

I uncommented import sqlContext._ and tried to compile the code:

import java.util.Properties
import kafka.producer._
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.SparkConf
import scala.util.parsing.json.JSON
import org.apache.spark.sql.SQLContext
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
object KafkaWordCount {
  def main(args: Array[String]) {
    if (args.length < 4) {
      System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
      System.exit(1)
    }

    //StreamingExamples.setStreamingLogLevels()

    val Array(zkQuorum, group, topics, numThreads) = args
    val sparkConf = new SparkConf().setAppName("KafkaWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(10))
    val sql = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sql)
    val timer = Time(1)
    // ssc.checkpoint("checkpoint")

    import sqlContext._
    val topicpMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2)
    val jsonf =
      lines.map(JSON.parseFull(_)).map(_.get.asInstanceOf[scala.collection.immutable.Map[String, Any]])
    case class Record(ID:String,name:String,score:String,school:String)
    val fields =
      jsonf.map(data => Record(data("type").toString, data("name").toString,
        data("score").toString, data("school").toString))
    val results = fields.transform((recrdd, tt) => {
      recrdd.registerAsTable("table1")
      val results = sql("select * from table1")
      println(results)
      results.foreach(println)
    })
    //results.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

but received the error

[error]
/home/ubuntu/spark-1.0.0/external/jsonfile2/src/main/scala/jsonfile.scala:38:
No TypeTag available for Record
[error]  recrdd.registerAsTable(table1)
[error]  ^
[error] one error found
[error] (compile:compile) Compilation failed
[error] Total time: 17 s, completed Jul 16, 2014 3:11:53 AM


Please advise me on how to proceed.
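
The "No TypeTag available for Record" error usually comes from the case class being declared inside main; a sketch of the structural fix (everything else stays as in the code above):

// Case class at the top level so the compiler can materialize a TypeTag for it.
case class Record(ID: String, name: String, score: String, school: String)

object KafkaWordCount {
  def main(args: Array[String]) {
    // ... create ssc, sqlContext and jsonf exactly as above ...
    // import sqlContext._   // implicit conversion that provides registerAsTable
    // val fields = jsonf.map(data => Record(data("type").toString, data("name").toString,
    //   data("score").toString, data("school").toString))
  }
}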

Thanks,
-Srinivas.





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Json-file-groupby-function-tp9618p9868.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Spark Streaming Json file groupby function

2014-07-14 Thread srinivas
Hi,
  I am new to Spark and Scala and I am trying to do some aggregations on a
JSON file stream using Spark Streaming. I am able to parse the JSON string
and it is converted to Map(id -> 123, name -> srini, mobile -> 12324214,
score -> 123, test_type -> math). Now I want to use a GROUPBY function on each
student's map data and want to do some aggregations on scores. Here is my
main function:
val Array(zkQuorum, group, topics, numThreads) = args
val sparkConf = new SparkConf().setAppName("KafkaWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(10))
// ssc.checkpoint("checkpoint")

val topicpMap = topics.split(",").map((_, numThreads.toInt)).toMap
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2)
val jsonf =
  lines.map(JSON.parseFull(_)).map(_.get.asInstanceOf[scala.collection.immutable.Map[String, Any]])

jsonf.print()

ssc.start()
ssc.awaitTermination()
  }

Can anyone please let me know how to use the groupBy function? Thanks.
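
One possible shape for that, sketched below: turn each parsed map into (test_type, score) pairs and aggregate per micro-batch with reduceByKey (the pair operations need import org.apache.spark.streaming.StreamingContext._ in scope).

// Highest score per test_type in each batch; replace math.max with + to sum instead.
val scoresByType = jsonf
  .map(m => (m("test_type").toString, m("score").toString.toDouble))
  .reduceByKey((a, b) => math.max(a, b))

scoresByType.print()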



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Json-file-groupby-function-tp9618.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Spark Streaming Json file groupby function

2014-07-14 Thread srinivas
Hi,
  Thanks for your reply. I imported StreamingContext and right now I am
getting my DStream as something like:
 Map(id -> 123, name -> srini, mobile -> 12324214, score -> 123, test_type
-> math)
 Map(id -> 321, name -> vasu, mobile -> 73942090, score -> 324, test_type
-> sci)
 Map(id -> 432, name -> , mobile -> 423141234, score -> 322, test_type ->
math)

Each map collection is from a JSON string. Now, if I want to aggregate the scores
on only math, or if I want to find out who got the highest score in math (showing
both name and score), I would like to know what transformation I should apply
to my existing DStream. I am very new to dealing with maps and DStream
transformations, so please advise on how to proceed from here.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Json-file-groupby-function-tp9618p9656.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Spark Streaming Json file groupby function

2014-07-14 Thread srinivas
Hi,
  Thanks for your reply. I imported StreamingContext and right now I am
getting my DStream as something like:
 Map(id -> 123, name -> srini, mobile -> 12324214, score -> 123, test_type
-> math)
 Map(id -> 321, name -> vasu, mobile -> 73942090, score -> 324, test_type
-> sci)
 Map(id -> 432, name -> , mobile -> 423141234, score -> 322, test_type ->
math)

Each map collection is from a JSON string. Now, if I want to aggregate the scores
on only math, or if I want to find out who got the highest score in math (showing
both name and score), I would like to know what transformation I should apply
to my existing DStream. I am very new to dealing with maps and DStream
transformations, so please advise on how to proceed from here.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Json-file-groupby-function-tp9618p9661.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Spark Streaming Json file groupby function

2014-07-14 Thread srinivas
Hi TD,
  Thanks for your help. I am able to convert the map to records using a case class.
I am left with doing some aggregations. I am trying to do some SQL-type
operations on my record set. My code looks like:

 case class Record(ID:Int,name:String,score:Int,school:String)
//val records = jsonf.map(m => Record(m(0),m(1),m(2),m(3)))
val fields = jsonf.map(data =>
  (data("type"), data("name"), data("score"), data("school")))
val results = fields.transform((rdd, time) => {
  rdd.registerAsTable("table1")
  sqlc.sql("select * from table1")
})

When I try to compile my code, it gives me:
jsonfile.scala:30: value registerAsTable is not a member of
org.apache.spark.rdd.RDD[(Any, Any, Any, Any)]

Please let me know if I am missing anything.
And with Spark Streaming, can I really use SQL-like operations on
DStreams?





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Json-file-groupby-function-tp9618p9714.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.