This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 1a9345e7b1 [core] test: add tests for compression (#6145)
1a9345e7b1 is described below
commit 1a9345e7b1755a3732808c2dc13725c2f54bb85d
Author: jerry <[email protected]>
AuthorDate: Tue Aug 26 15:50:55 2025 +0800
[core] test: add tests for compression (#6145)
---
.../paimon/format/text/HadoopCompressionUtils.java | 12 +-
.../apache/paimon/format/TextCompressionTest.java | 22 ++
.../paimon/format/json/JsonCompressionTest.java | 2 -
.../format/text/HadoopCompressionUtilsTest.java | 314 +++++++++++++++++++++
4 files changed, 345 insertions(+), 5 deletions(-)
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/text/HadoopCompressionUtils.java b/paimon-format/src/main/java/org/apache/paimon/format/text/HadoopCompressionUtils.java
index 1ff11bcc22..7ff2f97e17 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/text/HadoopCompressionUtils.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/text/HadoopCompressionUtils.java
@@ -58,10 +58,16 @@ public class HadoopCompressionUtils {
* @param inputStream The underlying input stream
* @param filePath The file path (used to detect compression from extension)
* @return Decompressed input stream
+ * @throws IOException If decompression stream creation fails
*/
public static InputStream createDecompressedInputStream(
- SeekableInputStream inputStream, Path filePath) {
+ SeekableInputStream inputStream, Path filePath) throws IOException {
try {
+ // Handle null filePath gracefully
+ if (filePath == null) {
+ return inputStream;
+ }
+
CompressionCodecFactory codecFactory =
new CompressionCodecFactory(new Configuration(false));
@@ -71,7 +77,7 @@ public class HadoopCompressionUtils {
return codec.createInputStream(inputStream);
}
return inputStream;
- } catch (Exception e) {
+ } catch (Exception | UnsatisfiedLinkError e) {
throw new RuntimeException("Failed to create decompression stream", e);
}
}
@@ -97,7 +103,7 @@ public class HadoopCompressionUtils {
(CompressionCodec) codecClass.getDeclaredConstructor().newInstance();
codec.createOutputStream(new java.io.ByteArrayOutputStream());
return Optional.of(codec);
- } catch (Exception e) {
+ } catch (Exception | UnsatisfiedLinkError e) {
throw new RuntimeException("Failed to get compression codec", e);
}
}
diff --git a/paimon-format/src/test/java/org/apache/paimon/format/TextCompressionTest.java b/paimon-format/src/test/java/org/apache/paimon/format/TextCompressionTest.java
index e9e8fbc063..92bb45b222 100644
--- a/paimon-format/src/test/java/org/apache/paimon/format/TextCompressionTest.java
+++ b/paimon-format/src/test/java/org/apache/paimon/format/TextCompressionTest.java
@@ -32,6 +32,7 @@ import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
@@ -42,6 +43,7 @@ import java.util.Arrays;
import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Base class for compression tests across different file formats. */
public abstract class TextCompressionTest {
@@ -65,6 +67,26 @@ public abstract class TextCompressionTest {
/** Returns the file extension for the format. */
protected abstract String getFormatExtension();
+ /**
+ * Test case for when a file has a compression extension but the corresponding compression codec
+ * is not available or cannot be found.
+ */
+ @Test
+ void testWriteFileWithCompressionExtensionButCompressionNotFound() {
+ String fileName = "test_unsupported." + getFormatExtension() + ".xyz";
+ Options options = new Options();
+ options.set(CoreOptions.FILE_COMPRESSION, "xyz"); // Non-existent compression type
+
+ FileFormat format = createFileFormat(options);
+ Path filePath = new Path(tempDir.resolve(fileName).toString());
+ FileIO fileIO = new LocalFileIO();
+
+ FormatWriterFactory writerFactory = format.createWriterFactory(rowType);
+ assertThatThrownBy(
+ () -> writerFactory.create(fileIO.newOutputStream(filePath, false), "xyz"))
+ .isInstanceOf(IllegalArgumentException.class);
+ }
+
@Disabled // TODO fix dependencies
@ParameterizedTest(name = "compression = {0}")
@EnumSource(HadoopCompressionType.class)
diff --git a/paimon-format/src/test/java/org/apache/paimon/format/json/JsonCompressionTest.java b/paimon-format/src/test/java/org/apache/paimon/format/json/JsonCompressionTest.java
index 0e6deca8ff..451a721df3 100644
--- a/paimon-format/src/test/java/org/apache/paimon/format/json/JsonCompressionTest.java
+++ b/paimon-format/src/test/java/org/apache/paimon/format/json/JsonCompressionTest.java
@@ -59,8 +59,6 @@ class JsonCompressionTest extends TextCompressionTest {
void testJsonCompressionWithComplexData() throws IOException {
// Test with complex JSON structures and different compression formats
testCompressionRoundTrip(HadoopCompressionType.GZIP.value(), "test_complex_gzip.json.gz");
- testCompressionRoundTrip(
- HadoopCompressionType.DEFLATE.value(), "test_complex_deflate.json.deflate");
testCompressionRoundTrip(HadoopCompressionType.NONE.value(), "test_complex_none.json");
}
}
diff --git a/paimon-format/src/test/java/org/apache/paimon/format/text/HadoopCompressionUtilsTest.java b/paimon-format/src/test/java/org/apache/paimon/format/text/HadoopCompressionUtilsTest.java
new file mode 100644
index 0000000000..0ff9ecd10e
--- /dev/null
+++ b/paimon-format/src/test/java/org/apache/paimon/format/text/HadoopCompressionUtilsTest.java
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.text;
+
+import org.apache.paimon.format.HadoopCompressionType;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.fs.local.LocalFileIO;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link HadoopCompressionUtils}. */
+class HadoopCompressionUtilsTest {
+
+ @TempDir java.nio.file.Path tempDir;
+
+ private static final String TEST_DATA = "This is test data for compression.";
+
+ @Test
+ void testCreateCompressedOutputStreamWithNoneCompression() throws IOException {
+ ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+ TestPositionOutputStream positionOutputStream =
+ new TestPositionOutputStream(byteArrayOutputStream);
+
+ OutputStream result =
+ HadoopCompressionUtils.createCompressedOutputStream(
+ positionOutputStream, HadoopCompressionType.NONE.value());
+
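+ // "none" is a pass-through: the original stream should be returned unwrapped.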
+ assertThat(result).isSameAs(positionOutputStream);
+ }
+
+ @Test
+ void testCreateCompressedOutputStreamWithInvalidCompression() {
+ ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+ TestPositionOutputStream positionOutputStream =
+ new TestPositionOutputStream(byteArrayOutputStream);
+
+ assertThatThrownBy(
+ () ->
+ HadoopCompressionUtils.createCompressedOutputStream(
+ positionOutputStream, "invalid"))
+ .isInstanceOf(IllegalArgumentException.class);
+ }
+
+ @Test
+ void testCreateDecompressedInputStreamWithNoCompression() throws IOException {
+ byte[] testData = TEST_DATA.getBytes(StandardCharsets.UTF_8);
+ TestSeekableInputStream seekableInputStream = new TestSeekableInputStream(testData);
+ Path filePath = new Path("test.txt");
+
+ InputStream result =
+ HadoopCompressionUtils.createDecompressedInputStream(seekableInputStream, filePath);
+
+ // For uncompressed files, should return the original stream
+ assertThat(result).isSameAs(seekableInputStream);
+ }
+
+ @Test
+ void testCreateDecompressedInputStreamWithGzipFile() throws IOException {
+ byte[] testData = TEST_DATA.getBytes(StandardCharsets.UTF_8);
+ TestSeekableInputStream seekableInputStream = new TestSeekableInputStream(testData);
+ Path filePath = new Path("test.txt.gz");
+
+ InputStream result =
+ HadoopCompressionUtils.createDecompressedInputStream(seekableInputStream, filePath);
+
+ assertThat(result).isNotNull();
+ // For compressed files, should return a different stream (decompression wrapper)
+ // Note: The actual decompression behavior depends on Hadoop codecs being available
+ }
+
+ @ParameterizedTest
+ @EnumSource(HadoopCompressionType.class)
+ void testCreateCompressedOutputStreamWithAvailableCompressions(
+ HadoopCompressionType compressionType) throws IOException {
+ if (compressionType.hadoopCodecClassName() == null) {
+ return; // Skip types without codec class names
+ }
+
+ ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+ TestPositionOutputStream positionOutputStream =
+ new TestPositionOutputStream(byteArrayOutputStream);
+
+ try {
+ OutputStream compressedStream =
+ HadoopCompressionUtils.createCompressedOutputStream(
+ positionOutputStream, compressionType.value());
+
+ assertThat(compressedStream).isNotNull();
+
+ // Write and close to ensure compression happens
+ compressedStream.write(TEST_DATA.getBytes(StandardCharsets.UTF_8));
+ compressedStream.close();
+
+ // Verify that some data was written
+ assertThat(byteArrayOutputStream.toByteArray()).isNotEmpty();
+ } catch (Exception e) {
+ // Skip compression type if not available
+ }
+ }
+
+ @ParameterizedTest
+ @EnumSource(value = HadoopCompressionType.class)
+ void testCreateDecompressedInputStreamWithAvailableExtensions(
+ HadoopCompressionType compressionType) throws IOException {
+ if (compressionType.fileExtension() == null) {
+ return; // Skip types without file extensions
+ }
+
+ byte[] testData = TEST_DATA.getBytes(StandardCharsets.UTF_8);
+ TestSeekableInputStream seekableInputStream = new TestSeekableInputStream(testData);
+ Path filePath = new Path("test.txt." +
compressionType.fileExtension());
+
+ try {
+ InputStream result =
+ HadoopCompressionUtils.createDecompressedInputStream(
+ seekableInputStream, filePath);
+ assertThat(result).isNotNull();
+ } catch (Exception e) {
+ // Skip compression type if not available
+ }
+ }
+
+ @Test
+ void testRoundTripCompressionDecompression() throws IOException {
// Test with actual file I/O using GZIP which is most commonly available
+ java.nio.file.Path testFile = tempDir.resolve("test_roundtrip.txt.gz");
+ LocalFileIO fileIO = new LocalFileIO();
+ Path paimonPath = new Path(testFile.toString());
+
+ // Write compressed data
+ try (PositionOutputStream outputStream = fileIO.newOutputStream(paimonPath, false);
+ OutputStream compressedStream =
+ HadoopCompressionUtils.createCompressedOutputStream(
+ outputStream, HadoopCompressionType.GZIP.value())) {
+ compressedStream.write(TEST_DATA.getBytes(StandardCharsets.UTF_8));
+ }
+
+ // Verify file was created and has content
+ assertThat(Files.exists(testFile)).isTrue();
+ assertThat(Files.size(testFile)).isGreaterThan(0);
+
+ // Read decompressed data
+ try (SeekableInputStream inputStream = fileIO.newInputStream(paimonPath);
+ InputStream decompressedStream =
+ HadoopCompressionUtils.createDecompressedInputStream(
+ inputStream, paimonPath)) {
+
+ byte[] buffer = new byte[TEST_DATA.length() * 2]; // Extra space
+ int bytesRead = decompressedStream.read(buffer);
+
+ String decompressedData = new String(buffer, 0, bytesRead, StandardCharsets.UTF_8);
+ assertThat(decompressedData).isEqualTo(TEST_DATA);
+ }
+ }
+
+ @Test
+ void testCreateCompressedOutputStreamWithNullCompression() {
+ ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+ TestPositionOutputStream positionOutputStream =
+ new TestPositionOutputStream(byteArrayOutputStream);
+
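+ // Unlike "none", a null compression name is expected to be rejected outright.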
+ assertThatThrownBy(
+ () ->
+ HadoopCompressionUtils.createCompressedOutputStream(
+ positionOutputStream, null))
+ .isInstanceOf(IllegalArgumentException.class);
+ }
+
+ @Test
+ void testCreateDecompressedInputStreamWithNullPath() {
+ byte[] testData = TEST_DATA.getBytes(StandardCharsets.UTF_8);
+ TestSeekableInputStream seekableInputStream = new TestSeekableInputStream(testData);
+
+ try {
+ InputStream result =
+ HadoopCompressionUtils.createDecompressedInputStream(seekableInputStream, null);
+ // Should handle null path gracefully and return original stream
+ assertThat(result).isNotNull();
+ } catch (IOException e) {
+ // Null path may cause IOException, which is acceptable behavior
+ assertThat(e).hasMessageContaining("Failed to create decompression
stream");
+ }
+ }
+
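+ /** Minimal in-memory {@link PositionOutputStream} that tracks the number of bytes written. */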
+ private static class TestPositionOutputStream extends PositionOutputStream {
+ private final ByteArrayOutputStream delegate;
+ private long position = 0;
+
+ public TestPositionOutputStream(ByteArrayOutputStream delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ return position;
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ delegate.write(b);
+ position++;
+ }
+
+ @Override
+ public void write(byte[] b) throws IOException {
+ delegate.write(b);
+ position += b.length;
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ delegate.write(b, off, len);
+ position += len;
+ }
+
+ @Override
+ public void flush() throws IOException {
+ delegate.flush();
+ }
+
+ @Override
+ public void close() throws IOException {
+ delegate.close();
+ }
+ }
+
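+ /** Minimal in-memory {@link SeekableInputStream}; seek is emulated with reset() and skip(). */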
+ private static class TestSeekableInputStream extends SeekableInputStream {
+ private final ByteArrayInputStream delegate;
+ private long position = 0;
+
+ public TestSeekableInputStream(byte[] data) {
+ this.delegate = new ByteArrayInputStream(data);
+ }
+
+ @Override
+ public void seek(long pos) throws IOException {
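+ // ByteArrayInputStream has no random access; emulate seeking by rewinding and skipping.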
+ delegate.reset();
+ long skipped = delegate.skip(pos);
+ if (skipped != pos) {
+ throw new IOException("Could not seek to position " + pos);
+ }
+ position = pos;
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ return position;
+ }
+
+ @Override
+ public int read() throws IOException {
+ int result = delegate.read();
+ if (result != -1) {
+ position++;
+ }
+ return result;
+ }
+
+ @Override
+ public int read(byte[] b) throws IOException {
+ int result = delegate.read(b);
+ if (result != -1) {
+ position += result;
+ }
+ return result;
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ int result = delegate.read(b, off, len);
+ if (result != -1) {
+ position += result;
+ }
+ return result;
+ }
+
+ @Override
+ public void close() throws IOException {
+ delegate.close();
+ }
+ }
+}