This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/master by this push:
     new c606fee  [SYSTEMDS-2987] Read/write of composite list objects (all 
formats)
c606fee is described below

commit c606feedef1da26187e2722f936a43c7d6f2f448
Author: Matthias Boehm <[email protected]>
AuthorDate: Sat Jun 5 22:04:40 2021 +0200

    [SYSTEMDS-2987] Read/write of composite list objects (all formats)
    
    This patch adds the missing support for list read and write for regular
and named lists. Included objects can be matrices, tensors, frames,
    scalars and even recursively nested list objects. In detail, we write a
    directory with proper mtd file, and then use the existing
    readers/writers for storing the individual objects. This has the
    advantage that we naturally (and consistently) support all formats,
    single- and multi-threaded read/write, and the separate files (all with
    their mtd files) can be even processed individually.
---
 src/main/java/org/apache/sysds/common/Types.java   |   1 +
 src/main/java/org/apache/sysds/lops/Data.java      |   2 +-
 .../java/org/apache/sysds/lops/compile/Dag.java    |   3 +-
 .../org/apache/sysds/parser/DataExpression.java    |  22 ++--
 .../java/org/apache/sysds/parser/Expression.java   |   4 +
 .../sysds/runtime/instructions/cp/ListObject.java  |   8 ++
 .../instructions/cp/VariableCPInstruction.java     |  43 +++----
 .../apache/sysds/runtime/io/IOUtilFunctions.java   |  18 +++
 .../org/apache/sysds/runtime/io/ListReader.java    | 131 +++++++++++++++++++++
 .../org/apache/sysds/runtime/io/ListWriter.java    |  85 +++++++++++++
 .../org/apache/sysds/runtime/meta/MetaDataAll.java |   5 +
 .../org/apache/sysds/runtime/util/HDFSTool.java    |  15 ++-
 .../sysds/test/functions/io/FullDynWriteTest.java  |   2 +-
 .../sysds/test/functions/io/ReadWriteListTest.java | 128 ++++++++++++++++++++
 src/test/scripts/functions/io/ListRead.dml         |  41 +++++++
 src/test/scripts/functions/io/ListWrite.dml        |  34 ++++++
 16 files changed, 504 insertions(+), 38 deletions(-)

diff --git a/src/main/java/org/apache/sysds/common/Types.java 
b/src/main/java/org/apache/sysds/common/Types.java
index 5f077f6..77e79f6 100644
--- a/src/main/java/org/apache/sysds/common/Types.java
+++ b/src/main/java/org/apache/sysds/common/Types.java
@@ -109,6 +109,7 @@ public class Types
                                case "INT":      return INT64;
                                case "BOOLEAN":  return BOOLEAN;
                                case "STRING":   return STRING;
+                               case "UNKNOWN":  return UNKNOWN;
                                default:
                                        throw new DMLRuntimeException("Unknown 
value type: "+value);
                        }
