This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 62914129c7d KAFKA-14099 - Fix request logging in connect (#12434)
62914129c7d is described below

commit 62914129c7dbf856a7702f33ed456109c6980b0c
Author: Alexandre Garnier <zig...@users.noreply.github.com>
AuthorDate: Wed Oct 12 16:28:55 2022 +0200

    KAFKA-14099 - Fix request logging in connect (#12434)
    
    Reviewers: Chris Egerton <chr...@aiven.io>
---
 build.gradle                                       |   1 +
 checkstyle/import-control.xml                      |   1 +
 .../kafka/connect/runtime/rest/RestServer.java     |  16 +--
 .../kafka/connect/runtime/rest/RestServerTest.java | 137 ++++++++++++---------
 .../unit/kafka/utils/LogCaptureAppender.scala      |   6 +
 5 files changed, 94 insertions(+), 67 deletions(-)

diff --git a/build.gradle b/build.gradle
index 642c904d766..7cf6cc44b1c 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1281,6 +1281,7 @@ project(':clients') {
     testRuntimeOnly libs.jacksonJDK8Datatypes
     testImplementation libs.jose4j
     testImplementation libs.jacksonJaxrsJsonProvider
+    testImplementation libs.log4j
 
     generator project(':generator')
   }
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index cad049935b5..b2959d9ae6b 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -573,6 +573,7 @@
         <allow pkg="com.fasterxml.jackson" />
         <allow pkg="org.apache.http"/>
         <allow pkg="io.swagger.v3.oas.annotations"/>
+        <allow pkg="kafka.utils" />
         <subpackage name="resources">
           <allow pkg="org.apache.log4j" />
         </subpackage>
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
index 3c89ddb55fc..baa9b041520 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
@@ -41,8 +41,6 @@ import org.eclipse.jetty.server.Server;
 import org.eclipse.jetty.server.ServerConnector;
 import org.eclipse.jetty.server.Slf4jRequestLogWriter;
 import org.eclipse.jetty.server.handler.ContextHandlerCollection;
-import org.eclipse.jetty.server.handler.DefaultHandler;
-import org.eclipse.jetty.server.handler.RequestLogHandler;
 import org.eclipse.jetty.server.handler.StatisticsHandler;
 import org.eclipse.jetty.servlet.FilterHolder;
 import org.eclipse.jetty.servlet.ServletContextHandler;
@@ -190,6 +188,11 @@ public class RestServer {
     public void initializeServer() {
         log.info("Initializing REST server");
 
+        Slf4jRequestLogWriter slf4jRequestLogWriter = new Slf4jRequestLogWriter();
+        slf4jRequestLogWriter.setLoggerName(RestServer.class.getCanonicalName());
+        CustomRequestLog requestLog = new CustomRequestLog(slf4jRequestLogWriter, CustomRequestLog.EXTENDED_NCSA_FORMAT + " %{ms}T");
+        jettyServer.setRequestLog(requestLog);
+
         /* Needed for graceful shutdown as per `setStopTimeout` documentation */
         StatisticsHandler statsHandler = new StatisticsHandler();
         statsHandler.setHandler(handlers);
@@ -284,15 +287,6 @@ public class RestServer {
             configureHttpResponsHeaderFilter(context);
         }
 
-        RequestLogHandler requestLogHandler = new RequestLogHandler();
-        Slf4jRequestLogWriter slf4jRequestLogWriter = new Slf4jRequestLogWriter();
-        slf4jRequestLogWriter.setLoggerName(RestServer.class.getCanonicalName());
-        CustomRequestLog requestLog = new CustomRequestLog(slf4jRequestLogWriter, CustomRequestLog.EXTENDED_NCSA_FORMAT + " %{ms}T");
-        requestLogHandler.setRequestLog(requestLog);
-
-        contextHandlers.add(new DefaultHandler());
-        contextHandlers.add(requestLogHandler);
-
         handlers.setHandlers(contextHandlers.toArray(new Handler[0]));
         try {
             context.start();
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java
index 2aedb32cdf3..82f9e0395e3 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java
@@ -16,10 +16,12 @@
  */
 package org.apache.kafka.connect.runtime.rest;
 
+import kafka.utils.LogCaptureAppender;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.http.HttpHost;
 import org.apache.http.HttpRequest;
+import org.apache.http.HttpResponse;
 import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.client.methods.HttpGet;
 import org.apache.http.client.methods.HttpOptions;
@@ -44,8 +46,11 @@ import org.slf4j.LoggerFactory;
 import javax.ws.rs.core.MediaType;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.net.URI;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -60,10 +65,12 @@ import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 
 public class RestServerTest {
-    
+
     private Herder herder;
     private Plugins plugins;
     private RestServer server;
+    private CloseableHttpClient httpClient;
+    private Collection<CloseableHttpResponse> responses = new ArrayList<>();
 
     protected static final String KAFKA_CLUSTER_ID = "Xbafgnagvar";
 
@@ -71,10 +78,17 @@ public class RestServerTest {
     public void setUp() {
         herder = mock(Herder.class);
         plugins = mock(Plugins.class);
+        httpClient = HttpClients.createMinimal();
     }
 
     @After
-    public void tearDown() {
+    public void tearDown() throws IOException {
+        for (CloseableHttpResponse response: responses) {
+            response.close();
+        }
+        if (httpClient != null) {
+            httpClient.close();
+        }
         if (server != null) {
             server.stop();
         }
@@ -113,6 +127,7 @@ public class RestServerTest {
 
         server = new RestServer(config);
         Assert.assertEquals("http://localhost:8080/", server.advertisedUrl().toString());
+        server.stop();
 
         // Advertised URI from listeners with protocol
         configMap = new HashMap<>(baseWorkerProps());
@@ -122,6 +137,7 @@ public class RestServerTest {
 
         server = new RestServer(config);
         Assert.assertEquals("https://localhost:8443/", server.advertisedUrl().toString());
+        server.stop();
 
         // Advertised URI from listeners with only SSL available
         configMap = new HashMap<>(baseWorkerProps());
@@ -130,6 +146,7 @@ public class RestServerTest {
 
         server = new RestServer(config);
         Assert.assertEquals("https://localhost:8443/", server.advertisedUrl().toString());
+        server.stop();
 
         // Listener is overriden by advertised values
         configMap = new HashMap<>(baseWorkerProps());
@@ -141,6 +158,7 @@ public class RestServerTest {
 
         server = new RestServer(config);
         Assert.assertEquals("http://somehost:10000/", server.advertisedUrl().toString());
+        server.stop();
 
         // correct listener is chosen when https listener is configured before http listener and advertised listener is http
         configMap = new HashMap<>(baseWorkerProps());
@@ -149,6 +167,7 @@ public class RestServerTest {
         config = new DistributedConfig(configMap);
         server = new RestServer(config);
         Assert.assertEquals("http://plaintext-localhost:4761/", server.advertisedUrl().toString());
+        server.stop();
     }
 
     @Test
@@ -166,12 +185,7 @@ public class RestServerTest {
 
         HttpOptions request = new HttpOptions("/connectors");
         request.addHeader("Content-Type", MediaType.WILDCARD);
-        CloseableHttpClient httpClient = HttpClients.createMinimal();
-        HttpHost httpHost = new HttpHost(
-            server.advertisedUrl().getHost(),
-            server.advertisedUrl().getPort()
-        );
-        CloseableHttpResponse response = httpClient.execute(httpHost, request);
+        HttpResponse response = executeRequest(server.advertisedUrl(), request);
         Assert.assertEquals(MediaType.TEXT_PLAIN, response.getEntity().getContentType().getValue());
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
         response.getEntity().writeTo(baos);
@@ -196,15 +210,12 @@ public class RestServerTest {
         server = new RestServer(workerConfig);
         server.initializeServer();
         server.initializeResources(herder);
+        URI serverUrl = server.advertisedUrl();
+
         HttpRequest request = new HttpGet("/connectors");
         request.addHeader("Referer", origin + "/page");
         request.addHeader("Origin", origin);
-        CloseableHttpClient httpClient = HttpClients.createMinimal();
-        HttpHost httpHost = new HttpHost(
-            server.advertisedUrl().getHost(),
-            server.advertisedUrl().getPort()
-        );
-        CloseableHttpResponse response = httpClient.execute(httpHost, request);
+        HttpResponse response = executeRequest(serverUrl, request);
 
         Assert.assertEquals(200, response.getStatusLine().getStatusCode());
 
@@ -217,7 +228,7 @@ public class RestServerTest {
         request.addHeader("Referer", origin + "/page");
         request.addHeader("Origin", origin);
         request.addHeader("Access-Control-Request-Method", method);
-        response = httpClient.execute(httpHost, request);
+        response = executeRequest(serverUrl, request);
         Assert.assertEquals(404, response.getStatusLine().getStatusCode());
         if (expectedHeader != null) {
             Assert.assertEquals(expectedHeader,
@@ -244,9 +255,7 @@ public class RestServerTest {
         server.initializeServer();
         server.initializeResources(herder);
         HttpRequest request = new HttpGet("/connectors");
-        CloseableHttpClient httpClient = HttpClients.createMinimal();
-        HttpHost httpHost = new HttpHost(server.advertisedUrl().getHost(), server.advertisedUrl().getPort());
-        CloseableHttpResponse response = httpClient.execute(httpHost, request);
+        HttpResponse response = executeRequest(server.advertisedUrl(), request);
 
         Assert.assertEquals(200, response.getStatusLine().getStatusCode());
     }
@@ -269,12 +278,11 @@ public class RestServerTest {
 
         ObjectMapper mapper = new ObjectMapper();
 
-        String host = server.advertisedUrl().getHost();
-        int port = server.advertisedUrl().getPort();
+        URI serverUrl = server.advertisedUrl();
 
-        executePut(host, port, "/admin/loggers/a.b.c.s.W", "{\"level\": \"INFO\"}");
+        executePut(serverUrl, "/admin/loggers/a.b.c.s.W", "{\"level\": \"INFO\"}");
 
-        String responseStr = executeGet(host, port, "/admin/loggers");
+        String responseStr = executeGet(serverUrl, "/admin/loggers");
         Map<String, Map<String, ?>> loggers = mapper.readValue(responseStr, new TypeReference<Map<String, Map<String, ?>>>() {
         });
         assertNotNull("expected non null response for /admin/loggers" + prettyPrint(loggers), loggers);
@@ -305,12 +313,10 @@ public class RestServerTest {
 
         assertNotEquals(server.advertisedUrl(), server.adminUrl());
 
-        executeGet(server.adminUrl().getHost(), server.adminUrl().getPort(), "/admin/loggers");
+        executeGet(server.adminUrl(), "/admin/loggers");
 
         HttpRequest request = new HttpGet("/admin/loggers");
-        CloseableHttpClient httpClient = HttpClients.createMinimal();
-        HttpHost httpHost = new HttpHost(server.advertisedUrl().getHost(), server.advertisedUrl().getPort());
-        CloseableHttpResponse response = httpClient.execute(httpHost, request);
+        HttpResponse response = executeRequest(server.advertisedUrl(), request);
         Assert.assertEquals(404, response.getStatusLine().getStatusCode());
     }
 
@@ -332,12 +338,37 @@ public class RestServerTest {
         assertNull(server.adminUrl());
 
         HttpRequest request = new HttpGet("/admin/loggers");
-        CloseableHttpClient httpClient = HttpClients.createMinimal();
-        HttpHost httpHost = new HttpHost(server.advertisedUrl().getHost(), server.advertisedUrl().getPort());
-        CloseableHttpResponse response = httpClient.execute(httpHost, request);
+        HttpResponse response = executeRequest(server.advertisedUrl(), request);
         Assert.assertEquals(404, response.getStatusLine().getStatusCode());
     }
 
+    @Test
+    public void testRequestLogs() throws IOException, InterruptedException {
+        Map<String, String> configMap = new HashMap<>(baseWorkerProps());
+        DistributedConfig workerConfig = new DistributedConfig(configMap);
+
+        doReturn(KAFKA_CLUSTER_ID).when(herder).kafkaClusterId();
+        doReturn(plugins).when(herder).plugins();
+        doReturn(Collections.emptyList()).when(plugins).newPlugins(Collections.emptyList(), workerConfig, ConnectRestExtension.class);
+
+        server = new RestServer(workerConfig);
+        server.initializeServer();
+        server.initializeResources(herder);
+
+        LogCaptureAppender restServerAppender = LogCaptureAppender.createAndRegister();
+        HttpRequest request = new HttpGet("/");
+        HttpResponse response = executeRequest(server.advertisedUrl(), request);
+
+        // Stop the server to flush all logs
+        server.stop();
+
+        Collection<String> logMessages = restServerAppender.getRenderedMessages();
+        LogCaptureAppender.unregister(restServerAppender);
+        restServerAppender.close();
+        String expectedlogContent = "\"GET / HTTP/1.1\" " + String.valueOf(response.getStatusLine().getStatusCode());
+        assertTrue(logMessages.stream().anyMatch(logMessage -> logMessage.contains(expectedlogContent)));
+    }
+
     @Test
     public void testValidCustomizedHttpResponseHeaders() throws IOException  {
         String headerConfig =
@@ -368,51 +399,45 @@ public class RestServerTest {
         doReturn(Arrays.asList("a", "b")).when(herder).connectors();
 
         server = new RestServer(workerConfig);
-        try {
-            server.initializeServer();
-            server.initializeResources(herder);
-            HttpRequest request = new HttpGet("/connectors");
-            try (CloseableHttpClient httpClient = HttpClients.createMinimal()) {
-                HttpHost httpHost = new HttpHost(server.advertisedUrl().getHost(), server.advertisedUrl().getPort());
-                try (CloseableHttpResponse response = httpClient.execute(httpHost, request)) {
-                    Assert.assertEquals(200, response.getStatusLine().getStatusCode());
-                    if (!headerConfig.isEmpty()) {
-                        expectedHeaders.forEach((k, v) ->
-                                Assert.assertEquals(response.getFirstHeader(k).getValue(), v));
-                    } else {
-                        Assert.assertNull(response.getFirstHeader("X-Frame-Options"));
-                    }
-                }
-            }
-        } finally {
-            server.stop();
-            server = null;
+        server.initializeServer();
+        server.initializeResources(herder);
+        HttpRequest request = new HttpGet("/connectors");
+        HttpResponse response = executeRequest(server.advertisedUrl(), request);
+        Assert.assertEquals(200, response.getStatusLine().getStatusCode());
+        if (!headerConfig.isEmpty()) {
+            expectedHeaders.forEach((k, v) ->
+                    Assert.assertEquals(response.getFirstHeader(k).getValue(), v));
+        } else {
+            Assert.assertNull(response.getFirstHeader("X-Frame-Options"));
         }
     }
 
-    private String executeGet(String host, int port, String endpoint) throws IOException {
+    private String executeGet(URI serverUrl, String endpoint) throws IOException {
         HttpRequest request = new HttpGet(endpoint);
-        CloseableHttpClient httpClient = HttpClients.createMinimal();
-        HttpHost httpHost = new HttpHost(host, port);
-        CloseableHttpResponse response = httpClient.execute(httpHost, request);
+        HttpResponse response = executeRequest(serverUrl, request);
 
         Assert.assertEquals(200, response.getStatusLine().getStatusCode());
         return new BasicResponseHandler().handleResponse(response);
     }
 
-    private String executePut(String host, int port, String endpoint, String jsonBody) throws IOException {
+    private String executePut(URI serverUrl, String endpoint, String jsonBody) throws IOException {
         HttpPut request = new HttpPut(endpoint);
         StringEntity entity = new StringEntity(jsonBody, StandardCharsets.UTF_8.name());
         entity.setContentType("application/json");
         request.setEntity(entity);
-        CloseableHttpClient httpClient = HttpClients.createMinimal();
-        HttpHost httpHost = new HttpHost(host, port);
-        CloseableHttpResponse response = httpClient.execute(httpHost, request);
+        HttpResponse response = executeRequest(serverUrl, request);
 
         Assert.assertEquals(200, response.getStatusLine().getStatusCode());
         return new BasicResponseHandler().handleResponse(response);
     }
 
+    private HttpResponse executeRequest(URI serverUrl, HttpRequest request) throws IOException {
+        HttpHost httpHost = new HttpHost(serverUrl.getHost(), serverUrl.getPort());
+        CloseableHttpResponse response = httpClient.execute(httpHost, request);
+        responses.add(response);
+        return response;
+    }
+
     private static String prettyPrint(Map<String, ?> map) throws IOException {
         ObjectMapper mapper = new ObjectMapper();
         return mapper.writerWithDefaultPrettyPrinter().writeValueAsString(map);
diff --git a/core/src/test/scala/unit/kafka/utils/LogCaptureAppender.scala b/core/src/test/scala/unit/kafka/utils/LogCaptureAppender.scala
index 2d071452829..bec77356e1e 100644
--- a/core/src/test/scala/unit/kafka/utils/LogCaptureAppender.scala
+++ b/core/src/test/scala/unit/kafka/utils/LogCaptureAppender.scala
@@ -20,6 +20,7 @@ package kafka.utils
 import org.apache.log4j.{AppenderSkeleton, Level, Logger}
 import org.apache.log4j.spi.LoggingEvent
 
+import scala.jdk.CollectionConverters._
 import scala.collection.mutable.ListBuffer
 
 class LogCaptureAppender extends AppenderSkeleton {
@@ -37,6 +38,11 @@ class LogCaptureAppender extends AppenderSkeleton {
     }
   }
 
+  def getRenderedMessages: java.util.List[String] = {
+    return getMessages.map(e => e.getRenderedMessage).asJava
+  }
+
+
   override def close(): Unit = {
     events.synchronized {
       events.clear()

Reply via email to