This is an automated email from the ASF dual-hosted git repository.
fcsaky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-http.git
The following commit(s) were added to refs/heads/main by this push:
new 7eacff0 [FLINK-38864] Introduce `IGNORE_STATUS_CODE` as HTTP
completion state
7eacff0 is described below
commit 7eacff0e994548be6c6f213933966a8c80714de9
Author: David Radley <[email protected]>
AuthorDate: Tue Jan 13 15:21:19 2026 +0000
[FLINK-38864] Introduce `IGNORE_STATUS_CODE` as HTTP completion state
---
docs/content.zh/docs/connectors/table/http.md | 12 +-
docs/content/docs/connectors/table/http.md | 8 ++
.../http/table/lookup/HttpCompletionState.java | 3 +-
.../http/table/lookup/HttpTableLookupFunction.java | 3 -
.../table/lookup/JavaNetHttpPollingClient.java | 21 +++-
.../JavaNetHttpPollingClientConnectionTest.java | 121 +++++++++++++++++++++
6 files changed, 161 insertions(+), 7 deletions(-)
diff --git a/docs/content.zh/docs/connectors/table/http.md
b/docs/content.zh/docs/connectors/table/http.md
index b49c915..3fad839 100644
--- a/docs/content.zh/docs/connectors/table/http.md
+++ b/docs/content.zh/docs/connectors/table/http.md
@@ -54,7 +54,7 @@ The HTTP source connector supports [Lookup
Joins](https://nightlies.apache.org/f
* [Timeouts](#timeouts)
* [Source table HTTP status code](#source-table-http-status-code)
* [Retries and handling errors (Lookup
source)](#retries-and-handling-errors-lookup-source)
- * [Retry strategy](#retry-strategy)
+ * [Retry strategy](#retry-strategy)
* [Lookup multiple results](#lookup-multiple-results)
* [Working with HTTP sink tables](#working-with-http-sink-tables)
* [HTTP Sink](#http-sink)
@@ -71,7 +71,7 @@ The HTTP source connector supports [Lookup
Joins](https://nightlies.apache.org/f
* [Basic Authentication](#basic-authentication)
* [OIDC Bearer Authentication](#oidc-bearer-authentication)
* [Logging the HTTP content](#logging-the-http-content)
- * [Restrictions at this time](#restrictions-at-this-time)
+ * [Restrictions at this time](#restrictions-at-this-time)
<!-- TOC -->
## Dependencies
@@ -503,6 +503,10 @@ this means that these columns will be null for nullable
columns and hold a defau
When using `http.source.lookup.continue-on-error` as true, consider adding
extra metadata columns that will surface information about failures into your
stream.
+Note that if metadata columns are specified and the status code is ignored,
then a row containing metadata columns will be produced. If
+the status code is ignored and there are no metadata columns defined, then no
row will be emitted; this ensures that the expected
+inner join behaviour still occurs.
+
Metadata columns can be specified and hold http information. They are optional
read-only columns that must be declared VIRTUAL to exclude them during an
INSERT INTO operation.
| Key | Data Type | Description
|
@@ -520,10 +524,14 @@ Metadata columns can be specified and hold http
information. They are optional r
| HTTP_ERROR_STATUS | HTTP error status code |
| EXCEPTION | An Exception occurred |
| UNABLE_TO_DESERIALIZE_RESPONSE | Unable to deserialize HTTP response |
+| IGNORE_STATUS_CODE | Status code is ignored |
If the `error-string` metadata column is defined on the table and the call
succeeds then it will have a null value.
When the HTTP response cannot be deserialized, then the
`http-completion-state` will be `UNABLE_TO_DESERIALIZE_RESPONSE`
and the `error-string` will be the response body.
+When the HTTP status code is in the
`http.source.lookup.ignored-response-codes`, then the `http-completion-state`
will
+be `IGNORE_STATUS_CODE` and no data is returned; any metadata columns contain
information about the API call that
+occurred.
When a HTTP lookup call fails and populates the metadata columns with the
error information, the expected enrichment columns from the HTTP call
are not populated, this means that they will be null for nullable columns and
hold a default value for the type for non-nullable columns.
diff --git a/docs/content/docs/connectors/table/http.md
b/docs/content/docs/connectors/table/http.md
index 0738dde..543a9e5 100644
--- a/docs/content/docs/connectors/table/http.md
+++ b/docs/content/docs/connectors/table/http.md
@@ -503,6 +503,10 @@ this means that these columns will be null for nullable
columns and hold a defau
When using `http.source.lookup.continue-on-error` as true, consider adding
extra metadata columns that will surface information about failures into your
stream.
+Note that if metadata columns are specified and the status code is ignored,
then a row containing metadata columns will be produced. If
+the status code is ignored and there are no metadata columns defined, then no
row will be emitted; this ensures that the expected
+inner join behaviour still occurs.
+
Metadata columns can be specified and hold http information. They are optional
read-only columns that must be declared VIRTUAL to exclude them during an
INSERT INTO operation.
| Key | Data Type | Description
|
@@ -520,10 +524,14 @@ Metadata columns can be specified and hold http
information. They are optional r
| HTTP_ERROR_STATUS | HTTP error status code |
| EXCEPTION | An Exception occurred |
| UNABLE_TO_DESERIALIZE_RESPONSE | Unable to deserialize HTTP response |
+| IGNORE_STATUS_CODE | Status code is ignored |
If the `error-string` metadata column is defined on the table and the call
succeeds then it will have a null value.
When the HTTP response cannot be deserialized, then the
`http-completion-state` will be `UNABLE_TO_DESERIALIZE_RESPONSE`
and the `error-string` will be the response body.
+When the HTTP status code is in the
`http.source.lookup.ignored-response-codes`, then the `http-completion-state`
will
+be `IGNORE_STATUS_CODE` and no data is returned; any metadata columns contain
information about the API call that
+occurred.
When a HTTP lookup call fails and populates the metadata columns with the
error information, the expected enrichment columns from the HTTP call
are not populated, this means that they will be null for nullable columns and
hold a default value for the type for non-nullable columns.
diff --git
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/HttpCompletionState.java
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/HttpCompletionState.java
index 6d3a802..a6ac1d0 100644
---
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/HttpCompletionState.java
+++
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/HttpCompletionState.java
@@ -22,5 +22,6 @@ public enum HttpCompletionState {
HTTP_ERROR_STATUS,
EXCEPTION,
SUCCESS,
- UNABLE_TO_DESERIALIZE_RESPONSE
+ UNABLE_TO_DESERIALIZE_RESPONSE,
+ IGNORE_STATUS_CODE
}
diff --git
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/HttpTableLookupFunction.java
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/HttpTableLookupFunction.java
index 3b73544..5e42f0e 100644
---
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/HttpTableLookupFunction.java
+++
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/HttpTableLookupFunction.java
@@ -104,9 +104,6 @@ public class HttpTableLookupFunction extends LookupFunction
{
int physicalArity = -1;
GenericRowData producedRow = null;
- if (httpRowDataWrapper.shouldIgnore()) {
- return Collections.emptyList();
- }
// grab the actual data if there is any from the response and populate
the producedRow with
// it
if (!httpCollector.isEmpty()) {
diff --git
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/JavaNetHttpPollingClient.java
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/JavaNetHttpPollingClient.java
index 84f4192..64e9adb 100644
---
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/JavaNetHttpPollingClient.java
+++
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/JavaNetHttpPollingClient.java
@@ -124,6 +124,13 @@ public class JavaNetHttpPollingClient implements
PollingClient {
@Override
public HttpRowDataWrapper pull(RowData lookupRow) {
if (lookupRow == null) {
+ /*
+ * We are not sure if the following code can be driven. Tested
with an equality of booleans (which should
+ * be a filter), but with the latest flink this is rejected by the
planner.
+ *
+ * If there is a way for lookupRow to be null here, then the
results will not populate any metadata fields
+ * and we should add a new completion state to identify this
scenario.
+ */
return HttpRowDataWrapper.builder()
.data(Collections.emptyList())
.httpCompletionState(HttpCompletionState.SUCCESS)
@@ -254,7 +261,19 @@ public class JavaNetHttpPollingClient implements
PollingClient {
var responseBody = response.body();
log.debug("Received status code [{}] for RestTableSource request",
response.statusCode());
-
+ final boolean ignoreStatusCode = ignoreResponse(response);
+ if (!isError && (StringUtils.isNullOrWhitespaceOnly(responseBody) ||
ignoreStatusCode)) {
+ return HttpRowDataWrapper.builder()
+ .data(Collections.emptyList())
+ .httpCompletionState(HttpCompletionState.SUCCESS)
+ .httpHeadersMap(response.headers().map())
+ .httpStatusCode(response.statusCode())
+ .httpCompletionState(
+ ignoreStatusCode
+ ? HttpCompletionState.IGNORE_STATUS_CODE
+ : HttpCompletionState.SUCCESS)
+ .build();
+ }
if (this.isSuccessWithNoData(isError, responseBody, response)) {
return HttpRowDataWrapper.builder()
.data(Collections.emptyList())
diff --git
a/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/JavaNetHttpPollingClientConnectionTest.java
b/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/JavaNetHttpPollingClientConnectionTest.java
index 91c793e..3345cf7 100644
---
a/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/JavaNetHttpPollingClientConnectionTest.java
+++
b/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/JavaNetHttpPollingClientConnectionTest.java
@@ -539,4 +539,125 @@ class JavaNetHttpPollingClientConnectionTest {
.withBody(
readTestFile(SAMPLES_FOLDER +
"HttpResult.json"))));
}
+
+ @Test
+ void shouldSetIgnoreStatusCodeCompletionStateForIgnoredStatusCodes()
+ throws ConfigurationException {
+ // GIVEN - Configure client with ignored status codes (404, 503)
+ configuration.setString(SOURCE_LOOKUP_HTTP_IGNORED_RESPONSE_CODES,
"404,503");
+ configuration.setString(SOURCE_LOOKUP_HTTP_SUCCESS_CODES, "200");
+ configuration.setString(
+
HttpLookupConnectorOptions.SOURCE_LOOKUP_HTTP_RETRY_CODES.key(), "");
+
+ // Set up WireMock to return 404
+ this.stubMapping =
+ wireMockServer.stubFor(
+ get(urlEqualTo(ENDPOINT + "?id=1&uuid=2"))
+ .withHeader("Content-Type",
equalTo("application/json"))
+
.willReturn(aResponse().withStatus(404).withBody("Not Found")));
+
+ JavaNetHttpPollingClient pollingClient = setUpPollingClient();
+
+ // WHEN - Pull data with a lookup row
+ HttpRowDataWrapper result = pollingClient.pull(lookupRowData);
+
+ // THEN - Verify completion state is IGNORE_STATUS_CODE
+ assertThat(result.getHttpCompletionState())
+ .isEqualTo(HttpCompletionState.IGNORE_STATUS_CODE);
+ assertThat(result.getData()).isEmpty();
+ assertThat(result.getHttpStatusCode()).isEqualTo(404);
+ }
+
+ @Test
+ void shouldSetIgnoreStatusCodeForMultipleIgnoredCodes() throws
ConfigurationException {
+ // GIVEN - Configure client with multiple ignored status codes
+ configuration.setString(SOURCE_LOOKUP_HTTP_IGNORED_RESPONSE_CODES,
"404,503,429");
+ configuration.setString(SOURCE_LOOKUP_HTTP_SUCCESS_CODES, "200");
+ configuration.setString(
+
HttpLookupConnectorOptions.SOURCE_LOOKUP_HTTP_RETRY_CODES.key(), "");
+
+ // Set up WireMock to return 503
+ this.stubMapping =
+ wireMockServer.stubFor(
+ get(urlEqualTo(ENDPOINT + "?id=1&uuid=2"))
+ .withHeader("Content-Type",
equalTo("application/json"))
+ .willReturn(
+ aResponse()
+ .withStatus(503)
+ .withBody("Service
Unavailable")));
+
+ JavaNetHttpPollingClient pollingClient = setUpPollingClient();
+
+ // WHEN
+ HttpRowDataWrapper result = pollingClient.pull(lookupRowData);
+
+ // THEN - Verify 503 is also treated as ignored
+ assertThat(result.getHttpCompletionState())
+ .isEqualTo(HttpCompletionState.IGNORE_STATUS_CODE);
+ assertThat(result.getData()).isEmpty();
+ assertThat(result.getHttpStatusCode()).isEqualTo(503);
+ }
+
+ @Test
+ void shouldNotSetIgnoreStatusCodeForNonIgnoredCodes() throws
ConfigurationException {
+ // GIVEN - Configure client with ignored status codes (404, 503)
+ configuration.setString(SOURCE_LOOKUP_HTTP_IGNORED_RESPONSE_CODES,
"404,503");
+ configuration.setString(SOURCE_LOOKUP_HTTP_SUCCESS_CODES, "200");
+ configuration.setString(
+
HttpLookupConnectorOptions.SOURCE_LOOKUP_HTTP_RETRY_CODES.key(), "");
+
+ // Set up WireMock to return 200 (success, not ignored)
+ this.stubMapping = setUpServerStub(200);
+
+ JavaNetHttpPollingClient pollingClient = setUpPollingClient();
+
+ // WHEN
+ HttpRowDataWrapper result = pollingClient.pull(lookupRowData);
+
+ // THEN - Verify completion state is SUCCESS, not IGNORE_STATUS_CODE
+
assertThat(result.getHttpCompletionState()).isEqualTo(HttpCompletionState.SUCCESS);
+ assertThat(result.getData()).isNotEmpty();
+ assertThat(result.getHttpStatusCode()).isEqualTo(200);
+ }
+
+ @Test
+ void shouldReturnMetadataForIgnoredStatusCode() throws
ConfigurationException {
+ // GIVEN - Configure client with ignored status codes (404)
+ configuration.setString(SOURCE_LOOKUP_HTTP_IGNORED_RESPONSE_CODES,
"404");
+ configuration.setString(SOURCE_LOOKUP_HTTP_SUCCESS_CODES, "200");
+ configuration.setString(
+
HttpLookupConnectorOptions.SOURCE_LOOKUP_HTTP_RETRY_CODES.key(), "");
+
+ // Set up WireMock to return 404 with custom headers
+ this.stubMapping =
+ wireMockServer.stubFor(
+ get(urlEqualTo(ENDPOINT + "?id=1&uuid=2"))
+ .withHeader("Content-Type",
equalTo("application/json"))
+ .willReturn(
+ aResponse()
+ .withStatus(404)
+ .withBody("Not Found")
+ .withHeader("X-Request-Id",
"12345")
+ .withHeader("X-Custom-Header",
"custom-value")));
+
+ JavaNetHttpPollingClient pollingClient = setUpPollingClient();
+
+ // WHEN - Pull data with a lookup row
+ HttpRowDataWrapper result = pollingClient.pull(lookupRowData);
+
+ // THEN - Verify completion state is IGNORE_STATUS_CODE
+ assertThat(result.getHttpCompletionState())
+ .isEqualTo(HttpCompletionState.IGNORE_STATUS_CODE);
+ // Verify data is empty (the body of an ignored response is not returned as data)
+ assertThat(result.getData()).isEmpty();
+ // Verify metadata is present - status code
+ assertThat(result.getHttpStatusCode()).isEqualTo(404);
+ // Verify metadata is present - headers
+ assertThat(result.getHttpHeadersMap()).isNotNull();
+ assertThat(result.getHttpHeadersMap()).containsKey("X-Request-Id");
+
assertThat(result.getHttpHeadersMap().get("X-Request-Id")).containsExactly("12345");
+ assertThat(result.getHttpHeadersMap()).containsKey("X-Custom-Header");
+ assertThat(result.getHttpHeadersMap().get("X-Custom-Header"))
+ .containsExactly("custom-value");
+ }
}
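
The documentation added by this commit describes two rules: a status code listed in `http.source.lookup.ignored-response-codes` yields the `IGNORE_STATUS_CODE` completion state, and an ignored response produces a row only when metadata columns are defined. The following is a minimal, standalone sketch of those two rules, not the connector's implementation. The class `IgnoredStatusSketch`, its methods, and the string row representation are hypothetical, and the order in which ignored and success codes are checked is an assumption.

```java
import java.util.List;
import java.util.Set;

/** Illustrative only: none of these names are the connector's real API. */
class IgnoredStatusSketch {

    /** Same constant names as HttpCompletionState after this commit. */
    enum CompletionState {
        HTTP_ERROR_STATUS,
        EXCEPTION,
        SUCCESS,
        UNABLE_TO_DESERIALIZE_RESPONSE,
        IGNORE_STATUS_CODE
    }

    /**
     * Maps a status code to a completion state.
     * Assumption: ignored codes are checked before success codes.
     */
    static CompletionState classify(int statusCode, Set<Integer> ignored, Set<Integer> success) {
        if (ignored.contains(statusCode)) {
            return CompletionState.IGNORE_STATUS_CODE;
        }
        if (success.contains(statusCode)) {
            return CompletionState.SUCCESS;
        }
        return CompletionState.HTTP_ERROR_STATUS;
    }

    /**
     * Rows produced for an ignored response. With no metadata columns the
     * result is empty, so the join drops the lookup key; with metadata
     * columns a single metadata-only row is produced. Rows are modelled as
     * strings purely for illustration.
     */
    static List<String> rowsForIgnoredStatus(int statusCode, boolean hasMetadataColumns) {
        if (!hasMetadataColumns) {
            return List.of();
        }
        return List.of("status=" + statusCode + ", state=" + CompletionState.IGNORE_STATUS_CODE);
    }

    public static void main(String[] args) {
        Set<Integer> ignored = Set.of(404, 503);
        Set<Integer> success = Set.of(200);
        System.out.println(classify(404, ignored, success));
        System.out.println(rowsForIgnoredStatus(404, true));
        System.out.println(rowsForIgnoredStatus(404, false));
    }
}
```

Returning an empty list when no metadata columns exist keeps the inner join behaviour that the documentation describes, while the metadata-only row lets a job observe which lookups were ignored.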