IGNITE-8795: Add ability to start and maintain TensorFlow cluster on top of Apache Ignite

this closes #4214


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5ee876a6
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5ee876a6
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5ee876a6

Branch: refs/heads/ignite-8446
Commit: 5ee876a6635b3c48c86c6a51bb0dc9f98ee55d63
Parents: 909b644
Author: Anton Dmitriev <dmitrievanth...@gmail.com>
Authored: Thu Jun 21 14:55:06 2018 +0300
Committer: Yury Babak <yba...@gridgain.com>
Committed: Thu Jun 21 14:55:06 2018 +0300

----------------------------------------------------------------------
 examples/pom.xml                                |  21 ++
 .../cluster/TensorFlowClusterExample.java       | 118 +++++++++++
 modules/tensorflow/licences/apache-2.0.txt      | 202 ------------------
 modules/tensorflow/licenses/apache-2.0.txt      | 202 ++++++++++++++++++
 modules/tensorflow/pom.xml                      |  30 ++-
 .../tensorflow/cluster/TensorFlowCluster.java   |  62 ++++++
 .../cluster/TensorFlowClusterGateway.java       |  68 ++++++
 .../TensorFlowClusterGatewayManager.java        |  92 ++++++++
 .../cluster/TensorFlowClusterMaintainer.java    | 138 ++++++++++++
 .../cluster/TensorFlowClusterManager.java       | 211 +++++++++++++++++++
 .../ignite/tensorflow/cluster/package-info.java |  33 +++
 .../cluster/spec/TensorFlowClusterSpec.java     |  59 ++++++
 .../spec/TensorFlowServerAddressSpec.java       |  59 ++++++
 .../tensorflow/cluster/spec/package-info.java   |  22 ++
 .../cluster/tfrunning/TensorFlowServer.java     | 100 +++++++++
 .../tfrunning/TensorFlowServerManager.java      | 167 +++++++++++++++
 .../cluster/tfrunning/package-info.java         |  23 ++
 .../cluster/util/ClusterPortManager.java        | 161 ++++++++++++++
 .../cluster/util/TensorFlowClusterResolver.java | 101 +++++++++
 .../tensorflow/cluster/util/package-info.java   |  22 ++
 .../ignite/tensorflow/core/ProcessManager.java  |  63 ++++++
 .../tensorflow/core/ProcessManagerWrapper.java  |  83 ++++++++
 .../core/longrunning/LongRunningProcess.java    |  61 ++++++
 .../longrunning/LongRunningProcessManager.java  | 168 +++++++++++++++
 .../core/longrunning/package-info.java          |  25 +++
 .../task/LongRunningProcessClearTask.java       |  98 +++++++++
 .../task/LongRunningProcessPingTask.java        |  85 ++++++++
 .../task/LongRunningProcessStartTask.java       | 114 ++++++++++
 .../task/LongRunningProcessStopTask.java        | 102 +++++++++
 .../task/LongRunningProcessTask.java            |  53 +++++
 .../core/longrunning/task/package-info.java     |  23 ++
 .../task/util/LongRunningProcessState.java      |  27 +++
 .../task/util/LongRunningProcessStatus.java     |  66 ++++++
 .../longrunning/task/util/package-info.java     |  22 ++
 .../core/nativerunning/NativeProcess.java       |  71 +++++++
 .../nativerunning/NativeProcessManager.java     |  59 ++++++
 .../core/nativerunning/package-info.java        |  25 +++
 .../task/NativeProcessStartTask.java            | 116 ++++++++++
 .../core/nativerunning/task/package-info.java   |  22 ++
 .../ignite/tensorflow/core/package-info.java    |  26 +++
 .../core/pythonrunning/PythonProcess.java       |  58 +++++
 .../pythonrunning/PythonProcessManager.java     |  83 ++++++++
 .../core/pythonrunning/package-info.java        |  25 +++
 .../core/util/CustomizableThreadFactory.java    |  54 +++++
 .../tensorflow/core/util/package-info.java      |  22 ++
 .../apache/ignite/tensorflow/package-info.java  |  37 ++++
 .../ignite/tensorflow/TensorFlowTestSuite.java  |  33 +++
 .../ignite/tensorflow/core/CoreTestSuite.java   |  42 ++++
 .../core/ProcessManagerWrapperTest.java         | 108 ++++++++++
 .../LongRunningProcessManagerTest.java          | 169 +++++++++++++++
 .../task/LongRunningProcessClearTaskTest.java   | 144 +++++++++++++
 .../task/LongRunningProcessPingTaskTest.java    | 152 +++++++++++++
 .../task/LongRunningProcessStartTaskTest.java   | 101 +++++++++
 .../task/LongRunningProcessStopTaskTest.java    | 157 ++++++++++++++
 54 files changed, 4182 insertions(+), 203 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/5ee876a6/examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/pom.xml b/examples/pom.xml
index f29ad87..53aeba5 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -124,6 +124,8 @@
         <lgpl.test.folder>src/test/java</lgpl.test.folder>
         <spark.folder>src/main/java</spark.folder>
         <spark.test.folder>src/test/java</spark.test.folder>
+        <tensorflow.folder>src/main/java</tensorflow.folder>
+        <tensorflow.test.folder>src/test/java</tensorflow.test.folder>
     </properties>
 
     <profiles>
@@ -245,6 +247,23 @@
                 </dependency>
             </dependencies>
         </profile>
+
+        <profile>
+            <id>tensorflow</id>
+
+            <properties> <!-- Override the default src/main/java and src/test/java folders used by the build-helper sources. -->
+                <tensorflow.folder>src/main/tensorflow</tensorflow.folder>
+                <tensorflow.test.folder>src/test/tensorflow</tensorflow.test.folder>
+            </properties>
+
+            <dependencies>
+                <dependency>
+                    <groupId>org.apache.ignite</groupId>
+                    <artifactId>ignite-tensorflow</artifactId>
+                    <version>${project.version}</version>
+                </dependency>
+            </dependencies>
+        </profile>
     </profiles>
 
     <build>
@@ -275,6 +294,7 @@
                             <sources>
                                 <source>${lgpl.folder}</source>
                                 <source>${spark.folder}</source>
+                                <source>${tensorflow.folder}</source>
                             </sources>
                         </configuration>
                     </execution>
@@ -289,6 +309,7 @@
                             <sources>
                                 <source>${lgpl.test.folder}</source>
                                 <source>${spark.test.folder}</source>
+                                <source>${tensorflow.test.folder}</source>
                             </sources>
                         </configuration>
                     </execution>

