Re: Concurrent execution of actions within a driver

2015-10-26 Thread Rishitesh Mishra
Spark executes tasks only when an action is invoked, and each action is
broken down into multiple tasks. Tasks from different actions run in either
FIFO or FAIR mode, depending on spark.scheduler.mode.
Of course, to get the benefit of FAIR scheduling, the two actions have to be
submitted from different threads.
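
A minimal sketch of that pattern (file paths are hypothetical; SparkContext
job submission is thread-safe, so the two collects can be issued from
separate threads via Futures):

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

val rdd1 = sc.textFile("/data/file1.txt")
val rdd2 = sc.textFile("/data/file2.txt")

// Each Future submits its action from its own thread, so the scheduler can
// interleave the tasks of the two jobs (FAIR mode then shares the executors).
val f1 = Future { rdd1.collect() }
val f2 = Future { rdd2.collect() }

val result1 = Await.result(f1, 10.minutes)
val result2 = Await.result(f2, 10.minutes)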

On Mon, Oct 26, 2015 at 5:01 PM, Fengdong Yu <fengdo...@everstring.com>
wrote:

> Not parallel.
>
> Spark only executes tasks on an action ('collect' here).
>
> rdd1.collect and rdd2.collect are executed sequentially, so Spark runs
> them one by one.
>
>
>
>
> On Oct 26, 2015, at 7:26 PM, praveen S <mylogi...@gmail.com> wrote:
>
> Does Spark run different actions of an RDD within a driver in parallel
> as well?
>
> Let's say
> class Driver{
>
> val rdd1 = sc.textFile("... ")
> val rdd2 = sc.textFile("")
> rdd1.collect // Action 1
> rdd2.collect // Action 2
>
> }
>
> Does Spark run Action 1 & Action 2 in parallel? (some kind of a pass
> through the driver code and then starting the execution)?
>
> If not, then is using threads safe for independent actions/RDDs?
>
>
>


-- 

Regards,
Rishitesh Mishra,
SnappyData . (http://www.snappydata.io/)

https://in.linkedin.com/in/rishiteshmishra


Re: Node affinity for Kafka-Direct Stream

2015-10-14 Thread Rishitesh Mishra
Hi Gerard,
I am also trying to understand the same issue. From whatever code I have
seen, it looks like once the KafkaRDD is constructed, the execution of that
RDD is up to the task scheduler, which can schedule the partitions based on
the load on the nodes. There is a preferred node specified in KafkaRDD, but
AFAIK it maps to the host of the Kafka partition. So if Kafka and Spark are
co-hosted, this will probably work. If not, I am not sure how to get data
locality for a partition.
Others, please correct me if there is a way.
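
For what it's worth, a small sketch of how to inspect the hint the scheduler
receives (kafkaRdd here stands for an RDD created via the direct API, e.g.
KafkaUtils.createRDD/createDirectStream; the variable name is hypothetical):

kafkaRdd.partitions.foreach { p =>
  // For KafkaRDD this prints the Kafka host of each partition; the scheduler
  // treats it only as a locality preference, not a guarantee.
  println(s"partition ${p.index} -> ${kafkaRdd.preferredLocations(p)}")
}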

On Wed, Oct 14, 2015 at 3:08 PM, Gerard Maas <gerard.m...@gmail.com> wrote:

> In the receiver-based kafka streaming model, given that each receiver
> starts as a long-running task, one can rely in a certain degree of data
> locality based on the kafka partitioning:  Data published on a given
> topic/partition will land on the same spark streaming receiving node until
> the receiver dies and needs to be restarted somewhere else.
>
> As I understand, the direct-kafka streaming model just computes offsets
> and relays the work to a KafkaRDD. How is the execution locality compared
> to the receiver-based approach?
>
> thanks, Gerard.
>



-- 

Regards,
Rishitesh Mishra,
SnappyData . (http://www.snappydata.io/)

https://in.linkedin.com/in/rishiteshmishra


Re: Spark DataFrame GroupBy into List

2015-10-13 Thread Rishitesh Mishra
Hi Liu,
I could not see any operator on DataFrame that will give the desired
result. The DataFrame API, as expected, works on the Row format with a
fixed set of operators on it.
However, you can achieve the desired result by accessing the underlying RDD
as below.

val s = Seq(Test("A",1), Test("A",2),Test("B",1),Test("B",2))
val rdd = testSparkContext.parallelize(s)
val df = snc.createDataFrame(rdd)
val rdd1 = df.rdd.map(p => (Seq(p.getString(0)), Seq(p.getInt(1

val reduceF = (p: Seq[Int], q: Seq[Int]) => { Seq(p.head, q.head) }

val rdd3 = rdd1.reduceByKey(reduceF)
rdd3.foreach(r => println(r))



You can always convert the RDD obtained after the transformation and
reduce back into a DataFrame.
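
For example, a sketch of that last step (column names are chosen here just
for illustration, and this assumes snc is a SQLContext-like context whose
implicits are in scope and rdd3 has the type RDD[(String, Seq[Int])] from
the snippet above):

import snc.implicits._

val grouped = rdd3.toDF("category", "id_list")
grouped.show()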


Regards,
Rishitesh Mishra,
SnappyData . (http://www.snappydata.io/)

https://www.linkedin.com/profile/view?id=AAIAAAIFdkMB_v-nolCrFH6_pKf9oH6tZD8Qlgo=nav_responsive_tab_profile

On Tue, Oct 13, 2015 at 11:38 AM, SLiZn Liu <sliznmail...@gmail.com> wrote:

> Hey Spark users,
>
> I'm trying to group by a dataframe, by appending occurrences into a list
> instead of count.
>
> Let's say we have a dataframe as shown below:
>
> | category | id |
> |----------|----|
> | A        | 1  |
> | A        | 2  |
> | B        | 3  |
> | B        | 4  |
> | C        | 5  |
>
> ideally, after some magic group by (reverse explode?):
>
> | category | id_list |
> |----------|---------|
> | A        | 1,2     |
> | B        | 3,4     |
> | C        | 5       |
>
> any tricks to achieve that? Scala Spark API is preferred. =D
>
> BR,
> Todd Leo
>
>
>
>


--


Re: JdbcRDD Constructor

2015-09-23 Thread Rishitesh Mishra
Which version of Spark are you using? I can get correct results using
JdbcRDD. In fact, there is a test suite precisely for this (JdbcRDDSuite).
I changed it according to your input and still got correct results from the
test suite.

On Wed, Sep 23, 2015 at 11:00 AM, satish chandra j  wrote:

> HI All,
>
> JdbcRDD constructor has following parameters,
>
> JdbcRDD(SparkContext sc,
>         scala.Function0<java.sql.Connection> getConnection,
>         String sql,
>         long lowerBound,
>         long upperBound,
>         int numPartitions,
>         scala.Function1<java.sql.ResultSet, T> mapRow,
>         scala.reflect.ClassTag<T> evidence$1)
>
> where lowerBound refers to the lower boundary of the entire data,
> upperBound refers to the upper boundary of the entire data, and
> numPartitions refers to the number of partitions.
>
> The source table from which JdbcRDD is fetching data in Oracle DB has more
> than 500 records, but it is confusing that I get different counts when I
> try several executions changing the "numPartitions" parameter:
>
> LowerBound,UpperBound,numPartitions: Output Count
>
> 0 ,100  ,1   : 100
>
> 0 ,100  ,2   : 151
>
> 0 ,100  ,3   : 201
>
>
> Please help me understand why the output count is 151 if numPartitions is
> 2 and 201 if numPartitions is 3.
>
> Regards,
>
> Satish Chandra
>


Re: JdbcRDD Constructor

2015-09-23 Thread Rishitesh Mishra
I am using Spark 1.5. I always get count = 100, irrespective of the number
of partitions.
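
One thing worth checking: the JdbcRDD scaladoc requires the query to contain
two '?' placeholders, which Spark fills with each partition's sub-range of
[lowerBound, upperBound]; if the placeholders are missing or applied to
something other than a disjoint range column, the partition ranges overlap
and the counts inflate. A sketch of the documented usage (connection URL,
credentials and table/column names are hypothetical):

import java.sql.{DriverManager, ResultSet}
import org.apache.spark.rdd.JdbcRDD

val url = "jdbc:oracle:thin:@//dbhost:1521/ORCL"

val rows = new JdbcRDD(
  sc,
  () => DriverManager.getConnection(url, "user", "password"),
  // the two '?' placeholders receive the bounds of each partition's range
  "SELECT id, name FROM source_table WHERE id >= ? AND id <= ?",
  lowerBound = 0L,
  upperBound = 100L,
  numPartitions = 2,
  mapRow = (r: ResultSet) => (r.getInt("id"), r.getString("name")))

println(rows.count())  // should stay the same for any numPartitions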

On Wed, Sep 23, 2015 at 5:00 PM, satish chandra j <jsatishchan...@gmail.com>
wrote:

> HI,
> Currently using Spark 1.2.2, could you please let me know correct results
> output count which you got it by using JdbcRDDSuite
>
> Regards,
> Satish Chandra
>
> On Wed, Sep 23, 2015 at 4:02 PM, Rishitesh Mishra <
> rishi80.mis...@gmail.com> wrote:
>
>> Which version of Spark are you using? I can get correct results using
>> JdbcRDD. In fact, there is a test suite precisely for this (JdbcRDDSuite).
>> I changed it according to your input and still got correct results from
>> the test suite.
>>
>> On Wed, Sep 23, 2015 at 11:00 AM, satish chandra j <
>> jsatishchan...@gmail.com> wrote:
>>
>>> HI All,
>>>
>>> JdbcRDD constructor has following parameters,
>>>
>>> JdbcRDD(SparkContext sc,
>>>         scala.Function0<java.sql.Connection> getConnection,
>>>         String sql,
>>>         long lowerBound,
>>>         long upperBound,
>>>         int numPartitions,
>>>         scala.Function1<java.sql.ResultSet, T> mapRow,
>>>         scala.reflect.ClassTag<T> evidence$1)
>>>
>>> (see
>>> https://spark.apache.org/docs/1.2.0/api/java/org/apache/spark/rdd/JdbcRDD.html)
>>>
>>> where lowerBound refers to the lower boundary of the entire data,
>>> upperBound refers to the upper boundary of the entire data, and
>>> numPartitions refers to the number of partitions.
>>>
>>> The source table from which JdbcRDD is fetching data in Oracle DB has
>>> more than 500 records, but it is confusing that I get different counts
>>> when I try several executions changing the "numPartitions" parameter:
>>>
>>> LowerBound,UpperBound,numPartitions: Output Count
>>>
>>> 0 ,100  ,1   : 100
>>>
>>> 0 ,100  ,2   : 151
>>>
>>> 0 ,100  ,3   : 201
>>>
>>>
>>> Please help me understand why the output count is 151 if numPartitions
>>> is 2 and 201 if numPartitions is 3.
>>>
>>> Regards,
>>>
>>> Satish Chandra
>>>
>>
>>
>


Re: in joins, does one side stream?

2015-09-20 Thread Rishitesh Mishra
Got it..thnx Reynold..
On 20 Sep 2015 07:08, "Reynold Xin" <r...@databricks.com> wrote:

> The RDDs themselves are not materialized, but the implementations can
> materialize.
>
> E.g. in cogroup (which is used by RDD.join), it materializes all the data
> during grouping.
>
> In SQL/DataFrame join, depending on the join:
>
> 1. For broadcast join, only the smaller side is materialized in memory as
> a hash table.
>
> 2. For sort-merge join, both sides are sorted & streamed through --
> however, one of the sides needs to buffer all the rows having the same
> join key in order to perform the join.
>
>
>
> On Sat, Sep 19, 2015 at 12:55 PM, Rishitesh Mishra <
> rishi80.mis...@gmail.com> wrote:
>
>> Hi Reynold,
>> Can you please elaborate on this? I thought an RDD also only opens an
>> iterator. Does it get materialized for joins?
>>
>> Rishi
>>
>> On Saturday, September 19, 2015, Reynold Xin <r...@databricks.com> wrote:
>>
>>> Yes for RDD -- both are materialized. No for DataFrame/SQL - one side
>>> streams.
>>>
>>>
>>> On Thu, Sep 17, 2015 at 11:21 AM, Koert Kuipers <ko...@tresata.com>
>>> wrote:
>>>
>>>> in scalding we join with the smaller side on the left, since the
>>>> smaller side will get buffered while the bigger side streams through
>>>> the join.
>>>>
>>>> looking at CoGroupedRDD i do not get the impression such a distinction
>>>> is made. it seems both sides are put into a map that can spill to disk.
>>>> is this correct?
>>>>
>>>> thanks
>>>>
>>>
>>>
>
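
As a concrete illustration of point 1 above, a sketch of forcing the
broadcast strategy from the DataFrame API (the DataFrames and the join
column are hypothetical; the broadcast function is available from
org.apache.spark.sql.functions as of Spark 1.5):

import org.apache.spark.sql.functions.broadcast

// broadcast() hints that smallDf should be materialized as an in-memory
// hash table on every executor, so only largeDf streams through the join.
val joined = largeDf.join(broadcast(smallDf), "key")
joined.explain()  // the physical plan should show a broadcast join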


Re: in joins, does one side stream?

2015-09-19 Thread Rishitesh Mishra
Hi Reynold,
Can you please elaborate on this? I thought an RDD also only opens an
iterator. Does it get materialized for joins?

Rishi

On Saturday, September 19, 2015, Reynold Xin  wrote:

> Yes for RDD -- both are materialized. No for DataFrame/SQL - one side
> streams.
>
>
> On Thu, Sep 17, 2015 at 11:21 AM, Koert Kuipers wrote:
>
>> in scalding we join with the smaller side on the left, since the smaller
>> side will get buffered while the bigger side streams through the join.
>>
>> looking at CoGroupedRDD i do not get the impression such a distinction is
>> made. it seems both sides are put into a map that can spill to disk. is
>> this correct?
>>
>> thanks
>>
>
>


Re: RDD from partitions

2015-08-28 Thread Rishitesh Mishra
Hi Jem,
A simple way to get this is to use mapPartitions (which produces a
MapPartitionsRDD). Please see the code below. For this you need to know the
partition numbers of the parent RDD that you want to exclude. One drawback
is that the new RDD will still launch as many tasks as the parent RDD,
since both RDDs have the same number of partitions; we are only excluding
the results from certain partitions. If you can live with that, then it's OK.

import org.apache.spark.TaskContext

val ones = sc.makeRDD(1 to 100, 10).map(x => x) // base RDD

// Reduced RDD: drop the output of partitions 0, 1 and 2
val reduced = ones.mapPartitions { iter =>
  new Iterator[Int] {
    override def hasNext: Boolean = {
      if (Seq(0, 1, 2).contains(TaskContext.get().partitionId)) {
        false
      } else {
        iter.hasNext
      }
    }

    override def next(): Int = iter.next()
  }
}

reduced.collect().foreach(println)
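
A slightly shorter sketch of the same idea, using mapPartitionsWithIndex so
the partition index is passed in directly instead of being looked up from
the TaskContext:

val reduced2 = ones.mapPartitionsWithIndex { (idx, iter) =>
  // return an empty iterator for the partitions we want to exclude
  if (Seq(0, 1, 2).contains(idx)) Iterator.empty else iter
}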




On Fri, Aug 28, 2015 at 12:33 PM, Jem Tucker jem.tuc...@gmail.com wrote:

 Hi,

 I am trying to create an RDD from a selected number of its parent's
 partitions. My current approach is to create my own SelectedPartitionRDD
 and implement compute and numPartitions myself; the problem is that the
 compute method is marked as @DeveloperApi, and hence unsuitable for me to
 use in my application. Are there any alternative methods that will only
 use the stable parts of the Spark API?

 Thanks,

 Jem



Re: Spark driver locality

2015-08-28 Thread Rishitesh Mishra
Hi Swapnil,

1. All the task scheduling/retries happen from the driver, so you are right
that a lot of communication happens from the driver to the cluster. It all
depends on how you want to set up your Spark application: whether the
application has direct access to the Spark cluster or is routed through a
gateway machine. You can take your decision accordingly.
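
For illustration, a minimal sketch (master URL and input path are
hypothetical): in client deployment the driver is simply the JVM that
creates the SparkContext, so this program can run on any machine that can
reach both the master and the workers; it does not have to be the master
node itself.

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("RowCount")
  .setMaster("spark://spark-master:7077")  // standalone master
val sc = new SparkContext(conf)

// the per-partition counting runs as tasks on the executors
println(sc.textFile("hdfs://namenode:8020/data/input.txt").count())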

2. I am not familiar with NFS layer concurrency, but parallel reads should
be OK, I think. Someone with knowledge of NFS internals should correct me
if I am wrong.


On Fri, Aug 28, 2015 at 1:12 AM, Swapnil Shinde swapnilushi...@gmail.com
wrote:

 Thanks Rishitesh!
 1. I get that the driver doesn't need to be on the master, but there is a
 lot of communication between the driver and the cluster; that's why a
 co-located gateway was recommended. How much is the impact of the driver
 not being co-located with the cluster?

 4. How does an HDFS split get assigned to a worker node to read data from
 a remote Hadoop cluster? I am more interested to know how the MapR NFS
 layer is accessed in parallel.

 -
 Swapnil


 On Thu, Aug 27, 2015 at 2:53 PM, Rishitesh Mishra 
 rishi80.mis...@gmail.com wrote:

 Hi Swapnil,
 Let me try to answer some of the questions. Answers inline. Hope it helps.

 On Thursday, August 27, 2015, Swapnil Shinde swapnilushi...@gmail.com
 wrote:

 Hello
 I am new to the Spark world and recently started exploring it in
 standalone mode. It would be great if I could get clarification on the
 doubts below:

 1. Driver locality - It is mentioned in documentation that client
 deploy-mode is not good if machine running spark-submit is not co-located
 with worker machines. cluster mode is not available with standalone
 clusters. Therefore, do we have to submit all applications on master
 machine? (Assuming we don't have separate co-located gateway machine)


 No. Even in standalone mode, your master and driver machines can be
 different.

 The driver should have access to the master as well as the worker machines.



 2. How does above driver locality work with spark shell running on local
 machine ?


 The Spark shell itself acts as the driver. This means your local machine
 should have access to all the cluster machines.


 3. I am little confused with role of driver program. Does driver do any
 computation in spark app life cycle? For instance, in simple row count app,
 worker node calculates local row counts. Does driver sum up local row
 counts? In short where does reduce phase runs in this case?


 The role of the driver is to coordinate with the cluster manager for
 initial resource allocation. After that it needs to schedule tasks to the
 different executors assigned to it. It does not do any computation itself
 (unless the application does something on its own). The reduce phase is
 also a bunch of tasks, which get assigned to one or more executors.


 4. In case of accessing hdfs data over network, do worker nodes read
 data in parallel? How does hdfs data over network get accessed in spark
 application?



 Yes. Every worker will get a split to read, and they read their own splits
 in parallel. This means all worker nodes should have access to the Hadoop
 file system.



 Sorry if these questions were already discussed..

 Thanks
 Swapnil





Re: Spark driver locality

2015-08-27 Thread Rishitesh Mishra
Hi Swapnil,
Let me try to answer some of the questions. Answers inline. Hope it helps.

On Thursday, August 27, 2015, Swapnil Shinde swapnilushi...@gmail.com
wrote:

 Hello
 I am new to the Spark world and recently started exploring it in
 standalone mode. It would be great if I could get clarification on the
 doubts below:

 1. Driver locality - It is mentioned in documentation that client
 deploy-mode is not good if machine running spark-submit is not co-located
 with worker machines. cluster mode is not available with standalone
 clusters. Therefore, do we have to submit all applications on master
 machine? (Assuming we don't have separate co-located gateway machine)


No. Even in standalone mode, your master and driver machines can be
different.

The driver should have access to the master as well as the worker machines.



 2. How does above driver locality work with spark shell running on local
 machine ?


The Spark shell itself acts as the driver. This means your local machine
should have access to all the cluster machines.


 3. I am little confused with role of driver program. Does driver do any
 computation in spark app life cycle? For instance, in simple row count app,
 worker node calculates local row counts. Does driver sum up local row
 counts? In short where does reduce phase runs in this case?


The role of the driver is to coordinate with the cluster manager for
initial resource allocation. After that it needs to schedule tasks to the
different executors assigned to it. It does not do any computation itself
(unless the application does something on its own). The reduce phase is
also a bunch of tasks, which get assigned to one or more executors.


 4. In case of accessing hdfs data over network, do worker nodes read data
 in parallel? How does hdfs data over network get accessed in spark
 application?



 Yes. Every worker will get a split to read, and they read their own splits
 in parallel. This means all worker nodes should have access to the Hadoop
 file system.



 Sorry if these questions were already discussed..

 Thanks
 Swapnil



Re: Spark streaming multi-tasking during I/O

2015-08-21 Thread Rishitesh Mishra
Hi Sateesh,
It is interesting to know how you determined that the DStream runs on a
single core. Did you mean the receiver?

Coming back to your question: could you not start the disk I/O in a
separate thread, so that the scheduler can go ahead and assign other tasks?
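
A minimal sketch of that suggestion, assuming the blocking work happens
inside something like foreachRDD/foreachPartition (writeToStore below is a
hypothetical blocking disk/DB write):

import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// small dedicated pool for blocking I/O, so the task's own thread (and its
// CPU core) stays free for other work while the writes are in flight
implicit val ioPool: ExecutionContext =
  ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(4))

def writeToStore(record: String): Unit = {
  // hypothetical blocking disk/database write
}

def processPartition(records: Iterator[String]): Unit = {
  val pending = records.map(rec => Future { writeToStore(rec) }).toList
  // wait for the writes before the task reports success
  pending.foreach(f => Await.result(f, 1.minute))
}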
On 21 Aug 2015 16:06, Sateesh Kavuri sateesh.kav...@gmail.com wrote:

 Hi,

 My scenario goes like this:
 I have an algorithm running in Spark streaming mode on a 4 core virtual
 machine. Majority of the time, the algorithm does disk I/O and database
 I/O. Question is, during the I/O, where the CPU is not considerably loaded,
 is it possible to run any other task/thread so as to efficiently utilize
 the CPU?

 Note that one DStream of the algorithm runs completely on a single CPU

 Thank you,
 Sateesh



Re: How to list all dataframes and RDDs available in current session?

2015-08-20 Thread Rishitesh Mishra
I am not sure you can view all RDDs in a session. Tables are maintained in
a catalogue, hence they are easier to list. However, you can see the DAG
representation, which lists all the RDDs in a job, in the Spark UI.
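
A small sketch of what can be queried programmatically (sqlContext here is
whatever SQLContext Zeppelin exposes; un-cached RDDs are not tracked
anywhere, so only persisted ones can be listed):

// registered temp tables, i.e. the same information as "show tables"
sqlContext.tableNames().foreach(println)

// RDDs that have been persisted/cached in this SparkContext
sc.getPersistentRDDs.values.foreach(rdd => println(s"${rdd.id}: $rdd"))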
On 20 Aug 2015 22:34, Dhaval Patel dhaval1...@gmail.com wrote:

 Apologies

 I accidentally included Spark User DL on BCC. The actual email message is
 below.
 =


 Hi:

 I have been working on a few examples using Zeppelin.

 I have been trying to find a command that would list all DataFrames/RDDs
 that have been created in the current session. Does anyone know if any
 such command is available?

 Something similar to SparkSQL to list all temp tables :
   show tables;

 Thanks,
 Dhaval



 On Thu, Aug 20, 2015 at 12:49 PM, Dhaval Patel dhaval1...@gmail.com
 wrote:

 Hi:

 I have been working on a few examples using Zeppelin.

 I have been trying to find a command that would list all DataFrames/RDDs
 that have been created in the current session. Does anyone know if any
 such command is available?

 Something similar to SparkSQL to list all temp tables :
   show tables;

 Thanks,
 Dhaval





Subscribe

2015-08-17 Thread Rishitesh Mishra