[
https://issues.apache.org/jira/browse/STORM-1464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15192886#comment-15192886
]
ASF GitHub Bot commented on STORM-1464:
---------------------------------------
Github user arunmahadevan commented on a diff in the pull request:
https://github.com/apache/storm/pull/1044#discussion_r55965901
--- Diff: external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java ---
@@ -198,22 +194,62 @@ public final void execute(Tuple tuple) {
}
}
- if(this.rotationPolicy.mark(tuple, this.offset)) {
- try {
- rotateOutputFile();
- this.rotationPolicy.reset();
- this.offset = 0;
- } catch (IOException e) {
- this.collector.reportError(e);
- LOG.warn("File could not be rotated");
- //At this point there is nothing to do. In all likelihood any filesystem operations will fail.
- //The next tuple will almost certainly fail to write and/or sync, which will force a rotation. That
- //will give rotateAndReset() a chance to work, which includes creating a fresh file handle.
- }
+ if (writer != null && writer.needsRotation()) {
+ doRotationAndRemoveWriter(writerKey, writer);
}
}
}
+ private AbstractHDFSWriter getOrCreateWriter(String writerKey, Tuple tuple) throws IOException {
+ AbstractHDFSWriter writer;
+
+ writer = writers.get(writerKey);
+ if (writer == null) {
+ if (writers.size() >= maxOpenFiles)
+ {
+ String keyToOldest = getKeyToOldestWriter();
+ AbstractHDFSWriter oldest = writers.get(keyToOldest);
+ rotateOutputFile(oldest);
+ writers.remove(keyToOldest);
+ }
+
+ Path pathForNextFile = getBasePathForNextFile(tuple);
+ writer = makeNewWriter(pathForNextFile, tuple);
+ writers.put(writerKey, writer);
+ this.rotation++;
+ }
+ return writer;
+ }
+
+ /**
+ * A tuple must be mapped to a writer based on two factors:
+ * - bolt-specific logic that must separate tuples into different files in the same directory (see the avro bolt for an example of this)
+ * - the directory the tuple will be partitioned into
+ *
+ * @param tuple the tuple being processed
+ * @return the key used to look up (or create) this tuple's writer
+ */
+ private String getHashKeyForTuple(Tuple tuple) {
+ final String boltKey = getWriterKey(tuple);
--- End diff ---
Why a separate key instead of using the partition path itself as the key?
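For context, a minimal sketch of how such a composite key could look, assuming it simply joins the partition directory with the bolt-specific key returned by getWriterKey(tuple); the getPartitionPath() helper and the composition below are assumptions for illustration, not the PR's actual code:

    // Hypothetical sketch: compose the writer key from the partition directory plus a
    // bolt-specific discriminator, so that two schemas (as in the avro bolt) can share a
    // directory yet still map to different writers. getPartitionPath() is an assumed helper.
    private String getHashKeyForTuple(Tuple tuple) {
        final String boltKey = getWriterKey(tuple);           // bolt-specific part, e.g. an avro schema id
        final String partitionPath = getPartitionPath(tuple); // directory the tuple is partitioned into
        return partitionPath + "/" + boltKey;                 // assumed composition
    }

Under that reading, the partition path alone would not distinguish two avro schemas being written into the same directory, which appears to be the reason for the extra bolt-specific key.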
> storm-hdfs should support writing to multiple files
> ---------------------------------------------------
>
> Key: STORM-1464
> URL: https://issues.apache.org/jira/browse/STORM-1464
> Project: Apache Storm
> Issue Type: Improvement
> Components: storm-hdfs
> Reporter: Aaron Dossett
> Assignee: Aaron Dossett
> Labels: avro
>
> Examples of when this is needed include:
> - One avro bolt writing multiple schemas, each of which requires a different
> file. Schema evolution is a common use of avro and the avro bolt should
> support that seamlessly.
> - Partitioning output to different directories based on the tuple contents.
> For example, if the tuple contains a "USER" field, it should be possible to
> partition based on that value.
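To illustrate the second bullet above, a hedged sketch of what per-tuple directory partitioning could look like; the class name, the getPartitionPath() method, and the "USER" field are illustrative assumptions, not the API this ticket ultimately added:

    import org.apache.storm.tuple.Tuple;

    // Illustrative only: derive a sub-directory from the tuple's "USER" field so that
    // output files land under per-user partitions.
    public class UserFieldPartitioner {
        public String getPartitionPath(Tuple tuple) {
            final String user = tuple.getStringByField("USER"); // assumes the tuple declares a USER field
            return "/user=" + user;                             // e.g. .../user=alice/part-0000.avro
        }
    }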
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)