This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new f561137ef [core] Add file compression type in file names (#4420)
f561137ef is described below
commit f561137ef2b7a5b9bff497f108bc4eedef55dc15
Author: askwang <[email protected]>
AuthorDate: Mon Nov 4 13:14:02 2024 +0800
[core] Add file compression type in file names (#4420)
---
.../shortcodes/generated/core_configuration.html | 6 ++
.../main/java/org/apache/paimon/CoreOptions.java | 11 +++
.../java/org/apache/paimon/AbstractFileStore.java | 4 +-
.../java/org/apache/paimon/KeyValueFileStore.java | 4 +-
.../org/apache/paimon/io/DataFilePathFactory.java | 16 +++-
.../apache/paimon/utils/FileStorePathFactory.java | 12 ++-
.../apache/paimon/append/AppendOnlyWriterTest.java | 4 +-
.../apache/paimon/format/FileFormatSuffixTest.java | 4 +-
.../apache/paimon/io/DataFilePathFactoryTest.java | 8 +-
.../paimon/io/KeyValueFileReadWriteTest.java | 8 +-
.../apache/paimon/io/RollingFileWriterTest.java | 5 +-
.../paimon/manifest/ManifestFileMetaTestBase.java | 4 +-
.../apache/paimon/manifest/ManifestFileTest.java | 4 +-
.../apache/paimon/manifest/ManifestListTest.java | 4 +-
.../paimon/utils/FileStorePathFactoryTest.java | 8 +-
.../flink/source/TestChangelogDataReadWrite.java | 4 +-
.../apache/paimon/spark/SparkFileIndexITCase.java | 4 +-
.../org/apache/paimon/spark/SparkWriteITCase.java | 97 ++++++++++++++++++++++
18 files changed, 187 insertions(+), 20 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html
b/docs/layouts/shortcodes/generated/core_configuration.html
index 84ba86124..d9ac0b99b 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -332,6 +332,12 @@ under the License.
<td>Map</td>
<td>Define different file format for different level, you can add
the conf like this: 'file.format.per.level' = '0:avro,3:parquet', if the file
format for level is not provided, the default format which set by `file.format`
will be used.</td>
</tr>
+ <tr>
+ <td><h5>file.suffix.include.compression</h5></td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>Whether to add file compression type in the file name of data
file and changelog file.</td>
+ </tr>
<tr>
<td><h5>force-lookup</h5></td>
<td style="word-wrap: break-word;">false</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index f4f7f41e2..6ba8d70e0 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -190,6 +190,13 @@ public class CoreOptions implements Serializable {
.defaultValue("changelog-")
.withDescription("Specify the file name prefix of
changelog files.");
+ public static final ConfigOption<Boolean> FILE_SUFFIX_INCLUDE_COMPRESSION =
+ key("file.suffix.include.compression")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Whether to add file compression type in the file
name of data file and changelog file.");
+
public static final ConfigOption<MemorySize> FILE_BLOCK_SIZE =
key("file.block-size")
.memoryType()
@@ -1591,6 +1598,10 @@ public class CoreOptions implements Serializable {
return options.get(CHANGELOG_FILE_PREFIX);
}
+ public boolean fileSuffixIncludeCompression() {
+ return options.get(FILE_SUFFIX_INCLUDE_COMPRESSION);
+ }
+
public String fieldsDefaultFunc() {
return options.get(FIELDS_DEFAULT_AGG_FUNC);
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 811848453..55c1c72df 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -106,7 +106,9 @@ abstract class AbstractFileStore<T> implements FileStore<T>
{
options.fileFormat().getFormatIdentifier(),
options.dataFilePrefix(),
options.changelogFilePrefix(),
- options.legacyPartitionName());
+ options.legacyPartitionName(),
+ options.fileSuffixIncludeCompression(),
+ options.fileCompression());
}
@Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
index 8f2dbbf5f..8a3bf0b0f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
@@ -207,7 +207,9 @@ public class KeyValueFileStore extends
AbstractFileStore<KeyValue> {
format,
options.dataFilePrefix(),
options.changelogFilePrefix(),
- options.legacyPartitionName())));
+ options.legacyPartitionName(),
+ options.fileSuffixIncludeCompression(),
+ options.fileCompression())));
return pathFactoryMap;
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java
b/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java
index 742888b36..b632d44c9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java
@@ -39,18 +39,24 @@ public class DataFilePathFactory {
private final String formatIdentifier;
private final String dataFilePrefix;
private final String changelogFilePrefix;
+ private final boolean fileSuffixIncludeCompression;
+ private final String fileCompression;
public DataFilePathFactory(
Path parent,
String formatIdentifier,
String dataFilePrefix,
- String changelogFilePrefix) {
+ String changelogFilePrefix,
+ boolean fileSuffixIncludeCompression,
+ String fileCompression) {
this.parent = parent;
this.uuid = UUID.randomUUID().toString();
this.pathCount = new AtomicInteger(0);
this.formatIdentifier = formatIdentifier;
this.dataFilePrefix = dataFilePrefix;
this.changelogFilePrefix = changelogFilePrefix;
+ this.fileSuffixIncludeCompression = fileSuffixIncludeCompression;
+ this.fileCompression = fileCompression;
}
public Path newPath() {
@@ -62,7 +68,13 @@ public class DataFilePathFactory {
}
private Path newPath(String prefix) {
- String name = prefix + uuid + "-" + pathCount.getAndIncrement() + "."
+ formatIdentifier;
+ String extension;
+ if (fileSuffixIncludeCompression) {
+ extension = "." + fileCompression + "." + formatIdentifier;
+ } else {
+ extension = "." + formatIdentifier;
+ }
+ String name = prefix + uuid + "-" + pathCount.getAndIncrement() +
extension;
return new Path(parent, name);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
b/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
index 612565c72..fcdc4634d 100644
---
a/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
+++
b/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
@@ -43,6 +43,8 @@ public class FileStorePathFactory {
private final String formatIdentifier;
private final String dataFilePrefix;
private final String changelogFilePrefix;
+ private final boolean fileSuffixIncludeCompression;
+ private final String fileCompression;
private final AtomicInteger manifestFileCount;
private final AtomicInteger manifestListCount;
@@ -57,7 +59,9 @@ public class FileStorePathFactory {
String formatIdentifier,
String dataFilePrefix,
String changelogFilePrefix,
- boolean legacyPartitionName) {
+ boolean legacyPartitionName,
+ boolean fileSuffixIncludeCompression,
+ String fileCompression) {
this.root = root;
this.uuid = UUID.randomUUID().toString();
@@ -66,6 +70,8 @@ public class FileStorePathFactory {
this.formatIdentifier = formatIdentifier;
this.dataFilePrefix = dataFilePrefix;
this.changelogFilePrefix = changelogFilePrefix;
+ this.fileSuffixIncludeCompression = fileSuffixIncludeCompression;
+ this.fileCompression = fileCompression;
this.manifestFileCount = new AtomicInteger(0);
this.manifestListCount = new AtomicInteger(0);
@@ -113,7 +119,9 @@ public class FileStorePathFactory {
bucketPath(partition, bucket),
formatIdentifier,
dataFilePrefix,
- changelogFilePrefix);
+ changelogFilePrefix,
+ fileSuffixIncludeCompression,
+ fileCompression);
}
public Path bucketPath(BinaryRow partition, int bucket) {
diff --git
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
index 04b5fa6e6..a9012ed89 100644
---
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
@@ -522,7 +522,9 @@ public class AppendOnlyWriterTest {
new Path(tempDir + "/dt=" + PART + "/bucket-0"),
CoreOptions.FILE_FORMAT.defaultValue().toString(),
CoreOptions.DATA_FILE_PREFIX.defaultValue(),
- CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue());
+ CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
+ CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(),
+ CoreOptions.FILE_COMPRESSION.defaultValue());
}
private AppendOnlyWriter createEmptyWriter(long targetFileSize) {
diff --git
a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
index 439fc4999..c29519ce8 100644
---
a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
@@ -70,7 +70,9 @@ public class FileFormatSuffixTest extends
KeyValueFileReadWriteTest {
new Path(tempDir + "/dt=1/bucket-1"),
format,
CoreOptions.DATA_FILE_PREFIX.defaultValue(),
- CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue());
+ CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
+
CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(),
+ CoreOptions.FILE_COMPRESSION.defaultValue());
FileFormat fileFormat = FileFormat.fromIdentifier(format, new
Options());
LinkedList<DataFileMeta> toCompact = new LinkedList<>();
CoreOptions options = new CoreOptions(new HashMap<>());
diff --git
a/paimon-core/src/test/java/org/apache/paimon/io/DataFilePathFactoryTest.java
b/paimon-core/src/test/java/org/apache/paimon/io/DataFilePathFactoryTest.java
index 609566258..d36966c55 100644
---
a/paimon-core/src/test/java/org/apache/paimon/io/DataFilePathFactoryTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/io/DataFilePathFactoryTest.java
@@ -38,7 +38,9 @@ public class DataFilePathFactoryTest {
new Path(tempDir + "/bucket-123"),
CoreOptions.FILE_FORMAT.defaultValue().toString(),
CoreOptions.DATA_FILE_PREFIX.defaultValue(),
- CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue());
+ CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
+
CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(),
+ CoreOptions.FILE_COMPRESSION.defaultValue());
String uuid = pathFactory.uuid();
for (int i = 0; i < 20; i++) {
@@ -64,7 +66,9 @@ public class DataFilePathFactoryTest {
new Path(tempDir + "/dt=20211224/bucket-123"),
CoreOptions.FILE_FORMAT.defaultValue().toString(),
CoreOptions.DATA_FILE_PREFIX.defaultValue(),
- CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue());
+ CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
+
CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(),
+ CoreOptions.FILE_COMPRESSION.defaultValue());
String uuid = pathFactory.uuid();
for (int i = 0; i < 20; i++) {
diff --git
a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
index de593e80b..52d56afad 100644
---
a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
@@ -229,7 +229,9 @@ public class KeyValueFileReadWriteTest {
format,
CoreOptions.DATA_FILE_PREFIX.defaultValue(),
CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
-
CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue());
+
CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue(),
+
CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(),
+ CoreOptions.FILE_COMPRESSION.defaultValue());
int suggestedFileSize = ThreadLocalRandom.current().nextInt(8192) +
1024;
FileIO fileIO = FileIOFinder.find(path);
Options options = new Options();
@@ -246,7 +248,9 @@ public class KeyValueFileReadWriteTest {
CoreOptions.FILE_FORMAT.defaultValue().toString(),
CoreOptions.DATA_FILE_PREFIX.defaultValue(),
CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
-
CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue()));
+
CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue(),
+
CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(),
+ CoreOptions.FILE_COMPRESSION.defaultValue()));
return KeyValueFileWriterFactory.builder(
fileIO,
diff --git
a/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java
b/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java
index 9bee36b7c..9e1de7145 100644
--- a/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java
@@ -81,7 +81,10 @@ public class RollingFileWriterTest {
.toString(),
CoreOptions.DATA_FILE_PREFIX.defaultValue(),
CoreOptions.CHANGELOG_FILE_PREFIX
-
.defaultValue())
+
.defaultValue(),
+
CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION
+
.defaultValue(),
+
CoreOptions.FILE_COMPRESSION.defaultValue())
.newPath(),
SCHEMA,
fileFormat
diff --git
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
index 87a6f2e45..5e69035ca 100644
---
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
+++
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
@@ -147,7 +147,9 @@ public abstract class ManifestFileMetaTestBase {
CoreOptions.FILE_FORMAT.defaultValue(),
CoreOptions.DATA_FILE_PREFIX.defaultValue(),
CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
-
CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue()),
+
CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue(),
+
CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(),
+ CoreOptions.FILE_COMPRESSION.defaultValue()),
Long.MAX_VALUE,
null)
.create();
diff --git
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileTest.java
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileTest.java
index 30e8e7b20..34cca41e6 100644
--- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileTest.java
@@ -103,7 +103,9 @@ public class ManifestFileTest {
CoreOptions.FILE_FORMAT.defaultValue().toString(),
CoreOptions.DATA_FILE_PREFIX.defaultValue(),
CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
-
CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue());
+
CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue(),
+
CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(),
+ CoreOptions.FILE_COMPRESSION.defaultValue());
int suggestedFileSize = ThreadLocalRandom.current().nextInt(8192) +
1024;
FileIO fileIO = FileIOFinder.find(path);
return new ManifestFile.Factory(
diff --git
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestListTest.java
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestListTest.java
index f4703c1ff..ce4f7b807 100644
--- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestListTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestListTest.java
@@ -107,7 +107,9 @@ public class ManifestListTest {
CoreOptions.FILE_FORMAT.defaultValue().toString(),
CoreOptions.DATA_FILE_PREFIX.defaultValue(),
CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
-
CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue());
+
CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue(),
+
CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(),
+ CoreOptions.FILE_COMPRESSION.defaultValue());
return new ManifestList.Factory(FileIOFinder.find(path), avro, "zstd",
pathFactory, null)
.create();
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/utils/FileStorePathFactoryTest.java
b/paimon-core/src/test/java/org/apache/paimon/utils/FileStorePathFactoryTest.java
index 26bbd28e4..d4d45b312 100644
---
a/paimon-core/src/test/java/org/apache/paimon/utils/FileStorePathFactoryTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/utils/FileStorePathFactoryTest.java
@@ -88,7 +88,9 @@ public class FileStorePathFactoryTest {
CoreOptions.FILE_FORMAT.defaultValue().toString(),
CoreOptions.DATA_FILE_PREFIX.defaultValue(),
CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
-
CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue());
+
CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue(),
+
CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(),
+ CoreOptions.FILE_COMPRESSION.defaultValue());
assertPartition("20211224", 16, pathFactory, "/dt=20211224/hr=16");
assertPartition("20211224", null, pathFactory,
"/dt=20211224/hr=default");
@@ -126,6 +128,8 @@ public class FileStorePathFactoryTest {
CoreOptions.FILE_FORMAT.defaultValue().toString(),
CoreOptions.DATA_FILE_PREFIX.defaultValue(),
CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
- CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue());
+ CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue(),
+ CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(),
+ CoreOptions.FILE_COMPRESSION.defaultValue());
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
index 762a14ea1..85679e5fd 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
@@ -108,7 +108,9 @@ public class TestChangelogDataReadWrite {
CoreOptions.FILE_FORMAT.defaultValue().toString(),
CoreOptions.DATA_FILE_PREFIX.defaultValue(),
CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
-
CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue());
+
CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue(),
+
CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(),
+ CoreOptions.FILE_COMPRESSION.defaultValue());
this.snapshotManager = new SnapshotManager(LocalFileIO.create(), new
Path(root));
this.commitUser = UUID.randomUUID().toString();
}
diff --git
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkFileIndexITCase.java
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkFileIndexITCase.java
index d82f92af5..2251619c4 100644
---
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkFileIndexITCase.java
+++
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkFileIndexITCase.java
@@ -131,7 +131,9 @@ public class SparkFileIndexITCase extends SparkWriteITCase {
CoreOptions.FILE_FORMAT.defaultValue().toString(),
CoreOptions.DATA_FILE_PREFIX.defaultValue(),
CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
-
CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue());
+
CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue(),
+
CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(),
+ CoreOptions.FILE_COMPRESSION.defaultValue());
Table table = fileSystemCatalog.getTable(Identifier.create("db", "T"));
ReadBuilder readBuilder = table.newReadBuilder();
diff --git
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java
index 0ff104eeb..b0d5b380c 100644
---
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java
+++
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java
@@ -296,6 +296,9 @@ public class SparkWriteITCase {
for (String fileName : fileNames) {
Assertions.assertTrue(fileName.startsWith("test-"));
}
+
+ // reset config, it will affect other tests
+ spark.conf().unset("spark.paimon.data-file.prefix");
}
@Test
@@ -317,6 +320,9 @@ public class SparkWriteITCase {
spark.sql("INSERT INTO T VALUES (2, 2, 'bb')");
FileStatus[] files2 = fileIO.listStatus(new Path(tabLocation,
"bucket-0"));
Assertions.assertEquals(1, dataFileCount(files2, "test-changelog-"));
+
+ // reset config, it will affect other tests
+ spark.conf().unset("spark.paimon.changelog-file.prefix");
}
@Test
@@ -333,6 +339,97 @@ public class SparkWriteITCase {
Assertions.assertTrue(fileIO.exists(new Path(tabLocation,
"c=aa/_SUCCESS")));
}
+ @Test
+ public void testDataFileSuffixName() {
+ spark.sql(
+ "CREATE TABLE T (a INT, b INT, c STRING)"
+ + " TBLPROPERTIES ("
+ + "'bucket' = '1', "
+ + "'primary-key'='a', "
+ + "'write-only' = 'true', "
+ + "'file.format' = 'parquet', "
+ + "'file.compression' = 'zstd')");
+
+ spark.sql("INSERT INTO T VALUES (1, 1, 'aa')");
+ spark.sql("INSERT INTO T VALUES (2, 2, 'bb')");
+
+ // enable the compression suffix for the two inserts below only
+ spark.conf().set("spark.paimon.file.suffix.include.compression", true);
+ spark.sql("INSERT INTO T VALUES (3, 3, 'cc')");
+ spark.sql("INSERT INTO T VALUES (4, 4, 'dd')");
+
+ List<Row> data2 = spark.sql("SELECT * FROM T order by
a").collectAsList();
+ assertThat(data2.toString()).isEqualTo("[[1,1,aa], [2,2,bb], [3,3,cc],
[4,4,dd]]");
+
+ // check the suffixes of the data file names listed in the files system table
+ List<String> files =
+ spark.sql("select file_path from
`T$files`").collectAsList().stream()
+ .map(x -> x.getString(0))
+ .collect(Collectors.toList());
+ Assertions.assertEquals(4, files.size());
+
+ String defaultExtension = "." + "parquet";
+ String newExtension = "." + "zstd" + "." + "parquet";
+ // two data files end with plain ".parquet", two data files end with
".zstd.parquet"
+ Assertions.assertEquals(
+ 2,
+ files.stream()
+ .filter(
+ name ->
+ name.endsWith(defaultExtension)
+ &&
!name.endsWith(newExtension))
+ .count());
+ Assertions.assertEquals(
+ 2, files.stream().filter(name ->
name.endsWith(newExtension)).count());
+
+ // reset config (NOTE(review): skipped if an assertion above fails; consider try/finally)
+ spark.conf().unset("spark.paimon.file.suffix.include.compression");
+ }
+
+ @Test
+ public void testChangelogFileSuffixName() throws Exception {
+ spark.sql(
+ "CREATE TABLE T (a INT, b INT, c STRING) "
+ + "TBLPROPERTIES ("
+ + "'primary-key'='a', "
+ + "'bucket' = '1', "
+ + "'changelog-producer' = 'lookup', "
+ + "'file.format' = 'parquet', "
+ + "'file.compression' = 'zstd')");
+
+ FileStoreTable table = getTable("T");
+ Path tabLocation = table.location();
+ FileIO fileIO = table.fileIO();
+
+ spark.sql("INSERT INTO T VALUES (1, 1, 'aa')");
+ // only the second insert below runs with the compression suffix enabled
+ spark.conf().set("spark.paimon.file.suffix.include.compression", true);
+ spark.sql("INSERT INTO T VALUES (2, 2, 'bb')");
+
+ // collect the names of changelog files (default "changelog-" prefix) in bucket-0
+ List<String> files =
+ Arrays.stream(fileIO.listStatus(new Path(tabLocation,
"bucket-0")))
+ .map(name -> name.getPath().getName())
+ .filter(name -> name.startsWith("changelog-"))
+ .collect(Collectors.toList());
+ String defaultExtension = "." + "parquet";
+ String newExtension = "." + "zstd" + "." + "parquet";
+ // one changelog file ends with plain ".parquet", one changelog file ends with
".zstd.parquet"
+ Assertions.assertEquals(
+ 1,
+ files.stream()
+ .filter(
+ name ->
+ name.endsWith(defaultExtension)
+ &&
!name.endsWith(newExtension))
+ .count());
+ Assertions.assertEquals(
+ 1, files.stream().filter(name ->
name.endsWith(newExtension)).count());
+
+ // reset config (NOTE(review): skipped if an assertion above fails; consider try/finally)
+ spark.conf().unset("spark.paimon.file.suffix.include.compression");
+ }
+
protected static FileStoreTable getTable(String tableName) {
return FileStoreTableFactory.create(
LocalFileIO.create(),