Hi Everett,

IIRC we added unionAll in Spark 2.0, which has the same implementation as RDD
union. The union in DataFrames with Spark 2.0 does deduplication, and
that's likely why you're seeing the slowdown.
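
You should be able to confirm this from the physical plan. Something like
the following (an untested sketch, off the top of my head, assuming the
usual spark-shell SparkSession) should show whether a dedup step is being
added:

val a = spark.range(100).toDF("id")
val b = spark.range(100).toDF("id")

// If union deduplicates, the first plan should contain an extra
// Aggregate/Deduplicate step that the unionAll plan lacks.
a.union(b).explain()
a.unionAll(b).explain()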

Best,
Burak

On Thu, Mar 16, 2017 at 4:14 PM, Everett Anderson <ever...@nuna.com.invalid>
wrote:

> Looks like the Dataset version of union may also fail with the following
> on larger data sets, which again seems like it might be drawing everything
> into the driver for some reason --
>
> 17/03/16 22:28:21 WARN TaskSetManager: Lost task 1.0 in stage 91.0 (TID 5760, ip-10-8-52-198.us-west-2.compute.internal): java.lang.IllegalArgumentException: bound must be positive
> at java.util.Random.nextInt(Random.java:388)
> at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.confChanged(LocalDirAllocator.java:305)
> at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:344)
> at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.createTmpFileForWrite(LocalDirAllocator.java:416)
> at org.apache.hadoop.fs.LocalDirAllocator.createTmpFileForWrite(LocalDirAllocator.java:198)
> at org.apache.hadoop.fs.s3a.S3AOutputStream.<init>(S3AOutputStream.java:87)
> at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:410)
> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:911)
> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:892)
> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:789)
> at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:176)
> at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:160)
> at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:289)
> at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:262)
> at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.<init>(ParquetFileFormat.scala:562)
> at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.newInstance(ParquetFileFormat.scala:139)
> at org.apache.spark.sql.execution.datasources.BaseWriterContainer.newOutputWriter(WriterContainer.scala:131)
> at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:247)
> at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
> at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
> at org.apache.spark.scheduler.Task.run(Task.scala:86)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
> On Thu, Mar 16, 2017 at 2:55 PM, Everett Anderson <ever...@nuna.com>
> wrote:
>
>> Hi,
>>
>> We're using Dataset union() in Spark 2.0.2 to concatenate a bunch of
>> tables together and save as Parquet to S3, but it seems to take a long
>> time. We're using the S3A FileSystem implementation under the covers, too,
>> if that helps.
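>>
>> For reference, we're doing roughly this (simplified; the table names and
>> bucket path here are made up):
>>
>> import org.apache.spark.sql.SparkSession
>>
>> val spark = SparkSession.builder().getOrCreate()
>> // Stand-ins for our real tables
>> val tables = Seq("t1", "t2", "t3").map(spark.table)
>> val combined = tables.reduce(_ union _)
>> combined.write.mode("overwrite").parquet("s3a://our-bucket/some/path/")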
>>
>> Watching the Spark UI, the executors all eventually stop (we're using
>> dynamic allocation), but under the SQL tab we can see a "save at
>> NativeMethodAccessorImpl.java:-2" entry in Running Queries. The driver is
>> still running, of course, but it may take tens of minutes to finish. It
>> makes me wonder if our data is all being collected through the driver.
>>
>> If we instead convert the Datasets to RDDs and call SparkContext.union(),
>> it works quickly.
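>>
>> That workaround looks roughly like this (a sketch, reusing the made-up
>> `tables` from above; the schema is taken from the first table):
>>
>> val rdds = tables.map(_.rdd)
>> val unioned =
>>   spark.createDataFrame(spark.sparkContext.union(rdds), tables.head.schema)
>> unioned.write.mode("overwrite").parquet("s3a://our-bucket/some/path/")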
>>
>> Anyone know if this is a known issue?
>>
>>
>
