http://git-wip-us.apache.org/repos/asf/flume/blob/e47fd1e7/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java
----------------------------------------------------------------------
diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java b/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java
deleted file mode 100644
index 60bfd5e..0000000
--- a/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java
+++ /dev/null
@@ -1,263 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.flume.node.nodemanager;
-
-import java.util.Map.Entry;
-
-import org.apache.flume.Channel;
-import org.apache.flume.SinkRunner;
-import org.apache.flume.SourceRunner;
-import org.apache.flume.lifecycle.LifecycleAware;
-import org.apache.flume.lifecycle.LifecycleState;
-import org.apache.flume.lifecycle.LifecycleSupervisor;
-import org.apache.flume.lifecycle.LifecycleSupervisor.SupervisorPolicy;
-import org.apache.flume.node.NodeConfiguration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
-import java.util.Properties;
-import java.util.Set;
-import org.apache.flume.Context;
-import org.apache.flume.instrumentation.MonitorService;
-import org.apache.flume.instrumentation.MonitoringType;
-
-
-public class DefaultLogicalNodeManager extends AbstractLogicalNodeManager
-    implements NodeConfigurationAware {
-
-  private static final Logger logger = LoggerFactory
-      .getLogger(DefaultLogicalNodeManager.class);
-
-  private LifecycleSupervisor nodeSupervisor;
-  private LifecycleState lifecycleState;
-  private NodeConfiguration nodeConfiguration;
-
-  private MonitorService monitorServer;
-
-  public static final String CONF_MONITOR_CLASS = "flume.monitoring.type";
-  public static final String CONF_MONITOR_PREFIX = "flume.monitoring.";
-
-  public DefaultLogicalNodeManager() {
-    nodeSupervisor = new LifecycleSupervisor();
-    lifecycleState = LifecycleState.IDLE;
-    nodeConfiguration = null;
-  }
-
-  @Override
-  public void stopAllComponents() {
-    if (this.nodeConfiguration != null) {
-      logger.info("Shutting down configuration: {}", this.nodeConfiguration);
-      for (Entry<String, SourceRunner> entry : this.nodeConfiguration
-          .getSourceRunners().entrySet()) {
-        try{
-          logger.info("Stopping Source " + entry.getKey());
-          nodeSupervisor.unsupervise(entry.getValue());
-        } catch (Exception e){
-          logger.error("Error while stopping {}", entry.getValue(), e);
-        }
-      }
-
-      for (Entry<String, SinkRunner> entry :
-        this.nodeConfiguration.getSinkRunners().entrySet()) {
-        try{
-          logger.info("Stopping Sink " + entry.getKey());
-          nodeSupervisor.unsupervise(entry.getValue());
-        } catch (Exception e){
-          logger.error("Error while stopping {}", entry.getValue(), e);
-        }
-      }
-
-      for (Entry<String, Channel> entry :
-        this.nodeConfiguration.getChannels().entrySet()) {
-        try{
-          logger.info("Stopping Channel " + entry.getKey());
-          nodeSupervisor.unsupervise(entry.getValue());
-        } catch (Exception e){
-          logger.error("Error while stopping {}", entry.getValue(), e);
-        }
-      }
-    }
-    if(monitorServer != null) {
-      monitorServer.stop();
-    }
-  }
-
-  @Override
-  public void startAllComponents(NodeConfiguration nodeConfiguration) {
-    logger.info("Starting new configuration:{}", nodeConfiguration);
-
-    this.nodeConfiguration = nodeConfiguration;
-
-    for (Entry<String, Channel> entry :
-      nodeConfiguration.getChannels().entrySet()) {
-      try{
-        logger.info("Starting Channel " + entry.getKey());
-        nodeSupervisor.supervise(entry.getValue(),
-            new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
-      } catch (Exception e){
-        logger.error("Error while starting {}", entry.getValue(), e);
-      }
-    }
-
-    /*
-     * Wait for all channels to start.
-     */
-    for(Channel ch: nodeConfiguration.getChannels().values()){
-      while(ch.getLifecycleState() != LifecycleState.START
-          && !nodeSupervisor.isComponentInErrorState(ch)){
-        try {
-          logger.info("Waiting for channel: " + ch.getName() +
-              " to start. Sleeping for 500 ms");
-          Thread.sleep(500);
-        } catch (InterruptedException e) {
-          logger.error("Interrupted while waiting for channel to start.", e);
-          Throwables.propagate(e);
-        }
-      }
-    }
-
-    for (Entry<String, SinkRunner> entry : nodeConfiguration.getSinkRunners()
-        .entrySet()) {
-      try{
-        logger.info("Starting Sink " + entry.getKey());
-        nodeSupervisor.supervise(entry.getValue(),
-          new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
-      } catch (Exception e) {
-        logger.error("Error while starting {}", entry.getValue(), e);
-      }
-    }
-
-    for (Entry<String, SourceRunner> entry : nodeConfiguration
-        .getSourceRunners().entrySet()) {
-      try{
-        logger.info("Starting Source " + entry.getKey());
-        nodeSupervisor.supervise(entry.getValue(),
-          new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
-      } catch (Exception e) {
-        logger.error("Error while starting {}", entry.getValue(), e);
-      }
-    }
-
-    this.loadMonitoring();
-  }
-
-  @Override
-  public boolean add(LifecycleAware node) {
-    /*
-     * FIXME: This type of overriding worries me. There should be a better
-     * separation of addition of nodes and management. (i.e. state vs. function)
-     */
-    Preconditions.checkState(getLifecycleState().equals(LifecycleState.START),
-        "You can not add nodes to a manager that hasn't been started");
-
-    if (super.add(node)) {
-      nodeSupervisor.supervise(node,
-          new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
-
-      return true;
-    }
-
-    return false;
-  }
-
-  @Override
-  public boolean remove(LifecycleAware node) {
-    /*
-     * FIXME: This type of overriding worries me. There should be a better
-     * separation of addition of nodes and management. (i.e. state vs. function)
-     */
-    Preconditions.checkState(getLifecycleState().equals(LifecycleState.START),
-        "You can not remove nodes from a manager that hasn't been started");
-
-    if (super.remove(node)) {
-      nodeSupervisor.unsupervise(node);
-
-      return true;
-    }
-
-    return false;
-  }
-
-  @Override
-  public void start() {
-
-    logger.info("Node manager starting");
-
-    nodeSupervisor.start();
-
-    logger.debug("Node manager started");
-
-    lifecycleState = LifecycleState.START;
-  }
-
-  @Override
-  public void stop() {
-
-    logger.info("Node manager stopping");
-
-    stopAllComponents();
-
-    nodeSupervisor.stop();
-
-    logger.debug("Node manager stopped");
-
-    lifecycleState = LifecycleState.STOP;
-  }
-
-  @Override
-  public LifecycleState getLifecycleState() {
-    return lifecycleState;
-  }
-
-  private void loadMonitoring() {
-    Properties systemProps = System.getProperties();
-    Set<String> keys = systemProps.stringPropertyNames();
-    try {
-      if (keys.contains(CONF_MONITOR_CLASS)) {
-        String monitorType = systemProps.getProperty(CONF_MONITOR_CLASS);
-        Class<? extends MonitorService> klass;
-        try {
-          //Is it a known type?
-          klass = MonitoringType.valueOf(
-                  monitorType.toUpperCase()).getMonitorClass();
-        } catch (Exception e) {
-          //Not a known type, use FQCN
-          klass = (Class<? extends MonitorService>) Class.forName(monitorType);
-        }
-        this.monitorServer = klass.newInstance();
-        Context context = new Context();
-        for (String key : keys) {
-          if (key.startsWith(CONF_MONITOR_PREFIX)) {
-            context.put(key.substring(CONF_MONITOR_PREFIX.length()),
-                    systemProps.getProperty(key));
-          }
-        }
-        monitorServer.configure(context);
-        monitorServer.start();
-      }
-    } catch (Exception e) {
-      logger.warn("Error starting monitoring. "
-              + "Monitoring might not be available.", e);
-    }
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/flume/blob/e47fd1e7/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/NodeConfigurationAware.java
----------------------------------------------------------------------
diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/NodeConfigurationAware.java b/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/NodeConfigurationAware.java
deleted file mode 100644
index c20bf9b..0000000
--- a/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/NodeConfigurationAware.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.flume.node.nodemanager;
-
-import org.apache.flume.node.NodeConfiguration;
-
-public interface NodeConfigurationAware {
-
-  /**
-   * Stop all components currently running.
-   */
-  public void stopAllComponents();
-
-  /**
-   * Start components with the configuration provided.
-   * @param nodeConfiguration
-   */
-  public void startAllComponents(NodeConfiguration nodeConfiguration);
-
-}

http://git-wip-us.apache.org/repos/asf/flume/blob/e47fd1e7/flume-ng-node/src/test/java/org/apache/flume/conf/properties/TestPropertiesFileConfigurationProvider.java
----------------------------------------------------------------------
diff --git a/flume-ng-node/src/test/java/org/apache/flume/conf/properties/TestPropertiesFileConfigurationProvider.java b/flume-ng-node/src/test/java/org/apache/flume/conf/properties/TestPropertiesFileConfigurationProvider.java
deleted file mode 100644
index d43aed6..0000000
--- a/flume-ng-node/src/test/java/org/apache/flume/conf/properties/TestPropertiesFileConfigurationProvider.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flume.conf.properties;
-
-import java.io.File;
-
-import org.apache.flume.channel.DefaultChannelFactory;
-import org.apache.flume.node.NodeConfiguration;
-import org.apache.flume.node.nodemanager.NodeConfigurationAware;
-import org.apache.flume.sink.DefaultSinkFactory;
-import org.apache.flume.source.DefaultSourceFactory;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class TestPropertiesFileConfigurationProvider {
-
-  private static final File TESTFILE = new File(
-      TestPropertiesFileConfigurationProvider.class.getClassLoader()
-          .getResource("flume-conf.properties").getFile());
-
-  @SuppressWarnings("unused")
-  private static final Logger LOGGER = LoggerFactory
-      .getLogger(TestPropertiesFileConfigurationProvider.class);
-
-  @Before
-  public void setUp() throws Exception {
-    File tmpDir = new File("target/test");
-    tmpDir.mkdirs();
-
-    File derbyLogFile = new File(tmpDir, "derbytest.log");
-    String derbyLogFilePath = derbyLogFile.getCanonicalPath();
-
-    System.setProperty("derby.stream.error.file", derbyLogFilePath);
-  }
-
-  @Test
-  public void testPropertyRead() throws Exception {
-    PropertiesFileConfigurationProvider provider =
-        new PropertiesFileConfigurationProvider();
-
-    provider.setNodeName("host1");
-    provider.setConfigurationAware(new DummyNodeConfigurationAware());
-
-    provider.setChannelFactory(new DefaultChannelFactory());
-    provider.setSourceFactory(new DefaultSourceFactory());
-    provider.setSinkFactory(new DefaultSinkFactory());
-
-    provider.setFile(TESTFILE);
-    provider.load();
-  }
-
-  private static class DummyNodeConfigurationAware implements
-    NodeConfigurationAware {
-
-    @Override
-    public void stopAllComponents(){
-
-    }
-    @Override
-    public void startAllComponents(NodeConfiguration config) {
-       // no handling necessary
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flume/blob/e47fd1e7/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractLogicalNodeManager.java
----------------------------------------------------------------------
diff --git a/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractLogicalNodeManager.java b/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractLogicalNodeManager.java
deleted file mode 100644
index 1cbc269..0000000
--- a/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractLogicalNodeManager.java
+++ /dev/null
@@ -1,233 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.flume.node;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flume.Channel;
-import org.apache.flume.ChannelSelector;
-import org.apache.flume.Context;
-import org.apache.flume.Sink;
-import org.apache.flume.SinkProcessor;
-import org.apache.flume.SinkRunner;
-import org.apache.flume.Source;
-import org.apache.flume.SourceRunner;
-import org.apache.flume.channel.ChannelProcessor;
-import org.apache.flume.channel.MemoryChannel;
-import org.apache.flume.channel.ReplicatingChannelSelector;
-import org.apache.flume.conf.Configurables;
-import org.apache.flume.lifecycle.LifecycleAware;
-import org.apache.flume.lifecycle.LifecycleController;
-import org.apache.flume.lifecycle.LifecycleException;
-import org.apache.flume.lifecycle.LifecycleState;
-import org.apache.flume.node.nodemanager.AbstractLogicalNodeManager;
-import org.apache.flume.sink.DefaultSinkProcessor;
-import org.apache.flume.sink.NullSink;
-import org.apache.flume.source.SequenceGeneratorSource;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class TestAbstractLogicalNodeManager {
-
-  private static final Logger logger = LoggerFactory
-      .getLogger(TestAbstractLogicalNodeManager.class);
-
-  private AbstractLogicalNodeManager nodeManager;
-
-  @Before
-  public void setUp() {
-    nodeManager = new AbstractLogicalNodeManager() {
-
-      private LifecycleState lifecycleState = LifecycleState.IDLE;
-
-      @Override
-      public void stop() {
-
-        for (LifecycleAware node : getNodes()) {
-          node.stop();
-
-          boolean reached = false;
-
-          try {
-            reached = LifecycleController.waitForOneOf(node,
-                LifecycleState.STOP_OR_ERROR);
-          } catch (InterruptedException e) {
-            // Do nothing.
-          }
-
-          if (!reached) {
-            logger.error(
-                "Unable to stop logical node:{} This *will* cause failures.",
-                node);
-          }
-
-          if (node.getLifecycleState().equals(LifecycleState.ERROR)) {
-            lifecycleState = LifecycleState.ERROR;
-          }
-        }
-
-        lifecycleState = LifecycleState.STOP;
-      }
-
-      @Override
-      public void start() {
-
-        for (LifecycleAware node : getNodes()) {
-          node.start();
-
-          boolean reached = false;
-
-          try {
-            reached = LifecycleController.waitForOneOf(node,
-                LifecycleState.START_OR_ERROR);
-          } catch (InterruptedException e) {
-            // Do nothing.
-          }
-
-          if (!reached) {
-            logger.error(
-                "Unable to stop logical node:{} This *will* cause failures.",
-                node);
-          }
-
-          if (node.getLifecycleState().equals(LifecycleState.ERROR)) {
-            lifecycleState = LifecycleState.ERROR;
-          }
-        }
-
-        lifecycleState = LifecycleState.START;
-      }
-
-      @Override
-      public LifecycleState getLifecycleState() {
-        return lifecycleState;
-      }
-    };
-  }
-
-  @Test
-  public void testEmptyLifecycle() throws LifecycleException,
-  InterruptedException {
-
-    nodeManager.start();
-    boolean reached = LifecycleController.waitForOneOf(nodeManager,
-        LifecycleState.START_OR_ERROR);
-
-    Assert.assertTrue(reached);
-    Assert.assertEquals(LifecycleState.START, nodeManager.getLifecycleState());
-
-    nodeManager.stop();
-    reached = LifecycleController.waitForOneOf(nodeManager,
-        LifecycleState.STOP_OR_ERROR);
-
-    Assert.assertTrue(reached);
-    Assert.assertEquals(LifecycleState.STOP, nodeManager.getLifecycleState());
-  }
-
-  @Test
-  public void testLifecycle() throws LifecycleException, InterruptedException {
-
-    Channel channel = new MemoryChannel();
-    Configurables.configure(channel, new Context());
-
-    Source generatorSource = new SequenceGeneratorSource();
-    List<Channel> channels = new ArrayList<Channel>();
-    channels.add(channel);
-
-    ChannelSelector rcs = new ReplicatingChannelSelector();
-    rcs.setChannels(channels);
-
-    generatorSource.setChannelProcessor(new ChannelProcessor(rcs));
-
-    NullSink nullSink = new NullSink();
-    nullSink.configure(new Context());
-    nullSink.setChannel(channel);
-
-    nodeManager.add(SourceRunner.forSource(generatorSource));
-    SinkProcessor processor = new DefaultSinkProcessor();
-    List<Sink> sinks = new ArrayList<Sink>();
-    sinks.add(nullSink);
-    processor.setSinks(sinks);
-    nodeManager.add(new SinkRunner(processor));
-
-    nodeManager.start();
-    boolean reached = LifecycleController.waitForOneOf(nodeManager,
-        LifecycleState.START_OR_ERROR);
-
-    Assert.assertTrue(reached);
-    Assert.assertEquals(LifecycleState.START, nodeManager.getLifecycleState());
-
-    nodeManager.stop();
-    reached = LifecycleController.waitForOneOf(nodeManager,
-        LifecycleState.STOP_OR_ERROR);
-
-    Assert.assertTrue(reached);
-    Assert.assertEquals(LifecycleState.STOP, nodeManager.getLifecycleState());
-  }
-
-  @Test
-  public void testRapidLifecycleFlapping() throws LifecycleException,
-  InterruptedException {
-
-    Channel channel = new MemoryChannel();
-    Configurables.configure(channel, new Context());
-
-    Source source = new SequenceGeneratorSource();
-    List<Channel> channels = new ArrayList<Channel>();
-    channels.add(channel);
-    ChannelSelector rcs = new ReplicatingChannelSelector();
-    rcs.setChannels(channels);
-
-    source.setChannelProcessor(new ChannelProcessor(rcs));
-
-    NullSink sink = new NullSink();
-    sink.configure(new Context());
-    sink.setChannel(channel);
-
-    nodeManager.add(SourceRunner.forSource(source));
-    SinkProcessor processor = new DefaultSinkProcessor();
-    List<Sink> sinks = new ArrayList<Sink>();
-    sinks.add(sink);
-    processor.setSinks(sinks);
-    nodeManager.add(new SinkRunner(processor));
-
-    for (int i = 0; i < 10; i++) {
-      nodeManager.start();
-      boolean reached = LifecycleController.waitForOneOf(nodeManager,
-          LifecycleState.START_OR_ERROR);
-
-      Assert.assertTrue(reached);
-      Assert
-      .assertEquals(LifecycleState.START, nodeManager.getLifecycleState());
-
-      nodeManager.stop();
-      reached = LifecycleController.waitForOneOf(nodeManager,
-          LifecycleState.STOP_OR_ERROR);
-
-      Assert.assertTrue(reached);
-      Assert.assertEquals(LifecycleState.STOP, nodeManager.getLifecycleState());
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flume/blob/e47fd1e7/flume-ng-node/src/test/java/org/apache/flume/node/TestDefaultLogicalNodeManager.java
----------------------------------------------------------------------
diff --git a/flume-ng-node/src/test/java/org/apache/flume/node/TestDefaultLogicalNodeManager.java b/flume-ng-node/src/test/java/org/apache/flume/node/TestDefaultLogicalNodeManager.java
deleted file mode 100644
index 530b299..0000000
--- a/flume-ng-node/src/test/java/org/apache/flume/node/TestDefaultLogicalNodeManager.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.flume.node;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.flume.Channel;
-import org.apache.flume.ChannelSelector;
-import org.apache.flume.channel.ChannelProcessor;
-import org.apache.flume.channel.MemoryChannel;
-import org.apache.flume.channel.ReplicatingChannelSelector;
-import org.apache.flume.lifecycle.LifecycleAware;
-import org.apache.flume.lifecycle.LifecycleController;
-import org.apache.flume.lifecycle.LifecycleException;
-import org.apache.flume.lifecycle.LifecycleState;
-import org.apache.flume.node.nodemanager.DefaultLogicalNodeManager;
-import org.apache.flume.source.PollableSourceRunner;
-import org.apache.flume.source.SequenceGeneratorSource;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-public class TestDefaultLogicalNodeManager {
-
-  private NodeManager nodeManager;
-
-  @Before
-  public void setUp() {
-    nodeManager = new DefaultLogicalNodeManager();
-  }
-
-  @Test
-  public void testLifecycle() throws LifecycleException, InterruptedException {
-    nodeManager.start();
-    Assert.assertTrue("Node manager didn't reach START or ERROR",
-        LifecycleController.waitForOneOf(nodeManager,
-            LifecycleState.START_OR_ERROR, 5000));
-
-    nodeManager.stop();
-    Assert.assertTrue("Node manager didn't reach STOP or ERROR",
-        LifecycleController.waitForOneOf(nodeManager,
-            LifecycleState.STOP_OR_ERROR, 5000));
-  }
-
-  @Test
-  public void testLifecycleWithNodes() throws LifecycleException,
-      InterruptedException {
-
-    nodeManager.start();
-    Assert.assertTrue("Node manager didn't reach START or ERROR",
-        LifecycleController.waitForOneOf(nodeManager,
-            LifecycleState.START_OR_ERROR, 5000));
-
-    for (int i = 0; i < 3; i++) {
-      SequenceGeneratorSource source = new SequenceGeneratorSource();
-      List<Channel> channels = new ArrayList<Channel>();
-      channels.add(new MemoryChannel());
-      ChannelSelector rcs = new ReplicatingChannelSelector();
-      rcs.setChannels(channels);
-
-      source.setChannelProcessor(new ChannelProcessor(rcs));
-
-      PollableSourceRunner sourceRunner = new PollableSourceRunner();
-      sourceRunner.setSource(source);
-
-      nodeManager.add(sourceRunner);
-    }
-
-    Thread.sleep(5000);
-
-    nodeManager.stop();
-    Assert.assertTrue("Node manager didn't reach STOP or ERROR",
-        LifecycleController.waitForOneOf(nodeManager,
-            LifecycleState.STOP_OR_ERROR, 5000));
-  }
-
-  @Test
-  public void testNodeStartStops() throws LifecycleException,
-      InterruptedException {
-
-    Set<LifecycleAware> testNodes = new HashSet<LifecycleAware>();
-
-    for (int i = 0; i < 30; i++) {
-      SequenceGeneratorSource source = new SequenceGeneratorSource();
-      List<Channel> channels = new ArrayList<Channel>();
-      channels.add(new MemoryChannel());
-      ChannelSelector rcs = new ReplicatingChannelSelector();
-      rcs.setChannels(channels);
-
-      source.setChannelProcessor(new ChannelProcessor(rcs));
-
-      PollableSourceRunner sourceRunner = new PollableSourceRunner();
-      sourceRunner.setSource(source);
-
-      testNodes.add(sourceRunner);
-    }
-
-    nodeManager.start();
-    Assert.assertTrue("Node manager didn't reach START or ERROR",
-        LifecycleController.waitForOneOf(nodeManager,
-            LifecycleState.START_OR_ERROR, 5000));
-
-    for (LifecycleAware node : testNodes) {
-      nodeManager.add(node);
-    }
-
-    Thread.sleep(5000);
-
-    nodeManager.stop();
-    Assert.assertTrue("Node manager didn't reach STOP or ERROR",
-        LifecycleController.waitForOneOf(nodeManager,
-            LifecycleState.STOP_OR_ERROR, 5000));
-  }
-
-  @Test
-  public void testErrorNode() throws LifecycleException, InterruptedException {
-
-    Set<LifecycleAware> testNodes = new HashSet<LifecycleAware>();
-
-    for (int i = 0; i < 30; i++) {
-      SequenceGeneratorSource source = new SequenceGeneratorSource();
-      List<Channel> channels = new ArrayList<Channel>();
-      channels.add(new MemoryChannel());
-      ChannelSelector rcs = new ReplicatingChannelSelector();
-      rcs.setChannels(channels);
-
-      source.setChannelProcessor(new ChannelProcessor(rcs));
-
-      PollableSourceRunner sourceRunner = new PollableSourceRunner();
-      sourceRunner.setSource(source);
-
-      testNodes.add(sourceRunner);
-    }
-
-    nodeManager.start();
-    Assert.assertTrue("Node manager didn't reach START or ERROR",
-        LifecycleController.waitForOneOf(nodeManager,
-            LifecycleState.START_OR_ERROR, 5000));
-
-    for (LifecycleAware node : testNodes) {
-      nodeManager.add(node);
-    }
-
-    Thread.sleep(5000);
-
-    nodeManager.stop();
-    Assert.assertTrue("Node manager didn't reach STOP or ERROR",
-        LifecycleController.waitForOneOf(nodeManager,
-            LifecycleState.STOP_OR_ERROR, 5000));
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flume/blob/e47fd1e7/flume-ng-node/src/test/java/org/apache/flume/node/TestFlumeNode.java
----------------------------------------------------------------------
diff --git a/flume-ng-node/src/test/java/org/apache/flume/node/TestFlumeNode.java b/flume-ng-node/src/test/java/org/apache/flume/node/TestFlumeNode.java
deleted file mode 100644
index f2dad6f..0000000
--- a/flume-ng-node/src/test/java/org/apache/flume/node/TestFlumeNode.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.flume.node;
-
-import org.apache.flume.SourceRunner;
-import org.apache.flume.lifecycle.LifecycleController;
-import org.apache.flume.lifecycle.LifecycleException;
-import org.apache.flume.lifecycle.LifecycleState;
-import org.apache.flume.node.nodemanager.AbstractLogicalNodeManager;
-import org.apache.flume.source.SequenceGeneratorSource;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-
-public class TestFlumeNode {
-
-  private FlumeNode node;
-
-  @Before
-  public void setUp() {
-    node = new FlumeNode();
-
-    node.setName("test-node");
-    node.setNodeManager(new EmptyLogicalNodeManager());
-  }
-
-  @Ignore("Fails given recent changes to configuration system")
-  @Test
-  public void testLifecycle() throws InterruptedException, LifecycleException {
-    node.start();
-    boolean reached = LifecycleController.waitForOneOf(node,
-        LifecycleState.START_OR_ERROR, 5000);
-
-    Assert.assertTrue("Matched a known state", reached);
-    Assert.assertEquals(LifecycleState.START, node.getLifecycleState());
-
-    node.stop();
-    reached = LifecycleController.waitForOneOf(node,
-        LifecycleState.STOP_OR_ERROR, 5000);
-
-    Assert.assertTrue("Matched a known state", reached);
-    Assert.assertEquals(LifecycleState.STOP, node.getLifecycleState());
-  }
-
-  @Ignore("Fails given recent changes to configuration system")
-  @Test
-  public void testAddNodes() throws InterruptedException, LifecycleException {
-    node.start();
-    boolean reached = LifecycleController.waitForOneOf(node,
-        LifecycleState.START_OR_ERROR, 5000);
-
-    Assert.assertTrue("Matched a known state", reached);
-    Assert.assertEquals(LifecycleState.START, node.getLifecycleState());
-
-    SourceRunner n1 = SourceRunner.forSource(new SequenceGeneratorSource());
-
-    node.getNodeManager().add(n1);
-
-    node.stop();
-    reached = LifecycleController.waitForOneOf(node,
-        LifecycleState.STOP_OR_ERROR, 5000);
-
-    Assert.assertTrue("Matched a known state", reached);
-    Assert.assertEquals(LifecycleState.STOP, node.getLifecycleState());
-  }
-
-  public static class EmptyLogicalNodeManager extends
-      AbstractLogicalNodeManager {
-
-    private LifecycleState lifecycleState;
-
-    public EmptyLogicalNodeManager() {
-      lifecycleState = LifecycleState.IDLE;
-    }
-
-    @Override
-    public void start() {
-      lifecycleState = LifecycleState.START;
-    }
-
-    @Override
-    public void stop() {
-      lifecycleState = LifecycleState.STOP;
-    }
-
-    @Override
-    public LifecycleState getLifecycleState() {
-      return lifecycleState;
-    }
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flume/blob/e47fd1e7/flume-ng-node/src/test/java/org/apache/flume/node/TestFlumeNodeApplication.java
----------------------------------------------------------------------
diff --git a/flume-ng-node/src/test/java/org/apache/flume/node/TestFlumeNodeApplication.java b/flume-ng-node/src/test/java/org/apache/flume/node/TestFlumeNodeApplication.java
deleted file mode 100644
index f759af1..0000000
--- a/flume-ng-node/src/test/java/org/apache/flume/node/TestFlumeNodeApplication.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.flume.node;
-
-import org.junit.Ignore;
-import org.junit.Test;
-
-@Ignore("Causes blocking with no method for clean shutdown")
-public class TestFlumeNodeApplication {
-
-  @Test
-  public void testApplication() {
-    String[] args = new String[] {};
-
-    Application.main(args);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flume/blob/e47fd1e7/flume-ng-node/src/test/java/org/apache/flume/source/FlakeySequenceGeneratorSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-node/src/test/java/org/apache/flume/source/FlakeySequenceGeneratorSource.java b/flume-ng-node/src/test/java/org/apache/flume/source/FlakeySequenceGeneratorSource.java
deleted file mode 100644
index 41e2f35..0000000
--- a/flume-ng-node/src/test/java/org/apache/flume/source/FlakeySequenceGeneratorSource.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.flume.source;
-
-import org.apache.flume.EventDeliveryException;
-
-public class FlakeySequenceGeneratorSource extends SequenceGeneratorSource {
-
-  @Override
-  public Status process() throws EventDeliveryException {
-
-    if (Math.round(Math.random()) == 1) {
-      try {
-        Thread.sleep(1000);
-      } catch (InterruptedException e) {
-        // Do nothing.
-      }
-
-      throw new EventDeliveryException("I'm broken!");
-    } else {
-      return super.process();
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flume/blob/e47fd1e7/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java b/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java
deleted file mode 100644
index 3c17d3d..0000000
--- a/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.flume.source;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.Writer;
-import java.net.InetSocketAddress;
-import java.nio.channels.Channels;
-import java.nio.channels.SocketChannel;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-import com.google.common.collect.Lists;
-import org.apache.flume.Channel;
-import org.apache.flume.ChannelSelector;
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.EventDeliveryException;
-import org.apache.flume.EventDrivenSource;
-import org.apache.flume.FlumeException;
-import org.apache.flume.Transaction;
-import org.apache.flume.channel.ChannelProcessor;
-import org.apache.flume.channel.MemoryChannel;
-import org.apache.flume.channel.ReplicatingChannelSelector;
-import org.apache.flume.conf.Configurables;
-import org.apache.flume.lifecycle.LifecycleException;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class TestNetcatSource {
-
-  private Channel channel;
-  private EventDrivenSource source;
-
-  private static final Logger logger =
-      LoggerFactory.getLogger(TestNetcatSource.class);
-
-  @Before
-  public void setUp() {
-    logger.info("Running setup");
-
-    channel = new MemoryChannel();
-    source = new NetcatSource();
-
-    Context context = new Context();
-
-    Configurables.configure(channel, context);
-    List<Channel> channels = Lists.newArrayList(channel);
-    ChannelSelector rcs = new ReplicatingChannelSelector();
-    rcs.setChannels(channels);
-
-    source.setChannelProcessor(new ChannelProcessor(rcs));
-  }
-
-  @Test
-  public void testLifecycle() throws InterruptedException, LifecycleException,
-      EventDeliveryException {
-
-    ExecutorService executor = Executors.newFixedThreadPool(3);
-    boolean bound = false;
-
-    for(int i = 0; i < 100 && !bound; i++) {
-      try {
-        Context context = new Context();
-        context.put("bind", "0.0.0.0");
-        context.put("port", "41414");
-
-        Configurables.configure(source, context);
-
-        source.start();
-        bound = true;
-      } catch (FlumeException e) {
-        // assume port in use, try another one
-      }
-    }
-
-    Runnable clientRequestRunnable = new Runnable() {
-
-      @Override
-      public void run() {
-        try {
-          SocketChannel clientChannel = SocketChannel
-              .open(new InetSocketAddress(41414));
-
-          Writer writer = Channels.newWriter(clientChannel, "utf-8");
-          BufferedReader reader = new BufferedReader(
-              Channels.newReader(clientChannel, "utf-8"));
-
-          writer.write("Test message\n");
-          writer.flush();
-
-          String response = reader.readLine();
-          Assert.assertEquals("Server should return OK", "OK", response);
-          clientChannel.close();
-        } catch (IOException e) {
-          logger.error("Caught exception: ", e);
-        }
-      }
-
-    };
-
-    ChannelSelector selector = source.getChannelProcessor().getSelector();
-    Transaction tx = selector.getAllChannels().get(0).getTransaction();
-    tx.begin();
-
-    for (int i = 0; i < 100; i++) {
-      logger.info("Sending request");
-
-      executor.submit(clientRequestRunnable);
-
-      Event event = channel.take();
-
-      Assert.assertNotNull(event);
-      Assert.assertArrayEquals("Test message".getBytes(), event.getBody());
-    }
-
-    tx.commit();
-    tx.close();
-    executor.shutdown();
-
-    while (!executor.isTerminated()) {
-      executor.awaitTermination(500, TimeUnit.MILLISECONDS);
-    }
-
-    source.stop();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flume/blob/e47fd1e7/flume-ng-node/src/test/resources/flume-conf.properties
----------------------------------------------------------------------
diff --git a/flume-ng-node/src/test/resources/flume-conf.properties b/flume-ng-node/src/test/resources/flume-conf.properties
index 2b74d4c..23cace9 100644
--- a/flume-ng-node/src/test/resources/flume-conf.properties
+++ b/flume-ng-node/src/test/resources/flume-conf.properties
@@ -23,43 +23,22 @@
 # host2, host3 etc.
 #
 
-host1.sources = avroSource syslogSource
-host1.channels = jdbcChannel memChannel
-host1.sinks = hdfsSink
+host1.sources = source1
+host1.channels = channel1
+host1.sinks = sink1
 
 # avroSource configuration
-host1.sources.avroSource.type = avro
-host1.sources.avroSource.bind = 127.0.0.1
-host1.sources.avroSource.port = 11001
-host1.sources.avroSource.channels = jdbcChannel
-
-# syslogSource configuration
-host1.sources.syslogSource.type = syslogtcp
-host1.sources.syslogSource.port = 13231
-host1.sources.syslogSource.channels = jdbcChannel memChannel
-host1.sources.syslogSource.selector.type = multiplexing
-host1.sources.syslogSource.selector.header = my.selector.header
-host1.sources.syslogSource.selector.mapping.all = jdbcChannel memChannel
-host1.sources.syslogSource.selector.mapping.persist = jdbcChannel
-host1.sources.syslogSource.selector.default = memChannel
-
-# jdbcChannel configuration
-host1.channels.jdbcChannel.type = jdbc
-host1.channels.jdbcChannel.jdbc.driver = com.mysql.jdbc.Driver
-host1.channels.jdbcChannel.jdbc.connect.url = http://localhost/flumedb
-host1.channels.jdbcChannel.jdbc.username = flume
-host1.channels.jdbcChannel.jdbc.password = flume
-
-# memChannel configuration
-host1.channels.memChannel.type = memory
-host1.channels.memChannel.capacity = 10000
+host1.sources.source1.type = seq
+host1.sources.source1.channels = channel1
+
+# memChannel1 configuration
+host1.channels.channel1.type = memory
+host1.channels.channel1.capacity = 10000
+
 
 # hdfsSink configuration
-host1.sinks.hdfsSink.type = hdfs
-host1.sinks.hdfsSink.namenode = hdfs://localhost/
-host1.sinks.hdfsSink.batchsize = 1000
-host1.sinks.hdfsSink.runner.type = polling
-host1.sinks.hdfsSink.runner.polling.interval = 60
+host1.sinks.sink1.type = null
+host1.sinks.sink1.channel = channel1
 
 #
 # Agent configuration for host2 - invalid because channels is not

Reply via email to