Re: ExternalAppendOnlyMap throws no such element
Hi there,
I was finally able to identify the bug: the StreamBuffer.compareTo method has ill-defined behavior when a key's hashCode equals Int.MaxValue. Although this occurs with only about a 1/2^32 chance per key, it can happen a lot once the number of keys approaches 2^32. I have created a pull request with the fix: https://github.com/apache/incubator-spark/pull/612

Best Regards,
Jiacheng Guo
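A minimal illustration of this class of bug, for readers unfamiliar with it. This is not the actual ExternalAppendOnlyMap code and not necessarily the exact defect the pull request fixes; it only shows why hash-code comparisons need care near the edges of the Int range:

// Illustration only: a comparator that subtracts hash codes overflows near Int.MaxValue.
// With h1 = Int.MaxValue and h2 = -1, h1 - h2 wraps around to Int.MinValue, so the larger
// value is reported as smaller and any merge that relies on the ordering breaks down.
def badCompare(h1: Int, h2: Int): Int = h1 - h2

// A safe comparison never overflows:
def safeCompare(h1: Int, h2: Int): Int = if (h1 < h2) -1 else if (h1 == h2) 0 else 1

// badCompare(Int.MaxValue, -1) == Int.MinValue (wrong sign); safeCompare(Int.MaxValue, -1) == 1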
Re: ExternalAppendOnlyMap throws no such element
Hi Patrick,
I have created the JIRA: https://spark-project.atlassian.net/browse/SPARK-1045. It turns out the situation is related to joining two large RDDs, not to the combine process as previously thought.

Best Regards,
Jiacheng Guo
Re: ExternalAppendOnlyMap throws no such element
Hi Patrick,
I think this might be data-related and an edge-condition-handling issue, as only a single partition repeatedly throws the exception from ExternalAppendOnlyMap's iterator. I will file a JIRA as soon as I can isolate the problem. By the way, the test intentionally abuses the external sort to see its performance impact on a real application, because I have trouble configuring the right partition number for each dataset.

Best Regards,
Jiacheng Guo

On Mon, Jan 27, 2014 at 6:16 AM, Patrick Wendell wrote:
> Hey There,
>
> So one thing you can do is disable the external sorting; this should
> preserve the behavior exactly as it was in previous releases.
>
> It's quite possible that the problem you are having relates to the
> fact that you have individual records that are 1GB in size. This is a
> pretty extreme case that may violate assumptions in the implementation
> of the external aggregation code.
>
> Would you mind opening a Jira for this? Also, if you are able to find
> an isolated way to recreate the behavior it will make it easier to
> debug and fix.
>
> IIRC, even with external aggregation Spark still materializes the
> final combined output *for a given key* in memory. If you are
> outputting GBs of data for a single key, then you might also look into
> a different parallelization strategy for your algorithm. Not sure if
> this is also an issue though...
>
> - Patrick
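For anyone who wants to follow the suggestion above and switch the external sort off while testing, the 0.9-era knob is the spark.shuffle.spill property. A minimal sketch; it has to be set before the SparkContext is created:

// Reverts aggregation to the all-in-memory path used before external spilling existed.
System.setProperty("spark.shuffle.spill", "false")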
Re: ExternalAppendOnlyMap throws no such element
Hi Patrick,
I still get the exception on the latest master, 05be7047744c88e64e7e6bd973f9bcfacd00da5f. A bit more info on the subject: I'm using Kryo serialization with a custom serialization function, and the exception comes from the RDD operation combineByKey(createDict, combineKey, mergeDict, partitioner, true, "org.apache.spark.serializer.KryoSerializer"). All previous operations seem OK. The only difference is that this operation can generate some large dict objects, around 1 GB in size. I hope this gives you some clue about what might go wrong. I'm still having trouble figuring out the cause.

Thanks,
Jiacheng Guo

On Wed, Jan 22, 2014 at 1:36 PM, Patrick Wendell wrote:
> This code has been modified since you reported this, so you may want to
> try the current master.
>
> - Patrick
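For readers following along, here is a toy, fully concrete version of the combineByKey shape used above. The function names come from the message, but their bodies, the types, and the data are made up for illustration; in the real job the combiner is a dictionary that can reach roughly 1 GB per key, which is what stresses the external aggregation path. sc is assumed to be an existing SparkContext.

import org.apache.spark.HashPartitioner
import org.apache.spark.SparkContext._   // pair RDD functions such as combineByKey

val pairs = sc.parallelize(Seq((1, "a"), (1, "b"), (2, "c")))

// createDict: start a per-key dictionary from the first value
val createDict = (v: String) => Map(v -> 1)
// combineKey: fold one more value into an existing dictionary
val combineKey = (d: Map[String, Int], v: String) => d + (v -> (d.getOrElse(v, 0) + 1))
// mergeDict: merge two partial dictionaries built for the same key
val mergeDict = (d1: Map[String, Int], d2: Map[String, Int]) =>
  d2.foldLeft(d1) { case (acc, (k, n)) => acc + (k -> (acc.getOrElse(k, 0) + n)) }

val dicts = pairs.combineByKey(createDict, combineKey, mergeDict,
  new HashPartitioner(4), true, "org.apache.spark.serializer.KryoSerializer")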
Re: Does a foreach operation increase RDD lineage?
Yes, I mean Gibbs sampling. From the API documentation, I don't see why the data would be collected to the driver. The document says: 'def foreach(f: (T) => Unit): Unit -- Applies a function f to all elements of this RDD.' So if I want to change my data in place, what operation should I use?

Best Regards,
Jiacheng Guo

On Fri, Jan 24, 2014 at 9:03 PM, 尹绪森 (Xusen Yin) wrote:
> Do you mean "Gibbs sampling"? Actually, foreach is an action; it will
> collect all data from workers to the driver. You will get an OOM complaint from the JVM.
>
> I am not very sure of your implementation, but if the data does not need to be joined
> together, you'd better keep it on the workers.
>
> --
> Best Regards
> ---
> Xusen Yin (尹绪森)
> Beijing Key Laboratory of Intelligent Telecommunications Software and Multimedia
> Beijing University of Posts & Telecommunications
> Intel Labs China
> Homepage: http://yinxusen.github.io/
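For later readers with the same question: the API doc quoted above is accurate in that foreach runs on the workers rather than shipping the RDD to the driver, but mutating cached objects through it is fragile, since eviction or recomputation from lineage silently discards in-place changes. A common pattern is to express each sweep as a transformation instead. A minimal sketch, where gibbsSweep, the data, and the iteration count are placeholders:

// gibbsSweep is a placeholder for the real per-record update; the loop pattern is what matters.
def gibbsSweep(state: Array[Double]): Array[Double] = state

var chain = sc.parallelize(Seq.fill(1000)(Array.fill(10)(0.0))).persist()
chain.count()                       // materialize the cached state once
for (iter <- 1 to 100) {
  val next = chain.mapPartitions(_.map(gibbsSweep)).persist()
  next.count()                      // force evaluation before dropping the previous copy
  chain.unpersist()
  chain = next
}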
Does a foreach operation increase RDD lineage?
Hi,
I'm writing a parallel MCMC program that keeps a very large dataset in memory, and I need to update the dataset in place and avoid creating an additional copy. Should I use a foreach operation on the RDD to express the change, or do I have to create a new RDD after each sampling pass?

Thanks,
Jiacheng Guo
ExternalAppendOnlyMap throws no such element
Hi,
I'm trying out the latest master branch of Spark for the exciting external hashmap feature. I have code that runs correctly on Spark 0.8.1, and I only made a change so that it spills to disk more easily. However, I encounter a few task failures of

java.util.NoSuchElementException (java.util.NoSuchElementException)
org.apache.spark.util.collection.ExternalAppendOnlyMap$ExternalIterator.next(ExternalAppendOnlyMap.scala:277)
org.apache.spark.util.collection.ExternalAppendOnlyMap$ExternalIterator.next(ExternalAppendOnlyMap.scala:212)
org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:29)

And the job seems to fail to recover. Can anyone give me some suggestions on how to investigate the issue?

Thanks,
Jiacheng Guo
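The "change so that it spills easily" is not spelled out in the thread. As an assumption for readers who want to reproduce similar behavior, one way to force frequent spilling is to shrink the shuffle memory fraction before the SparkContext is created:

import org.apache.spark.SparkContext

// Assumption: lowering spark.shuffle.memoryFraction makes ExternalAppendOnlyMap spill to disk
// after using only a small slice of the heap, so the spill path is exercised often.
System.setProperty("spark.shuffle.memoryFraction", "0.05")
val sc = new SparkContext("local[2]", "spill-repro")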
Re: App master failed to find application jar in the master branch on YARN
Hi Tom,
Thank you for your help. I finally found the problem, and it was a silly mistake on my part: after checking out the git repository, I forgot to edit spark-env.sh under the conf folder to add the YARN config folder. I guess it might be helpful to display a warning message about that. Anyway, thank you for your kindness in helping me rule out the problem.

Best Regards,
Jiacheng Guo

On Tue, Nov 19, 2013 at 11:55 PM, Tom Graves wrote:
> The property is deprecated but will still work. Either one is fine.
>
> Launching the job from the namenode is fine.
>
> I brought up a cluster with 2.0.5-alpha and built the latest spark master
> branch and it runs fine for me. It looks like namenode 2.0.5-alpha won't
> even start with the defaultFs of file:///. Please make sure your namenode
> is actually up and running and you are pointing to it, because you can run
> some jobs successfully without it (on a single node cluster), but when you
> have a multinode cluster here is the error I get when I run without a
> namenode up, and it looks very similar to your error message:
>
> appDiagnostics: Application application_1384876319080_0001 failed
> 1 times due to AM Container for appattempt_1384876319080_0001_01 exited
> with exitCode: -1000 due to: java.io.FileNotFoundException: File
> file:/home/tgravescs/spark-master/assembly/target/scala-2.9.3/spark-assembly-0.9.0-incubating-SNAPSHOT-hadoop2.0.5-alpha.jar
> does not exist
>
> When you changed the default fs config did you restart the cluster?
>
> Can you try just running the examples jar:
>
> SPARK_JAR=assembly/target/scala-2.9.3/spark-assembly-0.9.0-incubating-SNAPSHOT-hadoop2.0.5-alpha.jar
> ./spark-class org.apache.spark.deploy.yarn.Client --jar
> examples/target/scala-2.9.3/spark-examples-assembly-0.9.0-incubating-SNAPSHOT.jar
> --class org.apache.spark.examples.SparkPi --args yarn-standalone
> --num-workers 2 --master-memory 2g --worker-memory 2g --worker-cores 1
>
> On the client side you should see messages like this:
> 13/11/19 15:41:30 INFO yarn.Client: Uploading
> file:/home/tgravescs/spark-master/examples/target/scala-2.9.3/spark-examples-assembly-0.9.0-incubating-SNAPSHOT.jar
> to hdfs://namenode.host.com:9000/user/tgravescs/.sparkStaging/application_1384874528558_0003/spark-examples-assembly-0.9.0-incubating-SNAPSHOT.jar
> 13/11/19 15:41:31 INFO yarn.Client: Uploading
> file:/home/tgravescs/spark-master/assembly/target/scala-2.9.3/spark-assembly-0.9.0-incubating-SNAPSHOT-hadoop2.0.5-alpha.jar
> to hdfs://namenode.host.com:9000/user/tgravescs/.sparkStaging/application_1384874528558_0003/spark-assembly-0.9.0-incubating-SNAPSHOT-hadoop2.0.5-alpha.jar
>
> Tom
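For anyone hitting the same thing: the fix described above amounts to pointing spark-env.sh at the Hadoop/YARN client configuration directory. A minimal sketch; the path below is a placeholder for your own cluster's config directory:

# conf/spark-env.sh  (example path; use your cluster's Hadoop client config dir)
export HADOOP_CONF_DIR=/etc/hadoop/conf
export YARN_CONF_DIR=$HADOOP_CONF_DIR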
Re: App master failed to find application jar in the master branch on YARN
Hi Tom,
Thank you for your response. I have double-checked that I uploaded both jars into the same folder on HDFS. I think the fs.default.name you pointed out is the old deprecated name for the fs.defaultFS config, according to http://hadoop.apache.org/docs/r2.0.2-alpha/hadoop-project-dist/hadoop-common/DeprecatedProperties.html. Anyway, we have tried setting both fs.default.name and fs.defaultFS to the HDFS namenode, and the situation remained the same. We have also removed the SPARK_HOME env variable on the worker nodes. One additional piece of information that might be related is that job submission is done on the same machine as the HDFS namenode, but I'm not sure whether that could cause the problem.

Thanks,
Jiacheng Guo

On Tue, Nov 19, 2013 at 11:50 AM, Tom Graves wrote:
> Sorry for the delay. What is the default filesystem on your HDFS setup?
> It looks like it's set to file: rather than hdfs://. That is the only
> reason I can think it's listing the directory as
> file:/home/work/.sparkStaging/application_1384588058297_0056.
> It's basically just copying it locally rather than uploading to hdfs, and it's
> just trying to use the local
> file:/home/work/guojiacheng/spark/assembly/target/scala-2.9.3/spark-assembly-0.9.0-incubating-SNAPSHOT-hadoop2.0.5-alpha.jar.
> It generally would create that in hdfs so it is accessible on all the nodes.
> Is your /home/work nfs mounted on all the nodes?
>
> You can find the default fs by looking at the Hadoop config files.
> Generally in core-site.xml it's specified by: fs.default.name
>
> It's pretty odd that it's erroring with file:// when you specified hdfs://.
> When you tried the hdfs://, did you upload both the spark jar and your
> client jar (SparkAUC-assembly-0.1.jar)? If not, try that and make sure to
> put hdfs:// on them when you export SPARK_JAR and specify the --jar option.
>
> I'll try to reproduce the error tomorrow to see if a bug was introduced
> when I added the feature to run spark from HDFS.
>
> Tom
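A quick way to see which filesystem the client-side Hadoop configuration actually resolves to, without reading core-site.xml by hand. This is plain Hadoop API and assumes the Hadoop config directory is on the classpath (e.g. via HADOOP_CONF_DIR) so core-site.xml gets picked up:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem

// Prints the default filesystem URI; expect something like hdfs://<namenode>:9000,
// not file:///, if the client is really pointing at the cluster.
println(FileSystem.get(new Configuration()).getUri)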
Re: App master failed to find application jar in the master branch on YARN
Hi Tom,
I'm on Hadoop 2.0.5. I can launch applications with the Spark 0.8 release normally. However, when I switch to the git master branch version, with the application built against it, I get the jar-not-found exception, and the same happens with the example application. I have tried both the file:// protocol and the hdfs:// protocol, with the jar on the local file system and on HDFS respectively, and I even tried the jar-list parameter when creating the SparkContext. The exception is slightly different for the hdfs:// protocol and the local file path. My application launch command is

SPARK_JAR=/home/work/guojiacheng/spark/assembly/target/scala-2.9.3/spark-assembly-0.9.0-incubating-SNAPSHOT-hadoop2.0.5-alpha.jar /home/work/guojiacheng/spark/spark-class org.apache.spark.deploy.yarn.Client --jar /home/work/guojiacheng/spark-auc/target/scala-2.9.3/SparkAUC-assembly-0.1.jar --class myClass.SparkAUC --args -c --args yarn-standalone --args -i --args hdfs://{hdfs_host}:9000/user/work/guojiacheng/data --args -m --args hdfs://{hdfs_host}:9000/user/work/guojiacheng/model_large --args -o --args hdfs://{hdfs_host}:9000/user/work/guojiacheng/score --num-workers 60 --master-memory 6g --worker-memory 7g --worker-cores 1

and my build command is

SPARK_HADOOP_VERSION=2.0.5-alpha SPARK_YARN=true sbt/sbt assembly

The only thing I can think of that might be related is that on each cluster node there is an env variable SPARK_HOME pointing to a copy of the 0.8 version, and its bin folder is on the PATH; the 0.9 version is not there. It was something left over from when the cluster was set up. But I don't know whether it is related, as my understanding is that the YARN version tries to distribute Spark through YARN.

hdfs version error message:

appDiagnostics: Application application_1384588058297_0056 failed 1 times due to AM Container for appattempt_1384588058297_0056_01 exited with exitCode: -1000 due to: RemoteTrace: java.io.FileNotFoundException: File file:/home/work/.sparkStaging/application_1384588058297_0056/SparkAUC-assembly-0.1.jar does not exist

local version error message:

appDiagnostics: Application application_1384588058297_0066 failed 1 times due to AM Container for appattempt_1384588058297_0066_01 exited with exitCode: -1000 due to: java.io.FileNotFoundException: File file:/home/work/guojiacheng/spark/assembly/target/scala-2.9.3/spark-assembly-0.9.0-incubating-SNAPSHOT-hadoop2.0.5-alpha.jar does not exist

Best Regards,
Jiacheng Guo

On Mon, Nov 18, 2013 at 10:34 PM, Tom Graves wrote:
> Hey Jiacheng Guo,
>
> do you have the SPARK_EXAMPLES_JAR env variable set? If you do, you have to
> add the --addJars parameter to the yarn client and point to the spark
> examples jar. Or just unset the SPARK_EXAMPLES_JAR env variable.
>
> You should only have to set the SPARK_JAR env variable.
>
> If that isn't the issue, let me know the build command you used and hadoop
> version, and your defaultFs for hadoop.
>
> Tom
Re: Does Spark RDD have a partitionedByKey
After looking at the API more carefully, I found I had overlooked the partitionBy function on PairRDDFunctions. It's the function I need. Sorry for the confusion.

Best Regards,
Jiacheng Guo

On Sat, Nov 16, 2013 at 3:59 AM, Christopher Nguyen wrote:
> Jiacheng, if you're OK with using the Shark layer above Spark (and I think
> for many use cases the answer would be "yes"), then you can take advantage
> of Shark's co-partitioning. Or do something like
> https://github.com/amplab/shark/pull/100/commits
>
> Sent while mobile. Pls excuse typos etc.
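For later readers, a minimal sketch of the pattern being referred to; the data and partition count are placeholders, and sc is an existing SparkContext. Pre-partitioning both pair RDDs with the same partitioner and caching them lets subsequent joins reuse the layout instead of reshuffling:

import org.apache.spark.HashPartitioner
import org.apache.spark.SparkContext._   // pair RDD functions such as partitionBy and join

val part = new HashPartitioner(128)

// Both sides get the same partitioner and are cached in that layout (toy data for illustration).
val left  = sc.parallelize(Seq(1 -> "a", 2 -> "b")).partitionBy(part).persist()
val right = sc.parallelize(Seq(1 -> 10, 2 -> 20)).partitionBy(part).persist()

// Because left and right share a partitioner, this join reuses the existing partitioning
// rather than shuffling both sides again.
val joined = left.join(right)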
App master failed to find application jar in the master branch on YARN
Hi,
After reading about the exciting progress in consolidating shuffle, I'm eager to try out the latest master branch. However, upon launching the example application, the job failed with a message saying the app master failed to find the target jar:

appDiagnostics: Application application_1384588058297_0017 failed 1 times due to AM Container for appattempt_1384588058297_0017_01 exited with exitCode: -1000 due to: java.io.FileNotFoundException: File file:/${my_work_dir}/spark/examples/target/scala-2.9.3/spark-examples-assembly-0.9.0-incubating-SNAPSHOT.jar does not exist.

Is there any change in how to launch a YARN job now?

Best Regards,
Jiacheng Guo
Re: Does Spark RDD have a partitionedByKey
Hi Meisam,
What I want to achieve here is a bit tricky. Basically, I'm trying to implement per-split semi-join in an iterative algorithm with Spark. It's a very efficient join strategy for highly imbalanced data sets and provides a huge gain over a normal join in that situation.

Let's say we have two large RDDs, a: RDD[X] and b: RDD[(Y, Z)], and both of them are loaded directly from HDFS, so both of them have a partitioner of None. X is a large, complicated struct containing a set of join keys of type Y. First, for each partition of a, I extract the join keys Y from every instance of X in that partition and construct a hash set of (join key Y, partition ID) pairs. Now I have a new RDD c: RDD[(Y, PartitionID)]; I join it with b on Y and then construct an RDD d: RDD[(PartitionID, Map[Y, Z])] by grouping on PartitionID and building a map of Y to Z. As for each partition of a, I want to repartition it according to its partition ID, so it becomes an RDD e: RDD[(PartitionID, X)]. Since d and e will have the same partitioner and the same key, they can be joined very efficiently.

The key ability I want to have here is to cache RDD c with the same partitioner as RDD b, and to cache e, so that later joins with b and d will be efficient, because the value of b will be updated from time to time and d's content will change accordingly. It would also be nice to have the ability to repartition a by its original partition ID without actually shuffling across the network.

You can refer to http://researcher.watson.ibm.com/researcher/files/us-ytian/hadoopjoin.pdf for the per-split semi-join details.

Best Regards,
Jiacheng Guo

On Sat, Nov 16, 2013 at 3:02 AM, Meisam Fathi wrote:
> Hi guojc,
>
> It is not clear to me what problem you are trying to solve. What do
> you want to do with the result of your
> groupByKey(myPartitioner).flatMapValues(x => x)? Do you want to use it
> in a join? Do you want to save it to your file system? Or do you want
> to do something else with it?
>
> Thanks,
> Meisam
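A rough sketch of the flow described above, written against the plain RDD API. The Record type, the data sources, and the key/value types are stand-ins (Record plays the role of X and carries its own join keys, String is Y, Int is Z), and sc is an existing SparkContext, so treat this as an illustration of the shape rather than working code for the actual job:

import org.apache.spark.SparkContext._        // pair RDD operations
import org.apache.spark.rdd.RDD

// Stand-in for X: a record that carries its set of join keys.
case class Record(keys: Seq[String], payload: Array[Double])

val a: RDD[Record]        = sc.objectFile("hdfs:///path/to/a")   // placeholder source
val b: RDD[(String, Int)] = sc.objectFile("hdfs:///path/to/b")   // placeholder source

// c: distinct (join key, partition id) pairs, built per partition of a
val c = a.mapPartitionsWithIndex { (pid, it) =>
  it.flatMap(x => x.keys.map(y => (y, pid))).toSet.iterator
}

// d: for each partition id, the dictionary of (key -> value) entries it needs from b
val d = c.join(b)                                  // (y, (pid, z))
  .map { case (y, (pid, z)) => (pid, (y, z)) }
  .groupByKey()
  .mapValues(_.toMap)

// e: the records of a keyed by their own partition id
val e = a.mapPartitionsWithIndex { (pid, it) => it.map(x => (pid, x)) }

// Joining e with d brings each partition's dictionary next to its records.
val joined = e.join(d)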
How to override YARN's default java.io.tmpdir and spark.local.dir
Hi,
How can I override the default java.io.tmpdir and spark.local.dir on YARN? I have tried setting SPARK_YARN_USER_ENV with SPARK_JAVA_OPTS, but it seems to have no effect. The location still comes from YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR, and that is on a very small disk for me. Any suggestions?

Thanks,
Jiacheng Guo
Re: Does Spark RDD have a partitionedByKey
Hi Meisam,
Thank you for the response. I know each RDD has a partitioner. What I want to achieve here is to re-partition a piece of data according to my custom partitioner. Currently I do that with groupByKey(myPartitioner).flatMapValues(x => x), but I'm a bit worried about whether this will create additional temporary object collections, as the result is first made into a Seq and then into a collection of tuples. Any suggestions?

Best Regards,
Jiacheng Guo

On Sat, Nov 16, 2013 at 12:24 AM, Meisam Fathi wrote:
> Hi Jiacheng,
>
> Each RDD has a partitioner. You can define your own partitioner if the
> default partitioner does not suit your purpose.
> You can take a look at this
> http://ampcamp.berkeley.edu/wp-content/uploads/2012/06/matei-zaharia-amp-camp-2012-advanced-spark.pdf.
>
> Thanks,
> Meisam
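As the later reply in this thread notes, partitionBy on pair RDDs does this directly without building a per-key Seq first. A minimal sketch of the contrast, with data and myPartitioner assumed to already exist:

import org.apache.spark.SparkContext._   // pair RDD operations

// The workaround described above: groups values into a Seq per key, then flattens it back out.
val viaGroup = data.groupByKey(myPartitioner).flatMapValues(identity)

// Produces the same pairs with the same partitioning, without the intermediate per-key collection.
val viaPartitionBy = data.partitionBy(myPartitioner)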
Does Spark RDD have a partitionedByKey
Hi,
I'm wondering whether Spark RDD could have a partitionedByKey function. The use of this function would be to have an RDD distributed according to a certain partitioner and cache it; then further joins between RDDs with the same partitioner would get a great speed-up. Currently, we only have groupByKey, which generates a Seq of the desired type, which is not very convenient.

Btw, sorry for the last empty-body email. I mistakenly hit the send shortcut.

Best Regards,
Jiacheng Guo
Does Spark have a partitionByKey function
Hi,