This is an automated email from the ASF dual-hosted git repository.

damccorm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 5307c7138af Support MAP type in RowJson and fix datetime parsing with spaces (#38865)
5307c7138af is described below

commit 5307c7138af8b5daba7a5495aba87d53ae9b0ae7
Author: Danny McCormick <[email protected]>
AuthorDate: Thu Jun 11 16:38:30 2026 -0400

    Support MAP type in RowJson and fix datetime parsing with spaces (#38865)
---
 .../java/org/apache/beam/sdk/util/RowJson.java     | 65 +++++++++++++++++++++-
 .../beam/sdk/util/RowJsonValueExtractors.java      |  9 ++-
 .../java/org/apache/beam/sdk/util/RowJsonTest.java | 26 ++++++++-
 3 files changed, 97 insertions(+), 3 deletions(-)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJson.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJson.java
index c63f673ade2..ccfbd87d450 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJson.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJson.java
@@ -26,6 +26,7 @@ import static org.apache.beam.sdk.schemas.Schema.TypeName.FLOAT;
 import static org.apache.beam.sdk.schemas.Schema.TypeName.INT16;
 import static org.apache.beam.sdk.schemas.Schema.TypeName.INT32;
 import static org.apache.beam.sdk.schemas.Schema.TypeName.INT64;
+import static org.apache.beam.sdk.schemas.Schema.TypeName.MAP;
 import static org.apache.beam.sdk.schemas.Schema.TypeName.STRING;
 import static org.apache.beam.sdk.util.RowJsonValueExtractors.booleanValueExtractor;
 import static org.apache.beam.sdk.util.RowJsonValueExtractors.byteValueExtractor;
@@ -57,6 +58,9 @@ import java.math.BigDecimal;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.LocalTime;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
 import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
 import org.apache.beam.sdk.schemas.Schema;
@@ -95,7 +99,8 @@ import org.joda.time.ReadableInstant;
 })
 public class RowJson {
   private static final ImmutableSet<TypeName> SUPPORTED_TYPES =
-      ImmutableSet.of(BYTE, INT16, INT32, INT64, FLOAT, DOUBLE, BOOLEAN, STRING, DECIMAL, DATETIME);
+      ImmutableSet.of(
+          BYTE, INT16, INT32, INT64, FLOAT, DOUBLE, BOOLEAN, STRING, DECIMAL, DATETIME, MAP);
   private static final ImmutableSet<String> KNOWN_LOGICAL_TYPE_IDENTIFIERS =
       ImmutableSet.of(
           SqlTypes.DATE.getIdentifier(),
@@ -160,6 +165,14 @@ public class RowJson {
       return findUnsupportedFields(fieldType.getCollectionElementType(), fieldName + "[]");
     }
 
+    if (fieldTypeName.isMapType()) {
+      if (!STRING.equals(fieldType.getMapKeyType().getTypeName())) {
+        return ImmutableList.of(
+            new UnsupportedField(fieldName + ".key", fieldType.getMapKeyType().getTypeName()));
+      }
+      return findUnsupportedFields(fieldType.getMapValueType(), fieldName + "{}");
+    }
+
     if (fieldTypeName.isLogicalType()) {
       if (KNOWN_LOGICAL_TYPE_IDENTIFIERS.contains(fieldType.getLogicalType().getIdentifier())) {
         return ImmutableList.of();
@@ -303,6 +316,10 @@ public class RowJson {
         return jsonArrayToList(fieldValue);
       }
 
+      if (fieldValue.isMapType()) {
+        return jsonObjectToMap(fieldValue);
+      }
+
       if (fieldValue.typeName().isLogicalType()) {
         String identifier = fieldValue.type().getLogicalType().getIdentifier();
         if (SqlTypes.DATE.getIdentifier().equals(identifier)) {
@@ -365,6 +382,32 @@ public class RowJson {
           .collect(toImmutableList());
     }
 
+    private Map<String, Object> jsonObjectToMap(FieldValue mapFieldValue) {
+      if (!mapFieldValue.isJsonObject()) {
+        throw new UnsupportedRowJsonException(
+            "Expected JSON object for field '"
+                + mapFieldValue.name()
+                + "'. Instead got "
+                + mapFieldValue.jsonNodeType().name());
+      }
+
+      Map<String, Object> result = new HashMap<>();
+      Iterator<Map.Entry<String, JsonNode>> fields = mapFieldValue.jsonValue().fields();
+      while (fields.hasNext()) {
+        Map.Entry<String, JsonNode> field = fields.next();
+        String key = field.getKey();
+        JsonNode value = field.getValue();
+
+        Object extractedValue =
+            extractJsonNodeValue(
+                FieldValue.of(
+                    mapFieldValue.name() + "['" + key + "']", mapFieldValue.mapValueType(), value));
+
+        result.put(key, extractedValue);
+      }
+      return result;
+    }
+
     private static Object extractJsonPrimitiveValue(FieldValue fieldValue) {
       try {
         return JSON_VALUE_GETTERS.get(fieldValue.typeName()).extractValue(fieldValue.jsonValue());
@@ -440,6 +483,18 @@ public class RowJson {
         return type().getRowSchema();
       }
 
+      boolean isMapType() {
+        return TypeName.MAP.equals(type().getTypeName());
+      }
+
+      FieldType mapKeyType() {
+        return type().getMapKeyType();
+      }
+
+      FieldType mapValueType() {
+        return type().getMapValueType();
+      }
+
       static FieldValue of(String name, FieldType type, JsonNode jsonValue) {
         return new AutoValue_RowJson_RowJsonDeserializer_FieldValue(name, type, jsonValue);
       }
@@ -538,6 +593,14 @@ public class RowJson {
         case ROW:
           writeRow((Row) value, type.getRowSchema(), gen);
           break;
+        case MAP:
+          gen.writeStartObject();
+          for (Map.Entry<Object, Object> entry : ((Map<Object, Object>) value).entrySet()) {
+            gen.writeFieldName(entry.getKey().toString());
+            writeValue(gen, type.getMapValueType(), entry.getValue());
+          }
+          gen.writeEndObject();
+          break;
         case LOGICAL_TYPE:
           String identifier = type.getLogicalType().getIdentifier();
           if (SqlTypes.DATE.getIdentifier().equals(identifier)) {
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJsonValueExtractors.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJsonValueExtractors.java
index f7a925d5c22..2179b20010d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJsonValueExtractors.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJsonValueExtractors.java
@@ -189,7 +189,14 @@ class RowJsonValueExtractors {
    */
   static ValueExtractor<DateTime> datetimeValueExtractor() {
     return ValidatingValueExtractor.<DateTime>builder()
-        .setExtractor(jsonNode -> DateTime.parse(jsonNode.textValue()))
+        .setExtractor(
+            jsonNode -> {
+              String text = jsonNode.textValue();
+              if (text.contains(" ")) {
+                text = text.replace(' ', 'T');
+              }
+              return DateTime.parse(text);
+            })
         .setValidator(JsonNode::isTextual)
         .build();
   }
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowJsonTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowJsonTest.java
index 328765bf7f1..81f69b62c53 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowJsonTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowJsonTest.java
@@ -30,6 +30,8 @@ import java.io.IOException;
 import java.math.BigDecimal;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.Schema.FieldType;
 import org.apache.beam.sdk.schemas.logicaltypes.VariableString;
@@ -80,7 +82,8 @@ public class RowJsonTest {
           makeArrayOfArraysTestCase(),
           makeNestedRowTestCase(),
           makeDoublyNestedRowTestCase(),
-          makeNullsTestCase());
+          makeNullsTestCase(),
+          makeMapFieldTestCase());
     }
 
     private static Object[] makeFlatRowTestCase() {
@@ -244,6 +247,21 @@ public class RowJsonTest {
       return new Object[] {"Nulls", schema, rowString, expectedRow};
     }
 
+    private static Object[] makeMapFieldTestCase() {
+      Schema schema =
+          Schema.builder().addMapField("f_map", FieldType.STRING, FieldType.INT32).build();
+
+      String rowString = "{\n" + "\"f_map\" : {\"key1\": 1, \"key2\": 2}\n" + "}";
+
+      Map<String, Integer> expectedMap = new HashMap<>();
+      expectedMap.put("key1", 1);
+      expectedMap.put("key2", 2);
+
+      Row expectedRow = Row.withSchema(schema).addValues(expectedMap).build();
+
+      return new Object[] {"Map field", schema, rowString, expectedRow};
+    }
+
     @Test
     public void testDeserialize() throws IOException {
       Row parsedRow =
@@ -564,6 +582,12 @@ public class RowJsonTest {
       testSupportedConversion(FieldType.DATETIME, quoted(DATETIME_STRING), DATETIME_VALUE);
     }
 
+    @Test
+    public void testSupportedDatetimeWithSpaceConversions() throws Exception {
+      String datetimeWithSpace = DATETIME_STRING.replace('T', ' ');
+      testSupportedConversion(FieldType.DATETIME, quoted(datetimeWithSpace), DATETIME_VALUE);
+    }
+
     private void testSupportedConversion(
         FieldType fieldType, String jsonFieldValue, Object expectedRowFieldValue) throws Exception {
 

Reply via email to