Repository: storm
Updated Branches:
  refs/heads/1.x-branch e1c24b9b2 -> 10ab96a5c


STORM-2517 add interface for Writer, make AbstractHDFSWriter properties 
protected


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/07547cac
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/07547cac
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/07547cac

Branch: refs/heads/1.x-branch
Commit: 07547cacb4f75af294c701edb9055702cd97090c
Parents: 1352714
Author: Angus Helm <[email protected]>
Authored: Wed May 24 13:27:57 2017 -0500
Committer: Angus Helm <[email protected]>
Committed: Wed Jul 5 09:41:16 2017 -0500

----------------------------------------------------------------------
 .../storm/hdfs/bolt/AbstractHdfsBolt.java       | 23 ++++++-------
 .../java/org/apache/storm/hdfs/bolt/Writer.java | 35 ++++++++++++++++++++
 .../storm/hdfs/common/AbstractHDFSWriter.java   | 16 ++++-----
 3 files changed, 54 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/07547cac/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java
 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java
index 43c01d2..4503191 100644
--- 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java
+++ 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java
@@ -30,7 +30,6 @@ import org.apache.storm.hdfs.bolt.format.FileNameFormat;
 import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
 import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy;
 import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
-import org.apache.storm.hdfs.common.AbstractHDFSWriter;
 import org.apache.storm.hdfs.common.NullPartitioner;
 import org.apache.storm.hdfs.common.Partitioner;
 import org.apache.storm.hdfs.common.rotation.RotationAction;
@@ -57,7 +56,7 @@ public abstract class AbstractHdfsBolt extends BaseRichBolt {
     private static final int DEFAULT_TICK_TUPLE_INTERVAL_SECS = 15;
     private static final Integer DEFAULT_MAX_OPEN_FILES = 50;
 
-    protected Map<String, AbstractHDFSWriter> writers;
+    protected Map<String, Writer> writers;
     protected Map<String, Integer> rotationCounterMap = new HashMap<>();
     protected List<RotationAction> rotationActions = new ArrayList<>();
     protected OutputCollector collector;
@@ -78,7 +77,7 @@ public abstract class AbstractHdfsBolt extends BaseRichBolt {
 
     protected transient Configuration hdfsConfig;
 
-    protected void rotateOutputFile(AbstractHDFSWriter writer) throws 
IOException {
+    protected void rotateOutputFile(Writer writer) throws IOException {
         LOG.info("Rotating output file...");
         long start = System.currentTimeMillis();
         synchronized (this.writeLock) {
@@ -136,7 +135,7 @@ public abstract class AbstractHdfsBolt extends BaseRichBolt 
{
 
         synchronized (this.writeLock) {
             boolean forceSync = false;
-            AbstractHDFSWriter writer = null;
+            Writer writer = null;
             String writerKey = null;
 
             if (TupleUtils.isTick(tuple)) {
@@ -202,8 +201,8 @@ public abstract class AbstractHdfsBolt extends BaseRichBolt 
{
         }
     }
 
-    private AbstractHDFSWriter getOrCreateWriter(String writerKey, Tuple 
tuple) throws IOException {
-        AbstractHDFSWriter writer;
+    private Writer getOrCreateWriter(String writerKey, Tuple tuple) throws 
IOException {
+        Writer writer;
 
         writer = writers.get(writerKey);
         if (writer == null) {
@@ -229,7 +228,7 @@ public abstract class AbstractHdfsBolt extends BaseRichBolt 
{
         return boltKey + "****" + partitionDir;
     }
 
-    void doRotationAndRemoveWriter(String writerKey, AbstractHDFSWriter 
writer) {
+    void doRotationAndRemoveWriter(String writerKey, Writer writer) {
         try {
             rotateOutputFile(writer);
         } catch (IOException e) {
@@ -253,7 +252,7 @@ public abstract class AbstractHdfsBolt extends BaseRichBolt 
{
     }
 
     private void syncAllWriters() throws IOException {
-        for (AbstractHDFSWriter writer : writers.values()) {
+        for (Writer writer : writers.values()) {
             writer.sync();
         }
     }
@@ -264,7 +263,7 @@ public abstract class AbstractHdfsBolt extends BaseRichBolt 
{
         TimerTask task = new TimerTask() {
             @Override
             public void run() {
-                for (final AbstractHDFSWriter writer : writers.values()) {
+                for (final Writer writer : writers.values()) {
                     try {
                         rotateOutputFile(writer);
                     } catch (IOException e) {
@@ -297,9 +296,9 @@ public abstract class AbstractHdfsBolt extends BaseRichBolt 
{
 
     abstract protected String getWriterKey(Tuple tuple);
 
-    abstract protected AbstractHDFSWriter makeNewWriter(Path path, Tuple 
tuple) throws IOException;
+    abstract protected Writer makeNewWriter(Path path, Tuple tuple) throws 
IOException;
 
-    static class WritersMap extends LinkedHashMap<String, AbstractHDFSWriter> {
+    static class WritersMap extends LinkedHashMap<String, Writer> {
         final long maxWriters;
 
         public WritersMap(long maxWriters) {
@@ -308,7 +307,7 @@ public abstract class AbstractHdfsBolt extends BaseRichBolt 
{
         }
 
         @Override
-        protected boolean removeEldestEntry(Map.Entry<String, 
AbstractHDFSWriter> eldest) {
+        protected boolean removeEldestEntry(Map.Entry<String, Writer> eldest) {
             return this.size() > this.maxWriters;
         }
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/07547cac/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/Writer.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/Writer.java 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/Writer.java
new file mode 100644
index 0000000..9108aab
--- /dev/null
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/Writer.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hdfs.bolt;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.storm.tuple.Tuple;
+
+import java.io.IOException;
+
+public interface Writer {
+    long write(Tuple tuple) throws IOException;
+
+    void sync() throws IOException;
+
+    void close() throws IOException;
+
+    boolean needsRotation();
+
+    Path getFilePath();
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/07547cac/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/AbstractHDFSWriter.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/AbstractHDFSWriter.java
 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/AbstractHDFSWriter.java
index 4b36377..b052b34 100644
--- 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/AbstractHDFSWriter.java
+++ 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/AbstractHDFSWriter.java
@@ -18,25 +18,25 @@
 package org.apache.storm.hdfs.common;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.storm.hdfs.bolt.Writer;
 import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
 import org.apache.storm.tuple.Tuple;
 
 import java.io.IOException;
 
-abstract public class AbstractHDFSWriter {
-    long lastUsedTime;
-    long offset;
-    boolean needsRotation;
-    Path filePath;
-    FileRotationPolicy rotationPolicy;
+abstract public class AbstractHDFSWriter implements Writer {
+    protected long offset;
+    protected boolean needsRotation;
+    final protected Path filePath;
+    final protected FileRotationPolicy rotationPolicy;
 
-    AbstractHDFSWriter(FileRotationPolicy policy, Path path) {
+    public AbstractHDFSWriter(FileRotationPolicy policy, Path path) {
         //This must be defensively copied, because a bolt probably has only 
one rotation policy object
         this.rotationPolicy = policy.copy();
         this.filePath = path;
     }
 
-    final public long write(Tuple tuple) throws IOException{
+    final public long write(Tuple tuple) throws IOException {
         doWrite(tuple);
         this.needsRotation = rotationPolicy.mark(tuple, offset);
 

Reply via email to