This is an automated email from the ASF dual-hosted git repository.

turcsanyi pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
     new 6ee0bea306 NIFI-11823 - fix NUMERIC support in PutBigQuery
6ee0bea306 is described below

commit 6ee0bea30650e0203f46483396aa3c6a35a5e2ef
Author: Pierre Villard <[email protected]>
AuthorDate: Mon Jul 17 16:55:24 2023 +0200

    NIFI-11823 - fix NUMERIC support in PutBigQuery
    
    This closes #7489.
    
    Signed-off-by: Peter Turcsanyi <[email protected]>
---
 .../nifi-gcp-bundle/nifi-gcp-processors/pom.xml    |   6 +
 .../nifi/processors/gcp/bigquery/PutBigQuery.java  |  15 ++-
 .../processors/gcp/bigquery/proto/ProtoUtils.java  | 124 +++++++++++++------
 .../processors/gcp/bigquery/PutBigQueryIT.java     | 132 ++++++++++++++++++++-
 .../src/test/resources/bigquery/avrodecimal.avro   | Bin 0 -> 30 bytes
 .../src/test/resources/bigquery/avrodecimal.avsc   |  19 +++
 .../src/test/resources/bigquery/avrofloat.avro     | Bin 0 -> 126 bytes
 .../src/test/resources/bigquery/avrofloat.avsc     |  14 +++
 .../src/test/resources/bigquery/avroint.avro       | Bin 0 -> 2 bytes
 .../src/test/resources/bigquery/avroint.avsc       |  14 +++
 .../bigquery/schema-correct-data-with-date.avsc    |  12 ++
 ...streaming-correct-data-with-date-formatted.json |   7 +-
 12 files changed, 296 insertions(+), 47 deletions(-)

diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml
index 08c4af371f..0781e29bbb 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml
@@ -198,6 +198,12 @@
                         <exclude>src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker</exclude>
                         <exclude>src/test/resources/mock-gcp-service-account.json</exclude>
                         <exclude>src/test/resources/mock-gcp-application-default-credentials.json</exclude>
+                        <exclude>src/test/resources/bigquery/avrodecimal.avsc</exclude>
+                        <exclude>src/test/resources/bigquery/avrodecimal.avro</exclude>
+                        <exclude>src/test/resources/bigquery/avrofloat.avsc</exclude>
+                        <exclude>src/test/resources/bigquery/avrofloat.avro</exclude>
+                        <exclude>src/test/resources/bigquery/avroint.avsc</exclude>
+                        <exclude>src/test/resources/bigquery/avroint.avro</exclude>
                         <exclude>src/test/resources/bigquery/streaming-bad-data.json</exclude>
                         <exclude>src/test/resources/bigquery/streaming-correct-data.json</exclude>
                         <exclude>src/test/resources/bigquery/schema-correct-data-with-date.avsc</exclude>
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQuery.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQuery.java
index d3a274b9b1..e7b51093ad 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQuery.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQuery.java
@@ -40,6 +40,7 @@ import com.google.cloud.bigquery.storage.v1.ProtoSchemaConverter;
 import com.google.cloud.bigquery.storage.v1.StorageError;
 import com.google.cloud.bigquery.storage.v1.StreamWriter;
 import com.google.cloud.bigquery.storage.v1.TableName;
+import com.google.cloud.bigquery.storage.v1.TableSchema;
 import com.google.cloud.bigquery.storage.v1.WriteStream;
 import com.google.cloud.bigquery.storage.v1.stub.BigQueryWriteStubSettings;
 import com.google.protobuf.Descriptors;
@@ -224,9 +225,11 @@ public class PutBigQuery extends AbstractBigQueryProcessor {
 
         WriteStream writeStream;
         Descriptors.Descriptor protoDescriptor;
+        TableSchema tableSchema;
         try {
             writeStream = createWriteStream(tableName);
-            protoDescriptor = BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor(writeStream.getTableSchema());
+            tableSchema = writeStream.getTableSchema();
+            protoDescriptor = BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor(tableSchema);
             streamWriter = createStreamWriter(writeStream.getName(), protoDescriptor, getGoogleCredentials(context));
         } catch (Descriptors.DescriptorValidationException | IOException e) {
             getLogger().error("Failed to create Big Query Stream Writer for writing", e);
@@ -242,7 +245,7 @@ public class PutBigQuery extends AbstractBigQueryProcessor {
         try {
             try (InputStream in = session.read(flowFile);
                     RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger())) {
-                recordNumWritten = writeRecordsToStream(reader, protoDescriptor, skipInvalidRows);
+                recordNumWritten = writeRecordsToStream(reader, protoDescriptor, skipInvalidRows, tableSchema);
             }
             flowFile = session.putAttribute(flowFile, BigQueryAttributes.JOB_NB_RECORDS_ATTR, Integer.toString(recordNumWritten));
         } catch (Exception e) {
@@ -252,13 +255,13 @@ public class PutBigQuery extends AbstractBigQueryProcessor {
         }
     }
 
-    private int writeRecordsToStream(RecordReader reader, Descriptors.Descriptor descriptor, boolean skipInvalidRows) throws Exception {
+    private int writeRecordsToStream(RecordReader reader, Descriptors.Descriptor descriptor, boolean skipInvalidRows, TableSchema tableSchema) throws Exception {
         Record currentRecord;
         int offset = 0;
         int recordNum = 0;
         ProtoRows.Builder rowsBuilder = ProtoRows.newBuilder();
         while ((currentRecord = reader.nextRecord()) != null) {
-            DynamicMessage message = recordToProtoMessage(currentRecord, descriptor, skipInvalidRows);
+            DynamicMessage message = recordToProtoMessage(currentRecord, descriptor, skipInvalidRows, tableSchema);
 
             if (message == null) {
                 continue;
@@ -280,11 +283,11 @@ public class PutBigQuery extends AbstractBigQueryProcessor {
         return recordNum;
     }
 
-    private DynamicMessage recordToProtoMessage(Record record, Descriptors.Descriptor descriptor, boolean skipInvalidRows) {
+    private DynamicMessage recordToProtoMessage(Record record, Descriptors.Descriptor descriptor, boolean skipInvalidRows, TableSchema tableSchema) {
         Map<String, Object> valueMap = convertMapRecord(record.toMap());
         DynamicMessage message = null;
         try {
-            message = ProtoUtils.createMessage(descriptor, valueMap);
+            message = ProtoUtils.createMessage(descriptor, valueMap, tableSchema);
         } catch (RuntimeException e) {
             getLogger().error("Cannot convert record to message", e);
             if (!skipInvalidRows) {
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/proto/ProtoUtils.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/proto/ProtoUtils.java
index e275933a12..823e0e11d0 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/proto/ProtoUtils.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/proto/ProtoUtils.java
@@ -17,50 +17,98 @@
 
 package org.apache.nifi.processors.gcp.bigquery.proto;
 
+import com.google.cloud.bigquery.storage.v1.BigDecimalByteStringEncoder;
+import com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type;
+import com.google.cloud.bigquery.storage.v1.TableSchema;
 import com.google.protobuf.Descriptors;
 import com.google.protobuf.DynamicMessage;
+
+import java.math.BigDecimal;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Map;
 
 /**
- * Util class for protocol buffer messaging
- */
+* Util class for protocol buffer messaging
+*/
 public class ProtoUtils {
 
-    public static DynamicMessage createMessage(Descriptors.Descriptor descriptor, Map<String, Object> valueMap) {
-        DynamicMessage.Builder builder = DynamicMessage.newBuilder(descriptor);
-
-        for (Descriptors.FieldDescriptor field : descriptor.getFields()) {
-            String name = field.getName();
-            Object value = valueMap.get(name);
-            if (value == null) {
-                continue;
-            }
-
-            if (Descriptors.FieldDescriptor.Type.MESSAGE.equals(field.getType())) {
-                if (field.isRepeated()) {
-                    Collection collection = value.getClass().isArray() ? Arrays.asList((Object[]) value) : (Collection) value;
-                    collection.forEach(act -> builder.addRepeatedField(field, createMessage(field.getMessageType(), (Map<String, Object>) act)));
-                } else {
-                    builder.setField(field, createMessage(field.getMessageType(), (Map<String, Object>) value));
-                }
-            } else {
-                // Integer in the bigquery table schema maps back to INT64 which is considered to be Long on Java side:
-                // https://developers.google.com/protocol-buffers/docs/proto3
-                if (value instanceof Integer && (field.getType() == Descriptors.FieldDescriptor.Type.INT64)) {
-                    value = Long.valueOf((Integer) value);
-                }
-
-                if (field.isRepeated()) {
-                    Collection collection = value.getClass().isArray() ? Arrays.asList((Object[]) value) : (Collection) value;
-                    collection.forEach(act -> builder.addRepeatedField(field, act));
-                } else {
-                    builder.setField(field, value);
-                }
-            }
-        }
-
-        return builder.build();
-    }
-}
+   public static DynamicMessage createMessage(Descriptors.Descriptor descriptor, Map<String, Object> valueMap, TableSchema tableSchema) {
+       final DynamicMessage.Builder builder = DynamicMessage.newBuilder(descriptor);
+
+       for (final Descriptors.FieldDescriptor field : descriptor.getFields()) {
+           final String name = field.getName();
+           Object value = valueMap.get(name);
+           if (value == null) {
+               continue;
+           }
+
+           switch (field.getType()) {
+           case MESSAGE:
+               if (field.isRepeated()) {
+                   Collection collection = value.getClass().isArray() ? Arrays.asList((Object[]) value) : (Collection) value;
+                   collection.forEach(act -> builder.addRepeatedField(field, createMessage(field.getMessageType(), (Map<String, Object>) act, tableSchema)));
+               } else {
+                   builder.setField(field, createMessage(field.getMessageType(), (Map<String, Object>) value, tableSchema));
+               }
+               break;
+
+           // INT64 with alias INT, SMALLINT, INTEGER, BIGINT, TINYINT, BYTEINT
+           case INT64:
+               // Integer in the bigquery table schema maps back to INT64 which is considered to be Long on Java side:
+               // https://developers.google.com/protocol-buffers/docs/proto3
+               if (value instanceof Integer) {
+                   value = Long.valueOf((Integer) value);
+               }
+
+               setField(value, field, builder);
+               break;
+
+           // FLOAT64
+           case DOUBLE:
+               if (value instanceof Float) {
+                   value = Double.valueOf(value.toString());
+               }
+               setField(value, field, builder);
+               break;
+
+           // matches NUMERIC and BIGNUMERIC types in BigQuery
+           // BQTableSchemaToProtoDescriptor.class
+           case BYTES:
+               if (value instanceof Integer) {
+                   value = new BigDecimal((int) value);
+               } else if (value instanceof Long) {
+                   value = new BigDecimal((long) value);
+               } else if (value instanceof Float || value instanceof Double) {
+                   value = new BigDecimal(value.toString());
+               }
+
+               if (value instanceof BigDecimal) {
+                   if (tableSchema.getFields(field.getIndex()).getType().equals(Type.BIGNUMERIC)) {
+                       value = BigDecimalByteStringEncoder.encodeToBigNumericByteString((BigDecimal) value);
+                   } else if (tableSchema.getFields(field.getIndex()).getType().equals(Type.NUMERIC)) {
+                       value = BigDecimalByteStringEncoder.encodeToNumericByteString((BigDecimal) value);
+                   }
+               }
+
+               setField(value, field, builder);
+               break;
+
+           default:
+               setField(value, field, builder);
+               break;
+           }
+       }
+
+       return builder.build();
+   }
+
+   private static void setField(final Object value, final Descriptors.FieldDescriptor field, final DynamicMessage.Builder builder) {
+       if (field.isRepeated()) {
+           Collection collection = value.getClass().isArray() ? Arrays.asList((Object[]) value) : (Collection) value;
+           collection.forEach(act -> builder.addRepeatedField(field, act));
+       } else {
+           builder.setField(field, value);
+       }
+   }
+}
\ No newline at end of file
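
[Editor's note] The BYTES branch added above is the heart of the NUMERIC fix: numeric
record values are normalized to BigDecimal and then encoded with
BigDecimalByteStringEncoder before being set on the proto message. Below is a minimal,
standalone sketch of that encoding path. It assumes only the
google-cloud-bigquerystorage library is on the classpath; the class name and sample
value are illustrative and not part of the patch.

    import com.google.cloud.bigquery.storage.v1.BigDecimalByteStringEncoder;
    import com.google.protobuf.ByteString;

    import java.math.BigDecimal;

    public class NumericEncodingSketch {
        public static void main(String[] args) {
            Object value = 42L; // e.g. a value read from an Avro record (illustrative)

            // Normalize primitive numeric types to BigDecimal, as the BYTES branch does
            final BigDecimal decimal;
            if (value instanceof Integer) {
                decimal = new BigDecimal((int) value);
            } else if (value instanceof Long) {
                decimal = new BigDecimal((long) value);
            } else {
                decimal = new BigDecimal(value.toString());
            }

            // NUMERIC and BIGNUMERIC columns use different fixed-precision wire encodings
            ByteString numeric = BigDecimalByteStringEncoder.encodeToNumericByteString(decimal);
            ByteString bigNumeric = BigDecimalByteStringEncoder.encodeToBigNumericByteString(decimal);
            System.out.println(numeric.size() + " byte(s) / " + bigNumeric.size() + " byte(s)");
        }
    }
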
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryIT.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryIT.java
index c25cd80b6e..2f31f40c2a 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryIT.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryIT.java
@@ -21,6 +21,7 @@ import com.google.cloud.bigquery.Field;
 import com.google.cloud.bigquery.FieldValueList;
 import com.google.cloud.bigquery.LegacySQLTypeName;
 import com.google.cloud.bigquery.Schema;
+import com.google.cloud.bigquery.StandardSQLTypeName;
 import com.google.cloud.bigquery.StandardTableDefinition;
 import com.google.cloud.bigquery.TableDefinition;
 import com.google.cloud.bigquery.TableId;
@@ -29,6 +30,8 @@ import com.google.cloud.bigquery.TableResult;
 import java.time.Instant;
 import java.time.ZoneId;
 import java.time.ZonedDateTime;
+
+import org.apache.nifi.avro.AvroReader;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.json.JsonTreeReader;
 import org.apache.nifi.processors.gcp.AbstractGCPProcessor;
@@ -227,6 +230,129 @@ public class PutBigQueryIT extends AbstractBigQueryIT {
         runner.getFlowFilesForRelationship(PutBigQuery.REL_SUCCESS).get(0).assertAttributeEquals(BigQueryAttributes.JOB_NB_RECORDS_ATTR, Integer.toString(recordCount));
     }
 
+    @Test
+    public void testAvroDecimalType() throws InitializationException, IOException {
+        String tableName = UUID.randomUUID().toString();
+        TableId tableId = TableId.of(dataset.getDatasetId().getDataset(), tableName);
+        Field avrodecimal = Field.newBuilder("avrodecimal", StandardSQLTypeName.BIGNUMERIC).setMode(Field.Mode.NULLABLE).build();
+
+        // Table schema definition
+        schema = Schema.of(avrodecimal);
+        TableDefinition tableDefinition = StandardTableDefinition.of(schema);
+        TableInfo tableInfo = TableInfo.newBuilder(tableId, tableDefinition).build();
+
+        // create table
+        bigquery.create(tableInfo);
+
+        runner.setProperty(BigQueryAttributes.DATASET_ATTR, dataset.getDatasetId().getDataset());
+        runner.setProperty(BigQueryAttributes.TABLE_NAME_ATTR, tableName);
+        runner.setProperty(PutBigQuery.TRANSFER_TYPE, BATCH_TYPE);
+
+        AvroReader reader = new AvroReader();
+        runner.addControllerService("reader", reader);
+
+        final String recordSchema = new String(Files.readAllBytes(Paths.get("src/test/resources/bigquery/avrodecimal.avsc")));
+        runner.setProperty(reader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
+        runner.setProperty(reader, SchemaAccessUtils.SCHEMA_TEXT, recordSchema);
+
+        runner.enableControllerService(reader);
+        runner.setProperty(BigQueryAttributes.RECORD_READER_ATTR, "reader");
+
+        runner.enqueue(Paths.get("src/test/resources/bigquery/avrodecimal.avro"));
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PutBigQuery.REL_SUCCESS, 1);
+
+        TableResult result = bigquery.listTableData(dataset.getDatasetId().getDataset(), tableName, schema);
+        Iterator<FieldValueList> iterator = result.getValues().iterator();
+        FieldValueList firstElt = iterator.next();
+        assertEquals(firstElt.get(0).getNumericValue().intValue(), 0);
+
+        deleteTable(tableName);
+    }
+
+    @Test
+    public void testAvroFloatType() throws InitializationException, IOException {
+        String tableName = UUID.randomUUID().toString();
+        TableId tableId = TableId.of(dataset.getDatasetId().getDataset(), tableName);
+        Field avrofloat = Field.newBuilder("avrofloat", StandardSQLTypeName.FLOAT64).setMode(Field.Mode.NULLABLE).build();
+
+        // Table schema definition
+        schema = Schema.of(avrofloat);
+        TableDefinition tableDefinition = StandardTableDefinition.of(schema);
+        TableInfo tableInfo = TableInfo.newBuilder(tableId, tableDefinition).build();
+
+        // create table
+        bigquery.create(tableInfo);
+
+        runner.setProperty(BigQueryAttributes.DATASET_ATTR, dataset.getDatasetId().getDataset());
+        runner.setProperty(BigQueryAttributes.TABLE_NAME_ATTR, tableName);
+        runner.setProperty(PutBigQuery.TRANSFER_TYPE, BATCH_TYPE);
+
+        AvroReader reader = new AvroReader();
+        runner.addControllerService("reader", reader);
+
+        final String recordSchema = new String(Files.readAllBytes(Paths.get("src/test/resources/bigquery/avrofloat.avsc")));
+        runner.setProperty(reader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
+        runner.setProperty(reader, SchemaAccessUtils.SCHEMA_TEXT, recordSchema);
+
+        runner.enableControllerService(reader);
+        runner.setProperty(BigQueryAttributes.RECORD_READER_ATTR, "reader");
+
+        runner.enqueue(Paths.get("src/test/resources/bigquery/avrofloat.avro"));
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PutBigQuery.REL_SUCCESS, 1);
+
+        TableResult result = bigquery.listTableData(dataset.getDatasetId().getDataset(), tableName, schema);
+        Iterator<FieldValueList> iterator = result.getValues().iterator();
+        FieldValueList firstElt = iterator.next();
+        assertEquals(firstElt.get(0).getDoubleValue(), 1.0);
+
+        deleteTable(tableName);
+    }
+
+    @Test
+    public void testAvroIntType() throws InitializationException, IOException {
+        String tableName = UUID.randomUUID().toString();
+        TableId tableId = TableId.of(dataset.getDatasetId().getDataset(), tableName);
+        Field avrofloat = Field.newBuilder("avroint", StandardSQLTypeName.INT64).setMode(Field.Mode.NULLABLE).build();
+
+        // Table schema definition
+        schema = Schema.of(avrofloat);
+        TableDefinition tableDefinition = StandardTableDefinition.of(schema);
+        TableInfo tableInfo = TableInfo.newBuilder(tableId, tableDefinition).build();
+
+        // create table
+        bigquery.create(tableInfo);
+
+        runner.setProperty(BigQueryAttributes.DATASET_ATTR, dataset.getDatasetId().getDataset());
+        runner.setProperty(BigQueryAttributes.TABLE_NAME_ATTR, tableName);
+        runner.setProperty(PutBigQuery.TRANSFER_TYPE, BATCH_TYPE);
+
+        AvroReader reader = new AvroReader();
+        runner.addControllerService("reader", reader);
+
+        final String recordSchema = new String(Files.readAllBytes(Paths.get("src/test/resources/bigquery/avroint.avsc")));
+        runner.setProperty(reader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
+        runner.setProperty(reader, SchemaAccessUtils.SCHEMA_TEXT, recordSchema);
+
+        runner.enableControllerService(reader);
+        runner.setProperty(BigQueryAttributes.RECORD_READER_ATTR, "reader");
+
+        runner.enqueue(Paths.get("src/test/resources/bigquery/avroint.avro"));
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PutBigQuery.REL_SUCCESS, 1);
+
+        TableResult result = bigquery.listTableData(dataset.getDatasetId().getDataset(), tableName, schema);
+        Iterator<FieldValueList> iterator = result.getValues().iterator();
+        FieldValueList firstElt = iterator.next();
+        assertEquals(firstElt.get(0).getDoubleValue(), 1.0);
+
+        deleteTable(tableName);
+    }
+
     private String prepareTable(AllowableValue transferType) {
         String tableName = UUID.randomUUID().toString();
 
@@ -284,8 +410,12 @@ public class PutBigQueryIT extends AbstractBigQueryIT {
         Field full = Field.newBuilder("full", LegacySQLTypeName.TIMESTAMP).setMode(Field.Mode.NULLABLE).build();
         Field birth = Field.newBuilder("birth", LegacySQLTypeName.RECORD, date, time, full).setMode(Field.Mode.NULLABLE).build();
 
+        Field numeric = Field.newBuilder("numeric", StandardSQLTypeName.NUMERIC).setMode(Field.Mode.NULLABLE).build();
+        Field floatc = Field.newBuilder("floatc", StandardSQLTypeName.FLOAT64).setMode(Field.Mode.NULLABLE).build();
+        Field json = Field.newBuilder("json", StandardSQLTypeName.JSON).setMode(Field.Mode.NULLABLE).build();
+
         // Table schema definition
-        schema = Schema.of(id, name, alias, addresses, job, birth);
+        schema = Schema.of(id, name, alias, addresses, job, birth, numeric, floatc, json);
         TableDefinition tableDefinition = StandardTableDefinition.of(schema);
         TableInfo tableInfo = TableInfo.newBuilder(tableId, tableDefinition).build();
 
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/avrodecimal.avro b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/avrodecimal.avro
new file mode 100644
index 0000000000..f4cb65349b
Binary files /dev/null and b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/avrodecimal.avro differ
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/avrodecimal.avsc b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/avrodecimal.avsc
new file mode 100644
index 0000000000..f8d9356462
--- /dev/null
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/avrodecimal.avsc
@@ -0,0 +1,19 @@
+{
+  "type": "record",
+  "name": "nifiRecord",
+  "namespace": "org.apache.nifi",
+  "fields": [
+    {
+      "name": "avrodecimal",
+      "type": [
+        {
+          "type": "bytes",
+          "logicalType": "decimal",
+          "precision": 10,
+          "scale": 0
+        },
+        "null"
+      ]
+    }
+  ]
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/avrofloat.avro b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/avrofloat.avro
new file mode 100644
index 0000000000..64cca75166
Binary files /dev/null and b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/avrofloat.avro differ
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/avrofloat.avsc b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/avrofloat.avsc
new file mode 100644
index 0000000000..a946de0213
--- /dev/null
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/avrofloat.avsc
@@ -0,0 +1,14 @@
+{
+  "type": "record",
+  "name": "nifiRecord",
+  "namespace": "org.apache.nifi",
+  "fields": [
+    {
+      "name": "avrofloat",
+      "type": [
+        "float",
+        "null"
+      ]
+    }
+  ]
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/avroint.avro b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/avroint.avro
new file mode 100644
index 0000000000..8835708590
Binary files /dev/null and b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/avroint.avro differ
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/avroint.avsc b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/avroint.avsc
new file mode 100644
index 0000000000..bc9addeb10
--- /dev/null
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/avroint.avsc
@@ -0,0 +1,14 @@
+{
+  "type": "record",
+  "name": "nifiRecord",
+  "namespace": "org.apache.nifi",
+  "fields": [
+    {
+      "name": "avroint",
+      "type": [
+        "int",
+        "null"
+      ]
+    }
+  ]
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/schema-correct-data-with-date.avsc b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/schema-correct-data-with-date.avsc
index f27f5edfba..8ea7012508 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/schema-correct-data-with-date.avsc
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/schema-correct-data-with-date.avsc
@@ -87,6 +87,18 @@
                     }
                 ]
             }
+        },
+        {
+            "name": "numeric",
+            "type": ["null", "long"]
+        },
+        {
+            "name": "floatc",
+            "type": ["null", "double"]
+        },
+        {
+            "name": "json",
+            "type": ["null", "string"]
         }
     ]
 }
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/streaming-correct-data-with-date-formatted.json b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/streaming-correct-data-with-date-formatted.json
index 3fa425a8e1..b6ff43e4e9 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/streaming-correct-data-with-date-formatted.json
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/streaming-correct-data-with-date-formatted.json
@@ -21,7 +21,10 @@
       "date": "07/18/2021",
       "time": "12:35:24",
       "full": "07-18-2021 12:35:24 UTC"
-    }
+    },
+    "numeric": 0,
+    "floatc": 0.1,
+    "json": "{\"key\":\"value\"}"
   },
   {
     "id": 2,
@@ -43,4 +46,4 @@
       "full": "01-01-1992 00:00:00 UTC"
     }
   }
-]
\ No newline at end of file
+]
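
[Editor's note] For context, the avrodecimal.avro fixture above pairs with the decimal
logical type declared in avrodecimal.avsc. Below is a minimal sketch of how such a
fixture could be generated with the Apache Avro library; the committed binary is
authoritative, the resource paths are the patch's, and everything else (class name,
output location, sample value) is illustrative.

    import org.apache.avro.Conversions;
    import org.apache.avro.Schema;
    import org.apache.avro.file.DataFileWriter;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericDatumWriter;
    import org.apache.avro.generic.GenericRecord;

    import java.io.File;
    import java.math.BigDecimal;
    import java.nio.ByteBuffer;

    public class DecimalFixtureSketch {
        public static void main(String[] args) throws Exception {
            Schema schema = new Schema.Parser().parse(new File("src/test/resources/bigquery/avrodecimal.avsc"));
            // The field is a union of [bytes(decimal 10,0), null]; take the decimal branch
            Schema decimalSchema = schema.getField("avrodecimal").schema().getTypes().get(0);

            // Encode a scale-0 BigDecimal into the two's-complement bytes Avro expects
            ByteBuffer bytes = new Conversions.DecimalConversion()
                    .toBytes(new BigDecimal("0"), decimalSchema, decimalSchema.getLogicalType());

            GenericRecord record = new GenericData.Record(schema);
            record.put("avrodecimal", bytes);

            try (DataFileWriter<GenericRecord> writer = new DataFileWriter<>(new GenericDatumWriter<>(schema))) {
                writer.create(schema, new File("avrodecimal.avro"));
                writer.append(record);
            }
        }
    }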
