SAMZA-956 : Disk Quotas: Add throttler and disk quota enforcement

Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/27c9e4c2
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/27c9e4c2
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/27c9e4c2

Branch: refs/heads/master
Commit: 27c9e4c2ed114c12c040f449bfa6ed1af71bdb7d
Parents: 312c1b1
Author: Chris Pettitt <cpett...@linkedin.com>
Authored: Mon Jun 20 15:55:01 2016 -0700
Committer: Navina Ramesh <nram...@linkedin.com>
Committed: Mon Jun 20 15:55:01 2016 -0700

----------------------------------------------------------------------
 checkstyle/import-control.xml                   |   6 +-
 .../samza/container/disk/DiskQuotaPolicy.java   |  32 +
 .../container/disk/DiskQuotaPolicyFactory.java  |  34 +
 .../samza/container/disk/DiskSpaceMonitor.java  |   3 +
 .../disk/NoThrottlingDiskQuotaPolicy.java       |  39 +
 .../NoThrottlingDiskQuotaPolicyFactory.java     |  32 +
 .../disk/WatermarkDiskQuotaPolicy.java          | 216 +++++
 .../disk/WatermarkDiskQuotaPolicyFactory.java   | 111 +++
 .../apache/samza/util/HighResolutionClock.java  |  47 ++
 .../samza/util/SystemHighResolutionClock.java   |  41 +
 .../apache/samza/util/ThrottlingExecutor.java   | 135 ++++
 .../org/apache/samza/container/RunLoop.scala    |  25 +-
 .../samza/container/SameThreadExecutor.scala    |  26 +
 .../apache/samza/container/SamzaContainer.scala |  33 +-
 .../samza/container/SamzaContainer.scala.orig   | 787 +++++++++++++++++++
 .../samza/container/SamzaContainerMetrics.scala |  14 +-
 .../main/scala/org/apache/samza/util/Util.scala |  23 +
 .../disk/TestDiskQuotaPolicyEntry.java          |  82 ++
 .../disk/TestWatermarkDiskQuotaPolicy.java      | 155 ++++
 .../samza/util/TestThrottlingExecutor.java      | 255 ++++++
 .../scala/org/apache/samza/util/TestUtil.scala  |  16 +
 21 files changed, 2084 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/27c9e4c2/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 7a09c7e..d0a5c66 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -115,7 +115,8 @@
     <subpackage name="util">
         <allow pkg="org.apache.samza.metrics" />
         <allow pkg="org.apache.samza.system" />
-
+        <allow pkg="junit.framework" />
+        
         <allow class="org.apache.samza.Partition" />
         <allow class="org.apache.samza.SamzaException" />
         <allow class="joptsimple.OptionSet" />
@@ -138,7 +139,10 @@
         <allow pkg="org.apache.samza.config" />
         <allow pkg="org.apache.samza.container" />
         <allow pkg="org.apache.samza.coordinator.stream" />
+        <allow pkg="org.apache.samza.util" />
+        <allow pkg="junit.framework" />
         <allow 
class="org.apache.samza.coordinator.stream.AbstractCoordinatorStreamManager" />
+
         <subpackage name="grouper">
             <subpackage name="stream">
                 <allow pkg="org.apache.samza.system" />

