Re: JavaRDD.foreach (new VoidFunction<>...) always returns the last element
Hi Sean,

Thanks for your great help! It works all right if I remove persist! As a next step, I will transform those values before persisting. I converted to RDD and back to JavaRDD only for testing purposes.

Best Regards,
Jia

On Mon, Jul 25, 2016 at 1:01 PM, Sean Owen <so...@cloudera.com> wrote:
> Why are you converting to RDD and back to JavaRDD?
> The problem is storing references to Writable, which are mutated by the
> InputFormat. Somewhere you have 1000 refs to the same key. I think it may
> be the persist. You want to immediately transform these values to something
> besides a Writable.
>
> On Mon, Jul 25, 2016, 18:50 Jia Zou <jacqueline...@gmail.com> wrote:
>
>> My code is as follows:
>>
>> System.out.println("Initialize points...");
>> JavaPairRDD<IntWritable, DoubleArrayWritable> data =
>>     sc.sequenceFile(inputFile, IntWritable.class, DoubleArrayWritable.class);
>> RDD<Tuple2<IntWritable, DoubleArrayWritable>> rdd = JavaPairRDD.toRDD(data);
>> JavaRDD<Tuple2<IntWritable, DoubleArrayWritable>> points =
>>     JavaRDD.fromRDD(rdd, data.classTag());
>> points.persist(StorageLevel.MEMORY_ONLY());
>>
>> for (int i = 0; i < iterations; i++) {
>>     System.out.println("iteration=" + i);
>>     // points.foreach(new ForEachMapPointToCluster(numDimensions, numClusters));
>>     points.foreach(new VoidFunction<Tuple2<IntWritable, DoubleArrayWritable>>() {
>>         public void call(Tuple2<IntWritable, DoubleArrayWritable> tuple) {
>>             IntWritable key = tuple._1();
>>             System.out.println("key:" + key.get());
>>             DoubleArrayWritable array = tuple._2();
>>             double[] point = array.getData();
>>             for (int d = 0; d < 20; d++) {
>>                 System.out.println(d + ":" + point[d]);
>>             }
>>         }
>>     });
>> }
>>
>> The output repeats the following; only the last element of the RDD is printed.
>> key:999
>> 0:0.9953839426689233
>> 1:0.12656798341145892
>> 2:0.16621114723289654
>> 3:0.48628049787614236
>> 4:0.476991470215116
>> 5:0.5033640235789054
>> 6:0.09257098597507829
>> 7:0.3153088440494892
>> 8:0.8807426085223242
>> 9:0.2809625780570739
>> 10:0.9584880094505738
>> 11:0.38521222520661547
>> 12:0.5114241334425228
>> 13:0.9524628903835111
>> 14:0.5252549496842003
>> 15:0.5732037830866236
>> 16:0.8632451606583632
>> 17:0.39754347061499895
>> 18:0.2859522809981715
>> 19:0.2659002343432888
>> key:999
>> (the same 20 values repeat for every record)
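Sean's diagnosis, many references to one mutable Writable, can be reproduced without Spark at all. Below is a minimal, hypothetical Java sketch (plain JDK, no Spark or Hadoop): a "reader" reuses a single mutable holder object, the way Hadoop InputFormats reuse Writable instances, so caching the references (as persist() with MEMORY_ONLY effectively does here) leaves every element showing the holder's final state:

```java
import java.util.ArrayList;
import java.util.List;

// Minimal illustration of the bug Sean describes: a reader that reuses one
// mutable holder object, as Hadoop InputFormats reuse Writables. Storing the
// references means every stored element later shows the holder's last value.
public class MutableReuseDemo {
    // Stand-in for IntWritable: a mutable boxed int that the "reader" reuses.
    static final class Holder {
        int value;
    }

    // Returns {value seen via the cached reference, value seen via the copy}
    // after "reading" keys 0..n-1 through one reused Holder instance.
    static int[] firstValues(int n) {
        Holder reused = new Holder();               // one instance, like a reused Writable
        List<Holder> cachedRefs = new ArrayList<>();
        List<Integer> copiedValues = new ArrayList<>();
        for (int key = 0; key < n; key++) {
            reused.value = key;                     // the "InputFormat" mutates in place
            cachedRefs.add(reused);                 // buggy: caches the reference
            copiedValues.add(reused.value);         // fix: copy the value out immediately
        }
        return new int[] { cachedRefs.get(0).value, copiedValues.get(0) };
    }

    public static void main(String[] args) {
        int[] r = firstValues(1000);
        System.out.println("cached ref sees: " + r[0]);   // 999 -- every ref aliases the last key
        System.out.println("copied value:    " + r[1]);   // 0   -- copies keep their own value
    }
}
```

The fix Sean suggests, mapping each Tuple2 to a plain Java type (e.g. an int plus a fresh double[]) before persist, is exactly the copy step above applied per record.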
JavaRDD.foreach (new VoidFunction<>...) always returns the last element
My code is as follows:

System.out.println("Initialize points...");
JavaPairRDD<IntWritable, DoubleArrayWritable> data =
    sc.sequenceFile(inputFile, IntWritable.class, DoubleArrayWritable.class);
RDD<Tuple2<IntWritable, DoubleArrayWritable>> rdd = JavaPairRDD.toRDD(data);
JavaRDD<Tuple2<IntWritable, DoubleArrayWritable>> points =
    JavaRDD.fromRDD(rdd, data.classTag());
points.persist(StorageLevel.MEMORY_ONLY());

for (int i = 0; i < iterations; i++) {
    System.out.println("iteration=" + i);
    points.foreach(new VoidFunction<Tuple2<IntWritable, DoubleArrayWritable>>() {
        public void call(Tuple2<IntWritable, DoubleArrayWritable> tuple) {
            IntWritable key = tuple._1();
            System.out.println("key:" + key.get());
            DoubleArrayWritable array = tuple._2();
            double[] point = array.getData();
            for (int d = 0; d < 20; d++) {
                System.out.println(d + ":" + point[d]);
            }
        }
    });
}

The output repeats the following; only the last element of the RDD is printed:

key:999
0:0.9953839426689233
1:0.12656798341145892
2:0.16621114723289654
3:0.48628049787614236
4:0.476991470215116
5:0.5033640235789054
6:0.09257098597507829
7:0.3153088440494892
8:0.8807426085223242
9:0.2809625780570739
10:0.9584880094505738
11:0.38521222520661547
12:0.5114241334425228
13:0.9524628903835111
14:0.5252549496842003
15:0.5732037830866236
16:0.8632451606583632
17:0.39754347061499895
18:0.2859522809981715
19:0.2659002343432888
key:999
(the same 20 values repeat for every record)
Re: how to calculate -- executor-memory,num-executors,total-executor-cores
Divya,

According to my recent Spark tuning experience, the optimal executor-memory size depends not only on your workload characteristics (e.g., the working-set size at each job stage) and input data size, but also on the total available memory and the memory requirements of other components such as the driver (which in turn depends on how your workload interacts with the driver) and the underlying storage. In my opinion, it may be difficult to derive one generic, easy formula that captures all of these dynamic relationships.

Best Regards,
Jia

On Wed, Feb 3, 2016 at 12:13 AM, Divya Gehlot wrote:
> Hi,
>
> I would like to know how to calculate how much --executor-memory we should
> allocate, and how many --num-executors and --total-executor-cores we should
> give while submitting Spark jobs.
> Is there any formula for it?
>
> Thanks,
> Divya
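While no general formula exists, a commonly cited community heuristic can serve as a starting point. The sketch below is not from this thread, and its constants are assumptions (about 5 cores per executor, 1 core and roughly 1 GB reserved per node for the OS and daemons, about 10% of executor memory set aside for off-heap overhead):

```java
// Hypothetical helper illustrating a common rule-of-thumb for sizing Spark
// executors on a homogeneous cluster. All constants here are heuristic
// assumptions, not values from this thread.
public class ExecutorSizing {
    // Reserve 1 core per node for OS/daemons; cap executors at ~5 cores each.
    public static int executorsPerNode(int coresPerNode) {
        return Math.max(1, (coresPerNode - 1) / 5);
    }

    // Split usable node memory across executors, leaving ~10% for
    // off-heap overhead (e.g. spark.yarn.executor.memoryOverhead).
    public static long executorMemoryMb(long nodeMemoryMb, int coresPerNode) {
        long usable = nodeMemoryMb - 1024;                 // leave ~1 GB for the OS
        long perExecutor = usable / executorsPerNode(coresPerNode);
        return (long) (perExecutor * 0.90);                // ~10% overhead margin
    }

    public static void main(String[] args) {
        // e.g. a 16-core, 64 GB node: 3 executors of roughly 19 GB each
        System.out.println(executorsPerNode(16));          // 3
        System.out.println(executorMemoryMb(65536, 16));   // 19353
    }
}
```

As the reply above stresses, numbers like these are only a first guess; the real optimum depends on the workload's working-set size per stage and on what the driver and storage layers need.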
Re: TTransportException when using Spark 1.6.0 on top of Tachyon 0.8.2
Hi, Calvin,

I am running a 24GB-input Spark KMeans job on a c3.2xlarge AWS instance with 30GB of physical memory. Spark caches data off-heap to Tachyon, and the input data is also stored in Tachyon. Tachyon is configured to use 15GB of memory with tiered storage, and its underFS is /tmp. The only configuration I've changed is the Tachyon data block size. This experiment is part of a research project.

Best Regards,
Jia

On Thursday, January 28, 2016 at 9:11:19 PM UTC-6, Calvin Jia wrote:
>
> Hi,
>
> Thanks for the detailed information. How large is the dataset you are
> running against? Also, did you change any Tachyon configurations?
>
> Thanks,
> Calvin
[Problem Solved]Re: Spark partition size tuning
Hi, dears, the problem has been solved. I mistakenly used tachyon.user.block.size.bytes instead of tachyon.user.block.size.bytes.default. It works now. Sorry for the confusion, and thanks again to Gene!

Best Regards,
Jia

On Wed, Jan 27, 2016 at 4:59 AM, Jia Zou <jacqueline...@gmail.com> wrote:
> Hi, Gene,
>
> Thanks for your suggestion. However, even after I set
> tachyon.user.block.size.bytes=134217728 (and I can see that value in the
> web console), the files I load into Tachyon via copyToLocal still have a
> 512MB block size. Do you have more suggestions?
>
> Best Regards,
> Jia
>
> On Tue, Jan 26, 2016 at 11:46 PM, Gene Pang <gene.p...@gmail.com> wrote:
>
>> Hi Jia,
>>
>> If you want to change the Tachyon block size, you can set the
>> tachyon.user.block.size.bytes.default parameter
>> (http://tachyon-project.org/documentation/Configuration-Settings.html).
>> You can set it via extraJavaOptions per job, or add it to
>> tachyon-site.properties.
>>
>> I hope that helps,
>> Gene
TTransportException when using Spark 1.6.0 on top of Tachyon 0.8.2
Dears, I keep getting the below exception when using Spark 1.6.0 on top of Tachyon 0.8.2. Tachyon is 93% used and configured as CACHE_THROUGH. Any suggestions will be appreciated, thanks!

=

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 13 in stage 0.0 failed 4 times, most recent failure: Lost task 13.3 in stage 0.0 (TID 33, ip-10-73-198-35.ec2.internal): java.io.IOException: tachyon.org.apache.thrift.transport.TTransportException
    at tachyon.worker.WorkerClient.unlockBlock(WorkerClient.java:416)
    at tachyon.client.block.LocalBlockInStream.close(LocalBlockInStream.java:87)
    at tachyon.client.file.FileInStream.close(FileInStream.java:105)
    at tachyon.hadoop.HdfsFileInputStream.read(HdfsFileInputStream.java:171)
    at java.io.DataInputStream.readInt(DataInputStream.java:388)
    at org.apache.hadoop.io.SequenceFile$Reader.readRecordLength(SequenceFile.java:2325)
    at org.apache.hadoop.io.SequenceFile$Reader.next(SequenceFile.java:2356)
    at org.apache.hadoop.io.SequenceFile$Reader.next(SequenceFile.java:2493)
    at org.apache.hadoop.mapred.SequenceFileRecordReader.next(SequenceFileRecordReader.java:82)
    at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:246)
    at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:208)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
    at scala.collection.Iterator$JoinIterator.hasNext(Iterator.scala:193)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    at org.apache.spark.rdd.RDD$$anonfun$zip$1$$anonfun$apply$31$$anon$1.hasNext(RDD.scala:851)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
    at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1595)
    at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1143)
    at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1143)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: tachyon.org.apache.thrift.transport.TTransportException
    at tachyon.org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132)
    at tachyon.org.apache.thrift.transport.TTransport.readAll(TTransport.java:86)
    at tachyon.org.apache.thrift.transport.TFramedTransport.readFrame(TFramedTransport.java:129)
    at tachyon.org.apache.thrift.transport.TFramedTransport.read(TFramedTransport.java:101)
    at tachyon.org.apache.thrift.transport.TTransport.readAll(TTransport.java:86)
    at tachyon.org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:429)
    at tachyon.org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:318)
    at tachyon.org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:219)
    at tachyon.org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:69)
    at tachyon.thrift.WorkerService$Client.recv_unlockBlock(WorkerService.java:455)
    at tachyon.thrift.WorkerService$Client.unlockBlock(WorkerService.java:441)
    at tachyon.worker.WorkerClient.unlockBlock(WorkerClient.java:413)
    ... 28 more
Re: TTransportException when using Spark 1.6.0 on top of Tachyon 0.8.2
BTW, the Tachyon worker log says the following:

2015-12-27 01:33:44,599 ERROR WORKER_LOGGER (WorkerBlockMasterClient.java:getId) - java.net.SocketException: Connection reset
org.apache.thrift.transport.TTransportException: java.net.SocketException: Connection reset
    at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:129)
    at org.apache.thrift.transport.TTransport.readAll(TTransport.java:86)
    at org.apache.thrift.transport.TFramedTransport.readFrame(TFramedTransport.java:129)
    at org.apache.thrift.transport.TFramedTransport.read(TFramedTransport.java:101)
    at org.apache.thrift.transport.TTransport.readAll(TTransport.java:86)
    at org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:429)
    at org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:318)
    at org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:219)
    at org.apache.thrift.protocol.TProtocolDecorator.readMessageBegin(TProtocolDecorator.java:135)
    at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:69)
    at tachyon.thrift.BlockMasterService$Client.recv_workerGetWorkerId(BlockMasterService.java:235)
    at tachyon.thrift.BlockMasterService$Client.workerGetWorkerId(BlockMasterService.java:222)
    at tachyon.client.WorkerBlockMasterClient.getId(WorkerBlockMasterClient.java:103)
    at tachyon.worker.WorkerIdRegistry.registerWithBlockMaster(WorkerIdRegistry.java:59)
    at tachyon.worker.block.BlockWorker.<init>(BlockWorker.java:200)
    at tachyon.worker.TachyonWorker.main(TachyonWorker.java:42)
Caused by: java.net.SocketException: Connection reset
    at java.net.SocketInputStream.read(SocketInputStream.java:196)
    at java.net.SocketInputStream.read(SocketInputStream.java:122)
    at java.io.BufferedInputStream.fill(BufferedInputStream.java:235)
    at java.io.BufferedInputStream.read1(BufferedInputStream.java:275)
    at java.io.BufferedInputStream.read(BufferedInputStream.java:334)
    at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:127)
    ... 15 more

On Wed, Jan 27, 2016 at 5:02 AM, Jia Zou <jacqueline...@gmail.com> wrote:
> Dears, I keep getting the below exception when using Spark 1.6.0 on top of
> Tachyon 0.8.2. Tachyon is 93% used and configured as CACHE_THROUGH.
>
> Any suggestions will be appreciated, thanks!
Re: TTransportException when using Spark 1.6.0 on top of Tachyon 0.8.2
BTW, at the end of the log, I also find a lot of errors like below:

=

2016-01-27 11:47:18,515 ERROR server.TThreadPoolServer (TThreadPoolServer.java:run) - Error occurred during processing of message.
java.lang.NullPointerException
    at tachyon.worker.block.BlockLockManager.unlockBlock(BlockLockManager.java:142)
    at tachyon.worker.block.TieredBlockStore.unlockBlock(TieredBlockStore.java:148)
    at tachyon.worker.block.BlockDataManager.unlockBlock(BlockDataManager.java:476)
    at tachyon.worker.block.BlockServiceHandler.unlockBlock(BlockServiceHandler.java:232)
    at tachyon.thrift.WorkerService$Processor$unlockBlock.getResult(WorkerService.java:1150)
    at tachyon.thrift.WorkerService$Processor$unlockBlock.getResult(WorkerService.java:1135)
    at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:39)
    at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:39)
    at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:285)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

On Wed, Jan 27, 2016 at 5:53 AM, Jia Zou <jacqueline...@gmail.com> wrote:
> BTW, the Tachyon worker log says the following:
Re: Spark partition size tuning
Hi, Gene,

Thanks for your suggestion. However, even after I set tachyon.user.block.size.bytes=134217728 (and I can see that value in the web console), the files I load into Tachyon via copyToLocal still have a 512MB block size. Do you have more suggestions?

Best Regards,
Jia

On Tue, Jan 26, 2016 at 11:46 PM, Gene Pang <gene.p...@gmail.com> wrote:
> Hi Jia,
>
> If you want to change the Tachyon block size, you can set the
> tachyon.user.block.size.bytes.default parameter
> (http://tachyon-project.org/documentation/Configuration-Settings.html).
> You can set it via extraJavaOptions per job, or add it to
> tachyon-site.properties.
>
> I hope that helps,
> Gene
>
> On Mon, Jan 25, 2016 at 8:13 PM, Jia Zou <jacqueline...@gmail.com> wrote:
>
>> Dear all,
>>
>> First, an update: the local file system data partition size can be tuned by:
>> sc.hadoopConfiguration().setLong("fs.local.block.size", blocksize)
>>
>> However, I also need to tune the Spark data partition size for input data
>> stored in Tachyon (the default is 512MB), and the above method doesn't work
>> for Tachyon data.
>>
>> Do you have any suggestions? Thanks very much!
>>
>> Best Regards,
>> Jia
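Gene's suggestion can be applied in two ways. The sketch below is hedged: the property name and the 134217728 value come from this thread, while the exact spark-submit wiring through extraJavaOptions is an assumed typical invocation, not a command from the thread:

```shell
# Option 1: per job, pass the Tachyon client property as a JVM system
# property to Spark's driver and executors (assumed typical invocation;
# the trailing "..." stands for the rest of your spark-submit arguments).
spark-submit \
  --conf "spark.driver.extraJavaOptions=-Dtachyon.user.block.size.bytes.default=134217728" \
  --conf "spark.executor.extraJavaOptions=-Dtachyon.user.block.size.bytes.default=134217728" \
  ...

# Option 2: cluster-wide, add one line to conf/tachyon-site.properties:
# tachyon.user.block.size.bytes.default=134217728
```

Note that the [Problem Solved] follow-up in this thread reports the root cause was setting tachyon.user.block.size.bytes instead of tachyon.user.block.size.bytes.default, so the exact property name matters.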
Re: TTransportException when using Spark 1.6.0 on top of Tachyon 0.8.2
BTW, the error happens when Spark is configured to read its input file from Tachyon, as follows:

/home/ubuntu/spark-1.6.0/bin/spark-submit \
  --properties-file /home/ubuntu/HiBench/report/kmeans/spark/java/conf/sparkbench/spark.conf \
  --class org.apache.spark.examples.mllib.JavaKMeans \
  --master spark://ip-10-73-198-35:7077 \
  /home/ubuntu/HiBench/src/sparkbench/target/sparkbench-5.0-SNAPSHOT-MR2-spark1.5-jar-with-dependencies.jar \
  tachyon://localhost:19998/Kmeans/Input/samples 10 5

On Wed, Jan 27, 2016 at 5:02 AM, Jia Zou <jacqueline...@gmail.com> wrote:
> Dears, I keep getting the below exception when using Spark 1.6.0 on top of
> Tachyon 0.8.2. Tachyon is 93% used and configured as CACHE_THROUGH.
>
> Any suggestions will be appreciated, thanks!
Fwd: Spark partition size tuning
Dear all,

First, an update: the local file system data partition size can be tuned by:
sc.hadoopConfiguration().setLong("fs.local.block.size", blocksize)

However, I also need to tune the Spark data partition size for input data stored in Tachyon (the default is 512MB), and the above method doesn't work for Tachyon data.

Do you have any suggestions? Thanks very much!

Best Regards,
Jia

-- Forwarded message --
From: Jia Zou <jacqueline...@gmail.com>
Date: Thu, Jan 21, 2016 at 10:05 PM
Subject: Spark partition size tuning
To: "user @spark" <user@spark.apache.org>

Dear all!

When using Spark to read from the local file system, the default partition size is 32MB. How can I increase the partition size to 128MB to reduce the number of tasks?

Thank you very much!

Best Regards,
Jia
Can Spark read input data from HDFS centralized cache?
I configured HDFS to cache files in HDFS's centralized cache, like the following:

hdfs cacheadmin -addPool hibench
hdfs cacheadmin -addDirective -path /HiBench/Kmeans/Input -pool hibench

But I didn't see much performance impact, no matter how I configure dfs.datanode.max.locked.memory.

Is it possible that Spark doesn't know the data is in the HDFS cache, and still reads the data from disk instead of from the HDFS cache?

Thanks!
Jia
Spark partition size tuning
Dear all!

When using Spark to read from the local file system, the default partition size is 32MB. How can I increase the partition size to 128MB to reduce the number of tasks?

Thank you very much!

Best Regards,
Jia
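The motivation for a larger partition size can be made concrete with first-order arithmetic: a Hadoop-backed input produces roughly one partition, and hence one task, per file-system block. A hedged plain-Java sketch of that estimate (actual partition counts also depend on the minPartitions argument and on the input format's split logic, so this is only the first-order picture):

```java
// First-order estimate of Hadoop-input partition (task) count:
// roughly one partition per input block of the underlying file system.
public class PartitionEstimate {
    public static long numPartitions(long fileSizeBytes, long blockSizeBytes) {
        // ceil(fileSize / blockSize), with at least one partition
        return Math.max(1, (fileSizeBytes + blockSizeBytes - 1) / blockSizeBytes);
    }

    public static void main(String[] args) {
        long gb = 1024L * 1024 * 1024;
        // A 24 GB input: 32 MB blocks -> 768 tasks; 128 MB blocks -> 192 tasks
        System.out.println(numPartitions(24 * gb, 32 * 1024 * 1024));   // 768
        System.out.println(numPartitions(24 * gb, 128 * 1024 * 1024));  // 192
    }
}
```

This is why raising the block size from 32MB to 128MB cuts the task count by about 4x, at the cost of coarser scheduling granularity.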
Can I configure Spark on multiple nodes using local filesystem on each node?
Dear all, Can I configure Spark on multiple nodes without HDFS, so that output data will be written to the local file system on each node? I guess there is no such feature in Spark, but just want to confirm. Best Regards, Jia
Re: Reuse Executor JVM across different JobContext
Hi, Mark, sorry, I meant SparkContext. I would like to change Spark so that all submitted jobs (SparkContexts) run in one executor JVM.

Best Regards,
Jia

On Sun, Jan 17, 2016 at 2:21 PM, Mark Hamstra <m...@clearstorydata.com> wrote:
> -dev
>
> What do you mean by JobContext? That is a Hadoop mapreduce concept, not
> Spark.
>
> On Sun, Jan 17, 2016 at 7:29 AM, Jia Zou <jacqueline...@gmail.com> wrote:
>
>> Dear all,
>>
>> Is there a way to reuse executor JVM across different JobContexts? Thanks.
>>
>> Best Regards,
>> Jia
Reuse Executor JVM across different JobContext
Dear all, Is there a way to reuse executor JVM across different JobContexts? Thanks. Best Regards, Jia
org.apache.spark.storage.BlockNotFoundException in Spark1.5.2+Tachyon0.7.1
Dear all,

I am using Spark 1.5.2 and Tachyon 0.7.1 to run KMeans with inputRDD.persist(StorageLevel.OFF_HEAP()). I've set up tiered storage for Tachyon. Everything is all right when the working set is smaller than available memory. However, when the working set exceeds available memory, I keep getting errors like below:

16/01/07 04:18:53 INFO scheduler.TaskSetManager: Lost task 197.1 in stage 0.0 (TID 206) on executor 10.149.11.81: java.lang.RuntimeException (org.apache.spark.storage.BlockNotFoundException: Block rdd_1_197 not found
16/01/07 04:18:53 INFO scheduler.TaskSetManager: Lost task 191.1 in stage 0.0 (TID 207) on executor 10.149.11.81: java.lang.RuntimeException (org.apache.spark.storage.BlockNotFoundException: Block rdd_1_191 not found
16/01/07 04:18:53 INFO scheduler.TaskSetManager: Lost task 197.2 in stage 0.0 (TID 208) on executor 10.149.11.81: java.lang.RuntimeException (org.apache.spark.storage.BlockNotFoundException: Block rdd_1_197 not found
16/01/07 04:18:53 INFO scheduler.TaskSetManager: Lost task 191.2 in stage 0.0 (TID 209) on executor 10.149.11.81: java.lang.RuntimeException (org.apache.spark.storage.BlockNotFoundException: Block rdd_1_191 not found
16/01/07 04:18:53 INFO scheduler.TaskSetManager: Lost task 197.3 in stage 0.0 (TID 210) on executor 10.149.11.81: java.lang.RuntimeException (org.apache.spark.storage.BlockNotFoundException: Block rdd_1_197 not found

Can anyone give me some suggestions? Thanks a lot!

Best Regards,
Jia
Re: Spark MLLib KMeans Performance on Amazon EC2 M3.2xlarge
Thanks, Yanbo. The results become much more reasonable after I set driver
memory to 5GB and increase worker memory to 25GB.

So, my question is: for the following code snippet, extracted from the main
method of JavaKMeans.java in the examples, what does the driver do, and what
do the workers do? I couldn't work this out from reading
https://spark.apache.org/docs/1.1.0/cluster-overview.html and
http://stackoverflow.com/questions/27181737/how-to-deal-with-executor-memory-and-driver-memory-in-spark

    SparkConf sparkConf = new SparkConf().setAppName("JavaKMeans");
    JavaSparkContext sc = new JavaSparkContext(sparkConf);
    JavaRDD<String> lines = sc.textFile(inputFile);
    JavaRDD<Vector> points = lines.map(new ParsePoint());
    points.persist(StorageLevel.MEMORY_AND_DISK());
    KMeansModel model = KMeans.train(points.rdd(), k, iterations, runs,
        KMeans.K_MEANS_PARALLEL());

Thank you very much!

Best Regards,
Jia

On Wed, Dec 30, 2015 at 9:00 PM, Yanbo Liang <yblia...@gmail.com> wrote:
> Hi Jia,
>
> You can try inputRDD.persist(MEMORY_AND_DISK) and verify whether it
> produces stable performance. The MEMORY_AND_DISK storage level stores on
> disk the partitions that don't fit in memory, and reads them from there
> when they are needed.
> Actually, it's not necessary to set such a large driver memory in your
> case, because KMeans uses little driver memory if your k is not very large.
>
> Cheers
> Yanbo
>
> 2015-12-30 22:20 GMT+08:00 Jia Zou <jacqueline...@gmail.com>:
>
>> I am running Spark MLLib KMeans on one EC2 M3.2xlarge instance with 8 CPU
>> cores and 30GB memory. Executor memory is set to 15GB, and driver memory
>> is set to 15GB.
>>
>> The observation is that when the input data size is smaller than 15GB,
>> performance is quite stable. However, when the input grows larger than
>> that, performance becomes extremely unpredictable. For example, for 15GB
>> input with inputRDD.persist(MEMORY_ONLY), I got three dramatically
>> different results: 27 mins, 61 mins and 114 mins. (All settings are the
>> same for the 3 tests, and I recreate the input data immediately before
>> each test to keep the OS buffer cache hot.)
>>
>> Anyone can help to explain this? Thanks very much!
>>
>>
>
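[Editor's note: roughly speaking, in that snippet the driver builds the RDD lineage and runs the iterative loop inside KMeans.train (broadcasting the current centers and aggregating per-partition sums), while the executors hold the cached point partitions and do the distance computations — so executor memory scales with the cached input, and the driver mostly needs room for the k centers. The memory split Jia describes would be passed on submission like the sketch below; the jar path and program arguments are placeholders.]

```shell
# Sketch of the submission matching the 5GB driver / 25GB worker split
spark-submit \
  --class org.apache.spark.examples.mllib.JavaKMeans \
  --driver-memory 5g \
  --executor-memory 25g \
  spark-examples.jar <inputFile> <k> <iterations>
```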
Spark MLLib KMeans Performance on Amazon EC2 M3.2xlarge
I am running Spark MLLib KMeans on one EC2 M3.2xlarge instance with 8 CPU
cores and 30GB memory. Executor memory is set to 15GB, and driver memory is
set to 15GB.

The observation is that when the input data size is smaller than 15GB,
performance is quite stable. However, when the input grows larger than that,
performance becomes extremely unpredictable. For example, for 15GB input
with inputRDD.persist(MEMORY_ONLY), I got three dramatically different
results: 27 mins, 61 mins and 114 mins. (All settings are the same for the
3 tests, and I recreate the input data immediately before each test to keep
the OS buffer cache hot.)

Can anyone help explain this? Thanks very much!
How to use HProf to profile Spark CPU overhead
My goal is to use hprof to profile where the bottleneck is. Is there any way
to do this without modifying and rebuilding the Spark source code?

I've tried adding
"-Xrunhprof:cpu=samples,depth=100,interval=20,lineno=y,thread=y,file=/home/ubuntu/out.hprof"
to the spark-class script, but that only profiles the CPU usage of the
org.apache.spark.deploy.SparkSubmit class, and cannot provide insight into
other classes like BlockManager, or user classes.

Any suggestions? Thanks a lot!

Best Regards,
Jia
Re: How to use HProf to profile Spark CPU overhead
Hi, Ted, it works, thanks a lot for your help!

--Jia

On Sat, Dec 12, 2015 at 3:01 PM, Ted Yu <yuzhih...@gmail.com> wrote:
> Have you tried adding the option below through
> spark.executor.extraJavaOptions ?
>
> Cheers
>
> > On Dec 13, 2015, at 3:36 AM, Jia Zou <jacqueline...@gmail.com> wrote:
> >
> > My goal is to use hprof to profile where the bottleneck is.
> > Is there anyway to do this without modifying and rebuilding Spark source
> > code.
> >
> > I've tried to add
> > "-Xrunhprof:cpu=samples,depth=100,interval=20,lineno=y,thread=y,file=/home/ubuntu/out.hprof"
> > to spark-class script, but it can only profile the CPU usage of the
> > org.apache.spark.deploy.SparkSubmit class, and can not provide insights
> > for other classes like BlockManager, and user classes.
> >
> > Any suggestions? Thanks a lot!
> >
> > Best Regards,
> > Jia
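[Editor's note: concretely, Ted's suggestion amounts to passing the same hprof flags from the original mail to each executor JVM via spark.executor.extraJavaOptions, e.g. in spark-defaults.conf as sketched below. The output path is the one from the thread; each executor writes its hprof file on its own host.]

```properties
# spark-defaults.conf -- sketch of Ted's suggestion from this thread
spark.executor.extraJavaOptions  -Xrunhprof:cpu=samples,depth=100,interval=20,lineno=y,thread=y,file=/home/ubuntu/out.hprof
```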