Can you try writing to a different S3 bucket and confirm that?

Thanks
Best Regards
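[For readers following along: the suggested cross-bucket experiment could look roughly like the sketch below, reusing the definitions from the thread. It assumes a live SparkContext `sc` and the `jsonRaw` path defined earlier; "some-other-bucket" is a hypothetical placeholder, not a bucket from this thread.]

```scala
// Re-read the intermediate output, exactly as in the original example.
val txt2 = sc.textFile(jsonRaw + "/part-*")
txt2.count  // sanity check: should match the earlier counts

// Write to a bucket other than the one being read from.
// If this succeeds while the same-bucket write hangs, that would
// support the same-bucket theory.
txt2.saveAsTextFile("s3n://some-other-bucket/json-clean-test")
```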
On Thu, Apr 23, 2015 at 12:11 AM, Daniel Mahler <dmah...@gmail.com> wrote:

> Hi Akhil,
>
> It works fine when outprefix is an hdfs:///localhost/... url.
>
> It looks to me as if there is something about Spark writing to the same s3
> bucket it is reading from.
>
> That is the only real difference between the two saveAsTextFile calls when
> outprefix is on s3: inpath is also on s3 but in a different bucket, while
> jsonRaw and jsonClean are distinct directories in the same bucket.
> I do not know why that should be a problem though.
>
> I will rerun using s3 paths and send the log information.
>
> thanks
> Daniel
>
> On Wed, Apr 22, 2015 at 1:45 AM, Akhil Das <ak...@sigmoidanalytics.com>
> wrote:
>
>> Can you look in your worker logs and see what's happening in there? Are
>> you able to write the same to your HDFS?
>>
>> Thanks
>> Best Regards
>>
>> On Wed, Apr 22, 2015 at 4:45 AM, Daniel Mahler <dmah...@gmail.com> wrote:
>>
>>> I am having a strange problem writing to s3 that I have distilled to
>>> this minimal example:
>>>
>>> def jsonRaw = s"${outprefix}-json-raw"
>>> def jsonClean = s"${outprefix}-json-clean"
>>>
>>> val txt = sc.textFile(inpath)//.coalesce(shards, false)
>>> txt.count
>>>
>>> val res = txt.saveAsTextFile(jsonRaw)
>>>
>>> val txt2 = sc.textFile(jsonRaw + "/part-*")
>>> txt2.count
>>>
>>> txt2.saveAsTextFile(jsonClean)
>>>
>>> This code should simply copy files from inpath to jsonRaw and then from
>>> jsonRaw to jsonClean.
>>> It executes all the way down to the last line, where it hangs after
>>> creating the output directory containing a _temporary_$folder but no
>>> actual files, not even temporary ones.
>>>
>>> `outprefix` is an s3 bucket url; both jsonRaw and jsonClean are in the
>>> same bucket.
>>> Both .count calls succeed and return the same number. This means Spark
>>> can read from inpath and can both read from and write to jsonRaw.
>>> Since jsonClean is in the same bucket as jsonRaw and the final line does
>>> create the directory, I cannot think of any reason why the files should
>>> not be written. If there were any access or url problems they should
>>> already have manifested when writing jsonRaw.
>>>
>>> This problem is completely reproducible with Spark 1.2.1 and 1.3.1.
>>> The console output from the last line is:
>>>
>>> scala> txt0.saveAsTextFile(jsonClean)
>>> 15/04/21 22:55:48 INFO storage.BlockManager: Removing broadcast 3
>>> 15/04/21 22:55:48 INFO storage.BlockManager: Removing block broadcast_3_piece0
>>> 15/04/21 22:55:48 INFO storage.MemoryStore: Block broadcast_3_piece0 of size 2024 dropped from memory (free 278251716)
>>> 15/04/21 22:55:48 INFO storage.BlockManagerInfo: Removed broadcast_3_piece0 on ip-10-51-181-81.ec2.internal:45199 in memory (size: 2024.0 B, free: 265.4 MB)
>>> 15/04/21 22:55:48 INFO storage.BlockManagerMaster: Updated info of block broadcast_3_piece0
>>> 15/04/21 22:55:48 INFO storage.BlockManager: Removing block broadcast_3
>>> 15/04/21 22:55:48 INFO storage.MemoryStore: Block broadcast_3 of size 2728 dropped from memory (free 278254444)
>>> 15/04/21 22:55:48 INFO storage.BlockManagerInfo: Removed broadcast_3_piece0 on ip-10-166-129-153.ec2.internal:46671 in memory (size: 2024.0 B, free: 13.8 GB)
>>> 15/04/21 22:55:48 INFO storage.BlockManagerInfo: Removed broadcast_3_piece0 on ip-10-51-153-34.ec2.internal:51691 in memory (size: 2024.0 B, free: 13.8 GB)
>>> 15/04/21 22:55:48 INFO storage.BlockManagerInfo: Removed broadcast_3_piece0 on ip-10-158-142-155.ec2.internal:54690 in memory (size: 2024.0 B, free: 13.8 GB)
>>> 15/04/21 22:55:48 INFO storage.BlockManagerInfo: Removed broadcast_3_piece0 on ip-10-61-144-7.ec2.internal:44849 in memory (size: 2024.0 B, free: 13.8 GB)
>>> 15/04/21 22:55:48 INFO storage.BlockManagerInfo: Removed broadcast_3_piece0 on ip-10-69-77-180.ec2.internal:42417 in memory (size: 2024.0 B, free: 13.8 GB)
>>> 15/04/21 22:55:48 INFO spark.ContextCleaner: Cleaned broadcast 3
>>> 15/04/21 22:55:49 INFO spark.SparkContext: Starting job: saveAsTextFile at <console>:38
>>> 15/04/21 22:55:49 INFO scheduler.DAGScheduler: Got job 2 (saveAsTextFile at <console>:38) with 96 output partitions (allowLocal=false)
>>> 15/04/21 22:55:49 INFO scheduler.DAGScheduler: Final stage: Stage 2(saveAsTextFile at <console>:38)
>>> 15/04/21 22:55:49 INFO scheduler.DAGScheduler: Parents of final stage: List()
>>> 15/04/21 22:55:49 INFO scheduler.DAGScheduler: Missing parents: List()
>>> 15/04/21 22:55:49 INFO scheduler.DAGScheduler: Submitting Stage 2 (MapPartitionsRDD[5] at saveAsTextFile at <console>:38), which has no missing parents
>>> 15/04/21 22:55:49 INFO storage.MemoryStore: ensureFreeSpace(22248) called with curMem=48112, maxMem=278302556
>>> 15/04/21 22:55:49 INFO storage.MemoryStore: Block broadcast_4 stored as values in memory (estimated size 21.7 KB, free 265.3 MB)
>>> 15/04/21 22:55:49 INFO storage.MemoryStore: ensureFreeSpace(17352) called with curMem=70360, maxMem=278302556
>>> 15/04/21 22:55:49 INFO storage.MemoryStore: Block broadcast_4_piece0 stored as bytes in memory (estimated size 16.9 KB, free 265.3 MB)
>>> 15/04/21 22:55:49 INFO storage.BlockManagerInfo: Added broadcast_4_piece0 in memory on ip-10-51-181-81.ec2.internal:45199 (size: 16.9 KB, free: 265.4 MB)
>>> 15/04/21 22:55:49 INFO storage.BlockManagerMaster: Updated info of block broadcast_4_piece0
>>> 15/04/21 22:55:49 INFO spark.SparkContext: Created broadcast 4 from broadcast at DAGScheduler.scala:839
>>> 15/04/21 22:55:49 INFO scheduler.DAGScheduler: Submitting 96 missing tasks from Stage 2 (MapPartitionsRDD[5] at saveAsTextFile at <console>:38)
>>> 15/04/21 22:55:49 INFO scheduler.TaskSchedulerImpl: Adding task set 2.0 with 96 tasks
>>> 15/04/21 22:55:49 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 2.0 (TID 192, ip-10-166-129-153.ec2.internal, PROCESS_LOCAL, 1377 bytes)
>>> 15/04/21 22:55:49 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 2.0 (TID 193, ip-10-61-144-7.ec2.internal, PROCESS_LOCAL, 1377 bytes)
>>> 15/04/21 22:55:49 INFO scheduler.TaskSetManager: Starting task 2.0 in stage 2.0 (TID 194, ip-10-158-142-155.ec2.internal, PROCESS_LOCAL, 1377 bytes)
>>> 15/04/21 22:55:49 INFO scheduler.TaskSetManager: Starting task 3.0 in stage 2.0 (TID 195, ip-10-69-77-180.ec2.internal, PROCESS_LOCAL, 1377 bytes)
>>> 15/04/21 22:55:49 INFO scheduler.TaskSetManager: Starting task 4.0 in stage 2.0 (TID 196, ip-10-51-153-34.ec2.internal, PROCESS_LOCAL, 1377 bytes)
>>> 15/04/21 22:55:49 INFO scheduler.TaskSetManager: Starting task 5.0 in stage 2.0 (TID 197, ip-10-166-129-153.ec2.internal, PROCESS_LOCAL, 1377 bytes)
>>> 15/04/21 22:55:49 INFO scheduler.TaskSetManager: Starting task 6.0 in stage 2.0 (TID 198, ip-10-61-144-7.ec2.internal, PROCESS_LOCAL, 1377 bytes)
>>> 15/04/21 22:55:49 INFO scheduler.TaskSetManager: Starting task 7.0 in stage 2.0 (TID 199, ip-10-158-142-155.ec2.internal, PROCESS_LOCAL, 1377 bytes)
>>> 15/04/21 22:55:49 INFO scheduler.TaskSetManager: Starting task 8.0 in stage 2.0 (TID 200, ip-10-69-77-180.ec2.internal, PROCESS_LOCAL, 1377 bytes)
>>> 15/04/21 22:55:49 INFO scheduler.TaskSetManager: Starting task 9.0 in stage 2.0 (TID 201, ip-10-51-153-34.ec2.internal, PROCESS_LOCAL, 1377 bytes)
>>> 15/04/21 22:55:49 INFO scheduler.TaskSetManager: Starting task 10.0 in stage 2.0 (TID 202, ip-10-166-129-153.ec2.internal, PROCESS_LOCAL, 1377 bytes)
>>> 15/04/21 22:55:49 INFO scheduler.TaskSetManager: Starting task 11.0 in stage 2.0 (TID 203, ip-10-61-144-7.ec2.internal, PROCESS_LOCAL, 1377 bytes)
>>> 15/04/21 22:55:49 INFO scheduler.TaskSetManager: Starting task 12.0 in stage 2.0 (TID 204, ip-10-158-142-155.ec2.internal, PROCESS_LOCAL, 1377 bytes)
>>> 15/04/21 22:55:49 INFO scheduler.TaskSetManager: Starting task 13.0 in stage 2.0 (TID 205, ip-10-69-77-180.ec2.internal, PROCESS_LOCAL, 1377 bytes)
>>> 15/04/21 22:55:49 INFO scheduler.TaskSetManager: Starting task 14.0 in stage 2.0 (TID 206, ip-10-51-153-34.ec2.internal, PROCESS_LOCAL, 1377 bytes)
>>> 15/04/21 22:55:49 INFO scheduler.TaskSetManager: Starting task 15.0 in stage 2.0 (TID 207, ip-10-166-129-153.ec2.internal, PROCESS_LOCAL, 1377 bytes)
>>> 15/04/21 22:55:49 INFO scheduler.TaskSetManager: Starting task 16.0 in stage 2.0 (TID 208, ip-10-61-144-7.ec2.internal, PROCESS_LOCAL, 1377 bytes)
>>> 15/04/21 22:55:49 INFO scheduler.TaskSetManager: Starting task 17.0 in stage 2.0 (TID 209, ip-10-158-142-155.ec2.internal, PROCESS_LOCAL, 1377 bytes)
>>> 15/04/21 22:55:49 INFO scheduler.TaskSetManager: Starting task 18.0 in stage 2.0 (TID 210, ip-10-69-77-180.ec2.internal, PROCESS_LOCAL, 1377 bytes)
>>> 15/04/21 22:55:49 INFO scheduler.TaskSetManager: Starting task 19.0 in stage 2.0 (TID 211, ip-10-51-153-34.ec2.internal, PROCESS_LOCAL, 1377 bytes)
>>> 15/04/21 22:55:49 INFO storage.BlockManagerInfo: Added broadcast_4_piece0 in memory on ip-10-61-144-7.ec2.internal:44849 (size: 16.9 KB, free: 13.8 GB)
>>> 15/04/21 22:55:49 INFO storage.BlockManagerInfo: Added broadcast_4_piece0 in memory on ip-10-69-77-180.ec2.internal:42417 (size: 16.9 KB, free: 13.8 GB)
>>> 15/04/21 22:55:49 INFO storage.BlockManagerInfo: Added broadcast_4_piece0 in memory on ip-10-158-142-155.ec2.internal:54690 (size: 16.9 KB, free: 13.8 GB)
>>> 15/04/21 22:55:49 INFO storage.BlockManagerInfo: Added broadcast_4_piece0 in memory on ip-10-166-129-153.ec2.internal:46671 (size: 16.9 KB, free: 13.8 GB)
>>> 15/04/21 22:55:49 INFO storage.BlockManagerInfo: Added broadcast_4_piece0 in memory on ip-10-51-153-34.ec2.internal:51691 (size: 16.9 KB, free: 13.8 GB)
>>>
>>> This feels like I am missing something really basic.
>>>
>>> thanks
>>> Daniel
>>>
>>
>>
>