
Re: job uuid not unique

2024-02-24 Thread Xin Zhang

On Sat, Feb 17, 2024 at 3:04 AM Рамик И  wrote:

>
> Hi
> I'm using Spark Streaming to read from Kafka and write to S3. Sometimes the
> write fails with an org.apache.hadoop.fs.FileAlreadyExistsException.
>
> Spark version: 3.5.0
> Scala version: 2.13.8
> Cluster: k8s
>
> libraryDependencies ++= Seq(
>   "org.apache.hadoop" % "hadoop-aws" % "3.3.4",
>   "com.amazonaws" % "aws-java-sdk-s3" % "1.12.600"
> )
>
>
>
> code:
> df
>   .coalesce(1)
>   .write
>   .option("fs.s3a.committer.require.uuid", "true")
>   .option("fs.s3a.committer.generate.uuid", "true")
>   .option("fs.s3a.committer.name", "magic")
>   .option("fs.s3a.committer.magic.enabled", "true")
>   .option("orc.compress", "zlib")
>   .mode(SaveMode.Append)
>   .orc(path)
>
>
>
> executor 9
>
> 24/02/16 13:05:25 INFO AbstractS3ACommitter: Job UUID
> 6188aaf6-78a2-4c5a-bafc-0e285d8b89f3 source fs.s3a.committer.uuid
> 24/02/16 13:05:25 INFO AbstractS3ACommitterFactory: Using committer magic
> to output data to s3a://mybucket/test
> 24/02/16 13:05:25 INFO AbstractS3ACommitterFactory: Using Committer
> MagicCommitter{AbstractS3ACommitter{role=Task committer
> attempt_202402161305112153373254688311399_0367_m_00_13217, name=magic,
> outputPath=s3a://mybucket/test,
> workPath=s3a://mybucket/test/__magic/job-6188aaf6-78a2-4c5a-bafc-0e285d8b89f3/tasks/attempt_202402161305112153373254688311399_0367_m_00_13217/__base,
> uuid='6188aaf6-78a2-4c5a-bafc-0e285d8b89f3', uuid
> source=JobUUIDSource{text='fs.s3a.committer.uuid'}}} for s3a://mybucket/test
> 24/02/16 13:05:25 INFO SQLHadoopMapReduceCommitProtocol: Using output
> committer class org.apache.hadoop.fs.s3a.commit.magic.MagicS3GuardCommitter
> 24/02/16 13:05:25 INFO AbstractS3ACommitter: Starting: Setup Task
> attempt_202402161305112153373254688311399_0367_m_00_13217
> 24/02/16 13:05:25 INFO AbstractS3ACommitter: Setup Task
> attempt_202402161305112153373254688311399_0367_m_00_13217: duration
> 0:00.061s
> 24/02/16 13:05:25 ERROR Executor: Exception in task 0.2 in stage 367.1
> (TID 13217)
> org.apache.hadoop.fs.FileAlreadyExistsException:
> s3a://mybucket/test/part-0-bce21fe2-4e56-4075-aafe-6160b3b0334a-c000.zlib.orc
> already exists
>
>
> executor 10
> 24/02/16 13:05:24 INFO AbstractS3ACommitter: Job UUID
> 6188aaf6-78a2-4c5a-bafc-0e285d8b89f3 source fs.s3a.committer.uuid
> 24/02/16 13:05:24 INFO AbstractS3ACommitterFactory: Using committer magic
> to output data to s3a://mybucket/test
> 24/02/16 13:05:24 INFO AbstractS3ACommitterFactory: Using Committer
> MagicCommitter{AbstractS3ACommitter{role=Task committer
> attempt_202402161305112153373254688311399_0367_m_00_13216, name=magic,
> outputPath=s3a://mybucket/test,
> workPath=s3a://mybucket/test/__magic/job-6188aaf6-78a2-4c5a-bafc-0e285d8b89f3/tasks/attempt_202402161305112153373254688311399_0367_m_00_13216/__base,
> uuid='6188aaf6-78a2-4c5a-bafc-0e285d8b89f3', uuid
> source=JobUUIDSource{text='fs.s3a.committer.uuid'}}} for s3a://mybucket/test
> 24/02/16 13:05:24 INFO SQLHadoopMapReduceCommitProtocol: Using output
> committer class org.apache.hadoop.fs.s3a.commit.magic.MagicS3GuardCommitter
> 24/02/16 13:05:24 INFO AbstractS3ACommitter: Starting: Setup Task
> attempt_202402161305112153373254688311399_0367_m_00_13216
> 24/02/16 13:05:24 INFO AbstractS3ACommitter: Setup Task
> attempt_202402161305112153373254688311399_0367_m_00_13216: duration
> 0:00.112s
> 24/02/16 13:05:24 ERROR Executor: Exception in task 0.1 in stage 367.1
> (TID 13216)
> org.apache.hadoop.fs.FileAlreadyExistsException:
> s3a://mybucket/test/part-0-bce21fe2-4e56-4075-aafe-6160b3b0334a-c000.zlib.orc
> already exists
>
>
>
> How can I fix it?
>
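
For reference, the fs.s3a.committer.* options in the snippet above can also be
set once per session rather than per write, using the spark.hadoop.* prefix,
which Spark copies into the Hadoop configuration. A minimal sketch, untested
against this pipeline (the app name is hypothetical):

import org.apache.spark.sql.SparkSession

// Sketch only: the same committer settings as the write options above,
// applied session-wide so every s3a write reads them from the Hadoop config.
val spark = SparkSession.builder()
  .appName("kafka-to-s3") // hypothetical name
  .config("spark.hadoop.fs.s3a.committer.name", "magic")
  .config("spark.hadoop.fs.s3a.committer.magic.enabled", "true")
  .config("spark.hadoop.fs.s3a.committer.require.uuid", "true")
  .config("spark.hadoop.fs.s3a.committer.generate.uuid", "true")
  .getOrCreate()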


-- 
Zhang Xin(张欣)
Email:josseph.zh...@gmail.com


Re: AQE coalesce 60G shuffle data into a single partition

2024-02-24 Thread Enrico Minack

Hi Shay,

Maybe this is related to the small number of output rows (1,250) of the
last exchange step that consumes those 60GB of shuffle data.

It looks like your outer transformation is something like:

df.groupBy($"id").agg(collect_list($"prop_name"))

Have you tried adding a repartition to convince AQE to exchange into a
specific number of partitions?

df.groupBy($"id").agg(collect_list($"prop_name")).repartition(100, $"id")

Can you provide some Spark code that reproduces the issue with synthetic
data and cleansed Spark logic?

Cheers,
Enrico


On 22.02.24 at 15:14, Shay Elbaz wrote:

Dear community,

We have noticed that AQE is coalescing a substantial amount of data
(approximately 60GB) into a single partition during query execution.
This behavior is unexpected given the absence of data skew or broadcast
joins, and the significant size of the shuffle.

*Environment Details:*

 * Apache Spark Version: 3.1.3
 * Platform: Dataproc 2.0
 * Executor configuration: 90GB memory, 15 cores

*Configuration Parameters:* We have examined the relevant
configuration parameters, and tried many different variations, but the
behavior persists. For example:
spark.sql.adaptive.advisoryPartitionSizeInBytes=104857600 //100MB
spark.sql.adaptive.coalescePartitions.initialPartitionNum=1000
spark.sql.adaptive.coalescePartitions.minPartitionNum=500
spark.sql.optimizer.dynamicPartitionPruning.enabled=false
spark.sql.autoBroadcastJoinThreshold=-1 // off
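
All of the above are runtime SQL settings, so a sketch of applying them
programmatically before the query runs (assuming an existing SparkSession
named spark) would be:

// Illustrative only: same values as listed above
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "104857600") // 100MB
spark.conf.set("spark.sql.adaptive.coalescePartitions.initialPartitionNum", "1000")
spark.conf.set("spark.sql.adaptive.coalescePartitions.minPartitionNum", "500")
spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.enabled", "false")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1") // off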

*The full plan and diagram from the SQL tab are shown below*

Please advise:

 1. Are there additional configuration parameters or best practices we
should be aware of in such scenarios?
 2. Are there known related issues in 3.1.3? (didn't find any on Jira)


Thanks in advance,
Shay


...