Push epoch updates to executors on fetch failure to avoid fetch retries for missing executors

2017-10-25 Thread Juan Rodríguez Hortalá
Hi,

I opened https://issues.apache.org/jira/browse/SPARK-22339 some days ago,
and I would like to get some feedback on it. The idea is to push epoch
updates to the executors after a fetch failure by piggybacking on the
executor heartbeat response, in order to fail faster when an executor and
its shuffle blocks are lost, instead of having to wait for all fetch
retries to fail and a new task to be started on the reader executors. This
can speed up job execution, particularly when executors are lost at the end
of a stage in a Spark application that runs a single action at a time. There
are more details and a draft patch for this in the JIRA.

Looking forward to your feedback on this.

Greetings,

Juan


(SPARK-22148) TaskSetManager.abortIfCompletelyBlacklisted should not abort when all current executors are blacklisted but dynamic allocation is enabled

2017-10-24 Thread Juan Rodríguez Hortalá
Hi,

I've been working on this issue, and I would like to get your feedback on
the following approach. The idea is that instead of failing in
`TaskSetManager.abortIfCompletelyBlacklisted` when a task cannot be
scheduled on any executor but dynamic allocation is enabled, we will
register this task with `ExecutorAllocationManager`. Then
`ExecutorAllocationManager` will request additional executors for these
"unschedulable tasks" by increasing the value returned by
`ExecutorAllocationManager.maxNumExecutorsNeeded`. This way we are counting
these tasks twice, but this makes sense because the current executors don't
have any slot for these tasks, so we actually want to get new executors
that are able to run them. To avoid a deadlock due to tasks being
unschedulable forever, we store the timestamp when a task was registered
as unschedulable, and in `ExecutorAllocationManager.schedule` we abort the
application if some task has been unschedulable for longer than a
configurable age threshold. This way we give dynamic allocation an
opportunity to get more executors that are able to run the tasks, but we
don't make the application wait forever.
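
To make the bookkeeping more concrete, here is a simplified standalone sketch
of the idea; this is not the actual patch, and the class and method names are
made up for illustration (the real change hooks into TaskSetManager and
ExecutorAllocationManager):

import scala.collection.mutable

// Illustrative sketch only: tracks tasks that could not be scheduled anywhere.
class UnschedulableTaskTracker(maxWaitMs: Long) {

  // taskId -> timestamp when the task was first found to be unschedulable
  private val unschedulable = mutable.Map.empty[Long, Long]

  // called instead of aborting in abortIfCompletelyBlacklisted, when dynamic allocation is on
  def register(taskId: Long, nowMs: Long): Unit =
    unschedulable.getOrElseUpdate(taskId, nowMs)

  // called when the task is eventually scheduled or finishes
  def unregister(taskId: Long): Unit = unschedulable -= taskId

  // extra executors to request on top of the usual maxNumExecutorsNeeded estimate
  def extraExecutorsNeeded(slotsPerExecutor: Int): Int =
    if (unschedulable.isEmpty) 0
    else math.ceil(unschedulable.size.toDouble / slotsPerExecutor).toInt

  // checked periodically from the allocation manager's schedule() loop
  def shouldAbort(nowMs: Long): Boolean =
    unschedulable.values.exists(nowMs - _ > maxWaitMs)
}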

Attached to the JIRA is a patch with a draft for this approach. Looking
forward to your feedback on this.


Re: Graceful node decommission mechanism for Spark

2017-10-20 Thread Juan Rodríguez Hortalá
Hi,

Are there any comments or suggestions regarding this proposal?

Thanks,

Juan


On Mon, Oct 16, 2017 at 10:27 AM, Juan Rodríguez Hortalá <
juan.rodriguez.hort...@gmail.com> wrote:

> Hi all,
>
> I have a prototype for "Keep track of nodes which are going to be shut
> down & avoid scheduling new tasks" (https://issues.apache.
> org/jira/browse/SPARK-20628) that I would like to discuss with the
> community. I added a WIP PR for that in https://github.com/apache/
> spark/pull/19267. The basic idea is implementing a mechanism similar to
> YARN's graceful decommission, but for Spark. There is a design document for
> this in https://github.com/apache/spark/files/1349653/Spark_
> Blacklisting_on_decommissioning-Scope.pdf. I would like to know the
> opinion of the list on this approach.
>
> *More details about this proposal*
> In the PR we define a HostState type to represent the state of the cluster
> nodes, and take actions in 
> CoarseGrainedSchedulerBackend.handleUpdatedHostState
> when a node transitions into a state where the node becomes partially or
> totally unavailable. Just like in YARN or Mesos, we propose a decommission
> mechanism with 2 phases, first a drain phase where the node is still
> running but not accepting further work (DECOMMISSIONING in YARN, and DRAIN
> in Mesos), followed by a second phase where executors in the node are
> forcibly shut down (DECOMMISIONED in YARN, and DOWN in Mesos). In this PR
> we focus only in YARN, and in the actions when the node transitions into
> DECOMMISSIONING state: blacklisting the node when it transitions to
> DECOMMISSIONING, and un-blacklist the node when it gets back to the normal
> healthy RUNNING state.
> The decommissioning process would not be initiated by Spark, but by an
> operator or an automated system (e.g. the cloud environment where YARN is
> running), on response to some relevant event (e.g. a cluster resize event),
> and it would consist on calling the YARN administrative command yarn
> rmadmin -refreshNodes -g for the affected node. Spark would just react to
> the node state transition events it receives from the cluster manager.
> To make this extensible to other cluster managers besides YARN, we define
> the HostState type in Spark, and keep the interaction with the specifics of
> each cluster manager into the corresponding packages. For example for YARN,
> when YarnAllocator gets a node state transition event, it converts the node
> event from the YARN specific NodeState into HostState, wraps it into a
> HostStatusUpdate message, and sends it to the
> CoarseGrainedSchedulerBackend, that then performs the required actions for
> that node.
>
> This code works on a modified version of Hadoop 2.7.3 with patches to
> support YARN-4676 (basic graceful decommission), and an approximation to
> YARN-3224 (when a node transitions into DECOMMISSIONING state the resource
> manager notifies that to each relevant application master by adding it to
> the list of updated nodes available in the AllocateResponse returned by the
> RM as a response to the AM heartbeat). For these reasons, this code won't
> work as-is on vanilla Hadoop. The main problem is that the decommissioning
> mechanism for YARN is not completely implemented (see YARN-914), and some
> of the parts that are implemented are only available for YARN 2.9.0 (see
> YARN-4676). To cope with this, we propose implementing an administrative
> command to send node transitions directly to the Spark driver, as
> HostStatusUpdate messages addressed to the CoarseGrainedSchedulerBackend.
> This command would be similar to the yarn rmadmin -refreshNodes -g, which
> is currently used for decommissioning nodes in YARN. When YARN-914 is
> complete, this could still be used as a secondary interface for
> decommissioning nodes, so nodes transitions could be signaled either by the
> cluster manager, or using the administrative command (either manually or
> through some automation implemented by the cloud environment).
>
> We would like to get some feedback on this approach in general, and in the
> administrative command solution in particular. If that sounds good, then we
> will work on modifying this PR so it works on vanilla Hadoop 2.7, and to
> implement the administrative command.
>
> Thanks,
>
> Juan Rodriguez Hortala
>


Graceful node decommission mechanism for Spark

2017-10-16 Thread Juan Rodríguez Hortalá
Hi all,

I have a prototype for "Keep track of nodes which are going to be shut down
& avoid scheduling new tasks" (
https://issues.apache.org/jira/browse/SPARK-20628) that I would like to
discuss with the community. I added a WIP PR for that in
https://github.com/apache/spark/pull/19267. The basic idea is implementing
a mechanism similar to YARN's graceful decommission, but for Spark. There
is a design document for this in
https://github.com/apache/spark/files/1349653/Spark_Blacklisting_on_decommissioning-Scope.pdf.
I would like to know the opinion of the list on this approach.

*More details about this proposal*
In the PR we define a HostState type to represent the state of the cluster
nodes, and take actions in
CoarseGrainedSchedulerBackend.handleUpdatedHostState when a node
transitions into a state where the node becomes partially or totally
unavailable. Just like in YARN or Mesos, we propose a decommission
mechanism with two phases: first a drain phase where the node is still
running but not accepting further work (DECOMMISSIONING in YARN, and DRAIN
in Mesos), followed by a second phase where executors on the node are
forcibly shut down (DECOMMISSIONED in YARN, and DOWN in Mesos). In this PR
we focus only on YARN, and on the actions when the node transitions into
the DECOMMISSIONING state: blacklisting the node when it transitions to
DECOMMISSIONING, and un-blacklisting the node when it gets back to the normal
healthy RUNNING state.
The decommissioning process would not be initiated by Spark, but by an
operator or an automated system (e.g. the cloud environment where YARN is
running), in response to some relevant event (e.g. a cluster resize event),
and it would consist of calling the YARN administrative command yarn
rmadmin -refreshNodes -g for the affected node. Spark would just react to
the node state transition events it receives from the cluster manager.
To make this extensible to other cluster managers besides YARN, we define
the HostState type in Spark, and keep the interaction with the specifics of
each cluster manager in the corresponding packages. For example, for YARN,
when YarnAllocator gets a node state transition event, it converts the node
event from the YARN-specific NodeState into HostState, wraps it into a
HostStatusUpdate message, and sends it to the
CoarseGrainedSchedulerBackend, which then performs the required actions for
that node.
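
As a rough illustration of the shape of these types (HostState and
HostStatusUpdate are the names used in the PR, but the signatures below are
simplified, and the blacklist calls are stand-ins for what the scheduler
backend actually does):

sealed trait HostState
object HostState {
  case object Running extends HostState          // healthy node
  case object Decommissioning extends HostState  // drain phase: no new work
  case object Decommissioned extends HostState   // executors forcibly shut down
}

// message sent, e.g. by YarnAllocator, to the scheduler backend
case class HostStatusUpdate(host: String, state: HostState)

// simplified stand-in for CoarseGrainedSchedulerBackend.handleUpdatedHostState
class DecommissionHandler(blacklist: String => Unit, unBlacklist: String => Unit) {
  def handleUpdatedHostState(update: HostStatusUpdate): Unit = update.state match {
    case HostState.Decommissioning => blacklist(update.host)   // stop scheduling tasks on the node
    case HostState.Running         => unBlacklist(update.host) // node is healthy again
    case HostState.Decommissioned  => ()                       // executors are already gone
  }
}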

This code works on a modified version of Hadoop 2.7.3 with patches to
support YARN-4676 (basic graceful decommission), and an approximation to
YARN-3224 (when a node transitions into the DECOMMISSIONING state, the resource
manager notifies each relevant application master by adding the node to
the list of updated nodes available in the AllocateResponse returned by the
RM as a response to the AM heartbeat). For these reasons, this code won't
work as-is on vanilla Hadoop. The main problem is that the decommissioning
mechanism for YARN is not completely implemented (see YARN-914), and some
of the parts that are implemented are only available for YARN 2.9.0 (see
YARN-4676). To cope with this, we propose implementing an administrative
command to send node transitions directly to the Spark driver, as
HostStatusUpdate messages addressed to the CoarseGrainedSchedulerBackend.
This command would be similar to the yarn rmadmin -refreshNodes -g, which
is currently used for decommissioning nodes in YARN. When YARN-914 is
complete, this could still be used as a secondary interface for
decommissioning nodes, so node transitions could be signaled either by the
cluster manager, or using the administrative command (either manually or
through some automation implemented by the cloud environment).

We would like to get some feedback on this approach in general, and on the
administrative command solution in particular. If that sounds good, then we
will work on modifying this PR so it works on vanilla Hadoop 2.7, and on
implementing the administrative command.

Thanks,

Juan Rodriguez Hortala


Re: RDD API patterns

2015-09-19 Thread Juan Rodríguez Hortalá
Hi Sim,

I understand that what you propose is defining a trait SparkIterable (and
also PairSparkIterable for RDDs of pairs) that encapsulates the methods in
RDDs, and then programming against that trait instead of RDD. That is similar
to programming using scala.collection.GenSeq to abstract over using a
sequential or parallel Seq. This new trait SparkIterable would be needed to
cover methods in RDDs that are not present in GenSeq and other standard
traits. I understand you suggest implementing it using wrapper classes and
implicit conversions, like in PairRDDFunctions, in order to treat RDD,
Iterable and other classes as SparkIterable. That reminds me of type
classes,
http://danielwestheide.com/blog/2013/02/06/the-neophytes-guide-to-scala-part-12-type-classes.html,
which could be a similar approach. I think it would be interesting to know
if some standard type classes, for example those in
https://non.github.io/cats//typeclasses.html, could be of use here.
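
To make that concrete, here is a minimal sketch of what such a trait and its
instances could look like, with hypothetical names and only a tiny subset of
operations, implemented with wrapper classes and implicit conversions as
described above:

import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag

trait SparkIterable[A] {
  def map[B: ClassTag](f: A => B): SparkIterable[B]
  def filter(p: A => Boolean): SparkIterable[A]
  def count(): Long
}

// instance backed by a distributed RDD
class RDDSparkIterable[A: ClassTag](rdd: RDD[A]) extends SparkIterable[A] {
  def map[B: ClassTag](f: A => B) = new RDDSparkIterable(rdd.map(f))
  def filter(p: A => Boolean) = new RDDSparkIterable(rdd.filter(p))
  def count() = rdd.count()
}

// instance backed by a node-local collection
class LocalSparkIterable[A](xs: Iterable[A]) extends SparkIterable[A] {
  def map[B: ClassTag](f: A => B) = new LocalSparkIterable(xs.map(f))
  def filter(p: A => Boolean) = new LocalSparkIterable(xs.filter(p))
  def count() = xs.size.toLong
}

object SparkIterable {
  // implicit conversions in the style of PairRDDFunctions
  implicit def fromRDD[A: ClassTag](rdd: RDD[A]): SparkIterable[A] = new RDDSparkIterable(rdd)
  implicit def fromIterable[A](xs: Iterable[A]): SparkIterable[A] = new LocalSparkIterable(xs)
}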

A downside I find in this approach is that it would be more difficult to
reason about the performance of programs, and to write them to obtain the
best performance, if we don't know whether a SparkIterable is a distributed
RDD or a node-local collection, which for example might even be indexed. Or
we might avoid accessing a SparkIterable from a closure in a map because we
don't know whether we are in the driver or in a worker. That could hinder the
development of efficient programs, but this is not very surprising because
the trade-off between abstraction level and performance is always there in
programming anyway.

Anyway, I find your idea very interesting; I think it could be developed
into a nice library.

Greetings,

Juan




2015-09-18 14:55 GMT+02:00 sim :

> @debasish83, yes, there are many ways to optimize and work around the
> limitation of no nested RDDs. The point of this thread is to discuss the
> API
> patterns of Spark in order to make the platform more accessible to lots of
> developers solving interesting problems quickly. We can get API consistency
> without resorting to simulations of nested RDDs.
>
>
>
>
>


Re: RDD API patterns

2015-09-16 Thread Juan Rodríguez Hortalá
Hi,

That reminds me to a previous discussion about splitting an RDD into
several RDDs
http://apache-spark-developers-list.1001551.n3.nabble.com/RDD-split-into-multiple-RDDs-td11877.html.
There you can see a simple code to convert RDD[(K, V)] into Map[K, RDD[V]]
through several filters. On top of that maybe you could build an
abstraction that simulates nested RDDs, as a proof of concepts, forgetting
for now about performance. But the main problem I've found is that the
Spark scheduler gets stuck when you have a huge amount of very small RDDs,
or at least that is what happened several versions ago
http://mail-archives.us.apache.org/mod_mbox/spark-user/201502.mbox/%3ccamassdj+bzv++cr44edv-cpchr-1x-a+y2vmtugwc0ux91f...@mail.gmail.com%3E

Just my two cents





2015-09-16 11:51 GMT+02:00 Aniket :

> I agree that this is an issue, but I am afraid supporting RDD nesting would be
> hard and perhaps would need rearchitecting Spark. For now, you may have to use
> workarounds like storing each group in a separate file, processing each file
> as a separate RDD and finally merging the results into a single RDD.
>
> I know its painful and I share the pain :)
>
> Thanks,
> Aniket
>
> On Tue, Sep 15, 2015, 5:06 AM sim [via Apache Spark Developers List] <[hidden
> email] > wrote:
>
>> I'd like to get some feedback on an API design issue pertaining to RDDs.
>>
>> The design goal to avoid RDD nesting, which I agree with, leads the
>> methods operating on subsets of an RDD (not necessarily partitions) to use
>> Iterable as an abstraction. The mapPartitions and groupBy* family of
>> methods are good examples. The problem with that API choice is that
>> developers often very quickly run out of the benefits of the RDD API,
>> independent of partitioning.
>>
>> Consider two very simple problems that demonstrate the issue. The input
>> is the same for all: an RDD of integers that has been grouped into odd and
>> even.
>>
>> 1. Sample the odds at 10% and the evens at 20%. Trivial, as stratified
>> sampling (sampleByKey) is built into PairRDDFunctions.
>>
>> 2. Sample at 10% if there are more than 1,000 elements in a group and at
>> 20% otherwise. Suddenly, the problem becomes a lot less easy. The
>> sub-groups are no longer RDDs and we can't use the RDD sampling API.
>>
>> Note that the only reason the first problem is easy is because it was
>> part of Spark. If that hadn't happened, implementing it with the
>> higher-level API abstractions wouldn't have been easy. As more and more
>> people use Spark for ever more diverse sets of problems the likelihood that
>> the RDD APIs provide pre-existing high-level abstractions will diminish.
>>
>> How do you feel about this? Do you think it is desirable to lose all
>> high-level RDD API abstractions the very moment we group an RDD or call
>> mapPartitions? Does the goal of no nested RDDs mean there are absolutely no
>> high-level abstractions that we can expose via the Iterables borne of RDDs?
>>
>> I'd love your thoughts.
>>
>> /Sim
>> http://linkedin.com/in/simeons
>>
>


JobScheduler: Error generating jobs for time for custom InputDStream

2015-09-15 Thread Juan Rodríguez Hortalá
Hi,

Sorry to insist, but does anyone have any thoughts on this? Or could someone
at least point me to documentation of DStream.compute(), so I can understand
when I should return None for a batch?

Thanks

Juan


2015-09-14 20:51 GMT+02:00 Juan Rodríguez Hortalá <
juan.rodriguez.hort...@gmail.com>:

> Hi,
>
> I sent this message to the user list a few weeks ago with no luck, so I'm
> forwarding it to the dev list in case someone could give a hand with this.
> Thanks a lot in advance
>
>
> I've developed a ScalaCheck property for testing Spark Streaming
> transformations. To do that I had to develop a custom InputDStream, which
> is very similar to QueueInputDStream but has a method for adding new test
> cases for dstreams, which are objects of type Seq[Seq[A]], to the DStream.
> You can see the code at
> https://github.com/juanrh/sscheck/blob/32c2bff66aa5500182e0162a24ecca6d47707c42/src/main/scala/org/apache/spark/streaming/dstream/DynSeqQueueInputDStream.scala.
> I have developed a few properties that run in local mode
> https://github.com/juanrh/sscheck/blob/32c2bff66aa5500182e0162a24ecca6d47707c42/src/test/scala/es/ucm/fdi/sscheck/spark/streaming/ScalaCheckStreamingTest.scala.
> The problem is that when the batch interval is too small, and the machine
> cannot complete the batches fast enough, I get the following exceptions in
> the Spark log
>
> 15/08/26 11:22:02 ERROR JobScheduler: Error generating jobs for time
> 1440580922500 ms
> java.lang.NullPointerException
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$count$1$$anonfun$apply$18.apply(DStream.scala:587)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$count$1$$anonfun$apply$18.apply(DStream.scala:587)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonfun$apply$21.apply(DStream.scala:654)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonfun$apply$21.apply(DStream.scala:654)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2$$anonfun$5.apply(DStream.scala:668)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2$$anonfun$5.apply(DStream.scala:666)
> at
> org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:41)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
> at
> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
> at scala.Option.orElse(Option.scala:257)
> at
> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
> at
> org.apache.spark.streaming.dstream.ShuffledDStream.compute(ShuffledDStream.scala:41)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
> at
> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
> at scala.Option.orElse(Option.scala:257)
> at
> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
> at
> org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
>   

Fwd: JobScheduler: Error generating jobs for time for custom InputDStream

2015-09-14 Thread Juan Rodríguez Hortalá
Hi,

I sent this message to the user list a few weeks ago with no luck, so I'm
forwarding it to the dev list in case someone could give a hand with this.
Thanks a lot in advance

I've developed a ScalaCheck property for testing Spark Streaming
transformations. To do that I had to develop a custom InputDStream, which
is very similar to QueueInputDStream but has a method for adding new test
cases for dstreams, which are objects of type Seq[Seq[A]], to the DStream.
You can see the code at
https://github.com/juanrh/sscheck/blob/32c2bff66aa5500182e0162a24ecca6d47707c42/src/main/scala/org/apache/spark/streaming/dstream/DynSeqQueueInputDStream.scala.
I have developed a few properties that run in local mode
https://github.com/juanrh/sscheck/blob/32c2bff66aa5500182e0162a24ecca6d47707c42/src/test/scala/es/ucm/fdi/sscheck/spark/streaming/ScalaCheckStreamingTest.scala.
The problem is that when the batch interval is too small, and the machine
cannot complete the batches fast enough, I get the following exceptions in
the Spark log

15/08/26 11:22:02 ERROR JobScheduler: Error generating jobs for time
1440580922500 ms
java.lang.NullPointerException
at
org.apache.spark.streaming.dstream.DStream$$anonfun$count$1$$anonfun$apply$18.apply(DStream.scala:587)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$count$1$$anonfun$apply$18.apply(DStream.scala:587)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonfun$apply$21.apply(DStream.scala:654)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonfun$apply$21.apply(DStream.scala:654)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2$$anonfun$5.apply(DStream.scala:668)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2$$anonfun$5.apply(DStream.scala:666)
at
org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:41)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
at
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
at scala.Option.orElse(Option.scala:257)
at
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
at
org.apache.spark.streaming.dstream.ShuffledDStream.compute(ShuffledDStream.scala:41)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
at
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
at scala.Option.orElse(Option.scala:257)
at
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
at
org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
at
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
at scala.Option.orElse(Option.scala:257)
at
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
at

Re: taking an n number of rows from an RDD starting from an index

2015-09-02 Thread Juan Rodríguez Hortalá
Hi,

Maybe you could use zipWithIndex and filter to skip the first elements. For
example starting from

scala> sc.parallelize(100 to 120, 4).zipWithIndex.collect
res12: Array[(Int, Long)] = Array((100,0), (101,1), (102,2), (103,3),
(104,4), (105,5), (106,6), (107,7), (108,8), (109,9), (110,10), (111,11),
(112,12), (113,13), (114,14), (115,15), (116,16), (117,17), (118,18),
(119,19), (120,20))

we can get the first 3 elements starting from index 4 (counting from 0) as

scala> sc.parallelize(100 to 120, 4).zipWithIndex.filter(_._2 >=4).take(3)
res14: Array[(Int, Long)] = Array((104,4), (105,5), (106,6))
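
The same pattern can be wrapped in a small helper; this is just a sketch, and
it assumes the ordering of the RDD is stable across evaluations:

import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag

// take `n` elements starting at index `start` (0-based)
def slice[T: ClassTag](rdd: RDD[T], start: Long, n: Int): Array[T] =
  rdd.zipWithIndex().filter { case (_, i) => i >= start }.map(_._1).take(n)

// e.g. slice(sc.parallelize(100 to 120, 4), 4, 3) would return Array(104, 105, 106)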

Hope that helps


2015-09-02 8:52 GMT+02:00 Hemant Bhanawat :

> I think rdd.toLocalIterator is what you want. But it will keep one
> partition's data in-memory.
>
> On Wed, Sep 2, 2015 at 10:05 AM, Niranda Perera 
> wrote:
>
>> Hi all,
>>
>> I have a large set of data which would not fit into the memory. So, I want
>> to take n rows from the RDD given a particular index. For
>> example, take 1000 rows starting from the index 1001.
>>
>> I see that there is a  take(num: Int): Array[T] method in the RDD, but it
>> only returns the 'first n number of rows'.
>>
>> the simplest use case of this requirement is, say, I write a custom
>> relation provider with a custom relation extending the InsertableRelation.
>>
>> say I submit this query,
>> "insert into table abc select * from xyz sort by x asc"
>>
>> in my custom relation, I have implemented the def insert(data: DataFrame,
>> overwrite: Boolean): Unit
>> method. here, since the data is large, I can not call methods such as
>> DataFrame.collect(). Instead, I could do DataFrame.foreachPartition(...).
>> As you could see, the resultant DF from the "select * from xyz sort by x
>> asc" is sorted, and if I run foreachPartition on that DF and implement the
>> insert method, this sorted order would be affected, since the inserting
>> operation would be done in parallel in each partition.
>>
>> in order to handle this, my initial idea was to take rows from the RDD in
>> batches and do the insert operation, and for that I was looking for a
>> method to take n number of rows starting from a given index.
>>
>> is there any better way to handle this, in RDDs?
>>
>> your assistance in this regard is highly appreciated.
>>
>> cheers
>>
>> --
>> Niranda
>> @n1r44 
>> https://pythagoreanscript.wordpress.com/
>>
>
>


Re: Compact RDD representation

2015-07-20 Thread Juan Rodríguez Hortalá
Hi,

I'm not an authority in the Spark community, but what I would do is add
the project to Spark Packages, http://spark-packages.org/. In fact I think
this case is similar to IndexedRDD, which is also in Spark Packages:
http://spark-packages.org/package/amplab/spark-indexedrdd

2015-07-19 21:49 GMT+02:00 Сергей Лихоман :

> Hi Juan,
>
> It's exactly what I meant. If we have a high load with many repetitions it
> can significantly reduce the RDD size and improve performance. In real use
> cases applications frequently need to enrich data from a cache or an external
> system, so we would save time on each repetition.
> I will also do some experiments. About few repetitions: in which use
> cases would we lose efficiency? I will also test that.
> What do I need to do for this? Just create a ticket in JIRA?
>
>
>
> 2015-07-19 21:56 GMT+03:00 Juan Rodríguez Hortalá <
> juan.rodriguez.hort...@gmail.com>:
>
>> Hi,
>>
>> My two cents is that that could be interesting if all RDD and pair
>> RDD operations would be lifted to work on groupedRDD. For example as
>> suggested a map on grouped RDDs would be more efficient if the original RDD
>> had lots of duplicate entries, but for RDDs with little repetitions I guess
>> you in fact lose efficiency. The same applies to filter, sortBy, count,
>> max, ... but for example I guess there is no gain for reduce and other
>> operations. Also note the order is lost when passing to grouped RDD, so the
>> semantics is not exactly the same, but would be good enough for
>> many applications. Also I would look for suitable use cases where RDD with
>> many repetitions arise naturally, and the transformations with performance
>> gain like map are used often, and I would do some experiments to compare
>> performance between a computation with grouped RDD and the same computation
>> without grouping, for different input sizes
>>
>>
>>> On Sunday, July 19, 2015, Sandy Ryza
>>> wrote:
>>
>>> This functionality already basically exists in Spark.  To create the
>>> "grouped RDD", one can run:
>>>
>>>   val groupedRdd = rdd.reduceByKey(_ + _)
>>>
>>> To get it back into the original form:
>>>
>>>   groupedRdd.flatMap(x => List.fill(x._1)(x._2))
>>>
>>> -Sandy
>>>
>>> -Sandy
>>>
>>> On Sun, Jul 19, 2015 at 10:40 AM, Сергей Лихоман 
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> I am looking for suitable issue for Master Degree project(it sounds
>>>> like scalability problems and improvements for spark streaming) and seems
>>>> like introduction of grouped RDD(for example: don't store
>>>> "Spark", "Spark", "Spark", instead store ("Spark", 3)) can:
>>>>
>>>> 1. Reduce memory needed for RDD (roughly, used memory will be:  % of
>>>> uniq messages)
>>>> 2. Improve performance(no need to apply function several times for the
>>>> same message).
>>>>
>>>> Can I create ticket and introduce API for grouped RDDs? Is it make
>>>> sense? Also I will be very appreciated for critic and ideas
>>>>
>>>
>>>
>


Re: Compact RDD representation

2015-07-19 Thread Juan Rodríguez Hortalá
Hi,

My two cents is that this could be interesting if all RDD and pair
RDD operations were lifted to work on grouped RDDs. For example, as
suggested, a map on grouped RDDs would be more efficient if the original RDD
had lots of duplicate entries, but for RDDs with few repetitions I guess
you would in fact lose efficiency. The same applies to filter, sortBy, count,
max, ... but for example I guess there is no gain for reduce and other
operations. Also note that the order is lost when passing to a grouped RDD, so
the semantics are not exactly the same, but that would be good enough for
many applications. Also, I would look for suitable use cases where RDDs with
many repetitions arise naturally and the transformations with a performance
gain, like map, are used often, and I would do some experiments to compare
performance between a computation with grouped RDDs and the same computation
without grouping, for different input sizes.
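
A rough sketch of what lifting transformations to that grouped representation
could look like, with hypothetical helper names just to make the idea concrete
(an RDD[(A, Long)] of (value, count) pairs plays the role of the grouped RDD):

import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag

// build the (value, count) representation
def toGrouped[A: ClassTag](rdd: RDD[A]): RDD[(A, Long)] =
  rdd.map((_, 1L)).reduceByKey(_ + _)

// lifted map: f is applied once per distinct value, and counts are merged again
def mapGrouped[A, B: ClassTag](grouped: RDD[(A, Long)])(f: A => B): RDD[(B, Long)] =
  grouped.map { case (a, n) => (f(a), n) }.reduceByKey(_ + _)

// expand back to the flat representation (order is not preserved, counts assumed to fit in an Int)
def fromGrouped[A: ClassTag](grouped: RDD[(A, Long)]): RDD[A] =
  grouped.flatMap { case (a, n) => Iterator.fill(n.toInt)(a) }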


On Sunday, July 19, 2015, Sandy Ryza
wrote:

> This functionality already basically exists in Spark.  To create the
> "grouped RDD", one can run:
>
>   val groupedRdd = rdd.reduceByKey(_ + _)
>
> To get it back into the original form:
>
>   groupedRdd.flatMap(x => List.fill(x._1)(x._2))
>
> -Sandy
>
> -Sandy
>
> On Sun, Jul 19, 2015 at 10:40 AM, Сергей Лихоман  > wrote:
>
>> Hi,
>>
>> I am looking for suitable issue for Master Degree project(it sounds like
>> scalability problems and improvements for spark streaming) and seems like
>> introduction of grouped RDD(for example: don't store
>> "Spark", "Spark", "Spark", instead store ("Spark", 3)) can:
>>
>> 1. Reduce memory needed for RDD (roughly, used memory will be:  % of uniq
>> messages)
>> 2. Improve performance(no need to apply function several times for the
>> same message).
>>
>> Can I create ticket and introduce API for grouped RDDs? Is it make sense?
>> Also I will be very appreciated for critic and ideas
>>
>
>


Re: how to implement my own datasource?

2015-06-25 Thread Juan Rodríguez Hortalá
Hi,

You can connect via JDBC as described in
https://spark.apache.org/docs/latest/sql-programming-guide.html#jdbc-to-other-databases.
Another option is using HadoopRDD and NewHadoopRDD to connect to data stores
compatible with Hadoop, like HBase; some examples can be found in chapter 5
of "Learning Spark"
https://books.google.es/books?id=tOptBgAAQBAJ&pg=PT190&dq=learning+spark+hadooprdd&hl=en&sa=X&ei=4bqLVaDaLsXaU46NgfgL&ved=0CCoQ6AEwAA#v=onepage&q=%20hadooprdd&f=false
For Spark Streaming see the section "Custom Sources" of
https://spark.apache.org/docs/latest/streaming-programming-guide.html
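
As an illustration of the NewHadoopRDD route, here is a minimal sketch for
reading an HBase table (it assumes the HBase client classes are on the
classpath and a table name of your choosing; it mirrors the kind of example
in that chapter):

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.spark.SparkContext

// read an HBase table as an RDD of (row key, row) pairs
def readHBaseTable(sc: SparkContext, table: String) = {
  val conf = HBaseConfiguration.create()
  conf.set(TableInputFormat.INPUT_TABLE, table)
  sc.newAPIHadoopRDD(conf,
    classOf[TableInputFormat],
    classOf[ImmutableBytesWritable],
    classOf[Result])
}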

Hope that helps.

Greetings,

Juan

2015-06-25 8:25 GMT+02:00 诺铁 :

> hi,
>
> I can't find documentation about the datasource API, or how to implement a custom
> datasource. Any hint is appreciated. Thanks.
>


Re: Spark 1.4: Python API for getting Kafka offsets in direct mode?

2015-06-13 Thread Juan Rodríguez Hortalá
Perfect! I'll start working on it

2015-06-13 2:23 GMT+02:00 Amit Ramesh :

>
> Hi Juan,
>
> I have created a ticket for this:
> https://issues.apache.org/jira/browse/SPARK-8337
>
> Thanks!
> Amit
>
>
> On Fri, Jun 12, 2015 at 3:17 PM, Juan Rodríguez Hortalá <
> juan.rodriguez.hort...@gmail.com> wrote:
>
>> Hi,
>>
>> If you want I would be happy to work in this. I have worked with
>> KafkaUtils.createDirectStream before, in a pull request that wasn't
>> accepted https://github.com/apache/spark/pull/5367. I'm fluent with
>> Python and I'm starting to feel comfortable with Scala, so if someone opens
>> a JIRA I can take it.
>>
>> Greetings,
>>
>> Juan Rodriguez
>>
>>
>> 2015-06-12 15:59 GMT+02:00 Cody Koeninger :
>>
>>> The scala api has 2 ways of calling createDirectStream.  One of them
>>> allows you to pass a message handler that gets full access to the kafka
>>> MessageAndMetadata, including offset.
>>>
>>> I don't know why the python api was developed with only one way to call
>>> createDirectStream, but the first thing I'd look at would be adding that
>>> functionality back in.  If someone wants help creating a patch for that,
>>> just let me know.
>>>
>>> Dealing with offsets on a per-message basis may not be as efficient as
>>> dealing with them on a batch basis using the HasOffsetRanges interface...
>>> but if efficiency was a primary concern, you probably wouldn't be using
>>> Python anyway.
>>>
>>> On Fri, Jun 12, 2015 at 1:05 AM, Saisai Shao 
>>> wrote:
>>>
>>>> Scala KafkaRDD uses a trait to handle this problem, but it is not so
>>>> easy and straightforward in Python, where we need to have a specific API to
>>>> handle this, I'm not sure is there any simple workaround to fix this, maybe
>>>> we should think carefully about it.
>>>>
>>>> 2015-06-12 13:59 GMT+08:00 Amit Ramesh :
>>>>
>>>>>
>>>>> Thanks, Jerry. That's what I suspected based on the code I looked at.
>>>>> Any pointers on what is needed to build in this support would be great.
>>>>> This is critical to the project we are currently working on.
>>>>>
>>>>> Thanks!
>>>>>
>>>>>
>>>>> On Thu, Jun 11, 2015 at 10:54 PM, Saisai Shao 
>>>>> wrote:
>>>>>
>>>>>> OK, I get it, I think currently Python based Kafka direct API do not
>>>>>> provide such equivalence like Scala, maybe we should figure out to add 
>>>>>> this
>>>>>> into Python API also.
>>>>>>
>>>>>> 2015-06-12 13:48 GMT+08:00 Amit Ramesh :
>>>>>>
>>>>>>>
>>>>>>> Hi Jerry,
>>>>>>>
>>>>>>> Take a look at this example:
>>>>>>> https://spark.apache.org/docs/latest/streaming-kafka-integration.html#tab_scala_2
>>>>>>>
>>>>>>> The offsets are needed because as RDDs get generated within spark
>>>>>>> the offsets move further along. With direct Kafka mode the current 
>>>>>>> offsets
>>>>>>> are no more persisted in Zookeeper but rather within Spark itself. If 
>>>>>>> you
>>>>>>> want to be able to use zookeeper based monitoring tools to keep track of
>>>>>>> progress, then this is needed.
>>>>>>>
>>>>>>> In my specific case we need to persist Kafka offsets externally so
>>>>>>> that we can continue from where we left off after a code deployment. In
>>>>>>> other words, we need exactly-once processing guarantees across code
>>>>>>> deployments. Spark does not support any state persistence across
>>>>>>> deployments so this is something we need to handle on our own.
>>>>>>>
>>>>>>> Hope that helps. Let me know if not.
>>>>>>>
>>>>>>> Thanks!
>>>>>>> Amit
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Jun 11, 2015 at 10:02 PM, Saisai Shao <
>>>>>>> sai.sai.s...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> What is your meaning of getting the offsets from the RDD, from my
>>>>>>>> understanding, the offsetRange is a parameter you offered to KafkaRDD, 
>>>>>>>> why
>>>>>>>> do you still want to get the one previous you set into?
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>> Jerry
>>>>>>>>
>>>>>>>> 2015-06-12 12:36 GMT+08:00 Amit Ramesh :
>>>>>>>>
>>>>>>>>>
>>>>>>>>> Congratulations on the release of 1.4!
>>>>>>>>>
>>>>>>>>> I have been trying out the direct Kafka support in python but
>>>>>>>>> haven't been able to figure out how to get the offsets from the RDD. 
>>>>>>>>> Looks
>>>>>>>>> like the documentation is yet to be updated to include Python 
>>>>>>>>> examples (
>>>>>>>>> https://spark.apache.org/docs/latest/streaming-kafka-integration.html).
>>>>>>>>> I am specifically looking for the equivalent of
>>>>>>>>> https://spark.apache.org/docs/latest/streaming-kafka-integration.html#tab_scala_2.
>>>>>>>>> I tried digging through the python code but could not find anything
>>>>>>>>> related. Any pointers would be greatly appreciated.
>>>>>>>>>
>>>>>>>>> Thanks!
>>>>>>>>> Amit
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>


Re: Spark 1.4: Python API for getting Kafka offsets in direct mode?

2015-06-12 Thread Juan Rodríguez Hortalá
Hi,

If you want, I would be happy to work on this. I have worked with
KafkaUtils.createDirectStream before, in a pull request that wasn't
accepted, https://github.com/apache/spark/pull/5367. I'm fluent in Python
and I'm starting to feel comfortable with Scala, so if someone opens a JIRA
I can take it.

Greetings,

Juan Rodriguez


2015-06-12 15:59 GMT+02:00 Cody Koeninger :

> The scala api has 2 ways of calling createDirectStream.  One of them
> allows you to pass a message handler that gets full access to the kafka
> MessageAndMetadata, including offset.
>
> I don't know why the python api was developed with only one way to call
> createDirectStream, but the first thing I'd look at would be adding that
> functionality back in.  If someone wants help creating a patch for that,
> just let me know.
>
> Dealing with offsets on a per-message basis may not be as efficient as
> dealing with them on a batch basis using the HasOffsetRanges interface...
> but if efficiency was a primary concern, you probably wouldn't be using
> Python anyway.
>
> On Fri, Jun 12, 2015 at 1:05 AM, Saisai Shao 
> wrote:
>
>> Scala KafkaRDD uses a trait to handle this problem, but it is not so easy
>> and straightforward in Python, where we need to have a specific API to
>> handle this, I'm not sure is there any simple workaround to fix this, maybe
>> we should think carefully about it.
>>
>> 2015-06-12 13:59 GMT+08:00 Amit Ramesh :
>>
>>>
>>> Thanks, Jerry. That's what I suspected based on the code I looked at.
>>> Any pointers on what is needed to build in this support would be great.
>>> This is critical to the project we are currently working on.
>>>
>>> Thanks!
>>>
>>>
>>> On Thu, Jun 11, 2015 at 10:54 PM, Saisai Shao 
>>> wrote:
>>>
 OK, I get it, I think currently Python based Kafka direct API do not
 provide such equivalence like Scala, maybe we should figure out to add this
 into Python API also.

 2015-06-12 13:48 GMT+08:00 Amit Ramesh :

>
> Hi Jerry,
>
> Take a look at this example:
> https://spark.apache.org/docs/latest/streaming-kafka-integration.html#tab_scala_2
>
> The offsets are needed because as RDDs get generated within spark the
> offsets move further along. With direct Kafka mode the current offsets are
> no more persisted in Zookeeper but rather within Spark itself. If you want
> to be able to use zookeeper based monitoring tools to keep track of
> progress, then this is needed.
>
> In my specific case we need to persist Kafka offsets externally so
> that we can continue from where we left off after a code deployment. In
> other words, we need exactly-once processing guarantees across code
> deployments. Spark does not support any state persistence across
> deployments so this is something we need to handle on our own.
>
> Hope that helps. Let me know if not.
>
> Thanks!
> Amit
>
>
> On Thu, Jun 11, 2015 at 10:02 PM, Saisai Shao 
> wrote:
>
>> Hi,
>>
>> What is your meaning of getting the offsets from the RDD, from my
>> understanding, the offsetRange is a parameter you offered to KafkaRDD, 
>> why
>> do you still want to get the one previous you set into?
>>
>> Thanks
>> Jerry
>>
>> 2015-06-12 12:36 GMT+08:00 Amit Ramesh :
>>
>>>
>>> Congratulations on the release of 1.4!
>>>
>>> I have been trying out the direct Kafka support in python but
>>> haven't been able to figure out how to get the offsets from the RDD. 
>>> Looks
>>> like the documentation is yet to be updated to include Python examples (
>>> https://spark.apache.org/docs/latest/streaming-kafka-integration.html).
>>> I am specifically looking for the equivalent of
>>> https://spark.apache.org/docs/latest/streaming-kafka-integration.html#tab_scala_2.
>>> I tried digging through the python code but could not find anything
>>> related. Any pointers would be greatly appreciated.
>>>
>>> Thanks!
>>> Amit
>>>
>>>
>>
>

>>>
>>
>


Re: Creating topology in spark streaming

2015-05-06 Thread Juan Rodríguez Hortalá
Hi,

You can use the method repartition from DStream (for the Scala API) or
JavaDStream (for the Java API)

def repartition(numPartitions: Int): DStream[T]

Return a new DStream with an increased or decreased level of parallelism.
Each RDD in the returned DStream has exactly numPartitions partitions.
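
For example, here is a rough sketch of wiring the three stages from the
question, raising the level of parallelism only for the expensive middle
stage; the functions are placeholders, not real sentiment analysis:

import org.apache.spark.streaming.dstream.DStream

// `tweets` is assumed to be an already created DStream[String]
def wireTopology(tweets: DStream[String]): Unit = {
  val filtered = tweets.filter(_.contains("#spark"))   // "bolt 1": parse/filter tweets
  val scored = filtered
    .repartition(20)                                    // raise parallelism for the heavy stage
    .map(t => (t, t.length % 3))                        // "bolt 2": stand-in for sentiment analysis
  scored.foreachRDD { rdd =>                            // "bolt 3": write each micro-batch out
    rdd.foreachPartition(_.foreach(println))
  }
}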

I think the post
http://www.michael-noll.com/blog/2014/10/01/kafka-spark-streaming-integration-example-tutorial/
on Spark Streaming integration gives a very interesting review of the
subject, although its Kafka integration part is not up to date with
https://databricks.com/blog/2015/03/30/improvements-to-kafka-integration-of-spark-streaming.html

Hope that helps.

Greetings,

Juan

2015-05-06 10:32 GMT+02:00 anshu shukla :

> But the main problem is how to increase the level of parallelism for any
> particular bolt's logic.
>
> Suppose I want this type of topology:
>
> https://storm.apache.org/documentation/images/topology.png
>
> How can we manage it?
>
> On Wed, May 6, 2015 at 1:36 PM, ayan guha  wrote:
>
>> Every transformation on a dstream will create another dstream. You may
>> want to take a look at foreachrdd? Also, kindly share your code so people
>> can help better
>> On 6 May 2015 17:54, "anshu shukla"  wrote:
>>
>>> Please help  guys, Even  After going through all the examples given i
>>> have not understood how to pass the  D-streams  from one bolt/logic to
>>> other (without writing it on HDFS etc.) just like emit function in storm .
>>> Suppose i have topology with 3  bolts(say)
>>>
>>> *BOLT1(parse the tweets nd emit tweet using given
>>> hashtags)=>Bolt2(Complex logic for sentiment analysis over
>>> tweets)===>BOLT3(submit tweets to the sql database using spark SQL)*
>>>
>>>
>>> Now  since Sentiment analysis will take most of the time ,we have to
>>> increase its level of parallelism for tuning latency. How to increase the
>>> level of parallelism, since the logic of the topology is not clear?
>>>
>>> --
>>> Thanks & Regards,
>>> Anshu Shukla
>>> Indian Institute of Sciences
>>>
>>
>
>
> --
> Thanks & Regards,
> Anshu Shukla
>


Re: RDD split into multiple RDDs

2015-04-29 Thread Juan Rodríguez Hortalá
Hi Daniel,

I understood Sébastien was talking about having a high number of keys; I
guess I was prejudiced by my own problem! :) Anyway, I don't think you need
to use disk or a database to generate an RDD per key, you can use filter,
which I guess would be more efficient because IO is avoided, especially if
the RDD was cached. For example:

// in the spark shell
import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.RDD._
import scala.reflect.ClassTag

// generate a map from key to rdd of values
def groupByKeyToRDDs[K, V](pairRDD: RDD[(K, V)])(implicit kt: ClassTag[K],
    vt: ClassTag[V], ord: Ordering[K]): Map[K, RDD[V]] = {
  val keys = pairRDD.keys.distinct.collect
  (for (k <- keys) yield
    k -> (pairRDD filter(_._1 == k) values)
  ) toMap
}

// simple demo
val xs = sc.parallelize(1 to 1000)
val ixs = xs map(x => (x % 10, x))
val gs = groupByKeyToRDDs(ixs)
gs(1).collect

Just an idea.

Greetings,

Juan Rodriguez



2015-04-29 14:20 GMT+02:00 Daniel Darabos 
:

> Check out http://stackoverflow.com/a/26051042/3318517. It's a nice method
> for saving the RDD into separate files by key in a single pass. Then you
> can read the files into separate RDDs.
>
> On Wed, Apr 29, 2015 at 2:10 PM, Juan Rodríguez Hortalá <
> juan.rodriguez.hort...@gmail.com> wrote:
>
>> Hi Sébastien,
>>
>> I came with a similar problem some time ago, you can see the discussion in
>> the Spark users mailing list at
>>
>> http://markmail.org/message/fudmem4yy63p62ar#query:+page:1+mid:qv4gw6czf6lb6hpq+state:results
>> . My experience was that when you create too many RDDs the Spark scheduler
>> gets stuck, so if you have many keys in the map you are creating you'll
>> probably have problems. On the other hand, the latest example I proposed
>> in
>> that mailing thread was a batch job in which we start from a single RDD of
>> time tagged data, transform the RDD in a list of RDD corresponding to
>> generating windows according to the time tag of the records, and then
>> apply
>> a transformation of RDD to each window RDD, like for example KMeans.run of
>> MLlib. This is very similar to what you propose.
>> So in my humble opinion the approach of generating thousands of RDDs by
>> filtering doesn't work, and a new RDD class should be implemented for
>> this.
>> I have never implemented a custom RDD, but if you want some help I would
>> be
>> happy to join you in this task
>>
>
> Sebastien said nothing about thousands of keys. This is a valid problem
> even if you only have two different keys.
>
> Greetings,
>>
>> Juan
>>
>>
>>
>> 2015-04-29 12:56 GMT+02:00 Sébastien Soubré-Lanabère > >:
>>
>> > Hello,
>> >
>> > I'm facing a problem with custom RDD transformations.
>> >
>> > I would like to transform a RDD[K, V] into a Map[K, RDD[V]], meaning a
>> map
>> > of RDD by key.
>> >
>> > This would be great, for example, in order to process mllib clustering
>> on V
>> > values grouped by K.
>> >
>> > I know I could do it using filter() on my RDD as many times I have keys,
>> > but I'm afraid this would not be efficient (the entire RDD would be read
>> > each time, right ?). Then, I could mapByPartition my RDD before
>> filtering,
>> > but the code is finally huge...
>> >
>> > So, I tried to create a CustomRDD to implement a splitByKey(rdd: RDD[K,
>> > V]): Map[K, RDD[V]] method, which would iterate on the RDD once time
>> only,
>> > but I cannot achieve my development.
>> >
>> > Please, could you tell me first if this is really feasible, and then,
>> could
>> > you give me some pointers ?
>> >
>> > Thank you,
>> > Regards,
>> > Sebastien
>> >
>>
>
>


Re: RDD split into multiple RDDs

2015-04-29 Thread Juan Rodríguez Hortalá
Hi Sébastien,

I came across a similar problem some time ago; you can see the discussion in
the Spark users mailing list at
http://markmail.org/message/fudmem4yy63p62ar#query:+page:1+mid:qv4gw6czf6lb6hpq+state:results
. My experience was that when you create too many RDDs the Spark scheduler
gets stuck, so if you have many keys in the map you are creating you'll
probably have problems. On the other hand, the latest example I proposed in
that mailing thread was a batch job in which we start from a single RDD of
time-tagged data, transform the RDD into a list of RDDs corresponding to
windows generated according to the time tag of the records, and then apply
a transformation to each window RDD, like for example KMeans.run from
MLlib. This is very similar to what you propose.
So in my humble opinion the approach of generating thousands of RDDs by
filtering doesn't work, and a new RDD class should be implemented for this.
I have never implemented a custom RDD, but if you want some help I would be
happy to join you in this task.

Greetings,

Juan



2015-04-29 12:56 GMT+02:00 Sébastien Soubré-Lanabère :

> Hello,
>
> I'm facing a problem with custom RDD transformations.
>
> I would like to transform a RDD[K, V] into a Map[K, RDD[V]], meaning a map
> of RDD by key.
>
> This would be great, for example, in order to process mllib clustering on V
> values grouped by K.
>
> I know I could do it using filter() on my RDD as many times I have keys,
> but I'm afraid this would not be efficient (the entire RDD would be read
> each time, right ?). Then, I could mapByPartition my RDD before filtering,
> but the code is finally huge...
>
> So, I tried to create a CustomRDD to implement a splitByKey(rdd: RDD[K,
> V]): Map[K, RDD[V]] method, which would iterate on the RDD once time only,
> but I cannot achieve my development.
>
> Please, could you tell me first if this is really feasible, and then, could
> you give me some pointers ?
>
> Thank you,
> Regards,
> Sebastien
>