This is an automated email from the ASF dual-hosted git repository.

janniklinde pushed a commit to branch pr-2420
in repository https://gitbox.apache.org/repos/asf/systemds.git

commit ca3b64d61b58663f650a1ad5841ce12a0b1de2da
Author: mori49 <[email protected]>
AuthorDate: Tue Apr 14 10:05:52 2026 +0200

    [SYSTEMDS-3543] Piece-wise linear compression of column groups
    
    Closes #2420.
---
 .../runtime/compress/CompressionSettings.java      |  26 +-
 .../sysds/runtime/compress/colgroup/AColGroup.java |   7 +-
 .../runtime/compress/colgroup/ColGroupFactory.java |  87 ++-
 .../ColGroupPiecewiseLinearCompressed.java         | 678 +++++++++++++++++++++
 .../colgroup/functional/PiecewiseLinearUtils.java  | 306 ++++++++++
 .../PiecewiseLinearCompressionPerformanceTest.java | 168 +++++
 ...oupPiecewiseLinearCompressedOperationsTest.java | 308 ++++++++++
 .../ColGroupPiecewiseLinearCompressedTest.java     | 455 ++++++++++++++
 8 files changed, 2016 insertions(+), 19 deletions(-)

diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/CompressionSettings.java 
b/src/main/java/org/apache/sysds/runtime/compress/CompressionSettings.java
index af944fce75..99c4b9c2ec 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/CompressionSettings.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/CompressionSettings.java
@@ -55,16 +55,16 @@ public class CompressionSettings {
        /**
         * The sampling ratio used when choosing ColGroups. Note that, default 
behavior is to use exact estimator if the
         * number of elements is below 1000.
-        * 
+        *
         * DEPRECATED
         */
        public final double samplingRatio;
 
        /**
         * The sampling ratio power to use when choosing sample size. This is 
used in accordance to the function:
-        * 
+        *
         * sampleSize += nRows^samplePower;
-        * 
+        *
         * The value is bounded to be in the range of 0 to 1, 1 giving a sample 
size of everything, and 0 adding 1.
         */
        public final double samplePower;
@@ -114,8 +114,9 @@ public class CompressionSettings {
        /**
         * Transpose input matrix, to optimize access when extracting bitmaps. 
This setting is changed inside the script
         * based on the transposeInput setting.
-        * 
-        * This is intentionally left as a mutable value, since the 
transposition of the input matrix is decided in phase 3.
+        *
+        * This is intentionally left as a mutable value, since the 
transposition of the input matrix is decided in phase
+        * 3.
         */
        public boolean transposed = false;
 
@@ -135,6 +136,19 @@ public class CompressionSettings {
 
        public final boolean preferDeltaEncoding;
 
+       /**
+        * Target loss used by the piecewise linear compression. It is {@code Double.NaN} until it is set explicitly via
+        * {@link #setPiecewiseTargetLoss(double)}.
+        */
+       private double piecewiseTargetLoss = Double.NaN;
+
+       /**
+        * Sets the target loss for the piecewise linear compression.
+        *
+        * @param piecewiseTargetLoss the target loss to use
+        */
+       public void setPiecewiseTargetLoss(double piecewiseTargetLoss) {
+               this.piecewiseTargetLoss = piecewiseTargetLoss;
+       }
+
+       /**
+        * Returns the target loss for the piecewise linear compression.
+        *
+        * @return the configured target loss, {@code Double.NaN} unless it was set
+        */
+       public double getPiecewiseTargetLoss() {
+               return this.piecewiseTargetLoss;
+       }
+
        protected CompressionSettings(double samplingRatio, double samplePower, 
boolean allowSharedDictionary,
                String transposeInput, int seed, boolean lossy, 
EnumSet<CompressionType> validCompressions,
                boolean sortValuesByLength, PartitionerType columnPartitioner, 
int maxColGroupCoCode, double coCodePercentage,
@@ -161,7 +175,7 @@ public class CompressionSettings {
                this.sdcSortType = sdcSortType;
                this.scaleFactors = scaleFactors;
                this.preferDeltaEncoding = preferDeltaEncoding;
-               
+
                if(!printedStatus && LOG.isDebugEnabled()) {
                        printedStatus = true;
                        LOG.debug(this.toString());
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/AColGroup.java 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/AColGroup.java
index fbe04c732e..57bba0bef1 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/AColGroup.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/AColGroup.java
@@ -65,7 +65,8 @@ public abstract class AColGroup implements Serializable {
 
        /** Public super types of compression ColGroups supported */
        public static enum CompressionType {
-               UNCOMPRESSED, RLE, OLE, DDC, CONST, EMPTY, SDC, SDCFOR, DDCFOR, 
DeltaDDC, DDCLZW, LinearFunctional;
+               UNCOMPRESSED, RLE, OLE, DDC, CONST, EMPTY, SDC, SDCFOR, DDCFOR, 
DeltaDDC, DDCLZW, LinearFunctional,
+               PiecewiseLinear, PiecewiseLinearSuccessive;
 
                public boolean isDense() {
                        return this == DDC || this == CONST || this == DDCFOR 
|| this == DDCFOR || this == DDCLZW;
@@ -86,8 +87,8 @@ public abstract class AColGroup implements Serializable {
         * Protected such that outside the ColGroup package it should be 
unknown which specific subtype is used.
         */
        protected static enum ColGroupType {
-               UNCOMPRESSED, RLE, OLE, DDC, CONST, EMPTY, SDC, SDCSingle, SDCSingleZeros, SDCZeros, SDCFOR, DDCFOR, DDCLZW, DeltaDDC,
-               LinearFunctional;
+               // NOTE(review): DDCLZW is kept and PiecewiseLinear is appended so the ordinals of the existing constants
+               // stay unchanged (in case ordinals are persisted).
+               UNCOMPRESSED, RLE, OLE, DDC, CONST, EMPTY, SDC, SDCSingle, SDCSingleZeros, SDCZeros, SDCFOR, DDCFOR, DDCLZW,
+               DeltaDDC, LinearFunctional, PiecewiseLinear;
        }
 
        /** The ColGroup indexes contained in the ColGroup */
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupFactory.java 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupFactory.java
index 7699d7b7c1..89bb040d44 100644
--- 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupFactory.java
+++ 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupFactory.java
@@ -43,6 +43,7 @@ import 
org.apache.sysds.runtime.compress.colgroup.dictionary.Dictionary;
 import org.apache.sysds.runtime.compress.colgroup.dictionary.DictionaryFactory;
 import org.apache.sysds.runtime.compress.colgroup.dictionary.IDictionary;
 import org.apache.sysds.runtime.compress.colgroup.functional.LinearRegression;
+import 
org.apache.sysds.runtime.compress.colgroup.functional.PiecewiseLinearUtils;
 import org.apache.sysds.runtime.compress.colgroup.indexes.ColIndexFactory;
 import org.apache.sysds.runtime.compress.colgroup.indexes.IColIndex;
 import 
org.apache.sysds.runtime.compress.colgroup.insertionsort.AInsertionSorter;
@@ -106,7 +107,7 @@ public class ColGroupFactory {
 
        /**
         * The actual compression method, that handles the logic of compressing 
multiple columns together.
-        * 
+        *
         * @param in  The input matrix, that could have been transposed. If it 
is transposed the compSettings should specify
         *            this.
         * @param csi The compression information extracted from the 
estimation, this contains which groups of columns to
@@ -120,7 +121,7 @@ public class ColGroupFactory {
 
        /**
         * The actual compression method, that handles the logic of compressing 
multiple columns together.
-        * 
+        *
         * @param in  The input matrix, that could have been transposed. If it 
is transposed the compSettings should specify
         *            this.
         * @param csi The compression information extracted from the 
estimation, this contains which groups of columns to
@@ -135,7 +136,7 @@ public class ColGroupFactory {
        }
 
        /**
-        * 
+        *
         * @param in  The input matrix, that could have been transposed. If it 
is transposed the compSettings should specify
         *            this.
         * @param csi The compression information extracted from the 
estimation, this contains which groups of columns to
@@ -232,8 +233,9 @@ public class ColGroupFactory {
                                        time, retType, estC, actC, 
act.getNumValues(), cols, wanted, warning));
                }
                else {
-                       LOG.debug(String.format("time[ms]: %10.2f %25s est 
%10.0f -- act %10.0f distinct:%5d cols:%s wanted:%s",
-                               time, retType, estC, actC, act.getNumValues(), 
cols, wanted));
+                       LOG.debug(
+                               String.format("time[ms]: %10.2f %25s est %10.0f 
-- act %10.0f distinct:%5d cols:%s wanted:%s", time,
+                                       retType, estC, actC, 
act.getNumValues(), cols, wanted));
                }
 
        }
@@ -306,6 +308,12 @@ public class ColGroupFactory {
                                return compressLinearFunctional(colIndexes, in, 
cs);
                        }
                }
+               else if(ct == CompressionType.PiecewiseLinear) {
+                       return compressPiecewiseLinearFunctional(colIndexes, 
in, cs);
+               }
+               else if(ct == CompressionType.PiecewiseLinearSuccessive) {
+                       return 
compressPiecewiseLinearFunctionalSuccessive(colIndexes, in, cs);
+               }
                else if(ct == CompressionType.DDCFOR) {
                        AColGroup g = directCompressDDC(colIndexes, cg);
                        if(g instanceof ColGroupDDC)
@@ -710,7 +718,7 @@ public class ColGroupFactory {
                if(cs.scaleFactors != null) {
                        throw new NotImplementedException("Delta encoding with 
quantization not yet implemented");
                }
-               
+
                if(colIndexes.size() > 1) {
                        return directCompressDeltaDDCMultiCol(colIndexes, cg);
                }
@@ -742,7 +750,7 @@ public class ColGroupFactory {
 
                if(map.size() == 0)
                        return new ColGroupEmpty(colIndexes);
-               
+
                final double[] dictValues = map.getDictionary();
                IDictionary dict = new DeltaDictionary(dictValues, 1);
 
@@ -751,7 +759,8 @@ public class ColGroupFactory {
                return ColGroupDeltaDDC.create(colIndexes, dict, resData, null);
        }
 
-       private AColGroup directCompressDeltaDDCMultiCol(IColIndex colIndexes, 
CompressedSizeInfoColGroup cg) throws Exception {
+       private AColGroup directCompressDeltaDDCMultiCol(IColIndex colIndexes, 
CompressedSizeInfoColGroup cg)
+               throws Exception {
                final AMapToData d = MapToFactory.create(nRow, 
Math.max(Math.min(cg.getNumOffs() + 1, nRow), 126));
                final int fill = d.getUpperBoundValue();
                d.fill(fill);
@@ -830,8 +839,8 @@ public class ColGroupFactory {
                int fill) {
 
                ReaderColumnSelection reader = (cs.scaleFactors == null) ? 
ReaderColumnSelection.createReader(in, colIndexes,
-                       cs.transposed, rl,
-                       ru) : ReaderColumnSelection.createQuantizedReader(in, 
colIndexes, cs.transposed, rl, ru, cs.scaleFactors);
+                       cs.transposed, rl, ru) : 
ReaderColumnSelection.createQuantizedReader(in, colIndexes, cs.transposed, rl, 
ru,
+                       cs.scaleFactors);
 
                DblArray cellVals = reader.nextRow();
                boolean extra = false;
@@ -1078,6 +1087,64 @@ public class ColGroupFactory {
                return ColGroupLinearFunctional.create(colIndexes, 
coefficients, numRows);
        }
 
+       /**
+        * Entry point for piecewise linear compression that fits each column with segmented least squares (dynamic
+        * programming).
+        *
+        * @param colIndexes the column indices to compress
+        * @param in         the input MatrixBlock containing the data
+        * @param cs         compression settings, which define the target loss to consider
+        * @return a piecewise linear compressed column group
+        */
+       public static AColGroup compressPiecewiseLinearFunctional(IColIndex colIndexes, MatrixBlock in,
+               CompressionSettings cs) {
+               return compressPiecewiseLinearPerColumn(colIndexes, in, cs, false);
+       }
+
+       /**
+        * Entry point for piecewise linear compression with the successive method, which walks over the values of each
+        * column and checks whether the target loss is exceeded.
+        *
+        * @param colIndexes the column indices to compress
+        * @param in         the input MatrixBlock containing the data
+        * @param cs         compression settings, which define the target loss to consider
+        * @return a piecewise linear compressed column group
+        */
+       public static AColGroup compressPiecewiseLinearFunctionalSuccessive(IColIndex colIndexes, MatrixBlock in,
+               CompressionSettings cs) {
+               return compressPiecewiseLinearPerColumn(colIndexes, in, cs, true);
+       }
+
+       /**
+        * Fits every column of {@code colIndexes} independently and combines the per-column segments into one column
+        * group. Shared by both piecewise linear entry points, which only differ in the fitting method.
+        * <p>
+        * NOTE(review): each column is read via {@code PiecewiseLinearUtils.getColumn(in, colIdx)} and the row count is
+        * taken from {@code in.getNumRows()}; {@code cs.transposed} is not consulted here. Confirm this is correct for
+        * transposed inputs.
+        *
+        * @param colIndexes the column indices to compress
+        * @param in         the input MatrixBlock containing the data
+        * @param cs         compression settings passed on to the fitting method
+        * @param successive true to use the successive fit, false for segmented least squares
+        * @return a piecewise linear compressed column group
+        */
+       private static AColGroup compressPiecewiseLinearPerColumn(IColIndex colIndexes, MatrixBlock in,
+               CompressionSettings cs, boolean successive) {
+               final int numRows = in.getNumRows();
+               final int numCols = colIndexes.size();
+               final int[][] breakpointsPerCol = new int[numCols][];
+               final double[][] slopesPerCol = new double[numCols][];
+               final double[][] interceptsPerCol = new double[numCols][];
+
+               for(int col = 0; col < numCols; col++) {
+                       final double[] column = PiecewiseLinearUtils.getColumn(in, colIndexes.get(col));
+                       final PiecewiseLinearUtils.SegmentedRegression fit = successive ? PiecewiseLinearUtils
+                               .compressSuccessivePiecewiseLinear(column, cs) : PiecewiseLinearUtils
+                                       .compressSegmentedLeastSquares(column, cs);
+                       breakpointsPerCol[col] = fit.getBreakpoints();
+                       interceptsPerCol[col] = fit.getIntercepts();
+                       slopesPerCol[col] = fit.getSlopes();
+               }
+               return ColGroupPiecewiseLinearCompressed.create(colIndexes, breakpointsPerCol, slopesPerCol,
+                       interceptsPerCol, numRows);
+       }
+
        private AColGroup compressSDCFromSparseTransposedBlock(IColIndex cols, 
int nrUniqueEstimate, double tupleSparsity) {
                if(cols.size() > 1)
                        return 
compressMultiColSDCFromSparseTransposedBlock(cols, nrUniqueEstimate, 
tupleSparsity);
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupPiecewiseLinearCompressed.java
 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupPiecewiseLinearCompressed.java
new file mode 100644
index 0000000000..f05a5d46e7
--- /dev/null
+++ 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupPiecewiseLinearCompressed.java
@@ -0,0 +1,678 @@
+package org.apache.sysds.runtime.compress.colgroup;
+
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.sysds.runtime.compress.colgroup.indexes.IColIndex;
+import org.apache.sysds.runtime.compress.colgroup.scheme.ICLAScheme;
+import org.apache.sysds.runtime.compress.cost.ComputationCostEstimator;
+import org.apache.sysds.runtime.compress.estim.CompressedSizeInfoColGroup;
+import org.apache.sysds.runtime.data.DenseBlock;
+import org.apache.sysds.runtime.data.SparseBlock;
+import org.apache.sysds.runtime.data.SparseBlockMCSR;
+import org.apache.sysds.runtime.functionobjects.*;
+import org.apache.sysds.runtime.instructions.cp.CmCovObject;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.runtime.matrix.operators.BinaryOperator;
+import org.apache.sysds.runtime.matrix.operators.CMOperator;
+import org.apache.sysds.runtime.matrix.operators.LeftScalarOperator;
+import org.apache.sysds.runtime.matrix.operators.ScalarOperator;
+import org.apache.sysds.runtime.matrix.operators.UnaryOperator;
+import org.apache.sysds.utils.MemoryEstimates;
+
+import java.util.Arrays;
+
+/**
+ * A column group that compresses each column into piecewise linear segments. Every column of the original data is
+ * approximated by a set of linear segments defined by breakpoints, slopes and intercepts.
+ */
+
+public class ColGroupPiecewiseLinearCompressed extends AColGroupCompressed {
+       /** Segment boundaries (row indices) per column; entry {@code c} holds the breakpoints of column {@code c}. */
+       int[][] breakpointsPerCol;
+       /** Slope of the regression line per segment, per column. */
+       double[][] slopesPerCol;
+       /** Intercept of the regression line per segment, per column. */
+       double[][] interceptsPerCol;
+       /** Number of rows covered by this column group. */
+       int numRows;
+
+       /**
+        * Creates a group for the given columns without any segments; all coefficient arrays are left unset.
+        *
+        * @param colIndices the column indices this group represents
+        */
+       protected ColGroupPiecewiseLinearCompressed(IColIndex colIndices) {
+               super(colIndices);
+       }
+
+       /**
+        * Creates a group from already computed segments without validating them. Only the outer slope and intercept
+        * arrays are copied (shallow copy); the breakpoint arrays and all inner arrays are shared with the caller.
+        *
+        * @param colIndices the column indices this group represents
+        * @param breakpoints breakpoint indices per column
+        * @param slopes      slope of each segment per column
+        * @param intercepts  intercept of each segment per column
+        * @param numRows     number of rows in the original matrix
+        */
+       public ColGroupPiecewiseLinearCompressed(IColIndex colIndices, int[][] breakpoints, double[][] slopes,
+               double[][] intercepts, int numRows) {
+               super(colIndices);
+               this.numRows = numRows;
+               this.breakpointsPerCol = breakpoints;
+               this.slopesPerCol = slopes.clone();
+               this.interceptsPerCol = intercepts.clone();
+       }
+
+       /**
+        * Creates a new piecewise linear compressed column group. All inputs are validated, and every array is copied
+        * before it is stored, so later modifications of the arguments do not affect the group.
+        *
+        * @param colIndices        the column indices this group represents
+        * @param breakpointsPerCol breakpoint indices per column; each must start at 0, end at {@code numRows} and be
+        *                          non-decreasing
+        * @param slopesPerCol      slope of each segment per column
+        * @param interceptsPerCol  intercept of each segment per column
+        * @param numRows           number of rows in the original matrix
+        * @return a new ColGroupPiecewiseLinearCompressed instance
+        * @throws IllegalArgumentException if breakpoints are invalid or arrays are inconsistent
+        */
+       public static AColGroup create(IColIndex colIndices, int[][] breakpointsPerCol, double[][] slopesPerCol,
+               double[][] interceptsPerCol, int numRows) {
+               final int numCols = colIndices.size();
+               if(breakpointsPerCol.length != numCols)
+                       throw new IllegalArgumentException(
+                               "bp.length=" + breakpointsPerCol.length + " != colIndices.size()=" + numCols);
+               // one slope and one intercept array is needed per column, otherwise columns are dropped or missing
+               if(slopesPerCol.length != numCols || interceptsPerCol.length != numCols)
+                       throw new IllegalArgumentException("slopes.length=" + slopesPerCol.length + ", intercepts.length="
+                               + interceptsPerCol.length + " != colIndices.size()=" + numCols);
+
+               for(int c = 0; c < numCols; c++) {
+                       final int[] bp = breakpointsPerCol[c];
+                       if(bp.length < 1 || bp[0] != 0 || bp[bp.length - 1] != numRows)
+                               throw new IllegalArgumentException(
+                                       "Invalid breakpoints for col " + c + ": must start=0, end=numRows, >=1 pts");
+                       // getIdx locates the segment with a binary search, which requires sorted breakpoints
+                       for(int i = 1; i < bp.length; i++) {
+                               if(bp[i] < bp[i - 1])
+                                       throw new IllegalArgumentException("Breakpoints for col " + c
+                                               + " must be non-decreasing, but bp[" + i + "]=" + bp[i] + " < bp[" + (i - 1) + "]="
+                                               + bp[i - 1]);
+                       }
+                       if(slopesPerCol[c].length != interceptsPerCol[c].length ||
+                               slopesPerCol[c].length != bp.length - 1)
+                               throw new IllegalArgumentException("Inconsistent array lengths col " + c);
+               }
+
+               // defensive deep copy to prevent external modification
+               final int[][] bpCopy = new int[numCols][];
+               final double[][] slopeCopy = new double[numCols][];
+               final double[][] interceptCopy = new double[numCols][];
+               for(int c = 0; c < numCols; c++) {
+                       bpCopy[c] = Arrays.copyOf(breakpointsPerCol[c], breakpointsPerCol[c].length);
+                       slopeCopy[c] = Arrays.copyOf(slopesPerCol[c], slopesPerCol[c].length);
+                       interceptCopy[c] = Arrays.copyOf(interceptsPerCol[c], interceptsPerCol[c].length);
+               }
+
+               return new ColGroupPiecewiseLinearCompressed(colIndices, bpCopy, slopeCopy, interceptCopy, numRows);
+       }
+
+       /**
+        * Decompresses rows [rl, ru) of this column group into a DenseBlock. Each value is reconstructed as
+        * {@code slopes[seg] * row + intercepts[seg]} using the segment that contains the row.
+        * <p>
+        * The call is a no-op if {@code db}, the column indices or any coefficient array is null, or if the group has no
+        * columns. Target cells outside the bounds of {@code db} are skipped. The bounds are resolved once for the row
+        * range and once per column instead of once per cell.
+        *
+        * @param db   Target DenseBlock
+        * @param rl   Row to start decompression from
+        * @param ru   Row to end decompression at (not inclusive)
+        * @param offR Row offset into the target to decompress
+        * @param offC Column offset into the target to decompress
+        */
+       @Override
+       public void decompressToDenseBlock(DenseBlock db, int rl, int ru, int offR, int offC) {
+               if(db == null || _colIndexes == null || _colIndexes.size() == 0 || breakpointsPerCol == null ||
+                       slopesPerCol == null || interceptsPerCol == null) {
+                       return;
+               }
+               // restrict [rl, ru) to the rows whose target row offR + row lies inside db
+               final int rowFrom = Math.max(rl, -offR);
+               final int rowTo = Math.min(ru, db.numRows() - offR);
+               if(rowFrom >= rowTo)
+                       return;
+
+               final int dbCols = db.numCols();
+               for(int col = 0; col < _colIndexes.size(); col++) {
+                       final int dbCol = offC + _colIndexes.get(col);
+                       if(dbCol < 0 || dbCol >= dbCols)
+                               continue; // the whole column lies outside the target block
+
+                       final int[] breakpoints = breakpointsPerCol[col];
+                       final double[] slopes = slopesPerCol[col];
+                       final double[] intercepts = interceptsPerCol[col];
+                       for(int seg = 0; seg + 1 < breakpoints.length; seg++) {
+                               // intersect the segment with the clamped row range; empty segments produce no rows
+                               final int start = Math.max(breakpoints[seg], rowFrom);
+                               final int end = Math.min(breakpoints[seg + 1], rowTo);
+                               if(start >= end)
+                                       continue;
+                               final double slope = slopes[seg];
+                               final double intercept = intercepts[seg];
+                               for(int row = start; row < end; row++)
+                                       db.set(offR + row, dbCol, slope * row + intercept);
+                       }
+               }
+       }
+
+       /**
+        * Returns the breakpoints per column. The internal array is returned without copying.
+        *
+        * @return breakpoint indices, indexed by column position within this group
+        */
+       public int[][] getBreakpointsPerCol() {
+               return this.breakpointsPerCol;
+       }
+
+       /**
+        * Returns the segment slopes per column. The internal array is returned without copying.
+        *
+        * @return slopes, indexed by column position within this group and then by segment
+        */
+       public double[][] getSlopesPerCol() {
+               return this.slopesPerCol;
+       }
+
+       /**
+        * Returns the segment intercepts per column. The internal array is returned without copying.
+        *
+        * @return intercepts, indexed by column position within this group and then by segment
+        */
+       public double[][] getInterceptsPerCol() {
+               return this.interceptsPerCol;
+       }
+
+       /**
+        * Returns the reconstructed value at row {@code r} of the column at position {@code colIdx} in this group. The
+        * segment containing the row is located by a binary search over the breakpoints. Out-of-range arguments yield
+        * 0.0.
+        *
+        * @param r      row
+        * @param colIdx column index in the _colIndexes.
+        * @return reconstructed value slopes[segment] * r + intercepts[segment]
+        */
+       @Override
+       public double getIdx(int r, int colIdx) {
+               final boolean outOfRange = r < 0 || r >= numRows || colIdx < 0 || colIdx >= _colIndexes.size();
+               if(outOfRange)
+                       return 0.0;
+
+               final int[] bp = breakpointsPerCol[colIdx];
+               final double[] slopes = slopesPerCol[colIdx];
+               final double[] intercepts = interceptsPerCol[colIdx];
+               // search the first segment s whose end bp[s + 1] lies beyond r, i.e. bp[s] <= r < bp[s + 1]
+               final int lastSegment = bp.length - 2;
+               int lo = 0;
+               int hi = lastSegment;
+               while(lo <= hi) {
+                       final int mid = (lo + hi) >>> 1;
+                       if(bp[mid + 1] > r)
+                               hi = mid - 1;
+                       else
+                               lo = mid + 1;
+               }
+               final int seg = Math.min(lo, lastSegment);
+               return slopes[seg] * (double) r + intercepts[seg];
+       }
+
+       /**
+        * Returns the total number of stored coefficients: the breakpoints, slopes and intercepts summed over all
+        * columns of this group.
+        *
+        * @return total number of stored compression values
+        */
+       @Override
+       public int getNumValues() {
+               int count = 0;
+               for(int c = 0; c < _colIndexes.size(); c++)
+                       count += breakpointsPerCol[c].length + slopesPerCol[c].length + interceptsPerCol[c].length;
+               return count;
+       }
+
+       /**
+        * Returns the size reported for this group on disk. It is the size of the super class plus a fixed overhead
+        * for the three outer arrays and the numRows field, plus the cost of every per-column breakpoint, slope and
+        * intercept array.
+        * <p>
+        * NOTE(review): the per-column costs come from {@code MemoryEstimates}, and the overhead terms count reference
+        * pointers and array headers. These look like in-memory sizes rather than serialized bytes; confirm they match
+        * what is actually written.
+        *
+        * @return size in bytes
+        */
+       @Override
+       public long getExactSizeOnDisk() {
+               long size = super.getExactSizeOnDisk();
+               final int numCols = _colIndexes.size();
+               size += 3L * 8L * numCols; // array reference pointers
+               size += 3L * 24L; // outer array headers
+               size += 4L; // numRows field
+               for(int c = 0; c < numCols; c++) {
+                       size += (long) MemoryEstimates.intArrayCost(breakpointsPerCol[c].length) +
+                               (long) MemoryEstimates.doubleArrayCost(slopesPerCol[c].length) +
+                               (long) MemoryEstimates.doubleArrayCost(interceptsPerCol[c].length);
+               }
+               return size;
+       }
+
+       /**
+        * Adds the sum of all values of this column group to {@code c[0]}. The per-segment sums come from the
+        * arithmetic series {@code sumX = len * (2 * start + len - 1) / 2}, so no decompression is needed.
+        *
+        * @param c     output array; the total sum is accumulated into {@code c[0]}
+        * @param nRows number of rows (unused, the row range is covered by the breakpoints)
+        */
+       @Override
+       public void computeSum(double[] c, int nRows) {
+               double total = 0.0;
+               for(int col = 0; col < _colIndexes.size(); col++)
+                       total += sumOfColumnSegments(col);
+               c[0] += total;
+       }
+
+       /**
+        * Adds the sum of each column of this group to {@code c} at the column's global index
+        * {@code _colIndexes.get(col)}, the same index used when decompressing.
+        *
+        * @param c     The array to add the column sum to.
+        * @param nRows The number of rows in the column group.
+        */
+       @Override
+       public void computeColSums(double[] c, int nRows) {
+               for(int col = 0; col < _colIndexes.size(); col++)
+                       c[_colIndexes.get(col)] += sumOfColumnSegments(col);
+       }
+
+       /**
+        * Sums the reconstructed values of the column at position {@code col} within this group, segment by segment.
+        *
+        * @param col column position within this group
+        * @return sum over all rows of slopes[seg] * row + intercepts[seg]
+        */
+       private double sumOfColumnSegments(int col) {
+               final int[] breakpoints = breakpointsPerCol[col];
+               final double[] slopes = slopesPerCol[col];
+               final double[] intercepts = interceptsPerCol[col];
+               double sum = 0.0;
+               for(int seg = 0; seg < slopes.length; seg++) {
+                       final int start = breakpoints[seg];
+                       final int len = breakpoints[seg + 1] - start;
+                       if(len <= 0)
+                               continue;
+                       // sum of the row indices start .. start + len - 1
+                       final double sumX = (double) len * (2.0 * start + (len - 1)) / 2.0;
+                       sum += slopes[seg] * sumX + intercepts[seg] * len;
+               }
+               return sum;
+       }
+
+       /**
+        * Returns the compression type of this group. Groups built by either piecewise linear fitting method report
+        * {@link CompressionType#PiecewiseLinear}.
+        *
+        * @return {@link CompressionType#PiecewiseLinear}
+        */
+       @Override
+       public CompressionType getCompType() {
+               return CompressionType.PiecewiseLinear;
+       }
+
+       /**
+        * Returns the column group type of this group.
+        *
+        * @return {@link ColGroupType#PiecewiseLinear}
+        */
+       @Override
+       protected ColGroupType getColGroupType() {
+               return ColGroupType.PiecewiseLinear;
+       }
+
+       /**
+        * Applies a scalar operation to all segments of this column group. Every segment is the line {@code a * r + b},
+        * so the result is again piecewise linear with the same breakpoints:
+        * <ul>
+        * <li>{@code X + s}, {@code s + X}, {@code X - s}: only the intercepts change</li>
+        * <li>{@code s - X}: the intercepts become {@code s - b} and the slopes are negated</li>
+        * <li>{@code X * s}, {@code s * X}, {@code X / s}: slopes and intercepts are scaled</li>
+        * </ul>
+        * {@code s / X} is not linear in the row index and is rejected.
+        *
+        * @param op operation to perform
+        * @return a new ColGroupPiecewiseLinearCompressed with updated coefficients
+        * @throws NotImplementedException if the operator is not plus, minus, multiply or divide, or if it divides a
+        *                                 scalar by this group
+        */
+       @Override
+       public AColGroup scalarOperation(ScalarOperator op) {
+               final int numCols = _colIndexes.size();
+
+               if(!(op.fn instanceof Plus || op.fn instanceof Minus || op.fn instanceof Multiply ||
+                       op.fn instanceof Divide)) {
+                       throw new NotImplementedException("Unsupported scalar op: " + op.fn.getClass().getSimpleName());
+               }
+
+               // LeftScalarOperator: the scalar is the left operand (s - X, s / X)
+               final boolean scalarOnLeft = op instanceof LeftScalarOperator;
+               if(scalarOnLeft && op.fn instanceof Divide)
+                       throw new NotImplementedException(
+                               "Unsupported scalar op: Divide with the scalar on the left is not piecewise linear");
+
+               final boolean isAddSub = op.fn instanceof Plus || op.fn instanceof Minus;
+               // s - (a * r + b) = (-a) * r + (s - b)
+               final boolean negateSlopes = scalarOnLeft && op.fn instanceof Minus;
+
+               final double[][] newIntercepts = new double[numCols][];
+               final double[][] newSlopes = new double[numCols][];
+
+               for(int col = 0; col < numCols; col++) {
+                       final int numSegments = interceptsPerCol[col].length;
+                       newIntercepts[col] = new double[numSegments];
+                       newSlopes[col] = new double[numSegments];
+
+                       for(int seg = 0; seg < numSegments; seg++) {
+                               final double slope = slopesPerCol[col][seg];
+                               if(isAddSub)
+                                       newSlopes[col][seg] = negateSlopes ? -slope : slope;
+                               else // Multiply/Divide scale the slope like the intercept
+                                       newSlopes[col][seg] = op.executeScalar(slope);
+                               newIntercepts[col][seg] = op.executeScalar(interceptsPerCol[col][seg]);
+                       }
+               }
+
+               return new ColGroupPiecewiseLinearCompressed(_colIndexes, breakpointsPerCol, newSlopes, newIntercepts,
+                       numRows);
+       }
+
+       /**
+        * Applies a row vector operation with the vector on the left, {@code v op X}. Per segment line
+        * {@code a * r + b} the result is:
+        * <ul>
+        * <li>{@code v + X}: slope {@code a}, intercept {@code v + b}</li>
+        * <li>{@code v - X}: slope {@code -a}, intercept {@code v - b}</li>
+        * <li>{@code v * X}: slope {@code v * a}, intercept {@code v * b}</li>
+        * <li>{@code v / X}: only linear for constant segments ({@code a == 0}), giving slope 0 and intercept
+        * {@code v / b}</li>
+        * </ul>
+        *
+        * @param op        The operation to execute
+        * @param v         The vector of values to apply the values contained should be at least the length of the highest
+        *                  value in the column index
+        * @param isRowSafe True if the binary op is applied to an entire zero row and all results are zero
+        * @return a new ColGroupPiecewiseLinearCompressed with updated coefficients
+        * @throws NotImplementedException if the operator is unsupported, or for a division over a non-constant segment
+        */
+       @Override
+       public AColGroup binaryRowOpLeft(BinaryOperator op, double[] v, boolean isRowSafe) {
+               final int numCols = _colIndexes.size();
+               final boolean isAddSub = op.fn instanceof Plus || op.fn instanceof Minus;
+
+               if(!isAddSub && !(op.fn instanceof Multiply || op.fn instanceof Divide))
+                       throw new NotImplementedException("Unsupported binary op: " + op.fn.getClass().getSimpleName());
+
+               final boolean isMinus = op.fn instanceof Minus;
+               final boolean isDivide = op.fn instanceof Divide;
+               final double[][] newIntercepts = new double[numCols][];
+               final double[][] newSlopes = new double[numCols][];
+
+               for(int col = 0; col < numCols; col++) {
+                       final double rowValue = v[_colIndexes.get(col)];
+                       final double[] slopes = slopesPerCol[col];
+                       final double[] intercepts = interceptsPerCol[col];
+                       final int numSegs = intercepts.length;
+                       newIntercepts[col] = new double[numSegs];
+                       newSlopes[col] = new double[numSegs];
+
+                       for(int seg = 0; seg < numSegs; seg++) {
+                               final double slope = slopes[seg];
+                               if(isAddSub) {
+                                       // v - (a * r + b) = (-a) * r + (v - b): subtraction from the left flips the slope
+                                       newSlopes[col][seg] = isMinus ? -slope : slope;
+                               }
+                               else if(isDivide) {
+                                       // v / (a * r + b) is only linear in r for constant segments
+                                       if(slope != 0)
+                                               throw new NotImplementedException(
+                                                       "Unsupported binary op: Divide with the vector on the left over a non-constant segment");
+                                       newSlopes[col][seg] = 0;
+                               }
+                               else {
+                                       newSlopes[col][seg] = op.fn.execute(rowValue, slope);
+                               }
+                               newIntercepts[col][seg] = op.fn.execute(rowValue, intercepts[seg]);
+                       }
+               }
+               return new ColGroupPiecewiseLinearCompressed(_colIndexes, breakpointsPerCol, newSlopes, newIntercepts,
+                       numRows);
+       }
+
+       /**
+        * Applies a row vector operation from the right For plus/minus are the 
intercepts shifted For multiply/divide
+        * slopes and intercepts are scaled
+        *
+        * @param op        The operation to execute
+        * @param v         The vector of values to apply the values contained 
should be at least the length of the highest
+        *                  value in the column index
+        * @param isRowSafe True if the binary op is applied to an entire zero 
row and all results are zero
+        * @return a new ColGroupPiecewiseLinearCompressed with updated 
coefficients
+        */
+       @Override
+       public AColGroup binaryRowOpRight(BinaryOperator op, double[] v, 
boolean isRowSafe) {
+               final int numCols = _colIndexes.size();
+               final boolean isAddSub = op.fn instanceof Plus || op.fn 
instanceof Minus;
+
+               if(!isAddSub && !(op.fn instanceof Multiply || op.fn instanceof 
Divide))
+                       throw new NotImplementedException("Unsupported scalar 
op: " + op.fn.getClass().getSimpleName());
+
+               double[][] newSlopes = new double[numCols][];
+               double[][] newIntercepts = new double[numCols][];
+
+               for(int col = 0; col < numCols; col++) {
+                       double val = v[_colIndexes.get(col)];
+                       int numSegs = interceptsPerCol[col].length;
+                       // Plus/Minus shifts intercept only, slopes are 
unchanged
+                       newSlopes[col] = isAddSub ? slopesPerCol[col].clone() : 
new double[numSegs];
+                       newIntercepts[col] = new double[numSegs];
+
+                       for(int seg = 0; seg < numSegs; seg++) {
+                               newIntercepts[col][seg] = 
op.fn.execute(interceptsPerCol[col][seg], val);
+                               if(!isAddSub)
+                                       newSlopes[col][seg] = 
op.fn.execute(slopesPerCol[col][seg], val);
+                       }
+               }
+               return new ColGroupPiecewiseLinearCompressed(_colIndexes, 
breakpointsPerCol, newSlopes, newIntercepts, numRows);
+       }
+
+       /**
+        * Returns true if any decompressed value in this column group equals 
the given pattern
+        *
+        * @param pattern The value to look for.
+        * @return true if pattern is found, else false
+        */
+       @Override
+       public boolean containsValue(double pattern) {
+               for(int col = 0; col < _colIndexes.size(); col++) {
+                       if(colContainsValue(col, pattern))
+                               return true;
+               }
+               return false;
+       }
+
+       /**
+        * checks if any reconstructed value in column col equals the pattern 
for each segment, solves the m * x + b =
+        * pattern instead of scanning all rows
+        *
+        * @param col     column index
+        * @param pattern the value to search for
+        * @return true if the pattern is found
+        */
+
+       private boolean colContainsValue(int col, double pattern) {
+               int[] breakpoints = breakpointsPerCol[col];
+               double[] intercepts = interceptsPerCol[col];
+               double[] slopes = slopesPerCol[col];
+               for(int seg = 0; seg < breakpoints.length - 1; seg++) {
+                       int start = breakpoints[seg];
+                       int len = breakpoints[seg + 1] - start;
+                       if(len <= 0)
+                               continue;
+
+                       double b = intercepts[seg];
+                       double m = slopes[seg];
+
+                       if(m == 0.0) {
+                               // constant segment: all values equal b
+                               if(Double.compare(b, pattern) == 0)
+                                       return true;
+                               continue;
+                       }
+
+                       // check if pattern lies on the line: solve m*x + b = 
pattern for x
+                       double x = (pattern - b) / m;
+                       int xi = (int) x;
+                       if(xi >= start && xi < start + len && Double.compare(m 
* xi + b, pattern) == 0)
+                               return true;
+               }
+               return false;
+       }
+
+       @Override
+       public AColGroup unaryOperation(UnaryOperator op) {
+               throw new NotImplementedException();
+       }
+
+       @Override
+       public AColGroup replace(double pattern, double replace) {
+               throw new NotImplementedException();
+       }
+
+       @Override
+       protected double computeMxx(double c, Builtin builtin) {
+               throw new NotImplementedException();
+       }
+
+       @Override
+       protected void computeColMxx(double[] c, Builtin builtin) {
+               throw new NotImplementedException();
+       }
+
+       @Override
+       protected void computeSumSq(double[] c, int nRows) {
+               throw new NotImplementedException();
+
+       }
+
+       @Override
+       protected void computeColSumsSq(double[] c, int nRows) {
+               throw new NotImplementedException();
+
+       }
+
+       @Override
+       protected void computeRowSums(double[] c, int rl, int ru, double[] 
preAgg) {
+               throw new NotImplementedException();
+
+       }
+
+       @Override
+       protected void computeRowMxx(double[] c, Builtin builtin, int rl, int 
ru, double[] preAgg) {
+               throw new NotImplementedException();
+
+       }
+
+       @Override
+       protected void computeProduct(double[] c, int nRows) {
+               throw new NotImplementedException();
+
+       }
+
+       @Override
+       protected void computeRowProduct(double[] c, int rl, int ru, double[] 
preAgg) {
+               throw new NotImplementedException();
+
+       }
+
+       @Override
+       protected void computeColProduct(double[] c, int nRows) {
+               throw new NotImplementedException();
+
+       }
+
+       @Override
+       protected double[] preAggSumRows() {
+               throw new NotImplementedException();
+       }
+
+       @Override
+       protected double[] preAggSumSqRows() {
+               throw new NotImplementedException();
+       }
+
+       @Override
+       protected double[] preAggProductRows() {
+               throw new NotImplementedException();
+       }
+
+       @Override
+       protected double[] preAggBuiltinRows(Builtin builtin) {
+               throw new NotImplementedException();
+       }
+
+       @Override
+       public boolean sameIndexStructure(AColGroupCompressed that) {
+               throw new NotImplementedException();
+       }
+
+       @Override
+       protected void tsmm(double[] result, int numColumns, int nRows) {
+               throw new NotImplementedException();
+
+       }
+
+       @Override
+       public AColGroup copyAndSet(IColIndex colIndexes) {
+               throw new NotImplementedException();
+       }
+
+       @Override
+       public void decompressToDenseBlockTransposed(DenseBlock db, int rl, int 
ru) {
+               throw new NotImplementedException();
+
+       }
+
+       @Override
+       public void decompressToSparseBlockTransposed(SparseBlockMCSR sb, int 
nColOut) {
+               throw new NotImplementedException();
+
+       }
+
+       @Override
+       public void decompressToSparseBlock(SparseBlock sb, int rl, int ru, int 
offR, int offC) {
+               throw new NotImplementedException();
+
+       }
+
+       @Override
+       public AColGroup rightMultByMatrix(MatrixBlock right, IColIndex 
allCols, int k) {
+               throw new NotImplementedException();
+       }
+
+       @Override
+       public void leftMultByMatrixNoPreAgg(MatrixBlock matrix, MatrixBlock 
result, int rl, int ru, int cl, int cu) {
+               throw new NotImplementedException();
+
+       }
+
+       @Override
+       public void leftMultByAColGroup(AColGroup lhs, MatrixBlock result, int 
nRows) {
+               throw new NotImplementedException();
+
+       }
+
+       @Override
+       public void tsmmAColGroup(AColGroup other, MatrixBlock result) {
+               throw new NotImplementedException();
+
+       }
+
+       @Override
+       protected AColGroup sliceSingleColumn(int idx) {
+               throw new NotImplementedException();
+       }
+
+       @Override
+       protected AColGroup sliceMultiColumns(int idStart, int idEnd, IColIndex 
outputCols) {
+               throw new NotImplementedException();
+       }
+
+       @Override
+       public AColGroup sliceRows(int rl, int ru) {
+               throw new NotImplementedException();
+       }
+
+       @Override
+       public long getNumberNonZeros(int nRows) {
+               throw new NotImplementedException();
+       }
+
+       @Override
+       public CmCovObject centralMoment(CMOperator op, int nRows) {
+               throw new NotImplementedException();
+       }
+
+       @Override
+       public AColGroup rexpandCols(int max, boolean ignore, boolean cast, int 
nRows) {
+               throw new NotImplementedException();
+       }
+
+       @Override
+       public double getCost(ComputationCostEstimator e, int nRows) {
+               throw new NotImplementedException();
+       }
+
+       @Override
+       public AColGroup append(AColGroup g) {
+               throw new NotImplementedException();
+       }
+
+       @Override
+       protected AColGroup appendNInternal(AColGroup[] groups, int blen, int 
rlen) {
+               throw new NotImplementedException();
+       }
+
+       @Override
+       public ICLAScheme getCompressionScheme() {
+               throw new NotImplementedException();
+       }
+
+       @Override
+       public AColGroup recompress() {
+               throw new NotImplementedException();
+       }
+
+       @Override
+       public CompressedSizeInfoColGroup getCompressionInfo(int nRow) {
+               throw new NotImplementedException();
+       }
+
+       @Override
+       protected AColGroup fixColIndexes(IColIndex newColIndex, int[] 
reordering) {
+               throw new NotImplementedException();
+       }
+
+       @Override
+       public AColGroup reduceCols() {
+               throw new NotImplementedException();
+       }
+
+       @Override
+       public double getSparsity() {
+               throw new NotImplementedException();
+       }
+
+       @Override
+       protected void sparseSelection(MatrixBlock selection, ColGroupUtils.P[] 
points, MatrixBlock ret, int rl, int ru) {
+               throw new NotImplementedException();
+       }
+
+       @Override
+       protected void denseSelection(MatrixBlock selection, ColGroupUtils.P[] 
points, MatrixBlock ret, int rl, int ru) {
+               throw new NotImplementedException();
+       }
+
+       @Override
+       public AColGroup[] splitReshape(int multiplier, int nRow, int nColOrg) {
+               throw new NotImplementedException();
+       }
+
+}
+
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/functional/PiecewiseLinearUtils.java
 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/functional/PiecewiseLinearUtils.java
new file mode 100644
index 0000000000..7b0b4bfa96
--- /dev/null
+++ 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/functional/PiecewiseLinearUtils.java
@@ -0,0 +1,306 @@
+package org.apache.sysds.runtime.compress.colgroup.functional;
+import org.apache.sysds.runtime.compress.CompressionSettings;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+public class PiecewiseLinearUtils {
+       /**
+        * Utility methods for piecewise linear compression of matric columns
+        * supports compression used the segmented least squares algorithm 
which is implemented with dynamic programming
+        * and a successive method, which puts all values in a segment till the 
target loss is exceeded
+        */
+
+       private PiecewiseLinearUtils() {
+
+       }
+
+       public static final class SegmentedRegression {
+               private final int[] breakpoints;
+               private final double[] slopes;
+               private final double[] intercepts;
+
+               public SegmentedRegression(int[] breakpoints, double[] slopes, 
double[] intercepts) {
+                       this.breakpoints = breakpoints;
+                       this.slopes = slopes;
+                       this.intercepts = intercepts;
+               }
+
+               public int[] getBreakpoints() {
+                       return breakpoints;
+               }
+
+               public double[] getSlopes() {
+                       return slopes;
+               }
+
+               public double[] getIntercepts() {
+                       return intercepts;
+               }
+       }
+
+       public static double[] getColumn(MatrixBlock in, int colIndex) {
+               final int numRows = in.getNumRows();
+               final double[] column = new double[numRows];
+
+               for(int row = 0; row < numRows; row++) {
+                       column[row] = in.get(row, colIndex);
+               }
+               return column;
+       }
+
+       public static SegmentedRegression 
compressSegmentedLeastSquares(double[] column, CompressionSettings cs) {
+               //compute Breakpoints for a Column with dynamic Programming
+               final List<Integer> breakpointsList = computeBreakpoints(cs, 
column);
+               final int[] breakpoints = 
breakpointsList.stream().mapToInt(Integer::intValue).toArray();
+
+               //get values for Regression
+               final int numSeg = breakpoints.length - 1;
+               final double[] slopes = new double[numSeg];
+               final double[] intercepts = new double[numSeg];
+
+               // Regress per Segment
+               for(int seg = 0; seg < numSeg; seg++) {
+                       final int SegStart = breakpoints[seg];
+                       final int SegEnd = breakpoints[seg + 1];
+
+                       final double[] line = regressSegment(column, SegStart, 
SegEnd);
+                       slopes[seg] = line[0]; //slope regession line
+                       intercepts[seg] = line[1]; //intercept regression line
+               }
+
+               return new SegmentedRegression(breakpoints, slopes, intercepts);
+       }
+
+       public static SegmentedRegression 
compressSuccessivePiecewiseLinear(double[] column, CompressionSettings cs) {
+               //compute Breakpoints for a Column with a sukzessive 
breakpoints algorithm
+
+               final List<Integer> breakpointsList = 
computeBreakpointSuccessive(column, cs);
+               final int[] breakpoints = 
breakpointsList.stream().mapToInt(Integer::intValue).toArray();
+
+               //get values for Regression
+               final int numSeg = breakpoints.length - 1;
+               final double[] slopes = new double[numSeg];
+               final double[] intercepts = new double[numSeg];
+
+               // Regress per Segment
+               for(int seg = 0; seg < numSeg; seg++) {
+                       final int segstart = breakpoints[seg];
+                       final int segEnd = breakpoints[seg + 1];
+                       final double[] line = regressSegment(column, segstart, 
segEnd);
+                       slopes[seg] = line[0];
+                       intercepts[seg] = line[1];
+               }
+               return new SegmentedRegression(breakpoints, slopes, intercepts);
+       }
+
+       /**
+        * Computes breakpoints for a column using segmented least squares with 
dynamic programming
+        * Iteratively reduces lambda to increase the number of segments until 
the target MSE is met.
+        *
+        * @param cs     compression settings containing the target loss
+        * @param column the column values to segment
+        * @return list of breakpoint indices, starting with 0
+        */
+       public static List<Integer> computeBreakpoints(CompressionSettings cs, 
double[] column) {
+               final int numElements = column.length;
+               final double targetMSE = cs.getPiecewiseTargetLoss();
+               final double sseMax = numElements * targetMSE; // max allowed 
total SSE
+
+               //start with high lambda an reduce iteratively
+               double lambda = Math.max(10.0, sseMax * 2.0);
+               List<Integer> bestBreaks = Arrays.asList(0, numElements);
+               double bestSSE = computeTotalSSE(column, bestBreaks);
+
+               for (int iter = 0; iter < 50; iter++) {
+                       List<Integer> breaks = computeBreakpointsLambda(column, 
lambda);
+                       double totalSSE = computeTotalSSE(column, breaks);
+                       int numSegs = breaks.size() - 1;
+
+                       if (totalSSE < bestSSE) {
+                               bestSSE = totalSSE;
+                               bestBreaks = new ArrayList<>(breaks);
+                       }
+                       //target loss reached
+                       if (bestSSE <= sseMax) {
+                               return bestBreaks;
+                       }
+
+                       // only one segment left, break condition
+                       if (numSegs <= 1) {
+                               break;
+                       }
+                       // reducing lambda to allow more segments in next 
iteration
+                       lambda *= 0.8;
+               }
+
+               return bestBreaks;
+       }
+
+       /**
+        * Computes optimal breakpoints, each segment has a SEE plus a
+
+        */
+
+       public static List<Integer> computeBreakpointsLambda(double[] column, 
double lambda) {
+               final int n = column.length;
+               final double[] costs = new double[n + 1];  // min cost to reach 
i
+               final int[] prev = new int[n + 1];
+
+               Arrays.fill(costs, Double.POSITIVE_INFINITY);
+               costs[0] = 0.0;
+               // precompute all segment costs to avoid recomputation in 
dynamic programming
+               double[][] segCosts = new double[n+1][n+1];
+               for(int i = 0; i < n; i++) {
+                       for(int j = i+1; j <= n; j++) {
+                               segCosts[i][j] = computeSegmentCost(column, i, 
j);
+                       }
+               }
+               // for each point j, find the cheapest previous breakpoint i
+               for(int j = 1; j <= n; j++) {
+                       for(int i = 0; i < j; i++) {
+                               // cost equals the SSE of segment [i,j] plus 
penalty plus best costs
+                               double cost = costs[i] + segCosts[i][j] + 
lambda;
+                               if(cost < costs[j]) {
+                                       costs[j] = cost;
+                                       prev[j] = i;
+                               }
+                       }
+               }
+
+               // Backtrack to previous points to recover the breakpoints
+               List<Integer> breaks = new ArrayList<>();
+               int j = n;
+               while(j > 0) {
+                       breaks.add(j);
+                       j = prev[j];
+               }
+               breaks.add(0);
+               Collections.reverse(breaks);
+               return breaks;
+       }
+
+       /**
+        * computes the segment cost
+        * @param column column values
+        * @param start start index
+        * @param end end index
+        * @return SSE of the regression line over the segment
+        */
+       public static double computeSegmentCost(double[] column, int start, int 
end) {
+               final int segSize = end - start;
+               if(segSize <= 1)
+                       return 0.0;
+
+               final double[] ab = regressSegment(column, start, end);
+               final double slope = ab[0];
+               final double intercept = ab[1];
+
+               double sse = 0.0;
+               for(int i = start; i < end; i++) {
+                       double err = column[i] - (slope * i + intercept);
+                       sse += err * err;
+               }
+               return sse;
+       }
+
+       /**
+        * computes the total SSE over all segments defined by the given 
breakpoints
+        * @param column
+        * @param breaks
+        * @return sum of the total SSE
+        */
+       public static double computeTotalSSE(double[] column, List<Integer> 
breaks) {
+               double total = 0.0;
+               for(int s = 0; s < breaks.size() - 1; s++) {
+                       final int start = breaks.get(s);
+                       final int end = breaks.get(s + 1);
+                       total += computeSegmentCost(column, start, end);
+               }
+               return total;
+       }
+
+       public static double[] regressSegment(double[] column, int start, int 
end) {
+               final int numElements = end - start;
+               if(numElements <= 0)
+                       return new double[] {0.0, 0.0};
+
+               double sumOfRowIndices = 0, sumOfColumnValues = 0, 
sumOfRowIndicesSquared = 0, productRowIndexTimesColumnValue = 0;
+               for(int i = start; i < end; i++) {
+                       sumOfRowIndices += i;
+                       sumOfColumnValues += column[i];
+                       sumOfRowIndicesSquared += i * i;
+                       productRowIndexTimesColumnValue += i * column[i];
+               }
+
+
+               final double denominatorForSlope =
+                       numElements * sumOfRowIndicesSquared - sumOfRowIndices 
* sumOfRowIndices;
+               final double slope;
+               final double intercept;
+               if(denominatorForSlope == 0) {
+                       slope = 0.0;
+                       intercept = sumOfColumnValues / numElements;
+               }
+               else {
+                       slope = (numElements * productRowIndexTimesColumnValue 
- sumOfRowIndices * sumOfColumnValues) /
+                               denominatorForSlope;
+                       intercept = (sumOfColumnValues - slope * 
sumOfRowIndices) / numElements;
+               }
+               return new double[] {slope, intercept};
+       }
+
+       /**
+        * computes breakpoints for a column using a successive algorithm
+        * extends each segment until the SEE reaches the target loss, then 
start a new segment
+        * @param column column values
+        * @param cs compression setting for setting the target loss
+        * @return list of breakpoint indices
+        */
+       public static List<Integer> computeBreakpointSuccessive(double[] 
column, CompressionSettings cs) {
+               final int numElements = column.length;
+               final double targetMSE = cs.getPiecewiseTargetLoss();
+               if (Double.isNaN(targetMSE) || targetMSE <= 0) {
+                       return Arrays.asList(0, numElements);  // fallback 
single segment
+               }
+
+               List<Integer> breakpoints = new ArrayList<>();
+               breakpoints.add(0);
+               int currentStart = 0;
+
+               while (currentStart < numElements) {
+                       int bestEnd = -1; // no end found
+
+                       for (int end = currentStart + 1; end <= numElements; 
end++) {
+                               double sse = computeSegmentCost(column, 
currentStart, end);
+                               if(sse > (end - currentStart) * targetMSE) {
+                                       // end-1 is last valid end; if end == 
segStart+1 force min segment of length 1
+                                       bestEnd = (end == currentStart + 1) ? 
end : end - 1;
+                                       break;
+                               }
+                       }
+
+                       if (bestEnd == -1) {
+                               bestEnd = numElements;// all remaining points 
fitting within budget
+                       }
+
+                       // safety guard not allow zero segments
+                       if (bestEnd <= currentStart) {
+                               bestEnd = Math.min(currentStart + 1, 
numElements);
+                       }
+
+                       breakpoints.add(bestEnd);
+                       currentStart = bestEnd;
+               }
+
+               // make sure, that the last breakpoint equals numElements
+               int last = breakpoints.get(breakpoints.size() - 1);
+               if (last != numElements) {
+                       breakpoints.add(numElements);
+               }
+
+               return breakpoints;
+       }
+}
diff --git 
a/src/test/java/org/apache/sysds/performance/PiecewiseLinearCompressionPerformanceTest.java
 
b/src/test/java/org/apache/sysds/performance/PiecewiseLinearCompressionPerformanceTest.java
new file mode 100644
index 0000000000..6046bdfb20
--- /dev/null
+++ 
b/src/test/java/org/apache/sysds/performance/PiecewiseLinearCompressionPerformanceTest.java
@@ -0,0 +1,168 @@
+package org.apache.sysds.performance;
+
+import org.apache.sysds.runtime.compress.CompressionSettings;
+import org.apache.sysds.runtime.compress.CompressionSettingsBuilder;
+import org.apache.sysds.runtime.compress.colgroup.AColGroup;
+import org.apache.sysds.runtime.compress.colgroup.ColGroupFactory;
+import 
org.apache.sysds.runtime.compress.colgroup.ColGroupPiecewiseLinearCompressed;
+import org.apache.sysds.runtime.compress.colgroup.indexes.ColIndexFactory;
+import org.apache.sysds.runtime.compress.colgroup.indexes.IColIndex;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.utils.stats.Timing;
+import java.util.Random;
+
+/**
+ * Performance benchmark for piecewise linear compression.
+ * Successive is benchmarked across large matrices to show scalability.
+ * DP is only used as a quality reference on small matrices due to quadratic 
complexity
+
+ */
+public class PiecewiseLinearCompressionPerformanceTest {
+
+       //different target losses : loose, avg, strict
+       private static final double[] LOSSES = {1e-1, 1e-2, 1e-4};
+       // how often compressed
+       private static final int      REPS   = 3;
+
+       /**
+        * generate of a time series matrix to have a realistic test set up
+        * @param nr number of rows
+        * @param nc number of columns
+        * @return matrix with random generated data
+        */
+       private static MatrixBlock generateTestMatrix(int nr, int nc) {
+               MatrixBlock mb = new MatrixBlock(nr, nc, true);
+               mb.allocateDenseBlock();
+               Random rng = new Random(42);
+               for(int c = 0; c < nc; c++) {
+                       double trend      = 0.001 * c;
+                       double level      = rng.nextDouble() * 5.0;
+                       double volatility = 0.1 + 0.01 * c;
+                       double residual   = 0.0;
+
+                       for(int row = 0; row < nr; row++) {
+                               // random level shift every 75-150 rows
+                               if(row % (75 + (int)(75 * rng.nextDouble())) == 
0) {
+                                       level += (rng.nextDouble() - 0.5) * 2.0;
+                                       trend += (rng.nextDouble() - 0.5) * 
0.0005;
+                               }
+                               // noise: residual = 0.7 * prev + random
+                               residual = 0.7 * residual + rng.nextGaussian() 
* volatility;
+                               mb.set(row, c, Math.max(0, trend * row + level 
+ residual));
+                       }
+               }
+               return mb;
+       }
+       ///  returns a average number of segments per column
+       private static double avgSegments(AColGroup cg) {
+               int[][] breakpoints = ((ColGroupPiecewiseLinearCompressed) 
cg).getBreakpointsPerCol();
+               int total = 0;
+               for(int[] bp : breakpoints) total += bp.length - 1;
+               return total / (double) breakpoints.length;
+       }
+
+       /**
+        * computes MSE between the compression, the original data and 
decompression
+        * @param orig original matrix
+        * @param cg piecewise linear compressed column group
+        * @return MSE
+        */
+       private static double reconstructionMSE(MatrixBlock orig, AColGroup cg) 
{
+               int nr = orig.getNumRows(), nc = orig.getNumColumns();
+               MatrixBlock recon = new MatrixBlock(nr, nc, false);
+               recon.allocateDenseBlock();
+               cg.decompressToDenseBlock(recon.getDenseBlock(), 0, nr, 0, 0);
+               double sse = 0;
+               for(int r = 0; r < nr; r++)
+                       for(int c = 0; c < nc; c++) {
+                               double diff = orig.get(r, c) - recon.get(r, c);
+                               sse += diff * diff;
+                       }
+               return sse / (nr * nc);
+       }
+
+       /**
+        * benchmarks successive compression for a given matrix and target loss
+        * reports segments, compressed data size, runtime and reconstruction
+        * @param mb original matrix to compress
+        * @param loss target loss param
+        */
+       private static void benchmarkSuccessive(MatrixBlock mb, double loss) {
+               long origSize = mb.getInMemorySize();
+               int numRows = mb.getNumRows(), numCol = mb.getNumColumns();
+               CompressionSettings cs = new 
CompressionSettingsBuilder().create();
+               cs.setPiecewiseTargetLoss(loss);
+               IColIndex colIndexes = ColIndexFactory.create(numCol);
+
+               
ColGroupFactory.compressPiecewiseLinearFunctionalSuccessive(colIndexes, mb, cs);
+
+               Timing t = new Timing();
+               AColGroup cg = null;
+               t.start();
+               for(int i = 0; i < REPS; i++)
+                       cg = 
ColGroupFactory.compressPiecewiseLinearFunctionalSuccessive(colIndexes, mb, cs);
+               double time = t.stop() / REPS;
+
+               long size = cg.getExactSizeOnDisk();
+               String saving = size < origSize
+                       ? String.format("saved %3.0f%%", 100.0 - 100.0 * size / 
origSize)
+                       : String.format("larger +%.0f%%", 100.0 * size / 
origSize - 100);
+
+               System.out.printf("  successive  loss=%.0e  %5.1f segs  %6.2f 
MB (%s)  %6.1f ms  MSE=%.2e%n",
+                       loss, avgSegments(cg), size / 1e6, saving, time, 
reconstructionMSE(mb, cg));
+       }
+
+       /**
+        * benchmarks dynamic programming compression for a given matrix and 
target loss
+        * no repetition, because DP is too slow due complexity
+        * reports segments, compressed data size, runtime and reconstruction
+        * @param mb original matrix to compress
+        * @param loss target loss param
+        */
+       private static void benchmarkDP(MatrixBlock mb, double loss) {
+               long origSize = mb.getInMemorySize();
+               int numColumns = mb.getNumColumns();
+               CompressionSettings cs = new 
CompressionSettingsBuilder().create();
+               cs.setPiecewiseTargetLoss(loss);
+               IColIndex colIndexes = ColIndexFactory.create(numColumns);
+
+               Timing t = new Timing();
+               t.start();
+               AColGroup cg = 
ColGroupFactory.compressPiecewiseLinearFunctional(colIndexes, mb, cs);
+               double time = t.stop();
+
+               long size = cg.getExactSizeOnDisk();
+               String saving = size < origSize
+                       ? String.format("saved %3.0f%%", 100.0 - 100.0 * size / 
origSize)
+                       : String.format("LARGER +%.0f%%", 100.0 * size / 
origSize - 100);
+
+               System.out.printf("  DP          loss=%.0e  %5.1f segs  %6.2f 
MB (%s)  %6.1f ms  MSE=%.2e%n",
+                       loss, avgSegments(cg), size / 1e6, saving, time, 
reconstructionMSE(mb, cg));
+       }
+
+       public static void main(String[] args) {
+               System.out.println("=== Piecewise Linear Compression Benchmark 
===\n");
+
+               // Successive scalability across large matrices
+               System.out.println("=== Successive: scalability ===");
+               int[][] configs = {{1000, 10}, {1000, 100}, {1000, 500},
+                       {5000, 10}, {5000, 100}, {5000, 500},
+                       {10000, 10}, {10000, 100}, {10000, 500}};
+
+               for(int[] cfg : configs) {
+                       int nr = cfg[0], nc = cfg[1];
+                       MatrixBlock mb = generateTestMatrix(nr, nc);
+                       System.out.printf("%nnrows=%d  ncols=%d  original=%.2f 
MB%n",
+                               nr, nc, mb.getInMemorySize() / 1e6);
+                       for(double loss : LOSSES)
+                               benchmarkSuccessive(mb, loss);
+               }
+
+               // DP quality reference on small matrix
+               System.out.println("\n=== DP: quality reference (nrows=1000, 
ncols=10) ===");
+               MatrixBlock mbSmall = generateTestMatrix(1000, 10);
+               System.out.printf("original=%.2f MB%n", 
mbSmall.getInMemorySize() / 1e6);
+               for(double loss : LOSSES)
+                       benchmarkDP(mbSmall, loss);
+       }
+}
diff --git 
a/src/test/java/org/apache/sysds/test/component/compress/colgroup/ColGroupPiecewiseLinearCompressedOperationsTest.java
 
b/src/test/java/org/apache/sysds/test/component/compress/colgroup/ColGroupPiecewiseLinearCompressedOperationsTest.java
new file mode 100644
index 0000000000..53ae3a1277
--- /dev/null
+++ 
b/src/test/java/org/apache/sysds/test/component/compress/colgroup/ColGroupPiecewiseLinearCompressedOperationsTest.java
@@ -0,0 +1,308 @@
+package org.apache.sysds.test.component.compress.colgroup;
+
+import org.apache.sysds.runtime.compress.CompressionSettings;
+import org.apache.sysds.runtime.compress.CompressionSettingsBuilder;
+import org.apache.sysds.runtime.compress.colgroup.AColGroup;
+import org.apache.sysds.runtime.compress.colgroup.ColGroupFactory;
+import 
org.apache.sysds.runtime.compress.colgroup.ColGroupPiecewiseLinearCompressed;
+import org.apache.sysds.runtime.compress.colgroup.indexes.ColIndexFactory;
+import org.apache.sysds.runtime.compress.colgroup.indexes.IColIndex;
+import org.apache.sysds.runtime.functionobjects.Divide;
+import org.apache.sysds.runtime.functionobjects.Minus;
+import org.apache.sysds.runtime.functionobjects.Multiply;
+import org.apache.sysds.runtime.functionobjects.Plus;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.runtime.matrix.operators.BinaryOperator;
+import org.apache.sysds.runtime.matrix.operators.RightScalarOperator;
+import org.apache.sysds.runtime.matrix.operators.ScalarOperator;
+import org.apache.sysds.runtime.util.DataConverter;
+import org.apache.sysds.test.AutomatedTestBase;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Random;
+
+import static org.junit.Assert.*;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link ColGroupPiecewiseLinearCompressed} operations: scalarOperation, binaryRowOpLeft, binaryRowOpRight,
+ * computeSum, computeColSums, containsValue, getIdx, getNumValues and getExactSizeOnDisk.
+ *
+ * <p>
+ * Scalar, binary-row and sum operations are checked against the same operation applied element-wise to the
+ * decompressed column group. They therefore test the compressed operation itself, not the approximation error of the
+ * compression.
+ * </p>
+ */
+public class ColGroupPiecewiseLinearCompressedOperationsTest extends AutomatedTestBase {
+
+       private static final long SEED = 42L;
+       private static final int NROWS = 50;
+       private static final int NCOLS = 3;
+       private static final double TARGET_LOSS = 1e-8;
+       /** Tolerance for comparing compressed operations against their decompressed reference. */
+       private static final double DELTA = 1e-9;
+
+       private ColGroupPiecewiseLinearCompressed piecewiseLinearColGroup;
+       private MatrixBlock originalMB;
+       private MatrixBlock decompressedMB;
+       private IColIndex colIndexes;
+       private int numRows;
+       private int numCols;
+
+       /**
+        * Compresses a random {@code NROWS x NCOLS} matrix into a single piecewise linear column group and decompresses it
+        * again. The decompressed matrix is the reference for all operation tests.
+        */
+       @Override
+       @Before
+       public void setUp() {
+               numRows = NROWS;
+               numCols = NCOLS;
+
+               // dense random matrix with values in [-3, 3), see getRandomMatrix below
+               double[][] data = getRandomMatrix(numRows, numCols, -3, 3, 1.0, SEED);
+               originalMB = DataConverter.convertToMatrixBlock(data);
+               originalMB.allocateDenseBlock();
+
+               colIndexes = ColIndexFactory.create(buildColArray(numCols));
+
+               CompressionSettings cs = new CompressionSettingsBuilder().create();
+               cs.setPiecewiseTargetLoss(TARGET_LOSS);
+
+               // compress all columns into one piecewise linear column group
+               AColGroup result = ColGroupFactory.compressPiecewiseLinearFunctional(colIndexes, originalMB, cs);
+               assertTrue(result instanceof ColGroupPiecewiseLinearCompressed);
+               piecewiseLinearColGroup = (ColGroupPiecewiseLinearCompressed) result;
+
+               decompressedMB = decompress(piecewiseLinearColGroup);
+       }
+
+       /** Decompresses the given column group into a new dense {@code numRows x numCols} matrix. */
+       private MatrixBlock decompress(AColGroup cg) {
+               MatrixBlock mb = new MatrixBlock(numRows, numCols, false);
+               mb.allocateDenseBlock();
+               cg.decompressToDenseBlock(mb.getDenseBlock(), 0, numRows, 0, 0);
+               return mb;
+       }
+
+       /**
+        * Asserts that two matrices have equal dimensions and that all cells agree within {@code DELTA}.
+        *
+        * @param msg      prefix for assertion messages
+        * @param expected reference matrix
+        * @param actual   matrix under test
+        */
+       private void checkMatrixEquals(String msg, MatrixBlock expected, MatrixBlock actual) {
+               if(expected.getNumRows() != actual.getNumRows() || expected.getNumColumns() != actual.getNumColumns())
+                       fail(msg + " dimension mismatch");
+               // iterate the compared matrices' own dimensions, not the fixture's
+               for(int r = 0; r < expected.getNumRows(); r++)
+                       for(int c = 0; c < expected.getNumColumns(); c++)
+                               assertEquals(msg + "[" + r + "," + c + "]", expected.get(r, c), actual.get(r, c), DELTA);
+       }
+
+       /** Computes the per-column sums of a {@code numRows x numCols} matrix. */
+       private double[] computeSums(MatrixBlock mb) {
+               double[] sums = new double[numCols];
+               for(int c = 0; c < numCols; c++)
+                       for(int r = 0; r < numRows; r++)
+                               sums[c] += mb.get(r, c);
+               return sums;
+       }
+
+       /** Creates the row vector {@code [0.5, 1.0, 1.5, ...]} of length {@code numCols}. */
+       private double[] buildRowVector() {
+               double[] v = new double[numCols];
+               for(int i = 0; i < numCols; i++)
+                       v[i] = 0.5 * (i + 1);
+               return v;
+       }
+
+       /** Returns the column indices {@code 0 .. n-1}. */
+       private int[] buildColArray(int n) {
+               int[] cols = new int[n];
+               for(int i = 0; i < n; i++)
+                       cols[i] = i;
+               return cols;
+       }
+
+       /** Reference for binaryRowOpLeft: computes {@code op(v[c], mb[r][c])} for every cell. */
+       private MatrixBlock applyBinaryRowOpLeft(MatrixBlock mb, BinaryOperator op, double[] v) {
+               MatrixBlock result = new MatrixBlock(numRows, numCols, false);
+               result.allocateDenseBlock();
+               for(int r = 0; r < numRows; r++)
+                       for(int c = 0; c < numCols; c++)
+                               result.getDenseBlock().set(r, c, op.fn.execute(v[c], mb.get(r, c)));
+               return result;
+       }
+
+       /** Reference for binaryRowOpRight: computes {@code op(mb[r][c], v[c])} for every cell. */
+       private MatrixBlock applyBinaryRowOpRight(MatrixBlock mb, BinaryOperator op, double[] v) {
+               MatrixBlock result = new MatrixBlock(numRows, numCols, false);
+               result.allocateDenseBlock();
+               for(int r = 0; r < numRows; r++)
+                       for(int c = 0; c < numCols; c++)
+                               result.getDenseBlock().set(r, c, op.fn.execute(mb.get(r, c), v[c]));
+               return result;
+       }
+
+       @Test
+       public void testComputeSum() {
+               double[] sumsComp = new double[numCols];
+               piecewiseLinearColGroup.computeSum(sumsComp, numRows);
+               // JUnit expects (expected, actual): the reference comes from the decompressed matrix
+               assertArrayEquals(computeSums(decompressedMB), sumsComp, DELTA);
+       }
+
+       @Test
+       public void testComputeColSums() {
+               double[] sumsComp = new double[numCols];
+               piecewiseLinearColGroup.computeColSums(sumsComp, numRows);
+               assertArrayEquals(computeSums(decompressedMB), sumsComp, DELTA);
+       }
+
+       /** Compares {@code scalarOperation(op)} against {@code op} applied cell-wise to the decompressed matrix. */
+       private void testScalarOp(ScalarOperator op, double scalar) {
+               MatrixBlock expected = new MatrixBlock(numRows, numCols, false);
+               expected.allocateDenseBlock();
+               for(int r = 0; r < numRows; r++)
+                       for(int c = 0; c < numCols; c++)
+                               expected.getDenseBlock().set(r, c, op.fn.execute(decompressedMB.get(r, c), scalar));
+
+               checkMatrixEquals("scalarOp " + op.fn.getClass().getSimpleName(), expected,
+                       decompress(piecewiseLinearColGroup.scalarOperation(op)));
+       }
+
+       @Test
+       public void testScalarPlus() {
+               testScalarOp(new RightScalarOperator(Plus.getPlusFnObject(), 3.7), 3.7);
+       }
+
+       @Test
+       public void testScalarMinus() {
+               testScalarOp(new RightScalarOperator(Minus.getMinusFnObject(), 1.5), 1.5);
+       }
+
+       @Test
+       public void testScalarMultiply() {
+               testScalarOp(new RightScalarOperator(Multiply.getMultiplyFnObject(), 2.0), 2.0);
+       }
+
+       @Test
+       public void testScalarDivide() {
+               testScalarOp(new RightScalarOperator(Divide.getDivideFnObject(), 4.0), 4.0);
+       }
+
+       @Test
+       public void testBinaryRowOpLeftPlus() {
+               BinaryOperator op = new BinaryOperator(Plus.getPlusFnObject());
+               double[] v = buildRowVector();
+               checkMatrixEquals("binaryRowOpLeft Plus", applyBinaryRowOpLeft(decompressedMB, op, v),
+                       decompress(piecewiseLinearColGroup.binaryRowOpLeft(op, v, false)));
+       }
+
+       @Test
+       public void testBinaryRowOpLeftMultiply() {
+               BinaryOperator op = new BinaryOperator(Multiply.getMultiplyFnObject());
+               double[] v = buildRowVector();
+               checkMatrixEquals("binaryRowOpLeft Multiply", applyBinaryRowOpLeft(decompressedMB, op, v),
+                       decompress(piecewiseLinearColGroup.binaryRowOpLeft(op, v, false)));
+       }
+
+       @Test
+       public void testBinaryRowOpRightMinus() {
+               BinaryOperator op = new BinaryOperator(Minus.getMinusFnObject());
+               double[] v = buildRowVector();
+               checkMatrixEquals("binaryRowOpRight Minus", applyBinaryRowOpRight(decompressedMB, op, v),
+                       decompress(piecewiseLinearColGroup.binaryRowOpRight(op, v, false)));
+       }
+
+       @Test
+       public void testBinaryRowOpRightDivide() {
+               BinaryOperator op = new BinaryOperator(Divide.getDivideFnObject());
+               double[] v = buildRowVector();
+               checkMatrixEquals("binaryRowOpRight Divide", applyBinaryRowOpRight(decompressedMB, op, v),
+                       decompress(piecewiseLinearColGroup.binaryRowOpRight(op, v, false)));
+       }
+
+       @Test
+       public void testContainsValueIntercept() {
+               double pattern = piecewiseLinearColGroup.getInterceptsPerCol()[0][0];
+               assertTrue("intercept of col 0 seg 0 should exist", piecewiseLinearColGroup.containsValue(pattern));
+       }
+
+       @Test
+       public void testContainsValueEndpoint() {
+               int[] breakpoints = piecewiseLinearColGroup.getBreakpointsPerCol()[0];
+               double[] intercepts = piecewiseLinearColGroup.getInterceptsPerCol()[0];
+               double[] slopes = piecewiseLinearColGroup.getSlopesPerCol()[0];
+               if(breakpoints.length > 1) {
+                       // value of the last row of segment 0
+                       double pattern = intercepts[0] + slopes[0] * (breakpoints[1] - breakpoints[0] - 1);
+                       assertTrue("endpoint of col 0 seg 0 should exist", piecewiseLinearColGroup.containsValue(pattern));
+               }
+       }
+
+       @Test
+       public void testContainsValueConstantSegment() {
+               // a single segment with slope 0 and intercept 1.23
+               ColGroupPiecewiseLinearCompressed cg = (ColGroupPiecewiseLinearCompressed) ColGroupPiecewiseLinearCompressed.create(
+                       ColIndexFactory.create(new int[] {0}), new int[][] {{0, numRows}}, new double[][] {{0.0}},
+                       new double[][] {{1.23}}, numRows);
+
+               assertTrue("constant value 1.23 should exist", cg.containsValue(1.23));
+               assertFalse("value 2.0 should not exist", cg.containsValue(2.0));
+       }
+
+       @Test
+       public void testContainsValueOutsideRange() {
+               assertFalse("value -10 outside data range", piecewiseLinearColGroup.containsValue(-10.0));
+               assertFalse("value +10 outside data range", piecewiseLinearColGroup.containsValue(10.0));
+       }
+
+       @Test
+       public void testGetIdxMatchesDecompress() {
+               for(int c = 0; c < numCols; c++)
+                       for(int r = 0; r < numRows; r++)
+                               assertEquals("getIdx(" + r + "," + c + ")", decompressedMB.get(r, c),
+                                       piecewiseLinearColGroup.getIdx(r, c), 1e-10);
+       }
+
+       @Test
+       public void testGetIdxInvalidBounds() {
+               assertEquals("row < 0", 0.0, piecewiseLinearColGroup.getIdx(-1, 0), DELTA);
+               assertEquals("row >= numRows", 0.0, piecewiseLinearColGroup.getIdx(numRows, 0), DELTA);
+               assertEquals("col < 0", 0.0, piecewiseLinearColGroup.getIdx(0, -1), DELTA);
+               assertEquals("col >= ncols", 0.0, piecewiseLinearColGroup.getIdx(0, numCols), DELTA);
+       }
+
+       @Test
+       public void testGetNumValues() {
+               int expected = 0;
+               for(int c = 0; c < numCols; c++) {
+                       int breakpointsLen = piecewiseLinearColGroup.getBreakpointsPerCol()[c].length;
+                       int slopesLen = piecewiseLinearColGroup.getSlopesPerCol()[c].length;
+                       int interceptsLen = piecewiseLinearColGroup.getInterceptsPerCol()[c].length;
+                       assertEquals("breakpoints != slopes+1 for col " + c, breakpointsLen, slopesLen + 1);
+                       assertEquals("slopes != intercepts for col " + c, slopesLen, interceptsLen);
+                       expected += breakpointsLen + slopesLen + interceptsLen;
+               }
+               assertEquals("getNumValues() mismatch", expected, piecewiseLinearColGroup.getNumValues());
+       }
+
+       @Test
+       public void testGetExactSizeOnDisk() {
+               Random rng = new Random(SEED);
+               int rows = 80 + rng.nextInt(40);
+               int numSegs = 1 + rng.nextInt(3);
+
+               // Strictly increasing breakpoints: the s-th interior breakpoint is drawn from the first half of the s-th
+               // equally sized stripe, so breakpoints can neither collide nor cross (independent random draws could).
+               int stripe = rows / numSegs;
+               int[] breakpoints = new int[numSegs + 1];
+               breakpoints[0] = 0;
+               breakpoints[numSegs] = rows;
+               for(int s = 1; s < numSegs; s++)
+                       breakpoints[s] = s * stripe + rng.nextInt(stripe / 2);
+
+               double[] slopes = new double[numSegs];
+               double[] intercepts = new double[numSegs];
+               for(int s = 0; s < numSegs; s++) {
+                       slopes[s] = rng.nextDouble() * 4 - 2;
+                       intercepts[s] = rng.nextDouble() * 4 - 2;
+               }
+               // PLC: piecewise linear compressed
+               AColGroup colGroupPLC = ColGroupPiecewiseLinearCompressed.create(
+                       ColIndexFactory.create(new int[] {rng.nextInt(20)}), new int[][] {breakpoints}, new double[][] {slopes},
+                       new double[][] {intercepts}, rows);
+
+               assertTrue("disk size should be positive", colGroupPLC.getExactSizeOnDisk() > 0);
+               assertTrue("num values should be positive", colGroupPLC.getNumValues() > 0);
+       }
+
+       /**
+        * Generates a dense {@code rows x cols} matrix with values drawn uniformly from {@code [min, max)}.
+        *
+        * NOTE: {@code sparsity} is ignored; every cell is filled.
+        */
+       @Override
+       public double[][] getRandomMatrix(int rows, int cols, double min, double max, double sparsity, long seed) {
+               Random rng = new Random(seed);
+               double[][] data = new double[rows][cols];
+               for(int r = 0; r < rows; r++)
+                       for(int c = 0; c < cols; c++)
+                               data[r][c] = min + rng.nextDouble() * (max - min);
+               return data;
+       }
+}
diff --git 
a/src/test/java/org/apache/sysds/test/component/compress/colgroup/ColGroupPiecewiseLinearCompressedTest.java
 
b/src/test/java/org/apache/sysds/test/component/compress/colgroup/ColGroupPiecewiseLinearCompressedTest.java
new file mode 100644
index 0000000000..e05745bc97
--- /dev/null
+++ 
b/src/test/java/org/apache/sysds/test/component/compress/colgroup/ColGroupPiecewiseLinearCompressedTest.java
@@ -0,0 +1,455 @@
+package org.apache.sysds.test.component.compress.colgroup;
+
+import org.apache.sysds.runtime.compress.CompressionSettings;
+import org.apache.sysds.runtime.compress.CompressionSettingsBuilder;
+import org.apache.sysds.runtime.compress.colgroup.AColGroup;
+import org.apache.sysds.runtime.compress.colgroup.ColGroupFactory;
+import 
org.apache.sysds.runtime.compress.colgroup.ColGroupPiecewiseLinearCompressed;
+import 
org.apache.sysds.runtime.compress.colgroup.functional.PiecewiseLinearUtils;
+import org.apache.sysds.runtime.compress.colgroup.indexes.ColIndexFactory;
+import org.apache.sysds.runtime.compress.colgroup.indexes.IColIndex;
+import org.apache.sysds.runtime.compress.estim.CompressedSizeInfo;
+import org.apache.sysds.runtime.compress.estim.CompressedSizeInfoColGroup;
+import org.apache.sysds.runtime.compress.estim.EstimationFactors;
+import org.apache.sysds.runtime.data.DenseBlock;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.runtime.util.DataConverter;
+import org.apache.sysds.test.AutomatedTestBase;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+import static org.junit.Assert.*;
+
+/**
+ * Unit tests of {@link ColGroupPiecewiseLinearCompressed}, covering input validation, compression and decompression.
+ */
+public class ColGroupPiecewiseLinearCompressedTest extends AutomatedTestBase {
+
+       private static final long SEED = 42L;
+
+       @Override
+       public void setUp() {
+               // no shared configuration: every test builds its own inputs
+       }
+
+       /** A column whose breakpoint array is {@code null} must be rejected with a NullPointerException. */
+       @Test(expected = NullPointerException.class)
+       public void testCreateNullBreakpoints() {
+               final IColIndex columns = ColIndexFactory.create(new int[] {0});
+               final int[][] breakpoints = new int[][] {null};
+               final double[][] slopes = {{1.0}};
+               final double[][] intercepts = {{0.0}};
+               ColGroupPiecewiseLinearCompressed.create(columns, breakpoints, slopes, intercepts, 10);
+       }
+
+       /** A single breakpoint describes no segment and must be rejected with an IllegalArgumentException. */
+       @Test(expected = IllegalArgumentException.class)
+       public void testCreateTooFewBreakpoints() {
+               final int[][] breakpoints = {{0}};
+               final IColIndex columns = ColIndexFactory.create(new int[] {0});
+               final double[][] slopes = {{1.0}};
+               final double[][] intercepts = {{0.0}};
+               ColGroupPiecewiseLinearCompressed.create(columns, breakpoints, slopes, intercepts, 10);
+       }
+
+       /** Two segments (three breakpoints) with three slopes are inconsistent and must be rejected. */
+       @Test(expected = IllegalArgumentException.class)
+       public void testCreateInconsistentSlopes() {
+               final int[][] breakpoints = {{0, 5, 10}};
+               final IColIndex columns = ColIndexFactory.create(new int[] {0});
+               final double[][] slopes = {{1.0, 2.0, 3.0}};
+               final double[][] intercepts = {{0.0, 1.0}};
+               ColGroupPiecewiseLinearCompressed.create(columns, breakpoints, slopes, intercepts, 10);
+       }
+
+       /** Two segments (three breakpoints) with only one intercept are inconsistent and must be rejected. */
+       @Test(expected = IllegalArgumentException.class)
+       public void testCreateInconsistentIntercepts() {
+               final int[][] breakpoints = {{0, 5, 10}};
+               final IColIndex columns = ColIndexFactory.create(new int[] {0});
+               final double[][] slopes = {{1.0, 2.0}};
+               final double[][] intercepts = {{0.0}};
+               ColGroupPiecewiseLinearCompressed.create(columns, breakpoints, slopes, intercepts, 10);
+       }
+
+       @Test
+       public void testCompressAndDecompressDP() {
+
+               // create random matrix
+               final int nrows = 50, ncols = 3;
+               double[][] data = getRandomMatrix(nrows, ncols, -3, 3, 1.0, 
SEED);
+               MatrixBlock in = DataConverter.convertToMatrixBlock(data);
+               in.allocateDenseBlock();
+
+               IColIndex colIndexes = ColIndexFactory.create(new int[] {0, 1, 
2});
+               CompressionSettings cs = new 
CompressionSettingsBuilder().create();
+               cs.setPiecewiseTargetLoss(1e-8);
+
+               AColGroup result = 
ColGroupFactory.compressPiecewiseLinearFunctional(colIndexes, in, cs);
+               assertTrue(result instanceof ColGroupPiecewiseLinearCompressed);
+               ColGroupPiecewiseLinearCompressed plGroup = 
(ColGroupPiecewiseLinearCompressed) result;
+
+               // check the structure
+               int[][] breakpoints = plGroup.getBreakpointsPerCol();
+               double[][] slopes = plGroup.getSlopesPerCol();
+               double[][] intercepts = plGroup.getInterceptsPerCol();
+
+               assertEquals("wrong number of columns in breakpoints", ncols, 
breakpoints.length);
+               for(int c = 0; c < ncols; c++) {
+                       assertTrue("breakpoints[" + c + "] needs at least 2 
entries", breakpoints[c].length >= 2);
+                       assertEquals("breakpoints[" + c + "] must start at 0", 
0, breakpoints[c][0]);
+                       assertEquals("breakpoints[" + c + "] must end at 
nrows", nrows, breakpoints[c][breakpoints[c].length - 1]);
+                       int numSegs = breakpoints[c].length - 1;
+                       assertEquals("slopes[" + c + "] length mismatch", 
numSegs, slopes[c].length);
+                       assertEquals("intercepts[" + c + "] length mismatch", 
numSegs, intercepts[c].length);
+               }
+
+               // decompress and check reconstruction of column group
+               MatrixBlock recon = new MatrixBlock(nrows, ncols, false);
+               recon.allocateDenseBlock();
+               plGroup.decompressToDenseBlock(recon.getDenseBlock(), 0, nrows, 
0, 0);
+               DenseBlock db = recon.getDenseBlock();
+
+               for(int r = 0; r < nrows; r++) {
+                       for(int c = 0; c < ncols; c++) {
+                               double val = db.get(r, c);
+                               assertFalse("NaN at [" + r + "," + c + "]", 
Double.isNaN(val));
+                               assertFalse("Infinite at [" + r + "," + c + 
"]", Double.isInfinite(val));
+                               assertEquals("reconstruction error too large at 
[" + r + "," + c + "]", data[r][c], val, 1e-6);
+                       }
+               }
+       }
+
+       @Test
+       public void testCompressAndDecompressSuccessive() {
+
+               //create random matrix
+               final int nrows = 50, ncols = 3;
+               double[][] data = getRandomMatrix(nrows, ncols, -3, 3, 1.0, 
SEED);
+               MatrixBlock in = DataConverter.convertToMatrixBlock(data);
+               in.allocateDenseBlock();
+
+               IColIndex colIndexes = ColIndexFactory.create(new int[] {0, 1, 
2});
+               CompressionSettings cs = new 
CompressionSettingsBuilder().create();
+               cs.setPiecewiseTargetLoss(1e-8);
+
+               // create ColGroupPiecewiseLinearCompressed with successive 
compression
+               AColGroup result = 
ColGroupFactory.compressPiecewiseLinearFunctionalSuccessive(colIndexes, in, cs);
+               assertTrue(result instanceof ColGroupPiecewiseLinearCompressed);
+               ColGroupPiecewiseLinearCompressed plGroup = 
(ColGroupPiecewiseLinearCompressed) result;
+
+               // structure checks
+               int[][] bp = plGroup.getBreakpointsPerCol();
+               double[][] slopes = plGroup.getSlopesPerCol();
+               double[][] intercepts = plGroup.getInterceptsPerCol();
+
+               assertEquals("wrong number of columns in bp", ncols, bp.length);
+               for(int c = 0; c < ncols; c++) {
+                       assertTrue("bp[" + c + "] needs at least 2 entries", 
bp[c].length >= 2);
+                       assertEquals("bp[" + c + "] must start at 0", 0, 
bp[c][0]);
+                       assertEquals("bp[" + c + "] must end at nrows", nrows, 
bp[c][bp[c].length - 1]);
+                       int numSegs = bp[c].length - 1;
+                       assertEquals("slopes[" + c + "] length mismatch", 
numSegs, slopes[c].length);
+                       assertEquals("intercepts[" + c + "] length mismatch", 
numSegs, intercepts[c].length);
+               }
+
+               // validate decompression
+               MatrixBlock recon = new MatrixBlock(nrows, ncols, false);
+               recon.allocateDenseBlock();
+               plGroup.decompressToDenseBlock(recon.getDenseBlock(), 0, nrows, 
0, 0);
+               DenseBlock db = recon.getDenseBlock();
+
+               for(int r = 0; r < nrows; r++) {
+                       for(int c = 0; c < ncols; c++) {
+                               double val = db.get(r, c);
+                               assertFalse("NaN at [" + r + "," + c + "]", 
Double.isNaN(val));
+                               assertFalse("Infinite at [" + r + "," + c + 
"]", Double.isInfinite(val));
+                               assertEquals("reconstruction error too large at 
[" + r + "," + c + "]", data[r][c], val, 1e-6);
+                       }
+               }
+       }
+
+       /// Wrapper methods: round-trip test setup for DP and successive compression
+
+       private void testRoundtripDP(double[][] data, int nrows, int ncols, 
double targetLoss, double tolerance,
+               int maxFailures) {
+               testRoundtrip(data, nrows, ncols, targetLoss, tolerance, 
maxFailures, false);
+       }
+
+       private void testRoundtripSuccessive(double[][] data, int nrows, int 
ncols, double targetLoss, double tolerance,
+               int maxFailures) {
+               testRoundtrip(data, nrows, ncols, targetLoss, tolerance, 
maxFailures, true);
+       }
+
+       /**
+        * Set test setup: converting data in matrix block, set compression 
setting does compression, decompression,
+        * validation
+        */
+       private void testRoundtrip(double[][] data, int nrows, int ncols, 
double targetLoss, double tolerance,
+               int maxFailures, boolean successive) {
+
+               ///create a matrix
+               MatrixBlock orig = DataConverter.convertToMatrixBlock(data);
+               orig.allocateDenseBlock();
+
+               IColIndex colIndexes = 
ColIndexFactory.create(buildColArray(ncols));
+               CompressionSettings cs = new 
CompressionSettingsBuilder().create();
+               cs.setPiecewiseTargetLoss(targetLoss);
+
+               /// choose compression
+               AColGroup result = successive ? 
ColGroupFactory.compressPiecewiseLinearFunctionalSuccessive(colIndexes, orig,
+                       cs) : 
ColGroupFactory.compressPiecewiseLinearFunctional(colIndexes, orig, cs);
+
+               assertTrue(result instanceof ColGroupPiecewiseLinearCompressed);
+               ColGroupPiecewiseLinearCompressed plGroup = 
(ColGroupPiecewiseLinearCompressed) result;
+
+               /// structure checks
+               checkStructure(plGroup, nrows, ncols);
+
+               /// decompression check
+               MatrixBlock recon = new MatrixBlock(nrows, ncols, false);
+               recon.allocateDenseBlock();
+               plGroup.decompressToDenseBlock(recon.getDenseBlock(), 0, nrows, 
0, 0);
+               DenseBlock db = recon.getDenseBlock();
+
+               int failures = 0;
+               for(int r = 0; r < nrows; r++) {
+                       for(int c = 0; c < ncols; c++) {
+                               double val = db.get(r, c);
+                               assertFalse("NaN at [" + r + "," + c + "]", 
Double.isNaN(val));
+                               assertFalse("Infinite at [" + r + "," + c + 
"]", Double.isInfinite(val));
+                               if(Math.abs(data[r][c] - val) > tolerance)
+                                       failures++;
+                       }
+               }
+               assertTrue("too many reconstruction failures: " + failures, 
failures <= maxFailures);
+       }
+
+       /**
+        * Asserts the structural invariants of a piecewise-linear column group. There must be one breakpoint array per
+        * column that starts at row 0 and ends at {@code nrows}, and exactly one slope and one intercept per segment.
+        *
+        * @param plGroup the compressed column group to inspect
+        * @param nrows   number of rows that were compressed
+        * @param ncols   number of columns that were compressed
+        */
+       private void checkStructure(ColGroupPiecewiseLinearCompressed plGroup, int nrows, int ncols) {
+               int[][] breakpoints = plGroup.getBreakpointsPerCol();
+               double[][] slopes = plGroup.getSlopesPerCol();
+               double[][] intercepts = plGroup.getInterceptsPerCol();
+
+               assertEquals("wrong number of columns in breakpoints", ncols, breakpoints.length);
+               // validate the outer lengths up front so a mismatch is reported as an assertion failure
+               // rather than an ArrayIndexOutOfBoundsException inside the per-column loop below
+               assertEquals("wrong number of columns in slopes", ncols, slopes.length);
+               assertEquals("wrong number of columns in intercepts", ncols, intercepts.length);
+               assertEquals("wrong number of col indices", ncols, plGroup.getColIndices().size());
+
+               for(int c = 0; c < ncols; c++) {
+                       assertTrue("breakpoints[" + c + "] needs at least 2 entries", breakpoints[c].length >= 2);
+                       assertEquals("breakpoints[" + c + "] must start at 0", 0, breakpoints[c][0]);
+                       assertEquals("breakpoints[" + c + "] must end at nrows", nrows, breakpoints[c][breakpoints[c].length - 1]);
+                       // k breakpoints delimit k - 1 segments, each with its own slope and intercept
+                       int numSegs = breakpoints[c].length - 1;
+                       assertEquals("slopes[" + c + "] length mismatch", numSegs, slopes[c].length);
+                       assertEquals("intercepts[" + c + "] length mismatch", numSegs, intercepts[c].length);
+               }
+       }
+
+       /**
+        * Builds synthetic data made of four linear segments, plus Gaussian noise with standard deviation 0.8.
+        * The segments start at rows 0, 15, 30 and 45 and have slopes 0.5, -1.2, 2.0 and -0.8.
+        * Each segment restarts from the current baseline, so the series generally jumps at segment boundaries.
+        * The baseline of column {@code c} starts at {@code c} and drifts by 0.01 per row.
+        * Rows at or beyond the last segment start continue the final segment.
+        *
+        * @param nrows number of rows
+        * @param ncols number of columns
+        * @return a {@code nrows x ncols} data array
+        */
+       private double[][] buildMultiSegmentData(int nrows, int ncols) {
+               Random rng = new Random(SEED);
+               double[][] data = new double[nrows][ncols];
+               // start row of each segment; segStarts[i] pairs with slopes[i]
+               int[] segStarts = {0, 15, 30, 45};
+               double[] slopes = {0.5, -1.2, 2.0, -0.8};
+
+               for(int c = 0; c < ncols; c++) {
+                       double offset = c;
+                       for(int r = 0; r < nrows; r++) {
+                               // advance to the segment containing row r; bounded by slopes.length so rows past the
+                               // last start (e.g. nrows > 60) stay in the final segment instead of indexing past slopes
+                               int seg = 0;
+                               while(seg + 1 < slopes.length && r >= segStarts[seg + 1])
+                                       seg++;
+                               data[r][c] = slopes[seg] * (r - segStarts[seg]) + offset + rng.nextGaussian() * 0.8;
+                               offset += 0.01;
+                       }
+               }
+               return data;
+       }
+
+       private int[] buildColArray(int ncols) {
+               int[] cols = new int[ncols];
+               for(int i = 0; i < ncols; i++)
+                       cols[i] = i;
+               return cols;
+       }
+
+       /**
+        * Two columns following a shallow linear trend (0.05 per row) with Gaussian noise (sd 1.5).
+        * Column {@code c} is additionally shifted by {@code 2.0 * c}.
+        */
+       @Test
+       public void testTrendWithNoise() {
+               final int nrows = 100;
+               final int ncols = 2;
+               final Random noise = new Random(SEED);
+               final double[][] data = new double[nrows][ncols];
+               for(int row = 0; row < nrows; row++) {
+                       final double trend = 0.05 * row;
+                       for(int col = 0; col < ncols; col++)
+                               data[row][col] = trend + noise.nextGaussian() * 1.5 + col * 2.0;
+               }
+               // presumably (targetLoss, tolerance, maxFailures) -- confirm against the roundtrip helpers
+               testRoundtripDP(data, nrows, ncols, 1.0, 4.0, 45);
+               testRoundtripSuccessive(data, nrows, ncols, 1.0, 4.0, 45);
+       }
+
+       /**
+        * Random columns with two upward level shifts: +8 on rows [25, 55) and +15 on rows [55, nrows).
+        */
+       @Test
+       public void testAbruptJumps() {
+               final int nrows = 80;
+               final int ncols = 3;
+               final double[][] data = getRandomMatrix(nrows, ncols, -2, 2, 1.0, SEED);
+               for(int r = 25; r < nrows; r++) {
+                       final double shift = r < 55 ? 8.0 : 15.0;
+                       for(int c = 0; c < ncols; c++)
+                               data[r][c] += shift;
+               }
+               // the successive variant gets a larger target loss, tolerance and failure budget for the jumps
+               testRoundtripDP(data, nrows, ncols, 5.0, 10.0, 50);
+               testRoundtripSuccessive(data, nrows, ncols, 25.0, 18.0, 55);
+       }
+
+       /**
+        * Fifty columns dominated by oscillation. Each cell combines a shared sine carrier (amplitude 4), a sine
+        * phase-shifted per column (amplitude 2) and Gaussian noise (sd 0.8).
+        */
+       @Test
+       public void testHighFrequency() {
+               final int nrows = 100;
+               final int ncols = 50;
+               final Random rng = new Random(SEED);
+               final double[][] data = new double[nrows][ncols];
+               for(int r = 0; r < nrows; r++) {
+                       final double carrier = Math.sin(r * 0.4) * 4.0;
+                       for(int c = 0; c < ncols; c++) {
+                               final double jitter = rng.nextGaussian() * 0.8;
+                               data[r][c] = carrier + jitter + Math.sin(r * 0.2 + c) * 2.0;
+                       }
+               }
+               // both variants fit oscillating data poorly. The last argument appears to be the allowed number of
+               // out-of-tolerance cells (out of 5000). The successive call uses a wider tolerance but a tighter budget.
+               testRoundtripDP(data, nrows, ncols, 2.0, 2.0, 3500);
+               testRoundtripSuccessive(data, nrows, ncols, 2.0, 2.5, 2500);
+       }
+
+       /**
+        * Single 50-row column from {@code getRandomMatrix} in [-1, 1].
+        * The fifth argument is 0.3, presumably the sparsity -- confirm.
+        */
+       @Test
+       public void testLowVarianceSingleColumn() {
+               final int nrows = 50;
+               final double[][] data = getRandomMatrix(nrows, 1, -1, 1, 0.3, SEED);
+               testRoundtripDP(data, nrows, 1, 0.1, 0.5, 5);
+               testRoundtripSuccessive(data, nrows, 1, 0.05, 0.4, 3);
+       }
+
+       /**
+        * Single 50-row random column in [-1, 1]; both algorithms use the same loss, tolerance and failure budget.
+        */
+       @Test
+       public void testSingleColumn() {
+               final int nrows = 50;
+               final double[][] data = getRandomMatrix(nrows, 1, -1, 1, 1.0, SEED);
+               testRoundtripDP(data, nrows, 1, 0.5, 1.0, 8);
+               testRoundtripSuccessive(data, nrows, 1, 0.5, 1.0, 8);
+       }
+
+       /**
+        * Two columns built from known linear segments (see {@code buildMultiSegmentData}).
+        */
+       @Test
+       public void testKnownSegmentBoundaries() {
+               final int nrows = 60;
+               final int ncols = 2;
+               final double[][] segmented = buildMultiSegmentData(nrows, ncols);
+               // the successive variant is given a slightly higher target loss for the same data
+               testRoundtripDP(segmented, nrows, ncols, 0.8, 5.0, 35);
+               testRoundtripSuccessive(segmented, nrows, ncols, 1.0, 5.0, 35);
+       }
+
+       /**
+        * Five random columns of 80 rows in [-5, 5].
+        */
+       @Test
+       public void testMultipleColumns() {
+               final int nrows = 80;
+               final int ncols = 5;
+               final double[][] data = getRandomMatrix(nrows, ncols, -5, 5, 1.5, SEED);
+               testRoundtripDP(data, nrows, ncols, 3.0, 4.0, 120);
+               testRoundtripSuccessive(data, nrows, ncols, 3.0, 4.0, 120);
+       }
+
+       private CompressedSizeInfo createTestCompressedSizeInfo() {
+               IColIndex cols = ColIndexFactory.create(new int[] {0});
+               EstimationFactors facts = new EstimationFactors(2, 10);
+
+               CompressedSizeInfoColGroup info = new 
CompressedSizeInfoColGroup(cols, facts,
+                       AColGroup.CompressionType.PiecewiseLinear);
+
+               List<CompressedSizeInfoColGroup> infos = Arrays.asList(info);
+               CompressedSizeInfo csi = new CompressedSizeInfo(infos);
+
+               return csi;
+       }
+
+       /**
+        * Compresses a single linear column through {@code ColGroupFactory.compressColGroups}, using a size info that
+        * requests PiecewiseLinear, and checks that a piecewise-linear column group is among the results.
+        */
+       @Test
+       public void testCompressPiecewiseLinearViaRealAPI() {
+               final int nrows = 10;
+               final MatrixBlock in = new MatrixBlock(nrows, 1, false);
+               in.allocateDenseBlock();
+               for(int r = 0; r < nrows; r++)
+                       in.set(r, 0, r * 0.5);
+
+               final CompressionSettings cs = new CompressionSettingsBuilder()
+                       .addValidCompression(AColGroup.CompressionType.PiecewiseLinear).create();
+               final CompressedSizeInfo csi = createTestCompressedSizeInfo();
+
+               final List<AColGroup> colGroups = ColGroupFactory.compressColGroups(in, csi, cs);
+               assertTrue(colGroups.stream().anyMatch(ColGroupPiecewiseLinearCompressed.class::isInstance));
+       }
+
+       /**
+        * A perfectly linear column must be represented by a single segment, i.e. breakpoints {@code [0, 5]}.
+        */
+       @Test
+       public void testSuccessiveLinearColumnSingleSegment() {
+               double[] col = {1.0, 2.0, 3.0, 4.0, 5.0};
+               CompressionSettings cs = new CompressionSettingsBuilder().create();
+               cs.setPiecewiseTargetLoss(1e-6);
+
+               List<Integer> breaks = PiecewiseLinearUtils.computeBreakpointSuccessive(col, cs);
+               // compare as lists (List.equals is element-wise across implementations) rather than via
+               // toString(), whose format depends on the concrete List type returned
+               assertEquals(Arrays.asList(0, 5), breaks);
+       }
+
+       /**
+        * Two noisy clusters separated by a jump between indices 2 and 3 must produce more than one segment.
+        */
+       @Test
+       public void testSuccessiveNoisyColumnMultipleSegments() {
+               final double[] col = {1.1, 1.9, 2.2, 10.1, 10.8, 11.3};
+               final CompressionSettings cs = new CompressionSettingsBuilder().create();
+               cs.setPiecewiseTargetLoss(1.0);
+
+               final int numBreaks = PiecewiseLinearUtils.computeBreakpointSuccessive(col, cs).size();
+               assertTrue("expected at least 3 breakpoints", numBreaks >= 3);
+       }
+
+       /**
+        * On the same step-like column, a tight target loss must yield more breakpoints than a loose one.
+        */
+       @Test
+       public void testSuccessiveStrictLossProducesMoreSegments() {
+               final double[] col = {1, 2, 3, 10, 11, 12, 20, 21, 22};
+
+               final CompressionSettings strict = new CompressionSettingsBuilder().create();
+               strict.setPiecewiseTargetLoss(0.01);
+               final CompressionSettings loose = new CompressionSettingsBuilder().create();
+               loose.setPiecewiseTargetLoss(10.0);
+
+               final int strictCount = PiecewiseLinearUtils.computeBreakpointSuccessive(col, strict).size();
+               final int looseCount = PiecewiseLinearUtils.computeBreakpointSuccessive(col, loose).size();
+               assertTrue("strict loss should produce more segments", strictCount > looseCount);
+       }
+
+       /**
+        * A random column with a +8 plateau on rows [10, 20) must yield an interior breakpoint near that plateau.
+        */
+       @Test
+       public void testSuccessiveBreakpointDetectedAtJump() {
+               double[] col = getRandomColumn(30, SEED);
+               for(int r = 10; r < 20; r++)
+                       col[r] += 8.0;
+
+               CompressionSettings cs = new CompressionSettingsBuilder().create();
+               cs.setPiecewiseTargetLoss(2.0);
+
+               int[] bps = PiecewiseLinearUtils.computeBreakpointSuccessive(col, cs).stream().mapToInt(Integer::intValue)
+                       .toArray();
+
+               // k breakpoints delimit k - 1 segments, so length >= 3 guarantees at least two segments
+               assertTrue("expected at least 3 breakpoints (2 segments)", bps.length >= 3);
+               assertTrue("expected breakpoint near jump [10,20]", hasBreakInRange(bps, 8, 22));
+       }
+
+       /**
+        * Sums {@code computeSegmentCost} over all segments and divides by the column length.
+        * The resulting global mean must not exceed the configured target loss.
+        */
+       @Test
+       public void testSuccessiveGlobalMSEWithinTarget() {
+               final double[] col = getRandomColumn(40, SEED + 1);
+               final CompressionSettings cs = new CompressionSettingsBuilder().create();
+               cs.setPiecewiseTargetLoss(1.5);
+
+               final List<Integer> bps = PiecewiseLinearUtils.computeBreakpointSuccessive(col, cs);
+               double totalCost = 0.0;
+               for(int seg = 0; seg + 1 < bps.size(); seg++)
+                       totalCost += PiecewiseLinearUtils.computeSegmentCost(col, bps.get(seg), bps.get(seg + 1));
+
+               final double mse = totalCost / col.length;
+               // the small epsilon absorbs floating-point rounding in the summed costs
+               assertTrue("global MSE=" + mse + " exceeds target=" + cs.getPiecewiseTargetLoss(),
+                       mse <= cs.getPiecewiseTargetLoss() + 1e-10);
+       }
+
+       private boolean hasBreakInRange(int[] bps, int min, int max) {
+               for(int i = 1; i < bps.length - 1; i++)
+                       if(bps[i] >= min && bps[i] <= max)
+                               return true;
+               return false;
+       }
+
+       private double[] getRandomColumn(int len, long seed) {
+               Random rng = new Random(seed);
+               double[] col = new double[len];
+               for(int i = 0; i < len; i++)
+                       col[i] = rng.nextGaussian() * 2 + i * 0.01;
+               return col;
+       }
+
+}
+
+

Reply via email to