This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 7485687d4b NIFI-11657 Removed Deprecated PutBigQueryBatch and PutBigQueryStreaming
7485687d4b is described below

commit 7485687d4bd71bf8637cffce7e86c4bc90857cf7
Author: exceptionfactory <exceptionfact...@apache.org>
AuthorDate: Tue Jun 6 14:44:27 2023 -0500

    NIFI-11657 Removed Deprecated PutBigQueryBatch and PutBigQueryStreaming
    
    Signed-off-by: Pierre Villard <pierre.villard...@gmail.com>
    
    This closes #7351.
---
 .../gcp/bigquery/BigQueryAttributes.java           |  98 ------
 .../processors/gcp/bigquery/BigQueryUtils.java     |  83 -----
 .../nifi/processors/gcp/bigquery/PutBigQuery.java  |   2 -
 .../processors/gcp/bigquery/PutBigQueryBatch.java  | 364 ---------------------
 .../gcp/bigquery/PutBigQueryStreaming.java         | 217 ------------
 .../services/org.apache.nifi.processor.Processor   |   2 -
 .../gcp/bigquery/PutBigQueryBatchTest.java         | 158 ---------
 7 files changed, 924 deletions(-)

diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryAttributes.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryAttributes.java
index babfb54ee6..f26f6cefe5 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryAttributes.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryAttributes.java
@@ -17,12 +17,6 @@
 
 package org.apache.nifi.processors.gcp.bigquery;
 
-import org.apache.nifi.components.AllowableValue;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.processors.gcp.credentials.factory.CredentialPropertyDescriptors;
-
-import com.google.cloud.bigquery.JobInfo;
-
 /**
  * Attributes associated with the BigQuery processors
  */
