C0urante commented on code in PR #12434:
URL: https://github.com/apache/kafka/pull/12434#discussion_r930423970


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java:
##########
@@ -338,6 +341,41 @@ public void testDisableAdminEndpoint() throws IOException {
         Assert.assertEquals(404, response.getStatusLine().getStatusCode());
     }
 
+    @Test
+    public void testRequestLogs() throws IOException, InterruptedException {
+        Map<String, String> configMap = new HashMap<>(baseWorkerProps());
+        DistributedConfig workerConfig = new DistributedConfig(configMap);
+
+        doReturn(KAFKA_CLUSTER_ID).when(herder).kafkaClusterId();
+        doReturn(plugins).when(herder).plugins();
+        
doReturn(Collections.emptyList()).when(plugins).newPlugins(Collections.emptyList(),
 workerConfig, ConnectRestExtension.class);
+
+        server = new RestServer(workerConfig);
+        server.initializeServer();
+        server.initializeResources(herder);
+        URI advertisedUrl = server.advertisedUrl();  // Call before capturing 
logs as this creates a log
+
+        try (LogCaptureAppender restServerAppender = 
LogCaptureAppender.createAndRegister(RestServer.class)) {
+            HttpRequest request = new HttpGet("/");
+            CloseableHttpClient httpClient = HttpClients.createMinimal();
+            HttpHost httpHost = new HttpHost(advertisedUrl.getHost(), 
advertisedUrl.getPort());
+            CloseableHttpResponse response = httpClient.execute(httpHost, 
request);
+
+            // Wait for server to handle request completion and create request 
log
+            int tries = 0;
+            while (restServerAppender.getMessages().size() < 1 && tries < 10) {
+                ++tries;
+                System.err.println("Wait for log message to arrive, retry " + 
tries);
+                Thread.sleep(1000);
+            }
+
+            List<String> logMessages = restServerAppender.getMessages();
+            System.err.println(logMessages);

Review Comment:
   If this information could still be valuable, we should convert this line to 
use an SLF4J logger instead of printing to stderr.



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java:
##########
@@ -338,6 +341,41 @@ public void testDisableAdminEndpoint() throws IOException {
         Assert.assertEquals(404, response.getStatusLine().getStatusCode());
     }
 
+    @Test
+    public void testRequestLogs() throws IOException, InterruptedException {
+        Map<String, String> configMap = new HashMap<>(baseWorkerProps());
+        DistributedConfig workerConfig = new DistributedConfig(configMap);
+
+        doReturn(KAFKA_CLUSTER_ID).when(herder).kafkaClusterId();
+        doReturn(plugins).when(herder).plugins();
+        
doReturn(Collections.emptyList()).when(plugins).newPlugins(Collections.emptyList(),
 workerConfig, ConnectRestExtension.class);
+
+        server = new RestServer(workerConfig);
+        server.initializeServer();
+        server.initializeResources(herder);
+        URI advertisedUrl = server.advertisedUrl();  // Call before capturing 
logs as this creates a log
+
+        try (LogCaptureAppender restServerAppender = 
LogCaptureAppender.createAndRegister(RestServer.class)) {
+            HttpRequest request = new HttpGet("/");
+            CloseableHttpClient httpClient = HttpClients.createMinimal();

Review Comment:
   I don't like that we're not closing this client, but this style does match 
the other tests. It might be nice to have a class-level `httpClient` field 
that, if non-null, gets closed at the end of each test in `tearDown`, but I 
don't think it's worth blocking this PR on that change, so if you'd prefer, I 
can do that in a follow-up.



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java:
##########
@@ -338,6 +341,41 @@ public void testDisableAdminEndpoint() throws IOException {
         Assert.assertEquals(404, response.getStatusLine().getStatusCode());
     }
 
+    @Test
+    public void testRequestLogs() throws IOException, InterruptedException {
+        Map<String, String> configMap = new HashMap<>(baseWorkerProps());
+        DistributedConfig workerConfig = new DistributedConfig(configMap);
+
+        doReturn(KAFKA_CLUSTER_ID).when(herder).kafkaClusterId();
+        doReturn(plugins).when(herder).plugins();
+        
doReturn(Collections.emptyList()).when(plugins).newPlugins(Collections.emptyList(),
 workerConfig, ConnectRestExtension.class);
+
+        server = new RestServer(workerConfig);
+        server.initializeServer();
+        server.initializeResources(herder);
+        URI advertisedUrl = server.advertisedUrl();  // Call before capturing 
logs as this creates a log
+
+        try (LogCaptureAppender restServerAppender = 
LogCaptureAppender.createAndRegister(RestServer.class)) {
+            HttpRequest request = new HttpGet("/");
+            CloseableHttpClient httpClient = HttpClients.createMinimal();
+            HttpHost httpHost = new HttpHost(advertisedUrl.getHost(), 
advertisedUrl.getPort());
+            CloseableHttpResponse response = httpClient.execute(httpHost, 
request);
+
+            // Wait for server to handle request completion and create request 
log
+            int tries = 0;
+            while (restServerAppender.getMessages().size() < 1 && tries < 10) {
+                ++tries;
+                System.err.println("Wait for log message to arrive, retry " + 
tries);
+                Thread.sleep(1000);
+            }
+
+            List<String> logMessages = restServerAppender.getMessages();
+            System.err.println(logMessages);
+            String requestlog = logMessages.get(0);

Review Comment:
   This may be brittle if we add other log messages to the `RestServer` class. 
Do you think we can relax the constraint here to check that at least one log 
message matches the expected format, but not necessarily the first one?



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java:
##########
@@ -338,6 +341,41 @@ public void testDisableAdminEndpoint() throws IOException {
         Assert.assertEquals(404, response.getStatusLine().getStatusCode());
     }
 
+    @Test
+    public void testRequestLogs() throws IOException, InterruptedException {
+        Map<String, String> configMap = new HashMap<>(baseWorkerProps());
+        DistributedConfig workerConfig = new DistributedConfig(configMap);
+
+        doReturn(KAFKA_CLUSTER_ID).when(herder).kafkaClusterId();
+        doReturn(plugins).when(herder).plugins();
+        
doReturn(Collections.emptyList()).when(plugins).newPlugins(Collections.emptyList(),
 workerConfig, ConnectRestExtension.class);
+
+        server = new RestServer(workerConfig);
+        server.initializeServer();
+        server.initializeResources(herder);
+        URI advertisedUrl = server.advertisedUrl();  // Call before capturing 
logs as this creates a log
+
+        try (LogCaptureAppender restServerAppender = 
LogCaptureAppender.createAndRegister(RestServer.class)) {
+            HttpRequest request = new HttpGet("/");
+            CloseableHttpClient httpClient = HttpClients.createMinimal();
+            HttpHost httpHost = new HttpHost(advertisedUrl.getHost(), 
advertisedUrl.getPort());
+            CloseableHttpResponse response = httpClient.execute(httpHost, 
request);
+
+            // Wait for server to handle request completion and create request 
log
+            int tries = 0;
+            while (restServerAppender.getMessages().size() < 1 && tries < 10) {
+                ++tries;
+                System.err.println("Wait for log message to arrive, retry " + 
tries);
+                Thread.sleep(1000);
+            }

Review Comment:
   Would invoking `server.stop()` be enough to guarantee that it completes the 
request here? I haven't dug too deeply into the Jetty server internals yet but 
it seems promising.



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java:
##########
@@ -338,6 +341,41 @@ public void testDisableAdminEndpoint() throws IOException {
         Assert.assertEquals(404, response.getStatusLine().getStatusCode());
     }
 
+    @Test
+    public void testRequestLogs() throws IOException, InterruptedException {
+        Map<String, String> configMap = new HashMap<>(baseWorkerProps());
+        DistributedConfig workerConfig = new DistributedConfig(configMap);
+
+        doReturn(KAFKA_CLUSTER_ID).when(herder).kafkaClusterId();
+        doReturn(plugins).when(herder).plugins();
+        
doReturn(Collections.emptyList()).when(plugins).newPlugins(Collections.emptyList(),
 workerConfig, ConnectRestExtension.class);
+
+        server = new RestServer(workerConfig);
+        server.initializeServer();
+        server.initializeResources(herder);
+        URI advertisedUrl = server.advertisedUrl();  // Call before capturing 
logs as this creates a log
+
+        try (LogCaptureAppender restServerAppender = 
LogCaptureAppender.createAndRegister(RestServer.class)) {
+            HttpRequest request = new HttpGet("/");
+            CloseableHttpClient httpClient = HttpClients.createMinimal();
+            HttpHost httpHost = new HttpHost(advertisedUrl.getHost(), 
advertisedUrl.getPort());
+            CloseableHttpResponse response = httpClient.execute(httpHost, 
request);

Review Comment:
   Same thoughts w/r/t potential resource leaks here as well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to