This is an automated email from the ASF dual-hosted git repository.

baunsgaard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/main by this push:
     new 4151578e7b [SYSTEMDS-3601] Performance IO benchmark
4151578e7b is described below

commit 4151578e7baab492b469728a4991e90b9aa41da6
Author: Sebastian Baunsgaard <[email protected]>
AuthorDate: Wed Sep 6 11:14:00 2023 +0200

    [SYSTEMDS-3601] Performance IO benchmark
    
    Adds an IO benchmark for Matrix/compressed matrix writing.
    
    In this commit the numbers are:
     Generator: ConstMatrix ( Rows:10000, Cols:100, Spar:1.0, Unique: 4)
                StandardDisk,   16.821+-  0.829 ms,    475607133 Byte/s
         Compress StandardIO,    9.806+-  1.066 ms,    815833646 Byte/s
    Update&Apply Standard IO,    4.460+-  0.444 ms,   1793930325 Byte/s
    
    Closes #1897
---
 .../java/org/apache/sysds/performance/Main.java    | 72 +++++++++++++++++--
 .../org/apache/sysds/performance/PerfUtil.java     | 34 +++++++++
 .../java/org/apache/sysds/performance/README.md    | 19 +++++
 .../org/apache/sysds/performance/TimingUtils.java  |  6 +-
 .../sysds/performance/compression/APerfTest.java   | 36 +++++++---
 .../sysds/performance/compression/Serialize.java   | 46 +++++++------
 .../generators/{IGenerate.java => Const.java}      | 36 +---------
 .../sysds/performance/generators/ConstFrame.java   | 67 ++++++++++++++++++
 .../sysds/performance/generators/ConstMatrix.java  | 24 ++++---
 .../sysds/performance/generators/FrameFile.java    | 80 ++++++++++++++++++++++
 .../sysds/performance/generators/IGenerate.java    |  5 ++
 .../sysds/performance/generators/MatrixFile.java   | 58 ++++++++++++++++
 12 files changed, 403 insertions(+), 80 deletions(-)

diff --git a/src/test/java/org/apache/sysds/performance/Main.java 
b/src/test/java/org/apache/sysds/performance/Main.java
index fa89a62b53..3fd2def237 100644
--- a/src/test/java/org/apache/sysds/performance/Main.java
+++ b/src/test/java/org/apache/sysds/performance/Main.java
@@ -24,12 +24,18 @@ import org.apache.sysds.performance.compression.SchemaTest;
 import org.apache.sysds.performance.compression.Serialize;
 import org.apache.sysds.performance.compression.StreamCompress;
 import org.apache.sysds.performance.generators.ConstMatrix;
+import org.apache.sysds.performance.generators.FrameFile;
 import org.apache.sysds.performance.generators.GenMatrices;
