NIFI-145

Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/64657049
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/64657049
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/64657049

Branch: refs/heads/develop
Commit: 646570490c530d0c076c9bd9b7d1170946a9dae8
Parents: cb63c66
Author: Mark Payne <marka...@hotmail.com>
Authored: Tue Dec 9 12:18:35 2014 -0500
Committer: Mark Payne <marka...@hotmail.com>
Committed: Tue Dec 9 12:18:35 2014 -0500

----------------------------------------------------------------------
 .../java/org/apache/nifi/BootstrapListener.java | 229 ++++++++++++++++
 .../src/main/java/org/apache/nifi/NiFi.java     |  25 ++
 nifi-bootstrap/pom.xml                          |  17 +-
 .../apache/nifi/bootstrap/BootstrapCodec.java   |  89 ++++++
 .../org/apache/nifi/bootstrap/NiFiListener.java | 116 ++++++++
 .../java/org/apache/nifi/bootstrap/RunNiFi.java | 270 +++++++++++++++++--
 .../org/apache/nifi/bootstrap/ShutdownHook.java |  59 +++-
 .../exception/InvalidCommandException.java      |  37 +++
 8 files changed, 824 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/64657049/nar-bundles/framework-bundle/framework/runtime/src/main/java/org/apache/nifi/BootstrapListener.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/runtime/src/main/java/org/apache/nifi/BootstrapListener.java
 
