This is an automated email from the ASF dual-hosted git repository.

fcsaky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-http.git


The following commit(s) were added to refs/heads/main by this push:
     new 7eacff0  [FLINK-38864] Introduce `IGNORE_STATUS_CODE` as HTTP completion state
7eacff0 is described below

commit 7eacff0e994548be6c6f213933966a8c80714de9
Author: David Radley <[email protected]>
AuthorDate: Tue Jan 13 15:21:19 2026 +0000

    [FLINK-38864] Introduce `IGNORE_STATUS_CODE` as HTTP completion state
---
 docs/content.zh/docs/connectors/table/http.md      |  12 +-
 docs/content/docs/connectors/table/http.md         |   8 ++
 .../http/table/lookup/HttpCompletionState.java     |   3 +-
 .../http/table/lookup/HttpTableLookupFunction.java |   3 -
 .../table/lookup/JavaNetHttpPollingClient.java     |  21 +++-
 .../JavaNetHttpPollingClientConnectionTest.java    | 121 +++++++++++++++++++++
 6 files changed, 161 insertions(+), 7 deletions(-)
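The documentation added in this commit describes what happens when a lookup hits a status code listed in `http.source.lookup.ignored-response-codes`:

- If metadata columns are declared, one row is emitted that carries only that metadata.
- If no metadata columns are declared, no row is emitted, so an inner lookup join drops the probe row as if nothing matched.

The Java sketch below models that rule in isolation, under stated assumptions. The class `IgnoredRowEmissionSketch`, the method `rowsForIgnoredStatus` and the map-based row representation are hypothetical and are not the connector's API. Only the `http-completion-state` key and the `IGNORE_STATUS_CODE` value come from the documentation.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of the documented row-emission rule for ignored HTTP status codes. */
public class IgnoredRowEmissionSketch {

    /**
     * Rows a lookup would emit for one key when the response status code is ignored.
     * A row is modelled as a column-name-to-value map; enrichment columns are left out,
     * standing in for the null or type-default values they would hold.
     */
    static List<Map<String, Object>> rowsForIgnoredStatus(boolean hasMetadataColumns) {
        if (!hasMetadataColumns) {
            // Nothing to report, so no row: an inner lookup join then drops the probe row.
            return Collections.emptyList();
        }
        // One metadata-only row; other metadata columns are omitted from this sketch.
        Map<String, Object> row = new HashMap<>();
        row.put("http-completion-state", "IGNORE_STATUS_CODE");
        return List.of(row);
    }

    public static void main(String[] args) {
        System.out.println(rowsForIgnoredStatus(false)); // []
        System.out.println(rowsForIgnoredStatus(true)); // [{http-completion-state=IGNORE_STATUS_CODE}]
    }
}
```

With a `LEFT JOIN` instead of an inner join, the probe row would survive either way. The distinction the documentation draws matters for inner joins.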

diff --git a/docs/content.zh/docs/connectors/table/http.md b/docs/content.zh/docs/connectors/table/http.md
index b49c915..3fad839 100644
--- a/docs/content.zh/docs/connectors/table/http.md
+++ b/docs/content.zh/docs/connectors/table/http.md
@@ -54,7 +54,7 @@ The HTTP source connector supports [Lookup Joins](https://nightlies.apache.org/f
     * [Timeouts](#timeouts)
     * [Source table HTTP status code](#source-table-http-status-code)
     * [Retries and handling errors (Lookup source)](#retries-and-handling-errors-lookup-source)
-        * [Retry strategy](#retry-strategy)
+      * [Retry strategy](#retry-strategy)
       * [Lookup multiple results](#lookup-multiple-results)
   * [Working with HTTP sink tables](#working-with-http-sink-tables)
     * [HTTP Sink](#http-sink)
@@ -71,7 +71,7 @@ The HTTP source connector supports [Lookup Joins](https://nightlies.apache.org/f
     * [Basic Authentication](#basic-authentication)
     * [OIDC Bearer Authentication](#oidc-bearer-authentication)
   * [Logging the HTTP content](#logging-the-http-content)
-      * [Restrictions at this time](#restrictions-at-this-time)
+    * [Restrictions at this time](#restrictions-at-this-time)
 <!-- TOC -->
 ## Dependencies
 
@@ -503,6 +503,10 @@ this means that these columns will be null for nullable columns and hold a defau
 
 When using `http.source.lookup.continue-on-error` as true, consider adding extra metadata columns that will surface information about failures into your stream.
 
+Note that if metadata columns are specified and the status code is ignored, then a row containing metadata columns will be produced. If
+the status code is ignored and there are no metadata columns defined, then no row will be emitted; this ensures that the expected
+inner join behaviour still occurs.
+
 Metadata columns can be specified and hold http information. They are optional read-only columns that must be declared VIRTUAL to exclude them during an INSERT INTO operation.
 
 | Key                   | Data Type                        | Description                            |
@@ -520,10 +524,14 @@ Metadata columns can be specified and hold http information. They are optional r
 | HTTP_ERROR_STATUS              | HTTP error status code              |
 | EXCEPTION                      | An Exception occurred               |
 | UNABLE_TO_DESERIALIZE_RESPONSE | Unable to deserialize HTTP response |
+| IGNORE_STATUS_CODE             | Status code is ignored              |
 
 If the `error-string` metadata column is defined on the table and the call succeeds then it will have a null value.
 When the HTTP response cannot be deserialized, then the `http-completion-state` will be `UNABLE_TO_DESERIALIZE_RESPONSE`
 and the `error-string` will be the response body.
+When the HTTP status code is in the `http.source.lookup.ignored-response-codes`, then the `http-completion-state` will
+be `IGNORE_STATUS_CODE`and no data is returned; any metadata columns contain information about the API call that
+occurred.
 
 When a HTTP lookup call fails and populates the metadata columns with the error information, the expected enrichment columns from the HTTP call
 are not populated, this means that they will be null for nullable columns and hold a default value for the type for non-nullable columns.
diff --git a/docs/content/docs/connectors/table/http.md b/docs/content/docs/connectors/table/http.md
index 0738dde..543a9e5 100644
--- a/docs/content/docs/connectors/table/http.md
+++ b/docs/content/docs/connectors/table/http.md
@@ -503,6 +503,10 @@ this means that these columns will be null for nullable columns and hold a defau
 
 When using `http.source.lookup.continue-on-error` as true, consider adding extra metadata columns that will surface information about failures into your stream.
 
+Note that if metadata columns are specified and the status code is ignored, then a row containing metadata columns will be produced. If
+the status code is ignored and there are no metadata columns defined, then no row will be emitted; this ensures that the expected
+inner join behaviour still occurs.
+
 Metadata columns can be specified and hold http information. They are optional read-only columns that must be declared VIRTUAL to exclude them during an INSERT INTO operation.
 
 | Key                   | Data Type                        | Description                            |
@@ -520,10 +524,14 @@ Metadata columns can be specified and hold http information. They are optional r
 | HTTP_ERROR_STATUS              | HTTP error status code              |
 | EXCEPTION                      | An Exception occurred               |
 | UNABLE_TO_DESERIALIZE_RESPONSE | Unable to deserialize HTTP response |
+| IGNORE_STATUS_CODE             | Status code is ignored              |
 
 If the `error-string` metadata column is defined on the table and the call succeeds then it will have a null value.
 When the HTTP response cannot be deserialized, then the `http-completion-state` will be `UNABLE_TO_DESERIALIZE_RESPONSE`
 and the `error-string` will be the response body.
+When the HTTP status code is in the `http.source.lookup.ignored-response-codes`, then the `http-completion-state` will
+be `IGNORE_STATUS_CODE`and no data is returned; any metadata columns contain information about the API call that
+occurred.
 
 When a HTTP lookup call fails and populates the metadata columns with the error information, the expected enrichment columns from the HTTP call
 are not populated, this means that they will be null for nullable columns and hold a default value for the type for non-nullable columns.
diff --git a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/HttpCompletionState.java b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/HttpCompletionState.java
index 6d3a802..a6ac1d0 100644
--- a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/HttpCompletionState.java
+++ b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/HttpCompletionState.java
@@ -22,5 +22,6 @@ public enum HttpCompletionState {
     HTTP_ERROR_STATUS,
     EXCEPTION,
     SUCCESS,
-    UNABLE_TO_DESERIALIZE_RESPONSE
+    UNABLE_TO_DESERIALIZE_RESPONSE,
+    IGNORE_STATUS_CODE
 }
diff --git a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/HttpTableLookupFunction.java b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/HttpTableLookupFunction.java
index 3b73544..5e42f0e 100644
--- a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/HttpTableLookupFunction.java
+++ b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/HttpTableLookupFunction.java
@@ -104,9 +104,6 @@ public class HttpTableLookupFunction extends LookupFunction {
         int physicalArity = -1;
 
         GenericRowData producedRow = null;
-        if (httpRowDataWrapper.shouldIgnore()) {
-            return Collections.emptyList();
-        }
         // grab the actual data if there is any from the response and populate the producedRow with
         // it
         if (!httpCollector.isEmpty()) {
diff --git a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/JavaNetHttpPollingClient.java b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/JavaNetHttpPollingClient.java
index 84f4192..64e9adb 100644
--- a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/JavaNetHttpPollingClient.java
+++ b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/JavaNetHttpPollingClient.java
@@ -124,6 +124,13 @@ public class JavaNetHttpPollingClient implements PollingClient {
     @Override
     public HttpRowDataWrapper pull(RowData lookupRow) {
         if (lookupRow == null) {
+            /*
+             * We are not sure if the following code can be driven. Tested with an equality of booleans (which should
+             * be a filter), but with the latest flink this is rejected by the planner.
+             *
+             * If there is a way for lookupRow to be null here, then the results will not populate any metadata fields
+             * and we should add a new completion state to identify this scenario.
+             */
             return HttpRowDataWrapper.builder()
                     .data(Collections.emptyList())
                     .httpCompletionState(HttpCompletionState.SUCCESS)
@@ -254,7 +261,19 @@ public class JavaNetHttpPollingClient implements PollingClient {
         var responseBody = response.body();
 
         log.debug("Received status code [{}] for RestTableSource request", response.statusCode());
-
+        final boolean ignoreStatusCode = ignoreResponse(response);
+        if (!isError && (StringUtils.isNullOrWhitespaceOnly(responseBody) || ignoreStatusCode)) {
+            return HttpRowDataWrapper.builder()
+                    .data(Collections.emptyList())
+                    .httpCompletionState(HttpCompletionState.SUCCESS)
+                    .httpHeadersMap(response.headers().map())
+                    .httpStatusCode(response.statusCode())
+                    .httpCompletionState(
+                            ignoreStatusCode
+                                    ? HttpCompletionState.IGNORE_STATUS_CODE
+                                    : HttpCompletionState.SUCCESS)
+                    .build();
+        }
         if (this.isSuccessWithNoData(isError, responseBody, response)) {
             return HttpRowDataWrapper.builder()
                     .data(Collections.emptyList())
diff --git a/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/JavaNetHttpPollingClientConnectionTest.java b/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/JavaNetHttpPollingClientConnectionTest.java
index 91c793e..3345cf7 100644
--- a/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/JavaNetHttpPollingClientConnectionTest.java
+++ b/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/JavaNetHttpPollingClientConnectionTest.java
@@ -539,4 +539,125 @@ class JavaNetHttpPollingClientConnectionTest {
                                         .withBody(
                                                 readTestFile(SAMPLES_FOLDER + "HttpResult.json"))));
     }
+
+    @Test
+    void shouldSetIgnoreStatusCodeCompletionStateForIgnoredStatusCodes()
+            throws ConfigurationException {
+        // GIVEN - Configure client with ignored status codes (404, 503)
+        configuration.setString(SOURCE_LOOKUP_HTTP_IGNORED_RESPONSE_CODES, "404,503");
+        configuration.setString(SOURCE_LOOKUP_HTTP_SUCCESS_CODES, "200");
+        configuration.setString(
+                HttpLookupConnectorOptions.SOURCE_LOOKUP_HTTP_RETRY_CODES.key(), "");
+
+        // Set up WireMock to return 404
+        this.stubMapping =
+                wireMockServer.stubFor(
+                        get(urlEqualTo(ENDPOINT + "?id=1&uuid=2"))
+                                .withHeader("Content-Type", equalTo("application/json"))
+                                .willReturn(aResponse().withStatus(404).withBody("Not Found")));
+
+        JavaNetHttpPollingClient pollingClient = setUpPollingClient();
+
+        // WHEN - Pull data with a lookup row
+        HttpRowDataWrapper result = pollingClient.pull(lookupRowData);
+
+        // THEN - Verify completion state is IGNORE_STATUS_CODE
+        assertThat(result.getHttpCompletionState())
+                .isEqualTo(HttpCompletionState.IGNORE_STATUS_CODE);
+        assertThat(result.getData()).isEmpty();
+        assertThat(result.getHttpStatusCode()).isEqualTo(404);
+    }
+
+    @Test
+    void shouldSetIgnoreStatusCodeForMultipleIgnoredCodes() throws ConfigurationException {
+        // GIVEN - Configure client with multiple ignored status codes
+        configuration.setString(SOURCE_LOOKUP_HTTP_IGNORED_RESPONSE_CODES, "404,503,429");
+        configuration.setString(SOURCE_LOOKUP_HTTP_SUCCESS_CODES, "200");
+        configuration.setString(
+                HttpLookupConnectorOptions.SOURCE_LOOKUP_HTTP_RETRY_CODES.key(), "");
+
+        // Set up WireMock to return 503
+        this.stubMapping =
+                wireMockServer.stubFor(
+                        get(urlEqualTo(ENDPOINT + "?id=1&uuid=2"))
+                                .withHeader("Content-Type", equalTo("application/json"))
+                                .willReturn(
+                                        aResponse()
+                                                .withStatus(503)
+                                                .withBody("Service Unavailable")));
+
+        JavaNetHttpPollingClient pollingClient = setUpPollingClient();
+
+        // WHEN
+        HttpRowDataWrapper result = pollingClient.pull(lookupRowData);
+
+        // THEN - Verify 503 is also treated as ignored
+        assertThat(result.getHttpCompletionState())
+                .isEqualTo(HttpCompletionState.IGNORE_STATUS_CODE);
+        assertThat(result.getData()).isEmpty();
+        assertThat(result.getHttpStatusCode()).isEqualTo(503);
+    }
+
+    @Test
+    void shouldNotSetIgnoreStatusCodeForNonIgnoredCodes() throws ConfigurationException {
+        // GIVEN - Configure client with ignored status codes (404, 503)
+        configuration.setString(SOURCE_LOOKUP_HTTP_IGNORED_RESPONSE_CODES, "404,503");
+        configuration.setString(SOURCE_LOOKUP_HTTP_SUCCESS_CODES, "200");
+        configuration.setString(
+                HttpLookupConnectorOptions.SOURCE_LOOKUP_HTTP_RETRY_CODES.key(), "");
+
+        // Set up WireMock to return 200 (success, not ignored)
+        this.stubMapping = setUpServerStub(200);
+
+        JavaNetHttpPollingClient pollingClient = setUpPollingClient();
+
+        // WHEN
+        HttpRowDataWrapper result = pollingClient.pull(lookupRowData);
+
+        // THEN - Verify completion state is SUCCESS, not IGNORE_STATUS_CODE
+        assertThat(result.getHttpCompletionState()).isEqualTo(HttpCompletionState.SUCCESS);
+        assertThat(result.getData()).isNotEmpty();
+        assertThat(result.getHttpStatusCode()).isEqualTo(200);
+    }
+
+    @Test
+    void shouldReturnMetadataForIgnoredStatusCode() throws ConfigurationException {
+        // GIVEN - Configure client with ignored status codes (404)
+        configuration.setString(SOURCE_LOOKUP_HTTP_IGNORED_RESPONSE_CODES, "404");
+        configuration.setString(SOURCE_LOOKUP_HTTP_SUCCESS_CODES, "200");
+        configuration.setString(
+                HttpLookupConnectorOptions.SOURCE_LOOKUP_HTTP_RETRY_CODES.key(), "");
+
+        // Set up WireMock to return 404 with custom headers
+        this.stubMapping =
+                wireMockServer.stubFor(
+                        get(urlEqualTo(ENDPOINT + "?id=1&uuid=2"))
+                                .withHeader("Content-Type", equalTo("application/json"))
+                                .willReturn(
+                                        aResponse()
+                                                .withStatus(404)
+                                                .withBody("Not Found")
+                                                .withHeader("X-Request-Id", "12345")
+                                                .withHeader("X-Custom-Header", "custom-value")));
+
+        JavaNetHttpPollingClient pollingClient = setUpPollingClient();
+
+        // WHEN - Pull data with a lookup row
+        HttpRowDataWrapper result = pollingClient.pull(lookupRowData);
+
+        // THEN - Verify completion state is IGNORE_STATUS_CODE
+        assertThat(result.getHttpCompletionState())
+                .isEqualTo(HttpCompletionState.IGNORE_STATUS_CODE);
+        // Verify data is empty (no body content)
+        assertThat(result.getData()).isEmpty();
+        // Verify metadata is present - status code
+        assertThat(result.getHttpStatusCode()).isEqualTo(404);
+        // Verify metadata is present - headers
+        assertThat(result.getHttpHeadersMap()).isNotNull();
+        assertThat(result.getHttpHeadersMap()).containsKey("X-Request-Id");
+        assertThat(result.getHttpHeadersMap().get("X-Request-Id")).containsExactly("12345");
+        assertThat(result.getHttpHeadersMap()).containsKey("X-Custom-Header");
+        assertThat(result.getHttpHeadersMap().get("X-Custom-Header"))
+                .containsExactly("custom-value");
+    }
 }
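The new branch in `JavaNetHttpPollingClient` returns an empty data list when both of these hold:

- the response is not an error, and
- either the body is blank or the status code is ignored.

It reports `IGNORE_STATUS_CODE` in the ignored case and `SUCCESS` otherwise.

The builder chain in that branch calls `httpCompletionState` twice. Assuming the builder keeps the last value assigned, as Lombok-style builders do, the ternary decides the final state.

The following is a minimal standalone sketch of that decision under stated assumptions, not the connector's implementation:

- The class `IgnoredStatusCodeSketch` and the method `classify` are hypothetical.
- The diff does not show how `isError` is computed. The sketch assumes a status is an error when it is neither a success code nor an ignored code.
- The sketch assumes errors map to `HTTP_ERROR_STATUS`.

```java
import java.util.Set;

/** Hypothetical, simplified model of the ignored-status-code branch in JavaNetHttpPollingClient. */
public class IgnoredStatusCodeSketch {

    // Mirrors the values of HttpCompletionState after this commit.
    enum CompletionState {
        HTTP_ERROR_STATUS,
        EXCEPTION,
        SUCCESS,
        UNABLE_TO_DESERIALIZE_RESPONSE,
        IGNORE_STATUS_CODE
    }

    static CompletionState classify(
            int statusCode, Set<Integer> successCodes, Set<Integer> ignoredCodes, String body) {
        boolean ignoreStatusCode = ignoredCodes.contains(statusCode);
        // Assumption: error means neither a success code nor an ignored code.
        boolean isError = !successCodes.contains(statusCode) && !ignoreStatusCode;
        boolean blankBody = body == null || body.trim().isEmpty();

        if (!isError && (blankBody || ignoreStatusCode)) {
            // No data rows; status code and headers would still be reported as metadata.
            return ignoreStatusCode ? CompletionState.IGNORE_STATUS_CODE : CompletionState.SUCCESS;
        }
        if (isError) {
            // Assumed mapping; error handling is outside this diff.
            return CompletionState.HTTP_ERROR_STATUS;
        }
        // Otherwise the body would be deserialized into rows (not modelled here).
        return CompletionState.SUCCESS;
    }

    public static void main(String[] args) {
        Set<Integer> success = Set.of(200);
        Set<Integer> ignored = Set.of(404, 503);
        System.out.println(classify(404, success, ignored, "Not Found"));
        System.out.println(classify(503, success, ignored, "Service Unavailable"));
        System.out.println(classify(200, success, ignored, "{\"id\":1}"));
        System.out.println(classify(500, success, ignored, "boom"));
    }
}
```

Running `main` prints `IGNORE_STATUS_CODE`, `IGNORE_STATUS_CODE`, `SUCCESS` and `HTTP_ERROR_STATUS`, one per line. The first three mirror the outcomes asserted by the new tests for 404, 503 and 200.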
