Fix issues introduced by the JStorm-related refactoring
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d17b3b9c Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d17b3b9c Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d17b3b9c Branch: refs/heads/1.x-branch Commit: d17b3b9c3cbc89d854bfb436d213d11cfd4545ec Parents: 2c02bc9 Author: Roshan Naik <[email protected]> Authored: Thu Jan 14 00:40:43 2016 -0800 Committer: Roshan Naik <[email protected]> Committed: Thu Jan 14 11:34:57 2016 -0800 ---------------------------------------------------------------------- .../jvm/storm/starter/HdfsSpoutTopology.java | 24 ++++++++------- .../org/apache/storm/hdfs/spout/FileLock.java | 11 +++++-- .../org/apache/storm/hdfs/spout/FileReader.java | 1 - .../org/apache/storm/hdfs/spout/HdfsSpout.java | 32 +++++++++++--------- .../storm/hdfs/spout/ProgressTracker.java | 10 +++--- .../apache/storm/hdfs/spout/TestHdfsSpout.java | 9 +++--- 6 files changed, 48 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/d17b3b9c/examples/storm-starter/src/jvm/storm/starter/HdfsSpoutTopology.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/storm/starter/HdfsSpoutTopology.java b/examples/storm-starter/src/jvm/storm/starter/HdfsSpoutTopology.java index ca6b045..191886c 100644 --- a/examples/storm-starter/src/jvm/storm/starter/HdfsSpoutTopology.java +++ b/examples/storm-starter/src/jvm/storm/starter/HdfsSpoutTopology.java @@ -18,18 +18,20 @@ package storm.starter; -import backtype.storm.Config; -import backtype.storm.StormSubmitter; -import backtype.storm.generated.Nimbus; -import backtype.storm.topology.TopologyBuilder; -import backtype.storm.utils.NimbusClient; -import backtype.storm.utils.Utils; +import org.apache.storm.Config; +import org.apache.storm.StormSubmitter; +import 
org.apache.storm.generated.Nimbus; +import org.apache.storm.metric.LoggingMetricsConsumer; +import org.apache.storm.starter.FastWordCountTopology; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.utils.NimbusClient; +import org.apache.storm.utils.Utils; import org.apache.storm.hdfs.spout.Configs; import org.apache.storm.hdfs.spout.HdfsSpout; -import backtype.storm.topology.base.BaseRichBolt; -import backtype.storm.topology.*; -import backtype.storm.tuple.*; -import backtype.storm.task.*; +import org.apache.storm.topology.base.BaseRichBolt; +import org.apache.storm.topology.*; +import org.apache.storm.tuple.*; +import org.apache.storm.task.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -121,7 +123,7 @@ public class HdfsSpoutTopology { // 3 - Create and configure topology conf.setDebug(true); conf.setNumWorkers(WORKER_NUM); - conf.registerMetricsConsumer(backtype.storm.metric.LoggingMetricsConsumer.class); + conf.registerMetricsConsumer(LoggingMetricsConsumer.class); TopologyBuilder builder = new TopologyBuilder(); builder.setSpout(SPOUT_ID, spout, spoutNum); http://git-wip-us.apache.org/repos/asf/storm/blob/d17b3b9c/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java index 0217cf9..a7cb2b8 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java @@ -112,8 +112,7 @@ public class FileLock { /** returns lock on file or null if file is already locked. 
throws if unexpected problem */ public static FileLock tryLock(FileSystem fs, Path fileToLock, Path lockDirPath, String spoutId) throws IOException { - String lockFileName = lockDirPath.toString() + Path.SEPARATOR_CHAR + fileToLock.getName(); - Path lockFile = new Path(lockFileName); + Path lockFile = new Path(lockDirPath, fileToLock.getName()); try { FSDataOutputStream ostream = HdfsUtils.tryCreateFile(fs, lockFile); @@ -148,7 +147,13 @@ public class FileLock { // timestamp in last line of file to see when the last update was made LogEntry lastEntry = getLastEntry(fs, lockFile); if(lastEntry==null) { - throw new RuntimeException(lockFile.getName() + " is empty. this file is invalid."); + LOG.warn("Empty lock file found. Deleting it. {}", lockFile); + try { + if(!fs.delete(lockFile, false)) + throw new IOException("Empty lock file deletion failed"); + } catch (Exception e) { + LOG.error("Unable to delete empty lock file " + lockFile, e); + } } if( lastEntry.eventTime <= olderThan ) return lastEntry; http://git-wip-us.apache.org/repos/asf/storm/blob/d17b3b9c/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileReader.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileReader.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileReader.java index 1cb1f59..54a90d4 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileReader.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileReader.java @@ -18,7 +18,6 @@ package org.apache.storm.hdfs.spout; -import backtype.storm.tuple.Fields; import org.apache.hadoop.fs.Path; import java.io.IOException; http://git-wip-us.apache.org/repos/asf/storm/blob/d17b3b9c/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java ---------------------------------------------------------------------- diff --git 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java index 5428570..06896b2 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java @@ -30,7 +30,7 @@ import java.util.TimerTask; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; -import backtype.storm.Config; +import org.apache.storm.Config; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -39,11 +39,11 @@ import org.apache.storm.hdfs.common.security.HdfsSecurityUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import backtype.storm.spout.SpoutOutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.topology.base.BaseRichSpout; -import backtype.storm.tuple.Fields; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseRichSpout; +import org.apache.storm.tuple.Fields; public class HdfsSpout extends BaseRichSpout { @@ -309,7 +309,7 @@ public class HdfsSpout extends BaseRichSpout { Map<String, Object> map = (Map<String, Object>)conf.get(configKey); if(map != null) { for(String keyName : map.keySet()){ - LOG.info("HDFS Config override : " + keyName + " = " + String.valueOf(map.get(keyName))); + LOG.info("HDFS Config override : {} = {} ", keyName, String.valueOf(map.get(keyName))); this.hdfsConfig.set(keyName, String.valueOf(map.get(keyName))); } try { @@ -373,9 +373,9 @@ public class HdfsSpout extends BaseRichSpout { this.ackEnabled = (ackerCount>0); LOG.debug("ACKer count = {}", ackerCount); } - else { - this.ackEnabled = false; - LOG.debug("No 
ACKers config found"); + else { // ackers==null when ackerCount not explicitly set on the topology + this.ackEnabled = true; + LOG.debug("ACK count not explicitly set on topology."); } LOG.info("ACK mode is {}", ackEnabled ? "enabled" : "disabled"); @@ -393,13 +393,15 @@ public class HdfsSpout extends BaseRichSpout { } } - // -- max duplicate - if( conf.get(Configs.MAX_OUTSTANDING) !=null ) - maxOutstanding = Integer.parseInt( conf.get(Configs.MAX_OUTSTANDING).toString() ); + // -- max outstanding tuples + if( conf.get(Configs.MAX_OUTSTANDING) !=null ) { + maxOutstanding = Integer.parseInt(conf.get(Configs.MAX_OUTSTANDING).toString()); + } // -- clocks in sync - if( conf.get(Configs.CLOCKS_INSYNC) !=null ) + if( conf.get(Configs.CLOCKS_INSYNC) !=null ) { clocksInSync = Boolean.parseBoolean(conf.get(Configs.CLOCKS_INSYNC).toString()); + } // -- spout id spoutId = context.getThisComponentId(); @@ -530,7 +532,7 @@ public class HdfsSpout extends BaseRichSpout { /** * If clocks in sync, then acquires the oldest expired lock * Else, on first call, just remembers the oldest expired lock, on next call check if the lock is updated. 
if not updated then acquires the lock - * @return + * @return a lock object * @throws IOException */ private FileLock getOldestExpiredLock() throws IOException { http://git-wip-us.apache.org/repos/asf/storm/blob/d17b3b9c/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/ProgressTracker.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/ProgressTracker.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/ProgressTracker.java index d7de3ed..e2e7126 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/ProgressTracker.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/ProgressTracker.java @@ -25,7 +25,7 @@ public class ProgressTracker { TreeSet<FileOffset> offsets = new TreeSet<>(); - public void recordAckedOffset(FileOffset newOffset) { + public synchronized void recordAckedOffset(FileOffset newOffset) { if(newOffset==null) { return; } @@ -40,7 +40,7 @@ public class ProgressTracker { // remove contiguous elements from the head of the heap // e.g.: 1,2,3,4,10,11,12,15 => 4,10,11,12,15 - private void trimHead() { + private synchronized void trimHead() { if(offsets.size()<=1) { return; } @@ -53,18 +53,18 @@ public class ProgressTracker { return; } - public FileOffset getCommitPosition() { + public synchronized FileOffset getCommitPosition() { if(!offsets.isEmpty()) { return offsets.first().clone(); } return null; } - public void dumpState(PrintStream stream) { + public synchronized void dumpState(PrintStream stream) { stream.println(offsets); } - public int size() { + public synchronized int size() { return offsets.size(); } } http://git-wip-us.apache.org/repos/asf/storm/blob/d17b3b9c/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java ---------------------------------------------------------------------- diff --git 
a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java index 835a714..330afe9 100644 --- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java +++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java @@ -18,9 +18,9 @@ package org.apache.storm.hdfs.spout; -import backtype.storm.Config; -import backtype.storm.spout.SpoutOutputCollector; -import backtype.storm.task.TopologyContext; +import org.apache.storm.Config; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.TopologyContext; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.io.Writable; @@ -44,7 +44,6 @@ import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; -import sun.reflect.generics.reflectiveObjects.NotImplementedException; import java.io.BufferedReader; import java.io.File; @@ -58,6 +57,7 @@ import java.util.List; import java.util.Map; import org.apache.storm.hdfs.common.HdfsUtils.Pair; +import sun.reflect.generics.reflectiveObjects.NotImplementedException; public class TestHdfsSpout { @@ -557,6 +557,7 @@ public class TestHdfsSpout { conf.put(Configs.ARCHIVE_DIR, archive.toString()); conf.put(Configs.BAD_DIR, badfiles.toString()); conf.put(Configs.HDFS_URI, hdfsCluster.getURI().toString()); + conf.put(Config.TOPOLOGY_ACKER_EXECUTORS, "0"); return conf; }
