This is an automated email from the ASF dual-hosted git repository.

pingsutw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/submarine.git


The following commit(s) were added to refs/heads/master by this push:
     new dee3a83  SUBMARINE-1006. Upgrade tensorflow to version 2.6
dee3a83 is described below

commit dee3a837b13c030d35bddf14b5293d21da064e66
Author: featherchen <garychen0975321...@gmail.com>
AuthorDate: Tue Oct 12 12:16:58 2021 +0800

    SUBMARINE-1006. Upgrade tensorflow to version 2.6
    
    ### What is this PR for?
    Upgrade TensorFlow to 2.6 and update the tests.
    In this PR, I used tf.compat.v1 so the existing code keeps working on TensorFlow 2.x, and skipped distributed training for now (its configuration differs slightly from TF1). This is a temporary step; I will rewrite it with native TF2 APIs and address distributed training in a follow-up.
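    The general pattern, sketched below for illustration only (this snippet is not code from the PR, and the names are hypothetical), is to disable v2 behavior and keep calling the TF1 graph-mode APIs through tf.compat.v1, which TensorFlow 2.6 still ships:
    
    ```python
    # Illustrative sketch of the tf.compat.v1 migration pattern used in this PR.
    import tensorflow as tf
    
    tf.compat.v1.disable_v2_behavior()  # keep the TF1 graph/session execution model
    
    with tf.compat.v1.variable_scope("demo"):
        w = tf.compat.v1.get_variable(
            "w", shape=[1], initializer=tf.compat.v1.constant_initializer(0.5)
        )
        y = w * 3.0
    
    with tf.compat.v1.Session() as sess:
        sess.run(tf.compat.v1.global_variables_initializer())
        print(sess.run(y))  # -> [1.5]
    ```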
    ### What type of PR is it?
    Feature
    
    ### Todos
    
    ### What is the Jira issue?
    
    https://issues.apache.org/jira/projects/SUBMARINE/issues/SUBMARINE-1006?filter=myopenissues
    ### How should this be tested?
    Run the tests in pysubmarine/tests/ml/tensorflow_v2.
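    For reference, the tensorflow_v2 tests mirror the existing TF1 tests but are guarded to run only under TF2, roughly like this (reconstructed from the diff below; exact file contents may differ slightly):
    
    ```python
    import pytest
    import tensorflow as tf
    
    from submarine.ml.tensorflow_v2.model import FM
    
    
    @pytest.mark.skipif(tf.__version__ < "2.0.0", reason="requires tf2")
    def test_run_fm(get_model_param):
        # get_model_param is the fixture defined in tests/ml/tensorflow_v2/model/conftest.py
        params = get_model_param
    
        model = FM(model_params=params)
        model.train()
        model.evaluate()
        model.predict()
    ```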
    ### Screenshots (if appropriate)
    
    ![image](https://user-images.githubusercontent.com/57944334/136891858-6cd3b009-ab6f-4b11-894a-2afa630b7410.png)
    
    ### Questions:
    * Do the license files need updating? No
    * Are there breaking changes for older versions? No
    * Does this need new documentation? No
    
    Author: featherchen <garychen0975321...@gmail.com>
    
    Signed-off-by: Kevin <pings...@apache.org>
    
    Closes #776 from featherchen/SUBMARINE-1006 and squashes the following commits:
    
    da56496e [featherchen] SUBMARINE-1006. Add tf2 sdk
    cd8d37ea [featherchen] SUBMARINE-1006. format and workflow
    e7a48437 [featherchen] SUBMARINE-1006. add tf_v2 and corresponding tests
---
 .github/workflows/python.yml                       |   7 +-
 submarine-sdk/pysubmarine/setup.py                 |   2 +-
 .../ml/tensorflow_v2/__init__.py}                  |  11 -
 .../ml/tensorflow_v2/input/__init__.py}            |  11 +-
 .../submarine/ml/tensorflow_v2/input/input.py      |  58 +++++
 .../ml/tensorflow_v2/layers/__init__.py}           |  11 -
 .../submarine/ml/tensorflow_v2/layers/core.py      | 248 +++++++++++++++++++++
 .../ml/tensorflow_v2/model/__init__.py}            |  14 +-
 .../ml/tensorflow_v2/model/base_tf_model.py        | 106 +++++++++
 .../submarine/ml/tensorflow_v2/model/ccpm.py       |  73 ++++++
 .../submarine/ml/tensorflow_v2/model/deepfm.py     |  60 +++++
 .../submarine/ml/tensorflow_v2/model/fm.py         |  45 ++++
 .../submarine/ml/tensorflow_v2/model/nfm.py        |  52 +++++
 .../submarine/ml/tensorflow_v2/optimizer.py        |  49 ++++
 .../ml/tensorflow_v2/parameters.py}                |  35 ++-
 .../ml/tensorflow_v2/registries.py}                |  11 +-
 .../pysubmarine/submarine/utils/tf_utils_v2.py     | 146 ++++++++++++
 .../tensorflow/{model/test_fm.py => __init__.py}   |  11 -
 .../tensorflow/model/{test_fm.py => __init__.py}   |  11 -
 .../ml/tensorflow/model/test_base_tf_model.py      |   2 +
 .../tests/ml/tensorflow/model/test_ccpm.py         |   4 +
 .../tests/ml/tensorflow/model/test_deepfm.py       |   4 +
 .../tests/ml/tensorflow/model/test_fm.py           |   4 +
 .../tests/ml/tensorflow/model/test_nfm.py          |   4 +
 .../tests/ml/tensorflow/test_optimizer.py          |   2 +
 .../model/test_fm.py => tensorflow_v2/__init__.py} |  11 -
 .../tests/ml/tensorflow_v2/model/conftest.py       |  50 +++++
 .../model/test_base_tf_model.py                    |   4 +-
 .../model/test_ccpm.py                             |   6 +-
 .../model/test_deepfm.py                           |   6 +-
 .../{tensorflow => tensorflow_v2}/model/test_fm.py |   6 +-
 .../model/test_nfm.py                              |   6 +-
 .../test_optimizer.py                              |   4 +-
 .../pysubmarine/tests/utils/test_tf_utils.py       |   2 +
 .../{test_tf_utils.py => test_tf_utils_v2.py}      |  19 +-
 35 files changed, 988 insertions(+), 107 deletions(-)

diff --git a/.github/workflows/python.yml b/.github/workflows/python.yml
index a741316..f5b6b64 100644
--- a/.github/workflows/python.yml
+++ b/.github/workflows/python.yml
@@ -25,7 +25,7 @@ jobs:
     strategy:
       matrix:
         python-version: [3.6, 3.7]
-        tf-version: [1.14.0, 1.15.0]
+        tf-version: [1.14.0, 1.15.0, 2.6.0]
       fail-fast: false
     steps:
       - uses: actions/checkout@v2
@@ -38,6 +38,8 @@ jobs:
           pip install --upgrade pip
           pip install --no-cache-dir tensorflow==${{ matrix.tf-version }}
           pip install --no-cache-dir torch==1.5.0
+          pip install --no-cache-dir tensorflow-addons
+          pip install --no-cache-dir tf_slim
           pip install --no-cache-dir ./submarine-sdk/pysubmarine/.
          pip install -r ./submarine-sdk/pysubmarine/github-actions/test-requirements.txt
           pip install -r ./dev-support/style-check/python/lint-requirements.txt
@@ -46,6 +48,7 @@ jobs:
         run: ./dev-support/style-check/python/lint.sh
       - name: Run unit test
         run: pytest --cov=submarine -vs -m "not e2e"
+
   integration:
     runs-on: ubuntu-latest
     timeout-minutes: 60
@@ -93,6 +96,8 @@ jobs:
         run: |
           pip install --upgrade pip
          pip install --no-cache-dir -e ./submarine-sdk/pysubmarine/.[tf,pytorch]
+          pip install --no-cache-dir tensorflow-addons
+          pip install --no-cache-dir tf_slim
          pip install -r ./submarine-sdk/pysubmarine/github-actions/test-requirements.txt
       - name: Run integration test
         working-directory: ./submarine-sdk/pysubmarine
diff --git a/submarine-sdk/pysubmarine/setup.py b/submarine-sdk/pysubmarine/setup.py
index 6355434..eb2a687 100644
--- a/submarine-sdk/pysubmarine/setup.py
+++ b/submarine-sdk/pysubmarine/setup.py
@@ -43,7 +43,7 @@ setup(
     ],
     extras_require={
         "tf": ["tensorflow>=1.14.0,<2.0.0"],
-        "tf-latest": ["tensorflow"],
+        "tf2": ["tensorflow==2.6.0", "tf_slim==1.1.0", 
"tensorflow-addons==0.14.0"],
         "pytorch": ["torch>=1.5.0", "torchvision>=0.6.0"],
     },
     classifiers=[
diff --git a/submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_fm.py b/submarine-sdk/pysubmarine/submarine/ml/tensorflow_v2/__init__.py
similarity index 78%
copy from submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_fm.py
copy to submarine-sdk/pysubmarine/submarine/ml/tensorflow_v2/__init__.py
index bedda94..a6eb1b5 100644
--- a/submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_fm.py
+++ b/submarine-sdk/pysubmarine/submarine/ml/tensorflow_v2/__init__.py
@@ -12,14 +12,3 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
-from submarine.ml.tensorflow.model import FM
-
-
-def test_run_fm(get_model_param):
-    params = get_model_param
-
-    model = FM(model_params=params)
-    model.train()
-    model.evaluate()
-    model.predict()
diff --git a/submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_fm.py b/submarine-sdk/pysubmarine/submarine/ml/tensorflow_v2/input/__init__.py
similarity index 79%
copy from submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_fm.py
copy to submarine-sdk/pysubmarine/submarine/ml/tensorflow_v2/input/__init__.py
index bedda94..11f1374 100644
--- a/submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_fm.py
+++ b/submarine-sdk/pysubmarine/submarine/ml/tensorflow_v2/input/__init__.py
@@ -13,13 +13,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from submarine.ml.tensorflow.model import FM
+from .input import libsvm_input_fn
 
-
-def test_run_fm(get_model_param):
-    params = get_model_param
-
-    model = FM(model_params=params)
-    model.train()
-    model.evaluate()
-    model.predict()
+__all__ = ["libsvm_input_fn"]
diff --git a/submarine-sdk/pysubmarine/submarine/ml/tensorflow_v2/input/input.py b/submarine-sdk/pysubmarine/submarine/ml/tensorflow_v2/input/input.py
new file mode 100644
index 0000000..0cf98ae
--- /dev/null
+++ b/submarine-sdk/pysubmarine/submarine/ml/tensorflow_v2/input/input.py
@@ -0,0 +1,58 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+import tensorflow as tf
+
+logger = logging.getLogger(__name__)
+
+AUTOTUNE = tf.data.experimental.AUTOTUNE
+
+
+def libsvm_input_fn(
+    filepath,
+    batch_size=256,
+    num_epochs=3,  # pylint: disable=W0613
+    perform_shuffle=False,
+    delimiter=" ",
+    **kwargs
+):
+    def _input_fn():
+        def decode_libsvm(line):
+            columns = tf.compat.v1.string_split([line], delimiter)
+            labels = tf.strings.to_number(columns.values[0], out_type=tf.float32)
+            splits = tf.compat.v1.string_split(columns.values[1:], ":")
+            id_vals = tf.reshape(splits.values, splits.dense_shape)
+            feat_ids, feat_vals = tf.split(id_vals, num_or_size_splits=2, axis=1)
+            feat_ids = tf.strings.to_number(feat_ids, out_type=tf.int32)
+            feat_vals = tf.strings.to_number(feat_vals, out_type=tf.float32)
+            return {"feat_ids": feat_ids, "feat_vals": feat_vals}, labels
+
+        dataset = (
+            tf.data.TextLineDataset(filepath)
+            .map(decode_libsvm, num_parallel_calls=AUTOTUNE)
+            .prefetch(AUTOTUNE)
+        )
+
+        if perform_shuffle:
+            dataset = dataset.shuffle(buffer_size=batch_size)
+
+        dataset = dataset.repeat(num_epochs)
+        dataset = dataset.batch(batch_size)
+
+        return dataset
+
+    return _input_fn
diff --git a/submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_fm.py b/submarine-sdk/pysubmarine/submarine/ml/tensorflow_v2/layers/__init__.py
similarity index 78%
copy from submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_fm.py
copy to submarine-sdk/pysubmarine/submarine/ml/tensorflow_v2/layers/__init__.py
index bedda94..a6eb1b5 100644
--- a/submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_fm.py
+++ b/submarine-sdk/pysubmarine/submarine/ml/tensorflow_v2/layers/__init__.py
@@ -12,14 +12,3 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
-from submarine.ml.tensorflow.model import FM
-
-
-def test_run_fm(get_model_param):
-    params = get_model_param
-
-    model = FM(model_params=params)
-    model.train()
-    model.evaluate()
-    model.predict()
diff --git a/submarine-sdk/pysubmarine/submarine/ml/tensorflow_v2/layers/core.py b/submarine-sdk/pysubmarine/submarine/ml/tensorflow_v2/layers/core.py
new file mode 100644
index 0000000..a75ff71
--- /dev/null
+++ b/submarine-sdk/pysubmarine/submarine/ml/tensorflow_v2/layers/core.py
@@ -0,0 +1,248 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import tensorflow as tf
+import tf_slim as slim
+from tensorflow.keras.layers import Layer
+
+tf.compat.v1.disable_v2_behavior()
+
+
+def batch_norm_layer(x, train_phase, scope_bn, batch_norm_decay):
+    bn_train = slim.batch_norm(
+        x,
+        decay=batch_norm_decay,
+        center=True,
+        scale=True,
+        updates_collections=None,
+        is_training=True,
+        reuse=None,
+        scope=scope_bn,
+    )
+    bn_infer = slim.batch_norm(
+        x,
+        decay=batch_norm_decay,
+        center=True,
+        scale=True,
+        updates_collections=None,
+        is_training=False,
+        reuse=True,
+        scope=scope_bn,
+    )
+    return tf.cond(
+        pred=tf.cast(train_phase, tf.bool), true_fn=lambda: bn_train, false_fn=lambda: bn_infer
+    )
+
+
+def dnn_layer(
+    inputs,
+    estimator_mode,
+    batch_norm,
+    deep_layers,
+    dropout,
+    batch_norm_decay=0.9,
+    l2_reg=0,
+    **kwargs
+):
+    """
+    The Multi Layer Perceptron
+    :param inputs: A tensor of at least rank 2 and static value for the last dimension; i.e.
+           [batch_size, depth], [None, None, None, channels].
+    :param estimator_mode: Standard names for Estimator model modes. `TRAIN`, `EVAL`, `PREDICT`
+    :param batch_norm: Whether use BatchNormalization before activation or not.
+    :param batch_norm_decay: Decay for the moving average.
+           Reasonable values for decay are close to 1.0, typically in the
+           multiple-nines range: 0.999, 0.99, 0.9, etc.
+    :param deep_layers: list of positive integer, the layer number and units in each layer.
+    :param dropout: float in [0,1). Fraction of the units to dropout.
+    :param l2_reg: float between 0 and 1.
+           L2 regularizer strength applied to the kernel weights matrix.
+    """
+    with tf.compat.v1.variable_scope("DNN_Layer"):
+        if batch_norm:
+
+            if estimator_mode == tf.estimator.ModeKeys.TRAIN:
+                train_phase = True
+            else:
+                train_phase = False
+
+        for i in range(len(deep_layers)):
+            deep_inputs = slim.fully_connected(
+                inputs=inputs,
+                num_outputs=deep_layers[i],
+                weights_regularizer=tf.keras.regularizers.l2(0.5 * (l2_reg)),
+                scope="mlp%d" % i,
+            )
+            if batch_norm:
+                deep_inputs = batch_norm_layer(
+                    deep_inputs,
+                    train_phase=train_phase,
+                    scope_bn="bn_%d" % i,
+                    batch_norm_decay=batch_norm_decay,
+                )
+            if estimator_mode == tf.estimator.ModeKeys.TRAIN:
+                deep_inputs = tf.nn.dropout(deep_inputs, rate=1 - (dropout[i]))
+
+        deep_out = slim.fully_connected(
+            inputs=deep_inputs,
+            num_outputs=1,
+            activation_fn=tf.identity,
+            weights_regularizer=tf.keras.regularizers.l2(0.5 * (l2_reg)),
+            scope="deep_out",
+        )
+        deep_out = tf.reshape(deep_out, shape=[-1])
+    return deep_out
+
+
+def linear_layer(features, feature_size, field_size, l2_reg=0, **kwargs):
+    """
+    Layer which represents linear function.
+    :param features: input features
+    :param feature_size: size of features
+    :param field_size: number of fields in the features
+    :param l2_reg: float between 0 and 1.
+           L2 regularizer strength applied to the kernel weights matrix.
+    """
+    feat_ids = features["feat_ids"]
+    feat_ids = tf.reshape(feat_ids, shape=[-1, field_size])
+    feat_vals = features["feat_vals"]
+    feat_vals = tf.reshape(feat_vals, shape=[-1, field_size])
+
+    regularizer = tf.keras.regularizers.l2(0.5 * (l2_reg))
+    with tf.compat.v1.variable_scope("LinearLayer_Layer"):
+        linear_bias = tf.compat.v1.get_variable(
+            name="linear_bias", shape=[1], 
initializer=tf.compat.v1.constant_initializer(0.0)
+        )
+        linear_weight = tf.compat.v1.get_variable(
+            name="linear_weight",
+            shape=[feature_size],
+            initializer=tf.compat.v1.glorot_normal_initializer(),
+            regularizer=regularizer,
+        )
+
+        feat_weights = tf.nn.embedding_lookup(params=linear_weight, ids=feat_ids)
+        linear_out = (
+            tf.reduce_sum(input_tensor=tf.multiply(feat_weights, feat_vals), axis=1) + linear_bias
+        )
+    return linear_out
+
+
+def embedding_layer(features, feature_size, field_size, embedding_size, l2_reg=0, **kwargs):
+    """
+    Turns positive integers (indexes) into dense vectors of fixed size.
+    eg. [[4], [20]] -> [[0.25, 0.1], [0.6, -0.2]]
+    :param features: input features
+    :param feature_size: size of features
+    :param field_size: number of fields in the features
+    :param embedding_size: sparse feature embedding_size
+    :param l2_reg: float between 0 and 1.
+           L2 regularizer strength applied to the kernel weights matrix.
+    """
+    feat_ids = features["feat_ids"]
+    feat_ids = tf.reshape(feat_ids, shape=[-1, field_size])
+    feat_vals = features["feat_vals"]
+    feat_vals = tf.reshape(feat_vals, shape=[-1, field_size])
+
+    with tf.compat.v1.variable_scope("Embedding_Layer"):
+        regularizer = tf.keras.regularizers.l2(0.5 * (l2_reg))
+        embedding_dict = tf.compat.v1.get_variable(
+            name="embedding_dict",
+            shape=[feature_size, embedding_size],
+            initializer=tf.compat.v1.glorot_normal_initializer(),
+            regularizer=regularizer,
+        )
+        embeddings = tf.nn.embedding_lookup(params=embedding_dict, ids=feat_ids)
+        feat_vals = tf.reshape(feat_vals, shape=[-1, field_size, 1])
+        embedding_out = tf.multiply(embeddings, feat_vals)
+    return embedding_out
+
+
+def bilinear_layer(inputs, **kwargs):
+    """
+    Bi-Interaction Layer used in Neural FM, compress the pairwise element-wise product of features
+    into one single vector.
+    :param inputs: input features
+    """
+
+    with tf.compat.v1.variable_scope("BilinearLayer_Layer"):
+        sum_square = tf.square(tf.reduce_sum(input_tensor=inputs, axis=1))
+        square_sum = tf.reduce_sum(input_tensor=tf.square(inputs), axis=1)
+        bilinear_out = 0.5 * tf.subtract(sum_square, square_sum)
+    return bilinear_out
+
+
+def fm_layer(inputs, **kwargs):
+    """
+    Factorization Machine models pairwise (order-2) feature interactions
+    without linear term and bias.
+    :param inputs: input features
+    """
+    with tf.compat.v1.variable_scope("FM_Layer"):
+        sum_square = tf.square(tf.reduce_sum(input_tensor=inputs, axis=1))
+        square_sum = tf.reduce_sum(input_tensor=tf.square(inputs), axis=1)
+        fm_out = 0.5 * tf.reduce_sum(input_tensor=tf.subtract(sum_square, square_sum), axis=1)
+    return fm_out
+
+
+class KMaxPooling(Layer):
+    """K Max pooling that selects the k biggest value along the specific axis.
+    Input shape
+      -  nD tensor with shape: ``(batch_size, ..., input_dim)``.
+    Output shape
+      - nD tensor with shape: ``(batch_size, ..., output_dim)``.
+    Arguments
+      - **k**: positive integer, number of top elements to look for along the ``axis`` dimension.
+      - **axis**: positive integer, the dimension to look for elements.
+    """
+
+    def __init__(self, k=1, axis=-1, **kwargs):
+
+        self.dims = 1
+        self.k = k
+        self.axis = axis
+        super(KMaxPooling, self).__init__(**kwargs)
+
+    def build(self, input_shape):
+
+        if self.axis < 1 or self.axis > len(input_shape):
+            raise ValueError("axis must be 1~%d,now is %d" % 
(len(input_shape), self.axis))
+
+        if self.k < 1 or self.k > input_shape[self.axis]:
+            raise ValueError("k must be in 1 ~ %d,now k is %d" % 
(input_shape[self.axis], self.k))
+        self.dims = len(input_shape)
+        super(KMaxPooling, self).build(input_shape)
+
+    def call(self, inputs):
+
+        perm = list(range(self.dims))
+        perm[-1], perm[self.axis] = perm[self.axis], perm[-1]
+        shifted_input = tf.transpose(a=inputs, perm=perm)
+
+        top_k = tf.nn.top_k(shifted_input, k=self.k, sorted=True, name=None)[0]
+        output = tf.transpose(a=top_k, perm=perm)
+
+        return output
+
+    def compute_output_shape(self, input_shape):
+        output_shape = list(input_shape)
+        output_shape[self.axis] = self.k
+        return tuple(output_shape)
+
+    def get_config(
+        self,
+    ):
+        config = {"k": self.k, "axis": self.axis}
+        base_config = super(KMaxPooling, self).get_config()
+        return dict(list(base_config.items()) + list(config.items()))
diff --git a/submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_fm.py b/submarine-sdk/pysubmarine/submarine/ml/tensorflow_v2/model/__init__.py
similarity index 79%
copy from submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_fm.py
copy to submarine-sdk/pysubmarine/submarine/ml/tensorflow_v2/model/__init__.py
index bedda94..6651f47 100644
--- a/submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_fm.py
+++ b/submarine-sdk/pysubmarine/submarine/ml/tensorflow_v2/model/__init__.py
@@ -13,13 +13,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from submarine.ml.tensorflow.model import FM
+from .ccpm import CCPM
+from .deepfm import DeepFM
+from .fm import FM
+from .nfm import NFM
 
-
-def test_run_fm(get_model_param):
-    params = get_model_param
-
-    model = FM(model_params=params)
-    model.train()
-    model.evaluate()
-    model.predict()
+__all__ = ["DeepFM", "FM", "NFM", "CCPM"]
diff --git a/submarine-sdk/pysubmarine/submarine/ml/tensorflow_v2/model/base_tf_model.py b/submarine-sdk/pysubmarine/submarine/ml/tensorflow_v2/model/base_tf_model.py
new file mode 100644
index 0000000..eeb2cd4
--- /dev/null
+++ b/submarine-sdk/pysubmarine/submarine/ml/tensorflow_v2/model/base_tf_model.py
@@ -0,0 +1,106 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+from abc import ABC
+
+import numpy as np
+import tensorflow as tf
+
+from submarine.ml.abstract_model import AbstractModel
+from submarine.ml.tensorflow_v2.parameters import default_parameters
+from submarine.ml.tensorflow_v2.registries import input_fn_registry
+from submarine.utils.env import get_from_dicts, get_from_json, get_from_registry
+from submarine.utils.tf_utils_v2 import get_tf_config
+
+logger = logging.getLogger(__name__)
+
+
+# pylint: disable=W0221
+class BaseTFModel(AbstractModel, ABC):
+    def __init__(self, model_params=None, json_path=None):
+        super().__init__()
+        self.model_params = get_from_dicts(model_params, default_parameters)
+        self.model_params = get_from_json(json_path, self.model_params)
+        self._sanity_checks()
+        logging.info("Model parameters : %s", self.model_params)
+        self.input_type = self.model_params["input"]["type"]
+        self.model_dir = self.model_params["output"]["save_model_dir"]
+        self.config = get_tf_config(self.model_params)
+        self.model = tf.estimator.Estimator(
+            model_fn=self.model_fn,
+            model_dir=self.model_dir,
+            params=self.model_params,
+            config=self.config,
+        )
+
+    def train(self, train_input_fn=None, eval_input_fn=None, **kwargs):
+        """
+        Trains a pre-defined tensorflow estimator model with given training data
+        :param train_input_fn: A function that provides input data for training.
+        :param eval_input_fn: A function that provides input data for evaluating.
+        :return: None
+        """
+        if train_input_fn is None:
+            train_input_fn = get_from_registry(self.input_type, input_fn_registry)(
+                filepath=self.model_params["input"]["train_data"], **self.model_params["training"]
+            )
+        if eval_input_fn is None:
+            eval_input_fn = get_from_registry(self.input_type, input_fn_registry)(
+                filepath=self.model_params["input"]["valid_data"], **self.model_params["training"]
+            )
+
+        train_spec = tf.estimator.TrainSpec(input_fn=train_input_fn)
+        eval_spec = tf.estimator.EvalSpec(input_fn=eval_input_fn)
+        tf.estimator.train_and_evaluate(self.model, train_spec, eval_spec, **kwargs)
+
+    def evaluate(self, eval_input_fn=None, **kwargs):
+        """
+        Evaluates a pre-defined Tensorflow estimator model with given evaluate data
+        :param eval_input_fn: A function that provides input data for evaluating.
+        :return: A dict containing the evaluation metrics specified in `eval_input_fn` keyed by
+        name, as well as an entry `global_step` which contains the value of the
+        global step for which this evaluation was performed
+        """
+        if eval_input_fn is None:
+            eval_input_fn = get_from_registry(self.input_type, input_fn_registry)(
+                filepath=self.model_params["input"]["valid_data"], **self.model_params["training"]
+            )
+
+        return self.model.evaluate(input_fn=eval_input_fn, **kwargs)
+
+    def predict(self, predict_input_fn=None, **kwargs):
+        """
+        Yields predictions with given features.
+        :param predict_input_fn: A function that constructs the features.
+         Prediction continues until input_fn raises an end-of-input exception
+        :return: Evaluated values of predictions tensors.
+        """
+        if predict_input_fn is None:
+            predict_input_fn = get_from_registry(self.input_type, input_fn_registry)(
+                filepath=self.model_params["input"]["test_data"], **self.model_params["training"]
+            )
+
+        return self.model.predict(input_fn=predict_input_fn, **kwargs)
+
+    def _sanity_checks(self):
+        assert "input" in self.model_params, "Does not define any input 
parameters"
+        assert "type" in self.model_params["input"], "Does not define any 
input type"
+        assert "output" in self.model_params, "Does not define any output 
parameters"
+
+    def model_fn(self, features, labels, mode, params):
+        seed = params["training"]["seed"]
+        np.random.seed(seed)
+        tf.compat.v1.set_random_seed(seed)
diff --git a/submarine-sdk/pysubmarine/submarine/ml/tensorflow_v2/model/ccpm.py b/submarine-sdk/pysubmarine/submarine/ml/tensorflow_v2/model/ccpm.py
new file mode 100644
index 0000000..5657294
--- /dev/null
+++ b/submarine-sdk/pysubmarine/submarine/ml/tensorflow_v2/model/ccpm.py
@@ -0,0 +1,73 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+import tensorflow as tf
+
+from submarine.ml.tensorflow_v2.layers.core import (
+    KMaxPooling,
+    dnn_layer,
+    embedding_layer,
+    linear_layer,
+)
+from submarine.ml.tensorflow_v2.model.base_tf_model import BaseTFModel
+from submarine.utils.tf_utils_v2 import get_estimator_spec
+
+logger = logging.getLogger(__name__)
+
+
+class CCPM(BaseTFModel):
+    def model_fn(self, features, labels, mode, params):
+        super().model_fn(features, labels, mode, params)
+
+        if len(params["training"]["conv_kernel_width"]) != 
len(params["training"]["conv_filters"]):
+            raise ValueError("conv_kernel_width must have same element with 
conv_filters")
+
+        linear_logit = linear_layer(features, **params["training"])
+        embedding_outputs = embedding_layer(features, **params["training"])
+        conv_filters = params["training"]["conv_filters"]
+        conv_kernel_width = params["training"]["conv_kernel_width"]
+
+        n = params["training"]["embedding_size"]
+        conv_filters_len = len(conv_filters)
+        conv_input = tf.concat(embedding_outputs, axis=1)
+
+        pooling_result = tf.keras.layers.Lambda(lambda x: tf.expand_dims(x, axis=3))(conv_input)
+
+        for i in range(1, conv_filters_len + 1):
+            filters = conv_filters[i - 1]
+            width = conv_kernel_width[i - 1]
+            p = pow(i / conv_filters_len, conv_filters_len - i)
+            k = max(1, int((1 - p) * n)) if i < conv_filters_len else 3
+
+            conv_result = tf.keras.layers.Conv2D(
+                filters=filters,
+                kernel_size=(width, 1),
+                strides=(1, 1),
+                padding="same",
+                activation="tanh",
+                use_bias=True,
+            )(pooling_result)
+
+            pooling_result = KMaxPooling(k=min(k, int(conv_result.shape[1])), axis=1)(conv_result)
+
+        flatten_result = tf.keras.layers.Flatten()(pooling_result)
+        deep_logit = dnn_layer(flatten_result, mode, **params["training"])
+
+        with tf.compat.v1.variable_scope("CCPM_out"):
+            logit = linear_logit + deep_logit
+
+        return get_estimator_spec(logit, labels, mode, params)
diff --git a/submarine-sdk/pysubmarine/submarine/ml/tensorflow_v2/model/deepfm.py b/submarine-sdk/pysubmarine/submarine/ml/tensorflow_v2/model/deepfm.py
new file mode 100644
index 0000000..397ac4f
--- /dev/null
+++ b/submarine-sdk/pysubmarine/submarine/ml/tensorflow_v2/model/deepfm.py
@@ -0,0 +1,60 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+Tensorflow implementation of DeepFM
+
+Reference:
+[1] DeepFM: A Factorization-Machine based Neural Network for CTR Prediction,
+    Huifeng Guo, Ruiming Tang, Yunming Yey, Zhenguo Li, Xiuqiang He
+[2] Tensorflow implementation of DeepFM for CTR prediction
+    https://github.com/ChenglongChen/tensorflow-DeepFM
+[3] DeepCTR implementation of DeepFM for CTR prediction
+    https://github.com/shenweichen/DeepCTR
+"""
+
+import logging
+
+import tensorflow as tf
+
+from submarine.ml.tensorflow_v2.layers.core import (
+    dnn_layer,
+    embedding_layer,
+    fm_layer,
+    linear_layer,
+)
+from submarine.ml.tensorflow_v2.model.base_tf_model import BaseTFModel
+from submarine.utils.tf_utils_v2 import get_estimator_spec
+
+logger = logging.getLogger(__name__)
+
+
+class DeepFM(BaseTFModel):
+    def model_fn(self, features, labels, mode, params):
+        super().model_fn(features, labels, mode, params)
+
+        linear_logit = linear_layer(features, **params["training"])
+
+        embedding_outputs = embedding_layer(features, **params["training"])
+        fm_logit = fm_layer(embedding_outputs, **params["training"])
+
+        field_size = params["training"]["field_size"]
+        embedding_size = params["training"]["embedding_size"]
+        deep_inputs = tf.reshape(embedding_outputs, shape=[-1, field_size * embedding_size])
+        deep_logit = dnn_layer(deep_inputs, mode, **params["training"])
+
+        with tf.compat.v1.variable_scope("DeepFM_out"):
+            logit = linear_logit + fm_logit + deep_logit
+
+        return get_estimator_spec(logit, labels, mode, params)
diff --git a/submarine-sdk/pysubmarine/submarine/ml/tensorflow_v2/model/fm.py b/submarine-sdk/pysubmarine/submarine/ml/tensorflow_v2/model/fm.py
new file mode 100644
index 0000000..e94a15e
--- /dev/null
+++ b/submarine-sdk/pysubmarine/submarine/ml/tensorflow_v2/model/fm.py
@@ -0,0 +1,45 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+TensorFlow implementation of FM
+
+Reference:
+[1] Factorization machines for CTR Prediction,
+    Steffen Rendle
+"""
+
+import logging
+
+import tensorflow as tf
+
+from submarine.ml.tensorflow_v2.layers.core import embedding_layer, fm_layer, linear_layer
+from submarine.ml.tensorflow_v2.model.base_tf_model import BaseTFModel
+from submarine.utils.tf_utils_v2 import get_estimator_spec
+
+logger = logging.getLogger(__name__)
+
+
+class FM(BaseTFModel):
+    def model_fn(self, features, labels, mode, params):
+        super().model_fn(features, labels, mode, params)
+
+        linear_logit = linear_layer(features, **params["training"])
+        embedding_outputs = embedding_layer(features, **params["training"])
+        fm_logit = fm_layer(embedding_outputs, **params["training"])
+
+        with tf.compat.v1.variable_scope("FM_out"):
+            logit = linear_logit + fm_logit
+
+        return get_estimator_spec(logit, labels, mode, params)
diff --git a/submarine-sdk/pysubmarine/submarine/ml/tensorflow_v2/model/nfm.py b/submarine-sdk/pysubmarine/submarine/ml/tensorflow_v2/model/nfm.py
new file mode 100644
index 0000000..19f0a58
--- /dev/null
+++ b/submarine-sdk/pysubmarine/submarine/ml/tensorflow_v2/model/nfm.py
@@ -0,0 +1,52 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+TensorFlow implementation of NFM
+
+Reference:
+    [1] He X, Chua T S. Neural factorization machines for sparse predictive
+    analytics[C]//Proceedings of the 40th International ACM SIGIR conference on Research and
+    Development in Information Retrieval. ACM, 2017: 355-364. (https://arxiv.org/abs/1708.05027)
+"""
+
+import logging
+
+import tensorflow as tf
+
+from submarine.ml.tensorflow_v2.layers.core import (
+    bilinear_layer,
+    dnn_layer,
+    embedding_layer,
+    linear_layer,
+)
+from submarine.ml.tensorflow_v2.model.base_tf_model import BaseTFModel
+from submarine.utils.tf_utils_v2 import get_estimator_spec
+
+logger = logging.getLogger(__name__)
+
+
+class NFM(BaseTFModel):
+    def model_fn(self, features, labels, mode, params):
+        super().model_fn(features, labels, mode, params)
+
+        linear_logit = linear_layer(features, **params["training"])
+        embedding_outputs = embedding_layer(features, **params["training"])
+        deep_inputs = bilinear_layer(embedding_outputs, **params["training"])
+        deep_logit = dnn_layer(deep_inputs, mode, **params["training"])
+
+        with tf.compat.v1.variable_scope("NFM_out"):
+            logit = linear_logit + deep_logit
+
+        return get_estimator_spec(logit, labels, mode, params)
diff --git a/submarine-sdk/pysubmarine/submarine/ml/tensorflow_v2/optimizer.py b/submarine-sdk/pysubmarine/submarine/ml/tensorflow_v2/optimizer.py
new file mode 100644
index 0000000..bdba587
--- /dev/null
+++ b/submarine-sdk/pysubmarine/submarine/ml/tensorflow_v2/optimizer.py
@@ -0,0 +1,49 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+import tensorflow as tf
+
+logger = logging.getLogger(__name__)
+
+
+class OptimizerKey(object):
+    """Optimizer key strings."""
+
+    ADAM = "adam"
+    ADAGRAD = "adagrad"
+    MOMENTUM = "momentum"
+    FTRL = "ftrl"
+
+
+def get_optimizer(optimizer_key, learning_rate):
+    optimizer_key = optimizer_key.lower()
+
+    if optimizer_key == OptimizerKey.ADAM:
+        op = tf.compat.v1.train.AdamOptimizer(
+            learning_rate=learning_rate, beta1=0.9, beta2=0.999, epsilon=1e-8
+        )
+    elif optimizer_key == OptimizerKey.ADAGRAD:
+        op = tf.compat.v1.train.AdagradOptimizer(
+            learning_rate=learning_rate, initial_accumulator_value=1e-8
+        )
+    elif optimizer_key == OptimizerKey.MOMENTUM:
+        op = tf.compat.v1.train.MomentumOptimizer(learning_rate=learning_rate, momentum=0.95)
+    elif optimizer_key == OptimizerKey.FTRL:
+        op = tf.compat.v1.train.FtrlOptimizer(learning_rate)
+    else:
+        raise ValueError("Invalid optimizer_key :", optimizer_key)
+    return op
diff --git a/submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_deepfm.py b/submarine-sdk/pysubmarine/submarine/ml/tensorflow_v2/parameters.py
similarity index 50%
copy from submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_deepfm.py
copy to submarine-sdk/pysubmarine/submarine/ml/tensorflow_v2/parameters.py
index 55f8e37..261f62b 100644
--- a/submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_deepfm.py
+++ b/submarine-sdk/pysubmarine/submarine/ml/tensorflow_v2/parameters.py
@@ -13,13 +13,28 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from submarine.ml.tensorflow.model import DeepFM
-
-
-def test_run_deepfm(get_model_param):
-    params = get_model_param
-
-    model = DeepFM(model_params=params)
-    model.train()
-    model.evaluate()
-    model.predict()
+default_parameters = {
+    "output": {"save_model_dir": "./experiment", "metric": "auc"},
+    "training": {
+        "batch_size": 512,
+        "field_size": 39,
+        "num_epochs": 3,
+        "feature_size": 117581,
+        "embedding_size": 256,
+        "learning_rate": 0.0005,
+        "batch_norm_decay": 0.9,
+        "l2_reg": 0.0001,
+        "deep_layers": [400, 400, 400],
+        "conv_kernel_width": [6, 5],
+        "conv_filters": [4, 4],
+        "dropout": [0.3, 0.3, 0.3],
+        "batch_norm": "false",
+        "optimizer": "adam",
+        "log_steps": 10,
+        "num_threads": 4,
+        "num_gpu": 0,
+        "seed": 77,
+        "mode": "local",
+    },
+    "resource": {"num_cpu": 4, "num_gpu": 0, "num_thread": 0},  # tf 
determines automatically
+}
diff --git a/submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_fm.py b/submarine-sdk/pysubmarine/submarine/ml/tensorflow_v2/registries.py
similarity index 79%
copy from submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_fm.py
copy to submarine-sdk/pysubmarine/submarine/ml/tensorflow_v2/registries.py
index bedda94..2dca91f 100644
--- a/submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_fm.py
+++ b/submarine-sdk/pysubmarine/submarine/ml/tensorflow_v2/registries.py
@@ -13,13 +13,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from submarine.ml.tensorflow.model import FM
+from submarine.ml.tensorflow_v2.input import libsvm_input_fn
 
+LIBSVM = "libsvm"
 
-def test_run_fm(get_model_param):
-    params = get_model_param
-
-    model = FM(model_params=params)
-    model.train()
-    model.evaluate()
-    model.predict()
+input_fn_registry = {LIBSVM: libsvm_input_fn}
diff --git a/submarine-sdk/pysubmarine/submarine/utils/tf_utils_v2.py b/submarine-sdk/pysubmarine/submarine/utils/tf_utils_v2.py
new file mode 100644
index 0000000..3c776ef
--- /dev/null
+++ b/submarine-sdk/pysubmarine/submarine/utils/tf_utils_v2.py
@@ -0,0 +1,146 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+import os
+
+import tensorflow as tf
+
+from submarine.ml.tensorflow_v2.optimizer import get_optimizer
+
+
+def _get_session_config_from_env_var(params):
+    """Returns a tf.ConfigProto instance with appropriate device_filters 
set."""
+    tf.compat.v1.disable_v2_behavior()
+    tf_config = json.loads(os.environ.get("TF_CONFIG", "{}"))
+
+    if (
+        tf_config
+        and "task" in tf_config
+        and "type" in tf_config["task"]
+        and "index" in tf_config["task"]
+    ):
+        # Master should only communicate with itself and ps.
+        if tf_config["task"]["type"] == "master":
+            return tf.compat.v1.ConfigProto(
+                device_filters=["/job:ps", "/job:master"],
+                intra_op_parallelism_threads=params["resource"]["num_thread"],
+                inter_op_parallelism_threads=params["resource"]["num_thread"],
+            )
+        # Worker should only communicate with itself and ps.
+        elif tf_config["task"]["type"] == "worker":
+            return tf.compat.v1.ConfigProto(  # gpu_options=gpu_options,
+                device_filters=["/job:ps", "/job:worker/task:%d" % 
tf_config["task"]["index"]],
+                intra_op_parallelism_threads=params["resource"]["num_thread"],
+                inter_op_parallelism_threads=params["resource"]["num_thread"],
+            )
+    return None
+
+
+def get_tf_config(params):
+    """
+    Get TF_CONFIG to run local or distributed training, If user don't set TF_CONFIG environment
+    variables, by default set local mode
+    :param params: model parameters that contain total number of gpu or cpu the model
+    intends to use
+    :type params: Dictionary
+    :return: The class specifies the configurations for an Estimator run
+    """
+    if params["training"]["mode"] == "local":  # local mode
+        tf.compat.v1.disable_v2_behavior()
+        tf_config = tf.estimator.RunConfig().replace(
+            session_config=tf.compat.v1.ConfigProto(
+                device_count={
+                    "GPU": params["resource"]["num_gpu"],
+                    "CPU": params["resource"]["num_cpu"],
+                },
+                intra_op_parallelism_threads=params["resource"]["num_thread"],
+                inter_op_parallelism_threads=params["resource"]["num_thread"],
+            ),
+            log_step_count_steps=params["training"]["log_steps"],
+            save_summary_steps=params["training"]["log_steps"],
+        )
+
+    elif params["training"]["mode"] == "distributed":
+        pass
+        # need to be rewrite via tf2
+        # ParameterServerStrategy = tf.distribute.experimental.ParameterServerStrategy()
+        # tf_config = tf.estimator.RunConfig(
+        #     train_distribute=ParameterServerStrategy,
+        #     eval_distribute=ParameterServerStrategy,
+        #     session_config=_get_session_config_from_env_var(params),
+        #     save_summary_steps=params["training"]["log_steps"],
+        #     log_step_count_steps=params["training"]["log_steps"],
+        # )
+    else:
+        raise ValueError("mode should be local or distributed")
+    return tf_config
+
+
+def get_estimator_spec(logit, labels, mode, params):
+    """
+    Returns `EstimatorSpec` that a model_fn can return.
+    :param logit: logits `Tensor` to be used.
+    :param labels: Labels `Tensor`, or `dict` of same.
+    :param mode: Estimator's `ModeKeys`.
+    :param params: Optional dict of hyperparameters. Will receive what is passed to Estimator
+     in params parameter.
+    :return:
+    """
+    learning_rate = params["training"]["learning_rate"]
+    optimizer = params["training"]["optimizer"]
+    metric = params["output"]["metric"]
+
+    output = tf.sigmoid(logit)
+    predictions = {"probabilities": output}
+    export_outputs = {
+        # https://github.com/psf/black/issues/2434
+        # fmt: off
+        tf.saved_model.DEFAULT_SERVING_SIGNATURE_DEF_KEY:
+            tf.estimator.export.PredictOutput(predictions)
+        # fmt: on
+    }
+    # Provide an estimator spec for `ModeKeys.PREDICT`
+    if mode == tf.estimator.ModeKeys.PREDICT:
+        return tf.estimator.EstimatorSpec(
+            mode=mode, predictions=predictions, export_outputs=export_outputs
+        )
+
+    with tf.compat.v1.name_scope("Loss"):
+        loss = tf.reduce_mean(
+            input_tensor=tf.nn.sigmoid_cross_entropy_with_logits(logits=logit, labels=labels)
+        )
+
+    # Provide an estimator spec for `ModeKeys.EVAL`
+    eval_metric_ops = {}
+    if metric == "auc":
+        eval_metric_ops["auc"] = tf.compat.v1.metrics.auc(labels, output)
+    else:
+        raise TypeError("Invalid metric :", metric)
+
+    if mode == tf.estimator.ModeKeys.EVAL:
+        return tf.estimator.EstimatorSpec(
+            mode=mode, predictions=predictions, loss=loss, eval_metric_ops=eval_metric_ops
+        )
+
+    with tf.compat.v1.name_scope("Train"):
+        op = get_optimizer(optimizer, learning_rate)
+        train_op = op.minimize(loss, global_step=tf.compat.v1.train.get_global_step())
+
+    # Provide an estimator spec for `ModeKeys.TRAIN` modes
+    if mode == tf.estimator.ModeKeys.TRAIN:
+        return tf.estimator.EstimatorSpec(
+            mode=mode, predictions=predictions, loss=loss, train_op=train_op
+        )
diff --git a/submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_fm.py b/submarine-sdk/pysubmarine/tests/ml/tensorflow/__init__.py
similarity index 78%
copy from submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_fm.py
copy to submarine-sdk/pysubmarine/tests/ml/tensorflow/__init__.py
index bedda94..a6eb1b5 100644
--- a/submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_fm.py
+++ b/submarine-sdk/pysubmarine/tests/ml/tensorflow/__init__.py
@@ -12,14 +12,3 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
-from submarine.ml.tensorflow.model import FM
-
-
-def test_run_fm(get_model_param):
-    params = get_model_param
-
-    model = FM(model_params=params)
-    model.train()
-    model.evaluate()
-    model.predict()
diff --git a/submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_fm.py b/submarine-sdk/pysubmarine/tests/ml/tensorflow/model/__init__.py
similarity index 78%
copy from submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_fm.py
copy to submarine-sdk/pysubmarine/tests/ml/tensorflow/model/__init__.py
index bedda94..a6eb1b5 100644
--- a/submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_fm.py
+++ b/submarine-sdk/pysubmarine/tests/ml/tensorflow/model/__init__.py
@@ -12,14 +12,3 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
-from submarine.ml.tensorflow.model import FM
-
-
-def test_run_fm(get_model_param):
-    params = get_model_param
-
-    model = FM(model_params=params)
-    model.train()
-    model.evaluate()
-    model.predict()
diff --git a/submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_base_tf_model.py b/submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_base_tf_model.py
index 9c31908..8bf7915 100644
--- a/submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_base_tf_model.py
+++ b/submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_base_tf_model.py
@@ -14,10 +14,12 @@
 # limitations under the License.
 
 import pytest
+import tensorflow as tf
 
 from submarine.ml.tensorflow.model.base_tf_model import BaseTFModel
 
 
+@pytest.mark.skipif(tf.__version__ >= "2.0.0", reason="requires tf1")
 def test_create_base_tf_model():
     params = {"learning rate": 0.05}
     with pytest.raises(AssertionError, match="Does not define any input 
parameters"):
diff --git a/submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_ccpm.py b/submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_ccpm.py
index 536c049..bc0fa87 100644
--- a/submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_ccpm.py
+++ b/submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_ccpm.py
@@ -13,9 +13,13 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import pytest
+import tensorflow as tf
+
 from submarine.ml.tensorflow.model import CCPM
 
 
+@pytest.mark.skipif(tf.__version__ >= "2.0.0", reason="requires tf1")
 def test_run_ccpm(get_model_param):
     params = get_model_param
 
diff --git a/submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_deepfm.py b/submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_deepfm.py
index 55f8e37..2a6bd8d 100644
--- a/submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_deepfm.py
+++ b/submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_deepfm.py
@@ -13,9 +13,13 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import pytest
+import tensorflow as tf
+
 from submarine.ml.tensorflow.model import DeepFM
 
 
+@pytest.mark.skipif(tf.__version__ >= "2.0.0", reason="requires tf1")
 def test_run_deepfm(get_model_param):
     params = get_model_param
 
diff --git a/submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_fm.py b/submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_fm.py
index bedda94..fb5abb8 100644
--- a/submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_fm.py
+++ b/submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_fm.py
@@ -13,9 +13,13 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import pytest
+import tensorflow as tf
+
 from submarine.ml.tensorflow.model import FM
 
 
+@pytest.mark.skipif(tf.__version__ >= "2.0.0", reason="requires tf1")
 def test_run_fm(get_model_param):
     params = get_model_param
 
diff --git a/submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_nfm.py b/submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_nfm.py
index dab76b5..5000810 100644
--- a/submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_nfm.py
+++ b/submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_nfm.py
@@ -13,9 +13,13 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import pytest
+import tensorflow as tf
+
 from submarine.ml.tensorflow.model import NFM
 
 
+@pytest.mark.skipif(tf.__version__ >= "2.0.0", reason="requires tf1")
 def test_run_nfm(get_model_param):
     params = get_model_param
 
diff --git a/submarine-sdk/pysubmarine/tests/ml/tensorflow/test_optimizer.py b/submarine-sdk/pysubmarine/tests/ml/tensorflow/test_optimizer.py
index 831d427..776ed49 100644
--- a/submarine-sdk/pysubmarine/tests/ml/tensorflow/test_optimizer.py
+++ b/submarine-sdk/pysubmarine/tests/ml/tensorflow/test_optimizer.py
@@ -14,10 +14,12 @@
 # limitations under the License.
 
 import pytest
+import tensorflow as tf
 
 from submarine.ml.tensorflow.optimizer import get_optimizer
 
 
+@pytest.mark.skipif(tf.__version__ >= "2.0.0", reason="requires tf1")
 def test_get_optimizer():
     optimizer_keys = ["adam", "adagrad", "momentum", "ftrl"]
     invalid_optimizer_keys = ["adddam"]
diff --git a/submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_fm.py b/submarine-sdk/pysubmarine/tests/ml/tensorflow_v2/__init__.py
similarity index 78%
copy from submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_fm.py
copy to submarine-sdk/pysubmarine/tests/ml/tensorflow_v2/__init__.py
index bedda94..a6eb1b5 100644
--- a/submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_fm.py
+++ b/submarine-sdk/pysubmarine/tests/ml/tensorflow_v2/__init__.py
@@ -12,14 +12,3 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
-from submarine.ml.tensorflow.model import FM
-
-
-def test_run_fm(get_model_param):
-    params = get_model_param
-
-    model = FM(model_params=params)
-    model.train()
-    model.evaluate()
-    model.predict()
diff --git a/submarine-sdk/pysubmarine/tests/ml/tensorflow_v2/model/conftest.py b/submarine-sdk/pysubmarine/tests/ml/tensorflow_v2/model/conftest.py
new file mode 100644
index 0000000..132bdea
--- /dev/null
+++ b/submarine-sdk/pysubmarine/tests/ml/tensorflow_v2/model/conftest.py
@@ -0,0 +1,50 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+
+import pytest
+
+LIBSVM_DATA = """1 1:0 2:0.051495 3:0.5 4:0.1 5:0.113437 6:0.874 7:0.01 8:0.08 
9:0.028 10:0
+1 1:1.35 2:0.031561 3:0.45 4:0.56 5:0.000031 6:0.056 7:0.27 8:0.58 9:0.056 
10:0.166667
+1 1:0.05 2:0.004983 3:0.19 4:0.14 5:0.000016 6:0.006 7:0.01 8:0.14 9:0.014 
10:0.166667
+1 1:0.2 2:0.004983 3:0 4:0.12 5:0.016422 6:0.268 7:0.04 8:0.7 9:0.144 
10:0.166667
+1 1:0 2:0.051495 3:0.5 4:0.1 5:0.113437 6:0.874 7:0.01 8:0.08 9:0.028 10:0
+1 1:1.35 2:0.031561 3:0.45 4:0.56 5:0.000031 6:0.056 7:0.27 8:0.58 9:0.056 
10:0.166667
+1 1:0.05 2:0.004983 3:0.19 4:0.14 5:0.000016 6:0.006 7:0.01 8:0.14 9:0.014 
10:0.166667
+1 1:0.2 2:0.004983 3:0 4:0.12 5:0.016422 6:0.268 7:0.04 8:0.7 9:0.144 
10:0.166667
+"""
+
+
+@pytest.fixture
+def get_model_param(tmpdir):
+    data_file = os.path.join(str(tmpdir), "libsvm.txt")
+    save_model_dir = os.path.join(str(tmpdir), "experiment")
+    with open(data_file, "wt") as writer:
+        writer.write(LIBSVM_DATA)
+
+    params = {
+        "input": {
+            "train_data": data_file,
+            "valid_data": data_file,
+            "test_data": data_file,
+            "type": "libsvm",
+        },
+        "output": {"save_model_dir": save_model_dir, "metric": "auc"},
+        "training": {"batch_size": 256, "num_epochs": 1, "field_size": 10, 
"feature_size": 1000},
+    }
+
+    yield params
+    os.remove(data_file)
diff --git a/submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_base_tf_model.py b/submarine-sdk/pysubmarine/tests/ml/tensorflow_v2/model/test_base_tf_model.py
similarity index 88%
copy from submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_base_tf_model.py
copy to submarine-sdk/pysubmarine/tests/ml/tensorflow_v2/model/test_base_tf_model.py
index 9c31908..e706abb 100644
--- a/submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_base_tf_model.py
+++ b/submarine-sdk/pysubmarine/tests/ml/tensorflow_v2/model/test_base_tf_model.py
@@ -14,10 +14,12 @@
 # limitations under the License.
 
 import pytest
+import tensorflow as tf
 
-from submarine.ml.tensorflow.model.base_tf_model import BaseTFModel
+from submarine.ml.tensorflow_v2.model.base_tf_model import BaseTFModel
 
 
+@pytest.mark.skipif(tf.__version__ < "2.0.0", reason="requires tf2")
 def test_create_base_tf_model():
     params = {"learning rate": 0.05}
     with pytest.raises(AssertionError, match="Does not define any input 
parameters"):
diff --git a/submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_ccpm.py b/submarine-sdk/pysubmarine/tests/ml/tensorflow_v2/model/test_ccpm.py
similarity index 85%
copy from submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_ccpm.py
copy to submarine-sdk/pysubmarine/tests/ml/tensorflow_v2/model/test_ccpm.py
index 536c049..858b6bb 100644
--- a/submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_ccpm.py
+++ b/submarine-sdk/pysubmarine/tests/ml/tensorflow_v2/model/test_ccpm.py
@@ -13,9 +13,13 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from submarine.ml.tensorflow.model import CCPM
+import pytest
+import tensorflow as tf
 
+from submarine.ml.tensorflow_v2.model import CCPM
 
+
+@pytest.mark.skipif(tf.__version__ < "2.0.0", reason="requires tf2")
 def test_run_ccpm(get_model_param):
     params = get_model_param
 
diff --git a/submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_deepfm.py b/submarine-sdk/pysubmarine/tests/ml/tensorflow_v2/model/test_deepfm.py
similarity index 85%
copy from submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_deepfm.py
copy to submarine-sdk/pysubmarine/tests/ml/tensorflow_v2/model/test_deepfm.py
index 55f8e37..c6f2d4d 100644
--- a/submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_deepfm.py
+++ b/submarine-sdk/pysubmarine/tests/ml/tensorflow_v2/model/test_deepfm.py
@@ -13,9 +13,13 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from submarine.ml.tensorflow.model import DeepFM
+import pytest
+import tensorflow as tf
 
+from submarine.ml.tensorflow_v2.model import DeepFM
 
+
+@pytest.mark.skipif(tf.__version__ < "2.0.0", reason="requires tf2")
 def test_run_deepfm(get_model_param):
     params = get_model_param
 
diff --git a/submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_fm.py b/submarine-sdk/pysubmarine/tests/ml/tensorflow_v2/model/test_fm.py
similarity index 85%
copy from submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_fm.py
copy to submarine-sdk/pysubmarine/tests/ml/tensorflow_v2/model/test_fm.py
index bedda94..1d95304 100644
--- a/submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_fm.py
+++ b/submarine-sdk/pysubmarine/tests/ml/tensorflow_v2/model/test_fm.py
@@ -13,9 +13,13 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from submarine.ml.tensorflow.model import FM
+import pytest
+import tensorflow as tf
 
+from submarine.ml.tensorflow_v2.model import FM
 
+
+@pytest.mark.skipif(tf.__version__ < "2.0.0", reason="requires tf2")
 def test_run_fm(get_model_param):
     params = get_model_param
 
diff --git a/submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_nfm.py b/submarine-sdk/pysubmarine/tests/ml/tensorflow_v2/model/test_nfm.py
similarity index 85%
copy from submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_nfm.py
copy to submarine-sdk/pysubmarine/tests/ml/tensorflow_v2/model/test_nfm.py
index dab76b5..9de05f3 100644
--- a/submarine-sdk/pysubmarine/tests/ml/tensorflow/model/test_nfm.py
+++ b/submarine-sdk/pysubmarine/tests/ml/tensorflow_v2/model/test_nfm.py
@@ -13,9 +13,13 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from submarine.ml.tensorflow.model import NFM
+import pytest
+import tensorflow as tf
 
+from submarine.ml.tensorflow_v2.model import NFM
 
+
+@pytest.mark.skipif(tf.__version__ < "2.0.0", reason="requires tf2")
 def test_run_nfm(get_model_param):
     params = get_model_param
 
diff --git a/submarine-sdk/pysubmarine/tests/ml/tensorflow/test_optimizer.py b/submarine-sdk/pysubmarine/tests/ml/tensorflow_v2/test_optimizer.py
similarity index 88%
copy from submarine-sdk/pysubmarine/tests/ml/tensorflow/test_optimizer.py
copy to submarine-sdk/pysubmarine/tests/ml/tensorflow_v2/test_optimizer.py
index 831d427..1ef8fa1 100644
--- a/submarine-sdk/pysubmarine/tests/ml/tensorflow/test_optimizer.py
+++ b/submarine-sdk/pysubmarine/tests/ml/tensorflow_v2/test_optimizer.py
@@ -14,10 +14,12 @@
 # limitations under the License.
 
 import pytest
+import tensorflow as tf
 
-from submarine.ml.tensorflow.optimizer import get_optimizer
+from submarine.ml.tensorflow_v2.optimizer import get_optimizer
 
 
+@pytest.mark.skipif(tf.__version__ < "2.0.0", reason="requires tf2")
 def test_get_optimizer():
     optimizer_keys = ["adam", "adagrad", "momentum", "ftrl"]
     invalid_optimizer_keys = ["adddam"]
diff --git a/submarine-sdk/pysubmarine/tests/utils/test_tf_utils.py b/submarine-sdk/pysubmarine/tests/utils/test_tf_utils.py
index a03d434..776ac41 100644
--- a/submarine-sdk/pysubmarine/tests/utils/test_tf_utils.py
+++ b/submarine-sdk/pysubmarine/tests/utils/test_tf_utils.py
@@ -14,10 +14,12 @@
 # limitations under the License.
 
 import pytest
+import tensorflow as tf
 
 from submarine.utils.tf_utils import get_tf_config
 
 
+@pytest.mark.skipif(tf.__version__ >= "2.0.0", reason="requires tf1")
 def test_get_tf_config():
     params = {"training": {"mode": "test"}}
     with pytest.raises(ValueError, match="mode should be local or 
distributed"):
diff --git a/submarine-sdk/pysubmarine/tests/utils/test_tf_utils.py b/submarine-sdk/pysubmarine/tests/utils/test_tf_utils_v2.py
similarity index 76%
copy from submarine-sdk/pysubmarine/tests/utils/test_tf_utils.py
copy to submarine-sdk/pysubmarine/tests/utils/test_tf_utils_v2.py
index a03d434..53add06 100644
--- a/submarine-sdk/pysubmarine/tests/utils/test_tf_utils.py
+++ b/submarine-sdk/pysubmarine/tests/utils/test_tf_utils_v2.py
@@ -14,10 +14,12 @@
 # limitations under the License.
 
 import pytest
+import tensorflow as tf
 
-from submarine.utils.tf_utils import get_tf_config
+from submarine.utils.tf_utils_v2 import get_tf_config
 
 
+@pytest.mark.skipif(tf.__version__ < "2.0.0", reason="requires tf2")
 def test_get_tf_config():
     params = {"training": {"mode": "test"}}
     with pytest.raises(ValueError, match="mode should be local or 
distributed"):
@@ -33,10 +35,11 @@ def test_get_tf_config():
     get_tf_config(params)
 
     # conf for distributed training
-    params.update(
-        {
-            "training": {"mode": "distributed", "log_steps": 10},
-            "resource": {"num_cpu": 4, "num_thread": 4, "num_gpu": 2},
-        }
-    )
-    get_tf_config(params)
+    # skip temporary
+    # params.update(
+    #     {
+    #         "training": {"mode": "distributed", "log_steps": 10},
+    #         "resource": {"num_cpu": 4, "num_thread": 4, "num_gpu": 2},
+    #     }
+    # )
+    # get_tf_config(params)
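
    For reference, a minimal sketch (not part of the patch) of the version gating these tests rely on: pytest.mark.skipif evaluates tf.__version__ at collection time, so a TensorFlow 1.x install skips the tensorflow_v2 tests and a 2.x install skips the tensorflow (tf1) ones. The marker variable and test name below are illustrative.

    ```python
    # Illustrative only -- mirrors the skipif markers added in this patch.
    import pytest
    import tensorflow as tf

    # Reusable marker: skip unless a TensorFlow 2.x release is installed.
    requires_tf2 = pytest.mark.skipif(tf.__version__ < "2.0.0", reason="requires tf2")


    @requires_tf2
    def test_runs_only_on_tf2():
        # Only executes when TensorFlow >= 2.0.0 is installed; otherwise pytest
        # reports the test as skipped with the reason "requires tf2".
        assert tf.__version__ >= "2.0.0"
    ```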

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@submarine.apache.org
For additional commands, e-mail: dev-h...@submarine.apache.org
