This is an automated email from the ASF dual-hosted git repository.

fcsaky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-http.git


The following commit(s) were added to refs/heads/main by this push:
     new 234951a  [FLINK-38781] Pass format options to response using JSON 
format
234951a is described below

commit 234951a1835fc7bd1195a36486fac005110ae9b4
Author: David Radley <[email protected]>
AuthorDate: Thu Dec 11 15:57:12 2025 +0000

    [FLINK-38781] Pass format options to response using JSON format
---
 docs/content.zh/docs/connectors/table/http.md      | 23 +++++++-
 docs/content/docs/connectors/table/http.md         | 24 +++++++-
 .../table/lookup/HttpLookupTableSourceFactory.java | 16 ++++--
 .../http/table/lookup/HttpTableLookupFunction.java | 15 +++--
 .../lookup/HttpLookupTableSourceITCaseTest.java    | 65 +++++++++++++++++-----
 5 files changed, 118 insertions(+), 25 deletions(-)

diff --git a/docs/content.zh/docs/connectors/table/http.md 
b/docs/content.zh/docs/connectors/table/http.md
index 4c6b94d..60a9fef 100644
--- a/docs/content.zh/docs/connectors/table/http.md
+++ b/docs/content.zh/docs/connectors/table/http.md
@@ -82,7 +82,7 @@ The Flink connector does have some changes that you need to 
be aware of if you a
 
 * Existing java applications will need to be recompiled to pick up the new 
flink package names.
 * Existing application and SQL need to be amended to use the new connector 
option names. The new option names do not have
-the _com.getindata.http_ prefix, the prefix is now _http_.
+* the _com.getindata.http_ prefix, the prefix is now _http_.
 * The name of the connector and the identifiers of components that are 
discovered have been changed, so that the GetInData jar file can co-exist
 * with this connector's jar file. Be aware that if you have created custom 
pluggable components; you will need to recompile against
 * this connector.
@@ -241,6 +241,27 @@ POST, PUT and GET operations. This query creator allows 
you to issue json reques
 your own custom http connector. The mappings from columns to the json request 
are supplied in the query creator configuration
 parameters `http.request.query-param-fields`, `http.request.body-fields` and 
`http.request.url-map`.
 
+### Format considerations
+
+#### For HTTP requests
+In order to use a custom format, users have to specify the option 
`'lookup-request.format' = '{customFormatName}'`, where `{customFormatName}` is 
the identifier of the custom format factory.
+Additionally, it is possible to pass custom query format options from table's 
DDL.
+This can be done by: 
`'lookup-request.format.{customFormatName}.{customFormatProperty}' = 
'{propertyValue}'`, where {customFormatProperty} is the name of a custom
+property and {propertyValue} is the property value.
+For example:
+`'lookup-request.format.myCustomFormatName.foo' = 'baa'`.
+
+With the default configuration, flink-Json format is used for 
`GenericGetQueryCreator`; all options defined in 
[json-format](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/json/)
 can be passed through the table DDL.
+For example:
+`'lookup-request.format.json.fail-on-missing-field' = 'true'`.
+
+#### For HTTP responses
+Specify your format options at the top level. For example:
+```roomsql
+       'format' = 'json',
+       'json.ignore-parse-errors' = 'true',
+```
+
 ### http-generic-json-url Query Creator
 
 The default Query Creator is called _http-generic-json-url_.  For body based 
queries such as POST/PUT requests, the
diff --git a/docs/content/docs/connectors/table/http.md 
b/docs/content/docs/connectors/table/http.md
index de70e5e..60a9fef 100644
--- a/docs/content/docs/connectors/table/http.md
+++ b/docs/content/docs/connectors/table/http.md
@@ -82,9 +82,10 @@ The Flink connector does have some changes that you need to 
be aware of if you a
 
 * Existing java applications will need to be recompiled to pick up the new 
flink package names.
 * Existing application and SQL need to be amended to use the new connector 
option names. The new option names do not have
-the _com.getindata.http_ prefix, the prefix is now _http_.
+* the _com.getindata.http_ prefix, the prefix is now _http_.
 * The name of the connector and the identifiers of components that are 
discovered have been changed, so that the GetInData jar file can co-exist
 * with this connector's jar file. Be aware that if you have created custom 
pluggable components; you will need to recompile against
+* this connector.
 
 ## Working with HTTP lookup source tables
 
@@ -240,6 +241,27 @@ POST, PUT and GET operations. This query creator allows 
you to issue json reques
 your own custom http connector. The mappings from columns to the json request 
