http://git-wip-us.apache.org/repos/asf/ignite/blob/5ee876a6/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/util/TensorFlowClusterResolver.java ---------------------------------------------------------------------- diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/util/TensorFlowClusterResolver.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/util/TensorFlowClusterResolver.java new file mode 100644 index 0000000..e1d7d57 --- /dev/null +++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/util/TensorFlowClusterResolver.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.tensorflow.cluster.util; + +import java.io.Serializable; +import java.util.UUID; +import java.util.function.Supplier; +import org.apache.ignite.Ignite; +import org.apache.ignite.cache.affinity.Affinity; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.tensorflow.cluster.spec.TensorFlowClusterSpec; +import org.apache.ignite.tensorflow.cluster.spec.TensorFlowServerAddressSpec; + +/** + * TensorFlow cluster resolver based on Ignite Cache affinity. 
+ */ +public class TensorFlowClusterResolver implements Serializable { + /** */ + private static final long serialVersionUID = 631456775167710173L; + + /** Cluster port manager. */ + private final ClusterPortManager portMgr; + + /** Ignite instance supplier. */ + private final Supplier<Ignite> igniteSupplier; + + /** + * Constructs a new instance of TensorFlow cluster resolver. + * + * @param igniteSupplier Ignite instance supplier. + * @param <T> Type of serializable supplier. + */ + public <T extends Supplier<Ignite> & Serializable> TensorFlowClusterResolver(T igniteSupplier) { + assert igniteSupplier != null : "Ignite supplier should not be null"; + + this.igniteSupplier = igniteSupplier; + this.portMgr = new ClusterPortManager("TF_POOL", 10000, 100, igniteSupplier); + } + + /** Initializes TensorFlow cluster resolver. */ + public void init() { + portMgr.init(); + } + + /** + * Resolves TensorFlow cluster and acquires required ports. + * + * @param upstreamCacheName Upstream cache name. + * @return TensorFlow cluster specification. + */ + public TensorFlowClusterSpec resolveAndAcquirePorts(String upstreamCacheName) { + Ignite ignite = igniteSupplier.get(); + Affinity<?> affinity = ignite.affinity(upstreamCacheName); + + int parts = affinity.partitions(); + + TensorFlowClusterSpec spec = new TensorFlowClusterSpec(); + + for (int part = 0; part < parts; part++) { + ClusterNode node = affinity.mapPartitionToNode(part); + UUID nodeId = node.id(); + + int port = portMgr.acquirePort(nodeId); + + spec.addTask("WORKER", nodeId, port); + } + + return spec; + } + + /** + * Frees ports acquired for the given cluster specification. + * + * @param spec TensorFlow cluster specification. + */ + public void freePorts(TensorFlowClusterSpec spec) { + for (String jobName : spec.getJobs().keySet()) + for (TensorFlowServerAddressSpec address : spec.getJobs().get(jobName)) + portMgr.freePort(address.getNodeId(), address.getPort()); + } + + /** Destroys TensorFlow cluster resolver. 
*/ + public void destroy() { + portMgr.destroy(); + } +}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5ee876a6/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/util/package-info.java ---------------------------------------------------------------------- diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/util/package-info.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/util/package-info.java new file mode 100644 index 0000000..0c32959 --- /dev/null +++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/util/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * <!-- Package description. --> + * Util components that are used in TensorFlow cluster package. 
+ */ +package org.apache.ignite.tensorflow.cluster.util; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/5ee876a6/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/ProcessManager.java ---------------------------------------------------------------------- diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/ProcessManager.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/ProcessManager.java new file mode 100644 index 0000000..0ef81bc --- /dev/null +++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/ProcessManager.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.tensorflow.core; + +import org.apache.ignite.tensorflow.core.longrunning.task.util.LongRunningProcessStatus; +import java.io.Serializable; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +/** + * Process manager that allows to run and maintain processes in the cluster. + * + * @param <R> Type of task to be run. + */ +public interface ProcessManager<R> extends Serializable { + /** + * Starts the processes by the given specifications. 
+ * + * @param specifications Process specifications. + * @return Map of node identifier as a key and list of started process identifiers as a value. + */ + public Map<UUID, List<UUID>> start(List<R> specifications); + + /** + * Pings the given processes. + * + * @param procIds Map of node identifier as a key and list of process identifiers as a value. + * @return Map of node identifier as a key and list of process statuses as a value. + */ + public Map<UUID, List<LongRunningProcessStatus>> ping(Map<UUID, List<UUID>> procIds); + + /** + * Stops the given processes. + * + * @param procIds Map of node identifier as a key and list of process identifiers as a value. + * @return Map of node identifier as a key and list of process statuses as a value. + */ + public Map<UUID, List<LongRunningProcessStatus>> stop(Map<UUID, List<UUID>> procIds, boolean clear); + + /** + * Clears metadata of the given processes. + * + * @param procIds Map of node identifier as a key and list of process identifiers as a value. + * @return Map of node identifier as a key and list of process statuses as a value. + */ + public Map<UUID, List<LongRunningProcessStatus>> clear(Map<UUID, List<UUID>> procIds); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5ee876a6/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/ProcessManagerWrapper.java ---------------------------------------------------------------------- diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/ProcessManagerWrapper.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/ProcessManagerWrapper.java new file mode 100644 index 0000000..b66b54f --- /dev/null +++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/ProcessManagerWrapper.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.tensorflow.core; + +import org.apache.ignite.tensorflow.core.longrunning.task.util.LongRunningProcessStatus; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +/** + * Process manager wrapper that allows to define how one type of process specification should be transformed into + * another type of process specification delegate working with and delegate all operations to this delegate. + * + * @param <T> Type of process specification delegate working with. + * @param <R> Type of accepted process specifications. + */ +public abstract class ProcessManagerWrapper<T, R> implements ProcessManager<R> { + /** */ + private static final long serialVersionUID = -6397225095261457524L; + + /** Delegate. */ + private final ProcessManager<T> delegate; + + /** + * Constructs a new instance of process manager wrapper. + * + * @param delegate Delegate. + */ + public ProcessManagerWrapper(ProcessManager<T> delegate) { + assert delegate != null : "Delegate should not be null"; + + this.delegate = delegate; + } + + /** + * Transforms accepted process specification into process specification delegate working with. + * + * @param spec Accepted process specification. 
+ * @return Process specification delegate working with. + */ + protected abstract T transformSpecification(R spec); + + /** {@inheritDoc} */ + @Override public Map<UUID, List<UUID>> start(List<R> specifications) { + List<T> transformedSpecifications = new ArrayList<>(); + + for (R spec : specifications) + transformedSpecifications.add(transformSpecification(spec)); + + return delegate.start(transformedSpecifications); + } + + /** {@inheritDoc} */ + @Override public Map<UUID, List<LongRunningProcessStatus>> ping(Map<UUID, List<UUID>> procIds) { + return delegate.ping(procIds); + } + + /** {@inheritDoc} */ + @Override public Map<UUID, List<LongRunningProcessStatus>> stop(Map<UUID, List<UUID>> procIds, boolean clear) { + return delegate.stop(procIds, clear); + } + + /** {@inheritDoc} */ + @Override public Map<UUID, List<LongRunningProcessStatus>> clear(Map<UUID, List<UUID>> procIds) { + return delegate.clear(procIds); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5ee876a6/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/longrunning/LongRunningProcess.java ---------------------------------------------------------------------- diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/longrunning/LongRunningProcess.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/longrunning/LongRunningProcess.java new file mode 100644 index 0000000..f11f42a --- /dev/null +++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/longrunning/LongRunningProcess.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.tensorflow.core.longrunning; + +import java.io.Serializable; +import java.util.UUID; +import org.apache.ignite.lang.IgniteRunnable; + +/** + * Long running process specification that contains identifier of a node where the process should be running on and the + * task to be run. + */ +public class LongRunningProcess implements Serializable { + /** */ + private static final long serialVersionUID = 6039507725567997183L; + + /** Node identifier. */ + private final UUID nodeId; + + /** Task to be run. */ + private final IgniteRunnable task; + + /** + * Constructs a new instance of long running process specification. + * + * @param nodeId Node identifier. + * @param task Task to be run. 
+ */ + public LongRunningProcess(UUID nodeId, IgniteRunnable task) { + assert nodeId != null : "Node identifier should not be null"; + assert task != null : "Task should not be null"; + + this.nodeId = nodeId; + this.task = task; + } + + /** */ + public UUID getNodeId() { + return nodeId; + } + + /** */ + public IgniteRunnable getTask() { + return task; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5ee876a6/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/longrunning/LongRunningProcessManager.java ---------------------------------------------------------------------- diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/longrunning/LongRunningProcessManager.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/longrunning/LongRunningProcessManager.java new file mode 100644 index 0000000..027ece3 --- /dev/null +++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/longrunning/LongRunningProcessManager.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.tensorflow.core.longrunning; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Supplier; +import org.apache.ignite.Ignite; +import org.apache.ignite.cluster.ClusterGroup; +import org.apache.ignite.cluster.ClusterGroupEmptyException; +import org.apache.ignite.tensorflow.core.ProcessManager; +import org.apache.ignite.tensorflow.core.longrunning.task.LongRunningProcessClearTask; +import org.apache.ignite.tensorflow.core.longrunning.task.LongRunningProcessPingTask; +import org.apache.ignite.tensorflow.core.longrunning.task.LongRunningProcessStartTask; +import org.apache.ignite.tensorflow.core.longrunning.task.LongRunningProcessStopTask; +import org.apache.ignite.tensorflow.core.longrunning.task.LongRunningProcessTask; +import org.apache.ignite.tensorflow.core.longrunning.task.util.LongRunningProcessStatus; + +/** + * Long running process manager that allows to start, stop and make other actions with long running processes. + */ +public class LongRunningProcessManager implements ProcessManager<LongRunningProcess> { + /** */ + private static final long serialVersionUID = 1151455641358063287L; + + /** Ignite instance supplier. */ + private final Supplier<Ignite> igniteSupplier; + + /** + * Constructs a new instance of long running process manager. + * + * @param igniteSupplier Ignite instance supplier. + * @param <T> Type of serializable supplier. 
+ */ + public <T extends Supplier<Ignite> & Serializable> LongRunningProcessManager(T igniteSupplier) { + assert igniteSupplier != null : "Ignite supplier should not be null"; + + this.igniteSupplier = igniteSupplier; + } + + /** {@inheritDoc} */ + @Override public Map<UUID, List<UUID>> start(List<LongRunningProcess> specifications) { + return call(groupByNodeId(specifications), LongRunningProcessStartTask::new, this::rollbackStartTask, false); + } + + /** {@inheritDoc} */ + @Override public Map<UUID, List<LongRunningProcessStatus>> ping(Map<UUID, List<UUID>> procIds) { + return call(procIds, LongRunningProcessPingTask::new, this::rollbackNothing, false); + } + + /** {@inheritDoc} */ + @Override public Map<UUID, List<LongRunningProcessStatus>> stop(Map<UUID, List<UUID>> procIds, boolean clear) { + return call(procIds, params -> new LongRunningProcessStopTask(params, clear), this::rollbackNothing, true); + } + + /** {@inheritDoc} */ + @Override public Map<UUID, List<LongRunningProcessStatus>> clear(Map<UUID, List<UUID>> procIds) { + return call(procIds, LongRunningProcessClearTask::new, this::rollbackNothing, true); + } + + /** + * Sends the specified tasks to the cluster to be executed. + * + * @param params Parameters needed to create tasks. + * @param taskSupplier Supplier that defines how to create tasks. + * @param rollback Rollback procedure. + * @param onlyIfNodeExists If node doesn't exist the correspondent task will be ignored. + * @param <T> Type of params needed to create tasks. + * @param <E> Type of returned by task value. + * @return Map of node identifier as a key and list task results as a value. 
+ */ + private <T, E> Map<UUID, List<E>> call(Map<UUID, List<T>> params, + Function<List<T>, LongRunningProcessTask<List<E>>> taskSupplier, Consumer<Map<UUID, List<E>>> rollback, + boolean onlyIfNodeExists) { + Map<UUID, List<E>> res = new HashMap<>(); + + try { + for (UUID nodeId : params.keySet()) { + List<T> nodeProcesses = params.get(nodeId); + LongRunningProcessTask<List<E>> task = taskSupplier.apply(nodeProcesses); + + Ignite ignite = igniteSupplier.get(); + ClusterGroup clusterGrp = ignite.cluster().forNodeId(nodeId); + + try { + List<E> nodeRes = ignite.compute(clusterGrp).call(task); + res.put(nodeId, nodeRes); + } + catch (ClusterGroupEmptyException e) { + if (!onlyIfNodeExists) + throw e; + } + } + } + catch (Exception e) { + // All-or-nothing strategy. In case of exception already processed tasks should be rolled back. + rollback.accept(res); + + throw e; + } + + return res; + } + + /** + * Groups the given process specifications by node identifier. + * + * @param specifications Process specifications. + * @return Map of node identifier as a key and list of process specifications as a value. + */ + private Map<UUID, List<LongRunningProcess>> groupByNodeId(List<LongRunningProcess> specifications) { + Map<UUID, List<LongRunningProcess>> res = new HashMap<>(); + + for (LongRunningProcess spec : specifications) { + UUID nodeId = spec.getNodeId(); + + List<LongRunningProcess> nodeSpecs = res.get(nodeId); + + if (nodeSpecs == null) { + nodeSpecs = new ArrayList<>(); + nodeSpecs.add(spec); + res.put(nodeId, nodeSpecs); + } + else + nodeSpecs.add(spec); + } + + return res; + } + + /** + * Rolls back start task successfully applied earlier. + * + * @param procIds Process identifiers. + */ + private void rollbackStartTask(Map<UUID, List<UUID>> procIds) { + stop(procIds, true); + } + + /** + * Rolls back nothing. Ping, stop and clear tasks cannot be rolled back, so it's the only one available strategy + * for these tasks. 
+ */ + private void rollbackNothing(Map<UUID, List<LongRunningProcessStatus>> processes) { + // Do nothing. + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5ee876a6/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/longrunning/package-info.java ---------------------------------------------------------------------- diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/longrunning/package-info.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/longrunning/package-info.java new file mode 100644 index 0000000..af0f995 --- /dev/null +++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/longrunning/package-info.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * <!-- Package description. --> + * The part of TensorFlow integration infrastructure that allows to start and maintain abstract long-running processes. 
+ * As described in {@link org.apache.ignite.tensorflow.core.longrunning.LongRunningProcess} user only needs to specify + * runnable task and the node identifier the process should be started on and LongRunning Process Manager will make the + * rest so that the specified runnable will be executed and maintained on the specified node. + */ +package org.apache.ignite.tensorflow.core.longrunning; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/5ee876a6/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/longrunning/task/LongRunningProcessClearTask.java ---------------------------------------------------------------------- diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/longrunning/task/LongRunningProcessClearTask.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/longrunning/task/LongRunningProcessClearTask.java new file mode 100644 index 0000000..2afaaa9 --- /dev/null +++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/longrunning/task/LongRunningProcessClearTask.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.tensorflow.core.longrunning.task; + +import org.apache.ignite.tensorflow.core.longrunning.task.util.LongRunningProcessState; +import org.apache.ignite.tensorflow.core.longrunning.task.util.LongRunningProcessStatus; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.Future; + +/** + * Task that clears process metadata on the node where the process has run. + */ +public class LongRunningProcessClearTask extends LongRunningProcessTask<List<LongRunningProcessStatus>> { + /** */ + private static final long serialVersionUID = -1840332865137076107L; + + /** Process identifiers. */ + private final List<UUID> procIds; + + /** + * Constructs a new instance of clear task. + * + * @param procIds Process identifiers. + */ + public LongRunningProcessClearTask(List<UUID> procIds) { + assert procIds != null : "Process identifiers should not be null"; + + this.procIds = procIds; + } + + /** {@inheritDoc} */ + @Override public List<LongRunningProcessStatus> call() { + ArrayList<LongRunningProcessStatus> res = new ArrayList<>(); + + for (UUID prodId : procIds) + res.add(prepareProcessForRemoving(prodId)); + + // All-or-nothing strategy. Processes will be removed only if all processes can be removed. + removeProcessesFromMetadataStorage(); + + return res; + } + + /** + * Prepares process to be removed. Checks that the process is not running and wraps information about state and + * exception into a status object. + * + * @param procId Process identifier. + * @return Process status. 
+ */ + private LongRunningProcessStatus prepareProcessForRemoving(UUID procId) { + Map<UUID, Future<?>> metadataStorage = getMetadataStorage(); + + Future<?> fut = metadataStorage.get(procId); + + if (fut == null) + return new LongRunningProcessStatus(LongRunningProcessState.NOT_FOUND); + + if (!fut.isDone()) + throw new IllegalStateException("Process is still running [procId=" + procId + "]"); + + try { + fut.get(); + return new LongRunningProcessStatus(LongRunningProcessState.DONE); + } + catch (Exception e) { + return new LongRunningProcessStatus(LongRunningProcessState.DONE, e); + } + } + + /** + * Removes processes from metadata storage. + */ + private void removeProcessesFromMetadataStorage() { + Map<UUID, Future<?>> metadataStorage = getMetadataStorage(); + + for (UUID procId : procIds) + metadataStorage.remove(procId); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5ee876a6/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/longrunning/task/LongRunningProcessPingTask.java ---------------------------------------------------------------------- diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/longrunning/task/LongRunningProcessPingTask.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/longrunning/task/LongRunningProcessPingTask.java new file mode 100644 index 0000000..5d54886 --- /dev/null +++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/longrunning/task/LongRunningProcessPingTask.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.tensorflow.core.longrunning.task; + +import org.apache.ignite.tensorflow.core.longrunning.task.util.LongRunningProcessStatus; +import org.apache.ignite.tensorflow.core.longrunning.task.util.LongRunningProcessState; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.Future; + +/** + * Task that pings processes to check their statuses. + */ +public class LongRunningProcessPingTask extends LongRunningProcessTask<List<LongRunningProcessStatus>> { + /** */ + private static final long serialVersionUID = 7003289989579770395L; + + /** Process identifiers. */ + private final List<UUID> procIds; + + /** + * Constructs a new instance of ping task. + * + * @param procIds Process identifiers. + */ + public LongRunningProcessPingTask(List<UUID> procIds) { + assert procIds != null : "Process identifiers should not be null"; + + this.procIds = procIds; + } + + /** {@inheritDoc} */ + @Override public List<LongRunningProcessStatus> call() { + ArrayList<LongRunningProcessStatus> statuses = new ArrayList<>(); + + for (UUID procId : procIds) + statuses.add(getProcessStatus(procId)); + + return statuses; + } + + /** + * Extracts the process status. + * + * @param procId Process identifier. + * @return Process status. 
+ */ + private LongRunningProcessStatus getProcessStatus(UUID procId) { + Map<UUID, Future<?>> metadataStorage = getMetadataStorage(); + + Future<?> fut = metadataStorage.get(procId); + + if (fut == null) + return new LongRunningProcessStatus(LongRunningProcessState.NOT_FOUND); + + if (!fut.isDone()) + return new LongRunningProcessStatus(LongRunningProcessState.RUNNING); + + try { + fut.get(); + } + catch (Exception e) { + return new LongRunningProcessStatus(LongRunningProcessState.DONE, e); + } + + return new LongRunningProcessStatus(LongRunningProcessState.DONE); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5ee876a6/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/longrunning/task/LongRunningProcessStartTask.java ---------------------------------------------------------------------- diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/longrunning/task/LongRunningProcessStartTask.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/longrunning/task/LongRunningProcessStartTask.java new file mode 100644 index 0000000..1d08519 --- /dev/null +++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/longrunning/task/LongRunningProcessStartTask.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.tensorflow.core.longrunning.task; + +import org.apache.ignite.tensorflow.core.longrunning.LongRunningProcess; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import org.apache.ignite.tensorflow.core.util.CustomizableThreadFactory; + +/** + * Task that starts long running processes by their specifications. + */ +public class LongRunningProcessStartTask extends LongRunningProcessTask<List<UUID>> { + /** */ + private static final long serialVersionUID = -3934183044853083034L; + + /** Process specifications. */ + private final List<LongRunningProcess> processes; + + /** + * Constructs a new instance of start task. + * + * @param processes Process specifications. + */ + public LongRunningProcessStartTask(List<LongRunningProcess> processes) { + assert processes != null : "Processes should not be null"; + + this.processes = processes; + } + + /** {@inheritDoc} */ + @Override public List<UUID> call() { + ArrayList<UUID> res = new ArrayList<>(); + + try { + for (LongRunningProcess proc : processes) { + Future<?> fut = runTask(proc.getTask()); + + UUID procId = saveProcMetadata(fut); + + res.add(procId); + } + } + catch (Exception e) { + // All-or-nothing strategy. In case of exception already started processes will be stopped. + stopAllProcessesAndClearMetadata(res); + + throw e; + } + + return res; + } + + /** + * Executes the task in a separate thread. + * + * @param task Task to be executed. + * @return Future that allows to interrupt or get the status of the task. 
+ */ + private Future<?> runTask(Runnable task) { + return Executors + .newSingleThreadExecutor(new CustomizableThreadFactory("LONG_RUNNING_PROCESS_TASK", true)) + .submit(task); + } + + /** + * Saves process metadata into the local metadata storage. + * + * @param fut Future that allows to interrupt or get the status of the task. + * @return Process identifier. + */ + private UUID saveProcMetadata(Future<?> fut) { + Map<UUID, Future<?>> metadataStorage = getMetadataStorage(); + + UUID procId = UUID.randomUUID(); + + metadataStorage.put(procId, fut); + + return procId; + } + + /** + * Stop all processes by their identifiers and removes them from the metadata storage. + * + * @param procIds Process identifiers. + */ + private void stopAllProcessesAndClearMetadata(List<UUID> procIds) { + Map<UUID, Future<?>> metadataStorage = getMetadataStorage(); + + for (UUID procId : procIds) { + Future<?> fut = metadataStorage.remove(procId); + fut.cancel(true); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5ee876a6/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/longrunning/task/LongRunningProcessStopTask.java ---------------------------------------------------------------------- diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/longrunning/task/LongRunningProcessStopTask.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/longrunning/task/LongRunningProcessStopTask.java new file mode 100644 index 0000000..56b906b --- /dev/null +++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/longrunning/task/LongRunningProcessStopTask.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.tensorflow.core.longrunning.task; + +import org.apache.ignite.tensorflow.core.longrunning.task.util.LongRunningProcessStatus; +import org.apache.ignite.tensorflow.core.longrunning.task.util.LongRunningProcessState; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.Future; + +/** + * Task that stops long running processes. + */ +public class LongRunningProcessStopTask extends LongRunningProcessTask<List<LongRunningProcessStatus>> { + /** */ + private static final long serialVersionUID = -5552468435820611170L; + + /** Process identifiers. */ + private final List<UUID> procIds; + + /** Flag that defines that metadata should be removed immediately. */ + private final boolean clear; + + /** + * Constructs a new instance of stop task. + * + * @param procIds Process identifiers. + * @param clear Flag that defines that metadata should be removed immediately. 
+ */ + public LongRunningProcessStopTask(List<UUID> procIds, boolean clear) { + assert procIds != null : "Process identifiers should not be null"; + + this.procIds = procIds; + this.clear = clear; + } + + /** {@inheritDoc} */ + @Override public List<LongRunningProcessStatus> call() { + ArrayList<LongRunningProcessStatus> res = new ArrayList<>(); + + for (UUID prodId : procIds) + res.add(stopProcess(prodId)); + + // All-or-nothing strategy. Processes will be removed only if all processes can be removed. + if (clear) + removeProcessesFromMetadataStorage(); + + return res; + } + + /** + * Stop process by process identifier. + * + * @param procId Process identifier. + * @return Process status after stop. + */ + private LongRunningProcessStatus stopProcess(UUID procId) { + Map<UUID, Future<?>> metadataStorage = getMetadataStorage(); + + Future<?> fut = metadataStorage.get(procId); + + if (fut == null) + return new LongRunningProcessStatus(LongRunningProcessState.NOT_FOUND); + + try { + fut.cancel(true); + fut.get(); + } + catch (Exception e) { + return new LongRunningProcessStatus(LongRunningProcessState.DONE, e); + } + + return new LongRunningProcessStatus(LongRunningProcessState.DONE); + } + + /** + * Removes processes from metadata storage. 
+ */ + private void removeProcessesFromMetadataStorage() { + Map<UUID, Future<?>> metadataStorage = getMetadataStorage(); + + for (UUID procId : procIds) + metadataStorage.remove(procId); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5ee876a6/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/longrunning/task/LongRunningProcessTask.java ---------------------------------------------------------------------- diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/longrunning/task/LongRunningProcessTask.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/longrunning/task/LongRunningProcessTask.java new file mode 100644 index 0000000..96d2274 --- /dev/null +++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/longrunning/task/LongRunningProcessTask.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.tensorflow.core.longrunning.task; + +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Future; +import org.apache.ignite.Ignite; +import org.apache.ignite.Ignition; +import org.apache.ignite.lang.IgniteCallable; +import org.apache.ignite.tensorflow.core.longrunning.LongRunningProcessManager; + +/** + * Task that can be executed on a cluster using the {@link LongRunningProcessManager}. + * + * @param <T> Type of the result. + */ +public abstract class LongRunningProcessTask<T> implements IgniteCallable<T> { + /** */ + private static final long serialVersionUID = 2351380200758417004L; + + /** Long running process storage name. */ + private static final String LONG_RUNNING_PROCESS_STORAGE_NAME = "LONG_RUNNING_PROCESS_STORAGE"; + + /** + * Returns task metadata storage that is kept by every node in the cluster. + * + * @return Task metadata storage that is kept by every node in the cluster. 
+ */ + ConcurrentHashMap<UUID, Future<?>> getMetadataStorage() { + Ignite ignite = Ignition.localIgnite(); + + ConcurrentMap<String, ConcurrentHashMap<UUID, Future<?>>> nodeLocMap = ignite.cluster().nodeLocalMap(); + + return nodeLocMap.computeIfAbsent(LONG_RUNNING_PROCESS_STORAGE_NAME, k -> new ConcurrentHashMap<>()); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5ee876a6/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/longrunning/task/package-info.java ---------------------------------------------------------------------- diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/longrunning/task/package-info.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/longrunning/task/package-info.java new file mode 100644 index 0000000..2ffc5d4 --- /dev/null +++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/longrunning/task/package-info.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * <!-- Package description. --> + * Contains long-running process tasks that encapsulates the logic of starting, pinging and stopping a long-running + * process. 
+ */ +package org.apache.ignite.tensorflow.core.longrunning.task; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/5ee876a6/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/longrunning/task/util/LongRunningProcessState.java ---------------------------------------------------------------------- diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/longrunning/task/util/LongRunningProcessState.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/longrunning/task/util/LongRunningProcessState.java new file mode 100644 index 0000000..7b3c777 --- /dev/null +++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/longrunning/task/util/LongRunningProcessState.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.tensorflow.core.longrunning.task.util; + +/** + * Long running process state. + */ +public enum LongRunningProcessState { + /** Process not found. */ NOT_FOUND, + /** Process is running. 
*/ RUNNING, + /** Process is done */ DONE +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5ee876a6/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/longrunning/task/util/LongRunningProcessStatus.java ---------------------------------------------------------------------- diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/longrunning/task/util/LongRunningProcessStatus.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/longrunning/task/util/LongRunningProcessStatus.java new file mode 100644 index 0000000..1ae8e15 --- /dev/null +++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/longrunning/task/util/LongRunningProcessStatus.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.tensorflow.core.longrunning.task.util; + +import java.io.Serializable; + +/** + * Long running process status that includes state and exception if exists. + */ +public class LongRunningProcessStatus implements Serializable { + /** */ + private static final long serialVersionUID = -2958316078722079954L; + + /** Process state. */ + private final LongRunningProcessState state; + + /** Process exception. 
*/ + private final Exception e; + + /** + * Constructs a new instance of long running process status. + * + * @param state Process state. + */ + public LongRunningProcessStatus(LongRunningProcessState state) { + this(state, null); + } + + /** + * Constructs a new instance of long running process status. + * + * @param state Process state. + * @param e Process exception. + */ + public LongRunningProcessStatus(LongRunningProcessState state, Exception e) { + assert state != null : "Process state should not be null"; + + this.state = state; + this.e = e; + } + + /** */ + public LongRunningProcessState getState() { + return state; + } + + /** */ + public Exception getException() { + return e; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5ee876a6/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/longrunning/task/util/package-info.java ---------------------------------------------------------------------- diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/longrunning/task/util/package-info.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/longrunning/task/util/package-info.java new file mode 100644 index 0000000..015aee6 --- /dev/null +++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/longrunning/task/util/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * <!-- Package description. --> + * Util package that contains util classes used in long-running tasks. + */ +package org.apache.ignite.tensorflow.core.longrunning.task.util; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/5ee876a6/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/nativerunning/NativeProcess.java ---------------------------------------------------------------------- diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/nativerunning/NativeProcess.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/nativerunning/NativeProcess.java new file mode 100644 index 0000000..df36ba9 --- /dev/null +++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/nativerunning/NativeProcess.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
/**
 * Specification of a native process: how to build it, what to write into its stdin and the node it belongs to.
 */
public class NativeProcess implements Serializable {
    /** */
    private static final long serialVersionUID = -7056800139746134956L;

    /** Supplier of the process builder. */
    private final Supplier<ProcessBuilder> procBuilderSupplier;

    /** Text to be passed into stdin of the process, may be {@code null}. */
    private final String stdin;

    /** Node identifier. */
    private final UUID nodeId;

    /**
     * Creates a native process specification.
     *
     * @param procBuilderSupplier Process builder supplier.
     * @param stdin Stdin of the process.
     * @param nodeId Node identifier.
     * @param <T> Type of serializable supplier.
     */
    public <T extends Supplier<ProcessBuilder> & Serializable> NativeProcess(T procBuilderSupplier, String stdin,
        UUID nodeId) {
        assert procBuilderSupplier != null : "Process builder supplier should not be null";
        assert nodeId != null : "Node identifier should not be null";

        this.nodeId = nodeId;
        this.stdin = stdin;
        this.procBuilderSupplier = procBuilderSupplier;
    }

    /**
     * @return Process builder supplier.
     */
    public Supplier<ProcessBuilder> getProcBuilderSupplier() {
        return this.procBuilderSupplier;
    }

    /**
     * @return Stdin of the process.
     */
    public String getStdin() {
        return this.stdin;
    }

    /**
     * @return Node identifier.
     */
    public UUID getNodeId() {
        return this.nodeId;
    }
}
/dev/null +++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/nativerunning/NativeProcessManager.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.tensorflow.core.nativerunning; + +import java.io.Serializable; +import java.util.function.Supplier; +import org.apache.ignite.Ignite; +import org.apache.ignite.tensorflow.core.ProcessManager; +import org.apache.ignite.tensorflow.core.ProcessManagerWrapper; +import org.apache.ignite.tensorflow.core.longrunning.LongRunningProcess; +import org.apache.ignite.tensorflow.core.longrunning.LongRunningProcessManager; +import org.apache.ignite.tensorflow.core.nativerunning.task.NativeProcessStartTask; + +/** + * Native process manager that allows to start, stop and make other actions with native processes. + */ +public class NativeProcessManager extends ProcessManagerWrapper<LongRunningProcess, NativeProcess> { + /** */ + private static final long serialVersionUID = 718119807915504045L; + + /** + * Constructs a new native process manager. + * + * @param igniteSupplier Ignite instance supplier. + * @param <T> Type of serializable supplier. 
+ */ + public <T extends Supplier<Ignite> & Serializable> NativeProcessManager(T igniteSupplier) { + super(new LongRunningProcessManager(igniteSupplier)); + } + + /** + * Constructs a new native process manager. + * + * @param delegate Delegate. + */ + public NativeProcessManager(ProcessManager<LongRunningProcess> delegate) { + super(delegate); + } + + /** {@inheritDoc} */ + @Override protected LongRunningProcess transformSpecification(NativeProcess spec) { + return new LongRunningProcess(spec.getNodeId(), new NativeProcessStartTask(spec)); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5ee876a6/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/nativerunning/package-info.java ---------------------------------------------------------------------- diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/nativerunning/package-info.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/nativerunning/package-info.java new file mode 100644 index 0000000..3e3fdce --- /dev/null +++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/nativerunning/package-info.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +/** + * <!-- Package description. --> + * The part of TensorFlow integration infrastructure that allows to start and maintain abstract native processes. As + * described in {@link org.apache.ignite.tensorflow.core.nativerunning.NativeProcess} user only needs to specify process + * builder, stdin and node identifier the process should be started on and Native Process Manager will make the rest so + * that the specified process will be executed and maintained on the specified node. + */ +package org.apache.ignite.tensorflow.core.nativerunning; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/5ee876a6/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/nativerunning/task/NativeProcessStartTask.java ---------------------------------------------------------------------- diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/nativerunning/task/NativeProcessStartTask.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/nativerunning/task/NativeProcessStartTask.java new file mode 100644 index 0000000..8fc28a5 --- /dev/null +++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/nativerunning/task/NativeProcessStartTask.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.tensorflow.core.nativerunning.task; + +import java.util.function.Supplier; +import org.apache.ignite.tensorflow.core.nativerunning.NativeProcess; +import java.io.IOException; +import java.io.InputStream; +import java.io.PrintStream; +import java.io.PrintWriter; +import java.util.Scanner; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import org.apache.ignite.lang.IgniteRunnable; +import org.apache.ignite.tensorflow.core.util.CustomizableThreadFactory; + +/** + * Task that starts native process by its specification. + */ +public class NativeProcessStartTask implements IgniteRunnable { + /** */ + private static final long serialVersionUID = 8421398298283116405L; + + /** Native process specification. */ + private final NativeProcess procSpec; + + /** + * Constructs a new instance of native process start task. + * + * @param procSpec Native process specification. 
+ */ + public NativeProcessStartTask(NativeProcess procSpec) { + assert procSpec != null : "Process specification should not be null"; + + this.procSpec = procSpec; + } + + /** {@inheritDoc} */ + @Override public void run() { + Supplier<ProcessBuilder> procBuilderSupplier = procSpec.getProcBuilderSupplier(); + ProcessBuilder procBuilder = procBuilderSupplier.get(); + + Process proc; + try { + proc = procBuilder.start(); + } + catch (IOException e) { + throw new RuntimeException(e); + } + + Thread shutdownHook = new Thread(proc::destroy); + Runtime.getRuntime().addShutdownHook(shutdownHook); + + Future<?> outForward = forwardStream(proc.getInputStream(), System.out); + Future<?> errForward = forwardStream(proc.getErrorStream(), System.err); + + try { + if (procSpec.getStdin() != null) { + PrintWriter writer = new PrintWriter(proc.getOutputStream()); + writer.println(procSpec.getStdin()); + writer.flush(); + } + + int status; + try { + status = proc.waitFor(); + } + catch (InterruptedException e) { + proc.destroy(); + status = proc.exitValue(); + } + + Runtime.getRuntime().removeShutdownHook(shutdownHook); + + if (status != 0) + throw new IllegalStateException("Native process exit status is " + status); + } + finally { + outForward.cancel(true); + errForward.cancel(true); + } + } + + /** + * Forwards stream. + * + * @param src Source stream. + * @param dst Destination stream. + * @return Future that allows to interrupt forwarding. 
+ */ + private Future<?> forwardStream(InputStream src, PrintStream dst) { + return Executors + .newSingleThreadExecutor(new CustomizableThreadFactory("NATIVE_PROCESS_FORWARD_STREAM", true)) + .submit(() -> { + Scanner scanner = new Scanner(src); + + while (!Thread.currentThread().isInterrupted() && scanner.hasNextLine()) + dst.println(scanner.nextLine()); + }); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5ee876a6/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/nativerunning/task/package-info.java ---------------------------------------------------------------------- diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/nativerunning/task/package-info.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/nativerunning/task/package-info.java new file mode 100644 index 0000000..4f8f8c5 --- /dev/null +++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/nativerunning/task/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * <!-- Package description. --> + * Contains native process task that encapsulates the logic of starting native process. 
+ */ +package org.apache.ignite.tensorflow.core.nativerunning.task; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/5ee876a6/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/package-info.java ---------------------------------------------------------------------- diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/package-info.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/package-info.java new file mode 100644 index 0000000..4e50015 --- /dev/null +++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/package-info.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * <!-- Package description. --> + * TensorFlow integration core package that provides infrastructure layers that allows TensorFlow cluster to start and + * be maintained. It provides layer hierarchy. The lowermost layer (long-running process layer) provides API to start + * and maintain abstract long-running processes. The second layer (native-running processes layer) is built on top of + * previous layer and allows to start and maintain native processes. 
/**
 * Python process specification: the text used as the process stdin together with the identifier of a node.
 * Instances are immutable.
 */
public class PythonProcess implements Serializable {
    /** */
    private static final long serialVersionUID = -1623536488451695210L;

    /** Stdin of the process. */
    private final String stdin;

    /** Node identifier. */
    private final UUID nodeId;

    /**
     * Constructs a new instance of python process.
     *
     * @param stdin Stdin of the process.
     * @param nodeId Node identifier, must not be {@code null} (checked with an {@code assert}).
     */
    public PythonProcess(String stdin, UUID nodeId) {
        assert nodeId != null : "Node identifier should not be null";

        this.nodeId = nodeId;
        this.stdin = stdin;
    }

    /**
     * Returns the stdin of the process.
     *
     * @return Stdin passed to the constructor.
     */
    public String getStdin() {
        return this.stdin;
    }

    /**
     * Returns the node identifier.
     *
     * @return Node identifier passed to the constructor.
     */
    public UUID getNodeId() {
        return this.nodeId;
    }
}
+ */ + +package org.apache.ignite.tensorflow.core.pythonrunning; + +import java.io.Serializable; +import java.util.function.Supplier; +import org.apache.ignite.Ignite; +import org.apache.ignite.tensorflow.core.ProcessManager; +import org.apache.ignite.tensorflow.core.ProcessManagerWrapper; +import org.apache.ignite.tensorflow.core.nativerunning.NativeProcess; +import org.apache.ignite.tensorflow.core.nativerunning.NativeProcessManager; + +/** + * Python process manager that allows to start, stop and make other actions with python processes. + */ +public class PythonProcessManager extends ProcessManagerWrapper<NativeProcess, PythonProcess> { + /** */ + private static final long serialVersionUID = -7095409565854538504L; + + /** + * Constructs a new instance of python process manager. + * + * @param igniteSupplier Ignite instance supplier. + * @param <T> Type of serializable supplier. + */ + public <T extends Supplier<Ignite> & Serializable> PythonProcessManager(T igniteSupplier) { + this(new NativeProcessManager(igniteSupplier)); + } + + /** + * Constructs a new instance of python process manager. + * + * @param delegate Delegate. + */ + public PythonProcessManager(ProcessManager<NativeProcess> delegate) { + super(delegate); + } + + /** {@inheritDoc} */ + @Override protected NativeProcess transformSpecification(PythonProcess spec) { + return new NativeProcess( + new PythonProcessBuilderSupplier(), + spec.getStdin(), + spec.getNodeId() + ); + } + + /** + * Python process builder supplier that is used to create Python process builder. + */ + private static class PythonProcessBuilderSupplier implements Supplier<ProcessBuilder>, Serializable { + /** */ + private static final long serialVersionUID = 8497087649461965914L; + + /** Python environment variable name. 
*/ + private static final String PYTHON_ENV_NAME = "PYTHON"; + + /** {@inheritDoc} */ + @Override public ProcessBuilder get() { + String python = System.getenv(PYTHON_ENV_NAME); + + if (python == null) + python = "python3"; + + return new ProcessBuilder(python, "-i"); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5ee876a6/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/pythonrunning/package-info.java ---------------------------------------------------------------------- diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/pythonrunning/package-info.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/pythonrunning/package-info.java new file mode 100644 index 0000000..541c047 --- /dev/null +++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/pythonrunning/package-info.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * <!-- Package description. --> + * The part of TensorFlow integration infrastructure that allows to start and maintain Python native processes. 
As + * described in {@link org.apache.ignite.tensorflow.core.pythonrunning.PythonProcess}, the user only needs to specify Python + * code and the identifier of the node on which the process should be started, and Python Process Manager will do the rest so that + * the given code is executed and maintained on the specified node. + */ +package org.apache.ignite.tensorflow.core.pythonrunning; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/5ee876a6/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/util/CustomizableThreadFactory.java ---------------------------------------------------------------------- diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/util/CustomizableThreadFactory.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/util/CustomizableThreadFactory.java new file mode 100644 index 0000000..2de671d --- /dev/null +++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/util/CustomizableThreadFactory.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.ignite.tensorflow.core.util; + +import java.util.concurrent.ThreadFactory; +import org.jetbrains.annotations.NotNull; + +/** + * Customizable thread factory that allows to specify thread name and daemon flag for the processes created by this + * factory. + */ +public class CustomizableThreadFactory implements ThreadFactory { + /** Thread name. */ + private final String threadName; + + /** Is daemon flag. */ + private final boolean isDaemon; + + /** + * Constructs a new instance of customizable thread factory. + * + * @param threadName Thread name. + * @param isDaemon Is daemon flag. + */ + public CustomizableThreadFactory(String threadName, boolean isDaemon) { + this.threadName = threadName; + this.isDaemon = isDaemon; + } + + /** {@inheritDoc} */ + @Override public Thread newThread(@NotNull Runnable r) { + Thread thread = new Thread(r); + + thread.setName(threadName); + thread.setDaemon(isDaemon); + + return thread; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5ee876a6/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/util/package-info.java ---------------------------------------------------------------------- diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/util/package-info.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/util/package-info.java new file mode 100644 index 0000000..7e3744f --- /dev/null +++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/util/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * <!-- Package description. --> + * Util classes used in other {@link org.apache.ignite.tensorflow.core} classes. + */ +package org.apache.ignite.tensorflow.core.util; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/5ee876a6/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/package-info.java ---------------------------------------------------------------------- diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/package-info.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/package-info.java new file mode 100644 index 0000000..c48553b --- /dev/null +++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/package-info.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +/** + * <!-- Package description. --> + * TensorFlow integration that allows starting and maintaining a TensorFlow cluster on top of Apache Ignite cluster + * infrastructure. The TensorFlow cluster is built for the specified cache the following way: + * <ol> + * <li>The TensorFlow cluster maintainer is created to maintain the cluster associated with the specified cache so + * that this service works reliably even if a node fails. It's achieved using Ignite Service Grid.</li> + * <li>TensorFlow cluster maintainer finds out that the cluster is not started and begins the starting procedure.</li> + * <li>TensorFlow cluster resolver builds cluster specification based on the specified cache so that every + * TensorFlow task is associated with a partition of the cache and assumed to be started on the node where the + * partition is kept.</li> + * <li>Based on the built cluster specification the set of tasks is sent to the nodes in order to start TensorFlow + * servers on the nodes defined in the specification.</li> + * <li>When this set of tasks is completed successfully the started process identifiers are returned and saved + * for future use.</li> + * <li>The starting procedure is completed.
In case a server fails the cluster will be turn down and then started + * again by maintainer.</li> + * </ol> + */ +package org.apache.ignite.tensorflow; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/5ee876a6/modules/tensorflow/src/test/java/org/apache/ignite/tensorflow/TensorFlowTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/tensorflow/src/test/java/org/apache/ignite/tensorflow/TensorFlowTestSuite.java b/modules/tensorflow/src/test/java/org/apache/ignite/tensorflow/TensorFlowTestSuite.java new file mode 100644 index 0000000..4ad5216 --- /dev/null +++ b/modules/tensorflow/src/test/java/org/apache/ignite/tensorflow/TensorFlowTestSuite.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.tensorflow; + +import org.apache.ignite.tensorflow.core.CoreTestSuite; +import org.junit.runner.RunWith; +import org.junit.runners.Suite; + +/** + * Test suite for all module tests. + */ +@RunWith(Suite.class) +@Suite.SuiteClasses({ + CoreTestSuite.class +}) +public class TensorFlowTestSuite { + // No-op. 
+} http://git-wip-us.apache.org/repos/asf/ignite/blob/5ee876a6/modules/tensorflow/src/test/java/org/apache/ignite/tensorflow/core/CoreTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/tensorflow/src/test/java/org/apache/ignite/tensorflow/core/CoreTestSuite.java b/modules/tensorflow/src/test/java/org/apache/ignite/tensorflow/core/CoreTestSuite.java new file mode 100644 index 0000000..92b6dbd --- /dev/null +++ b/modules/tensorflow/src/test/java/org/apache/ignite/tensorflow/core/CoreTestSuite.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.tensorflow.core; + +import org.apache.ignite.tensorflow.core.longrunning.LongRunningProcessManagerTest; +import org.apache.ignite.tensorflow.core.longrunning.task.LongRunningProcessClearTaskTest; +import org.apache.ignite.tensorflow.core.longrunning.task.LongRunningProcessPingTaskTest; +import org.apache.ignite.tensorflow.core.longrunning.task.LongRunningProcessStartTaskTest; +import org.apache.ignite.tensorflow.core.longrunning.task.LongRunningProcessStopTaskTest; +import org.junit.runner.RunWith; +import org.junit.runners.Suite; + +/** + * Test suite for all tests in core package. + */ +@RunWith(Suite.class) +@Suite.SuiteClasses({ + ProcessManagerWrapperTest.class, + LongRunningProcessClearTaskTest.class, + LongRunningProcessPingTaskTest.class, + LongRunningProcessStartTaskTest.class, + LongRunningProcessStopTaskTest.class, + LongRunningProcessManagerTest.class +}) +public class CoreTestSuite { + // No-op. +}