Fixing bugs related to switching to the next file when setting fileReadCompletely=true/false and reader=null for ACK mode reading. Added UTs. Incorporated review comments from Satish and others
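For readers skimming the diff below, here is a minimal, self-contained sketch of the nextTuple()/ack() interplay this commit establishes for ACK mode. The names mirror the spout's fields, but this is an illustration of the control flow, not the committed code:

    // Sketch: how fileReadCompletely and reader=null now interact in ACK mode.
    import java.util.HashSet;
    import java.util.Set;

    class AckModeFileSwitchSketch {
      private Object reader = null;              // stand-in for the spout's FileReader
      private boolean fileReadCompletely = true; // now starts true: no file open yet
      private final Set<Integer> inflight = new HashSet<>(); // emitted, un-ACKed msg ids

      void nextTuple() {
        if (reader == null) {                    // select a new file if none is open
          reader = pickNextFile();
          fileReadCompletely = false;
          if (reader == null) return;            // nothing to process right now
        }
        if (fileReadCompletely) return;          // at EOF: wait for outstanding ACKs
        Integer msgId = readNextRecord();        // emit one record, track it in flight
        if (msgId != null) inflight.add(msgId);
        else fileReadCompletely = true;          // EOF: keep reader until fully ACKed
      }

      void ack(int msgId) {
        inflight.remove(msgId);
        if (fileReadCompletely && inflight.isEmpty()) {
          reader = null;                         // file fully ACKed: archive, move on
        }
      }

      private Object pickNextFile() { return null; }    // placeholder
      private Integer readNextRecord() { return null; } // placeholder
    }

The point of the two flags: a file is only marked done (archived, lock released, trackers reset) after the reader hits EOF *and* every emitted tuple has been ACKed.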
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/1e52f083 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/1e52f083 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/1e52f083 Branch: refs/heads/1.x-branch Commit: 1e52f0837aed03cc47b86b1e02037b6136c8c8b0 Parents: 152856d Author: Roshan Naik <[email protected]> Authored: Mon Dec 21 20:22:03 2015 -0800 Committer: Roshan Naik <[email protected]> Committed: Thu Jan 14 11:34:56 2016 -0800 ---------------------------------------------------------------------- .../hdfs/common/CmpFilesByModificationTime.java | 32 ----- .../org/apache/storm/hdfs/common/HdfsUtils.java | 4 +- .../storm/hdfs/common/ModifTimeComparator.java | 32 +++++ .../storm/hdfs/spout/AbstractFileReader.java | 2 - .../org/apache/storm/hdfs/spout/FileLock.java | 17 ++- .../org/apache/storm/hdfs/spout/HdfsSpout.java | 101 ++++++-------- .../apache/storm/hdfs/spout/TextFileReader.java | 19 +-- .../apache/storm/hdfs/spout/TestHdfsSpout.java | 134 +++++++++++++++---- 8 files changed, 207 insertions(+), 134 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/1e52f083/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/CmpFilesByModificationTime.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/CmpFilesByModificationTime.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/CmpFilesByModificationTime.java deleted file mode 100644 index 67420aa..0000000 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/CmpFilesByModificationTime.java +++ /dev/null @@ -1,32 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.storm.hdfs.common; - -import org.apache.hadoop.fs.FileStatus; - -import java.util.Comparator; - - -public class CmpFilesByModificationTime - implements Comparator<FileStatus> { - @Override - public int compare(FileStatus o1, FileStatus o2) { - return new Long(o1.getModificationTime()).compareTo( o1.getModificationTime() ); - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/1e52f083/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HdfsUtils.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HdfsUtils.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HdfsUtils.java index e8df78d..86b9ee8 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HdfsUtils.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HdfsUtils.java @@ -49,7 +49,7 @@ public class HdfsUtils { fstats.add(fileStatus); } } - Collections.sort(fstats, new CmpFilesByModificationTime() ); + Collections.sort(fstats, new ModifTimeComparator() ); ArrayList<Path> result = new ArrayList<>(fstats.size()); for (LocatedFileStatus fstat : fstats) { @@ -59,7 +59,7 @@ } /** - * Returns true if succeeded. False if file already exists. throws if there was unexpected problem + * Returns null if file already exists. Throws if there was an unexpected problem */ public static FSDataOutputStream tryCreateFile(FileSystem fs, Path file) throws IOException { try { http://git-wip-us.apache.org/repos/asf/storm/blob/1e52f083/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/ModifTimeComparator.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/ModifTimeComparator.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/ModifTimeComparator.java new file mode 100644 index 0000000..de5613e --- /dev/null +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/ModifTimeComparator.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.hdfs.common; + +import org.apache.hadoop.fs.FileStatus; + +import java.util.Comparator; + + +public class ModifTimeComparator + implements Comparator<FileStatus> { + @Override + public int compare(FileStatus o1, FileStatus o2) { + return new Long(o1.getModificationTime()).compareTo( o2.getModificationTime() ); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/1e52f083/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/AbstractFileReader.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/AbstractFileReader.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/AbstractFileReader.java index 09dc0d3..6efea81 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/AbstractFileReader.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/AbstractFileReader.java @@ -26,13 +26,11 @@ import org.apache.hadoop.fs.Path; abstract class AbstractFileReader implements FileReader { private final Path file; - private final FileSystem fs; private Fields fields; public AbstractFileReader(FileSystem fs, Path file, Fields fieldNames) { if (fs == null || file == null) throw new IllegalArgumentException("file and filesystem args cannot be null"); - this.fs = fs; this.file = file; this.fields = fieldNames; } http://git-wip-us.apache.org/repos/asf/storm/blob/1e52f083/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java index b40d1dd..89ed855 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java @@ -48,7 +48,7 @@ public class FileLock { private final FSDataOutputStream lockFileStream; private LogEntry lastEntry; - private static final Logger log = LoggerFactory.getLogger(DirLock.class); + private static final Logger log = LoggerFactory.getLogger(FileLock.class); private FileLock(FileSystem fs, Path lockFile, FSDataOutputStream lockFileStream, String spoutId) throws IOException { @@ -89,9 +89,15 @@ public class FileLock { lastEntry = entry; // update this only after writing to hdfs } + /** Release lock by deleting file + * @throws IOException if lock file could not be deleted + */ public void release() throws IOException { lockFileStream.close(); - fs.delete(lockFile, false); + if(!fs.delete(lockFile, false)){ + log.warn("Unable to delete lock file"); + throw new IOException("Unable to delete lock file"); + } log.debug("Released lock file {}", lockFile); } @@ -109,10 +115,10 @@ public class FileLock { try { FSDataOutputStream ostream = HdfsUtils.tryCreateFile(fs, lockFile); if (ostream != null) { - log.info("Acquired lock on file {}. LockFile=", fileToLock, lockFile); + log.debug("Acquired lock on file {}. 
LockFile={}", fileToLock, lockFile); return new FileLock(fs, lockFile, ostream, spoutId); } else { - log.info("Cannot lock file {} as its already locked.", fileToLock); + log.debug("Cannot lock file {} as it's already locked.", fileToLock); return null; } } catch (IOException e) { @@ -166,7 +172,6 @@ return LogEntry.deserialize(lastLine); } - // takes ownership of the lock file /** * Takes ownership of the lock file if possible. * @param lockFile * @@ -184,7 +189,7 @@ return new FileLock(fs, lockFile, spoutId, lastEntry); } catch (RemoteException e) { if (e.unwrapRemoteException() instanceof AlreadyBeingCreatedException) { - log.info("Lock file {} is currently open. Cannot transfer ownership.", lockFile); + log.warn("Lock file {} is currently open. Cannot transfer ownership now. Will try later.", lockFile); return null; } else { // unexpected error throw e; http://git-wip-us.apache.org/repos/asf/storm/blob/1e52f083/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java index 50c2172..3d95ea7 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java @@ -86,7 +86,7 @@ public class HdfsSpout extends BaseRichSpout { private int acksSinceLastCommit = 0 ; private final AtomicBoolean commitTimeElapsed = new AtomicBoolean(false); private final Timer commitTimer = new Timer(); - private boolean fileReadCompletely = false; + private boolean fileReadCompletely = true; private String configKey = Configs.DEFAULT_HDFS_CONFIG_KEY; // key for hdfs kerberos configs @@ -130,12 +130,15 @@ public class HdfsSpout extends BaseRichSpout { // 3) Select a new file if one is not open already if (reader == null) { reader = pickNextFile(); + fileReadCompletely=false; if (reader == null) { LOG.debug("Currently no new files to process under : " + sourceDirPath); return; } } - + if( fileReadCompletely ) { // wait for more ACKs before proceeding + return; + } // 4) Read record from file, emit to collector and record progress List<Object> tuple = reader.next(); if (tuple != null) { @@ -145,7 +148,7 @@ emitData(tuple, msgId); if(!ackEnabled) { - ++acksSinceLastCommit; // assume message is immediately acked in non-ack mode + ++acksSinceLastCommit; // assume message is immediately ACKed in non-ack mode commitProgress(reader.getFileOffset()); } else { commitProgress(tracker.getCommitPosition()); @@ -175,6 +178,8 @@ // will commit progress into lock file if commit threshold is reached private void commitProgress(FileOffset position) { + if(position==null) + return; if ( lock!=null && canCommitNow() ) { try { lock.heartbeat(position.toString()); @@ -205,15 +210,13 @@ } private void markFileAsDone(Path filePath) { - fileReadCompletely = false; try { Path newFile = renameCompletedFile(reader.getFilePath()); LOG.info("Completed processing {}", newFile); } catch (IOException e) { LOG.error("Unable to archive completed file" + filePath, e); } - unlockAndCloseReader(); - + closeReaderAndResetTrackers(); } private void markFileAsBad(Path file) { @@ -222,19 +225,22 @@ public class 
HdfsSpout extends BaseRichSpout { String originalName = new Path(fileNameMinusSuffix).getName(); Path newFile = new Path( badFilesDirPath + Path.SEPARATOR + originalName); - LOG.info("Moving bad file {} to {} ", originalName, newFile); + LOG.info("Moving bad file {} to {}. Processed it till offset {}", originalName, newFile, tracker.getCommitPosition()); try { if (!hdfs.rename(file, newFile) ) { // seems this can fail by returning false or throwing exception throw new IOException("Move failed for bad file: " + file); // convert false ret value to exception } } catch (IOException e) { - LOG.warn("Error moving bad file: " + file + ". to destination : " + newFile); + LOG.warn("Error moving bad file: " + file + " to destination " + newFile, e); } - - unlockAndCloseReader(); + closeReaderAndResetTrackers(); } - private void unlockAndCloseReader() { + private void closeReaderAndResetTrackers() { + inflight.clear(); + tracker.offsets.clear(); + retryList.clear(); + reader.close(); reader = null; try { @@ -245,8 +251,6 @@ public class HdfsSpout extends BaseRichSpout { lock = null; } - - protected void emitData(List<Object> tuple, MessageId id) { LOG.debug("Emitting - {}", id); this.collector.emit(tuple, id); @@ -306,21 +310,7 @@ public class HdfsSpout extends BaseRichSpout { throw new RuntimeException(Configs.ARCHIVE_DIR + " setting is required"); } this.archiveDirPath = new Path( conf.get(Configs.ARCHIVE_DIR).toString() ); - - try { - if(hdfs.exists(archiveDirPath)) { - if(! hdfs.isDirectory(archiveDirPath) ) { - LOG.error("Archive directory is a file. " + archiveDirPath); - throw new RuntimeException("Archive directory is a file. " + archiveDirPath); - } - } else if(! hdfs.mkdirs(archiveDirPath) ) { - LOG.error("Unable to create archive directory. " + archiveDirPath); - throw new RuntimeException("Unable to create archive directory " + archiveDirPath); - } - } catch (IOException e) { - LOG.error("Unable to create archive dir ", e); - throw new RuntimeException("Unable to create archive directory ", e); - } + validateOrMakeDir(hdfs, archiveDirPath, "Archive"); // -- bad files dir config if ( !conf.containsKey(Configs.BAD_DIR) ) { @@ -329,23 +319,9 @@ public class HdfsSpout extends BaseRichSpout { } this.badFilesDirPath = new Path(conf.get(Configs.BAD_DIR).toString()); + validateOrMakeDir(hdfs, badFilesDirPath, "bad files"); - try { - if(hdfs.exists(badFilesDirPath)) { - if(! hdfs.isDirectory(badFilesDirPath) ) { - LOG.error("Bad files directory is a file: " + badFilesDirPath); - throw new RuntimeException("Bad files directory is a file: " + badFilesDirPath); - } - } else if(! hdfs.mkdirs(badFilesDirPath) ) { - LOG.error("Unable to create directory for bad files: " + badFilesDirPath); - throw new RuntimeException("Unable to create a directory for bad files: " + badFilesDirPath); - } - } catch (IOException e) { - LOG.error("Unable to create archive dir ", e); - throw new RuntimeException(e.getMessage(), e); - } - - // -- ignore filename suffix + // -- ignore filename suffix if ( conf.containsKey(Configs.IGNORE_SUFFIX) ) { this.ignoreSuffix = conf.get(Configs.IGNORE_SUFFIX).toString(); } @@ -353,21 +329,7 @@ public class HdfsSpout extends BaseRichSpout { // -- lock dir config String lockDir = !conf.containsKey(Configs.LOCK_DIR) ? getDefaultLockDir(sourceDirPath) : conf.get(Configs.LOCK_DIR).toString() ; this.lockDirPath = new Path(lockDir); - - try { - if(hdfs.exists(lockDirPath)) { - if(! 
hdfs.isDirectory(lockDirPath) ) { - LOG.error("Lock directory is a file: " + lockDirPath); - throw new RuntimeException("Lock directory is a file: " + lockDirPath); - } - } else if(! hdfs.mkdirs(lockDirPath) ) { - LOG.error("Unable to create lock directory: " + lockDirPath); - throw new RuntimeException("Unable to create lock directory: " + lockDirPath); - } - } catch (IOException e) { - LOG.error("Unable to create lock dir: " + lockDirPath, e); - throw new RuntimeException(e.getMessage(), e); - } + validateOrMakeDir(hdfs,lockDirPath,"locks"); // -- lock timeout if( conf.get(Configs.LOCK_TIMEOUT) !=null ) @@ -403,6 +365,23 @@ public class HdfsSpout extends BaseRichSpout { setupCommitElapseTimer(); } + private static void validateOrMakeDir(FileSystem fs, Path dir, String dirDescription) { + try { + if(fs.exists(dir)) { + if(! fs.isDirectory(dir) ) { + LOG.error(dirDescription + " directory is a file, not a dir. " + dir); + throw new RuntimeException(dirDescription + " directory is a file, not a dir. " + dir); + } + } else if(! fs.mkdirs(dir) ) { + LOG.error("Unable to create " + dirDescription + " directory " + dir); + throw new RuntimeException("Unable to create " + dirDescription + " directory " + dir); + } + } catch (IOException e) { + LOG.error("Unable to create " + dirDescription + " directory " + dir, e); + throw new RuntimeException("Unable to create " + dirDescription + " directory " + dir, e); + } + } + private String getDefaultLockDir(Path sourceDirPath) { return sourceDirPath.toString() + Path.SEPARATOR + Configs.DEFAULT_LOCK_DIR; } @@ -425,12 +404,14 @@ public class HdfsSpout extends BaseRichSpout { @Override public void ack(Object msgId) { + if(!ackEnabled) + throw new IllegalStateException("Received an ACK when acking is disabled"); MessageId id = (MessageId) msgId; inflight.remove(id); ++acksSinceLastCommit; tracker.recordAckedOffset(id.offset); commitProgress(tracker.getCommitPosition()); - if(fileReadCompletely) { + if(fileReadCompletely && inflight.isEmpty()) { markFileAsDone(reader.getFilePath()); reader = null; } http://git-wip-us.apache.org/repos/asf/storm/blob/1e52f083/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java index 6e4a8b0..b998d30 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java @@ -46,19 +46,20 @@ class TextFileReader extends AbstractFileReader { private TextFileReader.Offset offset; public TextFileReader(FileSystem fs, Path file, Map conf) throws IOException { - super(fs, file, new Fields(DEFAULT_FIELD_NAME)); - FSDataInputStream in = fs.open(file); - String charSet = (conf==null || !conf.containsKey(CHARSET) ) ? "UTF-8" : conf.get(CHARSET).toString(); - int buffSz = (conf==null || !conf.containsKey(BUFFER_SIZE) ) ? 
DEFAULT_BUFF_SIZE : Integer.parseInt( conf.get(BUFFER_SIZE).toString() ); - reader = new BufferedReader(new InputStreamReader(in, charSet), buffSz); - offset = new TextFileReader.Offset(0,0); + this(fs, file, conf, new TextFileReader.Offset(0,0) ); } public TextFileReader(FileSystem fs, Path file, Map conf, String startOffset) throws IOException { + this(fs, file, conf, new TextFileReader.Offset(startOffset) ); + } + + private TextFileReader(FileSystem fs, Path file, Map conf, TextFileReader.Offset startOffset) throws IOException { super(fs, file, new Fields(DEFAULT_FIELD_NAME)); - offset = new TextFileReader.Offset(startOffset); + offset = startOffset; FSDataInputStream in = fs.open(file); - in.seek(offset.byteOffset); + if(offset.byteOffset>0) + in.seek(offset.byteOffset); + String charSet = (conf==null || !conf.containsKey(CHARSET) ) ? "UTF-8" : conf.get(CHARSET).toString(); int buffSz = (conf==null || !conf.containsKey(BUFFER_SIZE) ) ? DEFAULT_BUFF_SIZE : Integer.parseInt( conf.get(BUFFER_SIZE).toString() ); reader = new BufferedReader(new InputStreamReader(in, charSet), buffSz); @@ -97,6 +98,8 @@ class TextFileReader extends AbstractFileReader { } public Offset(String offset) { + if(offset==null) + throw new IllegalArgumentException("offset cannot be null"); try { String[] parts = offset.split(":"); this.byteOffset = Long.parseLong(parts[0].split("=")[1]); http://git-wip-us.apache.org/repos/asf/storm/blob/1e52f083/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java index 98d21f8..f64400a 100644 --- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java +++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java @@ -18,6 +18,7 @@ package org.apache.storm.hdfs.spout; +import backtype.storm.Config; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import org.apache.hadoop.hdfs.MiniDFSCluster; @@ -48,6 +49,7 @@ import java.io.BufferedReader; import java.io.File; import java.io.IOException; import java.io.InputStreamReader; +import java.lang.reflect.Field; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -109,31 +111,51 @@ public class TestHdfsSpout { } @Test - public void testSimpleText() throws IOException { + public void testSimpleText_noACK() throws IOException { Path file1 = new Path(source.toString() + "/file1.txt"); createTextFile(file1, 5); Path file2 = new Path(source.toString() + "/file2.txt"); createTextFile(file2, 5); - listDir(source); - Map conf = getDefaultConfig(); conf.put(Configs.COMMIT_FREQ_COUNT, "1"); conf.put(Configs.COMMIT_FREQ_SEC, "1"); + HdfsSpout spout = makeSpout(0, conf, Configs.TEXT); - List<String> res = runSpout(spout,"r11", "a0", "a1", "a2", "a3", "a4"); - for (String re : res) { - System.err.println(re); - } + runSpout(spout,"r11"); - listCompletedDir(); Path arc1 = new Path(archive.toString() + "/file1.txt"); Path arc2 = new Path(archive.toString() + "/file2.txt"); checkCollectorOutput_txt((MockCollector) spout.getCollector(), arc1, arc2); } + @Test + public void testSimpleText_ACK() throws IOException { + Path file1 = new Path(source.toString() + "/file1.txt"); + createTextFile(file1, 5); + + Path file2 = new Path(source.toString() + "/file2.txt"); + 
createTextFile(file2, 5); + + Map conf = getDefaultConfig(); + conf.put(Configs.COMMIT_FREQ_COUNT, "1"); + conf.put(Configs.COMMIT_FREQ_SEC, "1"); + conf.put(Config.TOPOLOGY_ACKER_EXECUTORS, "1"); // enable ACKing + HdfsSpout spout = makeSpout(0, conf, Configs.TEXT); + + // consume file 1 + runSpout(spout, "r6", "a0", "a1", "a2", "a3", "a4"); + Path arc1 = new Path(archive.toString() + "/file1.txt"); + checkCollectorOutput_txt((MockCollector) spout.getCollector(), arc1); + + // consume file 2 + runSpout(spout, "r6", "a5", "a6", "a7", "a8", "a9"); + Path arc2 = new Path(archive.toString() + "/file2.txt"); + checkCollectorOutput_txt((MockCollector) spout.getCollector(), arc1, arc2); + } + private void checkCollectorOutput_txt(MockCollector collector, Path... txtFiles) throws IOException { ArrayList<String> expected = new ArrayList<>(); for (Path txtFile : txtFiles) { @@ -190,11 +212,6 @@ return result; } - private void listCompletedDir() throws IOException { - listDir(source); - listDir(archive); - } - private List<String> listDir(Path p) throws IOException { ArrayList<String> result = new ArrayList<>(); System.err.println("*** Listing " + p); @@ -209,28 +226,97 @@ } @Test - public void testSimpleSequenceFile() throws IOException { + public void testMultipleFileConsumption_Ack() throws Exception { + Path file1 = new Path(source.toString() + "/file1.txt"); + createTextFile(file1, 5); + + Map conf = getDefaultConfig(); + conf.put(Configs.COMMIT_FREQ_COUNT, "1"); + conf.put(Configs.COMMIT_FREQ_SEC, "1"); + conf.put(Config.TOPOLOGY_ACKER_EXECUTORS, "1"); // enable ACKing + HdfsSpout spout = makeSpout(0, conf, Configs.TEXT); + + // read a few lines from file1, don't ack + runSpout(spout, "r3"); + FileReader reader = getField(spout, "reader"); + Assert.assertNotNull(reader); + Assert.assertEquals(false, getBoolField(spout, "fileReadCompletely")); + + // read remaining lines + runSpout(spout, "r3"); + reader = getField(spout, "reader"); + Assert.assertNotNull(reader); + Assert.assertEquals(true, getBoolField(spout, "fileReadCompletely") ); + + // ack a few + runSpout(spout, "a0", "a1", "a2"); + reader = getField(spout, "reader"); + Assert.assertNotNull(reader); + Assert.assertEquals(true, getBoolField(spout, "fileReadCompletely")); + + // ack the rest + runSpout(spout, "a3", "a4"); + reader = getField(spout, "reader"); + Assert.assertNull(reader); + Assert.assertEquals(true, getBoolField(spout, "fileReadCompletely")); + + + // go to next file + Path file2 = new Path(source.toString() + "/file2.txt"); + createTextFile(file2, 5); + + // Read 1 line + runSpout(spout, "r1"); + Assert.assertNotNull(getField(spout, "reader")); + Assert.assertEquals(false, getBoolField(spout, "fileReadCompletely")); + // ack 1 tuple + runSpout(spout, "a5"); + Assert.assertNotNull(getField(spout, "reader")); + Assert.assertEquals(false, getBoolField(spout, "fileReadCompletely")); + + + // read and ack remaining lines + runSpout(spout, "r5", "a6", "a7", "a8", "a9"); + Assert.assertNull(getField(spout, "reader")); + Assert.assertEquals(true, getBoolField(spout, "fileReadCompletely")); + } + + private static <T> T getField(HdfsSpout spout, String fieldName) throws NoSuchFieldException, IllegalAccessException { + Field readerFld = HdfsSpout.class.getDeclaredField(fieldName); + readerFld.setAccessible(true); + return (T) readerFld.get(spout); + } + + private static boolean getBoolField(HdfsSpout spout, String fieldName) throws NoSuchFieldException, IllegalAccessException { + Field 
readerFld = HdfsSpout.class.getDeclaredField(fieldName); + readerFld.setAccessible(true); + return readerFld.getBoolean(spout); + } + + + @Test + public void testSimpleSequenceFile() throws IOException { + //1) create a couple files to consume source = new Path("/tmp/hdfsspout/source"); fs.mkdirs(source); archive = new Path("/tmp/hdfsspout/archive"); fs.mkdirs(archive); Path file1 = new Path(source + "/file1.seq"); - createSeqFile(fs, file1); + createSeqFile(fs, file1, 5); Path file2 = new Path(source + "/file2.seq"); - createSeqFile(fs, file2); + createSeqFile(fs, file2, 5); Map conf = getDefaultConfig(); HdfsSpout spout = makeSpout(0, conf, Configs.SEQ); - List<String> res = runSpout(spout, "r11", "a0", "a1", "a2", "a3", "a4"); - for (String re : res) { - System.err.println(re); - } + // consume both files + List<String> res = runSpout(spout, "r11"); + Assert.assertEquals(10, res.size()); - listDir(archive); + Assert.assertEquals(2, listDir(archive).size()); Path f1 = new Path(archive + "/file1.seq"); @@ -401,7 +487,7 @@ public class TestHdfsSpout { * fN - fail, item number: N */ - private List<String> runSpout(HdfsSpout spout, String... cmds) { + private List<String> runSpout(HdfsSpout spout, String... cmds) { MockCollector collector = (MockCollector) spout.getCollector(); for(String cmd : cmds) { if(cmd.startsWith("r")) { @@ -437,7 +523,7 @@ public class TestHdfsSpout { - private static void createSeqFile(FileSystem fs, Path file) throws IOException { + private static void createSeqFile(FileSystem fs, Path file, int rowCount) throws IOException { Configuration conf = new Configuration(); try { @@ -446,7 +532,7 @@ public class TestHdfsSpout { } SequenceFile.Writer w = SequenceFile.createWriter(fs, conf, file, IntWritable.class, Text.class ); - for (int i = 0; i < 5; i++) { + for (int i = 0; i < rowCount; i++) { w.append(new IntWritable(i), new Text("line " + i)); } w.close();
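A note for anyone extending these UTs: runSpout() drives the spout with a tiny command DSL ("rN" = read N tuples, "aN" = ack tuple number N, "fN" = fail tuple number N). As a hypothetical example (not part of this commit, and assuming the spout replays failed tuples from its retry list), a fail-path test could look like:

    @Test
    public void testFailThenReplay_Ack() throws Exception {
      Path file1 = new Path(source.toString() + "/file1.txt");
      createTextFile(file1, 5);

      Map conf = getDefaultConfig();
      conf.put(Config.TOPOLOGY_ACKER_EXECUTORS, "1"); // enable ACKing
      HdfsSpout spout = makeSpout(0, conf, Configs.TEXT);

      runSpout(spout, "r5", "f0");      // read the whole file, then fail tuple 0
      runSpout(spout, "r1");            // expect tuple 0 to be re-emitted
      runSpout(spout, "a0", "a1", "a2", "a3", "a4");
      Assert.assertNull(getField(spout, "reader")); // file done only after all ACKs
    }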
