http://git-wip-us.apache.org/repos/asf/ignite/blob/9e884e5a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/util/TensorFlowUserScriptRunner.java
----------------------------------------------------------------------
diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/util/TensorFlowUserScriptRunner.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/util/TensorFlowUserScriptRunner.java
new file mode 100644
index 0000000..3dcd5f8
--- /dev/null
+++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/util/TensorFlowUserScriptRunner.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tensorflow.cluster.util;
+
+import java.io.BufferedOutputStream;
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Map;
+import java.util.StringJoiner;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Consumer;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
+import org.apache.commons.io.IOUtils;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.tensorflow.cluster.TensorFlowJobArchive;
+import org.apache.ignite.tensorflow.cluster.spec.TensorFlowClusterSpec;
+import org.apache.ignite.tensorflow.cluster.spec.TensorFlowServerAddressSpec;
+import org.apache.ignite.tensorflow.core.util.AsyncNativeProcessRunner;
+import org.apache.ignite.tensorflow.core.util.NativeProcessRunner;
+
+/**
+ * Utils class that helps to start and stop user script process.
+ *
+ * <p>The job archive is extracted into a temporary directory that becomes the working directory of the process; the
+ * directory is removed again by {@link #doAfter()}.
+ */
+public class TensorFlowUserScriptRunner extends AsyncNativeProcessRunner {
+    /** Ignite instance. */
+    private final Ignite ignite;
+
+    /** Ignite logger. */
+    private final IgniteLogger log;
+
+    /** Job archive that will be extracted and used as working directory for the native process. */
+    private final TensorFlowJobArchive jobArchive;
+
+    /** TensorFlow cluster specification. */
+    private final TensorFlowClusterSpec clusterSpec;
+
+    /** Output stream data consumer. */
+    private final Consumer<String> out;
+
+    /** Error stream data consumer. */
+    private final Consumer<String> err;
+
+    /** Working directory of the user script process. */
+    private File workingDir;
+
+    /**
+     * Constructs a new instance of TensorFlow user script runner.
+     *
+     * @param ignite Ignite instance.
+     * @param executor Executor to be used in {@link AsyncNativeProcessRunner}.
+     * @param jobArchive Job archive that will be extracted and used as working directory for the native process.
+     * @param clusterSpec TensorFlow cluster specification.
+     * @param out Output stream data consumer.
+     * @param err Error stream data consumer.
+     */
+    public TensorFlowUserScriptRunner(Ignite ignite, ExecutorService executor, TensorFlowJobArchive jobArchive,
+        TensorFlowClusterSpec clusterSpec, Consumer<String> out, Consumer<String> err) {
+        super(ignite, executor);
+
+        this.ignite = ignite;
+        this.log = ignite.log().getLogger(TensorFlowUserScriptRunner.class);
+
+        this.jobArchive = jobArchive;
+        this.clusterSpec = clusterSpec;
+        this.out = out;
+        this.err = err;
+    }
+
+    /** {@inheritDoc} */
+    @Override public NativeProcessRunner doBefore() {
+        try {
+            workingDir = Files.createTempDirectory("tf_us_").toFile();
+        }
+        catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+
+        log.debug("Directory has been created [path=" + workingDir.getAbsolutePath() + "]");
+
+        try {
+            unzip(jobArchive.getData(), workingDir);
+            log.debug("Job archive has been extracted [path=" + workingDir.getAbsolutePath() + "]");
+
+            return prepareNativeProcessRunner();
+        }
+        catch (RuntimeException e) {
+            // doAfter() is only scheduled once this method has returned, so the directory has to be removed here.
+            try {
+                delete(workingDir);
+            }
+            catch (RuntimeException cleanupErr) {
+                e.addSuppressed(cleanupErr);
+            }
+
+            workingDir = null;
+
+            throw e;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void doAfter() {
+        if (workingDir != null) {
+            delete(workingDir);
+            log.debug("Directory has been deleted [path=" + workingDir.getAbsolutePath() + "]");
+        }
+    }
+
+    /**
+     * Prepares native process runner and specifies working directory, command and environment of the process.
+     *
+     * @return Prepared native process runner.
+     */
+    private NativeProcessRunner prepareNativeProcessRunner() {
+        if (workingDir == null)
+            throw new IllegalStateException("Working directory is not created");
+
+        ProcessBuilder procBuilder = new ProcessBuilder();
+
+        procBuilder.directory(workingDir);
+        procBuilder.command(jobArchive.getCommands());
+
+        Map<String, String> env = procBuilder.environment();
+        env.put("PYTHONPATH", workingDir.getAbsolutePath());
+        env.put("TF_CONFIG", formatTfConfigVar());
+        env.put("TF_WORKERS", formatTfWorkersVar());
+        env.put("TF_CHIEF_SERVER", formatTfChiefServerVar());
+
+        return new NativeProcessRunner(procBuilder, null, out, err);
+    }
+
+    /**
+     * Formats "TF_CONFIG" variable to be passed into user script.
+     *
+     * @return Formatted "TF_CONFIG" variable to be passed into user script.
+     */
+    private String formatTfConfigVar() {
+        return new StringBuilder()
+            .append("{\"cluster\" : ")
+            .append(clusterSpec.format(ignite))
+            .append(", ")
+            .append("\"task\": {\"type\" : \"" + TensorFlowClusterResolver.CHIEF_JOB_NAME + "\", \"index\": 0}}")
+            .toString();
+    }
+
+    /**
+     * Formats "TF_WORKERS" variable to be passed into user script.
+     *
+     * @return Formatted "TF_WORKERS" variable to be passed into user script.
+     */
+    private String formatTfWorkersVar() {
+        List<TensorFlowServerAddressSpec> tasks = clusterSpec.getJobs().get(TensorFlowClusterResolver.WORKER_JOB_NAME);
+
+        // A specification without worker tasks yields an empty list instead of a NullPointerException.
+        int cnt = tasks == null ? 0 : tasks.size();
+
+        StringJoiner joiner = new StringJoiner(", ");
+
+        for (int i = 0; i < cnt; i++)
+            joiner.add("\"/job:" + TensorFlowClusterResolver.WORKER_JOB_NAME + "/task:" + i + "\"");
+
+        return "[" + joiner + "]";
+    }
+
+    /**
+     * Formats "TF_CHIEF_SERVER" variable to be passed into user script.
+     *
+     * @return Formatted "TF_CHIEF_SERVER" variable to be passed into user script.
+     */
+    private String formatTfChiefServerVar() {
+        List<TensorFlowServerAddressSpec> tasks = clusterSpec.getJobs().get(TensorFlowClusterResolver.CHIEF_JOB_NAME);
+
+        if (tasks == null || tasks.size() != 1)
+            throw new IllegalStateException("TensorFlow cluster specification should contain exactly one chief task");
+
+        TensorFlowServerAddressSpec addrSpec = tasks.iterator().next();
+
+        return "grpc://" + addrSpec.format(ignite);
+    }
+
+    /**
+     * Deletes given file or directory recursively.
+     *
+     * @param file File or directory to be deleted.
+     */
+    private void delete(File file) {
+        if (file.isDirectory()) {
+            String[] files = file.list();
+
+            if (files != null) {
+                for (String fileToBeDeleted : files)
+                    delete(new File(file, fileToBeDeleted));
+            }
+
+            if (!file.delete())
+                throw new IllegalStateException("Can't delete directory [path=" + file.getAbsolutePath() + "]");
+        }
+        else if (!file.delete())
+            throw new IllegalStateException("Can't delete file [path=" + file.getAbsolutePath() + "]");
+    }
+
+    /**
+     * Extracts specified zip archive into specified directory.
+     *
+     * @param data Zip archive to be extracted.
+     * @param extractTo Target directory.
+     */
+    private void unzip(byte[] data, File extractTo) {
+        Path root = extractTo.toPath().toAbsolutePath().normalize();
+
+        try (ZipInputStream zipStream = new ZipInputStream(new ByteArrayInputStream(data))) {
+            ZipEntry entry;
+            while ((entry = zipStream.getNextEntry()) != null) {
+                File file = resolveEntry(root, entry);
+
+                if (entry.isDirectory())
+                    createDirectory(file);
+                else {
+                    createDirectory(file.getParentFile());
+
+                    try (BufferedOutputStream fileOut = new BufferedOutputStream(new FileOutputStream(file))) {
+                        IOUtils.copy(zipStream, fileOut);
+                    }
+                }
+            }
+        }
+        catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Resolves the file a zip entry is extracted to and verifies that it is located inside the target directory.
+     *
+     * @param root Normalized absolute path of the target directory.
+     * @param entry Zip entry to be extracted.
+     * @return File the entry is extracted to.
+     */
+    private static File resolveEntry(Path root, ZipEntry entry) {
+        Path target = new File(root.toFile(), entry.getName()).toPath().toAbsolutePath().normalize();
+
+        // Entry names are taken from the archive as is, so names like "../x" must not escape the directory.
+        if (!target.startsWith(root)) {
+            throw new IllegalStateException("Zip entry is located outside of the target directory [entry=" +
+                entry.getName() + "]");
+        }
+
+        return target.toFile();
+    }
+
+    /**
+     * Creates given directory (including missing parents) unless it already exists.
+     *
+     * @param dir Directory to be created.
+     */
+    private static void createDirectory(File dir) {
+        if (!dir.isDirectory() && !dir.mkdirs())
+            throw new IllegalStateException("Can't create directory [path=" + dir.getAbsolutePath() + "]");
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/9e884e5a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/ProcessManager.java ---------------------------------------------------------------------- diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/ProcessManager.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/ProcessManager.java index 0ef81bc..c825448 100644 --- a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/ProcessManager.java +++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/ProcessManager.java @@ -17,18 +17,17 @@ package org.apache.ignite.tensorflow.core; -import org.apache.ignite.tensorflow.core.longrunning.task.util.LongRunningProcessStatus; -import java.io.Serializable; import java.util.List; import java.util.Map; import java.util.UUID; +import org.apache.ignite.tensorflow.core.longrunning.task.util.LongRunningProcessStatus; /** * Process manager that allows to run and maintain processes in the cluster. * * @param <R> Type of task to be run. */ -public interface ProcessManager<R> extends Serializable { +public interface ProcessManager<R> { /** * Starts the processes by the given specifications. 
* http://git-wip-us.apache.org/repos/asf/ignite/blob/9e884e5a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/ProcessManagerWrapper.java ---------------------------------------------------------------------- diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/ProcessManagerWrapper.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/ProcessManagerWrapper.java index b66b54f..4f10e83 100644 --- a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/ProcessManagerWrapper.java +++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/ProcessManagerWrapper.java @@ -17,11 +17,11 @@ package org.apache.ignite.tensorflow.core; -import org.apache.ignite.tensorflow.core.longrunning.task.util.LongRunningProcessStatus; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.UUID; +import org.apache.ignite.tensorflow.core.longrunning.task.util.LongRunningProcessStatus; /** * Process manager wrapper that allows to define how one type of process specification should be transformed into @@ -31,9 +31,6 @@ import java.util.UUID; * @param <R> Type of accepted process specifications. */ public abstract class ProcessManagerWrapper<T, R> implements ProcessManager<R> { - /** */ - private static final long serialVersionUID = -6397225095261457524L; - /** Delegate. 
*/ private final ProcessManager<T> delegate; http://git-wip-us.apache.org/repos/asf/ignite/blob/9e884e5a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/longrunning/LongRunningProcessManager.java ---------------------------------------------------------------------- diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/longrunning/LongRunningProcessManager.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/longrunning/LongRunningProcessManager.java index 027ece3..a25ff97 100644 --- a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/longrunning/LongRunningProcessManager.java +++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/longrunning/LongRunningProcessManager.java @@ -17,7 +17,6 @@ package org.apache.ignite.tensorflow.core.longrunning; -import java.io.Serializable; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -25,7 +24,6 @@ import java.util.Map; import java.util.UUID; import java.util.function.Consumer; import java.util.function.Function; -import java.util.function.Supplier; import org.apache.ignite.Ignite; import org.apache.ignite.cluster.ClusterGroup; import org.apache.ignite.cluster.ClusterGroupEmptyException; @@ -41,22 +39,18 @@ import org.apache.ignite.tensorflow.core.longrunning.task.util.LongRunningProces * Long running process manager that allows to start, stop and make other actions with long running processes. */ public class LongRunningProcessManager implements ProcessManager<LongRunningProcess> { - /** */ - private static final long serialVersionUID = 1151455641358063287L; - - /** Ignite instance supplier. */ - private final Supplier<Ignite> igniteSupplier; + /** Ignite instance. */ + private final Ignite ignite; /** * Constructs a new instance of long running process manager. * - * @param igniteSupplier Ignite instance supplier. - * @param <T> Type of serializable supplier. + * @param ignite Ignite instance. 
*/ - public <T extends Supplier<Ignite> & Serializable> LongRunningProcessManager(T igniteSupplier) { - assert igniteSupplier != null : "Ignite supplier should not be null"; + public LongRunningProcessManager(Ignite ignite) { + assert ignite != null : "Ignite instance should not be null"; - this.igniteSupplier = igniteSupplier; + this.ignite = ignite; } /** {@inheritDoc} */ @@ -100,7 +94,6 @@ public class LongRunningProcessManager implements ProcessManager<LongRunningProc List<T> nodeProcesses = params.get(nodeId); LongRunningProcessTask<List<E>> task = taskSupplier.apply(nodeProcesses); - Ignite ignite = igniteSupplier.get(); ClusterGroup clusterGrp = ignite.cluster().forNodeId(nodeId); try { http://git-wip-us.apache.org/repos/asf/ignite/blob/9e884e5a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/longrunning/task/LongRunningProcessStartTask.java ---------------------------------------------------------------------- diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/longrunning/task/LongRunningProcessStartTask.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/longrunning/task/LongRunningProcessStartTask.java index 1d08519..04f90d3 100644 --- a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/longrunning/task/LongRunningProcessStartTask.java +++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/longrunning/task/LongRunningProcessStartTask.java @@ -17,13 +17,13 @@ package org.apache.ignite.tensorflow.core.longrunning.task; -import org.apache.ignite.tensorflow.core.longrunning.LongRunningProcess; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import org.apache.ignite.tensorflow.core.longrunning.LongRunningProcess; import org.apache.ignite.tensorflow.core.util.CustomizableThreadFactory; /** @@ -78,7 +78,7 @@ public class 
LongRunningProcessStartTask extends LongRunningProcessTask<List<UUI */ private Future<?> runTask(Runnable task) { return Executors - .newSingleThreadExecutor(new CustomizableThreadFactory("LONG_RUNNING_PROCESS_TASK", true)) + .newSingleThreadExecutor(new CustomizableThreadFactory("tf-long-running", true)) .submit(task); } http://git-wip-us.apache.org/repos/asf/ignite/blob/9e884e5a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/nativerunning/NativeProcess.java ---------------------------------------------------------------------- diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/nativerunning/NativeProcess.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/nativerunning/NativeProcess.java index df36ba9..2ad3c9d 100644 --- a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/nativerunning/NativeProcess.java +++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/nativerunning/NativeProcess.java @@ -20,6 +20,7 @@ package org.apache.ignite.tensorflow.core.nativerunning; import java.io.Serializable; import java.util.UUID; import java.util.function.Supplier; +import org.apache.ignite.tensorflow.util.SerializableSupplier; /** * Native process specification. @@ -29,7 +30,7 @@ public class NativeProcess implements Serializable { private static final long serialVersionUID = -7056800139746134956L; /** Process builder supplier. */ - private final Supplier<ProcessBuilder> procBuilderSupplier; + private final SerializableSupplier<ProcessBuilder> procBuilderSupplier; /** Stdin of the process. */ private final String stdin; @@ -44,8 +45,7 @@ public class NativeProcess implements Serializable { * @param stdin Stdin of the process. * @param nodeId Node identifier. 
*/ - public <T extends Supplier<ProcessBuilder> & Serializable> NativeProcess(T procBuilderSupplier, String stdin, - UUID nodeId) { + public NativeProcess(SerializableSupplier<ProcessBuilder> procBuilderSupplier, String stdin, UUID nodeId) { assert procBuilderSupplier != null : "Process builder supplier should not be null"; assert nodeId != null : "Node identifier should not be null"; http://git-wip-us.apache.org/repos/asf/ignite/blob/9e884e5a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/nativerunning/NativeProcessManager.java ---------------------------------------------------------------------- diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/nativerunning/NativeProcessManager.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/nativerunning/NativeProcessManager.java index 60cd89b..5accf3d 100644 --- a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/nativerunning/NativeProcessManager.java +++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/nativerunning/NativeProcessManager.java @@ -17,8 +17,6 @@ package org.apache.ignite.tensorflow.core.nativerunning; -import java.io.Serializable; -import java.util.function.Supplier; import org.apache.ignite.Ignite; import org.apache.ignite.tensorflow.core.ProcessManager; import org.apache.ignite.tensorflow.core.ProcessManagerWrapper; @@ -30,17 +28,13 @@ import org.apache.ignite.tensorflow.core.nativerunning.task.NativeProcessStartTa * Native process manager that allows to start, stop and make other actions with native processes. */ public class NativeProcessManager extends ProcessManagerWrapper<LongRunningProcess, NativeProcess> { - /** */ - private static final long serialVersionUID = 718119807915504045L; - /** * Constructs a new native process manager. * - * @param igniteSupplier Ignite instance supplier. - * @param <T> Type of serializable supplier. + * @param ignite Ignite instance. 
*/ - public <T extends Supplier<Ignite> & Serializable> NativeProcessManager(T igniteSupplier) { - super(new LongRunningProcessManager(igniteSupplier)); + public NativeProcessManager(Ignite ignite) { + super(new LongRunningProcessManager(ignite)); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/9e884e5a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/nativerunning/task/NativeProcessStartTask.java ---------------------------------------------------------------------- diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/nativerunning/task/NativeProcessStartTask.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/nativerunning/task/NativeProcessStartTask.java index 8fc28a5..ae9e2b9 100644 --- a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/nativerunning/task/NativeProcessStartTask.java +++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/nativerunning/task/NativeProcessStartTask.java @@ -18,16 +18,11 @@ package org.apache.ignite.tensorflow.core.nativerunning.task; import java.util.function.Supplier; -import org.apache.ignite.tensorflow.core.nativerunning.NativeProcess; -import java.io.IOException; -import java.io.InputStream; -import java.io.PrintStream; -import java.io.PrintWriter; -import java.util.Scanner; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.Ignition; import org.apache.ignite.lang.IgniteRunnable; -import org.apache.ignite.tensorflow.core.util.CustomizableThreadFactory; +import org.apache.ignite.tensorflow.core.nativerunning.NativeProcess; +import org.apache.ignite.tensorflow.core.util.NativeProcessRunner; /** * Task that starts native process by its specification. 
@@ -55,62 +50,26 @@ public class NativeProcessStartTask implements IgniteRunnable { Supplier<ProcessBuilder> procBuilderSupplier = procSpec.getProcBuilderSupplier(); ProcessBuilder procBuilder = procBuilderSupplier.get(); - Process proc; - try { - proc = procBuilder.start(); - } - catch (IOException e) { - throw new RuntimeException(e); - } - - Thread shutdownHook = new Thread(proc::destroy); - Runtime.getRuntime().addShutdownHook(shutdownHook); + NativeProcessRunner procRunner = new NativeProcessRunner( + procBuilder, + procSpec.getStdin(), + System.out::println, + System.err::println + ); - Future<?> outForward = forwardStream(proc.getInputStream(), System.out); - Future<?> errForward = forwardStream(proc.getErrorStream(), System.err); + IgniteLogger log = Ignition.ignite().log().getLogger(NativeProcessStartTask.class); try { - if (procSpec.getStdin() != null) { - PrintWriter writer = new PrintWriter(proc.getOutputStream()); - writer.println(procSpec.getStdin()); - writer.flush(); - } - - int status; - try { - status = proc.waitFor(); - } - catch (InterruptedException e) { - proc.destroy(); - status = proc.exitValue(); - } - - Runtime.getRuntime().removeShutdownHook(shutdownHook); - - if (status != 0) - throw new IllegalStateException("Native process exit status is " + status); + log.debug("Starting native process"); + procRunner.startAndWait(); + log.debug("Native process completed"); } - finally { - outForward.cancel(true); - errForward.cancel(true); + catch (InterruptedException e) { + log.debug("Native process interrupted"); + } + catch (Exception e) { + log.error("Native process failed", e); + throw e; } - } - - /** - * Forwards stream. - * - * @param src Source stream. - * @param dst Destination stream. - * @return Future that allows to interrupt forwarding. 
- */ - private Future<?> forwardStream(InputStream src, PrintStream dst) { - return Executors - .newSingleThreadExecutor(new CustomizableThreadFactory("NATIVE_PROCESS_FORWARD_STREAM", true)) - .submit(() -> { - Scanner scanner = new Scanner(src); - - while (!Thread.currentThread().isInterrupted() && scanner.hasNextLine()) - dst.println(scanner.nextLine()); - }); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/9e884e5a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/pythonrunning/PythonProcessBuilderSupplier.java ---------------------------------------------------------------------- diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/pythonrunning/PythonProcessBuilderSupplier.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/pythonrunning/PythonProcessBuilderSupplier.java new file mode 100644 index 0000000..e59ab00 --- /dev/null +++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/pythonrunning/PythonProcessBuilderSupplier.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.tensorflow.core.pythonrunning; + +import org.apache.ignite.tensorflow.util.SerializableSupplier; + +/** + * Python process builder supplier that is used to create Python process builder. + */ +public class PythonProcessBuilderSupplier implements SerializableSupplier<ProcessBuilder> { + /** */ + private static final long serialVersionUID = 7181937306294456125L; + + /** Python environment variable name. */ + private static final String PYTHON_ENV_NAME = "PYTHON"; + + /** Interactive flag (allows standard input to be used to pass the Python script). */ + private final boolean interactive; + + /** + * Constructs a new instance of Python process builder supplier. + * + * @param interactive Interactive flag (allows standard input to be used to pass the Python script). + */ + public PythonProcessBuilderSupplier(boolean interactive) { + this.interactive = interactive; + } + + /** + * Returns process builder to be used to start Python process. + * + * @return Process builder to be used to start Python process. + */ + public ProcessBuilder get() { + String python = System.getenv(PYTHON_ENV_NAME); + + if (python == null) + python = "python3"; + + return interactive ?
new ProcessBuilder(python, "-i") : new ProcessBuilder(python); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/9e884e5a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/pythonrunning/PythonProcessManager.java ---------------------------------------------------------------------- diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/pythonrunning/PythonProcessManager.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/pythonrunning/PythonProcessManager.java index de35ff9..1f6c11e 100644 --- a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/pythonrunning/PythonProcessManager.java +++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/pythonrunning/PythonProcessManager.java @@ -17,8 +17,6 @@ package org.apache.ignite.tensorflow.core.pythonrunning; -import java.io.Serializable; -import java.util.function.Supplier; import org.apache.ignite.Ignite; import org.apache.ignite.tensorflow.core.ProcessManager; import org.apache.ignite.tensorflow.core.ProcessManagerWrapper; @@ -29,17 +27,13 @@ import org.apache.ignite.tensorflow.core.nativerunning.NativeProcessManager; * Python process manager that allows to start, stop and make other actions with python processes. */ public class PythonProcessManager extends ProcessManagerWrapper<NativeProcess, PythonProcess> { - /** */ - private static final long serialVersionUID = -7095409565854538504L; - /** * Constructs a new instance of python process manager. * - * @param igniteSupplier Ignite instance supplier. - * @param <T> Type of serializable supplier. + * @param ignite Ignite instance. 
*/ - public <T extends Supplier<Ignite> & Serializable> PythonProcessManager(T igniteSupplier) { - this(new NativeProcessManager(igniteSupplier)); + public PythonProcessManager(Ignite ignite) { + this(new NativeProcessManager(ignite)); } /** @@ -54,30 +48,9 @@ public class PythonProcessManager extends ProcessManagerWrapper<NativeProcess, P /** {@inheritDoc} */ @Override protected NativeProcess transformSpecification(PythonProcess spec) { return new NativeProcess( - new PythonProcessBuilderSupplier(), + new PythonProcessBuilderSupplier(true), spec.getStdin(), spec.getNodeId() ); } - - /** - * Python process builder supplier that is used to create Python process builder. - */ - private static class PythonProcessBuilderSupplier implements Supplier<ProcessBuilder>, Serializable { - /** */ - private static final long serialVersionUID = 8497087649461965914L; - - /** Python environment variable name. */ - private static final String PYTHON_ENV_NAME = "PYTHON"; - - /** {@inheritDoc} */ - @Override public ProcessBuilder get() { - String python = System.getenv(PYTHON_ENV_NAME); - - if (python == null) - python = "python3"; - - return new ProcessBuilder(python, "-i"); - } - } } http://git-wip-us.apache.org/repos/asf/ignite/blob/9e884e5a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/util/AsyncNativeProcessRunner.java ---------------------------------------------------------------------- diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/util/AsyncNativeProcessRunner.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/util/AsyncNativeProcessRunner.java new file mode 100644 index 0000000..b336b97 --- /dev/null +++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/util/AsyncNativeProcessRunner.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.tensorflow.core.util; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteLogger; + +/** + * Asynchronous native process runner. + */ +public abstract class AsyncNativeProcessRunner { + /** Ignite logger. */ + private final IgniteLogger log; + + /** Executor that is used to start async native process. */ + private final ExecutorService executor; + + /** Future of the async native process. */ + private Future<?> fut; + + /** + * Constructs a new asynchronous native process runner. + * + * @param ignite Ignite instance. + * @param executor Executor. + */ + public AsyncNativeProcessRunner(Ignite ignite, ExecutorService executor) { + this.log = ignite.log().getLogger(AsyncNativeProcessRunner.class); + this.executor = executor; + } + + /** + * Method that should be called before starting the process. + * + * @return Prepared native process runner. + */ + public abstract NativeProcessRunner doBefore(); + + /** + * Method that should be called after the process is finished. + */ + public abstract void doAfter(); + + /** + * Starts the process in a separate thread.
+ */ + public synchronized void start() { + if (fut != null) + throw new IllegalStateException("Async native process has already been started"); + + NativeProcessRunner procRunner = doBefore(); + + fut = executor.submit(() -> { + while (!Thread.currentThread().isInterrupted()) { + try { + log.debug("Starting native process"); + procRunner.startAndWait(); + log.debug("Native process completed"); + break; + } + catch (InterruptedException e) { + log.debug("Native process interrupted"); + break; + } + catch (Exception e) { + log.error("Native process failed", e); + } + } + + doAfter(); + }); + } + + /** + * Stops the process. + */ + public synchronized void stop() { + if (fut != null && !fut.isDone()) + fut.cancel(true); + } + + /** + * Checks if process is already completed. + * + * @return {@code true} if process completed, otherwise {@code false}. + */ + public boolean isCompleted() { + return fut != null && fut.isDone(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/9e884e5a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/util/NativeProcessRunner.java ---------------------------------------------------------------------- diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/util/NativeProcessRunner.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/util/NativeProcessRunner.java new file mode 100644 index 0000000..38af26d --- /dev/null +++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/util/NativeProcessRunner.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.tensorflow.core.util; + +import java.io.IOException; +import java.io.InputStream; +import java.io.PrintWriter; +import java.util.Scanner; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; + +/** + * Utils class that helps to start native processes. + */ +public class NativeProcessRunner { + /** Thread name to be used by threads that forward streams. */ + private static final String NATIVE_PROCESS_FORWARD_STREAM_THREAD_NAME = "tf-forward-native-output"; + + /** Process builder. */ + private final ProcessBuilder procBuilder; + + /** Standard input of the process. */ + private final String stdin; + + /** Output stream data consumer. */ + private final Consumer<String> out; + + /** Error stream data consumer. */ + private final Consumer<String> err; + + /** + * Constructs a new instance of native process runner. + * + * @param procBuilder Process builder. + * @param stdin Standard input of the process. + * @param out Output stream data consumer. + * @param err Error stream data consumer. + */ + public NativeProcessRunner(ProcessBuilder procBuilder, String stdin, Consumer<String> out, Consumer<String> err) { + this.procBuilder = procBuilder; + this.stdin = stdin; + this.out = out; + this.err = err; + } + + /** + * Starts the native process and waits it to be completed successfully or with exception. 
+ */ + public void startAndWait() throws InterruptedException { + Process proc; + try { + proc = procBuilder.start(); + } + catch (IOException e) { + throw new RuntimeException(e); + } + + AtomicBoolean shutdown = new AtomicBoolean(); + + Thread shutdownHook = new Thread(() -> { + shutdown.set(true); + proc.destroy(); + }); + + Runtime.getRuntime().addShutdownHook(shutdownHook); + + Future<?> outForward = forwardStream(proc.getInputStream(), out); + Future<?> errForward = forwardStream(proc.getErrorStream(), err); + + try { + if (stdin != null) { + PrintWriter writer = new PrintWriter(proc.getOutputStream()); + writer.println(stdin); + writer.flush(); + } + + int status; + try { + status = proc.waitFor(); + } + catch (InterruptedException e) { + proc.destroy(); + throw e; + } + + if (!shutdown.get()) { + Runtime.getRuntime().removeShutdownHook(shutdownHook); + + if (status != 0) + throw new IllegalStateException("Native process exit [status=" + status + "]"); + } + } + finally { + outForward.cancel(true); + errForward.cancel(true); + } + } + + /** + * Forwards stream. + * + * @param src Source stream. + * @param dst Destination stream. + * @return Future that allows to interrupt forwarding. 
+ */ + private Future<?> forwardStream(InputStream src, Consumer<String> dst) { + return Executors + .newSingleThreadExecutor(new CustomizableThreadFactory(NATIVE_PROCESS_FORWARD_STREAM_THREAD_NAME, true)) + .submit(() -> { + Scanner scanner = new Scanner(src); + + while (!Thread.currentThread().isInterrupted() && scanner.hasNextLine()) + dst.accept(scanner.nextLine()); + }); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/9e884e5a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/JobSubmitter.java ---------------------------------------------------------------------- diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/JobSubmitter.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/JobSubmitter.java new file mode 100644 index 0000000..0a7cae6 --- /dev/null +++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/JobSubmitter.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.tensorflow.submitter; + +import org.apache.ignite.tensorflow.submitter.command.RootCommand; +import picocli.CommandLine; + +/** + * Main class of the job submitter application that allows to submit TensorFlow jobs to be run within Ignite cluster. + */ +public class JobSubmitter { + /** + * Main method. + * + * @param args Arguments. + */ + public static void main(String... args) { + CommandLine.run(new RootCommand(), System.out, args); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/9e884e5a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/command/AbstractCommand.java ---------------------------------------------------------------------- diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/command/AbstractCommand.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/command/AbstractCommand.java new file mode 100644 index 0000000..4d2fc18 --- /dev/null +++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/command/AbstractCommand.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.tensorflow.submitter.command; + +import org.apache.ignite.Ignite; +import org.apache.ignite.Ignition; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.logger.slf4j.Slf4jLogger; +import picocli.CommandLine; + +/** + * Abstract command that contains options common for all commands. + */ +public abstract class AbstractCommand implements Runnable { + /** Ignite node configuration path. */ + @CommandLine.Option(names = { "-c", "--config" }, description = "Apache Ignite client configuration.") + protected String cfg; + + /** + * Returns Ignite instance based on configuration specified in {@link #cfg} field. + * + * @return Ignite instance. + */ + protected Ignite getIgnite() { + if (cfg != null) + return Ignition.start(cfg); + else { + IgniteConfiguration igniteCfg = new IgniteConfiguration(); + igniteCfg.setGridLogger(new Slf4jLogger()); + igniteCfg.setClientMode(true); + + return Ignition.start(igniteCfg); + } + } + + /** */ + public void setCfg(String cfg) { + this.cfg = cfg; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/9e884e5a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/command/AttachCommand.java ---------------------------------------------------------------------- diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/command/AttachCommand.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/command/AttachCommand.java new file mode 100644 index 0000000..946aa08 --- /dev/null +++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/command/AttachCommand.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.tensorflow.submitter.command; + +import java.util.UUID; +import org.apache.ignite.Ignite; +import org.apache.ignite.tensorflow.cluster.TensorFlowClusterGatewayManager; +import picocli.CommandLine; + +/** + * Command "attach" that is used to attach to running TensorFlow cluster and receive output of the user script. + */ +@CommandLine.Command( + name = "attach", + description = "Attaches to running TensorFlow cluster (user script process).", + mixinStandardHelpOptions = true +) +public class AttachCommand extends AbstractCommand { + /** TensorFlow cluster identifier. 
*/ + @CommandLine.Parameters(paramLabel = "CLUSTER_ID", description = "Cluster identifier.") + private UUID clusterId; + + /** {@inheritDoc} */ + @Override public void run() { + try (Ignite ignite = getIgnite()) { + TensorFlowClusterGatewayManager mgr = new TensorFlowClusterGatewayManager(ignite); + + mgr.listenToClusterUserScript(clusterId, System.out::println, System.err::println); + } + } + + /** */ + public void setClusterId(UUID clusterId) { + this.clusterId = clusterId; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/9e884e5a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/command/PsCommand.java ---------------------------------------------------------------------- diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/command/PsCommand.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/command/PsCommand.java new file mode 100644 index 0000000..0538496 --- /dev/null +++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/command/PsCommand.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.tensorflow.submitter.command; + +import java.util.Map; +import java.util.UUID; +import org.apache.ignite.Ignite; +import org.apache.ignite.tensorflow.cluster.TensorFlowCluster; +import org.apache.ignite.tensorflow.cluster.TensorFlowClusterManager; +import picocli.CommandLine; + +/** + * Command "ps" that is used to print identifiers of all running TensorFlow clusters. + */ +@CommandLine.Command( + name = "ps", + description = "Prints identifiers of all running TensorFlow clusters.", + mixinStandardHelpOptions = true +) +public class PsCommand extends AbstractCommand { + /** {@inheritDoc} */ + @Override public void run() { + try (Ignite ignite = getIgnite()) { + TensorFlowClusterManager mgr = new TensorFlowClusterManager(ignite); + + Map<UUID, TensorFlowCluster> clusters = mgr.getAllClusters(); + + for (UUID clusterId : clusters.keySet()) + System.out.println(clusterId); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/9e884e5a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/command/RootCommand.java ---------------------------------------------------------------------- diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/command/RootCommand.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/command/RootCommand.java new file mode 100644 index 0000000..508ea7b --- /dev/null +++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/command/RootCommand.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.tensorflow.submitter.command; + +import picocli.CommandLine; + +/** + * Root command that aggregates all sub commands. + */ +@CommandLine.Command( + name = "ignite-tf", + description = "Apache Ignite and TensorFlow integration command line tool that allows to start, maintain and" + + " stop distributed deep learning utilizing Apache Ignite infrastructure and data.", + subcommands = { + StartCommand.class, + StopCommand.class, + AttachCommand.class, + PsCommand.class + }, + mixinStandardHelpOptions = true +) +public class RootCommand extends AbstractCommand { + /** {@inheritDoc} */ + @Override public void run() { + CommandLine.usage(this, System.out); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/9e884e5a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/command/StartCommand.java ---------------------------------------------------------------------- diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/command/StartCommand.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/command/StartCommand.java new file mode 100644 index 0000000..082b363 --- /dev/null +++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/command/StartCommand.java @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.tensorflow.submitter.command; + +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.UUID; +import java.util.zip.ZipEntry; +import java.util.zip.ZipOutputStream; +import org.apache.commons.io.IOUtils; +import org.apache.ignite.Ignite; +import org.apache.ignite.tensorflow.cluster.TensorFlowClusterGatewayManager; +import org.apache.ignite.tensorflow.cluster.TensorFlowJobArchive; +import picocli.CommandLine; + +/** + * Command "start" that is used to start a new TensorFlow cluster on top of Apache Ignite. + */ +@CommandLine.Command( + name = "start", + description = "Starts a new TensorFlow cluster and attaches to user script process.", + mixinStandardHelpOptions = true +) +public class StartCommand extends AbstractCommand { + /** Upstream cache name. */ + @CommandLine.Parameters(index = "0", paramLabel = "CACHE_NAME", description = "Upstream cache name.") + private String cacheName; + + /** Job folder or archive. */ + @CommandLine.Parameters(index = "1", paramLabel = "JOB_DIR", description = "Job folder (or zip archive).") + private String jobFolder; + + /** Job command to be executed in cluster. 
*/ + @CommandLine.Parameters(index = "2", paramLabel = "JOB_CMD", description = "Job command.") + private String jobCmd; + + /** Arguments of a job command to be executed in cluster. */ + @CommandLine.Parameters(index = "3..*", paramLabel = "JOB_ARGS", description = "Job arguments.") + private String[] jobArguments; + + /** {@inheritDoc} */ + @Override public void run() { + try (Ignite ignite = getIgnite()) { + UUID clusterId = UUID.randomUUID(); + String[] commands = new String[jobArguments.length + 1]; + commands[0] = jobCmd; + System.arraycopy(jobArguments, 0, commands, 1, commands.length - 1); + + TensorFlowJobArchive jobArchive = new TensorFlowJobArchive( + cacheName, + zip(jobFolder), + commands + ); + + TensorFlowClusterGatewayManager mgr = new TensorFlowClusterGatewayManager(ignite); + mgr.createCluster(clusterId, jobArchive); + + mgr.listenToClusterUserScript(clusterId, System.out::println, System.err::println); + } + catch (IOException e) { + throw new RuntimeException(e); + } + } + + /** + * Archives specified folder or file into zip archive. + * + * @param jobArchivePath Path to folder to be archived. + * @return Byte array representing zip archive. + * @throws IOException In case of input/output exception. + */ + private byte[] zip(String jobArchivePath) throws IOException { + Path path = Paths.get(jobArchivePath); + File file = path.toFile(); + + if (!file.exists()) + throw new IllegalArgumentException("File doesn't exist [name=" + jobArchivePath + "]"); + + if (file.isDirectory()) + return zipDirectory(file); + else if (jobArchivePath.endsWith(".zip")) + return zipArchive(file); + else + return zipFile(file); + } + + /** + * Archives specified folder into zip archive. + * + * @param dir Directory to be archived. + * @return Byte array representing zip archive. + * @throws IOException In case of input/output exception. 
+ */ + private byte[] zipDirectory(File dir) throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + + try (ZipOutputStream zipFile = new ZipOutputStream(baos)) { + compressDirectoryToZip(dir.getAbsolutePath(), dir.getAbsolutePath(), zipFile); + } + + return baos.toByteArray(); + } + + /** + * Archives specified file into zip archive. + * + * @param file File to be archived. + * @return Byte array representing zip archive. + * @throws IOException In case of input/output exception. + */ + private byte[] zipFile(File file) throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + + try (ZipOutputStream zos = new ZipOutputStream(baos)) { + ZipEntry entry = new ZipEntry(file.getName()); + zos.putNextEntry(entry); + + try (FileInputStream in = new FileInputStream(file.getAbsolutePath())) { + IOUtils.copy(in, zos); + } + } + + return baos.toByteArray(); + } + + /** + * Reads zip archive into byte array and returns this array. + * + * @param file Archive to be read. + * @return Byte array representing zip archive. + * @throws IOException In case of input/output exception. + */ + private byte[] zipArchive(File file) throws IOException { + try (FileInputStream fis = new FileInputStream(file)) { + return IOUtils.toByteArray(fis); + } + } + + /** + * Archives specified folder into zip output stream. + * + * @param rootDir Root directory. + * @param srcDir Source directory. + * @param out Zip output stream. + * @throws IOException In case of input/output exception. 
+ */ + private void compressDirectoryToZip(String rootDir, String srcDir, ZipOutputStream out) throws IOException { + File[] files = new File(srcDir).listFiles(); + + if (files != null) { + for (File file : files) { + if (file.isDirectory()) + compressDirectoryToZip(rootDir, srcDir + File.separator + file.getName(), out); + else { + ZipEntry entry = new ZipEntry(srcDir.replace(rootDir, "") + + File.separator + file.getName()); + out.putNextEntry(entry); + + try (FileInputStream in = new FileInputStream(srcDir + File.separator + file.getName())) { + IOUtils.copy(in, out); + } + } + } + } + } + + /** */ + public void setCacheName(String cacheName) { + this.cacheName = cacheName; + } + + /** */ + public void setJobFolder(String jobFolder) { + this.jobFolder = jobFolder; + } + + /** */ + public void setJobCmd(String jobCmd) { + this.jobCmd = jobCmd; + } + + /** */ + public void setJobArguments(String[] jobArguments) { + this.jobArguments = jobArguments; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/9e884e5a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/command/StopCommand.java ---------------------------------------------------------------------- diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/command/StopCommand.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/command/StopCommand.java new file mode 100644 index 0000000..8890370 --- /dev/null +++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/command/StopCommand.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.tensorflow.submitter.command; + +import java.util.UUID; +import org.apache.ignite.Ignite; +import org.apache.ignite.tensorflow.cluster.TensorFlowClusterGatewayManager; +import picocli.CommandLine; + +/** + * Command "stop" that is used to stop TensorFlow cluster. + */ +@CommandLine.Command( + name = "stop", + description = "Stops a running TensorFlow cluster.", + mixinStandardHelpOptions = true +) +public class StopCommand extends AbstractCommand { + /** Cluster identifier. */ + @CommandLine.Parameters(paramLabel = "CLUSTER_ID", description = "Cluster identifier.") + private UUID clusterId; + + /** {@inheritDoc} */ + @Override public void run() { + try (Ignite ignite = getIgnite()) { + TensorFlowClusterGatewayManager mgr = new TensorFlowClusterGatewayManager(ignite); + mgr.stopClusterIfExists(clusterId); + } + } + + /** */ + public void setClusterId(UUID clusterId) { + this.clusterId = clusterId; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/9e884e5a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/command/package-info.java ---------------------------------------------------------------------- diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/command/package-info.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/command/package-info.java new file mode 100644 index 0000000..7949feb --- /dev/null +++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/command/package-info.java @@ -0,0 +1,22 @@ +/* + * 
Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * <!-- Package description. --> + * This package contains commands that command line tool provides. Pico CLI is used to make these commands maintainable. + */ +package org.apache.ignite.tensorflow.submitter.command; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/9e884e5a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/package-info.java ---------------------------------------------------------------------- diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/package-info.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/package-info.java new file mode 100644 index 0000000..8288b16 --- /dev/null +++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/package-info.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * <!-- Package description. --> + * This package contains classes that allow to use command line interface to submit jobs into TensorFlow in Apache + * Ignite infrastructure. + */ +package org.apache.ignite.tensorflow.submitter; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/9e884e5a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/util/SerializableConsumer.java ---------------------------------------------------------------------- diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/util/SerializableConsumer.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/util/SerializableConsumer.java new file mode 100644 index 0000000..ece58aa --- /dev/null +++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/util/SerializableConsumer.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
/**
 * Serializable consumer: a {@link Consumer} that is also {@link Serializable}.
 *
 * @param <T> The type of the input to the operation.
 */
@FunctionalInterface
public interface SerializableConsumer<T> extends Consumer<T>, Serializable {
}
/**
 * Serializable supplier: a {@link Supplier} that is also {@link Serializable}.
 *
 * @param <T> The type of results supplied by this supplier.
 */
@FunctionalInterface
public interface SerializableSupplier<T> extends Supplier<T>, Serializable {
}
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * <!-- Package description. --> + * Util classes used in {@link org.apache.ignite.tensorflow} package. + */ +package org.apache.ignite.tensorflow.util; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/9e884e5a/modules/tensorflow/src/main/sh/ignite-tf.sh ---------------------------------------------------------------------- diff --git a/modules/tensorflow/src/main/sh/ignite-tf.sh b/modules/tensorflow/src/main/sh/ignite-tf.sh new file mode 100755 index 0000000..fd3e02c --- /dev/null +++ b/modules/tensorflow/src/main/sh/ignite-tf.sh @@ -0,0 +1,19 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +SCRIPT_PATH="$( cd "$(dirname "$0")" ; pwd -P )" +java -Xmx4G -DIGNITE_QUIET=false -cp "$SCRIPT_PATH:$SCRIPT_PATH/lib/*" org.apache.ignite.tensorflow.submitter.JobSubmitter "$@" \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/9e884e5a/modules/tensorflow/src/main/sh/logback.xml ---------------------------------------------------------------------- diff --git a/modules/tensorflow/src/main/sh/logback.xml b/modules/tensorflow/src/main/sh/logback.xml new file mode 100644 index 0000000..816b5e6 --- /dev/null +++ b/modules/tensorflow/src/main/sh/logback.xml @@ -0,0 +1,36 @@ +<?xml version="1.0" encoding="UTF-8"?> + +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<!-- + Logback configuration file. 
+--> +<configuration> + + <appender name="FILE" class="ch.qos.logback.core.FileAppender"> + <file>ignite-tf.log</file> + <append>false</append> + <encoder> + <pattern>%d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n</pattern> + </encoder> + </appender> + + <root level="warn"> + <appender-ref ref="FILE" /> + </root> + +</configuration> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/9e884e5a/modules/tensorflow/src/test/java/org/apache/ignite/tensorflow/core/longrunning/LongRunningProcessManagerTest.java ---------------------------------------------------------------------- diff --git a/modules/tensorflow/src/test/java/org/apache/ignite/tensorflow/core/longrunning/LongRunningProcessManagerTest.java b/modules/tensorflow/src/test/java/org/apache/ignite/tensorflow/core/longrunning/LongRunningProcessManagerTest.java index 7d917e7..faa2b6b 100644 --- a/modules/tensorflow/src/test/java/org/apache/ignite/tensorflow/core/longrunning/LongRunningProcessManagerTest.java +++ b/modules/tensorflow/src/test/java/org/apache/ignite/tensorflow/core/longrunning/LongRunningProcessManagerTest.java @@ -17,13 +17,11 @@ package org.apache.ignite.tensorflow.core.longrunning; -import java.io.Serializable; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; -import java.util.function.Supplier; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCluster; import org.apache.ignite.IgniteCompute; @@ -67,7 +65,7 @@ public class LongRunningProcessManagerTest { List<LongRunningProcess> list = Collections.singletonList(new LongRunningProcess(nodeId, () -> {})); - LongRunningProcessManager mgr = new LongRunningProcessManager((Supplier<Ignite> & Serializable)() -> ignite); + LongRunningProcessManager mgr = new LongRunningProcessManager(ignite); Map<UUID, List<UUID>> res = mgr.start(list); assertEquals(1, res.size()); @@ -97,7 +95,7 @@ public class 
LongRunningProcessManagerTest { Map<UUID, List<UUID>> procIds = new HashMap<>(); procIds.put(nodeId, Collections.singletonList(procId)); - LongRunningProcessManager mgr = new LongRunningProcessManager((Supplier<Ignite> & Serializable)() -> ignite); + LongRunningProcessManager mgr = new LongRunningProcessManager(ignite); Map<UUID, List<LongRunningProcessStatus>> res = mgr.ping(procIds); assertEquals(1, res.size()); @@ -127,7 +125,7 @@ public class LongRunningProcessManagerTest { Map<UUID, List<UUID>> procIds = new HashMap<>(); procIds.put(nodeId, Collections.singletonList(procId)); - LongRunningProcessManager mgr = new LongRunningProcessManager((Supplier<Ignite> & Serializable)() -> ignite); + LongRunningProcessManager mgr = new LongRunningProcessManager(ignite); Map<UUID, List<LongRunningProcessStatus>> res = mgr.stop(procIds, true); assertEquals(1, res.size()); @@ -157,7 +155,7 @@ public class LongRunningProcessManagerTest { Map<UUID, List<UUID>> procIds = new HashMap<>(); procIds.put(nodeId, Collections.singletonList(procId)); - LongRunningProcessManager mgr = new LongRunningProcessManager((Supplier<Ignite> & Serializable)() -> ignite); + LongRunningProcessManager mgr = new LongRunningProcessManager(ignite); Map<UUID, List<LongRunningProcessStatus>> res = mgr.clear(procIds); assertEquals(1, res.size());