NIFI-4683: Add ability to execute Spark jobs via Livy

Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/2192138b
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/2192138b
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/2192138b

Branch: refs/heads/master
Commit: 2192138b06eade6ceb6b2475580f005f4b75ee69
Parents: d543cfd
Author: Matthew Burgess <[email protected]>
Authored: Wed Dec 13 21:43:35 2017 -0500
Committer: joewitt <[email protected]>
Committed: Thu Dec 14 16:23:41 2017 -0500

----------------------------------------------------------------------
 nifi-assembly/pom.xml                           |  12 +
 .../pom.xml                                     |  39 ++
 .../src/main/resources/META-INF/LICENSE         | 209 +++++++
 .../src/main/resources/META-INF/NOTICE          |  47 ++
 .../nifi-livy-controller-service-api/pom.xml    |  33 ++
 .../controller/api/livy/LivySessionService.java |  34 ++
 .../nifi-livy-controller-service/pom.xml        |  63 +++
 .../controller/livy/LivySessionController.java  | 554 +++++++++++++++++++
 ...org.apache.nifi.controller.ControllerService |  16 +
 .../nifi-spark-bundle/nifi-livy-nar/pom.xml     |  48 ++
 .../src/main/resources/META-INF/LICENSE         | 209 +++++++
 .../src/main/resources/META-INF/NOTICE          |  55 ++
 .../nifi-livy-processors/pom.xml                |  81 +++
 .../livy/ExecuteSparkInteractive.java           | 304 ++++++++++
 .../org.apache.nifi.processor.Processor         |  16 +
 .../livy/ExecuteSparkInteractiveTestBase.java   |  96 ++++
 .../livy/TestExecuteSparkInteractive.java       | 102 ++++
 .../livy/TestExecuteSparkInteractiveSSL.java    | 134 +++++
 .../apache/nifi/processors/livy/TestServer.java | 167 ++++++
 .../src/test/resources/localhost-ks.jks         | Bin 0 -> 3512 bytes
 .../src/test/resources/localhost-ts.jks         | Bin 0 -> 1816 bytes
 nifi-nar-bundles/nifi-spark-bundle/pom.xml      |  85 +++
 nifi-nar-bundles/pom.xml                        |   1 +
 pom.xml                                         |  14 +-
 24 files changed, 2318 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/2192138b/nifi-assembly/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml
index 49e562d..8ad3930 100755
--- a/nifi-assembly/pom.xml
+++ b/nifi-assembly/pom.xml
@@ -541,6 +541,18 @@ language governing permissions and limitations under the License. -->
             <artifactId>nifi-metrics-reporting-nar</artifactId>
             <type>nar</type>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-livy-controller-service-api-nar</artifactId>
+            <version>1.5.0-SNAPSHOT</version>
+            <type>nar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-livy-nar</artifactId>
+            <version>1.5.0-SNAPSHOT</version>
+            <type>nar</type>
+        </dependency>
     </dependencies>
     <profiles>
         <profile>

http://git-wip-us.apache.org/repos/asf/nifi/blob/2192138b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-controller-service-api-nar/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-controller-service-api-nar/pom.xml b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-controller-service-api-nar/pom.xml
new file mode 100644
index 0000000..db652d5
--- /dev/null
+++ b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-controller-service-api-nar/pom.xml
@@ -0,0 +1,39 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+       <modelVersion>4.0.0</modelVersion>
+       <parent>
+               <groupId>org.apache.nifi</groupId>
+               <artifactId>nifi-spark-bundle</artifactId>
+               <version>1.5.0-SNAPSHOT</version>
+       </parent>
+
+       <artifactId>nifi-livy-controller-service-api-nar</artifactId>
+       <packaging>nar</packaging> <!-- this module is built as a NAR archive -->
+
+       <dependencies>
+               <dependency> <!-- NAR-type dependency on the standard services API bundle; version is not declared here -->
+                       <groupId>org.apache.nifi</groupId>
+                       <artifactId>nifi-standard-services-api-nar</artifactId>
+                       <type>nar</type>
+               </dependency>
+               <dependency> <!-- the API jar that declares LivySessionService -->
+                       <groupId>org.apache.nifi</groupId>
+                       <artifactId>nifi-livy-controller-service-api</artifactId>
+                       <version>1.5.0-SNAPSHOT</version>
+               </dependency>
+       </dependencies>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/2192138b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-controller-service-api-nar/src/main/resources/META-INF/LICENSE
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-controller-service-api-nar/src/main/resources/META-INF/LICENSE b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-controller-service-api-nar/src/main/resources/META-INF/LICENSE
new file mode 100644
index 0000000..f3c8ece
--- /dev/null
+++ b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-controller-service-api-nar/src/main/resources/META-INF/LICENSE
@@ -0,0 +1,209 @@
+
+                                 Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "[]"
+      replaced with your own identifying information. (Don't include
+      the brackets!)  The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright [yyyy] [name of copyright owner]
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+
+APACHE NIFI SUBCOMPONENTS:
+
+The Apache NiFi project contains subcomponents with separate copyright
+notices and license terms. Your use of the source code for these
+subcomponents is subject to the terms and conditions of the following
+licenses.

