This is an automated email from the ASF dual-hosted git repository.

fcsaky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-http.git


The following commit(s) were added to refs/heads/main by this push:
     new fd9e218  [FLINK-38816] Use Flink 2 compatible deserialize in 
`‎JavaNetHttpPollingClient`
fd9e218 is described below

commit fd9e218f1ccb7419cb5083b5de53f3f6124df33c
Author: David Radley <[email protected]>
AuthorDate: Mon Jan 12 12:43:50 2026 +0000

    [FLINK-38816] Use Flink 2 compatible deserialize in 
`‎JavaNetHttpPollingClient`
---
 .gitignore                                         |   1 +
 .../table/lookup/JavaNetHttpPollingClient.java     |  40 +++-
 .../table/lookup/JavaNetHttpPollingClientTest.java | 224 +++++++++++++++++++++
 3 files changed, 255 insertions(+), 10 deletions(-)

diff --git a/.gitignore b/.gitignore
index 649e1b7..05a0152 100644
--- a/.gitignore
+++ b/.gitignore
@@ -5,6 +5,7 @@
 .gitignore.swp
 .project
 .settings
+.metals
 .DS_Store
 /.java-version
 .eslintcache
diff --git 
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/JavaNetHttpPollingClient.java
 
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/JavaNetHttpPollingClient.java
index dd9f60e..81260ed 100644
--- 
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/JavaNetHttpPollingClient.java
+++ 
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/JavaNetHttpPollingClient.java
@@ -326,22 +326,42 @@ public class JavaNetHttpPollingClient implements 
PollingClient {
         }
     }
 
