This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemml.git
The following commit(s) were added to refs/heads/master by this push: new 8cbc85a [SYSTEMDS-253] Distributed slice finding (task/data parallel, fixes) 8cbc85a is described below commit 8cbc85a949b3699cde8ed3cf3e3abec6a27fbc60 Author: gilgenbergg <lanasv...@gmail.com> AuthorDate: Sun May 3 17:17:57 2020 +0200 [SYSTEMDS-253] Distributed slice finding (task/data parallel, fixes) Closes #881. --- docs/Tasks.txt | 3 +- scripts/staging/hmm/HMM.py | 2 - scripts/staging/slicing/__init__.py | 0 scripts/staging/slicing/base/Bucket.py | 168 +++++++++++++++++++++ .../staging/slicing/base/{node.py => SparkNode.py} | 79 ++++++---- scripts/staging/slicing/base/__init__.py | 0 scripts/staging/slicing/base/node.py | 28 +++- scripts/staging/slicing/base/slicer.py | 135 +++++++++-------- .../base/tests/classification/test_adult.py | 101 ------------- .../slicing/base/tests/classification/test_iris.py | 88 ----------- .../base/tests/regression/test_insurance.py | 81 ---------- .../slicing/base/tests/regression/test_salary.py | 87 ----------- scripts/staging/slicing/base/top_k.py | 7 +- scripts/staging/slicing/base/union_slicer.py | 78 ++++------ .../slicing/spark_modules/join_data_parallel.py | 120 +++++++++++++++ .../staging/slicing/spark_modules/spark_slicer.py | 100 ++++++++++++ .../slicing/spark_modules/spark_union_slicer.py | 70 +++++++++ .../staging/slicing/spark_modules/spark_utils.py | 141 +++++++++++++++++ .../slicing/spark_modules/union_data_parallel.py | 119 +++++++++++++++ scripts/staging/slicing/tests/__init__.py | 0 .../slicing/tests/classification/__init__.py | 0 .../slicing/tests/classification/sparked_adults.py | 118 +++++++++++++++ .../slicing/tests/classification/test_adult.py | 121 +++++++++++++++ .../slicing/tests/classification/test_iris.py | 109 +++++++++++++ .../staging/slicing/tests/regression/__init__.py | 0 .../slicing/tests/regression/bd_spark_salary.py | 131 ++++++++++++++++ .../slicing/tests/regression/spark_salary.py | 123 +++++++++++++++ .../slicing/tests/regression/test_insurance.py | 103 +++++++++++++ .../slicing/tests/regression/test_salary.py | 104 +++++++++++++ 29 files changed, 1717 insertions(+), 499 deletions(-) diff --git a/docs/Tasks.txt b/docs/Tasks.txt index 9d2dba4..9fa9a6f 100644 --- a/docs/Tasks.txt +++ b/docs/Tasks.txt @@ -203,7 +203,8 @@ SYSTEMDS-240 GPU Backend Improvements SYSTEMDS-250 Extended Slice Finding * 251 Alternative slice enumeration approach OK - * 252 Initial data slicing implementation Python + * 252 Initial data slicing implementation Python OK + * 253 Distributed slicing algorithms (task/data parallel) OK SYSTEMDS-260 Misc Tools * 261 Stable marriage algorithm OK diff --git a/scripts/staging/hmm/HMM.py b/scripts/staging/hmm/HMM.py index 61fa0d0..d9eb187 100644 --- a/scripts/staging/hmm/HMM.py +++ b/scripts/staging/hmm/HMM.py @@ -19,8 +19,6 @@ # #------------------------------------------------------------- -#Author: Afan Secic - from bs4 import BeautifulSoup,SoupStrainer import nltk from nltk.tokenize import sent_tokenize, word_tokenize diff --git a/scripts/staging/slicing/__init__.py b/scripts/staging/slicing/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/scripts/staging/slicing/base/Bucket.py b/scripts/staging/slicing/base/Bucket.py new file mode 100644 index 0000000..0277f6d --- /dev/null +++ b/scripts/staging/slicing/base/Bucket.py @@ -0,0 +1,168 @@ +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +#------------------------------------------------------------- + +class Bucket: + + key: [] + attributes: [] + name: "" + error: float + e_upper: float + size: float + sum_error: float + max_tuple_error: float + s_upper: float + s_lower: float + e_max: float + e_max_upper: float + score: float + c_upper: float + parents: [] + x_size: int + loss: float + w: float + + def __init__(self, node, cur_lvl, w, x_size, loss): + self.attributes = [] + self.parents = [] + self.sum_error = 0 + self.size = 0 + self.score = 0 + self.error = 0 + self.max_tuple_error = 0 + self.x_size = x_size + self.w = w + self.loss = loss + if cur_lvl == 0: + self.key = node + self.attributes.append(node) + else: + self.attributes = node.attributes + self.key = node.attributes + self.name = self.make_name() + self.__hash__() + + def __hash__(self): + return hash(self.name) + + def __add__(self, other): + self.size += other.size + self.sum_error += other.sum_error + return self + + def combine_with(self, other): + self.size = max(self.size, other.size) + self.sum_error = max(self.sum_error, other.sum_error) + return self + + def minimize_bounds(self, other): + minimized = min(self.s_upper, other.s_upper) + self.s_upper = minimized + minimized = min(other.s_lower, self.s_lower) + self.s_lower = minimized + minimized = min(other.e_upper, self.e_upper) + self.e_upper = minimized + minimized = min(other.e_max_upper, self.e_max_upper) + self.e_max_upper = minimized + c_upper = self.calc_c_upper(self.w, self.x_size, self.loss) + minimized = min(c_upper, self.c_upper) + self.c_upper = minimized + + def __eq__(self, other): + return ( + self.__hash__ == other.__hash__ and self.key == other.key) + + def update_metrics(self, row, loss_type): + self.size += 1 + if loss_type == 0: + self.sum_error += row[2] + if row[2] > self.max_tuple_error: + self.max_tuple_error = row[2] + else: + if row[2] != 0: + self.sum_error += 1 + + def calc_error(self): + if self.size != 0: + self.error = self.sum_error / self.size + else: + self.error = 0 + self.e_max = self.max_tuple_error + self.e_max_upper = self.e_max + self.e_upper = self.error + + def check_constraint(self, top_k, x_size, alpha): + return self.score >= top_k.min_score and self.size >= x_size / alpha + + def make_name(self): + name = "" + for attribute in self.attributes: + name = name + str(attribute) + " && " + name = name[0: len(name) - 4] + return name + + def calc_bounds(self, w, x_size, loss): + self.s_upper = self.calc_s_upper() + self.s_lower = self.calc_s_lower(x_size) + self.e_upper = self.calc_e_upper() + self.e_max_upper = self.calc_e_max_upper() + self.c_upper = self.calc_c_upper(w, x_size, loss) + + def check_bounds(self, x_size, alpha, top_k): + return self.s_upper >= x_size / alpha and self.c_upper >= top_k.min_score + + def calc_s_upper(self): + cur_min = self.parents[0].size + for 
parent in self.parents: + cur_min = min(cur_min, parent.s_upper) + return cur_min + + def calc_e_max_upper(self): + e_max_min = self.parents[0].e_max_upper + for parent in self.parents: + e_max_min = min(e_max_min, parent.e_max_upper) + return e_max_min + + def calc_s_lower(self, x_size): + size_value = x_size + for parent in self.parents: + size_value = size_value - (size_value - parent.s_lower) + return max(size_value, 1) + + def calc_e_upper(self): + prev_e_uppers = [] + for parent in self.parents: + prev_e_uppers.append(parent.e_upper) + return float(min(prev_e_uppers)) + + def calc_c_upper(self, w, x_size, loss): + upper_score = w * (self.e_upper / self.s_lower) / (loss / x_size) + ( + 1 - w) * self.s_upper + return float(upper_score) + + def print_debug(self, topk): + print("new node has been created: " + self.make_name() + "\n") + print("s_upper = " + str(self.s_upper)) + print("s_lower = " + str(self.s_lower)) + print("e_upper = " + str(self.e_upper)) + print("c_upper = " + str(self.c_upper)) + print("current topk min score = " + str(topk.min_score)) + print("-------------------------------------------------------------------------------------") diff --git a/scripts/staging/slicing/base/node.py b/scripts/staging/slicing/base/SparkNode.py similarity index 70% copy from scripts/staging/slicing/base/node.py copy to scripts/staging/slicing/base/SparkNode.py index 6acd65a..fbaa0bd 100644 --- a/scripts/staging/slicing/base/node.py +++ b/scripts/staging/slicing/base/SparkNode.py @@ -19,7 +19,7 @@ # #------------------------------------------------------------- -class Node: +class SparkNode: error: float name: "" attributes: [] @@ -36,34 +36,33 @@ class Node: e_max_upper: float key: "" - def __init__(self, all_features, model, complete_x, loss, x_size, y_test, preds): - self.error = loss, + def __init__(self, loss, preds): + if loss: + self.error = loss, + if preds: + self.preds = preds self.parents = [] self.attributes = [] self.size = 0 self.score = 0 - self.model = model - self.complete_x = complete_x self.loss = 0 - self.x_size = x_size - self.preds = preds self.s_lower = 1 - self.y_test = y_test - self.all_features = all_features self.key = '' def calc_c_upper(self, w): - upper_score = w * (self.e_upper / self.s_lower) / (float(self.error[0]) / self.x_size) + (1 - w) * self.s_upper + upper_score = w * (self.e_upper / self.s_lower) / (self.error[0] / self.preds[0][0].size) + (1 - w) * self.s_upper return float(upper_score) def make_slice_mask(self): mask = [] for feature in self.attributes: - mask.append(feature[1]) + mask.append(feature) return mask def process_slice(self, loss_type): mask = self.make_slice_mask() + print("mask") + print(mask) if loss_type == 0: self.calc_l2(mask) if loss_type == 1: @@ -73,15 +72,15 @@ class Node: self.e_max = 1 size = 0 mistakes = 0 - for row in self.complete_x: + for row in self.preds: flag = True for attr in mask: - if row[1][attr] == 0: + if attr not in row[0].indices: flag = False if flag: size = size + 1 - if self.y_test[row[0]][1] != self.preds[row[0]][1]: - mistakes = mistakes + 1 + if row[1] == 0: + mistakes += 1 self.size = size if size != 0: self.loss = mistakes / size @@ -93,23 +92,25 @@ class Node: max_tuple_error = 0 sum_error = 0 size = 0 - for row in self.complete_x: + for row in self.preds: flag = True for attr in mask: - if row[1][attr] == 0: + if attr not in row[0].indices: flag = False if flag: size = size + 1 - if float(self.preds[row[0]][1]) > max_tuple_error: - max_tuple_error = float(self.preds[row[0]][1]) - sum_error = 
sum_error + float(self.preds[row[0]][1]) + if row[1] > max_tuple_error: + max_tuple_error = row[1] + sum_error = sum_error + row[1] self.e_max = max_tuple_error self.e_upper = max_tuple_error + self.e_max_upper = max_tuple_error if size != 0: self.loss = sum_error/size else: self.loss = 0 self.size = size + self.s_upper = size def calc_s_upper(self, cur_lvl): cur_min = self.parents[0].size @@ -132,10 +133,10 @@ class Node: e_max_min = min(e_max_min, parent.e_max_upper) return e_max_min - def calc_s_lower(self, cur_lvl): - size_value = self.x_size + def calc_s_lower(self): + size_value = len(self.preds) for parent in self.parents: - size_value = size_value - (self.x_size - parent.s_lower) + size_value = size_value - (size_value - parent.s_lower) return max(size_value, 1) def calc_e_upper(self): @@ -146,7 +147,7 @@ class Node: def calc_bounds(self, cur_lvl, w): self.s_upper = self.calc_s_upper(cur_lvl) - self.s_lower = self.calc_s_lower(cur_lvl) + self.s_lower = self.calc_s_lower() self.e_upper = self.calc_e_upper() self.e_max_upper = self.calc_e_max_upper(cur_lvl) self.c_upper = self.calc_c_upper(w) @@ -154,12 +155,12 @@ class Node: def make_name(self): name = "" for attribute in self.attributes: - name = name + str(attribute[0]) + " && " + name = name + str(attribute) + " && " name = name[0: len(name) - 4] return name - def make_key(self, new_id): - return new_id, self.name + def make_key(self): + return self.name def check_constraint(self, top_k, x_size, alpha): return self.score >= top_k.min_score and self.size >= x_size / alpha @@ -167,6 +168,30 @@ class Node: def check_bounds(self, top_k, x_size, alpha): return self.s_upper >= x_size / alpha and self.c_upper >= top_k.min_score + def update_bounds(self, s_upper, s_lower, e_upper, e_max_upper, w): + try: + minimized = min(s_upper, self.s_upper) + self.s_upper = minimized + minimized = min(s_lower, self.s_lower) + self.s_lower = minimized + minimized = min(e_upper, self.e_upper) + self.e_upper = minimized + minimized = min(e_max_upper, self.e_max_upper) + self.e_max_upper = minimized + c_upper = self.calc_c_upper(w) + minimized = min(c_upper, self.c_upper) + self.c_upper = minimized + except AttributeError: + # initial bounds calculation + self.s_upper = s_upper + self.s_lower = s_lower + self.e_upper = e_upper + self.e_max_upper = e_max_upper + c_upper = self.calc_c_upper(w) + self.c_upper = c_upper + minimized = min(c_upper, self.c_upper) + self.c_upper = minimized + def print_debug(self, topk, level): print("new node has been created: " + self.make_name() + "\n") if level >= 1: diff --git a/scripts/staging/slicing/base/__init__.py b/scripts/staging/slicing/base/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/scripts/staging/slicing/base/node.py b/scripts/staging/slicing/base/node.py index 6acd65a..33091a5 100644 --- a/scripts/staging/slicing/base/node.py +++ b/scripts/staging/slicing/base/node.py @@ -36,20 +36,18 @@ class Node: e_max_upper: float key: "" - def __init__(self, all_features, model, complete_x, loss, x_size, y_test, preds): + def __init__(self, complete_x, loss, x_size, y_test, preds): self.error = loss, self.parents = [] self.attributes = [] self.size = 0 self.score = 0 - self.model = model self.complete_x = complete_x self.loss = 0 self.x_size = x_size self.preds = preds self.s_lower = 1 self.y_test = y_test - self.all_features = all_features self.key = '' def calc_c_upper(self, w): @@ -167,6 +165,30 @@ class Node: def check_bounds(self, top_k, x_size, alpha): return self.s_upper >= x_size / alpha 
and self.c_upper >= top_k.min_score + def update_bounds(self, s_upper, s_lower, e_upper, e_max_upper, w): + try: + minimized = min(s_upper, self.s_upper) + self.s_upper = minimized + minimized = min(s_lower, self.s_lower) + self.s_lower = minimized + minimized = min(e_upper, self.e_upper) + self.e_upper = minimized + minimized = min(e_max_upper, self.e_max_upper) + self.e_max_upper = minimized + c_upper = self.calc_c_upper(w) + minimized = min(c_upper, self.c_upper) + self.c_upper = minimized + except AttributeError: + # initial bounds calculation + self.s_upper = s_upper + self.s_lower = s_lower + self.e_upper = e_upper + self.e_max_upper = e_max_upper + c_upper = self.calc_c_upper(w) + self.c_upper = c_upper + minimized = min(c_upper, self.c_upper) + self.c_upper = minimized + def print_debug(self, topk, level): print("new node has been created: " + self.make_name() + "\n") if level >= 1: diff --git a/scripts/staging/slicing/base/slicer.py b/scripts/staging/slicing/base/slicer.py index 4bc3415..be967b6 100644 --- a/scripts/staging/slicing/base/slicer.py +++ b/scripts/staging/slicing/base/slicer.py @@ -19,8 +19,8 @@ # #------------------------------------------------------------- -from slicing.node import Node -from slicing.top_k import Topk +from slicing.base.node import Node +from slicing.base.top_k import Topk # optimization function calculation: @@ -33,7 +33,7 @@ def opt_fun(fi, si, f, x_size, w): # slice_name_nonsense function defines if combination of nodes on current level is fine or impossible: -# there is a dependency between common nodes' attributes number and current level is such that: +# there is dependency between common nodes' attributes number and current level is such that: # commons == cur_lvl - 1 # valid combination example: node ABC + node BCD (on 4th level) // three attributes nodes have two common attributes # invalid combination example: node ABC + CDE (on 4th level) // result node - ABCDE (absurd for 4th level) @@ -51,21 +51,13 @@ def union(lst1, lst2): return final_list -# alpha is size significance coefficient (required for optimization function) -# verbose option is for returning debug info while creating slices and printing it (in console) -# k is number of top-slices we want to receive as a result (maximum output, if less all of subsets will be printed) -# w is a weight of error function significance (1 - w) is a size significance propagated into optimization function -# loss_type = 0 (in case of regression model) -# loss_type = 1 (in case of classification model) -def process(all_features, model, complete_x, loss, x_size, y_test, errors, debug, alpha, k, w, loss_type): - counter = 0 - # First level slices are enumerated in a "classic way" (getting data and not analyzing bounds +def make_first_level(all_features, complete_x, loss, x_size, y_test, errors, loss_type, top_k, alpha, w): first_level = [] - levels = [] + counter = 0 all_nodes = {} - top_k = Topk(k) + # First level slices are enumerated in a "classic way" (getting data and not analyzing bounds for feature in all_features: - new_node = Node(all_features, model, complete_x, loss, x_size, y_test, errors) + new_node = Node(complete_x, loss, x_size, y_test, errors) new_node.parents = [(feature, counter)] new_node.attributes.append((feature, counter)) new_node.name = new_node.make_name() @@ -82,11 +74,69 @@ def process(all_features, model, complete_x, loss, x_size, y_test, errors, debug # this method updates top k slices if needed top_k.add_new_top_slice(new_node) counter = counter + 1 - 
levels.append(first_level) + return first_level, all_nodes + +def join_enum(node_i, prev_lvl, complete_x, loss, x_size, y_test, errors, debug, alpha, w, loss_type, b_update, cur_lvl, + all_nodes, top_k, cur_lvl_nodes): + for node_j in range(len(prev_lvl)): + flag = slice_name_nonsense(prev_lvl[node_i], prev_lvl[node_j], cur_lvl) + if not flag and prev_lvl[node_j].key[0] > prev_lvl[node_i].key[0]: + new_node = Node(complete_x, loss, x_size, y_test, errors) + parents_set = set(new_node.parents) + parents_set.add(prev_lvl[node_i]) + parents_set.add(prev_lvl[node_j]) + new_node.parents = list(parents_set) + parent1_attr = prev_lvl[node_i].attributes + parent2_attr = prev_lvl[node_j].attributes + new_node_attr = union(parent1_attr, parent2_attr) + new_node.attributes = new_node_attr + new_node.name = new_node.make_name() + new_id = len(all_nodes) + new_node.key = new_node.make_key(new_id) + if new_node.key[1] in all_nodes: + existing_item = all_nodes[new_node.key[1]] + parents_set = set(existing_item.parents) + existing_item.parents = parents_set + if b_update: + s_upper = new_node.calc_s_upper(cur_lvl) + s_lower = new_node.calc_s_lower(cur_lvl) + e_upper = new_node.calc_e_upper() + e_max_upper = new_node.calc_e_max_upper(cur_lvl) + new_node.update_bounds(s_upper, s_lower, e_upper, e_max_upper, w) + else: + new_node.calc_bounds(cur_lvl, w) + all_nodes[new_node.key[1]] = new_node + # check if concrete data should be extracted or not (only for those that have score upper + # big enough and if size of subset is big enough + to_slice = new_node.check_bounds(top_k, x_size, alpha) + if to_slice: + new_node.process_slice(loss_type) + new_node.score = opt_fun(new_node.loss, new_node.size, loss, x_size, w) + # we decide to add node to current level nodes (in order to make new combinations + # on the next one or not basing on its score value + if new_node.check_constraint(top_k, x_size, alpha) and new_node.key not in top_k.keys: + top_k.add_new_top_slice(new_node) + cur_lvl_nodes.append(new_node) + if debug: + new_node.print_debug(top_k, cur_lvl) + return cur_lvl_nodes, all_nodes + + +# alpha is size significance coefficient (required for optimization function) +# verbose option is for returning debug info while creating slices and printing it (in console) +# k is number of top-slices we want to receive as a result (maximum output, if less all of subsets will be printed) +# w is a weight of error function significance (1 - w) is a size significance propagated into optimization function +# loss_type = 0 (in case of regression model) +# loss_type = 1 (in case of classification model) +def process(all_features, complete_x, loss, x_size, y_test, errors, debug, alpha, k, w, loss_type, b_update): + levels = [] + top_k = Topk(k) + first_level = make_first_level(all_features, complete_x, loss, x_size, y_test, errors, loss_type, top_k, alpha, w) + all_nodes = first_level[1] + levels.append(first_level[0]) # cur_lvl - index of current level, correlates with number of slice forming features cur_lvl = 1 # currently filled level after first init iteration - # currently for debug print("Level 1 had " + str(len(all_features)) + " candidates") print() @@ -95,52 +145,17 @@ def process(all_features, model, complete_x, loss, x_size, y_test, errors, debug # combining each candidate of previous level with every till it becomes useless (one node can't make a pair) while len(levels[cur_lvl - 1]) > 1: cur_lvl_nodes = [] - for node_i in range(len(levels[cur_lvl - 1]) - 1): - for node_j in range(node_i + 1, len(levels[cur_lvl - 
1])): - flag = slice_name_nonsense(levels[cur_lvl - 1][node_i], levels[cur_lvl - 1][node_j], cur_lvl) - if not flag: - new_node = Node(all_features, model, complete_x, loss, x_size, y_test, errors) - parents_set = set(new_node.parents) - parents_set.add(levels[cur_lvl - 1][node_i]) - parents_set.add(levels[cur_lvl - 1][node_j]) - new_node.parents = list(parents_set) - parent1_attr = levels[cur_lvl - 1][node_i].attributes - parent2_attr = levels[cur_lvl - 1][node_j].attributes - new_node_attr = union(parent1_attr, parent2_attr) - new_node.attributes = new_node_attr - new_node.name = new_node.make_name() - new_id = len(all_nodes) - new_node.key = new_node.make_key(new_id) - if new_node.key[1] in all_nodes: - existing_item = all_nodes[new_node.key[1]] - parents_set = set(existing_item.parents) - existing_item.parents = parents_set - else: - new_node.calc_bounds(cur_lvl, w) - all_nodes[new_node.key[1]] = new_node - # check if concrete data should be extracted or not (only for those that have score upper - # big enough and if size of subset is big enough - to_slice = new_node.check_bounds(top_k, x_size, alpha) - if to_slice: - new_node.process_slice(loss_type) - new_node.score = opt_fun(new_node.loss, new_node.size, loss, x_size, w) - # we decide to add node to current level nodes (in order to make new combinations - # on the next one or not basing on its score value - if new_node.check_constraint(top_k, x_size, alpha) and new_node.key not in top_k.keys: - cur_lvl_nodes.append(new_node) - top_k.add_new_top_slice(new_node) - elif new_node.check_bounds(top_k, x_size, alpha): - cur_lvl_nodes.append(new_node) - else: - if new_node.check_bounds(top_k, x_size, alpha): - cur_lvl_nodes.append(new_node) - if debug: - new_node.print_debug(top_k, cur_lvl) - print("Level " + str(cur_lvl + 1) + " had " + str(len(levels[cur_lvl - 1]) * (len(levels[cur_lvl - 1]) - 1)) + - " candidates but after pruning only " + str(len(cur_lvl_nodes)) + " go to the next level") + prev_lvl = levels[cur_lvl - 1] + for node_i in range(len(prev_lvl)): + partial = join_enum(node_i, prev_lvl, complete_x, loss, x_size, y_test, errors, debug, alpha, w, loss_type, + b_update, cur_lvl, all_nodes, top_k, cur_lvl_nodes) + cur_lvl_nodes = partial[0] + all_nodes = partial[1] cur_lvl = cur_lvl + 1 levels.append(cur_lvl_nodes) top_k.print_topk() + print("Level " + str(cur_lvl) + " had " + str(len(prev_lvl) * (len(prev_lvl) - 1)) + + " candidates but after pruning only " + str(len(cur_lvl_nodes)) + " go to the next level") print("Program stopped at level " + str(cur_lvl + 1)) print() print("Selected slices are: ") diff --git a/scripts/staging/slicing/base/tests/classification/test_adult.py b/scripts/staging/slicing/base/tests/classification/test_adult.py deleted file mode 100644 index 247d5c4..0000000 --- a/scripts/staging/slicing/base/tests/classification/test_adult.py +++ /dev/null @@ -1,101 +0,0 @@ -#------------------------------------------------------------- -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. 
You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -#------------------------------------------------------------- - -import pandas as pd -from sklearn.preprocessing import OneHotEncoder - -import slicing.slicer as slicer -from sklearn.ensemble import RandomForestClassifier -from sklearn import preprocessing -from sklearn.model_selection import train_test_split - - -dataset = pd.read_csv('adult.csv') -attributes_amount = len(dataset.values[0]) -x = dataset.iloc[:, 0:attributes_amount - 1].values -# enc = OneHotEncoder(handle_unknown='ignore') -# x = enc.fit_transform(x).toarray() -y = dataset.iloc[:, attributes_amount - 1] -le = preprocessing.LabelEncoder() -le.fit(y) -y = le.transform(y) -complete_x = [] -complete_y = [] -counter = 0 -all_indexes = [] -not_encoded_columns = [ - "Age", "WorkClass", "fnlwgt", "Education", "EducationNum", - "MaritalStatus", "Occupation", "Relationship", "Race", "Gender", - "CapitalGain", "CapitalLoss", "HoursPerWeek", "NativeCountry", "Income" -] -for row in x: - row[0] = int(row[0] / 10) - row[2] = int(row[2]) // 100000 - row[4] = int(row[4] / 5) - row[10] = int(row[10] / 1000) - row[12] = int(row[12] / 10) -enc = OneHotEncoder(handle_unknown='ignore') -x = enc.fit_transform(x).toarray() -all_features = enc.get_feature_names() -x_size = len(complete_x) -x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.2, random_state=0) -for item in x_test: - complete_x.append((counter, item)) - complete_y.append((counter, y_test[counter])) - counter = counter + 1 -x_size = counter -clf = RandomForestClassifier(n_jobs=2, random_state=0) -clf.fit(x_train, y_train) -RandomForestClassifier(bootstrap=True, class_weight=None, criterion='gini', - max_depth=None, max_features='auto', max_leaf_nodes=None, - min_impurity_split=1e-07, min_samples_leaf=1, - min_samples_split=2, min_weight_fraction_leaf=0.0, - n_estimators=10, n_jobs=2, oob_score=False, random_state=0, - verbose=0, warm_start=False) - -# alpha is size significance coefficient -# verbose option is for returning debug info while creating slices and printing it -# k is number of top-slices we want -# w is a weight of error function significance (1 - w) is a size significance propagated into optimization function -# loss_type = 0 (l2 in case of regression model -# loss_type = 1 (cross entropy in case of classification model) -preds = clf.predict(x_test) -predictions = [] -counter = 0 -mistakes = 0 -for pred in preds: - predictions.append((counter, pred)) - if y_test[counter] != pred: - mistakes = mistakes + 1 - counter = counter + 1 -lossF = mistakes / counter - -# enumerator <union>/<join> indicates an approach of next level slices combination process: -# in case of <join> in order to create new node of current level slicer -# combines only nodes of previous layer with each other -# <union> case implementation is based on DPSize algorithm -enumerator = "union" -if enumerator == "join": - slicer.process(all_features, clf, complete_x, lossF, x_size, complete_y, predictions, debug=True, alpha=4, k=10, - w=0.5, loss_type=1) -elif enumerator == "union": - union_slicer.process(all_features, clf, complete_x, lossF, x_size, complete_y, predictions, 
debug=True, alpha=4, - k=10, w=0.5, loss_type=1) diff --git a/scripts/staging/slicing/base/tests/classification/test_iris.py b/scripts/staging/slicing/base/tests/classification/test_iris.py deleted file mode 100644 index ce8effd..0000000 --- a/scripts/staging/slicing/base/tests/classification/test_iris.py +++ /dev/null @@ -1,88 +0,0 @@ -#------------------------------------------------------------- -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -#------------------------------------------------------------- - -import pandas as pd -from sklearn.preprocessing import OneHotEncoder - -import slicing.slicer as slicer -from sklearn.ensemble import RandomForestClassifier -from sklearn import preprocessing -from sklearn.model_selection import train_test_split - - -dataset = pd.read_csv('iris.csv') -attributes_amount = len(dataset.values[0]) -x = dataset.iloc[:, 0:attributes_amount - 1].values -enc = OneHotEncoder(handle_unknown='ignore') -x = enc.fit_transform(x).toarray() -y = dataset.iloc[:, attributes_amount - 1] -le = preprocessing.LabelEncoder() -le.fit(y) -y = le.transform(y) -complete_x = [] -complete_y = [] -counter = 0 -all_indexes = [] -all_features = enc.get_feature_names() -x_size = len(complete_x) -x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.2, random_state=0) -for item in x_test: - complete_x.append((counter, item)) - complete_y.append((counter, y_test[counter])) - counter = counter + 1 -x_size = counter -clf = RandomForestClassifier(n_jobs=2, random_state=0) -clf.fit(x_train, y_train) -RandomForestClassifier(bootstrap=True, class_weight=None, criterion='gini', - max_depth=None, max_features='auto', max_leaf_nodes=None, - min_impurity_split=1e-07, min_samples_leaf=1, - min_samples_split=2, min_weight_fraction_leaf=0.0, - n_estimators=10, n_jobs=2, oob_score=False, random_state=0, - verbose=0, warm_start=False) - -# alpha is size significance coefficient -# verbose option is for returning debug info while creating slices and printing it -# k is number of top-slices we want -# w is a weight of error function significance (1 - w) is a size significance propagated into optimization function -# loss_type = 0 (l2 in case of regression model -# loss_type = 1 (cross entropy in case of classification model) -preds = clf.predict(x_test) -predictions = [] -counter = 0 -mistakes = 0 -for pred in preds: - predictions.append((counter, pred)) - if y_test[counter] != pred: - mistakes = mistakes + 1 - counter = counter + 1 -lossF = mistakes / counter - -# enumerator <union>/<join> indicates an approach of next level slices combination process: -# in case of <join> in order to create new node of current level slicer -# combines only nodes of previous layer with each other -# <union> case implementation is based on DPSize algorithm 
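
The join/union distinction referenced throughout these tests reduces to two pairing rules. The sketch below (standalone Python, illustrative names only, not part of the commit) shows both checks, together with a score computed in the style of Bucket.calc_c_upper shown earlier in this diff; the exact form of the commit's opt_fun is assumed here, not quoted:

    def common_attributes(a, b):
        # attributes shared by two candidate slices
        return len(set(a) & set(b))

    def approved_join(a, b, cur_lvl):
        # join rule: parents on the previous level must share exactly
        # cur_lvl - 1 attributes, so ABC + BCD -> ABCD is valid, while
        # ABC + CDE (which would force a 5-attribute child) is not
        return common_attributes(a, b) == cur_lvl - 1

    def approved_union(a, b):
        # union (DPSize-style) rule: parents come from two smaller levels
        # and must have disjoint attribute sets, e.g. AB + C -> ABC
        return common_attributes(a, b) == 0

    def score(avg_slice_err, slice_size, total_loss, x_size, w):
        # hedged reconstruction mirroring calc_c_upper: relative average
        # error of the slice, traded off against its size by weight w
        return w * (avg_slice_err / (total_loss / x_size)) + (1 - w) * slice_size

    print(approved_join(["A", "B", "C"], ["B", "C", "D"], cur_lvl=3))  # True
    print(approved_join(["A", "B", "C"], ["C", "D", "E"], cur_lvl=3))  # False
    print(approved_union(["A", "B"], ["C"]))                           # True
    print(score(2.0, 50, 1.0, 100, w=0.5))  # 125.0 -> large, high-error slice
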
-enumerator = "join" -if enumerator == "join": - slicer.process(all_features, clf, complete_x, lossF, x_size, complete_y, predictions, debug=True, alpha=6, k=10, - w=0.5, loss_type=1) -elif enumerator == "union": - union_slicer.process(all_features, clf, complete_x, lossF, x_size, complete_y, predictions, debug=True, alpha=6, k=10, - w=0.5, loss_type=1) diff --git a/scripts/staging/slicing/base/tests/regression/test_insurance.py b/scripts/staging/slicing/base/tests/regression/test_insurance.py deleted file mode 100644 index 34a5d37..0000000 --- a/scripts/staging/slicing/base/tests/regression/test_insurance.py +++ /dev/null @@ -1,81 +0,0 @@ -#------------------------------------------------------------- -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -#------------------------------------------------------------- - -import pandas as pd -from sklearn.linear_model import LinearRegression -from sklearn.model_selection import train_test_split -from sklearn.preprocessing import OneHotEncoder - -import slicing.slicer as slicer - -file_name = 'insurance.csv' -dataset = pd.read_csv(file_name) -attributes_amount = len(dataset.values[0]) -# for now working with regression datasets, assuming that target attribute is the last one -# currently non-categorical features are not supported and should be binned -y = dataset.iloc[:, attributes_amount - 1:attributes_amount].values -# starting with one not including id field -x = dataset.iloc[:, 0:attributes_amount - 1].values -# list of numerical columns -non_categorical = [1, 3] -for row in x: - for attribute in non_categorical: - # <attribute - 2> as we already excluded from x id column - row[attribute - 1] = int(row[attribute - 1] / 5) -# hot encoding of categorical features -enc = OneHotEncoder(handle_unknown='ignore') -x = enc.fit_transform(x).toarray() -complete_x = [] -complete_y = [] -counter = 0 -all_features = enc.get_feature_names() -# train model on a whole dataset -x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.3, random_state=0) -for item in x_test: - complete_x.append((counter, item)) - complete_y.append((counter, y_test[counter])) - counter = counter + 1 -x_size = counter -model = LinearRegression() -model.fit(x_train, y_train) -preds = (model.predict(x_test) - y_test) ** 2 -f_l2 = sum(preds)/x_size -errors = [] -counter = 0 -for pred in preds: - errors.append((counter, pred)) - counter = counter + 1 -# alpha is size significance coefficient -# verbose option is for returning debug info while creating slices and printing it -# k is number of top-slices we want -# w is a weight of error function significance (1 - w) is a size significance propagated into optimization function - -# enumerator <union>/<join> indicates an approach of next level slices combination process: -# in case of 
<join> in order to create new node of current level slicer -# combines only nodes of previous layer with each other -# <union> case implementation is based on DPSize algorithm -enumerator = "union" -if enumerator == "join": - slicer.process(all_features, model, complete_x, f_l2, x_size, y_test, errors, debug=True, alpha=5, k=10, - w=0.5, loss_type=0) -elif enumerator == "union": - union_slicer.process(all_features, model, complete_x, f_l2, x_size, y_test, errors, debug=True, alpha=5, k=10, - w=0.5, loss_type=0) diff --git a/scripts/staging/slicing/base/tests/regression/test_salary.py b/scripts/staging/slicing/base/tests/regression/test_salary.py deleted file mode 100644 index c51f4b2..0000000 --- a/scripts/staging/slicing/base/tests/regression/test_salary.py +++ /dev/null @@ -1,87 +0,0 @@ -#------------------------------------------------------------- -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -#------------------------------------------------------------- - -import pandas as pd -from sklearn.linear_model import LinearRegression -from sklearn.model_selection import train_test_split -from sklearn.preprocessing import OneHotEncoder - -import slicing.slicer as slicer - -file_name = 'salary.csv' -dataset = pd.read_csv(file_name) -attributes_amount = len(dataset.values[0]) -# for now working with regression datasets, assuming that target attribute is the last one -# currently non-categorical features are not supported and should be binned -y = dataset.iloc[:, attributes_amount - 1:attributes_amount].values -# starting with one not including id field -x = dataset.iloc[:, 1:attributes_amount - 1].values -# list of numerical columns -non_categorical = [4, 5] -for row in x: - for attribute in non_categorical: - # <attribute - 2> as we already excluded from x id column - row[attribute - 2] = int(row[attribute - 2] / 5) -# hot encoding of categorical features -enc = OneHotEncoder(handle_unknown='ignore') -x = enc.fit_transform(x).toarray() -complete_x = [] -complete_y = [] -counter = 0 -all_features = enc.get_feature_names() -# train model on a whole dataset -x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.3, random_state=0) -for item in x_test: - complete_x.append((counter, item)) - complete_y.append((counter, y_test[counter])) - counter = counter + 1 -x_size = counter -model = LinearRegression() -model.fit(x_train, y_train) -preds = (model.predict(x_test) - y_test) ** 2 -f_l2 = sum(preds)/x_size -errors = [] -counter = 0 -for pred in preds: - errors.append((counter, pred)) - counter = counter + 1 -# alpha is size significance coefficient -# verbose option is for returning debug info while creating slices and printing it -# k is number of top-slices we want -# w is a weight of error function significance (1 - w) is a size significance 
propagated into optimization function -<<<<<<< HEAD:scripts/staging/slicing/base/tests/regression/test_salary.py -slicer.process(all_features, model, complete_x, f_l2, x_size, y_test, errors, debug=True, alpha=4, k=10, w=0.5, - loss_type=0) - -======= - -# enumerator <union>/<join> indicates an approach of next level slices combination process: -# in case of <join> in order to create new node of current level slicer -# combines only nodes of previous layer with each other -# <union> case implementation is based on DPSize algorithm -enumerator = "union" -if enumerator == "join": - slicer.process(all_features, model, complete_x, f_l2, x_size, y_test, errors, debug=True, alpha=4, k=10, w=0.5, - loss_type=0) -elif enumerator == "union": - union_slicer.process(all_features, model, complete_x, f_l2, x_size, y_test, errors, debug=True, alpha=4, k=10, w=0.5, - loss_type=0) ->>>>>>> [SYSTEMDS-xxx] Alternative slice enumeration algorithm:scripts/staging/slicing/base/tests/regression/test_salary.py diff --git a/scripts/staging/slicing/base/top_k.py b/scripts/staging/slicing/base/top_k.py index 87a5d56..3957fea 100644 --- a/scripts/staging/slicing/base/top_k.py +++ b/scripts/staging/slicing/base/top_k.py @@ -51,5 +51,8 @@ class Topk: for candidate in self.slices: print(candidate.name + ": " + "score = " + str(candidate.score) + "; size = " + str(candidate.size)) - print(candidate.name + ": " + "score = " + str(candidate.score) + "; size = " + str(candidate.size)) - + def buckets_top_k(self, cur_lvl_slices, x_size, alpha): + for bucket in cur_lvl_slices: + if bucket.check_constraint(self, x_size, alpha): + self.add_new_top_slice(bucket) + return self diff --git a/scripts/staging/slicing/base/union_slicer.py b/scripts/staging/slicing/base/union_slicer.py index 6ca003f..6b5fb0e 100644 --- a/scripts/staging/slicing/base/union_slicer.py +++ b/scripts/staging/slicing/base/union_slicer.py @@ -19,9 +19,9 @@ # #------------------------------------------------------------- -from slicing.node import Node -from slicing.top_k import Topk -from slicing.slicer import opt_fun, union +from slicing.base.node import Node +from slicing.base.top_k import Topk +from slicing.base.slicer import opt_fun, union def check_attributes(left_node, right_node): @@ -35,15 +35,12 @@ def check_attributes(left_node, right_node): return flag -def process(all_features, model, complete_x, loss, x_size, y_test, errors, debug, alpha, k, w, loss_type): +def make_first_level(all_features, complete_x, loss, x_size, y_test, errors, loss_type, w, alpha, top_k): + all_nodes = {} counter = 0 - # First level slices are enumerated in a "classic way" (getting data and not analyzing bounds first_level = [] - levels = [] - all_nodes = {} - top_k = Topk(k) for feature in all_features: - new_node = Node(all_features, model, complete_x, loss, x_size, y_test, errors) + new_node = Node(complete_x, loss, x_size, y_test, errors) new_node.parents = [(feature, counter)] new_node.attributes.append((feature, counter)) new_node.name = new_node.make_name() @@ -65,12 +62,23 @@ def process(all_features, model, complete_x, loss, x_size, y_test, errors, debug # this method updates top k slices if needed top_k.add_new_top_slice(new_node) counter = counter + 1 - # double appending of first level nodes in order to enumerating second level in the same way as others - levels.append((first_level, len(all_features))) - levels.append((first_level, len(all_features))) + return first_level, all_nodes + +def union_enum(): + return None + + +def process(all_features, complete_x, 
loss, x_size, y_test, errors, debug, alpha, k, w, loss_type, b_update): + top_k = Topk(k) + # First level slices are enumerated in a "classic way" (getting data and not analyzing bounds + levels = [] + first_level = make_first_level(all_features, complete_x, loss, x_size, y_test, errors, loss_type, w, alpha, top_k) + # double appending of first level nodes in order to enumerating second level in the same way as others + levels.append((first_level[0], len(all_features))) + all_nodes = first_level[1] # cur_lvl - index of current level, correlates with number of slice forming features - cur_lvl = 2 # level that is planned to be filled later + cur_lvl = 1 # level that is planned to be filled later cur_lvl_nodes = first_level # currently for debug print("Level 1 had " + str(len(all_features)) + " candidates") @@ -81,13 +89,13 @@ def process(all_features, model, complete_x, loss, x_size, y_test, errors, debug while len(cur_lvl_nodes) > 0: cur_lvl_nodes = [] count = 0 - for left in range(int(cur_lvl / 2)): + for left in range(int(cur_lvl / 2) + 1): right = cur_lvl - 1 - left for node_i in range(len(levels[left][0])): for node_j in range(len(levels[right][0])): flag = check_attributes(levels[left][0][node_i], levels[right][0][node_j]) if not flag: - new_node = Node(all_features, model, complete_x, loss, x_size, y_test, errors) + new_node = Node(complete_x, loss, x_size, y_test, errors) parents_set = set(new_node.parents) parents_set.add(levels[left][0][node_i]) parents_set.add(levels[right][0][node_j]) @@ -103,32 +111,12 @@ def process(all_features, model, complete_x, loss, x_size, y_test, errors, debug existing_item = all_nodes[new_node.key[1]] parents_set = set(existing_item.parents) existing_item.parents = parents_set - s_upper = new_node.calc_s_upper(cur_lvl) - s_lower = new_node.calc_s_lower(cur_lvl) - e_upper = new_node.calc_e_upper() - e_max_upper = new_node.calc_e_max_upper(cur_lvl) - try: - minimized = min(s_upper, new_node.s_upper) - new_node.s_upper = minimized - minimized = min(s_lower, new_node.s_lower) - new_node.s_lower = minimized - minimized = min(e_upper, new_node.e_upper) - new_node.e_upper = minimized - minimized= min(e_max_upper, new_node.e_max_upper) - new_node.e_max_upper = minimized - c_upper = new_node.calc_c_upper(w) - minimized= min(c_upper, new_node.c_upper) - new_node.c_upper = minimized - except AttributeError: - # initial bounds calculation - new_node.s_upper = s_upper - new_node.s_lower = s_lower - new_node.e_upper = e_upper - new_node.e_max_upper = e_max_upper - c_upper = new_node.calc_c_upper(w) - new_node.c_upper = c_upper - minimized = min(c_upper, new_node.c_upper) - new_node.c_upper = minimized + if b_update: + s_upper = new_node.calc_s_upper(cur_lvl) + s_lower = new_node.calc_s_lower(cur_lvl) + e_upper = new_node.calc_e_upper() + e_max_upper = new_node.calc_e_max_upper(cur_lvl) + new_node.update_bounds(s_upper, s_lower, e_upper, e_max_upper, w) else: new_node.calc_bounds(cur_lvl, w) all_nodes[new_node.key[1]] = new_node @@ -140,13 +128,9 @@ def process(all_features, model, complete_x, loss, x_size, y_test, errors, debug new_node.score = opt_fun(new_node.loss, new_node.size, loss, x_size, w) # we decide to add node to current level nodes (in order to make new combinations # on the next one or not basing on its score value - if new_node.score >= top_k.min_score and new_node.size >= x_size / alpha \ - and new_node.key not in top_k.keys: - cur_lvl_nodes.append(new_node) + if new_node.check_constraint(top_k, x_size, alpha) and new_node.key not in top_k.keys: 
top_k.add_new_top_slice(new_node) - else: - if new_node.s_upper >= x_size / alpha and new_node.c_upper >= top_k.min_score: - cur_lvl_nodes.append(new_node) + cur_lvl_nodes.append(new_node) if debug: new_node.print_debug(top_k, cur_lvl) count = count + levels[left][1] * levels[right][1] diff --git a/scripts/staging/slicing/spark_modules/join_data_parallel.py b/scripts/staging/slicing/spark_modules/join_data_parallel.py new file mode 100644 index 0000000..78156c3 --- /dev/null +++ b/scripts/staging/slicing/spark_modules/join_data_parallel.py @@ -0,0 +1,120 @@ +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +#------------------------------------------------------------- + +from pyspark import SparkContext + +from slicing.base.Bucket import Bucket +from slicing.base.SparkNode import SparkNode +from slicing.base.top_k import Topk +from slicing.spark_modules import spark_utils +from slicing.spark_modules.spark_utils import approved_join_slice + + +def rows_mapper(row, buckets, loss_type): + filtered = dict(filter(lambda bucket: all(attr in row[1] for attr in bucket[1].attributes), buckets.items())) + for item in filtered: + filtered[item].update_metrics(row, loss_type) + return filtered + + +def join_enum(cur_lvl_nodes, cur_lvl, x_size, alpha, top_k, w, loss): + buckets = {} + for node_i in range(len(cur_lvl_nodes)): + for node_j in range(node_i + 1, len(cur_lvl_nodes)): + flag = approved_join_slice(cur_lvl_nodes[node_i], cur_lvl_nodes[node_j], cur_lvl) + if flag: + node = SparkNode(None, None) + node.attributes = list(set(cur_lvl_nodes[node_i].attributes) | set(cur_lvl_nodes[node_j].attributes)) + bucket = Bucket(node, cur_lvl, w, x_size, loss) + bucket.parents.append(cur_lvl_nodes[node_i]) + bucket.parents.append(cur_lvl_nodes[node_j]) + bucket.calc_bounds(w, x_size, loss) + if bucket.check_bounds(x_size, alpha, top_k): + buckets[bucket.name] = bucket + return buckets + + +def combiner(a): + return a + + +def merge_values(a, b): + a.combine_with(b) + return a + + +def merge_combiners(a, b): + a + b + return a + + +def parallel_process(all_features, predictions, loss, sc, debug, alpha, k, w, loss_type): + top_k = Topk(k) + cur_lvl = 0 + cur_lvl_nodes = list(all_features) + pred_pandas = predictions.toPandas() + x_size = len(pred_pandas) + b_topk = SparkContext.broadcast(sc, top_k) + b_cur_lvl = SparkContext.broadcast(sc, cur_lvl) + b_cur_lvl_nodes = SparkContext.broadcast(sc, cur_lvl_nodes) + buckets = {} + for node in b_cur_lvl_nodes.value: + bucket = Bucket(node, b_cur_lvl.value, w, x_size, loss) + buckets[bucket.name] = bucket + b_buckets = SparkContext.broadcast(sc, buckets) + rows = predictions.rdd.map(lambda row: (row[0], row[1].indices, row[2]))\ + .map(lambda item: (item[0], 
item[1].tolist(), item[2])) + mapped = rows.map(lambda row: rows_mapper(row, b_buckets.value, loss_type)) + flattened = mapped.flatMap(lambda line: (line.items())) + reduced = flattened.combineByKey(combiner, merge_values, merge_combiners) + cur_lvl_nodes = reduced.values()\ + .map(lambda bucket: spark_utils.calc_bucket_metrics(bucket, loss, w, x_size, b_cur_lvl.value)) + if debug: + cur_lvl_nodes.map(lambda bucket: bucket.print_debug(b_topk.value)).collect() + cur_lvl = 1 + prev_level = cur_lvl_nodes.collect() + top_k = b_topk.value.buckets_top_k(prev_level, x_size, alpha) + while len(prev_level) > 0: + b_cur_lvl_nodes = SparkContext.broadcast(sc, prev_level) + b_topk = SparkContext.broadcast(sc, top_k) + b_cur_lvl = SparkContext.broadcast(sc, cur_lvl) + b_topk.value.print_topk() + buckets = join_enum(b_cur_lvl_nodes.value, b_cur_lvl.value, x_size, alpha, b_topk.value, w, loss) + b_buckets = SparkContext.broadcast(sc, buckets) + to_slice = dict(filter(lambda bucket: bucket[1].check_bounds(x_size, alpha, b_topk.value), b_buckets.value.items())) + mapped = rows.map(lambda row: rows_mapper(row, to_slice, loss_type)) + flattened = mapped.flatMap(lambda line: (line.items())) + to_process = flattened.combineByKey(combiner, merge_values, merge_combiners) + if debug: + to_process.values().map(lambda bucket: bucket.print_debug(b_topk.value)).collect() + prev_level = to_process\ + .map(lambda bucket: spark_utils.calc_bucket_metrics(bucket[1], loss, w, x_size, b_cur_lvl.value))\ + .collect() + cur_lvl = b_cur_lvl.value + 1 + top_k = b_topk.value.buckets_top_k(prev_level, x_size, alpha) + print("Level " + str(b_cur_lvl.value) + " had " + str( + len(b_cur_lvl_nodes.value * (len(b_cur_lvl_nodes.value) - 1)))+" candidates but after pruning only " + + str(len(prev_level)) + " go to the next level") + print("Program stopped at level " + str(cur_lvl)) + print() + print("Selected slices are: ") + b_topk.value.print_topk() + return None diff --git a/scripts/staging/slicing/spark_modules/spark_slicer.py b/scripts/staging/slicing/spark_modules/spark_slicer.py new file mode 100644 index 0000000..86f2b34 --- /dev/null +++ b/scripts/staging/slicing/spark_modules/spark_slicer.py @@ -0,0 +1,100 @@ +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# +#------------------------------------------------------------- + +from pyspark import SparkContext + +from slicing.base.SparkNode import SparkNode +from slicing.base.slicer import union, opt_fun +from slicing.base.top_k import Topk +from slicing.spark_modules import spark_utils +from slicing.spark_modules.spark_utils import update_top_k + + +def join_enum_fun(node_a, list_b, predictions, f_l2, debug, alpha, w, loss_type, cur_lvl, top_k): + x_size = len(predictions) + nodes = [] + for node_i in range(len(list_b)): + flag = spark_utils.approved_join_slice(node_i, node_a, cur_lvl) + if not flag: + new_node = SparkNode(predictions, f_l2) + parents_set = set(new_node.parents) + parents_set.add(node_i) + parents_set.add(node_a) + new_node.parents = list(parents_set) + parent1_attr = node_a.attributes + parent2_attr = list_b[node_i].attributes + new_node_attr = union(parent1_attr, parent2_attr) + new_node.attributes = new_node_attr + new_node.name = new_node.make_name() + new_node.calc_bounds(cur_lvl, w) + # check if concrete data should be extracted or not (only for those that have score upper + # and if size of subset is big enough + to_slice = new_node.check_bounds(top_k, x_size, alpha) + if to_slice: + new_node.process_slice(loss_type) + new_node.score = opt_fun(new_node.loss, new_node.size, f_l2, x_size, w) + # we decide to add node to current level nodes (in order to make new combinations + # on the next one or not basing on its score value + if new_node.check_constraint(top_k, x_size, alpha) and new_node.key not in top_k.keys: + top_k.add_new_top_slice(new_node) + nodes.append(new_node) + if debug: + new_node.print_debug(top_k, cur_lvl) + return nodes + + +def parallel_process(all_features, predictions, loss, sc, debug, alpha, k, w, loss_type, enumerator): + top_k = Topk(k) + cur_lvl = 0 + levels = [] + first_level = {} + all_features = list(all_features) + first_tasks = sc.parallelize(all_features) + b_topk = SparkContext.broadcast(sc, top_k) + init_slices = first_tasks.mapPartitions(lambda features: spark_utils.make_first_level(features, predictions, loss, + b_topk.value, w, loss_type)) \ + .map(lambda node: (node.key, node)).collect() + first_level.update(init_slices) + update_top_k(first_level, b_topk.value, alpha, predictions) + prev_level = SparkContext.broadcast(sc, first_level) + levels.append(prev_level) + cur_lvl = cur_lvl + 1 + b_topk.value.print_topk() + # checking the first partition of level. 
if not empty then processing otherwise no elements were added to this level + while len(levels[cur_lvl - 1].value) > 0: + nodes_list = {} + partitions = sc.parallelize(levels[cur_lvl - 1].value.values()) + mapped = partitions.mapPartitions(lambda nodes: spark_utils.nodes_enum(nodes, levels[cur_lvl - 1].value.values(), + predictions, loss, b_topk.value, alpha, k, w, + loss_type, cur_lvl, debug, enumerator)) + flattened = mapped.flatMap(lambda node: node) + nodes_list.update(flattened.map(lambda node: (node.key, node)).distinct().collect()) + prev_level = SparkContext.broadcast(sc, nodes_list) + levels.append(prev_level) + update_top_k(nodes_list, b_topk.value, alpha, predictions) + cur_lvl = cur_lvl + 1 + b_topk.value.print_topk() + print("Level " + str(cur_lvl) + " had " + str(len(levels[cur_lvl - 1].value) * (len(levels[cur_lvl - 1].value) - 1)) + + " candidates but after pruning only " + str(len(nodes_list)) + " go to the next level") + print("Program stopped at level " + str(cur_lvl)) + print() + print("Selected slices are: ") + b_topk.value.print_topk() diff --git a/scripts/staging/slicing/spark_modules/spark_union_slicer.py b/scripts/staging/slicing/spark_modules/spark_union_slicer.py new file mode 100644 index 0000000..811baff --- /dev/null +++ b/scripts/staging/slicing/spark_modules/spark_union_slicer.py @@ -0,0 +1,70 @@ +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# +#------------------------------------------------------------- + + +from pyspark import SparkContext + +from slicing.base.top_k import Topk +from slicing.spark_modules import spark_utils +from slicing.spark_modules.spark_utils import update_top_k + + +def process(all_features, predictions, loss, sc, debug, alpha, k, w, loss_type, enumerator): + top_k = Topk(k) + cur_lvl = 0 + levels = [] + all_features = list(all_features) + first_level = {} + first_tasks = sc.parallelize(all_features) + b_topk = SparkContext.broadcast(sc, top_k) + init_slices = first_tasks.mapPartitions(lambda features: spark_utils.make_first_level(features, predictions, loss, + b_topk.value, w, loss_type)) \ + .map(lambda node: (node.key, node)) \ + .collect() + first_level.update(init_slices) + update_top_k(first_level, b_topk.value, alpha, predictions) + prev_level = SparkContext.broadcast(sc, first_level) + levels.append(prev_level) + cur_lvl = 1 + b_topk.value.print_topk() + while len(levels[cur_lvl - 1].value) > 0: + cur_lvl_res = {} + for left in range(int(cur_lvl / 2) + 1): + right = cur_lvl - left - 1 + partitions = sc.parallelize(levels[left].value.values()) + mapped = partitions.mapPartitions(lambda nodes: spark_utils.nodes_enum(nodes, levels[right].value.values(), + predictions, loss, b_topk.value, alpha, k, + w, loss_type, cur_lvl, debug, + enumerator)) + flattened = mapped.flatMap(lambda node: node) + partial = flattened.map(lambda node: (node.key, node)).collect() + cur_lvl_res.update(partial) + prev_level = SparkContext.broadcast(sc, cur_lvl_res) + levels.append(prev_level) + update_top_k(cur_lvl_res, b_topk.value, alpha, predictions) + cur_lvl = cur_lvl + 1 + top_k.print_topk() + print("Level " + str(cur_lvl) + " had " + str(len(levels[cur_lvl - 1].value) * (len(levels[cur_lvl - 1].value) - 1)) + + " candidates but after pruning only " + str(len(prev_level.value)) + " go to the next level") + print("Program stopped at level " + str(cur_lvl)) + print() + print("Selected slices are: ") + b_topk.value.print_topk() diff --git a/scripts/staging/slicing/spark_modules/spark_utils.py b/scripts/staging/slicing/spark_modules/spark_utils.py new file mode 100644 index 0000000..a12c84b --- /dev/null +++ b/scripts/staging/slicing/spark_modules/spark_utils.py @@ -0,0 +1,141 @@ +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+#
+#-------------------------------------------------------------
+
+
+from pyspark.sql.functions import udf
+from pyspark.sql.types import FloatType
+
+from slicing.base.SparkNode import SparkNode
+from slicing.base.slicer import opt_fun, union
+
+# row-wise loss UDF: squared error for regression (type 0),
+# 0/1 match indicator for classification (type 1)
+calc_loss = udf(lambda target, prediction, type: calc_loss_fun(target, prediction, type), FloatType())
+model_type_init = udf(lambda type: init_model_type(type))
+
+
+def calc_loss_fun(target, prediction, type):
+    if type == 0:
+        return (prediction - target) ** 2
+    elif type == 1:
+        return float(target == prediction)
+
+
+def init_model_type(model_type):
+    if model_type == "regression":
+        return 0
+    elif model_type == "classification":
+        return 1
+
+
+def approved_join_slice(node_i, node_j, cur_lvl):
+    # join rule: two nodes combine only if they share exactly
+    # cur_lvl - 1 attributes, i.e. all attributes but one
+    commons = len(list(set(node_i.attributes) & set(node_j.attributes)))
+    return commons == cur_lvl - 1
+
+
+def make_first_level(features, predictions, loss, top_k, w, loss_type):
+    first_level = []
+    # first-level slices are enumerated in a "classic" way: the data is sliced
+    # and scored exactly, since no bounds are available for pruning yet
+    for feature in features:
+        new_node = SparkNode(loss, predictions)
+        new_node.parents = [feature]
+        new_node.attributes.append(feature)
+        new_node.name = new_node.make_name()
+        new_node.key = new_node.make_key()
+        new_node.process_slice(loss_type)
+        new_node.score = opt_fun(new_node.loss, new_node.size, loss, len(predictions), w)
+        new_node.c_upper = new_node.score
+        first_level.append(new_node)
+        new_node.print_debug(top_k, 0)
+    return first_level
+
+
+def approved_union_slice(node_i, node_j):
+    for attr1 in node_i.attributes:
+        for attr2 in node_j.attributes:
+            if attr1 == attr2:
+                # parents sharing an attribute would produce an
+                # overlapping union, which must not be enumerated again
+                return False
+    return True
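As a quick smoke test, calc_loss can be exercised on its own; a minimal sketch (the session setup and sample rows are illustrative only):

    from pyspark.sql import SparkSession
    from slicing.spark_modules.spark_utils import calc_loss

    spark = SparkSession.builder.appName("calc_loss_demo").getOrCreate()
    demo = spark.createDataFrame([(3.0, 2.5, 0), (1.0, 1.0, 1)],
                                 ["target", "prediction", "model_type"])
    # regression row: (2.5 - 3.0) ** 2 = 0.25; classification row: exact match -> 1.0
    demo.withColumn("error", calc_loss(demo["target"], demo["prediction"], demo["model_type"])).show()
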
+def process_node(node_i, level, loss, predictions, cur_lvl, top_k, alpha, loss_type, w, debug, enumerator):
+    cur_enum_nodes = []
+    for node_j in level:
+        if enumerator == "join":
+            flag = approved_join_slice(node_i, node_j, cur_lvl)
+        else:
+            flag = approved_union_slice(node_i, node_j)
+        # the name-based ordering ensures each unordered pair is enumerated only once
+        if flag and int(node_i.name.split("&&")[0]) < int(node_j.name.split("&&")[0]):
+            new_node = SparkNode(loss, predictions)
+            parents_set = set(new_node.parents)
+            parents_set.add(node_i)
+            parents_set.add(node_j)
+            new_node.parents = list(parents_set)
+            parent1_attr = node_i.attributes
+            parent2_attr = node_j.attributes
+            new_node_attr = union(parent1_attr, parent2_attr)
+            new_node.attributes = new_node_attr
+            new_node.name = new_node.make_name()
+            new_node.key = new_node.make_key()
+            new_node.calc_bounds(cur_lvl, w)
+            to_slice = new_node.check_bounds(top_k, len(predictions), alpha)
+            if to_slice:
+                new_node.process_slice(loss_type)
+                new_node.score = opt_fun(new_node.loss, new_node.size, loss, len(predictions), w)
+                if new_node.check_constraint(top_k, len(predictions), alpha):
+                    cur_enum_nodes.append(new_node)
+                if debug:
+                    new_node.print_debug(top_k, cur_lvl)
+    return cur_enum_nodes
+
+
+def nodes_enum(nodes, level, predictions, loss, top_k, alpha, k, w, loss_type, cur_lvl, debug, enumerator):
+    cur_enum_nodes = []
+    for node_i in nodes:
+        partial_nodes = process_node(node_i, level, loss, predictions, cur_lvl, top_k, alpha,
+                                     loss_type, w, debug, enumerator)
+        cur_enum_nodes.append(partial_nodes)
+    return cur_enum_nodes
+
+
+def init_top_k(first_level, top_k, alpha, predictions):
+    # the driver updates the top-k structure
+    for sliced in first_level:
+        if sliced[1].check_constraint(top_k, len(predictions), alpha):
+            # this method updates the top-k slices if needed
+            top_k.add_new_top_slice(sliced[1])
+
+
+def update_top_k(new_slices, top_k, alpha, predictions):
+    # the driver updates the top-k structure
+    for sliced in new_slices.values():
+        if sliced.check_constraint(top_k, len(predictions), alpha):
+            # this method updates the top-k slices if needed
+            top_k.add_new_top_slice(sliced)
+
+
+def calc_bucket_metrics(bucket, loss, w, x_size, cur_lvl):
+    bucket.calc_error()
+    bucket.score = opt_fun(bucket.error, bucket.size, loss, x_size, w)
+    if cur_lvl == 0:
+        bucket.s_upper = bucket.size
+        bucket.c_upper = bucket.score
+        bucket.s_lower = 1
+    return bucket
diff --git a/scripts/staging/slicing/spark_modules/union_data_parallel.py b/scripts/staging/slicing/spark_modules/union_data_parallel.py
new file mode 100644
index 0000000..b000abe
--- /dev/null
+++ b/scripts/staging/slicing/spark_modules/union_data_parallel.py
@@ -0,0 +1,119 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+from pyspark import SparkContext
+
+from slicing.base.Bucket import Bucket
+from slicing.base.SparkNode import SparkNode
+from slicing.base.top_k import Topk
+from slicing.spark_modules import spark_utils, join_data_parallel
+from slicing.spark_modules.join_data_parallel import rows_mapper, combiner
+from slicing.spark_modules.spark_utils import approved_union_slice
+
+
+def merge_values(a, b):
+    a.minimize_bounds(b)
+    return a
+
+
+def merge_combiners(a, b):
+    # bucket addition merges the per-partition partial aggregates;
+    # the merged bucket has to be returned, not the left operand alone
+    return a + b
+
+
+def union_enum(left_level, right_level, x_size, alpha, top_k, w, loss, cur_lvl):
+    buckets = {}
+    for node_i in range(len(left_level)):
+        for node_j in range(len(right_level)):
+            flag = approved_union_slice(left_level[node_i], right_level[node_j])
+            if flag:
+                node = SparkNode(None, None)
+                node.attributes = list(set(left_level[node_i].attributes) | set(right_level[node_j].attributes))
+                bucket = Bucket(node, cur_lvl, w, x_size, loss)
+                bucket.parents.append(left_level[node_i])
+                bucket.parents.append(right_level[node_j])
+                bucket.calc_bounds(w, x_size, loss)
+                if bucket.check_bounds(x_size, alpha, top_k):
+                    buckets[bucket.name] = bucket
+    return buckets
+
+
+def parallel_process(all_features, predictions, loss, sc, debug, alpha, k, w, loss_type):
+    top_k = Topk(k)
+    cur_lvl = 0
+    levels = []
+    cur_lvl_nodes = list(all_features)
+    pred_pandas = predictions.toPandas()
+    x_size = len(pred_pandas)
+    b_topk = SparkContext.broadcast(sc, top_k)
+    b_cur_lvl = SparkContext.broadcast(sc, cur_lvl)
+    buckets = {}
+    for node in cur_lvl_nodes:
+        bucket = Bucket(node, b_cur_lvl.value, w, x_size, loss)
+        buckets[bucket.name] = bucket
+    b_buckets = SparkContext.broadcast(sc, buckets)
+    rows = predictions.rdd.map(lambda row: (row[0], row[1].indices, row[2])) \
+        .map(lambda item: (item[0], item[1].tolist(), item[2]))
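+    # each row is now (id, one-hot feature indices, error); rows_mapper
+    # (defined in join_data_parallel) assigns a row to every candidate bucket
+    # it satisfies, and the combineByKey below merges the per-partition buckets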
mapped = rows.map(lambda row: rows_mapper(row, b_buckets.value, loss_type)) + flattened = mapped.flatMap(lambda line: (line.items())) + reduced = flattened.combineByKey(combiner, join_data_parallel.merge_values, join_data_parallel.merge_combiners) + cur_lvl_nodes = reduced.values() \ + .map(lambda bucket: spark_utils.calc_bucket_metrics(bucket, loss, w, x_size, b_cur_lvl.value)) + if debug: + cur_lvl_nodes.map(lambda bucket: bucket.print_debug(b_topk.value)).collect() + cur_lvl = 1 + prev_level = cur_lvl_nodes.collect() + b_cur_lvl_nodes = SparkContext.broadcast(sc, prev_level) + levels.append(b_cur_lvl_nodes) + top_k = b_topk.value.buckets_top_k(prev_level, x_size, alpha) + while len(levels[cur_lvl - 1].value) > 0: + b_topk = SparkContext.broadcast(sc, top_k) + b_cur_lvl = SparkContext.broadcast(sc, cur_lvl) + b_topk.value.print_topk() + buckets = [] + for left in range(int(cur_lvl / 2) + 1): + right = cur_lvl - left - 1 + nodes = union_enum(levels[left].value, levels[right].value, x_size, alpha, b_topk.value, w, loss, b_cur_lvl.value) + buckets.append(nodes) + b_buckets = sc.parallelize(buckets) + all_buckets = b_buckets.flatMap(lambda line: (line.items())) + combined = dict(all_buckets.combineByKey(combiner, merge_values, merge_combiners).collect()) + b_buckets = SparkContext.broadcast(sc, combined) + to_slice = dict(filter(lambda bucket: bucket[1].check_bounds(x_size, alpha, b_topk.value), b_buckets.value.items())) + mapped = rows.map(lambda row: rows_mapper(row, to_slice, loss_type)) + flattened = mapped.flatMap(lambda line: (line.items())) + partial = flattened.combineByKey(combiner, join_data_parallel.merge_values, join_data_parallel.merge_combiners) + prev_level = partial\ + .map(lambda bucket: spark_utils.calc_bucket_metrics(bucket[1], loss, w, x_size, b_cur_lvl.value)).collect() + if debug: + partial.values().map(lambda bucket: bucket.print_debug(b_topk.value)).collect() + top_k = b_topk.value.buckets_top_k(prev_level, x_size, alpha) + print("Level " + str(cur_lvl) + " had " + str( + len(levels[cur_lvl - 1].value) * (len(levels[cur_lvl - 1].value) - 1)) + + " candidates but after pruning only " + str(len(prev_level)) + " go to the next level") + print("Program stopped at level " + str(cur_lvl)) + b_cur_lvl_nodes = SparkContext.broadcast(sc, prev_level) + levels.append(b_cur_lvl_nodes) + cur_lvl = b_cur_lvl.value + 1 + print() + print("Selected slices are: ") + b_topk.value.print_topk() + return None diff --git a/scripts/staging/slicing/tests/__init__.py b/scripts/staging/slicing/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/scripts/staging/slicing/tests/classification/__init__.py b/scripts/staging/slicing/tests/classification/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/scripts/staging/slicing/tests/classification/sparked_adults.py b/scripts/staging/slicing/tests/classification/sparked_adults.py new file mode 100644 index 0000000..9c37fef --- /dev/null +++ b/scripts/staging/slicing/tests/classification/sparked_adults.py @@ -0,0 +1,118 @@ +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+from pyspark import SparkConf, SparkContext
+from pyspark.ml import Pipeline
+from pyspark.ml.classification import RandomForestClassifier
+from pyspark.ml.evaluation import MulticlassClassificationEvaluator
+from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator, VectorAssembler
+from pyspark.sql import SQLContext
+import pyspark.sql.functions as sf
+
+# the distributed modules live under slicing.spark_modules; the sparked_*
+# aliases keep the names used further down in this script
+from slicing.spark_modules import spark_utils as sparked_utils, spark_slicer as sparked_slicer, \
+    spark_union_slicer as sparked_union_slicer
+
+
+ten_binner = sf.udf(lambda arg: int(arg / 10))
+fnlwgt_binner = sf.udf(lambda arg: int(arg // 100000))
+edu_binner = sf.udf(lambda arg: int(arg / 5))
+cap_gain_binner = sf.udf(lambda arg: int(arg / 1000))
+
+conf = SparkConf().setAppName("adults_test").setMaster('local[2]')
+num_partitions = 2
+model_type = "classification"
+label = 'Income'
+sparkContext = SparkContext(conf=conf)
+sqlContext = SQLContext(sparkContext)
+dataset_df = sqlContext.read.csv('/slicing/datasets/adult.csv', header='true', inferSchema='true')
+# initializing stages of the main transformation pipeline
+stages = []
+dataset_df = dataset_df.withColumn('age_bin', ten_binner(dataset_df['Age']))
+dataset_df = dataset_df.withColumn('fnlwgt_bin', fnlwgt_binner(dataset_df['fnlwgt']))
+dataset_df = dataset_df.withColumn('edu_num_bin', edu_binner(dataset_df['EducationNum']))
+dataset_df = dataset_df.withColumn('cap_gain_bin', cap_gain_binner(dataset_df['CapitalGain']))
+dataset_df = dataset_df.withColumn('hours_per_w_bin', ten_binner(dataset_df['HoursPerWeek']))
+dataset_df = dataset_df.withColumn('model_type', sf.lit(1))
+
+# list of categorical features for one-hot encoding
+cat_features = ["age_bin", "WorkClass", "fnlwgt_bin", "Education", "edu_num_bin",
+                "MaritalStatus", "Occupation", "Relationship", "Race", "Gender",
+                "cap_gain_bin", "CapitalLoss", "hours_per_w_bin", "NativeCountry"]
+
+# one-hot encoding of the categorical features
+for feature in cat_features:
+    string_indexer = StringIndexer(inputCol=feature, outputCol=feature + "_index")
+    encoder = OneHotEncoderEstimator(inputCols=[string_indexer.getOutputCol()], outputCols=[feature + "_vec"])
+    encoder.setDropLast(False)
+    stages += [string_indexer, encoder]
+assembler_inputs = [feature + "_vec" for feature in cat_features]
+assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="assembled_inputs")
+stages += [assembler]
+assembler_final = VectorAssembler(inputCols=["assembled_inputs"], outputCol="features")
+label_indexer = StringIndexer(inputCol=label, outputCol=label+"_idx")
+stages += [assembler_final]
+stages += [label_indexer]
+pipeline = Pipeline(stages=stages)
+pipeline_model = pipeline.fit(dataset_df)
+dataset_transformed = pipeline_model.transform(dataset_df)
+# decode_dict maps a global one-hot feature position back to
+# (attribute index, category value, per-attribute StringIndexer index)
+decode_dict = {}
+counter = 0
+cat = 0
+for feature in cat_features:
+    colIdx = dataset_transformed.select(feature, feature + "_index").distinct().rdd.collectAsMap()
+    colIdx = {k: v for k, v in sorted(colIdx.items(), key=lambda item: item[1])}
+    for item in colIdx:
+        decode_dict[counter] = (cat, item, colIdx[item])
+        counter = counter + 1
+    cat = cat + 1
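# hypothetical illustration: decode_dict[0] == (0, '2', 0.0) would mean that
# one-hot column 0 encodes value '2' of attribute 0 (age_bin), i.e. ages 20-29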
+df_transform_fin = dataset_transformed.select('features', label+"_idx", 'model_type') +splits = df_transform_fin.randomSplit([0.8, 0.2], seed=1234) +train_df = splits[0] +test_df = splits[1] +rf = RandomForestClassifier(featuresCol='features', labelCol=label+"_idx", numTrees=10) +rf_model = rf.fit(train_df) +predictions = rf_model.transform(test_df) +# Select example rows to display. +predictions.select("features", label+"_idx", "prediction", "model_type") +# Select (prediction, true label) and compute test error +evaluator = MulticlassClassificationEvaluator( + labelCol=label+"_idx", predictionCol="prediction", metricName="accuracy") +accuracy = evaluator.evaluate(predictions) +loss = 1.0 - accuracy +pred_df_fin = predictions.withColumn('error', sparked_utils.calc_loss(predictions[label+"_idx"], + predictions['prediction'], + predictions['model_type'])) +predictions = pred_df_fin.select('features', 'error').repartition(num_partitions) +all_features = range(predictions.toPandas().values[0][0].size) +predictions = predictions.collect() +k = 10 +SparkContext.broadcast(sparkContext, loss) +SparkContext.broadcast(sparkContext, predictions) +SparkContext.broadcast(sparkContext, all_features) +SparkContext.broadcast(sparkContext, decode_dict) +enumerator = "join" +if enumerator == "join": + sparked_slicer.parallel_process(all_features, predictions, loss, sparkContext, debug=True, alpha=6, k=k, w=0.5, + loss_type=0, enumerator="join") +elif enumerator == "union": + sparked_union_slicer.process(all_features, predictions, loss, sparkContext, debug=True, alpha=4, k=k, w=0.5, loss_type=0, + enumerator="union") + diff --git a/scripts/staging/slicing/tests/classification/test_adult.py b/scripts/staging/slicing/tests/classification/test_adult.py new file mode 100644 index 0000000..9575592 --- /dev/null +++ b/scripts/staging/slicing/tests/classification/test_adult.py @@ -0,0 +1,121 @@ +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+#
+#-------------------------------------------------------------
+
+import sys
+
+import pandas as pd
+from sklearn.preprocessing import OneHotEncoder
+
+from slicing.base import slicer as slicer, union_slicer
+from sklearn.ensemble import RandomForestClassifier
+from sklearn import preprocessing
+from sklearn.model_selection import train_test_split
+
+
+if __name__ == "__main__":
+    args = sys.argv
+    if len(args) > 1:
+        k = int(args[1])
+        w = float(args[2].replace(',', '.'))
+        alpha = int(args[3])
+        if args[4] == "True":
+            b_update = True
+        else:
+            b_update = False
+        debug = args[5] == "True"
+        loss_type = int(args[6])
+        enumerator = args[7]
+    else:
+        k = 10
+        w = 0.5
+        alpha = 4
+        b_update = True
+        debug = True
+        loss_type = 1
+        enumerator = "union"
+    dataset = pd.read_csv('/slicing/datasets/adult.csv')
+    attributes_amount = len(dataset.values[0])
+    x = dataset.iloc[:, 0:attributes_amount - 1].values
+    y = dataset.iloc[:, attributes_amount - 1]
+    le = preprocessing.LabelEncoder()
+    le.fit(y)
+    y = le.transform(y)
+    complete_x = []
+    complete_y = []
+    counter = 0
+    # column order of the raw dataset, kept for reference
+    not_encoded_columns = [
+        "Age", "WorkClass", "fnlwgt", "Education", "EducationNum",
+        "MaritalStatus", "Occupation", "Relationship", "Race", "Gender",
+        "CapitalGain", "CapitalLoss", "HoursPerWeek", "NativeCountry", "Income"
+    ]
+    # binning the numeric columns before one-hot encoding
+    for row in x:
+        row[0] = int(row[0] / 10)
+        row[2] = int(row[2]) // 100000
+        row[4] = int(row[4] / 5)
+        row[10] = int(row[10] / 1000)
+        row[12] = int(row[12] / 10)
+    enc = OneHotEncoder(handle_unknown='ignore')
+    x = enc.fit_transform(x).toarray()
+    all_features = enc.get_feature_names()
+    x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.2, random_state=0)
+    for item in x_test:
+        complete_x.append((counter, item))
+        complete_y.append((counter, y_test[counter]))
+        counter = counter + 1
+    x_size = counter
+    clf = RandomForestClassifier(n_jobs=2, random_state=0)
+    clf.fit(x_train, y_train)
+
+    # alpha is the size significance coefficient
+    # debug enables collecting and printing debug info while creating slices
+    # k is the number of top slices we want
+    # w weighs the error term; (1 - w) weighs the size term of the optimization function
+    # loss_type = 0: l2 loss (regression models)
+    # loss_type = 1: 0/1 classification error (classification models)
+    preds = clf.predict(x_test)
+    predictions = []
+    counter = 0
+    mistakes = 0
+    for pred in preds:
+        predictions.append((counter, pred))
+        if y_test[counter] != pred:
+            mistakes = mistakes + 1
+        counter = counter + 1
+    lossF = mistakes / counter
+
+    # enumerator <union>/<join> selects how next-level slices are combined:
+    # <join> creates a new node only by combining two nodes of the previous
+    # level, while the <union> implementation is based on the DPSize algorithm
+    if enumerator == "join":
+        slicer.process(all_features, complete_x, lossF, x_size, complete_y, predictions, debug=debug, alpha=alpha, k=k,
+                       w=w, loss_type=loss_type, b_update=b_update)
+    elif enumerator == "union":
+        union_slicer.process(all_features, complete_x, lossF, x_size, complete_y, predictions, debug=debug, alpha=alpha,
+                             k=k, w=w, loss_type=loss_type, b_update=b_update)
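All of these local test drivers share the same positional CLI; a hypothetical invocation (defaults apply when no arguments are given):

    python test_adult.py 10 0,5 4 True True 1 union
    # argv[1..7]: k=10, w=0.5 (decimal comma accepted), alpha=4,
    #             b_update=True, debug=True, loss_type=1, enumerator=union
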
diff --git a/scripts/staging/slicing/tests/classification/test_iris.py b/scripts/staging/slicing/tests/classification/test_iris.py
new file mode 100644
index 0000000..2bd0c09
--- /dev/null
+++ b/scripts/staging/slicing/tests/classification/test_iris.py
@@ -0,0 +1,109 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+import sys
+
+import pandas as pd
+from sklearn.preprocessing import OneHotEncoder
+
+from slicing.base import slicer as slicer, union_slicer
+from sklearn.ensemble import RandomForestClassifier
+from sklearn import preprocessing
+from sklearn.model_selection import train_test_split
+
+if __name__ == "__main__":
+    args = sys.argv
+    if len(args) > 1:
+        k = int(args[1])
+        w = float(args[2].replace(',', '.'))
+        alpha = int(args[3])
+        if args[4] == "True":
+            b_update = True
+        else:
+            b_update = False
+        debug = args[5] == "True"
+        loss_type = int(args[6])
+        enumerator = args[7]
+    else:
+        k = 10
+        w = 0.5
+        alpha = 6
+        b_update = True
+        debug = True
+        loss_type = 1
+        enumerator = "join"
+    dataset = pd.read_csv('/slicing/datasets/iris.csv')
+    attributes_amount = len(dataset.values[0])
+    x = dataset.iloc[:, 0:attributes_amount - 1].values
+    enc = OneHotEncoder(handle_unknown='ignore')
+    x = enc.fit_transform(x).toarray()
+    y = dataset.iloc[:, attributes_amount - 1]
+    le = preprocessing.LabelEncoder()
+    le.fit(y)
+    y = le.transform(y)
+    complete_x = []
+    complete_y = []
+    counter = 0
+    all_features = enc.get_feature_names()
+    x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.3, random_state=0)
+    for item in x_test:
+        complete_x.append((counter, item))
+        complete_y.append((counter, y_test[counter]))
+        counter = counter + 1
+    x_size = counter
+    clf = RandomForestClassifier(n_jobs=2, random_state=0)
+    clf.fit(x_train, y_train)
+
+    # alpha is the size significance coefficient
+    # debug enables collecting and printing debug info while creating slices
+    # k is the number of top slices we want
+    # w weighs the error term; (1 - w) weighs the size term of the optimization function
+    # loss_type = 0: l2 loss (regression models)
+    # loss_type = 1: 0/1 classification error (classification models)
+    preds = clf.predict(x_test)
+    predictions = []
+    counter = 0
+    mistakes = 0
+    for pred in preds:
+        predictions.append((counter, pred))
+        if y_test[counter] != pred:
+            mistakes = mistakes + 1
+        counter = counter + 1
+    lossF = mistakes / counter
+
+    # enumerator <union>/<join> selects how next-level slices are combined:
+    # <join> creates a new node only by combining two nodes of the previous
+    # level, while the <union> implementation is based on the DPSize algorithm
+    if enumerator == "join":
+        slicer.process(all_features, complete_x, lossF, x_size, complete_y, predictions, debug=debug, alpha=alpha, k=k,
+                       w=w, loss_type=loss_type, b_update=b_update)
+    elif enumerator == "union":
+        union_slicer.process(all_features, complete_x, lossF, x_size, complete_y, predictions, debug=debug, alpha=alpha,
+                             k=k, w=w, loss_type=loss_type, b_update=b_update)
diff --git a/scripts/staging/slicing/tests/regression/__init__.py b/scripts/staging/slicing/tests/regression/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/scripts/staging/slicing/tests/regression/bd_spark_salary.py b/scripts/staging/slicing/tests/regression/bd_spark_salary.py
new file mode 100644
index 0000000..c32414b
--- /dev/null
+++ b/scripts/staging/slicing/tests/regression/bd_spark_salary.py
@@ -0,0 +1,131 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+import sys
+
+from pyspark import SparkConf, SparkContext
+from pyspark.ml import Pipeline
+from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator, VectorAssembler, IndexToString
+from pyspark.ml.regression import LinearRegression
+from pyspark.sql import SQLContext
+import pyspark.sql.functions as sf
+from pyspark.sql.functions import udf
+from sklearn.model_selection import train_test_split
+
+from slicing.spark_modules import spark_utils, join_data_parallel, union_data_parallel
+
+binner = udf(lambda arg: int(int(arg) // 5))
+
+
+if __name__ == "__main__":
+    args = sys.argv
+    if len(args) > 1:
+        k = int(args[1])
+        w = float(args[2].replace(',', '.'))
+        alpha = int(args[3])
+        if args[4] == "True":
+            b_update = True
+        else:
+            b_update = False
+        debug = args[5] == "True"
+        loss_type = int(args[6])
+        enumerator = args[7]
+    else:
+        k = 10
+        w = 0.5
+        alpha = 6
+        b_update = True
+        debug = True
+        loss_type = 0
+        enumerator = "union"
+
+    conf = SparkConf().setAppName("salary_test").setMaster('local[2]')
+    num_partitions = 2
+    model_type = "regression"
+    label = 'salary'
+    sparkContext = SparkContext(conf=conf)
+    sqlContext = SQLContext(sparkContext)
+    fileRDD = sparkContext.textFile('salaries.csv', num_partitions)
+    header = fileRDD.first()
+    head_split = header.split(",")
+    head_split[0] = '_c0'
+    fileRDD = fileRDD.filter(lambda line: line != header)
+    data = fileRDD.map(lambda row: row.split(","))
+    dataset_df = sqlContext.createDataFrame(data, head_split)
+
+    cat_features = ["rank", "discipline", "sincephd_bin", "service_bin", "sex"]
+    # initializing stages of the main transformation pipeline
+    stages = []
+    dataset_df = dataset_df.drop('_c0')
+    dataset_df = dataset_df.withColumn("id", sf.monotonically_increasing_id())
+    # binning numeric features with the local binner UDF (adjust per dataset if needed)
+    dataset_df = dataset_df.withColumn('sincephd_bin', binner(dataset_df['sincephd']))
+    dataset_df = dataset_df.withColumn('service_bin', binner(dataset_df['service']))
+    dataset_df = dataset_df.withColumn('model_type', sf.lit(0))
+    dataset_df = dataset_df.drop('sincephd', 'service')
+    dataset_df = dataset_df.withColumn('target', dataset_df[label].cast("int"))
+    # one-hot encoding of the categorical features
+    for feature in cat_features:
+        string_indexer = StringIndexer(inputCol=feature, outputCol=feature + "_index")
+        encoder = OneHotEncoderEstimator(inputCols=[string_indexer.getOutputCol()], outputCols=[feature + "_vec"])
+        encoder.setDropLast(False)
+        stages += [string_indexer, encoder]
+    assembler_inputs = [feature + "_vec" for feature in cat_features]
+    assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="assembled_inputs")
+    stages += [assembler]
+    assembler_final = VectorAssembler(inputCols=["assembled_inputs"], outputCol="features")
+    stages += [assembler_final]
+
+    pipeline = Pipeline(stages=stages)
+    pipeline_model = pipeline.fit(dataset_df)
+    dataset_transformed = pipeline_model.transform(dataset_df)
+    df_transform_fin = dataset_transformed.select('id', 'features', 'target', 'model_type').toPandas()
+
+    # decode_dict maps a global one-hot feature position back to
+    # (attribute index, category value, per-attribute StringIndexer index)
+    cat = 0
+    counter = 0
+    decode_dict = {}
+    for feature in cat_features:
+        colIdx = dataset_transformed.select(feature, feature + "_index").distinct().rdd.collectAsMap()
+        colIdx = {k: v for k, v in sorted(colIdx.items(), key=lambda item: item[1])}
+        for item in colIdx:
+            decode_dict[counter] = (cat, item, colIdx[item])
+            counter = counter + 1
+        cat = cat + 1
+
+    train, test = train_test_split(df_transform_fin, test_size=0.3, random_state=0)
+    train_df = sqlContext.createDataFrame(train)
+    test_df = sqlContext.createDataFrame(test)
+    lr = LinearRegression(featuresCol='features', labelCol='target', maxIter=10, regParam=0.3, elasticNetParam=0.8)
+    lr_model = lr.fit(train_df)
+    lr_eval = lr_model.evaluate(test_df)
+    f_l2 = lr_eval.meanSquaredError
+    pred = lr_eval.predictions
+    pred_df_fin = pred.withColumn('error', spark_utils.calc_loss(pred['target'], pred['prediction'], pred['model_type']))
+    predictions = pred_df_fin.select('id', 'features', 'error').repartition(num_partitions)
+    all_features = range(predictions.toPandas().values[1][1].size)
+    if enumerator == "join":
+        join_data_parallel.parallel_process(all_features, predictions, f_l2, sparkContext, debug=debug, alpha=alpha,
+                                            k=k, w=w, loss_type=loss_type)
+    elif enumerator == "union":
+        union_data_parallel.parallel_process(all_features, predictions, f_l2, sparkContext, debug=debug, alpha=alpha,
+                                             k=k, w=w, loss_type=loss_type)
diff --git a/scripts/staging/slicing/tests/regression/spark_salary.py b/scripts/staging/slicing/tests/regression/spark_salary.py
new file mode 100644
index 0000000..52d0cf2
--- /dev/null
+++ b/scripts/staging/slicing/tests/regression/spark_salary.py
@@ -0,0 +1,123 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+import sys
+
+from pyspark import SparkConf, SparkContext
+from pyspark.ml import Pipeline
+from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator, VectorAssembler, IndexToString
+from pyspark.ml.regression import LinearRegression
+from pyspark.sql import SQLContext
+import pyspark.sql.functions as sf
+from pyspark.sql.functions import udf
+from sklearn.model_selection import train_test_split
+
+from slicing.spark_modules import spark_utils, spark_slicer, spark_union_slicer
+
+
+binner = udf(lambda arg: int(arg / 5))
+
+
+if __name__ == "__main__":
+    args = sys.argv
+    if len(args) > 1:
+        k = int(args[1])
+        w = float(args[2].replace(',', '.'))
+        alpha = int(args[3])
+        if args[4] == "True":
+            b_update = True
+        else:
+            b_update = False
+        debug = args[5] == "True"
+        loss_type = int(args[6])
+        enumerator = args[7]
+    else:
+        k = 10
+        w = 0.5
+        alpha = 6
+        b_update = True
+        debug = True
+        loss_type = 0
+        enumerator = "join"
+
+    conf = SparkConf().setAppName("salary_test").setMaster('local[2]')
+    num_partitions = 2
+    model_type = "regression"
+    label = 'salary'
+    sparkContext = SparkContext(conf=conf)
+    sqlContext = SQLContext(sparkContext)
+    dataset_df = sqlContext.read.csv('salaries.csv', header='true', inferSchema='true')
+    # initializing stages of the main transformation pipeline
+    stages = []
+    # list of categorical features for one-hot encoding
+    cat_features = ["rank", "discipline", "sincephd_bin", "service_bin", "sex"]
+    # removing the column with the ID field
+    dataset_df = dataset_df.drop('_c0')
+    # binning numeric features with the local binner UDF (adjust per dataset if needed)
+    dataset_df = dataset_df.withColumn('sincephd_bin', binner(dataset_df['sincephd']))
+    dataset_df = dataset_df.withColumn('service_bin', binner(dataset_df['service']))
+    dataset_df = dataset_df.withColumn('model_type', sf.lit(0))
+    dataset_df = dataset_df.drop('sincephd', 'service')
+    # one-hot encoding of the categorical features
+    for feature in cat_features:
+        string_indexer = StringIndexer(inputCol=feature, outputCol=feature + "_index")
+        encoder = OneHotEncoderEstimator(inputCols=[string_indexer.getOutputCol()], outputCols=[feature + "_vec"])
+        encoder.setDropLast(False)
+        stages += [string_indexer, encoder]
+    assembler_inputs = [feature + "_vec" for feature in cat_features]
+    assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="assembled_inputs")
+    stages += [assembler]
+    assembler_final = VectorAssembler(inputCols=["assembled_inputs"], outputCol="features")
+    stages += [assembler_final]
+    pipeline = Pipeline(stages=stages)
+    pipeline_model = pipeline.fit(dataset_df)
+    dataset_transformed = pipeline_model.transform(dataset_df)
+    df_transform_fin = dataset_transformed.select('features', label, 'model_type').toPandas()
+    train, test = train_test_split(df_transform_fin, test_size=0.3, random_state=0)
+    train_df = sqlContext.createDataFrame(train)
+    test_df = sqlContext.createDataFrame(test)
+    # decode_dict maps a global one-hot feature position back to
+    # (attribute index, category value, per-attribute StringIndexer index)
+    decode_dict = {}
+    counter = 0
+    cat = 0
+    for feature in cat_features:
+        colIdx = dataset_transformed.select(feature, feature + "_index").distinct().rdd.collectAsMap()
+        colIdx = {k: v for k, v in sorted(colIdx.items(), key=lambda item: item[1])}
+        for item in colIdx:
+            decode_dict[counter] = (cat, item, colIdx[item])
+            counter = counter + 1
+        cat = cat + 1
+    lr = LinearRegression(featuresCol='features', labelCol=label, maxIter=10, regParam=0.3, elasticNetParam=0.8)
+    lr_model = lr.fit(train_df)
+    lr_eval = lr_model.evaluate(test_df)
+    f_l2 = lr_eval.meanSquaredError
+    pred = lr_eval.predictions
+    pred_df_fin = pred.withColumn('error', spark_utils.calc_loss(pred[label], pred['prediction'], pred['model_type']))
+    predictions = pred_df_fin.select('features', 'error').repartition(num_partitions)
+    all_features = range(predictions.toPandas().values[0][0].size)
+    predictions = predictions.collect()
+    if enumerator == "join":
+        spark_slicer.parallel_process(all_features, predictions, f_l2, sparkContext, debug=debug, alpha=alpha, k=k, w=w,
+                                      loss_type=loss_type, enumerator=enumerator)
+    elif enumerator == "union":
+        spark_union_slicer.process(all_features, predictions, f_l2, sparkContext, debug=debug, alpha=alpha, k=k, w=w,
+                                   loss_type=loss_type, enumerator=enumerator)
diff --git a/scripts/staging/slicing/tests/regression/test_insurance.py b/scripts/staging/slicing/tests/regression/test_insurance.py
new file mode 100644
index 0000000..64b58c4
--- /dev/null
+++ b/scripts/staging/slicing/tests/regression/test_insurance.py
@@ -0,0 +1,103 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+import sys
+
+import pandas as pd
+from sklearn.linear_model import LinearRegression
+from sklearn.model_selection import train_test_split
+from sklearn.preprocessing import OneHotEncoder
+
+from slicing.base import slicer as slicer, union_slicer
+
+if __name__ == "__main__":
+    args = sys.argv
+    if len(args) > 1:
+        k = int(args[1])
+        w = float(args[2].replace(',', '.'))
+        alpha = int(args[3])
+        if args[4] == "True":
+            b_update = True
+        else:
+            b_update = False
+        debug = args[5] == "True"
+        loss_type = int(args[6])
+        enumerator = args[7]
+    else:
+        k = 10
+        w = 0.5
+        alpha = 4
+        b_update = True
+        debug = True
+        loss_type = 0
+        enumerator = "union"
+    file_name = '/slicing/datasets/insurance.csv'
+    dataset = pd.read_csv(file_name)
+    attributes_amount = len(dataset.values[0])
+    # for now we work with regression datasets, assuming the target attribute is the last one
+    # currently non-categorical features are not supported directly and should be binned
+    y = dataset.iloc[:, attributes_amount - 1:attributes_amount].values
+    # all remaining columns are used as features (insurance.csv has no id column)
+    x = dataset.iloc[:, 0:attributes_amount - 1].values
+    # 1-based positions of the numerical columns to bin
+    non_categorical = [1, 3]
+    for row in x:
+        for attribute in non_categorical:
+            # <attribute - 1> converts the 1-based position to a 0-based index into x
+            row[attribute - 1] = int(row[attribute - 1] / 5)
+    # one-hot encoding of the categorical features
+    enc = OneHotEncoder(handle_unknown='ignore')
+    x = enc.fit_transform(x).toarray()
+    complete_x = []
+    complete_y = []
+    counter = 0
+    all_features = enc.get_feature_names()
+    # train the model; slicing operates on the held-out test split
+    x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.3, random_state=0)
+    for item in x_test:
+        complete_x.append((counter, item))
+        complete_y.append((counter, y_test[counter]))
+        counter = counter + 1
+    x_size = counter
+    model = LinearRegression()
+    model.fit(x_train, y_train)
+    preds = (model.predict(x_test) - y_test) ** 2
+    f_l2 = sum(preds) / x_size
+    errors = []
+    counter = 0
+    for pred in preds:
+        errors.append((counter, pred))
+        counter = counter + 1
+    # alpha is the size significance coefficient
+    # debug enables collecting and printing debug info while creating slices
+    # k is the number of top slices we want
+    # w weighs the error term; (1 - w) weighs the size term of the optimization function
+
+    # enumerator <union>/<join> selects how next-level slices are combined:
+    # <join> creates a new node only by combining two nodes of the previous
+    # level, while the <union> implementation is based on the DPSize algorithm
+    if enumerator == "join":
+        slicer.process(all_features, complete_x, f_l2, x_size, y_test, errors, debug=debug, alpha=alpha, k=k,
+                       w=w, loss_type=loss_type, b_update=b_update)
+    elif enumerator == "union":
+        union_slicer.process(all_features, complete_x, f_l2, x_size, y_test, errors, debug=debug, alpha=alpha, k=k,
+                             w=w, loss_type=loss_type, b_update=b_update)
diff --git a/scripts/staging/slicing/tests/regression/test_salary.py b/scripts/staging/slicing/tests/regression/test_salary.py
new file mode 100644
index 0000000..92e0055
--- /dev/null
+++ b/scripts/staging/slicing/tests/regression/test_salary.py
@@ -0,0 +1,104 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+import pandas as pd
+from sklearn.linear_model import LinearRegression
+from sklearn.metrics import mean_squared_error
+from sklearn.model_selection import train_test_split
+from sklearn.preprocessing import OneHotEncoder
+import sys
+
+from slicing.base import slicer, union_slicer
+
+if __name__ == "__main__":
+    args = sys.argv
+    if len(args) > 1:
+        k = int(args[1])
+        w = float(args[2].replace(',', '.'))
+        alpha = int(args[3])
+        if args[4] == "True":
+            b_update = True
+        else:
+            b_update = False
+        debug = args[5] == "True"
+        loss_type = int(args[6])
+        enumerator = args[7]
+    else:
+        k = 10
+        w = 0.5
+        alpha = 4
+        b_update = True
+        debug = True
+        loss_type = 0
+        enumerator = "union"
+    file_name = '/slicing/datasets/salaries.csv'
+    dataset = pd.read_csv(file_name)
+    attributes_amount = len(dataset.values[0])
+    # for now we work with regression datasets, assuming the target attribute is the last one
+    # currently non-categorical features are not supported directly and should be binned
+    y = dataset.iloc[:, attributes_amount - 1:attributes_amount].values
+    # starting with column one, excluding the id field
+    x = dataset.iloc[:, 1:attributes_amount - 1].values
+    # positions of the numerical columns in the original csv (1-based, including the id column)
+    non_categorical = [4, 5]
+    for row in x:
+        for attribute in non_categorical:
+            # <attribute - 2> because the id column was already excluded from x
+            row[attribute - 2] = int(row[attribute - 2] / 5)
+    # one-hot encoding of the categorical features
+    enc = OneHotEncoder(handle_unknown='ignore')
+    x = enc.fit_transform(x).toarray()
+    complete_x = []
+    complete_y = []
+    counter = 0
+    all_features = enc.get_feature_names()
+    # train the model; slicing operates on the held-out test split
+    x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.3, random_state=0)
+    for item in x_test:
+        complete_x.append((counter, item))
+        complete_y.append((counter, y_test[counter]))
+        counter = counter + 1
+    x_size = counter
+    model = LinearRegression()
+    model.fit(x_train, y_train)
+    predictions = model.predict(x_test)
+    f_l2 = mean_squared_error(y_test, predictions)
+    preds = (model.predict(x_test) - y_test) ** 2
+    errors = []
+    counter = 0
+    for pred in preds:
+        errors.append((counter, pred))
+        counter = counter + 1
+    # alpha is the size significance coefficient
+    # debug enables collecting and printing debug info while creating slices
+    # k is the number of top slices we want
+    # w weighs the error term; (1 - w) weighs the size term of the optimization function
+
+    # enumerator <union>/<join> selects how next-level slices are combined:
+    # <join> creates a new node only by combining two nodes of the previous
+    # level, while the <union> implementation is based on the DPSize algorithm
+    if enumerator == "join":
+        slicer.process(all_features,
complete_x, f_l2, x_size, y_test, errors, debug=debug, alpha=alpha, k=k, w=w, + loss_type=loss_type, b_update=b_update) + elif enumerator == "union": + union_slicer.process(all_features, complete_x, f_l2, x_size, y_test, errors, debug=debug, alpha=alpha, k=k, w=w, + loss_type=loss_type, b_update=b_update)
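Throughout these drivers, slices are ranked by the opt_fun objective from slicing/base/slicer.py, which trades a slice's error against its size via the weight w. A minimal sketch of that idea (simplified; the committed opt_fun may normalize differently):

    def score_sketch(slice_loss, slice_size, total_loss, total_size, w):
        # high score = the slice concentrates much of the model's error
        # (first term) while still covering a fair share of the data (second term)
        return w * (slice_loss / total_loss) + (1 - w) * (slice_size / total_size)

    # e.g. a slice holding 30% of the error on 10% of the rows, with w = 0.5:
    # 0.5 * 0.3 + 0.5 * 0.1 = 0.2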