Re: How to retrieve multiple columns values (in one row) to variables in Spark Scala method

2019-04-05 Thread ayan guha
Try like this:

val primitiveDS = spark.sql("select 1.2 avg ,2.3 stddev").collect().apply(0)
val arr = Array(primitiveDS.getDecimal(0), primitiveDS.getDecimal(1))

primitiveDS: org.apache.spark.sql.Row = [1.2,2.3]
arr: Array[java.math.BigDecimal] = Array(1.2, 2.3)
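
Applied to the methods in the original question, the same idea can return both aggregates from a single job as a tuple. A minimal sketch, assuming the same DataFrame and column names as in the question and a double-typed price column (use getDecimal instead of getDouble if price is a decimal type):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{avg, col, round, stddev}

// Sketch: compute both aggregates over the last 14 prices in one job and
// return them as an (average, stddev) pair.
def tickerStats(dfHbase: DataFrame, ticker: String): (Double, Double) = {
  val row = dfHbase.filter(col("ticker") === ticker).
    sort(col("timeissued").desc).
    limit(14).
    select(round(avg(col("price")), 3), round(stddev(col("price")), 3)).
    collect().apply(0)
  (row.getDouble(0), row.getDouble(1))
}

// Usage: val (priceAvg, priceStddev) = tickerStats(dfHbase, "IBM")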


How to retrieve multiple columns values (in one row) to variables in Spark Scala method

2019-04-05 Thread Mich Talebzadeh
Hi,

Pretty basic question.

I use Spark on HBase to retrieve the average and standard deviation of the last
14 prices for a security (ticker) from an HBase table.

However, these calls are expensive in Spark Streaming, where the values are used
to signal buy and sell, and the high-value prices are subsequently stored in
MongoDB in the same Spark Streaming program.

I have defined these two methods to get the average and STDDEV of prices.

This method gets the average of the prices:

  def tickerAvg(dfHbase: org.apache.spark.sql.DataFrame, ticker: String): Double = {
    var priceTicker = dfHbase.filter(col("ticker") === ticker).
      sort(col("timeissued").desc).
      limit(14).
      select(round(avg(col("price")), 3)).
      collect.apply(0).getDouble(0)
    return priceTicker
  }
And this one gets the STDDEV:

  def tickerStddev(dfHbase: org.apache.spark.sql.DataFrame, ticker: String): Double = {
    var priceTicker = dfHbase.filter(col("ticker") === ticker).
      sort(col("timeissued").desc).
      limit(14).
      select(round(stddev(col("price")), 3)).
      collect.apply(0).getDouble(0)
    return priceTicker
  }

Note that each method returns a single value.

However, as these calls are expensive, I want to return both AVG and STDDEV in
one call, from one method, as follows:


  def tickerGeneric(dfHbase: org.apache.spark.sql.DataFrame, ticker: String): Double = {
    var priceTicker = dfHbase.filter(col("ticker") === ticker).
      sort(col("timeissued").desc).
      limit(14).
      select(round(avg(col("price")), 3), round(stddev(col("price")), 3)).
      collect.?

    return ?
  }

How can this be achieved, so that both the AVG and STDDEV values are returned
with one method call?

thanks

Dr Mich Talebzadeh



LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.


Re: Checking if cascading graph computation is possible in Spark

2019-04-05 Thread Jason Nerothin
*I guess I was focusing on this:*

#2
I want to do the above as a event driven way, *without using the batches*
(i tried micro batches, but I realised that’s not what I want), i.e., *for
each arriving event or as soon as a event message come my stream, not by
accumulating the event *

If you want to update your graph without pulling the older data back
through the entire DAG, it seems like you need to store the graph data
somewhere. So that's why I jumped to accumulators - the state would be
around from event to event, and not require a "reaggregation" for each
event.

Arbitrary stateful streaming has this ability "built in" - that is, the
engine stores your intermediate results in RAM and then the next event
picks up where the last one left off.

I've just implemented the arbitrary stateful streaming option... Couldn't
figure out a better way of avoiding the re-shuffle, so ended up keeping the
prior state in the engine.

I'm not using GraphX, but it seems like the approach should work regardless -
there's an interface called GroupState that you hand an iterator of events from
call to call.

Do keep in mind that you have to think about out-of-order event arrivals...

Send me a message to my direct email and I'll provide a link to the
source... Not sure I'm fully grokking your entire use case...
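
For what it's worth, a minimal, self-contained sketch of the GroupState /
mapGroupsWithState shape (the Event and NodeState types, the socket source, and
the anomaly rule below are hypothetical placeholders, not taken from this thread):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}

// Hypothetical types, purely for illustration.
case class Event(nodeId: String, value: Double)
case class NodeState(nodeId: String, lastValue: Double, anomalies: Long)

object StatefulSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("stateful-sketch").getOrCreate()
    import spark.implicits._

    // Illustrative source: lines of "nodeId,value" arriving on a socket.
    val events = spark.readStream
      .format("socket").option("host", "localhost").option("port", 9999).load()
      .as[String]
      .map { line =>
        val Array(id, v) = line.split(",")
        Event(id, v.toDouble)
      }

    // One state object per node; the engine carries it from trigger to trigger,
    // so each incoming event picks up where the last one left off.
    val updated = events
      .groupByKey(_.nodeId)
      .mapGroupsWithState[NodeState, NodeState](GroupStateTimeout.NoTimeout) {
        (nodeId: String, newEvents: Iterator[Event], state: GroupState[NodeState]) =>
          val prior = state.getOption.getOrElse(NodeState(nodeId, 0.0, 0L))
          // Naive "anomaly" rule just to show where per-event logic goes.
          val next = newEvents.foldLeft(prior) { (s, e) =>
            val anomaly = math.abs(e.value - s.lastValue) > 10.0
            NodeState(nodeId, e.value, s.anomalies + (if (anomaly) 1 else 0))
          }
          state.update(next)
          next
      }

    // mapGroupsWithState requires the "update" output mode.
    updated.writeStream.outputMode("update").format("console").start().awaitTermination()
  }
}

The engine keeps the NodeState per key between triggers, which is the "built in"
state mentioned above; out-of-order arrivals would still need explicit handling
(e.g. event-time timestamps and timeouts).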


On Fri, Apr 5, 2019 at 1:15 PM Basavaraj  wrote:

> I have checked broadcast of accumulated values, but not arbitrary stateful
> streaming
>
> But, I am not sure how that helps here
>
> On Fri, 5 Apr 2019, 10:13 pm Jason Nerothin, 
> wrote:
>
>> Have you looked at Arbitrary Stateful Streaming and Broadcast
>> Accumulators?
>>
>> On Fri, Apr 5, 2019 at 10:55 AM Basavaraj  wrote:
>>
>>> Hi
>>>
>>> Have two questions
>>>
>>> #1
>>> I am trying to process events in realtime, outcome of the processing has
>>> to find a node in the GraphX and update that node as well (in case if any
>>> anomaly or state change), If a node is updated, I have to update the
>>> related nodes as well, want to know if GraphX can help in this by providing
>>> some native support
>>>
>>> #2
>>> I want to do the above as a event driven way, without using the batches
>>> (i tried micro batches, but I realised that’s not what I want), i.e., for
>>> each arriving event or as soon as a event message come my stream, not by
>>> accumulating the event
>>>
>>> I humbly welcome any pointers, constructive criticism
>>>
>>> Regards
>>> Basav
>>
>>
>>
>> --
>> Thanks,
>> Jason
>>
>

-- 
Thanks,
Jason


Re: Checking if cascading graph computation is possible in Spark

2019-04-05 Thread Basavaraj
I have checked broadcast of accumulated values, but not arbitrary stateful
streaming

But, I am not sure how that helps here

On Fri, 5 Apr 2019, 10:13 pm Jason Nerothin, 
wrote:

> Have you looked at Arbitrary Stateful Streaming and Broadcast Accumulators?
>
> On Fri, Apr 5, 2019 at 10:55 AM Basavaraj  wrote:
>
>> Hi
>>
>> Have two questions
>>
>> #1
>> I am trying to process events in realtime, outcome of the processing has
>> to find a node in the GraphX and update that node as well (in case if any
>> anomaly or state change), If a node is updated, I have to update the
>> related nodes as well, want to know if GraphX can help in this by providing
>> some native support
>>
>> #2
>> I want to do the above as a event driven way, without using the batches
>> (i tried micro batches, but I realised that’s not what I want), i.e., for
>> each arriving event or as soon as a event message come my stream, not by
>> accumulating the event
>>
>> I humbly welcome any pointers, constructive criticism
>>
>> Regards
>> Basav
>
>
>
> --
> Thanks,
> Jason
>


Re: combineByKey

2019-04-05 Thread Jason Nerothin
Take a look at this SOF:

https://stackoverflow.com/questions/24804619/how-does-spark-aggregate-function-aggregatebykey-work
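
For reference, the compile error in this thread happens because combineByKey
expects createCombiner to have type V => C: with values of type (String, String),
it must take the whole tuple as a single argument rather than two separate
parameters. A minimal sketch of one way the three functions could be typed
(assuming the messages array, the Messages case class, and sc from the rest of
the thread; here C = Set[String]):

// combineByKey signature, conceptually:
//   createCombiner: V => C,  mergeValue: (C, V) => C,  mergeCombiners: (C, C) => C
// Here V = (String, String) and C = Set[String].
val createCombiner = (v: (String, String)) => Set(v._2)

val mergeValue = (acc: Set[String], v: (String, String)) => acc + v._2

val mergeCombiners = (acc1: Set[String], acc2: Set[String]) => acc1 ++ acc2

val combined = sc.parallelize(messages)
  .map(x => (x.timeStamp + "-" + x.Id, (x.Id, x.value)))
  .combineByKey(createCombiner, mergeValue, mergeCombiners)
  .collect()
  .toMap

This should give the same Map as the aggregateByKey version shown in the thread.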

On Fri, Apr 5, 2019 at 12:25 PM Madabhattula Rajesh Kumar <
mrajaf...@gmail.com> wrote:

> Hi,
>
> Thank you for the details. It is a typo error while composing the mail.
> Below is the actual flow.
>
> Any idea, why the combineByKey is not working. aggregateByKey is working.
>
> //Defining createCombiner, mergeValue and mergeCombiner functions
>
> def createCombiner = (Id: String, value: String) => (value :: Nil).toSet
>
> def mergeValue = (accumulator1: Set[String], accumulator2: (String,
> String)) => accumulator1 ++ Set(accumulator2._2)
>
> def mergeCombiner: (Set[String], Set[String]) => Set[String] =
> (accumulator1: Set[String], accumulator2: Set[String]) => accumulator1 ++
> accumulator2
>
> sc.parallelize(messages).map(x => (x.timeStamp+"-"+x.Id, (x.Id,
> x.value))).combineByKey(createCombiner, mergeValue, mergeCombiner)
>
> *Compile Error:-*
>  found   : (String, String) => scala.collection.immutable.Set[String]
>  required: ((String, String)) => ?
>  sc.parallelize(messages).map(x => (x.timeStamp+"-"+x.Id, (x.Id,
> x.value))).combineByKey(createCombiner, mergeValue, mergeCombiner)
>
> *aggregateByKey =>*
>
> val result = sc.parallelize(messages).map(x => (x.timeStamp+"-"+x.Id,
> (x.Id, x.value))).aggregateByKey(Set[String]())(
> (aggr, value) => aggr ++ Set(value._2),
> (aggr1, aggr2) => aggr1 ++ aggr2).collect().toMap
>
>  print(result)
>
> Map(0-d1 -> Set(t1, t2, t3, t4), 0-d2 -> Set(t1, t5, t6, t2), 0-d3 ->
> Set(t1, t2))
>
> Regards,
> Rajesh
>
> On Fri, Apr 5, 2019 at 9:58 PM Jason Nerothin 
> wrote:
>
>> I broke some of your code down into the following lines:
>>
>> import spark.implicits._
>>
>> val a: RDD[Messages]= sc.parallelize(messages)
>> val b: Dataset[Messages] = a.toDF.as[Messages]
>> val c: Dataset[(String, (String, String))] = b.map{x => (x.timeStamp
>> + "-" + x.Id, (x.Id, x.value))}
>>
>> You didn't capitalize .Id and your mergeValue0 and mergeCombiner don't
>> have the types you think for the reduceByKey.
>>
>> I recommend breaking the code down like this to statement-by-statement
>> when you get into a dance with the Scala type system.
>>
>> The type-safety that you're after (that eventually makes life *easier*)
>> is best supported by Dataset (would have prevented the .id vs .Id error).
>> Although there are some performance tradeoffs vs RDD and DataFrame...
>>
>>
>>
>>
>>
>>
>> On Fri, Apr 5, 2019 at 2:11 AM Madabhattula Rajesh Kumar <
>> mrajaf...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Any issue in the below code.
>>>
>>> case class Messages(timeStamp: Int, Id: String, value: String)
>>>
>>> val messages = Array(
>>>   Messages(0, "d1", "t1"),
>>>   Messages(0, "d1", "t1"),
>>>   Messages(0, "d1", "t1"),
>>>   Messages(0, "d1", "t1"),
>>>   Messages(0, "d1", "t2"),
>>>   Messages(0, "d1", "t2"),
>>>   Messages(0, "d1", "t3"),
>>>   Messages(0, "d1", "t4"),
>>>   Messages(0, "d2", "t1"),
>>>   Messages(0, "d2", "t1"),
>>>   Messages(0, "d2", "t5"),
>>>   Messages(0, "d2", "t6"),
>>>   Messages(0, "d2", "t2"),
>>>   Messages(0, "d2", "t2"),
>>>   Messages(0, "d3", "t1"),
>>>   Messages(0, "d3", "t1"),
>>>   Messages(0, "d3", "t2")
>>> )
>>>
>>> //Defining createCombiner, mergeValue and mergeCombiner functions
>>> def createCombiner = (id: String, value: String) => Set(value)
>>>
>>> def mergeValue0 = (accumulator1: Set[String], accumulator2: (String,
>>> String)) => accumulator1 ++ Set(accumulator2._2)
>>>
>>> def mergeCombiner: (Set[String], Set[String]) => Set[String] =
>>> (accumulator1: Set[String], accumulator2: Set[String]) => accumulator1 ++
>>> accumulator2
>>>
>>> sc.parallelize(messages).map(x => (x.timeStamp+"-"+x.id, (x.id,
>>> x.value))).combineByKey(createCombiner, mergeValue, mergeCombiner)
>>>
>>> *Compile Error:-*
>>>  found   : (String, String) => scala.collection.immutable.Set[String]
>>>  required: ((String, String)) => ?
>>> sc.parallelize(messages).map(x => (x.timeStamp+"-"+x.id, (x.id,
>>> x.value))).combineByKey(createCombiner, mergeValue, mergeCombiner)
>>>
>>> Regards,
>>> Rajesh
>>>
>>>
>>
>> --
>> Thanks,
>> Jason
>>
>

-- 
Thanks,
Jason


Re: combineByKey

2019-04-05 Thread Madabhattula Rajesh Kumar
Hi,

Thank you for the details. That was a typo made while composing the mail; below
is the actual flow.

Any idea why combineByKey is not working? aggregateByKey is working.

//Defining createCombiner, mergeValue and mergeCombiner functions

def createCombiner = (Id: String, value: String) => (value :: Nil).toSet

def mergeValue = (accumulator1: Set[String], accumulator2: (String,
String)) => accumulator1 ++ Set(accumulator2._2)

def mergeCombiner: (Set[String], Set[String]) => Set[String] =
(accumulator1: Set[String], accumulator2: Set[String]) => accumulator1 ++
accumulator2

sc.parallelize(messages).map(x => (x.timeStamp+"-"+x.Id, (x.Id,
x.value))).combineByKey(createCombiner, mergeValue, mergeCombiner)

*Compile Error:-*
 found   : (String, String) => scala.collection.immutable.Set[String]
 required: ((String, String)) => ?
 sc.parallelize(messages).map(x => (x.timeStamp+"-"+x.Id, (x.Id,
x.value))).combineByKey(createCombiner, mergeValue, mergeCombiner)

*aggregateByKey =>*

val result = sc.parallelize(messages).map(x => (x.timeStamp+"-"+x.Id,
(x.Id, x.value))).aggregateByKey(Set[String]())(
(aggr, value) => aggr ++ Set(value._2),
(aggr1, aggr2) => aggr1 ++ aggr2).collect().toMap

 print(result)

Map(0-d1 -> Set(t1, t2, t3, t4), 0-d2 -> Set(t1, t5, t6, t2), 0-d3 ->
Set(t1, t2))

Regards,
Rajesh

On Fri, Apr 5, 2019 at 9:58 PM Jason Nerothin 
wrote:

> I broke some of your code down into the following lines:
>
> import spark.implicits._
>
> val a: RDD[Messages]= sc.parallelize(messages)
> val b: Dataset[Messages] = a.toDF.as[Messages]
> val c: Dataset[(String, (String, String))] = b.map{x => (x.timeStamp +
> "-" + x.Id, (x.Id, x.value))}
>
> You didn't capitalize .Id and your mergeValue0 and mergeCombiner don't
> have the types you think for the reduceByKey.
>
> I recommend breaking the code down like this to statement-by-statement
> when you get into a dance with the Scala type system.
>
> The type-safety that you're after (that eventually makes life *easier*) is
> best supported by Dataset (would have prevented the .id vs .Id error).
> Although there are some performance tradeoffs vs RDD and DataFrame...
>
>
>
>
>
>
> On Fri, Apr 5, 2019 at 2:11 AM Madabhattula Rajesh Kumar <
> mrajaf...@gmail.com> wrote:
>
>> Hi,
>>
>> Any issue in the below code.
>>
>> case class Messages(timeStamp: Int, Id: String, value: String)
>>
>> val messages = Array(
>>   Messages(0, "d1", "t1"),
>>   Messages(0, "d1", "t1"),
>>   Messages(0, "d1", "t1"),
>>   Messages(0, "d1", "t1"),
>>   Messages(0, "d1", "t2"),
>>   Messages(0, "d1", "t2"),
>>   Messages(0, "d1", "t3"),
>>   Messages(0, "d1", "t4"),
>>   Messages(0, "d2", "t1"),
>>   Messages(0, "d2", "t1"),
>>   Messages(0, "d2", "t5"),
>>   Messages(0, "d2", "t6"),
>>   Messages(0, "d2", "t2"),
>>   Messages(0, "d2", "t2"),
>>   Messages(0, "d3", "t1"),
>>   Messages(0, "d3", "t1"),
>>   Messages(0, "d3", "t2")
>> )
>>
>> //Defining createCombiner, mergeValue and mergeCombiner functions
>> def createCombiner = (id: String, value: String) => Set(value)
>>
>> def mergeValue0 = (accumulator1: Set[String], accumulator2: (String,
>> String)) => accumulator1 ++ Set(accumulator2._2)
>>
>> def mergeCombiner: (Set[String], Set[String]) => Set[String] =
>> (accumulator1: Set[String], accumulator2: Set[String]) => accumulator1 ++
>> accumulator2
>>
>> sc.parallelize(messages).map(x => (x.timeStamp+"-"+x.id, (x.id,
>> x.value))).combineByKey(createCombiner, mergeValue, mergeCombiner)
>>
>> *Compile Error:-*
>>  found   : (String, String) => scala.collection.immutable.Set[String]
>>  required: ((String, String)) => ?
>> sc.parallelize(messages).map(x => (x.timeStamp+"-"+x.id, (x.id,
>> x.value))).combineByKey(createCombiner, mergeValue, mergeCombiner)
>>
>> Regards,
>> Rajesh
>>
>>
>
> --
> Thanks,
> Jason
>


Re: Checking if cascading graph computation is possible in Spark

2019-04-05 Thread Jason Nerothin
Have you looked at Arbitrary Stateful Streaming and Broadcast Accumulators?

On Fri, Apr 5, 2019 at 10:55 AM Basavaraj  wrote:

> Hi
>
> Have two questions
>
> #1
> I am trying to process events in realtime, outcome of the processing has
> to find a node in the GraphX and update that node as well (in case if any
> anomaly or state change), If a node is updated, I have to update the
> related nodes as well, want to know if GraphX can help in this by providing
> some native support
>
> #2
> I want to do the above as a event driven way, without using the batches (i
> tried micro batches, but I realised that’s not what I want), i.e., for each
> arriving event or as soon as a event message come my stream, not by
> accumulating the event
>
> I humbly welcome any pointers, constructive criticism
>
> Regards
> Basav



-- 
Thanks,
Jason


Re: combineByKey

2019-04-05 Thread Jason Nerothin
I broke some of your code down into the following lines:

import spark.implicits._

val a: RDD[Messages]= sc.parallelize(messages)
val b: Dataset[Messages] = a.toDF.as[Messages]
val c: Dataset[(String, (String, String))] = b.map{x => (x.timeStamp +
"-" + x.Id, (x.Id, x.value))}

You didn't capitalize .Id, and your mergeValue0 and mergeCombiner don't have the
types you think for the combineByKey.

I recommend breaking the code down statement by statement like this when you get
into a dance with the Scala type system.

The type-safety that you're after (that eventually makes life *easier*) is
best supported by Dataset (would have prevented the .id vs .Id error).
Although there are some performance tradeoffs vs RDD and DataFrame...
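
As a concrete illustration of the Dataset route, a small sketch (it assumes the
Messages case class and messages array from the thread, plus a SparkSession named
spark with its SparkContext sc):

import spark.implicits._

// Typed equivalent of the combineByKey aggregation: the grouping and merging
// stay in plain Scala collections on the Messages fields.
val ds = sc.parallelize(messages).toDS()

val grouped: Map[String, Seq[String]] = ds
  .groupByKey(x => x.timeStamp + "-" + x.Id)
  .mapGroups((key, rows) => (key, rows.map(_.value).toList.distinct))
  .collect()
  .toMap

// e.g. grouped("0-d1") should contain t1, t2, t3, t4

Keeping the aggregation in the typed API sidesteps the createCombiner/mergeValue
signature issues entirely.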






On Fri, Apr 5, 2019 at 2:11 AM Madabhattula Rajesh Kumar <
mrajaf...@gmail.com> wrote:

> Hi,
>
> Any issue in the below code.
>
> case class Messages(timeStamp: Int, Id: String, value: String)
>
> val messages = Array(
>   Messages(0, "d1", "t1"),
>   Messages(0, "d1", "t1"),
>   Messages(0, "d1", "t1"),
>   Messages(0, "d1", "t1"),
>   Messages(0, "d1", "t2"),
>   Messages(0, "d1", "t2"),
>   Messages(0, "d1", "t3"),
>   Messages(0, "d1", "t4"),
>   Messages(0, "d2", "t1"),
>   Messages(0, "d2", "t1"),
>   Messages(0, "d2", "t5"),
>   Messages(0, "d2", "t6"),
>   Messages(0, "d2", "t2"),
>   Messages(0, "d2", "t2"),
>   Messages(0, "d3", "t1"),
>   Messages(0, "d3", "t1"),
>   Messages(0, "d3", "t2")
> )
>
> //Defining createCombiner, mergeValue and mergeCombiner functions
> def createCombiner = (id: String, value: String) => Set(value)
>
> def mergeValue0 = (accumulator1: Set[String], accumulator2: (String,
> String)) => accumulator1 ++ Set(accumulator2._2)
>
> def mergeCombiner: (Set[String], Set[String]) => Set[String] =
> (accumulator1: Set[String], accumulator2: Set[String]) => accumulator1 ++
> accumulator2
>
> sc.parallelize(messages).map(x => (x.timeStamp+"-"+x.id, (x.id,
> x.value))).combineByKey(createCombiner, mergeValue, mergeCombiner)
>
> *Compile Error:-*
>  found   : (String, String) => scala.collection.immutable.Set[String]
>  required: ((String, String)) => ?
> sc.parallelize(messages).map(x => (x.timeStamp+"-"+x.id, (x.id,
> x.value))).combineByKey(createCombiner, mergeValue, mergeCombiner)
>
> Regards,
> Rajesh
>
>

-- 
Thanks,
Jason





Checking if cascading graph computation is possible in Spark

2019-04-05 Thread Basavaraj
Hi

Have two questions 

#1 
I am trying to process events in real time. The outcome of the processing has to
find a node in the GraphX graph and update that node (in case of any anomaly or
state change). If a node is updated, I have to update the related nodes as well,
and I want to know if GraphX can help with this by providing some native support.

#2 
I want to do the above in an event-driven way, without using batches (I tried
micro-batches, but I realised that's not what I want), i.e., for each arriving
event, or as soon as an event message arrives on my stream, not by accumulating
events.

I humbly welcome any pointers and constructive criticism.

Regards
Basav



Is there any spark API function to handle a group of companies at once in this scenario?

2019-04-05 Thread Shyam P
Hi,
In my scenario I have a few companies for which I need to calculate some stats,
such as the average, and store them in Cassandra. For the next set of records I
need to read back the previously calculated stats and compute accumulated results
over them (i.e. the present set of data plus the previously stored stats), then
store the result back to Cassandra.

Which Spark function/API should be used to calculate the above for a group of
companies?
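
Not a definitive answer, but one common shape for this kind of read-merge-write
loop with the DataFrame API and the spark-cassandra-connector looks roughly like
the sketch below. The keyspace, table, column names, and the parquet path are
hypothetical placeholders:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{coalesce, col, count, lit, sum}

val spark = SparkSession.builder.appName("company-stats-sketch").getOrCreate()

// Previously stored per-company stats (hypothetical keyspace/table; columns
// assumed to be: company, total, cnt, avg).
val previous = spark.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "finance", "table" -> "company_stats"))
  .load()
  .select("company", "total", "cnt")

// Current batch of records (placeholder source; columns assumed: company, amount).
val newRecords = spark.read.parquet("/path/to/new/batch")

// Aggregate the new batch per company, then merge with the stored running totals
// so the accumulated average can be recomputed from (total, cnt) alone.
val batchAgg = newRecords.groupBy("company")
  .agg(sum("amount").as("batch_total"), count(lit(1)).as("batch_cnt"))

val merged = batchAgg.join(previous, Seq("company"), "left")
  .select(
    col("company"),
    (coalesce(col("total"), lit(0.0)) + col("batch_total")).as("total"),
    (coalesce(col("cnt"), lit(0L)) + col("batch_cnt")).as("cnt"))
  .withColumn("avg", col("total") / col("cnt"))

merged.write
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "finance", "table" -> "company_stats"))
  .mode("append")
  .save()

Storing (total, cnt) rather than only the average is what makes the stats
mergeable with the next batch; the accumulated average is then just total / cnt.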


Regards,
Shyam


combineByKey

2019-04-05 Thread Madabhattula Rajesh Kumar
Hi,

Is there any issue in the code below?

case class Messages(timeStamp: Int, Id: String, value: String)

val messages = Array(
  Messages(0, "d1", "t1"),
  Messages(0, "d1", "t1"),
  Messages(0, "d1", "t1"),
  Messages(0, "d1", "t1"),
  Messages(0, "d1", "t2"),
  Messages(0, "d1", "t2"),
  Messages(0, "d1", "t3"),
  Messages(0, "d1", "t4"),
  Messages(0, "d2", "t1"),
  Messages(0, "d2", "t1"),
  Messages(0, "d2", "t5"),
  Messages(0, "d2", "t6"),
  Messages(0, "d2", "t2"),
  Messages(0, "d2", "t2"),
  Messages(0, "d3", "t1"),
  Messages(0, "d3", "t1"),
  Messages(0, "d3", "t2")
)

//Defining createCombiner, mergeValue and mergeCombiner functions
def createCombiner = (id: String, value: String) => Set(value)

def mergeValue0 = (accumulator1: Set[String], accumulator2: (String,
String)) => accumulator1 ++ Set(accumulator2._2)

def mergeCombiner: (Set[String], Set[String]) => Set[String] =
(accumulator1: Set[String], accumulator2: Set[String]) => accumulator1 ++
accumulator2

sc.parallelize(messages).map(x => (x.timeStamp+"-"+x.id, (x.id,
x.value))).combineByKey(createCombiner, mergeValue, mergeCombiner)

*Compile Error:-*
 found   : (String, String) => scala.collection.immutable.Set[String]
 required: ((String, String)) => ?
sc.parallelize(messages).map(x => (x.timeStamp+"-"+x.id, (x.id,
x.value))).combineByKey(createCombiner, mergeValue, mergeCombiner)

Regards,
Rajesh