This is an automated email from the ASF dual-hosted git repository.

fcsaky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-http.git


The following commit(s) were added to refs/heads/main by this push:
     new 2cf55a1  [FLINK-38481] Make HTTP request/response logging configurable
2cf55a1 is described below

commit 2cf55a1efce7c6fb89e6398bb18e18557c851829
Author: David Radley <[email protected]>
AuthorDate: Thu Dec 4 12:50:54 2025 +0000

    [FLINK-38481] Make HTTP request/response logging configurable
---
 docs/content.zh/docs/connectors/table/http.md      |  38 ++-
 docs/content/docs/connectors/table/http.md         |  36 +-
 .../apache/flink/connector/http/HttpLogger.java    | 133 ++++++++
 .../flink/connector/http/HttpLoggingLevelType.java |  12 +
 .../http/config/HttpConnectorConfigConstants.java  |   2 +
 .../sink/httpclient/BatchRequestSubmitter.java     |  18 +-
 .../sink/httpclient/JavaNetSinkHttpClient.java     |   7 +-
 .../http/table/lookup/BodyBasedRequestFactory.java |   7 +-
 .../table/lookup/HttpLookupConnectorOptions.java   |   8 +
 .../table/lookup/HttpLookupTableSourceFactory.java |  19 ++
 .../table/lookup/JavaNetHttpPollingClient.java     |  16 +-
 .../http/table/lookup/RequestFactoryBase.java      |   6 -
 .../lookup/Slf4JHttpLookupPostRequestCallback.java |  10 +-
 .../table/sink/Slf4jHttpPostRequestCallback.java   |  19 +-
 .../connector/http/HttpLoggerRequestTest.java      | 213 ++++++++++++
 .../connector/http/HttpLoggerResponseTest.java     | 362 +++++++++++++++++++++
 .../src/test/resources/simpleLogger.properties     |   2 +
 17 files changed, 849 insertions(+), 59 deletions(-)

