Re: [Spark 1.4.0]How to set driver's system property using spark-submit options?

2015-06-12 Thread Peng Cheng
Hi Andrew,

Thanks a lot! Indeed, they don't start with "spark."; the following properties
are read by the driver's implementation rather than by Spark conf:

--conf spooky.root=s3n://spooky- \
--conf spooky.checkpoint=s3://spooky-checkpoint \

This used to work from Spark 1.0.0 to 1.3.1. Do you know the new way to set
the same properties?
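
For reference, one workaround would be to smuggle them in as driver JVM options
and read them back as system properties (a minimal sketch; I haven't confirmed
this is the intended replacement in 1.4, and the property names are just mine):

--conf "spark.driver.extraJavaOptions=-Dspooky.root=s3n://spooky- -Dspooky.checkpoint=s3://spooky-checkpoint" \

and then, in the driver code:

val root = sys.props("spooky.root")
val checkpoint = sys.props("spooky.checkpoint")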

Yours Peng

On 12 June 2015 at 14:20, Andrew Or and...@databricks.com wrote:

 Hi Peng,

 Setting properties through --conf should still work in Spark 1.4. From the
 warning it looks like the config you are trying to set does not start with
 the prefix "spark.". What is the config that you are trying to set?

 -Andrew

 2015-06-12 11:17 GMT-07:00 Peng Cheng pc...@uow.edu.au:

 In Spark 1.3.x, the driver's system properties could be set with the --conf
 option, which was shared between setting Spark properties and system properties.

 In Spark 1.4.0 this feature was removed; the driver instead logs the following
 warning:

 Warning: Ignoring non-spark config property: xxx.xxx=v

 How do I set the driver's system properties in 1.4.0? Is there a reason this was
 removed without a deprecation warning?

 Thanks a lot for your advice.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-4-0-How-to-set-driver-s-system-property-using-spark-submit-options-tp23298.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: [Spark 1.4.0]How to set driver's system property using spark-submit options?

2015-06-12 Thread Peng Cheng
Thanks all for your information. Andrew, I dug out one of your old posts,
which is relevant:

http://apache-spark-user-list.1001560.n3.nabble.com/little-confused-about-SPARK-JAVA-OPTS-alternatives-td5798.html

But it didn't mention how to supply properties that don't start with "spark.".

On 12 June 2015 at 19:39, Ted Yu yuzhih...@gmail.com wrote:

 This is the SPARK JIRA which introduced the warning:

 [SPARK-7037] [CORE] Inconsistent behavior for non-spark config properties
 in spark-shell and spark-submit

 On Fri, Jun 12, 2015 at 4:34 PM, Peng Cheng rhw...@gmail.com wrote:

 Hi Andrew,

 Thanks a lot! Indeed, they don't start with "spark."; the following
 properties are read by the driver's implementation rather than by Spark conf:

 --conf spooky.root=s3n://spooky- \
 --conf spooky.checkpoint=s3://spooky-checkpoint \

 This used to work from Spark 1.0.0 to 1.3.1. Do you know the new way to
 set the same properties?

 Yours Peng

 On 12 June 2015 at 14:20, Andrew Or and...@databricks.com wrote:

 Hi Peng,

 Setting properties through --conf should still work in Spark 1.4. From
 the warning it looks like the config you are trying to set does not start
 with the prefix "spark.". What is the config that you are trying to set?

 -Andrew

 2015-06-12 11:17 GMT-07:00 Peng Cheng pc...@uow.edu.au:

 In Spark 1.3.x, the driver's system properties could be set with the --conf
 option, which was shared between setting Spark properties and system properties.

 In Spark 1.4.0 this feature was removed; the driver instead logs the following
 warning:

 Warning: Ignoring non-spark config property: xxx.xxx=v

 How do I set the driver's system properties in 1.4.0? Is there a reason this was
 removed without a deprecation warning?

 Thanks a lot for your advice.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-4-0-How-to-set-driver-s-system-property-using-spark-submit-options-tp23298.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org







[Spark 1.4.0]How to set driver's system property using spark-submit options?

2015-06-12 Thread Peng Cheng
In Spark 1.3.x, the driver's system properties could be set with the --conf
option, which was shared between setting Spark properties and system properties.

In Spark 1.4.0 this feature was removed; the driver instead logs the following
warning:

Warning: Ignoring non-spark config property: xxx.xxx=v

How do I set the driver's system properties in 1.4.0? Is there a reason this was
removed without a deprecation warning?

Thanks a lot for your advice.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-4-0-How-to-set-driver-s-system-property-using-spark-submit-options-tp23298.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: S3NativeFileSystem inefficient implementation when calling sc.textFile

2015-05-21 Thread Peng Cheng
I stumbled upon this thread, and I conjecture that this may affect restoring a
checkpointed RDD as well:

http://apache-spark-user-list.1001560.n3.nabble.com/Union-of-checkpointed-RDD-in-Apache-Spark-has-long-gt-10-hour-between-stage-latency-td22925.html#a22928

In my case I have 1600+ fragmented checkpoint files, and reading all of their
metadata takes a staggering 11 hours.

If this is really the cause then it's an obvious handicap: a checkpointed RDD
already has all of its file/partition information available and shouldn't need
to read it from S3 into the driver again (which causes a single point of failure
and a bottleneck).
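
To illustrate where the time seems to go, here is a minimal sketch of what the
restore effectively amounts to on the driver, assuming roughly one metadata
round trip per part file (the path, part-file naming and count are placeholders):

import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val checkpointDir = "s3n://spooky-checkpoint/some-uuid/rdd-198"   // placeholder path
val fs = FileSystem.get(new URI(checkpointDir), new Configuration())

val t0 = System.currentTimeMillis()
(0 until 1600).foreach { i =>
  // one S3 metadata request per fragment; with 1600+ fragments this adds up fast
  fs.getFileStatus(new Path(checkpointDir, "part-%05d".format(i)))
}
println(s"metadata pass took ${System.currentTimeMillis() - t0} ms")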



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/S3NativeFileSystem-inefficient-implementation-when-calling-sc-textFile-tp19841p22984.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Union of checkpointed RDD in Apache Spark has long (> 10 hour) between-stage latency

2015-05-17 Thread Peng Cheng
Looks like this problem has been mentioned before:

http://qnalist.com/questions/5666463/downloads-from-s3-exceedingly-slow-when-running-on-spark-ec2

and a temporary solution is to deploy on a dedicated EMR/S3 configuration.
I'll give that one a shot.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Union-of-checkpointed-RDD-in-Apache-Spark-has-long-10-hour-between-stage-latency-tp22925p22927.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Union of checkpointed RDD in Apache Spark has long (> 10 hour) between-stage latency

2015-05-17 Thread Peng Cheng
Turns out the above thread is unrelated: it was caused by using s3:// instead
of s3n://, which I had already avoided in my checkpointDir configuration.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Union-of-checkpointed-RDD-in-Apache-Spark-has-long-10-hour-between-stage-latency-tp22925p22928.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Union of checkpointed RDD in Apache Spark has long (> 10 hour) between-stage latency

2015-05-17 Thread Peng Cheng
BTW: my thread dump of the driver's main thread looks like it is stuck waiting
for Amazon S3 bucket metadata for a long time (which may suggest that I should
move the checkpointing directory from S3 to HDFS):

Thread 1: main (RUNNABLE) 
java.net.SocketInputStream.socketRead0(Native Method)
java.net.SocketInputStream.read(SocketInputStream.java:152)
java.net.SocketInputStream.read(SocketInputStream.java:122)
sun.security.ssl.InputRecord.readFully(InputRecord.java:442)
sun.security.ssl.InputRecord.read(InputRecord.java:480)
sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:934)
sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:891)
sun.security.ssl.AppInputStream.read(AppInputStream.java:102)
org.apache.http.impl.io.AbstractSessionInputBuffer.fillBuffer(AbstractSessionInputBuffer.java:160)
org.apache.http.impl.io.SocketInputBuffer.fillBuffer(SocketInputBuffer.java:84)
org.apache.http.impl.io.AbstractSessionInputBuffer.readLine(AbstractSessionInputBuffer.java:273)
org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:140)
org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:57)
org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:260)
org.apache.http.impl.AbstractHttpClientConnection.receiveResponseHeader(AbstractHttpClientConnection.java:283)
org.apache.http.impl.conn.DefaultClientConnection.receiveResponseHeader(DefaultClientConnection.java:251)
org.apache.http.impl.conn.AbstractClientConnAdapter.receiveResponseHeader(AbstractClientConnAdapter.java:223)
org.apache.http.protocol.HttpRequestExecutor.doReceiveResponse(HttpRequestExecutor.java:271)
org.apache.http.protocol.HttpRequestExecutor.execute(HttpRequestExecutor.java:123)
org.apache.http.impl.client.DefaultRequestDirector.tryExecute(DefaultRequestDirector.java:685)
org.apache.http.impl.client.DefaultRequestDirector.execute(DefaultRequestDirector.java:487)
org.apache.http.impl.client.AbstractHttpClient.doExecute(AbstractHttpClient.java:863)
org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82)
org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:57)
org.jets3t.service.impl.rest.httpclient.RestStorageService.performRequest(RestStorageService.java:326)
org.jets3t.service.impl.rest.httpclient.RestStorageService.performRequest(RestStorageService.java:277)
org.jets3t.service.impl.rest.httpclient.RestStorageService.performRestHead(RestStorageService.java:1038)
org.jets3t.service.impl.rest.httpclient.RestStorageService.getObjectImpl(RestStorageService.java:2250)
org.jets3t.service.impl.rest.httpclient.RestStorageService.getObjectDetailsImpl(RestStorageService.java:2179)
org.jets3t.service.StorageService.getObjectDetails(StorageService.java:1120)
org.jets3t.service.StorageService.getObjectDetails(StorageService.java:575)
org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:172)
sun.reflect.GeneratedMethodAccessor22.invoke(Unknown Source)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:606)
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:190)
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:103)
org.apache.hadoop.fs.s3native.$Proxy10.retrieveMetadata(Unknown Source)
org.apache.hadoop.fs.s3native.NativeS3FileSystem.getFileStatus(NativeS3FileSystem.java:414)
org.apache.spark.rdd.CheckpointRDD.getPreferredLocations(CheckpointRDD.scala:66)




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Union-of-checkpointed-RDD-in-Apache-Spark-has-long-10-hour-between-stage-latency-tp22925p22926.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



What are the likely causes of org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle?

2015-04-24 Thread Peng Cheng
I'm deploying a Spark data processing job on an EC2 cluster. The job is small
for the cluster (16 cores with 120G of RAM in total); the largest RDD has only
76k+ rows, but it is heavily skewed in the middle (thus requiring repartitioning)
and each row has around 100k of data after serialization. The job always gets
stuck in repartitioning; namely, it constantly hits the following errors and
retries:

org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output
location for shuffle

org.apache.spark.shuffle.FetchFailedException: Error in opening
FileSegmentManagedBuffer

org.apache.spark.shuffle.FetchFailedException:
java.io.FileNotFoundException: /tmp/spark-...

I've tried to identify the problem, but it seems like both the memory and disk
consumption of the machines throwing these errors are below 50%. I've also
tried different configurations, including:

- let driver/executor memory use 60% of total memory
- let Netty prioritize the JVM shuffle buffer
- increase the shuffle streaming buffer to 128m
- use KryoSerializer and max out all its buffers
- increase the shuffle memoryFraction to 0.4

But none of them works. The small job always triggers the same series of
errors and maxes out retries (up to 1000 times). How do I troubleshoot this
kind of situation?
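
For reference, this is roughly how I set the configurations above (a minimal
sketch; the property names are my best reading of the docs for the Spark version
I'm on, so please double-check them, and driver/executor memory itself is passed
via spark-submit):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("repartition-job")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryoserializer.buffer.max.mb", "512")     // max out Kryo buffers
  .set("spark.shuffle.memoryFraction", "0.4")           // shuffle memoryFraction
  .set("spark.shuffle.file.buffer.kb", "128")           // my guess at the "shuffle streaming buffer"
  .set("spark.shuffle.io.preferDirectBufs", "false")    // my guess at the Netty buffer knob
val sc = new SparkContext(conf)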

Thanks a lot if you have any clue.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/What-are-the-likely-causes-of-org-apache-spark-shuffle-MetadataFetchFailedException-Missing-an-outpu-tp22646.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Performance on Yarn

2015-04-20 Thread Peng Cheng
I got exactly the same problem, except that I'm running on a standalone
master. Can you tell me the counterpart parameter on a standalone master for
increasing the same memory overhead?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Performance-on-Yarn-tp21729p22580.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



How to avoid “Invalid checkpoint directory” error in apache Spark?

2015-04-17 Thread Peng Cheng
I'm using Amazon EMR + S3 as my Spark cluster infrastructure. I'm running a job
with periodic checkpointing (it has a long dependency tree, so truncating it by
checkpointing is mandatory; each checkpoint has 320 partitions). The job stops
halfway, resulting in an exception:

(On driver)
org.apache.spark.SparkException: Invalid checkpoint directory:
s3n://spooky-checkpoint/9e9dbddf-e5d8-478d-9b69-b5b966126d3c/rdd-198
at
org.apache.spark.rdd.CheckpointRDD.getPartitions(CheckpointRDD.scala:54)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
...

(On Executor)
15/04/17 22:00:14 WARN StorageService: Encountered 4 Internal Server
error(s), will retry in 800ms
15/04/17 22:00:15 WARN RestStorageService: Retrying request following error
response: PUT '/9e9dbddf-e5d8-478d-9b69-b5b966126d3c/rdd-198/part-00025' --
ResponseCode: 500, ResponseStatus: Internal Server Error
...

After manually checking the checkpointed files I found that
/9e9dbddf-e5d8-478d-9b69-b5b966126d3c/rdd-198/part-00025 is indeed missing
on S3. So my question is: if it is missing (perhaps due to an AWS malfunction),
why didn't Spark detect it immediately during the checkpointing process (so it
could be retried), instead of throwing an irrecoverable error stating that the
dependency tree is already lost? And how can I avoid this situation from
happening again?

I don't think this problem has been addressed before, because HDFS is assumed to
be immediately consistent (unlike S3, which is eventually consistent) and
extremely resilient. However, every component has a chance of breaking down;
can you share your best practices for checkpointing?
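
For context, the checkpointing setup looks roughly like this (a minimal sketch;
the bucket name and the transformations are placeholders for the real job):

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("periodic-checkpoint"))
sc.setCheckpointDir("s3n://spooky-checkpoint")   // placeholder bucket

var rdd = sc.parallelize(1 to 1000000, 320)
for (i <- 1 to 50) {
  rdd = rdd.map(_ + 1)            // stand-in for the real transformations
  if (i % 10 == 0) {
    rdd.checkpoint()              // truncate the long dependency tree periodically
    rdd.count()                   // force materialization so the checkpoint files are written
  }
}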



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-avoid-Invalid-checkpoint-directory-error-in-apache-Spark-tp22548.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Shuffle write increases in spark 1.2

2015-02-14 Thread Peng Cheng
I double-checked the 1.2 feature list and found out that the new sort-based
shuffle manager has nothing to do with HashPartitioner :-) Sorry for the
misinformation.

On the other hand, this may explain the increase in shuffle spill as a side
effect of the new shuffle manager. Let me revert spark.shuffle.manager to hash
and see if it makes things better (or worse, as the benchmark in
https://issues.apache.org/jira/browse/SPARK-3280 indicates).
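
The revert itself is just a one-line configuration change; a minimal sketch of
both ways of applying it:

import org.apache.spark.SparkConf

// in code, before the SparkContext is created
val conf = new SparkConf().set("spark.shuffle.manager", "hash")

// or, equivalently, at submission time:
//   spark-submit --conf spark.shuffle.manager=hash ...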



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Shuffle-write-increases-in-spark-1-2-tp20894p21657.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Shuffle write increases in spark 1.2

2015-02-14 Thread Peng Cheng
Same problem here: shuffle write increased from 10G to over 64G. Since I'm
running on Amazon EC2 this always causes the temporary folder to consume all the
disk space. Still looking for a solution.

BTW, the 64G shuffle write is encountered while shuffling a pairRDD with a
HashPartitioner, so it's not related to Spark 1.2.0's new features.
Yours Peng



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Shuffle-write-increases-in-spark-1-2-tp20894p21656.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Why does spark write huge file into temporary local disk even without on-disk persist or checkpoint?

2015-02-11 Thread Peng Cheng
You are right. I've checked the overall stage metrics, and it looks like the
largest shuffle write is over 9G. The partition completed successfully, but its
spilled file can't be removed until all the others are finished.
It's very likely caused by a stupid mistake in my design: a lookup table grows
constantly in a loop, and every time it is unioned with a new increment both of
them get reshuffled and the partitioner reverts to None. This can never be
efficient with the existing API.
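
The problematic pattern looks roughly like this (a minimal sketch with made-up
data; the point is that union discards the partitioner, so every iteration
reshuffles the whole table):

import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import org.apache.spark.SparkContext._

val sc = new SparkContext(new SparkConf().setAppName("growing-lookup"))

var lookup = sc.parallelize(Seq((1, "a"), (2, "b"))).partitionBy(new HashPartitioner(16))
for (step <- 1 to 100) {
  val increment = sc.parallelize(Seq((step, "new")))
  // union's result has partitioner == None, so the next reduceByKey reshuffles everything
  lookup = lookup.union(increment).reduceByKey(_ + _)
}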


Why does spark write huge file into temporary local disk even without on-disk persist or checkpoint?

2015-02-10 Thread Peng Cheng
I'm running a small job on a cluster with 15G of memory and 8G of disk per
machine.

The job always gets into a deadlock where the last error message is:

java.io.IOException: No space left on device
at java.io.FileOutputStream.writeBytes(Native Method)
at java.io.FileOutputStream.write(FileOutputStream.java:345)
at 
org.apache.spark.storage.DiskBlockObjectWriter$TimeTrackingOutputStream$$anonfun$write$3.apply$mcV$sp(BlockObjectWriter.scala:86)
at 
org.apache.spark.storage.DiskBlockObjectWriter.org$apache$spark$storage$DiskBlockObjectWriter$$callWithTiming(BlockObjectWriter.scala:221)
at 
org.apache.spark.storage.DiskBlockObjectWriter$TimeTrackingOutputStream.write(BlockObjectWriter.scala:86)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
at 
org.xerial.snappy.SnappyOutputStream.dumpOutput(SnappyOutputStream.java:300)
at 
org.xerial.snappy.SnappyOutputStream.rawWrite(SnappyOutputStream.java:247)
at 
org.xerial.snappy.SnappyOutputStream.write(SnappyOutputStream.java:107)
at 
java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)
at 
java.io.ObjectOutputStream$BlockDataOutputStream.writeByte(ObjectOutputStream.java:1914)
at 
java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1575)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:350)
at 
org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
at 
org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:195)
at 
org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$4$$anonfun$apply$2.apply(ExternalSorter.scala:751)
at 
org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$4$$anonfun$apply$2.apply(ExternalSorter.scala:750)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at 
org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$4.apply(ExternalSorter.scala:750)
at 
org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$4.apply(ExternalSorter.scala:746)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at 
org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:746)
at 
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:68)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

By the time it happens the shuffle write size is 0.0B and the input size
is 3.4MB. I wonder what operation could quickly eat up the entire 5G of
free disk space.

In addition, the storage level of the entire job is confined to
MEMORY_ONLY_SER and checkpointing is completely disabled.


Re: java.lang.IllegalStateException: unread block data

2015-02-02 Thread Peng Cheng
I got the same problem; maybe the Java serializer is unstable.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-IllegalStateException-unread-block-data-tp20668p21463.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



If an RDD appeared twice in a DAG, of which calculation is triggered by a single action, will this RDD be calculated twice?

2015-01-16 Thread Peng Cheng
I'm talking about RDD1 (not persisted or checkpointed) in this situation:

...(somewhere) -> RDD1 -> RDD2
                   |        |
                   V        V
                  RDD3 -> RDD4 -> Action!

In my experience, the chance that RDD1 gets recalculated is volatile: sometimes
it is computed once, sometimes twice. When calculating this RDD is expensive
(e.g. it involves a RESTful service that charges me money), this compels me to
persist RDD1, which takes extra memory; and since the Action! doesn't always
happen, I don't know when to unpersist it to free that memory.
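
The pattern I end up with looks roughly like this (a minimal sketch; the
expensive call is a stand-in for the paid REST service):

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("persist-demo"))

def expensiveCall(i: Int): Int = { Thread.sleep(10); i * 2 }   // stand-in for the paid REST call

val rdd1 = sc.parallelize(1 to 100).map(expensiveCall)
rdd1.persist()                        // guarantees the expensive step runs only once

val rdd3 = rdd1.map(_ + 1)
val rdd4 = rdd1.map(_ - 1).union(rdd3)
rdd4.count()                          // the Action!; if it never runs, rdd1 stays cached
rdd1.unpersist()                      // and I only know it is safe to call this after the action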

A related problem might be in SQLContext.jsonRDD(), since the source jsonRDD is
used twice (once for schema inference, once for reading the data). It almost
guarantees that the source jsonRDD is calculated twice. Has this problem been
addressed so far?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/If-an-RDD-appeared-twice-in-a-DAG-of-which-calculation-is-triggered-by-a-single-action-will-this-RDD-tp21192.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



If an RDD appeared twice in a DAG, of which calculation is triggered by a single action, will this RDD be calculated twice?

2015-01-16 Thread Peng Cheng
I'm talking about RDD1 (not persisted or checkpointed) in this situation:

...(somewhere) -> RDD1 -> RDD2
                   |        |
                   V        V
                  RDD3 -> RDD4 -> Action!

In my experience, the chance that RDD1 gets recalculated is volatile: sometimes
it is computed once, sometimes twice. When calculating this RDD is expensive
(e.g. it involves a RESTful service that charges me money), this compels me to
persist RDD1, which takes extra memory; and since the Action! doesn't always
happen, I don't know when to unpersist it to free that memory.

A related problem might be in SQLContext.jsonRDD(), since the source jsonRDD is
used twice (once for schema inference, once for reading the data). It almost
guarantees that the source jsonRDD is calculated twice. Is there a way to solve
(or circumvent) this problem?


Re: DeepLearning and Spark ?

2015-01-09 Thread Peng Cheng
Not if broadcast can only be used between stages. To enable this you have to
at least make broadcast asynchronous & non-blocking.

On 9 January 2015 at 18:02, Krishna Sankar ksanka...@gmail.com wrote:

 I am also looking at this domain. We could potentially use the broadcast
 capability in Spark to distribute the parameters. Haven't thought thru yet.
 Cheers
 k/

 On Fri, Jan 9, 2015 at 2:56 PM, Andrei faithlessfri...@gmail.com wrote:

 Does it makes sense to use Spark's actor system (e.g. via
 SparkContext.env.actorSystem) to create parameter server?

 On Fri, Jan 9, 2015 at 10:09 PM, Peng Cheng rhw...@gmail.com wrote:

 You are not the first :) and probably not the fifth to have this question.
 A parameter server is not included in the Spark framework, and I've seen all
 kinds of hacks to improvise one: REST APIs, HDFS, Tachyon, etc.
 Not sure if an 'official' benchmark & implementation will be released
 soon.

 On 9 January 2015 at 10:59, Marco Shaw marco.s...@gmail.com wrote:

 Pretty vague on details:


 http://www.datasciencecentral.com/m/blogpost?id=6448529%3ABlogPost%3A227199


 On Jan 9, 2015, at 11:39 AM, Jaonary Rabarisoa jaon...@gmail.com
 wrote:

 Hi all,

 Deep learning algorithms are popular and achieve state-of-the-art
 performance in several real-world machine learning problems. Currently
 there is no DL implementation in Spark, and I wonder if there is ongoing
 work on this topic.

 We can do DL in Spark with Sparkling Water and H2O, but this adds an
 additional software stack.

 Deeplearning4j seems to implement a distributed version of many
 popular DL algorithms. Porting DL4j to Spark could be interesting.

 Google describes an implementation of large-scale DL in this paper:
 http://research.google.com/archive/large_deep_networks_nips2012.html,
 based on model parallelism and data parallelism.

 So I'm trying to imagine what a good design for a DL algorithm in Spark
 should be. Spark already has RDDs (for data parallelism). Can GraphX be
 used for the model parallelism (as DNNs are generally designed as DAGs)? And
 what about using GPUs for local parallelism (a mechanism to push partitions
 into GPU memory)?


 What do you think about this ?


 Cheers,

 Jao







Re: DeepLearning and Spark ?

2015-01-09 Thread Peng Cheng
You are not the first :) and probably not the fifth to have this question.
A parameter server is not included in the Spark framework, and I've seen all
kinds of hacks to improvise one: REST APIs, HDFS, Tachyon, etc.
Not sure if an 'official' benchmark & implementation will be released soon.

On 9 January 2015 at 10:59, Marco Shaw marco.s...@gmail.com wrote:

 Pretty vague on details:

 http://www.datasciencecentral.com/m/blogpost?id=6448529%3ABlogPost%3A227199


 On Jan 9, 2015, at 11:39 AM, Jaonary Rabarisoa jaon...@gmail.com wrote:

 Hi all,

 Deep learning algorithms are popular and achieve state-of-the-art
 performance in several real-world machine learning problems. Currently
 there is no DL implementation in Spark, and I wonder if there is ongoing
 work on this topic.

 We can do DL in Spark with Sparkling Water and H2O, but this adds an
 additional software stack.

 Deeplearning4j seems to implement a distributed version of many popular
 DL algorithms. Porting DL4j to Spark could be interesting.

 Google describes an implementation of large-scale DL in this paper:
 http://research.google.com/archive/large_deep_networks_nips2012.html,
 based on model parallelism and data parallelism.

 So I'm trying to imagine what a good design for a DL algorithm in Spark
 should be. Spark already has RDDs (for data parallelism). Can GraphX be used
 for the model parallelism (as DNNs are generally designed as DAGs)? And what
 about using GPUs for local parallelism (a mechanism to push partitions into
 GPU memory)?


 What do you think about this ?


 Cheers,

 Jao




Re: Is it possible to do incremental training using ALSModel (MLlib)?

2015-01-02 Thread Peng Cheng
I was under the impression that ALS wasn't designed for it :-) The famous eBay
online recommender uses SGD.
However, you can try using the previous model as a starting point and gradually
reduce the number of iterations once the model stabilizes. I have never verified
this idea, so you need to at least cross-validate it before putting it into
production.

On 2 January 2015 at 04:40, Wouter Samaey wouter.sam...@storefront.be
wrote:

 Hi all,

 I'm curious about MLlib and if it is possible to do incremental training on
 the ALSModel.

 Usually training is run first, and then you can query. But in my case, data
 is collected in real time and I want the predictions of my ALSModel to
 consider the latest data without a complete re-training phase.

 I've checked out these resources, but could not find any info on how to
 solve this:
 https://spark.apache.org/docs/latest/mllib-collaborative-filtering.html

 http://ampcamp.berkeley.edu/big-data-mini-course/movie-recommendation-with-mllib.html

 My question fits in a larger picture where I'm using Prediction IO, and
 this
 in turn is based on Spark.

 Thanks in advance for any advice!

 Wouter



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Is-it-possible-to-do-incremental-training-using-ALSModel-MLlib-tp20942.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




spark-repl_1.2.0 was not uploaded to central maven repository.

2014-12-20 Thread Peng Cheng
Everything else is there except spark-repl. Can someone check that out this
weekend?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-repl-1-2-0-was-not-uploaded-to-central-maven-repository-tp20799.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org




Re: Spark on Tachyon

2014-12-20 Thread Peng Cheng
IMHO: cache doesn't provide redundancy, and it's in the same JVM, so it's much
faster.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-on-Tachyon-tp1463p20800.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



How to extend an one-to-one RDD of Spark that can be persisted?

2014-12-04 Thread Peng Cheng
In my project I extend RDD with a new type that wraps another RDD and some
metadata. The code I use is similar to the FilteredRDD implementation:

case class PageRowRDD(
    self: RDD[PageRow],
    @transient keys: ListSet[KeyLike] = ListSet()
  ) extends RDD[PageRow](self) {   // extends RDD so the overrides below apply

  override def getPartitions: Array[Partition] =
    firstParent[PageRow].partitions

  override val partitioner = self.partitioner

  override def compute(split: Partition, context: TaskContext) =
    firstParent[PageRow].iterator(split, context)
}
However, when I try to persist it and reuse it in 2 transformations, my logs and
debugging show that it is being computed twice rather than being reused in
memory.

The problem is: there is no such problem for FilteredRDD. How do I avoid
this?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-extend-an-one-to-one-RDD-of-Spark-that-can-be-persisted-tp20394.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



How to make sure a ClassPath is always shipped to workers?

2014-11-03 Thread Peng Cheng
I have a Spark application that serializes an object 'Seq[Page]' and saves it to
HDFS/S3, where it is read back by another worker to be used elsewhere. The
serialization and deserialization use the same serializer as Spark itself
(obtained from SparkEnv.get.serializer.newInstance()).

However I sporadically get this error:

java.lang.ClassNotFoundException: org.***.***.Page
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:274)
at
org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:59)
at 
java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
at sun.reflect.GeneratedMethodAccessor2.invoke(Unknown Source)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at 
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)

It seems like the Page class wasn't shipped with the jar to the executor, and
all of its information was erased at runtime.

The weirdest thing: this error doesn't always happen. Sometimes the old
Seq[Page] is read back properly, sometimes it throws the exception. How could
this happen, and how do I fix it?
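
For reference, this is roughly how the jar containing Page gets declared (a
minimal sketch; the path is a placeholder, and I normally rely on spark-submit
to ship the assembly jar):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("page-reader")
  // explicitly list the jar that contains the Page class, instead of relying on spark-submit alone
  .setJars(Seq("/path/to/app-assembly.jar"))   // placeholder path
val sc = new SparkContext(conf)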

Yours Peng



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-make-sure-a-ClassPath-is-always-shipped-to-workers-tp18018.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



How to make sure a ClassPath is always shipped to workers?

2014-11-03 Thread Peng Cheng
I have a Spark application that serializes an object 'Seq[Page]' and saves it to
HDFS/S3, where it is read back by another worker to be used elsewhere. The
serialization and deserialization use the same serializer as Spark itself
(obtained from SparkEnv.get.serializer.newInstance()).

However I sporadically get this error:

java.lang.ClassNotFoundException: org.***.***.Page
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:274)
at
org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:59)
at 
java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
at sun.reflect.GeneratedMethodAccessor2.invoke(Unknown Source)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at 
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)

It seems like the Page class wasn't shipped with the jar to the executor, and
all of its information was erased at runtime.

The weirdest thing: this error doesn't always happen. Sometimes the old
Seq[Page] is read back properly, sometimes it throws the exception. How could
this happen, and how do I fix it?

Yours Peng



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-make-sure-a-ClassPath-is-always-shipped-to-workers-tp18019.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to make sure a ClassPath is always shipped to workers?

2014-11-03 Thread Peng Cheng
Sorry, it's a timeout duplicate; please remove it.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-make-sure-a-ClassPath-is-always-shipped-to-workers-tp18018p18020.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Asynchronous Broadcast from driver to workers, is it possible?

2014-10-21 Thread Peng Cheng
Looks like the only way is to implement that feature. There is no way of
hacking it into working.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Asynchronous-Broadcast-from-driver-to-workers-is-it-possible-tp15758p16985.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Asynchronous Broadcast from driver to workers, is it possible?

2014-10-06 Thread Peng Cheng
Any suggestions? I'm thinking of submitting a feature request for mutable
broadcast. Is it doable?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Asynchronous-Broadcast-from-driver-to-workers-is-it-possible-tp15758p15807.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Asynchronous Broadcast from driver to workers, is it possible?

2014-10-04 Thread Peng Cheng
While Spark already offers support for asynchronous reduce (collecting data from
workers without interrupting execution of a parallel transformation) through
accumulators, I have made little progress on making this process reciprocal,
namely broadcasting data from the driver to the workers to be used by all
executors in the middle of a transformation. This is primarily intended to be
used in downpour SGD/AdaGrad, a non-blocking concurrent machine learning
optimizer that performs better than the existing synchronous GD in MLlib and has
vast application in the training of many models.

My attempt so far is to stick to the out-of-the-box, immutable broadcast and
open a new thread on the driver, in which I broadcast a thin data wrapper that,
when deserialized, inserts its value into a mutable singleton that is already
replicated to all workers in the fat jar. This customized deserialization is not
hard; just override readObject like this:

import java.io.ObjectInputStream

class AutoInsert(var value: Int) extends Serializable {

  WorkerContainer.last = value   // WorkerContainer is the mutable singleton baked into the fat jar

  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()
    WorkerContainer.last = this.value
  }
}
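
The driver-side attempt then looks roughly like this (a sketch; sc is the
SparkContext and the interval is arbitrary):

// runs alongside the long transformation, re-broadcasting the latest value
new Thread(new Runnable {
  override def run(): Unit = {
    var i = 0
    while (true) {
      Thread.sleep(1000)
      i += 1
      sc.broadcast(new AutoInsert(i))   // hoping deserialization on the workers updates the singleton
    }
  }
}).start()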

Unfortunately, it looks like the deserialization is called lazily and won't do
anything before a worker uses the Broadcast[AutoInsert], which is impossible
without waiting for the workers' stage to finish and broadcasting again. I'm
wondering if I can 'hack' this thing into working, or whether I'll have to write
a serious extension to the broadcast component to enable changing the value.

Hope I can find like-minded people on this forum, because ML is a selling point
of Spark.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Asynchronous-Broadcast-from-driver-to-workers-is-it-possible-tp15758.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Crawler and Scraper with different priorities

2014-09-09 Thread Peng Cheng
Hi Sandeep,

Would you be interested in joining my open source project?

https://github.com/tribbloid/spookystuff

IMHO Spark is indeed not meant for general-purpose crawling, where the
distributed jobs are highly homogeneous, but it is good enough for directional
scraping, which involves heterogeneous input and deep graph following &
extraction. Please drop me a line if you have a use case, as I'll try to
integrate it as a feature.

Yours Peng



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Crawler-Scraper-with-different-priorities-tp13645p13838.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Bug or feature? Overwrite broadcasted variables.

2014-08-19 Thread Peng Cheng
Unfortunately, after some research I found it's just a side effect of how a
closure containing a var works in Scala:
http://stackoverflow.com/questions/11657676/how-does-scala-maintains-the-values-of-variable-when-the-closure-was-defined

The closure keeps referring to the var holding the broadcast wrapper as a
pointer until it is shipped to the nodes, which is only triggered lazily. So you
can't do this after shipping has already started (e.g. changing the broadcast
value in a new thread while an action is running). It's neither a feature nor a
bug, just an illusion.
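
A tiny example of the underlying Scala behaviour (no Spark involved):

var greeting = "broad"
val f = () => greeting      // the closure captures the variable, not its current value
greeting = "cast"
println(f())                // prints "cast" -- whatever the var holds when the closure runs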

I would really like to see a non-blocking Broadcast.set() implemented; it would
make a lot of stochastic algorithms easier to write.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Bug-or-feature-Overwrite-broadcasted-variables-tp12315p12382.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Bug or feature? Overwrite broadcasted variables.

2014-08-18 Thread Peng Cheng
I'm curious to see that if you declare the broadcast wrapper as a var and
overwrite it in the driver program, the modification can have a stable impact on
all transformations/actions that were defined BEFORE the overwrite but executed
lazily AFTER the overwrite:

val a = sc.parallelize(1 to 10)

var broadcasted = sc.broadcast("broad")

val b = a.map(_ + broadcasted.value)
// b.persist()
for (line <- b.collect()) { print(line) }

println("\n===")
broadcasted = sc.broadcast("cast")

for (line <- b.collect()) { print(line) }

the result is:

1broad2broad3broad4broad5broad6broad7broad8broad9broad10broad
===
1cast2cast3cast4cast5cast6cast7cast8cast9cast10cast

Of course, if you persist b before the overwrite you will still get the
non-surprising result (both lines are 1broad...10broad, because b is persisted).
This can be useful sometimes but may cause confusion at other times (people can
no longer add persist at will just as a backup, because it may change the
result).

So far I've found no documentation supporting this feature, so can someone
confirm that it's a deliberately designed feature?

Yours Peng 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Bug-or-feature-Overwrite-broadcasted-variables-tp12315.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Bug or feature? Overwrite broadcasted variables.

2014-08-18 Thread Peng Cheng
Yeah, thanks a lot. I know that for people who understand lazy execution this
seems straightforward, but for those who don't it may become a liability.

I've only tested its stability on a small example (which seems stable);
hopefully it's not just serendipity. Can a committer confirm this?

Yours Peng



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Bug-or-feature-Overwrite-broadcasted-variables-tp12315p12348.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory

2014-06-27 Thread Peng Cheng
I give up; communication must be blocked by the complex EC2 network topology
(though the error message indeed needs some improvement). It doesn't make sense
to run a client thousands of miles away that communicates frequently with the
workers. I have moved everything to EC2 now.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/TaskSchedulerImpl-Initial-job-has-not-accepted-any-resources-check-your-cluster-UI-to-ensure-that-woy-tp8247p8444.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Integrate spark-shell into officially supported web ui/api plug-in? What do you think?

2014-06-27 Thread Peng Cheng
This would be handy for demos and quick prototyping, as the command-line REPL
doesn't support a lot of editor features; also, you wouldn't need to ssh into
your worker/master if your client is behind a NAT wall. Since the Spark codebase
has a minimalistic design philosophy, I don't think this component can make it
into the main repository. However, it could be an independent project that is
also supported by the community (like Solr/Elasticsearch to Lucene).

I've reviewed and tested a few REPL web UIs, including:
- Scala-notebook: https://github.com/Bridgewater/scala-notebook
- Tinsmiths: https://github.com/kouphax/tinsmith
- IScala: https://github.com/mattpap/IScala
- Codebrew: https://codebrew.io/

However, they are either too heavyweight or their ILoop is buried very deep
(sometimes even in another library). I'm interested in working on this part;
has anyone experimented with a similar solution before?

Yours Peng



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Integrate-spark-shell-into-officially-supported-web-ui-api-plug-in-What-do-you-think-tp8447.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Integrate spark-shell into officially supported web ui/api plug-in? What do you think?

2014-06-27 Thread Peng Cheng
That would be really cool with IPython, but I'm still wondering whether all
language features are supported; namely, I need these 2 in particular:
1. importing classes and an ILoop from external jars (so I can point it to
SparkILoop or the Spark-binding ILoop of Apache Mahout instead of Scala's
default ILoop)
2. implicit typecasts/wrappers and implicit variables (widely used in
SparkContext.scala)
I'll be able to start experimenting immediately if someone can confirm these
features (see the sketch below for what I have in mind for point 1).
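
A minimal sketch of the kind of embedding I mean, using the plain Scala ILoop
(swapping in SparkILoop or another ILoop subclass is exactly the part I'd need
the web UI to allow):

import scala.tools.nsc.Settings
import scala.tools.nsc.interpreter.ILoop

object EmbeddedRepl {
  def main(args: Array[String]): Unit = {
    val settings = new Settings
    settings.usejavacp.value = true   // interpret against the JVM classpath, so external jars are visible
    new ILoop().process(settings)     // replace ILoop with SparkILoop (or another subclass) here
  }
}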



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Integrate-spark-shell-into-officially-supported-web-ui-api-plug-in-What-do-you-think-tp8447p8469.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Spark slave fail to start with wierd error information

2014-06-25 Thread Peng Cheng
Sorry, I just realized that start-slave is for a different task. Please close
this one.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-slave-fail-to-start-with-wierd-error-information-tp8203p8246.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory

2014-06-25 Thread Peng Cheng
I'm running a very small job (16 partitions, 2 stages) on a 2-node cluster,
each node with 15G of memory. The master page looks all normal:

URL: spark://ec2-54-88-40-125.compute-1.amazonaws.com:7077
Workers: 1
Cores: 2 Total, 2 Used
Memory: 13.9 GB Total, 512.0 MB Used
Applications: 1 Running, 0 Completed
Drivers: 0 Running, 1 Completed
Status: ALIVE
Workers

Id                                                         Address                             State  Cores       Memory
worker-20140625083124-ip-172-31-35-57.ec2.internal-54548   ip-172-31-35-57.ec2.internal:54548  ALIVE  2 (2 Used)  13.9 GB (512.0 MB Used)

Running Applications

ID                   Name                                            Cores  Memory per Node  Submitted Time       User  State    Duration
app-20140625083158-  org.tribbloid.spookystuff.example.GoogleImage$  2      512.0 MB         2014/06/25 08:31:58  peng  RUNNING  17 min

However when submitting the job in client mode:

$SPARK_HOME/bin/spark-submit \
--class org.tribbloid.spookystuff.example.GoogleImage \
--master spark://ec2-54-88-40-125.compute-1.amazonaws.com:7077 \
--deploy-mode client \
./../../../target/spookystuff-example-assembly-0.1.0-SNAPSHOT.jar \

it is never picked up by any worker, despite 13.4G of memory and 2 cores in
total being available. The driver log repeatedly shows:

14/06/25 04:46:29 WARN TaskSchedulerImpl: Initial job has not accepted any
resources; check your cluster UI to ensure that workers are registered and
have sufficient memory

Looks like it's either a bug or misinformation. Can someone confirm this so I
can submit a JIRA?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/TaskSchedulerImpl-Initial-job-has-not-accepted-any-resources-check-your-cluster-UI-to-ensure-that-woy-tp8247.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Using Spark as web app backend

2014-06-25 Thread Peng Cheng
Totally agree; also, there is a class 'SparkSubmit' that you can call directly
to replace the shell script.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Using-Spark-as-web-app-backend-tp8163p8248.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory

2014-06-25 Thread Peng Cheng
Expanded to 4 nodes and changed the workers to listen on the public DNS, but it
still shows the same error (which is obviously wrong). I can't believe I'm the
first to encounter this issue.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/TaskSchedulerImpl-Initial-job-has-not-accepted-any-resources-check-your-cluster-UI-to-ensure-that-woy-tp8247p8285.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Spark slave fail to start with wierd error information

2014-06-24 Thread Peng Cheng
I'm trying to link a Spark slave to an already-set-up master, using:

$SPARK_HOME/sbin/start-slave.sh spark://ip-172-31-32-12:7077

However, the result shows that it cannot open a log file that it is supposed to
create:

failed to launch org.apache.spark.deploy.worker.Worker:
tail: cannot open
'/opt/spark/spark-1.0.0-bin-hadoop1/sbin/../logs/spark-ubuntu-org.apache.spark.deploy.worker.Worker-spark://ip-172-31-32-12:7077-ip-172-31-36-80.out'
for reading: No such file or directory
full log in
/opt/spark/spark-1.0.0-bin-hadoop1/sbin/../logs/spark-ubuntu-org.apache.spark.deploy.worker.Worker-spark://ip-172-31-32-12:7077-ip-172-31-36-80.out
(ignore this line as the log file is not there)

What happened here?




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-slave-fail-to-start-with-wierd-error-information-tp8203.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Spark slave fail to start with wierd error information

2014-06-24 Thread Peng Cheng
I haven't set up passwordless login from the slave to the master node yet (I was
under the impression that this is not necessary, since they communicate using
port 7077).



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-slave-fail-to-start-with-wierd-error-information-tp8203p8204.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: ElasticSearch enrich

2014-06-24 Thread Peng Cheng
Make sure all queries are called through class methods and wrap your query info
in a class having only simple properties (strings, collections, etc.). If you
can't find such a wrapper you can also use the SerializableWritable wrapper out
of the box, but it's not recommended (it's a developer API and makes fat
closures that run slowly).
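
A minimal sketch of what I mean (the wrapper and the client call are
hypothetical placeholders; the point is that only simple fields end up in the
closure and the connection is created inside the task):

import org.apache.spark.rdd.RDD

// only plain fields, so the closure stays small and serializable
case class EsQuery(host: String, port: Int, index: String, queryJson: String) extends Serializable

object EsLookup {
  // the client is created (and closed) here, on the executor, never captured from the driver
  def search(q: EsQuery): Seq[String] = {
    // open a client against q.host:q.port, run q.queryJson on q.index, close it -- placeholder body
    Seq.empty
  }
}

def enrich(docs: RDD[String], host: String, port: Int, index: String): RDD[(String, Seq[String])] =
  docs.map { doc => (doc, EsLookup.search(EsQuery(host, port, index, doc))) }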



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/ElasticSearch-enrich-tp8209p8214.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: How to Reload Spark Configuration Files

2014-06-24 Thread Peng Cheng
I've read somewhere that in 1.0 there is a bash tool called 'spark-config.sh'
that allows you to propagate your config files to a number of master and slave
nodes. However, I haven't used it myself.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-Reload-Spark-Configuration-Files-tp8159p8219.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Upgrading to Spark 1.0.0 causes NoSuchMethodError

2014-06-24 Thread Peng Cheng
I got a 'NoSuchFieldError', which is of the same type. It's definitely a
dependency jar conflict: the Spark driver loads its own jars, which in recent
versions pull in many dependencies that are 1-2 years old. If your newer
dependency is in the same package it will be shadowed (Java's
first-come-first-served classloading) and the new method won't be found. Try
using

mvn dependency:tree

to find duplicate artifacts, and use the maven-shade-plugin to rename (relocate)
the package of your newer library. (IntelliJ doesn't officially support this
plugin, so it may become quirky; if that happens, try re-importing the project.)



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Upgrading-to-Spark-1-0-0-causes-NoSuchMethodError-tp8207p8220.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: ElasticSearch enrich

2014-06-24 Thread Peng Cheng
I'm afraid persisting a connection across two tasks is a dangerous act, as they
can't be guaranteed to be executed on the same machine. Your ES server may think
it's a man-in-the-middle attack!

I think it's possible to invoke a static method that gives you a connection from
a local 'pool', so nothing will sneak into your closure, but it's too complex
and there should be a better option.

I've never used Kryo before; if it's that good perhaps we should use it as the
default serializer.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/ElasticSearch-enrich-tp8209p8222.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Does PUBLIC_DNS environment parameter really works?

2014-06-24 Thread Peng Cheng
I'm deploying a cluster to Amazon EC2 and trying to override its internal IP
addresses with public DNS names.

I start a cluster with the environment variable SPARK_PUBLIC_DNS=[my EC2
public DNS],

but it doesn't change anything on the web UI; it still shows the internal IP
address:

Spark Master at spark://ip-172-31-32-12:7077



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Does-PUBLIC-DNS-environment-parameter-really-works-tp8237.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Spark throws NoSuchFieldError when testing on cluster mode

2014-06-22 Thread Peng Cheng
Right problem solved in a most disgraceful manner. Just add a package
relocation in maven shade config.
The downside is that it is not compatible with my IDE (IntelliJ IDEA), will
cause:

Error:scala.reflect.internal.MissingRequirementError: object scala.runtime
in compiler mirror not found.: object scala.runtime in compiler mirror not
found.

and all Scala object inspections fail and are marked as errors. So I'm still
looking for an alternative.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-throws-NoSuchFieldError-when-testing-on-cluster-mode-tp8064p8088.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Spark throws NoSuchFieldError when testing on cluster mode

2014-06-21 Thread Peng Cheng
Thanks a lot! Let me check my maven shade plugin config and see if there is a
fix



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-throws-NoSuchFieldError-when-testing-on-cluster-mode-tp8064p8073.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Spark throws NoSuchFieldError when testing on cluster mode

2014-06-21 Thread Peng Cheng
Indeed I see a lot of duplicate package warnings in the maven-shade assembly
output, so I tried to eliminate them:

First I set the scope of the apache-spark dependency to 'provided', as suggested
on this page:
http://spark.apache.org/docs/latest/submitting-applications.html

But the Spark master gave me a blunt dependency-not-found error:
Exception in thread "main" java.lang.NoClassDefFoundError:
scala/collection/Seq
at ... [my main object]

Then I reverted it to 'compile' to see if things got better, but after that I
again saw duplicate packages, and then random errors (NoSuchFieldError,
IllegalStateException, etc.)

Is setting scope = 'provided' mandatory for deployment? I merely removed this
line for debugging locally.
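
For what it's worth, the same 'provided' idea in sbt (Scala) syntax, as a sketch with an illustrative version number: the dependency is visible at compile time but left out of the assembled jar, because spark-submit supplies Spark (and the Scala library it bundles) on the classpath at runtime. The flip side is that if nothing supplies them at runtime, e.g. when launching the main class directly instead of via spark-submit, you get exactly this kind of NoClassDefFoundError.

// build.sbt (sketch): compile against Spark, but keep it out of the uber jar
// because spark-submit provides it at runtime.
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.0.0" % "provided"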



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-throws-NoSuchFieldError-when-testing-on-cluster-mode-tp8064p8076.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Spark throws NoSuchFieldError when testing on cluster mode

2014-06-21 Thread Peng Cheng
Latest advancement:
I found the cause of the NoClassDef exception: I wasn't using spark-submit;
instead I tried to run the Spark application directly, with SparkConf set in
the code (this is handy for local debugging). However the old problem remains:
even though my maven-shade plugin doesn't give any duplicate warning, it still
gives me the same error:

14/06/21 16:43:59 ERROR ExecutorUncaughtExceptionHandler: Uncaught exception
in thread Thread[Executor task launch worker-2,5,main]
java.lang.NoSuchFieldError: INSTANCE
at org.apache.http.entity.ContentType.parse(ContentType.java:229)
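
For anyone else skipping spark-submit, a minimal sketch of shipping the application jar yourself (master URL and jar path are placeholders); this avoids the NoClassDef problem, though it does not by itself fix a conflict with a library that Spark's own assembly already bundles:

import org.apache.spark.{SparkConf, SparkContext}

// Running the driver directly: point the executors at your shaded jar explicitly.
val conf = new SparkConf()
  .setAppName("debug-run")
  .setMaster("spark://master-host:7077")
  .setJars(Seq("target/myapp-uber.jar"))
val sc = new SparkContext(conf)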



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-throws-NoSuchFieldError-when-testing-on-cluster-mode-tp8064p8078.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Spark throws NoSuchFieldError when testing on cluster mode

2014-06-21 Thread Peng Cheng
I also found that any buggy application submitted with --deploy-mode=cluster
will crash the worker (turning its status to 'DEAD'). This shouldn't really
happen, otherwise nobody would use this mode. It is still unclear whether all
workers crash or only the one running the driver (as I only have one worker).



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-throws-NoSuchFieldError-when-testing-on-cluster-mode-tp8064p8079.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Spark throws NoSuchFieldError when testing on cluster mode

2014-06-21 Thread Peng Cheng
Hi Sean,

OK, I'm about 90% sure about the cause of this problem: just another classic
dependency conflict:
My project -> Selenium -> apache.httpcomponents:httpcore 4.3.1 (has ContentType)
Spark -> Spark SQL Hive -> Hive -> Thrift -> apache.httpcomponents:httpcore
4.1.3 (has no ContentType)

Though I generated an uber jar excluding Spark/Shark as 'provided', and it does
include the latest httpcore 4.3, spark-submit by default loads Spark's own uber
jar first and then the application's, so unfortunately my dependency was
shadowed. I hope the class loading order can be changed (which is very unlikely
unless someone submits a JIRA), but in the worst case I can only resort to the
dumb way: manually renaming packages with the maven-shade plugin.

That will be the plan for tomorrow. However, I'm wondering if there is a
'clean solution': some plugin that automagically keeps packages in different
versions side by side, or detects conflicts and renames them to aliases?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-throws-NoSuchFieldError-when-testing-on-cluster-mode-tp8064p8083.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: What is the best way to handle transformations or actions that take forever?

2014-06-17 Thread Peng Cheng
I've tried enabling speculative execution; this seems to have partially solved
the problem. However, I'm not sure it can handle large-scale situations, as
speculation only starts when 75% of the tasks have finished.
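
For reference, that 75% figure is the default spark.speculation.quantile; a minimal sketch of tuning the knobs as of Spark 1.x (the values are illustrative, not recommendations):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.speculation", "true")
  .set("spark.speculation.quantile", "0.50")    // start speculating once 50% of tasks in a stage finish (default 0.75)
  .set("spark.speculation.multiplier", "1.5")   // a task must be this many times slower than the median to be re-launched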



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/What-is-the-best-way-to-handle-transformations-or-actions-that-takes-forever-tp7664p7752.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


What is the best way to handle transformations or actions that take forever?

2014-06-16 Thread Peng Cheng
My transformations and actions have some external tool dependencies, and
sometimes they just get stuck somewhere with no way for me to fix them. If I
don't want the job to run forever, do I need to implement monitor threads that
throw an exception when a task gets stuck, or can the framework already handle
that?
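
One pattern that might help, sketched under the assumption that a per-record timeout is acceptable: wrap the external call in a Future and bound the wait. callExternalTool is a hypothetical stand-in and the 60-second timeout is arbitrary:

import scala.concurrent.{Await, Future, TimeoutException}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical stand-in for the external tool that sometimes hangs.
def callExternalTool(record: String): String = record.toUpperCase

// Run the body on a helper thread and give up after the timeout. Note: the hung
// thread is not killed, it merely stops blocking the task; combine this with
// task retries and/or speculation for stronger protection.
def withTimeout[T](timeout: Duration)(body: => T): T =
  Await.result(Future(body), timeout)

val sc = new SparkContext(new SparkConf().setAppName("timeout-sketch").setMaster("local[2]"))
val results = sc.parallelize(Seq("a", "b", "c")).map { record =>
  try withTimeout(60.seconds) { callExternalTool(record) }
  catch { case _: TimeoutException => "TIMED_OUT" }   // or rethrow to let Spark fail and retry the task
}
results.collect().foreach(println)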



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/What-is-the-best-way-to-handle-transformations-or-actions-that-takes-forever-tp7664.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: spark1.0 spark sql saveAsParquetFile Error

2014-06-09 Thread Peng Cheng
I wasn't using Spark SQL before, but by default Spark should retry a failed
task several times (spark.task.maxFailures defaults to 4). I'm curious why it
aborted after a single failure.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark1-0-spark-sql-saveAsParquetFile-Error-tp7006p7252.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: How to enable fault-tolerance?

2014-06-09 Thread Peng Cheng
I speculate that Spark will only retry on exceptions that are registered with
the TaskSetScheduler, so a definitely-will-fail task will fail quickly without
taking more resources. However, I haven't found any documentation or web page
about it.
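
For what it's worth, the retry budget on a real cluster is configurable; a minimal sketch (the value 8 is arbitrary). Plain local mode is a separate story, as discussed further down the thread:

import org.apache.spark.SparkConf

// A task may fail up to spark.task.maxFailures times (default 4) before the stage is aborted.
val conf = new SparkConf().set("spark.task.maxFailures", "8")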



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-enable-fault-tolerance-tp7250p7255.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Occasional failed tasks

2014-06-09 Thread Peng Cheng
I think those failed tasks must have been retried automatically if you can't see
any errors in your results. Otherwise the entire application would throw a
SparkException and abort.

Unfortunately I don't know how to achieve this; my application always aborts.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Occasional-failed-tasks-tp527p7259.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: How to enable fault-tolerance?

2014-06-09 Thread Peng Cheng
Thanks a lot! That's very responsive. Somebody has definitely encountered the
same problem before and added two hidden modes to the master URL:


(from SparkContext.scala, line 1431)

   // Regular expression for local[N, maxRetries], used in tests with failing tasks
   val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+)\s*,\s*([0-9]+)\]""".r
   // Regular expression for simulating a Spark cluster of [N, cores, memory] locally
   val LOCAL_CLUSTER_REGEX = """local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*]""".r


Unfortunately they never got pushed into the documentation, and you got 
config parameters scattered in two different places (masterURL and 
$spark.task.maxFailures).
I'm thinking of adding a new config parameter 
$spark.task.maxLocalFailures to override 1, how do you think?
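
A minimal sketch of actually using the hidden form (thread and retry counts are arbitrary):

import org.apache.spark.{SparkConf, SparkContext}

// 'local[4, 3]': 4 local worker threads, and each task gets up to 3 attempts
// before the job aborts; plain 'local[4]' gives up after the first failure.
val sc = new SparkContext(new SparkConf().setAppName("local-retries-sketch").setMaster("local[4, 3]"))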


Thanks again buddy.

Yours Peng

On Mon 09 Jun 2014 01:33:45 PM EDT, Aaron Davidson wrote:

Looks like your problem is local mode:
https://github.com/apache/spark/blob/640f9a0efefd42cff86aecd4878a3a57f5ae85fa/core/src/main/scala/org/apache/spark/SparkContext.scala#L1430

For some reason, someone decided not to do retries when running in
local mode. Not exactly sure why, feel free to submit a JIRA on this.


On Mon, Jun 9, 2014 at 8:59 AM, Peng Cheng pc...@uow.edu.au wrote:

I speculate that Spark will only retry on exceptions that are
registered with
TaskSetScheduler, so a definitely-will-fail task will fail quickly
without
taking more resources. However I haven't found any documentation
or web page
on it



--
View this message in context:

http://apache-spark-user-list.1001560.n3.nabble.com/How-to-enable-fault-tolerance-tp7250p7255.html
Sent from the Apache Spark User List mailing list archive at
Nabble.com.




Re: How to enable fault-tolerance?

2014-06-09 Thread Peng Cheng

Oh, and to make things worse, they forgot '\*' in their regex.
Am I the first to encounter this problem?

On Mon 09 Jun 2014 02:24:43 PM EDT, Peng Cheng wrote:

Thanks a lot! That's very responsive, somebody definitely has
encountered the same problem before, and added two hidden modes in
masterURL:

(from SparkContext.scala, line 1431)

   // Regular expression for local[N, maxRetries], used in tests with failing tasks
   val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+)\s*,\s*([0-9]+)\]""".r
   // Regular expression for simulating a Spark cluster of [N, cores, memory] locally
   val LOCAL_CLUSTER_REGEX = """local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*]""".r

Unfortunately they never got pushed into the documentation, and you
got config parameters scattered in two different places (masterURL and
$spark.task.maxFailures).
I'm thinking of adding a new config parameter
$spark.task.maxLocalFailures to override 1, how do you think?

Thanks again buddy.

Yours Peng

On Mon 09 Jun 2014 01:33:45 PM EDT, Aaron Davidson wrote:

Looks like your problem is local mode:
https://github.com/apache/spark/blob/640f9a0efefd42cff86aecd4878a3a57f5ae85fa/core/src/main/scala/org/apache/spark/SparkContext.scala#L1430


For some reason, someone decided not to do retries when running in
local mode. Not exactly sure why, feel free to submit a JIRA on this.


On Mon, Jun 9, 2014 at 8:59 AM, Peng Cheng pc...@uow.edu.au wrote:

I speculate that Spark will only retry on exceptions that are
registered with
TaskSetScheduler, so a definitely-will-fail task will fail quickly
without
taking more resources. However I haven't found any documentation
or web page
on it



--
View this message in context:

http://apache-spark-user-list.1001560.n3.nabble.com/How-to-enable-fault-tolerance-tp7250p7255.html

Sent from the Apache Spark User List mailing list archive at
Nabble.com.




Re: How to enable fault-tolerance?

2014-06-09 Thread Peng Cheng
Hi Matei, yeah, you are right that this is very niche (my use case is a web
crawler), but I'm glad you also like an additional property. Let me open a
JIRA.


Yours Peng

On Mon 09 Jun 2014 03:08:29 PM EDT, Matei Zaharia wrote:

If this is a useful feature for local mode, we should open a JIRA to document 
the setting or improve it (I’d prefer to add a spark.local.retries property 
instead of a special URL format). We initially disabled it for everything 
except unit tests because 90% of the time an exception in local mode means a 
problem in the application, and we’d rather let the user debug that right away 
rather than retrying the task several times and having them worry about why 
they get so many errors.

Matei

On Jun 9, 2014, at 11:28 AM, Peng Cheng pc...@uowmail.edu.au wrote:


Oh, and to make things worse, they forgot '\*' in their regex.
Am I the first to encounter this problem before?

On Mon 09 Jun 2014 02:24:43 PM EDT, Peng Cheng wrote:

Thanks a lot! That's very responsive, somebody definitely has
encountered the same problem before, and added two hidden modes in
masterURL:

(from SparkContext.scala, line 1431)

   // Regular expression for local[N, maxRetries], used in tests with failing tasks
   val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+)\s*,\s*([0-9]+)\]""".r
   // Regular expression for simulating a Spark cluster of [N, cores, memory] locally
   val LOCAL_CLUSTER_REGEX = """local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*]""".r

Unfortunately they never got pushed into the documentation, and you
got config parameters scattered in two different places (masterURL and
$spark.task.maxFailures).
I'm thinking of adding a new config parameter
$spark.task.maxLocalFailures to override 1, how do you think?

Thanks again buddy.

Yours Peng

On Mon 09 Jun 2014 01:33:45 PM EDT, Aaron Davidson wrote:

Looks like your problem is local mode:
https://github.com/apache/spark/blob/640f9a0efefd42cff86aecd4878a3a57f5ae85fa/core/src/main/scala/org/apache/spark/SparkContext.scala#L1430


For some reason, someone decided not to do retries when running in
local mode. Not exactly sure why, feel free to submit a JIRA on this.


On Mon, Jun 9, 2014 at 8:59 AM, Peng Cheng pc...@uow.edu.au wrote:

I speculate that Spark will only retry on exceptions that are
registered with
TaskSetScheduler, so a definitely-will-fail task will fail quickly
without
taking more resources. However I haven't found any documentation
or web page
on it



--
View this message in context:

http://apache-spark-user-list.1001560.n3.nabble.com/How-to-enable-fault-tolerance-tp7250p7255.html

Sent from the Apache Spark User List mailing list archive at
Nabble.com.