-    private List<RowData> deserializeSingleValue(byte[] rawBytes) throws 
IOException {
-        return Optional.ofNullable(responseBodyDecoder.deserialize(rawBytes))
-                .map(Collections::singletonList)
-                .orElse(Collections.emptyList());
+    @VisibleForTesting
+    List<RowData> deserializeSingleValue(byte[] rawBytes) throws IOException {
+        List<RowData> result = new ArrayList<>();
+        responseBodyDecoder.deserialize(rawBytes, new ListCollector(result));
+        return Collections.unmodifiableList(result);
     }
 
-    private List<RowData> deserializeArray(byte[] rawBytes) throws IOException 
{
+    private static class ListCollector implements 
org.apache.flink.util.Collector<RowData> {
+        private final List<RowData> list;
+
+        ListCollector(List<RowData> list) {
+            this.list = list;
+        }
+
+        @Override
+        public void collect(RowData record) {
+            list.add(record);
+        }
+
+        @Override
+        public void close() {
+            // No-op
+        }
+    }
+
+    @VisibleForTesting
+    List<RowData> deserializeArray(byte[] rawBytes) throws IOException {
         List<JsonNode> rawObjects = objectMapper.readValue(rawBytes, new 
TypeReference<>() {});
         List<RowData> result = new ArrayList<>();
         for (JsonNode rawObject : rawObjects) {
             if (!(rawObject instanceof NullNode)) {
-                RowData deserialized =
-                        
responseBodyDecoder.deserialize(rawObject.toString().getBytes());
-                // deserialize() returns null if deserialization fails
-                if (deserialized != null) {
-                    result.add(deserialized);
+                List<RowData> deserialized =
+                        
deserializeSingleValue(rawObject.toString().getBytes());
+                // deserialize() may return empty list if deserialization fails
+                if (deserialized != null && !deserialized.isEmpty()) {
+                    result.addAll(deserialized);
                 }
             }
         }
diff --git 
a/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/JavaNetHttpPollingClientTest.java
 
b/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/JavaNetHttpPollingClientTest.java
index 9eb7cf6..14689b5 100644
--- 
a/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/JavaNetHttpPollingClientTest.java
+++ 
b/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/JavaNetHttpPollingClientTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.factories.DynamicTableFactory;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.Collector;
 import org.apache.flink.util.ConfigurationException;
 
 import org.junit.jupiter.api.BeforeEach;
@@ -41,6 +42,7 @@ import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
 
+import java.io.IOException;
 import java.net.URI;
 import java.net.http.HttpClient;
 import java.net.http.HttpRequest;
@@ -234,4 +236,226 @@ public class JavaNetHttpPollingClientTest {
                 "no-cache, no-store, max-age=0, must-revalidate");
         assertPropertyArray(headersAndValues, "Access-Control-Allow-Origin", 
"*");
     }
+
+    @Test
+    public void deserializeSingleValueTest() throws ConfigurationException, 
IOException {
+        // GIVEN
+        DeserializationSchema<RowData> mockDecoder =
+                new DeserializationSchema<RowData>() {
+                    @Override
+                    public RowData deserialize(byte[] message) throws 
IOException {
+                        return null;
+                    }
+
+                    @Override
+                    public void deserialize(byte[] message, Collector<RowData> 
out)
+                            throws IOException {
+                        String msg = new String(message);
+                        
out.collect(GenericRowData.of(StringData.fromString(msg)));
+                        out.close();
+                    }
+
+                    @Override
+                    public boolean isEndOfStream(RowData nextElement) {
+                        return false;
+                    }
+
+                    @Override
+                    public 
org.apache.flink.api.common.typeinfo.TypeInformation<RowData>
+                            getProducedType() {
+                        return null;
+                    }
+                };
+
+        JavaNetHttpPollingClient client =
+                new JavaNetHttpPollingClient(
+                        httpClient,
+                        mockDecoder,
+                        options,
+                        new GetRequestFactory(
+                                new GenericGetQueryCreator(lookupRow),
+                                headerPreprocessor,
+                                options));
+
+        // WHEN
+        String testString = "Test1";
+        List<RowData> result = 
client.deserializeSingleValue(testString.getBytes());
+
+        // THEN
+        assertThat(result).hasSize(1);
+        assertThat(((StringData) 
result.get(0).getString(0)).toString()).isEqualTo(testString);
+    }
+
+    @Test
+    public void shouldDeserializeArrayWithValidObjects() throws Exception {
+        // GIVEN
+        DeserializationSchema<RowData> mockDecoder =
+                new DeserializationSchema<RowData>() {
+                    @Override
+                    public RowData deserialize(byte[] message) throws 
IOException {
+                        return null;
+                    }
+
+                    @Override
+                    public void deserialize(byte[] message, Collector<RowData> 
out)
+                            throws IOException {
+                        String msg = new String(message);
+                        if (msg.contains("value1")) {
+                            
out.collect(GenericRowData.of(StringData.fromString("row1")));
+                        } else if (msg.contains("value2")) {
+                            
out.collect(GenericRowData.of(StringData.fromString("row2")));
+                        }
+                        out.close();
+                    }
+
+                    @Override
+                    public boolean isEndOfStream(RowData nextElement) {
+                        return false;
+                    }
+
+                    @Override
+                    public 
org.apache.flink.api.common.typeinfo.TypeInformation<RowData>
+                            getProducedType() {
+                        return null;
+                    }
+                };
+
+        Properties properties = new Properties();
+        properties.setProperty(HttpConnectorConfigConstants.RESULT_TYPE, 
"array");
+
+        HttpLookupConfig lookupConfig =
+                
HttpLookupConfig.builder().url(BASE_URL).properties(properties).build();
+
+        JavaNetHttpPollingClient client =
+                new JavaNetHttpPollingClient(
+                        httpClient,
+                        mockDecoder,
+                        lookupConfig,
+                        new GetRequestFactory(
+                                new GenericGetQueryCreator(lookupRow),
+                                headerPreprocessor,
+                                lookupConfig));
+
+        // WHEN
+        String jsonArray = "[{\"key\":\"value1\"},{\"key\":\"value2\"}]";
+        List<RowData> result = client.deserializeArray(jsonArray.getBytes());
+
+        // THEN
+        assertThat(result).isNotNull();
+        assertThat(result).hasSize(2);
+    }
+
+    @Test
+    public void shouldHandleNullNodesInArray() throws Exception {
+        // GIVEN
+        DeserializationSchema<RowData> mockDecoder =
+                new DeserializationSchema<RowData>() {
+                    @Override
+                    public RowData deserialize(byte[] message) throws 
IOException {
+                        return null;
+                    }
+
+                    @Override
+                    public void deserialize(byte[] message, Collector<RowData> 
out)
+                            throws IOException {
+                        
out.collect(GenericRowData.of(StringData.fromString("valid")));
+                        out.close();
+                    }
+
+                    @Override
+                    public boolean isEndOfStream(RowData nextElement) {
+                        return false;
+                    }
+
+                    @Override
+                    public 
org.apache.flink.api.common.typeinfo.TypeInformation<RowData>
+                            getProducedType() {
+                        return null;
+                    }
+                };
+
+        Properties properties = new Properties();
+        properties.setProperty(HttpConnectorConfigConstants.RESULT_TYPE, 
"array");
+
+        HttpLookupConfig lookupConfig =
+                
HttpLookupConfig.builder().url(BASE_URL).properties(properties).build();
+
+        JavaNetHttpPollingClient client =
+                new JavaNetHttpPollingClient(
+                        httpClient,
+                        mockDecoder,
+                        lookupConfig,
+                        new GetRequestFactory(
+                                new GenericGetQueryCreator(lookupRow),
+                                headerPreprocessor,
+                                lookupConfig));
+
+        // WHEN
+        String jsonArray = "[{\"key\":\"value1\"},null,{\"key\":\"value2\"}]";
+        List<RowData> result = client.deserializeArray(jsonArray.getBytes());
+
+        // THEN - null nodes should be skipped
+        assertThat(result).isNotNull();
+        assertThat(result).hasSize(2);
+    }
+
+    @Test
+    public void shouldHandleEmptyDeserializationInArray() throws Exception {
+        // GIVEN
+        DeserializationSchema<RowData> mockDecoder =
+                new DeserializationSchema<RowData>() {
+                    @Override
+                    public RowData deserialize(byte[] message) throws 
IOException {
+                        return null;
+                    }
+
+                    @Override
+                    public void deserialize(byte[] message, Collector<RowData> 
out)
+                            throws IOException {
+                        String msg = new String(message);
+                        // Only collect for specific messages, return empty 
for others
+                        if (msg.contains("\"status\":\"valid\"")) {
+                            
out.collect(GenericRowData.of(StringData.fromString("data")));
+                        }
+                        // Don't collect anything for other messages
+                        out.close();
+                    }
+
+                    @Override
+                    public boolean isEndOfStream(RowData nextElement) {
+                        return false;
+                    }
+
+                    @Override
+                    public 
org.apache.flink.api.common.typeinfo.TypeInformation<RowData>
+                            getProducedType() {
+                        return null;
+                    }
+                };
+
+        Properties properties = new Properties();
+        properties.setProperty(HttpConnectorConfigConstants.RESULT_TYPE, 
"array");
+
+        HttpLookupConfig lookupConfig =
+                
HttpLookupConfig.builder().url(BASE_URL).properties(properties).build();
+
+        JavaNetHttpPollingClient client =
+                new JavaNetHttpPollingClient(
+                        httpClient,
+                        mockDecoder,
+                        lookupConfig,
+                        new GetRequestFactory(
+                                new GenericGetQueryCreator(lookupRow),
+                                headerPreprocessor,
+                                lookupConfig));
+
+        // WHEN
+        String jsonArray =
+                
"[{\"status\":\"invalid\"},{\"status\":\"valid\"},{\"status\":\"invalid\"}]";
+        List<RowData> result = client.deserializeArray(jsonArray.getBytes());
+
+        // THEN - only valid deserialization should be included
+        assertThat(result).isNotNull();
+        assertThat(result).hasSize(1);
+    }
 }

Reply via email to