This is an automated email from the ASF dual-hosted git repository.

ssiddiqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/master by this push:
     new 9901b72  [SYSTEMDS-3100] Refactor pipelines (adding pruning function) Porter stemming inclusion and various other improvements.
9901b72 is described below

commit 9901b72499321005cd2324d569c74ac6dde5f3e1
Author: Shafaq Siddiqi <[email protected]>
AuthorDate: Tue Aug 3 17:08:00 2021 +0200

    [SYSTEMDS-3100] Refactor pipelines (adding pruning function)
      Porter stemming inclusion and various other improvements.
    
    Closes #1372.
---
 scripts/builtin/abstain.dml                        |  23 +-
 scripts/builtin/applyAndEvaluate.dml               | 148 +++++++++++
 scripts/builtin/bandit.dml                         | 287 +++++++++++++++------
 scripts/builtin/executePipeline.dml                | 190 ++++++++++----
 scripts/builtin/ppca.dml                           | 163 ++++++------
 scripts/builtin/tomeklink.dml                      |  21 +-
 scripts/builtin/topk_cleaning.dml                  | 267 +++++++++++--------
 scripts/pipelines/properties/param.csv             |  34 +--
 scripts/pipelines/properties/primitives.csv        |  14 +-
 scripts/pipelines/properties/testPrimitives.csv    |   6 +-
 scripts/pipelines/scripts/enumerateLogical.dml     |  24 +-
 scripts/pipelines/scripts/utils.dml                |  68 +----
 .../java/org/apache/sysds/common/Builtins.java     |   1 +
 .../pipelines/BuiltinExecutePipelineTest.java      |  55 ++++
 .../BuiltinTopkCleaningClassificationTest.java     |  19 +-
 .../BuiltinTopkCleaningRegressionTest.java         |  21 +-
 ...ssionTest.java => BuiltinTopkEvaluateTest.java} |  37 ++-
 src/test/scripts/functions/builtin/tomeklink.dml   |   2 +
 .../functions/pipelines/applyEvaluateTest.dml      |  89 +++++++
 .../functions/pipelines/executePipelineTest.dml    | 101 ++++++++
 .../intermediates/classification/bestAcc.csv       |   3 +
 .../intermediates/classification/dirtyScore.csv    |   1 +
 .../intermediates/classification/evalHp.csv        |   1 +
 .../pipelines/intermediates/classification/hp.csv  |   3 +
 .../pipelines/intermediates/classification/lp.csv  |   1 +
 .../pipelines/intermediates/classification/pip.csv |   3 +
 .../functions/pipelines/topkLogicalTest.dml        |  80 +++---
 .../pipelines/topkcleaningClassificationTest.dml   | 217 +++++-----------
 .../pipelines/topkcleaningRegressionTest.dml       | 118 ++-------
 29 files changed, 1239 insertions(+), 758 deletions(-)

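The headline change is the pruning function added to bandit.dml (pruningSignal, called from run_with_hyperparam): before a pipeline instance is executed, the optimizer checks whether an earlier run of the same operator produced zero data changes; if so, and the new candidate hyper-parameter only grows, the instance is skipped. A minimal standalone DML sketch of that check, with made-up values (not part of the commit):

    # one column per operator in the logical pipeline (values are illustrative)
    hpForPruning = matrix("2 0 3", rows=1, cols=3)  # hyper-parameter of the last run, 0 = not run yet
    changesByOp  = matrix("5 0 0", rows=1, cols=3)  # data changes the last run produced
    newHp        = matrix("4 1 6", rows=1, cols=3)  # candidate hyper-parameters
    # prunable: the operator ran before but changed nothing
    prunable = (hpForPruning > 0) & (changesByOp == 0)
    # skip if any prunable operator would only increase its hyper-parameter
    skip = sum(prunable * (newHp > hpForPruning)) > 0
    if(skip)
      print("pruned: an operator with zero changes would only grow its hyper-parameter")
    else
      print("executing instance")
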
diff --git a/scripts/builtin/abstain.dml b/scripts/builtin/abstain.dml
index 91730a3..b990498 100644
--- a/scripts/builtin/abstain.dml
+++ b/scripts/builtin/abstain.dml
@@ -25,16 +25,19 @@ return (Matrix[Double] abstain)
 {
 
   # for(i in 1:100) {
-  betas = multiLogReg(X=X, Y=Y, icpt=1, reg=0, maxi=100, maxii=0, verbose=FALSE)
-  [prob, yhat, accuracy] = multiLogRegPredict(X, betas, Y, FALSE)
-  print("accuracy "+accuracy)
-  abstain = cbind(X, Y)
-  inc = ((yhat != Y) & (rowMaxs(prob) > threshold))
-
-  if(sum(inc) > 0)
+  if(min(Y) != max(Y))
   {
-    # print("inc vector "+toString(inc))
-    abstain = removeEmpty(target = cbind(X, Y), margin = "rows", select = (inc == 0) )
+    betas = multiLogReg(X=X, Y=Y, icpt=1, reg=1e-4, maxi=100, maxii=0, verbose=FALSE)
+    [prob, yhat, accuracy] = multiLogRegPredict(X, betas, Y, FALSE)
+    print("accuracy "+accuracy)
+    abstain = cbind(X, Y)
+    inc = ((yhat != Y) & (rowMaxs(prob) > threshold))
+    if(sum(inc) > 0)
+    {
+      # print("inc vector "+toString(inc))
+      abstain = removeEmpty(target = cbind(X, Y), margin = "rows", select = (inc == 0) )
+    }
   }
-
+  else 
+    abstain = cbind(X, Y)
 }
diff --git a/scripts/builtin/applyAndEvaluate.dml b/scripts/builtin/applyAndEvaluate.dml
new file mode 100644
index 0000000..5cabfd6
--- /dev/null
+++ b/scripts/builtin/applyAndEvaluate.dml
@@ -0,0 +1,148 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+source("scripts/pipelines/scripts/utils.dml") as utils;
+source("scripts/builtin/bandit.dml") as bandit;
+s_applyAndEvaluate = function(Frame[Unknown] trainData, Frame[Unknown] testData, Frame[Unknown] metaData = as.frame("NULL"),
+  Frame[Unknown] lp, Frame[Unknown] pip, Matrix[Double] hp, String evaluationFunc, Matrix[Double] evalFunHp,
+  Boolean isLastLabel = TRUE, Boolean correctTypos=FALSE)
+return (Matrix[Double] result)
+{
+  no_of_flag_vars = 5
+  schema = metaData[1, 1:ncol(metaData) - 1]
+  mask = as.matrix(metaData[2, 1:ncol(metaData) - 1])
+  fdMask = as.matrix(metaData[3, 1:ncol(metaData) - 1])
+  maskY = as.integer(as.scalar(metaData[2, ncol(metaData)]))
+  metaList = list(mask=mask, schema=schema, fd=fdMask)
+
+  # separate the label
+  [Xtrain, Ytrain] = getLabel(trainData, isLastLabel)
+  [Xtest, Ytest] = getLabel(testData, isLastLabel)
+    
+  # always recode the label 
+  if(maskY == 1) {
+    [eYtrain, M] = transformencode(target=Ytrain, spec= "{ids:true, recode:[1]}");
+    eYtest = transformapply(target=Ytest, spec= "{ids:true, recode:[1]}", meta=M);
+  }
+  else
+  {
+    eYtrain = as.matrix(Ytrain)
+    eYtest = as.matrix(Ytest)
+  }
+    # # # when the evaluation function is called first we also compute and keep hyperparams of target application
+  dirtyScore = getDirtyScore(X=Xtrain, Y=eYtrain, Xtest=Xtest, Ytest=eYtest, metaList=metaList, evaluationFunc=evaluationFunc, evalFunHp=evalFunHp)
+  print("dirty score: "+dirtyScore)
+  [Xtrain, Xtest] = runStringPipeline(Xtrain, Xtest, schema, mask, FALSE, correctTypos)
+  
+  # # # if mask has 1s then there are categorical features
+  [eXtrain, eXtest] = recodeData(Xtrain, Xtest, mask, FALSE, "recode")
+
+  # construct the parameter list for best hyper-parameters; if the oversampling technique is part of the
+  # pipeline then take it out, because oversampling is not applied on the test dataset
+  # this condition is unnecessary here because the input dataset is balanced and
+  # instead of dividing the dataset into train/test I am doing cross validations
+
+  no_of_param = as.scalar(hp[1, 1]) + 1
+  hp_width= hp[1, 2:no_of_param]
+  hp_matrix = matrix(hp_width, rows=ncol(pip), cols=ncol(hp_width)/ncol(pip))
+  print("hp matrix:\n"+toString(hp_matrix))
+  pipList = list(lp = lp, ph = pip, hp = hp_matrix, flags = no_of_flag_vars)
+  # argList = list(X=X, Y=Y, Xtest=Xtest, Ytest=Ytest, Xorig=clone_X, pipList=pipList, metaList=metaList, evalFunHp=evalFunHp, trainML=0)
+  # # # now test accuracy
+  [eXtrain, eYtrain, eXtest, eYtest, a, b,Tr] = executePipeline(logical=lp, pipeline=pip, X=eXtrain, Y=eYtrain, Xtest=eXtest, Ytest=eYtest, metaList=metaList,
+    hyperParameters=hp_matrix, flagsCount=no_of_flag_vars, test=TRUE, verbose=FALSE)
+  
+  if(max(eYtrain) == min(eYtrain)) 
+    stop("Y contains only one class")
+
+  score = eval(evaluationFunc, list(X=eXtrain, Y=eYtrain, Xtest=eXtrain, Ytest=eYtrain, Xorig=as.matrix(0), evalFunHp=evalFunHp, trainML = FALSE))
+  trainAccuracy = as.scalar(score[1, 1])
+  
+  score = eval(evaluationFunc, list(X=eXtrain, Y=eYtrain, Xtest=eXtest, Ytest=eYtest, Xorig=as.matrix(0), evalFunHp=evalFunHp, trainML = FALSE))
+  testAccuracy = as.scalar(score[1, 1])
+
+  
+  result = matrix(0, rows=1, cols=3)
+  result[1, 1] = dirtyScore
+  result[1, 2] = trainAccuracy
+  result[1, 3] = testAccuracy  
+}
+
+runStringPipeline = function(Frame[Unknown] Xtrain, Frame[Unknown] Xtest, Frame[String] schema,
+  Matrix[Double] mask, Boolean cv, Boolean correctTypos = FALSE)
+return(Frame[Unknown] Xtrain, Frame[Unknown] Xtest)
+{
+  if(cv)
+    Xtrain = utils::stringProcessing(data=Xtrain, mask=mask, schema=schema, CorrectTypos=correctTypos)
+  else
+  {
+    # # # binding train and test to use same dictionary for both
+    XAll = utils::stringProcessing(data=rbind(Xtrain, Xtest), mask=mask, schema=schema, CorrectTypos=correctTypos)
+    Xtrain = XAll[1:nrow(Xtrain),]
+    Xtest = XAll[nrow(Xtrain)+1:nrow(XAll),]
+  }
+}
+
+recodeData = function(Frame[Unknown] Xtrain, Frame[Unknown] Xtest, Matrix[Double] mask, Boolean cv, String code)
+return(Matrix[Double] eXtrain, Matrix[Double] eXtest)
+{
+  if(sum(mask) > 0)
+  {
+    index = vectorToCsv(mask)
+    jspecR = "{ids:true, "+code+":["+index+"]}"
+    [eXtrain, X_meta] = transformencode(target=Xtrain, spec=jspecR);
+    if(!cv)
+      eXtest = transformapply(target=Xtest, spec=jspecR, meta=X_meta);
+    else eXtest = as.matrix(Xtest)
+  } 
+  # if no categorical value exist then just cast the frame into matrix
+  else {
+    eXtrain = as.matrix(Xtrain)
+    eXtest = as.matrix(Xtest)
+  }
+}
+
+getLabel = function(Frame[Unknown] data, Boolean isLastLabel)
+return(Frame[Unknown] X, Frame[Unknown] Y)
+{
+  if(isLastLabel) {
+    X = data[, 1:ncol(data) - 1]
+    Y = data[, ncol(data)]
+  }
+  else 
+  {
+    X = data
+    Y = as.frame("0")
+  }
+}
+
+getDirtyScore = function(Frame[Unknown] X, Matrix[Double] Y, Frame[Unknown] Xtest, Matrix[Double] Ytest, List[Unknown] metaList, String evaluationFunc,
+  Matrix[Double] evalFunHp)
+return(Double dirtyScore)
+{
+  mask = as.matrix(metaList['mask']) 
+  [eXtrain, eXtest] = recodeData(X, Xtest, mask, FALSE, "recode")
+  eXtrain = replace(target=eXtrain, pattern=NaN, replacement=1)
+  eXtest = replace(target=eXtest, pattern=NaN, replacement=1)
+  [eXtrain, eXtest] = recodeData(as.frame(eXtrain), as.frame(eXtest), mask, FALSE, "dummycode")
+  score = eval(evaluationFunc, list(X=eXtrain, Y=Y, Xtest=eXtest, Ytest=Ytest, Xorig=as.matrix(0), evalFunHp=evalFunHp, trainML = FALSE))
+  dirtyScore = as.scalar(score[1, 1])
+}
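
For context, the new applyAndEvaluate builtin re-applies an already enumerated pipeline (the lp/pip/hp artifacts written by topk_cleaning) to a train/test split and returns a 1x3 result of dirty score, train accuracy, and test accuracy. A hypothetical invocation follows; the file paths, the metadata frame, and the evaluation function name are assumptions, not part of the commit:

    trainData = read("data/train.csv", data_type="frame", format="csv", header=TRUE)
    testData  = read("data/test.csv",  data_type="frame", format="csv", header=TRUE)
    meta   = read("data/meta.csv", data_type="frame", format="csv")  # schema/mask/fdMask rows
    lp     = read("out/lp.csv",  data_type="frame", format="csv")
    pip    = read("out/pip.csv", data_type="frame", format="csv")
    hp     = read("out/hp.csv", format="csv")
    evalHp = read("out/evalHp.csv", format="csv")
    result = applyAndEvaluate(trainData=trainData, testData=testData, metaData=meta,
      lp=lp, pip=pip, hp=hp, evaluationFunc="evalClassification", evalFunHp=evalHp)
    print(toString(result))  # [dirty score, train accuracy, test accuracy]
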
diff --git a/scripts/builtin/bandit.dml b/scripts/builtin/bandit.dml
index ea0da56..5bfed9e 100644
--- a/scripts/builtin/bandit.dml
+++ b/scripts/builtin/bandit.dml
@@ -19,14 +19,16 @@
 #
 #-------------------------------------------------------------
 
-m_bandit = function(Matrix[Double] X_train, Matrix[Double] Y_train, Matrix[Double] X_test, Matrix[Double] Y_test, List[Unknown] metaList, String evaluationFunc, Matrix[Double] evalFunHp,
-  Frame[Unknown] lp, Frame[Unknown] primitives, Frame[Unknown] param, Integer k = 3, Integer R=50, Double baseLineScore,
-  Boolean verbose = TRUE)
-  return (Frame[Unknown] bestPipeline, Matrix[Double] bestHyperparams, Matrix[Double] bestAccuracy, Frame[String] feaFrameOuter)
+m_bandit = function(Matrix[Double] X_train, Matrix[Double] Y_train, Matrix[Double] X_test, Matrix[Double] Y_test, List[Unknown] metaList,
+  String evaluationFunc, Matrix[Double] evalFunHp, Frame[Unknown] lp, Frame[Unknown] primitives, Frame[Unknown] param, Integer k = 3,
+  Integer R=50, Double baseLineScore, Boolean cv,  Integer cvk = 2, Boolean verbose = TRUE, String output="")
+  return(Boolean perf)
+  # return (Frame[Unknown] bestPipeline, Matrix[Double] bestHyperparams, Matrix[Double] bestAccuracy, Frame[String] feaFrameOuter)
 {
   print("Starting optimizer")
   NUM_FEATURES = 14
-  HYPERPARAM_LENGTH = 110
+  FLAG_VARIABLE = 5
+  HYPERPARAM_LENGTH = (ncol(lp) * FLAG_VARIABLE * 3) + 1 ## num of cols in logical * 5 meta flag vars * max hyperparams per op + 1 accuracy col
   bestPipeline = frame("", rows=1, cols=1)
   bestHyperparams = as.matrix(0)
   bestAccuracy = as.matrix(0)
@@ -35,17 +37,23 @@ m_bandit = function(Matrix[Double] X_train, Matrix[Double] Y_train, Matrix[Doubl
   eta = 2  # the halving ratio is fixed to 2
   s_max = floor(log(R,eta));
   B = (s_max + 1) * R;
-  
+  # [conf, m] = get_physical_configurations(lp, 100, primitives)
+  # index = vectorToCsv(matrix(1, rows=1, cols=ncol(lp)))
+  # jspecR = "{ids:true, recode :["+index+"]}"
+  # [rConf, conf_meta] = transformencode(target=conf, spec=jspecR);
+
   # initialize output variables
   hparam = matrix(0, rows=k*(s_max+1), cols=HYPERPARAM_LENGTH)
   pipeline = frame(0, rows=k*(s_max+1), cols=ncol(lp)+1)
+  pipelineMatrix = matrix(0, rows=k*(s_max+1), cols=ncol(lp)+1)
   startOut=0; endOut=0;
  feaFrameOuter = frame(data=["#MissingValues", "MinVla", "MaxVal", "AverageMin", "AverageMax",
  "#CategoricalFeatures", "#NumericFeatures", "Mean", "#Outliers", "#OHEfeatures", "#Classes",
  "Imbalance", "#rows", "#cols", "pipelines", "accuracy", "execution time in ms", "CV time in ms"],
  rows = 1, cols = NUM_FEATURES + 4 )
-
-  for(s in s_max:0) {
+  frameList = list()
+  
+  for(s in s_max:0) { # TODO convert to parfor
     
    # result variables
     bracket_hp = matrix(0, rows=k*(s+1)+k, cols=HYPERPARAM_LENGTH)
@@ -67,7 +75,7 @@ m_bandit = function(Matrix[Double] X_train, Matrix[Double] Y_train, Matrix[Doubl
     if(verbose)
       print("n "+ n +"\nR "+ R +"\ns_max "+ s_max +"\nB "+ B +"\nn "+ n +"\nr 
"+ r)
     
-    for( i in 0:s) {
+    for(i in 0:s) {
       # successive halving
       n_i = min(max(as.integer(floor(n * eta^(-i))), 1), nrow(configurations));
       r_i = as.integer(floor(r * eta^i));
@@ -79,8 +87,8 @@ m_bandit = function(Matrix[Double] X_train, Matrix[Double] Y_train, Matrix[Doubl
       }
       
       configurations = configurations[1:n_i, ]
-      [outPip,outHp, feaFrameOuter] = run_with_hyperparam(lp, configurations, r_i, X_train, Y_train, X_test, Y_test, metaList,
-        evaluationFunc, evalFunHp, param, feaFrameOuter, verbose)
+      [outPip,outHp, f] = run_with_hyperparam(lp, configurations, r_i, X_train, Y_train, X_test, Y_test, metaList,
+        evaluationFunc, evalFunHp, param, feaFrameOuter, cv, cvk, verbose)
       # sort the pipelines by order of accuracy decreasing
       a = order(target = outPip, by = 1, decreasing=TRUE, index.return=FALSE)
       b = order(target = outHp, by = 1, decreasing=TRUE, index.return=FALSE)
@@ -102,24 +110,42 @@ m_bandit = function(Matrix[Double] X_train, Matrix[Double] Y_train, Matrix[Doubl
     # keep the best k results for each bracket
     [bracket_bestPipeline, bracket_bestHyperparams] = extractBracketWinners(bracket_pipel, bracket_hp, k, lookup)
     # optimize by the features
-
     startOut = endOut + 1
     endOut = endOut + nrow(bracket_bestPipeline)
-    pipeline[startOut: endOut, ] = bracket_bestPipeline
+    pipeline[startOut:endOut, ] = bracket_bestPipeline
+
+    # recordBracketPip = transformapply(target=bracket_bestPipeline[,2:ncol(bracket_bestPipeline)], meta=conf_meta, spec=jspecR)
+    # pipelineMatrix[startOut:endOut, ] = cbind(bracket_bestHyperparams[, 1], recordBracketPip)
+
     hparam[startOut:endOut, 1:ncol(bracket_bestHyperparams)] = bracket_bestHyperparams
   }
 
+  # pipelineR = transformdecode(target=pipelineMatrix[, 2:ncol(pipelineMatrix)], meta=conf_meta, spec=jspecR)
+  # pipelineR = cbind(as.frame(pipelineMatrix[, 1]), pipelineR)
+
  [bestPipeline, bestHyperparams] = extractTopK(pipeline, hparam, baseLineScore, k)
 
   bestAccuracy = as.matrix(bestPipeline[,1])
   bestPipeline = bestPipeline[,2:ncol(bestPipeline)]
   bestHyperparams = bestHyperparams[,2:ncol(bestHyperparams)]
-  
+  imp = as.double(as.scalar(bestAccuracy[1, 1])) - as.double(baseLineScore)
+  perf = imp > 0
   if(verbose) {
-    print("best pipeline"+ toString(bestPipeline))
-    print("best hyper-parameters \n"+ toString(bestHyperparams))
-    print("best accuracy \n"+ toString(bestAccuracy))
+    print("dirty accuracy "+toString(baseLineScore))  
+    print("best logical pipelines \n"+toString(lp))  
+    print("topk pipelines \n"+toString(bestPipeline))
+    print("topk hyper params \n"+toString(bestHyperparams))
+    print("topk  scores: \n"+toString(bestAccuracy))
+    print("evalHp: \n"+toString(evalFunHp))
+    print("performance improvement "+ imp)
   }
+  write(bestPipeline, output+"/pip.csv", format="csv")
+  write(bestHyperparams, output+"/hp.csv", format="csv")
+  write(bestAccuracy, output+"/bestAcc.csv", format="csv")
+  write(feaFrameOuter, output+"/featureFrame.csv", format="csv")
+  write(baseLineScore, output+"/dirtyScore.csv", format="csv")
+  write(evalFunHp, output+"/evalHp.csv", format="csv")
+  write(lp, output+"/lp.csv", format="csv")
 }
 
# this method will extract the physical pipelines for given logical pipelines
@@ -129,24 +155,27 @@ get_physical_configurations = function(Frame[String] logical, Scalar[int] numCon
 {
   # load the primitives
   physical = as.frame("NaN")
-  outliers = primitives[,1]
-  mvi = primitives[,2]
-  noise = primitives[,3]
-  ci = primitives[,4]
-  dim = primitives[,5]
-  dummy = primitives[,6]
-  scale = primitives[,7]
+  ed = primitives[, 1]
+  mvi = primitives[, 2]
+  outliers = primitives[,3]
+  ec = primitives[, 4]
+  scale = primitives[, 5]
+  ci = primitives[, 6]
+  dummy = primitives[,7]
+  dim = primitives[, 8]
  
  operator = frame(0, rows=nrow(primitives), cols=ncol(logical))  # combine all logical primitives
   for(j in 1:ncol(logical))
   {
     # extract the physical primitives
-    if(as.scalar(logical[1,j]) == "OTLR")
+    if(as.scalar(logical[1,j]) == "ED")
+      operator[, j] = ed;
+    else if(as.scalar(logical[1,j]) == "EC")
+      operator[, j] = ec;  
+    else if(as.scalar(logical[1,j]) == "OTLR")
       operator[, j] = outliers;
     else if(as.scalar(logical[1,j]) == "MVI")
       operator[, j] = mvi;
-    else if(as.scalar(logical[1,j]) == "NR")
-      operator[, j] = noise;
     else if(as.scalar(logical[1,j]) == "CI")
       operator[, j] = ci;
     else if(as.scalar(logical[1,j]) == "DIM")
@@ -195,15 +224,14 @@ get_physical_configurations = function(Frame[String] logical, Scalar[int] numCon
 }
 
 # this method will call the execute pipelines with their hyper-parameters
-run_with_hyperparam = function(Frame[Unknown] lp, Frame[Unknown] ph_pip, Integer r_i, Matrix[Double] X, Matrix[Double] Y, Matrix[Double] Xtest, Matrix[Double] Ytest,
-  List[Unknown] metaList, String evaluationFunc, Matrix[Double] evalFunHp, Frame[Unknown] param, Frame[Unknown] featureFrameOuter,
-  Boolean verbose)
+run_with_hyperparam = function(Frame[Unknown] lp, Frame[Unknown] ph_pip, Integer r_i, Matrix[Double] X, Matrix[Double] Y,
+  Matrix[Double] Xtest, Matrix[Double] Ytest, List[Unknown] metaList, String evaluationFunc, Matrix[Double] evalFunHp,
+  Frame[Unknown] param, Frame[Unknown] featureFrameOuter, Boolean cv,  Integer cvk = 2, Boolean verbose)
  return (Matrix[Double] output_operator, Matrix[Double] output_hyperparam, Frame[Unknown] featureFrameOuter) {
   print("run_with_hyperparam started")
-  output_hp = matrix(0, nrow(ph_pip)*r_i, 100)
+  output_hp = matrix(0, nrow(ph_pip)*r_i, ncol(lp) * 5 * 3)
   output_accuracy = matrix(0, nrow(ph_pip)*r_i, 1)
   output_pipelines = matrix(0, nrow(ph_pip)*r_i, 2)
-  
   # rows in validation set
   clone_X = X
   clone_Y = Y
@@ -212,7 +240,7 @@ run_with_hyperparam = function(Frame[Unknown] lp, Frame[Unknown] ph_pip, Integer
   index = 1
   id = as.matrix(ph_pip[, 1])
   ph_pip = ph_pip[, 2:ncol(ph_pip)]
-  
+  evalFunOutput = as.matrix(0)
   feaVec = gatherStats(X, Y, as.matrix(metaList['mask']))
 
   for(i in 1:nrow(ph_pip))
@@ -222,6 +250,8 @@ run_with_hyperparam = function(Frame[Unknown] lp, Frame[Unknown] ph_pip, Integer
     if(ncol(featureFrameOuter) > 1)
       feaFrame = frame("", rows = no_of_res, cols = ncol(featureFrameOuter))
     pip_toString = pipToString(ph_pip[i])
+    hpForPruning = matrix(0, rows=1, cols=ncol(lp))
+    changesByOp = matrix(0, rows=1, cols=ncol(lp))
     for(r in 1:no_of_res)
     {
      # as the matrix first block of r rows belongs to first operator and r+1 block of rows to second operator
@@ -231,34 +261,53 @@ run_with_hyperparam = function(Frame[Unknown] lp, Frame[Unknown] ph_pip, Integer
       indexes = cumsum(indexes)
       indexes = table(indexes, 1, 1, nrow(hp), 1)
       hp_matrix = removeEmpty(target = hp, margin="rows", select = indexes)
-      # # # clean the train data
-      [X, Y, Tr] = executePipeline(lp, ph_pip[i], X, Y, as.matrix(metaList['mask']), as.matrix(metaList['fd']),
-        hp_matrix, no_of_flag_vars, FALSE, FALSE)
-      # # # clean the test data
-      [Xtest, Ytest, T] = executePipeline(lp, ph_pip[i], Xtest, Ytest, as.matrix(metaList['mask']), as.matrix(metaList['fd']),
-        hp_matrix, no_of_flag_vars, TRUE, FALSE)
-      argList = list(X=X, Y=Y, Xtest=Xtest, Ytest=Ytest, Xorig=clone_X, metaList=metaList, evalFunHp=evalFunHp, trainML=0)
-      t1 = time()
-      evalFunOutput = eval(evaluationFunc, argList)  
-      accT = floor((time() - t1) / 1e+6)  
-      matrix_width = as.matrix(nrow(hp_matrix) * ncol(hp_matrix))
-      hp_vec = cbind(matrix_width, matrix(hp_matrix, rows=1, cols=nrow(hp_matrix)*ncol(hp_matrix), byrow=TRUE))
-      output_accuracy[index, 1] = as.scalar(evalFunOutput[1, 1])
-      output_hp[index, 1:ncol(hp_vec)] = hp_vec
-      output_pipelines[index, ] = cbind(as.matrix(index), id[i,1])
-      X = clone_X
-      Y = clone_Y
-      Xtest = clone_Xtest
-      Ytest = clone_Ytest
-      index = index + 1
+      # # check if the pruning could be applied to avoid unnecessary executions
+      executionSignal = pruningSignal(ph_pip[i], hp_matrix, hpForPruning, changesByOp)
+
+      if(executionSignal)
+      {
+        t1 = time()
+        
+        if(cv)
+        {
+          pipList = list(lp = lp, ph = ph_pip[i], hp = hp_matrix, flags = no_of_flag_vars)
+          [evalFunOutput, hpForPruning, changesByOp] = crossV(X=X, y=Y, cvk=cvk, evalFunHp=evalFunHp, pipList=pipList, metaList=metaList, hpForPruning=hpForPruning,
+          changesByOp=changesByOp, evalFunc=evaluationFunc, trainML = FALSE)
+          print(cvk+" cross validations acc: "+toString(evalFunOutput))
+
-      if(ncol(featureFrameOuter) > 1) {
-        feaFrame[r, 1:ncol(feaVec)] = as.frame(feaVec)
-        feaFrame[r, (ncol(feaVec)+1)] = pip_toString
-        feaFrame[r, (ncol(feaVec)+2)] = as.scalar(evalFunOutput[1, 1])
-        feaFrame[r, (ncol(feaVec)+3)] = Tr
-        feaFrame[r, (ncol(feaVec)+4)] = accT
+        }
+        else 
+        {
+          [eXtrain, eYtrain, eXtest, eYtest, Tr] = executePipeline(logical=lp, pipeline=ph_pip[i], X=X, Y=Y, Xtest=Xtest, Ytest=Ytest, metaList=metaList,
+            hyperParameters=hp_matrix, hpForPruning=hpForPruning, changesByOp=changesByOp, flagsCount=no_of_flag_vars, test=TRUE, verbose=FALSE)
+          if(max(eYtrain) == min(eYtrain)) 
+            print("Y contains only one class")
+          else 
+            evalFunOutput = eval(evaluationFunc, list(X=eXtrain, Y=eYtrain, Xtest=eXtest, Ytest=eYtest, Xorig=as.matrix(0), evalFunHp=evalFunHp, trainML = 0))
+          print("holdout acc: "+toString(evalFunOutput))
+        }
+
+        # evalFunOutput = eval(evaluationFunc, argList)  
+        accT = floor((time() - t1) / 1e+6)  
+        matrix_width = as.matrix(nrow(hp_matrix) * ncol(hp_matrix))
+        hp_vec = cbind(matrix_width, matrix(hp_matrix, rows=1, cols=nrow(hp_matrix)*ncol(hp_matrix), byrow=TRUE))
+        output_accuracy[index, 1] = as.scalar(evalFunOutput[1, 1])
+        output_hp[index, 1:ncol(hp_vec)] = hp_vec
+        output_pipelines[index, ] = cbind(as.matrix(index), id[i,1])
+        X = clone_X
+        Y = clone_Y
+        Xtest = clone_Xtest
+        Ytest = clone_Ytest
+        if(ncol(featureFrameOuter) > 1) {
+          feaFrame[r, 1:ncol(feaVec)] = as.frame(feaVec)
+          feaFrame[r, (ncol(feaVec)+1)] = pip_toString
+          feaFrame[r, (ncol(feaVec)+2)] = as.scalar(evalFunOutput[1, 1])
+          feaFrame[r, (ncol(feaVec)+3)] = accT #Tr
+          feaFrame[r, (ncol(feaVec)+4)] = accT
+        }
       }
+      else print("prunningAlert: not executing instance : "+r)
+      index = index + 1
     }
     
     X = clone_X
@@ -285,7 +334,7 @@ getHyperparam = function(Frame[Unknown] pipeline, Frame[Unknown]  hpList, Intege
   # store the row indexes of the operator matches
   indexes = matrix(0, rows= ncol(pipeline), cols=1)
   paramCount = matrix(0, rows= ncol(pipeline), cols=1)
-  for(k in 1:ncol(pipeline))
+  parfor(k in 1:ncol(pipeline))
   {
     op = as.scalar(pipeline[1,k])
     hasParam = map(hpList[,1], "x->x.split(\",\")[0].equals(\""+op+"\")")
@@ -333,7 +382,10 @@ getHyperparam = function(Frame[Unknown] pipeline, Frame[Unknown]  hpList, Intege
           OpParam[, j] = val;
         }
         else if(type == "INT") {
-          val = sample(as.integer(maxVal), no_of_res, TRUE);
+          if(as.integer(maxVal) > no_of_res)
+            val = sample(as.integer(maxVal), no_of_res, FALSE)
+          else 
+            val = sample(as.integer(maxVal), no_of_res, TRUE)
           less_than_min = val < as.integer(minVal);
           val = (less_than_min * minVal) + val;
           OpParam[, j] = val;
@@ -370,10 +422,11 @@ getHyperparam = function(Frame[Unknown] pipeline, Frame[Unknown]  hpList, Intege
 
 # extract the top k pipelines as a final result after deduplication and sorting
 extractTopK = function(Frame[Unknown] pipeline, Matrix[Double] hyperparam, 
-  Double testAccuracy, Integer k)
+  Double baseLineScore, Integer k)
   return (Frame[Unknown] bestPipeline, Matrix[Double] bestHyperparams)
 {
-
+  # # # take out the accuracy from pipelines
+  pipeline = pipeline[, 2:ncol(pipeline)]
   idx = vectorToCsv(seq(1, ncol(pipeline)))
   jspecDC = "{ids:true, recode:["+idx+"]}";
   # OHE of categorical features
@@ -387,13 +440,13 @@ extractTopK = function(Frame[Unknown] pipeline, Matrix[Double] hyperparam,
   if(sum(dup) > 0)
   {
     # take out the unique tuples
-    uniqueTuples =  removeEmpty(target= forDedup, margin="rows", select = (dup ==0))
+    uniqueTuples = removeEmpty(target=forDedup, margin="rows", select=(dup==0))
     # remove the zero rows, identifiers of unique records
-    dup =  removeEmpty(target = dup, margin="rows")
+    dup = removeEmpty(target=dup, margin="rows")
     # get the counts of duplicate tuples with their tuple id
-    dist = table(dup, 1) > 0
-    dist = dist * seq(1, nrow(dist))
-    countsVal = removeEmpty(target= dist, margin="rows")
+    countDist = table(dup, 1) > 0
+    countDist = countDist * seq(1, nrow(countDist))
+    countsVal = removeEmpty(target=countDist, margin="rows")
    indexes = table(seq(1, nrow(countsVal)),countsVal,1,nrow(countsVal), cols=nrow(forDedup))
 
     # for each duplicate record just take the one reocrd and strip the others
@@ -405,24 +458,24 @@ extractTopK = function(Frame[Unknown] pipeline, Matrix[Double] hyperparam,
   
   # decode the pipelines
  decoded = transformdecode(target=forDedup[, 1:ncol(pipeline)], meta=dM, spec=jspecDC)
-  
   # separate the pipelines and hyper-parameters
   pipeline = decoded[, 1:ncol(pipeline)]
   hyperparam = forDedup[, ncol(pipeline)+1:ncol(forDedup)]
 
   # sort results
+  # # add accuracy back
+  pipeline = cbind(as.frame(forDedup[, ncol(pipeline)+1]), pipeline)
  hyperparam = order(target = hyperparam, by = 1, decreasing=TRUE, index.return=FALSE)
   pipeline = frameSort(pipeline, TRUE)
 
 
   # remove the row with accuracy less than test accuracy 
-  mask = (hyperparam[, 1] < testAccuracy) == 0
+  mask = (hyperparam[, 1] < baseLineScore) == 0
   hyperparam = removeEmpty(target = hyperparam, margin = "rows", select = mask)
-  rowIndex = ifelse(nrow(hyperparam) > k, k, nrow(hyperparam))
+  rowIndex = min(nrow(hyperparam), k)
   # select the top k
   bestPipeline = pipeline[1:rowIndex,]
-  bestHyperparams = hyperparam[1:rowIndex,]
-  
+  bestHyperparams = hyperparam[1:rowIndex,]  
 }
 
 # extract the top k pipelines for each bracket, the intermediate results
@@ -443,7 +496,7 @@ extractBracketWinners = function(Matrix[Double] pipeline, Matrix[Double] hyperpa
     out = conf[index, 2:ncol(conf)]
     bestPipeline[i, 1] = as.frame(pipeline[i, 1])
     bestPipeline[i, 2:ncol(bestPipeline)] = out
-  }  
+  }
 }
 
 ###########################################################################
@@ -540,8 +593,6 @@ return (Double precision, Double T)
   precision = max(0.001, sum(match) / max(1, correctionsMade))
   T = floor((time() - t1) / 1e+6)
   print("Precision: "+toString(precision) + " in "+T+" ms")
-
-
 }
 
 pipToString = function(Frame[String] F)
@@ -552,3 +603,87 @@ return (String s)
     s = s + as.scalar(F[1,i])+";"
 
 }
+
+crossV = function(Matrix[double] X, Matrix[double] y, Integer cvk, Matrix[Double] evalFunHp, List[Unknown] pipList, List[Unknown] metaList,
+  Matrix[Double] hpForPruning = as.matrix(0), Matrix[Double] changesByOp = as.matrix(0), String evalFunc, Boolean trainML = FALSE)
+return (Matrix[Double] accuracy, Matrix[Double] hpForPruning, Matrix[Double] changesByOp)
+{
+  accuracyMatrix = matrix(0, cvk, 1)
+  dataList = list()
+  testL = list()
+  data = order(target = cbind(y, X),  by = 1, decreasing=FALSE, index.return=FALSE)
+  classes = table(data[, 1], 1)
+  ins_per_fold = classes/cvk
+  start_fold = matrix(1, rows=nrow(ins_per_fold), cols=1)
+  fold_idxes = cbind(start_fold, ins_per_fold)
+
+  start_i = 0; end_i = 0; idx_fold = 1;
+  for(i in 1:cvk)
+  {
+    fold_i = matrix(0, 0, ncol(data))
+    start=0; end=0; 
+    for(j in 1:nrow(classes))
+    {
+      idx = as.scalar(classes[j, 1])
+      start = end + 1;
+      end = end + idx
+      class_j =  data[start:end, ]
+      start_i = as.scalar(fold_idxes[j, 1]);
+      end_i = as.scalar(fold_idxes[j, 2])
+      fold_i = rbind(fold_i, class_j[start_i:end_i, ])
+    }
+    dataList = append(dataList, fold_i)
+    fold_idxes[, 1] = fold_idxes[, 2] + 1
+    fold_idxes[, 2] += ins_per_fold
+  }
+
+  for(i in seq(1,cvk))
+  {
+    [trainList, hold_out] = remove(dataList, i)
+    trainset = rbind(trainList)
+    testset = as.matrix(hold_out)
+    trainX = trainset[, 2:ncol(trainset)]
+    trainy = trainset[, 1]
+    testX = testset[, 2:ncol(testset)]
+    testy = testset[, 1]
+    # print("test in: "+nrow(testy))
+    if(as.scalar(pipList['flags']) != 0)
+    {
+      [trainX, trainy, testX, testy, Tr, hpForPruning, changesByOp] = executePipeline(logical=as.frame(pipList['lp']), pipeline=as.frame(pipList['ph']),
+        X=trainX, Y=trainy, Xtest= testX, Ytest=testy, metaList=metaList, hyperParameters=as.matrix(pipList['hp']), hpForPruning=hpForPruning,
+        changesByOp=changesByOp, flagsCount=as.scalar(pipList['flags']), test=TRUE, verbose=FALSE)
+    }
+    # print("test out: "+nrow(testy))
+    res = eval(evalFunc, list(X=trainX, Y=trainy, Xtest=testX, Ytest=testy, Xorig=as.matrix(0), evalFunHp=evalFunHp, trainML = 0))
+    accuracyMatrix[i] = res
+  }
+  print(cvk+" CV: accuracy matrix: \n"+toString(accuracyMatrix))
+  print(cvk+" CV: average accuracy: "+mean(accuracyMatrix))
+  accuracy = as.matrix(mean(accuracyMatrix))
+}
+
+pruningSignal = function(Frame[Unknown] ph_pip, Matrix[Double] hp_matrix, Matrix[Double] hpForPruning, Matrix[Double] changesByOp)
+return(Boolean execute)
+{
+  execute = TRUE
+  prune = (hpForPruning > 0) & (changesByOp == 0)
+  changeCount = 0
+  # # if there exists a case where the changes done by an operation are zero
+  if(sum(prune) > 0)
+  {
+    # get the non-zero index of hpForPruning
+    idx = (hpForPruning > 0) * t(seq(1, ncol(hpForPruning)))
+    idx = removeEmpty(target=idx, margin="cols")
+    print("idx: "+toString(idx))
+    for(i in 1:ncol(idx)) {
+      index = as.scalar(idx[1, i])
+      inProcessHp = as.scalar(hp_matrix[index, 2])
+      prvHp = as.scalar(hpForPruning[1, index])
+      if(inProcessHp > prvHp)
+        changeCount = changeCount + 1
+    }
+  }
+  execute = !(changeCount > 0)
+}
+
+
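
As background for the bracket loop above: bandit.dml implements hyperband-style successive halving with the halving ratio eta fixed to 2, now with optional k-fold cross validation (crossV) instead of a holdout split. A toy DML sketch of the bracket sizing; the initialization of n and r follows the standard hyperband formulas, which the elided context lines are assumed to match:

    R = 50; eta = 2
    s_max = floor(log(R, eta))  # number of brackets - 1
    B = (s_max + 1) * R         # total budget
    for(s in s_max:0) {
      n = as.integer(ceil((B/R) * eta^s / (s+1)))  # initial number of configurations
      r = R * eta^(-s)                             # initial resources per configuration
      for(i in 0:s) {
        n_i = max(as.integer(floor(n * eta^(-i))), 1)  # survivors of round i
        r_i = as.integer(floor(r * eta^i))             # resources per survivor
        print("bracket s="+s+", round i="+i+": "+n_i+" configs x "+r_i+" resources")
      }
    }
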
diff --git a/scripts/builtin/executePipeline.dml b/scripts/builtin/executePipeline.dml
index 682155d..3d88fee 100644
--- a/scripts/builtin/executePipeline.dml
+++ b/scripts/builtin/executePipeline.dml
@@ -19,11 +19,23 @@
 #
 #-------------------------------------------------------------
 
-s_executePipeline = function(Frame[String] logical = as.frame("NULL"), Frame[String] pipeline, Matrix[Double] X,  Matrix[Double] Y, Matrix[Double] mask,
-  Matrix[Double] FD, Matrix[Double] hyperParameters, Integer flagsCount, Boolean test = FALSE, Boolean verbose)
-  return (Matrix[Double] X, Matrix[Double] Y, Double t2)
+s_executePipeline = function(Frame[String] logical = as.frame("NULL"), Frame[String] pipeline, Matrix[Double] X,  Matrix[Double] Y,
+  Matrix[Double] Xtest,  Matrix[Double] Ytest, List[Unknown] metaList, Matrix[Double] hyperParameters, Matrix[Double] hpForPruning = as.matrix(0),
+  Matrix[Double] changesByOp = as.matrix(0), Integer flagsCount, Boolean test = FALSE, Boolean verbose)
+  return (Matrix[Double] X, Matrix[Double] Y, Matrix[Double] Xtest, Matrix[Double] Ytest, Double t2, Matrix[Double] hpForPruning, Matrix[Double] changesByOp)
 {
-  t1 = time();
+  mask=as.matrix(metaList['mask'])
+  FD = as.matrix(metaList['fd'])
+
+  cloneY = Y
+  Xorig = X
+  # # combine X and Y
+  n = nrow(X)
+  d = ncol(Xorig)
+  X = rbind(X, Xtest)
+  Y = rbind(Y, Ytest)
+  testRow = nrow(Xtest)
+  t1 = time()
   print("PIPELINE EXECUTION START ... "+toString(pipeline))
 
   if(verbose) {
@@ -32,41 +44,76 @@ s_executePipeline = function(Frame[String] logical = as.frame("NULL"), Frame[Str
     print("pipeline hps "+toString(hyperParameters))
   }
   for(i in 1:ncol(pipeline)) {
+    trainEndIdx = (nrow(X) - nrow(Xtest))
+    testStIdx = trainEndIdx + 1
     op = as.scalar(pipeline[1,i])
     lgOp = as.scalar(logical[1,i])
-    if(test == FALSE | lgOp != "CI") {    
-      [hp, withClass, dataFlag] = matrixToList(X, Y, mask, FD, hyperParameters[i], flagsCount, op)
-      Xclone = X
+    
+    if(test == FALSE | lgOp != "CI") {
+      Xclone = X
+      [hp, dataFlag, yFlag] = matrixToList(X, Y, mask, FD, hyperParameters[i], flagsCount, op)
       X = eval(op, hp)
+      Xout = X
+      
+      X = confirmData(X, Xclone, mask, dataFlag, yFlag)
       # dataFlag 0 = only on numeric, 1 = on whole data
-      X = confirmData(X, Xclone, mask, dataFlag)
-      if(withClass)
+      if(yFlag)
       {
         Y = X[, ncol(X)]
         X = X[, 1:ncol(X) - 1]
       }
-
       X = confirmMeta(X, mask)
     }
-    else{
+    else {
+      Xclone = X 
       print("not applying "+lgOp+" "+op+" on data test flag: "+test)
+      Xtest = X[testStIdx:nrow(X), ]
+      Ytest = Y[testStIdx:nrow(X), ]
+      X = X[1:trainEndIdx, ]
+      Y = Y[1:trainEndIdx, ]
+      [hp, dataFlag, yFlag] = matrixToList(X, Y, mask, FD, hyperParameters[i], flagsCount, op)
+      X = eval(op, hp)
+      X = confirmData(X, Xclone, mask, dataFlag, yFlag)
+      # dataFlag 0 = only on numeric, 1 = on whole data
+      if(yFlag)
+      {
+        Y = X[, ncol(X)]
+        X = X[, 1:ncol(X) - 1]
+      }
+      X = confirmMeta(X, mask)
+      X = rbind(X, Xtest)
+      Y = rbind(Y, Ytest)
     }
+    if(as.scalar(pipeline[1, i]) == "outlierBySd" | as.scalar(pipeline[1, i]) 
== "outlierByIQR" | as.scalar(pipeline[1, i]) == "imputeByFd") {
+      changes = sum(abs(replace(target=Xout, pattern=NaN, replacement=0) - 
replace(target=as.matrix(hp[1]), pattern=NaN, replacement=0))  > 0.001 )
+      [hpForPruning, changesByOp] = storeDataForPrunning(pipeline, 
hyperParameters, hpForPruning,  changesByOp, changes, i)
+      print("ended "+op+" number of changes "+changes)
+      # print("ended "+op+" number of changes "+sum(abs(replace(target=X, 
pattern=NaN, replacement=0) - replace(target=Xclone, pattern=NaN, 
replacement=0))  > 0.001 ))
+    }
+
+    print("min max of Y: "+min(Y)+" "+max(Y))
   }
+  Xtest = X[testStIdx:nrow(X), ]
+  Ytest = Y[testStIdx:nrow(X), ]
+  X = X[1:trainEndIdx]
+  Y = Y[1:trainEndIdx]
+  # # # do a quick validation check
+  if(nrow(Xtest) != testRow)
+    stop("executePipeline: test rows altered")
   t2 = floor((time() - t1) / 1e+6)
+
   print("PIPELINE EXECUTION ENDED: "+t2+" ms")
 }
 
 # This function will convert the matrix row-vector into list
matrixToList = function(Matrix[Double] X,  Matrix[Double] Y, Matrix[Double] mask, Matrix[Double] FD,
   Matrix[Double] p, Integer flagsCount, String op)
-  return (List[Unknown] l, Boolean hasY, Integer dataFlag)
+  return (List[Unknown] l, Integer dataFlag, Integer yFlag)
 {
   NUM_META_FLAGS = flagsCount;
-  hasY = FALSE
-
   dataFlag = as.integer(as.scalar(p[1, ncol(p)]))
-  hasVerbose = as.scalar(p[1, ncol(p) - 1])
-  yFlag = as.scalar(p[1, ncol(p) - 2])
+  hasVerbose = as.integer(as.scalar(p[1, ncol(p) - 1]))
+  yFlag = as.integer(as.scalar(p[1, ncol(p) - 2]))
   fDFlag = as.integer(as.scalar(p[1, ncol(p)-3]))
   maskFlag = as.integer(as.scalar(p[1, ncol(p)-4]))
   
@@ -91,7 +138,6 @@ matrixToList = function(Matrix[Double] X,  Matrix[Double] Y, Matrix[Double] mask
  
   if(yFlag == 1) {
     l = append(l, Y)
-    hasY = TRUE
   }
   ######################################################
   # CHECK FOR FD APPEND FLAG
@@ -148,12 +194,15 @@ return (Matrix[Double] X)
 }
 
 
-confirmData = function(Matrix[Double] nX, Matrix[Double] originalX, Matrix[Double] mask, Integer dataFlag)
+confirmData = function(Matrix[Double] nX, Matrix[Double] originalX, Matrix[Double] mask, Integer dataFlag, Integer yFlag)
 return (Matrix[Double] X)
 {
-  # print("changes data \n"+toString(nX, rows=10))
+  if(yFlag == 1)
+  {
+    Y = nX[, ncol(nX)]
+    nX = nX[, 1: ncol(nX) - 1]
   
-  while(FALSE){}
+  }
   if(dataFlag == 0 & (sum(mask) > 0))
   {
     maxDummy = max(nX) + 1
@@ -194,6 +243,10 @@ return (Matrix[Double] X)
   }
   else X = nX
     # print("recreated data \n"+toString(X, rows = 20))
+    
+  if(yFlag == 1)
+    X = cbind(X, Y)
+  
 }
 
 
@@ -225,20 +278,27 @@ return (Matrix[Double] dX_train) {
 # Output: filled matrix X
 #######################################################################
 
-imputeByFd = function(Matrix[Double] X, Matrix[Double] FD,  Double threshold)
+imputeByFd = function(Matrix[Double] X, Matrix[Double] fdMask,  Double threshold)
 return (Matrix[Double] X_filled)
 {
-  if(sum(FD) > 0)
+  if(sum(fdMask) > 0)
   {
-    for(i in 1: nrow(FD))
+    FD = discoverFD(X=replace(target=X, pattern=NaN, replacement=1), Mask=fdMask, threshold=threshold)
+    FD = (diag(matrix(1, rows=nrow(FD), cols=1)) ==0) * FD 
+    FD = FD > 0
+    if(sum(FD) > 0)
     {
-      for(j in 1:ncol(FD)) {
-        if(as.scalar(FD[i, j]) > 0 & (min(X[, i]) != 0) & (min(X[, j]) != 0) & (sum(FD[, j]) != nrow(FD)))
-          X = imputeByFD(X, i, j, threshold, FALSE)
+      for(i in 1: nrow(FD))
+      {
+        for(j in 1:ncol(FD)) {
+          if(as.scalar(FD[i, j]) > 0 & (min(X[, i]) != 0) & (min(X[, j]) != 0) & (sum(FD[, j]) != nrow(FD)))
+            X = imputeByFD(X, i, j, threshold, FALSE)
+        }
       }
     }
   }
   X_filled = X
+  print("imputeByFd: record changes: "+sum(X_filled != X))
 }
 
 #######################################################################
@@ -251,6 +311,7 @@ return (Matrix[Double] X_filled)
 { 
   option = ifelse(op, "locf", "nocb")
   X_filled = na_locf(X=X, option=option, verbose=verbose)
+  print("nulls after forward_fill: "+sum(is.na(X_filled)))
 }
 
 
@@ -259,34 +320,46 @@ return (Matrix[Double] X_filled)
SMOTE  = function(Matrix[Double] X, Matrix[Double] Y, Matrix[Double] mask, Integer remainingRatio, Boolean verbose)
 return (Matrix[Double] XY)
 {
-  XY = order(target = cbind(Y, X),  by = 1, decreasing=FALSE, index.return=FALSE)
-  synthesized = matrix(0,0,0) # initialize variable
   # get the class count 
-  classes = table(XY[, 1], 1)
-  start_class = 1
-  end_class = 0
-  k = table(XY[, 1], 1)
-  getMax = max(k)
-  maxKIndex = as.scalar(rowIndexMax(t(k)))
-  outSet = matrix(0, 0, ncol(XY))
-  remainingRatio = ifelse((remainingRatio%%100) >= 50, remainingRatio+(100 - (remainingRatio%%100)),
-  remainingRatio-(remainingRatio%%100))
-  for(i in 1: nrow(k)) {
-    end_class = end_class + as.scalar(classes[i])
-    class_t = XY[start_class:end_class, ]
-    if((i != maxKIndex)) {
-      synthesized = smote(class_t[, 2:ncol(XY)], mask, remainingRatio, 1, FALSE)
-      synthesized = cbind(matrix(as.scalar(class_t[2,1]), nrow(synthesized), 1), synthesized)
-      outSet = rbind(outSet, synthesized)
+  classes = table(Y[, 1], 1)
+  minClass = min(classes)
+  maxClass = max(classes)
+  diff = (maxClass - minClass)/sum(classes)
+  if(diff > 0.5)
+  {
+    print("initiating oversampling")
+    XY = order(target = cbind(Y, X),  by = 1, decreasing=FALSE, index.return=FALSE)
+    synthesized = matrix(0,0,0) # initialize variable
+    start_class = 1
+    end_class = 0
+    k = table(XY[, 1], 1)
+    getMax = max(k)
+    maxKIndex = as.scalar(rowIndexMax(t(k)))
+    outSet = matrix(0, 0, ncol(XY))
+    remainingRatio = ifelse((remainingRatio%%100) >= 50, remainingRatio+(100 - (remainingRatio%%100)),
+    remainingRatio-(remainingRatio%%100))
+    print("remaining ratio: "+remainingRatio)
+    for(i in 1: nrow(k), check=0) {
+      end_class = end_class + as.scalar(classes[i])
+      class_t = XY[start_class:end_class, ]
+      if((i != maxKIndex)) {
+        synthesized = smote(class_t[, 2:ncol(XY)], mask, remainingRatio, 1, FALSE)
+        synthesized = cbind(matrix(as.scalar(class_t[2,1]), nrow(synthesized), 1), synthesized)
+        outSet = rbind(outSet, synthesized)
+      }
+      start_class = end_class + 1
     }
-    start_class = end_class + 1
-  }
   
-  XY = rbind(XY, synthesized)
-  Y = XY[, 1]
-  X = XY[, 2:ncol(XY)]
-  XY = cbind(X,Y)
-  classes = table(Y, 1)
+    XY = rbind(XY, synthesized)
+    Y = XY[, 1]
+    X = XY[, 2:ncol(XY)]
+    XY = cbind(X,Y)
+    classes = table(Y, 1)
+  }
+  else { 
+    print("smote not applicable")
+    XY = cbind(X, Y)
+  }
 }
 
 
@@ -341,8 +414,21 @@ m_pca = function(Matrix[Double] X, Integer K=2, Boolean center=TRUE, Boolean sca
     else Xout = X # these elses could be removed via initiating Xout = X for now they are here for readability
   }
   else Xout = X
-
+  Xout = replace(target=Xout, pattern=1/0, replacement=0);
 }
 
+wtomeklink = function(Matrix[Double] X, Matrix[Double] y)
+return (Matrix[Double] XY) {
+  [Xunder, Yunder, rmv] = tomeklink(X, y)
+  XY = cbind(Xunder, Yunder)
+}
 
+storeDataForPrunning = function(Frame[Unknown] pipeline, Matrix[Double] hp, Matrix[Double] hpForPruning, Matrix[Double] changesByOp, Integer changes, Integer i)
+return(Matrix[Double] hpForPruning, Matrix[Double] changesByOp)
+{
+  if(ncol(hpForPruning) > 1) {
+    hpForPruning[1, i] = hp[i, 2]
+    changesByOp[1, i] = changes
+  }
+}
 
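For the pruning bookkeeping above: after outlierBySd, outlierByIQR, or imputeByFd, the executor counts how many cells the operator actually modified and stores that count (via storeDataForPrunning) next to the operator's hyper-parameter, which is what pruningSignal later inspects. A self-contained DML sketch of the same cell-level change count, on made-up data:

    before = matrix("1 2 3 4", rows=2, cols=2)
    before[1, 2] = NaN                   # a missing value prior to imputation
    after = matrix("1 7 3 4", rows=2, cols=2)
    # neutralize NaNs so imputed cells count as changes
    changes = sum(abs(replace(target=after, pattern=NaN, replacement=0)
      - replace(target=before, pattern=NaN, replacement=0)) > 0.001)
    print("cells changed: " + changes)   # 1
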
diff --git a/scripts/builtin/ppca.dml b/scripts/builtin/ppca.dml
index 2683f90..dfd7452 100644
--- a/scripts/builtin/ppca.dml
+++ b/scripts/builtin/ppca.dml
@@ -48,102 +48,109 @@ m_ppca = function(Matrix[Double] X, Integer K=2, Integer maxi = 10,
 {
   n = nrow(X);
   m = ncol(X);
+  if(K < m)
+  {
+    #initializing principal components matrix
+    C =  rand(rows=m, cols=K, pdf="normal");
+    ss = rand(rows=1, cols=1, pdf="normal");
+    ss = as.scalar(ss);
+    ssPrev = ss;
 
-  #initializing principal components matrix
-  C =  rand(rows=m, cols=K, pdf="normal");
-  ss = rand(rows=1, cols=1, pdf="normal");
-  ss = as.scalar(ss);
-  ssPrev = ss;
-
-  # best selected principle components - with the lowest reconstruction error
-  PC = C;
+    # best selected principle components - with the lowest reconstruction error
+    PC = C;
 
-  # initilizing reconstruction error
-  RE = tolrecerr+1;
-  REBest = RE;
+    # initilizing reconstruction error
+    RE = tolrecerr+1;
+    REBest = RE;
 
-  Z = matrix(0,rows=1,cols=1);
+    Z = matrix(0,rows=1,cols=1);
 
-  #Objective function value
-  ObjRelChng = tolobj+1;
+    #Objective function value
+    ObjRelChng = tolobj+1;
 
-  # mean centered input matrix - dim -> [n,m]
-  Xm = X - colMeans(X);
+    # mean centered input matrix - dim -> [n,m]
+    Xm = X - colMeans(X);
 
-  #I -> k x k
-  ITMP = matrix(1,rows=K,cols=1);
-  I = diag(ITMP);
+    #I -> k x k
+    ITMP = matrix(1,rows=K,cols=1);
+    I = diag(ITMP);
 
-  i = 0;
-  while (i < maxi & ObjRelChng > tolobj & RE > tolrecerr){
-    #Estimation step - Covariance matrix
-    #M -> k x k
-    M = t(C) %*% C + I*ss;
+    i = 0;
+    while (i < maxi & ObjRelChng > tolobj & RE > tolrecerr){
+      #Estimation step - Covariance matrix
+      #M -> k x k
+      M = t(C) %*% C + I*ss;
 
-    #Auxilary matrix with n latent variables
-    # Z -> n x k
-    Z = Xm %*% (C %*% inv(M));
+      #Auxilary matrix with n latent variables
+      # Z -> n x k
+      Z = Xm %*% (C %*% inv(M));
 
-    #ZtZ -> k x k
-    ZtZ = t(Z) %*% Z + inv(M)*ss;
+      #ZtZ -> k x k
+      ZtZ = t(Z) %*% Z + inv(M)*ss;
 
-    #XtZ -> m x k
-    XtZ = t(Xm) %*% Z;
+      #XtZ -> m x k
+      XtZ = t(Xm) %*% Z;
 
-    #Maximization step
-    #C ->  m x k
-    ZtZ_sum = sum(ZtZ); #+n*inv(M));
-    C = XtZ/ZtZ_sum;
+      #Maximization step
+      #C ->  m x k
+      ZtZ_sum = sum(ZtZ); #+n*inv(M));
+      C = XtZ/ZtZ_sum;
 
-    #ss2 -> 1 x 1
-    ss2 = trace(ZtZ * (t(C) %*% C));
+      #ss2 -> 1 x 1
+      ss2 = trace(ZtZ * (t(C) %*% C));
 
-    #ss3 -> 1 x 1
-    ss3 = sum((Z %*% t(C)) %*% t(Xm));
+      #ss3 -> 1 x 1
+      ss3 = sum((Z %*% t(C)) %*% t(Xm));
 
-    #Frobenius norm of reconstruction error -> Euclidean norm
-    #Fn -> 1 x 1
-    Fn = sum(Xm*Xm);
+      #Frobenius norm of reconstruction error -> Euclidean norm
+      #Fn -> 1 x 1
+      Fn = sum(Xm*Xm);
+  
+      #ss -> 1 x 1
+      ss = (Fn + ss2 - 2*ss3)/(n*m);
 
-    #ss -> 1 x 1
-    ss = (Fn + ss2 - 2*ss3)/(n*m);
+      #calculating objective function relative change
+      ObjRelChng = abs(1 - ss/ssPrev);
+      #print("Objective Relative Change: " + ObjRelChng + ", Objective: " + 
ss);
 
-    #calculating objective function relative change
-    ObjRelChng = abs(1 - ss/ssPrev);
-    #print("Objective Relative Change: " + ObjRelChng + ", Objective: " + ss);
+      #Reconstruction error
+      R = ((Z %*% t(C)) -  Xm);
 
-    #Reconstruction error
-    R = ((Z %*% t(C)) -  Xm);
+      #calculate the error
+      #TODO rethink calculation of reconstruction error ....
+      #1-Norm of reconstruction error - a big dense matrix
+      #RE -> n x m
+      RE = abs(sum(R)/sum(Xm));
+      if (RE < REBest){
+        PC = C;
+        REBest = RE;
+      }
+      #print("ss: " + ss +" = Fn( "+ Fn +" ) + ss2( " + ss2  +" ) - 2*ss3( " + 
ss3 + " ), Reconstruction Error: " + RE);
 
-    #calculate the error
-    #TODO rethink calculation of reconstruction error ....
-    #1-Norm of reconstruction error - a big dense matrix
-    #RE -> n x m
-    RE = abs(sum(R)/sum(Xm));
-    if (RE < REBest){
-      PC = C;
-      REBest = RE;
+      ssPrev = ss;
+      i = i+1;
     }
-    #print("ss: " + ss +" = Fn( "+ Fn +" ) + ss2( " + ss2  +" ) - 2*ss3( " + 
ss3 + " ), Reconstruction Error: " + RE);
-
-    ssPrev = ss;
-    i = i+1;
+    if( verbose )
+      print("Objective Relative Change: " + ObjRelChng);
+    if( verbose )
+      print ("Number of iterations: " + i + ", Reconstruction Err: " + REBest);
+
+    # reconstructs data
+    # RD -> n x k
+    Xout = X %*% PC;
+
+    # calculate eigenvalues - principle component variance
+    RDMean = colMeans(Xout);
+    V = t(colMeans(Xout^2) - (RDMean^2));
+
+    # sorting eigenvalues and eigenvectors in decreasing order
+    V_decr_idx = order(target=V,by=1,decreasing=TRUE,index.return=TRUE);
+    VF_decr = table(seq(1,nrow(V)),V_decr_idx);
+    Mout = PC %*% VF_decr;  # vectors (values via VF_decr %*% V)
+  }
+  else 
+  {
+    Xout = X
+    Mout = as.matrix(0)
   }
-  if( verbose )
-    print("Objective Relative Change: " + ObjRelChng);
-  if( verbose )
-    print ("Number of iterations: " + i + ", Reconstruction Err: " + REBest);
-
-  # reconstructs data
-  # RD -> n x k
-  Xout = X %*% PC;
-
-  # calculate eigenvalues - principle component variance
-  RDMean = colMeans(Xout);
-  V = t(colMeans(Xout^2) - (RDMean^2));
-
-  # sorting eigenvalues and eigenvectors in decreasing order
-  V_decr_idx = order(target=V,by=1,decreasing=TRUE,index.return=TRUE);
-  VF_decr = table(seq(1,nrow(V)),V_decr_idx);
-  Mout = PC %*% VF_decr;  # vectors (values via VF_decr %*% V)
 }
diff --git a/scripts/builtin/tomeklink.dml b/scripts/builtin/tomeklink.dml
index 18daafe..6169dbf 100644
--- a/scripts/builtin/tomeklink.dml
+++ b/scripts/builtin/tomeklink.dml
@@ -49,11 +49,20 @@ return (Matrix[Double] X_under, Matrix[Double] y_under, Matrix[Double] drop_idx)
 
   tomek_links = get_links(X, y, majority_label)
   drop_idx = tomek_links * seq(1, nrow(X)) 
-  X_under = removeEmpty(target=X, margin="rows", select = (tomek_links == 0))
-  y_under = removeEmpty(target=y, margin="rows", select = (tomek_links == 0))
-  drop_idx = removeEmpty(target=drop_idx, margin="rows", select = tomek_links)
-  if(ymin)
-    y = y - 1
+  if(sum(tomek_links == 0) > 0)
+  {
+    X_under = removeEmpty(target=X, margin="rows", select = (tomek_links == 0))
+    y_under = removeEmpty(target=y, margin="rows", select = (tomek_links == 0))
+    drop_idx = removeEmpty(target=drop_idx, margin="rows", select = tomek_links)
+  }
+  else
+  {
+    X_under = X
+    y_under = y
+    drop_idx = as.matrix(NaN)
+  }
+  if(ymin == 0)
+    y_under = y_under - 1
 }
 
 # get the nearest neighbour index
@@ -61,7 +70,7 @@ get_nn = function(Matrix[Double] X)
 return (Matrix[Double] nn) {
   # TODO exchange manhatten by euclidean dist()?
   nn = matrix(0, rows = nrow(X), cols = 1)
-  parfor (i in 1:nrow(X)) {
+  for (i in 1:nrow(X)) {
     dists = rowSums((X - X[i,])^2) 
     dists[i,] = NaN; # mask out self-ref
     nn[i, 1] = rowIndexMin(t(dists))
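
Background on the undersampling change above: a Tomek link is a pair of mutual nearest neighbours with opposite labels, and tomeklink drops the majority-class member of each such pair; the new branch merely covers the case where no link exists, returning the data unchanged. A tiny illustrative call on made-up data:

    X = matrix("0.0 0.1 0.9 1.0 1.1 2.0", rows=6, cols=1)
    y = matrix("1 2 1 1 1 1", rows=6, cols=1)  # class 1 is the majority
    [X_under, y_under, drop_idx] = tomeklink(X, y)
    print("rows before: " + nrow(X) + ", after: " + nrow(X_under))
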
diff --git a/scripts/builtin/topk_cleaning.dml b/scripts/builtin/topk_cleaning.dml
index e3f1998..c4d8cf9 100644
--- a/scripts/builtin/topk_cleaning.dml
+++ b/scripts/builtin/topk_cleaning.dml
@@ -23,18 +23,110 @@ source("scripts/pipelines/scripts/utils.dml") as utils;
 source("scripts/pipelines/scripts/enumerateLogical.dml") as lg;
 
 
-s_topk_cleaning = function(Frame[Unknown] dataTrain, Frame[Unknown] dataTest, Frame[Unknown] metaData = as.frame("NULL"), Frame[Unknown] primitives, Frame[Unknown] parameters,
-  Matrix[Double] cmr = matrix("4 0.7 1", rows=1, cols=3), String evaluationFunc, Matrix[Double] evalFunHp, Integer topK = 5,
-  Integer resource_val = 20, Double sample = 0.1, Boolean isLastLabel = TRUE)
-  return (Frame[Unknown] topKPipelines, Matrix[Double] topKHyperParams, Matrix[Double] topKScores, Frame[Unknown] bestLogical, Frame[Unknown] features, Double dirtyScore)
+s_topk_cleaning = function(Frame[Unknown] dataTrain, Frame[Unknown] dataTest = as.frame("NULL"), Frame[Unknown] metaData = as.frame("NULL"), Frame[Unknown] primitives,
+  Frame[Unknown] parameters, Matrix[Double] cmr = matrix("4 0.7 1", rows=1, cols=3), String evaluationFunc, Matrix[Double] evalFunHp, Integer topK = 5,
+  Integer resource_val = 20, Double sample = 0.1, Boolean cv=TRUE, Integer cvk = 2, Boolean isLastLabel = TRUE, Boolean correctTypos=FALSE, String output)
+  return(Boolean perf)
+  # return (Frame[Unknown] topKPipelines, Matrix[Double] topKHyperParams, Matrix[Double] topKScores, Frame[Unknown] bestLogical,
+  # Frame[Unknown] features, Double dirtyScore, Matrix[Double] evalFunHp)
 {
+  Xtest = as.frame("0")
+  Ytest = as.frame("0")
   print("starting topk_cleaning")
-  dirtyScore = 100
+  
+  [schema, mask, fdMask, maskY] = prepareMeta(dataTrain, metaData)
+
+  # # keeping the meta list format if we decide to add more stuff in metadata
+  metaList = list(mask=mask, schema=schema, fd=fdMask)
+  
+  # separate the label
+  [Xtrain, Ytrain] = getLabel(dataTrain, isLastLabel)
+  if(!cv)
+    [Xtest, Ytest] = getLabel(dataTest, isLastLabel)
+
+  # always recode the label 
+  if(maskY == 1) {
+    [eYtrain, M] = transformencode(target=Ytrain, spec= "{ids:true, recode:[1]}");
+    eYtest = transformapply(target=Ytest, spec= "{ids:true, recode:[1]}", meta=M);
+  }
+  else
+  {
+    eYtrain = as.matrix(Ytrain)
+    eYtest = as.matrix(Ytest)
+  }
+
+  # # # when the evaluation function is called first we also compute and keep hyperparams of target application
+  [dirtyScore, evalFunHp] = getDirtyScore(X=Xtrain, Y=eYtrain, Xtest=Xtest, Ytest=eYtest, evaluationFunc=evaluationFunc,
+    metaList=metaList, evalFunHp=evalFunHp, sample=sample, trainML=1, cv=cv, cvk=cvk)
+  
+  # # do the string processing
+  [Xtrain, Xtest] = runStringPipeline(Xtrain, Xtest, schema, mask, cv, correctTypos)
+  
+  # # if mask has 1s then there are categorical features
+  [eXtrain, eXtest] = recodeData(Xtrain, Xtest, mask, cv, "recode")
+  
+  # apply sampling on training data for pipeline enumeration
+  [eXtrain, eYtrain] = utils::doSample(eXtrain, eYtrain, sample, TRUE)
+
+  # # # create logical pipeline seeds
+  logicalSeedCI =  frame([
+                   "4", "ED", "MVI", "OTLR", "EC", "0", "0", "0", "0",
+                   "4", "ED", "MVI", "CI", "DUMMY","0","0", "0", "0", 
+                   "4", "OTLR", "EC", "CI", "DUMMY", "0", "0","0", "0",
+                   "6", "MVI", "OTLR", "ED", "EC", "CI", "DUMMY", "0", "0",
+                   "4", "ED",  "MVI",  "CI", "DUMMY", "0", "0", "0", "0",
+                   "4", "MVI", "SCALE", "CI", "DUMMY", "0", "0", "0", "0", 
+                   "4", "ED", "EC", "CI", "DUMMY", "0", "0", "0", "0",
+                   "4", "MVI", "OTLR", "CI", "DUMMY", "0", "0", "0", "0",
+                   "5", "MVI", "OTLR", "EC", "CI", "DUMMY", "0", "0", "0",
+                   "7", "ED", "MVI", "OTLR", "EC", "SCALE", "CI", "DUMMY", "0"
+                   ], rows=10, cols=9)  
+                   
+  logicalSeedNoCI = frame([
+                   "4", "ED", "MVI", "OTLR", "EC", "0", "0",
+                   "3", "ED", "MVI", "DUMMY", "0","0","0", 
+                   "3", "OTLR", "EC", "DUMMY", "0","0","0",
+                   "5", "MVI", "OTLR", "ED", "EC", "DUMMY", "0", 
+                   "3", "ED",  "MVI", "DUMMY", "0", "0", "0",
+                   "3", "MVI", "SCALE", "DUMMY", "0", "0", "0", 
+                   "3", "ED", "EC", "DUMMY", "0", "0", "0",
+                   "3", "MVI", "OTLR", "DUMMY", "0", "0", "0", 
+                   "4", "MVI", "OTLR", "EC", "DUMMY", "0", "0", 
+                   "6", "ED", "MVI", "OTLR", "EC", "SCALE", "DUMMY"
+                   ], rows=10, cols=7) 
+                   
+  tab = table(eYtrain, 1)
+  dist = nrow(tab)
+  if((nrow(eYtrain) > 0 & dist < 10))
+    logical = logicalSeedCI
+  else 
+    logical = logicalSeedNoCI
+
+  # category = frame(["MVI", "OTLR"], rows=1, cols=2)
+  idx = as.integer(as.scalar(logical[1, 1])) + 1
+
+  category = logical[1, 2:idx]
+  [bestLogical, score, T] = lg::enumerateLogical(X=eXtrain, y=eYtrain, Xtest=eXtest, ytest=eYtest, cmr=cmr, cat=category, population=logical[2:nrow(logical)],
+    max_iter=ceil(resource_val/topK), metaList = metaList, evaluationFunc=evaluationFunc, evalFunHp=evalFunHp,
+    primitives=primitives, param=parameters, num_inst=3 , num_exec=2, cv=cv, cvk=cvk, verbose=TRUE)
+  # # # bestLogical = frame(["MVI", "CI", "SCALE"], rows=1, cols=3)
+
+  topKPipelines = as.frame("NULL"); topKHyperParams = matrix(0,0,0); 
topKScores = matrix(0,0,0); features = as.frame("NULL")
+  
+  # # [topKPipelines, topKHyperParams, topKScores, features] = 
+  perf = bandit(X_train=eXtrain, Y_train=eYtrain, X_test=eXtest, Y_test=eYtest,  metaList=metaList,
+    evaluationFunc=evaluationFunc, evalFunHp=evalFunHp, lp=bestLogical, primitives=primitives, param=parameters, baseLineScore=dirtyScore,
+    k=topK, R=resource_val, cv=cv, output=output, verbose=TRUE);  
+}
+
+prepareMeta = function(Frame[Unknown] data, Frame[Unknown] metaData)
+return(Frame[String] schema, Matrix[Double] mask, Matrix[Double] fdMask, Integer maskY)
+{
   if(as.scalar(metaData[1, 1]) == "NULL")
   {
     print("creating meta data")
-    r1 = detectSchema(dataTrain)
-    r2 = matrix(0, rows=1, cols=ncol(dataTrain))
+    r1 = detectSchema(data)
+    r2 = matrix(0, rows=1, cols=ncol(data))
     for(i in 1 : ncol(r1))
     {
       if(as.scalar(r1[1, i]) == "STRING" | as.scalar(r1[1, i]) == "BOOLEAN")
@@ -43,125 +135,88 @@ s_topk_cleaning = function(Frame[Unknown] dataTrain, Frame[Unknown] dataTest, Fr
     schema = r1[, 1:ncol(r1) - 1]
     mask = r2[, 1:ncol(r2) - 1]
     fdMask = r2[, 1:ncol(r2) - 1]
-    maskY = as.scalar(r2[,ncol(r2)])
+    maskY = as.integer(as.scalar(r2[,ncol(r2)]))
   }
   else {
     schema = metaData[1, 1:ncol(metaData) - 1]
     mask = as.matrix(metaData[2, 1:ncol(metaData) - 1])
     fdMask = as.matrix(metaData[3, 1:ncol(metaData) - 1])
-    maskY = as.scalar(metaData[2, ncol(metaData)])
+    maskY = as.integer(as.scalar(metaData[2, ncol(metaData)]))
   }
-  # # keeping the meta list format if we decide to add more stuff in metadata
-  metaList = list(mask=mask, schema=schema, fd=fdMask)
-  
-  # separate the label
+}
+
+getLabel = function(Frame[Unknown] data, Boolean isLastLabel)
+return(Frame[Unknown] X, Frame[Unknown] Y)
+{
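+  # separate the features from the label when the label is the last column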
   if(isLastLabel) {
-    Xtrain = dataTrain[, 1:ncol(dataTrain) - 1]
-    labeltrain = dataTrain[, ncol(dataTrain)]
-    Xtest = dataTest[, 1:ncol(dataTest) - 1]
-    labeltest = dataTest[, ncol(dataTest)]
-    # always recode the label
-    if(maskY == 1) {
-      [Ytrain, M] = transformencode(target=labeltrain, spec= "{ids:true, 
recode:[1]}");
-      Ytest = transformapply(target=labeltest, spec= "{ids:true, recode:[1]}", 
meta=M);
-    }
-    else
-    {
-      Ytrain = as.matrix(labeltrain)
-      Ytest = as.matrix(labeltest)
-    }
+    X = data[, 1:ncol(data) - 1]
+    Y = data[, ncol(data)]
   }
   else 
   {
-    Xtrain = dataTrain
-    Ytrain = as.matrix(0)
-    Xtest = dataTest
-    Ytest = as.matrix(0)
+    X = data
+    Y = as.frame("0")
   }
-  
-  
-  # # do the string processing
-  X_train_dirty = Xtrain
-  X_test_dirty = Xtest
-  Xtrain = utils::stringProcessing(data=Xtrain, mask=mask, schema=schema, 
CorrectTypos=FALSE)
-  Xtest = utils::stringProcessing(data=Xtest, mask=mask, schema=schema, 
CorrectTypos=FALSE)
-  
-  # # if mask has 1s then there are categorical features
+}
+
+runStringPipeline = function(Frame[Unknown] Xtrain, Frame[Unknown] Xtest, 
Frame[String] schema,
+  Matrix[Double] mask, Boolean cv, Boolean correctTypos = FALSE)
+return(Frame[Unknown] Xtrain, Frame[Unknown] Xtest)
+{
+  if(cv)
+    Xtrain = utils::stringProcessing(data=Xtrain, mask=mask, schema=schema, 
CorrectTypos=correctTypos)
+  else
+  {
+    # # # binding train and test to use same dictionary for both
+    XAll = utils::stringProcessing(data=rbind(Xtrain, Xtest), mask=mask, 
schema=schema, CorrectTypos=correctTypos)
+    Xtrain = XAll[1:nrow(Xtrain),]
+    Xtest = XAll[nrow(Xtrain)+1:nrow(XAll),]
+  }
+}
+
+getDirtyScore = function(Frame[Unknown] X, Matrix[Double] Y, Frame[Unknown] 
Xtest, Matrix[Double] Ytest, String evaluationFunc, List[Unknown] metaList,
+  Matrix[Double] evalFunHp, Double sample, Integer trainML, Boolean cv, 
Integer cvk)
+return(Double dirtyScore, Matrix[Double] evalFunHp)
+{
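+  # # # evaluate the uncleaned data to establish a baseline score for the pipeline search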
+  mask = as.matrix(metaList['mask']) 
+  [eXtrain, eXtest] = recodeData(X, Xtest, mask, cv, "recode")
+  eXtrain = replace(target=eXtrain, pattern=NaN, replacement = 0)
+  eXtest = replace(target=eXtest, pattern=NaN, replacement = 0)
+  dirtyScore = 100
+  # # # sample data
+  [eXtrain, Ytrain] =  utils::doSample(eXtrain, Y, sample, TRUE)
+  [eXtrain, eXtest] = recodeData(as.frame(eXtrain), as.frame(eXtest), mask, 
cv, "dummycode")
+  pipList = list(lp = as.frame("NULL"), ph = as.frame("NULL"), hp = 
as.matrix(0), flags = 0)
+  if(cv)
+  {
+    score = crossV(X=eXtrain, y=Ytrain, cvk=cvk, evalFunHp=evalFunHp, 
pipList=pipList, metaList=metaList, evalFunc=evaluationFunc, trainML = 1)
+  }
+  else 
+  {
+    score = eval(evaluationFunc, list(X=eXtrain, Y=Ytrain, Xtest=eXtest, 
Ytest=Ytest, Xorig=as.matrix(0), evalFunHp=evalFunHp, trainML = 1))
+  }
+
+  dirtyScore = as.scalar(score[1, 1])
+  evalFunHp = score[1, 2:ncol(score)]
+}
+
+recodeData = function(Frame[Unknown] Xtrain, Frame[Unknown] Xtest, 
Matrix[Double] mask, Boolean cv, String code)
+return(Matrix[Double] eXtrain, Matrix[Double] eXtest)
+{
   if(sum(mask) > 0)
   {
     index = vectorToCsv(mask)
-    jspecR = "{ids:true, recode:["+index+"]}"
+    jspecR = "{ids:true, "+code+":["+index+"]}"
     [eXtrain, X_meta] = transformencode(target=Xtrain, spec=jspecR);
-    eXtest = transformapply(target=Xtest, spec=jspecR, meta=X_meta);
-    [eX_train_dirty, X_meta_dirty] = transformencode(target=X_train_dirty, 
spec=jspecR);
-    eX_test_dirty = transformapply(target=X_test_dirty, spec=jspecR, 
meta=X_meta_dirty);
-    
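+    # # # under CV there is no separate test set to encode, so just cast the placeholder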
+    if(!cv)
+      eXtest = transformapply(target=Xtest, spec=jspecR, meta=X_meta);
+    else eXtest = as.matrix(Xtest)
   } 
   # if no categorical value exist then just cast the frame into matrix
   else {
     eXtrain = as.matrix(Xtrain)
-    eX_train_dirty = as.matrix(X_train_dirty)
     eXtest = as.matrix(Xtest)
-    eX_test_dirty = as.matrix(X_test_dirty)
   }
-  # take the sample
-  [eXtrain, Ytrain] = utils::doSample(eXtrain, Ytrain, sample, TRUE)
-  [eX_train_dirty, Ytrain] = utils::doSample(eX_train_dirty, Ytrain, sample, 
FALSE)
-  # # allData = rbind(eX_train_dirty)
-  # # eX_train_dirty = utils::dummycoding(eX_train_dirty, mask)
-  # # eX_test_dirty = utils::dummycoding(eX_test_dirty, mask)
-  # get the dirty score
-  scoreAndHp = eval(evaluationFunc, list(X=eX_train_dirty, Y=Ytrain, 
Xtest=eX_test_dirty, Ytest=Ytest, Xorig=as.matrix(0), metaList=metaList, 
evalFunHp=evalFunHp, trainML=1))
-  dirtyScore = as.scalar(scoreAndHp[1, 1])
-  evalFunHp = scoreAndHp[1, 2:ncol(scoreAndHp)]
-
-  logicalSeedCI =  frame([
-                   "4", "MVI", "OTLR", "CI", "SCALE", "0",
-                   "1", "MVI", "0", "0", "0", "0", 
-                   "1", "OTLR", "0", "0", "0", "0", 
-                   "1", "CI", "0", "0", "0", "0", 
-                   "2", "MVI", "CI", "0", "0", "0", 
-                   "2", "MVI", "OTLR", "0", "0", "0",
-                   "2", "MVI", "SCALE", "0", "0", "0", 
-                   "3", "MVI", "SCALE", "OTLR", "0", "0", 
-                   "4", "OTLR", "MVI", "CI", "SCALE", "0",
-                   "5", "MVI", "OTLR", "MVI", "CI", "SCALE"
-                   ], rows=10, cols=6)  
-                   
-  logicalSeedNoCI =  frame([
-                   "3", "MVI", "OTLR", "SCALE", "0", 
-                   "1", "MVI", "0", "0", "0", 
-                   "1", "OTLR", "0", "0", "0", 
-                   "2", "MVI", "OTLR", "0", "0",
-                   "2", "MVI", "SCALE", "0", "0", 
-                   "3", "MVI", "SCALE", "OTLR", "0",
-                   "3", "OTLR", "MVI", "SCALE", "0",
-                   "4", "MVI", "OTLR", "MVI", "SCALE"
-                   ], rows=8, cols=5) 
-                   
-  tab = table(Ytrain, 1)
-  dist = nrow(tab)
-  if((nrow(Ytrain) > 0 & dist < 10))
-    logical = logicalSeedCI
-  else 
-    logical = logicalSeedNoCI
-
-  # category = frame(["MVI", "OTLR"], rows=1, cols=2)
-  idx = as.integer(as.scalar(logical[1, 1])) + 1
-
-  category = logical[1, 2:idx]
-  print("sending ytest in enumLog: \n"+toString(Ytest, rows=5))
-
-  [bestLogical, score, T] = lg::enumerateLogical(X=eXtrain, y=Ytrain, 
Xtest=eXtest, ytest=Ytest, cmr=cmr, cat=category, population=logical,
-    max_iter=ceil(resource_val/topK), metaList = metaList, 
evaluationFunc=evaluationFunc, evalFunHp=evalFunHp, 
-    primitives=primitives, param=parameters, num_inst = nrow(primitives), 
num_exec = topK, verbose=TRUE)
-
-  topKPipelines = as.frame("NULL"); topKHyperParams = matrix(0,0,0); 
topKScores = matrix(0,0,0); features = as.frame("NULL")
-  
-  [topKPipelines, topKHyperParams, topKScores, features] = 
bandit(X_train=eXtrain, Y_train=Ytrain, X_test=eXtest, Y_test=Ytest,  
metaList=metaList,
-    evaluationFunc=evaluationFunc, evalFunHp=evalFunHp, lp=bestLogical, 
primitives=primitives, param=parameters, baseLineScore=dirtyScore,
-    k=topK, R=resource_val, verbose=FALSE);
 }
 
-
-
diff --git a/scripts/pipelines/properties/param.csv 
b/scripts/pipelines/properties/param.csv
index 254a0d3..fc454c4 100644
--- a/scripts/pipelines/properties/param.csv
+++ b/scripts/pipelines/properties/param.csv
@@ -1,16 +1,18 @@
-name,param_no,maskFlag,FDFlag,yFlag,verboseFlag,dataFlag,dataType,ranges,st1,en1,st2,en1,en2,st3,en3
-outlierByIQR,3,0,0,0,1,0,FP,INT,INT,1,5,1,2,1,10
-outlierBySd,3,0,0,0,1,0,FP,INT,INT,1,5,1,2,1,10
-winsorize,0,0,0,0,1,0,,,,,,,,,
-imputeByMean,0,1,0,0,0,2,,,,,,,,,
-imputeByMedian,0,1,0,0,0,2,,,,,,,,,
-mice,2,1,0,0,1,2,INT,FP,1,3,0.5,1.0,,,
-abstain,1,0,0,1,1,2,FP,0.6,0.8,,,,,,
-SMOTE,1,1,0,1,1,2,INT,100,500,,,,,,
-downSample,0,0,0,1,0,2,,,,,,,,,
-m_pca,3,0,0,0,0,2,INT,BOOL,BOOL,100,200,0,1,0,0
-fillDefault,0,0,0,0,0,2,,,,,,,,,
-dummycoding,0,1,0,0,0,2,,,,,,,,,
-scale,2,0,0,0,0,0,BOOL,BOOL,0,1,0,1,,,
-forward_fill,1,0,0,0,1,2,BOOL,0,1,,,,,,
-imputeByFd,1,0,1,0,0,2,FP,0.55,1,,,,,,
\ No newline at end of file
+name,param_no,maskFlag,FDFlag,yFlag,verboseFlag,dataFlag,dt1,dt2,dt3,dt4,st1,en1,st2,en2,st3,en3,st4,en4
+outlierByIQR,3,0,0,0,1,0,FP,INT,INT,1,7,2,2,1,1,,,
+outlierBySd,3,0,0,0,1,0,INT,INT,INT,1,7,1,2,2,1,,,
+winsorize,0,0,0,0,1,0,,,,,,,,,,,,
+normalize,0,0,0,0,0,0,,,,,,,,,,,,
+imputeByMean,0,1,0,0,0,2,,,,,,,,,,,,
+imputeByMedian,0,1,0,0,0,2,,,,,,,,,,,,
+mice,2,1,0,0,1,2,INT,FP,1,3,0.5,1,,,,,,
+abstain,1,0,0,1,1,2,FP,0.6,0.8,,,,,,,,,
+SMOTE,1,1,0,1,1,2,INT,100,500,,,,,,,,,
+m_pca,3,0,0,0,0,2,INT,BOOL,BOOL,100,200,0,1,0,0,,,
+ppca,4,0,0,0,1,2,INT,INT,FP,FP,100,200,1,10,1.00E-09,1.00E-06,1.00E-02,1.00E-01
+fillDefault,0,0,0,0,0,2,,,,,,,,,,,,
+dummycoding,0,1,0,0,0,2,,,,,,,,,,,,
+scale,2,0,0,0,0,0,BOOL,BOOL,0,1,0,1,,,,,,
+forward_fill,1,0,0,0,1,2,BOOL,0,1,,,,,,,,,
+imputeByFd,1,0,1,0,0,2,FP,0.6,0.9,,,,,,,,,
+wtomeklink,0,0,0,1,0,2,,,,,,,,,,,,
diff --git a/scripts/pipelines/properties/primitives.csv 
b/scripts/pipelines/properties/primitives.csv
index da2bec8..962acc3 100644
--- a/scripts/pipelines/properties/primitives.csv
+++ b/scripts/pipelines/properties/primitives.csv
@@ -1,7 +1,7 @@
-OTLR,MVI,NR,CI,DIM,DUMMY,SCALE
-winsorize,imputeByMean,abstain,SMOTE,m_pca,dummycoding,scale
-outlierBySd,imputeByMedian,,,,,
-outlierByIQR,mice,,,,,
-,fillDefault,,,,,
-,imputeByFd,,,,,
-,forward_fill,,,,,
\ No newline at end of file
+ED,MVI,OTLR,EC,SCALE,CI,DUMMY,DIM
+imputeByFd,imputeByMean,winsorize,imputeByMean,scale,abstain,dummycoding,m_pca
+outlierBySd,imputeByMedian,outlierBySd,imputeByMedian,,wtomeklink,,ppca
+outlierByIQR,mice,outlierByIQR,fillDefault,,SMOTE,,
+,fillDefault,,,,,,
+,imputeByFd,,,,,,
+,forward_fill,,,,,,
diff --git a/scripts/pipelines/properties/testPrimitives.csv 
b/scripts/pipelines/properties/testPrimitives.csv
index 048b5b1..3ce4b97 100644
--- a/scripts/pipelines/properties/testPrimitives.csv
+++ b/scripts/pipelines/properties/testPrimitives.csv
@@ -1,3 +1,3 @@
-OTLR,MVI,NR,CI,DIM,DUMMY,SCALE
-winsorize,imputeByMean,abstain,SMOTE,m_pca,dummycoding,scale
-outlierBySd,imputeByMedian,,,,,
\ No newline at end of file
+ED,MVI,OTLR,EC,SCALE,CI,DUMMY,DIM
+outlierBySd,imputeByMean,winsorize,imputeByMean,scale,SMOTE,dummycoding,m_pca
+outlierByIQR,imputeByMedian,outlierBySd,imputeByMedian,,wtomeklink,,ppca
diff --git a/scripts/pipelines/scripts/enumerateLogical.dml 
b/scripts/pipelines/scripts/enumerateLogical.dml
index 2319dd8..0d07a45 100644
--- a/scripts/pipelines/scripts/enumerateLogical.dml
+++ b/scripts/pipelines/scripts/enumerateLogical.dml
@@ -52,7 +52,7 @@
 source("scripts/builtin/bandit.dml") as bandit;
 enumerateLogical = function(Matrix[Double] X, Matrix[Double] y, Matrix[Double] 
Xtest, Matrix[Double] ytest, Matrix[Double] cmr, Frame[Unknown] cat, 
Frame[Unknown] population,
   Integer max_iter=10, List[Unknown] metaList, String evaluationFunc, 
Matrix[Double] evalFunHp, Frame[Unknown] primitives, Frame[Unknown] param,
-  Integer num_inst, Integer num_exec, Boolean verbose)
+  Integer num_inst, Integer num_exec, Boolean cv=FALSE, Integer cvk=3, Boolean verbose)
 return (Frame[Unknown] bestLg, Double pre_best, Double T)
 { 
   t1 = time()
@@ -90,10 +90,11 @@ return (Frame[Unknown] bestLg, Double pre_best, Double T)
 
       # # execute the physical instances and store the minimum scores, each 
pipeline is executed num_exec times
       [outPip,outHp, feaFrameOuter] = bandit::run_with_hyperparam(lp, 
physicalConf, num_exec, X, y, Xtest, ytest, metaList,
-        evaluationFunc, evalFunHp, param, as.frame(""), verbose)
+        evaluationFunc, evalFunHp, param, as.frame(""), cv, cvk, verbose)
       # # sort the configurations groupwise
       max_perf =  bandit::getMaxPerConf(outPip, nrow(physicalConf)) 
       scores[i] = as.matrix(max_perf[1, 1])
+      print("scores: \n"+toString(scores))
     }
     
     # # select parents and best score
@@ -114,10 +115,12 @@ return (Frame[Unknown] bestLg, Double pre_best, Double T)
       idxC = as.integer(as.scalar(population[idxR, 1])) + 1
       bestLg = population[idxR, 2:idxC]
     }
-    pipLength = max(as.matrix(population[, 1])) + as.scalar(cmr[1, 1]) + 1
+    pipLength = max(as.matrix(population[, 1])) + as.scalar(cmr[1, 1]) + 3
+    # # # if new best is not better than pre_best then no need of generating a new population
     children = frame(0, rows=ceil(nrow(scores)/2), cols=pipLength)
     i = 1
     while(i <= ceil(nrow(scores)/2) & !converged)
     {
       top = population[as.scalar(selected[i]), ]
@@ -136,9 +139,10 @@ return (Frame[Unknown] bestLg, Double pre_best, Double T)
       c1 = removal(c1, as.scalar(cmr[1, 3]))
 
       # # # append length of pipeline and pipeline in frame
       children[i, 1] = ncol(c1)
       children[i, 2:(ncol(c1) + 1)] = c1
-
       i = i + 1
     }
     population = children
@@ -161,7 +165,7 @@ return (Frame [Unknown] child)
   for(i in 1:addCount)
   {
     c = as.scalar(sample(ncol(allOps), 1))
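+    # restrict the insert position to the leading ops so the trailing entries stay in place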
-    place_to_add = as.scalar(sample(ncol(top)+1, 1))
+    place_to_add = as.scalar(sample(ncol(top)-2, 1))
     if(place_to_add == 1)
       child = cbind(allOps[1, c], top)
     else if(place_to_add >= ncol(top))
@@ -180,9 +184,9 @@ return (Frame [Unknown] mChild)
 {
   print("Starting mutation on "+toString(child))
   random = as.scalar(rand(rows=1, cols=1))
-  if(random > mutationRate & ncol(child) >= 2)
+  if(random > mutationRate & ncol(child) >= 3)
   {
-    r = sample(ncol(child), 2)
+    r = sample(ncol(child) - 2, 2)
     r1 = as.scalar(r[1,1])
     r2 = as.scalar(r[2,1])
     temp = child[1, r1]
@@ -195,12 +199,12 @@ return (Frame [Unknown] mChild)
 removal = function(Frame[Unknown] child, Integer removal)
 return (Frame[Unknown] output)
 {
-  if(ncol(child) > 1 & ncol(child) > removal & removal > 0)
+  if(ncol(child) > 2 & (ncol(child)-2) > removal & removal > 0)
   {
     print("Starting removal on "+toString(child))
     for(i in 1:removal)
     {
-      idx = as.scalar(sample(ncol(child), 1))
+      idx = as.scalar(sample(ncol(child)-3, 1))
       if(idx == 1)
         ch = child[, 2:ncol(child)]
       else if (idx == ncol(child))
@@ -211,5 +215,5 @@ return (Frame[Unknown] output)
     }
   }
   output = child
-  print("Starting removal on "+toString(output))
+  print("Ended removal on "+toString(output))
 }
diff --git a/scripts/pipelines/scripts/utils.dml 
b/scripts/pipelines/scripts/utils.dml
index bdbffeb..d2916f1 100644
--- a/scripts/pipelines/scripts/utils.dml
+++ b/scripts/pipelines/scripts/utils.dml
@@ -135,9 +135,6 @@ return (Matrix[Double] dX_train) {
 }
 
 
-
-
-
 #####################################
 # The function will check if the pipeline have zero hyper-parameters
 # then it should not use more resource iterations and should be executed once
@@ -157,67 +154,6 @@ return(Boolean validForResources)
   validForResources = count > 0
 }
 
-
-
-
-
-# # ######################################################################
-# # # # Function for cross validation using hold out method
-# # # # Inputs: The input dataset X, Y and the value of k validation, mask of 
the 
-# # # # dataset for OHE of categorical columns, vector of ML hyper-parameters 
identified 
-# # # # via gridsearch and a boolean value of (un)weighted accuracy.
-# # # # Output: It return a matrix having the accuracy of each fold.
-# # ######################################################################
-
-crossVML = function(Matrix[double] X, Matrix[double] y, Integer k, 
Matrix[Double] MLhp) 
-return (Matrix[Double] accuracyMatrix)
-{
-  accuracyMatrix = matrix(0, k, 1)
-  dataList = list()
-  testL = list()
-  data = order(target = cbind(y, X),  by = 1, decreasing=FALSE, 
index.return=FALSE)
-  classes = table(data[, 1], 1)
-  ins_per_fold = classes/k
-  start_fold = matrix(1, rows=nrow(ins_per_fold), cols=1)
-  fold_idxes = cbind(start_fold, ins_per_fold)
-
-  start_i = 0; end_i = 0; idx_fold = 1;;
-  for(i in 1:k)
-  {
-    fold_i = matrix(0, 0, ncol(data))
-    start=0; end=0; 
-    for(j in 1:nrow(classes))
-    {
-      idx = as.scalar(classes[j, 1])
-      start = end + 1;
-      end = end + idx
-      class_j =  data[start:end, ]
-      start_i = as.scalar(fold_idxes[j, 1]);
-      end_i = as.scalar(fold_idxes[j, 2])
-      fold_i = rbind(fold_i, class_j[start_i:end_i, ])
-    }
-    dataList = append(dataList, fold_i)
-    fold_idxes[, 1] = fold_idxes[, 2] + 1
-    fold_idxes[, 2] += ins_per_fold
-  }
-
-  for(i in seq(1,k))
-  {
-    [trainList, hold_out] = remove(dataList, i)
-    trainset = rbind(trainList)
-    testset = as.matrix(hold_out)
-    trainX = trainset[, 2:ncol(trainset)]
-    trainy = trainset[, 1]
-    testX = testset[, 2:ncol(testset)]
-    testy = testset[, 1]
-    beta = multiLogReg(X=trainX, Y=trainy, icpt=as.scalar(MLhp[1,1]), 
reg=as.scalar(MLhp[1,2]), tol=as.scalar(MLhp[1,3]), 
-    maxi=as.scalar(MLhp[1,4]), maxii=50, verbose=FALSE);
-    [prob, yhat, accuracy] = multiLogRegPredict(testX, beta, testy, FALSE)
-    accuracyMatrix[i] = accuracy
-  }
-
-}
-
 stringProcessing = function(Frame[Unknown] data, Matrix[Double] mask, 
Frame[String] schema, Boolean CorrectTypos)
 return(Frame[Unknown] processedData)
 {
@@ -260,6 +196,7 @@ return(Frame[Unknown] processedData)
     # print("after correctTypos "+toString(data, rows=5))
   }
   
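+  # stem every string cell with the Porter stemmer via a frame map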
+  data = map(data, "x -> PorterStemmer.stem(x)")
   # TODO add deduplication
   processedData = data
 }
@@ -398,8 +335,7 @@ topk_gridSearch = function(Matrix[Double] X, Matrix[Double] 
y, Matrix[Double] Xt
       }
       
       Rloss[i,] = mean(accuracyMatrix)
-    }
-  
+    }  
   }
   # without cross-validation
   else {
diff --git a/src/main/java/org/apache/sysds/common/Builtins.java 
b/src/main/java/org/apache/sysds/common/Builtins.java
index 8ea0bd6..7788b94 100644
--- a/src/main/java/org/apache/sysds/common/Builtins.java
+++ b/src/main/java/org/apache/sysds/common/Builtins.java
@@ -46,6 +46,7 @@ public enum Builtins {
        ALS_DS("alsDS", true),
        ALS_PREDICT("alsPredict", true),
        ALS_TOPK_PREDICT("alsTopkPredict", true),
+       APPLY_PIPELINE("applyAndEvaluate", true),
        ARIMA("arima", true),
        ASIN("asin", false),
        ATAN("atan", false),
diff --git 
a/src/test/java/org/apache/sysds/test/functions/pipelines/BuiltinExecutePipelineTest.java
 
b/src/test/java/org/apache/sysds/test/functions/pipelines/BuiltinExecutePipelineTest.java
new file mode 100644
index 0000000..0aa92a0
--- /dev/null
+++ 
b/src/test/java/org/apache/sysds/test/functions/pipelines/BuiltinExecutePipelineTest.java
@@ -0,0 +1,55 @@
+package org.apache.sysds.test.functions.pipelines;
+
+import org.apache.sysds.common.Types;
+import org.apache.sysds.test.AutomatedTestBase;
+import org.apache.sysds.test.TestConfiguration;
+import org.apache.sysds.test.TestUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class BuiltinExecutePipelineTest extends AutomatedTestBase {
+
+       private final static String TEST_NAME1 = "executePipelineTest";
+       private final static String TEST_CLASS_DIR = SCRIPT_DIR + 
BuiltinExecutePipelineTest.class.getSimpleName() + "/";
+
+       private static final String RESOURCE = 
SCRIPT_DIR+"functions/pipelines/";
+       private static final String DATA_DIR = DATASET_DIR+ "pipelines/";
+
+       private final static String DIRTY = DATA_DIR+ "dirty.csv";
+       private final static String META = RESOURCE+ "meta/meta_census.csv";
+
+       @Override
+       public void setUp() {
+               addTestConfiguration(TEST_NAME1,new 
TestConfiguration(TEST_CLASS_DIR, TEST_NAME1,new String[]{"R"}));
+       }
+
+       @Test
+       public void testEvalPipClass() {
+               execPip(Types.ExecMode.SINGLE_NODE);
+       }
+
+       private void execPip(Types.ExecMode et) {
+
+               setOutputBuffering(true);
+               String HOME = SCRIPT_DIR+"functions/pipelines/" ;
+               Types.ExecMode modeOld = setExecMode(et);
+               try {
+                       loadTestConfiguration(getTestConfiguration(TEST_NAME1));
+                       fullDMLScriptName = HOME + TEST_NAME1 + ".dml";
+                       programArgs = new String[] {"-stats", "-exec", 
"singlenode", "-args", DIRTY, META, output("O")};
+
+                       runTest(true, EXCEPTION_NOT_EXPECTED, null, -1);
+                       // expect the pipeline result to match the manual execution of the primitives
+                       
Assert.assertTrue(TestUtils.readDMLBoolean(output("O")));
+               }
+               finally {
+                       resetExecMode(modeOld);
+               }
+       }
+}
diff --git 
a/src/test/java/org/apache/sysds/test/functions/pipelines/BuiltinTopkCleaningClassificationTest.java
 
b/src/test/java/org/apache/sysds/test/functions/pipelines/BuiltinTopkCleaningClassificationTest.java
index 6c4b14c..7d95937 100644
--- 
a/src/test/java/org/apache/sysds/test/functions/pipelines/BuiltinTopkCleaningClassificationTest.java
+++ 
b/src/test/java/org/apache/sysds/test/functions/pipelines/BuiltinTopkCleaningClassificationTest.java
@@ -23,6 +23,7 @@ import org.apache.sysds.test.AutomatedTestBase;
 import org.apache.sysds.test.TestConfiguration;
 import org.apache.sysds.test.TestUtils;
 import org.junit.Assert;
+import org.junit.Ignore;
 import org.junit.Test;
 
 public class BuiltinTopkCleaningClassificationTest extends AutomatedTestBase {
@@ -35,6 +36,7 @@ public class BuiltinTopkCleaningClassificationTest extends 
AutomatedTestBase {
 
        private final static String DIRTY = DATA_DIR+ "dirty.csv";
        private final static String META = RESOURCE+ "meta/meta_census.csv";
+       private final static String OUTPUT = RESOURCE+ 
"intermediates/classification/";
 
        private static final String PARAM_DIR = 
"./scripts/pipelines/properties/";
        private final static String PARAM = PARAM_DIR + "param.csv";
@@ -47,24 +49,23 @@ public class BuiltinTopkCleaningClassificationTest extends 
AutomatedTestBase {
 
        @Test
        public void testFindBestPipeline1() {
-               runtopkCleaning(0.1, 3,5,
-                       "FALSE", Types.ExecMode.SINGLE_NODE);
+               runtopkCleaning(0.5, 3,5,
+                       "FALSE", 0,0.8, Types.ExecMode.SINGLE_NODE);
        }
 
-       @Test
+       @Ignore
        public void testFindBestPipeline2() {
                runtopkCleaning(0.1, 3,5,
-                       "TRUE", Types.ExecMode.SINGLE_NODE);
+                       "TRUE", 3,0.8,  Types.ExecMode.SINGLE_NODE);
        }
 
        @Test
        public void testFindBestPipelineHybrid() {
                runtopkCleaning(0.1, 3,5,
-                       "FALSE", Types.ExecMode.HYBRID);
+                       "FALSE", 0,0.8,  Types.ExecMode.HYBRID);
        }
 
-
-       private void runtopkCleaning(Double sample, int topk, int resources,  
String cv, Types.ExecMode et) {
+       private void runtopkCleaning(Double sample, int topk, int resources,  
String cv, int cvk , double split, Types.ExecMode et) {
 
                setOutputBuffering(true);
                Types.ExecMode modeOld = setExecMode(et);
@@ -72,9 +73,9 @@ public class BuiltinTopkCleaningClassificationTest extends 
AutomatedTestBase {
                try {
                        loadTestConfiguration(getTestConfiguration(TEST_NAME));
                        fullDMLScriptName = HOME + TEST_NAME + ".dml";
-                       programArgs = new String[] {"-stats", "-exec", 
"singlenode", "-nvargs", "dirtyData="+DIRTY,
+                       programArgs = new String[] { "-stats", "-exec", 
"singlenode", "-nvargs", "dirtyData="+DIRTY,
                                "metaData="+META, "primitives="+PRIMITIVES, 
"parameters="+PARAM, "topk="+ topk, "rv="+ resources,
-                               "sample="+sample, "testCV="+cv, 
"O="+output("O")};
+                               "sample="+sample, "testCV="+cv, "cvk="+cvk, 
"split="+split, "output="+OUTPUT, "O="+output("O")};
 
                        runTest(true, EXCEPTION_NOT_EXPECTED, null, -1);
 
diff --git 
a/src/test/java/org/apache/sysds/test/functions/pipelines/BuiltinTopkCleaningRegressionTest.java
 
b/src/test/java/org/apache/sysds/test/functions/pipelines/BuiltinTopkCleaningRegressionTest.java
index 45ab4c3..84f326f 100644
--- 
a/src/test/java/org/apache/sysds/test/functions/pipelines/BuiltinTopkCleaningRegressionTest.java
+++ 
b/src/test/java/org/apache/sysds/test/functions/pipelines/BuiltinTopkCleaningRegressionTest.java
@@ -32,7 +32,7 @@ public class BuiltinTopkCleaningRegressionTest extends 
AutomatedTestBase{
        private static final String RESOURCE = 
SCRIPT_DIR+"functions/pipelines/";
 
        private final static String DIRTY = DATASET_DIR+ "Salaries.csv";
-       private final static String OUTPUT = RESOURCE+"intermediates/";
+       private final static String OUTPUT = 
RESOURCE+"intermediates/regression/";
        private static final String PARAM_DIR = 
"./scripts/pipelines/properties/";
        private final static String PARAM = PARAM_DIR + "param.csv";
        private final static String PRIMITIVES = PARAM_DIR + "primitives.csv";
@@ -42,20 +42,21 @@ public class BuiltinTopkCleaningRegressionTest extends 
AutomatedTestBase{
                addTestConfiguration(TEST_NAME1,new 
TestConfiguration(TEST_CLASS_DIR, TEST_NAME1,new String[]{"R"}));
        }
 
+// TODO: support CV for regression
        @Test
-       public void testRegressionPipelinesCP() {
-               runFindPipelineTest(1.0, 5,20, 10,
-                       "lm", Types.ExecMode.SINGLE_NODE);
+       public void testRegressionPipelinesCP1() {
+               runFindPipelineTest(1.0, 5,20, "FALSE", 3,
+                       0.8, Types.ExecMode.SINGLE_NODE);
        }
 
        @Test
        public void testRegressionPipelinesHybrid() {
-               runFindPipelineTest(1.0, 5,5, 2,
-                       "lm", Types.ExecMode.HYBRID);
+               runFindPipelineTest(1.0, 5,5, "FALSE", 3,
+                       0.8, Types.ExecMode.HYBRID);
        }
 
-       private void runFindPipelineTest(Double sample, int topk, int 
resources, int crossfold,
-               String target, Types.ExecMode et) {
+       private void runFindPipelineTest(Double sample, int topk, int 
resources, String crossfold,
+               int cvk, double split,  Types.ExecMode et) {
 
                setOutputBuffering(true);
                String HOME = SCRIPT_DIR+"functions/pipelines/" ;
@@ -64,8 +65,8 @@ public class BuiltinTopkCleaningRegressionTest extends 
AutomatedTestBase{
                        loadTestConfiguration(getTestConfiguration(TEST_NAME1));
                        fullDMLScriptName = HOME + TEST_NAME1 + ".dml";
                        programArgs = new String[] {"-stats", "-exec", 
"singlenode", "-nvargs", "dirtyData="+DIRTY,
-                               "primitives="+PRIMITIVES, "parameters="+PARAM, 
"sampleSize="+ sample, "topk="+ topk,
-                               "rv="+ resources, "sample="+ sample, 
"output="+OUTPUT, "target="+target, "O="+output("O")};
+                               "primitives="+PRIMITIVES, "parameters="+PARAM, 
"sample="+ sample, "topk="+ topk,
+                               "rv="+ resources, "testCV="+ crossfold, 
"cvk="+cvk, "output="+OUTPUT, "split="+ split, "O="+output("O")};
 
                        runTest(true, EXCEPTION_NOT_EXPECTED, null, -1);
 
diff --git 
a/src/test/java/org/apache/sysds/test/functions/pipelines/BuiltinTopkCleaningRegressionTest.java
 
b/src/test/java/org/apache/sysds/test/functions/pipelines/BuiltinTopkEvaluateTest.java
similarity index 62%
copy from 
src/test/java/org/apache/sysds/test/functions/pipelines/BuiltinTopkCleaningRegressionTest.java
copy to 
src/test/java/org/apache/sysds/test/functions/pipelines/BuiltinTopkEvaluateTest.java
index 45ab4c3..2476fb2 100644
--- 
a/src/test/java/org/apache/sysds/test/functions/pipelines/BuiltinTopkCleaningRegressionTest.java
+++ 
b/src/test/java/org/apache/sysds/test/functions/pipelines/BuiltinTopkEvaluateTest.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.sysds.test.functions.pipelines;
 
 import org.apache.sysds.common.Types;
@@ -23,19 +24,20 @@ import org.apache.sysds.test.AutomatedTestBase;
 import org.apache.sysds.test.TestConfiguration;
 import org.apache.sysds.test.TestUtils;
 import org.junit.Assert;
+import org.junit.Ignore;
 import org.junit.Test;
 
-public class BuiltinTopkCleaningRegressionTest extends AutomatedTestBase{
-       private final static String TEST_NAME1 = "topkcleaningRegressionTest";
-       private final static String TEST_CLASS_DIR = SCRIPT_DIR + 
BuiltinTopkCleaningRegressionTest.class.getSimpleName() + "/";
+public class BuiltinTopkEvaluateTest extends AutomatedTestBase {
+       private final static String TEST_NAME1 = "applyEvaluateTest";
+       private final static String TEST_CLASS_DIR = SCRIPT_DIR + 
BuiltinTopkEvaluateTest.class.getSimpleName() + "/";
 
        private static final String RESOURCE = 
SCRIPT_DIR+"functions/pipelines/";
+       private static final String DATA_DIR = DATASET_DIR+ "pipelines/";
 
-       private final static String DIRTY = DATASET_DIR+ "Salaries.csv";
-       private final static String OUTPUT = RESOURCE+"intermediates/";
-       private static final String PARAM_DIR = 
"./scripts/pipelines/properties/";
-       private final static String PARAM = PARAM_DIR + "param.csv";
-       private final static String PRIMITIVES = PARAM_DIR + "primitives.csv";
+       private final static String DIRTY = DATA_DIR+ "dirty.csv";
+       private final static String META = RESOURCE+ "meta/meta_census.csv";
+       private final static String INPUT = RESOURCE+"intermediates/";
 
        @Override
        public void setUp() {
@@ -43,19 +45,11 @@ public class BuiltinTopkCleaningRegressionTest extends 
AutomatedTestBase{
        }
 
        @Test
-       public void testRegressionPipelinesCP() {
-               runFindPipelineTest(1.0, 5,20, 10,
-                       "lm", Types.ExecMode.SINGLE_NODE);
-       }
-
-       @Test
-       public void testRegressionPipelinesHybrid() {
-               runFindPipelineTest(1.0, 5,5, 2,
-                       "lm", Types.ExecMode.HYBRID);
+       public void testEvalPipClass() {
+               evalPip(0.8, "FALSE", INPUT+"/classification/", 
Types.ExecMode.SINGLE_NODE);
        }
 
-       private void runFindPipelineTest(Double sample, int topk, int 
resources, int crossfold,
-               String target, Types.ExecMode et) {
+       private void evalPip(double split, String cv, String path, 
Types.ExecMode et) {
 
                setOutputBuffering(true);
                String HOME = SCRIPT_DIR+"functions/pipelines/" ;
@@ -63,9 +57,8 @@ public class BuiltinTopkCleaningRegressionTest extends 
AutomatedTestBase{
                try {
                        loadTestConfiguration(getTestConfiguration(TEST_NAME1));
                        fullDMLScriptName = HOME + TEST_NAME1 + ".dml";
-                       programArgs = new String[] {"-stats", "-exec", 
"singlenode", "-nvargs", "dirtyData="+DIRTY,
-                               "primitives="+PRIMITIVES, "parameters="+PARAM, 
"sampleSize="+ sample, "topk="+ topk,
-                               "rv="+ resources, "sample="+ sample, 
"output="+OUTPUT, "target="+target, "O="+output("O")};
+                       programArgs = new String[] {"-stats", "-exec", 
"singlenode", "-args", DIRTY, META, path, cv,
+                               String.valueOf(split), output("O")};
 
                        runTest(true, EXCEPTION_NOT_EXPECTED, null, -1);
 
diff --git a/src/test/scripts/functions/builtin/tomeklink.dml 
b/src/test/scripts/functions/builtin/tomeklink.dml
index 8ab9145..33ed3c9 100644
--- a/src/test/scripts/functions/builtin/tomeklink.dml
+++ b/src/test/scripts/functions/builtin/tomeklink.dml
@@ -23,5 +23,7 @@
 X = read($1)
 y = read($2)
 
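+# additionally invoke tomeklink via eval() to exercise second-order function calls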
+under = eval("tomeklink", list(X, y))
+print(toString(under, rows=5))
 [X_under, y_under, drop_idx] = tomeklink(X, y)
 write(drop_idx, $3) # sorted by default
diff --git a/src/test/scripts/functions/pipelines/applyEvaluateTest.dml 
b/src/test/scripts/functions/pipelines/applyEvaluateTest.dml
new file mode 100644
index 0000000..813ef94
--- /dev/null
+++ b/src/test/scripts/functions/pipelines/applyEvaluateTest.dml
@@ -0,0 +1,89 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+##################################################################################################################
+# This script reads the dirty and clean data, applies the best pipeline to the dirty data,
+# then classifies both datasets and checks whether the cleaned dataset performs as well as
+# the original dataset in terms of classification accuracy
+
+# Vocabulary: original data -> dataset without any noise, the original version with ground truths
+#             cleaned data  -> the dirty dataset after cleaning by the pipeline
+# read the items
+# dirty dataset F
+# clean dataset O
+# metadata (schema and mask)
+# best k pipelines and hyperparameters generated by previous script 
mainScript.dml
+
+# do the initial preprocessing, like dropping invalid values, so that the pipeline can fix them
+# then recode the data to bring it into matrix format
+# then construct the hyper-parameter list and call executePipeline() on the dirty dataset
+# for the comparison, OHE the original dataset; there is no need to OHE the cleaned dataset
+# because the cleaning pipeline has a primitive for this
+# call multiLogReg on both datasets and compare accuracy with k=3 cross validation
+######################################################################################################################
+
+source("scripts/pipelines/scripts/utils.dml") as utils;
+
+
+F = read($1, data_type="frame", format="csv", header=FALSE, 
+  naStrings= ["NA", "null","  ","NaN", "nan", "", "?", "99999"]);
+metaInfo = read($2, data_type="frame", format="csv", header=FALSE);  
+input = $3
+pip = read(input+"pip.csv", data_type="frame", format="csv", header=FALSE);
+hp = read(input+"hp.csv", data_type="matrix", format="csv", header=FALSE);
+lg = read(input+"lp.csv", data_type="frame", format="csv", header=FALSE);
+evalHp = read(input+"evalHp.csv", data_type="matrix", format="csv", 
header=FALSE);
+# dirtyScore = read(input+"dirtyScore.csv", data_type="scalar", 
value_type="double");
+cv = as.logical($4)
+trainTestSplit = as.double($5)
+metaInfo = metaInfo[, 2:ncol(metaInfo)]
+
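+# hold-out split of the data into train and test partitions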
+split = nrow(F) * trainTestSplit
+trainData = F[1:split,]
+testData = F[split+1:nrow(F),]
+
+
+result = applyAndEvaluate(trainData, testData, metaInfo, lg, pip[1,], hp[1,], 
"evalML", matrix("1 1e-3 1e-9 100", rows=1, cols=4), TRUE, FALSE)
+
+header = frame(["dirty acc", "train acc", "test acc"], rows=1, cols=3)
+result = as.frame(result)
+
+
+writeRes = rbind(header, result)
+print(toString(writeRes))
+
+result = as.scalar(result[1, 3] > result[1, 1])
+write(result, $6)
+
+# UDF for evaluation  
+# choice of parameters provided by API, X, Y, clone_X, evalFunHp 
(hyper-param), trainML (boolean for optimizing hp internally or passed by 
externally )
+evalML = function(Matrix[Double] X, Matrix[Double] Y, Matrix[Double] Xtest, 
Matrix[Double] Ytest, Matrix[Double] Xorig=as.matrix(0),
+  Matrix[Double] evalFunHp, Boolean trainML = FALSE)
+  
+return(Matrix[Double] accuracy)
+{
+
+  beta = multiLogReg(X=X, Y=Y, icpt=as.scalar(evalFunHp[1,1]), 
reg=as.scalar(evalFunHp[1,2]), tol=as.scalar(evalFunHp[1,3]), 
+    maxi=as.scalar(evalFunHp[1,4]), maxii=50, verbose=FALSE);
+  [prob, yhat, a] = multiLogRegPredict(Xtest, beta, Ytest, FALSE)
+  accuracy = getAccuracy(Ytest, yhat, TRUE)
+  print("accuracy weighted: "+accuracy)
+  accuracy = as.matrix(accuracy)
+}
diff --git a/src/test/scripts/functions/pipelines/executePipelineTest.dml 
b/src/test/scripts/functions/pipelines/executePipelineTest.dml
new file mode 100644
index 0000000..cbfb2c6
--- /dev/null
+++ b/src/test/scripts/functions/pipelines/executePipelineTest.dml
@@ -0,0 +1,101 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
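+# This test runs a small pipeline (imputeByMean, abstain) through executePipeline()
+# and verifies that the result matches a manual invocation of the same primitives.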
+source("scripts/pipelines/scripts/utils.dml") as utils;
+
+F = read($1, data_type="frame", format="csv", header=FALSE, 
+  naStrings= ["NA", "null","  ","NaN", "nan", "", "?", "99999"]);
+metaData = read($2, data_type="frame", format="csv", header=FALSE);  
+trainTestSplit = 0.7
+metaData = metaData[, 2:ncol(metaData)]
+F = F[1:100]
+split = nrow(F) * trainTestSplit
+trainData = F[1:split,]
+testData = F[split+1:nrow(F),]
+
+flagsCount = 5
+schema = metaData[1, 1:ncol(metaData) - 1]
+mask = as.matrix(metaData[2, 1:ncol(metaData) - 1])
+FD = as.matrix(metaData[3, 1:ncol(metaData) - 1])
+maskY = as.integer(as.scalar(metaData[2, ncol(metaData)]))
+metaList = list(mask=mask, schema=schema, fd=FD)
+
+# separate the label
+[Xtrain, Ytrain] = getLabel(trainData, TRUE)
+[Xtest, Ytest] = getLabel(testData, TRUE)
+    
+# always recode the label 
+[eYtrain, M] = transformencode(target=Ytrain, spec= "{ids:true, recode:[1]}");
+eYtest = transformapply(target=Ytest, spec= "{ids:true, recode:[1]}", meta=M);
+[eXtrain, eXtest] = recodeData(Xtrain, Xtest, mask, FALSE, "recode")
+
+
+lp = frame(["MVI", "CI"], rows=1, cols=2)
+pip = frame(["imputeByMean", "abstain"], rows=1, cols=2)
+hp = matrix("0.000 0.000 1.000 0.000 0.000 0.000 2.000
+            1.000 0.786 0.000 0.000 1.000 1.000 2.000", rows=2, cols=7)
+print("X unchanged "+sum(eXtrain))
+[eX, Y, Xtest, Ytest, tr] = executePipeline(lp, pip, eXtrain, eYtrain, eXtest, 
eYtest, metaList, hp,
+  as.matrix(0), as.matrix(0), flagsCount, TRUE, FALSE)
+
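+# re-apply the same primitives manually and compare against the pipeline output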
+trainEndIdx = (nrow(rbind(eXtrain, eXtest)) - nrow(eXtest))
+testStIdx = trainEndIdx + 1
+X = imputeByMean(rbind(eXtrain, eXtest), mask)
+eXtrain = X[1:trainEndIdx,]
+eXtest = X[testStIdx:nrow(X),]
+hX = abstain(eXtrain, eYtrain, 0.786, FALSE)
+
+equalX = (abs(eX - eXtrain) > 0.0001)
+result = sum(equalX) == 0
+write(result, $3)
+
+recodeData = function(Frame[Unknown] Xtrain, Frame[Unknown] Xtest, 
Matrix[Double] mask, Boolean cv, String code)
+return(Matrix[Double] eXtrain, Matrix[Double] eXtest)
+{
+  if(sum(mask) > 0)
+  {
+    index = vectorToCsv(mask)
+    jspecR = "{ids:true, "+code+":["+index+"]}"
+    [eXtrain, X_meta] = transformencode(target=Xtrain, spec=jspecR);
+    if(!cv)
+      eXtest = transformapply(target=Xtest, spec=jspecR, meta=X_meta);
+    else eXtest = as.matrix(Xtest)
+  } 
+  # if no categorical value exist then just cast the frame into matrix
+  else {
+    eXtrain = as.matrix(Xtrain)
+    eXtest = as.matrix(Xtest)
+  }
+}
+
+getLabel = function(Frame[Unknown] data, Boolean isLastLabel)
+return(Frame[Unknown] X, Frame[Unknown] Y)
+{
+  if(isLastLabel) {
+    X = data[, 1:ncol(data) - 1]
+    Y = data[, ncol(data)]
+  }
+  else 
+  {
+    X = data
+    Y = as.frame("0")
+  }
+}
diff --git 
a/src/test/scripts/functions/pipelines/intermediates/classification/bestAcc.csv 
b/src/test/scripts/functions/pipelines/intermediates/classification/bestAcc.csv
new file mode 100644
index 0000000..6b9c512
--- /dev/null
+++ 
b/src/test/scripts/functions/pipelines/intermediates/classification/bestAcc.csv
@@ -0,0 +1,3 @@
+85.58558558558559
+82.88288288288288
+82.88288288288288
diff --git 
a/src/test/scripts/functions/pipelines/intermediates/classification/dirtyScore.csv
 
b/src/test/scripts/functions/pipelines/intermediates/classification/dirtyScore.csv
new file mode 100644
index 0000000..6339ce1
--- /dev/null
+++ 
b/src/test/scripts/functions/pipelines/intermediates/classification/dirtyScore.csv
@@ -0,0 +1 @@
+67.56756756756756
\ No newline at end of file
diff --git 
a/src/test/scripts/functions/pipelines/intermediates/classification/evalHp.csv 
b/src/test/scripts/functions/pipelines/intermediates/classification/evalHp.csv
new file mode 100644
index 0000000..9b09301
--- /dev/null
+++ 
b/src/test/scripts/functions/pipelines/intermediates/classification/evalHp.csv
@@ -0,0 +1 @@
+2.0,0.001,1.0E-5,1000.0
diff --git 
a/src/test/scripts/functions/pipelines/intermediates/classification/hp.csv 
b/src/test/scripts/functions/pipelines/intermediates/classification/hp.csv
new file mode 100644
index 0000000..b6b88f8
--- /dev/null
+++ b/src/test/scripts/functions/pipelines/intermediates/classification/hp.csv
@@ -0,0 +1,3 @@
+36.0,3.0,3.0,2.0,1.0,0,0,0,1.0,0,0,0,0,0,1.0,0,0,0,2.0,0,0,0,0,0,0,1.0,0,2.0,0,0,0,0,1.0,0,0,0,2.0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
+36.0,3.0,7.0,1.0,1.0,0,0,0,1.0,0,0,0,0,0,1.0,0,0,0,2.0,0,0,0,0,0,0,1.0,0,2.0,0,0,0,0,1.0,0,0,0,2.0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
+36.0,3.0,1.0,1.0,1.0,0,0,0,1.0,0,0,0,0,0,1.0,0,0,0,2.0,0,0,0,0,0,0,1.0,0,2.0,0,0,0,0,1.0,0,0,0,2.0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
diff --git 
a/src/test/scripts/functions/pipelines/intermediates/classification/lp.csv 
b/src/test/scripts/functions/pipelines/intermediates/classification/lp.csv
new file mode 100644
index 0000000..ec8927d
--- /dev/null
+++ b/src/test/scripts/functions/pipelines/intermediates/classification/lp.csv
@@ -0,0 +1 @@
+ED,EC,CI,DUMMY
diff --git 
a/src/test/scripts/functions/pipelines/intermediates/classification/pip.csv 
b/src/test/scripts/functions/pipelines/intermediates/classification/pip.csv
new file mode 100644
index 0000000..db5a7e6
--- /dev/null
+++ b/src/test/scripts/functions/pipelines/intermediates/classification/pip.csv
@@ -0,0 +1,3 @@
+outlierBySd,imputeByMedian,wtomeklink,dummycoding
+outlierBySd,imputeByMedian,wtomeklink,dummycoding
+outlierBySd,imputeByMean,wtomeklink,dummycoding
diff --git a/src/test/scripts/functions/pipelines/topkLogicalTest.dml 
b/src/test/scripts/functions/pipelines/topkLogicalTest.dml
index 2756a66..a52a40f 100644
--- a/src/test/scripts/functions/pipelines/topkLogicalTest.dml
+++ b/src/test/scripts/functions/pipelines/topkLogicalTest.dml
@@ -69,37 +69,31 @@ getSchema = getSchema[, 1:ncol(getSchema) - 1] # strip the 
mask of class label
 metaList = list(mask=getMask, schema=getSchema, fd=as.matrix(0))
 
 logical =  frame([
-                   "1", "MVI", "0", "0", "0", "0", 
-                   # "1", "OTLR", "0", "0", "0", "0", 
-                   # "1", "CI", "0", "0", "0", "0", 
-                   # "2", "MVI", "CI", "0", "0", "0", 
-                   # "2", "MVI", "OTLR", "0", "0", "0",
-                   # "2", "MVI", "SCALE", "0", "0", "0", 
-                   # "3", "MVI", "SCALE", "OTLR", "0", "0",
-                   # "4", "MVI", "OTLR", "CI", "SCALE", "0", 
-                   # "4", "OTLR", "MVI", "CI", "SCALE", "0",
-                   "5", "MVI", "OTLR", "MVI", "CI", "SCALE"
-                   ], rows=2, cols=6)
-
-
-categories = frame(["MVI", "OTLR", "SCALE"], rows=1, cols=3)
+                 "7", "MVI", "OTLR", "ED", "EC", "CI", "DUMMY", "DIM", 
+                 "5", "ED",  "MVI",  "CI", "DUMMY", "DIM", "0", "0"
+                 ], rows=2, cols=8) 
+
+
+
+categories = frame(["ED", "MVI", "OTLR", "EC"], rows=1, cols=4)
 cmr = matrix("4 0.7 1", rows=1, cols=3)
 
 # doing holdout evaluation
-split = nrow(eX) * trainTestSplit
-trainX = eX[1:split,]
-trainY = eY[1:split,]
-testX = eX[split+1:nrow(eX),]
-testY = eY[split+1:nrow(eY),]
+
+[trainX, trainY, testX, testY] = splitBalanced(eX, eY, trainTestSplit, FALSE)
 
 
 [bestLogical, score, T] = lg::enumerateLogical(X=trainX, y=trainY, 
Xtest=testX, ytest=testY,  cmr=cmr, cat=categories, population=logical,
-    max_iter=max_iter, metaList = metaList, 
evaluationFunc="evalClassification", evalFunHp=matrix("1 1e-3 1e-9 100", 
rows=1, cols=4), 
-    primitives=primitives, param=param , num_inst=num_inst, num_exec=num_exec, 
verbose=TRUE)
+    max_iter=max_iter, metaList = metaList, evaluationFunc="evalML", 
evalFunHp=matrix("1 1e-3 1e-9 100", rows=1, cols=4), 
+    primitives=primitives, param=param , num_inst=num_inst, num_exec=num_exec, 
cv=FALSE, verbose=TRUE)
 
 print("score of pipeline: "+toString(score)+" in "+(T/60000)+" mins")
 print("bestLogical "+toString(bestLogical))
-
 result = dirtyScore < score  
 print("result satisfied ------------"+result)
 
@@ -109,38 +103,22 @@ write(result , $O)
 
 # UDF for evaluation  
 # choice of parameters provided by API, X, Y, clone_X, evalFunHp 
(hyper-param), trainML (boolean for optimizing hp internally or passed by 
externally )
-evalClassification = function(Matrix[Double] X, Matrix[Double] Y, 
Matrix[Double] Xtest, Matrix[Double] Ytest, Matrix[Double] Xorig, List[Unknown] 
metaList,
-  Matrix[Double] evalFunHp, Integer trainML=0)
+evalML = function(Matrix[Double] X, Matrix[Double] Y, Matrix[Double] Xtest, 
Matrix[Double] Ytest, Matrix[Double] Xorig=as.matrix(0),
+  Matrix[Double] evalFunHp, Boolean trainML = FALSE)
   
-return(Matrix[Double] output)
+return(Matrix[Double] accuracy)
 {
-  cv = 2
-  mask = as.matrix(metaList['mask'])
-  print("min and max of y in eval: "+min(Y)+" "+max(Y))
-  if(max(Y) == min(Y)) {
-    print("Y contains only one class")
-    accuracy = as.double(0)
-  }
-  else {
-    if(trainML == 1)
-    {
-      # do the gridsearch for hyper-parameters
-      params = list("icpt", "reg", "tol", "maxii")
-      paramRanges = list(seq(0, 2, 1), 10^seq(1,-4), 10^seq(1,-6), 
10^seq(1,3));
-
-      trainArgs = list(X=X, Y=Y, icpt=-1, reg=-1, tol=-1, maxi=100, maxii=-1, 
verbose=FALSE);
-      [B1, opt] = utils::topk_gridSearch(X=X, y=Y, Xtest=Xtest, ytest=Ytest, 
train="multiLogReg", predict="accuracy", numB=ncol(X)+1, cv=FALSE, cvk=cv,
-        params=params, paramValues=paramRanges, trainArgs=trainArgs, 
verbose=FALSE);
-      evalFunHp = as.matrix(opt)  
-    }
-
-     beta =  multiLogReg(X=X, Y=Y, icpt=as.scalar(evalFunHp[1,1]), 
reg=as.scalar(evalFunHp[1,2]), tol=as.scalar(evalFunHp[1,3]),
-     maxi=as.scalar(evalFunHp[1,4]), maxii=50, verbose=FALSE);
-    [prob, yhat, score] = multiLogRegPredict(Xtest, beta, Ytest, FALSE)
-
-  }
-  output = cbind(as.matrix(score), evalFunHp)
+
+  beta = multiLogReg(X=X, Y=Y, icpt=as.scalar(evalFunHp[1,1]), 
reg=as.scalar(evalFunHp[1,2]), tol=as.scalar(evalFunHp[1,3]), 
+    maxi=as.scalar(evalFunHp[1,4]), maxii=50, verbose=FALSE);
+  [prob, yhat, a] = multiLogRegPredict(Xtest, beta, Ytest, FALSE)
+  accuracy = getAccuracy(Ytest, yhat, TRUE)
+  print("accuracy weighted: "+accuracy)
+  accuracy = as.matrix(accuracy)
 }
+
 accuracy = function(Matrix[Double] X, Matrix[Double] y, Matrix[Double] B) 
return (Matrix[Double] err) {
   [M,yhat,acc] = multiLogRegPredict(X=X, B=B, Y=y, verbose=TRUE);
   err = as.matrix(1-(acc/100));
diff --git 
a/src/test/scripts/functions/pipelines/topkcleaningClassificationTest.dml 
b/src/test/scripts/functions/pipelines/topkcleaningClassificationTest.dml
index 79d56cd..2d7b57f 100644
--- a/src/test/scripts/functions/pipelines/topkcleaningClassificationTest.dml
+++ b/src/test/scripts/functions/pipelines/topkcleaningClassificationTest.dml
@@ -32,18 +32,22 @@ param = read($parameters, data_type = "frame", 
format="csv", header= TRUE)
 topK = $topk
 resources = $rv
 sample=$sample
+output=$output
 testCV = as.logical($testCV)
-trainTestSplit = 0.7
-
-if(testCV)
-  evalFunc = "evalClassificationCV"
+cvk = as.integer($cvk)
+trainTestSplit = as.double($split)
+evalFunc = "evalClassification"
+split = nrow(F) * trainTestSplit
+if(testCV) {
 
-else
-  evalFunc = "evalClassification"
+  trainData = F
+  testData = as.frame("0")
+}
+else {
 
-split = nrow(F) * trainTestSplit
-trainData = F[1:split,]
-testData = F[split+1:nrow(F),]
+  trainData = F[1:split,]
+  testData = F[split+1:nrow(F),]
+}
 
 if(nrow(metaInfo) < 2)
   stop("incomplete meta info")
@@ -51,171 +55,92 @@ if(nrow(metaInfo) < 2)
 metaInfo = metaInfo[, 2:ncol(metaInfo)]
 # # # split in train/test 70/30
 
-[topKPipelines, topKHyperParams, topKScores, bestLogical, features, 
dirtyScore] = topk_cleaning(trainData, testData, metaInfo, primitives, param, 
-  matrix("2 0.7 1", rows=1, cols=3), evalFunc, as.matrix("0"), topK, 
resources, sample, TRUE)
-  
-     
-print("dirty accuracy "+toString(dirtyScore))  
-print("best logical pipelines "+toString(bestLogical))  
-print("topk pipelines "+toString(topKPipelines))
-print("topk hyper params "+toString(topKHyperParams))
-print("topk  scores: \n"+toString(topKScores))
-perf = as.double(as.scalar(topKScores[1, 1])) - as.double(dirtyScore)
-print("performance improvement "+ perf)
-result = dirtyScore < as.scalar(topKScores[1, 1]) 
-write(result, $O)
+# [topKPipelines, topKHyperParams, topKScores, bestLogical, features, 
dirtyScore, evalHp] = 
+result = topk_cleaning(dataTrain=trainData, dataTest=testData, 
metaData=metaInfo, primitives=primitives, parameters=param,
+  cmr=matrix("2 0.7 1", rows=1, cols=3), evaluationFunc=evalFunc, 
evalFunHp=as.matrix(0),
+  topK=topK, resource_val=resources, cv=testCV, cvk=cvk, sample=sample, 
isLastLabel=TRUE, correctTypos=FALSE, output=output) 
 
+write(result, $O)
 
 
 # UDF for evaluation  
 # choice of parameters provided by API, X, Y, clone_X, evalFunHp 
(hyper-param), trainML (boolean for optimizing hp internally or passed by 
externally )
-evalClassificationCV = function(Matrix[Double] X, Matrix[Double] Y, 
Matrix[Double] Xtest, Matrix[Double] Ytest, Matrix[Double] Xorig, List[Unknown] 
metaList,
-  Matrix[Double] evalFunHp, Integer trainML=0)
+evalClassification = function(Matrix[Double] X, Matrix[Double] Y, 
Matrix[Double] Xtest, Matrix[Double] Ytest, Matrix[Double] Xorig=as.matrix(0),
+  Matrix[Double] evalFunHp, Integer trainML)
   
 return(Matrix[Double] output)
 {
-  cv = 2
-  mask = as.matrix(metaList['mask'])
-  X = utils::dummycoding(replace(target = rbind(X, Xtest), pattern = NaN, 
replacement=1), mask)
-  Y = rbind(Y, Ytest)
-  
-  if(max(Y) == min(Y)) {
-    print("Y contains only one class")
-    accuracy = as.double(0)
-  }
-  else {
-    if(trainML == 1)
-    {
-      # do the gridsearch for hyper-parameters
-      params = list("icpt", "reg", "tol", "maxii")
-      paramRanges = list(seq(0, 2, 1), 10^seq(1,-4), 10^seq(1,-6), 
10^seq(1,3));
-      trainArgs = list(X=X, Y=Y, icpt=-1, reg=-1, tol=-1, maxi=100, maxii=-1, 
verbose=FALSE);
-      [B1, opt] = utils::topk_gridSearch(X=X, y=Y, train="multiLogReg", 
predict="W", numB=ncol(X)+1, cv=TRUE, cvk=cv,
-        params=params, paramValues=paramRanges, trainArgs=trainArgs, 
verbose=FALSE);
-      evalFunHp = as.matrix(opt)  
-    }
-
-    # do the k = 3 cross validations
-    # evalFunHpM = as.matrix(evalFunHp)
-    [accuracyMatrix] = crossV(X, Y, cv, evalFunHp, FALSE)
-    accuracyMatrix = removeEmpty(target=accuracyMatrix, margin="rows")
-    score = mean(accuracyMatrix)
-    print(cv +" validation accuracy "+score)
-  }
-  output = cbind(as.matrix(score), evalFunHp)
-
-}
-
-# # ######################################################################
-# # # # Function for cross validation using hold out method
-# # # # Inputs: The input dataset X, Y and the value of k validation, mask of 
the 
-# # # # dataset for OHE of categorical columns, vector of ML hyper-parameters 
identified 
-# # # # via gridsearch and a boolean value of (un)weighted accuracy.
-# # # # Output: It return a matrix having the accuracy of each fold.
-# # ######################################################################
-
-crossV = function(Matrix[double] X, Matrix[double] y, Integer k, 
Matrix[Double] MLhp, Boolean isWeighted) 
-return (Matrix[Double] accuracyMatrix)
-{
-  accuracyMatrix = matrix(0, k, 1)
-  dataList = list()
-  testL = list()
-  data = order(target = cbind(y, X),  by = 1, decreasing=FALSE, 
index.return=FALSE)
-  classes = table(data[, 1], 1)
-  ins_per_fold = classes/k
-  start_fold = matrix(1, rows=nrow(ins_per_fold), cols=1)
-  fold_idxes = cbind(start_fold, ins_per_fold)
-
-  start_i = 0; end_i = 0; idx_fold = 1;;
-  for(i in 1:k)
+  print("trainML: "+as.integer(trainML))
+  if(trainML == 1)
   {
-    fold_i = matrix(0, 0, ncol(data))
-    start=0; end=0; 
-    for(j in 1:nrow(classes))
-    {
-      idx = as.scalar(classes[j, 1])
-      start = end + 1;
-      end = end + idx
-      class_j =  data[start:end, ]
-      start_i = as.scalar(fold_idxes[j, 1]);
-      end_i = as.scalar(fold_idxes[j, 2])
-      fold_i = rbind(fold_i, class_j[start_i:end_i, ])
-    }
-    dataList = append(dataList, fold_i)
-    fold_idxes[, 1] = fold_idxes[, 2] + 1
-    fold_idxes[, 2] += ins_per_fold
-  }
-
-  for(i in seq(1,k))
-  {
-    [trainList, hold_out] = remove(dataList, i)
-    trainset = rbind(trainList)
-    testset = as.matrix(hold_out)
-    trainX = trainset[, 2:ncol(trainset)]
-    trainy = trainset[, 1]
-    testX = testset[, 2:ncol(testset)]
-    testy = testset[, 1]
-    beta = multiLogReg(X=trainX, Y=trainy, icpt=as.scalar(MLhp[1,1]), 
reg=as.scalar(MLhp[1,2]), tol=as.scalar(MLhp[1,3]), 
-    maxi=as.scalar(MLhp[1,4]), maxii=50, verbose=FALSE);
-    [prob, yhat, a] = multiLogRegPredict(testX, beta, testy, FALSE)
-    accuracy = getAccuracy(testy, yhat, isWeighted)
-    accuracyMatrix[i] = accuracy
+    print("training")
+    params = list("icpt", "reg", "tol", "maxii")
+    paramRanges = list(seq(0, 2, 1), 10^seq(1,-3), 10^seq(1,-5), 10^seq(1,3));
+    trainArgs = list(X=X, Y=Y, icpt=-1, reg=-1, tol=-1, maxi=100, maxii=-1, 
verbose=FALSE);
+    [B1, opt] = utils::topk_gridSearch(X=X, y=Y, Xtest=Xtest, ytest=Ytest, 
train="multiLogReg", predict="accuracy", numB=ncol(X)+1, cv=FALSE, cvk=0,
+      params=params, paramValues=paramRanges, trainArgs=trainArgs, 
verbose=FALSE);
+    evalFunHp = as.matrix(opt)  
   }
+  beta = multiLogReg(X=X, Y=Y, icpt=as.scalar(evalFunHp[1,1]), 
reg=as.scalar(evalFunHp[1,2]), tol=as.scalar(evalFunHp[1,3]), 
+    maxi=as.scalar(evalFunHp[1,4]), maxii=50, verbose=FALSE);
+  [prob, yhat, accuracy] = multiLogRegPredict(Xtest, beta, Ytest, FALSE)
+  print("accuracy a: "+toString(accuracy))
+  a = getAccuracy(Ytest, yhat, TRUE)
+  print("accuracy weighted: "+a)
+  accuracy = as.matrix(accuracy)
+  output = cbind(accuracy, evalFunHp)
+  print("output: "+toString(output))
 }
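For orientation, the hold-out path introduced above reduces to the following steps; a minimal sketch, assuming X/Y and Xtest/Ytest are already numerically encoded and evalFunHp is a 1x4 matrix of [icpt, reg, tol, maxi] as used in this commit (values illustrative):

  evalFunHp = matrix("1 1e-3 1e-9 100", rows=1, cols=4)   # illustrative hyper-parameters
  beta = multiLogReg(X=X, Y=Y, icpt=as.scalar(evalFunHp[1,1]), reg=as.scalar(evalFunHp[1,2]),
    tol=as.scalar(evalFunHp[1,3]), maxi=as.scalar(evalFunHp[1,4]), maxii=50, verbose=FALSE)
  [prob, yhat, acc] = multiLogRegPredict(Xtest, beta, Ytest, FALSE)  # unweighted accuracy
  wacc = getAccuracy(Ytest, yhat, TRUE)                              # class-weighted accuracy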
 
 # UDF for evaluation  
 # choice of parameters provided by API: X, Y, clone_X, evalFunHp (hyper-param), trainML (boolean: optimize hyper-parameters internally or use the ones passed externally)
-evalClassification = function(Matrix[Double] X, Matrix[Double] Y, Matrix[Double] Xtest, Matrix[Double] Ytest, Matrix[Double] Xorig, List[Unknown] metaList,
-  Matrix[Double] evalFunHp, Integer trainML=0)
-  
+evalClassificationOld = function(Matrix[Double] X, Matrix[Double] Y, Matrix[Double] Xtest, Matrix[Double] Ytest, Matrix[Double] Xorig, List[Unknown] pipList, List[Unknown] metaList,
+  Matrix[Double] evalFunHp, Integer trainML=0)
 return(Matrix[Double] output)
 {
+  score = as.double(0)
+  acc = as.double(0) # ensure acc is defined when Y contains a single class
   mask = as.matrix(metaList['mask'])
-  if(sum(mask) > 0)
-  {
-    X = replace(target=X, pattern=NaN, replacement=1)
-    Xtest = replace(target=Xtest, pattern=NaN, replacement=1)
-    idx = vectorToCsv(mask)
-    # specifications for one-hot encoding of categorical features
-    jspecDC = "{ids:true, dummycode:["+idx+"]}";
-    # OHE of categorical features
-    [dX, dM] = transformencode(target=as.frame(rbind(X,Xtest)), spec=jspecDC);
-    X = dX[1:nrow(X),]
-    Xtest = dX[nrow(X)+1:nrow(dX),]
-  }
-
+  cv = FALSE
   print("cols in X and Xtest: "+ncol(X)+" "+ncol(Xtest))
   if(ncol(X) != ncol(Xtest))
     stop("Dimension mismatch: number of columns and train and test are not 
equal")
-  cv = 2
 
+  if(trainML == 1)
+  {
+    # do the gridsearch for hyper-parameters
+    params = list("icpt", "reg", "tol", "maxii")
+    paramRanges = list(seq(0, 2, 1), 10^seq(1,-3), 10^seq(1,-5), 10^seq(1,3));
+    trainArgs = list(X=X, Y=Y, icpt=-1, reg=-1, tol=-1, maxi=100, maxii=-1, verbose=FALSE);
+    [B1, opt] = utils::topk_gridSearch(X=X, y=Y, Xtest=Xtest, ytest=Ytest, train="multiLogReg", predict="accuracy", numB=ncol(X)+1, cv=FALSE, cvk=0,
+      params=params, paramValues=paramRanges, trainArgs=trainArgs, verbose=FALSE);
+    evalFunHp = as.matrix(opt)  
+  }
+  # do the hold out train/test
+  # evalFunHpM = as.matrix(evalFunHp)
+  if(as.scalar(pipList['flags']) != 0)
+  {
+    [X, Y, Xtest, Ytest, Tr] = executePipeline(as.frame(pipList['lp']), as.frame(pipList['ph']), X, Y, Xtest, Ytest, as.matrix(metaList['mask']), as.matrix(metaList['fd']),
+      as.matrix(pipList['hp']), as.scalar(pipList['flags']), TRUE, FALSE)
+  }
   print("min and max of y in eval: "+min(Y)+" "+max(Y))
   if(max(Y) == min(Y)) {
     print("Y contains only one class")
-    accuracy = as.double(0)
   }
   else {
-    if(trainML == 1)
-    {
-      # do the gridsearch for hyper-parameters
-      params = list("icpt", "reg", "tol", "maxii")
-      paramRanges = list(seq(0, 2, 1), 10^seq(1,-4), 10^seq(1,-6), 10^seq(1,3));
-      trainArgs = list(X=rbind(X, Xtest), y=rbind(Y, Ytest), Xtest=Xtest, ytest=Ytest, icpt=-1, reg=-1, tol=-1, maxi=100, maxii=-1, verbose=FALSE);
-      [B1, opt] = utils::topk_gridSearch(X=X, y=Y, train="multiLogReg", predict="W", numB=ncol(X)+1, cv=TRUE, cvk=cv,
-        params=params, paramValues=paramRanges, trainArgs=trainArgs, verbose=FALSE);
-      evalFunHp = as.matrix(opt)  
-    }
-
-    # do the hold out train/test
-    # evalFunHpM = as.matrix(evalFunHp)
-    beta = multiLogReg(X=X, Y=Y, icpt=as.scalar(evalFunHp[1,1]), reg=as.scalar(evalFunHp[1,2]), tol=as.scalar(evalFunHp[1,3]),
-     maxi=as.scalar(evalFunHp[1,4]), maxii=50, verbose=FALSE);
-
-    [prob, yhat, score] = multiLogRegPredict(Xtest, beta, Ytest, FALSE)
+      maxi=as.scalar(evalFunHp[1,4]), maxii=50, verbose=FALSE);
 
+    [prob, yhat, acc] = multiLogRegPredict(Xtest, beta, Ytest, FALSE)
+    score = getAccuracy(Ytest, yhat, TRUE)
   }
-  output = cbind(as.matrix(score), evalFunHp)
-  print("hold out accuracy: "+score)
 
-} 
+  output = cbind(as.matrix(acc), evalFunHp)
+  print("hold out accuracy: "+acc)
+  print("hold out waccuracy: "+score)
+
+}
 
+accuracy = function(Matrix[Double] X, Matrix[Double] y, Matrix[Double] B) return (Matrix[Double] err) {
+  [M,yhat,acc] = multiLogRegPredict(X=X, B=B, Y=y, verbose=TRUE);
+  err = as.matrix(1-(acc/100));
+}
\ No newline at end of file
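Note on the new accuracy UDF: topk_gridSearch minimizes the value returned by its predict function, so the UDF converts a percentage accuracy into an error in [0,1] via err = 1 - acc/100. A minimal usage sketch under that assumption, reusing the names from the hunk above:

  params = list("icpt", "reg", "tol", "maxii")
  paramRanges = list(seq(0, 2, 1), 10^seq(1,-3), 10^seq(1,-5), 10^seq(1,3))
  trainArgs = list(X=X, Y=Y, icpt=-1, reg=-1, tol=-1, maxi=100, maxii=-1, verbose=FALSE)
  [B, opt] = utils::topk_gridSearch(X=X, y=Y, Xtest=Xtest, ytest=Ytest,
    train="multiLogReg", predict="accuracy", numB=ncol(X)+1, cv=FALSE, cvk=0,
    params=params, paramValues=paramRanges, trainArgs=trainArgs, verbose=FALSE)
  evalFunHp = as.matrix(opt)   # hyper-parameters with the smallest error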
diff --git a/src/test/scripts/functions/pipelines/topkcleaningRegressionTest.dml b/src/test/scripts/functions/pipelines/topkcleaningRegressionTest.dml
index fc664c0..b1f0310 100644
--- a/src/test/scripts/functions/pipelines/topkcleaningRegressionTest.dml
+++ b/src/test/scripts/functions/pipelines/topkcleaningRegressionTest.dml
@@ -18,77 +18,54 @@
 # under the License.
 #
 #-------------------------------------------------------------
+
 source("scripts/pipelines/scripts/utils.dml") as utils;
 
 # read the inputs
 F = read($dirtyData, data_type="frame", format="csv", header=TRUE, 
   naStrings= ["NA", "null","  ","NaN", "nan", "", "?", "99999"]);
-# only for salaries data
-F = F[, 2:ncol(F)]
-# metaInfo = read($metaData, data_type="frame", format="csv", header=FALSE);
+F = F[,2:ncol(F)]
 primitives = read($primitives, data_type = "frame", format="csv", header= TRUE)
 param = read($parameters, data_type = "frame", format="csv", header= TRUE)
 topK = $topk
 resources = $rv
 sample=$sample
+output=$output
+testCV = as.logical($testCV)
+trainTestSplit = as.double($split)
+cvk = as.integer($cvk)
+
+split = nrow(F) * trainTestSplit
+evalFunc = "evalRegression"
+if(testCV) {
+  trainData = F[1:split,]
+  testData = as.frame("0")
+}
+else {
+  trainData = F[1:split,]
+  testData = F[split+1:nrow(F),]
+}
 
-split = nrow(F) * 0.7
-trainData = F[1:split,]
-testData = F[split+1:nrow(F),]
+# # # train/test split controlled by $split (hold-out) or $testCV (cross validation)
 
-[topKPipelines, topKHyperParams, topKScores, bestLogical, features, dirtyScore] = topk_cleaning(dataTrain=trainData, dataTest=testData, primitives=primitives, parameters=param,
-  cmr=matrix("4 0.7 1", rows=1, cols=3), evaluationFunc="evalRegression", evalFunHp=as.matrix("0"), topK=topK, resource_val=resources, sample=sample, isLastLabel=TRUE)
+result = topk_cleaning(dataTrain=trainData, dataTest=testData,
+  primitives=primitives, parameters=param, cmr=matrix("2 0.7 1", rows=1, cols=3), evaluationFunc=evalFunc, evalFunHp=matrix("1 1e-3 1e-9 100", rows=1, cols=4),
+  topK=topK, resource_val=resources, cv=testCV, cvk=cvk, sample=sample, isLastLabel=TRUE, correctTypos=FALSE, output=output)
   
      
-print("dirty accuracy "+toString(dirtyScore))  
-print("best logical pipelines "+toString(bestLogical))  
-print("topk pipelines "+toString(topKPipelines))
-print("topk hyper params "+toString(topKHyperParams))
-print("topk  scores: \n"+toString(topKScores))
-perf = as.scalar(topKScores[1, 1]) - dirtyScore
-print("performce improvemnet "+ perf)
-result = dirtyScore < as.scalar(topKScores[1, 1]) 
 write(result, $O)
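For reference, the rewritten test script can also be invoked standalone via the systemds launcher; an illustrative call (all argument values are placeholders, not the ones used by the JUnit harness):

  systemds topkcleaningRegressionTest.dml -nvargs \
    dirtyData=data/dirty.csv primitives=scripts/pipelines/properties/primitives.csv \
    parameters=scripts/pipelines/properties/param.csv topk=3 rv=50 sample=1 \
    output=/tmp/out testCV=FALSE split=0.7 cvk=3 O=/tmp/result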
 
 
-
 # UDF for evaluation  
 # choice of parameters provided by API: X, Y, clone_X, evalFunHp (hyper-param), trainML (boolean: optimize hyper-parameters internally or use the ones passed externally)
-evalRegression = function(Matrix[Double] X, Matrix[Double] Y, Matrix[Double] Xtest, Matrix[Double] Ytest, Matrix[Double] Xorig, List[Unknown] metaList,
-  Matrix[Double] evalFunHp, Integer trainML=0)
-  
-return(Matrix[Double] output)
+evalRegression = function(Matrix[Double] X, Matrix[Double] Y, Matrix[Double] Xtest, Matrix[Double] Ytest, Matrix[Double] Xorig=as.matrix(0),
+  Matrix[Double] evalFunHp, Boolean trainML = FALSE)
+return(Matrix[Double] accuracy)
 {
-  cv = 2
-  mask = as.matrix(metaList['mask'])
-
-  X = utils::dummycoding(replace(target = rbind(X, Xtest), pattern = NaN, replacement=1), mask)
-  Y = rbind(Y, Ytest)
-
-  if(max(Y) == min(Y)) {
-    print("Y contains only one class")
-    accuracy = as.double(0)
-  }
-  else {
-    if(trainML == 1)
-    {
-      # do the gridsearch for hyper-parameters
-      params = list("icpt","reg", "tol", "maxi");
-      paramRanges = list(seq(0,2),10^seq(0,-4), 10^seq(-6,-12), 10^seq(1,3));
-      [B1, opt] = utils::topk_gridSearch(X=X, y=Y, train="lm", predict="wmape",
-        numB=ncol(X)+1, cv=TRUE, params=params, paramValues=paramRanges, verbose=FALSE);
-      evalFunHp = as.matrix(opt)  
-    }
-
-    # do the k-fold cross validation
-    # evalFunHpM = as.matrix(evalFunHp)
-    [accuracyMatrix] = crossV(X, Y, cv, evalFunHp)
-    accuracyMatrix = removeEmpty(target=accuracyMatrix, margin="rows")
-    score =  mean(accuracyMatrix)
-    print(cv +" validation accuracy "+score)
-  }
-  output = cbind(as.matrix(score), evalFunHp)
-
+  beta = lm(X=X, y=Y, icpt=as.scalar(evalFunHp[1,1]), reg=as.scalar(evalFunHp[1,2]), tol=as.scalar(evalFunHp[1,3]),
+    maxi=as.scalar(evalFunHp[1,4]));
+  acc = wmape(Xtest, Ytest, beta, as.scalar(evalFunHp[1,1]))
+  accuracy = (1 - acc)
 }
 
 wmape = function(Matrix[Double] X, Matrix[Double] y, Matrix[Double] B, Integer icpt) return (Matrix[Double] loss) {
@@ -97,42 +74,3 @@ wmape = function(Matrix[Double] X, Matrix[Double] y, Matrix[Double] B, Integer i
   WMAPE = sum(abs(y - pred))/sum(abs(y)) # this gives the loss in the range of [0,1]
   loss = as.matrix(WMAPE) 
 }
-
-
-
-crossV = function(Matrix[Double] X, Matrix[Double] y, Integer k, Matrix[Double] hp) return (Matrix[Double] accuracyMatrix)
-{
-  icpt = as.scalar(hp[1, 1])
-  reg = as.scalar(hp[1, 2])
-  tol = as.scalar(hp[1, 3])
-  maxi = as.scalar(hp[1, 4])
-  M = nrow(X);
-  lim = floor(as.integer(M/k));
-  accuracyMatrix = matrix(0, rows=k, cols=1)
-
-  for (i in 1:k)
-  {
-    testS = ifelse(i==1, 1, ((i-1) * lim)+1)
-    testE = i * lim;
-    testSet = X[testS:testE,];
-    testRes = y[testS:testE,];
-
-    if (i == 1) {
-      trainSet = X[testE+1:M,];
-      trainRes = y[testE+1:M,];
-    }
-    else if(i == k)
-    {
-      trainSet = X[1:testS-1,];
-      trainRes = y[1:testS-1,];
-    }
-    else {
-      trainSet = rbind(X[1:testS-1,], X[testE+1:M,]);
-      trainRes = rbind(y[1:testS-1,], y[testE+1:M,]);
-    }
-    beta = lm(X=trainSet, y=trainRes, icpt=icpt, reg=reg, tol=tol, maxi=maxi);
-    acc = wmape(testSet, testRes, beta, icpt)
-    accuracyMatrix[i] = (1 - acc)
-  }
-}
- 
\ No newline at end of file
