This is an automated email from the ASF dual-hosted git repository.

ferenc-csaky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-http.git


The following commit(s) were added to refs/heads/main by this push:
     new 04b06e9  [FLINK-39689] Make template placeholders consistent
04b06e9 is described below

commit 04b06e9cb034af50ab7edd5bd9dc0319084a7492
Author: David Radley <[email protected]>
AuthorDate: Mon May 18 16:40:35 2026 +0100

    [FLINK-39689] Make template placeholders consistent
---
 docs/content.zh/docs/connectors/table/http.md      | 16 +++++------
 docs/content/docs/connectors/table/http.md         | 28 +++++++++----------
 .../http/config/HttpConnectorConfigConstants.java  |  6 +++++
 .../http/table/lookup/RequestFactoryBase.java      |  5 +++-
 .../GenericJsonAndUrlQueryCreator.java             | 31 ++++++++++++----------
 .../GenericJsonAndUrlQueryCreatorFactory.java      |  2 +-
 .../table/lookup/BodyBasedRequestFactoryTest.java  | 18 ++++++-------
 .../lookup/HttpLookupTableSourceITCaseTest.java    |  4 +--
 .../http/table/lookup/LookupQueryInfoTest.java     |  2 +-
 .../GenericJsonAndUrlQueryCreatorTest.java         | 25 +++++++++++++++++
 10 files changed, 87 insertions(+), 50 deletions(-)