http://git-wip-us.apache.org/repos/asf/nifi/blob/2192138b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-controller-service-api-nar/src/main/resources/META-INF/NOTICE
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-controller-service-api-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-controller-service-api-nar/src/main/resources/META-INF/NOTICE
new file mode 100644
index 0000000..0b2cb0a
--- /dev/null
+++ b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-controller-service-api-nar/src/main/resources/META-INF/NOTICE
@@ -0,0 +1,47 @@
+nifi-livy-controller-service-api-nar
+Copyright 2014-2017 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+Copyright 2014-2017 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+******************
+Apache Software License v2
+******************
+
+The following binary components are provided under the Apache Software License v2
+
+  (ASLv2) Apache Commons Lang
+    The following NOTICE information applies:
+      Apache Commons Lang
+      Copyright 2001-2015 The Apache Software Foundation
+
+      This product includes software from the Spring Framework,
+      under the Apache License 2.0 (see: StringUtils.containsWhitespace())
+
+  (ASLv2) Apache Commons Codec
+    The following NOTICE information applies:
+      Apache Commons Codec
+      Copyright 2002-2014 The Apache Software Foundation
+
+      src/test/org/apache/commons/codec/language/DoubleMetaphoneTest.java
+      contains test data from http://aspell.net/test/orig/batch0.tab.
+      Copyright (C) 2002 Kevin Atkinson ([email protected])
+
+      ===============================================================================
+
+      The content of package org.apache.commons.codec.language.bm has been translated
+      from the original php source code available at http://stevemorse.org/phoneticinfo.htm
+      with permission from the original authors.
+      Original source copyright:
+      Copyright (c) 2008 Alexander Beider & Stephen P. Morse.
+
+  (ASLv2) Apache Commons IO
+    The following NOTICE information applies:
+      Apache Commons IO
+      Copyright 2002-2016 The Apache Software Foundation
+

