This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 5a95c2e Add '?expand' query param for additional info on '/connectors'. (#6658) 5a95c2e is described below commit 5a95c2e1cd555d5f3ec148cc7c765d1bb7d716f9 Author: dan norwood <norw...@confluent.io> AuthorDate: Thu May 16 14:29:30 2019 -0700 Add '?expand' query param for additional info on '/connectors'. (#6658) Per KIP-465, kept existing behavior of `/connectors` resource in the Connect's REST API, but added the ability to specify `?expand` query parameter to get list of connectors with status details on each connector. Added unit tests, and verified passing existing system tests (which use the older list form). See https://cwiki.apache.org/confluence/display/KAFKA/KIP-465%3A+Add+Consolidated+Connector+Endpoint+to+Connect+REST+API. Author: Dan Norwood <norw...@confluent.io> Reviewer: Randall Hauch <rha...@gmail.com> --- .../kafka/connect/runtime/AbstractHerder.java | 21 ++++ .../org/apache/kafka/connect/runtime/Herder.java | 12 ++ .../runtime/distributed/DistributedHerder.java | 5 +- .../runtime/rest/resources/ConnectorsResource.java | 44 +++++++- .../runtime/standalone/StandaloneHerder.java | 8 +- .../kafka/connect/runtime/AbstractHerderTest.java | 50 +++++++++ .../runtime/distributed/DistributedHerderTest.java | 4 +- .../kafka/connect/runtime/rest/RestServerTest.java | 17 +-- .../rest/resources/ConnectorsResourceTest.java | 122 +++++++++++++++++---- 9 files changed, 232 insertions(+), 51 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java index 82fdecc..8e7d016 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java @@ -209,6 +209,27 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con protected abstract Map<String, String> config(String connName); @Override + public Collection<String> connectors() { + return configBackingStore.snapshot().connectors(); + } + + @Override + public ConnectorInfo connectorInfo(String connector) { + final ClusterConfigState configState = configBackingStore.snapshot(); + + if (!configState.contains(connector)) + return null; + Map<String, String> config = configState.rawConnectorConfig(connector); + + return new ConnectorInfo( + connector, + config, + configState.tasks(connector), + connectorTypeForClass(config.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG)) + ); + } + + @Override public ConnectorStateInfo connectorStatus(String connName) { ConnectorStatus connector = statusBackingStore.get(connName); if (connector == null) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java index c572e20..550348f 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java @@ -124,6 +124,18 @@ public interface Herder { void putTaskConfigs(String connName, List<Map<String, String>> configs, Callback<Void> callback); /** + * Get a list of connectors currently running in this cluster. + * @returns A list of connector names + */ + Collection<String> connectors(); + + /** + * Get the definition and status of a connector. + * @param connName name of the connector + */ + ConnectorInfo connectorInfo(String connName); + + /** * Lookup the current status of a connector. * @param connName name of the connector */ diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java index 711b6c9..96d4bfc 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java @@ -451,10 +451,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable { if (!configState.contains(connName)) { callback.onCompletion(new NotFoundException("Connector " + connName + " not found"), null); } else { - Map<String, String> config = configState.rawConnectorConfig(connName); - callback.onCompletion(null, new ConnectorInfo(connName, config, - configState.tasks(connName), - connectorTypeForClass(config.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG)))); + callback.onCompletion(null, connectorInfo(connName)); } return null; } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java index 4a04512..61cf5da 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java @@ -17,6 +17,8 @@ package org.apache.kafka.connect.runtime.rest.resources; import com.fasterxml.jackson.core.type.TypeReference; + +import org.apache.kafka.connect.errors.NotFoundException; import org.apache.kafka.connect.runtime.ConnectorConfig; import org.apache.kafka.connect.runtime.Herder; import org.apache.kafka.connect.runtime.WorkerConfig; @@ -44,11 +46,14 @@ import javax.ws.rs.Path; import javax.ws.rs.PathParam; import javax.ws.rs.Produces; import javax.ws.rs.QueryParam; +import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import javax.ws.rs.core.UriBuilder; +import javax.ws.rs.core.UriInfo; + import java.net.URI; -import java.util.Collection; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; @@ -79,11 +84,38 @@ public class ConnectorsResource { @GET @Path("/") - public Collection<String> listConnectors(final @QueryParam("forward") Boolean forward) throws Throwable { - FutureCallback<Collection<String>> cb = new FutureCallback<>(); - herder.connectors(cb); - return completeOrForwardRequest(cb, "/connectors", "GET", null, new TypeReference<Collection<String>>() { - }, forward); + public Response listConnectors( + final @Context UriInfo uriInfo + ) throws Throwable { + if (uriInfo.getQueryParameters().containsKey("expand")) { + Map<String, Map<String, Object>> out = new HashMap<>(); + for (String connector : herder.connectors()) { + try { + Map<String, Object> connectorExpansions = new HashMap<>(); + for (String expansion : uriInfo.getQueryParameters().get("expand")) { + switch (expansion) { + case "status": + connectorExpansions.put("status", herder.connectorStatus(connector)); + break; + case "info": + connectorExpansions.put("info", herder.connectorInfo(connector)); + break; + default: + log.info("Ignoring unknown expanion type {}", expansion); + } + } + out.put(connector, connectorExpansions); + } catch (NotFoundException e) { + // this likely means that a connector has been removed while we look its info up + // we can just not include this connector in the return entity + log.debug("Unable to get connector info for {} on this worker", connector); + } + + } + return Response.ok(out).build(); + } else { + return Response.ok(herder.connectors()).build(); + } } @POST diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java index 172c9b2..7b6d16a 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java @@ -103,7 +103,7 @@ public class StandaloneHerder extends AbstractHerder { // There's no coordination/hand-off to do here since this is all standalone. Instead, we // should just clean up the stuff we normally would, i.e. cleanly checkpoint and shutdown all // the tasks. - for (String connName : configState.connectors()) { + for (String connName : connectors()) { removeConnectorTasks(connName); worker.stopConnector(connName); } @@ -118,12 +118,12 @@ public class StandaloneHerder extends AbstractHerder { @Override public synchronized void connectors(Callback<Collection<String>> callback) { - callback.onCompletion(null, configState.connectors()); + callback.onCompletion(null, connectors()); } - + @Override public synchronized void connectorInfo(String connName, Callback<ConnectorInfo> callback) { - ConnectorInfo connectorInfo = createConnectorInfo(connName); + ConnectorInfo connectorInfo = connectorInfo(connName); if (connectorInfo == null) { callback.onCompletion(new NotFoundException("Connector " + connName + " not found"), null); return; diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java index fc557c6..f7ee8a6 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java @@ -125,6 +125,56 @@ public class AbstractHerderTest { @MockStrict private StatusBackingStore statusStore; @Test + public void testConnectors() { + AbstractHerder herder = partialMockBuilder(AbstractHerder.class) + .withConstructor( + Worker.class, + String.class, + String.class, + StatusBackingStore.class, + ConfigBackingStore.class + ) + .withArgs(worker, workerId, kafkaClusterId, statusStore, configStore) + .addMockedMethod("generation") + .createMock(); + + EasyMock.expect(herder.generation()).andStubReturn(generation); + EasyMock.expect(herder.config(connector)).andReturn(null); + EasyMock.expect(configStore.snapshot()).andReturn(SNAPSHOT); + replayAll(); + assertEquals(Collections.singleton(CONN1), new HashSet<>(herder.connectors())); + PowerMock.verifyAll(); + } + + @Test + public void testConnectorStatus() { + ConnectorTaskId taskId = new ConnectorTaskId(connector, 0); + AbstractHerder herder = partialMockBuilder(AbstractHerder.class) + .withConstructor( + Worker.class, + String.class, + String.class, + StatusBackingStore.class, + ConfigBackingStore.class + ) + .withArgs(worker, workerId, kafkaClusterId, statusStore, configStore) + .addMockedMethod("generation") + .createMock(); + + EasyMock.expect(herder.generation()).andStubReturn(generation); + EasyMock.expect(herder.config(connector)).andReturn(null); + EasyMock.expect(statusStore.get(connector)) + .andReturn(new ConnectorStatus(connector, AbstractStatus.State.RUNNING, workerId, generation)); + EasyMock.expect(statusStore.getAll(connector)) + .andReturn(Collections.singletonList( + new TaskStatus(taskId, AbstractStatus.State.UNASSIGNED, workerId, generation))); + + replayAll(); + ConnectorStateInfo csi = herder.connectorStatus(connector); + PowerMock.verifyAll(); + } + + @Test public void connectorStatus() { ConnectorTaskId taskId = new ConnectorTaskId(connector, 0); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java index b8ec0f1..fcac1fb 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java @@ -1295,6 +1295,7 @@ public class DistributedHerderTest { EasyMock.expect(member.memberId()).andStubReturn("leader"); EasyMock.expect(worker.getPlugins()).andReturn(plugins).anyTimes(); expectRebalance(1, Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList()); + EasyMock.expect(configBackingStore.snapshot()).andReturn(SNAPSHOT).times(2); WorkerConfigTransformer configTransformer = EasyMock.mock(WorkerConfigTransformer.class); EasyMock.expect(configTransformer.transform(EasyMock.eq(CONN1), EasyMock.anyObject())) @@ -1376,6 +1377,7 @@ public class DistributedHerderTest { EasyMock.expect(connectorMock.config()).andReturn(new ConfigDef()); EasyMock.expect(connectorMock.validate(CONN1_CONFIG_UPDATED)).andReturn(new Config(Collections.<ConfigValue>emptyList())); EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader)).andReturn(pluginLoader); + EasyMock.expect(configBackingStore.snapshot()).andReturn(SNAPSHOT); configBackingStore.putConnectorConfig(CONN1, CONN1_CONFIG_UPDATED); PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() { @@ -1388,7 +1390,7 @@ public class DistributedHerderTest { }); // As a result of reconfig, should need to update snapshot. With only connector updates, we'll just restart // connector without rebalance - EasyMock.expect(configBackingStore.snapshot()).andReturn(SNAPSHOT_UPDATED_CONN1_CONFIG); + EasyMock.expect(configBackingStore.snapshot()).andReturn(SNAPSHOT_UPDATED_CONN1_CONFIG).times(2); worker.stopConnector(CONN1); PowerMock.expectLastCall().andReturn(true); worker.startConnector(EasyMock.eq(CONN1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<ConnectorContext>anyObject(), diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java index 91aa5e7..3609fb3 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java @@ -30,8 +30,6 @@ import org.apache.kafka.connect.runtime.WorkerConfig; import org.apache.kafka.connect.runtime.distributed.DistributedConfig; import org.apache.kafka.connect.runtime.isolation.Plugins; import org.apache.kafka.connect.runtime.standalone.StandaloneConfig; -import org.apache.kafka.connect.util.Callback; -import org.easymock.Capture; import org.easymock.EasyMock; import org.junit.After; import org.junit.Assert; @@ -47,7 +45,6 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.Arrays; -import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -212,12 +209,7 @@ public class RestServerTest { ConnectRestExtension.class)) .andStubReturn(Collections.emptyList()); - final Capture<Callback<Collection<String>>> connectorsCallback = EasyMock.newCapture(); - herder.connectors(EasyMock.capture(connectorsCallback)); - PowerMock.expectLastCall().andAnswer(() -> { - connectorsCallback.getValue().onCompletion(null, Arrays.asList("a", "b")); - return null; - }); + EasyMock.expect(herder.connectors()).andReturn(Arrays.asList("a", "b")); PowerMock.replayAll(); @@ -270,12 +262,7 @@ public class RestServerTest { workerConfig, ConnectRestExtension.class)).andStubReturn(Collections.emptyList()); - final Capture<Callback<Collection<String>>> connectorsCallback = EasyMock.newCapture(); - herder.connectors(EasyMock.capture(connectorsCallback)); - PowerMock.expectLastCall().andAnswer(() -> { - connectorsCallback.getValue().onCompletion(null, Arrays.asList("a", "b")); - return null; - }); + EasyMock.expect(herder.connectors()).andReturn(Arrays.asList("a", "b")); PowerMock.replayAll(); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java index f84cd25..5dc7f1e 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java @@ -19,7 +19,6 @@ package org.apache.kafka.connect.runtime.rest.resources; import com.fasterxml.jackson.core.type.TypeReference; import org.apache.kafka.connect.errors.AlreadyExistsException; -import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.errors.NotFoundException; import org.apache.kafka.connect.runtime.ConnectorConfig; import org.apache.kafka.connect.runtime.Herder; @@ -28,6 +27,7 @@ import org.apache.kafka.connect.runtime.distributed.NotAssignedException; import org.apache.kafka.connect.runtime.distributed.NotLeaderException; import org.apache.kafka.connect.runtime.rest.RestClient; import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo; import org.apache.kafka.connect.runtime.rest.entities.ConnectorType; import org.apache.kafka.connect.runtime.rest.entities.CreateConnectorRequest; import org.apache.kafka.connect.runtime.rest.entities.TaskInfo; @@ -47,6 +47,10 @@ import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; import javax.ws.rs.BadRequestException; +import javax.ws.rs.core.MultivaluedHashMap; +import javax.ws.rs.core.MultivaluedMap; +import javax.ws.rs.core.UriInfo; + import java.net.URI; import java.util.ArrayList; import java.util.Arrays; @@ -121,12 +125,18 @@ public class ConnectorsResourceTest { @Mock private Herder herder; private ConnectorsResource connectorsResource; + private UriInfo forward; @Before public void setUp() throws NoSuchMethodException { PowerMock.mockStatic(RestClient.class, RestClient.class.getMethod("httpRequest", String.class, String.class, Object.class, TypeReference.class, WorkerConfig.class)); connectorsResource = new ConnectorsResource(herder, null); + forward = EasyMock.mock(UriInfo.class); + MultivaluedMap<String, String> queryParams = new MultivaluedHashMap<>(); + queryParams.putSingle("forward", "true"); + EasyMock.expect(forward.getQueryParameters()).andReturn(queryParams).anyTimes(); + EasyMock.replay(forward); } private static final Map<String, String> getConnectorConfig(Map<String, String> mapToClone) { @@ -137,12 +147,11 @@ public class ConnectorsResourceTest { @Test public void testListConnectors() throws Throwable { final Capture<Callback<Collection<String>>> cb = Capture.newInstance(); - herder.connectors(EasyMock.capture(cb)); - expectAndCallbackResult(cb, Arrays.asList(CONNECTOR2_NAME, CONNECTOR_NAME)); + EasyMock.expect(herder.connectors()).andReturn(Arrays.asList(CONNECTOR2_NAME, CONNECTOR_NAME)); PowerMock.replayAll(); - Collection<String> connectors = connectorsResource.listConnectors(FORWARD); + Collection<String> connectors = (Collection<String>) connectorsResource.listConnectors(forward).getEntity(); // Ordering isn't guaranteed, compare sets assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_NAME, CONNECTOR2_NAME)), new HashSet<>(connectors)); @@ -150,36 +159,107 @@ public class ConnectorsResourceTest { } @Test - public void testListConnectorsNotLeader() throws Throwable { - final Capture<Callback<Collection<String>>> cb = Capture.newInstance(); - herder.connectors(EasyMock.capture(cb)); - expectAndCallbackNotLeaderException(cb); - // Should forward request - EasyMock.expect(RestClient.httpRequest(EasyMock.eq("http://leader:8083/connectors?forward=false"), EasyMock.eq("GET"), - EasyMock.isNull(), EasyMock.anyObject(TypeReference.class), EasyMock.anyObject(WorkerConfig.class))) - .andReturn(new RestClient.HttpResponse<>(200, new HashMap<String, String>(), Arrays.asList(CONNECTOR2_NAME, CONNECTOR_NAME))); + public void testExpandConnectorsStatus() throws Throwable { + EasyMock.expect(herder.connectors()).andReturn(Arrays.asList(CONNECTOR2_NAME, CONNECTOR_NAME)); + ConnectorStateInfo connector = EasyMock.mock(ConnectorStateInfo.class); + ConnectorStateInfo connector2 = EasyMock.mock(ConnectorStateInfo.class); + EasyMock.expect(herder.connectorStatus(CONNECTOR2_NAME)).andReturn(connector2); + EasyMock.expect(herder.connectorStatus(CONNECTOR_NAME)).andReturn(connector); + + forward = EasyMock.mock(UriInfo.class); + MultivaluedMap<String, String> queryParams = new MultivaluedHashMap<>(); + queryParams.putSingle("expand", "status"); + EasyMock.expect(forward.getQueryParameters()).andReturn(queryParams).anyTimes(); + EasyMock.replay(forward); PowerMock.replayAll(); - Collection<String> connectors = connectorsResource.listConnectors(FORWARD); + Map<String, Map<String, Object>> expanded = (Map<String, Map<String, Object>>) connectorsResource.listConnectors(forward).getEntity(); // Ordering isn't guaranteed, compare sets - assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_NAME, CONNECTOR2_NAME)), new HashSet<>(connectors)); + assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_NAME, CONNECTOR2_NAME)), expanded.keySet()); + assertEquals(connector2, expanded.get(CONNECTOR2_NAME).get("status")); + assertEquals(connector, expanded.get(CONNECTOR_NAME).get("status")); + PowerMock.verifyAll(); + } + @Test + public void testExpandConnectorsInfo() throws Throwable { + EasyMock.expect(herder.connectors()).andReturn(Arrays.asList(CONNECTOR2_NAME, CONNECTOR_NAME)); + ConnectorInfo connector = EasyMock.mock(ConnectorInfo.class); + ConnectorInfo connector2 = EasyMock.mock(ConnectorInfo.class); + EasyMock.expect(herder.connectorInfo(CONNECTOR2_NAME)).andReturn(connector2); + EasyMock.expect(herder.connectorInfo(CONNECTOR_NAME)).andReturn(connector); + + forward = EasyMock.mock(UriInfo.class); + MultivaluedMap<String, String> queryParams = new MultivaluedHashMap<>(); + queryParams.putSingle("expand", "info"); + EasyMock.expect(forward.getQueryParameters()).andReturn(queryParams).anyTimes(); + EasyMock.replay(forward); + + PowerMock.replayAll(); + + Map<String, Map<String, Object>> expanded = (Map<String, Map<String, Object>>) connectorsResource.listConnectors(forward).getEntity(); + // Ordering isn't guaranteed, compare sets + assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_NAME, CONNECTOR2_NAME)), expanded.keySet()); + assertEquals(connector2, expanded.get(CONNECTOR2_NAME).get("info")); + assertEquals(connector, expanded.get(CONNECTOR_NAME).get("info")); PowerMock.verifyAll(); } - @Test(expected = ConnectException.class) - public void testListConnectorsNotSynced() throws Throwable { - final Capture<Callback<Collection<String>>> cb = Capture.newInstance(); - herder.connectors(EasyMock.capture(cb)); - expectAndCallbackException(cb, new ConnectException("not synced")); + @Test + public void testFullExpandConnectors() throws Throwable { + EasyMock.expect(herder.connectors()).andReturn(Arrays.asList(CONNECTOR2_NAME, CONNECTOR_NAME)); + ConnectorInfo connectorInfo = EasyMock.mock(ConnectorInfo.class); + ConnectorInfo connectorInfo2 = EasyMock.mock(ConnectorInfo.class); + EasyMock.expect(herder.connectorInfo(CONNECTOR2_NAME)).andReturn(connectorInfo2); + EasyMock.expect(herder.connectorInfo(CONNECTOR_NAME)).andReturn(connectorInfo); + ConnectorStateInfo connector = EasyMock.mock(ConnectorStateInfo.class); + ConnectorStateInfo connector2 = EasyMock.mock(ConnectorStateInfo.class); + EasyMock.expect(herder.connectorStatus(CONNECTOR2_NAME)).andReturn(connector2); + EasyMock.expect(herder.connectorStatus(CONNECTOR_NAME)).andReturn(connector); + + forward = EasyMock.mock(UriInfo.class); + MultivaluedMap<String, String> queryParams = new MultivaluedHashMap<>(); + queryParams.put("expand", Arrays.asList("info", "status")); + EasyMock.expect(forward.getQueryParameters()).andReturn(queryParams).anyTimes(); + EasyMock.replay(forward); + + PowerMock.replayAll(); + + Map<String, Map<String, Object>> expanded = (Map<String, Map<String, Object>>) connectorsResource.listConnectors(forward).getEntity(); + // Ordering isn't guaranteed, compare sets + assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_NAME, CONNECTOR2_NAME)), expanded.keySet()); + assertEquals(connectorInfo2, expanded.get(CONNECTOR2_NAME).get("info")); + assertEquals(connectorInfo, expanded.get(CONNECTOR_NAME).get("info")); + assertEquals(connector2, expanded.get(CONNECTOR2_NAME).get("status")); + assertEquals(connector, expanded.get(CONNECTOR_NAME).get("status")); + PowerMock.verifyAll(); + } + + @Test + public void testExpandConnectorsWithConnectorNotFound() throws Throwable { + EasyMock.expect(herder.connectors()).andReturn(Arrays.asList(CONNECTOR2_NAME, CONNECTOR_NAME)); + ConnectorStateInfo connector = EasyMock.mock(ConnectorStateInfo.class); + ConnectorStateInfo connector2 = EasyMock.mock(ConnectorStateInfo.class); + EasyMock.expect(herder.connectorStatus(CONNECTOR2_NAME)).andReturn(connector2); + EasyMock.expect(herder.connectorStatus(CONNECTOR_NAME)).andThrow(EasyMock.mock(NotFoundException.class)); + + forward = EasyMock.mock(UriInfo.class); + MultivaluedMap<String, String> queryParams = new MultivaluedHashMap<>(); + queryParams.putSingle("expand", "status"); + EasyMock.expect(forward.getQueryParameters()).andReturn(queryParams).anyTimes(); + EasyMock.replay(forward); PowerMock.replayAll(); - // throws - connectorsResource.listConnectors(FORWARD); + Map<String, Map<String, Object>> expanded = (Map<String, Map<String, Object>>) connectorsResource.listConnectors(forward).getEntity(); + // Ordering isn't guaranteed, compare sets + assertEquals(Collections.singleton(CONNECTOR2_NAME), expanded.keySet()); + assertEquals(connector2, expanded.get(CONNECTOR2_NAME).get("status")); + PowerMock.verifyAll(); } + @Test public void testCreateConnector() throws Throwable { CreateConnectorRequest body = new CreateConnectorRequest(CONNECTOR_NAME, Collections.singletonMap(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME));