Repository: samza Updated Branches: refs/heads/master 963bd3085 -> ed7e4af2f
SAMZA-1596: Staging directory name has to be formatted in config. When we read the HDFS staging directory from config, we are missing a formatter in getStagingDirectory, so systems.hdfs-system-name.stagingDirectory is not parsed from config; only the literal key systems.%s.stagingDirectory is parsed instead. The solution is to add the formatter to the getStagingDirectory method. Author: Akim Akimov <zed...@gmail.com> Reviewers: Jagadish <jagad...@apache.org>, Hai L <h...@linkedin.com> Closes #431 from Zedmor/FixStagingDirHDFS Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/ed7e4af2 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/ed7e4af2 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/ed7e4af2 Branch: refs/heads/master Commit: ed7e4af2fd059910425dec8e3faf7c8e0eecc9fd Parents: 963bd30 Author: Akim Akimov <zed...@gmail.com> Authored: Thu Mar 1 10:58:06 2018 -0800 Committer: Jagadish <jvenkatra...@linkedin.com> Committed: Thu Mar 1 10:58:06 2018 -0800 ---------------------------------------------------------------------- .../main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java | 2 +- .../java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java | 2 +- .../src/main/scala/org/apache/samza/system/hdfs/HdfsConfig.scala | 4 ++-- .../org/apache/samza/system/hdfs/TestHdfsSystemConsumer.java | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/ed7e4af2/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java ---------------------------------------------------------------------- diff --git a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java index 9251db0..28a1bac 100644 --- a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java +++ 
b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java @@ -100,7 +100,7 @@ public class HdfsSystemAdmin implements SystemAdmin { directoryPartitioner = new DirectoryPartitioner(hdfsConfig.getPartitionerWhiteList(systemName), hdfsConfig.getPartitionerBlackList(systemName), hdfsConfig.getPartitionerGroupPattern(systemName), new HdfsFileSystemAdapter()); - stagingDirectory = hdfsConfig.getStagingDirectory(); + stagingDirectory = hdfsConfig.getStagingDirectory(systemName); readerType = HdfsReaderFactory.getType(hdfsConfig.getFileReaderType(systemName)); } http://git-wip-us.apache.org/repos/asf/samza/blob/ed7e4af2/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java ---------------------------------------------------------------------- diff --git a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java index 230625d..92457ab 100644 --- a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java +++ b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java @@ -121,7 +121,7 @@ public class HdfsSystemConsumer extends BlockingEnvelopeMap { super(consumerMetrics.getMetricsRegistry()); hdfsConfig = new HdfsConfig(config); readerType = HdfsReaderFactory.getType(hdfsConfig.getFileReaderType(systemName)); - stagingDirectory = hdfsConfig.getStagingDirectory(); + stagingDirectory = hdfsConfig.getStagingDirectory(systemName); bufferCapacity = hdfsConfig.getConsumerBufferCapacity(systemName); numMaxRetires = hdfsConfig.getConsumerNumMaxRetries(systemName); readers = new ConcurrentHashMap<>(); http://git-wip-us.apache.org/repos/asf/samza/blob/ed7e4af2/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsConfig.scala ---------------------------------------------------------------------- diff --git a/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsConfig.scala 
b/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsConfig.scala index 52e19bf..06bda2a 100644 --- a/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsConfig.scala +++ b/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsConfig.scala @@ -197,7 +197,7 @@ class HdfsConfig(config: Config) extends ScalaMapConfig(config) { * Staging directory for storing partition description. If not set, will use the staging directory set * by yarn job. */ - def getStagingDirectory(): String = { - getOrElse(HdfsConfig.STAGING_DIRECTORY, getOrElse(YarnConfig.YARN_JOB_STAGING_DIRECTORY, HdfsConfig.STAGING_DIRECTORY_DEFAULT)) + def getStagingDirectory(systemName: String): String = { + getOrElse(HdfsConfig.STAGING_DIRECTORY format systemName, getOrElse(YarnConfig.YARN_JOB_STAGING_DIRECTORY, HdfsConfig.STAGING_DIRECTORY_DEFAULT)) } } http://git-wip-us.apache.org/repos/asf/samza/blob/ed7e4af2/samza-hdfs/src/test/java/org/apache/samza/system/hdfs/TestHdfsSystemConsumer.java ---------------------------------------------------------------------- diff --git a/samza-hdfs/src/test/java/org/apache/samza/system/hdfs/TestHdfsSystemConsumer.java b/samza-hdfs/src/test/java/org/apache/samza/system/hdfs/TestHdfsSystemConsumer.java index 481988f..6cbf7ba 100644 --- a/samza-hdfs/src/test/java/org/apache/samza/system/hdfs/TestHdfsSystemConsumer.java +++ b/samza-hdfs/src/test/java/org/apache/samza/system/hdfs/TestHdfsSystemConsumer.java @@ -63,7 +63,7 @@ public class TestHdfsSystemConsumer { properties.put(String.format(HdfsConfig.CONSUMER_PARTITIONER_WHITELIST(), SYSTEM_NAME), ".*TestHdfsSystemConsumer.*avro"); Path stagingDirectory = Files.createTempDirectory("staging"); stagingDirectory.toFile().deleteOnExit(); - properties.put(HdfsConfig.STAGING_DIRECTORY(), stagingDirectory.toString()); + properties.put(String.format(HdfsConfig.STAGING_DIRECTORY(), SYSTEM_NAME), stagingDirectory.toString()); return new MapConfig(properties); }