diff --git a/docs/content.zh/docs/connectors/table/http.md 
b/docs/content.zh/docs/connectors/table/http.md
index 2713343..13d7a54 100644
--- a/docs/content.zh/docs/connectors/table/http.md
+++ b/docs/content.zh/docs/connectors/table/http.md
@@ -54,7 +54,7 @@ The HTTP source connector supports [Lookup 
Joins](https://nightlies.apache.org/f
     * [Timeouts](#timeouts)
     * [Source table HTTP status code](#source-table-http-status-code)
     * [Retries and handling errors (Lookup 
source)](#retries-and-handling-errors-lookup-source)
-      * [Retry strategy](#retry-strategy)
+        * [Retry strategy](#retry-strategy)
       * [Lookup multiple results](#lookup-multiple-results)
   * [Working with HTTP sink tables](#working-with-http-sink-tables)
     * [HTTP Sink](#http-sink)
@@ -71,7 +71,7 @@ The HTTP source connector supports [Lookup 
Joins](https://nightlies.apache.org/f
     * [Basic Authentication](#basic-authentication)
     * [OIDC Bearer Authentication](#oidc-bearer-authentication)
   * [Logging the HTTP content](#logging-the-http-content)
-    * [Restrictions at this time](#restrictions-at-this-time)
+      * [Restrictions at this time](#restrictions-at-this-time)
 <!-- TOC -->
 ## Dependencies
 
@@ -87,9 +87,9 @@ The Flink connector does have some changes that you need to 
be aware of if you a
 
 * Existing java applications will need to be recompiled to pick up the new 
flink package names.
 * Existing application and SQL need to be amended to use the new connector 
option names. The new option names do not have
-  the _com.getindata.http_ prefix, the prefix is now _http_.
+the _com.getindata.http_ prefix, the prefix is now _http_.
 * The name of the connector and the identifiers of components that are 
discovered have been changed, so that the GetInData jar file can co-exist
-  with this connector's jar file. Be aware that if you have created custom 
pluggable components; you will need to recompile against this connector.
+with this connector's jar file. Be aware that if you have created custom 
pluggable components; you will need to recompile against this connector.
 * Note that the `http-generic-json-url` query creator now processes HTTP 
bodies differently using `http.request.body-template`.
 * Note that if you were incorrectly using 
`gid.connector.http.request.query-param-fields` with POST or PUT did not give 
an error. This connector corrects the behaviour so specifying 
`http.request.query-param-fields` with POST or PUT does give an error.
 * The GetInData HTTP connector was built against Flink version 1, so works 
with that level of Flink and also Flink version 2. This connector is built 
against and supports Flink 2.2.
@@ -142,7 +142,7 @@ CREATE TABLE Orders (
 Then we can enrich the _Orders_ table with the _Customers_ HTTP table with the 
following SQL:
 
 ```roomsql
-SELECT o.id, o.id2, c.msg, c.uuid, c.details.isActive, 
c.details.nestedDetails.balance FROM Orders AS o 
+SELECT o.id, o.id2, c.msg, c.uuid, c.details.isActive, 
c.details.nestedDetails.balance FROM Orders AS o
 JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c ON o.id = c.id AND o.id2 
= c.id2
 ```
 
@@ -249,8 +249,8 @@ parameters `http.request.body-template` and 
`http.request.url-map`.
 #### Mapping the URL
 
 The `http.request.url-map` option provides a flexible way to map table columns 
to parts of the URL, either URL segments or HTTP query parameters.
-Parses a string as a map of strings. For example if there are table columns 
called `customerId` and `orderId`,
-then specifying value `customerId:cid,orderID:oid` and a url of 
https://myendpoint/customers/{cid}?orders={oid} will mean that the url used for 
the
+ Parses a string as a map of strings. For example if there are table columns 
called `customerId` and `orderId`,
+then specifying value `customerId:cid,orderID:oid` and a url of 
https://myendpoint/customers/{{cid}}?orders={{oid}} will mean that the url used 
for the
 lookup query will dynamically pickup the values for `customerId`, `orderId` 
and use them in the url e.g. https://myendpoint/customers/cid1?orders=oid1.
 The expected format of the map is: `key1:value1,key2:value2`.
 
@@ -281,7 +281,7 @@ CREATE TABLE CustomerLookup (
 ) WITH (
     'connector' = 'http',
     'format' = 'json',
-    'url' = 
'http://api.example.com/lookup?customer={qp_customer}&order={qp_order}',
+    'url' = 
'http://api.example.com/lookup?customer={{qp_customer}}&order={{qp_order}}',
     'lookup-method' = 'GET',
     'http.request.url-map' = 'qp_customer:qp_customer,qp_order:qp_order'
 )
diff --git a/docs/content/docs/connectors/table/http.md 
b/docs/content/docs/connectors/table/http.md
index 95089cb..13d7a54 100644
--- a/docs/content/docs/connectors/table/http.md
+++ b/docs/content/docs/connectors/table/http.md
@@ -90,9 +90,9 @@ The Flink connector does have some changes that you need to 
be aware of if you a
 the _com.getindata.http_ prefix, the prefix is now _http_.
 * The name of the connector and the identifiers of components that are 
discovered have been changed, so that the GetInData jar file can co-exist
 with this connector's jar file. Be aware that if you have created custom 
pluggable components; you will need to recompile against this connector.
-* Note that the `http-generic-json-url` query creator now processes HTTP 
bodies differently using `http.request.body-template`.                
-* Note that if you were incorrectly using 
`gid.connector.http.request.query-param-fields` with POST or PUT did not give 
an error. This connector corrects the behaviour so specifying 
`http.request.query-param-fields` with POST or PUT does give an error. 
-* The GetInData HTTP connector was built against Flink version 1, so works 
with that level of Flink and also Flink version 2. This connector is built 
against and supports Flink 2.2. 
+* Note that the `http-generic-json-url` query creator now processes HTTP 
bodies differently using `http.request.body-template`.
+* Note that if you were incorrectly using 
`gid.connector.http.request.query-param-fields` with POST or PUT did not give 
an error. This connector corrects the behaviour so specifying 
`http.request.query-param-fields` with POST or PUT does give an error.
+* The GetInData HTTP connector was built against Flink version 1, so works 
with that level of Flink and also Flink version 2. This connector is built 
against and supports Flink 2.2.
 
 ## Working with HTTP lookup source tables
 
@@ -142,7 +142,7 @@ CREATE TABLE Orders (
 Then we can enrich the _Orders_ table with the _Customers_ HTTP table with the 
following SQL:
 
 ```roomsql
-SELECT o.id, o.id2, c.msg, c.uuid, c.details.isActive, 
c.details.nestedDetails.balance FROM Orders AS o 
+SELECT o.id, o.id2, c.msg, c.uuid, c.details.isActive, 
c.details.nestedDetails.balance FROM Orders AS o
 JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c ON o.id = c.id AND o.id2 
= c.id2
 ```
 
@@ -249,14 +249,14 @@ parameters `http.request.body-template` and 
`http.request.url-map`.
 #### Mapping the URL
 
 The `http.request.url-map` option provides a flexible way to map table columns 
to parts of the URL, either URL segments or HTTP query parameters.
- Parses a string as a map of strings. For example if there are table columns 
called `customerId` and `orderId`, 
-then specifying value `customerId:cid,orderID:oid` and a url of 
https://myendpoint/customers/{cid}?orders={oid} will mean that the url used for 
the
-lookup query will dynamically pickup the values for `customerId`, `orderId` 
and use them in the url e.g. https://myendpoint/customers/cid1?orders=oid1. 
-The expected format of the map is: `key1:value1,key2:value2`. 
+ Parses a string as a map of strings. For example if there are table columns 
called `customerId` and `orderId`,
+then specifying value `customerId:cid,orderID:oid` and a url of 
https://myendpoint/customers/{{cid}}?orders={{oid}} will mean that the url used 
for the
+lookup query will dynamically pickup the values for `customerId`, `orderId` 
and use them in the url e.g. https://myendpoint/customers/cid1?orders=oid1.
+The expected format of the map is: `key1:value1,key2:value2`.
 
 As these values are being supplied as URL segments or part or query 
parameters, the connector url encodes that content so characters like spaces
 do not appear invalidly in the URL. In the case where the complete url is the 
insert then url encoding is not performed; the url needs to be valid and already
-properly url encoded as appropriate.   
+properly url encoded as appropriate.
 
 **Example Scenario around clashing request and response columns:**
 
@@ -281,7 +281,7 @@ CREATE TABLE CustomerLookup (
 ) WITH (
     'connector' = 'http',
     'format' = 'json',
-    'url' = 
'http://api.example.com/lookup?customer={qp_customer}&order={qp_order}',
+    'url' = 
'http://api.example.com/lookup?customer={{qp_customer}}&order={{qp_order}}',
     'lookup-method' = 'GET',
     'http.request.url-map' = 'qp_customer:qp_customer,qp_order:qp_order'
 )
@@ -360,10 +360,10 @@ This query creator allows you to populate API calls very 
flexibly. To do this ef
 8) If you start from an OpenAPI specification that contains nested content 
required as a lookup join key, then use `http.request.body-template` to map 
top-level columns into that structure.
 9) Response content is mapped to matching named top-level columns in the 
lookup table. You should arrange your table columns so that some are request 
columns (all top level) and some are response columns.
 10) Use single quotes for the value of `http.request.body-template` so you do 
not need to escape the double quotes, and add newline characters for 
readability.
-11) If you want to enrich every event with the same API content, you can 
specify a placeholder as the complete URL the `url`, then use 
`http.request.url-map` to map it. In this scenario switching on caching is 
advised to avoid repeated identical API calls. 
+11) If you want to enrich every event with the same API content, you can 
specify a placeholder as the complete URL the `url`, then use 
`http.request.url-map` to map it. In this scenario switching on caching is 
advised to avoid repeated identical API calls.
 12) Note that columns in SQL tables (the DDL) do not have a natural way to 
distinguish between request and response fields. Where possible, use the API 
field name as column names in the DDL; this minimizes the number of columns you 
need to define.
 13) The exception to 12) is when a response API field name is the same as a 
request API field **and** they have incompatible types. In this case, define 
the request column with a different name, then use 
`http.request.query-param-fields-with-key`, `http.request.body-template`, 
and/or `http.request.url-map` to provide the mapping to the API field.
-14) Note the columns representing the response are those that should be used 
for enrichment. 
+14) Note the columns representing the response are those that should be used 
for enrichment.
 
 ### Format considerations
 
@@ -654,8 +654,8 @@ Metadata columns can be specified and hold http 
information. They are optional r
 If the `error-string` metadata column is defined on the table and the call 
succeeds then it will have a null value.
 When the HTTP response cannot be deserialized, then the 
`http-completion-state` will be `UNABLE_TO_DESERIALIZE_RESPONSE`
 and the `error-string` will be the response body.
-When the HTTP status code is in the 
`http.source.lookup.ignored-response-codes`, then the `http-completion-state` 
will 
-be `IGNORE_STATUS_CODE`and no data is returned; any metadata columns contain 
information about the API call that 
+When the HTTP status code is in the 
`http.source.lookup.ignored-response-codes`, then the `http-completion-state` 
will
+be `IGNORE_STATUS_CODE`and no data is returned; any metadata columns contain 
information about the API call that
 occurred.
 
 When a HTTP lookup call fails and populates the metadata columns with the 
error information, the expected enrichment columns from the HTTP call
diff --git 
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/config/HttpConnectorConfigConstants.java
 
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/config/HttpConnectorConfigConstants.java
index 6dfb414..c17e4a8 100644
--- 
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/config/HttpConnectorConfigConstants.java
+++ 
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/config/HttpConnectorConfigConstants.java
@@ -29,6 +29,12 @@ public final class HttpConnectorConfigConstants {
 
     public static final String PROP_DELIM = ",";
 
+    /** Placeholder delimiter for template substitution (start). */
+    public static final String PLACEHOLDER_START = "{{";
+
+    /** Placeholder delimiter for template substitution (end). */
+    public static final String PLACEHOLDER_END = "}}";
+
     /** A property prefix for http connector. */
     public static final String FLINK_CONNECTOR_HTTP = "http.";
 
diff --git 
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/RequestFactoryBase.java
 
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/RequestFactoryBase.java
index 5379736..7d115c6 100644
--- 
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/RequestFactoryBase.java
+++ 
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/RequestFactoryBase.java
@@ -37,6 +37,9 @@ import java.time.Duration;
 import java.util.Arrays;
 import java.util.Map;
 
+import static 
org.apache.flink.connector.http.config.HttpConnectorConfigConstants.PLACEHOLDER_END;
+import static 
org.apache.flink.connector.http.config.HttpConnectorConfigConstants.PLACEHOLDER_START;
+
 /** Base class for {@link HttpRequest} factories. */
 @Slf4j
 public abstract class RequestFactoryBase implements HttpRequestFactory {
@@ -130,7 +133,7 @@ public abstract class RequestFactoryBase implements 
HttpRequestFactory {
         if (lookupQueryInfo.hasPathBasedUrlParameters()) {
             for (Map.Entry<String, String> entry :
                     lookupQueryInfo.getPathBasedUrlParameters().entrySet()) {
-                String pathParam = "{" + entry.getKey() + "}";
+                String pathParam = PLACEHOLDER_START + entry.getKey() + 
PLACEHOLDER_END;
                 int startIndex = resolvedUrl.indexOf(pathParam);
                 if (startIndex == -1) {
                     throw new FlinkRuntimeException(
diff --git 
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/querycreators/GenericJsonAndUrlQueryCreator.java
 
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/querycreators/GenericJsonAndUrlQueryCreator.java
index b273dc5..318ddd9 100644
--- 
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/querycreators/GenericJsonAndUrlQueryCreator.java
+++ 
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/querycreators/GenericJsonAndUrlQueryCreator.java
@@ -21,6 +21,7 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.connector.http.LookupArg;
 import org.apache.flink.connector.http.LookupQueryCreator;
+import org.apache.flink.connector.http.config.HttpConnectorConfigConstants;
 import org.apache.flink.connector.http.table.lookup.LookupQueryInfo;
 import org.apache.flink.connector.http.table.lookup.LookupRow;
 import org.apache.flink.connector.http.utils.SerializationSchemaUtils;
@@ -51,23 +52,23 @@ import java.util.regex.Pattern;
  * Generic JSON and URL query creator; in addition to be able to map columns 
to json requests, it
  * allows url inserts to be mapped to column names using templating. <br>
  * <br>
- * For PUT and POST, parameters are mapped to the json body e.g. for the body 
template "id1;id2" and
- * url of http://base. At lookup time with values of id1=1 and id2=2 as call 
of http/base will be
- * issued with a json payload of {"id1":1,"id2":2} <br>
- * For all http methods, url segments and query parameters can be used to 
include lookup up values.
- * Using the map from 
<code>GenericJsonAndUrlQueryCreator.REQUEST_URL_MAP</code> which has a key of
- * the insert name and the value of the associated column. e.g. for <code>
- * GenericJsonAndUrlQueryCreator.REQUEST_URL_MAP
- * </code> = "key1":"col1" and url of http://base/{key1}. At lookup time with 
values of col1="aaaa"
- * a call of http/base/aaaa will be issued. For query parameters, the query 
param should be supplied
- * in the URL with a place-holder that will be resolved using <code>
- * GenericJsonAndUrlQueryCreator.REQUEST_URL_MAP</code>
+ * For PUT and POST, parameters are mapped to the json body e.g. for 
REQUEST_PARAM_FIELDS =
+ * "id1;id2" and url of http://base. At lookup time with values of id1=1 and 
id2=2 as call of
+ * http/base will be issued with a json payload of {"id1":1,"id2":2} <br>
+ * For all http methods, url segments can be used to include lookup up values. 
Using the map from
+ * <code>GenericJsonAndUrlQueryCreator.REQUEST_URL_MAP</code> which has a key 
of the insert and the
+ * value of the associated column. e.g. for 
<code>GenericJsonAndUrlQueryCreator.REQUEST_URL_MAP
+ * </code> = "key1":"col1" and url of http://base/{{key1}}. At lookup time 
with values of
+ * col1="aaaa" a call of http/base/aaaa will be issued.
  */
 @Slf4j
 public class GenericJsonAndUrlQueryCreator implements LookupQueryCreator {
     private static final long serialVersionUID = 1L;
     private static final Pattern TEMPLATE_PLACEHOLDER_PATTERN =
-            Pattern.compile("\\{\\{([^}]+)\\}\\}");
+            Pattern.compile(
+                    
Pattern.quote(HttpConnectorConfigConstants.PLACEHOLDER_START)
+                            + "([^{}]+)"
+                            + 
Pattern.quote(HttpConnectorConfigConstants.PLACEHOLDER_END));
 
     // not final so we can mutate for unit test
     private SerializationSchema<RowData> serializationSchema;
@@ -164,8 +165,10 @@ public class GenericJsonAndUrlQueryCreator implements 
LookupQueryCreator {
             if (fieldValue == null) {
                 throw new IllegalArgumentException(
                         String.format(
-                                "Template placeholder {{%s}} references a 
field that does not exist in the lookup row",
-                                fieldName));
+                                "Template placeholder %s%s%s references a 
field that does not exist in the lookup row",
+                                HttpConnectorConfigConstants.PLACEHOLDER_START,
+                                fieldName,
+                                HttpConnectorConfigConstants.PLACEHOLDER_END));
             }
 
             String valueStr =
diff --git 
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/querycreators/GenericJsonAndUrlQueryCreatorFactory.java
 
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/querycreators/GenericJsonAndUrlQueryCreatorFactory.java
index d7b08db..bf20e9c 100644
--- 
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/querycreators/GenericJsonAndUrlQueryCreatorFactory.java
+++ 
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/querycreators/GenericJsonAndUrlQueryCreatorFactory.java
@@ -66,7 +66,7 @@ public class GenericJsonAndUrlQueryCreatorFactory implements 
LookupQueryCreatorF
                                     + "<br>"
                                     + "For example if there are table columns 
called customerId"
                                     + " and orderId, then specifying value 
customerId:cid1,orderID:oid"
-                                    + " and a url of 
https://myendpoint/customers/{cid}/orders/{oid}";
+                                    + " and a url of 
https://myendpoint/customers/{{cid}}/orders/{{oid}}";
                                     + " will mean that the url used for the 
lookup query will"
                                     + " dynamically pickup the values for 
customerId, orderId"
                                     + " and use them in the url."
diff --git 
a/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/BodyBasedRequestFactoryTest.java
 
b/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/BodyBasedRequestFactoryTest.java
index 056aff5..5e0d973 100644
--- 
a/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/BodyBasedRequestFactoryTest.java
+++ 
b/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/BodyBasedRequestFactoryTest.java
@@ -125,14 +125,14 @@ public class BodyBasedRequestFactoryTest {
                 new TestSpec(
                         null,
                         Map.of("param1", "value1"),
-                        "http://service/{param1}";,
+                        "http://service/{{param1}}";,
                         lookupMethod,
                         "http://service/value1";),
                 // 2 path param
                 new TestSpec(
                         null,
                         Map.of("param1", "value1", "param2", "value2"),
-                        "http://service/{param1}/param2/{param2}";,
+                        "http://service/{{param1}}/param2/{{param2}}";,
                         lookupMethod,
                         "http://service/value1/param2/value2";),
                 // 1 query param
@@ -160,35 +160,35 @@ public class BodyBasedRequestFactoryTest {
                 new TestSpec(
                         Map.of("param3", "value3", "param4", "value4"),
                         Map.of("param1", "value1", "param2", "value2"),
-                        "http://service/{param1}/param2/{param2}";,
+                        "http://service/{{param1}}/param2/{{param2}}";,
                         lookupMethod,
                         
"http://service/value1/param2/value2?param3=value3&param4=value4";),
                 // URL encoding: path param with spaces
                 new TestSpec(
                         null,
                         Map.of("param1", "hello world"),
-                        "http://service/{param1}";,
+                        "http://service/{{param1}}";,
                         lookupMethod,
                         "http://service/hello+world";),
                 // URL encoding: path param with special characters
                 new TestSpec(
                         null,
                         Map.of("param1", "[email protected]"),
-                        "http://service/{param1}";,
+                        "http://service/{{param1}}";,
                         lookupMethod,
                         "http://service/user%40example.com";),
                 // URL encoding: path param with slash
                 new TestSpec(
                         null,
                         Map.of("param1", "path/to/resource"),
-                        "http://service/{param1}";,
+                        "http://service/{{param1}}";,
                         lookupMethod,
                         "http://service/path%2Fto%2Fresource";),
                 // URL encoding: multiple path params with special characters
                 new TestSpec(
                         null,
                         Map.of("param1", "hello world", "param2", 
"[email protected]"),
-                        "http://service/{param1}/users/{param2}";,
+                        "http://service/{{param1}}/users/{{param2}}";,
                         lookupMethod,
                         "http://service/hello+world/users/user%40example.com";),
                 // URL encoding: query param with special characters (?, &, ;, 
space)
@@ -209,7 +209,7 @@ public class BodyBasedRequestFactoryTest {
                 new TestSpec(
                         Map.of("query1", "value?test"),
                         Map.of("path1", "user@domain"),
-                        "http://service/{path1}";,
+                        "http://service/{{path1}}";,
                         lookupMethod,
                         "http://service/user%40domain?query1=value%3Ftest";),
                 // Complete URL replacement with URL-encoded parts
@@ -218,7 +218,7 @@ public class BodyBasedRequestFactoryTest {
                         Map.of(
                                 "url",
                                 
"https://api.example.com/search?q=hello%20world&filter=type%3Dbook&sort=date%3Adesc";),
-                        "{url}",
+                        "{{url}}",
                         lookupMethod,
                         
"https://api.example.com/search?q=hello%20world&filter=type%3Dbook&sort=date%3Adesc";));
     }
diff --git 
a/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/HttpLookupTableSourceITCaseTest.java
 
b/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/HttpLookupTableSourceITCaseTest.java
index 769303d..5497f1d 100644
--- 
a/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/HttpLookupTableSourceITCaseTest.java
+++ 
b/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/HttpLookupTableSourceITCaseTest.java
@@ -969,7 +969,7 @@ class HttpLookupTableSourceITCaseTest {
                         + "  `email` STRING\n"
                         + ") WITH ("
                         + "'connector' = 'http',"
-                        + "'url' = '{url}',"
+                        + "'url' = '{{url}}',"
                         + "'http.request.url-map' = 'url:url',"
                         + "'format' = 'json',"
                         + "'asyncPolling' = 'false',"
@@ -1235,7 +1235,7 @@ class HttpLookupTableSourceITCaseTest {
                         + "'lookup-method' = 'GET',"
                         + "'url' = 'http://localhost:";
                         + serverPort2
-                        + "/client?customer={customer}&id2={id2}',"
+                        + "/client?customer={{customer}}&id2={{id2}}',"
                         + "'http.source.lookup.header.Content-Type' = 
'application/json',"
                         + "'asyncPolling' = 'true',"
                         + "'http.source.lookup.query-creator' = 
'http-generic-json-url',"
diff --git 
a/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/LookupQueryInfoTest.java
 
b/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/LookupQueryInfoTest.java
index c627746..0a7521f 100644
--- 
a/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/LookupQueryInfoTest.java
+++ 
b/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/LookupQueryInfoTest.java
@@ -68,7 +68,7 @@ class LookupQueryInfoTest {
         Map<String, String> pathBasedUrlPathParameters = Map.of("key1", 
"value1");
 
         lookupQueryInfo =
-                new LookupQueryInfo("http://service/{key1}";, null, 
pathBasedUrlPathParameters);
+                new LookupQueryInfo("http://service/{{key1}}";, null, 
pathBasedUrlPathParameters);
 
         assertThat(lookupQueryInfo.hasLookupQuery()).isTrue();
         assertThat(lookupQueryInfo.hasPathBasedUrlParameters()).isTrue();
diff --git 
a/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/querycreators/GenericJsonAndUrlQueryCreatorTest.java
 
b/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/querycreators/GenericJsonAndUrlQueryCreatorTest.java
index defb4ed..d54efd1 100644
--- 
a/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/querycreators/GenericJsonAndUrlQueryCreatorTest.java
+++ 
b/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/querycreators/GenericJsonAndUrlQueryCreatorTest.java
@@ -363,6 +363,31 @@ class GenericJsonAndUrlQueryCreatorTest {
                 .hasMessageContaining("does not exist");
     }
 
+    @Test
+    public void testBodyTemplateWithInvalidNestedPlaceholders() {
+        // GIVEN - Body template with invalid nested placeholders like 
{{aaa}}/{{bbb}}
+        Configuration config = new Configuration();
+        config.set(LOOKUP_METHOD, "POST");
+        config.set(REQUEST_BODY_TEMPLATE, "{\"path\":\"{{aaa}}/{{bbb}}\"}");
+
+        LookupRow lookupRow = getLookupRow(KEY_1);
+        lookupRow.setLookupPhysicalRowDataType(DATATYPE_1);
+
+        GenericJsonAndUrlQueryCreator creator =
+                (GenericJsonAndUrlQueryCreator)
+                        new GenericJsonAndUrlQueryCreatorFactory()
+                                .createLookupQueryCreator(
+                                        config,
+                                        lookupRow,
+                                        getTableContext(config, 
RESOLVED_SCHEMA));
+
+        // WHEN/THEN - Should throw IllegalArgumentException because 
{{aaa}}/{{bbb}} is not a valid
+        // field name
+        assertThatThrownBy(() -> creator.createLookupQuery(ROWDATA))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("does not exist");
+    }
+
     @Test
     public void testBodyTemplateWithComplexNestedStructureAndTimestamps() 
throws Exception {
         // GIVEN - Complex template with primitives, arrays, nested objects, 
timestamps, and

Reply via email to