[ 
https://issues.apache.org/jira/browse/SPARK-57135?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Akshat Shenoi updated SPARK-57135:
----------------------------------
    Description: 
h2. Problem

V1 {{FileFormat}} implementations (CSV, JSON, Parquet, ORC, etc.) are not 
archive-aware: if a user points a datasource reader at a {{.tar}}, 
{{.tar.gz}}, or {{.tgz}} file, Spark treats it as a single opaque file and 
either fails or returns garbage instead of reading the entries inside.

A common ingestion pattern stores many small files inside tar archives to 
reduce namespace pressure. Today there is no way to read these without first 
unpacking them externally.
h2. Proposed Solution

Add an {{ArchiveFormat}} utility object in 
{{org.apache.spark.sql.execution.datasources}} and hook it into the V1 scan 
pipeline:
 * *{{ArchiveFormat.readArchive}}*: at scan time, materializes one tar 
entry at a time to a local temp file and invokes the caller-supplied {{readFn}} 
against a synthetic {{PartitionedFile}} pointing at that temp file. Only one 
entry's bytes live on disk per task; the temp dir is cleaned up on iterator 
close and on task completion.
 * *{{ArchiveFormat.expandArchives}}*: at schema-inference time 
(driver-side), does the same materialization and substitutes the resulting 
{{FileStatus}}es into {{inferSchema}}.
 * *{{ArchiveFormat.isArchivePath}}*: detects {{.tar}}, {{.tar.gz}}, and 
{{.tgz}} extensions.
 * Entries whose basename starts with {{.}} are skipped (covers macOS 
AppleDouble sidecars, {{.DS_Store}}, etc.).
 * Gzip handling: Hadoop's {{CompressionCodecFactory}} auto-decompresses 
{{.tar.gz}} via {{CodecStreams}}; {{.tgz}} is not a registered Hadoop codec 
extension, so the gzip layer is unwrapped explicitly with 
{{GZIPInputStream}}.
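
The per-entry materialization described above can be sketched outside Spark. The following is a minimal Python illustration, not the proposed Scala implementation: the names {{is_archive_path}}, {{is_hidden_entry}}, and {{read_archive}} are hypothetical stand-ins for the helpers listed above, and Python's {{tarfile}} detects gzip on its own, so the Hadoop codec distinction between {{.tar.gz}} and {{.tgz}} is not modelled here.

```python
import os
import shutil
import tarfile
import tempfile

# Extensions named in the proposal.
ARCHIVE_SUFFIXES = (".tar", ".tar.gz", ".tgz")


def is_archive_path(path):
    """Return True for paths ending in .tar, .tar.gz, or .tgz."""
    return path.endswith(ARCHIVE_SUFFIXES)


def is_hidden_entry(name):
    """Return True when the entry's basename starts with a dot."""
    return os.path.basename(name).startswith(".")


def read_archive(path, read_fn):
    """Yield whatever read_fn produces for each visible regular file in the tar.

    Each entry is copied to a local temp file before read_fn sees it, and the
    temp file is deleted before the next entry is extracted, so at most one
    entry's bytes are on disk at any time.
    """
    tmp_dir = tempfile.mkdtemp(prefix="archive-")
    try:
        # "r:*" lets tarfile pick plain or gzip-compressed tar transparently.
        with tarfile.open(path, "r:*") as tar:
            for index, member in enumerate(tar):
                if not member.isfile() or is_hidden_entry(member.name):
                    continue
                # Prefix with the entry index so equal basenames cannot clash.
                local = os.path.join(
                    tmp_dir, f"{index}-{os.path.basename(member.name)}")
                with tar.extractfile(member) as src, open(local, "wb") as dst:
                    shutil.copyfileobj(src, dst)
                try:
                    yield from read_fn(local)
                finally:
                    os.remove(local)
    finally:
        # Runs on normal exhaustion and when the generator is closed early.
        shutil.rmtree(tmp_dir, ignore_errors=True)
```

Because the cleanup sits in {{finally}} blocks inside a generator, closing the iterator early removes the temp directory as well, which mirrors the "cleaned up on iterator close" behaviour described above. The task-completion hook has no equivalent in this sketch.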

Materializing to disk (rather than streaming) means formats that need random 
access (Parquet/ORC footers) work without modification.
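
To illustrate why random access matters: a Parquet reader starts from the end of the file, where the last 8 bytes hold a 4-byte little-endian footer length followed by the {{PAR1}} magic. The sketch below (the function name {{read_parquet_footer_length}} is hypothetical, and encrypted-footer files are ignored) needs a seek from the end of the file, which a local temp file supports and a forward-only gzip stream does not.

```python
import struct


def read_parquet_footer_length(path):
    """Return the footer metadata length stored in a Parquet file's trailer."""
    with open(path, "rb") as f:
        # Jump straight to the last 8 bytes; this requires a seekable file.
        f.seek(-8, 2)  # 2 == os.SEEK_END
        tail = f.read(8)
    # Trailer layout: 4-byte little-endian length, then the 4-byte magic.
    length, magic = struct.unpack("<I4s", tail)
    if magic != b"PAR1":
        raise ValueError("not a Parquet file: bad trailing magic")
    return length
```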

The feature is gated behind {{spark.sql.files.archive.enabled}} (default 
{{false}}).
h2. Integration Points
 # {{PartitionedFileUtil.splitFiles}}: archive paths forced to a single 
split.
 # {{FileScanRDD.readCurrentFile}}: archive paths routed through 
{{ArchiveFormat.readArchive}}.
 # {{DataSource.resolve}}: both {{inferSchema}} call sites expand archives 
before delegating to the format.
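
The single-split rule in the first integration point can be sketched as follows. This is an assumption-laden Python illustration rather than the actual {{PartitionedFileUtil}} logic: {{split_file}} and {{max_split_bytes}} are hypothetical names, and the ordinary-file branch is a simplified fixed-size splitter.

```python
# Extensions named in the proposal.
ARCHIVE_SUFFIXES = (".tar", ".tar.gz", ".tgz")


def is_archive_path(path):
    """Return True for paths ending in .tar, .tar.gz, or .tgz."""
    return path.endswith(ARCHIVE_SUFFIXES)


def split_file(path, length, max_split_bytes):
    """Return (offset, length) splits for a file of `length` bytes.

    Archives always get exactly one split covering the whole file, so a
    single task walks the tar entries from the start; other files are cut
    into chunks of at most max_split_bytes.
    """
    if is_archive_path(path):
        return [(0, length)]
    return [
        (offset, min(max_split_bytes, length - offset))
        for offset in range(0, length, max_split_bytes)
    ]
```

For a 300-byte file and a 128-byte split size, an archive yields one split covering all 300 bytes, while a plain CSV yields three.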


> [SQL] Add ArchiveFormat for reading .tar / .tar.gz / .tgz archives as files
> ---------------------------------------------------------------------------
>
>                 Key: SPARK-57135
>                 URL: https://issues.apache.org/jira/browse/SPARK-57135
>             Project: Spark
>          Issue Type: New Feature
>          Components: SQL
>    Affects Versions: 4.3.0
>            Reporter: Akshat Shenoi
>            Priority: Major


