-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/50378/#review143374
-----------------------------------------------------------
Hi,
Your change interferes with the caching mechanism. I believe the best way to
describe the interference is the test below. The problem in a nutshell: when a
wildcard is specified, the parent directory used for file matching is not the
immediate parent of the files but a directory higher up the tree. Its last
modification time is therefore not updated when a new file is added. In the
test, only the mtime of dir1 is updated when file2.txt is added, but the cache
is initialized with the fg1 directory, so it only monitors fg1 for changes. I
think either the documentation should state that caching does not work when
wildcards are used or, even better, an assertion should reject this combination
at startup. (Making the cache work with wildcards would be best.)
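To make the mtime point concrete, here is a minimal standalone sketch. It is
not part of the patch: the class name ParentMtimeDemo and the temp directory
layout are made up for illustration, and it assumes a POSIX-style file system
where creating a file updates the mtime of the directory that contains it and
nothing above it.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileTime;

// Hypothetical demo class, not Flume code.
class ParentMtimeDemo {

  /** Returns {fg1 mtime changed, dir1 mtime changed} after adding a file to fg1/dir1. */
  static boolean[] mtimeChangedAfterAdd() throws IOException {
    Path fg1 = Files.createTempDirectory("fg1");
    Path dir1 = Files.createDirectory(fg1.resolve("dir1"));
    Path file1 = dir1.resolve("file1.txt");
    Files.write(file1, "file1\n".getBytes(StandardCharsets.UTF_8));

    // Pin both directories to a known old timestamp (a whole second) so the
    // comparison does not depend on the file system's timestamp granularity.
    long oldMillis = 1_000_000_000_000L;
    Files.setLastModifiedTime(fg1, FileTime.fromMillis(oldMillis));
    Files.setLastModifiedTime(dir1, FileTime.fromMillis(oldMillis));

    // Add a second file to the nested directory.
    Path file2 = dir1.resolve("file2.txt");
    Files.write(file2, "file2\n".getBytes(StandardCharsets.UTF_8));

    boolean fg1Changed = Files.getLastModifiedTime(fg1).toMillis() != oldMillis;
    boolean dir1Changed = Files.getLastModifiedTime(dir1).toMillis() != oldMillis;

    // Clean up the temporary tree.
    Files.delete(file2);
    Files.delete(file1);
    Files.delete(dir1);
    Files.delete(fg1);

    return new boolean[] {fg1Changed, dir1Changed};
  }

  public static void main(String[] args) throws IOException {
    boolean[] changed = mtimeChangedAfterAdd();
    System.out.println("fg1 mtime changed:  " + changed[0]);
    System.out.println("dir1 mtime changed: " + changed[1]);
  }
}
```

A cache that watches only the mtime of fg1 has no way to notice file2.txt,
which is the situation the wildcard pattern creates.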
@Test
public void testWildcardsDirFilteringCache() throws IOException,
    InterruptedException {
  // First iteration: everything works as expected.
  File f1 = new File(tmpDir.getAbsolutePath() + "/fg1/dir1/file1.txt");
  Files.createParentDirs(f1);
  Files.write("file1\n", f1, Charsets.UTF_8);

  Context context = new Context();
  context.put(POSITION_FILE, posFilePath);
  context.put(FILE_GROUPS, "fg1");
  context.put(FILE_GROUPS_PREFIX + "fg1", tmpDir.getAbsolutePath() +
      "/fg1/*/file.*");

  Configurables.configure(source, context);
  source.start();
  source.process();
  Transaction txn = channel.getTransaction();
  txn.begin();
  List<String> out = Lists.newArrayList();
  for (int i = 0; i < 2; i++) {
    Event e = channel.take();
    if (e != null) {
      out.add(TestTaildirEventReader.bodyAsString(e));
    }
  }
  txn.commit();
  txn.close();

  // Empty iterations simulating the passage of time.
  Thread.sleep(1000);
  source.process();
  Thread.sleep(1000);

  // A file created after a while should be picked up as well.
  File f2 = new File(tmpDir.getAbsolutePath() + "/fg1/dir1/file2.txt");
  Files.write("file2\n", f2, Charsets.UTF_8);
  source.process();
  txn = channel.getTransaction();
  txn.begin();
  for (int i = 0; i < 2; i++) {
    Event e = channel.take();
    if (e != null) {
      out.add(TestTaildirEventReader.bodyAsString(e));
    }
  }
  txn.commit();
  txn.close();

  // Fails: file2.txt never appears in the channel.
  assertEquals(2, out.size());
  assertTrue(out.contains("file1"));
  assertTrue(out.contains("file2"));
}
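
For the startup check suggested above, something along these lines might be
enough. This is only a sketch under assumptions: WildcardCacheCheck,
hasDirectoryWildcard and validate are hypothetical names, not existing Flume
code, and it assumes the directory wildcard is a literal '*' in the part of
the pattern before the last '/'. The real check would have to follow whatever
wildcard syntax the patch ends up supporting.

```java
// Hypothetical helper, not part of Flume.
final class WildcardCacheCheck {

  private WildcardCacheCheck() {
  }

  /** True if the directory part of the pattern (before the last '/') contains '*'. */
  static boolean hasDirectoryWildcard(String filePattern) {
    int lastSlash = filePattern.lastIndexOf('/');
    String dirPart = lastSlash < 0 ? "" : filePattern.substring(0, lastSlash);
    return dirPart.indexOf('*') >= 0;
  }

  /** Rejects a directory wildcard when directory-mtime based caching is enabled. */
  static void validate(String filePattern, boolean cachingEnabled) {
    if (cachingEnabled && hasDirectoryWildcard(filePattern)) {
      throw new IllegalArgumentException(
          "Directory wildcards cannot be combined with pattern caching: " + filePattern);
    }
  }
}
```

With this, "/app/*/log.*" would be rejected while caching is on, and
"/app/dir1/log.*" would pass, because the wildcard there is only in the file
name part.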
- Attila Simon
On July 24, 2016, 10:37 a.m., qiao wen wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/50378/
> -----------------------------------------------------------
>
> (Updated July 24, 2016, 10:37 a.m.)
>
>
> Review request for Flume.
>
>
> Repository: flume-git
>
>
> Description
> -------
>
> In our log management project, we want to track many log files like this:
> /app/dir1/log.*
> /app/dir2/log.*
> ...
> /app/dirn/log.*
> But TaildirSource doesn't support wildcards in the filegroup directory name.
> We would like the following config to work:
> a1.sources.r1.filegroups.fg = /app/*/log.*
>
>
> Diffs
> -----
>
> flume-ng-doc/sphinx/FlumeUserGuide.rst 1334500
>
> flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirMatcher.java
> ad9f720
>
> flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirSource.java
> 097ee0b
>
> Diff: https://reviews.apache.org/r/50378/diff/
>
>
> Testing
> -------
>
> All tests in TestTaildirSource passed.
>
>
> Thanks,
>
> qiao wen
>
>