This is an automated email from the ASF dual-hosted git repository. joewitt pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push: new 72b45f3114 NIFI-13202 Removed Accumulo Processors and Services This closes #8794. 72b45f3114 is described below commit 72b45f311488a40e518e97a039dd7ecc5378a3f3 Author: exceptionfactory <exceptionfact...@apache.org> AuthorDate: Thu May 9 15:45:16 2024 -0500 NIFI-13202 Removed Accumulo Processors and Services This closes #8794. Signed-off-by: Joseph Witt <joew...@apache.org> --- nifi-assembly/pom.xml | 30 - nifi-code-coverage/pom.xml | 10 - nifi-docs/src/main/asciidoc/developer-guide.adoc | 1 - .../nifi-accumulo-bundle/README.md | 23 - .../nifi-accumulo-bundle/nifi-accumulo-nar/pom.xml | 41 -- .../nifi-accumulo-processors/pom.xml | 66 --- .../accumulo/data/AccumuloRecordConfiguration.java | 159 ----- .../org/apache/nifi/accumulo/data/KeySchema.java | 136 ----- .../accumulo/processors/BaseAccumuloProcessor.java | 86 --- .../accumulo/processors/PutAccumuloRecord.java | 658 --------------------- .../nifi/accumulo/processors/ScanAccumulo.java | 390 ------------ .../services/org.apache.nifi.processor.Processor | 16 - .../nifi-accumulo-services-api-nar/pom.xml | 40 -- .../nifi-accumulo-services-api/pom.xml | 35 -- .../controllerservices/BaseAccumuloService.java | 33 -- .../nifi-accumulo-services-nar/pom.xml | 41 -- .../nifi-accumulo-services/pom.xml | 70 --- .../controllerservices/AccumuloService.java | 324 ---------- .../org.apache.nifi.controller.ControllerService | 15 - .../controllerservices/TestAccumuloService.java | 214 ------- .../nifi-accumulo-bundle/pom.xml | 85 --- nifi-extension-bundles/pom.xml | 1 - 22 files changed, 2474 deletions(-) diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml index 6501ad0e66..e06bf20559 100644 --- a/nifi-assembly/pom.xml +++ b/nifi-assembly/pom.xml @@ -1094,36 +1094,6 @@ language governing permissions and limitations under the License. --> </dependency> </dependencies> </profile> - <profile> - <id>include-accumulo</id> - <!-- This profile handles the inclusion of nifi-accumulo artifacts. --> - <activation> - <activeByDefault>false</activeByDefault> - <property> - <name>allProfiles</name> - </property> - </activation> - <dependencies> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-accumulo-nar</artifactId> - <version>2.0.0-SNAPSHOT</version> - <type>nar</type> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-accumulo-services-api-nar</artifactId> - <version>2.0.0-SNAPSHOT</version> - <type>nar</type> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-accumulo-services-nar</artifactId> - <version>2.0.0-SNAPSHOT</version> - <type>nar</type> - </dependency> - </dependencies> - </profile> <profile> <id>targz</id> <activation> diff --git a/nifi-code-coverage/pom.xml b/nifi-code-coverage/pom.xml index c8e3f5a23e..50e5767338 100644 --- a/nifi-code-coverage/pom.xml +++ b/nifi-code-coverage/pom.xml @@ -701,16 +701,6 @@ </dependency> <!-- NAR Bundles --> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-accumulo-processors</artifactId> - <version>2.0.0-SNAPSHOT</version> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-accumulo-services</artifactId> - <version>2.0.0-SNAPSHOT</version> - </dependency> <dependency> <groupId>org.apache.nifi</groupId> <artifactId>nifi-airtable-processors</artifactId> diff --git a/nifi-docs/src/main/asciidoc/developer-guide.adoc b/nifi-docs/src/main/asciidoc/developer-guide.adoc index 5f6cc76a27..7dd0ddb961 100644 --- a/nifi-docs/src/main/asciidoc/developer-guide.adoc +++ b/nifi-docs/src/main/asciidoc/developer-guide.adoc @@ -2698,7 +2698,6 @@ deprecationLogger.warn( [options="header,footer"] |================================================================================================================================================== | Package | Maven Profile | Description -| Apache Accumulo Bundle | include-accumulo | Adds support for https://accumulo.apache.org[Apache Accumulo]. | Apache Hadoop Bundle | include-hadoop | Adds support for Apache Hadoop with HDFS and Parquet components | Apache HBase Bundle | include-hbase | Adds support for Apache HBase | Apache IoTDB Bundle | include-iotdb | Adds support for Apache IoTDB diff --git a/nifi-extension-bundles/nifi-accumulo-bundle/README.md b/nifi-extension-bundles/nifi-accumulo-bundle/README.md deleted file mode 100644 index e431586c24..0000000000 --- a/nifi-extension-bundles/nifi-accumulo-bundle/README.md +++ /dev/null @@ -1,23 +0,0 @@ -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> -# nifi-accumulo - -This is a basic NiFi->Accumulo integration. Running `mvn install` will create your NAR, which can be added -to Apache NiFi. This is intended to be created with Apache Accumulo 2.x. - -The resulting NAR will be named 'nifi-accumulo-nar' - - -Note that some of this code was modeled after the HBase work. diff --git a/nifi-extension-bundles/nifi-accumulo-bundle/nifi-accumulo-nar/pom.xml b/nifi-extension-bundles/nifi-accumulo-bundle/nifi-accumulo-nar/pom.xml deleted file mode 100644 index 000091b414..0000000000 --- a/nifi-extension-bundles/nifi-accumulo-bundle/nifi-accumulo-nar/pom.xml +++ /dev/null @@ -1,41 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-accumulo-bundle</artifactId> - <version>2.0.0-SNAPSHOT</version> - </parent> - - <artifactId>nifi-accumulo-nar</artifactId> - <packaging>nar</packaging> - - <dependencies> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-accumulo-processors</artifactId> - <version>2.0.0-SNAPSHOT</version> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-accumulo-services-api-nar</artifactId> - <version>2.0.0-SNAPSHOT</version> - <type>nar</type> - </dependency> - </dependencies> -</project> \ No newline at end of file diff --git a/nifi-extension-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/pom.xml b/nifi-extension-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/pom.xml deleted file mode 100644 index b1e3d370da..0000000000 --- a/nifi-extension-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/pom.xml +++ /dev/null @@ -1,66 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-accumulo-bundle</artifactId> - <version>2.0.0-SNAPSHOT</version> - </parent> - - <artifactId>nifi-accumulo-processors</artifactId> - <packaging>jar</packaging> - - <dependencies> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-accumulo-services-api</artifactId> - <version>2.0.0-SNAPSHOT</version> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.accumulo</groupId> - <artifactId>accumulo-core</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-client-api</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.commons</groupId> - <artifactId>commons-lang3</artifactId> - </dependency> - - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-record</artifactId> - <scope>provided</scope> - </dependency> - - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-record-serialization-service-api</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-record-path</artifactId> - <version>2.0.0-SNAPSHOT</version> - </dependency> - </dependencies> -</project> diff --git a/nifi-extension-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/data/AccumuloRecordConfiguration.java b/nifi-extension-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/data/AccumuloRecordConfiguration.java deleted file mode 100644 index 164f9d5284..0000000000 --- a/nifi-extension-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/data/AccumuloRecordConfiguration.java +++ /dev/null @@ -1,159 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.accumulo.data; - -/** - * Encapsulates configuring the session with some required parameters. - * - * Justification: Generally not a fan of this fluent API to configure other objects, but there is a lot encapsulated here - * so it helps minimize what we pass between the current set of classes and the upcoming features. - */ -public class AccumuloRecordConfiguration { - private String tableName; - private String rowFieldName; - private String columnFamily; - private String columnFamilyField; - private String timestampField; - private String fieldDelimiter; - private boolean encodeFieldDelimiter; - private boolean qualifierInKey; - private boolean deleteKeys; - - - protected AccumuloRecordConfiguration(final String tableName, final String rowFieldName, final String columnFamily, - final String columnFamilyField, - final String timestampField, final String fieldDelimiter, - final boolean encodeFieldDelimiter, - final boolean qualifierInKey, final boolean deleteKeys) { - this.tableName = tableName; - this.rowFieldName = rowFieldName; - this.columnFamily = columnFamily; - this.columnFamilyField = columnFamilyField; - this.timestampField = timestampField; - this.fieldDelimiter = fieldDelimiter; - this.encodeFieldDelimiter = encodeFieldDelimiter; - this.qualifierInKey = qualifierInKey; - this.deleteKeys = deleteKeys; - } - - public String getTableName(){ - return tableName; - } - - public String getColumnFamily() { - return columnFamily; - } - - public String getColumnFamilyField() { - return columnFamilyField; - } - - public boolean getEncodeDelimiter(){ - return encodeFieldDelimiter; - } - - public String getTimestampField(){ - - return timestampField; - } - - public String getFieldDelimiter(){ - return fieldDelimiter; - } - - public boolean getQualifierInKey(){ - return qualifierInKey; - } - - public boolean isDeleteKeys(){ - return deleteKeys; - } - - - public String getRowField(){ - return rowFieldName; - } - - public static class Builder{ - - public static final Builder newBuilder(){ - return new Builder(); - } - - public Builder setRowField(final String rowFieldName){ - this.rowFieldName = rowFieldName; - return this; - } - - public Builder setTableName(final String tableName){ - this.tableName = tableName; - return this; - } - - public Builder setEncodeFieldDelimiter(final boolean encodeFieldDelimiter){ - this.encodeFieldDelimiter = encodeFieldDelimiter; - return this; - } - - - public Builder setColumnFamily(final String columnFamily){ - this.columnFamily = columnFamily; - return this; - } - - public Builder setColumnFamilyField(final String columnFamilyField){ - this.columnFamilyField = columnFamilyField; - return this; - } - - public Builder setTimestampField(final String timestampField){ - this.timestampField = timestampField; - return this; - } - - public Builder setQualifierInKey(final boolean qualifierInKey){ - this.qualifierInKey = qualifierInKey; - return this; - } - - public Builder setFieldDelimiter(final String fieldDelimiter){ - this.fieldDelimiter = fieldDelimiter; - return this; - } - - public Builder setDelete(final boolean deleteKeys){ - this.deleteKeys = deleteKeys; - return this; - } - - public AccumuloRecordConfiguration build(){ - return new AccumuloRecordConfiguration(tableName,rowFieldName,columnFamily,columnFamilyField,timestampField,fieldDelimiter,encodeFieldDelimiter,qualifierInKey,deleteKeys); - } - - - private String tableName; - private String rowFieldName; - private String columnFamily; - private String columnFamilyField; - private String fieldDelimiter; - private boolean qualifierInKey=false; - private boolean encodeFieldDelimiter=false; - private String timestampField; - private boolean deleteKeys=false; - } -} diff --git a/nifi-extension-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/data/KeySchema.java b/nifi-extension-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/data/KeySchema.java deleted file mode 100644 index adb7da8d4f..0000000000 --- a/nifi-extension-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/data/KeySchema.java +++ /dev/null @@ -1,136 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.accumulo.data; - - -import java.util.ArrayList; -import java.util.List; -import java.util.Optional; -import java.util.stream.Collectors; -import org.apache.commons.lang3.NotImplementedException; -import org.apache.nifi.serialization.record.DataType; -import org.apache.nifi.serialization.record.RecordField; -import org.apache.nifi.serialization.record.RecordFieldRemovalPath; -import org.apache.nifi.serialization.record.RecordFieldType; -import org.apache.nifi.serialization.record.RecordSchema; -import org.apache.nifi.serialization.record.SchemaIdentifier; - -public class KeySchema implements RecordSchema { - private static final List<RecordField> KEY_FIELDS = new ArrayList<>(); - - private static final List<DataType> DATA_TYPES = new ArrayList<>(); - - private static final List<String> FIELD_NAMES = new ArrayList<>(); - - static { - KEY_FIELDS.add(new RecordField("row", RecordFieldType.STRING.getDataType(),false)); - KEY_FIELDS.add(new RecordField("columnFamily",RecordFieldType.STRING.getDataType(),true)); - KEY_FIELDS.add(new RecordField("columnQualifier",RecordFieldType.STRING.getDataType(),true)); - KEY_FIELDS.add(new RecordField("columnVisibility",RecordFieldType.STRING.getDataType(),true)); - KEY_FIELDS.add(new RecordField("timestamp",RecordFieldType.LONG.getDataType(),true)); - DATA_TYPES.add(RecordFieldType.STRING.getDataType()); - DATA_TYPES.add(RecordFieldType.LONG.getDataType()); - FIELD_NAMES.addAll(KEY_FIELDS.stream().map( x-> x.getFieldName()).collect(Collectors.toList())); - } - @Override - public List<RecordField> getFields() { - return KEY_FIELDS; - } - - @Override - public int getFieldCount() { - return KEY_FIELDS.size(); - } - - @Override - public RecordField getField(int i) { - return KEY_FIELDS.get(i); - } - - @Override - public List<DataType> getDataTypes() { - return DATA_TYPES; - } - - @Override - public List<String> getFieldNames() { - return FIELD_NAMES; - } - - @Override - public Optional<DataType> getDataType(String s) { - if (s.equalsIgnoreCase("timestamp")){ - return Optional.of( RecordFieldType.LONG.getDataType() ); - } else{ - if (FIELD_NAMES.stream().filter(x -> s.equalsIgnoreCase(s)).count() > 0){ - return Optional.of(RecordFieldType.STRING.getDataType()); - } - } - return Optional.empty(); - } - - @Override - public Optional<String> getSchemaText() { - return Optional.empty(); - } - - @Override - public Optional<String> getSchemaFormat() { - return Optional.empty(); - } - - @Override - public Optional<RecordField> getField(final String s) { - return KEY_FIELDS.stream().filter(x -> x.getFieldName().equalsIgnoreCase(s)).findFirst(); - } - - @Override - public SchemaIdentifier getIdentifier() { - return SchemaIdentifier.builder().name("AccumuloKeySchema").version(1).branch("nifi-accumulo").build(); - } - - @Override - public Optional<String> getSchemaName() { - return Optional.of("AccumuloKeySchema"); - } - - @Override - public Optional<String> getSchemaNamespace() { - return Optional.of("nifi-accumulo"); - } - - @Override - public void removeField(String fieldName) { - throw new NotImplementedException("Field removal from Accumulo KeySchema is not implemented."); - } - - @Override - public void removePath(RecordFieldRemovalPath path) { - throw new NotImplementedException("Path removal from Accumulo KeySchema is not implemented."); - } - - @Override - public boolean renameField(final String currentName, final String newName) { - throw new NotImplementedException("Field renaming from Accumulo KeySchema is not implemented."); - } - - @Override - public boolean isRecursive() { - throw new NotImplementedException("Determining if an Accumulo KeySchema is recursive is not implemented."); - } -} diff --git a/nifi-extension-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/processors/BaseAccumuloProcessor.java b/nifi-extension-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/processors/BaseAccumuloProcessor.java deleted file mode 100644 index 7c8ed9735f..0000000000 --- a/nifi-extension-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/processors/BaseAccumuloProcessor.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.accumulo.processors; - -import org.apache.nifi.accumulo.controllerservices.BaseAccumuloService; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.expression.ExpressionLanguageScope; -import org.apache.nifi.processor.AbstractProcessor; -import org.apache.nifi.processor.util.StandardValidators; - -import java.util.Arrays; -import java.util.Collections; -import java.util.List; - -/** - * Base Accumulo class that provides connector services, table name, and thread - * properties - */ -public abstract class BaseAccumuloProcessor extends AbstractProcessor { - - protected static final PropertyDescriptor ACCUMULO_CONNECTOR_SERVICE = new PropertyDescriptor.Builder() - .name("accumulo-connector-service") - .displayName("Accumulo Connector Service") - .description("Specifies the Controller Service to use for accessing Accumulo.") - .required(true) - .identifiesControllerService(BaseAccumuloService.class) - .build(); - - - protected static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder() - .name("Table Name") - .description("The name of the Accumulo Table into which data will be placed") - .required(true) - .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); - - protected static final PropertyDescriptor CREATE_TABLE = new PropertyDescriptor.Builder() - .name("Create Table") - .description("Creates a table if it does not exist. This property will only be used when EL is not present in 'Table Name'") - .required(true) - .defaultValue("False") - .allowableValues("True", "False") - .addValidator(StandardValidators.BOOLEAN_VALIDATOR) - .build(); - - - protected static final PropertyDescriptor THREADS = new PropertyDescriptor.Builder() - .name("Threads") - .description("Number of threads used for reading and writing") - .required(false) - .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) - .defaultValue("10") - .build(); - - protected static final PropertyDescriptor ACCUMULO_TIMEOUT = new PropertyDescriptor.Builder() - .name("accumulo-timeout") - .displayName("Accumulo Timeout") - .description("Max amount of time to wait for an unresponsive server. Set to 0 sec for no timeout. Entered value less than 1 second may be converted to 0 sec.") - .required(false) - .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) - .defaultValue("30 sec") - .build(); - - /** - * Implementations can decide to include all base properties or individually include them. List is immutable - * so that implementations must constructor their own lists knowingly - */ - - protected static final List<PropertyDescriptor> baseProperties = Collections.unmodifiableList(Arrays.asList(ACCUMULO_CONNECTOR_SERVICE, TABLE_NAME, CREATE_TABLE, THREADS, ACCUMULO_TIMEOUT)); -} diff --git a/nifi-extension-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/processors/PutAccumuloRecord.java b/nifi-extension-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/processors/PutAccumuloRecord.java deleted file mode 100644 index 09b094153e..0000000000 --- a/nifi-extension-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/processors/PutAccumuloRecord.java +++ /dev/null @@ -1,658 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.accumulo.processors; - - -import org.apache.accumulo.core.client.AccumuloClient; -import org.apache.accumulo.core.client.AccumuloException; -import org.apache.accumulo.core.client.AccumuloSecurityException; -import org.apache.accumulo.core.client.BatchWriterConfig; -import org.apache.accumulo.core.client.MultiTableBatchWriter; -import org.apache.accumulo.core.client.MutationsRejectedException; -import org.apache.accumulo.core.client.TableExistsException; -import org.apache.accumulo.core.client.TableNotFoundException; -import org.apache.accumulo.core.client.admin.TableOperations; -import org.apache.accumulo.core.data.Mutation; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.security.ColumnVisibility; -import org.apache.hadoop.io.Text; -import org.apache.nifi.annotation.behavior.DynamicProperties; -import org.apache.nifi.annotation.behavior.DynamicProperty; -import org.apache.nifi.annotation.behavior.InputRequirement; -import org.apache.nifi.annotation.behavior.SupportsBatching; -import org.apache.nifi.annotation.documentation.CapabilityDescription; -import org.apache.nifi.annotation.documentation.Tags; -import org.apache.nifi.annotation.lifecycle.OnDisabled; -import org.apache.nifi.annotation.lifecycle.OnScheduled; -import org.apache.nifi.annotation.lifecycle.OnUnscheduled; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.components.PropertyValue; -import org.apache.nifi.components.ValidationContext; -import org.apache.nifi.components.ValidationResult; -import org.apache.nifi.components.Validator; -import org.apache.nifi.expression.ExpressionLanguageScope; -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.processor.DataUnit; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.record.path.FieldValue; -import org.apache.nifi.record.path.RecordPath; -import org.apache.nifi.record.path.RecordPathResult; -import org.apache.nifi.record.path.util.RecordPathCache; -import org.apache.nifi.serialization.RecordReader; -import org.apache.nifi.serialization.RecordReaderFactory; -import org.apache.nifi.serialization.record.Record; -import org.apache.nifi.serialization.record.RecordField; -import org.apache.nifi.serialization.record.RecordSchema; -import org.apache.nifi.util.StringUtils; -import org.apache.nifi.accumulo.controllerservices.BaseAccumuloService; -import org.apache.nifi.accumulo.data.AccumuloRecordConfiguration; - -import java.io.InputStream; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.HexFormat; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; - -@SupportsBatching -@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) -@Tags({"hadoop", "accumulo", "put", "record"}) -@CapabilityDescription("This is a record aware processor that reads the content of the incoming FlowFile as individual records using the " + - "configured 'Record Reader' and writes them to Apache Accumulo.") -@DynamicProperties({ - @DynamicProperty(name = "visibility.<COLUMN FAMILY>", description = "Visibility label for everything under that column family " + - "when a specific label for a particular column qualifier is not available.", expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES, - value = "visibility label for <COLUMN FAMILY>" - ), - @DynamicProperty(name = "visibility.<COLUMN FAMILY>.<COLUMN QUALIFIER>", description = "Visibility label for the specified column qualifier " + - "qualified by a configured column family.", expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES, - value = "visibility label for <COLUMN FAMILY>:<COLUMN QUALIFIER>." - ) -}) -/** - * Purpose and Design: Requires a connector be defined by way of an AccumuloService object. This class - * simply extends BaseAccumuloProcessor to extract records from a flow file. The location of a record field value can be - * placed into the value or part of the column qualifier ( this can/may change ) - * - * Supports deletes. If the delete flag is used we'll delete keys found within that flow file. - */ -public class PutAccumuloRecord extends BaseAccumuloProcessor { - - protected static final PropertyDescriptor MEMORY_SIZE = new PropertyDescriptor.Builder() - .name("Memory Size") - .description("The maximum memory size Accumulo at any one time from the record set.") - .required(true) - .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) - .defaultValue("10 MB") - .build(); - - protected static final PropertyDescriptor COLUMN_FAMILY = new PropertyDescriptor.Builder() - .name("Column Family") - .description("The Column Family to use when inserting data into Accumulo") - .required(false) - .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) - .addValidator(Validator.VALID) - .build(); - - protected static final PropertyDescriptor COLUMN_FAMILY_FIELD = new PropertyDescriptor.Builder() - .name("Column Family Field") - .description("Field name used as the column family if one is not specified above.") - .required(false) - .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) - .addValidator(Validator.VALID) - .build(); - - protected static final PropertyDescriptor DELETE_KEY = new PropertyDescriptor.Builder() - .name("delete-key") - .displayName("Delete Key") - .description("Deletes the key") - .required(false) - .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); - - protected static final PropertyDescriptor RECORD_IN_QUALIFIER = new PropertyDescriptor.Builder() - .name("record-value-in-qualifier") - .displayName("Record Value In Qualifier") - .description("Places the record value into the column qualifier instead of the value.") - .required(false) - .defaultValue("False") - .allowableValues("True", "False") - .addValidator(StandardValidators.BOOLEAN_VALIDATOR) - .build(); - - protected static final PropertyDescriptor FLUSH_ON_FLOWFILE = new PropertyDescriptor.Builder() - .name("flush-on-flow-file") - .displayName("Flush Every FlowFile") - .description("Flushes the table writer on every flow file.") - .required(true) - .defaultValue("True") - .allowableValues("True", "False") - .addValidator(StandardValidators.BOOLEAN_VALIDATOR) - .build(); - - protected static final PropertyDescriptor FIELD_DELIMITER_AS_HEX = new PropertyDescriptor.Builder() - .name("field-delimiter-as-hex") - .displayName("Hex Encode Field Delimiter") - .description("Allows you to hex encode the delimiter as a character. So 0x00 places a null character between the record name and value.") - .required(false) - .defaultValue("False") - .allowableValues("True", "False") - .addValidator(StandardValidators.BOOLEAN_VALIDATOR) - .build(); - - protected static final PropertyDescriptor FIELD_DELIMITER = new PropertyDescriptor.Builder() - .name("field-delimiter") - .displayName("Field Delimiter") - .description("Delimiter between the record value and name. ") - .required(false) - .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); - - - protected static final PropertyDescriptor RECORD_READER_FACTORY = new PropertyDescriptor.Builder() - .name("record-reader") - .displayName("Record Reader") - .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema") - .identifiesControllerService(RecordReaderFactory.class) - .required(true) - .build(); - - protected static final PropertyDescriptor ROW_FIELD_NAME = new PropertyDescriptor.Builder() - .name("Row Identifier Field Name") - .description("Specifies the name of a record field whose value should be used as the row id for the given record." + - " If EL defines a value that is not a field name that will be used as the row identifier.") - .required(true) - .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); - - - protected static final PropertyDescriptor TIMESTAMP_FIELD = new PropertyDescriptor.Builder() - .name("timestamp-field") - .displayName("Timestamp Field") - .description("Specifies the name of a record field whose value should be used as the timestamp. If empty a timestamp will be recorded as the time of insertion") - .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); - - - protected static final PropertyDescriptor VISIBILITY_PATH = new PropertyDescriptor.Builder() - .name("visibility-path") - .displayName("Visibility String Record Path Root") - .description("A record path that points to part of the record which contains a path to a mapping of visibility strings to record paths") - .required(false) - .addValidator(Validator.VALID) - .build(); - - protected static final PropertyDescriptor DEFAULT_VISIBILITY = new PropertyDescriptor.Builder() - .name("default-visibility") - .displayName("Default Visibility") - .description("Default visibility when VISIBILITY_PATH is not defined. ") - .required(false) - .addValidator(Validator.VALID) - .build(); - - public static final Relationship REL_SUCCESS = new Relationship.Builder() - .name("success") - .description("A FlowFile is routed to this relationship after it has been successfully stored in Accumulo") - .build(); - public static final Relationship REL_FAILURE = new Relationship.Builder() - .name("failure") - .description("A FlowFile is routed to this relationship if it cannot be sent to Accumulo") - .build(); - - - /** - * Connector service which provides us a connector if the configuration is correct. - */ - protected BaseAccumuloService accumuloConnectorService; - - /** - * Connector that we need to persist while we are operational. - */ - protected AccumuloClient client; - - /** - * Table writer that will close when we shutdown or upon error. - */ - private MultiTableBatchWriter tableWriter = null; - - /** - * Record path cache - */ - protected RecordPathCache recordPathCache; - - - /** - * Flushes the tableWriter on every flow file if true. - */ - protected boolean flushOnEveryFlow; - - @Override - public Set<Relationship> getRelationships() { - final Set<Relationship> rels = new HashSet<>(); - rels.add(REL_SUCCESS); - rels.add(REL_FAILURE); - return rels; - } - - @Override - protected Collection<ValidationResult> customValidate(ValidationContext validationContext) { - Collection<ValidationResult> set = new HashSet<>(); - if (!validationContext.getProperty(COLUMN_FAMILY).isSet() && !validationContext.getProperty(COLUMN_FAMILY_FIELD).isSet()) - set.add(new ValidationResult.Builder().explanation("Column Family OR Column family field name must be defined").build()); - else if (validationContext.getProperty(COLUMN_FAMILY).isSet() && validationContext.getProperty(COLUMN_FAMILY_FIELD).isSet()) - set.add(new ValidationResult.Builder().explanation("Column Family OR Column family field name must be defined, but not both").build()); - return set; - } - - @OnScheduled - public void onScheduled(final ProcessContext context) { - accumuloConnectorService = context.getProperty(ACCUMULO_CONNECTOR_SERVICE).asControllerService(BaseAccumuloService.class); - final Double maxBytes = context.getProperty(MEMORY_SIZE).asDataSize(DataUnit.B); - this.client = accumuloConnectorService.getClient(); - BatchWriterConfig writerConfig = new BatchWriterConfig(); - writerConfig.setMaxWriteThreads(context.getProperty(THREADS).asInteger()); - writerConfig.setMaxMemory(maxBytes.longValue()); - writerConfig.setTimeout(context.getProperty(ACCUMULO_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).longValue(), TimeUnit.SECONDS); - tableWriter = client.createMultiTableBatchWriter(writerConfig); - flushOnEveryFlow = context.getProperty(FLUSH_ON_FLOWFILE).asBoolean(); - if (!flushOnEveryFlow){ - writerConfig.setMaxLatency(60, TimeUnit.SECONDS); - } - - if (context.getProperty(CREATE_TABLE).asBoolean() && !context.getProperty(TABLE_NAME).isExpressionLanguagePresent()) { - final Map<String, String> flowAttributes = new HashMap<>(); - final String table = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowAttributes).getValue(); - final TableOperations tableOps = this.client.tableOperations(); - if (!tableOps.exists(table)) { - getLogger().info("Creating " + table + " table."); - try { - tableOps.create(table); - } catch (TableExistsException te) { - // can safely ignore - } catch (AccumuloSecurityException | AccumuloException e) { - getLogger().info("Accumulo or Security error creating. Continuing... " + table + ". ", e); - } - } - } - } - - - @OnUnscheduled - @OnDisabled - public synchronized void shutdown(){ - /** - * Close the writer when we are shut down. - */ - if (null != tableWriter){ - try { - tableWriter.close(); - } catch (MutationsRejectedException e) { - getLogger().error("Mutations were rejected",e); - } - tableWriter = null; - } - } - - @Override - public final List<PropertyDescriptor> getSupportedPropertyDescriptors() { - final List<PropertyDescriptor> properties = new ArrayList<>(baseProperties); - properties.add(RECORD_READER_FACTORY); - properties.add(ROW_FIELD_NAME); - properties.add(ROW_FIELD_NAME); - properties.add(COLUMN_FAMILY); - properties.add(COLUMN_FAMILY_FIELD); - properties.add(DELETE_KEY); - properties.add(FLUSH_ON_FLOWFILE); - properties.add(FIELD_DELIMITER); - properties.add(FIELD_DELIMITER_AS_HEX); - properties.add(MEMORY_SIZE); - properties.add(RECORD_IN_QUALIFIER); - properties.add(TIMESTAMP_FIELD); - properties.add(VISIBILITY_PATH); - properties.add(DEFAULT_VISIBILITY); - return properties; - } - - - @Override - public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException { - final FlowFile flowFile = processSession.get(); - if (flowFile == null) { - return; - } - - final RecordReaderFactory recordParserFactory = processContext.getProperty(RECORD_READER_FACTORY) - .asControllerService(RecordReaderFactory.class); - - final String recordPathText = processContext.getProperty(VISIBILITY_PATH).getValue(); - final String defaultVisibility = processContext.getProperty(DEFAULT_VISIBILITY).isSet() ? processContext.getProperty(DEFAULT_VISIBILITY).getValue() : null; - - final String tableName = processContext.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue(); - - accumuloConnectorService.renewTgtIfNecessary(); - - // create the table if EL is present, create table is true and the table does not exist. - if (processContext.getProperty(TABLE_NAME).isExpressionLanguagePresent() && processContext.getProperty(CREATE_TABLE).asBoolean()) { - final TableOperations tableOps = this.client.tableOperations(); - if (!tableOps.exists(tableName)) { - getLogger().info("Creating " + tableName + " table."); - try { - tableOps.create(tableName); - } catch (TableExistsException te) { - // can safely ignore, though we shouldn't arrive here due to table.exists called, but it's possible - // that with multiple threads two could attempt table creation concurrently. We don't want that - // to be a failure. - } catch (AccumuloSecurityException | AccumuloException e) { - throw new ProcessException("Accumulo or Security error creating. Continuing... " + tableName + ". ",e); - } - } - } - - AccumuloRecordConfiguration builder = AccumuloRecordConfiguration.Builder.newBuilder() - .setTableName(tableName) - .setColumnFamily(processContext.getProperty(COLUMN_FAMILY).evaluateAttributeExpressions(flowFile).getValue()) - .setColumnFamilyField(processContext.getProperty(COLUMN_FAMILY_FIELD).evaluateAttributeExpressions(flowFile).getValue()) - .setRowField(processContext.getProperty(ROW_FIELD_NAME).evaluateAttributeExpressions(flowFile).getValue()) - .setEncodeFieldDelimiter(processContext.getProperty(FIELD_DELIMITER_AS_HEX).asBoolean()) - .setFieldDelimiter(processContext.getProperty(FIELD_DELIMITER).isSet() ? processContext.getProperty(FIELD_DELIMITER).evaluateAttributeExpressions(flowFile).getValue() : "") - .setQualifierInKey(processContext.getProperty(RECORD_IN_QUALIFIER).isSet() ? processContext.getProperty(RECORD_IN_QUALIFIER).asBoolean() : false) - .setDelete(processContext.getProperty(DELETE_KEY).isSet() ? processContext.getProperty(DELETE_KEY).evaluateAttributeExpressions(flowFile).asBoolean() : false) - .setTimestampField(processContext.getProperty(TIMESTAMP_FIELD).evaluateAttributeExpressions(flowFile).getValue()).build(); - - - RecordPath recordPath = null; - if (recordPathCache != null && !StringUtils.isEmpty(recordPathText)) { - recordPath = recordPathCache.getCompiled(recordPathText); - } - - boolean failed = false; - Mutation prevMutation=null; - try (final InputStream in = processSession.read(flowFile); - final RecordReader reader = recordParserFactory.createRecordReader(flowFile, in, getLogger())) { - Record record; - /** - * HBase supports a restart point. This may be something that we can/should add if needed. - */ - while ((record = reader.nextRecord()) != null) { - prevMutation = createMutation(prevMutation, processContext, record, reader.getSchema(), recordPath, flowFile,defaultVisibility, builder); - - } - addMutation(builder.getTableName(),prevMutation); - } catch (Exception ex) { - getLogger().error("Failed to put records to Accumulo.", ex); - failed = true; - } - - if (flushOnEveryFlow){ - try { - tableWriter.flush(); - } catch (MutationsRejectedException e) { - throw new ProcessException(e); - } - } - - - if (failed) { - processSession.transfer(processSession.penalize(flowFile), REL_FAILURE); - } else { - processSession.transfer(flowFile, REL_SUCCESS); - } - } - - /** - * Adapted from HBASEUtils. Their approach seemed ideal for what our intent is here. - * @param columnFamily column family from which to extract the visibility or to execute an expression against - * @param columnQualifier column qualifier from which to extract the visibility or to execute an expression against - * @param flowFile flow file being written - * @param context process context - * @return Visibility - */ - public static String produceVisibility(String columnFamily, String columnQualifier, FlowFile flowFile, ProcessContext context) { - if (org.apache.commons.lang3.StringUtils.isNotEmpty(columnFamily)) { - return null; - } - String lookupKey = String.format("visibility.%s%s%s", columnFamily, !org.apache.commons.lang3.StringUtils.isNotEmpty(columnQualifier) ? "." : "", columnQualifier); - String fromAttribute = flowFile.getAttribute(lookupKey); - - if (fromAttribute == null && !org.apache.commons.lang3.StringUtils.isBlank(columnQualifier)) { - String lookupKeyFam = String.format("visibility.%s", columnFamily); - fromAttribute = flowFile.getAttribute(lookupKeyFam); - } - - if (fromAttribute != null) { - return fromAttribute; - } else { - PropertyValue descriptor = context.getProperty(lookupKey); - if (descriptor == null || !descriptor.isSet()) { - descriptor = context.getProperty(String.format("visibility.%s", columnFamily)); - } - - String retVal = descriptor != null ? descriptor.evaluateAttributeExpressions(flowFile).getValue() : null; - - return retVal; - } - } - - private void addMutation(final String tableName, final Mutation m) throws AccumuloSecurityException, AccumuloException, TableNotFoundException { - tableWriter.getBatchWriter(tableName).addMutation(m); - - } - - /** - * Returns the row provided the record schema - * @param record record against which we are evaluating - * @param schema Record schema - * @param rowOrFieldName Row identifier or field name - * @return Text object containing the resulting row. - */ - private Text getRow(final Record record, - final RecordSchema schema, - final String rowOrFieldName){ - if ( !schema.getFieldNames().contains(rowOrFieldName) ){ - return new Text(rowOrFieldName); - } else{ - return new Text(record.getAsString(rowOrFieldName)); - } - } - - /** - * Creates a mutation with the provided arguments - * @param prevMutation previous mutation, to append to if in the same row. - * @param context process context. - * @param record record object extracted from the flow file - * @param schema schema for this record - * @param recordPath record path for visibility extraction - * @param flowFile flow file - * @param defaultVisibility default visibility - * @param config configuration of this instance. - * @return Returns the Mutation to insert - * @throws AccumuloSecurityException Error accessing Accumulo - * @throws AccumuloException Non security ( or table ) related Accumulo exceptions writing to the store. - * @throws TableNotFoundException Table not found on the cluster - */ - protected Mutation createMutation(final Mutation prevMutation, - final ProcessContext context, - final Record record, - final RecordSchema schema, - final RecordPath recordPath, - final FlowFile flowFile, - final String defaultVisibility, - AccumuloRecordConfiguration config) throws AccumuloSecurityException, AccumuloException, TableNotFoundException { - Mutation m=null; - if (record != null) { - - final Long timestamp; - Set<String> fieldsToSkip = new HashSet<>(); - if (!StringUtils.isBlank(config.getTimestampField())) { - try { - timestamp = record.getAsLong(config.getTimestampField()); - fieldsToSkip.add(config.getTimestampField()); - } catch (Exception e) { - throw new AccumuloException("Could not convert " + config.getTimestampField() + " to a long", e); - } - - if (timestamp == null) { - getLogger().warn("The value of timestamp field " + config.getTimestampField() + " was null, record will be inserted with latest timestamp"); - } - } else { - timestamp = null; - } - - - - RecordField visField = null; - Map visSettings = null; - if (recordPath != null) { - final RecordPathResult result = recordPath.evaluate(record); - FieldValue fv = result.getSelectedFields().findFirst().get(); - visField = fv.getField(); - if (null != visField) - fieldsToSkip.add(visField.getFieldName()); - visSettings = (Map)fv.getValue(); - } - - - if (null != prevMutation){ - Text row = new Text(prevMutation.getRow()); - Text curRow = getRow(record,schema,config.getRowField()); - if (row.equals(curRow)){ - m = prevMutation; - } else{ - m = new Mutation(curRow); - addMutation(config.getTableName(),prevMutation); - } - } else{ - Text row = getRow(record,schema,config.getRowField()); - m = new Mutation(row); - } - - fieldsToSkip.add(config.getRowField()); - - String columnFamily = config.getColumnFamily(); - if (StringUtils.isBlank(columnFamily) && !StringUtils.isBlank(config.getColumnFamilyField())) { - final String cfField = config.getColumnFamilyField(); - columnFamily = record.getAsString(cfField); - fieldsToSkip.add(cfField); - } else if (StringUtils.isBlank(columnFamily) && StringUtils.isBlank(config.getColumnFamilyField())){ - throw new IllegalArgumentException("Invalid configuration for column family " + columnFamily + " and " + config.getColumnFamilyField()); - } - final Text cf = new Text(columnFamily); - - for (String name : schema.getFieldNames().stream().filter(p->!fieldsToSkip.contains(p)).collect(Collectors.toList())) { - String visString = (visField != null && visSettings != null && visSettings.containsKey(name)) - ? (String)visSettings.get(name) : defaultVisibility; - - Text cq = new Text(name); - final Value value; - String recordValue = record.getAsString(name); - if (config.getQualifierInKey()){ - final String delim = config.getFieldDelimiter(); - if (!StringUtils.isEmpty(delim)) { - if (config.getEncodeDelimiter()) { - byte [] asHex = HexFormat.of().parseHex(delim); - cq.append(asHex, 0, asHex.length); - }else{ - cq.append(delim.getBytes(), 0, delim.length()); - } - } - cq.append(recordValue.getBytes(),0,recordValue.length()); - value = new Value(); - } else{ - value = new Value(recordValue.getBytes()); - } - - if (StringUtils.isBlank(visString)) { - visString = produceVisibility(cf.toString(), cq.toString(), flowFile, context); - } - - ColumnVisibility cv = new ColumnVisibility(); - if (StringUtils.isBlank(visString)) { - if (!StringUtils.isBlank(defaultVisibility)) { - cv = new ColumnVisibility(defaultVisibility); - } - } else { - cv = new ColumnVisibility(visString); - } - - if (null != timestamp) { - if (config.isDeleteKeys()) { - m.putDelete(cf, cq, cv, timestamp); - } else { - m.put(cf, cq, cv, timestamp, value); - } - } else{ - if (config.isDeleteKeys()) - m.putDelete(cf, cq, cv); - else - m.put(cf, cq, cv, value); - } - } - - - - } - - return m; - } - - @Override - protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { - /** - * Adapted from HBase puts. This is a good approach and one that we should adopt here, too. - */ - if (propertyDescriptorName.startsWith("visibility.")) { - String[] parts = propertyDescriptorName.split("\\."); - String displayName; - String description; - - if (parts.length == 2) { - displayName = String.format("Column Family %s Default Visibility", parts[1]); - description = String.format("Default visibility setting for %s", parts[1]); - } else if (parts.length == 3) { - displayName = String.format("Column Qualifier %s.%s Default Visibility", parts[1], parts[2]); - description = String.format("Default visibility setting for %s.%s", parts[1], parts[2]); - } else { - return null; - } - - return new PropertyDescriptor.Builder() - .name(propertyDescriptorName) - .displayName(displayName) - .description(description) - .addValidator(StandardValidators.NON_BLANK_VALIDATOR) - .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) - .dynamic(true) - .build(); - } - - return null; - } -} diff --git a/nifi-extension-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/processors/ScanAccumulo.java b/nifi-extension-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/processors/ScanAccumulo.java deleted file mode 100644 index e9ad151be2..0000000000 --- a/nifi-extension-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/processors/ScanAccumulo.java +++ /dev/null @@ -1,390 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.accumulo.processors; - -import org.apache.accumulo.core.client.AccumuloClient; -import org.apache.accumulo.core.client.BatchScanner; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Range; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.security.Authorizations; -import org.apache.hadoop.io.Text; -import org.apache.nifi.accumulo.controllerservices.BaseAccumuloService; -import org.apache.nifi.accumulo.data.KeySchema; -import org.apache.nifi.annotation.behavior.InputRequirement; -import org.apache.nifi.annotation.behavior.SupportsBatching; -import org.apache.nifi.annotation.configuration.DefaultSchedule; -import org.apache.nifi.annotation.documentation.CapabilityDescription; -import org.apache.nifi.annotation.documentation.Tags; -import org.apache.nifi.annotation.lifecycle.OnScheduled; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.components.ValidationContext; -import org.apache.nifi.components.ValidationResult; -import org.apache.nifi.components.Validator; -import org.apache.nifi.expression.ExpressionLanguageScope; -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.flowfile.attributes.CoreAttributes; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.io.StreamCallback; -import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.scheduling.SchedulingStrategy; -import org.apache.nifi.schema.access.SchemaNotFoundException; -import org.apache.nifi.serialization.RecordSetWriter; -import org.apache.nifi.serialization.RecordSetWriterFactory; -import org.apache.nifi.serialization.SimpleRecordSchema; -import org.apache.nifi.serialization.WriteResult; -import org.apache.nifi.serialization.record.MapRecord; -import org.apache.nifi.serialization.record.Record; -import org.apache.nifi.serialization.record.RecordField; -import org.apache.nifi.serialization.record.RecordFieldType; -import org.apache.nifi.serialization.record.RecordSchema; -import org.apache.nifi.util.StringUtils; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Optional; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.LongAdder; - -@SupportsBatching -@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED) -@CapabilityDescription("Scan the given table and writes result in a flowfile. Value will be represented as UTF-8 Encoded String.") -@Tags({"hadoop", "accumulo", "scan", "record"}) -@DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min") -/** - * Purpose and Design: Requires a connector be defined by way of an AccumuloService object. This class - * simply extends BaseAccumuloProcessor to scan accumulo based on aspects and expression executed against - * a flow file - * - */ -public class ScanAccumulo extends BaseAccumuloProcessor { - - static final PropertyDescriptor START_KEY = new PropertyDescriptor.Builder() - .displayName("Start key") - .name("start-key") - .description("Start row key") - .required(false) - .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) - .addValidator(Validator.VALID) - .build(); - - static final PropertyDescriptor START_KEY_INCLUSIVE = new PropertyDescriptor.Builder() - .displayName("Start key Inclusive") - .name("start-key-inclusive") - .description("Determines if the start key is inclusive ") - .required(false) - .defaultValue("True") - .allowableValues("True", "False") - .addValidator(StandardValidators.BOOLEAN_VALIDATOR) - .build(); - - static final PropertyDescriptor END_KEY = new PropertyDescriptor.Builder() - .displayName("End key") - .name("end-key") - .description("End row key for this. If not specified or empty this will be infinite") - .required(false) - .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) - .addValidator(Validator.VALID) - .build(); - - static final PropertyDescriptor END_KEY_INCLUSIVE = new PropertyDescriptor.Builder() - .displayName("End key Inclusive") - .name("end-key-inclusive") - .description("Determines if the end key is inclusive") - .required(false) - .defaultValue("False") - .allowableValues("True", "False") - .addValidator(StandardValidators.BOOLEAN_VALIDATOR) - .build(); - - static final PropertyDescriptor AUTHORIZATIONS = new PropertyDescriptor.Builder() - .name("accumulo-authorizations") - .displayName("Authorizations") - .description("The comma separated list of authorizations to pass to the scanner.") - .required(true) - .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) - .addValidator(Validator.VALID) - .build(); - - static final PropertyDescriptor COLUMNFAMILY = new PropertyDescriptor.Builder() - .name("column-family") - .displayName("Start Column Family") - .description("The column family that is part of the start key. If no column key is defined only this column family will be selected") - .required(false) - .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) - .addValidator(Validator.VALID) - .build(); - - static final PropertyDescriptor COLUMNFAMILY_END = new PropertyDescriptor.Builder() - .name("column-family-end") - .displayName("End Column Family") - .description("The column family to select is part of end key") - .required(false) - .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) - .addValidator(Validator.VALID) - .build(); - - static final PropertyDescriptor VALUE_INCLUDED_IN_RESULT = new PropertyDescriptor.Builder() - .displayName("Value Included in Result") - .name("accumulo-value-inclusive") - .description("Beside keys and their values, accumulo value field will also be included in the result as UTF-8 Encoded String.") - .required(false) - .defaultValue("True") - .allowableValues("True", "False") - .addValidator(StandardValidators.BOOLEAN_VALIDATOR) - .build(); - - public static final Relationship REL_SUCCESS = new Relationship.Builder() - .name("success") - .description("A FlowFile is routed to this relationship after it has been successfully retrieved from Accumulo") - .build(); - public static final Relationship REL_FAILURE = new Relationship.Builder() - .name("failure") - .description("A FlowFile is routed to this relationship if it cannot be retrieved fromAccumulo") - .build(); - - static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder() - .name("record-writer") - .displayName("Record Writer") - .description("Specifies the Controller Service to use for writing out the records") - .identifiesControllerService(RecordSetWriterFactory.class) - .required(true) - .build(); - - /** - * Connector service which provides us a connector if the configuration is correct. - */ - protected BaseAccumuloService accumuloConnectorService; - - /** - * Connector that we need to persist while we are operational. - */ - protected AccumuloClient client; - - - @Override - public Set<Relationship> getRelationships() { - final Set<Relationship> rels = new HashSet<>(); - rels.add(REL_SUCCESS); - rels.add(REL_FAILURE); - return rels; - } - - - @Override - protected Collection<ValidationResult> customValidate(ValidationContext validationContext) { - Collection<ValidationResult> set = new ArrayList<>(); - if ((validationContext.getProperty(COLUMNFAMILY).isSet() && !validationContext.getProperty(COLUMNFAMILY_END).isSet()) - || !validationContext.getProperty(COLUMNFAMILY).isSet() && validationContext.getProperty(COLUMNFAMILY_END).isSet() ) - set.add(new ValidationResult.Builder().explanation("Column Family and Column family end must be defined").build()); - return set; - } - - @OnScheduled - public void onScheduled(final ProcessContext context) { - accumuloConnectorService = context.getProperty(ACCUMULO_CONNECTOR_SERVICE).asControllerService(BaseAccumuloService.class); - this.client = accumuloConnectorService.getClient(); - } - - private Authorizations stringToAuth(final String authorizations){ - if (!StringUtils.isBlank(authorizations)) - return new Authorizations(authorizations.split(",")); - else - return new Authorizations(); - } - - - protected long scanAccumulo(final RecordSetWriterFactory writerFactory, final ProcessContext processContext, final ProcessSession processSession, final Optional<FlowFile> incomingFlowFile){ - - final Map<String, String> flowAttributes = incomingFlowFile.isPresent() ? incomingFlowFile.get().getAttributes() : new HashMap<>(); - final String table = processContext.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowAttributes).getValue(); - final String startKey = processContext.getProperty(START_KEY).evaluateAttributeExpressions(flowAttributes).getValue(); - final boolean startKeyInclusive = processContext.getProperty(START_KEY_INCLUSIVE).asBoolean(); - final boolean endKeyInclusive = processContext.getProperty(END_KEY_INCLUSIVE).asBoolean(); - final String endKey = processContext.getProperty(END_KEY).evaluateAttributeExpressions(flowAttributes).getValue(); - final String authorizations = processContext.getProperty(AUTHORIZATIONS).isSet() - ? processContext.getProperty(AUTHORIZATIONS).evaluateAttributeExpressions(flowAttributes).getValue() : ""; - final int threads = processContext.getProperty(THREADS).asInteger(); - final String startKeyCf = processContext.getProperty(COLUMNFAMILY).evaluateAttributeExpressions(flowAttributes).getValue(); - final String endKeyCf = processContext.getProperty(COLUMNFAMILY_END).evaluateAttributeExpressions(flowAttributes).getValue(); - final boolean valueIncluded = processContext.getProperty(VALUE_INCLUDED_IN_RESULT).asBoolean(); - - final Authorizations auths = stringToAuth(authorizations); - - final LongAdder recordCounter = new LongAdder(); - - final Range lookupRange = buildRange(startKey,startKeyCf,startKeyInclusive,endKey,endKeyCf,endKeyInclusive); - - boolean cloneFlowFile = incomingFlowFile.isPresent(); - - accumuloConnectorService.renewTgtIfNecessary(); - - try (BatchScanner scanner = client.createBatchScanner(table,auths,threads)) { - if (!StringUtils.isBlank(startKeyCf) && StringUtils.isBlank(endKeyCf)) - scanner.fetchColumnFamily(new Text(startKeyCf)); - scanner.setRanges(Collections.singleton(lookupRange)); - scanner.setTimeout(processContext.getProperty(ACCUMULO_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).longValue(), TimeUnit.SECONDS); - - final Iterator<Map.Entry<Key,Value>> kvIter = scanner.iterator(); - if (!kvIter.hasNext()){ - /** - * Create a flow file with a record count of zero. - */ - final Map<String, String> attributes = new HashMap<>(); - attributes.put("record.count", String.valueOf(0)); - final FlowFile newFlow = processSession.create(); - processSession.putAllAttributes(newFlow,attributes); - processSession.transfer(newFlow, REL_SUCCESS); - return 0; - } else{ - - while (kvIter.hasNext()) { - FlowFile iterationFlowFile = cloneFlowFile ? processSession.clone(incomingFlowFile.get()) : processSession.create(); - - final int keysPerFlowFile = 1000; - final Map<String, String> attributes = new HashMap<>(); - iterationFlowFile = processSession.write(iterationFlowFile, new StreamCallback() { - @Override - public void process(final InputStream in, final OutputStream out) throws IOException { - - try{ - final RecordSchema writeSchema = determineRecordSchema(writerFactory, flowAttributes, valueIncluded); - - try (final RecordSetWriter writer = writerFactory.createWriter(getLogger(), writeSchema, out, Collections.emptyMap())) { - - int i = 0; - writer.beginRecordSet(); - for (; i < keysPerFlowFile && kvIter.hasNext(); i++) { - - Map.Entry<Key, Value> kv = kvIter.next(); - - final Key key = kv.getKey(); - - Map<String, Object> data = new HashMap<>(); - data.put("row", key.getRow().toString()); - data.put("columnFamily", key.getColumnFamily().toString()); - data.put("columnQualifier", key.getColumnQualifier().toString()); - data.put("columnVisibility", key.getColumnVisibility().toString()); - data.put("timestamp", key.getTimestamp()); - if (valueIncluded) { - data.put("value", Objects.isNull(kv.getValue()) ? null : kv.getValue().toString()); - } - - Record record = new MapRecord(writeSchema, data); - writer.write(record); - - - } - recordCounter.add(i); - - final WriteResult writeResult = writer.finishRecordSet(); - attributes.put("record.count", String.valueOf(i)); - attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType()); - attributes.putAll(writeResult.getAttributes()); - } - } catch (SchemaNotFoundException e) { - getLogger().error("Failed to process {}; will route to failure", new Object[] { - incomingFlowFile.isPresent() ? incomingFlowFile.get() : "No incoming flow file", e}); - - throw new IOException(e); - } - } - - }); - processSession.putAllAttributes(iterationFlowFile,attributes); - processSession.transfer(iterationFlowFile, REL_SUCCESS); - } - } - } catch (final Exception e) { - getLogger().error("Failed to process {}; will route to failure", new Object[] {incomingFlowFile.isPresent() ? incomingFlowFile.get() : "No incoming flow file", e}); - if (cloneFlowFile) { - processSession.transfer(incomingFlowFile.get(), REL_FAILURE); - } - return 0; - } - - if (cloneFlowFile) { - processSession.remove(incomingFlowFile.get()); - } - - getLogger().info("Successfully converted {} records for {}", new Object[] {recordCounter.longValue(), incomingFlowFile.toString()}); - - return recordCounter.longValue(); - } - - private RecordSchema determineRecordSchema(RecordSetWriterFactory writerFactory, Map<String, String> flowAttributes, boolean valueIncluded) throws SchemaNotFoundException, IOException { - final RecordSchema writeSchema = writerFactory.getSchema(flowAttributes, new KeySchema()); - - if (valueIncluded) { - final List<RecordField> recordSchemaFields = new ArrayList<>(); - recordSchemaFields.addAll(writeSchema.getFields()); - recordSchemaFields.add(new RecordField("value", RecordFieldType.STRING.getDataType())); - return new SimpleRecordSchema(recordSchemaFields); - } - return writeSchema; - } - - - Range buildRange(final String startRow, final String startKeyCf,boolean startKeyInclusive, final String endRow, final String endKeyCf,boolean endKeyInclusive){ - Key start = StringUtils.isBlank(startRow) ? null : StringUtils.isBlank(startKeyCf) ? new Key(startRow) : new Key(startRow,startKeyCf); - Key end = StringUtils.isBlank(endRow) ? null : StringUtils.isBlank(endKeyCf) ? new Key(endRow) : new Key(endRow,endKeyCf); - return new Range(start,startKeyInclusive,end,endKeyInclusive); - } - - @Override - public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException { - FlowFile flowFile = processSession.get(); - - final RecordSetWriterFactory writerFactory = processContext.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class); - - long recordCount = scanAccumulo(writerFactory,processContext,processSession,Optional.ofNullable(flowFile)); - - processSession.adjustCounter("Records Processed", recordCount, false); - } - - - @Override - public final List<PropertyDescriptor> getSupportedPropertyDescriptors() { - final List<PropertyDescriptor> properties = new ArrayList<>(baseProperties); - properties.add(START_KEY); - properties.add(START_KEY_INCLUSIVE); - properties.add(END_KEY); - properties.add(COLUMNFAMILY); - properties.add(COLUMNFAMILY_END); - properties.add(END_KEY_INCLUSIVE); - properties.add(VALUE_INCLUDED_IN_RESULT); - properties.add(RECORD_WRITER); - properties.add(AUTHORIZATIONS); - return properties; - } - -} diff --git a/nifi-extension-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-extension-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor deleted file mode 100644 index a1ce07210c..0000000000 --- a/nifi-extension-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ /dev/null @@ -1,16 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -org.apache.nifi.accumulo.processors.PutAccumuloRecord -org.apache.nifi.accumulo.processors.ScanAccumulo diff --git a/nifi-extension-bundles/nifi-accumulo-bundle/nifi-accumulo-services-api-nar/pom.xml b/nifi-extension-bundles/nifi-accumulo-bundle/nifi-accumulo-services-api-nar/pom.xml deleted file mode 100644 index 75a88a56dc..0000000000 --- a/nifi-extension-bundles/nifi-accumulo-bundle/nifi-accumulo-services-api-nar/pom.xml +++ /dev/null @@ -1,40 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-accumulo-bundle</artifactId> - <version>2.0.0-SNAPSHOT</version> - </parent> - <artifactId>nifi-accumulo-services-api-nar</artifactId> - <packaging>nar</packaging> - - <dependencies> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-standard-services-api-nar</artifactId> - <version>2.0.0-SNAPSHOT</version> - <type>nar</type> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-accumulo-services-api</artifactId> - <version>2.0.0-SNAPSHOT</version> - </dependency> - </dependencies> -</project> \ No newline at end of file diff --git a/nifi-extension-bundles/nifi-accumulo-bundle/nifi-accumulo-services-api/pom.xml b/nifi-extension-bundles/nifi-accumulo-bundle/nifi-accumulo-services-api/pom.xml deleted file mode 100644 index 6491a3e0fb..0000000000 --- a/nifi-extension-bundles/nifi-accumulo-bundle/nifi-accumulo-services-api/pom.xml +++ /dev/null @@ -1,35 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-accumulo-bundle</artifactId> - <version>2.0.0-SNAPSHOT</version> - </parent> - - <artifactId>nifi-accumulo-services-api</artifactId> - <packaging>jar</packaging> - - <dependencies> - <dependency> - <groupId>org.apache.accumulo</groupId> - <artifactId>accumulo-core</artifactId> - </dependency> - - </dependencies> -</project> diff --git a/nifi-extension-bundles/nifi-accumulo-bundle/nifi-accumulo-services-api/src/main/java/org/apache/nifi/accumulo/controllerservices/BaseAccumuloService.java b/nifi-extension-bundles/nifi-accumulo-bundle/nifi-accumulo-services-api/src/main/java/org/apache/nifi/accumulo/controllerservices/BaseAccumuloService.java deleted file mode 100644 index 3266ad54dd..0000000000 --- a/nifi-extension-bundles/nifi-accumulo-bundle/nifi-accumulo-services-api/src/main/java/org/apache/nifi/accumulo/controllerservices/BaseAccumuloService.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.accumulo.controllerservices; - -import org.apache.accumulo.core.client.AccumuloClient; -import org.apache.nifi.annotation.documentation.CapabilityDescription; -import org.apache.nifi.annotation.documentation.Tags; -import org.apache.nifi.controller.ControllerService; - -@Tags({"accumulo", "client", "service"}) -@CapabilityDescription("Provides a basic connector to Accumulo services") -public interface BaseAccumuloService extends ControllerService { - - - AccumuloClient getClient(); - void renewTgtIfNecessary(); - -} diff --git a/nifi-extension-bundles/nifi-accumulo-bundle/nifi-accumulo-services-nar/pom.xml b/nifi-extension-bundles/nifi-accumulo-bundle/nifi-accumulo-services-nar/pom.xml deleted file mode 100644 index fe6c251371..0000000000 --- a/nifi-extension-bundles/nifi-accumulo-bundle/nifi-accumulo-services-nar/pom.xml +++ /dev/null @@ -1,41 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-accumulo-bundle</artifactId> - <version>2.0.0-SNAPSHOT</version> - </parent> - - <artifactId>nifi-accumulo-services-nar</artifactId> - <packaging>nar</packaging> - - <dependencies> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-accumulo-services</artifactId> - <version>2.0.0-SNAPSHOT</version> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-accumulo-services-api-nar</artifactId> - <version>2.0.0-SNAPSHOT</version> - <type>nar</type> - </dependency> - </dependencies> -</project> \ No newline at end of file diff --git a/nifi-extension-bundles/nifi-accumulo-bundle/nifi-accumulo-services/pom.xml b/nifi-extension-bundles/nifi-accumulo-bundle/nifi-accumulo-services/pom.xml deleted file mode 100644 index 60056782ac..0000000000 --- a/nifi-extension-bundles/nifi-accumulo-bundle/nifi-accumulo-services/pom.xml +++ /dev/null @@ -1,70 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-accumulo-bundle</artifactId> - <version>2.0.0-SNAPSHOT</version> - </parent> - - <artifactId>nifi-accumulo-services</artifactId> - <packaging>jar</packaging> - - <dependencies> - <dependency> - <groupId>org.apache.accumulo</groupId> - <artifactId>accumulo-core</artifactId> - <scope>provided</scope> - </dependency> - - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-accumulo-services-api</artifactId> - <version>2.0.0-SNAPSHOT</version> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-client-api</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-kerberos-credentials-service-api</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-kerberos-user-service-api</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-security-kerberos</artifactId> - <version>2.0.0-SNAPSHOT</version> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-security-kerberos-api</artifactId> - <version>2.0.0-SNAPSHOT</version> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-hadoop-utils</artifactId> - <version>2.0.0-SNAPSHOT</version> - </dependency> - </dependencies> -</project> diff --git a/nifi-extension-bundles/nifi-accumulo-bundle/nifi-accumulo-services/src/main/java/org/apache/nifi/accumulo/controllerservices/AccumuloService.java b/nifi-extension-bundles/nifi-accumulo-bundle/nifi-accumulo-services/src/main/java/org/apache/nifi/accumulo/controllerservices/AccumuloService.java deleted file mode 100644 index 11aaf0120c..0000000000 --- a/nifi-extension-bundles/nifi-accumulo-bundle/nifi-accumulo-services/src/main/java/org/apache/nifi/accumulo/controllerservices/AccumuloService.java +++ /dev/null @@ -1,324 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.accumulo.controllerservices; - -import org.apache.accumulo.core.client.Accumulo; -import org.apache.accumulo.core.client.AccumuloClient; -import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; -import org.apache.accumulo.core.client.security.tokens.KerberosToken; -import org.apache.accumulo.core.client.security.tokens.PasswordToken; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading; -import org.apache.nifi.annotation.documentation.CapabilityDescription; -import org.apache.nifi.annotation.documentation.Tags; -import org.apache.nifi.annotation.lifecycle.OnDisabled; -import org.apache.nifi.annotation.lifecycle.OnEnabled; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.components.ValidationContext; -import org.apache.nifi.components.ValidationResult; -import org.apache.nifi.controller.AbstractControllerService; -import org.apache.nifi.controller.ConfigurationContext; -import org.apache.nifi.controller.ControllerServiceInitializationContext; -import org.apache.nifi.expression.ExpressionLanguageScope; -import org.apache.nifi.hadoop.SecurityUtil; -import org.apache.nifi.kerberos.KerberosCredentialsService; -import org.apache.nifi.kerberos.KerberosUserService; -import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.reporting.InitializationException; -import org.apache.nifi.security.krb.KerberosKeytabUser; -import org.apache.nifi.security.krb.KerberosPasswordUser; -import org.apache.nifi.security.krb.KerberosUser; - -import java.io.IOException; -import java.security.PrivilegedExceptionAction; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.Properties; - -/** - * Purpose: Controller service that provides us a configured connector. Note that we don't need to close this - * - * Justification: Centralizes the configuration of the connecting accumulo code. This also will be used - * for any kerberos integration. - */ -@RequiresInstanceClassLoading -@Tags({"accumulo", "client", "service"}) -@CapabilityDescription("A controller service for accessing an Accumulo Client.") -public class AccumuloService extends AbstractControllerService implements BaseAccumuloService { - - private enum AuthenticationType { - PASSWORD, - KERBEROS, - NONE - } - - protected static final PropertyDescriptor ZOOKEEPER_QUORUM = new PropertyDescriptor.Builder() - .name("ZooKeeper Quorum") - .displayName("ZooKeeper Quorum") - .description("Comma-separated list of ZooKeeper hosts for Accumulo.") - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT) - .build(); - - protected static final PropertyDescriptor INSTANCE_NAME = new PropertyDescriptor.Builder() - .name("Instance Name") - .displayName("Instance Name") - .description("Instance name of the Accumulo cluster") - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT) - .build(); - - protected static final PropertyDescriptor AUTHENTICATION_TYPE = new PropertyDescriptor.Builder() - .name("accumulo-authentication-type") - .displayName("Authentication Type") - .description("Authentication Type") - .allowableValues(AuthenticationType.values()) - .defaultValue(AuthenticationType.PASSWORD.toString()) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); - - protected static final PropertyDescriptor ACCUMULO_USER = new PropertyDescriptor.Builder() - .name("Accumulo User") - .displayName("Accumulo User") - .description("Connecting user for Accumulo") - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT) - .dependsOn(AUTHENTICATION_TYPE, AuthenticationType.PASSWORD.toString()) - .build(); - - protected static final PropertyDescriptor ACCUMULO_PASSWORD = new PropertyDescriptor.Builder() - .name("Accumulo Password") - .displayName("Accumulo Password") - .description("Connecting user's password") - .sensitive(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT) - .dependsOn(AUTHENTICATION_TYPE, AuthenticationType.PASSWORD.toString()) - .build(); - - protected static final PropertyDescriptor KERBEROS_USER_SERVICE = new PropertyDescriptor.Builder() - .name("kerberos-user-service") - .displayName("Kerberos User Service") - .description("Specifies the Kerberos User Controller Service that should be used for authenticating with Kerberos") - .identifiesControllerService(KerberosUserService.class) - .required(false) - .build(); - - protected static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder() - .name("kerberos-credentials-service") - .displayName("Kerberos Credentials Service") - .description("Specifies the Kerberos Credentials Controller Service that should be used for principal + keytab Kerberos authentication") - .identifiesControllerService(KerberosCredentialsService.class) - .dependsOn(AUTHENTICATION_TYPE, AuthenticationType.KERBEROS.toString()) - .build(); - - protected static final PropertyDescriptor KERBEROS_PRINCIPAL = new PropertyDescriptor.Builder() - .name("kerberos-principal") - .displayName("Kerberos Principal") - .description("Kerberos Principal") - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT) - .dependsOn(AUTHENTICATION_TYPE, AuthenticationType.KERBEROS.toString()) - .build(); - - protected static final PropertyDescriptor KERBEROS_PASSWORD = new PropertyDescriptor.Builder() - .name("kerberos-password") - .displayName("Kerberos Password") - .description("Kerberos Password") - .sensitive(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT) - .dependsOn(AUTHENTICATION_TYPE, AuthenticationType.KERBEROS.toString()) - .build(); - - protected static final PropertyDescriptor ACCUMULO_SASL_QOP = new PropertyDescriptor.Builder() - .name("accumulo-sasl-qop") - .displayName("Accumulo SASL quality of protection") - .description("Accumulo SASL quality of protection for KERBEROS Authentication type") - .allowableValues("auth", "auth-int", "auth-conf") - .defaultValue("auth-conf") - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .dependsOn(AUTHENTICATION_TYPE, AuthenticationType.KERBEROS.toString()) - .build(); - - /** - * Reference to the accumulo client. - */ - AccumuloClient client; - - /** - * properties - */ - private List<PropertyDescriptor> properties; - - private KerberosUser kerberosUser; - - private AuthenticationType authType; - - @Override - protected void init(ControllerServiceInitializationContext config) { - List<PropertyDescriptor> props = new ArrayList<>(); - props.add(ZOOKEEPER_QUORUM); - props.add(INSTANCE_NAME); - props.add(AUTHENTICATION_TYPE); - props.add(ACCUMULO_USER); - props.add(ACCUMULO_PASSWORD); - props.add(KERBEROS_USER_SERVICE); - props.add(KERBEROS_CREDENTIALS_SERVICE); - props.add(KERBEROS_PRINCIPAL); - props.add(KERBEROS_PASSWORD); - props.add(ACCUMULO_SASL_QOP); - properties = Collections.unmodifiableList(props); - } - - @Override - public final List<PropertyDescriptor> getSupportedPropertyDescriptors() { - return properties; - } - - @Override - protected Collection<ValidationResult> customValidate(ValidationContext validationContext) { - final List<ValidationResult> problems = new ArrayList<>(); - - if (!validationContext.getProperty(INSTANCE_NAME).isSet()){ - problems.add(new ValidationResult.Builder().valid(false).subject(INSTANCE_NAME.getName()).explanation("Instance name must be supplied").build()); - } - - if (!validationContext.getProperty(ZOOKEEPER_QUORUM).isSet()){ - problems.add(new ValidationResult.Builder().valid(false).subject(ZOOKEEPER_QUORUM.getName()).explanation("Zookeepers must be supplied").build()); - } - - final AuthenticationType type = validationContext.getProperty( - AUTHENTICATION_TYPE).isSet() ? AuthenticationType.valueOf( validationContext.getProperty(AUTHENTICATION_TYPE).getValue() ) : AuthenticationType.NONE; - - switch(type){ - case PASSWORD: - if (!validationContext.getProperty(ACCUMULO_USER).isSet()){ - problems.add( - new ValidationResult.Builder().valid(false).subject(ACCUMULO_USER.getName()).explanation("Accumulo user must be supplied for the Password Authentication type").build()); - } - if (!validationContext.getProperty(ACCUMULO_PASSWORD).isSet()){ - problems.add( - new ValidationResult.Builder().valid(false).subject(ACCUMULO_PASSWORD.getName()) - .explanation("Password must be supplied for the Password Authentication type").build()); - } - break; - case KERBEROS: - if (!validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).isSet() && !validationContext.getProperty(KERBEROS_USER_SERVICE).isSet() - && !validationContext.getProperty(KERBEROS_PASSWORD).isSet()){ - problems.add(new ValidationResult.Builder().valid(false).subject(AUTHENTICATION_TYPE.getName()) - .explanation("Either Kerberos Password, Kerberos Credential Service, or Kerberos User Service must be set").build()); - } else if (validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).isSet() && validationContext.getProperty(KERBEROS_PASSWORD).isSet()){ - problems.add(new ValidationResult.Builder().valid(false).subject(AUTHENTICATION_TYPE.getName()) - .explanation("Kerberos Password and Kerberos Credential Service should not be filled out at the same time").build()); - } else if (validationContext.getProperty(KERBEROS_PASSWORD).isSet() && !validationContext.getProperty(KERBEROS_PRINCIPAL).isSet()) { - problems.add(new ValidationResult.Builder().valid(false).subject(KERBEROS_PRINCIPAL.getName()) - .explanation("Kerberos Principal must be supplied when principal + password Kerberos authentication is used").build()); - } else if (validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).isSet() && validationContext.getProperty(KERBEROS_PRINCIPAL).isSet()){ - problems.add(new ValidationResult.Builder().valid(false).subject(KERBEROS_PRINCIPAL.getName()) - .explanation("Kerberos Principal (for password) should not be filled out when principal + keytab Kerberos authentication is used").build()); - } else if (validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).isSet() && validationContext.getProperty(KERBEROS_USER_SERVICE).isSet()) { - problems.add(new ValidationResult.Builder().valid(false).subject(KERBEROS_USER_SERVICE.getName()) - .explanation("Kerberos User Service cannot be specified while also specifying a Kerberos Credential Service").build()); - } else if (validationContext.getProperty(KERBEROS_USER_SERVICE).isSet() && validationContext.getProperty(KERBEROS_PASSWORD).isSet()){ - problems.add(new ValidationResult.Builder().valid(false).subject(AUTHENTICATION_TYPE.getName()) - .explanation("Kerberos Password and Kerberos User Service should not be filled out at the same time").build()); - } else if (validationContext.getProperty(KERBEROS_USER_SERVICE).isSet() && validationContext.getProperty(KERBEROS_PRINCIPAL).isSet()){ - problems.add(new ValidationResult.Builder().valid(false).subject(KERBEROS_PRINCIPAL.getName()) - .explanation("Kerberos Principal (for password) should not be filled out when Kerberos User Service is used").build()); - } - break; - default: - problems.add(new ValidationResult.Builder().valid(false).subject(AUTHENTICATION_TYPE.getName()).explanation("Non supported Authentication type").build()); - } - - return problems; - } - - @OnEnabled - public void onEnabled(final ConfigurationContext context) throws InitializationException, IOException, InterruptedException { - if (!context.getProperty(INSTANCE_NAME).isSet() || !context.getProperty(ZOOKEEPER_QUORUM).isSet()) { - throw new InitializationException("Instance name and Zookeeper Quorum must be specified"); - } - - final KerberosUserService kerberosUserService = context.getProperty(KERBEROS_USER_SERVICE).asControllerService(KerberosUserService.class); - final KerberosCredentialsService kerberosCredentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class); - final String instanceName = context.getProperty(INSTANCE_NAME).evaluateAttributeExpressions().getValue(); - final String zookeepers = context.getProperty(ZOOKEEPER_QUORUM).evaluateAttributeExpressions().getValue(); - this.authType = AuthenticationType.valueOf( context.getProperty(AUTHENTICATION_TYPE).getValue()); - - final Properties clientConf = new Properties(); - clientConf.setProperty("instance.zookeepers", zookeepers); - clientConf.setProperty("instance.name", instanceName); - - switch(authType){ - case PASSWORD: - final String accumuloUser = context.getProperty(ACCUMULO_USER).evaluateAttributeExpressions().getValue(); - - final AuthenticationToken token = new PasswordToken(context.getProperty(ACCUMULO_PASSWORD).getValue()); - - this.client = Accumulo.newClient().from(clientConf).as(accumuloUser, token).build(); - break; - case KERBEROS: - if (kerberosUserService != null) { - this.kerberosUser = kerberosUserService.createKerberosUser(); - } else if (kerberosCredentialsService != null) { - this.kerberosUser = new KerberosKeytabUser(kerberosCredentialsService.getPrincipal(), kerberosCredentialsService.getKeytab()); - } else { - this.kerberosUser = new KerberosPasswordUser(context.getProperty(KERBEROS_PRINCIPAL).getValue(), context.getProperty(KERBEROS_PASSWORD).getValue()); - } - - clientConf.setProperty("sasl.enabled", "true"); - clientConf.setProperty("sasl.qop", context.getProperty(ACCUMULO_SASL_QOP).getValue()); - - //Client uses the currently logged in user's security context, so need to login first. - Configuration conf = new Configuration(); - conf.set("hadoop.security.authentication", "kerberos"); - UserGroupInformation.setConfiguration(conf); - final UserGroupInformation clientUgi = SecurityUtil.getUgiForKerberosUser(conf, kerberosUser); - - this.client = clientUgi.doAs((PrivilegedExceptionAction<AccumuloClient>) () -> - Accumulo.newClient().from(clientConf).as(kerberosUser.getPrincipal(), new KerberosToken()).build()); - break; - default: - throw new InitializationException("Not supported authentication type."); - } - } - - @Override - public AccumuloClient getClient() { - return client; - } - - @Override - public void renewTgtIfNecessary() { - if (authType.equals(AuthenticationType.KERBEROS)) { - SecurityUtil.checkTGTAndRelogin(getLogger(), kerberosUser); - } - } - - @OnDisabled - public void shutdown() { - if (client != null) { - client.close(); - } - } -} diff --git a/nifi-extension-bundles/nifi-accumulo-bundle/nifi-accumulo-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-extension-bundles/nifi-accumulo-bundle/nifi-accumulo-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService deleted file mode 100644 index 0e27be47a5..0000000000 --- a/nifi-extension-bundles/nifi-accumulo-bundle/nifi-accumulo-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService +++ /dev/null @@ -1,15 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -org.apache.nifi.accumulo.controllerservices.AccumuloService diff --git a/nifi-extension-bundles/nifi-accumulo-bundle/nifi-accumulo-services/src/test/java/org/apache/nifi/accumulo/controllerservices/TestAccumuloService.java b/nifi-extension-bundles/nifi-accumulo-bundle/nifi-accumulo-services/src/test/java/org/apache/nifi/accumulo/controllerservices/TestAccumuloService.java deleted file mode 100644 index 8424da13a0..0000000000 --- a/nifi-extension-bundles/nifi-accumulo-bundle/nifi-accumulo-services/src/test/java/org/apache/nifi/accumulo/controllerservices/TestAccumuloService.java +++ /dev/null @@ -1,214 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.accumulo.controllerservices; - -import org.apache.nifi.kerberos.KerberosCredentialsService; -import org.apache.nifi.kerberos.KerberosUserService; -import org.apache.nifi.processor.Processor; -import org.apache.nifi.reporting.InitializationException; -import org.apache.nifi.util.TestRunner; -import org.apache.nifi.util.TestRunners; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -public class TestAccumuloService { - - private static final String INSTANCE = "instance"; - private static final String ZOOKEEPER = "zookeeper"; - private static final String PASSWORD = "PASSWORD"; - private static final String USER = "USER"; - private static final String KERBEROS = "KERBEROS"; - private static final String PRINCIPAL = "principal"; - private static final String KERBEROS_PASSWORD = "kerberos_password"; - private static final String NONE = "NONE"; - - private TestRunner runner; - private AccumuloService accumuloService; - - private final KerberosCredentialsService credentialService = mock(KerberosCredentialsService.class); - private final KerberosUserService kerberosUserService = mock(KerberosUserService.class); - private final Processor dummyProcessor = mock(Processor.class); - - @BeforeEach - public void init() { - runner = TestRunners.newTestRunner(dummyProcessor); - accumuloService = new AccumuloService(); - - when(credentialService.getIdentifier()).thenReturn("1"); - when(kerberosUserService.getIdentifier()).thenReturn("kerberosUserService1"); - } - - @Test - public void testServiceValidWithAuthTypePasswordAndInstanceZookeeperUserPasswordAreSet() throws InitializationException { - runner.addControllerService("accumulo-connector-service", accumuloService); - runner.setProperty(accumuloService, AccumuloService.INSTANCE_NAME, INSTANCE); - runner.setProperty(accumuloService, AccumuloService.ZOOKEEPER_QUORUM, ZOOKEEPER); - runner.setProperty(accumuloService, AccumuloService.AUTHENTICATION_TYPE, PASSWORD); - runner.setProperty(accumuloService, AccumuloService.ACCUMULO_USER, USER); - runner.setProperty(accumuloService, AccumuloService.ACCUMULO_PASSWORD, PASSWORD); - - runner.assertValid(accumuloService); - } - - @Test - public void testServiceNotValidWithInstanceMissing() throws InitializationException { - runner.addControllerService("accumulo-connector-service", accumuloService); - runner.setProperty(accumuloService, AccumuloService.ZOOKEEPER_QUORUM, ZOOKEEPER); - - assertServiceIsInvalidWithErrorMessage("Instance name must be supplied"); - } - - @Test - public void testServiceNotValidWithZookeeperMissing() throws InitializationException { - runner.addControllerService("accumulo-connector-service", accumuloService); - runner.setProperty(accumuloService, AccumuloService.INSTANCE_NAME, INSTANCE); - - assertServiceIsInvalidWithErrorMessage("Zookeepers must be supplied"); - } - - @Test - public void testServiceNotValidWithAuthTypeNone() throws InitializationException { - runner.addControllerService("accumulo-connector-service", accumuloService); - runner.setProperty(accumuloService, AccumuloService.INSTANCE_NAME, INSTANCE); - runner.setProperty(accumuloService, AccumuloService.ZOOKEEPER_QUORUM, ZOOKEEPER); - runner.setProperty(accumuloService, AccumuloService.AUTHENTICATION_TYPE, NONE); - - assertServiceIsInvalidWithErrorMessage("Non supported Authentication type"); - } - - @Test - public void testServiceNotValidWithAuthTypePasswordAndUserMissing() throws InitializationException { - runner.addControllerService("accumulo-connector-service", accumuloService); - runner.setProperty(accumuloService, AccumuloService.INSTANCE_NAME, INSTANCE); - runner.setProperty(accumuloService, AccumuloService.ZOOKEEPER_QUORUM, ZOOKEEPER); - runner.setProperty(accumuloService, AccumuloService.AUTHENTICATION_TYPE, PASSWORD); - runner.setProperty(accumuloService, AccumuloService.ACCUMULO_PASSWORD, PASSWORD); - - assertServiceIsInvalidWithErrorMessage("Accumulo user must be supplied"); - } - - @Test - public void testServiceNotValidWithAuthTypePasswordAndPasswordMissing() throws InitializationException { - runner.addControllerService("accumulo-connector-service", accumuloService); - runner.setProperty(accumuloService, AccumuloService.INSTANCE_NAME, INSTANCE); - runner.setProperty(accumuloService, AccumuloService.ZOOKEEPER_QUORUM, ZOOKEEPER); - runner.setProperty(accumuloService, AccumuloService.AUTHENTICATION_TYPE, PASSWORD); - runner.setProperty(accumuloService, AccumuloService.ACCUMULO_USER, USER); - - assertServiceIsInvalidWithErrorMessage("Password must be supplied"); - } - - @Test - public void testServiceNotValidWithAuthTypeKerberosAndKerberosPasswordAndCredentialServiceMissing() throws InitializationException { - runner.addControllerService("accumulo-connector-service", accumuloService); - runner.setProperty(accumuloService, AccumuloService.INSTANCE_NAME, INSTANCE); - runner.setProperty(accumuloService, AccumuloService.ZOOKEEPER_QUORUM, ZOOKEEPER); - runner.setProperty(accumuloService, AccumuloService.AUTHENTICATION_TYPE, KERBEROS); - - assertServiceIsInvalidWithErrorMessage("Either Kerberos Password, Kerberos Credential Service, or Kerberos User Service must be set"); - } - - @Test - public void testServiceNotValidWithAuthTypeKerberosAndKerberosPrincipalMissing() throws InitializationException { - runner.addControllerService("accumulo-connector-service", accumuloService); - runner.setProperty(accumuloService, AccumuloService.INSTANCE_NAME, INSTANCE); - runner.setProperty(accumuloService, AccumuloService.ZOOKEEPER_QUORUM, ZOOKEEPER); - runner.setProperty(accumuloService, AccumuloService.AUTHENTICATION_TYPE, KERBEROS); - runner.setProperty(accumuloService, AccumuloService.KERBEROS_PASSWORD, KERBEROS_PASSWORD); - - assertServiceIsInvalidWithErrorMessage("Kerberos Principal must be supplied"); - } - - @Test - public void testServiceNotValidWithAuthTypeKerberosAndKerberosPasswordAndCredentialServiceSet() throws InitializationException { - runner.addControllerService("accumulo-connector-service", accumuloService); - runner.setProperty(accumuloService, AccumuloService.INSTANCE_NAME, INSTANCE); - runner.setProperty(accumuloService, AccumuloService.ZOOKEEPER_QUORUM, ZOOKEEPER); - runner.setProperty(accumuloService, AccumuloService.AUTHENTICATION_TYPE, KERBEROS); - runner.setProperty(accumuloService, AccumuloService.KERBEROS_PASSWORD, KERBEROS_PASSWORD); - runner.addControllerService("kerberos-credentials-service", credentialService); - runner.setProperty(accumuloService, AccumuloService.KERBEROS_CREDENTIALS_SERVICE, credentialService.getIdentifier()); - - assertServiceIsInvalidWithErrorMessage("should not be filled out at the same time"); - } - - @Test - public void testServiceNotValidWithAuthTypeKerberosAndPrincipalAndCredentialServiceSet() throws InitializationException { - runner.addControllerService("accumulo-connector-service", accumuloService); - runner.setProperty(accumuloService, AccumuloService.INSTANCE_NAME, INSTANCE); - runner.setProperty(accumuloService, AccumuloService.ZOOKEEPER_QUORUM, ZOOKEEPER); - runner.setProperty(accumuloService, AccumuloService.AUTHENTICATION_TYPE, KERBEROS); - runner.setProperty(accumuloService, AccumuloService.KERBEROS_PRINCIPAL, PRINCIPAL); - runner.addControllerService("kerberos-credentials-service", credentialService); - runner.setProperty(accumuloService, AccumuloService.KERBEROS_CREDENTIALS_SERVICE, credentialService.getIdentifier()); - - assertServiceIsInvalidWithErrorMessage("Kerberos Principal (for password) should not be filled out"); - } - - @Test - public void testServiceNotValidWithAuthTypeKerberosAndKerberosPasswordAndUserServiceSet() throws InitializationException { - runner.addControllerService("accumulo-connector-service", accumuloService); - runner.setProperty(accumuloService, AccumuloService.INSTANCE_NAME, INSTANCE); - runner.setProperty(accumuloService, AccumuloService.ZOOKEEPER_QUORUM, ZOOKEEPER); - runner.setProperty(accumuloService, AccumuloService.AUTHENTICATION_TYPE, KERBEROS); - runner.setProperty(accumuloService, AccumuloService.KERBEROS_PRINCIPAL, PRINCIPAL); - runner.setProperty(accumuloService, AccumuloService.KERBEROS_PASSWORD, KERBEROS_PASSWORD); - runner.addControllerService("kerberos-user-service", kerberosUserService); - runner.setProperty(accumuloService, AccumuloService.KERBEROS_USER_SERVICE, kerberosUserService.getIdentifier()); - - assertServiceIsInvalidWithErrorMessage("should not be filled out at the same time"); - } - - @Test - public void testServiceNotValidWithAuthTypeKerberosAndCredentialServiceAndUserServiceSet() throws InitializationException { - runner.addControllerService("accumulo-connector-service", accumuloService); - runner.setProperty(accumuloService, AccumuloService.INSTANCE_NAME, INSTANCE); - runner.setProperty(accumuloService, AccumuloService.ZOOKEEPER_QUORUM, ZOOKEEPER); - runner.setProperty(accumuloService, AccumuloService.AUTHENTICATION_TYPE, KERBEROS); - - runner.addControllerService("kerberos-credentials-service", credentialService); - runner.setProperty(accumuloService, AccumuloService.KERBEROS_CREDENTIALS_SERVICE, credentialService.getIdentifier()); - - runner.addControllerService("kerberos-user-service", kerberosUserService); - runner.setProperty(accumuloService, AccumuloService.KERBEROS_USER_SERVICE, kerberosUserService.getIdentifier()); - - assertServiceIsInvalidWithErrorMessage("Kerberos User Service cannot be specified while also specifying a Kerberos Credential Service"); - } - - @Test - public void testServiceIsValidWithAuthTypeKerberosAndKerberosUserServiceSet() throws InitializationException { - runner.addControllerService("accumulo-connector-service", accumuloService); - runner.setProperty(accumuloService, AccumuloService.INSTANCE_NAME, INSTANCE); - runner.setProperty(accumuloService, AccumuloService.ZOOKEEPER_QUORUM, ZOOKEEPER); - runner.setProperty(accumuloService, AccumuloService.AUTHENTICATION_TYPE, KERBEROS); - runner.addControllerService("kerberos-user-service", kerberosUserService); - runner.enableControllerService(kerberosUserService); - runner.setProperty(accumuloService, AccumuloService.KERBEROS_USER_SERVICE, kerberosUserService.getIdentifier()); - runner.assertValid(accumuloService); - } - - private void assertServiceIsInvalidWithErrorMessage(String errorMessage) { - Exception exception = assertThrows(IllegalStateException.class, () -> runner.enableControllerService(accumuloService)); - assertTrue(exception.getMessage().contains(errorMessage)); - } -} \ No newline at end of file diff --git a/nifi-extension-bundles/nifi-accumulo-bundle/pom.xml b/nifi-extension-bundles/nifi-accumulo-bundle/pom.xml deleted file mode 100644 index 5c64d96a56..0000000000 --- a/nifi-extension-bundles/nifi-accumulo-bundle/pom.xml +++ /dev/null @@ -1,85 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor - license agreements. See the NOTICE file distributed with this work for additional - information regarding copyright ownership. The ASF licenses this file to - You under the Apache License, Version 2.0 (the "License"); you may not use - this file except in compliance with the License. You may obtain a copy of - the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required - by applicable law or agreed to in writing, software distributed under the - License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS - OF ANY KIND, either express or implied. See the License for the specific - language governing permissions and limitations under the License. --> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-standard-services-api-bom</artifactId> - <version>2.0.0-SNAPSHOT</version> - <relativePath>../nifi-standard-services-api-bom</relativePath> - </parent> - - <properties> - <accumulo.version>2.1.2</accumulo.version> - <guava.version>33.2.0-jre</guava.version> - </properties> - - <artifactId>nifi-accumulo-bundle</artifactId> - <packaging>pom</packaging> - - <modules> - <module>nifi-accumulo-services-api</module> - <module>nifi-accumulo-services-api-nar</module> - <module>nifi-accumulo-services</module> - <module>nifi-accumulo-services-nar</module> - <module>nifi-accumulo-processors</module> - <module>nifi-accumulo-nar</module> - </modules> - <dependencyManagement> - <dependencies> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-accumulo-processors</artifactId> - <version>2.0.0-SNAPSHOT</version> - </dependency> - <!-- Override nimbus-jose-jwt 9.8.1 from hadoop-auth --> - <dependency> - <groupId>com.nimbusds</groupId> - <artifactId>nimbus-jose-jwt</artifactId> - <version>9.37.3</version> - </dependency> - <!-- Override Hadoop from accumulo-core --> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-client-api</artifactId> - <version>${hadoop.version}</version> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-client-runtime</artifactId> - <version>${hadoop.version}</version> - </dependency> - <dependency> - <groupId>org.apache.accumulo</groupId> - <artifactId>accumulo-core</artifactId> - <version>${accumulo.version}</version> - <exclusions> - <exclusion> - <groupId>commons-logging</groupId> - <artifactId>commons-logging</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.logging.log4j</groupId> - <artifactId>log4j-1.2-api</artifactId> - </exclusion> - </exclusions> - </dependency> - <!-- Override Guava 31.1 --> - <dependency> - <groupId>com.google.guava</groupId> - <artifactId>guava</artifactId> - <version>${guava.version}</version> - </dependency> - </dependencies> - </dependencyManagement> -</project> diff --git a/nifi-extension-bundles/pom.xml b/nifi-extension-bundles/pom.xml index c999eec3ac..fa85ffa68c 100755 --- a/nifi-extension-bundles/pom.xml +++ b/nifi-extension-bundles/pom.xml @@ -78,7 +78,6 @@ <module>nifi-prometheus-bundle</module> <module>nifi-sql-reporting-bundle</module> <module>nifi-hazelcast-bundle</module> - <module>nifi-accumulo-bundle</module> <module>nifi-asn1-bundle</module> <module>nifi-pgp-bundle</module> <module>nifi-hashicorp-vault-bundle</module>