This is an automated email from the ASF dual-hosted git repository.
mboehm7 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/main by this push:
new 1c70242f36 [MINOR] Improved parfor test coverage (new tests, removed old code)
1c70242f36 is described below
commit 1c70242f3691d14c301751ff96381770691d2e84
Author: Matthias Boehm <[email protected]>
AuthorDate: Sat Nov 30 19:12:23 2024 +0100
[MINOR] Improved parfor test coverage (new tests, removed old code)
This patch improves the parfor test coverage via a couple of new tests
for ID handling, result merge, and task queues. Additionally, we remove
old code such as redundant cell indexes and data partitioning for text
cell (which is no longer reachable).

These tests also led to minor improvements (faster retrieval of process
IDs) and a correct reset of ID sequences (0 instead of 1).
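
For reference, below is a minimal sketch (not part of the commit) of the
two behavioral changes, using the IDHandler and IDSequence classes from
the diff; the wrapper class name is made up for illustration:

	import org.apache.sysds.runtime.controlprogram.parfor.util.IDHandler;
	import org.apache.sysds.runtime.controlprogram.parfor.util.IDSequence;

	public class ParForIdChangesSketch {
		public static void main(String[] args) {
			// faster process IDs: getProcessID() now returns a numeric long
			// via the Java 9+ ProcessHandle API, instead of parsing the
			// "pid@hostname" string of ManagementFactory.getRuntimeMXBean()
			long pid = IDHandler.getProcessID();
			System.out.println("pid: " + pid);

			// corrected reset semantics: reset() now sets the internal
			// counter to -1, so the first getNextID() yields 0 (previously 1)
			IDSequence seq = new IDSequence();
			seq.getNextID(); // 0
			seq.getNextID(); // 1
			seq.reset();
			System.out.println(seq.getNextID()); // 0 again after reset
		}
	}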
---
src/main/java/org/apache/sysds/api/DMLScript.java | 6 +-
.../parfor/DataPartitionerLocal.java | 224 +++------------------
.../parfor/TaskPartitionerFactory.java | 2 +-
.../runtime/controlprogram/parfor/util/Cell.java | 72 -------
.../controlprogram/parfor/util/IDHandler.java | 20 +-
.../controlprogram/parfor/util/IDSequence.java | 2 +-
.../parfor/util/StagingFileUtils.java | 205 -------------------
.../test/component/parfor/ParForIDHandling.java | 87 ++++++++
.../test/component/parfor/ResultMergeTest.java | 36 ++--
.../test/component/parfor/TaskPartitionerTest.java | 30 ++-
10 files changed, 164 insertions(+), 520 deletions(-)
diff --git a/src/main/java/org/apache/sysds/api/DMLScript.java b/src/main/java/org/apache/sysds/api/DMLScript.java
index 81ce1f04b0..d6853891e2 100644
--- a/src/main/java/org/apache/sysds/api/DMLScript.java
+++ b/src/main/java/org/apache/sysds/api/DMLScript.java
@@ -624,11 +624,11 @@ public class DMLScript
			// TODO fix and replace localhost identifyer with hostname in federated instructions SYSTEMDS-3440
			// https://issues.apache.org/jira/browse/SYSTEMDS-3440
			model.host = "localhost";
-			model.processId = Long.parseLong(IDHandler.getProcessID());
+			model.processId = IDHandler.getProcessID();
			String requestBody = objectMapper
-					.writerWithDefaultPrettyPrinter()
-					.writeValueAsString(model);
+				.writerWithDefaultPrettyPrinter()
+				.writeValueAsString(model);
			var client = HttpClient.newHttpClient();
			var request = HttpRequest.newBuilder(URI.create(uriString))
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/DataPartitionerLocal.java b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/DataPartitionerLocal.java
index 0644b4d9d3..5be598aa15 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/DataPartitionerLocal.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/DataPartitionerLocal.java
@@ -19,36 +19,26 @@
package org.apache.sysds.runtime.controlprogram.parfor;
-import java.io.BufferedWriter;
+import java.io.BufferedReader;
import java.io.File;
+import java.io.FileInputStream;
import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.util.HashMap;
+import java.io.InputStreamReader;
import java.util.LinkedList;
-import java.util.Map.Entry;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.Writer;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.sysds.common.Types.FileFormat;
import org.apache.sysds.conf.ConfigurationManager;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.controlprogram.ParForProgramBlock.PDataPartitionFormat;
import org.apache.sysds.runtime.controlprogram.ParForProgramBlock.PartitionFormat;
import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
-import org.apache.sysds.runtime.controlprogram.parfor.util.Cell;
-import org.apache.sysds.runtime.controlprogram.parfor.util.IDSequence;
-import org.apache.sysds.runtime.controlprogram.parfor.util.StagingFileUtils;
import org.apache.sysds.runtime.io.IOUtilFunctions;
+import org.apache.sysds.runtime.matrix.data.IJV;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.matrix.data.MatrixCell;
import org.apache.sysds.runtime.matrix.data.MatrixIndexes;
@@ -78,7 +68,6 @@ public class DataPartitionerLocal extends DataPartitioner
{
private static final boolean PARALLEL = true;
- private IDSequence _seq = null;
private MatrixBlock _reuseBlk = null;
private int _par = -1;
@@ -93,7 +82,6 @@ public class DataPartitionerLocal extends DataPartitioner
super(dpf._dpf, dpf._N);
if( dpf.isBlockwise() )
throw new DMLRuntimeException("Data partitioning formt
'"+dpf+"' not supported by DataPartitionerLocal" );
- _seq = new IDSequence();
_par = (par > 0) ? par : 1;
}
@@ -107,9 +95,7 @@ public class DataPartitionerLocal extends DataPartitioner
		String fnameStaging = LocalFileUtils.getUniqueWorkingDir( LocalFileUtils.CATEGORY_PARTITIONING );
		//reblock input matrix
-		if( fmt == FileFormat.TEXT )
-			partitionTextCell( fname, fnameStaging, fnameNew, rlen, clen, blen );
-		else if( fmt == FileFormat.BINARY )
+		if( fmt == FileFormat.BINARY )
			partitionBinaryBlock( fname, fnameStaging, fnameNew, rlen, clen, blen );
		else
			throw new DMLRuntimeException("Cannot create data partitions of format: "+fmt.toString());
@@ -117,97 +103,6 @@ public class DataPartitionerLocal extends DataPartitioner
LocalFileUtils.cleanupWorkingDirectory(fnameStaging);
}
-	private void partitionTextCell( String fname, String fnameStaging, String fnameNew, long rlen, long clen, int blen )
-	{
-		long row = -1;
-		long col = -1;
-
-		try
-		{
-			//STEP 1: read matrix from HDFS and write blocks to local staging area
-			//check and add input path
-			JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
-			Path path = new Path(fname);
-			FileInputFormat.addInputPath(job, path);
-			TextInputFormat informat = new TextInputFormat();
-			informat.configure(job);
-			InputSplit[] splits = informat.getSplits(job, 1);
-
-			LinkedList<Cell> buffer = new LinkedList<>();
-			LongWritable key = new LongWritable();
-			Text value = new Text();
-			FastStringTokenizer st = new FastStringTokenizer(' ');
-
-			for(InputSplit split: splits)
-			{
-				RecordReader<LongWritable,Text> reader = informat.getRecordReader(split, job, Reporter.NULL);
-				try
-				{
-					while(reader.next(key, value))
-					{
-						st.reset( value.toString() ); //reset tokenizer
-						row = st.nextLong();
-						col = st.nextLong();
-						double lvalue = st.nextDouble();
-						Cell tmp = new Cell( row, col, lvalue );
-
-						buffer.addLast( tmp );
-						if( buffer.size() > StagingFileUtils.CELL_BUFFER_SIZE ) //periodic flush
-						{
-							appendCellBufferToStagingArea(fnameStaging, buffer, blen);
-							buffer.clear();
-						}
-					}
-
-					//final flush
-					if( !buffer.isEmpty() )
-					{
-						appendCellBufferToStagingArea(fnameStaging, buffer, blen);
-						buffer.clear();
-					}
-				}
-				finally {
-					IOUtilFunctions.closeSilently(reader);
-				}
-			}
-
-			//STEP 2: read matrix blocks from staging area and write matrix to HDFS
-			String[] fnamesPartitions = new File(fnameStaging).list();
-			if(PARALLEL)
-			{
-				int len = Math.min(fnamesPartitions.length, _par);
-				Thread[] threads = new Thread[len];
-				for( int i=0;i<len;i++ )
-				{
-					int start = i*(int)Math.ceil(((double)fnamesPartitions.length)/len);
-					int end = (i+1)*(int)Math.ceil(((double)fnamesPartitions.length)/len)-1;
-					end = Math.min(end, fnamesPartitions.length-1);
-					threads[i] = new Thread(new DataPartitionerWorkerTextCell(job, fnameNew, fnameStaging, fnamesPartitions, start, end));
-					threads[i].start();
-				}
-
-				for( Thread t : threads )
-					t.join();
-			}
-			else
-			{
-				for( String pdir : fnamesPartitions )
-					writeTextCellFileToHDFS( job, fnameNew, fnameStaging+"/"+pdir );
-			}
-		}
-		catch (Exception e)
-		{
-			//post-mortem error handling and bounds checking
-			if( row < 1 || row > rlen || col < 1 || col > clen )
-			{
-				throw new DMLRuntimeException("Matrix cell ["+(row)+","+(col)+"] " +
-					"out of overall matrix range [1:"+rlen+",1:"+clen+"].");
-			}
-			else
-				throw new DMLRuntimeException("Unable to partition text cell matrix.", e);
-		}
-	}
-
@SuppressWarnings("deprecation")
	private void partitionBinaryBlock( String fname, String fnameStaging, String fnameNew, long rlen, long clen, int blen )
{
@@ -323,52 +218,6 @@ public class DataPartitionerLocal extends DataPartitioner
LocalFileUtils.writeMatrixBlockToLocal(pfname, mb);
}
}
-
-	private void appendCellBufferToStagingArea( String dir, LinkedList<Cell> buffer, int blen )
- throws IOException
- {
- HashMap<Long,LinkedList<Cell>> sortedBuffer = new HashMap<>();
-
- //sort cells in buffer wrt key
- long key = -1;
- for( Cell c : buffer )
- {
- switch(_format)
- {
- case ROW_WISE:
- key = c.getRow();
- c.setRow(1);
- break;
- case ROW_BLOCK_WISE:
- key = (c.getRow()-1)/blen+1;
- c.setRow((c.getRow()-1)%blen+1);
- break;
- case COLUMN_WISE:
- key = c.getCol();
- c.setCol(1);
- break;
- case COLUMN_BLOCK_WISE:
- key = (c.getCol()-1)/blen+1;
- c.setCol((c.getCol()-1)%blen+1);
- break;
- default:
- //do nothing
- }
-
- if( !sortedBuffer.containsKey(key) )
- sortedBuffer.put(key, new LinkedList<Cell>());
- sortedBuffer.get(key).addLast(c);
- }
-
- //write lists of cells to local files
- for( Entry<Long,LinkedList<Cell>> e : sortedBuffer.entrySet() )
- {
-			String pdir = LocalFileUtils.checkAndCreateStagingDir(dir+"/"+e.getKey());
-			String pfname = pdir+"/"+"block_"+_seq.getNextID();
-			StagingFileUtils.writeCellListToLocal(pfname, e.getValue());
- }
- }
-
/////////////////////////////////////
// Helper methods for HDFS //
@@ -425,11 +274,11 @@ public class DataPartitionerLocal extends DataPartitioner
String[] fnameBlocks = new File( lpdir ).list();
for( String fnameBlock : fnameBlocks )
{
- LinkedList<Cell> tmp =
StagingFileUtils.readCellListFromLocal(lpdir+"/"+fnameBlock);
- for( Cell c : tmp )
+ LinkedList<IJV> tmp =
readCellListFromLocal(lpdir+"/"+fnameBlock);
+ for( IJV c : tmp )
{
- indexes.setIndexes(c.getRow(),
c.getCol());
- cell.setValue(c.getValue());
+ indexes.setIndexes(c.getI(), c.getJ());
+ cell.setValue(c.getV());
writer.append(indexes, cell);
}
}
@@ -438,38 +287,27 @@ public class DataPartitionerLocal extends DataPartitioner
IOUtilFunctions.closeSilently(writer);
}
}
-
-	public void writeTextCellFileToHDFS( JobConf job, String dir, String lpdir )
+
+ private static LinkedList<IJV> readCellListFromLocal( String fname )
throws IOException
{
- long key = getKeyFromFilePath(lpdir);
- Path path = new Path(dir+"/"+key);
- FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
-		try(BufferedWriter out = new BufferedWriter(new OutputStreamWriter(fs.create(path,true))))
-		{
-			//for obj reuse and preventing repeated buffer re-allocations
-			StringBuilder sb = new StringBuilder();
-
-			String[] fnameBlocks = new File( lpdir ).list();
-			for( String fnameBlock : fnameBlocks )
-			{
-				LinkedList<Cell> tmp = StagingFileUtils.readCellListFromLocal(lpdir+"/"+fnameBlock);
- for( Cell c : tmp )
- {
- sb.append(c.getRow());
- sb.append(' ');
- sb.append(c.getCol());
- sb.append(' ');
- sb.append(c.getValue());
- sb.append('\n');
- out.write( sb.toString() );
- sb.setLength(0);
- }
+ FileInputStream fis = new FileInputStream( fname );
+ LinkedList<IJV> buffer = new LinkedList<>();
+		try(BufferedReader in = new BufferedReader(new InputStreamReader(fis))) {
+ String value = null;
+ FastStringTokenizer st = new FastStringTokenizer(' ');
+ while( (value=in.readLine())!=null ) {
+ st.reset( value ); //reset tokenizer
+ int row = (int)st.nextLong();
+ int col = (int)st.nextLong();
+ double lvalue = st.nextDouble();
+ IJV c = new IJV().set( row, col, lvalue );
+ buffer.addLast( c );
}
}
+ return buffer;
}
-
/////////////////////////////////
// Helper methods for local fs //
// read/write //
@@ -525,21 +363,7 @@ public class DataPartitionerLocal extends DataPartitioner
		public abstract void writeFileToHDFS( JobConf job, String fnameNew, String stagingDir )
throws IOException;
}
-
-	private class DataPartitionerWorkerTextCell extends DataPartitionerWorker
-	{
-		public DataPartitionerWorkerTextCell(JobConf job, String fnameNew, String fnameStaging, String[] fnamesPartitions, int start, int end) {
-			super(job, fnameNew, fnameStaging, fnamesPartitions, start, end);
-		}
-		@Override
-		public void writeFileToHDFS(JobConf job, String fnameNew, String stagingDir)
-			throws IOException
-		{
-			writeTextCellFileToHDFS( job, fnameNew, stagingDir );
-		}
-	}
-
	private class DataPartitionerWorkerBinaryBlock extends DataPartitionerWorker
	{
		public DataPartitionerWorkerBinaryBlock(JobConf job, String fnameNew, String fnameStaging, String[] fnamesPartitions, int start, int end) {
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/TaskPartitionerFactory.java b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/TaskPartitionerFactory.java
index 66d4961104..5d4bc6274e 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/TaskPartitionerFactory.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/TaskPartitionerFactory.java
@@ -23,7 +23,7 @@ import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.controlprogram.ParForProgramBlock.PTaskPartitioner;
import org.apache.sysds.runtime.instructions.cp.IntObject;
-public abstract class TaskPartitionerFactory
+public class TaskPartitionerFactory
{
	public static TaskPartitioner createTaskPartitioner(PTaskPartitioner type,
		IntObject from, IntObject to, IntObject incr, long taskSize, int numThreads, String iterPredVar)
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/util/Cell.java b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/util/Cell.java
deleted file mode 100644
index ae68c0f216..0000000000
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/util/Cell.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysds.runtime.controlprogram.parfor.util;
-
-/**
- * Helper class for representing text cell and binary cell records in order to
- * allow for buffering and buffered read/write.
- *
- * NOTE: could be replaced by IJV.class but used in order to ensure independence.
- */
-public class Cell
-{
-
-
- private long _row;
- private long _col;
- private double _value;
-
- public Cell( long r, long c, double v )
- {
- _row = r;
- _col = c;
- _value = v;
- }
-
- public long getRow()
- {
- return _row;
- }
-
- public long getCol()
- {
- return _col;
- }
-
- public double getValue()
- {
- return _value;
- }
-
- public void setRow( long row )
- {
- _row = row;
- }
-
- public void setCol( long col )
- {
- _col = col;
- }
-
- public void setValue( double value )
- {
- _value = value;
- }
-}
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/util/IDHandler.java b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/util/IDHandler.java
index 37c5834dc0..7e5263ebc0 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/util/IDHandler.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/util/IDHandler.java
@@ -19,7 +19,6 @@
package org.apache.sysds.runtime.controlprogram.parfor.util;
-import java.lang.management.ManagementFactory;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
@@ -78,28 +77,21 @@ public class IDHandler
* @return distributed unique id
*/
public static String createDistributedUniqueID() {
- String uuid = null;
+ String uuid = "0_0.0.0.0";
try {
- String pid = getProcessID();
+ long pid = getProcessID();
String host = getIPAddress(false);
uuid = pid + "_" + host;
}
- catch(Exception ex) {
- uuid = "0_0.0.0.0";
- }
+ catch(Exception ex) {}
return uuid;
}
-	public static String getProcessID() {
-		//get process id
-		String pname = ManagementFactory.getRuntimeMXBean().getName(); //pid@hostname
-		String pid = pname.split("@")[0];
-		// TODO: change this as soon as we switch to a java version >= 9
-		// import java.lang.ProcessHandle;
-		// pid = ProcessHandle.current().pid();
-		return pid;
+	public static long getProcessID() {
+		//alternative: ManagementFactory.getRuntimeMXBean().getName() --> pid@hostname
+		return ProcessHandle.current().pid();
	}
	public static String getIPAddress(boolean noLocal) throws SocketException, UnknownHostException {
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/util/IDSequence.java b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/util/IDSequence.java
index fe40063b3d..93d7eec4d4 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/util/IDSequence.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/util/IDSequence.java
@@ -66,7 +66,7 @@ public class IDSequence
}
public void reset() {
- reset(0);
+ reset(-1);
}
public void reset(long value) {
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/util/StagingFileUtils.java b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/util/StagingFileUtils.java
deleted file mode 100644
index abd02322d2..0000000000
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/util/StagingFileUtils.java
+++ /dev/null
@@ -1,205 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysds.runtime.controlprogram.parfor.util;
-
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.OutputStreamWriter;
-import java.util.HashMap;
-import java.util.LinkedList;
-
-import org.apache.sysds.runtime.DMLRuntimeException;
-import org.apache.sysds.runtime.data.DenseBlock;
-import org.apache.sysds.runtime.matrix.data.MatrixBlock;
-import org.apache.sysds.runtime.util.FastStringTokenizer;
-
-public class StagingFileUtils
-{
-
- public static final int CELL_BUFFER_SIZE = 100000;
-
-	public static void writeCellListToLocal( String fname, LinkedList<Cell> buffer )
- throws IOException
- {
- FileOutputStream fos = new FileOutputStream( fname, true );
-		try(BufferedWriter out = new BufferedWriter(new OutputStreamWriter(fos))) {
-			//for obj reuse and preventing repeated buffer re-allocations
- StringBuilder sb = new StringBuilder();
- for( Cell c : buffer ) {
- sb.append(c.getRow());
- sb.append(' ');
- sb.append(c.getCol());
- sb.append(' ');
- sb.append(c.getValue());
- sb.append('\n');
- out.write( sb.toString() );
- sb.setLength(0);
- }
- }
- }
-
-	public static void writeKeyMappingToLocal( String fname, long[][] keys )
- throws IOException
- {
- FileOutputStream fos = new FileOutputStream( fname, true );
-		try( BufferedWriter out = new BufferedWriter(new OutputStreamWriter(fos)) ) {
-			//for obj reuse and preventing repeated buffer re-allocations
- StringBuilder sb = new StringBuilder();
-
- for( long[] key : keys )
- {
- sb.append(key[0]);
- sb.append(' ');
- sb.append(key[1]);
- sb.append('\n');
- out.write( sb.toString() );
- sb.setLength(0);
- }
- }
- }
-
- public static BufferedReader openKeyMap( String name )
- throws FileNotFoundException
- {
- FileInputStream fis = new FileInputStream( name );
-		BufferedReader in = new BufferedReader(new InputStreamReader(fis));
- return in;
- }
-
-	public static void nextKeyMap( BufferedReader in, HashMap<Integer,HashMap<Long,Long>> map, int bi, int blen )
- throws NumberFormatException, IOException
- {
- String value = null;
- FastStringTokenizer st = new FastStringTokenizer(' ');
- while( (value=in.readLine())!=null )
- {
- st.reset( value ); //reset tokenizer
- long row1 = st.nextLong();
- long row2 = st.nextLong();
-
- int id = (int)row1/blen;
- if( !map.containsKey(id) )
- map.put(id, new HashMap<Long,Long>());
-
- map.get(id).put(row1, row2);
- if( id > bi )
- break;
- }
- }
-
-	public static int nextSizedKeyMap( BufferedReader in, HashMap<Integer,HashMap<Long,Long>> map, int blen, int size )
- throws NumberFormatException, IOException
- {
- map.clear();
-
- String value = null;
- int len = 0;
- FastStringTokenizer st = new FastStringTokenizer(' ');
- while( (value=in.readLine())!=null )
- {
- st.reset( value ); //reset tokenizer
- long row1 = st.nextLong();
- long row2 = st.nextLong();
-
- int id = (int)row1/blen;
- if( !map.containsKey(id) )
- map.put(id, new HashMap<Long,Long>());
-
- map.get(id).put(row1, row2);
- len++;
-
- if( len >= size )
- break;
- }
-
- return len;
- }
-
- public static LinkedList<Cell> readCellListFromLocal( String fname )
- throws IOException
- {
- FileInputStream fis = new FileInputStream( fname );
- LinkedList<Cell> buffer = new LinkedList<>();
-		try(BufferedReader in = new BufferedReader(new InputStreamReader(fis))) {
- String value = null;
- FastStringTokenizer st = new FastStringTokenizer(' ');
- while( (value=in.readLine())!=null ) {
- st.reset( value ); //reset tokenizer
- long row = st.nextLong();
- long col = st.nextLong();
- double lvalue = st.nextDouble();
- Cell c = new Cell( row, col, lvalue );
- buffer.addLast( c );
- }
- }
- return buffer;
- }
-
-	public static MatrixBlock readCellList2BlockFromLocal( String fname, int blen )
- throws IOException, DMLRuntimeException
- {
- return readCellList2BlockFromLocal( fname, blen, false );
- }
-
-	public static MatrixBlock readCellList2BlockFromLocal( String fname, int blen, boolean sparse )
- throws IOException, DMLRuntimeException
- {
- MatrixBlock tmp = new MatrixBlock( blen, blen, sparse );
- if( !sparse )
- tmp.allocateDenseBlockUnsafe(blen, blen);
-
- FileInputStream fis = new FileInputStream( fname );
- FastStringTokenizer st = new FastStringTokenizer(' ');
-		try(BufferedReader in = new BufferedReader(new InputStreamReader(fis))) {
- String value = null;
- if( sparse ) {
- while( (value=in.readLine())!=null ) {
- st.reset( value ); //reset tokenizer
- int row = st.nextInt();
- int col = st.nextInt();
- double lvalue = st.nextDouble();
- tmp.set(row, col, lvalue);
- }
- }
- else {
- DenseBlock a = tmp.getDenseBlock();
- while( (value=in.readLine())!=null ) {
- st.reset( value ); //reset tokenizer
- int row = st.nextInt();
- int col = st.nextInt();
- double lvalue = st.nextDouble();
- a.set(row, col, lvalue);
- }
- tmp.recomputeNonZeros();
- }
- }
-
- //finally change internal representation if required
- tmp.examSparsity();
-
- return tmp;
- }
-
-}
diff --git a/src/test/java/org/apache/sysds/test/component/parfor/ParForIDHandling.java b/src/test/java/org/apache/sysds/test/component/parfor/ParForIDHandling.java
new file mode 100644
index 0000000000..cda28e1908
--- /dev/null
+++ b/src/test/java/org/apache/sysds/test/component/parfor/ParForIDHandling.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.component.parfor;
+
+import java.net.SocketException;
+import java.net.UnknownHostException;
+
+import org.apache.sysds.runtime.controlprogram.parfor.util.IDHandler;
+import org.apache.sysds.runtime.controlprogram.parfor.util.IDSequence;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ParForIDHandling {
+
+ @Test
+ public void testExtractIntID() {
+		Assert.assertEquals(2000009, IDHandler.extractIntID("task_local_0002_m_000009"));
+		Assert.assertEquals(898000001, IDHandler.extractIntID("task_201203111647_0898_m_000001"));
+ }
+
+ @Test
+ public void testDistributedUniqueID() {
+		Assert.assertTrue(IDHandler.createDistributedUniqueID().contains("_"));
+ }
+
+ @Test
+ public void testProcessID() {
+ Assert.assertTrue(IDHandler.getProcessID() < Long.MAX_VALUE);
+ }
+
+ @Test
+ public void testIntConcatenation() {
+ long tmp = IDHandler.concatIntIDsToLong(3, 7);
+ Assert.assertEquals(3, IDHandler.extractIntIDFromLong(tmp, 1));
+ Assert.assertEquals(7, IDHandler.extractIntIDFromLong(tmp, 2));
+ Assert.assertEquals(-1, IDHandler.extractIntIDFromLong(tmp, 3));
+ }
+
+ @Test
+	public void testIPAddress() throws SocketException, UnknownHostException {
+ Assert.assertNotEquals(null, IDHandler.getIPAddress(false));
+ Assert.assertNotEquals(null, IDHandler.getIPAddress(true));
+ }
+
+ @Test
+ public void testCyclicIDSequence() {
+ testIDSequence(true);
+ }
+
+ @Test
+ public void testNonCyclicIDSequence() {
+ testIDSequence(false);
+ }
+
+ public void testIDSequence(boolean cyclic) {
+ IDSequence seq = new IDSequence(cyclic, 2);
+ Assert.assertEquals(-1, seq.getCurrentID());
+ Assert.assertEquals(0, seq.getNextID());
+ Assert.assertEquals(1, seq.getNextID());
+ try {
+ Assert.assertEquals(2, seq.getNextID());
+ Assert.assertEquals(0, seq.getNextID());
+ if( !cyclic ) // should have raised exception
+ Assert.fail();
+ }
+ catch(Exception ex) {
+ Assert.assertEquals(2, seq.getCurrentID());
+ }
+ }
+}
diff --git a/src/test/java/org/apache/sysds/test/component/parfor/ResultMergeTest.java b/src/test/java/org/apache/sysds/test/component/parfor/ResultMergeTest.java
index a5aa30d1d1..23e94e809c 100644
--- a/src/test/java/org/apache/sysds/test/component/parfor/ResultMergeTest.java
+++ b/src/test/java/org/apache/sysds/test/component/parfor/ResultMergeTest.java
@@ -63,20 +63,27 @@ public class ResultMergeTest extends AutomatedTestBase{
}
private void testResultMergeAll(PResultMerge mtype) {
- testResultMerge(false, false, false, mtype);
- testResultMerge(false, true, false, mtype);
- testResultMerge(true, false, false, mtype);
- testResultMerge(false, false, true, mtype);
+ testResultMerge(false, false, false, false, mtype);
+ testResultMerge(false, true, false, false, mtype);
+ testResultMerge(true, false, false, false, mtype);
+ testResultMerge(false, false, true, false, mtype);
if( mtype != PResultMerge.LOCAL_FILE ) //FIXME
- testResultMerge(false, true, true, mtype);
- testResultMerge(true, false, true, mtype);
-
- //testResultMerge(true, true, false, mtype); invalid
+ testResultMerge(false, true, true, false, mtype);
+ testResultMerge(true, false, true, false, mtype);
+ //testResultMerge(true, true, false, false, mtype); invalid
+
+ /* FIXME sparse compare
+ testResultMerge(false, false, false, true, mtype);
+ testResultMerge(false, true, false, true, mtype);
+ testResultMerge(true, false, false, true, mtype);
+ testResultMerge(false, false, true, true, mtype);
+ testResultMerge(false, true, true, true, mtype);
+ testResultMerge(true, false, true, true, mtype);
+ */
}
-	private void testResultMerge(boolean par, boolean accum, boolean compare, PResultMerge mtype) {
+	private void testResultMerge(boolean par, boolean accum, boolean compare, boolean sparseCompare, PResultMerge mtype) {
		try{
-			loadTestConfiguration(getTestConfiguration(TEST_NAME));
//create input and output objects
@@ -85,16 +92,17 @@ public class ResultMergeTest extends AutomatedTestBase{
				toMatrixObject(new MatrixBlock(1200,1100,1d), output("C")) :
				toMatrixObject(new MatrixBlock(1200,1100,true), output("C"));
			MatrixBlock rest = compare ?
-				new MatrixBlock(400,1100,1d) : //constant (also dense)
+				new MatrixBlock(400,1100,sparseCompare?0.2:1.0) : //constant
				new MatrixBlock(400,1100,true); //empty (also sparse)
-			MatrixObject[] Bobj = new MatrixObject[3];
+			MatrixObject[] Bobj = new MatrixObject[4];
			Bobj[0] = toMatrixObject(A.slice(0,399).rbind(rest).rbind(rest), output("B0"));
			Bobj[1] = toMatrixObject(rest.rbind(A.slice(400,799)).rbind(rest), output("B1"));
			Bobj[2] = toMatrixObject(rest.rbind(rest).rbind(A.slice(800,1199)), output("B2"));
-
+			Bobj[3] = toMatrixObject(rest.rbind(rest).rbind(rest), output("B3"));
+
			//create result merge
			ExecutionContext ec = ExecutionContextFactory.createContext();
-			int numThreads = 3;
+			int numThreads = 4;
			ResultMerge<?> rm = ParForProgramBlock.createResultMerge(
				mtype, Cobj, Bobj, output("R"), accum, numThreads, ec);
diff --git a/src/test/java/org/apache/sysds/test/component/parfor/TaskPartitionerTest.java b/src/test/java/org/apache/sysds/test/component/parfor/TaskPartitionerTest.java
index c89c30774c..de16b5b356 100644
--- a/src/test/java/org/apache/sysds/test/component/parfor/TaskPartitionerTest.java
+++ b/src/test/java/org/apache/sysds/test/component/parfor/TaskPartitionerTest.java
@@ -68,24 +68,34 @@ public class TaskPartitionerTest extends AutomatedTestBase{
		testTaskPartitioner(2*LocalTaskQueue.MAX_SIZE, PTaskPartitioner.FACTORING_CMAX);
}
+ @Test
+ public void testUnknown() {
+ testTaskPartitioner(1, PTaskPartitioner.UNSPECIFIED);
+ }
+
private void testTaskPartitioner(int numTasks, PTaskPartitioner type) {
- LocalTaskQueue<Task> queue = new LocalTaskQueue<>();
- TaskPartitioner partitioner =
TaskPartitionerFactory.createTaskPartitioner(
- type, new IntObject(1), new IntObject(numTasks), new
IntObject(1),
- numTasks, InfrastructureAnalyzer.getLocalParallelism(),
"i");
- //asynchronous task creation
-
CommonThreadPool.get().submit(()->partitioner.createTasks(queue));
- //consume tasks and check serialization
- Task t = null;
try {
+ LocalTaskQueue<Task> queue = new LocalTaskQueue<>();
+ TaskPartitioner partitioner =
TaskPartitionerFactory.createTaskPartitioner(
+ type, new IntObject(1), new
IntObject(numTasks), new IntObject(1),
+ numTasks,
InfrastructureAnalyzer.getLocalParallelism(), "i");
+ //asynchronous task creation
+
CommonThreadPool.get().submit(()->partitioner.createTasks(queue));
+ if( type == PTaskPartitioner.STATIC ) {
+ Thread.sleep(10);
+ System.out.println(queue.toString());
+ }
+ //consume tasks and check serialization
+ Task t = null;
while((t =
queue.dequeueTask())!=LocalTaskQueue.NO_MORE_TASKS) {
Task ts1 =
Task.parseCompactString(t.toCompactString());
Task ts2 =
Task.parseCompactString(t.toCompactString(10));
Assert.assertEquals(t.toString(),
ts1.toString());
Assert.assertEquals(t.toString(),
ts2.toString());
}
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
+ } catch (Exception e) {
+ if( type!=PTaskPartitioner.UNSPECIFIED )
+ throw new RuntimeException(e);
}
}
}