This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 77e1e9447d NIFI-14928 Fixed PutBigQuery fails for sending data into DATETIME columns (#10258)
77e1e9447d is described below

commit 77e1e9447dc297b3b55097c2948f2b92dce39310
Author: Pierre Villard <[email protected]>
AuthorDate: Tue Sep 16 18:12:01 2025 +0200

    NIFI-14928 Fixed PutBigQuery fails for sending data into DATETIME columns (#10258)
    
    Signed-off-by: David Handermann <[email protected]>
---
 .../nifi/processors/gcp/bigquery/PutBigQuery.java  | 149 ++++++++++++++++-----
 .../processors/gcp/bigquery/PutBigQueryIT.java     |   6 +-
 .../bigquery/schema-correct-data-with-date.avsc    |   9 +-
 ...streaming-correct-data-with-date-formatted.json |   6 +-
 .../bigquery/streaming-correct-data-with-date.json |   8 +-
 5 files changed, 139 insertions(+), 39 deletions(-)

diff --git 
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQuery.java
 
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQuery.java
index 969826c8e6..c801bdc56c 100644
--- 
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQuery.java
+++ 
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQuery.java
@@ -38,6 +38,7 @@ import com.google.cloud.bigquery.storage.v1.ProtoSchema;
 import com.google.cloud.bigquery.storage.v1.ProtoSchemaConverter;
 import com.google.cloud.bigquery.storage.v1.StorageError;
 import com.google.cloud.bigquery.storage.v1.StreamWriter;
+import com.google.cloud.bigquery.storage.v1.TableFieldSchema;
 import com.google.cloud.bigquery.storage.v1.TableName;
 import com.google.cloud.bigquery.storage.v1.TableSchema;
 import com.google.cloud.bigquery.storage.v1.WriteStream;
@@ -76,6 +77,7 @@ import java.net.Proxy;
 import java.sql.Date;
 import java.sql.Time;
 import java.sql.Timestamp;
+import java.time.LocalDateTime;
 import java.time.LocalTime;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -281,7 +283,7 @@ public class PutBigQuery extends AbstractBigQueryProcessor {
     }
 
     private DynamicMessage recordToProtoMessage(Record record, 
Descriptors.Descriptor descriptor, boolean skipInvalidRows, TableSchema 
tableSchema) {
-        Map<String, Object> valueMap = convertMapRecord(record.toMap());
+        Map<String, Object> valueMap = convertMapRecord(record.toMap(), 
tableSchema.getFieldsList());
         DynamicMessage message = null;
         try {
             message = ProtoUtils.createMessage(descriptor, valueMap, 
tableSchema);
@@ -383,10 +385,25 @@ public class PutBigQuery extends 
AbstractBigQueryProcessor {
             }
 
             error.compareAndSet(null, 
Optional.ofNullable(Exceptions.toStorageException(throwable))
-                .map(RuntimeException.class::cast)
-                .orElse(new RuntimeException(throwable)));
+                    .map(RuntimeException.class::cast)
+                    .orElse(new RuntimeException(throwable)));
+
+            // Provide detailed row-level error logging when available from 
the client
+            if (throwable instanceof Exceptions.AppendSerializationError) {
+                try {
+                    final Exceptions.AppendSerializationError 
serializationError = (Exceptions.AppendSerializationError) throwable;
+                    final Map<Integer, String> rowErrors = 
serializationError.getRowIndexToErrorMessage();
+                    if (rowErrors != null && !rowErrors.isEmpty()) {
+                        final String firstError = 
rowErrors.values().iterator().next();
+                        getLogger().error("Failure during appending data. 
First error: %s".formatted(firstError), throwable);
+                    }
+                } catch (Throwable logError) {
+                    getLogger().error("Failure during appending data", 
throwable);
+                }
+            } else {
+                getLogger().error("Failure during appending data", throwable);
+            }
 
-            getLogger().error("Failure during appending data", throwable);
             inflightRequestCount.arriveAndDeregister();
         }
     }
@@ -482,39 +499,109 @@ public class PutBigQuery extends 
AbstractBigQueryProcessor {
         }
     }
 
-    private static Map<String, Object> convertMapRecord(Map<String, Object> 
map) {
+    private static Map<String, Object> convertMapRecord(Map<String, Object> 
map, List<TableFieldSchema> tableFields) {
         Map<String, Object> result = new HashMap<>();
-        for (String key : map.keySet()) {
-            Object obj = map.get(key);
-            // BigQuery is not case sensitive on the column names but the 
protobuf message
-            // expect all column names to be lower case
-            key = key.toLowerCase();
-            if (obj instanceof MapRecord) {
-                result.put(key, convertMapRecord(((MapRecord) obj).toMap()));
-            } else if (obj instanceof Object[]
-                && ((Object[]) obj).length > 0
-                && ((Object[]) obj)[0] instanceof MapRecord) {
-                List<Map<String, Object>> lmapr = new ArrayList<>();
-                for (Object mapr : ((Object[]) obj)) {
-                    lmapr.add(convertMapRecord(((MapRecord) mapr).toMap()));
+        for (String rawKey : map.keySet()) {
+            String key = rawKey.toLowerCase();
+            Object obj = map.get(rawKey);
+
+            TableFieldSchema fieldSchema = findFieldSchema(tableFields, key);
+            if (fieldSchema != null && fieldSchema.getType() == 
TableFieldSchema.Type.STRUCT) {
+                // Nested RECORD type
+                if (obj instanceof MapRecord) {
+                    result.put(key, convertMapRecord(((MapRecord) 
obj).toMap(), fieldSchema.getFieldsList()));
+                    continue;
+                } else if (obj instanceof Object[] && ((Object[]) obj).length 
> 0 && ((Object[]) obj)[0] instanceof MapRecord) {
+                    List<Map<String, Object>> list = new ArrayList<>();
+                    for (Object item : (Object[]) obj) {
+                        list.add(convertMapRecord(((MapRecord) item).toMap(), 
fieldSchema.getFieldsList()));
+                    }
+                    result.put(key, list);
+                    continue;
+                }
+            } else if (obj instanceof Object[] && ((Object[]) obj).length > 0 
&& ((Object[]) obj)[0] instanceof MapRecord) {
+                // Repeated RECORDs without proper schema match; best effort
+                List<Map<String, Object>> list = new ArrayList<>();
+                for (Object item : (Object[]) obj) {
+                    list.add(convertMapRecord(((MapRecord) item).toMap(), 
fieldSchema != null ? fieldSchema.getFieldsList() : List.of()));
+                }
+                result.put(key, list);
+                continue;
+            }
+
+            // Scalar conversions guided by schema when available
+            if (fieldSchema != null) {
+                switch (fieldSchema.getType()) {
+                    case TIMESTAMP:
+                        if (obj instanceof Timestamp) {
+                            result.put(key, ((Timestamp) obj).getTime() * 
1000);
+                            break;
+                        }
+                        // fall through to default if not Timestamp
+                    case TIME:
+                        if (obj instanceof Time) {
+                            LocalTime time = ((Time) obj).toLocalTime();
+                            org.threeten.bp.LocalTime localTime = 
org.threeten.bp.LocalTime.of(
+                                time.getHour(),
+                                time.getMinute(),
+                                time.getSecond());
+                            result.put(key, 
CivilTimeEncoder.encodePacked64TimeMicros(localTime));
+                            break;
+                        }
+                        // fall through
+                    case DATE:
+                        if (obj instanceof Date) {
+                            result.put(key, (int) ((Date) 
obj).toLocalDate().toEpochDay());
+                            break;
+                        }
+                        // fall through
+                    case DATETIME:
+                        if (obj instanceof Timestamp) {
+                            LocalDateTime ldt = ((Timestamp) 
obj).toLocalDateTime();
+                            org.threeten.bp.LocalDateTime civil = 
org.threeten.bp.LocalDateTime.of(
+                                    ldt.getYear(),
+                                    ldt.getMonthValue(),
+                                    ldt.getDayOfMonth(),
+                                    ldt.getHour(),
+                                    ldt.getMinute(),
+                                    ldt.getSecond(),
+                                    ldt.getNano());
+                            result.put(key, 
CivilTimeEncoder.encodePacked64DatetimeMicros(civil));
+                            break;
+                        }
+                        // fall through
+                    default:
+                        result.put(key, obj);
+                        break;
                 }
-                result.put(key, lmapr);
-            } else if (obj instanceof Timestamp) {
-                result.put(key, ((Timestamp) obj).getTime() * 1000);
-            } else if (obj instanceof Time) {
-                LocalTime time = ((Time) obj).toLocalTime();
-                org.threeten.bp.LocalTime localTime = 
org.threeten.bp.LocalTime.of(
-                    time.getHour(),
-                    time.getMinute(),
-                    time.getSecond());
-                result.put(key, 
CivilTimeEncoder.encodePacked64TimeMicros(localTime));
-            } else if (obj instanceof Date) {
-                result.put(key, (int) ((Date) obj).toLocalDate().toEpochDay());
             } else {
-                result.put(key, obj);
+                // No schema available for field; apply basic conversions for 
JDBC temporal types
+                if (obj instanceof Timestamp) {
+                    result.put(key, ((Timestamp) obj).getTime() * 1000);
+                } else if (obj instanceof Time) {
+                    LocalTime time = ((Time) obj).toLocalTime();
+                    org.threeten.bp.LocalTime localTime = 
org.threeten.bp.LocalTime.of(
+                            time.getHour(),
+                            time.getMinute(),
+                            time.getSecond());
+                    result.put(key, 
CivilTimeEncoder.encodePacked64TimeMicros(localTime));
+                } else if (obj instanceof Date) {
+                    result.put(key, (int) ((Date) 
obj).toLocalDate().toEpochDay());
+                } else {
+                    result.put(key, obj);
+                }
             }
         }
 
         return result;
     }
+
+    private static TableFieldSchema findFieldSchema(List<TableFieldSchema> 
fields, String keyLower) {
+        for (TableFieldSchema f : fields) {
+            if (f.getName().equalsIgnoreCase(keyLower)) {
+                return f;
+            }
+        }
+        return null;
+    }
 }
diff --git 
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryIT.java
 
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryIT.java
index 25cbfb6e40..ba15106d21 100644
--- 
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryIT.java
+++ 
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryIT.java
@@ -218,7 +218,8 @@ public class PutBigQueryIT {
         runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT, 
recordSchema);
         runner.setProperty(jsonReader, DateTimeUtils.DATE_FORMAT, 
"MM/dd/yyyy");
         runner.setProperty(jsonReader, DateTimeUtils.TIME_FORMAT, "HH:mm:ss");
-        runner.setProperty(jsonReader, DateTimeUtils.TIMESTAMP_FORMAT, 
"MM-dd-yyyy HH:mm:ss Z");
+        // Use zone name format to parse values like "UTC"
+        runner.setProperty(jsonReader, DateTimeUtils.TIMESTAMP_FORMAT, 
"MM-dd-yyyy HH:mm:ss z");
         runner.enableControllerService(jsonReader);
 
         runner.setProperty(BigQueryAttributes.RECORD_READER_ATTR, "reader");
@@ -468,7 +469,8 @@ public class PutBigQueryIT {
         Field date = Field.newBuilder("date", 
LegacySQLTypeName.DATE).setMode(Field.Mode.NULLABLE).build();
         Field time = Field.newBuilder("time", 
LegacySQLTypeName.TIME).setMode(Field.Mode.NULLABLE).build();
         Field full = Field.newBuilder("full", 
LegacySQLTypeName.TIMESTAMP).setMode(Field.Mode.NULLABLE).build();
-        Field birth = Field.newBuilder("birth", LegacySQLTypeName.RECORD, 
date, time, full).setMode(Field.Mode.NULLABLE).build();
+        Field datetime = Field.newBuilder("datetime", 
StandardSQLTypeName.DATETIME).setMode(Field.Mode.NULLABLE).build();
+        Field birth = Field.newBuilder("birth", LegacySQLTypeName.RECORD, 
date, time, full, datetime).setMode(Field.Mode.NULLABLE).build();
 
         Field numeric = Field.newBuilder("numeric", 
StandardSQLTypeName.NUMERIC).setMode(Field.Mode.NULLABLE).build();
         Field floatc = Field.newBuilder("floatc", 
StandardSQLTypeName.FLOAT64).setMode(Field.Mode.NULLABLE).build();
diff --git 
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/schema-correct-data-with-date.avsc
 
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/schema-correct-data-with-date.avsc
index 8ea7012508..3adb27d727 100644
--- 
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/schema-correct-data-with-date.avsc
+++ 
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/schema-correct-data-with-date.avsc
@@ -84,6 +84,13 @@
                             "type": "long",
                             "logicalType": "timestamp-millis"
                         } ]
+                    },
+                    {
+                        "name": "datetime",
+                        "type": ["null", {
+                            "type": "long",
+                            "logicalType": "timestamp-millis"
+                        } ]
                     }
                 ]
             }
