Re: Joining streaming data with static table data.

2017-12-11 Thread Rishi Mishra
You can do a join between a streaming Dataset and a static Dataset, and I
would prefer your first approach. The problem with this approach is
performance: unless you cache the static Dataset, every join query will
fetch the latest records from the table.
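
For illustration, a minimal sketch of that first approach (a stream-static
join in Structured Streaming), assuming hypothetical connection details,
topic and column names that are not from this thread, and that both sides
have an "id" column:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("stream-static-join").getOrCreate()

// Static side: read once from the JDBC table and cache it, so each micro-batch
// does not re-fetch from the database (at the cost of not seeing new rows).
val staticDf = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://dbhost:5432/mydb")   // hypothetical URL
  .option("dbtable", "dim_table")                        // hypothetical table
  .option("user", "user")
  .option("password", "password")
  .load()
  .cache()

// Streaming side: e.g. a Kafka source projected to (id, payload) columns.
val streamDf = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")      // hypothetical broker
  .option("subscribe", "events")                         // hypothetical topic
  .load()
  .selectExpr("CAST(key AS STRING) AS id", "CAST(value AS STRING) AS payload")

// Stream-static join: each micro-batch of streamDf is joined against staticDf.
val enriched = streamDf.join(staticDf, Seq("id"), "left_outer")

enriched.writeStream.format("console").outputMode("append").start()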



Regards,
Rishitesh Mishra,
SnappyData . (http://www.snappydata.io/)

https://in.linkedin.com/in/rishiteshmishra

On Tue, Dec 12, 2017 at 6:29 AM, satyajit vegesna <
satyajit.apas...@gmail.com> wrote:

> Hi All,
>
> I am working on a real-time reporting project and I have a question about a
> structured streaming job that is going to stream a particular table's
> records and would have to join them to an existing table.
>
> Stream > query/join to another DF/DS ---> update the Stream data
> record.
>
> Now I have a problem with how to approach the middle layer (query/join to
> another DF/DS): should I create a DF from spark.read.format("JDBC"), or
> "stream and maintain the data in a memory sink", or is there a better way
> to do it?
>
> I would like to know if anyone has faced a similar scenario and has any
> suggestions on how to go ahead.
>
> Regards,
> Satyajit.
>


Re: DataFrame joins with Spark-Java

2017-11-29 Thread Rishi Mishra
Hi Sushma,
can you try a left anti join as below? In my example, name and id together
make up the key.

df1.alias("a").join(df2.alias("b"),
col("a.name").equalTo(col("b.name"))
.and(col("a.id").equalTo(col("b.id"))) ,
"left_anti").selectExpr("name", "id").show(10, false);

Regards,
Rishitesh Mishra,
SnappyData . (http://www.snappydata.io/)

https://in.linkedin.com/in/rishiteshmishra

On Thu, Nov 30, 2017 at 7:38 AM, sushma spark 
wrote:

> Dear Friends,
>
> I am new to Spark DataFrames. My requirement is that dataframe1
> contains today's records and dataframe2 contains yesterday's records. I
> need to compare today's records with yesterday's records and find the
> new records that do not exist in yesterday's records, based on the
> primary key column. The problem is that sometimes the primary key
> spans multiple columns.
>
> I am receiving primary key columns in a List.
>
> example:
>
> List primaryKeyList = listOfPrimarykeys; // single or multiple
> primary key columns
>
> DataFrame currentDataRecords = queryexecutor.getCurrentRecords(); // this
> contains today's records
> DataFrame yesterdayRecords = queryexecutor.getYesterdayRecords();// this
> contains yesterday's records
>
> Can anyone help me join these two dataframes and apply WHERE
> conditions on the columns dynamically in Spark Java code?
>
> Thanks
> Sushma
>
>


Re: Extend Dataframe API

2016-07-08 Thread Rishi Mishra
Or you can extend SQLContext to add your own plans. Not sure if it fits your
requirement, but I answered to highlight an option.
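
A hedged sketch of one such extension point (the experimental strategy hook
that exists alongside subclassing SQLContext); the strategy below is a
do-nothing placeholder, not code from this thread:

import org.apache.spark.sql.Strategy
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan

object MyStrategy extends Strategy {
  def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    // Match your own logical operator here and return its physical plan;
    // returning Nil lets the built-in strategies handle everything else.
    case _ => Nil
  }
}

// sqlContext is an existing SQLContext:
// sqlContext.experimental.extraStrategies = Seq(MyStrategy)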

Regards,
Rishitesh Mishra,
SnappyData . (http://www.snappydata.io/)

https://in.linkedin.com/in/rishiteshmishra

On Thu, Jul 7, 2016 at 8:39 PM, tan shai  wrote:

> That was what I am thinking to do.
>
> Do you have any idea about this? Or any documentations?
>
> Many thanks.
>
> 2016-07-07 17:07 GMT+02:00 Koert Kuipers :
>
>> i dont see any easy way to extend the plans, beyond creating a custom
>> version of spark.
>>
>> On Thu, Jul 7, 2016 at 9:31 AM, tan shai  wrote:
>>
>>> Hi,
>>>
>>> I need to add new operations to the dataframe API.
>>> Can any one explain to me how to extend the plans of query execution?
>>>
>>> Many thanks.
>>>
>>
>>
>


Re: RDD and Dataframes

2016-07-07 Thread Rishi Mishra
Yes, ultimately it is converted to an RDD internally. However, DataFrame
queries are passed through Catalyst, which provides several optimizations,
e.g. code generation, intelligent shuffles, etc., which is not the case for
pure RDDs.
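
A small illustration (1.6-era API) of that point: the Catalyst-optimized plan
can be inspected with explain(), and the RDD that is actually executed is
reachable through queryExecution:

val df = sqlContext.range(0, 1000).filter("id % 2 = 0")
df.explain(true)                          // parsed -> analyzed -> optimized -> physical plan
val internalRdd = df.queryExecution.toRdd // the RDD of internal rows that actually runs
println(internalRdd.partitions.length)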

Regards,
Rishitesh Mishra,
SnappyData . (http://www.snappydata.io/)

https://in.linkedin.com/in/rishiteshmishra

On Thu, Jul 7, 2016 at 4:50 PM, brccosta  wrote:

> Dear guys,
>
> I'm investigating the differences between RDDs and DataFrames/Datasets. I
> couldn't find the answer to this question: do DataFrames act as a new layer
> in the Spark stack? I mean, during execution is there a conversion to RDDs?
>
> For example, if I create a DataFrame and perform a query, will it be
> transformed into an RDD in the final step to be executed in Spark?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/RDD-and-Dataframes-tp27306.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Spark 1.6 Catalyst optimizer

2016-05-11 Thread Rishi Mishra
I will try with a JSON relation, but with Spark's temp tables (Spark version
1.6) I get an optimized plan, as you have mentioned. It should not be much
different, though.

Query : "select t1.col2, t1.col3 from t1, t2 where t1.col1=t2.col1 and
t1.col3=7"

Plan :

Project [COL2#1,COL3#2]
+- Join Inner, Some((COL1#0 = COL1#3))
   :- Filter (COL3#2 = 7)
   :  +- LogicalRDD [col1#0,col2#1,col3#2], MapPartitionsRDD[4] at apply at
Transformer.scala:22
   +- Project [COL1#3]
  +- LogicalRDD [col1#3,col2#4,col3#5], MapPartitionsRDD[5] at apply at
Transformer.scala:22

Regards,
Rishitesh Mishra,
SnappyData . (http://www.snappydata.io/)

https://in.linkedin.com/in/rishiteshmishra

On Wed, May 11, 2016 at 4:56 PM, Telmo Rodrigues <
telmo.galante.rodrig...@gmail.com> wrote:

> In this case, isn't better to perform the filter earlier as possible even
> there could be unhandled predicates?
>
> Telmo Rodrigues
>
> No dia 11/05/2016, às 09:49, Rishi Mishra <rmis...@snappydata.io>
> escreveu:
>
> It does push the predicate down. But as relations are generic and might or
> might not handle some of the predicates, it needs to apply a Filter for the
> unhandled predicates.
>
> Regards,
> Rishitesh Mishra,
> SnappyData . (http://www.snappydata.io/)
>
> https://in.linkedin.com/in/rishiteshmishra
>
> On Wed, May 11, 2016 at 6:27 AM, Telmo Rodrigues <
> telmo.galante.rodrig...@gmail.com> wrote:
>
>> Hello,
>>
>> I have a question about the Catalyst optimizer in Spark 1.6.
>>
>> initial logical plan:
>>
>> !'Project [unresolvedalias(*)]
>> !+- 'Filter ('t.id = 1)
>> !   +- 'Join Inner, Some(('t.id = 'u.id))
>> !  :- 'UnresolvedRelation `t`, None
>> !  +- 'UnresolvedRelation `u`, None
>>
>>
>> logical plan after optimizer execution:
>>
>> Project [id#0L,id#1L]
>> !+- Filter (id#0L = cast(1 as bigint))
>> !   +- Join Inner, Some((id#0L = id#1L))
>> !  :- Subquery t
>> !  :  +- Relation[id#0L] JSONRelation
>> !  +- Subquery u
>> !  +- Relation[id#1L] JSONRelation
>>
>>
>> Shouldn't the optimizer push down predicates to subquery t in order to
>> the filter be executed before join?
>>
>> Thanks
>>
>>
>>
>


Re: Spark 1.6 Catalyst optimizer

2016-05-11 Thread Rishi Mishra
It does push the predicate down. But as relations are generic and might or
might not handle some of the predicates, it needs to apply a Filter for the
unhandled predicates.
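
A hedged sketch of why that Filter stays in the plan: a data source relation
only promises best-effort filtering and (since Spark 1.6) reports what it
cannot handle via unhandledFilters, so Catalyst keeps a Filter on top of the
scan for those predicates. The relation below is a toy, not code from this
thread:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, EqualTo, Filter, PrunedFilteredScan}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

class ToyRelation(val sqlContext: SQLContext) extends BaseRelation with PrunedFilteredScan {
  override def schema: StructType = StructType(Seq(StructField("id", LongType)))

  // Pretend we can only push down equality on "id"; everything else is reported
  // back as unhandled, so Spark re-applies it above the scan.
  override def unhandledFilters(filters: Array[Filter]): Array[Filter] =
    filters.filterNot { case EqualTo("id", _) => true; case _ => false }

  override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] =
    sqlContext.sparkContext.parallelize(Seq(Row(1L), Row(2L)))  // toy data; filters applied best-effort
}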

Regards,
Rishitesh Mishra,
SnappyData . (http://www.snappydata.io/)

https://in.linkedin.com/in/rishiteshmishra

On Wed, May 11, 2016 at 6:27 AM, Telmo Rodrigues <
telmo.galante.rodrig...@gmail.com> wrote:

> Hello,
>
> I have a question about the Catalyst optimizer in Spark 1.6.
>
> initial logical plan:
>
> !'Project [unresolvedalias(*)]
> !+- 'Filter ('t.id = 1)
> !   +- 'Join Inner, Some(('t.id = 'u.id))
> !  :- 'UnresolvedRelation `t`, None
> !  +- 'UnresolvedRelation `u`, None
>
>
> logical plan after optimizer execution:
>
> Project [id#0L,id#1L]
> !+- Filter (id#0L = cast(1 as bigint))
> !   +- Join Inner, Some((id#0L = id#1L))
> !  :- Subquery t
> !  :  +- Relation[id#0L] JSONRelation
> !  +- Subquery u
> !  +- Relation[id#1L] JSONRelation
>
>
> Shouldn't the optimizer push down predicates to subquery t in order to the
> filter be executed before join?
>
> Thanks
>
>
>


Re: partitioner aware subtract

2016-05-10 Thread Rishi Mishra
As you have the same partitioner and the same number of partitions, you can
probably use zipPartitions and provide a user-defined function to subtract.

A very primitive example:

val data1 = Seq(1->1,2->2,3->3,4->4,5->5,6->6,7->7)
val data2 = Seq(1->1,2->2,3->3,4->4,5->5,6->6)
val rdd1 = sc.parallelize(data1, 2)
val rdd2 = sc.parallelize(data2, 2)
val diff = rdd1.zipPartitions(rdd2){ (leftItr, rightItr) =>
  // materialize the right side once; calling contains on an Iterator would exhaust it
  val rightSet = rightItr.toSet
  leftItr.filter(p => !rightSet.contains(p))
}
diff.foreach(println)



Regards,
Rishitesh Mishra,
SnappyData . (http://www.snappydata.io/)

https://in.linkedin.com/in/rishiteshmishra

On Mon, May 9, 2016 at 7:35 PM, Raghava Mutharaju  wrote:

> We tried that but couldn't figure out a way to efficiently filter it. Lets
> take two RDDs.
>
> rdd1:
>
> (1,2)
> (1,5)
> (2,3)
> (3,20)
> (3,16)
>
> rdd2:
>
> (1,2)
> (3,30)
> (3,16)
> (5,12)
>
> rdd1.leftOuterJoin(rdd2) and get rdd1.subtract(rdd2):
>
> (1,(2,Some(2)))
> (1,(5,Some(2)))
> (2,(3,None))
> (3,(20,Some(30)))
> (3,(20,Some(16)))
> (3,(16,Some(30)))
> (3,(16,Some(16)))
>
> case (x, (y, z)) => Apart from allowing z == None and filtering on y == z,
> we also should filter out (3, (16, Some(30))). How can we do that
> efficiently without resorting to broadcast of any elements of rdd2?
>
> Regards,
> Raghava.
>
>
> On Mon, May 9, 2016 at 6:27 AM, ayan guha  wrote:
>
>> How about outer join?
>> On 9 May 2016 13:18, "Raghava Mutharaju" 
>> wrote:
>>
>>> Hello All,
>>>
>>> We have two PairRDDs (rdd1, rdd2) which are hash partitioned on key
>>> (number of partitions are same for both the RDDs). We would like to
>>> subtract rdd2 from rdd1.
>>>
>>> The subtract code at
>>> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala
>>> seems to group the elements of both the RDDs using (x, null) where x is the
>>> element of the RDD and partition them. Then it makes use of
>>> subtractByKey(). This way, RDDs have to be repartitioned on x (which in our
>>> case, is both key and value combined). In our case, both the RDDs are
>>> already hash partitioned on the key of x. Can we take advantage of this by
>>> having a PairRDD/HashPartitioner-aware subtract? Is there a way to use
>>> mapPartitions() for this?
>>>
>>> We tried to broadcast rdd2 and use mapPartitions. But this turns out to
>>> be memory consuming and inefficient. We tried to do a local set difference
>>> between rdd1 and the broadcasted rdd2 (in mapPartitions of rdd1). We did
>>> use destroy() on the broadcasted value, but it does not help.
>>>
>>> The current subtract method is slow for us. rdd1 and rdd2 are around
>>> 700MB each and the subtract takes around 14 seconds.
>>>
>>> Any ideas on this issue is highly appreciated.
>>>
>>> Regards,
>>> Raghava.
>>>
>>
>
>
> --
> Regards,
> Raghava
> http://raghavam.github.io
>


Re: Updating Values Inside Foreach Rdd loop

2016-05-10 Thread Rishi Mishra
Hi Harsh,
You probably need to maintain some state for your values, as you are
updating some of the keys in each batch and checking a global state of your
equation.
Can you look at the mapWithState API of DStream?
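
A hedged sketch of mapWithState keeping a running per-key sum across batches;
names and types are illustrative, not from this thread:

import org.apache.spark.streaming.{State, StateSpec, Time}
import org.apache.spark.streaming.dstream.DStream

def runningSum(pairs: DStream[(String, Double)]): DStream[(String, Double)] = {
  val spec = StateSpec.function(
    (batchTime: Time, key: String, value: Option[Double], state: State[Double]) => {
      val sum = value.getOrElse(0.0) + state.getOption.getOrElse(0.0)
      state.update(sum)        // per-key state survives across batches
      Some((key, sum))         // emit the updated value downstream
    })
  pairs.mapWithState(spec)
}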

Regards,
Rishitesh Mishra,
SnappyData . (http://www.snappydata.io/)

https://in.linkedin.com/in/rishiteshmishra

On Mon, May 9, 2016 at 8:40 PM, HARSH TAKKAR  wrote:

> Hi
>
> Please help.
>
> On Sat, 7 May 2016, 11:43 p.m. HARSH TAKKAR, 
> wrote:
>
>> Hi Ted
>>
>> Following is my use case.
>>
>> I have a prediction algorithm where i need to update some records to
>> predict the target.
>>
>> For eg.
> >> I have an equation Y = mX + c.
> >> I need to change the value of Xi for some records and calculate sum(Yi);
> >> if the prediction is not close to the target value, then repeat the process.
>>
>> In each iteration different set of values are updated but result is
>> checked when we sum up the values.
>>
>> On Sat, 7 May 2016, 8:58 a.m. Ted Yu,  wrote:
>>
>>> Using RDDs requires some 'low level' optimization techniques.
>>> While using dataframes / Spark SQL allows you to leverage existing code.
>>>
>>> If you can share some more of your use case, that would help other
>>> people provide suggestions.
>>>
>>> Thanks
>>>
>>> On May 6, 2016, at 6:57 PM, HARSH TAKKAR  wrote:
>>>
>>> Hi Ted
>>>
>>> I am aware that rdd are immutable, but in my use case i need to update
>>> same data set after each iteration.
>>>
>>> Following are the points which i was exploring.
>>>
>>> 1. Generating rdd in each iteration.( It might use a lot of memory).
>>>
>>> 2. Using Hive tables and update the same table after each iteration.
>>>
>>> Please suggest,which one of the methods listed above will be good to use
>>> , or is there are more better ways to accomplish it.
>>>
>>> On Fri, 6 May 2016, 7:09 p.m. Ted Yu,  wrote:
>>>
 Please see the doc at the beginning of RDD class:

  * A Resilient Distributed Dataset (RDD), the basic abstraction in
 Spark. Represents an immutable,
  * partitioned collection of elements that can be operated on in
 parallel. This class contains the
  * basic operations available on all RDDs, such as `map`, `filter`, and
 `persist`. In addition,

 On Fri, May 6, 2016 at 5:25 AM, HARSH TAKKAR 
 wrote:

> Hi
>
> Is there a way i can modify a RDD, in for-each loop,
>
> Basically, i have a use case in which i need to perform multiple
> iteration over data and modify few values in each iteration.
>
>
> Please help.
>




Re: Accumulator question

2016-05-10 Thread Rishi Mishra
Your mail does not describe much, but wouldn't a simple reduce help you?
Something like the below:

val data = Seq(1,2,3,4,5,6,7)
val rdd = sc.parallelize(data, 2)
val sum = rdd.reduce((a,b) => a+b)



Regards,
Rishitesh Mishra,
SnappyData . (http://www.snappydata.io/)

https://in.linkedin.com/in/rishiteshmishra

On Tue, May 10, 2016 at 10:44 AM, Abi  wrote:

> I am splitting an integer array in 2 partitions and using an accumulator
> to sum the array. problem is
>
> 1. I am not seeing execution time becoming half of a linear summing.
>
> 2. The second node (from looking at timestamps) takes 3 times as long as
> the first node. This gives the impression it is "waiting" for the first
> node to finish.
>
> Hence, I am given the impression using accumulator.sum () in the kernel
> and rdd.foreach (kernel) is making things sequential.
>
> Any api/setting suggestions where I could make things parallel ?
>
> On Mon, May 9, 2016 at 8:24 PM, Abi  wrote:
>
>> I am splitting an integer array in 2 partitions and using an accumulator
>> to sum the array. problem is
>>
>> 1. I am not seeing execution time becoming half of a linear summing.
>>
>> 2. The second node (from looking at timestamps) takes 3 times as long as
>> the first node. This gives the impression it is "waiting" for the first
>> node to finish.
>>
>> Hence, I am given the impression using accumulator.sum () in the kernel
>> and rdd.foreach (kernel) is making things sequential.
>>
>> Any api/setting suggestions where I could make things parallel ?
>>
>>
>>
>


Re: Spark Streaming share state between two streams

2016-04-08 Thread Rishi Mishra
Hi Shekhar,
As both of your state functions do the same thing, can't you do a union of
the DStreams before applying mapWithState()? It would be difficult if one
state function depended on the other's state. That requires a named state
which can be accessed in other state functions. I have not gone through the
details, but the PR (https://github.com/apache/spark/pull/11645) from
Tathagata seems to be in that direction.
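
A hedged sketch of that union idea against the code quoted below: merge the
two streams first, then apply a single mapWithState so both share one state
(stream1/stream2 stand for the two createDirectStream results):

val words1 = stream1.flatMap(_._2.split(" ")).map((_, 1))
val words2 = stream2.flatMap(_._2.split(" ")).map((_, 1))

val shared = words1.union(words2)
  .mapWithState(StateSpec.function(mappingFuncForFirstStream)
    .initialState(initialRDD)
    .timeout(Minutes(10)))

shared.print()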

Regards,
Rishitesh Mishra,
SnappyData . (http://www.snappydata.io/)

https://in.linkedin.com/in/rishiteshmishra

On Fri, Apr 8, 2016 at 3:53 PM, Shekhar Bansal <
shekhar0...@yahoo.com.invalid> wrote:

> Hi
> Can we share spark streaming state between two DStreams??
> Basically I want to create state using first stream and enrich second
> stream using state.
> Example: I have modified StatefulNetworkWordCount example. I am creating
> state using first stream and enriching second stream with count of first
> stream.
>
> val initialRDD = ssc.sparkContext.parallelize(List(("hello", 1), ("world", 
> 1)))
>
>
> val mappingFuncForFirstStream = (batchTime: Time, word: String, one: 
> Option[Int], state: State[Int]) => {
>   val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
>   val output = (word, sum)
>   state.update(sum)
>
>   Some(output)
> }
>
> val mappingFuncForSecondStream = (batchTime: Time, word: String, one: 
> Option[Int], state: State[Int]) => {
>   val sum = state.getOption.getOrElse(0)
>   val output = (word, sum)
>
>   Some(output)
> }
>
>
>
> // first stream
> KafkaUtils.createDirectStream[String, String, StringDecoder, 
> StringDecoder](ssc, kafkaParams, topicsSet)
>   .flatMap(r=>r._2.split(" "))
>   .map(x => (x, 1))
>   
> .mapWithState(StateSpec.function(mappingFuncForFirstStream).initialState(initialRDD).timeout(Minutes(10)))
>   .print(1)
>
>
>
> // second stream
> KafkaUtils.createDirectStream[String, String, StringDecoder, 
> StringDecoder](ssc, kafkaParams2, mergeTopicSet)
>   .flatMap(r=>r._2.split(" "))
>   .map(x => (x, 1))
>   
> .mapWithState(StateSpec.function(mappingFuncForSecondStream).initialState(initialRDD).timeout(Minutes(10)))
>   .print(50)
>
>
> In checkpointing directory, I can see two different state RDDs.
> I am using spark-1.6.1 and kafka-0.8.2.1
>
> Regards
> Shekhar
>


Re: About nested RDD

2016-04-08 Thread Rishi Mishra
As mentioned earlier, you can create a broadcast variable containing all the
small RDDs' elements, assuming they really are small. Then you can call
A.update(broadcastVariable).
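
A self-contained sketch of that approach (toy data, hypothetical key/value
types): collect the small RDDs on the driver, broadcast the merged updates,
and apply them in one pass over the large RDD:

val bigRdd    = sc.parallelize(Seq(1 -> "a", 2 -> "b", 3 -> "c"), 2)
val smallRdd1 = sc.parallelize(Seq(2 -> "B"))
val smallRdd2 = sc.parallelize(Seq(3 -> "C"))

// Collect the small RDDs on the driver and broadcast the merged updates.
val updates = sc.broadcast((smallRdd1.collect() ++ smallRdd2.collect()).toMap)

// One pass over the large RDD instead of one pass per small RDD.
val updated = bigRdd.mapPartitions { iter =>
  iter.map { case (k, v) => (k, updates.value.getOrElse(k, v)) }
}
updated.collect().foreach(println)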

Regards,
Rishitesh Mishra,
SnappyData . (http://www.snappydata.io/)

https://in.linkedin.com/in/rishiteshmishra

On Fri, Apr 8, 2016 at 2:33 PM, Tenghuan He  wrote:

> Hi
>
> Thanks for your reply.
> Yes, It's very much like the union() method, but there is some difference.
>
> I have a very large RDD A, and a lot of small RDDs b, c, d and so on.
> and A.update(a) will update some element in the A and return a new RDD
>
> when calling
> val B = A.update(b).update(c).update(d).update().
> B.count()
>
> The count action will call the compute method.
> and each update will iterating the large rdd A.
> To avoid this I can merge these small rdds first to rdds then call
> A.update(rdds)
> But I don't hope to do this merge manually outside but inside RDD A
> automatically
>
> I hope I made it clear.
> ​
>
> On Fri, Apr 8, 2016 at 4:22 PM, Holden Karau  wrote:
>
>> It seems like the union function on RDDs might be what you are looking
>> for, or was there something else you were trying to achieve?
>>
>>
>> On Thursday, April 7, 2016, Tenghuan He  wrote:
>>
>>> Hi all,
>>>
>>> I know that nested RDDs are not possible, like rdd1.map(x => x +
>>> rdd2.count())
>>> I tried to create a custom RDD like the following
>>>
>>> class MyRDD(base: RDD, part: Partitioner) extends RDD[(K, V)] {
>>>
>>> var rdds = new  ArrayBuffer.empty[RDD[(K, (V, Int))]]
>>> def update(rdd: RDD[_]) {
>>>   rdds += rdd
>>> }
>>> def comput ...
>>> def getPartitions ...
>>> }
>>>
>>> In the compute method I call the internal rdds' iterators and got
>>> NullPointerException
>>> Is this also a form of nested RDDs and how do I get rid of this?
>>>
>>> Thanks.
>>>
>>>
>>> Tenghuan
>>>
>>
>>
>> --
>> Cell : 425-233-8271
>> Twitter: https://twitter.com/holdenkarau
>>
>>
>


Re: About nested RDD

2016-04-08 Thread Rishi Mishra
rdd.count() is a fairly straightforward operation which can be calculated
on the driver, and then the value can be included in the map function.
Is your goal to write a generic function which operates on two RDDs, with
one RDD being evaluated for each partition of the other?
Here also you can use a broadcast, if one of your RDDs is small enough. If
both of the RDDs are fairly big, I would like to understand your use case
better.
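
For the rdd2.count() case specifically, a tiny illustration of computing the
value on the driver and capturing it in the closure, instead of nesting RDDs:

val n = rdd2.count()          // action runs on the cluster, the Long comes back to the driver
val shifted = rdd1.map(_ + n) // n is just a value captured in the closure, no nested RDD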

Regards,
Rishitesh Mishra,
SnappyData . (http://www.snappydata.io/)

https://in.linkedin.com/in/rishiteshmishra

On Fri, Apr 8, 2016 at 1:52 PM, Holden Karau  wrote:

> It seems like the union function on RDDs might be what you are looking
> for, or was there something else you were trying to achieve?
>
>
> On Thursday, April 7, 2016, Tenghuan He  wrote:
>
>> Hi all,
>>
>> I know that nested RDDs are not possible, like rdd1.map(x => x +
>> rdd2.count())
>> I tried to create a custom RDD like the following
>>
>> class MyRDD(base: RDD, part: Partitioner) extends RDD[(K, V)] {
>>
>> var rdds = new  ArrayBuffer.empty[RDD[(K, (V, Int))]]
>> def update(rdd: RDD[_]) {
>>   rdds += rdd
>> }
>> def comput ...
>> def getPartitions ...
>> }
>>
>> In the compute method I call the internal rdds' iterators and got
>> NullPointerException
>> Is this also a form of nested RDDs and how do I get rid of this?
>>
>> Thanks.
>>
>>
>> Tenghuan
>>
>
>
> --
> Cell : 425-233-8271
> Twitter: https://twitter.com/holdenkarau
>
>


Re: Spark SQL Optimization

2016-03-22 Thread Rishi Mishra
What we have observed so far is that Spark picks the join order in the same
order as the tables are specified in the FROM clause. Sometimes reordering
benefits the join query.
This could be an inbuilt optimization in Spark. But again, it is not going
to be straightforward: rather than table size, the selectivity of the join
is what matters. Probably some kind of heuristic would help.

Regards,
Rishitesh Mishra,
SnappyData . (http://www.snappydata.io/)

https://in.linkedin.com/in/rishiteshmishra

On Mon, Mar 21, 2016 at 11:18 PM, gtinside  wrote:

> More details :
>
> Execution plan for Original query
> select distinct pge.portfolio_code
> from table1 pge join table2 p
> on p.perm_group = pge.anc_port_group
> join table3 uge
> on p.user_group=uge.anc_user_group
> where uge.user_name = 'user' and p.perm_type = 'TEST'
>
> == Physical Plan ==
> TungstenAggregate(key=[portfolio_code#14119], functions=[],
> output=[portfolio_code#14119])
>  TungstenExchange hashpartitioning(portfolio_code#14119)
>   TungstenAggregate(key=[portfolio_code#14119], functions=[],
> output=[portfolio_code#14119])
>TungstenProject [portfolio_code#14119]
> BroadcastHashJoin [user_group#13665], [anc_user_group#13658],
> BuildRight
>  TungstenProject [portfolio_code#14119,user_group#13665]
>   BroadcastHashJoin [anc_port_group#14117], [perm_group#13667],
> BuildRight
>ConvertToUnsafe
> Scan
>
> ParquetRelation[snackfs://shared:9042/aladdin_data_beta/table1.parquet][portfolio_code#14119,anc_port_group#14117]
>ConvertToUnsafe
> Project [user_group#13665,perm_group#13667]
>  Filter (perm_type#13666 = TEST)
>   Scan
>
> ParquetRelation[snackfs://shared:9042/aladdin_data_beta/table2.parquet][user_group#13665,perm_group#13667,perm_type#13666]
>  ConvertToUnsafe
>   Project [anc_user_group#13658]
>Filter (user_name#13659 = user)
> Scan
>
> ParquetRelation[snackfs://shared:9042/aladdin_data_beta/table3.parquet][anc_user_group#13658,user_name#13659]
>
>
>
> Execution plan for optimized query
> select distinct pge.portfolio_code
> from table1 uge, table2 p, table3 pge
> where uge.user_name = 'user' and p.perm_type = 'TEST'
> and p.perm_group = pge.anc_port_group
> and p.user_group=uge.anc_user_group
>
> == Physical Plan ==
> TungstenAggregate(key=[portfolio_code#14119], functions=[],
> output=[portfolio_code#14119])
>  TungstenExchange hashpartitioning(portfolio_code#14119)
>   TungstenAggregate(key=[portfolio_code#14119], functions=[],
> output=[portfolio_code#14119])
>TungstenProject [portfolio_code#14119]
> BroadcastHashJoin [perm_group#13667], [anc_port_group#14117],
> BuildRight
>  TungstenProject [perm_group#13667]
>   BroadcastHashJoin [anc_user_group#13658], [user_group#13665],
> BuildRight
>ConvertToUnsafe
> Project [anc_user_group#13658]
>  Filter (user_name#13659 = user)
>   Scan
>
> ParquetRelation[snackfs://shared:9042/aladdin_data_beta/table3.parquet][anc_user_group#13658,user_name#13659]
>ConvertToUnsafe
> Project [perm_group#13667,user_group#13665]
>  Filter (perm_type#13666 = TEST)
>   Scan
>
> ParquetRelation[snackfs://shared:9042/aladdin_data_beta/table2.parquet][perm_group#13667,user_group#13665,perm_type#13666]
>  ConvertToUnsafe
>   Scan
>
> ParquetRelation[snackfs://shared:9042/aladdin_data_beta/table1.parquet][portfolio_code#14119,anc_port_group#14117]
>
>
>
>
>
>
>
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-Optimization-tp26548p26553.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: sliding Top N window

2016-03-22 Thread Rishi Mishra
Hi Alexey,
We are also trying to solve similar problems using approximation. I would
like to hear more about your usage. We can discuss this offline without
boring others. :)

Regards,
Rishitesh Mishra,
SnappyData . (http://www.snappydata.io/)

https://in.linkedin.com/in/rishiteshmishra

On Tue, Mar 22, 2016 at 1:19 AM, Lars Albertsson  wrote:

> Hi,
>
> If you can accept approximate top N results, there is a neat solution
> for this problem: Use an approximate Map structure called
> Count-Min Sketch, in combination with a list of the M top items, where
> M > N. When you encounter an item not in the top M, you look up its
> count in the Count-Min Sketch do determine whether it qualifies.
>
> You will need to break down your event stream into time windows with a
> certain time unit, e.g. minutes or hours, and keep one Count-Min
> Sketch for each unit. The CMSs can be added, so you aggregate them to
> form your sliding windows. You also keep a top M (aka "heavy hitters")
> list for each window.
>
> The data structures required are surprisingly small, and will likely
> fit in memory on a single machine, if it can handle the traffic
> volume, so you might not need Spark at all. If you choose to use Spark
> in order to benefit from windowing, be aware that Spark lumps events
> in micro batches based on processing time, not event time.
>
> I made a presentation on approximate counting a couple of years ago.
> Slides and video here:
>
> http://www.slideshare.net/lallea/scalable-real-time-processing-techniques-39990105
> .
> You can also search for presentation by Ted Dunning and Mikio Braun,
> who have held good presentations on the subject.
>
> There are AFAIK two open source implementations of Count-Min Sketch,
> one of them in Algebird.
>
> Let me know if anything is unclear.
>
> Good luck, and let us know how it goes.
>
> Regards,
>
>
>
> Lars Albertsson
> Data engineering consultant
> www.mapflat.com
> +46 70 7687109
>
>
> On Fri, Mar 11, 2016 at 9:09 PM, Yakubovich, Alexey
>  wrote:
> > Good day,
> >
> > I have the following task: a stream of “page views” coming to a Kafka topic.
> > Each view contains a list of product ids from a visited page. The task: to
> > have a “real time” Top N of products.
> >
> > I am interested in some solution that would require minimum intermediate
> > writes … So  need to build a sliding window for top N product, where the
> > product counters dynamically changes and window should present the TOP
> > product for the specified period of time.
> >
> > I believe there is no way to avoid maintaining all product counters
> counters
> > in memory/storage.  But at least I would like to do all logic, all
> > calculation on a fly, in memory, not spilling multiple RDD from memory to
> > disk.
> >
> > So I believe I see one way of doing it:
> >Take, msg from kafka take and line up, all elementary action
> (increase by
> > 1 the counter for the product PID )
> >   Each action will be implemented as a call to HTable.increment()  // or
> > easier, with incrementColumnValue()…
> >   After each increment I can apply my own operation “offer” would provide
> > that only top N products with counters are kept in another Hbase table
> (also
> > with atomic operations).
> > But there is another stream of events: decreasing product counters when a
> > view expires past the length of the sliding window….
> >
> > So my question: does anybody know/have and can share the piece code/ know
> > how: how to implement “sliding Top N window” better.
> > If nothing will be offered, I will share what I will do myself.
> >
> > Thank you
> > Alexey
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
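
For reference, a hand-rolled, illustrative Count-Min Sketch of the kind Lars
describes above (not the Algebird implementation, and not code from this
thread); merging assumes both sketches were built with the same width, depth
and seed:

import scala.util.Random

class CountMinSketch(width: Int, depth: Int, seed: Int = 42) {
  private val table = Array.ofDim[Long](depth, width)
  private val rand  = new Random(seed)
  // one (a, b) hashing pair per row, derived from the seed
  private val coeffs = Array.fill(depth)((rand.nextInt(Int.MaxValue - 1) + 1, rand.nextInt(Int.MaxValue)))

  private def bucket(row: Int, item: Any): Int = {
    val (a, b) = coeffs(row)
    val h = (a.toLong * item.hashCode() + b) % Int.MaxValue
    (math.abs(h) % width).toInt
  }

  def add(item: Any, count: Long = 1L): Unit =
    for (r <- 0 until depth) table(r)(bucket(r, item)) += count

  // minimum across rows: collisions can only inflate the estimate, never deflate it
  def estimate(item: Any): Long =
    (0 until depth).map(r => table(r)(bucket(r, item))).min

  // cell-by-cell addition is what makes per-time-unit sketches aggregatable
  // into the sliding window described above
  def merge(other: CountMinSketch): CountMinSketch = {
    for (r <- 0 until depth; c <- 0 until width) table(r)(c) += other.table(r)(c)
    this
  }
}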
>


Re: Joins in Spark

2016-03-19 Thread Rishi Mishra
My suspicion is that your input files have too few partitions, hence only a
small number of tasks are started. Can you provide some more details, such
as how you load the files and how the result size reaches around 500 GB?
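
A hedged sketch of the two things worth checking first, with hypothetical
paths and partition counts: how many partitions the inputs actually produce,
and forcing more partitions into the join itself:

import org.apache.spark.HashPartitioner

val rdd1 = sc.textFile("hdfs:///input/a").map(line => (line.split(",")(0), line))
val rdd2 = sc.textFile("hdfs:///input/b").map(line => (line.split(",")(0), line))

// Small files often come in as 1-2 partitions each.
println(s"rdd1: ${rdd1.partitions.length} partitions, rdd2: ${rdd2.partitions.length} partitions")

// Spread the join output over more tasks; note that if the join keys are heavily
// skewed (few distinct keys), most rows will still land in a handful of tasks.
val joined = rdd1.join(rdd2, new HashPartitioner(40))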

Regards,
Rishitesh Mishra,
SnappyData . (http://www.snappydata.io/)

https://in.linkedin.com/in/rishiteshmishra

On Thu, Mar 17, 2016 at 12:12 PM, Stuti Awasthi 
wrote:

> Hi All,
>
>
>
> I have to join 2 files both not very big say few MBs only but the result
> can be huge say generating 500GBs to TBs of data.  Now I have tried using
> spark Join() function but Im noticing that join is executing on only 1 or 2
> nodes at the max. Since I have a cluster size of 5 nodes , I tried to pass “
> join(*otherDataset*, [*numTasks*])” as numTasks=10 but again what I
> noticed that all the 9 tasks are finished instantly and only 1 executor is
> processing all the data.
>
>
>
> I searched the internet and found that we can use a broadcast variable to send
> data from one file to all nodes and then use a map function to do the join. That
> way I should be able to run multiple tasks on different executors.
>
> Now my question is: since Spark provides the join functionality, I
> assumed it would handle the data parallelism automatically. Does Spark
> provide some functionality I can use directly for the join, rather than
> implementing a map-side join with a broadcast on my own? Any other
> better way is also welcome.
>
>
>
> I assume that this might be very common problem for all and looking out
> for suggestions.
>
>
>
> Thanks 
>
> Stuti Awasthi
>
>
>
>
>
>


Re: How to use a custom partitioner in a dataframe in Spark

2016-02-18 Thread Rishi Mishra
Michael,
Is there any specific reason why DataFrames do not have partitioners like
RDDs do? This would be very useful if one is writing a custom data source
which keeps data in partitions. While storing data, one could pre-partition
the data at the Spark level rather than at the data source.

Regards,
Rishitesh Mishra,
SnappyData . (http://www.snappydata.io/)

https://in.linkedin.com/in/rishiteshmishra

On Thu, Feb 18, 2016 at 3:50 AM, swetha kasireddy  wrote:

> So suppose I have a bunch of userIds and I need to save them as parquet in
> database. I also need to load them back and need to be able to do a join
> on userId. My idea is to partition by userId hashcode first and then on
> userId. So that I don't have to deal with any performance issues because of
> a number of small files and also to be able to scan faster.
>
>
> Something like ...df.write.format("parquet").partitionBy( "userIdHash"
> , "userId").mode(SaveMode.Append).save("userRecords");
>
> On Wed, Feb 17, 2016 at 2:16 PM, swetha kasireddy <
> swethakasire...@gmail.com> wrote:
>
>> So suppose I have a bunch of userIds and I need to save them as parquet
>> in database. I also need to load them back and need to be able to do a join
>> on userId. My idea is to partition by userId hashcode first and then on
>> userId.
>>
>>
>>
>> On Wed, Feb 17, 2016 at 11:51 AM, Michael Armbrust <
>> mich...@databricks.com> wrote:
>>
>>> Can you describe what you are trying to accomplish?  What would the
>>> custom partitioner be?
>>>
>>> On Tue, Feb 16, 2016 at 1:21 PM, SRK  wrote:
>>>
 Hi,

 How do I use a custom partitioner when I do a saveAsTable in a
 dataframe.


 Thanks,
 Swetha



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-a-custom-partitioner-in-a-dataframe-in-Spark-tp26240.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


>>>
>>
>


Re: How to use a custom partitioner in a dataframe in Spark

2016-02-17 Thread Rishi Mishra
Unfortunately there is not any, at least up to 1.5. I have not gone through
the new Dataset API of 1.6. There is some basic support for Parquet, such as
partitioning by column.
If you want to partition your dataset in a certain way, you have to use an
RDD to partition it and convert that into a DataFrame before storing it in a
table.

Regards,
Rishitesh Mishra,
SnappyData . (http://www.snappydata.io/)

https://in.linkedin.com/in/rishiteshmishra

On Tue, Feb 16, 2016 at 11:51 PM, SRK  wrote:

> Hi,
>
> How do I use a custom partitioner when I do a saveAsTable in a dataframe.
>
>
> Thanks,
> Swetha
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-a-custom-partitioner-in-a-dataframe-in-Spark-tp26240.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: SparkSQL parallelism

2016-02-11 Thread Rishi Mishra
I am not sure why all 3 nodes would run the query. If you have not specified
any partitioning options, there should be only one partition in the JDBCRDD,
and the whole dataset should reside in it.
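
For completeness, a hedged example of the JDBC partitioning options (values
are illustrative): with these four options Spark issues one range query per
partition; without them the table is read as a single partition by a single
task:

val employees = sqlContext.read.format("jdbc").options(Map(
  "url"             -> "jdbc:oracle:thin:@host:1525:SID",
  "dbtable"         -> "(select * from employee where name like '%18%') emp",
  "user"            -> "username",
  "password"        -> "password",
  "partitionColumn" -> "employee_id",   // must be a numeric column
  "lowerBound"      -> "1",
  "upperBound"      -> "100000",
  "numPartitions"   -> "4"
)).load()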


On Fri, Feb 12, 2016 at 10:15 AM, Madabhattula Rajesh Kumar <
mrajaf...@gmail.com> wrote:

> Hi,
>
> I have a spark cluster with One Master and 3 worker nodes. I have written
> a below code to fetch the records from oracle using sparkSQL
>
> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
> val employees = sqlContext.read.format("jdbc").options(
> Map("url" -> "jdbc:oracle:thin:@:1525:SID",
> "dbtable" -> "(select * from employee where name like '%18%')",
> "user" -> "username",
> "password" -> "password")).load
>
> I have a submitted this job to spark cluster using spark-submit command.
>
>
>
> *Looks like, All 3 workers are executing same query and fetching same
> data. It means, it is making 3 jdbc calls to oracle.*
> *How to make this code to make a single jdbc call to oracle(In case of
> more than one worker) ?*
>
> Please help me to resolve this use case
>
> Regards,
> Rajesh
>
>
>


-- 
Regards,
Rishitesh Mishra,
SnappyData . (http://www.snappydata.io/)

https://in.linkedin.com/in/rishiteshmishra


Re: Spark : Unable to connect to Oracle

2016-02-10 Thread Rishi Mishra
AFAIK sc.addJar() will add the jars to the executors' classpath. The data
source resolution (createRelation) happens on the driver side, and the
driver classpath should contain ojdbc6.jar. You can use the
"spark.driver.extraClassPath" config parameter to set the same.
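
For example, one way to supply the driver (and executors) with the Oracle
driver jar at submit time (paths and class name are hypothetical):

spark-submit \
  --driver-class-path /path/to/ojdbc6.jar \
  --conf spark.executor.extraClassPath=/path/to/ojdbc6.jar \
  --class com.example.MyApp myapp.jar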

On Wed, Feb 10, 2016 at 3:08 PM, Jorge Machado  wrote:

> Hi Divya,
>
> You need to install the Oracle jdbc driver on the cluster into lib folder.
>
> On 10/02/2016, at 09:37, Divya Gehlot  wrote:
>
> oracle.jdbc.driver.OracleDrive
>
>
>


-- 
Regards,
Rishitesh Mishra,
SnappyData . (http://www.snappydata.io/)

https://in.linkedin.com/in/rishiteshmishra


Re: spark.executor.memory ? is used just for cache RDD or both cache RDD and the runtime of cores on worker?

2016-02-04 Thread Rishi Mishra
You would probably like to see
http://spark.apache.org/docs/latest/configuration.html#memory-management.
Other config parameters are also explained there.
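
As a rough, hedged illustration of what that page's unified memory model
(Spark 1.6 defaults: spark.memory.fraction = 0.75,
spark.memory.storageFraction = 0.5, ~300 MB reserved) means for a 2 GB
executor:

val executorMemMb = 2048                                // spark.executor.memory = 2g
val reservedMb    = 300
val unifiedMb     = (executorMemMb - reservedMb) * 0.75 // ~1311 MB shared by storage and execution
val storageMinMb  = unifiedMb * 0.5                     // ~655 MB protected for cached blocks

println(f"unified: $unifiedMb%.0f MB, protected storage: $storageMinMb%.0f MB per executor")
// So 10 such executors give roughly 13 GB of unified memory rather than a hard 20 GB
// cache; storage can borrow execution's share when it is idle, and vice versa.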

On Fri, Feb 5, 2016 at 10:56 AM, charles li  wrote:

> if set spark.executor.memory = 2G for each worker [ 10 in total ]
>
> does it mean I can cache 20G RDD in memory ? if so, how about the memory
> for code running in each process on each worker?
>
> thanks.
>
>
> --
> and is there any materials about memory management or resource management
> in spark ? I want to put spark in production, but have little knowing about
> the resource management in spark, great thanks again
>
>
> --
> *--*
> a spark lover, a quant, a developer and a good man.
>
> http://github.com/litaotao
>



-- 
Regards,
Rishitesh Mishra,
SnappyData . (http://www.snappydata.io/)

https://in.linkedin.com/in/rishiteshmishra


Re: Unit test with sqlContext

2016-02-04 Thread Rishi Mishra
Hi Steve,
Have you cleaned up your SparkContext (sc.stop()) in an afterAll()? The
error suggests you are creating more than one SparkContext.
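
A minimal sketch of the suggested cleanup, mirroring the beforeAll in the
quoted message below (ScalaTest's BeforeAndAfterAll assumed):

override def afterAll(): Unit = {
  if (sc != null) {
    sc.stop()          // release the context so the next suite can create its own
    sc = null
  }
  System.clearProperty("spark.driver.port")
  System.clearProperty("spark.hostPort")
  super.afterAll()
}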


On Fri, Feb 5, 2016 at 10:04 AM, Holden Karau  wrote:

> Thanks for recommending spark-testing-base :) Just wanted to add if anyone
> has feature requests for Spark testing please get in touch (or add an issue
> on the github) :)
>
>
> On Thu, Feb 4, 2016 at 8:25 PM, Silvio Fiorito <
> silvio.fior...@granturing.com> wrote:
>
>> Hi Steve,
>>
>> Have you looked at the spark-testing-base package by Holden? It’s really
>> useful for unit testing Spark apps as it handles all the bootstrapping for
>> you.
>>
>> https://github.com/holdenk/spark-testing-base
>>
>> DataFrame examples are here:
>> https://github.com/holdenk/spark-testing-base/blob/master/src/test/1.3/scala/com/holdenkarau/spark/testing/SampleDataFrameTest.scala
>>
>> Thanks,
>> Silvio
>>
>> From: Steve Annessa 
>> Date: Thursday, February 4, 2016 at 8:36 PM
>> To: "user@spark.apache.org" 
>> Subject: Unit test with sqlContext
>>
>> I'm trying to unit test a function that reads in a JSON file, manipulates
>> the DF and then returns a Scala Map.
>>
>> The function has signature:
>> def ingest(dataLocation: String, sc: SparkContext, sqlContext: SQLContext)
>>
>> I've created a bootstrap spec for spark jobs that instantiates the Spark
>> Context and SQLContext like so:
>>
>> @transient var sc: SparkContext = _
>> @transient var sqlContext: SQLContext = _
>>
>> override def beforeAll = {
>>   System.clearProperty("spark.driver.port")
>>   System.clearProperty("spark.hostPort")
>>
>>   val conf = new SparkConf()
>> .setMaster(master)
>> .setAppName(appName)
>>
>>   sc = new SparkContext(conf)
>>   sqlContext = new SQLContext(sc)
>> }
>>
>> When I do not include sqlContext, my tests run. Once I add the sqlContext
>> I get the following errors:
>>
>> 16/02/04 17:31:58 WARN SparkContext: Another SparkContext is being
>> constructed (or threw an exception in its constructor).  This may indicate
>> an error, since only one SparkContext may be running in this JVM (see
>> SPARK-2243). The other SparkContext was created at:
>> org.apache.spark.SparkContext.(SparkContext.scala:81)
>>
>> 16/02/04 17:31:59 ERROR SparkContext: Error initializing SparkContext.
>> akka.actor.InvalidActorNameException: actor name [ExecutorEndpoint] is
>> not unique!
>>
>> and finally:
>>
>> [info] IngestSpec:
>> [info] Exception encountered when attempting to run a suite with class
>> name: com.company.package.IngestSpec *** ABORTED ***
>> [info]   akka.actor.InvalidActorNameException: actor name
>> [ExecutorEndpoint] is not unique!
>>
>>
>> What do I need to do to get a sqlContext through my tests?
>>
>> Thanks,
>>
>> -- Steve
>>
>
>
>
> --
> Cell : 425-233-8271
> Twitter: https://twitter.com/holdenkarau
>



-- 
Regards,
Rishitesh Mishra,
SnappyData . (http://www.snappydata.io/)

https://in.linkedin.com/in/rishiteshmishra


Re: Union of RDDs without the overhead of Union

2016-02-02 Thread Rishi Mishra
Agree with Koert that UnionRDD should have narrow dependencies, although the
union of two RDDs increases the number of tasks to be executed
(rdd1.partitions + rdd2.partitions).
If your two RDDs have the same number of partitions, you can also use
zipPartitions, which launches fewer tasks and hence has less overhead.
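
A hedged sketch of the zipPartitions alternative (same number of partitions
assumed; the output path is hypothetical): concatenating partition-wise gives
rdd1.partitions.length tasks instead of the sum of both:

val rdd1 = sc.parallelize(1 to 100, 4)
val rdd2 = sc.parallelize(101 to 200, 4)

// Each task sees one partition from each RDD and simply concatenates them.
val combined = rdd1.zipPartitions(rdd2) { (left, right) => left ++ right }
combined.saveAsTextFile("hdfs:///tmp/combined")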

On Wed, Feb 3, 2016 at 9:58 AM, Koert Kuipers  wrote:

> i am surprised union introduces a stage. UnionRDD should have only narrow
> dependencies.
>
> On Tue, Feb 2, 2016 at 11:25 PM, Koert Kuipers  wrote:
>
>> well the "hadoop" way is to save to a/b and a/c and read from a/* :)
>>
>> On Tue, Feb 2, 2016 at 11:05 PM, Jerry Lam  wrote:
>>
>>> Hi Spark users and developers,
>>>
>>> anyone knows how to union two RDDs without the overhead of it?
>>>
>>> say rdd1.union(rdd2).saveTextFile(..)
>>> This requires a stage to union the 2 rdds before saveAsTextFile (2
>>> stages). Is there a way to skip the union step but have the contents of the
>>> two rdds save to the same output text file?
>>>
>>> Thank you!
>>>
>>> Jerry
>>>
>>
>>
>


-- 
Regards,
Rishitesh Mishra,
SnappyData . (http://www.snappydata.io/)

https://in.linkedin.com/in/rishiteshmishra


Re: How to use collections inside foreach block

2015-12-09 Thread Rishi Mishra
Your list is defined on the driver, whereas the function specified in
foreach is evaluated on the executors, so mutations to the list are not
visible back on the driver.
You might want to use an accumulator, or collect a sequence of results from
each partition.
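
A hedged sketch of the collect-based alternative for the code quoted below
(table and column names follow that code; an import of sqlContext.implicits._
is assumed for toDF):

listOfIds.grouped(5000).foreach { chunk =>
  val r = sc.parallelize(chunk).toDF("id")
  r.registerTempTable("r")

  val result = sqlContext.sql("SELECT r.id, t.data FROM r, t WHERE r.id = t.id")

  // collect() brings the rows back to the driver; a ListBuffer mutated inside
  // result.foreach only changes serialized copies on the executors.
  val rows = result.collect()
  println(" SIZE OF ROWS === " + rows.length)
  // save `rows` to the other DB here
}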

On Wed, Dec 9, 2015 at 11:19 AM, Madabhattula Rajesh Kumar <
mrajaf...@gmail.com> wrote:

> Hi,
>
> I have a below query. Please help me to solve this
>
> I have a 2 ids. I want to join these ids to table. This table contains
> some blob data. So i can not join these 2000 ids to this table in one step.
>
> I'm planning to join this table in a chunks. For example, each step I will
> join 5000 ids.
>
> Below code is not working. I'm not able to add result to ListBuffer.
> Result s giving always ZERO
>
> *Code Block :-*
>
> var listOfIds is a ListBuffer with 2 records
>
> listOfIds.grouped(5000).foreach { x =>
> {
> var v1 = new ListBuffer[String]()
> val r = sc.parallelize(x).toDF()
> r.registerTempTable("r")
> var result = sqlContext.sql("SELECT r.id, t.data from r, t where r.id =
> t.id")
>  result.foreach{ y =>
>  {
>  v1 += y
>   }
> }
> println(" SIZE OF V1 === "+ v1.size)  ==>
>
> *THIS VALUE PRINTING AS ZERO*
>
> *// Save v1 values to other db*
> }
>
> Please help me on this.
>
> Regards,
> Rajesh
>



-- 
Regards,
Rishitesh Mishra,
SnappyData . (http://www.snappydata.io/)

https://in.linkedin.com/in/rishiteshmishra


Re: SparkSQL API to insert DataFrame into a static partition?

2015-12-02 Thread Rishi Mishra
As long as all your data is being inserted by Spark, and hence uses the same
hash partitioner, what Fengdong mentioned should work.
On Wed, Dec 2, 2015 at 9:32 AM, Fengdong Yu 
wrote:

> Hi
> you can try:
>
> if your table under location “/test/table/“ on HDFS
> and has partitions:
>
>  “/test/table/dt=2012”
>  “/test/table/dt=2013”
>
> df.write.mode(SaveMode.Append).partitionBy("date”).save(“/test/table")
>
>
>
> On Dec 2, 2015, at 10:50 AM, Isabelle Phan  wrote:
>
> df.write.partitionBy("date").insertInto("my_table")
>
>
>


-- 
Regards,
Rishitesh Mishra,
SnappyData . (http://www.snappydata.io/)

https://in.linkedin.com/in/rishiteshmishra


Re: Join and HashPartitioner question

2015-11-16 Thread Rishi Mishra
AFAIK, and from what I can see in the code, both of them should behave the
same.
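
A small check one can run to see this (the numPartitions overload builds a
HashPartitioner internally, so both joins end up with the same output
partitioning); the data here is toy data, not from this thread:

import org.apache.spark.HashPartitioner

val r1 = sc.parallelize(Seq("a" -> 1, "b" -> 2), 4)
val r2 = sc.parallelize(Seq("a" -> 9, "c" -> 3), 4)

val viaPartitioner = r1.partitionBy(new HashPartitioner(80)).join(r2.partitionBy(new HashPartitioner(80)))
val viaNumTasks    = r1.join(r2, 80)

// both results are hash-partitioned into 80 partitions
println(viaPartitioner.partitioner, viaPartitioner.partitions.length)
println(viaNumTasks.partitioner, viaNumTasks.partitions.length)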

On Sat, Nov 14, 2015 at 2:10 AM, Alexander Pivovarov 
wrote:

> Hi Everyone
>
> Is there any difference in performance btw the following two joins?
>
>
> val r1: RDD[(String, String]) = ???
> val r2: RDD[(String, String]) = ???
>
> val partNum = 80
> val partitioner = new HashPartitioner(partNum)
>
> // Join 1
> val res1 = r1.partitionBy(partitioner).join(r2.partitionBy(partitioner))
>
> // Join 2
> val res2 = r1.join(r2, partNum)
>
>
>


-- 
Regards,
Rishitesh Mishra,
SnappyData . (http://www.snappydata.io/)

https://in.linkedin.com/in/rishiteshmishra