This is an automated email from the ASF dual-hosted git repository.
fcsaky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-http.git
The following commit(s) were added to refs/heads/main by this push:
new 234951a  [FLINK-38781] Pass format options to response using JSON format
234951a is described below
commit 234951a1835fc7bd1195a36486fac005110ae9b4
Author: David Radley <[email protected]>
AuthorDate: Thu Dec 11 15:57:12 2025 +0000
[FLINK-38781] Pass format options to response using JSON format
---
docs/content.zh/docs/connectors/table/http.md | 23 +++++++-
docs/content/docs/connectors/table/http.md | 24 +++++++-
.../table/lookup/HttpLookupTableSourceFactory.java | 16 ++++--
.../http/table/lookup/HttpTableLookupFunction.java | 15 +++--
.../lookup/HttpLookupTableSourceITCaseTest.java | 65 +++++++++++++++++-----
5 files changed, 118 insertions(+), 25 deletions(-)
diff --git a/docs/content.zh/docs/connectors/table/http.md b/docs/content.zh/docs/connectors/table/http.md
index 4c6b94d..60a9fef 100644
--- a/docs/content.zh/docs/connectors/table/http.md
+++ b/docs/content.zh/docs/connectors/table/http.md
@@ -82,7 +82,7 @@ The Flink connector does have some changes that you need to be aware of if you a
* Existing java applications will need to be recompiled to pick up the new flink package names.
* Existing application and SQL need to be amended to use the new connector option names. The new option names do not have
-the _com.getindata.http_ prefix, the prefix is now _http_.
+* the _com.getindata.http_ prefix, the prefix is now _http_.
* The name of the connector and the identifiers of components that are discovered have been changed, so that the GetInData jar file can co-exist
* with this connector's jar file. Be aware that if you have created custom pluggable components; you will need to recompile against
* this connector.
@@ -241,6 +241,27 @@ POST, PUT and GET operations. This query creator allows you to issue json reques
your own custom http connector. The mappings from columns to the json request are supplied in the query creator configuration
parameters `http.request.query-param-fields`, `http.request.body-fields` and `http.request.url-map`.
+### Format considerations
+
+#### For HTTP requests
+In order to use a custom format, users have to specify the option `'lookup-request.format' = '{customFormatName}'`, where `{customFormatName}` is the identifier of the custom format factory.
+Additionally, it is possible to pass custom query format options from the table's DDL.
+This can be done by `'lookup-request.format.{customFormatName}.{customFormatProperty}' = '{propertyValue}'`, where `{customFormatProperty}` is the name of a custom
+property and `{propertyValue}` is the property value.
+For example:
+`'lookup-request.format.myCustomFormatName.foo' = 'baa'`.
+
+With the default configuration, the Flink JSON format is used for `GenericGetQueryCreator`; all options defined in [json-format](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/json/) can be passed through the table DDL.
+For example:
+`'lookup-request.format.json.fail-on-missing-field' = 'true'`.
+
+#### For HTTP responses
+Specify your format options at the top level. For example:
+```roomsql
+ 'format' = 'json',
+ 'json.ignore-parse-errors' = 'true',
+```
+
### http-generic-json-url Query Creator
The default Query Creator is called _http-generic-json-url_. For body based queries such as POST/PUT requests, the
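For illustration only, the request-side format options documented in the "Format considerations" section above could be combined in a lookup table DDL roughly like the sketch below. The table name, columns, and the omitted endpoint option are hypothetical placeholders; only the format-related options come from the documentation.

```roomsql
-- Minimal sketch of a lookup table forwarding request format options through the DDL.
-- Table name, columns and the omitted endpoint option are hypothetical placeholders.
CREATE TABLE Customers (
    id STRING,
    name STRING
) WITH (
    'connector' = 'http',
    -- ... endpoint and other required connector options omitted here ...
    'format' = 'json',
    -- use the JSON format for the lookup request and forward one of its options:
    'lookup-request.format' = 'json',
    'lookup-request.format.json.fail-on-missing-field' = 'true'
);
```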
diff --git a/docs/content/docs/connectors/table/http.md b/docs/content/docs/connectors/table/http.md
index de70e5e..60a9fef 100644
--- a/docs/content/docs/connectors/table/http.md
+++ b/docs/content/docs/connectors/table/http.md
@@ -82,9 +82,10 @@ The Flink connector does have some changes that you need to be aware of if you a
* Existing java applications will need to be recompiled to pick up the new flink package names.
* Existing application and SQL need to be amended to use the new connector option names. The new option names do not have
-the _com.getindata.http_ prefix, the prefix is now _http_.
+* the _com.getindata.http_ prefix, the prefix is now _http_.
* The name of the connector and the identifiers of components that are discovered have been changed, so that the GetInData jar file can co-exist
* with this connector's jar file. Be aware that if you have created custom pluggable components; you will need to recompile against
+* this connector.
## Working with HTTP lookup source tables
@@ -240,6 +241,27 @@ POST, PUT and GET operations. This query creator allows you to issue json reques
your own custom http connector. The mappings from columns to the json request are supplied in the query creator configuration
parameters `http.request.query-param-fields`, `http.request.body-fields` and `http.request.url-map`.
+### Format considerations
+
+#### For HTTP requests
+In order to use a custom format, users have to specify the option `'lookup-request.format' = '{customFormatName}'`, where `{customFormatName}` is the identifier of the custom format factory.
+Additionally, it is possible to pass custom query format options from the table's DDL.
+This can be done by `'lookup-request.format.{customFormatName}.{customFormatProperty}' = '{propertyValue}'`, where `{customFormatProperty}` is the name of a custom
+property and `{propertyValue}` is the property value.
+For example:
+`'lookup-request.format.myCustomFormatName.foo' = 'baa'`.
+
+With the default configuration, the Flink JSON format is used for `GenericGetQueryCreator`; all options defined in [json-format](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/json/) can be passed through the table DDL.
+For example:
+`'lookup-request.format.json.fail-on-missing-field' = 'true'`.
+
+#### For HTTP responses
+Specify your format options at the top level. For example:
+```roomsql
+ 'format' = 'json',
+ 'json.ignore-parse-errors' = 'true',
+```
+
### http-generic-json-url Query Creator
The default Query Creator is called _http-generic-json-url_. For body based queries such as POST/PUT requests, the
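As a sketch of the response-side configuration described in the "For HTTP responses" subsection above, the top-level format options could be declared as shown below. The table name, columns, and the omitted endpoint option are hypothetical placeholders.

```roomsql
-- Minimal sketch: top-level format options control how the HTTP response is decoded.
-- Table name, columns and the omitted endpoint option are hypothetical placeholders.
CREATE TABLE Enriched (
    id STRING,
    uuid STRING
) WITH (
    'connector' = 'http',
    -- ... endpoint and other required connector options omitted here ...
    'format' = 'json',
    'json.ignore-parse-errors' = 'true'
);
```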
diff --git a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/HttpLookupTableSourceFactory.java b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/HttpLookupTableSourceFactory.java
index 934c570..3a1e0e7 100644
--- a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/HttpLookupTableSourceFactory.java
+++ b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/HttpLookupTableSourceFactory.java
@@ -97,18 +97,24 @@ public class HttpLookupTableSourceFactory implements DynamicTableSourceFactory {
                FactoryUtil.createTableFactoryHelper(this, dynamicTableContext);
ReadableConfig readable = helper.getOptions();
+
+ DecodingFormat<DeserializationSchema<RowData>> decodingFormat =
+ helper.discoverDecodingFormat(
+                        DeserializationFormatFactory.class, FactoryUtil.FORMAT);
+
+ // Validate connector options, excluding:
+ // - "table.*" (Flink execution config options)
+ // - "lookup-request.*" (dynamic lookup-specific properties)
+ // - "gid.connector.http.*" (dynamic connector-specific properties)
+ // - LOOKUP_REQUEST_FORMAT (custom lookup format option)
+        // Format options are already validated by discoverDecodingFormat() above
helper.validateExcept(
-                // properties coming from org.apache.flink.table.api.config.ExecutionConfigOptions
"table.",
"lookup-request.",
HttpConnectorConfigConstants.FLINK_CONNECTOR_HTTP,
LOOKUP_REQUEST_FORMAT.key());
validateHttpLookupSourceOptions(readable);
- DecodingFormat<DeserializationSchema<RowData>> decodingFormat =
- helper.discoverDecodingFormat(
-                        DeserializationFormatFactory.class, FactoryUtil.FORMAT);
-
HttpLookupConfig lookupConfig =
getHttpLookupOptions(dynamicTableContext, readable);
ResolvedSchema resolvedSchema =
dynamicTableContext.getCatalogTable().getResolvedSchema();
diff --git a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/HttpTableLookupFunction.java b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/HttpTableLookupFunction.java
index 750e883..3b73544 100644
--- a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/HttpTableLookupFunction.java
+++ b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/HttpTableLookupFunction.java
@@ -120,11 +120,16 @@ public class HttpTableLookupFunction extends LookupFunction {
}
}
        // if we did not get the physical arity from the http response physical row then get it from
- // the producedDataType, which is set when we have metadata
- if (physicalArity == -1 && producedDataType != null) {
- List<LogicalType> childrenLogicalTypes =
- producedDataType.getLogicalType().getChildren();
- physicalArity = childrenLogicalTypes.size() - metadataArity;
+        // the producedDataType, which is set when we have metadata or when there's no data
+ if (physicalArity == -1) {
+ if (producedDataType == null) {
+                // If producedDataType is null and we have no data, return an empty list, the same way as when parse errors are ignored.
+ return Collections.emptyList();
+ } else {
+ List<LogicalType> childrenLogicalTypes =
+ producedDataType.getLogicalType().getChildren();
+ physicalArity = childrenLogicalTypes.size() - metadataArity;
+ }
}
// if there was no data, create an empty producedRow
if (producedRow == null) {
diff --git a/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/HttpLookupTableSourceITCaseTest.java b/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/HttpLookupTableSourceITCaseTest.java
index 1ffbf63..b126968 100644
--- a/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/HttpLookupTableSourceITCaseTest.java
+++ b/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/HttpLookupTableSourceITCaseTest.java
@@ -1050,7 +1050,11 @@ class HttpLookupTableSourceITCaseTest {
if (spec.badStatus) {
assertEnrichedRowsNoDataBadStatus(rows);
} else if (spec.deserError) {
- assertEnrichedRowsDeserException(rows);
+ if (spec.ignoreParseErrors) {
+ assertEnrichedRowsNoDataGoodStatus(rows);
+ } else {
+ assertEnrichedRowsDeserException(rows);
+ }
} else if (spec.connectionError) {
assertEnrichedRowsException(rows);
} else if (spec.useMetadata) {
@@ -1123,6 +1127,35 @@ class HttpLookupTableSourceITCaseTest {
});
}
+    private void assertEnrichedRowsNoDataGoodStatus(Collection<Row> collectedRows) {
+
+ final int rowArity = 10;
+ // validate every row and its column.
+
+ assertAll(
+ () -> {
+ assertThat(collectedRows.size()).isEqualTo(4);
+ int intElement = 0;
+ for (Row row : collectedRows) {
+ intElement++;
+ assertThat(row.getArity()).isEqualTo(rowArity);
+                        // "id" and "id2" columns should be different for every row.
+                        assertThat(row.getField("id")).isEqualTo(String.valueOf(intElement));
+                        assertThat(row.getField("id2")).isEqualTo(String.valueOf(intElement + 1));
+ assertThat(row.getField("uuid")).isNull();
+ assertThat(row.getField("isActive")).isNull();
+ assertThat(row.getField("balance")).isNull();
+ // metadata
+ assertThat(row.getField("errStr")).isNull();
+ assertThat(row.getField("headers")).isNotNull();
+ assertThat(row.getField("statusCode")).isEqualTo(200);
+ assertEquals(
+ row.getField("completionState"),
+ HttpCompletionState.SUCCESS.name());
+ }
+ });
+ }
+
    private void assertEnrichedRowsDeserException(Collection<Row> collectedRows) {
final int rowArity = 10;
@@ -1393,17 +1426,20 @@ class HttpLookupTableSourceITCaseTest {
for (String method : Arrays.asList("GET", "POST", "PUT")) {
for (boolean asyncFlag : Arrays.asList(false, true)) {
for (boolean continueOnError : Arrays.asList(false, true)) {
- specs.add(
- TestSpec.builder()
- .testName(
-                                    "HTTP Lookup Join With Metadata Deserialization Error")
- .methodName(method)
- .useMetadata(true)
- .maxRows(4)
- .useAsync(asyncFlag)
- .deserError(true)
- .continueOnError(continueOnError)
- .build());
+                    for (boolean ignoreParseErrors : Arrays.asList(false, true)) {
+ specs.add(
+ TestSpec.builder()
+ .testName(
+                                                "HTTP Lookup Join With Metadata Deserialization Error")
+ .methodName(method)
+ .useMetadata(true)
+ .maxRows(4)
+ .useAsync(asyncFlag)
+ .deserError(true)
+ .ignoreParseErrors(ignoreParseErrors)
+ .continueOnError(continueOnError)
+ .build());
+ }
}
}
}
@@ -1446,6 +1482,7 @@ class HttpLookupTableSourceITCaseTest {
final int maxRows;
final boolean useAsync;
final boolean continueOnError;
+ final boolean ignoreParseErrors;
@Override
public String toString() {
@@ -1529,7 +1566,9 @@ class HttpLookupTableSourceITCaseTest {
}
sql.append(") WITH (").append("'format' =
'json',").append("'connector' = 'http',");
-
+ if (spec.ignoreParseErrors) {
+ sql.append("'json.ignore-parse-errors' = 'true',");
+ }
if (!StringUtils.isNullOrWhitespaceOnly(spec.methodName)) {
sql.append("'lookup-method' =
'").append(spec.methodName).append("',");
}