Re: SparkML RandomForest java.lang.StackOverflowError

2016-04-01 Thread Joseph Bradley
Can you try reducing maxBins?  That reduces communication (at the cost of
coarser discretization of continuous features).
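
For a quick experiment, a minimal Scala sketch of the same training call with a smaller maxBins (an illustration, not from the thread; it assumes labeledPoints is the RDD[LabeledPoint] used below and mirrors the other settings from the quoted Java code, with parameter names per the Spark 1.6 Scala API):

import org.apache.spark.mllib.tree.RandomForest
import org.apache.spark.mllib.tree.configuration.{Algo, QuantileStrategy, Strategy}
import org.apache.spark.mllib.tree.impurity.Variance

val strategy = new Strategy(
  algo = Algo.Regression,
  impurity = Variance,
  maxDepth = 30,
  numClasses = 0,
  maxBins = 8,  // reduced from 16; fewer bins mean less communication, coarser splits
  quantileCalculationStrategy = QuantileStrategy.Sort,
  categoricalFeaturesInfo = Map[Int, Int](),
  minInstancesPerNode = 5,
  minInfoGain = 0.0,
  maxMemoryInMB = 256,
  subsamplingRate = 1.0,
  useNodeIdCache = true,
  checkpointInterval = 10)
val model = RandomForest.trainRegressor(labeledPoints, strategy, 50, "auto", 12345)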

On Fri, Apr 1, 2016 at 11:32 AM, Joseph Bradley <jos...@databricks.com>
wrote:

> In my experience, 20K is a lot but often doable; 2K is easy; 200 is
> small.  Communication scales linearly in the number of features.
>
> On Thu, Mar 31, 2016 at 6:12 AM, Eugene Morozov <
> evgeny.a.moro...@gmail.com> wrote:
>
>> Joseph,
>>
>> Correction, there 20k features. Is it still a lot?
>> What number of features can be considered as normal?
>>
>> --
>> Be well!
>> Jean Morozov
>>
>> On Tue, Mar 29, 2016 at 10:09 PM, Joseph Bradley <jos...@databricks.com>
>> wrote:
>>
>>> First thought: 70K features is *a lot* for the MLlib implementation (and
>>> any PLANET-like implementation)
>>>
>>> Using fewer partitions is a good idea.
>>>
>>> Which Spark version was this on?
>>>
>>> On Tue, Mar 29, 2016 at 5:21 AM, Eugene Morozov <
>>> evgeny.a.moro...@gmail.com> wrote:
>>>
>>>> The questions I have in mind:
>>>>
>>>> Is it smth that the one might expect? From the stack trace itself it's
>>>> not clear where does it come from.
>>>> Is it an already known bug? Although I haven't found anything like that.
>>>> Is it possible to configure something to workaround / avoid this?
>>>>
>>>> I'm not sure it's the right thing to do, but I've
>>>> increased thread stack size 10 times (to 80MB)
>>>> reduced default parallelism 10 times (only 20 cores are available)
>>>>
>>>> Thank you in advance.
>>>>
>>>> --
>>>> Be well!
>>>> Jean Morozov
>>>>
>>>> On Tue, Mar 29, 2016 at 1:12 PM, Eugene Morozov <
>>>> evgeny.a.moro...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I have a web service that provides rest api to train random forest
>>>>> algo.
>>>>> I train random forest on a 5 nodes spark cluster with enough memory -
>>>>> everything is cached (~22 GB).
>>>>> On a small datasets up to 100k samples everything is fine, but with
>>>>> the biggest one (400k samples and ~70k features) I'm stuck with
>>>>> StackOverflowError.
>>>>>
>>>>> Additional options for my web service
>>>>> spark.executor.extraJavaOptions="-XX:ThreadStackSize=8192"
>>>>> spark.default.parallelism = 200.
>>>>>
>>>>> On a 400k samples dataset
>>>>> - (with default thread stack size) it took 4 hours of training to get
>>>>> the error.
>>>>> - with increased stack size it took 60 hours to hit it.
>>>>> I can increase it, but it's hard to say what amount of memory it needs
>>>>> and it's applied to all of the treads and might waste a lot of memory.
>>>>>
>>>>> I'm looking at different stages at event timeline now and see that
>>>>> task deserialization time gradually increases. And at the end task
>>>>> deserialization time is roughly same as executor computing time.
>>>>>
>>>>> Code I use to train model:
>>>>>
>>>>> int MAX_BINS = 16;
>>>>> int NUM_CLASSES = 0;
>>>>> double MIN_INFO_GAIN = 0.0;
>>>>> int MAX_MEMORY_IN_MB = 256;
>>>>> double SUBSAMPLING_RATE = 1.0;
>>>>> boolean USE_NODEID_CACHE = true;
>>>>> int CHECKPOINT_INTERVAL = 10;
>>>>> int RANDOM_SEED = 12345;
>>>>>
>>>>> int NODE_SIZE = 5;
>>>>> int maxDepth = 30;
>>>>> int numTrees = 50;
>>>>> Strategy strategy = new Strategy(Algo.Regression(), Variance.instance(), 
>>>>> maxDepth, NUM_CLASSES, MAX_BINS,
>>>>> QuantileStrategy.Sort(), new 
>>>>> scala.collection.immutable.HashMap<>(), nodeSize, MIN_INFO_GAIN,
>>>>> MAX_MEMORY_IN_MB, SUBSAMPLING_RATE, USE_NODEID_CACHE, 
>>>>> CHECKPOINT_INTERVAL);
>>>>> RandomForestModel model = 
>>>>> RandomForest.trainRegressor(labeledPoints.rdd(), strategy, numTrees, 
>>>>> "auto", RANDOM_SEED);
>>>>>
>>>>>
>>>>> Any advice would be highly appreciated.
>>>>>
>>>>> The exception 

Re: SparkML RandomForest java.lang.StackOverflowError

2016-04-01 Thread Joseph Bradley
In my experience, 20K is a lot but often doable; 2K is easy; 200 is small.
Communication scales linearly in the number of features.

On Thu, Mar 31, 2016 at 6:12 AM, Eugene Morozov <evgeny.a.moro...@gmail.com>
wrote:

> Joseph,
>
> Correction, there 20k features. Is it still a lot?
> What number of features can be considered as normal?
>
> --
> Be well!
> Jean Morozov
>
> On Tue, Mar 29, 2016 at 10:09 PM, Joseph Bradley <jos...@databricks.com>
> wrote:
>
>> First thought: 70K features is *a lot* for the MLlib implementation (and
>> any PLANET-like implementation)
>>
>> Using fewer partitions is a good idea.
>>
>> Which Spark version was this on?
>>
>> On Tue, Mar 29, 2016 at 5:21 AM, Eugene Morozov <
>> evgeny.a.moro...@gmail.com> wrote:
>>
>>> The questions I have in mind:
>>>
>>> Is it smth that the one might expect? From the stack trace itself it's
>>> not clear where does it come from.
>>> Is it an already known bug? Although I haven't found anything like that.
>>> Is it possible to configure something to workaround / avoid this?
>>>
>>> I'm not sure it's the right thing to do, but I've
>>> increased thread stack size 10 times (to 80MB)
>>> reduced default parallelism 10 times (only 20 cores are available)
>>>
>>> Thank you in advance.
>>>
>>> --
>>> Be well!
>>> Jean Morozov
>>>
>>> On Tue, Mar 29, 2016 at 1:12 PM, Eugene Morozov <
>>> evgeny.a.moro...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I have a web service that provides rest api to train random forest
>>>> algo.
>>>> I train random forest on a 5 nodes spark cluster with enough memory -
>>>> everything is cached (~22 GB).
>>>> On a small datasets up to 100k samples everything is fine, but with the
>>>> biggest one (400k samples and ~70k features) I'm stuck with
>>>> StackOverflowError.
>>>>
>>>> Additional options for my web service
>>>> spark.executor.extraJavaOptions="-XX:ThreadStackSize=8192"
>>>> spark.default.parallelism = 200.
>>>>
>>>> On a 400k samples dataset
>>>> - (with default thread stack size) it took 4 hours of training to get
>>>> the error.
>>>> - with increased stack size it took 60 hours to hit it.
>>>> I can increase it, but it's hard to say what amount of memory it needs
>>>> and it's applied to all of the treads and might waste a lot of memory.
>>>>
>>>> I'm looking at different stages at event timeline now and see that task
>>>> deserialization time gradually increases. And at the end task
>>>> deserialization time is roughly same as executor computing time.
>>>>
>>>> Code I use to train model:
>>>>
>>>> int MAX_BINS = 16;
>>>> int NUM_CLASSES = 0;
>>>> double MIN_INFO_GAIN = 0.0;
>>>> int MAX_MEMORY_IN_MB = 256;
>>>> double SUBSAMPLING_RATE = 1.0;
>>>> boolean USE_NODEID_CACHE = true;
>>>> int CHECKPOINT_INTERVAL = 10;
>>>> int RANDOM_SEED = 12345;
>>>>
>>>> int NODE_SIZE = 5;
>>>> int maxDepth = 30;
>>>> int numTrees = 50;
>>>> Strategy strategy = new Strategy(Algo.Regression(), Variance.instance(), 
>>>> maxDepth, NUM_CLASSES, MAX_BINS,
>>>> QuantileStrategy.Sort(), new 
>>>> scala.collection.immutable.HashMap<>(), nodeSize, MIN_INFO_GAIN,
>>>> MAX_MEMORY_IN_MB, SUBSAMPLING_RATE, USE_NODEID_CACHE, 
>>>> CHECKPOINT_INTERVAL);
>>>> RandomForestModel model = RandomForest.trainRegressor(labeledPoints.rdd(), 
>>>> strategy, numTrees, "auto", RANDOM_SEED);
>>>>
>>>>
>>>> Any advice would be highly appreciated.
>>>>
>>>> The exception (~3000 lines long):
>>>>  java.lang.StackOverflowError
>>>> at
>>>> java.io.ObjectInputStream$PeekInputStream.read(ObjectInputStream.java:2320)
>>>> at
>>>> java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2333)
>>>> at
>>>> java.io.ObjectInputStream$BlockDataInputStream.readInt(ObjectInputStream.java:2828)
>>>> at
>>>> java.io.ObjectInputStream.readHandle(ObjectInputStream.java:1453)
>>>> at
>>>> java.io.ObjectInputStre

Re: SparkML RandomForest java.lang.StackOverflowError

2016-03-31 Thread Eugene Morozov
Joseph,

Correction: there are 20k features. Is that still a lot?
What number of features would be considered normal?

--
Be well!
Jean Morozov

On Tue, Mar 29, 2016 at 10:09 PM, Joseph Bradley <jos...@databricks.com>
wrote:

> First thought: 70K features is *a lot* for the MLlib implementation (and
> any PLANET-like implementation)
>
> Using fewer partitions is a good idea.
>
> Which Spark version was this on?
>
> On Tue, Mar 29, 2016 at 5:21 AM, Eugene Morozov <
> evgeny.a.moro...@gmail.com> wrote:
>
>> The questions I have in mind:
>>
>> Is it smth that the one might expect? From the stack trace itself it's
>> not clear where does it come from.
>> Is it an already known bug? Although I haven't found anything like that.
>> Is it possible to configure something to workaround / avoid this?
>>
>> I'm not sure it's the right thing to do, but I've
>> increased thread stack size 10 times (to 80MB)
>> reduced default parallelism 10 times (only 20 cores are available)
>>
>> Thank you in advance.
>>
>> --
>> Be well!
>> Jean Morozov
>>
>> On Tue, Mar 29, 2016 at 1:12 PM, Eugene Morozov <
>> evgeny.a.moro...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I have a web service that provides rest api to train random forest algo.
>>> I train random forest on a 5 nodes spark cluster with enough memory -
>>> everything is cached (~22 GB).
>>> On a small datasets up to 100k samples everything is fine, but with the
>>> biggest one (400k samples and ~70k features) I'm stuck with
>>> StackOverflowError.
>>>
>>> Additional options for my web service
>>> spark.executor.extraJavaOptions="-XX:ThreadStackSize=8192"
>>> spark.default.parallelism = 200.
>>>
>>> On a 400k samples dataset
>>> - (with default thread stack size) it took 4 hours of training to get
>>> the error.
>>> - with increased stack size it took 60 hours to hit it.
>>> I can increase it, but it's hard to say what amount of memory it needs
>>> and it's applied to all of the treads and might waste a lot of memory.
>>>
>>> I'm looking at different stages at event timeline now and see that task
>>> deserialization time gradually increases. And at the end task
>>> deserialization time is roughly same as executor computing time.
>>>
>>> Code I use to train model:
>>>
>>> int MAX_BINS = 16;
>>> int NUM_CLASSES = 0;
>>> double MIN_INFO_GAIN = 0.0;
>>> int MAX_MEMORY_IN_MB = 256;
>>> double SUBSAMPLING_RATE = 1.0;
>>> boolean USE_NODEID_CACHE = true;
>>> int CHECKPOINT_INTERVAL = 10;
>>> int RANDOM_SEED = 12345;
>>>
>>> int NODE_SIZE = 5;
>>> int maxDepth = 30;
>>> int numTrees = 50;
>>> Strategy strategy = new Strategy(Algo.Regression(), Variance.instance(), 
>>> maxDepth, NUM_CLASSES, MAX_BINS,
>>> QuantileStrategy.Sort(), new 
>>> scala.collection.immutable.HashMap<>(), nodeSize, MIN_INFO_GAIN,
>>> MAX_MEMORY_IN_MB, SUBSAMPLING_RATE, USE_NODEID_CACHE, 
>>> CHECKPOINT_INTERVAL);
>>> RandomForestModel model = RandomForest.trainRegressor(labeledPoints.rdd(), 
>>> strategy, numTrees, "auto", RANDOM_SEED);
>>>
>>>
>>> Any advice would be highly appreciated.
>>>
>>> The exception (~3000 lines long):
>>>  java.lang.StackOverflowError
>>> at
>>> java.io.ObjectInputStream$PeekInputStream.read(ObjectInputStream.java:2320)
>>> at
>>> java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2333)
>>> at
>>> java.io.ObjectInputStream$BlockDataInputStream.readInt(ObjectInputStream.java:2828)
>>> at
>>> java.io.ObjectInputStream.readHandle(ObjectInputStream.java:1453)
>>> at
>>> java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1512)
>>> at
>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
>>> at
>>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>>> at
>>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
>>> at
>>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
>>> at
>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>>> at
>>> java.io.ObjectInputStream.readObject0(Obje

Re: SparkML RandomForest java.lang.StackOverflowError

2016-03-30 Thread Eugene Morozov
One more thing.

With the increased stack size it has already completed twice more, but now I
see this in the log:

[dispatcher-event-loop-1] WARN  o.a.spark.scheduler.TaskSetManager - Stage
24860 contains a task of very large size (157 KB). The maximum recommended
task size is 100 KB.

The task size increases over time; when the warning first appeared, it was
around 100 KB.

The time to complete collectAsMap at DecisionTree.scala:651 has also increased,
from 8 seconds at the beginning of training to 20-24 seconds now.
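
Side note (an assumption on my part, worth verifying against the Spark 1.6 sources, and not something stated in this thread): the node-ID-cache checkpointing configured via checkpointInterval in the Strategy only takes effect when a checkpoint directory has been set on the SparkContext. A minimal sketch:

// hypothetical path; any reliable shared filesystem location works
sc.setCheckpointDir("hdfs:///tmp/random-forest-checkpoints")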

--
Be well!
Jean Morozov

On Wed, Mar 30, 2016 at 12:14 AM, Eugene Morozov <evgeny.a.moro...@gmail.com
> wrote:

> Joseph,
>
> I'm using 1.6.0.
>
> --
> Be well!
> Jean Morozov
>
> On Tue, Mar 29, 2016 at 10:09 PM, Joseph Bradley <jos...@databricks.com>
> wrote:
>
>> First thought: 70K features is *a lot* for the MLlib implementation (and
>> any PLANET-like implementation)
>>
>> Using fewer partitions is a good idea.
>>
>> Which Spark version was this on?
>>
>> On Tue, Mar 29, 2016 at 5:21 AM, Eugene Morozov <
>> evgeny.a.moro...@gmail.com> wrote:
>>
>>> The questions I have in mind:
>>>
>>> Is it smth that the one might expect? From the stack trace itself it's
>>> not clear where does it come from.
>>> Is it an already known bug? Although I haven't found anything like that.
>>> Is it possible to configure something to workaround / avoid this?
>>>
>>> I'm not sure it's the right thing to do, but I've
>>> increased thread stack size 10 times (to 80MB)
>>> reduced default parallelism 10 times (only 20 cores are available)
>>>
>>> Thank you in advance.
>>>
>>> --
>>> Be well!
>>> Jean Morozov
>>>
>>> On Tue, Mar 29, 2016 at 1:12 PM, Eugene Morozov <
>>> evgeny.a.moro...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I have a web service that provides rest api to train random forest
>>>> algo.
>>>> I train random forest on a 5 nodes spark cluster with enough memory -
>>>> everything is cached (~22 GB).
>>>> On a small datasets up to 100k samples everything is fine, but with the
>>>> biggest one (400k samples and ~70k features) I'm stuck with
>>>> StackOverflowError.
>>>>
>>>> Additional options for my web service
>>>> spark.executor.extraJavaOptions="-XX:ThreadStackSize=8192"
>>>> spark.default.parallelism = 200.
>>>>
>>>> On a 400k samples dataset
>>>> - (with default thread stack size) it took 4 hours of training to get
>>>> the error.
>>>> - with increased stack size it took 60 hours to hit it.
>>>> I can increase it, but it's hard to say what amount of memory it needs
>>>> and it's applied to all of the treads and might waste a lot of memory.
>>>>
>>>> I'm looking at different stages at event timeline now and see that task
>>>> deserialization time gradually increases. And at the end task
>>>> deserialization time is roughly same as executor computing time.
>>>>
>>>> Code I use to train model:
>>>>
>>>> int MAX_BINS = 16;
>>>> int NUM_CLASSES = 0;
>>>> double MIN_INFO_GAIN = 0.0;
>>>> int MAX_MEMORY_IN_MB = 256;
>>>> double SUBSAMPLING_RATE = 1.0;
>>>> boolean USE_NODEID_CACHE = true;
>>>> int CHECKPOINT_INTERVAL = 10;
>>>> int RANDOM_SEED = 12345;
>>>>
>>>> int NODE_SIZE = 5;
>>>> int maxDepth = 30;
>>>> int numTrees = 50;
>>>> Strategy strategy = new Strategy(Algo.Regression(), Variance.instance(), 
>>>> maxDepth, NUM_CLASSES, MAX_BINS,
>>>> QuantileStrategy.Sort(), new 
>>>> scala.collection.immutable.HashMap<>(), nodeSize, MIN_INFO_GAIN,
>>>> MAX_MEMORY_IN_MB, SUBSAMPLING_RATE, USE_NODEID_CACHE, 
>>>> CHECKPOINT_INTERVAL);
>>>> RandomForestModel model = RandomForest.trainRegressor(labeledPoints.rdd(), 
>>>> strategy, numTrees, "auto", RANDOM_SEED);
>>>>
>>>>
>>>> Any advice would be highly appreciated.
>>>>
>>>> The exception (~3000 lines long):
>>>>  java.lang.StackOverflowError
>>>> at
>>>> java.io.ObjectInputStream$PeekInputStream.read(ObjectInputStream.java:2320)
>>>> at
>>>> java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2333

Re: SparkML RandomForest java.lang.StackOverflowError

2016-03-29 Thread Eugene Morozov
Joseph,

I'm using 1.6.0.

--
Be well!
Jean Morozov

On Tue, Mar 29, 2016 at 10:09 PM, Joseph Bradley <jos...@databricks.com>
wrote:

> First thought: 70K features is *a lot* for the MLlib implementation (and
> any PLANET-like implementation)
>
> Using fewer partitions is a good idea.
>
> Which Spark version was this on?
>
> On Tue, Mar 29, 2016 at 5:21 AM, Eugene Morozov <
> evgeny.a.moro...@gmail.com> wrote:
>
>> The questions I have in mind:
>>
>> Is it smth that the one might expect? From the stack trace itself it's
>> not clear where does it come from.
>> Is it an already known bug? Although I haven't found anything like that.
>> Is it possible to configure something to workaround / avoid this?
>>
>> I'm not sure it's the right thing to do, but I've
>> increased thread stack size 10 times (to 80MB)
>> reduced default parallelism 10 times (only 20 cores are available)
>>
>> Thank you in advance.
>>
>> --
>> Be well!
>> Jean Morozov
>>
>> On Tue, Mar 29, 2016 at 1:12 PM, Eugene Morozov <
>> evgeny.a.moro...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I have a web service that provides rest api to train random forest algo.
>>> I train random forest on a 5 nodes spark cluster with enough memory -
>>> everything is cached (~22 GB).
>>> On a small datasets up to 100k samples everything is fine, but with the
>>> biggest one (400k samples and ~70k features) I'm stuck with
>>> StackOverflowError.
>>>
>>> Additional options for my web service
>>> spark.executor.extraJavaOptions="-XX:ThreadStackSize=8192"
>>> spark.default.parallelism = 200.
>>>
>>> On a 400k samples dataset
>>> - (with default thread stack size) it took 4 hours of training to get
>>> the error.
>>> - with increased stack size it took 60 hours to hit it.
>>> I can increase it, but it's hard to say what amount of memory it needs
>>> and it's applied to all of the treads and might waste a lot of memory.
>>>
>>> I'm looking at different stages at event timeline now and see that task
>>> deserialization time gradually increases. And at the end task
>>> deserialization time is roughly same as executor computing time.
>>>
>>> Code I use to train model:
>>>
>>> int MAX_BINS = 16;
>>> int NUM_CLASSES = 0;
>>> double MIN_INFO_GAIN = 0.0;
>>> int MAX_MEMORY_IN_MB = 256;
>>> double SUBSAMPLING_RATE = 1.0;
>>> boolean USE_NODEID_CACHE = true;
>>> int CHECKPOINT_INTERVAL = 10;
>>> int RANDOM_SEED = 12345;
>>>
>>> int NODE_SIZE = 5;
>>> int maxDepth = 30;
>>> int numTrees = 50;
>>> Strategy strategy = new Strategy(Algo.Regression(), Variance.instance(), 
>>> maxDepth, NUM_CLASSES, MAX_BINS,
>>> QuantileStrategy.Sort(), new 
>>> scala.collection.immutable.HashMap<>(), nodeSize, MIN_INFO_GAIN,
>>> MAX_MEMORY_IN_MB, SUBSAMPLING_RATE, USE_NODEID_CACHE, 
>>> CHECKPOINT_INTERVAL);
>>> RandomForestModel model = RandomForest.trainRegressor(labeledPoints.rdd(), 
>>> strategy, numTrees, "auto", RANDOM_SEED);
>>>
>>>
>>> Any advice would be highly appreciated.
>>>
>>> The exception (~3000 lines long):
>>>  java.lang.StackOverflowError
>>> at
>>> java.io.ObjectInputStream$PeekInputStream.read(ObjectInputStream.java:2320)
>>> at
>>> java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2333)
>>> at
>>> java.io.ObjectInputStream$BlockDataInputStream.readInt(ObjectInputStream.java:2828)
>>> at
>>> java.io.ObjectInputStream.readHandle(ObjectInputStream.java:1453)
>>> at
>>> java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1512)
>>> at
>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
>>> at
>>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>>> at
>>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
>>> at
>>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
>>> at
>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>>> at
>>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>>> at
>>> java.io.ObjectInputStream.r

Re: SparkML RandomForest java.lang.StackOverflowError

2016-03-29 Thread Joseph Bradley
First thought: 70K features is *a lot* for the MLlib implementation (and
any PLANET-like implementation).

Using fewer partitions is a good idea.

Which Spark version was this on?

On Tue, Mar 29, 2016 at 5:21 AM, Eugene Morozov <evgeny.a.moro...@gmail.com>
wrote:

> The questions I have in mind:
>
> Is it smth that the one might expect? From the stack trace itself it's not
> clear where does it come from.
> Is it an already known bug? Although I haven't found anything like that.
> Is it possible to configure something to workaround / avoid this?
>
> I'm not sure it's the right thing to do, but I've
> increased thread stack size 10 times (to 80MB)
> reduced default parallelism 10 times (only 20 cores are available)
>
> Thank you in advance.
>
> --
> Be well!
> Jean Morozov
>
> On Tue, Mar 29, 2016 at 1:12 PM, Eugene Morozov <
> evgeny.a.moro...@gmail.com> wrote:
>
>> Hi,
>>
>> I have a web service that provides rest api to train random forest algo.
>> I train random forest on a 5 nodes spark cluster with enough memory -
>> everything is cached (~22 GB).
>> On a small datasets up to 100k samples everything is fine, but with the
>> biggest one (400k samples and ~70k features) I'm stuck with
>> StackOverflowError.
>>
>> Additional options for my web service
>> spark.executor.extraJavaOptions="-XX:ThreadStackSize=8192"
>> spark.default.parallelism = 200.
>>
>> On a 400k samples dataset
>> - (with default thread stack size) it took 4 hours of training to get the
>> error.
>> - with increased stack size it took 60 hours to hit it.
>> I can increase it, but it's hard to say what amount of memory it needs
>> and it's applied to all of the treads and might waste a lot of memory.
>>
>> I'm looking at different stages at event timeline now and see that task
>> deserialization time gradually increases. And at the end task
>> deserialization time is roughly same as executor computing time.
>>
>> Code I use to train model:
>>
>> int MAX_BINS = 16;
>> int NUM_CLASSES = 0;
>> double MIN_INFO_GAIN = 0.0;
>> int MAX_MEMORY_IN_MB = 256;
>> double SUBSAMPLING_RATE = 1.0;
>> boolean USE_NODEID_CACHE = true;
>> int CHECKPOINT_INTERVAL = 10;
>> int RANDOM_SEED = 12345;
>>
>> int NODE_SIZE = 5;
>> int maxDepth = 30;
>> int numTrees = 50;
>> Strategy strategy = new Strategy(Algo.Regression(), Variance.instance(), 
>> maxDepth, NUM_CLASSES, MAX_BINS,
>> QuantileStrategy.Sort(), new scala.collection.immutable.HashMap<>(), 
>> nodeSize, MIN_INFO_GAIN,
>> MAX_MEMORY_IN_MB, SUBSAMPLING_RATE, USE_NODEID_CACHE, 
>> CHECKPOINT_INTERVAL);
>> RandomForestModel model = RandomForest.trainRegressor(labeledPoints.rdd(), 
>> strategy, numTrees, "auto", RANDOM_SEED);
>>
>>
>> Any advice would be highly appreciated.
>>
>> The exception (~3000 lines long):
>>  java.lang.StackOverflowError
>> at
>> java.io.ObjectInputStream$PeekInputStream.read(ObjectInputStream.java:2320)
>> at
>> java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2333)
>> at
>> java.io.ObjectInputStream$BlockDataInputStream.readInt(ObjectInputStream.java:2828)
>> at
>> java.io.ObjectInputStream.readHandle(ObjectInputStream.java:1453)
>> at
>> java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1512)
>> at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
>> at
>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>> at
>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
>> at
>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
>> at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>> at
>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>> at
>> java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
>> at
>> scala.collection.immutable.$colon$colon.readObject(List.scala:366)
>> at sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:497)
>> at
>> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
>> at
>> java.io.ObjectInputStream.readSerialData(Ob

Re: SparkML RandomForest java.lang.StackOverflowError

2016-03-29 Thread Eugene Morozov
The questions I have in mind:

Is it something one might expect? From the stack trace itself it's not clear
where it comes from.
Is it an already known bug? I haven't found anything like that.
Is it possible to configure something to work around / avoid this?

I'm not sure it's the right thing to do, but I've:
- increased the thread stack size 10x (to 80 MB)
- reduced the default parallelism 10x (only 20 cores are available)

Thank you in advance.

--
Be well!
Jean Morozov

On Tue, Mar 29, 2016 at 1:12 PM, Eugene Morozov <evgeny.a.moro...@gmail.com>
wrote:

> Hi,
>
> I have a web service that provides rest api to train random forest algo.
> I train random forest on a 5 nodes spark cluster with enough memory -
> everything is cached (~22 GB).
> On a small datasets up to 100k samples everything is fine, but with the
> biggest one (400k samples and ~70k features) I'm stuck with
> StackOverflowError.
>
> Additional options for my web service
> spark.executor.extraJavaOptions="-XX:ThreadStackSize=8192"
> spark.default.parallelism = 200.
>
> On a 400k samples dataset
> - (with default thread stack size) it took 4 hours of training to get the
> error.
> - with increased stack size it took 60 hours to hit it.
> I can increase it, but it's hard to say what amount of memory it needs and
> it's applied to all of the treads and might waste a lot of memory.
>
> I'm looking at different stages at event timeline now and see that task
> deserialization time gradually increases. And at the end task
> deserialization time is roughly same as executor computing time.
>
> Code I use to train model:
>
> int MAX_BINS = 16;
> int NUM_CLASSES = 0;
> double MIN_INFO_GAIN = 0.0;
> int MAX_MEMORY_IN_MB = 256;
> double SUBSAMPLING_RATE = 1.0;
> boolean USE_NODEID_CACHE = true;
> int CHECKPOINT_INTERVAL = 10;
> int RANDOM_SEED = 12345;
>
> int NODE_SIZE = 5;
> int maxDepth = 30;
> int numTrees = 50;
> Strategy strategy = new Strategy(Algo.Regression(), Variance.instance(), 
> maxDepth, NUM_CLASSES, MAX_BINS,
> QuantileStrategy.Sort(), new scala.collection.immutable.HashMap<>(), 
> nodeSize, MIN_INFO_GAIN,
> MAX_MEMORY_IN_MB, SUBSAMPLING_RATE, USE_NODEID_CACHE, 
> CHECKPOINT_INTERVAL);
> RandomForestModel model = RandomForest.trainRegressor(labeledPoints.rdd(), 
> strategy, numTrees, "auto", RANDOM_SEED);
>
>
> Any advice would be highly appreciated.
>
> The exception (~3000 lines long):
>  java.lang.StackOverflowError
> at
> java.io.ObjectInputStream$PeekInputStream.read(ObjectInputStream.java:2320)
> at
> java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2333)
> at
> java.io.ObjectInputStream$BlockDataInputStream.readInt(ObjectInputStream.java:2828)
> at
> java.io.ObjectInputStream.readHandle(ObjectInputStream.java:1453)
> at
> java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1512)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
> at
> scala.collection.immutable.$colon$colon.readObject(List.scala:366)
> at sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:497)
> at
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
> at
> java.io.ObjectInputStream.readSe

SparkML RandomForest java.lang.StackOverflowError

2016-03-29 Thread Eugene Morozov
Hi,

I have a web service that provides a REST API to train a random forest.
I train the random forest on a 5-node Spark cluster with enough memory -
everything is cached (~22 GB).
On small datasets of up to 100k samples everything is fine, but with the
biggest one (400k samples and ~70k features) I'm stuck with a
StackOverflowError.

Additional options for my web service:
spark.executor.extraJavaOptions="-XX:ThreadStackSize=8192"
spark.default.parallelism = 200

On the 400k-sample dataset:
- with the default thread stack size, it took 4 hours of training to hit the
error;
- with the increased stack size, it took 60 hours to hit it.
I can increase it further, but it's hard to say how much memory it needs; it
applies to all of the threads and might waste a lot of memory.

I'm looking at different stages in the event timeline now and see that task
deserialization time gradually increases. Toward the end, task deserialization
time is roughly the same as executor computing time.

Code I use to train model:

int MAX_BINS = 16;
int NUM_CLASSES = 0;
double MIN_INFO_GAIN = 0.0;
int MAX_MEMORY_IN_MB = 256;
double SUBSAMPLING_RATE = 1.0;
boolean USE_NODEID_CACHE = true;
int CHECKPOINT_INTERVAL = 10;
int RANDOM_SEED = 12345;

int NODE_SIZE = 5;
int maxDepth = 30;
int numTrees = 50;
Strategy strategy = new Strategy(Algo.Regression(), Variance.instance(),
    maxDepth, NUM_CLASSES, MAX_BINS, QuantileStrategy.Sort(),
    new scala.collection.immutable.HashMap<>(), NODE_SIZE, MIN_INFO_GAIN,
    MAX_MEMORY_IN_MB, SUBSAMPLING_RATE, USE_NODEID_CACHE, CHECKPOINT_INTERVAL);
RandomForestModel model = RandomForest.trainRegressor(labeledPoints.rdd(),
    strategy, numTrees, "auto", RANDOM_SEED);


Any advice would be highly appreciated.

The exception (~3000 lines long):
 java.lang.StackOverflowError
at
java.io.ObjectInputStream$PeekInputStream.read(ObjectInputStream.java:2320)
at
java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2333)
at
java.io.ObjectInputStream$BlockDataInputStream.readInt(ObjectInputStream.java:2828)
at java.io.ObjectInputStream.readHandle(ObjectInputStream.java:1453)
at
java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1512)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at
scala.collection.immutable.$colon$colon.readObject(List.scala:366)
at sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at
scala.collection.immutable.$colon$colon.readObject(List.scala:362)
at sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)

--
Be well!
Jean Morozov


Re: Spark SQL - java.lang.StackOverflowError after caching table

2016-03-24 Thread Mohamed Nadjib MAMI

I ran:

sqlContext.cacheTable("product")
var df = sqlContext.sql("...complex query...")
df.explain(true)

...and obtained: http://pastebin.com/k9skERsr

...where "[...]" corresponds therein to huge lists of records from the 
addressed table (product)


The query is of the following form:
"SELECT distinct p.id, p.`aaa`, p.`bbb` FROM product p, (SELECT distinct 
p1.id FROM product p1 WHERE p1.`ccc`='fff') p2, (SELECT distinct p3.id 
FROM product p3 WHERE p3.`ccc`='ddd') p4 WHERE p.`eee` = '1' AND 
p.id=p2.id AND p.`eee` > 137 AND p4.id=p.id UNION SELECT distinct 
p.id,p.`bbb`, p.`bbb` FROM product p, (SELECT distinct p1.id FROM 
product p1 WHERE p1.`ccc`='fff') p2, (SELECT distinct p5.id FROM product 
p5 WHERE p5.`ccc`='ggg') p6 WHERE p.`eee` = '1' AND p.id=p2.id AND 
p.`hhh` > 93 AND p6.id=p.id ORDER BY p.`bbb` LIMIT 10"
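
A possible workaround to try (a sketch based on an assumption, not something confirmed in this thread): run the two halves of the UNION as separate queries and combine the DataFrames, so that each individual parsed plan stays smaller. The "first half" / "second half" placeholders stand for the two SELECT blocks of the query above:

val part1 = sqlContext.sql("SELECT ... first half of the UNION ...")
val part2 = sqlContext.sql("SELECT ... second half of the UNION ...")
// unionAll + distinct approximates the UNION; orderBy/limit replace the trailing ORDER BY ... LIMIT 10
val result = part1.unionAll(part2).distinct().orderBy("bbb").limit(10)
result.show()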



On 24.03.2016 22:16, Ted Yu wrote:
Can you obtain output from explain(true) on the query after 
cacheTable() call ?


Potentially related JIRA:

[SPARK-13657] [SQL] Support parsing very long AND/OR expressions


On Thu, Mar 24, 2016 at 12:55 PM, Mohamed Nadjib MAMI 
<m...@iai.uni-bonn.de <mailto:m...@iai.uni-bonn.de>> wrote:


Here is the stack trace: http://pastebin.com/ueHqiznH

Here's the code:

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

val table = sqlContext.read.parquet("hdfs://...parquet_table")
table.registerTempTable("table")

sqlContext.sql("...complex query...").show() /** works */

sqlContext.cacheTable("table")

sqlContext.sql("...complex query...").show() /** works */

sqlContext.sql("...complex query...").show() /** fails */



On 24.03.2016 13:40, Ted Yu wrote:

Can you pastebin the stack trace ?

If you can show snippet of your code, that would help give us more clue.

Thanks


On Mar 24, 2016, at 2:43 AM, Mohamed Nadjib MAMI<m...@iai.uni-bonn.de> 
<mailto:m...@iai.uni-bonn.de>  wrote:

Hi all,
I'm running SQL queries (sqlContext.sql()) on Parquet tables and facing a 
problem with table caching (sqlContext.cacheTable()), using spark-shell of 
Spark 1.5.1.

After I run the sqlContext.cacheTable(table), the sqlContext.sql(query) takes longer 
the first time (well, for the lazy execution reason) but it finishes and returns results. 
However, the weird thing is that after I run the same query again, I get the error: 
"java.lang.StackOverflowError".

I Googled it but didn't find the error appearing with table caching and 
querying.
Any hint is appreciated.


-- 
Regards, Grüße, Cordialement, Recuerdos, Saluti, προσρήσεις, 问候, تحياتي. Mohamed Nadjib Mami
PhD Student - EIS Department - Bonn University, Germany.
Website <http://www.mohamednadjibmami.com>.
LinkedIn <http://fr.linkedin.com/in/mohamednadjibmami>.




--
Regards, Grüße, Cordialement, Recuerdos, Saluti, προσρήσεις, 问候, 
تحياتي. Mohamed Nadjib Mami

PhD Student - EIS Department - Bonn University, Germany.
Website <http://www.mohamednadjibmami.com>.
LinkedIn <http://fr.linkedin.com/in/mohamednadjibmami>.


Re: Spark SQL - java.lang.StackOverflowError after caching table

2016-03-24 Thread Ted Yu
Can you obtain output from explain(true) on the query after cacheTable()
call ?

Potentially related JIRA:

[SPARK-13657] [SQL] Support parsing very long AND/OR expressions


On Thu, Mar 24, 2016 at 12:55 PM, Mohamed Nadjib MAMI <m...@iai.uni-bonn.de>
wrote:

> Here is the stack trace: http://pastebin.com/ueHqiznH
>
> Here's the code:
>
> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
>
> val table = sqlContext.read.parquet("hdfs://...parquet_table")
> table.registerTempTable("table")
>
> sqlContext.sql("...complex query...").show() /** works */
>
> sqlContext.cacheTable("table")
>
> sqlContext.sql("...complex query...").show() /** works */
>
> sqlContext.sql("...complex query...").show() /** fails */
>
>
>
> On 24.03.2016 13:40, Ted Yu wrote:
>
> Can you pastebin the stack trace ?
>
> If you can show snippet of your code, that would help give us more clue.
>
> Thanks
>
>
> On Mar 24, 2016, at 2:43 AM, Mohamed Nadjib MAMI <m...@iai.uni-bonn.de> 
> <m...@iai.uni-bonn.de> wrote:
>
> Hi all,
> I'm running SQL queries (sqlContext.sql()) on Parquet tables and facing a 
> problem with table caching (sqlContext.cacheTable()), using spark-shell of 
> Spark 1.5.1.
>
> After I run the sqlContext.cacheTable(table), the sqlContext.sql(query) takes 
> longer the first time (well, for the lazy execution reason) but it finishes 
> and returns results. However, the weird thing is that after I run the same 
> query again, I get the error: "java.lang.StackOverflowError".
>
> I Googled it but didn't find the error appearing with table caching and 
> querying.
> Any hint is appreciated.
>
>
>
> --
> Regards, Grüße, Cordialement, Recuerdos, Saluti, προσρήσεις, 问候, تحياتي.
> Mohamed Nadjib Mami
> PhD Student - EIS Department - Bonn University, Germany.
> Website <http://www.mohamednadjibmami.com>.
> LinkedIn <http://fr.linkedin.com/in/mohamednadjibmami>.
>
>


Re: Spark SQL - java.lang.StackOverflowError after caching table

2016-03-24 Thread Mohamed Nadjib MAMI

Here is the stack trace: http://pastebin.com/ueHqiznH

Here's the code:

   val sqlContext = new org.apache.spark.sql.SQLContext(sc)

   val table = sqlContext.read.parquet("hdfs://...parquet_table")
   table.registerTempTable("table")

   sqlContext.sql("...complex query...").show() /** works */

   sqlContext.cacheTable("table")

   sqlContext.sql("...complex query...").show() /** works */

   sqlContext.sql("...complex query...").show() /** fails */



On 24.03.2016 13:40, Ted Yu wrote:

Can you pastebin the stack trace ?

If you can show snippet of your code, that would help give us more clue.

Thanks


On Mar 24, 2016, at 2:43 AM, Mohamed Nadjib MAMI <m...@iai.uni-bonn.de> wrote:

Hi all,
I'm running SQL queries (sqlContext.sql()) on Parquet tables and facing a 
problem with table caching (sqlContext.cacheTable()), using spark-shell of 
Spark 1.5.1.

After I run the sqlContext.cacheTable(table), the sqlContext.sql(query) takes longer the 
first time (well, for the lazy execution reason) but it finishes and returns results. 
However, the weird thing is that after I run the same query again, I get the error: 
"java.lang.StackOverflowError".

I Googled it but didn't find the error appearing with table caching and 
querying.
Any hint is appreciated.


--
Regards, Grüße, Cordialement, Recuerdos, Saluti, προσρήσεις, 问候, 
تحياتي. Mohamed Nadjib Mami

PhD Student - EIS Department - Bonn University, Germany.
Website <http://www.mohamednadjibmami.com>.
LinkedIn <http://fr.linkedin.com/in/mohamednadjibmami>.


Re: Spark SQL - java.lang.StackOverflowError after caching table

2016-03-24 Thread Ted Yu
Can you pastebin the stack trace ?

If you can show snippet of your code, that would help give us more clue. 

Thanks

> On Mar 24, 2016, at 2:43 AM, Mohamed Nadjib MAMI <m...@iai.uni-bonn.de> wrote:
> 
> Hi all,
> I'm running SQL queries (sqlContext.sql()) on Parquet tables and facing a 
> problem with table caching (sqlContext.cacheTable()), using spark-shell of 
> Spark 1.5.1.
> 
> After I run the sqlContext.cacheTable(table), the sqlContext.sql(query) takes 
> longer the first time (well, for the lazy execution reason) but it finishes 
> and returns results. However, the weird thing is that after I run the same 
> query again, I get the error: "java.lang.StackOverflowError".
> 
> I Googled it but didn't find the error appearing with table caching and 
> querying. 
> Any hint is appreciated.
> 

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark SQL - java.lang.StackOverflowError after caching table

2016-03-24 Thread Mohamed Nadjib MAMI

Hi all,
I'm running SQL queries (sqlContext.sql()) on Parquet tables and facing a
problem with table caching (sqlContext.cacheTable()), using the spark-shell of
Spark 1.5.1.


After I run sqlContext.cacheTable(table), the first sqlContext.sql(query) takes
longer (due to lazy execution) but finishes and returns results. However, the
weird thing is that when I run the same query again, I get the error:
"java.lang.StackOverflowError".


I Googled it but didn't find this error reported for table caching and
querying.

Any hint is appreciated.



Re: Spark Streaming: java.lang.StackOverflowError

2016-03-01 Thread Cody Koeninger
What code is triggering the stack overflow?

On Mon, Feb 29, 2016 at 11:13 PM, Vinti Maheshwari 
wrote:

> Hi All,
>
> I am getting the below error in a Spark Streaming application; I am using Kafka
> for the input stream. When I was using a socket source it worked fine, but when
> I changed to Kafka it started throwing this error. Does anyone have an idea why,
> and do I need to change my batch time and checkpointing time?
>
>
>
> *ERROR StreamingContext: Error starting the context, marking it as stopped*
> *java.lang.StackOverflowError*
>
> My program:
>
> def main(args: Array[String]): Unit = {
>
> // Function to create and setup a new StreamingContext
> def functionToCreateContext(): StreamingContext = {
>   val conf = new SparkConf().setAppName("HBaseStream")
>   val sc = new SparkContext(conf)
>   // create a StreamingContext, the main entry point for all streaming 
> functionality
>   val ssc = new StreamingContext(sc, Seconds(5))
>   val brokers = args(0)
>   val topics= args(1)
>   val topicsSet = topics.split(",").toSet
>   val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
>   val messages = KafkaUtils.createDirectStream[String, String, 
> StringDecoder, StringDecoder](
> ssc, kafkaParams, topicsSet)
>
>   val inputStream = messages.map(_._2)
> //val inputStream = ssc.socketTextStream(args(0), args(1).toInt)
>   ssc.checkpoint(checkpointDirectory)
>   inputStream.print(1)
>   val parsedStream = inputStream
> .map(line => {
>   val splitLines = line.split(",")
>   (splitLines(1), splitLines.slice(2, 
> splitLines.length).map((_.trim.toLong)))
> })
>   import breeze.linalg.{DenseVector => BDV}
>   import scala.util.Try
>
>   val state: DStream[(String, Array[Long])] = 
> parsedStream.updateStateByKey(
> (current: Seq[Array[Long]], prev: Option[Array[Long]]) =>  {
>   prev.map(_ +: current).orElse(Some(current))
> .flatMap(as => Try(as.map(BDV(_)).reduce(_ + _).toArray).toOption)
> })
>
>   state.checkpoint(Duration(1))
>   state.foreachRDD(rdd => rdd.foreach(Blaher.blah))
>   ssc
> }
> // Get StreamingContext from checkpoint data or create a new one
> val context = StreamingContext.getOrCreate(checkpointDirectory, 
> functionToCreateContext _)
>   }
> }
>
> Regards,
> ~Vinti
>


Spark Accumulator Issue - java.io.IOException: java.lang.StackOverflowError

2015-07-24 Thread Jadhav Shweta
Hi,

I am trying one transformation by calling a Scala method;
this method returns MutableList[AvroObject]:

def processRecords(id: String, list1: Iterable[(String, GenericRecord)]):
    scala.collection.mutable.MutableList[AvroObject]

Hence, the output of the transformation is RDD[MutableList[AvroObject]],
but I want the output as RDD[AvroObject].

I tried applying foreach to turn RDD[MutableList[AvroObject]] into RDD[AvroObject]:

var uA = sparkContext.accumulableCollection[MutableList[AvroObject],
    universe](MutableList[AvroObject]())
rdd_list_avroObj.foreach(u => {
  uA ++= u
})
var uRDD = sparkContext.parallelize(uA.value)

It's failing on a large dataset with the following error:

java.io.IOException: java.lang.StackOverflowError
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1140)
at 
org.apache.spark.scheduler.DirectTaskResult.writeExternal(TaskResult.scala:45)
at 
java.io.ObjectOutputStream.writeExternalData(ObjectOutputStream.java:1458)
at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1429)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
at 
org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
at 
org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:226)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.StackOverflowError
at 
java.io.ObjectOutputStream$HandleTable.hash(ObjectOutputStream.java:2359)
at 
java.io.ObjectOutputStream$HandleTable.lookup(ObjectOutputStream.java:2292)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1115)
at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
at java.util.ArrayList.writeObject(ArrayList.java:742)
at sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at 
java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)

I have two queries regarding this issue:
Option 1: a replacement for the accumulator (see the sketch below).
Option 2: in the Scala method, instead of returning a List[AvroObject], can I
emit multiple AvroObjects directly, so that I end up with an RDD[AvroObject]?
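
For Option 1, a minimal Scala sketch (it assumes the grouped records live in a keyed RDD named grouped, e.g. produced by groupByKey; that name is hypothetical, not from this thread): flatMap yields RDD[AvroObject] directly, without collecting everything into an accumulator on the driver.

// grouped: RDD[(String, Iterable[(String, GenericRecord)])]
val avroObjects: org.apache.spark.rdd.RDD[AvroObject] =
  grouped.flatMap { case (id, records) => processRecords(id, records) }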

Note:
I am using Spark 1.3.0
Input data size: 200 GB
Cluster: 3 machines (2 cores, 8 GB)
Spark running in YARN mode

Thanks & Regards
Shweta Jadhav





Spark Accumulator Issue - java.io.IOException: java.lang.StackOverflowError

2015-07-15 Thread Jadhav Shweta
Hi,

I am trying one transformation by calling scala method
this scala method returns MutableList[AvroObject]

def processRecords(id: String, list1: Iterable[(String, GenericRecord)]): 
scala.collection.mutable.MutableList[AvroObject] 

Hence, the output of transaformation is RDD[MutableList[AvroObject]]

But I want o/p as RDD[AvroObject]

I tried applying foreach on RDD[MutableList[AvroObject]] -- RDD[AvroObject]

var uA = sparkContext.accumulableCollection[MutableList[AvroObject], 
universe](MutableList[AvroObject]())
rdd_list_avroObj.foreach(u = {
uA ++= u
})
var uRDD = sparkContext.parallelize(uA.value)

Its failing on large dataset with following error

java.io.IOException: java.lang.StackOverflowError
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1140)
at 
org.apache.spark.scheduler.DirectTaskResult.writeExternal(TaskResult.scala:45)
at 
java.io.ObjectOutputStream.writeExternalData(ObjectOutputStream.java:1458)
at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1429)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
at 
org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
at 
org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:226)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.StackOverflowError
at 
java.io.ObjectOutputStream$HandleTable.hash(ObjectOutputStream.java:2359)
at 
java.io.ObjectOutputStream$HandleTable.lookup(ObjectOutputStream.java:2292)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1115)
at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
at java.util.ArrayList.writeObject(ArrayList.java:742)
at sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at 
java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)

I have two queries regarding this issue:
Option 1: REplacement of accumulator
Option 2: In scala method instead of returning List[AvroObject] can I send 
multiple AvroObject. SO that I'll get RDD[AvroObject]

Note:
I am using Saprk 1.3.0
Input DataSize 200GB
Cluster 3 Machines(2 Cores, 8GB)
Spark running in YARN Mode

Thanks  Regards
Shweta Jadhav
Tata Consultancy Services Limited
Cell:- +91-9867515614
Mailto: jadhav.shw...@tcs.com
Website: http://www.tcs.com





java.lang.StackOverflowError when doing spark sql

2015-02-19 Thread bit1...@163.com
I am using Spark 1.2.0 (prebuilt with Hadoop 2.4) on Windows 7.

I found the same bug here: https://issues.apache.org/jira/browse/SPARK-4208, but it is
still open. Is there a workaround for this? Thanks!

The stack trace (a StackOverflowError occurs):
Exception in thread "main" java.lang.StackOverflowError
at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
 
at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
 
at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) 
at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
 
at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
 
at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) 
at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
 
at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
 
at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) 
at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
 
at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
 
at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) 
at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
 









java.lang.stackoverflowerror when running Spark shell

2014-09-23 Thread mrshen
I tested the examples according to the docs in the Spark SQL programming guide,
but a java.lang.StackOverflowError occurred every time I called
sqlContext.sql(...).

Meanwhile, it worked fine with a HiveContext. The Hadoop version is 2.2.0 and
the Spark version is 1.1.0, built with YARN and Hive. I would be grateful if
you could give me a clue.

http://apache-spark-user-list.1001560.n3.nabble.com/file/n14979/img.png 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-stackoverflowerror-when-running-Spark-shell-tp14979.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: java.lang.StackOverflowError when calling count()

2014-08-12 Thread Tathagata Das
The long lineage causes a long/deep Java object tree (a DAG of RDD objects),
which needs to be serialized as part of task creation. When serializing, the
whole object DAG needs to be traversed, which leads to the stack overflow
error.
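
For illustration, a minimal Scala sketch of the usual fix, truncating the lineage periodically (assumptions: sc is the SparkContext, files is a Seq[String] of input paths, and the checkpoint path is hypothetical):

sc.setCheckpointDir("hdfs:///tmp/checkpoints")
var data = sc.parallelize(Seq.empty[String])
for ((file, i) <- files.zipWithIndex) {
  data = data.union(sc.textFile(file))
  if (i % 20 == 0) {
    data.checkpoint()  // cuts the lineage once materialized
    data.count()       // action to force the checkpoint now
  }
}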

TD


On Mon, Aug 11, 2014 at 7:14 PM, randylu randyl...@gmail.com wrote:

 hi, TD. I also fall into the trap of long lineage, and your suggestions do
 work well. But i don't understand why the long lineage can cause stackover,
 and where it takes effect?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-StackOverflowError-when-calling-count-tp5649p11941.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: java.lang.StackOverflowError when calling count()

2014-08-12 Thread randylu
hi, TD. Thanks very much! I got it.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-StackOverflowError-when-calling-count-tp5649p11980.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: java.lang.StackOverflowError when calling count()

2014-08-11 Thread randylu
Hi, TD. I also fell into the trap of long lineage, and your suggestions do
work well. But I don't understand why the long lineage causes a stack overflow,
and where it takes effect?






java.lang.StackOverflowError

2014-08-05 Thread Chengi Liu
Hi,
  I am doing some basic preprocessing in pyspark (local mode), as follows:

files = [...]  # list of input files

def read(filename, sc):
    # process the file and return an RDD
    return rdd

if __name__ == "__main__":
    conf = SparkConf()
    conf.setMaster('local')
    sc = SparkContext(conf=conf)
    sc.setCheckpointDir(root + "temp/")

    data = sc.parallelize([])

    for i, f in enumerate(files):
        data = data.union(read(f, sc))
        if i == 20:
            data.checkpoint()
            data.count()
        if i == 500:
            break
    # print data.count()
    # rdd_1 = read(files[0], sc)
    data.saveAsTextFile(root + "output/")


But I see this error:
  keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path)
  File
/Users/ping/Desktop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py,
line 538, in __call__
  File
/Users/ping/Desktop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py,
line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling
o9564.saveAsTextFile.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task
serialization failed: java.lang.StackOverflowError
java.io.Bits.putInt(Bits.java:93)
java.io.ObjectOutputStream$BlockDataOutputStream.writeInt(ObjectOutputStream.java:1927)


Re: java.lang.StackOverflowError

2014-08-05 Thread Chengi Liu
Bump




Re: java.lang.StackOverflowError

2014-08-05 Thread Davies Liu
Could you create an re-producable script (and data) to allow us to
investigate this?

Davies

On Tue, Aug 5, 2014 at 1:10 AM, Chengi Liu chengi.liu...@gmail.com wrote:
 Hi,
   I am doing some basic preprocessing in pyspark (local mode as follows):

 files = [ input files]
 def read(filename,sc):
   #process file
   return rdd

 if __name__ ==__main__:
conf = SparkConf()
   conf.setMaster('local')
   sc = SparkContext(conf =conf)
   sc.setCheckpointDir(root+temp/)

   data = sc.parallelize([])

   for i,f in enumerate(files):

 data = data.union(read(f,sc))

union is a lazy transformation; you could union them all at once:

rdds = [read(f, sc) for f in files]
rdd = sc.union(rdds)

 if i ==20:
   data.checkpoint()
   data.count()
 if i == 500:break
   #print data.count()
   #rdd_1 = read(files[0],sc)
   data.saveAsTextFile(root+output/)


 But I see this error:
   keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path)
   File
 /Users/ping/Desktop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py,
 line 538, in __call__
   File
 /Users/ping/Desktop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py,
 line 300, in get_return_value
 py4j.protocol.Py4JJavaError: An error occurred while calling
 o9564.saveAsTextFile.
 : org.apache.spark.SparkException: Job aborted due to stage failure: Task
 serialization failed: java.lang.StackOverflowError
 java.io.Bits.putInt(Bits.java:93)
 java.io.ObjectOutputStream$BlockDataOutputStream.writeInt(ObjectOutputStream.java:1927)
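To make Davies' suggestion concrete, here is a small self-contained PySpark
sketch (the file names, the read() body, and the output path are hypothetical
placeholders) that builds the combined RDD with a single sc.union instead of a
loop of pairwise unions:

from pyspark import SparkConf, SparkContext

def read(filename, sc):
    # stand-in for the original per-file processing
    return sc.textFile(filename)

if __name__ == "__main__":
    sc = SparkContext(conf=SparkConf().setMaster("local[*]"))

    files = ["part-00000.txt", "part-00001.txt"]   # hypothetical input paths
    rdds = [read(f, sc) for f in files]
    data = sc.union(rdds)          # one flat UnionRDD instead of a deep chain
    data.saveAsTextFile("/tmp/union-output")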




Re: java.lang.StackOverflowError when calling count()

2014-07-26 Thread Tathagata Das
Responses inline.

On Wed, Jul 23, 2014 at 4:13 AM, lalit1303 la...@sigmoidanalytics.com wrote:
 Hi,
 Thanks TD for your reply. I am still not able to resolve the problem for my
 use case.
 I have let's say 1000 different RDD's, and I am applying a transformation
 function on each RDD and I want the output of all rdd's combined to a single
 output RDD. For, this I am doing the following:

 *Loop Start*
 tempRDD = jaRDD.rdd().repartition(1).mapPartitions().toJavaRDD();
 *//creating new rdd in every loop*
 outRDD = outRDD.union(tempRDD); *//keep joining RDD's to get the output into
 a single RDD*

 *//after every 10 iteration, in order to truncate the lineage*
 cachRDD = outRDD.cache();
 cachRDD.checkpoint();
 System.out.println(cachRDD.collect().size());
 outRDD = new JavaRDD<String>(cachRDD.rdd(), cachRDD.classTag());
 *Loop Ends*

 *//finally after whole computation*
 outRDD.saveAsTextFile(..)

 The above operations is overall slow, runs successfully when performed less
 iterations i.e. ~100. But, when the num of iterations in increased to ~1000,
 The whole job is taking more than *30 mins* and ultimately break down giving
 OutOfMemory error. The total size of data is around 1.4 MB. As of now, I am
 running the job on spark standalone mode with 2 cores and 2.9 GB memory.

I think this is happening because of how you are caching the output RDDs
that are being generated repeatedly. In every iteration, it is building a
new union RDD which contains the data of the previous union RDD plus some
new data. Since each of these union RDDs is cached, the underlying data is
being cached repeatedly. So the cached data grows like this:
Iteration 1: union RDD: X MB    |  Total size cached: X MB
Iteration 2: union RDD: 2X MB   |  Total size cached: 3X MB
Iteration 3: union RDD: 3X MB   |  Total size cached: 6X MB
Iteration 4: union RDD: 4X MB   |  Total size cached: 10X MB
...

If you do the math, that is a quadratic increase in the size of the data
being processed and cached with respect to the number of iterations
(roughly N*(N+1)/2 * X MB cached after N iterations). This leads to an
increase in both run time and memory usage.


 I also observed that when collect() operation is performed, the number of
 tasks keeps on increasing as the loop proceeds, like on first collect() 22
 total task, then ~40 total tasks ... ~300 task for single collect.
 Does this means that all the operations are repeatedly performed, and RDD
 lineage is not broken??

Same reason as above. Each union RDD is built by appending the partitions
of the previous union RDD plus the new set of partitions (~22 partitions).
So the Nth union RDD has N * 22 partitions, hence that many tasks.
You could change this by also repartitioning when you want to
cache + checkpoint the union RDD (i.e. repartition(100) and cache() the
RDD, call checkpoint() on it, and then count() to materialize it).

And do you really need all the data to be collected at the driver? If you
are doing the cachRDD.collect() just to force the checkpoint, use
cachRDD.count() instead.
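Putting these suggestions together, a rough PySpark sketch of the loop (the
batch RDDs, partition count, and checkpoint interval below are illustrative
placeholders, not the original Java code): repartitioning before
caching/checkpointing keeps the task count flat, and count() rather than
collect() forces the checkpoint without shipping data to the driver.

from pyspark import SparkConf, SparkContext

sc = SparkContext(conf=SparkConf().setMaster("local[*]"))
sc.setCheckpointDir("/tmp/spark-checkpoints")

# stand-in for the ~1000 per-iteration RDDs from the original post
batches = [sc.parallelize([(i, "value")]) for i in range(100)]

out = sc.parallelize([])
for i, batch in enumerate(batches):
    out = out.union(batch)
    if (i + 1) % 10 == 0:                  # every 10 iterations, truncate the lineage
        out = out.repartition(4).cache()
        out.checkpoint()
        out.count()                        # count(), not collect(): forces the checkpoint
                                           # without pulling all the data to the driver
out.saveAsTextFile("/tmp/combined-output")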


 Can you please elaborate on the point from your last post i.e. how to
 perform: *Create a modified RDD R` which has the same data as RDD R but
 does not have the lineage. This is done by creating a new BlockRDD using the
 ids of blocks of data representing the in-memory R*

Please refer to the lines in the function:
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala#L74
What those lines do is save the data of the associated RDD to HDFS files,
and then create a new CheckpointRDD from the same files. Then the
dependency of the associated RDD is changed to use the new RDD. This
truncates the lineage because the associated RDD's parent is now the
CheckpointRDD, which has a very short lineage (it just links to the
checkpoint files), and the previous dependencies (parent RDDs) are forgotten.

This implementation can be modified by forcing the data of the associated
RDD to be cached with StorageLevel.MEMORY_AND_DISK_2. Then, instead of a
CheckpointRDD, you can create a new BlockRDD (using the names of the blocks
that are used to cache the RDD), which is then set as the new dependency.
This is definitely a behind-the-public-API implementation.



 -
 Lalit Yadav
 la...@sigmoidanalytics.com


Re: java.lang.StackOverflowError when calling count()

2014-07-23 Thread lalit1303
Hi,
Thanks TD for your reply. I am still not able to resolve the problem for my
use case.
I have, let's say, 1000 different RDDs, and I am applying a transformation
function on each RDD. I want the output of all the RDDs combined into a
single output RDD. For this, I am doing the following:

*Loop Start*
tempRDD = jaRDD.rdd().repartition(1).mapPartitions().toJavaRDD(); 
*//creating new rdd in every loop* 
outRDD = outRDD.union(tempRDD); *//keep joining RDD's to get the output into
a single RDD*

*//after every 10 iteration, in order to truncate the lineage*
cachRDD = outRDD.cache();
cachRDD.checkpoint();
System.out.println(cachRDD.collect().size());
outRDD = new JavaRDD<String>(cachRDD.rdd(), cachRDD.classTag());
*Loop Ends*

*//finally after whole computation*
outRDD.saveAsTextFile(..)

The above operation is slow overall. It runs successfully when performing
fewer iterations, i.e. ~100. But when the number of iterations is increased
to ~1000, the whole job takes more than *30 mins* and ultimately breaks
down with an OutOfMemory error. The total size of data is around 1.4 MB. As
of now, I am running the job in Spark standalone mode with 2 cores and 2.9
GB memory.

I also observed that when the collect() operation is performed, the number
of tasks keeps increasing as the loop proceeds: the first collect() has 22
total tasks, then ~40 total tasks ... ~300 tasks for a single collect.
Does this mean that all the operations are performed repeatedly, and the
RDD lineage is not broken?


Can you please elaborate on the point from your last post i.e. how to
perform: *Create a modified RDD R` which has the same data as RDD R but
does not have the lineage. This is done by creating a new BlockRDD using the
ids of blocks of data representing the in-memory R*



-
Lalit Yadav
la...@sigmoidanalytics.com


Re: java.lang.StackOverflowError when calling count()

2014-05-14 Thread Nicholas Chammas
Would cache() + count() every N iterations work just as well as
checkPoint() + count() to get around this issue?

We're basically trying to get Spark to avoid working on too lengthy a
lineage at once, right?

Nick
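One way to see the difference being asked about here is to compare the
lineages directly. In this rough sketch (the paths and iteration counts are
made up), cache() + count() materializes the data but toDebugString() still
shows the full chain of unions, while checkpoint() + count() cuts it down to
the checkpointed RDD:

from pyspark import SparkConf, SparkContext

sc = SparkContext(conf=SparkConf().setMaster("local[*]"))
sc.setCheckpointDir("/tmp/spark-checkpoints")

rdd = sc.parallelize(range(10))
for i in range(50):
    rdd = rdd.union(sc.parallelize([i]))   # builds a deep chain of UnionRDDs

rdd.cache().count()
print(len(rdd.toDebugString().splitlines()))   # still a long lineage

rdd.checkpoint()
rdd.count()                                    # materializes the checkpoint
print(len(rdd.toDebugString().splitlines()))   # collapses to the checkpointed RDD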


On Tue, May 13, 2014 at 12:04 PM, Xiangrui Meng men...@gmail.com wrote:

 After checkPoint, call count directly to materialize it. -Xiangrui

 On Tue, May 13, 2014 at 4:20 AM, Mayur Rustagi mayur.rust...@gmail.com
 wrote:
  We are running into same issue. After 700 or so files the stack
 overflows,
  cache, persist  checkpointing dont help.
  Basically checkpointing only saves the RDD when it is materialized  it
 only
  materializes in the end, then it runs out of stack.
 
  Regards
  Mayur
 
  Mayur Rustagi
  Ph: +1 (760) 203 3257
  http://www.sigmoidanalytics.com
  @mayur_rustagi
 
 
 
  On Tue, May 13, 2014 at 11:40 AM, Xiangrui Meng men...@gmail.com
 wrote:
 
  You have a long lineage that causes the StackOverflow error. Try
  rdd.checkPoint() and rdd.count() for every 20~30 iterations.
  checkPoint can cut the lineage. -Xiangrui
 
  On Mon, May 12, 2014 at 3:42 PM, Guanhua Yan gh...@lanl.gov wrote:
   Dear Sparkers:
  
   I am using Python spark of version 0.9.0 to implement some iterative
   algorithm. I got some errors shown at the end of this email. It seems
   that
   it's due to the Java Stack Overflow error. The same error has been
   duplicated on a mac desktop and a linux workstation, both running the
   same
   version of Spark.
  
   The same line of code works correctly after quite some iterations. At
   the
   line of error, rdd__new.count() could be 0. (In some previous rounds,
   this
   was also 0 without any problem).
  
   Any thoughts on this?
  
   Thank you very much,
   - Guanhua
  
  
   
   CODE:print round, round, rdd__new.count()
   
 File
  
  
 /home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/pyspark/rdd.py,
   line 542, in count
   14/05/12 16:20:28 INFO TaskSetManager: Loss was due to
   java.lang.StackOverflowError [duplicate 1]
   return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
   14/05/12 16:20:28 ERROR TaskSetManager: Task 8419.0:0 failed 1 times;
   aborting job
 File
  
  
 /home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/pyspark/rdd.py,
   line 533, in sum
   14/05/12 16:20:28 INFO TaskSchedulerImpl: Ignoring update with state
   FAILED
   from TID 1774 because its task set is gone
   return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
 File
  
  
 /home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/pyspark/rdd.py,
   line 499, in reduce
   vals = self.mapPartitions(func).collect()
 File
  
  
 /home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/pyspark/rdd.py,
   line 463, in collect
   bytesInJava = self._jrdd.collect().iterator()
 File
  
  
 /home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py,
   line 537, in __call__
 File
  
  
 /home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py,
   line 300, in get_return_value
   py4j.protocol.Py4JJavaError: An error occurred while calling
   o4317.collect.
   : org.apache.spark.SparkException: Job aborted: Task 8419.0:1 failed 1
   times
   (most recent failure: Exception failure: java.lang.StackOverflowError)
   at
  
  
 org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
   at
  
  
 org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
   at
  
  
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
   at
  
   org.apache.spark.scheduler.DAGScheduler.org
 $apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
   at
  
  
 org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
   at
  
  
 org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
   at scala.Option.foreach(Option.scala:236)
   at
  
  
 org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:619)
   at
  
  
 org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
   at akka.actor.ActorCell.invoke(ActorCell.scala:456)
   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
   at akka.dispatch.Mailbox.run(Mailbox.scala:219)
   at
  
  
 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
   at
 scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260

Re: java.lang.StackOverflowError when calling count()

2014-05-14 Thread lalit1303
If we do cache() + count() after, say, every 50 iterations, the whole
process becomes very slow.
I have tried checkpoint(), cache() + count(), and saveAsObjectFiles().
Nothing works.
Materializing RDDs leads to a drastic decrease in performance, and if we
don't materialize, we face the StackOverflowError.


On Wed, May 14, 2014 at 10:25 AM, Nick Chammas [via Apache Spark User List]
ml-node+s1001560n5683...@n3.nabble.com wrote:

 Would cache() + count() every N iterations work just as well as
 checkPoint() + count() to get around this issue?

 We're basically trying to get Spark to avoid working on too lengthy a
 lineage at once, right?

 Nick


 On Tue, May 13, 2014 at 12:04 PM, Xiangrui Meng [hidden email] wrote:

 After checkPoint, call count directly to materialize it. -Xiangrui

 On Tue, May 13, 2014 at 4:20 AM, Mayur Rustagi [hidden email] wrote:
  We are running into same issue. After 700 or so files the stack
 overflows,
  cache, persist  checkpointing dont help.
  Basically checkpointing only saves the RDD when it is materialized  it
 only
  materializes in the end, then it runs out of stack.
 
  Regards
  Mayur
 
  Mayur Rustagi
  Ph: +1 (760) 203 3257
  http://www.sigmoidanalytics.com
  @mayur_rustagi

 
 
 
  On Tue, May 13, 2014 at 11:40 AM, Xiangrui Meng [hidden email] wrote:
 
  You have a long lineage that causes the StackOverflow error. Try
  rdd.checkPoint() and rdd.count() for every 20~30 iterations.
  checkPoint can cut the lineage. -Xiangrui
 
   On Mon, May 12, 2014 at 3:42 PM, Guanhua Yan [hidden email] wrote:
   Dear Sparkers:
  
   I am using Python spark of version 0.9.0 to implement some iterative
   algorithm. I got some errors shown at the end of this email. It seems
   that
   it's due to the Java Stack Overflow error. The same error has been
   duplicated on a mac desktop and a linux workstation, both running the
   same
   version of Spark.
  
   The same line of code works correctly after quite some iterations. At
   the
   line of error, rdd__new.count() could be 0. (In some previous rounds,
   this
   was also 0 without any problem).
  
   Any thoughts on this?
  
   Thank you very much,
   - Guanhua
  
  
   
   CODE:print round, round, rdd__new.count()
   
 File
  
  
 /home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/pyspark/rdd.py,
   line 542, in count
   14/05/12 16:20:28 INFO TaskSetManager: Loss was due to
   java.lang.StackOverflowError [duplicate 1]
   return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
   14/05/12 16:20:28 ERROR TaskSetManager: Task 8419.0:0 failed 1 times;
   aborting job
 File
  
  
 /home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/pyspark/rdd.py,
   line 533, in sum
   14/05/12 16:20:28 INFO TaskSchedulerImpl: Ignoring update with state
   FAILED
   from TID 1774 because its task set is gone
   return self.mapPartitions(lambda x:
 [sum(x)]).reduce(operator.add)
 File
  
  
 /home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/pyspark/rdd.py,
   line 499, in reduce
   vals = self.mapPartitions(func).collect()
 File
  
  
 /home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/pyspark/rdd.py,
   line 463, in collect
   bytesInJava = self._jrdd.collect().iterator()
 File
  
  
 /home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py,
   line 537, in __call__
 File
  
  
 /home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py,
   line 300, in get_return_value
   py4j.protocol.Py4JJavaError: An error occurred while calling
   o4317.collect.
   : org.apache.spark.SparkException: Job aborted: Task 8419.0:1 failed
 1
   times
   (most recent failure: Exception failure:
 java.lang.StackOverflowError)
   at
  
  
 org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
   at
  
  
 org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
   at
  
  
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
   at
  
   org.apache.spark.scheduler.DAGScheduler.org
 $apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
   at
  
  
 org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
   at
  
  
 org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
   at scala.Option.foreach

Re: java.lang.StackOverflowError when calling count()

2014-05-13 Thread Mayur Rustagi
We are running into the same issue. After 700 or so files the stack
overflows; cache, persist, and checkpointing don't help.
Basically checkpointing only saves the RDD when it is materialized, and it
only materializes at the end, then it runs out of stack.

Regards
Mayur

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Tue, May 13, 2014 at 11:40 AM, Xiangrui Meng men...@gmail.com wrote:

 You have a long lineage that causes the StackOverflow error. Try
 rdd.checkPoint() and rdd.count() for every 20~30 iterations.
 checkPoint can cut the lineage. -Xiangrui

 On Mon, May 12, 2014 at 3:42 PM, Guanhua Yan gh...@lanl.gov wrote:
  Dear Sparkers:
 
  I am using Python spark of version 0.9.0 to implement some iterative
  algorithm. I got some errors shown at the end of this email. It seems
 that
  it's due to the Java Stack Overflow error. The same error has been
  duplicated on a mac desktop and a linux workstation, both running the
 same
  version of Spark.
 
  The same line of code works correctly after quite some iterations. At the
  line of error, rdd__new.count() could be 0. (In some previous rounds,
 this
  was also 0 without any problem).
 
  Any thoughts on this?
 
  Thank you very much,
  - Guanhua
 
 
  
  CODE:print round, round, rdd__new.count()
  
File
 
 /home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/pyspark/rdd.py,
  line 542, in count
  14/05/12 16:20:28 INFO TaskSetManager: Loss was due to
  java.lang.StackOverflowError [duplicate 1]
  return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
  14/05/12 16:20:28 ERROR TaskSetManager: Task 8419.0:0 failed 1 times;
  aborting job
File
 
 /home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/pyspark/rdd.py,
  line 533, in sum
  14/05/12 16:20:28 INFO TaskSchedulerImpl: Ignoring update with state
 FAILED
  from TID 1774 because its task set is gone
  return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
File
 
 /home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/pyspark/rdd.py,
  line 499, in reduce
  vals = self.mapPartitions(func).collect()
File
 
 /home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/pyspark/rdd.py,
  line 463, in collect
  bytesInJava = self._jrdd.collect().iterator()
File
 
 /home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py,
  line 537, in __call__
File
 
 /home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py,
  line 300, in get_return_value
  py4j.protocol.Py4JJavaError: An error occurred while calling
 o4317.collect.
  : org.apache.spark.SparkException: Job aborted: Task 8419.0:1 failed 1
 times
  (most recent failure: Exception failure: java.lang.StackOverflowError)
  at
 
 org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
  at
 
 org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
  at
 
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
  at
  org.apache.spark.scheduler.DAGScheduler.org
 $apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
  at
 
 org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
  at
 
 org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
  at scala.Option.foreach(Option.scala:236)
  at
 
 org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:619)
  at
 
 org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
  at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
  at akka.actor.ActorCell.invoke(ActorCell.scala:456)
  at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
  at akka.dispatch.Mailbox.run(Mailbox.scala:219)
  at
 
 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
  at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
  at
 
 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
  at
 scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
  at
 
 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
 
  ==
  The stack overflow error is shown as follows:
  ==
 
  14/05/12 16:20:28 ERROR Executor: Exception in task ID 1774
  java.lang.StackOverflowError
  at java.util.zip.Inflater.inflate(Inflater.java:259)
  at java.util.zip.InflaterInputStream.read(InflaterInputStream.java

Re: java.lang.StackOverflowError when calling count()

2014-05-13 Thread Guanhua Yan
Thanks Xiangrui. After some debugging efforts, it turns out that the
problem results from a bug in my code. But it's good to know that a long
lineage could also lead to this problem. I will also try checkpointing to
see whether the performance can be improved.

Best regards,
- Guanhua

On 5/13/14 12:10 AM, Xiangrui Meng men...@gmail.com wrote:

You have a long lineage that causes the StackOverflow error. Try
rdd.checkPoint() and rdd.count() for every 20~30 iterations.
checkPoint can cut the lineage. -Xiangrui

On Mon, May 12, 2014 at 3:42 PM, Guanhua Yan gh...@lanl.gov wrote:
 Dear Sparkers:

 I am using Python spark of version 0.9.0 to implement some iterative
 algorithm. I got some errors shown at the end of this email. It seems
that
 it's due to the Java Stack Overflow error. The same error has been
 duplicated on a mac desktop and a linux workstation, both running the
same
 version of Spark.

 The same line of code works correctly after quite some iterations. At
the
 line of error, rdd__new.count() could be 0. (In some previous rounds,
this
 was also 0 without any problem).

 Any thoughts on this?

 Thank you very much,
 - Guanhua


 
 CODE:print round, round, rdd__new.count()
 
   File
 
/home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/pyspark/
rdd.py,
 line 542, in count
 14/05/12 16:20:28 INFO TaskSetManager: Loss was due to
 java.lang.StackOverflowError [duplicate 1]
 return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
 14/05/12 16:20:28 ERROR TaskSetManager: Task 8419.0:0 failed 1 times;
 aborting job
   File
 
/home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/pyspark/
rdd.py,
 line 533, in sum
 14/05/12 16:20:28 INFO TaskSchedulerImpl: Ignoring update with state
FAILED
 from TID 1774 because its task set is gone
 return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
   File
 
/home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/pyspark/
rdd.py,
 line 499, in reduce
 vals = self.mapPartitions(func).collect()
   File
 
/home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/pyspark/
rdd.py,
 line 463, in collect
 bytesInJava = self._jrdd.collect().iterator()
   File
 
/home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/lib/py4j
-0.8.1-src.zip/py4j/java_gateway.py,
 line 537, in __call__
   File
 
/home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/lib/py4j
-0.8.1-src.zip/py4j/protocol.py,
 line 300, in get_return_value
 py4j.protocol.Py4JJavaError: An error occurred while calling
o4317.collect.
 : org.apache.spark.SparkException: Job aborted: Task 8419.0:1 failed 1
times
 (most recent failure: Exception failure: java.lang.StackOverflowError)
 at
 
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$schedul
er$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
 at
 
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$schedul
er$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
 at
 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scal
a:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 at
 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGSch
eduler$$abortStage(DAGScheduler.scala:1026)
 at
 
org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DA
GScheduler.scala:619)
 at
 
org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DA
GScheduler.scala:619)
 at scala.Option.foreach(Option.scala:236)
 at
 
org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:6
19)
 at
 
org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun
$receive$1.applyOrElse(DAGScheduler.scala:207)
 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
 at akka.actor.ActorCell.invoke(ActorCell.scala:456)
 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
 at akka.dispatch.Mailbox.run(Mailbox.scala:219)
 at
 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(Abstract
Dispatcher.scala:386)
 at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at
 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.jav
a:1339)
 at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at
 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.j
ava:107)

 ==
 The stack overflow error is shown as follows:
 ==

 14/05/12 16:20:28 ERROR Executor: Exception in task ID 1774
 java.lang.StackOverflowError
 at java.util.zip.Inflater.inflate(Inflater.java:259)
 at java.util.zip.InflaterInputStream.read(InflaterInputStream.java:152)
 at java.util.zip.GZIPInputStream.read(GZIPInputStream.java:116)
 at
 
java.io.ObjectInputStream$PeekInputStream.read(ObjectInputStream.java:231
0)
 at
 
java.io.ObjectInputStream

Re: java.lang.StackOverflowError when calling count()

2014-05-13 Thread Mayur Rustagi
Count causes the overall performance to drop drastically. In fact, beyond
50 files it starts to hang if I force materialization.
Mayur

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Tue, May 13, 2014 at 9:34 PM, Xiangrui Meng men...@gmail.com wrote:

 After checkPoint, call count directly to materialize it. -Xiangrui

 On Tue, May 13, 2014 at 4:20 AM, Mayur Rustagi mayur.rust...@gmail.com
 wrote:
  We are running into same issue. After 700 or so files the stack
 overflows,
  cache, persist  checkpointing dont help.
  Basically checkpointing only saves the RDD when it is materialized  it
 only
  materializes in the end, then it runs out of stack.
 
  Regards
  Mayur
 
  Mayur Rustagi
  Ph: +1 (760) 203 3257
  http://www.sigmoidanalytics.com
  @mayur_rustagi
 
 
 
  On Tue, May 13, 2014 at 11:40 AM, Xiangrui Meng men...@gmail.com
 wrote:
 
  You have a long lineage that causes the StackOverflow error. Try
  rdd.checkPoint() and rdd.count() for every 20~30 iterations.
  checkPoint can cut the lineage. -Xiangrui
 
  On Mon, May 12, 2014 at 3:42 PM, Guanhua Yan gh...@lanl.gov wrote:
   Dear Sparkers:
  
   I am using Python spark of version 0.9.0 to implement some iterative
   algorithm. I got some errors shown at the end of this email. It seems
   that
   it's due to the Java Stack Overflow error. The same error has been
   duplicated on a mac desktop and a linux workstation, both running the
   same
   version of Spark.
  
   The same line of code works correctly after quite some iterations. At
   the
   line of error, rdd__new.count() could be 0. (In some previous rounds,
   this
   was also 0 without any problem).
  
   Any thoughts on this?
  
   Thank you very much,
   - Guanhua
  
  
   
   CODE:print round, round, rdd__new.count()
   
 File
  
  
 /home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/pyspark/rdd.py,
   line 542, in count
   14/05/12 16:20:28 INFO TaskSetManager: Loss was due to
   java.lang.StackOverflowError [duplicate 1]
   return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
   14/05/12 16:20:28 ERROR TaskSetManager: Task 8419.0:0 failed 1 times;
   aborting job
 File
  
  
 /home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/pyspark/rdd.py,
   line 533, in sum
   14/05/12 16:20:28 INFO TaskSchedulerImpl: Ignoring update with state
   FAILED
   from TID 1774 because its task set is gone
   return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
 File
  
  
 /home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/pyspark/rdd.py,
   line 499, in reduce
   vals = self.mapPartitions(func).collect()
 File
  
  
 /home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/pyspark/rdd.py,
   line 463, in collect
   bytesInJava = self._jrdd.collect().iterator()
 File
  
  
 /home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py,
   line 537, in __call__
 File
  
  
 /home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py,
   line 300, in get_return_value
   py4j.protocol.Py4JJavaError: An error occurred while calling
   o4317.collect.
   : org.apache.spark.SparkException: Job aborted: Task 8419.0:1 failed 1
   times
   (most recent failure: Exception failure: java.lang.StackOverflowError)
   at
  
  
 org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
   at
  
  
 org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
   at
  
  
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
   at
  
   org.apache.spark.scheduler.DAGScheduler.org
 $apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
   at
  
  
 org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
   at
  
  
 org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
   at scala.Option.foreach(Option.scala:236)
   at
  
  
 org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:619)
   at
  
  
 org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
   at akka.actor.ActorCell.invoke(ActorCell.scala:456)
   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
   at akka.dispatch.Mailbox.run(Mailbox.scala:219)
   at
  
  
 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
   at
 scala.concurrent.forkjoin.ForkJoinTask.doExec

java.lang.StackOverflowError when calling count()

2014-05-12 Thread Guanhua Yan
Dear Sparkers:

I am using the Python API of Spark 0.9.0 to implement an iterative
algorithm. I got the errors shown at the end of this email. It seems that
they are due to a Java stack overflow. The same error has been reproduced
on a Mac desktop and a Linux workstation, both running the same version of
Spark.

The same line of code works correctly for quite a few iterations before
failing. At the line of the error, rdd__new.count() could be 0. (In some
previous rounds, this was also 0 without any problem.)

Any thoughts on this?

Thank you very much,
- Guanhua



CODE: print "round", round, rdd__new.count()

  File "/home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/pyspark/rdd.py", line 542, in count
14/05/12 16:20:28 INFO TaskSetManager: Loss was due to java.lang.StackOverflowError [duplicate 1]
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
14/05/12 16:20:28 ERROR TaskSetManager: Task 8419.0:0 failed 1 times; aborting job
  File "/home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/pyspark/rdd.py", line 533, in sum
14/05/12 16:20:28 INFO TaskSchedulerImpl: Ignoring update with state FAILED from TID 1774 because its task set is gone
    return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
  File "/home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/pyspark/rdd.py", line 499, in reduce
    vals = self.mapPartitions(func).collect()
  File "/home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/pyspark/rdd.py", line 463, in collect
    bytesInJava = self._jrdd.collect().iterator()
  File "/home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py", line 537, in __call__
  File "/home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o4317.collect.
: org.apache.spark.SparkException: Job aborted: Task 8419.0:1 failed 1 times
(most recent failure: Exception failure: java.lang.StackOverflowError)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$
DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$
DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:5
9)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGSchedu
ler$$abortStage(DAGScheduler.scala:1026)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGSc
heduler.scala:619)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGSc
heduler.scala:619)
at scala.Option.foreach(Option.scala:236)
at 
org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:619)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$re
ceive$1.applyOrElse(DAGScheduler.scala:207)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDis
patcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1
339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java
:107)

==
The stack overflow error is shown as follows:
==

14/05/12 16:20:28 ERROR Executor: Exception in task ID 1774
java.lang.StackOverflowError
at java.util.zip.Inflater.inflate(Inflater.java:259)
at java.util.zip.InflaterInputStream.read(InflaterInputStream.java:152)
at java.util.zip.GZIPInputStream.read(GZIPInputStream.java:116)
at 
java.io.ObjectInputStream$PeekInputStream.read(ObjectInputStream.java:2310)
at 
java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2
323)
at 
java.io.ObjectInputStream$BlockDataInputStream.readInt(ObjectInputStream.jav
a:2818)
at java.io.ObjectInputStream.readHandle(ObjectInputStream.java:1452)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1511)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798