This is an automated email from the ASF dual-hosted git repository. lindong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-ml.git
The following commit(s) were added to refs/heads/master by this push: new 4939933 [FLINK-30099] Add test case for algorithms' python APIs 4939933 is described below commit 4939933a00eff0a4dccf3b9d0fa06595cdfd06e6 Author: yunfengzhou-hub <yuri.zhouyunf...@outlook.com> AuthorDate: Tue Nov 22 12:12:10 2022 +0800 [FLINK-30099] Add test case for algorithms' python APIs This closes #178. --- .../ml/lib/classification/tests/test_knn.py | 15 +++- .../ml/lib/classification/tests/test_linearsvc.py | 49 ++++++++++++- .../ml/lib/classification/tests/test_naivebayes.py | 21 +++++- .../pyflink/ml/lib/clustering/tests/test_kmeans.py | 33 ++++++++- .../pyflink/ml/lib/feature/tests/test_idf.py | 23 +++++- .../pyflink/ml/lib/feature/tests/test_imputer.py | 30 +++++++- .../lib/feature/tests/test_indextostringmodel.py | 28 ++++++++ .../ml/lib/feature/tests/test_kbinsdiscretizer.py | 22 +++++- .../ml/lib/feature/tests/test_maxabsscaler.py | 66 +++++++++++++---- .../ml/lib/feature/tests/test_minmaxscaler.py | 82 ++++++++++++++++------ .../ml/lib/feature/tests/test_onehotencoder.py | 41 ++++++++++- .../ml/lib/feature/tests/test_robustscaler.py | 34 ++++++++- .../ml/lib/feature/tests/test_standardscaler.py | 46 +++++++++++- .../ml/lib/feature/tests/test_stringindexer.py | 54 +++++++++++++- .../tests/test_variancethresholdselector.py | 32 ++++++++- .../ml/lib/feature/tests/test_vectorindexer.py | 27 ++++++- flink-ml-python/pyflink/ml/tests/test_utils.py | 15 ++++ 17 files changed, 566 insertions(+), 52 deletions(-) diff --git a/flink-ml-python/pyflink/ml/lib/classification/tests/test_knn.py b/flink-ml-python/pyflink/ml/lib/classification/tests/test_knn.py index 77ad3fa..d140678 100644 --- a/flink-ml-python/pyflink/ml/lib/classification/tests/test_knn.py +++ b/flink-ml-python/pyflink/ml/lib/classification/tests/test_knn.py @@ -21,7 +21,7 @@ from pyflink.common import Types from pyflink.table import Table from pyflink.ml.core.linalg import Vectors, DenseVectorTypeInfo, DenseMatrix, 
DenseVector -from pyflink.ml.lib.classification.knn import KNN +from pyflink.ml.lib.classification.knn import KNN, KNNModel from pyflink.ml.tests.test_utils import PyFlinkMLTestCase @@ -157,6 +157,19 @@ class KNNTest(PyFlinkMLTestCase): self.assertEqual(packed_features.num_cols(), labels.size()) self.assertEqual(feature_norm_squares.size(), labels.size()) + def test_set_model_data(self): + knn = KNN() + model_a = knn.fit(self.train_data) + model_data = model_a.get_model_data()[0] + + model_b = KNNModel().set_model_data(model_data) + output = model_b.transform(self.predict_data)[0] + field_names = output.get_schema().get_field_names() + self.verify_predict_result( + output, + field_names.index(knn.label_col), + field_names.index(knn.prediction_col)) + def verify_predict_result( self, output: Table, label_index, prediction_index): with self.t_env.to_data_stream(output).execute_and_collect() as results: diff --git a/flink-ml-python/pyflink/ml/lib/classification/tests/test_linearsvc.py b/flink-ml-python/pyflink/ml/lib/classification/tests/test_linearsvc.py index ada27e7..2cb6a4b 100644 --- a/flink-ml-python/pyflink/ml/lib/classification/tests/test_linearsvc.py +++ b/flink-ml-python/pyflink/ml/lib/classification/tests/test_linearsvc.py @@ -20,8 +20,8 @@ from pyflink.common import Types from pyflink.table import Table from pyflink.ml.core.linalg import Vectors, DenseVectorTypeInfo -from pyflink.ml.lib.classification.linearsvc import LinearSVC -from pyflink.ml.tests.test_utils import PyFlinkMLTestCase +from pyflink.ml.lib.classification.linearsvc import LinearSVC, LinearSVCModel +from pyflink.ml.tests.test_utils import PyFlinkMLTestCase, update_existing_params class LinearSVCTest(PyFlinkMLTestCase): @@ -44,6 +44,8 @@ class LinearSVCTest(PyFlinkMLTestCase): ['features', 'label', 'weight'], [DenseVectorTypeInfo(), Types.DOUBLE(), Types.DOUBLE()]))) + self.eps = 0.1 + def test_param(self): linear_svc = LinearSVC() self.assertEqual('features', linear_svc.features_col) @@ 
-113,6 +115,49 @@ class LinearSVCTest(PyFlinkMLTestCase): linear_svc.prediction_col, linear_svc.raw_prediction_col) + def test_get_model_data(self): + linear_svc = LinearSVC().set_weight_col('weight') + model_data = linear_svc.fit(self.train_data).get_model_data()[0] + expected_field_names = ['coefficient'] + self.assertEqual(expected_field_names, model_data.get_schema().get_field_names()) + + model_rows = [result for result in + self.t_env.to_data_stream(model_data).execute_and_collect()] + self.assertEqual(1, len(model_rows)) + self.assertListAlmostEqual( + [0.470, -0.273, -0.410, -0.546], + model_rows[0][expected_field_names.index('coefficient')].to_array(), + delta=self.eps) + + def test_set_model_data(self): + linear_svc = LinearSVC().set_weight_col('weight') + model_a = linear_svc.fit(self.train_data) + model_data = model_a.get_model_data()[0] + + model_b = LinearSVCModel().set_model_data(model_data) + update_existing_params(model_b, model_a) + + output = model_b.transform(self.train_data)[0] + self.verify_prediction_result( + output, + output.get_schema().get_field_names(), + linear_svc.features_col, + linear_svc.prediction_col, + linear_svc.raw_prediction_col) + + def test_save_load_and_predict(self): + linear_svc = LinearSVC().set_weight_col('weight') + reloaded_linear_svc = self.save_and_reload(linear_svc) + model = reloaded_linear_svc.fit(self.train_data) + reloaded_model = self.save_and_reload(model) + output = reloaded_model.transform(self.train_data)[0] + self.verify_prediction_result( + output, + output.get_schema().get_field_names(), + linear_svc.features_col, + linear_svc.prediction_col, + linear_svc.raw_prediction_col) + def verify_prediction_result(self, output: Table, field_names, diff --git a/flink-ml-python/pyflink/ml/lib/classification/tests/test_naivebayes.py b/flink-ml-python/pyflink/ml/lib/classification/tests/test_naivebayes.py index 35fc816..16f0c8c 100644 --- a/flink-ml-python/pyflink/ml/lib/classification/tests/test_naivebayes.py +++ 
b/flink-ml-python/pyflink/ml/lib/classification/tests/test_naivebayes.py @@ -22,7 +22,7 @@ from pyflink.table import Table from pyflink.ml.core.linalg import Vectors, DenseVectorTypeInfo from pyflink.ml.lib.classification.naivebayes import NaiveBayes, NaiveBayesModel -from pyflink.ml.tests.test_utils import PyFlinkMLTestCase +from pyflink.ml.tests.test_utils import PyFlinkMLTestCase, update_existing_params class NaiveBayesTest(PyFlinkMLTestCase): @@ -112,6 +112,25 @@ class NaiveBayesTest(PyFlinkMLTestCase): actual_output = self.execute_and_collect(output_table) self.assertEqual(self.expected_output, actual_output) + def test_get_model_data(self): + model = self.estimator.fit(self.train_data) + model_data = model.get_model_data()[0] + expected_field_names = ['theta', 'piArray', 'labels'] + self.assertEqual(expected_field_names, model_data.get_schema().get_field_names()) + + # TODO: Add test to collect and verify the model data results after FLINK-30124 is resolved. + + def test_set_model_data(self): + model_a = self.estimator.fit(self.train_data) + model_data = model_a.get_model_data()[0] + + model_b = NaiveBayesModel().set_model_data(model_data) + update_existing_params(model_b, model_a) + + output_table = model_b.transform(self.predict_data)[0] + actual_output = self.execute_and_collect(output_table) + self.assertEqual(self.expected_output, actual_output) + def execute_and_collect(self, output: Table): res = {} with self.t_env.to_data_stream(output).execute_and_collect() as results: diff --git a/flink-ml-python/pyflink/ml/lib/clustering/tests/test_kmeans.py b/flink-ml-python/pyflink/ml/lib/clustering/tests/test_kmeans.py index f4738c5..c63b26b 100644 --- a/flink-ml-python/pyflink/ml/lib/clustering/tests/test_kmeans.py +++ b/flink-ml-python/pyflink/ml/lib/clustering/tests/test_kmeans.py @@ -22,7 +22,7 @@ from typing import List, Dict, Set from pyflink.ml.core.linalg import Vectors, DenseVectorTypeInfo, DenseVector from pyflink.ml.lib.clustering.kmeans import 
KMeans, KMeansModel, OnlineKMeans -from pyflink.ml.tests.test_utils import PyFlinkMLTestCase +from pyflink.ml.tests.test_utils import PyFlinkMLTestCase, update_existing_params def group_features_by_prediction( @@ -146,6 +146,37 @@ class KMeansTest(PyFlinkMLTestCase): actual_groups[0] == self.expected_groups[1] and actual_groups[1] == self.expected_groups[0]) + def test_get_model_data(self): + kmeans = KMeans().set_max_iter(2).set_k(2) + model = kmeans.fit(self.data_table) + model_data = model.get_model_data()[0] + expected_field_names = ['centroids', 'weights'] + self.assertEqual(expected_field_names, model_data.get_schema().get_field_names()) + + # TODO: Add test to collect and verify the model data results after FLINK-30122 is resolved. + + def test_set_model_data(self): + kmeans = KMeans().set_max_iter(2).set_k(2) + model_a = kmeans.fit(self.data_table) + model_data = model_a.get_model_data()[0] + + model_b = KMeansModel().set_model_data(model_data) + update_existing_params(model_b, model_a) + + output = model_b.transform(self.data_table)[0] + self.assertEqual(['features', 'prediction'], output.get_schema().get_field_names()) + results = [result for result in self.t_env.to_data_stream(output).execute_and_collect()] + field_names = output.get_schema().get_field_names() + actual_groups = group_features_by_prediction( + results, + field_names.index(kmeans.features_col), + field_names.index(kmeans.prediction_col)) + + self.assertTrue(actual_groups[0] == self.expected_groups[0] + and actual_groups[1] == self.expected_groups[1] or + actual_groups[0] == self.expected_groups[1] + and actual_groups[1] == self.expected_groups[0]) + def test_save_load_and_predict(self): kmeans = KMeans().set_max_iter(2).set_k(2) model = kmeans.fit(self.data_table) diff --git a/flink-ml-python/pyflink/ml/lib/feature/tests/test_idf.py b/flink-ml-python/pyflink/ml/lib/feature/tests/test_idf.py index d192ef3..11f1887 100644 --- a/flink-ml-python/pyflink/ml/lib/feature/tests/test_idf.py +++ 
b/flink-ml-python/pyflink/ml/lib/feature/tests/test_idf.py @@ -22,7 +22,7 @@ from pyflink.common import Types from pyflink.ml.core.linalg import Vectors, DenseVectorTypeInfo from pyflink.ml.lib.feature.idf import IDF, IDFModel -from pyflink.ml.tests.test_utils import PyFlinkMLTestCase +from pyflink.ml.tests.test_utils import PyFlinkMLTestCase, update_existing_params class IDFTest(PyFlinkMLTestCase): @@ -112,6 +112,27 @@ class IDFTest(PyFlinkMLTestCase): output = idf.fit(self.input_data).transform(self.input_data)[0] self.verify_prediction_result(self.expected_output_min_doc_freq_as_two, output) + def test_get_model_data(self): + idf = IDF() + model = idf.fit(self.input_data) + model_data = model.get_model_data()[0] + expected_field_names = ['idf', 'docFreq', 'numDocs'] + self.assertEqual(expected_field_names, model_data.get_schema().get_field_names()) + + # TODO: Add test to collect and verify the model data results after Flink dependency + # is upgraded to 1.15.3, 1.16.0 or a higher version. 
Related ticket: FLINK-29477 + + def test_set_model_data(self): + idf = IDF() + model_a = idf.fit(self.input_data) + model_data = model_a.get_model_data()[0] + + model_b = IDFModel().set_model_data(model_data) + update_existing_params(model_b, model_a) + + output = model_b.transform(self.input_data)[0] + self.verify_prediction_result(self.expected_output, output) + def test_save_load_predict(self): idf = IDF() estimator_path = os.path.join(self.temp_dir, 'test_save_load_predict_idf') diff --git a/flink-ml-python/pyflink/ml/lib/feature/tests/test_imputer.py b/flink-ml-python/pyflink/ml/lib/feature/tests/test_imputer.py index 3addeee..8f045ac 100644 --- a/flink-ml-python/pyflink/ml/lib/feature/tests/test_imputer.py +++ b/flink-ml-python/pyflink/ml/lib/feature/tests/test_imputer.py @@ -20,8 +20,8 @@ from typing import List import numpy as np from pyflink.table import Table from pyflink.common import Types, Row -from pyflink.ml.tests.test_utils import PyFlinkMLTestCase -from pyflink.ml.lib.feature.imputer import Imputer +from pyflink.ml.tests.test_utils import PyFlinkMLTestCase, update_existing_params +from pyflink.ml.lib.feature.imputer import Imputer, ImputerModel class ImputerTest(PyFlinkMLTestCase): @@ -107,6 +107,32 @@ class ImputerTest(PyFlinkMLTestCase): self.verify_output_result( output, imputer.get_output_cols(), field_names, expected_output) + def test_get_model_data(self): + imputer = Imputer().\ + set_input_cols('f1', 'f2', 'f3').\ + set_output_cols('o1', 'o2', 'o3') + model = imputer.fit(self.train_table) + model_data = model.get_model_data()[0] + expected_field_names = ['surrogates'] + self.assertEqual(expected_field_names, model_data.get_schema().get_field_names()) + + # TODO: Add test to collect and verify the model data results after FLINK-30124 is resolved. 
+ + def test_set_model_data(self): + imputer = Imputer().\ + set_input_cols('f1', 'f2', 'f3').\ + set_output_cols('o1', 'o2', 'o3') + model_a = imputer.fit(self.train_table) + model_data = model_a.get_model_data()[0] + + model_b = ImputerModel().set_model_data(model_data) + update_existing_params(model_b, model_a) + + output = model_b.transform(self.train_table)[0] + field_names = output.get_schema().get_field_names() + self.verify_output_result( + output, imputer.get_output_cols(), field_names, self.expected_mean_strategy_output) + def test_save_load_predict(self): imputer = Imputer(). \ set_input_cols('f1', 'f2', 'f3'). \ diff --git a/flink-ml-python/pyflink/ml/lib/feature/tests/test_indextostringmodel.py b/flink-ml-python/pyflink/ml/lib/feature/tests/test_indextostringmodel.py index 42df909..41ef7bd 100644 --- a/flink-ml-python/pyflink/ml/lib/feature/tests/test_indextostringmodel.py +++ b/flink-ml-python/pyflink/ml/lib/feature/tests/test_indextostringmodel.py @@ -75,3 +75,31 @@ class IndexToStringModelTest(PyFlinkMLTestCase): predicted_results.sort(key=lambda x: x[0]) self.assertEqual(predicted_results, self.expected_prediction) + + def test_get_model_data(self): + model = IndexToStringModel() \ + .set_input_cols('input_col1', 'input_col2') \ + .set_output_cols('output_col1', 'output_col2') \ + .set_model_data(self.model_data_table) + model_data = model.get_model_data()[0] + expected_field_names = ['stringArrays'] + self.assertEqual(expected_field_names, model_data.get_schema().get_field_names()) + + # TODO: Add test to collect and verify the model data results after FLINK-30122 is resolved. 
+ + def test_save_load_and_predict(self): + model = IndexToStringModel() \ + .set_input_cols('input_col1', 'input_col2') \ + .set_output_cols('output_col1', 'output_col2') \ + .set_model_data(self.model_data_table) + + reloaded_model = self.save_and_reload(model) + + output = reloaded_model.transform(self.predict_table)[0] + + predicted_results = [result for result in + self.t_env.to_data_stream(output).execute_and_collect()] + + predicted_results.sort(key=lambda x: x[0]) + + self.assertEqual(predicted_results, self.expected_prediction) diff --git a/flink-ml-python/pyflink/ml/lib/feature/tests/test_kbinsdiscretizer.py b/flink-ml-python/pyflink/ml/lib/feature/tests/test_kbinsdiscretizer.py index 2dacb68..70bc9a5 100644 --- a/flink-ml-python/pyflink/ml/lib/feature/tests/test_kbinsdiscretizer.py +++ b/flink-ml-python/pyflink/ml/lib/feature/tests/test_kbinsdiscretizer.py @@ -22,7 +22,7 @@ from pyflink.common import Types from pyflink.ml.core.linalg import Vectors, DenseVectorTypeInfo from pyflink.ml.lib.feature.kbinsdiscretizer import KBinsDiscretizer, KBinsDiscretizerModel -from pyflink.ml.tests.test_utils import PyFlinkMLTestCase +from pyflink.ml.tests.test_utils import PyFlinkMLTestCase, update_existing_params class KBinsDiscretizerTest(PyFlinkMLTestCase): @@ -156,6 +156,26 @@ class KBinsDiscretizerTest(PyFlinkMLTestCase): output = k_bins_discretizer.fit(self.train_table).transform(self.predict_table)[0] self.verify_prediction_result(self.kmeans_output, output) + def test_get_model_data(self): + k_bins_discretizer = KBinsDiscretizer().set_num_bins(3).set_strategy('uniform') + model = k_bins_discretizer.fit(self.train_table) + model_data = model.get_model_data()[0] + expected_field_names = ['binEdges'] + self.assertEqual(expected_field_names, model_data.get_schema().get_field_names()) + + # TODO: Add test to collect and verify the model data results after FLINK-30122 is resolved. 
+ + def test_set_model_data(self): + k_bins_discretizer = KBinsDiscretizer().set_num_bins(3).set_strategy('uniform') + model_a = k_bins_discretizer.fit(self.train_table) + model_data = model_a.get_model_data()[0] + + model_b = KBinsDiscretizerModel().set_model_data(model_data) + update_existing_params(model_b, model_a) + + output = model_b.transform(self.predict_table)[0] + self.verify_prediction_result(self.uniform_output, output) + def test_save_load_predict(self): k_bins_discretizer = KBinsDiscretizer().set_num_bins(3) estimator_path = os.path.join(self.temp_dir, 'test_save_load_predict_kbinsdiscretizer') diff --git a/flink-ml-python/pyflink/ml/lib/feature/tests/test_maxabsscaler.py b/flink-ml-python/pyflink/ml/lib/feature/tests/test_maxabsscaler.py index 925ae4a..97d974d 100644 --- a/flink-ml-python/pyflink/ml/lib/feature/tests/test_maxabsscaler.py +++ b/flink-ml-python/pyflink/ml/lib/feature/tests/test_maxabsscaler.py @@ -21,8 +21,8 @@ from pyflink.common import Types from pyflink.table import Table from pyflink.ml.core.linalg import Vectors, DenseVectorTypeInfo, DenseVector -from pyflink.ml.lib.feature.maxabsscaler import MaxAbsScaler -from pyflink.ml.tests.test_utils import PyFlinkMLTestCase +from pyflink.ml.lib.feature.maxabsscaler import MaxAbsScaler, MaxAbsScalerModel +from pyflink.ml.tests.test_utils import PyFlinkMLTestCase, update_existing_params class MaxAbsScalerTest(PyFlinkMLTestCase): @@ -55,32 +55,72 @@ class MaxAbsScalerTest(PyFlinkMLTestCase): Vectors.dense(0.75, 0.225)] def test_param(self): - max_abs_scalar = MaxAbsScaler() - self.assertEqual("input", max_abs_scalar.input_col) - self.assertEqual("output", max_abs_scalar.output_col) - max_abs_scalar.set_input_col('test_input') \ + max_abs_scaler = MaxAbsScaler() + self.assertEqual("input", max_abs_scaler.input_col) + self.assertEqual("output", max_abs_scaler.output_col) + max_abs_scaler.set_input_col('test_input') \ .set_output_col('test_output') - self.assertEqual('test_input', 
max_abs_scalar.input_col) - self.assertEqual('test_output', max_abs_scalar.output_col) + self.assertEqual('test_input', max_abs_scaler.input_col) + self.assertEqual('test_output', max_abs_scaler.output_col) def test_output_schema(self): - max_abs_scalar = MaxAbsScaler() \ + max_abs_scaler = MaxAbsScaler() \ .set_input_col('test_input') \ .set_output_col('test_output') - model = max_abs_scalar.fit(self.train_data.alias('test_input')) + model = max_abs_scaler.fit(self.train_data.alias('test_input')) output = model.transform(self.predict_data.alias('test_input'))[0] self.assertEqual( ['test_input', 'test_output'], output.get_schema().get_field_names()) def test_fit_and_predict(self): - max_abs_scalar = MaxAbsScaler() - model = max_abs_scalar.fit(self.train_data) + max_abs_scaler = MaxAbsScaler() + model = max_abs_scaler.fit(self.train_data) output = model.transform(self.predict_data)[0] self.verify_output_result( output, - max_abs_scalar.get_output_col(), + max_abs_scaler.get_output_col(), + output.get_schema().get_field_names(), + self.expected_data) + + def test_get_model_data(self): + max_abs_scaler = MaxAbsScaler() + model = max_abs_scaler.fit(self.train_data) + model_data = model.get_model_data()[0] + expected_field_names = ['maxVector'] + self.assertEqual(expected_field_names, model_data.get_schema().get_field_names()) + + model_rows = [result for result in + self.t_env.to_data_stream(model_data).execute_and_collect()] + self.assertEqual(1, len(model_rows)) + self.assertListAlmostEqual([200.0, 400.0], + model_rows[0][expected_field_names.index('maxVector')]) + + def test_set_model_data(self): + max_abs_scaler = MaxAbsScaler() + model_a = max_abs_scaler.fit(self.train_data) + model_data = model_a.get_model_data()[0] + + model_b = MaxAbsScalerModel().set_model_data(model_data) + update_existing_params(model_b, model_a) + + output = model_b.transform(self.predict_data)[0] + self.verify_output_result( + output, + max_abs_scaler.get_output_col(), + 
output.get_schema().get_field_names(), + self.expected_data) + + def test_save_load_and_predict(self): + max_abs_scaler = MaxAbsScaler() + reloaded_max_abs_scaler = self.save_and_reload(max_abs_scaler) + model = reloaded_max_abs_scaler.fit(self.train_data) + reloaded_model = self.save_and_reload(model) + output = reloaded_model.transform(self.predict_data)[0] + self.verify_output_result( + output, + max_abs_scaler.get_output_col(), output.get_schema().get_field_names(), self.expected_data) diff --git a/flink-ml-python/pyflink/ml/lib/feature/tests/test_minmaxscaler.py b/flink-ml-python/pyflink/ml/lib/feature/tests/test_minmaxscaler.py index 230ce7b..3a699ac 100644 --- a/flink-ml-python/pyflink/ml/lib/feature/tests/test_minmaxscaler.py +++ b/flink-ml-python/pyflink/ml/lib/feature/tests/test_minmaxscaler.py @@ -21,8 +21,8 @@ from pyflink.common import Types from pyflink.table import Table from pyflink.ml.core.linalg import Vectors, DenseVectorTypeInfo, DenseVector -from pyflink.ml.lib.feature.minmaxscaler import MinMaxScaler -from pyflink.ml.tests.test_utils import PyFlinkMLTestCase +from pyflink.ml.lib.feature.minmaxscaler import MinMaxScaler, MinMaxScalerModel +from pyflink.ml.tests.test_utils import PyFlinkMLTestCase, update_existing_params class MinMaxScalerTest(PyFlinkMLTestCase): @@ -55,28 +55,28 @@ class MinMaxScalerTest(PyFlinkMLTestCase): Vectors.dense(0.75, 0.225)] def test_param(self): - min_max_scalar = MinMaxScaler() - self.assertEqual("input", min_max_scalar.input_col) - self.assertEqual("output", min_max_scalar.output_col) - self.assertEqual(0.0, min_max_scalar.min) - self.assertEqual(1.0, min_max_scalar.max) - min_max_scalar.set_input_col('test_input') \ + min_max_scaler = MinMaxScaler() + self.assertEqual("input", min_max_scaler.input_col) + self.assertEqual("output", min_max_scaler.output_col) + self.assertEqual(0.0, min_max_scaler.min) + self.assertEqual(1.0, min_max_scaler.max) + min_max_scaler.set_input_col('test_input') \ 
.set_output_col('test_output') \ .set_min(1.0) \ .set_max(4.0) - self.assertEqual('test_input', min_max_scalar.input_col) - self.assertEqual(1.0, min_max_scalar.min) - self.assertEqual(4.0, min_max_scalar.max) - self.assertEqual('test_output', min_max_scalar.output_col) + self.assertEqual('test_input', min_max_scaler.input_col) + self.assertEqual(1.0, min_max_scaler.min) + self.assertEqual(4.0, min_max_scaler.max) + self.assertEqual('test_output', min_max_scaler.output_col) def test_output_schema(self): - min_max_scalar = MinMaxScaler() \ + min_max_scaler = MinMaxScaler() \ .set_input_col('test_input') \ .set_output_col('test_output') \ .set_min(1.0) \ .set_max(4.0) - model = min_max_scalar.fit(self.train_data.alias('test_input')) + model = min_max_scaler.fit(self.train_data.alias('test_input')) output = model.transform(self.predict_data.alias('test_input'))[0] self.assertEqual( ['test_input', 'test_output'], @@ -99,25 +99,67 @@ class MinMaxScalerTest(PyFlinkMLTestCase): ['input'], [DenseVectorTypeInfo()]))) - min_max_scalar = MinMaxScaler() \ + min_max_scaler = MinMaxScaler() \ .set_min(0.0) \ .set_max(10.0) - model = min_max_scalar.fit(train_data) + model = min_max_scaler.fit(train_data) result = model.transform(predict_data)[0] self.verify_output_result( result, - min_max_scalar.get_output_col(), + min_max_scaler.get_output_col(), result.get_schema().get_field_names(), [Vectors.dense(5.0, 5.0)]) def test_fit_and_predict(self): - min_max_scalar = MinMaxScaler() - model = min_max_scalar.fit(self.train_data) + min_max_scaler = MinMaxScaler() + model = min_max_scaler.fit(self.train_data) output = model.transform(self.predict_data)[0] self.verify_output_result( output, - min_max_scalar.get_output_col(), + min_max_scaler.get_output_col(), + output.get_schema().get_field_names(), + self.expected_data) + + def test_get_model_data(self): + min_max_scaler = MinMaxScaler() + model = min_max_scaler.fit(self.train_data) + model_data = model.get_model_data()[0] + 
expected_field_names = ['minVector', 'maxVector'] + self.assertEqual(expected_field_names, model_data.get_schema().get_field_names()) + + model_rows = [result for result in + self.t_env.to_data_stream(model_data).execute_and_collect()] + self.assertEqual(1, len(model_rows)) + self.assertListAlmostEqual( + [0.0, 0.0], model_rows[0][expected_field_names.index('minVector')]) + self.assertListAlmostEqual( + [200.0, 400.0], model_rows[0][expected_field_names.index('maxVector')]) + + def test_set_model_data(self): + min_max_scaler = MinMaxScaler() + model_a = min_max_scaler.fit(self.train_data) + model_data = model_a.get_model_data()[0] + + model_b = MinMaxScalerModel().set_model_data(model_data) + update_existing_params(model_b, model_a) + + output = model_b.transform(self.predict_data)[0] + self.verify_output_result( + output, + min_max_scaler.get_output_col(), + output.get_schema().get_field_names(), + self.expected_data) + + def test_save_load_and_predict(self): + min_max_scaler = MinMaxScaler() + reloaded_min_max_scaler = self.save_and_reload(min_max_scaler) + model = reloaded_min_max_scaler.fit(self.train_data) + reloaded_model = self.save_and_reload(model) + output = reloaded_model.transform(self.predict_data)[0] + self.verify_output_result( + output, + min_max_scaler.get_output_col(), output.get_schema().get_field_names(), self.expected_data) diff --git a/flink-ml-python/pyflink/ml/lib/feature/tests/test_onehotencoder.py b/flink-ml-python/pyflink/ml/lib/feature/tests/test_onehotencoder.py index 795200b..fe26e30 100644 --- a/flink-ml-python/pyflink/ml/lib/feature/tests/test_onehotencoder.py +++ b/flink-ml-python/pyflink/ml/lib/feature/tests/test_onehotencoder.py @@ -23,7 +23,7 @@ from pyflink.table import Table from pyflink.ml.core.linalg import Vectors, SparseVector from pyflink.ml.lib.feature.onehotencoder import OneHotEncoder, OneHotEncoderModel -from pyflink.ml.tests.test_utils import PyFlinkMLTestCase +from pyflink.ml.tests.test_utils import 
PyFlinkMLTestCase, update_existing_params class OneHotEncoderTest(PyFlinkMLTestCase): @@ -106,6 +106,45 @@ class OneHotEncoderTest(PyFlinkMLTestCase): output_table.get_schema().get_field_names(), expected_data) + def test_get_model_data(self): + model = self.estimator.fit(self.train_data) + model_data = model.get_model_data()[0] + expected_field_names = ['f0', 'f1'] + self.assertEqual(expected_field_names, model_data.get_schema().get_field_names()) + + model_rows = [result for result in + self.t_env.to_data_stream(model_data).execute_and_collect()] + self.assertEqual(1, len(model_rows)) + self.assertEqual(0, model_rows[0][expected_field_names.index('f0')]) + self.assertEqual(2, model_rows[0][expected_field_names.index('f1')]) + + def test_set_model_data(self): + model_a = self.estimator.fit(self.train_data) + model_data = model_a.get_model_data()[0] + + model_b = OneHotEncoderModel().set_model_data(model_data) + update_existing_params(model_b, model_a) + + output = model_b.transform(self.predict_data)[0] + self.verify_output_result( + output, + model_b.input_cols, + model_b.output_cols, + output.get_schema().get_field_names(), + self.expected_data) + + def test_save_load_and_predict(self): + reloaded_estimator = self.save_and_reload(self.estimator) + model = reloaded_estimator.fit(self.train_data) # type: OneHotEncoderModel + reloaded_model = self.save_and_reload(model) + output_table = reloaded_model.transform(self.predict_data)[0] + self.verify_output_result( + output_table, + model.input_cols, + model.output_cols, + output_table.get_schema().get_field_names(), + self.expected_data) + def verify_output_result( self, output: Table, diff --git a/flink-ml-python/pyflink/ml/lib/feature/tests/test_robustscaler.py b/flink-ml-python/pyflink/ml/lib/feature/tests/test_robustscaler.py index d423aee..5c51aac 100644 --- a/flink-ml-python/pyflink/ml/lib/feature/tests/test_robustscaler.py +++ b/flink-ml-python/pyflink/ml/lib/feature/tests/test_robustscaler.py @@ -18,11 +18,11 
@@ from typing import List from pyflink.common import Types -from pyflink.ml.tests.test_utils import PyFlinkMLTestCase +from pyflink.ml.tests.test_utils import PyFlinkMLTestCase, update_existing_params from pyflink.ml.core.linalg import Vectors, DenseVectorTypeInfo, DenseVector -from pyflink.ml.lib.feature.robustscaler import RobustScaler +from pyflink.ml.lib.feature.robustscaler import RobustScaler, RobustScalerModel from pyflink.table import Table @@ -108,6 +108,36 @@ class RobustScalerTest(PyFlinkMLTestCase): output.get_schema().get_field_names(), self.expected_output) + def test_get_model_data(self): + robust_scaler = RobustScaler() + model = robust_scaler.fit(self.train_table) + model_data = model.get_model_data()[0] + expected_field_names = ['medians', 'ranges'] + self.assertEqual(expected_field_names, model_data.get_schema().get_field_names()) + + model_rows = [result for result in + self.t_env.to_data_stream(model_data).execute_and_collect()] + self.assertEqual(1, len(model_rows)) + self.assertListAlmostEqual( + [4.0, -4.0], model_rows[0][expected_field_names.index('medians')]) + self.assertListAlmostEqual( + [4.0, 4.0], model_rows[0][expected_field_names.index('ranges')]) + + def test_set_model_data(self): + robust_scaler = RobustScaler() + model_a = robust_scaler.fit(self.train_table) + model_data = model_a.get_model_data()[0] + + model_b = RobustScalerModel().set_model_data(model_data) + update_existing_params(model_b, model_a) + + output = model_b.transform(self.predict_table)[0] + self.verify_output_result( + output, + robust_scaler.get_output_col(), + output.get_schema().get_field_names(), + self.expected_output) + def test_save_load_predict(self): robust_scaler = RobustScaler() reloaded_robust_scaler = self.save_and_reload(robust_scaler) diff --git a/flink-ml-python/pyflink/ml/lib/feature/tests/test_standardscaler.py b/flink-ml-python/pyflink/ml/lib/feature/tests/test_standardscaler.py index a4216af..469d839 100644 --- 
a/flink-ml-python/pyflink/ml/lib/feature/tests/test_standardscaler.py +++ b/flink-ml-python/pyflink/ml/lib/feature/tests/test_standardscaler.py @@ -21,8 +21,8 @@ from pyflink.table import Table from typing import List from pyflink.ml.core.linalg import Vectors, DenseVectorTypeInfo, DenseVector -from pyflink.ml.lib.feature.standardscaler import StandardScaler -from pyflink.ml.tests.test_utils import PyFlinkMLTestCase +from pyflink.ml.lib.feature.standardscaler import StandardScaler, StandardScalerModel +from pyflink.ml.tests.test_utils import PyFlinkMLTestCase, update_existing_params class StandardScalerTest(PyFlinkMLTestCase): @@ -113,6 +113,48 @@ class StandardScalerTest(PyFlinkMLTestCase): standard_scaler.get_output_col(), self.expected_res_with_mean_and_std) + def test_get_model_data(self): + standard_scaler = StandardScaler() + model = standard_scaler.fit(self.dense_input) + model_data = model.get_model_data()[0] + expected_field_names = ['mean', 'std'] + self.assertEqual(expected_field_names, model_data.get_schema().get_field_names()) + + model_rows = [result for result in + self.t_env.to_data_stream(model_data).execute_and_collect()] + self.assertEqual(1, len(model_rows)) + self.assertListAlmostEqual( + self.expected_mean, model_rows[0][expected_field_names.index('mean')]) + self.assertListAlmostEqual( + self.expected_std, model_rows[0][expected_field_names.index('std')]) + + def test_set_model_data(self): + standard_scaler = StandardScaler() + model_a = standard_scaler.fit(self.dense_input) + model_data = model_a.get_model_data()[0] + + model_b = StandardScalerModel().set_model_data(model_data) + update_existing_params(model_b, model_a) + + output = model_b.transform(self.dense_input)[0] + self.verify_output_result( + output, + output.get_schema().get_field_names(), + standard_scaler.get_output_col(), + self.expected_res_with_std) + + def test_save_load_and_predict(self): + standard_scaler = StandardScaler() + reloaded_standard_scaler = 
self.save_and_reload(standard_scaler) + model = reloaded_standard_scaler.fit(self.dense_input) + reloaded_model = self.save_and_reload(model) + output = reloaded_model.transform(self.dense_input)[0] + self.verify_output_result( + output, + output.get_schema().get_field_names(), + standard_scaler.get_output_col(), + self.expected_res_with_std) + def verify_output_result( self, output: Table, diff --git a/flink-ml-python/pyflink/ml/lib/feature/tests/test_stringindexer.py b/flink-ml-python/pyflink/ml/lib/feature/tests/test_stringindexer.py index 460da9b..0f6204b 100644 --- a/flink-ml-python/pyflink/ml/lib/feature/tests/test_stringindexer.py +++ b/flink-ml-python/pyflink/ml/lib/feature/tests/test_stringindexer.py @@ -18,8 +18,8 @@ from pyflink.common import Types, Row -from pyflink.ml.lib.feature.stringindexer import StringIndexer -from pyflink.ml.tests.test_utils import PyFlinkMLTestCase +from pyflink.ml.lib.feature.stringindexer import StringIndexer, StringIndexerModel +from pyflink.ml.tests.test_utils import PyFlinkMLTestCase, update_existing_params class StringIndexerTest(PyFlinkMLTestCase): @@ -118,3 +118,53 @@ class StringIndexerTest(PyFlinkMLTestCase): predicted_results.sort(key=lambda x: x[0]) self.assertEqual(predicted_results, self.expected_alphabetic_asc_predict_data) + + def test_get_model_data(self): + string_indexer = StringIndexer() \ + .set_input_cols('input_col1', 'input_col2') \ + .set_output_cols('output_col1', 'output_col2') \ + .set_string_order_type('alphabetAsc') + model = string_indexer.fit(self.train_table) + model_data = model.get_model_data()[0] + expected_field_names = ['stringArrays'] + self.assertEqual(expected_field_names, model_data.get_schema().get_field_names()) + + # TODO: Add test to collect and verify the model data results after FLINK-30122 is resolved. 
+ + def test_set_model_data(self): + string_indexer = StringIndexer() \ + .set_input_cols('input_col1', 'input_col2') \ + .set_output_cols('output_col1', 'output_col2') \ + .set_string_order_type('alphabetAsc') \ + .set_handle_invalid('keep') + model_a = string_indexer.fit(self.train_table) + model_data = model_a.get_model_data()[0] + + model_b = StringIndexerModel().set_model_data(model_data) + update_existing_params(model_b, model_a) + + output = model_b.transform(self.predict_table)[0] + + predicted_results = [result for result in + self.t_env.to_data_stream(output).execute_and_collect()] + + predicted_results.sort(key=lambda x: x[0]) + + self.assertEqual(predicted_results, self.expected_alphabetic_asc_predict_data) + + def test_save_load_and_predict(self): + string_indexer = StringIndexer() \ + .set_input_cols('input_col1', 'input_col2') \ + .set_output_cols('output_col1', 'output_col2') \ + .set_string_order_type('alphabetAsc') \ + .set_handle_invalid('keep') + reloaded_string_indexer = self.save_and_reload(string_indexer) + + model = reloaded_string_indexer.fit(self.train_table) + reloaded_model = self.save_and_reload(model) + + output = reloaded_model.transform(self.predict_table)[0] + predicted_results = [result for result in + self.t_env.to_data_stream(output).execute_and_collect()] + predicted_results.sort(key=lambda x: x[0]) + self.assertEqual(predicted_results, self.expected_alphabetic_asc_predict_data) diff --git a/flink-ml-python/pyflink/ml/lib/feature/tests/test_variancethresholdselector.py b/flink-ml-python/pyflink/ml/lib/feature/tests/test_variancethresholdselector.py index b7a49a4..013b383 100644 --- a/flink-ml-python/pyflink/ml/lib/feature/tests/test_variancethresholdselector.py +++ b/flink-ml-python/pyflink/ml/lib/feature/tests/test_variancethresholdselector.py @@ -21,8 +21,9 @@ from pyflink.common import Types from pyflink.table import Table from pyflink.ml.core.linalg import Vectors, DenseVectorTypeInfo, DenseVector -from 
pyflink.ml.lib.feature.variancethresholdselector import VarianceThresholdSelector -from pyflink.ml.tests.test_utils import PyFlinkMLTestCase +from pyflink.ml.lib.feature.variancethresholdselector import \ + VarianceThresholdSelector, VarianceThresholdSelectorModel +from pyflink.ml.tests.test_utils import PyFlinkMLTestCase, update_existing_params class VarianceThresholdSelectorTest(PyFlinkMLTestCase): @@ -112,6 +113,33 @@ class VarianceThresholdSelectorTest(PyFlinkMLTestCase): output.get_schema().get_field_names(), self.expected_output) + def test_get_model_data(self): + variance_threshold_selector = VarianceThresholdSelector() \ + .set_variance_threshold(8.0) + model = variance_threshold_selector.fit(self.train_table) + model_data = model.get_model_data()[0] + expected_field_names = ['numOfFeatures', 'indices'] + self.assertEqual(expected_field_names, model_data.get_schema().get_field_names()) + + # TODO: Add test to collect and verify the model data results after Flink dependency + # is upgraded to 1.15.3, 1.16.0 or a higher version. 
Related ticket: FLINK-29477 + + def test_set_model_data(self): + variance_threshold_selector = VarianceThresholdSelector() \ + .set_variance_threshold(8.0) + model_a = variance_threshold_selector.fit(self.train_table) + model_data = model_a.get_model_data()[0] + + model_b = VarianceThresholdSelectorModel().set_model_data(model_data) + update_existing_params(model_b, model_a) + + output = model_b.transform(self.predict_table)[0] + self.verify_output_result( + output, + variance_threshold_selector.get_output_col(), + output.get_schema().get_field_names(), + self.expected_output) + def test_save_load_predict(self): variance_threshold_selector = VarianceThresholdSelector() \ .set_variance_threshold(8.0) diff --git a/flink-ml-python/pyflink/ml/lib/feature/tests/test_vectorindexer.py b/flink-ml-python/pyflink/ml/lib/feature/tests/test_vectorindexer.py index 8e63427..9ca499b 100644 --- a/flink-ml-python/pyflink/ml/lib/feature/tests/test_vectorindexer.py +++ b/flink-ml-python/pyflink/ml/lib/feature/tests/test_vectorindexer.py @@ -22,7 +22,7 @@ from pyflink.common import Types from pyflink.ml.core.linalg import Vectors, DenseVectorTypeInfo from pyflink.ml.lib.feature.vectorindexer import VectorIndexer, VectorIndexerModel -from pyflink.ml.tests.test_utils import PyFlinkMLTestCase +from pyflink.ml.tests.test_utils import PyFlinkMLTestCase, update_existing_params class VectorIndexerTest(PyFlinkMLTestCase): @@ -101,3 +101,28 @@ class VectorIndexerTest(PyFlinkMLTestCase): predicted_results.sort(key=lambda x: x[1]) self.expected_output.sort(key=lambda x: x[1]) self.assertEqual(self.expected_output, predicted_results) + + def test_get_model_data(self): + vector_indexer = VectorIndexer().set_handle_invalid('keep') + model = vector_indexer.fit(self.train_table) + model_data = model.get_model_data()[0] + expected_field_names = ['categoryMaps'] + self.assertEqual(expected_field_names, model_data.get_schema().get_field_names()) + + # TODO: Add test to collect and verify the model data 
results after FLINK-30124 is resolved. + + def test_set_model_data(self): + vector_indexer = VectorIndexer().set_handle_invalid('keep') + model_a = vector_indexer.fit(self.train_table) + model_data = model_a.get_model_data()[0] + + model_b = VectorIndexerModel().set_model_data(model_data) + update_existing_params(model_b, model_a) + + output = model_b.transform(self.predict_table)[0] + predicted_results = [result[1] for result in + self.t_env.to_data_stream(output).execute_and_collect()] + + predicted_results.sort(key=lambda x: x[1]) + self.expected_output.sort(key=lambda x: x[1]) + self.assertEqual(self.expected_output, predicted_results) diff --git a/flink-ml-python/pyflink/ml/tests/test_utils.py b/flink-ml-python/pyflink/ml/tests/test_utils.py index 04104c4..4b376dc 100644 --- a/flink-ml-python/pyflink/ml/tests/test_utils.py +++ b/flink-ml-python/pyflink/ml/tests/test_utils.py @@ -24,9 +24,17 @@ import uuid from pyflink.common import RestartStrategies, Configuration from pyflink.datastream import StreamExecutionEnvironment +from pyflink.java_gateway import get_gateway from pyflink.table import StreamTableEnvironment from pyflink.util.java_utils import get_j_env_configuration +from pyflink.ml.core.wrapper import JavaWithParams + + +def update_existing_params(target: JavaWithParams, source: JavaWithParams): + get_gateway().jvm.org.apache.flink.ml.util.ReadWriteUtils \ + .updateExistingParams(target._java_obj, source._java_obj.getParamMap()) + class PyFlinkMLTestCase(unittest.TestCase): def setUp(self): @@ -84,3 +92,10 @@ class PyFlinkMLTestCase(unittest.TestCase): raise e load_func = getattr(stage, 'load') return load_func(self.t_env, path) + + def assertListAlmostEqual(self, expected_list, actual_list, places=None, msg=None, + delta=None): + self.assertEqual(len(expected_list), len(actual_list)) + for i in range(len(expected_list)): + self.assertAlmostEqual(expected_list[i], actual_list[i], + places=places, msg=msg, delta=delta)