Author: jeastman
Date: Tue Jan 4 00:02:06 2011
New Revision: 1054835
URL: http://svn.apache.org/viewvc?rev=1054835&view=rev
Log:
Moved SequenceFilesFromDirectory under AbstractJob so that -D arguments work
Modified:
mahout/trunk/utils/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java
Modified:
mahout/trunk/utils/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java
URL: http://svn.apache.org/viewvc/mahout/trunk/utils/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java?rev=1054835&r1=1054834&r2=1054835&view=diff
==============================================================================
--- mahout/trunk/utils/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java (original)
+++ mahout/trunk/utils/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java Tue Jan 4 00:02:06 2011
@@ -39,6 +39,8 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.common.AbstractJob;
import org.apache.mahout.common.CommandLineUtil;
import org.apache.mahout.common.FileLineIterable;
import org.slf4j.Logger;
@@ -53,20 +55,17 @@ import org.slf4j.LoggerFactory;
*
*
*/
-public final class SequenceFilesFromDirectory {
-
+public final class SequenceFilesFromDirectory extends AbstractJob {
+
private static final Logger log =
LoggerFactory.getLogger(SequenceFilesFromDirectory.class);
-
+
private static ChunkedWriter createNewChunkedWriter(int chunkSizeInMB,
String outputDir) throws IOException {
return new ChunkedWriter(chunkSizeInMB, outputDir);
}
-
- public void createSequenceFiles(File parentDir,
- String outputDir,
- String prefix,
- int chunkSizeInMB,
- Charset charset,
- String filter) throws IOException,
ClassNotFoundException, NoSuchMethodException, InvocationTargetException,
IllegalAccessException, InstantiationException {
+
+ public void createSequenceFiles(File parentDir, String outputDir, String
prefix, int chunkSizeInMB, Charset charset, String filter)
+ throws IOException, ClassNotFoundException, NoSuchMethodException,
InvocationTargetException, IllegalAccessException,
+ InstantiationException {
ChunkedWriter writer = createNewChunkedWriter(chunkSizeInMB, outputDir);
if ("PrefixAdditionFilter".equals(filter)) {
parentDir.listFiles(new PrefixAdditionFilter(prefix, writer, charset));
@@ -78,16 +77,22 @@ public final class SequenceFilesFromDire
}
writer.close();
}
-
+
public static class ChunkedWriter implements Closeable {
private final int maxChunkSizeInBytes;
+
private final String outputDir;
+
private SequenceFile.Writer writer;
+
private int currentChunkID;
+
private int currentChunkSize;
+
private final Configuration conf = new Configuration();
+
private final FileSystem fs;
-
+
public ChunkedWriter(int chunkSizeInMB, String outputDir) throws
IOException {
if (chunkSizeInMB > 1984) {
chunkSizeInMB = 1984;
@@ -98,47 +103,48 @@ public final class SequenceFilesFromDire
currentChunkID = 0;
writer = new SequenceFile.Writer(fs, conf, getPath(currentChunkID),
Text.class, Text.class);
}
-
+
private Path getPath(int chunkID) {
return new Path(outputDir + "/chunk-" + chunkID);
}
-
+
public void write(String key, String value) throws IOException {
if (currentChunkSize > maxChunkSizeInBytes) {
writer.close();
writer = new SequenceFile.Writer(fs, conf, getPath(currentChunkID++),
Text.class, Text.class);
currentChunkSize = 0;
-
+
}
-
+
Text keyT = new Text(key);
Text valueT = new Text(value);
currentChunkSize += keyT.getBytes().length + valueT.getBytes().length;
// Overhead
writer.append(keyT, valueT);
}
-
+
@Override
public void close() throws IOException {
writer.close();
}
}
-
+
public class PrefixAdditionFilter implements FileFilter {
private final String prefix;
+
private final ChunkedWriter writer;
+
private final Charset charset;
-
+
public PrefixAdditionFilter(String prefix, ChunkedWriter writer, Charset
charset) {
this.prefix = prefix;
this.writer = writer;
this.charset = charset;
}
-
+
@Override
public boolean accept(File current) {
if (current.isDirectory()) {
- current.listFiles(new PrefixAdditionFilter(prefix + File.separator +
current.getName(), writer,
- charset));
+ current.listFiles(new PrefixAdditionFilter(prefix + File.separator +
current.getName(), writer, charset));
} else {
try {
StringBuilder file = new StringBuilder();
@@ -146,7 +152,7 @@ public final class SequenceFilesFromDire
file.append(aFit).append('\n');
}
writer.write(prefix + File.separator + current.getName(),
file.toString());
-
+
} catch (FileNotFoundException e) {
// Skip file.
} catch (IOException e) {
@@ -156,44 +162,45 @@ public final class SequenceFilesFromDire
}
return false;
}
-
+
}
-
+
public static void main(String[] args) throws Exception {
+ ToolRunner.run(new SequenceFilesFromDirectory(), args);
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
DefaultOptionBuilder obuilder = new DefaultOptionBuilder();
ArgumentBuilder abuilder = new ArgumentBuilder();
GroupBuilder gbuilder = new GroupBuilder();
-
- Option parentOpt =
obuilder.withLongName("input").withRequired(true).withArgument(
-
abuilder.withName("input").withMinimum(1).withMaximum(1).create()).withDescription(
- "The input dir containing the documents").withShortName("i").create();
-
- Option outputDirOpt =
obuilder.withLongName("output").withRequired(true).withArgument(
-
abuilder.withName("output").withMinimum(1).withMaximum(1).create()).withDescription(
- "The output directory").withShortName("o").create();
-
- Option chunkSizeOpt = obuilder.withLongName("chunkSize").withArgument(
-
abuilder.withName("chunkSize").withMinimum(1).withMaximum(1).create()).withDescription(
- "The chunkSize in MegaBytes. Defaults to
64").withShortName("chunk").create();
-
- Option keyPrefixOpt = obuilder.withLongName("keyPrefix").withArgument(
-
abuilder.withName("keyPrefix").withMinimum(1).withMaximum(1).create()).withDescription(
- "The prefix to be prepended to the
key").withShortName("prefix").create();
-
- Option charsetOpt =
obuilder.withLongName("charset").withRequired(true).withArgument(
-
abuilder.withName("charset").withMinimum(1).withMaximum(1).create()).withDescription(
- "The name of the character encoding of the input
files").withShortName("c").create();
-
- Option fileFilterOpt =
obuilder.withLongName("fileFilterClass").withArgument(
-
abuilder.withName("fileFilterClass").withMinimum(1).withMaximum(1).create()).withDescription(
- "The name of the class to use for file parsing. Default:
PrefixAdditionFilter").withShortName("filter").create();
-
- Option helpOpt = obuilder.withLongName("help").withDescription("Print out
help").withShortName("h")
+
+ Option parentOpt =
obuilder.withLongName("input").withRequired(true).withArgument(abuilder.withName("input").withMinimum(1)
+ .withMaximum(1).create()).withDescription("The input dir containing
the documents").withShortName("i").create();
+
+ Option outputDirOpt =
obuilder.withLongName("output").withRequired(true).withArgument(abuilder.withName("output")
+ .withMinimum(1).withMaximum(1).create()).withDescription("The output
directory").withShortName("o").create();
+
+ Option chunkSizeOpt =
obuilder.withLongName("chunkSize").withArgument(abuilder.withName("chunkSize").withMinimum(1)
+ .withMaximum(1).create()).withDescription("The chunkSize in MegaBytes.
Defaults to 64").withShortName("chunk").create();
+
+ Option keyPrefixOpt =
obuilder.withLongName("keyPrefix").withArgument(abuilder.withName("keyPrefix").withMinimum(1)
+ .withMaximum(1).create()).withDescription("The prefix to be prepended
to the key").withShortName("prefix").create();
+
+ Option charsetOpt =
obuilder.withLongName("charset").withRequired(true).withArgument(abuilder.withName("charset")
+ .withMinimum(1).withMaximum(1).create()).withDescription("The name of
the character encoding of the input files")
+ .withShortName("c").create();
+
+ Option fileFilterOpt =
obuilder.withLongName("fileFilterClass").withArgument(abuilder.withName("fileFilterClass")
+ .withMinimum(1).withMaximum(1).create())
+ .withDescription("The name of the class to use for file parsing.
Default: PrefixAdditionFilter").withShortName("filter")
.create();
-
- Group group =
gbuilder.withName("Options").withOption(keyPrefixOpt).withOption(chunkSizeOpt).withOption(
-
charsetOpt).withOption(outputDirOpt).withOption(fileFilterOpt).withOption(helpOpt).withOption(parentOpt).create();
-
+
+ Option helpOpt = obuilder.withLongName("help").withDescription("Print out
help").withShortName("h").create();
+
+ Group group =
gbuilder.withName("Options").withOption(keyPrefixOpt).withOption(chunkSizeOpt).withOption(charsetOpt)
+
.withOption(outputDirOpt).withOption(fileFilterOpt).withOption(helpOpt).withOption(parentOpt).create();
+
try {
Parser parser = new Parser();
parser.setGroup(group);
@@ -201,16 +208,16 @@ public final class SequenceFilesFromDire
CommandLine cmdLine = parser.parse(args);
if (cmdLine.hasOption(helpOpt)) {
CommandLineUtil.printHelp(group);
- return;
+ return -1;
}
File parentDir = new File((String) cmdLine.getValue(parentOpt));
String outputDir = (String) cmdLine.getValue(outputDirOpt);
-
+
int chunkSize = 64;
if (cmdLine.hasOption(chunkSizeOpt)) {
chunkSize = Integer.parseInt((String) cmdLine.getValue(chunkSizeOpt));
}
-
+
String prefix = "";
if (cmdLine.hasOption(keyPrefixOpt)) {
prefix = (String) cmdLine.getValue(keyPrefixOpt);
@@ -223,11 +230,12 @@ public final class SequenceFilesFromDire
Charset charset = Charset.forName((String) cmdLine.getValue(charsetOpt));
SequenceFilesFromDirectory dir = new SequenceFilesFromDirectory();
-
+
dir.createSequenceFiles(parentDir, outputDir, prefix, chunkSize,
charset, filter);
} catch (OptionException e) {
log.error("Exception", e);
CommandLineUtil.printHelp(group);
}
+ return 0;
}
}