IGNITE-8795: Add ability to start and maintain TensorFlow cluster on top of Apache Ignite
this closes #4214 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5ee876a6 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5ee876a6 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5ee876a6 Branch: refs/heads/ignite-8446 Commit: 5ee876a6635b3c48c86c6a51bb0dc9f98ee55d63 Parents: 909b644 Author: Anton Dmitriev <dmitrievanth...@gmail.com> Authored: Thu Jun 21 14:55:06 2018 +0300 Committer: Yury Babak <yba...@gridgain.com> Committed: Thu Jun 21 14:55:06 2018 +0300 ---------------------------------------------------------------------- examples/pom.xml | 21 ++ .../cluster/TensorFlowClusterExample.java | 118 +++++++++++ modules/tensorflow/licences/apache-2.0.txt | 202 ------------------ modules/tensorflow/licenses/apache-2.0.txt | 202 ++++++++++++++++++ modules/tensorflow/pom.xml | 30 ++- .../tensorflow/cluster/TensorFlowCluster.java | 62 ++++++ .../cluster/TensorFlowClusterGateway.java | 68 ++++++ .../TensorFlowClusterGatewayManager.java | 92 ++++++++ .../cluster/TensorFlowClusterMaintainer.java | 138 ++++++++++++ .../cluster/TensorFlowClusterManager.java | 211 +++++++++++++++++++ .../ignite/tensorflow/cluster/package-info.java | 33 +++ .../cluster/spec/TensorFlowClusterSpec.java | 59 ++++++ .../spec/TensorFlowServerAddressSpec.java | 59 ++++++ .../tensorflow/cluster/spec/package-info.java | 22 ++ .../cluster/tfrunning/TensorFlowServer.java | 100 +++++++++ .../tfrunning/TensorFlowServerManager.java | 167 +++++++++++++++ .../cluster/tfrunning/package-info.java | 23 ++ .../cluster/util/ClusterPortManager.java | 161 ++++++++++++++ .../cluster/util/TensorFlowClusterResolver.java | 101 +++++++++ .../tensorflow/cluster/util/package-info.java | 22 ++ .../ignite/tensorflow/core/ProcessManager.java | 63 ++++++ .../tensorflow/core/ProcessManagerWrapper.java | 83 ++++++++ .../core/longrunning/LongRunningProcess.java | 61 ++++++ .../longrunning/LongRunningProcessManager.java | 168 
+++++++++++++++ .../core/longrunning/package-info.java | 25 +++ .../task/LongRunningProcessClearTask.java | 98 +++++++++ .../task/LongRunningProcessPingTask.java | 85 ++++++++ .../task/LongRunningProcessStartTask.java | 114 ++++++++++ .../task/LongRunningProcessStopTask.java | 102 +++++++++ .../task/LongRunningProcessTask.java | 53 +++++ .../core/longrunning/task/package-info.java | 23 ++ .../task/util/LongRunningProcessState.java | 27 +++ .../task/util/LongRunningProcessStatus.java | 66 ++++++ .../longrunning/task/util/package-info.java | 22 ++ .../core/nativerunning/NativeProcess.java | 71 +++++++ .../nativerunning/NativeProcessManager.java | 59 ++++++ .../core/nativerunning/package-info.java | 25 +++ .../task/NativeProcessStartTask.java | 116 ++++++++++ .../core/nativerunning/task/package-info.java | 22 ++ .../ignite/tensorflow/core/package-info.java | 26 +++ .../core/pythonrunning/PythonProcess.java | 58 +++++ .../pythonrunning/PythonProcessManager.java | 83 ++++++++ .../core/pythonrunning/package-info.java | 25 +++ .../core/util/CustomizableThreadFactory.java | 54 +++++ .../tensorflow/core/util/package-info.java | 22 ++ .../apache/ignite/tensorflow/package-info.java | 37 ++++ .../ignite/tensorflow/TensorFlowTestSuite.java | 33 +++ .../ignite/tensorflow/core/CoreTestSuite.java | 42 ++++ .../core/ProcessManagerWrapperTest.java | 108 ++++++++++ .../LongRunningProcessManagerTest.java | 169 +++++++++++++++ .../task/LongRunningProcessClearTaskTest.java | 144 +++++++++++++ .../task/LongRunningProcessPingTaskTest.java | 152 +++++++++++++ .../task/LongRunningProcessStartTaskTest.java | 101 +++++++++ .../task/LongRunningProcessStopTaskTest.java | 157 ++++++++++++++ 54 files changed, 4182 insertions(+), 203 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/5ee876a6/examples/pom.xml ---------------------------------------------------------------------- diff --git a/examples/pom.xml 
b/examples/pom.xml index f29ad87..53aeba5 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -124,6 +124,8 @@ <lgpl.test.folder>src/test/java</lgpl.test.folder> <spark.folder>src/main/java</spark.folder> <spark.test.folder>src/test/java</spark.test.folder> + <tensorflow.folder>src/main/java</tensorflow.folder> + <tensorflow.test.folder>src/test/java</tensorflow.test.folder> </properties> <profiles> @@ -245,6 +247,23 @@ </dependency> </dependencies> </profile> + + <profile> + <id>tensorflow</id> + + <properties> + <tensorflow.folder>src/main/tensorflow</tensorflow.folder> + <tensorflow.test.folder>src/test/tensorflow</tensorflow.test.folder> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.ignite</groupId> + <artifactId>ignite-tensorflow</artifactId> + <version>${project.version}</version> + </dependency> + </dependencies> + </profile> </profiles> <build> @@ -275,6 +294,7 @@ <sources> <source>${lgpl.folder}</source> <source>${spark.folder}</source> + <source>${tensorflow.folder}</source> </sources> </configuration> </execution> @@ -289,6 +309,7 @@ <sources> <source>${lgpl.test.folder}</source> <source>${spark.test.folder}</source> + <source>${tensorflow.test.folder}</source> </sources> </configuration> </execution> http://git-wip-us.apache.org/repos/asf/ignite/blob/5ee876a6/examples/src/main/tensorflow/org/apache/ignite/tensorflow/cluster/TensorFlowClusterExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/tensorflow/org/apache/ignite/tensorflow/cluster/TensorFlowClusterExample.java b/examples/src/main/tensorflow/org/apache/ignite/tensorflow/cluster/TensorFlowClusterExample.java new file mode 100644 index 0000000..3a956c9 --- /dev/null +++ b/examples/src/main/tensorflow/org/apache/ignite/tensorflow/cluster/TensorFlowClusterExample.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.tensorflow.cluster; + +import java.io.Serializable; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.function.Supplier; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.Ignition; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.tensorflow.cluster.tfrunning.TensorFlowServerManager; +import org.apache.ignite.tensorflow.core.longrunning.task.util.LongRunningProcessStatus; + +/** + * Prerequisites: be aware that to successfully run this example you need to have Python and TensorFlow installed on + * your machine. To find out how to install Python please take a look at this page: https://www.python.org/downloads/. To + * install TensorFlow see this web site: https://www.tensorflow.org/install/. + * + * Example that shows how to use {@link TensorFlowClusterGatewayManager} to start, maintain and stop a TensorFlow + * cluster. + */ +public class TensorFlowClusterExample { + /** Runs the example: starts an Ignite node with client mode disabled, fills the "TEST_CACHE" cache with 1000 entries, obtains a TensorFlow cluster gateway for that cache via {@code getOrCreateCluster}, waits until a subscriber has printed the cluster specification and process statuses, then calls {@code stopClusterIfExists}. @throws InterruptedException If the thread is interrupted while waiting on the latch. */ + public static void main(String...
args) throws InterruptedException { + IgniteConfiguration configuration = new IgniteConfiguration(); + configuration.setClientMode(false); + + try (Ignite ignite = Ignition.start(configuration)) { + System.out.println(">>> TensorFlow cluster example started."); + + CacheConfiguration<Integer, Integer> cacheConfiguration = new CacheConfiguration<>(); + cacheConfiguration.setAffinity(new RendezvousAffinityFunction(false, 10)); + cacheConfiguration.setName("TEST_CACHE"); + + IgniteCache<Integer, Integer> cache = ignite.getOrCreateCache(cacheConfiguration); + for (int i = 0; i < 1000; i++) + cache.put(i, i); + + System.out.println(">>> Cache created."); + + TensorFlowClusterGatewayManager mgr = new TensorFlowClusterGatewayManager(ignite); + TensorFlowClusterGateway gateway = mgr.getOrCreateCluster("TEST_CACHE"); + + System.out.println(">>> TensorFlow cluster gateway started."); + + CountDownLatch latch = new CountDownLatch(1); /* Counted down by the subscriber below after it prints the cluster info. */ + + gateway.subscribe(cluster -> { + StringBuilder builder = new StringBuilder(); + builder.append("------------------- TensorFlow Cluster Service Info -------------------").append('\n'); + + builder.append("Specification : ").append('\n'); + + TensorFlowServerManager srvMgr = new TensorFlowServerManager( + (Supplier<Ignite> & Serializable)() -> ignite /* Intersection cast makes the lambda implement Serializable. */ + ); + + String clusterSpec = srvMgr.formatClusterSpec(cluster.getSpec()); + builder.append(clusterSpec).append('\n'); + + Map<UUID, List<LongRunningProcessStatus>> statuses = srvMgr.ping(cluster.getProcesses()); + + builder.append("State : ").append('\n'); + + for (UUID nodeId : cluster.getProcesses().keySet()) { + List<UUID> pr = cluster.getProcesses().get(nodeId); + List<LongRunningProcessStatus> st = statuses.get(nodeId); + + builder.append("Node ").append(nodeId.toString().substring(0, 8)).append(" -> ").append('\n'); + for (int i = 0; i < pr.size(); i++) { + builder.append("\tProcess ") + .append(pr.get(i).toString().substring(0, 8)) + .append(" with status ") + .append(st.get(i).getState());
+ + if (st.get(i).getException() != null) + builder.append(" (").append(st.get(i).getException()).append(")"); + + builder.append('\n'); + } + } + + builder.append("-----------------------------------------------------------------------").append('\n'); + + System.out.println(builder); + + latch.countDown(); + }); + + latch.await(); /* Blocks until the subscriber callback has counted the latch down. */ + + mgr.stopClusterIfExists("TEST_CACHE"); + + System.out.println(">>> TensorFlow cluster example completed."); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5ee876a6/modules/tensorflow/licences/apache-2.0.txt ---------------------------------------------------------------------- diff --git a/modules/tensorflow/licences/apache-2.0.txt b/modules/tensorflow/licences/apache-2.0.txt deleted file mode 100644 index d645695..0000000 --- a/modules/tensorflow/licences/apache-2.0.txt +++ /dev/null @@ -1,202 +0,0 @@ - - Apache License - Version 2.0, January 2004 - http://www.apache.org/licenses/ - - TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION - - 1. Definitions. - - "License" shall mean the terms and conditions for use, reproduction, - and distribution as defined by Sections 1 through 9 of this document. - - "Licensor" shall mean the copyright owner or entity authorized by - the copyright owner that is granting the License. - - "Legal Entity" shall mean the union of the acting entity and all - other entities that control, are controlled by, or are under common - control with that entity. For the purposes of this definition, - "control" means (i) the power, direct or indirect, to cause the - direction or management of such entity, whether by contract or - otherwise, or (ii) ownership of fifty percent (50%) or more of the - outstanding shares, or (iii) beneficial ownership of such entity. - - "You" (or "Your") shall mean an individual or Legal Entity - exercising permissions granted by this License.
- - "Source" form shall mean the preferred form for making modifications, - including but not limited to software source code, documentation - source, and configuration files. - - "Object" form shall mean any form resulting from mechanical - transformation or translation of a Source form, including but - not limited to compiled object code, generated documentation, - and conversions to other media types. - - "Work" shall mean the work of authorship, whether in Source or - Object form, made available under the License, as indicated by a - copyright notice that is included in or attached to the work - (an example is provided in the Appendix below). - - "Derivative Works" shall mean any work, whether in Source or Object - form, that is based on (or derived from) the Work and for which the - editorial revisions, annotations, elaborations, or other modifications - represent, as a whole, an original work of authorship. For the purposes - of this License, Derivative Works shall not include works that remain - separable from, or merely link (or bind by name) to the interfaces of, - the Work and Derivative Works thereof. - - "Contribution" shall mean any work of authorship, including - the original version of the Work and any modifications or additions - to that Work or Derivative Works thereof, that is intentionally - submitted to Licensor for inclusion in the Work by the copyright owner - or by an individual or Legal Entity authorized to submit on behalf of - the copyright owner. 
For the purposes of this definition, "submitted" - means any form of electronic, verbal, or written communication sent - to the Licensor or its representatives, including but not limited to - communication on electronic mailing lists, source code control systems, - and issue tracking systems that are managed by, or on behalf of, the - Licensor for the purpose of discussing and improving the Work, but - excluding communication that is conspicuously marked or otherwise - designated in writing by the copyright owner as "Not a Contribution." - - "Contributor" shall mean Licensor and any individual or Legal Entity - on behalf of whom a Contribution has been received by Licensor and - subsequently incorporated within the Work. - - 2. Grant of Copyright License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - copyright license to reproduce, prepare Derivative Works of, - publicly display, publicly perform, sublicense, and distribute the - Work and such Derivative Works in Source or Object form. - - 3. Grant of Patent License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - (except as stated in this section) patent license to make, have made, - use, offer to sell, sell, import, and otherwise transfer the Work, - where such license applies only to those patent claims licensable - by such Contributor that are necessarily infringed by their - Contribution(s) alone or by combination of their Contribution(s) - with the Work to which such Contribution(s) was submitted. 
If You - institute patent litigation against any entity (including a - cross-claim or counterclaim in a lawsuit) alleging that the Work - or a Contribution incorporated within the Work constitutes direct - or contributory patent infringement, then any patent licenses - granted to You under this License for that Work shall terminate - as of the date such litigation is filed. - - 4. Redistribution. You may reproduce and distribute copies of the - Work or Derivative Works thereof in any medium, with or without - modifications, and in Source or Object form, provided that You - meet the following conditions: - - (a) You must give any other recipients of the Work or - Derivative Works a copy of this License; and - - (b) You must cause any modified files to carry prominent notices - stating that You changed the files; and - - (c) You must retain, in the Source form of any Derivative Works - that You distribute, all copyright, patent, trademark, and - attribution notices from the Source form of the Work, - excluding those notices that do not pertain to any part of - the Derivative Works; and - - (d) If the Work includes a "NOTICE" text file as part of its - distribution, then any Derivative Works that You distribute must - include a readable copy of the attribution notices contained - within such NOTICE file, excluding those notices that do not - pertain to any part of the Derivative Works, in at least one - of the following places: within a NOTICE text file distributed - as part of the Derivative Works; within the Source form or - documentation, if provided along with the Derivative Works; or, - within a display generated by the Derivative Works, if and - wherever such third-party notices normally appear. The contents - of the NOTICE file are for informational purposes only and - do not modify the License. 
You may add Your own attribution - notices within Derivative Works that You distribute, alongside - or as an addendum to the NOTICE text from the Work, provided - that such additional attribution notices cannot be construed - as modifying the License. - - You may add Your own copyright statement to Your modifications and - may provide additional or different license terms and conditions - for use, reproduction, or distribution of Your modifications, or - for any such Derivative Works as a whole, provided Your use, - reproduction, and distribution of the Work otherwise complies with - the conditions stated in this License. - - 5. Submission of Contributions. Unless You explicitly state otherwise, - any Contribution intentionally submitted for inclusion in the Work - by You to the Licensor shall be under the terms and conditions of - this License, without any additional terms or conditions. - Notwithstanding the above, nothing herein shall supersede or modify - the terms of any separate license agreement you may have executed - with Licensor regarding such Contributions. - - 6. Trademarks. This License does not grant permission to use the trade - names, trademarks, service marks, or product names of the Licensor, - except as required for reasonable and customary use in describing the - origin of the Work and reproducing the content of the NOTICE file. - - 7. Disclaimer of Warranty. Unless required by applicable law or - agreed to in writing, Licensor provides the Work (and each - Contributor provides its Contributions) on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - implied, including, without limitation, any warranties or conditions - of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A - PARTICULAR PURPOSE. You are solely responsible for determining the - appropriateness of using or redistributing the Work and assume any - risks associated with Your exercise of permissions under this License. - - 8. 
Limitation of Liability. In no event and under no legal theory, - whether in tort (including negligence), contract, or otherwise, - unless required by applicable law (such as deliberate and grossly - negligent acts) or agreed to in writing, shall any Contributor be - liable to You for damages, including any direct, indirect, special, - incidental, or consequential damages of any character arising as a - result of this License or out of the use or inability to use the - Work (including but not limited to damages for loss of goodwill, - work stoppage, computer failure or malfunction, or any and all - other commercial damages or losses), even if such Contributor - has been advised of the possibility of such damages. - - 9. Accepting Warranty or Additional Liability. While redistributing - the Work or Derivative Works thereof, You may choose to offer, - and charge a fee for, acceptance of support, warranty, indemnity, - or other liability obligations and/or rights consistent with this - License. However, in accepting such obligations, You may act only - on Your own behalf and on Your sole responsibility, not on behalf - of any other Contributor, and only if You agree to indemnify, - defend, and hold each Contributor harmless for any liability - incurred by, or claims asserted against, such Contributor by reason - of your accepting any such warranty or additional liability. - - END OF TERMS AND CONDITIONS - - APPENDIX: How to apply the Apache License to your work. - - To apply the Apache License to your work, attach the following - boilerplate notice, with the fields enclosed by brackets "[]" - replaced with your own identifying information. (Don't include - the brackets!) The text should be enclosed in the appropriate - comment syntax for the file format. We also recommend that a - file or class name and description of purpose be included on the - same "printed page" as the copyright notice for easier - identification within third-party archives. 
- - Copyright [yyyy] [name of copyright owner] - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. http://git-wip-us.apache.org/repos/asf/ignite/blob/5ee876a6/modules/tensorflow/licenses/apache-2.0.txt ---------------------------------------------------------------------- diff --git a/modules/tensorflow/licenses/apache-2.0.txt b/modules/tensorflow/licenses/apache-2.0.txt new file mode 100644 index 0000000..d645695 --- /dev/null +++ b/modules/tensorflow/licenses/apache-2.0.txt @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. 
+ + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. 
For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. 
If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. 
You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. 
Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. 
+ + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. http://git-wip-us.apache.org/repos/asf/ignite/blob/5ee876a6/modules/tensorflow/pom.xml ---------------------------------------------------------------------- diff --git a/modules/tensorflow/pom.xml b/modules/tensorflow/pom.xml index c865d7e..78ab69a 100644 --- a/modules/tensorflow/pom.xml +++ b/modules/tensorflow/pom.xml @@ -37,9 +37,37 @@ <dependencies> <dependency> <groupId>org.apache.ignite</groupId> - <artifactId>ignite-ml</artifactId> + <artifactId>ignite-core</artifactId> <version>${project.version}</version> </dependency> + + <dependency> + <groupId>org.apache.ignite</groupId> + <artifactId>ignite-core</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.ignite</groupId> + <artifactId>ignite-spring</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + <version>${mockito.version}</version> + <scope>test</scope> + </dependency> </dependencies> <profiles> http://git-wip-us.apache.org/repos/asf/ignite/blob/5ee876a6/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/TensorFlowCluster.java 
---------------------------------------------------------------------- diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/TensorFlowCluster.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/TensorFlowCluster.java new file mode 100644 index 0000000..cf60e42 --- /dev/null +++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/TensorFlowCluster.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.tensorflow.cluster; + +import org.apache.ignite.tensorflow.cluster.spec.TensorFlowClusterSpec; +import java.io.Serializable; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +/** + * TensorFlow Cluster metadata. + */ +public class TensorFlowCluster implements Serializable { + /** */ + private static final long serialVersionUID = -6636339457255751011L; + + /** TensorFlow cluster specification. */ + private final TensorFlowClusterSpec spec; + + /** Process identifiers. */ + private final Map<UUID, List<UUID>> processes; + + /** + * Constructs a new instance of TensorFlow cluster. + * + * @param spec TensorFlow cluster specification. + * @param processes Process identifiers. 
+ */ + public TensorFlowCluster(TensorFlowClusterSpec spec, Map<UUID, List<UUID>> processes) { + assert spec != null : "TensorFlow cluster specification should not be null"; + assert processes != null : "Processes should not be null"; + + this.spec = spec; + this.processes = processes; + } + + /** */ + public TensorFlowClusterSpec getSpec() { + return spec; + } + + /** */ + public Map<UUID, List<UUID>> getProcesses() { + return processes; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5ee876a6/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/TensorFlowClusterGateway.java ---------------------------------------------------------------------- diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/TensorFlowClusterGateway.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/TensorFlowClusterGateway.java new file mode 100644 index 0000000..5eee155 --- /dev/null +++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/TensorFlowClusterGateway.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.tensorflow.cluster; + +import java.util.HashSet; +import java.util.UUID; +import java.util.function.Consumer; +import org.apache.ignite.lang.IgniteBiPredicate; + +/** + * TensorFlow cluster gateway that allows to subscribe on changes in cluster configuration. + */ +public class TensorFlowClusterGateway implements IgniteBiPredicate<UUID, TensorFlowCluster> { + /** */ + private static final long serialVersionUID = -540323262800791340L; + + /** Subscribers. */ + private final HashSet<Consumer<TensorFlowCluster>> subscribers = new HashSet<>(); + + /** Last value received from the upstream. */ + private TensorFlowCluster last; + + /** + * Subscribes the specified subscriber to the upstream events. + * + * @param subscriber Subscriber. + */ + public synchronized void subscribe(Consumer<TensorFlowCluster> subscriber) { + subscribers.add(subscriber); + + if (last != null) + subscriber.accept(last); + } + + /** + * Unsubscribes the specified subscriber. + * + * @param subscriber Subscriber. 
+ */ + public synchronized void unsubscribe(Consumer<TensorFlowCluster> subscriber) { + subscribers.remove(subscriber); + } + + /** {@inheritDoc} */ + @Override public synchronized boolean apply(UUID uuid, TensorFlowCluster cluster) { + for (Consumer<TensorFlowCluster> subscriber : subscribers) + subscriber.accept(cluster); + + last = cluster; + + return true; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5ee876a6/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/TensorFlowClusterGatewayManager.java ---------------------------------------------------------------------- diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/TensorFlowClusterGatewayManager.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/TensorFlowClusterGatewayManager.java new file mode 100644 index 0000000..f4b8187 --- /dev/null +++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/TensorFlowClusterGatewayManager.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.tensorflow.cluster; + +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteServices; + +/** + * TensorFlow cluster manager that allows to start, maintain and stop TensorFlow cluster using + * {@link TensorFlowClusterManager} and Ignite Service Grid. + */ +public class TensorFlowClusterGatewayManager { + /** Service name template. */ + private static final String SERVICE_NAME_TEMPLATE = "TF_SERVICE_%s"; + + /** Service topic name template. */ + private static final String SERVICE_TOPIC_NAME_TEMPLATE = "TF_SERVICE_TOPIC_%s"; + + /** Ignite instance. */ + private final Ignite ignite; + + /** + * Constructs a new instance of TensorFlow cluster manager with maintenance. + * + * @param ignite Ignite instance. + */ + public TensorFlowClusterGatewayManager(Ignite ignite) { + assert ignite != null : "Ignite should not be null"; + + this.ignite = ignite; + } + + /** + * Creates and starts a new TensorFlow cluster for the specified cache if it doesn't exist, otherwise returns + * existing one. + * + * @param upstreamCacheName Upstream cache name. + * @return TensorFlow cluster gateway that allows to subscribe on cluster changes. + */ + public TensorFlowClusterGateway getOrCreateCluster(String upstreamCacheName) { + String svcName = String.format(SERVICE_NAME_TEMPLATE, upstreamCacheName); + String topicName = String.format(SERVICE_TOPIC_NAME_TEMPLATE, upstreamCacheName); + + TensorFlowClusterGateway gateway = createTensorFlowClusterGateway(topicName); + + IgniteServices services = ignite.services(); + + services.deployClusterSingleton(svcName, new TensorFlowClusterMaintainer(upstreamCacheName, topicName)); + + return gateway; + } + + /** + * Stops TensorFlow cluster. + * + * @param upstreamCacheName Upstream cache name. 
+ */ + public void stopClusterIfExists(String upstreamCacheName) { + IgniteServices services = ignite.services(); + + services.cancel(String.format(SERVICE_NAME_TEMPLATE, upstreamCacheName)); + } + + /** + * Creates TensorFlow cluster gateway. + * + * @param topicName Topic name. + * @return TensorFlow cluster gateway. + */ + private TensorFlowClusterGateway createTensorFlowClusterGateway(String topicName) { + TensorFlowClusterGateway gateway = new TensorFlowClusterGateway(); + + ignite.message().localListen(topicName, gateway); + + return gateway; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5ee876a6/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/TensorFlowClusterMaintainer.java ---------------------------------------------------------------------- diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/TensorFlowClusterMaintainer.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/TensorFlowClusterMaintainer.java new file mode 100644 index 0000000..21b9c2b --- /dev/null +++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/TensorFlowClusterMaintainer.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.tensorflow.cluster; + +import java.io.Serializable; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.function.Supplier; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteMessaging; +import org.apache.ignite.Ignition; +import org.apache.ignite.cache.affinity.Affinity; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.services.Service; +import org.apache.ignite.services.ServiceContext; +import org.apache.ignite.tensorflow.core.longrunning.task.util.LongRunningProcessState; +import org.apache.ignite.tensorflow.core.longrunning.task.util.LongRunningProcessStatus; + +/** + * TensorFlow cluster service that maintains TensorFlow cluster. + */ +public class TensorFlowClusterMaintainer implements Service { + /** */ + private static final long serialVersionUID = -3220563310643566419L; + + /** Upstream cache name. */ + private final String cacheName; + + /** Topic name. */ + private final String topicName; + + /** TensorFlow cluster manager. */ + private final TensorFlowClusterManager clusterMgr; + + /** Previous partition mapping. */ + private UUID[] prev; + + /** + * Constructs a new instance of TensorFlow cluster service. + * + * @param cacheName Upstream cache name. + * @param topicName Topic name. 
+ */ + public TensorFlowClusterMaintainer(String cacheName, String topicName) { + assert cacheName != null : "Cache name should not be null"; + assert topicName != null : "Topic name should not be null"; + + this.clusterMgr = new TensorFlowClusterManager((Supplier<Ignite> & Serializable)Ignition::ignite); + this.cacheName = cacheName; + this.topicName = topicName; + } + + /** {@inheritDoc} */ + @Override public void cancel(ServiceContext ctx) { + clusterMgr.stopClusterIfExists(cacheName); + } + + /** {@inheritDoc} */ + @Override public void init(ServiceContext ctx) { + clusterMgr.init(); + } + + /** {@inheritDoc} */ + @Override public void execute(ServiceContext ctx) throws Exception { + while (!ctx.isCancelled()) { + Thread.sleep(1000); + + boolean restartRequired = hasAffinityChanged(); + + if (!restartRequired) { + TensorFlowCluster cluster = clusterMgr.getCache().get(cacheName); + Map<UUID, List<LongRunningProcessStatus>> statuses = clusterMgr.getSrvProcMgr() + .ping(cluster.getProcesses()); + + for (UUID nodeId : statuses.keySet()) { + for (LongRunningProcessStatus status : statuses.get(nodeId)) { + if (status.getState().equals(LongRunningProcessState.DONE)) { + restartRequired = true; + break; + } + } + } + } + + if (restartRequired) { + clusterMgr.stopClusterIfExists(cacheName); + + TensorFlowCluster cluster = clusterMgr.getOrCreateCluster(cacheName); + + IgniteMessaging messaging = Ignition.ignite().message(); + messaging.send(topicName, cluster); + } + } + } + + /** + * Checks if affinity mapping has been changed. + * + * @return True if mapping has been changed, otherwise false. 
+ */ + private boolean hasAffinityChanged() { + Affinity<?> affinity = Ignition.ignite().affinity(cacheName); + + int parts = affinity.partitions(); + + UUID[] ids = new UUID[parts]; + + for (int part = 0; part < parts; part++) { + ClusterNode node = affinity.mapPartitionToNode(part); + UUID nodeId = node.id(); + ids[part] = nodeId; + } + + if (prev == null || !Arrays.equals(ids, prev)) { + prev = ids; + return true; + } + + return false; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5ee876a6/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/TensorFlowClusterManager.java ---------------------------------------------------------------------- diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/TensorFlowClusterManager.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/TensorFlowClusterManager.java new file mode 100644 index 0000000..2d63195 --- /dev/null +++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/TensorFlowClusterManager.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.tensorflow.cluster; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.locks.Lock; +import java.util.function.Supplier; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.tensorflow.cluster.spec.TensorFlowClusterSpec; +import org.apache.ignite.tensorflow.cluster.spec.TensorFlowServerAddressSpec; +import org.apache.ignite.tensorflow.cluster.tfrunning.TensorFlowServer; +import org.apache.ignite.tensorflow.cluster.tfrunning.TensorFlowServerManager; +import org.apache.ignite.tensorflow.cluster.util.TensorFlowClusterResolver; + +/** + * TensorFlow cluster manager that allows to start, maintain and stop TensorFlow cluster. + */ +public class TensorFlowClusterManager implements Serializable { + /** */ + private static final long serialVersionUID = -4847155592164802806L; + + /** TensorFlow cluster metadata cache name. */ + private static final String TF_CLUSTER_METADATA_CACHE_NAME = "TF_CLUSTER_METADATA_CACHE"; + + /** Ignite instance supplier. */ + private final Supplier<Ignite> igniteSupplier; + + /** TensorFlow server manager. */ + private final TensorFlowServerManager srvProcMgr; + + /** TensorFlow cluster resolver. */ + private final TensorFlowClusterResolver clusterRslvr; + + /** TensorFlow cluster metadata cache. */ + private transient IgniteCache<String, TensorFlowCluster> cache; + + /** + * Constructs a new instance of TensorFlow cluster manager. + * + * @param igniteSupplier Ignite instance supplier. + * @param <T> Type of serializable supplier. 
+ */ + public <T extends Supplier<Ignite> & Serializable> TensorFlowClusterManager(T igniteSupplier) { + this( + igniteSupplier, + new TensorFlowServerManager(igniteSupplier), + new TensorFlowClusterResolver(igniteSupplier) + ); + } + + /** + * Constructs a new instance of TensorFlow cluster manager. + * + * @param igniteSupplier Ignite instance supplier. + * @param srvProcMgr TensorFlow server manager. + * @param clusterRslvr TensorFlow cluster resolver. + */ + public <T extends Supplier<Ignite> & Serializable> TensorFlowClusterManager(T igniteSupplier, + TensorFlowServerManager srvProcMgr, TensorFlowClusterResolver clusterRslvr) { + assert igniteSupplier != null : "Ignite supplier should not be null"; + assert srvProcMgr != null : "TensorFlow server manager should not be null"; + assert clusterRslvr != null : "TensorFlow cluster resolver should not be null"; + + this.igniteSupplier = igniteSupplier; + this.srvProcMgr = srvProcMgr; + this.clusterRslvr = clusterRslvr; + } + + /** Initializes TensorFlow cluster manager and gets or creates correspondent caches. */ + public void init() { + clusterRslvr.init(); + + CacheConfiguration<String, TensorFlowCluster> cacheConfiguration = new CacheConfiguration<>(); + cacheConfiguration.setName(TF_CLUSTER_METADATA_CACHE_NAME); + cacheConfiguration.setCacheMode(CacheMode.REPLICATED); + cacheConfiguration.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL); + + Ignite ignite = igniteSupplier.get(); + cache = ignite.getOrCreateCache(cacheConfiguration); + } + + /** + * Creates and starts a new TensorFlow cluster for the specified cache if it doesn't exist, otherwise returns + * existing one. + * + * @param upstreamCacheName Upstream cache name. + * @return TensorFlow cluster metadata. 
+ */ + public TensorFlowCluster getOrCreateCluster(String upstreamCacheName) { + checkThatInitialized(); + + Lock clusterMgrCacheLock = cache.lock(upstreamCacheName); + clusterMgrCacheLock.lock(); + + try { + TensorFlowCluster cluster = cache.get(upstreamCacheName); + + if (cluster == null) { + TensorFlowClusterSpec clusterSpec = clusterRslvr.resolveAndAcquirePorts(upstreamCacheName); + cluster = startCluster(clusterSpec); + cache.put(upstreamCacheName, cluster); + } + + return cluster; + } + finally { + clusterMgrCacheLock.unlock(); + } + } + + /** + * Stops TensorFlow cluster. + * + * @param upstreamCacheName Upstream cache name. + */ + public void stopClusterIfExists(String upstreamCacheName) { + checkThatInitialized(); + + Lock clusterMgrCacheLock = cache.lock(upstreamCacheName); + clusterMgrCacheLock.lock(); + + try { + TensorFlowCluster cluster = cache.get(upstreamCacheName); + + if (cluster != null) { + srvProcMgr.stop(cluster.getProcesses(), true); + clusterRslvr.freePorts(cluster.getSpec()); + cache.remove(upstreamCacheName); + } + } + finally { + clusterMgrCacheLock.unlock(); + } + } + + /** Destroys TensorFlow cluster manager and related caches. */ + public void destroy() { + clusterRslvr.destroy(); + + Ignite ignite = igniteSupplier.get(); + ignite.destroyCache(TF_CLUSTER_METADATA_CACHE_NAME); + } + + /** + * Starts TensorFlow cluster using the specified specification and returns metadata of the started cluster. + * + * @param spec TensorFlow cluster specification. + * @return TensorFlow cluster metadata. 
+ */ + private TensorFlowCluster startCluster(TensorFlowClusterSpec spec) { + checkThatInitialized(); + + List<TensorFlowServer> srvs = new ArrayList<>(); + + Map<String, List<TensorFlowServerAddressSpec>> jobs = spec.getJobs(); + + for (String jobName : jobs.keySet()) { + List<TensorFlowServerAddressSpec> tasks = jobs.get(jobName); + + for (int i = 0; i < tasks.size(); i++) { + TensorFlowServer srvSpec = new TensorFlowServer(spec, jobName, i); + srvs.add(srvSpec); + } + } + + Map<UUID, List<UUID>> processes = srvProcMgr.start(srvs); + + return new TensorFlowCluster(spec, processes); + } + + /** + * Checks that the component has been initialized. + */ + private void checkThatInitialized() { + if (cache == null) + throw new IllegalStateException("TensorFlow Cluster Manager is not initialized"); + } + + /** */ + public TensorFlowServerManager getSrvProcMgr() { + return srvProcMgr; + } + + /** */ + public IgniteCache<String, TensorFlowCluster> getCache() { + return cache; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5ee876a6/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/package-info.java ---------------------------------------------------------------------- diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/package-info.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/package-info.java new file mode 100644 index 0000000..ff00e57 --- /dev/null +++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/package-info.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * <!-- Package description. --> + * TensorFlow integration API that allows to start and maintain TensorFlow cluster using infrastructure tools from + * package {@link org.apache.ignite.tensorflow.core}. The most important components are: + * <ul> + * <li>{@link org.apache.ignite.tensorflow.cluster.TensorFlowClusterManager} that allows to start and stop + * TensorFlow cluster on top of Apache Ignite, but doesn't monitor it and doesn't maintain so that in case of + * failure the cluster won't be restarted.</li> + * <li>{@link org.apache.ignite.tensorflow.cluster.TensorFlowClusterGatewayManager} that allows to start, maintain + * and stop TensorFlow cluster on top of Apache Ignite so that in case of failure the cluster will be restarted and + * recovered.</li> + * <li>{@link org.apache.ignite.tensorflow.cluster.TensorFlowClusterGateway} that allows to subscribe on cluster + * configuration changes that might be done as result of rebalancing or node failures.</li> + * </ul> + */ +package org.apache.ignite.tensorflow.cluster; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/5ee876a6/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/spec/TensorFlowClusterSpec.java ---------------------------------------------------------------------- diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/spec/TensorFlowClusterSpec.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/spec/TensorFlowClusterSpec.java new file mode 100644 index 0000000..a053b8e --- 
/dev/null +++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/spec/TensorFlowClusterSpec.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.tensorflow.cluster.spec; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +/** + * TensorFlow cluster specification. + */ +public class TensorFlowClusterSpec implements Serializable { + /** */ + private static final long serialVersionUID = 1428581667528448091L; + + /** TensorFlow cluster jobs. */ + private final Map<String, List<TensorFlowServerAddressSpec>> jobs = new HashMap<>(); + + /** + * Adds new task to the cluster specification. + * + * @param jobName Job name. + * @param nodeId Node identifier. + * @param port Port number. + * @return This instance of TensorFlow cluster specification. 
+ */ + public TensorFlowClusterSpec addTask(String jobName, UUID nodeId, int port) { + jobs.putIfAbsent(jobName, new ArrayList<>()); + + List<TensorFlowServerAddressSpec> tasks = jobs.get(jobName); + + tasks.add(new TensorFlowServerAddressSpec(nodeId, port)); + + return this; + } + + /** */ + public Map<String, List<TensorFlowServerAddressSpec>> getJobs() { + return jobs; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5ee876a6/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/spec/TensorFlowServerAddressSpec.java ---------------------------------------------------------------------- diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/spec/TensorFlowServerAddressSpec.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/spec/TensorFlowServerAddressSpec.java new file mode 100644 index 0000000..196b166 --- /dev/null +++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/spec/TensorFlowServerAddressSpec.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.tensorflow.cluster.spec; + +import java.io.Serializable; +import java.util.UUID; + +/** + * TensorFlow server address specification. + */ +public class TensorFlowServerAddressSpec implements Serializable { + /** */ + private static final long serialVersionUID = 7883701602323727681L; + + /** Node identifier. */ + private final UUID nodeId; + + /** Port. */ + private final int port; + + /** + * Constructs a new instance of TensorFlow server address specification. + * + * @param nodeId Node identifier. + * @param port Port. + */ + TensorFlowServerAddressSpec(UUID nodeId, int port) { + assert nodeId != null : "Node identifier should not be null"; + assert port >= 0 && port <= 0xFFFF : "Port should be between 0 and 65535"; + + this.nodeId = nodeId; + this.port = port; + } + + /** */ + public UUID getNodeId() { + return nodeId; + } + + /** */ + public int getPort() { + return port; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5ee876a6/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/spec/package-info.java ---------------------------------------------------------------------- diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/spec/package-info.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/spec/package-info.java new file mode 100644 index 0000000..0425d6c --- /dev/null +++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/spec/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * <!-- Package description. --> + * Contains specifications that describe TensorFlow cluster configuration. + */ +package org.apache.ignite.tensorflow.cluster.spec; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/5ee876a6/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/tfrunning/TensorFlowServer.java ---------------------------------------------------------------------- diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/tfrunning/TensorFlowServer.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/tfrunning/TensorFlowServer.java new file mode 100644 index 0000000..ca67405 --- /dev/null +++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/tfrunning/TensorFlowServer.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.tensorflow.cluster.tfrunning; + +import org.apache.ignite.tensorflow.cluster.spec.TensorFlowClusterSpec; +import java.io.Serializable; + +/** + * TensorFlow server specification. + */ +public class TensorFlowServer implements Serializable { + /** */ + private static final long serialVersionUID = 165988934166176805L; + + /** TensorFlow cluster specification. */ + private final TensorFlowClusterSpec clusterSpec; + + /** Job name. */ + private final String jobName; + + /** Task index. */ + private final Integer taskIdx; + + /** Protocol. */ + private final String proto; + + /** + * Constructs a new instance of TensorFlow server specification. + * + * @param clusterSpec TensorFlow cluster specification. + * @param jobName Job name. + */ + public TensorFlowServer(TensorFlowClusterSpec clusterSpec, String jobName) { + this(clusterSpec, jobName, null); + } + + /** + * Constructs a new instance of TensorFlow server specification. + * + * @param clusterSpec TensorFlow cluster specification. + * @param jobName Job name. + * @param taskIdx Task index. + */ + public TensorFlowServer(TensorFlowClusterSpec clusterSpec, String jobName, Integer taskIdx) { + this(clusterSpec, jobName, taskIdx, null); + } + + /** + * Constructs a new instance of TensorFlow server specification. + * + * @param clusterSpec TensorFlow cluster specification. + * @param jobName Job name. + * @param taskIdx Task index. + * @param proto Protocol. 
+ */ + public TensorFlowServer(TensorFlowClusterSpec clusterSpec, String jobName, Integer taskIdx, String proto) { + assert clusterSpec != null : "TensorFlow cluster specification should not be null"; + assert jobName != null : "Job name should not be null"; + + this.clusterSpec = clusterSpec; + this.jobName = jobName; + this.taskIdx = taskIdx; + this.proto = proto; + } + + /** */ + public TensorFlowClusterSpec getClusterSpec() { + return clusterSpec; + } + + /** */ + public String getJobName() { + return jobName; + } + + /** */ + public Integer getTaskIdx() { + return taskIdx; + } + + /** */ + public String getProto() { + return proto; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5ee876a6/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/tfrunning/TensorFlowServerManager.java ---------------------------------------------------------------------- diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/tfrunning/TensorFlowServerManager.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/tfrunning/TensorFlowServerManager.java new file mode 100644 index 0000000..192f619 --- /dev/null +++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/tfrunning/TensorFlowServerManager.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.tensorflow.cluster.tfrunning; + +import java.io.Serializable; +import java.util.function.Supplier; +import org.apache.ignite.tensorflow.core.ProcessManager; +import org.apache.ignite.tensorflow.core.ProcessManagerWrapper; +import org.apache.ignite.tensorflow.core.pythonrunning.PythonProcess; +import org.apache.ignite.tensorflow.core.pythonrunning.PythonProcessManager; +import org.apache.ignite.tensorflow.cluster.spec.TensorFlowClusterSpec; +import org.apache.ignite.tensorflow.cluster.spec.TensorFlowServerAddressSpec; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import org.apache.ignite.Ignite; +import org.apache.ignite.Ignition; + +/** + * TensorFlow server manager that allows to start, stop and make other actions with TensorFlow servers. + */ +public class TensorFlowServerManager extends ProcessManagerWrapper<PythonProcess, TensorFlowServer> { + /** */ + private static final long serialVersionUID = 8355019934723445973L; + + /** + * Constructs a new instance of TensorFlow server manager. + * + * @param igniteSupplier Ignite instance supplier. + * @param <T> Type of serializable supplier. + */ + public <T extends Supplier<Ignite> & Serializable> TensorFlowServerManager(T igniteSupplier) { + this(new PythonProcessManager(igniteSupplier)); + } + + /** + * Constructs a new instance of TensorFlow server manager. + * + * @param delegate Delegate. 
+ */ + public TensorFlowServerManager(ProcessManager<PythonProcess> delegate) { + super(delegate); + } + + /** {@inheritDoc} */ + @Override protected PythonProcess transformSpecification(TensorFlowServer spec) { + return new PythonProcess( + formatPythonScript(spec), + getNode(spec) + ); + } + + /** + * Extracts the cluster node server should be running on. + * + * @param spec TensorFlow server specification. + * @return Node identifier. + */ + private UUID getNode(TensorFlowServer spec) { + TensorFlowClusterSpec clusterSpec = spec.getClusterSpec(); + Map<String, List<TensorFlowServerAddressSpec>> jobs = clusterSpec.getJobs(); + List<TensorFlowServerAddressSpec> tasks = jobs.get(spec.getJobName()); + TensorFlowServerAddressSpec addr = tasks.get(spec.getTaskIdx()); + + return addr.getNodeId(); + } + + /** + * Formats TensorFlow server specification so that it's available to be passed into а python script. + * + * @param spec TensorFlow server specification. + * @return Formatted TensorFlow server specification. + */ + private String formatPythonScript(TensorFlowServer spec) { + StringBuilder builder = new StringBuilder(); + + builder.append("import tensorflow as tf").append('\n'); + builder.append("cluster = tf.train.ClusterSpec(") + .append(formatClusterSpec(spec.getClusterSpec())) + .append(')') + .append('\n'); + builder.append("server = tf.train.Server(cluster"); + + if (spec.getJobName() != null) + builder.append(", job_name=\"").append(spec.getJobName()).append('"'); + + if (spec.getTaskIdx() != null) + builder.append(", task_index=").append(spec.getTaskIdx()); + + if (spec.getProto() != null) + builder.append(", protocol=\"").append(spec.getProto()).append('"'); + + builder.append(')').append('\n'); + builder.append("server.join()").append('\n'); + + return builder.toString(); + } + + /** + * Formats TensorFlow cluster specification so that it's available to be passed into а python script. + * + * @param spec TensorFlow cluster specification. 
+ * @return Formatted TensorFlow cluster specification. + */ + public String formatClusterSpec(TensorFlowClusterSpec spec) { + StringBuilder builder = new StringBuilder(); + + builder.append("{\n"); + + for (Map.Entry<String, List<TensorFlowServerAddressSpec>> entry : spec.getJobs().entrySet()) { + builder + .append("\t\"") + .append(entry.getKey()) + .append("\" : [ "); + + for (TensorFlowServerAddressSpec address : entry.getValue()) { + builder + .append("\n\t\t\"") + .append(formatAddressSpec(address)) + .append("\", "); + } + + if (!entry.getValue().isEmpty()) + builder.delete(builder.length() - 2, builder.length()); + + builder.append("\n\t],\n"); + } + + if (!spec.getJobs().isEmpty()) + builder.delete(builder.length() - 2, builder.length() - 1); + + builder.append('}'); + + return builder.toString(); + } + + /** + * Formats TensorFlow server address specification so that it's available to be passed into а python script. + * + * @param spec TensorFlow server address specification. + * @return Formatted TensorFlow server address specification. 
+ */ + private String formatAddressSpec(TensorFlowServerAddressSpec spec) { + UUID nodeId = spec.getNodeId(); + + Ignite ignite = Ignition.localIgnite(); + Collection<String> names = ignite.cluster().forNodeId(nodeId).hostNames(); + + return names.iterator().next() + ":" + spec.getPort(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5ee876a6/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/tfrunning/package-info.java ---------------------------------------------------------------------- diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/tfrunning/package-info.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/tfrunning/package-info.java new file mode 100644 index 0000000..f4d4cad --- /dev/null +++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/tfrunning/package-info.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * <!-- Package description. --> + * Additional "running" layer on top of Python running layer of infrastructure. Allows to start TensorFlow server using + * server description ({@link org.apache.ignite.tensorflow.cluster.tfrunning.TensorFlowServer}). 
+ */ +package org.apache.ignite.tensorflow.cluster.tfrunning; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/5ee876a6/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/util/ClusterPortManager.java ---------------------------------------------------------------------- diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/util/ClusterPortManager.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/util/ClusterPortManager.java new file mode 100644 index 0000000..78087ab --- /dev/null +++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/util/ClusterPortManager.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.tensorflow.cluster.util; + +import java.io.Serializable; +import java.util.BitSet; +import java.util.UUID; +import java.util.concurrent.locks.Lock; +import java.util.function.Supplier; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.configuration.CacheConfiguration; + +/** + * Cluster port manager that allows to reliably {@link #acquirePort(UUID)} and {@link #freePort(UUID, int)} on the + * cluster nodes. + */ +public class ClusterPortManager implements Serializable { + /** */ + private static final long serialVersionUID = -5116593574559007292L; + + /** Port manager cache name. */ + private final String portMgrCacheName; + + /** Port range from point. */ + private final int from; + + /** Port range size. */ + private final int cnt; + + /** Ignite instance supplier. */ + private final Supplier<Ignite> igniteSupplier; + + /** Port manager cache */ + private transient IgniteCache<UUID, BitSet> cache; + + /** + * Constructs a new instance of cluster port manager. + * + * @param poolName Port pool name. + * @param from Port range from point. + * @param cnt Port range size. + */ + public <T extends Supplier<Ignite> & Serializable> ClusterPortManager(String poolName, int from, int cnt, + T igniteSupplier) { + assert poolName != null : "Pool name should not be null"; + assert cnt >= 0 : "Count should not be negative"; + assert from >= 0 && cnt + from <= 0xFFFF : "Port range should be between 0 and 65535"; + assert igniteSupplier != null : "Ignite supplier should not be null"; + + this.portMgrCacheName = String.format("PORT_MANAGER_CACHE_%s", poolName); + this.from = from; + this.cnt = cnt; + this.igniteSupplier = igniteSupplier; + } + + /** Initializes port manager and creates or gets correspondent caches. 
*/ + public void init() { + CacheConfiguration<UUID, BitSet> cacheConfiguration = new CacheConfiguration<>(); + cacheConfiguration.setName(portMgrCacheName); + cacheConfiguration.setCacheMode(CacheMode.REPLICATED); + cacheConfiguration.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL); + + Ignite ignite = igniteSupplier.get(); + cache = ignite.getOrCreateCache(cacheConfiguration); + } + + /** + * Acquires free port on the specified node. + * + * @param nodeId Node identifier. + * @return Port to be acquired. + */ + public int acquirePort(UUID nodeId) { + checkThatInitialized(); + + Lock lock = cache.lock(nodeId); + lock.lock(); + + try { + BitSet ports = cache.get(nodeId); + + if (ports == null) + ports = new BitSet(cnt); + + int free = ports.nextClearBit(0); + + if (free >= cnt) + throw new IllegalStateException("No free ports in range [from=" + from + ", cnt=" + cnt + "]"); + + ports.set(free); + + cache.put(nodeId, ports); + + return from + free; + } + finally { + lock.unlock(); + } + } + + /** + * Frees acquired port on the specified node. + * + * @param nodeId Node identifier. + * @param port Acquired port to be free. + */ + public void freePort(UUID nodeId, int port) { + assert port - from >= 0 && port - from < cnt : "Port not in the range"; + + checkThatInitialized(); + + Lock lock = cache.lock(nodeId); + lock.lock(); + + try { + BitSet ports = cache.get(nodeId); + + if (ports != null) { + ports.clear(port - from); + + if (ports.isEmpty()) + cache.remove(nodeId); + } + } + finally { + lock.unlock(); + } + } + + /** Destroys port manager and related caches. */ + public void destroy() { + Ignite ignite = igniteSupplier.get(); + ignite.destroyCache(portMgrCacheName); + } + + /** + * Checks that the component has been initialized. + */ + private void checkThatInitialized() { + if (cache == null) + throw new IllegalStateException("Cluster Port Manager is not initialized"); + } +}