diff --git a/docs/content.zh/docs/connectors/table/http.md 
b/docs/content.zh/docs/connectors/table/http.md
index a517c88..4c6b94d 100644
--- a/docs/content.zh/docs/connectors/table/http.md
+++ b/docs/content.zh/docs/connectors/table/http.md
@@ -51,7 +51,7 @@ The HTTP source connector supports [Lookup 
Joins](https://nightlies.apache.org/f
     * [Timeouts](#timeouts)
     * [Source table HTTP status code](#source-table-http-status-code)
     * [Retries (Lookup source)](#retries-lookup-source)
-        * [Retry strategy](#retry-strategy)
+      * [Retry strategy](#retry-strategy)
       * [Lookup multiple results](#lookup-multiple-results)
   * [Working with HTTP sink tables](#working-with-http-sink-tables)
     * [HTTP Sink](#http-sink)
@@ -156,7 +156,7 @@ Or for REST POST method they will be converted to Json and 
used as request body.
 
 ### Lookup Source Connector Options
 
-Note the options with the prefix _http_ are the HTTP connector specific 
options, the others are Flink options.   
+Note the options with the prefix _http_ are the HTTP connector specific 
options, the others are Flink options.
 
 | Option                                                                 | 
Required | Description/Value                                                    
                                                                                
                                                                                
                                                                                
                                                                                
                  [...]
 
|:-----------------------------------------------------------------------|----------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 [...]
@@ -180,12 +180,12 @@ Note the options with the prefix _http_ are the HTTP 
connector specific options,
 | http.security.oidc.token.expiry.reduction                              | 
optional | OIDC tokens will be requested if the current time is later than the 
cached token expiry time minus this value.                                      
                                                                                
                                                                                
                                                                                
                   [...]
 | http.source.lookup.continue-on-error                                   | 
optional | When true, the flow will continue on errors, returning row content. 
When false (the default) the job ends on errors.                                
                                                                                
                                                                                
                                                                                
                   [...]
 | http.source.lookup.request.timeout                                     | 
optional | Sets HTTP request timeout in seconds. If not specified, the default 
value of 30 seconds will be used.                                               
                                                                                
                                                                                
                                                                                
                   [...]
-| http.source.lookup.http-version                                        | 
optional | Version of HTTP to use for lookup http requests. The valid values 
are HTTP_1_1 and HTTP_2, which specify HTTP 1.1 or 2 respectively. This option 
may be required as HTTP_1_1, if the endpoint is HTTP 1.1, because some http 1.1 
endpoints reject HTTP Version 2 calls, with 'Invalid HTTP request received' and 
'HTTP/2 upgrade not supported'.                                                 
                      [...]
 | http.source.lookup.request.thread-pool.size                            | 
optional | Sets the size of pool thread for HTTP lookup request processing. 
Increasing this value would mean that more concurrent requests can be processed 
in the same time. If not specified, the default value of 8 threads will be 
used.                                                                           
                                                                                
                           [...]
 | http.source.lookup.response.thread-pool.size                           | 
optional | Sets the size of pool thread for HTTP lookup response processing. 
Increasing this value would mean that more concurrent requests can be processed 
in the same time. If not specified, the default value of 4 threads will be 
used.                                                                           
                                                                                
                          [...]
 | http.source.lookup.use-raw-authorization-header                        | 
optional | If set to `'true'`, uses the raw value set for the `Authorization` 
header, without transformation for Basic Authentication (base64, addition of 
"Basic " prefix). If not specified, defaults to `'false'`.                      
                                                                                
                                                                                
                       [...]
 | http.source.lookup.request-callback                                    | 
optional | Specify which `HttpLookupPostRequestCallback` implementation to use. 
By default, it is set to `slf4j-lookup-logger` corresponding to 
`Slf4jHttpLookupPostRequestCallback`.                                           
                                                                                
                                                                                
                                  [...]
 | http.source.lookup.connection.timeout                                  | 
optional | Source table connection timeout. Default - no value.                 
                                                                                
                                                                                
                                                                                
                                                                                
                  [...]
+| http.source.lookup.http-version                                        | 
optional | Version of HTTP to use for lookup http requests. The valid values 
are HTTP_1_1 and HTTP_2, which specify HTTP 1.1 or 2 respectively. This option 
may be required as HTTP_1_1, if the endpoint is HTTP 1.1, because some http 1.1 
endpoints reject HTTP Version 2 calls, with 'Invalid HTTP request received' and 
'HTTP/2 upgrade not supported'.                                                 
                      [...]
 | http.source.lookup.success-codes                                       | 
optional | Comma separated http codes considered as success response. Use 
[1-5]XX for groups and '!' character for excluding. The default is 2XX.         
                                                                                
                                                                                
                                                                                
                        [...]
 | http.source.lookup.retry-codes                                         | 
optional | Comma separated http codes considered as transient errors. Use 
[1-5]XX for groups and '!' character for excluding. The default is 500,503,504. 
                                                                                
                                                                                
                                                                                
                        [...]
 | http.source.lookup.ignored-response-codes                              | 
optional | Comma separated http codes. Content for these responses will be 
ignored. Use [1-5]XX for groups and '!' character for excluding. Ignored 
responses together with `http.source.lookup.success-codes` are considered as 
successful.                                                                     
                                                                                
                                 [...]
@@ -204,7 +204,7 @@ Note the options with the prefix _http_ are the HTTP 
connector specific options,
 
 ### Query Creators
 
-In the above example we see that HTTP GET operations and HTTP POST operations 
result in different mapping of the columns to the 
+In the above example we see that HTTP GET operations and HTTP POST operations 
result in different mapping of the columns to the
 HTTP request content. In reality, you will want to have move control over how 
the SQL columns are mapped to the HTTP content.
 The HTTP connector supplies a number of Query Creators that you can use define 
these mappings.
 
@@ -230,7 +230,7 @@ The HTTP connector supplies a number of Query Creators that 
you can use define t
       <td></td>
       <td>✓ for PUTs and POSTs</td>
     </tr>
-  
+
     </tbody>
 </table>
 
@@ -245,7 +245,7 @@ parameters `http.request.query-param-fields`, 
`http.request.body-fields` and `ht
 
 The default Query Creator is called _http-generic-json-url_.  For body based 
queries such as POST/PUT requests, the
 
([GenericGetQueryCreator](flink-connector-http/src/main/java/org/apache/flink/connectors/http/internal/table/lookup/querycreators/GenericGetQueryCreator.java))is
 provided as a default query creator. This implementation uses Flink's 
[json-format](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/json/)
  to convert RowData object into Json String.
-For GET requests can be used for query parameter based queries. 
+For GET requests can be used for query parameter based queries.
 
 The _http-generic-json-url_ allows for using custom formats that will perform 
serialization to Json. Thanks to this, users can create their own logic for 
converting RowData to Json Strings suitable for their HTTP endpoints and use 
this logic as custom format
 with HTTP Lookup connector and SQL queries.
@@ -331,7 +331,7 @@ pagination methods. Currently, the connector supports only 
two simple approaches
 ## Working with HTTP sink tables
 
 ### HTTP Sink
-The following example shows the minimum Table API example to create a sink: 
+The following example shows the minimum Table API example to create a sink:
 
 ```roomsql
 CREATE TABLE http (
@@ -350,8 +350,9 @@ Then use `INSERT` SQL statement to send data to your HTTP 
endpoint:
 INSERT INTO http VALUES (1, 'Ninette'), (2, 'Hedy')
 ```
 
+
 When `'format' = 'json'` is specified on the table definition, the HTTP sink 
sends json payloads. It is possible to change the format of the payload by 
specifying
-another format name. 
+another format name.
 
 ### Sink Connector Options
 
@@ -366,6 +367,7 @@ another format name.
 | sink.requests.max-buffered                | optional | Maximum number of 
buffered records before applying backpressure.                                  
                                                                                
                                                 |
 | sink.flush-buffer.size                    | optional | The maximum size of a 
batch of entries that may be sent to the HTTP endpoint measured in bytes.       
                                                                                
                                             |
 | sink.flush-buffer.timeout                 | optional | Threshold time in 
milliseconds for an element to be in a buffer before being flushed.             
                                                                                
                                                 |
+| http.logging.level                        | optional | Logging levels for 
HTTP content. Valid values are `MIN` (the default), `REQ_RESP` and `MAX`.       
                                                                                
                                                |
 | http.sink.request-callback                | optional | Specify which 
`HttpPostRequestCallback` implementation to use. By default, it is set to 
`slf4j-logger` corresponding to `Slf4jHttpPostRequestCallback`.                 
                                                           |
 | http.sink.error.code                      | optional | List of HTTP status 
codes that should be treated as errors by HTTP Sink, separated with comma.      
                                                                                
                                               |
 | http.sink.error.code.exclude              | optional | List of HTTP status 
codes that should be excluded from the `http.sink.error.code` list, separated 
with comma.                                                                     
                                                 |
@@ -576,6 +578,26 @@ an example of a customised grant type token request. The 
supplied `token request
 a new one is requested. There is a property 
`http.security.oidc.token.expiry.reduction`, that defaults to 1 second; new 
tokens will
 be requested if the current time is later than the cached token expiry time 
minus `http.security.oidc.token.expiry.reduction`.
 
+## Logging the HTTP content
+Debug level logging has been added for class 
`org.apache.flink.connector.http.HttpLogger`. To enable this, alter the log4j 
properties.
+This logging puts out log entries for the HTTP requests and responses. This 
can be useful for diagnostics to confirm that HTTP requests have been issued 
and whether
+an HTTP response was received or an exception occurred (for example, connection 
refused).
+
+Logging HTTP content may not be appropriate for production systems, where sensitive 
information is not allowed into the logs. But in development environments it is 
useful
+to be able to see HTTP content. Sensitive information can occur in the headers, 
for example authentication tokens and passwords. The HTTP request and 
response bodies
+could also be sensitive. The default minimal logging should be used in production. For 
development, you can specify the config option `http.logging.level`.
+This dictates the amount of content that debug logging will show around HTTP 
calls; the valid values are:
+
+| log level | Request method | URI | HTTP Body | Response status code | 
Headers |
+|-----------|----------------|-----|-----------|----------------------|---------|
+| MIN       | Y              | Y   | N         | Y                    | N      
 |
+| REQ_RESP  | Y              | Y   | Y         | Y                    | N      
 |
+| MAX       | Y              | Y   | Y         | Y                    | Y      
 |
+
+Notes:
+- you can customize what is traced for lookups using the 
`http.source.lookup.request-callback`.
+- where there is an N in the table, the output is obfuscated (shown as `***`).
+
 #### Restrictions at this time
 * No authentication is applied to the token request.
 * The processing does not use the refresh token if it present.
diff --git a/docs/content/docs/connectors/table/http.md 
b/docs/content/docs/connectors/table/http.md
index dd52871..de70e5e 100644
--- a/docs/content/docs/connectors/table/http.md
+++ b/docs/content/docs/connectors/table/http.md
@@ -51,7 +51,7 @@ The HTTP source connector supports [Lookup 
Joins](https://nightlies.apache.org/f
     * [Timeouts](#timeouts)
     * [Source table HTTP status code](#source-table-http-status-code)
     * [Retries (Lookup source)](#retries-lookup-source)
-        * [Retry strategy](#retry-strategy)
+      * [Retry strategy](#retry-strategy)
       * [Lookup multiple results](#lookup-multiple-results)
   * [Working with HTTP sink tables](#working-with-http-sink-tables)
     * [HTTP Sink](#http-sink)
@@ -155,7 +155,7 @@ Or for REST POST method they will be converted to Json and 
used as request body.
 
 ### Lookup Source Connector Options
 
-Note the options with the prefix _http_ are the HTTP connector specific 
options, the others are Flink options.   
+Note the options with the prefix _http_ are the HTTP connector specific 
options, the others are Flink options.
 
 | Option                                                                 | 
Required | Description/Value                                                    
                                                                                
                                                                                
                                                                                
                                                                                
                  [...]
 
|:-----------------------------------------------------------------------|----------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 [...]
@@ -203,7 +203,7 @@ Note the options with the prefix _http_ are the HTTP 
connector specific options,
 
 ### Query Creators
 
-In the above example we see that HTTP GET operations and HTTP POST operations 
result in different mapping of the columns to the 
+In the above example we see that HTTP GET operations and HTTP POST operations 
result in different mapping of the columns to the
 HTTP request content. In reality, you will want to have move control over how 
the SQL columns are mapped to the HTTP content.
 The HTTP connector supplies a number of Query Creators that you can use define 
these mappings.
 
@@ -229,7 +229,7 @@ The HTTP connector supplies a number of Query Creators that 
you can use define t
       <td></td>
       <td>✓ for PUTs and POSTs</td>
     </tr>
-  
+
     </tbody>
 </table>
 
@@ -244,7 +244,7 @@ parameters `http.request.query-param-fields`, 
`http.request.body-fields` and `ht
 
 The default Query Creator is called _http-generic-json-url_.  For body based 
queries such as POST/PUT requests, the
 
([GenericGetQueryCreator](flink-connector-http/src/main/java/org/apache/flink/connectors/http/internal/table/lookup/querycreators/GenericGetQueryCreator.java))is
 provided as a default query creator. This implementation uses Flink's 
[json-format](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/json/)
  to convert RowData object into Json String.
-For GET requests can be used for query parameter based queries. 
+For GET requests can be used for query parameter based queries.
 
 The _http-generic-json-url_ allows for using custom formats that will perform 
serialization to Json. Thanks to this, users can create their own logic for 
converting RowData to Json Strings suitable for their HTTP endpoints and use 
this logic as custom format
 with HTTP Lookup connector and SQL queries.
@@ -330,7 +330,7 @@ pagination methods. Currently, the connector supports only 
two simple approaches
 ## Working with HTTP sink tables
 
 ### HTTP Sink
-The following example shows the minimum Table API example to create a sink: 
+The following example shows the minimum Table API example to create a sink:
 
 ```roomsql
 CREATE TABLE http (
@@ -349,8 +349,9 @@ Then use `INSERT` SQL statement to send data to your HTTP 
endpoint:
 INSERT INTO http VALUES (1, 'Ninette'), (2, 'Hedy')
 ```
 
+
 When `'format' = 'json'` is specified on the table definition, the HTTP sink 
sends json payloads. It is possible to change the format of the payload by 
specifying
-another format name. 
+another format name.
 
 ### Sink Connector Options
 
@@ -365,6 +366,7 @@ another format name.
 | sink.requests.max-buffered                | optional | Maximum number of 
buffered records before applying backpressure.                                  
                                                                                
                                                 |
 | sink.flush-buffer.size                    | optional | The maximum size of a 
batch of entries that may be sent to the HTTP endpoint measured in bytes.       
                                                                                
                                             |
 | sink.flush-buffer.timeout                 | optional | Threshold time in 
milliseconds for an element to be in a buffer before being flushed.             
                                                                                
                                                 |
+| http.logging.level                        | optional | Logging levels for 
HTTP content. Valid values are `MIN` (the default), `REQ_RESP` and `MAX`.       
                                                                                
                                                |
 | http.sink.request-callback                | optional | Specify which 
`HttpPostRequestCallback` implementation to use. By default, it is set to 
`slf4j-logger` corresponding to `Slf4jHttpPostRequestCallback`.                 
                                                           |
 | http.sink.error.code                      | optional | List of HTTP status 
codes that should be treated as errors by HTTP Sink, separated with comma.      
                                                                                
                                               |
 | http.sink.error.code.exclude              | optional | List of HTTP status 
codes that should be excluded from the `http.sink.error.code` list, separated 
with comma.                                                                     
                                                 |
@@ -575,6 +577,26 @@ an example of a customised grant type token request. The 
supplied `token request
 a new one is requested. There is a property 
`http.security.oidc.token.expiry.reduction`, that defaults to 1 second; new 
tokens will
 be requested if the current time is later than the cached token expiry time 
minus `http.security.oidc.token.expiry.reduction`.
 
+## Logging the HTTP content
+Debug level logging has been added for class 
`org.apache.flink.connector.http.HttpLogger`. To enable this, alter the log4j 
properties.
+This logging puts out log entries for the HTTP requests and responses. This 
can be useful for diagnostics to confirm that HTTP requests have been issued 
and whether
+an HTTP response was received or an exception occurred (for example, connection 
refused).
+
+Logging HTTP content may not be appropriate for production systems, where sensitive 
information is not allowed into the logs. But in development environments it is 
useful
+to be able to see HTTP content. Sensitive information can occur in the headers, 
for example authentication tokens and passwords. The HTTP request and 
response bodies
+could also be sensitive. The default minimal logging should be used in production. For 
development, you can specify the config option `http.logging.level`.
+This dictates the amount of content that debug logging will show around HTTP 
calls; the valid values are:
+
+| log level | Request method | URI | HTTP Body | Response status code | 
Headers |
+|-----------|----------------|-----|-----------|----------------------|---------|
+| MIN       | Y              | Y   | N         | Y                    | N      
 |
+| REQ_RESP  | Y              | Y   | Y         | Y                    | N      
 |
+| MAX       | Y              | Y   | Y         | Y                    | Y      
 |
+
+Notes:
+- you can customize what is traced for lookups using the 
`http.source.lookup.request-callback`.
+- where there is an N in the table, the output is obfuscated (shown as `***`).
+
 #### Restrictions at this time
 * No authentication is applied to the token request.
 * The processing does not use the refresh token if it present.
diff --git 
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/HttpLogger.java
 
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/HttpLogger.java
new file mode 100644
index 0000000..759bbcd
--- /dev/null
+++ 
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/HttpLogger.java
@@ -0,0 +1,133 @@
+package org.apache.flink.connector.http;
+
+import 
org.apache.flink.connector.http.table.lookup.HttpLookupSourceRequestEntry;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.Serializable;
+import java.net.http.HttpHeaders;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.StringJoiner;
+
+import static 
org.apache.flink.connector.http.config.HttpConnectorConfigConstants.HTTP_LOGGING_LEVEL;
+
+/**
+ * HttpLogger, this is a class to perform HTTP content logging based on a 
level defined in
+ * configuration.
+ */
+@Slf4j
+public class HttpLogger implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private final HttpLoggingLevelType httpLoggingLevelType;
+
+    private HttpLogger(Properties properties) {
+        String code = (String) properties.get(HTTP_LOGGING_LEVEL);
+        this.httpLoggingLevelType = HttpLoggingLevelType.valueOfStr(code);
+    }
+
+    public static HttpLogger getHttpLogger(Properties properties) {
+        return new HttpLogger(properties);
+    }
+
+    public void logRequest(HttpRequest httpRequest) {
+        if (log.isDebugEnabled()) {
+            log.debug(createStringForRequest(httpRequest));
+        }
+    }
+
+    public void logResponse(HttpResponse<String> response) {
+
+        if (log.isDebugEnabled()) {
+            log.debug(createStringForResponse(response));
+        }
+    }
+
+    public void logRequestBody(String body) {
+        if (log.isDebugEnabled()) {
+            log.debug(createStringForBody(body));
+        }
+    }
+
+    public void logExceptionResponse(HttpLookupSourceRequestEntry request, 
Exception e) {
+        if (log.isDebugEnabled()) {
+            log.debug(createStringForExceptionResponse(request, e));
+        }
+    }
+
+    String createStringForRequest(HttpRequest httpRequest) {
+        String headersForLog = getHeadersForLog(httpRequest.headers());
+        return String.format(
+                "HTTP %s Request: URL: %s, Headers: %s",
+                httpRequest.method(), httpRequest.uri().toString(), 
headersForLog);
+    }
+
+    private String getHeadersForLog(HttpHeaders httpHeaders) {
+        if (httpHeaders == null) {
+            return "None";
+        }
+        Map<String, List<String>> headersMap = httpHeaders.map();
+        if (headersMap.isEmpty()) {
+            return "None";
+        }
+        if (this.httpLoggingLevelType == HttpLoggingLevelType.MAX) {
+            StringJoiner headers = new StringJoiner(";");
+            for (Map.Entry<String, List<String>> reqHeaders : 
headersMap.entrySet()) {
+                StringJoiner values = new StringJoiner(";");
+                for (String value : reqHeaders.getValue()) {
+                    values.add(value);
+                }
+                String header = reqHeaders.getKey() + ":[" + values + "]";
+                headers.add(header);
+            }
+            return headers.toString();
+        }
+        return "***";
+    }
+
+    String createStringForResponse(HttpResponse<String> response) {
+        String headersForLog = getHeadersForLog(response.headers());
+
+        String bodyForLog = "***";
+        if (response.body() == null || response.body().isEmpty()) {
+            bodyForLog = "None";
+        } else {
+            if (this.httpLoggingLevelType != HttpLoggingLevelType.MIN) {
+                bodyForLog = response.body().toString();
+            }
+        }
+        return String.format(
+                "HTTP %s Response: URL: %s,"
+                        + " Response Headers: %s, status code: %s, Response 
Body: %s",
+                response.request().method(),
+                response.uri(),
+                headersForLog,
+                response.statusCode(),
+                bodyForLog);
+    }
+
+    private String createStringForExceptionResponse(
+            HttpLookupSourceRequestEntry request, Exception e) {
+        HttpRequest httpRequest = request.getHttpRequest();
+        return String.format(
+                "HTTP %s Exception Response: URL: %s Exception %s",
+                httpRequest.method(), httpRequest.uri(), e);
+    }
+
+    String createStringForBody(String body) {
+        String bodyForLog = "***";
+        if (body == null || body.isEmpty()) {
+            bodyForLog = "None";
+        } else {
+            if (this.httpLoggingLevelType != HttpLoggingLevelType.MIN) {
+                bodyForLog = body.toString();
+            }
+        }
+        return bodyForLog;
+    }
+}
diff --git 
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/HttpLoggingLevelType.java
 
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/HttpLoggingLevelType.java
new file mode 100644
index 0000000..7cb63eb
--- /dev/null
+++ 
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/HttpLoggingLevelType.java
@@ -0,0 +1,12 @@
+package org.apache.flink.connector.http;
+
+/** Defines the level of http content that will be logged. */
+public enum HttpLoggingLevelType {
+    MIN,
+    REQ_RESP,
+    MAX;
+
+    public static HttpLoggingLevelType valueOfStr(String code) {
+        return code == null ? MIN : valueOf(code);
+    }
+}
diff --git 
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/config/HttpConnectorConfigConstants.java
 
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/config/HttpConnectorConfigConstants.java
index 9fc3b70..5bbc0e4 100644
--- 
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/config/HttpConnectorConfigConstants.java
+++ 
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/config/HttpConnectorConfigConstants.java
@@ -109,6 +109,8 @@ public final class HttpConnectorConfigConstants {
 
     public static final String CONTINUE_ON_ERROR = SOURCE_LOOKUP_PREFIX + 
"continue-on-error";
 
+    public static final String HTTP_LOGGING_LEVEL = FLINK_CONNECTOR_HTTP + 
"logging.level";
+
     public static final String SOURCE_PROXY_HOST = SOURCE_LOOKUP_PREFIX + 
"proxy.host";
 
     public static final String SOURCE_PROXY_PORT = SOURCE_LOOKUP_PREFIX + 
"proxy.port";
diff --git 
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/sink/httpclient/BatchRequestSubmitter.java
 
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/sink/httpclient/BatchRequestSubmitter.java
index 19b45d6..6733ba2 100644
--- 
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/sink/httpclient/BatchRequestSubmitter.java
+++ 
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/sink/httpclient/BatchRequestSubmitter.java
@@ -73,18 +73,18 @@ public class BatchRequestSubmitter extends 
AbstractRequestSubmitter {
         }
 
         var responseFutures = new 
ArrayList<CompletableFuture<JavaNetHttpResponseWrapper>>();
-        String previousReqeustMethod = requestsToSubmit.get(0).method;
+        String previousRequestMethod = requestsToSubmit.get(0).method;
         List<HttpSinkRequestEntry> requestBatch = new 
ArrayList<>(httpRequestBatchSize);
 
         for (var entry : requestsToSubmit) {
             if (requestBatch.size() == httpRequestBatchSize
-                    || !previousReqeustMethod.equalsIgnoreCase(entry.method)) {
+                    || !previousRequestMethod.equalsIgnoreCase(entry.method)) {
                 // break batch and submit
                 responseFutures.add(sendBatch(endpointUrl, requestBatch));
                 requestBatch.clear();
             }
             requestBatch.add(entry);
-            previousReqeustMethod = entry.method;
+            previousRequestMethod = entry.method;
         }
 
         // submit anything that left
@@ -98,9 +98,9 @@ public class BatchRequestSubmitter extends 
AbstractRequestSubmitter {
     }
 
     private CompletableFuture<JavaNetHttpResponseWrapper> sendBatch(
-            String endpointUrl, List<HttpSinkRequestEntry> reqeustBatch) {
+            String endpointUrl, List<HttpSinkRequestEntry> requestBatch) {
 
-        HttpRequest httpRequest = buildHttpRequest(reqeustBatch, 
URI.create(endpointUrl));
+        HttpRequest httpRequest = buildHttpRequest(requestBatch, 
URI.create(endpointUrl));
         return httpClient
                 .sendAsync(httpRequest.getHttpRequest(), 
HttpResponse.BodyHandlers.ofString())
                 .exceptionally(
@@ -115,11 +115,11 @@ public class BatchRequestSubmitter extends 
AbstractRequestSubmitter {
                         publishingThreadPool);
     }
 
-    private HttpRequest buildHttpRequest(List<HttpSinkRequestEntry> 
reqeustBatch, URI endpointUri) {
+    private HttpRequest buildHttpRequest(List<HttpSinkRequestEntry> 
requestBatch, URI endpointUri) {
 
         try {
-            var method = reqeustBatch.get(0).method;
-            List<byte[]> elements = new ArrayList<>(reqeustBatch.size());
+            var method = requestBatch.get(0).method;
+            List<byte[]> elements = new ArrayList<>(requestBatch.size());
 
             BodyPublisher publisher;
             // By default, Java's BodyPublishers.ofByteArrays(elements) will 
just put Jsons
@@ -127,7 +127,7 @@ public class BatchRequestSubmitter extends 
AbstractRequestSubmitter {
             // What we do here is we pack every Json/byteArray into Json Array 
hence '[' and ']'
             // at the end, and we separate every element with comma.
             elements.add(BATCH_START_BYTES);
-            for (HttpSinkRequestEntry entry : reqeustBatch) {
+            for (HttpSinkRequestEntry entry : requestBatch) {
                 elements.add(entry.element);
                 elements.add(BATCH_ELEMENT_DELIM_BYTES);
             }
diff --git 
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/sink/httpclient/JavaNetSinkHttpClient.java
 
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/sink/httpclient/JavaNetSinkHttpClient.java
index 6216288..0072b70 100644
--- 
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/sink/httpclient/JavaNetSinkHttpClient.java
+++ 
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/sink/httpclient/JavaNetSinkHttpClient.java
@@ -18,6 +18,7 @@
 package org.apache.flink.connector.http.sink.httpclient;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.connector.http.HttpLogger;
 import org.apache.flink.connector.http.HttpPostRequestCallback;
 import org.apache.flink.connector.http.clients.SinkHttpClient;
 import org.apache.flink.connector.http.clients.SinkHttpClientResponse;
@@ -57,6 +58,8 @@ public class JavaNetSinkHttpClient implements SinkHttpClient {
 
     private final RequestSubmitter requestSubmitter;
 
+    private final HttpLogger httpLogger;
+
     public JavaNetSinkHttpClient(
             Properties properties,
             HttpPostRequestCallback<HttpRequest> httpPostRequestCallback,
@@ -85,6 +88,8 @@ public class JavaNetSinkHttpClient implements SinkHttpClient {
         this.headersAndValues = 
HttpHeaderUtils.toHeaderAndValueArray(this.headerMap);
         this.requestSubmitter =
                 requestSubmitterFactory.createSubmitter(properties, 
headersAndValues);
+
+        this.httpLogger = HttpLogger.getHttpLogger(properties);
     }
 
     @Override
@@ -114,7 +119,7 @@ public class JavaNetSinkHttpClient implements 
SinkHttpClient {
         for (var response : responses) {
             var sinkRequestEntry = response.getHttpRequest();
             var optResponse = response.getResponse();
-
+            // The response is optional (the callback below is handed null when it is absent),
+            // so log it only when present rather than calling Optional.get() unconditionally.
+            optResponse.ifPresent(loggedResponse -> this.httpLogger.logResponse(loggedResponse));
             httpPostRequestCallback.call(
                     optResponse.orElse(null), sinkRequestEntry, endpointUrl, 
headerMap);
 
diff --git 
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/BodyBasedRequestFactory.java
 
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/BodyBasedRequestFactory.java
index 68fee55..886fe5d 100644
--- 
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/BodyBasedRequestFactory.java
+++ 
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/BodyBasedRequestFactory.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.connector.http.table.lookup;
 
+import org.apache.flink.connector.http.HttpLogger;
 import org.apache.flink.connector.http.LookupQueryCreator;
 import org.apache.flink.connector.http.preprocessor.HeaderPreprocessor;
 import org.apache.flink.connector.http.utils.uri.URIBuilder;
@@ -38,6 +39,7 @@ import java.net.http.HttpRequest.Builder;
 public class BodyBasedRequestFactory extends RequestFactoryBase {
 
     private final String methodName;
+    private final HttpLogger httpLogger;
 
     public BodyBasedRequestFactory(
             String methodName,
@@ -47,6 +49,7 @@ public class BodyBasedRequestFactory extends 
RequestFactoryBase {
 
         super(lookupQueryCreator, headerPreprocessor, options);
         this.methodName = methodName.toUpperCase();
+        this.httpLogger = HttpLogger.getHttpLogger(options.getProperties());
     }
 
     /**
@@ -59,8 +62,10 @@ public class BodyBasedRequestFactory extends 
RequestFactoryBase {
     @Override
     protected Builder setUpRequestMethod(LookupQueryInfo lookupQueryInfo) {
         HttpRequest.Builder builder = 
super.setUpRequestMethod(lookupQueryInfo);
+        String body = lookupQueryInfo.getLookupQuery();
         builder.uri(constructUri(lookupQueryInfo))
-                .method(methodName, 
BodyPublishers.ofString(lookupQueryInfo.getLookupQuery()));
+                .method(methodName, BodyPublishers.ofString(body));
+        this.httpLogger.logRequestBody(body);
         return builder;
     }
 
diff --git 
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/HttpLookupConnectorOptions.java
 
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/HttpLookupConnectorOptions.java
index aedf59d..e5e7d58 100644
--- 
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/HttpLookupConnectorOptions.java
+++ 
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/HttpLookupConnectorOptions.java
@@ -19,11 +19,13 @@ package org.apache.flink.connector.http.table.lookup;
 
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.connector.http.HttpLoggingLevelType;
 import org.apache.flink.connector.http.retry.RetryStrategyType;
 
 import java.time.Duration;
 
 import static 
org.apache.flink.connector.http.config.HttpConnectorConfigConstants.CONTINUE_ON_ERROR;
+import static 
org.apache.flink.connector.http.config.HttpConnectorConfigConstants.HTTP_LOGGING_LEVEL;
 import static 
org.apache.flink.connector.http.config.HttpConnectorConfigConstants.LOOKUP_SOURCE_HEADER_USE_RAW;
 import static 
org.apache.flink.connector.http.config.HttpConnectorConfigConstants.OIDC_AUTH_TOKEN_ENDPOINT_URL;
 import static 
org.apache.flink.connector.http.config.HttpConnectorConfigConstants.OIDC_AUTH_TOKEN_EXPIRY_REDUCTION;
@@ -137,6 +139,12 @@ public class HttpLookupConnectorOptions {
                             "Continue job on error. "
                                     + "This includes unsuccessful HTTP status 
codes and client side Exceptions, such as Connection Refused.");
 
+    /**
+     * Level of HTTP content that is written to the log. The value is the name of a {@link
+     * HttpLoggingLevelType} constant; defaults to {@link HttpLoggingLevelType#MIN}.
+     */
+    public static final ConfigOption<String> LOGGING_LEVEL_FOR_HTTP =
+            ConfigOptions.key(HTTP_LOGGING_LEVEL)
+                    .stringType()
+                    .defaultValue(String.valueOf(HttpLoggingLevelType.MIN))
+                    .withDescription("Logging levels");
+
     public static final ConfigOption<String> SOURCE_LOOKUP_PROXY_HOST =
             ConfigOptions.key(SOURCE_PROXY_HOST)
                     .stringType()
diff --git 
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/HttpLookupTableSourceFactory.java
 
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/HttpLookupTableSourceFactory.java
index c5d0d3c..934c570 100644
--- 
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/HttpLookupTableSourceFactory.java
+++ 
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/HttpLookupTableSourceFactory.java
@@ -20,6 +20,7 @@ package org.apache.flink.connector.http.table.lookup;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connector.http.HttpLoggingLevelType;
 import org.apache.flink.connector.http.HttpPostRequestCallbackFactory;
 import org.apache.flink.connector.http.config.HttpConnectorConfigConstants;
 import org.apache.flink.connector.http.utils.ConfigUtils;
@@ -47,6 +48,7 @@ import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
 import static 
org.apache.flink.connector.http.table.lookup.HttpLookupConnectorOptions.ASYNC_POLLING;
+import static 
org.apache.flink.connector.http.table.lookup.HttpLookupConnectorOptions.LOGGING_LEVEL_FOR_HTTP;
 import static 
org.apache.flink.connector.http.table.lookup.HttpLookupConnectorOptions.LOOKUP_METHOD;
 import static 
org.apache.flink.connector.http.table.lookup.HttpLookupConnectorOptions.LOOKUP_REQUEST_FORMAT;
 import static 
org.apache.flink.connector.http.table.lookup.HttpLookupConnectorOptions.REQUEST_CALLBACK_IDENTIFIER;
@@ -98,6 +100,7 @@ public class HttpLookupTableSourceFactory implements 
DynamicTableSourceFactory {
         helper.validateExcept(
                 // properties coming from 
org.apache.flink.table.api.config.ExecutionConfigOptions
                 "table.",
+                "lookup-request.",
                 HttpConnectorConfigConstants.FLINK_CONNECTOR_HTTP,
                 LOOKUP_REQUEST_FORMAT.key());
         validateHttpLookupSourceOptions(readable);
@@ -139,6 +142,22 @@ public class HttpLookupTableSourceFactory implements 
DynamicTableSourceFactory {
                                                 + " is configured.");
                             }
                         });
+        tableOptions
+                .getOptional(LOGGING_LEVEL_FOR_HTTP)
+                .ifPresent(
+                        value -> {
+                            try {
+                                HttpLoggingLevelType.valueOf(value);
+                            } catch (IllegalArgumentException e) {
+                                throw new IllegalArgumentException(
+                                        "Invalid value for config option "
+                                                + LOGGING_LEVEL_FOR_HTTP.key()
+                                                + ": '"
+                                                + value
+                                                + "'. Valid values are: MIN, 
REQ_RESP, MAX.",
+                                        e);
+                            }
+                        });
     }
 
     @Override
diff --git 
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/JavaNetHttpPollingClient.java
 
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/JavaNetHttpPollingClient.java
index 53be575..ef9b1ed 100644
--- 
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/JavaNetHttpPollingClient.java
+++ 
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/JavaNetHttpPollingClient.java
@@ -20,6 +20,7 @@ package org.apache.flink.connector.http.table.lookup;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connector.http.HttpLogger;
 import org.apache.flink.connector.http.HttpPostRequestCallback;
 import org.apache.flink.connector.http.HttpStatusCodeValidationFailedException;
 import org.apache.flink.connector.http.clients.PollingClient;
@@ -81,6 +82,7 @@ public class JavaNetHttpPollingClient implements 
PollingClient {
     private final HttpLookupConfig options;
     private final Set<Integer> ignoredErrorCodes;
     private final boolean continueOnError;
+    private final HttpLogger httpLogger;
 
     public JavaNetHttpPollingClient(
             HttpClient httpClient,
@@ -111,6 +113,7 @@ public class JavaNetHttpPollingClient implements 
PollingClient {
                         .retryConfig(RetryConfigProvider.create(config))
                         .responseChecker(new HttpResponseChecker(successCodes, 
errorCodes))
                         .build();
+        this.httpLogger = HttpLogger.getHttpLogger(options.getProperties());
     }
 
     public void open(FunctionContext context) {
@@ -141,11 +144,15 @@ public class JavaNetHttpPollingClient implements 
PollingClient {
         HttpResponse<String> response = null;
         HttpRowDataWrapper httpRowDataWrapper = null;
         try {
+            httpLogger.logRequest(request.getHttpRequest());
             response =
                     httpClient.send(
                             () -> updateHttpRequestIfRequired(request, 
oidcProcessor),
                             BodyHandlers.ofString());
+            httpLogger.logResponse(response);
         } catch (HttpStatusCodeValidationFailedException e) {
+            // log if we fail for status code reasons.
+            httpLogger.logResponse((HttpResponse<String>) e.getResponse());
             // Case 1 http non successful response
             if (!this.continueOnError) {
                 throw e;
@@ -154,13 +161,15 @@ public class JavaNetHttpPollingClient implements 
PollingClient {
             response = (HttpResponse<String>) e.getResponse();
             httpRowDataWrapper = processHttpResponse(response, request, true);
         } catch (Exception e) {
+            httpLogger.logExceptionResponse(request, e);
             // Case 2 Exception occurred
             if (!this.continueOnError) {
                 throw e;
             }
             String errMessage = e.getMessage();
             // some exceptions do not have messages including the 
java.net.ConnectException we can
-            // get here if the connection is bad.
+            // get here if
+            // the connection is bad.
             if (errMessage == null) {
                 errMessage = e.toString();
             }
@@ -243,10 +252,7 @@ public class JavaNetHttpPollingClient implements 
PollingClient {
 
         var responseBody = response.body();
 
-        log.debug(
-                "Received status code [{}] for RestTableSource request with 
Server response body [{}] ",
-                response.statusCode(),
-                responseBody);
+        log.debug("Received status code [{}] for RestTableSource request", 
response.statusCode());
 
         if (this.isSuccessWithNoData(isError, responseBody, response)) {
             return HttpRowDataWrapper.builder()
diff --git 
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/RequestFactoryBase.java
 
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/RequestFactoryBase.java
index 52638f4..6f49638 100644
--- 
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/RequestFactoryBase.java
+++ 
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/RequestFactoryBase.java
@@ -34,7 +34,6 @@ import java.net.http.HttpRequest.Builder;
 import java.time.Duration;
 import java.util.Arrays;
 import java.util.Map;
-import java.util.stream.Collectors;
 
 /** Base class for {@link HttpRequest} factories. */
 @Slf4j
@@ -76,11 +75,6 @@ public abstract class RequestFactoryBase implements 
HttpRequestFactory {
 
         this.headersAndValues = 
HttpHeaderUtils.toHeaderAndValueArray(headerMap);
 
-        log.debug(
-                "RequestFactoryBase headersAndValues: "
-                        + Arrays.stream(headersAndValues)
-                                .map(Object::toString)
-                                .collect(Collectors.joining(",")));
         this.httpRequestTimeOutSeconds =
                 Integer.parseInt(
                         options.getProperties()
diff --git 
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/Slf4JHttpLookupPostRequestCallback.java
 
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/Slf4JHttpLookupPostRequestCallback.java
index 3e66d9c..8bb87d4 100644
--- 
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/Slf4JHttpLookupPostRequestCallback.java
+++ 
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/Slf4JHttpLookupPostRequestCallback.java
@@ -18,7 +18,6 @@
 package org.apache.flink.connector.http.table.lookup;
 
 import org.apache.flink.connector.http.HttpPostRequestCallback;
-import org.apache.flink.connector.http.utils.ConfigUtils;
 
 import lombok.extern.slf4j.Slf4j;
 
@@ -64,21 +63,18 @@ public class Slf4JHttpLookupPostRequestCallback
 
             log.info(
                     "Got response for a request.\n  Request:\n    URL: {}\n    
"
-                            + "Method: {}\n    Headers: {}\n    Params/Body: 
{}\nResponse: null",
+                            + "Method: {}\n  Params/Body: {}\nResponse: null",
                     httpRequest.uri().toString(),
                     httpRequest.method(),
-                    headers,
                     requestEntry.getLookupQueryInfo());
         } else {
             log.info(
                     "Got response for a request.\n  Request:\n    URL: {}\n    
"
-                            + "Method: {}\n    Headers: {}\n    Params/Body: 
{}\nResponse: {}\n    Body: {}",
+                            + "Method: {}\n Params/Body: {}\nResponse status 
code: {}\n",
                     httpRequest.uri().toString(),
                     httpRequest.method(),
-                    headers,
                     requestEntry.getLookupQueryInfo(),
-                    response,
-                    
response.body().replaceAll(ConfigUtils.UNIVERSAL_NEW_LINE_REGEXP, ""));
+                    response.statusCode());
         }
     }
 }
diff --git 
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/sink/Slf4jHttpPostRequestCallback.java
 
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/sink/Slf4jHttpPostRequestCallback.java
index feaf205..29ad3a6 100644
--- 
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/sink/Slf4jHttpPostRequestCallback.java
+++ 
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/sink/Slf4jHttpPostRequestCallback.java
@@ -19,14 +19,11 @@ package org.apache.flink.connector.http.table.sink;
 
 import org.apache.flink.connector.http.HttpPostRequestCallback;
 import org.apache.flink.connector.http.sink.httpclient.HttpRequest;
-import org.apache.flink.connector.http.utils.ConfigUtils;
 
 import lombok.extern.slf4j.Slf4j;
 
 import java.net.http.HttpResponse;
-import java.nio.charset.StandardCharsets;
 import java.util.Map;
-import java.util.stream.Collectors;
 
 /**
  * A {@link HttpPostRequestCallback} that logs pairs of request and response 
as <i>INFO</i> level
@@ -45,25 +42,17 @@ public class Slf4jHttpPostRequestCallback implements 
HttpPostRequestCallback<Htt
             String endpointUrl,
             Map<String, String> headerMap) {
 
-        String requestBody =
-                requestEntry.getElements().stream()
-                        .map(element -> new String(element, 
StandardCharsets.UTF_8))
-                        .collect(Collectors.joining());
-
         if (response == null) {
             log.info(
                     "Got response for a request.\n  Request:\n    "
-                            + "Method: {}\n    Body: {}\n  Response: null",
-                    requestEntry.getMethod(),
-                    requestBody);
+                            + "Method: {}\n   Response: null",
+                    requestEntry.getMethod());
         } else {
             log.info(
                     "Got response for a request.\n  Request:\n    "
-                            + "Method: {}\n    Body: {}\n  Response: {}\n    
Body: {}",
+                            + "Method: {}\n  Response status code: {}\n ",
                     requestEntry.method,
-                    requestBody,
-                    response,
-                    
response.body().replaceAll(ConfigUtils.UNIVERSAL_NEW_LINE_REGEXP, ""));
+                    response.statusCode());
         }
     }
 }
diff --git 
a/flink-connector-http/src/test/java/org/apache/flink/connector/http/HttpLoggerRequestTest.java
 
b/flink-connector-http/src/test/java/org/apache/flink/connector/http/HttpLoggerRequestTest.java
new file mode 100644
index 0000000..ec35471
--- /dev/null
+++ 
b/flink-connector-http/src/test/java/org/apache/flink/connector/http/HttpLoggerRequestTest.java
@@ -0,0 +1,213 @@
+package org.apache.flink.connector.http;
+
+import org.apache.flink.connector.http.config.HttpConnectorConfigConstants;
+
+import lombok.Data;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.http.HttpRequest;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Test class for logging HTTP requests.
+ *
+ * <p>Every {@link TestSpec} pairs a logging level ({@code null} meaning "not configured") and a
+ * request shape with the exact line {@code HttpLogger.createStringForRequest} must produce.
+ */
+public class HttpLoggerRequestTest {
+
+    @ParameterizedTest
+    @MethodSource("configProvider")
+    void testCreateStringForRequest(TestSpec testSpec) throws URISyntaxException {
+
+        // Leave the property unset for a null level so the logger's default is exercised.
+        Properties properties = new Properties();
+        if (testSpec.httpLoggingLevelType != null) {
+            properties.put(
+                    HttpConnectorConfigConstants.HTTP_LOGGING_LEVEL,
+                    testSpec.getHttpLoggingLevelType().name());
+        }
+
+        HttpLogger httpLogger = HttpLogger.getHttpLogger(properties);
+        URI uri = new URI("http://aaa");
+        HttpRequest.Builder httpRequestBuilder = HttpRequest.newBuilder().uri(uri);
+        if (testSpec.isHasHeaders()) {
+            // "bbb" is given twice on purpose to cover multi-valued headers.
+            httpRequestBuilder.headers("bbb", "ccc", "bbb", "ddd", "eee", "fff");
+        }
+        // The builder defaults to GET, so only POST needs an explicit method.
+        if (testSpec.getMethod().equals("POST")) {
+            if (testSpec.isHasBody()) {
+                httpRequestBuilder.method("POST", HttpRequest.BodyPublishers.ofString("my body"));
+            } else {
+                httpRequestBuilder.method("POST", HttpRequest.BodyPublishers.noBody());
+            }
+        }
+        assertThat(httpLogger.createStringForRequest(httpRequestBuilder.build()))
+                .isEqualTo(testSpec.getExpectedOutput());
+    }
+
+    /** One parameterized case: logging level, request shape and the expected log line. */
+    @Data
+    static class TestSpec {
+        final HttpLoggingLevelType httpLoggingLevelType;
+        final String method;
+        final boolean hasBody;
+        final boolean hasHeaders;
+        final String expectedOutput;
+    }
+
+    /** Supplies the cases: each request shape is checked for null, MIN, REQ_RESP and MAX. */
+    static Collection<TestSpec> configProvider() {
+        return List.of(
+                // GET no headers
+                new TestSpec(
+                        null,
+                        "GET",
+                        false,
+                        false,
+                        "HTTP GET Request: URL: http://aaa, Headers: None"),
+                new TestSpec(
+                        HttpLoggingLevelType.MIN,
+                        "GET",
+                        false,
+                        false,
+                        "HTTP GET Request: URL: http://aaa, Headers: None"),
+                new TestSpec(
+                        HttpLoggingLevelType.REQ_RESP,
+                        "GET",
+                        false,
+                        false,
+                        "HTTP GET Request: URL: http://aaa, Headers: None"),
+                new TestSpec(
+                        HttpLoggingLevelType.MAX,
+                        "GET",
+                        false,
+                        false,
+                        "HTTP GET Request: URL: http://aaa, Headers: None"),
+                // GET with headers
+                new TestSpec(
+                        null,
+                        "GET",
+                        false,
+                        true,
+                        "HTTP GET Request: URL: http://aaa, Headers: ***"),
+                new TestSpec(
+                        HttpLoggingLevelType.MIN,
+                        "GET",
+                        false,
+                        true,
+                        "HTTP GET Request: URL: http://aaa, Headers: ***"),
+                new TestSpec(
+                        HttpLoggingLevelType.REQ_RESP,
+                        "GET",
+                        false,
+                        true,
+                        "HTTP GET Request: URL: http://aaa, Headers: ***"),
+                new TestSpec(
+                        HttpLoggingLevelType.MAX,
+                        "GET",
+                        false,
+                        true,
+                        "HTTP GET Request: URL: http://aaa, Headers: bbb:[ccc;ddd];eee:[fff]"),
+
+                // POST no headers
+                new TestSpec(
+                        null,
+                        "POST",
+                        false,
+                        false,
+                        "HTTP POST Request: URL: http://aaa, Headers: None"),
+                new TestSpec(
+                        HttpLoggingLevelType.MIN,
+                        "POST",
+                        false,
+                        false,
+                        "HTTP POST Request: URL: http://aaa, Headers: None"),
+                new TestSpec(
+                        HttpLoggingLevelType.REQ_RESP,
+                        "POST",
+                        false,
+                        false,
+                        "HTTP POST Request: URL: http://aaa, Headers: None"),
+                new TestSpec(
+                        HttpLoggingLevelType.MAX,
+                        "POST",
+                        false,
+                        false,
+                        "HTTP POST Request: URL: http://aaa, Headers: None"),
+                // POST with headers
+                new TestSpec(
+                        null,
+                        "POST",
+                        false,
+                        true,
+                        "HTTP POST Request: URL: http://aaa, Headers: ***"),
+                new TestSpec(
+                        HttpLoggingLevelType.MIN,
+                        "POST",
+                        false,
+                        true,
+                        "HTTP POST Request: URL: http://aaa, Headers: ***"),
+                new TestSpec(
+                        HttpLoggingLevelType.REQ_RESP,
+                        "POST",
+                        false,
+                        true,
+                        "HTTP POST Request: URL: http://aaa, Headers: ***"),
+                new TestSpec(
+                        HttpLoggingLevelType.MAX,
+                        "POST",
+                        false,
+                        true,
+                        "HTTP POST Request: URL: http://aaa, Headers: bbb:[ccc;ddd];eee:[fff]"),
+
+                // POST no headers with body
+                new TestSpec(
+                        null,
+                        "POST",
+                        true,
+                        false,
+                        "HTTP POST Request: URL: http://aaa, Headers: None"),
+                new TestSpec(
+                        HttpLoggingLevelType.MIN,
+                        "POST",
+                        true,
+                        false,
+                        "HTTP POST Request: URL: http://aaa, Headers: None"),
+                new TestSpec(
+                        HttpLoggingLevelType.REQ_RESP,
+                        "POST",
+                        true,
+                        false,
+                        "HTTP POST Request: URL: http://aaa, Headers: None"),
+                new TestSpec(
+                        HttpLoggingLevelType.MAX,
+                        "POST",
+                        true,
+                        false,
+                        "HTTP POST Request: URL: http://aaa, Headers: None"),
+                // POST with headers with body
+                new TestSpec(
+                        null,
+                        "POST",
+                        true,
+                        true,
+                        "HTTP POST Request: URL: http://aaa, Headers: ***"),
+                new TestSpec(
+                        HttpLoggingLevelType.MIN,
+                        "POST",
+                        true,
+                        true,
+                        "HTTP POST Request: URL: http://aaa, Headers: ***"),
+                new TestSpec(
+                        HttpLoggingLevelType.REQ_RESP,
+                        "POST",
+                        true,
+                        true,
+                        "HTTP POST Request: URL: http://aaa, Headers: ***"),
+                new TestSpec(
+                        HttpLoggingLevelType.MAX,
+                        "POST",
+                        true,
+                        true,
+                        "HTTP POST Request: URL: http://aaa, Headers: bbb:[ccc;ddd];eee:[fff]"));
+    }
+}
diff --git 
a/flink-connector-http/src/test/java/org/apache/flink/connector/http/HttpLoggerResponseTest.java
 
b/flink-connector-http/src/test/java/org/apache/flink/connector/http/HttpLoggerResponseTest.java
new file mode 100644
index 0000000..2cf18c6
--- /dev/null
+++ 
b/flink-connector-http/src/test/java/org/apache/flink/connector/http/HttpLoggerResponseTest.java
@@ -0,0 +1,362 @@
+package org.apache.flink.connector.http;
+
+import org.apache.flink.connector.http.config.HttpConnectorConfigConstants;
+
+import lombok.Data;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import javax.net.ssl.SSLSession;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.http.HttpClient;
+import java.net.http.HttpHeaders;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test class for logging HTTP responses. */
+public class HttpLoggerResponseTest {
+    @ParameterizedTest
+    @MethodSource("configProvider")
+    void testCreateStringForResponse(TestSpec testSpec) throws URISyntaxException {
+
+        Properties properties = new Properties();
+        if (testSpec.httpLoggingLevelType != null) {
+            properties.put(
+                    HttpConnectorConfigConstants.HTTP_LOGGING_LEVEL,
+                    testSpec.getHttpLoggingLevelType().name());
+        }
+
+        HttpLogger httpLogger = HttpLogger.getHttpLogger(properties);
+        URI uri = new URI("http://aaa");
+        MockHttpResponse response = new MockHttpResponse();
+        response.setStatusCode(testSpec.getStatusCode());
+        if (testSpec.method.equals("GET")) {
+            response.setRequest(HttpRequest.newBuilder().GET().uri(uri).build());
+        } else {
+            // dummy request so we can populate the method in the log
+            response.setRequest(
+                    HttpRequest.newBuilder()
+                            .POST(HttpRequest.BodyPublishers.noBody())
+                            .uri(uri)
+                            .build());
+        }
+
+        if (testSpec.isHasHeaders()) {
+            // "bbb","ccc","bbb","ddd","eee","fff"
+            Map<String, List<String>> headersMap = new HashMap<>();
+            headersMap.put("bbb", List.of("ccc", "ddd"));
+            headersMap.put("eee", List.of("fff"));
+
+            HttpHeaders headers = HttpHeaders.of(headersMap, (name, value) -> true);
+            response.setHeaders(headers);
+        }
+
+        if (testSpec.isHasBody()) {
+            response.setBody("my body");
+        }
+
+        assertThat(httpLogger.createStringForResponse(response))
+                .isEqualTo(testSpec.getExpectedOutput());
+    }
+
+    @Data
+    static class TestSpec {
+        final String method;
+        final int statusCode;
+        final HttpLoggingLevelType httpLoggingLevelType;
+        final boolean hasBody;
+        final boolean hasHeaders;
+        final String expectedOutput;
+    }
+
+    static Collection<TestSpec> configProvider() {
+        return Stream.concat(getTestSpecs("GET", 200).stream(), getTestSpecs("POST", 500).stream())
+                .collect(Collectors.toList());
+    }
+
+    private static List<TestSpec> getTestSpecs(String method, int statusCode) {
+        return List.of(
+                // no headers no body
+                new TestSpec(
+                        method,
+                        statusCode,
+                        null,
+                        false,
+                        false,
+                        "HTTP "
+                                + method
+                                + " Response: URL: http://aaa, "
+                                + "Response Headers: None, status code: "
+                                + statusCode
+                                + ", Response Body: None"),
+                new TestSpec(
+                        method,
+                        statusCode,
+                        HttpLoggingLevelType.MIN,
+                        false,
+                        false,
+                        "HTTP "
+                                + method
+                                + " Response: URL: http://aaa, "
+                                + "Response Headers: None, status code: "
+                                + statusCode
+                                + ", Response Body: None"),
+                new TestSpec(
+                        method,
+                        statusCode,
+                        HttpLoggingLevelType.REQ_RESP,
+                        false,
+                        false,
+                        "HTTP "
+                                + method
+                                + " Response: URL: http://aaa, "
+                                + "Response Headers: None, status code: "
+                                + statusCode
+                                + ", Response Body: None"),
+                new TestSpec(
+                        method,
+                        statusCode,
+                        HttpLoggingLevelType.MAX,
+                        false,
+                        false,
+                        "HTTP "
+                                + method
+                                + " Response: URL: http://aaa, "
+                                + "Response Headers: None, status code: "
+                                + statusCode
+                                + ", Response Body: None"),
+                // with headers
+
+                new TestSpec(
+                        method,
+                        statusCode,
+                        null,
+                        false,
+                        true,
+                        "HTTP "
+                                + method
+                                + " Response: URL: http://aaa, "
+                                + "Response Headers: ***, status code: "
+                                + statusCode
+                                + ", Response Body: None"),
+                new TestSpec(
+                        method,
+                        statusCode,
+                        HttpLoggingLevelType.MIN,
+                        false,
+                        true,
+                        "HTTP "
+                                + method
+                                + " Response: URL: http://aaa, "
+                                + "Response Headers: ***, status code: "
+                                + statusCode
+                                + ", Response Body: None"),
+                new TestSpec(
+                        method,
+                        statusCode,
+                        HttpLoggingLevelType.REQ_RESP,
+                        false,
+                        true,
+                        "HTTP "
+                                + method
+                                + " Response: URL: http://aaa, "
+                                + "Response Headers: ***, status code: "
+                                + statusCode
+                                + ", Response Body: None"),
+                new TestSpec(
+                        method,
+                        statusCode,
+                        HttpLoggingLevelType.MAX,
+                        false,
+                        true,
+                        "HTTP "
+                                + method
+                                + " Response: URL: http://aaa, "
+                                + "Response Headers: bbb:[ccc;ddd];eee:[fff], status code: "
+                                + statusCode
+                                + ", "
+                                + "Response Body: None"),
+
+                // no headers with body
+                new TestSpec(
+                        method,
+                        statusCode,
+                        null,
+                        true,
+                        false,
+                        "HTTP "
+                                + method
+                                + " Response: URL: http://aaa, "
+                                + "Response Headers: None, status code: "
+                                + statusCode
+                                + ", Response Body: ***"),
+                new TestSpec(
+                        method,
+                        statusCode,
+                        HttpLoggingLevelType.MIN,
+                        true,
+                        false,
+                        "HTTP "
+                                + method
+                                + " Response: URL: http://aaa, "
+                                + "Response Headers: None, status code: "
+                                + statusCode
+                                + ", Response Body: ***"),
+                new TestSpec(
+                        method,
+                        statusCode,
+                        HttpLoggingLevelType.REQ_RESP,
+                        true,
+                        false,
+                        "HTTP "
+                                + method
+                                + " Response: URL: http://aaa, "
+                                + "Response Headers: None, status code: "
+                                + statusCode
+                                + ", Response Body: my body"),
+                new TestSpec(
+                        method,
+                        statusCode,
+                        HttpLoggingLevelType.MAX,
+                        true,
+                        false,
+                        "HTTP "
+                                + method
+                                + " Response: URL: http://aaa, "
+                                + "Response Headers: None, status code: "
+                                + statusCode
+                                + ", Response Body: my body"),
+
+                // headers with body
+                new TestSpec(
+                        method,
+                        statusCode,
+                        null,
+                        true,
+                        true,
+                        "HTTP "
+                                + method
+                                + " Response: URL: http://aaa, Response Headers: ***"
+                                + ", status code: "
+                                + statusCode
+                                + ", Response Body: ***"),
+                new TestSpec(
+                        method,
+                        statusCode,
+                        HttpLoggingLevelType.MIN,
+                        true,
+                        true,
+                        "HTTP "
+                                + method
+                                + " Response: URL: http://aaa, Response Headers: ***"
+                                + ", status code: "
+                                + statusCode
+                                + ", Response Body: ***"),
+                new TestSpec(
+                        method,
+                        statusCode,
+                        HttpLoggingLevelType.REQ_RESP,
+                        true,
+                        true,
+                        "HTTP "
+                                + method
+                                + " Response: URL: http://aaa, Response Headers: ***"
+                                + ", status code: "
+                                + statusCode
+                                + ", Response Body: my body"),
+                new TestSpec(
+                        method,
+                        statusCode,
+                        HttpLoggingLevelType.MAX,
+                        true,
+                        true,
+                        "HTTP "
+                                + method
+                                + " Response: URL: http://aaa, "
+                                + "Response Headers: bbb:[ccc;ddd];eee:[fff]"
+                                + ", status code: "
+                                + statusCode
+                                + ", Response Body: my body"));
+    }
+
+    private class MockHttpResponse implements HttpResponse {
+        private int statusCode = 0;
+        private HttpRequest request = null;
+        private HttpHeaders headers = null;
+        private String body = null;
+
+        @Override
+        public int statusCode() {
+            return this.statusCode;
+        }
+
+        @Override
+        public HttpRequest request() {
+            return this.request;
+        }
+
+        @Override
+        public Optional<HttpResponse> previousResponse() {
+            return Optional.empty();
+        }
+
+        @Override
+        public HttpHeaders headers() {
+            return this.headers;
+        }
+
+        @Override
+        public Object body() {
+            return this.body;
+        }
+
+        @Override
+        public Optional<SSLSession> sslSession() {
+            return Optional.empty();
+        }
+
+        @Override
+        public URI uri() {
+            URI uri;
+            try {
+                uri = new URI("http://aaa");
+            } catch (URISyntaxException e) {
+                throw new RuntimeException(e);
+            }
+            return uri;
+        }
+
+        @Override
+        public HttpClient.Version version() {
+            return null;
+        }
+
+        public void setStatusCode(int statusCode) {
+            this.statusCode = statusCode;
+        }
+
+        public void setRequest(HttpRequest request) {
+            this.request = request;
+        }
+
+        public void setHeaders(HttpHeaders headers) {
+            this.headers = headers;
+        }
+
+        public void setBody(String body) {
+            this.body = body;
+        }
+    }
+}
diff --git a/flink-connector-http/src/test/resources/simpleLogger.properties b/flink-connector-http/src/test/resources/simpleLogger.properties
new file mode 100644
index 0000000..32961f7
--- /dev/null
+++ b/flink-connector-http/src/test/resources/simpleLogger.properties
@@ -0,0 +1,2 @@
+org.slf4j.simpleLogger.defaultLogLevel=INFO
+org.apache.flink.connector.http.HttpLogger=DEBUG

Reply via email to