This is an automated email from the ASF dual-hosted git repository.
mboehm7 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/main by this push:
new d19f505c84 [SYSTEMDS-2229] Extended I/O Framework: Readers/Writers for Parquet
d19f505c84 is described below
commit d19f505c844dfeb0dc5933653a06deb29bc61bfd
Author: sayedkeika <[email protected]>
AuthorDate: Fri Apr 18 14:59:26 2025 +0200
[SYSTEMDS-2229] Extended I/O Framework: Readers/Writers for Parquet
Closes #2229.
---
src/main/java/org/apache/sysds/common/Types.java | 1 +
.../sysds/runtime/io/FrameReaderParquet.java | 157 +++++++++++++++
.../runtime/io/FrameReaderParquetParallel.java | 118 +++++++++++
.../sysds/runtime/io/FrameWriterParquet.java | 199 ++++++++++++++++++
.../runtime/io/FrameWriterParquetParallel.java | 120 +++++++++++
.../sysds/test/functions/io/SeqParReadTest2.java | 10 +
.../io/parquet/FrameParquetSchemaTest.java | 223 +++++++++++++++++++++
7 files changed, 828 insertions(+)
diff --git a/src/main/java/org/apache/sysds/common/Types.java b/src/main/java/org/apache/sysds/common/Types.java
index c9820a2c09..e6d5f8e945 100644
--- a/src/main/java/org/apache/sysds/common/Types.java
+++ b/src/main/java/org/apache/sysds/common/Types.java
@@ -868,6 +868,7 @@ public interface Types {
PROTO, // protocol buffer representation
HDF5, // Hierarchical Data Format (HDF)
COG, // Cloud-optimized GeoTIFF
+ PARQUET, // parquet format for columnar data storage
UNKNOWN;
public boolean isIJV() {
diff --git a/src/main/java/org/apache/sysds/runtime/io/FrameReaderParquet.java b/src/main/java/org/apache/sysds/runtime/io/FrameReaderParquet.java
new file mode 100644
index 0000000000..ff23e9ea31
--- /dev/null
+++ b/src/main/java/org/apache/sysds/runtime/io/FrameReaderParquet.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sysds.runtime.io;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.sysds.common.Types.ValueType;
+import org.apache.sysds.conf.ConfigurationManager;
+import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.frame.data.FrameBlock;
+import org.apache.sysds.runtime.util.HDFSTool;
+
+/**
+ * Single-threaded frame parquet reader.
+ *
+ */
+public class FrameReaderParquet extends FrameReader {
+
+ /**
+ * Reads a Parquet file from HDFS and converts it into a FrameBlock.
+ *
+ * @param fname The HDFS file path to the Parquet file.
+ * @param schema The expected data types of the columns.
+ * @param names The names of the columns.
+ * @param rlen The expected number of rows.
+ * @param clen The expected number of columns.
+ * @return A FrameBlock containing the data read from the Parquet file.
+ */
+	@Override
+	public FrameBlock readFrameFromHDFS(String fname, ValueType[] schema, String[] names, long rlen, long clen) throws IOException, DMLRuntimeException {
+		// Prepare file access
+		Configuration conf = ConfigurationManager.getCachedJobConf();
+		Path path = new Path(fname);
+
+		// Check existence and non-empty file
+		if (!HDFSTool.existsFileOnHDFS(path.toString())) {
+			throw new IOException("File does not exist on HDFS: " + fname);
+		}
+
+		// Allocate output frame block
+		ValueType[] lschema = createOutputSchema(schema, clen);
+		String[] lnames = createOutputNames(names, clen);
+		FrameBlock ret = createOutputFrameBlock(lschema, lnames, rlen);
+
+		// Read Parquet file
+		readParquetFrameFromHDFS(path, conf, ret, lschema, rlen, clen);
+
+		return ret;
+	}
+
+	/**
+	 * Reads data from a Parquet file on HDFS and fills the provided FrameBlock.
+	 * The method retrieves the Parquet schema from the file footer, maps the required column names
+	 * to their corresponding indices, and then uses a ParquetReader to iterate over each row.
+	 * Data is extracted based on the column type and set into the output FrameBlock.
+	 *
+	 * @param path The HDFS path to the Parquet file.
+	 * @param conf The Hadoop configuration.
+	 * @param dest The FrameBlock to populate with data.
+	 * @param schema The expected value types for the output columns.
+	 * @param rlen The expected number of rows.
+	 * @param clen The expected number of columns.
+	 */
+	protected void readParquetFrameFromHDFS(Path path, Configuration conf, FrameBlock dest, ValueType[] schema, long rlen, long clen) throws IOException {
+		// Retrieve schema from Parquet footer (close the file reader to avoid a resource leak)
+		ParquetMetadata metadata;
+		try (ParquetFileReader fileReader = ParquetFileReader.open(HadoopInputFile.fromPath(path, conf))) {
+			metadata = fileReader.getFooter();
+		}
+		MessageType parquetSchema = metadata.getFileMetaData().getSchema();
+
+		// Map column names to Parquet schema indices
+		String[] columnNames = dest.getColumnNames();
+		int[] columnIndices = new int[columnNames.length];
+		for (int i = 0; i < columnNames.length; i++) {
+			columnIndices[i] = parquetSchema.getFieldIndex(columnNames[i]);
+		}
+
+		// Read data using ParquetReader
+		try (ParquetReader<Group> rowReader = ParquetReader.builder(new GroupReadSupport(), path)
+				.withConf(conf)
+				.build()) {
+
+			Group group;
+			int row = 0;
+			while ((group = rowReader.read()) != null) {
+				for (int col = 0; col < clen; col++) {
+					int colIndex = columnIndices[col];
+					if (group.getFieldRepetitionCount(colIndex) > 0) {
+						PrimitiveType.PrimitiveTypeName type = parquetSchema.getType(columnNames[col]).asPrimitiveType().getPrimitiveTypeName();
+						switch (type) {
+							case INT32:
+								dest.set(row, col, group.getInteger(colIndex, 0));
+								break;
+							case INT64:
+								dest.set(row, col, group.getLong(colIndex, 0));
+								break;
+							case FLOAT:
+								dest.set(row, col, group.getFloat(colIndex, 0));
+								break;
+							case DOUBLE:
+								dest.set(row, col, group.getDouble(colIndex, 0));
+								break;
+							case BOOLEAN:
+								dest.set(row, col, group.getBoolean(colIndex, 0));
+								break;
+							case BINARY:
+								dest.set(row, col, group.getBinary(colIndex, 0).toStringUsingUTF8());
+								break;
+							default:
+								throw new IOException("Unsupported data type: " + type);
+						}
+					} else {
+						dest.set(row, col, null);
+					}
+				}
+				row++;
+			}
+
+			// Check frame dimensions
+			if (row != rlen) {
+				throw new IOException("Mismatch in row count: expected " + rlen + ", but got " + row);
+			}
+		}
+	}
+
+	// not implemented
+	@Override
+	public FrameBlock readFrameFromInputStream(InputStream is, ValueType[] schema, String[] names, long rlen, long clen)
+		throws IOException, DMLRuntimeException {
+		throw new UnsupportedOperationException("Unimplemented method 'readFrameFromInputStream'");
+	}
+}
\ No newline at end of file
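
For context, a minimal usage sketch of the new single-threaded reader (the path, schema, dimensions, and class name below are illustrative assumptions, not part of this commit):

    import org.apache.sysds.common.Types.ValueType;
    import org.apache.sysds.runtime.frame.data.FrameBlock;
    import org.apache.sysds.runtime.io.FrameReaderParquet;

    public class ParquetReadSketch {
        public static void main(String[] args) throws Exception {
            // Expected column types and names of the Parquet file (assumed)
            ValueType[] schema = {ValueType.INT64, ValueType.STRING};
            String[] names = {"id", "name"};
            // Read a hypothetical 5x2 frame from HDFS
            FrameBlock fb = new FrameReaderParquet()
                .readFrameFromHDFS("hdfs:/tmp/frame.parquet", schema, names, 5, 2);
            System.out.println(fb.getNumRows() + " rows read");
        }
    }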
diff --git a/src/main/java/org/apache/sysds/runtime/io/FrameReaderParquetParallel.java b/src/main/java/org/apache/sysds/runtime/io/FrameReaderParquetParallel.java
new file mode 100644
index 0000000000..3d40f53c62
--- /dev/null
+++ b/src/main/java/org/apache/sysds/runtime/io/FrameReaderParquetParallel.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sysds.runtime.io;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.apache.sysds.common.Types.ValueType;
+import org.apache.sysds.hops.OptimizerUtils;
+import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.frame.data.FrameBlock;
+import org.apache.sysds.runtime.util.CommonThreadPool;
+
+/**
+ * Multi-threaded frame parquet reader.
+ *
+ */
+public class FrameReaderParquetParallel extends FrameReaderParquet {
+
+	/**
+	 * Reads a Parquet frame in parallel and populates the provided FrameBlock with the data.
+	 * The method retrieves all file paths from the sequence files at that location and then determines
+	 * the number of threads to use based on the available files and the configured parallelism setting.
+	 * A thread pool is created to run a reading task for each file concurrently.
+	 *
+	 * @param path The HDFS path to the Parquet file or the directory containing sequence files.
+	 * @param conf The Hadoop configuration.
+	 * @param dest The FrameBlock to be updated with the data read from the files.
+	 * @param schema The expected value types for the frame columns.
+	 * @param rlen The expected number of rows.
+	 * @param clen The expected number of columns.
+	 */
+	@Override
+	protected void readParquetFrameFromHDFS(Path path, Configuration conf, FrameBlock dest, ValueType[] schema, long rlen, long clen) throws IOException, DMLRuntimeException {
+		FileSystem fs = IOUtilFunctions.getFileSystem(path);
+		Path[] files = IOUtilFunctions.getSequenceFilePaths(fs, path);
+		int numThreads = Math.min(OptimizerUtils.getParallelBinaryReadParallelism(), files.length);
+
+		// Create and execute read tasks
+		ExecutorService pool = CommonThreadPool.get(numThreads);
+		try {
+			List<ReadFileTask> tasks = new ArrayList<>();
+			for (Path file : files) {
+				tasks.add(new ReadFileTask(file, conf, dest, schema, clen));
+			}
+
+			for (Future<Object> task : pool.invokeAll(tasks)) {
+				task.get();
+			}
+		} catch (Exception e) {
+			throw new IOException("Failed parallel read of Parquet frame.", e);
+		} finally {
+			pool.shutdown();
+		}
+	}
+
+	private class ReadFileTask implements Callable<Object> {
+		private Path path;
+		private Configuration conf;
+		private FrameBlock dest;
+		@SuppressWarnings("unused")
+		private ValueType[] schema;
+		private long clen;
+
+		public ReadFileTask(Path path, Configuration conf, FrameBlock dest, ValueType[] schema, long clen) {
+			this.path = path;
+			this.conf = conf;
+			this.dest = dest;
+			this.schema = schema;
+			this.clen = clen;
+		}
+
+		// When executed, the task opens a ParquetReader for the assigned file and iterates over each row, processing every column.
+		@Override
+		public Object call() throws Exception {
+			try (ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), path).withConf(conf).build()) {
+				Group group;
+				int row = 0;
+				while ((group = reader.read()) != null) {
+					for (int col = 0; col < clen; col++) {
+						if (group.getFieldRepetitionCount(col) > 0) {
+							dest.set(row, col, group.getValueToString(col, 0));
+						} else {
+							dest.set(row, col, null);
+						}
+					}
+					row++;
+				}
+			}
+			return null;
+		}
+	}
+}
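
The parallel reader assumes the directory layout produced by FrameWriterParquetParallel (one part file per write task) and fans out one read task per file. A minimal selection sketch between the two readers, mirroring the factory logic in SeqParReadTest2 further below (class and method names are illustrative):

    import org.apache.sysds.runtime.io.FrameReader;
    import org.apache.sysds.runtime.io.FrameReaderParquet;
    import org.apache.sysds.runtime.io.FrameReaderParquetParallel;

    public final class ParquetReaderSelection {
        // Pick the multi-threaded reader only when parallel I/O is requested
        public static FrameReader create(boolean parallel) {
            return parallel ? new FrameReaderParquetParallel() : new FrameReaderParquet();
        }
    }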
diff --git a/src/main/java/org/apache/sysds/runtime/io/FrameWriterParquet.java b/src/main/java/org/apache/sysds/runtime/io/FrameWriterParquet.java
new file mode 100644
index 0000000000..ccaeeb56d5
--- /dev/null
+++ b/src/main/java/org/apache/sysds/runtime/io/FrameWriterParquet.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sysds.runtime.io;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.MessageTypeParser;
+import org.apache.sysds.conf.ConfigurationManager;
+import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.frame.data.FrameBlock;
+import org.apache.sysds.runtime.util.HDFSTool;
+import org.apache.sysds.common.Types.ValueType;
+
+/**
+ * Single-threaded frame parquet writer.
+ *
+ */
+public class FrameWriterParquet extends FrameWriter {
+
+	/**
+	 * Writes a FrameBlock to a Parquet file on HDFS.
+	 *
+	 * @param src The FrameBlock containing the data to write.
+	 * @param fname The HDFS file path where the Parquet file will be stored.
+	 * @param rlen The expected number of rows.
+	 * @param clen The expected number of columns.
+	 */
+	@Override
+	public final void writeFrameToHDFS(FrameBlock src, String fname, long rlen, long clen) throws IOException, DMLRuntimeException {
+		// Prepare file access
+		JobConf conf = ConfigurationManager.getCachedJobConf();
+		Path path = new Path(fname);
+
+		// If the file already exists on HDFS, remove it
+		HDFSTool.deleteFileIfExistOnHDFS(path, conf);
+
+		// Check frame dimensions
+		if (src.getNumRows() != rlen || src.getNumColumns() != clen) {
+			throw new IOException("Frame dimensions mismatch with metadata: " + src.getNumRows() + "x" + src.getNumColumns() + " vs " + rlen + "x" + clen + ".");
+		}
+
+		// Write parquet file
+		writeParquetFrameToHDFS(path, conf, src);
+	}
+
+	/**
+	 * Writes the FrameBlock data to a Parquet file using a ParquetWriter.
+	 * The method generates a Parquet schema from the metadata of the FrameBlock, initializes a ParquetWriter
+	 * with the specified configuration, and iterates over each row and column, adding values (buffered in
+	 * batches for improved performance) using type-specific conversions.
+	 *
+	 * @param path The HDFS path where the Parquet file will be written.
+	 * @param conf The Hadoop configuration.
+	 * @param src The FrameBlock containing the data to write.
+	 */
+	protected void writeParquetFrameToHDFS(Path path, Configuration conf, FrameBlock src)
+		throws IOException
+	{
+		FileSystem fs = IOUtilFunctions.getFileSystem(path, conf);
+
+		// Create schema based on frame block metadata
+		MessageType schema = createParquetSchema(src);
+
+		// TODO: Experiment with different batch sizes?
+		int batchSize = 1000;
+		int rowCount = 0;
+
+		// Write data using ParquetWriter //FIXME replace example writer?
+		try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(path)
+				.withConf(conf)
+				.withType(schema)
+				.withCompressionCodec(ParquetWriter.DEFAULT_COMPRESSION_CODEC_NAME)
+				.withRowGroupSize((long) ParquetWriter.DEFAULT_BLOCK_SIZE)
+				.withPageSize(ParquetWriter.DEFAULT_PAGE_SIZE)
+				.withDictionaryEncoding(true)
+				.build())
+		{
+			SimpleGroupFactory groupFactory = new SimpleGroupFactory(schema);
+			List<Group> rowBuffer = new ArrayList<>(batchSize);
+
+			for (int i = 0; i < src.getNumRows(); i++) {
+				Group group = groupFactory.newGroup();
+				for (int j = 0; j < src.getNumColumns(); j++) {
+					Object value = src.get(i, j);
+					if (value != null) {
+						ValueType type = src.getSchema()[j];
+						switch (type) {
+							case STRING:
+								group.add(src.getColumnNames()[j], value.toString());
+								break;
+							case INT32:
+								group.add(src.getColumnNames()[j], (int) value);
+								break;
+							case INT64:
+								group.add(src.getColumnNames()[j], (long) value);
+								break;
+							case FP32:
+								group.add(src.getColumnNames()[j], (float) value);
+								break;
+							case FP64:
+								group.add(src.getColumnNames()[j], (double) value);
+								break;
+							case BOOLEAN:
+								group.add(src.getColumnNames()[j], (boolean) value);
+								break;
+							default:
+								throw new IOException("Unsupported value type: " + type);
+						}
+					}
+				}
+				rowBuffer.add(group);
+				rowCount++;
+
+				if (rowCount >= batchSize) {
+					for (Group g : rowBuffer) {
+						writer.write(g);
+					}
+					rowBuffer.clear();
+					rowCount = 0;
+				}
+			}
+
+			for (Group g : rowBuffer) {
+				writer.write(g);
+			}
+		}
+
+		// Delete CRC files created by Hadoop if necessary
+		IOUtilFunctions.deleteCrcFilesFromLocalFileSystem(fs, path);
+	}
+
+	/**
+	 * Creates a Parquet schema based on the metadata of a FrameBlock.
+	 *
+	 * @param src The FrameBlock whose metadata is used to create the Parquet schema.
+	 * @return The generated Parquet MessageType schema.
+	 */
+	protected MessageType createParquetSchema(FrameBlock src) {
+		StringBuilder schemaBuilder = new StringBuilder("message FrameSchema {");
+		String[] columnNames = src.getColumnNames();
+		ValueType[] columnTypes = src.getSchema();
+
+		for (int i = 0; i < src.getNumColumns(); i++) {
+			schemaBuilder.append("optional ");
+			switch (columnTypes[i]) {
+				case STRING:
+					schemaBuilder.append("binary ").append(columnNames[i]).append(" (UTF8);");
+					break;
+				case INT32:
+					schemaBuilder.append("int32 ").append(columnNames[i]).append(";");
+					break;
+				case INT64:
+					schemaBuilder.append("int64 ").append(columnNames[i]).append(";");
+					break;
+				case FP32:
+					schemaBuilder.append("float ").append(columnNames[i]).append(";");
+					break;
+				case FP64:
+					schemaBuilder.append("double ").append(columnNames[i]).append(";");
+					break;
+				case BOOLEAN:
+					schemaBuilder.append("boolean ").append(columnNames[i]).append(";");
+					break;
+				default:
+					throw new IllegalArgumentException("Unsupported data type: " + columnTypes[i]);
+			}
+		}
+		schemaBuilder.append("}");
+		return MessageTypeParser.parseMessageType(schemaBuilder.toString());
+	}
+}
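
For illustration, createParquetSchema above builds a flat message type with one optional field per column. A reconstructed sketch of the string it assembles for a frame with value types {FP64, STRING}, assuming the default SystemDS column names C1 and C2:

    import org.apache.parquet.schema.MessageType;
    import org.apache.parquet.schema.MessageTypeParser;

    public class ParquetSchemaSketch {
        public static void main(String[] args) {
            // Reconstructed example of the schema string built by
            // createParquetSchema for value types {FP64, STRING};
            // column names C1 and C2 are assumed defaults.
            String schemaStr = "message FrameSchema {"
                + "optional double C1;"
                + "optional binary C2 (UTF8);"
                + "}";
            MessageType mt = MessageTypeParser.parseMessageType(schemaStr);
            System.out.println(mt);
        }
    }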
diff --git a/src/main/java/org/apache/sysds/runtime/io/FrameWriterParquetParallel.java b/src/main/java/org/apache/sysds/runtime/io/FrameWriterParquetParallel.java
new file mode 100644
index 0000000000..0ef4431ef4
--- /dev/null
+++ b/src/main/java/org/apache/sysds/runtime/io/FrameWriterParquetParallel.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sysds.runtime.io;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.sysds.conf.DMLConfig;
+import org.apache.sysds.hops.OptimizerUtils;
+import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.frame.data.FrameBlock;
+import org.apache.sysds.runtime.util.CommonThreadPool;
+import org.apache.sysds.runtime.util.HDFSTool;
+import org.apache.sysds.utils.stats.InfrastructureAnalyzer;
+
+/**
+ * Multi-threaded frame parquet writer.
+ *
+ */
+public class FrameWriterParquetParallel extends FrameWriterParquet {
+
+	/**
+	 * Writes the FrameBlock data to HDFS in parallel.
+	 * The method estimates the number of output partitions by comparing the total number of cells in the
+	 * FrameBlock with the HDFS block size. It then determines the number of threads to use based on the
+	 * parallelism configuration and the number of partitions. If multiple threads are used, it divides the
+	 * FrameBlock into chunks and a thread pool executes a write task for each partition concurrently.
+	 *
+	 * @param path The HDFS path where the Parquet files will be written.
+	 * @param conf The Hadoop configuration.
+	 * @param src The FrameBlock containing the data to write.
+	 */
+	@Override
+	protected void writeParquetFrameToHDFS(Path path, Configuration conf, FrameBlock src)
+		throws IOException, DMLRuntimeException
+	{
+		// Estimate number of output partitions
+		int numPartFiles = Math.max((int) (src.getNumRows() * src.getNumColumns() / InfrastructureAnalyzer.getHDFSBlockSize()), 1);
+
+		// Determine parallelism
+		int numThreads = Math.min(OptimizerUtils.getParallelBinaryWriteParallelism(), numPartFiles);
+
+		// Fall back to sequential write if numThreads <= 1
+		if (numThreads <= 1) {
+			super.writeParquetFrameToHDFS(path, conf, src);
+			return;
+		}
+
+		// Create directory for concurrent tasks
+		HDFSTool.createDirIfNotExistOnHDFS(path, DMLConfig.DEFAULT_SHARED_DIR_PERMISSION);
+
+		// Create and execute write tasks
+		ExecutorService pool = CommonThreadPool.get(numThreads);
+		try {
+			List<WriteFileTask> tasks = new ArrayList<>();
+			int chunkSize = (int) Math.ceil((double) src.getNumRows() / numThreads);
+
+			for (int i = 0; i < numThreads; i++) {
+				int startRow = i * chunkSize;
+				int endRow = Math.min((i + 1) * chunkSize, (int) src.getNumRows());
+				if (startRow < endRow) {
+					Path newPath = new Path(path, IOUtilFunctions.getPartFileName(i));
+					tasks.add(new WriteFileTask(newPath, conf, src.slice(startRow, endRow - 1)));
+				}
+			}
+
+			for (Future<Object> task : pool.invokeAll(tasks))
+				task.get();
+		} catch (Exception e) {
+			throw new IOException("Failed parallel write of Parquet frame.", e);
+		} finally {
+			pool.shutdown();
+		}
+	}
+
+	protected void writeSingleParquetFile(Path path, Configuration conf, FrameBlock src)
+		throws IOException, DMLRuntimeException
+	{
+		super.writeParquetFrameToHDFS(path, conf, src);
+	}
+
+ private class WriteFileTask implements Callable<Object> {
+ private Path path;
+ private Configuration conf;
+ private FrameBlock src;
+
+		public WriteFileTask(Path path, Configuration conf, FrameBlock src) {
+			this.path = path;
+			this.conf = conf;
+			this.src = src;
+		}
+
+ @Override
+ public Object call() throws Exception {
+ writeSingleParquetFile(path, conf, src);
+ return null;
+ }
+ }
+}
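
To make the partition estimate above concrete, a short worked sketch of the same arithmetic, assuming a hypothetical 128 MB HDFS block size (the real value comes from InfrastructureAnalyzer.getHDFSBlockSize()):

    public class ParquetPartitionSketch {
        public static void main(String[] args) {
            // Worked example of the estimate in writeParquetFrameToHDFS:
            // a 1,000,000 x 10 frame has 10M cells, well under one 128 MB block,
            // so numPartFiles = max(0, 1) = 1 and the writer falls back to the
            // sequential path; larger frames are split across part files.
            long rows = 1_000_000L, cols = 10L;
            long blockSize = 134_217_728L; // 128 MB, assumed
            int numPartFiles = Math.max((int) (rows * cols / blockSize), 1);
            System.out.println(numPartFiles); // prints 1
        }
    }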
diff --git a/src/test/java/org/apache/sysds/test/functions/io/SeqParReadTest2.java b/src/test/java/org/apache/sysds/test/functions/io/SeqParReadTest2.java
index 6bdc428a90..a4070a119d 100644
--- a/src/test/java/org/apache/sysds/test/functions/io/SeqParReadTest2.java
+++ b/src/test/java/org/apache/sysds/test/functions/io/SeqParReadTest2.java
@@ -73,6 +73,10 @@ import org.apache.sysds.runtime.io.WriterTextCell;
import org.apache.sysds.runtime.io.WriterTextCellParallel;
import org.apache.sysds.runtime.io.WriterTextLIBSVM;
import org.apache.sysds.runtime.io.WriterTextLIBSVMParallel;
+import org.apache.sysds.runtime.io.FrameReaderParquet;
+import org.apache.sysds.runtime.io.FrameReaderParquetParallel;
+import org.apache.sysds.runtime.io.FrameWriterParquet;
+import org.apache.sysds.runtime.io.FrameWriterParquetParallel;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.util.DataConverter;
import org.apache.sysds.runtime.util.UtilFunctions;
@@ -159,6 +163,10 @@ public class SeqParReadTest2 extends AutomatedTestBase {
{true, "libsvm", false, 0.1},
{true, "libsvm", true, 0.7},
{true, "libsvm", true, 0.1},
+ {false, "parquet", false, 0.7},
+ {false, "parquet", false, 0.1},
+ {false, "parquet", true, 0.7},
+ {false, "parquet", true, 0.1},
};
return Arrays.asList(data);
}
@@ -255,6 +263,7 @@ public class SeqParReadTest2 extends AutomatedTestBase {
 				new FrameWriterTextCSVParallel(new FileFormatPropertiesCSV()) :
 				new FrameWriterTextCSV(new FileFormatPropertiesCSV());
 			case BINARY: return par ? new FrameWriterBinaryBlockParallel() : new FrameWriterBinaryBlock();
+			case PARQUET: return par ? new FrameWriterParquetParallel() : new FrameWriterParquet();
 		}
 		return null;
 	}
@@ -268,6 +277,7 @@ public class SeqParReadTest2 extends AutomatedTestBase {
 				new FrameReaderTextCSVParallel(new FileFormatPropertiesCSV()) :
 				new FrameReaderTextCSV(new FileFormatPropertiesCSV());
 			case BINARY: return par ? new FrameReaderBinaryBlockParallel() : new FrameReaderBinaryBlock();
+			case PARQUET: return par ? new FrameReaderParquetParallel() : new FrameReaderParquet();
 		}
 		return null;
 	}
diff --git a/src/test/java/org/apache/sysds/test/functions/io/parquet/FrameParquetSchemaTest.java b/src/test/java/org/apache/sysds/test/functions/io/parquet/FrameParquetSchemaTest.java
new file mode 100644
index 0000000000..dc776c8eab
--- /dev/null
+++ b/src/test/java/org/apache/sysds/test/functions/io/parquet/FrameParquetSchemaTest.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.functions.io.parquet;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.sysds.common.Types.ValueType;
+import org.apache.sysds.runtime.frame.data.FrameBlock;
+import org.apache.sysds.runtime.io.FrameReader;
+import org.apache.sysds.runtime.io.FrameReaderParquet;
+import org.apache.sysds.runtime.io.FrameReaderParquetParallel;
+import org.apache.sysds.runtime.io.FrameWriter;
+import org.apache.sysds.runtime.io.FrameWriterParquet;
+import org.apache.sysds.runtime.io.FrameWriterParquetParallel;
+import org.apache.sysds.test.AutomatedTestBase;
+import org.apache.sysds.test.TestConfiguration;
+
+import java.io.IOException;
+
+/**
+ * This test class verifies that a FrameBlock with different data types is correctly written to and read from
+ * Parquet files. It tests both sequential and parallel implementations. In these tests a FrameBlock is created,
+ * populated with sample data, written to a Parquet file, and then read back into a new FrameBlock. The test
+ * compares the original and read data to ensure that schema information is preserved and that data conversion
+ * is performed correctly.
+ */
+public class FrameParquetSchemaTest extends AutomatedTestBase {
+
+ private final static String TEST_NAME = "FrameParquetSchemaTest";
+	private final static String TEST_DIR = "functions/io/parquet/";
+	private final static String TEST_CLASS_DIR = TEST_DIR + FrameParquetSchemaTest.class.getSimpleName() + "/";
+
+	@Override
+	public void setUp() {
+		addTestConfiguration(TEST_NAME, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME, new String[]{"Rout"}));
+	}
+
+
+ /**
+ * Test for sequential writer and reader
+ *
+ */
+ @Test
+ public void testParquetWriteReadAllSchemaTypes() {
+ String fname = output("Rout");
+
+ // Define a schema with one column per type
+ ValueType[] schema = new ValueType[] {
+ ValueType.FP64,
+ ValueType.FP32,
+ ValueType.INT32,
+ ValueType.INT64,
+ ValueType.BOOLEAN,
+ ValueType.STRING
+ };
+
+ // Create an empty frame block with the above schema
+ FrameBlock fb = new FrameBlock(schema);
+
+ // Populate frame block
+ Object[][] rows = new Object[][] {
+ { 1.0, 1.1f, 10, 100L, true, "A" },
+ { 2.0, 2.1f, 20, 200L, false, "B" },
+ { 3.0, 3.1f, 30, 300L, true, "C" },
+ { 4.0, 4.1f, 40, 400L, false, "D" },
+ { 5.0, 5.1f, 50, 500L, true, "E" }
+ };
+
+ for (Object[] row : rows) {
+ fb.appendRow(row);
+ }
+
+ System.out.println(fb);
+
+ int numRows = fb.getNumRows();
+ int numCols = fb.getNumColumns();
+
+ // Write the FrameBlock to a Parquet file using the sequential writer
+ try {
+ FrameWriter writer = new FrameWriterParquet();
+ writer.writeFrameToHDFS(fb, fname, numRows, numCols);
+ }
+		catch (IOException e) {
+			e.printStackTrace();
+			Assert.fail("Failed to write frame block to Parquet: " + e.getMessage());
+		}
+
+ // Read the Parquet file back into a new FrameBlock
+ FrameBlock fbRead = null;
+ try {
+ FrameReader reader = new FrameReaderParquet();
+ String[] colNames = fb.getColumnNames();
+			fbRead = reader.readFrameFromHDFS(fname, schema, colNames, numRows, numCols);
+ }
+		catch (IOException e) {
+			e.printStackTrace();
+			Assert.fail("Failed to read frame block from Parquet: " + e.getMessage());
+		}
+
+ // Compare the original and the read frame blocks
+ compareFrameBlocks(fb, fbRead, 1e-6);
+ }
+
+ /**
+ * Test for multithreaded writer and reader
+ *
+ */
+ @Test
+ public void testParquetWriteReadAllSchemaTypesParallel() {
+ String fname = output("Rout_parallel");
+
+ ValueType[] schema = new ValueType[] {
+ ValueType.FP64,
+ ValueType.FP32,
+ ValueType.INT32,
+ ValueType.INT64,
+ ValueType.BOOLEAN,
+ ValueType.STRING
+ };
+
+ FrameBlock fb = new FrameBlock(schema);
+
+ Object[][] rows = new Object[][] {
+ { 1.0, 1.1f, 10, 100L, true, "A" },
+ { 2.0, 2.1f, 20, 200L, false, "B" },
+ { 3.0, 3.1f, 30, 300L, true, "C" },
+ { 4.0, 4.1f, 40, 400L, false, "D" },
+ { 5.0, 5.1f, 50, 500L, true, "E" }
+ };
+
+ for (Object[] row : rows) {
+ fb.appendRow(row);
+ }
+
+ int numRows = fb.getNumRows();
+ int numCols = fb.getNumColumns();
+
+ try {
+ FrameWriter writer = new FrameWriterParquetParallel();
+ writer.writeFrameToHDFS(fb, fname, numRows, numCols);
+ }
+		catch (IOException e) {
+			e.printStackTrace();
+			Assert.fail("Failed to write frame block to Parquet (parallel): " + e.getMessage());
+		}
+
+ FrameBlock fbRead = null;
+ try {
+ FrameReader reader = new FrameReaderParquetParallel();
+ String[] colNames = fb.getColumnNames();
+			fbRead = reader.readFrameFromHDFS(fname, schema, colNames, numRows, numCols);
+ }
+		catch (IOException e) {
+			e.printStackTrace();
+			Assert.fail("Failed to read frame block from Parquet (parallel): " + e.getMessage());
+		}
+
+ compareFrameBlocks(fb, fbRead, 1e-6);
+ }
+
+	private void compareFrameBlocks(FrameBlock expected, FrameBlock actual, double eps) {
+		Assert.assertEquals("Number of rows mismatch", expected.getNumRows(), actual.getNumRows());
+		Assert.assertEquals("Number of columns mismatch", expected.getNumColumns(), actual.getNumColumns());
+
+ int rows = expected.getNumRows();
+ int cols = expected.getNumColumns();
+
+ for (int i = 0; i < rows; i++) {
+ for (int j = 0; j < cols; j++) {
+ Object expVal = expected.get(i, j);
+ Object actVal = actual.get(i, j);
+ ValueType vt = expected.getSchema()[j];
+
+				// Handle nulls first
+				if (expVal == null || actVal == null) {
+					Assert.assertEquals("Mismatch at (" + i + "," + j + ")", expVal, actVal);
+				} else {
+					switch (vt) {
+						case FP64:
+						case FP32:
+							double dExp = ((Number) expVal).doubleValue();
+							double dAct = ((Number) actVal).doubleValue();
+							Assert.assertEquals("Mismatch at (" + i + "," + j + ")", dExp, dAct, eps);
+							break;
+						case INT32:
+						case INT64:
+							long lExp = ((Number) expVal).longValue();
+							long lAct = ((Number) actVal).longValue();
+							Assert.assertEquals("Mismatch at (" + i + "," + j + ")", lExp, lAct);
+							break;
+						case BOOLEAN:
+							boolean bExp = (Boolean) expVal;
+							boolean bAct = (Boolean) actVal;
+							Assert.assertEquals("Mismatch at (" + i + "," + j + ")", bExp, bAct);
+							break;
+						case STRING:
+							Assert.assertEquals("Mismatch at (" + i + "," + j + ")", expVal.toString(), actVal.toString());
+							break;
+						default:
+							Assert.fail("Unsupported type in comparison: " + vt);
+					}
+				}
+			}
+		}
+	}
+}