This is an automated email from the ASF dual-hosted git repository. pvillard pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push: new f8d3fcd66c NIFI-10707 Added proxy support in PutBigQuery f8d3fcd66c is described below commit f8d3fcd66cd53668d339ccbcb6d2175defb63393 Author: Peter Turcsanyi <turcsa...@apache.org> AuthorDate: Mon Mar 11 16:45:17 2024 +0100 NIFI-10707 Added proxy support in PutBigQuery Bumped GCP client library version Added grpc-* jars in service api nar in order to avoid CNFE warning in io.grpc.LoadBalancerRegistry Dependency clean-up in GCP modules Signed-off-by: Pierre Villard <pierre.villard...@gmail.com> This closes #8491. --- .../nifi-gcp-bundle/nifi-gcp-nar/pom.xml | 196 +++++++++++++++++++++ .../nifi-gcp-parameter-providers/pom.xml | 24 --- .../nifi-gcp-bundle/nifi-gcp-processors/pom.xml | 32 ---- .../nifi/processors/gcp/bigquery/PutBigQuery.java | 41 ++++- .../processors/gcp/bigquery/PutBigQueryTest.java | 9 +- .../nifi-gcp-bundle/nifi-gcp-services-api/pom.xml | 29 ++- nifi-nar-bundles/nifi-gcp-bundle/pom.xml | 2 +- 7 files changed, 249 insertions(+), 84 deletions(-) diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-nar/pom.xml b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-nar/pom.xml index b4a71b996e..d4a17e5b2c 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-nar/pom.xml +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-nar/pom.xml @@ -36,11 +36,207 @@ <groupId>org.apache.nifi</groupId> <artifactId>nifi-gcp-processors</artifactId> <version>2.0.0-SNAPSHOT</version> + <exclusions> + <exclusion> + <groupId>org.codehaus.mojo</groupId> + <artifactId>animal-sniffer-annotations</artifactId> + </exclusion> + <exclusion> + <groupId>com.google.android</groupId> + <artifactId>annotations</artifactId> + </exclusion> + <exclusion> + <groupId>com.google.auto.value</groupId> + <artifactId>auto-value-annotations</artifactId> + </exclusion> + <exclusion> + <groupId>org.checkerframework</groupId> + <artifactId>checker-qual</artifactId> + </exclusion> + <exclusion> + <groupId>com.google.errorprone</groupId> + <artifactId>error_prone_annotations</artifactId> + </exclusion> + <exclusion> + <groupId>com.google.guava</groupId> + <artifactId>failureaccess</artifactId> + </exclusion> + <exclusion> + <groupId>com.google.auth</groupId> + <artifactId>google-auth-library-credentials</artifactId> + </exclusion> + <exclusion> + <groupId>com.google.auth</groupId> + <artifactId>google-auth-library-oauth2-http</artifactId> + </exclusion> + <exclusion> + <groupId>com.google.http-client</groupId> + <artifactId>google-http-client</artifactId> + </exclusion> + <exclusion> + <groupId>com.google.http-client</groupId> + <artifactId>google-http-client-gson</artifactId> + </exclusion> + <exclusion> + <groupId>io.grpc</groupId> + <artifactId>grpc-api</artifactId> + </exclusion> + <exclusion> + <groupId>io.grpc</groupId> + <artifactId>grpc-context</artifactId> + </exclusion> + <exclusion> + <groupId>io.grpc</groupId> + <artifactId>grpc-core</artifactId> + </exclusion> + <exclusion> + <groupId>io.grpc</groupId> + <artifactId>grpc-util</artifactId> + </exclusion> + <exclusion> + <groupId>com.google.code.gson</groupId> + <artifactId>gson</artifactId> + </exclusion> + <exclusion> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpclient</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpcore</artifactId> + </exclusion> + <exclusion> + <groupId>com.google.j2objc</groupId> + <artifactId>j2objc-annotations</artifactId> + </exclusion> + <exclusion> + <groupId>com.google.code.findbugs</groupId> + <artifactId>jsr305</artifactId> + </exclusion> + <exclusion> + <groupId>com.google.guava</groupId> + <artifactId>listenablefuture</artifactId> + </exclusion> + <exclusion> + <groupId>io.opencensus</groupId> + <artifactId>opencensus-api</artifactId> + </exclusion> + <exclusion> + <groupId>io.opencensus</groupId> + <artifactId>opencensus-contrib-http-util</artifactId> + </exclusion> + <exclusion> + <groupId>io.perfmark</groupId> + <artifactId>perfmark-api</artifactId> + </exclusion> + </exclusions> </dependency> <dependency> <groupId>org.apache.nifi</groupId> <artifactId>nifi-gcp-parameter-providers</artifactId> <version>2.0.0-SNAPSHOT</version> + <exclusions> + <exclusion> + <groupId>org.codehaus.mojo</groupId> + <artifactId>animal-sniffer-annotations</artifactId> + </exclusion> + <exclusion> + <groupId>com.google.android</groupId> + <artifactId>annotations</artifactId> + </exclusion> + <exclusion> + <groupId>com.google.auto.value</groupId> + <artifactId>auto-value-annotations</artifactId> + </exclusion> + <exclusion> + <groupId>org.checkerframework</groupId> + <artifactId>checker-qual</artifactId> + </exclusion> + <exclusion> + <groupId>com.google.errorprone</groupId> + <artifactId>error_prone_annotations</artifactId> + </exclusion> + <exclusion> + <groupId>com.google.guava</groupId> + <artifactId>failureaccess</artifactId> + </exclusion> + <exclusion> + <groupId>com.google.auth</groupId> + <artifactId>google-auth-library-credentials</artifactId> + </exclusion> + <exclusion> + <groupId>com.google.auth</groupId> + <artifactId>google-auth-library-oauth2-http</artifactId> + </exclusion> + <exclusion> + <groupId>com.google.http-client</groupId> + <artifactId>google-http-client</artifactId> + </exclusion> + <exclusion> + <groupId>com.google.http-client</groupId> + <artifactId>google-http-client-gson</artifactId> + </exclusion> + <exclusion> + <groupId>io.grpc</groupId> + <artifactId>grpc-api</artifactId> + </exclusion> + <exclusion> + <groupId>io.grpc</groupId> + <artifactId>grpc-context</artifactId> + </exclusion> + <exclusion> + <groupId>io.grpc</groupId> + <artifactId>grpc-core</artifactId> + </exclusion> + <exclusion> + <groupId>io.grpc</groupId> + <artifactId>grpc-util</artifactId> + </exclusion> + <exclusion> + <groupId>com.google.code.gson</groupId> + <artifactId>gson</artifactId> + </exclusion> + <exclusion> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpclient</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpcore</artifactId> + </exclusion> + <exclusion> + <groupId>com.google.j2objc</groupId> + <artifactId>j2objc-annotations</artifactId> + </exclusion> + <exclusion> + <groupId>com.google.code.findbugs</groupId> + <artifactId>jsr305</artifactId> + </exclusion> + <exclusion> + <groupId>com.google.guava</groupId> + <artifactId>listenablefuture</artifactId> + </exclusion> + <exclusion> + <groupId>io.opencensus</groupId> + <artifactId>opencensus-api</artifactId> + </exclusion> + <exclusion> + <groupId>io.opencensus</groupId> + <artifactId>opencensus-contrib-http-util</artifactId> + </exclusion> + <exclusion> + <groupId>io.perfmark</groupId> + <artifactId>perfmark-api</artifactId> + </exclusion> + </exclusions> </dependency> </dependencies> </project> diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-parameter-providers/pom.xml b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-parameter-providers/pom.xml index e53128d48d..d576e15cb6 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-parameter-providers/pom.xml +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-parameter-providers/pom.xml @@ -42,33 +42,9 @@ <version>2.0.0-SNAPSHOT</version> <scope>provided</scope> </dependency> - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>jcl-over-slf4j</artifactId> - </dependency> <dependency> <groupId>com.google.cloud</groupId> <artifactId>google-cloud-secretmanager</artifactId> - <exclusions> - <exclusion> - <groupId>commons-logging</groupId> - <artifactId>commons-logging</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>com.google.auth</groupId> - <artifactId>google-auth-library-oauth2-http</artifactId> - <exclusions> - <exclusion> - <groupId>com.google.code.findbugs</groupId> - <artifactId>jsr305</artifactId> - </exclusion> - <exclusion> - <groupId>commons-logging</groupId> - <artifactId>commons-logging</artifactId> - </exclusion> - </exclusions> </dependency> <dependency> <groupId>org.apache.nifi</groupId> diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml index d95d101cd1..24e2f86ec9 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml @@ -94,12 +94,6 @@ <dependency> <groupId>com.google.cloud</groupId> <artifactId>google-cloud-core</artifactId> - <exclusions> - <exclusion> - <groupId>com.google.code.findbugs</groupId> - <artifactId>jsr305</artifactId> - </exclusion> - </exclusions> </dependency> <dependency> <groupId>com.google.cloud</groupId> @@ -109,10 +103,6 @@ <groupId>com.google.cloud</groupId> <artifactId>google-cloud-bigquery</artifactId> <exclusions> - <exclusion> - <groupId>commons-logging</groupId> - <artifactId>commons-logging</artifactId> - </exclusion> <exclusion> <groupId>org.json</groupId> <artifactId>json</artifactId> @@ -123,10 +113,6 @@ <groupId>com.google.cloud</groupId> <artifactId>google-cloud-bigquerystorage</artifactId> <exclusions> - <exclusion> - <groupId>commons-logging</groupId> - <artifactId>commons-logging</artifactId> - </exclusion> <exclusion> <groupId>org.json</groupId> <artifactId>json</artifactId> @@ -136,22 +122,10 @@ <dependency> <groupId>com.google.cloud</groupId> <artifactId>google-cloud-pubsub</artifactId> - <exclusions> - <exclusion> - <groupId>commons-logging</groupId> - <artifactId>commons-logging</artifactId> - </exclusion> - </exclusions> </dependency> <dependency> <groupId>com.google.cloud</groupId> <artifactId>google-cloud-pubsublite</artifactId> - <exclusions> - <exclusion> - <groupId>commons-logging</groupId> - <artifactId>commons-logging</artifactId> - </exclusion> - </exclusions> </dependency> <dependency> <groupId>com.google.apis</groupId> @@ -195,12 +169,6 @@ <dependency> <groupId>com.google.cloud</groupId> <artifactId>google-cloud-vision</artifactId> - <exclusions> - <exclusion> - <groupId>commons-logging</groupId> - <artifactId>commons-logging</artifactId> - </exclusion> - </exclusions> </dependency> </dependencies> diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQuery.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQuery.java index a7c27e8a7e..8536f232c7 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQuery.java +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQuery.java @@ -21,6 +21,8 @@ import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutureCallback; import com.google.api.core.ApiFutures; import com.google.api.gax.core.FixedCredentialsProvider; +import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider; +import com.google.api.gax.rpc.TransportChannelProvider; import com.google.auth.oauth2.GoogleCredentials; import com.google.cloud.bigquery.storage.v1.AppendRowsResponse; import com.google.cloud.bigquery.storage.v1.BQTableSchemaToProtoDescriptor; @@ -42,6 +44,7 @@ import com.google.cloud.bigquery.storage.v1.WriteStream; import com.google.cloud.bigquery.storage.v1.stub.BigQueryWriteStubSettings; import com.google.protobuf.Descriptors; import com.google.protobuf.DynamicMessage; +import io.grpc.HttpConnectProxiedSocketAddress; import io.grpc.Status; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.TriggerSerially; @@ -59,7 +62,9 @@ import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.gcp.ProxyAwareTransportFactory; import org.apache.nifi.processors.gcp.bigquery.proto.ProtoUtils; +import org.apache.nifi.proxy.ProxyConfiguration; import org.apache.nifi.serialization.RecordReader; import org.apache.nifi.serialization.RecordReaderFactory; import org.apache.nifi.serialization.record.MapRecord; @@ -67,6 +72,8 @@ import org.apache.nifi.serialization.record.Record; import java.io.IOException; import java.io.InputStream; +import java.net.InetSocketAddress; +import java.net.Proxy; import java.sql.Date; import java.sql.Time; import java.sql.Timestamp; @@ -182,7 +189,8 @@ public class PutBigQuery extends AbstractBigQueryProcessor { TRANSFER_TYPE, APPEND_RECORD_COUNT, RETRY_COUNT, - SKIP_INVALID_ROWS + SKIP_INVALID_ROWS, + ProxyConfiguration.createProxyConfigPropertyDescriptor(false, ProxyAwareTransportFactory.PROXY_SPECS) ).collect(collectingAndThen(toList(), Collections::unmodifiableList)); @Override @@ -198,7 +206,7 @@ public class PutBigQuery extends AbstractBigQueryProcessor { maxRetryCount = context.getProperty(RETRY_COUNT).asInteger(); recordBatchCount = context.getProperty(APPEND_RECORD_COUNT).asInteger(); endpoint = context.getProperty(BIGQUERY_API_ENDPOINT).evaluateAttributeExpressions().getValue(); - writeClient = createWriteClient(getGoogleCredentials(context)); + writeClient = createWriteClient(getGoogleCredentials(context), ProxyConfiguration.getConfiguration(context)); } @OnUnscheduled @@ -225,7 +233,7 @@ public class PutBigQuery extends AbstractBigQueryProcessor { writeStream = createWriteStream(tableName); tableSchema = writeStream.getTableSchema(); protoDescriptor = BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor(tableSchema); - streamWriter = createStreamWriter(writeStream.getName(), protoDescriptor, getGoogleCredentials(context)); + streamWriter = createStreamWriter(writeStream.getName(), protoDescriptor, getGoogleCredentials(context), ProxyConfiguration.getConfiguration(context)); } catch (Descriptors.DescriptorValidationException | IOException e) { getLogger().error("Failed to create Big Query Stream Writer for writing", e); context.yield(); @@ -395,12 +403,13 @@ public class PutBigQuery extends AbstractBigQueryProcessor { return writeClient.createWriteStream(createWriteStreamRequest); } - protected BigQueryWriteClient createWriteClient(GoogleCredentials credentials) { + protected BigQueryWriteClient createWriteClient(GoogleCredentials credentials, ProxyConfiguration proxyConfiguration) { BigQueryWriteClient client; try { BigQueryWriteSettings.Builder builder = BigQueryWriteSettings.newBuilder(); builder.setCredentialsProvider(FixedCredentialsProvider.create(credentials)); builder.setEndpoint(endpoint); + builder.setTransportChannelProvider(createTransportChannelProvider(proxyConfiguration)); client = BigQueryWriteClient.create(builder.build()); } catch (Exception e) { @@ -410,13 +419,35 @@ public class PutBigQuery extends AbstractBigQueryProcessor { return client; } - protected StreamWriter createStreamWriter(String streamName, Descriptors.Descriptor descriptor, GoogleCredentials credentials) throws IOException { + protected StreamWriter createStreamWriter(String streamName, Descriptors.Descriptor descriptor, GoogleCredentials credentials, ProxyConfiguration proxyConfiguration) throws IOException { ProtoSchema protoSchema = ProtoSchemaConverter.convert(descriptor); StreamWriter.Builder builder = StreamWriter.newBuilder(streamName); builder.setWriterSchema(protoSchema); builder.setCredentialsProvider(FixedCredentialsProvider.create(credentials)); builder.setEndpoint(endpoint); + builder.setChannelProvider(createTransportChannelProvider(proxyConfiguration)); + + return builder.build(); + } + + private TransportChannelProvider createTransportChannelProvider(ProxyConfiguration proxyConfiguration) { + InstantiatingGrpcChannelProvider.Builder builder = InstantiatingGrpcChannelProvider.newBuilder(); + + if (proxyConfiguration != null) { + if (proxyConfiguration.getProxyType() == Proxy.Type.HTTP) { + builder.setChannelConfigurator(managedChannelBuilder -> managedChannelBuilder.proxyDetector( + targetServerAddress -> HttpConnectProxiedSocketAddress.newBuilder() + .setTargetAddress((InetSocketAddress) targetServerAddress) + .setProxyAddress(new InetSocketAddress(proxyConfiguration.getProxyServerHost(), proxyConfiguration.getProxyServerPort())) + .setUsername(proxyConfiguration.getProxyUserName()) + .setPassword(proxyConfiguration.getProxyUserPassword()) + .build() + )); + } else if (proxyConfiguration.getProxyType() == Proxy.Type.SOCKS) { + getLogger().warn("Proxy type SOCKS is not supported, the proxy configuration will be ignored"); + } + } return builder.build(); } diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryTest.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryTest.java index 10b238220d..3ff7edf68c 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryTest.java +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryTest.java @@ -47,6 +47,7 @@ import org.apache.nifi.gcp.credentials.service.GCPCredentialsService; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.Processor; import org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControllerService; +import org.apache.nifi.proxy.ProxyConfiguration; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.schema.access.SchemaAccessUtils; import org.apache.nifi.util.TestRunner; @@ -162,12 +163,12 @@ public class PutBigQueryTest { } @Override - protected StreamWriter createStreamWriter(String streamName, Descriptors.Descriptor descriptor, GoogleCredentials credentials) { + protected StreamWriter createStreamWriter(String streamName, Descriptors.Descriptor descriptor, GoogleCredentials credentials, ProxyConfiguration proxyConfiguration) { return streamWriter; } @Override - protected BigQueryWriteClient createWriteClient(GoogleCredentials credentials) { + protected BigQueryWriteClient createWriteClient(GoogleCredentials credentials, ProxyConfiguration proxyConfiguration) { return writeClient; } }; @@ -410,12 +411,12 @@ public class PutBigQueryTest { } @Override - protected StreamWriter createStreamWriter(String streamName, Descriptors.Descriptor descriptor, GoogleCredentials credentials) throws IOException { + protected StreamWriter createStreamWriter(String streamName, Descriptors.Descriptor descriptor, GoogleCredentials credentials, ProxyConfiguration proxyConfiguration) throws IOException { throw new IOException(); } @Override - protected BigQueryWriteClient createWriteClient(GoogleCredentials credentials) { + protected BigQueryWriteClient createWriteClient(GoogleCredentials credentials, ProxyConfiguration proxyConfiguration) { return writeClient; } }; diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-services-api/pom.xml b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-services-api/pom.xml index 3714d85228..354a54896f 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-services-api/pom.xml +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-services-api/pom.xml @@ -33,29 +33,22 @@ <dependency> <groupId>com.google.auth</groupId> <artifactId>google-auth-library-oauth2-http</artifactId> - <exclusions> - <exclusion> - <groupId>com.google.code.findbugs</groupId> - <artifactId>jsr305</artifactId> - </exclusion> - <exclusion> - <groupId>commons-logging</groupId> - <artifactId>commons-logging</artifactId> - </exclusion> - </exclusions> </dependency> <dependency> - <groupId>org.slf4j</groupId> - <artifactId>jcl-over-slf4j</artifactId> + <groupId>io.grpc</groupId> + <artifactId>grpc-api</artifactId> </dependency> <dependency> - <groupId>com.github.stephenc.findbugs</groupId> - <artifactId>findbugs-annotations</artifactId> - <version>1.3.9-1</version> + <groupId>io.grpc</groupId> + <artifactId>grpc-context</artifactId> </dependency> <dependency> - <groupId>com.fasterxml.jackson.core</groupId> - <artifactId>jackson-databind</artifactId> - </dependency> <!-- TODO: Remove this when the next version of google-auth-library-oauth2-http is released and brings this in--> + <groupId>io.grpc</groupId> + <artifactId>grpc-core</artifactId> + </dependency> + <dependency> + <groupId>io.grpc</groupId> + <artifactId>grpc-util</artifactId> + </dependency> </dependencies> </project> diff --git a/nifi-nar-bundles/nifi-gcp-bundle/pom.xml b/nifi-nar-bundles/nifi-gcp-bundle/pom.xml index 5ef800b8ff..64568d562d 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/pom.xml +++ b/nifi-nar-bundles/nifi-gcp-bundle/pom.xml @@ -27,7 +27,7 @@ <packaging>pom</packaging> <properties> - <google.libraries.version>26.25.0</google.libraries.version> + <google.libraries.version>26.34.0</google.libraries.version> </properties> <dependencyManagement>