Repository: incubator-nifi
Updated Branches:
  refs/heads/http-processors c53b0f9d1 -> 7c9905418


NIFI-221: Finished initial implementation of http procs


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/7c990541
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/7c990541
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/7c990541

Branch: refs/heads/http-processors
Commit: 7c99054183428b0a696cdf16be46267089f9faf4
Parents: c53b0f9
Author: Mark Payne <marka...@hotmail.com>
Authored: Sun Mar 1 14:31:26 2015 -0500
Committer: Mark Payne <marka...@hotmail.com>
Committed: Sun Mar 1 14:31:26 2015 -0500

----------------------------------------------------------------------
 .../processors/standard/HandleHttpRequest.java  |  66 +++++--
 .../standard/TestHandleHttpRequest.java         | 127 ++++++++++++++
 .../standard/TestHandleHttpResponse.java        | 172 +++++++++++++++++++
 3 files changed, 353 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7c990541/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java
 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java
index 98b5fdd..e72df7d 100644
--- 
a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java
+++ 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java
@@ -17,8 +17,10 @@
 package org.apache.nifi.processors.standard;
 
 import java.io.IOException;
+import java.io.UnsupportedEncodingException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.net.URLDecoder;
 import java.security.Principal;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -73,6 +75,7 @@ import com.sun.jersey.api.client.ClientResponse.Status;
 @CapabilityDescription("Starts an HTTP Server and listens for HTTP Requests. 
For each request, creates a FlowFile and transfers to 'success'. This Processor 
is designed to be used in conjunction with the HandleHttpResponse Processor in 
order to create a Web Service")
 public class HandleHttpRequest extends AbstractProcessor {
     public static final String HTTP_CONTEXT_ID = "http.context.identifier";
+    private static final Pattern URL_QUERY_PARAM_DELIMITER = 
Pattern.compile("&");
     
     // Allowable values for client auth
     public static final AllowableValue CLIENT_NONE = new AllowableValue("No 
Authentication", "Processor will not authenticate clients. Anyone can 
communicate with this Processor anonymously");
@@ -107,6 +110,13 @@ public class HandleHttpRequest extends AbstractProcessor {
         .required(false)
         .identifiesControllerService(SSLContextService.class)
         .build();
+    public static final PropertyDescriptor URL_CHARACTER_SET = new 
PropertyDescriptor.Builder()
+        .name("URL Character Set")
+        .description("The character set to use for decoding URL parameters")
+        .required(true)
+        .defaultValue("UTF-8")
+        .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+        .build();
     public static final PropertyDescriptor PATH_REGEX = new 
PropertyDescriptor.Builder()
         .name("Allowed Paths")
         .description("A Regular Expression that specifies the valid HTTP Paths 
that are allowed in the incoming URL Requests. If this value is specified and 
the path of the HTTP Requests does not match this Regular Expression, the 
Processor will respond with a 404: NotFound")
@@ -189,6 +199,7 @@ public class HandleHttpRequest extends AbstractProcessor {
         descriptors.add(SSL_CONTEXT);
         descriptors.add(HTTP_CONTEXT_MAP);
         descriptors.add(PATH_REGEX);
+        descriptors.add(URL_CHARACTER_SET);
         descriptors.add(ALLOW_GET);
         descriptors.add(ALLOW_POST);
         descriptors.add(ALLOW_PUT);
@@ -360,7 +371,7 @@ public class HandleHttpRequest extends AbstractProcessor {
     protected int getPort() {
         for ( final Connector connector : server.getConnectors() ) {
             if ( connector instanceof ServerConnector ) {
-                return ((ServerConnector) connector).getPort();
+                return ((ServerConnector) connector).getLocalPort();
             }
         }
         
@@ -421,19 +432,50 @@ public class HandleHttpRequest extends AbstractProcessor {
             return;
         }
         
+        final String charset = 
context.getProperty(URL_CHARACTER_SET).getValue();
+        
         final String contextIdentifier = UUID.randomUUID().toString();
         final Map<String, String> attributes = new HashMap<>();
-        putAttribute(attributes, HTTP_CONTEXT_ID, contextIdentifier);
-        putAttribute(attributes, "mime.type", request.getContentType());
-        putAttribute(attributes, "http.servlet.path", 
request.getServletPath());
-        putAttribute(attributes, "http.context.path", 
request.getContextPath());
-        putAttribute(attributes, "http.method", request.getMethod());
-        putAttribute(attributes, "http.query.string", 
request.getQueryString());
-        putAttribute(attributes, "http.remote.host", request.getRemoteHost());
-        putAttribute(attributes, "http.remote.addr", request.getRemoteAddr());
-        putAttribute(attributes, "http.remote.user", request.getRemoteUser());
-        putAttribute(attributes, "http.request.uri", request.getRequestURI());
-        putAttribute(attributes, "http.auth.type", request.getAuthType());
+        try {
+            putAttribute(attributes, HTTP_CONTEXT_ID, contextIdentifier);
+            putAttribute(attributes, "mime.type", request.getContentType());
+            putAttribute(attributes, "http.servlet.path", 
request.getServletPath());
+            putAttribute(attributes, "http.context.path", 
request.getContextPath());
+            putAttribute(attributes, "http.method", request.getMethod());
+            if ( request.getQueryString() != null ) {
+                putAttribute(attributes, "http.query.string", 
URLDecoder.decode(request.getQueryString(), charset));
+            }
+            putAttribute(attributes, "http.remote.host", 
request.getRemoteHost());
+            putAttribute(attributes, "http.remote.addr", 
request.getRemoteAddr());
+            putAttribute(attributes, "http.remote.user", 
request.getRemoteUser());
+            putAttribute(attributes, "http.request.uri", 
request.getRequestURI());
+            putAttribute(attributes, "http.auth.type", request.getAuthType());
+            
+            final String queryString = request.getQueryString();
+            if ( queryString != null ) {
+                final String[] params = 
URL_QUERY_PARAM_DELIMITER.split(queryString);
+                for ( final String keyValueString : params ) {
+                    final int indexOf = keyValueString.indexOf("=");
+                    if ( indexOf < 0 ) {
+                        // no =, then it's just a key with no value
+                        attributes.put("http.query.param." + 
URLDecoder.decode(keyValueString, charset), "");
+                    } else {
+                        final String key = keyValueString.substring(0, 
indexOf);
+                        final String value;
+                        
+                        if ( indexOf == keyValueString.length() - 1 ) {
+                            value = "";
+                        } else {
+                            value = keyValueString.substring(indexOf + 1);
+                        }
+                        
+                        attributes.put("http.query.param." + 
URLDecoder.decode(key, charset), URLDecoder.decode(value, charset));
+                    }
+                }
+            }
+        } catch (final UnsupportedEncodingException uee) {
+            throw new ProcessException("Invalid character encoding", uee);  // 
won't happen because charset has been validated
+        }
         
         final Enumeration<String> headerNames = request.getHeaderNames();
         while ( headerNames.hasMoreElements() ) {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7c990541/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHandleHttpRequest.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHandleHttpRequest.java
 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHandleHttpRequest.java
new file mode 100644
index 0000000..85f35e2
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHandleHttpRequest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import javax.servlet.AsyncContext;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.http.HttpContextMap;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.stream.io.NullOutputStream;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestHandleHttpRequest {
+
+    @Test
+    public void testRequestAddedToService() throws InitializationException, 
MalformedURLException, IOException {
+        final TestRunner runner = 
TestRunners.newTestRunner(HandleHttpRequest.class);
+        runner.setProperty(HandleHttpRequest.PORT, "0");
+        
+        final MockHttpContextMap contextMap = new MockHttpContextMap();
+        runner.addControllerService("http-context-map", contextMap);
+        runner.enableControllerService(contextMap);
+        runner.setProperty(HandleHttpRequest.HTTP_CONTEXT_MAP, 
"http-context-map");
+        
+        // trigger processor to stop but not shutdown.
+        runner.run(1, false);
+        try {
+            final Thread httpThread = new Thread(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        final int port = ((HandleHttpRequest) 
runner.getProcessor()).getPort();
+                        final HttpURLConnection connection = 
(HttpURLConnection) new URL("http://localhost:"; + port + 
"/my/path?query=true&value1=value1&value2=&value3&value4=apple=orange").openConnection();
+                        connection.setDoOutput(false);
+                        connection.setRequestMethod("GET");
+                        connection.setRequestProperty("header1", "value1");
+                        connection.setRequestProperty("header2", "");
+                        connection.setRequestProperty("header3", 
"apple=orange");
+                        connection.setConnectTimeout(3000);
+                        connection.setReadTimeout(3000);
+                        
+                        StreamUtils.copy(connection.getInputStream(), new 
NullOutputStream());
+                    } catch (final Throwable t) {
+                        t.printStackTrace();
+                        Assert.fail(t.toString());
+                    }
+                }
+            });
+            httpThread.start();
+            
+            try { Thread.sleep(100L); } catch (final InterruptedException ie) 
{}
+            
+            // process the request.
+            runner.run(1, false);
+            
+            
runner.assertAllFlowFilesTransferred(HandleHttpRequest.REL_SUCCESS, 1);
+            assertEquals(1, contextMap.size());
+            
+            final MockFlowFile mff = 
runner.getFlowFilesForRelationship(HandleHttpRequest.REL_SUCCESS).get(0);
+            mff.assertAttributeEquals("http.query.param.query", "true");
+            mff.assertAttributeEquals("http.query.param.value1", "value1");
+            mff.assertAttributeEquals("http.query.param.value2", "");
+            mff.assertAttributeEquals("http.query.param.value3", "");
+            mff.assertAttributeEquals("http.query.param.value4", 
"apple=orange");
+            mff.assertAttributeEquals("http.headers.header1", "value1");
+            mff.assertAttributeEquals("http.headers.header3", "apple=orange");
+        } finally {
+            // shut down the server
+            runner.run(1, true);
+        }
+    }
+    
+    
+    private static class MockHttpContextMap extends AbstractControllerService 
implements HttpContextMap {
+        private final ConcurrentMap<String, HttpServletResponse> responseMap = 
new ConcurrentHashMap<>();
+        
+        @Override
+        public boolean register(String identifier, HttpServletRequest request, 
HttpServletResponse response, AsyncContext context) {
+            responseMap.put(identifier, response);
+            return true;
+        }
+
+        @Override
+        public HttpServletResponse getResponse(String identifier) {
+            return responseMap.get(identifier);
+        }
+
+        @Override
+        public void complete(String identifier) {
+            responseMap.remove(identifier);
+        }
+        
+        public int size() {
+            return responseMap.size();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7c990541/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHandleHttpResponse.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHandleHttpResponse.java
 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHandleHttpResponse.java
new file mode 100644
index 0000000..7b41809
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHandleHttpResponse.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.servlet.AsyncContext;
+import javax.servlet.ServletOutputStream;
+import javax.servlet.WriteListener;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.http.HttpContextMap;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+public class TestHandleHttpResponse {
+
+    @Test
+    public void testEnsureCompleted() throws InitializationException {
+        final TestRunner runner = 
TestRunners.newTestRunner(HandleHttpResponse.class);
+        
+        final MockHttpContextMap contextMap = new MockHttpContextMap("my-id");
+        runner.addControllerService("http-context-map", contextMap);
+        runner.enableControllerService(contextMap);
+        runner.setProperty(HandleHttpResponse.HTTP_CONTEXT_MAP, 
"http-context-map");
+        runner.setProperty(HandleHttpResponse.STATUS_CODE, "${status.code}");
+        runner.setProperty("my-attr", "${my-attr}");
+        runner.setProperty("no-valid-attr", "${no-valid-attr}");
+        
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put(HandleHttpResponse.HTTP_CONTEXT_ID, "my-id");
+        attributes.put("my-attr", "hello");
+        attributes.put("status.code", "201");
+        
+        runner.enqueue("hello".getBytes(), attributes);
+        
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(HandleHttpResponse.REL_SUCCESS, 
1);
+        
+        assertEquals("hello", contextMap.baos.toString());
+        assertEquals("hello", contextMap.headersSent.get("my-attr"));
+        assertNull(contextMap.headersSent.get("no-valid-attr"));
+        assertEquals(201, contextMap.statusCode);
+        assertEquals(1, contextMap.getCompletionCount());
+        assertTrue(contextMap.headersWithNoValue.isEmpty());
+    }
+    
+    
+    private static class MockHttpContextMap extends AbstractControllerService 
implements HttpContextMap {
+        private final String id;
+        private final AtomicInteger completedCount = new AtomicInteger(0);
+        private final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        private final ConcurrentMap<String, String> headersSent = new 
ConcurrentHashMap<>();
+        private volatile int statusCode = -1;
+        
+        private final List<String> headersWithNoValue = new 
CopyOnWriteArrayList<>();
+        
+        public MockHttpContextMap(final String expectedIdentifier) {
+            this.id = expectedIdentifier;
+        }
+        
+        @Override
+        public boolean register(String identifier, HttpServletRequest request, 
HttpServletResponse response, AsyncContext context) {
+            return true;
+        }
+
+        @Override
+        public HttpServletResponse getResponse(final String identifier) {
+            if ( !id.equals(identifier) ) {
+                Assert.fail("attempting to respond to wrong request; should 
have been " + id + " but was " + identifier);
+            }
+            
+            try {
+                final HttpServletResponse response = 
Mockito.mock(HttpServletResponse.class);
+                Mockito.when(response.getOutputStream()).thenReturn(new 
ServletOutputStream() {
+                    @Override
+                    public boolean isReady() { return true; }
+
+                    @Override
+                    public void setWriteListener(WriteListener writeListener) 
{}
+
+                    @Override
+                    public void write(int b) throws IOException { 
baos.write(b); }
+                    
+                    @Override
+                    public void write(byte[] b) throws IOException { 
baos.write(b); }
+                    
+                    @Override
+                    public void write(byte[] b, int off, int len) throws 
IOException { baos.write(b, off, len); }
+                });
+                
+                
+                Mockito.doAnswer(new Answer<Object>() {
+                    @Override
+                    public Object answer(final InvocationOnMock invocation) 
throws Throwable {
+                        final String key = invocation.getArgumentAt(0, 
String.class);
+                        final String value = invocation.getArgumentAt(1, 
String.class);
+                        if ( value == null ) {
+                            headersWithNoValue.add(key);
+                        } else {
+                            headersSent.put(key, value);
+                        }
+                        
+                        return null;
+                    }
+                }).when(response).setHeader(Mockito.any(String.class), 
Mockito.any(String.class));
+                
+                Mockito.doAnswer(new Answer<Object>() {
+                    @Override
+                    public Object answer(final InvocationOnMock invocation) 
throws Throwable {
+                        statusCode = invocation.getArgumentAt(0, int.class);
+                        return null;
+                    }
+                }).when(response).setStatus(Mockito.anyInt());
+                
+                return response;
+            } catch (final Exception e) {
+                e.printStackTrace();
+                Assert.fail(e.toString());
+                return null;
+            }
+        }
+
+        @Override
+        public void complete(final String identifier) {
+            if ( !id.equals(identifier) ) {
+                Assert.fail("attempting to respond to wrong request; should 
have been " + id + " but was " + identifier);
+            }
+            
+            completedCount.incrementAndGet();
+        }
+        
+        public int getCompletionCount() {
+            return completedCount.get();
+        }
+    }
+}

Reply via email to