are supplied in the query creator configuration
 parameters `http.request.query-param-fields`, `http.request.body-fields` and 
`http.request.url-map`.
 
+### Format considerations
+
+#### For HTTP requests
+In order to use a custom format, users have to specify the option 
`'lookup-request.format' = '{customFormatName}'`, where `{customFormatName}` is 
the identifier of the custom format factory.
+Additionally, it is possible to pass custom query format options from table's 
DDL.
+This can be done by: 
`'lookup-request.format.{customFormatName}.{customFormatProperty}' = 
'{propertyValue}'`, where {customFormatProperty} is the name of a custom
+property and {propertyValue} is the property value.
+For example:
+`'lookup-request.format.myCustomFormatName.foo' = 'baa'`.
+
+With the default configuration, flink-Json format is used for 
`GenericGetQueryCreator`; all options defined in 
[json-format](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/json/)
 can be passed through the table DDL.
+For example:
+`'lookup-request.format.json.fail-on-missing-field' = 'true'`.
+
+#### For HTTP responses
+Specify your format options at the top level. For example:
+```roomsql
+       'format' = 'json',
+       'json.ignore-parse-errors' = 'true',
+```
+
 ### http-generic-json-url Query Creator
 
 The default Query Creator is called _http-generic-json-url_.  For body based 
queries such as POST/PUT requests, the
diff --git 
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/HttpLookupTableSourceFactory.java
 
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/HttpLookupTableSourceFactory.java
index 934c570..3a1e0e7 100644
--- 
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/HttpLookupTableSourceFactory.java
+++ 
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/HttpLookupTableSourceFactory.java
@@ -97,18 +97,24 @@ public class HttpLookupTableSourceFactory implements 
DynamicTableSourceFactory {
                 FactoryUtil.createTableFactoryHelper(this, 
dynamicTableContext);
 
         ReadableConfig readable = helper.getOptions();
+
+        DecodingFormat<DeserializationSchema<RowData>> decodingFormat =
+                helper.discoverDecodingFormat(
+                        DeserializationFormatFactory.class, 
FactoryUtil.FORMAT);
+
+        // Validate connector options, excluding:
+        // - "table.*" (Flink execution config options)
+        // - "lookup-request.*" (dynamic lookup-specific properties)
+        // - "gid.connector.http.*" (dynamic connector-specific properties)
+        // - LOOKUP_REQUEST_FORMAT (custom lookup format option)
+        // Format options are already validated by discoverDecodingFormat() 
above
         helper.validateExcept(
-                // properties coming from 
org.apache.flink.table.api.config.ExecutionConfigOptions
                 "table.",
                 "lookup-request.",
                 HttpConnectorConfigConstants.FLINK_CONNECTOR_HTTP,
                 LOOKUP_REQUEST_FORMAT.key());
         validateHttpLookupSourceOptions(readable);
 
-        DecodingFormat<DeserializationSchema<RowData>> decodingFormat =
-                helper.discoverDecodingFormat(
-                        DeserializationFormatFactory.class, 
FactoryUtil.FORMAT);
-
         HttpLookupConfig lookupConfig = 
getHttpLookupOptions(dynamicTableContext, readable);
 
         ResolvedSchema resolvedSchema = 
dynamicTableContext.getCatalogTable().getResolvedSchema();
diff --git 
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/HttpTableLookupFunction.java
 
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/HttpTableLookupFunction.java
index 750e883..3b73544 100644
--- 
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/HttpTableLookupFunction.java
+++ 
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/HttpTableLookupFunction.java
@@ -120,11 +120,16 @@ public class HttpTableLookupFunction extends 
LookupFunction {
             }
         }
         // if we did not get the physical arity from the http response 
