This is an automated email from the ASF dual-hosted git repository.
davidradl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-http.git
The following commit(s) were added to refs/heads/main by this push:
new 11400d2 [FLINK-39333] add query params key (#38)
11400d2 is described below
commit 11400d2dfc526549e514450fadb69b2025e30aa0
Author: David Radley <[email protected]>
AuthorDate: Fri Apr 17 17:21:08 2026 +0100
[FLINK-39333] add query params key (#38)
* [FLINK-39333] add query params key
Signed-off-by: [email protected] <[email protected]>
---
docs/content.zh/docs/connectors/table/http.md | 117 ++++--
docs/content/docs/connectors/table/http.md | 107 +++++-
.../GenericJsonAndUrlQueryCreator.java | 39 +-
.../GenericJsonAndUrlQueryCreatorFactory.java | 163 ++++++++-
...onTransform.java => AbstractJsonTransform.java} | 106 ++++--
.../flink/connector/http/app/HttpStubApp.java | 4 +-
.../flink/connector/http/app/JsonTransformApp.java | 36 ++
.../http/app/JsonTransformCustomerObject.java | 41 +++
.../lookup/HttpLookupTableSourceITCaseTest.java | 223 +++++++++++-
.../JavaNetHttpPollingClientWithWireTest.java | 2 +-
.../connector/http/table/lookup/JsonTransform.java | 126 -------
.../http/table/lookup/JsonTransformLookup.java | 41 +++
.../GenericJsonAndUrlQueryCreatorFactoryTest.java | 392 ++++++++++++++++++++-
.../GenericJsonAndUrlQueryCreatorTest.java | 264 ++++++++++++--
14 files changed, 1420 insertions(+), 241 deletions(-)
diff --git a/docs/content.zh/docs/connectors/table/http.md
b/docs/content.zh/docs/connectors/table/http.md
index 9b6e0b4..2ae6abe 100644
--- a/docs/content.zh/docs/connectors/table/http.md
+++ b/docs/content.zh/docs/connectors/table/http.md
@@ -54,7 +54,7 @@ The HTTP source connector supports [Lookup
Joins](https://nightlies.apache.org/f
* [Timeouts](#timeouts)
* [Source table HTTP status code](#source-table-http-status-code)
* [Retries and handling errors (Lookup
source)](#retries-and-handling-errors-lookup-source)
- * [Retry strategy](#retry-strategy)
+ * [Retry strategy](#retry-strategy)
* [Lookup multiple results](#lookup-multiple-results)
* [Working with HTTP sink tables](#working-with-http-sink-tables)
* [HTTP Sink](#http-sink)
@@ -71,7 +71,7 @@ The HTTP source connector supports [Lookup
Joins](https://nightlies.apache.org/f
* [Basic Authentication](#basic-authentication)
* [OIDC Bearer Authentication](#oidc-bearer-authentication)
* [Logging the HTTP content](#logging-the-http-content)
- * [Restrictions at this time](#restrictions-at-this-time)
+ * [Restrictions at this time](#restrictions-at-this-time)
<!-- TOC -->
## Dependencies
@@ -87,10 +87,11 @@ The Flink connector does have some changes that you need to
be aware of if you a
* Existing java applications will need to be recompiled to pick up the new
flink package names.
* Existing application and SQL need to be amended to use the new connector
option names. The new option names do not have
- the _com.getindata.http_ prefix, the prefix is now _http_.
+the _com.getindata.http_ prefix, the prefix is now _http_.
* The name of the connector and the identifiers of components that are
discovered have been changed, so that the GetInData jar file can co-exist
- with this connector's jar file. Be aware that if you have created custom
pluggable components; you will need to recompile against this connector.
-* Note that the `http-generic-json-url` query creator now processes HTTP
bodies differently using `http.request.body-template`.
| optional | Used for the
+with this connector's jar file. Be aware that if you have created custom
pluggable components; you will need to recompile against this connector.
+* Note that the `http-generic-json-url` query creator now processes HTTP
bodies differently using `http.request.body-template`.
+* Note that if you were incorrectly using
`gid.connector.http.request.query-param-fields` with POST or PUT did not give
an error. This connector corrects the behaviour so specifying
`http.request.query-param-fields` with POST or PUT does give an error.
## Working with HTTP lookup source tables
@@ -204,7 +205,8 @@ Note the options with the prefix _http_ are the HTTP
connector specific options,
| http.source.lookup.proxy.username |
optional | Specify the username used for proxy authentication.
[...]
| http.source.lookup.proxy.password |
optional | Specify the password used for proxy authentication.
[...]
| http.request.query-param-fields |
optional | Used for the `http-generic-json-url` query creator. The names of the
fields that will be mapped to query parameters. The parameters are separated by
semicolons, such as `param1;param2`.
[...]
-| http.request.body-template |
optional | Used for the `http-generic-json-url` query creator. A JSON template
string for constructing the request body for PUT and POST operations. Use
`{{fieldName}}` placeholders to reference top-level columns from the lookup
table. Supports creating complex nested JSON structures with both placeholders
and literal values. See the [Body Template](#body-template) section for details
and examples. [...]
+| http.request.query-param-fields-with-key |
optional | A map of column names to query parameter keys. See the [Query
Parameter Mapping](#query-parameter-mapping) section for details and examples.
[...]
+| http.request.body-template |
optional | Used for the `http-generic-json-url` query creator. A JSON template
string for constructing the request body for PUT and POST operations. Use
`{{fieldName}}` placeholders to reference top-level columns from the lookup
table. Supports creating complex nested JSON structures with both placeholders
and literal values. See the [Body Template](#body-template) section for details
and examples. [...]
| http.request.url-map |
optional | Used for the `http-generic-json-url` query creator. The map of
insert names to column names used as url segments. Parses a string as a map of
strings. For example if there are table columns called `customerId` and
`orderId`, then specifying value `customerId:cid,orderID:oid` and a url of
https://myendpoint/customers/{cid}/orders/{oid} will mean that the url used for
the lookup query will dynamically pic [...]
### Query Creators
@@ -244,7 +246,77 @@ The HTTP connector supplies a number of Query Creators
that you can use to defin
The recommended Query creator for json is called _http-generic-json-url_,
which allows column content to be mapped as URL, path, body and query parameter
request values; it supports
POST, PUT and GET operations. This query creator allows you to issue json
requests without needing to code
your own custom http connector. The mappings from columns to the json request
are supplied in the query creator configuration
-parameters `http.request.query-param-fields`, `http.request.body-fields` and
`http.request.url-map`.
+parameters `http.request.query-param-fields`,
`http.request.query-param-fields-with-key`, `http.request.body-template` and
`http.request.url-map`.
+#### Query Parameter Mapping
+
+The `http.request.query-param-fields` and
`http.request.query-param-fields-with-key` options provide flexible ways to map
table columns to HTTP query parameters for GET requests.
+
+**Basic Query Parameter Mapping (`http.request.query-param-fields`):**
+
+Use this option to map column names directly to query parameters with the same
name. The parameters are separated by semicolons.
+
+```sql
+'http.request.query-param-fields' = 'userId;orderId'
+```
+
+This will map the `userId` and `orderId` columns to query parameters
`?userId=value1&orderId=value2`.
+
+**Advanced Query Parameter Mapping
(`http.request.query-param-fields-with-key`):**
+
+Use this option when you need to use different names for query parameters than
the table column names. This is necessary when request API fields have the same
name as response fields and incompatible types.
+
+**Format:** `columnName1:queryParamKey1,columnName2:queryParamKey2`
+
+**Example Scenario:**
+
+If your API response has a field `customer` defined as an object (complex
type), but you need to send a customer ID as a query parameter with the same
name `customer`, you can:
+
+1. Define a new string type column `qp_customer` in your table for the request
parameter
+2. Keep the `customer` column for the response object
+3. Map the request column to the query parameter:
+
+```sql
+'http.request.query-param-fields-with-key' = 'qp_customer:customer'
+```
+
+This will map the `qp_customer` column value to the query parameter
`?customer=value`.
+
+**Complete Example:**
+
+```sql
+CREATE TABLE CustomerLookup (
+ qp_customer STRING, -- Request: customer ID as string
+ qp_order STRING, -- Request: order ID as string
+ customer ROW< -- Response: customer object
+ id STRING,
+ name STRING
+ >,
+ order ROW< -- Response: order object
+ id STRING,
+ total DECIMAL
+ >
+) WITH (
+ 'connector' = 'http',
+ 'format' = 'json',
+ 'url' = 'http://api.example.com/lookup',
+ 'lookup-method' = 'GET',
+ 'http.request.query-param-fields-with-key' =
'qp_customer:customer,qp_order:order'
+)
+```
+
+In this example, when you join with `qp_customer='C123'` and
`qp_order='O456'`, the HTTP request will be:
+```
+GET http://api.example.com/lookup?customer=C123&order=O456
+```
+
+The response will populate the `customer` and `order` complex objects.
+
+**Important Notes for `http.request.query-param-fields-with-key`:**
+- Can only be used with GET requests
+- Column names and query parameter keys cannot be null or empty
+- Query parameter keys must be unique (no duplicates allowed)
+- Cannot conflict with columns defined in `http.request.query-param-fields`
+
#### Body Template
@@ -295,24 +367,27 @@ This creates a nested JSON structure where `{{userId}}`,
`{{userName}}`, and `{{
- The template must be valid JSON with placeholders
- All referenced fields must exist in the table schema
- Field values are properly JSON-encoded (strings are quoted, numbers/booleans
are not)
-- Cannot be used together with `http.request.query-param-fields` for body
construction
+- Can only be used with POST/PUT methods (not GET - use query parameters for
GET requests)
- Note that all API response fields should match the name structure and type
of Table defined columns.
**Using http-generic-json-url Query Creator in your flow**
This query creator allows you to populate API calls very flexibly. To do this
effectively follow the below guidance:
-1) All look join keys need to be at the top level.
-2) The lookup connector will only see runtime content that is a lookup join key
-3) Everything we want to put into the body, query params or paths need to be
lookup join keys.
-4) For paths and query params - these columns need to be defined at the top
level of the HTTP table.
-5) For constants that might be required on the query-params- they should be
defined as query parameter on the URL e.g suffix the url with
`?myParam=myvalue`.
-6) For constants required in the path, hard code as a url segment.
-5) The join key may be needed in the body. The `http.request.body-template`
allows you to populate the body as required including nested levels, using the
template; it also allows you to specify contest required to be the same for
every call.
-6) An observation, if you start from an Open API specification, that contains
nested content that will be required as a lookup join key, then the
`http.request.body-template` is used to map top level columns into that
structure/
-7) Response content is mapped to matching named top level columns in the
lookup table. You should arrange your table columns so that some are request
columns
- (all top level) and some response columns.
-8) Use single quotes for the value of `http.request.body-template`, so you do
not need to escape the double quotes and add new line characters for readabilitu
+1) All lookup join keys need to be at the top level.
+2) The lookup connector will only see runtime content that is a lookup join
key.
+3) Everything we want to put into the body, query params, or paths needs to be
lookup join keys.
+4) For paths and query params, these columns need to be defined at the top
level of the HTTP table.
+5) For constants that might be required in the query params, define them as
query parameters in the URL, e.g., suffix the URL with `?myParam=myvalue`.
+6) For constants required in the path, hard code them as URL segments.
+7) The join key may be needed in the body. The `http.request.body-template`
allows you to populate the body as required, including nested levels, using the
template. It also allows you to specify content required to be the same for
every call.
+8) If you start from an OpenAPI specification that contains nested content
required as a lookup join key, then use `http.request.body-template` to map
top-level columns into that structure.
+9) Response content is mapped to matching named top-level columns in the
lookup table. You should arrange your table columns so that some are request
columns (all top level) and some are response columns.
+10) Use single quotes for the value of `http.request.body-template` so you do
not need to escape the double quotes, and add newline characters for
readability.
+11) If you want to enrich every event with the same API content, you can
specify a placeholder as the complete URL the `url`, then use
`http.request.url-map` to map it. In this scenario switching on caching is
advised to avoid repeated identical API calls.
+12) Note that columns in SQL tables (the DDL) do not have a natural way to
distinguish between request and response fields. Where possible, use the API
field name as column names in the DDL; this minimizes the number of columns you
need to define.
+13) The exception to 12) is when a response API field name is the same as a
request API field **and** they have incompatible types. In this case, define
the request column with a different name, then use
`http.request.query-param-fields-with-key`, `http.request.body-template`,
and/or `http.request.url-map` to provide the mapping to the API field.
+14) Note the columns representing the response are those that should be used
for enrichment.
### Format considerations
@@ -603,8 +678,8 @@ Metadata columns can be specified and hold http
information. They are optional r
If the `error-string` metadata column is defined on the table and the call
succeeds then it will have a null value.
When the HTTP response cannot be deserialized, then the
`http-completion-state` will be `UNABLE_TO_DESERIALIZE_RESPONSE`
and the `error-string` will be the response body.
-When the HTTP status code is in the
`http.source.lookup.ignored-response-codes`, then the `http-completion-state`
will
-be `IGNORE_STATUS_CODE`and no data is returned; any metadata columns contain
information about the API call that
+When the HTTP status code is in the
`http.source.lookup.ignored-response-codes`, then the `http-completion-state`
will
+be `IGNORE_STATUS_CODE`and no data is returned; any metadata columns contain
information about the API call that
occurred.
When a HTTP lookup call fails and populates the metadata columns with the
error information, the expected enrichment columns from the HTTP call
diff --git a/docs/content/docs/connectors/table/http.md
b/docs/content/docs/connectors/table/http.md
index 45d2a86..2ae6abe 100644
--- a/docs/content/docs/connectors/table/http.md
+++ b/docs/content/docs/connectors/table/http.md
@@ -90,7 +90,8 @@ The Flink connector does have some changes that you need to
be aware of if you a
the _com.getindata.http_ prefix, the prefix is now _http_.
* The name of the connector and the identifiers of components that are
discovered have been changed, so that the GetInData jar file can co-exist
with this connector's jar file. Be aware that if you have created custom
pluggable components; you will need to recompile against this connector.
-* Note that the `http-generic-json-url` query creator now processes HTTP
bodies differently using `http.request.body-template`.
| optional | Used for the
+* Note that the `http-generic-json-url` query creator now processes HTTP
bodies differently using `http.request.body-template`.
+* Note that if you were incorrectly using
`gid.connector.http.request.query-param-fields` with POST or PUT did not give
an error. This connector corrects the behaviour so specifying
`http.request.query-param-fields` with POST or PUT does give an error.
## Working with HTTP lookup source tables
@@ -204,7 +205,8 @@ Note the options with the prefix _http_ are the HTTP
connector specific options,
| http.source.lookup.proxy.username |
optional | Specify the username used for proxy authentication.
[...]
| http.source.lookup.proxy.password |
optional | Specify the password used for proxy authentication.
[...]
| http.request.query-param-fields |
optional | Used for the `http-generic-json-url` query creator. The names of the
fields that will be mapped to query parameters. The parameters are separated by
semicolons, such as `param1;param2`.
[...]
-| http.request.body-template |
optional | Used for the `http-generic-json-url` query creator. A JSON template
string for constructing the request body for PUT and POST operations. Use
`{{fieldName}}` placeholders to reference top-level columns from the lookup
table. Supports creating complex nested JSON structures with both placeholders
and literal values. See the [Body Template](#body-template) section for details
and examples. [...]
+| http.request.query-param-fields-with-key |
optional | A map of column names to query parameter keys. See the [Query
Parameter Mapping](#query-parameter-mapping) section for details and examples.
[...]
+| http.request.body-template |
optional | Used for the `http-generic-json-url` query creator. A JSON template
string for constructing the request body for PUT and POST operations. Use
`{{fieldName}}` placeholders to reference top-level columns from the lookup
table. Supports creating complex nested JSON structures with both placeholders
and literal values. See the [Body Template](#body-template) section for details
and examples. [...]
| http.request.url-map |
optional | Used for the `http-generic-json-url` query creator. The map of
insert names to column names used as url segments. Parses a string as a map of
strings. For example if there are table columns called `customerId` and
`orderId`, then specifying value `customerId:cid,orderID:oid` and a url of
https://myendpoint/customers/{cid}/orders/{oid} will mean that the url used for
the lookup query will dynamically pic [...]
### Query Creators
@@ -244,7 +246,77 @@ The HTTP connector supplies a number of Query Creators
that you can use to defin
The recommended Query creator for json is called _http-generic-json-url_,
which allows column content to be mapped as URL, path, body and query parameter
request values; it supports
POST, PUT and GET operations. This query creator allows you to issue json
requests without needing to code
your own custom http connector. The mappings from columns to the json request
are supplied in the query creator configuration
-parameters `http.request.query-param-fields`, `http.request.body-fields` and
`http.request.url-map`.
+parameters `http.request.query-param-fields`,
`http.request.query-param-fields-with-key`, `http.request.body-template` and
`http.request.url-map`.
+#### Query Parameter Mapping
+
+The `http.request.query-param-fields` and
`http.request.query-param-fields-with-key` options provide flexible ways to map
table columns to HTTP query parameters for GET requests.
+
+**Basic Query Parameter Mapping (`http.request.query-param-fields`):**
+
+Use this option to map column names directly to query parameters with the same
name. The parameters are separated by semicolons.
+
+```sql
+'http.request.query-param-fields' = 'userId;orderId'
+```
+
+This will map the `userId` and `orderId` columns to query parameters
`?userId=value1&orderId=value2`.
+
+**Advanced Query Parameter Mapping
(`http.request.query-param-fields-with-key`):**
+
+Use this option when you need to use different names for query parameters than
the table column names. This is necessary when request API fields have the same
name as response fields and incompatible types.
+
+**Format:** `columnName1:queryParamKey1,columnName2:queryParamKey2`
+
+**Example Scenario:**
+
+If your API response has a field `customer` defined as an object (complex
type), but you need to send a customer ID as a query parameter with the same
name `customer`, you can:
+
+1. Define a new string type column `qp_customer` in your table for the request
parameter
+2. Keep the `customer` column for the response object
+3. Map the request column to the query parameter:
+
+```sql
+'http.request.query-param-fields-with-key' = 'qp_customer:customer'
+```
+
+This will map the `qp_customer` column value to the query parameter
`?customer=value`.
+
+**Complete Example:**
+
+```sql
+CREATE TABLE CustomerLookup (
+ qp_customer STRING, -- Request: customer ID as string
+ qp_order STRING, -- Request: order ID as string
+ customer ROW< -- Response: customer object
+ id STRING,
+ name STRING
+ >,
+ order ROW< -- Response: order object
+ id STRING,
+ total DECIMAL
+ >
+) WITH (
+ 'connector' = 'http',
+ 'format' = 'json',
+ 'url' = 'http://api.example.com/lookup',
+ 'lookup-method' = 'GET',
+ 'http.request.query-param-fields-with-key' =
'qp_customer:customer,qp_order:order'
+)
+```
+
+In this example, when you join with `qp_customer='C123'` and
`qp_order='O456'`, the HTTP request will be:
+```
+GET http://api.example.com/lookup?customer=C123&order=O456
+```
+
+The response will populate the `customer` and `order` complex objects.
+
+**Important Notes for `http.request.query-param-fields-with-key`:**
+- Can only be used with GET requests
+- Column names and query parameter keys cannot be null or empty
+- Query parameter keys must be unique (no duplicates allowed)
+- Cannot conflict with columns defined in `http.request.query-param-fields`
+
#### Body Template
@@ -295,24 +367,27 @@ This creates a nested JSON structure where `{{userId}}`,
`{{userName}}`, and `{{
- The template must be valid JSON with placeholders
- All referenced fields must exist in the table schema
- Field values are properly JSON-encoded (strings are quoted, numbers/booleans
are not)
-- Cannot be used together with `http.request.query-param-fields` for body
construction
-- Note that all API response fields should match the name structure and type
of Table defined columns.
+- Can only be used with POST/PUT methods (not GET - use query parameters for
GET requests)
+- Note that all API response fields should match the name structure and type
of Table defined columns.
**Using http-generic-json-url Query Creator in your flow**
This query creator allows you to populate API calls very flexibly. To do this
effectively follow the below guidance:
-1) All look join keys need to be at the top level.
-2) The lookup connector will only see runtime content that is a lookup join key
-3) Everything we want to put into the body, query params or paths need to be
lookup join keys.
-4) For paths and query params - these columns need to be defined at the top
level of the HTTP table.
-5) For constants that might be required on the query-params- they should be
defined as query parameter on the URL e.g suffix the url with
`?myParam=myvalue`.
-6) For constants required in the path, hard code as a url segment.
-5) The join key may be needed in the body. The `http.request.body-template`
allows you to populate the body as required including nested levels, using the
template; it also allows you to specify contest required to be the same for
every call.
-6) An observation, if you start from an Open API specification, that contains
nested content that will be required as a lookup join key, then the
`http.request.body-template` is used to map top level columns into that
structure/
-7) Response content is mapped to matching named top level columns in the
lookup table. You should arrange your table columns so that some are request
columns
- (all top level) and some response columns.
-8) Use single quotes for the value of `http.request.body-template`, so you do
not need to escape the double quotes and add new line characters for readabilitu
+1) All lookup join keys need to be at the top level.
+2) The lookup connector will only see runtime content that is a lookup join
key.
+3) Everything we want to put into the body, query params, or paths needs to be
lookup join keys.
+4) For paths and query params, these columns need to be defined at the top
level of the HTTP table.
+5) For constants that might be required in the query params, define them as
query parameters in the URL, e.g., suffix the URL with `?myParam=myvalue`.
+6) For constants required in the path, hard code them as URL segments.
+7) The join key may be needed in the body. The `http.request.body-template`
allows you to populate the body as required, including nested levels, using the
template. It also allows you to specify content required to be the same for
every call.
+8) If you start from an OpenAPI specification that contains nested content
required as a lookup join key, then use `http.request.body-template` to map
top-level columns into that structure.
+9) Response content is mapped to matching named top-level columns in the
lookup table. You should arrange your table columns so that some are request
columns (all top level) and some are response columns.
+10) Use single quotes for the value of `http.request.body-template` so you do
not need to escape the double quotes, and add newline characters for
readability.
+11) If you want to enrich every event with the same API content, you can
specify a placeholder as the complete URL the `url`, then use
`http.request.url-map` to map it. In this scenario switching on caching is
advised to avoid repeated identical API calls.
+12) Note that columns in SQL tables (the DDL) do not have a natural way to
distinguish between request and response fields. Where possible, use the API
field name as column names in the DDL; this minimizes the number of columns you
need to define.
+13) The exception to 12) is when a response API field name is the same as a
request API field **and** they have incompatible types. In this case, define
the request column with a different name, then use
`http.request.query-param-fields-with-key`, `http.request.body-template`,
and/or `http.request.url-map` to provide the mapping to the API field.
+14) Note the columns representing the response are those that should be used
for enrichment.
### Format considerations
diff --git
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/querycreators/GenericJsonAndUrlQueryCreator.java
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/querycreators/GenericJsonAndUrlQueryCreator.java
index aed2d54..50d964a 100644
---
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/querycreators/GenericJsonAndUrlQueryCreator.java
+++
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/querycreators/GenericJsonAndUrlQueryCreator.java
@@ -71,13 +71,15 @@ import java.util.regex.Pattern;
@Slf4j
public class GenericJsonAndUrlQueryCreator implements LookupQueryCreator {
private static final long serialVersionUID = 1L;
+ private static final Pattern TEMPLATE_PLACEHOLDER_PATTERN =
+ Pattern.compile("\\{\\{([^}]+)\\}\\}");
// not final so we can mutate for unit test
private SerializationSchema<RowData> serializationSchema;
private boolean schemaOpened = false;
private LookupRow lookupRow;
private final String httpMethod;
- private final List<String> requestQueryParamsFields;
+ private final Map<String, String> requestQueryParamsMap;
private final Map<String, String> requestUrlMap;
private final String bodyTemplate;
@@ -86,7 +88,7 @@ public class GenericJsonAndUrlQueryCreator implements
LookupQueryCreator {
*
* @param httpMethod the requested http method
* @param serializationSchema serialization schema for RowData
- * @param requestQueryParamsFields query param fields
+ * @param requestQueryParamsMap map of column names to query parameter keys
* @param requestUrlMap url map
* @param bodyTemplate template string for request body with placeholders
like {@code
* {{fieldName}}}
@@ -95,14 +97,14 @@ public class GenericJsonAndUrlQueryCreator implements
LookupQueryCreator {
public GenericJsonAndUrlQueryCreator(
final String httpMethod,
final SerializationSchema<RowData> serializationSchema,
- final List<String> requestQueryParamsFields,
+ final Map<String, String> requestQueryParamsMap,
final Map<String, String> requestUrlMap,
final String bodyTemplate,
final LookupRow lookupRow) {
this.httpMethod = httpMethod;
this.serializationSchema = serializationSchema;
this.lookupRow = lookupRow;
- this.requestQueryParamsFields = requestQueryParamsFields;
+ this.requestQueryParamsMap = requestQueryParamsMap;
this.requestUrlMap = requestUrlMap;
this.bodyTemplate = bodyTemplate;
}
@@ -132,9 +134,19 @@ public class GenericJsonAndUrlQueryCreator implements
LookupQueryCreator {
}
// Parameters are encoded as query params for GET and none GET.
// Later code will turn these query params into the body for PUTs and
POSTs
+ // Use the map to rename columns to query param keys
ObjectNode jsonObjectForQueryParams =
ObjectMapperAdapter.instance().createObjectNode();
- for (String requestColumnName : this.requestQueryParamsFields) {
- jsonObjectForQueryParams.set(requestColumnName,
jsonObject.get(requestColumnName));
+ for (Map.Entry<String, String> entry :
this.requestQueryParamsMap.entrySet()) {
+ String columnName = entry.getKey();
+ String queryParamKey = entry.getValue();
+ JsonNode value = jsonObject.get(columnName);
+ if (value == null) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Query parameter mapping references column
'%s' that does not exist in the lookup row. Available columns: %s",
+ columnName,
getAvailableColumnNames(jsonObject)));
+ }
+ jsonObjectForQueryParams.set(queryParamKey, value);
}
// TODO can we convertToQueryParameters for all ops
// and not use/deprecate bodyBasedUrlQueryParams
@@ -173,8 +185,7 @@ public class GenericJsonAndUrlQueryCreator implements
LookupQueryCreator {
* @return the template with placeholders replaced by actual values
*/
private String substituteTemplate(String template, ObjectNode jsonObject) {
- Pattern pattern = Pattern.compile("\\{\\{([^}]+)\\}\\}");
- Matcher matcher = pattern.matcher(template);
+ Matcher matcher = TEMPLATE_PLACEHOLDER_PATTERN.matcher(template);
StringBuilder result = new StringBuilder();
while (matcher.find()) {
@@ -199,6 +210,18 @@ public class GenericJsonAndUrlQueryCreator implements
LookupQueryCreator {
return result.toString();
}
+ /**
+ * Get a comma-separated list of available column names from the JSON
object.
+ *
+ * @param jsonObject the JSON object containing field names
+ * @return comma-separated string of available column names
+ */
+ private String getAvailableColumnNames(ObjectNode jsonObject) {
+ StringJoiner joiner = new StringJoiner(", ");
+ jsonObject.fieldNames().forEachRemaining(joiner::add);
+ return joiner.toString();
+ }
+
/**
* Create a Row from a RowData and DataType.
*
diff --git
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/querycreators/GenericJsonAndUrlQueryCreatorFactory.java
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/querycreators/GenericJsonAndUrlQueryCreatorFactory.java
index 55ef636..94ec897 100644
---
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/querycreators/GenericJsonAndUrlQueryCreatorFactory.java
+++
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/querycreators/GenericJsonAndUrlQueryCreatorFactory.java
@@ -33,6 +33,8 @@ import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.factories.SerializationFormatFactory;
import org.apache.flink.table.types.DataType;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -67,6 +69,12 @@ public class GenericJsonAndUrlQueryCreatorFactory implements
LookupQueryCreatorF
+ " The parameters are separated by
semicolons,"
+ " such as 'param1;param2'.");
+ public static final ConfigOption<Map<String, String>>
REQUEST_QUERY_PARAM_FIELDS_WITH_KEY =
+ ConfigOptions.key("http.request.query-param-fields-with-key")
+ .mapType()
+ .noDefaultValue()
+ .withDescription("A map of column names to query parameter
keys.");
+
public static final ConfigOption<Map<String, String>> REQUEST_URL_MAP =
ConfigOptions.key("http.request.url-map")
.mapType()
@@ -103,9 +111,27 @@ public class GenericJsonAndUrlQueryCreatorFactory
implements LookupQueryCreatorF
final DynamicTableFactory.Context dynamicTableFactoryContext) {
final String httpMethod = readableConfig.get(LOOKUP_METHOD);
final String formatIdentifier =
readableConfig.get(LOOKUP_REQUEST_FORMAT);
- // get the information from config
+
+ // Validate configuration based on HTTP method
+ validateConfigurationForHttpMethod(httpMethod, readableConfig);
+
+ // Handle query param fields - merge both list and map formats
+ Map<String, String> requestQueryParamsMap = new LinkedHashMap<>();
+
+ // First, add entries from list format (key = value for backward
compatibility)
final List<String> requestQueryParamsFields =
readableConfig.get(REQUEST_QUERY_PARAM_FIELDS);
+ requestQueryParamsFields.forEach(field ->
requestQueryParamsMap.put(field, field));
+
+ // Then, add entries from map format
+ if
(readableConfig.getOptional(REQUEST_QUERY_PARAM_FIELDS_WITH_KEY).isPresent()) {
+ Map<String, String> queryParamFieldsWithKey =
+ readableConfig.get(REQUEST_QUERY_PARAM_FIELDS_WITH_KEY);
+
+ validateQueryParamMap(queryParamFieldsWithKey, readableConfig);
+ requestQueryParamsMap.putAll(queryParamFieldsWithKey);
+ }
+
Map<String, String> requestUrlMap =
readableConfig.get(REQUEST_URL_MAP);
String bodyTemplate =
readableConfig.getOptional(REQUEST_BODY_TEMPLATE).orElse(null);
@@ -118,7 +144,7 @@ public class GenericJsonAndUrlQueryCreatorFactory
implements LookupQueryCreatorF
return new GenericJsonAndUrlQueryCreator(
httpMethod,
jsonSerializationSchema,
- requestQueryParamsFields,
+ requestQueryParamsMap,
requestUrlMap,
bodyTemplate,
lookupRow);
@@ -136,7 +162,138 @@ public class GenericJsonAndUrlQueryCreatorFactory
implements LookupQueryCreatorF
@Override
public Set<ConfigOption<?>> optionalOptions() {
- return Set.of(REQUEST_QUERY_PARAM_FIELDS, REQUEST_URL_MAP,
REQUEST_BODY_TEMPLATE);
+ return Set.of(
+ REQUEST_QUERY_PARAM_FIELDS,
+ REQUEST_QUERY_PARAM_FIELDS_WITH_KEY,
+ REQUEST_URL_MAP,
+ REQUEST_BODY_TEMPLATE);
+ }
+
+ /**
+ * Validates the query parameter map to ensure all constraints are met.
+ *
+ * <ul>
+ * <li>Column names (keys) are not null or empty
+ * <li>Query param keys (values) are not null or empty
+ * <li>Query param keys do not conflict with old
REQUEST_QUERY_PARAM_FIELDS list
+ * <li>Query param keys are unique (no duplicates)
+ * <li>Column names in new map do not conflict with column names in old
list
+ * </ul>
+ *
+ * @param queryParamMap the map to validate
+ * @param readableConfig the configuration to check for conflicts
+ * @throws IllegalArgumentException if validation fails
+ */
+ private void validateQueryParamMap(
+ Map<String, String> queryParamMap, ReadableConfig readableConfig) {
+ if (queryParamMap == null || queryParamMap.isEmpty()) {
+ return; // Empty map is valid
+ }
+
+ // Get the list format for conflict checking
+ final List<String> queryParamFields =
readableConfig.get(REQUEST_QUERY_PARAM_FIELDS);
+
+ // Track seen query param keys to detect duplicates
+ Set<String> seenQueryParamKeys = new HashSet<>();
+
+ for (Map.Entry<String, String> entry : queryParamMap.entrySet()) {
+ String columnName = entry.getKey();
+ String queryParamKey = entry.getValue();
+
+ // Validate column name (map key)
+ if (columnName == null || columnName.trim().isEmpty()) {
+ throw new IllegalArgumentException(
+ "Column name in "
+ + REQUEST_QUERY_PARAM_FIELDS_WITH_KEY.key()
+ + " cannot be null or empty");
+ }
+
+ // Validate query param key (map value)
+ if (queryParamKey == null || queryParamKey.trim().isEmpty()) {
+ throw new IllegalArgumentException(
+ "Query parameter key for column '"
+ + columnName
+ + "' in "
+ + REQUEST_QUERY_PARAM_FIELDS_WITH_KEY.key()
+ + " cannot be null or empty");
+ }
+
+ // Check for duplicate query param keys
+ if (seenQueryParamKeys.contains(queryParamKey)) {
+ throw new IllegalArgumentException(
+ "Duplicate query parameter key '"
+ + queryParamKey
+ + "' in "
+ + REQUEST_QUERY_PARAM_FIELDS_WITH_KEY.key()
+ + ". Each query parameter key must be
unique.");
+ }
+ seenQueryParamKeys.add(queryParamKey);
+
+ // Check for conflicts with list format - both query param keys
and column names
+ if (!queryParamFields.isEmpty()
+ && (queryParamFields.contains(queryParamKey)
+ || queryParamFields.contains(columnName))) {
+ throw new IllegalArgumentException(
+ (queryParamFields.contains(queryParamKey)
+ ? "Query parameter key '" +
queryParamKey
+ : "Column name '" + columnName)
+ + "' in "
+ + REQUEST_QUERY_PARAM_FIELDS_WITH_KEY.key()
+ + " conflicts with existing columns defined in
"
+ + REQUEST_QUERY_PARAM_FIELDS.key());
+ }
+ }
+ }
+
+ /**
+ * Validates that configuration options are appropriate for the HTTP
method.
+ *
+ * <p>Rules:
+ *
+ * <ul>
+ * <li>Query parameter configs can only be used with GET
+ * <li>Body template can only be used with POST/PUT
+ * </ul>
+ *
+ * @param httpMethod the HTTP method (GET, POST, PUT)
+ * @param readableConfig the configuration to validate
+ * @throws IllegalArgumentException if configuration is invalid for the
HTTP method
+ */
+ private void validateConfigurationForHttpMethod(
+ String httpMethod, ReadableConfig readableConfig) {
+ boolean hasQueryParams =
!readableConfig.get(REQUEST_QUERY_PARAM_FIELDS).isEmpty();
+ boolean hasQueryParamsWithKey =
+
readableConfig.getOptional(REQUEST_QUERY_PARAM_FIELDS_WITH_KEY).isPresent();
+ boolean hasBodyTemplate =
+ readableConfig.getOptional(REQUEST_BODY_TEMPLATE).isPresent()
+ &&
!readableConfig.get(REQUEST_BODY_TEMPLATE).trim().isEmpty();
+
+ if (httpMethod.equalsIgnoreCase("GET")) {
+ if (hasBodyTemplate) {
+ throw new IllegalArgumentException(
+ "Body template configuration ("
+ + REQUEST_BODY_TEMPLATE.key()
+ + ") cannot be used with GET method. "
+ + "For GET requests, use "
+ + REQUEST_QUERY_PARAM_FIELDS.key()
+ + " or "
+ + REQUEST_QUERY_PARAM_FIELDS_WITH_KEY.key()
+ + " instead.");
+ }
+ } else {
+ // POST/PUT
+ if (hasQueryParams || hasQueryParamsWithKey) {
+ throw new IllegalArgumentException(
+ "Query parameter configuration ("
+ + REQUEST_QUERY_PARAM_FIELDS.key()
+ + " or "
+ + REQUEST_QUERY_PARAM_FIELDS_WITH_KEY.key()
+ + ") can only be used with GET method. "
+ + "For POST/PUT requests, use "
+ + REQUEST_BODY_TEMPLATE.key()
+ + " instead.");
+ }
+ }
}
/**
diff --git
a/flink-connector-http/src/test/java/org/apache/flink/connector/http/app/JsonTransform.java
b/flink-connector-http/src/test/java/org/apache/flink/connector/http/app/AbstractJsonTransform.java
similarity index 61%
rename from
flink-connector-http/src/test/java/org/apache/flink/connector/http/app/JsonTransform.java
rename to
flink-connector-http/src/test/java/org/apache/flink/connector/http/app/AbstractJsonTransform.java
index 6766f44..3d7b968 100644
---
a/flink-connector-http/src/test/java/org/apache/flink/connector/http/app/JsonTransform.java
+++
b/flink-connector-http/src/test/java/org/apache/flink/connector/http/app/AbstractJsonTransform.java
@@ -18,27 +18,30 @@
package org.apache.flink.connector.http.app;
-import com.github.tomakehurst.wiremock.common.FileSource;
-import com.github.tomakehurst.wiremock.extension.Parameters;
-import com.github.tomakehurst.wiremock.extension.ResponseTransformer;
-import com.github.tomakehurst.wiremock.http.Request;
+import com.github.tomakehurst.wiremock.extension.ResponseTransformerV2;
import com.github.tomakehurst.wiremock.http.Response;
+import com.github.tomakehurst.wiremock.stubbing.ServeEvent;
+import com.github.tomakehurst.wiremock.verification.LoggedRequest;
-import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
/**
- * Wiremock Extension that prepares HTTP REST endpoint response body. This
extension is stateful,
- * every next response will have values like id == counter, id2 == counter + 1
and uuid =
- * randomValue value in its response, where counter is incremented for every
subsequent request.
+ * Abstract base class for WireMock response transformers that generate JSON
responses with
+ * counter-based IDs and configurable UUID behavior.
*
- * <p>This class is used for AppDemo Wiremock.
+ * <p>Subclasses must provide:
+ *
+ * <ul>
+ * <li>Transformer name via {@link #getName()}
+ * <li>JSON template via {@link #getJsonTemplate()}
+ * <li>UUID generation strategy via {@link #getUuidValue()}
+ * </ul>
*/
-public class JsonTransform extends ResponseTransformer {
+public abstract class AbstractJsonTransform implements ResponseTransformerV2 {
- public static final String NAME = "JsonTransform";
+ private final AtomicInteger counter = new AtomicInteger(0);
- private static final String RESULT_JSON =
+ private static final String BASE_JSON_TEMPLATE =
"{\n"
+ "\t\"id\": \"&COUNTER&\",\n"
+ "\t\"id2\": \"&COUNTER_2&\",\n"
@@ -85,7 +88,7 @@ public class JsonTransform extends ResponseTransformer {
+ "\t\t\t\"name\": \"Lula Rogers\"\n"
+ "\t\t}\n"
+ "\t],\n"
- + "\t\"details\": {\n"
+ + "\t\"&NESTED_OBJECT&\": {\n"
+ "\t\t\"isActive\": true,\n"
+ "\t\t\"nestedDetails\": {\n"
+ "\t\t\t\"index\": 0,\n"
@@ -96,13 +99,11 @@ public class JsonTransform extends ResponseTransformer {
+ "\t\"greeting\": \"Hello, Marva Fischer! You have 7
unread messages.\",\n"
+ "\t\"favoriteFruit\": \"banana\"\n"
+ "}";
- private final AtomicInteger counter = new AtomicInteger(0);
@Override
- public Response transform(
- Request request, Response response, FileSource files, Parameters
parameters) {
+ public final Response transform(Response response, ServeEvent serveEvent) {
int cnt = counter.getAndIncrement();
-
+ LoggedRequest request = serveEvent.getRequest();
return Response.response()
.body(generateResponse(request.getUrl(), cnt))
.status(response.getStatus())
@@ -110,16 +111,71 @@ public class JsonTransform extends ResponseTransformer {
.build();
}
- @Override
- public String getName() {
- return NAME;
+ /**
+ * Returns the JSON template with placeholders for dynamic values.
+ *
+ * <p>Subclasses that need a different JSON structure should override this
method. By default,
+ * returns the base template with &NESTED_OBJECT& placeholder.
+ *
+ * <p>Supported placeholders:
+ *
+ * <ul>
+ * <li>&PARAM& - Request URL
+ * <li>&COUNTER& - Current counter value
+ * <li>&COUNTER_2& - Counter value + 1
+ * <li>&UUID& - UUID value (if used)
+ * <li>&NESTED_OBJECT& - Nested object name (if used)
+ * </ul>
+ *
+ * @return JSON template string
+ */
+ protected String getJsonTemplate() {
+ return BASE_JSON_TEMPLATE;
+ }
+
+ /**
+ * Returns the name for the nested object in the JSON response.
+ *
+ * <p>Default implementation returns null (no nested object placeholder
replacement). Override
+ * this method in subclasses that use the &NESTED_OBJECT& placeholder.
+ *
+ * @return nested object name, or null if not used
+ */
+ protected String getNestedObjectName() {
+ return null;
+ }
+
+ /**
+ * Returns the UUID value to use in responses.
+ *
+ * @return fixed UUID string for deterministic behavior
+ */
+ private String getUuidValue() {
+ return "fbb68a46-80a9-46da-9d40-314b5287079c";
}
private String generateResponse(String param, int counter) {
- return RESULT_JSON
- .replaceAll("&PARAM&", param)
- .replaceAll("&COUNTER&", String.valueOf(counter))
- .replaceAll("&COUNTER_2&", String.valueOf(counter + 1))
- .replaceAll("&UUID&", UUID.randomUUID().toString());
+ String response =
+ getJsonTemplate()
+ .replace("&PARAM&", param)
+ .replace("&COUNTER&", String.valueOf(counter))
+ .replace("&COUNTER_2&", String.valueOf(counter + 1));
+
+ String uuid = getUuidValue();
+ if (uuid != null) {
+ response = response.replace("&UUID&", uuid);
+ }
+
+ String nestedObjectName = getNestedObjectName();
+ if (nestedObjectName != null) {
+ response = response.replace("&NESTED_OBJECT&", nestedObjectName);
+ }
+
+ return response;
+ }
+
+ @Override
+ public boolean applyGlobally() {
+ return false;
}
}
diff --git
a/flink-connector-http/src/test/java/org/apache/flink/connector/http/app/HttpStubApp.java
b/flink-connector-http/src/test/java/org/apache/flink/connector/http/app/HttpStubApp.java
index 21b814e..bf7a0e4 100644
---
a/flink-connector-http/src/test/java/org/apache/flink/connector/http/app/HttpStubApp.java
+++
b/flink-connector-http/src/test/java/org/apache/flink/connector/http/app/HttpStubApp.java
@@ -43,7 +43,7 @@ public class HttpStubApp {
new WireMockServer(
WireMockConfiguration.wireMockConfig()
.port(WireMockServerPortAllocator.getServerPort())
- .extensions(JsonTransform.class));
+ .extensions(JsonTransformApp.class));
wireMockServer.start();
wireMockServer.addStubMapping(setupServerStub());
@@ -52,6 +52,6 @@ public class HttpStubApp {
private static StubMapping setupServerStub() {
return wireMockServer.stubFor(
get(urlPathEqualTo(URL))
-
.willReturn(aResponse().withTransformers(JsonTransform.NAME)));
+
.willReturn(aResponse().withTransformers(JsonTransformApp.NAME)));
}
}
diff --git
a/flink-connector-http/src/test/java/org/apache/flink/connector/http/app/JsonTransformApp.java
b/flink-connector-http/src/test/java/org/apache/flink/connector/http/app/JsonTransformApp.java
new file mode 100644
index 0000000..53db549
--- /dev/null
+++
b/flink-connector-http/src/test/java/org/apache/flink/connector/http/app/JsonTransformApp.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.http.app;
+
+import org.apache.flink.connector.http.table.lookup.JsonTransformLookup;
+
+/**
+ * Wiremock Extension that prepares HTTP REST endpoint response body. This
extension is stateful,
+ * every next response will have values like id == counter, id2 == counter + 1
and uuid = fixed
+ * value in its response, where counter is incremented for every subsequent
request.
+ */
+public class JsonTransformApp extends JsonTransformLookup {
+
+ public static final String NAME = "JsonTransformApp";
+
+ @Override
+ public String getName() {
+ return NAME;
+ }
+}
diff --git
a/flink-connector-http/src/test/java/org/apache/flink/connector/http/app/JsonTransformCustomerObject.java
b/flink-connector-http/src/test/java/org/apache/flink/connector/http/app/JsonTransformCustomerObject.java
new file mode 100644
index 0000000..1004638
--- /dev/null
+++
b/flink-connector-http/src/test/java/org/apache/flink/connector/http/app/JsonTransformCustomerObject.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.http.app;
+
+/**
+ * Wiremock Extension that prepares HTTP REST endpoint response body. This
extension is stateful,
+ * every next response will have values like id == counter, id2 == counter + 1
and uuid =
+ * randomValue value in its response, where counter is incremented for every
subsequent request.
+ * This mock has a customer object as the nested object name, so a unit test
can supply customer in
+ * the request and have the same name field with a different type in the
response.
+ */
+public class JsonTransformCustomerObject extends AbstractJsonTransform {
+
+ public static final String NAME = "JsonTransformCustomerObject";
+
+ @Override
+ public String getName() {
+ return NAME;
+ }
+
+ @Override
+ protected String getNestedObjectName() {
+ return "customer";
+ }
+}
diff --git
a/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/HttpLookupTableSourceITCaseTest.java
b/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/HttpLookupTableSourceITCaseTest.java
index ed7a8ad..e3d6150 100644
---
a/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/HttpLookupTableSourceITCaseTest.java
+++
b/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/HttpLookupTableSourceITCaseTest.java
@@ -23,6 +23,7 @@ import
org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.connector.http.WireMockServerPortAllocator;
+import org.apache.flink.connector.http.app.JsonTransformCustomerObject;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
@@ -115,12 +116,12 @@ class HttpLookupTableSourceITCaseTest {
private WireMockServer wireMockServer;
+ private File keyStoreFile = new File(SERVER_KEYSTORE_PATH);
+ private File trustStoreFile = new File(SERVER_TRUSTSTORE_PATH);
+
@SuppressWarnings("unchecked")
@BeforeEach
void setup() {
-
- File keyStoreFile = new File(SERVER_KEYSTORE_PATH);
- File trustStoreFile = new File(SERVER_TRUSTSTORE_PATH);
serverPort = WireMockServerPortAllocator.getServerPort();
secServerPort = WireMockServerPortAllocator.getSecureServerPort();
wireMockServer =
@@ -134,7 +135,7 @@ class HttpLookupTableSourceITCaseTest {
.needClientAuth(true)
.trustStorePath(trustStoreFile.getAbsolutePath())
.trustStorePassword("password")
- .extensions(JsonTransform.class));
+ .extensions(JsonTransformLookup.class));
wireMockServer.start();
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -254,7 +255,9 @@ class HttpLookupTableSourceITCaseTest {
.withQueryParam("id", matching("[0-9]+"))
.withQueryParam("id2", matching("[0-9]+"))
.willReturn(
-
aResponse().withTransformers(JsonTransform.NAME).withStatus(200)));
+ aResponse()
+
.withTransformers(JsonTransformLookup.NAME)
+ .withStatus(200)));
var lookupTable =
"CREATE TABLE Customers ("
@@ -297,7 +300,7 @@ class HttpLookupTableSourceITCaseTest {
.withHeader("Content-Type",
equalTo("application/json"))
.withQueryParam("id", matching("[0-9]+"))
.withQueryParam("id2", matching("[0-9]+"))
-
.willReturn(aResponse().withBody(JsonTransform.NAME).withStatus(404))
+
.willReturn(aResponse().withBody(JsonTransformLookup.NAME).withStatus(404))
.willSetStateTo("second_request"));
wireMockServer.stubFor(
get(urlPathEqualTo(ENDPOINT))
@@ -307,7 +310,9 @@ class HttpLookupTableSourceITCaseTest {
.withQueryParam("id", matching("[0-9]+"))
.withQueryParam("id2", matching("[0-9]+"))
.willReturn(
-
aResponse().withTransformers(JsonTransform.NAME).withStatus(200)));
+ aResponse()
+
.withTransformers(JsonTransformLookup.NAME)
+ .withStatus(200)));
var lookupTable =
"CREATE TABLE Customers ("
@@ -1058,6 +1063,206 @@ class HttpLookupTableSourceITCaseTest {
.withRequestBody(matchingJsonPath("$.id2")));
}
+ @Test
+ void testLookupJoinWithQueryParamMappingForNameConflict() throws Exception
{
+ // GIVEN - Scenario demonstrating name conflict resolution with query
parameters:
+ // - Orders table has "id" column (string)
+ // - API expects query param named "customer" (string)
+ // - API response has field "customer" (ROW type with nested fields)
+ // Solution: Use http.request.query-param-fields-with-key to map:
+ // - Lookup table column "id" → query parameter "customer"
+ // - Lookup table column "customer" → response field "customer" (ROW
type)
+
+ int serverPort2 = WireMockServerPortAllocator.getServerPort();
+ int secServerPort2 = WireMockServerPortAllocator.getSecureServerPort();
+ WireMockServer wireMockServer2 =
+ new WireMockServer(
+ WireMockConfiguration.wireMockConfig()
+ .port(serverPort2)
+ .httpsPort(secServerPort2)
+ .keystorePath(keyStoreFile.getAbsolutePath())
+ .keystorePassword("password")
+ .keyManagerPassword("password")
+ .needClientAuth(true)
+
.trustStorePath(trustStoreFile.getAbsolutePath())
+ .trustStorePassword("password")
+
.extensions(JsonTransformCustomerObject.class));
+ wireMockServer2.start();
+
+ wireMockServer2.stubFor(
+ get(urlPathEqualTo(ENDPOINT))
+ .withHeader("Content-Type",
equalTo("application/json"))
+ .withQueryParam("customer", matching("[0-9]+"))
+ .withQueryParam("id2", matching("[0-9]+"))
+ .willReturn(
+
aResponse().withTransformers(JsonTransformCustomerObject.NAME)));
+
+ // Create source table with "id" column
+ String sourceTable =
+ "CREATE TABLE Orders ("
+ + " id STRING,"
+ + " id2 STRING,"
+ + " proc_time AS PROCTIME()"
+ + ") WITH ("
+ + "'connector' = 'datagen',"
+ + "'rows-per-second' = '1',"
+ + "'fields.id.kind' = 'sequence',"
+ + "'fields.id.start' = '1',"
+ + "'fields.id.end' = '4',"
+ + "'fields.id2.kind' = 'sequence',"
+ + "'fields.id2.start' = '2',"
+ + "'fields.id2.end' = '5'"
+ + ")";
+
+ // Create lookup table with:
+ // - "id" column for request (mapped to "customer" query param via
config)
+ // - "customer" column for response (ROW type from API)
+ String lookupTable =
+ "CREATE TABLE Customers ("
+ + "id STRING,"
+ + "id2 STRING,"
+ + "msg STRING,"
+ + "uuid STRING,"
+ + "customer ROW<"
+ + "isActive BOOLEAN,"
+ + "nestedDetails ROW<"
+ + "balance STRING"
+ + ">"
+ + ">"
+ + ") WITH ("
+ + "'format' = 'json',"
+ + "'connector' = 'http',"
+ + "'lookup-method' = 'GET',"
+ + "'url' = 'http://localhost:"
+ + serverPort2
+ + "/client',"
+ + "'http.source.lookup.header.Content-Type' =
'application/json',"
+ + "'asyncPolling' = 'true',"
+ + "'http.source.lookup.query-creator' =
'http-generic-json-url',"
+ + "'table.exec.async-lookup.buffer-capacity' = '50',"
+ + "'table.exec.async-lookup.timeout' = '120s',"
+ // Map "id" column to "customer" query parameter
+ + "'http.request.query-param-fields-with-key' =
'id:customer',"
+ + "'http.request.query-param-fields' = 'id2'"
+ + ")";
+
+ tEnv.executeSql(sourceTable);
+ tEnv.executeSql(lookupTable);
+
+ // WHEN - SQL query that performs JOIN
+ // Join on Orders.id = Customers.id (which sends "customer" query
param to API)
+ String joinQuery =
+ "SELECT o.id, o.id2, c.customer, c.msg, c.uuid FROM Orders AS
o "
+ + "JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS
c "
+ + "ON o.id = c.id "
+ + "AND o.id2 = c.id2";
+
+ TableResult result = tEnv.executeSql(joinQuery);
+ result.await(15, TimeUnit.SECONDS);
+
+ // THEN
+ SortedSet<Row> collectedRows = getCollectedRows(result);
+ assertThat(collectedRows.size()).isEqualTo(4);
+
+ // Verify that WireMock received requests with "customer" query
parameter
+ wireMockServer2.verify(
+ 4,
+ getRequestedFor(urlPathEqualTo(ENDPOINT))
+ .withQueryParam("customer", matching("[0-9]+"))
+ .withQueryParam("id2", matching("[0-9]+")));
+ }
+
+ @Test
+ void testLookupJoinWithBodyTemplateForNameConflict() throws Exception {
+ // GIVEN - Scenario demonstrating name conflict resolution with body
template:
+ // - Orders table has "id" column (string)
+ // - API expects POST body field named "customer" (string)
+ // - API response has field "customer" (ROW type with nested fields)
+ // Solution: Use http.request.body-template to map:
+ // - Lookup table column "body_customer" → request body field
"customer"
+ // - Lookup table column "customer" → response field "customer" (ROW
type)
+ wireMockServer.stubFor(
+ post(urlPathEqualTo(ENDPOINT))
+ .withRequestBody(matchingJsonPath("$.customer"))
+ .withRequestBody(matchingJsonPath("$.id2"))
+ .withHeader("Content-Type",
equalTo("application/json"))
+
.willReturn(aResponse().withTransformers(JsonTransformLookup.NAME)));
+ // Create source table with "id" column
+ String sourceTable =
+ "CREATE TABLE Orders ("
+ + "id STRING,"
+ + "id2 STRING,"
+ + " proc_time AS PROCTIME()"
+ + ") WITH ("
+ + "'connector' = 'datagen',"
+ + "'rows-per-second' = '1',"
+ + "'fields.id.kind' = 'sequence',"
+ + "'fields.id.start' = '1',"
+ + "'fields.id.end' = '4',"
+ + "'fields.id2.kind' = 'sequence',"
+ + "'fields.id2.start' = '2',"
+ + "'fields.id2.end' = '5'"
+ + ")";
+
+ // Create lookup table with both:
+ // - "body_customer" for request body field (avoids conflict)
+ // - "customer" for response field (object type from API)
+ String lookupTable =
+ "CREATE TABLE Customers ("
+ + "body_customer STRING,"
+ + "id2 STRING,"
+ + "customer ROW<name STRING, age STRING>,"
+ + "msg STRING,"
+ + "uuid STRING,"
+ + "details ROW<"
+ + "isActive BOOLEAN,"
+ + "customer ROW<"
+ + "balance STRING"
+ + ">"
+ + ">"
+ + ") WITH ("
+ + "'format' = 'json',"
+ + "'connector' = 'http',"
+ + "'lookup-method' = 'POST',"
+ + "'url' = 'http://localhost:"
+ + serverPort
+ + "/client',"
+ + "'http.source.lookup.header.Content-Type' =
'application/json',"
+ + "'asyncPolling' = 'true',"
+ + "'http.source.lookup.query-creator' =
'http-generic-json-url',"
+ + "'lookup-request.format' = 'json',"
+ + "'table.exec.async-lookup.buffer-capacity' = '50',"
+ + "'table.exec.async-lookup.timeout' = '120s',"
+ // Template maps body_customer to "customer" in
request body
+ + "'http.request.body-template' = '{\"customer\":
{{body_customer}}, \"id2\": {{id2}}}'"
+ + ")";
+
+ tEnv.executeSql(sourceTable);
+ tEnv.executeSql(lookupTable);
+
+ // WHEN - SQL query that performs JOIN
+ // Join on Orders.id = Customers.body_customer (which sends "customer"
in POST body)
+ String joinQuery =
+ "SELECT o.id, o.id2, c.customer, c.msg, c.uuid FROM Orders AS
o "
+ + "JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS
c "
+ + "ON o.id = c.body_customer "
+ + "AND o.id2 = c.id2";
+
+ TableResult result = tEnv.executeSql(joinQuery);
+ result.await(15, TimeUnit.SECONDS);
+
+ // THEN
+ SortedSet<Row> collectedRows = getCollectedRows(result);
+ assertThat(collectedRows.size()).isEqualTo(4);
+
+ // Verify that WireMock received requests with "customer" field in body
+ wireMockServer.verify(
+ 4,
+ postRequestedFor(urlEqualTo(ENDPOINT))
+ .withRequestBody(matchingJsonPath("$.customer"))
+ .withRequestBody(matchingJsonPath("$.id2")));
+ }
+
private SortedSet<Row> testLookupJoinWithMetadata(String lookupTable, int
maxRows)
throws Exception {
@@ -1270,7 +1475,7 @@ class HttpLookupTableSourceITCaseTest {
.withHeader("Content-Type",
equalTo("application/json"))
.withQueryParam("id", matching("[0-9]+"))
.withQueryParam("id2", matching("[0-9]+"))
-
.willReturn(aResponse().withTransformers(JsonTransform.NAME)));
+
.willReturn(aResponse().withTransformers(JsonTransformLookup.NAME)));
}
private void setupServerStubEmptyResponse(WireMockServer wireMockServer) {
@@ -1343,7 +1548,7 @@ class HttpLookupTableSourceITCaseTest {
} else {
methodStub.willReturn(
aResponse()
- .withTransformers(JsonTransform.NAME)
+ .withTransformers(JsonTransformLookup.NAME)
.withHeader("Content-Type",
"application/json"));
}
} else {
diff --git
a/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/JavaNetHttpPollingClientWithWireTest.java
b/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/JavaNetHttpPollingClientWithWireTest.java
index cefb4b2..33421c2 100644
---
a/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/JavaNetHttpPollingClientWithWireTest.java
+++
b/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/JavaNetHttpPollingClientWithWireTest.java
@@ -90,7 +90,7 @@ public class JavaNetHttpPollingClientWithWireTest {
.needClientAuth(true)
.trustStorePath(trustStoreFile.getAbsolutePath())
.trustStorePassword("password")
- .extensions(JsonTransform.class));
+ .extensions(JsonTransformLookup.class));
wireMockServer.start();
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
diff --git
a/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/JsonTransform.java
b/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/JsonTransform.java
deleted file mode 100644
index 564349a..0000000
---
a/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/JsonTransform.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.http.table.lookup;
-
-import com.github.tomakehurst.wiremock.common.FileSource;
-import com.github.tomakehurst.wiremock.extension.Parameters;
-import com.github.tomakehurst.wiremock.extension.ResponseTransformer;
-import com.github.tomakehurst.wiremock.http.Request;
-import com.github.tomakehurst.wiremock.http.Response;
-
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * Wiremock Extension that prepares HTTP REST endpoint response body. This
extension is stateful,
- * every next response will have id == counter and id2 == counter + 1 value in
its response, where
- * counter is incremented for every subsequent request.
- */
-public class JsonTransform extends ResponseTransformer {
-
- public static final String NAME = "JsonTransform";
-
- private static final String RESULT_JSON =
- "{\n"
- + "\t\"id\": \"&COUNTER&\",\n"
- + "\t\"id2\": \"&COUNTER_2&\",\n"
- + "\t\"uuid\": \"fbb68a46-80a9-46da-9d40-314b5287079c\",\n"
- + "\t\"picture\": \"http://placehold.it/32x32\",\n"
- + "\t\"msg\": \"&PARAM&, cnt: &COUNTER&\",\n"
- + "\t\"age\": 30,\n"
- + "\t\"eyeColor\": \"green\",\n"
- + "\t\"name\": \"Marva Fischer\",\n"
- + "\t\"gender\": \"female\",\n"
- + "\t\"company\": \"SILODYNE\",\n"
- + "\t\"email\": \"[email protected]\",\n"
- + "\t\"phone\": \"+1 (990) 562-2120\",\n"
- + "\t\"address\": \"601 Auburn Place, Bynum, New York,
7057\",\n"
- + "\t\"about\": \"Proident Lorem et duis nisi tempor elit
occaecat laboris"
- + " dolore magna Lorem consequat. Deserunt velit minim
nisi consectetur duis "
- + "amet labore cupidatat. Pariatur sunt occaecat qui
reprehenderit ipsum ex culpa "
- + "ullamco ex duis adipisicing commodo sunt. Ad cupidatat
magna ad in officia "
- + "irure aute duis culpa et. Magna esse adipisicing
consequat occaecat. Excepteur amet "
- + "dolore occaecat sit officia dolore elit in cupidatat
non anim.\\r\\n\",\n"
- + "\t\"registered\": \"2020-07-11T11:13:32 -02:00\",\n"
- + "\t\"latitude\": -35.237843,\n"
- + "\t\"longitude\": 60.386104,\n"
- + "\t\"tags\": [\n"
- + "\t\t\"officia\",\n"
- + "\t\t\"eiusmod\",\n"
- + "\t\t\"labore\",\n"
- + "\t\t\"ex\",\n"
- + "\t\t\"aliqua\",\n"
- + "\t\t\"consectetur\",\n"
- + "\t\t\"excepteur\"\n"
- + "\t],\n"
- + "\t\"friends\": [\n"
- + "\t\t{\n"
- + "\t\t\t\"id\": 0,\n"
- + "\t\t\t\"name\": \"Kemp Newman\"\n"
- + "\t\t},\n"
- + "\t\t{\n"
- + "\t\t\t\"id\": 1,\n"
- + "\t\t\t\"name\": \"Sears Blackburn\"\n"
- + "\t\t},\n"
- + "\t\t{\n"
- + "\t\t\t\"id\": 2,\n"
- + "\t\t\t\"name\": \"Lula Rogers\"\n"
- + "\t\t}\n"
- + "\t],\n"
- + "\t\"details\": {\n"
- + "\t\t\"isActive\": true,\n"
- + "\t\t\"nestedDetails\": {\n"
- + "\t\t\t\"index\": 0,\n"
- + "\t\t\t\"guid\":
\"d81fc542-6b49-4d59-8fb9-d57430d4871d\",\n"
- + "\t\t\t\"balance\": \"$1,729.34\"\n"
- + "\t\t}\n"
- + "\t},\n"
- + "\t\"greeting\": \"Hello, Marva Fischer! You have 7
unread messages.\",\n"
- + "\t\"favoriteFruit\": \"banana\"\n"
- + "}";
- private final AtomicInteger counter = new AtomicInteger(0);
-
- @Override
- public Response transform(
- Request request, Response response, FileSource files, Parameters
parameters) {
- int cnt = counter.getAndIncrement();
-
- return Response.response()
- .body(generateResponse(request.getUrl(), cnt))
- .status(response.getStatus())
- .statusMessage(response.getStatusMessage())
- .build();
- }
-
- @Override
- public String getName() {
- return NAME;
- }
-
- private String generateResponse(String param, int counter) {
- return RESULT_JSON
- .replaceAll("&PARAM&", param)
- .replaceAll("&COUNTER&", String.valueOf(counter))
- .replaceAll("&COUNTER_2&", String.valueOf(counter + 1));
- }
-
- @Override
- public boolean applyGlobally() {
- return false;
- }
-}
diff --git
a/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/JsonTransformLookup.java
b/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/JsonTransformLookup.java
new file mode 100644
index 0000000..6e3d585
--- /dev/null
+++
b/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/JsonTransformLookup.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.http.table.lookup;
+
+import org.apache.flink.connector.http.app.AbstractJsonTransform;
+
+/**
+ * Wiremock Extension that prepares HTTP REST endpoint response body. This
extension is stateful,
+ * every next response will have id == counter and id2 == counter + 1 value in
its response, where
+ * counter is incremented for every subsequent request.
+ */
+public class JsonTransformLookup extends AbstractJsonTransform {
+
+ public static final String NAME = "JsonTransformLookup";
+
+ @Override
+ public String getName() {
+ return NAME;
+ }
+
+ @Override
+ protected String getNestedObjectName() {
+ return "details";
+ }
+}
diff --git
a/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/querycreators/GenericJsonAndUrlQueryCreatorFactoryTest.java
b/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/querycreators/GenericJsonAndUrlQueryCreatorFactoryTest.java
index 3fa5c27..31d28d1 100644
---
a/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/querycreators/GenericJsonAndUrlQueryCreatorFactoryTest.java
+++
b/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/querycreators/GenericJsonAndUrlQueryCreatorFactoryTest.java
@@ -147,7 +147,198 @@ class GenericJsonAndUrlQueryCreatorFactoryTest {
LookupQueryCreator creator =
new GenericJsonAndUrlQueryCreatorFactory()
.createLookupQueryCreator(config, lookupRow,
tableContext);
- assertThat(creator).isNotNull();
+ }
+
+ @Test
+ void testValidationRejectsNullColumnName() {
+ // GIVEN - Map with null column name
+ Configuration config = new Configuration();
+ config.set(HttpLookupConnectorOptions.LOOKUP_METHOD, "GET");
+ java.util.Map<String, String> queryParamMap = new
java.util.HashMap<>();
+ queryParamMap.put(null, "qp1");
+ config.set(
+
GenericJsonAndUrlQueryCreatorFactory.REQUEST_QUERY_PARAM_FIELDS_WITH_KEY,
+ queryParamMap);
+
+ LookupRow lookupRow = new LookupRow();
+ lookupRow.addLookupEntry(
+ new RowDataSingleValueLookupSchemaEntry(
+ "key1",
RowData.createFieldGetter(DataTypes.STRING().getLogicalType(), 0)));
+
+ // WHEN/THEN - Should throw IllegalArgumentException
+ assertThatThrownBy(
+ () ->
+ new GenericJsonAndUrlQueryCreatorFactory()
+ .createLookupQueryCreator(config,
lookupRow, tableContext))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Column name")
+ .hasMessageContaining("cannot be null or empty");
+ }
+
+ @Test
+ void testValidationRejectsEmptyColumnName() {
+ // GIVEN - Map with empty column name
+ Configuration config = new Configuration();
+ config.set(HttpLookupConnectorOptions.LOOKUP_METHOD, "GET");
+ java.util.Map<String, String> queryParamMap = new
java.util.HashMap<>();
+ queryParamMap.put("", "qp1");
+ config.set(
+
GenericJsonAndUrlQueryCreatorFactory.REQUEST_QUERY_PARAM_FIELDS_WITH_KEY,
+ queryParamMap);
+
+ LookupRow lookupRow = new LookupRow();
+ lookupRow.addLookupEntry(
+ new RowDataSingleValueLookupSchemaEntry(
+ "key1",
RowData.createFieldGetter(DataTypes.STRING().getLogicalType(), 0)));
+
+ // WHEN/THEN - Should throw IllegalArgumentException
+ assertThatThrownBy(
+ () ->
+ new GenericJsonAndUrlQueryCreatorFactory()
+ .createLookupQueryCreator(config,
lookupRow, tableContext))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Column name")
+ .hasMessageContaining("cannot be null or empty");
+ }
+
+ @Test
+ void testValidationRejectsNullQueryParamKey() {
+ // GIVEN - Map with null query param key
+ Configuration config = new Configuration();
+ config.set(HttpLookupConnectorOptions.LOOKUP_METHOD, "GET");
+ java.util.Map<String, String> queryParamMap = new
java.util.HashMap<>();
+ queryParamMap.put("key1", null);
+ config.set(
+
GenericJsonAndUrlQueryCreatorFactory.REQUEST_QUERY_PARAM_FIELDS_WITH_KEY,
+ queryParamMap);
+
+ LookupRow lookupRow = new LookupRow();
+ lookupRow.addLookupEntry(
+ new RowDataSingleValueLookupSchemaEntry(
+ "key1",
RowData.createFieldGetter(DataTypes.STRING().getLogicalType(), 0)));
+
+ // WHEN/THEN - Should throw IllegalArgumentException
+ assertThatThrownBy(
+ () ->
+ new GenericJsonAndUrlQueryCreatorFactory()
+ .createLookupQueryCreator(config,
lookupRow, tableContext))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Query parameter key for column 'key1'")
+ .hasMessageContaining("cannot be null or empty");
+ }
+
+ @Test
+ void testValidationRejectsEmptyQueryParamKey() {
+ // GIVEN - Map with empty query param key
+ Configuration config = new Configuration();
+ config.set(HttpLookupConnectorOptions.LOOKUP_METHOD, "GET");
+ java.util.Map<String, String> queryParamMap = new
java.util.HashMap<>();
+ queryParamMap.put("key1", " ");
+ config.set(
+
GenericJsonAndUrlQueryCreatorFactory.REQUEST_QUERY_PARAM_FIELDS_WITH_KEY,
+ queryParamMap);
+
+ LookupRow lookupRow = new LookupRow();
+ lookupRow.addLookupEntry(
+ new RowDataSingleValueLookupSchemaEntry(
+ "key1",
RowData.createFieldGetter(DataTypes.STRING().getLogicalType(), 0)));
+
+ // WHEN/THEN - Should throw IllegalArgumentException
+ assertThatThrownBy(
+ () ->
+ new GenericJsonAndUrlQueryCreatorFactory()
+ .createLookupQueryCreator(config,
lookupRow, tableContext))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Query parameter key for column 'key1'")
+ .hasMessageContaining("cannot be null or empty");
+ }
+
+ @Test
+ void testValidationRejectsConflictBetweenQueryParamKeyAndListFormat() {
+ // GIVEN - REQUEST_QUERY_PARAM_FIELDS_WITH_KEY with query param key
that conflicts with
+ // REQUEST_QUERY_PARAM_FIELDS
+ Configuration config = new Configuration();
+ config.set(HttpLookupConnectorOptions.LOOKUP_METHOD, "GET");
+ config.set(REQUEST_QUERY_PARAM_FIELDS, List.of("qp1", "qp2"));
+ java.util.Map<String, String> queryParamMap = new
java.util.HashMap<>();
+ queryParamMap.put("key1", "qp1"); // qp1 is already in
REQUEST_QUERY_PARAM_FIELDS
+ config.set(
+
GenericJsonAndUrlQueryCreatorFactory.REQUEST_QUERY_PARAM_FIELDS_WITH_KEY,
+ queryParamMap);
+
+ LookupRow lookupRow = new LookupRow();
+ lookupRow.addLookupEntry(
+ new RowDataSingleValueLookupSchemaEntry(
+ "key1",
RowData.createFieldGetter(DataTypes.STRING().getLogicalType(), 0)));
+
+ // WHEN/THEN - Should throw IllegalArgumentException
+ assertThatThrownBy(
+ () ->
+ new GenericJsonAndUrlQueryCreatorFactory()
+ .createLookupQueryCreator(config,
lookupRow, tableContext))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Query parameter key 'qp1'")
+ .hasMessageContaining("conflicts with existing columns");
+ }
+
+ @Test
+ void testValidationRejectsConflictBetweenColumnNameAndListFormat() {
+ // GIVEN - REQUEST_QUERY_PARAM_FIELDS_WITH_KEY with column name that
conflicts with
+ // REQUEST_QUERY_PARAM_FIELDS
+ Configuration config = new Configuration();
+ config.set(HttpLookupConnectorOptions.LOOKUP_METHOD, "GET");
+ config.set(REQUEST_QUERY_PARAM_FIELDS, List.of("key1", "qp2"));
+ java.util.Map<String, String> queryParamMap = new
java.util.HashMap<>();
+ queryParamMap.put("key1", "customParam"); // key1 is already in
REQUEST_QUERY_PARAM_FIELDS
+ config.set(
+
GenericJsonAndUrlQueryCreatorFactory.REQUEST_QUERY_PARAM_FIELDS_WITH_KEY,
+ queryParamMap);
+
+ LookupRow lookupRow = new LookupRow();
+ lookupRow.addLookupEntry(
+ new RowDataSingleValueLookupSchemaEntry(
+ "key1",
RowData.createFieldGetter(DataTypes.STRING().getLogicalType(), 0)));
+
+ // WHEN/THEN - Should throw IllegalArgumentException
+ assertThatThrownBy(
+ () ->
+ new GenericJsonAndUrlQueryCreatorFactory()
+ .createLookupQueryCreator(config,
lookupRow, tableContext))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Column name 'key1'")
+ .hasMessageContaining("conflicts with existing columns");
+ }
+
+ @Test
+ void testValidationRejectsDuplicateQueryParamKeys() {
+ // GIVEN - Map with duplicate query param keys (different columns
mapping to same key)
+ Configuration config = new Configuration();
+ config.set(HttpLookupConnectorOptions.LOOKUP_METHOD, "GET");
+ java.util.Map<String, String> queryParamMap = new
java.util.LinkedHashMap<>();
+ queryParamMap.put("customerId", "id");
+ queryParamMap.put("orderId", "id"); // Same key "id" used twice
+ config.set(
+
GenericJsonAndUrlQueryCreatorFactory.REQUEST_QUERY_PARAM_FIELDS_WITH_KEY,
+ queryParamMap);
+
+ LookupRow lookupRow = new LookupRow();
+ lookupRow.addLookupEntry(
+ new RowDataSingleValueLookupSchemaEntry(
+ "customerId",
+
RowData.createFieldGetter(DataTypes.STRING().getLogicalType(), 0)));
+ lookupRow.addLookupEntry(
+ new RowDataSingleValueLookupSchemaEntry(
+ "orderId",
+
RowData.createFieldGetter(DataTypes.STRING().getLogicalType(), 1)));
+
+ // WHEN/THEN - Should throw IllegalArgumentException
+ assertThatThrownBy(
+ () ->
+ new GenericJsonAndUrlQueryCreatorFactory()
+ .createLookupQueryCreator(config,
lookupRow, tableContext))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Duplicate query parameter key 'id'")
+ .hasMessageContaining("must be unique");
}
@Test
@@ -223,6 +414,205 @@ class GenericJsonAndUrlQueryCreatorFactoryTest {
assertThat(creator).isNotNull();
}
+ @Test
+ void testValidationRejectsQueryParamsWithPost() {
+ // GIVEN - POST request with query param fields (old format)
+ Configuration config = new Configuration();
+ config.set(HttpLookupConnectorOptions.LOOKUP_METHOD, "POST");
+ config.set(REQUEST_QUERY_PARAM_FIELDS, List.of("key1"));
+
+ LookupRow lookupRow = new LookupRow();
+ lookupRow.addLookupEntry(
+ new RowDataSingleValueLookupSchemaEntry(
+ "key1",
RowData.createFieldGetter(DataTypes.STRING().getLogicalType(), 0)));
+
+ // WHEN/THEN - Should throw IllegalArgumentException
+ assertThatThrownBy(
+ () ->
+ new GenericJsonAndUrlQueryCreatorFactory()
+ .createLookupQueryCreator(config,
lookupRow, tableContext))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Query parameter configuration")
+ .hasMessageContaining("can only be used with GET method");
+ }
+
+ @Test
+ void testValidationRejectsQueryParamsWithKeyWithPost() {
+ // GIVEN - POST request with query param fields with key (new format)
+ Configuration config = new Configuration();
+ config.set(HttpLookupConnectorOptions.LOOKUP_METHOD, "POST");
+ java.util.Map<String, String> queryParamMap = new
java.util.HashMap<>();
+ queryParamMap.put("key1", "qp1");
+ config.set(
+
GenericJsonAndUrlQueryCreatorFactory.REQUEST_QUERY_PARAM_FIELDS_WITH_KEY,
+ queryParamMap);
+
+ LookupRow lookupRow = new LookupRow();
+ lookupRow.addLookupEntry(
+ new RowDataSingleValueLookupSchemaEntry(
+ "key1",
RowData.createFieldGetter(DataTypes.STRING().getLogicalType(), 0)));
+
+ // WHEN/THEN - Should throw IllegalArgumentException
+ assertThatThrownBy(
+ () ->
+ new GenericJsonAndUrlQueryCreatorFactory()
+ .createLookupQueryCreator(config,
lookupRow, tableContext))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Query parameter configuration")
+ .hasMessageContaining("can only be used with GET method");
+ }
+
+ @Test
+ void testValidationRejectsQueryParamsWithPut() {
+ // GIVEN - PUT request with query param fields
+ Configuration config = new Configuration();
+ config.set(HttpLookupConnectorOptions.LOOKUP_METHOD, "PUT");
+ config.set(REQUEST_QUERY_PARAM_FIELDS, List.of("key1"));
+
+ LookupRow lookupRow = new LookupRow();
+ lookupRow.addLookupEntry(
+ new RowDataSingleValueLookupSchemaEntry(
+ "key1",
RowData.createFieldGetter(DataTypes.STRING().getLogicalType(), 0)));
+
+ // WHEN/THEN - Should throw IllegalArgumentException
+ assertThatThrownBy(
+ () ->
+ new GenericJsonAndUrlQueryCreatorFactory()
+ .createLookupQueryCreator(config,
lookupRow, tableContext))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Query parameter configuration")
+ .hasMessageContaining("can only be used with GET method");
+ }
+
+ @Test
+ void testValidationRejectsBodyTemplateWithGet() {
+ // GIVEN - GET request with body template
+ Configuration config = new Configuration();
+ config.set(HttpLookupConnectorOptions.LOOKUP_METHOD, "GET");
+ config.set(REQUEST_BODY_TEMPLATE, "{\"userId\":{{key1}}}");
+
+ LookupRow lookupRow = new LookupRow();
+ lookupRow.addLookupEntry(
+ new RowDataSingleValueLookupSchemaEntry(
+ "key1",
RowData.createFieldGetter(DataTypes.STRING().getLogicalType(), 0)));
+
+ // WHEN/THEN - Should throw IllegalArgumentException
+ assertThatThrownBy(
+ () ->
+ new GenericJsonAndUrlQueryCreatorFactory()
+ .createLookupQueryCreator(config,
lookupRow, tableContext))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Body template configuration")
+ .hasMessageContaining("cannot be used with GET method");
+ }
+
+ @Test
+ void testValidationAllowsQueryParamsWithGet() {
+ // GIVEN - GET request with query param fields (should succeed)
+ Configuration config = new Configuration();
+ config.set(HttpLookupConnectorOptions.LOOKUP_METHOD, "GET");
+ config.set(REQUEST_QUERY_PARAM_FIELDS, List.of("key1"));
+
+ LookupRow lookupRow = new LookupRow();
+ lookupRow.addLookupEntry(
+ new RowDataSingleValueLookupSchemaEntry(
+ "key1",
RowData.createFieldGetter(DataTypes.STRING().getLogicalType(), 0)));
+ lookupRow.setLookupPhysicalRowDataType(
+ row(List.of(DataTypes.FIELD("key1", DataTypes.STRING()))));
+
+ // WHEN/THEN - Should succeed
+ LookupQueryCreator creator =
+ new GenericJsonAndUrlQueryCreatorFactory()
+ .createLookupQueryCreator(config, lookupRow,
tableContext);
+ assertThat(creator).isNotNull();
+ }
+
+ @Test
+ void testValidationRejectsBodyTemplateWithDefaultGetMethod() {
+ // GIVEN - No lookup-method specified (defaults to GET) with body
template
+ Configuration config = new Configuration();
+ // Note: NOT setting LOOKUP_METHOD - should default to GET
+ config.set(REQUEST_BODY_TEMPLATE, "{\"userId\":{{key1}}}");
+
+ LookupRow lookupRow = new LookupRow();
+ lookupRow.addLookupEntry(
+ new RowDataSingleValueLookupSchemaEntry(
+ "key1",
RowData.createFieldGetter(DataTypes.STRING().getLogicalType(), 0)));
+
+ // WHEN/THEN - Should throw IllegalArgumentException because default
GET cannot use body
+ // template
+ assertThatThrownBy(
+ () ->
+ new GenericJsonAndUrlQueryCreatorFactory()
+ .createLookupQueryCreator(config,
lookupRow, tableContext))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Body template configuration")
+ .hasMessageContaining("cannot be used with GET method");
+ }
+
+ @Test
+ void testValidationAllowsQueryParamsWithDefaultGetMethod() {
+ // GIVEN - No lookup-method specified (defaults to GET) with query
param fields
+ Configuration config = new Configuration();
+ // Note: NOT setting LOOKUP_METHOD - should default to GET
+ config.set(REQUEST_QUERY_PARAM_FIELDS, List.of("key1"));
+
+ LookupRow lookupRow = new LookupRow();
+ lookupRow.addLookupEntry(
+ new RowDataSingleValueLookupSchemaEntry(
+ "key1",
RowData.createFieldGetter(DataTypes.STRING().getLogicalType(), 0)));
+ lookupRow.setLookupPhysicalRowDataType(
+ row(List.of(DataTypes.FIELD("key1", DataTypes.STRING()))));
+
+ // WHEN/THEN - Should succeed because default GET allows query
parameters
+ LookupQueryCreator creator =
+ new GenericJsonAndUrlQueryCreatorFactory()
+ .createLookupQueryCreator(config, lookupRow,
tableContext);
+ assertThat(creator).isNotNull();
+ }
+
+ @Test
+ void testValidationAllowsBodyTemplateWithPost() {
+ // GIVEN - POST request with body template (should succeed)
+ Configuration config = new Configuration();
+ config.set(HttpLookupConnectorOptions.LOOKUP_METHOD, "POST");
+ config.set(REQUEST_BODY_TEMPLATE, "{\"userId\":{{key1}}}");
+
+ LookupRow lookupRow = new LookupRow();
+ lookupRow.addLookupEntry(
+ new RowDataSingleValueLookupSchemaEntry(
+ "key1",
RowData.createFieldGetter(DataTypes.STRING().getLogicalType(), 0)));
+ lookupRow.setLookupPhysicalRowDataType(
+ row(List.of(DataTypes.FIELD("key1", DataTypes.STRING()))));
+
+ // WHEN/THEN - Should succeed
+ LookupQueryCreator creator =
+ new GenericJsonAndUrlQueryCreatorFactory()
+ .createLookupQueryCreator(config, lookupRow,
tableContext);
+ assertThat(creator).isNotNull();
+ }
+
+ @Test
+ void testValidationAllowsBodyTemplateWithPut() {
+ // GIVEN - PUT request with body template (should succeed)
+ Configuration config = new Configuration();
+ config.set(HttpLookupConnectorOptions.LOOKUP_METHOD, "PUT");
+ config.set(REQUEST_BODY_TEMPLATE, "{\"userId\":{{key1}}}");
+
+ LookupRow lookupRow = new LookupRow();
+ lookupRow.addLookupEntry(
+ new RowDataSingleValueLookupSchemaEntry(
+ "key1",
RowData.createFieldGetter(DataTypes.STRING().getLogicalType(), 0)));
+ lookupRow.setLookupPhysicalRowDataType(
+ row(List.of(DataTypes.FIELD("key1", DataTypes.STRING()))));
+
+ // WHEN/THEN - Should succeed
+ LookupQueryCreator creator =
+ new GenericJsonAndUrlQueryCreatorFactory()
+ .createLookupQueryCreator(config, lookupRow,
tableContext);
+ assertThat(creator).isNotNull();
+ }
+
@Test
void testNoBodyTemplateForPostReturnsEmptyJson() {
// GIVEN - POST request with no body template
diff --git
a/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/querycreators/GenericJsonAndUrlQueryCreatorTest.java
b/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/querycreators/GenericJsonAndUrlQueryCreatorTest.java
index d19ca7e..9f7620d 100644
---
a/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/querycreators/GenericJsonAndUrlQueryCreatorTest.java
+++
b/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/querycreators/GenericJsonAndUrlQueryCreatorTest.java
@@ -47,6 +47,7 @@ import static
org.apache.flink.connector.http.table.lookup.HttpLookupConnectorOp
import static
org.apache.flink.connector.http.table.lookup.HttpLookupTableSourceFactory.row;
import static
org.apache.flink.connector.http.table.lookup.querycreators.GenericJsonAndUrlQueryCreatorFactory.REQUEST_BODY_TEMPLATE;
import static
org.apache.flink.connector.http.table.lookup.querycreators.GenericJsonAndUrlQueryCreatorFactory.REQUEST_QUERY_PARAM_FIELDS;
+import static
org.apache.flink.connector.http.table.lookup.querycreators.GenericJsonAndUrlQueryCreatorFactory.REQUEST_QUERY_PARAM_FIELDS_WITH_KEY;
import static
org.apache.flink.connector.http.table.lookup.querycreators.GenericJsonAndUrlQueryCreatorFactory.REQUEST_URL_MAP;
import static
org.apache.flink.connector.http.table.lookup.querycreators.QueryCreatorUtils.getTableContext;
import static org.assertj.core.api.Assertions.assertThat;
@@ -71,12 +72,30 @@ class GenericJsonAndUrlQueryCreatorTest {
ResolvedSchema.of(Column.physical(KEY_1, DataTypes.STRING()));
private static final RowData ROWDATA = getRowData(1, VALUE);
+ @Test
+ public void createLookupQueryTestGet() {
+ // GIVEN
+ LookupRow lookupRow = getLookupRow(KEY_1);
+ Configuration config = getConfigurationForGet();
+ GenericJsonAndUrlQueryCreator universalJsonQueryCreator =
+ (GenericJsonAndUrlQueryCreator)
+ new GenericJsonAndUrlQueryCreatorFactory()
+ .createLookupQueryCreator(
+ config,
+ lookupRow,
+ getTableContext(config,
RESOLVED_SCHEMA));
+ // WHEN
+ var createdQuery =
universalJsonQueryCreator.createLookupQuery(ROWDATA);
+ // THEN
+ validateCreatedQueryForGet(createdQuery);
+ }
+
@ParameterizedTest
- @ValueSource(strings = {"GET", "PUT", "POST"})
- public void createLookupQueryTestStrAllOps(String operation) {
+ @ValueSource(strings = {"PUT", "POST"})
+ public void createLookupQueryTestPostAndPut(String operation) {
// GIVEN
LookupRow lookupRow = getLookupRow(KEY_1);
- Configuration config = getConfiguration(operation);
+ Configuration config = getConfigurationForPostAndPut(operation);
GenericJsonAndUrlQueryCreator universalJsonQueryCreator =
(GenericJsonAndUrlQueryCreator)
new GenericJsonAndUrlQueryCreatorFactory()
@@ -87,11 +106,7 @@ class GenericJsonAndUrlQueryCreatorTest {
// WHEN
var createdQuery =
universalJsonQueryCreator.createLookupQuery(ROWDATA);
// THEN
- if (operation.equals("GET")) {
- validateCreatedQueryForGet(createdQuery);
- } else {
- validateCreatedQueryForPutAndPost(createdQuery);
- }
+ validateCreatedQueryForPutAndPost(createdQuery);
}
@Test
@@ -101,7 +116,7 @@ class GenericJsonAndUrlQueryCreatorTest {
Configuration config = new Configuration();
config.set(REQUEST_QUERY_PARAM_FIELDS, QUERY_PARAMS);
config.set(REQUEST_URL_MAP, urlParams);
- config.set(LOOKUP_METHOD, "POST");
+ config.set(LOOKUP_METHOD, "GET");
lookupRow.setLookupPhysicalRowDataType(DATATYPE_1_2);
GenericJsonAndUrlQueryCreator genericJsonAndUrlQueryCreator =
(GenericJsonAndUrlQueryCreator)
@@ -115,8 +130,8 @@ class GenericJsonAndUrlQueryCreatorTest {
GenericRowData.of(StringData.fromString("val1"),
StringData.fromString("val2"));
LookupQueryInfo createdQuery =
genericJsonAndUrlQueryCreator.createLookupQuery(lookupRowData);
- // THEN
- assertThat(createdQuery.getLookupQuery()).isEqualTo("");
+ // THEN - Only KEY_1 is in QUERY_PARAMS, so only key1=val1 in query
string
+ assertThat(createdQuery.getLookupQuery()).isEqualTo("key1=val1");
}
@Test
@@ -124,13 +139,14 @@ class GenericJsonAndUrlQueryCreatorTest {
// GIVEN
LookupRow lookupRow = getLookupRow(KEY_1);
lookupRow.setLookupPhysicalRowDataType(DATATYPE_1);
+ Configuration config = getConfigurationForPostAndPut("POST");
GenericJsonAndUrlQueryCreator genericJsonAndUrlQueryCreator =
(GenericJsonAndUrlQueryCreator)
new GenericJsonAndUrlQueryCreatorFactory()
.createLookupQueryCreator(
- getConfiguration("POST"),
+ config,
lookupRow,
-
getTableContext(getConfiguration("POST"), RESOLVED_SCHEMA));
+ getTableContext(config,
RESOLVED_SCHEMA));
// Mock a failing serialization schema
SerializationSchema<RowData> failingSchema =
new SerializationSchema<RowData>() {
@@ -297,7 +313,7 @@ class GenericJsonAndUrlQueryCreatorTest {
@Test
public void testBodyTemplateNotAppliedToGet() {
- // GIVEN - GET request with body template (should be ignored)
+ // GIVEN - GET request with body template (should be rejected by
validation)
Configuration config = new Configuration();
config.set(LOOKUP_METHOD, "GET");
config.set(REQUEST_QUERY_PARAM_FIELDS, QUERY_PARAMS);
@@ -306,19 +322,17 @@ class GenericJsonAndUrlQueryCreatorTest {
LookupRow lookupRow = getLookupRow(KEY_1);
lookupRow.setLookupPhysicalRowDataType(DATATYPE_1);
- GenericJsonAndUrlQueryCreator creator =
- (GenericJsonAndUrlQueryCreator)
- new GenericJsonAndUrlQueryCreatorFactory()
- .createLookupQueryCreator(
- config,
- lookupRow,
- getTableContext(config,
RESOLVED_SCHEMA));
-
- // WHEN
- LookupQueryInfo createdQuery = creator.createLookupQuery(ROWDATA);
-
- // THEN - Should be query params, not body
- assertThat(createdQuery.getLookupQuery()).isEqualTo("key1=val1");
+ // WHEN/THEN - Should throw IllegalArgumentException due to validation
+ assertThatThrownBy(
+ () ->
+ new GenericJsonAndUrlQueryCreatorFactory()
+ .createLookupQueryCreator(
+ config,
+ lookupRow,
+ getTableContext(config,
RESOLVED_SCHEMA)))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining(
+ "Body template configuration
(http.request.body-template) cannot be used with GET method");
}
@Test
@@ -550,6 +564,190 @@ class GenericJsonAndUrlQueryCreatorTest {
assertThat(literalObject.get("key").asText()).isEqualTo("value");
}
+ @Test
+ public void testQueryParamFieldsWithKeyForGet() {
+ // GIVEN - Use new map format to rename query params
+ Configuration config = new Configuration();
+ config.set(LOOKUP_METHOD, "GET");
+ Map<String, String> queryParamMap = Map.of("key1", "qp1");
+ config.set(REQUEST_QUERY_PARAM_FIELDS_WITH_KEY, queryParamMap);
+
+ LookupRow lookupRow = getLookupRow(KEY_1);
+ GenericJsonAndUrlQueryCreator creator =
+ (GenericJsonAndUrlQueryCreator)
+ new GenericJsonAndUrlQueryCreatorFactory()
+ .createLookupQueryCreator(
+ config,
+ lookupRow,
+ getTableContext(config,
RESOLVED_SCHEMA));
+
+ // WHEN
+ LookupQueryInfo createdQuery = creator.createLookupQuery(ROWDATA);
+
+ // THEN - Query params should use renamed key
+ assertThat(createdQuery.hasLookupQuery()).isTrue();
+ String queryString = createdQuery.getLookupQuery();
+ assertThat(queryString).isEqualTo("qp1=val1");
+ // Should NOT contain original column name
+ assertThat(queryString).doesNotContain("key1=");
+ }
+
+ @Test
+ public void testBackwardCompatibilityWithOldListFormat() {
+ // GIVEN - Use old list format (should still work)
+ Configuration config = new Configuration();
+ config.set(LOOKUP_METHOD, "GET");
+ config.set(REQUEST_QUERY_PARAM_FIELDS, List.of(KEY_1));
+
+ LookupRow lookupRow = getLookupRow(KEY_1);
+ GenericJsonAndUrlQueryCreator creator =
+ (GenericJsonAndUrlQueryCreator)
+ new GenericJsonAndUrlQueryCreatorFactory()
+ .createLookupQueryCreator(
+ config,
+ lookupRow,
+ getTableContext(config,
RESOLVED_SCHEMA));
+
+ // WHEN
+ LookupQueryInfo createdQuery = creator.createLookupQuery(ROWDATA);
+
+ // THEN - Should work as before (column name = query param key)
+ assertThat(createdQuery.hasLookupQuery()).isTrue();
+ assertThat(createdQuery.getLookupQuery()).isEqualTo("key1=val1");
+ }
+
+ @Test
+ public void testBothFormatsAreMergedWhenNoConflict() {
+ // GIVEN - Both old list and new map are provided with different fields
+ Configuration config = new Configuration();
+ config.set(LOOKUP_METHOD, "GET");
+ config.set(REQUEST_QUERY_PARAM_FIELDS, List.of("key2")); // Old format
for key2
+ Map<String, String> queryParamMap = Map.of("key1", "renamed_key"); //
New format for key1
+ config.set(REQUEST_QUERY_PARAM_FIELDS_WITH_KEY, queryParamMap);
+
+ // Create schema with both key1 and key2
+ ResolvedSchema schemaWithBothKeys =
+ ResolvedSchema.of(
+ Column.physical("key1", DataTypes.STRING()),
+ Column.physical("key2", DataTypes.STRING()));
+
+ LookupRow lookupRow = new LookupRow();
+ lookupRow.addLookupEntry(
+ new RowDataSingleValueLookupSchemaEntry(
+ "key1",
RowData.createFieldGetter(DataTypes.STRING().getLogicalType(), 0)));
+ lookupRow.addLookupEntry(
+ new RowDataSingleValueLookupSchemaEntry(
+ "key2",
RowData.createFieldGetter(DataTypes.STRING().getLogicalType(), 1)));
+
+ GenericJsonAndUrlQueryCreator creator =
+ (GenericJsonAndUrlQueryCreator)
+ new GenericJsonAndUrlQueryCreatorFactory()
+ .createLookupQueryCreator(
+ config,
+ lookupRow,
+ getTableContext(config,
schemaWithBothKeys));
+
+ // WHEN
+ GenericRowData lookupRowData =
+ GenericRowData.of(StringData.fromString("val1"),
StringData.fromString("val2"));
+ LookupQueryInfo createdQuery =
creator.createLookupQuery(lookupRowData);
+
+ // THEN - Both formats should be merged: key1 renamed to renamed_key,
key2 stays as key2
+ assertThat(createdQuery.hasLookupQuery()).isTrue();
+ String queryString = createdQuery.getLookupQuery();
+ assertThat(queryString).contains("key2=val2"); // From old list format
+ assertThat(queryString).contains("renamed_key=val1"); // From new map
format
+ assertThat(queryString).doesNotContain("key1="); // key1 should be
renamed
+ }
+
+ @Test
+ public void testQueryParamFieldsWithKeyMultipleFields() {
+ // GIVEN - Multiple fields with different renamed keys
+ Configuration config = new Configuration();
+ config.set(LOOKUP_METHOD, "GET");
+ Map<String, String> queryParamMap =
+ Map.of(
+ "customerId", "cid",
+ "orderName", "oname",
+ "status", "st");
+ config.set(REQUEST_QUERY_PARAM_FIELDS_WITH_KEY, queryParamMap);
+
+ // Create schema with all three fields
+ ResolvedSchema schemaWithAllFields =
+ ResolvedSchema.of(
+ Column.physical("customerId", DataTypes.STRING()),
+ Column.physical("orderName", DataTypes.STRING()),
+ Column.physical("status", DataTypes.STRING()));
+
+ LookupRow lookupRow = new LookupRow();
+ lookupRow.addLookupEntry(
+ new RowDataSingleValueLookupSchemaEntry(
+ "customerId",
+
RowData.createFieldGetter(DataTypes.STRING().getLogicalType(), 0)));
+ lookupRow.addLookupEntry(
+ new RowDataSingleValueLookupSchemaEntry(
+ "orderName",
+
RowData.createFieldGetter(DataTypes.STRING().getLogicalType(), 1)));
+ lookupRow.addLookupEntry(
+ new RowDataSingleValueLookupSchemaEntry(
+ "status",
+
RowData.createFieldGetter(DataTypes.STRING().getLogicalType(), 2)));
+
+ GenericJsonAndUrlQueryCreator creator =
+ (GenericJsonAndUrlQueryCreator)
+ new GenericJsonAndUrlQueryCreatorFactory()
+ .createLookupQueryCreator(
+ config,
+ lookupRow,
+ getTableContext(config,
schemaWithAllFields));
+
+ // WHEN
+ GenericRowData lookupRowData =
+ GenericRowData.of(
+ StringData.fromString("123"),
+ StringData.fromString("Order1"),
+ StringData.fromString("active"));
+ LookupQueryInfo createdQuery =
creator.createLookupQuery(lookupRowData);
+
+ // THEN - All fields should be renamed
+ assertThat(createdQuery.hasLookupQuery()).isTrue();
+ String queryString = createdQuery.getLookupQuery();
+ assertThat(queryString).contains("cid=123");
+ assertThat(queryString).contains("oname=Order1");
+ assertThat(queryString).contains("st=active");
+ // Should NOT contain original column names
+ assertThat(queryString).doesNotContain("customerId=");
+ assertThat(queryString).doesNotContain("orderName=");
+ assertThat(queryString).doesNotContain("status=");
+ }
+
+ @Test
+ public void testQueryParamFieldsWithKeyMissingColumn() {
+ // GIVEN - Query param map references a column that doesn't exist
+ Configuration config = new Configuration();
+ config.set(LOOKUP_METHOD, "GET");
+ Map<String, String> queryParamMap = new java.util.HashMap<>();
+ queryParamMap.put("misspelledCol", "customer"); // misspelledCol
doesn't exist
+ config.set(REQUEST_QUERY_PARAM_FIELDS_WITH_KEY, queryParamMap);
+
+ LookupRow lookupRow = getLookupRow(KEY_1); // Only has key1
+
+ GenericJsonAndUrlQueryCreator creator =
+ (GenericJsonAndUrlQueryCreator)
+ new GenericJsonAndUrlQueryCreatorFactory()
+ .createLookupQueryCreator(
+ config,
+ lookupRow,
+ getTableContext(config,
RESOLVED_SCHEMA));
+
+ // WHEN/THEN - Should throw IllegalArgumentException with helpful
message
+ assertThatThrownBy(() -> creator.createLookupQuery(ROWDATA))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("misspelledCol")
+ .hasMessageContaining("does not exist")
+ .hasMessageContaining("Available columns:");
+ }
+
// Helper methods
private static GenericRowData getRowData(int numFields, String value) {
if (numFields == 1) {
@@ -571,11 +769,19 @@ class GenericJsonAndUrlQueryCreatorTest {
return lookupRow;
}
- private Configuration getConfiguration(String operation) {
+ private Configuration getConfigurationForGet() {
Configuration config = new Configuration();
config.set(REQUEST_QUERY_PARAM_FIELDS, QUERY_PARAMS);
config.set(REQUEST_URL_MAP, urlParams);
+ config.set(LOOKUP_METHOD, "GET");
+ return config;
+ }
+
+ private Configuration getConfigurationForPostAndPut(String operation) {
+ Configuration config = new Configuration();
+ config.set(REQUEST_URL_MAP, urlParams);
config.set(LOOKUP_METHOD, operation);
+ // POST/PUT should not have query params - they use body template
instead
return config;
}
@@ -590,7 +796,7 @@ class GenericJsonAndUrlQueryCreatorTest {
// When no template is provided, body is empty (no body sent)
assertThat(createdQuery.hasLookupQuery()).isFalse();
assertThat(createdQuery.getLookupQuery()).isEqualTo("");
- assertThat(createdQuery.hasBodyBasedUrlQueryParameters()).isTrue();
+ assertThat(createdQuery.hasBodyBasedUrlQueryParameters()).isFalse();
assertThat(createdQuery.hasPathBasedUrlParameters()).isTrue();
}
}