Re: java.lang.StackOverflowError when calling count()
We were hitting the same problem. The funny thing is that our code worked with a larger data set and failed on a reduced one. We are working around it by passing a stack-size override to the JVM; maybe that can help you too. Please give it a try and let me know:

--conf spark.executor.extraJavaOptions=-Xss8m --conf spark.driver.extraJavaOptions=-Xss8m

8m is probably overkill for a stack size, so you could start with 4m.
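For completeness, a minimal sketch of how the executor-side option can also be set programmatically (an assumed application skeleton, not the original poster's code). The driver-side -Xss usually has to be supplied at launch time, as in the spark-submit flags above, because by the time application code runs the driver JVM already exists.

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("deep-lineage-job")                    // hypothetical app name
  .set("spark.executor.extraJavaOptions", "-Xss4m")  // start at 4m and raise only if needed
val sc = new SparkContext(conf)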
Re: SparkML RandomForest java.lang.StackOverflowError
Can you try reducing maxBins? That reduces communication (at the cost of coarser discretization of continuous features). On Fri, Apr 1, 2016 at 11:32 AM, Joseph Bradley wrote: > In my experience, 20K is a lot but often doable; 2K is easy; 200 is > small. Communication scales linearly in the number of features. > > On Thu, Mar 31, 2016 at 6:12 AM, Eugene Morozov < > evgeny.a.moro...@gmail.com> wrote: > >> Joseph, >> >> Correction, there 20k features. Is it still a lot? >> What number of features can be considered as normal? >> >> -- >> Be well! >> Jean Morozov >> >> On Tue, Mar 29, 2016 at 10:09 PM, Joseph Bradley >> wrote: >> >>> First thought: 70K features is *a lot* for the MLlib implementation (and >>> any PLANET-like implementation) >>> >>> Using fewer partitions is a good idea. >>> >>> Which Spark version was this on? >>> >>> On Tue, Mar 29, 2016 at 5:21 AM, Eugene Morozov < >>> evgeny.a.moro...@gmail.com> wrote: >>> >>>> The questions I have in mind: >>>> >>>> Is it smth that the one might expect? From the stack trace itself it's >>>> not clear where does it come from. >>>> Is it an already known bug? Although I haven't found anything like that. >>>> Is it possible to configure something to workaround / avoid this? >>>> >>>> I'm not sure it's the right thing to do, but I've >>>> increased thread stack size 10 times (to 80MB) >>>> reduced default parallelism 10 times (only 20 cores are available) >>>> >>>> Thank you in advance. >>>> >>>> -- >>>> Be well! >>>> Jean Morozov >>>> >>>> On Tue, Mar 29, 2016 at 1:12 PM, Eugene Morozov < >>>> evgeny.a.moro...@gmail.com> wrote: >>>> >>>>> Hi, >>>>> >>>>> I have a web service that provides rest api to train random forest >>>>> algo. >>>>> I train random forest on a 5 nodes spark cluster with enough memory - >>>>> everything is cached (~22 GB). >>>>> On a small datasets up to 100k samples everything is fine, but with >>>>> the biggest one (400k samples and ~70k features) I'm stuck with >>>>> StackOverflowError. >>>>> >>>>> Additional options for my web service >>>>> spark.executor.extraJavaOptions="-XX:ThreadStackSize=8192" >>>>> spark.default.parallelism = 200. >>>>> >>>>> On a 400k samples dataset >>>>> - (with default thread stack size) it took 4 hours of training to get >>>>> the error. >>>>> - with increased stack size it took 60 hours to hit it. >>>>> I can increase it, but it's hard to say what amount of memory it needs >>>>> and it's applied to all of the treads and might waste a lot of memory. >>>>> >>>>> I'm looking at different stages at event timeline now and see that >>>>> task deserialization time gradually increases. And at the end task >>>>> deserialization time is roughly same as executor computing time. 
>>>>> >>>>> Code I use to train model: >>>>> >>>>> int MAX_BINS = 16; >>>>> int NUM_CLASSES = 0; >>>>> double MIN_INFO_GAIN = 0.0; >>>>> int MAX_MEMORY_IN_MB = 256; >>>>> double SUBSAMPLING_RATE = 1.0; >>>>> boolean USE_NODEID_CACHE = true; >>>>> int CHECKPOINT_INTERVAL = 10; >>>>> int RANDOM_SEED = 12345; >>>>> >>>>> int NODE_SIZE = 5; >>>>> int maxDepth = 30; >>>>> int numTrees = 50; >>>>> Strategy strategy = new Strategy(Algo.Regression(), Variance.instance(), >>>>> maxDepth, NUM_CLASSES, MAX_BINS, >>>>> QuantileStrategy.Sort(), new >>>>> scala.collection.immutable.HashMap<>(), nodeSize, MIN_INFO_GAIN, >>>>> MAX_MEMORY_IN_MB, SUBSAMPLING_RATE, USE_NODEID_CACHE, >>>>> CHECKPOINT_INTERVAL); >>>>> RandomForestModel model = >>>>> RandomForest.trainRegressor(labeledPoints.rdd(), strategy, numTrees, >>>>> "auto", RANDOM_SEED); >>>>> >>>>> >>>>> Any advice would be highly appreciated. >>>>> >>>>> The exception
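A minimal sketch of the maxBins suggestion in Scala (the quoted training code above is Java): the Strategy fields are mutable properties, so only the values that differ from the defaults need to be set. labeledPoints is assumed to be the RDD[LabeledPoint] from the quoted post, and maxBins = 8 is just an example of a smaller value than the 16 used there.

import org.apache.spark.mllib.tree.RandomForest
import org.apache.spark.mllib.tree.configuration.Strategy

val strategy = Strategy.defaultStrategy("Regression")
strategy.maxDepth = 30            // as in the quoted code
strategy.minInstancesPerNode = 5  // NODE_SIZE in the quoted code
strategy.useNodeIdCache = true
strategy.checkpointInterval = 10
strategy.maxBins = 8              // smaller than the original 16: less data per feature to aggregate

val model = RandomForest.trainRegressor(labeledPoints, strategy, 50, "auto", 12345)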
Re: SparkML RandomForest java.lang.StackOverflowError
In my experience, 20K is a lot but often doable; 2K is easy; 200 is small. Communication scales linearly in the number of features. On Thu, Mar 31, 2016 at 6:12 AM, Eugene Morozov wrote: > Joseph, > > Correction, there 20k features. Is it still a lot? > What number of features can be considered as normal? > > -- > Be well! > Jean Morozov > > On Tue, Mar 29, 2016 at 10:09 PM, Joseph Bradley > wrote: > >> First thought: 70K features is *a lot* for the MLlib implementation (and >> any PLANET-like implementation) >> >> Using fewer partitions is a good idea. >> >> Which Spark version was this on? >> >> On Tue, Mar 29, 2016 at 5:21 AM, Eugene Morozov < >> evgeny.a.moro...@gmail.com> wrote: >> >>> The questions I have in mind: >>> >>> Is it smth that the one might expect? From the stack trace itself it's >>> not clear where does it come from. >>> Is it an already known bug? Although I haven't found anything like that. >>> Is it possible to configure something to workaround / avoid this? >>> >>> I'm not sure it's the right thing to do, but I've >>> increased thread stack size 10 times (to 80MB) >>> reduced default parallelism 10 times (only 20 cores are available) >>> >>> Thank you in advance. >>> >>> -- >>> Be well! >>> Jean Morozov >>> >>> On Tue, Mar 29, 2016 at 1:12 PM, Eugene Morozov < >>> evgeny.a.moro...@gmail.com> wrote: >>> >>>> Hi, >>>> >>>> I have a web service that provides rest api to train random forest >>>> algo. >>>> I train random forest on a 5 nodes spark cluster with enough memory - >>>> everything is cached (~22 GB). >>>> On a small datasets up to 100k samples everything is fine, but with the >>>> biggest one (400k samples and ~70k features) I'm stuck with >>>> StackOverflowError. >>>> >>>> Additional options for my web service >>>> spark.executor.extraJavaOptions="-XX:ThreadStackSize=8192" >>>> spark.default.parallelism = 200. >>>> >>>> On a 400k samples dataset >>>> - (with default thread stack size) it took 4 hours of training to get >>>> the error. >>>> - with increased stack size it took 60 hours to hit it. >>>> I can increase it, but it's hard to say what amount of memory it needs >>>> and it's applied to all of the treads and might waste a lot of memory. >>>> >>>> I'm looking at different stages at event timeline now and see that task >>>> deserialization time gradually increases. And at the end task >>>> deserialization time is roughly same as executor computing time. >>>> >>>> Code I use to train model: >>>> >>>> int MAX_BINS = 16; >>>> int NUM_CLASSES = 0; >>>> double MIN_INFO_GAIN = 0.0; >>>> int MAX_MEMORY_IN_MB = 256; >>>> double SUBSAMPLING_RATE = 1.0; >>>> boolean USE_NODEID_CACHE = true; >>>> int CHECKPOINT_INTERVAL = 10; >>>> int RANDOM_SEED = 12345; >>>> >>>> int NODE_SIZE = 5; >>>> int maxDepth = 30; >>>> int numTrees = 50; >>>> Strategy strategy = new Strategy(Algo.Regression(), Variance.instance(), >>>> maxDepth, NUM_CLASSES, MAX_BINS, >>>> QuantileStrategy.Sort(), new >>>> scala.collection.immutable.HashMap<>(), nodeSize, MIN_INFO_GAIN, >>>> MAX_MEMORY_IN_MB, SUBSAMPLING_RATE, USE_NODEID_CACHE, >>>> CHECKPOINT_INTERVAL); >>>> RandomForestModel model = RandomForest.trainRegressor(labeledPoints.rdd(), >>>> strategy, numTrees, "auto", RANDOM_SEED); >>>> >>>> >>>> Any advice would be highly appreciated. 
>>>> >>>> The exception (~3000 lines long): >>>> java.lang.StackOverflowError >>>> at >>>> java.io.ObjectInputStream$PeekInputStream.read(ObjectInputStream.java:2320) >>>> at >>>> java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2333) >>>> at >>>> java.io.ObjectInputStream$BlockDataInputStream.readInt(ObjectInputStream.java:2828) >>>> at >>>> java.io.ObjectInputStream.readHandle(ObjectInputStream.java:1453) >>>> at >>>> java.io.ObjectInputStream.readClass
Re: SparkML RandomForest java.lang.StackOverflowError
Joseph, Correction, there 20k features. Is it still a lot? What number of features can be considered as normal? -- Be well! Jean Morozov On Tue, Mar 29, 2016 at 10:09 PM, Joseph Bradley wrote: > First thought: 70K features is *a lot* for the MLlib implementation (and > any PLANET-like implementation) > > Using fewer partitions is a good idea. > > Which Spark version was this on? > > On Tue, Mar 29, 2016 at 5:21 AM, Eugene Morozov < > evgeny.a.moro...@gmail.com> wrote: > >> The questions I have in mind: >> >> Is it smth that the one might expect? From the stack trace itself it's >> not clear where does it come from. >> Is it an already known bug? Although I haven't found anything like that. >> Is it possible to configure something to workaround / avoid this? >> >> I'm not sure it's the right thing to do, but I've >> increased thread stack size 10 times (to 80MB) >> reduced default parallelism 10 times (only 20 cores are available) >> >> Thank you in advance. >> >> -- >> Be well! >> Jean Morozov >> >> On Tue, Mar 29, 2016 at 1:12 PM, Eugene Morozov < >> evgeny.a.moro...@gmail.com> wrote: >> >>> Hi, >>> >>> I have a web service that provides rest api to train random forest algo. >>> I train random forest on a 5 nodes spark cluster with enough memory - >>> everything is cached (~22 GB). >>> On a small datasets up to 100k samples everything is fine, but with the >>> biggest one (400k samples and ~70k features) I'm stuck with >>> StackOverflowError. >>> >>> Additional options for my web service >>> spark.executor.extraJavaOptions="-XX:ThreadStackSize=8192" >>> spark.default.parallelism = 200. >>> >>> On a 400k samples dataset >>> - (with default thread stack size) it took 4 hours of training to get >>> the error. >>> - with increased stack size it took 60 hours to hit it. >>> I can increase it, but it's hard to say what amount of memory it needs >>> and it's applied to all of the treads and might waste a lot of memory. >>> >>> I'm looking at different stages at event timeline now and see that task >>> deserialization time gradually increases. And at the end task >>> deserialization time is roughly same as executor computing time. >>> >>> Code I use to train model: >>> >>> int MAX_BINS = 16; >>> int NUM_CLASSES = 0; >>> double MIN_INFO_GAIN = 0.0; >>> int MAX_MEMORY_IN_MB = 256; >>> double SUBSAMPLING_RATE = 1.0; >>> boolean USE_NODEID_CACHE = true; >>> int CHECKPOINT_INTERVAL = 10; >>> int RANDOM_SEED = 12345; >>> >>> int NODE_SIZE = 5; >>> int maxDepth = 30; >>> int numTrees = 50; >>> Strategy strategy = new Strategy(Algo.Regression(), Variance.instance(), >>> maxDepth, NUM_CLASSES, MAX_BINS, >>> QuantileStrategy.Sort(), new >>> scala.collection.immutable.HashMap<>(), nodeSize, MIN_INFO_GAIN, >>> MAX_MEMORY_IN_MB, SUBSAMPLING_RATE, USE_NODEID_CACHE, >>> CHECKPOINT_INTERVAL); >>> RandomForestModel model = RandomForest.trainRegressor(labeledPoints.rdd(), >>> strategy, numTrees, "auto", RANDOM_SEED); >>> >>> >>> Any advice would be highly appreciated. 
>>> >>> The exception (~3000 lines long): >>> java.lang.StackOverflowError >>> at >>> java.io.ObjectInputStream$PeekInputStream.read(ObjectInputStream.java:2320) >>> at >>> java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2333) >>> at >>> java.io.ObjectInputStream$BlockDataInputStream.readInt(ObjectInputStream.java:2828) >>> at >>> java.io.ObjectInputStream.readHandle(ObjectInputStream.java:1453) >>> at >>> java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1512) >>> at >>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774) >>> at >>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) >>> at >>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000) >>> at >>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924) >>> at >>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) >>> at >>> java.io.ObjectInputStream.readO
Re: SparkML RandomForest java.lang.StackOverflowError
One more thing. With increased stack size it completed twice more already, but now I see in the log. [dispatcher-event-loop-1] WARN o.a.spark.scheduler.TaskSetManager - Stage 24860 contains a task of very large size (157 KB). The maximum recommended task size is 100 KB. Size of the task increases over time. When the warning appeared first time it was around 100KB. Also time to complete collectAsMap at DecisionTree.scala:651 also increased from 8 seconds at the beginning of the training up to 20-24 seconds now. -- Be well! Jean Morozov On Wed, Mar 30, 2016 at 12:14 AM, Eugene Morozov wrote: > Joseph, > > I'm using 1.6.0. > > -- > Be well! > Jean Morozov > > On Tue, Mar 29, 2016 at 10:09 PM, Joseph Bradley > wrote: > >> First thought: 70K features is *a lot* for the MLlib implementation (and >> any PLANET-like implementation) >> >> Using fewer partitions is a good idea. >> >> Which Spark version was this on? >> >> On Tue, Mar 29, 2016 at 5:21 AM, Eugene Morozov < >> evgeny.a.moro...@gmail.com> wrote: >> >>> The questions I have in mind: >>> >>> Is it smth that the one might expect? From the stack trace itself it's >>> not clear where does it come from. >>> Is it an already known bug? Although I haven't found anything like that. >>> Is it possible to configure something to workaround / avoid this? >>> >>> I'm not sure it's the right thing to do, but I've >>> increased thread stack size 10 times (to 80MB) >>> reduced default parallelism 10 times (only 20 cores are available) >>> >>> Thank you in advance. >>> >>> -- >>> Be well! >>> Jean Morozov >>> >>> On Tue, Mar 29, 2016 at 1:12 PM, Eugene Morozov < >>> evgeny.a.moro...@gmail.com> wrote: >>> >>>> Hi, >>>> >>>> I have a web service that provides rest api to train random forest >>>> algo. >>>> I train random forest on a 5 nodes spark cluster with enough memory - >>>> everything is cached (~22 GB). >>>> On a small datasets up to 100k samples everything is fine, but with the >>>> biggest one (400k samples and ~70k features) I'm stuck with >>>> StackOverflowError. >>>> >>>> Additional options for my web service >>>> spark.executor.extraJavaOptions="-XX:ThreadStackSize=8192" >>>> spark.default.parallelism = 200. >>>> >>>> On a 400k samples dataset >>>> - (with default thread stack size) it took 4 hours of training to get >>>> the error. >>>> - with increased stack size it took 60 hours to hit it. >>>> I can increase it, but it's hard to say what amount of memory it needs >>>> and it's applied to all of the treads and might waste a lot of memory. >>>> >>>> I'm looking at different stages at event timeline now and see that task >>>> deserialization time gradually increases. And at the end task >>>> deserialization time is roughly same as executor computing time. 
>>>> >>>> Code I use to train model: >>>> >>>> int MAX_BINS = 16; >>>> int NUM_CLASSES = 0; >>>> double MIN_INFO_GAIN = 0.0; >>>> int MAX_MEMORY_IN_MB = 256; >>>> double SUBSAMPLING_RATE = 1.0; >>>> boolean USE_NODEID_CACHE = true; >>>> int CHECKPOINT_INTERVAL = 10; >>>> int RANDOM_SEED = 12345; >>>> >>>> int NODE_SIZE = 5; >>>> int maxDepth = 30; >>>> int numTrees = 50; >>>> Strategy strategy = new Strategy(Algo.Regression(), Variance.instance(), >>>> maxDepth, NUM_CLASSES, MAX_BINS, >>>> QuantileStrategy.Sort(), new >>>> scala.collection.immutable.HashMap<>(), nodeSize, MIN_INFO_GAIN, >>>> MAX_MEMORY_IN_MB, SUBSAMPLING_RATE, USE_NODEID_CACHE, >>>> CHECKPOINT_INTERVAL); >>>> RandomForestModel model = RandomForest.trainRegressor(labeledPoints.rdd(), >>>> strategy, numTrees, "auto", RANDOM_SEED); >>>> >>>> >>>> Any advice would be highly appreciated. >>>> >>>> The exception (~3000 lines long): >>>> java.lang.StackOverflowError >>>> at >>>> java.io.ObjectInputStream$PeekInputStream.read(ObjectInputStream.java:2320) >>>> at >>>> java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2333) >>>&g
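One more thing that may be worth double-checking here, as a guess rather than a confirmed diagnosis: with USE_NODEID_CACHE = true, the node-id cache RDD is only checkpointed when a checkpoint directory has been set on the SparkContext. If none is set, the configured CHECKPOINT_INTERVAL has no effect and the cached RDD's lineage keeps growing, which would fit the steadily growing task size and deserialization time described above. A one-line sketch with a placeholder path:

sc.setCheckpointDir("hdfs:///tmp/random-forest-checkpoints")  // placeholder path; set before training starts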
Re: SparkML RandomForest java.lang.StackOverflowError
Joseph, I'm using 1.6.0. -- Be well! Jean Morozov On Tue, Mar 29, 2016 at 10:09 PM, Joseph Bradley wrote: > First thought: 70K features is *a lot* for the MLlib implementation (and > any PLANET-like implementation) > > Using fewer partitions is a good idea. > > Which Spark version was this on? > > On Tue, Mar 29, 2016 at 5:21 AM, Eugene Morozov < > evgeny.a.moro...@gmail.com> wrote: > >> The questions I have in mind: >> >> Is it smth that the one might expect? From the stack trace itself it's >> not clear where does it come from. >> Is it an already known bug? Although I haven't found anything like that. >> Is it possible to configure something to workaround / avoid this? >> >> I'm not sure it's the right thing to do, but I've >> increased thread stack size 10 times (to 80MB) >> reduced default parallelism 10 times (only 20 cores are available) >> >> Thank you in advance. >> >> -- >> Be well! >> Jean Morozov >> >> On Tue, Mar 29, 2016 at 1:12 PM, Eugene Morozov < >> evgeny.a.moro...@gmail.com> wrote: >> >>> Hi, >>> >>> I have a web service that provides rest api to train random forest algo. >>> I train random forest on a 5 nodes spark cluster with enough memory - >>> everything is cached (~22 GB). >>> On a small datasets up to 100k samples everything is fine, but with the >>> biggest one (400k samples and ~70k features) I'm stuck with >>> StackOverflowError. >>> >>> Additional options for my web service >>> spark.executor.extraJavaOptions="-XX:ThreadStackSize=8192" >>> spark.default.parallelism = 200. >>> >>> On a 400k samples dataset >>> - (with default thread stack size) it took 4 hours of training to get >>> the error. >>> - with increased stack size it took 60 hours to hit it. >>> I can increase it, but it's hard to say what amount of memory it needs >>> and it's applied to all of the treads and might waste a lot of memory. >>> >>> I'm looking at different stages at event timeline now and see that task >>> deserialization time gradually increases. And at the end task >>> deserialization time is roughly same as executor computing time. >>> >>> Code I use to train model: >>> >>> int MAX_BINS = 16; >>> int NUM_CLASSES = 0; >>> double MIN_INFO_GAIN = 0.0; >>> int MAX_MEMORY_IN_MB = 256; >>> double SUBSAMPLING_RATE = 1.0; >>> boolean USE_NODEID_CACHE = true; >>> int CHECKPOINT_INTERVAL = 10; >>> int RANDOM_SEED = 12345; >>> >>> int NODE_SIZE = 5; >>> int maxDepth = 30; >>> int numTrees = 50; >>> Strategy strategy = new Strategy(Algo.Regression(), Variance.instance(), >>> maxDepth, NUM_CLASSES, MAX_BINS, >>> QuantileStrategy.Sort(), new >>> scala.collection.immutable.HashMap<>(), nodeSize, MIN_INFO_GAIN, >>> MAX_MEMORY_IN_MB, SUBSAMPLING_RATE, USE_NODEID_CACHE, >>> CHECKPOINT_INTERVAL); >>> RandomForestModel model = RandomForest.trainRegressor(labeledPoints.rdd(), >>> strategy, numTrees, "auto", RANDOM_SEED); >>> >>> >>> Any advice would be highly appreciated. 
>>> >>> The exception (~3000 lines long): >>> java.lang.StackOverflowError >>> at >>> java.io.ObjectInputStream$PeekInputStream.read(ObjectInputStream.java:2320) >>> at >>> java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2333) >>> at >>> java.io.ObjectInputStream$BlockDataInputStream.readInt(ObjectInputStream.java:2828) >>> at >>> java.io.ObjectInputStream.readHandle(ObjectInputStream.java:1453) >>> at >>> java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1512) >>> at >>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774) >>> at >>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) >>> at >>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000) >>> at >>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924) >>> at >>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) >>> at >>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) >>> at >>> java.io.Obj
Re: SparkML RandomForest java.lang.StackOverflowError
First thought: 70K features is *a lot* for the MLlib implementation (and any PLANET-like implementation) Using fewer partitions is a good idea. Which Spark version was this on? On Tue, Mar 29, 2016 at 5:21 AM, Eugene Morozov wrote: > The questions I have in mind: > > Is it smth that the one might expect? From the stack trace itself it's not > clear where does it come from. > Is it an already known bug? Although I haven't found anything like that. > Is it possible to configure something to workaround / avoid this? > > I'm not sure it's the right thing to do, but I've > increased thread stack size 10 times (to 80MB) > reduced default parallelism 10 times (only 20 cores are available) > > Thank you in advance. > > -- > Be well! > Jean Morozov > > On Tue, Mar 29, 2016 at 1:12 PM, Eugene Morozov < > evgeny.a.moro...@gmail.com> wrote: > >> Hi, >> >> I have a web service that provides rest api to train random forest algo. >> I train random forest on a 5 nodes spark cluster with enough memory - >> everything is cached (~22 GB). >> On a small datasets up to 100k samples everything is fine, but with the >> biggest one (400k samples and ~70k features) I'm stuck with >> StackOverflowError. >> >> Additional options for my web service >> spark.executor.extraJavaOptions="-XX:ThreadStackSize=8192" >> spark.default.parallelism = 200. >> >> On a 400k samples dataset >> - (with default thread stack size) it took 4 hours of training to get the >> error. >> - with increased stack size it took 60 hours to hit it. >> I can increase it, but it's hard to say what amount of memory it needs >> and it's applied to all of the treads and might waste a lot of memory. >> >> I'm looking at different stages at event timeline now and see that task >> deserialization time gradually increases. And at the end task >> deserialization time is roughly same as executor computing time. >> >> Code I use to train model: >> >> int MAX_BINS = 16; >> int NUM_CLASSES = 0; >> double MIN_INFO_GAIN = 0.0; >> int MAX_MEMORY_IN_MB = 256; >> double SUBSAMPLING_RATE = 1.0; >> boolean USE_NODEID_CACHE = true; >> int CHECKPOINT_INTERVAL = 10; >> int RANDOM_SEED = 12345; >> >> int NODE_SIZE = 5; >> int maxDepth = 30; >> int numTrees = 50; >> Strategy strategy = new Strategy(Algo.Regression(), Variance.instance(), >> maxDepth, NUM_CLASSES, MAX_BINS, >> QuantileStrategy.Sort(), new scala.collection.immutable.HashMap<>(), >> nodeSize, MIN_INFO_GAIN, >> MAX_MEMORY_IN_MB, SUBSAMPLING_RATE, USE_NODEID_CACHE, >> CHECKPOINT_INTERVAL); >> RandomForestModel model = RandomForest.trainRegressor(labeledPoints.rdd(), >> strategy, numTrees, "auto", RANDOM_SEED); >> >> >> Any advice would be highly appreciated. 
>> >> The exception (~3000 lines long): >> java.lang.StackOverflowError >> at >> java.io.ObjectInputStream$PeekInputStream.read(ObjectInputStream.java:2320) >> at >> java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2333) >> at >> java.io.ObjectInputStream$BlockDataInputStream.readInt(ObjectInputStream.java:2828) >> at >> java.io.ObjectInputStream.readHandle(ObjectInputStream.java:1453) >> at >> java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1512) >> at >> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774) >> at >> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) >> at >> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000) >> at >> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924) >> at >> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) >> at >> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) >> at >> java.io.ObjectInputStream.readObject(ObjectInputStream.java:371) >> at >> scala.collection.immutable.$colon$colon.readObject(List.scala:366) >> at sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source) >> at >> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >> at java.lang.reflect.Method.invoke(Method.java:497) >> at >> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058) >> at >> java.io.ObjectInputStream.readSe
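A sketch of the "fewer partitions" suggestion, assuming labeledPoints, strategy, numTrees and RANDOM_SEED are the values from the quoted code and that roughly one partition per available core (20 in this thread) is the target:

val coalesced = labeledPoints.coalesce(20).cache()  // fewer, larger partitions; coalesce avoids a shuffle
coalesced.count()                                   // materialize the cache before training
val model = RandomForest.trainRegressor(coalesced, strategy, numTrees, "auto", RANDOM_SEED)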
Re: SparkML RandomForest java.lang.StackOverflowError
The questions I have in mind: Is it smth that the one might expect? From the stack trace itself it's not clear where does it come from. Is it an already known bug? Although I haven't found anything like that. Is it possible to configure something to workaround / avoid this? I'm not sure it's the right thing to do, but I've increased thread stack size 10 times (to 80MB) reduced default parallelism 10 times (only 20 cores are available) Thank you in advance. -- Be well! Jean Morozov On Tue, Mar 29, 2016 at 1:12 PM, Eugene Morozov wrote: > Hi, > > I have a web service that provides rest api to train random forest algo. > I train random forest on a 5 nodes spark cluster with enough memory - > everything is cached (~22 GB). > On a small datasets up to 100k samples everything is fine, but with the > biggest one (400k samples and ~70k features) I'm stuck with > StackOverflowError. > > Additional options for my web service > spark.executor.extraJavaOptions="-XX:ThreadStackSize=8192" > spark.default.parallelism = 200. > > On a 400k samples dataset > - (with default thread stack size) it took 4 hours of training to get the > error. > - with increased stack size it took 60 hours to hit it. > I can increase it, but it's hard to say what amount of memory it needs and > it's applied to all of the treads and might waste a lot of memory. > > I'm looking at different stages at event timeline now and see that task > deserialization time gradually increases. And at the end task > deserialization time is roughly same as executor computing time. > > Code I use to train model: > > int MAX_BINS = 16; > int NUM_CLASSES = 0; > double MIN_INFO_GAIN = 0.0; > int MAX_MEMORY_IN_MB = 256; > double SUBSAMPLING_RATE = 1.0; > boolean USE_NODEID_CACHE = true; > int CHECKPOINT_INTERVAL = 10; > int RANDOM_SEED = 12345; > > int NODE_SIZE = 5; > int maxDepth = 30; > int numTrees = 50; > Strategy strategy = new Strategy(Algo.Regression(), Variance.instance(), > maxDepth, NUM_CLASSES, MAX_BINS, > QuantileStrategy.Sort(), new scala.collection.immutable.HashMap<>(), > nodeSize, MIN_INFO_GAIN, > MAX_MEMORY_IN_MB, SUBSAMPLING_RATE, USE_NODEID_CACHE, > CHECKPOINT_INTERVAL); > RandomForestModel model = RandomForest.trainRegressor(labeledPoints.rdd(), > strategy, numTrees, "auto", RANDOM_SEED); > > > Any advice would be highly appreciated. 
> > The exception (~3000 lines long): > java.lang.StackOverflowError > at > java.io.ObjectInputStream$PeekInputStream.read(ObjectInputStream.java:2320) > at > java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2333) > at > java.io.ObjectInputStream$BlockDataInputStream.readInt(ObjectInputStream.java:2828) > at > java.io.ObjectInputStream.readHandle(ObjectInputStream.java:1453) > at > java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1512) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774) > at > java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) > at > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000) > at > java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) > at > java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) > at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371) > at > scala.collection.immutable.$colon$colon.readObject(List.scala:366) > at sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:497) > at > java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058) > at > java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) > at > java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) > at > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000) > at > java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) > at > java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) > at > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000) > at > java.io.ObjectInputSt
SparkML RandomForest java.lang.StackOverflowError
Hi, I have a web service that provides rest api to train random forest algo. I train random forest on a 5 nodes spark cluster with enough memory - everything is cached (~22 GB). On a small datasets up to 100k samples everything is fine, but with the biggest one (400k samples and ~70k features) I'm stuck with StackOverflowError. Additional options for my web service spark.executor.extraJavaOptions="-XX:ThreadStackSize=8192" spark.default.parallelism = 200. On a 400k samples dataset - (with default thread stack size) it took 4 hours of training to get the error. - with increased stack size it took 60 hours to hit it. I can increase it, but it's hard to say what amount of memory it needs and it's applied to all of the treads and might waste a lot of memory. I'm looking at different stages at event timeline now and see that task deserialization time gradually increases. And at the end task deserialization time is roughly same as executor computing time. Code I use to train model: int MAX_BINS = 16; int NUM_CLASSES = 0; double MIN_INFO_GAIN = 0.0; int MAX_MEMORY_IN_MB = 256; double SUBSAMPLING_RATE = 1.0; boolean USE_NODEID_CACHE = true; int CHECKPOINT_INTERVAL = 10; int RANDOM_SEED = 12345; int NODE_SIZE = 5; int maxDepth = 30; int numTrees = 50; Strategy strategy = new Strategy(Algo.Regression(), Variance.instance(), maxDepth, NUM_CLASSES, MAX_BINS, QuantileStrategy.Sort(), new scala.collection.immutable.HashMap<>(), nodeSize, MIN_INFO_GAIN, MAX_MEMORY_IN_MB, SUBSAMPLING_RATE, USE_NODEID_CACHE, CHECKPOINT_INTERVAL); RandomForestModel model = RandomForest.trainRegressor(labeledPoints.rdd(), strategy, numTrees, "auto", RANDOM_SEED); Any advice would be highly appreciated. The exception (~3000 lines long): java.lang.StackOverflowError at java.io.ObjectInputStream$PeekInputStream.read(ObjectInputStream.java:2320) at java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2333) at java.io.ObjectInputStream$BlockDataInputStream.readInt(ObjectInputStream.java:2828) at java.io.ObjectInputStream.readHandle(ObjectInputStream.java:1453) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1512) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371) at scala.collection.immutable.$colon$colon.readObject(List.scala:366) at sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:497) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371) at scala.collection.immutable.$colon$colon.readObject(List.scala:362) at sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:497) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924) -- Be well! Jean Morozov
Re: Spark SQL - java.lang.StackOverflowError after caching table
I ran: sqlContext.cacheTable("product") var df = sqlContext.sql("...complex query...") df.explain(true) ...and obtained: http://pastebin.com/k9skERsr ...where "[...]" corresponds therein to huge lists of records from the addressed table (product) The query is of the following form: "SELECT distinct p.id, p.`aaa`, p.`bbb` FROM product p, (SELECT distinct p1.id FROM product p1 WHERE p1.`ccc`='fff') p2, (SELECT distinct p3.id FROM product p3 WHERE p3.`ccc`='ddd') p4 WHERE p.`eee` = '1' AND p.id=p2.id AND p.`eee` > 137 AND p4.id=p.id UNION SELECT distinct p.id,p.`bbb`, p.`bbb` FROM product p, (SELECT distinct p1.id FROM product p1 WHERE p1.`ccc`='fff') p2, (SELECT distinct p5.id FROM product p5 WHERE p5.`ccc`='ggg') p6 WHERE p.`eee` = '1' AND p.id=p2.id AND p.`hhh` > 93 AND p6.id=p.id ORDER BY p.`bbb` LIMIT 10" On 24.03.2016 22:16, Ted Yu wrote: Can you obtain output from explain(true) on the query after cacheTable() call ? Potentially related JIRA: [SPARK-13657] [SQL] Support parsing very long AND/OR expressions On Thu, Mar 24, 2016 at 12:55 PM, Mohamed Nadjib MAMI mailto:m...@iai.uni-bonn.de>> wrote: Here is the stack trace: http://pastebin.com/ueHqiznH Here's the code: val sqlContext = new org.apache.spark.sql.SQLContext(sc) val table = sqlContext.read.parquet("hdfs://...parquet_table") table.registerTempTable("table") sqlContext.sql("...complex query...").show() /** works */ sqlContext.cacheTable("table") sqlContext.sql("...complex query...").show() /** works */ sqlContext.sql("...complex query...").show() /** fails */ On 24.03.2016 13:40, Ted Yu wrote: Can you pastebin the stack trace ? If you can show snippet of your code, that would help give us more clue. Thanks On Mar 24, 2016, at 2:43 AM, Mohamed Nadjib MAMI <mailto:m...@iai.uni-bonn.de> wrote: Hi all, I'm running SQL queries (sqlContext.sql()) on Parquet tables and facing a problem with table caching (sqlContext.cacheTable()), using spark-shell of Spark 1.5.1. After I run the sqlContext.cacheTable(table), the sqlContext.sql(query) takes longer the first time (well, for the lazy execution reason) but it finishes and returns results. However, the weird thing is that after I run the same query again, I get the error: "java.lang.StackOverflowError". I Googled it but didn't find the error appearing with table caching and querying. Any hint is appreciated. -- Regards, Grüße, Cordialement, Recuerdos, Saluti, προσρήσεις, 问 候, تحياتي. Mohamed Nadjib Mami PhD Student - EIS Department - Bonn University, Germany. Website <http://www.mohamednadjibmami.com>. LinkedIn <http://fr.linkedin.com/in/mohamednadjibmami>. -- Regards, Grüße, Cordialement, Recuerdos, Saluti, προσρήσεις, 问候, تحياتي. Mohamed Nadjib Mami PhD Student - EIS Department - Bonn University, Germany. Website <http://www.mohamednadjibmami.com>. LinkedIn <http://fr.linkedin.com/in/mohamednadjibmami>.
Re: Spark SQL - java.lang.StackOverflowError after caching table
Can you obtain output from explain(true) on the query after cacheTable() call ? Potentially related JIRA: [SPARK-13657] [SQL] Support parsing very long AND/OR expressions On Thu, Mar 24, 2016 at 12:55 PM, Mohamed Nadjib MAMI wrote: > Here is the stack trace: http://pastebin.com/ueHqiznH > > Here's the code: > > val sqlContext = new org.apache.spark.sql.SQLContext(sc) > > val table = sqlContext.read.parquet("hdfs://...parquet_table") > table.registerTempTable("table") > > sqlContext.sql("...complex query...").show() /** works */ > > sqlContext.cacheTable("table") > > sqlContext.sql("...complex query...").show() /** works */ > > sqlContext.sql("...complex query...").show() /** fails */ > > > > On 24.03.2016 13:40, Ted Yu wrote: > > Can you pastebin the stack trace ? > > If you can show snippet of your code, that would help give us more clue. > > Thanks > > > On Mar 24, 2016, at 2:43 AM, Mohamed Nadjib MAMI > wrote: > > Hi all, > I'm running SQL queries (sqlContext.sql()) on Parquet tables and facing a > problem with table caching (sqlContext.cacheTable()), using spark-shell of > Spark 1.5.1. > > After I run the sqlContext.cacheTable(table), the sqlContext.sql(query) takes > longer the first time (well, for the lazy execution reason) but it finishes > and returns results. However, the weird thing is that after I run the same > query again, I get the error: "java.lang.StackOverflowError". > > I Googled it but didn't find the error appearing with table caching and > querying. > Any hint is appreciated. > > > > -- > Regards, Grüße, Cordialement, Recuerdos, Saluti, προσρήσεις, 问候, تحياتي. > Mohamed Nadjib Mami > PhD Student - EIS Department - Bonn University, Germany. > Website <http://www.mohamednadjibmami.com>. > LinkedIn <http://fr.linkedin.com/in/mohamednadjibmami>. > >
Re: Spark SQL - java.lang.StackOverflowError after caching table
Here is the stack trace: http://pastebin.com/ueHqiznH Here's the code: val sqlContext = new org.apache.spark.sql.SQLContext(sc) val table = sqlContext.read.parquet("hdfs://...parquet_table") table.registerTempTable("table") sqlContext.sql("...complex query...").show() /** works */ sqlContext.cacheTable("table") sqlContext.sql("...complex query...").show() /** works */ sqlContext.sql("...complex query...").show() /** fails */ On 24.03.2016 13:40, Ted Yu wrote: Can you pastebin the stack trace ? If you can show snippet of your code, that would help give us more clue. Thanks On Mar 24, 2016, at 2:43 AM, Mohamed Nadjib MAMI wrote: Hi all, I'm running SQL queries (sqlContext.sql()) on Parquet tables and facing a problem with table caching (sqlContext.cacheTable()), using spark-shell of Spark 1.5.1. After I run the sqlContext.cacheTable(table), the sqlContext.sql(query) takes longer the first time (well, for the lazy execution reason) but it finishes and returns results. However, the weird thing is that after I run the same query again, I get the error: "java.lang.StackOverflowError". I Googled it but didn't find the error appearing with table caching and querying. Any hint is appreciated. -- Regards, Grüße, Cordialement, Recuerdos, Saluti, προσρήσεις, 问候, تحياتي. Mohamed Nadjib Mami PhD Student - EIS Department - Bonn University, Germany. Website <http://www.mohamednadjibmami.com>. LinkedIn <http://fr.linkedin.com/in/mohamednadjibmami>.
Re: Spark SQL - java.lang.StackOverflowError after caching table
Can you pastebin the stack trace? If you can show a snippet of your code, that would help give us more clues. Thanks > On Mar 24, 2016, at 2:43 AM, Mohamed Nadjib MAMI wrote: > > Hi all, > I'm running SQL queries (sqlContext.sql()) on Parquet tables and facing a > problem with table caching (sqlContext.cacheTable()), using spark-shell of > Spark 1.5.1. > > After I run the sqlContext.cacheTable(table), the sqlContext.sql(query) takes > longer the first time (well, for the lazy execution reason) but it finishes > and returns results. However, the weird thing is that after I run the same > query again, I get the error: "java.lang.StackOverflowError". > > I Googled it but didn't find the error appearing with table caching and > querying. > Any hint is appreciated.
Spark SQL - java.lang.StackOverflowError after caching table
Hi all, I'm running SQL queries (sqlContext.sql()) on Parquet tables and facing a problem with table caching (sqlContext.cacheTable()), using the spark-shell of Spark 1.5.1. After I run sqlContext.cacheTable(table), the sqlContext.sql(query) call takes longer the first time (due to lazy execution) but it finishes and returns results. The weird thing is that when I run the same query again, I get the error "java.lang.StackOverflowError". I Googled it but didn't find this error reported in connection with table caching and querying. Any hint is appreciated.
Re: Spark Streaming: java.lang.StackOverflowError
What code is triggering the stack overflow? On Mon, Feb 29, 2016 at 11:13 PM, Vinti Maheshwari wrote: > Hi All, > > I am getting below error in spark-streaming application, i am using kafka > for input stream. When i was doing with socket, it was working fine. But > when i changed to kafka it's giving error. Anyone has idea why it's > throwing error, do i need to change my batch time and check pointing time? > > > > *ERROR StreamingContext: Error starting the context, marking it as > stoppedjava.lang.StackOverflowError* > > My program: > > def main(args: Array[String]): Unit = { > > // Function to create and setup a new StreamingContext > def functionToCreateContext(): StreamingContext = { > val conf = new SparkConf().setAppName("HBaseStream") > val sc = new SparkContext(conf) > // create a StreamingContext, the main entry point for all streaming > functionality > val ssc = new StreamingContext(sc, Seconds(5)) > val brokers = args(0) > val topics= args(1) > val topicsSet = topics.split(",").toSet > val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers) > val messages = KafkaUtils.createDirectStream[String, String, > StringDecoder, StringDecoder]( > ssc, kafkaParams, topicsSet) > > val inputStream = messages.map(_._2) > //val inputStream = ssc.socketTextStream(args(0), args(1).toInt) > ssc.checkpoint(checkpointDirectory) > inputStream.print(1) > val parsedStream = inputStream > .map(line => { > val splitLines = line.split(",") > (splitLines(1), splitLines.slice(2, > splitLines.length).map((_.trim.toLong))) > }) > import breeze.linalg.{DenseVector => BDV} > import scala.util.Try > > val state: DStream[(String, Array[Long])] = > parsedStream.updateStateByKey( > (current: Seq[Array[Long]], prev: Option[Array[Long]]) => { > prev.map(_ +: current).orElse(Some(current)) > .flatMap(as => Try(as.map(BDV(_)).reduce(_ + _).toArray).toOption) > }) > > state.checkpoint(Duration(1)) > state.foreachRDD(rdd => rdd.foreach(Blaher.blah)) > ssc > } > // Get StreamingContext from checkpoint data or create a new one > val context = StreamingContext.getOrCreate(checkpointDirectory, > functionToCreateContext _) > } > } > > Regards, > ~Vinti >
Spark Streaming: java.lang.StackOverflowError
Hi All, I am getting the error below in my Spark Streaming application; I am using Kafka for the input stream. When I was reading from a socket it was working fine, but when I changed to Kafka it started failing. Does anyone have an idea why it's throwing this error? Do I need to change my batch interval or checkpointing interval?

ERROR StreamingContext: Error starting the context, marking it as stopped
java.lang.StackOverflowError

My program:

def main(args: Array[String]): Unit = {
  // Function to create and setup a new StreamingContext
  def functionToCreateContext(): StreamingContext = {
    val conf = new SparkConf().setAppName("HBaseStream")
    val sc = new SparkContext(conf)
    // create a StreamingContext, the main entry point for all streaming functionality
    val ssc = new StreamingContext(sc, Seconds(5))
    val brokers = args(0)
    val topics = args(1)
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)

    val inputStream = messages.map(_._2)
    //val inputStream = ssc.socketTextStream(args(0), args(1).toInt)
    ssc.checkpoint(checkpointDirectory)
    inputStream.print(1)
    val parsedStream = inputStream
      .map(line => {
        val splitLines = line.split(",")
        (splitLines(1), splitLines.slice(2, splitLines.length).map((_.trim.toLong)))
      })
    import breeze.linalg.{DenseVector => BDV}
    import scala.util.Try

    val state: DStream[(String, Array[Long])] = parsedStream.updateStateByKey(
      (current: Seq[Array[Long]], prev: Option[Array[Long]]) => {
        prev.map(_ +: current).orElse(Some(current))
          .flatMap(as => Try(as.map(BDV(_)).reduce(_ + _).toArray).toOption)
      })

    state.checkpoint(Duration(1))
    state.foreachRDD(rdd => rdd.foreach(Blaher.blah))
    ssc
  }
  // Get StreamingContext from checkpoint data or create a new one
  val context = StreamingContext.getOrCreate(checkpointDirectory,
    functionToCreateContext _)
}
}

Regards, ~Vinti
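Not necessarily the cause of the failure above, but one detail that stands out in the posted program: state.checkpoint(Duration(1)) requests a checkpoint interval of one millisecond against a five-second batch interval. The Spark Streaming guide suggests checkpointing stateful DStreams every 5-10 sliding intervals, which also keeps the per-batch lineage that has to be serialized short. A sketch of the two relevant lines, everything else unchanged:

ssc.checkpoint(checkpointDirectory)  // unchanged: directory for metadata and data checkpoints
state.checkpoint(Seconds(25))        // ~5 x the Seconds(5) batch interval, instead of Duration(1)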
Spark Accumulator Issue - java.io.IOException: java.lang.StackOverflowError
Hi, I am trying one transformation by calling scala method this scala method returns MutableList[AvroObject] def processRecords(id: String, list1: Iterable[(String, GenericRecord)]): scala.collection.mutable.MutableList[AvroObject] Hence, the output of transaformation is RDD[MutableList[AvroObject]] But I want o/p as RDD[AvroObject] I tried applying foreach on RDD[MutableList[AvroObject]] --> RDD[AvroObject] var uA = sparkContext.accumulableCollection[MutableList[AvroObject], universe](MutableList[AvroObject]()) rdd_list_avroObj.foreach(u => { uA ++= u }) var uRDD = sparkContext.parallelize(uA.value) Its failing on large dataset with following error java.io.IOException: java.lang.StackOverflowError at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1140) at org.apache.spark.scheduler.DirectTaskResult.writeExternal(TaskResult.scala:45) at java.io.ObjectOutputStream.writeExternalData(ObjectOutputStream.java:1458) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1429) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44) at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:226) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.StackOverflowError at java.io.ObjectOutputStream$HandleTable.hash(ObjectOutputStream.java:2359) at java.io.ObjectOutputStream$HandleTable.lookup(ObjectOutputStream.java:2292) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1115) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) at java.util.ArrayList.writeObject(ArrayList.java:742) at sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) I have two queries regarding this issue: Option 1: REplacement of accumulator Option 2: In scala method instead of returning List[AvroObject] can I send multiple AvroObject. 
so that I'll get RDD[AvroObject]. Note: I am using Spark 1.3.0; the input data size is 200 GB; the cluster is 3 machines (2 cores, 8 GB each); Spark is running in YARN mode. Thanks & Regards Shweta Jadhav
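On the two options at the end: a third possibility that avoids the accumulator entirely is to flatten the lists on the executors with flatMap, which turns RDD[MutableList[AvroObject]] into RDD[AvroObject] without collecting anything to the driver and re-parallelizing it. A minimal sketch, assuming rdd_list_avroObj is the RDD produced by processRecords as in the code above:

val avroRdd = rdd_list_avroObj.flatMap(list => list)  // RDD[AvroObject], no driver round-trip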
java.lang.StackOverflowError when doing spark sql
I am using Spark 1.2.0 (prebuilt with Hadoop 2.4) on Windows 7. I found the same bug reported here, https://issues.apache.org/jira/browse/SPARK-4208, but it is still open. Is there a workaround for this? Thanks! The stack trace:

Exception in thread "main" java.lang.StackOverflowError
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
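I don't know of a real fix while SPARK-4208 is open, but since the SQL string is parsed on the driver in the calling thread, one workaround sketch (my own assumption, not something from the JIRA) is to run the failing sqlContext.sql(...) call in a thread created with a larger stack, so the whole JVM's -Xss does not have to be raised:

val worker = new Thread(null, new Runnable {
  override def run(): Unit = {
    sqlContext.sql("SELECT ...").collect()   // placeholder for the failing query
  }
}, "sql-with-big-stack", 64L * 1024 * 1024)  // 64 MB stack for this one thread only
worker.start()
worker.join()

Alternatively, the -Xss driver option mentioned elsewhere on this list, passed when launching spark-shell/spark-submit, attacks the same limit globally.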
java.lang.stackoverflowerror when running Spark shell
I tested the examples from the Spark SQL programming guide, but a java.lang.StackOverflowError occurred every time I called sqlContext.sql("..."). Meanwhile, it worked fine in a HiveContext. The Hadoop version is 2.2.0 and the Spark version is 1.1.0, built with YARN and Hive. I would be grateful if you could give me a clue. (Screenshot: <http://apache-spark-user-list.1001560.n3.nabble.com/file/n14979/img.png>)
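Since the same statements worked through a HiveContext, the cheapest thing to try on 1.1.0 may simply be to keep issuing these queries through it; as far as I know the HiveQL path does not go through the same recursive parser combinators as the basic SQLContext parser. A sketch, assuming a plain spark-shell session with sc available:

import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)
hiveContext.sql("SELECT ...")   // placeholder for the query that overflowed in sqlContext.sql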
Re: java.lang.StackOverflowError when calling count()
Hi, TD. Thanks very much! I got it.
Re: java.lang.StackOverflowError when calling count()
The long lineage causes a long/deep Java object tree (DAG of RDD objects), which needs to be serialized as part of task creation. When serializing, the whole object DAG needs to be traversed, leading to the StackOverflowError. TD

On Mon, Aug 11, 2014 at 7:14 PM, randylu wrote:
> hi, TD. I also fell into the trap of long lineage, and your suggestions do
> work well. But I don't understand why the long lineage can cause the stack
> overflow, and where it takes effect?
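To make "long lineage" concrete, here is a minimal PySpark sketch (toy data, illustrative loop): every transformation puts another RDD object on top of the previous one, and it is exactly this chain that has to be walked when tasks are serialized. rdd.toDebugString() prints one entry per level, so you can watch the depth grow:

from pyspark import SparkContext

sc = SparkContext("local", "lineage-demo")

rdd = sc.parallelize([0])
for i in range(200):
    # every union() adds a new RDD node on top of the old one,
    # so the dependency chain that task serialization must traverse keeps getting deeper
    rdd = rdd.union(sc.parallelize([i]))

# one entry per lineage level; the output grows with the number of iterations
print(rdd.toDebugString())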
Re: java.lang.StackOverflowError when calling count()
hi, TD. I also fell into the trap of long lineage, and your suggestions do work well. But I don't understand why the long lineage can cause the stack overflow, and where it takes effect?
Re: java.lang.StackOverflowError
Could you create a reproducible script (and data) to allow us to investigate this? Davies

On Tue, Aug 5, 2014 at 1:10 AM, Chengi Liu wrote:
> Hi,
> I am doing some basic preprocessing in pyspark (local mode) as follows:
>
> files = [ input files]
> def read(filename, sc):
>     # process file
>     return rdd
>
> if __name__ == "__main__":
>     conf = SparkConf()
>     conf.setMaster('local')
>     sc = SparkContext(conf=conf)
>     sc.setCheckpointDir(root + "temp/")
>
>     data = sc.parallelize([])
>
>     for i, f in enumerate(files):
>         data = data.union(read(f, sc))

union is a lazy transformation; you could union them all at once:

rdds = [read(f, sc) for f in files]
rdd = sc.union(rdds)

>         if i == 20:
>             data.checkpoint()
>             data.count()
>         if i == 500: break
>     # print data.count()
>     # rdd_1 = read(files[0], sc)
>     data.saveAsTextFile(root + "output/")
>
> But I see this error:
> keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path)
>   File "/Users/ping/Desktop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
>   File "/Users/ping/Desktop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling o9564.saveAsTextFile.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task serialization failed: java.lang.StackOverflowError
> java.io.Bits.putInt(Bits.java:93)
> java.io.ObjectOutputStream$BlockDataOutputStream.writeInt(ObjectOutputStream.java:1927)
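Spelled out end to end, Davies' suggestion replaces the incremental chain of union() calls with a single sc.union() over all the per-file RDDs, so the lineage depth no longer grows with the number of files (a sketch; read(), files and root are the poster's own placeholders):

from pyspark import SparkConf, SparkContext

if __name__ == "__main__":
    conf = SparkConf().setMaster("local")
    sc = SparkContext(conf=conf)

    rdds = [read(f, sc) for f in files]    # one RDD per input file, via the poster's read() helper
    data = sc.union(rdds)                  # a single flat UnionRDD instead of a nested chain of unions
    data.saveAsTextFile(root + "output/")  # lineage depth stays constant, so no checkpointing is needed for this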
Re: java.lang.StackOverflowError
Bump On Tuesday, August 5, 2014, Chengi Liu wrote: > Hi, > I am doing some basic preprocessing in pyspark (local mode as follows): > > files = [ input files] > def read(filename,sc): > #process file > return rdd > > if __name__ =="__main__": >conf = SparkConf() > conf.setMaster('local') > sc = SparkContext(conf =conf) > sc.setCheckpointDir(root+"temp/") > > data = sc.parallelize([]) > > for i,f in enumerate(files): > > data = data.union(read(f,sc)) > if i ==20: > data.checkpoint() > data.count() > if i == 500:break > #print data.count() > #rdd_1 = read(files[0],sc) > data.saveAsTextFile(root+"output/") > > > But I see this error: > keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path) > File > "/Users/ping/Desktop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", > line 538, in __call__ > File > "/Users/ping/Desktop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", > line 300, in get_return_value > py4j.protocol.Py4JJavaError: An error occurred while calling > o9564.saveAsTextFile. > : org.apache.spark.SparkException: Job aborted due to stage failure: Task > serialization failed: java.lang.StackOverflowError > java.io.Bits.putInt(Bits.java:93) > > java.io.ObjectOutputStream$BlockDataOutputStream.writeInt(ObjectOutputStream.java:1927) >
java.lang.StackOverflowError
Hi,
I am doing some basic preprocessing in pyspark (local mode) as follows:

files = [ input files]
def read(filename, sc):
    # process file
    return rdd

if __name__ == "__main__":
    conf = SparkConf()
    conf.setMaster('local')
    sc = SparkContext(conf=conf)
    sc.setCheckpointDir(root + "temp/")

    data = sc.parallelize([])

    for i, f in enumerate(files):
        data = data.union(read(f, sc))
        if i == 20:
            data.checkpoint()
            data.count()
        if i == 500: break
    # print data.count()
    # rdd_1 = read(files[0], sc)
    data.saveAsTextFile(root + "output/")

But I see this error:
keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path)
  File "/Users/ping/Desktop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
  File "/Users/ping/Desktop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o9564.saveAsTextFile.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task serialization failed: java.lang.StackOverflowError
java.io.Bits.putInt(Bits.java:93)
java.io.ObjectOutputStream$BlockDataOutputStream.writeInt(ObjectOutputStream.java:1927)
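One thing to note about the script above: the checkpoint branch fires only once, at i == 20, so every file after that keeps extending the same lineage. Following the advice given elsewhere in this thread (checkpoint plus a forcing count() every N files), the loop body would look roughly like this sketch (N = 20 is arbitrary; the surrounding names are the poster's):

for i, f in enumerate(files):
    data = data.union(read(f, sc))
    if i > 0 and i % 20 == 0:    # every 20 files, not only once at i == 20
        data.checkpoint()        # mark for checkpointing (sc.setCheckpointDir was already called above)
        data.count()             # force materialization so the lineage actually gets truncated here
    if i == 500:
        break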
Re: java.lang.StackOverflowError when calling count()
Responses inline.

On Wed, Jul 23, 2014 at 4:13 AM, lalit1303 wrote:
> Hi,
> Thanks TD for your reply. I am still not able to resolve the problem for my use case.
> I have, let's say, 1000 different RDDs, and I am applying a transformation function on each RDD and I want the output of all RDDs combined into a single output RDD. For this, I am doing the following:
>
> tempRDD = jaRDD.rdd().repartition(1).mapPartitions().toJavaRDD(); // creating new RDD in every loop
> outRDD = outRDD.union(tempRDD); // keep joining RDDs to get the output into a single RDD
>
> // after every 10 iterations, in order to truncate the lineage
> cachRDD = outRDD.cache();
> cachRDD.checkpoint();
> System.out.println(cachRDD.collect().size());
> outRDD = new JavaRDD(cachRDD.rdd(), cachRDD.classTag());
>
> // finally, after the whole computation
> outRDD.saveAsTextFile(..)
>
> The above operation is overall slow; it runs successfully for fewer iterations, i.e. ~100. But when the number of iterations is increased to ~1000, the whole job takes more than *30 mins* and ultimately breaks down with an OutOfMemory error. The total size of the data is around 1.4 MB. As of now, I am running the job in Spark standalone mode with 2 cores and 2.9 GB memory.

I think this is happening because of how you are caching the output RDDs that are being generated repeatedly. In every iteration, it builds a new union RDD which contains the data of the previous union RDD plus some new data. Since each of these union RDDs is cached, the underlying data is being cached repeatedly. So the cached sizes grow like this:

Iteration 1: union RDD: X MB
Iteration 2: union RDD: 2X MB | Total size cached: 3X MB
Iteration 3: union RDD: 3X MB | Total size cached: 6X MB
Iteration 4: union RDD: 4X MB | Total size cached: 10X MB
...

If you do the math, that is a quadratic increase in the size of the data being processed and cached, wrt the # of iterations. This leads to increases in both run time and memory usage.

> I also observed that when the collect() operation is performed, the number of tasks keeps increasing as the loop proceeds: on the first collect() 22 total tasks, then ~40 total tasks ... ~300 tasks for a single collect.
> Does this mean that all the operations are repeatedly performed, and the RDD lineage is not broken?

Same reason as above. Each union RDD is built by appending the partitions of the previous union RDD plus the new set of partitions (~22 partitions). So the Nth union RDD has N * 22 partitions, hence that many tasks. You could change this by also repartitioning when you want to cache+checkpoint the union RDD (therefore, outRDD.repartition(100).cache().checkpoint().count()). And do you really need all the data to be collected at the driver? If you are doing the cachRDD.collect() just to force the checkpoint, then use cachRDD.count().

> Can you please elaborate on the point from your last post, i.e. how to perform: "*Create a modified RDD R` which has the same data as RDD R but does not have the lineage. This is done by creating a new BlockRDD using the ids of blocks of data representing the in-memory R*"

Please refer to the lines in this function: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala#L74
What those lines do is save the data of the associated RDD to HDFS files, and then create a new CheckpointRDD from the same files. Then the dependency of the associated RDD is changed to use the new RDD. This truncates the lineage, because the associated RDD's parent is now the new RDD, which has a very short lineage (links to checkpoint files), and the previous dependencies (parent RDDs) are forgotten. This implementation can be modified by forcing the data of the associated RDD to be cached with StorageLevel.MEMORY_AND_DISK_2, and then, instead of a CheckpointRDD, creating a new BlockRDD (using the names of the blocks that are used to cache the RDD), which is then set as the new dependency. This is definitely a behind-the-public-API implementation.

> -
> Lalit Yadav
> la...@sigmoidanalytics.com
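In PySpark terms (the snippet being discussed is Java, but the shape is the same), the two suggestions above, repartitioning before the periodic cache+checkpoint so the partition count stops growing, and forcing materialization with count() rather than collect(), look roughly like this sketch; out, temp and i mirror outRDD, tempRDD and the loop counter, and note that checkpoint() returns nothing, so it cannot literally be chained the way the shorthand above suggests:

# inside the per-iteration loop
out = out.union(temp)
if i % 10 == 0:
    out = out.repartition(100).cache()   # cap the partition count before persisting (100 is arbitrary)
    out.checkpoint()                     # mark for checkpointing; checkpoint() returns None
    out.count()                          # materialize on the cluster; no collect() back to the driver needed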
Re: java.lang.StackOverflowError when calling count()
Hi,
Thanks TD for your reply. I am still not able to resolve the problem for my use case.
I have, let's say, 1000 different RDDs, and I am applying a transformation function on each RDD and I want the output of all RDDs combined into a single output RDD. For this, I am doing the following:

tempRDD = jaRDD.rdd().repartition(1).mapPartitions().toJavaRDD(); // creating new RDD in every loop
outRDD = outRDD.union(tempRDD); // keep joining RDDs to get the output into a single RDD

// after every 10 iterations, in order to truncate the lineage
cachRDD = outRDD.cache();
cachRDD.checkpoint();
System.out.println(cachRDD.collect().size());
outRDD = new JavaRDD(cachRDD.rdd(), cachRDD.classTag());

// finally, after the whole computation
outRDD.saveAsTextFile(..)

The above operation is overall slow; it runs successfully for fewer iterations, i.e. ~100. But when the number of iterations is increased to ~1000, the whole job takes more than *30 mins* and ultimately breaks down with an OutOfMemory error. The total size of the data is around 1.4 MB. As of now, I am running the job in Spark standalone mode with 2 cores and 2.9 GB memory.

I also observed that when the collect() operation is performed, the number of tasks keeps increasing as the loop proceeds: on the first collect() 22 total tasks, then ~40 total tasks ... ~300 tasks for a single collect.
Does this mean that all the operations are repeatedly performed, and the RDD lineage is not broken?

Can you please elaborate on the point from your last post, i.e. how to perform: "*Create a modified RDD R` which has the same data as RDD R but does not have the lineage. This is done by creating a new BlockRDD using the ids of blocks of data representing the in-memory R*"

-
Lalit Yadav
la...@sigmoidanalytics.com
Re: java.lang.StackOverflowError when calling count()
Just to add some more clarity to the discussion: there is a difference between caching to memory and checkpointing, when considered from the lineage point of view.

When an RDD is checkpointed, the data of the RDD is saved to HDFS (or any Hadoop-API-compatible fault-tolerant storage) and the lineage of the RDD is truncated. This is okay because, in case of a worker failure, the RDD data can be read back from the fault-tolerant storage.

When an RDD is cached, the data of the RDD is cached in memory, but the lineage is not truncated. This is because if the in-memory data is lost, the lineage is required to recompute the data.

So to deal with stack overflow errors due to long lineage, just caching is not going to be useful. You have to checkpoint the RDD, and as far as I can tell, the correct way to do this is the following:
1. Mark the RDD of every Nth iteration for caching and checkpointing (both).
2. Before generating the (N+1)th iteration RDD, force the materialization of this RDD by doing an rdd.count(). This will persist the RDD in memory as well as save it to HDFS and truncate the lineage.
If you just mark every Nth iteration RDD for checkpointing, but only force the materialization after ALL the iterations (not after every Nth iteration as I suggested), that will still lead to stack overflow errors.

Yes, this checkpointing and materialization will definitely decrease performance, but that is the limitation of the current implementation.

If you are brave enough, you can try the following. Instead of relying on checkpointing to HDFS for truncating lineage, you can do this:
1. Persist the Nth RDD with replication (see the different StorageLevels); this replicates the in-memory RDD between workers within Spark. Let's call this RDD R.
2. Force it to materialize in memory.
3. Create a modified RDD R` which has the same data as RDD R but does not have the lineage. This is done by creating a new BlockRDD using the ids of the blocks of data representing the in-memory R (I can elaborate on that if you want).
This avoids writing to HDFS (replication happens within Spark memory), but truncates the lineage (by creating new BlockRDDs) and avoids the stack overflow error.

Hope this helps.
TD

On Wed, May 14, 2014 at 3:33 AM, lalit1303 wrote:
> If we do cache() + count() after, say, every 50 iterations, the whole process becomes very slow.
> I have tried checkpoint(), cache() + count(), saveAsObjectFiles(). Nothing works.
> Materializing RDDs leads to a drastic decrease in performance, and if we don't materialize, we face the StackOverflowError.
>
> -
> Lalit Yadav
> la...@sigmoidanalytics.com
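Steps 1 and 2 of the "brave" alternative above use only public APIs; step 3 does not. A rough PySpark illustration, where rdd stands for the Nth-iteration RDD:

from pyspark import StorageLevel

# step 1: keep two in-memory/disk copies on different workers instead of writing to HDFS
rdd = rdd.persist(StorageLevel.MEMORY_AND_DISK_2)

# step 2: force materialization so the replicated blocks actually exist
rdd.count()

# step 3 (re-wrapping those cached blocks in a fresh BlockRDD to drop the lineage) has no
# public API, which is why most users fall back to checkpoint() + count() instead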
Re: java.lang.StackOverflowError when calling count()
If we do cache() + count() after say every 50 iterations. The whole process becomes very slow. I have tried checkpoint() , cache() + count(), saveAsObjectFiles(). Nothing works. Materializing RDD's lead to drastic decrease in performance & if we don't materialize, we face stackoverflowerror. On Wed, May 14, 2014 at 10:25 AM, Nick Chammas [via Apache Spark User List] wrote: > Would cache() + count() every N iterations work just as well as > checkPoint() + count() to get around this issue? > > We're basically trying to get Spark to avoid working on too lengthy a > lineage at once, right? > > Nick > > > On Tue, May 13, 2014 at 12:04 PM, Xiangrui Meng <[hidden > email]<http://user/SendEmail.jtp?type=node&node=5683&i=0> > > wrote: > >> After checkPoint, call count directly to materialize it. -Xiangrui >> >> On Tue, May 13, 2014 at 4:20 AM, Mayur Rustagi <[hidden >> email]<http://user/SendEmail.jtp?type=node&node=5683&i=1>> >> wrote: >> > We are running into same issue. After 700 or so files the stack >> overflows, >> > cache, persist & checkpointing dont help. >> > Basically checkpointing only saves the RDD when it is materialized & it >> only >> > materializes in the end, then it runs out of stack. >> > >> > Regards >> > Mayur >> > >> > Mayur Rustagi >> > Ph: +1 >> (760) 203 3257 >> > http://www.sigmoidanalytics.com >> > @mayur_rustagi >> >> > >> > >> > >> > On Tue, May 13, 2014 at 11:40 AM, Xiangrui Meng <[hidden >> > email]<http://user/SendEmail.jtp?type=node&node=5683&i=2>> >> wrote: >> >> >> >> You have a long lineage that causes the StackOverflow error. Try >> >> rdd.checkPoint() and rdd.count() for every 20~30 iterations. >> >> checkPoint can cut the lineage. -Xiangrui >> >> >> >> On Mon, May 12, 2014 at 3:42 PM, Guanhua Yan <[hidden >> >> email]<http://user/SendEmail.jtp?type=node&node=5683&i=3>> >> wrote: >> >> > Dear Sparkers: >> >> > >> >> > I am using Python spark of version 0.9.0 to implement some iterative >> >> > algorithm. I got some errors shown at the end of this email. It seems >> >> > that >> >> > it's due to the Java Stack Overflow error. The same error has been >> >> > duplicated on a mac desktop and a linux workstation, both running the >> >> > same >> >> > version of Spark. >> >> > >> >> > The same line of code works correctly after quite some iterations. At >> >> > the >> >> > line of error, rdd__new.count() could be 0. (In some previous rounds, >> >> > this >> >> > was also 0 without any problem). >> >> > >> >> > Any thoughts on this? 
>> >> > >> >> > Thank you very much, >> >> > - Guanhua >> >> > >> >> > >> >> > >> >> > CODE:print "round", round, rdd__new.count() >> >> > >> >> > File >> >> > >> >> > >> "/home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/pyspark/rdd.py", >> >> > line 542, in count >> >> > 14/05/12 16:20:28 INFO TaskSetManager: Loss was due to >> >> > java.lang.StackOverflowError [duplicate 1] >> >> > return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum() >> >> > 14/05/12 16:20:28 ERROR TaskSetManager: Task 8419.0:0 failed 1 times; >> >> > aborting job >> >> > File >> >> > >> >> > >> "/home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/pyspark/rdd.py", >> >> > line 533, in sum >> >> > 14/05/12 16:20:28 INFO TaskSchedulerImpl: Ignoring update with state >> >> > FAILED >> >> > from TID 1774 because its task set is gone >> >> > return self.mapPartitions(lambda x: >> [sum(x)]).reduce(operator.add) >> >> > File >> >> > >> >> > >> "/home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/pyspark/rdd.py", >> >> > line 499, in reduce >> >> > vals = self.mapPartitions(func).collect() >> >> > File >>
Re: java.lang.StackOverflowError when calling count()
Would cache() + count() every N iterations work just as well as checkPoint() + count() to get around this issue? We're basically trying to get Spark to avoid working on too lengthy a lineage at once, right? Nick On Tue, May 13, 2014 at 12:04 PM, Xiangrui Meng wrote: > After checkPoint, call count directly to materialize it. -Xiangrui > > On Tue, May 13, 2014 at 4:20 AM, Mayur Rustagi > wrote: > > We are running into same issue. After 700 or so files the stack > overflows, > > cache, persist & checkpointing dont help. > > Basically checkpointing only saves the RDD when it is materialized & it > only > > materializes in the end, then it runs out of stack. > > > > Regards > > Mayur > > > > Mayur Rustagi > > Ph: +1 (760) 203 3257 > > http://www.sigmoidanalytics.com > > @mayur_rustagi > > > > > > > > On Tue, May 13, 2014 at 11:40 AM, Xiangrui Meng > wrote: > >> > >> You have a long lineage that causes the StackOverflow error. Try > >> rdd.checkPoint() and rdd.count() for every 20~30 iterations. > >> checkPoint can cut the lineage. -Xiangrui > >> > >> On Mon, May 12, 2014 at 3:42 PM, Guanhua Yan wrote: > >> > Dear Sparkers: > >> > > >> > I am using Python spark of version 0.9.0 to implement some iterative > >> > algorithm. I got some errors shown at the end of this email. It seems > >> > that > >> > it's due to the Java Stack Overflow error. The same error has been > >> > duplicated on a mac desktop and a linux workstation, both running the > >> > same > >> > version of Spark. > >> > > >> > The same line of code works correctly after quite some iterations. At > >> > the > >> > line of error, rdd__new.count() could be 0. (In some previous rounds, > >> > this > >> > was also 0 without any problem). > >> > > >> > Any thoughts on this? > >> > > >> > Thank you very much, > >> > - Guanhua > >> > > >> > > >> > > >> > CODE:print "round", round, rdd__new.count() > >> > > >> > File > >> > > >> > > "/home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/pyspark/rdd.py", > >> > line 542, in count > >> > 14/05/12 16:20:28 INFO TaskSetManager: Loss was due to > >> > java.lang.StackOverflowError [duplicate 1] > >> > return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum() > >> > 14/05/12 16:20:28 ERROR TaskSetManager: Task 8419.0:0 failed 1 times; > >> > aborting job > >> > File > >> > > >> > > "/home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/pyspark/rdd.py", > >> > line 533, in sum > >> > 14/05/12 16:20:28 INFO TaskSchedulerImpl: Ignoring update with state > >> > FAILED > >> > from TID 1774 because its task set is gone > >> > return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add) > >> > File > >> > > >> > > "/home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/pyspark/rdd.py", > >> > line 499, in reduce > >> > vals = self.mapPartitions(func).collect() > >> > File > >> > > >> > > "/home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/pyspark/rdd.py", > >> > line 463, in collect > >> > bytesInJava = self._jrdd.collect().iterator() > >> > File > >> > > >> > > "/home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py", > >> > line 537, in __call__ > >> > File > >> > > >> > > "/home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py", > >> > line 300, in get_return_value > >> > py4j.protocol.Py4JJavaError: An error occurred while calling > >> > o4317.collect. 
> >> > : org.apache.spark.SparkException: Job aborted: Task 8419.0:1 failed 1 > >> > times > >> > (most recent failure: Exception failure: java.lang.StackOverflowError) > >> > at > >> > > >> > > org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.sca
Re: java.lang.StackOverflowError when calling count()
If we do cache() + count() after, say, every 50 iterations, the whole process becomes very slow. I have tried checkpoint(), cache() + count(), saveAsObjectFiles(). Nothing works. Materializing RDDs leads to a drastic decrease in performance, and if we don't materialize, we face the StackOverflowError.

-
Lalit Yadav
la...@sigmoidanalytics.com
Re: java.lang.StackOverflowError when calling count()
Count causes the overall performance to drop drastically. Infact beyond 50 files it starts to hang. if i force materialization. Regards Mayur Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi <https://twitter.com/mayur_rustagi> On Tue, May 13, 2014 at 9:34 PM, Xiangrui Meng wrote: > After checkPoint, call count directly to materialize it. -Xiangrui > > On Tue, May 13, 2014 at 4:20 AM, Mayur Rustagi > wrote: > > We are running into same issue. After 700 or so files the stack > overflows, > > cache, persist & checkpointing dont help. > > Basically checkpointing only saves the RDD when it is materialized & it > only > > materializes in the end, then it runs out of stack. > > > > Regards > > Mayur > > > > Mayur Rustagi > > Ph: +1 (760) 203 3257 > > http://www.sigmoidanalytics.com > > @mayur_rustagi > > > > > > > > On Tue, May 13, 2014 at 11:40 AM, Xiangrui Meng > wrote: > >> > >> You have a long lineage that causes the StackOverflow error. Try > >> rdd.checkPoint() and rdd.count() for every 20~30 iterations. > >> checkPoint can cut the lineage. -Xiangrui > >> > >> On Mon, May 12, 2014 at 3:42 PM, Guanhua Yan wrote: > >> > Dear Sparkers: > >> > > >> > I am using Python spark of version 0.9.0 to implement some iterative > >> > algorithm. I got some errors shown at the end of this email. It seems > >> > that > >> > it's due to the Java Stack Overflow error. The same error has been > >> > duplicated on a mac desktop and a linux workstation, both running the > >> > same > >> > version of Spark. > >> > > >> > The same line of code works correctly after quite some iterations. At > >> > the > >> > line of error, rdd__new.count() could be 0. (In some previous rounds, > >> > this > >> > was also 0 without any problem). > >> > > >> > Any thoughts on this? > >> > > >> > Thank you very much, > >> > - Guanhua > >> > > >> > > >> > > >> > CODE:print "round", round, rdd__new.count() > >> > > >> > File > >> > > >> > > "/home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/pyspark/rdd.py", > >> > line 542, in count > >> > 14/05/12 16:20:28 INFO TaskSetManager: Loss was due to > >> > java.lang.StackOverflowError [duplicate 1] > >> > return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum() > >> > 14/05/12 16:20:28 ERROR TaskSetManager: Task 8419.0:0 failed 1 times; > >> > aborting job > >> > File > >> > > >> > > "/home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/pyspark/rdd.py", > >> > line 533, in sum > >> > 14/05/12 16:20:28 INFO TaskSchedulerImpl: Ignoring update with state > >> > FAILED > >> > from TID 1774 because its task set is gone > >> > return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add) > >> > File > >> > > >> > > "/home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/pyspark/rdd.py", > >> > line 499, in reduce > >> > vals = self.mapPartitions(func).collect() > >> > File > >> > > >> > > "/home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/pyspark/rdd.py", > >> > line 463, in collect > >> > bytesInJava = self._jrdd.collect().iterator() > >> > File > >> > > >> > > "/home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py", > >> > line 537, in __call__ > >> > File > >> > > >> > > "/home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py", > >> > line 300, in get_return_value > >> > py4j.protocol.Py4JJavaError: An error occurred while calling > >> > o4317.collect. 
> >> > : org.apache.spark.SparkException: Job aborted: Task 8419.0:1 failed 1 > >> > times > >> > (most recent failure: Exception failure: java.lang.StackOverflowError) > >> > at > >> > > >> > > org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$schedu
Re: java.lang.StackOverflowError when calling count()
Thanks Xiangrui. After some debugging efforts, it turns out that the problem results from a bug in my code. But it's good to know that a long lineage could also lead to this problem. I will also try checkpointing to see whether the performance can be improved. Best regards, - Guanhua On 5/13/14 12:10 AM, "Xiangrui Meng" wrote: >You have a long lineage that causes the StackOverflow error. Try >rdd.checkPoint() and rdd.count() for every 20~30 iterations. >checkPoint can cut the lineage. -Xiangrui > >On Mon, May 12, 2014 at 3:42 PM, Guanhua Yan wrote: >> Dear Sparkers: >> >> I am using Python spark of version 0.9.0 to implement some iterative >> algorithm. I got some errors shown at the end of this email. It seems >>that >> it's due to the Java Stack Overflow error. The same error has been >> duplicated on a mac desktop and a linux workstation, both running the >>same >> version of Spark. >> >> The same line of code works correctly after quite some iterations. At >>the >> line of error, rdd__new.count() could be 0. (In some previous rounds, >>this >> was also 0 without any problem). >> >> Any thoughts on this? >> >> Thank you very much, >> - Guanhua >> >> >> >> CODE:print "round", round, rdd__new.count() >> ==== >> File >> >>"/home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/pyspark/ >>rdd.py", >> line 542, in count >> 14/05/12 16:20:28 INFO TaskSetManager: Loss was due to >> java.lang.StackOverflowError [duplicate 1] >> return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum() >> 14/05/12 16:20:28 ERROR TaskSetManager: Task 8419.0:0 failed 1 times; >> aborting job >> File >> >>"/home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/pyspark/ >>rdd.py", >> line 533, in sum >> 14/05/12 16:20:28 INFO TaskSchedulerImpl: Ignoring update with state >>FAILED >> from TID 1774 because its task set is gone >> return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add) >> File >> >>"/home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/pyspark/ >>rdd.py", >> line 499, in reduce >> vals = self.mapPartitions(func).collect() >> File >> >>"/home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/pyspark/ >>rdd.py", >> line 463, in collect >> bytesInJava = self._jrdd.collect().iterator() >> File >> >>"/home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/lib/py4j >>-0.8.1-src.zip/py4j/java_gateway.py", >> line 537, in __call__ >> File >> >>"/home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/lib/py4j >>-0.8.1-src.zip/py4j/protocol.py", >> line 300, in get_return_value >> py4j.protocol.Py4JJavaError: An error occurred while calling >>o4317.collect. 
>> : org.apache.spark.SparkException: Job aborted: Task 8419.0:1 failed 1 >>times >> (most recent failure: Exception failure: java.lang.StackOverflowError) >> at >> >>org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$schedul >>er$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028) >> at >> >>org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$schedul >>er$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026) >> at >> >>scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scal >>a:59) >> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) >> at >> >>org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGSch >>eduler$$abortStage(DAGScheduler.scala:1026) >> at >> >>org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DA >>GScheduler.scala:619) >> at >> >>org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DA >>GScheduler.scala:619) >> at scala.Option.foreach(Option.scala:236) >> at >> >>org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:6 >>19) >> at >> >>org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun >>$receive$1.applyOrElse(DAGScheduler.scala:207) >> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) >> at akka.actor.ActorCell.invoke(ActorCell.scala:456) >> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) >> at akka.dispatch.Mailbox.run(Mailbox.
Re: java.lang.StackOverflowError when calling count()
After checkPoint, call count directly to materialize it. -Xiangrui On Tue, May 13, 2014 at 4:20 AM, Mayur Rustagi wrote: > We are running into same issue. After 700 or so files the stack overflows, > cache, persist & checkpointing dont help. > Basically checkpointing only saves the RDD when it is materialized & it only > materializes in the end, then it runs out of stack. > > Regards > Mayur > > Mayur Rustagi > Ph: +1 (760) 203 3257 > http://www.sigmoidanalytics.com > @mayur_rustagi > > > > On Tue, May 13, 2014 at 11:40 AM, Xiangrui Meng wrote: >> >> You have a long lineage that causes the StackOverflow error. Try >> rdd.checkPoint() and rdd.count() for every 20~30 iterations. >> checkPoint can cut the lineage. -Xiangrui >> >> On Mon, May 12, 2014 at 3:42 PM, Guanhua Yan wrote: >> > Dear Sparkers: >> > >> > I am using Python spark of version 0.9.0 to implement some iterative >> > algorithm. I got some errors shown at the end of this email. It seems >> > that >> > it's due to the Java Stack Overflow error. The same error has been >> > duplicated on a mac desktop and a linux workstation, both running the >> > same >> > version of Spark. >> > >> > The same line of code works correctly after quite some iterations. At >> > the >> > line of error, rdd__new.count() could be 0. (In some previous rounds, >> > this >> > was also 0 without any problem). >> > >> > Any thoughts on this? >> > >> > Thank you very much, >> > - Guanhua >> > >> > >> > ==== >> > CODE:print "round", round, rdd__new.count() >> > >> > File >> > >> > "/home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/pyspark/rdd.py", >> > line 542, in count >> > 14/05/12 16:20:28 INFO TaskSetManager: Loss was due to >> > java.lang.StackOverflowError [duplicate 1] >> > return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum() >> > 14/05/12 16:20:28 ERROR TaskSetManager: Task 8419.0:0 failed 1 times; >> > aborting job >> > File >> > >> > "/home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/pyspark/rdd.py", >> > line 533, in sum >> > 14/05/12 16:20:28 INFO TaskSchedulerImpl: Ignoring update with state >> > FAILED >> > from TID 1774 because its task set is gone >> > return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add) >> > File >> > >> > "/home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/pyspark/rdd.py", >> > line 499, in reduce >> > vals = self.mapPartitions(func).collect() >> > File >> > >> > "/home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/pyspark/rdd.py", >> > line 463, in collect >> > bytesInJava = self._jrdd.collect().iterator() >> > File >> > >> > "/home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py", >> > line 537, in __call__ >> > File >> > >> > "/home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py", >> > line 300, in get_return_value >> > py4j.protocol.Py4JJavaError: An error occurred while calling >> > o4317.collect. 
>> > : org.apache.spark.SparkException: Job aborted: Task 8419.0:1 failed 1 >> > times >> > (most recent failure: Exception failure: java.lang.StackOverflowError) >> > at >> > >> > org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028) >> > at >> > >> > org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026) >> > at >> > >> > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) >> > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) >> > at >> > >> > org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026) >> > at >> > >> > org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619) >> > at >> > >> > org.apache.spark.scheduler.DAGScheduler$$
Re: java.lang.StackOverflowError when calling count()
We are running into same issue. After 700 or so files the stack overflows, cache, persist & checkpointing dont help. Basically checkpointing only saves the RDD when it is materialized & it only materializes in the end, then it runs out of stack. Regards Mayur Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi <https://twitter.com/mayur_rustagi> On Tue, May 13, 2014 at 11:40 AM, Xiangrui Meng wrote: > You have a long lineage that causes the StackOverflow error. Try > rdd.checkPoint() and rdd.count() for every 20~30 iterations. > checkPoint can cut the lineage. -Xiangrui > > On Mon, May 12, 2014 at 3:42 PM, Guanhua Yan wrote: > > Dear Sparkers: > > > > I am using Python spark of version 0.9.0 to implement some iterative > > algorithm. I got some errors shown at the end of this email. It seems > that > > it's due to the Java Stack Overflow error. The same error has been > > duplicated on a mac desktop and a linux workstation, both running the > same > > version of Spark. > > > > The same line of code works correctly after quite some iterations. At the > > line of error, rdd__new.count() could be 0. (In some previous rounds, > this > > was also 0 without any problem). > > > > Any thoughts on this? > > > > Thank you very much, > > - Guanhua > > > > > > > > CODE:print "round", round, rdd__new.count() > > > > File > > > "/home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/pyspark/rdd.py", > > line 542, in count > > 14/05/12 16:20:28 INFO TaskSetManager: Loss was due to > > java.lang.StackOverflowError [duplicate 1] > > return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum() > > 14/05/12 16:20:28 ERROR TaskSetManager: Task 8419.0:0 failed 1 times; > > aborting job > > File > > > "/home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/pyspark/rdd.py", > > line 533, in sum > > 14/05/12 16:20:28 INFO TaskSchedulerImpl: Ignoring update with state > FAILED > > from TID 1774 because its task set is gone > > return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add) > > File > > > "/home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/pyspark/rdd.py", > > line 499, in reduce > > vals = self.mapPartitions(func).collect() > > File > > > "/home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/pyspark/rdd.py", > > line 463, in collect > > bytesInJava = self._jrdd.collect().iterator() > > File > > > "/home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py", > > line 537, in __call__ > > File > > > "/home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py", > > line 300, in get_return_value > > py4j.protocol.Py4JJavaError: An error occurred while calling > o4317.collect. 
> > : org.apache.spark.SparkException: Job aborted: Task 8419.0:1 failed 1 > times > > (most recent failure: Exception failure: java.lang.StackOverflowError) > > at > > > org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028) > > at > > > org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026) > > at > > > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) > > at > > org.apache.spark.scheduler.DAGScheduler.org > $apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026) > > at > > > org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619) > > at > > > org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619) > > at scala.Option.foreach(Option.scala:236) > > at > > > org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:619) > > at > > > org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207) > > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) > > at akka.actor.ActorCell.invoke(ActorCell.scala:456) > > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) > > at akka.dispatch.Mailbox.run(Mailbox.scala:219) > > at
Re: java.lang.StackOverflowError when calling count()
You have a long lineage that causes the StackOverflow error. Try rdd.checkPoint() and rdd.count() for every 20~30 iterations. checkPoint can cut the lineage. -Xiangrui On Mon, May 12, 2014 at 3:42 PM, Guanhua Yan wrote: > Dear Sparkers: > > I am using Python spark of version 0.9.0 to implement some iterative > algorithm. I got some errors shown at the end of this email. It seems that > it's due to the Java Stack Overflow error. The same error has been > duplicated on a mac desktop and a linux workstation, both running the same > version of Spark. > > The same line of code works correctly after quite some iterations. At the > line of error, rdd__new.count() could be 0. (In some previous rounds, this > was also 0 without any problem). > > Any thoughts on this? > > Thank you very much, > - Guanhua > > > > CODE:print "round", round, rdd__new.count() > > File > "/home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/pyspark/rdd.py", > line 542, in count > 14/05/12 16:20:28 INFO TaskSetManager: Loss was due to > java.lang.StackOverflowError [duplicate 1] > return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum() > 14/05/12 16:20:28 ERROR TaskSetManager: Task 8419.0:0 failed 1 times; > aborting job > File > "/home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/pyspark/rdd.py", > line 533, in sum > 14/05/12 16:20:28 INFO TaskSchedulerImpl: Ignoring update with state FAILED > from TID 1774 because its task set is gone > return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add) > File > "/home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/pyspark/rdd.py", > line 499, in reduce > vals = self.mapPartitions(func).collect() > File > "/home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/pyspark/rdd.py", > line 463, in collect > bytesInJava = self._jrdd.collect().iterator() > File > "/home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py", > line 537, in __call__ > File > "/home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py", > line 300, in get_return_value > py4j.protocol.Py4JJavaError: An error occurred while calling o4317.collect. 
> : org.apache.spark.SparkException: Job aborted: Task 8419.0:1 failed 1 times > (most recent failure: Exception failure: java.lang.StackOverflowError) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) > at > org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619) > at scala.Option.foreach(Option.scala:236) > at > org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:619) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) > at akka.actor.ActorCell.invoke(ActorCell.scala:456) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) > at akka.dispatch.Mailbox.run(Mailbox.scala:219) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > > == > The stack overflow error is shown as follows: > == > > 14/05/12 16:20:28 ERROR Executor: Exception in task ID 1774 > java.lang.StackOverflowError > at java.util.zip.Inflater.inflate(Inflater.java:259) > at java.util.zip.InflaterInputStream.read(InflaterInputStream.java:152) > at java.util.zip.GZIPInputStream.read(GZIPInputStream.java:116) > at > java.io.ObjectInputStream$PeekInputStream
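Applied to an iterative PySpark job like the one quoted above, the checkpoint-every-20-to-30-iterations recipe looks roughly like this sketch (the directory, N = 25, and the helper names initial_rdd, one_iteration and num_iterations are placeholders; note the method is spelled checkpoint(), with a lower-case p, and a checkpoint directory must be configured before it is used):

sc.setCheckpointDir("hdfs:///tmp/checkpoints")   # placeholder path; must be set before checkpoint() is used

rdd = initial_rdd                                # placeholder for whatever the algorithm starts from
for i in range(num_iterations):
    rdd = one_iteration(rdd)                     # placeholder for the per-round transformation
    if i > 0 and i % 25 == 0:                    # every ~20-30 rounds, as suggested above
        rdd.checkpoint()                         # mark the current RDD so its lineage gets cut
        rdd.count()                              # materialize now; without this the checkpoint never happens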
java.lang.StackOverflowError when calling count()
Dear Sparkers:

I am using Python Spark version 0.9.0 to implement some iterative algorithm. I got some errors, shown at the end of this email. It seems that it's due to a Java stack overflow error. The same error has been duplicated on a Mac desktop and a Linux workstation, both running the same version of Spark.

The same line of code works correctly after quite some iterations. At the line of the error, rdd__new.count() could be 0. (In some previous rounds, this was also 0 without any problem.)

Any thoughts on this?

Thank you very much,
- Guanhua

CODE: print "round", round, rdd__new.count()

File "/home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/pyspark/rdd.py", line 542, in count
14/05/12 16:20:28 INFO TaskSetManager: Loss was due to java.lang.StackOverflowError [duplicate 1]
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
14/05/12 16:20:28 ERROR TaskSetManager: Task 8419.0:0 failed 1 times; aborting job
File "/home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/pyspark/rdd.py", line 533, in sum
14/05/12 16:20:28 INFO TaskSchedulerImpl: Ignoring update with state FAILED from TID 1774 because its task set is gone
    return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
File "/home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/pyspark/rdd.py", line 499, in reduce
    vals = self.mapPartitions(func).collect()
File "/home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/pyspark/rdd.py", line 463, in collect
    bytesInJava = self._jrdd.collect().iterator()
File "/home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py", line 537, in __call__
File "/home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o4317.collect.
: org.apache.spark.SparkException: Job aborted: Task 8419.0:1 failed 1 times (most recent failure: Exception failure: java.lang.StackOverflowError)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:619)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

==
The stack overflow error is shown as follows:
==

14/05/12 16:20:28 ERROR Executor: Exception in task ID 1774
java.lang.StackOverflowError
at java.util.zip.Inflater.inflate(Inflater.java:259)
at java.util.zip.InflaterInputStream.read(InflaterInputStream.java:152)
at java.util.zip.GZIPInputStream.read(GZIPInputStream.java:116)
at java.io.ObjectInputStream$PeekInputStream.read(ObjectInputStream.java:2310)
at java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2323)
at java.io.ObjectInputStream$BlockDataInputStream.readInt(ObjectInputStream.java:2818)
at java.io.ObjectInputStream.readHandle(ObjectInputStream.java:1452)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1511)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at j