[ 
https://issues.apache.org/jira/browse/KAFKA-6696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16439086#comment-16439086
 ] 

ASF GitHub Bot commented on KAFKA-6696:
---------------------------------------

rajinisivaram closed pull request #4759: KAFKA-6696 Trogdor should support 
destroying tasks
URL: https://github.com/apache/kafka/pull/4759
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 2767132886d..64258bf7b07 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -45,7 +45,7 @@
     <suppress checks="ClassDataAbstractionCoupling"
               
files="(KafkaConsumer|ConsumerCoordinator|Fetcher|KafkaProducer|AbstractRequest|AbstractResponse|TransactionManager|KafkaAdminClient).java"/>
     <suppress checks="ClassDataAbstractionCoupling"
-              files="(Errors|SaslAuthenticatorTest|AgentTest).java"/>
+              
files="(Errors|SaslAuthenticatorTest|AgentTest|CoordinatorTest).java"/>
 
     <suppress checks="BooleanExpressionComplexity"
               files="(Utils|Topic|KafkaLZ4BlockOutputStream|AclData).java"/>
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/agent/Agent.java 
b/tools/src/main/java/org/apache/kafka/trogdor/agent/Agent.java
index 3b5b21e68d8..0324d2d2dba 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/agent/Agent.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/agent/Agent.java
@@ -27,10 +27,9 @@
 import org.apache.kafka.trogdor.common.Platform;
 import org.apache.kafka.trogdor.rest.AgentStatusResponse;
 import org.apache.kafka.trogdor.rest.CreateWorkerRequest;
-import org.apache.kafka.trogdor.rest.CreateWorkerResponse;
+import org.apache.kafka.trogdor.rest.DestroyWorkerRequest;
 import org.apache.kafka.trogdor.rest.JsonRestServer;
 import org.apache.kafka.trogdor.rest.StopWorkerRequest;
-import org.apache.kafka.trogdor.rest.StopWorkerResponse;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -95,13 +94,16 @@ public AgentStatusResponse status() throws Exception {
         return new AgentStatusResponse(serverStartMs, 
workerManager.workerStates());
     }
 