b/nar-bundles/framework-bundle/framework/runtime/src/main/java/org/apache/nifi/BootstrapListener.java
new file mode 100644
index 0000000..3bcbeb3
--- /dev/null
+++ 
b/nar-bundles/framework-bundle/framework/runtime/src/main/java/org/apache/nifi/BootstrapListener.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BootstrapListener {
+       private static final Logger logger = 
LoggerFactory.getLogger(BootstrapListener.class);
+       
+       private final NiFi nifi;
+       private final int bootstrapPort;
+
+       private Listener listener;
+       private ServerSocket serverSocket;
+       
+       
+       public BootstrapListener(final NiFi nifi, final int port) {
+               this.nifi = nifi;
+               this.bootstrapPort = port;
+       }
+       
+       public void start() throws IOException {
+               logger.debug("Starting Bootstrap Listener to communicate with 
Bootstrap Port {}", bootstrapPort);
+               
+               serverSocket = new ServerSocket();
+               serverSocket.bind(new InetSocketAddress("localhost", 0));
+               
+               final int localPort = serverSocket.getLocalPort();
+               logger.info("Started Bootstrap Listener, Listening for incoming 
requests on port {}", localPort);
+               
+               listener = new Listener(serverSocket);
+               final Thread listenThread = new Thread(listener);
+               listenThread.setName("Listen to Bootstrap");
+               listenThread.start();
+               
+               logger.debug("Notifying Bootstrap that local port is {}", 
localPort);
+               try (final Socket socket = new Socket()) {
+                       socket.setSoTimeout(60000);
+                       socket.connect(new InetSocketAddress("localhost", 
bootstrapPort));
+                       socket.setSoTimeout(60000);
+                       
+                       final OutputStream out = socket.getOutputStream();
+                       out.write(("PORT " + localPort + 
"\n").getBytes(StandardCharsets.UTF_8));
+                       out.flush();
+                       
+                       logger.debug("Awaiting response from Bootstrap...");
+                       final BufferedReader reader = new BufferedReader(new 
InputStreamReader(socket.getInputStream()));
+                       final String response = reader.readLine();
+                       if ("OK".equals(response)) {
+                               logger.info("Successfully initiated 
communication with Bootstrap");
+                       } else {
+                               logger.error("Failed to communicate with 
Bootstrap. Bootstrap may be unable to issue or receive commands from NiFi");
+                       }
+               }
+       }
+       
+       
+       public void stop() {
+               if (listener != null) {
+                       listener.stop();
+               }
+       }
+       
+       private class Listener implements Runnable {
+               private final ServerSocket serverSocket;
+               private final ExecutorService executor;
+               private volatile boolean stopped = false;
+               
+               public Listener(final ServerSocket serverSocket) {
+                       this.serverSocket = serverSocket;
+                       this.executor = Executors.newFixedThreadPool(2);
+               }
+               
+               public void stop() {
+                       stopped = true;
+                       
+                       executor.shutdownNow();
+                       
+                       try {
+                               serverSocket.close();
+                       } catch (final IOException ioe) {
+                               // nothing to really do here. we could log 
this, but it would just become
+                               // confusing in the logs, as we're shutting 
down and there's no real benefit
+                       }
+               }
+               
+               @Override
+               public void run() {
+                       while (!serverSocket.isClosed()) {
+                               try {
+                                       if ( stopped ) {
+                                               return;
+                                       }
+                                       
+                                       final Socket socket;
+                                       try {
+                                               socket = serverSocket.accept();
+                                       } catch (final IOException ioe) {
+                                               if ( stopped ) {
+                                                       return;
+                                               }
+                                               
+                                               throw ioe;
+                                       }
+                                       
+                                       executor.submit(new Runnable() {
+                                               @Override
+                                               public void run() {
+                                                       try {
+                                                               final 
BootstrapRequest request = readRequest(socket.getInputStream());
+                                                               final 
BootstrapRequest.RequestType requestType = request.getRequestType();
+                                                               
+                                                               switch 
(requestType) {
+                                                                       case 
PING:
+                                                                               
logger.debug("Received PING request from Bootstrap; responding");
+                                                                               
echoPing(socket.getOutputStream());
+                                                                               
logger.debug("Responded to PING request from Bootstrap");
+                                                                               
break;
+                                                                       case 
SHUTDOWN:
+                                                                               
logger.info("Received SHUTDOWN request from Bootstrap");
+                                                                               
echoShutdown(socket.getOutputStream());
+                                                                               
nifi.shutdownHook();
+                                                                               
return;
+                                                               }
+                                                       } catch (final 
Throwable t) {
+                                                               
logger.error("Failed to process request from Bootstrap due to " + t.toString(), 
t);
+                                                       } finally {
+                                                               try {
+                                                                       
socket.close();
+                                                               } catch (final 
IOException ioe) {
+                                                                       
logger.warn("Failed to close socket to Bootstrap due to {}", ioe.toString());
+                                                               }
+                                                       }
+                                               }
+                                       });
+                               } catch (final Throwable t) {
+                                       logger.error("Failed to process request 
from Bootstrap due to " + t.toString(), t);
+                               }
+                       }
+               }
+       }
+       
+       
+       private void echoPing(final OutputStream out) throws IOException {
+               out.write("PING\n".getBytes(StandardCharsets.UTF_8));
+               out.flush();
+       }
+       
+       private void echoShutdown(final OutputStream out) throws IOException {
+               out.write("SHUTDOWN\n".getBytes(StandardCharsets.UTF_8));
+               out.flush();
+       }
+       
+       private BootstrapRequest readRequest(final InputStream in) throws 
IOException {
+               final BufferedReader reader = new BufferedReader(new 
InputStreamReader(in));
+               
+               final String line = reader.readLine();
+               final String[] splits = line.split(" ");
+               if ( splits.length < 0 ) {
+                       throw new IOException("Received invalid command from 
NiFi: " + line);
+               }
+               
+               final String requestType = splits[0];
+               final String[] args;
+               if ( splits.length == 1 ) {
+                       args = new String[0];
+               } else {
+                       args = Arrays.copyOfRange(splits, 1, splits.length);
+               }
+               
+               try {
+                       return new BootstrapRequest(requestType, args);
+               } catch (final Exception e) {
+                       throw new IOException("Received invalid request from 
bootstrap; request type = " + requestType);
+               }
+       }
+       
+       
+       private static class BootstrapRequest {
+               public static enum RequestType {
+                       SHUTDOWN,
+                       PING;
+               }
+               
+               private final RequestType requestType;
+               private final String[] args;
+               
+               public BootstrapRequest(final String request, final String[] 
args) {
+                       this.requestType = RequestType.valueOf(request);
+                       this.args = args;
+               }
+               
+               public RequestType getRequestType() {
+                       return requestType;
+               }
+               
+               public String[] getArgs() {
+                       return args;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/64657049/nar-bundles/framework-bundle/framework/runtime/src/main/java/org/apache/nifi/NiFi.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/runtime/src/main/java/org/apache/nifi/NiFi.java
 
b/nar-bundles/framework-bundle/framework/runtime/src/main/java/org/apache/nifi/NiFi.java
index 5fd1a13..bf50a21 100644
--- 
a/nar-bundles/framework-bundle/framework/runtime/src/main/java/org/apache/nifi/NiFi.java
+++ 
b/nar-bundles/framework-bundle/framework/runtime/src/main/java/org/apache/nifi/NiFi.java
@@ -45,6 +45,9 @@ public class NiFi {
 
     private static final Logger logger = LoggerFactory.getLogger(NiFi.class);
     private final NiFiServer nifiServer;
+    private final BootstrapListener bootstrapListener;
+    
+    public static final String BOOTSTRAP_PORT_PROPERTY = 
"nifi.bootstrap.listen.port";
 
     public NiFi(final NiFiProperties properties) throws 
ClassNotFoundException, IOException, NoSuchMethodException, 
InstantiationException, IllegalAccessException, IllegalArgumentException, 
InvocationTargetException {
         Thread.setDefaultUncaughtExceptionHandler(new 
UncaughtExceptionHandler() {
@@ -65,6 +68,25 @@ public class NiFi {
             }
         }));
 
+        final String bootstrapPort = 
System.getProperty(BOOTSTRAP_PORT_PROPERTY);
+        if ( bootstrapPort != null ) {
+               try {
+                       final int port = Integer.parseInt(bootstrapPort);
+                       
+                       if (port < 1 || port > 65535) {
+                               throw new RuntimeException("Failed to start 
NiFi because system property '" + BOOTSTRAP_PORT_PROPERTY + "' is not a valid 
integer in the range 1 - 65535");
+                       }
+                       
+                       bootstrapListener = new BootstrapListener(this, port);
+                       bootstrapListener.start();
+               } catch (final NumberFormatException nfe) {
+                       throw new RuntimeException("Failed to start NiFi 
because system property '" + BOOTSTRAP_PORT_PROPERTY + "' is not a valid 
integer in the range 1 - 65535");
+               }
+        } else {
+               logger.info("NiFi started without Bootstrap Port information 
provided; will not listen for requests from Bootstrap");
+               bootstrapListener = null;
+        }
+        
         // delete the web working dir - if the application does not start 
successfully
         // the web app directories might be in an invalid state. when this 
happens
         // jetty will not attempt to re-extract the war into the directory. by 
removing
@@ -115,6 +137,9 @@ public class NiFi {
             if (nifiServer != null) {
                 nifiServer.stop();
             }
+            if (bootstrapListener != null) {
+               bootstrapListener.stop();
+            }
             logger.info("Jetty web server shutdown completed (nicely or 
otherwise).");
         } catch (final Throwable t) {
             logger.warn("Problem occured ensuring Jetty web server was 
properly terminated due to " + t);

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/64657049/nifi-bootstrap/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-bootstrap/pom.xml b/nifi-bootstrap/pom.xml
index b620c84..a992018 100644
--- a/nifi-bootstrap/pom.xml
+++ b/nifi-bootstrap/pom.xml
@@ -1,5 +1,18 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
-       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+       <!--
+      Licensed to the Apache Software Foundation (ASF) under one or more
+      contributor license agreements.  See the NOTICE file distributed with
+      this work for additional information regarding copyright ownership.
+      The ASF licenses this file to You under the Apache License, Version 2.0
+      (the "License"); you may not use this file except in compliance with
+      the License.  You may obtain a copy of the License at
+          http://www.apache.org/licenses/LICENSE-2.0
+      Unless required by applicable law or agreed to in writing, software
+      distributed under the License is distributed on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+      See the License for the specific language governing permissions and
+      limitations under the License.
+    -->
        <modelVersion>4.0.0</modelVersion>
 
        <parent>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/64657049/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/BootstrapCodec.java
----------------------------------------------------------------------
diff --git 
a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/BootstrapCodec.java 
b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/BootstrapCodec.java
new file mode 100644
index 0000000..8138c02
--- /dev/null
+++ b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/BootstrapCodec.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.bootstrap;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.util.Arrays;
+
+import org.apache.nifi.bootstrap.exception.InvalidCommandException;
+
+public class BootstrapCodec {
+       private final RunNiFi runner;
+       private final BufferedReader reader;
+       private final BufferedWriter writer;
+       
+       public BootstrapCodec(final RunNiFi runner, final InputStream in, final 
OutputStream out) {
+               this.runner = runner;
+               this.reader = new BufferedReader(new InputStreamReader(in));
+               this.writer = new BufferedWriter(new OutputStreamWriter(out));
+       }
+       
+       public void communicate() throws IOException {
+               final String line = reader.readLine();
+               final String[] splits = line.split(" ");
+               if ( splits.length < 0 ) {
+                       throw new IOException("Received invalid command from 
NiFi: " + line);
+               }
+               
+               final String cmd = splits[0];
+               final String[] args;
+               if ( splits.length == 1 ) {
+                       args = new String[0];
+               } else {
+                       args = Arrays.copyOfRange(splits, 1, splits.length);
+               }
+               
+               try {
+                       processRequest(cmd, args);
+               } catch (final InvalidCommandException ice) {
+                       throw new IOException("Received invalid command from 
NiFi: " + line + " : " + ice.getMessage() == null ? "" : "Details: " + 
ice.toString());
+               }
+       }
+
+       private void processRequest(final String cmd, final String[] args) 
throws InvalidCommandException, IOException {
+               switch (cmd) {
+                       case "PORT": {
+                               if ( args.length != 1 ) {
+                                       throw new InvalidCommandException();
+                               }
+                               
+                               final int port;
+                               try {
+                                       port = Integer.parseInt( args[0] );
+                               } catch (final NumberFormatException nfe) {
+                                       throw new 
InvalidCommandException("Invalid Port number; should be integer between 1 and 
65535");
+                               }
+                               
+                               if ( port < 1 || port > 65535 ) {
+                                       throw new 
InvalidCommandException("Invalid Port number; should be integer between 1 and 
65535");
+                               }
+                               
+                               runner.setNiFiCommandControlPort(port);
+                               writer.write("OK");
+                               writer.newLine();
+                               writer.flush();
+                       }
+                       break;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/64657049/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/NiFiListener.java
----------------------------------------------------------------------
diff --git 
a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/NiFiListener.java 
b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/NiFiListener.java
new file mode 100644
index 0000000..c831351
--- /dev/null
+++ b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/NiFiListener.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.bootstrap;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+public class NiFiListener {
+       private ServerSocket serverSocket;
+       private volatile Listener listener;
+       
+       int start(final RunNiFi runner) throws IOException {
+               serverSocket = new ServerSocket();
+               serverSocket.bind(new InetSocketAddress("localhost", 0));
+               
+               final int localPort = serverSocket.getLocalPort();
+               listener = new Listener(serverSocket, runner);
+               final Thread listenThread = new Thread(listener);
+               listenThread.setName("Listen to NiFi");
+               listenThread.start();
+               return localPort;
+       }
+       
+       public void stop() throws IOException {
+               final Listener listener = this.listener;
+               if ( listener == null ) {
+                       return;
+               }
+               
+               listener.stop();
+       }
+       
+       private class Listener implements Runnable {
+               private final ServerSocket serverSocket;
+               private final ExecutorService executor;
+               private final RunNiFi runner;
+               private volatile boolean stopped = false;
+               
+               public Listener(final ServerSocket serverSocket, final RunNiFi 
runner) {
+                       this.serverSocket = serverSocket;
+                       this.executor = Executors.newFixedThreadPool(2);
+                       this.runner = runner;
+               }
+               
+               public void stop() throws IOException {
+                       stopped = true;
+                       
+                       executor.shutdown();
+                       try {
+                               executor.awaitTermination(3, TimeUnit.SECONDS);
+                       } catch (final InterruptedException ie) {
+                       }
+                       
+                       serverSocket.close();
+               }
+               
+               @Override
+               public void run() {
+                       while (!serverSocket.isClosed()) {
+                               try {
+                                       if ( stopped ) {
+                                               return;
+                                       }
+                                       
+                                       final Socket socket;
+                                       try {
+                                               socket = serverSocket.accept();
+                                       } catch (final IOException ioe) {
+                                               if ( stopped ) {
+                                                       return;
+                                               }
+                                               
+                                               throw ioe;
+                                       }
+                                       
+                                       
+                                       executor.submit(new Runnable() {
+                                               @Override
+                                               public void run() {
+                                                       try {
+                                                               final 
BootstrapCodec codec = new BootstrapCodec(runner, socket.getInputStream(), 
socket.getOutputStream());
+                                                               
codec.communicate();
+                                                               socket.close();
+                                                       } catch (final 
Throwable t) {
+                                                               
System.out.println("Failed to communicate with NiFi due to " + t);
+                                                               
t.printStackTrace();
+                                                       }
+                                               }
+                                       });
+                               } catch (final Throwable t) {
+                                       System.err.println("Failed to receive 
information from NiFi due to " + t);
+                                       t.printStackTrace();
+                               }
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/64657049/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java
----------------------------------------------------------------------
diff --git 
a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java 
b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java
index afa1f47..ea3e566 100644
--- a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java
+++ b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java
@@ -1,16 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.nifi.bootstrap;
 
+import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
 import java.io.FilenameFilter;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 
 /**
@@ -18,7 +47,6 @@ import java.util.Properties;
  * 
  * This class looks for the bootstrap.conf file by looking in the following 
places (in order):
  * <ol>
- *     <li>First argument to the program</li>
  *  <li>Java System Property named {@code 
org.apache.nifi.bootstrap.config.file}</li>
  *  <li>${NIFI_HOME}/./conf/bootstrap.conf, where ${NIFI_HOME} references an 
environment variable {@code NIFI_HOME}</li>
  *  <li>./conf/bootstrap.conf, where {@code .} represents the working 
directory.
@@ -29,12 +57,57 @@ import java.util.Properties;
 public class RunNiFi {
        public static final String DEFAULT_CONFIG_FILE = "./conf/boostrap.conf";
        public static final String DEFAULT_NIFI_PROPS_FILE = 
"./conf/nifi.properties";
+
+       public static final int MAX_RESTART_ATTEMPTS = 5;
+       public static final int STARTUP_WAIT_SECONDS = 60;
+       
+       public static final String SHUTDOWN_CMD = "SHUTDOWN";
+       public static final String PING_CMD = "PING";
+       
+       private volatile boolean autoRestartNiFi = true;
+       private volatile int ccPort = -1;
+       
+       private final Lock lock = new ReentrantLock();
+       private final Condition startupCondition = lock.newCondition();
+       
+       private final File bootstrapConfigFile;
+       
+       public RunNiFi(final File bootstrapConfigFile) {
+               this.bootstrapConfigFile = bootstrapConfigFile;
+       }
+       
+       private static void printUsage() {
+               System.out.println("Usage:");
+               System.out.println();
+               System.out.println("java org.apache.nifi.bootstrap.RunNiFi 
<command>");
+               System.out.println();
+               System.out.println("Valid commands include:");
+               System.out.println("");
+               System.out.println("Start : Start a new instance of Apache 
NiFi");
+               System.out.println("Stop : Stop a running instance of Apache 
NiFi");
+               System.out.println("Status : Determine if there is a running 
instance of Apache NiFi");
+               System.out.println();
+       }
        
-       @SuppressWarnings({ "rawtypes", "unchecked" })
        public static void main(final String[] args) throws IOException, 
InterruptedException {
-               final ProcessBuilder builder = new ProcessBuilder();
-
-               String configFilename = (args.length > 0) ? args[0] : 
System.getProperty("org.apache.nifi.boostrap.config.file");
+               if ( args.length != 1 ) {
+                       printUsage();
+                       return;
+               }
+               
+               switch (args[0].toLowerCase()) {
+                       case "start":
+                       case "stop":
+                       case "status":
+                               break;
+                       default:
+                               System.out.println("Invalid argument: " + 
args[0]);
+                               System.out.println();
+                               printUsage();
+                               return;
+               }
+               
+               String configFilename = 
System.getProperty("org.apache.nifi.boostrap.config.file");
                
                if ( configFilename == null ) {
                        final String nifiHome = System.getenv("NIFI_HOME");
@@ -50,12 +123,122 @@ public class RunNiFi {
                }
                
                final File configFile = new File(configFilename);
-               if ( !configFile.exists() ) {
+               
+               final RunNiFi runNiFi = new RunNiFi(configFile);
+               
+               switch (args[0].toLowerCase()) {
+                       case "start":
+                               runNiFi.start();
+                               break;
+                       case "stop":
+                               runNiFi.stop();
+                               break;
+                       case "status":
+                               runNiFi.status();
+                               break;
+               }
+       }
+       
+       
+       public File getStatusFile() {
+               final File rootDir = bootstrapConfigFile.getParentFile();
+               final File statusFile = new File(rootDir, "nifi.port");
+               return statusFile;
+       }
+
+       private Integer getCurrentPort() throws IOException {
+               try {
+                       final File statusFile = getStatusFile();
+                       final byte[] info = 
Files.readAllBytes(statusFile.toPath());
+                       final String text = new String(info);
+                       
+                       final int port = Integer.parseInt(text);
+                       
+                       try (final Socket socket = new Socket("localhost", 
port)) {
+                               final OutputStream out = 
socket.getOutputStream();
+                               out.write((PING_CMD + 
"\n").getBytes(StandardCharsets.UTF_8));
+                               out.flush();
+                               
+                               final InputStream in = socket.getInputStream();
+                               final BufferedReader reader = new 
BufferedReader(new InputStreamReader(in));
+                               final String response = reader.readLine();
+                               if ( response.equals(PING_CMD) ) {
+                                       return port;
+                               }
+                       } catch (final IOException ioe) {
+                               System.out.println("Found NiFi instance info at 
" + statusFile + " but information appears to be stale. Removing file.");
+                               if ( !statusFile.delete() ) {
+                                       System.err.println("Unable to remove 
status file");
+                               }
+                               
+                               throw ioe;
+                       }
+               } catch (final Exception e) {
+                       return null;
+               }
+               
+               return null;
+       }
+       
+       
+       public void status() throws IOException {
+               final Integer port = getCurrentPort();
+               if ( port == null ) {
+                       System.out.println("Apache NiFi does not appear to be 
running");
+               } else {
+                       System.out.println("Apache NiFi is currently running, 
listening on port " + port);
+               }
+               return;
+       }
+       
+       
+       public void stop() throws IOException {
+               final Integer port = getCurrentPort();
+               if ( port == null ) {
+                       System.out.println("Apache NiFi is not currently 
running");
+                       return;
+               }
+               
+               try (final Socket socket = new Socket()) {
+                       socket.setSoTimeout(60000);
+                       socket.connect(new InetSocketAddress("localhost", 
port));
+                       socket.setSoTimeout(60000);
+                       
+                       final OutputStream out = socket.getOutputStream();
+                       out.write((SHUTDOWN_CMD + 
"\n").getBytes(StandardCharsets.UTF_8));
+                       out.flush();
+                       
+                       final InputStream in = socket.getInputStream();
+                       final BufferedReader reader = new BufferedReader(new 
InputStreamReader(in));
+                       final String response = reader.readLine();
+                       if ( SHUTDOWN_CMD.equals(response) ) {
+                               System.out.println("Apache NiFi has accepted 
the Shutdown Command and is shutting down now");
+                       } else {
+                               System.err.println("When sending SHUTDOWN 
command to NiFi, got unexpected response " + response);
+                       }
+               } catch (final IOException ioe) {
+                       System.err.println("Failed to communicate with Apache 
NiFi");
+                       return;
+               }
+       }
+       
+       
+       @SuppressWarnings({ "rawtypes", "unchecked" })
+       public void start() throws IOException, InterruptedException {
+               final Integer port = getCurrentPort();
+               if ( port != null ) {
+                       System.out.println("Apache NiFi is already running, 
listening on port " + port);
+                       return;
+               }
+               
+               final ProcessBuilder builder = new ProcessBuilder();
+
+               if ( !bootstrapConfigFile.exists() ) {
                        throw new FileNotFoundException(DEFAULT_CONFIG_FILE);
                }
                
                final Properties properties = new Properties();
-               try (final FileInputStream fis = new 
FileInputStream(configFile)) {
+               try (final FileInputStream fis = new 
FileInputStream(bootstrapConfigFile)) {
                        properties.load(fis);
                }
                
@@ -136,32 +319,67 @@ public class RunNiFi {
                        javaCmd = "java";
                }
                
+               final NiFiListener listener = new NiFiListener();
+               final int listenPort = listener.start(this);
+               
                final List<String> cmd = new ArrayList<>();
                cmd.add(javaCmd);
                cmd.add("-classpath");
                cmd.add(classPath);
                cmd.addAll(javaAdditionalArgs);
                cmd.add("-Dnifi.properties.file.path=" + nifiPropsFilename);
+               cmd.add("-Dnifi.bootstrap.listen.port=" + listenPort);
                cmd.add("org.apache.nifi.NiFi");
                
-               builder.command(cmd).inheritIO();
+               builder.command(cmd);
                
                final StringBuilder cmdBuilder = new StringBuilder();
                for ( final String s : cmd ) {
                        cmdBuilder.append(s).append(" ");
                }
+
                System.out.println("Starting Apache NiFi...");
                System.out.println("Working Directory: " + 
workingDir.getAbsolutePath());
                System.out.println("Command: " + cmdBuilder.toString());
-
-               final Process proc = builder.start();
-               Runtime.getRuntime().addShutdownHook(new ShutdownHook(proc));
-               final int statusCode = proc.waitFor();
-               System.out.println("Apache NiFi exited with Status Code " + 
statusCode);
+               
+               builder.start();
+               boolean started = waitForStart();
+               
+               if ( started ) {
+                       System.out.println("Successfully started Apache NiFi");
+               } else {
+                       System.err.println("Apache NiFi does not appear to have 
started");
+               }
+               
+               listener.stop();
        }
        
        
-       private static File getFile(final String filename, final File 
workingDir) {
+       private boolean waitForStart() {
+               lock.lock();
+               try {
+                       final long startTime = System.nanoTime();
+                       
+                       while ( ccPort < 1 ) {
+                               try {
+                                       startupCondition.await(1, 
TimeUnit.SECONDS);
+                               } catch (final InterruptedException ie) {
+                                       return false;
+                               }
+                               
+                               final long waitNanos = System.nanoTime() - 
startTime;
+                               final long waitSeconds = 
TimeUnit.NANOSECONDS.toSeconds(waitNanos);
+                               if (waitSeconds > STARTUP_WAIT_SECONDS) {
+                                       return false;
+                               }
+                       }
+               } finally {
+                       lock.unlock();
+               }
+               return true;
+       }
+       
+       private File getFile(final String filename, final File workingDir) {
                File libDir = new File(filename);
                if ( !libDir.isAbsolute() ) {
                        libDir = new File(workingDir, filename);
@@ -170,7 +388,29 @@ public class RunNiFi {
                return libDir;
        }
        
-       private static String replaceNull(final String value, final String 
replacement) {
+       private String replaceNull(final String value, final String 
replacement) {
                return (value == null) ? replacement : value;
        }
+       
+       void setAutoRestartNiFi(final boolean restart) {
+               this.autoRestartNiFi = restart;
+       }
+       
+       void setNiFiCommandControlPort(final int port) {
+               this.ccPort = port;
+
+               final File statusFile = getStatusFile();
+               try (final FileOutputStream fos = new 
FileOutputStream(statusFile)) {
+                       
fos.write(String.valueOf(port).getBytes(StandardCharsets.UTF_8));
+                       fos.getFD().sync();
+               } catch (final IOException ioe) {
+                       System.err.println("Apache NiFi has started but failed 
to persist NiFi Port information to " + statusFile.getAbsolutePath() + " due to 
" + ioe);
+               }
+               
+               System.out.println("Apache NiFi now running and listening for 
requests on port " + port);
+       }
+       
+       int getNiFiCommandControlPort() {
+               return this.ccPort;
+       }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/64657049/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/ShutdownHook.java
----------------------------------------------------------------------
diff --git 
a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/ShutdownHook.java 
b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/ShutdownHook.java
index 55e1f45..142d984 100644
--- a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/ShutdownHook.java
+++ b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/ShutdownHook.java
@@ -1,14 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.nifi.bootstrap;
 
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.TimeUnit;
+
 public class ShutdownHook extends Thread {
        private final Process nifiProcess;
+       private final RunNiFi runner;
+       
+       public static final int WAIT_SECONDS = 10;
        
-       public ShutdownHook(final Process nifiProcess) {
+       public ShutdownHook(final Process nifiProcess, final RunNiFi runner) {
                this.nifiProcess = nifiProcess;
+               this.runner = runner;
        }
        
        @Override
        public void run() {
+               runner.setAutoRestartNiFi(false);
+               final int ccPort = runner.getNiFiCommandControlPort();
+               if ( ccPort > 0 ) {
+                       System.out.println("Initiating Shutdown of NiFi...");
+                       
+                       try {
+                               final Socket socket = new Socket("localhost", 
ccPort);
+                               final OutputStream out = 
socket.getOutputStream();
+                               
out.write("SHUTDOWN\n".getBytes(StandardCharsets.UTF_8));
+                               out.flush();
+                               
+                               socket.close();
+                       } catch (final IOException ioe) {
+                               System.out.println("Failed to Shutdown NiFi due 
to " + ioe);
+                       }
+               }
+               
+               try {
+                       nifiProcess.waitFor(WAIT_SECONDS, TimeUnit.SECONDS);
+               } catch (final InterruptedException ie) {
+               }
+
+               if ( nifiProcess.isAlive() ) {
+                       System.out.println("NiFi has not finished shutting down 
after " + WAIT_SECONDS + " seconds. Killing process.");
+               }
                nifiProcess.destroy();
+               
+               final File statusFile = runner.getStatusFile();
+               if ( !statusFile.delete() ) {
+                       System.err.println("Failed to delete status file " + 
statusFile.getAbsolutePath());
+               }
        }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/64657049/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/exception/InvalidCommandException.java
----------------------------------------------------------------------
diff --git 
a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/exception/InvalidCommandException.java
 
b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/exception/InvalidCommandException.java
new file mode 100644
index 0000000..962aa1c
--- /dev/null
+++ 
b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/exception/InvalidCommandException.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.bootstrap.exception;
+
+public class InvalidCommandException extends Exception {
+       private static final long serialVersionUID = 1L;
+
+       public InvalidCommandException() {
+               super();
+       }
+       
+       public InvalidCommandException(final String message) {
+               super(message);
+       }
+       
+       public InvalidCommandException(final Throwable t) {
+               super(t);
+       }
+       
+       public InvalidCommandException(final String message, final Throwable t) 
{
+               super(message, t);
+       }
+}

Reply via email to