This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 9fbe6aab74 NIFI-13630 Handle Map Avro Type in PutBigQuery
9fbe6aab74 is described below

commit 9fbe6aab74f133f367c6cd386b926643d7a88828
Author: Juldrixx <juldr...@gmail.com>
AuthorDate: Tue Aug 6 15:49:37 2024 +0200

    NIFI-13630 Handle Map Avro Type in PutBigQuery
    
    This closes #9151
    
    Signed-off-by: David Handermann <exceptionfact...@apache.org>
---
 .../processors/gcp/bigquery/proto/ProtoUtils.java  | 16 ++++-
 .../processors/gcp/bigquery/PutBigQueryTest.java   | 82 ++++++++++++++++++++++
 2 files changed, 96 insertions(+), 2 deletions(-)

diff --git a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/proto/ProtoUtils.java b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/proto/ProtoUtils.java
index 823e0e11d0..8dfda739e0 100644
--- a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/proto/ProtoUtils.java
+++ b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/proto/ProtoUtils.java
@@ -46,8 +46,20 @@ public class ProtoUtils {
            switch (field.getType()) {
            case MESSAGE:
                if (field.isRepeated()) {
-                   Collection collection = value.getClass().isArray() ? Arrays.asList((Object[]) value) : (Collection) value;
-                   collection.forEach(act -> builder.addRepeatedField(field, createMessage(field.getMessageType(), (Map<String, Object>) act, tableSchema)));
+                   final Collection<Map<String, Object>> valueMaps;
+                   if (value instanceof Object[] arrayValue) {
+                       valueMaps = Arrays.stream(arrayValue)
+                               .map(item -> (Map<String, Object>) item).toList();
+                   } else if (value instanceof Map<?, ?> mapValue) {
+                       valueMaps = mapValue.entrySet().stream()
+                               .map(entry -> Map.of(
+                                       "key", entry.getKey(),
+                                       "value", entry.getValue()
+                               )).toList();
+                   } else {
+                       valueMaps = (Collection<Map<String, Object>>) value;
+                   }
+                   valueMaps.forEach(act -> builder.addRepeatedField(field, createMessage(field.getMessageType(), act, tableSchema)));
                } else {
                    builder.setField(field, createMessage(field.getMessageType(), (Map<String, Object>) value, tableSchema));
                }
diff --git a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryTest.java b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryTest.java
index 3ff7edf68c..6e6c513801 100644
--- a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryTest.java
+++ b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryTest.java
@@ -44,6 +44,7 @@ import java.util.stream.Stream;
 import org.apache.nifi.csv.CSVReader;
 import org.apache.nifi.csv.CSVUtils;
 import org.apache.nifi.gcp.credentials.service.GCPCredentialsService;
+import org.apache.nifi.json.JsonTreeReader;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.Processor;
 import org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControllerService;
@@ -457,6 +458,26 @@ public class PutBigQueryTest {
         runner.assertTransferCount(PutBigQuery.REL_SUCCESS, 1);
     }
 
+    @Test
+    void testMapFieldSchema() throws Exception {
+        when(writeClient.createWriteStream(isA(CreateWriteStreamRequest.class))).thenReturn(writeStream);
+
+        TableSchema myTableSchema = mockJsonTableSchema();
+
+        when(writeStream.getTableSchema()).thenReturn(myTableSchema);
+
+        when(streamWriter.append(isA(ProtoRows.class), isA(Long.class)))
+                .thenReturn(ApiFutures.immediateFuture(AppendRowsResponse.newBuilder().setAppendResult(mock(AppendRowsResponse.AppendResult.class)).build()));
+
+        decorateWithJsonRecordReaderWithSchema(runner);
+        runner.setProperty(PutBigQuery.RECORD_READER, "jsonReader");
+
+        runner.enqueue(jsonContent());
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(PutBigQuery.REL_SUCCESS);
+    }
+
     private void decorateWithRecordReader(TestRunner runner) throws InitializationException {
         CSVReader csvReader = new CSVReader();
         runner.addControllerService("csvReader", csvReader);
@@ -484,6 +505,30 @@ public class PutBigQueryTest {
         runner.enableControllerService(csvReader);
     }
 
+    private void decorateWithJsonRecordReaderWithSchema(TestRunner runner) throws InitializationException {
+        String recordReaderSchema = """
+                {
+                  "name": "recordFormatName",
+                  "namespace": "nifi.examples",
+                  "type": "record",
+                  "fields": [
+                    {
+                      "name": "field",
+                      "type": {
+                        "type": "map",
+                        "values": "string"
+                      }
+                    }
+                  ]
+                }""";
+
+        JsonTreeReader jsonReader = new JsonTreeReader();
+        runner.addControllerService("jsonReader", jsonReader);
+        runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
+        runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT, recordReaderSchema);
+        runner.enableControllerService(jsonReader);
+    }
+
     private TableSchema mockTableSchema(String name1, TableFieldSchema.Type type1, String name2, TableFieldSchema.Type type2) {
         TableSchema myTableSchema = mock(TableSchema.class);
 
@@ -503,6 +548,30 @@ public class PutBigQueryTest {
         return myTableSchema;
     }
 
+    private TableSchema mockJsonTableSchema() {
+        TableSchema myTableSchema = mock(TableSchema.class);
+
+        TableFieldSchema keyFieldSchema = mock(TableFieldSchema.class);
+        when(keyFieldSchema.getName()).thenReturn("key");
+        when(keyFieldSchema.getType()).thenReturn(TableFieldSchema.Type.STRING);
+        when(keyFieldSchema.getMode()).thenReturn(TableFieldSchema.Mode.REQUIRED);
+
+        TableFieldSchema valueFieldSchema = mock(TableFieldSchema.class);
+        when(valueFieldSchema.getName()).thenReturn("value");
+        when(valueFieldSchema.getType()).thenReturn(TableFieldSchema.Type.STRING);
+        when(valueFieldSchema.getMode()).thenReturn(TableFieldSchema.Mode.NULLABLE);
+
+        TableFieldSchema tableFieldSchemaId = mock(TableFieldSchema.class);
+        when(tableFieldSchemaId.getName()).thenReturn("field");
+        when(tableFieldSchemaId.getType()).thenReturn(TableFieldSchema.Type.STRUCT);
+        when(tableFieldSchemaId.getMode()).thenReturn(TableFieldSchema.Mode.REPEATED);
+        when(tableFieldSchemaId.getFieldsList()).thenReturn(List.of(keyFieldSchema, valueFieldSchema));
+
+        when(myTableSchema.getFieldsList()).thenReturn(List.of(tableFieldSchemaId));
+
+        return myTableSchema;
+    }
+
     private String csvContentWithLines(int lineNum) {
         StringBuilder builder = new StringBuilder();
         builder.append(CSV_HEADER);
@@ -516,4 +585,17 @@ public class PutBigQueryTest {
 
         return builder.toString();
     }
+
+    private String jsonContent() {
+        return """
+                {
+                  "field": {
+                    "FIELD_1": "field_1",
+                    "FIELD_2": "field_2",
+                    "FIELD_3": "field_3",
+                    "FIELD_4": "field_4",
+                    "FIELD_5": "field_5"
+                  }
+                }""";
+    }
 }

Reply via email to