C0urante commented on code in PR #12725: URL: https://github.com/apache/kafka/pull/12725#discussion_r991411528
########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java: ########## @@ -153,19 +157,23 @@ public class ConnectorsResourceTest { @Mock private WorkerConfig workerConfig; + private MockedStatic<RestClient> restClientStatic; + @Before public void setUp() throws NoSuchMethodException { - PowerMock.mockStatic(RestClient.class, - RestClient.class.getMethod("httpRequest", String.class, String.class, HttpHeaders.class, Object.class, TypeReference.class, WorkerConfig.class)); - EasyMock.expect(workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG)).andReturn(true); - EasyMock.expect(workerConfig.getBoolean(TOPIC_TRACKING_ALLOW_RESET_CONFIG)).andReturn(true); - PowerMock.replay(workerConfig); + restClientStatic = mockStatic(RestClient.class); + when(workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG)).thenReturn(true); + when(workerConfig.getBoolean(TOPIC_TRACKING_ALLOW_RESET_CONFIG)).thenReturn(true); connectorsResource = new ConnectorsResource(herder, workerConfig); - forward = EasyMock.mock(UriInfo.class); + forward = mock(UriInfo.class); MultivaluedMap<String, String> queryParams = new MultivaluedHashMap<>(); queryParams.putSingle("forward", "true"); - EasyMock.expect(forward.getQueryParameters()).andReturn(queryParams).anyTimes(); - EasyMock.replay(forward); + when(forward.getQueryParameters()).thenReturn(queryParams); + } + + @After + public void teardown() { + restClientStatic.close(); Review Comment: Can we throw in a call to `verifyNoMoreInteractions(herder);` here? With the existing tests, if something leads to an unexpected call to, e.g., delete a connector, we'll catch that; it's worth trying to preserve those guarantees if possible. 
########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java: ########## @@ -262,127 +254,96 @@ public void testFullExpandConnectors() { assertEquals(connectorInfo, expanded.get(CONNECTOR_NAME).get("info")); assertEquals(connector2, expanded.get(CONNECTOR2_NAME).get("status")); assertEquals(connector, expanded.get(CONNECTOR_NAME).get("status")); - PowerMock.verifyAll(); } @Test public void testExpandConnectorsWithConnectorNotFound() { - EasyMock.expect(herder.connectors()).andReturn(Arrays.asList(CONNECTOR2_NAME, CONNECTOR_NAME)); - ConnectorStateInfo connector = EasyMock.mock(ConnectorStateInfo.class); - ConnectorStateInfo connector2 = EasyMock.mock(ConnectorStateInfo.class); - EasyMock.expect(herder.connectorStatus(CONNECTOR2_NAME)).andReturn(connector2); - EasyMock.expect(herder.connectorStatus(CONNECTOR_NAME)).andThrow(EasyMock.mock(NotFoundException.class)); + when(herder.connectors()).thenReturn(Arrays.asList(CONNECTOR2_NAME, CONNECTOR_NAME)); + ConnectorStateInfo connector = mock(ConnectorStateInfo.class); + ConnectorStateInfo connector2 = mock(ConnectorStateInfo.class); + when(herder.connectorStatus(CONNECTOR2_NAME)).thenReturn(connector2); + doThrow(mock(NotFoundException.class)).when(herder).connectorStatus(CONNECTOR_NAME); - forward = EasyMock.mock(UriInfo.class); + forward = mock(UriInfo.class); MultivaluedMap<String, String> queryParams = new MultivaluedHashMap<>(); queryParams.putSingle("expand", "status"); - EasyMock.expect(forward.getQueryParameters()).andReturn(queryParams).anyTimes(); - EasyMock.replay(forward); - - PowerMock.replayAll(); + when(forward.getQueryParameters()).thenReturn(queryParams); Map<String, Map<String, Object>> expanded = (Map<String, Map<String, Object>>) connectorsResource.listConnectors(forward, NULL_HEADERS).getEntity(); // Ordering isn't guaranteed, compare sets assertEquals(Collections.singleton(CONNECTOR2_NAME), expanded.keySet()); assertEquals(connector2, 
expanded.get(CONNECTOR2_NAME).get("status")); - PowerMock.verifyAll(); } @Test public void testCreateConnector() throws Throwable { CreateConnectorRequest body = new CreateConnectorRequest(CONNECTOR_NAME, Collections.singletonMap(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME)); - final Capture<Callback<Herder.Created<ConnectorInfo>>> cb = Capture.newInstance(); - herder.putConnectorConfig(EasyMock.eq(CONNECTOR_NAME), EasyMock.eq(body.config()), EasyMock.eq(false), EasyMock.capture(cb)); + final ArgumentCaptor<Callback<Herder.Created<ConnectorInfo>>> cb = ArgumentCaptor.forClass(Callback.class); expectAndCallbackResult(cb, new Herder.Created<>(true, new ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG, - CONNECTOR_TASK_NAMES, ConnectorType.SOURCE))); - - PowerMock.replayAll(); + CONNECTOR_TASK_NAMES, ConnectorType.SOURCE)) + ).when(herder).putConnectorConfig(eq(CONNECTOR_NAME), eq(body.config()), eq(false), cb.capture()); connectorsResource.createConnector(FORWARD, NULL_HEADERS, body); - - PowerMock.verifyAll(); } @Test public void testCreateConnectorNotLeader() throws Throwable { CreateConnectorRequest body = new CreateConnectorRequest(CONNECTOR_NAME, Collections.singletonMap(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME)); - final Capture<Callback<Herder.Created<ConnectorInfo>>> cb = Capture.newInstance(); - herder.putConnectorConfig(EasyMock.eq(CONNECTOR_NAME), EasyMock.eq(body.config()), EasyMock.eq(false), EasyMock.capture(cb)); - expectAndCallbackNotLeaderException(cb); - // Should forward request - EasyMock.expect(RestClient.httpRequest(EasyMock.eq(LEADER_URL + "connectors?forward=false"), EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.eq(body), EasyMock.anyObject(), EasyMock.anyObject(WorkerConfig.class))) - .andReturn(new RestClient.HttpResponse<>(201, new HashMap<>(), new ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG, CONNECTOR_TASK_NAMES, - ConnectorType.SOURCE))); + final ArgumentCaptor<Callback<Herder.Created<ConnectorInfo>>> cb = 
ArgumentCaptor.forClass(Callback.class); + expectAndCallbackNotLeaderException(cb).when(herder) + .putConnectorConfig(eq(CONNECTOR_NAME), eq(body.config()), eq(false), cb.capture()); - PowerMock.replayAll(); + // Should forward request + restClientStatic.when(() -> + RestClient.httpRequest(eq(LEADER_URL + "connectors?forward=false"), eq("POST"), isNull(), eq(body), any(), any(WorkerConfig.class)) + ).thenReturn(new RestClient.HttpResponse<>(201, new HashMap<>(), new ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG, CONNECTOR_TASK_NAMES, ConnectorType.SOURCE))); connectorsResource.createConnector(FORWARD, NULL_HEADERS, body); - - PowerMock.verifyAll(); - - } @Test public void testCreateConnectorWithHeaderAuthorization() throws Throwable { CreateConnectorRequest body = new CreateConnectorRequest(CONNECTOR_NAME, Collections.singletonMap(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME)); - final Capture<Callback<Herder.Created<ConnectorInfo>>> cb = Capture.newInstance(); - HttpHeaders httpHeaders = EasyMock.mock(HttpHeaders.class); - EasyMock.expect(httpHeaders.getHeaderString("Authorization")).andReturn("Basic YWxhZGRpbjpvcGVuc2VzYW1l").times(1); - EasyMock.replay(httpHeaders); - herder.putConnectorConfig(EasyMock.eq(CONNECTOR_NAME), EasyMock.eq(body.config()), EasyMock.eq(false), EasyMock.capture(cb)); - expectAndCallbackNotLeaderException(cb); + final ArgumentCaptor<Callback<Herder.Created<ConnectorInfo>>> cb = ArgumentCaptor.forClass(Callback.class); + HttpHeaders httpHeaders = mock(HttpHeaders.class); + expectAndCallbackNotLeaderException(cb) + .when(herder).putConnectorConfig(eq(CONNECTOR_NAME), eq(body.config()), eq(false), cb.capture()); - EasyMock.expect(RestClient.httpRequest(EasyMock.eq(LEADER_URL + "connectors?forward=false"), - EasyMock.eq("POST"), EasyMock.eq(httpHeaders), EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject(WorkerConfig.class))) - .andReturn(new RestClient.HttpResponse<>(202, new HashMap<>(), null)); - - PowerMock.replayAll(); + 
restClientStatic.when(() -> + RestClient.httpRequest(eq(LEADER_URL + "connectors?forward=false"), eq("POST"), eq(httpHeaders), any(), any(), any(WorkerConfig.class)) + ).thenReturn(new RestClient.HttpResponse<>(202, new HashMap<>(), null)); connectorsResource.createConnector(FORWARD, httpHeaders, body); - - PowerMock.verifyAll(); } @Test public void testCreateConnectorWithoutHeaderAuthorization() throws Throwable { Review Comment: Do we still need this test? It looks like it's identical to `testCreateConnectorWithHeaderAuthorization`; maybe we could consolidate the two into something like `testCreateConnectorWithForwardedHeaders`? ########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java: ########## @@ -841,55 +702,45 @@ public void testRestartConnectorAndTasksRequestAccepted() throws Throwable { ConnectorStateInfo connectorStateInfo = new ConnectorStateInfo(CONNECTOR_NAME, state, Collections.emptyList(), ConnectorType.SOURCE); RestartRequest restartRequest = new RestartRequest(CONNECTOR_NAME, true, false); - final Capture<Callback<ConnectorStateInfo>> cb = Capture.newInstance(); - herder.restartConnectorAndTasks(EasyMock.eq(restartRequest), EasyMock.capture(cb)); - expectAndCallbackResult(cb, connectorStateInfo); - - PowerMock.replayAll(); + final ArgumentCaptor<Callback<ConnectorStateInfo>> cb = ArgumentCaptor.forClass(Callback.class); + expectAndCallbackResult(cb, connectorStateInfo) + .when(herder).restartConnectorAndTasks(eq(restartRequest), cb.capture()); Response response = connectorsResource.restartConnector(CONNECTOR_NAME, NULL_HEADERS, restartRequest.includeTasks(), restartRequest.onlyFailed(), FORWARD); assertEquals(CONNECTOR_NAME, ((ConnectorStateInfo) response.getEntity()).name()); assertEquals(state.state(), ((ConnectorStateInfo) response.getEntity()).connector().state()); assertEquals(Response.Status.ACCEPTED.getStatusCode(), response.getStatus()); - PowerMock.verifyAll(); } @Test public void 
testFenceZombiesNoInternalRequestSignature() throws Throwable { - final Capture<Callback<Void>> cb = Capture.newInstance(); - herder.fenceZombieSourceTasks(EasyMock.eq(CONNECTOR_NAME), EasyMock.capture(cb), EasyMock.anyObject(InternalRequestSignature.class)); - expectAndCallbackResult(cb, null); - - PowerMock.replayAll(); + final ArgumentCaptor<Callback<Void>> cb = ArgumentCaptor.forClass(Callback.class); + final ArgumentCaptor<InternalRequestSignature> signatureCapture = ArgumentCaptor.forClass(InternalRequestSignature.class); Review Comment: Do we need this additional captor? It looks like we don't use one in the EasyMock/PowerMock tests, and we're not doing much with it in the new Mockito tests. ########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java: ########## @@ -899,250 +750,199 @@ public void testFenceZombiesWithInternalRequestSignature() throws Throwable { expectedSignature, signatureCapture.getValue() ); - - PowerMock.verifyAll(); } @Test public void testFenceZombiesConnectorNotFound() throws Throwable { - final Capture<Callback<Void>> cb = Capture.newInstance(); - herder.fenceZombieSourceTasks(EasyMock.eq(CONNECTOR_NAME), EasyMock.capture(cb), EasyMock.anyObject(InternalRequestSignature.class)); - - expectAndCallbackException(cb, new NotFoundException("not found")); + final ArgumentCaptor<Callback<Void>> cb = ArgumentCaptor.forClass(Callback.class); - PowerMock.replayAll(); + expectAndCallbackException(cb, new NotFoundException("not found")) + .when(herder).fenceZombieSourceTasks(eq(CONNECTOR_NAME), cb.capture(), any()); assertThrows(NotFoundException.class, () -> connectorsResource.fenceZombies(CONNECTOR_NAME, NULL_HEADERS, FORWARD, serializeAsBytes(null))); - - PowerMock.verifyAll(); } @Test public void testRestartConnectorNotFound() { - final Capture<Callback<Void>> cb = Capture.newInstance(); - herder.restartConnector(EasyMock.eq(CONNECTOR_NAME), EasyMock.capture(cb)); - 
expectAndCallbackException(cb, new NotFoundException("not found")); - - PowerMock.replayAll(); + final ArgumentCaptor<Callback<Void>> cb = ArgumentCaptor.forClass(Callback.class); + expectAndCallbackException(cb, new NotFoundException("not found")) + .when(herder).restartConnector(eq(CONNECTOR_NAME), cb.capture()); assertThrows(NotFoundException.class, () -> connectorsResource.restartConnector(CONNECTOR_NAME, NULL_HEADERS, false, false, FORWARD) ); - - PowerMock.verifyAll(); } @Test public void testRestartConnectorLeaderRedirect() throws Throwable { - final Capture<Callback<Void>> cb = Capture.newInstance(); - herder.restartConnector(EasyMock.eq(CONNECTOR_NAME), EasyMock.capture(cb)); - expectAndCallbackNotLeaderException(cb); + final ArgumentCaptor<Callback<Void>> cb = ArgumentCaptor.forClass(Callback.class); + expectAndCallbackNotLeaderException(cb).when(herder) + .restartConnector(eq(CONNECTOR_NAME), cb.capture()); - EasyMock.expect(RestClient.httpRequest(EasyMock.eq(LEADER_URL + "connectors/" + CONNECTOR_NAME + "/restart?forward=true"), - EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.isNull(), EasyMock.anyObject(), EasyMock.anyObject(WorkerConfig.class))) - .andReturn(new RestClient.HttpResponse<>(202, new HashMap<>(), null)); - - PowerMock.replayAll(); + restClientStatic.when(() -> + RestClient.httpRequest(eq(LEADER_URL + "connectors/" + CONNECTOR_NAME + "/restart?forward=true"), + eq("POST"), isNull(), isNull(), any(), any(WorkerConfig.class)) + ).thenReturn(new RestClient.HttpResponse<>(202, new HashMap<>(), null)); Response response = connectorsResource.restartConnector(CONNECTOR_NAME, NULL_HEADERS, false, false, null); assertEquals(Response.Status.NO_CONTENT.getStatusCode(), response.getStatus()); - PowerMock.verifyAll(); - - PowerMock.verifyAll(); } @Test public void testRestartConnectorOwnerRedirect() throws Throwable { - final Capture<Callback<Void>> cb = Capture.newInstance(); - herder.restartConnector(EasyMock.eq(CONNECTOR_NAME), 
EasyMock.capture(cb)); + final ArgumentCaptor<Callback<Void>> cb = ArgumentCaptor.forClass(Callback.class); String ownerUrl = "http://owner:8083"; - expectAndCallbackException(cb, new NotAssignedException("not owner test", ownerUrl)); - - EasyMock.expect(RestClient.httpRequest(EasyMock.eq("http://owner:8083/connectors/" + CONNECTOR_NAME + "/restart?forward=false"), - EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.isNull(), EasyMock.anyObject(), EasyMock.anyObject(WorkerConfig.class))) - .andReturn(new RestClient.HttpResponse<>(202, new HashMap<>(), null)); + expectAndCallbackException(cb, new NotAssignedException("not owner test", ownerUrl)) + .when(herder).restartConnector(eq(CONNECTOR_NAME), cb.capture()); - PowerMock.replayAll(); + restClientStatic.when(() -> + RestClient.httpRequest(eq("http://owner:8083/connectors/" + CONNECTOR_NAME + "/restart?forward=false"), + eq("POST"), isNull(), isNull(), any(), any(WorkerConfig.class)) + ).thenReturn(new RestClient.HttpResponse<>(202, new HashMap<>(), null)); Response response = connectorsResource.restartConnector(CONNECTOR_NAME, NULL_HEADERS, false, false, true); assertEquals(Response.Status.NO_CONTENT.getStatusCode(), response.getStatus()); - PowerMock.verifyAll(); - - PowerMock.verifyAll(); } @Test public void testRestartTaskNotFound() { ConnectorTaskId taskId = new ConnectorTaskId(CONNECTOR_NAME, 0); - final Capture<Callback<Void>> cb = Capture.newInstance(); - herder.restartTask(EasyMock.eq(taskId), EasyMock.capture(cb)); - expectAndCallbackException(cb, new NotFoundException("not found")); - - PowerMock.replayAll(); + final ArgumentCaptor<Callback<Void>> cb = ArgumentCaptor.forClass(Callback.class); + expectAndCallbackException(cb, new NotFoundException("not found")) + .when(herder).restartTask(eq(taskId), cb.capture()); assertThrows(NotFoundException.class, () -> connectorsResource.restartTask(CONNECTOR_NAME, 0, NULL_HEADERS, FORWARD)); - - PowerMock.verifyAll(); } @Test public void 
testRestartTaskLeaderRedirect() throws Throwable { ConnectorTaskId taskId = new ConnectorTaskId(CONNECTOR_NAME, 0); - final Capture<Callback<Void>> cb = Capture.newInstance(); - herder.restartTask(EasyMock.eq(taskId), EasyMock.capture(cb)); - expectAndCallbackNotLeaderException(cb); + final ArgumentCaptor<Callback<Void>> cb = ArgumentCaptor.forClass(Callback.class); + expectAndCallbackNotLeaderException(cb).when(herder) + .restartTask(eq(taskId), cb.capture()); - EasyMock.expect(RestClient.httpRequest(EasyMock.eq(LEADER_URL + "connectors/" + CONNECTOR_NAME + "/tasks/0/restart?forward=true"), - EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.isNull(), EasyMock.anyObject(), EasyMock.anyObject(WorkerConfig.class))) - .andReturn(new RestClient.HttpResponse<>(202, new HashMap<>(), null)); - - PowerMock.replayAll(); + restClientStatic.when(() -> + RestClient.httpRequest(eq(LEADER_URL + "connectors/" + CONNECTOR_NAME + "/tasks/0/restart?forward=true"), + eq("POST"), isNull(), isNull(), any(), any(WorkerConfig.class)) + ).thenReturn(new RestClient.HttpResponse<>(202, new HashMap<>(), null)); connectorsResource.restartTask(CONNECTOR_NAME, 0, NULL_HEADERS, null); - - PowerMock.verifyAll(); } @Test public void testRestartTaskOwnerRedirect() throws Throwable { ConnectorTaskId taskId = new ConnectorTaskId(CONNECTOR_NAME, 0); - final Capture<Callback<Void>> cb = Capture.newInstance(); - herder.restartTask(EasyMock.eq(taskId), EasyMock.capture(cb)); + final ArgumentCaptor<Callback<Void>> cb = ArgumentCaptor.forClass(Callback.class); String ownerUrl = "http://owner:8083"; - expectAndCallbackException(cb, new NotAssignedException("not owner test", ownerUrl)); - - EasyMock.expect(RestClient.httpRequest(EasyMock.eq("http://owner:8083/connectors/" + CONNECTOR_NAME + "/tasks/0/restart?forward=false"), - EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.isNull(), EasyMock.anyObject(), EasyMock.anyObject(WorkerConfig.class))) - .andReturn(new RestClient.HttpResponse<>(202, new 
HashMap<>(), null)); + expectAndCallbackException(cb, new NotAssignedException("not owner test", ownerUrl)) + .when(herder).restartTask(eq(taskId), cb.capture()); - PowerMock.replayAll(); + restClientStatic.when(() -> + RestClient.httpRequest(eq("http://owner:8083/connectors/" + CONNECTOR_NAME + "/tasks/0/restart?forward=false"), + eq("POST"), isNull(), isNull(), any(), any(WorkerConfig.class)) + ).thenReturn(new RestClient.HttpResponse<>(202, new HashMap<>(), null)); connectorsResource.restartTask(CONNECTOR_NAME, 0, NULL_HEADERS, true); - - PowerMock.verifyAll(); } @Test public void testConnectorActiveTopicsWithTopicTrackingDisabled() { - PowerMock.reset(workerConfig); - EasyMock.expect(workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG)).andReturn(false); - EasyMock.expect(workerConfig.getBoolean(TOPIC_TRACKING_ALLOW_RESET_CONFIG)).andReturn(false); - PowerMock.replay(workerConfig); + when(workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG)).thenReturn(false); + when(workerConfig.getBoolean(TOPIC_TRACKING_ALLOW_RESET_CONFIG)).thenReturn(false); connectorsResource = new ConnectorsResource(herder, workerConfig); - PowerMock.replayAll(); Exception e = assertThrows(ConnectRestException.class, () -> connectorsResource.getConnectorActiveTopics(CONNECTOR_NAME)); assertEquals("Topic tracking is disabled.", e.getMessage()); - PowerMock.verifyAll(); } @Test public void testResetConnectorActiveTopicsWithTopicTrackingDisabled() { - PowerMock.reset(workerConfig); - EasyMock.expect(workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG)).andReturn(false); - EasyMock.expect(workerConfig.getBoolean(TOPIC_TRACKING_ALLOW_RESET_CONFIG)).andReturn(true); - HttpHeaders headers = EasyMock.mock(HttpHeaders.class); - PowerMock.replay(workerConfig); + when(workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG)).thenReturn(false); + when(workerConfig.getBoolean(TOPIC_TRACKING_ALLOW_RESET_CONFIG)).thenReturn(true); + HttpHeaders headers = mock(HttpHeaders.class); connectorsResource = new 
ConnectorsResource(herder, workerConfig); - PowerMock.replayAll(); Exception e = assertThrows(ConnectRestException.class, () -> connectorsResource.resetConnectorActiveTopics(CONNECTOR_NAME, headers)); assertEquals("Topic tracking is disabled.", e.getMessage()); - PowerMock.verifyAll(); } @Test public void testResetConnectorActiveTopicsWithTopicTrackingEnabled() { - PowerMock.reset(workerConfig); - EasyMock.expect(workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG)).andReturn(true); - EasyMock.expect(workerConfig.getBoolean(TOPIC_TRACKING_ALLOW_RESET_CONFIG)).andReturn(false); - HttpHeaders headers = EasyMock.mock(HttpHeaders.class); - PowerMock.replay(workerConfig); + when(workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG)).thenReturn(true); + when(workerConfig.getBoolean(TOPIC_TRACKING_ALLOW_RESET_CONFIG)).thenReturn(false); + HttpHeaders headers = mock(HttpHeaders.class); connectorsResource = new ConnectorsResource(herder, workerConfig); - PowerMock.replayAll(); Exception e = assertThrows(ConnectRestException.class, () -> connectorsResource.resetConnectorActiveTopics(CONNECTOR_NAME, headers)); assertEquals("Topic tracking reset is disabled.", e.getMessage()); - PowerMock.verifyAll(); } @Test public void testConnectorActiveTopics() { - PowerMock.reset(workerConfig); - EasyMock.expect(workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG)).andReturn(true); - EasyMock.expect(workerConfig.getBoolean(TOPIC_TRACKING_ALLOW_RESET_CONFIG)).andReturn(true); - EasyMock.expect(herder.connectorActiveTopics(CONNECTOR_NAME)) - .andReturn(new ActiveTopicsInfo(CONNECTOR_NAME, CONNECTOR_ACTIVE_TOPICS)); - PowerMock.replay(workerConfig); + when(workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG)).thenReturn(true); + when(workerConfig.getBoolean(TOPIC_TRACKING_ALLOW_RESET_CONFIG)).thenReturn(true); + when(herder.connectorActiveTopics(CONNECTOR_NAME)) + .thenReturn(new ActiveTopicsInfo(CONNECTOR_NAME, CONNECTOR_ACTIVE_TOPICS)); connectorsResource = new 
ConnectorsResource(herder, workerConfig); - PowerMock.replayAll(); Response response = connectorsResource.getConnectorActiveTopics(CONNECTOR_NAME); assertEquals(Response.Status.OK.getStatusCode(), response.getStatus()); Map<String, Map<String, Object>> body = (Map<String, Map<String, Object>>) response.getEntity(); assertEquals(CONNECTOR_NAME, ((ActiveTopicsInfo) body.get(CONNECTOR_NAME)).connector()); assertEquals(new HashSet<>(CONNECTOR_ACTIVE_TOPICS), ((ActiveTopicsInfo) body.get(CONNECTOR_NAME)).topics()); - PowerMock.verifyAll(); } @Test public void testResetConnectorActiveTopics() { - PowerMock.reset(workerConfig); - EasyMock.expect(workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG)).andReturn(true); - EasyMock.expect(workerConfig.getBoolean(TOPIC_TRACKING_ALLOW_RESET_CONFIG)).andReturn(true); - HttpHeaders headers = EasyMock.mock(HttpHeaders.class); + when(workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG)).thenReturn(true); + when(workerConfig.getBoolean(TOPIC_TRACKING_ALLOW_RESET_CONFIG)).thenReturn(true); + HttpHeaders headers = mock(HttpHeaders.class); herder.resetConnectorActiveTopics(CONNECTOR_NAME); Review Comment: The bare `herder.resetConnectorActiveTopics(CONNECTOR_NAME);` call on the line above can be removed; it was only needed to record a mocking expectation with EasyMock/PowerMock and serves no purpose under Mockito. 
########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java: ########## @@ -547,118 +473,88 @@ public void testGetTasksConfig() throws Throwable { final Map<ConnectorTaskId, Map<String, String>> expectedTasksConnector2 = new HashMap<>(); expectedTasksConnector2.put(connector2Task0, connector2Task0Configs); - final Capture<Callback<Map<ConnectorTaskId, Map<String, String>>>> cb1 = Capture.newInstance(); - herder.tasksConfig(EasyMock.eq(CONNECTOR_NAME), EasyMock.capture(cb1)); - expectAndCallbackResult(cb1, expectedTasksConnector); - final Capture<Callback<Map<ConnectorTaskId, Map<String, String>>>> cb2 = Capture.newInstance(); - herder.tasksConfig(EasyMock.eq(CONNECTOR2_NAME), EasyMock.capture(cb2)); - expectAndCallbackResult(cb2, expectedTasksConnector2); - - PowerMock.replayAll(); + final ArgumentCaptor<Callback<Map<ConnectorTaskId, Map<String, String>>>> cb1 = ArgumentCaptor.forClass(Callback.class); + expectAndCallbackResult(cb1, expectedTasksConnector).when(herder).tasksConfig(eq(CONNECTOR_NAME), cb1.capture()); + final ArgumentCaptor<Callback<Map<ConnectorTaskId, Map<String, String>>>> cb2 = ArgumentCaptor.forClass(Callback.class); + expectAndCallbackResult(cb2, expectedTasksConnector2).when(herder).tasksConfig(eq(CONNECTOR2_NAME), cb2.capture()); Map<ConnectorTaskId, Map<String, String>> tasksConfig = connectorsResource.getTasksConfig(CONNECTOR_NAME, NULL_HEADERS, FORWARD); assertEquals(expectedTasksConnector, tasksConfig); Map<ConnectorTaskId, Map<String, String>> tasksConfig2 = connectorsResource.getTasksConfig(CONNECTOR2_NAME, NULL_HEADERS, FORWARD); assertEquals(expectedTasksConnector2, tasksConfig2); - - PowerMock.verifyAll(); } @Test(expected = NotFoundException.class) public void testGetTasksConfigConnectorNotFound() throws Throwable { - final Capture<Callback<Map<ConnectorTaskId, Map<String, String>>>> cb = Capture.newInstance(); - herder.tasksConfig(EasyMock.eq(CONNECTOR_NAME), 
EasyMock.capture(cb)); - expectAndCallbackException(cb, new NotFoundException("not found")); - - PowerMock.replayAll(); + final ArgumentCaptor<Callback<Map<ConnectorTaskId, Map<String, String>>>> cb = ArgumentCaptor.forClass(Callback.class); + expectAndCallbackException(cb, new NotFoundException("not found")) + .when(herder).tasksConfig(eq(CONNECTOR_NAME), cb.capture()); connectorsResource.getTasksConfig(CONNECTOR_NAME, NULL_HEADERS, FORWARD); Review Comment: While we're in the neighborhood, can we replace the `expected = NotFoundException.class` in the `@Test` annotation above with an inline assertion in the test here? ```suggestion assertThrows(NotFoundException.class, () -> connectorsResource.getTasksConfig(CONNECTOR_NAME, NULL_HEADERS, FORWARD)); ``` ########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java: ########## @@ -262,127 +254,96 @@ public void testFullExpandConnectors() { assertEquals(connectorInfo, expanded.get(CONNECTOR_NAME).get("info")); assertEquals(connector2, expanded.get(CONNECTOR2_NAME).get("status")); assertEquals(connector, expanded.get(CONNECTOR_NAME).get("status")); - PowerMock.verifyAll(); } @Test public void testExpandConnectorsWithConnectorNotFound() { - EasyMock.expect(herder.connectors()).andReturn(Arrays.asList(CONNECTOR2_NAME, CONNECTOR_NAME)); - ConnectorStateInfo connector = EasyMock.mock(ConnectorStateInfo.class); - ConnectorStateInfo connector2 = EasyMock.mock(ConnectorStateInfo.class); - EasyMock.expect(herder.connectorStatus(CONNECTOR2_NAME)).andReturn(connector2); - EasyMock.expect(herder.connectorStatus(CONNECTOR_NAME)).andThrow(EasyMock.mock(NotFoundException.class)); + when(herder.connectors()).thenReturn(Arrays.asList(CONNECTOR2_NAME, CONNECTOR_NAME)); + ConnectorStateInfo connector = mock(ConnectorStateInfo.class); + ConnectorStateInfo connector2 = mock(ConnectorStateInfo.class); + when(herder.connectorStatus(CONNECTOR2_NAME)).thenReturn(connector2); + 
doThrow(mock(NotFoundException.class)).when(herder).connectorStatus(CONNECTOR_NAME); - forward = EasyMock.mock(UriInfo.class); + forward = mock(UriInfo.class); MultivaluedMap<String, String> queryParams = new MultivaluedHashMap<>(); queryParams.putSingle("expand", "status"); - EasyMock.expect(forward.getQueryParameters()).andReturn(queryParams).anyTimes(); - EasyMock.replay(forward); - - PowerMock.replayAll(); + when(forward.getQueryParameters()).thenReturn(queryParams); Map<String, Map<String, Object>> expanded = (Map<String, Map<String, Object>>) connectorsResource.listConnectors(forward, NULL_HEADERS).getEntity(); // Ordering isn't guaranteed, compare sets assertEquals(Collections.singleton(CONNECTOR2_NAME), expanded.keySet()); assertEquals(connector2, expanded.get(CONNECTOR2_NAME).get("status")); - PowerMock.verifyAll(); } @Test public void testCreateConnector() throws Throwable { CreateConnectorRequest body = new CreateConnectorRequest(CONNECTOR_NAME, Collections.singletonMap(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME)); - final Capture<Callback<Herder.Created<ConnectorInfo>>> cb = Capture.newInstance(); - herder.putConnectorConfig(EasyMock.eq(CONNECTOR_NAME), EasyMock.eq(body.config()), EasyMock.eq(false), EasyMock.capture(cb)); + final ArgumentCaptor<Callback<Herder.Created<ConnectorInfo>>> cb = ArgumentCaptor.forClass(Callback.class); expectAndCallbackResult(cb, new Herder.Created<>(true, new ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG, - CONNECTOR_TASK_NAMES, ConnectorType.SOURCE))); - - PowerMock.replayAll(); + CONNECTOR_TASK_NAMES, ConnectorType.SOURCE)) + ).when(herder).putConnectorConfig(eq(CONNECTOR_NAME), eq(body.config()), eq(false), cb.capture()); connectorsResource.createConnector(FORWARD, NULL_HEADERS, body); - - PowerMock.verifyAll(); } @Test public void testCreateConnectorNotLeader() throws Throwable { CreateConnectorRequest body = new CreateConnectorRequest(CONNECTOR_NAME, Collections.singletonMap(ConnectorConfig.NAME_CONFIG, 
CONNECTOR_NAME)); - final Capture<Callback<Herder.Created<ConnectorInfo>>> cb = Capture.newInstance(); - herder.putConnectorConfig(EasyMock.eq(CONNECTOR_NAME), EasyMock.eq(body.config()), EasyMock.eq(false), EasyMock.capture(cb)); - expectAndCallbackNotLeaderException(cb); - // Should forward request - EasyMock.expect(RestClient.httpRequest(EasyMock.eq(LEADER_URL + "connectors?forward=false"), EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.eq(body), EasyMock.anyObject(), EasyMock.anyObject(WorkerConfig.class))) - .andReturn(new RestClient.HttpResponse<>(201, new HashMap<>(), new ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG, CONNECTOR_TASK_NAMES, - ConnectorType.SOURCE))); + final ArgumentCaptor<Callback<Herder.Created<ConnectorInfo>>> cb = ArgumentCaptor.forClass(Callback.class); + expectAndCallbackNotLeaderException(cb).when(herder) + .putConnectorConfig(eq(CONNECTOR_NAME), eq(body.config()), eq(false), cb.capture()); - PowerMock.replayAll(); + // Should forward request + restClientStatic.when(() -> + RestClient.httpRequest(eq(LEADER_URL + "connectors?forward=false"), eq("POST"), isNull(), eq(body), any(), any(WorkerConfig.class)) + ).thenReturn(new RestClient.HttpResponse<>(201, new HashMap<>(), new ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG, CONNECTOR_TASK_NAMES, ConnectorType.SOURCE))); Review Comment: Is there an easy way to verify that this method is actually invoked, since the point of these tests is to make sure that the request is forwarded via the `RestClient` (and, in later cases, with the expected headers)? I know that the test fails if we set up incorrect expectations but that may change if we refactor the `RestClient` class; hoping we can get something like strict stubbing but with this static class if possible. 
########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java: ########## @@ -899,250 +750,199 @@ public void testFenceZombiesWithInternalRequestSignature() throws Throwable { expectedSignature, signatureCapture.getValue() ); - - PowerMock.verifyAll(); } @Test public void testFenceZombiesConnectorNotFound() throws Throwable { - final Capture<Callback<Void>> cb = Capture.newInstance(); - herder.fenceZombieSourceTasks(EasyMock.eq(CONNECTOR_NAME), EasyMock.capture(cb), EasyMock.anyObject(InternalRequestSignature.class)); - - expectAndCallbackException(cb, new NotFoundException("not found")); + final ArgumentCaptor<Callback<Void>> cb = ArgumentCaptor.forClass(Callback.class); - PowerMock.replayAll(); + expectAndCallbackException(cb, new NotFoundException("not found")) + .when(herder).fenceZombieSourceTasks(eq(CONNECTOR_NAME), cb.capture(), any()); assertThrows(NotFoundException.class, () -> connectorsResource.fenceZombies(CONNECTOR_NAME, NULL_HEADERS, FORWARD, serializeAsBytes(null))); - - PowerMock.verifyAll(); } @Test public void testRestartConnectorNotFound() { - final Capture<Callback<Void>> cb = Capture.newInstance(); - herder.restartConnector(EasyMock.eq(CONNECTOR_NAME), EasyMock.capture(cb)); - expectAndCallbackException(cb, new NotFoundException("not found")); - - PowerMock.replayAll(); + final ArgumentCaptor<Callback<Void>> cb = ArgumentCaptor.forClass(Callback.class); + expectAndCallbackException(cb, new NotFoundException("not found")) + .when(herder).restartConnector(eq(CONNECTOR_NAME), cb.capture()); assertThrows(NotFoundException.class, () -> connectorsResource.restartConnector(CONNECTOR_NAME, NULL_HEADERS, false, false, FORWARD) ); - - PowerMock.verifyAll(); } @Test public void testRestartConnectorLeaderRedirect() throws Throwable { - final Capture<Callback<Void>> cb = Capture.newInstance(); - herder.restartConnector(EasyMock.eq(CONNECTOR_NAME), EasyMock.capture(cb)); - 
expectAndCallbackNotLeaderException(cb); + final ArgumentCaptor<Callback<Void>> cb = ArgumentCaptor.forClass(Callback.class); + expectAndCallbackNotLeaderException(cb).when(herder) + .restartConnector(eq(CONNECTOR_NAME), cb.capture()); - EasyMock.expect(RestClient.httpRequest(EasyMock.eq(LEADER_URL + "connectors/" + CONNECTOR_NAME + "/restart?forward=true"), - EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.isNull(), EasyMock.anyObject(), EasyMock.anyObject(WorkerConfig.class))) - .andReturn(new RestClient.HttpResponse<>(202, new HashMap<>(), null)); - - PowerMock.replayAll(); + restClientStatic.when(() -> + RestClient.httpRequest(eq(LEADER_URL + "connectors/" + CONNECTOR_NAME + "/restart?forward=true"), + eq("POST"), isNull(), isNull(), any(), any(WorkerConfig.class)) + ).thenReturn(new RestClient.HttpResponse<>(202, new HashMap<>(), null)); Response response = connectorsResource.restartConnector(CONNECTOR_NAME, NULL_HEADERS, false, false, null); assertEquals(Response.Status.NO_CONTENT.getStatusCode(), response.getStatus()); - PowerMock.verifyAll(); - - PowerMock.verifyAll(); } @Test public void testRestartConnectorOwnerRedirect() throws Throwable { - final Capture<Callback<Void>> cb = Capture.newInstance(); - herder.restartConnector(EasyMock.eq(CONNECTOR_NAME), EasyMock.capture(cb)); + final ArgumentCaptor<Callback<Void>> cb = ArgumentCaptor.forClass(Callback.class); String ownerUrl = "http://owner:8083"; - expectAndCallbackException(cb, new NotAssignedException("not owner test", ownerUrl)); - - EasyMock.expect(RestClient.httpRequest(EasyMock.eq("http://owner:8083/connectors/" + CONNECTOR_NAME + "/restart?forward=false"), - EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.isNull(), EasyMock.anyObject(), EasyMock.anyObject(WorkerConfig.class))) - .andReturn(new RestClient.HttpResponse<>(202, new HashMap<>(), null)); + expectAndCallbackException(cb, new NotAssignedException("not owner test", ownerUrl)) + .when(herder).restartConnector(eq(CONNECTOR_NAME), 
cb.capture()); - PowerMock.replayAll(); + restClientStatic.when(() -> + RestClient.httpRequest(eq("http://owner:8083/connectors/" + CONNECTOR_NAME + "/restart?forward=false"), + eq("POST"), isNull(), isNull(), any(), any(WorkerConfig.class)) + ).thenReturn(new RestClient.HttpResponse<>(202, new HashMap<>(), null)); Response response = connectorsResource.restartConnector(CONNECTOR_NAME, NULL_HEADERS, false, false, true); assertEquals(Response.Status.NO_CONTENT.getStatusCode(), response.getStatus()); - PowerMock.verifyAll(); - - PowerMock.verifyAll(); } @Test public void testRestartTaskNotFound() { ConnectorTaskId taskId = new ConnectorTaskId(CONNECTOR_NAME, 0); - final Capture<Callback<Void>> cb = Capture.newInstance(); - herder.restartTask(EasyMock.eq(taskId), EasyMock.capture(cb)); - expectAndCallbackException(cb, new NotFoundException("not found")); - - PowerMock.replayAll(); + final ArgumentCaptor<Callback<Void>> cb = ArgumentCaptor.forClass(Callback.class); + expectAndCallbackException(cb, new NotFoundException("not found")) + .when(herder).restartTask(eq(taskId), cb.capture()); assertThrows(NotFoundException.class, () -> connectorsResource.restartTask(CONNECTOR_NAME, 0, NULL_HEADERS, FORWARD)); - - PowerMock.verifyAll(); } @Test public void testRestartTaskLeaderRedirect() throws Throwable { ConnectorTaskId taskId = new ConnectorTaskId(CONNECTOR_NAME, 0); - final Capture<Callback<Void>> cb = Capture.newInstance(); - herder.restartTask(EasyMock.eq(taskId), EasyMock.capture(cb)); - expectAndCallbackNotLeaderException(cb); + final ArgumentCaptor<Callback<Void>> cb = ArgumentCaptor.forClass(Callback.class); + expectAndCallbackNotLeaderException(cb).when(herder) + .restartTask(eq(taskId), cb.capture()); - EasyMock.expect(RestClient.httpRequest(EasyMock.eq(LEADER_URL + "connectors/" + CONNECTOR_NAME + "/tasks/0/restart?forward=true"), - EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.isNull(), EasyMock.anyObject(), EasyMock.anyObject(WorkerConfig.class))) - 
.andReturn(new RestClient.HttpResponse<>(202, new HashMap<>(), null)); - - PowerMock.replayAll(); + restClientStatic.when(() -> + RestClient.httpRequest(eq(LEADER_URL + "connectors/" + CONNECTOR_NAME + "/tasks/0/restart?forward=true"), + eq("POST"), isNull(), isNull(), any(), any(WorkerConfig.class)) + ).thenReturn(new RestClient.HttpResponse<>(202, new HashMap<>(), null)); connectorsResource.restartTask(CONNECTOR_NAME, 0, NULL_HEADERS, null); - - PowerMock.verifyAll(); } @Test public void testRestartTaskOwnerRedirect() throws Throwable { ConnectorTaskId taskId = new ConnectorTaskId(CONNECTOR_NAME, 0); - final Capture<Callback<Void>> cb = Capture.newInstance(); - herder.restartTask(EasyMock.eq(taskId), EasyMock.capture(cb)); + final ArgumentCaptor<Callback<Void>> cb = ArgumentCaptor.forClass(Callback.class); String ownerUrl = "http://owner:8083"; - expectAndCallbackException(cb, new NotAssignedException("not owner test", ownerUrl)); - - EasyMock.expect(RestClient.httpRequest(EasyMock.eq("http://owner:8083/connectors/" + CONNECTOR_NAME + "/tasks/0/restart?forward=false"), - EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.isNull(), EasyMock.anyObject(), EasyMock.anyObject(WorkerConfig.class))) - .andReturn(new RestClient.HttpResponse<>(202, new HashMap<>(), null)); + expectAndCallbackException(cb, new NotAssignedException("not owner test", ownerUrl)) + .when(herder).restartTask(eq(taskId), cb.capture()); - PowerMock.replayAll(); + restClientStatic.when(() -> + RestClient.httpRequest(eq("http://owner:8083/connectors/" + CONNECTOR_NAME + "/tasks/0/restart?forward=false"), + eq("POST"), isNull(), isNull(), any(), any(WorkerConfig.class)) + ).thenReturn(new RestClient.HttpResponse<>(202, new HashMap<>(), null)); connectorsResource.restartTask(CONNECTOR_NAME, 0, NULL_HEADERS, true); - - PowerMock.verifyAll(); } @Test public void testConnectorActiveTopicsWithTopicTrackingDisabled() { - PowerMock.reset(workerConfig); - 
EasyMock.expect(workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG)).andReturn(false); - EasyMock.expect(workerConfig.getBoolean(TOPIC_TRACKING_ALLOW_RESET_CONFIG)).andReturn(false); - PowerMock.replay(workerConfig); + when(workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG)).thenReturn(false); + when(workerConfig.getBoolean(TOPIC_TRACKING_ALLOW_RESET_CONFIG)).thenReturn(false); connectorsResource = new ConnectorsResource(herder, workerConfig); - PowerMock.replayAll(); Exception e = assertThrows(ConnectRestException.class, () -> connectorsResource.getConnectorActiveTopics(CONNECTOR_NAME)); assertEquals("Topic tracking is disabled.", e.getMessage()); - PowerMock.verifyAll(); } @Test public void testResetConnectorActiveTopicsWithTopicTrackingDisabled() { - PowerMock.reset(workerConfig); - EasyMock.expect(workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG)).andReturn(false); - EasyMock.expect(workerConfig.getBoolean(TOPIC_TRACKING_ALLOW_RESET_CONFIG)).andReturn(true); - HttpHeaders headers = EasyMock.mock(HttpHeaders.class); - PowerMock.replay(workerConfig); + when(workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG)).thenReturn(false); + when(workerConfig.getBoolean(TOPIC_TRACKING_ALLOW_RESET_CONFIG)).thenReturn(true); + HttpHeaders headers = mock(HttpHeaders.class); connectorsResource = new ConnectorsResource(herder, workerConfig); - PowerMock.replayAll(); Exception e = assertThrows(ConnectRestException.class, () -> connectorsResource.resetConnectorActiveTopics(CONNECTOR_NAME, headers)); assertEquals("Topic tracking is disabled.", e.getMessage()); - PowerMock.verifyAll(); } @Test public void testResetConnectorActiveTopicsWithTopicTrackingEnabled() { - PowerMock.reset(workerConfig); - EasyMock.expect(workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG)).andReturn(true); - EasyMock.expect(workerConfig.getBoolean(TOPIC_TRACKING_ALLOW_RESET_CONFIG)).andReturn(false); - HttpHeaders headers = EasyMock.mock(HttpHeaders.class); - PowerMock.replay(workerConfig); + 
when(workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG)).thenReturn(true); + when(workerConfig.getBoolean(TOPIC_TRACKING_ALLOW_RESET_CONFIG)).thenReturn(false); + HttpHeaders headers = mock(HttpHeaders.class); connectorsResource = new ConnectorsResource(herder, workerConfig); - PowerMock.replayAll(); Exception e = assertThrows(ConnectRestException.class, () -> connectorsResource.resetConnectorActiveTopics(CONNECTOR_NAME, headers)); assertEquals("Topic tracking reset is disabled.", e.getMessage()); - PowerMock.verifyAll(); } @Test public void testConnectorActiveTopics() { - PowerMock.reset(workerConfig); - EasyMock.expect(workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG)).andReturn(true); - EasyMock.expect(workerConfig.getBoolean(TOPIC_TRACKING_ALLOW_RESET_CONFIG)).andReturn(true); - EasyMock.expect(herder.connectorActiveTopics(CONNECTOR_NAME)) - .andReturn(new ActiveTopicsInfo(CONNECTOR_NAME, CONNECTOR_ACTIVE_TOPICS)); - PowerMock.replay(workerConfig); + when(workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG)).thenReturn(true); + when(workerConfig.getBoolean(TOPIC_TRACKING_ALLOW_RESET_CONFIG)).thenReturn(true); + when(herder.connectorActiveTopics(CONNECTOR_NAME)) + .thenReturn(new ActiveTopicsInfo(CONNECTOR_NAME, CONNECTOR_ACTIVE_TOPICS)); connectorsResource = new ConnectorsResource(herder, workerConfig); - PowerMock.replayAll(); Response response = connectorsResource.getConnectorActiveTopics(CONNECTOR_NAME); assertEquals(Response.Status.OK.getStatusCode(), response.getStatus()); Map<String, Map<String, Object>> body = (Map<String, Map<String, Object>>) response.getEntity(); assertEquals(CONNECTOR_NAME, ((ActiveTopicsInfo) body.get(CONNECTOR_NAME)).connector()); assertEquals(new HashSet<>(CONNECTOR_ACTIVE_TOPICS), ((ActiveTopicsInfo) body.get(CONNECTOR_NAME)).topics()); - PowerMock.verifyAll(); } @Test public void testResetConnectorActiveTopics() { - PowerMock.reset(workerConfig); - 
EasyMock.expect(workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG)).andReturn(true); - EasyMock.expect(workerConfig.getBoolean(TOPIC_TRACKING_ALLOW_RESET_CONFIG)).andReturn(true); - HttpHeaders headers = EasyMock.mock(HttpHeaders.class); + when(workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG)).thenReturn(true); + when(workerConfig.getBoolean(TOPIC_TRACKING_ALLOW_RESET_CONFIG)).thenReturn(true); + HttpHeaders headers = mock(HttpHeaders.class); herder.resetConnectorActiveTopics(CONNECTOR_NAME); - EasyMock.expectLastCall(); - PowerMock.replay(workerConfig); + verify(herder).resetConnectorActiveTopics(CONNECTOR_NAME); connectorsResource = new ConnectorsResource(herder, workerConfig); - PowerMock.replayAll(); Response response = connectorsResource.resetConnectorActiveTopics(CONNECTOR_NAME, headers); assertEquals(Response.Status.ACCEPTED.getStatusCode(), response.getStatus()); - PowerMock.verifyAll(); } @Test public void testCompleteOrForwardWithErrorAndNoForwardUrl() { - final Capture<Callback<Herder.Created<ConnectorInfo>>> cb = Capture.newInstance(); - herder.deleteConnectorConfig(EasyMock.eq(CONNECTOR_NAME), EasyMock.capture(cb)); + final ArgumentCaptor<Callback<Herder.Created<ConnectorInfo>>> cb = ArgumentCaptor.forClass(Callback.class); String leaderUrl = null; - expectAndCallbackException(cb, new NotLeaderException("not leader", leaderUrl)); - - PowerMock.replayAll(); + expectAndCallbackException(cb, new NotLeaderException("not leader", leaderUrl)) + .when(herder).deleteConnectorConfig(eq(CONNECTOR_NAME), cb.capture()); ConnectRestException e = assertThrows(ConnectRestException.class, () -> connectorsResource.destroyConnector(CONNECTOR_NAME, NULL_HEADERS, FORWARD)); assertTrue(e.getMessage().contains("no known leader URL")); - PowerMock.verifyAll(); } private <T> byte[] serializeAsBytes(final T value) throws IOException { return new ObjectMapper().writeValueAsBytes(value); } - private <T> void expectAndCallbackResult(final Capture<Callback<T>> cb, final T 
value) { - PowerMock.expectLastCall().andAnswer(() -> { + private <T> Stubber expectAndCallbackResult(final ArgumentCaptor<Callback<T>> cb, final T value) { + return doAnswer(invocation -> { cb.getValue().onCompletion(null, value); return null; }); } - private <T> void expectAndCallbackException(final Capture<Callback<T>> cb, final Throwable t) { - PowerMock.expectLastCall().andAnswer((IAnswer<Void>) () -> { + private <T> Stubber expectAndCallbackException(final ArgumentCaptor<Callback<T>> cb, final Throwable t) { + return doAnswer(invocation -> { cb.getValue().onCompletion(t, null); return null; }); } - private <T> void expectAndCallbackNotLeaderException(final Capture<Callback<T>> cb) { - expectAndCallbackException(cb, new NotLeaderException("not leader test", LEADER_URL)); + private <T> Stubber expectAndCallbackNotLeaderException(final ArgumentCaptor<Callback<T>> cb) { + return expectAndCallbackException(cb, new NotLeaderException("not leader test", LEADER_URL)); Review Comment: This is really smooth, nicely done 👍 ########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java: ########## @@ -899,250 +750,199 @@ public void testFenceZombiesWithInternalRequestSignature() throws Throwable { expectedSignature, signatureCapture.getValue() ); - - PowerMock.verifyAll(); } @Test public void testFenceZombiesConnectorNotFound() throws Throwable { - final Capture<Callback<Void>> cb = Capture.newInstance(); - herder.fenceZombieSourceTasks(EasyMock.eq(CONNECTOR_NAME), EasyMock.capture(cb), EasyMock.anyObject(InternalRequestSignature.class)); - - expectAndCallbackException(cb, new NotFoundException("not found")); + final ArgumentCaptor<Callback<Void>> cb = ArgumentCaptor.forClass(Callback.class); - PowerMock.replayAll(); + expectAndCallbackException(cb, new NotFoundException("not found")) + .when(herder).fenceZombieSourceTasks(eq(CONNECTOR_NAME), cb.capture(), any()); assertThrows(NotFoundException.class, () -> 
connectorsResource.fenceZombies(CONNECTOR_NAME, NULL_HEADERS, FORWARD, serializeAsBytes(null))); - - PowerMock.verifyAll(); } @Test public void testRestartConnectorNotFound() { - final Capture<Callback<Void>> cb = Capture.newInstance(); - herder.restartConnector(EasyMock.eq(CONNECTOR_NAME), EasyMock.capture(cb)); - expectAndCallbackException(cb, new NotFoundException("not found")); - - PowerMock.replayAll(); + final ArgumentCaptor<Callback<Void>> cb = ArgumentCaptor.forClass(Callback.class); + expectAndCallbackException(cb, new NotFoundException("not found")) + .when(herder).restartConnector(eq(CONNECTOR_NAME), cb.capture()); assertThrows(NotFoundException.class, () -> connectorsResource.restartConnector(CONNECTOR_NAME, NULL_HEADERS, false, false, FORWARD) ); - - PowerMock.verifyAll(); } @Test public void testRestartConnectorLeaderRedirect() throws Throwable { - final Capture<Callback<Void>> cb = Capture.newInstance(); - herder.restartConnector(EasyMock.eq(CONNECTOR_NAME), EasyMock.capture(cb)); - expectAndCallbackNotLeaderException(cb); + final ArgumentCaptor<Callback<Void>> cb = ArgumentCaptor.forClass(Callback.class); + expectAndCallbackNotLeaderException(cb).when(herder) + .restartConnector(eq(CONNECTOR_NAME), cb.capture()); - EasyMock.expect(RestClient.httpRequest(EasyMock.eq(LEADER_URL + "connectors/" + CONNECTOR_NAME + "/restart?forward=true"), - EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.isNull(), EasyMock.anyObject(), EasyMock.anyObject(WorkerConfig.class))) - .andReturn(new RestClient.HttpResponse<>(202, new HashMap<>(), null)); - - PowerMock.replayAll(); + restClientStatic.when(() -> + RestClient.httpRequest(eq(LEADER_URL + "connectors/" + CONNECTOR_NAME + "/restart?forward=true"), + eq("POST"), isNull(), isNull(), any(), any(WorkerConfig.class)) + ).thenReturn(new RestClient.HttpResponse<>(202, new HashMap<>(), null)); Response response = connectorsResource.restartConnector(CONNECTOR_NAME, NULL_HEADERS, false, false, null); 
assertEquals(Response.Status.NO_CONTENT.getStatusCode(), response.getStatus()); - PowerMock.verifyAll(); - - PowerMock.verifyAll(); } @Test public void testRestartConnectorOwnerRedirect() throws Throwable { - final Capture<Callback<Void>> cb = Capture.newInstance(); - herder.restartConnector(EasyMock.eq(CONNECTOR_NAME), EasyMock.capture(cb)); + final ArgumentCaptor<Callback<Void>> cb = ArgumentCaptor.forClass(Callback.class); String ownerUrl = "http://owner:8083"; - expectAndCallbackException(cb, new NotAssignedException("not owner test", ownerUrl)); - - EasyMock.expect(RestClient.httpRequest(EasyMock.eq("http://owner:8083/connectors/" + CONNECTOR_NAME + "/restart?forward=false"), - EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.isNull(), EasyMock.anyObject(), EasyMock.anyObject(WorkerConfig.class))) - .andReturn(new RestClient.HttpResponse<>(202, new HashMap<>(), null)); + expectAndCallbackException(cb, new NotAssignedException("not owner test", ownerUrl)) + .when(herder).restartConnector(eq(CONNECTOR_NAME), cb.capture()); - PowerMock.replayAll(); + restClientStatic.when(() -> + RestClient.httpRequest(eq("http://owner:8083/connectors/" + CONNECTOR_NAME + "/restart?forward=false"), + eq("POST"), isNull(), isNull(), any(), any(WorkerConfig.class)) + ).thenReturn(new RestClient.HttpResponse<>(202, new HashMap<>(), null)); Response response = connectorsResource.restartConnector(CONNECTOR_NAME, NULL_HEADERS, false, false, true); assertEquals(Response.Status.NO_CONTENT.getStatusCode(), response.getStatus()); - PowerMock.verifyAll(); - - PowerMock.verifyAll(); } @Test public void testRestartTaskNotFound() { ConnectorTaskId taskId = new ConnectorTaskId(CONNECTOR_NAME, 0); - final Capture<Callback<Void>> cb = Capture.newInstance(); - herder.restartTask(EasyMock.eq(taskId), EasyMock.capture(cb)); - expectAndCallbackException(cb, new NotFoundException("not found")); - - PowerMock.replayAll(); + final ArgumentCaptor<Callback<Void>> cb = 
ArgumentCaptor.forClass(Callback.class); + expectAndCallbackException(cb, new NotFoundException("not found")) + .when(herder).restartTask(eq(taskId), cb.capture()); assertThrows(NotFoundException.class, () -> connectorsResource.restartTask(CONNECTOR_NAME, 0, NULL_HEADERS, FORWARD)); - - PowerMock.verifyAll(); } @Test public void testRestartTaskLeaderRedirect() throws Throwable { ConnectorTaskId taskId = new ConnectorTaskId(CONNECTOR_NAME, 0); - final Capture<Callback<Void>> cb = Capture.newInstance(); - herder.restartTask(EasyMock.eq(taskId), EasyMock.capture(cb)); - expectAndCallbackNotLeaderException(cb); + final ArgumentCaptor<Callback<Void>> cb = ArgumentCaptor.forClass(Callback.class); + expectAndCallbackNotLeaderException(cb).when(herder) + .restartTask(eq(taskId), cb.capture()); - EasyMock.expect(RestClient.httpRequest(EasyMock.eq(LEADER_URL + "connectors/" + CONNECTOR_NAME + "/tasks/0/restart?forward=true"), - EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.isNull(), EasyMock.anyObject(), EasyMock.anyObject(WorkerConfig.class))) - .andReturn(new RestClient.HttpResponse<>(202, new HashMap<>(), null)); - - PowerMock.replayAll(); + restClientStatic.when(() -> + RestClient.httpRequest(eq(LEADER_URL + "connectors/" + CONNECTOR_NAME + "/tasks/0/restart?forward=true"), + eq("POST"), isNull(), isNull(), any(), any(WorkerConfig.class)) + ).thenReturn(new RestClient.HttpResponse<>(202, new HashMap<>(), null)); connectorsResource.restartTask(CONNECTOR_NAME, 0, NULL_HEADERS, null); - - PowerMock.verifyAll(); } @Test public void testRestartTaskOwnerRedirect() throws Throwable { ConnectorTaskId taskId = new ConnectorTaskId(CONNECTOR_NAME, 0); - final Capture<Callback<Void>> cb = Capture.newInstance(); - herder.restartTask(EasyMock.eq(taskId), EasyMock.capture(cb)); + final ArgumentCaptor<Callback<Void>> cb = ArgumentCaptor.forClass(Callback.class); String ownerUrl = "http://owner:8083"; - expectAndCallbackException(cb, new NotAssignedException("not owner test", 
ownerUrl)); - - EasyMock.expect(RestClient.httpRequest(EasyMock.eq("http://owner:8083/connectors/" + CONNECTOR_NAME + "/tasks/0/restart?forward=false"), - EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.isNull(), EasyMock.anyObject(), EasyMock.anyObject(WorkerConfig.class))) - .andReturn(new RestClient.HttpResponse<>(202, new HashMap<>(), null)); + expectAndCallbackException(cb, new NotAssignedException("not owner test", ownerUrl)) + .when(herder).restartTask(eq(taskId), cb.capture()); - PowerMock.replayAll(); + restClientStatic.when(() -> + RestClient.httpRequest(eq("http://owner:8083/connectors/" + CONNECTOR_NAME + "/tasks/0/restart?forward=false"), + eq("POST"), isNull(), isNull(), any(), any(WorkerConfig.class)) + ).thenReturn(new RestClient.HttpResponse<>(202, new HashMap<>(), null)); connectorsResource.restartTask(CONNECTOR_NAME, 0, NULL_HEADERS, true); - - PowerMock.verifyAll(); } @Test public void testConnectorActiveTopicsWithTopicTrackingDisabled() { - PowerMock.reset(workerConfig); - EasyMock.expect(workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG)).andReturn(false); - EasyMock.expect(workerConfig.getBoolean(TOPIC_TRACKING_ALLOW_RESET_CONFIG)).andReturn(false); - PowerMock.replay(workerConfig); + when(workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG)).thenReturn(false); + when(workerConfig.getBoolean(TOPIC_TRACKING_ALLOW_RESET_CONFIG)).thenReturn(false); connectorsResource = new ConnectorsResource(herder, workerConfig); - PowerMock.replayAll(); Exception e = assertThrows(ConnectRestException.class, () -> connectorsResource.getConnectorActiveTopics(CONNECTOR_NAME)); assertEquals("Topic tracking is disabled.", e.getMessage()); - PowerMock.verifyAll(); } @Test public void testResetConnectorActiveTopicsWithTopicTrackingDisabled() { - PowerMock.reset(workerConfig); - EasyMock.expect(workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG)).andReturn(false); - EasyMock.expect(workerConfig.getBoolean(TOPIC_TRACKING_ALLOW_RESET_CONFIG)).andReturn(true); - 
HttpHeaders headers = EasyMock.mock(HttpHeaders.class); - PowerMock.replay(workerConfig); + when(workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG)).thenReturn(false); + when(workerConfig.getBoolean(TOPIC_TRACKING_ALLOW_RESET_CONFIG)).thenReturn(true); + HttpHeaders headers = mock(HttpHeaders.class); connectorsResource = new ConnectorsResource(herder, workerConfig); - PowerMock.replayAll(); Exception e = assertThrows(ConnectRestException.class, () -> connectorsResource.resetConnectorActiveTopics(CONNECTOR_NAME, headers)); assertEquals("Topic tracking is disabled.", e.getMessage()); - PowerMock.verifyAll(); } @Test public void testResetConnectorActiveTopicsWithTopicTrackingEnabled() { - PowerMock.reset(workerConfig); - EasyMock.expect(workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG)).andReturn(true); - EasyMock.expect(workerConfig.getBoolean(TOPIC_TRACKING_ALLOW_RESET_CONFIG)).andReturn(false); - HttpHeaders headers = EasyMock.mock(HttpHeaders.class); - PowerMock.replay(workerConfig); + when(workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG)).thenReturn(true); + when(workerConfig.getBoolean(TOPIC_TRACKING_ALLOW_RESET_CONFIG)).thenReturn(false); + HttpHeaders headers = mock(HttpHeaders.class); connectorsResource = new ConnectorsResource(herder, workerConfig); - PowerMock.replayAll(); Exception e = assertThrows(ConnectRestException.class, () -> connectorsResource.resetConnectorActiveTopics(CONNECTOR_NAME, headers)); assertEquals("Topic tracking reset is disabled.", e.getMessage()); - PowerMock.verifyAll(); } @Test public void testConnectorActiveTopics() { - PowerMock.reset(workerConfig); - EasyMock.expect(workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG)).andReturn(true); - EasyMock.expect(workerConfig.getBoolean(TOPIC_TRACKING_ALLOW_RESET_CONFIG)).andReturn(true); - EasyMock.expect(herder.connectorActiveTopics(CONNECTOR_NAME)) - .andReturn(new ActiveTopicsInfo(CONNECTOR_NAME, CONNECTOR_ACTIVE_TOPICS)); - PowerMock.replay(workerConfig); + 
when(workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG)).thenReturn(true); + when(workerConfig.getBoolean(TOPIC_TRACKING_ALLOW_RESET_CONFIG)).thenReturn(true); + when(herder.connectorActiveTopics(CONNECTOR_NAME)) + .thenReturn(new ActiveTopicsInfo(CONNECTOR_NAME, CONNECTOR_ACTIVE_TOPICS)); connectorsResource = new ConnectorsResource(herder, workerConfig); - PowerMock.replayAll(); Response response = connectorsResource.getConnectorActiveTopics(CONNECTOR_NAME); assertEquals(Response.Status.OK.getStatusCode(), response.getStatus()); Map<String, Map<String, Object>> body = (Map<String, Map<String, Object>>) response.getEntity(); assertEquals(CONNECTOR_NAME, ((ActiveTopicsInfo) body.get(CONNECTOR_NAME)).connector()); assertEquals(new HashSet<>(CONNECTOR_ACTIVE_TOPICS), ((ActiveTopicsInfo) body.get(CONNECTOR_NAME)).topics()); - PowerMock.verifyAll(); } @Test public void testResetConnectorActiveTopics() { - PowerMock.reset(workerConfig); - EasyMock.expect(workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG)).andReturn(true); - EasyMock.expect(workerConfig.getBoolean(TOPIC_TRACKING_ALLOW_RESET_CONFIG)).andReturn(true); - HttpHeaders headers = EasyMock.mock(HttpHeaders.class); + when(workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG)).thenReturn(true); + when(workerConfig.getBoolean(TOPIC_TRACKING_ALLOW_RESET_CONFIG)).thenReturn(true); + HttpHeaders headers = mock(HttpHeaders.class); herder.resetConnectorActiveTopics(CONNECTOR_NAME); - EasyMock.expectLastCall(); - PowerMock.replay(workerConfig); + verify(herder).resetConnectorActiveTopics(CONNECTOR_NAME); Review Comment: This part should be moved after the call to `connectorsResource::resetConnectorActiveTopics`. 
########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java: ########## @@ -175,85 +183,69 @@ private static Map<String, String> getConnectorConfig(Map<String, String> mapToC @Test public void testListConnectors() { - final Capture<Callback<Collection<String>>> cb = Capture.newInstance(); - EasyMock.expect(herder.connectors()).andReturn(Arrays.asList(CONNECTOR2_NAME, CONNECTOR_NAME)); - - PowerMock.replayAll(); + when(herder.connectors()).thenReturn(Arrays.asList(CONNECTOR2_NAME, CONNECTOR_NAME)); Collection<String> connectors = (Collection<String>) connectorsResource.listConnectors(forward, NULL_HEADERS).getEntity(); // Ordering isn't guaranteed, compare sets assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_NAME, CONNECTOR2_NAME)), new HashSet<>(connectors)); - - PowerMock.verifyAll(); } @Test public void testExpandConnectorsStatus() { - EasyMock.expect(herder.connectors()).andReturn(Arrays.asList(CONNECTOR2_NAME, CONNECTOR_NAME)); - ConnectorStateInfo connector = EasyMock.mock(ConnectorStateInfo.class); - ConnectorStateInfo connector2 = EasyMock.mock(ConnectorStateInfo.class); - EasyMock.expect(herder.connectorStatus(CONNECTOR2_NAME)).andReturn(connector2); - EasyMock.expect(herder.connectorStatus(CONNECTOR_NAME)).andReturn(connector); + when(herder.connectors()).thenReturn(Arrays.asList(CONNECTOR2_NAME, CONNECTOR_NAME)); + ConnectorStateInfo connector = mock(ConnectorStateInfo.class); + ConnectorStateInfo connector2 = mock(ConnectorStateInfo.class); + when(herder.connectorStatus(CONNECTOR2_NAME)).thenReturn(connector2); + when(herder.connectorStatus(CONNECTOR_NAME)).thenReturn(connector); - forward = EasyMock.mock(UriInfo.class); + forward = mock(UriInfo.class); MultivaluedMap<String, String> queryParams = new MultivaluedHashMap<>(); queryParams.putSingle("expand", "status"); - EasyMock.expect(forward.getQueryParameters()).andReturn(queryParams).anyTimes(); - EasyMock.replay(forward); - - 
PowerMock.replayAll(); + when(forward.getQueryParameters()).thenReturn(queryParams); Map<String, Map<String, Object>> expanded = (Map<String, Map<String, Object>>) connectorsResource.listConnectors(forward, NULL_HEADERS).getEntity(); // Ordering isn't guaranteed, compare sets assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_NAME, CONNECTOR2_NAME)), expanded.keySet()); assertEquals(connector2, expanded.get(CONNECTOR2_NAME).get("status")); assertEquals(connector, expanded.get(CONNECTOR_NAME).get("status")); - PowerMock.verifyAll(); } @Test public void testExpandConnectorsInfo() { - EasyMock.expect(herder.connectors()).andReturn(Arrays.asList(CONNECTOR2_NAME, CONNECTOR_NAME)); - ConnectorInfo connector = EasyMock.mock(ConnectorInfo.class); - ConnectorInfo connector2 = EasyMock.mock(ConnectorInfo.class); - EasyMock.expect(herder.connectorInfo(CONNECTOR2_NAME)).andReturn(connector2); - EasyMock.expect(herder.connectorInfo(CONNECTOR_NAME)).andReturn(connector); + when(herder.connectors()).thenReturn(Arrays.asList(CONNECTOR2_NAME, CONNECTOR_NAME)); + ConnectorInfo connector = mock(ConnectorInfo.class); + ConnectorInfo connector2 = mock(ConnectorInfo.class); Review Comment: I'm not sure I follow--it looks like this is necessary for the assertions at lines 228 and 229 to pass? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org