This is an automated email from the ASF dual-hosted git repository. exceptionfactory pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push: new 9fbe6aab74 NIFI-13630 Handle Map Avro Type in PutBigQuery 9fbe6aab74 is described below commit 9fbe6aab74f133f367c6cd386b926643d7a88828 Author: Juldrixx <juldr...@gmail.com> AuthorDate: Tue Aug 6 15:49:37 2024 +0200 NIFI-13630 Handle Map Avro Type in PutBigQuery This closes #9151 Signed-off-by: David Handermann <exceptionfact...@apache.org> --- .../processors/gcp/bigquery/proto/ProtoUtils.java | 16 ++++- .../processors/gcp/bigquery/PutBigQueryTest.java | 82 ++++++++++++++++++++++ 2 files changed, 96 insertions(+), 2 deletions(-) diff --git a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/proto/ProtoUtils.java b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/proto/ProtoUtils.java index 823e0e11d0..8dfda739e0 100644 --- a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/proto/ProtoUtils.java +++ b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/proto/ProtoUtils.java @@ -46,8 +46,20 @@ public class ProtoUtils { switch (field.getType()) { case MESSAGE: if (field.isRepeated()) { - Collection collection = value.getClass().isArray() ? Arrays.asList((Object[]) value) : (Collection) value; - collection.forEach(act -> builder.addRepeatedField(field, createMessage(field.getMessageType(), (Map<String, Object>) act, tableSchema))); + final Collection<Map<String, Object>> valueMaps; + if (value instanceof Object[] arrayValue) { + valueMaps = Arrays.stream(arrayValue) + .map(item -> (Map<String, Object>) item).toList(); + } else if (value instanceof Map<?, ?> mapValue) { + valueMaps = mapValue.entrySet().stream() + .map(entry -> Map.of( + "key", entry.getKey(), + "value", entry.getValue() + )).toList(); + } else { + valueMaps = (Collection<Map<String, Object>>) value; + } + valueMaps.forEach(act -> builder.addRepeatedField(field, createMessage(field.getMessageType(), act, tableSchema))); } else { builder.setField(field, createMessage(field.getMessageType(), (Map<String, Object>) value, tableSchema)); } diff --git a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryTest.java b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryTest.java index 3ff7edf68c..6e6c513801 100644 --- a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryTest.java +++ b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryTest.java @@ -44,6 +44,7 @@ import java.util.stream.Stream; import org.apache.nifi.csv.CSVReader; import org.apache.nifi.csv.CSVUtils; import org.apache.nifi.gcp.credentials.service.GCPCredentialsService; +import org.apache.nifi.json.JsonTreeReader; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.Processor; import org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControllerService; @@ -457,6 +458,26 @@ public class PutBigQueryTest { runner.assertTransferCount(PutBigQuery.REL_SUCCESS, 1); } + @Test + void testMapFieldSchema() throws Exception { + when(writeClient.createWriteStream(isA(CreateWriteStreamRequest.class))).thenReturn(writeStream); + + TableSchema myTableSchema = mockJsonTableSchema(); + + when(writeStream.getTableSchema()).thenReturn(myTableSchema); + + when(streamWriter.append(isA(ProtoRows.class), isA(Long.class))) + .thenReturn(ApiFutures.immediateFuture(AppendRowsResponse.newBuilder().setAppendResult(mock(AppendRowsResponse.AppendResult.class)).build())); + + decorateWithJsonRecordReaderWithSchema(runner); + runner.setProperty(PutBigQuery.RECORD_READER, "jsonReader"); + + runner.enqueue(jsonContent()); + runner.run(); + + runner.assertAllFlowFilesTransferred(PutBigQuery.REL_SUCCESS); + } + private void decorateWithRecordReader(TestRunner runner) throws InitializationException { CSVReader csvReader = new CSVReader(); runner.addControllerService("csvReader", csvReader); @@ -484,6 +505,30 @@ public class PutBigQueryTest { runner.enableControllerService(csvReader); } + private void decorateWithJsonRecordReaderWithSchema(TestRunner runner) throws InitializationException { + String recordReaderSchema = """ + { + "name": "recordFormatName", + "namespace": "nifi.examples", + "type": "record", + "fields": [ + { + "name": "field", + "type": { + "type": "map", + "values": "string" + } + } + ] + }"""; + + JsonTreeReader jsonReader = new JsonTreeReader(); + runner.addControllerService("jsonReader", jsonReader); + runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); + runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT, recordReaderSchema); + runner.enableControllerService(jsonReader); + } + private TableSchema mockTableSchema(String name1, TableFieldSchema.Type type1, String name2, TableFieldSchema.Type type2) { TableSchema myTableSchema = mock(TableSchema.class); @@ -503,6 +548,30 @@ public class PutBigQueryTest { return myTableSchema; } + private TableSchema mockJsonTableSchema() { + TableSchema myTableSchema = mock(TableSchema.class); + + TableFieldSchema keyFieldSchema = mock(TableFieldSchema.class); + when(keyFieldSchema.getName()).thenReturn("key"); + when(keyFieldSchema.getType()).thenReturn(TableFieldSchema.Type.STRING); + when(keyFieldSchema.getMode()).thenReturn(TableFieldSchema.Mode.REQUIRED); + + TableFieldSchema valueFieldSchema = mock(TableFieldSchema.class); + when(valueFieldSchema.getName()).thenReturn("value"); + when(valueFieldSchema.getType()).thenReturn(TableFieldSchema.Type.STRING); + when(valueFieldSchema.getMode()).thenReturn(TableFieldSchema.Mode.NULLABLE); + + TableFieldSchema tableFieldSchemaId = mock(TableFieldSchema.class); + when(tableFieldSchemaId.getName()).thenReturn("field"); + when(tableFieldSchemaId.getType()).thenReturn(TableFieldSchema.Type.STRUCT); + when(tableFieldSchemaId.getMode()).thenReturn(TableFieldSchema.Mode.REPEATED); + when(tableFieldSchemaId.getFieldsList()).thenReturn(List.of(keyFieldSchema, valueFieldSchema)); + + when(myTableSchema.getFieldsList()).thenReturn(List.of(tableFieldSchemaId)); + + return myTableSchema; + } + private String csvContentWithLines(int lineNum) { StringBuilder builder = new StringBuilder(); builder.append(CSV_HEADER); @@ -516,4 +585,17 @@ public class PutBigQueryTest { return builder.toString(); } + + private String jsonContent() { + return """ + { + "field": { + "FIELD_1": "field_1", + "FIELD_2": "field_2", + "FIELD_3": "field_3", + "FIELD_4": "field_4", + "FIELD_5": "field_5" + } + }"""; + } }