Hi!

On Thu, Mar 16, 2017 at 5:20 PM, Burak Yavuz <brk...@gmail.com> wrote:
> Hi Everett,
>
> IIRC we added unionAll in Spark 2.0 which is the same implementation as
> rdd union. The union in DataFrames with Spark 2.0 does deduplication, and
> that's why you should be seeing the slowdown.

I thought it was the other way -- unionAll was deprecated in 2.0 and union
now does not de-dupe -- "Deprecated. use union(). Since 2.0.0. Returns a new
Dataset containing union of rows in this Dataset and another Dataset. This
is equivalent to UNION ALL in SQL." from

https://spark.apache.org/docs/2.0.2/api/java/org/apache/spark/sql/Dataset.html#union(org.apache.spark.sql.Dataset)

and

https://spark.apache.org/docs/2.0.2/api/java/org/apache/spark/sql/Dataset.html#unionAll(org.apache.spark.sql.Dataset)

> Best,
> Burak
>
> On Thu, Mar 16, 2017 at 4:14 PM, Everett Anderson
> <ever...@nuna.com.invalid> wrote:
>
>> Looks like the Dataset version of union may also fail with the following
>> on larger data sets, which again seems like it might be drawing everything
>> into the driver for some reason --
>>
>> 7/03/16 22:28:21 WARN TaskSetManager: Lost task 1.0 in stage 91.0 (TID
>> 5760, ip-10-8-52-198.us-west-2.compute.internal):
>> java.lang.IllegalArgumentException: bound must be positive
>>   at java.util.Random.nextInt(Random.java:388)
>>   at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.confChanged(LocalDirAllocator.java:305)
>>   at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:344)
>>   at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.createTmpFileForWrite(LocalDirAllocator.java:416)
>>   at org.apache.hadoop.fs.LocalDirAllocator.createTmpFileForWrite(LocalDirAllocator.java:198)
>>   at org.apache.hadoop.fs.s3a.S3AOutputStream.<init>(S3AOutputStream.java:87)
>>   at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:410)
>>   at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:911)
>>   at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:892)
>>   at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:789)
>>   at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:176)
>>   at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:160)
>>   at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:289)
>>   at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:262)
>>   at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.<init>(ParquetFileFormat.scala:562)
>>   at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.newInstance(ParquetFileFormat.scala:139)
>>   at org.apache.spark.sql.execution.datasources.BaseWriterContainer.newOutputWriter(WriterContainer.scala:131)
>>   at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:247)
>>   at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
>>   at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
>>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
>>   at org.apache.spark.scheduler.Task.run(Task.scala:86)
>>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>   at java.lang.Thread.run(Thread.java:745)
>>
>> On Thu, Mar 16, 2017 at 2:55 PM, Everett Anderson <ever...@nuna.com>
>> wrote:
>>
>>> Hi,
>>>
>>> We're using Dataset union() in Spark 2.0.2 to concatenate a bunch of
>>> tables together and save as Parquet to S3, but it seems to take a long
>>> time. We're using the S3A FileSystem implementation under the covers,
>>> too, if that helps.
>>>
>>> Watching the Spark UI, the executors all eventually stop (we're using
>>> dynamic allocation) but under the SQL tab we can see a "save at
>>> NativeMethodAccessorImpl.java:-2" in Running Queries. The driver is
>>> still running of course, but it may take tens of minutes to finish. It
>>> makes me wonder if our data is all being collected through the driver.
>>>
>>> If we instead convert the Datasets to RDDs and call SparkContext.union()
>>> it works quickly.
>>>
>>> Anyone know if this is a known issue?
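To make the two points in this thread concrete -- that Dataset.union in Spark 2.0 has UNION ALL semantics (no deduplication), and that SparkContext.union over RDDs is the workaround the original poster found faster -- here is a minimal sketch. It assumes a local SparkSession; the datasets, counts, and output path are illustrative only, and it is not a claim about why the driver-side slowness occurs:

```scala
import org.apache.spark.sql.SparkSession

object UnionSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical local session for illustration.
    val spark = SparkSession.builder()
      .appName("union-sketch")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    val a = Seq(1, 2, 3).toDS()
    val b = Seq(3, 4).toDS()

    // In Spark 2.0.x, Dataset.union is equivalent to SQL UNION ALL:
    // the duplicate 3 is kept, so this yields 5 rows.
    println(a.union(b).count())

    // Deduplication must be requested explicitly:
    println(a.union(b).distinct().count()) // 4 rows

    // Workaround described in the thread: drop to RDDs and use
    // SparkContext.union, which unions a whole sequence of RDDs in one
    // call instead of chaining pairwise Dataset.union operations.
    val unioned = spark.sparkContext.union(Seq(a.rdd, b.rdd))
    spark.createDataset(unioned).write.parquet("/tmp/union-sketch-out") // path is illustrative

    spark.stop()
  }
}
```

Note that chaining many pairwise Dataset.union calls grows the logical plan one node at a time, whereas SparkContext.union takes the full sequence of RDDs at once; that difference in how the combined plan is built is one plausible reason the RDD route behaved so differently here, though the thread does not confirm the root cause.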