Re: Is SPARK-3322 fixed in latest version of Spark?

2015-08-05 Thread Aaron Davidson
ConnectionManager has been deprecated and is no longer used by default (NettyBlockTransferService is the replacement). Hopefully you would no longer see these messages unless you have explicitly flipped it back on. On Tue, Aug 4, 2015 at 6:14 PM, Jim Green openkbi...@gmail.com wrote: And also

Re: S3 vs HDFS

2015-07-11 Thread Aaron Davidson
Note that if you use multi-part upload, each part becomes 1 block, which allows for multiple concurrent readers. One would typically use fixed-size block sizes which align with Spark's default HDFS block size (64 MB, I think) to ensure the reads are aligned. On Sat, Jul 11, 2015 at 11:14 AM,

Re: All master are unreponsive issue

2015-07-05 Thread Aaron Davidson
Are you seeing this after the app has already been running for some time, or just at the beginning? Generally, registration should only occur once initially, and a timeout would be due to the master not being accessible. Try telnetting to the master IP/port from the machine on which the driver will

Re: s3 bucket access/read file

2015-07-01 Thread Aaron Davidson
I think 2.6 failed to abruptly close streams that weren't fully read, which we observed as a huge performance hit. We had to backport the 2.7 improvements before being able to use it.

RE: ReduceByKey with a byte array as the key

2015-06-11 Thread Aaron Davidson
Be careful shoving arbitrary binary data into a string; invalid UTF characters can cause significant computational overhead in my experience. On Jun 11, 2015 10:09 AM, Mark Tse mark@d2l.com wrote: Makes sense – I suspect what you suggested should work. However, I think the overhead

Re: RDD resiliency -- does it keep state?

2015-03-28 Thread Aaron Davidson
Note that speculation is off by default to avoid these kinds of unexpected issues. On Sat, Mar 28, 2015 at 6:21 AM, Steve Loughran ste...@hortonworks.com wrote: It's worth adding that there's no guarantee that re-evaluated work would be on the same host as before, and in the case of node

Re: Spark will process _temporary folder on S3 is very slow and always cause failure

2015-03-17 Thread Aaron Davidson
Actually, this is the more relevant JIRA (which is resolved): https://issues.apache.org/jira/browse/SPARK-3595 6352 is about saveAsParquetFile, which is not in use here. Here is a DirectOutputCommitter implementation: https://gist.github.com/aarondav/c513916e72101bbe14ec and it can be

Re: Which OutputCommitter to use for S3?

2015-03-05 Thread Aaron Davidson
. Darin. - Original Message - From: Darin McBeath ddmcbe...@yahoo.com.INVALID To: Mingyu Kim m...@palantir.com; Aaron Davidson ilike...@gmail.com Cc: user@spark.apache.org user@spark.apache.org Sent: Monday, February 23, 2015 3:16 PM Subject: Re: Which OutputCommitter to use for S3

Re: Having lots of FetchFailedException in join

2015-03-05 Thread Aaron Davidson
could monitor each stage or task’s shuffle and GC status also system status to identify the problem. Thanks Jerry *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com] *Sent:* Thursday, March 5, 2015 2:32 PM *To:* Aaron Davidson *Cc:* user *Subject:* Re: Having lots

Re: Having lots of FetchFailedException in join

2015-03-03 Thread Aaron Davidson
Failed to connect implies that the executor at that host died, please check its logs as well. On Tue, Mar 3, 2015 at 11:03 AM, Jianshi Huang jianshi.hu...@gmail.com wrote: Sorry that I forgot the subject. And in the driver, I got many FetchFailedException. The error messages are 15/03/03

Re: Having lots of FetchFailedException in join

2015-03-03 Thread Aaron Davidson
) Jianshi On Wed, Mar 4, 2015 at 3:25 AM, Aaron Davidson ilike...@gmail.com wrote: Failed to connect implies that the executor at that host died, please check its logs as well. On Tue, Mar 3, 2015 at 11:03 AM, Jianshi Huang jianshi.hu...@gmail.com wrote: Sorry that I forgot the subject

Re: Problem getting program to run on 15TB input

2015-02-28 Thread Aaron Davidson
All stated symptoms are consistent with GC pressure (other nodes timeout trying to connect because of a long stop-the-world), quite possibly due to groupByKey. groupByKey is a very expensive operation as it may bring all the data for a particular partition into memory (in particular, it cannot

Re: Worker and Nodes

2015-02-21 Thread Aaron Davidson
Note that the parallelism (i.e., number of partitions) is just an upper bound on how much of the work can be done in parallel. If you have 200 partitions, then you can divide the work among anywhere from 1 to 200 cores and all resources will remain utilized. If you have more than 200 cores, though,
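The upper bound described above can be sketched with a little arithmetic. This is only an illustration in plain Scala; `ParallelismBound`, `busyCores` and `waves` are hypothetical names, not Spark APIs.

```scala
object ParallelismBound {
  /** Cores that can be busy at the same time: never more than there are partitions. */
  def busyCores(partitions: Int, cores: Int): Int = math.min(partitions, cores)

  /** Rounds ("waves") of tasks needed to work through every partition. */
  def waves(partitions: Int, cores: Int): Int =
    (partitions + cores - 1) / cores // ceiling division
}
```

With 200 partitions, 50 cores are all busy for four waves, while 300 cores leave 100 idle because only 200 tasks exist.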

Re: Which OutputCommitter to use for S3?

2015-02-21 Thread Aaron Davidson
Here is the class: https://gist.github.com/aarondav/c513916e72101bbe14ec You can use it by setting mapred.output.committer.class in the Hadoop configuration (or spark.hadoop.mapred.output.committer.class in the Spark configuration). Note that this only works for the old Hadoop APIs, I believe the

Re: RangePartitioner in Spark 1.2.1

2015-02-17 Thread Aaron Davidson
RangePartitioner does not actually provide a guarantee that all partitions will be equal sized (that is hard), and instead uses sampling to approximate equal buckets. Thus, it is possible that a bucket is left empty. If you want the specified behavior, you should define your own partitioner. It
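One way an empty bucket can arise is sketched below, assuming the range bounds come from a skewed sample in which two bounds coincide. This is a simplified stand-in in plain Scala, not Spark's actual RangePartitioner code; `RangeBuckets`, `bucketFor` and `bucketSizes` are hypothetical names.

```scala
object RangeBuckets {
  /** Index of the first bound that is >= key; keys above every bound go to the last bucket. */
  def bucketFor(key: Int, bounds: Seq[Int]): Int = {
    val i = bounds.indexWhere(key <= _)
    if (i == -1) bounds.length else i
  }

  /** Number of keys that land in each of the bounds.length + 1 buckets. */
  def bucketSizes(keys: Seq[Int], bounds: Seq[Int]): Seq[Int] = {
    val counts = Array.fill(bounds.length + 1)(0)
    keys.foreach(k => counts(bucketFor(k, bounds)) += 1)
    counts.toSeq
  }
}
```

With bounds `Seq(10, 10, 20)`, no key can ever fall into the second bucket, because the first bound already captures everything up to 10.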

Re: Shuffle write increases in spark 1.2

2015-02-15 Thread Aaron Davidson
I think Xuefeng Wu's suggestion is likely correct. This difference is more likely explained by the compression library changing versions than sort vs hash shuffle (which should not affect output size significantly). Others have reported that switching to lz4 fixed their issue. We should document

Re: Shuffle read/write issue in spark 1.2

2015-02-06 Thread Aaron Davidson
Did the problem go away when you switched to lz4? There was a change in the default compression codec from 1.0 to 1.1, where we went from LZF to Snappy. I don't think there was any such change from 1.1 to 1.2, though. On Fri, Feb 6, 2015 at 12:17 AM, Praveen Garg praveen.g...@guavus.com wrote:

Re: ephemeral-hdfs vs persistent-hdfs - performance

2015-02-04 Thread Aaron Davidson
The latter would be faster. With S3, you want to maximize number of concurrent readers until you hit your network throughput limits. On Wed, Feb 4, 2015 at 6:20 AM, Peter Rudenko petro.rude...@gmail.com wrote: Hi if i have a 10GB file on s3 and set 10 partitions, would it be download whole

Re: 2GB limit for partitions?

2015-02-03 Thread Aaron Davidson
To be clear, there is no distinction between partitions and blocks for RDD caching (each RDD partition corresponds to 1 cache block). The distinction is important for shuffling, where by definition N partitions are shuffled into M partitions, creating N*M intermediate blocks. Each of these blocks
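The N*M relationship is easy to underestimate, so here is a small worked calculation in plain Scala. `ShuffleBlocks` and `intermediateBlocks` are hypothetical names used only for illustration.

```scala
object ShuffleBlocks {
  /** Intermediate blocks created when N map-side partitions are shuffled into M reduce-side partitions. */
  def intermediateBlocks(mapPartitions: Int, reducePartitions: Int): Long =
    mapPartitions.toLong * reducePartitions.toLong // Long, since the product grows quickly
}
```

For example, shuffling 1,000 partitions into 2,000 partitions yields 2,000,000 intermediate blocks, whereas caching the same RDD involves only 1,000 blocks.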

Re: Duplicate key when sorting BytesWritable with Kryo?

2015-01-30 Thread Aaron Davidson
Ah, this is in particular an issue due to sort-based shuffle (it was not the case for hash-based shuffle, which would immediately serialize each record rather than holding many in memory at once). The documentation should be updated. On Fri, Jan 30, 2015 at 11:27 AM, Sandy Ryza

Re: performance of saveAsTextFile moving files from _temporary

2015-01-28 Thread Aaron Davidson
, but the job sits there as the moving of files happens. On Tue, Jan 27, 2015 at 7:24 PM, Aaron Davidson ilike...@gmail.com wrote: This renaming from _temporary to the final location is actually done by executors, in parallel, for saveAsTextFile. It should be performed by each task

Re: performance of saveAsTextFile moving files from _temporary

2015-01-27 Thread Aaron Davidson
This renaming from _temporary to the final location is actually done by executors, in parallel, for saveAsTextFile. It should be performed by each task individually before it returns. I have seen an issue similar to what you mention dealing with Hive code which did the renaming serially on the

Re: Lost task - connection closed

2015-01-26 Thread Aaron Davidson
It looks like something weird is going on with your object serialization, perhaps a funny form of self-reference which is not detected by ObjectOutputStream's typical loop avoidance. That, or you have some data structure like a linked list with a parent pointer and you have many thousand elements.

Re: Lost task - connection closed

2015-01-25 Thread Aaron Davidson
Please take a look at the executor logs (on both sides of the IOException) to see if there are other exceptions (e.g., OOM) which precede this one. Generally, the connections should not fail spontaneously. On Sun, Jan 25, 2015 at 10:35 PM, octavian.ganea octavian.ga...@inf.ethz.ch wrote: Hi,

Re: Spark 1.2 – How to change Default (Random) port ….

2015-01-25 Thread Aaron Davidson
This was a regression caused by Netty Block Transfer Service. The fix for this just barely missed the 1.2 release, and you can see the associated JIRA here: https://issues.apache.org/jira/browse/SPARK-4837 Current master has the fix, and the Spark 1.2.1 release will have it included. If you don't

Re: Spark 1.2 - com/google/common/base/Preconditions java.lang.NoClassDefFoundErro

2015-01-20 Thread Aaron Davidson
Spark's network-common package depends on guava as a provided dependency in order to avoid conflicting with other libraries (e.g., Hadoop) that depend on specific versions. com/google/common/base/Preconditions has been present in Guava since version 2, so this is likely a dependency not found

Re: Serializability: for vs. while loops

2015-01-15 Thread Aaron Davidson
Scala for-loops are implemented as closures using anonymous inner classes which are instantiated once and invoked many times. This means, though, that the code inside the loop is actually sitting inside a class, which confuses Spark's Closure Cleaner, whose job is to remove unused references from

Re: use netty shuffle for network cause high gc time

2015-01-13 Thread Aaron Davidson
What version are you running? I think spark.shuffle.use.netty was a valid option only in Spark 1.1, where the Netty stuff was strictly experimental. Spark 1.2 contains an officially supported and much more thoroughly tested version under the property spark.shuffle.blockTransferService, which is

Re: FileNotFoundException in appcache shuffle files

2015-01-10 Thread Aaron Davidson
As Jerry said, this is not related to shuffle file consolidation. The unique thing about this problem is that it's failing to find a file while trying to _write_ to it, in append mode. The simplest explanation for this would be that the file is deleted in between some check for existence and

Re: ERROR ConnectionManager: Corresponding SendingConnection to ConnectionManagerId

2015-01-08 Thread Aaron Davidson
Do note that this problem may be fixed in Spark 1.2, as we changed the default transfer service to use a Netty-based one rather than the ConnectionManager. On Thu, Jan 8, 2015 at 7:05 AM, Spidy yoni...@gmail.com wrote: Hi, Can you please explain which settings you changed? -- View

Re: RDDs being cleaned too fast

2014-12-10 Thread Aaron Davidson
The ContextCleaner uncaches RDDs that have gone out of scope on the driver. So it's possible that the given RDD is no longer reachable in your program's control flow, or else it'd be a bug in the ContextCleaner. On Wed, Dec 10, 2014 at 5:34 PM, ankits ankitso...@gmail.com wrote: I'm using spark

Re: Running two different Spark jobs vs multi-threading RDDs

2014-12-06 Thread Aaron Davidson
You can actually submit multiple jobs to a single SparkContext in different threads. In the case you mentioned with 2 stages having a common parent, both will wait for the parent stage to complete and then the two will execute in parallel, sharing the cluster resources. Solutions that submit

Re: Announcing Spark 1.1.1!

2014-12-03 Thread Aaron Davidson
Because this was a maintenance release, we should not have introduced any binary backwards or forwards incompatibilities. Therefore, applications that were written and compiled against 1.1.0 should still work against a 1.1.1 cluster, and vice versa. On Wed, Dec 3, 2014 at 1:30 PM, Andrew Or

Re: S3NativeFileSystem inefficient implementation when calling sc.textFile

2014-11-30 Thread Aaron Davidson
be interested in the new s3a filesystem in Hadoop 2.6.0 [1]. 1. https://issues.apache.org/jira/plugins/servlet/mobile#issue/HADOOP-10400 On Nov 26, 2014 12:24 PM, Aaron Davidson ilike...@gmail.com wrote: Spark has a known problem where it will do a pass of metadata on a large number of small

Re: S3NativeFileSystem inefficient implementation when calling sc.textFile

2014-11-26 Thread Aaron Davidson
Spark has a known problem where it will do a pass of metadata on a large number of small files serially, in order to find the partition information prior to starting the job. This will probably not be repaired by switching the FS impl. However, you can change the FS being used like so (prior to

Re: Bug in Accumulators...

2014-11-23 Thread Aaron Davidson
As Mohit said, making Main extend Serializable should fix this example. In general, it's not a bad idea to mark the fields you don't want to serialize (e.g., sc and conf in this case) as @transient as well, though this is not the issue in this case. Note that this problem would not have arisen in

Re: Given multiple .filter()'s, is there a way to set the order?

2014-11-14 Thread Aaron Davidson
In the situation you show, Spark will pipeline each filter together, and will apply each filter one at a time to each row, effectively constructing an && statement. You would only see a performance difference if the filter code itself is somewhat expensive; then you would want to only execute it on
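The effect of filter order can be sketched without Spark, assuming plain Scala iterators behave like pipelined filters in that each row passes through every filter before the next row is read. `FilterOrder`, `cheap` and `expensive` are hypothetical names.

```scala
object FilterOrder {
  /** Applies a cheap filter, then an expensive one, and reports how often the expensive one ran. */
  def expensiveCalls(rows: Seq[Int]): (Seq[Int], Int) = {
    var calls = 0
    def cheap(x: Int): Boolean = x % 2 == 0
    def expensive(x: Int): Boolean = { calls += 1; x % 3 == 0 } // stands in for costly logic
    // Iterator filters are lazy: each row goes through cheap first, and only survivors reach expensive
    val kept = rows.iterator.filter(cheap).filter(expensive).toList
    (kept, calls)
  }
}
```

For the rows 1 to 12, the expensive predicate runs only on the six even numbers, so putting the cheap, selective filter first halves the costly work in this case.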

Re: data locality, task distribution

2014-11-13 Thread Aaron Davidson
though in the times I mentioned - the list I gave (3.1 min, 2 seconds, ... 8 min) were not different runs with different cache %s, they were iterations within a single run with 100% caching. -Nathan On Thu, Nov 13, 2014 at 1:45 AM, Aaron Davidson ilike...@gmail.com wrote

Re: data locality, task distribution

2014-11-12 Thread Aaron Davidson
The fact that the caching percentage went down is highly suspicious. It should generally not decrease unless other cached data took its place, or unless executors were dying. Do you know if either of these were the case? On Tue, Nov 11, 2014 at 8:58 AM, Nathan Kronenfeld

Re: data locality, task distribution

2014-11-12 Thread Aaron Davidson
down within a run, with the same instance. I meant I'd run the whole app, and one time, it would cache 100%, and the next run, it might cache only 83% Within a run, it doesn't change. On Wed, Nov 12, 2014 at 11:31 PM, Aaron Davidson ilike...@gmail.com wrote: The fact that the caching

Re: Bug in Accumulators...

2014-11-07 Thread Aaron Davidson
This may be due in part to Scala allocating an anonymous inner class in order to execute the for loop. I would expect if you change it to a while loop like var i = 0; while (i < 10) { sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x); i += 1 } then the problem may go away. I am not

Re: Spark speed performance

2014-11-01 Thread Aaron Davidson
coalesce() is a streaming operation if used without the second parameter, it does not put all the data in RAM. If used with the second parameter (shuffle = true), then it performs a shuffle, but still does not put all the data in RAM. On Sat, Nov 1, 2014 at 12:09 PM, jan.zi...@centrum.cz wrote:

Re: Shuffle issues in the current master

2014-10-22 Thread Aaron Davidson
You may be running into this issue: https://issues.apache.org/jira/browse/SPARK-4019 You could check by having 2000 or fewer reduce partitions. On Wed, Oct 22, 2014 at 1:48 PM, DB Tsai dbt...@dbtsai.com wrote: PS, sorry for spamming the mailing list. Based on my knowledge, both

Re: Getting spark to use more than 4 cores on Amazon EC2

2014-10-22 Thread Aaron Davidson
Another wild guess: if your data is stored in S3, you might be running into an issue where the default jets3t properties limit the number of parallel S3 connections to 4. Consider increasing the max-thread-counts from here: http://www.jets3t.org/toolkit/configuration.html. On Tue, Oct 21, 2014

Re: input split size

2014-10-18 Thread Aaron Davidson
The minPartitions argument of textFile/hadoopFile cannot decrease the number of splits past the physical number of blocks/files. So if you have 3 HDFS blocks, asking for 2 minPartitions will still give you 3 partitions (hence the min). It can, however, convert a file with fewer HDFS blocks into
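The behaviour described here follows from how the old Hadoop `FileInputFormat` picks a split size, which, as far as I understand it, is `max(minSize, min(goalSize, blockSize))`. The sketch below applies that rule to a single file and deliberately ignores details such as the split slop factor and multi-file inputs; `SplitEstimate` and its methods are hypothetical names.

```scala
object SplitEstimate {
  /** Split size rule: max(minSize, min(goalSize, blockSize)), with goalSize = total / requested. */
  def splitSize(totalBytes: Long, requestedSplits: Int, blockBytes: Long, minBytes: Long = 1L): Long = {
    val goal = totalBytes / math.max(requestedSplits, 1)
    math.max(minBytes, math.min(goal, blockBytes))
  }

  /** Approximate number of splits for one file (simplified: no slop factor). */
  def estimatedSplits(totalBytes: Long, requestedSplits: Int, blockBytes: Long): Long = {
    val size = splitSize(totalBytes, requestedSplits, blockBytes)
    (totalBytes + size - 1) / size // ceiling division
  }
}
```

For a 192 MB file stored as three 64 MB blocks, asking for 2 partitions still gives 3 splits, because the split size is capped at the block size, while asking for 6 yields 6 splits of 32 MB each.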

Re: How to change the values in Array of Bytes

2014-09-06 Thread Aaron Davidson
More of a Scala question than Spark, but apply here can be written with just parentheses like this: val array = Array.fill[Byte](10)(0) if (array(index) == 0) { array(index) = 1 } The second instance of array(index) = 1 is actually not calling apply, but update. It's a scala-ism that's usually
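The two forms of sugar mentioned above can be written out explicitly. This is a minimal sketch in plain Scala; `ApplyUpdate` and `flipIfZero` are hypothetical names.

```scala
object ApplyUpdate {
  /** Sets the byte at index to 1 if it is currently 0, using the desugared method calls. */
  def flipIfZero(array: Array[Byte], index: Int): Array[Byte] = {
    // Reading array(index) is sugar for array.apply(index)
    if (array.apply(index) == 0) {
      // Writing array(index) = 1 is sugar for array.update(index, 1)
      array.update(index, 1)
    }
    array
  }
}
```

Any class that defines `apply` and `update` methods gets the same parenthesis syntax, not only `Array`.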

Re: error: type mismatch while Union

2014-09-06 Thread Aaron Davidson
Are you doing this from the spark-shell? You're probably running into https://issues.apache.org/jira/browse/SPARK-1199 which should be fixed in 1.1. On Sat, Sep 6, 2014 at 3:03 AM, Dhimant dhimant84.jays...@gmail.com wrote: I am using Spark version 1.0.2 -- View this message in context:

Re: question on replicate() in blockManager.scala

2014-09-06 Thread Aaron Davidson
Looks like that's BlockManagerWorker.syncPutBlock(), which is in an if check, perhaps obscuring its existence. On Fri, Sep 5, 2014 at 2:19 AM, rapelly kartheek kartheek.m...@gmail.com wrote: Hi, var cachedPeers: Seq[BlockManagerId] = null private def replicate(blockId: String, data:

Re: Getting the type of an RDD in spark AND pyspark

2014-09-06 Thread Aaron Davidson
Pretty easy to do in Scala: rdd.elementClassTag.runtimeClass You can access this method from Python as well by using the internal _jrdd. It would look something like this (warning, I have not tested it): rdd._jrdd.classTag().runtimeClass() (The method name is classTag for JavaRDDLike, and
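The mechanism behind `elementClassTag` is Scala's `ClassTag`, which can be tried without Spark. The sketch below uses an ordinary `Seq` as a stand-in for an RDD; `ElementClass.of` is a hypothetical helper.

```scala
import scala.reflect.{ClassTag, classTag}

object ElementClass {
  /** Returns the runtime class of the element type, captured by the compiler as a ClassTag. */
  def of[T: ClassTag](items: Seq[T]): Class[_] = classTag[T].runtimeClass
}
```

The element type is resolved at compile time and carried along as a `ClassTag`, so the lookup works even when the collection is empty.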

Re: Stage failure in BlockManager due to FileNotFoundException on long-running streaming job

2014-08-20 Thread Aaron Davidson
This is likely due to a bug in shuffle file consolidation (which you have enabled) which was hopefully fixed in 1.1 with this patch: https://github.com/apache/spark/commit/78f2af582286b81e6dc9fa9d455ed2b369d933bd Until 1.0.3 or 1.1 are released, the simplest solution is to disable

Re: s3:// sequence file startup time

2014-08-17 Thread Aaron Davidson
The driver must initially compute the partitions and their preferred locations for each part of the file, which results in a serial getFileBlockLocations() on each part. However, I would expect this to take several seconds, not minutes, to perform on 1000 parts. Is your driver inside or outside of

Re: Spark: why need a masterLock when sending heartbeat to master

2014-08-17 Thread Aaron Davidson
Yes, good point, I believe the masterLock is now unnecessary altogether. The reason for its initial existence was that changeMaster() originally could be called out-of-band of the actor, and so we needed to make sure the master reference did not change out from under us. Now it appears that all

Re: Iterator over RDD in PySpark

2014-08-01 Thread Aaron Davidson
rdd.toLocalIterator will do almost what you want, but requires that each individual partition fits in memory (rather than each individual line). Hopefully that's sufficient, though. On Fri, Aug 1, 2014 at 1:38 AM, Andrei faithlessfri...@gmail.com wrote: Is there a way to get iterator from RDD?

Re: Iterator over RDD in PySpark

2014-08-01 Thread Aaron Davidson
understand that returned byte array somehow corresponds to actual data, but how can I get it? On Fri, Aug 1, 2014 at 8:49 PM, Aaron Davidson ilike...@gmail.com wrote: rdd.toLocalIterator will do almost what you want, but requires that each individual partition fits in memory (rather than

Re: Spilling in-memory... messages in log even with MEMORY_ONLY

2014-07-27 Thread Aaron Davidson
I see. There should not be a significant algorithmic difference between those two cases, as far as I can think, but there is a good bit of local-mode-only logic in Spark. One typical problem we see on large-heap, many-core JVMs, though, is much more time spent in garbage collection. I'm not sure

Re: Configuring Spark Memory

2014-07-24 Thread Aaron Davidson
information here about executors but is ambiguous about whether there are single executors or multiple executors on each machine. This message from Aaron Davidson implies that the executor memory should be set to total available memory on the machine divided by the number of cores: *http

Re: What if there are large, read-only variables shared by all map functions?

2014-07-23 Thread Aaron Davidson
In particular, take a look at the TorrentBroadcast, which should be much more efficient than HttpBroadcast (which was the default in 1.0) for large files. If you find that TorrentBroadcast doesn't work for you, then another way to solve this problem is to place the data on all nodes' local disks,

Re: Spark 1.0.1 SQL on 160 G parquet file (snappy compressed, made by cloudera impala), 23 core and 60G mem / node, yarn-client mode, always failed

2014-07-21 Thread Aaron Davidson
What's the exception you're seeing? Is it an OOM? On Mon, Jul 21, 2014 at 11:20 AM, chutium teng@gmail.com wrote: Hi, unfortunately it is not so straightforward xxx_parquet.db is a folder of managed database created by hive/impala, so, every sub element in it is a table in

Re: which kind of BlockId should I use?

2014-07-20 Thread Aaron Davidson
Hm, this is not a public API, but you should theoretically be able to use TestBlockId if you like. Internally, we just use the BlockId's natural hashing and equality to do lookups and puts, so it should work fine. However, since it is in no way public API, it may change even in maintenance

Re: Large Task Size?

2014-07-15 Thread Aaron Davidson
for running union? Could that create larger task sizes? Kyle On Sat, Jul 12, 2014 at 7:50 PM, Aaron Davidson ilike...@gmail.com wrote: I also did a quick glance through the code and couldn't find anything worrying that should be included in the task closures. The only possibly unsanitary part

Re: Confused by groupByKey() and the default partitioner

2014-07-13 Thread Aaron Davidson
at 9:13 AM, Guanhua Yan gh...@lanl.gov wrote: Thanks, Aaron. I replaced groupByKey with reduceByKey along with some list concatenation operations, and found that the performance becomes even worse. So groupByKey is not that bad in my code. Best regards, - Guanhua From: Aaron Davidson ilike

Re: Confused by groupByKey() and the default partitioner

2014-07-12 Thread Aaron Davidson
Yes, groupByKey() does partition by the hash of the key unless you specify a custom Partitioner. (1) If you were to use groupByKey() when the data was already partitioned correctly, the data would indeed not be shuffled. Here is the associated code, you'll see that it simply checks that the
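Hash partitioning amounts to taking the key's hash code modulo the number of partitions and keeping the result non-negative. The sketch below shows that idea in plain Scala and is not Spark's own source; `HashBuckets` and `partitionFor` are hypothetical names.

```scala
object HashBuckets {
  /** Partition index for a key: its hashCode modulo numPartitions, shifted to be non-negative. */
  def partitionFor(key: Any, numPartitions: Int): Int = {
    val raw = key.hashCode % numPartitions
    if (raw < 0) raw + numPartitions else raw // % can be negative for negative hash codes
  }
}
```

Because the result depends only on the key and the partition count, two datasets partitioned the same way already have equal keys in the same partition, which is why no further shuffle is needed in that case.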

Re: KMeans for large training data

2014-07-12 Thread Aaron Davidson
The netlib.BLAS: Failed to load implementation warning only means that the BLAS implementation may be slower than using a native one. The reason why it only shows up at the end is that the library is only used for the finalization step of the KMeans algorithm, so your job should've been wrapping

Re: Convert from RDD[Object] to RDD[Array[Object]]

2014-07-12 Thread Aaron Davidson
If you don't really care about the batchedDegree, but rather just want to do operations over some set of elements rather than one at a time, then just use mapPartitions(). Otherwise, if you really do want certain sized batches and you are able to relax the constraints slightly, is to construct
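A function handed to `mapPartitions()` receives the whole partition as an iterator, so it can work on groups of elements instead of one at a time. The sketch below shows one such function in plain Scala, using fixed-size groups as an assumed example of what "some set of elements" might be; `Batching.batches` is a hypothetical helper, and the final batch may be smaller than requested.

```scala
object Batching {
  /** Groups a partition's elements into batches of at most `size`, lazily. */
  def batches[T](partition: Iterator[T], size: Int): Iterator[Seq[T]] =
    partition.grouped(size).map(_.toSeq)
}
```

Assuming an `rdd` of elements, this could be used as `rdd.mapPartitions(iter => Batching.batches(iter, 100))`; batches never span partition boundaries.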

Re: pyspark sc.parallelize running OOM with smallish data

2014-07-12 Thread Aaron Davidson
I think this is probably dying on the driver itself, as you are probably materializing the whole dataset inside your python driver. How large is spark_data_array compared to your driver memory? On Fri, Jul 11, 2014 at 7:30 PM, Mohit Jaggi mohitja...@gmail.com wrote: I put the same dataset into

Re: Putting block rdd failed when running example svm on large data

2014-07-12 Thread Aaron Davidson
Also check the web ui for that. Each iteration will have one or more stages associated with it in the driver web ui. On Sat, Jul 12, 2014 at 6:47 PM, crater cq...@ucmerced.edu wrote: Hi Xiangrui, Thanks for the information. Also, it is possible to figure out the execution time per

Re: Large Task Size?

2014-07-12 Thread Aaron Davidson
I also did a quick glance through the code and couldn't find anything worrying that should be included in the task closures. The only possibly unsanitary part is the Updater you pass in -- what is your Updater and is it possible it's dragging in a significant amount of extra state? On Sat, Jul

Re: java.lang.OutOfMemoryError (java.lang.OutOfMemoryError: GC overhead limit exceeded)

2014-07-08 Thread Aaron Davidson
There is a difference between actual GC overhead, which can be reduced by reusing objects, and this error, which actually means you ran out of memory. This error can probably be relieved by increasing your executor heap size, unless your data is corrupt and it is allocating huge arrays, or you are

Re: java.lang.OutOfMemoryError (java.lang.OutOfMemoryError: GC overhead limit exceeded)

2014-07-08 Thread Aaron Davidson
and they all compete for running GCs which slow things down and trigger the error you saw. By reducing the number of cores, there are more CPU resources available to a task so the GC could finish before the error gets thrown. HTH, Jerry On Tue, Jul 8, 2014 at 1:35 PM, Aaron Davidson ilike

Re: CoarseGrainedExecutorBackend: Driver Disassociated

2014-07-08 Thread Aaron Davidson
Hmm, looks like the Executor is trying to connect to the driver on localhost, from this line: 14/07/08 11:07:13 INFO CoarseGrainedExecutorBackend: Connecting to driver: akka.tcp://spark@localhost:39701/user/CoarseGrainedScheduler What is your setup? Standalone mode with 4 separate machines? Are

Re: Comparative study

2014-07-08 Thread Aaron Davidson
Not sure exactly what is happening but perhaps there are ways to restructure your program for it to work better. Spark is definitely able to handle much, much larger workloads. +1 @Reynold Spark can handle big big data. There are known issues with informing the user about what went wrong

Re: CoarseGrainedExecutorBackend: Driver Disassociated

2014-07-08 Thread Aaron Davidson
You actually should avoid setting SPARK_PUBLIC_DNS unless necessary, I thought you might have preemptively done so. I think the issue is actually related to your network configuration, as Spark probably failed to find your driver's ip address. Do you see a warning on the driver that looks

Re: CoarseGrainedExecutorBackend: Driver Disassociated

2014-07-08 Thread Aaron Davidson
By the way, you can run the sc.getConf.get("spark.driver.host") thing inside spark-shell, whether or not the Executors actually start up successfully. On Tue, Jul 8, 2014 at 8:23 PM, Aaron Davidson ilike...@gmail.com wrote: You actually should avoid setting SPARK_PUBLIC_DNS unless necessary, I

Re: Serialization of objects

2014-07-01 Thread Aaron Davidson
If you want to stick with Java serialization and need to serialize a non-Serializable object, your best choices are probably to either subclass it with a Serializable one or wrap it in a class of your own which implements its own writeObject/readObject methods (see here:
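The wrapper option can be sketched as follows, assuming the non-Serializable object can be rebuilt from a few primitive fields. `LegacyPoint` is a hypothetical stand-in for the third-party class, and `SerializablePoint` and `roundTrip` are illustrative names.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object WrapperDemo {
  // Hypothetical class that does not implement Serializable
  class LegacyPoint(val x: Int, val y: Int)

  // Wrapper that writes and rebuilds the wrapped object by hand
  class SerializablePoint(initial: LegacyPoint) extends Serializable {
    @transient private var point: LegacyPoint = initial // skipped by default serialization
    def get: LegacyPoint = point

    private def writeObject(out: ObjectOutputStream): Unit = {
      out.defaultWriteObject()
      out.writeInt(point.x)
      out.writeInt(point.y)
    }

    private def readObject(in: ObjectInputStream): Unit = {
      in.defaultReadObject()
      point = new LegacyPoint(in.readInt(), in.readInt())
    }
  }

  /** Serializes and deserializes the wrapper in memory with plain Java serialization. */
  def roundTrip(p: SerializablePoint): SerializablePoint = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(p)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    try in.readObject().asInstanceOf[SerializablePoint] finally in.close()
  }
}
```

The wrapped field is marked `@transient` so Java serialization never tries to write it directly; the two private methods carry its state across instead.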

Re: Failed to launch Worker

2014-07-01 Thread Aaron Davidson
Where are you running the spark-class version? Hopefully also on the workers. If you're trying to centrally start/stop all workers, you can add a slaves file to the spark conf/ directory which is just a list of your hosts, one per line. Then you can just use ./sbin/start-slaves.sh to start the

Re: org.jboss.netty.channel.ChannelException: Failed to bind to: master/1xx.xx..xx:0

2014-07-01 Thread Aaron Davidson
In your spark-env.sh, do you happen to set SPARK_PUBLIC_DNS or something of that kind? This error suggests the worker is trying to bind a server on the master's IP, which clearly doesn't make sense. On Mon, Jun 30, 2014 at 11:59 PM, MEETHU MATHEW meethu2...@yahoo.co.in wrote: Hi, I did

Re: Interconnect benchmarking

2014-06-27 Thread Aaron Davidson
A simple throughput test is also repartition()ing a large RDD. This also stresses the disks, though, so you might try to mount your spark temporary directory as a ramfs. On Fri, Jun 27, 2014 at 5:57 PM, danilopds danilob...@gmail.com wrote: Hi, According to the research paper below of

Re: Improving Spark multithreaded performance?

2014-06-26 Thread Aaron Davidson
I don't have specific solutions for you, but the general things to try are: - Decrease task size by broadcasting any non-trivial objects. - Increase duration of tasks by making them less fine-grained. How many tasks are you sending? I've seen in the past something like 25 seconds for ~10k total

Re: jsonFile function in SQLContext does not work

2014-06-25 Thread Aaron Davidson
Is it possible you have blank lines in your input? Not that this should be an error condition, but it may be what's causing it. On Wed, Jun 25, 2014 at 11:57 AM, durin m...@simon-schaefer.net wrote: Hi Zongheng Yang, thanks for your response. Reading your answer, I did some more tests and

Re: Changing log level of spark

2014-06-25 Thread Aaron Davidson
If you're using the spark-ec2 scripts, you may have to change /root/ephemeral-hdfs/conf/log4j.properties or something like that, as that is added to the classpath before Spark's own conf. On Wed, Jun 25, 2014 at 6:10 PM, Tobias Pfeiffer t...@preferred.jp wrote: I have a log4j.xml in

Re: Shark vs Impala

2014-06-23 Thread Aaron Davidson
Note that regarding a long load time, data format means a whole lot in terms of query performance. If you load all your data into compressed, columnar Parquet files on local hardware, Spark SQL would also perform far, far better than it would reading from gzipped S3 files. You must also be careful

Re: DAGScheduler: Failed to run foreach

2014-06-23 Thread Aaron Davidson
Please note that this: for (sentence <- sourcerdd) { ... } is actually Scala syntactic sugar which is converted into sourcerdd.foreach { sentence => ... } What this means is that this will actually run on the cluster, which is probably not what you want if you're trying to print them. Try
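The desugaring can be observed on any type that defines `foreach`. In the sketch below, `Sentences` is a hypothetical stand-in for an RDD that counts how often its `foreach` method is invoked; everything runs locally in plain Scala.

```scala
object ForSugar {
  // Hypothetical stand-in for an RDD: any type with a foreach method works in a for loop
  class Sentences(items: Seq[String]) {
    var foreachCalls = 0
    def foreach(f: String => Unit): Unit = { foreachCalls += 1; items.foreach(f) }
  }

  /** Collects every sentence using a for loop, which the compiler rewrites into source.foreach. */
  def collect(source: Sentences): List[String] = {
    val seen = scala.collection.mutable.ListBuffer.empty[String]
    for (sentence <- source) { seen += sentence } // same as source.foreach { sentence => seen += sentence }
    seen.toList
  }
}
```

Here the loop body runs locally, so the buffer fills up as expected. With a real RDD the body would be shipped to the executors, which is why output printed inside such a loop does not show up on the driver.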

Re: spark with docker: errors with akka, NAT?

2014-06-17 Thread Aaron Davidson
I remember having to do a similar thing in the spark docker scripts for testing purposes. Were you able to modify the /etc/hosts directly? I remember issues with that as docker apparently mounts it as part of its read-only filesystem. On Tue, Jun 17, 2014 at 4:36 PM, Mohit Jaggi

Re: Executors not utilized properly.

2014-06-17 Thread Aaron Davidson
repartition() is actually just an alias of coalesce(), but with the shuffle flag set to true. This shuffle is probably what you're seeing as taking longer, but it is required when you go from a smaller number of partitions to a larger. When actually decreasing the number of partitions,

Re: spark with docker: errors with akka, NAT?

2014-06-17 Thread Aaron Davidson
Yup, alright, same solution then :) On Tue, Jun 17, 2014 at 7:39 PM, Mohit Jaggi mohitja...@gmail.com wrote: I used --privileged to start the container and then unmounted /etc/hosts. Then I created a new /etc/hosts file On Tue, Jun 17, 2014 at 4:58 PM, Aaron Davidson ilike...@gmail.com

Re: long GC pause during file.cache()

2014-06-15 Thread Aaron Davidson
Note also that Java does not work well with very large JVMs due to this exact issue. There are two commonly used workarounds: 1) Spawn multiple (smaller) executors on the same machine. This can be done by creating multiple Workers (via SPARK_WORKER_INSTANCES in standalone mode[1]). 2) Use Tachyon

Re: How to specify executor memory in EC2 ?

2014-06-12 Thread Aaron Davidson
The scripts for Spark 1.0 actually specify this property in /root/spark/conf/spark-defaults.conf I didn't know that this would override the --executor-memory flag, though, that's pretty odd. On Thu, Jun 12, 2014 at 6:02 PM, Aliaksei Litouka aliaksei.lito...@gmail.com wrote: Yes, I am

Re: How can I make Spark 1.0 saveAsTextFile to overwrite existing file

2014-06-09 Thread Aaron Davidson
It is not a very good idea to save the results in the exact same place as the data. Any failures during the job could lead to corrupted data, because recomputing the lost partitions would involve reading the original (now-nonexistent) data. As such, the only safe way to do this would be to do as

Re: How to enable fault-tolerance?

2014-06-09 Thread Aaron Davidson
Looks like your problem is local mode: https://github.com/apache/spark/blob/640f9a0efefd42cff86aecd4878a3a57f5ae85fa/core/src/main/scala/org/apache/spark/SparkContext.scala#L1430 For some reason, someone decided not to do retries when running in local mode. Not exactly sure why, feel free to

Re: How can I make Spark 1.0 saveAsTextFile to overwrite existing file

2014-06-02 Thread Aaron Davidson
+1 please re-add this feature On Mon, Jun 2, 2014 at 12:44 PM, Patrick Wendell pwend...@gmail.com wrote: Thanks for pointing that out. I've assigned you to SPARK-1677 (I think I accidentally assigned myself way back when I created it). This should be an easy fix. On Mon, Jun 2, 2014 at

Re: hadoopRDD stalls reading entire directory

2014-06-02 Thread Aaron Davidson
/parcels/CDH/lib/hadoop/lib/native -Xms512m -Xmx512m org.apache.spark.deploy.worker.Worker spark://hivecluster2:7077 On Sun, Jun 1, 2014 at 7:41 PM, Aaron Davidson ilike...@gmail.com wrote: Sounds like you have two shells running, and the first one is taking all your resources. Do

Re: How can I make Spark 1.0 saveAsTextFile to overwrite existing file

2014-06-02 Thread Aaron Davidson
- files may be left over from previous saves, which is dangerous. Is this correct? On Mon, Jun 2, 2014 at 4:17 PM, Aaron Davidson ilike...@gmail.com wrote: +1 please re-add this feature On Mon, Jun 2, 2014 at 12:44 PM, Patrick Wendell pwend...@gmail.com wrote: Thanks for pointing

Re: Using Spark on Data size larger than Memory size

2014-06-01 Thread Aaron Davidson
There is no fundamental issue if you're running on data that is larger than cluster memory size. Many operations can stream data through, and thus memory usage is independent of input data size. Certain operations require an entire *partition* (not dataset) to fit in memory, but there are not many

Re: Akka disassociation on Java SE Embedded

2014-06-01 Thread Aaron Davidson
-- Chanwit Kaewkasi linkedin.com/in/chanwit On Wed, May 28, 2014 at 12:47 AM, Aaron Davidson ilike...@gmail.com wrote: Spark should effectively turn Akka's failure detector off, because we historically had problems with GCs and other issues causing disassociations. The only thing that should

Re: hadoopRDD stalls reading entire directory

2014-06-01 Thread Aaron Davidson
straightforward ways of producing assembly jars. On Sat, May 31, 2014 at 11:23 PM, Russell Jurney russell.jur...@gmail.com wrote: Thanks for the fast reply. I am running CDH 4.4 with the Cloudera Parcel of Spark 0.9.0, in standalone mode. On Saturday, May 31, 2014, Aaron Davidson ilike

Re: hadoopRDD stalls reading entire directory

2014-06-01 Thread Aaron Davidson
require that I know where $SPARK_HOME is. However, I have no idea. Any idea where that might be? On Sun, Jun 1, 2014 at 10:28 AM, Aaron Davidson ilike...@gmail.com wrote: Gotcha. The easiest way to get your dependencies to your Executors would probably be to construct your SparkContext

Re: hadoopRDD stalls reading entire directory

2014-06-01 Thread Aaron Davidson
. Then when I run rdd.first, I get this over and over: 14/06/01 18:35:40 WARN scheduler.TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory On Sun, Jun 1, 2014 at 3:09 PM, Aaron Davidson ilike

Re: Can anyone help me set memory for standalone cluster?

2014-06-01 Thread Aaron Davidson
In addition to setting the Standalone memory, you'll also need to tell your SparkContext to claim the extra resources. Set spark.executor.memory to 1600m as well. This should be a system property set in SPARK_JAVA_OPTS in conf/spark-env.sh (in 0.9.1, which you appear to be using) -- e.g., export

Re: Spark 1.0.0 - Java 8

2014-05-30 Thread Aaron Davidson
Also, the Spark examples can run out of the box on a single machine, as well as a cluster. See the Master URLs heading here: http://spark.apache.org/docs/latest/submitting-applications.html#master-urls On Fri, May 30, 2014 at 9:24 AM, Surendranauth Hiraman suren.hira...@velos.io wrote: With

Re: access hdfs file name in map()

2014-05-29 Thread Aaron Davidson
Currently there is not a way to do this using textFile(). However, you could pretty straightforwardly define your own subclass of HadoopRDD [1] in order to get access to this information (likely using mapPartitionsWithIndex to look up the InputSplit for a particular partition). Note that
