This is an automated email from the ASF dual-hosted git repository.

davidrad pushed a commit to branch FLINK-39333
in repository https://gitbox.apache.org/repos/asf/flink-connector-http.git

commit 7671ee9fdf8138a868a7eb3bd2bbd192f2ec03c4
Author: [email protected] <[email protected]>
AuthorDate: Thu Apr 16 15:44:25 2026 +0100

    Addressed review comments

    Signed-off-by: [email protected] <[email protected]>
---
 docs/content.zh/docs/connectors/table/http.md | 19 +++++++--------
 docs/content/docs/connectors/table/http.md    |  3 ++-
 .../GenericJsonAndUrlQueryCreator.java        | 20 ++++++++++++++--
 .../GenericJsonAndUrlQueryCreatorTest.java    | 27 ++++++++++++++++++++++
 4 files changed, 57 insertions(+), 12 deletions(-)

diff --git a/docs/content.zh/docs/connectors/table/http.md b/docs/content.zh/docs/connectors/table/http.md
index 2ebc85f..af41510 100644
--- a/docs/content.zh/docs/connectors/table/http.md
+++ b/docs/content.zh/docs/connectors/table/http.md
@@ -54,7 +54,7 @@ The HTTP source connector supports [Lookup Joins](https://nightlies.apache.org/f
 * [Timeouts](#timeouts)
 * [Source table HTTP status code](#source-table-http-status-code)
 * [Retries and handling errors (Lookup source)](#retries-and-handling-errors-lookup-source)
- * [Retry strategy](#retry-strategy)
+ * [Retry strategy](#retry-strategy)
 * [Lookup multiple results](#lookup-multiple-results)
 * [Working with HTTP sink tables](#working-with-http-sink-tables)
 * [HTTP Sink](#http-sink)
@@ -71,7 +71,7 @@ The HTTP source connector supports [Lookup Joins](https://nightlies.apache.org/f
 * [Basic Authentication](#basic-authentication)
 * [OIDC Bearer Authentication](#oidc-bearer-authentication)
 * [Logging the HTTP content](#logging-the-http-content)
- * [Restrictions at this time](#restrictions-at-this-time)
+ * [Restrictions at this time](#restrictions-at-this-time)
 <!-- TOC -->

 ## Dependencies
@@ -87,10 +87,11 @@ The Flink connector does have some changes that you need to be aware of if you a
 * Existing java applications will need to be recompiled to pick up the new flink package names.
 * Existing application and SQL need to be amended to use the new connector option names. The new option names do not have
-  the _com.getindata.http_ prefix, the prefix is now _http_.
+the _com.getindata.http_ prefix, the prefix is now _http_.
 * The name of the connector and the identifiers of components that are discovered have been changed, so that the GetInData jar file can co-exist
-  with this connector's jar file. Be aware that if you have created custom pluggable components; you will need to recompile against this connector.
-* Note that the `http-generic-json-url` query creator now processes HTTP bodies differently using `http.request.body-template`. | optional | Used for the
+with this connector's jar file. Be aware that if you have created custom pluggable components; you will need to recompile against this connector.
+* Note that the `http-generic-json-url` query creator now processes HTTP bodies differently using `http.request.body-template`.
+* Note that if you were incorrectly using `gid.connector.http.request.query-param-fields` with POST or PUT did not give an error. This connector corrects the behaviour so specifying `http.request.query-param-fields` with POST or PUT does give an error.

 ## Working with HTTP lookup source tables

@@ -383,10 +384,10 @@ This query creator allows you to populate API calls very flexibly. To do this ef
 8) If you start from an OpenAPI specification that contains nested content required as a lookup join key, then use `http.request.body-template` to map top-level columns into that structure.
 9) Response content is mapped to matching named top-level columns in the lookup table. You should arrange your table columns so that some are request columns (all top level) and some are response columns.
 10) Use single quotes for the value of `http.request.body-template` so you do not need to escape the double quotes, and add newline characters for readability.
-11) If you want to enrich every event with the same API content, you can specify a placeholder as the complete URL the `url`, then use `http.request.url-map` to map it. In this scenario switching on caching is advised to avoid repeated identical API calls.
+11) If you want to enrich every event with the same API content, you can specify a placeholder as the complete URL the `url`, then use `http.request.url-map` to map it. In this scenario switching on caching is advised to avoid repeated identical API calls.
 12) Note that columns in SQL tables (the DDL) do not have a natural way to distinguish between request and response fields. Where possible, use the API field name as column names in the DDL; this minimizes the number of columns you need to define.
 13) The exception to 12) is when a response API field name is the same as a request API field **and** they have incompatible types. In this case, define the request column with a different name, then use `http.request.query-param-fields-with-key`, `http.request.body-template`, and/or `http.request.url-map` to provide the mapping to the API field.
-14) Note the columns representing the response are those that should be used for enrichment.
+14) Note the columns representing the response are those that should be used for enrichment.

 ### Format considerations

@@ -677,8 +678,8 @@ Metadata columns can be specified and hold http information. They are optional r
 If the `error-string` metadata column is defined on the table and the call succeeds then it will have a null value.
 When the HTTP response cannot be deserialized, then the `http-completion-state` will be `UNABLE_TO_DESERIALIZE_RESPONSE` and the `error-string` will be the response body.
-When the HTTP status code is in the `http.source.lookup.ignored-response-codes`, then the `http-completion-state` will
-be `IGNORE_STATUS_CODE`and no data is returned; any metadata columns contain information about the API call that
+When the HTTP status code is in the `http.source.lookup.ignored-response-codes`, then the `http-completion-state` will
+be `IGNORE_STATUS_CODE`and no data is returned; any metadata columns contain information about the API call that
 occurred.

 When a HTTP lookup call fails and populates the metadata columns with the error information, the expected enrichment columns from the HTTP call
diff --git a/docs/content/docs/connectors/table/http.md b/docs/content/docs/connectors/table/http.md
index 9b9d94e..af41510 100644
--- a/docs/content/docs/connectors/table/http.md
+++ b/docs/content/docs/connectors/table/http.md
@@ -90,7 +90,8 @@ The Flink connector does have some changes that you need to be aware of if you a
 the _com.getindata.http_ prefix, the prefix is now _http_.
 * The name of the connector and the identifiers of components that are discovered have been changed, so that the GetInData jar file can co-exist
 with this connector's jar file. Be aware that if you have created custom pluggable components; you will need to recompile against this connector.
-* Note that the `http-generic-json-url` query creator now processes HTTP bodies differently using `http.request.body-template`. | optional | Used for the
+* Note that the `http-generic-json-url` query creator now processes HTTP bodies differently using `http.request.body-template`.
+* Note that if you were incorrectly using `gid.connector.http.request.query-param-fields` with POST or PUT did not give an error. This connector corrects the behaviour so specifying `http.request.query-param-fields` with POST or PUT does give an error.

 ## Working with HTTP lookup source tables

diff --git a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/querycreators/GenericJsonAndUrlQueryCreator.java b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/querycreators/GenericJsonAndUrlQueryCreator.java
index 6f12367..50d964a 100644
--- a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/querycreators/GenericJsonAndUrlQueryCreator.java
+++ b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/querycreators/GenericJsonAndUrlQueryCreator.java
@@ -140,9 +140,13 @@ public class GenericJsonAndUrlQueryCreator implements LookupQueryCreator {
             String columnName = entry.getKey();
             String queryParamKey = entry.getValue();
             JsonNode value = jsonObject.get(columnName);
-            if (value != null) {
-                jsonObjectForQueryParams.set(queryParamKey, value);
+            if (value == null) {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "Query parameter mapping references column '%s' that does not exist in the lookup row. Available columns: %s",
+                                columnName, getAvailableColumnNames(jsonObject)));
             }
+            jsonObjectForQueryParams.set(queryParamKey, value);
         }
         // TODO can we convertToQueryParameters for all ops
         // and not use/deprecate bodyBasedUrlQueryParams
@@ -206,6 +210,18 @@ public class GenericJsonAndUrlQueryCreator implements LookupQueryCreator {
         return result.toString();
     }

+    /**
+     * Get a comma-separated list of available column names from the JSON object.
+     *
+     * @param jsonObject the JSON object containing field names
+     * @return comma-separated string of available column names
+     */
+    private String getAvailableColumnNames(ObjectNode jsonObject) {
+        StringJoiner joiner = new StringJoiner(", ");
+        jsonObject.fieldNames().forEachRemaining(joiner::add);
+        return joiner.toString();
+    }
+
     /**
      * Create a Row from a RowData and DataType.
      *
diff --git a/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/querycreators/GenericJsonAndUrlQueryCreatorTest.java b/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/querycreators/GenericJsonAndUrlQueryCreatorTest.java
index 80ed2f7..9f7620d 100644
--- a/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/querycreators/GenericJsonAndUrlQueryCreatorTest.java
+++ b/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/querycreators/GenericJsonAndUrlQueryCreatorTest.java
@@ -721,6 +721,33 @@ class GenericJsonAndUrlQueryCreatorTest {
         assertThat(queryString).doesNotContain("status=");
     }

+    @Test
+    public void testQueryParamFieldsWithKeyMissingColumn() {
+        // GIVEN - Query param map references a column that doesn't exist
+        Configuration config = new Configuration();
+        config.set(LOOKUP_METHOD, "GET");
+        Map<String, String> queryParamMap = new java.util.HashMap<>();
+        queryParamMap.put("misspelledCol", "customer"); // misspelledCol doesn't exist
+        config.set(REQUEST_QUERY_PARAM_FIELDS_WITH_KEY, queryParamMap);
+
+        LookupRow lookupRow = getLookupRow(KEY_1); // Only has key1
+
+        GenericJsonAndUrlQueryCreator creator =
+                (GenericJsonAndUrlQueryCreator)
+                        new GenericJsonAndUrlQueryCreatorFactory()
+                                .createLookupQueryCreator(
+                                        config,
+                                        lookupRow,
+                                        getTableContext(config, RESOLVED_SCHEMA));
+
+        // WHEN/THEN - Should throw IllegalArgumentException with helpful message
+        assertThatThrownBy(() -> creator.createLookupQuery(ROWDATA))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("misspelledCol")
+                .hasMessageContaining("does not exist")
+                .hasMessageContaining("Available columns:");
+    }
+
     // Helper methods
     private static GenericRowData getRowData(int numFields, String value) {
         if (numFields == 1) {
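The fail-fast check that this commit adds to `GenericJsonAndUrlQueryCreator` can be illustrated in isolation. The following is a minimal, self-contained sketch, not the connector's code. A plain `java.util.Map` stands in for Jackson's `ObjectNode`. The class and method names (`QueryParamMappingSketch`, `mapQueryParams`, `availableColumnNames`) are hypothetical. The sketch reuses the error-message format from the diff above.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical, self-contained sketch: a java.util.Map stands in for Jackson's ObjectNode.
final class QueryParamMappingSketch {

    /**
     * Copies each mapped column from the lookup row into a query-parameter map under its
     * API key, failing fast when the mapping names a column the row does not contain.
     */
    static Map<String, Object> mapQueryParams(
            Map<String, Object> row, Map<String, String> columnToParamKey) {
        Map<String, Object> params = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : columnToParamKey.entrySet()) {
            String columnName = entry.getKey();
            // Absent column: report it together with the columns that do exist.
            if (!row.containsKey(columnName)) {
                throw new IllegalArgumentException(
                        String.format(
                                "Query parameter mapping references column '%s' that does not"
                                        + " exist in the lookup row. Available columns: %s",
                                columnName, availableColumnNames(row)));
            }
            params.put(entry.getValue(), row.get(columnName));
        }
        return params;
    }

    /** Joins the row's column names with ", ", in the row map's iteration order. */
    static String availableColumnNames(Map<String, Object> row) {
        StringJoiner joiner = new StringJoiner(", ");
        row.keySet().forEach(joiner::add);
        return joiner.toString();
    }

    public static void main(String[] args) {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("id", "123");
        row.put("key1", "abc");

        // Valid mapping: column "id" is sent as query parameter "customer".
        System.out.println(mapQueryParams(row, Map.of("id", "customer"))); // {customer=123}

        // Misspelled column: throws IllegalArgumentException naming the column.
        try {
            mapQueryParams(row, Map.of("misspelledCol", "customer"));
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The sketch uses `containsKey` rather than a null check on the value. This mirrors Jackson's `ObjectNode.get`, which returns `null` only for an absent field (an explicit JSON null comes back as a `NullNode`). As a result, a column that is present with a null value is not reported as missing.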