http://git-wip-us.apache.org/repos/asf/nifi/blob/2192138b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-controller-service-api/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-controller-service-api/pom.xml b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-controller-service-api/pom.xml
new file mode 100644
index 0000000..11d23e0
--- /dev/null
+++ b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-controller-service-api/pom.xml
@@ -0,0 +1,33 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-spark-bundle</artifactId>
+        <version>1.5.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-livy-controller-service-api</artifactId> <!-- no packaging element, so Maven's default (jar) applies -->
+
+    <dependencies>
+        <dependency> <!-- provided scope: on the compile classpath, expected to be supplied at runtime -->
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/2192138b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-controller-service-api/src/main/java/org/apache/nifi/controller/api/livy/LivySessionService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-controller-service-api/src/main/java/org/apache/nifi/controller/api/livy/LivySessionService.java b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-controller-service-api/src/main/java/org/apache/nifi/controller/api/livy/LivySessionService.java
new file mode 100644
index 0000000..7627aa4
--- /dev/null
+++ b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-controller-service-api/src/main/java/org/apache/nifi/controller/api/livy/LivySessionService.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.api.livy;
+
+import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.util.Map;
+
+import org.apache.nifi.controller.ControllerService;
+
+public interface LivySessionService extends ControllerService { // controller-service API exposing a session lookup and HTTP connection creation
+    String APPLICATION_JSON = "application/json"; // JSON media type string
+    String USER = "nifi"; // NOTE(review): presumably the user name presented to Livy - confirm in implementations
+    String GET = "GET"; // HTTP method name
+    String POST = "POST"; // HTTP method name
+
+    Map<String, String> getSession(); // returns session details as string pairs; the keys are not defined here - TODO confirm against the implementation
+
+    HttpURLConnection getConnection(String urlString) throws IOException; // returns a connection for the given URL string; declared to throw IOException
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/2192138b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-controller-service/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-controller-service/pom.xml b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-controller-service/pom.xml
new file mode 100644
index 0000000..0875e70
--- /dev/null
+++ b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-controller-service/pom.xml
@@ -0,0 +1,63 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-spark-bundle</artifactId>
+        <version>1.5.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-livy-controller-service</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency> <!-- provided scope: on the compile classpath, expected to be supplied at runtime -->
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-processor-utils</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-ssl-context-service-api</artifactId>
+        </dependency>
+        <dependency> <!-- the API jar that declares LivySessionService -->
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-livy-controller-service-api</artifactId>
+            <version>1.5.0-SNAPSHOT</version>
+        </dependency>
+        <dependency> <!-- version not declared here; NOTE(review): presumably managed by a parent POM - confirm -->
+            <groupId>org.codehaus.jettison</groupId>
+            <artifactId>jettison</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.codehaus.jackson</groupId>
+            <artifactId>jackson-core-asl</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.codehaus.jackson</groupId>
+            <artifactId>jackson-mapper-asl</artifactId>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/2192138b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-controller-service/src/main/java/org/apache/nifi/controller/livy/LivySessionController.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-controller-service/src/main/java/org/apache/nifi/controller/livy/LivySessionController.java b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-controller-service/src/main/java/org/apache/nifi/controller/livy/LivySessionController.java
new file mode 100644
index 0000000..bc20c01
--- /dev/null
+++ b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-controller-service/src/main/java/org/apache/nifi/controller/livy/LivySessionController.java
@@ -0,0 +1,554 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.livy;
+
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.net.ConnectException;
+import java.net.HttpURLConnection;
+import java.net.SocketTimeoutException;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.security.KeyManagementException;
+import java.security.KeyStore;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.UnrecoverableKeyException;
+import java.security.cert.CertificateException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.ControllerServiceInitializationContext;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.ssl.SSLContextService;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+
+import org.apache.nifi.controller.api.livy.LivySessionService;
+
+import javax.net.ssl.HttpsURLConnection;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLSocketFactory;
+import javax.net.ssl.TrustManagerFactory;
+
+@Tags({"Livy", "REST", "Spark", "http"})
+@CapabilityDescription("Manages pool of Spark sessions over HTTP")
+public class LivySessionController extends AbstractControllerService 
implements LivySessionService {
+
+    /** Hostname or IP address of the Livy server; joined with {@link #LIVY_PORT} to build the base URL. */
+    public static final PropertyDescriptor LIVY_HOST = new PropertyDescriptor.Builder()
+            .name("livy-cs-livy-host")
+            .displayName("Livy Host")
+            .description("The hostname (or IP address) of the Livy server.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .expressionLanguageSupported(true)
+            .build();
+
+    /** TCP port of the Livy server. Validated as a port number rather than merely non-empty. */
+    public static final PropertyDescriptor LIVY_PORT = new PropertyDescriptor.Builder()
+            .name("livy-cs-livy-port")
+            .displayName("Livy Port")
+            .description("The port number for the Livy server.")
+            .required(true)
+            .addValidator(StandardValidators.PORT_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .defaultValue("8998")
+            .build();
+
+    /** Target number of sessions the manager thread tries to keep open. */
+    public static final PropertyDescriptor SESSION_POOL_SIZE = new PropertyDescriptor.Builder()
+            .name("livy-cs-session-pool-size")
+            .displayName("Session Pool Size")
+            .description("Number of sessions to keep open")
+            .required(true)
+            .defaultValue("2")
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .build();
+
+    /** Session kind sent to Livy in the "kind" field when a session is created. */
+    public static final PropertyDescriptor SESSION_TYPE = new PropertyDescriptor.Builder()
+            .name("livy-cs-session-kind")
+            .displayName("Session Type")
+            .description("The type of Spark session to start (one of spark, pyspark, pyspark3 or sparkr)")
+            .required(true)
+            .allowableValues("spark", "pyspark", "pyspark3", "sparkr")
+            .defaultValue("spark")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    /** Pause between two consecutive runs of the session manager loop. */
+    public static final PropertyDescriptor SESSION_MGR_STATUS_INTERVAL = new PropertyDescriptor.Builder()
+            .name("livy-cs-session-manager-status-interval")
+            .displayName("Session Manager Status Interval")
+            .description("The amount of time to wait between requesting session information updates.")
+            .required(true)
+            .defaultValue("2 sec")
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .build();
+
+    /** Comma-separated list of JARs added to the "jars" field of the session creation request. */
+    public static final PropertyDescriptor JARS = new PropertyDescriptor.Builder()
+            .name("livy-cs-session-jars")
+            .displayName("Session JARs")
+            .description("JARs to be used in the Spark session.")
+            .required(false)
+            .addValidator(StandardValidators.createListValidator(true, true, StandardValidators.FILE_EXISTS_VALIDATOR))
+            .expressionLanguageSupported(true)
+            .build();
+
+    /**
+     * Comma-separated list of files added to the "files" field of the session creation request.
+     * Expression language is declared as supported because the value is read through
+     * evaluateAttributeExpressions() when the service is enabled.
+     */
+    public static final PropertyDescriptor FILES = new PropertyDescriptor.Builder()
+            .name("livy-cs-session-files")
+            .displayName("Session Files")
+            .description("Files to be used in the Spark session.")
+            .required(false)
+            .addValidator(StandardValidators.createListValidator(true, true, StandardValidators.FILE_EXISTS_VALIDATOR))
+            .expressionLanguageSupported(true)
+            .build();
+
+    /** Optional SSL context service; when set, the Livy URL is built with the https scheme. */
+    public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
+            .name("SSL Context Service")
+            .description("The SSL Context Service used to provide client certificate information for TLS/SSL (https) connections.")
+            .required(false)
+            .identifiesControllerService(SSLContextService.class)
+            .build();
+
+    /** Connect timeout applied to every HTTP(S) connection opened by this service. */
+    public static final PropertyDescriptor CONNECT_TIMEOUT = new PropertyDescriptor.Builder()
+            .name("Connection Timeout")
+            .description("Max wait time for connection to remote service.")
+            .required(true)
+            .defaultValue("5 secs")
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .build();
+
+    // Configuration captured when the service is enabled; volatile because it is
+    // written by the enabling thread and read by the session manager thread.
+    private volatile String livyUrl;
+    private volatile int sessionPoolSize;
+    private volatile String controllerKind;
+    private volatile String jars;
+    private volatile String files;
+    // Pool of known sessions keyed by Livy session id. The reference is never reassigned,
+    // so it is final; the map itself handles concurrent access.
+    private final Map<Integer, JSONObject> sessions = new ConcurrentHashMap<>();
+    private volatile SSLContextService sslContextService;
+    private volatile SSLContext sslContext;
+    private volatile int connectTimeout;
+    private volatile Thread livySessionManagerThread = null;
+    // Loop flag for the session manager thread; cleared on disable or interrupt.
+    private volatile boolean enabled = true;
+
+    private List<PropertyDescriptor> properties;
+
+    /**
+     * Builds the read-only, ordered list of properties this controller service supports.
+     *
+     * @param config initialization context supplied by the framework (not used here)
+     */
+    @Override
+    protected void init(ControllerServiceInitializationContext config) {
+        properties = Collections.unmodifiableList(Arrays.asList(
+                LIVY_HOST,
+                LIVY_PORT,
+                SESSION_POOL_SIZE,
+                SESSION_TYPE,
+                SESSION_MGR_STATUS_INTERVAL,
+                SSL_CONTEXT_SERVICE,
+                CONNECT_TIMEOUT,
+                JARS,
+                FILES));
+    }
+
+    /**
+     * @return the unmodifiable property list assembled in {@code init}
+     */
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return this.properties;
+    }
+
+    /**
+     * Captures the configured property values and starts the background thread that
+     * periodically reconciles the local session pool with the Livy server.
+     *
+     * A failed management cycle (I/O error or unexpected HTTP status) is logged and retried
+     * after the configured interval instead of terminating the thread, so a temporary Livy
+     * outage does not permanently stop session management.
+     *
+     * @param context configuration context providing the property values
+     */
+    @OnEnabled
+    public void onConfigured(final ConfigurationContext context) {
+        final ComponentLog log = getLogger();
+        log.info("********** Starting Livy Session Controller Service...");
+
+        final String livyHost = context.getProperty(LIVY_HOST).evaluateAttributeExpressions().getValue();
+        final String livyPort = context.getProperty(LIVY_PORT).evaluateAttributeExpressions().getValue();
+        final String sessionPoolSize = context.getProperty(SESSION_POOL_SIZE).evaluateAttributeExpressions().getValue();
+        final String sessionKind = context.getProperty(SESSION_TYPE).getValue();
+        final long sessionManagerStatusInterval = context.getProperty(SESSION_MGR_STATUS_INTERVAL).asTimePeriod(TimeUnit.MILLISECONDS);
+        final String jars = context.getProperty(JARS).evaluateAttributeExpressions().getValue();
+        final String files = context.getProperty(FILES).evaluateAttributeExpressions().getValue();
+        sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+        sslContext = sslContextService == null ? null : sslContextService.createSSLContext(SSLContextService.ClientAuth.NONE);
+        connectTimeout = Math.toIntExact(context.getProperty(CONNECT_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS));
+
+        // The scheme follows the presence of an SSL context service.
+        this.livyUrl = "http" + (sslContextService != null ? "s" : "") + "://" + livyHost + ":" + livyPort;
+        this.controllerKind = sessionKind;
+        this.jars = jars;
+        this.files = files;
+        this.sessionPoolSize = Integer.parseInt(sessionPoolSize);
+        this.enabled = true;
+
+        livySessionManagerThread = new Thread(() -> {
+            while (enabled) {
+                try {
+                    try {
+                        manageSessions();
+                    } catch (IOException | RuntimeException e) {
+                        // Nobody observes an exception thrown out of this thread; log it and
+                        // fall through to the sleep so the next cycle can try again.
+                        log.error("Livy Session Manager failed to update the session pool, will retry", e);
+                    }
+                    Thread.sleep(sessionManagerStatusInterval);
+                } catch (InterruptedException e) {
+                    log.debug("********** " + Thread.currentThread().getName()
+                            + " run() Interrupt Status: " + Thread.currentThread().isInterrupted());
+                    enabled = false;
+                    // Restore the flag cleared by the InterruptedException.
+                    Thread.currentThread().interrupt();
+                }
+            }
+        });
+        livySessionManagerThread.setName("Livy-Session-Manager-" + controllerKind);
+        livySessionManagerThread.start();
+    }
+
+    /**
+     * Stops the session manager thread and waits for it to finish.
+     * Does nothing beyond clearing the run flag if the thread was never started.
+     */
+    @OnDisabled
+    public void shutdown() {
+        final ComponentLog log = getLogger();
+        log.info("********** Stopping Livy Session Controller Service...");
+        enabled = false;
+
+        final Thread managerThread = livySessionManagerThread;
+        if (managerThread == null) {
+            return;
+        }
+        try {
+            // Interrupt wakes the thread from its sleep between management cycles.
+            managerThread.interrupt();
+            managerThread.join();
+        } catch (InterruptedException e) {
+            log.error("Livy Session Manager Thread interrupted");
+            // Preserve the interrupt for the caller.
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    public Map<String, String> getSession() {
+        Map<String, String> sessionMap = new HashMap<>();
+        try {
+            final Map<Integer, JSONObject> sessionsCopy = sessions;
+            for (int sessionId : sessionsCopy.keySet()) {
+                JSONObject currentSession = sessions.get(sessionId);
+                String state = currentSession.getString("state");
+                String sessionKind = currentSession.getString("kind");
+                if (state.equalsIgnoreCase("idle") && 
sessionKind.equalsIgnoreCase(controllerKind)) {
+                    sessionMap.put("sessionId", String.valueOf(sessionId));
+                    sessionMap.put("livyUrl", livyUrl);
+                }
+            }
+        } catch (JSONException e) {
+            getLogger().error("Unexpected data found when looking for JSON 
object with 'state' and 'kind' fields", e);
+        }
+        return sessionMap;
+    }
+
+    /**
+     * Opens a connection to the given URL with the configured connect timeout and, when an
+     * SSL context service is configured, installs its socket factory on the connection.
+     *
+     * @param urlString absolute URL to connect to
+     * @return the prepared, not yet connected, HTTP(S) connection
+     * @throws IOException if the connection cannot be opened or the SSL material cannot be loaded
+     */
+    public HttpURLConnection getConnection(String urlString) throws IOException {
+        final HttpURLConnection conn = (HttpURLConnection) new URL(urlString).openConnection();
+        conn.setConnectTimeout(connectTimeout);
+        if (sslContextService == null) {
+            return conn;
+        }
+        try {
+            setSslSocketFactory((HttpsURLConnection) conn, sslContextService, sslContext);
+        } catch (KeyStoreException | CertificateException | NoSuchAlgorithmException | UnrecoverableKeyException | KeyManagementException e) {
+            // Surface every key/trust store problem as an I/O failure.
+            throw new IOException(e);
+        }
+        return conn;
+    }
+
+    private void manageSessions() throws InterruptedException, IOException {
+        int idleSessions = 0;
+        JSONObject newSessionInfo;
+        Map<Integer, JSONObject> sessionsInfo;
+        ComponentLog log = getLogger();
+
+        try {
+            sessionsInfo = listSessions();
+            if (sessions.isEmpty()) {
+                log.debug("********** manageSessions() the active session list 
is empty, populating from acquired list...");
+                sessions.putAll(sessionsInfo);
+            }
+            for (Integer sessionId : new ArrayList<>(sessions.keySet())) {
+                JSONObject currentSession = sessions.get(sessionId);
+                log.debug("********** manageSessions() Updating current 
session: " + currentSession);
+                if (sessionsInfo.containsKey(sessionId)) {
+                    String state = currentSession.getString("state");
+                    String sessionKind = currentSession.getString("kind");
+                    log.debug("********** manageSessions() controller kind: 
{}, session kind: {}, session state: {}",
+                            new Object[]{controllerKind, sessionKind, state});
+                    if (state.equalsIgnoreCase("idle") && 
sessionKind.equalsIgnoreCase(controllerKind)) {
+                        // Keep track of how many sessions are in an idle 
state and thus available
+                        idleSessions++;
+                        sessions.put(sessionId, sessionsInfo.get(sessionId));
+                        // Remove session from session list source of truth 
snapshot since it has been dealt with
+                        sessionsInfo.remove(sessionId);
+                    } else if ((state.equalsIgnoreCase("busy") || 
state.equalsIgnoreCase("starting")) && 
sessionKind.equalsIgnoreCase(controllerKind)) {
+                        // Update status of existing sessions
+                        sessions.put(sessionId, sessionsInfo.get(sessionId));
+                        // Remove session from session list source of truth 
snapshot since it has been dealt with
+                        sessionsInfo.remove(sessionId);
+                    } else {
+                        // Prune sessions of kind != controllerKind and whose 
state is:
+                        // not_started, shutting_down, error, dead, success 
(successfully stopped)
+                        sessions.remove(sessionId);
+                        //Remove session from session list source of truth 
snapshot since it has been dealt with
+                        sessionsInfo.remove(sessionId);
+                    }
+                } else {
+                    // Prune sessions that no longer exist
+                    log.debug("********** manageSessions() session exists in 
session pool but not in source snapshot, removing from pool...");
+                    sessions.remove(sessionId);
+                    // Remove session from session list source of truth 
snapshot since it has been dealt with
+                    sessionsInfo.remove(sessionId);
+                }
+            }
+            int numSessions = sessions.size();
+            log.debug("********** manageSessions() There are " + numSessions + 
" sessions in the pool");
+            // Open new sessions equal to the number requested by 
sessionPoolSize
+            if (numSessions == 0) {
+                for (int i = 0; i < sessionPoolSize; i++) {
+                    newSessionInfo = openSession();
+                    sessions.put(newSessionInfo.getInt("id"), newSessionInfo);
+                    log.debug("********** manageSessions() Registered new 
session: " + newSessionInfo);
+                }
+            } else {
+                // Open one new session if there are no idle sessions
+                if (idleSessions == 0) {
+                    log.debug("********** manageSessions() There are " + 
numSessions + " sessions in the pool but none of them are idle sessions, 
creating...");
+                    newSessionInfo = openSession();
+                    sessions.put(newSessionInfo.getInt("id"), newSessionInfo);
+                    log.debug("********** manageSessions() Registered new 
session: " + newSessionInfo);
+                }
+                // Open more sessions if number of sessions is less than 
target pool size
+                if (numSessions < sessionPoolSize) {
+                    log.debug("********** manageSessions() There are " + 
numSessions + ", need more sessions to equal requested pool size of " + 
sessionPoolSize + ", creating...");
+                    for (int i = 0; i < sessionPoolSize - numSessions; i++) {
+                        newSessionInfo = openSession();
+                        sessions.put(newSessionInfo.getInt("id"), 
newSessionInfo);
+                        log.debug("********** manageSessions() Registered new 
session: " + newSessionInfo);
+                    }
+                }
+            }
+        } catch (ConnectException | SocketTimeoutException ce) {
+            log.error("Timeout connecting to Livy service to retrieve 
sessions", ce);
+        } catch (JSONException e) {
+            throw new IOException(e);
+        }
+    }
+
+    private Map<Integer, JSONObject> listSessions() throws IOException {
+        String sessionsUrl = livyUrl + "/sessions";
+        int numSessions;
+        JSONObject sessionsInfo;
+        Map<Integer, JSONObject> sessionsMap = new HashMap<>();
+        Map<String, String> headers = new HashMap<>();
+        headers.put("Content-Type", APPLICATION_JSON);
+        headers.put("X-Requested-By", USER);
+        try {
+            sessionsInfo = readJSONFromUrl(sessionsUrl, headers);
+            numSessions = sessionsInfo.getJSONArray("sessions").length();
+            for (int i = 0; i < numSessions; i++) {
+                int currentSessionId = 
sessionsInfo.getJSONArray("sessions").getJSONObject(i).getInt("id");
+                JSONObject currentSession = 
sessionsInfo.getJSONArray("sessions").getJSONObject(i);
+                sessionsMap.put(currentSessionId, currentSession);
+            }
+        } catch (JSONException e) {
+            throw new IOException(e);
+        }
+
+        return sessionsMap;
+    }
+
+    private JSONObject getSessionInfo(int sessionId) throws IOException {
+        String sessionUrl = livyUrl + "/sessions/" + sessionId;
+        JSONObject sessionInfo;
+        Map<String, String> headers = new HashMap<>();
+        headers.put("Content-Type", APPLICATION_JSON);
+        headers.put("X-Requested-By", USER);
+        try {
+            sessionInfo = readJSONFromUrl(sessionUrl, headers);
+        } catch (JSONException e) {
+            throw new IOException(e);
+        }
+
+        return sessionInfo;
+    }
+
+    /**
+     * Creates a new session by POSTing to "/sessions" and polls it once per second until
+     * its state is no longer "starting".
+     *
+     * The request body carries the configured kind and, when set, the configured JARs and
+     * files as JSON arrays (comma-separated values, blank entries dropped, entries trimmed).
+     *
+     * @return the session JSON as last reported by Livy
+     * @throws IOException on I/O failure
+     * @throws JSONException if a response lacks the expected fields
+     * @throws InterruptedException if interrupted while waiting between polls
+     */
+    private JSONObject openSession() throws IOException, JSONException, InterruptedException {
+        final ComponentLog log = getLogger();
+        final ObjectMapper mapper = new ObjectMapper();
+
+        final StringBuilder body = new StringBuilder("{\"kind\":\"").append(controllerKind).append("\"");
+        if (jars != null) {
+            body.append(",\"jars\":").append(mapper.writeValueAsString(splitCommaSeparated(jars)));
+        }
+        if (files != null) {
+            body.append(",\"files\":").append(mapper.writeValueAsString(splitCommaSeparated(files)));
+        }
+        body.append("}");
+        final String payload = body.toString();
+        log.debug("********** openSession() Session Payload: " + payload);
+
+        final Map<String, String> requestHeaders = new HashMap<>();
+        requestHeaders.put("Content-Type", APPLICATION_JSON);
+        requestHeaders.put("X-Requested-By", USER);
+
+        JSONObject sessionInfo = readJSONObjectFromUrlPOST(livyUrl + "/sessions", requestHeaders, payload);
+        Thread.sleep(1000);
+        while (sessionInfo.getString("state").equalsIgnoreCase("starting")) {
+            log.debug("********** openSession() Waiting for session to start...");
+            sessionInfo = getSessionInfo(sessionInfo.getInt("id"));
+            log.debug("********** openSession() newSessionInfo: " + sessionInfo);
+            Thread.sleep(1000);
+        }
+        return sessionInfo;
+    }
+
+    /** Splits a comma-separated value into its non-blank, trimmed entries. */
+    private static List<String> splitCommaSeparated(final String csv) {
+        return Arrays.stream(csv.split(","))
+                .filter(StringUtils::isNotBlank)
+                .map(String::trim)
+                .collect(Collectors.toList());
+    }
+
+    private JSONObject readJSONObjectFromUrlPOST(String urlString, Map<String, 
String> headers, String payload) throws IOException, JSONException {
+        URL url = new URL(urlString);
+        HttpURLConnection connection = getConnection(urlString);
+
+        connection.setRequestMethod(POST);
+        connection.setDoOutput(true);
+
+        for (Map.Entry<String, String> entry : headers.entrySet()) {
+            connection.setRequestProperty(entry.getKey(), entry.getValue());
+        }
+
+        OutputStream os = connection.getOutputStream();
+        os.write(payload.getBytes());
+        os.flush();
+
+        if (connection.getResponseCode() != HttpURLConnection.HTTP_OK && 
connection.getResponseCode() != HttpURLConnection.HTTP_CREATED) {
+            throw new RuntimeException("Failed : HTTP error code : " + 
connection.getResponseCode() + " : " + connection.getResponseMessage());
+        }
+
+        InputStream content = connection.getInputStream();
+        return readAllIntoJSONObject(content);
+    }
+
+    private JSONArray readJSONArrayFromUrlPOST(String urlString, Map<String, 
String> headers, String payload) throws IOException, JSONException {
+        URL url = new URL(urlString);
+        HttpURLConnection connection = getConnection(urlString);
+        connection.setRequestMethod(POST);
+        connection.setDoOutput(true);
+
+        for (Map.Entry<String, String> entry : headers.entrySet()) {
+            connection.setRequestProperty(entry.getKey(), entry.getValue());
+        }
+
+        OutputStream os = connection.getOutputStream();
+        os.write(payload.getBytes());
+        os.flush();
+
+        if (connection.getResponseCode() != HttpURLConnection.HTTP_OK && 
connection.getResponseCode() != HttpURLConnection.HTTP_CREATED) {
+            throw new RuntimeException("Failed : HTTP error code : " + 
connection.getResponseCode() + " : " + connection.getResponseMessage());
+        }
+
+        InputStream content = connection.getInputStream();
+        return readAllIntoJSONArray(content);
+    }
+
+    private JSONObject readJSONFromUrl(String urlString, Map<String, String> 
headers) throws IOException, JSONException {
+
+        HttpURLConnection connection = getConnection(urlString);
+        for (Map.Entry<String, String> entry : headers.entrySet()) {
+            connection.setRequestProperty(entry.getKey(), entry.getValue());
+        }
+        connection.setRequestMethod(GET);
+        connection.setDoOutput(true);
+        InputStream content = connection.getInputStream();
+        return readAllIntoJSONObject(content);
+    }
+
+    private JSONArray readJSONArrayFromUrl(String urlString, Map<String, 
String> headers) throws IOException, JSONException {
+        HttpURLConnection connection = getConnection(urlString);
+        for (Map.Entry<String, String> entry : headers.entrySet()) {
+            connection.setRequestProperty(entry.getKey(), entry.getValue());
+        }
+        connection.setRequestMethod(GET);
+        connection.setDoOutput(true);
+        InputStream content = connection.getInputStream();
+        return readAllIntoJSONArray(content);
+    }
+
+    private JSONObject readAllIntoJSONObject(InputStream content) throws 
IOException, JSONException {
+        BufferedReader rd = new BufferedReader(new InputStreamReader(content, 
StandardCharsets.UTF_8));
+        String jsonText = IOUtils.toString(rd);
+        return new JSONObject(jsonText);
+    }
+
+    private JSONArray readAllIntoJSONArray(InputStream content) throws 
IOException, JSONException {
+        BufferedReader rd = new BufferedReader(new InputStreamReader(content, 
StandardCharsets.UTF_8));
+        String jsonText = IOUtils.toString(rd);
+        return new JSONArray(jsonText);
+    }
+
+    /**
+     * Loads the key store and trust store named by the SSL context service, re-initializes
+     * the given SSL context with them and installs its socket factory on the connection.
+     *
+     * Both store files are opened with try-with-resources so their streams are closed even
+     * when loading fails.
+     *
+     * @param httpsURLConnection connection to configure
+     * @param sslService source of key store / trust store locations, passwords and types
+     * @param sslContext SSL context to initialize and take the socket factory from
+     * @throws IOException if a store file cannot be read
+     * @throws KeyStoreException if a store type is not available
+     * @throws CertificateException if a certificate in a store cannot be loaded
+     * @throws NoSuchAlgorithmException if a required algorithm is not available
+     * @throws UnrecoverableKeyException if a key cannot be recovered with the given password
+     * @throws KeyManagementException if the SSL context cannot be initialized
+     */
+    private void setSslSocketFactory(HttpsURLConnection httpsURLConnection, SSLContextService sslService, SSLContext sslContext)
+            throws IOException, KeyStoreException, CertificateException, NoSuchAlgorithmException, UnrecoverableKeyException, KeyManagementException {
+        final String keystoreLocation = sslService.getKeyStoreFile();
+        final String keystorePass = sslService.getKeyStorePassword();
+        final String keystoreType = sslService.getKeyStoreType();
+
+        // prepare the keystore
+        final KeyStore keyStore = KeyStore.getInstance(keystoreType);
+
+        try (FileInputStream keyStoreStream = new FileInputStream(keystoreLocation)) {
+            keyStore.load(keyStoreStream, keystorePass.toCharArray());
+        }
+
+        final KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
+        keyManagerFactory.init(keyStore, keystorePass.toCharArray());
+
+        // load truststore
+        final String truststoreLocation = sslService.getTrustStoreFile();
+        final String truststorePass = sslService.getTrustStorePassword();
+        final String truststoreType = sslService.getTrustStoreType();
+
+        final KeyStore truststore = KeyStore.getInstance(truststoreType);
+        final TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance("X509");
+        try (FileInputStream trustStoreStream = new FileInputStream(truststoreLocation)) {
+            truststore.load(trustStoreStream, truststorePass.toCharArray());
+        }
+        trustManagerFactory.init(truststore);
+
+        sslContext.init(keyManagerFactory.getKeyManagers(), trustManagerFactory.getTrustManagers(), null);
+
+        final SSLSocketFactory sslSocketFactory = sslContext.getSocketFactory();
+        httpsURLConnection.setSSLSocketFactory(sslSocketFactory);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/2192138b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-controller-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-controller-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
 
b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-controller-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
new file mode 100644
index 0000000..8b4b046
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-controller-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.nifi.controller.livy.LivySessionController
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/2192138b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-nar/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-nar/pom.xml 
b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-nar/pom.xml
new file mode 100644
index 0000000..e6e310b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-nar/pom.xml
@@ -0,0 +1,48 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-spark-bundle</artifactId>
+        <version>1.5.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-livy-nar</artifactId>
+    <packaging>nar</packaging>
+
+    <dependencies>
+        <!-- NAR providing the Livy controller service API this NAR depends on -->
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-livy-controller-service-api-nar</artifactId>
+            <type>nar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-livy-controller-service</artifactId>
+            <version>1.5.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-livy-processors</artifactId>
+            <version>1.5.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-ssl-context-service-api</artifactId>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/2192138b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-nar/src/main/resources/META-INF/LICENSE
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-nar/src/main/resources/META-INF/LICENSE
 
b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-nar/src/main/resources/META-INF/LICENSE
new file mode 100644
index 0000000..f3c8ece
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-nar/src/main/resources/META-INF/LICENSE
@@ -0,0 +1,209 @@
+
+                                 Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "[]"
+      replaced with your own identifying information. (Don't include
+      the brackets!)  The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright [yyyy] [name of copyright owner]
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+
+APACHE NIFI SUBCOMPONENTS:
+
+The Apache NiFi project contains subcomponents with separate copyright
+notices and license terms. Your use of the source code for these
+subcomponents is subject to the terms and conditions of the following
+licenses.

http://git-wip-us.apache.org/repos/asf/nifi/blob/2192138b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-nar/src/main/resources/META-INF/NOTICE
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-nar/src/main/resources/META-INF/NOTICE
new file mode 100644
index 0000000..604541a
--- /dev/null
+++ b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-nar/src/main/resources/META-INF/NOTICE
@@ -0,0 +1,55 @@
+nifi-livy-nar
+Copyright 2014-2017 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+******************
+Apache Software License v2
+******************
+
+The following binary components are provided under the Apache Software License v2
+
+  (ASLv2) Apache Commons Configuration
+    The following NOTICE information applies:
+      Apache Commons Configuration
+      Copyright 2001-2017 The Apache Software Foundation
+
+      This product includes software developed at
+      The Apache Software Foundation (http://www.apache.org/).
+
+  (ASLv2) Apache Commons Lang
+    The following NOTICE information applies:
+      Apache Commons Lang
+      Copyright 2001-2015 The Apache Software Foundation
+
+      This product includes software from the Spring Framework,
+      under the Apache License 2.0 (see: StringUtils.containsWhitespace())
+
+  (ASLv2) Apache Commons Codec
+    The following NOTICE information applies:
+      Apache Commons Codec
+      Copyright 2002-2014 The Apache Software Foundation
+
+      src/test/org/apache/commons/codec/language/DoubleMetaphoneTest.java
+      contains test data from http://aspell.net/test/orig/batch0.tab.
+      Copyright (C) 2002 Kevin Atkinson ([email protected])
+
+      ===============================================================================
+
+      The content of package org.apache.commons.codec.language.bm has been translated
+      from the original php source code available at http://stevemorse.org/phoneticinfo.htm
+      with permission from the original authors.
+      Original source copyright:
+      Copyright (c) 2008 Alexander Beider & Stephen P. Morse.
+
+  (ASLv2) Apache Commons Logging
+    The following NOTICE information applies:
+      Apache Commons Logging
+      Copyright 2003-2013 The Apache Software Foundation
+
+  (ASLv2) Apache Commons IO
+    The following NOTICE information applies:
+      Apache Commons IO
+      Copyright 2002-2016 The Apache Software Foundation
+

http://git-wip-us.apache.org/repos/asf/nifi/blob/2192138b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/pom.xml b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/pom.xml
new file mode 100644
index 0000000..e4a1ec6
--- /dev/null
+++ b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/pom.xml
@@ -0,0 +1,81 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-spark-bundle</artifactId>
+        <version>1.5.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-livy-processors</artifactId>
+
+    <dependencies> <!-- non-test dependencies are listed first, test-scoped ones last -->
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-processor-utils</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-livy-controller-service-api</artifactId>
+            <version>1.5.0-SNAPSHOT</version>
+            <scope>provided</scope> <!-- compiled against only; Maven does not package provided-scope artifacts -->
+        </dependency>
+        <dependency>
+            <groupId>commons-configuration</groupId>
+            <artifactId>commons-configuration</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.codehaus.jettison</groupId>
+            <artifactId>jettison</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-ssl-context-service-api</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-ssl-context-service</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-livy-controller-service</artifactId>
+            <version>1.5.0-SNAPSHOT</version>
+            <scope>test</scope> <!-- the controller implementation is needed only by the tests -->
+        </dependency>
+        <dependency>
+            <groupId>org.eclipse.jetty</groupId>
+            <artifactId>jetty-server</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file

Reply via email to