http://git-wip-us.apache.org/repos/asf/samza/blob/27c9e4c2/samza-core/src/main/java/org/apache/samza/container/disk/DiskQuotaPolicy.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/container/disk/DiskQuotaPolicy.java 
b/samza-core/src/main/java/org/apache/samza/container/disk/DiskQuotaPolicy.java
new file mode 100644
index 0000000..060f980
--- /dev/null
+++ 
b/samza-core/src/main/java/org/apache/samza/container/disk/DiskQuotaPolicy.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.container.disk;
+
+/**
+ * An object that produces a new work rate for the container based on the 
current percentage of
+ * available disk quota assigned to the container.
+ */
+public interface DiskQuotaPolicy {
+  /**
+   * Given the latest percentage of available disk quota, this method returns 
the work rate that
+   * should be applied to the container as a value in (0.0, 1.0].
+   */
+  double apply(double availableDiskQuotaPercentage);
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/27c9e4c2/samza-core/src/main/java/org/apache/samza/container/disk/DiskQuotaPolicyFactory.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/container/disk/DiskQuotaPolicyFactory.java
 
b/samza-core/src/main/java/org/apache/samza/container/disk/DiskQuotaPolicyFactory.java
new file mode 100644
index 0000000..00202a4
--- /dev/null
+++ 
b/samza-core/src/main/java/org/apache/samza/container/disk/DiskQuotaPolicyFactory.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.container.disk;
+
+import org.apache.samza.config.Config;
+
+/**
+ * A factory for creating a {@link DiskQuotaPolicy}. This interface is 
required due to Samza's
+ * string-only configuration.
+ */
+public interface DiskQuotaPolicyFactory {
+  /**
+   * Creates and returns a disk quota policy instance.
+   * @param config configuration that may apply to this factory
+   */
+  DiskQuotaPolicy create(Config config);
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/27c9e4c2/samza-core/src/main/java/org/apache/samza/container/disk/DiskSpaceMonitor.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/container/disk/DiskSpaceMonitor.java
 
b/samza-core/src/main/java/org/apache/samza/container/disk/DiskSpaceMonitor.java
index 2a565be..162cabf 100644
--- 
a/samza-core/src/main/java/org/apache/samza/container/disk/DiskSpaceMonitor.java
+++ 
b/samza-core/src/main/java/org/apache/samza/container/disk/DiskSpaceMonitor.java
@@ -51,6 +51,9 @@ public interface DiskSpaceMonitor {
   interface Listener {
     /**
      * Invoked with new samples as they become available.
+     * <p>
+     * Updates are guaranteed to be serialized. In other words, a listener's 
onUpdate callback
+     * may only be invoked once at a time by a DiskSpaceMonitor.
      *
      * @param diskUsageSample the measured disk usage size in bytes.
      */

http://git-wip-us.apache.org/repos/asf/samza/blob/27c9e4c2/samza-core/src/main/java/org/apache/samza/container/disk/NoThrottlingDiskQuotaPolicy.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/container/disk/NoThrottlingDiskQuotaPolicy.java
 
b/samza-core/src/main/java/org/apache/samza/container/disk/NoThrottlingDiskQuotaPolicy.java
new file mode 100644
index 0000000..5cd54e0
--- /dev/null
+++ 
b/samza-core/src/main/java/org/apache/samza/container/disk/NoThrottlingDiskQuotaPolicy.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.container.disk;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link DiskQuotaPolicy} that does no throttling of the work rate.
+ */
+public class NoThrottlingDiskQuotaPolicy implements DiskQuotaPolicy {
+  private static final Logger log = 
LoggerFactory.getLogger(NoThrottlingDiskQuotaPolicy.class);
+
+  public NoThrottlingDiskQuotaPolicy() {
+    log.info("Using a no throttling disk quota policy");
+  }
+
+  @Override
+  public double apply(double availableDiskQuotaPercentage) {
+    return 1.0;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/27c9e4c2/samza-core/src/main/java/org/apache/samza/container/disk/NoThrottlingDiskQuotaPolicyFactory.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/container/disk/NoThrottlingDiskQuotaPolicyFactory.java
 
b/samza-core/src/main/java/org/apache/samza/container/disk/NoThrottlingDiskQuotaPolicyFactory.java
new file mode 100644
index 0000000..cad1aa8
--- /dev/null
+++ 
b/samza-core/src/main/java/org/apache/samza/container/disk/NoThrottlingDiskQuotaPolicyFactory.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.container.disk;
+
+import org.apache.samza.config.Config;
+
+/**
+ * A factory that creates a {@link NoThrottlingDiskQuotaPolicy}.
+ */
+public class NoThrottlingDiskQuotaPolicyFactory implements 
DiskQuotaPolicyFactory {
+  @Override
+  public DiskQuotaPolicy create(Config config) {
+    return new NoThrottlingDiskQuotaPolicy();
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/27c9e4c2/samza-core/src/main/java/org/apache/samza/container/disk/WatermarkDiskQuotaPolicy.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/container/disk/WatermarkDiskQuotaPolicy.java
 
b/samza-core/src/main/java/org/apache/samza/container/disk/WatermarkDiskQuotaPolicy.java
new file mode 100644
index 0000000..21fbca2
--- /dev/null
+++ 
b/samza-core/src/main/java/org/apache/samza/container/disk/WatermarkDiskQuotaPolicy.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.container.disk;
+
+import org.apache.samza.util.ThrottlingExecutor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+/**
+ * An object that calculates the current work rate using a configurable set of
+ * {@link Entry} instances.
+ * <p>
+ * The supplied {@link Entry} instances may overlap as long as the range
+ * between low and high water mark of one policy entry does not contain the 
range of low and high
+ * water mark of another policy entry. For example, the following entries 
would be OK:
+ *
+ * <ul>
+ *   <li>Low: 0.5, High: 1.0</li>
+ *   <li>Low: 0.2, High: 0.8</li>
+ * </ul>
+ *
+ * But the following entries would not:
+ *
+ * <ul>
+ *   <li>Low: 0.1, High: 1.0</li>
+ *   <li>Low: 0.2, High: 0.5</li>
+ * </ul>
+ *
+ * Policy entries do not stack. In other words, there is always a clear entry 
to apply and its work
+ * factor is not added, multiplied, or in any other way joined with another 
entry's work rate.
+ */
+public class WatermarkDiskQuotaPolicy implements DiskQuotaPolicy {
+  /**
+   * A comparator that orders {@link Entry} instances in descending order 
first by
+   * high water mark and then by low water mark.
+   */
+  private static final Comparator<Entry> POLICY_COMPARATOR = new 
Comparator<Entry>() {
+    @Override
+    public int compare(Entry lhs, Entry rhs) {
+      if (lhs.getHighWaterMarkPercent() > rhs.getHighWaterMarkPercent()) {
+        return -1;
+      } else if (lhs.getHighWaterMarkPercent() < 
rhs.getHighWaterMarkPercent()) {
+        return 1;
+      } else if (lhs.getLowWaterMarkPercent() > rhs.getLowWaterMarkPercent()) {
+        return -1;
+      } else if (lhs.getLowWaterMarkPercent() < rhs.getLowWaterMarkPercent()) {
+        return 1;
+      }
+      return 0;
+    }
+  };
+
+  private static final Logger log = 
LoggerFactory.getLogger(WatermarkDiskQuotaPolicy.class);
+
+  // Lock guards entryIndex
+  private final List<Entry> entries;
+  private int entryIndex = -1;
+
+  private static String dumpPolicyEntries(List<Entry> entries) {
+    StringBuilder sb = new StringBuilder();
+    for (int i = 0; i < entries.size(); ++i) {
+      final Entry entry = entries.get(i);
+      sb.append(String.format("\n    Policy entry %d. Low: %f. High: %f. Work 
Factor: %f",
+          i,
+          entry.getLowWaterMarkPercent(),
+          entry.getHighWaterMarkPercent(),
+          entry.getWorkFactor()));
+    }
+    return sb.toString();
+  }
+
+  /**
+   * Constructs a new watermark disk quota policy using the supplied policy 
entries.
+   *
+   * @param entries a list of {@link Entry} objects that describe how to 
adjust the work rate. May
+   *                 be empty, but cannot be {@code null}.
+   */
+  public WatermarkDiskQuotaPolicy(List<Entry> entries) {
+    // Copy entries, sort, make immutable
+    entries = new ArrayList<>(entries);
+    Collections.sort(entries, POLICY_COMPARATOR);
+    this.entries = Collections.unmodifiableList(entries);
+
+    // Validate entries
+    double lastHighWaterMark = 1.0;
+    double lastWorkFactor = ThrottlingExecutor.MAX_WORK_FACTOR;
+    for (int i = 0; i < entries.size(); ++i) {
+      final Entry entry = entries.get(i);
+
+      if (lastHighWaterMark < entry.getHighWaterMarkPercent()) {
+        throw new IllegalArgumentException("Policy entry " + i +
+            " has high water mark (" + entry.getHighWaterMarkPercent() +
+            ") > previous high water mark (" + lastHighWaterMark + "):" +
+            dumpPolicyEntries(entries));
+      }
+
+      if (lastWorkFactor < entry.getWorkFactor()) {
+        throw new IllegalArgumentException("Policy entry " + i +
+            " has work factor (" + entry.getWorkFactor() +
+            ") < previous work factor (" + lastWorkFactor + "):" +
+            dumpPolicyEntries(entries));
+      }
+
+      if (entry.getWorkFactor() < ThrottlingExecutor.MIN_WORK_FACTOR) {
+        throw new IllegalArgumentException("Policy entry " + i +
+            " has work factor (" + entry.getWorkFactor() +
+            ") < minimum work factor (" + ThrottlingExecutor.MIN_WORK_FACTOR + 
"):" +
+            dumpPolicyEntries(entries));
+      }
+
+      lastHighWaterMark = entry.getHighWaterMarkPercent();
+      lastWorkFactor = entry.getWorkFactor();
+    }
+
+    log.info("Using the following disk quota enforcement entries: {}",
+        entries.isEmpty() ? "NONE" : dumpPolicyEntries(entries));
+  }
+
+  @Override
+  public double apply(double availableDiskQuotaPercentage) {
+    double workFactor = entryIndex == -1 ? 1.0 : 
this.entries.get(entryIndex).getWorkFactor();
+    int entryIndex = this.entryIndex;
+
+    while (entryIndex >= 0 &&
+        entries.get(entryIndex).getHighWaterMarkPercent() <= 
availableDiskQuotaPercentage) {
+      --entryIndex;
+    }
+
+    while (entryIndex < entries.size() - 1 &&
+        entries.get(entryIndex + 1).getLowWaterMarkPercent() > 
availableDiskQuotaPercentage) {
+      ++entryIndex;
+    }
+
+    if (entryIndex != this.entryIndex) {
+      workFactor = entryIndex == -1 ? 1.0 : 
entries.get(entryIndex).getWorkFactor();
+      this.entryIndex = entryIndex;
+      log.info("Work factor has been updated: {}.", workFactor);
+    }
+
+    return workFactor;
+  }
+
+  /**
+   * A thread-safe value object that represents a policy entry for disk 
quotas. When the percentage
+   * of available disk space assigned via the disk quota drops below the low 
water mark the
+   * configured work rate is applied to reduce throughput of the task. When 
the available disk space
+   * rises above the high water mark the work rate throttling is removed (but 
other entries may
+   * still apply).
+   */
+  public static class Entry {
+    private final double lowWaterMarkPercent;
+    private final double highWaterMarkPercent;
+    private final double workFactor;
+
+    public Entry(double lowWaterMarkPercent, double highWaterMarkPercent, 
double workFactor) {
+      if (lowWaterMarkPercent < 0.0) {
+        throw new IllegalArgumentException("low water mark percent (" + 
lowWaterMarkPercent + ") < 0");
+      }
+
+      if (highWaterMarkPercent > 1.0) {
+        throw new IllegalArgumentException("high water mark percent (" + 
highWaterMarkPercent + ") > 1");
+      }
+
+      if (lowWaterMarkPercent > highWaterMarkPercent) {
+        throw new IllegalArgumentException("low water mark percent (" + 
lowWaterMarkPercent + ") > " +
+          "high water mark percent (" + highWaterMarkPercent + ")");
+      }
+
+      if (workFactor <= 0.0) {
+        throw new IllegalArgumentException("work factor (" + workFactor + ") 
<= 0");
+      }
+
+      if (workFactor > 1.0) {
+        throw new IllegalArgumentException("work factor (" + workFactor + ") > 
1");
+      }
+
+      this.lowWaterMarkPercent = lowWaterMarkPercent;
+      this.highWaterMarkPercent = highWaterMarkPercent;
+      this.workFactor = workFactor;
+    }
+
+    public double getLowWaterMarkPercent() {
+      return lowWaterMarkPercent;
+    }
+
+    public double getHighWaterMarkPercent() {
+      return highWaterMarkPercent;
+    }
+
+    public double getWorkFactor() {
+      return workFactor;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/27c9e4c2/samza-core/src/main/java/org/apache/samza/container/disk/WatermarkDiskQuotaPolicyFactory.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/container/disk/WatermarkDiskQuotaPolicyFactory.java
 
b/samza-core/src/main/java/org/apache/samza/container/disk/WatermarkDiskQuotaPolicyFactory.java
new file mode 100644
index 0000000..42e2d3b
--- /dev/null
+++ 
b/samza-core/src/main/java/org/apache/samza/container/disk/WatermarkDiskQuotaPolicyFactory.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.container.disk;
+
+import org.apache.samza.config.Config;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A factory for producing {@link WatermarkDiskQuotaPolicy} instances. This 
class is only required
+ * due to Samza's string-only config. In the event that a richer config system 
is added it would
+ * be possible to construct a {@link WatermarkDiskQuotaPolicy} directly.
+ * <p>
+ * <table>
+ *   <tr><th>Name</th><th>Default</th><th>Description</th></tr>
+ *   <tr>
+ *     <td>container.disk.quota.policy.count</td>
+ *     <td>0</td>
+ *     <td>
+ *       The number of entries for this policy. Entries are configured with 
the following keys by
+ *       substituting a 0-based index for each policy. For example, to 
configure the low water mark
+ *       for the first policy to 0.5, use 
<code>container.disk.quota.policy.0.lowWaterMark=0.5</code>.
+ *       Setting this value to 0 disables this policy.
+ *     </td>
+ *   </tr>
+ *   <tr>
+ *     <td>container.disk.quota.policy.entry-number.lowWaterMark</td>
+ *     <td></td>
+ *     <td>
+ *       <strong>Required.</strong>
+ *       The low water mark for this entry. If the available percentage of 
disk quota drops below
+ *       this low water mark then the work factor for this entry is used 
unless the available
+ *       percentage drops below an entry with an even lower low water mark. 
For example, if the low
+ *       water mark has a value of <code>0.5</code> and the available 
percentage of disk quota drops
+ *       below 50% then the work factor for this entry is used. However, if 
there were another entry
+ *       that had a low water mark of <code>0.4</code> and the disk quota 
dropped below 40% then
+ *       the other entry's work factor would be used instead.
+ *     </td>
+ *   </tr>
+ *   <tr>
+ *     <td>container.disk.quota.policy.entry-number.highWaterMark</td>
+ *     <td></td>
+ *     <td>
+ *       <strong>Required.</strong>
+ *       The high water mark for this entry. If the available percentage of 
disk quota rises above
+ *       this high water mark then the work factor for this entry is no 
longer used and the next
+ *       highest work factor is used instead. If there is no other higher work 
factor then the work
+ *       factor resets to <code>1.0</code>.
+ *     </td>
+ *   </tr>
+ *   <tr>
+ *     <td>container.disk.quota.policy.entry-number.workFactor</td>
+ *     <td></td>
+ *     <td>
+ *       <strong>Required.</strong>
+ *       The work factor to apply when this entry is triggered (i.e. due to 
the available percentage
+ *       of disk quota dropping below this entry's low water mark). The work 
factor is a hint at
+ *       the percentage of time that the run loop should be used to process 
new work. For example,
+ *       a value of <code>1.0</code> indicates that the run loop should be 
working at full rate
+ *       (i.e. as fast as it can). A value of <code>0.5</code> means that the 
run loop should only
+ *       run half as fast as it can. A value of <code>0.2</code> means that 
the run loop should only
+ *       run at 20% of its usual speed.
+ *     </td>
+ *   </tr>
+ * </table>
+ */
+public class WatermarkDiskQuotaPolicyFactory implements DiskQuotaPolicyFactory 
{
+  private static final Logger log = 
LoggerFactory.getLogger(WatermarkDiskQuotaPolicyFactory.class);
+
+  private static final String POLICY_COUNT_KEY = 
"container.disk.quota.policy.count";
+
+  @Override
+  public DiskQuotaPolicy create(Config config) {
+    final int entryCount = config.getInt(POLICY_COUNT_KEY, 0);
+    if (entryCount == 0) {
+      log.info("Using a no throttling disk quota policy because policy entry 
count was missing or set to zero ({})",
+          POLICY_COUNT_KEY);
+      return new NoThrottlingDiskQuotaPolicy();
+    }
+
+    final List<WatermarkDiskQuotaPolicy.Entry> entries = new 
ArrayList<WatermarkDiskQuotaPolicy.Entry>();
+    for (int i = 0; i < entryCount; ++i) {
+      final double lowWaterMark = 
config.getDouble(String.format("container.disk.quota.policy.%d.lowWaterMark", 
i));
+      final double highWaterMark = 
config.getDouble(String.format("container.disk.quota.policy.%d.highWaterMark", 
i));
+      final double workFactor = 
config.getDouble(String.format("container.disk.quota.policy.%d.workFactor", i));
+      entries.add(new WatermarkDiskQuotaPolicy.Entry(lowWaterMark, 
highWaterMark, workFactor));
+    }
+
+    return new WatermarkDiskQuotaPolicy(entries);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/27c9e4c2/samza-core/src/main/java/org/apache/samza/util/HighResolutionClock.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/util/HighResolutionClock.java 
b/samza-core/src/main/java/org/apache/samza/util/HighResolutionClock.java
new file mode 100644
index 0000000..69ba441
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/util/HighResolutionClock.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.util;
+
+/**
+ * An object that can provide time points (useful for getting the elapsed time 
between two time
+ * points) and can sleep for a specified period of time.
+ * <p>
+ * Instances of this interface must be thread-safe.
+ */
+interface HighResolutionClock {
+  /**
+   * Returns a time point that can be used to calculate the difference in 
nanoseconds with another
+   * time point. Resolution of the timer is platform dependent and not 
guaranteed to actually
+   * operate at nanosecond precision.
+   *
+   * @return current time point in nanoseconds
+   */
+  long nanoTime();
+
+  /**
+   * Sleeps for a period of time that approximates the requested number of 
nanoseconds. Actual sleep
+   * time can vary significantly based on the JVM implementation and platform. 
This function returns
+   * the measured error between expected and actual sleep time.
+   *
+   * @param nanos the number of nanoseconds to sleep.
+   * @throws InterruptedException if the current thread is interrupted while 
blocked in this method.
+   */
+  long sleep(long nanos) throws InterruptedException;
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/27c9e4c2/samza-core/src/main/java/org/apache/samza/util/SystemHighResolutionClock.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/util/SystemHighResolutionClock.java 
b/samza-core/src/main/java/org/apache/samza/util/SystemHighResolutionClock.java
new file mode 100644
index 0000000..2e65b60
--- /dev/null
+++ 
b/samza-core/src/main/java/org/apache/samza/util/SystemHighResolutionClock.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.util;
+
+import java.util.concurrent.TimeUnit;
+
+class SystemHighResolutionClock implements HighResolutionClock {
+  @Override
+  public long nanoTime() {
+    return System.nanoTime();
+  }
+
+  @Override
+  public long sleep(long nanos) throws InterruptedException {
+    if (nanos <= 0) {
+      return nanos;
+    }
+
+    final long start = System.nanoTime();
+    TimeUnit.NANOSECONDS.sleep(nanos);
+
+    return Util.clampAdd(nanos, -(System.nanoTime() - start));
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/27c9e4c2/samza-core/src/main/java/org/apache/samza/util/ThrottlingExecutor.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/util/ThrottlingExecutor.java 
b/samza-core/src/main/java/org/apache/samza/util/ThrottlingExecutor.java
new file mode 100644
index 0000000..214cefd
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/util/ThrottlingExecutor.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.util;
+
+import java.util.concurrent.Executor;
+
+/**
+ * An object that performs work on the current thread and optionally slows the 
rate of execution.
+ * By default work submitted with {@link #execute(Runnable)} will not be 
throttled. Work can be
+ * throttled by invoking {@link #setWorkFactor(double)}.
+ * <p>
+ * This class is *NOT* thread-safe. It is intended to be used from a single 
thread. However, the
+ * work factor may be set from any thread.
+ */
+public class ThrottlingExecutor implements Executor {
+  public static final double MAX_WORK_FACTOR = 1.0;
+  public static final double MIN_WORK_FACTOR = 0.001;
+
+  private final HighResolutionClock clock;
+
+  private volatile double workToIdleFactor;
+  private long pendingNanos;
+
+  public ThrottlingExecutor() {
+    this(new SystemHighResolutionClock());
+  }
+
+  ThrottlingExecutor(HighResolutionClock clock) {
+    this.clock = clock;
+  }
+
+  /**
+   * Executes the given command on the current thread. If throttling is 
enabled (the work factor
+   * is less than 1.0) this command may optionally insert a delay before 
returning to satisfy the
+   * requested work factor.
+   * <p>
+   * This method will not operate correct if used by more than one thread.
+   *
+   * @param command the work to execute
+   */
+  public void execute(Runnable command) {
+    final double currentWorkToIdleFactor = workToIdleFactor;
+
+    // If we're not throttling, do not get clock time, etc. This substantially 
reduces the overhead
+    // per invocation of this feature (by ~75%).
+    if (currentWorkToIdleFactor == 0.0) {
+      command.run();
+    } else {
+      final long startWorkNanos = clock.nanoTime();
+      command.run();
+      final long workNanos = clock.nanoTime() - startWorkNanos;
+
+      // NOTE: we accumulate pending delay nanos here, but we later update the 
pending nanos during
+      // the sleep operation (if applicable), so they do not continue to grow.
+      pendingNanos = Util.clampAdd(pendingNanos, (long) (workNanos * 
currentWorkToIdleFactor));
+      if (pendingNanos > 0) {
+        try {
+          pendingNanos = clock.sleep(pendingNanos);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        }
+      }
+    }
+  }
+
+  /**
+   * Sets the work factor for this executor. A work factor of {@code 1.0} 
indicates that execution
+   * should proceed at full throughput. A work factor of less than {@code 1.0} 
will introduce
+   * delays into the {@link #execute(Runnable)} call to approximate the 
requested work factor. For
+   * example, if the work factor is {@code 0.7} then approximately 70% of the 
execute call will be
+   * spent executing the supplied command while 30% will be spent idle.
+   *
+   * @param workFactor the work factor to set for this executor.
+   */
+  public void setWorkFactor(double workFactor) {
+    if (workFactor < MIN_WORK_FACTOR) {
+      throw new IllegalArgumentException("Work factor must be >= " + 
MIN_WORK_FACTOR);
+    }
+    if (workFactor > MAX_WORK_FACTOR) {
+      throw new IllegalArgumentException("Work factor must be <= " + 
MAX_WORK_FACTOR);
+    }
+
+    workToIdleFactor = (1.0 - workFactor) / workFactor;
+  }
+
+  /**
+   * Returns the current work factor in use.
+   * @see #setWorkFactor(double)
+   * @return the current work factor.
+   */
+  public double getWorkFactor() {
+    return 1.0 / (workToIdleFactor + 1.0);
+  }
+
+  /**
+   * Returns the total amount of delay (in nanoseconds) that needs to be 
applied to subsequent work.
+   * Alternatively this can be thought to capture the error between expected 
delay and actual
+   * applied delay. This accounts for variance in the precision of the clock 
and the delay
+   * mechanism, both of which may vary from platform to platform.
+   * <p>
+   * This is required for test purposes only.
+   *
+   * @return the total amount of delay (in nanoseconds) that needs to be 
applied to subsequent work.
+   */
+  long getPendingNanos() {
+    return pendingNanos;
+  }
+
+  /**
+   * A convenience method for test that allows the pending nanos for this 
executor to be set
+   * explicitly.
+   *
+   * @param pendingNanos the pending nanos to set.
+   */
+  void setPendingNanos(long pendingNanos) {
+    this.pendingNanos = pendingNanos;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/27c9e4c2/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala 
b/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
index 3f25eca..cf05c15 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
@@ -19,6 +19,8 @@
 
 package org.apache.samza.container
 
+import java.util.concurrent.Executor
+
 import org.apache.samza.system.{SystemConsumers, SystemStreamPartition}
 import org.apache.samza.task.ReadableCoordinator
 import org.apache.samza.util.{Logging, TimerUtils}
@@ -39,7 +41,8 @@ class RunLoop(
   val windowMs: Long = -1,
   val commitMs: Long = 60000,
   val clock: () => Long = { System.nanoTime },
-  val shutdownMs: Long = 5000) extends Runnable with TimerUtils with Logging {
+  val shutdownMs: Long = 5000,
+  val executor: Executor = new SameThreadExecutor()) extends Runnable with 
TimerUtils with Logging {
 
   private val metricsMsOffset = 1000000L
   private var lastWindowNs = clock()
@@ -69,14 +72,20 @@ class RunLoop(
   def run {
     addShutdownHook(Thread.currentThread())
 
+    val runTask = new Runnable() {
+      override def run(): Unit = {
+        val loopStartTime = clock()
+        process
+        window
+        commit
+        val totalNs = clock() - loopStartTime
+        metrics.utilization.set(activeNs.toFloat / totalNs)
+        activeNs = 0L
+      }
+    }
+
     while (!shutdownNow) {
-      val loopStartTime = clock()
-      process
-      window
-      commit
-      val totalNs = clock() - loopStartTime
-      metrics.utilization.set(activeNs.toFloat/totalNs)
-      activeNs = 0L
+      executor.execute(runTask)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/27c9e4c2/samza-core/src/main/scala/org/apache/samza/container/SameThreadExecutor.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/container/SameThreadExecutor.scala 
b/samza-core/src/main/scala/org/apache/samza/container/SameThreadExecutor.scala
new file mode 100644
index 0000000..29f737e
--- /dev/null
+++ 
b/samza-core/src/main/scala/org/apache/samza/container/SameThreadExecutor.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.container
+
+import java.util.concurrent.Executor
+
/**
 * An [[java.util.concurrent.Executor]] that runs each submitted command
 * synchronously on the calling thread. Serves as the default (non-throttling)
 * executor for [[RunLoop]].
 */
private[container] class SameThreadExecutor() extends Executor {
  override def execute(command: Runnable): Unit = command.run()
}

http://git-wip-us.apache.org/repos/asf/samza/blob/27c9e4c2/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 2e8a500..5cbdb4b 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -32,7 +32,8 @@ import org.apache.samza.config.StreamConfig.Config2Stream
 import org.apache.samza.config.SystemConfig.Config2System
 import org.apache.samza.config.TaskConfig.Config2Task
 import org.apache.samza.container.disk.DiskSpaceMonitor.Listener
-import org.apache.samza.container.disk.{PollingScanDiskSpaceMonitor, 
DiskSpaceMonitor}
+import org.apache.samza.container.disk.WatermarkDiskQuotaPolicy.Entry
+import org.apache.samza.container.disk.{NoThrottlingDiskQuotaPolicyFactory, 
DiskQuotaPolicyFactory, NoThrottlingDiskQuotaPolicy, WatermarkDiskQuotaPolicy, 
PollingScanDiskSpaceMonitor, DiskSpaceMonitor}
 import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory
 import org.apache.samza.metrics.JmxServer
 import org.apache.samza.metrics.JvmMetrics
@@ -56,7 +57,7 @@ import org.apache.samza.system.chooser.MessageChooserFactory
 import org.apache.samza.system.chooser.RoundRobinChooserFactory
 import org.apache.samza.task.StreamTask
 import org.apache.samza.task.TaskInstanceCollector
-import org.apache.samza.util.{ExponentialSleepStrategy, Logging, Util}
+import org.apache.samza.util.{ThrottlingExecutor, ExponentialSleepStrategy, 
Logging, Util}
 import scala.collection.JavaConversions._
 import java.net.{UnknownHostException, InetAddress, URL}
 import org.apache.samza.job.model.{TaskModel, ContainerModel, JobModel}
@@ -66,6 +67,7 @@ import java.lang.Thread.UncaughtExceptionHandler
 
 object SamzaContainer extends Logging {
   val DEFAULT_READ_JOBMODEL_DELAY_MS = 100
+  val DISK_POLL_INTERVAL_KEY = "container.disk.poll.interval.ms"
 
   def main(args: Array[String]) {
     safeMain(() => new JmxServer, new SamzaContainerExceptionHandler(() => 
System.exit(1)))
@@ -520,18 +522,32 @@ object SamzaContainer extends Logging {
       (taskName, taskInstance)
     }).toMap
 
-    val diskPollMillis = config.getInt("container.disk.poll.interval.ms", 0)
+    val executor = new ThrottlingExecutor()
+
+    val diskQuotaBytes = config.getLong("container.disk.quota.bytes", 
Long.MaxValue)
+    samzaContainerMetrics.diskQuotaBytes.set(diskQuotaBytes)
+
+    val diskQuotaPolicyFactoryString = 
config.get("container.disk.quota.policy.factory",
+      classOf[NoThrottlingDiskQuotaPolicyFactory].getName)
+    val diskQuotaPolicyFactory = 
Util.getObj[DiskQuotaPolicyFactory](diskQuotaPolicyFactoryString)
+    val diskQuotaPolicy = diskQuotaPolicyFactory.create(config)
+
     var diskSpaceMonitor: DiskSpaceMonitor = null
+    val diskPollMillis = config.getInt(DISK_POLL_INTERVAL_KEY, 0)
     if (diskPollMillis != 0) {
-      val diskUsage = samzaContainerMetrics.createOrGetDiskUsageGauge()
-
       diskSpaceMonitor = new PollingScanDiskSpaceMonitor(storeWatchPaths, 
diskPollMillis)
       diskSpaceMonitor.registerListener(new Listener {
-        override def onUpdate(diskUsageSample: Long): Unit =
-          diskUsage.set(diskUsageSample)
+        override def onUpdate(diskUsageBytes: Long): Unit = {
+          val newWorkRate = diskQuotaPolicy.apply(1.0 - 
(diskUsageBytes.toDouble / diskQuotaBytes))
+          executor.setWorkFactor(newWorkRate)
+          samzaContainerMetrics.executorWorkFactor.set(executor.getWorkFactor)
+          samzaContainerMetrics.diskUsageBytes.set(diskUsageBytes)
+        }
       })
 
       info("Initialized disk space monitor watch paths to: %s" format 
storeWatchPaths)
+    } else {
+      info(s"Disk quotas disabled because polling interval is not set 
($DISK_POLL_INTERVAL_KEY)")
     }
 
     val runLoop = new RunLoop(
@@ -540,7 +556,8 @@ object SamzaContainer extends Logging {
       metrics = samzaContainerMetrics,
       windowMs = taskWindowMs,
       commitMs = taskCommitMs,
-      shutdownMs = taskShutdownMs)
+      shutdownMs = taskShutdownMs,
+      executor = executor)
 
     info("Samza container setup complete.")
 

http://git-wip-us.apache.org/repos/asf/samza/blob/27c9e4c2/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala.orig
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala.orig
 
b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala.orig
new file mode 100644
index 0000000..086531e
--- /dev/null
+++ 
b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala.orig
@@ -0,0 +1,787 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.container
+
+import java.io.File
+import java.nio.file.Path
+import java.util
+import org.apache.samza.SamzaException
+import org.apache.samza.checkpoint.{CheckpointManagerFactory, OffsetManager, 
OffsetManagerMetrics}
+import org.apache.samza.config.MetricsConfig.Config2Metrics
+import org.apache.samza.config.SerializerConfig.Config2Serializer
+import org.apache.samza.config.ShellCommandConfig
+import org.apache.samza.config.StorageConfig.Config2Storage
+import org.apache.samza.config.StreamConfig.Config2Stream
+import org.apache.samza.config.SystemConfig.Config2System
+import org.apache.samza.config.TaskConfig.Config2Task
+import org.apache.samza.container.disk.DiskSpaceMonitor.Listener
+import org.apache.samza.container.disk.{PollingScanDiskSpaceMonitor, 
DiskSpaceMonitor}
+import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory
+import org.apache.samza.metrics.JmxServer
+import org.apache.samza.metrics.JvmMetrics
+import org.apache.samza.metrics.MetricsRegistryMap
+import org.apache.samza.metrics.MetricsReporter
+import org.apache.samza.metrics.MetricsReporterFactory
+import org.apache.samza.serializers.SerdeFactory
+import org.apache.samza.serializers.SerdeManager
+import org.apache.samza.storage.StorageEngineFactory
+import org.apache.samza.storage.TaskStorageManager
+import org.apache.samza.system.StreamMetadataCache
+import org.apache.samza.system.SystemConsumers
+import org.apache.samza.system.SystemConsumersMetrics
+import org.apache.samza.system.SystemFactory
+import org.apache.samza.system.SystemProducers
+import org.apache.samza.system.SystemProducersMetrics
+import org.apache.samza.system.SystemStream
+import org.apache.samza.system.SystemStreamPartition
+import org.apache.samza.system.chooser.DefaultChooser
+import org.apache.samza.system.chooser.MessageChooserFactory
+import org.apache.samza.system.chooser.RoundRobinChooserFactory
+import org.apache.samza.task.StreamTask
+import org.apache.samza.task.TaskInstanceCollector
+import org.apache.samza.util.{ExponentialSleepStrategy, Logging, Util}
+import scala.collection.JavaConversions._
+import java.net.{UnknownHostException, InetAddress, URL}
+import org.apache.samza.job.model.{TaskModel, ContainerModel, JobModel}
+import org.apache.samza.serializers.model.SamzaObjectMapper
+import org.apache.samza.config.JobConfig.Config2Job
+import java.lang.Thread.UncaughtExceptionHandler
+
+object SamzaContainer extends Logging {
+  val DEFAULT_READ_JOBMODEL_DELAY_MS = 100
+
+  def main(args: Array[String]) {
+    safeMain(() => new JmxServer, new SamzaContainerExceptionHandler(() => 
System.exit(1)))
+  }
+
+  def safeMain(
+    newJmxServer: () => JmxServer,
+    exceptionHandler: UncaughtExceptionHandler = null) {
+    if (exceptionHandler != null) {
+      Thread.setDefaultUncaughtExceptionHandler(exceptionHandler)
+    }
+    putMDC("containerName", "samza-container-" + 
System.getenv(ShellCommandConfig.ENV_CONTAINER_ID))
+    // Break out the main method to make the JmxServer injectable so we can
+    // validate that we don't leak JMX non-daemon threads if we have an
+    // exception in the main method.
+    val containerId = System.getenv(ShellCommandConfig.ENV_CONTAINER_ID).toInt
+    logger.info("Got container ID: %s" format containerId)
+    val coordinatorUrl = System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL)
+    logger.info("Got coordinator URL: %s" format coordinatorUrl)
+    val jobModel = readJobModel(coordinatorUrl)
+    val containerModel = jobModel.getContainers()(containerId.toInt)
+    val config = jobModel.getConfig
+    putMDC("jobName", config.getName.getOrElse(throw new SamzaException("can 
not find the job name")))
+    putMDC("jobId", config.getJobId.getOrElse("1"))
+    var jmxServer: JmxServer = null
+
+    try {
+      jmxServer = newJmxServer()
+      SamzaContainer(containerModel, jobModel, jmxServer).run
+    } finally {
+      if (jmxServer != null) {
+        jmxServer.stop
+      }
+    }
+  }
+
+  /**
+   * Fetches config, task:SSP assignments, and task:changelog partition
+   * assignments, and returns objects to be used for SamzaContainer's
+   * constructor.
+   */
+  def readJobModel(url: String, initialDelayMs: Int = 
scala.util.Random.nextInt(DEFAULT_READ_JOBMODEL_DELAY_MS) + 1) = {
+    info("Fetching configuration from: %s" format url)
+    SamzaObjectMapper
+      .getObjectMapper
+      .readValue(
+        Util.read(
+          url = new URL(url),
+          retryBackoff = new ExponentialSleepStrategy(initialDelayMs = 
initialDelayMs)),
+        classOf[JobModel])
+  }
+
+  def apply(containerModel: ContainerModel, jobModel: JobModel, jmxServer: 
JmxServer) = {
+    val config = jobModel.getConfig
+    val containerId = containerModel.getContainerId
+    val containerName = "samza-container-%s" format containerId
+    val containerPID = Util.getContainerPID
+
+    info("Setting up Samza container: %s" format containerName)
+    info("Samza container PID: %s" format containerPID)
+    info("Using configuration: %s" format config)
+    info("Using container model: %s" format containerModel)
+
+    val registry = new MetricsRegistryMap(containerName)
+    val samzaContainerMetrics = new SamzaContainerMetrics(containerName, 
registry)
+    val systemProducersMetrics = new SystemProducersMetrics(registry)
+    val systemConsumersMetrics = new SystemConsumersMetrics(registry)
+    val offsetManagerMetrics = new OffsetManagerMetrics(registry)
+
+    val inputSystemStreamPartitions = containerModel
+      .getTasks
+      .values
+      .flatMap(_.getSystemStreamPartitions)
+      .toSet
+
+    val inputSystemStreams = inputSystemStreamPartitions
+      .map(_.getSystemStream)
+      .toSet
+
+    val inputSystems = inputSystemStreams
+      .map(_.getSystem)
+      .toSet
+
+    val systemNames = config.getSystemNames
+
+    info("Got system names: %s" format systemNames)
+
+    val serdeStreams = systemNames.foldLeft(Set[SystemStream]())(_ ++ 
config.getSerdeStreams(_))
+
+    debug("Got serde streams: %s" format serdeStreams)
+
+    val serdeNames = config.getSerdeNames
+
+    info("Got serde names: %s" format serdeNames)
+
+    val systemFactories = systemNames.map(systemName => {
+      val systemFactoryClassName = config
+        .getSystemFactory(systemName)
+        .getOrElse(throw new SamzaException("A stream uses system %s, which is 
missing from the configuration." format systemName))
+      (systemName, Util.getObj[SystemFactory](systemFactoryClassName))
+    }).toMap
+
+    val systemAdmins = systemNames
+      .map(systemName => (systemName, 
systemFactories(systemName).getAdmin(systemName, config)))
+      .toMap
+
+    info("Got system factories: %s" format systemFactories.keys)
+
+    val streamMetadataCache = new StreamMetadataCache(systemAdmins)
+    val inputStreamMetadata = 
streamMetadataCache.getStreamMetadata(inputSystemStreams)
+
+    info("Got input stream metadata: %s" format inputStreamMetadata)
+
+    val consumers = inputSystems
+      .map(systemName => {
+        val systemFactory = systemFactories(systemName)
+
+        try {
+          (systemName, systemFactory.getConsumer(systemName, config, 
samzaContainerMetrics.registry))
+        } catch {
+          case e: Exception =>
+            error("Failed to create a consumer for %s, so skipping." 
format(systemName), e)
+            (systemName, null)
+        }
+      })
+      .filter(_._2 != null)
+      .toMap
+
+    info("Got system consumers: %s" format consumers.keys)
+
+    val producers = systemFactories
+      .map {
+        case (systemName, systemFactory) =>
+          try {
+            (systemName, systemFactory.getProducer(systemName, config, 
samzaContainerMetrics.registry))
+          } catch {
+            case e: Exception =>
+              error("Failed to create a producer for %s, so skipping." 
format(systemName), e)
+              (systemName, null)
+          }
+      }
+      .filter(_._2 != null)
+      .toMap
+
+    info("Got system producers: %s" format producers.keys)
+
+    val serdes = serdeNames.map(serdeName => {
+      val serdeClassName = config
+        .getSerdeClass(serdeName)
+        .getOrElse(Util.defaultSerdeFactoryFromSerdeName(serdeName))
+
+      val serde = Util.getObj[SerdeFactory[Object]](serdeClassName)
+        .getSerde(serdeName, config)
+
+      (serdeName, serde)
+    }).toMap
+
+    info("Got serdes: %s" format serdes.keys)
+
+    /*
+     * A Helper function to build a Map[String, Serde] (systemName -> Serde) 
for systems defined in the config. This is useful to build both key and message 
serde maps.
+     */
+    val buildSystemSerdeMap = (getSerdeName: (String) => Option[String]) => {
+      systemNames
+        .filter( sn => {
+          val serde = getSerdeName(sn)
+          serde.isDefined && !serde.get.equals("")
+        }).map(systemName => {
+          val serdeName = getSerdeName(systemName).get
+          val serde = serdes.getOrElse(serdeName, throw new SamzaException("No 
class defined for serde: %s." format serdeName))
+          (systemName, serde)
+        }).toMap
+    }
+
+    /*
+     * A Helper function to build a Map[SystemStream, Serde] for streams 
defined in the config. This is useful to build both key and message serde maps.
+     */
+    val buildSystemStreamSerdeMap = (getSerdeName: (SystemStream) => 
Option[String]) => {
+      (serdeStreams ++ inputSystemStreamPartitions)
+        .filter(systemStream => getSerdeName(systemStream).isDefined)
+        .map(systemStream => {
+          val serdeName = getSerdeName(systemStream).get
+          val serde = serdes.getOrElse(serdeName, throw new SamzaException("No 
class defined for serde: %s." format serdeName))
+          (systemStream, serde)
+        }).toMap
+    }
+
+    val systemKeySerdes = buildSystemSerdeMap((systemName: String) => 
config.getSystemKeySerde(systemName))
+
+    debug("Got system key serdes: %s" format systemKeySerdes)
+
+    val systemMessageSerdes = buildSystemSerdeMap((systemName: String) => 
config.getSystemMsgSerde(systemName))
+
+    debug("Got system message serdes: %s" format systemMessageSerdes)
+
+    val systemStreamKeySerdes = buildSystemStreamSerdeMap((systemStream: 
SystemStream) => config.getStreamKeySerde(systemStream))
+
+    debug("Got system stream key serdes: %s" format systemStreamKeySerdes)
+
+    val systemStreamMessageSerdes = buildSystemStreamSerdeMap((systemStream: 
SystemStream) => config.getStreamMsgSerde(systemStream))
+
+    debug("Got system stream message serdes: %s" format 
systemStreamMessageSerdes)
+
+    val changeLogSystemStreams = config
+      .getStoreNames
+      .filter(config.getChangelogStream(_).isDefined)
+      .map(name => (name, config.getChangelogStream(name).get)).toMap
+      .mapValues(Util.getSystemStreamFromNames(_))
+
+    info("Got change log system streams: %s" format changeLogSystemStreams)
+
+    val serdeManager = new SerdeManager(
+      serdes = serdes,
+      systemKeySerdes = systemKeySerdes,
+      systemMessageSerdes = systemMessageSerdes,
+      systemStreamKeySerdes = systemStreamKeySerdes,
+      systemStreamMessageSerdes = systemStreamMessageSerdes,
+      changeLogSystemStreams = changeLogSystemStreams.values.toSet)
+
+    info("Setting up JVM metrics.")
+
+    val jvm = new JvmMetrics(samzaContainerMetrics.registry)
+
+    info("Setting up message chooser.")
+
+    val chooserFactoryClassName = 
config.getMessageChooserClass.getOrElse(classOf[RoundRobinChooserFactory].getName)
+
+    val chooserFactory = 
Util.getObj[MessageChooserFactory](chooserFactoryClassName)
+
+    val chooser = DefaultChooser(inputStreamMetadata, chooserFactory, config, 
samzaContainerMetrics.registry)
+
+    info("Setting up metrics reporters.")
+
+    val reporters = config.getMetricReporterNames.map(reporterName => {
+      val metricsFactoryClassName = config
+        .getMetricsFactoryClass(reporterName)
+        .getOrElse(throw new SamzaException("Metrics reporter %s missing 
.class config" format reporterName))
+
+      val reporter =
+        Util
+          .getObj[MetricsReporterFactory](metricsFactoryClassName)
+          .getMetricsReporter(reporterName, containerName, config)
+      (reporterName, reporter)
+    }).toMap
+
+    info("Got metrics reporters: %s" format reporters.keys)
+
+    val securityManager = config.getSecurityManagerFactory match {
+      case Some(securityManagerFactoryClassName) =>
+        Util
+          .getObj[SecurityManagerFactory](securityManagerFactoryClassName)
+          .getSecurityManager(config)
+      case _ => null
+    }
+    info("Got security manager: %s" format securityManager)
+
+    val coordinatorSystemProducer = new 
CoordinatorStreamSystemFactory().getCoordinatorStreamSystemProducer(config, 
samzaContainerMetrics.registry)
+    val localityManager = new LocalityManager(coordinatorSystemProducer)
+    val checkpointManager = config.getCheckpointManagerFactory() match {
+      case Some(checkpointFactoryClassName) if 
(!checkpointFactoryClassName.isEmpty) =>
+        Util
+          .getObj[CheckpointManagerFactory](checkpointFactoryClassName)
+          .getCheckpointManager(config, samzaContainerMetrics.registry)
+      case _ => null
+    }
+    info("Got checkpoint manager: %s" format checkpointManager)
+
+    val offsetManager = OffsetManager(inputStreamMetadata, config, 
checkpointManager, systemAdmins, offsetManagerMetrics)
+
+    info("Got offset manager: %s" format offsetManager)
+
+    val dropDeserializationError = config.getDropDeserialization match {
+      case Some(dropError) => dropError.toBoolean
+      case _ => false
+    }
+
+    val dropSerializationError = config.getDropSerialization match {
+      case Some(dropError) => dropError.toBoolean
+      case _ => false
+    }
+
+    val pollIntervalMs = config
+      .getPollIntervalMs
+      .getOrElse(SystemConsumers.DEFAULT_POLL_INTERVAL_MS.toString)
+      .toInt
+
+    val consumerMultiplexer = new SystemConsumers(
+      chooser = chooser,
+      consumers = consumers,
+      serdeManager = serdeManager,
+      metrics = systemConsumersMetrics,
+      dropDeserializationError = dropDeserializationError,
+      pollIntervalMs = pollIntervalMs)
+
+    val producerMultiplexer = new SystemProducers(
+      producers = producers,
+      serdeManager = serdeManager,
+      metrics = systemProducersMetrics,
+      dropSerializationError = dropSerializationError)
+
+    val storageEngineFactories = config
+      .getStoreNames
+      .map(storeName => {
+        val storageFactoryClassName = config
+          .getStorageFactoryClassName(storeName)
+          .getOrElse(throw new SamzaException("Missing storage factory for 
%s." format storeName))
+        (storeName, Util.getObj[StorageEngineFactory[Object, 
Object]](storageFactoryClassName))
+      }).toMap
+
+    info("Got storage engines: %s" format storageEngineFactories.keys)
+
+    val taskClassName = config
+      .getTaskClass
+      .getOrElse(throw new SamzaException("No task class defined in 
configuration."))
+
+    info("Got stream task class: %s" format taskClassName)
+
+    val taskWindowMs = config.getWindowMs.getOrElse(-1L)
+
+    info("Got window milliseconds: %s" format taskWindowMs)
+
+    val taskCommitMs = config.getCommitMs.getOrElse(60000L)
+
+    info("Got commit milliseconds: %s" format taskCommitMs)
+
+    val taskShutdownMs = config.getShutdownMs.getOrElse(5000L)
+
+    info("Got shutdown timeout milliseconds: %s" format taskShutdownMs)
+
+    // Wire up all task-instance-level (unshared) objects.
+
+    val taskNames = containerModel
+      .getTasks
+      .values
+      .map(_.getTaskName)
+      .toSet
+    val containerContext = new SamzaContainerContext(containerId, config, 
taskNames)
+
+    // TODO not sure how we should make this config based, or not. Kind of
+    // strange, since it has some dynamic directories when used with YARN.
+    val defaultStoreBaseDir = new File(System.getProperty("user.dir"), "state")
+    info("Got default storage engine base directory: %s" format 
defaultStoreBaseDir)
+
+    val storeWatchPaths = new util.HashSet[Path]()
+    storeWatchPaths.add(defaultStoreBaseDir.toPath)
+
+    val taskInstances: Map[TaskName, TaskInstance] = 
containerModel.getTasks.values.map(taskModel => {
+      debug("Setting up task instance: %s" format taskModel)
+
+      val taskName = taskModel.getTaskName
+
+      val task = Util.getObj[StreamTask](taskClassName)
+
+      val taskInstanceMetrics = new TaskInstanceMetrics("TaskName-%s" format 
taskName)
+
+      val collector = new TaskInstanceCollector(producerMultiplexer, 
taskInstanceMetrics)
+
+      val storeConsumers = changeLogSystemStreams
+        .map {
+          case (storeName, changeLogSystemStream) =>
+            val systemConsumer = systemFactories
+              .getOrElse(changeLogSystemStream.getSystem, throw new 
SamzaException("Changelog system %s for store %s does not exist in the config." 
format (changeLogSystemStream, storeName)))
+              .getConsumer(changeLogSystemStream.getSystem, config, 
taskInstanceMetrics.registry)
+            samzaContainerMetrics.addStoreRestorationGauge(taskName, storeName)
+            (storeName, systemConsumer)
+        }.toMap
+
+      info("Got store consumers: %s" format storeConsumers)
+
+      var loggedStorageBaseDir: File = null
+      if(System.getenv(ShellCommandConfig.ENV_LOGGED_STORE_BASE_DIR) != null) {
+        val jobNameAndId = Util.getJobNameAndId(config)
+        loggedStorageBaseDir = new 
File(System.getenv(ShellCommandConfig.ENV_LOGGED_STORE_BASE_DIR) + 
File.separator + jobNameAndId._1 + "-" + jobNameAndId._2)
+      } else {
+        warn("No override was provided for logged store base directory. This 
disables local state re-use on " +
+          "application restart. If you want to enable this feature, set 
LOGGED_STORE_BASE_DIR as an environment " +
+          "variable in all machines running the Samza container")
+        loggedStorageBaseDir = defaultStoreBaseDir
+      }
+
+      storeWatchPaths.add(loggedStorageBaseDir.toPath)
+
+      info("Got base directory for logged data stores: %s" format 
loggedStorageBaseDir)
+
+      val taskStores = storageEngineFactories
+        .map {
+          case (storeName, storageEngineFactory) =>
+            val changeLogSystemStreamPartition = if 
(changeLogSystemStreams.contains(storeName)) {
+              new SystemStreamPartition(changeLogSystemStreams(storeName), 
taskModel.getChangelogPartition)
+            } else {
+              null
+            }
+            val keySerde = config.getStorageKeySerde(storeName) match {
+              case Some(keySerde) => serdes.getOrElse(keySerde, throw new 
SamzaException("No class defined for serde: %s." format keySerde))
+              case _ => null
+            }
+            val msgSerde = config.getStorageMsgSerde(storeName) match {
+              case Some(msgSerde) => serdes.getOrElse(msgSerde, throw new 
SamzaException("No class defined for serde: %s." format msgSerde))
+              case _ => null
+            }
+            val storeBaseDir = if(changeLogSystemStreamPartition != null) {
+              TaskStorageManager.getStorePartitionDir(loggedStorageBaseDir, 
storeName, taskName)
+            }
+            else {
+              TaskStorageManager.getStorePartitionDir(defaultStoreBaseDir, 
storeName, taskName)
+            }
+            val storageEngine = storageEngineFactory.getStorageEngine(
+              storeName,
+              storeBaseDir,
+              keySerde,
+              msgSerde,
+              collector,
+              taskInstanceMetrics.registry,
+              changeLogSystemStreamPartition,
+              containerContext)
+            (storeName, storageEngine)
+        }
+
+      info("Got task stores: %s" format taskStores)
+
+      val storageManager = new TaskStorageManager(
+        taskName = taskName,
+        taskStores = taskStores,
+        storeConsumers = storeConsumers,
+        changeLogSystemStreams = changeLogSystemStreams,
+        jobModel.maxChangeLogStreamPartitions,
+        streamMetadataCache = streamMetadataCache,
+        storeBaseDir = defaultStoreBaseDir,
+        loggedStoreBaseDir = loggedStorageBaseDir,
+        partition = taskModel.getChangelogPartition,
+        systemAdmins = systemAdmins)
+
+      val systemStreamPartitions = taskModel
+        .getSystemStreamPartitions
+        .toSet
+
+      info("Retrieved SystemStreamPartitions " + systemStreamPartitions + " 
for " + taskName)
+
+      val taskInstance = new TaskInstance(
+        task = task,
+        taskName = taskName,
+        config = config,
+        metrics = taskInstanceMetrics,
+        systemAdmins = systemAdmins,
+        consumerMultiplexer = consumerMultiplexer,
+        collector = collector,
+        containerContext = containerContext,
+        offsetManager = offsetManager,
+        storageManager = storageManager,
+        reporters = reporters,
+        systemStreamPartitions = systemStreamPartitions,
+        exceptionHandler = TaskInstanceExceptionHandler(taskInstanceMetrics, 
config))
+
+      (taskName, taskInstance)
+    }).toMap
+
+    val diskPollMillis = config.getInt("container.disk.poll.interval.ms", 0)
+    var diskSpaceMonitor: DiskSpaceMonitor = null
+    if (diskPollMillis != 0) {
+      val diskUsage = samzaContainerMetrics.createOrGetDiskUsageGauge()
+
+      diskSpaceMonitor = new PollingScanDiskSpaceMonitor(storeWatchPaths, 
diskPollMillis)
+      diskSpaceMonitor.registerListener(new Listener {
+        override def onUpdate(diskUsageSample: Long): Unit =
+          diskUsage.set(diskUsageSample)
+      })
+
+      info("Initialized disk space monitor watch paths to: %s" format 
storeWatchPaths)
+    }
+
+    val runLoop = new RunLoop(
+      taskInstances = taskInstances,
+      consumerMultiplexer = consumerMultiplexer,
+      metrics = samzaContainerMetrics,
+      windowMs = taskWindowMs,
+      commitMs = taskCommitMs,
+      shutdownMs = taskShutdownMs)
+
+    info("Samza container setup complete.")
+
+    new SamzaContainer(
+      containerContext = containerContext,
+      taskInstances = taskInstances,
+      runLoop = runLoop,
+      consumerMultiplexer = consumerMultiplexer,
+      producerMultiplexer = producerMultiplexer,
+      offsetManager = offsetManager,
+      localityManager = localityManager,
+      securityManager = securityManager,
+      metrics = samzaContainerMetrics,
+      reporters = reporters,
+      jvm = jvm,
+      jmxServer = jmxServer,
+      diskSpaceMonitor = diskSpaceMonitor)
+  }
+}
+
/**
 * Runs a set of task instances inside a single container process.
 *
 * run() starts all container-level services (metrics, offsets, locality,
 * stores, disk-space monitoring, producers, tasks, consumers, security),
 * enters the run loop, and unconditionally tears everything down when the
 * loop exits — whether normally or via exception.
 *
 * Optional collaborators (diskSpaceMonitor, offsetManager override,
 * localityManager, securityManager, jvm, reporters) may be null/empty, in
 * which case their start/shutdown steps are no-ops.
 */
class SamzaContainer(
  containerContext: SamzaContainerContext,
  taskInstances: Map[TaskName, TaskInstance],
  runLoop: RunLoop,
  consumerMultiplexer: SystemConsumers,
  producerMultiplexer: SystemProducers,
  metrics: SamzaContainerMetrics,
  jmxServer: JmxServer,
  diskSpaceMonitor: DiskSpaceMonitor = null,
  offsetManager: OffsetManager = new OffsetManager,
  localityManager: LocalityManager = null,
  securityManager: SecurityManager = null,
  reporters: Map[String, MetricsReporter] = Map(),
  jvm: JvmMetrics = null) extends Runnable with Logging {

  def run {
    try {
      info("Starting container.")

      // Startup order is significant: stores are restored before tasks are
      // initialized, and consumers are started only after tasks/producers.
      startMetrics
      startOffsetManager
      startLocalityManager
      startStores
      startDiskSpaceMonitor
      startProducers
      startTask
      startConsumers
      startSecurityManger

      info("Entering run loop.")
      runLoop.run
    } catch {
      case e: Exception =>
        // Log and rethrow; the finally block below still runs the full
        // shutdown sequence.
        error("Caught exception in process loop.", e)
        throw e
    } finally {
      info("Shutting down.")

      // NOTE(review): shutdown order is not the strict reverse of startup
      // (consumers are stopped first, security manager last) — presumably
      // intentional so input stops before tasks/stores shut down; confirm.
      shutdownConsumers
      shutdownTask
      shutdownStores
      shutdownDiskSpaceMonitor
      shutdownProducers
      shutdownLocalityManager
      shutdownOffsetManager
      shutdownMetrics
      shutdownSecurityManger

      info("Shutdown complete.")
    }
  }

  /** Starts disk usage monitoring, if a monitor was provided (may be null). */
  def startDiskSpaceMonitor: Unit = {
    if (diskSpaceMonitor != null) {
      info("Starting disk space monitor")
      diskSpaceMonitor.start()
    }
  }

  /**
   * Registers task-instance metrics, starts JVM metrics (if configured),
   * and registers/starts every metrics reporter.
   */
  def startMetrics {
    info("Registering task instances with metrics.")

    taskInstances.values.foreach(_.registerMetrics)

    info("Starting JVM metrics.")

    if (jvm != null) {
      jvm.start
    }

    info("Starting metrics reporters.")

    reporters.values.foreach(reporter => {
      reporter.register(metrics.source, metrics.registry)
      reporter.start
    })
  }

  /** Registers each task instance's offsets, then starts the offset manager. */
  def startOffsetManager {
    info("Registering task instances with offsets.")

    taskInstances.values.foreach(_.registerOffsets)

    info("Starting offset manager.")

    offsetManager.start
  }

  /**
   * If a locality manager is configured, registers this container and
   * best-effort persists its host name and JMX URLs to the coordinator
   * stream. Persistence failures are logged as warnings, never rethrown —
   * locality info is an optimization, not required for correctness.
   */
  def startLocalityManager {
    if(localityManager != null) {
      info("Registering localityManager for the container")
      localityManager.start
      localityManager.register(String.valueOf(containerContext.id))

      info("Writing container locality and JMX address to Coordinator Stream")
      try {
        val hostInet = Util.getLocalHost
        // The JMX server is optional; fall back to empty URLs when absent.
        val jmxUrl = if (jmxServer != null) jmxServer.getJmxUrl else ""
        val jmxTunnelingUrl = if (jmxServer != null) jmxServer.getTunnelingJmxUrl else ""
        localityManager.writeContainerToHostMapping(containerContext.id, hostInet.getHostName, jmxUrl, jmxTunnelingUrl)
      } catch {
        case uhe: UnknownHostException =>
          warn("Received UnknownHostException when persisting locality info for container %d: %s" format (containerContext.id, uhe.getMessage))  //No-op
        case unknownException: Throwable =>
          warn("Received an exception when persisting locality info for container %d: %s" format (containerContext.id, unknownException.getMessage))
      }
    }
  }

  /**
   * Starts (restores) each task instance's stores, recording the per-task
   * wall-clock restoration time in its gauge when one was registered.
   */
  def startStores {
    info("Starting task instance stores.")
    taskInstances.values.foreach(taskInstance => {
      val startTime = System.currentTimeMillis()
      taskInstance.startStores
      // Measuring the time to restore the stores
      val timeToRestore = System.currentTimeMillis() - startTime
      // Gauge may be absent for tasks without registered store-restoration
      // metrics; skip silently in that case.
      val taskGauge = metrics.taskStoreRestorationMetrics.getOrElse(taskInstance.taskName, null)
      if (taskGauge != null) {
        taskGauge.set(timeToRestore)
      }
    })
  }

  /** Initializes every stream task (called after stores are restored). */
  def startTask {
    info("Initializing stream tasks.")

    taskInstances.values.foreach(_.initTask)
  }

  /** Registers task instances with producers and starts the producer multiplexer. */
  def startProducers {
    info("Registering task instances with producers.")

    taskInstances.values.foreach(_.registerProducers)

    info("Starting producer multiplexer.")

    producerMultiplexer.start
  }

  /** Registers task instances with consumers and starts the consumer multiplexer. */
  def startConsumers {
    info("Registering task instances with consumers.")

    taskInstances.values.foreach(_.registerConsumers)

    info("Starting consumer multiplexer.")

    consumerMultiplexer.start
  }

  /**
   * Starts the security manager, if one was configured.
   * (Method name "Manger" is a long-standing typo for "Manager"; kept as-is
   * to preserve the public API.)
   */
  def startSecurityManger: Unit = {
    if (securityManager != null) {
      info("Starting security manager.")

      securityManager.start
    }
  }

  /** Stops the consumer multiplexer; no further input is polled afterwards. */
  def shutdownConsumers {
    info("Shutting down consumer multiplexer.")

    consumerMultiplexer.stop
  }

  /** Stops the producer multiplexer. */
  def shutdownProducers {
    info("Shutting down producer multiplexer.")

    producerMultiplexer.stop
  }

  /** Invokes shutdown on every stream task. */
  def shutdownTask {
    info("Shutting down task instance stream tasks.")

    taskInstances.values.foreach(_.shutdownTask)
  }

  /** Shuts down every task instance's stores. */
  def shutdownStores {
    info("Shutting down task instance stores.")

    taskInstances.values.foreach(_.shutdownStores)
  }

  /** Stops the locality manager, if one was configured. */
  def shutdownLocalityManager {
    if(localityManager != null) {
      info("Shutting down locality manager.")
      localityManager.stop
    }
  }

  /** Stops the offset manager. */
  def shutdownOffsetManager {
    info("Shutting down offset manager.")

    offsetManager.stop
  }


  /** Stops all metrics reporters and, if configured, JVM metrics. */
  def shutdownMetrics {
    info("Shutting down metrics reporters.")

    reporters.values.foreach(_.stop)

    if (jvm != null) {
      info("Shutting down JVM metrics.")

      jvm.stop
    }
  }

  /** Stops the security manager, if one was configured. (See startSecurityManger.) */
  def shutdownSecurityManger: Unit = {
    if (securityManager != null) {
      info("Shutting down security manager.")

      securityManager.stop
    }
  }

  /** Stops the disk space monitor, if one was configured. */
  def shutdownDiskSpaceMonitor: Unit = {
    if (diskSpaceMonitor != null) {
      info("Shutting down disk space monitor.")
      diskSpaceMonitor.stop()
    }
  }
}

http://git-wip-us.apache.org/repos/asf/samza/blob/27c9e4c2/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
 
b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
index 9e6641c..2044ce0 100644
--- 
a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
+++ 
b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
@@ -38,21 +38,13 @@ class SamzaContainerMetrics(
   val processNs = newTimer("process-ns")
   val commitNs = newTimer("commit-ns")
   val utilization = newGauge("event-loop-utilization", 0.0F)
+  val diskUsageBytes = newGauge("disk-usage-bytes", 0L)
+  val diskQuotaBytes = newGauge("disk-quota-bytes", Long.MaxValue)
+  val executorWorkFactor = newGauge("executor-work-factor", 1.0)
 
   val taskStoreRestorationMetrics: util.Map[TaskName, Gauge[Long]] = new 
util.HashMap[TaskName, Gauge[Long]]()
 
   def addStoreRestorationGauge(taskName: TaskName, storeName: String) {
     taskStoreRestorationMetrics.put(taskName, newGauge("%s-%s-restore-time" 
format(taskName.toString, storeName), -1L))
   }
-
-  /**
-   * Creates or gets the disk usage gauge for the container and returns it.
-   */
-  def createOrGetDiskUsageGauge(): Gauge[Long] = {
-    // Despite the name, this function appears to be idempotent. A more 
defensive approach would be
-    // to ensure idempotency at this level, e.g. via a CAS operation. 
Unfortunately, it appears that
-    // the mechanism to register a Gauge is hidden. An alternative would be to 
use a mutex to
-    // set ensure the gauge is created once.
-    newGauge("disk-usage", 0L)
-  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/27c9e4c2/samza-core/src/main/scala/org/apache/samza/util/Util.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/Util.scala 
b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
index c77d929..84166b4 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/Util.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
@@ -364,4 +364,27 @@ object Util extends Logging {
     info("use default serde %s for %s" format (serde, serdeName))
     serde
   }
+
+  /**
+   * Add the supplied arguments and handle overflow by clamping the resulting 
sum to
+   * {@code Long.MinValue} if the sum would have been less than {@code 
Long.MinValue} or
+   * {@code Long.MaxValue} if the sum would have been greater than {@code 
Long.MaxValue}.
+   *
+   * @param lhs left hand side of sum
+   * @param rhs right hand side of sum
+   * @return the sum if no overflow occurs, or the clamped extreme if it does.
+   */
+  def clampAdd(lhs: Long, rhs: Long): Long = {
+    val sum = lhs + rhs
+
+    // From "Hacker's Delight", overflow occurs IFF both operands have the 
same sign and the
+    // sign of the sum differs from the operands. Here we're doing a basic 
bitwise check that
+    // collapses 6 branches down to 2. The expression {@code lhs ^ rhs} will 
have the high-order
+    // bit set to true IFF the signs are different.
+    if ((~(lhs ^ rhs) & (lhs ^ sum)) < 0) {
+      return if (lhs >= 0) Long.MaxValue else Long.MinValue
+    }
+
+    sum
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/27c9e4c2/samza-core/src/test/java/org/apache/samza/container/disk/TestDiskQuotaPolicyEntry.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/container/disk/TestDiskQuotaPolicyEntry.java
 
b/samza-core/src/test/java/org/apache/samza/container/disk/TestDiskQuotaPolicyEntry.java
new file mode 100644
index 0000000..96c462b
--- /dev/null
+++ 
b/samza-core/src/test/java/org/apache/samza/container/disk/TestDiskQuotaPolicyEntry.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.container.disk;
+
+import org.junit.Test;
+
+import static junit.framework.Assert.assertEquals;
+
+public class TestDiskQuotaPolicyEntry {
+  private static final double ARBITRARY_HIGH_WATER_MARK = 0.75;
+  private static final double ARBITRARY_LOW_WATER_MARK = 0.25;
+  private static final double ARBITRARY_WORK_FACTOR = 1.0;
+
+  @Test
+  public void testConstruction() {
+    final WatermarkDiskQuotaPolicy.Entry policy = new 
WatermarkDiskQuotaPolicy.Entry(
+        ARBITRARY_LOW_WATER_MARK,
+        ARBITRARY_HIGH_WATER_MARK,
+        ARBITRARY_WORK_FACTOR);
+
+    assertEquals(ARBITRARY_LOW_WATER_MARK, policy.getLowWaterMarkPercent());
+    assertEquals(ARBITRARY_HIGH_WATER_MARK, policy.getHighWaterMarkPercent());
+    assertEquals(ARBITRARY_WORK_FACTOR, policy.getWorkFactor());
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testLowWaterMarkGreaterThanHighWaterMark() {
+    new WatermarkDiskQuotaPolicy.Entry(
+        1.0,
+        ARBITRARY_HIGH_WATER_MARK,
+        ARBITRARY_WORK_FACTOR);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testLowWaterMarkBelowZero() {
+    new WatermarkDiskQuotaPolicy.Entry(
+        -1.0,
+        ARBITRARY_HIGH_WATER_MARK,
+        ARBITRARY_WORK_FACTOR);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testHighWaterMarkAboveOne() {
+    new WatermarkDiskQuotaPolicy.Entry(
+        ARBITRARY_LOW_WATER_MARK,
+        2.0,
+        ARBITRARY_WORK_FACTOR);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testWorkFactorOfZero() {
+    new WatermarkDiskQuotaPolicy.Entry(
+        ARBITRARY_LOW_WATER_MARK,
+        ARBITRARY_HIGH_WATER_MARK,
+        0);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testWorkFactorGreaterThanOne() {
+    new WatermarkDiskQuotaPolicy.Entry(
+        ARBITRARY_LOW_WATER_MARK,
+        ARBITRARY_HIGH_WATER_MARK,
+        2.0);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/27c9e4c2/samza-core/src/test/java/org/apache/samza/container/disk/TestWatermarkDiskQuotaPolicy.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/container/disk/TestWatermarkDiskQuotaPolicy.java
 
b/samza-core/src/test/java/org/apache/samza/container/disk/TestWatermarkDiskQuotaPolicy.java
new file mode 100644
index 0000000..8367d41
--- /dev/null
+++ 
b/samza-core/src/test/java/org/apache/samza/container/disk/TestWatermarkDiskQuotaPolicy.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.container.disk;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static junit.framework.Assert.assertEquals;
+
+public class TestWatermarkDiskQuotaPolicy {
+  @Test
+  public void testNoEntries() {
+    WatermarkDiskQuotaPolicy policy = new 
WatermarkDiskQuotaPolicy(Collections.<WatermarkDiskQuotaPolicy.Entry>emptyList());
+
+    assertEquals(1.0, policy.apply(1.0));
+    assertEquals(1.0, policy.apply(0.5));
+    assertEquals(1.0, policy.apply(0.0));
+  }
+
+  @Test
+  public void testOneEntryUntriggered() {
+    double workFactor = 0.5f;
+    List<WatermarkDiskQuotaPolicy.Entry> entries = 
Collections.singletonList(new WatermarkDiskQuotaPolicy.Entry(0.5, 1.0, 
workFactor));
+    WatermarkDiskQuotaPolicy policy = new WatermarkDiskQuotaPolicy(entries);
+
+    assertEquals(1.0, policy.apply(1.0));
+    assertEquals(1.0, policy.apply(0.75));
+    assertEquals(1.0, policy.apply(0.5));
+  }
+
+  @Test
+  public void testOneEntryTriggered() {
+    double workFactor = 0.5f;
+    List<WatermarkDiskQuotaPolicy.Entry> entries = 
Collections.singletonList(new WatermarkDiskQuotaPolicy.Entry(0.5, 1.0, 
workFactor));
+    WatermarkDiskQuotaPolicy policy = new WatermarkDiskQuotaPolicy(entries);
+
+    assertEquals(workFactor, policy.apply(0.25));
+  }
+
+  @Test
+  public void testTwoEntriesTriggered() {
+    double workFactor1 = 0.5f;
+    double workFactor2 = 0.25f;
+    List<WatermarkDiskQuotaPolicy.Entry> entries = Arrays.asList(
+        new WatermarkDiskQuotaPolicy.Entry(0.5, 1.0, workFactor1),
+        new WatermarkDiskQuotaPolicy.Entry(0.2, 0.4, workFactor2));
+    WatermarkDiskQuotaPolicy policy = new WatermarkDiskQuotaPolicy(entries);
+
+    assertEquals(1.0, policy.apply(0.5));
+    assertEquals(workFactor1, policy.apply(0.4));
+    assertEquals(workFactor2, policy.apply(0.1));
+  }
+
+  @Test
+  public void testTwoEntriesTriggeredSkipFirst() {
+    double workFactor1 = 0.5f;
+    double workFactor2 = 0.25f;
+    List<WatermarkDiskQuotaPolicy.Entry> entries = Arrays.asList(
+        new WatermarkDiskQuotaPolicy.Entry(0.5, 1.0, workFactor1),
+        new WatermarkDiskQuotaPolicy.Entry(0.2, 0.4, workFactor2));
+    WatermarkDiskQuotaPolicy policy = new WatermarkDiskQuotaPolicy(entries);
+
+    assertEquals(workFactor2, policy.apply(0.1));
+  }
+
+  @Test
+  public void testTwoEntriesReversedOrder() {
+    // Results should be the same regardless of order as we sort policies at 
construction time.
+    double workFactor1 = 0.5f;
+    double workFactor2 = 0.25f;
+    List<WatermarkDiskQuotaPolicy.Entry> entries = Arrays.asList(
+        new WatermarkDiskQuotaPolicy.Entry(0.2, 0.4, workFactor2),
+        new WatermarkDiskQuotaPolicy.Entry(0.5, 1.0, workFactor1));
+    WatermarkDiskQuotaPolicy policy = new WatermarkDiskQuotaPolicy(entries);
+
+    assertEquals(workFactor2, policy.apply(0.1));
+  }
+
+  @Test
+  public void testTriggerEntriesAndRecover() {
+    double workFactor1 = 0.5f;
+    double workFactor2 = 0.25f;
+    List<WatermarkDiskQuotaPolicy.Entry> entries = Arrays.asList(
+        new WatermarkDiskQuotaPolicy.Entry(0.5, 1.0, workFactor1),
+        new WatermarkDiskQuotaPolicy.Entry(0.2, 0.4, workFactor2));
+    WatermarkDiskQuotaPolicy policy = new WatermarkDiskQuotaPolicy(entries);
+
+    assertEquals(workFactor2, policy.apply(0.1));
+    assertEquals(workFactor1, policy.apply(0.4));
+    assertEquals(1.0, policy.apply(1.0));
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testWorkFactorTooHigh() {
+    new WatermarkDiskQuotaPolicy(
+        Collections.singletonList(new WatermarkDiskQuotaPolicy.Entry(0.5, 1.0, 
1.5)));
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testWorkFactorTooLow() {
+    new WatermarkDiskQuotaPolicy(
+        Collections.singletonList(new WatermarkDiskQuotaPolicy.Entry(0.5, 1.0, 
-1.0)));
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testRaiseWorkFactorWithLowerThreshold() {
+    List<WatermarkDiskQuotaPolicy.Entry> entries = Arrays.asList(
+        new WatermarkDiskQuotaPolicy.Entry(0.5, 1.0, 0.5),
+        new WatermarkDiskQuotaPolicy.Entry(0.2, 0.4, 0.75));
+    new WatermarkDiskQuotaPolicy(entries);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testDuplicatedRange() {
+    List<WatermarkDiskQuotaPolicy.Entry> entries = Arrays.asList(
+        new WatermarkDiskQuotaPolicy.Entry(0.5, 1.0, 0.5),
+        new WatermarkDiskQuotaPolicy.Entry(0.5, 1.0, 0.75));
+    new WatermarkDiskQuotaPolicy(entries);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testFullyOverlappedRange1() {
+    List<WatermarkDiskQuotaPolicy.Entry> entries = Arrays.asList(
+        new WatermarkDiskQuotaPolicy.Entry(0.5, 1.0, 0.5),
+        new WatermarkDiskQuotaPolicy.Entry(0.25, 1.0, 0.75));
+    new WatermarkDiskQuotaPolicy(entries);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testFullyOverlappedRange2() {
+    List<WatermarkDiskQuotaPolicy.Entry> entries = Arrays.asList(
+        new WatermarkDiskQuotaPolicy.Entry(0.5, 0.75, 0.75),
+        new WatermarkDiskQuotaPolicy.Entry(0.25, 1.0, 0.5));
+    new WatermarkDiskQuotaPolicy(entries);
+  }
+}

Reply via email to