This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemml.git


The following commit(s) were added to refs/heads/master by this push:
     new 8cbc85a  [SYSTEMDS-253] Distributed slice finding (task/data parallel, fixes)
8cbc85a is described below

commit 8cbc85a949b3699cde8ed3cf3e3abec6a27fbc60
Author: gilgenbergg <lanasv...@gmail.com>
AuthorDate: Sun May 3 17:17:57 2020 +0200

    [SYSTEMDS-253] Distributed slice finding (task/data parallel, fixes)
    
    Closes #881.
---
 docs/Tasks.txt                                     |   3 +-
 scripts/staging/hmm/HMM.py                         |   2 -
 scripts/staging/slicing/__init__.py                |   0
 scripts/staging/slicing/base/Bucket.py             | 168 +++++++++++++++++++++
 .../staging/slicing/base/{node.py => SparkNode.py} |  79 ++++++----
 scripts/staging/slicing/base/__init__.py           |   0
 scripts/staging/slicing/base/node.py               |  28 +++-
 scripts/staging/slicing/base/slicer.py             | 135 +++++++++--------
 .../base/tests/classification/test_adult.py        | 101 -------------
 .../slicing/base/tests/classification/test_iris.py |  88 -----------
 .../base/tests/regression/test_insurance.py        |  81 ----------
 .../slicing/base/tests/regression/test_salary.py   |  87 -----------
 scripts/staging/slicing/base/top_k.py              |   7 +-
 scripts/staging/slicing/base/union_slicer.py       |  78 ++++------
 .../slicing/spark_modules/join_data_parallel.py    | 120 +++++++++++++++
 .../staging/slicing/spark_modules/spark_slicer.py  | 100 ++++++++++++
 .../slicing/spark_modules/spark_union_slicer.py    |  70 +++++++++
 .../staging/slicing/spark_modules/spark_utils.py   | 141 +++++++++++++++++
 .../slicing/spark_modules/union_data_parallel.py   | 119 +++++++++++++++
 scripts/staging/slicing/tests/__init__.py          |   0
 .../slicing/tests/classification/__init__.py       |   0
 .../slicing/tests/classification/sparked_adults.py | 118 +++++++++++++++
 .../slicing/tests/classification/test_adult.py     | 121 +++++++++++++++
 .../slicing/tests/classification/test_iris.py      | 109 +++++++++++++
 .../staging/slicing/tests/regression/__init__.py   |   0
 .../slicing/tests/regression/bd_spark_salary.py    | 131 ++++++++++++++++
 .../slicing/tests/regression/spark_salary.py       | 123 +++++++++++++++
 .../slicing/tests/regression/test_insurance.py     | 103 +++++++++++++
 .../slicing/tests/regression/test_salary.py        | 104 +++++++++++++
 29 files changed, 1717 insertions(+), 499 deletions(-)

diff --git a/docs/Tasks.txt b/docs/Tasks.txt
index 9d2dba4..9fa9a6f 100644
--- a/docs/Tasks.txt
+++ b/docs/Tasks.txt
@@ -203,7 +203,8 @@ SYSTEMDS-240 GPU Backend Improvements
 
 SYSTEMDS-250 Extended Slice Finding
  * 251 Alternative slice enumeration approach                         OK
- * 252 Initial data slicing implementation Python
+ * 252 Initial data slicing implementation Python                     OK
+ * 253 Distributed slicing algorithms (task/data parallel)            OK
 
 SYSTEMDS-260 Misc Tools
  * 261 Stable marriage algorithm                                      OK
diff --git a/scripts/staging/hmm/HMM.py b/scripts/staging/hmm/HMM.py
index 61fa0d0..d9eb187 100644
--- a/scripts/staging/hmm/HMM.py
+++ b/scripts/staging/hmm/HMM.py
@@ -19,8 +19,6 @@
 #
 #-------------------------------------------------------------
 
-#Author: Afan Secic
-
 from bs4 import BeautifulSoup,SoupStrainer
 import nltk
 from nltk.tokenize import sent_tokenize, word_tokenize
diff --git a/scripts/staging/slicing/__init__.py b/scripts/staging/slicing/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/scripts/staging/slicing/base/Bucket.py b/scripts/staging/slicing/base/Bucket.py
new file mode 100644
index 0000000..0277f6d
--- /dev/null
+++ b/scripts/staging/slicing/base/Bucket.py
@@ -0,0 +1,168 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+class Bucket:
+
+    key: []
+    attributes: []
+    name: ""
+    error: float
+    e_upper: float
+    size: float
+    sum_error: float
+    max_tuple_error: float
+    s_upper: float
+    s_lower: float
+    e_max: float
+    e_max_upper: float
+    score: float
+    c_upper: float
+    parents: []
+    x_size: int
+    loss: float
+    w: float
+
+    def __init__(self, node, cur_lvl, w, x_size, loss):
+        self.attributes = []
+        self.parents = []
+        self.sum_error = 0
+        self.size = 0
+        self.score = 0
+        self.error = 0
+        self.max_tuple_error = 0
+        self.x_size = x_size
+        self.w = w
+        self.loss = loss
+        if cur_lvl == 0:
+            self.key = node
+            self.attributes.append(node)
+        else:
+            self.attributes = node.attributes
+            self.key = node.attributes
+        self.name = self.make_name()
+        self.__hash__()
+
+    def __hash__(self):
+        return hash(self.name)
+
+    def __add__(self, other):
+        self.size += other.size
+        self.sum_error += other.sum_error
+        return self
+
+    def combine_with(self, other):
+        self.size = max(self.size, other.size)
+        self.sum_error = max(self.sum_error, other.sum_error)
+        return self
+
+    def minimize_bounds(self, other):
+        minimized = min(self.s_upper, other.s_upper)
+        self.s_upper = minimized
+        minimized = min(other.s_lower, self.s_lower)
+        self.s_lower = minimized
+        minimized = min(other.e_upper, self.e_upper)
+        self.e_upper = minimized
+        minimized = min(other.e_max_upper, self.e_max_upper)
+        self.e_max_upper = minimized
+        c_upper = self.calc_c_upper(self.w, self.x_size, self.loss)
+        minimized = min(c_upper, self.c_upper)
+        self.c_upper = minimized
+
+    def __eq__(self, other):
+        return (
+            self.__hash__ == other.__hash__ and self.key == other.key)
+
+    def update_metrics(self, row, loss_type):
+        self.size += 1
+        if loss_type == 0:
+            self.sum_error += row[2]
+            if row[2] > self.max_tuple_error:
+                self.max_tuple_error = row[2]
+        else:
+            if row[2] != 0:
+                self.sum_error += 1
+
+    def calc_error(self):
+        if self.size != 0:
+            self.error = self.sum_error / self.size
+        else:
+            self.error = 0
+        self.e_max = self.max_tuple_error
+        self.e_max_upper = self.e_max
+        self.e_upper = self.error
+
+    def check_constraint(self, top_k, x_size, alpha):
+        return self.score >= top_k.min_score and self.size >= x_size / alpha
+
+    def make_name(self):
+        name = ""
+        for attribute in self.attributes:
+            name = name + str(attribute) + " && "
+        name = name[0: len(name) - 4]
+        return name
+
+    def calc_bounds(self, w, x_size, loss):
+        self.s_upper = self.calc_s_upper()
+        self.s_lower = self.calc_s_lower(x_size)
+        self.e_upper = self.calc_e_upper()
+        self.e_max_upper = self.calc_e_max_upper()
+        self.c_upper = self.calc_c_upper(w, x_size, loss)
+
+    def check_bounds(self, x_size, alpha, top_k):
+        return self.s_upper >= x_size / alpha and self.c_upper >= 
top_k.min_score
+
+    def calc_s_upper(self):
+        cur_min = self.parents[0].size
+        for parent in self.parents:
+            cur_min = min(cur_min, parent.s_upper)
+        return cur_min
+
+    def calc_e_max_upper(self):
+        e_max_min = self.parents[0].e_max_upper
+        for parent in self.parents:
+            e_max_min = min(e_max_min, parent.e_max_upper)
+        return e_max_min
+
+    def calc_s_lower(self, x_size):
+        size_value = x_size
+        for parent in self.parents:
+            size_value = size_value - (size_value - parent.s_lower)
+        return max(size_value, 1)
+
+    def calc_e_upper(self):
+        prev_e_uppers = []
+        for parent in self.parents:
+            prev_e_uppers.append(parent.e_upper)
+        return float(min(prev_e_uppers))
+
+    def calc_c_upper(self, w, x_size, loss):
+        upper_score = w * (self.e_upper / self.s_lower) / (loss / x_size) + (
+                    1 - w) * self.s_upper
+        return float(upper_score)
+
+    def print_debug(self, topk):
+        print("new node has been created: " + self.make_name() + "\n")
+        print("s_upper = " + str(self.s_upper))
+        print("s_lower = " + str(self.s_lower))
+        print("e_upper = " + str(self.e_upper))
+        print("c_upper = " + str(self.c_upper))
+        print("current topk min score = " + str(topk.min_score))
+        
print("-------------------------------------------------------------------------------------")
diff --git a/scripts/staging/slicing/base/node.py b/scripts/staging/slicing/base/SparkNode.py
similarity index 70%
copy from scripts/staging/slicing/base/node.py
copy to scripts/staging/slicing/base/SparkNode.py
index 6acd65a..fbaa0bd 100644
--- a/scripts/staging/slicing/base/node.py
+++ b/scripts/staging/slicing/base/SparkNode.py
@@ -19,7 +19,7 @@
 #
 #-------------------------------------------------------------
 
-class Node:
+class SparkNode:
     error: float
     name: ""
     attributes: []
@@ -36,34 +36,33 @@ class Node:
     e_max_upper: float
     key: ""
 
-    def __init__(self, all_features, model, complete_x, loss, x_size, y_test, 
preds):
-        self.error = loss,
+    def __init__(self, loss, preds):
+        if loss:
+            self.error = loss,
+        if preds:
+            self.preds = preds
         self.parents = []
         self.attributes = []
         self.size = 0
         self.score = 0
-        self.model = model
-        self.complete_x = complete_x
         self.loss = 0
-        self.x_size = x_size
-        self.preds = preds
         self.s_lower = 1
-        self.y_test = y_test
-        self.all_features = all_features
         self.key = ''
 
     def calc_c_upper(self, w):
-        upper_score = w * (self.e_upper / self.s_lower) / 
(float(self.error[0]) / self.x_size) + (1 - w) * self.s_upper
+        upper_score = w * (self.e_upper / self.s_lower) / (self.error[0] / 
self.preds[0][0].size) + (1 - w) * self.s_upper
         return float(upper_score)
 
     def make_slice_mask(self):
         mask = []
         for feature in self.attributes:
-            mask.append(feature[1])
+            mask.append(feature)
         return mask
 
     def process_slice(self, loss_type):
         mask = self.make_slice_mask()
+        print("mask")
+        print(mask)
         if loss_type == 0:
             self.calc_l2(mask)
         if loss_type == 1:
@@ -73,15 +72,15 @@ class Node:
         self.e_max = 1
         size = 0
         mistakes = 0
-        for row in self.complete_x:
+        for row in self.preds:
             flag = True
             for attr in mask:
-                if row[1][attr] == 0:
+                if attr not in row[0].indices:
                     flag = False
             if flag:
                 size = size + 1
-                if self.y_test[row[0]][1] != self.preds[row[0]][1]:
-                    mistakes = mistakes + 1
+                if row[1] == 0:
+                    mistakes += 1
         self.size = size
         if size != 0:
             self.loss = mistakes / size
@@ -93,23 +92,25 @@ class Node:
         max_tuple_error = 0
         sum_error = 0
         size = 0
-        for row in self.complete_x:
+        for row in self.preds:
             flag = True
             for attr in mask:
-                if row[1][attr] == 0:
+                if attr not in row[0].indices:
                     flag = False
             if flag:
                 size = size + 1
-                if float(self.preds[row[0]][1]) > max_tuple_error:
-                    max_tuple_error = float(self.preds[row[0]][1])
-                sum_error = sum_error + float(self.preds[row[0]][1])
+                if row[1] > max_tuple_error:
+                    max_tuple_error = row[1]
+                sum_error = sum_error + row[1]
         self.e_max = max_tuple_error
         self.e_upper = max_tuple_error
+        self.e_max_upper = max_tuple_error
         if size != 0:
             self.loss = sum_error/size
         else:
             self.loss = 0
         self.size = size
+        self.s_upper = size
 
     def calc_s_upper(self, cur_lvl):
         cur_min = self.parents[0].size
@@ -132,10 +133,10 @@ class Node:
                 e_max_min = min(e_max_min, parent.e_max_upper)
         return e_max_min
 
-    def calc_s_lower(self, cur_lvl):
-        size_value = self.x_size
+    def calc_s_lower(self):
+        size_value = len(self.preds)
         for parent in self.parents:
-            size_value = size_value - (self.x_size - parent.s_lower)
+            size_value = size_value - (size_value - parent.s_lower)
         return max(size_value, 1)
 
     def calc_e_upper(self):
@@ -146,7 +147,7 @@ class Node:
 
     def calc_bounds(self, cur_lvl, w):
         self.s_upper = self.calc_s_upper(cur_lvl)
-        self.s_lower = self.calc_s_lower(cur_lvl)
+        self.s_lower = self.calc_s_lower()
         self.e_upper = self.calc_e_upper()
         self.e_max_upper = self.calc_e_max_upper(cur_lvl)
         self.c_upper = self.calc_c_upper(w)
@@ -154,12 +155,12 @@ class Node:
     def make_name(self):
         name = ""
         for attribute in self.attributes:
-            name = name + str(attribute[0]) + " && "
+            name = name + str(attribute) + " && "
         name = name[0: len(name) - 4]
         return name
 
-    def make_key(self, new_id):
-        return new_id, self.name
+    def make_key(self):
+        return self.name
 
     def check_constraint(self, top_k, x_size, alpha):
         return self.score >= top_k.min_score and self.size >= x_size / alpha
@@ -167,6 +168,30 @@ class Node:
     def check_bounds(self, top_k, x_size, alpha):
         return self.s_upper >= x_size / alpha and self.c_upper >= 
top_k.min_score
 
+    def update_bounds(self, s_upper, s_lower, e_upper, e_max_upper, w):
+        try:
+            minimized = min(s_upper, self.s_upper)
+            self.s_upper = minimized
+            minimized = min(s_lower, self.s_lower)
+            self.s_lower = minimized
+            minimized = min(e_upper, self.e_upper)
+            self.e_upper = minimized
+            minimized = min(e_max_upper, self.e_max_upper)
+            self.e_max_upper = minimized
+            c_upper = self.calc_c_upper(w)
+            minimized = min(c_upper, self.c_upper)
+            self.c_upper = minimized
+        except AttributeError:
+            # initial bounds calculation
+            self.s_upper = s_upper
+            self.s_lower = s_lower
+            self.e_upper = e_upper
+            self.e_max_upper = e_max_upper
+            c_upper = self.calc_c_upper(w)
+            self.c_upper = c_upper
+        minimized = min(c_upper, self.c_upper)
+        self.c_upper = minimized
+
     def print_debug(self, topk, level):
         print("new node has been created: " + self.make_name() + "\n")
         if level >= 1:
diff --git a/scripts/staging/slicing/base/__init__.py b/scripts/staging/slicing/base/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/scripts/staging/slicing/base/node.py b/scripts/staging/slicing/base/node.py
index 6acd65a..33091a5 100644
--- a/scripts/staging/slicing/base/node.py
+++ b/scripts/staging/slicing/base/node.py
@@ -36,20 +36,18 @@ class Node:
     e_max_upper: float
     key: ""
 
-    def __init__(self, all_features, model, complete_x, loss, x_size, y_test, 
preds):
+    def __init__(self, complete_x, loss, x_size, y_test, preds):
         self.error = loss,
         self.parents = []
         self.attributes = []
         self.size = 0
         self.score = 0
-        self.model = model
         self.complete_x = complete_x
         self.loss = 0
         self.x_size = x_size
         self.preds = preds
         self.s_lower = 1
         self.y_test = y_test
-        self.all_features = all_features
         self.key = ''
 
     def calc_c_upper(self, w):
@@ -167,6 +165,30 @@ class Node:
     def check_bounds(self, top_k, x_size, alpha):
         return self.s_upper >= x_size / alpha and self.c_upper >= 
top_k.min_score
 
+    def update_bounds(self, s_upper, s_lower, e_upper, e_max_upper, w):
+        try:
+            minimized = min(s_upper, self.s_upper)
+            self.s_upper = minimized
+            minimized = min(s_lower, self.s_lower)
+            self.s_lower = minimized
+            minimized = min(e_upper, self.e_upper)
+            self.e_upper = minimized
+            minimized = min(e_max_upper, self.e_max_upper)
+            self.e_max_upper = minimized
+            c_upper = self.calc_c_upper(w)
+            minimized = min(c_upper, self.c_upper)
+            self.c_upper = minimized
+        except AttributeError:
+            # initial bounds calculation
+            self.s_upper = s_upper
+            self.s_lower = s_lower
+            self.e_upper = e_upper
+            self.e_max_upper = e_max_upper
+            c_upper = self.calc_c_upper(w)
+            self.c_upper = c_upper
+        minimized = min(c_upper, self.c_upper)
+        self.c_upper = minimized
+
     def print_debug(self, topk, level):
         print("new node has been created: " + self.make_name() + "\n")
         if level >= 1:
diff --git a/scripts/staging/slicing/base/slicer.py b/scripts/staging/slicing/base/slicer.py
index 4bc3415..be967b6 100644
--- a/scripts/staging/slicing/base/slicer.py
+++ b/scripts/staging/slicing/base/slicer.py
@@ -19,8 +19,8 @@
 #
 #-------------------------------------------------------------
 
-from slicing.node import Node
-from slicing.top_k import Topk
+from slicing.base.node import Node
+from slicing.base.top_k import Topk
 
 
 # optimization function calculation:
@@ -33,7 +33,7 @@ def opt_fun(fi, si, f, x_size, w):
 
 
 # slice_name_nonsense function defines if combination of nodes on current 
level is fine or impossible:
-# there is a dependency between common nodes' attributes number and current 
level is such that:
+# there is dependency between common nodes' attributes number and current 
level is such that:
 # commons == cur_lvl - 1
 # valid combination example: node ABC + node BCD (on 4th level) // three 
attributes nodes have two common attributes
 # invalid combination example: node ABC + CDE (on 4th level) // result node - 
ABCDE (absurd for 4th level)
@@ -51,21 +51,13 @@ def union(lst1, lst2):
     return final_list
 
 
-# alpha is size significance coefficient (required for optimization function)
-# verbose option is for returning debug info while creating slices and 
printing it (in console)
-# k is number of top-slices we want to receive as a result (maximum output, if 
less all of subsets will be printed)
-# w is a weight of error function significance (1 - w) is a size significance 
propagated into optimization function
-# loss_type = 0 (in case of regression model)
-# loss_type = 1 (in case of classification model)
-def process(all_features, model, complete_x, loss, x_size, y_test, errors, 
debug, alpha, k, w, loss_type):
-    counter = 0
-    # First level slices are enumerated in a "classic way" (getting data and 
not analyzing bounds
+def make_first_level(all_features, complete_x, loss, x_size, y_test, errors, 
loss_type, top_k, alpha, w):
     first_level = []
-    levels = []
+    counter = 0
     all_nodes = {}
-    top_k = Topk(k)
+    # First level slices are enumerated in a "classic way" (getting data and 
not analyzing bounds
     for feature in all_features:
-        new_node = Node(all_features, model, complete_x, loss, x_size, y_test, 
errors)
+        new_node = Node(complete_x, loss, x_size, y_test, errors)
         new_node.parents = [(feature, counter)]
         new_node.attributes.append((feature, counter))
         new_node.name = new_node.make_name()
@@ -82,11 +74,69 @@ def process(all_features, model, complete_x, loss, x_size, 
y_test, errors, debug
             # this method updates top k slices if needed
             top_k.add_new_top_slice(new_node)
         counter = counter + 1
-    levels.append(first_level)
+    return first_level, all_nodes
+
 
+def join_enum(node_i, prev_lvl, complete_x, loss, x_size, y_test, errors, 
debug, alpha, w, loss_type, b_update, cur_lvl,
+              all_nodes, top_k, cur_lvl_nodes):
+    for node_j in range(len(prev_lvl)):
+        flag = slice_name_nonsense(prev_lvl[node_i], prev_lvl[node_j], cur_lvl)
+        if not flag and prev_lvl[node_j].key[0] > prev_lvl[node_i].key[0]:
+            new_node = Node(complete_x, loss, x_size, y_test, errors)
+            parents_set = set(new_node.parents)
+            parents_set.add(prev_lvl[node_i])
+            parents_set.add(prev_lvl[node_j])
+            new_node.parents = list(parents_set)
+            parent1_attr = prev_lvl[node_i].attributes
+            parent2_attr = prev_lvl[node_j].attributes
+            new_node_attr = union(parent1_attr, parent2_attr)
+            new_node.attributes = new_node_attr
+            new_node.name = new_node.make_name()
+            new_id = len(all_nodes)
+            new_node.key = new_node.make_key(new_id)
+            if new_node.key[1] in all_nodes:
+                existing_item = all_nodes[new_node.key[1]]
+                parents_set = set(existing_item.parents)
+                existing_item.parents = parents_set
+                if b_update:
+                    s_upper = new_node.calc_s_upper(cur_lvl)
+                    s_lower = new_node.calc_s_lower(cur_lvl)
+                    e_upper = new_node.calc_e_upper()
+                    e_max_upper = new_node.calc_e_max_upper(cur_lvl)
+                    new_node.update_bounds(s_upper, s_lower, e_upper, 
e_max_upper, w)
+            else:
+                new_node.calc_bounds(cur_lvl, w)
+                all_nodes[new_node.key[1]] = new_node
+                # check if concrete data should be extracted or not (only for 
those that have score upper
+                # big enough and if size of subset is big enough
+                to_slice = new_node.check_bounds(top_k, x_size, alpha)
+                if to_slice:
+                    new_node.process_slice(loss_type)
+                    new_node.score = opt_fun(new_node.loss, new_node.size, 
loss, x_size, w)
+                    # we decide to add node to current level nodes (in order 
to make new combinations
+                    # on the next one or not basing on its score value
+                    if new_node.check_constraint(top_k, x_size, alpha) and 
new_node.key not in top_k.keys:
+                        top_k.add_new_top_slice(new_node)
+                    cur_lvl_nodes.append(new_node)
+                if debug:
+                    new_node.print_debug(top_k, cur_lvl)
+    return cur_lvl_nodes, all_nodes
+
+
+# alpha is size significance coefficient (required for optimization function)
+# verbose option is for returning debug info while creating slices and 
printing it (in console)
+# k is number of top-slices we want to receive as a result (maximum output, if 
less all of subsets will be printed)
+# w is a weight of error function significance (1 - w) is a size significance 
propagated into optimization function
+# loss_type = 0 (in case of regression model)
+# loss_type = 1 (in case of classification model)
+def process(all_features, complete_x, loss, x_size, y_test, errors, debug, 
alpha, k, w, loss_type, b_update):
+    levels = []
+    top_k = Topk(k)
+    first_level = make_first_level(all_features, complete_x, loss, x_size, 
y_test, errors, loss_type, top_k, alpha, w)
+    all_nodes = first_level[1]
+    levels.append(first_level[0])
     # cur_lvl - index of current level, correlates with number of slice 
forming features
     cur_lvl = 1  # currently filled level after first init iteration
-
     # currently for debug
     print("Level 1 had " + str(len(all_features)) + " candidates")
     print()
@@ -95,52 +145,17 @@ def process(all_features, model, complete_x, loss, x_size, 
y_test, errors, debug
     # combining each candidate of previous level with every till it becomes 
useless (one node can't make a pair)
     while len(levels[cur_lvl - 1]) > 1:
         cur_lvl_nodes = []
-        for node_i in range(len(levels[cur_lvl - 1]) - 1):
-            for node_j in range(node_i + 1, len(levels[cur_lvl - 1])):
-                flag = slice_name_nonsense(levels[cur_lvl - 1][node_i], 
levels[cur_lvl - 1][node_j], cur_lvl)
-                if not flag:
-                    new_node = Node(all_features, model, complete_x, loss, 
x_size, y_test, errors)
-                    parents_set = set(new_node.parents)
-                    parents_set.add(levels[cur_lvl - 1][node_i])
-                    parents_set.add(levels[cur_lvl - 1][node_j])
-                    new_node.parents = list(parents_set)
-                    parent1_attr = levels[cur_lvl - 1][node_i].attributes
-                    parent2_attr = levels[cur_lvl - 1][node_j].attributes
-                    new_node_attr = union(parent1_attr, parent2_attr)
-                    new_node.attributes = new_node_attr
-                    new_node.name = new_node.make_name()
-                    new_id = len(all_nodes)
-                    new_node.key = new_node.make_key(new_id)
-                    if new_node.key[1] in all_nodes:
-                        existing_item = all_nodes[new_node.key[1]]
-                        parents_set = set(existing_item.parents)
-                        existing_item.parents = parents_set
-                    else:
-                        new_node.calc_bounds(cur_lvl, w)
-                        all_nodes[new_node.key[1]] = new_node
-                        # check if concrete data should be extracted or not 
(only for those that have score upper
-                        # big enough and if size of subset is big enough
-                        to_slice = new_node.check_bounds(top_k, x_size, alpha)
-                        if to_slice:
-                            new_node.process_slice(loss_type)
-                            new_node.score = opt_fun(new_node.loss, 
new_node.size, loss, x_size, w)
-                            # we decide to add node to current level nodes (in 
order to make new combinations
-                            # on the next one or not basing on its score value
-                            if new_node.check_constraint(top_k, x_size, alpha) 
and new_node.key not in top_k.keys:
-                                cur_lvl_nodes.append(new_node)
-                                top_k.add_new_top_slice(new_node)
-                            elif new_node.check_bounds(top_k, x_size, alpha):
-                                cur_lvl_nodes.append(new_node)
-                        else:
-                            if new_node.check_bounds(top_k, x_size, alpha):
-                                cur_lvl_nodes.append(new_node)
-                        if debug:
-                            new_node.print_debug(top_k, cur_lvl)
-        print("Level " + str(cur_lvl + 1) + " had " + str(len(levels[cur_lvl - 
1]) * (len(levels[cur_lvl - 1]) - 1)) +
-              " candidates but after pruning only " + str(len(cur_lvl_nodes)) 
+ " go to the next level")
+        prev_lvl = levels[cur_lvl - 1]
+        for node_i in range(len(prev_lvl)):
+            partial = join_enum(node_i, prev_lvl, complete_x, loss, x_size, 
y_test, errors, debug, alpha, w, loss_type,
+                                b_update, cur_lvl, all_nodes, top_k, 
cur_lvl_nodes)
+            cur_lvl_nodes = partial[0]
+            all_nodes = partial[1]
         cur_lvl = cur_lvl + 1
         levels.append(cur_lvl_nodes)
         top_k.print_topk()
+        print("Level " + str(cur_lvl) + " had " + str(len(prev_lvl) * 
(len(prev_lvl) - 1)) +
+              " candidates but after pruning only " + str(len(cur_lvl_nodes)) 
+ " go to the next level")
     print("Program stopped at level " + str(cur_lvl + 1))
     print()
     print("Selected slices are: ")
diff --git a/scripts/staging/slicing/base/tests/classification/test_adult.py b/scripts/staging/slicing/base/tests/classification/test_adult.py
deleted file mode 100644
index 247d5c4..0000000
--- a/scripts/staging/slicing/base/tests/classification/test_adult.py
+++ /dev/null
@@ -1,101 +0,0 @@
-#-------------------------------------------------------------
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-#-------------------------------------------------------------
-
-import pandas as pd
-from sklearn.preprocessing import OneHotEncoder
-
-import slicing.slicer as slicer
-from sklearn.ensemble import RandomForestClassifier
-from sklearn import preprocessing
-from sklearn.model_selection import train_test_split
-
-
-dataset = pd.read_csv('adult.csv')
-attributes_amount = len(dataset.values[0])
-x = dataset.iloc[:, 0:attributes_amount - 1].values
-# enc = OneHotEncoder(handle_unknown='ignore')
-# x = enc.fit_transform(x).toarray()
-y = dataset.iloc[:, attributes_amount - 1]
-le = preprocessing.LabelEncoder()
-le.fit(y)
-y = le.transform(y)
-complete_x = []
-complete_y = []
-counter = 0
-all_indexes = []
-not_encoded_columns = [
-    "Age", "WorkClass", "fnlwgt", "Education", "EducationNum",
-    "MaritalStatus", "Occupation", "Relationship", "Race", "Gender",
-    "CapitalGain", "CapitalLoss", "HoursPerWeek", "NativeCountry", "Income"
-]
-for row in x:
-        row[0] = int(row[0] / 10)
-        row[2] = int(row[2]) // 100000
-        row[4] = int(row[4] / 5)
-        row[10] = int(row[10] / 1000)
-        row[12] = int(row[12] / 10)
-enc = OneHotEncoder(handle_unknown='ignore')
-x = enc.fit_transform(x).toarray()
-all_features = enc.get_feature_names()
-x_size = len(complete_x)
-x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.2, 
random_state=0)
-for item in x_test:
-    complete_x.append((counter, item))
-    complete_y.append((counter, y_test[counter]))
-    counter = counter + 1
-x_size = counter
-clf = RandomForestClassifier(n_jobs=2, random_state=0)
-clf.fit(x_train, y_train)
-RandomForestClassifier(bootstrap=True, class_weight=None, criterion='gini',
-            max_depth=None, max_features='auto', max_leaf_nodes=None,
-            min_impurity_split=1e-07, min_samples_leaf=1,
-            min_samples_split=2, min_weight_fraction_leaf=0.0,
-            n_estimators=10, n_jobs=2, oob_score=False, random_state=0,
-            verbose=0, warm_start=False)
-
-# alpha is size significance coefficient
-# verbose option is for returning debug info while creating slices and 
printing it
-# k is number of top-slices we want
-# w is a weight of error function significance (1 - w) is a size significance 
propagated into optimization function
-# loss_type = 0 (l2 in case of regression model
-# loss_type = 1 (cross entropy in case of classification model)
-preds = clf.predict(x_test)
-predictions = []
-counter = 0
-mistakes = 0
-for pred in preds:
-    predictions.append((counter, pred))
-    if y_test[counter] != pred:
-        mistakes = mistakes + 1
-    counter = counter + 1
-lossF = mistakes / counter
-
-# enumerator <union>/<join> indicates an approach of next level slices 
combination process:
-# in case of <join> in order to create new node of current level slicer
-# combines only nodes of previous layer with each other
-# <union> case implementation is based on DPSize algorithm
-enumerator = "union"
-if enumerator == "join":
-    slicer.process(all_features, clf, complete_x, lossF, x_size, complete_y, 
predictions, debug=True, alpha=4, k=10,
-                   w=0.5, loss_type=1)
-elif enumerator == "union":
-    union_slicer.process(all_features, clf, complete_x, lossF, x_size, 
complete_y, predictions, debug=True, alpha=4,
-                         k=10, w=0.5, loss_type=1)
diff --git a/scripts/staging/slicing/base/tests/classification/test_iris.py b/scripts/staging/slicing/base/tests/classification/test_iris.py
deleted file mode 100644
index ce8effd..0000000
--- a/scripts/staging/slicing/base/tests/classification/test_iris.py
+++ /dev/null
@@ -1,88 +0,0 @@
-#-------------------------------------------------------------
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-#-------------------------------------------------------------
-
-import pandas as pd
-from sklearn.preprocessing import OneHotEncoder
-
-import slicing.slicer as slicer
-from sklearn.ensemble import RandomForestClassifier
-from sklearn import preprocessing
-from sklearn.model_selection import train_test_split
-
-
-dataset = pd.read_csv('iris.csv')
-attributes_amount = len(dataset.values[0])
-x = dataset.iloc[:, 0:attributes_amount - 1].values
-enc = OneHotEncoder(handle_unknown='ignore')
-x = enc.fit_transform(x).toarray()
-y = dataset.iloc[:, attributes_amount - 1]
-le = preprocessing.LabelEncoder()
-le.fit(y)
-y = le.transform(y)
-complete_x = []
-complete_y = []
-counter = 0
-all_indexes = []
-all_features = enc.get_feature_names()
-x_size = len(complete_x)
-x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.2, 
random_state=0)
-for item in x_test:
-    complete_x.append((counter, item))
-    complete_y.append((counter, y_test[counter]))
-    counter = counter + 1
-x_size = counter
-clf = RandomForestClassifier(n_jobs=2, random_state=0)
-clf.fit(x_train, y_train)
-RandomForestClassifier(bootstrap=True, class_weight=None, criterion='gini',
-            max_depth=None, max_features='auto', max_leaf_nodes=None,
-            min_impurity_split=1e-07, min_samples_leaf=1,
-            min_samples_split=2, min_weight_fraction_leaf=0.0,
-            n_estimators=10, n_jobs=2, oob_score=False, random_state=0,
-            verbose=0, warm_start=False)
-
-# alpha is size significance coefficient
-# verbose option is for returning debug info while creating slices and 
printing it
-# k is number of top-slices we want
-# w is a weight of error function significance (1 - w) is a size significance 
propagated into optimization function
-# loss_type = 0 (l2 in case of regression model
-# loss_type = 1 (cross entropy in case of classification model)
-preds = clf.predict(x_test)
-predictions = []
-counter = 0
-mistakes = 0
-for pred in preds:
-    predictions.append((counter, pred))
-    if y_test[counter] != pred:
-        mistakes = mistakes + 1
-    counter = counter + 1
-lossF = mistakes / counter
-
-# enumerator <union>/<join> indicates an approach of next level slices 
combination process:
-# in case of <join> in order to create new node of current level slicer
-# combines only nodes of previous layer with each other
-# <union> case implementation is based on DPSize algorithm
-enumerator = "join"
-if enumerator == "join":
-    slicer.process(all_features, clf, complete_x, lossF, x_size, complete_y, 
predictions, debug=True, alpha=6, k=10,
-                   w=0.5, loss_type=1)
-elif enumerator == "union":
-    union_slicer.process(all_features, clf, complete_x, lossF, x_size, 
complete_y, predictions, debug=True, alpha=6, k=10,
-               w=0.5, loss_type=1)
diff --git a/scripts/staging/slicing/base/tests/regression/test_insurance.py b/scripts/staging/slicing/base/tests/regression/test_insurance.py
deleted file mode 100644
index 34a5d37..0000000
--- a/scripts/staging/slicing/base/tests/regression/test_insurance.py
+++ /dev/null
@@ -1,81 +0,0 @@
-#-------------------------------------------------------------
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-#-------------------------------------------------------------
-
-import pandas as pd
-from sklearn.linear_model import LinearRegression
-from sklearn.model_selection import train_test_split
-from sklearn.preprocessing import OneHotEncoder
-
-import slicing.slicer as slicer
-
-file_name = 'insurance.csv'
-dataset = pd.read_csv(file_name)
-attributes_amount = len(dataset.values[0])
-# for now working with regression datasets, assuming that target attribute is 
the last one
-# currently non-categorical features are not supported and should be binned
-y = dataset.iloc[:, attributes_amount - 1:attributes_amount].values
-# starting with one not including id field
-x = dataset.iloc[:, 0:attributes_amount - 1].values
-# list of numerical columns
-non_categorical = [1, 3]
-for row in x:
-    for attribute in non_categorical:
-        # <attribute - 2> as we already excluded from x id column
-        row[attribute - 1] = int(row[attribute - 1] / 5)
-# hot encoding of categorical features
-enc = OneHotEncoder(handle_unknown='ignore')
-x = enc.fit_transform(x).toarray()
-complete_x = []
-complete_y = []
-counter = 0
-all_features = enc.get_feature_names()
-# train model on a whole dataset
-x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.3, 
random_state=0)
-for item in x_test:
-    complete_x.append((counter, item))
-    complete_y.append((counter, y_test[counter]))
-    counter = counter + 1
-x_size = counter
-model = LinearRegression()
-model.fit(x_train, y_train)
-preds = (model.predict(x_test) - y_test) ** 2
-f_l2 = sum(preds)/x_size
-errors = []
-counter = 0
-for pred in preds:
-    errors.append((counter, pred))
-    counter = counter + 1
-# alpha is size significance coefficient
-# verbose option is for returning debug info while creating slices and 
printing it
-# k is number of top-slices we want
-# w is a weight of error function significance (1 - w) is a size significance 
propagated into optimization function
-
-# enumerator <union>/<join> indicates an approach of next level slices 
combination process:
-# in case of <join> in order to create new node of current level slicer
-# combines only nodes of previous layer with each other
-# <union> case implementation is based on DPSize algorithm
-enumerator = "union"
-if enumerator == "join":
-    slicer.process(all_features, model, complete_x, f_l2, x_size, y_test, 
errors, debug=True, alpha=5, k=10,
-                   w=0.5, loss_type=0)
-elif enumerator == "union":
-    union_slicer.process(all_features, model, complete_x, f_l2, x_size, 
y_test, errors, debug=True, alpha=5, k=10,
-                   w=0.5, loss_type=0)
diff --git a/scripts/staging/slicing/base/tests/regression/test_salary.py b/scripts/staging/slicing/base/tests/regression/test_salary.py
deleted file mode 100644
index c51f4b2..0000000
--- a/scripts/staging/slicing/base/tests/regression/test_salary.py
+++ /dev/null
@@ -1,87 +0,0 @@
-#-------------------------------------------------------------
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-#-------------------------------------------------------------
-
-import pandas as pd
-from sklearn.linear_model import LinearRegression
-from sklearn.model_selection import train_test_split
-from sklearn.preprocessing import OneHotEncoder
-
-import slicing.slicer as slicer
-
-file_name = 'salary.csv'
-dataset = pd.read_csv(file_name)
-attributes_amount = len(dataset.values[0])
-# for now working with regression datasets, assuming that target attribute is 
the last one
-# currently non-categorical features are not supported and should be binned
-y = dataset.iloc[:, attributes_amount - 1:attributes_amount].values
-# starting with one not including id field
-x = dataset.iloc[:, 1:attributes_amount - 1].values
-# list of numerical columns
-non_categorical = [4, 5]
-for row in x:
-    for attribute in non_categorical:
-        # <attribute - 2> as we already excluded from x id column
-        row[attribute - 2] = int(row[attribute - 2] / 5)
-# hot encoding of categorical features
-enc = OneHotEncoder(handle_unknown='ignore')
-x = enc.fit_transform(x).toarray()
-complete_x = []
-complete_y = []
-counter = 0
-all_features = enc.get_feature_names()
-# train model on a whole dataset
-x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.3, 
random_state=0)
-for item in x_test:
-    complete_x.append((counter, item))
-    complete_y.append((counter, y_test[counter]))
-    counter = counter + 1
-x_size = counter
-model = LinearRegression()
-model.fit(x_train, y_train)
-preds = (model.predict(x_test) - y_test) ** 2
-f_l2 = sum(preds)/x_size
-errors = []
-counter = 0
-for pred in preds:
-    errors.append((counter, pred))
-    counter = counter + 1
-# alpha is size significance coefficient
-# verbose option is for returning debug info while creating slices and 
printing it
-# k is number of top-slices we want
-# w is a weight of error function significance (1 - w) is a size significance 
propagated into optimization function
-<<<<<<< HEAD:scripts/staging/slicing/base/tests/regression/test_salary.py
-slicer.process(all_features, model, complete_x, f_l2, x_size, y_test, errors, 
debug=True, alpha=4, k=10, w=0.5,
-               loss_type=0)
-
-=======
-
-# enumerator <union>/<join> indicates an approach of next level slices 
combination process:
-# in case of <join> in order to create new node of current level slicer
-# combines only nodes of previous layer with each other
-# <union> case implementation is based on DPSize algorithm
-enumerator = "union"
-if enumerator == "join":
-    slicer.process(all_features, model, complete_x, f_l2, x_size, y_test, 
errors, debug=True, alpha=4, k=10, w=0.5,
-               loss_type=0)
-elif enumerator == "union":
-    union_slicer.process(all_features, model, complete_x, f_l2, x_size, 
y_test, errors, debug=True, alpha=4, k=10, w=0.5,
-               loss_type=0)
->>>>>>> [SYSTEMDS-xxx] Alternative slice enumeration 
algorithm:scripts/staging/slicing/base/tests/regression/test_salary.py
diff --git a/scripts/staging/slicing/base/top_k.py b/scripts/staging/slicing/base/top_k.py
index 87a5d56..3957fea 100644
--- a/scripts/staging/slicing/base/top_k.py
+++ b/scripts/staging/slicing/base/top_k.py
@@ -51,5 +51,8 @@ class Topk:
         for candidate in self.slices:
             print(candidate.name + ": " + "score = " + str(candidate.score) + 
"; size = " + str(candidate.size))
 
-            print(candidate.name + ": " + "score = " + str(candidate.score) + 
"; size = " + str(candidate.size))
-
+    def buckets_top_k(self, cur_lvl_slices, x_size, alpha):
+        for bucket in cur_lvl_slices:
+            if bucket.check_constraint(self, x_size, alpha):
+                self.add_new_top_slice(bucket)
+        return self
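
The new buckets_top_k() helper folds a level of evaluated buckets into the running top-k, keeping every bucket that passes check_constraint() on score and size. A hedged sketch of the intended call pattern (level_buckets and x_size are placeholders for the evaluated buckets and test-set size):

    from slicing.base.top_k import Topk

    top_k = Topk(10)                                        # keep the 10 best slices
    top_k = top_k.buckets_top_k(level_buckets, x_size, 4)   # alpha = 4; adds buckets meeting the score/size constraint
    top_k.print_topk()
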
diff --git a/scripts/staging/slicing/base/union_slicer.py b/scripts/staging/slicing/base/union_slicer.py
index 6ca003f..6b5fb0e 100644
--- a/scripts/staging/slicing/base/union_slicer.py
+++ b/scripts/staging/slicing/base/union_slicer.py
@@ -19,9 +19,9 @@
 #
 #-------------------------------------------------------------
 
-from slicing.node import Node
-from slicing.top_k import Topk
-from slicing.slicer import opt_fun, union
+from slicing.base.node import Node
+from slicing.base.top_k import Topk
+from slicing.base.slicer import opt_fun, union
 
 
 def check_attributes(left_node, right_node):
@@ -35,15 +35,12 @@ def check_attributes(left_node, right_node):
     return flag
 
 
-def process(all_features, model, complete_x, loss, x_size, y_test, errors, 
debug, alpha, k, w, loss_type):
+def make_first_level(all_features, complete_x, loss, x_size, y_test, errors, 
loss_type, w, alpha, top_k):
+    all_nodes = {}
     counter = 0
-    # First level slices are enumerated in a "classic way" (getting data and 
not analyzing bounds
     first_level = []
-    levels = []
-    all_nodes = {}
-    top_k = Topk(k)
     for feature in all_features:
-        new_node = Node(all_features, model, complete_x, loss, x_size, y_test, 
errors)
+        new_node = Node(complete_x, loss, x_size, y_test, errors)
         new_node.parents = [(feature, counter)]
         new_node.attributes.append((feature, counter))
         new_node.name = new_node.make_name()
@@ -65,12 +62,23 @@ def process(all_features, model, complete_x, loss, x_size, 
y_test, errors, debug
             # this method updates top k slices if needed
             top_k.add_new_top_slice(new_node)
         counter = counter + 1
-    # double appending of first level nodes in order to enumerating second 
level in the same way as others
-    levels.append((first_level, len(all_features)))
-    levels.append((first_level, len(all_features)))
+    return first_level, all_nodes
+
 
+def union_enum():
+    return None
+
+
+def process(all_features, complete_x, loss, x_size, y_test, errors, debug, 
alpha, k, w, loss_type, b_update):
+    top_k = Topk(k)
+    # First level slices are enumerated in a "classic way" (getting data and 
not analyzing bounds
+    levels = []
+    first_level = make_first_level(all_features, complete_x, loss, x_size, 
y_test, errors, loss_type, w, alpha, top_k)
+    # double appending of first level nodes in order to enumerating second 
level in the same way as others
+    levels.append((first_level[0], len(all_features)))
+    all_nodes = first_level[1]
     # cur_lvl - index of current level, correlates with number of slice 
forming features
-    cur_lvl = 2  # level that is planned to be filled later
+    cur_lvl = 1  # level that is planned to be filled later
     cur_lvl_nodes = first_level
     # currently for debug
     print("Level 1 had " + str(len(all_features)) + " candidates")
@@ -81,13 +89,13 @@ def process(all_features, model, complete_x, loss, x_size, 
y_test, errors, debug
     while len(cur_lvl_nodes) > 0:
         cur_lvl_nodes = []
         count = 0
-        for left in range(int(cur_lvl / 2)):
+        for left in range(int(cur_lvl / 2) + 1):
             right = cur_lvl - 1 - left
             for node_i in range(len(levels[left][0])):
                 for node_j in range(len(levels[right][0])):
                     flag = check_attributes(levels[left][0][node_i], 
levels[right][0][node_j])
                     if not flag:
-                        new_node = Node(all_features, model, complete_x, loss, 
x_size, y_test, errors)
+                        new_node = Node(complete_x, loss, x_size, y_test, 
errors)
                         parents_set = set(new_node.parents)
                         parents_set.add(levels[left][0][node_i])
                         parents_set.add(levels[right][0][node_j])
@@ -103,32 +111,12 @@ def process(all_features, model, complete_x, loss, 
x_size, y_test, errors, debug
                             existing_item = all_nodes[new_node.key[1]]
                             parents_set = set(existing_item.parents)
                             existing_item.parents = parents_set
-                            s_upper = new_node.calc_s_upper(cur_lvl)
-                            s_lower = new_node.calc_s_lower(cur_lvl)
-                            e_upper = new_node.calc_e_upper()
-                            e_max_upper = new_node.calc_e_max_upper(cur_lvl)
-                            try:
-                                minimized = min(s_upper, new_node.s_upper)
-                                new_node.s_upper = minimized
-                                minimized = min(s_lower, new_node.s_lower)
-                                new_node.s_lower = minimized
-                                minimized = min(e_upper, new_node.e_upper)
-                                new_node.e_upper = minimized
-                                minimized= min(e_max_upper, 
new_node.e_max_upper)
-                                new_node.e_max_upper = minimized
-                                c_upper = new_node.calc_c_upper(w)
-                                minimized= min(c_upper, new_node.c_upper)
-                                new_node.c_upper = minimized
-                            except AttributeError:
-                                # initial bounds calculation
-                                new_node.s_upper = s_upper
-                                new_node.s_lower = s_lower
-                                new_node.e_upper = e_upper
-                                new_node.e_max_upper = e_max_upper
-                                c_upper = new_node.calc_c_upper(w)
-                                new_node.c_upper = c_upper
-                            minimized = min(c_upper, new_node.c_upper)
-                            new_node.c_upper = minimized
+                            if b_update:
+                                s_upper = new_node.calc_s_upper(cur_lvl)
+                                s_lower = new_node.calc_s_lower(cur_lvl)
+                                e_upper = new_node.calc_e_upper()
+                                e_max_upper = 
new_node.calc_e_max_upper(cur_lvl)
+                                new_node.update_bounds(s_upper, s_lower, 
e_upper, e_max_upper, w)
                         else:
                             new_node.calc_bounds(cur_lvl, w)
                             all_nodes[new_node.key[1]] = new_node
@@ -140,13 +128,9 @@ def process(all_features, model, complete_x, loss, x_size, 
y_test, errors, debug
                                 new_node.score = opt_fun(new_node.loss, 
new_node.size, loss, x_size, w)
                                 # we decide to add node to current level nodes 
(in order to make new combinations
                                 # on the next one or not basing on its score 
value
-                                if new_node.score >= top_k.min_score and 
new_node.size >= x_size / alpha \
-                                        and new_node.key not in top_k.keys:
-                                    cur_lvl_nodes.append(new_node)
+                                if new_node.check_constraint(top_k, x_size, 
alpha) and new_node.key not in top_k.keys:
                                     top_k.add_new_top_slice(new_node)
-                            else:
-                                if new_node.s_upper >= x_size / alpha and 
new_node.c_upper >= top_k.min_score:
-                                    cur_lvl_nodes.append(new_node)
+                                cur_lvl_nodes.append(new_node)
                             if debug:
                                 new_node.print_debug(top_k, cur_lvl)
             count = count + levels[left][1] * levels[right][1]
diff --git a/scripts/staging/slicing/spark_modules/join_data_parallel.py b/scripts/staging/slicing/spark_modules/join_data_parallel.py
new file mode 100644
index 0000000..78156c3
--- /dev/null
+++ b/scripts/staging/slicing/spark_modules/join_data_parallel.py
@@ -0,0 +1,120 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+from pyspark import SparkContext
+
+from slicing.base.Bucket import Bucket
+from slicing.base.SparkNode import SparkNode
+from slicing.base.top_k import Topk
+from slicing.spark_modules import spark_utils
+from slicing.spark_modules.spark_utils import approved_join_slice
+
+
+def rows_mapper(row, buckets, loss_type):
+    filtered = dict(filter(lambda bucket: all(attr in row[1] for attr in 
bucket[1].attributes), buckets.items()))
+    for item in filtered:
+        filtered[item].update_metrics(row, loss_type)
+    return filtered
+
+
+def join_enum(cur_lvl_nodes, cur_lvl, x_size, alpha, top_k, w, loss):
+    buckets = {}
+    for node_i in range(len(cur_lvl_nodes)):
+        for node_j in range(node_i + 1, len(cur_lvl_nodes)):
+            flag = approved_join_slice(cur_lvl_nodes[node_i], 
cur_lvl_nodes[node_j], cur_lvl)
+            if flag:
+                node = SparkNode(None, None)
+                node.attributes = list(set(cur_lvl_nodes[node_i].attributes) | 
set(cur_lvl_nodes[node_j].attributes))
+                bucket = Bucket(node, cur_lvl, w, x_size, loss)
+                bucket.parents.append(cur_lvl_nodes[node_i])
+                bucket.parents.append(cur_lvl_nodes[node_j])
+                bucket.calc_bounds(w, x_size, loss)
+                if bucket.check_bounds(x_size, alpha, top_k):
+                    buckets[bucket.name] = bucket
+    return buckets
+
+
+def combiner(a):
+    return a
+
+
+def merge_values(a, b):
+    a.combine_with(b)
+    return a
+
+
+def merge_combiners(a, b):
+    a + b
+    return a
+
+
+def parallel_process(all_features, predictions, loss, sc, debug, alpha, k, w, 
loss_type):
+    top_k = Topk(k)
+    cur_lvl = 0
+    cur_lvl_nodes = list(all_features)
+    pred_pandas = predictions.toPandas()
+    x_size = len(pred_pandas)
+    b_topk = SparkContext.broadcast(sc, top_k)
+    b_cur_lvl = SparkContext.broadcast(sc, cur_lvl)
+    b_cur_lvl_nodes = SparkContext.broadcast(sc, cur_lvl_nodes)
+    buckets = {}
+    for node in b_cur_lvl_nodes.value:
+        bucket = Bucket(node, b_cur_lvl.value, w, x_size, loss)
+        buckets[bucket.name] = bucket
+    b_buckets = SparkContext.broadcast(sc, buckets)
+    rows = predictions.rdd.map(lambda row: (row[0], row[1].indices, row[2]))\
+        .map(lambda item: (item[0], item[1].tolist(), item[2]))
+    mapped = rows.map(lambda row: rows_mapper(row, b_buckets.value, loss_type))
+    flattened = mapped.flatMap(lambda line: (line.items()))
+    reduced = flattened.combineByKey(combiner, merge_values, merge_combiners)
+    cur_lvl_nodes = reduced.values()\
+        .map(lambda bucket: spark_utils.calc_bucket_metrics(bucket, loss, w, 
x_size, b_cur_lvl.value))
+    if debug:
+        cur_lvl_nodes.map(lambda bucket: 
bucket.print_debug(b_topk.value)).collect()
+    cur_lvl = 1
+    prev_level = cur_lvl_nodes.collect()
+    top_k = b_topk.value.buckets_top_k(prev_level, x_size, alpha)
+    while len(prev_level) > 0:
+        b_cur_lvl_nodes = SparkContext.broadcast(sc, prev_level)
+        b_topk = SparkContext.broadcast(sc, top_k)
+        b_cur_lvl = SparkContext.broadcast(sc, cur_lvl)
+        b_topk.value.print_topk()
+        buckets = join_enum(b_cur_lvl_nodes.value, b_cur_lvl.value, x_size, 
alpha, b_topk.value, w, loss)
+        b_buckets = SparkContext.broadcast(sc, buckets)
+        to_slice = dict(filter(lambda bucket: bucket[1].check_bounds(x_size, 
alpha, b_topk.value), b_buckets.value.items()))
+        mapped = rows.map(lambda row: rows_mapper(row, to_slice, loss_type))
+        flattened = mapped.flatMap(lambda line: (line.items()))
+        to_process = flattened.combineByKey(combiner, merge_values, 
merge_combiners)
+        if debug:
+            to_process.values().map(lambda bucket: 
bucket.print_debug(b_topk.value)).collect()
+        prev_level = to_process\
+            .map(lambda bucket: spark_utils.calc_bucket_metrics(bucket[1], 
loss, w, x_size, b_cur_lvl.value))\
+            .collect()
+        cur_lvl = b_cur_lvl.value + 1
+        top_k = b_topk.value.buckets_top_k(prev_level, x_size, alpha)
+        print("Level " + str(b_cur_lvl.value) + " had " + str(
+            len(b_cur_lvl_nodes.value * (len(b_cur_lvl_nodes.value) - 1)))+" 
candidates but after pruning only " +
+              str(len(prev_level)) + " go to the next level")
+        print("Program stopped at level " + str(cur_lvl))
+    print()
+    print("Selected slices are: ")
+    b_topk.value.print_topk()
+    return None
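
The data-parallel flow above maps each prediction row to (slice, partial statistics) pairs via rows_mapper and aggregates them with combineByKey(combiner, merge_values, merge_combiners). A minimal standalone sketch of that createCombiner/mergeValue/mergeCombiners pattern, using plain dicts in place of Bucket objects (the toy rows and field names are illustrative):

from pyspark import SparkContext

sc = SparkContext.getOrCreate()
# each element: (slice_key, per-row partial statistics)
toy_rows = sc.parallelize([("a", {"size": 1, "loss": 0.5}),
                           ("a", {"size": 1, "loss": 0.1}),
                           ("b", {"size": 1, "loss": 0.7})])

def create_combiner(v):
    # like combiner() above: the first partial result for a key is kept as-is
    return dict(v)

def merge_value(acc, v):
    # like merge_values() above: fold one more row into the accumulator
    return {"size": acc["size"] + v["size"], "loss": acc["loss"] + v["loss"]}

def merge_accs(a, b):
    # like merge_combiners() above: merge accumulators from different partitions
    return {"size": a["size"] + b["size"], "loss": a["loss"] + b["loss"]}

print(toy_rows.combineByKey(create_combiner, merge_value, merge_accs).collect())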
diff --git a/scripts/staging/slicing/spark_modules/spark_slicer.py 
b/scripts/staging/slicing/spark_modules/spark_slicer.py
new file mode 100644
index 0000000..86f2b34
--- /dev/null
+++ b/scripts/staging/slicing/spark_modules/spark_slicer.py
@@ -0,0 +1,100 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+from pyspark import SparkContext
+
+from slicing.base.SparkNode import SparkNode
+from slicing.base.slicer import union, opt_fun
+from slicing.base.top_k import Topk
+from slicing.spark_modules import spark_utils
+from slicing.spark_modules.spark_utils import update_top_k
+
+
+def join_enum_fun(node_a, list_b, predictions, f_l2, debug, alpha, w, 
loss_type, cur_lvl, top_k):
+    x_size = len(predictions)
+    nodes = []
+    for node_i in range(len(list_b)):
+        flag = spark_utils.approved_join_slice(list_b[node_i], node_a, cur_lvl)
+        if flag:
+            new_node = SparkNode(predictions, f_l2)
+            parents_set = set(new_node.parents)
+            parents_set.add(list_b[node_i])
+            parents_set.add(node_a)
+            new_node.parents = list(parents_set)
+            parent1_attr = node_a.attributes
+            parent2_attr = list_b[node_i].attributes
+            new_node_attr = union(parent1_attr, parent2_attr)
+            new_node.attributes = new_node_attr
+            new_node.name = new_node.make_name()
+            new_node.calc_bounds(cur_lvl, w)
+            # check whether concrete data should be extracted (only for slices whose
+            # score upper bound and subset size are large enough)
+            to_slice = new_node.check_bounds(top_k, x_size, alpha)
+            if to_slice:
+                new_node.process_slice(loss_type)
+                new_node.score = opt_fun(new_node.loss, new_node.size, f_l2, 
x_size, w)
+                # decide whether to add the node to the current level (to form new
+                # combinations on the next level) based on its score value
+                if new_node.check_constraint(top_k, x_size, alpha) and 
new_node.key not in top_k.keys:
+                    top_k.add_new_top_slice(new_node)
+                nodes.append(new_node)
+            if debug:
+                new_node.print_debug(top_k, cur_lvl)
+    return nodes
+
+
+def parallel_process(all_features, predictions, loss, sc, debug, alpha, k, w, 
loss_type, enumerator):
+    top_k = Topk(k)
+    cur_lvl = 0
+    levels = []
+    first_level = {}
+    all_features = list(all_features)
+    first_tasks = sc.parallelize(all_features)
+    b_topk = SparkContext.broadcast(sc, top_k)
+    init_slices = first_tasks.mapPartitions(lambda features: 
spark_utils.make_first_level(features, predictions, loss,
+                                                                               
           b_topk.value, w, loss_type)) \
+        .map(lambda node: (node.key, node)).collect()
+    first_level.update(init_slices)
+    update_top_k(first_level, b_topk.value, alpha, predictions)
+    prev_level = SparkContext.broadcast(sc, first_level)
+    levels.append(prev_level)
+    cur_lvl = cur_lvl + 1
+    b_topk.value.print_topk()
+    # process the previous level while it is non-empty; an empty level means no new slices were added
+    while len(levels[cur_lvl - 1].value) > 0:
+        nodes_list = {}
+        partitions = sc.parallelize(levels[cur_lvl - 1].value.values())
+        mapped = partitions.mapPartitions(lambda nodes: 
spark_utils.nodes_enum(nodes, levels[cur_lvl - 1].value.values(),
+                                                                               
predictions, loss, b_topk.value, alpha, k, w,
+                                                                               
loss_type, cur_lvl, debug, enumerator))
+        flattened = mapped.flatMap(lambda node: node)
+        nodes_list.update(flattened.map(lambda node: (node.key, 
node)).distinct().collect())
+        prev_level = SparkContext.broadcast(sc, nodes_list)
+        levels.append(prev_level)
+        update_top_k(nodes_list, b_topk.value, alpha, predictions)
+        cur_lvl = cur_lvl + 1
+        b_topk.value.print_topk()
+        print("Level " + str(cur_lvl) + " had " + str(len(levels[cur_lvl - 
1].value) * (len(levels[cur_lvl - 1].value) - 1)) +
+              " candidates but after pruning only " + str(len(nodes_list)) + " 
go to the next level")
+    print("Program stopped at level " + str(cur_lvl))
+    print()
+    print("Selected slices are: ")
+    b_topk.value.print_topk()
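
Candidate pairs in the task-parallel enumerator are admitted by approved_join_slice, which requires the two parent slices to share exactly cur_lvl - 1 attributes. A dependency-free sketch of that check (the attribute names are made up):

def approved_join(attrs_i, attrs_j, cur_lvl):
    # mirrors spark_utils.approved_join_slice: parents must share exactly cur_lvl - 1 attributes
    return len(set(attrs_i) & set(attrs_j)) == cur_lvl - 1

# combining two 2-attribute slices at cur_lvl = 2: exactly one shared attribute is required
print(approved_join(["x0_a", "x1_b"], ["x0_a", "x2_c"], 2))  # True  -> join gives {x0_a, x1_b, x2_c}
print(approved_join(["x0_a", "x1_b"], ["x2_c", "x3_d"], 2))  # False -> no shared attribute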
diff --git a/scripts/staging/slicing/spark_modules/spark_union_slicer.py 
b/scripts/staging/slicing/spark_modules/spark_union_slicer.py
new file mode 100644
index 0000000..811baff
--- /dev/null
+++ b/scripts/staging/slicing/spark_modules/spark_union_slicer.py
@@ -0,0 +1,70 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+
+from pyspark import SparkContext
+
+from slicing.base.top_k import Topk
+from slicing.spark_modules import spark_utils
+from slicing.spark_modules.spark_utils import update_top_k
+
+
+def process(all_features, predictions, loss, sc, debug, alpha, k, w, 
loss_type, enumerator):
+    top_k = Topk(k)
+    cur_lvl = 0
+    levels = []
+    all_features = list(all_features)
+    first_level = {}
+    first_tasks = sc.parallelize(all_features)
+    b_topk = SparkContext.broadcast(sc, top_k)
+    init_slices = first_tasks.mapPartitions(lambda features: 
spark_utils.make_first_level(features, predictions, loss,
+                                                                               
           b_topk.value, w, loss_type)) \
+        .map(lambda node: (node.key, node)) \
+        .collect()
+    first_level.update(init_slices)
+    update_top_k(first_level, b_topk.value, alpha, predictions)
+    prev_level = SparkContext.broadcast(sc, first_level)
+    levels.append(prev_level)
+    cur_lvl = 1
+    b_topk.value.print_topk()
+    while len(levels[cur_lvl - 1].value) > 0:
+        cur_lvl_res = {}
+        for left in range(int(cur_lvl / 2) + 1):
+            right = cur_lvl - left - 1
+            partitions = sc.parallelize(levels[left].value.values())
+            mapped = partitions.mapPartitions(lambda nodes: 
spark_utils.nodes_enum(nodes, levels[right].value.values(),
+                                                                               
    predictions, loss, b_topk.value, alpha, k,
+                                                                               
    w, loss_type, cur_lvl, debug,
+                                                                               
    enumerator))
+            flattened = mapped.flatMap(lambda node: node)
+            partial = flattened.map(lambda node: (node.key, node)).collect()
+            cur_lvl_res.update(partial)
+        prev_level = SparkContext.broadcast(sc, cur_lvl_res)
+        levels.append(prev_level)
+        update_top_k(cur_lvl_res, b_topk.value, alpha, predictions)
+        cur_lvl = cur_lvl + 1
+        top_k.print_topk()
+        print("Level " + str(cur_lvl) + " had " + str(len(levels[cur_lvl - 
1].value) * (len(levels[cur_lvl - 1].value) - 1)) +
+              " candidates but after pruning only " + 
str(len(prev_level.value)) + " go to the next level")
+    print("Program stopped at level " + str(cur_lvl))
+    print()
+    print("Selected slices are: ")
+    b_topk.value.print_topk()
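
The left/right loop above is the DPSize-style part of the union enumerator: each iteration combines previously stored levels whose 0-based indexes sum to cur_lvl - 1. A standalone sketch that only prints which level pairs are combined:

# mirrors the pairing loop in process(); levels are 0-indexed as in the `levels` list above
for cur_lvl in range(1, 5):
    pairs = []
    for left in range(int(cur_lvl / 2) + 1):
        right = cur_lvl - left - 1
        pairs.append((left, right))
    print("cur_lvl", cur_lvl, "combines level pairs", pairs)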
diff --git a/scripts/staging/slicing/spark_modules/spark_utils.py 
b/scripts/staging/slicing/spark_modules/spark_utils.py
new file mode 100644
index 0000000..a12c84b
--- /dev/null
+++ b/scripts/staging/slicing/spark_modules/spark_utils.py
@@ -0,0 +1,141 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+
+from pyspark.sql.functions import udf
+from pyspark.sql.types import FloatType
+
+from slicing.base.SparkNode import SparkNode
+from slicing.base.slicer import opt_fun, union
+
+calc_loss = udf(lambda target, prediction, type: calc_loss_fun(target, 
prediction, type), FloatType())
+model_type_init = udf(lambda type: init_model_type(type))
+
+
+def calc_loss_fun(target, prediction, type):
+    if type == 0:
+        return (prediction - target) ** 2
+    elif type == 1:
+        return float(target == prediction)
+
+
+def init_model_type(model_type):
+    if model_type == "regression":
+        return 0
+    elif model_type == "classification":
+        return 1
+
+
+def approved_join_slice(node_i, node_j, cur_lvl):
+    commons = len(list(set(node_i.attributes) & set(node_j.attributes)))
+    return commons == cur_lvl - 1
+
+
+def make_first_level(features, predictions, loss, top_k, w, loss_type):
+    first_level = []
+    # first-level slices are enumerated in a "classic" way (materializing their
+    # data instead of analyzing bounds)
+    for feature in features:
+        new_node = SparkNode(loss, predictions)
+        new_node.parents = [feature]
+        new_node.attributes.append(feature)
+        new_node.name = new_node.make_name()
+        new_node.key = new_node.make_key()
+        new_node.process_slice(loss_type)
+        new_node.score = opt_fun(new_node.loss, new_node.size, loss, 
len(predictions), w)
+        new_node.c_upper = new_node.score
+        first_level.append(new_node)
+        new_node.print_debug(top_k, 0)
+    return first_level
+
+
+def approved_union_slice(node_i, node_j):
+    for attr1 in node_i.attributes:
+        for attr2 in node_j.attributes:
+            if attr1 == attr2:
+                # the slices share an attribute, so this pair must not be combined by union
+                return False
+    return True
+
+
+def process_node(node_i, level, loss, predictions, cur_lvl, top_k, alpha, 
loss_type, w, debug, enumerator):
+    cur_enum_nodes = []
+    for node_j in level:
+        if enumerator == "join":
+            flag = approved_join_slice(node_i, node_j, cur_lvl)
+        else:
+            flag = approved_union_slice(node_i, node_j)
+        if flag and int(node_i.name.split("&&")[0]) < 
int(node_j.name.split("&&")[0]):
+            new_node = SparkNode(loss, predictions)
+            parents_set = set(new_node.parents)
+            parents_set.add(node_i)
+            parents_set.add(node_j)
+            new_node.parents = list(parents_set)
+            parent1_attr = node_i.attributes
+            parent2_attr = node_j.attributes
+            new_node_attr = union(parent1_attr, parent2_attr)
+            new_node.attributes = new_node_attr
+            new_node.name = new_node.make_name()
+            new_node.key = new_node.make_key()
+            new_node.calc_bounds(cur_lvl, w)
+            to_slice = new_node.check_bounds(top_k, len(predictions), alpha)
+            if to_slice:
+                new_node.process_slice(loss_type)
+                new_node.score = opt_fun(new_node.loss, new_node.size, loss, 
len(predictions), w)
+                if new_node.check_constraint(top_k, len(predictions), alpha):
+                    cur_enum_nodes.append(new_node)
+            if debug:
+                new_node.print_debug(top_k, cur_lvl)
+    return cur_enum_nodes
+
+
+def nodes_enum(nodes, level, predictions, loss, top_k, alpha, k, w, loss_type, 
cur_lvl, debug, enumerator):
+    cur_enum_nodes = []
+    for node_i in nodes:
+        partial_nodes = process_node(node_i, level, loss, predictions, 
cur_lvl, top_k, alpha,
+                                     loss_type, w, debug, enumerator)
+        cur_enum_nodes.append(partial_nodes)
+    return cur_enum_nodes
+
+
+def init_top_k(first_level, top_k, alpha, predictions):
+    # driver updates topK
+    for sliced in first_level:
+        if sliced[1].check_constraint(top_k, len(predictions), alpha):
+            # this method updates top k slices if needed
+            top_k.add_new_top_slice(sliced[1])
+
+
+def update_top_k(new_slices, top_k, alpha, predictions):
+    # driver updates topK
+    for sliced in new_slices.values():
+        if sliced.check_constraint(top_k, len(predictions), alpha):
+            # this method updates top k slices if needed
+            top_k.add_new_top_slice(sliced)
+
+
+def calc_bucket_metrics(bucket, loss, w, x_size, cur_lvl):
+    bucket.calc_error()
+    bucket.score = opt_fun(bucket.error, bucket.size, loss, x_size, w)
+    if cur_lvl == 0:
+        bucket.s_upper = bucket.size
+        bucket.c_upper = bucket.score
+        bucket.s_lower = 1
+    return bucket
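
The calc_loss udf above yields a per-row error: the squared error for regression rows (model_type 0) and a 0/1 indicator of whether the prediction matches the target for classification rows (model_type 1). A self-contained sketch of the same logic (column names and sample values are illustrative):

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

spark = SparkSession.builder.master("local[1]").appName("calc_loss_demo").getOrCreate()

def calc_loss_fun(target, prediction, model_type):
    if model_type == 0:
        return (prediction - target) ** 2      # regression: squared error
    elif model_type == 1:
        return float(target == prediction)     # classification: 1.0 if the prediction is correct

calc_loss = udf(calc_loss_fun, FloatType())

df = spark.createDataFrame([(3.0, 2.5, 0), (1.0, 1.0, 1), (0.0, 1.0, 1)],
                           ["target", "prediction", "model_type"])
df.withColumn("error", calc_loss(df["target"], df["prediction"], df["model_type"])).show()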
diff --git a/scripts/staging/slicing/spark_modules/union_data_parallel.py 
b/scripts/staging/slicing/spark_modules/union_data_parallel.py
new file mode 100644
index 0000000..b000abe
--- /dev/null
+++ b/scripts/staging/slicing/spark_modules/union_data_parallel.py
@@ -0,0 +1,119 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+from pyspark import SparkContext
+
+from slicing.base.Bucket import Bucket
+from slicing.base.SparkNode import SparkNode
+from slicing.base.top_k import Topk
+from slicing.spark_modules import spark_utils, join_data_parallel
+from slicing.spark_modules.join_data_parallel import rows_mapper, combiner
+from slicing.spark_modules.spark_utils import approved_union_slice
+
+
+def merge_values(a, b):
+    # mergeValue for combineByKey: merge b's bound estimates into a via minimize_bounds
+    a.minimize_bounds(b)
+    return a
+
+
+def merge_combiners(a, b):
+    # mergeCombiners for combineByKey: merge accumulators from different partitions;
+    # the a + b expression relies on Bucket's __add__ to fold b into a in place
+    a + b
+    return a
+
+
+def union_enum(left_level, right_level, x_size, alpha, top_k, w, loss, 
cur_lvl):
+    buckets = {}
+    for node_i in range(len(left_level)):
+        for node_j in range(len(right_level)):
+            flag = approved_union_slice(left_level[node_i], 
right_level[node_j])
+            if flag:
+                node = SparkNode(None, None)
+                node.attributes = list(set(left_level[node_i].attributes) | 
set(right_level[node_j].attributes))
+                bucket = Bucket(node, cur_lvl, w, x_size, loss)
+                bucket.parents.append(left_level[node_i])
+                bucket.parents.append(right_level[node_j])
+                bucket.calc_bounds(w, x_size, loss)
+                if bucket.check_bounds(x_size, alpha, top_k):
+                    buckets[bucket.name] = bucket
+    return buckets
+
+
+def parallel_process(all_features, predictions, loss, sc, debug, alpha, k, w, 
loss_type):
+    top_k = Topk(k)
+    cur_lvl = 0
+    levels = []
+    cur_lvl_nodes = list(all_features)
+    pred_pandas = predictions.toPandas()
+    x_size = len(pred_pandas)
+    b_topk = SparkContext.broadcast(sc, top_k)
+    b_cur_lvl = SparkContext.broadcast(sc, cur_lvl)
+    buckets = {}
+    for node in cur_lvl_nodes:
+        bucket = Bucket(node, b_cur_lvl.value, w, x_size, loss)
+        buckets[bucket.name] = bucket
+    b_buckets = SparkContext.broadcast(sc, buckets)
+    rows = predictions.rdd.map(lambda row: (row[0], row[1].indices, row[2])) \
+        .map(lambda item: (item[0], item[1].tolist(), item[2]))
+    mapped = rows.map(lambda row: rows_mapper(row, b_buckets.value, loss_type))
+    flattened = mapped.flatMap(lambda line: (line.items()))
+    reduced = flattened.combineByKey(combiner, 
join_data_parallel.merge_values, join_data_parallel.merge_combiners)
+    cur_lvl_nodes = reduced.values() \
+        .map(lambda bucket: spark_utils.calc_bucket_metrics(bucket, loss, w, 
x_size, b_cur_lvl.value))
+    if debug:
+        cur_lvl_nodes.map(lambda bucket: 
bucket.print_debug(b_topk.value)).collect()
+    cur_lvl = 1
+    prev_level = cur_lvl_nodes.collect()
+    b_cur_lvl_nodes = SparkContext.broadcast(sc, prev_level)
+    levels.append(b_cur_lvl_nodes)
+    top_k = b_topk.value.buckets_top_k(prev_level, x_size, alpha)
+    while len(levels[cur_lvl - 1].value) > 0:
+        b_topk = SparkContext.broadcast(sc, top_k)
+        b_cur_lvl = SparkContext.broadcast(sc, cur_lvl)
+        b_topk.value.print_topk()
+        buckets = []
+        for left in range(int(cur_lvl / 2) + 1):
+            right = cur_lvl - left - 1
+            nodes = union_enum(levels[left].value, levels[right].value, 
x_size, alpha, b_topk.value, w, loss, b_cur_lvl.value)
+            buckets.append(nodes)
+        b_buckets = sc.parallelize(buckets)
+        all_buckets = b_buckets.flatMap(lambda line: (line.items()))
+        combined = dict(all_buckets.combineByKey(combiner, merge_values, 
merge_combiners).collect())
+        b_buckets = SparkContext.broadcast(sc, combined)
+        to_slice = dict(filter(lambda bucket: bucket[1].check_bounds(x_size, 
alpha, b_topk.value), b_buckets.value.items()))
+        mapped = rows.map(lambda row: rows_mapper(row, to_slice, loss_type))
+        flattened = mapped.flatMap(lambda line: (line.items()))
+        partial = flattened.combineByKey(combiner, 
join_data_parallel.merge_values, join_data_parallel.merge_combiners)
+        prev_level = partial\
+            .map(lambda bucket: spark_utils.calc_bucket_metrics(bucket[1], 
loss, w, x_size, b_cur_lvl.value)).collect()
+        if debug:
+            partial.values().map(lambda bucket: 
bucket.print_debug(b_topk.value)).collect()
+        top_k = b_topk.value.buckets_top_k(prev_level, x_size, alpha)
+        print("Level " + str(cur_lvl) + " had " + str(
+            len(levels[cur_lvl - 1].value) * (len(levels[cur_lvl - 1].value) - 
1)) +
+              " candidates but after pruning only " + str(len(prev_level)) + " 
go to the next level")
+        print("Program stopped at level " + str(cur_lvl))
+        b_cur_lvl_nodes = SparkContext.broadcast(sc, prev_level)
+        levels.append(b_cur_lvl_nodes)
+        cur_lvl = b_cur_lvl.value + 1
+    print()
+    print("Selected slices are: ")
+    b_topk.value.print_topk()
+    return None
diff --git a/scripts/staging/slicing/tests/__init__.py 
b/scripts/staging/slicing/tests/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/scripts/staging/slicing/tests/classification/__init__.py 
b/scripts/staging/slicing/tests/classification/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/scripts/staging/slicing/tests/classification/sparked_adults.py 
b/scripts/staging/slicing/tests/classification/sparked_adults.py
new file mode 100644
index 0000000..9c37fef
--- /dev/null
+++ b/scripts/staging/slicing/tests/classification/sparked_adults.py
@@ -0,0 +1,118 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+from pyspark import SparkConf, SparkContext
+from pyspark.ml import Pipeline
+from pyspark.ml.classification import RandomForestClassifier
+from pyspark.ml.evaluation import MulticlassClassificationEvaluator
+from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator, 
VectorAssembler
+from pyspark.sql import SQLContext
+import pyspark.sql.functions as sf
+
+from slicing.spark_modules import spark_utils, spark_slicer, spark_union_slicer
+
+
+ten_binner = sf.udf(lambda arg: int(arg / 10))
+fnlwgt_binner = sf.udf(lambda arg: int(arg // 100000))
+edu_binner = sf.udf(lambda arg: int(arg / 5))
+cap_gain_binner = sf.udf(lambda arg: int(arg / 1000))
+
+conf = SparkConf().setAppName("adults_test").setMaster('local[2]')
+num_partitions = 2
+model_type = "classification"
+label = 'Income'
+sparkContext = SparkContext(conf=conf)
+sqlContext = SQLContext(sparkContext)
+dataset_df = sqlContext.read.csv('/slicing/datasets/adult.csv', header='true', 
inferSchema='true')
+# initializing stages of main transformation pipeline
+stages = []
+dataset_df = dataset_df.withColumn('age_bin', ten_binner(dataset_df['Age']))
+dataset_df = dataset_df.withColumn('fnlwgt_bin', 
fnlwgt_binner(dataset_df['fnlwgt']))
+dataset_df = dataset_df.withColumn('edu_num_bin', 
edu_binner(dataset_df['EducationNum']))
+dataset_df = dataset_df.withColumn('cap_gain_bin', 
cap_gain_binner(dataset_df['CapitalGain']))
+dataset_df = dataset_df.withColumn('hours_per_w_bin', 
ten_binner(dataset_df['HoursPerWeek']))
+dataset_df = dataset_df.withColumn('model_type', sf.lit(1))
+
+# list of categorical features for further hot-encoding
+cat_features = ["age_bin", "WorkClass", "fnlwgt_bin", "Education", 
"edu_num_bin",
+        "MaritalStatus", "Occupation", "Relationship", "Race", "Gender",
+        "cap_gain_bin", "CapitalLoss", "hours_per_w_bin", "NativeCountry"]
+
+# hot encoding categorical features
+for feature in cat_features:
+    string_indexer = StringIndexer(inputCol=feature, outputCol=feature + 
"_index")
+    encoder = 
OneHotEncoderEstimator(inputCols=[string_indexer.getOutputCol()], 
outputCols=[feature + "_vec"])
+    encoder.setDropLast(False)
+    stages += [string_indexer, encoder]
+assembler_inputs = [feature + "_vec" for feature in cat_features]
+assembler = VectorAssembler(inputCols=assembler_inputs, 
outputCol="assembled_inputs")
+stages += [assembler]
+assembler_final = VectorAssembler(inputCols=["assembled_inputs"], 
outputCol="features")
+label_indexer = StringIndexer(inputCol=label, outputCol=label+"_idx")
+stages += [assembler_final]
+stages += [label_indexer]
+pipeline = Pipeline(stages=stages)
+pipeline_model = pipeline.fit(dataset_df)
+dataset_transformed = pipeline_model.transform(dataset_df)
+cat_dict = []
+decode_dict = {}
+counter = 0
+cat = 0
+for feature in cat_features:
+    colIdx = dataset_transformed.select(feature, feature + 
"_index").distinct().rdd.collectAsMap()
+    colIdx = {k: v for k, v in sorted(colIdx.items(), key=lambda item: 
item[1])}
+    for item in colIdx:
+        decode_dict[counter] = (cat, item, colIdx[item])
+        counter = counter + 1
+    cat = cat + 1
+df_transform_fin = dataset_transformed.select('features', label+"_idx", 
'model_type')
+splits = df_transform_fin.randomSplit([0.8, 0.2], seed=1234)
+train_df = splits[0]
+test_df = splits[1]
+rf = RandomForestClassifier(featuresCol='features', labelCol=label+"_idx", 
numTrees=10)
+rf_model = rf.fit(train_df)
+predictions = rf_model.transform(test_df)
+# Select example rows to display.
+predictions.select("features", label+"_idx", "prediction", "model_type")
+# Select (prediction, true label) and compute test error
+evaluator = MulticlassClassificationEvaluator(
+    labelCol=label+"_idx", predictionCol="prediction", metricName="accuracy")
+accuracy = evaluator.evaluate(predictions)
+loss = 1.0 - accuracy
+pred_df_fin = predictions.withColumn('error', 
spark_utils.calc_loss(predictions[label+"_idx"],
+                                                                      
predictions['prediction'],
+                                                                      
predictions['model_type']))
+predictions = pred_df_fin.select('features', 
'error').repartition(num_partitions)
+all_features = range(predictions.toPandas().values[0][0].size)
+predictions = predictions.collect()
+k = 10
+SparkContext.broadcast(sparkContext, loss)
+SparkContext.broadcast(sparkContext, predictions)
+SparkContext.broadcast(sparkContext, all_features)
+SparkContext.broadcast(sparkContext, decode_dict)
+enumerator = "join"
+if enumerator == "join":
+    spark_slicer.parallel_process(all_features, predictions, loss, 
sparkContext, debug=True, alpha=6, k=k, w=0.5,
+                                    loss_type=0, enumerator="join")
+elif enumerator == "union":
+    spark_union_slicer.process(all_features, predictions, loss, 
sparkContext, debug=True, alpha=4, k=k, w=0.5, loss_type=0,
+                                 enumerator="union")
+
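
The binner udfs above discretize the numeric adult columns into coarse bins before one-hot encoding. A plain-Python illustration of the binning (sample values are illustrative):

ten_binner = lambda arg: int(arg / 10)          # Age, HoursPerWeek
fnlwgt_binner = lambda arg: int(arg // 100000)  # fnlwgt
cap_gain_binner = lambda arg: int(arg / 1000)   # CapitalGain

print(ten_binner(37))          # 3  (ages 30-39 fall into bin 3)
print(fnlwgt_binner(215646))   # 2
print(cap_gain_binner(2174))   # 2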
diff --git a/scripts/staging/slicing/tests/classification/test_adult.py 
b/scripts/staging/slicing/tests/classification/test_adult.py
new file mode 100644
index 0000000..9575592
--- /dev/null
+++ b/scripts/staging/slicing/tests/classification/test_adult.py
@@ -0,0 +1,121 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+import sys
+
+import pandas as pd
+from sklearn.preprocessing import OneHotEncoder
+
+from slicing.base import slicer as slicer, union_slicer
+from sklearn.ensemble import RandomForestClassifier
+from sklearn import preprocessing
+from sklearn.model_selection import train_test_split
+
+
+if __name__ == "__main__":
+    args = sys.argv
+    if len(args) > 1:
+        k = int(args[1])
+        w = float(args[2].replace(',', '.'))
+        alpha = int(args[3])
+        if args[4] == "True":
+            b_update = True
+        else:
+            b_update = False
+        debug = args[5] == "True"
+        loss_type = int(args[6])
+        enumerator = args[7]
+    else:
+        k = 10
+        w = 0.5
+        alpha = 4
+        b_update = True
+        debug = True
+        loss_type = 1
+        enumerator = "union"
+    dataset = pd.read_csv('/slicing/datasets/adult.csv')
+    attributes_amount = len(dataset.values[0])
+    x = dataset.iloc[:, 0:attributes_amount - 1].values
+    y = dataset.iloc[:, attributes_amount - 1]
+    le = preprocessing.LabelEncoder()
+    le.fit(y)
+    y = le.transform(y)
+    complete_x = []
+    complete_y = []
+    counter = 0
+    all_indexes = []
+    not_encoded_columns = [
+        "Age", "WorkClass", "fnlwgt", "Education", "EducationNum",
+        "MaritalStatus", "Occupation", "Relationship", "Race", "Gender",
+        "CapitalGain", "CapitalLoss", "HoursPerWeek", "NativeCountry", "Income"
+    ]
+    for row in x:
+        row[0] = int(row[0] / 10)
+        row[2] = int(row[2]) // 100000
+        row[4] = int(row[4] / 5)
+        row[10] = int(row[10] / 1000)
+        row[12] = int(row[12] / 10)
+    enc = OneHotEncoder(handle_unknown='ignore')
+    x = enc.fit_transform(x).toarray()
+    all_features = enc.get_feature_names()
+    x_size = len(complete_x)
+    x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.2, 
random_state=0)
+    for item in x_test:
+        complete_x.append((counter, item))
+        complete_y.append((counter, y_test[counter]))
+        counter = counter + 1
+    x_size = counter
+    clf = RandomForestClassifier(n_jobs=2, random_state=0)
+    clf.fit(x_train, y_train)
+    RandomForestClassifier(bootstrap=True, class_weight=None, criterion='gini',
+                max_depth=None, max_features='auto', max_leaf_nodes=None,
+                min_impurity_split=1e-07, min_samples_leaf=1,
+                min_samples_split=2, min_weight_fraction_leaf=0.0,
+                n_estimators=10, n_jobs=2, oob_score=False, random_state=0,
+                verbose=0, warm_start=False)
+
+    # alpha is the size significance coefficient
+    # debug option returns and prints debug info while slices are created
+    # k is the number of top slices we want
+    # w is the weight of the error term; (1 - w) is the weight of the size term in the optimization function
+    # loss_type = 0 (l2 loss, for regression models)
+    # loss_type = 1 (cross-entropy loss, for classification models)
+    preds = clf.predict(x_test)
+    predictions = []
+    counter = 0
+    mistakes = 0
+    for pred in preds:
+        predictions.append((counter, pred))
+        if y_test[counter] != pred:
+            mistakes = mistakes + 1
+        counter = counter + 1
+    lossF = mistakes / counter
+
+    # enumerator <union>/<join> selects how slices of the next level are combined:
+    # with <join>, new nodes of the current level are built only by joining nodes of the
+    # previous level with each other; the <union> variant is based on the DPSize algorithm
+    if enumerator == "join":
+        slicer.process(all_features, complete_x, lossF, x_size, complete_y, 
predictions, debug=debug, alpha=alpha, k=k,
+                       w=w, loss_type=loss_type, b_update=b_update)
+    elif enumerator == "union":
+        union_slicer.process(all_features, complete_x, lossF, x_size, 
complete_y, predictions, debug=debug, alpha=alpha,
+                             k=k, w=w, loss_type=loss_type, b_update=b_update)
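
The sys.argv block at the top of this test expects seven positional arguments: k, w, alpha, b_update, debug, loss_type, enumerator. A hypothetical invocation, assuming test_adult.py is launched from its own directory:

import subprocess

subprocess.run(["python", "test_adult.py",
                "10",      # k: number of top slices
                "0.5",     # w: weight of the error term
                "4",       # alpha: size significance coefficient
                "True",    # b_update
                "True",    # debug
                "1",       # loss_type (1 = classification)
                "union"])  # enumerator: "join" or "union"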
diff --git a/scripts/staging/slicing/tests/classification/test_iris.py 
b/scripts/staging/slicing/tests/classification/test_iris.py
new file mode 100644
index 0000000..2bd0c09
--- /dev/null
+++ b/scripts/staging/slicing/tests/classification/test_iris.py
@@ -0,0 +1,109 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+import sys
+
+import pandas as pd
+from sklearn.preprocessing import OneHotEncoder
+
+from slicing.base import slicer as slicer, union_slicer
+from sklearn.ensemble import RandomForestClassifier
+from sklearn import preprocessing
+from sklearn.model_selection import train_test_split
+
+if __name__ == "__main__":
+    args = sys.argv
+    if len(args) > 1:
+        k = int(args[1])
+        w = float(args[2].replace(',', '.'))
+        alpha = int(args[3])
+        if args[4] == "True":
+            b_update = True
+        else:
+            b_update = False
+        debug = args[5] == "True"
+        loss_type = int(args[6])
+        enumerator = args[7]
+    else:
+        k = 10
+        w = 0.5
+        alpha = 6
+        b_update = True
+        debug = True
+        loss_type = 1
+        enumerator = "join"
+    dataset = pd.read_csv('/slicing/datasets/iris.csv')
+    attributes_amount = len(dataset.values[0])
+    x = dataset.iloc[:, 0:attributes_amount - 1].values
+    enc = OneHotEncoder(handle_unknown='ignore')
+    x = enc.fit_transform(x).toarray()
+    y = dataset.iloc[:, attributes_amount - 1]
+    le = preprocessing.LabelEncoder()
+    le.fit(y)
+    y = le.transform(y)
+    complete_x = []
+    complete_y = []
+    counter = 0
+    all_indexes = []
+    all_features = enc.get_feature_names()
+    x_size = len(complete_x)
+    x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.3, 
random_state=0)
+    for item in x_test:
+        complete_x.append((counter, item))
+        complete_y.append((counter, y_test[counter]))
+        counter = counter + 1
+    x_size = counter
+    clf = RandomForestClassifier(n_jobs=2, random_state=0)
+    clf.fit(x_train, y_train)
+    RandomForestClassifier(bootstrap=True, class_weight=None, criterion='gini',
+                max_depth=None, max_features='auto', max_leaf_nodes=None,
+                min_impurity_split=1e-07, min_samples_leaf=1,
+                min_samples_split=2, min_weight_fraction_leaf=0.0,
+                n_estimators=10, n_jobs=2, oob_score=False, random_state=0,
+                verbose=0, warm_start=False)
+
+    # alpha is the size significance coefficient
+    # debug option returns and prints debug info while slices are created
+    # k is the number of top slices we want
+    # w is the weight of the error term; (1 - w) is the weight of the size term in the optimization function
+    # loss_type = 0 (l2 loss, for regression models)
+    # loss_type = 1 (cross-entropy loss, for classification models)
+    preds = clf.predict(x_test)
+    predictions = []
+    counter = 0
+    mistakes = 0
+    for pred in preds:
+        predictions.append((counter, pred))
+        if y_test[counter] != pred:
+            mistakes = mistakes + 1
+        counter = counter + 1
+    lossF = mistakes / counter
+
+    # enumerator <union>/<join> selects how slices of the next level are combined:
+    # with <join>, new nodes of the current level are built only by joining nodes of the
+    # previous level with each other; the <union> variant is based on the DPSize algorithm
+    if enumerator == "join":
+        slicer.process(all_features, complete_x, lossF, x_size, complete_y, 
predictions, debug=debug, alpha=alpha, k=k,
+                       w=w, loss_type=loss_type, b_update=b_update)
+    elif enumerator == "union":
+        union_slicer.process(all_features, complete_x, lossF, x_size, 
complete_y, predictions, debug=debug, alpha=alpha, k=k,
+                             w=w, loss_type=loss_type, b_update=b_update)
diff --git a/scripts/staging/slicing/tests/regression/__init__.py 
b/scripts/staging/slicing/tests/regression/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/scripts/staging/slicing/tests/regression/bd_spark_salary.py 
b/scripts/staging/slicing/tests/regression/bd_spark_salary.py
new file mode 100644
index 0000000..c32414b
--- /dev/null
+++ b/scripts/staging/slicing/tests/regression/bd_spark_salary.py
@@ -0,0 +1,131 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+import sys
+
+from pyspark import SparkConf, SparkContext
+from pyspark.ml import Pipeline
+from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator, 
VectorAssembler, IndexToString
+from pyspark.ml.regression import LinearRegression
+from pyspark.sql import SQLContext
+import pyspark.sql.functions as sf
+from pyspark.sql.functions import udf
+from sklearn.model_selection import train_test_split
+
+from slicing.spark_modules import spark_utils, join_data_parallel, 
union_data_parallel
+
+binner = udf(lambda arg: int(int(arg) // 5))
+
+
+if __name__ == "__main__":
+    args = sys.argv
+    if len(args) > 1:
+        k = int(args[1])
+        w = float(args[2].replace(',', '.'))
+        alpha = int(args[3])
+        if args[4] == "True":
+            b_update = True
+        else:
+            b_update = False
+        debug = args[5] == "True"
+        loss_type = int(args[6])
+        enumerator = args[7]
+    else:
+        k = 10
+        w = 0.5
+        alpha = 6
+        b_update = True
+        debug = True
+        loss_type = 0
+        enumerator = "union"
+
+    conf = SparkConf().setAppName("salary_test").setMaster('local[2]')
+    num_partitions = 2
+    model_type = "regression"
+    label = 'salary'
+    sparkContext = SparkContext(conf=conf)
+    sqlContext = SQLContext(sparkContext)
+    fileRDD = sparkContext.textFile('salaries.csv', num_partitions)
+    header = fileRDD.first()
+    head_split = header.split(",")
+    head_split[0] = '_c0'
+    fileRDD = fileRDD.filter(lambda line: line != header)
+    data = fileRDD.map(lambda row: row.split(","))
+    dataset_df = sqlContext.createDataFrame(data, head_split)
+
+    cat_features = ["rank", "discipline", "sincephd_bin", "service_bin", "sex"]
+    # initializing stages of main transformation pipeline
+    stages = []
+    dataset_df = dataset_df.drop('_c0')
+    dataset_df = dataset_df.withColumn("id", sf.monotonically_increasing_id())
+    # binning numeric features with the local binner udf (adjust per dataset if needed)
+    dataset_df = dataset_df.withColumn('sincephd_bin', 
binner(dataset_df['sincephd']))
+    dataset_df = dataset_df.withColumn('service_bin', 
binner(dataset_df['service']))
+    dataset_df = dataset_df.withColumn('model_type', sf.lit(0))
+    dataset_df = dataset_df.drop('sincephd', 'service')
+    dataset_df = dataset_df.withColumn('target', dataset_df[label].cast("int"))
+    # hot encoding categorical features
+    for feature in cat_features:
+        string_indexer = StringIndexer(inputCol=feature, outputCol=feature + 
"_index")
+        encoder = 
OneHotEncoderEstimator(inputCols=[string_indexer.getOutputCol()], 
outputCols=[feature + "_vec"])
+        encoder.setDropLast(False)
+        stages += [string_indexer, encoder]
+    assembler_inputs = [feature + "_vec" for feature in cat_features]
+    assembler = VectorAssembler(inputCols=assembler_inputs, 
outputCol="assembled_inputs")
+    stages += [assembler]
+    assembler_final = VectorAssembler(inputCols=["assembled_inputs"], 
outputCol="features")
+    stages += [assembler_final]
+
+    pipeline = Pipeline(stages=stages)
+    pipeline_model = pipeline.fit(dataset_df)
+    dataset_transformed = pipeline_model.transform(dataset_df)
+    df_transform_fin = dataset_transformed.select('id', 'features', 'target', 
'model_type').toPandas()
+
+    cat = 0
+    counter = 0
+    decode_dict = {}
+    for feature in cat_features:
+        colIdx = dataset_transformed.select(feature, feature + 
"_index").distinct().rdd.collectAsMap()
+        colIdx = {k: v for k, v in sorted(colIdx.items(), key=lambda item: 
item[1])}
+        for item in colIdx:
+            decode_dict[counter] = (cat, item, colIdx[item])
+            counter = counter + 1
+        cat = cat + 1
+
+    train, test = train_test_split(df_transform_fin, test_size=0.3, 
random_state=0)
+    train_df = sqlContext.createDataFrame(train)
+    test_df = sqlContext.createDataFrame(test)
+    lr = LinearRegression(featuresCol='features', labelCol='target', 
maxIter=10, regParam=0.3, elasticNetParam=0.8)
+    lr_model = lr.fit(train_df)
+    eval = lr_model.evaluate(test_df)
+    f_l2 = eval.meanSquaredError
+    pred = eval.predictions
+    pred_df_fin = pred.withColumn('error', 
spark_utils.calc_loss(pred['target'], pred['prediction'], pred['model_type']))
+    predictions = pred_df_fin.select('id', 'features', 
'error').repartition(num_partitions)
+    converter = IndexToString(inputCol='features', outputCol='cats')
+    all_features = range(predictions.toPandas().values[1][1].size)
+    k = 10
+    if enumerator == "join":
+        join_data_parallel.parallel_process(all_features, predictions, f_l2, 
sparkContext, debug=debug, alpha=alpha,
+                                            k=k, w=w, loss_type=loss_type)
+    elif enumerator == "union":
+        union_data_parallel.parallel_process(all_features, predictions, f_l2, 
sparkContext, debug=debug, alpha=alpha,
+                                             k=k, w=w, loss_type=loss_type)
diff --git a/scripts/staging/slicing/tests/regression/spark_salary.py 
b/scripts/staging/slicing/tests/regression/spark_salary.py
new file mode 100644
index 0000000..52d0cf2
--- /dev/null
+++ b/scripts/staging/slicing/tests/regression/spark_salary.py
@@ -0,0 +1,123 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+import sys
+
+from pyspark import SparkConf, SparkContext
+from pyspark.ml import Pipeline
+from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator, 
VectorAssembler, IndexToString
+from pyspark.ml.regression import LinearRegression
+from pyspark.sql import SQLContext
+import pyspark.sql.functions as sf
+from pyspark.sql.functions import udf
+from sklearn.model_selection import train_test_split
+
+from slicing.spark_modules import spark_utils, spark_slicer, spark_union_slicer
+
+
+binner = udf(lambda arg: int(arg / 5))
+
+
+if __name__ == "__main__":
+    args = sys.argv
+    if len(args) > 1:
+        k = int(args[1])
+        w = float(args[2].replace(',', '.'))
+        alpha = int(args[3])
+        if args[4] == "True":
+            b_update = True
+        else:
+            b_update = False
+        debug = args[5] == "True"
+        loss_type = int(args[6])
+        enumerator = args[7]
+    else:
+        k = 10
+        w = 0.5
+        alpha = 6
+        b_update = True
+        debug = True
+        loss_type = 0
+        enumerator = "join"
+
+    conf = SparkConf().setAppName("salary_test").setMaster('local[2]')
+    num_partitions = 2
+    model_type = "regression"
+    label = 'salary'
+    sparkContext = SparkContext(conf=conf)
+    sqlContext = SQLContext(sparkContext)
+    dataset_df = sqlContext.read.csv('salaries.csv', header='true', 
inferSchema='true')
+    # initializing stages of main transformation pipeline
+    stages = []
+    # list of categorical features for further hot-encoding
+    cat_features = ["rank", "discipline", "sincephd_bin", "service_bin", "sex"]
+    # removing column with ID field
+    dataset_df = dataset_df.drop('_c0')
+    # binning numeric features with the local binner udf (adjust per dataset if needed)
+    dataset_df = dataset_df.withColumn('sincephd_bin', 
binner(dataset_df['sincephd']))
+    dataset_df = dataset_df.withColumn('service_bin', 
binner(dataset_df['service']))
+    dataset_df = dataset_df.withColumn('model_type', sf.lit(0))
+    dataset_df = dataset_df.drop('sincephd', 'service')
+    # hot encoding categorical features
+    for feature in cat_features:
+        string_indexer = StringIndexer(inputCol=feature, outputCol=feature + 
"_index")
+        encoder = 
OneHotEncoderEstimator(inputCols=[string_indexer.getOutputCol()], 
outputCols=[feature + "_vec"])
+        encoder.setDropLast(False)
+        stages += [string_indexer, encoder]
+    assembler_inputs = [feature + "_vec" for feature in cat_features]
+    assembler = VectorAssembler(inputCols=assembler_inputs, 
outputCol="assembled_inputs")
+    stages += [assembler]
+    assembler_final = VectorAssembler(inputCols=["assembled_inputs"], 
outputCol="features")
+    stages += [assembler_final]
+    pipeline = Pipeline(stages=stages)
+    pipeline_model = pipeline.fit(dataset_df)
+    dataset_transformed = pipeline_model.transform(dataset_df)
+    df_transform_fin = dataset_transformed.select('features', label, 
'model_type').toPandas()
+    train, test = train_test_split(df_transform_fin, test_size=0.3, 
random_state=0)
+    train_df = sqlContext.createDataFrame(train)
+    test_df = sqlContext.createDataFrame(test)
+    decode_dict = {}
+    counter = 0
+    cat = 0
+    for feature in cat_features:
+        colIdx = dataset_transformed.select(feature, feature + 
"_index").distinct().rdd.collectAsMap()
+        colIdx = {k: v for k, v in sorted(colIdx.items(), key=lambda item: 
item[1])}
+        for item in colIdx:
+            decode_dict[counter] = (cat, item, colIdx[item])
+            counter = counter + 1
+        cat = cat + 1
+    lr = LinearRegression(featuresCol='features', labelCol=label, maxIter=10, 
regParam=0.3, elasticNetParam=0.8)
+    lr_model = lr.fit(train_df)
+    eval = lr_model.evaluate(test_df)
+    f_l2 = eval.meanSquaredError
+    pred = eval.predictions
+    pred_df_fin = pred.withColumn('error', spark_utils.calc_loss(pred[label], 
pred['prediction'], pred['model_type']))
+    predictions = pred_df_fin.select('features', 
'error').repartition(num_partitions)
+    converter = IndexToString(inputCol='features', outputCol='cats')
+    all_features = range(predictions.toPandas().values[0][0].size)
+    predictions = predictions.collect()
+    k = 10
+    if enumerator == "join":
+        spark_slicer.parallel_process(all_features, predictions, f_l2, 
sparkContext, debug=debug, alpha=alpha, k=k, w=w,
+                                      loss_type=loss_type, 
enumerator=enumerator)
+    elif enumerator == "union":
+        spark_union_slicer.process(all_features, predictions, f_l2, 
sparkContext, debug=debug, alpha=alpha, k=k, w=w,
+                                   loss_type=loss_type, enumerator=enumerator)
diff --git a/scripts/staging/slicing/tests/regression/test_insurance.py 
b/scripts/staging/slicing/tests/regression/test_insurance.py
new file mode 100644
index 0000000..64b58c4
--- /dev/null
+++ b/scripts/staging/slicing/tests/regression/test_insurance.py
@@ -0,0 +1,103 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+import sys
+
+import pandas as pd
+from sklearn.linear_model import LinearRegression
+from sklearn.model_selection import train_test_split
+from sklearn.preprocessing import OneHotEncoder
+
+from slicing.base import slicer as slicer, union_slicer
+
+if __name__ == "__main__":
+    args = sys.argv
+    if len(args) > 1:
+        k = int(args[1])
+        w = float(args[2].replace(',', '.'))
+        alpha = int(args[3])
+        if args[4] == "True":
+            b_update = True
+        else:
+            b_update = False
+        debug = args[5] == "True"
+        loss_type = int(args[6])
+        enumerator = args[7]
+    else:
+        k = 10
+        w = 0.5
+        alpha = 4
+        b_update = True
+        debug = True
+        loss_type = 0
+        enumerator = "union"
+    file_name = '/slicing/datasets/insurance.csv'
+    dataset = pd.read_csv(file_name)
+    attributes_amount = len(dataset.values[0])
+    # for now working with regression datasets, assuming that target attribute 
is the last one
+    # currently non-categorical features are not supported and should be binned
+    y = dataset.iloc[:, attributes_amount - 1:attributes_amount].values
+    # take all feature columns (this dataset has no id column); the last column is the target
+    x = dataset.iloc[:, 0:attributes_amount - 1].values
+    # list of numerical columns
+    non_categorical = [1, 3]
+    for row in x:
+        for attribute in non_categorical:
+            # <attribute - 1> converts the 1-based column number to a 0-based index (no id column was dropped here)
+            row[attribute - 1] = int(row[attribute - 1] / 5)
+    # hot encoding of categorical features
+    enc = OneHotEncoder(handle_unknown='ignore')
+    x = enc.fit_transform(x).toarray()
+    complete_x = []
+    complete_y = []
+    counter = 0
+    all_features = enc.get_feature_names()
+    # train model on a whole dataset
+    x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.3, 
random_state=0)
+    for item in x_test:
+        complete_x.append((counter, item))
+        complete_y.append((counter, y_test[counter]))
+        counter = counter + 1
+    x_size = counter
+    model = LinearRegression()
+    model.fit(x_train, y_train)
+    preds = (model.predict(x_test) - y_test) ** 2
+    f_l2 = sum(preds)/x_size
+    errors = []
+    counter = 0
+    for pred in preds:
+        errors.append((counter, pred))
+        counter = counter + 1
+    # alpha is the size significance coefficient
+    # debug option returns and prints debug info while slices are created
+    # k is the number of top slices we want
+    # w is the weight of the error term; (1 - w) is the weight of the size term in the optimization function
+
+    # enumerator <union>/<join> selects how slices of the next level are enumerated:
+    # <join> builds a node of the current level by combining only nodes of the previous level with each other
+    # <union> is based on the DPSize algorithm
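+    # rough illustration (not the exact enumeration order): a 3-predicate slice is built by
+    # <join> from two 2-predicate slices of the previous level, while <union> (DPSize-style)
+    # may also combine a 1-predicate slice with a 2-predicate slice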
+    if enumerator == "join":
+        slicer.process(all_features, complete_x, f_l2, x_size, y_test, errors, debug=debug, alpha=alpha, k=k,
+                       w=w, loss_type=loss_type, b_update=b_update)
+    elif enumerator == "union":
+        union_slicer.process(all_features, complete_x, f_l2, x_size, y_test, errors, debug=debug, alpha=alpha, k=k,
+                             w=w, loss_type=loss_type, b_update=b_update)
diff --git a/scripts/staging/slicing/tests/regression/test_salary.py b/scripts/staging/slicing/tests/regression/test_salary.py
new file mode 100644
index 0000000..92e0055
--- /dev/null
+++ b/scripts/staging/slicing/tests/regression/test_salary.py
@@ -0,0 +1,104 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+import pandas as pd
+from sklearn.linear_model import LinearRegression
+from sklearn.metrics import mean_squared_error
+from sklearn.model_selection import train_test_split
+from sklearn.preprocessing import OneHotEncoder
+import sys
+
+from slicing.base import slicer, union_slicer
+
+if __name__ == "__main__":
+    args = sys.argv
+    if len(args) > 1:
+        k = int(args[1])
+        w = float(args[2].replace(',', '.'))
+        alpha = int(args[3])
+        b_update = args[4] == "True"
+        debug = args[5] == "True"
+        loss_type = int(args[6])
+        enumerator = args[7]
+    else:
+        k = 10
+        w = 0.5
+        alpha = 4
+        b_update = True
+        debug = True
+        loss_type = 0
+        enumerator = "union"
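+    # for reference, an illustrative invocation overriding the defaults
+    # (note that w may use a comma as decimal separator):
+    #   python test_salary.py 5 0,3 6 False True 0 join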
+    file_name = '/slicing/datasets/salaries.csv'
+    dataset = pd.read_csv(file_name)
+    attributes_amount = len(dataset.values[0])
+    # for now we work with regression datasets and assume the target attribute is the last column
+    # non-categorical features are not supported yet and have to be binned
+    y = dataset.iloc[:, attributes_amount - 1:attributes_amount].values
+    # start at column 1 to exclude the id field
+    x = dataset.iloc[:, 1:attributes_amount - 1].values
+    # 1-based positions of numerical columns to be binned
+    non_categorical = [4, 5]
+    for row in x:
+        for attribute in non_categorical:
+            # <attribute - 2> because the id column was dropped from x and column numbers are 1-based
+            row[attribute - 2] = int(row[attribute - 2] / 5)
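+    # for illustration: 1-based csv column 4 maps to index 2 in each row of x after the id column is dropped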
+    # one-hot encoding of categorical features
+    enc = OneHotEncoder(handle_unknown='ignore')
+    x = enc.fit_transform(x).toarray()
+    complete_x = []
+    complete_y = []
+    counter = 0
+    all_features = enc.get_feature_names()
+    # split into train/test; the test split (with row indices) is used for slice finding
+    x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.3, random_state=0)
+    for item in x_test:
+        complete_x.append((counter, item))
+        complete_y.append((counter, y_test[counter]))
+        counter = counter + 1
+    x_size = counter
+    model = LinearRegression()
+    model.fit(x_train, y_train)
+    predictions = model.predict(x_test)
+    f_l2 = mean_squared_error(y_test, predictions)
+    preds = (predictions - y_test) ** 2
+    errors = []
+    counter = 0
+    for pred in preds:
+        errors.append((counter, pred))
+        counter = counter + 1
+    # alpha is the size significance coefficient
+    # debug enables collecting and printing debug info while slices are created
+    # k is the number of top slices to return
+    # w is the weight of the error term; (1 - w) weights the size term in the scoring function
+
+    # enumerator <union>/<join> selects how slices of the next level are enumerated:
+    # <join> builds a node of the current level by combining only nodes of the previous level with each other
+    # <union> is based on the DPSize algorithm
+    if enumerator == "join":
+        slicer.process(all_features, complete_x, f_l2, x_size, y_test, errors, debug=debug, alpha=alpha, k=k, w=w,
+                       loss_type=loss_type, b_update=b_update)
+    elif enumerator == "union":
+        union_slicer.process(all_features, complete_x, f_l2, x_size, y_test, errors, debug=debug, alpha=alpha, k=k, w=w,
+                             loss_type=loss_type, b_update=b_update)
