I think the 1.1 will be really helpful for you, it's all compatitble
with 1.0, so it's
not hard to upgrade to 1.1.

On Mon, Sep 15, 2014 at 2:35 AM, Chengi Liu <chengi.liu...@gmail.com> wrote:
> So.. same result with parallelize (matrix,1000)
> with broadcast.. seems like I got jvm core dump :-/
> 4/09/15 02:31:22 INFO BlockManagerInfo: Registering block manager host:47978
> with 19.2 GB RAM
> 14/09/15 02:31:22 INFO BlockManagerInfo: Registering block manager
> host:43360 with 19.2 GB RAM
> Unhandled exception
> Unhandled exception
> Type=Segmentation error vmState=0x00000000
> J9Generic_Signal_Number=00000004 Signal_Number=0000000b Error_Value=00000000
> Signal_Code=00000001
> Handler1=00002AAAABF53760 Handler2=00002AAAAC3069D0
> InaccessibleAddress=0000000000000000
> RDI=00002AB9505F2698 RSI=00002AABAE2C54D8 RAX=00002AB7CE6009A0
> RBX=00002AB7CE6009C0
> RCX=00000000FFC7FFE0 RDX=00002AB8509726A8 R8=000000007FE41FF0
> R9=0000000000002000
> R10=00002AAAADA318A0 R11=00002AB850959520 R12=00002AB5EF97DD88
> R13=00002AB5EF97BD88
> R14=00002AAAAC0CE940 R15=00002AB5EF97BD88
> RIP=0000000000000000 GS=0000 FS=0000 RSP=00000000007367A0
> EFlags=0000000000210282 CS=0033 RBP=0000000000BCDB00 ERR=0000000000000014
> TRAPNO=000000000000000E OLDMASK=0000000000000000 CR2=0000000000000000
> xmm0 4141414141414141 (f: 1094795648.000000, d: 2.261635e+06)
> xmm1 4141414141414141 (f: 1094795648.000000, d: 2.261635e+06)
> xmm2 4141414141414141 (f: 1094795648.000000, d: 2.261635e+06)
> xmm3 4141414141414141 (f: 1094795648.000000, d: 2.261635e+06)
> xmm4 4141414141414141 (f: 1094795648.000000, d: 2.261635e+06)
> xmm5 4141414141414141 (f: 1094795648.000000, d: 2.261635e+06)
> xmm6 4141414141414141 (f: 1094795648.000000, d: 2.261635e+06)
> xmm7 f180c714f8e2a139 (f: 4175601920.000000, d: -5.462583e+238)
> xmm8 00000000428e8000 (f: 1116635136.000000, d: 5.516911e-315)
> xmm9 0000000000000000 (f: 0.000000, d: 0.000000e+00)
> xmm10 0000000000000000 (f: 0.000000, d: 0.000000e+00)
> xmm11 0000000000000000 (f: 0.000000, d: 0.000000e+00)
> xmm12 0000000000000000 (f: 0.000000, d: 0.000000e+00)
> xmm13 0000000000000000 (f: 0.000000, d: 0.000000e+00)
> xmm14 0000000000000000 (f: 0.000000, d: 0.000000e+00)
> xmm15 0000000000000000 (f: 0.000000, d: 0.000000e+00)
> Target=2_60_20140106_181350 (Linux 3.0.93-0.8.2_1.0502.8048-cray_ari_c)
> CPU=amd64 (48 logical CPUs) (0xfc0c5b000 RAM)
> ----------- Stack Backtrace -----------
> (0x00002AAAAC2FA122 [libj9prt26.so+0x13122])
> (0x00002AAAAC30779F [libj9prt26.so+0x2079f])
> (0x00002AAAAC2F9E6B [libj9prt26.so+0x12e6b])
> (0x00002AAAAC2F9F67 [libj9prt26.so+0x12f67])
> (0x00002AAAAC30779F [libj9prt26.so+0x2079f])
> (0x00002AAAAC2F9A8B [libj9prt26.so+0x12a8b])
> (0x00002AAAABF52C9D [libj9vm26.so+0x1ac9d])
> (0x00002AAAAC30779F [libj9prt26.so+0x2079f])
> (0x00002AAAABF52F56 [libj9vm26.so+0x1af56])
> (0x00002AAAABF96CA0 [libj9vm26.so+0x5eca0])
> ---------------------------------------
> JVMDUMP039I
> JVMDUMP032I
>
>
> Note, this still is with the framesize I modified in the last email thread?
>
> On Mon, Sep 15, 2014 at 2:12 AM, Akhil Das <ak...@sigmoidanalytics.com>
> wrote:
>>
>> Try:
>>
>> rdd = sc.broadcast(matrix)
>>
>> Or
>>
>> rdd = sc.parallelize(matrix,100) // Just increase the number of slices,
>> give it a try.
>>
>>
>>
>> Thanks
>> Best Regards
>>
>> On Mon, Sep 15, 2014 at 2:18 PM, Chengi Liu <chengi.liu...@gmail.com>
>> wrote:
>>>
>>> Hi Akhil,
>>>   So with your config (specifically with set("spark.akka.frameSize ",
>>> "10000000")) , I see the error:
>>> org.apache.spark.SparkException: Job aborted due to stage failure:
>>> Serialized task 0:0 was 401970046 bytes which exceeds spark.akka.frameSize
>>> (10485760 bytes). Consider using broadcast variables for large values.
>>> at
>>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1049)
>>> at org.apache.spark
>>>
>>> So, I changed
>>> set("spark.akka.frameSize ", "10000000") to set("spark.akka.frameSize ",
>>> "1000000000")
>>> but now I get the same error?
>>>
>>> y4j.protocol.Py4JJavaError: An error occurred while calling
>>> o28.trainKMeansModel.
>>> : org.apache.spark.SparkException: Job aborted due to stage failure: All
>>> masters are unresponsive! Giving up.
>>> at
>>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1049)
>>> at
>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1033)
>>> at
>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1031)
>>> at
>>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>> at
>>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1031)
>>> at
>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGSched
>>>
>>>
>>> along with following:
>>> 14/09/15 01:44:11 WARN TaskSchedulerImpl: Initial job has not accepted
>>> any resources; check your cluster UI to ensure that workers are registered
>>> and have sufficient memory
>>> 14/09/15 01:44:21 INFO AppClient$ClientActor: Connecting to master
>>> spark://host:7077...
>>> 14/09/15 01:44:21 WARN AppClient$ClientActor: Could not connect to
>>> akka.tcp://sparkMaster@host:7077: akka.remote.EndpointAssociationException:
>>> Association failed with [akka.tcp://sparkMaster@host:7077]
>>> 14/09/15 01:44:21 WARN AppClient$ClientActor: Could not connect to
>>> akka.tcp://sparkMaster@host:7077: akka.remote.EndpointAssociationException:
>>> Association failed with [akka.tcp://sparkMaster@host:7077]
>>> 14/09/15 01:44:21 WARN AppClient$ClientActor: Could not connect to
>>> akka.tcp://sparkMaster@host:7077: akka.remote.EndpointAssociationException:
>>> Association failed with [akka.tcp://sparkMaster@host:7077]
>>> 14/09/15 01:44:21 WARN AppClient$ClientActor: Could not connect to
>>> akka.tcp://sparkMaster@host:7077: akka.remote.EndpointAssociationException:
>>> Association failed with [akka.tcp://sparkMaster@host:7077]
>>> 14/09/15 01:44:26 WARN TaskSchedulerImpl: Initial job has not accepted
>>> any resources; check your cluster UI to ensure that workers are registered
>>> and have sufficient memory
>>> 14/09/15 01:44:41 WARN TaskSchedulerImpl: Initial job has not accepted
>>> any resources; check your cluster UI to ensure that workers are registered
>>> and have sufficient memory
>>> 14/09/15 01:44:41 ERROR SparkDeploySchedulerBackend: Application has been
>>> killed. Reason: All masters are unresponsive! Giving up.
>>>
>>>
>>> :-(
>>>
>>> On Mon, Sep 15, 2014 at 1:20 AM, Akhil Das <ak...@sigmoidanalytics.com>
>>> wrote:
>>>>
>>>> Can you give this a try:
>>>>
>>>>> conf = SparkConf().set("spark.executor.memory",
>>>>> "32G").set("spark.akka.frameSize ",
>>>>> "10000000").set("spark.broadcast.factory","org.apache.spark.broadcast.TorrentBroadcastFactory")
>>>>> sc = SparkContext(conf = conf)
>>>>> rdd = sc.parallelize(matrix,5)
>>>>> from pyspark.mllib.clustering import KMeans
>>>>> from math import sqrt
>>>>> clusters = KMeans.train(rdd, 5, maxIterations=2,runs=2,
>>>>> initializationMode="random")
>>>>> def error(point):
>>>>>     center = clusters.centers[clusters.predict(point)]
>>>>>     return sqrt(sum([x**2 for x in (point - center)]))
>>>>> WSSSE = rdd.map(lambda point: error(point)).reduce(lambda x, y: x + y)
>>>>> print "Within Set Sum of Squared Error = " + str(WSSSE)
>>>>
>>>>
>>>> Thanks
>>>> Best Regards
>>>>
>>>> On Mon, Sep 15, 2014 at 9:16 AM, Chengi Liu <chengi.liu...@gmail.com>
>>>> wrote:
>>>>>
>>>>> And the thing is code runs just fine if I reduce the number of rows in
>>>>> my data?
>>>>>
>>>>> On Sun, Sep 14, 2014 at 8:45 PM, Chengi Liu <chengi.liu...@gmail.com>
>>>>> wrote:
>>>>>>
>>>>>> I am using spark1.0.2.
>>>>>> This is my work cluster.. so I can't setup a new version readily...
>>>>>> But right now, I am not using broadcast ..
>>>>>>
>>>>>>
>>>>>> conf = SparkConf().set("spark.executor.memory",
>>>>>> "32G").set("spark.akka.frameSize", "1000")
>>>>>> sc = SparkContext(conf = conf)
>>>>>> rdd = sc.parallelize(matrix,5)
>>>>>>
>>>>>> from pyspark.mllib.clustering import KMeans
>>>>>> from math import sqrt
>>>>>> clusters = KMeans.train(rdd, 5, maxIterations=2,runs=2,
>>>>>> initializationMode="random")
>>>>>> def error(point):
>>>>>>     center = clusters.centers[clusters.predict(point)]
>>>>>>     return sqrt(sum([x**2 for x in (point - center)]))
>>>>>>
>>>>>> WSSSE = rdd.map(lambda point: error(point)).reduce(lambda x, y: x + y)
>>>>>> print "Within Set Sum of Squared Error = " + str(WSSSE)
>>>>>>
>>>>>>
>>>>>> executed by
>>>>>> spark-submit --master $SPARKURL clustering_example.py
>>>>>> --executor-memory 32G  --driver-memory 60G
>>>>>>
>>>>>> and the error I see
>>>>>> py4j.protocol.Py4JJavaError: An error occurred while calling
>>>>>> o26.trainKMeansModel.
>>>>>> : org.apache.spark.SparkException: Job aborted due to stage failure:
>>>>>> All masters are unresponsive! Giving up.
>>>>>> at
>>>>>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1049)
>>>>>> at
>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1033)
>>>>>> at
>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1031)
>>>>>> at
>>>>>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>>>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>>>> at
>>>>>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1031)
>>>>>> at
>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:635)
>>>>>> at
>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:635)
>>>>>> at scala.Option.foreach(Option.scala:236)
>>>>>> at
>>>>>> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:635)
>>>>>> at
>>>>>> org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1234)
>>>>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>>>>>> at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>>>>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>>>>>> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>>>>>> at
>>>>>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>>>>>> at
>>>>>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>>> at
>>>>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>>> at
>>>>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>>> at
>>>>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>>>
>>>>>>
>>>>>> and
>>>>>> 14/09/14 20:43:30 WARN AppClient$ClientActor: Could not connect to
>>>>>> akka.tcp://sparkMaster@hostname:7077:
>>>>>> akka.remote.EndpointAssociationException: Association failed with
>>>>>> [akka.tcp://sparkMaster@ hostname:7077]
>>>>>>
>>>>>> ??
>>>>>> Any suggestions??
>>>>>>
>>>>>>
>>>>>> On Sun, Sep 14, 2014 at 8:39 PM, Davies Liu <dav...@databricks.com>
>>>>>> wrote:
>>>>>>>
>>>>>>> Hey Chengi,
>>>>>>>
>>>>>>> What's the version of Spark you are using? It have big improvements
>>>>>>> about broadcast in 1.1, could you try it?
>>>>>>>
>>>>>>> On Sun, Sep 14, 2014 at 8:29 PM, Chengi Liu <chengi.liu...@gmail.com>
>>>>>>> wrote:
>>>>>>> > Any suggestions.. I am really blocked on this one
>>>>>>> >
>>>>>>> > On Sun, Sep 14, 2014 at 2:43 PM, Chengi Liu
>>>>>>> > <chengi.liu...@gmail.com> wrote:
>>>>>>> >>
>>>>>>> >> And when I use sparksubmit script, I get the following error:
>>>>>>> >>
>>>>>>> >> py4j.protocol.Py4JJavaError: An error occurred while calling
>>>>>>> >> o26.trainKMeansModel.
>>>>>>> >> : org.apache.spark.SparkException: Job aborted due to stage
>>>>>>> >> failure: All
>>>>>>> >> masters are unresponsive! Giving up.
>>>>>>> >> at
>>>>>>> >>
>>>>>>> >> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1049)
>>>>>>> >> at
>>>>>>> >>
>>>>>>> >> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1033)
>>>>>>> >> at
>>>>>>> >>
>>>>>>> >> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1031)
>>>>>>> >> at
>>>>>>> >>
>>>>>>> >> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>>>>> >> at
>>>>>>> >> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>>>>> >> at
>>>>>>> >>
>>>>>>> >> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1031)
>>>>>>> >> at
>>>>>>> >>
>>>>>>> >> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:635)
>>>>>>> >> at
>>>>>>> >>
>>>>>>> >> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:635)
>>>>>>> >> at scala.Option.foreach(Option.scala:236)
>>>>>>> >> at
>>>>>>> >>
>>>>>>> >> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:635)
>>>>>>> >> at
>>>>>>> >>
>>>>>>> >> org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1234)
>>>>>>> >> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>>>>>>> >> at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>>>>>>> >> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>>>>>>> >> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>>>>>>> >> at
>>>>>>> >>
>>>>>>> >> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>>>>>>> >> at
>>>>>>> >> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>>>> >> at
>>>>>>> >>
>>>>>>> >> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>>>> >> at
>>>>>>> >>
>>>>>>> >> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>>>> >> at
>>>>>>> >>
>>>>>>> >> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>>>> >>
>>>>>>> >>
>>>>>>> >> My spark submit code is
>>>>>>> >>
>>>>>>> >> conf = SparkConf().set("spark.executor.memory",
>>>>>>> >> "32G").set("spark.akka.frameSize", "1000")
>>>>>>> >> sc = SparkContext(conf = conf)
>>>>>>> >> rdd = sc.parallelize(matrix,5)
>>>>>>> >>
>>>>>>> >> from pyspark.mllib.clustering import KMeans
>>>>>>> >> from math import sqrt
>>>>>>> >> clusters = KMeans.train(rdd, 5, maxIterations=2,runs=2,
>>>>>>> >> initializationMode="random")
>>>>>>> >> def error(point):
>>>>>>> >>     center = clusters.centers[clusters.predict(point)]
>>>>>>> >>     return sqrt(sum([x**2 for x in (point - center)]))
>>>>>>> >>
>>>>>>> >> WSSSE = rdd.map(lambda point: error(point)).reduce(lambda x, y: x
>>>>>>> >> + y)
>>>>>>> >> print "Within Set Sum of Squared Error = " + str(WSSSE)
>>>>>>> >>
>>>>>>> >> Which is executed as following:
>>>>>>> >> spark-submit --master $SPARKURL clustering_example.py
>>>>>>> >> --executor-memory
>>>>>>> >> 32G  --driver-memory 60G
>>>>>>> >>
>>>>>>> >> On Sun, Sep 14, 2014 at 10:47 AM, Chengi Liu
>>>>>>> >> <chengi.liu...@gmail.com>
>>>>>>> >> wrote:
>>>>>>> >>>
>>>>>>> >>> How? Example please..
>>>>>>> >>> Also, if I am running this in pyspark shell.. how do i configure
>>>>>>> >>> spark.akka.frameSize ??
>>>>>>> >>>
>>>>>>> >>>
>>>>>>> >>> On Sun, Sep 14, 2014 at 7:43 AM, Akhil Das
>>>>>>> >>> <ak...@sigmoidanalytics.com>
>>>>>>> >>> wrote:
>>>>>>> >>>>
>>>>>>> >>>> When the data size is huge, you better of use the
>>>>>>> >>>> torrentBroadcastFactory.
>>>>>>> >>>>
>>>>>>> >>>> Thanks
>>>>>>> >>>> Best Regards
>>>>>>> >>>>
>>>>>>> >>>> On Sun, Sep 14, 2014 at 2:54 PM, Chengi Liu
>>>>>>> >>>> <chengi.liu...@gmail.com>
>>>>>>> >>>> wrote:
>>>>>>> >>>>>
>>>>>>> >>>>> Specifically the error I see when I try to operate on rdd
>>>>>>> >>>>> created by
>>>>>>> >>>>> sc.parallelize method
>>>>>>> >>>>> : org.apache.spark.SparkException: Job aborted due to stage
>>>>>>> >>>>> failure:
>>>>>>> >>>>> Serialized task 12:12 was 12062263 bytes which exceeds
>>>>>>> >>>>> spark.akka.frameSize
>>>>>>> >>>>> (10485760 bytes). Consider using broadcast variables for large
>>>>>>> >>>>> values.
>>>>>>> >>>>>
>>>>>>> >>>>> On Sun, Sep 14, 2014 at 2:20 AM, Chengi Liu
>>>>>>> >>>>> <chengi.liu...@gmail.com>
>>>>>>> >>>>> wrote:
>>>>>>> >>>>>>
>>>>>>> >>>>>> Hi,
>>>>>>> >>>>>>    I am trying to create an rdd out of large matrix....
>>>>>>> >>>>>> sc.parallelize
>>>>>>> >>>>>> suggest to use broadcast
>>>>>>> >>>>>> But when I do
>>>>>>> >>>>>>
>>>>>>> >>>>>> sc.broadcast(data)
>>>>>>> >>>>>> I get this error:
>>>>>>> >>>>>>
>>>>>>> >>>>>> Traceback (most recent call last):
>>>>>>> >>>>>>   File "<stdin>", line 1, in <module>
>>>>>>> >>>>>>   File
>>>>>>> >>>>>> "/usr/common/usg/spark/1.0.2/python/pyspark/context.py", line
>>>>>>> >>>>>> 370, in broadcast
>>>>>>> >>>>>>     pickled = pickleSer.dumps(value)
>>>>>>> >>>>>>   File
>>>>>>> >>>>>> "/usr/common/usg/spark/1.0.2/python/pyspark/serializers.py",
>>>>>>> >>>>>> line 279, in dumps
>>>>>>> >>>>>>     def dumps(self, obj): return cPickle.dumps(obj, 2)
>>>>>>> >>>>>> SystemError: error return without exception set
>>>>>>> >>>>>> Help?
>>>>>>> >>>>>>
>>>>>>> >>>>>
>>>>>>> >>>>
>>>>>>> >>>
>>>>>>> >>
>>>>>>> >
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

Reply via email to