http://git-wip-us.apache.org/repos/asf/nifi/blob/2192138b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java new file mode 100644 index 0000000..d388148 --- /dev/null +++ b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.livy; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.net.HttpURLConnection; + +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang.StringEscapeUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; + +import org.apache.nifi.processor.util.StandardValidators; +import org.codehaus.jettison.json.JSONException; +import org.codehaus.jettison.json.JSONObject; + +import org.apache.nifi.controller.api.livy.LivySessionService; + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@Tags({"spark", "livy", "http", "execute"}) +@CapabilityDescription("Execute Spark Code over a Livy-managed HTTP session to a live Spark context. 
Supports cached RDD sharing.") +public class ExecuteSparkInteractive extends AbstractProcessor { + + public static final PropertyDescriptor LIVY_CONTROLLER_SERVICE = new PropertyDescriptor.Builder() + .name("exec-spark-iactive-livy-controller-service") + .displayName("Livy Controller Service") + .description("The controller service to use for Livy-managed session(s).") + .required(true) + .identifiesControllerService(LivySessionService.class) + .build(); + + public static final PropertyDescriptor CODE = new PropertyDescriptor.Builder() + .name("exec-spark-iactive-code") + .displayName("Code") + .description("The code to execute in the session. This property can be empty, a constant value, or built from attributes " + + "using Expression Language. If this property is specified, it will be used regardless of the content of " + + "incoming flowfiles. If this property is empty, the content of the incoming flow file is expected " + + "to contain valid code to be issued by the processor to the session. Note that Expression " + + "Language is not evaluated for flow file contents.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .build(); + + /** + * Points to the charset name corresponding to the incoming flow file's encoding. 
+ */ + public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder() + .name("exec-spark-iactive-charset") + .displayName("Character Set") + .description("The character set encoding for the incoming flow file.") + .required(true) + .defaultValue("UTF-8") + .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) + .expressionLanguageSupported(true) + .build(); + + public static final PropertyDescriptor STATUS_CHECK_INTERVAL = new PropertyDescriptor.Builder() + .name("exec-spark-iactive-status-check-interval") + .displayName("Status Check Interval") + .description("The amount of time to wait between checking the status of an operation.") + .required(true) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .expressionLanguageSupported(true) + .defaultValue("1 sec") + .build(); + + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("FlowFiles that are successfully processed are sent to this relationship") + .build(); + + public static final Relationship REL_WAIT = new Relationship.Builder() + .name("wait") + .description("FlowFiles that are waiting on an available Spark session will be sent to this relationship") + .build(); + + public static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("FlowFiles are routed to this relationship when they cannot be parsed") + .build(); + + private volatile List<PropertyDescriptor> properties; + private volatile Set<Relationship> relationships; + + + public void init(final ProcessorInitializationContext context) { + List<PropertyDescriptor> properties = new ArrayList<>(); + properties.add(LIVY_CONTROLLER_SERVICE); + properties.add(CODE); + properties.add(CHARSET); + properties.add(STATUS_CHECK_INTERVAL); + this.properties = Collections.unmodifiableList(properties); + + Set<Relationship> relationships = new HashSet<>(); + relationships.add(REL_SUCCESS); + relationships.add(REL_WAIT); + 
relationships.add(REL_FAILURE); + this.relationships = Collections.unmodifiableSet(relationships); + } + + @Override + public Set<Relationship> getRelationships() { + return relationships; + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return properties; + } + + @Override + public void onTrigger(ProcessContext context, final ProcessSession session) throws ProcessException { + + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + final ComponentLog log = getLogger(); + final LivySessionService livySessionService = context.getProperty(LIVY_CONTROLLER_SERVICE).asControllerService(LivySessionService.class); + final Map<String, String> livyController = livySessionService.getSession(); + if (livyController == null || livyController.isEmpty()) { + log.debug("No Spark session available (yet), routing flowfile to wait"); + session.transfer(flowFile, REL_WAIT); + return; + } + final long statusCheckInterval = context.getProperty(STATUS_CHECK_INTERVAL).evaluateAttributeExpressions(flowFile).asTimePeriod(TimeUnit.MILLISECONDS); + Charset charset; + try { + charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue()); + } catch (Exception e) { + log.warn("Illegal character set name specified, defaulting to UTF-8"); + charset = StandardCharsets.UTF_8; + } + + String sessionId = livyController.get("sessionId"); + String livyUrl = livyController.get("livyUrl"); + String code = context.getProperty(CODE).evaluateAttributeExpressions(flowFile).getValue(); + if (StringUtils.isEmpty(code)) { + try (InputStream inputStream = session.read(flowFile)) { + // If no code was provided, assume it is in the content of the incoming flow file + code = IOUtils.toString(inputStream, charset); + } catch (IOException ioe) { + log.error("Error reading input flowfile, penalizing and routing to failure", new Object[]{flowFile, ioe.getMessage()}, ioe); + flowFile = 
session.penalize(flowFile); + session.transfer(flowFile, REL_FAILURE); + return; + } + } + + code = StringEscapeUtils.escapeJavaScript(code); + String payload = "{\"code\":\"" + code + "\"}"; + try { + final JSONObject result = submitAndHandleJob(livyUrl, livySessionService, sessionId, payload, statusCheckInterval); + log.debug("********** ExecuteSparkInteractive Result of Job Submit: " + result); + if (result == null) { + session.transfer(flowFile, REL_FAILURE); + } else { + try { + final JSONObject output = result.getJSONObject("data"); + flowFile = session.write(flowFile, out -> out.write(output.toString().getBytes())); + flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), LivySessionService.APPLICATION_JSON); + session.transfer(flowFile, REL_SUCCESS); + } catch (JSONException je) { + // The result doesn't contain the data, just send the output object as the flow file content to failure (after penalizing) + log.error("Spark Session returned an error, sending the output JSON object as the flow file content to failure (after penalizing)"); + flowFile = session.write(flowFile, out -> out.write(result.toString().getBytes())); + flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), LivySessionService.APPLICATION_JSON); + flowFile = session.penalize(flowFile); + session.transfer(flowFile, REL_FAILURE); + } + } + } catch (IOException ioe) { + log.error("Failure processing flowfile {} due to {}, penalizing and routing to failure", new Object[]{flowFile, ioe.getMessage()}, ioe); + flowFile = session.penalize(flowFile); + session.transfer(flowFile, REL_FAILURE); + } + } + + private JSONObject submitAndHandleJob(String livyUrl, LivySessionService livySessionService, String sessionId, String payload, long statusCheckInterval) throws IOException { + ComponentLog log = getLogger(); + String statementUrl = livyUrl + "/sessions/" + sessionId + "/statements"; + JSONObject output = null; + Map<String, String> headers = new HashMap<>(); + 
headers.put("Content-Type", LivySessionService.APPLICATION_JSON); + headers.put("X-Requested-By", LivySessionService.USER); + headers.put("Accept", "application/json"); + + log.debug("********** submitAndHandleJob() Submitting Job to Spark via: " + statementUrl); + try { + JSONObject jobInfo = readJSONObjectFromUrlPOST(statementUrl, livySessionService, headers, payload); + log.debug("********** submitAndHandleJob() Job Info: " + jobInfo); + String statementId = String.valueOf(jobInfo.getInt("id")); + statementUrl = statementUrl + "/" + statementId; + jobInfo = readJSONObjectFromUrl(statementUrl, livySessionService, headers); + String jobState = jobInfo.getString("state"); + + log.debug("********** submitAndHandleJob() New Job Info: " + jobInfo); + Thread.sleep(statusCheckInterval); + if (jobState.equalsIgnoreCase("available")) { + log.debug("********** submitAndHandleJob() Job status is: " + jobState + ". returning output..."); + output = jobInfo.getJSONObject("output"); + } else if (jobState.equalsIgnoreCase("running") || jobState.equalsIgnoreCase("waiting")) { + while (!jobState.equalsIgnoreCase("available")) { + log.debug("********** submitAndHandleJob() Job status is: " + jobState + ". Waiting for job to complete..."); + Thread.sleep(statusCheckInterval); + jobInfo = readJSONObjectFromUrl(statementUrl, livySessionService, headers); + jobState = jobInfo.getString("state"); + } + output = jobInfo.getJSONObject("output"); + } else if (jobState.equalsIgnoreCase("error") + || jobState.equalsIgnoreCase("cancelled") + || jobState.equalsIgnoreCase("cancelling")) { + log.debug("********** Job status is: " + jobState + ". Job did not complete due to error or has been cancelled. 
Check SparkUI for details."); + throw new IOException(jobState); + } + } catch (JSONException | InterruptedException e) { + throw new IOException(e); + } + return output; + } + + private JSONObject readJSONObjectFromUrlPOST(String urlString, LivySessionService livySessionService, Map<String, String> headers, String payload) + throws IOException, JSONException { + + HttpURLConnection connection = livySessionService.getConnection(urlString); + connection.setRequestMethod("POST"); + connection.setDoOutput(true); + + for (Map.Entry<String, String> entry : headers.entrySet()) { + connection.setRequestProperty(entry.getKey(), entry.getValue()); + } + + OutputStream os = connection.getOutputStream(); + os.write(payload.getBytes()); + os.flush(); + + if (connection.getResponseCode() != HttpURLConnection.HTTP_OK && connection.getResponseCode() != HttpURLConnection.HTTP_CREATED) { + throw new RuntimeException("Failed : HTTP error code : " + connection.getResponseCode() + " : " + connection.getResponseMessage()); + } + + InputStream content = connection.getInputStream(); + BufferedReader rd = new BufferedReader(new InputStreamReader(content, StandardCharsets.UTF_8)); + String jsonText = IOUtils.toString(rd); + return new JSONObject(jsonText); + } + + private JSONObject readJSONObjectFromUrl(final String urlString, LivySessionService livySessionService, final Map<String, String> headers) + throws IOException, JSONException { + + HttpURLConnection connection = livySessionService.getConnection(urlString); + for (Map.Entry<String, String> entry : headers.entrySet()) { + connection.setRequestProperty(entry.getKey(), entry.getValue()); + } + connection.setRequestMethod("GET"); + connection.setDoOutput(true); + InputStream content = connection.getInputStream(); + BufferedReader rd = new BufferedReader(new InputStreamReader(content, StandardCharsets.UTF_8)); + String jsonText = IOUtils.toString(rd); + return new JSONObject(jsonText); + } +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi/blob/2192138b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor new file mode 100644 index 0000000..8de81e4 --- /dev/null +++ b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +org.apache.nifi.processors.livy.ExecuteSparkInteractive \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/2192138b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/test/java/org/apache/nifi/processors/livy/ExecuteSparkInteractiveTestBase.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/test/java/org/apache/nifi/processors/livy/ExecuteSparkInteractiveTestBase.java b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/test/java/org/apache/nifi/processors/livy/ExecuteSparkInteractiveTestBase.java new file mode 100644 index 0000000..3a2c67a --- /dev/null +++ b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/test/java/org/apache/nifi/processors/livy/ExecuteSparkInteractiveTestBase.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.livy; + +import org.eclipse.jetty.server.Request; +import org.eclipse.jetty.server.handler.AbstractHandler; + +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import java.io.IOException; +import java.io.PrintWriter; + +public class ExecuteSparkInteractiveTestBase { + + public static class LivyAPIHandler extends AbstractHandler { + + int session1Requests = 0; + + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException { + baseRequest.setHandled(true); + + response.setStatus(200); + + if ("GET".equalsIgnoreCase(request.getMethod())) { + + String responseBody = "{}"; + response.setContentType("application/json"); + + if ("/sessions".equalsIgnoreCase(target)) { + responseBody = "{\"sessions\": [{\"id\": 1, \"kind\": \"spark\", \"state\": \"idle\"}]}"; + } else if (target.startsWith("/sessions/") && !target.contains("statement")) { + responseBody = "{\"id\": 1, \"kind\": \"spark\", \"state\": \"idle\"}"; + + } else if ("/sessions/1/statements/7".equalsIgnoreCase(target)) { + switch (session1Requests) { + case 0: + responseBody = "{\"state\": \"waiting\"}"; + break; + case 1: + responseBody = "{\"state\": \"running\"}"; + break; + case 2: + responseBody = "{\"state\": \"available\", \"output\": {\"data\": {\"text/plain\": \"Hello world\"}}}"; + break; + default: + responseBody = "{\"state\": \"error\"}"; + break; + } + session1Requests++; + } + + response.setContentLength(responseBody.length()); + + try (PrintWriter writer = response.getWriter()) { + writer.print(responseBody); + writer.flush(); + } + + } else if ("POST".equalsIgnoreCase(request.getMethod())) { + + String responseBody = "{}"; + response.setContentType("application/json"); + + if ("/sessions".equalsIgnoreCase(target)) { + responseBody = "{\"id\": 1, \"kind\": \"spark\", 
\"state\": \"idle\"}"; + } else if ("/sessions/1/statements".equalsIgnoreCase(target)) { + responseBody = "{\"id\": 7}"; + } + + response.setContentLength(responseBody.length()); + + try (PrintWriter writer = response.getWriter()) { + writer.print(responseBody); + writer.flush(); + } + + } + } + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/2192138b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/test/java/org/apache/nifi/processors/livy/TestExecuteSparkInteractive.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/test/java/org/apache/nifi/processors/livy/TestExecuteSparkInteractive.java b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/test/java/org/apache/nifi/processors/livy/TestExecuteSparkInteractive.java new file mode 100644 index 0000000..4219783 --- /dev/null +++ b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/test/java/org/apache/nifi/processors/livy/TestExecuteSparkInteractive.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.livy; + +import org.apache.nifi.controller.livy.LivySessionController; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.eclipse.jetty.server.Handler; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; +import java.util.List; + +public class TestExecuteSparkInteractive extends ExecuteSparkInteractiveTestBase { + + public static TestServer server; + public static String url; + + public TestRunner runner; + + @BeforeClass + public static void beforeClass() throws Exception { + // useful for verbose logging output + // don't commit this with this property enabled, or any 'mvn test' will be really verbose + // System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard", "debug"); + + // create a Jetty server on a random port + server = createServer(); + server.startServer(); + + // this is the base url with the random port + url = server.getUrl(); + } + + public void addHandler(Handler handler) { + server.addHandler(handler); + } + + @AfterClass + public static void afterClass() throws Exception { + server.shutdownServer(); + } + + @Before + public void before() throws Exception { + runner = TestRunners.newTestRunner(ExecuteSparkInteractive.class); + LivySessionController livyControllerService = new LivySessionController(); + runner.addControllerService("livyCS", livyControllerService); + runner.setProperty(livyControllerService, LivySessionController.LIVY_HOST, url.substring(url.indexOf("://") + 3, url.lastIndexOf(":"))); + runner.setProperty(livyControllerService, LivySessionController.LIVY_PORT, url.substring(url.lastIndexOf(":") + 1)); + runner.enableControllerService(livyControllerService); + runner.setProperty(ExecuteSparkInteractive.LIVY_CONTROLLER_SERVICE, "livyCS"); + + server.clearHandlers(); + } + + 
@After + public void after() { + runner.shutdown(); + } + + private static TestServer createServer() throws IOException { + return new TestServer(); + } + + @Test + public void testSparkSession() throws Exception { + + addHandler(new LivyAPIHandler()); + + runner.enqueue("print \"hello world\""); + runner.run(); + List<MockFlowFile> waitingFlowfiles = runner.getFlowFilesForRelationship(ExecuteSparkInteractive.REL_WAIT); + while (!waitingFlowfiles.isEmpty()) { + Thread.sleep(1000); + runner.clearTransferState(); + runner.enqueue("print \"hello world\""); + runner.run(); + waitingFlowfiles = runner.getFlowFilesForRelationship(ExecuteSparkInteractive.REL_WAIT); + } + runner.assertTransferCount(ExecuteSparkInteractive.REL_SUCCESS, 1); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/2192138b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/test/java/org/apache/nifi/processors/livy/TestExecuteSparkInteractiveSSL.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/test/java/org/apache/nifi/processors/livy/TestExecuteSparkInteractiveSSL.java b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/test/java/org/apache/nifi/processors/livy/TestExecuteSparkInteractiveSSL.java new file mode 100644 index 0000000..e732617 --- /dev/null +++ b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/test/java/org/apache/nifi/processors/livy/TestExecuteSparkInteractiveSSL.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.livy; + +import org.apache.nifi.controller.livy.LivySessionController; +import org.apache.nifi.ssl.StandardSSLContextService; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.eclipse.jetty.server.Handler; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class TestExecuteSparkInteractiveSSL extends ExecuteSparkInteractiveTestBase { + + private static Map<String, String> sslProperties; + + public static TestServer server; + public static String url; + + public TestRunner runner; + + @BeforeClass + public static void beforeClass() throws Exception { + // useful for verbose logging output + // don't commit this with this property enabled, or any 'mvn test' will be really verbose + // System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard", "debug"); + + // create the SSL properties, which basically store keystore / trustore information + // this is used by the StandardSSLContextService and the Jetty Server + sslProperties = createSslProperties(); + + // create a Jetty server on a random port + server = createServer(); + server.startServer(); + + // Allow time for the server to start + Thread.sleep(1000); + + // this is the base url with the random port + url = server.getSecureUrl(); + } + + public void 
addHandler(Handler handler) { + server.addHandler(handler); + } + + @AfterClass + public static void afterClass() throws Exception { + server.shutdownServer(); + } + + @Before + public void before() throws Exception { + runner = TestRunners.newTestRunner(ExecuteSparkInteractive.class); + + final StandardSSLContextService sslService = new StandardSSLContextService(); + runner.addControllerService("ssl-context", sslService, sslProperties); + runner.enableControllerService(sslService); + + // Allow time for the controller service to fully initialize + Thread.sleep(500); + + LivySessionController livyControllerService = new LivySessionController(); + runner.addControllerService("livyCS", livyControllerService); + runner.setProperty(livyControllerService, LivySessionController.LIVY_HOST, url.substring(url.indexOf("://") + 3, url.lastIndexOf(":"))); + runner.setProperty(livyControllerService, LivySessionController.LIVY_PORT, url.substring(url.lastIndexOf(":") + 1)); + runner.setProperty(livyControllerService, LivySessionController.SSL_CONTEXT_SERVICE, "ssl-context"); + runner.enableControllerService(livyControllerService); + + runner.setProperty(ExecuteSparkInteractive.LIVY_CONTROLLER_SERVICE, "livyCS"); + + server.clearHandlers(); + } + + @After + public void after() { + runner.shutdown(); + } + + private static TestServer createServer() throws IOException { + return new TestServer(sslProperties); + } + + @Test + public void testSslSparkSession() throws Exception { + addHandler(new LivyAPIHandler()); + + runner.enqueue("print \"hello world\""); + runner.run(); + List<MockFlowFile> waitingFlowfiles = runner.getFlowFilesForRelationship(ExecuteSparkInteractive.REL_WAIT); + while (!waitingFlowfiles.isEmpty()) { + Thread.sleep(1000); + runner.clearTransferState(); + runner.enqueue("print \"hello world\""); + runner.run(); + waitingFlowfiles = runner.getFlowFilesForRelationship(ExecuteSparkInteractive.REL_WAIT); + } + 
runner.assertTransferCount(ExecuteSparkInteractive.REL_SUCCESS, 1); + } + + private static Map<String, String> createSslProperties() { + final Map<String, String> map = new HashMap<>(); + map.put(StandardSSLContextService.KEYSTORE.getName(), "src/test/resources/localhost-ks.jks"); + map.put(StandardSSLContextService.KEYSTORE_PASSWORD.getName(), "localtest"); + map.put(StandardSSLContextService.KEYSTORE_TYPE.getName(), "JKS"); + map.put(StandardSSLContextService.TRUSTSTORE.getName(), "src/test/resources/localhost-ts.jks"); + map.put(StandardSSLContextService.TRUSTSTORE_PASSWORD.getName(), "localtest"); + map.put(StandardSSLContextService.TRUSTSTORE_TYPE.getName(), "JKS"); + return map; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/2192138b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/test/java/org/apache/nifi/processors/livy/TestServer.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/test/java/org/apache/nifi/processors/livy/TestServer.java b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/test/java/org/apache/nifi/processors/livy/TestServer.java new file mode 100644 index 0000000..396df07 --- /dev/null +++ b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/test/java/org/apache/nifi/processors/livy/TestServer.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.livy;
+
+import org.apache.nifi.ssl.StandardSSLContextService;
+import org.eclipse.jetty.server.Handler;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.ServerConnector;
+import org.eclipse.jetty.server.handler.HandlerCollection;
+import org.eclipse.jetty.util.ssl.SslContextFactory;
+
+import java.util.Map;
+
+/**
+ * Embedded Jetty server for unit tests that need a live HTTP (and optionally HTTPS) endpoint.
+ * Connectors bind to ephemeral ports, so callers discover the address through {@link #getUrl()}
+ * and {@link #getSecureUrl()} once {@link #startServer()} has been called.
+ */
+public class TestServer {
+
+    /** SSL property key controlling client certificate auth; when absent, client auth is required. */
+    public static final String NEED_CLIENT_AUTH = "clientAuth";
+
+    private Server jetty;
+    private boolean secure = false;
+
+    /**
+     * Creates the test server with only a plain HTTP connector.
+     */
+    public TestServer() {
+        createServer(null);
+    }
+
+    /**
+     * Creates the test server.
+     *
+     * @param sslProperties settings for an additional HTTPS connector, keyed by the
+     *        {@link StandardSSLContextService} property names (plus {@link #NEED_CLIENT_AUTH});
+     *        when {@code null}, only the plain HTTP connector is created
+     */
+    public TestServer(final Map<String, String> sslProperties) {
+        createServer(sslProperties);
+    }
+
+    private void createServer(final Map<String, String> sslProperties) {
+        jetty = new Server();
+
+        // create the unsecure connector
+        createConnector();
+
+        // create the secure connector if sslProperties are specified
+        if (sslProperties != null) {
+            createSecureConnector(sslProperties);
+        }
+
+        // 'true' keeps the handler collection mutable while the server is running
+        jetty.setHandler(new HandlerCollection(true));
+    }
+
+    /**
+     * Creates the plain HTTP connector on an ephemeral port.
+     */
+    private void createConnector() {
+        final ServerConnector http = new ServerConnector(jetty);
+        http.setPort(0);
+        // Severely taxed environments may have significant delays when executing.
+        http.setIdleTimeout(30000L);
+        jetty.addConnector(http);
+    }
+
+    /** Creates the HTTPS connector; it is always registered second, after the HTTP connector. */
+    private void createSecureConnector(final Map<String, String> sslProperties) {
+        final SslContextFactory ssl = new SslContextFactory();
+
+        if (sslProperties.get(StandardSSLContextService.KEYSTORE.getName()) != null) {
+            ssl.setKeyStorePath(sslProperties.get(StandardSSLContextService.KEYSTORE.getName()));
+            ssl.setKeyStorePassword(sslProperties.get(StandardSSLContextService.KEYSTORE_PASSWORD.getName()));
+            ssl.setKeyStoreType(sslProperties.get(StandardSSLContextService.KEYSTORE_TYPE.getName()));
+        }
+
+        if (sslProperties.get(StandardSSLContextService.TRUSTSTORE.getName()) != null) {
+            ssl.setTrustStorePath(sslProperties.get(StandardSSLContextService.TRUSTSTORE.getName()));
+            ssl.setTrustStorePassword(sslProperties.get(StandardSSLContextService.TRUSTSTORE_PASSWORD.getName()));
+            ssl.setTrustStoreType(sslProperties.get(StandardSSLContextService.TRUSTSTORE_TYPE.getName()));
+        }
+
+        // client authentication is required unless NEED_CLIENT_AUTH is explicitly set to false
+        final String clientAuth = sslProperties.get(NEED_CLIENT_AUTH);
+        ssl.setNeedClientAuth(clientAuth == null || Boolean.parseBoolean(clientAuth));
+
+        // build the connector on an ephemeral port
+        final ServerConnector https = new ServerConnector(jetty, ssl);
+        https.setPort(0);
+        // Severely taxed environments may have significant delays when executing.
+        https.setIdleTimeout(30000L);
+
+        // add the connector and mark secure as enabled
+        jetty.addConnector(https);
+        secure = true;
+    }
+
+    /** Removes every handler currently registered with the server. */
+    public void clearHandlers() {
+        final HandlerCollection handlers = (HandlerCollection) jetty.getHandler();
+        final Handler[] registered = handlers.getHandlers();
+        if (registered != null) {
+            for (final Handler handler : registered) {
+                handlers.removeHandler(handler);
+            }
+        }
+    }
+
+    /** Registers an additional request handler with the server. */
+    public void addHandler(Handler handler) {
+        ((HandlerCollection) jetty.getHandler()).addHandler(handler);
+    }
+
+    /** Starts the server; {@link #getUrl()} and {@link #getSecureUrl()} are only usable afterwards. */
+    public void startServer() throws Exception {
+        jetty.start();
+    }
+
+    /** Stops and then destroys the underlying Jetty server. */
+    public void shutdownServer() throws Exception {
+        jetty.stop();
+        jetty.destroy();
+    }
+
+    private int getPort() {
+        return getLocalPort(0);
+    }
+
+    private int getSecurePort() {
+        return getLocalPort(1);
+    }
+
+    /** Returns the bound port of the connector at {@code index}; the server must be started. */
+    private int getLocalPort(final int index) {
+        if (!jetty.isStarted()) {
+            throw new IllegalStateException("Jetty server not started");
+        }
+        return ((ServerConnector) jetty.getConnectors()[index]).getLocalPort();
+    }
+
+    public String getUrl() {
+        return "http://localhost:" + getPort();
+    }
+
+    /** Returns the HTTPS base URL, or {@code null} when no secure connector was configured. */
+    public String getSecureUrl() {
+        return secure ? "https://localhost:" + getSecurePort() : null;
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/2192138b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/test/resources/localhost-ks.jks
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/test/resources/localhost-ks.jks b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/test/resources/localhost-ks.jks
new file mode 100755
index 0000000..df36197
Binary files /dev/null and b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/test/resources/localhost-ks.jks differ
http://git-wip-us.apache.org/repos/asf/nifi/blob/2192138b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/test/resources/localhost-ts.jks ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/test/resources/localhost-ts.jks b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/test/resources/localhost-ts.jks new file mode 100755 index 0000000..7824378 Binary files /dev/null and b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/test/resources/localhost-ts.jks differ http://git-wip-us.apache.org/repos/asf/nifi/blob/2192138b/nifi-nar-bundles/nifi-spark-bundle/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-spark-bundle/pom.xml b/nifi-nar-bundles/nifi-spark-bundle/pom.xml new file mode 100644 index 0000000..91b0ce8 --- /dev/null +++ b/nifi-nar-bundles/nifi-spark-bundle/pom.xml @@ -0,0 +1,85 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-nar-bundles</artifactId> + <version>1.5.0-SNAPSHOT</version> + </parent> + + <artifactId>nifi-spark-bundle</artifactId> + <packaging>pom</packaging> + + <modules> + <module>nifi-livy-nar</module> + <module>nifi-livy-controller-service-api-nar</module> + <module>nifi-livy-controller-service-api</module> + <module>nifi-livy-controller-service</module> + <module>nifi-livy-processors</module> + </modules> + + <dependencyManagement> + <dependencies> + <dependency> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-core-asl</artifactId> + <version>1.9.13</version> + </dependency> + <dependency> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-mapper-asl</artifactId> + <version>1.9.13</version> + </dependency> + <dependency> + <groupId>org.json</groupId> + <artifactId>json</artifactId> + <version>20160810</version> + </dependency> + <dependency> + <groupId>org.codehaus.jettison</groupId> + <artifactId>jettison</artifactId> + <version>1.3.8</version> + </dependency> + <dependency> + <groupId>commons-configuration</groupId> + <artifactId>commons-configuration</artifactId> + <version>1.10</version> + </dependency> + <dependency> + <groupId>com.sun.jersey</groupId> + <artifactId>jersey-client</artifactId> + <version>1.19.1</version> + </dependency> + </dependencies> + </dependencyManagement> + + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-utils</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-processor-utils</artifactId> + </dependency> + + 
</dependencies> +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/2192138b/nifi-nar-bundles/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/pom.xml b/nifi-nar-bundles/pom.xml index 0462e72..f462883 100755 --- a/nifi-nar-bundles/pom.xml +++ b/nifi-nar-bundles/pom.xml @@ -89,6 +89,7 @@ <module>nifi-extension-utils</module> <module>nifi-redis-bundle</module> <module>nifi-metrics-reporting-bundle</module> + <module>nifi-spark-bundle</module> </modules> <build> http://git-wip-us.apache.org/repos/asf/nifi/blob/2192138b/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 11caa40..3f573b6 100644 --- a/pom.xml +++ b/pom.xml @@ -1462,7 +1462,19 @@ <version>1.5.0-SNAPSHOT</version> <type>nar</type> </dependency> - <dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-livy-controller-service-api-nar</artifactId> + <version>1.5.0-SNAPSHOT</version> + <type>nar</type> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-livy-nar</artifactId> + <version>1.5.0-SNAPSHOT</version> + <type>nar</type> + </dependency> + <dependency> <groupId>org.apache.nifi</groupId> <artifactId>nifi-properties</artifactId> <version>1.5.0-SNAPSHOT</version>
