Hi Art,
I have some advice that isn't spark-specific at all, so it doesn't
*exactly* address your questions, but you might still find it helpful. I
think using an implicit to add your retrying behavior might be useful. I
can think of two options:
1. enriching RDD itself, e.g. to add a
failed on input 0, try 2 with java.lang.ArithmeticException: / by zero
failed on input 0, try 3 with java.lang.ArithmeticException: / by zero
1 --- 10
2 --- 5
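a rough, untested sketch of option 1 could look like this (RetryOps and
mapWithRetry are hypothetical names; it needs to sit in an object or the
spark-shell):

import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag

object RetryEnrichment {
  // enrich any RDD with a map that retries failing elements
  implicit class RetryOps[T](val rdd: RDD[T]) {
    def mapWithRetry[U: ClassTag](maxTries: Int)(f: T => U): RDD[U] =
      rdd.map { elem =>
        def attempt(n: Int): U =
          try f(elem)
          catch {
            case e: Exception if n < maxTries =>
              println(s"failed on input $elem, try $n with $e")
              attempt(n + 1)
          }
        attempt(1)
      }
  }
}

// usage producing output like the above:
// import RetryEnrichment._
// sc.parallelize(0 to 2).mapWithRetry(3)(x => 10 / x)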
On Thu, Jul 24, 2014 at 2:58 PM, Imran Rashid im...@therashids.com wrote:
Hi Art,
I have some advice that isn't spark-specific at all, so
Rishi's approach will work, but it's worth mentioning that because all of
the data goes into only two groups, you will only process the resulting
data with two tasks and so you're losing almost all parallelism.
Presumably you're processing a lot of data, since you only want to do one
pass, so I
Hi Franco,
As a fast approximate way to get probability distributions, you might be
interested in t-digests:
https://github.com/tdunning/t-digest
In one pass, you could make a t-digest for each variable, to get its
distribution. And after that, you could make another pass to map each data
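a rough, untested sketch of the one-pass digest idea (assuming the
library's createDigest, add, centroids and quantile methods -- double-check
against the version you use; values is a hypothetical RDD[Double]):

import com.tdunning.math.stats.TDigest
import scala.collection.JavaConverters._

val digest = values.aggregate(TDigest.createDigest(100.0))(
  (d, x) => { d.add(x); d },        // add each point to the local digest
  (d1, d2) => {                     // merge per-partition digests
    d2.centroids().asScala.foreach(c => d1.add(c.mean(), c.count()))
    d1
  })
digest.quantile(0.5)   // approximate median
digest.cdf(42.0)       // approximate P(x <= 42)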
It's an easy mistake to make... I wonder if an assertion could be
implemented that makes sure the type parameter is present.
We could use the NotNothing pattern
http://blog.evilmonkeylabs.com/2012/05/31/Forcing_Compiler_Nothing_checks/
but I wonder if it would just make the method signature
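the core of the linked pattern is roughly this (untested sketch, names made
up): two competing implicits for Nothing, so the compiler reports an
ambiguity whenever the type parameter is inferred as Nothing:

trait NotNothing[T]
object NotNothing {
  implicit def anyType[T]: NotNothing[T] = new NotNothing[T] {}
  // both of these apply when T = Nothing, creating the ambiguity
  implicit def nothing1: NotNothing[Nothing] = new NotNothing[Nothing] {}
  implicit def nothing2: NotNothing[Nothing] = new NotNothing[Nothing] {}
}

// hypothetical use on an objectFile-like method:
def objectFile[T](path: String)(implicit ev: NotNothing[T]): Unit = ()
// objectFile("data")         // won't compile: T inferred as Nothing
// objectFile[String]("data") // fine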
Minor correction: I think you want iterator.grouped(10) for
non-overlapping mini-batches.
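e.g., something like this (untested; processBatch is a hypothetical
function you supply):

val batched = rdd.mapPartitions { it =>
  it.grouped(10).map(batch => processBatch(batch))  // batches of up to 10
}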
On Dec 11, 2014 1:37 PM, Matei Zaharia matei.zaha...@gmail.com wrote:
You can just do mapPartitions on the whole RDD, and then call sliding()
on the iterator in each one to get a sliding window. One
I'm a little confused by some of the responses. It seems like there are
two different issues being discussed here:
1. How to turn a sequential algorithm into something that works on spark.
E.g. deal with the fact that data is split into partitions which are
processed in parallel (though within a
I think accumulators do exactly what you want.
(Scala syntax below, I'm just not familiar with the Java equivalent ...)
val f1counts = sc.accumulator(0)
val f2counts = sc.accumulator(0)
val f3counts = sc.accumulator(0)
textfile.foreach { s =>
  if (f1matches(s)) f1counts += 1
  ...
}
Note that
wow, really weird. My intuition is the same as everyone else's, some
unprintable character. Here are a couple more debugging tricks I've used in
the past:
//set up an accumulator to catch the bad rows as a side-effect
val nBadRows = sc.accumulator(0)
val nGoodRows = sc.accumulator(0)
val badRows
Hi Michael,
judging from the logs, it seems that those tasks are just working for a
really long time. If you have long-running tasks, then you wouldn't expect
the driver to output anything while those tasks are working.
What is unusual is that there is no activity during all that time the tasks
are
Many operations in spark are lazy -- most likely your collect() statement
is actually forcing evaluation of several steps earlier in the pipeline.
The logs and the UI might give you some info about all the stages that are
being run when you get to collect().
I think collect() is just fine if you
used to run some of our jobs on it ... But
that is forked off 1.1 actually).
Regards
Mridul
On Tuesday, February 3, 2015, Imran Rashid iras...@cloudera.com wrote:
Thanks for the explanations, makes sense. For the record looks like this
was worked on a while back (and maybe the work
I think you are interested in secondary sort, which is still being worked
on:
https://issues.apache.org/jira/browse/SPARK-3655
On Tue, Feb 3, 2015 at 4:41 PM, Nitin kak nitinkak...@gmail.com wrote:
I thought that's what sort-based shuffle did, sort the keys going to the
same partition.
I
into multiple smaller blocks.
On Tue, Feb 3, 2015 at 3:00 PM, Imran Rashid iras...@cloudera.com wrote:
Michael,
you are right, there is definitely some limit at 2GB. Here is a trivial
example to demonstrate it:
import org.apache.spark.storage.StorageLevel
val d = sc.parallelize(1 to 1e6.toInt
Michael,
you are right, there is definitely some limit at 2GB. Here is a trivial
example to demonstrate it:
import org.apache.spark.storage.StorageLevel
val d = sc.parallelize(1 to 1e6.toInt, 1).map{i => new
Array[Byte](5e3.toInt)}.persist(StorageLevel.DISK_ONLY)
d.count()
It gives the same
Your understanding is basically correct. Each task creates its own
local accumulator, and just those results get merged together.
However, there are some performance limitations to be aware of. First, you
need enough memory on the executors to build up whatever those intermediate
results are.
I'm not positive, but I think this is very unlikely to work.
First, when you call sc.objectFile(...), I think the *driver* will need to
know something about the file, e.g. to know how many tasks to create. But it
won't even be able to see the file, since it only lives on the local
filesystem of
Spark can definitely process data with optional fields. It kinda depends
on what you want to do with the results -- it's more of an object design /
knowing scala types question.
E.g., scala has a built-in type Option specifically for handling optional
data, which works nicely in pattern matching
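e.g., a tiny (untested) sketch with a hypothetical record type:

case class Person(name: String, email: Option[String])
val people = sc.parallelize(Seq(
  Person("ann", Some("ann@example.com")),
  Person("bob", None)))                          // bob has no email
val domains = people.flatMap { p =>
  p.email match {
    case Some(addr) => Some(addr.split("@")(1))  // keep the domain
    case None       => None                      // drop missing ones
  }
}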
I think you should also just be able to provide an input format that never
splits the input data. This has come up before on the list, but I couldn't
find it.*
I think this should work, but I can't try it out at the moment. Can you
please try and let us know if it works?
class
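roughly something like this (untested; NonSplittingTextInputFormat is a
made-up name, using the new Hadoop API):

import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.JobContext
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

class NonSplittingTextInputFormat extends TextInputFormat {
  // never split: each file becomes exactly one split / partition
  override protected def isSplitable(context: JobContext, file: Path): Boolean =
    false
}

// sc.newAPIHadoopFile[LongWritable, Text, NonSplittingTextInputFormat]("path")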
would really want to do in the first place.
Thanks again for your insights.
Darin.
--
*From:* Imran Rashid iras...@cloudera.com
*To:* Darin McBeath ddmcbe...@yahoo.com
*Cc:* User user@spark.apache.org
*Sent:* Tuesday, February 17, 2015 3:29 PM
*Subject:* Re
a JavaRDD is just a wrapper around a normal RDD defined in scala, which is
stored in the rdd field. You can access everything that way. The
JavaRDD wrappers just provide some interfaces that are a bit easier to work
with in Java.
If this is at all convincing, here's me demonstrating it inside
The important thing here is the master's memory, that's where you're
getting the GC overhead limit. The master is updating its UI to include
your finished app when your app finishes, which would cause a spike in
memory usage.
I wouldn't expect the master to need a ton of memory just to serve the
(trying to repost to the list w/out URLs -- rejected as spam earlier)
Hi,
Using take() is not a good idea, as you have noted it will pull a lot of
data down to the driver so it's not scalable. Here are some more scalable
alternatives:
1. Approximate solutions
1a. Sample the data. Just sample
Hi Emre,
there shouldn't be any difference in which files get processed w/ print()
vs. foreachRDD(). In fact, if you look at the definition of print(), it is
just calling foreachRDD() underneath. So there is something else going on
here.
We need a little more information to figure out exactly
Hi Darin,
When you say you see 400GB of shuffle writes from the first code snippet,
what do you mean? There is no action in that first set, so it won't do
anything. By itself, it won't do any shuffle writing, or anything else for
that matter.
Most likely, the .count() on your second code
the relevant methods from my utility
classes for completeness.
I am as perplexed as you are as to why forcing the output via foreachRDD
resulted in different behaviour compared to simply using the print() method.
Kind regards,
Emre
On Tue, Feb 17, 2015 at 4:23 PM, Imran Rashid iras
Hi Tom,
there are a couple of things you can do here to make this more efficient.
first, I think you can replace your self-join with a groupByKey. on your
example data set, this would give you
(1, Iterable(2,3))
(4, Iterable(3))
this reduces the amount of data that needs to be shuffled, and
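on the example data, that looks roughly like (untested sketch):

import org.apache.spark.SparkContext._   // pair-RDD functions (pre-1.3)
val pairs = sc.parallelize(Seq((1, 2), (1, 3), (4, 3)))
val grouped = pairs.groupByKey()
// contains (1, Iterable(2, 3)) and (4, Iterable(3))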
Hi Joe,
The issue is not that you have input partitions that are bigger than 2GB --
it's just that they are getting cached. You can see in the stack trace, the
problem is when you try to read data out of the DiskStore:
org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132)
Also, just
it by the number of files? Or perhaps I'm barking up completely
the wrong tree.
Joe
On 19 February 2015 at 14:44, Imran Rashid iras...@cloudera.com wrote:
Hi Joe,
The issue is not that you have input partitions that are bigger than 2GB
-- it's just that they are getting cached. You can see
almost all your data is going to one task. You can see that the shuffle
read for task 0 is 153.3 KB, and for most other tasks it's just 26B (which
is probably just some header saying there are no actual records). You need
to ensure your data is more evenly distributed before this step.
On Thu,
I'm not an expert on streaming, but I think you can't do anything like this
right now. It seems like a very sensible use case, though, so I've created
a jira for it:
https://issues.apache.org/jira/browse/SPARK-5467
On Wed, Jan 28, 2015 at 8:54 AM, YaoPau jonrgr...@gmail.com wrote:
The
I'm not sure about this, but I suspect the answer is: spark doesn't
guarantee a stable sort, nor does it plan to in the future, so the
implementation has more flexibility.
But you might be interested in the work being done on secondary sort, which
could give you the guarantees you want:
this looks reasonable to me. As you've done, the important thing is just
to make isSplitable return false.
this shares a bit in common with the sc.wholeTextFiles method. It sounds
like you really want something much simpler than what that is doing, but
you might be interested in looking at that
Rashid iras...@cloudera.com
wrote:
I wonder if the issue is that these lines just need to add
preservesPartitioning = true
?
https://github.com/apache/spark/blob/master/python/pyspark/join.py#L38
I am getting the feeling this is an issue w/ pyspark
On Thu, Feb 12, 2015 at 10:43 AM, Imran
unfortunately this is a known issue:
https://issues.apache.org/jira/browse/SPARK-1476
as Sean suggested, you need to think of some other way of doing the same
thing, even if it's just breaking your one big broadcast var into a few
smaller ones
On Fri, Feb 13, 2015 at 12:30 PM, Sean Owen
I wonder if the issue is that these lines just need to add
preservesPartitioning = true
?
https://github.com/apache/spark/blob/master/python/pyspark/join.py#L38
I am getting the feeling this is an issue w/ pyspark
On Thu, Feb 12, 2015 at 10:43 AM, Imran Rashid iras...@cloudera.com wrote:
ah
Hi Karlson,
I think your assumptions are correct -- that join alone shouldn't require
any shuffling. But it's possible you are getting tripped up by lazy
evaluation of RDDs. After you do your partitionBy, are you sure those RDDs
are actually materialized and cached somewhere? E.g., if you just did
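for comparison, a version that does materialize first (untested sketch;
rddA and rddB are hypothetical pair RDDs):

import org.apache.spark.HashPartitioner
val part = new HashPartitioner(100)
val a = rddA.partitionBy(part).cache()
val b = rddB.partitionBy(part).cache()
a.count(); b.count()     // force materialization, so the partitioning exists
val joined = a.join(b)   // co-partitioned inputs: no extra shuffle needed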
You need to import the implicit conversions to PairRDDFunctions with
import org.apache.spark.SparkContext._
(note that this requirement will go away in 1.3:
https://issues.apache.org/jira/browse/SPARK-4397)
On Thu, Feb 12, 2015 at 9:36 AM, Vladimir Protsenko protsenk...@gmail.com
wrote:
Hi. I
to the
mailing list first.
On 2015-02-12 16:48, Imran Rashid wrote:
Hi Karlson,
I think your assumptions are correct -- that join alone shouldn't require
any shuffling. But it's possible you are getting tripped up by lazy
evaluation of RDDs. After you do your partitionBy, are you sure those
this is more-or-less the best you can do now, but as has been pointed out,
accumulators don't quite fit the bill for counters. There is an open issue
to do something better, but no progress on that so far
https://issues.apache.org/jira/browse/SPARK-603
On Fri, Feb 13, 2015 at 11:12 AM, Mark
yeah, this is just the totally normal message when spark executes
something. The first time something is run, all of its tasks are
missing. I would not worry about cases when all tasks aren't missing --
if you're new to spark, it's probably an advanced concept that you don't
care about. (and would
if you have duplicate values for a key, join creates all pairs. E.g. if you
have 2 values for key X in rdd A and 2 values for key X in rdd B, then
A.join(B) will have 4 records for key X
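a quick (untested) illustration:

val a = sc.parallelize(Seq(("x", 1), ("x", 2)))
val b = sc.parallelize(Seq(("x", 10), ("x", 20)))
a.join(b).collect()
// 4 records for key "x", in some order:
// (x,(1,10)), (x,(1,20)), (x,(2,10)), (x,(2,20))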
On Thu, Feb 19, 2015 at 3:39 PM, Darin McBeath ddmcbe...@yahoo.com.invalid
wrote:
Consider the following left outer
the more scalable alternative is to do a join (or a variant like cogroup,
leftOuterJoin, subtractByKey, etc., found in PairRDDFunctions).
the downside is this requires a shuffle of both your RDDs
On Thu, Feb 19, 2015 at 3:36 PM, Himanish Kushary himan...@gmail.com
wrote:
Hi,
I have two RDD's
The error msg is telling you the exact problem: it can't find
ProgramSIM, the thing you are trying to run:
Lost task 3520.3 in stage 0.0 (TID 11, compute3.research.dev):
java.io.IOException: Cannot run program ProgramSIM: error=2, No such
file or directory
On Thu, Feb 19, 2015 at 5:52 PM,
place I can find an example? I have never created my own RDD class
before (not an RDD instance :) ). But this is a very valuable approach to me
so I am eager to learn.
Regards,
Shuai
*From:* Imran Rashid [mailto:iras...@cloudera.com]
*Sent:* Monday, March 16, 2015 11:22 AM
*To:* Shawn Zheng; user
I'm not super familiar w/ S3, but I think the issue is that you want to use
a different output committer with object stores, which don't have a
simple move operation. There have been a few other threads on S3
outputcommitters. I think the most relevant for you is most probably this
open JIRA:
Interesting, on another thread, I was just arguing that the user should
*not* open the files themselves and read them, b/c then they lose all the
other goodies we have in HadoopRDD, e.g. the metric tracking.
I think this strengthens Pat's argument that we might actually need better
support for this
/enforce the partition in my own way.
Regards,
Shuai
On Wed, Mar 11, 2015 at 8:09 PM, Imran Rashid iras...@cloudera.com
wrote:
It should be *possible* to do what you want ... but if I understand you
right, there isn't really any very easy way to do it. I think you would
need to write your own
Hi Shuai,
On Sat, Mar 14, 2015 at 11:02 AM, Shawn Zheng szheng.c...@gmail.com wrote:
Sorry I responded late.
Zhan Zhang's solution is very interesting and I looked into it, but it is
not what I want. Basically I want to run the job sequentially and also gain
parallelism. So if possible, if
I think you should see some other errors before that, from
NettyBlockTransferService, with a msg like "Exception while beginning
fetchBlocks". There might be a bit more information there. There is an
assortment of possible causes, but first let's just make sure you have all
the details from the
Hi Thomas,
sorry for such a late reply. I don't have any super-useful advice, but
this seems like something that is important to follow up on. To answer
your immediate question: no, there should not be any hard limit to the
number of tasks that MapOutputTracker can handle. Though of course as
I think you are running into a combo of
https://issues.apache.org/jira/browse/SPARK-5928
and
https://issues.apache.org/jira/browse/SPARK-5945
The standard solution is to just increase the number of partitions you are
creating. textFile(), reduceByKey(), and sortByKey() all take an optional
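e.g., roughly like this (untested; the path and the 2000 are placeholders):

import org.apache.spark.SparkContext._   // pair-RDD functions (pre-1.3)
val lines  = sc.textFile("hdfs://path/to/input", 2000)        // minPartitions
val counts = lines.map(w => (w, 1)).reduceByKey(_ + _, 2000)  // numPartitions
val sorted = counts.map(_.swap).sortByKey(ascending = false, numPartitions = 2000)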
Hi Yong,
yes I think your analysis is correct. I'd imagine almost all serializers
out there will just convert a string to its utf-8 representation. You
might be interested in adding compression on top of a serializer, which
would probably bring the string size down in almost all cases, but then
is your data skewed? Could it be that there are a few keys with a huge
number of records? You might consider outputting
(recordA, count)
(recordB, count)
instead of
recordA
recordA
recordA
...
you could do this with:
val input = sc.textFile(...)
val pairsCounts = input.map{ x => (x, 1) }.reduceByKey(_ + _)
I am not entirely sure I understand your question -- are you saying:
* scoring a sample of 50k events is fast
* taking the top N scores of 77M events is slow, no matter what N is
?
if so, this shouldn't come as a huge surprise. You can't find the top
scoring elements (no matter how small N is)
don't break. I want
to benefit from the MapOutputTracker fix in 1.2.0.
On Tue, Mar 3, 2015 at 5:41 AM, Imran Rashid iras...@cloudera.com wrote:
the scala syntax for arrays is Array[T], not T[], so you want to use
something like:
kryo.register(classOf[Array[org.roaringbitmap.RoaringArray$Element
this is a very interesting use case. First of all, its worth pointing out
that if you really need to process the data sequentially, fundamentally you
are limiting the parallelism you can get. Eg., if you need to process the
entire data set sequentially, then you can't get any parallelism. If
Hi Jonathan,
you might be interested in https://issues.apache.org/jira/browse/SPARK-3655
(not yet available) and https://github.com/tresata/spark-sorted (not part
of spark, but it is available right now). Hopefully that's what you are
looking for. To the best of my knowledge that covers what is
did you forget to specify the main class w/ --class Main? though if that
was it, you should at least see *some* error message, so I'm confused
myself ...
On Wed, Mar 11, 2015 at 6:53 AM, Aung Kyaw Htet akh...@gmail.com wrote:
Hi Everyone,
I am developing a scala app, in which the main object
saying that it needs to be
registered (the error is only when I turn on kryo).
However the code is running smoothly with kryo turned off.
On Wed, Mar 11, 2015 at 5:38 PM, Imran Rashid iras...@cloudera.com
wrote:
I'm not sure what you mean. Are you asking how you can recompile all of
spark
It should be *possible* to do what you want ... but if I understand you
right, there isn't really any very easy way to do it. I think you would
need to write your own subclass of RDD, which has its own logic on how the
input files get divided among partitions. You can probably subclass
I think writing to hdfs and reading it back again is totally reasonable.
In fact, in my experience, writing to hdfs and reading back in actually
gives you a good opportunity to handle some other issues as well:
a) instead of just writing as an object file, I've found it's helpful to
write in a
the scala syntax for arrays is Array[T], not T[], so you want to use
something like:
kryo.register(classOf[Array[org.roaringbitmap.RoaringArray$Element]])
kryo.register(classOf[Array[Short]])
nonetheless, spark should take care of this itself. I'll look into it
later today.
On Mon, Mar 2, 2015
Why would you want to use spark to sequentially process your entire data
set? The entire purpose is to let you do distributed processing -- which
means letting partitions get processed simultaneously by different cores /
nodes.
that being said, occasionally in a bigger pipeline with a lot of
This doesn't involve spark at all, I think this is entirely an issue with
how scala deals w/ primitives and boxing. Often it can hide the details
for you, but IMO it just leads to far more confusing errors when things
don't work out. The issue here is that your map has value type Any, which
You can set the number of partitions dynamically -- it's just a parameter to
a method, so you can compute it however you want; it doesn't need to be
some static constant:
val dataSizeEstimate = yourMagicFunctionToEstimateDataSize()
val numberOfPartitions =
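e.g., one hypothetical way to finish that thought, targeting ~128MB per
partition (rdd and the byte-size estimate are assumptions):

val targetPartitionBytes = 128L * 1024 * 1024
val numberOfPartitions =
  math.max(1, (dataSizeEstimate / targetPartitionBytes).toInt)
val sorted = rdd.sortByKey(ascending = true, numPartitions = numberOfPartitions)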
sortByKey() is probably the easiest way:
import org.apache.spark.SparkContext._
joinedRdd.map{ case (word, (file1Counts, file2Counts)) => (file1Counts,
(word, file1Counts, file2Counts)) }.sortByKey()
On Mon, Feb 23, 2015 at 10:41 AM, Anupama Joshi anupama.jo...@gmail.com
wrote:
Hi ,
To
I think you're getting tripped up by lazy evaluation and the way stage
boundaries work (admittedly it's pretty confusing in this case).
It is true that up until recently, if you unioned two RDDs with the same
partitioner, the result did not have the same partitioner. But that was
just fixed here:
Hi Yiannis,
Broadcast variables are meant for *immutable* data. They are not meant for
data structures that you intend to update. (It might *happen* to work when
running local mode, though I doubt it, and it would probably be a bug if it
did. It will certainly not work when running on a
the spark history server and the yarn history server are totally
independent. Spark knows nothing about yarn logs, and vice versa, so
unfortunately there isn't any way to get all the info in one place.
On Tue, Feb 24, 2015 at 12:36 PM, Colin Kincaid Williams disc...@uw.edu
wrote:
Looks like in
Hi Tristan,
at first I thought you were just hitting another instance of
https://issues.apache.org/jira/browse/SPARK-1391, but I actually think it's
entirely related to kryo. Would it be possible for you to try serializing
your object using kryo, without involving spark at all? If you are
Hi Tushar,
The most scalable option is probably for you to consider doing some
approximation. E.g., sample the data first to come up with the bucket
boundaries. Then you can assign data points to buckets without needing to
do a full groupByKey. You could even have more passes which correct any
any chance your input RDD is being read from hdfs, and you are running into
this issue (in the docs on SparkContext#hadoopFile):
* '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable
object for each
* record, directly caching the returned RDD or directly passing it to an
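if so, the usual (untested) workaround is to copy out of the reused
Writable before caching, e.g. (path is hypothetical):

import org.apache.hadoop.io.{LongWritable, Text}
val safe = sc.sequenceFile("hdfs://path", classOf[LongWritable], classOf[Text])
  .map { case (k, v) => (k.get, v.toString) }   // real copies, safe to cache
  .cache()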
import org.apache.spark.storage.StorageLevel

val grouped = R.groupBy[VertexId](G).persist(StorageLevel.MEMORY_ONLY_SER)
// or whatever persistence makes more sense for you ...
var done = false
while (!done) {
  val res = grouped.flatMap(F)
  res.collect.foreach(func)
  if (criteria) done = true
}
On Thu, Feb 26, 2015 at 10:56 AM, Vijayasarathy Kannan
Hi Yong,
mostly correct except for:
- Since we are doing reduceByKey, shuffling will happen. Data will be
shuffled into 1000 partitions, as we have 1000 unique keys.
no, you will not get 1000 partitions. Spark has to decide how many
partitions to use before it even knows how many
no, it does not give you transitive dependencies. You'd have to walk the
tree of dependencies yourself, but that should just be a few lines.
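e.g., a rough (untested) sketch, relying on lineage being a DAG:

import org.apache.spark.rdd.RDD
def allDependencies(rdd: RDD[_]): Seq[RDD[_]] = {
  val parents = rdd.dependencies.map(_.rdd)
  (parents ++ parents.flatMap(allDependencies)).distinct
}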
On Thu, Feb 26, 2015 at 3:32 PM, Corey Nolet cjno...@gmail.com wrote:
I see the rdd.dependencies() function, does that include ALL the
dependencies of
you also need to register *array*s of MyObject. so change:
conf.registerKryoClasses(Array(classOf[MyObject]))
to
conf.registerKryoClasses(Array(classOf[MyObject], classOf[Array[MyObject]]))
On Wed, Mar 25, 2015 at 2:44 AM, donhoff_h 165612...@qq.com wrote:
Hi, experts
I wrote a very
On the worker side, it all happens in Executor. The task result is
computed here:
https://github.com/apache/spark/blob/b45059d0d7809a986ba07a447deb71f11ec6afe4/core/src/main/scala/org/apache/spark/executor/Executor.scala#L210
then it's serialized along with some other goodies, and finally sent
Those funny class names come from scala's specialization -- it's compiling a
different version of OpenHashMap for each primitive you stick in the type
parameter. Here's a super simple example:
$ more Foo.scala
class Foo[@specialized X]
$ scalac Foo.scala
$ ls
yes, it sounds like a good use of an accumulator to me
val counts = sc.accumulator(0L)
rdd.map { x =>
  counts += 1
  x
}.saveAsObjectFile(file2)
On Mon, Mar 30, 2015 at 12:08 PM, Wang, Ningjun (LNG-NPV)
ningjun.w...@lexisnexis.com wrote:
Sean
Yes I know that I can use persist() to persist
broadcast variables count towards spark.storage.memoryFraction, so they
use the same pool of memory as cached RDDs.
That being said, I'm really not sure why you are running into problems; it
seems like you have plenty of memory available. Most likely it's got
nothing to do with broadcast
Another issue is that HadoopRDD (which sc.textFile uses) might split input
files, and even if it doesn't split, it doesn't guarantee that part file
numbers go to the corresponding partition number in the rdd. E.g. part-0
could go to partition 27
On Apr 24, 2015 7:41 AM, Michal Michalski
You might be interested in https://issues.apache.org/jira/browse/SPARK-6593
and the discussion around the PRs.
This is probably more complicated than what you are looking for, but you
could copy the code for HadoopReliableRDD in the PR into your own code and
use it, without having to wait for the
Gerard is totally correct -- to expand a little more, I think what you want
to do is a solrInputDocumentJavaRDD.foreachPartition, instead of
solrInputDocumentJavaRDD.foreach:
solrInputDocumentJavaRDD.foreachPartition(
new VoidFunction<Iterator<SolrInputDocument>>() {
@Override
public void
can you give your entire spark submit command? Are you missing
--executor-cores num_cpu? Also, if you intend to use all 6 nodes, you
also need --num-executors 6
On Mon, May 4, 2015 at 2:07 AM, Xi Shen davidshe...@gmail.com wrote:
Hi,
I have two small RDD, each has about 600 records. In my
Are you setting a really large max buffer size for kryo?
Was this fixed by https://issues.apache.org/jira/browse/SPARK-6405 ?
If not, we should open up another issue to get a better warning in these
cases.
On Tue, May 5, 2015 at 2:47 AM, shahab shahab.mok...@gmail.com wrote:
Thanks Tristan
operation), implying there are too many
object references, though it's hard to see how I could get to 2b references
from a few million objects...
T
On 6 May 2015 at 00:58, Imran Rashid iras...@cloudera.com wrote:
Are you setting a really large max buffer size for kryo?
Was this fixed
oh wow, that is a really interesting observation, Marco & Jerry.
I wonder if this is worth exposing in combineByKey()? I think Jerry's
proposed workaround is all you can do for now -- use reflection to
side-step the fact that the methods you need are private.
On Mon, Apr 27, 2015 at 8:07 AM,
yes, you should register all three.
The truth is, you only *need* to register classes that will get serialized
-- either via RDD caching or in a shuffle. So if a type is only used as an
intermediate inside a stage, you don't need to register it. But the
overhead of registering extra classes is
Hi Marius,
I am also a little confused -- are you saying that myPartitions is
basically something like:
class MyPartitioner extends Partitioner {
def numPartitions = 1
def getPartition(key: Any) = 0
}
??
If so, I don't understand how you'd ever end up with data in two partitions.
Indeed, than
sortByKey() runs one job to sample the data, to determine what range of
keys to put in each partition.
There is a jira to change it to defer launching the job until the
subsequent action, but it will still execute another stage:
https://issues.apache.org/jira/browse/SPARK-1021
On Wed, Apr 29,
On Fri, May 15, 2015 at 5:09 PM, Thomas Gerber thomas.ger...@radius.com
wrote:
Now, we noticed that we get java heap OOM exceptions on the output tracker
when we have too many tasks. I wonder:
1. where does the map output tracker live? The driver? The master (when
those are not the same)?
2.
Most likely, you never call sc.stop().
Note that in 1.4, this will happen for you automatically in a shutdown
hook, taken care of by https://issues.apache.org/jira/browse/SPARK-3090
On Wed, May 13, 2015 at 8:04 AM, Yifan LI iamyifa...@gmail.com wrote:
Hi,
I have some applications
You can use sc.hadoopFile (or any of the variants) to do what you want.
They even let you reuse your existing Hadoop InputFormats. You should be
able to mimic your old usage with MR just fine. sc.textFile is just a
convenience method which sits on top.
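e.g., an (untested) sketch with the old-API TextInputFormat (path and
partition count are placeholders):

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat
val lines = sc.hadoopFile[LongWritable, Text, TextInputFormat]("hdfs://path", 4)
  .map { case (_, text) => text.toString }   // copy out of the reused Writable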
imran
On Fri, May 8, 2015 at 12:03 PM, tog
depends what you mean by output data. Do you mean:
* the data that is sent back to the driver? that is result size
* the shuffle output? that is in Shuffle Write Metrics
* the data written to a hadoop output format? that is in Output Metrics
On Thu, May 14, 2015 at 2:22 PM, yanwei
I'm not super familiar with this part of the code, but from taking a quick
look:
a) the code creates a MultivariateOnlineSummarizer, which stores 7 doubles
per feature (mean, max, min, etc. etc.)
b) The limit is on the result size from *all* tasks, not from one task.
You start with 3072 tasks
c)
Looks like this exception is after many more failures have occurred. It is
already on attempt 6 for stage 7 -- I'd try to find out why attempt 0
failed.
This particular exception is probably a result of corruption that can
happen when stages are retried, that I'm working on addressing in
Neither of those two. Instead, the shuffle data is cleaned up when the
stages they are from get GC'ed by the jvm. That is, when you are no longer
holding any references to anything which points to the old stages, and
there is an appropriate gc event.
The data is not cleaned up right after the
Rather than updating the broadcast variable, can't you simply create a
new one? When the old one can be gc'ed in your program, it will also get
gc'ed from spark's cache (and all executors).
I think this will make your code *slightly* more complicated, as you need
to add in another layer of
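roughly like this (untested; loadTable, rdd and running are hypothetical):

var current = sc.broadcast(loadTable())
while (running) {
  val table = current                    // stable reference for the closure
  val out = rdd.flatMap(x => table.value.get(x)).collect()
  current.unpersist()                    // drop the stale copy from executors
  current = sc.broadcast(loadTable())    // fresh broadcast with updated data
}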
Hi,
can you take a look at the logs and see what the first error you are
getting is? Its possible that the file doesn't exist when that error is
produced, but it shows up later -- I've seen similar things happen, but
only after there have already been some errors. But, if you see that in
the