Repository: kafka Updated Branches: refs/heads/trunk 7d6515fb8 -> b30d68a4e
KAFKA-2743: Make forwarded task reconfiguration requests asynchronous and back off before retrying. Author: Ewen Cheslack-Postava <[email protected]> Reviewers: Guozhang Wang Closes #422 from ewencp/task-reconfiguration-async-with-backoff Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b30d68a4 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b30d68a4 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b30d68a4 Branch: refs/heads/trunk Commit: b30d68a4e31cb49c2e664bb2e3263e056a27db40 Parents: 7d6515f Author: Ewen Cheslack-Postava <[email protected]> Authored: Thu Nov 5 08:49:58 2015 -0800 Committer: Guozhang Wang <[email protected]> Committed: Thu Nov 5 08:49:58 2015 -0800 ---------------------------------------------------------------------- .../runtime/distributed/DistributedHerder.java | 264 +++++++++++++------ .../distributed/DistributedHerderTest.java | 5 +- 2 files changed, 187 insertions(+), 82 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/b30d68a4/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerder.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerder.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerder.java index 4c88737..96de1ca 100644 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerder.java +++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerder.java @@ -19,6 +19,8 @@ package org.apache.kafka.copycat.runtime.distributed; import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.utils.SystemTime; +import 
org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.copycat.connector.ConnectorContext; import org.apache.kafka.copycat.errors.AlreadyExistsException; @@ -46,12 +48,14 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.PriorityQueue; import java.util.Queue; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; -import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; @@ -79,9 +83,12 @@ import java.util.concurrent.atomic.AtomicBoolean; public class DistributedHerder implements Herder, Runnable { private static final Logger log = LoggerFactory.getLogger(DistributedHerder.class); + private static final long RECONFIGURE_CONNECTOR_TASKS_BACKOFF_MS = 250; + private final Worker worker; private final KafkaConfigStorage configStorage; private ClusterConfigState configState; + private final Time time; private final int workerSyncTimeoutMs; private final int workerUnsyncBackoffMs; @@ -97,18 +104,20 @@ public class DistributedHerder implements Herder, Runnable { // To handle most external requests, like creating or destroying a connector, we can use a generic request where // the caller specifies all the code that should be executed. - private final Queue<HerderRequest> requests = new LinkedBlockingDeque<>(); + private final Queue<HerderRequest> requests = new PriorityQueue<>(); // Config updates can be collected and applied together when possible. Also, we need to take care to rebalance when // needed (e.g. task reconfiguration, which requires everyone to coordinate offset commits). 
private Set<String> connectorConfigUpdates = new HashSet<>(); private boolean needsReconfigRebalance; + private final ExecutorService forwardRequestExecutor; + public DistributedHerder(DistributedConfig config, Worker worker, String restUrl) { - this(config, worker, null, null, restUrl); + this(config, worker, null, null, restUrl, new SystemTime()); } // public for testing - public DistributedHerder(DistributedConfig config, Worker worker, KafkaConfigStorage configStorage, WorkerGroupMember member, String restUrl) { + public DistributedHerder(DistributedConfig config, Worker worker, KafkaConfigStorage configStorage, WorkerGroupMember member, String restUrl, Time time) { this.worker = worker; if (configStorage != null) { // For testing. Assume configuration has already been performed @@ -118,6 +127,7 @@ public class DistributedHerder implements Herder, Runnable { this.configStorage.configure(config.originals()); } configState = ClusterConfigState.EMPTY; + this.time = time; this.workerSyncTimeoutMs = config.getInt(DistributedConfig.WORKER_SYNC_TIMEOUT_MS_CONFIG); this.workerUnsyncBackoffMs = config.getInt(DistributedConfig.WORKER_UNSYNC_BACKOFF_MS_CONFIG); @@ -127,11 +137,13 @@ public class DistributedHerder implements Herder, Runnable { rebalanceResolved = true; // If we still need to follow up after a rebalance occurred, starting up tasks needsReconfigRebalance = false; + + forwardRequestExecutor = Executors.newSingleThreadExecutor(); } @Override public void start() { - Thread thread = new Thread(this); + Thread thread = new Thread(this, "DistributedHerder"); thread.start(); } @@ -152,6 +164,7 @@ public class DistributedHerder implements Herder, Runnable { log.info("Herder stopped"); } catch (Throwable t) { log.error("Uncaught exception in herder work thread, exiting: ", t); + stopLatch.countDown(); System.exit(1); } finally { stopLatch.countDown(); @@ -177,14 +190,27 @@ public class DistributedHerder implements Herder, Runnable { } // Process any external requests 
- while (!requests.isEmpty()) { - HerderRequest request = requests.poll(); - Callback<Void> cb = request.callback(); + final long now = time.milliseconds(); + long nextRequestTimeoutMs = Long.MAX_VALUE; + while (true) { + final HerderRequest next; + synchronized (this) { + next = requests.peek(); + if (next == null) { + break; + } else if (now >= next.at) { + requests.poll(); + } else { + nextRequestTimeoutMs = next.at - now; + break; + } + } + try { - request.action().call(); - cb.onCompletion(null, null); + next.action().call(); + next.callback().onCompletion(null, null); } catch (Throwable t) { - cb.onCompletion(t, null); + next.callback().onCompletion(t, null); } } @@ -237,7 +263,7 @@ public class DistributedHerder implements Herder, Runnable { // Let the group take any actions it needs to try { - member.poll(Long.MAX_VALUE); + member.poll(nextRequestTimeoutMs); // Ensure we're in a good state in our group. If not restart and everything should be setup to rejoin if (!handleRebalanceCompleted()) return; } catch (WakeupException e) { // FIXME should not be WakeupException @@ -292,13 +318,24 @@ public class DistributedHerder implements Herder, Runnable { // ignore, should not happen } } + + + forwardRequestExecutor.shutdown(); + try { + if (!forwardRequestExecutor.awaitTermination(10000, TimeUnit.MILLISECONDS)) + forwardRequestExecutor.shutdownNow(); + } catch (InterruptedException e) { + // ignore + } + + log.info("Herder stopped"); } @Override public synchronized void connectors(final Callback<Collection<String>> callback) { log.trace("Submitting connector listing request"); - requests.add(new HerderRequest( + addRequest( new Callable<Void>() { @Override public Void call() throws Exception { @@ -308,16 +345,16 @@ public class DistributedHerder implements Herder, Runnable { callback.onCompletion(null, configState.connectors()); return null; } - } - )); - member.wakeup(); + }, + forwardErrorCallback(callback) + ); } @Override public synchronized void 
connectorInfo(final String connName, final Callback<ConnectorInfo> callback) { log.trace("Submitting connector info request {}", connName); - requests.add(new HerderRequest( + addRequest( new Callable<Void>() { @Override public Void call() throws Exception { @@ -331,9 +368,9 @@ public class DistributedHerder implements Herder, Runnable { } return null; } - } - )); - member.wakeup(); + }, + forwardErrorCallback(callback) + ); } @Override @@ -366,7 +403,7 @@ public class DistributedHerder implements Herder, Runnable { log.trace("Submitting connector config write request {}", connName); - requests.add(new HerderRequest( + addRequest( new Callable<Void>() { @Override public Void call() throws Exception { @@ -399,31 +436,38 @@ public class DistributedHerder implements Herder, Runnable { return null; } - })); - member.wakeup(); + }, + forwardErrorCallback(callback) + ); } @Override public synchronized void requestTaskReconfiguration(final String connName) { log.trace("Submitting connector task reconfiguration request {}", connName); - requests.add(new HerderRequest( + addRequest( new Callable<Void>() { @Override public Void call() throws Exception { - reconfigureConnector(connName); + reconfigureConnectorTasksWithRetry(connName); return null; } + }, + new Callback<Void>() { + @Override + public void onCompletion(Throwable error, Void result) { + log.error("Unexpected error during task reconfiguration: ", error); + log.error("Task reconfiguration for {} failed unexpectedly, this connector will not be properly reconfigured unless manually triggered.", connName); + } } - )); - member.wakeup(); + ); } @Override public synchronized void taskConfigs(final String connName, final Callback<List<TaskInfo>> callback) { log.trace("Submitting get task configuration request {}", connName); - requests.add(new HerderRequest( + addRequest( new Callable<Void>() { @Override public Void call() throws Exception { @@ -442,16 +486,16 @@ public class DistributedHerder implements Herder, 
Runnable { } return null; } - } - )); - member.wakeup(); + }, + forwardErrorCallback(callback) + ); } @Override public synchronized void putTaskConfigs(final String connName, final List<Map<String, String>> configs, final Callback<Void> callback) { log.trace("Submitting put task configuration request {}", connName); - requests.add(new HerderRequest( + addRequest( new Callable<Void>() { @Override public Void call() throws Exception { @@ -465,9 +509,9 @@ public class DistributedHerder implements Herder, Runnable { } return null; } - } - )); - member.wakeup(); + }, + forwardErrorCallback(callback) + ); } @@ -626,48 +670,92 @@ public class DistributedHerder implements Herder, Runnable { // Immediately request configuration since this could be a brand new connector. However, also only update those // task configs if they are actually different from the existing ones to avoid unnecessary updates when this is // just restoring an existing connector. - reconfigureConnector(connName); + reconfigureConnectorTasksWithRetry(connName); + } + + private void reconfigureConnectorTasksWithRetry(final String connName) { + reconfigureConnector(connName, new Callback<Void>() { + @Override + public void onCompletion(Throwable error, Void result) { + // If we encountered an error, we don't have much choice but to just retry. If we don't, we could get + // stuck with a connector that thinks it has generated tasks, but wasn't actually successful and therefore + // never makes progress. The retry has to run through a HerderRequest since this callback could be happening + // from the HTTP request forwarding thread. 
+ if (error != null) { + log.error("Failed to reconfigure connector's tasks, retrying after backoff:", error); + addRequest(RECONFIGURE_CONNECTOR_TASKS_BACKOFF_MS, + new Callable<Void>() { + @Override + public Void call() throws Exception { + reconfigureConnectorTasksWithRetry(connName); + return null; + } + }, new Callback<Void>() { + @Override + public void onCompletion(Throwable error, Void result) { + log.error("Unexpected error during connector task reconfiguration: ", error); + log.error("Task reconfiguration for {} failed unexpectedly, this connector will not be properly reconfigured unless manually triggered.", connName); + } + } + ); + } + } + }); } // Updates configurations for a connector by requesting them from the connector, filling in parameters provided // by the system, then checks whether any configs have actually changed before submitting the new configs to storage - private void reconfigureConnector(String connName) { - Map<String, String> configs = configState.connectorConfig(connName); - ConnectorConfig connConfig = new ConnectorConfig(configs); - - List<String> sinkTopics = null; - if (SinkConnector.class.isAssignableFrom(connConfig.getClass(ConnectorConfig.CONNECTOR_CLASS_CONFIG))) - sinkTopics = connConfig.getList(ConnectorConfig.TOPICS_CONFIG); - - List<Map<String, String>> taskProps - = worker.connectorTaskConfigs(connName, connConfig.getInt(ConnectorConfig.TASKS_MAX_CONFIG), sinkTopics); - boolean changed = false; - int currentNumTasks = configState.taskCount(connName); - if (taskProps.size() != currentNumTasks) { - log.debug("Change in connector task count from {} to {}, writing updated task configurations", currentNumTasks, taskProps.size()); - changed = true; - } else { - int index = 0; - for (Map<String, String> taskConfig : taskProps) { - if (!taskConfig.equals(configState.taskConfig(new ConnectorTaskId(connName, index)))) { - log.debug("Change in task configurations, writing updated task configurations"); - changed = true; - break; 
+ private void reconfigureConnector(final String connName, final Callback<Void> cb) { + try { + Map<String, String> configs = configState.connectorConfig(connName); + ConnectorConfig connConfig = new ConnectorConfig(configs); + + List<String> sinkTopics = null; + if (SinkConnector.class.isAssignableFrom(connConfig.getClass(ConnectorConfig.CONNECTOR_CLASS_CONFIG))) + sinkTopics = connConfig.getList(ConnectorConfig.TOPICS_CONFIG); + + final List<Map<String, String>> taskProps + = worker.connectorTaskConfigs(connName, connConfig.getInt(ConnectorConfig.TASKS_MAX_CONFIG), sinkTopics); + boolean changed = false; + int currentNumTasks = configState.taskCount(connName); + if (taskProps.size() != currentNumTasks) { + log.debug("Change in connector task count from {} to {}, writing updated task configurations", currentNumTasks, taskProps.size()); + changed = true; + } else { + int index = 0; + for (Map<String, String> taskConfig : taskProps) { + if (!taskConfig.equals(configState.taskConfig(new ConnectorTaskId(connName, index)))) { + log.debug("Change in task configurations, writing updated task configurations"); + changed = true; + break; + } + index++; } - index++; } - } - if (changed) { - if (isLeader()) { - configStorage.putTaskConfigs(taskConfigListAsMap(connName, taskProps)); - } else { - try { - String reconfigUrl = RestServer.urlJoin(leaderUrl(), "/connectors/" + connName + "/tasks"); - RestServer.httpRequest(reconfigUrl, "POST", taskProps, null); - } catch (CopycatException e) { - log.error("Request to leader to reconfigure connector tasks failed", e); + if (changed) { + if (isLeader()) { + configStorage.putTaskConfigs(taskConfigListAsMap(connName, taskProps)); + cb.onCompletion(null, null); + } else { + // We cannot forward the request on the same thread because this reconfiguration can happen as a + // result of connector addition or removal. 
If we blocked waiting for the response from the leader, we may be kicked out of the worker group. + forwardRequestExecutor.submit(new Runnable() { + @Override + public void run() { + try { + String reconfigUrl = RestServer.urlJoin(leaderUrl(), "/connectors/" + connName + "/tasks"); + RestServer.httpRequest(reconfigUrl, "POST", taskProps, null); + cb.onCompletion(null, null); + } catch (CopycatException e) { + log.error("Request to leader to reconfigure connector tasks failed", e); + cb.onCompletion(e, null); + } + } + }); } } + } catch (Throwable t) { + cb.onCompletion(t, null); } } @@ -684,21 +772,28 @@ public class DistributedHerder implements Herder, Runnable { return true; } + private void addRequest(Callable<Void> action, Callback<Void> callback) { + addRequest(0, action, callback); + } + + private void addRequest(long delayMs, Callable<Void> action, Callback<Void> callback) { + HerderRequest req = new HerderRequest(time.milliseconds() + delayMs, action, callback); + requests.add(req); + if (requests.peek() == req) + member.wakeup(); + } - private class HerderRequest { + private class HerderRequest implements Comparable<HerderRequest> { + private final long at; private final Callable<Void> action; private final Callback<Void> callback; - public HerderRequest(Callable<Void> action, Callback<Void> callback) { + public HerderRequest(long at, Callable<Void> action, Callback<Void> callback) { + this.at = at; this.action = action; this.callback = callback; } - public HerderRequest(Callable<Void> action) { - this.action = action; - this.callback = DEFAULT_CALLBACK; - } - public Callable<Void> action() { return action; } @@ -706,14 +801,21 @@ public class DistributedHerder implements Herder, Runnable { public Callback<Void> callback() { return callback; } - } - private static final Callback<Void> DEFAULT_CALLBACK = new Callback<Void>() { @Override - public void onCompletion(Throwable error, Void result) { - if (error != null) - log.error("HerderRequest's action threw an 
exception: ", error); + public int compareTo(HerderRequest o) { + return 
Long.compare(at, o.at); } + } + + private static final Callback<Void> forwardErrorCallback(final Callback<?> callback) { + return new Callback<Void>() { + @Override + public void onCompletion(Throwable error, Void result) { + if (error != null) + callback.onCompletion(error, null); + } + }; }; http://git-wip-us.apache.org/repos/asf/kafka/blob/b30d68a4/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderTest.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderTest.java index 8f28f5f..7873447 100644 --- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderTest.java +++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderTest.java @@ -18,6 +18,7 @@ package org.apache.kafka.copycat.runtime.distributed; import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.copycat.connector.ConnectorContext; import org.apache.kafka.copycat.errors.AlreadyExistsException; import org.apache.kafka.copycat.runtime.ConnectorConfig; @@ -124,6 +125,7 @@ public class DistributedHerderTest { @Mock private KafkaConfigStorage configStorage; @Mock private WorkerGroupMember member; + private MockTime time; private DistributedHerder herder; @Mock private Worker worker; @Mock private Callback<Herder.Created<ConnectorInfo>> putConnectorCallback; @@ -135,9 +137,10 @@ public class DistributedHerderTest { @Before public void setUp() throws Exception { worker = PowerMock.createMock(Worker.class); + time = new MockTime(); herder = PowerMock.createPartialMock(DistributedHerder.class, new String[]{"backoff"}, - new DistributedConfig(HERDER_CONFIG), worker, configStorage, member, MEMBER_URL); 
+ new DistributedConfig(HERDER_CONFIG), worker, configStorage, member, MEMBER_URL, time); connectorConfigCallback = Whitebox.invokeMethod(herder, "connectorConfigCallback"); taskConfigCallback = Whitebox.invokeMethod(herder, "taskConfigCallback"); rebalanceListener = Whitebox.invokeMethod(herder, "rebalanceListener");
