Updated Branches: refs/heads/flume-1.5 96602b9fc -> 7bb4fc046
FLUME-2157. Spool directory source does not shut down correctly when Flume is reconfigured. (Mike Percy via Hari Shreedharan) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/7bb4fc04 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/7bb4fc04 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/7bb4fc04 Branch: refs/heads/flume-1.5 Commit: 7bb4fc0465c0ba39c8151df6de92c62f3049655a Parents: 96602b9 Author: Hari Shreedharan <[email protected]> Authored: Fri Aug 9 21:49:19 2013 -0700 Committer: Hari Shreedharan <[email protected]> Committed: Fri Aug 9 21:50:08 2013 -0700 ---------------------------------------------------------------------- .../avro/ReliableSpoolingFileEventReader.java | 13 +- .../flume/source/SpoolDirectorySource.java | 41 ++++- .../flume/source/TestSpoolDirectorySource.java | 31 ++++ .../flume/test/agent/TestSpooldirSource.java | 174 +++++++++++++++++++ .../apache/flume/test/util/StagedInstall.java | 30 +++- 5 files changed, 271 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/7bb4fc04/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java b/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java index f82fe1f..724ab38 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java +++ b/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java @@ -133,12 +133,15 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader { // Do a canary test to make sure we have access to spooling directory try { - File f1 = File.createTempFile("flume", "test", spoolDirectory); - Files.write("testing flume file permissions\n", f1, Charsets.UTF_8); - Files.readLines(f1, Charsets.UTF_8); - if (!f1.delete()) { - throw new IOException("Unable to delete canary file " + f1); + File canary = File.createTempFile("flume-spooldir-perm-check-", ".canary", + spoolDirectory); + Files.write("testing flume file permissions\n", canary, Charsets.UTF_8); + List<String> lines = Files.readLines(canary, Charsets.UTF_8); + Preconditions.checkState(!lines.isEmpty(), "Empty canary file %s", canary); + if (!canary.delete()) { + throw new IOException("Unable to delete canary file " + canary); } + logger.debug("Successfully created and deleted canary file: {}", canary); } catch (IOException e) { throw new FlumeException("Unable to read and modify files" + " in the spooling directory: " + spoolDirectory, e); http://git-wip-us.apache.org/repos/asf/flume/blob/7bb4fc04/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java index 7145580..957eb8b 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java @@ -24,6 +24,8 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Throwables; import org.apache.flume.*; import org.apache.flume.client.avro.ReliableSpoolingFileEventReader; import org.apache.flume.conf.Configurable; @@ -58,17 +60,18 @@ Configurable, EventDrivenSource { private Context deserializerContext; private String deletePolicy; private String inputCharset; + private volatile boolean hasFatalError = false; private SourceCounter sourceCounter; ReliableSpoolingFileEventReader reader; + private ScheduledExecutorService executor; @Override - public void start() { + public synchronized void start() { logger.info("SpoolDirectorySource source starting with directory: {}", spoolDirectory); - ScheduledExecutorService executor = - Executors.newSingleThreadScheduledExecutor(); + executor = Executors.newSingleThreadScheduledExecutor(); File directory = new File(spoolDirectory); try { @@ -99,7 +102,15 @@ Configurable, EventDrivenSource { } @Override - public void stop() { + public synchronized void stop() { + executor.shutdown(); + try { + executor.awaitTermination(10L, TimeUnit.SECONDS); + } catch (InterruptedException ex) { + logger.info("Interrupted while awaiting termination", ex); + } + executor.shutdownNow(); + super.stop(); sourceCounter.stop(); logger.info("SpoolDir source {} stopped. Metrics: {}", getName(), @@ -107,7 +118,13 @@ Configurable, EventDrivenSource { } @Override - public void configure(Context context) { + public String toString() { + return "Spool Directory source " + getName() + + ": { spoolDir: " + spoolDirectory + " }"; + } + + @Override + public synchronized void configure(Context context) { spoolDirectory = context.getString(SPOOL_DIRECTORY); Preconditions.checkState(spoolDirectory != null, "Configuration must specify a spooling directory"); @@ -143,6 +160,11 @@ Configurable, EventDrivenSource { } } + @VisibleForTesting + protected boolean hasFatalError() { + return hasFatalError; + } + private class SpoolDirectoryRunnable implements Runnable { private ReliableSpoolingFileEventReader reader; private SourceCounter sourceCounter; @@ -170,10 +192,11 @@ Configurable, EventDrivenSource { sourceCounter.incrementAppendBatchAcceptedCount(); } } catch (Throwable t) { - logger.error("Uncaught exception in Runnable", t); - if (t instanceof Error) { - throw (Error) t; - } + logger.error("FATAL: " + SpoolDirectorySource.this.toString() + ": " + + "Uncaught exception in SpoolDirectorySource thread. " + + "Restart or reconfigure Flume to continue processing.", t); + hasFatalError = true; + Throwables.propagate(t); } } } http://git-wip-us.apache.org/repos/asf/flume/blob/7bb4fc04/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java index 652d2a2..837cf15 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java +++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java @@ -21,8 +21,10 @@ import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.TimeUnit; import org.apache.flume.Channel; +import org.apache.flume.ChannelException; import org.apache.flume.ChannelSelector; import org.apache.flume.Context; import org.apache.flume.Event; @@ -132,4 +134,33 @@ public class TestSpoolDirectorySource { source.getLifecycleState()); } } + + @Test + public void testReconfigure() throws InterruptedException, IOException { + final int NUM_RECONFIGS = 20; + for (int i = 0; i < NUM_RECONFIGS; i++) { + Context context = new Context(); + File file = new File(tmpDir.getAbsolutePath() + "/file-" + i); + Files.write("File " + i, file, Charsets.UTF_8); + context.put(SpoolDirectorySourceConfigurationConstants.SPOOL_DIRECTORY, + tmpDir.getAbsolutePath()); + Configurables.configure(source, context); + source.start(); + Thread.sleep(TimeUnit.SECONDS.toMillis(1)); + Transaction txn = channel.getTransaction(); + txn.begin(); + try { + Event event = channel.take(); + String content = new String(event.getBody(), Charsets.UTF_8); + Assert.assertEquals("File " + i, content); + txn.commit(); + } catch (Throwable t) { + txn.rollback(); + } finally { + txn.close(); + } + source.stop(); + Assert.assertFalse("Fatal error on iteration " + i, source.hasFatalError()); + } + } } http://git-wip-us.apache.org/repos/asf/flume/blob/7bb4fc04/flume-ng-tests/src/test/java/org/apache/flume/test/agent/TestSpooldirSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-tests/src/test/java/org/apache/flume/test/agent/TestSpooldirSource.java b/flume-ng-tests/src/test/java/org/apache/flume/test/agent/TestSpooldirSource.java new file mode 100644 index 0000000..6018380 --- /dev/null +++ b/flume-ng-tests/src/test/java/org/apache/flume/test/agent/TestSpooldirSource.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.test.agent; + +import com.google.common.base.Charsets; +import com.google.common.base.Joiner; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import com.google.common.io.Files; +import org.apache.commons.io.FileUtils; +import org.apache.flume.test.util.StagedInstall; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Properties; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.*; + +public class TestSpooldirSource { + + private static final Logger LOGGER = + LoggerFactory.getLogger(TestSpooldirSource.class); + + private Properties agentProps; + private File sinkOutputDir; + private List<File> spoolDirs = Lists.newArrayList(); + + @Before + public void setup() throws Exception { + + File agentDir = StagedInstall.getInstance().getStageDir(); + LOGGER.debug("Using agent stage dir: {}", agentDir); + + File testDir = new File(agentDir, TestSpooldirSource.class.getName()); + assertTrue(testDir.mkdirs()); + + File spoolParentDir = new File(testDir, "spools"); + assertTrue("Unable to create sink output dir: " + spoolParentDir.getPath(), + spoolParentDir.mkdir()); + + final int NUM_SOURCES = 100; + + agentProps = new Properties(); + List<String> spooldirSrcNames = Lists.newArrayList(); + String channelName = "mem-01"; + + // Create source dirs and property file chunks + for (int i = 0; i < NUM_SOURCES; i++) { + String srcName = String.format("spooldir-%03d", i); + File spoolDir = new File(spoolParentDir, srcName); + assertTrue(spoolDir.mkdir()); + spooldirSrcNames.add(srcName); + spoolDirs.add(spoolDir); + + agentProps.put(String.format("agent.sources.%s.type", srcName), + "SPOOLDIR"); + agentProps.put(String.format("agent.sources.%s.spoolDir", srcName), + spoolDir.getPath()); + agentProps.put(String.format("agent.sources.%s.channels", srcName), + channelName); + } + + // Create the rest of the properties file + agentProps.put("agent.channels.mem-01.type", "MEMORY"); + agentProps.put("agent.channels.mem-01.capacity", String.valueOf(100000)); + + sinkOutputDir = new File(testDir, "out"); + assertTrue("Unable to create sink output dir: " + sinkOutputDir.getPath(), + sinkOutputDir.mkdir()); + + agentProps.put("agent.sinks.roll-01.channel", channelName); + agentProps.put("agent.sinks.roll-01.type", "FILE_ROLL"); + agentProps.put("agent.sinks.roll-01.sink.directory", sinkOutputDir.getPath()); + agentProps.put("agent.sinks.roll-01.sink.rollInterval", "0"); + + agentProps.put("agent.sources", Joiner.on(" ").join(spooldirSrcNames)); + agentProps.put("agent.channels", channelName); + agentProps.put("agent.sinks", "roll-01"); + } + + @After + public void teardown() throws Exception { + StagedInstall.getInstance().stopAgent(); + } + + private String getTestString(int dirNum, int fileNum) { + return String.format("Test dir %03d, test file %03d.\n", dirNum, fileNum); + } + + /** Create a bunch of test files. */ + private void createInputTestFiles(List<File> spoolDirs, int numFiles, int startNum) + throws IOException { + int numSpoolDirs = spoolDirs.size(); + for (int dirNum = 0; dirNum < numSpoolDirs; dirNum++) { + File spoolDir = spoolDirs.get(dirNum); + for (int fileNum = startNum; fileNum < numFiles; fileNum++) { + // Stage the files on what is almost certainly the same FS partition. + File tmp = new File(spoolDir.getParent(), UUID.randomUUID().toString()); + Files.append(getTestString(dirNum, fileNum), tmp, Charsets.UTF_8); + File dst = new File(spoolDir, String.format("test-file-%03d", fileNum)); + // Ensure we move them into the spool directory atomically, if possible. + assertTrue(String.format("Failed to rename %s to %s", tmp, dst), + tmp.renameTo(dst)); + } + } + } + + private void validateSeenEvents(File outDir, int outFiles, int dirs, int events) + throws IOException { + File[] sinkOutputDirChildren = outDir.listFiles(); + assertEquals("Unexpected number of files in output dir", + outFiles, sinkOutputDirChildren.length); + Set<String> seenEvents = Sets.newHashSet(); + for (File outFile : sinkOutputDirChildren) { + List<String> lines = Files.readLines(outFile, Charsets.UTF_8); + for (String line : lines) { + seenEvents.add(line); + } + } + for (int dirNum = 0; dirNum < dirs; dirNum++) { + for (int fileNum = 0; fileNum < events; fileNum++) { + String event = getTestString(dirNum, fileNum).trim(); + assertTrue("Missing event: {" + event + "}", seenEvents.contains(event)); + } + } + } + + @Test + public void testManySpooldirs() throws Exception { + LOGGER.debug("testManySpooldirs() started."); + + StagedInstall.getInstance().startAgent("agent", agentProps); + + final int NUM_FILES_PER_DIR = 10; + createInputTestFiles(spoolDirs, NUM_FILES_PER_DIR, 0); + + TimeUnit.SECONDS.sleep(10); // Wait for sources and sink to process files + + // Ensure we received all events. + validateSeenEvents(sinkOutputDir,1, spoolDirs.size(), NUM_FILES_PER_DIR); + LOGGER.debug("Processed all the events!"); + + LOGGER.debug("testManySpooldirs() ended."); + } + +} http://git-wip-us.apache.org/repos/asf/flume/blob/7bb4fc04/flume-ng-tests/src/test/java/org/apache/flume/test/util/StagedInstall.java ---------------------------------------------------------------------- diff --git a/flume-ng-tests/src/test/java/org/apache/flume/test/util/StagedInstall.java b/flume-ng-tests/src/test/java/org/apache/flume/test/util/StagedInstall.java index bc58340..336ffc4 100644 --- a/flume-ng-tests/src/test/java/org/apache/flume/test/util/StagedInstall.java +++ b/flume-ng-tests/src/test/java/org/apache/flume/test/util/StagedInstall.java @@ -29,8 +29,11 @@ import java.net.Socket; import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.concurrent.TimeUnit; import java.util.zip.GZIPInputStream; +import com.google.common.base.Preconditions; +import com.google.common.io.Files; import org.apache.commons.compress.archivers.tar.TarArchiveEntry; import org.apache.commons.compress.archivers.tar.TarArchiveInputStream; import org.apache.log4j.Logger; @@ -62,6 +65,7 @@ public class StagedInstall { private final String logDirPath; // State per invocation - config file, process, shutdown hook + private String agentName; private String configFilePath; private Process process; private ProcessShutdownHook shutdownHook; @@ -113,17 +117,22 @@ public class StagedInstall { public synchronized void startAgent(String name, Properties properties) throws Exception { + Preconditions.checkArgument(!name.isEmpty(), "agent name must not be empty"); + Preconditions.checkNotNull(properties, "properties object must not be null"); + + agentName = name; + if (process != null) { throw new Exception("A process is already running"); } - LOGGER.info("Starting process for agent: " + name + " using config: " + LOGGER.info("Starting process for agent: " + agentName + " using config: " + properties); - File configFile = createConfigurationFile(name, properties); + File configFile = createConfigurationFile(agentName, properties); configFilePath = configFile.getCanonicalPath(); String configFileName = configFile.getName(); - String logFileName = "flume-" + name + "-" + String logFileName = "flume-" + agentName + "-" + configFileName.substring(0, configFileName.indexOf('.')) + ".log"; LOGGER.info("Created configuration file: " + configFilePath); @@ -136,7 +145,7 @@ public class StagedInstall { builder.add("--classpath", agentClasspath); } builder.add("--conf-file", configFilePath); - builder.add("--name", name); + builder.add("--name", agentName); builder.add("-D" + ENV_FLUME_LOG_DIR + "=" + logDirPath); builder.add("-D" + ENV_FLUME_ROOT_LOGGER + "=" + ENV_FLUME_ROOT_LOGGER_VALUE); @@ -168,8 +177,21 @@ public class StagedInstall { this.agentClasspath = agentClasspath; } + public synchronized void reconfigure(Properties properties) throws Exception { + File configFile = createConfigurationFile(agentName, properties); + Files.copy(configFile, new File(configFilePath)); + configFile.delete(); + LOGGER.info("Updated agent config file: " + configFilePath); + } + + public synchronized File getStageDir() { + return stageDir; + } + private File createConfigurationFile(String agentName, Properties properties) throws Exception { + Preconditions.checkNotNull(properties, "properties object must not be null"); + File file = File.createTempFile("agent", "config.properties", stageDir); OutputStream os = null;
