[ 
https://issues.apache.org/jira/browse/STORM-1464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15196339#comment-15196339
 ] 

ASF GitHub Bot commented on STORM-1464:
---------------------------------------

Github user dossett commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1044#discussion_r56247649
  
    --- Diff: external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java ---
    @@ -223,29 +259,57 @@ public final void execute(Tuple tuple) {
         public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
         }
     
    -    /**
    -     * writes a tuple to the underlying filesystem but makes no guarantees about syncing data.
    -     *
    -     * this.offset is also updated to reflect additional data written
    -     *
    -     * @param tuple
    -     * @throws IOException
    -     */
    -    abstract void writeTuple(Tuple tuple) throws IOException;
    +    private void syncAllWriters() throws IOException {
    +        for (AbstractHDFSWriter writer : writers.values()) {
    +            writer.sync();
    +        }
    +    }
     
    -    /**
    -     * Make the best effort to sync written data to the underlying file system.  Concrete classes should very clearly
    -     * state the file state that sync guarantees.  For example, HdfsBolt can make a much stronger guarantee than
    -     * SequenceFileBolt.
    -     *
    -     * @throws IOException
    -     */
    -    abstract void syncTuples() throws IOException;
    +    private String getKeyToOldestWriter()
    +    {
    +        String oldestKey = null;
    +        long oldestTime = Long.MAX_VALUE;
    +        for (final Map.Entry<String, AbstractHDFSWriter> entry : writers.entrySet()) {
    +            if (entry.getValue().getLastUsedTime() < oldestTime) {
    +                oldestKey = entry.getKey();
    +                oldestTime = entry.getValue().getLastUsedTime();
    +            }
    +        }
     
    -    abstract void closeOutputFile() throws IOException;
    +        return oldestKey;
    +    }
     
    -    abstract Path createOutputFile() throws IOException;
    +    private void startTimedRotationPolicy() {
    +        long interval = ((TimedRotationPolicy)this.rotationPolicy).getInterval();
    +        this.rotationTimer = new Timer(true);
    +        TimerTask task = new TimerTask() {
    +            @Override
    +            public void run() {
    +                for (final AbstractHDFSWriter writer : writers.values()) {
    +                    try {
    +                        rotateOutputFile(writer);
    +                    } catch (IOException e) {
    +                        LOG.warn("IOException during scheduled file rotation.", e);
    +                    }
    +                }
    +                writers.clear();
    +            }
    +        };
    +        this.rotationTimer.scheduleAtFixedRate(task, interval, interval);
    +    }
    +
    +    protected Path getBasePathForNextFile(Tuple tuple) {
    +
    +        Path fullPathToFile = new Path(this.fsUrl + this.fileNameFormat.getPath() + this.partitioner.getPartitionPath(tuple),
    +                this.fileNameFormat.getName(this.rotation, System.currentTimeMillis()));
    --- End diff --
    
    Very good observation, thank you.


> storm-hdfs should support writing to multiple files
> ---------------------------------------------------
>
>                 Key: STORM-1464
>                 URL: https://issues.apache.org/jira/browse/STORM-1464
>             Project: Apache Storm
>          Issue Type: Improvement
>          Components: storm-hdfs
>            Reporter: Aaron Dossett
>            Assignee: Aaron Dossett
>              Labels: avro
>
> Examples of when this is needed include:
> - One Avro bolt writing multiple schemas, each of which requires a different 
> file. Schema evolution is a common use of Avro and the Avro bolt should 
> support that seamlessly.
> - Partitioning output to different directories based on the tuple contents.  
> For example, if the tuple contains a "USER" field, it should be possible to 
> partition based on that value.
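
The second use case above can be pictured with a small sketch. It is not the storm-hdfs implementation: the class name UserPartitionSketch is hypothetical, a plain Map stands in for a Storm Tuple, and the "unknown" fallback directory is an assumption made for illustration. It only mirrors the getPartitionPath(tuple) call visible in the diff, returning a directory fragment derived from the "USER" field.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class UserPartitionSketch {
    // Hypothetical stand-in for a partitioner's getPartitionPath(Tuple):
    // a plain Map plays the role of the tuple here.
    static String getPartitionPath(Map<String, ?> tuple) {
        Object user = tuple.get("USER");
        // Assumption: tuples without a USER field go to an "unknown" directory.
        return "/" + (user == null ? "unknown" : user.toString());
    }

    public static void main(String[] args) {
        Map<String, Object> tuple = new LinkedHashMap<>();
        tuple.put("USER", "alice");
        tuple.put("payload", "hello");
        // The returned fragment would be appended to the base output path.
        System.out.println(getPartitionPath(tuple)); // prints "/alice"
    }
}
```

With a function of this shape, the bolt could keep one open writer per distinct partition path, which is the role the writers map plays in the diff.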



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
