Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1044#discussion_r55965923
  
    --- Diff: external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java ---
    @@ -223,29 +259,57 @@ public final void execute(Tuple tuple) {
         public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
         }
     
    -    /**
    -     * writes a tuple to the underlying filesystem but makes no guarantees about syncing data.
    -     *
    -     * this.offset is also updated to reflect additional data written
    -     *
    -     * @param tuple
    -     * @throws IOException
    -     */
    -    abstract void writeTuple(Tuple tuple) throws IOException;
    +    private void syncAllWriters() throws IOException {
    +        for (AbstractHDFSWriter writer : writers.values()) {
    +            writer.sync();
    +        }
    +    }
     
    -    /**
    -     * Make the best effort to sync written data to the underlying file system.  Concrete classes should very clearly
    -     * state the file state that sync guarantees.  For example, HdfsBolt can make a much stronger guarantee than
    -     * SequenceFileBolt.
    -     *
    -     * @throws IOException
    -     */
    -    abstract void syncTuples() throws IOException;
    +    private String getKeyToOldestWriter()
    +    {
    +        String oldestKey = null;
    +        long oldestTime = Long.MAX_VALUE;
    +        for (final Map.Entry<String, AbstractHDFSWriter> entry : writers.entrySet()) {
    +            if (entry.getValue().getLastUsedTime() < oldestTime) {
    +                oldestKey = entry.getKey();
    +                oldestTime = entry.getValue().getLastUsedTime();
    +            }
    +        }
     
    -    abstract void closeOutputFile() throws IOException;
    +        return oldestKey;
    +    }
     
    -    abstract Path createOutputFile() throws IOException;
    +    private void startTimedRotationPolicy() {
    +        long interval = ((TimedRotationPolicy)this.rotationPolicy).getInterval();
    +        this.rotationTimer = new Timer(true);
    +        TimerTask task = new TimerTask() {
    +            @Override
    +            public void run() {
    +                for (final AbstractHDFSWriter writer : writers.values()) {
    +                    try {
    +                        rotateOutputFile(writer);
    +                    } catch (IOException e) {
    +                        LOG.warn("IOException during scheduled file rotation.", e);
    +                    }
    +                }
    +                writers.clear();
    +            }
    +        };
    +        this.rotationTimer.scheduleAtFixedRate(task, interval, interval);
    +    }
    +
    +    protected Path getBasePathForNextFile(Tuple tuple) {
    +
    +        Path fullPathToFile = new Path(this.fsUrl + this.fileNameFormat.getPath() + this.partitioner.getPartitionPath(tuple),
    +                this.fileNameFormat.getName(this.rotation, System.currentTimeMillis()));
    --- End diff --
    
    Since `this.rotation` is not per writer, we could end up with non-contiguous file names (like file-1, file-5, file-8, etc.) inside a partition path, correct? If so, it should be fixed.
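    
    One possible direction (a minimal sketch, not something from this PR) would be to track the rotation count per writer key instead of sharing `this.rotation` across all writers, so files inside each partition path are numbered contiguously (file-0, file-1, file-2, ...). The class and names below are hypothetical:
    
        import java.util.HashMap;
        import java.util.Map;
    
        // Hypothetical per-key rotation bookkeeping; names are illustrative only.
        class PerWriterRotationTracker {
            private final Map<String, Long> rotationsPerKey = new HashMap<String, Long>();
    
            // Returns the next rotation index for the given writer key, starting at 0,
            // so file names inside each partition path stay contiguous.
            long nextRotation(String writerKey) {
                Long current = rotationsPerKey.get(writerKey);
                long next = (current == null) ? 0 : current + 1;
                rotationsPerKey.put(writerKey, next);
                return next;
            }
        }
    
    The bolt would then pass the per-key value to `fileNameFormat.getName(...)` in `getBasePathForNextFile` instead of the shared `this.rotation` counter.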

