Repository: samza
Updated Branches:
  refs/heads/master 963bd3085 -> ed7e4af2f


SAMZA-1596: Staging directory name has to be formatted in config

When we instantiate a HDFS config staging directory we missing a formatter for 
getStagingDirectory so systems.hdfs-system-name.stagingDirectory does not parse 
from config, but only systems.%s.stagingDirectory only parses instead.

Solution is to add formatter to getStagingDirectory method.

Author: Akim Akimov <zed...@gmail.com>

Reviewers: Jagadish <jagad...@apache.org>, Hai L<h...@linkedin.com>

Closes #431 from Zedmor/FixStagingDirHDFS


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/ed7e4af2
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/ed7e4af2
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/ed7e4af2

Branch: refs/heads/master
Commit: ed7e4af2fd059910425dec8e3faf7c8e0eecc9fd
Parents: 963bd30
Author: Akim Akimov <zed...@gmail.com>
Authored: Thu Mar 1 10:58:06 2018 -0800
Committer: Jagadish <jvenkatra...@linkedin.com>
Committed: Thu Mar 1 10:58:06 2018 -0800

----------------------------------------------------------------------
 .../main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java  | 2 +-
 .../java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java    | 2 +-
 .../src/main/scala/org/apache/samza/system/hdfs/HdfsConfig.scala | 4 ++--
 .../org/apache/samza/system/hdfs/TestHdfsSystemConsumer.java     | 2 +-
 4 files changed, 5 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/ed7e4af2/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java
----------------------------------------------------------------------
diff --git 
a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java 
b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java
index 9251db0..28a1bac 100644
--- a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java
+++ b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java
@@ -100,7 +100,7 @@ public class HdfsSystemAdmin implements SystemAdmin {
     directoryPartitioner = new 
DirectoryPartitioner(hdfsConfig.getPartitionerWhiteList(systemName),
       hdfsConfig.getPartitionerBlackList(systemName), 
hdfsConfig.getPartitionerGroupPattern(systemName),
       new HdfsFileSystemAdapter());
-    stagingDirectory = hdfsConfig.getStagingDirectory();
+    stagingDirectory = hdfsConfig.getStagingDirectory(systemName);
     readerType = 
HdfsReaderFactory.getType(hdfsConfig.getFileReaderType(systemName));
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/ed7e4af2/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java
----------------------------------------------------------------------
diff --git 
a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java 
b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java
index 230625d..92457ab 100644
--- 
a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java
+++ 
b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java
@@ -121,7 +121,7 @@ public class HdfsSystemConsumer extends BlockingEnvelopeMap 
{
     super(consumerMetrics.getMetricsRegistry());
     hdfsConfig = new HdfsConfig(config);
     readerType = 
HdfsReaderFactory.getType(hdfsConfig.getFileReaderType(systemName));
-    stagingDirectory = hdfsConfig.getStagingDirectory();
+    stagingDirectory = hdfsConfig.getStagingDirectory(systemName);
     bufferCapacity = hdfsConfig.getConsumerBufferCapacity(systemName);
     numMaxRetires = hdfsConfig.getConsumerNumMaxRetries(systemName);
     readers = new ConcurrentHashMap<>();

http://git-wip-us.apache.org/repos/asf/samza/blob/ed7e4af2/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsConfig.scala
----------------------------------------------------------------------
diff --git 
a/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsConfig.scala 
b/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsConfig.scala
index 52e19bf..06bda2a 100644
--- a/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsConfig.scala
+++ b/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsConfig.scala
@@ -197,7 +197,7 @@ class HdfsConfig(config: Config) extends 
ScalaMapConfig(config) {
    * Staging directory for storing partition description. If not set, will use 
the staging directory set
    * by yarn job.
    */
-  def getStagingDirectory(): String = {
-    getOrElse(HdfsConfig.STAGING_DIRECTORY, 
getOrElse(YarnConfig.YARN_JOB_STAGING_DIRECTORY, 
HdfsConfig.STAGING_DIRECTORY_DEFAULT))
+  def getStagingDirectory(systemName: String): String = {
+    getOrElse(HdfsConfig.STAGING_DIRECTORY format systemName, 
getOrElse(YarnConfig.YARN_JOB_STAGING_DIRECTORY, 
HdfsConfig.STAGING_DIRECTORY_DEFAULT))
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/ed7e4af2/samza-hdfs/src/test/java/org/apache/samza/system/hdfs/TestHdfsSystemConsumer.java
----------------------------------------------------------------------
diff --git 
a/samza-hdfs/src/test/java/org/apache/samza/system/hdfs/TestHdfsSystemConsumer.java
 
b/samza-hdfs/src/test/java/org/apache/samza/system/hdfs/TestHdfsSystemConsumer.java
index 481988f..6cbf7ba 100644
--- 
a/samza-hdfs/src/test/java/org/apache/samza/system/hdfs/TestHdfsSystemConsumer.java
+++ 
b/samza-hdfs/src/test/java/org/apache/samza/system/hdfs/TestHdfsSystemConsumer.java
@@ -63,7 +63,7 @@ public class TestHdfsSystemConsumer {
     properties.put(String.format(HdfsConfig.CONSUMER_PARTITIONER_WHITELIST(), 
SYSTEM_NAME), ".*TestHdfsSystemConsumer.*avro");
     Path stagingDirectory = Files.createTempDirectory("staging");
     stagingDirectory.toFile().deleteOnExit();
-    properties.put(HdfsConfig.STAGING_DIRECTORY(), 
stagingDirectory.toString());
+    properties.put(String.format(HdfsConfig.STAGING_DIRECTORY(), SYSTEM_NAME), 
stagingDirectory.toString());
     return new MapConfig(properties);
   }
 

Reply via email to