diff --git a/src/main/java/org/apache/sysds/lops/Data.java 
b/src/main/java/org/apache/sysds/lops/Data.java
index f40af4e..2bd302d 100644
--- a/src/main/java/org/apache/sysds/lops/Data.java
+++ b/src/main/java/org/apache/sysds/lops/Data.java
@@ -382,7 +382,7 @@ public class Data extends Lop
        }
 
        public String getCreateVarInstructions(String outputFileName, String 
outputLabel) {
-               if ( getDataType() == DataType.MATRIX || getDataType() == 
DataType.FRAME ) {
+               if ( getDataType() == DataType.MATRIX || getDataType() == 
DataType.FRAME || getDataType() == DataType.LIST ) {
 
                        if ( _op.isTransient() )
                                throw new LopsException("getInstructions() 
should not be called for transient nodes.");
diff --git a/src/main/java/org/apache/sysds/lops/compile/Dag.java 
b/src/main/java/org/apache/sysds/lops/compile/Dag.java
index cfe409e..469c600 100644
--- a/src/main/java/org/apache/sysds/lops/compile/Dag.java
+++ b/src/main/java/org/apache/sysds/lops/compile/Dag.java
@@ -343,7 +343,8 @@ public class Dag<N extends Lop>
                        if (n.isDataExecLocation() 
                                && !((Data) n).getOperationType().isTransient()
                                && ((Data) n).getOperationType().isRead()
-                               && (n.getDataType() == DataType.MATRIX || 
n.getDataType() == DataType.FRAME) )
+                               && (n.getDataType() == DataType.MATRIX || 
n.getDataType() == DataType.FRAME 
+                                  || n.getDataType() == DataType.LIST) )
                        {
                                if ( !((Data)n).isLiteral() ) {
                                        try {
diff --git a/src/main/java/org/apache/sysds/parser/DataExpression.java 
b/src/main/java/org/apache/sysds/parser/DataExpression.java
index 2bc1376..0f4352a 100644
--- a/src/main/java/org/apache/sysds/parser/DataExpression.java
+++ b/src/main/java/org/apache/sysds/parser/DataExpression.java
@@ -1298,8 +1298,14 @@ public class DataExpression extends DataIdentifier
                                getOutput().setNnz(-1L);
                                setPrivacy();
                        }
+                       else if ( 
dataTypeString.equalsIgnoreCase(DataType.LIST.name())) {
+                               getOutput().setDataType(DataType.LIST);
+                               setPrivacy();
+                       }
                        else{
-                               raiseValidateError("Unknown Data Type " + 
dataTypeString + ". Valid  values: " + Statement.SCALAR_DATA_TYPE +", " + 
Statement.MATRIX_DATA_TYPE, conditional, LanguageErrorCodes.INVALID_PARAMETERS);
+                               raiseValidateError("Unknown Data Type " + 
dataTypeString + ". Valid values: " 
+                                       + Statement.SCALAR_DATA_TYPE +", " + 
Statement.MATRIX_DATA_TYPE+", " + Statement.FRAME_DATA_TYPE
+                                       +", " + 
DataType.LIST.name().toLowerCase(), conditional, 
LanguageErrorCodes.INVALID_PARAMETERS);
                        }
                        
                        // handle value type parameter
@@ -1310,17 +1316,19 @@ public class DataExpression extends DataIdentifier
                        // Identify the value type (used only for read method)
                        String valueTypeString = getVarParam(VALUETYPEPARAM) == 
null ? null :  getVarParam(VALUETYPEPARAM).toString();
                        if (valueTypeString != null) {
-                               if 
(valueTypeString.equalsIgnoreCase(Statement.DOUBLE_VALUE_TYPE)) {
+                               if 
(valueTypeString.equalsIgnoreCase(Statement.DOUBLE_VALUE_TYPE))
                                        
getOutput().setValueType(ValueType.FP64);
-                               } else if 
(valueTypeString.equalsIgnoreCase(Statement.STRING_VALUE_TYPE)) {
+                               else if 
(valueTypeString.equalsIgnoreCase(Statement.STRING_VALUE_TYPE))
                                        
getOutput().setValueType(ValueType.STRING);
-                               } else if 
(valueTypeString.equalsIgnoreCase(Statement.INT_VALUE_TYPE)) {
+                               else if 
(valueTypeString.equalsIgnoreCase(Statement.INT_VALUE_TYPE))
                                        
getOutput().setValueType(ValueType.INT64);
-                               } else if 
(valueTypeString.equalsIgnoreCase(Statement.BOOLEAN_VALUE_TYPE)) {
+                               else if 
(valueTypeString.equalsIgnoreCase(Statement.BOOLEAN_VALUE_TYPE))
                                        
getOutput().setValueType(ValueType.BOOLEAN);
-                               } else {
+                               else if 
(valueTypeString.equalsIgnoreCase(ValueType.UNKNOWN.name()))
+                                       
getOutput().setValueType(ValueType.UNKNOWN);
+                               else {
                                        raiseValidateError("Unknown Value Type 
" + valueTypeString
-                                                       + ". Valid values are: 
" + Statement.DOUBLE_VALUE_TYPE +", " + Statement.INT_VALUE_TYPE + ", " + 
Statement.BOOLEAN_VALUE_TYPE + ", " + Statement.STRING_VALUE_TYPE, conditional);
+                                               + ". Valid values are: " + 
Statement.DOUBLE_VALUE_TYPE +", " + Statement.INT_VALUE_TYPE + ", " + 
Statement.BOOLEAN_VALUE_TYPE + ", " + Statement.STRING_VALUE_TYPE, conditional);
                                }
                        } else {
                                getOutput().setValueType(ValueType.FP64);
diff --git a/src/main/java/org/apache/sysds/parser/Expression.java 
b/src/main/java/org/apache/sysds/parser/Expression.java
index e7b49ca..d8f0580 100644
--- a/src/main/java/org/apache/sysds/parser/Expression.java
+++ b/src/main/java/org/apache/sysds/parser/Expression.java
@@ -302,6 +302,10 @@ public abstract class Expression implements ParseInfo
        public static ValueType computeValueType(Expression expr1, ValueType 
v1, ValueType v2, boolean cast) {
                if (v1 == v2)
                        return v1;
+               if (v1 == ValueType.UNKNOWN && v2 != ValueType.UNKNOWN)
+                       return v2;
+               if (v1 != ValueType.UNKNOWN && v2 == ValueType.UNKNOWN)
+                       return v1;
 
                if (cast) {
                        if (v1 == ValueType.FP64 && v2 == ValueType.INT64)
diff --git 
a/src/main/java/org/apache/sysds/runtime/instructions/cp/ListObject.java 
b/src/main/java/org/apache/sysds/runtime/instructions/cp/ListObject.java
index 392e9cf..edfd6cc 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/cp/ListObject.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/cp/ListObject.java
@@ -61,10 +61,18 @@ public class ListObject extends Data implements 
Externalizable {
        public ListObject(List<Data> data) {
                this(data, null, null);
        }
+       
+       public ListObject(Data[] data) {
+               this(Arrays.asList(data), null, null);
+       }
 
        public ListObject(List<Data> data, List<String> names) {
                this(data, names, null);
        }
+       
+       public ListObject(Data[] data, String[] names) {
+               this(Arrays.asList(data), Arrays.asList(names), null);
+       }
 
        public ListObject(List<Data> data, List<String> names, 
List<LineageItem> lineage) {
                super(DataType.LIST, ValueType.UNKNOWN);
diff --git 
a/src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java
 
b/src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java
index 3457dd4..a806741 100644
--- 
a/src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java
+++ 
b/src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java
@@ -52,6 +52,8 @@ import org.apache.sysds.runtime.io.FileFormatProperties;
 import org.apache.sysds.runtime.io.FileFormatPropertiesCSV;
 import org.apache.sysds.runtime.io.FileFormatPropertiesLIBSVM;
 import org.apache.sysds.runtime.io.IOUtilFunctions;
+import org.apache.sysds.runtime.io.ListReader;
+import org.apache.sysds.runtime.io.ListWriter;
 import org.apache.sysds.runtime.io.WriterMatrixMarket;
 import org.apache.sysds.runtime.io.WriterTextCSV;
 import org.apache.sysds.runtime.lineage.LineageItem;
@@ -371,7 +373,7 @@ public class VariableCPInstruction extends CPInstruction 
implements LineageTrace
                        }
 
                        MetaDataFormat iimd = null;
-                       if (dt == DataType.MATRIX || dt == DataType.FRAME) {
+                       if (dt == DataType.MATRIX || dt == DataType.FRAME || dt 
== DataType.LIST) {
                                DataCharacteristics mc = new 
MatrixCharacteristics();
                                if (parts.length == 6) {
                                        // do nothing
@@ -660,6 +662,12 @@ public class VariableCPInstruction extends CPInstruction 
implements LineageTrace
                                ec.setVariable(getInput1().getName(), fobj);
                                break;
                        }
+                       case LIST: {
+                               ListObject lo = 
ListReader.readListFromHDFS(getInput2().getName(),
+                                       
((MetaDataFormat)metadata).getFileFormat().name(), _formatProperties);
+                               ec.setVariable(getInput1().getName(), lo);
+                               break;
+                       }
                        case SCALAR: {
                                //created variable not called for scalars
                                ec.setScalarOutput(getInput1().getName(), null);
@@ -912,29 +920,8 @@ public class VariableCPInstruction extends CPInstruction 
implements LineageTrace
         * @param ec execution context
         */
        private void processReadInstruction(ExecutionContext ec){
-               ScalarObject res = null;
-                       try {
-                               switch(getInput1().getValueType()) {
-                                       case FP64:
-                                               res = new 
DoubleObject(HDFSTool.readDoubleFromHDFSFile(getInput2().getName()));
-                                               break;
-                                       case INT64:
-                                               res = new 
IntObject(HDFSTool.readIntegerFromHDFSFile(getInput2().getName()));
-                                               break;
-                                       case BOOLEAN:
-                                               res = new 
BooleanObject(HDFSTool.readBooleanFromHDFSFile(getInput2().getName()));
-                                               break;
-                                       case STRING:
-                                               res = new 
StringObject(HDFSTool.readStringFromHDFSFile(getInput2().getName()));
-                                               break;
-                                       default:
-                                               throw new 
DMLRuntimeException("Invalid value type ("
-                                                       + 
getInput1().getValueType() + ") while processing readScalar instruction.");
-                               }
-                       } catch ( IOException e ) {
-                               throw new DMLRuntimeException(e);
-                       }
-                       ec.setScalarOutput(getInput1().getName(), res);
+               ec.setScalarOutput(getInput1().getName(),
+                       
HDFSTool.readScalarObjectFromHDFSFile(getInput2().getName(), 
getInput1().getValueType()));
        }
 
        /**
@@ -1012,6 +999,10 @@ public class VariableCPInstruction extends CPInstruction 
implements LineageTrace
                        setPrivacyConstraint(to.getPrivacyConstraint());
                        to.exportData(fname, fmtStr, _formatProperties);
                }
+               else if( getInput1().getDataType() == DataType.LIST ) {
+                       ListObject lo = ec.getListObject(getInput1().getName());
+                       ListWriter.writeListToHDFS(lo, fname, fmtStr, 
_formatProperties);
+               }
        }
 
        /**
@@ -1162,8 +1153,8 @@ public class VariableCPInstruction extends CPInstruction 
implements LineageTrace
                                Path path = new Path(fname);
                                
IOUtilFunctions.deleteCrcFilesFromLocalFileSystem(fs, path);
                        }
-
-               } catch ( IOException e ) {
+               }
+               catch ( IOException e ) {
                        throw new DMLRuntimeException(e);
                }
        }
diff --git a/src/main/java/org/apache/sysds/runtime/io/IOUtilFunctions.java 
b/src/main/java/org/apache/sysds/runtime/io/IOUtilFunctions.java
index de74e81..df71dea 100644
--- a/src/main/java/org/apache/sysds/runtime/io/IOUtilFunctions.java
+++ b/src/main/java/org/apache/sysds/runtime/io/IOUtilFunctions.java
@@ -559,6 +559,24 @@ public class IOUtilFunctions
                return ret;
        }
        
+       public static Path[] getMetadataFilePaths( FileSystem fs, Path file ) 
+               throws IOException
+       {
+               Path[] ret = null;
+               if( fs.isDirectory(file) || 
IOUtilFunctions.isObjectStoreFileScheme(file) ) {
+                       LinkedList<Path> tmp = new LinkedList<>();
+                       FileStatus[] dStatus = fs.listStatus(file);
+                       for( FileStatus fdStatus : dStatus )
+                               if( 
fdStatus.getPath().toString().endsWith(".mtd") ) //mtd file
+                                       tmp.add(fdStatus.getPath());
+                       ret = tmp.toArray(new Path[0]);
+               }
+               else {
+                       throw new DMLRuntimeException("Unable to read meta data 
files from directory "+file.toString());
+               }
+               return ret;
+       }
+       
        /**
         * Delete the CRC files from the local file system associated with a
         * particular file and its metadata file.
diff --git a/src/main/java/org/apache/sysds/runtime/io/ListReader.java 
b/src/main/java/org/apache/sysds/runtime/io/ListReader.java
new file mode 100644
index 0000000..1569658
--- /dev/null
+++ b/src/main/java/org/apache/sysds/runtime/io/ListReader.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.io;
+
+import java.util.Arrays;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.sysds.common.Types.FileFormat;
+import org.apache.sysds.conf.ConfigurationManager;
+import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.controlprogram.caching.CacheableData;
+import org.apache.sysds.runtime.controlprogram.caching.FrameObject;
+import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
+import org.apache.sysds.runtime.controlprogram.caching.TensorObject;
+import org.apache.sysds.runtime.instructions.cp.Data;
+import org.apache.sysds.runtime.instructions.cp.ListObject;
+import org.apache.sysds.runtime.meta.DataCharacteristics;
+import org.apache.sysds.runtime.meta.MetaDataAll;
+import org.apache.sysds.runtime.meta.MetaDataFormat;
+import org.apache.sysds.runtime.util.HDFSTool;
+
+public class ListReader 
+{
+       /**
+        * Reads a list object and all contained objects from a folder with 
related meta data.
+        * The individual objects (including nested lists) are read with 
existing matrix/frame 
+        * readers and meta data such that the entire list and separate objects 
can be restored.
+        * By using the existing readers, all formats are naturally supported 
and we can ensure
+        * consistency of the on-disk representation.
+        * 
+        * @param fname directory name 
+        * @param fmtStr format string
+        * @param props file format properties
+        * @return list object
+        * @throws DMLRuntimeException
+        */
+       public static ListObject readListFromHDFS(String fname, String fmtStr, 
FileFormatProperties props)
+               throws DMLRuntimeException
+       {
+               MetaDataAll meta = new MetaDataAll(fname+".mtd", false, true);
+               int numObjs = (int) meta.getDim1();
+               boolean named = false;
+               
+               Data[] data = null;
+               String[] names = null;
+               try {
+                       // read all meta data files
+                       Path dirPath = new Path(fname);
+                       JobConf job = new 
JobConf(ConfigurationManager.getCachedJobConf());
+                       FileSystem fs = IOUtilFunctions.getFileSystem(dirPath, 
job);
+                       Path[] mtdFiles = 
IOUtilFunctions.getMetadataFilePaths(fs, dirPath);
+                       if( numObjs != mtdFiles.length ) {
+                               throw new DMLRuntimeException("List meta data 
does not match "
+                                       + "available mtd files: "+numObjs+" vs 
"+mtdFiles.length);
+                       }
+                       
+                       // determine if regular or named list
+                       named = Arrays.stream(mtdFiles).map(p -> p.toString())
+                               .anyMatch(s -> 
!s.substring(s.lastIndexOf('_')).equals("null"));
+                       data = new Data[numObjs];
+                       names = named ? new String[numObjs] : null;
+                       
+                       // read all individual files (but only create objects 
for 
+                       // matrices and frames, which are then read on demand 
via acquire())
+                       for( int i=0; i<numObjs; i++ ) {
+                               MetaDataAll lmeta = new 
MetaDataAll(mtdFiles[i].toString(), false, true);
+                               String lfname = 
lmeta.getFilename().substring(0, lmeta.getFilename().length()-4);
+                               DataCharacteristics dc = 
lmeta.getDataCharacteristics();
+                               FileFormat fmt = lmeta.getFileFormat();
+                               Data dat = null;
+                               switch( lmeta.getDataType() ) {
+                                       case MATRIX:
+                                               dat = new 
MatrixObject(lmeta.getValueType(), lfname);
+                                               break;
+                                       case TENSOR:
+                                               dat = new 
TensorObject(lmeta.getValueType(), lfname);
+                                               break;
+                                       case FRAME:
+                                               dat = new FrameObject(lfname);
+                                               if( lmeta.getSchema() != null )
+                                                       
((FrameObject)dat).setSchema(lmeta.getSchema());
+                                               break;
+                                       case LIST:
+                                               dat = 
ListReader.readListFromHDFS(lfname, fmt.toString(), props);
+                                               break;
+                                       case SCALAR:
+                                               dat = 
HDFSTool.readScalarObjectFromHDFSFile(lfname, lmeta.getValueType());
+                                               break;
+                                       default:
+                                               throw new 
DMLRuntimeException("Unexpected data type: " + lmeta.getDataType());
+                               }
+                               
+                               if(dat instanceof CacheableData<?>) {
+                                       ((CacheableData<?>)dat).setMetaData(new 
MetaDataFormat(dc, fmt));
+                                       
((CacheableData<?>)dat).enableCleanup(false); // disable delete
+                               }
+
+                               String[] parts = 
lfname.substring(lfname.lastIndexOf("/")+1).split("_");
+                               data[Integer.parseInt(parts[0])] = dat;
+                               if( named )
+                                       names[Integer.parseInt(parts[0])] = 
parts[1];
+                       }
+               }
+               catch(Exception ex) {
+                       throw new DMLRuntimeException(
+                               "Failed to read list object of length 
"+numObjs+".", ex);
+               }
+               
+               // construct list object
+               return named ? new ListObject(data, names) : new 
ListObject(data);
+       }
+}
diff --git a/src/main/java/org/apache/sysds/runtime/io/ListWriter.java 
b/src/main/java/org/apache/sysds/runtime/io/ListWriter.java
new file mode 100644
index 0000000..d5b95d7
--- /dev/null
+++ b/src/main/java/org/apache/sysds/runtime/io/ListWriter.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.io;
+
+import org.apache.sysds.common.Types.FileFormat;
+import org.apache.sysds.conf.DMLConfig;
+import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.controlprogram.caching.CacheableData;
+import org.apache.sysds.runtime.instructions.cp.Data;
+import org.apache.sysds.runtime.instructions.cp.ListObject;
+import org.apache.sysds.runtime.instructions.cp.ScalarObject;
+import org.apache.sysds.runtime.meta.DataCharacteristics;
+import org.apache.sysds.runtime.meta.MatrixCharacteristics;
+import org.apache.sysds.runtime.util.HDFSTool;
+
+public class ListWriter
+{
+       /**
+        * Writes a list object and all contained objects to a folder with 
related meta data.
+        * The individual objects (including nested lists) are written with 
existing matrix/frame 
+        * writers and meta data such that the entire list and separate objects 
can be read back.
+        * By using the existing writers, all formats are naturally supported 
and we can ensure
+        * consistency of the on-disk representation.
+        * 
+        * @param lo list object
+        * @param fname directory name 
+        * @param fmtStr format string
+        * @param props file format properties
+        * @throws DMLRuntimeException
+        */
+       public static void writeListToHDFS(ListObject lo, String fname, String 
fmtStr, FileFormatProperties props)
+               throws DMLRuntimeException
+       {
+               DataCharacteristics dc = new 
MatrixCharacteristics(lo.getLength(), 1, 0, 0);
+               
+               try {
+                       //write basic list meta data
+                       HDFSTool.writeMetaDataFile(fname + ".mtd", 
lo.getValueType(), null,
+                               lo.getDataType(), dc, 
FileFormat.safeValueOf(fmtStr),
+                               props, lo.getPrivacyConstraint());
+                       
+                       //create folder for list w/ appropriate permissions
+                       HDFSTool.createDirIfNotExistOnHDFS(fname,
+                               DMLConfig.DEFAULT_SHARED_DIR_PERMISSION);
+                       
+                       //write regular/named list by position/position_name
+                       //TODO additional parallelization over objects (in 
addition to parallel writers)
+                       for(int i=0; i<lo.getLength(); i++) {
+                               Data dat = lo.getData(i);
+                               String lfname = fname 
+"/"+i+"_"+(lo.isNamedList()?lo.getName(i):"null");
+                               if( dat instanceof CacheableData<?> )
+                                       
((CacheableData<?>)dat).exportData(lfname, fmtStr, props);
+                               else if( dat instanceof ListObject )
+                                       writeListToHDFS((ListObject)dat, 
lfname, fmtStr, props);
+                               else { //scalar
+                                       ScalarObject so = (ScalarObject) dat;
+                                       
HDFSTool.writeObjectToHDFS(so.getValue(), lfname);
+                                       HDFSTool.writeScalarMetaDataFile(lfname 
+".mtd",
+                                               so.getValueType(), 
so.getPrivacyConstraint());
+                               }
+                       }
+               }
+               catch(Exception ex) {
+                       throw new DMLRuntimeException(
+                               "Failed to write list object of length 
"+dc.getRows()+".", ex);
+               }
+       }
+}
diff --git a/src/main/java/org/apache/sysds/runtime/meta/MetaDataAll.java 
b/src/main/java/org/apache/sysds/runtime/meta/MetaDataAll.java
index 62bd1f9..d312289 100644
--- a/src/main/java/org/apache/sysds/runtime/meta/MetaDataAll.java
+++ b/src/main/java/org/apache/sysds/runtime/meta/MetaDataAll.java
@@ -87,6 +87,7 @@ public class MetaDataAll extends DataIdentifier {
        }
 
        public MetaDataAll(String mtdFileName, boolean conditional, boolean 
parseMeta) {
+               setFilename(mtdFileName);
                _metaObj = readMetadataFile(mtdFileName, conditional);
                setPrivacy(PrivacyConstraint.PrivacyLevel.None);
                if(parseMeta)
@@ -235,6 +236,10 @@ public class MetaDataAll extends DataIdentifier {
                if(_formatTypeString != null && 
EnumUtils.isValidEnum(Types.FileFormat.class, _formatTypeString.toUpperCase()))
                        
setFileFormat(Types.FileFormat.safeValueOf(_formatTypeString));
        }
+       
+       public DataCharacteristics getDataCharacteristics() {
+               return new MatrixCharacteristics(getDim1(), getDim2(), 
getBlocksize(), getNnz());
+       }
 
        @SuppressWarnings("unchecked")
        public HashMap<String, Expression> parseMetaDataFileParameters(String 
mtdFileName, boolean conditional, HashMap<String, Expression> varParams)
diff --git a/src/main/java/org/apache/sysds/runtime/util/HDFSTool.java 
b/src/main/java/org/apache/sysds/runtime/util/HDFSTool.java
index f19f1ca..b44b7ce 100644
--- a/src/main/java/org/apache/sysds/runtime/util/HDFSTool.java
+++ b/src/main/java/org/apache/sysds/runtime/util/HDFSTool.java
@@ -41,6 +41,8 @@ import org.apache.sysds.conf.ConfigurationManager;
 import org.apache.sysds.hops.OptimizerUtils;
 import org.apache.sysds.parser.DataExpression;
 import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.instructions.cp.ScalarObject;
+import org.apache.sysds.runtime.instructions.cp.ScalarObjectFactory;
 import org.apache.sysds.runtime.io.BinaryBlockSerialization;
 import org.apache.sysds.runtime.io.FileFormatProperties;
 import org.apache.sysds.runtime.io.IOUtilFunctions;
@@ -308,11 +310,20 @@ public class HDFSTool
                        default: return line;
                }
        }
-               
+       
+       public static ScalarObject readScalarObjectFromHDFSFile(String fname, ValueType vt) {
+               try {
+                       return ScalarObjectFactory.createScalarObject(vt, readObjectFromHDFSFile(fname, vt));
+               }
+               catch(Exception ex) {
+                       throw new DMLRuntimeException(ex);
+               }
+       }
+       
        private static BufferedWriter setupOutputFile ( String filename ) throws IOException {
                Path path = new Path(filename);
                FileSystem fs = IOUtilFunctions.getFileSystem(path);
-               BufferedWriter br=new BufferedWriter(new OutputStreamWriter(fs.create(path,true)));             
+               BufferedWriter br=new BufferedWriter(new OutputStreamWriter(fs.create(path,true)));
                return br;
        }
        
diff --git a/src/test/java/org/apache/sysds/test/functions/io/FullDynWriteTest.java b/src/test/java/org/apache/sysds/test/functions/io/FullDynWriteTest.java
index b1798f7..e2f65cb 100644
--- a/src/test/java/org/apache/sysds/test/functions/io/FullDynWriteTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/io/FullDynWriteTest.java
@@ -107,7 +107,7 @@ public class FullDynWriteTest extends AutomatedTestBase
                programArgs = new String[]{ "-explain","-args",
                        input("A"), fmt.toString(), outputDir()};
                
-               try 
+               try
                {
                        long seed1 = System.nanoTime();
                        double[][] A = getRandomMatrix(rows, cols, 0, 1, 1.0, seed1);
diff --git a/src/test/java/org/apache/sysds/test/functions/io/ReadWriteListTest.java b/src/test/java/org/apache/sysds/test/functions/io/ReadWriteListTest.java
new file mode 100644
index 0000000..ad3415c
--- /dev/null
+++ b/src/test/java/org/apache/sysds/test/functions/io/ReadWriteListTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.functions.io;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import org.apache.sysds.common.Types.ExecMode;
+import org.apache.sysds.common.Types.FileFormat;
+import org.apache.sysds.runtime.util.HDFSTool;
+import org.apache.sysds.test.AutomatedTestBase;
+import org.apache.sysds.test.TestConfiguration;
+import org.apache.sysds.test.TestUtils;
+
+public class ReadWriteListTest extends AutomatedTestBase
+{
+       private final static String TEST_NAME1 = "ListWrite";
+       private final static String TEST_NAME2 = "ListRead";
+       private final static String TEST_DIR = "functions/io/";
+       private final static String TEST_CLASS_DIR = TEST_DIR + ReadWriteListTest.class.getSimpleName() + "/";
+       private final static double eps = 1e-6;
+       
+       private final static int rows = 350;
+       private final static int cols = 110;
+       
+       @Override
+       public void setUp() {
+               TestUtils.clearAssertionInformation();
+               addTestConfiguration(TEST_NAME1, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME1, new String[] {"L","R1"}) );
+               addTestConfiguration(TEST_NAME2, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME2, new String[] {"R2"}) );
+       }
+
+       @Test
+       public void testListBinarySinglenode() {
+               runListReadWriteTest(false, FileFormat.BINARY, ExecMode.SINGLE_NODE);
+       }
+       
+       @Test
+       public void testListBinaryHybrid() {
+               runListReadWriteTest(false, FileFormat.BINARY, ExecMode.HYBRID);
+       }
+       
+       @Test
+       public void testListTextSinglenode() {
+               runListReadWriteTest(false, FileFormat.TEXT, ExecMode.SINGLE_NODE);
+       }
+       
+       @Test
+       public void testListTextHybrid() {
+               runListReadWriteTest(false, FileFormat.TEXT, ExecMode.HYBRID);
+       }
+       
+       @Test
+       public void testNamedListBinarySinglenode() {
+               runListReadWriteTest(true, FileFormat.BINARY, ExecMode.SINGLE_NODE);
+       }
+       
+       @Test
+       public void testNamedListBinaryHybrid() {
+               runListReadWriteTest(true, FileFormat.BINARY, ExecMode.HYBRID);
+       }
+       
+       @Test
+       public void testNamedListTextSinglenode() {
+               runListReadWriteTest(true, FileFormat.TEXT, ExecMode.SINGLE_NODE);
+       }
+       
+       @Test
+       public void testNamedListTextHybrid() {
+               runListReadWriteTest(true, FileFormat.TEXT, ExecMode.HYBRID);
+       }
+       
+       //TODO support for Spark write/read
+       
+       private void runListReadWriteTest(boolean named, FileFormat format, ExecMode mode)
+       {
+               ExecMode modeOld = setExecMode(mode);
+               
+               try {
+                       TestConfiguration config = getTestConfiguration(TEST_NAME1);
+                       loadTestConfiguration(config);
+                       
+                       //run write
+                       String HOME = SCRIPT_DIR + TEST_DIR;
+                       fullDMLScriptName = HOME + TEST_NAME1 + ".dml";
+                       programArgs = new String[]{"-args", String.valueOf(rows),
+                               String.valueOf(cols), output("R1"), output("L"), format.toString(), String.valueOf(named)};
+                       
+                       runTest(true, false, null, -1);
+                       double val1 = HDFSTool.readDoubleFromHDFSFile(output("R1"));
+                       
+                       //run read
+                       fullDMLScriptName = HOME + TEST_NAME2 + ".dml";
+                       programArgs = new String[]{"-args", output("L"), output("R2")};
+                       
+                       runTest(true, false, null, -1);
+                       double val2 = HDFSTool.readDoubleFromHDFSFile(output("R2"));
+                       
+                       Assert.assertEquals(new Double(val1), new Double(val2), eps);
+               }
+               catch(IOException e) {
+                       e.printStackTrace();
+                       throw new RuntimeException(e);
+               }
+               finally {
+                       resetExecMode(modeOld);
+               }
+       }
+}
diff --git a/src/test/scripts/functions/io/ListRead.dml b/src/test/scripts/functions/io/ListRead.dml
new file mode 100644
index 0000000..6a5910a
--- /dev/null
+++ b/src/test/scripts/functions/io/ListRead.dml
@@ -0,0 +1,41 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+L = read($1);
+W1 = as.matrix(L[1]);
+W2 = as.matrix(L[2]);
+s3 = as.scalar(L[3]);
+W4 = as.matrix(L[4]);
+R1 = sum(W1 * W2 + s3 * W4);
+
+# cut and reexecute to ensure that
+# individual files were not deleted
+while(FALSE){} 
+Lb = read($1);
+W1b = as.matrix(Lb[1]);
+W2b = as.matrix(Lb[2]);
+s3b = as.scalar(Lb[3]);
+W4b = as.matrix(Lb[4]);
+
+R2 = sum(W1b * W2b + s3b * W4b);
+R3 = R1/2 + R2/2
+
+write(R3, $2, format="text");
diff --git a/src/test/scripts/functions/io/ListWrite.dml b/src/test/scripts/functions/io/ListWrite.dml
new file mode 100644
index 0000000..bb55635
--- /dev/null
+++ b/src/test/scripts/functions/io/ListWrite.dml
@@ -0,0 +1,34 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+nr = $1
+nc = $2
+
+W1 = rand(rows=nr, cols=nc, seed=1);
+W2 = rand(rows=nr, cols=nc, seed=2);
+s3 = nr * nc;
+W4 = rand(rows=nr, cols=nc, seed=3);
+
+R1 = sum(W1 * W2 + s3 * W4);
+L = list(W1, W2, s3, W4);
+
+write(R1, $3, format="text");
+write(L, $4, format=$5)

Reply via email to