Updated Branches: refs/heads/flume-1.5 eb99bb41e -> e272d1129
FLUME-2198. Avro Source should disable itself if ipFilterRules contains invalid rules (Hari Shreedharan via Mike Percy) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/e272d112 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/e272d112 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/e272d112 Branch: refs/heads/flume-1.5 Commit: e272d11295c55abab76412f28a80be00ef69c04e Parents: eb99bb4 Author: Mike Percy <[email protected]> Authored: Fri Sep 27 21:48:05 2013 -0700 Committer: Mike Percy <[email protected]> Committed: Fri Sep 27 21:49:17 2013 -0700 ---------------------------------------------------------------------- .../org/apache/flume/source/AvroSource.java | 125 +++++++-------- .../org/apache/flume/source/TestAvroSource.java | 151 +++++++++++-------- 2 files changed, 141 insertions(+), 135 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/e272d112/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java index f6e4cfe..c1ee3a9 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java @@ -59,6 +59,7 @@ import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; import org.jboss.netty.channel.Channels; import org.jboss.netty.handler.codec.compression.ZlibDecoder; import org.jboss.netty.handler.codec.compression.ZlibEncoder; +import org.jboss.netty.handler.ipfilter.IpFilterRule; import org.jboss.netty.handler.ipfilter.IpFilterRuleHandler; import org.jboss.netty.handler.ipfilter.PatternRule; import org.jboss.netty.handler.ssl.SslHandler; @@ -153,6 +154,8 @@ public class AvroSource extends AbstractSource implements EventDrivenSource, private int maxThreads; private ScheduledExecutorService connectionCountUpdater; + private List<IpFilterRule> rules; + @Override public void configure(Context context) { Configurables.ensureRequiredNonNull(context, PORT_KEY, BIND_KEY); @@ -191,11 +194,17 @@ public class AvroSource extends AbstractSource implements EventDrivenSource, if (enableIpFilter) { patternRuleConfigDefinition = context.getString(IP_FILTER_RULES_KEY); if (patternRuleConfigDefinition == null || - patternRuleConfigDefinition.isEmpty()) { + patternRuleConfigDefinition.trim().isEmpty()) { throw new FlumeException( "ipFilter is configured with true but ipFilterRules is not defined:" + " "); } + String[] patternRuleDefinitions = patternRuleConfigDefinition.split( + ","); + rules = new ArrayList<IpFilterRule>(patternRuleDefinitions.length); + for (String patternRuleDefinition : patternRuleDefinitions) { + rules.add(generateRule(patternRuleDefinition)); + } } if (sourceCounter == null) { @@ -369,11 +378,53 @@ public class AvroSource extends AbstractSource implements EventDrivenSource, return Status.OK; } + private PatternRule generateRule( + String patternRuleDefinition) throws FlumeException { + patternRuleDefinition = patternRuleDefinition.trim(); + //first validate the format + int firstColonIndex = patternRuleDefinition.indexOf(":"); + if (firstColonIndex == -1) { + throw new FlumeException( + "Invalid ipFilter patternRule '" + patternRuleDefinition + + "' should look like <'allow' or 'deny'>:<'ip' or " + + "'name'>:<pattern>"); + } else { + String ruleAccessFlag = patternRuleDefinition.substring(0, + firstColonIndex); + int secondColonIndex = patternRuleDefinition.indexOf(":", + firstColonIndex + 1); + if ((!ruleAccessFlag.equals("allow") && + !ruleAccessFlag.equals("deny")) || secondColonIndex == -1) { + throw new FlumeException( + "Invalid ipFilter patternRule '" + patternRuleDefinition + + "' should look like <'allow' or 'deny'>:<'ip' or " + + "'name'>:<pattern>"); + } + + String patternTypeFlag = patternRuleDefinition.substring( + firstColonIndex + 1, secondColonIndex); + if ((!patternTypeFlag.equals("ip") && + !patternTypeFlag.equals("name"))) { + throw new FlumeException( + "Invalid ipFilter patternRule '" + patternRuleDefinition + + "' should look like <'allow' or 'deny'>:<'ip' or " + + "'name'>:<pattern>"); + } + + boolean isAllow = ruleAccessFlag.equals("allow"); + String patternRuleString = (patternTypeFlag.equals("ip") ? "i" : "n") + + ":" + patternRuleDefinition.substring(secondColonIndex + 1); + logger.info("Adding ipFilter PatternRule: " + + (isAllow ? "Allow" : "deny") + " " + patternRuleString); + return new PatternRule(isAllow, patternRuleString); + } + } + /** * Factory of SSL-enabled server worker channel pipelines * Copied from Avro's org.apache.avro.ipc.TestNettyServerWithSSL test */ - private static class AdvancedChannelPipelineFactory + private class AdvancedChannelPipelineFactory implements ChannelPipelineFactory { private boolean enableCompression; @@ -448,23 +499,7 @@ public class AvroSource extends AbstractSource implements EventDrivenSource, logger.info("Setting up ipFilter with the following rule definition: " + patternRuleConfigDefinition); IpFilterRuleHandler ipFilterHandler = new IpFilterRuleHandler(); - - if (patternRuleConfigDefinition != null && - !patternRuleConfigDefinition.isEmpty()) { - String[] patternRuleDefinitions = patternRuleConfigDefinition.split( - ","); - for (String patternRuleDefinition : patternRuleDefinitions) { - - PatternRule patternRule - = PatternRuleBuilder.withConfigRuleDefinition( - patternRuleDefinition); - - if (patternRule != null) { - ipFilterHandler.add(patternRule); - } - } - } - + ipFilterHandler.addAll(rules); logger.info( "Adding ipFilter with " + ipFilterHandler.size() + " rules"); @@ -473,57 +508,5 @@ public class AvroSource extends AbstractSource implements EventDrivenSource, return pipeline; } - - public static class PatternRuleBuilder { - public static PatternRule withConfigRuleDefinition( - String patternRuleDefinition) throws FlumeException { - patternRuleDefinition = patternRuleDefinition.trim(); - //first validation the format - - int firstColonIndex = patternRuleDefinition.indexOf(":"); - if (firstColonIndex == -1) { - logger.error( - "Invalid ipFilter patternRule '" + patternRuleDefinition + - "' should look like <'allow' or 'deny'>:<'ip' or " + - "'name'>:<pattern>"); - return null; - } else { - - String ruleAccessFlag = patternRuleDefinition.substring(0, - firstColonIndex); - int secondColonIndex = patternRuleDefinition.indexOf(":", - firstColonIndex + 1); - if ((!ruleAccessFlag.equals("allow") && - !ruleAccessFlag.equals("deny")) || secondColonIndex == -1) { - logger.error( - "Invalid ipFilter patternRule '" + patternRuleDefinition + - "' should look like <'allow' or 'deny'>:<'ip' or " + - "'name'>:<pattern>"); - return null; - } - - String patternTypeFlag = patternRuleDefinition.substring( - firstColonIndex + 1, secondColonIndex); - if ((!patternTypeFlag.equals("ip") && - !patternTypeFlag.equals("name"))) { - logger.error( - "Invalid ipFilter patternRule '" + patternRuleDefinition + - "' should look like <'allow' or 'deny'>:<'ip' or " + - "'name'>:<pattern>"); - return null; - } - - boolean isAllow = ruleAccessFlag.equals("allow"); - String patternRuleString = - (patternTypeFlag.equals("ip") ? "i" : "n") + ":" + - patternRuleDefinition.substring(secondColonIndex + 1); - logger.info("Adding ipFilter PatternRule: " - + (isAllow ? "Allow" : "deny") + - " " + patternRuleString); - return new PatternRule(isAllow, patternRuleString); - } - } - } - } } http://git-wip-us.apache.org/repos/asf/flume/blob/e272d112/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java index e208fff..c75d098 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java +++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java @@ -21,13 +21,14 @@ package org.apache.flume.source; import java.io.IOException; import java.net.Inet4Address; +import java.net.InetAddress; import java.net.InetSocketAddress; +import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.security.cert.X509Certificate; import java.util.ArrayList; import java.util.HashMap; import java.util.List; -import java.util.concurrent.Executor; import java.util.concurrent.Executors; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLEngine; @@ -72,9 +73,11 @@ public class TestAvroSource { private int selectedPort; private AvroSource source; private Channel channel; + private InetAddress localhost; @Before - public void setUp() { + public void setUp() throws UnknownHostException { + localhost = InetAddress.getByName("127.0.0.1"); source = new AvroSource(); channel = new MemoryChannel(); @@ -383,65 +386,84 @@ public class TestAvroSource { } @Test - public void testValidIpFilterAllows() throws InterruptedException, IOException { - - doIpFilterTest("allow:name:localhost,deny:ip:*", true, false); - doIpFilterTest("allow:ip:" + Inet4Address.getLocalHost().getHostAddress() + ",deny:ip:*", true, false); - doIpFilterTest("allow:ip:*", true, false); - doIpFilterTest("allow:ip:" + Inet4Address.getLocalHost().getHostAddress().substring(0, 3) + "*,deny:ip:*", true, false); - doIpFilterTest("allow:ip:127.0.0.2,allow:ip:" + Inet4Address.getLocalHost().getHostAddress().substring(0, 3) + "*,deny:ip:*", true, false); - - doIpFilterTest("allow:name:localhost,deny:ip:*", true, true); - doIpFilterTest("allow:ip:*", true, true); - + public void testValidIpFilterAllows() + throws InterruptedException, IOException { + doIpFilterTest(localhost, "allow:name:localhost,deny:ip:*", true, false); + doIpFilterTest(localhost, "allow:ip:" + localhost.getHostAddress() + + ",deny:ip:*", true, false); + doIpFilterTest(localhost, "allow:ip:*", true, false); + doIpFilterTest(localhost, "allow:ip:" + + localhost.getHostAddress().substring(0, 3) + + "*,deny:ip:*", true, false); + doIpFilterTest(localhost, "allow:ip:127.0.0.2,allow:ip:" + + localhost.getHostAddress().substring(0, 3) + + "*,deny:ip:*", true, false); + doIpFilterTest(localhost, "allow:name:localhost,deny:ip:*", true, true); + doIpFilterTest(localhost, "allow:ip:*", true, true); } @Test - public void testValidIpFilterDenys() throws InterruptedException, IOException { - - doIpFilterTest("deny:ip:*", false, false); - doIpFilterTest("deny:name:localhost", false, false); - doIpFilterTest("deny:ip:" + Inet4Address.getLocalHost().getHostAddress() + ",allow:ip:*", false, false); - doIpFilterTest("deny:ip:*", false, false); - doIpFilterTest("allow:ip:45.2.2.2,deny:ip:*", false, false); - doIpFilterTest("deny:ip:" + Inet4Address.getLocalHost().getHostAddress().substring(0, 3) + "*,allow:ip:*", false, false); - - - doIpFilterTest("deny:ip:*", false, true); + public void testValidIpFilterDenys() + throws InterruptedException, IOException { + doIpFilterTest(localhost, "deny:ip:*", false, false); + doIpFilterTest(localhost, "deny:name:localhost", false, false); + doIpFilterTest(localhost, "deny:ip:" + localhost.getHostAddress() + + ",allow:ip:*", false, false); + doIpFilterTest(localhost, "deny:ip:*", false, false); + doIpFilterTest(localhost, "allow:ip:45.2.2.2,deny:ip:*", false, false); + doIpFilterTest(localhost, "deny:ip:" + + localhost.getHostAddress().substring(0, 3) + + "*,allow:ip:*", false, false); + doIpFilterTest(localhost, "deny:ip:*", false, true); } @Test public void testInvalidIpFilter() throws InterruptedException, IOException { - - doIpFilterTest("deny:ip?*", true, false); - doIpFilterTest("deny?name:localhost", true, false); - doIpFilterTest("deny:ip:127.0.0.2,allow:ip?*,deny:ip:" + Inet4Address.getLocalHost().getHostAddress() + "", false, false); - doIpFilterTest("deny:*", true, false); - doIpFilterTest("deny:id:" + Inet4Address.getLocalHost().getHostAddress().substring(0, 3) + "*,allow:ip:*", true, false); + doIpFilterTest(localhost, "deny:ip:*", false, false); + doIpFilterTest(localhost, "allow:name:localhost", true, false); + doIpFilterTest(localhost, "deny:ip:127.0.0.2,allow:ip:*,deny:ip:" + + localhost.getHostAddress(), true, false); + doIpFilterTest(localhost, "deny:ip:" + + localhost.getHostAddress().substring(0, 3) + "*,allow:ip:*", + false, false); try { - doIpFilterTest(null, true, false); - Assert.fail("The null ipFilterRules config should had thrown an exception."); + doIpFilterTest(localhost, null, false, false); + Assert.fail( + "The null ipFilterRules config should have thrown an exception."); } catch (FlumeException e) { //Do nothing } - try{ - doIpFilterTest("", true, false); - Assert.fail("The empty string ipFilterRules config should had thrown an exception."); - } catch (FlumeException e) { + try { + doIpFilterTest(localhost, "", true, false); + Assert.fail("The empty string ipFilterRules config should have thrown " + + "an exception"); + } catch (FlumeException e) { //Do nothing } - + try { + doIpFilterTest(localhost, "homer:ip:45.4.23.1", true, false); + Assert.fail("Bad ipFilterRules config should have thrown an exception."); + } catch (FlumeException e) { + //Do nothing + } + try { + doIpFilterTest(localhost, "allow:sleeps:45.4.23.1", true, false); + Assert.fail("Bad ipFilterRules config should have thrown an exception."); + } catch (FlumeException e) { + //Do nothing + } } - public void doIpFilterTest(String ruleDefinition, boolean eventShouldBeAllowed, boolean testWithSSL) throws InterruptedException, IOException { + public void doIpFilterTest(InetAddress dest, String ruleDefinition, + boolean eventShouldBeAllowed, boolean testWithSSL) + throws InterruptedException, IOException { boolean bound = false; for (int i = 0; i < 100 && !bound; i++) { try { Context context = new Context(); - context.put("port", String.valueOf(selectedPort = 41414 + i)); context.put("bind", "0.0.0.0"); context.put("ipFilter", "true"); @@ -476,34 +498,41 @@ public class TestAvroSource { source.getLifecycleState()); AvroSourceProtocol client; - NettyTransceiver nettyTransceiver; - - if (testWithSSL) { - nettyTransceiver = new NettyTransceiver(new InetSocketAddress(selectedPort), new SSLChannelFactory()); - client = SpecificRequestor.getClient( - AvroSourceProtocol.class, nettyTransceiver ); - } else { - nettyTransceiver = new NettyTransceiver(new InetSocketAddress(selectedPort)); - client = SpecificRequestor.getClient( + NettyTransceiver nettyTransceiver = null; + try { + if (testWithSSL) { + nettyTransceiver = new NettyTransceiver( + new InetSocketAddress (dest, selectedPort), + new SSLChannelFactory()); + client = SpecificRequestor.getClient( AvroSourceProtocol.class, nettyTransceiver); - } + } else { + nettyTransceiver = new NettyTransceiver( + new InetSocketAddress (dest, selectedPort)); + client = SpecificRequestor.getClient( + AvroSourceProtocol.class, nettyTransceiver); + } - AvroFlumeEvent avroEvent = new AvroFlumeEvent(); - avroEvent.setHeaders(new HashMap<CharSequence, CharSequence>()); - avroEvent.setBody(ByteBuffer.wrap("Hello avro ipFilter".getBytes())); + AvroFlumeEvent avroEvent = new AvroFlumeEvent(); + avroEvent.setHeaders(new HashMap<CharSequence, CharSequence>()); + avroEvent.setBody(ByteBuffer.wrap("Hello avro ipFilter".getBytes())); - try { logger.info("Client about to append"); Status status = client.append(avroEvent); logger.info("Client appended"); Assert.assertEquals(Status.OK, status); - } catch(IOException e) { - Assert.assertTrue("Should have been Allowed:" + ruleDefinition, !eventShouldBeAllowed); + } catch (IOException e) { + Assert.assertTrue("Should have been allowed: " + ruleDefinition, + !eventShouldBeAllowed); return; + } finally { + if (nettyTransceiver != null) { + nettyTransceiver.close(); + } + source.stop(); } - Assert.assertTrue("Should have been denied:" + ruleDefinition, eventShouldBeAllowed); - - + Assert.assertTrue("Should have been denied: " + ruleDefinition, + eventShouldBeAllowed); Transaction transaction = channel.getTransaction(); transaction.begin(); @@ -514,17 +543,11 @@ public class TestAvroSource { new String(event.getBody())); transaction.commit(); transaction.close(); - logger.debug("Round trip event:{}", event); - nettyTransceiver.close(); - source.stop(); Assert.assertTrue("Reached stop or error", LifecycleController.waitForOneOf(source, LifecycleState.STOP_OR_ERROR)); Assert.assertEquals("Server is stopped", LifecycleState.STOP, source.getLifecycleState()); - - } - }
