Contributing to managed memory, Tungsten..

2016-03-10 Thread Jan Kotek
Hi,

I would like to help with optimizing Spark memory usage. I have some experience
with off-heap and managed memory; for example, I modified Hazelcast to run with
'-Xmx128M' [1], and XAP from GigaSpaces uses my memory store.

I have already studied the Spark code and read blogs, watched videos, etc., but
I have questions about the current situation, the future direction, and the
best way to contribute. Perhaps a developer could spare 30 minutes for a chat
and assign me some issues to start with.


Thanks,
Jan Kotek
MapDB author


[1] https://github.com/jankotek/mapdb-hz-offheap


Re: Running ALS on comparatively large RDD

2016-03-10 Thread Deepak Gopalakrishnan
1. I'm using about 1 million users against a few thousand products. I
basically have around a million ratings.
2. Spark 1.6 on Amazon EMR

On Fri, Mar 11, 2016 at 12:46 PM, Nick Pentreath wrote:

> Could you provide more details about:
> 1. Data set size (# ratings, # users and # products)
> 2. Spark cluster setup and version
>
> Thanks
>
> On Fri, 11 Mar 2016 at 05:53 Deepak Gopalakrishnan wrote:
>
>> Hello All,
>>
>> I've been running Spark's ALS on a dataset of users and rated items. I
>> first encode my users to integers using an auto-increment function (just
>> like zipWithIndex), and I do the same for my items. I then create an RDD
>> of the ratings and feed it to ALS.
>>
>> My issue is that the ALS algorithm never completes. Attached is a
>> screenshot of the stages window.
>>
>> Any help will be greatly appreciated
>>
>> --
>> Regards,
>> *Deepak Gopalakrishnan*
>> *Mobile*:+918891509774
>> *Skype* : deepakgk87
>> http://myexps.blogspot.com
>>
>>
>
>


-- 
Regards,
*Deepak Gopalakrishnan*
*Mobile*:+918891509774
*Skype* : deepakgk87
http://myexps.blogspot.com
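
For concreteness, a minimal sketch of the workflow described above, assuming a
spark-shell SparkContext and a CSV of (user, item, rating) triples; the input
path, rank, iteration count, and lambda values are placeholders:

import org.apache.spark.mllib.recommendation.{ALS, Rating}

// Assign integer IDs to the string user and item keys with zipWithIndex,
// as in the original post, then train MLlib's ALS on the resulting ratings.
val raw = sc.textFile("hdfs:///path/to/ratings.csv")
  .map(_.split(","))
  .map { case Array(user, item, rating) => (user, item, rating.toDouble) }

val userIds = raw.map(_._1).distinct().zipWithIndex().mapValues(_.toInt)
val itemIds = raw.map(_._2).distinct().zipWithIndex().mapValues(_.toInt)

val ratings = raw.map { case (u, i, r) => (u, (i, r)) }
  .join(userIds)                                   // (u, ((i, r), uid))
  .map { case (_, ((i, r), uid)) => (i, (uid, r)) }
  .join(itemIds)                                   // (i, ((uid, r), iid))
  .map { case (_, ((uid, r), iid)) => Rating(uid, iid, r) }

val model = ALS.train(ratings, 10 /* rank */, 10 /* iterations */, 0.01 /* lambda */)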


Re: Running ALS on comparatively large RDD

2016-03-10 Thread Nick Pentreath
Could you provide more details about:
1. Data set size (# ratings, # users and # products)
2. Spark cluster setup and version

Thanks

On Fri, 11 Mar 2016 at 05:53 Deepak Gopalakrishnan wrote:

> Hello All,
>
> I've been running Spark's ALS on a dataset of users and rated items. I
> first encode my users to integers using an auto-increment function (just
> like zipWithIndex), and I do the same for my items. I then create an RDD
> of the ratings and feed it to ALS.
>
> My issue is that the ALS algorithm never completes. Attached is a
> screenshot of the stages window.
>
> Any help will be greatly appreciated
>
> --
> Regards,
> *Deepak Gopalakrishnan*
> *Mobile*:+918891509774
> *Skype* : deepakgk87
> http://myexps.blogspot.com
>
>


Understanding fault tolerance in shuffle operations

2016-03-10 Thread Matt Cheah
Hi everyone,

I have a question about the shuffle mechanisms in Spark and the fault-tolerance 
I should expect. Suppose I have a simple job with two stages  – something like 
rdd.textFile().mapToPair().reduceByKey().saveAsTextFile().
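
For concreteness, a minimal Scala sketch of such a two-stage job; the paths and
app name are placeholders, and reduceByKey is the shuffle boundary between the
two stages:

import org.apache.spark.{SparkConf, SparkContext}

object TwoStageJobSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("two-stage-sketch"))
    sc.textFile("hdfs:///input/path")        // stage 1: read and map
      .flatMap(_.split("\\s+"))
      .map(word => (word, 1L))
      .reduceByKey(_ + _)                    // shuffle boundary: stage 2 starts here
      .saveAsTextFile("hdfs:///output/path") // stage 2: aggregate and write
    sc.stop()
  }
}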

The questions I have are,

  1.  Suppose I’m not using the external shuffle service. I’m running the job. 
The first stage succeeds. During the second stage, one of the executors is lost 
(for the simplest case, someone uses kill -9 on it and the job itself should 
have no problems completing otherwise). Should I expect the job to be able to 
recover and complete successfully? My understanding is that the lost shuffle 
files from that executor can still be re-computed and the job should be able to 
complete successfully.
  2.  Suppose I’m using the shuffle service. How does this change the result of 
question #1?
  3.  Suppose I’m using the shuffle service, and I’m using standalone mode. The 
first stage succeeds. During the second stage, I kill both the executor and the 
worker that spawned that executor. Now that the shuffle files associated with 
that worker’s shuffle service daemon have been lost, will the job be able to 
recompute the lost shuffle data? This is the scenario I’m running into most, 
where my tasks fail because they try to reach the shuffle service instead of 
trying to recompute the lost shuffle files.
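
For reference, a minimal sketch of how the external shuffle service scenario in
questions 2 and 3 is typically set up; the app name and master URL are
placeholders, and in standalone mode the same flag also needs to be set on the
Workers so that each Worker hosts the shuffle service:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("shuffle-service-example")
  .setMaster("spark://master-host:7077")
  .set("spark.shuffle.service.enabled", "true")    // fetch shuffle blocks via the external shuffle service
  .set("spark.dynamicAllocation.enabled", "true")  // optional; a common reason for running the shuffle service

val sc = new SparkContext(conf)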

Thanks,

-Matt Cheah


[ANNOUNCE] Announcing Spark 1.6.1

2016-03-10 Thread Michael Armbrust
Spark 1.6.1 is a maintenance release containing stability fixes. This
release is based on the branch-1.6 maintenance branch of Spark. We
*strongly recommend* that all 1.6.0 users upgrade to this release.

Notable fixes include:
 - Workaround for OOM when writing large partitioned tables (SPARK-12546)
 - Several fixes to the experimental Dataset API (SPARK-12478, SPARK-12696,
   SPARK-13101, SPARK-12932)

The full list of bug fixes is here: http://s.apache.org/spark-1.6.1
Release notes: http://spark.apache.org/releases/spark-release-1-6-1.html

(Note: it can take a few hours for everything to propagate, so you might get
a 404 on some download links, but everything should be in Maven Central
already. If you see any issues with the release notes or webpage, *please
contact me directly, off-list*.)


Re: DynamicPartitionKafkaRDD - 1:n mapping between kafka and RDD partition

2016-03-10 Thread Cody Koeninger
The central problem with doing anything like this is that you break one of
the basic guarantees of Kafka, which is in-order processing on a
per-topic-partition basis.

As far as PRs go, because of the new consumer interface for Kafka 0.9 and
0.10, there's a lot of potential change already underway.

See

https://issues.apache.org/jira/browse/SPARK-12177

On Thu, Mar 10, 2016 at 1:59 PM, Renyi Xiong wrote:
> Hi TD,
>
> Thanks a lot for offering to look at our PR (if we file one) at the
> conference in NYC.
>
> As we discussed briefly, to address the issues of unbalanced and
> under-distributed Kafka partitions when developing Spark Streaming
> applications in Mobius (C# for Spark), we're trying the option of
> repartitioning within DirectKafkaInputDStream instead of the
> DStream.repartition API, which introduces extra network cost and doesn't
> really solve the root cause.
>
> However, instead of filing a JIRA with a PR directly, we decided to create a
> customized Kafka RDD / DStream (to start with, and contribute back later if
> successful) - DynamicPartitionKafkaRDD and DynamicPartitionKafkaInputDStream -
> using an inheritance model, and to expose a new API,
> KafkaUtils.CreateDirectStreamWithRepartition, with one more parameter:
> numPartitions (a hint for the number of RDD partitions to create).
>
> It would be great if you could take a look at the code and share your comments:
>
> https://github.com/Microsoft/Mobius/tree/master/scala/src/main/org/apache/spark/streaming/api/kafka
>
> The major relevant change is in DynamicPartitionKafkaRDD.getPartitions, where
> an average RDD partition size is calculated first (the total size of the
> topic divided by numPartitions) and then used to split partitions (the final
> number of RDD partitions will be greater than or equal to numPartitions).
>
> There is a concern that Kafka partition[i] no longer maps to task[i], which
> might break existing applications. Here's our thinking:
>
> a. OffsetRanges in the original implementation may span multiple topics,
> meaning 'partition i maps to task i' is generally a false statement.
>
> b. Even if only one topic is involved, the partition sequence in offsetRanges
> comes from the Kafka topic metadata response, which doesn't necessarily
> guarantee the sequence; even if it does, applications should not take that
> dependency.
>
> c. Topic partition splitting happens only when configured.
>
>
> There are some other, more complicated changes related to fault tolerance
> which are irrelevant here (but you're more than welcome to comment on them
> too); they were introduced to unblock the scenarios we're experiencing on a
> daily basis:
>
> 1. Temporarily redirect Kafka reads to the C# worker by passing metadata
> instead of actual Kafka messages to it; in the C# worker, a C# version of the
> Kafka client is used, which enables much easier debugging.
>
> 2. Bypass metadata request exceptions on the driver side and let the next
> batch retry.
>
> 3. Bypass some read errors on the worker side.
>
>
> Note that all of the above is at a very early stage; your comments will be
> much valued and appreciated.
>
>
> Thanks a lot,
>
> Reny.




DynamicPartitionKafkaRDD - 1:n mapping between kafka and RDD partition

2016-03-10 Thread Renyi Xiong
Hi TD,

Thanks a lot for offering to look at our PR (if we file one) at the
conference in NYC.

As we discussed briefly, to address the issues of unbalanced and
under-distributed Kafka partitions when developing Spark Streaming
applications in Mobius (C# for Spark), we're trying the option of
repartitioning within DirectKafkaInputDStream instead of the
DStream.repartition API, which introduces extra network cost and doesn't
really solve the root cause.

However, instead of filing a JIRA with a PR directly, we decided to create a
customized Kafka RDD / DStream (to start with, and contribute back later if
successful) - DynamicPartitionKafkaRDD and DynamicPartitionKafkaInputDStream -
using an inheritance model, and to expose a new API,
KafkaUtils.CreateDirectStreamWithRepartition, with one more parameter:
numPartitions (a hint for the number of RDD partitions to create).

It would be great if you could take a look at the code and share your comments:

https://github.com/Microsoft/Mobius/tree/master/scala/src/main/org/apache/spark/streaming/api/kafka

The major relevant change is in DynamicPartitionKafkaRDD.getPartitions, where
an average RDD partition size is calculated first (the total size of the topic
divided by numPartitions) and then used to split partitions (the final number
of RDD partitions will be greater than or equal to numPartitions).
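
For illustration, a minimal sketch of that splitting idea (the OffsetSlice case
class and splitOffsetRange helper are hypothetical names, not the actual Mobius
implementation):

// Split one Kafka topic partition's offset range into roughly avgSize-sized
// slices, so a single large topic partition can back several RDD partitions.
case class OffsetSlice(topic: String, partition: Int, fromOffset: Long, untilOffset: Long)

def splitOffsetRange(topic: String, partition: Int,
                     fromOffset: Long, untilOffset: Long,
                     avgSize: Long): Seq[OffsetSlice] = {
  val total = untilOffset - fromOffset
  val numSlices = math.max(1L, math.ceil(total.toDouble / avgSize).toLong).toInt
  (0 until numSlices).map { i =>
    val start = fromOffset + i * avgSize
    val end = math.min(start + avgSize, untilOffset)
    OffsetSlice(topic, partition, start, end)
  }
}

// e.g. splitOffsetRange("events", 0, 0L, 10L, avgSize = 3L)
// => slices [0,3), [3,6), [6,9), [9,10)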

There is a concern that Kafka partition[i] no longer maps to task[i], which
might break existing applications. Here's our thinking:

a. OffsetRanges in the original implementation may span multiple topics,
meaning 'partition i maps to task i' is generally a false statement.

b. Even if only one topic is involved, the partition sequence in offsetRanges
comes from the Kafka topic metadata response, which doesn't necessarily
guarantee the sequence; even if it does, applications should not take that
dependency.

c. Topic partition splitting happens only when configured.


There are some other, more complicated changes related to fault tolerance
which are irrelevant here (but you're more than welcome to comment on them
too); they were introduced to unblock the scenarios we're experiencing on a
daily basis:

1. Temporarily redirect Kafka reads to the C# worker by passing metadata
instead of actual Kafka messages to it; in the C# worker, a C# version of the
Kafka client is used, which enables much easier debugging.

2. Bypass metadata request exceptions on the driver side and let the next
batch retry.

3. Bypass some read errors on the worker side.


Note that all of the above is at a very early stage; your comments will be
much valued and appreciated.


Thanks a lot,

Reny.


Re: submissionTime vs batchTime, DirectKafka

2016-03-10 Thread Sachin Aggarwal
Hi,

Can this be considered a lag in the processing of events? Should we report
this as a delay?

On Thu, Mar 10, 2016 at 10:51 AM, Mario Ds Briggs wrote:

> Look at org.apache.spark.streaming.scheduler.JobGenerator.
>
> It has a RecurringTimer (timer) that simply posts 'JobGenerate'
> events to an EventLoop at each batch interval.
>
> This EventLoop's thread then picks up these events and uses the
> streamingContext.graph to generate a Job (the InputDStream's compute
> method). batchInfo.submissionTime is the time recorded after this
> generateJob completes. The Job is then sent to
> org.apache.spark.streaming.scheduler.JobScheduler, which has a thread
> executor pool to execute the Job.
>
> JobGenerate events are not the only events that get posted to the
> JobGenerator.eventLoop. Other events, like DoCheckpoint,
> ClearCheckpointData, and ClearMetadata, are also posted, and all of these
> events are serviced by the EventLoop's single thread. So, for instance, if
> DoCheckpoint, ClearCheckpointData, and ClearMetadata events are queued
> before your nth JobGenerate event, there will be a time difference
> between the batchTime and submissionTime for that nth batch.
>
>
> thanks
> Mario
>
>
>
>
>
>
> On Thu, Mar 10, 2016 at 10:29 AM, Sachin Aggarwal <different.sac...@gmail.com> wrote:
>
>    Hi Cody,
>
>    Let me try once again to explain with an example.
>
>    In Spark's BatchInfo class, "scheduling delay" is defined as
>
>    def schedulingDelay: Option[Long] = processingStartTime.map(_ - submissionTime)
>
>    I am dumping the BatchInfo object in my LatencyListener, which
>    extends StreamingListener.
>    batchTime = 1457424695400 ms
>    submissionTime = 1457425630780 ms
>    difference = 935380 ms
>
>    Can this be considered a lag in the processing of events? What is a
>    possible explanation for this lag?
>
>    On Thu, Mar 10, 2016 at 12:22 AM, Cody Koeninger <c...@koeninger.org> wrote:
>I'm really not sure what you're asking.
>
>    On Wed, Mar 9, 2016 at 12:43 PM, Sachin Aggarwal <different.sac...@gmail.com> wrote:
>    > Where are we capturing this delay?
>    > I am aware of the scheduling delay, which is defined as processing
>    > time minus submission time, not the batch creation time.
>    >
>    > On Wed, Mar 9, 2016 at 10:46 PM, Cody Koeninger <c...@koeninger.org> wrote:
>    >>>
>    >>> Spark Streaming by default will not start processing a batch until the
>    >>> current batch is finished. So if your processing time is larger than
>    >>> your batch time, delays will build up.
>    >>>
>    >>> On Wed, Mar 9, 2016 at 11:09 AM, Sachin Aggarwal <different.sac...@gmail.com> wrote:
>    >>> > Hi All,
>    >>> >
>    >>> > We have batchTime and submissionTime:
>    >>> >
>    >>> > @param batchTime      Time of the batch
>    >>> > @param submissionTime Clock time of when the jobs of this batch were
>    >>> >                       submitted to the streaming scheduler queue
>    >>> >
>    >>> > 1) We are seeing a difference between batchTime and submissionTime,
>    >>> > even in minutes, for small batches (300 ms) with direct Kafka. We see
>    >>> > this only when the processing time is more than the batch interval.
>    >>> > How can we explain this delay?
>    >>> >
>    >>> > 2) In one case the batch processing time is more than the batch
>    >>> > interval. Will Spark fetch the next batch of data from Kafka in
>    >>> > parallel with processing the current batch, or will it wait for the
>    >>> > current batch to finish first?
>    >>> >
>    >>> > I would be thankful if you could give me some pointers.
>    >>> >
>    >>> > Thanks!
>    >>> > --
>    >>> >
>    >>> > Thanks & Regards
>    >>> >
>    >>> > Sachin Aggarwal
>    >>> > 7760502772
>>
>>
>>
>>
>> --
>>
>> Thanks & Regards
>>
>> Sachin Aggarwal
>> 7760502772
>
>
>
>--
>
>Thanks & Regards
>
>Sachin Aggarwal
>*7760502772* <7760502772>
>
>
>
>
> --
>
> Thanks & Regards
>
> Sachin Aggarwal
> 7760502772
>
>
>


-- 

Thanks & Regards

Sachin Aggarwal
7760502772
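
For illustration, a minimal sketch of a listener that reports the gap discussed
above (the class name is hypothetical; batchTime, submissionTime, and
schedulingDelay are the BatchInfo fields quoted in the thread):

import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

// Logs, per completed batch, the gap between batch time and submission time
// alongside the built-in scheduling delay (processing start - submission).
class BatchLagListener extends StreamingListener {
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val info = batchCompleted.batchInfo
    val generationLagMs = info.submissionTime - info.batchTime.milliseconds
    println(s"batch=${info.batchTime} generationLagMs=$generationLagMs " +
      s"schedulingDelayMs=${info.schedulingDelay.getOrElse(-1L)}")
  }
}

// Register it with: ssc.addStreamingListener(new BatchLagListener)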


Re: dataframe.groupby.agg vs sql("select from groupby)")

2016-03-10 Thread Reynold Xin
They should be identical. Can you paste the detailed explain output?
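
For illustration, a minimal sketch of how to capture that output for the two
formulations being compared (the table and column names t, k, v are
hypothetical, and sqlContext is assumed from a spark-shell session):

import org.apache.spark.sql.functions.sum

// Compare the plans of the DataFrame-API and pure-SQL formulations.
val viaApi = sqlContext.sql("SELECT * FROM t").groupBy("k").agg(sum("v"))
val viaSql = sqlContext.sql("SELECT k, SUM(v) AS total FROM t GROUP BY k")

viaApi.explain(true)  // prints the parsed, analyzed, optimized, and physical plans
viaSql.explain(true)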

On Thursday, March 10, 2016, FangFang Chen wrote:

> Hi,
> Based on my testing, the memory cost is very different for:
> 1. sql("select * from ...").groupBy(...).agg(...)
> 2. sql("select ... from ... group by ...")
>
> For a table partition larger than 500 GB, #2 runs fine, while an
> out-of-memory error happens in #1. I am using the same Spark configuration
> for both. Could somebody tell me why this happens?
>
> Sent from NetEase Mail Master
>
>
>


dataframe.groupby.agg vs sql("select from groupby)")

2016-03-10 Thread FangFang Chen
Hi,
Based on my testing, the memory cost is very different for:
1. sql("select * from ...").groupBy(...).agg(...)
2. sql("select ... from ... group by ...")


For a table partition larger than 500 GB, #2 runs fine, while an out-of-memory
error happens in #1. I am using the same Spark configuration for both.
Could somebody tell me why this happens?


Sent from NetEase Mail Master