http://git-wip-us.apache.org/repos/asf/ignite/blob/5ee876a6/examples/src/main/tensorflow/org/apache/ignite/tensorflow/cluster/TensorFlowClusterExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/tensorflow/org/apache/ignite/tensorflow/cluster/TensorFlowClusterExample.java b/examples/src/main/tensorflow/org/apache/ignite/tensorflow/cluster/TensorFlowClusterExample.java
new file mode 100644
index 0000000..3a956c9
--- /dev/null
+++ b/examples/src/main/tensorflow/org/apache/ignite/tensorflow/cluster/TensorFlowClusterExample.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tensorflow.cluster;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.function.Supplier;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.tensorflow.cluster.tfrunning.TensorFlowServerManager;
+import org.apache.ignite.tensorflow.core.longrunning.task.util.LongRunningProcessStatus;
+
+/**
+ * Prerequisites: be aware that to successfully run this example you need to have Python and TensorFlow installed on
+ * your machine. To find out how to install Python please take a look at this page: https://www.python.org/downloads/. To
+ * install TensorFlow see this web site: https://www.tensorflow.org/install/.
+ *
+ * Example that shows how to use {@link TensorFlowClusterGatewayManager} to start, maintain and stop a TensorFlow
+ * cluster.
+ */
+public class TensorFlowClusterExample {
+    /** Runs the example: starts a node, fills TEST_CACHE, creates a TensorFlow cluster for it, prints its state once and stops it. */
+    public static void main(String... args) throws InterruptedException {
+        IgniteConfiguration configuration = new IgniteConfiguration();
+        configuration.setClientMode(false); // Start this node as a server node, not a client.
+
+        try (Ignite ignite = Ignition.start(configuration)) {
+            System.out.println(">>> TensorFlow cluster example started.");
+
+            CacheConfiguration<Integer, Integer> cacheConfiguration = new CacheConfiguration<>();
+            cacheConfiguration.setAffinity(new RendezvousAffinityFunction(false, 10));
+            cacheConfiguration.setName("TEST_CACHE");
+
+            IgniteCache<Integer, Integer> cache = ignite.getOrCreateCache(cacheConfiguration);
+            for (int i = 0; i < 1000; i++)
+                cache.put(i, i);
+
+            System.out.println(">>> Cache created.");
+
+            TensorFlowClusterGatewayManager mgr = new TensorFlowClusterGatewayManager(ignite);
+            TensorFlowClusterGateway gateway = mgr.getOrCreateCluster("TEST_CACHE");
+
+            System.out.println(">>> TensorFlow cluster gateway started.");
+
+            CountDownLatch latch = new CountDownLatch(1);
+
+            gateway.subscribe(cluster -> {
+                StringBuilder builder = new StringBuilder();
+                builder.append("------------------- TensorFlow Cluster Service Info -------------------").append('\n');
+
+                builder.append("Specification : ").append('\n');
+
+                TensorFlowServerManager srvMgr = new TensorFlowServerManager(
+                    (Supplier<Ignite> & Serializable)() -> ignite // Intersection cast makes the supplier lambda serializable.
+                );
+
+                String clusterSpec = srvMgr.formatClusterSpec(cluster.getSpec());
+                builder.append(clusterSpec).append('\n');
+
+                Map<UUID, List<LongRunningProcessStatus>> statuses = srvMgr.ping(cluster.getProcesses());
+
+                builder.append("State : ").append('\n');
+
+                for (UUID nodeId : cluster.getProcesses().keySet()) {
+                    List<UUID> pr = cluster.getProcesses().get(nodeId);
+                    List<LongRunningProcessStatus> st = statuses.get(nodeId); // NOTE(review): assumes ping() returns one status per process for every node; confirm.
+
+                    builder.append("Node ").append(nodeId.toString().substring(0, 8)).append(" -> ").append('\n');
+                    for (int i = 0; i < pr.size(); i++) {
+                        builder.append("\tProcess ")
+                            .append(pr.get(i).toString().substring(0, 8))
+                            .append(" with status ")
+                            .append(st.get(i).getState());
+
+                        if (st.get(i).getException() != null)
+                            builder.append(" (").append(st.get(i).getException()).append(")");
+
+                        builder.append('\n');
+                    }
+                }
+
+                builder.append("-----------------------------------------------------------------------").append('\n');
+
+                System.out.println(builder);
+
+                latch.countDown(); // Releases main() once the cluster state has been printed.
+            });
+
+            latch.await(); // Blocks until the subscriber has reported the cluster state at least once.
+
+            mgr.stopClusterIfExists("TEST_CACHE");
+
+            System.out.println(">>> TensorFlow cluster example completed.");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5ee876a6/modules/tensorflow/licences/apache-2.0.txt
----------------------------------------------------------------------
diff --git a/modules/tensorflow/licences/apache-2.0.txt b/modules/tensorflow/licences/apache-2.0.txt
deleted file mode 100644
index d645695..0000000
--- a/modules/tensorflow/licences/apache-2.0.txt
+++ /dev/null
@@ -1,202 +0,0 @@
-
-                                 Apache License
-                           Version 2.0, January 2004
-                        http://www.apache.org/licenses/
-
-   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
-
-   1. Definitions.
-
-      "License" shall mean the terms and conditions for use, reproduction,
-      and distribution as defined by Sections 1 through 9 of this document.
-
-      "Licensor" shall mean the copyright owner or entity authorized by
-      the copyright owner that is granting the License.
-
-      "Legal Entity" shall mean the union of the acting entity and all
-      other entities that control, are controlled by, or are under common
-      control with that entity. For the purposes of this definition,
-      "control" means (i) the power, direct or indirect, to cause the
-      direction or management of such entity, whether by contract or
-      otherwise, or (ii) ownership of fifty percent (50%) or more of the
-      outstanding shares, or (iii) beneficial ownership of such entity.
-
-      "You" (or "Your") shall mean an individual or Legal Entity
-      exercising permissions granted by this License.
-
-      "Source" form shall mean the preferred form for making modifications,
-      including but not limited to software source code, documentation
-      source, and configuration files.
-
-      "Object" form shall mean any form resulting from mechanical
-      transformation or translation of a Source form, including but
-      not limited to compiled object code, generated documentation,
-      and conversions to other media types.
-
-      "Work" shall mean the work of authorship, whether in Source or
-      Object form, made available under the License, as indicated by a
-      copyright notice that is included in or attached to the work
-      (an example is provided in the Appendix below).
-
-      "Derivative Works" shall mean any work, whether in Source or Object
-      form, that is based on (or derived from) the Work and for which the
-      editorial revisions, annotations, elaborations, or other modifications
-      represent, as a whole, an original work of authorship. For the purposes
-      of this License, Derivative Works shall not include works that remain
-      separable from, or merely link (or bind by name) to the interfaces of,
-      the Work and Derivative Works thereof.
-
-      "Contribution" shall mean any work of authorship, including
-      the original version of the Work and any modifications or additions
-      to that Work or Derivative Works thereof, that is intentionally
-      submitted to Licensor for inclusion in the Work by the copyright owner
-      or by an individual or Legal Entity authorized to submit on behalf of
-      the copyright owner. For the purposes of this definition, "submitted"
-      means any form of electronic, verbal, or written communication sent
-      to the Licensor or its representatives, including but not limited to
-      communication on electronic mailing lists, source code control systems,
-      and issue tracking systems that are managed by, or on behalf of, the
-      Licensor for the purpose of discussing and improving the Work, but
-      excluding communication that is conspicuously marked or otherwise
-      designated in writing by the copyright owner as "Not a Contribution."
-
-      "Contributor" shall mean Licensor and any individual or Legal Entity
-      on behalf of whom a Contribution has been received by Licensor and
-      subsequently incorporated within the Work.
-
-   2. Grant of Copyright License. Subject to the terms and conditions of
-      this License, each Contributor hereby grants to You a perpetual,
-      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
-      copyright license to reproduce, prepare Derivative Works of,
-      publicly display, publicly perform, sublicense, and distribute the
-      Work and such Derivative Works in Source or Object form.
-
-   3. Grant of Patent License. Subject to the terms and conditions of
-      this License, each Contributor hereby grants to You a perpetual,
-      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
-      (except as stated in this section) patent license to make, have made,
-      use, offer to sell, sell, import, and otherwise transfer the Work,
-      where such license applies only to those patent claims licensable
-      by such Contributor that are necessarily infringed by their
-      Contribution(s) alone or by combination of their Contribution(s)
-      with the Work to which such Contribution(s) was submitted. If You
-      institute patent litigation against any entity (including a
-      cross-claim or counterclaim in a lawsuit) alleging that the Work
-      or a Contribution incorporated within the Work constitutes direct
-      or contributory patent infringement, then any patent licenses
-      granted to You under this License for that Work shall terminate
-      as of the date such litigation is filed.
-
-   4. Redistribution. You may reproduce and distribute copies of the
-      Work or Derivative Works thereof in any medium, with or without
-      modifications, and in Source or Object form, provided that You
-      meet the following conditions:
-
-      (a) You must give any other recipients of the Work or
-          Derivative Works a copy of this License; and
-
-      (b) You must cause any modified files to carry prominent notices
-          stating that You changed the files; and
-
-      (c) You must retain, in the Source form of any Derivative Works
-          that You distribute, all copyright, patent, trademark, and
-          attribution notices from the Source form of the Work,
-          excluding those notices that do not pertain to any part of
-          the Derivative Works; and
-
-      (d) If the Work includes a "NOTICE" text file as part of its
-          distribution, then any Derivative Works that You distribute must
-          include a readable copy of the attribution notices contained
-          within such NOTICE file, excluding those notices that do not
-          pertain to any part of the Derivative Works, in at least one
-          of the following places: within a NOTICE text file distributed
-          as part of the Derivative Works; within the Source form or
-          documentation, if provided along with the Derivative Works; or,
-          within a display generated by the Derivative Works, if and
-          wherever such third-party notices normally appear. The contents
-          of the NOTICE file are for informational purposes only and
-          do not modify the License. You may add Your own attribution
-          notices within Derivative Works that You distribute, alongside
-          or as an addendum to the NOTICE text from the Work, provided
-          that such additional attribution notices cannot be construed
-          as modifying the License.
-
-      You may add Your own copyright statement to Your modifications and
-      may provide additional or different license terms and conditions
-      for use, reproduction, or distribution of Your modifications, or
-      for any such Derivative Works as a whole, provided Your use,
-      reproduction, and distribution of the Work otherwise complies with
-      the conditions stated in this License.
-
-   5. Submission of Contributions. Unless You explicitly state otherwise,
-      any Contribution intentionally submitted for inclusion in the Work
-      by You to the Licensor shall be under the terms and conditions of
-      this License, without any additional terms or conditions.
-      Notwithstanding the above, nothing herein shall supersede or modify
-      the terms of any separate license agreement you may have executed
-      with Licensor regarding such Contributions.
-
-   6. Trademarks. This License does not grant permission to use the trade
-      names, trademarks, service marks, or product names of the Licensor,
-      except as required for reasonable and customary use in describing the
-      origin of the Work and reproducing the content of the NOTICE file.
-
-   7. Disclaimer of Warranty. Unless required by applicable law or
-      agreed to in writing, Licensor provides the Work (and each
-      Contributor provides its Contributions) on an "AS IS" BASIS,
-      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
-      implied, including, without limitation, any warranties or conditions
-      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
-      PARTICULAR PURPOSE. You are solely responsible for determining the
-      appropriateness of using or redistributing the Work and assume any
-      risks associated with Your exercise of permissions under this License.
-
-   8. Limitation of Liability. In no event and under no legal theory,
-      whether in tort (including negligence), contract, or otherwise,
-      unless required by applicable law (such as deliberate and grossly
-      negligent acts) or agreed to in writing, shall any Contributor be
-      liable to You for damages, including any direct, indirect, special,
-      incidental, or consequential damages of any character arising as a
-      result of this License or out of the use or inability to use the
-      Work (including but not limited to damages for loss of goodwill,
-      work stoppage, computer failure or malfunction, or any and all
-      other commercial damages or losses), even if such Contributor
-      has been advised of the possibility of such damages.
-
-   9. Accepting Warranty or Additional Liability. While redistributing
-      the Work or Derivative Works thereof, You may choose to offer,
-      and charge a fee for, acceptance of support, warranty, indemnity,
-      or other liability obligations and/or rights consistent with this
-      License. However, in accepting such obligations, You may act only
-      on Your own behalf and on Your sole responsibility, not on behalf
-      of any other Contributor, and only if You agree to indemnify,
-      defend, and hold each Contributor harmless for any liability
-      incurred by, or claims asserted against, such Contributor by reason
-      of your accepting any such warranty or additional liability.
-
-   END OF TERMS AND CONDITIONS
-
-   APPENDIX: How to apply the Apache License to your work.
-
-      To apply the Apache License to your work, attach the following
-      boilerplate notice, with the fields enclosed by brackets "[]"
-      replaced with your own identifying information. (Don't include
-      the brackets!)  The text should be enclosed in the appropriate
-      comment syntax for the file format. We also recommend that a
-      file or class name and description of purpose be included on the
-      same "printed page" as the copyright notice for easier
-      identification within third-party archives.
-
-   Copyright [yyyy] [name of copyright owner]
-
-   Licensed under the Apache License, Version 2.0 (the "License");
-   you may not use this file except in compliance with the License.
-   You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-   Unless required by applicable law or agreed to in writing, software
-   distributed under the License is distributed on an "AS IS" BASIS,
-   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-   See the License for the specific language governing permissions and
-   limitations under the License.

http://git-wip-us.apache.org/repos/asf/ignite/blob/5ee876a6/modules/tensorflow/licenses/apache-2.0.txt
----------------------------------------------------------------------
diff --git a/modules/tensorflow/licenses/apache-2.0.txt b/modules/tensorflow/licenses/apache-2.0.txt
new file mode 100644
index 0000000..d645695
--- /dev/null
+++ b/modules/tensorflow/licenses/apache-2.0.txt
@@ -0,0 +1,202 @@
+
+                                 Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "[]"
+      replaced with your own identifying information. (Don't include
+      the brackets!)  The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright [yyyy] [name of copyright owner]
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.

http://git-wip-us.apache.org/repos/asf/ignite/blob/5ee876a6/modules/tensorflow/pom.xml
----------------------------------------------------------------------
diff --git a/modules/tensorflow/pom.xml b/modules/tensorflow/pom.xml
index c865d7e..78ab69a 100644
--- a/modules/tensorflow/pom.xml
+++ b/modules/tensorflow/pom.xml
@@ -37,9 +37,37 @@
     <dependencies>
         <dependency>
             <groupId>org.apache.ignite</groupId>
-            <artifactId>ignite-ml</artifactId>
+            <artifactId>ignite-core</artifactId>
             <version>${project.version}</version>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-core</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-spring</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>log4j</groupId>
+            <artifactId>log4j</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+            <version>${mockito.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <profiles>

http://git-wip-us.apache.org/repos/asf/ignite/blob/5ee876a6/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/TensorFlowCluster.java
----------------------------------------------------------------------
diff --git 
a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/TensorFlowCluster.java
 
b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/TensorFlowCluster.java
new file mode 100644
index 0000000..cf60e42
--- /dev/null
+++ 
b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/TensorFlowCluster.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tensorflow.cluster;
+
+import org.apache.ignite.tensorflow.cluster.spec.TensorFlowClusterSpec;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * TensorFlow Cluster metadata.
+ */
+public class TensorFlowCluster implements Serializable {
+    /** */
+    private static final long serialVersionUID = -6636339457255751011L;
+
+    /** TensorFlow cluster specification. */
+    private final TensorFlowClusterSpec spec;
+
+    /** Process identifiers. */
+    private final Map<UUID, List<UUID>> processes;
+
+    /**
+     * Constructs a new instance of TensorFlow cluster.
+     *
+     * @param spec TensorFlow cluster specification.
+     * @param processes Process identifiers.
+     */
+    public TensorFlowCluster(TensorFlowClusterSpec spec, Map<UUID, List<UUID>> 
processes) {
+        assert spec != null : "TensorFlow cluster specification should not be 
null";
+        assert processes != null : "Processes should not be null";
+
+        this.spec = spec;
+        this.processes = processes;
+    }
+
+    /** */
+    public TensorFlowClusterSpec getSpec() {
+        return spec;
+    }
+
+    /** */
+    public Map<UUID, List<UUID>> getProcesses() {
+        return processes;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5ee876a6/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/TensorFlowClusterGateway.java
----------------------------------------------------------------------
diff --git 
a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/TensorFlowClusterGateway.java
 
b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/TensorFlowClusterGateway.java
new file mode 100644
index 0000000..5eee155
--- /dev/null
+++ 
b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/TensorFlowClusterGateway.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tensorflow.cluster;
+
+import java.util.HashSet;
+import java.util.UUID;
+import java.util.function.Consumer;
+import org.apache.ignite.lang.IgniteBiPredicate;
+
+/**
+ * TensorFlow cluster gateway that allows to subscribe on changes in cluster 
configuration.
+ */
+public class TensorFlowClusterGateway implements IgniteBiPredicate<UUID, 
TensorFlowCluster> {
+    /** */
+    private static final long serialVersionUID = -540323262800791340L;
+
+    /** Subscribers. */
+    private final HashSet<Consumer<TensorFlowCluster>> subscribers = new 
HashSet<>();
+
+    /** Last value received from the upstream. */
+    private TensorFlowCluster last;
+
+    /**
+     * Subscribers the specified subscriber on the upstream events.
+     *
+     * @param subscriber Subscriber.
+     */
+    public synchronized void subscribe(Consumer<TensorFlowCluster> subscriber) 
{
+        subscribers.add(subscriber);
+
+        if (last != null)
+            subscriber.accept(last);
+    }
+
+    /**
+     * Unsubscribe the specified subscriber.
+     *
+     * @param subscriber Subscriber.
+     */
+    public synchronized void unsubscribe(Consumer<TensorFlowCluster> 
subscriber) {
+        subscribers.remove(subscriber);
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized boolean apply(UUID uuid, TensorFlowCluster 
cluster) {
+        for (Consumer<TensorFlowCluster> subscriber : subscribers)
+            subscriber.accept(cluster);
+
+        last = cluster;
+
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5ee876a6/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/TensorFlowClusterGatewayManager.java
----------------------------------------------------------------------
diff --git 
a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/TensorFlowClusterGatewayManager.java
 
b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/TensorFlowClusterGatewayManager.java
new file mode 100644
index 0000000..f4b8187
--- /dev/null
+++ 
b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/TensorFlowClusterGatewayManager.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tensorflow.cluster;
+
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteServices;
+
+/**
+ * TensorFlow cluster manager that allows to start, maintain and stop 
TensorFlow cluster using
+ * {@link TensorFlowClusterManager} and Ignite Service Grid.
+ */
+public class TensorFlowClusterGatewayManager {
+    /** Service name template. */
+    private static final String SERVICE_NAME_TEMPLATE = "TF_SERVICE_%s";
+
+    /** Service topic name template. */
+    private static final String SERVICE_TOPIC_NAME_TEMPLATE = 
"TF_SERVICE_TOPIC_%s";
+
+    /** Ignite instance. */
+    private final Ignite ignite;
+
+    /**
+     * Constructs a new instance of TensorFlow cluster manager with 
maintenance.
+     *
+     * @param ignite Ignite instance.
+     */
+    public TensorFlowClusterGatewayManager(Ignite ignite) {
+        assert ignite != null : "Ignite should not be null";
+
+        this.ignite = ignite;
+    }
+
+    /**
+     * Creates and starts a new TensorFlow cluster for the specified cache if 
it doesn't exist, otherwise returns
+     * existing one.
+     *
+     * @param upstreamCacheName Upstream cache name.
+     * @return TensorFlow cluster gateway that allows to subscribe on cluster 
changes.
+     */
+    public TensorFlowClusterGateway getOrCreateCluster(String 
upstreamCacheName) {
+        String svcName = String.format(SERVICE_NAME_TEMPLATE, 
upstreamCacheName);
+        String topicName = String.format(SERVICE_TOPIC_NAME_TEMPLATE, 
upstreamCacheName);
+
+        TensorFlowClusterGateway gateway = 
createTensorFlowClusterGateway(topicName);
+
+        IgniteServices services = ignite.services();
+
+        services.deployClusterSingleton(svcName, new 
TensorFlowClusterMaintainer(upstreamCacheName, topicName));
+
+        return gateway;
+    }
+
+    /**
+     * Stops TensorFlow cluster.
+     *
+     * @param upstreamCacheName Upstream cache name.
+     */
+    public void stopClusterIfExists(String upstreamCacheName) {
+        IgniteServices services = ignite.services();
+
+        services.cancel(String.format(SERVICE_NAME_TEMPLATE, 
upstreamCacheName));
+    }
+
+    /**
+     * Creates TensorFlow cluster gateway.
+     *
+     * @param topicName Topic name.
+     * @return TensorFlow cluster gateway.
+     */
+    private TensorFlowClusterGateway createTensorFlowClusterGateway(String 
topicName) {
+        TensorFlowClusterGateway gateway = new TensorFlowClusterGateway();
+
+        ignite.message().localListen(topicName, gateway);
+
+        return gateway;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5ee876a6/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/TensorFlowClusterMaintainer.java
----------------------------------------------------------------------
diff --git 
a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/TensorFlowClusterMaintainer.java
 
b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/TensorFlowClusterMaintainer.java
new file mode 100644
index 0000000..21b9c2b
--- /dev/null
+++ 
b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/TensorFlowClusterMaintainer.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tensorflow.cluster;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.Supplier;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteMessaging;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.services.Service;
+import org.apache.ignite.services.ServiceContext;
+import 
org.apache.ignite.tensorflow.core.longrunning.task.util.LongRunningProcessState;
+import 
org.apache.ignite.tensorflow.core.longrunning.task.util.LongRunningProcessStatus;
+
+/**
+ * TensorFlow cluster service that maintains TensorFlow cluster.
+ */
+public class TensorFlowClusterMaintainer implements Service {
+    /** */
+    private static final long serialVersionUID = -3220563310643566419L;
+
+    /** Upstream cache name. */
+    private final String cacheName;
+
+    /** Topic name. */
+    private final String topicName;
+
+    /** TensorFlow cluster manager. */
+    private final TensorFlowClusterManager clusterMgr;
+
+    /** Previous partition mapping. */
+    private UUID[] prev;
+
+    /**
+     * Constructs a new instance of TensorFlow cluster service.
+     *
+     * @param cacheName Upstream cache name.
+     * @param topicName Topic name.
+     */
+    public TensorFlowClusterMaintainer(String cacheName, String topicName) {
+        assert cacheName != null : "Cache name should not be null";
+        assert topicName != null : "Topic name should not be null";
+
+        this.clusterMgr = new TensorFlowClusterManager((Supplier<Ignite> & 
Serializable)Ignition::ignite);
+        this.cacheName = cacheName;
+        this.topicName = topicName;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void cancel(ServiceContext ctx) {
+        clusterMgr.stopClusterIfExists(cacheName);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void init(ServiceContext ctx) {
+        clusterMgr.init();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void execute(ServiceContext ctx) throws Exception {
+        while (!ctx.isCancelled()) {
+            Thread.sleep(1000);
+
+            boolean restartRequired = hasAffinityChanged();
+
+            if (!restartRequired) {
+                TensorFlowCluster cluster = 
clusterMgr.getCache().get(cacheName);
+                Map<UUID, List<LongRunningProcessStatus>> statuses = 
clusterMgr.getSrvProcMgr()
+                    .ping(cluster.getProcesses());
+
+                for (UUID nodeId : statuses.keySet()) {
+                    for (LongRunningProcessStatus status : 
statuses.get(nodeId)) {
+                        if 
(status.getState().equals(LongRunningProcessState.DONE)) {
+                            restartRequired = true;
+                            break;
+                        }
+                    }
+                }
+            }
+
+            if (restartRequired) {
+                clusterMgr.stopClusterIfExists(cacheName);
+
+                TensorFlowCluster cluster =  
clusterMgr.getOrCreateCluster(cacheName);
+
+                IgniteMessaging messaging = Ignition.ignite().message();
+                messaging.send(topicName, cluster);
+            }
+        }
+    }
+
+    /**
+     * Checks if affinity mapping has been changed.
+     *
+     * @return True if mapping has been changed, otherwise false.
+     */
+    private boolean hasAffinityChanged() {
+        Affinity<?> affinity = Ignition.ignite().affinity(cacheName);
+
+        int parts = affinity.partitions();
+
+        UUID[] ids = new UUID[parts];
+
+        for (int part = 0; part < parts; part++) {
+            ClusterNode node = affinity.mapPartitionToNode(part);
+            UUID nodeId = node.id();
+            ids[part] = nodeId;
+        }
+
+        if (prev == null || !Arrays.equals(ids, prev)) {
+            prev = ids;
+            return true;
+        }
+
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5ee876a6/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/TensorFlowClusterManager.java
----------------------------------------------------------------------
diff --git 
a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/TensorFlowClusterManager.java
 
b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/TensorFlowClusterManager.java
new file mode 100644
index 0000000..2d63195
--- /dev/null
+++ 
b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/TensorFlowClusterManager.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tensorflow.cluster;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.locks.Lock;
+import java.util.function.Supplier;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.tensorflow.cluster.spec.TensorFlowClusterSpec;
+import org.apache.ignite.tensorflow.cluster.spec.TensorFlowServerAddressSpec;
+import org.apache.ignite.tensorflow.cluster.tfrunning.TensorFlowServer;
+import org.apache.ignite.tensorflow.cluster.tfrunning.TensorFlowServerManager;
+import org.apache.ignite.tensorflow.cluster.util.TensorFlowClusterResolver;
+
+/**
+ * TensorFlow cluster manager that allows to start, maintain and stop 
TensorFlow cluster.
+ */
+public class TensorFlowClusterManager implements Serializable {
+    /** */
+    private static final long serialVersionUID = -4847155592164802806L;
+
+    /** TensorFlow cluster metadata cache name. */
+    private static final String TF_CLUSTER_METADATA_CACHE_NAME = 
"TF_CLUSTER_METADATA_CACHE";
+
+    /** Ignite instance supplier. */
+    private final Supplier<Ignite> igniteSupplier;
+
+    /** TensorFlow server manager. */
+    private final TensorFlowServerManager srvProcMgr;
+
+    /** TensorFlow cluster resolver. */
+    private final TensorFlowClusterResolver clusterRslvr;
+
+    /** TensorFlow cluster metadata cache. */
+    private transient IgniteCache<String, TensorFlowCluster> cache;
+
+    /**
+     * Constructs a new instance of TensorFlow cluster manager.
+     *
+     * @param igniteSupplier Ignite instance supplier.
+     * @param <T> Type of serializable supplier.
+     */
+    public <T extends Supplier<Ignite> & Serializable> 
TensorFlowClusterManager(T igniteSupplier) {
+        this(
+            igniteSupplier,
+            new TensorFlowServerManager(igniteSupplier),
+            new TensorFlowClusterResolver(igniteSupplier)
+        );
+    }
+
+    /**
+     * Constructs a new instance of TensorFlow cluster manager.
+     *
+     * @param igniteSupplier Ignite instance supplier.
+     * @param srvProcMgr TensorFlow server manager.
+     * @param clusterRslvr TensorFlow cluster resolver.
+     */
+    public <T extends Supplier<Ignite> & Serializable> 
TensorFlowClusterManager(T igniteSupplier,
+        TensorFlowServerManager srvProcMgr, TensorFlowClusterResolver 
clusterRslvr) {
+        assert igniteSupplier != null : "Ignite supplier should not be null";
+        assert srvProcMgr != null : "TensorFlow server manager should not be 
null";
+        assert clusterRslvr != null : "TensorFlow cluster resolver should not 
be null";
+
+        this.igniteSupplier = igniteSupplier;
+        this.srvProcMgr = srvProcMgr;
+        this.clusterRslvr = clusterRslvr;
+    }
+
+    /** Initializes TensorFlow cluster manager and gets or creates 
correspondent caches. */
+    public void init() {
+        clusterRslvr.init();
+
+        CacheConfiguration<String, TensorFlowCluster> cacheConfiguration = new 
CacheConfiguration<>();
+        cacheConfiguration.setName(TF_CLUSTER_METADATA_CACHE_NAME);
+        cacheConfiguration.setCacheMode(CacheMode.REPLICATED);
+        cacheConfiguration.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
+
+        Ignite ignite = igniteSupplier.get();
+        cache = ignite.getOrCreateCache(cacheConfiguration);
+    }
+
+    /**
+     * Creates and starts a new TensorFlow cluster for the specified cache if 
it doesn't exist, otherwise returns
+     * existing one.
+     *
+     * @param upstreamCacheName Upstream cache name.
+     * @return TensorFlow cluster metadata.
+     */
+    public TensorFlowCluster getOrCreateCluster(String upstreamCacheName) {
+        checkThatInitialized();
+
+        Lock clusterMgrCacheLock = cache.lock(upstreamCacheName);
+        clusterMgrCacheLock.lock();
+
+        try {
+            TensorFlowCluster cluster = cache.get(upstreamCacheName);
+
+            if (cluster == null) {
+                TensorFlowClusterSpec clusterSpec = 
clusterRslvr.resolveAndAcquirePorts(upstreamCacheName);
+                cluster = startCluster(clusterSpec);
+                cache.put(upstreamCacheName, cluster);
+            }
+
+            return cluster;
+        }
+        finally {
+            clusterMgrCacheLock.unlock();
+        }
+    }
+
+    /**
+     * Stops TensorFlow cluster.
+     *
+     * @param upstreamCacheName Upstream cache name.
+     */
+    public void stopClusterIfExists(String upstreamCacheName) {
+        checkThatInitialized();
+
+        Lock clusterMgrCacheLock = cache.lock(upstreamCacheName);
+        clusterMgrCacheLock.lock();
+
+        try {
+            TensorFlowCluster cluster = cache.get(upstreamCacheName);
+
+            if (cluster != null) {
+                srvProcMgr.stop(cluster.getProcesses(), true);
+                clusterRslvr.freePorts(cluster.getSpec());
+                cache.remove(upstreamCacheName);
+            }
+        }
+        finally {
+            clusterMgrCacheLock.unlock();
+        }
+    }
+
+    /** Destroys TensorFlow cluster manager and related caches. */
+    public void destroy() {
+        clusterRslvr.destroy();
+
+        Ignite ignite = igniteSupplier.get();
+        ignite.destroyCache(TF_CLUSTER_METADATA_CACHE_NAME);
+    }
+
+    /**
+     * Starts TensorFlow cluster using the specified specification and returns 
metadata of the started cluster.
+     *
+     * @param spec TensorFlow cluster specification.
+     * @return TensorFlow cluster metadata.
+     */
+    private TensorFlowCluster startCluster(TensorFlowClusterSpec spec) {
+        checkThatInitialized();
+
+        List<TensorFlowServer> srvs = new ArrayList<>();
+
+        Map<String, List<TensorFlowServerAddressSpec>> jobs = spec.getJobs();
+
+        for (String jobName : jobs.keySet()) {
+            List<TensorFlowServerAddressSpec> tasks = jobs.get(jobName);
+
+            for (int i = 0; i < tasks.size(); i++) {
+                TensorFlowServer srvSpec = new TensorFlowServer(spec, jobName, 
i);
+                srvs.add(srvSpec);
+            }
+        }
+
+        Map<UUID, List<UUID>> processes = srvProcMgr.start(srvs);
+
+        return new TensorFlowCluster(spec, processes);
+    }
+
+    /**
+     * Checks that the component has been initialized.
+     */
+    private void checkThatInitialized() {
+        if (cache == null)
+            throw new IllegalStateException("TensorFlow Cluster Manager is not 
initialized");
+    }
+
+    /** */
+    public TensorFlowServerManager getSrvProcMgr() {
+        return srvProcMgr;
+    }
+
+    /** */
+    public IgniteCache<String, TensorFlowCluster> getCache() {
+        return cache;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5ee876a6/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/package-info.java
----------------------------------------------------------------------
diff --git 
a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/package-info.java
 
b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/package-info.java
new file mode 100644
index 0000000..ff00e57
--- /dev/null
+++ 
b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/package-info.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
/**
 * <!-- Package description. -->
 * TensorFlow integration API that allows to start and maintain a TensorFlow cluster using the infrastructure tools
 * from package {@link org.apache.ignite.tensorflow.core}. The most important components are:
 * <ul>
 *     <li>{@link org.apache.ignite.tensorflow.cluster.TensorFlowClusterManager} that allows to start and stop a
 *     TensorFlow cluster on top of Apache Ignite, but neither monitors nor maintains it, so in case of failure the
 *     cluster won't be restarted.</li>
 *     <li>{@link org.apache.ignite.tensorflow.cluster.TensorFlowClusterGatewayManager} that allows to start, maintain
 *     and stop a TensorFlow cluster on top of Apache Ignite, so in case of failure the cluster will be restarted and
 *     recovered.</li>
 *     <li>{@link org.apache.ignite.tensorflow.cluster.TensorFlowClusterGateway} that allows to subscribe to cluster
 *     configuration changes that might happen as a result of rebalancing or node failures.</li>
 * </ul>
 */
+package org.apache.ignite.tensorflow.cluster;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/5ee876a6/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/spec/TensorFlowClusterSpec.java
----------------------------------------------------------------------
diff --git 
a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/spec/TensorFlowClusterSpec.java
 
b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/spec/TensorFlowClusterSpec.java
new file mode 100644
index 0000000..a053b8e
--- /dev/null
+++ 
b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/spec/TensorFlowClusterSpec.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tensorflow.cluster.spec;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * TensorFlow cluster specification.
+ */
+public class TensorFlowClusterSpec implements Serializable {
+    /** */
+    private static final long serialVersionUID = 1428581667528448091L;
+
+    /** TensorFlow cluster jobs. */
+    private final Map<String, List<TensorFlowServerAddressSpec>> jobs = new 
HashMap<>();
+
+    /**
+     * Adds new task to the cluster specification.
+     *
+     * @param jobName Job name.
+     * @param nodeId Node identifier.
+     * @param port Port number.
+     * @return This instance of TensorFlow cluster specification.
+     */
+    public TensorFlowClusterSpec addTask(String jobName, UUID nodeId, int 
port) {
+        jobs.putIfAbsent(jobName, new ArrayList<>());
+
+        List<TensorFlowServerAddressSpec> tasks = jobs.get(jobName);
+
+        tasks.add(new TensorFlowServerAddressSpec(nodeId, port));
+
+        return this;
+    }
+
+    /** */
+    public Map<String, List<TensorFlowServerAddressSpec>> getJobs() {
+        return jobs;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5ee876a6/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/spec/TensorFlowServerAddressSpec.java
----------------------------------------------------------------------
diff --git 
a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/spec/TensorFlowServerAddressSpec.java
 
b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/spec/TensorFlowServerAddressSpec.java
new file mode 100644
index 0000000..196b166
--- /dev/null
+++ 
b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/spec/TensorFlowServerAddressSpec.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tensorflow.cluster.spec;
+
+import java.io.Serializable;
+import java.util.UUID;
+
/**
 * Address of a single TensorFlow server: the Ignite node it is placed on and the port it listens on.
 */
public class TensorFlowServerAddressSpec implements Serializable {
    /** */
    private static final long serialVersionUID = 7883701602323727681L;

    /** Identifier of the node the server is placed on. */
    private final UUID nodeId;

    /** Port the server listens on. */
    private final int port;

    /**
     * Creates an address specification.
     *
     * @param nodeId Identifier of the node the server is placed on, must not be {@code null}.
     * @param port Port number within {@code [0, 65535]}.
     */
    TensorFlowServerAddressSpec(UUID nodeId, int port) {
        assert nodeId != null : "Node identifier should not be null";

        boolean portInRange = !(port < 0 || port > 0xFFFF);
        assert portInRange : "Port should be between 0 and 65535";

        this.nodeId = nodeId;
        this.port = port;
    }

    /**
     * @return Identifier of the node the server is placed on.
     */
    public UUID getNodeId() {
        return nodeId;
    }

    /**
     * @return Port the server listens on.
     */
    public int getPort() {
        return port;
    }
}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5ee876a6/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/spec/package-info.java
----------------------------------------------------------------------
diff --git 
a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/spec/package-info.java
 
b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/spec/package-info.java
new file mode 100644
index 0000000..0425d6c
--- /dev/null
+++ 
b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/spec/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * <!-- Package description. -->
+ * Contains specifications that describe the TensorFlow cluster configuration.
+ */
+package org.apache.ignite.tensorflow.cluster.spec;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/5ee876a6/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/tfrunning/TensorFlowServer.java
----------------------------------------------------------------------
diff --git 
a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/tfrunning/TensorFlowServer.java
 
b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/tfrunning/TensorFlowServer.java
new file mode 100644
index 0000000..ca67405
--- /dev/null
+++ 
b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/tfrunning/TensorFlowServer.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tensorflow.cluster.tfrunning;
+
+import org.apache.ignite.tensorflow.cluster.spec.TensorFlowClusterSpec;
+import java.io.Serializable;
+
+/**
+ * TensorFlow server specification.
+ */
+public class TensorFlowServer implements Serializable {
+    /** */
+    private static final long serialVersionUID = 165988934166176805L;
+
+    /** TensorFlow cluster specification. */
+    private final TensorFlowClusterSpec clusterSpec;
+
+    /** Job name. */
+    private final String jobName;
+
+    /** Task index. */
+    private final Integer taskIdx;
+
+    /** Protocol. */
+    private final String proto;
+
+    /**
+     * Constructs a new instance of TensorFlow server specification.
+     *
+     * @param clusterSpec TensorFlow cluster specification.
+     * @param jobName Job name.
+     */
+    public TensorFlowServer(TensorFlowClusterSpec clusterSpec, String jobName) 
{
+        this(clusterSpec, jobName, null);
+    }
+
+    /**
+     * Constructs a new instance of TensorFlow server specification.
+     *
+     * @param clusterSpec TensorFlow cluster specification.
+     * @param jobName Job name.
+     * @param taskIdx Task index.
+     */
+    public TensorFlowServer(TensorFlowClusterSpec clusterSpec, String jobName, 
Integer taskIdx) {
+        this(clusterSpec, jobName, taskIdx, null);
+    }
+
+    /**
+     * Constructs a new instance of TensorFlow server specification.
+     *
+     * @param clusterSpec TensorFlow cluster specification.
+     * @param jobName Job name.
+     * @param taskIdx Task index.
+     * @param proto Task index.
+     */
+    public TensorFlowServer(TensorFlowClusterSpec clusterSpec, String jobName, 
Integer taskIdx, String proto) {
+        assert clusterSpec != null : "TensorFlow cluster specification should 
not be null";
+        assert jobName != null : "Job name should not be null";
+
+        this.clusterSpec = clusterSpec;
+        this.jobName = jobName;
+        this.taskIdx = taskIdx;
+        this.proto = proto;
+    }
+
+    /** */
+    public TensorFlowClusterSpec getClusterSpec() {
+        return clusterSpec;
+    }
+
+    /** */
+    public String getJobName() {
+        return jobName;
+    }
+
+    /** */
+    public Integer getTaskIdx() {
+        return taskIdx;
+    }
+
+    /** */
+    public String getProto() {
+        return proto;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5ee876a6/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/tfrunning/TensorFlowServerManager.java
----------------------------------------------------------------------
diff --git 
a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/tfrunning/TensorFlowServerManager.java
 
b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/tfrunning/TensorFlowServerManager.java
new file mode 100644
index 0000000..192f619
--- /dev/null
+++ 
b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/tfrunning/TensorFlowServerManager.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tensorflow.cluster.tfrunning;
+
+import java.io.Serializable;
+import java.util.function.Supplier;
+import org.apache.ignite.tensorflow.core.ProcessManager;
+import org.apache.ignite.tensorflow.core.ProcessManagerWrapper;
+import org.apache.ignite.tensorflow.core.pythonrunning.PythonProcess;
+import org.apache.ignite.tensorflow.core.pythonrunning.PythonProcessManager;
+import org.apache.ignite.tensorflow.cluster.spec.TensorFlowClusterSpec;
+import org.apache.ignite.tensorflow.cluster.spec.TensorFlowServerAddressSpec;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.Ignition;
+
+/**
+ * TensorFlow server manager that allows to start, stop and make other actions 
with TensorFlow servers.
+ */
+public class TensorFlowServerManager extends 
ProcessManagerWrapper<PythonProcess, TensorFlowServer> {
+    /** */
+    private static final long serialVersionUID = 8355019934723445973L;
+
+    /**
+     * Constructs a new instance of TensorFlow server manager.
+     *
+     * @param igniteSupplier Ignite instance supplier.
+     * @param <T> Type of serializable supplier.
+     */
+    public <T extends Supplier<Ignite> & Serializable> 
TensorFlowServerManager(T igniteSupplier) {
+        this(new PythonProcessManager(igniteSupplier));
+    }
+
+    /**
+     * Constructs a new instance of TensorFlow server manager.
+     *
+     * @param delegate Delegate.
+     */
+    public TensorFlowServerManager(ProcessManager<PythonProcess> delegate) {
+        super(delegate);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected PythonProcess transformSpecification(TensorFlowServer 
spec) {
+        return new PythonProcess(
+            formatPythonScript(spec),
+            getNode(spec)
+        );
+    }
+
+    /**
+     * Extracts the cluster node server should be running on.
+     *
+     * @param spec TensorFlow server specification.
+     * @return Node identifier.
+     */
+    private UUID getNode(TensorFlowServer spec) {
+        TensorFlowClusterSpec clusterSpec = spec.getClusterSpec();
+        Map<String, List<TensorFlowServerAddressSpec>> jobs = 
clusterSpec.getJobs();
+        List<TensorFlowServerAddressSpec> tasks = jobs.get(spec.getJobName());
+        TensorFlowServerAddressSpec addr = tasks.get(spec.getTaskIdx());
+
+        return addr.getNodeId();
+    }
+
+    /**
+     * Formats TensorFlow server specification so that it's available to be 
passed into а python script.
+     *
+     * @param spec TensorFlow server specification.
+     * @return Formatted TensorFlow server specification.
+     */
+    private String formatPythonScript(TensorFlowServer spec) {
+        StringBuilder builder = new StringBuilder();
+
+        builder.append("import tensorflow as tf").append('\n');
+        builder.append("cluster = tf.train.ClusterSpec(")
+            .append(formatClusterSpec(spec.getClusterSpec()))
+            .append(')')
+            .append('\n');
+        builder.append("server = tf.train.Server(cluster");
+
+        if (spec.getJobName() != null)
+            builder.append(", 
job_name=\"").append(spec.getJobName()).append('"');
+
+        if (spec.getTaskIdx() != null)
+            builder.append(", task_index=").append(spec.getTaskIdx());
+
+        if (spec.getProto() != null)
+            builder.append(", 
protocol=\"").append(spec.getProto()).append('"');
+
+        builder.append(')').append('\n');
+        builder.append("server.join()").append('\n');
+
+        return builder.toString();
+    }
+
+    /**
+     * Formats TensorFlow cluster specification so that it's available to be 
passed into а python script.
+     *
+     * @param spec TensorFlow cluster specification.
+     * @return Formatted TensorFlow cluster specification.
+     */
+    public String formatClusterSpec(TensorFlowClusterSpec spec) {
+        StringBuilder builder = new StringBuilder();
+
+        builder.append("{\n");
+
+        for (Map.Entry<String, List<TensorFlowServerAddressSpec>> entry : 
spec.getJobs().entrySet()) {
+            builder
+                .append("\t\"")
+                .append(entry.getKey())
+                .append("\" : [ ");
+
+            for (TensorFlowServerAddressSpec address : entry.getValue()) {
+                builder
+                    .append("\n\t\t\"")
+                    .append(formatAddressSpec(address))
+                    .append("\", ");
+            }
+
+            if (!entry.getValue().isEmpty())
+                builder.delete(builder.length() - 2, builder.length());
+
+            builder.append("\n\t],\n");
+        }
+
+        if (!spec.getJobs().isEmpty())
+            builder.delete(builder.length() - 2, builder.length() - 1);
+
+        builder.append('}');
+
+        return builder.toString();
+    }
+
+    /**
+     * Formats TensorFlow server address specification so that it's available 
to be passed into а python script.
+     *
+     * @param spec TensorFlow server address specification.
+     * @return Formatted TensorFlow server address specification.
+     */
+    private String formatAddressSpec(TensorFlowServerAddressSpec spec) {
+        UUID nodeId = spec.getNodeId();
+
+        Ignite ignite = Ignition.localIgnite();
+        Collection<String> names = 
ignite.cluster().forNodeId(nodeId).hostNames();
+
+        return names.iterator().next() + ":" + spec.getPort();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5ee876a6/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/tfrunning/package-info.java
----------------------------------------------------------------------
diff --git 
a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/tfrunning/package-info.java
 
b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/tfrunning/package-info.java
new file mode 100644
index 0000000..f4d4cad
--- /dev/null
+++ 
b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/tfrunning/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * <!-- Package description. -->
+ * Additional "running" layer on top of the Python running layer of the infrastructure. Allows starting a
+ * TensorFlow server from a server description
+ * ({@link org.apache.ignite.tensorflow.cluster.tfrunning.TensorFlowServer}).
+ */
+package org.apache.ignite.tensorflow.cluster.tfrunning;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/5ee876a6/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/util/ClusterPortManager.java
----------------------------------------------------------------------
diff --git 
a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/util/ClusterPortManager.java
 
b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/util/ClusterPortManager.java
new file mode 100644
index 0000000..78087ab
--- /dev/null
+++ 
b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/util/ClusterPortManager.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tensorflow.cluster.util;
+
+import java.io.Serializable;
+import java.util.BitSet;
+import java.util.UUID;
+import java.util.concurrent.locks.Lock;
+import java.util.function.Supplier;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+
+/**
+ * Cluster port manager that allows to reliably {@link #acquirePort(UUID)} and 
{@link #freePort(UUID, int)} on the
+ * cluster nodes.
+ */
+public class ClusterPortManager implements Serializable {
+    /** */
+    private static final long serialVersionUID = -5116593574559007292L;
+
+    /** Port manager cache name. */
+    private final String portMgrCacheName;
+
+    /** Port range from point. */
+    private final int from;
+
+    /** Port range size. */
+    private final int cnt;
+
+    /** Ignite instance supplier. */
+    private final Supplier<Ignite> igniteSupplier;
+
+    /** Port manager cache */
+    private transient IgniteCache<UUID, BitSet> cache;
+
+    /**
+     * Constructs a new instance of cluster port manager.
+     *
+     * @param poolName Port pool name.
+     * @param from Port range from point.
+     * @param cnt Port range size.
+     */
+    public <T extends Supplier<Ignite> & Serializable> 
ClusterPortManager(String poolName, int from, int cnt,
+        T igniteSupplier) {
+        assert poolName != null : "Pool name should not be null";
+        assert cnt >= 0 : "Count should not be negative";
+        assert from >= 0 && cnt + from <= 0xFFFF : "Port range should be 
between 0 and 65535";
+        assert igniteSupplier != null : "Ignite supplier should not be null";
+
+        this.portMgrCacheName = String.format("PORT_MANAGER_CACHE_%s", 
poolName);
+        this.from = from;
+        this.cnt = cnt;
+        this.igniteSupplier = igniteSupplier;
+    }
+
+    /** Initializes port manager and creates or gets correspondent caches. */
+    public void init() {
+        CacheConfiguration<UUID, BitSet> cacheConfiguration = new 
CacheConfiguration<>();
+        cacheConfiguration.setName(portMgrCacheName);
+        cacheConfiguration.setCacheMode(CacheMode.REPLICATED);
+        cacheConfiguration.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
+
+        Ignite ignite = igniteSupplier.get();
+        cache = ignite.getOrCreateCache(cacheConfiguration);
+    }
+
+    /**
+     * Acquires free port on the specified node.
+     *
+     * @param nodeId Node identifier.
+     * @return Port to be acquired.
+     */
+    public int acquirePort(UUID nodeId) {
+        checkThatInitialized();
+
+        Lock lock = cache.lock(nodeId);
+        lock.lock();
+
+        try {
+            BitSet ports = cache.get(nodeId);
+
+            if (ports == null)
+                ports = new BitSet(cnt);
+
+            int free = ports.nextClearBit(0);
+
+            if (free >= cnt)
+                throw new IllegalStateException("No free ports in range 
[from=" + from + ", cnt=" + cnt + "]");
+
+            ports.set(free);
+
+            cache.put(nodeId, ports);
+
+            return from + free;
+        }
+        finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Frees acquired port on the specified node.
+     *
+     * @param nodeId Node identifier.
+     * @param port Acquired port to be free.
+     */
+    public void freePort(UUID nodeId, int port) {
+        assert port - from >= 0 && port - from < cnt : "Port not in the range";
+
+        checkThatInitialized();
+
+        Lock lock = cache.lock(nodeId);
+        lock.lock();
+
+        try {
+            BitSet ports = cache.get(nodeId);
+
+            if (ports != null) {
+                ports.clear(port - from);
+
+                if (ports.isEmpty())
+                    cache.remove(nodeId);
+            }
+        }
+        finally {
+            lock.unlock();
+        }
+    }
+
+    /** Destroys port manager and related caches. */
+    public void destroy() {
+        Ignite ignite = igniteSupplier.get();
+        ignite.destroyCache(portMgrCacheName);
+    }
+
+    /**
+     * Checks that the component has been initialized.
+     */
+    private void checkThatInitialized() {
+        if (cache == null)
+            throw new IllegalStateException("Cluster Port Manager is not 
initialized");
+    }
+}

Reply via email to