@@ -30,79 +24,19 @@ public class BigQueryAttributes {
     private BigQueryAttributes() {
     }
 
-    public static final PropertyDescriptor SERVICE_ACCOUNT_JSON_FILE = CredentialPropertyDescriptors.SERVICE_ACCOUNT_JSON_FILE;
-
     // Properties
-    public static final String SOURCE_TYPE_ATTR = "bq.load.type";
-    public static final String SOURCE_TYPE_DESC = "Data type of the file to be loaded. Possible values: AVRO, "
-            + "NEWLINE_DELIMITED_JSON, CSV.";
-
     public static final String IGNORE_UNKNOWN_ATTR = "bq.load.ignore_unknown";
     public static final String IGNORE_UNKNOWN_DESC = "Sets whether BigQuery should allow extra values that are not represented "
             + "in the table schema. If true, the extra values are ignored. If false, records with extra columns are treated as "
             + "bad records, and if there are too many bad records, an invalid error is returned in the job result. By default "
             + "unknown values are not allowed.";
 
-    public static final String WRITE_DISPOSITION_ATTR = "bq.load.write_disposition";
-    public static final String WRITE_DISPOSITION_DESC = "Sets the action that should occur if the destination table already exists.";
-
-    public static final String MAX_BADRECORDS_ATTR = "bq.load.max_badrecords";
-    public static final String MAX_BADRECORDS_DESC = "Sets the maximum number of bad records that BigQuery can ignore when running "
-            + "the job. If the number of bad records exceeds this value, an invalid error is returned in the job result. By default "
-            + "no bad record is ignored.";
-
     public static final String DATASET_ATTR = "bq.dataset";
     public static final String DATASET_DESC = "BigQuery dataset name (Note - The dataset must exist in GCP)";
 
     public static final String TABLE_NAME_ATTR = "bq.table.name";
     public static final String TABLE_NAME_DESC = "BigQuery table name";
 
-    public static final String TABLE_SCHEMA_ATTR = "bq.table.schema";
-    public static final String TABLE_SCHEMA_DESC = "BigQuery schema in JSON format";
-
-    public static final String CREATE_DISPOSITION_ATTR = "bq.load.create_disposition";
-    public static final String CREATE_DISPOSITION_DESC = "Sets whether the job is allowed to create new tables";
-
-    public static final String JOB_READ_TIMEOUT_ATTR = "bq.readtimeout";
-    public static final String JOB_READ_TIMEOUT_DESC = "Load Job Time Out";
-
-    public static final String CSV_ALLOW_JAGGED_ROWS_ATTR = "bq.csv.allow.jagged.rows";
-    public static final String CSV_ALLOW_JAGGED_ROWS_DESC = "Set whether BigQuery should accept rows that are missing "
-            + "trailing optional columns. If true, BigQuery treats missing trailing columns as null values. If false, "
-            + "records with missing trailing columns are treated as bad records, and if there are too many bad records, "
-            + "an invalid error is returned in the job result. By default, rows with missing trailing columns are "
-            + "considered bad records.";
-
-    public static final String CSV_ALLOW_QUOTED_NEW_LINES_ATTR = "bq.csv.allow.quoted.new.lines";
-    public static final String CSV_ALLOW_QUOTED_NEW_LINES_DESC = "Sets whether BigQuery should allow quoted data sections "
-            + "that contain newline characters in a CSV file. By default quoted newline are not allowed.";
-
-    public static final String CSV_CHARSET_ATTR = "bq.csv.charset";
-    public static final String CSV_CHARSET_DESC = "Sets the character encoding of the data.";
-
-    public static final String CSV_FIELD_DELIMITER_ATTR = "bq.csv.delimiter";
-    public static final String CSV_FIELD_DELIMITER_DESC = "Sets the separator for fields in a CSV file. BigQuery converts "
-            + "the string to ISO-8859-1 encoding, and then uses the first byte of the encoded string to split the data in its "
-            + "raw, binary state. BigQuery also supports the escape sequence \"\t\" to specify a tab separator. The default "
-            + "value is a comma (',').";
-
-    public static final String CSV_QUOTE_ATTR = "bq.csv.quote";
-    public static final String CSV_QUOTE_DESC = "Sets the value that is used to quote data sections in a CSV file. BigQuery "
-            + "converts the string to ISO-8859-1 encoding, and then uses the first byte of the encoded string to split the "
-            + "data in its raw, binary state. The default value is a double-quote ('\"'). If your data does not contain quoted "
-            + "sections, set the property value to an empty string. If your data contains quoted newline characters, you must "
-            + "also set the Allow Quoted New Lines property to true.";
-
-    public static final String CSV_SKIP_LEADING_ROWS_ATTR = "bq.csv.skip.leading.rows";
-    public static final String CSV_SKIP_LEADING_ROWS_DESC = "Sets the number of rows at the top of a CSV file that BigQuery "
-            + "will skip when reading the data. The default value is 0. This property is useful if you have header rows in the "
-            + "file that should be skipped.";
-
-    public static final String AVRO_USE_LOGICAL_TYPES_ATTR = "bq.avro.use.logical.types";
-    public static final String AVRO_USE_LOGICAL_TYPES_DESC = "If format is set to Avro and if this option is set to true, you "
-            + "can interpret logical types into their corresponding types (such as TIMESTAMP) instead of only using their raw "
-            + "types (such as INTEGER).";
-
     public static final String RECORD_READER_ATTR = "bq.record.reader";
     public static final String RECORD_READER_DESC = "Specifies the Controller Service to use for parsing incoming data.";
 
@@ -111,44 +45,12 @@ public class BigQueryAttributes {
             + "rows exist. If not set the entire insert request will fail if 
it contains an invalid row.";
 
     // Batch Attributes
-    public static final String JOB_CREATE_TIME_ATTR = "bq.job.stat.creation_time";
-    public static final String JOB_CREATE_TIME_DESC = "Time load job creation";
-
-    public static final String JOB_END_TIME_ATTR = "bq.job.stat.end_time";
-    public static final String JOB_END_TIME_DESC = "Time load job ended";
-
-    public static final String JOB_START_TIME_ATTR = "bq.job.stat.start_time";
-    public static final String JOB_START_TIME_DESC = "Time load job started";
-
-    public static final String JOB_LINK_ATTR = "bq.job.link";
-    public static final String JOB_LINK_DESC = "API Link to load job";
-
-    public static final String JOB_ID_ATTR = "bq.job.id";
-    public static final String JOB_ID_DESC = "ID of the BigQuery job";
-
     public static final String JOB_NB_RECORDS_ATTR = "bq.records.count";
     public static final String JOB_NB_RECORDS_DESC = "Number of records successfully inserted";
 
     public static final String JOB_ERROR_MSG_ATTR = "bq.error.message";
-    public static final String JOB_ERROR_MSG_DESC = "Load job error message";
 
     public static final String JOB_ERROR_REASON_ATTR = "bq.error.reason";
-    public static final String JOB_ERROR_REASON_DESC = "Load job error reason";
 
     public static final String JOB_ERROR_LOCATION_ATTR = "bq.error.location";
-    public static final String JOB_ERROR_LOCATION_DESC = "Load job error location";
-
-    // Allowable values
-    public static final AllowableValue CREATE_IF_NEEDED = new AllowableValue(JobInfo.CreateDisposition.CREATE_IF_NEEDED.name(),
-            JobInfo.CreateDisposition.CREATE_IF_NEEDED.name(), "Configures the job to create the table if it does not exist.");
-    public static final AllowableValue CREATE_NEVER = new AllowableValue(JobInfo.CreateDisposition.CREATE_NEVER.name(),
-            JobInfo.CreateDisposition.CREATE_NEVER.name(), "Configures the job to fail with a not-found error if the table does not exist.");
-
-    public static final AllowableValue WRITE_EMPTY = new AllowableValue(JobInfo.WriteDisposition.WRITE_EMPTY.name(),
-            JobInfo.WriteDisposition.WRITE_EMPTY.name(), "Configures the job to fail with a duplicate error if the table already exists.");
-    public static final AllowableValue WRITE_APPEND = new AllowableValue(JobInfo.WriteDisposition.WRITE_APPEND.name(),
-            JobInfo.WriteDisposition.WRITE_APPEND.name(), "Configures the job to append data to the table if it already exists.");
-    public static final AllowableValue WRITE_TRUNCATE = new AllowableValue(JobInfo.WriteDisposition.WRITE_TRUNCATE.name(),
-            JobInfo.WriteDisposition.WRITE_TRUNCATE.name(), "Configures the job to overwrite the table data if table already exists.");
-
 }
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryUtils.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryUtils.java
deleted file mode 100644
index 1b621e60db..0000000000
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryUtils.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.processors.gcp.bigquery;
-
-import java.lang.reflect.Type;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import com.google.cloud.bigquery.Field;
-import com.google.cloud.bigquery.LegacySQLTypeName;
-import com.google.cloud.bigquery.Schema;
-import com.google.gson.Gson;
-import com.google.gson.reflect.TypeToken;
-
-/**
- * Util class for schema manipulation
- */
-public class BigQueryUtils {
-
-    private final static Type gsonSchemaType = new TypeToken<List<Map>>() { }.getType();
-
-    public static Field mapToField(Map fMap) {
-        String typeStr = fMap.get("type").toString();
-        String nameStr = fMap.get("name").toString();
-        String modeStr = fMap.get("mode").toString();
-        LegacySQLTypeName type = null;
-
-        if (typeStr.equals("BOOLEAN")) {
-            type = LegacySQLTypeName.BOOLEAN;
-        } else if (typeStr.equals("STRING")) {
-            type = LegacySQLTypeName.STRING;
-        } else if (typeStr.equals("BYTES")) {
-            type = LegacySQLTypeName.BYTES;
-        } else if (typeStr.equals("INTEGER")) {
-            type = LegacySQLTypeName.INTEGER;
-        } else if (typeStr.equals("FLOAT")) {
-            type = LegacySQLTypeName.FLOAT;
-        } else if (typeStr.equals("TIMESTAMP") || typeStr.equals("DATE")
-                || typeStr.equals("TIME") || typeStr.equals("DATETIME")) {
-            type = LegacySQLTypeName.TIMESTAMP;
-        } else if (typeStr.equals("RECORD")) {
-            type = LegacySQLTypeName.RECORD;
-        }
-
-        return Field.newBuilder(nameStr, type).setMode(Field.Mode.valueOf(modeStr)).build();
-    }
-
-    public static List<Field> listToFields(List<Map> m_fields) {
-        List<Field> fields = new ArrayList(m_fields.size());
-        for (Map m : m_fields) {
-            fields.add(mapToField(m));
-        }
-
-        return fields;
-    }
-
-    public static Schema schemaFromString(String schemaStr) {
-        if (schemaStr == null) {
-            return null;
-        } else {
-            Gson gson = new Gson();
-            List<Map> fields = gson.fromJson(schemaStr, gsonSchemaType);
-            return Schema.of(BigQueryUtils.listToFields(fields));
-        }
-    }
-
-}
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQuery.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQuery.java
index 54105d9592..c013dd81f0 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQuery.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQuery.java
@@ -52,7 +52,6 @@ import org.apache.nifi.annotation.behavior.TriggerSerially;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
@@ -93,7 +92,6 @@ import java.util.concurrent.atomic.AtomicReference;
     "The processor is record based so the used schema is driven by the 
RecordReader. Attributes that are not matched to the target schema" +
     "are skipped. Exactly once delivery semantics are achieved via stream 
offsets. The Storage Write API is more efficient than the older " +
     "insertAll method because it uses gRPC streaming rather than REST over 
HTTP")
-@SeeAlso({PutBigQueryBatch.class, PutBigQueryStreaming.class})
 @InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
 @WritesAttributes({
     @WritesAttribute(attribute = BigQueryAttributes.JOB_NB_RECORDS_ATTR, description = BigQueryAttributes.JOB_NB_RECORDS_DESC)
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java
deleted file mode 100644
index 6b229fe914..0000000000
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java
+++ /dev/null
@@ -1,364 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.processors.gcp.bigquery;
-
-import com.google.cloud.RetryOption;
-import com.google.cloud.bigquery.FormatOptions;
-import com.google.cloud.bigquery.Job;
-import com.google.cloud.bigquery.JobInfo;
-import com.google.cloud.bigquery.JobStatistics.LoadStatistics;
-import com.google.cloud.bigquery.Schema;
-import com.google.cloud.bigquery.TableDataWriteChannel;
-import com.google.cloud.bigquery.TableId;
-import com.google.cloud.bigquery.WriteChannelConfiguration;
-import org.apache.nifi.annotation.behavior.InputRequirement;
-import org.apache.nifi.annotation.behavior.WritesAttribute;
-import org.apache.nifi.annotation.behavior.WritesAttributes;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.DeprecationNotice;
-import org.apache.nifi.annotation.documentation.SeeAlso;
-import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.ValidationContext;
-import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.components.Validator;
-import org.apache.nifi.expression.ExpressionLanguageScope;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.logging.LogLevel;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.processors.gcp.storage.DeleteGCSObject;
-import org.apache.nifi.processors.gcp.storage.PutGCSObject;
-import org.apache.nifi.util.StringUtils;
-import org.threeten.bp.Duration;
-import org.threeten.bp.temporal.ChronoUnit;
-
-import java.nio.ByteBuffer;
-import java.nio.channels.Channels;
-import java.nio.channels.ReadableByteChannel;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-/**
- * A processor for batch loading data into a Google BigQuery table
- * @deprecated use {@link PutBigQuery} instead which uses the Write API
- */
-@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
-@DeprecationNotice(alternatives = {PutBigQuery.class}, reason = "This processor is deprecated and may be removed in future releases.")
-@Tags({ "google", "google cloud", "bq", "bigquery" })
-@CapabilityDescription("Please be aware this processor is deprecated and may be removed in the near future. Use PutBigQuery instead. Batch loads flow files content to a Google BigQuery table.")
-@SeeAlso({ PutGCSObject.class, DeleteGCSObject.class })
-@WritesAttributes({
-        @WritesAttribute(attribute = BigQueryAttributes.JOB_CREATE_TIME_ATTR, description = BigQueryAttributes.JOB_CREATE_TIME_DESC),
-        @WritesAttribute(attribute = BigQueryAttributes.JOB_END_TIME_ATTR, description = BigQueryAttributes.JOB_END_TIME_DESC),
-        @WritesAttribute(attribute = BigQueryAttributes.JOB_START_TIME_ATTR, description = BigQueryAttributes.JOB_START_TIME_DESC),
-        @WritesAttribute(attribute = BigQueryAttributes.JOB_LINK_ATTR, description = BigQueryAttributes.JOB_LINK_DESC),
-        @WritesAttribute(attribute = BigQueryAttributes.JOB_ID_ATTR, description = BigQueryAttributes.JOB_ID_DESC),
-        @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_MSG_ATTR, description = BigQueryAttributes.JOB_ERROR_MSG_DESC),
-        @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_REASON_ATTR, description = BigQueryAttributes.JOB_ERROR_REASON_DESC),
-        @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_LOCATION_ATTR, description = BigQueryAttributes.JOB_ERROR_LOCATION_DESC),
-        @WritesAttribute(attribute = BigQueryAttributes.JOB_NB_RECORDS_ATTR, description = BigQueryAttributes.JOB_NB_RECORDS_DESC)
-})
-@Deprecated
-public class PutBigQueryBatch extends AbstractBigQueryProcessor {
-
-    private static final List<String> TYPES = Arrays.asList(FormatOptions.json().getType(), FormatOptions.csv().getType(), FormatOptions.avro().getType());
-
-    private static final Validator FORMAT_VALIDATOR = new Validator() {
-        @Override
-        public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
-            final ValidationResult.Builder builder = new ValidationResult.Builder();
-            builder.subject(subject).input(input);
-            if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) {
-                return builder.valid(true).explanation("Contains Expression Language").build();
-            }
-
-            if (TYPES.contains(input.toUpperCase())) {
-                builder.valid(true);
-            } else {
-                builder.valid(false).explanation("Load File Type must be one of the following options: " + StringUtils.join(TYPES, ", "));
-            }
-
-            return builder.build();
-        }
-    };
-
-    public static final PropertyDescriptor READ_TIMEOUT = new PropertyDescriptor.Builder()
-            .name(BigQueryAttributes.JOB_READ_TIMEOUT_ATTR)
-            .displayName("Read Timeout")
-            .description(BigQueryAttributes.JOB_READ_TIMEOUT_DESC)
-            .required(true)
-            .defaultValue("5 minutes")
-            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
-            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
-            .build();
-
-    public static final PropertyDescriptor TABLE_SCHEMA = new PropertyDescriptor.Builder()
-            .name(BigQueryAttributes.TABLE_SCHEMA_ATTR)
-            .displayName("Table Schema")
-            .description(BigQueryAttributes.TABLE_SCHEMA_DESC)
-            .required(false)
-            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .build();
-
-    public static final PropertyDescriptor SOURCE_TYPE = new PropertyDescriptor.Builder().name(BigQueryAttributes.SOURCE_TYPE_ATTR)
-            .displayName("Load file type")
-            .description(BigQueryAttributes.SOURCE_TYPE_DESC)
-            .required(true)
-            .addValidator(FORMAT_VALIDATOR)
-            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
-            .build();
-
-    public static final PropertyDescriptor CREATE_DISPOSITION = new PropertyDescriptor.Builder()
-            .name(BigQueryAttributes.CREATE_DISPOSITION_ATTR)
-            .displayName("Create Disposition")
-            .description(BigQueryAttributes.CREATE_DISPOSITION_DESC)
-            .required(true)
-            .allowableValues(BigQueryAttributes.CREATE_IF_NEEDED, BigQueryAttributes.CREATE_NEVER)
-            .defaultValue(BigQueryAttributes.CREATE_IF_NEEDED.getValue())
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .build();
-
-    public static final PropertyDescriptor WRITE_DISPOSITION = new PropertyDescriptor.Builder()
-            .name(BigQueryAttributes.WRITE_DISPOSITION_ATTR)
-            .displayName("Write Disposition")
-            .description(BigQueryAttributes.WRITE_DISPOSITION_DESC)
-            .required(true)
-            .allowableValues(BigQueryAttributes.WRITE_EMPTY, BigQueryAttributes.WRITE_APPEND, BigQueryAttributes.WRITE_TRUNCATE)
-            .defaultValue(BigQueryAttributes.WRITE_EMPTY.getValue())
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .build();
-
-    public static final PropertyDescriptor MAXBAD_RECORDS = new PropertyDescriptor.Builder()
-            .name(BigQueryAttributes.MAX_BADRECORDS_ATTR)
-            .displayName("Max Bad Records")
-            .description(BigQueryAttributes.MAX_BADRECORDS_DESC)
-            .required(true)
-            .defaultValue("0")
-            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
-            .build();
-
-    public static final PropertyDescriptor CSV_ALLOW_JAGGED_ROWS = new PropertyDescriptor.Builder()
-            .name(BigQueryAttributes.CSV_ALLOW_JAGGED_ROWS_ATTR)
-            .displayName("CSV Input - Allow Jagged Rows")
-            .description(BigQueryAttributes.CSV_ALLOW_JAGGED_ROWS_DESC)
-            .required(true)
-            .allowableValues("true", "false")
-            .defaultValue("false")
-            .build();
-
-    public static final PropertyDescriptor CSV_ALLOW_QUOTED_NEW_LINES = new PropertyDescriptor.Builder()
-            .name(BigQueryAttributes.CSV_ALLOW_QUOTED_NEW_LINES_ATTR)
-            .displayName("CSV Input - Allow Quoted New Lines")
-            .description(BigQueryAttributes.CSV_ALLOW_QUOTED_NEW_LINES_DESC)
-            .required(true)
-            .allowableValues("true", "false")
-            .defaultValue("false")
-            .build();
-
-    public static final PropertyDescriptor CSV_CHARSET = new PropertyDescriptor.Builder()
-            .name(BigQueryAttributes.CSV_CHARSET_ATTR)
-            .displayName("CSV Input - Character Set")
-            .description(BigQueryAttributes.CSV_CHARSET_DESC)
-            .required(true)
-            .allowableValues("UTF-8", "ISO-8859-1")
-            .defaultValue("UTF-8")
-            .build();
-
-    public static final PropertyDescriptor CSV_FIELD_DELIMITER = new PropertyDescriptor.Builder()
-            .name(BigQueryAttributes.CSV_FIELD_DELIMITER_ATTR)
-            .displayName("CSV Input - Field Delimiter")
-            .description(BigQueryAttributes.CSV_FIELD_DELIMITER_DESC)
-            .required(true)
-            .defaultValue(",")
-            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
-            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
-            .build();
-
-    public static final PropertyDescriptor CSV_QUOTE = new PropertyDescriptor.Builder()
-            .name(BigQueryAttributes.CSV_QUOTE_ATTR)
-            .displayName("CSV Input - Quote")
-            .description(BigQueryAttributes.CSV_QUOTE_DESC)
-            .required(true)
-            .defaultValue("\"")
-            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
-            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
-            .build();
-
-    public static final PropertyDescriptor CSV_SKIP_LEADING_ROWS = new PropertyDescriptor.Builder()
-            .name(BigQueryAttributes.CSV_SKIP_LEADING_ROWS_ATTR)
-            .displayName("CSV Input - Skip Leading Rows")
-            .description(BigQueryAttributes.CSV_SKIP_LEADING_ROWS_DESC)
-            .required(true)
-            .defaultValue("0")
-            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
-            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
-            .build();
-
-    public static final PropertyDescriptor AVRO_USE_LOGICAL_TYPES = new PropertyDescriptor.Builder()
-            .name(BigQueryAttributes.AVRO_USE_LOGICAL_TYPES_ATTR)
-            .displayName("Avro Input - Use Logical Types")
-            .description(BigQueryAttributes.AVRO_USE_LOGICAL_TYPES_DESC)
-            .required(true)
-            .allowableValues("true", "false")
-            .defaultValue("false")
-            .build();
-
-    @Override
-    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        final List<PropertyDescriptor> descriptors = new ArrayList<>(super.getSupportedPropertyDescriptors());
-        descriptors.add(TABLE_SCHEMA);
-        descriptors.add(READ_TIMEOUT);
-        descriptors.add(SOURCE_TYPE);
-        descriptors.add(CREATE_DISPOSITION);
-        descriptors.add(WRITE_DISPOSITION);
-        descriptors.add(MAXBAD_RECORDS);
-        descriptors.add(CSV_ALLOW_JAGGED_ROWS);
-        descriptors.add(CSV_ALLOW_QUOTED_NEW_LINES);
-        descriptors.add(CSV_CHARSET);
-        descriptors.add(CSV_FIELD_DELIMITER);
-        descriptors.add(CSV_QUOTE);
-        descriptors.add(CSV_SKIP_LEADING_ROWS);
-        descriptors.add(AVRO_USE_LOGICAL_TYPES);
-        return Collections.unmodifiableList(descriptors);
-    }
-
-    @Override
-    @OnScheduled
-    public void onScheduled(ProcessContext context) {
-        super.onScheduled(context);
-    }
-
-    @Override
-    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
-        FlowFile flowFile = session.get();
-        if (flowFile == null) {
-            return;
-        }
-
-        final String type = context.getProperty(SOURCE_TYPE).evaluateAttributeExpressions(flowFile).getValue();
-        final TableId tableId = getTableId(context, flowFile.getAttributes());
-
-        try {
-
-            FormatOptions formatOption;
-
-            if (type.equals(FormatOptions.csv().getType())) {
-                formatOption = FormatOptions.csv().toBuilder()
-                        .setAllowJaggedRows(context.getProperty(CSV_ALLOW_JAGGED_ROWS).asBoolean())
-                        .setAllowQuotedNewLines(context.getProperty(CSV_ALLOW_QUOTED_NEW_LINES).asBoolean())
-                        .setEncoding(context.getProperty(CSV_CHARSET).getValue())
-                        .setFieldDelimiter(context.getProperty(CSV_FIELD_DELIMITER).evaluateAttributeExpressions(flowFile).getValue())
-                        .setQuote(context.getProperty(CSV_QUOTE).evaluateAttributeExpressions(flowFile).getValue())
-                        .setSkipLeadingRows(context.getProperty(CSV_SKIP_LEADING_ROWS).evaluateAttributeExpressions(flowFile).asInteger())
-                        .build();
-            } else {
-                formatOption = FormatOptions.of(type);
-            }
-
-            final Schema schema = BigQueryUtils.schemaFromString(context.getProperty(TABLE_SCHEMA).evaluateAttributeExpressions(flowFile).getValue());
-            final WriteChannelConfiguration writeChannelConfiguration = WriteChannelConfiguration.newBuilder(tableId)
-                    .setCreateDisposition(JobInfo.CreateDisposition.valueOf(context.getProperty(CREATE_DISPOSITION).getValue()))
-                    .setWriteDisposition(JobInfo.WriteDisposition.valueOf(context.getProperty(WRITE_DISPOSITION).getValue()))
-                    .setIgnoreUnknownValues(context.getProperty(IGNORE_UNKNOWN).evaluateAttributeExpressions(flowFile).asBoolean())
-                    .setUseAvroLogicalTypes(context.getProperty(AVRO_USE_LOGICAL_TYPES).asBoolean())
-                    .setMaxBadRecords(context.getProperty(MAXBAD_RECORDS).asInteger())
-                    .setSchema(schema)
-                    .setFormatOptions(formatOption)
-                    .build();
-
-            try (TableDataWriteChannel writer = getCloudService().writer(writeChannelConfiguration)) {
-
-                session.read(flowFile, rawIn -> {
-                    ReadableByteChannel readableByteChannel = Channels.newChannel(rawIn);
-                    ByteBuffer byteBuffer = ByteBuffer.allocateDirect(BUFFER_SIZE);
-                    while (readableByteChannel.read(byteBuffer) >= 0) {
-                        byteBuffer.flip();
-                        writer.write(byteBuffer);
-                        byteBuffer.clear();
-                    }
-                });
-
-                // writer must be closed to get the job
-                writer.close();
-
-                Job job = writer.getJob();
-                Long timePeriod = context.getProperty(READ_TIMEOUT).evaluateAttributeExpressions(flowFile).asTimePeriod(TimeUnit.SECONDS);
-                Duration waitFor = Duration.of(timePeriod, ChronoUnit.SECONDS);
-                job = job.waitFor(RetryOption.totalTimeout(waitFor));
-
-                if (job != null) {
-                    final Map<String, String> attributes = new HashMap<>();
-
-                    attributes.put(BigQueryAttributes.JOB_CREATE_TIME_ATTR, Long.toString(job.getStatistics().getCreationTime()));
-                    attributes.put(BigQueryAttributes.JOB_END_TIME_ATTR, Long.toString(job.getStatistics().getEndTime()));
-                    attributes.put(BigQueryAttributes.JOB_START_TIME_ATTR, Long.toString(job.getStatistics().getStartTime()));
-                    attributes.put(BigQueryAttributes.JOB_LINK_ATTR, job.getSelfLink());
-                    attributes.put(BigQueryAttributes.JOB_ID_ATTR, job.getJobId().getJob());
-
-                    boolean jobError = (job.getStatus().getError() != null);
-
-                    if (jobError) {
-                        attributes.put(BigQueryAttributes.JOB_ERROR_MSG_ATTR, job.getStatus().getError().getMessage());
-                        attributes.put(BigQueryAttributes.JOB_ERROR_REASON_ATTR, job.getStatus().getError().getReason());
-                        attributes.put(BigQueryAttributes.JOB_ERROR_LOCATION_ATTR, job.getStatus().getError().getLocation());
-                    } else {
-                        // in case it got looped back from error
-                        flowFile = session.removeAttribute(flowFile, BigQueryAttributes.JOB_ERROR_MSG_ATTR);
-                        flowFile = session.removeAttribute(flowFile, BigQueryAttributes.JOB_ERROR_REASON_ATTR);
-                        flowFile = session.removeAttribute(flowFile, BigQueryAttributes.JOB_ERROR_LOCATION_ATTR);
-
-                        // add the number of records successfully added
-                        if (job.getStatistics() instanceof LoadStatistics) {
-                            final LoadStatistics stats = (LoadStatistics) job.getStatistics();
-                            attributes.put(BigQueryAttributes.JOB_NB_RECORDS_ATTR, Long.toString(stats.getOutputRows()));
-                        }
-                    }
-
-                    if (!attributes.isEmpty()) {
-                        flowFile = session.putAllAttributes(flowFile, attributes);
-                    }
-
-                    if (jobError) {
-                        getLogger().log(LogLevel.WARN, job.getStatus().getError().getMessage());
-                        flowFile = session.penalize(flowFile);
-                        session.transfer(flowFile, REL_FAILURE);
-                    } else {
-                        session.getProvenanceReporter().send(flowFile, job.getSelfLink(), job.getStatistics().getEndTime() - job.getStatistics().getStartTime());
-                        session.transfer(flowFile, REL_SUCCESS);
-                    }
-                }
-            }
-
-        } catch (Exception ex) {
-            getLogger().log(LogLevel.ERROR, ex.getMessage(), ex);
-            flowFile = session.penalize(flowFile);
-            session.transfer(flowFile, REL_FAILURE);
-        }
-    }
-
-}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryStreaming.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryStreaming.java
deleted file mode 100644
index 7e00cc2359..0000000000
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryStreaming.java
+++ /dev/null
@@ -1,217 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.processors.gcp.bigquery;
-
-import com.google.cloud.bigquery.BigQueryError;
-import com.google.cloud.bigquery.InsertAllRequest;
-import com.google.cloud.bigquery.InsertAllResponse;
-import com.google.cloud.bigquery.TableId;
-import org.apache.nifi.annotation.behavior.InputRequirement;
-import org.apache.nifi.annotation.behavior.SystemResource;
-import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
-import org.apache.nifi.annotation.behavior.WritesAttribute;
-import org.apache.nifi.annotation.behavior.WritesAttributes;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.DeprecationNotice;
-import org.apache.nifi.annotation.documentation.SeeAlso;
-import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.expression.ExpressionLanguageScope;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.logging.LogLevel;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.serialization.RecordReader;
-import org.apache.nifi.serialization.RecordReaderFactory;
-import org.apache.nifi.serialization.record.MapRecord;
-import org.apache.nifi.serialization.record.Record;
-
-import java.io.InputStream;
-import java.sql.Date;
-import java.sql.Time;
-import java.sql.Timestamp;
-import java.time.Instant;
-import java.time.LocalDateTime;
-import java.time.ZoneOffset;
-import java.time.format.DateTimeFormatter;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * A processor for streaming loading data into a Google BigQuery table. It uses the BigQuery
- * streaming insert API to insert data. This provides the lowest-latency insert path into BigQuery,
- * and therefore is the default method when the input is unbounded. BigQuery will make a strong
- * effort to ensure no duplicates when using this path, however there are some scenarios in which
- * BigQuery is unable to make this guarantee (see
- * https://cloud.google.com/bigquery/streaming-data-into-bigquery). A query can be run over the
- * output table to periodically clean these rare duplicates. Alternatively, using the Batch insert
- * method does guarantee no duplicates, though the latency for the insert into BigQuery will be much
- * higher.
- *
- * @deprecated use {@link PutBigQuery} instead which uses the Write API
- */
-@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
-@DeprecationNotice(alternatives = {PutBigQuery.class}, reason = "This processor is deprecated and may be removed in future releases.")
-@Tags({ "google", "google cloud", "bq", "gcp", "bigquery", "record" })
-@CapabilityDescription("Please be aware this processor is deprecated and may be removed in the near future. Use PutBigQuery instead. "
-        + "Load data into Google BigQuery table using the streaming API. This processor "
-        + "is not intended to load large flow files as it will load the full content into memory. If "
-        + "you need to insert large flow files, consider using PutBigQueryBatch instead.")
-@SeeAlso({ PutBigQueryBatch.class })
-@SystemResourceConsideration(resource = SystemResource.MEMORY)
-@WritesAttributes({
-        @WritesAttribute(attribute = BigQueryAttributes.JOB_NB_RECORDS_ATTR, description = BigQueryAttributes.JOB_NB_RECORDS_DESC)
-})
-@Deprecated
-public class PutBigQueryStreaming extends AbstractBigQueryProcessor {
-
-    private static final DateTimeFormatter timestampFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS");
-    private static final DateTimeFormatter timeFormatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSSSSS");
-
-    public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
-            .name(BigQueryAttributes.RECORD_READER_ATTR)
-            .displayName("Record Reader")
-            .description(BigQueryAttributes.RECORD_READER_DESC)
-            .identifiesControllerService(RecordReaderFactory.class)
-            .required(true)
-            .build();
-
-    public static final PropertyDescriptor SKIP_INVALID_ROWS = new PropertyDescriptor.Builder()
-            .name(BigQueryAttributes.SKIP_INVALID_ROWS_ATTR)
-            .displayName("Skip Invalid Rows")
-            .description(BigQueryAttributes.SKIP_INVALID_ROWS_DESC)
-            .required(true)
-            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
-            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
-            .defaultValue("false")
-            .build();
-
-    @Override
-    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        final List<PropertyDescriptor> descriptors = new ArrayList<>(super.getSupportedPropertyDescriptors());
-        descriptors.add(RECORD_READER);
-        descriptors.add(SKIP_INVALID_ROWS);
-        return Collections.unmodifiableList(descriptors);
-    }
-
-    @Override
-    @OnScheduled
-    public void onScheduled(ProcessContext context) {
-        super.onScheduled(context);
-    }
-
-    @Override
-    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
-        FlowFile flowFile = session.get();
-        if (flowFile == null) {
-            return;
-        }
-        final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
-        final TableId tableId = getTableId(context, flowFile.getAttributes());
-
-        try {
-
-            InsertAllRequest.Builder request = InsertAllRequest.newBuilder(tableId);
-            int nbrecord = 0;
-
-            try (final InputStream in = session.read(flowFile)) {
-                final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
-                try (final RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger())) {
-                    Record currentRecord;
-                    while ((currentRecord = reader.nextRecord()) != null) {
-                        request.addRow(convertMapRecord(currentRecord.toMap()));
-                        nbrecord++;
-                    }
-                }
-            }
-
-            request.setIgnoreUnknownValues(context.getProperty(IGNORE_UNKNOWN).evaluateAttributeExpressions(flowFile).asBoolean());
-            request.setSkipInvalidRows(context.getProperty(SKIP_INVALID_ROWS).evaluateAttributeExpressions(flowFile).asBoolean());
-
-            InsertAllResponse response = getCloudService().insertAll(request.build());
-
-            final Map<String, String> attributes = new HashMap<>();
-
-            if (response.hasErrors()) {
-                getLogger().log(LogLevel.WARN, "Failed to insert {} of {} records into BigQuery {} table.", new Object[] { response.getInsertErrors().size(), nbrecord, tableName });
-                if (getLogger().isDebugEnabled()) {
-                    for (long index : response.getInsertErrors().keySet()) {
-                        for (BigQueryError e : response.getInsertErrors().get(index)) {
-                            getLogger().log(LogLevel.DEBUG, "Failed to insert record #{}: {}", new Object[] { index, e.getMessage() });
-                        }
-                    }
-                }
-
-                attributes.put(BigQueryAttributes.JOB_NB_RECORDS_ATTR, Long.toString(nbrecord - response.getInsertErrors().size()));
-
-                flowFile = session.penalize(flowFile);
-                flowFile = session.putAllAttributes(flowFile, attributes);
-                session.transfer(flowFile, REL_FAILURE);
-            } else {
-                attributes.put(BigQueryAttributes.JOB_NB_RECORDS_ATTR, Long.toString(nbrecord));
-                flowFile = session.putAllAttributes(flowFile, attributes);
-                session.transfer(flowFile, REL_SUCCESS);
-            }
-
-        } catch (Exception ex) {
-            getLogger().log(LogLevel.ERROR, ex.getMessage(), ex);
-            flowFile = session.penalize(flowFile);
-            session.transfer(flowFile, REL_FAILURE);
-        }
-    }
-
-    private Map<String, Object> convertMapRecord(Map<String, Object> map) {
-        Map<String, Object> result = new HashMap<String, Object>();
-        for (String key : map.keySet()) {
-            Object obj = map.get(key);
-            if (obj instanceof MapRecord) {
-                result.put(key, convertMapRecord(((MapRecord) obj).toMap()));
-            } else if (obj instanceof Object[]
-                && ((Object[]) obj).length > 0
-                && ((Object[]) obj)[0] instanceof MapRecord) {
-                List<Map<String, Object>> lmapr = new ArrayList<Map<String, Object>>();
-                for (Object mapr : ((Object[]) obj)) {
-                    lmapr.add(convertMapRecord(((MapRecord) mapr).toMap()));
-                }
-                result.put(key, lmapr);
-            } else if (obj instanceof Timestamp) {
-                // ZoneOffset.UTC time zone is necessary due to implicit time zone conversion in Record Readers from
-                // the local system time zone to the GMT time zone
-                LocalDateTime dateTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(((Timestamp) obj).getTime()), ZoneOffset.UTC);
-                result.put(key, dateTime.format(timestampFormatter));
-            } else if (obj instanceof Time) {
-                // ZoneOffset.UTC time zone is necessary due to implicit time zone conversion in Record Readers from
-                // the local system time zone to the GMT time zone
-                LocalDateTime dateTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(((Time) obj).getTime()), ZoneOffset.UTC);
-                result.put(key, dateTime.format(timeFormatter));
-            } else if (obj instanceof Date) {
-                result.put(key, obj.toString());
-            } else {
-                result.put(key, obj);
-            }
-        }
-        return result;
-    }
-
-}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 1fe8dd6535..fbcb0d907a 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -21,8 +21,6 @@ org.apache.nifi.processors.gcp.pubsub.ConsumeGCPubSub
 org.apache.nifi.processors.gcp.pubsub.lite.PublishGCPubSubLite
 org.apache.nifi.processors.gcp.pubsub.lite.ConsumeGCPubSubLite
 org.apache.nifi.processors.gcp.bigquery.PutBigQuery
