This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 77e1e9447d NIFI-14928 Fixed PutBigQuery failure when sending data into
DATETIME columns (#10258)
77e1e9447d is described below
commit 77e1e9447dc297b3b55097c2948f2b92dce39310
Author: Pierre Villard <[email protected]>
AuthorDate: Tue Sep 16 18:12:01 2025 +0200
NIFI-14928 Fixed PutBigQuery failure when sending data into DATETIME columns
(#10258)
Signed-off-by: David Handermann <[email protected]>
---
.../nifi/processors/gcp/bigquery/PutBigQuery.java | 149 ++++++++++++++++-----
.../processors/gcp/bigquery/PutBigQueryIT.java | 6 +-
.../bigquery/schema-correct-data-with-date.avsc | 9 +-
...streaming-correct-data-with-date-formatted.json | 6 +-
.../bigquery/streaming-correct-data-with-date.json | 8 +-
5 files changed, 139 insertions(+), 39 deletions(-)
diff --git
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQuery.java
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQuery.java
index 969826c8e6..c801bdc56c 100644
---
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQuery.java
+++
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQuery.java
@@ -38,6 +38,7 @@ import com.google.cloud.bigquery.storage.v1.ProtoSchema;
import com.google.cloud.bigquery.storage.v1.ProtoSchemaConverter;
import com.google.cloud.bigquery.storage.v1.StorageError;
import com.google.cloud.bigquery.storage.v1.StreamWriter;
+import com.google.cloud.bigquery.storage.v1.TableFieldSchema;
import com.google.cloud.bigquery.storage.v1.TableName;
import com.google.cloud.bigquery.storage.v1.TableSchema;
import com.google.cloud.bigquery.storage.v1.WriteStream;
@@ -76,6 +77,7 @@ import java.net.Proxy;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
+import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.Arrays;
@@ -281,7 +283,7 @@ public class PutBigQuery extends AbstractBigQueryProcessor {
}
private DynamicMessage recordToProtoMessage(Record record,
Descriptors.Descriptor descriptor, boolean skipInvalidRows, TableSchema
tableSchema) {
- Map<String, Object> valueMap = convertMapRecord(record.toMap());
+ Map<String, Object> valueMap = convertMapRecord(record.toMap(),
tableSchema.getFieldsList());
DynamicMessage message = null;
try {
message = ProtoUtils.createMessage(descriptor, valueMap,
tableSchema);
@@ -383,10 +385,25 @@ public class PutBigQuery extends
AbstractBigQueryProcessor {
}
error.compareAndSet(null,
Optional.ofNullable(Exceptions.toStorageException(throwable))
- .map(RuntimeException.class::cast)
- .orElse(new RuntimeException(throwable)));
+ .map(RuntimeException.class::cast)
+ .orElse(new RuntimeException(throwable)));
+
+ // Provide detailed row-level error logging when available from
the client
+ if (throwable instanceof Exceptions.AppendSerializationError) {
+ try {
+ final Exceptions.AppendSerializationError
serializationError = (Exceptions.AppendSerializationError) throwable;
+ final Map<Integer, String> rowErrors =
serializationError.getRowIndexToErrorMessage();
+ if (rowErrors != null && !rowErrors.isEmpty()) {
+ final String firstError =
rowErrors.values().iterator().next();
+ getLogger().error("Failure during appending data.
First error: %s".formatted(firstError), throwable);
+ }
+ } catch (Throwable logError) {
+ getLogger().error("Failure during appending data",
throwable);
+ }
+ } else {
+ getLogger().error("Failure during appending data", throwable);
+ }
- getLogger().error("Failure during appending data", throwable);
inflightRequestCount.arriveAndDeregister();
}
}
@@ -482,39 +499,109 @@ public class PutBigQuery extends
AbstractBigQueryProcessor {
}
}
- private static Map<String, Object> convertMapRecord(Map<String, Object>
map) {
+ private static Map<String, Object> convertMapRecord(Map<String, Object>
map, List<TableFieldSchema> tableFields) {
Map<String, Object> result = new HashMap<>();
- for (String key : map.keySet()) {
- Object obj = map.get(key);
- // BigQuery is not case sensitive on the column names but the
protobuf message
- // expect all column names to be lower case
- key = key.toLowerCase();
- if (obj instanceof MapRecord) {
- result.put(key, convertMapRecord(((MapRecord) obj).toMap()));
- } else if (obj instanceof Object[]
- && ((Object[]) obj).length > 0
- && ((Object[]) obj)[0] instanceof MapRecord) {
- List<Map<String, Object>> lmapr = new ArrayList<>();
- for (Object mapr : ((Object[]) obj)) {
- lmapr.add(convertMapRecord(((MapRecord) mapr).toMap()));
+ for (String rawKey : map.keySet()) {
+ String key = rawKey.toLowerCase();
+ Object obj = map.get(rawKey);
+
+ TableFieldSchema fieldSchema = findFieldSchema(tableFields, key);
+ if (fieldSchema != null && fieldSchema.getType() ==
TableFieldSchema.Type.STRUCT) {
+ // Nested RECORD type
+ if (obj instanceof MapRecord) {
+ result.put(key, convertMapRecord(((MapRecord)
obj).toMap(), fieldSchema.getFieldsList()));
+ continue;
+ } else if (obj instanceof Object[] && ((Object[]) obj).length
> 0 && ((Object[]) obj)[0] instanceof MapRecord) {
+ List<Map<String, Object>> list = new ArrayList<>();
+ for (Object item : (Object[]) obj) {
+ list.add(convertMapRecord(((MapRecord) item).toMap(),
fieldSchema.getFieldsList()));
+ }
+ result.put(key, list);
+ continue;
+ }
+ } else if (obj instanceof Object[] && ((Object[]) obj).length > 0
&& ((Object[]) obj)[0] instanceof MapRecord) {
+ // Repeated RECORDs without proper schema match; best effort
+ List<Map<String, Object>> list = new ArrayList<>();
+ for (Object item : (Object[]) obj) {
+ list.add(convertMapRecord(((MapRecord) item).toMap(),
fieldSchema != null ? fieldSchema.getFieldsList() : List.of()));
+ }
+ result.put(key, list);
+ continue;
+ }
+
+ // Scalar conversions guided by schema when available
+ if (fieldSchema != null) {
+ switch (fieldSchema.getType()) {
+ case TIMESTAMP:
+ if (obj instanceof Timestamp) {
+ result.put(key, ((Timestamp) obj).getTime() *
1000);
+ break;
+ }
+ // fall through to default if not Timestamp
+ case TIME:
+ if (obj instanceof Time) {
+ LocalTime time = ((Time) obj).toLocalTime();
+ org.threeten.bp.LocalTime localTime =
org.threeten.bp.LocalTime.of(
+ time.getHour(),
+ time.getMinute(),
+ time.getSecond());
+ result.put(key,
CivilTimeEncoder.encodePacked64TimeMicros(localTime));
+ break;
+ }
+ // fall through
+ case DATE:
+ if (obj instanceof Date) {
+ result.put(key, (int) ((Date)
obj).toLocalDate().toEpochDay());
+ break;
+ }
+ // fall through
+ case DATETIME:
+ if (obj instanceof Timestamp) {
+ LocalDateTime ldt = ((Timestamp)
obj).toLocalDateTime();
+ org.threeten.bp.LocalDateTime civil =
org.threeten.bp.LocalDateTime.of(
+ ldt.getYear(),
+ ldt.getMonthValue(),
+ ldt.getDayOfMonth(),
+ ldt.getHour(),
+ ldt.getMinute(),
+ ldt.getSecond(),
+ ldt.getNano());
+ result.put(key,
CivilTimeEncoder.encodePacked64DatetimeMicros(civil));
+ break;
+ }
+ // fall through
+ default:
+ result.put(key, obj);
+ break;
}
- result.put(key, lmapr);
- } else if (obj instanceof Timestamp) {
- result.put(key, ((Timestamp) obj).getTime() * 1000);
- } else if (obj instanceof Time) {
- LocalTime time = ((Time) obj).toLocalTime();
- org.threeten.bp.LocalTime localTime =
org.threeten.bp.LocalTime.of(
- time.getHour(),
- time.getMinute(),
- time.getSecond());
- result.put(key,
CivilTimeEncoder.encodePacked64TimeMicros(localTime));
- } else if (obj instanceof Date) {
- result.put(key, (int) ((Date) obj).toLocalDate().toEpochDay());
} else {
- result.put(key, obj);
+ // No schema available for field; apply basic conversions for
JDBC temporal types
+ if (obj instanceof Timestamp) {
+ result.put(key, ((Timestamp) obj).getTime() * 1000);
+ } else if (obj instanceof Time) {
+ LocalTime time = ((Time) obj).toLocalTime();
+ org.threeten.bp.LocalTime localTime =
org.threeten.bp.LocalTime.of(
+ time.getHour(),
+ time.getMinute(),
+ time.getSecond());
+ result.put(key,
CivilTimeEncoder.encodePacked64TimeMicros(localTime));
+ } else if (obj instanceof Date) {
+ result.put(key, (int) ((Date)
obj).toLocalDate().toEpochDay());
+ } else {
+ result.put(key, obj);
+ }
}
}
return result;
}
+
+ private static TableFieldSchema findFieldSchema(List<TableFieldSchema>
fields, String keyLower) {
+ for (TableFieldSchema f : fields) {
+ if (f.getName().equalsIgnoreCase(keyLower)) {
+ return f;
+ }
+ }
+ return null;
+ }
}
diff --git
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryIT.java
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryIT.java
index 25cbfb6e40..ba15106d21 100644
---
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryIT.java
+++
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryIT.java
@@ -218,7 +218,8 @@ public class PutBigQueryIT {
runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT,
recordSchema);
runner.setProperty(jsonReader, DateTimeUtils.DATE_FORMAT,
"MM/dd/yyyy");
runner.setProperty(jsonReader, DateTimeUtils.TIME_FORMAT, "HH:mm:ss");
- runner.setProperty(jsonReader, DateTimeUtils.TIMESTAMP_FORMAT,
"MM-dd-yyyy HH:mm:ss Z");
+ // Use zone name format to parse values like "UTC"
+ runner.setProperty(jsonReader, DateTimeUtils.TIMESTAMP_FORMAT,
"MM-dd-yyyy HH:mm:ss z");
runner.enableControllerService(jsonReader);
runner.setProperty(BigQueryAttributes.RECORD_READER_ATTR, "reader");
@@ -468,7 +469,8 @@ public class PutBigQueryIT {
Field date = Field.newBuilder("date",
LegacySQLTypeName.DATE).setMode(Field.Mode.NULLABLE).build();
Field time = Field.newBuilder("time",
LegacySQLTypeName.TIME).setMode(Field.Mode.NULLABLE).build();
Field full = Field.newBuilder("full",
LegacySQLTypeName.TIMESTAMP).setMode(Field.Mode.NULLABLE).build();
- Field birth = Field.newBuilder("birth", LegacySQLTypeName.RECORD,
date, time, full).setMode(Field.Mode.NULLABLE).build();
+ Field datetime = Field.newBuilder("datetime",
StandardSQLTypeName.DATETIME).setMode(Field.Mode.NULLABLE).build();
+ Field birth = Field.newBuilder("birth", LegacySQLTypeName.RECORD,
date, time, full, datetime).setMode(Field.Mode.NULLABLE).build();
Field numeric = Field.newBuilder("numeric",
StandardSQLTypeName.NUMERIC).setMode(Field.Mode.NULLABLE).build();
Field floatc = Field.newBuilder("floatc",
StandardSQLTypeName.FLOAT64).setMode(Field.Mode.NULLABLE).build();
diff --git
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/schema-correct-data-with-date.avsc
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/schema-correct-data-with-date.avsc
index 8ea7012508..3adb27d727 100644
---
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/schema-correct-data-with-date.avsc
+++
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/schema-correct-data-with-date.avsc
@@ -84,6 +84,13 @@
"type": "long",
"logicalType": "timestamp-millis"
} ]
+ },
+ {
+ "name": "datetime",
+ "type": ["null", {
+ "type": "long",
+ "logicalType": "timestamp-millis"
+ } ]
}
]
}
@@ -101,4 +108,4 @@
"type": ["null", "string"]
}
]
-}
\ No newline at end of file
+}
diff --git
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/streaming-correct-data-with-date-formatted.json
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/streaming-correct-data-with-date-formatted.json
index b6ff43e4e9..924b0c828a 100644
---
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/streaming-correct-data-with-date-formatted.json
+++
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/streaming-correct-data-with-date-formatted.json
@@ -20,7 +20,8 @@
"birth": {
"date": "07/18/2021",
"time": "12:35:24",
- "full": "07-18-2021 12:35:24 UTC"
+ "full": "07-18-2021 12:35:24 UTC",
+ "datetime": "07-18-2021 12:35:24 UTC"
},
"numeric": 0,
"floatc": 0.1,
@@ -43,7 +44,8 @@
"birth": {
"date": "01/01/1992",
"time": "00:00:00",
- "full": "01-01-1992 00:00:00 UTC"
+ "full": "01-01-1992 00:00:00 UTC",
+ "datetime": "01-01-1992 00:00:00 UTC"
}
}
]
diff --git
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/streaming-correct-data-with-date.json
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/streaming-correct-data-with-date.json
index ffe4050094..8f11938523 100644
---
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/streaming-correct-data-with-date.json
+++
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/streaming-correct-data-with-date.json
@@ -20,7 +20,8 @@
"birth": {
"date": 1626611724000,
"time": 1626611724000,
- "full": 1626611724000
+ "full": 1626611724000,
+ "datetime": 1626611724000
}
},
{
@@ -40,7 +41,8 @@
"birth": {
"date": 694224000000,
"time": 694224000000,
- "full": 694224000000
+ "full": 694224000000,
+ "datetime": 694224000000
}
}
-]
\ No newline at end of file
+]