+import org.apache.sysds.performance.generators.IGenerate;
+import org.apache.sysds.performance.generators.MatrixFile;
+import org.apache.sysds.runtime.frame.data.FrameBlock;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.util.CommonThreadPool;
+import org.apache.sysds.test.TestUtils;
 
 public class Main {
 
-       private static void exec(int prog, String[] args) throws 
InterruptedException, Exception {
+       private static void exec(int prog, String[] args) throws Exception {
                switch(prog) {
                        case 1:
                                new StreamCompress(100, new GenMatrices(10000, 
100, 32, 1.0)).run();
@@ -79,12 +85,25 @@ public class Main {
                        case 12:
                                run11(args, Integer.parseInt(args[7]));
                                break;
+                       case 13:
+                               run13(args);
+                               break;
+                       case 14:
+                               run14(args);
+                               break;
+
+                       case 15:
+                               run15(args);
+                               break;
+                       case 16:
+                               run16(args);
+                               break;
                        default:
                                break;
                }
        }
 
-       private static void run9(String[] args) throws InterruptedException, 
Exception {
+       private static void run9(String[] args) throws Exception {
                int rows = Integer.parseInt(args[1]);
                int cols = Integer.parseInt(args[2]);
                int unique = Integer.parseInt(args[3]);
@@ -94,7 +113,7 @@ public class Main {
                new IOBandwidth(n, new ConstMatrix(rows, cols, unique, 
sparsity), k).run();
        }
 
-       private static void run10(String[] args) throws InterruptedException, 
Exception {
+       private static void run10(String[] args) throws Exception {
                int rows = Integer.parseInt(args[1]);
                int cols = Integer.parseInt(args[2]);
                int unique = Integer.parseInt(args[3]);
@@ -104,7 +123,7 @@ public class Main {
                new IOBandwidth(n, new ConstMatrix(rows, cols, unique, 
sparsity), k).runVector();
        }
 
-       private static void run11(String[] args, int id) throws 
InterruptedException, Exception {
+       private static void run11(String[] args, int id) throws Exception {
                int rows = Integer.parseInt(args[1]);
                int cols = Integer.parseInt(args[2]);
                int unique = Integer.parseInt(args[3]);
@@ -120,13 +139,56 @@ public class Main {
                        s.run(id);
        }
 
+       private static void run13(String[] args) throws Exception {
+               int k = Integer.parseInt(args[1]);
+               int n = Integer.parseInt(args[2]);
+               String p = args[3];
+               int id = Integer.parseInt(args[4]);
+               run13A(n, MatrixFile.create(p), k, id);
+       }
+
+       private static void run14(String[] args) throws Exception {
+               int k = Integer.parseInt(args[1]);
+               int n = Integer.parseInt(args[2]);
+               String p = args[3]; // input frame
+               String s = args[4]; // spec
+               int id = Integer.parseInt(args[5]);
+               // run13A(n, FrameTransformFile.create(p, s), k, id);
+       }
+
+       private static void run13A(int n, IGenerate<MatrixBlock> g, int k, int 
id) throws Exception {
+
+               Serialize s = new Serialize(n, g, k);
+
+               if(id == -1)
+                       s.run();
+               else
+                       s.run(id);
+       }
+
+       private static void run15(String[] args) throws Exception {
+               int k = Integer.parseInt(args[1]);
+               int n = Integer.parseInt(args[2]);
+               IGenerate<FrameBlock> g = FrameFile.create(args[3]);
+               String spec = args[4];
+               // new TransformPerf(n, k, g, spec).run();
+       }
+
+       private static void run16(String[] args) {
+               int len = Integer.parseInt(args[1]);
+               MatrixBlock mb = 
TestUtils.ceil(TestUtils.generateTestMatrixBlock(len, len, 0, 100, 0.01, len 
+1));
+               System.out.println(mb);
+       }
+
+
        public static void main(String[] args) {
                try {
                        exec(Integer.parseInt(args[0]), args);
                }
                catch(Exception e) {
                        e.printStackTrace();
-               }finally{
+               }
+               finally {
                        CommonThreadPool.get().shutdown();
                }
        }
diff --git a/src/test/java/org/apache/sysds/performance/PerfUtil.java 
b/src/test/java/org/apache/sysds/performance/PerfUtil.java
new file mode 100644
index 0000000000..f93b03bdb3
--- /dev/null
+++ b/src/test/java/org/apache/sysds/performance/PerfUtil.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.performance;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+public interface PerfUtil {
+
+    public static String readSpec(String path) throws IOException {
+        InputStream in = new FileInputStream(path);
+        String spec = new String(in.readAllBytes());
+        in.close();
+        return spec;
+    }
+}
diff --git a/src/test/java/org/apache/sysds/performance/README.md 
b/src/test/java/org/apache/sysds/performance/README.md
index 206bdedbc0..7e7edbb805 100644
--- a/src/test/java/org/apache/sysds/performance/README.md
+++ b/src/test/java/org/apache/sysds/performance/README.md
@@ -51,3 +51,22 @@ With profiler:
 ```bash
 java -jar 
-agentpath:$HOME/Programs/profiler/lib/libasyncProfiler.so=start,event=cpu,file=temp/log.html
 target/systemds-3.2.0-SNAPSHOT-perf.jar 12 10000 100 4 1.0 16 1000 -1
 ```
+
+Take a Matrix and perform serialization.
+
+```bash 
+java -jar 
-agentpath:$HOME/Programs/profiler/lib/libasyncProfiler.so=start,event=cpu,file=temp/log.html
 target/systemds-3.2.0-SNAPSHOT-perf.jar 13 16 100 "temp/test.csv" -1
+```
+
+Take a Frame and transform into a Matrix and perform serialization.
+
+```bash 
+java -jar 
-agentpath:$HOME/Programs/profiler/lib/libasyncProfiler.so=start,event=cpu,file=temp/log.html
 target/systemds-3.2.0-SNAPSHOT-perf.jar 14 16 1000 
"src/test/resources/datasets/titanic/titanic.csv" 
"src/test/resources/datasets/titanic/tfspec.json" -1
+```
+
+Frame Operation timings
+
+```bash
+java -jar 
-agentpath:$HOME/Programs/profiler/lib/libasyncProfiler.so=start,event=cpu,file=temp/log.html
 target/systemds-3.2.0-SNAPSHOT-perf.jar 15 16 10 
"src/test/resources/datasets/titanic/titanic.csv" 
"src/test/resources/datasets/titanic/tfspec.json"
+```
+
diff --git a/src/test/java/org/apache/sysds/performance/TimingUtils.java 
b/src/test/java/org/apache/sysds/performance/TimingUtils.java
index 9a02854a5d..11e2c1dca5 100644
--- a/src/test/java/org/apache/sysds/performance/TimingUtils.java
+++ b/src/test/java/org/apache/sysds/performance/TimingUtils.java
@@ -80,15 +80,17 @@ public interface TimingUtils {
         * it in the timing of the operation
         * 
         * @param f   The function to time
-        * @param c   A cleanup funtion or part that should not be timed.
+        * @param c   A cleanup function or part that should not be timed.
+        * @param b   A setup function that should not be timed.
         * @param rep The number of repetitions to make
         * @param bq  The generator for the input
         * @return A list of the individual repetitions execution time
         * @throws InterruptedException An exception in case the job gets 
interrupted
         */
-       public static double[] time(F f, F c, int rep, IGenerate<?> bq) throws 
InterruptedException {
+       public static double[] time(F f, F c, F b, int rep, IGenerate<?> bq) 
throws InterruptedException {
                double[] times = new double[rep];
                for(int i = 0; i < rep; i++) {
+                       b.run();
                        while(bq.isEmpty())
                                Thread.sleep(bq.defaultWaitTime());
                        time(f, times, i);
diff --git 
a/src/test/java/org/apache/sysds/performance/compression/APerfTest.java 
b/src/test/java/org/apache/sysds/performance/compression/APerfTest.java
index 5533205f2d..74114bf84e 100644
--- a/src/test/java/org/apache/sysds/performance/compression/APerfTest.java
+++ b/src/test/java/org/apache/sysds/performance/compression/APerfTest.java
@@ -43,16 +43,20 @@ public abstract class APerfTest<T, G> {
        }
 
        protected void execute(F f, String name) throws InterruptedException {
-               execute(f, () -> {
-                       return;
-               }, name);
+               N n = new N();
+               execute(f, n, n, name);
        }
 
        protected void execute(F f, F c, String name) throws 
InterruptedException {
+               N n = new N();
+               execute(f, c, n, name);
+       }
+
+       protected void execute(F f, F c, F b, String name) throws 
InterruptedException {
                warmup(f, 10);
                gen.generate(N);
                ret.clear();
-               double[] times = TimingUtils.time(f, c, N, gen);
+               double[] times = TimingUtils.time(f, c, b, N, gen);
                String retS = makeResString(times);
                System.out.println(String.format("%35s, %s, %10s", name, 
TimingUtils.stats(times), retS));
        }
@@ -63,15 +67,18 @@ public abstract class APerfTest<T, G> {
        }
 
        protected void execute(F f, String name, int N) throws 
InterruptedException {
-               execute(f, () -> {
-                       return;
-               }, name, N);
+               N none = new N();
+               execute(f, none, none, name, N);
        }
 
        protected void execute(F f, F c, String name, int N) throws 
InterruptedException {
+               execute(f, c, new N(), name, N);
+       }
+
+       protected void execute(F f, F c, F b, String name, int N) throws 
InterruptedException {
                gen.generate(N);
                ret.clear();
-               double[] times = TimingUtils.time(f, c, N, gen);
+               double[] times = TimingUtils.time(f, c, b, N, gen);
                String retS = makeResString(times);
                System.out.println(String.format("%35s, %s, %10s", name, 
TimingUtils.stats(times), retS));
        }
@@ -86,8 +93,19 @@ public abstract class APerfTest<T, G> {
        public String toString() {
                StringBuilder sb = new StringBuilder();
                sb.append(String.format("%20s ", 
this.getClass().getSimpleName()));
-               sb.append(" Repetitions: ").append(N).append(" ");
+               sb.append(" Repetitions: ").append(N).append("\n");
+               sb.append(String.format("%20s ","Generator:"));
                sb.append(gen);
+               sb.append("\n");
                return sb.toString();
        }
+
+       private class N implements F {
+
+               @Override
+               public void run() {
+                       // do nothing
+               }
+
+       }
 }
diff --git 
a/src/test/java/org/apache/sysds/performance/compression/Serialize.java 
b/src/test/java/org/apache/sysds/performance/compression/Serialize.java
index 149b1811f2..12316874c1 100644
--- a/src/test/java/org/apache/sysds/performance/compression/Serialize.java
+++ b/src/test/java/org/apache/sysds/performance/compression/Serialize.java
@@ -27,6 +27,7 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.file.Files;
+import java.util.ArrayList;
 
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.sysds.conf.CompilerConfig.ConfigType;
@@ -65,42 +66,42 @@ public class Serialize extends APerfTest<Serialize.InOut, 
MatrixBlock> {
                super(N, gen);
                this.file = file;
                this.k = k;
-               
+
        }
 
        public void run() throws Exception, InterruptedException {
                CompressedMatrixBlock.debug = true;
+               CompressedMatrixBlock.debug = false;
                System.out.println(this);
                File directory = new File(file).getParentFile();
                if(!directory.exists()) {
                        directory.mkdir();
                }
-               if(k == 1){
+
+               if(k == 1) {
                        
ConfigurationManager.getCompilerConfig().set(ConfigType.PARALLEL_CP_WRITE_BINARYFORMATS,
 false);
                }
 
                warmup(() -> sumTask(k), N);
-               cleanup();
-               execute(() -> writeUncompressed(k), "Serialize");
+
+               // execute(() -> writeUncompressed(k), "Serialize");
                // execute(() -> diskUncompressed(k), "CustomDisk");
-               cleanup();
-               execute(() -> standardIO(k), () -> setFileSize(), 
"StandardDisk");
-               cleanup();
 
-               execute(() -> compressTask(k), "Compress Normal");
+               execute(() -> standardIO(k), () -> setFileSize(), () -> 
cleanup(), "StandardDisk");
+
+               // execute(() -> compressTask(k), "Compress Normal");
                // execute(() -> writeCompressTask(k), "Compress Normal 
Serialize");
                // execute(() -> diskCompressTask(k), "Compress Normal 
CustomDisk");
-               cleanup();
-               execute(() -> standardCompressedIO(k), () -> setFileSize(), 
"Compress StandardIO");
-               cleanup();
+
+               execute(() -> standardCompressedIO(k), () -> setFileSize(), () 
-> cleanup(), "Compress StandardIO");
 
                final CompressionScheme sch2 = CLALibScheme.getScheme(getC());
-               execute(() -> updateAndApplySchemeFused(sch2, k), "Update&Apply 
Scheme Fused");
+               // execute(() -> updateAndApplySchemeFused(sch2, k), 
"Update&Apply Scheme Fused");
                // execute(() -> writeUpdateAndApplySchemeFused(sch2, k), 
"Update&Apply Scheme Fused Serialize");
-               // cleanup();
                // execute(() -> diskUpdateAndApplySchemeFused(sch2, k), 
"Update&Apply Scheme Fused Disk");
-               cleanup();
-               execute(() -> standardCompressedIOUpdateAndApply(sch2, k), () 
-> setFileSize(), "Update&Apply Standard IO");
+
+               execute(() -> standardCompressedIOUpdateAndApply(sch2, k), () 
-> setFileSize(), () -> cleanup(),
+                       "Update&Apply Standard IO");
        }
 
        public void run(int i) throws Exception, InterruptedException {
@@ -110,7 +111,7 @@ public class Serialize extends APerfTest<Serialize.InOut, 
MatrixBlock> {
                        
ConfigurationManager.getCompilerConfig().set(ConfigType.PARALLEL_CP_WRITE_BINARYFORMATS,
 false);
                }
 
-               final CompressionScheme sch = CLALibScheme.getScheme(getC());
+               final CompressionScheme sch = (i == 8 || i == 9 || i == 10 || i 
== 11) ? CLALibScheme.getScheme(getC()) : null;
                cleanup();
                switch(i) {
                        case 1:
@@ -120,7 +121,7 @@ public class Serialize extends APerfTest<Serialize.InOut, 
MatrixBlock> {
                                execute(() -> diskUncompressed(k), 
"CustomDisk");
                                break;
                        case 3:
-                               execute(() -> standardIO(k), () -> 
setFileSize(), "StandardDisk");
+                               execute(() -> standardIO(k), () -> 
setFileSize(), () -> cleanup(), "StandardDisk");
                                break;
                        case 4:
                                execute(() -> compressTask(k), "Compress 
Normal");
@@ -132,7 +133,7 @@ public class Serialize extends APerfTest<Serialize.InOut, 
MatrixBlock> {
                                execute(() -> diskCompressTask(k), "Compress 
Normal CustomDisk");
                                break;
                        case 7:
-                               execute(() -> standardCompressedIO(k), () -> 
setFileSize(), "Compress StandardIO");
+                               execute(() -> standardCompressedIO(k), () -> 
setFileSize(), () -> cleanup(), "Compress StandardIO");
                                break;
                        case 8:
                                execute(() -> updateAndApplySchemeFused(sch, 
k), "Update&Apply Scheme Fused");
@@ -144,7 +145,7 @@ public class Serialize extends APerfTest<Serialize.InOut, 
MatrixBlock> {
                                execute(() -> 
diskUpdateAndApplySchemeFused(sch, k), "Update&Apply Scheme Fused Disk");
                                break;
                        case 11:
-                               execute(() -> 
standardCompressedIOUpdateAndApply(sch, k), () -> setFileSize(),
+                               execute(() -> 
standardCompressedIOUpdateAndApply(sch, k), () -> setFileSize(), () -> 
cleanup(),
                                        "Update&Apply Standard IO");
                                break;
                }
@@ -296,6 +297,10 @@ public class Serialize extends APerfTest<Serialize.InOut, 
MatrixBlock> {
 
        @Override
        protected String makeResString(double[] times) {
+               return makeResString(ret, times);
+       }
+
+       public static String makeResString(ArrayList<InOut> ret, double[] 
times) {
                double totalIn = 0;
                double totalOut = 0;
                double totalTime = 0.0;
@@ -334,6 +339,7 @@ public class Serialize extends APerfTest<Serialize.InOut, 
MatrixBlock> {
                double stdOut = Math.sqrt(varOut / el);
 
                return String.format("%12.0f+-%12.0f Byte/s, %12.0f+-%12.0f 
Byte/s", bytePerMsIn, stdIn, bytePerMsOut, stdOut);
+
        }
 
        public static int compare(InOut a, InOut b) {
@@ -473,7 +479,7 @@ public class Serialize extends APerfTest<Serialize.InOut, 
MatrixBlock> {
                return super.toString() + " threads: " + k;
        }
 
-       protected class InOut {
+       protected static class InOut {
                protected long in;
                protected long out;
                protected double time;
diff --git 
a/src/test/java/org/apache/sysds/performance/generators/IGenerate.java 
b/src/test/java/org/apache/sysds/performance/generators/Const.java
similarity index 52%
copy from src/test/java/org/apache/sysds/performance/generators/IGenerate.java
copy to src/test/java/org/apache/sysds/performance/generators/Const.java
index ee39590bf3..2d3adc1ace 100644
--- a/src/test/java/org/apache/sysds/performance/generators/IGenerate.java
+++ b/src/test/java/org/apache/sysds/performance/generators/Const.java
@@ -19,38 +19,6 @@
 
 package org.apache.sysds.performance.generators;
 
-/**
- * Generator interface for task generation.
- */
-public interface IGenerate<T> {
-
-       /**
-        * Validate if the generator is empty, and we have to wait for elements.
-        * 
-        * @return If the generator is empty
-        */
-       public boolean isEmpty();
-
-       /**
-        * Default wait time for the generator to fill
-        * 
-        * @return The wait time
-        */
-       public int defaultWaitTime();
-
-       /**
-        * A Blocking take operation that waits for the Generator to fill that 
element
-        * 
-        * @return An task element
-        */
-       public T take();
-
-       /**
-        * A Non blocking async operation that generates elements for the task 
que
-        * 
-        * @param N The number of elements to create
-        * @throws InterruptedException An exception if the task is interrupted
-        */
-       public void generate(int N) throws InterruptedException;
-
+public interface Const<T> extends IGenerate<T> {
+    public void change(T t);
 }
diff --git 
a/src/test/java/org/apache/sysds/performance/generators/ConstFrame.java 
b/src/test/java/org/apache/sysds/performance/generators/ConstFrame.java
new file mode 100644
index 0000000000..13f7392380
--- /dev/null
+++ b/src/test/java/org/apache/sysds/performance/generators/ConstFrame.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.performance.generators;
+
+import java.util.Arrays;
+
+import org.apache.sysds.runtime.frame.data.FrameBlock;
+
+public class ConstFrame implements Const<FrameBlock> {
+
+    protected FrameBlock fb;
+
+    public ConstFrame(FrameBlock fb) {
+        this.fb = fb;
+    }
+
+    @Override
+    public FrameBlock take() {
+        return fb;
+    }
+
+    @Override
+    public void generate(int N) throws InterruptedException {
+        // do nothing
+    }
+
+    @Override
+    public final boolean isEmpty() {
+        return false;
+    }
+
+    @Override
+    public final int defaultWaitTime() {
+        return 0;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append(this.getClass().getSimpleName());
+        sb.append(" Schema:");
+        sb.append(Arrays.toString(fb.getSchema()));
+        return sb.toString();
+    }
+
+    @Override
+    public void change(FrameBlock t) {
+        fb = t;
+    }
+}
diff --git 
a/src/test/java/org/apache/sysds/performance/generators/ConstMatrix.java 
b/src/test/java/org/apache/sysds/performance/generators/ConstMatrix.java
index f01d0a2075..f43e48caa7 100644
--- a/src/test/java/org/apache/sysds/performance/generators/ConstMatrix.java
+++ b/src/test/java/org/apache/sysds/performance/generators/ConstMatrix.java
@@ -27,9 +27,9 @@ import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.matrix.operators.CountDistinctOperator;
 import org.apache.sysds.test.TestUtils;
 
-public class ConstMatrix implements IGenerate<MatrixBlock> {
+public class ConstMatrix implements Const<MatrixBlock> {
 
-       protected final MatrixBlock mb;
+       protected MatrixBlock mb;
        protected final int nVal;
 
        public ConstMatrix(MatrixBlock mb) {
@@ -55,6 +55,16 @@ public class ConstMatrix implements IGenerate<MatrixBlock> {
                // do nothing
        }
 
+       @Override
+       public final boolean isEmpty() {
+               return false;
+       }
+       
+       @Override
+       public final int defaultWaitTime() {
+               return 0;
+       }
+       
        @Override
        public String toString() {
                StringBuilder sb = new StringBuilder();
@@ -68,13 +78,7 @@ public class ConstMatrix implements IGenerate<MatrixBlock> {
        }
 
        @Override
-       public boolean isEmpty() {
-               return false;
-       }
-
-       @Override
-       public int defaultWaitTime() {
-               return 0;
+       public void change(MatrixBlock t) {
+               mb = t;
        }
-
 }
diff --git 
a/src/test/java/org/apache/sysds/performance/generators/FrameFile.java 
b/src/test/java/org/apache/sysds/performance/generators/FrameFile.java
new file mode 100644
index 0000000000..d89a2589d7
--- /dev/null
+++ b/src/test/java/org/apache/sysds/performance/generators/FrameFile.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.performance.generators;
+
+import org.apache.sysds.common.Types.FileFormat;
+import org.apache.sysds.common.Types.ValueType;
+import org.apache.sysds.runtime.controlprogram.caching.FrameObject;
+import org.apache.sysds.runtime.frame.data.FrameBlock;
+import org.apache.sysds.runtime.io.FileFormatProperties;
+import org.apache.sysds.runtime.io.FileFormatPropertiesCSV;
+import org.apache.sysds.runtime.io.FrameReader;
+import org.apache.sysds.runtime.io.FrameReaderFactory;
+import org.apache.sysds.runtime.meta.MetaDataAll;
+
+public class FrameFile extends ConstFrame {
+
+    final private String path;
+
+    private FrameFile(String path, FrameBlock fb) {
+        super(fb);
+        this.path = path;
+        System.out.println("First 10 rows:");
+        System.out.println(fb.slice(0, 10));
+    }
+
+    public static FrameFile create(String path) throws Exception {
+
+        MetaDataAll mba = new MetaDataAll(path + ".mtd", false, true);
+        if(mba.mtdExists()) {
+            LOG.error(mba);
+
+            // DataCharacteristics ds = mba.getDataCharacteristics();
+            FileFormat f = 
FileFormat.valueOf(mba.getFormatTypeString().toUpperCase());
+            ValueType[] schema = FrameObject.parseSchema(mba.getSchema());
+            FileFormatProperties p = null;
+            if(f.equals(FileFormat.CSV)){
+                p = new FileFormatPropertiesCSV();
+                ((FileFormatPropertiesCSV)p).setHeader(mba.getHasHeader());
+            }
+            FrameReader r = FrameReaderFactory.createFrameReader(f, p);
+            FrameBlock fb = r.readFrameFromHDFS(path, schema, mba.getDim1(), 
mba.getDim2());
+            return new FrameFile(path, fb);
+        }
+        else {
+            LOG.error("No Mtd file found.. please add one. Fallback to CSV 
reading with header");
+            // we assume csv
+            FrameReader r = 
FrameReaderFactory.createFrameReader(FileFormat.CSV);
+            FrameBlock fb = r.readFrameFromHDFS(path, -1, -1);
+            return new FrameFile(path, fb);
+        }
+
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append(super.toString());
+        sb.append(" From file: ");
+        sb.append(path);
+        return sb.toString();
+    }
+
+}
diff --git 
a/src/test/java/org/apache/sysds/performance/generators/IGenerate.java 
b/src/test/java/org/apache/sysds/performance/generators/IGenerate.java
index ee39590bf3..7da382b7b8 100644
--- a/src/test/java/org/apache/sysds/performance/generators/IGenerate.java
+++ b/src/test/java/org/apache/sysds/performance/generators/IGenerate.java
@@ -19,11 +19,16 @@
 
 package org.apache.sysds.performance.generators;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
 /**
  * Generator interface for task generation.
  */
 public interface IGenerate<T> {
 
+       public static final Log LOG = 
LogFactory.getLog(IGenerate.class.getName());
+
        /**
         * Validate if the generator is empty, and we have to wait for elements.
         * 
diff --git 
a/src/test/java/org/apache/sysds/performance/generators/MatrixFile.java 
b/src/test/java/org/apache/sysds/performance/generators/MatrixFile.java
new file mode 100644
index 0000000000..0f85528ad2
--- /dev/null
+++ b/src/test/java/org/apache/sysds/performance/generators/MatrixFile.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.performance.generators;
+
+import org.apache.sysds.common.Types.FileFormat;
+import org.apache.sysds.runtime.io.MatrixReader;
+import org.apache.sysds.runtime.io.MatrixReaderFactory;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.runtime.meta.DataCharacteristics;
+import org.apache.sysds.runtime.meta.MetaDataAll;
+
+public class MatrixFile extends ConstMatrix {
+
+    final private String path;
+
+    private MatrixFile(String path, MatrixBlock mb) {
+        super(mb);
+        this.path = path;
+    }
+
+    public static MatrixFile create(String path) throws Exception {
+
+        MetaDataAll mba = new MetaDataAll(path + ".mtd", false, true);
+        DataCharacteristics ds = mba.getDataCharacteristics();
+        FileFormat f = 
FileFormat.valueOf(mba.getFormatTypeString().toUpperCase());
+
+        MatrixReader r = MatrixReaderFactory.createMatrixReader(f);
+        MatrixBlock mb = r.readMatrixFromHDFS(path, ds.getRows(), 
ds.getCols(), ds.getBlocksize(), ds.getNonZeros());
+        return new MatrixFile(path, mb);
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append(this.getClass().getSimpleName());
+        sb.append(" From file: ");
+        sb.append(path);
+        return sb.toString();
+    }
+
+}

Reply via email to