SAMZA-956: Disk Quotas: Add throttler and disk quota enforcement
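For context, a minimal sketch of how the watermark policy introduced by this commit maps the remaining fraction of disk quota to a work factor. Illustrative only: the Entry values and the class name WatermarkPolicyExample are hypothetical, while WatermarkDiskQuotaPolicy and its nested Entry type come from the diff below.

    import java.util.Collections;
    import org.apache.samza.container.disk.WatermarkDiskQuotaPolicy;

    public class WatermarkPolicyExample {
      public static void main(String[] args) {
        // One hypothetical entry: throttle to a 0.5 work factor once less than
        // 20% of the disk quota remains; lift the throttle only after more than
        // 50% of the quota is free again (hysteresis between the two marks).
        WatermarkDiskQuotaPolicy policy = new WatermarkDiskQuotaPolicy(
            Collections.singletonList(new WatermarkDiskQuotaPolicy.Entry(0.2, 0.5, 0.5)));

        System.out.println(policy.apply(0.90)); // 1.0 - plenty of quota, full work rate
        System.out.println(policy.apply(0.15)); // 0.5 - dropped below the low water mark
        System.out.println(policy.apply(0.30)); // 0.5 - still below the high water mark
        System.out.println(policy.apply(0.60)); // 1.0 - recovered above the high water mark
      }
    }

In a running container the same behavior is wired up through configuration rather than direct construction: container.disk.poll.interval.ms enables disk monitoring, container.disk.quota.bytes sets the quota, container.disk.quota.policy.factory selects WatermarkDiskQuotaPolicyFactory, and container.disk.quota.policy.count plus the per-entry container.disk.quota.policy.entry-number.lowWaterMark/highWaterMark/workFactor keys define the entries (see the factory Javadoc in the diff below).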
Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/27c9e4c2 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/27c9e4c2 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/27c9e4c2 Branch: refs/heads/master Commit: 27c9e4c2ed114c12c040f449bfa6ed1af71bdb7d Parents: 312c1b1 Author: Chris Pettitt <cpett...@linkedin.com> Authored: Mon Jun 20 15:55:01 2016 -0700 Committer: Navina Ramesh <nram...@linkedin.com> Committed: Mon Jun 20 15:55:01 2016 -0700 ---------------------------------------------------------------------- checkstyle/import-control.xml | 6 +- .../samza/container/disk/DiskQuotaPolicy.java | 32 + .../container/disk/DiskQuotaPolicyFactory.java | 34 + .../samza/container/disk/DiskSpaceMonitor.java | 3 + .../disk/NoThrottlingDiskQuotaPolicy.java | 39 + .../NoThrottlingDiskQuotaPolicyFactory.java | 32 + .../disk/WatermarkDiskQuotaPolicy.java | 216 +++++ .../disk/WatermarkDiskQuotaPolicyFactory.java | 111 +++ .../apache/samza/util/HighResolutionClock.java | 47 ++ .../samza/util/SystemHighResolutionClock.java | 41 + .../apache/samza/util/ThrottlingExecutor.java | 135 ++++ .../org/apache/samza/container/RunLoop.scala | 25 +- .../samza/container/SameThreadExecutor.scala | 26 + .../apache/samza/container/SamzaContainer.scala | 33 +- .../samza/container/SamzaContainer.scala.orig | 787 +++++++++++++++++++ .../samza/container/SamzaContainerMetrics.scala | 14 +- .../main/scala/org/apache/samza/util/Util.scala | 23 + .../disk/TestDiskQuotaPolicyEntry.java | 82 ++ .../disk/TestWatermarkDiskQuotaPolicy.java | 155 ++++ .../samza/util/TestThrottlingExecutor.java | 255 ++++++ .../scala/org/apache/samza/util/TestUtil.scala | 16 + 21 files changed, 2084 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/27c9e4c2/checkstyle/import-control.xml ---------------------------------------------------------------------- diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index 7a09c7e..d0a5c66 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -115,7 +115,8 @@ <subpackage name="util"> <allow pkg="org.apache.samza.metrics" /> <allow pkg="org.apache.samza.system" /> - + <allow pkg="junit.framework" /> + <allow class="org.apache.samza.Partition" /> <allow class="org.apache.samza.SamzaException" /> <allow class="joptsimple.OptionSet" /> @@ -138,7 +139,10 @@ <allow pkg="org.apache.samza.config" /> <allow pkg="org.apache.samza.container" /> <allow pkg="org.apache.samza.coordinator.stream" /> + <allow pkg="org.apache.samza.util" /> + <allow pkg="junit.framework" /> <allow class="org.apache.samza.coordinator.stream.AbstractCoordinatorStreamManager" /> + <subpackage name="grouper"> <subpackage name="stream"> <allow pkg="org.apache.samza.system" /> http://git-wip-us.apache.org/repos/asf/samza/blob/27c9e4c2/samza-core/src/main/java/org/apache/samza/container/disk/DiskQuotaPolicy.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/container/disk/DiskQuotaPolicy.java b/samza-core/src/main/java/org/apache/samza/container/disk/DiskQuotaPolicy.java new file mode 100644 index 0000000..060f980 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/container/disk/DiskQuotaPolicy.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.container.disk; + +/** + * An object that produces a new work rate for the container based on the current percentage of + * available disk quota assigned to the container. + */ +public interface DiskQuotaPolicy { + /** + * Given the latest percentage of available disk quota, this method returns the work rate that + * should be applied to the container as a value in (0.0, 1.0]. + */ + double apply(double availableDiskQuotaPercentage); +} http://git-wip-us.apache.org/repos/asf/samza/blob/27c9e4c2/samza-core/src/main/java/org/apache/samza/container/disk/DiskQuotaPolicyFactory.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/container/disk/DiskQuotaPolicyFactory.java b/samza-core/src/main/java/org/apache/samza/container/disk/DiskQuotaPolicyFactory.java new file mode 100644 index 0000000..00202a4 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/container/disk/DiskQuotaPolicyFactory.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.container.disk; + +import org.apache.samza.config.Config; + +/** + * A factory for creating a {@link DiskQuotaPolicy}. This interface is required due to Samza's + * string-only configuration. + */ +public interface DiskQuotaPolicyFactory { + /** + * Creates and returns a disk quota policy instance. 
+ * @param config configuration that may apply to this factory + */ + DiskQuotaPolicy create(Config config); +} http://git-wip-us.apache.org/repos/asf/samza/blob/27c9e4c2/samza-core/src/main/java/org/apache/samza/container/disk/DiskSpaceMonitor.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/container/disk/DiskSpaceMonitor.java b/samza-core/src/main/java/org/apache/samza/container/disk/DiskSpaceMonitor.java index 2a565be..162cabf 100644 --- a/samza-core/src/main/java/org/apache/samza/container/disk/DiskSpaceMonitor.java +++ b/samza-core/src/main/java/org/apache/samza/container/disk/DiskSpaceMonitor.java @@ -51,6 +51,9 @@ public interface DiskSpaceMonitor { interface Listener { /** * Invoked with new samples as they become available. + * <p> + * Updates are guaranteed to be serialized. In other words, a listener's onUpdate callback + * may only be invoked once at a time by a DiskSpaceMonitor. * * @param diskUsageSample the measured disk usage size in bytes. */ http://git-wip-us.apache.org/repos/asf/samza/blob/27c9e4c2/samza-core/src/main/java/org/apache/samza/container/disk/NoThrottlingDiskQuotaPolicy.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/container/disk/NoThrottlingDiskQuotaPolicy.java b/samza-core/src/main/java/org/apache/samza/container/disk/NoThrottlingDiskQuotaPolicy.java new file mode 100644 index 0000000..5cd54e0 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/container/disk/NoThrottlingDiskQuotaPolicy.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.container.disk; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A {@link DiskQuotaPolicy} that does no throttling of the work rate. 
+ */ +public class NoThrottlingDiskQuotaPolicy implements DiskQuotaPolicy { + private static final Logger log = LoggerFactory.getLogger(NoThrottlingDiskQuotaPolicy.class); + + public NoThrottlingDiskQuotaPolicy() { + log.info("Using a no throttling disk quota policy"); + } + + @Override + public double apply(double availableDiskQuotaPercentage) { + return 1.0; + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/27c9e4c2/samza-core/src/main/java/org/apache/samza/container/disk/NoThrottlingDiskQuotaPolicyFactory.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/container/disk/NoThrottlingDiskQuotaPolicyFactory.java b/samza-core/src/main/java/org/apache/samza/container/disk/NoThrottlingDiskQuotaPolicyFactory.java new file mode 100644 index 0000000..cad1aa8 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/container/disk/NoThrottlingDiskQuotaPolicyFactory.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.container.disk; + +import org.apache.samza.config.Config; + +/** + * A factory that creates a {@link NoThrottlingDiskQuotaPolicy}. + */ +public class NoThrottlingDiskQuotaPolicyFactory implements DiskQuotaPolicyFactory { + @Override + public DiskQuotaPolicy create(Config config) { + return new NoThrottlingDiskQuotaPolicy(); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/27c9e4c2/samza-core/src/main/java/org/apache/samza/container/disk/WatermarkDiskQuotaPolicy.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/container/disk/WatermarkDiskQuotaPolicy.java b/samza-core/src/main/java/org/apache/samza/container/disk/WatermarkDiskQuotaPolicy.java new file mode 100644 index 0000000..21fbca2 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/container/disk/WatermarkDiskQuotaPolicy.java @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.container.disk; + +import org.apache.samza.util.ThrottlingExecutor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; + +/** + * An object that calculates the current work rate using a configurable set of + * {@link Entry} instances. + * <p> + * The supplied {@link Entry} instances may overlap as long as the range + * between low and high water mark of one policy entry does not contain the range of low and high + * water mark of another policy entry. For example, the following entries would be OK: + * + * <ul> + * <li>Low: 0.5, High: 1.0</li> + * <li>Low:0.2, High: 0.8</li> + * </ul> + * + * But the following entries would not: + * + * <ul> + * <li>Low: 0.1, High: 1.0</li> + * <li>Low: 0.2, High: 0.5</li> + * </ul> + * + * Policy entries do not stack. In other words, there is always a clear entry to apply and its work + * factor is not added, multiplied, or in any other way joined with another entry's work rate. + */ +public class WatermarkDiskQuotaPolicy implements DiskQuotaPolicy { + /** + * A comparator that orders {@link Entry} instances in descending order first by + * high water mark and then by low water mark. + */ + private static final Comparator<Entry> POLICY_COMPARATOR = new Comparator<Entry>() { + @Override + public int compare(Entry lhs, Entry rhs) { + if (lhs.getHighWaterMarkPercent() > rhs.getHighWaterMarkPercent()) { + return -1; + } else if (lhs.getHighWaterMarkPercent() < rhs.getHighWaterMarkPercent()) { + return 1; + } else if (lhs.getLowWaterMarkPercent() > rhs.getLowWaterMarkPercent()) { + return -1; + } else if (lhs.getLowWaterMarkPercent() < rhs.getLowWaterMarkPercent()) { + return 1; + } + return 0; + } + }; + + private static final Logger log = LoggerFactory.getLogger(WatermarkDiskQuotaPolicy.class); + + // Lock guards entryIndex + private final List<Entry> entries; + private int entryIndex = -1; + + private static String dumpPolicyEntries(List<Entry> entries) { + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < entries.size(); ++i) { + final Entry entry = entries.get(i); + sb.append(String.format("\n Policy entry %d. Low: %f. High: %f. Work Factor: %f", + i, + entry.getLowWaterMarkPercent(), + entry.getHighWaterMarkPercent(), + entry.getWorkFactor())); + } + return sb.toString(); + } + + /** + * Constructs a new watermark disk quota policy using the supplied policy entries. + * + * @param entries a list of {@link Entry} objects that describe how to adjust the work rate. May + * be empty, but cannot be {@code null}. 
+ */ + public WatermarkDiskQuotaPolicy(List<Entry> entries) { + // Copy entries, sort, make immutable + entries = new ArrayList<>(entries); + Collections.sort(entries, POLICY_COMPARATOR); + this.entries = Collections.unmodifiableList(entries); + + // Validate entries + double lastHighWaterMark = 1.0; + double lastWorkFactor = ThrottlingExecutor.MAX_WORK_FACTOR; + for (int i = 0; i < entries.size(); ++i) { + final Entry entry = entries.get(i); + + if (lastHighWaterMark < entry.getHighWaterMarkPercent()) { + throw new IllegalArgumentException("Policy entry " + i + + " has high water mark (" + entry.getHighWaterMarkPercent() + + ") > previous high water mark (" + lastHighWaterMark + "):" + + dumpPolicyEntries(entries)); + } + + if (lastWorkFactor < entry.getWorkFactor()) { + throw new IllegalArgumentException("Policy entry " + i + + " has work factor (" + entry.getWorkFactor() + + ") < previous work factor (" + lastWorkFactor + "):" + + dumpPolicyEntries(entries)); + } + + if (entry.getWorkFactor() < ThrottlingExecutor.MIN_WORK_FACTOR) { + throw new IllegalArgumentException("Policy entry " + i + + " has work factor (" + entry.getWorkFactor() + + ") < minimum work factor (" + ThrottlingExecutor.MIN_WORK_FACTOR + "):" + + dumpPolicyEntries(entries)); + } + + lastHighWaterMark = entry.getHighWaterMarkPercent(); + lastWorkFactor = entry.getWorkFactor(); + } + + log.info("Using the following disk quota enforcement entries: {}", + entries.isEmpty() ? "NONE" : dumpPolicyEntries(entries)); + } + + @Override + public double apply(double availableDiskQuotaPercentage) { + double workFactor = entryIndex == -1 ? 1.0 : this.entries.get(entryIndex).getWorkFactor(); + int entryIndex = this.entryIndex; + + while (entryIndex >= 0 && + entries.get(entryIndex).getHighWaterMarkPercent() <= availableDiskQuotaPercentage) { + --entryIndex; + } + + while (entryIndex < entries.size() - 1 && + entries.get(entryIndex + 1).getLowWaterMarkPercent() > availableDiskQuotaPercentage) { + ++entryIndex; + } + + if (entryIndex != this.entryIndex) { + workFactor = entryIndex == -1 ? 1.0 : entries.get(entryIndex).getWorkFactor(); + this.entryIndex = entryIndex; + log.info("Work factor has been updated: {}.", workFactor); + } + + return workFactor; + } + + /** + * A thread-safe value object that represents a policy entry for disk quotas. When the percentage + * of available disk space assigned via the disk quota drops below the low water mark the + * configured work rate is applied to reduce throughput of the task. When the available disk space + * rises above the high water mark the work rate throttling is removed (but other entries may + * still apply). 
+ */ + public static class Entry { + private final double lowWaterMarkPercent; + private final double highWaterMarkPercent; + private final double workFactor; + + public Entry(double lowWaterMarkPercent, double highWaterMarkPercent, double workFactor) { + if (lowWaterMarkPercent < 0.0) { + throw new IllegalArgumentException("low water mark percent (" + lowWaterMarkPercent + ") < 0"); + } + + if (highWaterMarkPercent > 1.0) { + throw new IllegalArgumentException("high water mark percent (" + highWaterMarkPercent + ") > 1"); + } + + if (lowWaterMarkPercent > highWaterMarkPercent) { + throw new IllegalArgumentException("low water mark percent (" + lowWaterMarkPercent + ") > " + + "high water mark percent (" + highWaterMarkPercent + ")"); + } + + if (workFactor <= 0.0) { + throw new IllegalArgumentException("work factor (" + workFactor + ") <= 0"); + } + + if (workFactor > 1.0) { + throw new IllegalArgumentException("work factor (" + workFactor + ") > 1"); + } + + this.lowWaterMarkPercent = lowWaterMarkPercent; + this.highWaterMarkPercent = highWaterMarkPercent; + this.workFactor = workFactor; + } + + public double getLowWaterMarkPercent() { + return lowWaterMarkPercent; + } + + public double getHighWaterMarkPercent() { + return highWaterMarkPercent; + } + + public double getWorkFactor() { + return workFactor; + } + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/27c9e4c2/samza-core/src/main/java/org/apache/samza/container/disk/WatermarkDiskQuotaPolicyFactory.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/container/disk/WatermarkDiskQuotaPolicyFactory.java b/samza-core/src/main/java/org/apache/samza/container/disk/WatermarkDiskQuotaPolicyFactory.java new file mode 100644 index 0000000..42e2d3b --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/container/disk/WatermarkDiskQuotaPolicyFactory.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.container.disk; + +import org.apache.samza.config.Config; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; + +/** + * A factory for producing {@link WatermarkDiskQuotaPolicy} instances. This class is only required + * due to Samza's string-only config. In the event that a richer config system is added it would + * be possible to construct a {@link WatermarkDiskQuotaPolicy} directly. + * <p> + * <table> + * <tr><th>Name</th><th>Default</th><th>Description</th></tr> + * <tr> + * <td>container.disk.quota.policy.count</td> + * <td>0</td> + * <td> + * The number of entries for this policy. Entries are configured with the following keys by + * substituting a 0-based index for each policy. 
For example, to configure the low water mark + * for the first policy to 0.5, use <code>container.disk.quota.policy.0.lowWaterMark=0.5</code>. + * Setting this value to 0 disables this policy. + * </td> + * </tr> + * <tr> + * <td>container.disk.quota.policy.entry-number.lowWaterMark</td> + * <td></td> + * <td> + * <strong>Required.</strong> + * The low water mark for this entry. If the available percentage of disk quota drops below + * this low water mark then the work factor for this entry is used unless the available + * percentage drops below an entry with an even lower low water mark. For example, if the low + * water mark has a value of <code>0.5</code> and the available percentage of disk quota drops + * below 50% then the work factor for this entry is used. However, if there were another entry + * that had a low water mark of <code>0.4</code> and the disk quota dropped below 40% then + * the other entry's work factor would be used instead. + * </td> + * </tr> + * <tr> + * <td>container.disk.quota.policy.entry-number.highWaterMark</td> + * <td></td> + * <td> + * <strong>Required.</strong> + * The high water mark for this entry. If the available percentage of disk quota rises above + * this high water mark then the work factor for this entry is not longer used and the next + * highest work factor is used instead. If there is no other higher work factor then the work + * factor resets to <code>1.0</code>. + * </td> + * </tr> + * <tr> + * <td>container.disk.quota.policy.entry-number.workFactor</td> + * <td></td> + * <td> + * <strong>Required.</strong> + * The work factor to apply when this entry is triggered (i.e. due to the available percentage + * of disk quota dropping below this entry's low water mark). The work factor is a hint at + * the percentage of time that the run loop should be used to process new work. For example, + * a value of <code>1.0</code> indicates that the run loop should be working at full rate + * (i.e. as fast as it can). A value of <code>0.5</code> means that the run loop should only + * run half as fast as it can. A value of <code>0.2</code> means that the run loop should only + * run at a 20% of its usual speed. 
+ * </td> + * </tr> + * </table> + */ +public class WatermarkDiskQuotaPolicyFactory implements DiskQuotaPolicyFactory { + private static final Logger log = LoggerFactory.getLogger(WatermarkDiskQuotaPolicyFactory.class); + + private static final String POLICY_COUNT_KEY = "container.disk.quota.policy.count"; + + @Override + public DiskQuotaPolicy create(Config config) { + final int entryCount = config.getInt(POLICY_COUNT_KEY, 0); + if (entryCount == 0) { + log.info("Using a no throttling disk quota policy because policy entry count was missing or set to zero ({})", + POLICY_COUNT_KEY); + return new NoThrottlingDiskQuotaPolicy(); + } + + final List<WatermarkDiskQuotaPolicy.Entry> entries = new ArrayList<WatermarkDiskQuotaPolicy.Entry>(); + for (int i = 0; i < entryCount; ++i) { + final double lowWaterMark = config.getDouble(String.format("container.disk.quota.policy.%d.lowWaterMark", i)); + final double highWaterMark = config.getDouble(String.format("container.disk.quota.policy.%d.highWaterMark", i)); + final double workFactor = config.getDouble(String.format("container.disk.quota.policy.%d.workFactor", i)); + entries.add(new WatermarkDiskQuotaPolicy.Entry(lowWaterMark, highWaterMark, workFactor)); + } + + return new WatermarkDiskQuotaPolicy(entries); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/27c9e4c2/samza-core/src/main/java/org/apache/samza/util/HighResolutionClock.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/util/HighResolutionClock.java b/samza-core/src/main/java/org/apache/samza/util/HighResolutionClock.java new file mode 100644 index 0000000..69ba441 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/util/HighResolutionClock.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.util; + +/** + * An object that can provide time points (useful for getting the elapsed time between two time + * points) and can sleep for a specified period of time. + * <p> + * Instances of this interface must be thread-safe. + */ +interface HighResolutionClock { + /** + * Returns a time point that can be used to calculate the difference in nanoseconds with another + * time point. Resolution of the timer is platform dependent and not guaranteed to actually + * operate at nanosecond precision. + * + * @return current time point in nanoseconds + */ + long nanoTime(); + + /** + * Sleeps for a period of time that approximates the requested number of nanoseconds. Actual sleep + * time can vary significantly based on the JVM implementation and platform. This function returns + * the measured error between expected and actual sleep time. + * + * @param nanos the number of nanoseconds to sleep. 
+ * @throws InterruptedException if the current thread is interrupted while blocked in this method. + */ + long sleep(long nanos) throws InterruptedException; +} http://git-wip-us.apache.org/repos/asf/samza/blob/27c9e4c2/samza-core/src/main/java/org/apache/samza/util/SystemHighResolutionClock.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/util/SystemHighResolutionClock.java b/samza-core/src/main/java/org/apache/samza/util/SystemHighResolutionClock.java new file mode 100644 index 0000000..2e65b60 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/util/SystemHighResolutionClock.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.util; + +import java.util.concurrent.TimeUnit; + +class SystemHighResolutionClock implements HighResolutionClock { + @Override + public long nanoTime() { + return System.nanoTime(); + } + + @Override + public long sleep(long nanos) throws InterruptedException { + if (nanos <= 0) { + return nanos; + } + + final long start = System.nanoTime(); + TimeUnit.NANOSECONDS.sleep(nanos); + + return Util.clampAdd(nanos, -(System.nanoTime() - start)); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/27c9e4c2/samza-core/src/main/java/org/apache/samza/util/ThrottlingExecutor.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/util/ThrottlingExecutor.java b/samza-core/src/main/java/org/apache/samza/util/ThrottlingExecutor.java new file mode 100644 index 0000000..214cefd --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/util/ThrottlingExecutor.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.util; + +import java.util.concurrent.Executor; + +/** + * An object that performs work on the current thread and optionally slows the rate of execution. 
+ * By default work submitted with {@link #execute(Runnable)} will not be throttled. Work can be + * throttled by invoking {@link #setWorkFactor(double)}. + * <p> + * This class is *NOT* thread-safe. It is intended to be used from a single thread. However, the + * work factor may be set from any thread. + */ +public class ThrottlingExecutor implements Executor { + public static final double MAX_WORK_FACTOR = 1.0; + public static final double MIN_WORK_FACTOR = 0.001; + + private final HighResolutionClock clock; + + private volatile double workToIdleFactor; + private long pendingNanos; + + public ThrottlingExecutor() { + this(new SystemHighResolutionClock()); + } + + ThrottlingExecutor(HighResolutionClock clock) { + this.clock = clock; + } + + /** + * Executes the given command on the current thread. If throttling is enabled (the work factor + * is less than 1.0) this command may optionally insert a delay before returning to satisfy the + * requested work factor. + * <p> + * This method will not operate correct if used by more than one thread. + * + * @param command the work to execute + */ + public void execute(Runnable command) { + final double currentWorkToIdleFactor = workToIdleFactor; + + // If we're not throttling, do not get clock time, etc. This substantially reduces the overhead + // per invocation of this feature (by ~75%). + if (currentWorkToIdleFactor == 0.0) { + command.run(); + } else { + final long startWorkNanos = clock.nanoTime(); + command.run(); + final long workNanos = clock.nanoTime() - startWorkNanos; + + // NOTE: we accumulate pending delay nanos here, but we later update the pending nanos during + // the sleep operation (if applicable), so they do not continue to grow. + pendingNanos = Util.clampAdd(pendingNanos, (long) (workNanos * currentWorkToIdleFactor)); + if (pendingNanos > 0) { + try { + pendingNanos = clock.sleep(pendingNanos); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } + } + + /** + * Sets the work factor for this executor. A work factor of {@code 1.0} indicates that execution + * should proceed at full throughput. A work factor of less than {@code 1.0} will introduce + * delays into the {@link #execute(Runnable)} call to approximate the requested work factor. For + * example, if the work factor is {@code 0.7} then approximately 70% of the execute call will be + * spent executing the supplied command while 30% will be spent idle. + * + * @param workFactor the work factor to set for this executor. + */ + public void setWorkFactor(double workFactor) { + if (workFactor < MIN_WORK_FACTOR) { + throw new IllegalArgumentException("Work factor must be >= " + MIN_WORK_FACTOR); + } + if (workFactor > MAX_WORK_FACTOR) { + throw new IllegalArgumentException("Work factor must be <= " + MAX_WORK_FACTOR); + } + + workToIdleFactor = (1.0 - workFactor) / workFactor; + } + + /** + * Returns the current work factor in use. + * @see #setWorkFactor(double) + * @return the current work factor. + */ + public double getWorkFactor() { + return 1.0 / (workToIdleFactor + 1.0); + } + + /** + * Returns the total amount of delay (in nanoseconds) that needs to be applied to subsequent work. + * Alternatively this can be thought to capture the error between expected delay and actual + * applied delay. This accounts for variance in the precision of the clock and the delay + * mechanism, both of which may vary from platform to platform. + * <p> + * This is required for test purposes only. 
+ * + * @return the total amount of delay (in nanoseconds) that needs to be applied to subsequent work. + */ + long getPendingNanos() { + return pendingNanos; + } + + /** + * A convenience method for test that allows the pending nanos for this executor to be set + * explicitly. + * + * @param pendingNanos the pending nanos to set. + */ + void setPendingNanos(long pendingNanos) { + this.pendingNanos = pendingNanos; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/27c9e4c2/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala b/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala index 3f25eca..cf05c15 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala @@ -19,6 +19,8 @@ package org.apache.samza.container +import java.util.concurrent.Executor + import org.apache.samza.system.{SystemConsumers, SystemStreamPartition} import org.apache.samza.task.ReadableCoordinator import org.apache.samza.util.{Logging, TimerUtils} @@ -39,7 +41,8 @@ class RunLoop( val windowMs: Long = -1, val commitMs: Long = 60000, val clock: () => Long = { System.nanoTime }, - val shutdownMs: Long = 5000) extends Runnable with TimerUtils with Logging { + val shutdownMs: Long = 5000, + val executor: Executor = new SameThreadExecutor()) extends Runnable with TimerUtils with Logging { private val metricsMsOffset = 1000000L private var lastWindowNs = clock() @@ -69,14 +72,20 @@ class RunLoop( def run { addShutdownHook(Thread.currentThread()) + val runTask = new Runnable() { + override def run(): Unit = { + val loopStartTime = clock() + process + window + commit + val totalNs = clock() - loopStartTime + metrics.utilization.set(activeNs.toFloat / totalNs) + activeNs = 0L + } + } + while (!shutdownNow) { - val loopStartTime = clock() - process - window - commit - val totalNs = clock() - loopStartTime - metrics.utilization.set(activeNs.toFloat/totalNs) - activeNs = 0L + executor.execute(runTask) } } http://git-wip-us.apache.org/repos/asf/samza/blob/27c9e4c2/samza-core/src/main/scala/org/apache/samza/container/SameThreadExecutor.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/SameThreadExecutor.scala b/samza-core/src/main/scala/org/apache/samza/container/SameThreadExecutor.scala new file mode 100644 index 0000000..29f737e --- /dev/null +++ b/samza-core/src/main/scala/org/apache/samza/container/SameThreadExecutor.scala @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.container + +import java.util.concurrent.Executor + +private[container] class SameThreadExecutor() extends Executor { + override def execute(command: Runnable): Unit = command.run() +} http://git-wip-us.apache.org/repos/asf/samza/blob/27c9e4c2/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala index 2e8a500..5cbdb4b 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala @@ -32,7 +32,8 @@ import org.apache.samza.config.StreamConfig.Config2Stream import org.apache.samza.config.SystemConfig.Config2System import org.apache.samza.config.TaskConfig.Config2Task import org.apache.samza.container.disk.DiskSpaceMonitor.Listener -import org.apache.samza.container.disk.{PollingScanDiskSpaceMonitor, DiskSpaceMonitor} +import org.apache.samza.container.disk.WatermarkDiskQuotaPolicy.Entry +import org.apache.samza.container.disk.{NoThrottlingDiskQuotaPolicyFactory, DiskQuotaPolicyFactory, NoThrottlingDiskQuotaPolicy, WatermarkDiskQuotaPolicy, PollingScanDiskSpaceMonitor, DiskSpaceMonitor} import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory import org.apache.samza.metrics.JmxServer import org.apache.samza.metrics.JvmMetrics @@ -56,7 +57,7 @@ import org.apache.samza.system.chooser.MessageChooserFactory import org.apache.samza.system.chooser.RoundRobinChooserFactory import org.apache.samza.task.StreamTask import org.apache.samza.task.TaskInstanceCollector -import org.apache.samza.util.{ExponentialSleepStrategy, Logging, Util} +import org.apache.samza.util.{ThrottlingExecutor, ExponentialSleepStrategy, Logging, Util} import scala.collection.JavaConversions._ import java.net.{UnknownHostException, InetAddress, URL} import org.apache.samza.job.model.{TaskModel, ContainerModel, JobModel} @@ -66,6 +67,7 @@ import java.lang.Thread.UncaughtExceptionHandler object SamzaContainer extends Logging { val DEFAULT_READ_JOBMODEL_DELAY_MS = 100 + val DISK_POLL_INTERVAL_KEY = "container.disk.poll.interval.ms" def main(args: Array[String]) { safeMain(() => new JmxServer, new SamzaContainerExceptionHandler(() => System.exit(1))) @@ -520,18 +522,32 @@ object SamzaContainer extends Logging { (taskName, taskInstance) }).toMap - val diskPollMillis = config.getInt("container.disk.poll.interval.ms", 0) + val executor = new ThrottlingExecutor() + + val diskQuotaBytes = config.getLong("container.disk.quota.bytes", Long.MaxValue) + samzaContainerMetrics.diskQuotaBytes.set(diskQuotaBytes) + + val diskQuotaPolicyFactoryString = config.get("container.disk.quota.policy.factory", + classOf[NoThrottlingDiskQuotaPolicyFactory].getName) + val diskQuotaPolicyFactory = Util.getObj[DiskQuotaPolicyFactory](diskQuotaPolicyFactoryString) + val diskQuotaPolicy = diskQuotaPolicyFactory.create(config) + var diskSpaceMonitor: DiskSpaceMonitor = null + val diskPollMillis = config.getInt(DISK_POLL_INTERVAL_KEY, 0) if (diskPollMillis != 0) { - val diskUsage = samzaContainerMetrics.createOrGetDiskUsageGauge() - diskSpaceMonitor = new PollingScanDiskSpaceMonitor(storeWatchPaths, diskPollMillis) diskSpaceMonitor.registerListener(new 
Listener { - override def onUpdate(diskUsageSample: Long): Unit = - diskUsage.set(diskUsageSample) + override def onUpdate(diskUsageBytes: Long): Unit = { + val newWorkRate = diskQuotaPolicy.apply(1.0 - (diskUsageBytes.toDouble / diskQuotaBytes)) + executor.setWorkFactor(newWorkRate) + samzaContainerMetrics.executorWorkFactor.set(executor.getWorkFactor) + samzaContainerMetrics.diskUsageBytes.set(diskUsageBytes) + } }) info("Initialized disk space monitor watch paths to: %s" format storeWatchPaths) + } else { + info(s"Disk quotas disabled because polling interval is not set ($DISK_POLL_INTERVAL_KEY)") } val runLoop = new RunLoop( @@ -540,7 +556,8 @@ object SamzaContainer extends Logging { metrics = samzaContainerMetrics, windowMs = taskWindowMs, commitMs = taskCommitMs, - shutdownMs = taskShutdownMs) + shutdownMs = taskShutdownMs, + executor = executor) info("Samza container setup complete.") http://git-wip-us.apache.org/repos/asf/samza/blob/27c9e4c2/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala.orig ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala.orig b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala.orig new file mode 100644 index 0000000..086531e --- /dev/null +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala.orig @@ -0,0 +1,787 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.container + +import java.io.File +import java.nio.file.Path +import java.util +import org.apache.samza.SamzaException +import org.apache.samza.checkpoint.{CheckpointManagerFactory, OffsetManager, OffsetManagerMetrics} +import org.apache.samza.config.MetricsConfig.Config2Metrics +import org.apache.samza.config.SerializerConfig.Config2Serializer +import org.apache.samza.config.ShellCommandConfig +import org.apache.samza.config.StorageConfig.Config2Storage +import org.apache.samza.config.StreamConfig.Config2Stream +import org.apache.samza.config.SystemConfig.Config2System +import org.apache.samza.config.TaskConfig.Config2Task +import org.apache.samza.container.disk.DiskSpaceMonitor.Listener +import org.apache.samza.container.disk.{PollingScanDiskSpaceMonitor, DiskSpaceMonitor} +import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory +import org.apache.samza.metrics.JmxServer +import org.apache.samza.metrics.JvmMetrics +import org.apache.samza.metrics.MetricsRegistryMap +import org.apache.samza.metrics.MetricsReporter +import org.apache.samza.metrics.MetricsReporterFactory +import org.apache.samza.serializers.SerdeFactory +import org.apache.samza.serializers.SerdeManager +import org.apache.samza.storage.StorageEngineFactory +import org.apache.samza.storage.TaskStorageManager +import org.apache.samza.system.StreamMetadataCache +import org.apache.samza.system.SystemConsumers +import org.apache.samza.system.SystemConsumersMetrics +import org.apache.samza.system.SystemFactory +import org.apache.samza.system.SystemProducers +import org.apache.samza.system.SystemProducersMetrics +import org.apache.samza.system.SystemStream +import org.apache.samza.system.SystemStreamPartition +import org.apache.samza.system.chooser.DefaultChooser +import org.apache.samza.system.chooser.MessageChooserFactory +import org.apache.samza.system.chooser.RoundRobinChooserFactory +import org.apache.samza.task.StreamTask +import org.apache.samza.task.TaskInstanceCollector +import org.apache.samza.util.{ExponentialSleepStrategy, Logging, Util} +import scala.collection.JavaConversions._ +import java.net.{UnknownHostException, InetAddress, URL} +import org.apache.samza.job.model.{TaskModel, ContainerModel, JobModel} +import org.apache.samza.serializers.model.SamzaObjectMapper +import org.apache.samza.config.JobConfig.Config2Job +import java.lang.Thread.UncaughtExceptionHandler + +object SamzaContainer extends Logging { + val DEFAULT_READ_JOBMODEL_DELAY_MS = 100 + + def main(args: Array[String]) { + safeMain(() => new JmxServer, new SamzaContainerExceptionHandler(() => System.exit(1))) + } + + def safeMain( + newJmxServer: () => JmxServer, + exceptionHandler: UncaughtExceptionHandler = null) { + if (exceptionHandler != null) { + Thread.setDefaultUncaughtExceptionHandler(exceptionHandler) + } + putMDC("containerName", "samza-container-" + System.getenv(ShellCommandConfig.ENV_CONTAINER_ID)) + // Break out the main method to make the JmxServer injectable so we can + // validate that we don't leak JMX non-daemon threads if we have an + // exception in the main method. 
+ val containerId = System.getenv(ShellCommandConfig.ENV_CONTAINER_ID).toInt + logger.info("Got container ID: %s" format containerId) + val coordinatorUrl = System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL) + logger.info("Got coordinator URL: %s" format coordinatorUrl) + val jobModel = readJobModel(coordinatorUrl) + val containerModel = jobModel.getContainers()(containerId.toInt) + val config = jobModel.getConfig + putMDC("jobName", config.getName.getOrElse(throw new SamzaException("can not find the job name"))) + putMDC("jobId", config.getJobId.getOrElse("1")) + var jmxServer: JmxServer = null + + try { + jmxServer = newJmxServer() + SamzaContainer(containerModel, jobModel, jmxServer).run + } finally { + if (jmxServer != null) { + jmxServer.stop + } + } + } + + /** + * Fetches config, task:SSP assignments, and task:changelog partition + * assignments, and returns objects to be used for SamzaContainer's + * constructor. + */ + def readJobModel(url: String, initialDelayMs: Int = scala.util.Random.nextInt(DEFAULT_READ_JOBMODEL_DELAY_MS) + 1) = { + info("Fetching configuration from: %s" format url) + SamzaObjectMapper + .getObjectMapper + .readValue( + Util.read( + url = new URL(url), + retryBackoff = new ExponentialSleepStrategy(initialDelayMs = initialDelayMs)), + classOf[JobModel]) + } + + def apply(containerModel: ContainerModel, jobModel: JobModel, jmxServer: JmxServer) = { + val config = jobModel.getConfig + val containerId = containerModel.getContainerId + val containerName = "samza-container-%s" format containerId + val containerPID = Util.getContainerPID + + info("Setting up Samza container: %s" format containerName) + info("Samza container PID: %s" format containerPID) + info("Using configuration: %s" format config) + info("Using container model: %s" format containerModel) + + val registry = new MetricsRegistryMap(containerName) + val samzaContainerMetrics = new SamzaContainerMetrics(containerName, registry) + val systemProducersMetrics = new SystemProducersMetrics(registry) + val systemConsumersMetrics = new SystemConsumersMetrics(registry) + val offsetManagerMetrics = new OffsetManagerMetrics(registry) + + val inputSystemStreamPartitions = containerModel + .getTasks + .values + .flatMap(_.getSystemStreamPartitions) + .toSet + + val inputSystemStreams = inputSystemStreamPartitions + .map(_.getSystemStream) + .toSet + + val inputSystems = inputSystemStreams + .map(_.getSystem) + .toSet + + val systemNames = config.getSystemNames + + info("Got system names: %s" format systemNames) + + val serdeStreams = systemNames.foldLeft(Set[SystemStream]())(_ ++ config.getSerdeStreams(_)) + + debug("Got serde streams: %s" format serdeStreams) + + val serdeNames = config.getSerdeNames + + info("Got serde names: %s" format serdeNames) + + val systemFactories = systemNames.map(systemName => { + val systemFactoryClassName = config + .getSystemFactory(systemName) + .getOrElse(throw new SamzaException("A stream uses system %s, which is missing from the configuration." 
format systemName)) + (systemName, Util.getObj[SystemFactory](systemFactoryClassName)) + }).toMap + + val systemAdmins = systemNames + .map(systemName => (systemName, systemFactories(systemName).getAdmin(systemName, config))) + .toMap + + info("Got system factories: %s" format systemFactories.keys) + + val streamMetadataCache = new StreamMetadataCache(systemAdmins) + val inputStreamMetadata = streamMetadataCache.getStreamMetadata(inputSystemStreams) + + info("Got input stream metadata: %s" format inputStreamMetadata) + + val consumers = inputSystems + .map(systemName => { + val systemFactory = systemFactories(systemName) + + try { + (systemName, systemFactory.getConsumer(systemName, config, samzaContainerMetrics.registry)) + } catch { + case e: Exception => + error("Failed to create a consumer for %s, so skipping." format(systemName), e) + (systemName, null) + } + }) + .filter(_._2 != null) + .toMap + + info("Got system consumers: %s" format consumers.keys) + + val producers = systemFactories + .map { + case (systemName, systemFactory) => + try { + (systemName, systemFactory.getProducer(systemName, config, samzaContainerMetrics.registry)) + } catch { + case e: Exception => + error("Failed to create a producer for %s, so skipping." format(systemName), e) + (systemName, null) + } + } + .filter(_._2 != null) + .toMap + + info("Got system producers: %s" format producers.keys) + + val serdes = serdeNames.map(serdeName => { + val serdeClassName = config + .getSerdeClass(serdeName) + .getOrElse(Util.defaultSerdeFactoryFromSerdeName(serdeName)) + + val serde = Util.getObj[SerdeFactory[Object]](serdeClassName) + .getSerde(serdeName, config) + + (serdeName, serde) + }).toMap + + info("Got serdes: %s" format serdes.keys) + + /* + * A Helper function to build a Map[String, Serde] (systemName -> Serde) for systems defined in the config. This is useful to build both key and message serde maps. + */ + val buildSystemSerdeMap = (getSerdeName: (String) => Option[String]) => { + systemNames + .filter( sn => { + val serde = getSerdeName(sn) + serde.isDefined && !serde.get.equals("") + }).map(systemName => { + val serdeName = getSerdeName(systemName).get + val serde = serdes.getOrElse(serdeName, throw new SamzaException("No class defined for serde: %s." format serdeName)) + (systemName, serde) + }).toMap + } + + /* + * A Helper function to build a Map[SystemStream, Serde] for streams defined in the config. This is useful to build both key and message serde maps. + */ + val buildSystemStreamSerdeMap = (getSerdeName: (SystemStream) => Option[String]) => { + (serdeStreams ++ inputSystemStreamPartitions) + .filter(systemStream => getSerdeName(systemStream).isDefined) + .map(systemStream => { + val serdeName = getSerdeName(systemStream).get + val serde = serdes.getOrElse(serdeName, throw new SamzaException("No class defined for serde: %s." 
format serdeName)) + (systemStream, serde) + }).toMap + } + + val systemKeySerdes = buildSystemSerdeMap((systemName: String) => config.getSystemKeySerde(systemName)) + + debug("Got system key serdes: %s" format systemKeySerdes) + + val systemMessageSerdes = buildSystemSerdeMap((systemName: String) => config.getSystemMsgSerde(systemName)) + + debug("Got system message serdes: %s" format systemMessageSerdes) + + val systemStreamKeySerdes = buildSystemStreamSerdeMap((systemStream: SystemStream) => config.getStreamKeySerde(systemStream)) + + debug("Got system stream key serdes: %s" format systemStreamKeySerdes) + + val systemStreamMessageSerdes = buildSystemStreamSerdeMap((systemStream: SystemStream) => config.getStreamMsgSerde(systemStream)) + + debug("Got system stream message serdes: %s" format systemStreamMessageSerdes) + + val changeLogSystemStreams = config + .getStoreNames + .filter(config.getChangelogStream(_).isDefined) + .map(name => (name, config.getChangelogStream(name).get)).toMap + .mapValues(Util.getSystemStreamFromNames(_)) + + info("Got change log system streams: %s" format changeLogSystemStreams) + + val serdeManager = new SerdeManager( + serdes = serdes, + systemKeySerdes = systemKeySerdes, + systemMessageSerdes = systemMessageSerdes, + systemStreamKeySerdes = systemStreamKeySerdes, + systemStreamMessageSerdes = systemStreamMessageSerdes, + changeLogSystemStreams = changeLogSystemStreams.values.toSet) + + info("Setting up JVM metrics.") + + val jvm = new JvmMetrics(samzaContainerMetrics.registry) + + info("Setting up message chooser.") + + val chooserFactoryClassName = config.getMessageChooserClass.getOrElse(classOf[RoundRobinChooserFactory].getName) + + val chooserFactory = Util.getObj[MessageChooserFactory](chooserFactoryClassName) + + val chooser = DefaultChooser(inputStreamMetadata, chooserFactory, config, samzaContainerMetrics.registry) + + info("Setting up metrics reporters.") + + val reporters = config.getMetricReporterNames.map(reporterName => { + val metricsFactoryClassName = config + .getMetricsFactoryClass(reporterName) + .getOrElse(throw new SamzaException("Metrics reporter %s missing .class config" format reporterName)) + + val reporter = + Util + .getObj[MetricsReporterFactory](metricsFactoryClassName) + .getMetricsReporter(reporterName, containerName, config) + (reporterName, reporter) + }).toMap + + info("Got metrics reporters: %s" format reporters.keys) + + val securityManager = config.getSecurityManagerFactory match { + case Some(securityManagerFactoryClassName) => + Util + .getObj[SecurityManagerFactory](securityManagerFactoryClassName) + .getSecurityManager(config) + case _ => null + } + info("Got security manager: %s" format securityManager) + + val coordinatorSystemProducer = new CoordinatorStreamSystemFactory().getCoordinatorStreamSystemProducer(config, samzaContainerMetrics.registry) + val localityManager = new LocalityManager(coordinatorSystemProducer) + val checkpointManager = config.getCheckpointManagerFactory() match { + case Some(checkpointFactoryClassName) if (!checkpointFactoryClassName.isEmpty) => + Util + .getObj[CheckpointManagerFactory](checkpointFactoryClassName) + .getCheckpointManager(config, samzaContainerMetrics.registry) + case _ => null + } + info("Got checkpoint manager: %s" format checkpointManager) + + val offsetManager = OffsetManager(inputStreamMetadata, config, checkpointManager, systemAdmins, offsetManagerMetrics) + + info("Got offset manager: %s" format offsetManager) + + val dropDeserializationError = 
config.getDropDeserialization match { + case Some(dropError) => dropError.toBoolean + case _ => false + } + + val dropSerializationError = config.getDropSerialization match { + case Some(dropError) => dropError.toBoolean + case _ => false + } + + val pollIntervalMs = config + .getPollIntervalMs + .getOrElse(SystemConsumers.DEFAULT_POLL_INTERVAL_MS.toString) + .toInt + + val consumerMultiplexer = new SystemConsumers( + chooser = chooser, + consumers = consumers, + serdeManager = serdeManager, + metrics = systemConsumersMetrics, + dropDeserializationError = dropDeserializationError, + pollIntervalMs = pollIntervalMs) + + val producerMultiplexer = new SystemProducers( + producers = producers, + serdeManager = serdeManager, + metrics = systemProducersMetrics, + dropSerializationError = dropSerializationError) + + val storageEngineFactories = config + .getStoreNames + .map(storeName => { + val storageFactoryClassName = config + .getStorageFactoryClassName(storeName) + .getOrElse(throw new SamzaException("Missing storage factory for %s." format storeName)) + (storeName, Util.getObj[StorageEngineFactory[Object, Object]](storageFactoryClassName)) + }).toMap + + info("Got storage engines: %s" format storageEngineFactories.keys) + + val taskClassName = config + .getTaskClass + .getOrElse(throw new SamzaException("No task class defined in configuration.")) + + info("Got stream task class: %s" format taskClassName) + + val taskWindowMs = config.getWindowMs.getOrElse(-1L) + + info("Got window milliseconds: %s" format taskWindowMs) + + val taskCommitMs = config.getCommitMs.getOrElse(60000L) + + info("Got commit milliseconds: %s" format taskCommitMs) + + val taskShutdownMs = config.getShutdownMs.getOrElse(5000L) + + info("Got shutdown timeout milliseconds: %s" format taskShutdownMs) + + // Wire up all task-instance-level (unshared) objects. + + val taskNames = containerModel + .getTasks + .values + .map(_.getTaskName) + .toSet + val containerContext = new SamzaContainerContext(containerId, config, taskNames) + + // TODO not sure how we should make this config based, or not. Kind of + // strange, since it has some dynamic directories when used with YARN. + val defaultStoreBaseDir = new File(System.getProperty("user.dir"), "state") + info("Got default storage engine base directory: %s" format defaultStoreBaseDir) + + val storeWatchPaths = new util.HashSet[Path]() + storeWatchPaths.add(defaultStoreBaseDir.toPath) + + val taskInstances: Map[TaskName, TaskInstance] = containerModel.getTasks.values.map(taskModel => { + debug("Setting up task instance: %s" format taskModel) + + val taskName = taskModel.getTaskName + + val task = Util.getObj[StreamTask](taskClassName) + + val taskInstanceMetrics = new TaskInstanceMetrics("TaskName-%s" format taskName) + + val collector = new TaskInstanceCollector(producerMultiplexer, taskInstanceMetrics) + + val storeConsumers = changeLogSystemStreams + .map { + case (storeName, changeLogSystemStream) => + val systemConsumer = systemFactories + .getOrElse(changeLogSystemStream.getSystem, throw new SamzaException("Changelog system %s for store %s does not exist in the config." 
format (changeLogSystemStream, storeName))) + .getConsumer(changeLogSystemStream.getSystem, config, taskInstanceMetrics.registry) + samzaContainerMetrics.addStoreRestorationGauge(taskName, storeName) + (storeName, systemConsumer) + }.toMap + + info("Got store consumers: %s" format storeConsumers) + + var loggedStorageBaseDir: File = null + if(System.getenv(ShellCommandConfig.ENV_LOGGED_STORE_BASE_DIR) != null) { + val jobNameAndId = Util.getJobNameAndId(config) + loggedStorageBaseDir = new File(System.getenv(ShellCommandConfig.ENV_LOGGED_STORE_BASE_DIR) + File.separator + jobNameAndId._1 + "-" + jobNameAndId._2) + } else { + warn("No override was provided for logged store base directory. This disables local state re-use on " + + "application restart. If you want to enable this feature, set LOGGED_STORE_BASE_DIR as an environment " + + "variable in all machines running the Samza container") + loggedStorageBaseDir = defaultStoreBaseDir + } + + storeWatchPaths.add(loggedStorageBaseDir.toPath) + + info("Got base directory for logged data stores: %s" format loggedStorageBaseDir) + + val taskStores = storageEngineFactories + .map { + case (storeName, storageEngineFactory) => + val changeLogSystemStreamPartition = if (changeLogSystemStreams.contains(storeName)) { + new SystemStreamPartition(changeLogSystemStreams(storeName), taskModel.getChangelogPartition) + } else { + null + } + val keySerde = config.getStorageKeySerde(storeName) match { + case Some(keySerde) => serdes.getOrElse(keySerde, throw new SamzaException("No class defined for serde: %s." format keySerde)) + case _ => null + } + val msgSerde = config.getStorageMsgSerde(storeName) match { + case Some(msgSerde) => serdes.getOrElse(msgSerde, throw new SamzaException("No class defined for serde: %s." 
format msgSerde)) + case _ => null + } + val storeBaseDir = if(changeLogSystemStreamPartition != null) { + TaskStorageManager.getStorePartitionDir(loggedStorageBaseDir, storeName, taskName) + } + else { + TaskStorageManager.getStorePartitionDir(defaultStoreBaseDir, storeName, taskName) + } + val storageEngine = storageEngineFactory.getStorageEngine( + storeName, + storeBaseDir, + keySerde, + msgSerde, + collector, + taskInstanceMetrics.registry, + changeLogSystemStreamPartition, + containerContext) + (storeName, storageEngine) + } + + info("Got task stores: %s" format taskStores) + + val storageManager = new TaskStorageManager( + taskName = taskName, + taskStores = taskStores, + storeConsumers = storeConsumers, + changeLogSystemStreams = changeLogSystemStreams, + jobModel.maxChangeLogStreamPartitions, + streamMetadataCache = streamMetadataCache, + storeBaseDir = defaultStoreBaseDir, + loggedStoreBaseDir = loggedStorageBaseDir, + partition = taskModel.getChangelogPartition, + systemAdmins = systemAdmins) + + val systemStreamPartitions = taskModel + .getSystemStreamPartitions + .toSet + + info("Retrieved SystemStreamPartitions " + systemStreamPartitions + " for " + taskName) + + val taskInstance = new TaskInstance( + task = task, + taskName = taskName, + config = config, + metrics = taskInstanceMetrics, + systemAdmins = systemAdmins, + consumerMultiplexer = consumerMultiplexer, + collector = collector, + containerContext = containerContext, + offsetManager = offsetManager, + storageManager = storageManager, + reporters = reporters, + systemStreamPartitions = systemStreamPartitions, + exceptionHandler = TaskInstanceExceptionHandler(taskInstanceMetrics, config)) + + (taskName, taskInstance) + }).toMap + + val diskPollMillis = config.getInt("container.disk.poll.interval.ms", 0) + var diskSpaceMonitor: DiskSpaceMonitor = null + if (diskPollMillis != 0) { + val diskUsage = samzaContainerMetrics.createOrGetDiskUsageGauge() + + diskSpaceMonitor = new PollingScanDiskSpaceMonitor(storeWatchPaths, diskPollMillis) + diskSpaceMonitor.registerListener(new Listener { + override def onUpdate(diskUsageSample: Long): Unit = + diskUsage.set(diskUsageSample) + }) + + info("Initialized disk space monitor watch paths to: %s" format storeWatchPaths) + } + + val runLoop = new RunLoop( + taskInstances = taskInstances, + consumerMultiplexer = consumerMultiplexer, + metrics = samzaContainerMetrics, + windowMs = taskWindowMs, + commitMs = taskCommitMs, + shutdownMs = taskShutdownMs) + + info("Samza container setup complete.") + + new SamzaContainer( + containerContext = containerContext, + taskInstances = taskInstances, + runLoop = runLoop, + consumerMultiplexer = consumerMultiplexer, + producerMultiplexer = producerMultiplexer, + offsetManager = offsetManager, + localityManager = localityManager, + securityManager = securityManager, + metrics = samzaContainerMetrics, + reporters = reporters, + jvm = jvm, + jmxServer = jmxServer, + diskSpaceMonitor = diskSpaceMonitor) + } +} + +class SamzaContainer( + containerContext: SamzaContainerContext, + taskInstances: Map[TaskName, TaskInstance], + runLoop: RunLoop, + consumerMultiplexer: SystemConsumers, + producerMultiplexer: SystemProducers, + metrics: SamzaContainerMetrics, + jmxServer: JmxServer, + diskSpaceMonitor: DiskSpaceMonitor = null, + offsetManager: OffsetManager = new OffsetManager, + localityManager: LocalityManager = null, + securityManager: SecurityManager = null, + reporters: Map[String, MetricsReporter] = Map(), + jvm: JvmMetrics = null) extends Runnable with 
Logging { + + def run { + try { + info("Starting container.") + + startMetrics + startOffsetManager + startLocalityManager + startStores + startDiskSpaceMonitor + startProducers + startTask + startConsumers + startSecurityManger + + info("Entering run loop.") + runLoop.run + } catch { + case e: Exception => + error("Caught exception in process loop.", e) + throw e + } finally { + info("Shutting down.") + + shutdownConsumers + shutdownTask + shutdownStores + shutdownDiskSpaceMonitor + shutdownProducers + shutdownLocalityManager + shutdownOffsetManager + shutdownMetrics + shutdownSecurityManger + + info("Shutdown complete.") + } + } + + def startDiskSpaceMonitor: Unit = { + if (diskSpaceMonitor != null) { + info("Starting disk space monitor") + diskSpaceMonitor.start() + } + } + + def startMetrics { + info("Registering task instances with metrics.") + + taskInstances.values.foreach(_.registerMetrics) + + info("Starting JVM metrics.") + + if (jvm != null) { + jvm.start + } + + info("Starting metrics reporters.") + + reporters.values.foreach(reporter => { + reporter.register(metrics.source, metrics.registry) + reporter.start + }) + } + + def startOffsetManager { + info("Registering task instances with offsets.") + + taskInstances.values.foreach(_.registerOffsets) + + info("Starting offset manager.") + + offsetManager.start + } + + def startLocalityManager { + if(localityManager != null) { + info("Registering localityManager for the container") + localityManager.start + localityManager.register(String.valueOf(containerContext.id)) + + info("Writing container locality and JMX address to Coordinator Stream") + try { + val hostInet = Util.getLocalHost + val jmxUrl = if (jmxServer != null) jmxServer.getJmxUrl else "" + val jmxTunnelingUrl = if (jmxServer != null) jmxServer.getTunnelingJmxUrl else "" + localityManager.writeContainerToHostMapping(containerContext.id, hostInet.getHostName, jmxUrl, jmxTunnelingUrl) + } catch { + case uhe: UnknownHostException => + warn("Received UnknownHostException when persisting locality info for container %d: %s" format (containerContext.id, uhe.getMessage)) //No-op + case unknownException: Throwable => + warn("Received an exception when persisting locality info for container %d: %s" format (containerContext.id, unknownException.getMessage)) + } + } + } + + def startStores { + info("Starting task instance stores.") + taskInstances.values.foreach(taskInstance => { + val startTime = System.currentTimeMillis() + taskInstance.startStores + // Measuring the time to restore the stores + val timeToRestore = System.currentTimeMillis() - startTime + val taskGauge = metrics.taskStoreRestorationMetrics.getOrElse(taskInstance.taskName, null) + if (taskGauge != null) { + taskGauge.set(timeToRestore) + } + }) + } + + def startTask { + info("Initializing stream tasks.") + + taskInstances.values.foreach(_.initTask) + } + + def startProducers { + info("Registering task instances with producers.") + + taskInstances.values.foreach(_.registerProducers) + + info("Starting producer multiplexer.") + + producerMultiplexer.start + } + + def startConsumers { + info("Registering task instances with consumers.") + + taskInstances.values.foreach(_.registerConsumers) + + info("Starting consumer multiplexer.") + + consumerMultiplexer.start + } + + def startSecurityManger: Unit = { + if (securityManager != null) { + info("Starting security manager.") + + securityManager.start + } + } + + def shutdownConsumers { + info("Shutting down consumer multiplexer.") + + consumerMultiplexer.stop + } + + def 
shutdownProducers { + info("Shutting down producer multiplexer.") + + producerMultiplexer.stop + } + + def shutdownTask { + info("Shutting down task instance stream tasks.") + + taskInstances.values.foreach(_.shutdownTask) + } + + def shutdownStores { + info("Shutting down task instance stores.") + + taskInstances.values.foreach(_.shutdownStores) + } + + def shutdownLocalityManager { + if(localityManager != null) { + info("Shutting down locality manager.") + localityManager.stop + } + } + + def shutdownOffsetManager { + info("Shutting down offset manager.") + + offsetManager.stop + } + + + def shutdownMetrics { + info("Shutting down metrics reporters.") + + reporters.values.foreach(_.stop) + + if (jvm != null) { + info("Shutting down JVM metrics.") + + jvm.stop + } + } + + def shutdownSecurityManger: Unit = { + if (securityManager != null) { + info("Shutting down security manager.") + + securityManager.stop + } + } + + def shutdownDiskSpaceMonitor: Unit = { + if (diskSpaceMonitor != null) { + info("Shutting down disk space monitor.") + diskSpaceMonitor.stop() + } + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/27c9e4c2/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala index 9e6641c..2044ce0 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala @@ -38,21 +38,13 @@ class SamzaContainerMetrics( val processNs = newTimer("process-ns") val commitNs = newTimer("commit-ns") val utilization = newGauge("event-loop-utilization", 0.0F) + val diskUsageBytes = newGauge("disk-usage-bytes", 0L) + val diskQuotaBytes = newGauge("disk-quota-bytes", Long.MaxValue) + val executorWorkFactor = newGauge("executor-work-factor", 1.0) val taskStoreRestorationMetrics: util.Map[TaskName, Gauge[Long]] = new util.HashMap[TaskName, Gauge[Long]]() def addStoreRestorationGauge(taskName: TaskName, storeName: String) { taskStoreRestorationMetrics.put(taskName, newGauge("%s-%s-restore-time" format(taskName.toString, storeName), -1L)) } - - /** - * Creates or gets the disk usage gauge for the container and returns it. - */ - def createOrGetDiskUsageGauge(): Gauge[Long] = { - // Despite the name, this function appears to be idempotent. A more defensive approach would be - // to ensure idempotency at this level, e.g. via a CAS operation. Unfortunately, it appears that - // the mechanism to register a Gauge is hidden. An alternative would be to use a mutex to - // set ensure the gauge is created once. 
- newGauge("disk-usage", 0L) - } } http://git-wip-us.apache.org/repos/asf/samza/blob/27c9e4c2/samza-core/src/main/scala/org/apache/samza/util/Util.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/util/Util.scala b/samza-core/src/main/scala/org/apache/samza/util/Util.scala index c77d929..84166b4 100644 --- a/samza-core/src/main/scala/org/apache/samza/util/Util.scala +++ b/samza-core/src/main/scala/org/apache/samza/util/Util.scala @@ -364,4 +364,27 @@ object Util extends Logging { info("use default serde %s for %s" format (serde, serdeName)) serde } + + /** + * Add the supplied arguments and handle overflow by clamping the resulting sum to + * {@code Long.MinValue} if the sum would have been less than {@code Long.MinValue} or + * {@code Long.MaxValue} if the sum would have been greater than {@code Long.MaxValue}. + * + * @param lhs left hand side of sum + * @param rhs right hand side of sum + * @return the sum if no overflow occurs, or the clamped extreme if it does. + */ + def clampAdd(lhs: Long, rhs: Long): Long = { + val sum = lhs + rhs + + // From "Hacker's Delight", overflow occurs IFF both operands have the same sign and the + // sign of the sum differs from the operands. Here we're doing a basic bitwise check that + // collapses 6 branches down to 2. The expression {@code lhs ^ rhs} will have the high-order + // bit set to true IFF the signs are different. + if ((~(lhs ^ rhs) & (lhs ^ sum)) < 0) { + return if (lhs >= 0) Long.MaxValue else Long.MinValue + } + + sum + } } http://git-wip-us.apache.org/repos/asf/samza/blob/27c9e4c2/samza-core/src/test/java/org/apache/samza/container/disk/TestDiskQuotaPolicyEntry.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/container/disk/TestDiskQuotaPolicyEntry.java b/samza-core/src/test/java/org/apache/samza/container/disk/TestDiskQuotaPolicyEntry.java new file mode 100644 index 0000000..96c462b --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/container/disk/TestDiskQuotaPolicyEntry.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.container.disk; + +import org.junit.Test; + +import static junit.framework.Assert.assertEquals; + +public class TestDiskQuotaPolicyEntry { + private static final double ARBITRARY_HIGH_WATER_MARK = 0.75; + private static final double ARBITRARY_LOW_WATER_MARK = 0.25; + private static final double ARBITRARY_WORK_FACTOR = 1.0; + + @Test + public void testConstruction() { + final WatermarkDiskQuotaPolicy.Entry policy = new WatermarkDiskQuotaPolicy.Entry( + ARBITRARY_LOW_WATER_MARK, + ARBITRARY_HIGH_WATER_MARK, + ARBITRARY_WORK_FACTOR); + + assertEquals(ARBITRARY_LOW_WATER_MARK, policy.getLowWaterMarkPercent()); + assertEquals(ARBITRARY_HIGH_WATER_MARK, policy.getHighWaterMarkPercent()); + assertEquals(ARBITRARY_WORK_FACTOR, policy.getWorkFactor()); + } + + @Test(expected = IllegalArgumentException.class) + public void testLowWaterMarkGreaterThanHighWaterMark() { + new WatermarkDiskQuotaPolicy.Entry( + 1.0, + ARBITRARY_HIGH_WATER_MARK, + ARBITRARY_WORK_FACTOR); + } + + @Test(expected = IllegalArgumentException.class) + public void testLowWaterMarkBelowZero() { + new WatermarkDiskQuotaPolicy.Entry( + -1.0, + ARBITRARY_HIGH_WATER_MARK, + ARBITRARY_WORK_FACTOR); + } + + @Test(expected = IllegalArgumentException.class) + public void testHighWaterMarkAboveOne() { + new WatermarkDiskQuotaPolicy.Entry( + ARBITRARY_LOW_WATER_MARK, + 2.0, + ARBITRARY_WORK_FACTOR); + } + + @Test(expected = IllegalArgumentException.class) + public void testWorkFactorOfZero() { + new WatermarkDiskQuotaPolicy.Entry( + ARBITRARY_LOW_WATER_MARK, + ARBITRARY_HIGH_WATER_MARK, + 0); + } + + @Test(expected = IllegalArgumentException.class) + public void testWorkFactorGreaterThanOne() { + new WatermarkDiskQuotaPolicy.Entry( + ARBITRARY_LOW_WATER_MARK, + ARBITRARY_HIGH_WATER_MARK, + 2.0); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/27c9e4c2/samza-core/src/test/java/org/apache/samza/container/disk/TestWatermarkDiskQuotaPolicy.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/container/disk/TestWatermarkDiskQuotaPolicy.java b/samza-core/src/test/java/org/apache/samza/container/disk/TestWatermarkDiskQuotaPolicy.java new file mode 100644 index 0000000..8367d41 --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/container/disk/TestWatermarkDiskQuotaPolicy.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.container.disk; + +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static junit.framework.Assert.assertEquals; + +public class TestWatermarkDiskQuotaPolicy { + @Test + public void testNoEntries() { + WatermarkDiskQuotaPolicy policy = new WatermarkDiskQuotaPolicy(Collections.<WatermarkDiskQuotaPolicy.Entry>emptyList()); + + assertEquals(1.0, policy.apply(1.0)); + assertEquals(1.0, policy.apply(0.5)); + assertEquals(1.0, policy.apply(0.0)); + } + + @Test + public void testOneEntryUntriggered() { + double workFactor = 0.5f; + List<WatermarkDiskQuotaPolicy.Entry> entries = Collections.singletonList(new WatermarkDiskQuotaPolicy.Entry(0.5, 1.0, workFactor)); + WatermarkDiskQuotaPolicy policy = new WatermarkDiskQuotaPolicy(entries); + + assertEquals(1.0, policy.apply(1.0)); + assertEquals(1.0, policy.apply(0.75)); + assertEquals(1.0, policy.apply(0.5)); + } + + @Test + public void testOneEntryTriggered() { + double workFactor = 0.5f; + List<WatermarkDiskQuotaPolicy.Entry> entries = Collections.singletonList(new WatermarkDiskQuotaPolicy.Entry(0.5, 1.0, workFactor)); + WatermarkDiskQuotaPolicy policy = new WatermarkDiskQuotaPolicy(entries); + + assertEquals(workFactor, policy.apply(0.25)); + } + + @Test + public void testTwoEntriesTriggered() { + double workFactor1 = 0.5f; + double workFactor2 = 0.25f; + List<WatermarkDiskQuotaPolicy.Entry> entries = Arrays.asList( + new WatermarkDiskQuotaPolicy.Entry(0.5, 1.0, workFactor1), + new WatermarkDiskQuotaPolicy.Entry(0.2, 0.4, workFactor2)); + WatermarkDiskQuotaPolicy policy = new WatermarkDiskQuotaPolicy(entries); + + assertEquals(1.0, policy.apply(0.5)); + assertEquals(workFactor1, policy.apply(0.4)); + assertEquals(workFactor2, policy.apply(0.1)); + } + + @Test + public void testTwoEntriesTriggeredSkipFirst() { + double workFactor1 = 0.5f; + double workFactor2 = 0.25f; + List<WatermarkDiskQuotaPolicy.Entry> entries = Arrays.asList( + new WatermarkDiskQuotaPolicy.Entry(0.5, 1.0, workFactor1), + new WatermarkDiskQuotaPolicy.Entry(0.2, 0.4, workFactor2)); + WatermarkDiskQuotaPolicy policy = new WatermarkDiskQuotaPolicy(entries); + + assertEquals(workFactor2, policy.apply(0.1)); + } + + @Test + public void testTwoEntriesReversedOrder() { + // Results should be the same regardless of order as we sort policies at construction time. 
+ double workFactor1 = 0.5f; + double workFactor2 = 0.25f; + List<WatermarkDiskQuotaPolicy.Entry> entries = Arrays.asList( + new WatermarkDiskQuotaPolicy.Entry(0.2, 0.4, workFactor2), + new WatermarkDiskQuotaPolicy.Entry(0.5, 1.0, workFactor1)); + WatermarkDiskQuotaPolicy policy = new WatermarkDiskQuotaPolicy(entries); + + assertEquals(workFactor2, policy.apply(0.1)); + } + + @Test + public void testTriggerEntriesAndRecover() { + double workFactor1 = 0.5f; + double workFactor2 = 0.25f; + List<WatermarkDiskQuotaPolicy.Entry> entries = Arrays.asList( + new WatermarkDiskQuotaPolicy.Entry(0.5, 1.0, workFactor1), + new WatermarkDiskQuotaPolicy.Entry(0.2, 0.4, workFactor2)); + WatermarkDiskQuotaPolicy policy = new WatermarkDiskQuotaPolicy(entries); + + assertEquals(workFactor2, policy.apply(0.1)); + assertEquals(workFactor1, policy.apply(0.4)); + assertEquals(1.0, policy.apply(1.0)); + } + + @Test(expected = IllegalArgumentException.class) + public void testWorkFactorTooHigh() { + new WatermarkDiskQuotaPolicy( + Collections.singletonList(new WatermarkDiskQuotaPolicy.Entry(0.5, 1.0, 1.5))); + } + + @Test(expected = IllegalArgumentException.class) + public void testWorkFactorTooLow() { + new WatermarkDiskQuotaPolicy( + Collections.singletonList(new WatermarkDiskQuotaPolicy.Entry(0.5, 1.0, -1.0))); + } + + @Test(expected = IllegalArgumentException.class) + public void testRaiseWorkFactorWithLowerThreshold() { + List<WatermarkDiskQuotaPolicy.Entry> entries = Arrays.asList( + new WatermarkDiskQuotaPolicy.Entry(0.5, 1.0, 0.5), + new WatermarkDiskQuotaPolicy.Entry(0.2, 0.4, 0.75)); + new WatermarkDiskQuotaPolicy(entries); + } + + @Test(expected = IllegalArgumentException.class) + public void testDuplicatedRange() { + List<WatermarkDiskQuotaPolicy.Entry> entries = Arrays.asList( + new WatermarkDiskQuotaPolicy.Entry(0.5, 1.0, 0.5), + new WatermarkDiskQuotaPolicy.Entry(0.5, 1.0, 0.75)); + new WatermarkDiskQuotaPolicy(entries); + } + + @Test(expected = IllegalArgumentException.class) + public void testFullyOverlappedRange1() { + List<WatermarkDiskQuotaPolicy.Entry> entries = Arrays.asList( + new WatermarkDiskQuotaPolicy.Entry(0.5, 1.0, 0.5), + new WatermarkDiskQuotaPolicy.Entry(0.25, 1.0, 0.75)); + new WatermarkDiskQuotaPolicy(entries); + } + + @Test(expected = IllegalArgumentException.class) + public void testFullyOverlappedRange2() { + List<WatermarkDiskQuotaPolicy.Entry> entries = Arrays.asList( + new WatermarkDiskQuotaPolicy.Entry(0.5, 0.75, 0.75), + new WatermarkDiskQuotaPolicy.Entry(0.25, 1.0, 0.5)); + new WatermarkDiskQuotaPolicy(entries); + } +}
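A worked example of the overflow guard used by Util.clampAdd above: the expression (~(lhs ^ rhs) & (lhs ^ sum)) has its sign bit set exactly when lhs and rhs share a sign and the sum's sign differs, i.e. exactly when the addition overflowed. The standalone Java sketch below (illustrative only; the class and method names are not part of the patch) reproduces that clamping behavior:

    // Illustrative only; this class is not part of the patch.
    public final class ClampAddExample {
      // Same saturating behavior as Util.clampAdd: on overflow, return the nearest Long extreme.
      static long clampAdd(long lhs, long rhs) {
        long sum = lhs + rhs;
        // Sign bit is set iff lhs and rhs share a sign and sum's sign differs, i.e. iff we overflowed.
        if ((~(lhs ^ rhs) & (lhs ^ sum)) < 0) {
          return lhs >= 0 ? Long.MAX_VALUE : Long.MIN_VALUE;
        }
        return sum;
      }

      public static void main(String[] args) {
        System.out.println(clampAdd(Long.MAX_VALUE, 1L));   // clamps to Long.MAX_VALUE
        System.out.println(clampAdd(Long.MIN_VALUE, -1L));  // clamps to Long.MIN_VALUE
        System.out.println(clampAdd(40L, 2L));              // 42, no overflow
      }
    }

Math.addExact(lhs, rhs) would flag the same condition by throwing ArithmeticException; clamping instead keeps disk-usage accounting well defined without failing the caller.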
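The tests above also pin down how WatermarkDiskQuotaPolicy is meant to be driven: apply() appears to take the fraction of disk quota still available and returns the work factor the container should run at; dropping below an entry's low water mark lowers the work factor, and it is only restored once availability climbs back to that entry's high water mark. A small usage sketch along the lines of testTriggerEntriesAndRecover (placed in the same package as the tests; the printed values follow the assertions above):

    package org.apache.samza.container.disk;

    import java.util.Arrays;
    import java.util.List;

    // Usage sketch only; the expected outputs mirror the assertions in the tests above.
    public final class WatermarkPolicyUsageExample {
      public static void main(String[] args) {
        // Below 50% available: run at half speed, recover at 100% available.
        // Below 20% available: run at quarter speed, recover at 40% available.
        List<WatermarkDiskQuotaPolicy.Entry> entries = Arrays.asList(
            new WatermarkDiskQuotaPolicy.Entry(0.5, 1.0, 0.5),
            new WatermarkDiskQuotaPolicy.Entry(0.2, 0.4, 0.25));
        WatermarkDiskQuotaPolicy policy = new WatermarkDiskQuotaPolicy(entries);

        System.out.println(policy.apply(1.0));   // 1.0  (no throttling)
        System.out.println(policy.apply(0.25));  // 0.5  (crossed the 0.5 low water mark)
        System.out.println(policy.apply(0.1));   // 0.25 (crossed the 0.2 low water mark)
        System.out.println(policy.apply(0.4));   // 0.5  (recovered past the 0.4 high water mark)
        System.out.println(policy.apply(1.0));   // 1.0  (fully recovered)
      }
    }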
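For context on what a work factor means operationally (the SamzaContainerMetrics change above exports it as the executor-work-factor gauge): a factor of 1.0 is unthrottled, and smaller values insert idle time between units of work. The sketch below is only a hypothetical illustration of that idea, not the ThrottlingExecutor added by this patch; it pads each task with roughly workTime * (1 - f) / f of sleep so that useful work approaches the requested fraction f of wall-clock time:

    import java.util.concurrent.TimeUnit;

    // Hypothetical illustration of work-factor throttling; not the ThrottlingExecutor from this patch.
    public final class WorkFactorSketch {
      private volatile double workFactor = 1.0;

      public void setWorkFactor(double f) {
        if (f <= 0.0 || f > 1.0) {
          throw new IllegalArgumentException("work factor must be in (0, 1]");
        }
        this.workFactor = f;
      }

      public void execute(Runnable work) throws InterruptedException {
        long start = System.nanoTime();
        work.run();
        long workNanos = System.nanoTime() - start;
        double f = workFactor;
        // Idle long enough that work time is roughly fraction f of total elapsed time.
        long delayNanos = (long) (workNanos * (1.0 - f) / f);
        if (delayNanos > 0) {
          TimeUnit.NANOSECONDS.sleep(delayNanos);
        }
      }
    }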