Updated Branches: refs/heads/trunk 5b5470bd5 -> e25661041
FLUME-2109. HTTPS support in HTTP Source. (Ashish Paliwal via Hari Shreedharan) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/e2566104 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/e2566104 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/e2566104 Branch: refs/heads/trunk Commit: e25661041c2d478bf27d64d39241a9fce9a0d263 Parents: 5b5470b Author: Hari Shreedharan <[email protected]> Authored: Thu Aug 1 22:46:38 2013 -0700 Committer: Hari Shreedharan <[email protected]> Committed: Thu Aug 1 22:46:38 2013 -0700 ---------------------------------------------------------------------- .../apache/flume/source/http/HTTPSource.java | 56 +++++++++- .../http/HTTPSourceConfigurationConstants.java | 5 + .../flume/source/http/TestHTTPSource.java | 110 ++++++++++++++++++- flume-ng-core/src/test/resources/jettykeystore | Bin 0 -> 1355 bytes flume-ng-doc/sphinx/FlumeUserGuide.rst | 4 + 5 files changed, 165 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/e2566104/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSource.java index c90f067..84ee33b 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSource.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSource.java @@ -29,6 +29,7 @@ import org.apache.flume.source.AbstractSource; import org.mortbay.jetty.Connector; import org.mortbay.jetty.Server; import org.mortbay.jetty.bio.SocketConnector; +import org.mortbay.jetty.security.SslSocketConnector; import org.mortbay.jetty.servlet.ServletHolder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -88,16 +89,46 @@ public class HTTPSource extends AbstractSource implements private HTTPSourceHandler handler; private SourceCounter sourceCounter; + // SSL configuration variable + private volatile Integer sslPort; + private volatile String keyStorePath; + private volatile String keyStorePassword; + private volatile Boolean sslEnabled; + + @Override public void configure(Context context) { try { + // SSL related config + sslEnabled = context.getBoolean(HTTPSourceConfigurationConstants.SSL_ENABLED, false); + port = context.getInteger(HTTPSourceConfigurationConstants.CONFIG_PORT); host = context.getString(HTTPSourceConfigurationConstants.CONFIG_BIND, HTTPSourceConfigurationConstants.DEFAULT_BIND); - checkHostAndPort(); + + Preconditions.checkState(host != null && !host.isEmpty(), + "HTTPSource hostname specified is empty"); + // verify port only if its not ssl + if(!sslEnabled) { + Preconditions.checkNotNull(port, "HTTPSource requires a port number to be" + + " specified"); + } + String handlerClassName = context.getString( HTTPSourceConfigurationConstants.CONFIG_HANDLER, HTTPSourceConfigurationConstants.DEFAULT_HANDLER).trim(); + + if(sslEnabled) { + LOG.debug("SSL configuration enabled"); + sslPort = context.getInteger(HTTPSourceConfigurationConstants.SSL_PORT); + Preconditions.checkArgument(sslPort != null && sslPort > 0, "SSL Port cannot be null or less than 0" ); + keyStorePath = context.getString(HTTPSourceConfigurationConstants.SSL_KEYSTORE); + Preconditions.checkArgument(keyStorePath != null && !keyStorePath.isEmpty(), + "Keystore is required for SSL Conifguration" ); + keyStorePassword = context.getString(HTTPSourceConfigurationConstants.SSL_KEYSTORE_PASSWORD); + Preconditions.checkArgument(keyStorePassword != null, "Keystore password is required for SSL Configuration"); + } + @SuppressWarnings("unchecked") Class<? extends HTTPSourceHandler> clazz = (Class<? extends HTTPSourceHandler>) @@ -139,10 +170,25 @@ public class HTTPSource extends AbstractSource implements + " before I started one." + "Will not attempt to start."); srv = new Server(); - SocketConnector connector = new SocketConnector(); - connector.setPort(port); - connector.setHost(host); - srv.setConnectors(new Connector[] { connector }); + + // Connector Array + Connector[] connectors = new Connector[1]; + + + if(sslEnabled) { + SslSocketConnector sslSocketConnector = new SslSocketConnector(); + sslSocketConnector.setKeystore(keyStorePath); + sslSocketConnector.setKeyPassword(keyStorePassword); + sslSocketConnector.setPort(sslPort); + connectors[0] = sslSocketConnector; + } else { + SocketConnector connector = new SocketConnector(); + connector.setPort(port); + connector.setHost(host); + connectors[0] = connector; + } + + srv.setConnectors(connectors); try { org.mortbay.jetty.servlet.Context root = new org.mortbay.jetty.servlet.Context( http://git-wip-us.apache.org/repos/asf/flume/blob/e2566104/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSourceConfigurationConstants.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSourceConfigurationConstants.java b/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSourceConfigurationConstants.java index f547e0f..205aeab 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSourceConfigurationConstants.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSourceConfigurationConstants.java @@ -34,4 +34,9 @@ public class HTTPSourceConfigurationConstants { public static final String DEFAULT_HANDLER = "org.apache.flume.source.http.JSONHandler"; + public static final String SSL_PORT = "sslPort"; + public static final String SSL_KEYSTORE = "keystore"; + public static final String SSL_KEYSTORE_PASSWORD = "keystorePassword"; + public static final String SSL_ENABLED = "enableSSL"; + } http://git-wip-us.apache.org/repos/asf/flume/blob/e2566104/flume-ng-core/src/test/java/org/apache/flume/source/http/TestHTTPSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/http/TestHTTPSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/http/TestHTTPSource.java index 8952db3..6c9fd86 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/source/http/TestHTTPSource.java +++ b/flume-ng-core/src/test/java/org/apache/flume/source/http/TestHTTPSource.java @@ -22,11 +22,7 @@ import com.google.common.collect.Maps; import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; import junit.framework.Assert; -import org.apache.flume.Channel; -import org.apache.flume.ChannelSelector; -import org.apache.flume.Context; -import org.apache.flume.Event; -import org.apache.flume.Transaction; +import org.apache.flume.*; import org.apache.flume.channel.ChannelProcessor; import org.apache.flume.channel.MemoryChannel; import org.apache.flume.channel.ReplicatingChannelSelector; @@ -34,6 +30,7 @@ import org.apache.flume.conf.Configurables; import org.apache.flume.event.JSONEvent; import org.apache.http.HttpResponse; import org.apache.http.client.methods.HttpPost; +import org.apache.http.conn.ssl.SSLSocketFactory; import org.apache.http.entity.StringEntity; import org.apache.http.impl.client.DefaultHttpClient; import org.junit.AfterClass; @@ -41,10 +38,14 @@ import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; +import javax.net.ssl.*; import javax.servlet.http.HttpServletResponse; import java.io.IOException; import java.lang.reflect.Type; import java.net.ServerSocket; +import java.net.URL; +import java.security.SecureRandom; +import java.security.cert.CertificateException; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -58,8 +59,12 @@ import static org.fest.reflect.core.Reflection.field; public class TestHTTPSource { private static HTTPSource source; + private static HTTPSource httpsSource; +// private static Channel httpsChannel; + private static Channel channel; private static int selectedPort; + private static int sslPort; DefaultHttpClient httpClient; HttpPost postRequest; @@ -77,9 +82,13 @@ public class TestHTTPSource { source = new HTTPSource(); channel = new MemoryChannel(); + httpsSource = new HTTPSource(); +// httpsChannel = new MemoryChannel(); + Context ctx = new Context(); ctx.put("capacity", "100"); Configurables.configure(channel, ctx); +// Configurables.configure(httpsChannel, ctx); List<Channel> channels = new ArrayList<Channel>(1); channels.add(channel); @@ -90,19 +99,43 @@ public class TestHTTPSource { source.setChannelProcessor(new ChannelProcessor(rcs)); channel.start(); + + // Channel for HTTPS source +// List<Channel> sslChannels = new ArrayList<Channel>(1); +// channels.add(httpsChannel); +// +// ChannelSelector sslRcs = new ReplicatingChannelSelector(); +// rcs.setChannels(sslChannels); + + httpsSource.setChannelProcessor(new ChannelProcessor(rcs)); +// httpsChannel.start(); + + // HTTP context Context context = new Context(); context.put("port", String.valueOf(selectedPort)); context.put("host", "0.0.0.0"); + // SSL context props + Context sslContext = new Context(); + sslContext.put(HTTPSourceConfigurationConstants.SSL_ENABLED, "true"); + sslPort = findFreePort(); + sslContext.put(HTTPSourceConfigurationConstants.SSL_PORT, String.valueOf(sslPort)); + sslContext.put(HTTPSourceConfigurationConstants.SSL_KEYSTORE_PASSWORD, "password"); + sslContext.put(HTTPSourceConfigurationConstants.SSL_KEYSTORE, "src/test/resources/jettykeystore"); + Configurables.configure(source, context); + Configurables.configure(httpsSource, sslContext); source.start(); + httpsSource.start(); } @AfterClass public static void tearDownClass() throws Exception { source.stop(); channel.stop(); + httpsSource.stop(); +// httpsChannel.stop(); } @Before @@ -268,6 +301,73 @@ public class TestHTTPSource { return new ResultWrapper(resp, events); } + @Test + public void testHttps() throws Exception { + Type listType = new TypeToken<List<JSONEvent>>() { + }.getType(); + List<JSONEvent> events = Lists.newArrayList(); + Random rand = new Random(); + for (int i = 0; i < 10; i++) { + Map<String, String> input = Maps.newHashMap(); + for (int j = 0; j < 10; j++) { + input.put(String.valueOf(i) + String.valueOf(j), String.valueOf(i)); + } + JSONEvent e = new JSONEvent(); + e.setHeaders(input); + e.setBody(String.valueOf(rand.nextGaussian()).getBytes("UTF-8")); + events.add(e); + } + Gson gson = new Gson(); + String json = gson.toJson(events, listType); + HttpsURLConnection httpsURLConnection = null; + try { + TrustManager[] trustAllCerts = {new X509TrustManager() { + @Override + public void checkClientTrusted( + java.security.cert.X509Certificate[] x509Certificates, String s) + throws CertificateException { + // noop + } + + @Override + public void checkServerTrusted( + java.security.cert.X509Certificate[] x509Certificates, String s) + throws CertificateException { + // noop + } + + public java.security.cert.X509Certificate[] getAcceptedIssuers() { + return null; + } + }}; + SSLContext sc = SSLContext.getInstance("SSL"); + + HostnameVerifier hv = new HostnameVerifier() { + public boolean verify(String arg0, SSLSession arg1) { + return true; + } + }; + sc.init(null, trustAllCerts, new SecureRandom()); + HttpsURLConnection.setDefaultSSLSocketFactory(sc.getSocketFactory()); + HttpsURLConnection.setDefaultHostnameVerifier( + SSLSocketFactory.ALLOW_ALL_HOSTNAME_VERIFIER); + URL sslUrl = new URL("https://0.0.0.0:" + sslPort); + httpsURLConnection = (HttpsURLConnection) sslUrl.openConnection(); + httpsURLConnection.setDoInput(true); + httpsURLConnection.setDoOutput(true); + httpsURLConnection.setRequestMethod("POST"); + httpsURLConnection.getOutputStream().write(json.getBytes()); + + int statusCode = httpsURLConnection.getResponseCode(); + Assert.assertEquals(200, statusCode); + } catch (Exception exception) { + Assert.fail("Exception not expected"); + exception.printStackTrace(); + } finally { + httpsURLConnection.disconnect(); + } + } + private void takeWithEncoding(String encoding, int n, List<JSONEvent> events) throws Exception{ Transaction tx = channel.getTransaction(); http://git-wip-us.apache.org/repos/asf/flume/blob/e2566104/flume-ng-core/src/test/resources/jettykeystore ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/resources/jettykeystore b/flume-ng-core/src/test/resources/jettykeystore new file mode 100644 index 0000000..db76bcb Binary files /dev/null and b/flume-ng-core/src/test/resources/jettykeystore differ http://git-wip-us.apache.org/repos/asf/flume/blob/e2566104/flume-ng-doc/sphinx/FlumeUserGuide.rst ---------------------------------------------------------------------- diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst index fb42528..c614991 100644 --- a/flume-ng-doc/sphinx/FlumeUserGuide.rst +++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst @@ -1223,6 +1223,10 @@ selector.type replicating replicating or mul selector.* Depends on the selector.type value interceptors -- Space-separated list of interceptors interceptors.* +enableSSL false Set the property true, to enable SSL +sslPort The port to be used for SSL +keystore Location of the keystore includng keystore file name +keystorePassword Keystore password ================================================================================================================================== For example, a http source for agent named a1:
