This is an automated email from the ASF dual-hosted git repository. mboehm7 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/systemds.git
commit 57c1643dcb4d94e4c21aba0f87143abdab02e819 Author: Matthias Boehm <[email protected]> AuthorDate: Sat Sep 4 23:01:22 2021 +0200 [SYSTEMDS-3112] Refactoring top-k cleaning pipelines (context obj), I --- scripts/builtin/topk_cleaning.dml | 55 ++++++++++-------- scripts/pipelines/scripts/cleaning.dml | 4 +- scripts/pipelines/scripts/utils.dml | 65 +++++++++------------- .../pipelines/BuiltinTopkEvaluateTest.java | 4 +- 4 files changed, 61 insertions(+), 67 deletions(-) diff --git a/scripts/builtin/topk_cleaning.dml b/scripts/builtin/topk_cleaning.dml index c4d8cf9..d9bdc93 100644 --- a/scripts/builtin/topk_cleaning.dml +++ b/scripts/builtin/topk_cleaning.dml @@ -22,7 +22,6 @@ source("scripts/pipelines/scripts/utils.dml") as utils; source("scripts/pipelines/scripts/enumerateLogical.dml") as lg; - s_topk_cleaning = function(Frame[Unknown] dataTrain, Frame[Unknown] dataTest = as.frame("NULL"), Frame[Unknown] metaData = as.frame("NULL"), Frame[Unknown] primitives, Frame[Unknown] parameters, Matrix[Double] cmr = matrix("4 0.7 1", rows=1, cols=3), String evaluationFunc, Matrix[Double] evalFunHp, Integer topK = 5, Integer resource_val = 20, Double sample = 0.1, Boolean cv=TRUE, Integer cvk = 2, Boolean isLastLabel = TRUE, Boolean correctTypos=FALSE, String output) @@ -30,15 +29,18 @@ s_topk_cleaning = function(Frame[Unknown] dataTrain, Frame[Unknown] dataTest = a # return (Frame[Unknown] topKPipelines, Matrix[Double] topKHyperParams, Matrix[Double] topKScores, Frame[Unknown] bestLogical, # Frame[Unknown] features, Double dirtyScore, Matrix[Double] evalFunHp) { + t1 = time(); print("TopK-Cleaning:"); + Xtest = as.frame("0") Ytest = as.frame("0") - print("starting topk_cleaning") + ctx = list(prefix="----"); #TODO include seed - [schema, mask, fdMask, maskY] = prepareMeta(dataTrain, metaData) - + # prepare meta data # # keeping the meta list format if we decide to add more stuff in metadata + [schema, mask, fdMask, maskY] = prepareMeta(dataTrain, metaData) metaList = list(mask=mask, schema=schema, fd=fdMask) - + t2 = time(); print("-- Cleaning - Prepare Metadata: "+(t2-t1)/1e9+"s"); + # separate the label [Xtrain, Ytrain] = getLabel(dataTrain, isLastLabel) if(!cv) @@ -49,24 +51,31 @@ s_topk_cleaning = function(Frame[Unknown] dataTrain, Frame[Unknown] dataTest = a [eYtrain, M] = transformencode(target=Ytrain, spec= "{ids:true, recode:[1]}"); eYtest = transformapply(target=Ytest, spec= "{ids:true, recode:[1]}", meta=M); } - else - { + else { eYtrain = as.matrix(Ytrain) eYtest = as.matrix(Ytest) } + t3 = time(); print("-- Cleaning - Prepare Labels: "+(t3-t2)/1e9+"s"); # # # when the evaluation function is called first we also compute and keep hyperparams of target application + print("-- Cleaning - Get Dirty Score: "); [dirtyScore, evalFunHp] = getDirtyScore(X=Xtrain, Y=eYtrain, Xtest=Xtest, Ytest=eYtest, evaluationFunc=evaluationFunc, - metaList=metaList, evalFunHp=evalFunHp, sample=sample, trainML=1, cv=cv, cvk=cvk) - + metaList=metaList, evalFunHp=evalFunHp, sample=sample, trainML=1, cv=cv, cvk=cvk, ctx=ctx) + t4 = time(); print("---- finalized in: "+(t4-t3)/1e9+"s"); + # # do the string processing - [Xtrain, Xtest] = runStringPipeline(Xtrain, Xtest, schema, mask, cv, correctTypos) + print("-- Cleaning - Data Preparation (strings, transform, sample): "); + [Xtrain, Xtest] = runStringPipeline(Xtrain, Xtest, schema, mask, cv, correctTypos, ctx) # # if mask has 1s then there are categorical features + print("---- feature transformations to numeric matrix"); [eXtrain, eXtest] = recodeData(Xtrain, Xtest, mask, cv, "recode") # apply sampling on training data for pipeline enumeration + # TODO why recoding/sampling twice (within getDirtyScore) + print("---- class-stratified sampling of feature matrix w/ f="+sample); [eXtrain, eYtrain] = utils::doSample(eXtrain, eYtrain, sample, TRUE) + t5 = time(); print("---- finalized in: "+(t5-t4)/1e9+"s"); # # # create logical pipeline seeds logicalSeedCI = frame([ @@ -109,7 +118,7 @@ s_topk_cleaning = function(Frame[Unknown] dataTrain, Frame[Unknown] dataTest = a [bestLogical, score, T] = lg::enumerateLogical(X=eXtrain, y=eYtrain, Xtest=eXtest, ytest=eYtest, cmr=cmr, cat=category, population=logical[2:nrow(logical)], max_iter=ceil(resource_val/topK), metaList = metaList, evaluationFunc=evaluationFunc, evalFunHp=evalFunHp, primitives=primitives, param=parameters, num_inst=3 , num_exec=2, cv=cv, cvk=cvk, verbose=TRUE) - # # # bestLogical = frame(["MVI", "CI", "SCALE"], rows=1, cols=3) + t6 = time(); print("-- Cleaning - Enum Logical Pipelines: "+(t6-t5)/1e9+"s"); topKPipelines = as.frame("NULL"); topKHyperParams = matrix(0,0,0); topKScores = matrix(0,0,0); features = as.frame("NULL") @@ -117,6 +126,7 @@ s_topk_cleaning = function(Frame[Unknown] dataTrain, Frame[Unknown] dataTest = a perf = bandit(X_train=eXtrain, Y_train=eYtrain, X_test=eXtest, Y_test=eYtest, metaList=metaList, evaluationFunc=evaluationFunc, evalFunHp=evalFunHp, lp=bestLogical, primitives=primitives, param=parameters, baseLineScore=dirtyScore, k=topK, R=resource_val, cv=cv, output=output, verbose=TRUE); + t7 = time(); print("-- Cleaning - Enum Physical Pipelines: "+(t7-t6)/1e9+"s"); } prepareMeta = function(Frame[Unknown] data, Frame[Unknown] metaData) @@ -160,45 +170,46 @@ return(Frame[Unknown] X, Frame[Unknown] Y) } runStringPipeline = function(Frame[Unknown] Xtrain, Frame[Unknown] Xtest, Frame[String] schema, - Matrix[Double] mask, Boolean cv, Boolean correctTypos = FALSE) + Matrix[Double] mask, Boolean cv, Boolean correctTypos = FALSE, List[Unknown] ctx) return(Frame[Unknown] Xtrain, Frame[Unknown] Xtest) { if(cv) - Xtrain = utils::stringProcessing(data=Xtrain, mask=mask, schema=schema, CorrectTypos=correctTypos) + Xtrain = utils::stringProcessing(data=Xtrain, mask=mask, schema=schema, CorrectTypos=correctTypos, ctx=ctx) else { # # # binding train and test to use same dictionary for both - XAll = utils::stringProcessing(data=rbind(Xtrain, Xtest), mask=mask, schema=schema, CorrectTypos=correctTypos) + XAll = utils::stringProcessing(data=rbind(Xtrain, Xtest), mask=mask, schema=schema, CorrectTypos=correctTypos, ctx=ctx) Xtrain = XAll[1:nrow(Xtrain),] Xtest = XAll[nrow(Xtrain)+1:nrow(XAll),] } } getDirtyScore = function(Frame[Unknown] X, Matrix[Double] Y, Frame[Unknown] Xtest, Matrix[Double] Ytest, String evaluationFunc, List[Unknown] metaList, - Matrix[Double] evalFunHp, Double sample, Integer trainML, Boolean cv, Integer cvk) + Matrix[Double] evalFunHp, Double sample, Integer trainML, Boolean cv, Integer cvk, List[Unknown] ctx=list() ) return(Double dirtyScore, Matrix[Double] evalFunHp) { + prefix = as.scalar(ctx["prefix"]); mask = as.matrix(metaList['mask']) [eXtrain, eXtest] = recodeData(X, Xtest, mask, cv, "recode") eXtrain = replace(target=eXtrain, pattern=NaN, replacement = 0) eXtest = replace(target=eXtest, pattern=NaN, replacement = 0) dirtyScore = 100 - # # # sample data + print(prefix+" sample from train data and dummy code"); [eXtrain, Ytrain] = utils::doSample(eXtrain, Y, sample, TRUE) [eXtrain, eXtest] = recodeData(as.frame(eXtrain), as.frame(eXtest), mask, cv, "dummycode") pipList = list(lp = as.frame("NULL"), ph = as.frame("NULL"), hp = as.matrix(0), flags = 0) - if(cv) - { - score = crossV(X=eXtrain, y=Ytrain, cvk=cvk, evalFunHp=evalFunHp, pipList=pipList, metaList=metaList, evalFunc=evaluationFunc, trainML = 1) + + print(prefix+" hyper-parameter tuning"); + if(cv) { + score = crossV(X=eXtrain, y=Ytrain, cvk=cvk, evalFunHp=evalFunHp, + pipList=pipList, metaList=metaList, evalFunc=evaluationFunc, trainML = 1) } - else - { + else { score = eval(evaluationFunc, list(X=eXtrain, Y=Ytrain, Xtest=eXtest, Ytest=Ytest, Xorig=as.matrix(0), evalFunHp=evalFunHp, trainML = 1)) } dirtyScore = as.scalar(score[1, 1]) evalFunHp = score[1, 2:ncol(score)] - # evalFunHp = scoreAndHp[1, 2:ncol(scoreAndHp)] } recodeData = function(Frame[Unknown] Xtrain, Frame[Unknown] Xtest, Matrix[Double] mask, Boolean cv, String code) diff --git a/scripts/pipelines/scripts/cleaning.dml b/scripts/pipelines/scripts/cleaning.dml index 4557f07..73200bd 100644 --- a/scripts/pipelines/scripts/cleaning.dml +++ b/scripts/pipelines/scripts/cleaning.dml @@ -97,7 +97,7 @@ startCleaning = function(Frame[Unknown] F, Frame[Unknown] logical, String target paramRanges = list(10^seq(0,-10), seq(10,100, 10)); [opt, loss] = gridSearchMLR(X_train, y_train, X_test, y_test, - "multiLogReg", "lossFunc", params, paramRanges, FALSE); + "multiLogReg", "lossFunc", params, paramRanges, FALSE); d_accuracy = classifyDirty(X_train, y_train, opt, getMask, isWeighted, cv) # [eX, eY] = prioritise(eX, eY, getMask) @@ -493,7 +493,6 @@ crossV = function(Matrix[double] X, Matrix[double] y, Integer k, Matrix[Double] Matrix[Double] MLhp, Boolean isWeighted) return (Matrix[Double] accuracyMatrix) { - accuracyMatrix = matrix(0, k, 1) dataList = list() @@ -526,7 +525,6 @@ return (Matrix[Double] accuracyMatrix) dataList = append(dataList, fold_i) fold_idxes[, 1] = fold_idxes[, 2] + 1 fold_idxes[, 2] += ins_per_fold - while(FALSE){} } for(i in seq(1,k)) diff --git a/scripts/pipelines/scripts/utils.dml b/scripts/pipelines/scripts/utils.dml index f6d3d01..05d22a8 100644 --- a/scripts/pipelines/scripts/utils.dml +++ b/scripts/pipelines/scripts/utils.dml @@ -60,24 +60,26 @@ doSample = function(Matrix[Double] eX, Matrix[Double] eY, Double ratio, Boolean { MIN_SAMPLE = 1000 sampled = floor(nrow(eX) * ratio) - sample = ifelse(sampled > MIN_SAMPLE, TRUE, FALSE) - dist = table(eY, 1) - dist = nrow(dist) - if(sample) + sampledX = eX + sampledY = eY + + if(sampled > MIN_SAMPLE) { + dist = max(eY) # num classes (one-hot encoded eY) + if((nrow(eY) > 1) & (dist < 10)) # for classification { XY = order(target = cbind(eY, eX), by = 1, decreasing=FALSE, index.return=FALSE) - # get the class count + # get the class count classes = table(eY, 1) + # TODO vectorize extraction compute extraction vector start_class = 1 out_s = 1 out_e = 0 end_class = 0 out = matrix(0, sampled, ncol(XY)) classes_ratio = floor(classes*ratio) - for(i in 1:nrow(classes)) - { + for(i in 1:nrow(classes)) { end_class = end_class + as.scalar(classes[i]) class_t = XY[start_class:end_class, ] out_e = out_e + as.scalar(classes_ratio[i]) @@ -89,28 +91,15 @@ doSample = function(Matrix[Double] eX, Matrix[Double] eY, Double ratio, Boolean sampledY = out[, 1] sampledX = out[, 2:ncol(out)] } - else if(nrow(eY) > 1 & (dist > 10)) # regression - { + else if(nrow(eY) > 1 & (dist > 10)) { # regression sampledX = eX[1:sampled, ] sampledY = eY[1:sampled, ] } - else if(nrow(eY) == 1) - { + else if(nrow(eY) == 1) { # TODO ? sampledX = eX[1:sampled, ] sampledY = eY } - else { - sampledX = eX - sampledY = eY - } } - else - { - sampledX = eX - sampledY = eY - } - if(verbose) - print("AFTER SAMPLING: "+nrow(eX)) } # ####################################################################### @@ -154,29 +143,26 @@ return(Boolean validForResources) validForResources = count > 0 } -stringProcessing = function(Frame[Unknown] data, Matrix[Double] mask, Frame[String] schema, Boolean CorrectTypos) +stringProcessing = function(Frame[Unknown] data, Matrix[Double] mask, + Frame[String] schema, Boolean CorrectTypos, List[Unknown] ctx = list(prefix="--")) return(Frame[Unknown] processedData) { + prefix = as.scalar(ctx["prefix"]); # step 1 drop invalid types + print(prefix+" drop values with type mismatch"); data = dropInvalidType(data, schema) # step 2 do the case transformations + print(prefix+" convert strings to lower case"); for(i in 1:ncol(mask)) - { if(as.scalar(schema[1,i]) == "STRING") - { - lowerCase = map(data[, i], "x -> x.toLowerCase()") - data[, i] = lowerCase - } - - } - + data[, i] = map(data[, i], "x -> x.toLowerCase()") + if(CorrectTypos) { - # recode data to get null mask - if(sum(mask) > 0) - { + # recode data to get null mask + if(sum(mask) > 0) { # always recode the label index = vectorToCsv(mask) jspecR = "{ids:true, recode:["+index+"]}" @@ -186,18 +172,19 @@ return(Frame[Unknown] processedData) else eX = as.matrix(data) nullMask = is.na(eX) - print("starting correctTypos ") + print(prefix+" correct typos in strings"); # fix the typos for(i in 1:ncol(schema)) - { if(as.scalar(schema[1,i]) == "STRING") data[, i] = correctTypos(data[, i], nullMask[, i], 0.2, 0.9, FALSE, TRUE, FALSE); - } - # print("after correctTypos "+toString(data, rows=5)) } + print(prefix+" porter-stemming on all features"); data = map(data, "x -> PorterStemmer.stem(x)") + # TODO add deduplication + print(prefix+" deduplication via entity resolution"); + processedData = data } @@ -238,7 +225,7 @@ topk_gridSearch = function(Matrix[Double] X, Matrix[Double] y, Matrix[Double] Xt # Step 2) materialize hyper-parameter combinations # (simplify debugging and compared to compute negligible) HP = matrix(0, numConfigs, numParams); - for( i in 1:nrow(HP) ) { + parfor( i in 1:nrow(HP) ) { for( j in 1:numParams ) HP[i,j] = paramVals[j,as.scalar(((i-1)/cumLens[j,1])%%paramLens[j,1]+1)]; } diff --git a/src/test/java/org/apache/sysds/test/functions/pipelines/BuiltinTopkEvaluateTest.java b/src/test/java/org/apache/sysds/test/functions/pipelines/BuiltinTopkEvaluateTest.java index acfd032..a5bb997 100644 --- a/src/test/java/org/apache/sysds/test/functions/pipelines/BuiltinTopkEvaluateTest.java +++ b/src/test/java/org/apache/sysds/test/functions/pipelines/BuiltinTopkEvaluateTest.java @@ -49,9 +49,7 @@ public class BuiltinTopkEvaluateTest extends AutomatedTestBase { } private void evalPip(double split, String cv, String path, Types.ExecMode et) { - - setOutputBuffering(true); - String HOME = SCRIPT_DIR+"functions/pipelines/" ; + String HOME = SCRIPT_DIR+"functions/pipelines/"; Types.ExecMode modeOld = setExecMode(et); try { loadTestConfiguration(getTestConfiguration(TEST_NAME1));
