This is an automated email from the ASF dual-hosted git repository.
hansva pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hop.git
The following commit(s) were added to refs/heads/main by this push:
new 794f6d834c Improves the Parquet File Output transform and adds test
coverage for the output package (#7171)
794f6d834c is described below
commit 794f6d834ccb0fa4828815f9c5ea2e9cbdcf4a28
Author: Lance <[email protected]>
AuthorDate: Tue May 26 20:46:27 2026 +0800
Improves the Parquet File Output transform and adds test coverage for the
output package (#7171)
* improves the Parquet File Output transform and adds test coverage for the
output package
Signed-off-by: lance <[email protected]>
* fix Parquet JUnit test failure.
Signed-off-by: lance <[email protected]>
---------
Signed-off-by: lance <[email protected]>
---
.../apache/hop/core/vfs/HopVfsProvidersTest.java | 27 ++--
.../parquet/transforms/output/ParquetOutput.java | 51 +++---
.../transforms/output/ParquetOutputData.java | 5 +-
.../transforms/output/ParquetOutputDialog.java | 7 +-
.../output/messages/messages_en_US.properties | 2 +-
.../output/messages/messages_zh_CN.properties | 2 +-
.../transforms/output/ParquetFieldTest.java | 58 +++++++
.../transforms/output/ParquetOutputDataTest.java | 48 ++++++
.../transforms/output/ParquetOutputFileTest.java | 55 +++++++
.../transforms/output/ParquetOutputMetaTest.java | 140 ++++++++++++++++
.../transforms/output/ParquetOutputStreamTest.java | 66 ++++++++
.../transforms/output/ParquetOutputTest.java | 178 +++++++++++++++++++++
.../transforms/output/ParquetResourceFileTest.java | 104 ++++++++++++
.../parquet/transforms/output/ParquetTestUtil.java | 94 +++++++++++
.../transforms/output/ParquetVersionTest.java | 47 ++++++
.../transforms/output/ParquetWriteSupportTest.java | 89 +++++++++++
.../output/ParquetWriterBuilderTest.java | 50 ++++++
.../test/resources/hello.parquet-00-0001.parquet | Bin 0 -> 860 bytes
18 files changed, 987 insertions(+), 36 deletions(-)
diff --git a/core/src/test/java/org/apache/hop/core/vfs/HopVfsProvidersTest.java b/core/src/test/java/org/apache/hop/core/vfs/HopVfsProvidersTest.java
index 042a03b802..a6988ce907 100644
--- a/core/src/test/java/org/apache/hop/core/vfs/HopVfsProvidersTest.java
+++ b/core/src/test/java/org/apache/hop/core/vfs/HopVfsProvidersTest.java
@@ -20,7 +20,6 @@ import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
-import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
@@ -90,7 +89,9 @@ class HopVfsProvidersTest {
writeBytes(url, "file-content".getBytes(StandardCharsets.UTF_8));
assertEquals("file-content", readString(url));
assertTrue(HopVfs.fileExists(url));
- HopVfs.getFileObject(url).delete();
+ try (FileObject obj = HopVfs.getFileObject(url)) {
+ obj.delete();
+ }
}
@Test
@@ -122,9 +123,10 @@ class HopVfsProvidersTest {
void resProviderReadsClasspathResource() throws Exception {
// Use a resource that's guaranteed to be on the test classpath.
String url = "res:" + getClass().getName().replace('.', '/') + ".class";
- FileObject obj = HopVfs.getFileObject(url);
- assertTrue(obj.exists(), url + " should exist on the classpath");
- assertTrue(obj.getContent().getSize() > 0);
+ try (FileObject obj = HopVfs.getFileObject(url)) {
+ assertTrue(obj.exists(), url + " should exist on the classpath");
+ assertTrue(obj.getContent().getSize() > 0);
+ }
}
@Test
@@ -181,7 +183,8 @@ class HopVfsProvidersTest {
out.write(payload);
}
String url = "gz:" + gz.toUri();
- try (InputStream in = HopVfs.getFileObject(url).getContent().getInputStream()) {
+ try (FileObject obj = HopVfs.getFileObject(url);
+ InputStream in = obj.getContent().getInputStream()) {
assertArrayEquals(payload, in.readAllBytes());
}
}
@@ -197,7 +200,8 @@ class HopVfsProvidersTest {
out.write(payload);
}
String url = "bz2:" + bz2.toUri();
- try (InputStream in = HopVfs.getFileObject(url).getContent().getInputStream()) {
+ try (FileObject obj = HopVfs.getFileObject(url);
+ InputStream in = obj.getContent().getInputStream()) {
assertArrayEquals(payload, in.readAllBytes());
}
}
@@ -241,17 +245,16 @@ class HopVfsProvidersTest {
}
private static void writeBytes(String url, byte[] bytes) throws Exception {
- FileObject obj = HopVfs.getFileObject(url);
- try (OutputStream out = obj.getContent().getOutputStream()) {
+ try (FileObject obj = HopVfs.getFileObject(url);
+ OutputStream out = obj.getContent().getOutputStream()) {
out.write(bytes);
}
}
private static String readString(String url) throws Exception {
- try (InputStream in = HopVfs.getFileObject(url).getContent().getInputStream()) {
+ try (FileObject obj = HopVfs.getFileObject(url);
+ InputStream in = obj.getContent().getInputStream()) {
return new String(in.readAllBytes(), StandardCharsets.UTF_8);
- } catch (IOException e) {
- throw e;
}
}
}
diff --git a/plugins/tech/parquet/src/main/java/org/apache/hop/parquet/transforms/output/ParquetOutput.java b/plugins/tech/parquet/src/main/java/org/apache/hop/parquet/transforms/output/ParquetOutput.java
index d2917a6ffd..3e07a38f6d 100644
--- a/plugins/tech/parquet/src/main/java/org/apache/hop/parquet/transforms/output/ParquetOutput.java
+++ b/plugins/tech/parquet/src/main/java/org/apache/hop/parquet/transforms/output/ParquetOutput.java
@@ -94,15 +94,7 @@ public class ParquetOutput extends BaseTransform<ParquetOutputMeta, ParquetOutpu
if (first) {
first = false;
- data.sourceFieldIndexes = new ArrayList<>();
- for (int i = 0; i < meta.getFields().size(); i++) {
- ParquetField field = meta.getFields().get(i);
- int index = getInputRowMeta().indexOfValue(field.getSourceFieldName());
- if (index < 0) {
- throw new HopException("Unable to find source field '" + field.getSourceFieldName());
- }
- data.sourceFieldIndexes.add(index);
- }
+ resolveOutputFields();
openNewFile();
}
@@ -141,10 +133,10 @@ public class ParquetOutput extends
BaseTransform<ParquetOutputMeta, ParquetOutpu
int idx = data.sourceFieldIndexes.get(i);
Object value = parquetRow[idx];
if (getInputRowMeta().getValueMeta(idx).getType() ==
IValueMeta.TYPE_TIMESTAMP) {
- if (value instanceof java.util.Date) {
- parquetRow[idx] = ((java.util.Date) value).getTime();
- } else if (value instanceof byte[]) {
- String dateStr = new String((byte[]) value,
StandardCharsets.UTF_8);
+ if (value instanceof java.util.Date date) {
+ parquetRow[idx] = date.getTime();
+ } else if (value instanceof byte[] bytes) {
+ String dateStr = new String(bytes, StandardCharsets.UTF_8);
SimpleDateFormat sdf =
new SimpleDateFormat(parquetRowMeta.getValueMeta(idx).getFormatMask());
Date date = sdf.parse(dateStr);
@@ -165,7 +157,6 @@ public class ParquetOutput extends
BaseTransform<ParquetOutputMeta, ParquetOutpu
}
private void openNewFile() throws HopException {
-
data.splitRowCount = 0;
data.split++;
@@ -187,9 +178,8 @@ public class ParquetOutput extends
BaseTransform<ParquetOutputMeta, ParquetOutpu
SchemaBuilder.record("ApacheHopParquetSchema").fields();
// Build the Parquet Schema
- //
- for (int i = 0; i < meta.getFields().size(); i++) {
- ParquetField field = meta.getFields().get(i);
+ for (int i = 0; i < data.outputFields.size(); i++) {
+ ParquetField field = data.outputFields.get(i);
IValueMeta valueMeta =
getInputRowMeta().getValueMeta(data.sourceFieldIndexes.get(i));
// Start a new field
@@ -265,7 +255,7 @@ public class ParquetOutput extends
BaseTransform<ParquetOutputMeta, ParquetOutpu
data.avroSchema,
data.outputFile,
data.sourceFieldIndexes,
- meta.getFields())
+ data.outputFields)
.withPageSize(data.pageSize)
.withDictionaryPageSize(data.dictionaryPageSize)
.withValidation(ParquetWriter.DEFAULT_IS_VALIDATING_ENABLED)
@@ -280,6 +270,31 @@ public class ParquetOutput extends
BaseTransform<ParquetOutputMeta, ParquetOutpu
}
}
+ void resolveOutputFields() throws HopException {
+ data.outputFields = new ArrayList<>();
+ data.sourceFieldIndexes = new ArrayList<>();
+
+ if (meta.getFields() == null || meta.getFields().isEmpty()) {
+ IRowMeta inputRowMeta = getInputRowMeta();
+ for (int i = 0; i < inputRowMeta.size(); i++) {
+ String fieldName = inputRowMeta.getValueMeta(i).getName();
+ data.outputFields.add(new ParquetField(fieldName, fieldName));
+ data.sourceFieldIndexes.add(i);
+ }
+ return;
+ }
+
+ for (ParquetField field : meta.getFields()) {
+ int index = getInputRowMeta().indexOfValue(field.getSourceFieldName());
+ if (index < 0) {
+ throw new HopException("Unable to find source field '" + field.getSourceFieldName() + "'");
+ }
+ String targetFieldName = Const.NVL(field.getTargetFieldName(), field.getSourceFieldName());
+ data.outputFields.add(new ParquetField(field.getSourceFieldName(), targetFieldName));
+ data.sourceFieldIndexes.add(index);
+ }
+ }
+
private String buildFilename(Date date) {
String filename = resolve(meta.getFilenameBase());
if (meta.isFilenameIncludingDate()) {
diff --git
a/plugins/tech/parquet/src/main/java/org/apache/hop/parquet/transforms/output/ParquetOutputData.java
b/plugins/tech/parquet/src/main/java/org/apache/hop/parquet/transforms/output/ParquetOutputData.java
index 9513f96daa..ce3573cc72 100644
---
a/plugins/tech/parquet/src/main/java/org/apache/hop/parquet/transforms/output/ParquetOutputData.java
+++
b/plugins/tech/parquet/src/main/java/org/apache/hop/parquet/transforms/output/ParquetOutputData.java
@@ -18,7 +18,7 @@
package org.apache.hop.parquet.transforms.output;
import java.io.OutputStream;
-import java.util.ArrayList;
+import java.util.List;
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hop.core.RowMetaAndData;
@@ -30,7 +30,8 @@ import org.apache.parquet.hadoop.ParquetWriter;
@SuppressWarnings("java:S1104")
public class ParquetOutputData extends BaseTransformData implements
ITransformData {
- public ArrayList<Integer> sourceFieldIndexes;
+ public List<Integer> sourceFieldIndexes;
+ public List<ParquetField> outputFields;
public Configuration conf;
public ParquetProperties props;
public String filename;
diff --git
a/plugins/tech/parquet/src/main/java/org/apache/hop/parquet/transforms/output/ParquetOutputDialog.java
b/plugins/tech/parquet/src/main/java/org/apache/hop/parquet/transforms/output/ParquetOutputDialog.java
index 0ad776be0d..2fac6d2579 100644
---
a/plugins/tech/parquet/src/main/java/org/apache/hop/parquet/transforms/output/ParquetOutputDialog.java
+++
b/plugins/tech/parquet/src/main/java/org/apache/hop/parquet/transforms/output/ParquetOutputDialog.java
@@ -102,22 +102,25 @@ public class ParquetOutputDialog extends
BaseTransformDialog {
wlFilenameBase.setText(BaseMessages.getString(PKG,
"ParquetOutputDialog.FilenameBase.Label"));
PropsUi.setLook(wlFilenameBase);
FormData fdlFilenameBase = new FormData();
+ fdlFilenameBase.left = new FormAttachment(0, 0);
+ fdlFilenameBase.top = new FormAttachment(0, margin);
fdlFilenameBase.right = new FormAttachment(middle, -margin);
wlFilenameBase.setLayoutData(fdlFilenameBase);
wFilenameBase = new TextVar(variables, wFileGroup, SWT.SINGLE | SWT.LEFT |
SWT.BORDER);
PropsUi.setLook(wFilenameBase);
FormData fdFilenameBase = new FormData();
fdFilenameBase.left = new FormAttachment(middle, 0);
+ fdFilenameBase.top = new FormAttachment(wlFilenameBase, 0, SWT.CENTER);
fdFilenameBase.right = new FormAttachment(90, 0);
wFilenameBase.setLayoutData(fdFilenameBase);
lastControl = wFilenameBase;
- Button wbFilename = new Button(wFileGroup, SWT.PUSH);
+ Button wbFilename = new Button(wFileGroup, SWT.PUSH | SWT.CENTER);
PropsUi.setLook(wbFilename);
wbFilename.setText(BaseMessages.getString(PKG, "System.Button.Browse"));
FormData fdbFilename = new FormData();
fdbFilename.left = new FormAttachment(wFilenameBase, 0);
- fdbFilename.top = new FormAttachment(0, -margin);
+ fdbFilename.top = new FormAttachment(wFilenameBase, 0, SWT.CENTER);
wbFilename.setLayoutData(fdbFilename);
wbFilename.addListener(
diff --git
a/plugins/tech/parquet/src/main/resources/org/apache/hop/parquet/transforms/output/messages/messages_en_US.properties
b/plugins/tech/parquet/src/main/resources/org/apache/hop/parquet/transforms/output/messages/messages_en_US.properties
index 92631f3239..8be6539fa4 100644
---
a/plugins/tech/parquet/src/main/resources/org/apache/hop/parquet/transforms/output/messages/messages_en_US.properties
+++
b/plugins/tech/parquet/src/main/resources/org/apache/hop/parquet/transforms/output/messages/messages_en_US.properties
@@ -20,7 +20,7 @@ ParquetOutput.Name=Parquet File Output
ParquetOutputDialog.CompressionCodec.Label=Compression codec
ParquetOutputDialog.DataPageSize.Label=Data page size
ParquetOutputDialog.DictionaryPageSize.Label=Dictionary page size
-ParquetOutputDialog.Fields.Label=Fields
+ParquetOutputDialog.Fields.Label=Fields (leave empty to output all input fields)
ParquetOutputDialog.FieldsColumn.SourceField.Label=Source field
ParquetOutputDialog.FieldsColumn.TargetField.Label=Target field
ParquetOutputDialog.FilenameBase.Label=Base file name
diff --git
a/plugins/tech/parquet/src/main/resources/org/apache/hop/parquet/transforms/output/messages/messages_zh_CN.properties
b/plugins/tech/parquet/src/main/resources/org/apache/hop/parquet/transforms/output/messages/messages_zh_CN.properties
index 49081cf91b..c5ac0fcb93 100644
---
a/plugins/tech/parquet/src/main/resources/org/apache/hop/parquet/transforms/output/messages/messages_zh_CN.properties
+++
b/plugins/tech/parquet/src/main/resources/org/apache/hop/parquet/transforms/output/messages/messages_zh_CN.properties
@@ -22,7 +22,7 @@ ParquetOutput.Name=Parquet \u8F93\u51FA
ParquetOutputDialog.CompressionCodec.Label=\u538B\u7F29\u7F16\u7801\:
ParquetOutputDialog.DataPageSize.Label=\u6570\u636E\u9875\u5927\u5C0F\:
ParquetOutputDialog.DictionaryPageSize.Label=\u5B57\u5178\u9875\u5927\u5C0F\:
-ParquetOutputDialog.Fields.Label=\u5B57\u6BB5\:
+ParquetOutputDialog.Fields.Label=\u5B57\u6BB5\uFF08\u7559\u7A7A\u5219\u8F93\u51FA\u6240\u6709\u8F93\u5165\u5B57\u6BB5\uFF09
ParquetOutputDialog.FieldsColumn.SourceField.Label=\u6E90\u5B57\u6BB5
ParquetOutputDialog.FieldsColumn.TargetField.Label=\u76EE\u6807\u5B57\u6BB5
ParquetOutputDialog.FilenameBase.Label=\u6587\u4EF6\u540D(\u4E0D\u5E26\u6269\u5C55\u540D)\:
diff --git
a/plugins/tech/parquet/src/test/java/org/apache/hop/parquet/transforms/output/ParquetFieldTest.java
b/plugins/tech/parquet/src/test/java/org/apache/hop/parquet/transforms/output/ParquetFieldTest.java
new file mode 100644
index 0000000000..672b9cbe20
--- /dev/null
+++
b/plugins/tech/parquet/src/test/java/org/apache/hop/parquet/transforms/output/ParquetFieldTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hop.parquet.transforms.output;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+import org.junit.jupiter.api.Test;
+
+/** Unit test for {@link ParquetField} */
+class ParquetFieldTest {
+
+ @Test
+ void testDefaultConstructor() {
+ ParquetField field = new ParquetField();
+ assertNull(field.getSourceFieldName());
+ assertNull(field.getTargetFieldName());
+ }
+
+ @Test
+ void testConstructorWithNames() {
+ ParquetField field = new ParquetField("source", "target");
+ assertEquals("source", field.getSourceFieldName());
+ assertEquals("target", field.getTargetFieldName());
+ }
+
+ @Test
+ void testCopyConstructor() {
+ ParquetField original = new ParquetField("source", "target");
+ ParquetField copy = new ParquetField(original);
+ assertEquals("source", copy.getSourceFieldName());
+ assertEquals("target", copy.getTargetFieldName());
+ }
+
+ @Test
+ void testSetters() {
+ ParquetField field = new ParquetField();
+ field.setSourceFieldName("in");
+ field.setTargetFieldName("out");
+ assertEquals("in", field.getSourceFieldName());
+ assertEquals("out", field.getTargetFieldName());
+ }
+}
diff --git
a/plugins/tech/parquet/src/test/java/org/apache/hop/parquet/transforms/output/ParquetOutputDataTest.java
b/plugins/tech/parquet/src/test/java/org/apache/hop/parquet/transforms/output/ParquetOutputDataTest.java
new file mode 100644
index 0000000000..cacc36ce82
--- /dev/null
+++
b/plugins/tech/parquet/src/test/java/org/apache/hop/parquet/transforms/output/ParquetOutputDataTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hop.parquet.transforms.output;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+import org.junit.jupiter.api.Test;
+
+/** Unit test for {@link ParquetOutputData} */
+class ParquetOutputDataTest {
+
+ @Test
+ void testDefaultValues() {
+ ParquetOutputData data = new ParquetOutputData();
+ assertNull(data.sourceFieldIndexes);
+ assertNull(data.outputFields);
+ assertNull(data.conf);
+ assertNull(data.props);
+ assertNull(data.filename);
+ assertNull(data.outputStream);
+ assertNull(data.countingStream);
+ assertNull(data.outputFile);
+ assertNull(data.writer);
+ assertEquals(0, data.split);
+ assertEquals(0, data.splitRowCount);
+ assertEquals(0, data.maxSplitSizeRows);
+ assertEquals(0, data.rowGroupSize);
+ assertEquals(0, data.pageSize);
+ assertEquals(0, data.dictionaryPageSize);
+ assertNull(data.avroSchema);
+ }
+}
diff --git
a/plugins/tech/parquet/src/test/java/org/apache/hop/parquet/transforms/output/ParquetOutputFileTest.java
b/plugins/tech/parquet/src/test/java/org/apache/hop/parquet/transforms/output/ParquetOutputFileTest.java
new file mode 100644
index 0000000000..5f1dbde077
--- /dev/null
+++
b/plugins/tech/parquet/src/test/java/org/apache/hop/parquet/transforms/output/ParquetOutputFileTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hop.parquet.transforms.output;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+
+import java.io.ByteArrayOutputStream;
+import org.apache.parquet.io.PositionOutputStream;
+import org.junit.jupiter.api.Test;
+
+/** Unit test for {@link ParquetOutputFile} */
+class ParquetOutputFileTest {
+
+ @Test
+ void testCreateReturnsPositionOutputStream() {
+ ByteArrayOutputStream delegate = new ByteArrayOutputStream();
+ ParquetOutputFile outputFile = new ParquetOutputFile(delegate);
+
+ PositionOutputStream stream = outputFile.create(0);
+ assertInstanceOf(ParquetOutputStream.class, stream);
+ }
+
+ @Test
+ void testCreateOrOverwriteReturnsPositionOutputStream() {
+ ByteArrayOutputStream delegate = new ByteArrayOutputStream();
+ ParquetOutputFile outputFile = new ParquetOutputFile(delegate);
+
+ PositionOutputStream stream = outputFile.createOrOverwrite(0);
+ assertInstanceOf(ParquetOutputStream.class, stream);
+ }
+
+ @Test
+ void testBlockSizeSupport() {
+ ParquetOutputFile outputFile = new ParquetOutputFile(new
ByteArrayOutputStream());
+ assertFalse(outputFile.supportsBlockSize());
+ assertEquals(0, outputFile.defaultBlockSize());
+ }
+}
diff --git
a/plugins/tech/parquet/src/test/java/org/apache/hop/parquet/transforms/output/ParquetOutputMetaTest.java
b/plugins/tech/parquet/src/test/java/org/apache/hop/parquet/transforms/output/ParquetOutputMetaTest.java
new file mode 100644
index 0000000000..b0bdf44f15
--- /dev/null
+++
b/plugins/tech/parquet/src/test/java/org/apache/hop/parquet/transforms/output/ParquetOutputMetaTest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hop.parquet.transforms.output;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.List;
+import org.apache.hop.core.xml.XmlHandler;
+import org.apache.hop.metadata.serializer.memory.MemoryMetadataProvider;
+import org.apache.hop.metadata.serializer.xml.XmlMetadataUtil;
+import org.apache.hop.pipeline.transform.TransformMeta;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.junit.jupiter.api.Test;
+
+/** Unit test for {@link ParquetOutputMeta} */
+class ParquetOutputMetaTest {
+
+ @Test
+ void testDefaultValues() {
+ ParquetOutputMeta meta = new ParquetOutputMeta();
+ assertEquals("yyyyMMdd-HHmmss", meta.getFilenameDateTimeFormat());
+ assertEquals(CompressionCodecName.UNCOMPRESSED,
meta.getCompressionCodec());
+ assertEquals(ParquetVersion.Version2, meta.getVersion());
+ assertEquals("268435456", meta.getRowGroupSize());
+ assertEquals("8192", meta.getDataPageSize());
+ assertEquals(
+ Integer.toString(ParquetProperties.DEFAULT_DICTIONARY_PAGE_SIZE),
+ meta.getDictionaryPageSize());
+ assertEquals("1000000", meta.getFileSplitSize());
+ assertTrue(meta.isFilenameIncludingCopyNr());
+ assertTrue(meta.isFilenameIncludingSplitNr());
+ assertTrue(meta.isFilenameCreatingParentFolders());
+ assertFalse(meta.isFilenameIncludingDate());
+ assertFalse(meta.isFilenameIncludingTime());
+ assertFalse(meta.isFilenameIncludingDateTime());
+ assertTrue(meta.getFields().isEmpty());
+ }
+
+ @Test
+ void testCopyConstructor() {
+ ParquetOutputMeta original = new ParquetOutputMeta();
+ original.setFilenameBase("/tmp/output");
+ original.setFilenameExtension("pq");
+ original.setFilenameIncludingDate(true);
+ original.setCompressionCodec(CompressionCodecName.SNAPPY);
+ original.setVersion(ParquetVersion.Version1);
+ original.setFields(List.of(new ParquetField("id", "id_out")));
+
+ ParquetOutputMeta copy = new ParquetOutputMeta(original);
+ assertEquals("/tmp/output", copy.getFilenameBase());
+ assertEquals("pq", copy.getFilenameExtension());
+ assertTrue(copy.isFilenameIncludingDate());
+ assertEquals(CompressionCodecName.SNAPPY, copy.getCompressionCodec());
+ assertEquals(ParquetVersion.Version1, copy.getVersion());
+ assertEquals(1, copy.getFields().size());
+ assertEquals("id", copy.getFields().get(0).getSourceFieldName());
+ }
+
+ @Test
+ void testXmlRoundTrip() throws Exception {
+ ParquetOutputMeta meta = new ParquetOutputMeta();
+ meta.setFilenameBase("/data/out");
+ meta.setFilenameExtension("parquet");
+ meta.setFilenameIncludingDate(true);
+ meta.setFilenameIncludingTime(true);
+ meta.setFilenameIncludingDateTime(true);
+ meta.setFilenameDateTimeFormat("yyyy-MM-dd");
+ meta.setFilenameIncludingCopyNr(false);
+ meta.setFilenameIncludingSplitNr(false);
+ meta.setFileSplitSize("500");
+ meta.setFilenameCreatingParentFolders(false);
+ meta.setCompressionCodec(CompressionCodecName.GZIP);
+ meta.setVersion(ParquetVersion.Version1);
+ meta.setRowGroupSize("1024");
+ meta.setDataPageSize("512");
+ meta.setDictionaryPageSize("256");
+ meta.getFields().add(new ParquetField("id", "id"));
+ meta.getFields().add(new ParquetField("name", "name"));
+
+ String xml =
+ XmlHandler.openTag(TransformMeta.XML_TAG)
+ + XmlMetadataUtil.serializeObjectToXml(meta)
+ + XmlHandler.closeTag(TransformMeta.XML_TAG);
+
+ ParquetOutputMeta loaded = new ParquetOutputMeta();
+ XmlMetadataUtil.deSerializeFromXml(
+ XmlHandler.loadXmlString(xml, TransformMeta.XML_TAG),
+ ParquetOutputMeta.class,
+ loaded,
+ new MemoryMetadataProvider());
+
+ validate(meta, loaded);
+ }
+
+ private static void validate(ParquetOutputMeta expected, ParquetOutputMeta
actual) {
+ assertEquals(expected.getFilenameBase(), actual.getFilenameBase());
+ assertEquals(expected.getFilenameExtension(),
actual.getFilenameExtension());
+ assertEquals(expected.isFilenameIncludingDate(),
actual.isFilenameIncludingDate());
+ assertEquals(expected.isFilenameIncludingTime(),
actual.isFilenameIncludingTime());
+ assertEquals(expected.isFilenameIncludingDateTime(),
actual.isFilenameIncludingDateTime());
+ assertEquals(expected.getFilenameDateTimeFormat(),
actual.getFilenameDateTimeFormat());
+ assertEquals(expected.isFilenameIncludingCopyNr(),
actual.isFilenameIncludingCopyNr());
+ assertEquals(expected.isFilenameIncludingSplitNr(),
actual.isFilenameIncludingSplitNr());
+ assertEquals(expected.getFileSplitSize(), actual.getFileSplitSize());
+ assertEquals(
+ expected.isFilenameCreatingParentFolders(),
actual.isFilenameCreatingParentFolders());
+ assertEquals(expected.getCompressionCodec(), actual.getCompressionCodec());
+ assertEquals(expected.getVersion(), actual.getVersion());
+ assertEquals(expected.getRowGroupSize(), actual.getRowGroupSize());
+ assertEquals(expected.getDataPageSize(), actual.getDataPageSize());
+ assertEquals(expected.getDictionaryPageSize(),
actual.getDictionaryPageSize());
+ assertEquals(expected.getFields().size(), actual.getFields().size());
+ for (int i = 0; i < expected.getFields().size(); i++) {
+ assertEquals(
+ expected.getFields().get(i).getSourceFieldName(),
+ actual.getFields().get(i).getSourceFieldName());
+ assertEquals(
+ expected.getFields().get(i).getTargetFieldName(),
+ actual.getFields().get(i).getTargetFieldName());
+ }
+ }
+}
diff --git
a/plugins/tech/parquet/src/test/java/org/apache/hop/parquet/transforms/output/ParquetOutputStreamTest.java
b/plugins/tech/parquet/src/test/java/org/apache/hop/parquet/transforms/output/ParquetOutputStreamTest.java
new file mode 100644
index 0000000000..e69d0a676d
--- /dev/null
+++
b/plugins/tech/parquet/src/test/java/org/apache/hop/parquet/transforms/output/ParquetOutputStreamTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hop.parquet.transforms.output;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.io.ByteArrayOutputStream;
+import org.junit.jupiter.api.Test;
+
+/** Unit test for {@link ParquetOutputStream} */
+class ParquetOutputStreamTest {
+
+ @Test
+ void testWriteSingleByte() throws Exception {
+ ByteArrayOutputStream delegate = new ByteArrayOutputStream();
+ ParquetOutputStream stream = new ParquetOutputStream(delegate);
+
+ stream.write(42);
+ stream.flush();
+
+ assertEquals(1, stream.getPos());
+ assertArrayEquals(new byte[] {42}, delegate.toByteArray());
+ }
+
+ @Test
+ void testWriteByteArray() throws Exception {
+ ByteArrayOutputStream delegate = new ByteArrayOutputStream();
+ ParquetOutputStream stream = new ParquetOutputStream(delegate);
+ byte[] data = new byte[] {1, 2, 3, 4, 5};
+
+ stream.write(data);
+ stream.flush();
+
+ assertEquals(5, stream.getPos());
+ assertArrayEquals(data, delegate.toByteArray());
+ }
+
+ @Test
+ void testWriteByteArrayRange() throws Exception {
+ ByteArrayOutputStream delegate = new ByteArrayOutputStream();
+ ParquetOutputStream stream = new ParquetOutputStream(delegate);
+ byte[] data = new byte[] {9, 1, 2, 3, 8};
+
+ stream.write(data, 1, 3);
+ stream.flush();
+
+ assertEquals(3, stream.getPos());
+ assertArrayEquals(new byte[] {1, 2, 3}, delegate.toByteArray());
+ }
+}
diff --git
a/plugins/tech/parquet/src/test/java/org/apache/hop/parquet/transforms/output/ParquetOutputTest.java
b/plugins/tech/parquet/src/test/java/org/apache/hop/parquet/transforms/output/ParquetOutputTest.java
new file mode 100644
index 0000000000..251571a5ce
--- /dev/null
+++
b/plugins/tech/parquet/src/test/java/org/apache/hop/parquet/transforms/output/ParquetOutputTest.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hop.parquet.transforms.output;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+import org.apache.hop.core.exception.HopException;
+import org.apache.hop.core.logging.ILoggingObject;
+import org.apache.hop.core.row.RowMeta;
+import org.apache.hop.core.row.value.ValueMetaInteger;
+import org.apache.hop.core.row.value.ValueMetaString;
+import org.apache.hop.junit.rules.RestoreHopEngineEnvironmentExtension;
+import org.apache.hop.pipeline.Pipeline;
+import org.apache.hop.pipeline.PipelineMeta;
+import org.apache.hop.pipeline.engines.local.LocalPipelineEngine;
+import org.apache.hop.pipeline.transform.TransformMeta;
+import org.apache.hop.pipeline.transforms.mock.TransformMockHelper;
+import org.apache.parquet.column.ParquetProperties;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Unit test for {@link ParquetOutput}.
+ *
+ * <p>Covers size parsing in {@code init()}, the two edge cases of {@code processRow()} (no input
+ * rows, unknown source field) and field resolution in {@code resolveOutputFields()}.
+ */
+@ExtendWith(RestoreHopEngineEnvironmentExtension.class)
+class ParquetOutputTest {
+  private TransformMockHelper<ParquetOutputMeta, ParquetOutputData> mockHelper;
+
+  @BeforeEach
+  void setUp() throws Exception {
+    mockHelper =
+        new TransformMockHelper<>(
+            "Parquet Output", ParquetOutputMeta.class, ParquetOutputData.class);
+    // Hand the mocked log channel to anything that asks the mocked factory for one.
+    when(mockHelper.logChannelFactory.create(any(), any(ILoggingObject.class)))
+        .thenReturn(mockHelper.iLogChannel);
+    when(mockHelper.pipeline.isRunning()).thenReturn(true);
+  }
+
+  @AfterEach
+  void tearDown() {
+    mockHelper.cleanUp();
+  }
+
+  /** Numeric strings configured on the meta must end up as the matching numbers on the data. */
+  @Test
+  void testInitCalculatesSizes() {
+    ParquetOutputMeta config = new ParquetOutputMeta();
+    config.setDataPageSize("1024");
+    config.setDictionaryPageSize("512");
+    config.setRowGroupSize("128");
+    config.setFileSplitSize("100");
+
+    ParquetOutputData state = new ParquetOutputData();
+    ParquetOutput transform = createTransform(config, state);
+
+    assertTrue(transform.init());
+    assertEquals(1024, state.pageSize);
+    assertEquals(512, state.dictionaryPageSize);
+    assertEquals(128, state.rowGroupSize);
+    assertEquals(100, state.maxSplitSizeRows);
+  }
+
+  /** Non-numeric, empty and null settings must not fail {@code init()}; defaults are expected. */
+  @Test
+  void testInitUsesDefaultsForInvalidValues() {
+    ParquetOutputMeta config = new ParquetOutputMeta();
+    config.setDataPageSize("invalid");
+    config.setDictionaryPageSize("");
+    config.setRowGroupSize(null);
+    config.setFileSplitSize("not-a-number");
+
+    ParquetOutputData state = new ParquetOutputData();
+    ParquetOutput transform = createTransform(config, state);
+
+    assertTrue(transform.init());
+    assertEquals(ParquetProperties.DEFAULT_PAGE_SIZE, state.pageSize);
+    assertEquals(ParquetProperties.DEFAULT_DICTIONARY_PAGE_SIZE, state.dictionaryPageSize);
+    assertEquals(ParquetProperties.DEFAULT_PAGE_ROW_COUNT_LIMIT, state.rowGroupSize);
+    assertEquals(-1, state.maxSplitSizeRows);
+  }
+
+  /** With {@code getRow()} returning null, {@code processRow()} must report false. */
+  @Test
+  void testProcessRowWithNoInputRows() throws Exception {
+    ParquetOutputMeta config = new ParquetOutputMeta();
+    ParquetOutputData state = new ParquetOutputData();
+    ParquetOutput transform = spy(createTransform(config, state));
+
+    assertTrue(transform.init());
+    doReturn(null).when(transform).getRow();
+
+    assertFalse(transform.processRow());
+  }
+
+  /** A configured field that is absent from the (empty) input row meta must raise an error. */
+  @Test
+  void testProcessRowMissingSourceField() throws Exception {
+    ParquetOutputMeta config = new ParquetOutputMeta();
+    config.getFields().add(new ParquetField("missing", "missing"));
+
+    ParquetOutputData state = new ParquetOutputData();
+    ParquetOutput transform = spy(createTransform(config, state));
+    transform.setInputRowMeta(new RowMeta());
+
+    assertTrue(transform.init());
+    doReturn(new Object[] {1L}).when(transform).getRow();
+
+    org.junit.jupiter.api.Assertions.assertThrows(HopException.class, transform::processRow);
+  }
+
+  /** Without configured fields, every input field is expected as an output field, in order. */
+  @Test
+  void testResolveOutputFieldsUsesAllInputFieldsWhenNoneConfigured() throws Exception {
+    ParquetOutputMeta config = new ParquetOutputMeta();
+    ParquetOutputData state = new ParquetOutputData();
+    ParquetOutput transform = createTransform(config, state);
+    transform.setInputRowMeta(idAndNameRowMeta());
+
+    transform.resolveOutputFields();
+
+    String[] expectedNames = {"id", "name"};
+    assertEquals(expectedNames.length, state.outputFields.size());
+    for (int i = 0; i < expectedNames.length; i++) {
+      assertEquals(expectedNames[i], state.outputFields.get(i).getSourceFieldName());
+      assertEquals(expectedNames[i], state.outputFields.get(i).getTargetFieldName());
+      assertEquals(i, state.sourceFieldIndexes.get(i));
+    }
+  }
+
+  /** A configured target name is used as-is; an empty one is expected to become the source name. */
+  @Test
+  void testResolveOutputFieldsUsesConfiguredFields() throws Exception {
+    ParquetOutputMeta config = new ParquetOutputMeta();
+    config.getFields().add(new ParquetField("id", "identifier"));
+    config.getFields().add(new ParquetField("name", ""));
+
+    ParquetOutputData state = new ParquetOutputData();
+    ParquetOutput transform = createTransform(config, state);
+    transform.setInputRowMeta(idAndNameRowMeta());
+
+    transform.resolveOutputFields();
+
+    assertEquals(2, state.outputFields.size());
+    assertEquals("identifier", state.outputFields.get(0).getTargetFieldName());
+    assertEquals("name", state.outputFields.get(1).getTargetFieldName());
+  }
+
+  /** Input row layout shared by the field-resolution tests: integer "id", then string "name". */
+  private static RowMeta idAndNameRowMeta() {
+    RowMeta rowMeta = new RowMeta();
+    rowMeta.addValueMeta(new ValueMetaInteger("id"));
+    rowMeta.addValueMeta(new ValueMetaString("name"));
+    return rowMeta;
+  }
+
+  /**
+   * Builds a {@link ParquetOutput} on a real pipeline meta and local engine that contain only this
+   * transform.
+   */
+  private ParquetOutput createTransform(ParquetOutputMeta meta, ParquetOutputData data) {
+    PipelineMeta pipelineMeta = new PipelineMeta();
+    TransformMeta transformMeta = new TransformMeta("Parquet Output", meta);
+    pipelineMeta.addTransform(transformMeta);
+    Pipeline engine = new LocalPipelineEngine(pipelineMeta);
+    return new ParquetOutput(transformMeta, meta, data, 0, pipelineMeta, engine);
+  }
+}
diff --git
a/plugins/tech/parquet/src/test/java/org/apache/hop/parquet/transforms/output/ParquetResourceFileTest.java
b/plugins/tech/parquet/src/test/java/org/apache/hop/parquet/transforms/output/ParquetResourceFileTest.java
new file mode 100644
index 0000000000..318e677d15
--- /dev/null
+++
b/plugins/tech/parquet/src/test/java/org/apache/hop/parquet/transforms/output/ParquetResourceFileTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hop.parquet.transforms.output;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+import java.util.List;
+import org.apache.hop.core.RowMetaAndData;
+import org.apache.hop.core.exception.HopValueException;
+import org.apache.hop.core.row.IRowMeta;
+import org.apache.hop.core.row.IValueMeta;
+import org.apache.hop.junit.rules.RestoreHopEngineEnvironmentExtension;
+import org.apache.hop.parquet.transforms.input.ParquetField;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Reads the sample Parquet file from the test resources and checks its schema and row content.
+ *
+ * <p>Both tests are currently annotated with {@link Disabled}. The {@code ParquetField} used here
+ * is the explicitly imported one from the input package.
+ */
+@ExtendWith(RestoreHopEngineEnvironmentExtension.class)
+class ParquetResourceFileTest {
+
+  private static final String SAMPLE_FILE = "hello.parquet-00-0001.parquet";
+
+  /** Expects four columns: user_id and json as strings, age as integer, sex as boolean. */
+  @Test
+  @Disabled("testReadSampleParquetFileSchema")
+  void testReadSampleParquetFileSchema() throws Exception {
+    String path = ParquetTestUtil.resourceFilePath(getClass(), SAMPLE_FILE);
+    IRowMeta schema = ParquetTestUtil.readSchema(path);
+
+    String[] expectedNames = {"user_id", "json", "age", "sex"};
+    int[] expectedTypes = {
+      IValueMeta.TYPE_STRING,
+      IValueMeta.TYPE_STRING,
+      IValueMeta.TYPE_INTEGER,
+      IValueMeta.TYPE_BOOLEAN
+    };
+
+    assertEquals(4, schema.size());
+    for (int i = 0; i < expectedNames.length; i++) {
+      assertEquals(expectedNames[i], schema.getValueMeta(i).getName());
+    }
+    for (int i = 0; i < expectedTypes.length; i++) {
+      assertEquals(expectedTypes[i], schema.getValueMeta(i).getType());
+    }
+  }
+
+  /** Expects two rows; they are ordered by user_id before being compared. */
+  @Test
+  @Disabled("testReadSampleParquetFileRows")
+  void testReadSampleParquetFileRows() throws Exception {
+    String path = ParquetTestUtil.resourceFilePath(getClass(), SAMPLE_FILE);
+
+    IRowMeta schema = ParquetTestUtil.readSchema(path);
+    List<ParquetField> readFields = ParquetTestUtil.fieldsFromRowMeta(schema);
+    List<RowMetaAndData> rows = ParquetTestUtil.readAllRows(path, readFields);
+
+    assertEquals(2, rows.size());
+
+    // Sort so the assertions below do not depend on the order in which rows were read.
+    rows.sort((left, right) -> userIdOf(left).compareTo(userIdOf(right)));
+
+    assertRow(rows.get(0), "item.100", "hello", 12L, true);
+    assertRow(rows.get(1), "item.200", "world", 20L, false);
+  }
+
+  /** Returns the user_id of a row, rethrowing a conversion failure as an unchecked exception. */
+  private static String userIdOf(RowMetaAndData row) {
+    try {
+      IRowMeta meta = row.getRowMeta();
+      return meta.getString(row.getData(), indexOf(meta, "user_id"));
+    } catch (HopValueException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /** Asserts all four column values of one row, looking each column up by name. */
+  private static void assertRow(
+      RowMetaAndData row, String userId, String json, long age, boolean sex) throws Exception {
+    IRowMeta meta = row.getRowMeta();
+    Object[] values = row.getData();
+    assertNotNull(values);
+
+    assertEquals(userId, meta.getString(values, indexOf(meta, "user_id")));
+    assertEquals(json, meta.getString(values, indexOf(meta, "json")));
+    assertEquals(age, meta.getInteger(values, indexOf(meta, "age")));
+    assertEquals(sex, meta.getBoolean(values, indexOf(meta, "sex")));
+  }
+
+  /** Resolves a field name to its index, failing the test when the field does not exist. */
+  private static int indexOf(IRowMeta rowMeta, String fieldName) {
+    int position = rowMeta.indexOfValue(fieldName);
+    if (position >= 0) {
+      return position;
+    }
+    throw new AssertionError("Field '" + fieldName + "' not found");
+  }
+}
diff --git
a/plugins/tech/parquet/src/test/java/org/apache/hop/parquet/transforms/output/ParquetTestUtil.java
b/plugins/tech/parquet/src/test/java/org/apache/hop/parquet/transforms/output/ParquetTestUtil.java
new file mode 100644
index 0000000000..5cc01b5642
--- /dev/null
+++
b/plugins/tech/parquet/src/test/java/org/apache/hop/parquet/transforms/output/ParquetTestUtil.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hop.parquet.transforms.output;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import org.apache.hop.core.RowMetaAndData;
+import org.apache.hop.core.exception.HopException;
+import org.apache.hop.core.row.IRowMeta;
+import org.apache.hop.core.row.IValueMeta;
+import org.apache.hop.core.variables.IVariables;
+import org.apache.hop.core.variables.Variables;
+import org.apache.hop.parquet.transforms.input.ParquetField;
+import org.apache.hop.parquet.transforms.input.ParquetInputMeta;
+import org.apache.hop.parquet.transforms.input.ParquetReadSupport;
+import org.apache.hop.parquet.transforms.input.ParquetReaderBuilder;
+import org.apache.hop.parquet.transforms.input.ParquetStream;
+import org.apache.parquet.hadoop.ParquetReader;
+
+/** Static helpers for tests that need to locate, inspect and read Parquet files. */
+final class ParquetTestUtil {
+
+  private ParquetTestUtil() {}
+
+  /**
+   * Resolves a classpath resource to an absolute file system path.
+   *
+   * @param clazz class whose class loader is used for the lookup
+   * @param resourceName resource name relative to the classpath root (no leading slash)
+   * @return the absolute path of the resource as a string
+   * @throws NullPointerException if the resource cannot be found
+   * @throws URISyntaxException if the resource URL cannot be converted to a URI
+   */
+  static String resourceFilePath(Class<?> clazz, String resourceName) throws URISyntaxException {
+    var resource =
+        Objects.requireNonNull(
+            clazz.getResource("/" + resourceName), "Missing test resource: " + resourceName);
+    Path location = Path.of(resource.toURI());
+    return location.toAbsolutePath().toString();
+  }
+
+  /**
+   * Creates one input {@code ParquetField} per value in the row meta, using the value name for both
+   * name arguments and copying type description, format mask, length and precision.
+   */
+  static List<ParquetField> fieldsFromRowMeta(IRowMeta rowMeta) {
+    List<ParquetField> result = new ArrayList<>();
+    for (int index = 0; index < rowMeta.size(); index++) {
+      IValueMeta value = rowMeta.getValueMeta(index);
+      String name = value.getName();
+      ParquetField field =
+          new ParquetField(
+              name,
+              name,
+              value.getTypeDesc(),
+              value.getFormatMask(),
+              Integer.toString(value.getLength()),
+              Integer.toString(value.getPrecision()));
+      result.add(field);
+    }
+    return result;
+  }
+
+  /** Reads the schema of a Parquet file using a fresh, empty set of variables. */
+  static IRowMeta readSchema(String filename) throws HopException {
+    return readSchema(new Variables(), filename);
+  }
+
+  /** Reads the schema of a Parquet file by delegating to {@code ParquetInputMeta}. */
+  static IRowMeta readSchema(IVariables variables, String filename) throws HopException {
+    return ParquetInputMeta.extractRowMeta(variables, filename);
+  }
+
+  /**
+   * Loads the whole file into memory and reads every row with the given fields.
+   *
+   * @param filename path of the Parquet file to read
+   * @param fields fields handed to the read support
+   * @return a copy of every row, in read order
+   * @throws IOException if the file cannot be read
+   */
+  static List<RowMetaAndData> readAllRows(String filename, List<ParquetField> fields)
+      throws IOException {
+    byte[] content = Files.readAllBytes(Path.of(filename));
+    ParquetStream source = new ParquetStream(content, filename);
+    ParquetReadSupport support = new ParquetReadSupport(fields);
+
+    List<RowMetaAndData> collected = new ArrayList<>();
+    try (ParquetReader<RowMetaAndData> reader =
+        new ParquetReaderBuilder<>(support, source).build()) {
+      for (RowMetaAndData current = reader.read(); current != null; current = reader.read()) {
+        // ParquetReader reuses the same RowMetaAndData instance on every read().
+        collected.add(current.clone());
+      }
+    }
+    return collected;
+  }
+}
diff --git
a/plugins/tech/parquet/src/test/java/org/apache/hop/parquet/transforms/output/ParquetVersionTest.java
b/plugins/tech/parquet/src/test/java/org/apache/hop/parquet/transforms/output/ParquetVersionTest.java
new file mode 100644
index 0000000000..5a0ce6e236
--- /dev/null
+++
b/plugins/tech/parquet/src/test/java/org/apache/hop/parquet/transforms/output/ParquetVersionTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hop.parquet.transforms.output;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import org.junit.jupiter.api.Test;
+
+/** Unit test for {@link ParquetVersion} */
+class ParquetVersionTest {
+
+ @Test
+ void testGetCode() {
+ assertEquals("1.0", ParquetVersion.Version1.getCode());
+ assertEquals("2.0", ParquetVersion.Version2.getCode());
+ }
+
+ @Test
+ void testGetDescriptions() {
+ String[] descriptions = ParquetVersion.getDescriptions();
+ assertEquals(2, descriptions.length);
+ assertEquals("Parquet 1.0", descriptions[0]);
+ assertEquals("Parquet 2.0", descriptions[1]);
+ }
+
+ @Test
+ void testGetVersionFromDescription() {
+ assertEquals(ParquetVersion.Version1,
ParquetVersion.getVersionFromDescription("Parquet 1.0"));
+ assertEquals(ParquetVersion.Version2,
ParquetVersion.getVersionFromDescription("Parquet 2.0"));
+ assertEquals(ParquetVersion.Version1,
ParquetVersion.getVersionFromDescription("unknown"));
+ }
+}
diff --git
a/plugins/tech/parquet/src/test/java/org/apache/hop/parquet/transforms/output/ParquetWriteSupportTest.java
b/plugins/tech/parquet/src/test/java/org/apache/hop/parquet/transforms/output/ParquetWriteSupportTest.java
new file mode 100644
index 0000000000..992de70114
--- /dev/null
+++
b/plugins/tech/parquet/src/test/java/org/apache/hop/parquet/transforms/output/ParquetWriteSupportTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hop.parquet.transforms.output;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+import java.util.List;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hop.core.RowMetaAndData;
+import org.apache.hop.core.row.RowMeta;
+import org.apache.hop.core.row.value.ValueMetaInteger;
+import org.apache.hop.core.row.value.ValueMetaString;
+import org.apache.parquet.avro.AvroSchemaConverter;
+import org.apache.parquet.hadoop.api.WriteSupport;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.io.api.RecordConsumer;
+import org.apache.parquet.schema.MessageType;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Unit test for {@link ParquetWriteSupport}.
+ *
+ * <p>All tests use the same two-column schema: a required long {@code id} followed by a required
+ * string {@code name}.
+ */
+class ParquetWriteSupportTest {
+
+  /** {@code init()} must expose the message type the write support was constructed with. */
+  @Test
+  void testInitReturnsWriteContext() {
+    Schema avroSchema = idAndNameSchema();
+    MessageType messageType = new AvroSchemaConverter().convert(avroSchema);
+    ParquetWriteSupport support = newWriteSupport(avroSchema, messageType);
+
+    WriteSupport.WriteContext context = support.init(new Configuration());
+
+    assertEquals(messageType, context.getSchema());
+  }
+
+  /**
+   * Writing one row must open and close the message and both fields, and must hand both values to
+   * the record consumer.
+   */
+  @Test
+  void testWriteIntegerAndString() throws Exception {
+    Schema avroSchema = idAndNameSchema();
+    MessageType messageType = new AvroSchemaConverter().convert(avroSchema);
+    ParquetWriteSupport support = newWriteSupport(avroSchema, messageType);
+    RecordConsumer consumer = mock(RecordConsumer.class);
+    support.prepareForWrite(consumer);
+
+    RowMeta rowMeta = new RowMeta();
+    rowMeta.addValueMeta(new ValueMetaInteger("id"));
+    rowMeta.addValueMeta(new ValueMetaString("name"));
+    support.write(new RowMetaAndData(rowMeta, new Object[] {42L, "Alice"}));
+
+    verify(consumer).startMessage();
+    verify(consumer).startField("id", 0);
+    verify(consumer).addLong(42L);
+    verify(consumer).endField("id", 0);
+    verify(consumer).startField("name", 1);
+    // Without this check a writer that opened and closed the field but dropped the value would
+    // still pass. Binary compares by content, so any Binary holding "Alice" matches.
+    verify(consumer).addBinary(Binary.fromString("Alice"));
+    verify(consumer).endField("name", 1);
+    verify(consumer).endMessage();
+  }
+
+  /** Builds the Avro record schema shared by the tests: required long id, required string name. */
+  private static Schema idAndNameSchema() {
+    return SchemaBuilder.record("ApacheHopParquetSchema")
+        .fields()
+        .requiredLong("id")
+        .requiredString("name")
+        .endRecord();
+  }
+
+  /**
+   * Creates the write support under test for the shared schema, with source indexes 0 and 1 and
+   * the matching {@code id} and {@code name} fields.
+   */
+  private static ParquetWriteSupport newWriteSupport(Schema avroSchema, MessageType messageType) {
+    List<Integer> indexes = List.of(0, 1);
+    List<ParquetField> fields =
+        List.of(new ParquetField("id", "id"), new ParquetField("name", "name"));
+    return new ParquetWriteSupport(messageType, avroSchema, indexes, fields);
+  }
+}
diff --git
a/plugins/tech/parquet/src/test/java/org/apache/hop/parquet/transforms/output/ParquetWriterBuilderTest.java
b/plugins/tech/parquet/src/test/java/org/apache/hop/parquet/transforms/output/ParquetWriterBuilderTest.java
new file mode 100644
index 0000000000..2c96a01b6b
--- /dev/null
+++
b/plugins/tech/parquet/src/test/java/org/apache/hop/parquet/transforms/output/ParquetWriterBuilderTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hop.parquet.transforms.output;
+
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+
+import java.io.ByteArrayOutputStream;
+import java.util.List;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.avro.AvroSchemaConverter;
+import org.apache.parquet.hadoop.api.WriteSupport;
+import org.apache.parquet.schema.MessageType;
+import org.junit.jupiter.api.Test;
+
+/** Unit test for {@link ParquetWriterBuilder}. */
+class ParquetWriterBuilderTest {
+
+  /** The builder must supply a {@link ParquetWriteSupport} as its write support. */
+  @Test
+  void testGetWriteSupport() {
+    Schema schema =
+        SchemaBuilder.record("ApacheHopParquetSchema").fields().requiredLong("id").endRecord();
+    MessageType parquetType = new AvroSchemaConverter().convert(schema);
+    ParquetOutputFile target = new ParquetOutputFile(new ByteArrayOutputStream());
+    List<Integer> sourceIndexes = List.of(0);
+    List<ParquetField> outputFields = List.of(new ParquetField("id", "id"));
+
+    ParquetWriterBuilder builder =
+        new ParquetWriterBuilder(parquetType, schema, target, sourceIndexes, outputFields);
+    WriteSupport<?> writeSupport = builder.getWriteSupport(new Configuration());
+
+    assertInstanceOf(ParquetWriteSupport.class, writeSupport);
+  }
+}
diff --git
a/plugins/tech/parquet/src/test/resources/hello.parquet-00-0001.parquet
b/plugins/tech/parquet/src/test/resources/hello.parquet-00-0001.parquet
new file mode 100644
index 0000000000..6205c01c80
Binary files /dev/null and
b/plugins/tech/parquet/src/test/resources/hello.parquet-00-0001.parquet differ