@@ -101,4 +108,4 @@
             "type": ["null", "string"]
         }
     ]
-}
\ No newline at end of file
+}
diff --git 
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/streaming-correct-data-with-date-formatted.json
 
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/streaming-correct-data-with-date-formatted.json
index b6ff43e4e9..924b0c828a 100644
--- 
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/streaming-correct-data-with-date-formatted.json
+++ 
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/streaming-correct-data-with-date-formatted.json
@@ -20,7 +20,8 @@
     "birth": {
       "date": "07/18/2021",
       "time": "12:35:24",
-      "full": "07-18-2021 12:35:24 UTC"
+      "full": "07-18-2021 12:35:24 UTC",
+      "datetime": "07-18-2021 12:35:24 UTC"
     },
     "numeric": 0,
     "floatc": 0.1,
@@ -43,7 +44,8 @@
     "birth": {
       "date": "01/01/1992",
       "time": "00:00:00",
-      "full": "01-01-1992 00:00:00 UTC"
+      "full": "01-01-1992 00:00:00 UTC",
+      "datetime": "01-01-1992 00:00:00 UTC"
     }
   }
 ]
diff --git 
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/streaming-correct-data-with-date.json
 
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/streaming-correct-data-with-date.json
index ffe4050094..8f11938523 100644
--- 
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/streaming-correct-data-with-date.json
+++ 
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/streaming-correct-data-with-date.json
@@ -20,7 +20,8 @@
     "birth": {
       "date": 1626611724000,
       "time": 1626611724000,
-      "full": 1626611724000
+      "full": 1626611724000,
+      "datetime": 1626611724000
     }
   },
   {
@@ -40,7 +41,8 @@
     "birth": {
       "date": 694224000000,
       "time": 694224000000,
-      "full": 694224000000
+      "full": 694224000000,
+      "datetime": 694224000000
     }
   }
-]
\ No newline at end of file
+]

Reply via email to