physical row then get it from
-        // the producedDataType, which is set when we have metadata
-        if (physicalArity == -1 && producedDataType != null) {
-            List<LogicalType> childrenLogicalTypes =
-                    producedDataType.getLogicalType().getChildren();
-            physicalArity = childrenLogicalTypes.size() - metadataArity;
+        // the producedDataType. which is set when we have metadata or when 
there's no data
+        if (physicalArity == -1) {
+            if (producedDataType == null) {
+                // If producedDataType is null and we have no data, return the 
same way as ignore.
+                return Collections.emptyList();
+            } else {
+                List<LogicalType> childrenLogicalTypes =
+                        producedDataType.getLogicalType().getChildren();
+                physicalArity = childrenLogicalTypes.size() - metadataArity;
+            }
         }
         // if there was no data, create an empty producedRow
         if (producedRow == null) {
diff --git 
a/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/HttpLookupTableSourceITCaseTest.java
 
b/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/HttpLookupTableSourceITCaseTest.java
index 1ffbf63..b126968 100644
--- 
a/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/HttpLookupTableSourceITCaseTest.java
+++ 
b/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/HttpLookupTableSourceITCaseTest.java
@@ -1050,7 +1050,11 @@ class HttpLookupTableSourceITCaseTest {
         if (spec.badStatus) {
             assertEnrichedRowsNoDataBadStatus(rows);
         } else if (spec.deserError) {
-            assertEnrichedRowsDeserException(rows);
+            if (spec.ignoreParseErrors) {
+                assertEnrichedRowsNoDataGoodStatus(rows);
+            } else {
+                assertEnrichedRowsDeserException(rows);
+            }
         } else if (spec.connectionError) {
             assertEnrichedRowsException(rows);
         } else if (spec.useMetadata) {
@@ -1123,6 +1127,35 @@ class HttpLookupTableSourceITCaseTest {
                 });
     }
 
+    private void assertEnrichedRowsNoDataGoodStatus(Collection<Row> 
collectedRows) {
+
+        final int rowArity = 10;
+        // validate every row and its column.
+
+        assertAll(
+                () -> {
+                    assertThat(collectedRows.size()).isEqualTo(4);
+                    int intElement = 0;
+                    for (Row row : collectedRows) {
+                        intElement++;
+                        assertThat(row.getArity()).isEqualTo(rowArity);
+                        // "id" and "id2" columns should be different for 
every row.
+                        
assertThat(row.getField("id")).isEqualTo(String.valueOf(intElement));
+                        
assertThat(row.getField("id2")).isEqualTo(String.valueOf(intElement + 1));
+                        assertThat(row.getField("uuid")).isNull();
+                        assertThat(row.getField("isActive")).isNull();
+                        assertThat(row.getField("balance")).isNull();
+                        // metadata
+                        assertThat(row.getField("errStr")).isNull();
+                        assertThat(row.getField("headers")).isNotNull();
+                        assertThat(row.getField("statusCode")).isEqualTo(200);
+                        assertEquals(
+                                row.getField("completionState"),
+                                HttpCompletionState.SUCCESS.name());
+                    }
+                });
+    }
+
     private void assertEnrichedRowsDeserException(Collection<Row> 
collectedRows) {
 
         final int rowArity = 10;
@@ -1393,17 +1426,20 @@ class HttpLookupTableSourceITCaseTest {
         for (String method : Arrays.asList("GET", "POST", "PUT")) {
             for (boolean asyncFlag : Arrays.asList(false, true)) {
                 for (boolean continueOnError : Arrays.asList(false, true)) {
-                    specs.add(
-                            TestSpec.builder()
-                                    .testName(
-                                            "HTTP Lookup Join With Metadata 
Deserialization Error")
-                                    .methodName(method)
-                                    .useMetadata(true)
-                                    .maxRows(4)
-                                    .useAsync(asyncFlag)
-                                    .deserError(true)
-                                    .continueOnError(continueOnError)
-                                    .build());
+                    for (boolean ignoreParseErrors : Arrays.asList(false, 
true)) {
+                        specs.add(
+                                TestSpec.builder()
+                                        .testName(
+                                                "HTTP Lookup Join With 
Metadata Deserialization Error")
+                                        .methodName(method)
+                                        .useMetadata(true)
+                                        .maxRows(4)
+                                        .useAsync(asyncFlag)
+                                        .deserError(true)
+                                        .ignoreParseErrors(ignoreParseErrors)
+                                        .continueOnError(continueOnError)
+                                        .build());
+                    }
                 }
             }
         }
@@ -1446,6 +1482,7 @@ class HttpLookupTableSourceITCaseTest {
         final int maxRows;
         final boolean useAsync;
         final boolean continueOnError;
+        final boolean ignoreParseErrors;
 
         @Override
         public String toString() {
@@ -1529,7 +1566,9 @@ class HttpLookupTableSourceITCaseTest {
         }
 
         sql.append(") WITH (").append("'format' = 
'json',").append("'connector' = 'http',");
-
+        if (spec.ignoreParseErrors) {
+            sql.append("'json.ignore-parse-errors' = 'true',");
+        }
         if (!StringUtils.isNullOrWhitespaceOnly(spec.methodName)) {
             sql.append("'lookup-method' = 
'").append(spec.methodName).append("',");
         }

Reply via email to