-org.apache.nifi.processors.gcp.bigquery.PutBigQueryBatch
-org.apache.nifi.processors.gcp.bigquery.PutBigQueryStreaming
 org.apache.nifi.processors.gcp.drive.ListGoogleDrive
 org.apache.nifi.processors.gcp.drive.FetchGoogleDrive
 org.apache.nifi.processors.gcp.drive.PutGoogleDrive
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatchTest.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatchTest.java
deleted file mode 100644
index 4ad8ae3eb6..0000000000
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatchTest.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.gcp.bigquery;
-
-import com.google.cloud.RetryOption;
-import com.google.cloud.bigquery.BigQuery;
-import com.google.cloud.bigquery.BigQueryException;
-import com.google.cloud.bigquery.FormatOptions;
-import com.google.cloud.bigquery.Job;
-import com.google.cloud.bigquery.JobId;
-import com.google.cloud.bigquery.JobInfo;
-import com.google.cloud.bigquery.JobStatistics;
-import com.google.cloud.bigquery.JobStatus;
-import com.google.cloud.bigquery.TableDataWriteChannel;
-import com.google.cloud.bigquery.WriteChannelConfiguration;
-import org.apache.nifi.components.ConfigVerificationResult;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.VerifiableProcessor;
-import org.apache.nifi.util.TestRunner;
-import org.junit.jupiter.api.Test;
-import org.mockito.ArgumentMatchers;
-import org.mockito.Mock;
-
-import java.util.Collections;
-import java.util.List;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.when;
-
-/**
- * Unit tests for {@link PutBigQueryBatch}.
- */
-public class PutBigQueryBatchTest extends AbstractBQTest {
-    private static final String TABLE_NAME = "test_table";
-    private static final String TABLE_SCHEMA = "[{ \"mode\": \"NULLABLE\", \"name\": \"data\", \"type\": \"STRING\" }]";
-    private static final String SOURCE_TYPE = FormatOptions.json().getType();
-    private static final String CREATE_DISPOSITION = JobInfo.CreateDisposition.CREATE_IF_NEEDED.name();
-    private static final String WRITE_DISPOSITION = JobInfo.WriteDisposition.WRITE_EMPTY.name();
-    private static final String MAX_BAD_RECORDS = "0";
-    private static final String IGNORE_UNKNOWN = "true";
-    private static final String READ_TIMEOUT = "5 minutes";
-
-    @Mock
-    Job job;
-
-    @Mock
-    JobId jobId;
-
-    @Mock
-    JobStatus jobStatus;
-
-    @Mock
-    JobStatistics stats;
-
-    @Mock
-    TableDataWriteChannel tableDataWriteChannel;
-
-    @Override
-    public AbstractBigQueryProcessor getProcessor() {
-        return new PutBigQueryBatch() {
-            @Override
-            protected BigQuery getCloudService() {
-                return bq;
-            }
-
-            @Override
-            protected BigQuery getCloudService(final ProcessContext context) {
-                return bq;
-            }
-        };
-    }
-
-    @Override
-    protected void addRequiredPropertiesToRunner(TestRunner runner) {
-        runner.setProperty(PutBigQueryBatch.DATASET, DATASET);
-        runner.setProperty(PutBigQueryBatch.TABLE_NAME, TABLE_NAME);
-        runner.setProperty(PutBigQueryBatch.TABLE_SCHEMA, TABLE_SCHEMA);
-        runner.setProperty(PutBigQueryBatch.SOURCE_TYPE, SOURCE_TYPE);
-        runner.setProperty(PutBigQueryBatch.CREATE_DISPOSITION, CREATE_DISPOSITION);
-        runner.setProperty(PutBigQueryBatch.WRITE_DISPOSITION, WRITE_DISPOSITION);
-        runner.setProperty(PutBigQueryBatch.MAXBAD_RECORDS, MAX_BAD_RECORDS);
-        runner.setProperty(PutBigQueryBatch.IGNORE_UNKNOWN, IGNORE_UNKNOWN);
-        runner.setProperty(PutBigQueryBatch.READ_TIMEOUT, READ_TIMEOUT);
-    }
-
-    @Test
-    public void testSuccessfulLoad() throws Exception {
-        when(bq.writer(ArgumentMatchers.isA(WriteChannelConfiguration.class))).thenReturn(tableDataWriteChannel);
-        when(tableDataWriteChannel.getJob()).thenReturn(job);
-        when(job.waitFor(ArgumentMatchers.isA(RetryOption.class))).thenReturn(job);
-        when(job.getStatus()).thenReturn(jobStatus);
-        when(job.getStatistics()).thenReturn(stats);
-
-        when(stats.getCreationTime()).thenReturn(0L);
-        when(stats.getStartTime()).thenReturn(1L);
-        when(stats.getEndTime()).thenReturn(2L);
-        when(job.getJobId()).thenReturn(jobId);
-        when(jobId.getJob()).thenReturn("job-id");
-
-        final AbstractBigQueryProcessor processor = getProcessor();
-        final TestRunner runner = buildNewRunner(processor);
-        addRequiredPropertiesToRunner(runner);
-        runner.assertValid();
-
-        runner.enqueue("{ \"data\": \"datavalue\" }");
-
-        runner.run();
-
-        when(bq.testIamPermissions(any(), any())).thenReturn(Collections.singletonList("permission"));
-        final List<ConfigVerificationResult> verificationResults = ((VerifiableProcessor) processor).verify(runner.getProcessContext(), runner.getLogger(), Collections.emptyMap());
-        assertEquals(2, verificationResults.size());
-        assertEquals(ConfigVerificationResult.Outcome.SUCCESSFUL, verificationResults.get(1).getOutcome());
-
-        runner.assertAllFlowFilesTransferred(PutBigQueryBatch.REL_SUCCESS);
-    }
-
-    @Test
-    public void testFailedLoad() throws Exception {
-        when(bq.writer(ArgumentMatchers.isA(WriteChannelConfiguration.class))).thenReturn(tableDataWriteChannel);
-        when(tableDataWriteChannel.getJob()).thenReturn(job);
-        when(job.waitFor(ArgumentMatchers.isA(RetryOption.class))).thenThrow(BigQueryException.class);
-
-        final TestRunner runner = buildNewRunner(getProcessor());
-        addRequiredPropertiesToRunner(runner);
-        runner.assertValid();
-
-        runner.enqueue("{ \"data\": \"datavalue\" }");
-
-        runner.run();
-
-        runner.assertAllFlowFilesTransferred(PutBigQueryBatch.REL_FAILURE);
-    }
-
-    @Test
-    public void testMandatoryProjectId() throws Exception {
-        final TestRunner runner = buildNewRunner(getProcessor());
-        addRequiredPropertiesToRunner(runner);
-        runner.assertValid();
-
-        runner.removeProperty(PutBigQueryBatch.PROJECT_ID);
-        runner.assertNotValid();
-    }
-}
\ No newline at end of file
