This is an automated email from the ASF dual-hosted git repository. pingsutw pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/submarine.git
The following commit(s) were added to refs/heads/master by this push: new dee3a83 SUBMARINE-1006. Upgrade tensorflow to version 2.6 dee3a83 is described below commit dee3a837b13c030d35bddf14b5293d21da064e66 Author: featherchen <garychen0975321...@gmail.com> AuthorDate: Tue Oct 12 12:16:58 2021 +0800 SUBMARINE-1006. Upgrade tensorflow to version 2.6 ### What is this PR for? <!-- A few sentences describing the overall goals of the pull request's commits. First time? Check out the contributing guide - https://submarine.apache.org/contribution/contributions.html --> Upgrade TensorFlow to 2.6 and update the tests. In this PR, I used tf.compat.v1 to support TensorFlow 2.x, and skipped distributed training (since its configuration differs slightly from TF1). This is a temporary step; in the future I will rewrite it using native TF2 APIs and solve the distributed-training problem. ### What type of PR is it? Feature ### Todos ### What is the Jira issue? <!-- * Open an issue on Jira https://issues.apache.org/jira/browse/SUBMARINE/ * Put link here, and add [SUBMARINE-*Jira number*] in PR title, eg. `SUBMARINE-23. PR title` --> https://issues.apache.org/jira/projects/SUBMARINE/issues/SUBMARINE-1006?filter=myopenissues ### How should this be tested? <!-- * First time? Setup Travis CI as described on https://submarine.apache.org/contribution/contributions.html#continuous-integration * Strongly recommended: add automated unit tests for any new or changed behavior * Outline any manual steps to test the PR here. --> Use the tests in pysubmarine/tests/ml/tensorflow_v2 ### Screenshots (if appropriate) ![image](https://user-images.githubusercontent.com/57944334/136891858-6cd3b009-ab6f-4b11-894a-2afa630b7410.png) ### Questions: * Do the license files need updating? No * Are there breaking changes for older versions? No * Does this need new documentation? 
No Author: featherchen <garychen0975321...@gmail.com> Signed-off-by: Kevin <pings...@apache.org> Closes #776 from featherchen/SUBMARINE-1006 and squashes the following commits: da56496e [featherchen] SUBMARINE-1006. Add tf2 sdk cd8d37ea [featherchen] SUBMARINE-1006. format and workflow e7a48437 [featherchen] SUBMARINE-1006. add tf_v2 and corresponding tests --- .github/workflows/python.yml | 7 +- submarine-sdk/pysubmarine/setup.py | 2 +- .../ml/tensorflow_v2/__init__.py} | 11 - .../ml/tensorflow_v2/input/__init__.py} | 11 +- .../submarine/ml/tensorflow_v2/input/input.py | 58 +++++ .../ml/tensorflow_v2/layers/__init__.py} | 11 - .../submarine/ml/tensorflow_v2/layers/core.py | 248 +++++++++++++++++++++ .../ml/tensorflow_v2/model/__init__.py} | 14 +- .../ml/tensorflow_v2/model/base_tf_model.py | 106 +++++++++ .../submarine/ml/tensorflow_v2/model/ccpm.py | 73 ++++++ .../submarine/ml/tensorflow_v2/model/deepfm.py | 60 +++++ .../submarine/ml/tensorflow_v2/model/fm.py | 45 ++++ .../submarine/ml/tensorflow_v2/model/nfm.py | 52 +++++ .../submarine/ml/tensorflow_v2/optimizer.py | 49 ++++ .../ml/tensorflow_v2/parameters.py} | 35 ++- .../ml/tensorflow_v2/registries.py} | 11 +- .../pysubmarine/submarine/utils/tf_utils_v2.py | 146 ++++++++++++ .../tensorflow/{model/test_fm.py => __init__.py} | 11 - .../tensorflow/model/{test_fm.py => __init__.py} | 11 - .../ml/tensorflow/model/test_base_tf_model.py | 2 + .../tests/ml/tensorflow/model/test_ccpm.py | 4 + .../tests/ml/tensorflow/model/test_deepfm.py | 4 + .../tests/ml/tensorflow/model/test_fm.py | 4 + .../tests/ml/tensorflow/model/test_nfm.py | 4 + .../tests/ml/tensorflow/test_optimizer.py | 2 + .../model/test_fm.py => tensorflow_v2/__init__.py} | 11 - .../tests/ml/tensorflow_v2/model/conftest.py | 50 +++++ .../model/test_base_tf_model.py | 4 +- .../model/test_ccpm.py | 6 +- .../model/test_deepfm.py | 6 +- .../{tensorflow => tensorflow_v2}/model/test_fm.py | 6 +- .../model/test_nfm.py | 6 +- .../test_optimizer.py | 4 +- 
.../pysubmarine/tests/utils/test_tf_utils.py | 2 + .../{test_tf_utils.py => test_tf_utils_v2.py} | 19 +- 35 files changed, 988 insertions(+), 107 deletions(-) diff --git a/.github/workflows/python.yml b/.github/workflows/python.yml index a741316..f5b6b64 100644 --- a/.github/workflows/python.yml +++ b/.github/workflows/python.yml @@ -25,7 +25,7 @@ jobs: strategy: matrix: python-version: [3.6, 3.7] - tf-version: [1.14.0, 1.15.0] + tf-version: [1.14.0, 1.15.0, 2.6.0] fail-fast: false steps: - uses: actions/checkout@v2 @@ -38,6 +38,8 @@ jobs: pip install --upgrade pip pip install --no-cache-dir tensorflow==${{ matrix.tf-version }} pip install --no-cache-dir torch==1.5.0 + pip install --no-cache-dir tensorflow-addons + pip install --no-cache-dir tf_slim pip install --no-cache-dir ./submarine-sdk/pysubmarine/. pip install -r ./submarine-sdk/pysubmarine/github-actions/test-requirements.txt pip install -r ./dev-support/style-check/python/lint-requirements.txt @@ -46,6 +48,7 @@ jobs: run: ./dev-support/style-check/python/lint.sh - name: Run unit test run: pytest --cov=submarine -vs -m "not e2e" + integration: runs-on: ubuntu-latest timeout-minutes: 60 @@ -93,6 +96,8 @@ jobs: run: | pip install --upgrade pip pip install --no-cache-dir -e ./submarine-sdk/pysubmarine/.[tf,pytorch] + pip install --no-cache-dir tensorflow-addons + pip install --no-cache-dir tf_slim pip install -r ./submarine-sdk/pysubmarine/github-actions/test-requirements.txt - name: Run integration test working-directory: ./submarine-sdk/pysubmarine diff --git a/submarine-sdk/pysubmarine/setup.py b/submarine-sdk/pysubmarine/setup.py index 6355434..eb2a687 100644 --- a/submarine-sdk/pysubmarine/setup.py +++ b/submarine-sdk/pysubmarine/setup.py @@ -43,7 +43,7 @@ setup( ], extras_require={ "tf": ["tensorflow>=1.14.0,<2.0.0"], - "tf-latest": ["tensorflow"], + "tf2": ["tensorflow==2.6.0", "tf_slim==1.1.0", "tensorflow-addons==0.14.0"], "pytorch": ["torch>=1.5.0", "torchvision>=0.6.0"], }, classifiers=[ diff --git 
a/submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_fm.py b/submarine-sdk/pysubmarine/submarine/ml/tensorflow_v2/__init__.py similarity index 78% copy from submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_fm.py copy to submarine-sdk/pysubmarine/submarine/ml/tensorflow_v2/__init__.py index bedda94..a6eb1b5 100644 --- a/submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_fm.py +++ b/submarine-sdk/pysubmarine/submarine/ml/tensorflow_v2/__init__.py @@ -12,14 +12,3 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - -from submarine.ml.tensorflow.model import FM - - -def test_run_fm(get_model_param): - params = get_model_param - - model = FM(model_params=params) - model.train() - model.evaluate() - model.predict() diff --git a/submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_fm.py b/submarine-sdk/pysubmarine/submarine/ml/tensorflow_v2/input/__init__.py similarity index 79% copy from submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_fm.py copy to submarine-sdk/pysubmarine/submarine/ml/tensorflow_v2/input/__init__.py index bedda94..11f1374 100644 --- a/submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_fm.py +++ b/submarine-sdk/pysubmarine/submarine/ml/tensorflow_v2/input/__init__.py @@ -13,13 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-from submarine.ml.tensorflow.model import FM +from .input import libsvm_input_fn - -def test_run_fm(get_model_param): - params = get_model_param - - model = FM(model_params=params) - model.train() - model.evaluate() - model.predict() +__all__ = ["libsvm_input_fn"] diff --git a/submarine-sdk/pysubmarine/submarine/ml/tensorflow_v2/input/input.py b/submarine-sdk/pysubmarine/submarine/ml/tensorflow_v2/input/input.py new file mode 100644 index 0000000..0cf98ae --- /dev/null +++ b/submarine-sdk/pysubmarine/submarine/ml/tensorflow_v2/input/input.py @@ -0,0 +1,58 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import logging + +import tensorflow as tf + +logger = logging.getLogger(__name__) + +AUTOTUNE = tf.data.experimental.AUTOTUNE + + +def libsvm_input_fn( + filepath, + batch_size=256, + num_epochs=3, # pylint: disable=W0613 + perform_shuffle=False, + delimiter=" ", + **kwargs +): + def _input_fn(): + def decode_libsvm(line): + columns = tf.compat.v1.string_split([line], delimiter) + labels = tf.strings.to_number(columns.values[0], out_type=tf.float32) + splits = tf.compat.v1.string_split(columns.values[1:], ":") + id_vals = tf.reshape(splits.values, splits.dense_shape) + feat_ids, feat_vals = tf.split(id_vals, num_or_size_splits=2, axis=1) + feat_ids = tf.strings.to_number(feat_ids, out_type=tf.int32) + feat_vals = tf.strings.to_number(feat_vals, out_type=tf.float32) + return {"feat_ids": feat_ids, "feat_vals": feat_vals}, labels + + dataset = ( + tf.data.TextLineDataset(filepath) + .map(decode_libsvm, num_parallel_calls=AUTOTUNE) + .prefetch(AUTOTUNE) + ) + + if perform_shuffle: + dataset = dataset.shuffle(buffer_size=batch_size) + + dataset = dataset.repeat(num_epochs) + dataset = dataset.batch(batch_size) + + return dataset + + return _input_fn diff --git a/submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_fm.py b/submarine-sdk/pysubmarine/submarine/ml/tensorflow_v2/layers/__init__.py similarity index 78% copy from submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_fm.py copy to submarine-sdk/pysubmarine/submarine/ml/tensorflow_v2/layers/__init__.py index bedda94..a6eb1b5 100644 --- a/submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_fm.py +++ b/submarine-sdk/pysubmarine/submarine/ml/tensorflow_v2/layers/__init__.py @@ -12,14 +12,3 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
- -from submarine.ml.tensorflow.model import FM - - -def test_run_fm(get_model_param): - params = get_model_param - - model = FM(model_params=params) - model.train() - model.evaluate() - model.predict() diff --git a/submarine-sdk/pysubmarine/submarine/ml/tensorflow_v2/layers/core.py b/submarine-sdk/pysubmarine/submarine/ml/tensorflow_v2/layers/core.py new file mode 100644 index 0000000..a75ff71 --- /dev/null +++ b/submarine-sdk/pysubmarine/submarine/ml/tensorflow_v2/layers/core.py @@ -0,0 +1,248 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import tensorflow as tf +import tf_slim as slim +from tensorflow.keras.layers import Layer + +tf.compat.v1.disable_v2_behavior() + + +def batch_norm_layer(x, train_phase, scope_bn, batch_norm_decay): + bn_train = slim.batch_norm( + x, + decay=batch_norm_decay, + center=True, + scale=True, + updates_collections=None, + is_training=True, + reuse=None, + scope=scope_bn, + ) + bn_infer = slim.batch_norm( + x, + decay=batch_norm_decay, + center=True, + scale=True, + updates_collections=None, + is_training=False, + reuse=True, + scope=scope_bn, + ) + return tf.cond( + pred=tf.cast(train_phase, tf.bool), true_fn=lambda: bn_train, false_fn=lambda: bn_infer + ) + + +def dnn_layer( + inputs, + estimator_mode, + batch_norm, + deep_layers, + dropout, + batch_norm_decay=0.9, + l2_reg=0, + **kwargs +): + """ + The Multi Layer Perceptron + :param inputs: A tensor of at least rank 2 and static value for the last dimension; i.e. + [batch_size, depth], [None, None, None, channels]. + :param estimator_mode: Standard names for Estimator model modes. `TRAIN`, `EVAL`, `PREDICT` + :param batch_norm: Whether use BatchNormalization before activation or not. + :param batch_norm_decay: Decay for the moving average. + Reasonable values for decay are close to 1.0, typically in the + multiple-nines range: 0.999, 0.99, 0.9, etc. + :param deep_layers: list of positive integer, the layer number and units in each layer. + :param dropout: float in [0,1). Fraction of the units to dropout. + :param l2_reg: float between 0 and 1. + L2 regularizer strength applied to the kernel weights matrix. 
+ """ + with tf.compat.v1.variable_scope("DNN_Layer"): + if batch_norm: + + if estimator_mode == tf.estimator.ModeKeys.TRAIN: + train_phase = True + else: + train_phase = False + + for i in range(len(deep_layers)): + deep_inputs = slim.fully_connected( + inputs=inputs, + num_outputs=deep_layers[i], + weights_regularizer=tf.keras.regularizers.l2(0.5 * (l2_reg)), + scope="mlp%d" % i, + ) + if batch_norm: + deep_inputs = batch_norm_layer( + deep_inputs, + train_phase=train_phase, + scope_bn="bn_%d" % i, + batch_norm_decay=batch_norm_decay, + ) + if estimator_mode == tf.estimator.ModeKeys.TRAIN: + deep_inputs = tf.nn.dropout(deep_inputs, rate=1 - (dropout[i])) + + deep_out = slim.fully_connected( + inputs=deep_inputs, + num_outputs=1, + activation_fn=tf.identity, + weights_regularizer=tf.keras.regularizers.l2(0.5 * (l2_reg)), + scope="deep_out", + ) + deep_out = tf.reshape(deep_out, shape=[-1]) + return deep_out + + +def linear_layer(features, feature_size, field_size, l2_reg=0, **kwargs): + """ + Layer which represents linear function. + :param features: input features + :param feature_size: size of features + :param field_size: number of fields in the features + :param l2_reg: float between 0 and 1. + L2 regularizer strength applied to the kernel weights matrix. 
+ """ + feat_ids = features["feat_ids"] + feat_ids = tf.reshape(feat_ids, shape=[-1, field_size]) + feat_vals = features["feat_vals"] + feat_vals = tf.reshape(feat_vals, shape=[-1, field_size]) + + regularizer = tf.keras.regularizers.l2(0.5 * (l2_reg)) + with tf.compat.v1.variable_scope("LinearLayer_Layer"): + linear_bias = tf.compat.v1.get_variable( + name="linear_bias", shape=[1], initializer=tf.compat.v1.constant_initializer(0.0) + ) + linear_weight = tf.compat.v1.get_variable( + name="linear_weight", + shape=[feature_size], + initializer=tf.compat.v1.glorot_normal_initializer(), + regularizer=regularizer, + ) + + feat_weights = tf.nn.embedding_lookup(params=linear_weight, ids=feat_ids) + linear_out = ( + tf.reduce_sum(input_tensor=tf.multiply(feat_weights, feat_vals), axis=1) + linear_bias + ) + return linear_out + + +def embedding_layer(features, feature_size, field_size, embedding_size, l2_reg=0, **kwargs): + """ + Turns positive integers (indexes) into dense vectors of fixed size. + eg. [[4], [20]] -> [[0.25, 0.1], [0.6, -0.2]] + :param features: input features + :param feature_size: size of features + :param field_size: number of fields in the features + :param embedding_size: sparse feature embedding_size + :param l2_reg: float between 0 and 1. + L2 regularizer strength applied to the kernel weights matrix. 
+ """ + feat_ids = features["feat_ids"] + feat_ids = tf.reshape(feat_ids, shape=[-1, field_size]) + feat_vals = features["feat_vals"] + feat_vals = tf.reshape(feat_vals, shape=[-1, field_size]) + + with tf.compat.v1.variable_scope("Embedding_Layer"): + regularizer = tf.keras.regularizers.l2(0.5 * (l2_reg)) + embedding_dict = tf.compat.v1.get_variable( + name="embedding_dict", + shape=[feature_size, embedding_size], + initializer=tf.compat.v1.glorot_normal_initializer(), + regularizer=regularizer, + ) + embeddings = tf.nn.embedding_lookup(params=embedding_dict, ids=feat_ids) + feat_vals = tf.reshape(feat_vals, shape=[-1, field_size, 1]) + embedding_out = tf.multiply(embeddings, feat_vals) + return embedding_out + + +def bilinear_layer(inputs, **kwargs): + """ + Bi-Interaction Layer used in Neural FM,compress the pairwise element-wise product of features + into one single vector. + :param inputs: input features + """ + + with tf.compat.v1.variable_scope("BilinearLayer_Layer"): + sum_square = tf.square(tf.reduce_sum(input_tensor=inputs, axis=1)) + square_sum = tf.reduce_sum(input_tensor=tf.square(inputs), axis=1) + bilinear_out = 0.5 * tf.subtract(sum_square, square_sum) + return bilinear_out + + +def fm_layer(inputs, **kwargs): + """ + Factorization Machine models pairwise (order-2) feature interactions + without linear term and bias. + :param inputs: input features + """ + with tf.compat.v1.variable_scope("FM_Layer"): + sum_square = tf.square(tf.reduce_sum(input_tensor=inputs, axis=1)) + square_sum = tf.reduce_sum(input_tensor=tf.square(inputs), axis=1) + fm_out = 0.5 * tf.reduce_sum(input_tensor=tf.subtract(sum_square, square_sum), axis=1) + return fm_out + + +class KMaxPooling(Layer): + """K Max pooling that selects the k biggest value along the specific axis. + Input shape + - nD tensor with shape: ``(batch_size, ..., input_dim)``. + Output shape + - nD tensor with shape: ``(batch_size, ..., output_dim)``. 
+ Arguments + - **k**: positive integer, number of top elements to look for along the ``axis`` dimension. + - **axis**: positive integer, the dimension to look for elements. + """ + + def __init__(self, k=1, axis=-1, **kwargs): + + self.dims = 1 + self.k = k + self.axis = axis + super(KMaxPooling, self).__init__(**kwargs) + + def build(self, input_shape): + + if self.axis < 1 or self.axis > len(input_shape): + raise ValueError("axis must be 1~%d,now is %d" % (len(input_shape), self.axis)) + + if self.k < 1 or self.k > input_shape[self.axis]: + raise ValueError("k must be in 1 ~ %d,now k is %d" % (input_shape[self.axis], self.k)) + self.dims = len(input_shape) + super(KMaxPooling, self).build(input_shape) + + def call(self, inputs): + + perm = list(range(self.dims)) + perm[-1], perm[self.axis] = perm[self.axis], perm[-1] + shifted_input = tf.transpose(a=inputs, perm=perm) + + top_k = tf.nn.top_k(shifted_input, k=self.k, sorted=True, name=None)[0] + output = tf.transpose(a=top_k, perm=perm) + + return output + + def compute_output_shape(self, input_shape): + output_shape = list(input_shape) + output_shape[self.axis] = self.k + return tuple(output_shape) + + def get_config( + self, + ): + config = {"k": self.k, "axis": self.axis} + base_config = super(KMaxPooling, self).get_config() + return dict(list(base_config.items()) + list(config.items())) diff --git a/submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_fm.py b/submarine-sdk/pysubmarine/submarine/ml/tensorflow_v2/model/__init__.py similarity index 79% copy from submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_fm.py copy to submarine-sdk/pysubmarine/submarine/ml/tensorflow_v2/model/__init__.py index bedda94..6651f47 100644 --- a/submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_fm.py +++ b/submarine-sdk/pysubmarine/submarine/ml/tensorflow_v2/model/__init__.py @@ -13,13 +13,9 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-from submarine.ml.tensorflow.model import FM +from .ccpm import CCPM +from .deepfm import DeepFM +from .fm import FM +from .nfm import NFM - -def test_run_fm(get_model_param): - params = get_model_param - - model = FM(model_params=params) - model.train() - model.evaluate() - model.predict() +__all__ = ["DeepFM", "FM", "NFM", "CCPM"] diff --git a/submarine-sdk/pysubmarine/submarine/ml/tensorflow_v2/model/base_tf_model.py b/submarine-sdk/pysubmarine/submarine/ml/tensorflow_v2/model/base_tf_model.py new file mode 100644 index 0000000..eeb2cd4 --- /dev/null +++ b/submarine-sdk/pysubmarine/submarine/ml/tensorflow_v2/model/base_tf_model.py @@ -0,0 +1,106 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import logging +from abc import ABC + +import numpy as np +import tensorflow as tf + +from submarine.ml.abstract_model import AbstractModel +from submarine.ml.tensorflow_v2.parameters import default_parameters +from submarine.ml.tensorflow_v2.registries import input_fn_registry +from submarine.utils.env import get_from_dicts, get_from_json, get_from_registry +from submarine.utils.tf_utils_v2 import get_tf_config + +logger = logging.getLogger(__name__) + + +# pylint: disable=W0221 +class BaseTFModel(AbstractModel, ABC): + def __init__(self, model_params=None, json_path=None): + super().__init__() + self.model_params = get_from_dicts(model_params, default_parameters) + self.model_params = get_from_json(json_path, self.model_params) + self._sanity_checks() + logging.info("Model parameters : %s", self.model_params) + self.input_type = self.model_params["input"]["type"] + self.model_dir = self.model_params["output"]["save_model_dir"] + self.config = get_tf_config(self.model_params) + self.model = tf.estimator.Estimator( + model_fn=self.model_fn, + model_dir=self.model_dir, + params=self.model_params, + config=self.config, + ) + + def train(self, train_input_fn=None, eval_input_fn=None, **kwargs): + """ + Trains a pre-defined tensorflow estimator model with given training data + :param train_input_fn: A function that provides input data for training. + :param eval_input_fn: A function that provides input data for evaluating. 
+ :return: None + """ + if train_input_fn is None: + train_input_fn = get_from_registry(self.input_type, input_fn_registry)( + filepath=self.model_params["input"]["train_data"], **self.model_params["training"] + ) + if eval_input_fn is None: + eval_input_fn = get_from_registry(self.input_type, input_fn_registry)( + filepath=self.model_params["input"]["valid_data"], **self.model_params["training"] + ) + + train_spec = tf.estimator.TrainSpec(input_fn=train_input_fn) + eval_spec = tf.estimator.EvalSpec(input_fn=eval_input_fn) + tf.estimator.train_and_evaluate(self.model, train_spec, eval_spec, **kwargs) + + def evaluate(self, eval_input_fn=None, **kwargs): + """ + Evaluates a pre-defined Tensorflow estimator model with given evaluate data + :param eval_input_fn: A function that provides input data for evaluating. + :return: A dict containing the evaluation metrics specified in `eval_input_fn` keyed by + name, as well as an entry `global_step` which contains the value of the + global step for which this evaluation was performed + """ + if eval_input_fn is None: + eval_input_fn = get_from_registry(self.input_type, input_fn_registry)( + filepath=self.model_params["input"]["valid_data"], **self.model_params["training"] + ) + + return self.model.evaluate(input_fn=eval_input_fn, **kwargs) + + def predict(self, predict_input_fn=None, **kwargs): + """ + Yields predictions with given features. + :param predict_input_fn: A function that constructs the features. + Prediction continues until input_fn raises an end-of-input exception + :return: Evaluated values of predictions tensors. 
+ """ + if predict_input_fn is None: + predict_input_fn = get_from_registry(self.input_type, input_fn_registry)( + filepath=self.model_params["input"]["test_data"], **self.model_params["training"] + ) + + return self.model.predict(input_fn=predict_input_fn, **kwargs) + + def _sanity_checks(self): + assert "input" in self.model_params, "Does not define any input parameters" + assert "type" in self.model_params["input"], "Does not define any input type" + assert "output" in self.model_params, "Does not define any output parameters" + + def model_fn(self, features, labels, mode, params): + seed = params["training"]["seed"] + np.random.seed(seed) + tf.compat.v1.set_random_seed(seed) diff --git a/submarine-sdk/pysubmarine/submarine/ml/tensorflow_v2/model/ccpm.py b/submarine-sdk/pysubmarine/submarine/ml/tensorflow_v2/model/ccpm.py new file mode 100644 index 0000000..5657294 --- /dev/null +++ b/submarine-sdk/pysubmarine/submarine/ml/tensorflow_v2/model/ccpm.py @@ -0,0 +1,73 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import logging + +import tensorflow as tf + +from submarine.ml.tensorflow_v2.layers.core import ( + KMaxPooling, + dnn_layer, + embedding_layer, + linear_layer, +) +from submarine.ml.tensorflow_v2.model.base_tf_model import BaseTFModel +from submarine.utils.tf_utils_v2 import get_estimator_spec + +logger = logging.getLogger(__name__) + + +class CCPM(BaseTFModel): + def model_fn(self, features, labels, mode, params): + super().model_fn(features, labels, mode, params) + + if len(params["training"]["conv_kernel_width"]) != len(params["training"]["conv_filters"]): + raise ValueError("conv_kernel_width must have same element with conv_filters") + + linear_logit = linear_layer(features, **params["training"]) + embedding_outputs = embedding_layer(features, **params["training"]) + conv_filters = params["training"]["conv_filters"] + conv_kernel_width = params["training"]["conv_kernel_width"] + + n = params["training"]["embedding_size"] + conv_filters_len = len(conv_filters) + conv_input = tf.concat(embedding_outputs, axis=1) + + pooling_result = tf.keras.layers.Lambda(lambda x: tf.expand_dims(x, axis=3))(conv_input) + + for i in range(1, conv_filters_len + 1): + filters = conv_filters[i - 1] + width = conv_kernel_width[i - 1] + p = pow(i / conv_filters_len, conv_filters_len - i) + k = max(1, int((1 - p) * n)) if i < conv_filters_len else 3 + + conv_result = tf.keras.layers.Conv2D( + filters=filters, + kernel_size=(width, 1), + strides=(1, 1), + padding="same", + activation="tanh", + use_bias=True, + )(pooling_result) + + pooling_result = KMaxPooling(k=min(k, int(conv_result.shape[1])), axis=1)(conv_result) + + flatten_result = tf.keras.layers.Flatten()(pooling_result) + deep_logit = dnn_layer(flatten_result, mode, **params["training"]) + + with tf.compat.v1.variable_scope("CCPM_out"): + logit = linear_logit + deep_logit + + return get_estimator_spec(logit, labels, mode, params) diff --git a/submarine-sdk/pysubmarine/submarine/ml/tensorflow_v2/model/deepfm.py 
b/submarine-sdk/pysubmarine/submarine/ml/tensorflow_v2/model/deepfm.py new file mode 100644 index 0000000..397ac4f --- /dev/null +++ b/submarine-sdk/pysubmarine/submarine/ml/tensorflow_v2/model/deepfm.py @@ -0,0 +1,60 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+""" +Tensorflow implementation of DeepFM + +Reference: +[1] DeepFM: A Factorization-Machine based Neural Network for CTR Prediction, + Huifeng Guo, Ruiming Tang, Yunming Yey, Zhenguo Li, Xiuqiang He +[2] Tensorflow implementation of DeepFM for CTR prediction + https://github.com/ChenglongChen/tensorflow-DeepFM +[3] DeepCTR implementation of DeepFM for CTR prediction + https://github.com/shenweichen/DeepCTR +""" + +import logging + +import tensorflow as tf + +from submarine.ml.tensorflow_v2.layers.core import ( + dnn_layer, + embedding_layer, + fm_layer, + linear_layer, +) +from submarine.ml.tensorflow_v2.model.base_tf_model import BaseTFModel +from submarine.utils.tf_utils_v2 import get_estimator_spec + +logger = logging.getLogger(__name__) + + +class DeepFM(BaseTFModel): + def model_fn(self, features, labels, mode, params): + super().model_fn(features, labels, mode, params) + + linear_logit = linear_layer(features, **params["training"]) + + embedding_outputs = embedding_layer(features, **params["training"]) + fm_logit = fm_layer(embedding_outputs, **params["training"]) + + field_size = params["training"]["field_size"] + embedding_size = params["training"]["embedding_size"] + deep_inputs = tf.reshape(embedding_outputs, shape=[-1, field_size * embedding_size]) + deep_logit = dnn_layer(deep_inputs, mode, **params["training"]) + + with tf.compat.v1.variable_scope("DeepFM_out"): + logit = linear_logit + fm_logit + deep_logit + + return get_estimator_spec(logit, labels, mode, params) diff --git a/submarine-sdk/pysubmarine/submarine/ml/tensorflow_v2/model/fm.py b/submarine-sdk/pysubmarine/submarine/ml/tensorflow_v2/model/fm.py new file mode 100644 index 0000000..e94a15e --- /dev/null +++ b/submarine-sdk/pysubmarine/submarine/ml/tensorflow_v2/model/fm.py @@ -0,0 +1,45 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. 
See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +TensorFlow implementation of FM + +Reference: +[1] Factorization machines for CTR Prediction, + Steffen Rendle +""" + +import logging + +import tensorflow as tf + +from submarine.ml.tensorflow_v2.layers.core import embedding_layer, fm_layer, linear_layer +from submarine.ml.tensorflow_v2.model.base_tf_model import BaseTFModel +from submarine.utils.tf_utils_v2 import get_estimator_spec + +logger = logging.getLogger(__name__) + + +class FM(BaseTFModel): + def model_fn(self, features, labels, mode, params): + super().model_fn(features, labels, mode, params) + + linear_logit = linear_layer(features, **params["training"]) + embedding_outputs = embedding_layer(features, **params["training"]) + fm_logit = fm_layer(embedding_outputs, **params["training"]) + + with tf.compat.v1.variable_scope("FM_out"): + logit = linear_logit + fm_logit + + return get_estimator_spec(logit, labels, mode, params) diff --git a/submarine-sdk/pysubmarine/submarine/ml/tensorflow_v2/model/nfm.py b/submarine-sdk/pysubmarine/submarine/ml/tensorflow_v2/model/nfm.py new file mode 100644 index 0000000..19f0a58 --- /dev/null +++ b/submarine-sdk/pysubmarine/submarine/ml/tensorflow_v2/model/nfm.py @@ -0,0 +1,52 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. 
See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +TensorFlow implementation of NFM + +Reference: + [1] He X, Chua T S. Neural factorization machines for sparse predictive + analytics[C]//Proceedings of the 40th International ACM SIGIR conference on Research and + Development in Information Retrieval. ACM, 2017: 355-364. (https://arxiv.org/abs/1708.05027) +""" + +import logging + +import tensorflow as tf + +from submarine.ml.tensorflow_v2.layers.core import ( + bilinear_layer, + dnn_layer, + embedding_layer, + linear_layer, +) +from submarine.ml.tensorflow_v2.model.base_tf_model import BaseTFModel +from submarine.utils.tf_utils_v2 import get_estimator_spec + +logger = logging.getLogger(__name__) + + +class NFM(BaseTFModel): + def model_fn(self, features, labels, mode, params): + super().model_fn(features, labels, mode, params) + + linear_logit = linear_layer(features, **params["training"]) + embedding_outputs = embedding_layer(features, **params["training"]) + deep_inputs = bilinear_layer(embedding_outputs, **params["training"]) + deep_logit = dnn_layer(deep_inputs, mode, **params["training"]) + + with tf.compat.v1.variable_scope("NFM_out"): + logit = linear_logit + deep_logit + + return get_estimator_spec(logit, labels, mode, params) diff --git a/submarine-sdk/pysubmarine/submarine/ml/tensorflow_v2/optimizer.py 
b/submarine-sdk/pysubmarine/submarine/ml/tensorflow_v2/optimizer.py new file mode 100644 index 0000000..bdba587 --- /dev/null +++ b/submarine-sdk/pysubmarine/submarine/ml/tensorflow_v2/optimizer.py @@ -0,0 +1,49 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import logging + +import tensorflow as tf + +logger = logging.getLogger(__name__) + + +class OptimizerKey(object): + """Optimizer key strings.""" + + ADAM = "adam" + ADAGRAD = "adagrad" + MOMENTUM = "momentum" + FTRL = "ftrl" + + +def get_optimizer(optimizer_key, learning_rate): + optimizer_key = optimizer_key.lower() + + if optimizer_key == OptimizerKey.ADAM: + op = tf.compat.v1.train.AdamOptimizer( + learning_rate=learning_rate, beta1=0.9, beta2=0.999, epsilon=1e-8 + ) + elif optimizer_key == OptimizerKey.ADAGRAD: + op = tf.compat.v1.train.AdagradOptimizer( + learning_rate=learning_rate, initial_accumulator_value=1e-8 + ) + elif optimizer_key == OptimizerKey.MOMENTUM: + op = tf.compat.v1.train.MomentumOptimizer(learning_rate=learning_rate, momentum=0.95) + elif optimizer_key == OptimizerKey.FTRL: + op = tf.compat.v1.train.FtrlOptimizer(learning_rate) + else: + raise ValueError("Invalid optimizer_key :", optimizer_key) + return op diff --git a/submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_deepfm.py b/submarine-sdk/pysubmarine/submarine/ml/tensorflow_v2/parameters.py similarity index 50% copy from submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_deepfm.py copy to submarine-sdk/pysubmarine/submarine/ml/tensorflow_v2/parameters.py index 55f8e37..261f62b 100644 --- a/submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_deepfm.py +++ b/submarine-sdk/pysubmarine/submarine/ml/tensorflow_v2/parameters.py @@ -13,13 +13,28 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-from submarine.ml.tensorflow.model import DeepFM - - -def test_run_deepfm(get_model_param): - params = get_model_param - - model = DeepFM(model_params=params) - model.train() - model.evaluate() - model.predict() +default_parameters = { + "output": {"save_model_dir": "./experiment", "metric": "auc"}, + "training": { + "batch_size": 512, + "field_size": 39, + "num_epochs": 3, + "feature_size": 117581, + "embedding_size": 256, + "learning_rate": 0.0005, + "batch_norm_decay": 0.9, + "l2_reg": 0.0001, + "deep_layers": [400, 400, 400], + "conv_kernel_width": [6, 5], + "conv_filters": [4, 4], + "dropout": [0.3, 0.3, 0.3], + "batch_norm": "false", + "optimizer": "adam", + "log_steps": 10, + "num_threads": 4, + "num_gpu": 0, + "seed": 77, + "mode": "local", + }, + "resource": {"num_cpu": 4, "num_gpu": 0, "num_thread": 0}, # tf determines automatically +} diff --git a/submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_fm.py b/submarine-sdk/pysubmarine/submarine/ml/tensorflow_v2/registries.py similarity index 79% copy from submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_fm.py copy to submarine-sdk/pysubmarine/submarine/ml/tensorflow_v2/registries.py index bedda94..2dca91f 100644 --- a/submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_fm.py +++ b/submarine-sdk/pysubmarine/submarine/ml/tensorflow_v2/registries.py @@ -13,13 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-from submarine.ml.tensorflow.model import FM +from submarine.ml.tensorflow_v2.input import libsvm_input_fn +LIBSVM = "libsvm" -def test_run_fm(get_model_param): - params = get_model_param - - model = FM(model_params=params) - model.train() - model.evaluate() - model.predict() +input_fn_registry = {LIBSVM: libsvm_input_fn} diff --git a/submarine-sdk/pysubmarine/submarine/utils/tf_utils_v2.py b/submarine-sdk/pysubmarine/submarine/utils/tf_utils_v2.py new file mode 100644 index 0000000..3c776ef --- /dev/null +++ b/submarine-sdk/pysubmarine/submarine/utils/tf_utils_v2.py @@ -0,0 +1,146 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +import os + +import tensorflow as tf + +from submarine.ml.tensorflow_v2.optimizer import get_optimizer + + +def _get_session_config_from_env_var(params): + """Returns a tf.ConfigProto instance with appropriate device_filters set.""" + tf.compat.v1.disable_v2_behavior() + tf_config = json.loads(os.environ.get("TF_CONFIG", "{}")) + + if ( + tf_config + and "task" in tf_config + and "type" in tf_config["task"] + and "index" in tf_config["task"] + ): + # Master should only communicate with itself and ps. 
+ if tf_config["task"]["type"] == "master": + return tf.compat.v1.ConfigProto( + device_filters=["/job:ps", "/job:master"], + intra_op_parallelism_threads=params["resource"]["num_thread"], + inter_op_parallelism_threads=params["resource"]["num_thread"], + ) + # Worker should only communicate with itself and ps. + elif tf_config["task"]["type"] == "worker": + return tf.compat.v1.ConfigProto( # gpu_options=gpu_options, + device_filters=["/job:ps", "/job:worker/task:%d" % tf_config["task"]["index"]], + intra_op_parallelism_threads=params["resource"]["num_thread"], + inter_op_parallelism_threads=params["resource"]["num_thread"], + ) + return None + + +def get_tf_config(params): + """ + Get TF_CONFIG to run local or distributed training, If user don't set TF_CONFIG environment + variables, by default set local mode + :param params: model parameters that contain total number of gpu or cpu the model + intends to use + :type params: Dictionary + :return: The class specifies the configurations for an Estimator run + """ + if params["training"]["mode"] == "local": # local mode + tf.compat.v1.disable_v2_behavior() + tf_config = tf.estimator.RunConfig().replace( + session_config=tf.compat.v1.ConfigProto( + device_count={ + "GPU": params["resource"]["num_gpu"], + "CPU": params["resource"]["num_cpu"], + }, + intra_op_parallelism_threads=params["resource"]["num_thread"], + inter_op_parallelism_threads=params["resource"]["num_thread"], + ), + log_step_count_steps=params["training"]["log_steps"], + save_summary_steps=params["training"]["log_steps"], + ) + + elif params["training"]["mode"] == "distributed": + pass + # need to be rewrite via tf2 + # ParameterServerStrategy = tf.distribute.experimental.ParameterServerStrategy() + # tf_config = tf.estimator.RunConfig( + # train_distribute=ParameterServerStrategy, + # eval_distribute=ParameterServerStrategy, + # session_config=_get_session_config_from_env_var(params), + # save_summary_steps=params["training"]["log_steps"], + # 
log_step_count_steps=params["training"]["log_steps"], + # ) + else: + raise ValueError("mode should be local or distributed") + return tf_config + + +def get_estimator_spec(logit, labels, mode, params): + """ + Returns `EstimatorSpec` that a model_fn can return. + :param logit: logits `Tensor` to be used. + :param labels: Labels `Tensor`, or `dict` of same. + :param mode: Estimator's `ModeKeys`. + :param params: Optional dict of hyperparameters. Will receive what is passed to Estimator + in params parameter. + :return: + """ + learning_rate = params["training"]["learning_rate"] + optimizer = params["training"]["optimizer"] + metric = params["output"]["metric"] + + output = tf.sigmoid(logit) + predictions = {"probabilities": output} + export_outputs = { + # https://github.com/psf/black/issues/2434 + # fmt: off + tf.saved_model.DEFAULT_SERVING_SIGNATURE_DEF_KEY: + tf.estimator.export.PredictOutput(predictions) + # fmt: on + } + # Provide an estimator spec for `ModeKeys.PREDICT` + if mode == tf.estimator.ModeKeys.PREDICT: + return tf.estimator.EstimatorSpec( + mode=mode, predictions=predictions, export_outputs=export_outputs + ) + + with tf.compat.v1.name_scope("Loss"): + loss = tf.reduce_mean( + input_tensor=tf.nn.sigmoid_cross_entropy_with_logits(logits=logit, labels=labels) + ) + + # Provide an estimator spec for `ModeKeys.EVAL` + eval_metric_ops = {} + if metric == "auc": + eval_metric_ops["auc"] = tf.compat.v1.metrics.auc(labels, output) + else: + raise TypeError("Invalid metric :", metric) + + if mode == tf.estimator.ModeKeys.EVAL: + return tf.estimator.EstimatorSpec( + mode=mode, predictions=predictions, loss=loss, eval_metric_ops=eval_metric_ops + ) + + with tf.compat.v1.name_scope("Train"): + op = get_optimizer(optimizer, learning_rate) + train_op = op.minimize(loss, global_step=tf.compat.v1.train.get_global_step()) + + # Provide an estimator spec for `ModeKeys.TRAIN` modes + if mode == tf.estimator.ModeKeys.TRAIN: + return tf.estimator.EstimatorSpec( + 
mode=mode, predictions=predictions, loss=loss, train_op=train_op + ) diff --git a/submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_fm.py b/submarine-sdk/pysubmarine/tests/ml/tensorflow/__init__.py similarity index 78% copy from submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_fm.py copy to submarine-sdk/pysubmarine/tests/ml/tensorflow/__init__.py index bedda94..a6eb1b5 100644 --- a/submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_fm.py +++ b/submarine-sdk/pysubmarine/tests/ml/tensorflow/__init__.py @@ -12,14 +12,3 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - -from submarine.ml.tensorflow.model import FM - - -def test_run_fm(get_model_param): - params = get_model_param - - model = FM(model_params=params) - model.train() - model.evaluate() - model.predict() diff --git a/submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_fm.py b/submarine-sdk/pysubmarine/tests/ml/tensorflow/model/__init__.py similarity index 78% copy from submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_fm.py copy to submarine-sdk/pysubmarine/tests/ml/tensorflow/model/__init__.py index bedda94..a6eb1b5 100644 --- a/submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_fm.py +++ b/submarine-sdk/pysubmarine/tests/ml/tensorflow/model/__init__.py @@ -12,14 +12,3 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
- -from submarine.ml.tensorflow.model import FM - - -def test_run_fm(get_model_param): - params = get_model_param - - model = FM(model_params=params) - model.train() - model.evaluate() - model.predict() diff --git a/submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_base_tf_model.py b/submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_base_tf_model.py index 9c31908..8bf7915 100644 --- a/submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_base_tf_model.py +++ b/submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_base_tf_model.py @@ -14,10 +14,12 @@ # limitations under the License. import pytest +import tensorflow as tf from submarine.ml.tensorflow.model.base_tf_model import BaseTFModel +@pytest.mark.skipif(tf.__version__ >= "2.0.0", reason="requires tf1") def test_create_base_tf_model(): params = {"learning rate": 0.05} with pytest.raises(AssertionError, match="Does not define any input parameters"): diff --git a/submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_ccpm.py b/submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_ccpm.py index 536c049..bc0fa87 100644 --- a/submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_ccpm.py +++ b/submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_ccpm.py @@ -13,9 +13,13 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+import pytest +import tensorflow as tf + from submarine.ml.tensorflow.model import CCPM +@pytest.mark.skipif(tf.__version__ >= "2.0.0", reason="requires tf1") def test_run_ccpm(get_model_param): params = get_model_param diff --git a/submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_deepfm.py b/submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_deepfm.py index 55f8e37..2a6bd8d 100644 --- a/submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_deepfm.py +++ b/submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_deepfm.py @@ -13,9 +13,13 @@ # See the License for the specific language governing permissions and # limitations under the License. +import pytest +import tensorflow as tf + from submarine.ml.tensorflow.model import DeepFM +@pytest.mark.skipif(tf.__version__ >= "2.0.0", reason="requires tf1") def test_run_deepfm(get_model_param): params = get_model_param diff --git a/submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_fm.py b/submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_fm.py index bedda94..fb5abb8 100644 --- a/submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_fm.py +++ b/submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_fm.py @@ -13,9 +13,13 @@ # See the License for the specific language governing permissions and # limitations under the License. +import pytest +import tensorflow as tf + from submarine.ml.tensorflow.model import FM +@pytest.mark.skipif(tf.__version__ >= "2.0.0", reason="requires tf1") def test_run_fm(get_model_param): params = get_model_param diff --git a/submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_nfm.py b/submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_nfm.py index dab76b5..5000810 100644 --- a/submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_nfm.py +++ b/submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_nfm.py @@ -13,9 +13,13 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+import pytest +import tensorflow as tf + from submarine.ml.tensorflow.model import NFM +@pytest.mark.skipif(tf.__version__ >= "2.0.0", reason="requires tf1") def test_run_nfm(get_model_param): params = get_model_param diff --git a/submarine-sdk/pysubmarine/tests/ml/tensorflow/test_optimizer.py b/submarine-sdk/pysubmarine/tests/ml/tensorflow/test_optimizer.py index 831d427..776ed49 100644 --- a/submarine-sdk/pysubmarine/tests/ml/tensorflow/test_optimizer.py +++ b/submarine-sdk/pysubmarine/tests/ml/tensorflow/test_optimizer.py @@ -14,10 +14,12 @@ # limitations under the License. import pytest +import tensorflow as tf from submarine.ml.tensorflow.optimizer import get_optimizer +@pytest.mark.skipif(tf.__version__ >= "2.0.0", reason="requires tf1") def test_get_optimizer(): optimizer_keys = ["adam", "adagrad", "momentum", "ftrl"] invalid_optimizer_keys = ["adddam"] diff --git a/submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_fm.py b/submarine-sdk/pysubmarine/tests/ml/tensorflow_v2/__init__.py similarity index 78% copy from submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_fm.py copy to submarine-sdk/pysubmarine/tests/ml/tensorflow_v2/__init__.py index bedda94..a6eb1b5 100644 --- a/submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_fm.py +++ b/submarine-sdk/pysubmarine/tests/ml/tensorflow_v2/__init__.py @@ -12,14 +12,3 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
- -from submarine.ml.tensorflow.model import FM - - -def test_run_fm(get_model_param): - params = get_model_param - - model = FM(model_params=params) - model.train() - model.evaluate() - model.predict() diff --git a/submarine-sdk/pysubmarine/tests/ml/tensorflow_v2/model/conftest.py b/submarine-sdk/pysubmarine/tests/ml/tensorflow_v2/model/conftest.py new file mode 100644 index 0000000..132bdea --- /dev/null +++ b/submarine-sdk/pysubmarine/tests/ml/tensorflow_v2/model/conftest.py @@ -0,0 +1,50 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import os + +import pytest + +LIBSVM_DATA = """1 1:0 2:0.051495 3:0.5 4:0.1 5:0.113437 6:0.874 7:0.01 8:0.08 9:0.028 10:0 +1 1:1.35 2:0.031561 3:0.45 4:0.56 5:0.000031 6:0.056 7:0.27 8:0.58 9:0.056 10:0.166667 +1 1:0.05 2:0.004983 3:0.19 4:0.14 5:0.000016 6:0.006 7:0.01 8:0.14 9:0.014 10:0.166667 +1 1:0.2 2:0.004983 3:0 4:0.12 5:0.016422 6:0.268 7:0.04 8:0.7 9:0.144 10:0.166667 +1 1:0 2:0.051495 3:0.5 4:0.1 5:0.113437 6:0.874 7:0.01 8:0.08 9:0.028 10:0 +1 1:1.35 2:0.031561 3:0.45 4:0.56 5:0.000031 6:0.056 7:0.27 8:0.58 9:0.056 10:0.166667 +1 1:0.05 2:0.004983 3:0.19 4:0.14 5:0.000016 6:0.006 7:0.01 8:0.14 9:0.014 10:0.166667 +1 1:0.2 2:0.004983 3:0 4:0.12 5:0.016422 6:0.268 7:0.04 8:0.7 9:0.144 10:0.166667 +""" + + +@pytest.fixture +def get_model_param(tmpdir): + data_file = os.path.join(str(tmpdir), "libsvm.txt") + save_model_dir = os.path.join(str(tmpdir), "experiment") + with open(data_file, "wt") as writer: + writer.write(LIBSVM_DATA) + + params = { + "input": { + "train_data": data_file, + "valid_data": data_file, + "test_data": data_file, + "type": "libsvm", + }, + "output": {"save_model_dir": save_model_dir, "metric": "auc"}, + "training": {"batch_size": 256, "num_epochs": 1, "field_size": 10, "feature_size": 1000}, + } + + yield params + os.remove(data_file) diff --git a/submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_base_tf_model.py b/submarine-sdk/pysubmarine/tests/ml/tensorflow_v2/model/test_base_tf_model.py similarity index 88% copy from submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_base_tf_model.py copy to submarine-sdk/pysubmarine/tests/ml/tensorflow_v2/model/test_base_tf_model.py index 9c31908..e706abb 100644 --- a/submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_base_tf_model.py +++ b/submarine-sdk/pysubmarine/tests/ml/tensorflow_v2/model/test_base_tf_model.py @@ -14,10 +14,12 @@ # limitations under the License. 
import pytest +import tensorflow as tf -from submarine.ml.tensorflow.model.base_tf_model import BaseTFModel +from submarine.ml.tensorflow_v2.model.base_tf_model import BaseTFModel +@pytest.mark.skipif(tf.__version__ < "2.0.0", reason="requires tf2") def test_create_base_tf_model(): params = {"learning rate": 0.05} with pytest.raises(AssertionError, match="Does not define any input parameters"): diff --git a/submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_ccpm.py b/submarine-sdk/pysubmarine/tests/ml/tensorflow_v2/model/test_ccpm.py similarity index 85% copy from submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_ccpm.py copy to submarine-sdk/pysubmarine/tests/ml/tensorflow_v2/model/test_ccpm.py index 536c049..858b6bb 100644 --- a/submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_ccpm.py +++ b/submarine-sdk/pysubmarine/tests/ml/tensorflow_v2/model/test_ccpm.py @@ -13,9 +13,13 @@ # See the License for the specific language governing permissions and # limitations under the License. -from submarine.ml.tensorflow.model import CCPM +import pytest +import tensorflow as tf +from submarine.ml.tensorflow_v2.model import CCPM + +@pytest.mark.skipif(tf.__version__ < "2.0.0", reason="requires tf2") def test_run_ccpm(get_model_param): params = get_model_param diff --git a/submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_deepfm.py b/submarine-sdk/pysubmarine/tests/ml/tensorflow_v2/model/test_deepfm.py similarity index 85% copy from submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_deepfm.py copy to submarine-sdk/pysubmarine/tests/ml/tensorflow_v2/model/test_deepfm.py index 55f8e37..c6f2d4d 100644 --- a/submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_deepfm.py +++ b/submarine-sdk/pysubmarine/tests/ml/tensorflow_v2/model/test_deepfm.py @@ -13,9 +13,13 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-from submarine.ml.tensorflow.model import DeepFM +import pytest +import tensorflow as tf +from submarine.ml.tensorflow_v2.model import DeepFM + +@pytest.mark.skipif(tf.__version__ < "2.0.0", reason="requires tf2") def test_run_deepfm(get_model_param): params = get_model_param diff --git a/submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_fm.py b/submarine-sdk/pysubmarine/tests/ml/tensorflow_v2/model/test_fm.py similarity index 85% copy from submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_fm.py copy to submarine-sdk/pysubmarine/tests/ml/tensorflow_v2/model/test_fm.py index bedda94..1d95304 100644 --- a/submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_fm.py +++ b/submarine-sdk/pysubmarine/tests/ml/tensorflow_v2/model/test_fm.py @@ -13,9 +13,13 @@ # See the License for the specific language governing permissions and # limitations under the License. -from submarine.ml.tensorflow.model import FM +import pytest +import tensorflow as tf +from submarine.ml.tensorflow_v2.model import FM + +@pytest.mark.skipif(tf.__version__ < "2.0.0", reason="requires tf2") def test_run_fm(get_model_param): params = get_model_param diff --git a/submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_nfm.py b/submarine-sdk/pysubmarine/tests/ml/tensorflow_v2/model/test_nfm.py similarity index 85% copy from submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_nfm.py copy to submarine-sdk/pysubmarine/tests/ml/tensorflow_v2/model/test_nfm.py index dab76b5..9de05f3 100644 --- a/submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_nfm.py +++ b/submarine-sdk/pysubmarine/tests/ml/tensorflow_v2/model/test_nfm.py @@ -13,9 +13,13 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-from submarine.ml.tensorflow.model import NFM +import pytest +import tensorflow as tf +from submarine.ml.tensorflow_v2.model import NFM + +@pytest.mark.skipif(tf.__version__ < "2.0.0", reason="requires tf2") def test_run_nfm(get_model_param): params = get_model_param diff --git a/submarine-sdk/pysubmarine/tests/ml/tensorflow/test_optimizer.py b/submarine-sdk/pysubmarine/tests/ml/tensorflow_v2/test_optimizer.py similarity index 88% copy from submarine-sdk/pysubmarine/tests/ml/tensorflow/test_optimizer.py copy to submarine-sdk/pysubmarine/tests/ml/tensorflow_v2/test_optimizer.py index 831d427..1ef8fa1 100644 --- a/submarine-sdk/pysubmarine/tests/ml/tensorflow/test_optimizer.py +++ b/submarine-sdk/pysubmarine/tests/ml/tensorflow_v2/test_optimizer.py @@ -14,10 +14,12 @@ # limitations under the License. import pytest +import tensorflow as tf -from submarine.ml.tensorflow.optimizer import get_optimizer +from submarine.ml.tensorflow_v2.optimizer import get_optimizer +@pytest.mark.skipif(tf.__version__ < "2.0.0", reason="requires tf2") def test_get_optimizer(): optimizer_keys = ["adam", "adagrad", "momentum", "ftrl"] invalid_optimizer_keys = ["adddam"] diff --git a/submarine-sdk/pysubmarine/tests/utils/test_tf_utils.py b/submarine-sdk/pysubmarine/tests/utils/test_tf_utils.py index a03d434..776ac41 100644 --- a/submarine-sdk/pysubmarine/tests/utils/test_tf_utils.py +++ b/submarine-sdk/pysubmarine/tests/utils/test_tf_utils.py @@ -14,10 +14,12 @@ # limitations under the License. 
import pytest +import tensorflow as tf from submarine.utils.tf_utils import get_tf_config +@pytest.mark.skipif(tf.__version__ >= "2.0.0", reason="requires tf1") def test_get_tf_config(): params = {"training": {"mode": "test"}} with pytest.raises(ValueError, match="mode should be local or distributed"): diff --git a/submarine-sdk/pysubmarine/tests/utils/test_tf_utils.py b/submarine-sdk/pysubmarine/tests/utils/test_tf_utils_v2.py similarity index 76% copy from submarine-sdk/pysubmarine/tests/utils/test_tf_utils.py copy to submarine-sdk/pysubmarine/tests/utils/test_tf_utils_v2.py index a03d434..53add06 100644 --- a/submarine-sdk/pysubmarine/tests/utils/test_tf_utils.py +++ b/submarine-sdk/pysubmarine/tests/utils/test_tf_utils_v2.py @@ -14,10 +14,12 @@ # limitations under the License. import pytest +import tensorflow as tf -from submarine.utils.tf_utils import get_tf_config +from submarine.utils.tf_utils_v2 import get_tf_config +@pytest.mark.skipif(tf.__version__ < "2.0.0", reason="requires tf2") def test_get_tf_config(): params = {"training": {"mode": "test"}} with pytest.raises(ValueError, match="mode should be local or distributed"): @@ -33,10 +35,11 @@ def test_get_tf_config(): get_tf_config(params) # conf for distributed training - params.update( - { - "training": {"mode": "distributed", "log_steps": 10}, - "resource": {"num_cpu": 4, "num_thread": 4, "num_gpu": 2}, - } - ) - get_tf_config(params) + # skip temporary + # params.update( + # { + # "training": {"mode": "distributed", "log_steps": 10}, + # "resource": {"num_cpu": 4, "num_thread": 4, "num_gpu": 2}, + # } + # ) + # get_tf_config(params) --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@submarine.apache.org For additional commands, e-mail: dev-h...@submarine.apache.org