Re: spark using two different versions of netty?

2016-10-10 Thread Paweł Szulc
Yeah, I should be more precise. Those are two direct dependencies.
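In case someone needs to work around the conflict downstream in the meantime, here is a minimal sketch of excluding the old artifact (sbt assumed; the module and version below are just examples, not a statement about where Spark pulls it in):

  // hypothetical build.sbt fragment
  libraryDependencies += ("org.apache.spark" %% "spark-streaming-flume" % "1.6.2")
    .exclude("io.netty", "netty")  // io.netty:netty is the 3.x line; netty-all is the 4.x line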

On Mon, Oct 10, 2016 at 1:15 PM, Sean Owen <so...@cloudera.com> wrote:

> Usually this sort of thing happens because the two versions are in
> different namespaces in different major versions and both are needed. That
> is true of Netty: http://netty.io/wiki/new-and-noteworthy-in-4.0.html
> However, I see that Spark declares a direct dependency on both, when it
> does not use 3.x directly (and should not). The exception is in the Flume
> module, but that could be handled more narrowly. I will look into fixing
> this if applicable.
>
> On Mon, Oct 10, 2016 at 11:56 AM Paweł Szulc <paul.sz...@gmail.com> wrote:
>
>> Hi,
>>
>> quick question, why is Spark using two different versions of netty?:
>>
>>
>>- io.netty:netty-all:4.0.29.Final:jar
>>- io.netty:netty:3.8.0.Final:jar
>>
>>
>> ?
>>
>> --
>> Regards,
>> Paul Szulc
>>
>> twitter: @rabbitonweb
>> blog: www.rabbitonweb.com
>>
>


-- 
Regards,
Paul Szulc

twitter: @rabbitonweb
blog: www.rabbitonweb.com


Re: Apache Spark Slack

2016-05-16 Thread Paweł Szulc
Just realized that people have to be invited to this thing. You see,
that's why Gitter is just simpler.

I will try to figure it out ASAP
On 16 May 2016 at 15:40, "Paweł Szulc" <paul.sz...@gmail.com> wrote:

> I've just created this https://apache-spark.slack.com for ad-hoc
> communications within the community.
>
> Everybody's welcome!
>
> --
> Regards,
> Paul Szulc
>
> twitter: @rabbitonweb
> blog: www.rabbitonweb.com
>


Apache Spark Slack

2016-05-16 Thread Paweł Szulc
I've just created this https://apache-spark.slack.com for ad-hoc
communications within the community.

Everybody's welcome!

-- 
Regards,
Paul Szulc

twitter: @rabbitonweb
blog: www.rabbitonweb.com


Re: apache spark on gitter?

2016-05-16 Thread Paweł Szulc
I've just created https://apache-spark.slack.com

On Thu, May 12, 2016 at 9:28 AM, Paweł Szulc <paul.sz...@gmail.com> wrote:

> Hi,
>
> well, I guess the advantage of Gitter over a mailing list is the same as with
> IRC. It's not actually a replacement, because the mailing list is also important.
> But it is a lot easier to build a community around a tool with the ad-hoc ability
> to connect with each other.
>
> I have Gitter running constantly; I visit my favorite OSS projects on
> it from time to time to read what has recently happened. It allows me to
> stay in touch with the project and help fellow developers with the problems
> they have.
> One might argue that you can achieve the same with a mailing list; well, it's
> hard for me to put this into words. A mailing list is more of an async
> nature (which is good!), but sometimes you need a more "real-time"
> experience. You know, engaging in the conversation in the given moment, not
> a conversation that might last a few days :)
>
> TL;DR: It is not a replacement, it's a supplement to build the community
> around OSS. Worth having for real-time conversations.
>
> On Wed, May 11, 2016 at 10:24 PM, Xinh Huynh <xinh.hu...@gmail.com> wrote:
>
>> Hi Pawel,
>>
>> I'd like to hear more about your idea. Could you explain more why you
>> would like to have a gitter channel? What are the advantages over a mailing
>> list (like this one)? Have you had good experiences using gitter on other
>> open source projects?
>>
>> Xinh
>>
>> On Wed, May 11, 2016 at 11:10 AM, Sean Owen <so...@cloudera.com> wrote:
>>
>>> I don't know of a gitter channel and I don't use it myself, FWIW. I
>>> think anyone's welcome to start one.
>>>
>>> I hesitate to recommend this, simply because it's preferable to have
>>> one place for discussion rather than split it over several, and, we
>>> have to keep the @spark.apache.org mailing lists as the "forums of
>>> records" for project discussions.
>>>
>>> If something like gitter doesn't attract any chat, then it doesn't add
>>> any value. If it does though, then suddenly someone needs to subscribe
>>> to user@ and gitter to follow all of the conversations.
>>>
>>> I think there is a bit of a scalability problem on the user@ list at
>>> the moment, just because it covers all of Spark. But adding a
>>> different all-Spark channel doesn't help that.
>>>
>>> Anyway maybe that's "why"
>>>
>>>
>>> On Wed, May 11, 2016 at 6:26 PM, Paweł Szulc <paul.sz...@gmail.com>
>>> wrote:
>>> > no answer, but maybe one more time, a gitter channel for spark users
>>> would
>>> > be a good idea!
>>> >
>>> > On Mon, May 9, 2016 at 1:45 PM, Paweł Szulc <paul.sz...@gmail.com>
>>> wrote:
>>> >>
>>> >> Hi,
>>> >>
>>> >> I was wondering - why Spark does not have a gitter channel?
>>> >>
>>> >> --
>>> >> Regards,
>>> >> Paul Szulc
>>> >>
>>> >> twitter: @rabbitonweb
>>> >> blog: www.rabbitonweb.com
>>> >
>>> >
>>> >
>>> >
>>> > --
>>> > Regards,
>>> > Paul Szulc
>>> >
>>> > twitter: @rabbitonweb
>>> > blog: www.rabbitonweb.com
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>
>
> --
> Regards,
> Paul Szulc
>
> twitter: @rabbitonweb
> blog: www.rabbitonweb.com
>



-- 
Regards,
Paul Szulc

twitter: @rabbitonweb
blog: www.rabbitonweb.com


Re: apache spark on gitter?

2016-05-12 Thread Paweł Szulc
Hi,

well, I guess the advantage of Gitter over a mailing list is the same as with
IRC. It's not actually a replacement, because the mailing list is also important.
But it is a lot easier to build a community around a tool with the ad-hoc ability
to connect with each other.

I have Gitter running constantly; I visit my favorite OSS projects on it
from time to time to read what has recently happened. It allows me to stay
in touch with the project and help fellow developers with the problems they
have.
One might argue that you can achieve the same with a mailing list; well, it's
hard for me to put this into words. A mailing list is more of an async
nature (which is good!), but sometimes you need a more "real-time"
experience. You know, engaging in the conversation in the given moment, not
a conversation that might last a few days :)

TL;DR: It is not a replacement, it's a supplement to build the community
around OSS. Worth having for real-time conversations.

On Wed, May 11, 2016 at 10:24 PM, Xinh Huynh <xinh.hu...@gmail.com> wrote:

> Hi Pawel,
>
> I'd like to hear more about your idea. Could you explain more why you
> would like to have a gitter channel? What are the advantages over a mailing
> list (like this one)? Have you had good experiences using gitter on other
> open source projects?
>
> Xinh
>
> On Wed, May 11, 2016 at 11:10 AM, Sean Owen <so...@cloudera.com> wrote:
>
>> I don't know of a gitter channel and I don't use it myself, FWIW. I
>> think anyone's welcome to start one.
>>
>> I hesitate to recommend this, simply because it's preferable to have
>> one place for discussion rather than split it over several, and, we
>> have to keep the @spark.apache.org mailing lists as the "forums of
>> records" for project discussions.
>>
>> If something like gitter doesn't attract any chat, then it doesn't add
>> any value. If it does though, then suddenly someone needs to subscribe
>> to user@ and gitter to follow all of the conversations.
>>
>> I think there is a bit of a scalability problem on the user@ list at
>> the moment, just because it covers all of Spark. But adding a
>> different all-Spark channel doesn't help that.
>>
>> Anyway maybe that's "why"
>>
>>
>> On Wed, May 11, 2016 at 6:26 PM, Paweł Szulc <paul.sz...@gmail.com>
>> wrote:
>> > no answer, but maybe one more time, a gitter channel for spark users
>> would
>> > be a good idea!
>> >
>> > On Mon, May 9, 2016 at 1:45 PM, Paweł Szulc <paul.sz...@gmail.com>
>> wrote:
>> >>
>> >> Hi,
>> >>
>> >> I was wondering - why Spark does not have a gitter channel?
>> >>
>> >> --
>> >> Regards,
>> >> Paul Szulc
>> >>
>> >> twitter: @rabbitonweb
>> >> blog: www.rabbitonweb.com
>> >
>> >
>> >
>> >
>> > --
>> > Regards,
>> > Paul Szulc
>> >
>> > twitter: @rabbitonweb
>> > blog: www.rabbitonweb.com
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


-- 
Regards,
Paul Szulc

twitter: @rabbitonweb
blog: www.rabbitonweb.com


Re: apache spark on gitter?

2016-05-11 Thread Paweł Szulc
no answer, but maybe one more time, a gitter channel for spark users would
be a good idea!

On Mon, May 9, 2016 at 1:45 PM, Paweł Szulc <paul.sz...@gmail.com> wrote:

> Hi,
>
> I was wondering - why Spark does not have a gitter channel?
>
> --
> Regards,
> Paul Szulc
>
> twitter: @rabbitonweb
> blog: www.rabbitonweb.com
>



-- 
Regards,
Paul Szulc

twitter: @rabbitonweb
blog: www.rabbitonweb.com


apache spark on gitter?

2016-05-09 Thread Paweł Szulc
Hi,

I was wondering - why Spark does not have a gitter channel?

-- 
Regards,
Paul Szulc

twitter: @rabbitonweb
blog: www.rabbitonweb.com


Re: mapValues Transformation (JavaPairRDD)

2015-12-15 Thread Paweł Szulc
Hard to imagine. Can you share a code sample?
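For reference, here is a minimal sketch (plain Scala RDD API rather than JavaPairRDD) of what I would expect, since mapValues leaves the keys untouched:

  val pairs  = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
  val before = pairs.keys.distinct().count()                     // 2 distinct keys
  val after  = pairs.mapValues(_ * 10).keys.distinct().count()   // still 2: keys are unchanged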

On Tue, Dec 15, 2015 at 8:06 AM, Sushrut Ikhar 
wrote:

> Hi,
> I am finding it difficult to understand the following problem:
> I count the number of records before and after applying the mapValues
> transformation to a JavaPairRDD. As expected, the number of records was the
> same before and after.
>
> Now, I counted the number of distinct keys before and after applying the
> mapValues transformation to the same JavaPairRDD. However, I get a lower
> count after applying the transformation. I expected that mapValues would not
> change the keys. Then why am I getting fewer distinct keys? Note that
> the total records are the same; only the distinct keys have dropped.
>
> using spark-1.4.1.
>
> Thanks in advance.
>
> Regards,
>
> Sushrut Ikhar
> https://about.me/sushrutikhar
> 
>
>



-- 
Regards,
Paul Szulc

twitter: @rabbitonweb
blog: www.rabbitonweb.com


Re: How Does aggregate work

2015-03-23 Thread Paweł Szulc
It is actually the number of cores. If your processor has hyperthreading, then
it will be more (the number of logical processors your OS sees).
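A quick way to check what your setup actually uses (a sketch; assumes local mode and an existing SparkContext sc):

  println(sc.defaultParallelism)        // in local mode this is the number of cores the JVM sees
  val rdd = sc.parallelize(1 to 100)    // no explicit numSlices, so defaultParallelism is used
  println(rdd.partitions.length)        // same number, unless you pass numSlices explicitly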

On Sun, Mar 22, 2015 at 4:51 PM, Ted Yu yuzhih...@gmail.com wrote:

 I assume spark.default.parallelism is 4 in the VM Ashish was using.

 Cheers



Re: Problem getting program to run on 15TB input

2015-02-28 Thread Paweł Szulc
I would first check whether there is any possibility that, after doing
groupByKey, one of the groups does not fit in one of the executors' memory.

To back up my theory, instead of doing groupByKey + map, try reduceByKey +
mapValues, as in the sketch below.
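Roughly like this (a toy sketch with a sum standing in for the real per-group computation):

  // groupByKey ships every value of a key to one place and keeps the whole group in memory:
  //   pairs.groupByKey().map { case (k, vs) => (k, vs.sum) }
  // reduceByKey gives the same result but combines values map-side first:
  val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
  val sums = pairs
    .mapValues(v => v.toLong)          // mapValues keeps the keys (and partitioning) intact
    .reduceByKey((a, b) => a + b)      // partial sums computed per partition, then merged after the shuffle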

Let me know if that helped.

Pawel Szulc
http://rabbitonweb.com

On Sat, Feb 28, 2015 at 6:22 PM, Arun Luthra arun.lut...@gmail.com wrote:

 So, actually I am removing the persist for now, because there is
 significant filtering that happens after calling textFile()... but I will
 keep that option in mind.

 I just tried a few different combinations of number of executors, executor
 memory, and more importantly, number of tasks... *all three times it
 failed when approximately 75.1% of the tasks were completed (no matter how
 many tasks resulted from repartitioning the data in textfile(..., N))*.
 Surely this is a strong clue to something?



 On Fri, Feb 27, 2015 at 1:07 PM, Burak Yavuz brk...@gmail.com wrote:

 Hi,

 Not sure if it can help, but `StorageLevel.MEMORY_AND_DISK_SER`
 generates many small objects that lead to very long GC time, causing the
 executor lost, heartbeat not received, and GC overhead limit exceeded
 messages.
 Could you try using `StorageLevel.MEMORY_AND_DISK` instead? You can also
 try `OFF_HEAP` (and use Tachyon).

 Burak

 On Fri, Feb 27, 2015 at 11:39 AM, Arun Luthra arun.lut...@gmail.com
 wrote:

 My program in pseudocode looks like this:

 val conf = new SparkConf().setAppName("Test")
   .set("spark.storage.memoryFraction", "0.2") // default 0.6
   .set("spark.shuffle.memoryFraction", "0.12") // default 0.2
   .set("spark.shuffle.manager", "SORT") // preferred setting for optimized joins
   .set("spark.shuffle.consolidateFiles", "true") // helpful for too many files open
   .set("spark.mesos.coarse", "true") // helpful for MapOutputTracker errors?
   .set("spark.akka.frameSize", "500") // helpful when using consolidateFiles=true
   .set("spark.akka.askTimeout", "30")
   .set("spark.shuffle.compress", "false") // http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
   .set("spark.file.transferTo", "false") // http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
   .set("spark.core.connection.ack.wait.timeout", "600") // http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
   .set("spark.speculation", "true")
   .set("spark.worker.timeout", "600") // http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html
   .set("spark.akka.timeout", "300") // http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html
   .set("spark.storage.blockManagerSlaveTimeoutMs", "12")
   .set("spark.driver.maxResultSize", "2048") // in response to error: Total size of serialized results of 39901 tasks (1024.0 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
   .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
   .set("spark.kryo.registrator", "com.att.bdcoe.cip.ooh.MyRegistrator")
   .set("spark.kryo.registrationRequired", "true")

 val rdd1 = sc.textFile(file1).persist(StorageLevel.MEMORY_AND_DISK_SER).map(_.split("\\|", -1)...filter(...)

 val rdd2 = sc.textFile(file2).persist(StorageLevel.MEMORY_AND_DISK_SER).map(_.split("\\|", -1)...filter(...)


 rdd2.union(rdd1).map(...).filter(...).groupByKey().map(...).flatMap(...).saveAsTextFile()


 I run the code with:
   --num-executors 500 \
   --driver-memory 20g \
   --executor-memory 20g \
   --executor-cores 32 \


 I'm using kryo serialization on everything, including broadcast
 variables.

 Spark creates 145k tasks, and the first stage includes everything before
 groupByKey(). It fails before getting to groupByKey. I have tried doubling
 and tripling the number of partitions when calling textFile, with no
 success.

 Very similar code (trivial changes, to accomodate different input)
 worked on a smaller input (~8TB)... Not that it was easy to get that
 working.



 Errors vary, here is what I am getting right now:

 ERROR SendingConnection: Exception while reading SendingConnection
 ... java.nio.channels.ClosedChannelException
 (^ guessing that is symptom of something else)

 WARN BlockManagerMasterActor: Removing BlockManager
 BlockManagerId(...) with no recent heart beats: 120030ms exceeds 12ms
 (^ guessing that is symptom of something else)

 ERROR ActorSystemImpl: Uncaught fatal error from thread (...) shutting
 down ActorSystem [sparkDriver]
 *java.lang.OutOfMemoryError: GC overhead limit exceeded*



 Other times I will get messages about executor lost... about 1 message
 per second, after ~~50k tasks complete, until there are almost no executors
 left and progress slows to nothing.

 I ran with verbose GC info; I do see failing yarn containers that have
 multiple (like 30) Full GC messages but I don't know how to interpret if
 that is the problem. Typical Full GC time taken seems ok: [Times:
 user=23.30 sys=0.06, real=1.94 secs]




Re: Problem getting program to run on 15TB input

2015-02-28 Thread Paweł Szulc
But groupByKey will repartition according to the number of keys, as I understand
how it works. How do you know that you haven't reached the groupByKey
phase? Are you using a profiler, or do you base that assumption only on logs?

On Sat, Feb 28, 2015 at 8:12 PM, Arun Luthra arun.lut...@gmail.com wrote:

 A correction to my first post:

 There is also a repartition right before groupByKey to help avoid
 too-many-open-files error:


 rdd2.union(rdd1).map(...).filter(...).repartition(15000).groupByKey().map(...).flatMap(...).saveAsTextFile()

 On Sat, Feb 28, 2015 at 11:10 AM, Arun Luthra arun.lut...@gmail.com
 wrote:

 The job fails before getting to groupByKey.

 I see a lot of timeout errors in the yarn logs, like:

 15/02/28 12:47:16 WARN util.AkkaUtils: Error sending message in 1 attempts
 akka.pattern.AskTimeoutException: Timed out

 and

 15/02/28 12:47:49 WARN util.AkkaUtils: Error sending message in 2 attempts
 java.util.concurrent.TimeoutException: Futures timed out after [30
 seconds]

 and some of these are followed by:

 15/02/28 12:48:02 ERROR executor.CoarseGrainedExecutorBackend: Driver
 Disassociated [akka.tcp://sparkExecutor@...] - [akka.tcp://sparkDriver@...]
 disassociated! Shutting down.
 15/02/28 12:48:02 ERROR executor.Executor: Exception in task 421027.0 in
 stage 1.0 (TID 336601)
 java.io.FileNotFoundException:
 /hadoop/yarn/local//spark-local-20150228123450-3a71/36/shuffle_0_421027_0
 (No such file or directory)




 On Sat, Feb 28, 2015 at 9:33 AM, Paweł Szulc paul.sz...@gmail.com
 wrote:

 I would first check whether there is any possibility that, after doing
 groupByKey, one of the groups does not fit in one of the executors' memory.

 To back up my theory, instead of doing groupByKey + map, try reduceByKey
 + mapValues.

 Let me know if that helped.

 Pawel Szulc
 http://rabbitonweb.com

 On Sat, Feb 28, 2015 at 6:22 PM, Arun Luthra arun.lut...@gmail.com wrote:

 So, actually I am removing the persist for now, because there is
 significant filtering that happens after calling textFile()... but I will
 keep that option in mind.

 I just tried a few different combinations of number of executors,
 executor memory, and more importantly, number of tasks... *all three
 times it failed when approximately 75.1% of the tasks were completed (no
 matter how many tasks resulted from repartitioning the data in
 textfile(..., N))*. Surely this is a strong clue to something?



 On Fri, Feb 27, 2015 at 1:07 PM, Burak Yavuz brk...@gmail.com wrote:

 Hi,

 Not sure if it can help, but `StorageLevel.MEMORY_AND_DISK_SER`
 generates many small objects that lead to very long GC time, causing the
 executor lost, heartbeat not received, and GC overhead limit exceeded
 messages.
 Could you try using `StorageLevel.MEMORY_AND_DISK` instead? You can
 also try `OFF_HEAP` (and use Tachyon).

 Burak

 On Fri, Feb 27, 2015 at 11:39 AM, Arun Luthra arun.lut...@gmail.com
 wrote:

 My program in pseudocode looks like this:

  val conf = new SparkConf().setAppName("Test")
    .set("spark.storage.memoryFraction", "0.2") // default 0.6
    .set("spark.shuffle.memoryFraction", "0.12") // default 0.2
    .set("spark.shuffle.manager", "SORT") // preferred setting for optimized joins
    .set("spark.shuffle.consolidateFiles", "true") // helpful for too many files open
    .set("spark.mesos.coarse", "true") // helpful for MapOutputTracker errors?
    .set("spark.akka.frameSize", "500") // helpful when using consolidateFiles=true
    .set("spark.akka.askTimeout", "30")
    .set("spark.shuffle.compress", "false") // http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
    .set("spark.file.transferTo", "false") // http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
    .set("spark.core.connection.ack.wait.timeout", "600") // http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
    .set("spark.speculation", "true")
    .set("spark.worker.timeout", "600") // http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html
    .set("spark.akka.timeout", "300") // http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html
    .set("spark.storage.blockManagerSlaveTimeoutMs", "12")
    .set("spark.driver.maxResultSize", "2048") // in response to error: Total size of serialized results of 39901 tasks (1024.0 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set("spark.kryo.registrator", "com.att.bdcoe.cip.ooh.MyRegistrator")
    .set("spark.kryo.registrationRequired", "true")

  val rdd1 = sc.textFile(file1).persist(StorageLevel.MEMORY_AND_DISK_SER).map(_.split("\\|", -1)...filter(...)

  val rdd2 = sc.textFile(file2).persist(StorageLevel.MEMORY_AND_DISK_SER).map(_.split("\\|", -1)...filter(...)


 rdd2.union(rdd1).map(...).filter(...).groupByKey().map(...).flatMap(...).saveAsTextFile()


 I run the code

Re: Question about Spark best practice when counting records.

2015-02-27 Thread Paweł Szulc
Currently, if you use accumulators inside actions (like foreach) you have a
guarantee that, even if a partition is recalculated, the values will be
correct. The same thing does NOT apply to transformations, where you cannot
rely 100% on the values.
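A minimal sketch with the Spark 1.x accumulator API (the RDD and the validity check below are made-up placeholders):

  val good = sc.accumulator(0L, "good records")
  val bad  = sc.accumulator(0L, "bad records")

  val records = sc.parallelize(Seq("ok", "", "ok"))   // stand-in for the real final RDD
  records.foreach { r =>                              // foreach is an action, so each task's update counts once
    if (r.nonEmpty) good += 1L else bad += 1L
  }
  println(s"good=${good.value}, bad=${bad.value}")    // .value is only readable on the driver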

Pawel Szulc

On Fri, Feb 27, 2015 at 4:54 PM, Darin McBeath ddmcbe...@yahoo.com.invalid wrote:

 I have a fairly large Spark job where I'm essentially creating quite a few
 RDDs and doing several types of joins using these RDDs, resulting in a final RDD
 which I write back to S3.


 Along the way, I would like to capture record counts for some of these
 RDDs. My initial approach was to use the count action on some of these
 intermediate  RDDS (and cache them since the count would force the
 materialization of the RDD and the RDD would be needed again later).  This
 seemed to work 'ok' when my RDDs were fairly small/modest but as they grew
 in size I started to experience problems.

 After watching a recent very good screencast on performance, this doesn't
 seem like the correct approach, as I believe I'm really breaking (or hindering)
 the pipelining concept in Spark. If I remove all of my counts, I'm only
 left with the one job/action (save as Hadoop file at the end).  Spark then
 seems to run smoother (and quite a bit faster) and I really don't need (or
 want) to even cache any of my intermediate RDDs.

 So, the approach I've been kicking around is to use accumulators instead.
 I was already using them to count 'bad' records but why not 'good' records
 as well? I realize that if I lose a partition I might over-count, but
 perhaps that is an acceptable trade-off.

 I'm guessing that others have ran into this before so I would like to
 learn from the experience of others and how they have addressed this.

 Thanks.

 Darin.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: High CPU usage in Driver

2015-02-27 Thread Paweł Szulc
Thanks for coming back to the list with the response!

On Fri, Feb 27, 2015 at 3:16 PM, Himanish Kushary himan...@gmail.com wrote:

 Hi,

 I was able to solve the issue. Putting down the settings that worked for
 me.

 1) It was happening due to the large number of partitions. I coalesce'd
 the RDD as early as possible in my code into a lot fewer partitions (used
 .coalesce(1) to bring it down from 500K to 10k)

 2) Increased the settings for the parameters spark.akka.frameSize (= 500),
 spark.akka.timeout, spark.akka.askTimeout and
 spark.core.connection.ack.wait.timeout
 to get rid of any insufficient frame size and timeout errors
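 For reference, roughly what those two changes look like in code (a sketch only; the timeout values and the pairRdd name here are illustrative):

   val conf = new SparkConf()
     .set("spark.akka.frameSize", "500")
     .set("spark.akka.timeout", "600")                       // illustrative value
     .set("spark.akka.askTimeout", "600")                    // illustrative value
     .set("spark.core.connection.ack.wait.timeout", "600")   // illustrative value

   val fewerPartitions = pairRdd.coalesce(10000)   // bring the 500K+ partitions down to ~10k
   fewerPartitions.distinct().count()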

 Thanks
 Himanish

 On Thu, Feb 26, 2015 at 5:00 PM, Himanish Kushary himan...@gmail.com
 wrote:

 Hi,

 I am working with an RDD (PairRDD) with 500K+ partitions. The RDD is
 loaded into memory; the size is around 18G.

 Whenever I run a distinct() on the RDD, the driver (spark-shell in
 yarn-client mode) host CPU usage rockets up (400+%) and the distinct()
 process seems to stall. The Spark driver UI also hangs.

 In Ganglia the only node with high load is the driver host. I have tried
 repartitioning the data into a smaller number of partitions (using coalesce or
 repartition) with no luck.

 I have attached the jstack output, which shows a few threads in BLOCKED
 status. Not sure what exactly is going on here.

 The driver program was started with 15G memory on AWS EMR. Appreciate any
 thoughts regarding the issue.

 --
 Thanks & Regards
 Himanish




 --
 Thanks & Regards
 Himanish



Re: CollectAsMap, Broadcasting.

2015-02-26 Thread Paweł Szulc
Correct me if I'm wrong, but he can actually run this code without
broadcasting the users map; however, the code will be less efficient.
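A small sketch of the difference, reusing the names from the snippet quoted below:

  // without broadcast: usersMap is serialized into every task's closure
  val usersMap = contacts.collectAsMap()
  contacts.map(v => (v._1, (usersMap(v._1), v._2))).collect()

  // with broadcast: usersMap is shipped once per executor and reused by all of its tasks
  val usersMapB = sc.broadcast(usersMap)
  contacts.map(v => (v._1, (usersMapB.value(v._1), v._2))).collect()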

On Thu, Feb 26, 2015 at 12:31 PM, Sean Owen so...@cloudera.com wrote:

 Yes, but there is no concept of executors 'deleting' an RDD. And you
 would want to broadcast the usersMap if you're using it this way.

 On Thu, Feb 26, 2015 at 11:26 AM, Guillermo Ortiz konstt2...@gmail.com
 wrote:
  One last time to be sure I got it right, the executing sequence here
  goes like this?:
 
  val usersMap = contacts.collectAsMap()
  #The contacts RDD is collected by the executors and sent to the
  driver, the executors delete the rdd
  contacts.map(v => (v._1, (usersMap(v._1), v._2))).collect()
  #The userMap object is sent again to the executors to run the code,
  and with the collect(), the result is sent again back to the driver
 
 
  2015-02-26 11:57 GMT+01:00 Sean Owen so...@cloudera.com:
  Yes, in that code, usersMap has been serialized to every executor.
  I thought you were referring to accessing the copy in the driver.
 
  On Thu, Feb 26, 2015 at 10:47 AM, Guillermo Ortiz konstt2...@gmail.com
 wrote:
  Isn't it contacts.map(v => (v._1, (usersMap(v._1), v._2))).collect()
  executed in the executors?  why is it executed in the driver?
  contacts are not a local object, right?

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Is there a limit to the number of RDDs in a Spark context?

2015-02-18 Thread Paweł Szulc
Maybe you can omit grouping with groupByKey altogether? What is
your next step after grouping elements by key? Are you trying to reduce the
values? If so, then I would recommend using reducing functions such as
reduceByKey or aggregateByKey. Those will first reduce the values for
each key locally on each node before doing actual IO over the network.
There will also be no grouping phase, so you will not run into memory issues.
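For example, building a (max, count) summary per key without ever materializing the groups (a toy sketch):

  val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
  val maxAndCount = pairs.aggregateByKey((Int.MinValue, 0L))(
    (acc, v) => (math.max(acc._1, v), acc._2 + 1),    // seqOp: fold a value into the partition-local summary
    (x, y)   => (math.max(x._1, y._1), x._2 + y._2)   // combOp: merge summaries across partitions
  )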

Please let me know if that helped

Pawel Szulc
@rabbitonweb
http://www.rabbitonweb.com


On Wed, Feb 18, 2015 at 12:06 PM, Juan Rodríguez Hortalá 
juan.rodriguez.hort...@gmail.com wrote:

 Hi,

 I'm writing a Spark program where I want to divide a RDD into different
 groups, but the groups are too big to use groupByKey. To cope with that,
 since I know in advance the list of keys for each group, I build a map from
 the keys to the RDDs that result from filtering the input RDD to get the
 records for the corresponding key. This works when I have a small number of
 keys, but for a big number of keys (tens of thousands) the execution gets
 stuck, without issuing any new Spark stage. I suspect the reason is that
 the Spark scheduler is not able to handle so many RDDs. Does it make sense?
 I'm rewriting the program to use a single RDD of pairs, with cached
 partitions, but I wanted to be sure I understand the problem here.

 Thanks a lot in advance,

 Greetings,

 Juan Rodriguez



Re: Can spark job have sideeffects (write files to FileSystem)

2014-12-15 Thread Paweł Szulc
Yes, this is what I also found in the Spark documentation, that foreach can
have side effects. Nevertheless, I have this weird error that sometimes
files are just empty.

using is simply a wrapper that takes our code, wraps it in try-catch-finally,
and flushes & closes all resources.

I honestly have no clue what can possibly be wrong.

No errors in logs.
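For what it's worth, a sketch of the Files.write variant Daniel suggested (the target directory and naming scheme here are made up):

  import java.nio.charset.StandardCharsets
  import java.nio.file.{Files, Paths}

  lines.foreach { line =>
    val target = Paths.get(s"/tmp/lines/${line.hashCode}.txt")   // made-up naming scheme
    Files.createDirectories(target.getParent)
    // an IOException propagates and fails the task instead of leaving a silent empty file
    Files.write(target, line.getBytes(StandardCharsets.UTF_8))
  }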

On Thu, Dec 11, 2014 at 2:29 PM, Daniel Darabos 
daniel.dara...@lynxanalytics.com wrote:

 Yes, this is perfectly legal. This is what RDD.foreach() is for! You may
 be encountering an IO exception while writing, and maybe using() suppresses
 it. (?) I'd try writing the files with java.nio.file.Files.write() -- I'd
 expect there is less that can go wrong with that simple call.

 On Thu, Dec 11, 2014 at 12:50 PM, Paweł Szulc paul.sz...@gmail.com
 wrote:

 Imagine a simple Spark job that will store each line of the RDD to a
 separate file:


 val lines = sc.parallelize(1 to 100).map(n => s"this is line $n")
 lines.foreach(line => writeToFile(line))

 def writeToFile(line: String) = {
   def filePath = "file://..."
   val file = new File(new URI(path).getPath)
   // the using function simply closes the output stream
   using(new FileOutputStream(file)) { output =>
     output.write(value)
   }
 }


 Now, the example above works 99.9% of the time. Files are generated for each
 line, and each file contains that particular line.

 However, when dealing with a large amount of data, we encounter situations
 where some of the files are empty! Files are generated, but there is no
 content inside of them (0 bytes).

 Now the question is: can a Spark job have side effects? Is it even legal to
 write such code?
 If no, then what other choice do we have when we want to save data from
 our RDD?
 If yes, then do you guys see what could be the reason for this job acting
 in this strange manner 0.1% of the time?


 Disclaimer: we are fully aware of the .saveAsTextFile method in the API;
 however, the example above is a simplification of our code - normally we
 produce PDF files.


 Best regards,
 Paweł Szulc










Can spark job have sideeffects (write files to FileSystem)

2014-12-11 Thread Paweł Szulc
Imagine a simple Spark job that will store each line of the RDD to a
separate file:


val lines = sc.parallelize(1 to 100).map(n => s"this is line $n")
lines.foreach(line => writeToFile(line))

def writeToFile(line: String) = {
  def filePath = "file://..."
  val file = new File(new URI(path).getPath)
  // the using function simply closes the output stream
  using(new FileOutputStream(file)) { output =>
    output.write(value)
  }
}


Now, the example above works 99.9% of the time. Files are generated for each
line, and each file contains that particular line.

However, when dealing with a large amount of data, we encounter situations
where some of the files are empty! Files are generated, but there is no
content inside of them (0 bytes).

Now the question is: can a Spark job have side effects? Is it even legal to
write such code?
If no, then what other choice do we have when we want to save data from our
RDD?
If yes, then do you guys see what could be the reason for this job acting in
this strange manner 0.1% of the time?


Disclaimer: we are fully aware of the .saveAsTextFile method in the API;
however, the example above is a simplification of our code - normally we
produce PDF files.


Best regards,
Paweł Szulc


multiple spark context in same driver program

2014-11-06 Thread Paweł Szulc
Hi,

quick question: I found this:
http://docs.sigmoidanalytics.com/index.php/Problems_and_their_Solutions#Multiple_SparkContext:Failed_to_bind_to:.2F127.0.1.1:45916

My main question: is this constraint still valid? Am I not allowed to have
two SparkContexts pointing to the same Spark Master in one driver program?


Regards,
Pawel Szulc


hi all

2014-10-16 Thread Paweł Szulc
Hi,

I just wanted to say hi to the whole Spark community. I'm developing some
stuff right now using Spark (we've started very recently). As the API
documentation of Spark is really good, I'd like to get deeper
knowledge of the internal stuff - you know, the goodies. Watching videos
from Spark Summits helps; nevertheless, I hope to learn a lot from reading
this mailing list.

Regards,
Pawel Szulc


Re: reverse an rdd

2014-10-16 Thread Paweł Szulc
Just to have this clear, can you answer with a quick yes or no:

Does it mean that when I create an RDD from a file and I simply iterate
through it like this:

 sc.textFile("some_text_file.txt").foreach(line => println(line))

then the actual lines might come in a different order than they are in the
file?

On Thu, Oct 16, 2014 at 9:13 PM, Sean Owen so...@cloudera.com wrote:

 Since you're concerned with the particular ordering, you will need to
 sort your RDD to ensure the ordering you have in mind. Simply reverse
 the Ordering with Ordering.reverse() and sort by that instead, and
 then use toLocalIterator() I suppose.

 Depending on what you're really trying to achieve, there may be a better
 way.

 On Thu, Oct 16, 2014 at 2:49 PM, ll duy.huynh@gmail.com wrote:
  hello... what is the best way to iterate through an rdd backward (last
  element first, first element last)?  thanks!
 
 
 
  --
  View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/reverse-an-rdd-tp16602.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: reverse an rdd

2014-10-16 Thread Paweł Szulc
Never mind, I've just run the code in the REPL. Indeed, if we do not sort,
then the order is totally random. Which actually makes sense if you think
about it.
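So, following Sean's suggestion, something like this should give a stable reversed iteration (a sketch; assumes the file order matches the order zipWithIndex assigns, which holds for a plain textFile read):

  val rdd = sc.textFile("some_text_file.txt")
  val reversed = rdd.zipWithIndex()                         // remember the original position of each line
    .sortBy({ case (_, idx) => idx }, ascending = false)    // sort on that index, descending
    .map { case (line, _) => line }

  reversed.toLocalIterator.foreach(println)                 // stream partitions to the driver one at a time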


On Thu, Oct 16, 2014 at 9:58 PM, Paweł Szulc paul.sz...@gmail.com wrote:

 Just to have this clear, can you answer with a quick yes or no:

 Does it mean that when I create an RDD from a file and I simply iterate
 through it like this:

  sc.textFile("some_text_file.txt").foreach(line => println(line))

 then the actual lines might come in a different order than they are in the
 file?

 On Thu, Oct 16, 2014 at 9:13 PM, Sean Owen so...@cloudera.com wrote:

 Since you're concerned with the particular ordering, you will need to
 sort your RDD to ensure the ordering you have in mind. Simply reverse
 the Ordering with Ordering.reverse() and sort by that instead, and
 then use toLocalIterator() I suppose.

 Depending on what you're really trying to achieve, there may be a better
 way.

 On Thu, Oct 16, 2014 at 2:49 PM, ll duy.huynh@gmail.com wrote:
  hello... what is the best way to iterate through an rdd backward (last
  element first, first element last)?  thanks!
 
 
 
  --
  View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/reverse-an-rdd-tp16602.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org