Hello Dean & others,
Thanks for the response.
I tried with 100, 200, 400, 600 and 1200 repartitions with 100, 200, 400 and
800 executors. Each time all the tasks of the join complete in less than a
minute except one, and that one task runs forever. I have a huge cluster at
my disposal.
The data for each
IMHO, if your data or your algorithm is prone to data skew, you have to fix
this at the application level; Spark itself cannot overcome this problem (if
one key has a large amount of values). You may change your algorithm to
choose another shuffle key, something like this, to avoid shuffle on
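(Not from this thread, but one common application-level fix in this spirit is key salting: append a random suffix to the hot key so its values spread across several reducers, and replicate the other side once per suffix. A minimal sketch, assuming two pair RDDs keyed by item ID; the names and String value types are stand-ins:)

    import scala.util.Random
    import org.apache.spark.rdd.RDD

    // Assume: val big:   RDD[(Long, String)] = ...  (the skewed side)
    //         val small: RDD[(Long, String)] = ...
    val SALTS = 16  // how many buckets to spread each hot key across

    // Salt the skewed side: each record gets a random bucket suffix,
    // so one hot item ID now maps to up to 16 distinct shuffle keys.
    val saltedBig: RDD[((Long, Int), String)] =
      big.map { case (k, v) => ((k, Random.nextInt(SALTS)), v) }

    // Replicate the other side once per bucket so every salted key finds a match.
    val saltedSmall: RDD[((Long, Int), String)] =
      small.flatMap { case (k, v) => (0 until SALTS).map(s => ((k, s), v)) }

    // Join on the salted key, then strip the salt off again.
    val joined: RDD[(Long, (String, String))] =
      saltedBig.join(saltedSmall).map { case ((k, _), vs) => (k, vs) }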
Hello Shao,
Can you talk more about the shuffle key, or point me to APIs that allow me
to change the shuffle key? I will try with different keys and see the
performance.
What is the shuffle key by default?
On Mon, May 4, 2015 at 2:37 PM, Saisai Shao sai.sai.s...@gmail.com wrote:
The shuffle key depends on your implementation. I'm not sure if you are
familiar with MapReduce: there, the mapper output is a key-value pair, where
the key is the shuffle key used for shuffling. Spark is the same.
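(For example, an illustration not from the thread: in a word count, the word is the shuffle key, exactly like a MapReduce mapper's output key.)

    val words = sc.textFile("input.txt").flatMap(_.split(" "))
    val counts = words
      .map(w => (w, 1))     // key = w: this is what the shuffle partitions on
      .reduceByKey(_ + _)   // all records with the same w meet in one task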
2015-05-04 17:31 GMT+08:00 ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com:
One dataset (RDD Pair):
val lstgItem = listings.map { lstg => (lstg.getItemId().toLong, lstg) }
Second dataset (RDD Pair):
val viEvents = viEventsRaw.map { vi => (vi.get(14).asInstanceOf[Long], vi) }
As I want to join based on the item ID that is used as the first element of
the tuple in both cases, and I
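(For reference, with both RDDs keyed that way the plain equi-join matches on the first tuple element; a minimal sketch, element types illustrative:)

    // Both RDDs are keyed by item ID (the first tuple element),
    // so join() matches records on it directly.
    val joined = lstgItem.join(viEvents)  // RDD[(Long, (Listing, ViEvent))]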
Four tasks are now failing with
Index: 0 | ID: 3771 | Attempt: 0 | Status: FAILED | Locality Level:
PROCESS_LOCAL | Executor ID / Host: 114 / host1 | Launch Time: 2015/05/04
01:27:44 | Errors: ExecutorLostFailure (executor 114 lost)
From the symptoms you mentioned, that one task's shuffle write is much
larger than all the other tasks', it is quite similar to normal data-skew
behavior. I can only give some advice based on your description; I think you
need to detect whether the data is actually skewed or not.
The shuffle will put data
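(One quick way to check, a sketch using the viEvents pair RDD from earlier in the thread: count records per key and inspect the heaviest ones.)

    // Count records per item ID and print the 20 heaviest keys.
    val keyCounts = viEvents.mapValues(_ => 1L).reduceByKey(_ + _)
    keyCounts.map(_.swap)                 // (count, itemId)
      .sortByKey(ascending = false)
      .take(20)
      .foreach { case (count, itemId) => println(s"$itemId -> $count") }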
I ran it against one file instead of 10 files and I see one task is still
running after 33 mins; its shuffle read size is 780MB / 50 million records.
I did a count of records for each itemId from dataset-2 [one file] (Second
Dataset (RDD Pair): val viEvents = viEventsRaw.map { vi =>
(vi.get(14).asInstanceOf[Long], vi) }).
I tried this:
val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary, Long))] =
  lstgItem.join(viEvents, new org.apache.spark.RangePartitioner(partitions
= 1200, rdd = viEvents)).map {
It fired two jobs and still I have 1 task that never completes.
Data Set 1: viEvents is the event activity data of 1 day. I took 10 files
out of it, and the top record counts per item ID are:
Item ID         Count
201335783004    3419
191568402102    1793
111654479898    1362
181503913062    1310
261798565828    1028
111654493548     994
231516683056     862
I don't know the full context of what you're doing, but serialization
errors usually mean you're attempting to serialize something that can't be
serialized, like the SparkContext. Kryo won't help there.
The arguments to spark-submit you posted previously look good:
2) --num-executors 96
How big is the data you're returning to the driver with collectAsMap? You
are probably running out of memory trying to copy too much data back to it.
If you're trying to force a map-side join, Spark can do that for you in
some cases within the regular DataFrame/RDD context. See
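(For the manual RDD version, a broadcast map-side join looks roughly like this; a sketch, and it only works when the collected side genuinely fits in memory, which is exactly where the collectAsMap OOM in this thread comes from:)

    // Broadcast the small side to every executor, then join map-side
    // with no shuffle at all. Only safe if lstgItem is truly small.
    val lstgMap = sc.broadcast(lstgItem.collectAsMap())
    val joined = viEvents.flatMap { case (itemId, event) =>
      lstgMap.value.get(itemId).map(lstg => (itemId, (lstg, event)))
    }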
IMHO, you are trying waaay too hard to optimize work on what is really a
small data set. 25G, even 250G, is not that much data, especially if you've
spent a month trying to get something to work that should be simple. All
these errors are from optimization attempts.
Kryo is great, but if it's not
Hello Dean & others,
Thanks for your suggestions.
I have two data sets and all I want to do is a simple equi-join. I have a
10G limit, and as my dataset_1 exceeded that, it was throwing an OOM error.
Hence I switched back to using the .join() API instead of a map-side
broadcast join.
I am repartitioning the data
Hello Dean,
If I don't use the Kryo serializer I get a serialization error, and hence am
using it.
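(For reference, the Kryo setup typically looks like this; a sketch, with registerKryoClasses available from Spark 1.2 onward, and the registered classes borrowed from the record types mentioned elsewhere in this thread:)

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .setAppName("MyJoinApp")  // name illustrative
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .registerKryoClasses(Array(classOf[DetailInputRecord], classOf[VISummary]))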
If I don't use partitionBy/repartition then the simple join never completed
even after 7 hours, and in fact as a next step I need to run it against 250G
as that is my full dataset size. Someone here suggested
You could try repartitioning your listings RDD. Also, doing a collectAsMap
would basically bring all your data to the driver; in that case you might
want to set the storage level to memory-and-disk, though I'm not sure that
will be of any help on the driver.
Thanks
Best Regards
On Thu, Apr 30, 2015 at 11:10
You could try increasing your heap space explicitly, like export
_JAVA_OPTIONS=-Xmx10g; it's not the correct approach, but try it.
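(The more conventional way to get the same effect is through spark-submit's own memory flags, e.g.:)

    spark-submit --driver-memory 10g --executor-memory 10g ...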
Thanks
Best Regards
On Tue, Apr 28, 2015 at 10:35 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:
Did not work. Same problem.
On Thu, Apr 30, 2015 at 1:28 PM, Akhil Das ak...@sigmoidanalytics.com
wrote:
Full exception:
15/04/30 09:59:49 INFO scheduler.DAGScheduler: Stage 1 (collectAsMap at
VISummaryDataProvider.scala:37) failed in 884.087 s
15/04/30 09:59:49 INFO scheduler.DAGScheduler: Job 0 failed: collectAsMap
at VISummaryDataProvider.scala:37, took 1093.418249 s
15/04/30 09:59:49 ERROR
I have a SparkApp that completes in 45 mins for 5 files (5*750MB size) and
it takes 16 executors to do so.
I wanted to run it against 10 files of each input type (10*3 files, as there
are three inputs that are transformed) [Input1 = 10*750MB, Input2 =
10*2.5GB, Input3 = 10*1.5GB]. Hence I used