This is an automated email from the ASF dual-hosted git repository.
fcsaky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-http.git
The following commit(s) were added to refs/heads/main by this push:
new 2cf55a1 [FLINK-38481] Make HTTP request/response logging configurable
2cf55a1 is described below
commit 2cf55a1efce7c6fb89e6398bb18e18557c851829
Author: David Radley <[email protected]>
AuthorDate: Thu Dec 4 12:50:54 2025 +0000
[FLINK-38481] Make HTTP request/response logging configurable
---
docs/content.zh/docs/connectors/table/http.md | 38 ++-
docs/content/docs/connectors/table/http.md | 36 +-
.../apache/flink/connector/http/HttpLogger.java | 133 ++++++++
.../flink/connector/http/HttpLoggingLevelType.java | 12 +
.../http/config/HttpConnectorConfigConstants.java | 2 +
.../sink/httpclient/BatchRequestSubmitter.java | 18 +-
.../sink/httpclient/JavaNetSinkHttpClient.java | 7 +-
.../http/table/lookup/BodyBasedRequestFactory.java | 7 +-
.../table/lookup/HttpLookupConnectorOptions.java | 8 +
.../table/lookup/HttpLookupTableSourceFactory.java | 19 ++
.../table/lookup/JavaNetHttpPollingClient.java | 16 +-
.../http/table/lookup/RequestFactoryBase.java | 6 -
.../lookup/Slf4JHttpLookupPostRequestCallback.java | 10 +-
.../table/sink/Slf4jHttpPostRequestCallback.java | 19 +-
.../connector/http/HttpLoggerRequestTest.java | 213 ++++++++++++
.../connector/http/HttpLoggerResponseTest.java | 362 +++++++++++++++++++++
.../src/test/resources/simpleLogger.properties | 2 +
17 files changed, 849 insertions(+), 59 deletions(-)
diff --git a/docs/content.zh/docs/connectors/table/http.md
b/docs/content.zh/docs/connectors/table/http.md
index a517c88..4c6b94d 100644
--- a/docs/content.zh/docs/connectors/table/http.md
+++ b/docs/content.zh/docs/connectors/table/http.md
@@ -51,7 +51,7 @@ The HTTP source connector supports [Lookup
Joins](https://nightlies.apache.org/f
* [Timeouts](#timeouts)
* [Source table HTTP status code](#source-table-http-status-code)
* [Retries (Lookup source)](#retries-lookup-source)
- * [Retry strategy](#retry-strategy)
+ * [Retry strategy](#retry-strategy)
* [Lookup multiple results](#lookup-multiple-results)
* [Working with HTTP sink tables](#working-with-http-sink-tables)
* [HTTP Sink](#http-sink)
@@ -156,7 +156,7 @@ Or for REST POST method they will be converted to Json and
used as request body.
### Lookup Source Connector Options
-Note the options with the prefix _http_ are the HTTP connector specific
options, the others are Flink options.
+Note the options with the prefix _http_ are the HTTP connector specific
options, the others are Flink options.
| Option |
Required | Description/Value
[...]
|:-----------------------------------------------------------------------|----------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
[...]
@@ -180,12 +180,12 @@ Note the options with the prefix _http_ are the HTTP
connector specific options,
| http.security.oidc.token.expiry.reduction |
optional | OIDC tokens will be requested if the current time is later than the
cached token expiry time minus this value.
[...]
| http.source.lookup.continue-on-error |
optional | When true, the flow will continue on errors, returning row content.
When false (the default) the job ends on errors.
[...]
| http.source.lookup.request.timeout |
optional | Sets HTTP request timeout in seconds. If not specified, the default
value of 30 seconds will be used.
[...]
-| http.source.lookup.http-version |
optional | Version of HTTP to use for lookup http requests. The valid values
are HTTP_1_1 and HTTP_2, which specify HTTP 1.1 or 2 respectively. This option
may be required as HTTP_1_1, if the endpoint is HTTP 1.1, because some http 1.1
endpoints reject HTTP Version 2 calls, with 'Invalid HTTP request received' and
'HTTP/2 upgrade not supported'.
[...]
| http.source.lookup.request.thread-pool.size |
optional | Sets the size of pool thread for HTTP lookup request processing.
Increasing this value would mean that more concurrent requests can be processed
in the same time. If not specified, the default value of 8 threads will be
used.
[...]
| http.source.lookup.response.thread-pool.size |
optional | Sets the size of pool thread for HTTP lookup response processing.
Increasing this value would mean that more concurrent requests can be processed
in the same time. If not specified, the default value of 4 threads will be
used.
[...]
| http.source.lookup.use-raw-authorization-header |
optional | If set to `'true'`, uses the raw value set for the `Authorization`
header, without transformation for Basic Authentication (base64, addition of
"Basic " prefix). If not specified, defaults to `'false'`.
[...]
| http.source.lookup.request-callback |
optional | Specify which `HttpLookupPostRequestCallback` implementation to use.
By default, it is set to `slf4j-lookup-logger` corresponding to
`Slf4jHttpLookupPostRequestCallback`.
[...]
| http.source.lookup.connection.timeout |
optional | Source table connection timeout. Default - no value.
[...]
+| http.source.lookup.http-version |
optional | Version of HTTP to use for lookup http requests. The valid values
are HTTP_1_1 and HTTP_2, which specify HTTP 1.1 or 2 respectively. This option
may be required as HTTP_1_1, if the endpoint is HTTP 1.1, because some http 1.1
endpoints reject HTTP Version 2 calls, with 'Invalid HTTP request received' and
'HTTP/2 upgrade not supported'.
[...]
| http.source.lookup.success-codes |
optional | Comma separated http codes considered as success response. Use
[1-5]XX for groups and '!' character for excluding. The default is 2XX.
[...]
| http.source.lookup.retry-codes |
optional | Comma separated http codes considered as transient errors. Use
[1-5]XX for groups and '!' character for excluding. The default is 500,503,504.
[...]
| http.source.lookup.ignored-response-codes |
optional | Comma separated http codes. Content for these responses will be
ignored. Use [1-5]XX for groups and '!' character for excluding. Ignored
responses together with `http.source.lookup.success-codes` are considered as
successful.
[...]
@@ -204,7 +204,7 @@ Note the options with the prefix _http_ are the HTTP
connector specific options,
### Query Creators
-In the above example we see that HTTP GET operations and HTTP POST operations
result in different mapping of the columns to the
+In the above example we see that HTTP GET operations and HTTP POST operations
result in different mapping of the columns to the
HTTP request content. In reality, you will want to have move control over how
the SQL columns are mapped to the HTTP content.
The HTTP connector supplies a number of Query Creators that you can use define
these mappings.
@@ -230,7 +230,7 @@ The HTTP connector supplies a number of Query Creators that
you can use define t
<td></td>
<td>✓ for PUTs and POSTs</td>
</tr>
-
+
</tbody>
</table>
@@ -245,7 +245,7 @@ parameters `http.request.query-param-fields`,
`http.request.body-fields` and `ht
The default Query Creator is called _http-generic-json-url_. For body based
queries such as POST/PUT requests, the
([GenericGetQueryCreator](flink-connector-http/src/main/java/org/apache/flink/connectors/http/internal/table/lookup/querycreators/GenericGetQueryCreator.java))is
provided as a default query creator. This implementation uses Flink's
[json-format](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/json/)
to convert RowData object into Json String.
-For GET requests can be used for query parameter based queries.
+For GET requests can be used for query parameter based queries.
The _http-generic-json-url_ allows for using custom formats that will perform
serialization to Json. Thanks to this, users can create their own logic for
converting RowData to Json Strings suitable for their HTTP endpoints and use
this logic as custom format
with HTTP Lookup connector and SQL queries.
@@ -331,7 +331,7 @@ pagination methods. Currently, the connector supports only
two simple approaches
## Working with HTTP sink tables
### HTTP Sink
-The following example shows the minimum Table API example to create a sink:
+The following example shows the minimum Table API example to create a sink:
```roomsql
CREATE TABLE http (
@@ -350,8 +350,9 @@ Then use `INSERT` SQL statement to send data to your HTTP
endpoint:
INSERT INTO http VALUES (1, 'Ninette'), (2, 'Hedy')
```
+
When `'format' = 'json'` is specified on the table definition, the HTTP sink
sends json payloads. It is possible to change the format of the payload by
specifying
-another format name.
+another format name.
### Sink Connector Options
@@ -366,6 +367,7 @@ another format name.
| sink.requests.max-buffered | optional | Maximum number of
buffered records before applying backpressure.
|
| sink.flush-buffer.size | optional | The maximum size of a
batch of entries that may be sent to the HTTP endpoint measured in bytes.
|
| sink.flush-buffer.timeout | optional | Threshold time in
milliseconds for an element to be in a buffer before being flushed.
|
+| http.logging.level | optional | Logging levels for
HTTP content. Valid values are `MIN` (the default), `REQ_RESP` and `MAX`.
|
| http.sink.request-callback | optional | Specify which
`HttpPostRequestCallback` implementation to use. By default, it is set to
`slf4j-logger` corresponding to `Slf4jHttpPostRequestCallback`.
|
| http.sink.error.code | optional | List of HTTP status
codes that should be treated as errors by HTTP Sink, separated with comma.
|
| http.sink.error.code.exclude | optional | List of HTTP status
codes that should be excluded from the `http.sink.error.code` list, separated
with comma.
|
@@ -576,6 +578,26 @@ an example of a customised grant type token request. The
supplied `token request
a new one is requested. There is a property
`http.security.oidc.token.expiry.reduction`, that defaults to 1 second; new
tokens will
be requested if the current time is later than the cached token expiry time
minus `http.security.oidc.token.expiry.reduction`.
+## Logging the HTTP content
+Debug level logging has been added for class
`org.apache.flink.connector.http.HttpLogger`. To enable this, alter the log4j
properties.
+This logging puts out log entries for the HTTP requests and responses. This
can be useful for diagnostics to confirm that HTTP requests have been issued
and what
+that HTTP responses or an exception has occurred (for example connection
Refused).
+
+Logging HTTP may not be appropriate for production systems; where sensitive
information is not allowed into the logs. But in development environments it is
useful
+to be able to see HTTP content. Sensitive information can occur in the headers
for example authentication tokens and passwords. Also the HTTP request and
response bodies
+could sensitive. The default minimal logging should be used in production. For
development, you can specify config option `http.logging.level`.
+This dictates the amount of content that debug logging will show around HTTP
calls; the valid values are:
+
+| log level | Request method | URI | HTTP Body | Response status code |
Headers |
+|-----------|----------------|-----|-----------|----------------------|---------|
+| MIN | Y | Y | N | Y | N
|
+| REQ_RESP | Y | Y | Y | Y | N
|
+| MAX | Y | Y | Y | Y | Y
|
+
+Notes:
+- you can customize what is traced for lookups using the
`http.source.lookup.request-callback`.
+- where there is an N in the table the output is obfuscated.
+
#### Restrictions at this time
* No authentication is applied to the token request.
* The processing does not use the refresh token if it present.
diff --git a/docs/content/docs/connectors/table/http.md
b/docs/content/docs/connectors/table/http.md
index dd52871..de70e5e 100644
--- a/docs/content/docs/connectors/table/http.md
+++ b/docs/content/docs/connectors/table/http.md
@@ -51,7 +51,7 @@ The HTTP source connector supports [Lookup
Joins](https://nightlies.apache.org/f
* [Timeouts](#timeouts)
* [Source table HTTP status code](#source-table-http-status-code)
* [Retries (Lookup source)](#retries-lookup-source)
- * [Retry strategy](#retry-strategy)
+ * [Retry strategy](#retry-strategy)
* [Lookup multiple results](#lookup-multiple-results)
* [Working with HTTP sink tables](#working-with-http-sink-tables)
* [HTTP Sink](#http-sink)
@@ -155,7 +155,7 @@ Or for REST POST method they will be converted to Json and
used as request body.
### Lookup Source Connector Options
-Note the options with the prefix _http_ are the HTTP connector specific
options, the others are Flink options.
+Note the options with the prefix _http_ are the HTTP connector specific
options, the others are Flink options.
| Option |
Required | Description/Value
[...]
|:-----------------------------------------------------------------------|----------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
[...]
@@ -203,7 +203,7 @@ Note the options with the prefix _http_ are the HTTP
connector specific options,
### Query Creators
-In the above example we see that HTTP GET operations and HTTP POST operations
result in different mapping of the columns to the
+In the above example we see that HTTP GET operations and HTTP POST operations
result in different mapping of the columns to the
HTTP request content. In reality, you will want to have move control over how
the SQL columns are mapped to the HTTP content.
The HTTP connector supplies a number of Query Creators that you can use define
these mappings.
@@ -229,7 +229,7 @@ The HTTP connector supplies a number of Query Creators that
you can use define t
<td></td>
<td>✓ for PUTs and POSTs</td>
</tr>
-
+
</tbody>
</table>
@@ -244,7 +244,7 @@ parameters `http.request.query-param-fields`,
`http.request.body-fields` and `ht
The default Query Creator is called _http-generic-json-url_. For body based
queries such as POST/PUT requests, the
([GenericGetQueryCreator](flink-connector-http/src/main/java/org/apache/flink/connectors/http/internal/table/lookup/querycreators/GenericGetQueryCreator.java))is
provided as a default query creator. This implementation uses Flink's
[json-format](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/json/)
to convert RowData object into Json String.
-For GET requests can be used for query parameter based queries.
+For GET requests can be used for query parameter based queries.
The _http-generic-json-url_ allows for using custom formats that will perform
serialization to Json. Thanks to this, users can create their own logic for
converting RowData to Json Strings suitable for their HTTP endpoints and use
this logic as custom format
with HTTP Lookup connector and SQL queries.
@@ -330,7 +330,7 @@ pagination methods. Currently, the connector supports only
two simple approaches
## Working with HTTP sink tables
### HTTP Sink
-The following example shows the minimum Table API example to create a sink:
+The following example shows the minimum Table API example to create a sink:
```roomsql
CREATE TABLE http (
@@ -349,8 +349,9 @@ Then use `INSERT` SQL statement to send data to your HTTP
endpoint:
INSERT INTO http VALUES (1, 'Ninette'), (2, 'Hedy')
```
+
When `'format' = 'json'` is specified on the table definition, the HTTP sink
sends json payloads. It is possible to change the format of the payload by
specifying
-another format name.
+another format name.
### Sink Connector Options
@@ -365,6 +366,7 @@ another format name.
| sink.requests.max-buffered | optional | Maximum number of
buffered records before applying backpressure.
|
| sink.flush-buffer.size | optional | The maximum size of a
batch of entries that may be sent to the HTTP endpoint measured in bytes.
|
| sink.flush-buffer.timeout | optional | Threshold time in
milliseconds for an element to be in a buffer before being flushed.
|
+| http.logging.level | optional | Logging levels for
HTTP content. Valid values are `MIN` (the default), `REQ_RESP` and `MAX`.
|
| http.sink.request-callback | optional | Specify which
`HttpPostRequestCallback` implementation to use. By default, it is set to
`slf4j-logger` corresponding to `Slf4jHttpPostRequestCallback`.
|
| http.sink.error.code | optional | List of HTTP status
codes that should be treated as errors by HTTP Sink, separated with comma.
|
| http.sink.error.code.exclude | optional | List of HTTP status
codes that should be excluded from the `http.sink.error.code` list, separated
with comma.
|
@@ -575,6 +577,26 @@ an example of a customised grant type token request. The
supplied `token request
a new one is requested. There is a property
`http.security.oidc.token.expiry.reduction`, that defaults to 1 second; new
tokens will
be requested if the current time is later than the cached token expiry time
minus `http.security.oidc.token.expiry.reduction`.
+## Logging the HTTP content
+Debug level logging has been added for class
`org.apache.flink.connector.http.HttpLogger`. To enable this, alter the log4j
properties.
+This logging puts out log entries for the HTTP requests and responses. This
can be useful for diagnostics to confirm that HTTP requests have been issued
and what
+that HTTP responses or an exception has occurred (for example connection
Refused).
+
+Logging HTTP may not be appropriate for production systems; where sensitive
information is not allowed into the logs. But in development environments it is
useful
+to be able to see HTTP content. Sensitive information can occur in the headers
for example authentication tokens and passwords. Also the HTTP request and
response bodies
+could sensitive. The default minimal logging should be used in production. For
development, you can specify config option `http.logging.level`.
+This dictates the amount of content that debug logging will show around HTTP
calls; the valid values are:
+
+| log level | Request method | URI | HTTP Body | Response status code |
Headers |
+|-----------|----------------|-----|-----------|----------------------|---------|
+| MIN | Y | Y | N | Y | N
|
+| REQ_RESP | Y | Y | Y | Y | N
|
+| MAX | Y | Y | Y | Y | Y
|
+
+Notes:
+- you can customize what is traced for lookups using the
`http.source.lookup.request-callback`.
+- where there is an N in the table the output is obfuscated.
+
#### Restrictions at this time
* No authentication is applied to the token request.
* The processing does not use the refresh token if it present.
diff --git
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/HttpLogger.java
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/HttpLogger.java
new file mode 100644
index 0000000..759bbcd
--- /dev/null
+++
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/HttpLogger.java
@@ -0,0 +1,133 @@
+package org.apache.flink.connector.http;
+
+import
org.apache.flink.connector.http.table.lookup.HttpLookupSourceRequestEntry;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.Serializable;
+import java.net.http.HttpHeaders;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.StringJoiner;
+
+import static
org.apache.flink.connector.http.config.HttpConnectorConfigConstants.HTTP_LOGGING_LEVEL;
+
+/**
+ * HttpLogger, this is a class to perform HTTP content logging based on a
level defined in
+ * configuration.
+ */
+@Slf4j
+public class HttpLogger implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private final HttpLoggingLevelType httpLoggingLevelType;
+
+ private HttpLogger(Properties properties) {
+ String code = (String) properties.get(HTTP_LOGGING_LEVEL);
+ this.httpLoggingLevelType = HttpLoggingLevelType.valueOfStr(code);
+ }
+
+ public static HttpLogger getHttpLogger(Properties properties) {
+ return new HttpLogger(properties);
+ }
+
+ public void logRequest(HttpRequest httpRequest) {
+ if (log.isDebugEnabled()) {
+ log.debug(createStringForRequest(httpRequest));
+ }
+ }
+
+ public void logResponse(HttpResponse<String> response) {
+
+ if (log.isDebugEnabled()) {
+ log.debug(createStringForResponse(response));
+ }
+ }
+
+ public void logRequestBody(String body) {
+ if (log.isDebugEnabled()) {
+ log.debug(createStringForBody(body));
+ }
+ }
+
+ public void logExceptionResponse(HttpLookupSourceRequestEntry request,
Exception e) {
+ if (log.isDebugEnabled()) {
+ log.debug(createStringForExceptionResponse(request, e));
+ }
+ }
+
+ String createStringForRequest(HttpRequest httpRequest) {
+ String headersForLog = getHeadersForLog(httpRequest.headers());
+ return String.format(
+ "HTTP %s Request: URL: %s, Headers: %s",
+ httpRequest.method(), httpRequest.uri().toString(),
headersForLog);
+ }
+
+ private String getHeadersForLog(HttpHeaders httpHeaders) {
+ if (httpHeaders == null) {
+ return "None";
+ }
+ Map<String, List<String>> headersMap = httpHeaders.map();
+ if (headersMap.isEmpty()) {
+ return "None";
+ }
+ if (this.httpLoggingLevelType == HttpLoggingLevelType.MAX) {
+ StringJoiner headers = new StringJoiner(";");
+ for (Map.Entry<String, List<String>> reqHeaders :
headersMap.entrySet()) {
+ StringJoiner values = new StringJoiner(";");
+ for (String value : reqHeaders.getValue()) {
+ values.add(value);
+ }
+ String header = reqHeaders.getKey() + ":[" + values + "]";
+ headers.add(header);
+ }
+ return headers.toString();
+ }
+ return "***";
+ }
+
+ String createStringForResponse(HttpResponse<String> response) {
+ String headersForLog = getHeadersForLog(response.headers());
+
+ String bodyForLog = "***";
+ if (response.body() == null || response.body().isEmpty()) {
+ bodyForLog = "None";
+ } else {
+ if (this.httpLoggingLevelType != HttpLoggingLevelType.MIN) {
+ bodyForLog = response.body().toString();
+ }
+ }
+ return String.format(
+ "HTTP %s Response: URL: %s,"
+ + " Response Headers: %s, status code: %s, Response
Body: %s",
+ response.request().method(),
+ response.uri(),
+ headersForLog,
+ response.statusCode(),
+ bodyForLog);
+ }
+
+ private String createStringForExceptionResponse(
+ HttpLookupSourceRequestEntry request, Exception e) {
+ HttpRequest httpRequest = request.getHttpRequest();
+ return String.format(
+ "HTTP %s Exception Response: URL: %s Exception %s",
+ httpRequest.method(), httpRequest.uri(), e);
+ }
+
+ String createStringForBody(String body) {
+ String bodyForLog = "***";
+ if (body == null || body.isEmpty()) {
+ bodyForLog = "None";
+ } else {
+ if (this.httpLoggingLevelType != HttpLoggingLevelType.MIN) {
+ bodyForLog = body.toString();
+ }
+ }
+ return bodyForLog;
+ }
+}
diff --git
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/HttpLoggingLevelType.java
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/HttpLoggingLevelType.java
new file mode 100644
index 0000000..7cb63eb
--- /dev/null
+++
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/HttpLoggingLevelType.java
@@ -0,0 +1,12 @@
+package org.apache.flink.connector.http;
+
+/** Defines the level of http content that will be logged. */
+public enum HttpLoggingLevelType {
+ MIN,
+ REQ_RESP,
+ MAX;
+
+ public static HttpLoggingLevelType valueOfStr(String code) {
+ return code == null ? MIN : valueOf(code);
+ }
+}
diff --git
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/config/HttpConnectorConfigConstants.java
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/config/HttpConnectorConfigConstants.java
index 9fc3b70..5bbc0e4 100644
---
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/config/HttpConnectorConfigConstants.java
+++
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/config/HttpConnectorConfigConstants.java
@@ -109,6 +109,8 @@ public final class HttpConnectorConfigConstants {
public static final String CONTINUE_ON_ERROR = SOURCE_LOOKUP_PREFIX +
"continue-on-error";
+ public static final String HTTP_LOGGING_LEVEL = FLINK_CONNECTOR_HTTP +
"logging.level";
+
public static final String SOURCE_PROXY_HOST = SOURCE_LOOKUP_PREFIX +
"proxy.host";
public static final String SOURCE_PROXY_PORT = SOURCE_LOOKUP_PREFIX +
"proxy.port";
diff --git
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/sink/httpclient/BatchRequestSubmitter.java
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/sink/httpclient/BatchRequestSubmitter.java
index 19b45d6..6733ba2 100644
---
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/sink/httpclient/BatchRequestSubmitter.java
+++
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/sink/httpclient/BatchRequestSubmitter.java
@@ -73,18 +73,18 @@ public class BatchRequestSubmitter extends
AbstractRequestSubmitter {
}
var responseFutures = new
ArrayList<CompletableFuture<JavaNetHttpResponseWrapper>>();
- String previousReqeustMethod = requestsToSubmit.get(0).method;
+ String previousRequestMethod = requestsToSubmit.get(0).method;
List<HttpSinkRequestEntry> requestBatch = new
ArrayList<>(httpRequestBatchSize);
for (var entry : requestsToSubmit) {
if (requestBatch.size() == httpRequestBatchSize
- || !previousReqeustMethod.equalsIgnoreCase(entry.method)) {
+ || !previousRequestMethod.equalsIgnoreCase(entry.method)) {
// break batch and submit
responseFutures.add(sendBatch(endpointUrl, requestBatch));
requestBatch.clear();
}
requestBatch.add(entry);
- previousReqeustMethod = entry.method;
+ previousRequestMethod = entry.method;
}
// submit anything that left
@@ -98,9 +98,9 @@ public class BatchRequestSubmitter extends
AbstractRequestSubmitter {
}
private CompletableFuture<JavaNetHttpResponseWrapper> sendBatch(
- String endpointUrl, List<HttpSinkRequestEntry> reqeustBatch) {
+ String endpointUrl, List<HttpSinkRequestEntry> requestBatch) {
- HttpRequest httpRequest = buildHttpRequest(reqeustBatch,
URI.create(endpointUrl));
+ HttpRequest httpRequest = buildHttpRequest(requestBatch,
URI.create(endpointUrl));
return httpClient
.sendAsync(httpRequest.getHttpRequest(),
HttpResponse.BodyHandlers.ofString())
.exceptionally(
@@ -115,11 +115,11 @@ public class BatchRequestSubmitter extends
AbstractRequestSubmitter {
publishingThreadPool);
}
- private HttpRequest buildHttpRequest(List<HttpSinkRequestEntry>
reqeustBatch, URI endpointUri) {
+ private HttpRequest buildHttpRequest(List<HttpSinkRequestEntry>
requestBatch, URI endpointUri) {
try {
- var method = reqeustBatch.get(0).method;
- List<byte[]> elements = new ArrayList<>(reqeustBatch.size());
+ var method = requestBatch.get(0).method;
+ List<byte[]> elements = new ArrayList<>(requestBatch.size());
BodyPublisher publisher;
// By default, Java's BodyPublishers.ofByteArrays(elements) will
just put Jsons
@@ -127,7 +127,7 @@ public class BatchRequestSubmitter extends
AbstractRequestSubmitter {
// What we do here is we pack every Json/byteArray into Json Array
hence '[' and ']'
// at the end, and we separate every element with comma.
elements.add(BATCH_START_BYTES);
- for (HttpSinkRequestEntry entry : reqeustBatch) {
+ for (HttpSinkRequestEntry entry : requestBatch) {
elements.add(entry.element);
elements.add(BATCH_ELEMENT_DELIM_BYTES);
}
diff --git
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/sink/httpclient/JavaNetSinkHttpClient.java
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/sink/httpclient/JavaNetSinkHttpClient.java
index 6216288..0072b70 100644
---
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/sink/httpclient/JavaNetSinkHttpClient.java
+++
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/sink/httpclient/JavaNetSinkHttpClient.java
@@ -18,6 +18,7 @@
package org.apache.flink.connector.http.sink.httpclient;
import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.connector.http.HttpLogger;
import org.apache.flink.connector.http.HttpPostRequestCallback;
import org.apache.flink.connector.http.clients.SinkHttpClient;
import org.apache.flink.connector.http.clients.SinkHttpClientResponse;
@@ -57,6 +58,8 @@ public class JavaNetSinkHttpClient implements SinkHttpClient {
private final RequestSubmitter requestSubmitter;
+ private final HttpLogger httpLogger;
+
public JavaNetSinkHttpClient(
Properties properties,
HttpPostRequestCallback<HttpRequest> httpPostRequestCallback,
@@ -85,6 +88,8 @@ public class JavaNetSinkHttpClient implements SinkHttpClient {
this.headersAndValues =
HttpHeaderUtils.toHeaderAndValueArray(this.headerMap);
this.requestSubmitter =
requestSubmitterFactory.createSubmitter(properties,
headersAndValues);
+
+ this.httpLogger = HttpLogger.getHttpLogger(properties);
}
@Override
@@ -114,7 +119,7 @@ public class JavaNetSinkHttpClient implements
SinkHttpClient {
for (var response : responses) {
var sinkRequestEntry = response.getHttpRequest();
var optResponse = response.getResponse();
-
+ this.httpLogger.logResponse(response.getResponse().get());
httpPostRequestCallback.call(
optResponse.orElse(null), sinkRequestEntry, endpointUrl,
headerMap);
diff --git
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/BodyBasedRequestFactory.java
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/BodyBasedRequestFactory.java
index 68fee55..886fe5d 100644
---
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/BodyBasedRequestFactory.java
+++
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/BodyBasedRequestFactory.java
@@ -17,6 +17,7 @@
package org.apache.flink.connector.http.table.lookup;
+import org.apache.flink.connector.http.HttpLogger;
import org.apache.flink.connector.http.LookupQueryCreator;
import org.apache.flink.connector.http.preprocessor.HeaderPreprocessor;
import org.apache.flink.connector.http.utils.uri.URIBuilder;
@@ -38,6 +39,7 @@ import java.net.http.HttpRequest.Builder;
public class BodyBasedRequestFactory extends RequestFactoryBase {
private final String methodName;
+ private final HttpLogger httpLogger;
public BodyBasedRequestFactory(
String methodName,
@@ -47,6 +49,7 @@ public class BodyBasedRequestFactory extends
RequestFactoryBase {
super(lookupQueryCreator, headerPreprocessor, options);
this.methodName = methodName.toUpperCase();
+ this.httpLogger = HttpLogger.getHttpLogger(options.getProperties());
}
/**
@@ -59,8 +62,10 @@ public class BodyBasedRequestFactory extends
RequestFactoryBase {
@Override
protected Builder setUpRequestMethod(LookupQueryInfo lookupQueryInfo) {
HttpRequest.Builder builder =
super.setUpRequestMethod(lookupQueryInfo);
+ String body = lookupQueryInfo.getLookupQuery();
builder.uri(constructUri(lookupQueryInfo))
- .method(methodName,
BodyPublishers.ofString(lookupQueryInfo.getLookupQuery()));
+ .method(methodName, BodyPublishers.ofString(body));
+ this.httpLogger.logRequestBody(body);
return builder;
}
diff --git
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/HttpLookupConnectorOptions.java
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/HttpLookupConnectorOptions.java
index aedf59d..e5e7d58 100644
---
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/HttpLookupConnectorOptions.java
+++
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/HttpLookupConnectorOptions.java
@@ -19,11 +19,13 @@ package org.apache.flink.connector.http.table.lookup;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.connector.http.HttpLoggingLevelType;
import org.apache.flink.connector.http.retry.RetryStrategyType;
import java.time.Duration;
import static
org.apache.flink.connector.http.config.HttpConnectorConfigConstants.CONTINUE_ON_ERROR;
+import static
org.apache.flink.connector.http.config.HttpConnectorConfigConstants.HTTP_LOGGING_LEVEL;
import static
org.apache.flink.connector.http.config.HttpConnectorConfigConstants.LOOKUP_SOURCE_HEADER_USE_RAW;
import static
org.apache.flink.connector.http.config.HttpConnectorConfigConstants.OIDC_AUTH_TOKEN_ENDPOINT_URL;
import static
org.apache.flink.connector.http.config.HttpConnectorConfigConstants.OIDC_AUTH_TOKEN_EXPIRY_REDUCTION;
@@ -137,6 +139,12 @@ public class HttpLookupConnectorOptions {
"Continue job on error. "
+ "This includes unsuccessful HTTP status
codes and client side Exceptions, such as Connection Refused.");
+ public static final ConfigOption<String> LOGGING_LEVEL_FOR_HTTP =
+ ConfigOptions.key(HTTP_LOGGING_LEVEL)
+ .stringType()
+ .defaultValue(String.valueOf(HttpLoggingLevelType.MIN))
+ .withDescription("Logging levels");
+
public static final ConfigOption<String> SOURCE_LOOKUP_PROXY_HOST =
ConfigOptions.key(SOURCE_PROXY_HOST)
.stringType()
diff --git
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/HttpLookupTableSourceFactory.java
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/HttpLookupTableSourceFactory.java
index c5d0d3c..934c570 100644
---
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/HttpLookupTableSourceFactory.java
+++
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/HttpLookupTableSourceFactory.java
@@ -20,6 +20,7 @@ package org.apache.flink.connector.http.table.lookup;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connector.http.HttpLoggingLevelType;
import org.apache.flink.connector.http.HttpPostRequestCallbackFactory;
import org.apache.flink.connector.http.config.HttpConnectorConfigConstants;
import org.apache.flink.connector.http.utils.ConfigUtils;
@@ -47,6 +48,7 @@ import java.util.function.Predicate;
import java.util.stream.Collectors;
import static
org.apache.flink.connector.http.table.lookup.HttpLookupConnectorOptions.ASYNC_POLLING;
+import static
org.apache.flink.connector.http.table.lookup.HttpLookupConnectorOptions.LOGGING_LEVEL_FOR_HTTP;
import static
org.apache.flink.connector.http.table.lookup.HttpLookupConnectorOptions.LOOKUP_METHOD;
import static
org.apache.flink.connector.http.table.lookup.HttpLookupConnectorOptions.LOOKUP_REQUEST_FORMAT;
import static
org.apache.flink.connector.http.table.lookup.HttpLookupConnectorOptions.REQUEST_CALLBACK_IDENTIFIER;
@@ -98,6 +100,7 @@ public class HttpLookupTableSourceFactory implements
DynamicTableSourceFactory {
helper.validateExcept(
// properties coming from
org.apache.flink.table.api.config.ExecutionConfigOptions
"table.",
+ "lookup-request.",
HttpConnectorConfigConstants.FLINK_CONNECTOR_HTTP,
LOOKUP_REQUEST_FORMAT.key());
validateHttpLookupSourceOptions(readable);
@@ -139,6 +142,22 @@ public class HttpLookupTableSourceFactory implements
DynamicTableSourceFactory {
+ " is configured.");
}
});
+ tableOptions
+ .getOptional(LOGGING_LEVEL_FOR_HTTP)
+ .ifPresent(
+ value -> {
+ try {
+ HttpLoggingLevelType.valueOf(value);
+ } catch (IllegalArgumentException e) {
+ throw new IllegalArgumentException(
+ "Invalid value for config option "
+ + LOGGING_LEVEL_FOR_HTTP.key()
+ + ": '"
+ + value
+ + "'. Valid values are: MIN,
REQ_RESP, MAX.",
+ e);
+ }
+ });
}
@Override
diff --git
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/JavaNetHttpPollingClient.java
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/JavaNetHttpPollingClient.java
index 53be575..ef9b1ed 100644
---
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/JavaNetHttpPollingClient.java
+++
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/JavaNetHttpPollingClient.java
@@ -20,6 +20,7 @@ package org.apache.flink.connector.http.table.lookup;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connector.http.HttpLogger;
import org.apache.flink.connector.http.HttpPostRequestCallback;
import org.apache.flink.connector.http.HttpStatusCodeValidationFailedException;
import org.apache.flink.connector.http.clients.PollingClient;
@@ -81,6 +82,7 @@ public class JavaNetHttpPollingClient implements
PollingClient {
private final HttpLookupConfig options;
private final Set<Integer> ignoredErrorCodes;
private final boolean continueOnError;
+ private final HttpLogger httpLogger;
public JavaNetHttpPollingClient(
HttpClient httpClient,
@@ -111,6 +113,7 @@ public class JavaNetHttpPollingClient implements
PollingClient {
.retryConfig(RetryConfigProvider.create(config))
.responseChecker(new HttpResponseChecker(successCodes,
errorCodes))
.build();
+ this.httpLogger = HttpLogger.getHttpLogger(options.getProperties());
}
public void open(FunctionContext context) {
@@ -141,11 +144,15 @@ public class JavaNetHttpPollingClient implements
PollingClient {
HttpResponse<String> response = null;
HttpRowDataWrapper httpRowDataWrapper = null;
try {
+ httpLogger.logRequest(request.getHttpRequest());
response =
httpClient.send(
() -> updateHttpRequestIfRequired(request,
oidcProcessor),
BodyHandlers.ofString());
+ httpLogger.logResponse(response);
} catch (HttpStatusCodeValidationFailedException e) {
+ // log if we fail for status code reasons.
+ httpLogger.logResponse((HttpResponse<String>) e.getResponse());
// Case 1 http non successful response
if (!this.continueOnError) {
throw e;
@@ -154,13 +161,15 @@ public class JavaNetHttpPollingClient implements
PollingClient {
response = (HttpResponse<String>) e.getResponse();
httpRowDataWrapper = processHttpResponse(response, request, true);
} catch (Exception e) {
+ httpLogger.logExceptionResponse(request, e);
// Case 2 Exception occurred
if (!this.continueOnError) {
throw e;
}
String errMessage = e.getMessage();
// some exceptions do not have messages including the
java.net.ConnectException we can
- // get here if the connection is bad.
+ // get here if
+ // the connection is bad.
if (errMessage == null) {
errMessage = e.toString();
}
@@ -243,10 +252,7 @@ public class JavaNetHttpPollingClient implements
PollingClient {
var responseBody = response.body();
- log.debug(
- "Received status code [{}] for RestTableSource request with
Server response body [{}] ",
- response.statusCode(),
- responseBody);
+ log.debug("Received status code [{}] for RestTableSource request",
response.statusCode());
if (this.isSuccessWithNoData(isError, responseBody, response)) {
return HttpRowDataWrapper.builder()
diff --git
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/RequestFactoryBase.java
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/RequestFactoryBase.java
index 52638f4..6f49638 100644
---
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/RequestFactoryBase.java
+++
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/RequestFactoryBase.java
@@ -34,7 +34,6 @@ import java.net.http.HttpRequest.Builder;
import java.time.Duration;
import java.util.Arrays;
import java.util.Map;
-import java.util.stream.Collectors;
/** Base class for {@link HttpRequest} factories. */
@Slf4j
@@ -76,11 +75,6 @@ public abstract class RequestFactoryBase implements
HttpRequestFactory {
this.headersAndValues =
HttpHeaderUtils.toHeaderAndValueArray(headerMap);
- log.debug(
- "RequestFactoryBase headersAndValues: "
- + Arrays.stream(headersAndValues)
- .map(Object::toString)
- .collect(Collectors.joining(",")));
this.httpRequestTimeOutSeconds =
Integer.parseInt(
options.getProperties()
diff --git
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/Slf4JHttpLookupPostRequestCallback.java
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/Slf4JHttpLookupPostRequestCallback.java
index 3e66d9c..8bb87d4 100644
---
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/Slf4JHttpLookupPostRequestCallback.java
+++
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/Slf4JHttpLookupPostRequestCallback.java
@@ -18,7 +18,6 @@
package org.apache.flink.connector.http.table.lookup;
import org.apache.flink.connector.http.HttpPostRequestCallback;
-import org.apache.flink.connector.http.utils.ConfigUtils;
import lombok.extern.slf4j.Slf4j;
@@ -64,21 +63,18 @@ public class Slf4JHttpLookupPostRequestCallback
log.info(
"Got response for a request.\n Request:\n URL: {}\n
"
- + "Method: {}\n Headers: {}\n Params/Body:
{}\nResponse: null",
+ + "Method: {}\n Params/Body: {}\nResponse: null",
httpRequest.uri().toString(),
httpRequest.method(),
- headers,
requestEntry.getLookupQueryInfo());
} else {
log.info(
"Got response for a request.\n Request:\n URL: {}\n
"
- + "Method: {}\n Headers: {}\n Params/Body:
{}\nResponse: {}\n Body: {}",
+ + "Method: {}\n Params/Body: {}\nResponse status
code: {}\n",
httpRequest.uri().toString(),
httpRequest.method(),
- headers,
requestEntry.getLookupQueryInfo(),
- response,
-
response.body().replaceAll(ConfigUtils.UNIVERSAL_NEW_LINE_REGEXP, ""));
+ response.statusCode());
}
}
}
diff --git
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/sink/Slf4jHttpPostRequestCallback.java
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/sink/Slf4jHttpPostRequestCallback.java
index feaf205..29ad3a6 100644
---
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/sink/Slf4jHttpPostRequestCallback.java
+++
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/sink/Slf4jHttpPostRequestCallback.java
@@ -19,14 +19,11 @@ package org.apache.flink.connector.http.table.sink;
import org.apache.flink.connector.http.HttpPostRequestCallback;
import org.apache.flink.connector.http.sink.httpclient.HttpRequest;
-import org.apache.flink.connector.http.utils.ConfigUtils;
import lombok.extern.slf4j.Slf4j;
import java.net.http.HttpResponse;
-import java.nio.charset.StandardCharsets;
import java.util.Map;
-import java.util.stream.Collectors;
/**
* A {@link HttpPostRequestCallback} that logs pairs of request and response
as <i>INFO</i> level
@@ -45,25 +42,17 @@ public class Slf4jHttpPostRequestCallback implements
HttpPostRequestCallback<Htt
String endpointUrl,
Map<String, String> headerMap) {
- String requestBody =
- requestEntry.getElements().stream()
- .map(element -> new String(element,
StandardCharsets.UTF_8))
- .collect(Collectors.joining());
-
if (response == null) {
log.info(
"Got response for a request.\n Request:\n "
- + "Method: {}\n Body: {}\n Response: null",
- requestEntry.getMethod(),
- requestBody);
+ + "Method: {}\n Response: null",
+ requestEntry.getMethod());
} else {
log.info(
"Got response for a request.\n Request:\n "
- + "Method: {}\n Body: {}\n Response: {}\n
Body: {}",
+ + "Method: {}\n Response status code: {}\n ",
requestEntry.method,
- requestBody,
- response,
-
response.body().replaceAll(ConfigUtils.UNIVERSAL_NEW_LINE_REGEXP, ""));
+ response.statusCode());
}
}
}
diff --git
a/flink-connector-http/src/test/java/org/apache/flink/connector/http/HttpLoggerRequestTest.java
b/flink-connector-http/src/test/java/org/apache/flink/connector/http/HttpLoggerRequestTest.java
new file mode 100644
index 0000000..ec35471
--- /dev/null
+++
b/flink-connector-http/src/test/java/org/apache/flink/connector/http/HttpLoggerRequestTest.java
@@ -0,0 +1,213 @@
+package org.apache.flink.connector.http;
+
+import org.apache.flink.connector.http.config.HttpConnectorConfigConstants;
+
+import lombok.Data;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.http.HttpRequest;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test class for logging HTTP requests. */
+public class HttpLoggerRequestTest {
+
+ @ParameterizedTest
+ @MethodSource("configProvider")
+ void testCreateStringForRequest(TestSpec testSpec) throws
URISyntaxException {
+
+ Properties properties = new Properties();
+ if (testSpec.httpLoggingLevelType != null) {
+ properties.put(
+ HttpConnectorConfigConstants.HTTP_LOGGING_LEVEL,
+ testSpec.getHttpLoggingLevelType().name());
+ }
+
+ HttpLogger httpLogger = HttpLogger.getHttpLogger(properties);
+ URI uri = new URI("http://aaa");
+ HttpRequest.Builder httpRequestBuilder =
HttpRequest.newBuilder().uri(uri);
+ if (testSpec.isHasHeaders()) {
+ httpRequestBuilder.headers("bbb", "ccc", "bbb", "ddd", "eee",
"fff");
+ }
+ if (testSpec.getMethod().equals("POST")) {
+ if (testSpec.isHasBody()) {
+ httpRequestBuilder.method("POST",
HttpRequest.BodyPublishers.ofString("my body"));
+ } else {
+ httpRequestBuilder.method("POST",
HttpRequest.BodyPublishers.noBody());
+ }
+ }
+
assertThat(httpLogger.createStringForRequest(httpRequestBuilder.build()))
+ .isEqualTo(testSpec.getExpectedOutput());
+ }
+
+ @Data
+ static class TestSpec {
+ final HttpLoggingLevelType httpLoggingLevelType;
+ final String method;
+ final boolean hasBody;
+ final boolean hasHeaders;
+ final String expectedOutput;
+ }
+
+ static Collection<TestSpec> configProvider() {
+ return List.of(
+ // GET no headers
+ new TestSpec(
+ null,
+ "GET",
+ false,
+ false,
+ "HTTP GET Request: URL: http://aaa, Headers: None"),
+ new TestSpec(
+ HttpLoggingLevelType.MIN,
+ "GET",
+ false,
+ false,
+ "HTTP GET Request: URL: http://aaa, Headers: None"),
+ new TestSpec(
+ HttpLoggingLevelType.REQ_RESP,
+ "GET",
+ false,
+ false,
+ "HTTP GET Request: URL: http://aaa, Headers: None"),
+ new TestSpec(
+ HttpLoggingLevelType.MAX,
+ "GET",
+ false,
+ false,
+ "HTTP GET Request: URL: http://aaa, Headers: None"),
+ // GET with headers
+ new TestSpec(
+ null,
+ "GET",
+ false,
+ true,
+ "HTTP GET Request: URL: http://aaa, Headers: ***"),
+ new TestSpec(
+ HttpLoggingLevelType.MIN,
+ "GET",
+ false,
+ true,
+ "HTTP GET Request: URL: http://aaa, Headers: ***"),
+ new TestSpec(
+ HttpLoggingLevelType.REQ_RESP,
+ "GET",
+ false,
+ true,
+ "HTTP GET Request: URL: http://aaa, Headers: ***"),
+ new TestSpec(
+ HttpLoggingLevelType.MAX,
+ "GET",
+ false,
+ true,
+ "HTTP GET Request: URL: http://aaa, Headers:
bbb:[ccc;ddd];eee:[fff]"),
+
+ // POST no headers
+ new TestSpec(
+ null,
+ "POST",
+ false,
+ false,
+ "HTTP POST Request: URL: http://aaa, Headers: None"),
+ new TestSpec(
+ HttpLoggingLevelType.MIN,
+ "POST",
+ false,
+ false,
+ "HTTP POST Request: URL: http://aaa, Headers: None"),
+ new TestSpec(
+ HttpLoggingLevelType.REQ_RESP,
+ "POST",
+ false,
+ false,
+ "HTTP POST Request: URL: http://aaa, Headers: None"),
+ new TestSpec(
+ HttpLoggingLevelType.MAX,
+ "POST",
+ false,
+ false,
+ "HTTP POST Request: URL: http://aaa, Headers: None"),
+ // POST with headers
+ new TestSpec(
+ null,
+ "POST",
+ false,
+ true,
+ "HTTP POST Request: URL: http://aaa, Headers: ***"),
+ new TestSpec(
+ HttpLoggingLevelType.MIN,
+ "POST",
+ false,
+ true,
+ "HTTP POST Request: URL: http://aaa, Headers: ***"),
+ new TestSpec(
+ HttpLoggingLevelType.REQ_RESP,
+ "POST",
+ false,
+ true,
+ "HTTP POST Request: URL: http://aaa, Headers: ***"),
+ new TestSpec(
+ HttpLoggingLevelType.MAX,
+ "POST",
+ false,
+ true,
+ "HTTP POST Request: URL: http://aaa, Headers:
bbb:[ccc;ddd];eee:[fff]"),
+
+ // POST no headers with body
+ new TestSpec(
+ null,
+ "POST",
+ true,
+ false,
+ "HTTP POST Request: URL: http://aaa, Headers: None"),
+ new TestSpec(
+ HttpLoggingLevelType.MIN,
+ "POST",
+ true,
+ false,
+ "HTTP POST Request: URL: http://aaa, Headers: None"),
+ new TestSpec(
+ HttpLoggingLevelType.REQ_RESP,
+ "POST",
+ true,
+ false,
+ "HTTP POST Request: URL: http://aaa, Headers: None"),
+ new TestSpec(
+ HttpLoggingLevelType.MAX,
+ "POST",
+ true,
+ false,
+ "HTTP POST Request: URL: http://aaa, Headers: None"),
+ // POST with headers with body
+ new TestSpec(
+ null,
+ "POST",
+ true,
+ true,
+ "HTTP POST Request: URL: http://aaa, Headers: ***"),
+ new TestSpec(
+ HttpLoggingLevelType.MIN,
+ "POST",
+ true,
+ true,
+ "HTTP POST Request: URL: http://aaa, Headers: ***"),
+ new TestSpec(
+ HttpLoggingLevelType.REQ_RESP,
+ "POST",
+ true,
+ true,
+ "HTTP POST Request: URL: http://aaa, Headers: ***"),
+ new TestSpec(
+ HttpLoggingLevelType.MAX,
+ "POST",
+ true,
+ true,
+ "HTTP POST Request: URL: http://aaa, Headers:
bbb:[ccc;ddd];eee:[fff]"));
+ }
+}
diff --git
a/flink-connector-http/src/test/java/org/apache/flink/connector/http/HttpLoggerResponseTest.java
b/flink-connector-http/src/test/java/org/apache/flink/connector/http/HttpLoggerResponseTest.java
new file mode 100644
index 0000000..2cf18c6
--- /dev/null
+++
b/flink-connector-http/src/test/java/org/apache/flink/connector/http/HttpLoggerResponseTest.java
@@ -0,0 +1,362 @@
+package org.apache.flink.connector.http;
+
+import org.apache.flink.connector.http.config.HttpConnectorConfigConstants;
+
+import lombok.Data;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import javax.net.ssl.SSLSession;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.http.HttpClient;
+import java.net.http.HttpHeaders;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test class for logging HTTP responses. */
+public class HttpLoggerResponseTest {
+ @ParameterizedTest
+ @MethodSource("configProvider")
+ void testCreateStringForResponse(TestSpec testSpec) throws
URISyntaxException {
+
+ Properties properties = new Properties();
+ if (testSpec.httpLoggingLevelType != null) {
+ properties.put(
+ HttpConnectorConfigConstants.HTTP_LOGGING_LEVEL,
+ testSpec.getHttpLoggingLevelType().name());
+ }
+
+ HttpLogger httpLogger = HttpLogger.getHttpLogger(properties);
+ URI uri = new URI("http://aaa");
+ MockHttpResponse response = new MockHttpResponse();
+ response.setStatusCode(testSpec.getStatusCode());
+ if (testSpec.method.equals("GET")) {
+
response.setRequest(HttpRequest.newBuilder().GET().uri(uri).build());
+ } else {
+ // dummy request so we can populate the method in the log
+ response.setRequest(
+ HttpRequest.newBuilder()
+ .POST(HttpRequest.BodyPublishers.noBody())
+ .uri(uri)
+ .build());
+ }
+
+ if (testSpec.isHasHeaders()) {
+ // "bbb","ccc","bbb","ddd","eee","fff"
+ Map<String, List<String>> headersMap = new HashMap<>();
+ headersMap.put("bbb", List.of("ccc", "ddd"));
+ headersMap.put("eee", List.of("fff"));
+
+ HttpHeaders headers = HttpHeaders.of(headersMap, (name, value) ->
true);
+ response.setHeaders(headers);
+ }
+
+ if (testSpec.isHasBody()) {
+ response.setBody("my body");
+ }
+
+ assertThat(httpLogger.createStringForResponse(response))
+ .isEqualTo(testSpec.getExpectedOutput());
+ }
+
+ @Data
+ static class TestSpec {
+ final String method;
+ final int statusCode;
+ final HttpLoggingLevelType httpLoggingLevelType;
+ final boolean hasBody;
+ final boolean hasHeaders;
+ final String expectedOutput;
+ }
+
+ static Collection<TestSpec> configProvider() {
+ return Stream.concat(getTestSpecs("GET", 200).stream(),
getTestSpecs("POST", 500).stream())
+ .collect(Collectors.toList());
+ }
+
+ private static List<TestSpec> getTestSpecs(String method, int statusCode) {
+ return List.of(
+ // no headers no body
+ new TestSpec(
+ method,
+ statusCode,
+ null,
+ false,
+ false,
+ "HTTP "
+ + method
+ + " Response: URL: http://aaa, "
+ + "Response Headers: None, status code: "
+ + statusCode
+ + ", Response Body: None"),
+ new TestSpec(
+ method,
+ statusCode,
+ HttpLoggingLevelType.MIN,
+ false,
+ false,
+ "HTTP "
+ + method
+ + " Response: URL: http://aaa, "
+ + "Response Headers: None, status code: "
+ + statusCode
+ + ", Response Body: None"),
+ new TestSpec(
+ method,
+ statusCode,
+ HttpLoggingLevelType.REQ_RESP,
+ false,
+ false,
+ "HTTP "
+ + method
+ + " Response: URL: http://aaa, "
+ + "Response Headers: None, status code: "
+ + statusCode
+ + ", Response Body: None"),
+ new TestSpec(
+ method,
+ statusCode,
+ HttpLoggingLevelType.MAX,
+ false,
+ false,
+ "HTTP "
+ + method
+ + " Response: URL: http://aaa, "
+ + "Response Headers: None, status code: "
+ + statusCode
+ + ", Response Body: None"),
+ // with headers
+
+ new TestSpec(
+ method,
+ statusCode,
+ null,
+ false,
+ true,
+ "HTTP "
+ + method
+ + " Response: URL: http://aaa, "
+ + "Response Headers: ***, status code: "
+ + statusCode
+ + ", Response Body: None"),
+ new TestSpec(
+ method,
+ statusCode,
+ HttpLoggingLevelType.MIN,
+ false,
+ true,
+ "HTTP "
+ + method
+ + " Response: URL: http://aaa, "
+ + "Response Headers: ***, status code: "
+ + statusCode
+ + ", Response Body: None"),
+ new TestSpec(
+ method,
+ statusCode,
+ HttpLoggingLevelType.REQ_RESP,
+ false,
+ true,
+ "HTTP "
+ + method
+ + " Response: URL: http://aaa, "
+ + "Response Headers: ***, status code: "
+ + statusCode
+ + ", Response Body: None"),
+ new TestSpec(
+ method,
+ statusCode,
+ HttpLoggingLevelType.MAX,
+ false,
+ true,
+ "HTTP "
+ + method
+ + " Response: URL: http://aaa, "
+ + "Response Headers: bbb:[ccc;ddd];eee:[fff],
status code: "
+ + statusCode
+ + ", "
+ + "Response Body: None"),
+
+ // no headers with body
+ new TestSpec(
+ method,
+ statusCode,
+ null,
+ true,
+ false,
+ "HTTP "
+ + method
+ + " Response: URL: http://aaa, "
+ + "Response Headers: None, status code: "
+ + statusCode
+ + ", Response Body: ***"),
+ new TestSpec(
+ method,
+ statusCode,
+ HttpLoggingLevelType.MIN,
+ true,
+ false,
+ "HTTP "
+ + method
+ + " Response: URL: http://aaa, "
+ + "Response Headers: None, status code: "
+ + statusCode
+ + ", Response Body: ***"),
+ new TestSpec(
+ method,
+ statusCode,
+ HttpLoggingLevelType.REQ_RESP,
+ true,
+ false,
+ "HTTP "
+ + method
+ + " Response: URL: http://aaa, "
+ + "Response Headers: None, status code: "
+ + statusCode
+ + ", Response Body: my body"),
+ new TestSpec(
+ method,
+ statusCode,
+ HttpLoggingLevelType.MAX,
+ true,
+ false,
+ "HTTP "
+ + method
+ + " Response: URL: http://aaa, "
+ + "Response Headers: None, status code: "
+ + statusCode
+ + ", Response Body: my body"),
+
+ // headers with body
+ new TestSpec(
+ method,
+ statusCode,
+ null,
+ true,
+ true,
+ "HTTP "
+ + method
+ + " Response: URL: http://aaa, Response
Headers: ***"
+ + ", status code: "
+ + statusCode
+ + ", Response Body: ***"),
+ new TestSpec(
+ method,
+ statusCode,
+ HttpLoggingLevelType.MIN,
+ true,
+ true,
+ "HTTP "
+ + method
+ + " Response: URL: http://aaa, Response
Headers: ***"
+ + ", status code: "
+ + statusCode
+ + ", Response Body: ***"),
+ new TestSpec(
+ method,
+ statusCode,
+ HttpLoggingLevelType.REQ_RESP,
+ true,
+ true,
+ "HTTP "
+ + method
+ + " Response: URL: http://aaa, Response
Headers: ***"
+ + ", status code: "
+ + statusCode
+ + ", Response Body: my body"),
+ new TestSpec(
+ method,
+ statusCode,
+ HttpLoggingLevelType.MAX,
+ true,
+ true,
+ "HTTP "
+ + method
+ + " Response: URL: http://aaa, "
+ + "Response Headers: bbb:[ccc;ddd];eee:[fff]"
+ + ", status code: "
+ + statusCode
+ + ", Response Body: my body"));
+ }
+
+ private class MockHttpResponse implements HttpResponse {
+ private int statusCode = 0;
+ private HttpRequest request = null;
+ private HttpHeaders headers = null;
+ private String body = null;
+
+ @Override
+ public int statusCode() {
+ return this.statusCode;
+ }
+
+ @Override
+ public HttpRequest request() {
+ return this.request;
+ }
+
+ @Override
+ public Optional<HttpResponse> previousResponse() {
+ return Optional.empty();
+ }
+
+ @Override
+ public HttpHeaders headers() {
+ return this.headers;
+ }
+
+ @Override
+ public Object body() {
+ return this.body;
+ }
+
+ @Override
+ public Optional<SSLSession> sslSession() {
+ return Optional.empty();
+ }
+
+ @Override
+ public URI uri() {
+ URI uri;
+ try {
+ uri = new URI("http://aaa");
+ } catch (URISyntaxException e) {
+ throw new RuntimeException(e);
+ }
+ return uri;
+ }
+
+ @Override
+ public HttpClient.Version version() {
+ return null;
+ }
+
+ public void setStatusCode(int statusCode) {
+ this.statusCode = statusCode;
+ }
+
+ public void setRequest(HttpRequest request) {
+ this.request = request;
+ }
+
+ public void setHeaders(HttpHeaders headers) {
+ this.headers = headers;
+ }
+
+ public void setBody(String body) {
+ this.body = body;
+ }
+ }
+}
diff --git a/flink-connector-http/src/test/resources/simpleLogger.properties
b/flink-connector-http/src/test/resources/simpleLogger.properties
new file mode 100644
index 0000000..32961f7
--- /dev/null
+++ b/flink-connector-http/src/test/resources/simpleLogger.properties
@@ -0,0 +1,2 @@
+org.slf4j.simpleLogger.defaultLogLevel=INFO
+org.apache.flink.connector.http.HttpLogger=DEBUG