[ https://issues.apache.org/jira/browse/STORM-1464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15196354#comment-15196354 ]

ASF GitHub Bot commented on STORM-1464:
---------------------------------------

Github user dossett commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1044#discussion_r56249577
  
    --- Diff: external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java ---
    @@ -223,29 +259,57 @@ public final void execute(Tuple tuple) {
         public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
         }
     
    -    /**
    -     * writes a tuple to the underlying filesystem but makes no guarantees about syncing data.
    -     *
    -     * this.offset is also updated to reflect additional data written
    -     *
    -     * @param tuple
    -     * @throws IOException
    -     */
    -    abstract void writeTuple(Tuple tuple) throws IOException;
    +    private void syncAllWriters() throws IOException {
    +        for (AbstractHDFSWriter writer : writers.values()) {
    +            writer.sync();
    +        }
    +    }
     
    -    /**
    -     * Make the best effort to sync written data to the underlying file system.  Concrete classes should very clearly
    -     * state the file state that sync guarantees.  For example, HdfsBolt can make a much stronger guarantee than
    -     * SequenceFileBolt.
    -     *
    -     * @throws IOException
    -     */
    -    abstract void syncTuples() throws IOException;
    +    private String getKeyToOldestWriter()
    +    {
    +        String oldestKey = null;
    +        long oldestTime = Long.MAX_VALUE;
    +        for (final Map.Entry<String, AbstractHDFSWriter> entry : writers.entrySet()) {
    +            if (entry.getValue().getLastUsedTime() < oldestTime) {
    +                oldestKey = entry.getKey();
    +                oldestTime = entry.getValue().getLastUsedTime();
    +            }
    +        }
     
    -    abstract void closeOutputFile() throws IOException;
    +        return oldestKey;
    +    }
     
    -    abstract Path createOutputFile() throws IOException;
    +    private void startTimedRotationPolicy() {
    +        long interval = ((TimedRotationPolicy)this.rotationPolicy).getInterval();
    +        this.rotationTimer = new Timer(true);
    +        TimerTask task = new TimerTask() {
    +            @Override
    +            public void run() {
    +                for (final AbstractHDFSWriter writer : writers.values()) {
    +                    try {
    +                        rotateOutputFile(writer);
    +                    } catch (IOException e) {
    +                        LOG.warn("IOException during scheduled file rotation.", e);
    +                    }
    +                }
    +                writers.clear();
    +            }
    +        };
    +        this.rotationTimer.scheduleAtFixedRate(task, interval, interval);
    +    }
    +
    +    protected Path getBasePathForNextFile(Tuple tuple) {
    +
    +        Path fullPathToFile = new Path(this.fsUrl + this.fileNameFormat.getPath() + this.partitioner.getPartitionPath(tuple),
    +                this.fileNameFormat.getName(this.rotation, System.currentTimeMillis()));
    +
    +        return fullPathToFile;
    +    }
     
         abstract void doPrepare(Map conf, TopologyContext topologyContext, OutputCollector collector) throws IOException;
     
    +    abstract String getWriterKey(Tuple tuple);
    --- End diff --
    
    I don't believe the writer key can go away entirely.  One of the things it does is track the need for multiple simultaneously open files within the same partition directory.  For example, the Avro bolt must write objects with different schemas to different files, regardless of partitioning.
    
    I think it's best to keep that as a separate notion and to have a key definition that isn't just the partition path.  I will think about that some more, though.
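
A minimal sketch of that idea, not code from the pull request: the class `WriterKeySketch`, the two-argument `writerKey` helper, and the `schemaId` string are all hypothetical, and plain strings stand in for Storm tuples. It only illustrates why a key built from the partition path alone would collapse two Avro schemas into one file.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration; none of these names exist in storm-hdfs.
final class WriterKeySketch {

    // Combine the partition directory with a format-specific discriminator.
    // For an Avro bolt the discriminator would identify the schema (assumption).
    static String writerKey(String partitionPath, String schemaId) {
        return partitionPath + "|" + schemaId;
    }

    // Stand-in for the bolt's writers map: one open file name per writer key.
    // Each input row is {partitionPath, schemaId}.
    static Map<String, String> openFiles(String[][] tuples) {
        Map<String, String> writers = new LinkedHashMap<>();
        for (String[] t : tuples) {
            String key = writerKey(t[0], t[1]);
            if (!writers.containsKey(key)) {
                // A new key means a new file, even inside an existing directory.
                writers.put(key, "file-" + writers.size());
            }
        }
        return writers;
    }
}
```

With this keying, two tuples that share a partition directory but carry different schemas map to two distinct writers, while a repeat of an already-seen pair reuses the existing one.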


> storm-hdfs should support writing to multiple files
> ---------------------------------------------------
>
>                 Key: STORM-1464
>                 URL: https://issues.apache.org/jira/browse/STORM-1464
>             Project: Apache Storm
>          Issue Type: Improvement
>          Components: storm-hdfs
>            Reporter: Aaron Dossett
>            Assignee: Aaron Dossett
>              Labels: avro
>
> Examples of when this is needed include:
> - One Avro bolt writing multiple schemas, each of which requires a different 
> file. Schema evolution is a common use of Avro and the Avro bolt should 
> support that seamlessly.
> - Partitioning output to different directories based on the tuple contents.  
> For example, if the tuple contains a "USER" field, it should be possible to 
> partition based on that value.
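
The second case could look roughly like the sketch below. It mirrors the `getPartitionPath(tuple)` call visible in the diff, but everything else is an assumption: the class name `UserPartitionSketch` is hypothetical, a `Map` stands in for a Storm `Tuple` so the example is self-contained, and the `/unknown` fallback is an illustrative choice rather than anything the issue specifies.

```java
import java.util.Map;

// Hypothetical illustration; a Map stands in for a Storm Tuple.
final class UserPartitionSketch {

    // Derive a partition directory from the tuple's "USER" field.
    static String getPartitionPath(Map<String, Object> tuple) {
        Object user = tuple.get("USER");
        // Fallback directory for tuples without a USER field (assumption).
        return "/" + (user == null ? "unknown" : user.toString());
    }
}
```

Tuples with `USER` set to `alice` would then land under `/alice`, appended to the base path configured for the bolt.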



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
