Re: ExternalAppendOnlyMap throw no such element

2014-02-18 Thread guojc
Hi there,
I was finally able to identify the bug: StreamBuffer.compareTo's behavior is
ill-defined when a key's hashCode equals Int.MaxValue. Although this only
occurs with roughly a 1/2^32 chance per key, it can happen a lot once the
number of keys approaches 2^32. I have created a pull request with the fix:
https://github.com/apache/incubator-spark/pull/612
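
For readers unfamiliar with the failure mode, here is a small, purely
hypothetical illustration (not the actual Spark source) of how ordering stream
buffers by the hash of their smallest key can go wrong at the edge of the Int
range when the comparison is done by subtraction:

// Hypothetical sketch only -- not the Spark implementation.
class BufferByMinHash(val minKeyHash: Int) extends Comparable[BufferByMinHash] {

  // Broken: Int.MaxValue - (-1) overflows to Int.MinValue, so a buffer whose
  // smallest key hashes to Int.MaxValue can be ordered *before* one holding a
  // negative hash, and a merge loop keyed on this order reads the wrong stream.
  def brokenCompare(other: BufferByMinHash): Int =
    minKeyHash - other.minKeyHash

  // Safe: compare over the full Int range without overflow.
  override def compareTo(other: BufferByMinHash): Int =
    java.lang.Integer.compare(minKeyHash, other.minKeyHash)
}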

Best Regards,
Jiacheng Guo


On Mon, Jan 27, 2014 at 2:36 PM, guojc  wrote:

> Hi Patrick,
> I have create the jira
> https://spark-project.atlassian.net/browse/SPARK-1045. It turn out the
> situation is related to join two large rdd, not related to the combine
> process as previous thought.
>
> Best Regards,
> Jiacheng Guo
>
>
> On Mon, Jan 27, 2014 at 11:07 AM, guojc  wrote:
>
>> Hi Patrick,
>>I think this might be data related and about edge condition handling
>> as I only get a single partition repeatedly throw exception on
>> externalappendonlymap's iterator.  I will file a jira as soon as I can
>> isolate the problem. Btw, the test is intentionally abuse the external sort
>> to see its performance impact on real application, because I have trouble
>> to configure a right partition number for each dataset.
>>
>> Best Regards,
>> Jiacheng Guo
>>
>>
>> On Mon, Jan 27, 2014 at 6:16 AM, Patrick Wendell wrote:
>>
>>> Hey There,
>>>
>>> So one thing you can do is disable the external sorting, this should
>>> preserve the behavior exactly as it was in previous releases.
>>>
>>> It's quite possible that the problem you are having relates to the
>>> fact that you have individual records that are 1GB in size. This is a
>>> pretty extreme case that may violate assumptions in the implementation
>>> of the external aggregation code.
>>>
>>> Would you mind opening a Jira for this? Also, if you are able to find
>>> an isolated way to recreate the behavior it will make it easier to
>>> debug and fix.
>>>
>>> IIRC, even with external aggregation Spark still materializes the
>>> final combined output *for a given key* in memory. If you are
>>> outputting GB of data for a single key, then you might also look into
>>> a different parallelization strategy for your algorithm. Not sure if
>>> this is also an issue though...
>>>
>>> - Patrick
>>>
>>> On Sun, Jan 26, 2014 at 2:27 AM, guojc  wrote:
>>> > Hi Patrick,
>>> > I still get the exception on lastest master
>>> > 05be7047744c88e64e7e6bd973f9bcfacd00da5f. A bit more info on the
>>> subject.
>>> > I'm using KryoSerialzation with a custom serialization function, and
>>> the
>>> > exception come from a rdd operation
>>> >
>>> combineByKey(createDict,combineKey,mergeDict,partitioner,true,"org.apache.spark.serializer.KryoSerializer").
>>> > All previous operation seems ok. The only difference is that this
>>> operation
>>> > can generate some a large dict object around 1 gb size.  I hope this
>>> can
>>> > give you some clue what might go wrong.  I'm still having trouble
>>> figure out
>>> > the cause.
>>> >
>>> > Thanks,
>>> > Jiacheng Guo
>>> >
>>> >
>>> > On Wed, Jan 22, 2014 at 1:36 PM, Patrick Wendell 
>>> wrote:
>>> >>
>>> >> This code has been modified since you reported this so you may want to
>>> >> try the current master.
>>> >>
>>> >> - Patrick
>>> >>
>>> >> On Mon, Jan 20, 2014 at 4:22 AM, guojc  wrote:
>>> >> > Hi,
>>> >> >   I'm tring out lastest master branch of spark for the exciting
>>> external
>>> >> > hashmap feature. I have a code that is running correctly at spark
>>> 0.8.1
>>> >> > and
>>> >> > I only make a change for its easily to be spilled to disk. However,
>>> I
>>> >> > encounter a few task failure of
>>> >> > java.util.NoSuchElementException (java.util.NoSuchElementException)
>>> >> >
>>> >> >
>>> org.apache.spark.util.collection.ExternalAppendOnlyMap$ExternalIterator.next(ExternalAppendOnlyMap.scala:277)org.apache.spark.util.collection.ExternalAppendOnlyMap$ExternalIterator.next(ExternalAppendOnlyMap.scala:212)org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:29)
>>> >> > And the job seems to fail to recover.
>>> >> > Can anyone give some suggestion on how to investigate the issue?
>>> >> > Thanks,Jiacheng Guo
>>> >
>>> >
>>>
>>
>>
>


Re: ExternalAppendOnlyMap throw no such element

2014-01-26 Thread guojc
Hi Patrick,
I have created the JIRA at
https://spark-project.atlassian.net/browse/SPARK-1045. It turns out the
problem is related to joining two large RDDs, not to the combine process as
previously thought.

Best Regards,
Jiacheng Guo


On Mon, Jan 27, 2014 at 11:07 AM, guojc  wrote:

> Hi Patrick,
>I think this might be data related and about edge condition handling as
> I only get a single partition repeatedly throw exception on
> externalappendonlymap's iterator.  I will file a jira as soon as I can
> isolate the problem. Btw, the test is intentionally abuse the external sort
> to see its performance impact on real application, because I have trouble
> to configure a right partition number for each dataset.
>
> Best Regards,
> Jiacheng Guo
>
>
> On Mon, Jan 27, 2014 at 6:16 AM, Patrick Wendell wrote:
>
>> Hey There,
>>
>> So one thing you can do is disable the external sorting, this should
>> preserve the behavior exactly as it was in previous releases.
>>
>> It's quite possible that the problem you are having relates to the
>> fact that you have individual records that are 1GB in size. This is a
>> pretty extreme case that may violate assumptions in the implementation
>> of the external aggregation code.
>>
>> Would you mind opening a Jira for this? Also, if you are able to find
>> an isolated way to recreate the behavior it will make it easier to
>> debug and fix.
>>
>> IIRC, even with external aggregation Spark still materializes the
>> final combined output *for a given key* in memory. If you are
>> outputting GB of data for a single key, then you might also look into
>> a different parallelization strategy for your algorithm. Not sure if
>> this is also an issue though...
>>
>> - Patrick
>>
>> On Sun, Jan 26, 2014 at 2:27 AM, guojc  wrote:
>> > Hi Patrick,
>> > I still get the exception on lastest master
>> > 05be7047744c88e64e7e6bd973f9bcfacd00da5f. A bit more info on the
>> subject.
>> > I'm using KryoSerialzation with a custom serialization function, and the
>> > exception come from a rdd operation
>> >
>> combineByKey(createDict,combineKey,mergeDict,partitioner,true,"org.apache.spark.serializer.KryoSerializer").
>> > All previous operation seems ok. The only difference is that this
>> operation
>> > can generate some a large dict object around 1 gb size.  I hope this can
>> > give you some clue what might go wrong.  I'm still having trouble
>> figure out
>> > the cause.
>> >
>> > Thanks,
>> > Jiacheng Guo
>> >
>> >
>> > On Wed, Jan 22, 2014 at 1:36 PM, Patrick Wendell 
>> wrote:
>> >>
>> >> This code has been modified since you reported this so you may want to
>> >> try the current master.
>> >>
>> >> - Patrick
>> >>
>> >> On Mon, Jan 20, 2014 at 4:22 AM, guojc  wrote:
>> >> > Hi,
>> >> >   I'm tring out lastest master branch of spark for the exciting
>> external
>> >> > hashmap feature. I have a code that is running correctly at spark
>> 0.8.1
>> >> > and
>> >> > I only make a change for its easily to be spilled to disk. However, I
>> >> > encounter a few task failure of
>> >> > java.util.NoSuchElementException (java.util.NoSuchElementException)
>> >> >
>> >> >
>> org.apache.spark.util.collection.ExternalAppendOnlyMap$ExternalIterator.next(ExternalAppendOnlyMap.scala:277)org.apache.spark.util.collection.ExternalAppendOnlyMap$ExternalIterator.next(ExternalAppendOnlyMap.scala:212)org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:29)
>> >> > And the job seems to fail to recover.
>> >> > Can anyone give some suggestion on how to investigate the issue?
>> >> > Thanks,Jiacheng Guo
>> >
>> >
>>
>
>


Re: ExternalAppendOnlyMap throw no such element

2014-01-26 Thread guojc
Hi Patrick,
   I think this might be data-related and an edge-condition-handling issue, as
only a single partition repeatedly throws the exception from
ExternalAppendOnlyMap's iterator. I will file a JIRA as soon as I can isolate
the problem. By the way, the test intentionally abuses the external sort to
see its performance impact on a real application, because I have trouble
configuring the right partition number for each dataset.
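
For reference, a minimal sketch of how the external-aggregation path can be
switched back off, assuming a 0.9-era build where the spark.shuffle.spill
property controls it (as Patrick suggests below):

import org.apache.spark.{SparkConf, SparkContext}

// Sketch only: disabling spilling restores the in-memory-only 0.8.x behavior.
// The property name spark.shuffle.spill is assumed from the 0.9-era config docs.
val conf = new SparkConf()
  .setAppName("external-sort-abuse-test")
  .set("spark.shuffle.spill", "false")
val sc = new SparkContext(conf)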

Best Regards,
Jiacheng Guo


On Mon, Jan 27, 2014 at 6:16 AM, Patrick Wendell  wrote:

> Hey There,
>
> So one thing you can do is disable the external sorting, this should
> preserve the behavior exactly as it was in previous releases.
>
> It's quite possible that the problem you are having relates to the
> fact that you have individual records that are 1GB in size. This is a
> pretty extreme case that may violate assumptions in the implementation
> of the external aggregation code.
>
> Would you mind opening a Jira for this? Also, if you are able to find
> an isolated way to recreate the behavior it will make it easier to
> debug and fix.
>
> IIRC, even with external aggregation Spark still materializes the
> final combined output *for a given key* in memory. If you are
> outputting GB of data for a single key, then you might also look into
> a different parallelization strategy for your algorithm. Not sure if
> this is also an issue though...
>
> - Patrick
>
> On Sun, Jan 26, 2014 at 2:27 AM, guojc  wrote:
> > Hi Patrick,
> > I still get the exception on lastest master
> > 05be7047744c88e64e7e6bd973f9bcfacd00da5f. A bit more info on the subject.
> > I'm using KryoSerialzation with a custom serialization function, and the
> > exception come from a rdd operation
> >
> combineByKey(createDict,combineKey,mergeDict,partitioner,true,"org.apache.spark.serializer.KryoSerializer").
> > All previous operation seems ok. The only difference is that this
> operation
> > can generate some a large dict object around 1 gb size.  I hope this can
> > give you some clue what might go wrong.  I'm still having trouble figure
> out
> > the cause.
> >
> > Thanks,
> > Jiacheng Guo
> >
> >
> > On Wed, Jan 22, 2014 at 1:36 PM, Patrick Wendell 
> wrote:
> >>
> >> This code has been modified since you reported this so you may want to
> >> try the current master.
> >>
> >> - Patrick
> >>
> >> On Mon, Jan 20, 2014 at 4:22 AM, guojc  wrote:
> >> > Hi,
> >> >   I'm tring out lastest master branch of spark for the exciting
> external
> >> > hashmap feature. I have a code that is running correctly at spark
> 0.8.1
> >> > and
> >> > I only make a change for its easily to be spilled to disk. However, I
> >> > encounter a few task failure of
> >> > java.util.NoSuchElementException (java.util.NoSuchElementException)
> >> >
> >> >
> org.apache.spark.util.collection.ExternalAppendOnlyMap$ExternalIterator.next(ExternalAppendOnlyMap.scala:277)org.apache.spark.util.collection.ExternalAppendOnlyMap$ExternalIterator.next(ExternalAppendOnlyMap.scala:212)org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:29)
> >> > And the job seems to fail to recover.
> >> > Can anyone give some suggestion on how to investigate the issue?
> >> > Thanks,Jiacheng Guo
> >
> >
>


Re: ExternalAppendOnlyMap throw no such element

2014-01-26 Thread guojc
Hi Patrick,
I still get the exception on the latest master,
05be7047744c88e64e7e6bd973f9bcfacd00da5f. A bit more info on the subject: I'm
using Kryo serialization with a custom serialization function, and the
exception comes from the RDD operation
combineByKey(createDict,combineKey,mergeDict,partitioner,true,"org.apache.spark.serializer.KryoSerializer").
All previous operations seem fine. The only difference is that this operation
can generate a large dict object, around 1 GB in size. I hope this gives you
some clue about what might go wrong; I'm still having trouble figuring out the
cause.
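
For context, a self-contained sketch of the shape of that call. The Dict type
and the createDict / combineKey / mergeDict functions below are simplified
stand-ins for the real ~1 GB dictionary combiners, and the trailing String
serializer-class argument follows the same 0.9-era combineByKey call shown
above:

import org.apache.spark.HashPartitioner
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import scala.collection.mutable

object CombineSketch {
  type Dict = mutable.HashMap[String, Long]

  def createDict(v: String): Dict = mutable.HashMap(v -> 1L)
  def combineKey(d: Dict, v: String): Dict = { d(v) = d.getOrElse(v, 0L) + 1L; d }
  def mergeDict(a: Dict, b: Dict): Dict = {
    b.foreach { case (k, n) => a(k) = a.getOrElse(k, 0L) + n }; a
  }

  // Build one (potentially large) dictionary per key, combining map-side and
  // serializing shuffle data with Kryo, as in the failing job.
  def buildDicts(pairs: RDD[(Int, String)]): RDD[(Int, Dict)] =
    pairs.combineByKey(
      createDict _,
      combineKey _,
      mergeDict _,
      new HashPartitioner(64),
      true,                                         // mapSideCombine
      "org.apache.spark.serializer.KryoSerializer") // serializerClass (0.9-era API)
}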

Thanks,
Jiacheng Guo


On Wed, Jan 22, 2014 at 1:36 PM, Patrick Wendell  wrote:

> This code has been modified since you reported this so you may want to
> try the current master.
>
> - Patrick
>
> On Mon, Jan 20, 2014 at 4:22 AM, guojc  wrote:
> > Hi,
> >   I'm tring out lastest master branch of spark for the exciting external
> > hashmap feature. I have a code that is running correctly at spark 0.8.1
> and
> > I only make a change for its easily to be spilled to disk. However, I
> > encounter a few task failure of
> > java.util.NoSuchElementException (java.util.NoSuchElementException)
> >
> org.apache.spark.util.collection.ExternalAppendOnlyMap$ExternalIterator.next(ExternalAppendOnlyMap.scala:277)org.apache.spark.util.collection.ExternalAppendOnlyMap$ExternalIterator.next(ExternalAppendOnlyMap.scala:212)org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:29)
> > And the job seems to fail to recover.
> > Can anyone give some suggestion on how to investigate the issue?
> > Thanks,Jiacheng Guo
>


Re: Does foreach operation increase rdd lineage?

2014-01-24 Thread guojc
Yes, I mean Gibbs sampling. From the API documentation, I don't see why the
data would be collected to the driver. The documentation says:
def foreach(f: (T) => Unit): Unit
Applies a function f to all elements of this RDD.

So if I want to change my data in place, which operation should I use?
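
For concreteness, a small sketch of the two alternatives being weighed here,
with a toy Array-backed dataset standing in for the real sampler state:

import org.apache.spark.SparkContext

val sc = new SparkContext("local[2]", "gibbs-sketch")

// Toy stand-in for the in-memory state: one mutable array per record.
val state = sc.parallelize(1 to 1000).map(i => Array.fill(8)(i.toDouble)).cache()

// Option 1: foreach is an action. The function runs on the workers for its side
// effects and returns Unit; it does not add a new RDD to the lineage, and any
// in-place mutation is only visible through the cached partitions.
state.foreach(arr => arr(0) += 1.0)

// Option 2: map produces a new, immutable RDD each sampling step, which grows
// the lineage and allocates a fresh copy of the data.
val nextState = state.map(arr => arr.map(_ + 1.0))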

Best Regards,
Jiacheng Guo


On Fri, Jan 24, 2014 at 9:03 PM, 尹绪森  wrote:

> Do you mean "Gibbs sampling" ? Actually, foreach is an action, it will
> collect all data from workers to driver. You will get OOM complained by JVM.
>
> I am not very sure of your implementation, but if data not need to join
> together, you'd better keep them in workers.
>
>
> 2014/1/24 guojc 
>
>> Hi,
>>I'm writing a paralell mcmc program that having a very large dataset
>> in memory, and need to update the dataset in-memory and avoid creating
>> additional copy. Should I choose a foreach operation on rdd to express the
>> change? or I have to create a new rdd after each sampling process?
>>
>> Thanks,
>> Jiacheng Guo
>>
>
>
>
> --
> Best Regards
> ---
> Xusen Yin尹绪森
> Beijing Key Laboratory of Intelligent Telecommunications Software and
> Multimedia
> Beijing University of Posts & Telecommunications
> Intel Labs China
> Homepage: *http://yinxusen.github.io/ <http://yinxusen.github.io/>*
>


Does foreach operation increase rdd lineage?

2014-01-24 Thread guojc
Hi,
   I'm writing a parallel MCMC program that keeps a very large dataset in
memory and needs to update that dataset in place, avoiding additional copies.
Should I use a foreach operation on the RDD to express the change, or do I
have to create a new RDD after each sampling step?

Thanks,
Jiacheng Guo


ExternalAppendOnlyMap throw no such element

2014-01-20 Thread guojc
Hi,
  I'm trying out the latest master branch of Spark for the exciting external
hashmap feature. I have code that runs correctly on Spark 0.8.1, and I only
made a change so that it spills to disk more easily. However, I encounter a
few task failures with
java.util.NoSuchElementException (java.util.NoSuchElementException)
org.apache.spark.util.collection.ExternalAppendOnlyMap$ExternalIterator.next(ExternalAppendOnlyMap.scala:277)
org.apache.spark.util.collection.ExternalAppendOnlyMap$ExternalIterator.next(ExternalAppendOnlyMap.scala:212)
org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:29)
And the job seems unable to recover.
Can anyone give some suggestions on how to investigate the issue?

Thanks,
Jiacheng Guo


Re: App master failed to find application jar in the master branch on YARN

2013-11-19 Thread guojc
Hi Tom,
 Thank you for your help. I finally found the problem; it was a silly mistake
on my part. After checking out the git repository, I forgot to edit
spark-env.sh under the conf folder to add the YARN config folder. I guess it
might be helpful to display a warning message about that. Anyway, thank you
for your kindness in helping me rule out the problem.

Best Regards,
Jiacheng Guo


On Tue, Nov 19, 2013 at 11:55 PM, Tom Graves  wrote:

> The property is deprecated but will still work. Either one is fine.
>
> Launching the job from the namenode is fine .
>
> I brought up a cluster with 2.0.5-alpha and built the latest spark master
> branch and it runs fine for me. It looks like namenode 2.0.5-alpha won't
> even start with the defaulFs of file:///.  Please make sure your namenode
> is actually up and running and you are pointing to it because you can run
> some jobs successfully without it (on a single node cluster), but when you
> have a multinode cluster  here is the error I get when I run without a
> namenode up and it looks very similar to your error message:
>
> appDiagnostics: Application application_1384876319080_0001 failed
> 1 times due to AM Container for appattempt_1384876319080_0001_01 exited
> with  exitCode: -1000 due to: java.io.FileNotFoundException: File
> file:/home/tgravescs/spark-master/assembly/target/scala-2.9.3/spark-assembly-0.9.0-incubating-SNAPSHOT-hadoop2.0.5-alpha.jar
> does not exist
>
> When you changed the default fs config did you restart the cluster?
>
> Can you try just running the examples jar:
>
>
> SPARK_JAR=assembly/target/scala-2.9.3/spark-assembly-0.9.0-incubating-SNAPSHOT-hadoop2.0.5-alpha.jar
>
> ./spark-class  org.apache.spark.deploy.yarn.Client --jar
> examples/target/scala-2.9.3/spark-examples-assembly-0.9.0-incubating-SNAPSHOT.jar
>  --class org.apache.spark.examples.SparkPi  --args yarn-standalone
>  --num-workers 2  --master-memory 2g --worker-memory 2g --worker-cores 1
>
> On the client side you should see messages like this:
> 13/11/19 15:41:30 INFO yarn.Client: Uploading
> file:/home/tgravescs/spark-master/examples/target/scala-2.9.3/spark-examples-assembly-0.9.0-incubating-SNAPSHOT.jar
> to hdfs://
> namenode.host.com:9000/user/tgravescs/.sparkStaging/application_1384874528558_0003/spark-examples-assembly-0.9.0-incubating-SNAPSHOT.jar
> 13/11/19 15:41:31 INFO yarn.Client: Uploading
> file:/home/tgravescs/spark-master/assembly/target/scala-2.9.3/spark-assembly-0.9.0-incubating-SNAPSHOT-hadoop2.0.5-alpha.jar
> to hdfs://
> namenode.host.com:9000/user/tgravescs/.sparkStaging/application_1384874528558_0003/spark-assembly-0.9.0-incubating-SNAPSHOT-hadoop2.0.5-alpha.jar
>
> Tom
>
>
>   On Tuesday, November 19, 2013 5:35 AM, guojc  wrote:
>  Hi Tom,
>Thank you for your response. I  have double checked that I had upload
> both jar in the same folder on hdfs. I think the fs.default.name
> you pointed out is the old deprecated name for fs.defaultFS config
> accordiing
> http://hadoop.apache.org/docs/r2.0.2-alpha/hadoop-project-dist/hadoop-common/DeprecatedProperties.html.
>   Anyway, we have tried both
> fs.default.name and  fs.defaultFS set to hdfs namenode, and the situation
> remained same. And we have removed SPARK_HOME env variable on worker node.
>  An additional information might be related is that job submission is done
> on the same machine of hdfs namenode.  But I'm not sure this will cause the
> problem.
>
> Thanks,
> Jiacheng Guo
>
>
> On Tue, Nov 19, 2013 at 11:50 AM, Tom Graves  wrote:
>
> Sorry for the delay. What is the default filesystem on your HDFS setup?
>  It looks like its set to file: rather then hdfs://.  That is the only
> reason I can think its listing the directory as  
> file:/home/work/.sparkStaging/application_1384588058297_0056.
>  Its basically just copying it local rather then uploading to hdfs and its
> just trying to use the local
> file:/home/work/guojiacheng/spark/assembly/target/scala-2.9.3/spark-assembly-0.9.0-incubating-SNAPSHOT-hadoop2.0.5-alpha.jar.
> It generally would create that in hdfs so it accessible on all the nodes.
>  Is your /home/work nfs mounted on all the nodes?
>
> You can find the default fs by looking at the Hadoop config files.
>  Generally in core-site.xml.  its specified by: 
> fs.default.name
>
> Its pretty odd if those are its erroring with file:// when you specified
> hdfs://.
> when you tried the hdfs:// did you upload both the spark jar and your
> client jar (SparkAUC-assembly-0.1.jar)?  If not try that and make sure to
> put hdfs:// on them when you export SPARK_JAR and specify the --jar option.
>
>
> I'll try to reproduce the error tomorrow to see if a bug was introduced
> when I added the feature to run spa

Re: App master failed to find application jar in the master branch on YARN

2013-11-19 Thread guojc
Hi Tom,
   Thank you for your response. I have double-checked that I uploaded both
jars to the same folder on HDFS. I think the fs.default.name you pointed out
is the old, deprecated name for the fs.defaultFS config, according to
http://hadoop.apache.org/docs/r2.0.2-alpha/hadoop-project-dist/hadoop-common/DeprecatedProperties.html.
Anyway, we have tried setting both fs.default.name and fs.defaultFS to the
HDFS namenode, and the situation remained the same. We have also removed the
SPARK_HOME env variable on the worker nodes. One additional piece of
information that might be related is that job submission is done on the same
machine as the HDFS namenode, but I'm not sure whether this causes the
problem.
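
One quick way to check which filesystem the client JVM actually resolves is a
short diagnostic like the sketch below (assuming the Hadoop configuration
directory is on the classpath; the properties may come back null if unset):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem

// Prints the default filesystem seen by whatever core-site.xml is on the classpath.
// If this shows file:///, the cluster configuration is not being picked up.
val conf = new Configuration()
println(conf.get("fs.defaultFS"))     // current property name
println(conf.get("fs.default.name"))  // deprecated alias
println(FileSystem.get(conf).getUri)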

Thanks,
Jiacheng Guo


On Tue, Nov 19, 2013 at 11:50 AM, Tom Graves  wrote:

> Sorry for the delay. What is the default filesystem on your HDFS setup?
>  It looks like its set to file: rather then hdfs://.  That is the only
> reason I can think its listing the directory as  
> file:/home/work/.sparkStaging/application_1384588058297_0056.
>  Its basically just copying it local rather then uploading to hdfs and its
> just trying to use the local
> file:/home/work/guojiacheng/spark/assembly/target/scala-2.9.3/spark-assembly-0.9.0-incubating-SNAPSHOT-hadoop2.0.5-alpha.jar.
> It generally would create that in hdfs so it accessible on all the nodes.
>  Is your /home/work nfs mounted on all the nodes?
>
> You can find the default fs by looking at the Hadoop config files.
>  Generally in core-site.xml.  its specified by: 
> fs.default.name
>
> Its pretty odd if those are its erroring with file:// when you specified
> hdfs://.
> when you tried the hdfs:// did you upload both the spark jar and your
> client jar (SparkAUC-assembly-0.1.jar)?  If not try that and make sure to
> put hdfs:// on them when you export SPARK_JAR and specify the --jar option.
>
>
> I'll try to reproduce the error tomorrow to see if a bug was introduced
> when I added the feature to run spark from HDFS.
>
> Tom
>
>
>   On Monday, November 18, 2013 11:13 AM, guojc  wrote:
>  Hi Tom,
>I'm on Hadoop 2.05.  I can launch application spark 0.8 release
> normally. However I switch to git master branch version with application
> built with it, I got the jar not found exception and same happens to the
> example application. I have tried both file:// protocol and hdfs://
> protocol with jar in local file system and hdfs respectively, and even
> tried jar list parameter when new spark context.  The exception is slightly
> different for hdfs protocol and local file path. My application launch
> command is
>
>  
> SPARK_JAR=/home/work/guojiacheng/spark/assembly/target/scala-2.9.3/spark-assembly-0.9.0-incubating-SNAPSHOT-hadoop2.0.5-alpha.jar
> /home/work/guojiacheng/spark/spark-class
>  org.apache.spark.deploy.yarn.Client --jar
> /home/work/guojiacheng/spark-auc/target/scala-2.9.3/SparkAUC-assembly-0.1.jar
> --class  myClass.SparkAUC --args -c --args yarn-standalone  --args -i
> --args hdfs://{hdfs_host}:9000/user/work/guojiacheng/data --args -m --args
> hdfs://{hdfs_host}:9000/user/work/guojiacheng/model_large --args -o --args
> hdfs://{hdfs_host}:9000/user/work/guojiacheng/score --num-workers 60
>  --master-memory 6g --worker-memory 7g --worker-cores 1
>
> And my build command is SPARK_HADOOP_VERSION=2.0.5-alpha SPARK_YARN=true
> sbt/sbt assembly
>
> Only thing I can think of might be related is on each cluster node, it has
> a env SPARK_HOME point to a copy of 0.8 version's position, and its bin
> fold is in Path environment variable. And 0.9 version is not there.  It was
> something left over, when cluster was setup.  But I don't know whether it
> is related, as my understand is the yarn version try to distribute spark
> through yarn.
>
> hdfs version error message:
>
>  appDiagnostics: Application application_1384588058297_0056 failed
> 1 times due to AM Container for appattempt_1384588058297_0056_01 exited
> with  exitCode: -1000 due to: RemoteTrace:
> java.io.FileNotFoundException: File
> file:/home/work/.sparkStaging/application_1384588058297_0056/SparkAUC-assembly-0.1.jar
> does not exist
>
> local version error message.
> appDiagnostics: Application application_1384588058297_0066 failed 1 times
> due to AM Container for appattempt_1384588058297_0066_01 exited with
>  exitCode: -1000 due to: java.io.FileNotFoundException: File
> file:/home/work/guojiacheng/spark/assembly/target/scala-2.9.3/spark-assembly-0.9.0-incubating-SNAPSHOT-hadoop2.0.5-alpha.jar
> does not exist
>
> Best Regards,
> Jiacheng GUo
>
>
>
> On Mon, Nov 18, 2013 at 10:34 PM, Tom Graves  wrote:
>
> Hey Jiacheng Guo,
>
> do you have SPARK_EXAMPLES_JAR env variable set?  If you do, you have to
> add the --addJars parameter to the yarn client and point to the spark
> examples jar.  Or just unset SPARK_EXAMPLES_JAR env variable.

Re: App master failed to find application jar in the master branch on YARN

2013-11-18 Thread guojc
Hi Tom,
   I'm on Hadoop 2.0.5. I can launch applications with the Spark 0.8 release
normally. However, when I switch to the git master branch, with the
application built against it, I get the jar-not-found exception, and the same
happens with the example application. I have tried both the file:// protocol
and the hdfs:// protocol, with the jar on the local file system and on HDFS
respectively, and I even tried the jar-list parameter when creating the
SparkContext. The exception is slightly different for the hdfs:// protocol and
the local file path. My application launch command is:

 
SPARK_JAR=/home/work/guojiacheng/spark/assembly/target/scala-2.9.3/spark-assembly-0.9.0-incubating-SNAPSHOT-hadoop2.0.5-alpha.jar
/home/work/guojiacheng/spark/spark-class
 org.apache.spark.deploy.yarn.Client --jar
/home/work/guojiacheng/spark-auc/target/scala-2.9.3/SparkAUC-assembly-0.1.jar
--class  myClass.SparkAUC --args -c --args yarn-standalone  --args -i
--args hdfs://{hdfs_host}:9000/user/work/guojiacheng/data --args -m --args
hdfs://{hdfs_host}:9000/user/work/guojiacheng/model_large --args -o --args
hdfs://{hdfs_host}:9000/user/work/guojiacheng/score --num-workers 60
 --master-memory 6g --worker-memory 7g --worker-cores 1

And my build command is SPARK_HADOOP_VERSION=2.0.5-alpha SPARK_YARN=true
sbt/sbt assembly

The only thing I can think of that might be related is that each cluster node
has an env variable SPARK_HOME pointing to a copy of the 0.8 version's
location, and its bin folder is in the PATH environment variable; the 0.9
version is not there. It was something left over from when the cluster was set
up. But I don't know whether it is related, as my understanding is that the
YARN version tries to distribute Spark through YARN.

HDFS version error message:

 appDiagnostics: Application application_1384588058297_0056 failed
1 times due to AM Container for appattempt_1384588058297_0056_01 exited
with  exitCode: -1000 due to: RemoteTrace:
java.io.FileNotFoundException: File
file:/home/work/.sparkStaging/application_1384588058297_0056/SparkAUC-assembly-0.1.jar
does not exist

Local version error message:
appDiagnostics: Application application_1384588058297_0066 failed 1 times
due to AM Container for appattempt_1384588058297_0066_01 exited with
 exitCode: -1000 due to: java.io.FileNotFoundException: File
file:/home/work/guojiacheng/spark/assembly/target/scala-2.9.3/spark-assembly-0.9.0-incubating-SNAPSHOT-hadoop2.0.5-alpha.jar
does not exist

Best Regards,
Jiacheng Guo



On Mon, Nov 18, 2013 at 10:34 PM, Tom Graves  wrote:

> Hey Jiacheng Guo,
>
> do you have SPARK_EXAMPLES_JAR env variable set?  If you do, you have to
> add the --addJars parameter to the yarn client and point to the spark
> examples jar.  Or just unset SPARK_EXAMPLES_JAR env variable.
>
> You should only have to set SPARK_JAR env variable.
>
> If that isn't the issue let me know the build command you used and hadoop
> version, and your defaultFs or hadoop.
>
> Tom
>
>
>   On Saturday, November 16, 2013 2:32 AM, guojc  wrote:
>  hi,
>After reading about the exiting progress in consolidating shuffle, I'm
> eager to trying out the last master branch. However up to launch the
> example application, the job failed with prompt the app master failed to
> find the target jar. appDiagnostics: Application
> application_1384588058297_0017 failed 1 times due to AM Container for
> appattempt_1384588058297_0017_01 exited with  exitCode: -1000 due to:
> java.io.FileNotFoundException: File
> file:/${my_work_dir}/spark/examples/target/scala-2.9.3/spark-examples-assembly-0.9.0-incubating-SNAPSHOT.jar
> does not exist.
>
>   Is there any change on how to launch a yarn job now?
>
> Best Regards,
> Jiacheng Guo
>
>
>
>


Re: Does spark RDD has a partitionedByKey

2013-11-16 Thread guojc
After looking at the API more carefully, I found that I had overlooked the
partitionBy function on PairRDDFunctions. It's exactly the function I need.
Sorry for the confusion.
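
For anyone landing on this thread later, a minimal sketch of the pattern; the
paths, key extraction and partition count are placeholders:

import org.apache.spark.HashPartitioner
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

val sc = new SparkContext("local[2]", "partitionBy-sketch")
val p  = new HashPartitioner(128)

// Co-partition both sides once and cache them...
val left = sc.textFile("hdfs:///tmp/left.tsv")
  .map(line => (line.split("\t")(0), line))
  .partitionBy(p)
  .cache()
val right = sc.textFile("hdfs:///tmp/right.tsv")
  .map(line => (line.split("\t")(0), line))
  .partitionBy(p)
  .cache()

// ...so repeated joins see matching partitioners and avoid re-shuffling both sides.
val joined = left.join(right)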

Best Regards,
Jiacheng Guo


On Sat, Nov 16, 2013 at 3:59 AM, Christopher Nguyen  wrote:

> Jiacheng, if you're OK with using the Shark layer above Spark (and I think
> for many use cases the answer would be "yes"), then you can take advantage
> of Shark's co-partitioning. Or do something like
> https://github.com/amplab/shark/pull/100/commits
>
> Sent while mobile. Pls excuse typos etc.
> On Nov 16, 2013 2:48 AM, "guojc"  wrote:
>
>> Hi Meisam,
>>  What I want to achieve here is a bit tricky. Basically, I'm try to
>> implement PerSplit SemiJoin in a iterative algorithm with Spark. It's a
>> very efficient join strategy for high in-balanced data set and provide huge
>> gain against normal join in that situation.,
>>
>>  Let's say we have two large rdd, a: RDD[X] and b: RDD[Y,Z] and both
>> of them load directly from hdfs. So both of them will has a partitioner of
>> Nothing. And X is a large complicate struct contain a set of join key Y.
>>  First for each partition of a , I extract join key Y from every ins of X
>> in that parition and construct a hash set of join key Y and paritionID. Now
>> I have a new rdd c :RDD[Y,PartionID ] and join it with b on Y and then
>> construct a rdd d:RDD[PartitionID,Map[Y,Z]] by groupby on PartitionID and
>> constructing map of Y and Z.  As for each partition of a, I want to
>> repartiion it according to its partition id, and it becomes a rdd
>>  e:RDD[PartitionID,X]. As both d and e will same partitioner and same key,
>> they will be joined very efficiently.
>>
>> The key ability I want to have here is the ability to cache rdd c
>> with same partitioner of rdd b and cache e. So later join with b and d will
>> be efficient, because the value of b will be updated from time to time and
>> d's content will change accordingly. And It will be nice to have the
>> ability to repartition a with its original paritionid without actually
>> shuffle across network.
>>
>> You can refer to
>> http://researcher.watson.ibm.com/researcher/files/us-ytian/hadoopjoin.pdf for
>> PerSplit SemiJoin's details.
>>
>> Best Regards,
>> Jiacheng Guo
>>
>>
>> On Sat, Nov 16, 2013 at 3:02 AM, Meisam Fathi wrote:
>>
>>> Hi guojc,
>>>
>>> It is not cleat for me what problem you are trying to solve. What do
>>> you want to do with the result of your
>>> groupByKey(myPartitioner).flatMapValues( x=>x)? Do you want to use it
>>> in a join? Do you want to save it to your file system? Or do you want
>>> to do something else with it?
>>>
>>> Thanks,
>>> Meisam
>>>
>>> On Fri, Nov 15, 2013 at 12:56 PM, guojc  wrote:
>>> > Hi Meisam,
>>> > Thank you for response. I know each rdd has a partitioner. What I
>>> want
>>> > to achieved here is re-partition a piece of data according to my custom
>>> > partitioner. Currently I do that by
>>> groupByKey(myPartitioner).flatMapValues(
>>> > x=>x). But I'm a bit worried whether this will create additional temp
>>> object
>>> > collection, as result is first made into Seq the an collection of
>>> tupples.
>>> > Any suggestion?
>>> >
>>> > Best Regards,
>>> > Jiahcheng Guo
>>> >
>>> >
>>> > On Sat, Nov 16, 2013 at 12:24 AM, Meisam Fathi >> >
>>> > wrote:
>>> >>
>>> >> Hi Jiacheng,
>>> >>
>>> >> Each RDD has a partitioner. You can define your own partitioner if the
>>> >> default partitioner does not suit your purpose.
>>> >> You can take a look at this
>>> >>
>>> >>
>>> http://ampcamp.berkeley.edu/wp-content/uploads/2012/06/matei-zaharia-amp-camp-2012-advanced-spark.pdf
>>> .
>>> >>
>>> >> Thanks,
>>> >> Meisam
>>> >>
>>> >> On Fri, Nov 15, 2013 at 6:54 AM, guojc  wrote:
>>> >> > Hi,
>>> >> >   I'm wondering whether spark rdd can has a partitionedByKey
>>> function?
>>> >> > The
>>> >> > use of this function is to have a rdd distributed by according to a
>>> >> > cerntain
>>> >> > paritioner and cache it. And then further join performance by rdd
>>> with
>>> >> > same
>>> >> > partitoner will a great speed up. Currently, we only have a
>>> >> > groupByKeyFunction and generate a Seq of desired type , which is not
>>> >> > very
>>> >> > convenient.
>>> >> >
>>> >> > Btw, Sorry for last empty body email. I mistakenly hit the send
>>> >> > shortcut.
>>> >> >
>>> >> >
>>> >> > Best Regards,
>>> >> > Jiacheng Guo
>>> >
>>> >
>>>
>>
>>


App master failed to find application jar in the master branch on YARN

2013-11-16 Thread guojc
Hi,
   After reading about the exciting progress on shuffle consolidation, I'm
eager to try out the latest master branch. However, when launching the example
application, the job failed with a message saying the app master could not
find the target jar:

appDiagnostics: Application application_1384588058297_0017 failed 1 times due
to AM Container for appattempt_1384588058297_0017_01 exited with exitCode:
-1000 due to: java.io.FileNotFoundException: File
file:/${my_work_dir}/spark/examples/target/scala-2.9.3/spark-examples-assembly-0.9.0-incubating-SNAPSHOT.jar
does not exist.

  Has anything changed in how to launch a YARN job?

Best Regards,
Jiacheng Guo


Re: Does spark RDD has a partitionedByKey

2013-11-15 Thread guojc
Hi Meisam,
 What I want to achieve here is a bit tricky. Basically, I'm trying to
implement a per-split semi-join in an iterative algorithm with Spark. It's a
very efficient join strategy for highly imbalanced datasets and provides a
huge gain over a normal join in that situation.

 Let's say we have two large RDDs, a: RDD[X] and b: RDD[(Y, Z)], both loaded
directly from HDFS, so neither of them has a partitioner. X is a large,
complicated struct containing a set of join keys of type Y. First, for each
partition of a, I extract the join keys Y from every instance of X in that
partition and construct a set of (join key Y, partition ID) pairs. Now I have
a new RDD c: RDD[(Y, PartitionID)]; I join it with b on Y and then construct
an RDD d: RDD[(PartitionID, Map[Y, Z])] by grouping on PartitionID and
building a map from Y to Z. As for each partition of a, I want to repartition
it according to its partition ID, so it becomes an RDD e: RDD[(PartitionID,
X)]. Since d and e have the same partitioner and the same key, they can be
joined very efficiently.

The key ability I want here is to cache RDD c with the same partitioner as
RDD b, and to cache e, so that the later joins with b and d stay efficient,
because the value of b will be updated from time to time and d's content will
change accordingly. It would also be nice to be able to repartition a by its
original partition ID without actually shuffling across the network.

You can refer to
http://researcher.watson.ibm.com/researcher/files/us-ytian/hadoopjoin.pdf for
the details of per-split semi-join.
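
To make the shape of the plan concrete, here is a rough, untested sketch under
simplifying assumptions: Record stands in for X, String for Y, Int for Z, and
each per-partition Map is assumed to fit in memory.

import org.apache.spark.{HashPartitioner, SparkContext}
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

object PerSplitSemiJoinSketch {
  case class Record(joinKeys: Set[String], payload: String)  // X, carrying its join keys Y

  def perSplitSemiJoin(a: RDD[Record], b: RDD[(String, Int)]): RDD[(Record, Seq[Int])] = {
    val part = new HashPartitioner(a.partitions.length)

    // c: one (joinKey, partitionId) pair per key occurrence in each partition of a
    val c: RDD[(String, Int)] = a.mapPartitionsWithIndex { (pid, iter) =>
      iter.flatMap(rec => rec.joinKeys.iterator.map(k => (k, pid)))
    }

    // d: ship only the matching (Y, Z) pairs back to the partition that asked for them
    val d: RDD[(Int, Map[String, Int])] = c.join(b)
      .map { case (y, (pid, z)) => (pid, (y, z)) }
      .groupByKey(part)
      .mapValues(_.toMap)

    // e: a keyed by its own partition id, laid out with the same partitioner as d
    val e: RDD[(Int, Record)] = a.mapPartitionsWithIndex { (pid, iter) =>
      iter.map(rec => (pid, rec))
    }.partitionBy(part)

    // both sides share the partitioner and the key, so this final join is narrow
    e.join(d).map { case (_, (rec, dict)) =>
      (rec, rec.joinKeys.toSeq.flatMap(dict.get))
    }
  }
}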

Best Regards,
Jiacheng Guo


On Sat, Nov 16, 2013 at 3:02 AM, Meisam Fathi wrote:

> Hi guojc,
>
> It is not cleat for me what problem you are trying to solve. What do
> you want to do with the result of your
> groupByKey(myPartitioner).flatMapValues( x=>x)? Do you want to use it
> in a join? Do you want to save it to your file system? Or do you want
> to do something else with it?
>
> Thanks,
> Meisam
>
> On Fri, Nov 15, 2013 at 12:56 PM, guojc  wrote:
> > Hi Meisam,
> > Thank you for response. I know each rdd has a partitioner. What I
> want
> > to achieved here is re-partition a piece of data according to my custom
> > partitioner. Currently I do that by
> groupByKey(myPartitioner).flatMapValues(
> > x=>x). But I'm a bit worried whether this will create additional temp
> object
> > collection, as result is first made into Seq the an collection of
> tupples.
> > Any suggestion?
> >
> > Best Regards,
> > Jiahcheng Guo
> >
> >
> > On Sat, Nov 16, 2013 at 12:24 AM, Meisam Fathi 
> > wrote:
> >>
> >> Hi Jiacheng,
> >>
> >> Each RDD has a partitioner. You can define your own partitioner if the
> >> default partitioner does not suit your purpose.
> >> You can take a look at this
> >>
> >>
> http://ampcamp.berkeley.edu/wp-content/uploads/2012/06/matei-zaharia-amp-camp-2012-advanced-spark.pdf
> .
> >>
> >> Thanks,
> >> Meisam
> >>
> >> On Fri, Nov 15, 2013 at 6:54 AM, guojc  wrote:
> >> > Hi,
> >> >   I'm wondering whether spark rdd can has a partitionedByKey function?
> >> > The
> >> > use of this function is to have a rdd distributed by according to a
> >> > cerntain
> >> > paritioner and cache it. And then further join performance by rdd with
> >> > same
> >> > partitoner will a great speed up. Currently, we only have a
> >> > groupByKeyFunction and generate a Seq of desired type , which is not
> >> > very
> >> > convenient.
> >> >
> >> > Btw, Sorry for last empty body email. I mistakenly hit the send
> >> > shortcut.
> >> >
> >> >
> >> > Best Regards,
> >> > Jiacheng Guo
> >
> >
>


How to override yarn default java.io.tmpdir and spark.local.dir

2013-11-15 Thread guojc
Hi,
   How can I override the default java.io.tmpdir and spark.local.dir on YARN?
I have tried setting SPARK_YARN_USER_ENV together with SPARK_JAVA_OPTS, but it
seems to have no effect. The location is still taken from
YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR, and that is a very small disk
for me. Any suggestions?
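
For what it's worth, a tiny diagnostic sketch: run a trivial job and print what
the executor JVMs actually end up with (spark.local.dir was a plain Java system
property in the 0.8/0.9 era, so this may print null if the override never
reached the containers):

// Assumes an existing SparkContext `sc` on the YARN cluster in question.
sc.parallelize(Seq(1), 1).map { _ =>
  (System.getProperty("java.io.tmpdir"), System.getProperty("spark.local.dir"))
}.collect().foreach(println)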

Thanks,
Jiacheng Guo


Re: Does spark RDD has a partitionedByKey

2013-11-15 Thread guojc
Hi Meisam,
Thank you for the response. I know each RDD has a partitioner. What I want to
achieve here is to re-partition a piece of data according to my custom
partitioner. Currently I do that with
groupByKey(myPartitioner).flatMapValues(x => x), but I'm a bit worried about
whether this creates additional temporary object collections, since the result
is first made into a Seq and then into a collection of tuples. Any suggestions?

Best Regards,
Jiacheng Guo


On Sat, Nov 16, 2013 at 12:24 AM, Meisam Fathi wrote:

> Hi Jiacheng,
>
> Each RDD has a partitioner. You can define your own partitioner if the
> default partitioner does not suit your purpose.
> You can take a look at this
>
> http://ampcamp.berkeley.edu/wp-content/uploads/2012/06/matei-zaharia-amp-camp-2012-advanced-spark.pdf
> .
>
> Thanks,
> Meisam
>
> On Fri, Nov 15, 2013 at 6:54 AM, guojc  wrote:
> > Hi,
> >   I'm wondering whether spark rdd can has a partitionedByKey function?
> The
> > use of this function is to have a rdd distributed by according to a
> cerntain
> > paritioner and cache it. And then further join performance by rdd with
> same
> > partitoner will a great speed up. Currently, we only have a
> > groupByKeyFunction and generate a Seq of desired type , which is not very
> > convenient.
> >
> > Btw, Sorry for last empty body email. I mistakenly hit the send shortcut.
> >
> >
> > Best Regards,
> > Jiacheng Guo
>


Does spark RDD has a partitionedByKey

2013-11-15 Thread guojc
Hi,
  I'm wondering whether a Spark RDD can have a partitionedByKey function. The
use of this function would be to distribute an RDD according to a certain
partitioner and cache it; further joins between RDDs with the same partitioner
would then see a great speedup. Currently we only have a groupByKey function,
which generates a Seq of the desired type and is not very convenient.

By the way, sorry for the last empty-body email; I mistakenly hit the send
shortcut.


Best Regards,
Jiacheng Guo


Does Spark has a partitionByKey function

2013-11-15 Thread guojc
Hi,