I think 1.1 will be really helpful for you. It is fully compatible with 1.0, so upgrading should not be hard.
On Mon, Sep 15, 2014 at 2:35 AM, Chengi Liu <chengi.liu...@gmail.com> wrote:
> So.. same result with parallelize(matrix, 1000)
> With broadcast.. seems like I got a JVM core dump :-/
> 14/09/15 02:31:22 INFO BlockManagerInfo: Registering block manager host:47978 with 19.2 GB RAM
> 14/09/15 02:31:22 INFO BlockManagerInfo: Registering block manager host:43360 with 19.2 GB RAM
> Unhandled exception
> Unhandled exception
> Type=Segmentation error vmState=0x00000000
> J9Generic_Signal_Number=00000004 Signal_Number=0000000b Error_Value=00000000 Signal_Code=00000001
> Handler1=00002AAAABF53760 Handler2=00002AAAAC3069D0
> InaccessibleAddress=0000000000000000
> RDI=00002AB9505F2698 RSI=00002AABAE2C54D8 RAX=00002AB7CE6009A0 RBX=00002AB7CE6009C0
> RCX=00000000FFC7FFE0 RDX=00002AB8509726A8 R8=000000007FE41FF0 R9=0000000000002000
> R10=00002AAAADA318A0 R11=00002AB850959520 R12=00002AB5EF97DD88 R13=00002AB5EF97BD88
> R14=00002AAAAC0CE940 R15=00002AB5EF97BD88
> RIP=0000000000000000 GS=0000 FS=0000 RSP=00000000007367A0
> EFlags=0000000000210282 CS=0033 RBP=0000000000BCDB00 ERR=0000000000000014
> TRAPNO=000000000000000E OLDMASK=0000000000000000 CR2=0000000000000000
> xmm0 4141414141414141 (f: 1094795648.000000, d: 2.261635e+06)
> xmm1 4141414141414141 (f: 1094795648.000000, d: 2.261635e+06)
> xmm2 4141414141414141 (f: 1094795648.000000, d: 2.261635e+06)
> xmm3 4141414141414141 (f: 1094795648.000000, d: 2.261635e+06)
> xmm4 4141414141414141 (f: 1094795648.000000, d: 2.261635e+06)
> xmm5 4141414141414141 (f: 1094795648.000000, d: 2.261635e+06)
> xmm6 4141414141414141 (f: 1094795648.000000, d: 2.261635e+06)
> xmm7 f180c714f8e2a139 (f: 4175601920.000000, d: -5.462583e+238)
> xmm8 00000000428e8000 (f: 1116635136.000000, d: 5.516911e-315)
> xmm9 0000000000000000 (f: 0.000000, d: 0.000000e+00)
> xmm10 0000000000000000 (f: 0.000000, d: 0.000000e+00)
> xmm11 0000000000000000 (f: 0.000000, d: 0.000000e+00)
> xmm12 0000000000000000 (f: 0.000000, d: 0.000000e+00)
> xmm13 0000000000000000 (f: 0.000000, d: 0.000000e+00)
> xmm14 0000000000000000 (f: 0.000000, d: 0.000000e+00)
> xmm15 0000000000000000 (f: 0.000000, d: 0.000000e+00)
> Target=2_60_20140106_181350 (Linux 3.0.93-0.8.2_1.0502.8048-cray_ari_c)
> CPU=amd64 (48 logical CPUs) (0xfc0c5b000 RAM)
> ----------- Stack Backtrace -----------
> (0x00002AAAAC2FA122 [libj9prt26.so+0x13122])
> (0x00002AAAAC30779F [libj9prt26.so+0x2079f])
> (0x00002AAAAC2F9E6B [libj9prt26.so+0x12e6b])
> (0x00002AAAAC2F9F67 [libj9prt26.so+0x12f67])
> (0x00002AAAAC30779F [libj9prt26.so+0x2079f])
> (0x00002AAAAC2F9A8B [libj9prt26.so+0x12a8b])
> (0x00002AAAABF52C9D [libj9vm26.so+0x1ac9d])
> (0x00002AAAAC30779F [libj9prt26.so+0x2079f])
> (0x00002AAAABF52F56 [libj9vm26.so+0x1af56])
> (0x00002AAAABF96CA0 [libj9vm26.so+0x5eca0])
> ---------------------------------------
> JVMDUMP039I
> JVMDUMP032I
>
> Note, this is still with the frame size I modified in the last email thread.
>
> On Mon, Sep 15, 2014 at 2:12 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
>>
>> Try:
>>
>> rdd = sc.broadcast(matrix)
>>
>> Or
>>
>> rdd = sc.parallelize(matrix, 100)  // Just increase the number of slices, give it a try.
>>
>> Thanks
>> Best Regards
>>
>> On Mon, Sep 15, 2014 at 2:18 PM, Chengi Liu <chengi.liu...@gmail.com> wrote:
>>>
>>> Hi Akhil,
>>> So with your config (specifically with set("spark.akka.frameSize ", "10000000")), I see the error:
>>> org.apache.spark.SparkException: Job aborted due to stage failure:
>>> Serialized task 0:0 was 401970046 bytes which exceeds spark.akka.frameSize (10485760 bytes). Consider using broadcast variables for large values.
>>> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1049)
>>> at org.apache.spark
>>>
>>> So, I changed
>>> set("spark.akka.frameSize ", "10000000") to set("spark.akka.frameSize ", "1000000000")
>>> but now I get the same error?
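Two details in the frameSize attempts above are worth noting: the key is set as "spark.akka.frameSize " with a trailing space, which Spark will most likely treat as a different, unknown property (hence the limit staying at the 10485760-byte default in the exception), and spark.akka.frameSize is specified in megabytes, not bytes, so "10000000" would ask for roughly 10 TB. A minimal back-of-the-envelope check, using only the numbers from the log above:

```python
import math

# spark.akka.frameSize is given in MB; the 10 MB default matches the
# 10485760-byte limit quoted in the exception.
limit_bytes = 10 * 1024 * 1024
assert limit_bytes == 10485760

# The rejected task was 401970046 bytes; the smallest frameSize (in MB)
# that would accept it:
task_bytes = 401970046
needed_mb = math.ceil(task_bytes / (1024 * 1024))
print(needed_mb)  # 384 -> e.g. set("spark.akka.frameSize", "512")
```

Even with a bigger frame, shipping ~380 MB inside every serialized task is the underlying problem; a broadcast variable (or building the RDD from files rather than a driver-side matrix) avoids the frame limit altogether.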
>>>
>>> py4j.protocol.Py4JJavaError: An error occurred while calling o28.trainKMeansModel.
>>> : org.apache.spark.SparkException: Job aborted due to stage failure: All masters are unresponsive! Giving up.
>>> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1049)
>>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1033)
>>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1031)
>>> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>> at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1031)
>>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGSched
>>>
>>> along with the following:
>>> 14/09/15 01:44:11 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
>>> 14/09/15 01:44:21 INFO AppClient$ClientActor: Connecting to master spark://host:7077...
>>> 14/09/15 01:44:21 WARN AppClient$ClientActor: Could not connect to akka.tcp://sparkMaster@host:7077: akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkMaster@host:7077]
>>> 14/09/15 01:44:21 WARN AppClient$ClientActor: Could not connect to akka.tcp://sparkMaster@host:7077: akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkMaster@host:7077]
>>> 14/09/15 01:44:21 WARN AppClient$ClientActor: Could not connect to akka.tcp://sparkMaster@host:7077: akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkMaster@host:7077]
>>> 14/09/15 01:44:21 WARN AppClient$ClientActor: Could not connect to akka.tcp://sparkMaster@host:7077: akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkMaster@host:7077]
>>> 14/09/15 01:44:26 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
>>> 14/09/15 01:44:41 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
>>> 14/09/15 01:44:41 ERROR SparkDeploySchedulerBackend: Application has been killed. Reason: All masters are unresponsive! Giving up.
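The repeated "Association failed" and "All masters are unresponsive" messages above usually mean the driver simply cannot open a TCP connection to the master at the host and port in the spark:// URL (wrong hostname, firewall, or a master bound to a different interface), rather than anything about the job itself. A quick reachability check from the submitting node can rule this out; a minimal sketch in plain Python (the helper name is mine):

```python
import socket

def can_connect(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

# e.g. can_connect("host", 7077) for a master at spark://host:7077
```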
>>> >>> >>> :-( >>> >>> On Mon, Sep 15, 2014 at 1:20 AM, Akhil Das <ak...@sigmoidanalytics.com> >>> wrote: >>>> >>>> Can you give this a try: >>>> >>>>> conf = SparkConf().set("spark.executor.memory", >>>>> "32G").set("spark.akka.frameSize ", >>>>> "10000000").set("spark.broadcast.factory","org.apache.spark.broadcast.TorrentBroadcastFactory") >>>>> sc = SparkContext(conf = conf) >>>>> rdd = sc.parallelize(matrix,5) >>>>> from pyspark.mllib.clustering import KMeans >>>>> from math import sqrt >>>>> clusters = KMeans.train(rdd, 5, maxIterations=2,runs=2, >>>>> initializationMode="random") >>>>> def error(point): >>>>> center = clusters.centers[clusters.predict(point)] >>>>> return sqrt(sum([x**2 for x in (point - center)])) >>>>> WSSSE = rdd.map(lambda point: error(point)).reduce(lambda x, y: x + y) >>>>> print "Within Set Sum of Squared Error = " + str(WSSSE) >>>> >>>> >>>> Thanks >>>> Best Regards >>>> >>>> On Mon, Sep 15, 2014 at 9:16 AM, Chengi Liu <chengi.liu...@gmail.com> >>>> wrote: >>>>> >>>>> And the thing is code runs just fine if I reduce the number of rows in >>>>> my data? >>>>> >>>>> On Sun, Sep 14, 2014 at 8:45 PM, Chengi Liu <chengi.liu...@gmail.com> >>>>> wrote: >>>>>> >>>>>> I am using spark1.0.2. >>>>>> This is my work cluster.. so I can't setup a new version readily... >>>>>> But right now, I am not using broadcast .. 
>>>>>>
>>>>>> conf = SparkConf().set("spark.executor.memory", "32G").set("spark.akka.frameSize", "1000")
>>>>>> sc = SparkContext(conf = conf)
>>>>>> rdd = sc.parallelize(matrix, 5)
>>>>>>
>>>>>> from pyspark.mllib.clustering import KMeans
>>>>>> from math import sqrt
>>>>>> clusters = KMeans.train(rdd, 5, maxIterations=2, runs=2, initializationMode="random")
>>>>>> def error(point):
>>>>>>     center = clusters.centers[clusters.predict(point)]
>>>>>>     return sqrt(sum([x**2 for x in (point - center)]))
>>>>>>
>>>>>> WSSSE = rdd.map(lambda point: error(point)).reduce(lambda x, y: x + y)
>>>>>> print "Within Set Sum of Squared Error = " + str(WSSSE)
>>>>>>
>>>>>> executed by
>>>>>> spark-submit --master $SPARKURL clustering_example.py --executor-memory 32G --driver-memory 60G
>>>>>>
>>>>>> and the error I see:
>>>>>> py4j.protocol.Py4JJavaError: An error occurred while calling o26.trainKMeansModel.
>>>>>> : org.apache.spark.SparkException: Job aborted due to stage failure: All masters are unresponsive! Giving up.
>>>>>> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1049)
>>>>>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1033)
>>>>>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1031)
>>>>>> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>>>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>>>> at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1031)
>>>>>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:635)
>>>>>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:635)
>>>>>> at scala.Option.foreach(Option.scala:236)
>>>>>> at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:635)
>>>>>> at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1234)
>>>>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>>>>>> at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>>>>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>>>>>> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>>>>>> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>>>>>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>>> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>>> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>>> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>>>
>>>>>> and
>>>>>> 14/09/14 20:43:30 WARN AppClient$ClientActor: Could not connect to akka.tcp://sparkMaster@hostname:7077: akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkMaster@hostname:7077]
>>>>>>
>>>>>> ??
>>>>>> Any suggestions??
>>>>>>
>>>>>> On Sun, Sep 14, 2014 at 8:39 PM, Davies Liu <dav...@databricks.com> wrote:
>>>>>>>
>>>>>>> Hey Chengi,
>>>>>>>
>>>>>>> What's the version of Spark you are using? There are big improvements to broadcast in 1.1, could you try it?
>>>>>>>
>>>>>>> On Sun, Sep 14, 2014 at 8:29 PM, Chengi Liu <chengi.liu...@gmail.com> wrote:
>>>>>>> > Any suggestions.. I am really blocked on this one
>>>>>>> >
>>>>>>> > On Sun, Sep 14, 2014 at 2:43 PM, Chengi Liu <chengi.liu...@gmail.com> wrote:
>>>>>>> >>
>>>>>>> >> And when I use the spark-submit script, I get the following error:
>>>>>>> >>
>>>>>>> >> py4j.protocol.Py4JJavaError: An error occurred while calling o26.trainKMeansModel.
>>>>>>> >> : org.apache.spark.SparkException: Job aborted due to stage failure: All masters are unresponsive! Giving up.
>>>>>>> >> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1049)
>>>>>>> >> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1033)
>>>>>>> >> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1031)
>>>>>>> >> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>>>>> >> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>>>>> >> at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1031)
>>>>>>> >> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:635)
>>>>>>> >> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:635)
>>>>>>> >> at scala.Option.foreach(Option.scala:236)
>>>>>>> >> at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:635)
>>>>>>> >> at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1234)
>>>>>>> >> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>>>>>>> >> at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>>>>>>> >> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>>>>>>> >> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>>>>>>> >> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>>>>>>> >> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>>>> >> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>>>> >> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>>>> >> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>>>> >>
>>>>>>> >> My spark-submit code is:
>>>>>>> >>
>>>>>>> >> conf = SparkConf().set("spark.executor.memory", "32G").set("spark.akka.frameSize", "1000")
>>>>>>> >> sc = SparkContext(conf = conf)
>>>>>>> >> rdd = sc.parallelize(matrix, 5)
>>>>>>> >>
>>>>>>> >> from pyspark.mllib.clustering import KMeans
>>>>>>> >> from math import sqrt
>>>>>>> >> clusters = KMeans.train(rdd, 5, maxIterations=2, runs=2, initializationMode="random")
>>>>>>> >> def error(point):
>>>>>>> >>     center = clusters.centers[clusters.predict(point)]
>>>>>>> >>     return sqrt(sum([x**2 for x in (point - center)]))
>>>>>>> >>
>>>>>>> >> WSSSE = rdd.map(lambda point: error(point)).reduce(lambda x, y: x + y)
>>>>>>> >> print "Within Set Sum of Squared Error = " + str(WSSSE)
>>>>>>> >>
>>>>>>> >> Which is executed as follows:
>>>>>>> >> spark-submit --master $SPARKURL clustering_example.py --executor-memory 32G --driver-memory 60G
>>>>>>> >>
>>>>>>> >> On Sun, Sep 14, 2014 at 10:47 AM, Chengi Liu <chengi.liu...@gmail.com> wrote:
>>>>>>> >>>
>>>>>>> >>> How? Example please..
>>>>>>> >>> Also, if I am running this in the pyspark shell, how do I configure spark.akka.frameSize??
>>>>>>> >>>
>>>>>>> >>> On Sun, Sep 14, 2014 at 7:43 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
>>>>>>> >>>>
>>>>>>> >>>> When the data size is huge, you are better off using the TorrentBroadcastFactory.
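One caveat about the spark-submit invocation quoted above: spark-submit treats everything after the application file as arguments to the script itself, so flags placed after clustering_example.py (here --executor-memory and --driver-memory) are most likely never seen by Spark at all. And to the pyspark-shell question: the same configuration can be passed when the shell is launched. A hedged sketch (flag support varies by Spark version; --conf is not available on the oldest 1.x releases, where a conf/spark-defaults.conf properties file serves the same purpose):

```shell
# spark-submit options must come BEFORE the application file:
spark-submit --master "$SPARKURL" \
  --executor-memory 32G \
  --driver-memory 60G \
  clustering_example.py

# The pyspark shell accepts the same launch options, e.g.:
pyspark --master "$SPARKURL" --conf spark.akka.frameSize=512
```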
>>>>>>> >>>>
>>>>>>> >>>> Thanks
>>>>>>> >>>> Best Regards
>>>>>>> >>>>
>>>>>>> >>>> On Sun, Sep 14, 2014 at 2:54 PM, Chengi Liu <chengi.liu...@gmail.com> wrote:
>>>>>>> >>>>>
>>>>>>> >>>>> Specifically, the error I see when I try to operate on an rdd created by the sc.parallelize method:
>>>>>>> >>>>> : org.apache.spark.SparkException: Job aborted due to stage failure: Serialized task 12:12 was 12062263 bytes which exceeds spark.akka.frameSize (10485760 bytes). Consider using broadcast variables for large values.
>>>>>>> >>>>>
>>>>>>> >>>>> On Sun, Sep 14, 2014 at 2:20 AM, Chengi Liu <chengi.liu...@gmail.com> wrote:
>>>>>>> >>>>>>
>>>>>>> >>>>>> Hi,
>>>>>>> >>>>>> I am trying to create an rdd out of a large matrix.... sc.parallelize suggests to use broadcast.
>>>>>>> >>>>>> But when I do
>>>>>>> >>>>>>
>>>>>>> >>>>>> sc.broadcast(data)
>>>>>>> >>>>>>
>>>>>>> >>>>>> I get this error:
>>>>>>> >>>>>>
>>>>>>> >>>>>> Traceback (most recent call last):
>>>>>>> >>>>>>   File "<stdin>", line 1, in <module>
>>>>>>> >>>>>>   File "/usr/common/usg/spark/1.0.2/python/pyspark/context.py", line 370, in broadcast
>>>>>>> >>>>>>     pickled = pickleSer.dumps(value)
>>>>>>> >>>>>>   File "/usr/common/usg/spark/1.0.2/python/pyspark/serializers.py", line 279, in dumps
>>>>>>> >>>>>>     def dumps(self, obj): return cPickle.dumps(obj, 2)
>>>>>>> >>>>>> SystemError: error return without exception set
>>>>>>> >>>>>>
>>>>>>> >>>>>> Help?

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
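On the cPickle SystemError at the root of the thread: sc.broadcast in PySpark 1.0.x pickles the entire object in the driver first (the cPickle.dumps call in the traceback), and Python 2's cPickle typically fails in exactly this opaque way once the payload approaches the ~2 GB range. Estimating the serialized size from a single row before broadcasting makes the failure predictable; a sketch in Python 3, where pickle stands in for cPickle (the helper name is mine):

```python
import pickle

def estimated_pickle_bytes(sample_row, n_rows):
    """Extrapolate the pickled size of a row-oriented matrix from one row.

    Rough estimate: it ignores the outer list's overhead, which only
    makes the estimate err low.
    """
    per_row = len(pickle.dumps(sample_row, protocol=2))
    return per_row * n_rows

# For example, 1,000,000 rows of 500 floats:
row = [0.0] * 500
est = estimated_pickle_bytes(row, 1000000)
print(est > 2**31 - 1)  # True: well past the ~2 GB cPickle ceiling
```

If the estimate lands over the limit, splitting the matrix into chunks and broadcasting each chunk separately, or distributing the data as files (e.g. via sc.textFile) instead of pushing it through the driver, sidesteps both this error and the frameSize limit.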