-    public CreateWorkerResponse createWorker(CreateWorkerRequest req) throws 
Exception {
-        workerManager.createWorker(req.id(), req.spec());
-        return new CreateWorkerResponse(req.spec());
+    public void createWorker(CreateWorkerRequest req) throws Throwable {
+        workerManager.createWorker(req.workerId(), req.taskId(), req.spec());
     }
 
-    public StopWorkerResponse stopWorker(StopWorkerRequest req) throws 
Exception {
-        return new StopWorkerResponse(workerManager.stopWorker(req.id()));
+    public void stopWorker(StopWorkerRequest req) throws Throwable {
+        workerManager.stopWorker(req.workerId(), false);
+    }
+
+    public void destroyWorker(DestroyWorkerRequest req) throws Throwable {
+        workerManager.stopWorker(req.workerId(), true);
     }
 
     public static void main(String[] args) throws Exception {
diff --git 
a/tools/src/main/java/org/apache/kafka/trogdor/agent/AgentClient.java 
b/tools/src/main/java/org/apache/kafka/trogdor/agent/AgentClient.java
index 08769a0971d..c89011b8650 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/agent/AgentClient.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/agent/AgentClient.java
@@ -27,15 +27,16 @@
 import org.apache.kafka.trogdor.common.JsonUtil;
 import org.apache.kafka.trogdor.rest.AgentStatusResponse;
 import org.apache.kafka.trogdor.rest.CreateWorkerRequest;
-import org.apache.kafka.trogdor.rest.CreateWorkerResponse;
+import org.apache.kafka.trogdor.rest.DestroyWorkerRequest;
 import org.apache.kafka.trogdor.rest.Empty;
 import org.apache.kafka.trogdor.rest.JsonRestServer;
 import org.apache.kafka.trogdor.rest.JsonRestServer.HttpResponse;
 import org.apache.kafka.trogdor.rest.StopWorkerRequest;
-import org.apache.kafka.trogdor.rest.StopWorkerResponse;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.ws.rs.core.UriBuilder;
+
 import static net.sourceforge.argparse4j.impl.Arguments.store;
 import static net.sourceforge.argparse4j.impl.Arguments.storeTrue;
 
@@ -116,20 +117,29 @@ public AgentStatusResponse status() throws Exception {
         return resp.body();
     }
 
-    public CreateWorkerResponse createWorker(CreateWorkerRequest request) 
throws Exception {
-        HttpResponse<CreateWorkerResponse> resp =
-            JsonRestServer.<CreateWorkerResponse>httpRequest(
+    public void createWorker(CreateWorkerRequest request) throws Exception {
+        HttpResponse<Empty> resp =
+            JsonRestServer.<Empty>httpRequest(
                 url("/agent/worker/create"), "POST",
-                request, new TypeReference<CreateWorkerResponse>() { }, 
maxTries);
-        return resp.body();
+                request, new TypeReference<Empty>() { }, maxTries);
+        resp.body();
     }
 
-    public StopWorkerResponse stopWorker(StopWorkerRequest request) throws 
Exception {
-        HttpResponse<StopWorkerResponse> resp =
-            JsonRestServer.<StopWorkerResponse>httpRequest(url(
+    public void stopWorker(StopWorkerRequest request) throws Exception {
+        HttpResponse<Empty> resp =
+            JsonRestServer.<Empty>httpRequest(url(
                 "/agent/worker/stop"), "PUT",
-                request, new TypeReference<StopWorkerResponse>() { }, 
maxTries);
-        return resp.body();
+                request, new TypeReference<Empty>() { }, maxTries);
+        resp.body();
+    }
+
+    public void destroyWorker(DestroyWorkerRequest request) throws Exception {
+        UriBuilder uriBuilder = UriBuilder.fromPath(url("/agent/worker"));
+        uriBuilder.queryParam("workerId", request.workerId());
+        HttpResponse<Empty> resp =
+            JsonRestServer.<Empty>httpRequest(uriBuilder.build().toString(), 
"DELETE",
+                null, new TypeReference<Empty>() { }, maxTries);
+        resp.body();
     }
 
     public void invokeShutdown() throws Exception {
@@ -166,10 +176,16 @@ public static void main(String[] args) throws Exception {
             .help("Create a new fault.");
         actions.addArgument("--stop-worker")
             .action(store())
-            .type(String.class)
+            .type(Long.class)
             .dest("stop_worker")
-            .metavar("SPEC_JSON")
-            .help("Create a new fault.");
+            .metavar("WORKER_ID")
+            .help("Stop a worker ID.");
+        actions.addArgument("--destroy-worker")
+            .action(store())
+            .type(Long.class)
+            .dest("destroy_worker")
+            .metavar("WORKER_ID")
+            .help("Destroy a worker ID.");
         actions.addArgument("--shutdown")
             .action(storeTrue())
             .type(Boolean.class)
@@ -197,13 +213,21 @@ public static void main(String[] args) throws Exception {
             System.out.println("Got agent status: " +
                 JsonUtil.toPrettyJsonString(client.status()));
         } else if (res.getString("create_worker") != null) {
-            client.createWorker(JsonUtil.JSON_SERDE.
-                readValue(res.getString("create_worker"),
-                    CreateWorkerRequest.class));
-            System.out.println("Created fault.");
+            CreateWorkerRequest req = JsonUtil.JSON_SERDE.
+                readValue(res.getString("create_worker"), 
CreateWorkerRequest.class);
+            client.createWorker(req);
+            System.out.printf("Sent CreateWorkerRequest for worker %d%n.", 
req.workerId());
+        } else if (res.getString("stop_worker") != null) {
+            long workerId = res.getLong("stop_worker");
+            client.stopWorker(new StopWorkerRequest(workerId));
+            System.out.printf("Sent StopWorkerRequest for worker %d%n.", 
workerId);
+        } else if (res.getString("destroy_worker") != null) {
+            long workerId = res.getLong("stop_worker");
+            client.destroyWorker(new DestroyWorkerRequest(workerId));
+            System.out.printf("Sent DestroyWorkerRequest for worker %d%n.", 
workerId);
         } else if (res.getBoolean("shutdown")) {
             client.invokeShutdown();
-            System.out.println("Sent shutdown request.");
+            System.out.println("Sent ShutdownRequest.");
         } else {
             System.out.println("You must choose an action. Type --help for 
help.");
             Exit.exit(1);
diff --git 
a/tools/src/main/java/org/apache/kafka/trogdor/agent/AgentRestResource.java 
b/tools/src/main/java/org/apache/kafka/trogdor/agent/AgentRestResource.java
index 773c580fa15..1f2ad49d2fe 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/agent/AgentRestResource.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/agent/AgentRestResource.java
@@ -18,22 +18,34 @@
 
 import org.apache.kafka.trogdor.rest.AgentStatusResponse;
 import org.apache.kafka.trogdor.rest.CreateWorkerRequest;
-import org.apache.kafka.trogdor.rest.CreateWorkerResponse;
+import org.apache.kafka.trogdor.rest.DestroyWorkerRequest;
 import org.apache.kafka.trogdor.rest.Empty;
 import org.apache.kafka.trogdor.rest.StopWorkerRequest;
-import org.apache.kafka.trogdor.rest.StopWorkerResponse;
 
 import javax.servlet.ServletContext;
 import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.DefaultValue;
 import javax.ws.rs.GET;
 import javax.ws.rs.POST;
 import javax.ws.rs.PUT;
 import javax.ws.rs.Path;
 import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.MediaType;
 import java.util.concurrent.atomic.AtomicReference;
 
-
+/**
+ * The REST resource for the Agent. This describes the RPCs which the agent 
can accept.
+ *
+ * RPCs should be idempotent.  This is important because if the server's 
response is
+ * lost, the client will simply retransmit the same request. The server's 
response must
+ * be the same the second time around.
+ *
+ * We return the empty JSON object {} rather than void for RPCs that have no 
results.
+ * This ensures that if we want to add more return results later, we can do so 
in a
+ * compatible way.
+ */
 @Path("/agent")
 @Produces(MediaType.APPLICATION_JSON)
 @Consumes(MediaType.APPLICATION_JSON)
@@ -55,14 +67,23 @@ public AgentStatusResponse getStatus() throws Throwable {
 
     @POST
     @Path("/worker/create")
-    public CreateWorkerResponse createWorker(CreateWorkerRequest req) throws 
Throwable {
-        return agent().createWorker(req);
+    public Empty createWorker(CreateWorkerRequest req) throws Throwable {
+        agent().createWorker(req);
+        return Empty.INSTANCE;
     }
 
     @PUT
     @Path("/worker/stop")
-    public StopWorkerResponse stopWorker(StopWorkerRequest req) throws 
Throwable {
-        return agent().stopWorker(req);
+    public Empty stopWorker(StopWorkerRequest req) throws Throwable {
+        agent().stopWorker(req);
+        return Empty.INSTANCE;
+    }
+
+    @DELETE
+    @Path("/worker")
+    public Empty destroyWorker(@DefaultValue("0") @QueryParam("workerId") long 
workerId) throws Throwable {
+        agent().destroyWorker(new DestroyWorkerRequest(workerId));
+        return Empty.INSTANCE;
     }
 
     @PUT
diff --git 
a/tools/src/main/java/org/apache/kafka/trogdor/agent/WorkerManager.java 
b/tools/src/main/java/org/apache/kafka/trogdor/agent/WorkerManager.java
index 7c8de6d3f22..59d34c90ab6 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/agent/WorkerManager.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/agent/WorkerManager.java
@@ -25,6 +25,7 @@
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.trogdor.common.Platform;
 import org.apache.kafka.trogdor.common.ThreadUtils;
+import org.apache.kafka.trogdor.rest.RequestConflictException;
 import org.apache.kafka.trogdor.rest.WorkerDone;
 import org.apache.kafka.trogdor.rest.WorkerRunning;
 import org.apache.kafka.trogdor.rest.WorkerStarting;
@@ -36,10 +37,12 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
@@ -72,7 +75,7 @@
     /**
      * A map of task IDs to Work objects.
      */
-    private final Map<String, Worker> workers;
+    private final Map<Long, Worker> workers;
 
     /**
      * An ExecutorService used to schedule events in the future.
@@ -137,12 +140,15 @@ synchronized boolean shutdown() {
                 return false;
             }
             shutdown = true;
+            if (refCount == 0) {
+                this.notifyAll();
+            }
             return true;
         }
 
         synchronized void waitForQuiescence() throws InterruptedException {
             while ((!shutdown) || (refCount > 0)) {
-                wait();
+                this.wait();
             }
         }
     }
@@ -173,10 +179,15 @@ synchronized void waitForQuiescence() throws 
InterruptedException {
      * A worker which is being tracked.
      */
     class Worker {
+        /**
+         * The worker ID.
+         */
+        private final long workerId;
+
         /**
          * The task ID.
          */
-        private final String id;
+        private final String taskId;
 
         /**
          * The task specification.
@@ -217,7 +228,7 @@ synchronized void waitForQuiescence() throws 
InterruptedException {
          * If there is a task timeout scheduled, this is a future which can
          * be used to cancel it.
          */
-        private Future<TaskSpec> timeoutFuture = null;
+        private Future<Void> timeoutFuture = null;
 
         /**
          * A shutdown manager reference which will keep the WorkerManager
@@ -225,16 +236,26 @@ synchronized void waitForQuiescence() throws 
InterruptedException {
          */
         private ShutdownManager.Reference reference;
 
-        Worker(String id, TaskSpec spec, long now) {
-            this.id = id;
+        /**
+         * Whether we should destroy the records of this worker once it stops.
+         */
+        private boolean mustDestroy = false;
+
+        Worker(long workerId, String taskId, TaskSpec spec, long now) {
+            this.workerId = workerId;
+            this.taskId = taskId;
             this.spec = spec;
-            this.taskWorker = spec.newTaskWorker(id);
+            this.taskWorker = spec.newTaskWorker(taskId);
             this.startedMs = now;
             this.reference = shutdownManager.takeReference();
         }
 
-        String id() {
-            return id;
+        long workerId() {
+            return workerId;
+        }
+
+        String taskId() {
+            return taskId;
         }
 
         TaskSpec spec() {
@@ -244,14 +265,14 @@ TaskSpec spec() {
         WorkerState state() {
             switch (state) {
                 case STARTING:
-                    return new WorkerStarting(spec);
+                    return new WorkerStarting(taskId, spec);
                 case RUNNING:
-                    return new WorkerRunning(spec, startedMs, status.get());
+                    return new WorkerRunning(taskId, spec, startedMs, 
status.get());
                 case CANCELLING:
                 case STOPPING:
-                    return new WorkerStopping(spec, startedMs, status.get());
+                    return new WorkerStopping(taskId, spec, startedMs, 
status.get());
                 case DONE:
-                    return new WorkerDone(spec, startedMs, doneMs, 
status.get(), error);
+                    return new WorkerDone(taskId, spec, startedMs, doneMs, 
status.get(), error);
             }
             throw new RuntimeException("unreachable");
         }
@@ -259,7 +280,7 @@ WorkerState state() {
         void transitionToRunning() {
             state = State.RUNNING;
             timeoutFuture = scheduler.schedule(stateChangeExecutor,
-                new StopWorker(id), spec.durationMs());
+                new StopWorker(workerId, false), spec.durationMs());
         }
 
         void transitionToStopping() {
@@ -268,7 +289,7 @@ void transitionToStopping() {
                 timeoutFuture.cancel(false);
                 timeoutFuture = null;
             }
-            workerCleanupExecutor.submit(new CleanupWorker(this));
+            workerCleanupExecutor.submit(new HaltWorker(this));
         }
 
         void transitionToDone() {
@@ -279,15 +300,20 @@ void transitionToDone() {
                 reference = null;
             }
         }
+
+        @Override
+        public String toString() {
+            return String.format("%s_%d", taskId, workerId);
+        }
     }
 
-    public void createWorker(final String id, TaskSpec spec) throws Exception {
+    public void createWorker(long workerId, String taskId, TaskSpec spec) 
throws Throwable {
         try (ShutdownManager.Reference ref = shutdownManager.takeReference()) {
             final Worker worker = stateChangeExecutor.
-                submit(new CreateWorker(id, spec, time.milliseconds())).get();
+                submit(new CreateWorker(workerId, taskId, spec, 
time.milliseconds())).get();
             if (worker == null) {
                 log.info("{}: Ignoring request to create worker {}, because 
there is already " +
-                    "a worker with that id.", nodeName, id);
+                    "a worker with that id.", nodeName, workerId);
                 return;
             }
             KafkaFutureImpl<String> haltFuture = new KafkaFutureImpl<>();
@@ -297,9 +323,10 @@ public Void apply(String errorString) {
                     if (errorString == null)
                         errorString = "";
                     if (errorString.isEmpty()) {
-                        log.info("{}: Worker {} is halting.", nodeName, id);
+                        log.info("{}: Worker {} is halting.", nodeName, 
worker);
                     } else {
-                        log.info("{}: Worker {} is halting with error {}", 
nodeName, id, errorString);
+                        log.info("{}: Worker {} is halting with error {}",
+                            nodeName, worker, errorString);
                     }
                     stateChangeExecutor.submit(
                         new HandleWorkerHalting(worker, errorString, false));
@@ -309,11 +336,20 @@ public Void apply(String errorString) {
             try {
                 worker.taskWorker.start(platform, worker.status, haltFuture);
             } catch (Exception e) {
-                log.info("{}: Worker {} start() exception", nodeName, id, e);
+                log.info("{}: Worker {} start() exception", nodeName, worker, 
e);
                 stateChangeExecutor.submit(new HandleWorkerHalting(worker,
                     "worker.start() exception: " + Utils.stackTrace(e), true));
             }
             stateChangeExecutor.submit(new FinishCreatingWorker(worker));
+        } catch (ExecutionException e) {
+            if (e.getCause() instanceof RequestConflictException) {
+                log.info("{}: request conflict while creating worker {} for 
task {} with spec {}.",
+                    nodeName, workerId, taskId, spec);
+            } else {
+                log.info("{}: Error creating worker {} for task {} with spec 
{}",
+                    nodeName, workerId, taskId, spec, e);
+            }
+            throw e.getCause();
         }
     }
 
@@ -321,27 +357,42 @@ public Void apply(String errorString) {
      * Handles a request to create a new worker.  Processed by the state 
change thread.
      */
     class CreateWorker implements Callable<Worker> {
-        private final String id;
+        private final long workerId;
+        private final String taskId;
         private final TaskSpec spec;
         private final long now;
 
-        CreateWorker(String id, TaskSpec spec, long now) {
-            this.id = id;
+        CreateWorker(long workerId, String taskId, TaskSpec spec, long now) {
+            this.workerId = workerId;
+            this.taskId = taskId;
             this.spec = spec;
             this.now = now;
         }
 
         @Override
         public Worker call() throws Exception {
-            Worker worker = workers.get(id);
-            if (worker != null) {
-                log.info("{}: Task ID {} is already in use.", nodeName, id);
-                return null;
+            try {
+                Worker worker = workers.get(workerId);
+                if (worker != null) {
+                    if (!worker.taskId().equals(taskId)) {
+                        throw new RequestConflictException("There is already a 
worker ID " + workerId +
+                            " with a different task ID.");
+                    } else if (!worker.spec().equals(spec)) {
+                        throw new RequestConflictException("There is already a 
worker ID " + workerId +
+                            " with a different task spec.");
+                    } else {
+                        return null;
+                    }
+                }
+                worker = new Worker(workerId, taskId, spec, now);
+                workers.put(workerId, worker);
+                log.info("{}: Created worker {} with spec {}", nodeName, 
worker, spec);
+                return worker;
+            } catch (Exception e) {
+                log.info("{}: unable to create worker {} for task {}, with 
spec {}",
+                    nodeName, workerId, taskId, spec, e);
+                throw e;
             }
-            worker = new Worker(id, spec, now);
-            workers.put(id, worker);
-            log.info("{}: Created a new worker for task {} with spec {}", 
nodeName, id, spec);
-            return worker;
         }
     }
 
@@ -360,12 +411,12 @@ public Void call() throws Exception {
             switch (worker.state) {
                 case CANCELLING:
                     log.info("{}: Worker {} was cancelled while it was 
starting up.  " +
-                        "Transitioning to STOPPING.", nodeName, worker.id);
+                        "Transitioning to STOPPING.", nodeName, worker);
                     worker.transitionToStopping();
                     break;
                 case STARTING:
                     log.info("{}: Worker {} is now RUNNING.  Scheduled to stop 
in {} ms.",
-                        nodeName, worker.id, worker.spec.durationMs());
+                        nodeName, worker, worker.spec.durationMs());
                     worker.transitionToRunning();
                     break;
                 default:
@@ -400,29 +451,29 @@ public Void call() throws Exception {
                 case STARTING:
                     if (startupHalt) {
                         log.info("{}: Worker {} {} during startup.  
Transitioning to DONE.",
-                            nodeName, worker.id, verb);
+                            nodeName, worker, verb);
                         worker.transitionToDone();
                     } else {
                         log.info("{}: Worker {} {} during startup.  
Transitioning to CANCELLING.",
-                            nodeName, worker.id, verb);
+                            nodeName, worker, verb);
                         worker.state = State.CANCELLING;
                     }
                     break;
                 case CANCELLING:
                     log.info("{}: Cancelling worker {} {}.  ",
-                            nodeName, worker.id, verb);
+                            nodeName, worker, verb);
                     break;
                 case RUNNING:
                     log.info("{}: Running worker {} {}.  Transitioning to 
STOPPING.",
-                        nodeName, worker.id, verb);
+                        nodeName, worker, verb);
                     worker.transitionToStopping();
                     break;
                 case STOPPING:
-                    log.info("{}: Stopping worker {} {}.", nodeName, 
worker.id, verb);
+                    log.info("{}: Stopping worker {} {}.", nodeName, worker, 
verb);
                     break;
                 case DONE:
                     log.info("{}: Can't halt worker {} because it is already 
DONE.",
-                        nodeName, worker.id);
+                        nodeName, worker);
                     break;
             }
             return null;
@@ -432,7 +483,7 @@ public Void call() throws Exception {
     /**
      * Transitions a worker to WorkerDone.  Processed by the state change 
thread.
      */
-    static class CompleteWorker implements Callable<Void> {
+    class CompleteWorker implements Callable<Void> {
         private final Worker worker;
 
         private final String failure;
@@ -448,60 +499,79 @@ public Void call() throws Exception {
                 worker.error = failure;
             }
             worker.transitionToDone();
+            if (worker.mustDestroy) {
+                log.info("{}: destroying worker {} with error {}",
+                    nodeName, worker, worker.error);
+                workers.remove(worker.workerId);
+            } else {
+                log.info("{}: completed worker {} with error {}",
+                    nodeName, worker, worker.error);
+            }
             return null;
         }
     }
 
-    public TaskSpec stopWorker(String id) throws Exception {
+    public void stopWorker(long workerId, boolean mustDestroy) throws 
Throwable {
         try (ShutdownManager.Reference ref = shutdownManager.takeReference()) {
-            TaskSpec taskSpec = stateChangeExecutor.submit(new 
StopWorker(id)).get();
-            if (taskSpec == null) {
-                throw new KafkaException("No task found with id " + id);
-            }
-            return taskSpec;
+            stateChangeExecutor.submit(new StopWorker(workerId, 
mustDestroy)).get();
+        } catch (ExecutionException e) {
+            throw e.getCause();
         }
     }
 
     /**
      * Stops a worker.  Processed by the state change thread.
      */
-    class StopWorker implements Callable<TaskSpec> {
-        private final String id;
+    class StopWorker implements Callable<Void> {
+        private final long workerId;
+        private final boolean mustDestroy;
 
-        StopWorker(String id) {
-            this.id = id;
+        StopWorker(long workerId, boolean mustDestroy) {
+            this.workerId = workerId;
+            this.mustDestroy = mustDestroy;
         }
 
         @Override
-        public TaskSpec call() throws Exception {
-            Worker worker = workers.get(id);
+        public Void call() throws Exception {
+            Worker worker = workers.get(workerId);
             if (worker == null) {
+                log.info("{}: Can't stop worker {} because there is no worker 
with that ID.",
+                    nodeName, workerId);
                 return null;
             }
+            if (mustDestroy) {
+                worker.mustDestroy = true;
+            }
             switch (worker.state) {
                 case STARTING:
                     log.info("{}: Cancelling worker {} during its startup 
process.",
-                        nodeName, id);
+                        nodeName, worker);
                     worker.state = State.CANCELLING;
                     break;
                 case CANCELLING:
                     log.info("{}: Can't stop worker {}, because it is already 
being " +
-                        "cancelled.", nodeName, id);
+                        "cancelled.", nodeName, worker);
                     break;
                 case RUNNING:
-                    log.info("{}: Stopping running worker {}.", nodeName, id);
+                    log.info("{}: Stopping running worker {}.", nodeName, 
worker);
                     worker.transitionToStopping();
                     break;
                 case STOPPING:
                     log.info("{}: Can't stop worker {}, because it is already 
" +
-                            "stopping.", nodeName, id);
+                            "stopping.", nodeName, worker);
                     break;
                 case DONE:
-                    log.debug("{}: Can't stop worker {}, because it is already 
done.",
-                        nodeName, id);
+                    if (worker.mustDestroy) {
+                        log.info("{}: destroying worker {} with error {}",
+                            nodeName, worker, worker.error);
+                        workers.remove(worker.workerId);
+                    } else {
+                        log.debug("{}: Can't stop worker {}, because it is 
already done.",
+                            nodeName, worker);
+                    }
                     break;
             }
-            return worker.spec();
+            return null;
         }
     }
 
@@ -509,10 +579,10 @@ public TaskSpec call() throws Exception {
      * Cleans up the resources associated with a worker.  Processed by the 
worker
      * cleanup thread pool.
      */
-    class CleanupWorker implements Callable<Void> {
+    class HaltWorker implements Callable<Void> {
         private final Worker worker;
 
-        CleanupWorker(Worker worker) {
+        HaltWorker(Worker worker) {
             this.worker = worker;
         }
 
@@ -530,18 +600,18 @@ public Void call() throws Exception {
         }
     }
 
-    public TreeMap<String, WorkerState> workerStates() throws Exception {
+    public TreeMap<Long, WorkerState> workerStates() throws Exception {
         try (ShutdownManager.Reference ref = shutdownManager.takeReference()) {
             return stateChangeExecutor.submit(new GetWorkerStates()).get();
         }
     }
 
-    class GetWorkerStates implements Callable<TreeMap<String, WorkerState>> {
+    class GetWorkerStates implements Callable<TreeMap<Long, WorkerState>> {
         @Override
-        public TreeMap<String, WorkerState> call() throws Exception {
-            TreeMap<String, WorkerState> workerMap = new TreeMap<>();
+        public TreeMap<Long, WorkerState> call() throws Exception {
+            TreeMap<Long, WorkerState> workerMap = new TreeMap<>();
             for (Worker worker : workers.values()) {
-                workerMap.put(worker.id(), worker.state());
+                workerMap.put(worker.workerId(), worker.state());
             }
             return workerMap;
         }
@@ -562,17 +632,53 @@ public void waitForShutdown() throws Exception {
     class Shutdown implements Callable<Void> {
         @Override
         public Void call() throws Exception {
-            log.info("{}: Shutting down WorkerManager.", 
platform.curNode().name());
-            for (Worker worker : workers.values()) {
-                stateChangeExecutor.submit(new StopWorker(worker.id));
+            log.info("{}: Shutting down WorkerManager.", nodeName);
+            try {
+                stateChangeExecutor.submit(new DestroyAllWorkers()).get();
+                log.info("{}: Waiting for shutdownManager quiescence...", 
nodeName);
+                shutdownManager.waitForQuiescence();
+                workerCleanupExecutor.shutdownNow();
+                stateChangeExecutor.shutdownNow();
+                log.info("{}: Waiting for workerCleanupExecutor to 
terminate...", nodeName);
+                workerCleanupExecutor.awaitTermination(1, TimeUnit.DAYS);
+                log.info("{}: Waiting for stateChangeExecutor to 
terminate...", nodeName);
+                stateChangeExecutor.awaitTermination(1, TimeUnit.DAYS);
+                log.info("{}: Shutting down shutdownExecutor.", nodeName);
+                shutdownExecutor.shutdown();
+            } catch (Exception e) {
+                log.info("{}: Caught exception while shutting down 
WorkerManager", nodeName, e);
+                throw e;
+            }
+            return null;
+        }
+    }
+
+    /**
+     * Begins the process of destroying all workers.  Processed by the state 
change thread.
+     */
+    class DestroyAllWorkers implements Callable<Void> {
+        @Override
+        public Void call() throws Exception {
+            log.info("{}: Destroying all workers.", nodeName);
+
+            // StopWorker may remove elements from the set of worker IDs.  
That might generate
+            // a ConcurrentModificationException if we were iterating over the 
worker ID
+            // set directly.  Therefore, we make a copy of the worker IDs here 
and iterate
+            // over that instead.
+            //
+            // Note that there is no possible way that more worker IDs can be 
added while this
+            // callable is running, because the state change executor is 
single-threaded.
+            ArrayList<Long> workerIds = new ArrayList<>(workers.keySet());
+
+            for (long workerId : workerIds) {
+                try {
+                    new StopWorker(workerId, true).call();
+                } catch (Exception e) {
+                    log.error("Failed to stop worker {}", workerId, e);
+                }
             }
-            shutdownManager.waitForQuiescence();
-            workerCleanupExecutor.shutdownNow();
-            stateChangeExecutor.shutdownNow();
-            workerCleanupExecutor.awaitTermination(1, TimeUnit.DAYS);
-            stateChangeExecutor.awaitTermination(1, TimeUnit.DAYS);
-            shutdownExecutor.shutdown();
             return null;
         }
     }
+
 }
diff --git 
a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java 
b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java
index 717d7c7047a..23f3ceb91b0 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java
@@ -27,15 +27,16 @@
 import org.apache.kafka.trogdor.common.Platform;
 import org.apache.kafka.trogdor.rest.CoordinatorStatusResponse;
 import org.apache.kafka.trogdor.rest.CreateTaskRequest;
-import org.apache.kafka.trogdor.rest.CreateTaskResponse;
+import org.apache.kafka.trogdor.rest.DestroyTaskRequest;
 import org.apache.kafka.trogdor.rest.JsonRestServer;
 import org.apache.kafka.trogdor.rest.StopTaskRequest;
-import org.apache.kafka.trogdor.rest.StopTaskResponse;
 import org.apache.kafka.trogdor.rest.TasksRequest;
 import org.apache.kafka.trogdor.rest.TasksResponse;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.concurrent.ThreadLocalRandom;
+
 import static net.sourceforge.argparse4j.impl.Arguments.store;
 
 /**
@@ -72,9 +73,9 @@
      * @param resource      The AgentRestResoure to use.
      */
     public Coordinator(Platform platform, Scheduler scheduler, JsonRestServer 
restServer,
-                       CoordinatorRestResource resource) {
+                       CoordinatorRestResource resource, long firstWorkerId) {
         this.startTimeMs = scheduler.time().milliseconds();
-        this.taskManager = new TaskManager(platform, scheduler);
+        this.taskManager = new TaskManager(platform, scheduler, firstWorkerId);
         this.restServer = restServer;
         resource.setCoordinator(this);
     }
@@ -87,12 +88,16 @@ public CoordinatorStatusResponse status() throws Exception {
         return new CoordinatorStatusResponse(startTimeMs);
     }
 
-    public CreateTaskResponse createTask(CreateTaskRequest request) throws 
Exception {
-        return new CreateTaskResponse(taskManager.createTask(request.id(), 
request.spec()));
+    public void createTask(CreateTaskRequest request) throws Throwable {
+        taskManager.createTask(request.id(), request.spec());
+    }
+
+    public void stopTask(StopTaskRequest request) throws Throwable {
+        taskManager.stopTask(request.id());
     }
 
-    public StopTaskResponse stopTask(StopTaskRequest request) throws Exception 
{
-        return new StopTaskResponse(taskManager.stopTask(request.id()));
+    public void destroyTask(DestroyTaskRequest request) throws Throwable {
+        taskManager.destroyTask(request.id());
     }
 
     public TasksResponse tasks(TasksRequest request) throws Exception {
@@ -149,7 +154,7 @@ public static void main(String[] args) throws Exception {
         CoordinatorRestResource resource = new CoordinatorRestResource();
         log.info("Starting coordinator process.");
         final Coordinator coordinator = new Coordinator(platform, 
Scheduler.SYSTEM,
-            restServer, resource);
+            restServer, resource, ThreadLocalRandom.current().nextLong(0, 
Long.MAX_VALUE / 2));
         restServer.start(resource);
         Runtime.getRuntime().addShutdownHook(new Thread() {
             @Override
diff --git 
a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java
 
b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java
index 0677296ee3c..780ae737e0e 100644
--- 
a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java
+++ 
b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java
@@ -27,12 +27,11 @@
 import org.apache.kafka.trogdor.common.JsonUtil;
 import org.apache.kafka.trogdor.rest.CoordinatorStatusResponse;
 import org.apache.kafka.trogdor.rest.CreateTaskRequest;
-import org.apache.kafka.trogdor.rest.CreateTaskResponse;
+import org.apache.kafka.trogdor.rest.DestroyTaskRequest;
 import org.apache.kafka.trogdor.rest.Empty;
 import org.apache.kafka.trogdor.rest.JsonRestServer;
 import org.apache.kafka.trogdor.rest.JsonRestServer.HttpResponse;
 import org.apache.kafka.trogdor.rest.StopTaskRequest;
-import org.apache.kafka.trogdor.rest.StopTaskResponse;
 import org.apache.kafka.trogdor.rest.TasksRequest;
 import org.apache.kafka.trogdor.rest.TasksResponse;
 import org.slf4j.Logger;
@@ -116,36 +115,45 @@ public CoordinatorStatusResponse status() throws 
Exception {
         return resp.body();
     }
 
-    public CreateTaskResponse createTask(CreateTaskRequest request) throws 
Exception {
-        HttpResponse<CreateTaskResponse> resp =
-            JsonRestServer.<CreateTaskResponse>httpRequest(log, 
url("/coordinator/task/create"), "POST",
-                request, new TypeReference<CreateTaskResponse>() { }, 
maxTries);
-        return resp.body();
+    public void createTask(CreateTaskRequest request) throws Exception {
+        HttpResponse<Empty> resp =
+            JsonRestServer.httpRequest(log, url("/coordinator/task/create"), 
"POST",
+                request, new TypeReference<Empty>() { }, maxTries);
+        resp.body();
     }
 
-    public StopTaskResponse stopTask(StopTaskRequest request) throws Exception 
{
-        HttpResponse<StopTaskResponse> resp =
-            JsonRestServer.<StopTaskResponse>httpRequest(log, 
url("/coordinator/task/stop"), "PUT",
-                request, new TypeReference<StopTaskResponse>() { }, maxTries);
-        return resp.body();
+    public void stopTask(StopTaskRequest request) throws Exception {
+        HttpResponse<Empty> resp =
+            JsonRestServer.httpRequest(log, url("/coordinator/task/stop"), 
"PUT",
+                request, new TypeReference<Empty>() { }, maxTries);
+        resp.body();
+    }
+
+    public void destroyTask(DestroyTaskRequest request) throws Exception {
+        UriBuilder uriBuilder = UriBuilder.fromPath(url("/coordinator/tasks"));
+        uriBuilder.queryParam("taskId", request.id());
+        HttpResponse<Empty> resp =
+            JsonRestServer.httpRequest(log, uriBuilder.build().toString(), 
"DELETE",
+                null, new TypeReference<Empty>() { }, maxTries);
+        resp.body();
     }
 
     public TasksResponse tasks(TasksRequest request) throws Exception {
         UriBuilder uriBuilder = UriBuilder.fromPath(url("/coordinator/tasks"));
-        uriBuilder.queryParam("taskId", request.taskIds().toArray(new 
String[0]));
+        uriBuilder.queryParam("taskId", (Object[]) 
request.taskIds().toArray(new String[0]));
         uriBuilder.queryParam("firstStartMs", request.firstStartMs());
         uriBuilder.queryParam("lastStartMs", request.lastStartMs());
         uriBuilder.queryParam("firstEndMs", request.firstEndMs());
         uriBuilder.queryParam("lastEndMs", request.lastEndMs());
         HttpResponse<TasksResponse> resp =
-            JsonRestServer.<TasksResponse>httpRequest(log, 
uriBuilder.build().toString(), "GET",
+            JsonRestServer.httpRequest(log, uriBuilder.build().toString(), 
"GET",
                 null, new TypeReference<TasksResponse>() { }, maxTries);
         return resp.body();
     }
 
     public void shutdown() throws Exception {
         HttpResponse<Empty> resp =
-            JsonRestServer.<Empty>httpRequest(log, 
url("/coordinator/shutdown"), "PUT",
+            JsonRestServer.httpRequest(log, url("/coordinator/shutdown"), 
"PUT",
                 null, new TypeReference<Empty>() { }, maxTries);
         resp.body();
     }
@@ -185,6 +193,12 @@ public static void main(String[] args) throws Exception {
             .dest("stop_task")
             .metavar("TASK_ID")
             .help("Stop a task.");
+        actions.addArgument("--destroy-task")
+            .action(store())
+            .type(String.class)
+            .dest("destroy_task")
+            .metavar("TASK_ID")
+            .help("Destroy a task.");
         actions.addArgument("--shutdown")
             .action(storeTrue())
             .type(Boolean.class)
@@ -216,15 +230,21 @@ public static void main(String[] args) throws Exception {
                 JsonUtil.toPrettyJsonString(client.tasks(
                     new TasksRequest(null, 0, 0, 0, 0))));
         } else if (res.getString("create_task") != null) {
-            
client.createTask(JsonUtil.JSON_SERDE.readValue(res.getString("create_task"),
-                CreateTaskRequest.class));
-            System.out.println("Created task.");
+            CreateTaskRequest req = JsonUtil.JSON_SERDE.
+                readValue(res.getString("create_task"), 
CreateTaskRequest.class);
+            client.createTask(req);
+            System.out.printf("Sent CreateTaskRequest for task %s.", req.id());
         } else if (res.getString("stop_task") != null) {
-            client.stopTask(new StopTaskRequest(res.getString("stop_task")));
-            System.out.println("Created task.");
+            String taskId = res.getString("stop_task");
+            client.stopTask(new StopTaskRequest(taskId));
+            System.out.printf("Sent StopTaskRequest for task %s.%n", taskId);
+        } else if (res.getString("destroy_task") != null) {
+            String taskId = res.getString("destroy_task");
+            client.destroyTask(new DestroyTaskRequest(taskId));
+            System.out.printf("Sent DestroyTaskRequest for task %s.%n", 
taskId);
         } else if (res.getBoolean("shutdown")) {
             client.shutdown();
-            System.out.println("Sent shutdown request.");
+            System.out.println("Sent ShutdownRequest.");
         } else {
             System.out.println("You must choose an action. Type --help for 
help.");
             Exit.exit(1);
diff --git 
a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorRestResource.java
 
b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorRestResource.java
index b8663ec4cc3..cbfbddd7eda 100644
--- 
a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorRestResource.java
+++ 
b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorRestResource.java
@@ -19,15 +19,15 @@
 import org.apache.kafka.trogdor.rest.CoordinatorShutdownRequest;
 import org.apache.kafka.trogdor.rest.CoordinatorStatusResponse;
 import org.apache.kafka.trogdor.rest.CreateTaskRequest;
-import org.apache.kafka.trogdor.rest.CreateTaskResponse;
+import org.apache.kafka.trogdor.rest.DestroyTaskRequest;
 import org.apache.kafka.trogdor.rest.Empty;
 import org.apache.kafka.trogdor.rest.StopTaskRequest;
-import org.apache.kafka.trogdor.rest.StopTaskResponse;
 import org.apache.kafka.trogdor.rest.TasksRequest;
 import org.apache.kafka.trogdor.rest.TasksResponse;
 
 import javax.servlet.ServletContext;
 import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
 import javax.ws.rs.DefaultValue;
 import javax.ws.rs.GET;
 import javax.ws.rs.POST;
@@ -39,7 +39,18 @@
 import java.util.List;
 import java.util.concurrent.atomic.AtomicReference;
 
-
+/**
+ * The REST resource for the Coordinator. This describes the RPCs which the 
coordinator
+ * can accept.
+ *
+ * RPCs should be idempotent.  This is important because if the server's 
response is
+ * lost, the client will simply retransmit the same request. The server's 
response must
+ * be the same the second time around.
+ *
+ * We return the empty JSON object {} rather than void for RPCs that have no 
results.
+ * This ensures that if we want to add more return results later, we can do so 
in a
+ * compatible way.
+ */
 @Path("/coordinator")
 @Produces(MediaType.APPLICATION_JSON)
 @Consumes(MediaType.APPLICATION_JSON)
@@ -61,14 +72,23 @@ public CoordinatorStatusResponse status() throws Throwable {
 
     @POST
     @Path("/task/create")
-    public CreateTaskResponse createTask(CreateTaskRequest request) throws 
Throwable {
-        return coordinator().createTask(request);
+    public Empty createTask(CreateTaskRequest request) throws Throwable {
+        coordinator().createTask(request);
+        return Empty.INSTANCE;
     }
 
     @PUT
     @Path("/task/stop")
-    public StopTaskResponse stopTask(StopTaskRequest request) throws Throwable 
{
-        return coordinator().stopTask(request);
+    public Empty stopTask(StopTaskRequest request) throws Throwable {
+        coordinator().stopTask(request);
+        return Empty.INSTANCE;
+    }
+
+    @DELETE
+    @Path("/tasks")
+    public Empty destroyTask(@DefaultValue("") @QueryParam("taskId") String 
taskId) throws Throwable {
+        coordinator().destroyTask(new DestroyTaskRequest(taskId));
+        return Empty.INSTANCE;
     }
 
     @GET
diff --git 
a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java 
b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java
index 91ef9c2928a..3f0075e598a 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java
@@ -79,13 +79,16 @@
     private static final long HEARTBEAT_DELAY_MS = 1000L;
 
     class ManagedWorker {
-        private final String id;
+        private final long workerId;
+        private final String taskId;
         private final TaskSpec spec;
         private boolean shouldRun;
         private WorkerState state;
 
-        ManagedWorker(String id, TaskSpec spec, boolean shouldRun, WorkerState 
state) {
-            this.id = id;
+        ManagedWorker(long workerId, String taskId, TaskSpec spec,
+                      boolean shouldRun, WorkerState state) {
+            this.workerId = workerId;
+            this.taskId = taskId;
             this.spec = spec;
             this.shouldRun = shouldRun;
             this.state = state;
@@ -93,19 +96,24 @@
 
         void tryCreate() {
             try {
-                client.createWorker(new CreateWorkerRequest(id, spec));
+                client.createWorker(new CreateWorkerRequest(workerId, taskId, 
spec));
             } catch (Throwable e) {
-                log.error("{}: error creating worker {}.", node.name(), id, e);
+                log.error("{}: error creating worker {}.", node.name(), this, 
e);
             }
         }
 
         void tryStop() {
             try {
-                client.stopWorker(new StopWorkerRequest(id));
+                client.stopWorker(new StopWorkerRequest(workerId));
             } catch (Throwable e) {
-                log.error("{}: error stopping worker {}.", node.name(), id, e);
+                log.error("{}: error stopping worker {}.", node.name(), this, 
e);
             }
         }
+
+        @Override
+        public String toString() {
+            return String.format("%s_%d", taskId, workerId);
+        }
     }
 
     /**
@@ -126,7 +134,7 @@ void tryStop() {
     /**
      * Maps task IDs to worker structures.
      */
-    private final Map<String, ManagedWorker> workers;
+    private final Map<Long, ManagedWorker> workers;
 
     /**
      * An executor service which manages the thread dedicated to this node.
@@ -196,24 +204,25 @@ public void run() {
                 }
                 // Identify workers which we think should be running, but 
which do not appear
                 // in the agent's response.  We need to send startWorker 
requests for these.
-                for (Map.Entry<String, ManagedWorker> entry : 
workers.entrySet()) {
-                    String id = entry.getKey();
-                    if (!agentStatus.workers().containsKey(id)) {
+                for (Map.Entry<Long, ManagedWorker> entry : 
workers.entrySet()) {
+                    Long workerId = entry.getKey();
+                    if (!agentStatus.workers().containsKey(workerId)) {
                         ManagedWorker worker = entry.getValue();
                         if (worker.shouldRun) {
                             worker.tryCreate();
                         }
                     }
                 }
-                for (Map.Entry<String, WorkerState> entry : 
agentStatus.workers().entrySet()) {
-                    String id = entry.getKey();
+                for (Map.Entry<Long, WorkerState> entry : 
agentStatus.workers().entrySet()) {
+                    long workerId = entry.getKey();
                     WorkerState state = entry.getValue();
-                    ManagedWorker worker = workers.get(id);
+                    ManagedWorker worker = workers.get(workerId);
                     if (worker == null) {
                         // Identify tasks which are running, but which we 
don't know about.
                         // Add these to the NodeManager as tasks that should 
not be running.
-                        log.warn("{}: scheduling unknown worker {} for 
stopping.", node.name(), id);
-                        workers.put(id, new ManagedWorker(id, state.spec(), 
false, state));
+                        log.warn("{}: scheduling unknown worker with ID {} for 
stopping.", node.name(), workerId);
+                        workers.put(workerId, new ManagedWorker(workerId, 
state.taskId(),
+                            state.spec(), false, state));
                     } else {
                         // Handle workers which need to be stopped.
                         if (state instanceof WorkerStarting || state 
instanceof WorkerRunning) {
@@ -227,7 +236,7 @@ public void run() {
                         } else {
                             log.info("{}: worker state changed from {} to {}", 
node.name(), worker.state, state);
                             worker.state = state;
-                            taskManager.updateWorkerState(node.name(), 
worker.id, state);
+                            taskManager.updateWorkerState(node.name(), 
worker.workerId, state);
                         }
                     }
                 }
@@ -240,34 +249,39 @@ public void run() {
     /**
      * Create a new worker.
      *
-     * @param id                    The new worker id.
+     * @param workerId              The new worker id.
+     * @param taskId                The new task id.
      * @param spec                  The task specification to use with the new 
worker.
      */
-    public void createWorker(String id, TaskSpec spec) {
-        executor.submit(new CreateWorker(id, spec));
+    public void createWorker(long workerId, String taskId, TaskSpec spec) {
+        executor.submit(new CreateWorker(workerId, taskId, spec));
     }
 
     /**
      * Starts a worker.
      */
     class CreateWorker implements Callable<Void> {
-        private final String id;
+        private final long workerId;
+        private final String taskId;
         private final TaskSpec spec;
 
-        CreateWorker(String id, TaskSpec spec) {
-            this.id = id;
+        CreateWorker(long workerId, String taskId, TaskSpec spec) {
+            this.workerId = workerId;
+            this.taskId = taskId;
             this.spec = spec;
         }
 
         @Override
         public Void call() throws Exception {
-            ManagedWorker worker = workers.get(id);
+            ManagedWorker worker = workers.get(workerId);
             if (worker != null) {
-                log.error("{}: there is already a worker for task {}.", 
node.name(), id);
+                log.error("{}: there is already a worker {} with ID {}.",
+                    node.name(), worker, workerId);
                 return null;
             }
-            log.info("{}: scheduling worker {} to start.", node.name(), id);
-            workers.put(id, new ManagedWorker(id, spec, true, new 
WorkerReceiving(spec)));
+            worker = new ManagedWorker(workerId, taskId, spec, true, new 
WorkerReceiving(taskId, spec));
+            log.info("{}: scheduling worker {} to start.", node.name(), 
worker);
+            workers.put(workerId, worker);
             rescheduleNextHeartbeat(0);
             return null;
         }
@@ -276,41 +290,72 @@ public Void call() throws Exception {
     /**
      * Stop a worker.
      *
-     * @param id                    The id of the worker to stop.
+     * @param workerId              The id of the worker to stop.
      */
-    public void stopWorker(String id) {
-        executor.submit(new StopWorker(id));
+    public void stopWorker(long workerId) {
+        executor.submit(new StopWorker(workerId));
     }
 
     /**
      * Stops a worker.
      */
     class StopWorker implements Callable<Void> {
-        private final String id;
+        private final long workerId;
 
-        StopWorker(String id) {
-            this.id = id;
+        StopWorker(long workerId) {
+            this.workerId = workerId;
         }
 
         @Override
         public Void call() throws Exception {
-            ManagedWorker worker = workers.get(id);
+            ManagedWorker worker = workers.get(workerId);
             if (worker == null) {
-                log.error("{}: can't stop non-existent worker {}.", 
node.name(), id);
+                log.error("{}: unable to locate worker to stop with ID {}.", 
node.name(), workerId);
                 return null;
             }
             if (!worker.shouldRun) {
-                log.error("{}: The worker for task {} is already scheduled to 
stop.",
-                    node.name(), id);
+                log.error("{}: Worker {} is already scheduled to stop.",
+                    node.name(), worker);
                 return null;
             }
-            log.info("{}: scheduling worker {} on {} to stop.", node.name(), 
id);
+            log.info("{}: scheduling worker {} to stop.", node.name(), worker);
             worker.shouldRun = false;
             rescheduleNextHeartbeat(0);
             return null;
         }
     }
 
+    /**
+     * Destroy a worker.
+     *
+     * @param workerId              The id of the worker to destroy.
+     */
+    public void destroyWorker(long workerId) {
+        executor.submit(new DestroyWorker(workerId));
+    }
+
+    /**
+     * Destroys a worker.
+     */
+    class DestroyWorker implements Callable<Void> {
+        private final long workerId;
+
+        DestroyWorker(long workerId) {
+            this.workerId = workerId;
+        }
+
+        @Override
+        public Void call() throws Exception {
+            ManagedWorker worker = workers.remove(workerId);
+            if (worker == null) {
+                log.error("{}: unable to locate worker to destroy with ID 
{}.", node.name(), workerId);
+                return null;
+            }
+            rescheduleNextHeartbeat(0);
+            return null;
+        }
+    }
+
     public void beginShutdown(boolean stopNode) {
         executor.shutdownNow();
         if (stopNode) {
diff --git 
a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java 
b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java
index 7e19c8b34ae..74082bdb60a 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java
@@ -21,6 +21,7 @@
 import com.fasterxml.jackson.databind.node.JsonNodeFactory;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.InvalidRequestException;
 import org.apache.kafka.common.utils.Scheduler;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
@@ -28,6 +29,7 @@
 import org.apache.kafka.trogdor.common.Node;
 import org.apache.kafka.trogdor.common.Platform;
 import org.apache.kafka.trogdor.common.ThreadUtils;
+import org.apache.kafka.trogdor.rest.RequestConflictException;
 import org.apache.kafka.trogdor.rest.TaskDone;
 import org.apache.kafka.trogdor.rest.TaskPending;
 import org.apache.kafka.trogdor.rest.TaskRunning;
@@ -106,12 +108,22 @@
      */
     private final Map<String, NodeManager> nodeManagers;
 
+    /**
+     * The states of all workers.
+     */
+    private final Map<Long, WorkerState> workerStates = new HashMap<>();
+
     /**
      * True if the TaskManager is shut down.
      */
     private AtomicBoolean shutdown = new AtomicBoolean(false);
 
-    TaskManager(Platform platform, Scheduler scheduler) {
+    /**
+     * The ID to use for the next worker.  Only accessed by the state change 
thread.
+     */
+    private long nextWorkerId;
+
+    TaskManager(Platform platform, Scheduler scheduler, long firstWorkerId) {
         this.platform = platform;
         this.scheduler = scheduler;
         this.time = scheduler.time();
@@ -119,6 +131,7 @@
         this.executor = Executors.newSingleThreadScheduledExecutor(
             ThreadUtils.createThreadFactory("TaskManagerStateThread", false));
         this.nodeManagers = new HashMap<>();
+        this.nextWorkerId = firstWorkerId;
         for (Node node : platform.topology().nodes().values()) {
             if (Node.Util.getTrogdorAgentPort(node) > 0) {
                 this.nodeManagers.put(node.name(), new NodeManager(node, 
this));
@@ -178,9 +191,9 @@
         private Future<?> startFuture = null;
 
         /**
-         * The states of the workers involved with this task.
+         * Maps node names to worker IDs.
          */
-        public Map<String, WorkerState> workerStates = new TreeMap<>();
+        public TreeMap<String, Long> workerIds = new TreeMap<>();
 
         /**
          * If this is non-empty, a message describing how this task failed.
@@ -240,38 +253,42 @@ TaskState taskState() {
                 case PENDING:
                     return new TaskPending(spec);
                 case RUNNING:
-                    return new TaskRunning(spec, startedMs, 
getCombinedStatus(workerStates));
+                    return new TaskRunning(spec, startedMs, 
getCombinedStatus());
                 case STOPPING:
-                    return new TaskStopping(spec, startedMs, 
getCombinedStatus(workerStates));
+                    return new TaskStopping(spec, startedMs, 
getCombinedStatus());
                 case DONE:
-                    return new TaskDone(spec, startedMs, doneMs, error, 
cancelled, getCombinedStatus(workerStates));
+                    return new TaskDone(spec, startedMs, doneMs, error, 
cancelled, getCombinedStatus());
             }
             throw new RuntimeException("unreachable");
         }
 
-        TreeSet<String> activeWorkers() {
-            TreeSet<String> workerNames = new TreeSet<>();
-            for (Map.Entry<String, WorkerState> entry : 
workerStates.entrySet()) {
-                if (!entry.getValue().done()) {
-                    workerNames.add(entry.getKey());
+        private JsonNode getCombinedStatus() {
+            if (workerIds.size() == 1) {
+                return 
workerStates.get(workerIds.values().iterator().next()).status();
+            } else {
+                ObjectNode objectNode = new 
ObjectNode(JsonNodeFactory.instance);
+                for (Map.Entry<String, Long> entry : workerIds.entrySet()) {
+                    String nodeName = entry.getKey();
+                    Long workerId = entry.getValue();
+                    WorkerState state = workerStates.get(workerId);
+                    JsonNode node = state.status();
+                    if (node != null) {
+                        objectNode.set(nodeName, node);
+                    }
                 }
+                return objectNode;
             }
-            return workerNames;
         }
-    }
 
-    private static final JsonNode getCombinedStatus(Map<String, WorkerState> 
states) {
-        if (states.size() == 1) {
-            return states.values().iterator().next().status();
-        } else {
-            ObjectNode objectNode = new ObjectNode(JsonNodeFactory.instance);
-            for (Map.Entry<String, WorkerState> entry : states.entrySet()) {
-                JsonNode node = entry.getValue().status();
-                if (node != null) {
-                    objectNode.set(entry.getKey(), node);
+        TreeMap<String, Long> activeWorkerIds() {
+            TreeMap<String, Long> activeWorkerIds = new TreeMap<>();
+            for (Map.Entry<String, Long> entry : workerIds.entrySet()) {
+                WorkerState workerState = workerStates.get(entry.getValue());
+                if (!workerState.done()) {
+                    activeWorkerIds.put(entry.getKey(), entry.getValue());
                 }
             }
-            return objectNode;
+            return activeWorkerIds;
         }
     }
 
@@ -280,27 +297,21 @@ private static final JsonNode 
getCombinedStatus(Map<String, WorkerState> states)
      *
      * @param id                    The ID of the task to create.
      * @param spec                  The specification of the task to create.
-     *
-     * @return                      The specification of the task with the 
given ID.
-     *                              Note that if there was already a task with 
the given ID,
-     *                              this may be different from the 
specification that was
-     *                              requested.
      */
-    public TaskSpec createTask(final String id, TaskSpec spec)
-            throws ExecutionException, InterruptedException {
-        final TaskSpec existingSpec = executor.submit(new CreateTask(id, 
spec)).get();
-        if (existingSpec != null) {
-            log.info("Ignoring request to create task {}, because there is 
already " +
-                "a task with that id.", id);
-            return existingSpec;
+    public void createTask(final String id, TaskSpec spec)
+            throws Throwable {
+        try {
+            executor.submit(new CreateTask(id, spec)).get();
+        } catch (ExecutionException e) {
+            log.info("createTask(id={}, spec={}) error", id, spec, e);
+            throw e.getCause();
         }
-        return spec;
     }
 
     /**
      * Handles a request to create a new task.  Processed by the state change 
thread.
      */
-    class CreateTask implements Callable<TaskSpec> {
+    class CreateTask implements Callable<Void> {
         private final String id;
         private final TaskSpec spec;
 
@@ -310,11 +321,18 @@ public TaskSpec createTask(final String id, TaskSpec spec)
         }
 
         @Override
-        public TaskSpec call() throws Exception {
+        public Void call() throws Exception {
+            if (id.isEmpty()) {
+                throw new InvalidRequestException("Invalid empty ID in 
createTask request.");
+            }
             ManagedTask task = tasks.get(id);
             if (task != null) {
-                log.info("Task ID {} is already in use.", id);
-                return task.spec;
+                if (!task.spec.equals(spec)) {
+                    throw new RequestConflictException("Task ID " + id + " 
already " +
+                        "exists, and has a different spec " + task.spec);
+                }
+                log.info("Task {} already exists with spec {}", id, spec);
+                return null;
             }
             TaskController controller = null;
             String failure = null;
@@ -374,8 +392,10 @@ public Void call() throws Exception {
             task.state = ManagedTaskState.RUNNING;
             task.startedMs = time.milliseconds();
             for (String workerName : nodeNames) {
-                task.workerStates.put(workerName, new 
WorkerReceiving(task.spec));
-                nodeManagers.get(workerName).createWorker(task.id, task.spec);
+                long workerId = nextWorkerId++;
+                task.workerIds.put(workerName, workerId);
+                workerStates.put(workerId, new WorkerReceiving(task.id, 
task.spec));
+                nodeManagers.get(workerName).createWorker(workerId, task.id, 
task.spec);
             }
             return null;
         }
@@ -385,18 +405,20 @@ public Void call() throws Exception {
      * Stop a task.
      *
      * @param id                    The ID of the task to stop.
-     * @return                      The specification of the task which was 
stopped, or null if there
-     *                              was no task found with the given ID.
      */
-    public TaskSpec stopTask(final String id) throws ExecutionException, 
InterruptedException {
-        final TaskSpec spec = executor.submit(new CancelTask(id)).get();
-        return spec;
+    public void stopTask(final String id) throws Throwable {
+        try {
+            executor.submit(new CancelTask(id)).get();
+        } catch (ExecutionException e) {
+            log.info("stopTask(id={}) error", id, e);
+            throw e.getCause();
+        }
     }
 
     /**
      * Handles cancelling a task.  Processed by the state change thread.
      */
-    class CancelTask implements Callable<TaskSpec> {
+    class CancelTask implements Callable<Void> {
         private final String id;
 
         CancelTask(String id) {
@@ -404,7 +426,10 @@ public TaskSpec stopTask(final String id) throws 
ExecutionException, Interrupted
         }
 
         @Override
-        public TaskSpec call() throws Exception {
+        public Void call() throws Exception {
+            if (id.isEmpty()) {
+                throw new InvalidRequestException("Invalid empty ID in 
stopTask request.");
+            }
             ManagedTask task = tasks.get(id);
             if (task == null) {
                 log.info("Can't cancel non-existent task {}.", id);
@@ -420,16 +445,21 @@ public TaskSpec call() throws Exception {
                     break;
                 case RUNNING:
                     task.cancelled = true;
-                    TreeSet<String> activeWorkers = task.activeWorkers();
-                    if (activeWorkers.isEmpty()) {
-                        log.info("Task {} is now complete with error: {}", id, 
task.error);
+                    TreeMap<String, Long> activeWorkerIds = 
task.activeWorkerIds();
+                    if (activeWorkerIds.isEmpty()) {
+                        if (task.error.isEmpty()) {
+                            log.info("Task {} is now complete with no 
errors.", id);
+                        } else {
+                            log.info("Task {} is now complete with error: {}", 
id, task.error);
+                        }
                         task.doneMs = time.milliseconds();
                         task.state = ManagedTaskState.DONE;
                     } else {
-                        for (String workerName : activeWorkers) {
-                            nodeManagers.get(workerName).stopWorker(id);
+                        for (Map.Entry<String, Long> entry : 
activeWorkerIds.entrySet()) {
+                            
nodeManagers.get(entry.getKey()).stopWorker(entry.getValue());
                         }
-                        log.info("Cancelling task {} on worker(s): {}", id, 
Utils.join(activeWorkers, ", "));
+                        log.info("Cancelling task {} with worker(s) {}",
+                            id, Utils.mkString(activeWorkerIds, "", "", " = ", 
", "));
                         task.state = ManagedTaskState.STOPPING;
                     }
                     break;
@@ -440,7 +470,48 @@ public TaskSpec call() throws Exception {
                     log.info("Can't cancel task {} because it is already 
done.", id);
                     break;
             }
-            return task.spec;
+            return null;
+        }
+    }
+
+    public void destroyTask(String id) throws Throwable {
+        try {
+            executor.submit(new DestroyTask(id)).get();
+        } catch (ExecutionException e) {
+            log.info("destroyTask(id={}) error", id, e);
+            throw e.getCause();
+        }
+    }
+
+    /**
+     * Handles destroying a task.  Processed by the state change thread.
+     */
+    class DestroyTask implements Callable<Void> {
+        private final String id;
+
+        DestroyTask(String id) {
+            this.id = id;
+        }
+
+        @Override
+        public Void call() throws Exception {
+            if (id.isEmpty()) {
+                throw new InvalidRequestException("Invalid empty ID in 
destroyTask request.");
+            }
+            ManagedTask task = tasks.remove(id);
+            if (task == null) {
+                log.info("Can't destroy task {}: no such task found.", id);
+                return null;
+            }
+            log.info("Destroying task {}.", id);
+            task.clearStartFuture();
+            for (Map.Entry<String, Long> entry : task.workerIds.entrySet()) {
+                long workerId = entry.getValue();
+                workerStates.remove(workerId);
+                String nodeName = entry.getKey();
+                nodeManagers.get(nodeName).destroyWorker(workerId);
+            }
+            return null;
         }
     }
 
@@ -448,38 +519,48 @@ public TaskSpec call() throws Exception {
      * Update the state of a particular agent's worker.
      *
      * @param nodeName      The node where the agent is running.
-     * @param id            The worker name.
+     * @param workerId      The worker ID.
      * @param state         The worker state.
      */
-    public void updateWorkerState(String nodeName, String id, WorkerState 
state) {
-        executor.submit(new UpdateWorkerState(nodeName, id, state));
+    public void updateWorkerState(String nodeName, long workerId, WorkerState 
state) {
+        executor.submit(new UpdateWorkerState(nodeName, workerId, state));
     }
 
+    /**
+     * Updates the state of a worker.  Process by the state change thread.
+     */
     class UpdateWorkerState implements Callable<Void> {
         private final String nodeName;
-        private final String id;
-        private final WorkerState state;
+        private final long workerId;
+        private final WorkerState nextState;
 
-        UpdateWorkerState(String nodeName, String id, WorkerState state) {
+        UpdateWorkerState(String nodeName, long workerId, WorkerState 
nextState) {
             this.nodeName = nodeName;
-            this.id = id;
-            this.state = state;
+            this.workerId = workerId;
+            this.nextState = nextState;
         }
 
         @Override
         public Void call() throws Exception {
-            ManagedTask task = tasks.get(id);
-            if (task == null) {
-                log.error("Can't update worker state unknown worker {} on node 
{}",
-                    id, nodeName);
-                return null;
-            }
-            WorkerState prevState = task.workerStates.get(nodeName);
-            log.debug("Task {}: Updating worker state for {} from {} to {}.",
-                id, nodeName, prevState, state);
-            task.workerStates.put(nodeName, state);
-            if (state.done() && (!prevState.done())) {
-                handleWorkerCompletion(task, nodeName, (WorkerDone) state);
+            try {
+                WorkerState prevState = workerStates.get(workerId);
+                if (prevState == null) {
+                    throw new RuntimeException("Unable to find workerId " + 
workerId);
+                }
+                ManagedTask task = tasks.get(prevState.taskId());
+                if (task == null) {
+                    throw new RuntimeException("Unable to find taskId " + 
prevState.taskId());
+                }
+                log.debug("Task {}: Updating worker state for {} on {} from {} 
to {}.",
+                    task.id, workerId, nodeName, prevState, nextState);
+                workerStates.put(workerId, nextState);
+                if (nextState.done() && (!prevState.done())) {
+                    handleWorkerCompletion(task, nodeName, (WorkerDone) 
nextState);
+                }
+            } catch (Exception e) {
+                log.error("Error updating worker state for {} on {}.  Stopping 
worker.",
+                    workerId, nodeName, e);
+                nodeManagers.get(nodeName).stopWorker(workerId);
             }
             return null;
         }
@@ -501,19 +582,19 @@ private void handleWorkerCompletion(ManagedTask task, 
String nodeName, WorkerDon
                 nodeName, task.id, state.error(), 
JsonUtil.toJsonString(state.status()));
             task.maybeSetError(state.error());
         }
-        if (task.activeWorkers().isEmpty()) {
+        TreeMap<String, Long> activeWorkerIds = task.activeWorkerIds();
+        if (activeWorkerIds.isEmpty()) {
             task.doneMs = time.milliseconds();
             task.state = ManagedTaskState.DONE;
             log.info("{}: Task {} is now complete on {} with error: {}",
-                nodeName, task.id, Utils.join(task.workerStates.keySet(), ", 
"),
+                nodeName, task.id, Utils.join(task.workerIds.keySet(), ", "),
                 task.error.isEmpty() ? "(none)" : task.error);
         } else if ((task.state == ManagedTaskState.RUNNING) && 
(!task.error.isEmpty())) {
-            TreeSet<String> activeWorkers = task.activeWorkers();
             log.info("{}: task {} stopped with error {}.  Stopping worker(s): 
{}",
-                nodeName, task.id, task.error, Utils.join(activeWorkers, ", 
"));
+                nodeName, task.id, task.error, Utils.mkString(activeWorkerIds, 
"{", "}", ": ", ", "));
             task.state = ManagedTaskState.STOPPING;
-            for (String workerName : activeWorkers) {
-                nodeManagers.get(workerName).stopWorker(task.id);
+            for (Map.Entry<String, Long> entry : activeWorkerIds.entrySet()) {
+                nodeManagers.get(entry.getKey()).stopWorker(entry.getValue());
             }
         }
     }
@@ -525,6 +606,9 @@ public TasksResponse tasks(TasksRequest request) throws 
ExecutionException, Inte
         return executor.submit(new GetTasksResponse(request)).get();
     }
 
+    /**
+     * Gets information about the tasks being managed.  Processed by the state 
change thread.
+     */
     class GetTasksResponse implements Callable<TasksResponse> {
         private final TasksRequest request;
 
diff --git 
a/tools/src/main/java/org/apache/kafka/trogdor/rest/AgentStatusResponse.java 
b/tools/src/main/java/org/apache/kafka/trogdor/rest/AgentStatusResponse.java
index c505e75e3ff..d41a54b2881 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/AgentStatusResponse.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/AgentStatusResponse.java
@@ -27,13 +27,13 @@
  */
 public class AgentStatusResponse extends Message {
     private final long serverStartMs;
-    private final TreeMap<String, WorkerState> workers;
+    private final TreeMap<Long, WorkerState> workers;
 
     @JsonCreator
     public AgentStatusResponse(@JsonProperty("serverStartMs") long 
serverStartMs,
-            @JsonProperty("workers") TreeMap<String, WorkerState> workers) {
+            @JsonProperty("workers") TreeMap<Long, WorkerState> workers) {
         this.serverStartMs = serverStartMs;
-        this.workers = workers == null ? new TreeMap<String, WorkerState>() : 
workers;
+        this.workers = workers == null ? new TreeMap<Long, WorkerState>() : 
workers;
     }
 
     @JsonProperty
@@ -42,7 +42,7 @@ public long serverStartMs() {
     }
 
     @JsonProperty
-    public TreeMap<String, WorkerState> workers() {
+    public TreeMap<Long, WorkerState> workers() {
         return workers;
     }
 }
diff --git 
a/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateWorkerRequest.java 
b/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateWorkerRequest.java
index 9f6e8dcf0d2..4acc943251e 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateWorkerRequest.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateWorkerRequest.java
@@ -25,19 +25,27 @@
  * A request to the Trogdor agent to create a worker.
  */
 public class CreateWorkerRequest extends Message {
-    private final String id;
+    private final long workerId;
+    private final String taskId;
     private final TaskSpec spec;
 
     @JsonCreator
-    public CreateWorkerRequest(@JsonProperty("id") String id,
+    public CreateWorkerRequest(@JsonProperty("workerId") long workerId,
+            @JsonProperty("taskId") String taskId,
             @JsonProperty("spec") TaskSpec spec) {
-        this.id = id;
+        this.workerId = workerId;
+        this.taskId = taskId;
         this.spec = spec;
     }
 
     @JsonProperty
-    public String id() {
-        return id;
+    public long workerId() {
+        return workerId;
+    }
+
+    @JsonProperty
+    public String taskId() {
+        return taskId;
     }
 
     @JsonProperty
diff --git 
a/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateWorkerResponse.java 
b/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateWorkerResponse.java
deleted file mode 100644
index 9e068eccd7a..00000000000
--- 
a/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateWorkerResponse.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.trogdor.rest;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.kafka.trogdor.task.TaskSpec;
-
-/**
- * A response from the Trogdor agent about creating a worker.
- */
-public class CreateWorkerResponse extends Message {
-    private final TaskSpec spec;
-
-    @JsonCreator
-    public CreateWorkerResponse(@JsonProperty("spec") TaskSpec spec) {
-        this.spec = spec;
-    }
-
-    @JsonProperty
-    public TaskSpec spec() {
-        return spec;
-    }
-}
diff --git 
a/tools/src/main/java/org/apache/kafka/trogdor/rest/StopWorkerResponse.java 
b/tools/src/main/java/org/apache/kafka/trogdor/rest/DestroyTaskRequest.java
similarity index 74%
rename from 
tools/src/main/java/org/apache/kafka/trogdor/rest/StopWorkerResponse.java
rename to 
tools/src/main/java/org/apache/kafka/trogdor/rest/DestroyTaskRequest.java
index 7d5b4687db3..d782d5d1cfb 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/StopWorkerResponse.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/DestroyTaskRequest.java
@@ -19,21 +19,20 @@
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.kafka.trogdor.task.TaskSpec;
 
 /**
- * A response from the Trogdor agent about stopping a worker.
+ * A request to the Trogdor coordinator to delete all memory of a task.
  */
-public class StopWorkerResponse extends Message {
-    private final TaskSpec spec;
+public class DestroyTaskRequest extends Message {
+    private final String id;
 
     @JsonCreator
-    public StopWorkerResponse(@JsonProperty("spec") TaskSpec spec) {
-        this.spec = spec;
+    public DestroyTaskRequest(@JsonProperty("id") String id) {
+        this.id = id;
     }
 
     @JsonProperty
-    public TaskSpec spec() {
-        return spec;
+    public String id() {
+        return id;
     }
 }
diff --git 
a/tools/src/main/java/org/apache/kafka/trogdor/rest/StopTaskResponse.java 
b/tools/src/main/java/org/apache/kafka/trogdor/rest/DestroyWorkerRequest.java
similarity index 75%
rename from 
tools/src/main/java/org/apache/kafka/trogdor/rest/StopTaskResponse.java
rename to 
tools/src/main/java/org/apache/kafka/trogdor/rest/DestroyWorkerRequest.java
index f344dc9666a..e5a8969d4cd 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/StopTaskResponse.java
+++ 
b/tools/src/main/java/org/apache/kafka/trogdor/rest/DestroyWorkerRequest.java
@@ -19,21 +19,20 @@
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.kafka.trogdor.task.TaskSpec;
 
 /**
- * A response from the Trogdor coordinator about stopping a task.
+ * A request to the Trogdor agent to delete all memory of a task.
  */
-public class StopTaskResponse extends Message {
-    private final TaskSpec spec;
+public class DestroyWorkerRequest extends Message {
+    private final long workerId;
 
     @JsonCreator
-    public StopTaskResponse(@JsonProperty("spec") TaskSpec spec) {
-        this.spec = spec;
+    public DestroyWorkerRequest(@JsonProperty("workerId") long workerId) {
+        this.workerId = workerId;
     }
 
     @JsonProperty
-    public TaskSpec spec() {
-        return spec;
+    public long workerId() {
+        return workerId;
     }
 }
diff --git 
a/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateTaskResponse.java 
b/tools/src/main/java/org/apache/kafka/trogdor/rest/RequestConflictException.java
similarity index 64%
rename from 
tools/src/main/java/org/apache/kafka/trogdor/rest/CreateTaskResponse.java
rename to 
tools/src/main/java/org/apache/kafka/trogdor/rest/RequestConflictException.java
index 54ea0f23c97..2701f6af8f9 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateTaskResponse.java
+++ 
b/tools/src/main/java/org/apache/kafka/trogdor/rest/RequestConflictException.java
@@ -17,23 +17,17 @@
 
 package org.apache.kafka.trogdor.rest;
 
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.kafka.trogdor.task.TaskSpec;
-
 /**
- * A response from the Trogdor coordinator about creating a task.
+ * Indicates that a given request got an HTTP error 409: CONFLICT.
  */
-public class CreateTaskResponse extends Message {
-    private final TaskSpec spec;
+public class RequestConflictException extends RuntimeException {
+    private static final long serialVersionUID = 1L;
 
-    @JsonCreator
-    public CreateTaskResponse(@JsonProperty("spec") TaskSpec spec) {
-        this.spec = spec;
+    public RequestConflictException(String message) {
+        super(message);
     }
 
-    @JsonProperty
-    public TaskSpec spec() {
-        return spec;
+    public RequestConflictException() {
+        super();
     }
 }
diff --git 
a/tools/src/main/java/org/apache/kafka/trogdor/rest/RestExceptionMapper.java 
b/tools/src/main/java/org/apache/kafka/trogdor/rest/RestExceptionMapper.java
index f62a775fad9..57c54ec04d8 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/RestExceptionMapper.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/RestExceptionMapper.java
@@ -18,6 +18,7 @@
 
 import com.fasterxml.jackson.databind.JsonMappingException;
 import com.fasterxml.jackson.databind.exc.InvalidTypeIdException;
+import org.apache.kafka.common.errors.InvalidRequestException;
 import org.apache.kafka.common.errors.SerializationException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,6 +39,8 @@ public Response toResponse(Throwable e) {
         }
         if (e instanceof NotFoundException) {
             return buildResponse(Response.Status.NOT_FOUND, e);
+        } else if (e instanceof InvalidRequestException) {
+            return buildResponse(Response.Status.BAD_REQUEST, e);
         } else if (e instanceof InvalidTypeIdException) {
             return buildResponse(Response.Status.NOT_IMPLEMENTED, e);
         } else if (e instanceof JsonMappingException) {
@@ -46,6 +49,8 @@ public Response toResponse(Throwable e) {
             return buildResponse(Response.Status.NOT_IMPLEMENTED, e);
         } else if (e instanceof SerializationException) {
             return buildResponse(Response.Status.BAD_REQUEST, e);
+        } else if (e instanceof RequestConflictException) {
+            return buildResponse(Response.Status.CONFLICT, e);
         } else {
             return buildResponse(Response.Status.INTERNAL_SERVER_ERROR, e);
         }
@@ -57,7 +62,9 @@ public static Exception toException(int code, String msg) 
throws Exception {
         } else if (code == Response.Status.NOT_IMPLEMENTED.getStatusCode()) {
             throw new ClassNotFoundException(msg);
         } else if (code == Response.Status.BAD_REQUEST.getStatusCode()) {
-            throw new SerializationException(msg);
+            throw new InvalidRequestException(msg);
+        } else if (code == Response.Status.CONFLICT.getStatusCode()) {
+            throw new RequestConflictException(msg);
         } else {
             throw new RuntimeException(msg);
         }
diff --git 
a/tools/src/main/java/org/apache/kafka/trogdor/rest/StopTaskRequest.java 
b/tools/src/main/java/org/apache/kafka/trogdor/rest/StopTaskRequest.java
index 3287801d303..704a961f99e 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/StopTaskRequest.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/StopTaskRequest.java
@@ -28,7 +28,7 @@
 
     @JsonCreator
     public StopTaskRequest(@JsonProperty("id") String id) {
-        this.id = id;
+        this.id = (id == null) ? "" : id;
     }
 
     @JsonProperty
diff --git 
a/tools/src/main/java/org/apache/kafka/trogdor/rest/StopWorkerRequest.java 
b/tools/src/main/java/org/apache/kafka/trogdor/rest/StopWorkerRequest.java
index 54c689adfcd..c1dcff363c8 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/StopWorkerRequest.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/StopWorkerRequest.java
@@ -24,15 +24,15 @@
  * A request to the Trogdor agent to stop a worker.
  */
 public class StopWorkerRequest extends Message {
-    private final String id;
+    private final long workerId;
 
     @JsonCreator
-    public StopWorkerRequest(@JsonProperty("id") String id) {
-        this.id = id;
+    public StopWorkerRequest(@JsonProperty("workerId") long workerId) {
+        this.workerId = workerId;
     }
 
     @JsonProperty
-    public String id() {
-        return id;
+    public long workerId() {
+        return workerId;
     }
 }
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerDone.java 
b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerDone.java
index 500d3c6a0c2..5f773bba5c4 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerDone.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerDone.java
@@ -49,12 +49,13 @@
     private final String error;
 
     @JsonCreator
-    public WorkerDone(@JsonProperty("spec") TaskSpec spec,
+    public WorkerDone(@JsonProperty("taskId") String taskId,
+            @JsonProperty("spec") TaskSpec spec,
             @JsonProperty("startedMs") long startedMs,
             @JsonProperty("doneMs") long doneMs,
             @JsonProperty("status") JsonNode status,
             @JsonProperty("error") String error) {
-        super(spec);
+        super(taskId, spec);
         this.startedMs = startedMs;
         this.doneMs = doneMs;
         this.status = status == null ? NullNode.instance : status;
diff --git 
a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerReceiving.java 
b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerReceiving.java
index 70687743f74..1babcce2a57 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerReceiving.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerReceiving.java
@@ -29,8 +29,9 @@
  */
 public final class WorkerReceiving extends WorkerState {
     @JsonCreator
-    public WorkerReceiving(@JsonProperty("spec") TaskSpec spec) {
-        super(spec);
+    public WorkerReceiving(@JsonProperty("taskId") String taskId,
+            @JsonProperty("spec") TaskSpec spec) {
+        super(taskId, spec);
     }
 
     @Override
diff --git 
a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerRunning.java 
b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerRunning.java
index af8ee88a1ab..15e77528d62 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerRunning.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerRunning.java
@@ -39,10 +39,11 @@
     private final JsonNode status;
 
     @JsonCreator
-    public WorkerRunning(@JsonProperty("spec") TaskSpec spec,
+    public WorkerRunning(@JsonProperty("taskId") String taskId,
+            @JsonProperty("spec") TaskSpec spec,
             @JsonProperty("startedMs") long startedMs,
             @JsonProperty("status") JsonNode status) {
-        super(spec);
+        super(taskId, spec);
         this.startedMs = startedMs;
         this.status = status == null ? NullNode.instance : status;
     }
diff --git 
a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerStarting.java 
b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerStarting.java
index b568ec1f887..7a06eac5b7d 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerStarting.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerStarting.java
@@ -28,8 +28,9 @@
  */
 public final class WorkerStarting extends WorkerState {
     @JsonCreator
-    public WorkerStarting(@JsonProperty("spec") TaskSpec spec) {
-        super(spec);
+    public WorkerStarting(@JsonProperty("taskId") String taskId,
+            @JsonProperty("spec") TaskSpec spec) {
+        super(taskId, spec);
     }
 
     @Override
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerState.java 
b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerState.java
index 044d719f894..6480a2410dc 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerState.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerState.java
@@ -38,12 +38,19 @@
     @JsonSubTypes.Type(value = WorkerDone.class, name = "DONE")
     })
 public abstract class WorkerState extends Message {
+    private final String taskId;
     private final TaskSpec spec;
 
-    public WorkerState(TaskSpec spec) {
+    public WorkerState(String taskId, TaskSpec spec) {
+        this.taskId = taskId;
         this.spec = spec;
     }
 
+    @JsonProperty
+    public String taskId() {
+        return taskId;
+    }
+
     @JsonProperty
     public TaskSpec spec() {
         return spec;
diff --git 
a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerStopping.java 
b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerStopping.java
index 9fbb3ff7306..2942e118ac6 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerStopping.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerStopping.java
@@ -39,10 +39,11 @@
     private final JsonNode status;
 
     @JsonCreator
-    public WorkerStopping(@JsonProperty("spec") TaskSpec spec,
+    public WorkerStopping(@JsonProperty("taskId") String taskId,
+            @JsonProperty("spec") TaskSpec spec,
             @JsonProperty("startedMs") long startedMs,
             @JsonProperty("status") JsonNode status) {
-        super(spec);
+        super(taskId, spec);
         this.startedMs = startedMs;
         this.status = status == null ? NullNode.instance : status;
     }
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java 
b/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java
index 61de5c98797..158e690da4b 100644
--- a/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java
+++ b/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java
@@ -36,8 +36,9 @@
 import org.apache.kafka.trogdor.rest.AgentStatusResponse;
 
 import org.apache.kafka.trogdor.rest.CreateWorkerRequest;
-import org.apache.kafka.trogdor.rest.CreateWorkerResponse;
+import org.apache.kafka.trogdor.rest.DestroyWorkerRequest;
 import org.apache.kafka.trogdor.rest.JsonRestServer;
+import org.apache.kafka.trogdor.rest.RequestConflictException;
 import org.apache.kafka.trogdor.rest.StopWorkerRequest;
 import org.apache.kafka.trogdor.rest.WorkerDone;
 import org.apache.kafka.trogdor.rest.WorkerRunning;
@@ -120,36 +121,47 @@ public void testAgentCreateWorkers() throws Exception {
         new ExpectedTasks().waitFor(client);
 
         final NoOpTaskSpec fooSpec = new NoOpTaskSpec(1000, 600000);
-        CreateWorkerResponse response = client.createWorker(new 
CreateWorkerRequest("foo", fooSpec));
-        assertEquals(fooSpec.toString(), response.spec().toString());
+        client.createWorker(new CreateWorkerRequest(0, "foo", fooSpec));
         new ExpectedTasks().addTask(new ExpectedTaskBuilder("foo").
-                workerState(new WorkerRunning(fooSpec, 0, new 
TextNode("active"))).
+                workerState(new WorkerRunning("foo", fooSpec, 0, new 
TextNode("active"))).
                 build()).
             waitFor(client);
 
         final NoOpTaskSpec barSpec = new NoOpTaskSpec(2000, 900000);
-        client.createWorker(new CreateWorkerRequest("bar", barSpec));
-        client.createWorker(new CreateWorkerRequest("bar", barSpec));
+        client.createWorker(new CreateWorkerRequest(1, "bar", barSpec));
+        client.createWorker(new CreateWorkerRequest(1, "bar", barSpec));
+
+        try {
+            client.createWorker(new CreateWorkerRequest(1, "foo", barSpec));
+            Assert.fail("Expected RequestConflictException when re-creating a 
request with a different taskId.");
+        } catch (RequestConflictException exception) {
+        }
+        try {
+            client.createWorker(new CreateWorkerRequest(1, "bar", fooSpec));
+            Assert.fail("Expected RequestConflictException when re-creating a 
request with a different spec.");
+        } catch (RequestConflictException exception) {
+        }
+
         new ExpectedTasks().
             addTask(new ExpectedTaskBuilder("foo").
-                workerState(new WorkerRunning(fooSpec, 0, new 
TextNode("active"))).
+                workerState(new WorkerRunning("foo", fooSpec, 0, new 
TextNode("active"))).
                 build()).
             addTask(new ExpectedTaskBuilder("bar").
-                workerState(new WorkerRunning(barSpec, 0, new 
TextNode("active"))).
+                workerState(new WorkerRunning("bar", barSpec, 0, new 
TextNode("active"))).
                 build()).
             waitFor(client);
 
         final NoOpTaskSpec bazSpec = new NoOpTaskSpec(1, 450000);
-        client.createWorker(new CreateWorkerRequest("baz", bazSpec));
+        client.createWorker(new CreateWorkerRequest(2, "baz", bazSpec));
         new ExpectedTasks().
             addTask(new ExpectedTaskBuilder("foo").
-                workerState(new WorkerRunning(fooSpec, 0, new 
TextNode("active"))).
+                workerState(new WorkerRunning("foo", fooSpec, 0, new 
TextNode("active"))).
                 build()).
             addTask(new ExpectedTaskBuilder("bar").
-                workerState(new WorkerRunning(barSpec, 0, new 
TextNode("active"))).
+                workerState(new WorkerRunning("bar", barSpec, 0, new 
TextNode("active"))).
                 build()).
             addTask(new ExpectedTaskBuilder("baz").
-                workerState(new WorkerRunning(bazSpec, 0, new 
TextNode("active"))).
+                workerState(new WorkerRunning("baz", bazSpec, 0, new 
TextNode("active"))).
                 build()).
             waitFor(client);
 
@@ -167,23 +179,23 @@ public void testAgentFinishesTasks() throws Exception {
         new ExpectedTasks().waitFor(client);
 
         final NoOpTaskSpec fooSpec = new NoOpTaskSpec(10, 2);
-        client.createWorker(new CreateWorkerRequest("foo", fooSpec));
+        client.createWorker(new CreateWorkerRequest(0, "foo", fooSpec));
         new ExpectedTasks().
             addTask(new ExpectedTaskBuilder("foo").
-                workerState(new WorkerRunning(fooSpec, 0, new 
TextNode("active"))).
+                workerState(new WorkerRunning("foo", fooSpec, 0, new 
TextNode("active"))).
                 build()).
             waitFor(client);
 
         time.sleep(1);
 
         final NoOpTaskSpec barSpec = new NoOpTaskSpec(2000, 900000);
-        client.createWorker(new CreateWorkerRequest("bar", barSpec));
+        client.createWorker(new CreateWorkerRequest(1, "bar", barSpec));
         new ExpectedTasks().
             addTask(new ExpectedTaskBuilder("foo").
-                workerState(new WorkerRunning(fooSpec, 0, new 
TextNode("active"))).
+                workerState(new WorkerRunning("foo", fooSpec, 0, new 
TextNode("active"))).
                 build()).
             addTask(new ExpectedTaskBuilder("bar").
-                workerState(new WorkerRunning(barSpec, 1, new 
TextNode("active"))).
+                workerState(new WorkerRunning("bar", barSpec, 1, new 
TextNode("active"))).
                 build()).
             waitFor(client);
 
@@ -191,21 +203,21 @@ public void testAgentFinishesTasks() throws Exception {
 
         new ExpectedTasks().
             addTask(new ExpectedTaskBuilder("foo").
-                workerState(new WorkerDone(fooSpec, 0, 2, new 
TextNode("done"), "")).
+                workerState(new WorkerDone("foo", fooSpec, 0, 2, new 
TextNode("done"), "")).
                 build()).
             addTask(new ExpectedTaskBuilder("bar").
-                workerState(new WorkerRunning(barSpec, 1, new 
TextNode("active"))).
+                workerState(new WorkerRunning("bar", barSpec, 1, new 
TextNode("active"))).
                 build()).
             waitFor(client);
 
         time.sleep(5);
-        client.stopWorker(new StopWorkerRequest("bar"));
+        client.stopWorker(new StopWorkerRequest(1));
         new ExpectedTasks().
             addTask(new ExpectedTaskBuilder("foo").
-                workerState(new WorkerDone(fooSpec, 0, 2, new 
TextNode("done"), "")).
+                workerState(new WorkerDone("foo", fooSpec, 0, 2, new 
TextNode("done"), "")).
                 build()).
             addTask(new ExpectedTaskBuilder("bar").
-                workerState(new WorkerDone(barSpec, 1, 7, new 
TextNode("done"), "")).
+                workerState(new WorkerDone("bar", barSpec, 1, 7, new 
TextNode("done"), "")).
                 build()).
             waitFor(client);
 
@@ -224,25 +236,25 @@ public void testWorkerCompletions() throws Exception {
 
         SampleTaskSpec fooSpec = new SampleTaskSpec(0, 900000,
             Collections.singletonMap("node01", 1L), "");
-        client.createWorker(new CreateWorkerRequest("foo", fooSpec));
+        client.createWorker(new CreateWorkerRequest(0, "foo", fooSpec));
         new ExpectedTasks().
             addTask(new ExpectedTaskBuilder("foo").
-                workerState(new WorkerRunning(fooSpec, 0, new 
TextNode("active"))).
+                workerState(new WorkerRunning("foo", fooSpec, 0, new 
TextNode("active"))).
                 build()).
             waitFor(client);
 
         SampleTaskSpec barSpec = new SampleTaskSpec(0, 900000,
             Collections.singletonMap("node01", 2L), "baz");
-        client.createWorker(new CreateWorkerRequest("bar", barSpec));
+        client.createWorker(new CreateWorkerRequest(1, "bar", barSpec));
 
         time.sleep(1);
         new ExpectedTasks().
             addTask(new ExpectedTaskBuilder("foo").
-                workerState(new WorkerDone(fooSpec, 0, 1,
+                workerState(new WorkerDone("foo", fooSpec, 0, 1,
                     new TextNode("halted"), "")).
                 build()).
             addTask(new ExpectedTaskBuilder("bar").
-                workerState(new WorkerRunning(barSpec, 0,
+                workerState(new WorkerRunning("bar", barSpec, 0,
                     new TextNode("active"))).
                 build()).
             waitFor(client);
@@ -250,11 +262,11 @@ public void testWorkerCompletions() throws Exception {
         time.sleep(1);
         new ExpectedTasks().
             addTask(new ExpectedTaskBuilder("foo").
-                workerState(new WorkerDone(fooSpec, 0, 1,
+                workerState(new WorkerDone("foo", fooSpec, 0, 1,
                     new TextNode("halted"), "")).
                 build()).
             addTask(new ExpectedTaskBuilder("bar").
-                workerState(new WorkerDone(barSpec, 0, 2,
+                workerState(new WorkerDone("bar", barSpec, 0, 2,
                     new TextNode("halted"), "baz")).
                 build()).
             waitFor(client);
@@ -293,37 +305,84 @@ public void testKiboshFaults() throws Exception {
             Assert.assertEquals(KiboshControlFile.EMPTY, mockKibosh.read());
             FilesUnreadableFaultSpec fooSpec = new FilesUnreadableFaultSpec(0, 
900000,
                 Collections.singleton("myAgent"), 
mockKibosh.tempDir.getPath().toString(), "/foo", 123);
-            client.createWorker(new CreateWorkerRequest("foo", fooSpec));
+            client.createWorker(new CreateWorkerRequest(0, "foo", fooSpec));
             new ExpectedTasks().
                 addTask(new ExpectedTaskBuilder("foo").
-                    workerState(new WorkerRunning(fooSpec, 0, new 
TextNode("Added fault foo"))).
+                    workerState(new WorkerRunning("foo", fooSpec, 0, new 
TextNode("Added fault foo"))).
                     build()).
                 waitFor(client);
             Assert.assertEquals(new 
KiboshControlFile(Collections.<Kibosh.KiboshFaultSpec>singletonList(
                 new KiboshFilesUnreadableFaultSpec("/foo", 123))), 
mockKibosh.read());
             FilesUnreadableFaultSpec barSpec = new FilesUnreadableFaultSpec(0, 
900000,
                 Collections.singleton("myAgent"), 
mockKibosh.tempDir.getPath().toString(), "/bar", 456);
-            client.createWorker(new CreateWorkerRequest("bar", barSpec));
+            client.createWorker(new CreateWorkerRequest(1, "bar", barSpec));
             new ExpectedTasks().
                 addTask(new ExpectedTaskBuilder("foo").
-                    workerState(new WorkerRunning(fooSpec, 0, new 
TextNode("Added fault foo"))).build()).
+                    workerState(new WorkerRunning("foo", fooSpec, 0, new 
TextNode("Added fault foo"))).build()).
                 addTask(new ExpectedTaskBuilder("bar").
-                    workerState(new WorkerRunning(barSpec, 0, new 
TextNode("Added fault bar"))).build()).
+                    workerState(new WorkerRunning("bar", barSpec, 0, new 
TextNode("Added fault bar"))).build()).
                 waitFor(client);
             Assert.assertEquals(new KiboshControlFile(new 
ArrayList<Kibosh.KiboshFaultSpec>() {{
                     add(new KiboshFilesUnreadableFaultSpec("/foo", 123));
                     add(new KiboshFilesUnreadableFaultSpec("/bar", 456));
                 }}), mockKibosh.read());
             time.sleep(1);
-            client.stopWorker(new StopWorkerRequest("foo"));
+            client.stopWorker(new StopWorkerRequest(0));
             new ExpectedTasks().
                 addTask(new ExpectedTaskBuilder("foo").
-                    workerState(new WorkerDone(fooSpec, 0, 1, new 
TextNode("Removed fault foo"), "")).build()).
+                    workerState(new WorkerDone("foo", fooSpec, 0, 1, new 
TextNode("Removed fault foo"), "")).build()).
                 addTask(new ExpectedTaskBuilder("bar").
-                    workerState(new WorkerRunning(barSpec, 0, new 
TextNode("Added fault bar"))).build()).
+                    workerState(new WorkerRunning("bar", barSpec, 0, new 
TextNode("Added fault bar"))).build()).
                 waitFor(client);
             Assert.assertEquals(new 
KiboshControlFile(Collections.<Kibosh.KiboshFaultSpec>singletonList(
                 new KiboshFilesUnreadableFaultSpec("/bar", 456))), 
mockKibosh.read());
         }
     }
+
+    @Test
+    public void testDestroyWorkers() throws Exception {
+        MockTime time = new MockTime(0, 0, 0);
+        MockScheduler scheduler = new MockScheduler(time);
+        Agent agent = createAgent(scheduler);
+        AgentClient client = new AgentClient.Builder().
+            maxTries(10).target("localhost", agent.port()).build();
+        new ExpectedTasks().waitFor(client);
+
+        final NoOpTaskSpec fooSpec = new NoOpTaskSpec(10, 5);
+        client.createWorker(new CreateWorkerRequest(0, "foo", fooSpec));
+        new ExpectedTasks().
+            addTask(new ExpectedTaskBuilder("foo").
+                workerState(new WorkerRunning("foo", fooSpec, 0, new 
TextNode("active"))).
+                build()).
+            waitFor(client);
+        time.sleep(1);
+
+        client.destroyWorker(new DestroyWorkerRequest(0));
+        client.destroyWorker(new DestroyWorkerRequest(0));
+        client.destroyWorker(new DestroyWorkerRequest(1));
+        new ExpectedTasks().waitFor(client);
+        time.sleep(1);
+
+        final NoOpTaskSpec fooSpec2 = new NoOpTaskSpec(100, 1);
+        client.createWorker(new CreateWorkerRequest(1, "foo", fooSpec2));
+        new ExpectedTasks().
+            addTask(new ExpectedTaskBuilder("foo").
+                workerState(new WorkerRunning("foo", fooSpec2, 2, new 
TextNode("active"))).
+                build()).
+            waitFor(client);
+
+        time.sleep(2);
+        new ExpectedTasks().
+            addTask(new ExpectedTaskBuilder("foo").
+                workerState(new WorkerDone("foo", fooSpec2, 2, 4, new 
TextNode("done"), "")).
+                build()).
+            waitFor(client);
+
+        time.sleep(1);
+        client.destroyWorker(new DestroyWorkerRequest(1));
+        new ExpectedTasks().waitFor(client);
+
+        agent.beginShutdown();
+        agent.waitForShutdown();
+    }
 };
diff --git 
a/tools/src/test/java/org/apache/kafka/trogdor/common/ExpectedTasks.java 
b/tools/src/test/java/org/apache/kafka/trogdor/common/ExpectedTasks.java
index 617bf34bcd9..121281f5910 100644
--- a/tools/src/test/java/org/apache/kafka/trogdor/common/ExpectedTasks.java
+++ b/tools/src/test/java/org/apache/kafka/trogdor/common/ExpectedTasks.java
@@ -32,6 +32,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.HashMap;
 import java.util.Map;
 import java.util.TreeMap;
 
@@ -184,10 +185,14 @@ public boolean conditionMet() {
                     throw new RuntimeException(e);
                 }
                 StringBuilder errors = new StringBuilder();
+                HashMap<String, WorkerState> taskIdToWorkerState = new 
HashMap<>();
+                for (WorkerState state : status.workers().values()) {
+                    taskIdToWorkerState.put(state.taskId(), state);
+                }
                 for (Map.Entry<String, ExpectedTask> entry : 
expected.entrySet()) {
                     String id = entry.getKey();
                     ExpectedTask worker = entry.getValue();
-                    String differences = 
worker.compare(status.workers().get(id));
+                    String differences = 
worker.compare(taskIdToWorkerState.get(id));
                     if (differences != null) {
                         errors.append(differences);
                     }
diff --git 
a/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java
 
b/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java
index 8101d9c6e4e..c1f7490cc82 100644
--- 
a/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java
+++ 
b/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java
@@ -45,9 +45,9 @@ public void testDeserializationDoesNotProduceNulls() throws 
Exception {
         verify(new ProcessStopFaultSpec(0, 0, null, null));
         verify(new AgentStatusResponse(0, null));
         verify(new TasksResponse(null));
-        verify(new WorkerDone(null, 0, 0, null, null));
-        verify(new WorkerRunning(null, 0, null));
-        verify(new WorkerStopping(null, 0, null));
+        verify(new WorkerDone(null, null, 0, 0, null, null));
+        verify(new WorkerRunning(null, null, 0, null));
+        verify(new WorkerStopping(null, null, 0, null));
         verify(new ProduceBenchSpec(0, 0, null, null,
             0, 0, null, null, null, null, null, 0, 0, "test-topic", 1, (short) 
3));
         verify(new RoundTripWorkloadSpec(0, 0, null, null, null, null, null, 
null,
diff --git 
a/tools/src/test/java/org/apache/kafka/trogdor/common/MiniTrogdorCluster.java 
b/tools/src/test/java/org/apache/kafka/trogdor/common/MiniTrogdorCluster.java
index 07f02c5830b..46315c27d15 100644
--- 
a/tools/src/test/java/org/apache/kafka/trogdor/common/MiniTrogdorCluster.java
+++ 
b/tools/src/test/java/org/apache/kafka/trogdor/common/MiniTrogdorCluster.java
@@ -185,7 +185,7 @@ public Void call() throws Exception {
                             }
                             if (node.coordinatorRestResource != null) {
                                 node.coordinator = new 
Coordinator(node.platform, scheduler,
-                                    node.coordinatorRestServer, 
node.coordinatorRestResource);
+                                    node.coordinatorRestServer, 
node.coordinatorRestResource, 0);
                             }
                         } catch (Exception e) {
                             log.error("Unable to initialize {}", nodeName, e);
diff --git 
a/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java 
b/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java
index 34d7ffe6106..e9434844405 100644
--- 
a/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java
+++ 
b/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java
@@ -35,6 +35,8 @@
 import org.apache.kafka.trogdor.fault.NetworkPartitionFaultSpec;
 import org.apache.kafka.trogdor.rest.CoordinatorStatusResponse;
 import org.apache.kafka.trogdor.rest.CreateTaskRequest;
+import org.apache.kafka.trogdor.rest.DestroyTaskRequest;
+import org.apache.kafka.trogdor.rest.RequestConflictException;
 import org.apache.kafka.trogdor.rest.StopTaskRequest;
 import org.apache.kafka.trogdor.rest.TaskDone;
 import org.apache.kafka.trogdor.rest.TaskPending;
@@ -57,8 +59,9 @@
 import java.util.List;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
 
 public class CoordinatorTest {
     private static final Logger log = 
LoggerFactory.getLogger(CoordinatorTest.class);
@@ -96,11 +99,25 @@ public void testCreateTask() throws Exception {
                     build()).
                 waitFor(cluster.coordinatorClient());
 
+            // Re-creating a task with the same arguments is not an error.
+            cluster.coordinatorClient().createTask(
+                new CreateTaskRequest("foo", fooSpec));
+
+            // Re-creating a task with different arguments gives a 
RequestConflictException.
+            try {
+                NoOpTaskSpec barSpec = new NoOpTaskSpec(1000, 2000);
+                cluster.coordinatorClient().createTask(
+                    new CreateTaskRequest("foo", barSpec));
+                fail("Expected to get an exception when re-creating a task 
with a " +
+                    "different task spec.");
+            } catch (RequestConflictException exception) {
+            }
+
             time.sleep(2);
             new ExpectedTasks().
                 addTask(new ExpectedTaskBuilder("foo").
                     taskState(new TaskRunning(fooSpec, 2, new 
TextNode("active"))).
-                    workerState(new WorkerRunning(fooSpec, 2, new 
TextNode("active"))).
+                    workerState(new WorkerRunning("foo", fooSpec, 2, new 
TextNode("active"))).
                     build()).
                 waitFor(cluster.coordinatorClient()).
                 waitFor(cluster.agentClient("node02"));
@@ -149,7 +166,7 @@ public void testTaskDistribution() throws Exception {
             new ExpectedTasks().
                 addTask(new ExpectedTaskBuilder("foo").
                     taskState(new TaskRunning(fooSpec, 11, status1)).
-                    workerState(new WorkerRunning(fooSpec, 11,  new 
TextNode("active"))).
+                    workerState(new WorkerRunning("foo", fooSpec, 11,  new 
TextNode("active"))).
                     build()).
                 waitFor(coordinatorClient).
                 waitFor(agentClient1).
@@ -163,7 +180,7 @@ public void testTaskDistribution() throws Exception {
                 addTask(new ExpectedTaskBuilder("foo").
                     taskState(new TaskDone(fooSpec, 11, 13,
                         "", false, status2)).
-                    workerState(new WorkerDone(fooSpec, 11, 13, new 
TextNode("done"), "")).
+                    workerState(new WorkerDone("foo", fooSpec, 11, 13, new 
TextNode("done"), "")).
                     build()).
                 waitFor(coordinatorClient).
                 waitFor(agentClient1).
@@ -206,7 +223,7 @@ public void testTaskCancellation() throws Exception {
             new ExpectedTasks().
                 addTask(new ExpectedTaskBuilder("foo").
                     taskState(new TaskRunning(fooSpec, 11, status1)).
-                    workerState(new WorkerRunning(fooSpec, 11, new 
TextNode("active"))).
+                    workerState(new WorkerRunning("foo", fooSpec, 11, new 
TextNode("active"))).
                     build()).
                 waitFor(coordinatorClient).
                 waitFor(agentClient1).
@@ -221,11 +238,68 @@ public void testTaskCancellation() throws Exception {
                 addTask(new ExpectedTaskBuilder("foo").
                     taskState(new TaskDone(fooSpec, 11, 12, "",
                         true, status2)).
-                    workerState(new WorkerDone(fooSpec, 11, 12, new 
TextNode("done"), "")).
+                    workerState(new WorkerDone("foo", fooSpec, 11, 12, new 
TextNode("done"), "")).
+                    build()).
+                waitFor(coordinatorClient).
+                waitFor(agentClient1).
+                waitFor(agentClient2);
+
+            coordinatorClient.destroyTask(new DestroyTaskRequest("foo"));
+            new ExpectedTasks().
+                waitFor(coordinatorClient).
+                waitFor(agentClient1).
+                waitFor(agentClient2);
+        }
+    }
+
+    @Test
+    public void testTaskDestruction() throws Exception {
+        MockTime time = new MockTime(0, 0, 0);
+        Scheduler scheduler = new MockScheduler(time);
+        try (MiniTrogdorCluster cluster = new MiniTrogdorCluster.Builder().
+            addCoordinator("node01").
+            addAgent("node01").
+            addAgent("node02").
+            scheduler(scheduler).
+            build()) {
+            CoordinatorClient coordinatorClient = cluster.coordinatorClient();
+            AgentClient agentClient1 = cluster.agentClient("node01");
+            AgentClient agentClient2 = cluster.agentClient("node02");
+
+            new ExpectedTasks().
+                waitFor(coordinatorClient).
+                waitFor(agentClient1).
+                waitFor(agentClient2);
+
+            NoOpTaskSpec fooSpec = new NoOpTaskSpec(2, 2);
+            coordinatorClient.destroyTask(new DestroyTaskRequest("foo"));
+            coordinatorClient.createTask(new CreateTaskRequest("foo", 
fooSpec));
+            NoOpTaskSpec barSpec = new NoOpTaskSpec(20, 20);
+            coordinatorClient.createTask(new CreateTaskRequest("bar", 
barSpec));
+            coordinatorClient.destroyTask(new DestroyTaskRequest("bar"));
+            new ExpectedTasks().
+                addTask(new ExpectedTaskBuilder("foo").taskState(new 
TaskPending(fooSpec)).build()).
+                waitFor(coordinatorClient).
+                waitFor(agentClient1).
+                waitFor(agentClient2);
+            time.sleep(10);
+
+            ObjectNode status1 = new ObjectNode(JsonNodeFactory.instance);
+            status1.set("node01", new TextNode("active"));
+            status1.set("node02", new TextNode("active"));
+            new ExpectedTasks().
+                addTask(new ExpectedTaskBuilder("foo").
+                    taskState(new TaskRunning(fooSpec, 10, status1)).
                     build()).
                 waitFor(coordinatorClient).
                 waitFor(agentClient1).
                 waitFor(agentClient2);
+
+            coordinatorClient.destroyTask(new DestroyTaskRequest("foo"));
+            new ExpectedTasks().
+                waitFor(coordinatorClient).
+                waitFor(agentClient1).
+                waitFor(agentClient2);
         }
     }
 
@@ -397,7 +471,7 @@ public void testTasksRequest() throws Exception {
             new ExpectedTasks().
                 addTask(new ExpectedTaskBuilder("foo").
                     taskState(new TaskRunning(fooSpec, 2, new 
TextNode("active"))).
-                    workerState(new WorkerRunning(fooSpec, 2, new 
TextNode("active"))).
+                    workerState(new WorkerRunning("foo", fooSpec, 2, new 
TextNode("active"))).
                     build()).
                 addTask(new ExpectedTaskBuilder("bar").
                     taskState(new TaskPending(barSpec)).
@@ -448,7 +522,7 @@ public void testWorkersExitingAtDifferentTimes() throws 
Exception {
             new ExpectedTasks().
                 addTask(new ExpectedTaskBuilder("foo").
                     taskState(new TaskRunning(fooSpec, 2, status1)).
-                    workerState(new WorkerRunning(fooSpec, 2, new 
TextNode("active"))).
+                    workerState(new WorkerRunning("foo", fooSpec, 2, new 
TextNode("active"))).
                     build()).
                 waitFor(coordinatorClient).
                 waitFor(cluster.agentClient("node02")).
@@ -461,14 +535,14 @@ public void testWorkersExitingAtDifferentTimes() throws 
Exception {
             new ExpectedTasks().
                 addTask(new ExpectedTaskBuilder("foo").
                     taskState(new TaskRunning(fooSpec, 2, status2)).
-                    workerState(new WorkerRunning(fooSpec, 2, new 
TextNode("active"))).
+                    workerState(new WorkerRunning("foo", fooSpec, 2, new 
TextNode("active"))).
                     build()).
                 waitFor(coordinatorClient).
                 waitFor(cluster.agentClient("node03"));
             new ExpectedTasks().
                 addTask(new ExpectedTaskBuilder("foo").
                     taskState(new TaskRunning(fooSpec, 2, status2)).
-                    workerState(new WorkerDone(fooSpec, 2, 12, new 
TextNode("halted"), "")).
+                    workerState(new WorkerDone("foo", fooSpec, 2, 12, new 
TextNode("halted"), "")).
                     build()).
                 waitFor(cluster.agentClient("node02"));
 
diff --git 
a/tools/src/test/java/org/apache/kafka/trogdor/rest/RestExceptionMapperTest.java
 
b/tools/src/test/java/org/apache/kafka/trogdor/rest/RestExceptionMapperTest.java
index c40f958eb8d..9c7f7522853 100644
--- 
a/tools/src/test/java/org/apache/kafka/trogdor/rest/RestExceptionMapperTest.java
+++ 
b/tools/src/test/java/org/apache/kafka/trogdor/rest/RestExceptionMapperTest.java
@@ -24,6 +24,8 @@
 import com.fasterxml.jackson.databind.exc.InvalidTypeIdException;
 import javax.ws.rs.NotFoundException;
 import javax.ws.rs.core.Response;
+
+import org.apache.kafka.common.errors.InvalidRequestException;
 import org.apache.kafka.common.errors.SerializationException;
 import org.junit.Test;
 
@@ -67,6 +69,13 @@ public void testToResponseSerializationException() {
         assertEquals(resp.getStatus(), 
Response.Status.BAD_REQUEST.getStatusCode());
     }
 
+    @Test
+    public void testToResponseInvalidRequestException() {
+        RestExceptionMapper mapper = new RestExceptionMapper();
+        Response resp = mapper.toResponse(new InvalidRequestException("invalid 
request"));
+        assertEquals(resp.getStatus(), 
Response.Status.BAD_REQUEST.getStatusCode());
+    }
+
     @Test
     public void testToResponseUnknownException() {
         RestExceptionMapper mapper = new RestExceptionMapper();
@@ -84,7 +93,7 @@ public void testToExceptionClassNotFoundException() throws 
Exception {
         
RestExceptionMapper.toException(Response.Status.NOT_IMPLEMENTED.getStatusCode(),
 "Not Implemented");
     }
 
-    @Test(expected = SerializationException.class)
+    @Test(expected = InvalidRequestException.class)
     public void testToExceptionSerializationException() throws Exception {
         
RestExceptionMapper.toException(Response.Status.BAD_REQUEST.getStatusCode(), 
"Bad Request");
     }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Trogdor should support destroying tasks
> ---------------------------------------
>
>                 Key: KAFKA-6696
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6696
>             Project: Kafka
>          Issue Type: Improvement
>          Components: system tests
>            Reporter: Colin P. McCabe
>            Assignee: Colin P. McCabe
>            Priority: Major
>
> Trogdor should support destroying tasks.  This will make it more practical to 
> have very long running Trogdor instances.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to