http://git-wip-us.apache.org/repos/asf/flume/blob/e47fd1e7/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java ---------------------------------------------------------------------- diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java b/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java deleted file mode 100644 index 60bfd5e..0000000 --- a/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java +++ /dev/null @@ -1,263 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.flume.node.nodemanager; - -import java.util.Map.Entry; - -import org.apache.flume.Channel; -import org.apache.flume.SinkRunner; -import org.apache.flume.SourceRunner; -import org.apache.flume.lifecycle.LifecycleAware; -import org.apache.flume.lifecycle.LifecycleState; -import org.apache.flume.lifecycle.LifecycleSupervisor; -import org.apache.flume.lifecycle.LifecycleSupervisor.SupervisorPolicy; -import org.apache.flume.node.NodeConfiguration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Preconditions; -import com.google.common.base.Throwables; -import java.util.Properties; -import java.util.Set; -import org.apache.flume.Context; -import org.apache.flume.instrumentation.MonitorService; -import org.apache.flume.instrumentation.MonitoringType; - - -public class DefaultLogicalNodeManager extends AbstractLogicalNodeManager - implements NodeConfigurationAware { - - private static final Logger logger = LoggerFactory - .getLogger(DefaultLogicalNodeManager.class); - - private LifecycleSupervisor nodeSupervisor; - private LifecycleState lifecycleState; - private NodeConfiguration nodeConfiguration; - - private MonitorService monitorServer; - - public static final String CONF_MONITOR_CLASS = "flume.monitoring.type"; - public static final String CONF_MONITOR_PREFIX = "flume.monitoring."; - - public DefaultLogicalNodeManager() { - nodeSupervisor = new LifecycleSupervisor(); - lifecycleState = LifecycleState.IDLE; - nodeConfiguration = null; - } - - @Override - public void stopAllComponents() { - if (this.nodeConfiguration != null) { - logger.info("Shutting down configuration: {}", this.nodeConfiguration); - for (Entry<String, SourceRunner> entry : this.nodeConfiguration - .getSourceRunners().entrySet()) { - try{ - logger.info("Stopping Source " + entry.getKey()); - nodeSupervisor.unsupervise(entry.getValue()); - } catch (Exception e){ - logger.error("Error while stopping {}", entry.getValue(), e); - } - } - - for (Entry<String, SinkRunner> entry : - this.nodeConfiguration.getSinkRunners().entrySet()) { - try{ - logger.info("Stopping Sink " + entry.getKey()); - nodeSupervisor.unsupervise(entry.getValue()); - } catch (Exception e){ - logger.error("Error while stopping {}", entry.getValue(), e); - } - } - - for (Entry<String, Channel> entry : - this.nodeConfiguration.getChannels().entrySet()) { - try{ - logger.info("Stopping Channel " + entry.getKey()); - nodeSupervisor.unsupervise(entry.getValue()); - } catch (Exception e){ - logger.error("Error while stopping {}", entry.getValue(), e); - } - } - } - if(monitorServer != null) { - monitorServer.stop(); - } - } - - @Override - public void startAllComponents(NodeConfiguration nodeConfiguration) { - logger.info("Starting new configuration:{}", nodeConfiguration); - - this.nodeConfiguration = nodeConfiguration; - - for (Entry<String, Channel> entry : - nodeConfiguration.getChannels().entrySet()) { - try{ - logger.info("Starting Channel " + entry.getKey()); - nodeSupervisor.supervise(entry.getValue(), - new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START); - } catch (Exception e){ - logger.error("Error while starting {}", entry.getValue(), e); - } - } - - /* - * Wait for all channels to start. - */ - for(Channel ch: nodeConfiguration.getChannels().values()){ - while(ch.getLifecycleState() != LifecycleState.START - && !nodeSupervisor.isComponentInErrorState(ch)){ - try { - logger.info("Waiting for channel: " + ch.getName() + - " to start. Sleeping for 500 ms"); - Thread.sleep(500); - } catch (InterruptedException e) { - logger.error("Interrupted while waiting for channel to start.", e); - Throwables.propagate(e); - } - } - } - - for (Entry<String, SinkRunner> entry : nodeConfiguration.getSinkRunners() - .entrySet()) { - try{ - logger.info("Starting Sink " + entry.getKey()); - nodeSupervisor.supervise(entry.getValue(), - new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START); - } catch (Exception e) { - logger.error("Error while starting {}", entry.getValue(), e); - } - } - - for (Entry<String, SourceRunner> entry : nodeConfiguration - .getSourceRunners().entrySet()) { - try{ - logger.info("Starting Source " + entry.getKey()); - nodeSupervisor.supervise(entry.getValue(), - new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START); - } catch (Exception e) { - logger.error("Error while starting {}", entry.getValue(), e); - } - } - - this.loadMonitoring(); - } - - @Override - public boolean add(LifecycleAware node) { - /* - * FIXME: This type of overriding worries me. There should be a better - * separation of addition of nodes and management. (i.e. state vs. function) - */ - Preconditions.checkState(getLifecycleState().equals(LifecycleState.START), - "You can not add nodes to a manager that hasn't been started"); - - if (super.add(node)) { - nodeSupervisor.supervise(node, - new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START); - - return true; - } - - return false; - } - - @Override - public boolean remove(LifecycleAware node) { - /* - * FIXME: This type of overriding worries me. There should be a better - * separation of addition of nodes and management. (i.e. state vs. function) - */ - Preconditions.checkState(getLifecycleState().equals(LifecycleState.START), - "You can not remove nodes from a manager that hasn't been started"); - - if (super.remove(node)) { - nodeSupervisor.unsupervise(node); - - return true; - } - - return false; - } - - @Override - public void start() { - - logger.info("Node manager starting"); - - nodeSupervisor.start(); - - logger.debug("Node manager started"); - - lifecycleState = LifecycleState.START; - } - - @Override - public void stop() { - - logger.info("Node manager stopping"); - - stopAllComponents(); - - nodeSupervisor.stop(); - - logger.debug("Node manager stopped"); - - lifecycleState = LifecycleState.STOP; - } - - @Override - public LifecycleState getLifecycleState() { - return lifecycleState; - } - - private void loadMonitoring() { - Properties systemProps = System.getProperties(); - Set<String> keys = systemProps.stringPropertyNames(); - try { - if (keys.contains(CONF_MONITOR_CLASS)) { - String monitorType = systemProps.getProperty(CONF_MONITOR_CLASS); - Class<? extends MonitorService> klass; - try { - //Is it a known type? - klass = MonitoringType.valueOf( - monitorType.toUpperCase()).getMonitorClass(); - } catch (Exception e) { - //Not a known type, use FQCN - klass = (Class<? extends MonitorService>) Class.forName(monitorType); - } - this.monitorServer = klass.newInstance(); - Context context = new Context(); - for (String key : keys) { - if (key.startsWith(CONF_MONITOR_PREFIX)) { - context.put(key.substring(CONF_MONITOR_PREFIX.length()), - systemProps.getProperty(key)); - } - } - monitorServer.configure(context); - monitorServer.start(); - } - } catch (Exception e) { - logger.warn("Error starting monitoring. " - + "Monitoring might not be available.", e); - } - - } -}
http://git-wip-us.apache.org/repos/asf/flume/blob/e47fd1e7/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/NodeConfigurationAware.java ---------------------------------------------------------------------- diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/NodeConfigurationAware.java b/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/NodeConfigurationAware.java deleted file mode 100644 index c20bf9b..0000000 --- a/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/NodeConfigurationAware.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.flume.node.nodemanager; - -import org.apache.flume.node.NodeConfiguration; - -public interface NodeConfigurationAware { - - /** - * Stop all components currently running. - */ - public void stopAllComponents(); - - /** - * Start components with the configuration provided. - * @param nodeConfiguration - */ - public void startAllComponents(NodeConfiguration nodeConfiguration); - -} http://git-wip-us.apache.org/repos/asf/flume/blob/e47fd1e7/flume-ng-node/src/test/java/org/apache/flume/conf/properties/TestPropertiesFileConfigurationProvider.java ---------------------------------------------------------------------- diff --git a/flume-ng-node/src/test/java/org/apache/flume/conf/properties/TestPropertiesFileConfigurationProvider.java b/flume-ng-node/src/test/java/org/apache/flume/conf/properties/TestPropertiesFileConfigurationProvider.java deleted file mode 100644 index d43aed6..0000000 --- a/flume-ng-node/src/test/java/org/apache/flume/conf/properties/TestPropertiesFileConfigurationProvider.java +++ /dev/null @@ -1,82 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flume.conf.properties; - -import java.io.File; - -import org.apache.flume.channel.DefaultChannelFactory; -import org.apache.flume.node.NodeConfiguration; -import org.apache.flume.node.nodemanager.NodeConfigurationAware; -import org.apache.flume.sink.DefaultSinkFactory; -import org.apache.flume.source.DefaultSourceFactory; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class TestPropertiesFileConfigurationProvider { - - private static final File TESTFILE = new File( - TestPropertiesFileConfigurationProvider.class.getClassLoader() - .getResource("flume-conf.properties").getFile()); - - @SuppressWarnings("unused") - private static final Logger LOGGER = LoggerFactory - .getLogger(TestPropertiesFileConfigurationProvider.class); - - @Before - public void setUp() throws Exception { - File tmpDir = new File("target/test"); - tmpDir.mkdirs(); - - File derbyLogFile = new File(tmpDir, "derbytest.log"); - String derbyLogFilePath = derbyLogFile.getCanonicalPath(); - - System.setProperty("derby.stream.error.file", derbyLogFilePath); - } - - @Test - public void testPropertyRead() throws Exception { - PropertiesFileConfigurationProvider provider = - new PropertiesFileConfigurationProvider(); - - provider.setNodeName("host1"); - provider.setConfigurationAware(new DummyNodeConfigurationAware()); - - provider.setChannelFactory(new DefaultChannelFactory()); - provider.setSourceFactory(new DefaultSourceFactory()); - provider.setSinkFactory(new DefaultSinkFactory()); - - provider.setFile(TESTFILE); - provider.load(); - } - - private static class DummyNodeConfigurationAware implements - NodeConfigurationAware { - - @Override - public void stopAllComponents(){ - - } - @Override - public void startAllComponents(NodeConfiguration config) { - // no handling necessary - } - } - -} http://git-wip-us.apache.org/repos/asf/flume/blob/e47fd1e7/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractLogicalNodeManager.java ---------------------------------------------------------------------- diff --git a/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractLogicalNodeManager.java b/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractLogicalNodeManager.java deleted file mode 100644 index 1cbc269..0000000 --- a/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractLogicalNodeManager.java +++ /dev/null @@ -1,233 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.flume.node; - -import java.util.ArrayList; -import java.util.List; - -import org.apache.flume.Channel; -import org.apache.flume.ChannelSelector; -import org.apache.flume.Context; -import org.apache.flume.Sink; -import org.apache.flume.SinkProcessor; -import org.apache.flume.SinkRunner; -import org.apache.flume.Source; -import org.apache.flume.SourceRunner; -import org.apache.flume.channel.ChannelProcessor; -import org.apache.flume.channel.MemoryChannel; -import org.apache.flume.channel.ReplicatingChannelSelector; -import org.apache.flume.conf.Configurables; -import org.apache.flume.lifecycle.LifecycleAware; -import org.apache.flume.lifecycle.LifecycleController; -import org.apache.flume.lifecycle.LifecycleException; -import org.apache.flume.lifecycle.LifecycleState; -import org.apache.flume.node.nodemanager.AbstractLogicalNodeManager; -import org.apache.flume.sink.DefaultSinkProcessor; -import org.apache.flume.sink.NullSink; -import org.apache.flume.source.SequenceGeneratorSource; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class TestAbstractLogicalNodeManager { - - private static final Logger logger = LoggerFactory - .getLogger(TestAbstractLogicalNodeManager.class); - - private AbstractLogicalNodeManager nodeManager; - - @Before - public void setUp() { - nodeManager = new AbstractLogicalNodeManager() { - - private LifecycleState lifecycleState = LifecycleState.IDLE; - - @Override - public void stop() { - - for (LifecycleAware node : getNodes()) { - node.stop(); - - boolean reached = false; - - try { - reached = LifecycleController.waitForOneOf(node, - LifecycleState.STOP_OR_ERROR); - } catch (InterruptedException e) { - // Do nothing. - } - - if (!reached) { - logger.error( - "Unable to stop logical node:{} This *will* cause failures.", - node); - } - - if (node.getLifecycleState().equals(LifecycleState.ERROR)) { - lifecycleState = LifecycleState.ERROR; - } - } - - lifecycleState = LifecycleState.STOP; - } - - @Override - public void start() { - - for (LifecycleAware node : getNodes()) { - node.start(); - - boolean reached = false; - - try { - reached = LifecycleController.waitForOneOf(node, - LifecycleState.START_OR_ERROR); - } catch (InterruptedException e) { - // Do nothing. - } - - if (!reached) { - logger.error( - "Unable to stop logical node:{} This *will* cause failures.", - node); - } - - if (node.getLifecycleState().equals(LifecycleState.ERROR)) { - lifecycleState = LifecycleState.ERROR; - } - } - - lifecycleState = LifecycleState.START; - } - - @Override - public LifecycleState getLifecycleState() { - return lifecycleState; - } - }; - } - - @Test - public void testEmptyLifecycle() throws LifecycleException, - InterruptedException { - - nodeManager.start(); - boolean reached = LifecycleController.waitForOneOf(nodeManager, - LifecycleState.START_OR_ERROR); - - Assert.assertTrue(reached); - Assert.assertEquals(LifecycleState.START, nodeManager.getLifecycleState()); - - nodeManager.stop(); - reached = LifecycleController.waitForOneOf(nodeManager, - LifecycleState.STOP_OR_ERROR); - - Assert.assertTrue(reached); - Assert.assertEquals(LifecycleState.STOP, nodeManager.getLifecycleState()); - } - - @Test - public void testLifecycle() throws LifecycleException, InterruptedException { - - Channel channel = new MemoryChannel(); - Configurables.configure(channel, new Context()); - - Source generatorSource = new SequenceGeneratorSource(); - List<Channel> channels = new ArrayList<Channel>(); - channels.add(channel); - - ChannelSelector rcs = new ReplicatingChannelSelector(); - rcs.setChannels(channels); - - generatorSource.setChannelProcessor(new ChannelProcessor(rcs)); - - NullSink nullSink = new NullSink(); - nullSink.configure(new Context()); - nullSink.setChannel(channel); - - nodeManager.add(SourceRunner.forSource(generatorSource)); - SinkProcessor processor = new DefaultSinkProcessor(); - List<Sink> sinks = new ArrayList<Sink>(); - sinks.add(nullSink); - processor.setSinks(sinks); - nodeManager.add(new SinkRunner(processor)); - - nodeManager.start(); - boolean reached = LifecycleController.waitForOneOf(nodeManager, - LifecycleState.START_OR_ERROR); - - Assert.assertTrue(reached); - Assert.assertEquals(LifecycleState.START, nodeManager.getLifecycleState()); - - nodeManager.stop(); - reached = LifecycleController.waitForOneOf(nodeManager, - LifecycleState.STOP_OR_ERROR); - - Assert.assertTrue(reached); - Assert.assertEquals(LifecycleState.STOP, nodeManager.getLifecycleState()); - } - - @Test - public void testRapidLifecycleFlapping() throws LifecycleException, - InterruptedException { - - Channel channel = new MemoryChannel(); - Configurables.configure(channel, new Context()); - - Source source = new SequenceGeneratorSource(); - List<Channel> channels = new ArrayList<Channel>(); - channels.add(channel); - ChannelSelector rcs = new ReplicatingChannelSelector(); - rcs.setChannels(channels); - - source.setChannelProcessor(new ChannelProcessor(rcs)); - - NullSink sink = new NullSink(); - sink.configure(new Context()); - sink.setChannel(channel); - - nodeManager.add(SourceRunner.forSource(source)); - SinkProcessor processor = new DefaultSinkProcessor(); - List<Sink> sinks = new ArrayList<Sink>(); - sinks.add(sink); - processor.setSinks(sinks); - nodeManager.add(new SinkRunner(processor)); - - for (int i = 0; i < 10; i++) { - nodeManager.start(); - boolean reached = LifecycleController.waitForOneOf(nodeManager, - LifecycleState.START_OR_ERROR); - - Assert.assertTrue(reached); - Assert - .assertEquals(LifecycleState.START, nodeManager.getLifecycleState()); - - nodeManager.stop(); - reached = LifecycleController.waitForOneOf(nodeManager, - LifecycleState.STOP_OR_ERROR); - - Assert.assertTrue(reached); - Assert.assertEquals(LifecycleState.STOP, nodeManager.getLifecycleState()); - } - } - -} http://git-wip-us.apache.org/repos/asf/flume/blob/e47fd1e7/flume-ng-node/src/test/java/org/apache/flume/node/TestDefaultLogicalNodeManager.java ---------------------------------------------------------------------- diff --git a/flume-ng-node/src/test/java/org/apache/flume/node/TestDefaultLogicalNodeManager.java b/flume-ng-node/src/test/java/org/apache/flume/node/TestDefaultLogicalNodeManager.java deleted file mode 100644 index 530b299..0000000 --- a/flume-ng-node/src/test/java/org/apache/flume/node/TestDefaultLogicalNodeManager.java +++ /dev/null @@ -1,172 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.flume.node; - -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Set; - -import org.apache.flume.Channel; -import org.apache.flume.ChannelSelector; -import org.apache.flume.channel.ChannelProcessor; -import org.apache.flume.channel.MemoryChannel; -import org.apache.flume.channel.ReplicatingChannelSelector; -import org.apache.flume.lifecycle.LifecycleAware; -import org.apache.flume.lifecycle.LifecycleController; -import org.apache.flume.lifecycle.LifecycleException; -import org.apache.flume.lifecycle.LifecycleState; -import org.apache.flume.node.nodemanager.DefaultLogicalNodeManager; -import org.apache.flume.source.PollableSourceRunner; -import org.apache.flume.source.SequenceGeneratorSource; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -public class TestDefaultLogicalNodeManager { - - private NodeManager nodeManager; - - @Before - public void setUp() { - nodeManager = new DefaultLogicalNodeManager(); - } - - @Test - public void testLifecycle() throws LifecycleException, InterruptedException { - nodeManager.start(); - Assert.assertTrue("Node manager didn't reach START or ERROR", - LifecycleController.waitForOneOf(nodeManager, - LifecycleState.START_OR_ERROR, 5000)); - - nodeManager.stop(); - Assert.assertTrue("Node manager didn't reach STOP or ERROR", - LifecycleController.waitForOneOf(nodeManager, - LifecycleState.STOP_OR_ERROR, 5000)); - } - - @Test - public void testLifecycleWithNodes() throws LifecycleException, - InterruptedException { - - nodeManager.start(); - Assert.assertTrue("Node manager didn't reach START or ERROR", - LifecycleController.waitForOneOf(nodeManager, - LifecycleState.START_OR_ERROR, 5000)); - - for (int i = 0; i < 3; i++) { - SequenceGeneratorSource source = new SequenceGeneratorSource(); - List<Channel> channels = new ArrayList<Channel>(); - channels.add(new MemoryChannel()); - ChannelSelector rcs = new ReplicatingChannelSelector(); - rcs.setChannels(channels); - - source.setChannelProcessor(new ChannelProcessor(rcs)); - - PollableSourceRunner sourceRunner = new PollableSourceRunner(); - sourceRunner.setSource(source); - - nodeManager.add(sourceRunner); - } - - Thread.sleep(5000); - - nodeManager.stop(); - Assert.assertTrue("Node manager didn't reach STOP or ERROR", - LifecycleController.waitForOneOf(nodeManager, - LifecycleState.STOP_OR_ERROR, 5000)); - } - - @Test - public void testNodeStartStops() throws LifecycleException, - InterruptedException { - - Set<LifecycleAware> testNodes = new HashSet<LifecycleAware>(); - - for (int i = 0; i < 30; i++) { - SequenceGeneratorSource source = new SequenceGeneratorSource(); - List<Channel> channels = new ArrayList<Channel>(); - channels.add(new MemoryChannel()); - ChannelSelector rcs = new ReplicatingChannelSelector(); - rcs.setChannels(channels); - - source.setChannelProcessor(new ChannelProcessor(rcs)); - - PollableSourceRunner sourceRunner = new PollableSourceRunner(); - sourceRunner.setSource(source); - - testNodes.add(sourceRunner); - } - - nodeManager.start(); - Assert.assertTrue("Node manager didn't reach START or ERROR", - LifecycleController.waitForOneOf(nodeManager, - LifecycleState.START_OR_ERROR, 5000)); - - for (LifecycleAware node : testNodes) { - nodeManager.add(node); - } - - Thread.sleep(5000); - - nodeManager.stop(); - Assert.assertTrue("Node manager didn't reach STOP or ERROR", - LifecycleController.waitForOneOf(nodeManager, - LifecycleState.STOP_OR_ERROR, 5000)); - } - - @Test - public void testErrorNode() throws LifecycleException, InterruptedException { - - Set<LifecycleAware> testNodes = new HashSet<LifecycleAware>(); - - for (int i = 0; i < 30; i++) { - SequenceGeneratorSource source = new SequenceGeneratorSource(); - List<Channel> channels = new ArrayList<Channel>(); - channels.add(new MemoryChannel()); - ChannelSelector rcs = new ReplicatingChannelSelector(); - rcs.setChannels(channels); - - source.setChannelProcessor(new ChannelProcessor(rcs)); - - PollableSourceRunner sourceRunner = new PollableSourceRunner(); - sourceRunner.setSource(source); - - testNodes.add(sourceRunner); - } - - nodeManager.start(); - Assert.assertTrue("Node manager didn't reach START or ERROR", - LifecycleController.waitForOneOf(nodeManager, - LifecycleState.START_OR_ERROR, 5000)); - - for (LifecycleAware node : testNodes) { - nodeManager.add(node); - } - - Thread.sleep(5000); - - nodeManager.stop(); - Assert.assertTrue("Node manager didn't reach STOP or ERROR", - LifecycleController.waitForOneOf(nodeManager, - LifecycleState.STOP_OR_ERROR, 5000)); - } - -} http://git-wip-us.apache.org/repos/asf/flume/blob/e47fd1e7/flume-ng-node/src/test/java/org/apache/flume/node/TestFlumeNode.java ---------------------------------------------------------------------- diff --git a/flume-ng-node/src/test/java/org/apache/flume/node/TestFlumeNode.java b/flume-ng-node/src/test/java/org/apache/flume/node/TestFlumeNode.java deleted file mode 100644 index f2dad6f..0000000 --- a/flume-ng-node/src/test/java/org/apache/flume/node/TestFlumeNode.java +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.flume.node; - -import org.apache.flume.SourceRunner; -import org.apache.flume.lifecycle.LifecycleController; -import org.apache.flume.lifecycle.LifecycleException; -import org.apache.flume.lifecycle.LifecycleState; -import org.apache.flume.node.nodemanager.AbstractLogicalNodeManager; -import org.apache.flume.source.SequenceGeneratorSource; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Ignore; -import org.junit.Test; - -public class TestFlumeNode { - - private FlumeNode node; - - @Before - public void setUp() { - node = new FlumeNode(); - - node.setName("test-node"); - node.setNodeManager(new EmptyLogicalNodeManager()); - } - - @Ignore("Fails given recent changes to configuration system") - @Test - public void testLifecycle() throws InterruptedException, LifecycleException { - node.start(); - boolean reached = LifecycleController.waitForOneOf(node, - LifecycleState.START_OR_ERROR, 5000); - - Assert.assertTrue("Matched a known state", reached); - Assert.assertEquals(LifecycleState.START, node.getLifecycleState()); - - node.stop(); - reached = LifecycleController.waitForOneOf(node, - LifecycleState.STOP_OR_ERROR, 5000); - - Assert.assertTrue("Matched a known state", reached); - Assert.assertEquals(LifecycleState.STOP, node.getLifecycleState()); - } - - @Ignore("Fails given recent changes to configuration system") - @Test - public void testAddNodes() throws InterruptedException, LifecycleException { - node.start(); - boolean reached = LifecycleController.waitForOneOf(node, - LifecycleState.START_OR_ERROR, 5000); - - Assert.assertTrue("Matched a known state", reached); - Assert.assertEquals(LifecycleState.START, node.getLifecycleState()); - - SourceRunner n1 = SourceRunner.forSource(new SequenceGeneratorSource()); - - node.getNodeManager().add(n1); - - node.stop(); - reached = LifecycleController.waitForOneOf(node, - LifecycleState.STOP_OR_ERROR, 5000); - - Assert.assertTrue("Matched a known state", reached); - Assert.assertEquals(LifecycleState.STOP, node.getLifecycleState()); - } - - public static class EmptyLogicalNodeManager extends - AbstractLogicalNodeManager { - - private LifecycleState lifecycleState; - - public EmptyLogicalNodeManager() { - lifecycleState = LifecycleState.IDLE; - } - - @Override - public void start() { - lifecycleState = LifecycleState.START; - } - - @Override - public void stop() { - lifecycleState = LifecycleState.STOP; - } - - @Override - public LifecycleState getLifecycleState() { - return lifecycleState; - } - - } - -} http://git-wip-us.apache.org/repos/asf/flume/blob/e47fd1e7/flume-ng-node/src/test/java/org/apache/flume/node/TestFlumeNodeApplication.java ---------------------------------------------------------------------- diff --git a/flume-ng-node/src/test/java/org/apache/flume/node/TestFlumeNodeApplication.java b/flume-ng-node/src/test/java/org/apache/flume/node/TestFlumeNodeApplication.java deleted file mode 100644 index f759af1..0000000 --- a/flume-ng-node/src/test/java/org/apache/flume/node/TestFlumeNodeApplication.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.flume.node; - -import org.junit.Ignore; -import org.junit.Test; - -@Ignore("Causes blocking with no method for clean shutdown") -public class TestFlumeNodeApplication { - - @Test - public void testApplication() { - String[] args = new String[] {}; - - Application.main(args); - } - -} http://git-wip-us.apache.org/repos/asf/flume/blob/e47fd1e7/flume-ng-node/src/test/java/org/apache/flume/source/FlakeySequenceGeneratorSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-node/src/test/java/org/apache/flume/source/FlakeySequenceGeneratorSource.java b/flume-ng-node/src/test/java/org/apache/flume/source/FlakeySequenceGeneratorSource.java deleted file mode 100644 index 41e2f35..0000000 --- a/flume-ng-node/src/test/java/org/apache/flume/source/FlakeySequenceGeneratorSource.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.flume.source; - -import org.apache.flume.EventDeliveryException; - -public class FlakeySequenceGeneratorSource extends SequenceGeneratorSource { - - @Override - public Status process() throws EventDeliveryException { - - if (Math.round(Math.random()) == 1) { - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - // Do nothing. - } - - throw new EventDeliveryException("I'm broken!"); - } else { - return super.process(); - } - } - -} http://git-wip-us.apache.org/repos/asf/flume/blob/e47fd1e7/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java b/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java deleted file mode 100644 index 3c17d3d..0000000 --- a/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java +++ /dev/null @@ -1,151 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.flume.source; - -import java.io.BufferedReader; -import java.io.IOException; -import java.io.Writer; -import java.net.InetSocketAddress; -import java.nio.channels.Channels; -import java.nio.channels.SocketChannel; -import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; - -import com.google.common.collect.Lists; -import org.apache.flume.Channel; -import org.apache.flume.ChannelSelector; -import org.apache.flume.Context; -import org.apache.flume.Event; -import org.apache.flume.EventDeliveryException; -import org.apache.flume.EventDrivenSource; -import org.apache.flume.FlumeException; -import org.apache.flume.Transaction; -import org.apache.flume.channel.ChannelProcessor; -import org.apache.flume.channel.MemoryChannel; -import org.apache.flume.channel.ReplicatingChannelSelector; -import org.apache.flume.conf.Configurables; -import org.apache.flume.lifecycle.LifecycleException; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class TestNetcatSource { - - private Channel channel; - private EventDrivenSource source; - - private static final Logger logger = - LoggerFactory.getLogger(TestNetcatSource.class); - - @Before - public void setUp() { - logger.info("Running setup"); - - channel = new MemoryChannel(); - source = new NetcatSource(); - - Context context = new Context(); - - Configurables.configure(channel, context); - List<Channel> channels = Lists.newArrayList(channel); - ChannelSelector rcs = new ReplicatingChannelSelector(); - rcs.setChannels(channels); - - source.setChannelProcessor(new ChannelProcessor(rcs)); - } - - @Test - public void testLifecycle() throws InterruptedException, LifecycleException, - EventDeliveryException { - - ExecutorService executor = Executors.newFixedThreadPool(3); - boolean bound = false; - - for(int i = 0; i < 100 && !bound; i++) { - try { - Context context = new Context(); - context.put("bind", "0.0.0.0"); - context.put("port", "41414"); - - Configurables.configure(source, context); - - source.start(); - bound = true; - } catch (FlumeException e) { - // assume port in use, try another one - } - } - - Runnable clientRequestRunnable = new Runnable() { - - @Override - public void run() { - try { - SocketChannel clientChannel = SocketChannel - .open(new InetSocketAddress(41414)); - - Writer writer = Channels.newWriter(clientChannel, "utf-8"); - BufferedReader reader = new BufferedReader( - Channels.newReader(clientChannel, "utf-8")); - - writer.write("Test message\n"); - writer.flush(); - - String response = reader.readLine(); - Assert.assertEquals("Server should return OK", "OK", response); - clientChannel.close(); - } catch (IOException e) { - logger.error("Caught exception: ", e); - } - } - - }; - - ChannelSelector selector = source.getChannelProcessor().getSelector(); - Transaction tx = selector.getAllChannels().get(0).getTransaction(); - tx.begin(); - - for (int i = 0; i < 100; i++) { - logger.info("Sending request"); - - executor.submit(clientRequestRunnable); - - Event event = channel.take(); - - Assert.assertNotNull(event); - Assert.assertArrayEquals("Test message".getBytes(), event.getBody()); - } - - tx.commit(); - tx.close(); - executor.shutdown(); - - while (!executor.isTerminated()) { - executor.awaitTermination(500, TimeUnit.MILLISECONDS); - } - - source.stop(); - } - -} http://git-wip-us.apache.org/repos/asf/flume/blob/e47fd1e7/flume-ng-node/src/test/resources/flume-conf.properties ---------------------------------------------------------------------- diff --git a/flume-ng-node/src/test/resources/flume-conf.properties b/flume-ng-node/src/test/resources/flume-conf.properties index 2b74d4c..23cace9 100644 --- a/flume-ng-node/src/test/resources/flume-conf.properties +++ b/flume-ng-node/src/test/resources/flume-conf.properties @@ -23,43 +23,22 @@ # host2, host3 etc. # -host1.sources = avroSource syslogSource -host1.channels = jdbcChannel memChannel -host1.sinks = hdfsSink +host1.sources = source1 +host1.channels = channel1 +host1.sinks = sink1 # avroSource configuration -host1.sources.avroSource.type = avro -host1.sources.avroSource.bind = 127.0.0.1 -host1.sources.avroSource.port = 11001 -host1.sources.avroSource.channels = jdbcChannel - -# syslogSource configuration -host1.sources.syslogSource.type = syslogtcp -host1.sources.syslogSource.port = 13231 -host1.sources.syslogSource.channels = jdbcChannel memChannel -host1.sources.syslogSource.selector.type = multiplexing -host1.sources.syslogSource.selector.header = my.selector.header -host1.sources.syslogSource.selector.mapping.all = jdbcChannel memChannel -host1.sources.syslogSource.selector.mapping.persist = jdbcChannel -host1.sources.syslogSource.selector.default = memChannel - -# jdbcChannel configuration -host1.channels.jdbcChannel.type = jdbc -host1.channels.jdbcChannel.jdbc.driver = com.mysql.jdbc.Driver -host1.channels.jdbcChannel.jdbc.connect.url = http://localhost/flumedb -host1.channels.jdbcChannel.jdbc.username = flume -host1.channels.jdbcChannel.jdbc.password = flume - -# memChannel configuration -host1.channels.memChannel.type = memory -host1.channels.memChannel.capacity = 10000 +host1.sources.source1.type = seq +host1.sources.source1.channels = channel1 + +# memChannel1 configuration +host1.channels.channel1.type = memory +host1.channels.channel1.capacity = 10000 + # hdfsSink configuration -host1.sinks.hdfsSink.type = hdfs -host1.sinks.hdfsSink.namenode = hdfs://localhost/ -host1.sinks.hdfsSink.batchsize = 1000 -host1.sinks.hdfsSink.runner.type = polling -host1.sinks.hdfsSink.runner.polling.interval = 60 +host1.sinks.sink1.type = null +host1.sinks.sink1.channel = channel1 # # Agent configuration for host2 - invalid because channels is not
