This is an automated email from the ASF dual-hosted git repository.

turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
     new b1be71f918 NIFI-11334: Fixed PutIceberg processor instance interference due to same class loader usage
b1be71f918 is described below

commit b1be71f918e45497099b069d04482bde8aff025d
Author: Mark Bathori <bathori.m...@gmail.com>
AuthorDate: Thu Jun 29 08:59:43 2023 +0200

    NIFI-11334: Fixed PutIceberg processor instance interference due to same class loader usage

    This closes #7449.

    Signed-off-by: Peter Turcsanyi <turcsa...@apache.org>
---
 .../nifi-iceberg-processors-nar/pom.xml            | 258 +++++++++++++--------
 .../nifi-iceberg-processors/pom.xml                | 102 +++++++-
 .../iceberg/AbstractIcebergProcessor.java          |  22 +-
 .../nifi/processors/iceberg/IcebergUtils.java}     |  28 ++-
 .../apache/nifi/processors/iceberg/PutIceberg.java |   7 +-
 .../iceberg/catalog/IcebergCatalogFactory.java     |  87 +++++++
 .../processors/iceberg/TestDataFileActions.java    |   4 +-
 .../iceberg/TestPutIcebergCustomValidation.java    |  24 +-
 .../iceberg/TestPutIcebergWithHadoopCatalog.java   |  15 +-
 .../iceberg/TestPutIcebergWithHiveCatalog.java     |  29 ++-
 .../iceberg/catalog/TestHadoopCatalogService.java  |  25 +-
 .../iceberg/catalog/TestHiveCatalogService.java    |  64 ++---
 .../src/test/resources/secured-core-site.xml       |  22 ++
 .../src/test/resources/unsecured-core-site.xml     |  22 ++
 .../nifi-iceberg-services-api-nar/pom.xml          | 162 -------------
 .../nifi-iceberg-services-api/pom.xml              | 115 ---------
 ...logService.java => IcebergCatalogProperty.java} |  21 +-
 .../services/iceberg/IcebergCatalogService.java    |  11 +-
 ...CatalogService.java => IcebergCatalogType.java} |  15 +-
 .../nifi-iceberg-services/pom.xml                  |   5 +
 .../services/iceberg/AbstractCatalogService.java   |  62 +++--
 .../services/iceberg/HadoopCatalogService.java     |  24 +-
 .../nifi/services/iceberg/HiveCatalogService.java  |  62 ++--
 23 files changed, 631 insertions(+), 555 deletions(-)
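In short: every PutIceberg instance previously loaded the Iceberg, Hive, and Hadoop client libraries through the shared nifi-iceberg-services-api NAR class loader, so static state in those libraries (Hadoop's security state, for example) could let one processor instance interfere with another. The commit bundles the client libraries into the processor NAR, marks the processor with @RequiresInstanceClassLoading so each instance gets its own copy of them, and narrows the controller-service API to plain values (catalog type, catalog properties, configuration file paths) so that no Iceberg or Hadoop classes cross the service boundary; a sketch of the class-loading pattern follows the AbstractIcebergProcessor diff below.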
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors-nar/pom.xml b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors-nar/pom.xml
index 3c7e102bca..4ed687eb42 100644
--- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors-nar/pom.xml
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors-nar/pom.xml
@@ -43,101 +43,165 @@
         </dependency>
     </dependencies>
 
-    <dependencyManagement>
-        <dependencies>
-            <!-- Provided through nifi-iceberg-services-api -->
-            <dependency>
-                <groupId>org.apache.iceberg</groupId>
-                <artifactId>iceberg-core</artifactId>
-                <scope>provided</scope>
-            </dependency>
-            <dependency>
-                <groupId>org.apache.hive</groupId>
-                <artifactId>hive-shims</artifactId>
-                <scope>provided</scope>
-            </dependency>
-            <dependency>
-                <groupId>org.codehaus.groovy</groupId>
-                <artifactId>groovy-all</artifactId>
-                <scope>provided</scope>
-            </dependency>
-            <dependency>
-                <groupId>org.apache.hadoop</groupId>
-                <artifactId>hadoop-common</artifactId>
-                <scope>provided</scope>
-                <exclusions>
-                    <exclusion>
-                        <groupId>org.slf4j</groupId>
-                        <artifactId>slf4j-reload4j</artifactId>
-                    </exclusion>
-                </exclusions>
-            </dependency>
-            <dependency>
-                <groupId>org.apache.hadoop</groupId>
-                <artifactId>hadoop-yarn-api</artifactId>
-                <scope>provided</scope>
-            </dependency>
-            <dependency>
-                <groupId>org.apache.hadoop</groupId>
-                <artifactId>hadoop-yarn-registry</artifactId>
-                <scope>provided</scope>
-            </dependency>
-            <dependency>
-                <groupId>org.apache.zookeeper</groupId>
-                <artifactId>zookeeper</artifactId>
-                <scope>provided</scope>
-            </dependency>
-            <dependency>
-                <groupId>org.apache.curator</groupId>
-                <artifactId>curator-client</artifactId>
-                <scope>provided</scope>
-            </dependency>
-            <dependency>
-                <groupId>org.apache.curator</groupId>
-                <artifactId>curator-framework</artifactId>
-                <scope>provided</scope>
-            </dependency>
-            <dependency>
-                <groupId>com.fasterxml.jackson.core</groupId>
-                <artifactId>jackson-databind</artifactId>
-                <scope>provided</scope>
-            </dependency>
-            <dependency>
-                <groupId>com.fasterxml.jackson.core</groupId>
-                <artifactId>jackson-core</artifactId>
-                <scope>provided</scope>
-            </dependency>
-            <dependency>
-                <groupId>com.fasterxml.jackson.core</groupId>
-                <artifactId>jackson-annotations</artifactId>
-                <scope>provided</scope>
-            </dependency>
-            <dependency>
-                <groupId>org.xerial.snappy</groupId>
-                <artifactId>snappy-java</artifactId>
-                <scope>provided</scope>
-            </dependency>
-            <dependency>
-                <groupId>org.apache.ant</groupId>
-                <artifactId>ant</artifactId>
-                <scope>provided</scope>
-            </dependency>
-            <dependency>
-                <groupId>org.apache.ivy</groupId>
-                <artifactId>ivy</artifactId>
-                <scope>provided</scope>
-            </dependency>
-            <dependency>
-                <groupId>org.apache.orc</groupId>
-                <artifactId>orc-core</artifactId>
-                <scope>provided</scope>
-                <classifier>nohive</classifier>
-            </dependency>
-            <dependency>
-                <groupId>org.apache.parquet</groupId>
-                <artifactId>parquet-avro</artifactId>
-                <scope>provided</scope>
-            </dependency>
-        </dependencies>
-    </dependencyManagement>
+    <profiles>
+        <!-- Includes hadoop-aws for accessing HDFS with an s3a:// filesystem -->
+        <profile>
+            <id>include-hadoop-aws</id>
+            <activation>
+                <activeByDefault>false</activeByDefault>
+            </activation>
+            <dependencies>
+                <dependency>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-aws</artifactId>
+                    <version>${hadoop.version}</version>
+                </dependency>
+            </dependencies>
+        </profile>
+        <!-- Includes hadoop-azure and hadoop-azure-datalake for accessing HDFS with wasb://, abfs://, and adl:// filesystems -->
+        <profile>
+            <id>include-hadoop-azure</id>
+            <activation>
+                <activeByDefault>false</activeByDefault>
+            </activation>
+            <dependencies>
+                <dependency>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-azure</artifactId>
+                    <version>${hadoop.version}</version>
+                    <exclusions>
+                        <exclusion>
+                            <groupId>com.google.guava</groupId>
+                            <artifactId>guava</artifactId>
+                        </exclusion>
+                        <exclusion>
+                            <groupId>com.fasterxml.jackson.core</groupId>
+                            <artifactId>jackson-core</artifactId>
+                        </exclusion>
+                        <exclusion>
+                            <groupId>commons-logging</groupId>
+                            <artifactId>commons-logging</artifactId>
+                        </exclusion>
+                    </exclusions>
+                </dependency>
+                <dependency>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-azure-datalake</artifactId>
+                    <version>${hadoop.version}</version>
+                    <exclusions>
+                        <exclusion>
+                            <groupId>com.fasterxml.jackson.core</groupId>
+                            <artifactId>jackson-core</artifactId>
+                        </exclusion>
+                    </exclusions>
+                </dependency>
+            </dependencies>
+        </profile>
+        <!-- Includes hadoop-cloud-storage -->
+        <profile>
+            <id>include-hadoop-cloud-storage</id>
+            <activation>
+                <activeByDefault>false</activeByDefault>
+            </activation>
+            <dependencies>
+                <dependency>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-cloud-storage</artifactId>
+                    <version>${hadoop.version}</version>
+                    <exclusions>
+                        <exclusion>
+                            <groupId>log4j</groupId>
+                            <artifactId>log4j</artifactId>
+                        </exclusion>
+                        <exclusion>
+                            <groupId>org.apache.logging.log4j</groupId>
+                            <artifactId>log4j-core</artifactId>
+                        </exclusion>
+                        <exclusion>
+                            <groupId>org.slf4j</groupId>
+                            <artifactId>slf4j-log4j12</artifactId>
+                        </exclusion>
+                        <exclusion>
+                            <groupId>commons-logging</groupId>
+                            <artifactId>commons-logging</artifactId>
+                        </exclusion>
+                    </exclusions>
+                </dependency>
+            </dependencies>
+        </profile>
+        <!-- Includes hadoop-ozone for o3fs:// file system -->
+        <profile>
+            <id>include-hadoop-ozone</id>
+            <activation>
+                <activeByDefault>false</activeByDefault>
+            </activation>
+            <dependencies>
+                <dependency>
+                    <groupId>org.apache.ozone</groupId>
+                    <artifactId>ozone-client</artifactId>
+                    <version>${ozone.version}</version>
+                    <exclusions>
+                        <exclusion>
+                            <groupId>commons-logging</groupId>
+                            <artifactId>commons-logging</artifactId>
+                        </exclusion>
+                        <exclusion>
+                            <groupId>org.apache.logging.log4j</groupId>
+                            <artifactId>log4j-core</artifactId>
+                        </exclusion>
+                        <exclusion>
+                            <groupId>org.bouncycastle</groupId>
+                            <artifactId>bcprov-jdk15on</artifactId>
+                        </exclusion>
+                        <exclusion>
+                            <groupId>org.bouncycastle</groupId>
+                            <artifactId>bcpkix-jdk15on</artifactId>
+                        </exclusion>
+                    </exclusions>
+                </dependency>
+                <dependency>
+                    <groupId>org.apache.ozone</groupId>
+                    <artifactId>ozone-filesystem</artifactId>
+                    <version>${ozone.version}</version>
+                </dependency>
+                <dependency>
+                    <groupId>org.bouncycastle</groupId>
+                    <artifactId>bcprov-jdk18on</artifactId>
+                </dependency>
+                <dependency>
+                    <groupId>org.bouncycastle</groupId>
+                    <artifactId>bcpkix-jdk18on</artifactId>
+                </dependency>
+            </dependencies>
+        </profile>
+        <!-- Includes hadoop-gcp for accessing HDFS with an gcs:// filesystem -->
+        <profile>
+            <id>include-hadoop-gcp</id>
+            <activation>
+                <activeByDefault>false</activeByDefault>
+            </activation>
+            <dependencies>
+                <dependency>
+                    <groupId>com.google.cloud.bigdataoss</groupId>
+                    <artifactId>gcs-connector</artifactId>
+                    <version>hadoop3-${gcs.version}</version>
+                </dependency>
+                <dependency>
+                    <groupId>com.google.cloud.bigdataoss</groupId>
+                    <artifactId>util</artifactId>
+                    <version>${gcs.version}</version>
+                </dependency>
+                <dependency>
+                    <groupId>com.google.cloud.bigdataoss</groupId>
+                    <artifactId>util-hadoop</artifactId>
+                    <version>hadoop3-${gcs.version}</version>
+                </dependency>
+                <dependency>
+                    <groupId>com.google.cloud.bigdataoss</groupId>
+                    <artifactId>gcsio</artifactId>
+                    <version>${gcs.version}</version>
+                </dependency>
+            </dependencies>
+        </profile>
+    </profiles>
 </project>
\ No newline at end of file
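Note that all of these profiles are declared with activeByDefault set to false, so none of the optional cloud connectors is bundled unless a build opts in explicitly, e.g. with an invocation along the lines of "mvn clean install -P include-hadoop-aws" (an illustrative command, not part of the commit).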
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/pom.xml b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/pom.xml
index 61d40d0365..96e67ce03f 100644
--- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/pom.xml
@@ -74,13 +74,27 @@
             <groupId>org.apache.iceberg</groupId>
             <artifactId>iceberg-core</artifactId>
             <version>${iceberg.version}</version>
-            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-hive-metastore</artifactId>
+            <version>${iceberg.version}</version>
         </dependency>
         <dependency>
             <groupId>org.apache.iceberg</groupId>
             <artifactId>iceberg-data</artifactId>
             <version>${iceberg.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-parquet</artifactId>
+            <version>${iceberg.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-orc</artifactId>
+            <version>${iceberg.version}</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.hive</groupId>
             <artifactId>hive-exec</artifactId>
@@ -171,18 +185,100 @@
         </dependency>
         <dependency>
             <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-hdfs</artifactId>
+            <artifactId>hadoop-client</artifactId>
             <version>${hadoop.version}</version>
-            <scope>provided</scope>
             <exclusions>
                 <exclusion>
                     <groupId>log4j</groupId>
                     <artifactId>log4j</artifactId>
                 </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-reload4j</artifactId>
+                </exclusion>
                 <exclusion>
                     <groupId>commons-logging</groupId>
                     <artifactId>commons-logging</artifactId>
                 </exclusion>
+                <exclusion>
+                    <groupId>javax.servlet</groupId>
+                    <artifactId>javax.servlet-api</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hive</groupId>
+            <artifactId>hive-metastore</artifactId>
+            <version>${hive.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>commons-logging</groupId>
+                    <artifactId>commons-logging</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-web</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-1.2-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-slf4j-impl</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.orc</groupId>
+                    <artifactId>orc-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hbase</groupId>
+                    <artifactId>hbase-client</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>co.cask.tephra</groupId>
+                    <artifactId>tephra-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>co.cask.tephra</groupId>
+                    <artifactId>tephra-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>co.cask.tephra</groupId>
+                    <artifactId>tephra-hbase-compat-1.0</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.parquet</groupId>
+                    <artifactId>parquet-hadoop-bundle</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.tdunning</groupId>
+                    <artifactId>json</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.zaxxer</groupId>
+                    <artifactId>HikariCP</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.google.guava</groupId>
+                    <artifactId>guava</artifactId>
+                </exclusion>
             </exclusions>
         </dependency>
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/AbstractIcebergProcessor.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/AbstractIcebergProcessor.java
index 9f527344ec..3352959d17 100644
--- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/AbstractIcebergProcessor.java
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/AbstractIcebergProcessor.java
@@ -18,9 +18,12 @@
 package org.apache.nifi.processors.iceberg;
 
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.ClassloaderIsolationKeyProvider;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.context.PropertyContext;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.kerberos.KerberosUserService;
 import org.apache.nifi.processor.AbstractProcessor;
@@ -36,11 +39,13 @@ import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 
 import static org.apache.nifi.hadoop.SecurityUtil.getUgiForKerberosUser;
+import static org.apache.nifi.processors.iceberg.IcebergUtils.getConfigurationFromFiles;
 
 /**
  * Base Iceberg processor class.
  */
-public abstract class AbstractIcebergProcessor extends AbstractProcessor {
+@RequiresInstanceClassLoading(cloneAncestorResources = true)
+public abstract class AbstractIcebergProcessor extends AbstractProcessor implements ClassloaderIsolationKeyProvider {
 
     public static final PropertyDescriptor CATALOG = new PropertyDescriptor.Builder()
             .name("catalog-service")
@@ -66,14 +71,14 @@ public abstract class AbstractIcebergProcessor extends AbstractProcessor {
     private volatile UserGroupInformation ugi;
 
     @OnScheduled
-    public final void onScheduled(final ProcessContext context) {
+    public void onScheduled(final ProcessContext context) {
         final IcebergCatalogService catalogService = context.getProperty(CATALOG).asControllerService(IcebergCatalogService.class);
         final KerberosUserService kerberosUserService = context.getProperty(KERBEROS_USER_SERVICE).asControllerService(KerberosUserService.class);
 
         if (kerberosUserService != null) {
             this.kerberosUser = kerberosUserService.createKerberosUser();
             try {
-                this.ugi = getUgiForKerberosUser(catalogService.getConfiguration(), kerberosUser);
+                this.ugi = getUgiForKerberosUser(getConfigurationFromFiles(catalogService.getConfigFilePaths()), kerberosUser);
             } catch (IOException e) {
                 throw new ProcessException("Kerberos Authentication failed", e);
             }
@@ -81,7 +86,7 @@ public abstract class AbstractIcebergProcessor extends AbstractProcessor {
     }
 
     @OnStopped
-    public final void onStopped() {
+    public void onStopped() {
         if (kerberosUser != null) {
             try {
                 kerberosUser.logout();
@@ -117,6 +122,15 @@ public abstract class AbstractIcebergProcessor extends AbstractProcessor {
         }
     }
 
+    @Override
+    public String getClassloaderIsolationKey(PropertyContext context) {
+        final KerberosUserService kerberosUserService = context.getProperty(KERBEROS_USER_SERVICE).asControllerService(KerberosUserService.class);
+        if (kerberosUserService != null) {
+            return kerberosUserService.getIdentifier();
+        }
+        return null;
+    }
+
     private UserGroupInformation getUgi() {
         try {
             kerberosUser.checkTGTAndRelogin();
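For readers unfamiliar with NiFi's instance class loading, the hunk above is the crux of the fix. @RequiresInstanceClassLoading gives every processor instance its own copy of the NAR class loader (cloneAncestorResources = true also clones ancestor resources such as dynamically added configuration files), while ClassloaderIsolationKeyProvider lets instances that must share state, here instances bound to the same Kerberos user, opt back into a shared class loader. A minimal sketch of the same pattern, compiled against nifi-api, with a hypothetical property name used only for illustration:

    import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
    import org.apache.nifi.components.ClassloaderIsolationKeyProvider;
    import org.apache.nifi.components.PropertyDescriptor;
    import org.apache.nifi.context.PropertyContext;
    import org.apache.nifi.processor.AbstractProcessor;
    import org.apache.nifi.processor.util.StandardValidators;

    // Each instance of this processor gets an isolated copy of the NAR class loader,
    // so static state in bundled libraries (Hadoop, Hive, ...) cannot leak between instances.
    @RequiresInstanceClassLoading(cloneAncestorResources = true)
    public abstract class IsolatedProcessorSketch extends AbstractProcessor implements ClassloaderIsolationKeyProvider {

        // Hypothetical property, present only for this sketch.
        static final PropertyDescriptor SHARED_KEY = new PropertyDescriptor.Builder()
                .name("shared-key")
                .required(false)
                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
                .build();

        @Override
        public String getClassloaderIsolationKey(PropertyContext context) {
            // Instances returning the same non-null key share one class loader copy;
            // returning null keeps every instance fully isolated.
            return context.getProperty(SHARED_KEY).getValue();
        }
    }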
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/src/main/java/org/apache/nifi/services/iceberg/IcebergCatalogService.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/IcebergUtils.java
similarity index 55%
copy from nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/src/main/java/org/apache/nifi/services/iceberg/IcebergCatalogService.java
copy to nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/IcebergUtils.java
index 991ac625dc..7a3db7de71 100644
--- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/src/main/java/org/apache/nifi/services/iceberg/IcebergCatalogService.java
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/IcebergUtils.java
@@ -15,18 +15,28 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.nifi.services.iceberg;
+package org.apache.nifi.processors.iceberg;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.catalog.Catalog;
-import org.apache.nifi.controller.ControllerService;
+import org.apache.hadoop.fs.Path;
 
-/**
- * Provides a basic connector to Iceberg catalog services.
- */
-public interface IcebergCatalogService extends ControllerService {
+import java.util.List;
 
-    Catalog getCatalog();
+public class IcebergUtils {
 
-    Configuration getConfiguration();
+    /**
+     * Loads configuration files from the provided paths.
+     *
+     * @param configFilePaths list of config file paths separated with comma
+     * @return merged configuration
+     */
+    public static Configuration getConfigurationFromFiles(List<String> configFilePaths) {
+        final Configuration conf = new Configuration();
+        if (configFilePaths != null) {
+            for (final String configFile : configFilePaths) {
+                conf.addResource(new Path(configFile.trim()));
+            }
+        }
+        return conf;
+    }
 }
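A quick illustration of how this helper merges Hadoop resources; the file paths below are hypothetical, and later resources override earlier ones for duplicate keys, following Hadoop's Configuration.addResource() semantics:

    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import static org.apache.nifi.processors.iceberg.IcebergUtils.getConfigurationFromFiles;

    public class ConfigMergeExample {
        public static void main(String[] args) {
            // Merge two site files into one Configuration (paths are illustrative).
            Configuration conf = getConfigurationFromFiles(
                    List.of("/etc/hadoop/conf/core-site.xml", "/etc/hadoop/conf/hdfs-site.xml"));
            System.out.println(conf.get("hadoop.security.authentication", "simple"));
        }
    }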
" + @@ -208,7 +210,7 @@ public class PutIceberg extends AbstractIcebergProcessor { if (catalogServiceEnabled) { final boolean kerberosUserServiceIsSet = context.getProperty(KERBEROS_USER_SERVICE).isSet(); - final boolean securityEnabled = SecurityUtil.isSecurityEnabled(catalogService.getConfiguration()); + final boolean securityEnabled = SecurityUtil.isSecurityEnabled(getConfigurationFromFiles(catalogService.getConfigFilePaths())); if (securityEnabled && !kerberosUserServiceIsSet) { problems.add(new ValidationResult.Builder() @@ -293,7 +295,8 @@ public class PutIceberg extends AbstractIcebergProcessor { final String catalogNamespace = context.getProperty(CATALOG_NAMESPACE).evaluateAttributeExpressions(flowFile).getValue(); final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue(); - final Catalog catalog = catalogService.getCatalog(); + final IcebergCatalogFactory catalogFactory = new IcebergCatalogFactory(catalogService); + final Catalog catalog = catalogFactory.create(); final Namespace namespace = Namespace.of(catalogNamespace); final TableIdentifier tableIdentifier = TableIdentifier.of(namespace, tableName); diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/catalog/IcebergCatalogFactory.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/catalog/IcebergCatalogFactory.java new file mode 100644 index 0000000000..1b1e058090 --- /dev/null +++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/catalog/IcebergCatalogFactory.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.nifi.processors.iceberg.catalog;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.hive.HiveCatalog;
+import org.apache.nifi.services.iceberg.IcebergCatalogProperty;
+import org.apache.nifi.services.iceberg.IcebergCatalogService;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.nifi.processors.iceberg.IcebergUtils.getConfigurationFromFiles;
+import static org.apache.nifi.services.iceberg.IcebergCatalogProperty.METASTORE_URI;
+import static org.apache.nifi.services.iceberg.IcebergCatalogProperty.WAREHOUSE_LOCATION;
+
+public class IcebergCatalogFactory {
+
+    private final IcebergCatalogService catalogService;
+
+    public IcebergCatalogFactory(IcebergCatalogService catalogService) {
+        this.catalogService = catalogService;
+    }
+
+    public Catalog create() {
+        switch (catalogService.getCatalogType()) {
+            case HIVE:
+                return initHiveCatalog(catalogService);
+            case HADOOP:
+                return initHadoopCatalog(catalogService);
+            default:
+                throw new IllegalArgumentException("Unknown catalog type: " + catalogService.getCatalogType());
+        }
+    }
+
+    private Catalog initHiveCatalog(IcebergCatalogService catalogService) {
+        HiveCatalog catalog = new HiveCatalog();
+
+        if (catalogService.getConfigFilePaths() != null) {
+            final Configuration configuration = getConfigurationFromFiles(catalogService.getConfigFilePaths());
+            catalog.setConf(configuration);
+        }
+
+        final Map<IcebergCatalogProperty, String> catalogProperties = catalogService.getCatalogProperties();
+        final Map<String, String> properties = new HashMap<>();
+
+        if (catalogProperties.containsKey(METASTORE_URI)) {
+            properties.put(CatalogProperties.URI, catalogProperties.get(METASTORE_URI));
+        }
+
+        if (catalogProperties.containsKey(WAREHOUSE_LOCATION)) {
+            properties.put(CatalogProperties.WAREHOUSE_LOCATION, catalogProperties.get(WAREHOUSE_LOCATION));
+        }
+
+        catalog.initialize("hive-catalog", properties);
+        return catalog;
+    }
+
+    private Catalog initHadoopCatalog(IcebergCatalogService catalogService) {
+        final Map<IcebergCatalogProperty, String> catalogProperties = catalogService.getCatalogProperties();
+        final String warehousePath = catalogProperties.get(WAREHOUSE_LOCATION);
+
+        if (catalogService.getConfigFilePaths() != null) {
+            return new HadoopCatalog(getConfigurationFromFiles(catalogService.getConfigFilePaths()), warehousePath);
+        } else {
+            return new HadoopCatalog(new Configuration(), warehousePath);
+        }
+    }
+}
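Putting the pieces together, a caller (PutIceberg's onTrigger path in this commit, but sketched standalone here with a hypothetical namespace and table name) obtains a live catalog like this:

    import org.apache.iceberg.Table;
    import org.apache.iceberg.catalog.Catalog;
    import org.apache.iceberg.catalog.Namespace;
    import org.apache.iceberg.catalog.TableIdentifier;
    import org.apache.nifi.processors.iceberg.catalog.IcebergCatalogFactory;
    import org.apache.nifi.services.iceberg.IcebergCatalogService;

    public class CatalogFactoryUsage {
        // catalogService is assumed to be an enabled controller service instance.
        static Table loadTable(IcebergCatalogService catalogService) {
            // The factory runs inside the processor's isolated class loader, so the
            // Iceberg/Hive objects it instantiates stay private to this instance.
            Catalog catalog = new IcebergCatalogFactory(catalogService).create();
            TableIdentifier id = TableIdentifier.of(Namespace.of("default"), "events");
            return catalog.loadTable(id);
        }
    }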
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestDataFileActions.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestDataFileActions.java
index ee6b4c0e19..4e535c3f8a 100644
--- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestDataFileActions.java
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestDataFileActions.java
@@ -33,6 +33,7 @@ import org.apache.iceberg.io.WriteResult;
 import org.apache.iceberg.types.Types;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processors.iceberg.catalog.IcebergCatalogFactory;
 import org.apache.nifi.processors.iceberg.catalog.TestHadoopCatalogService;
 import org.apache.nifi.processors.iceberg.converter.IcebergRecordConverter;
 import org.apache.nifi.processors.iceberg.writer.IcebergTaskWriterFactory;
@@ -193,7 +194,8 @@ public class TestDataFileActions {
 
     private Table initCatalog() throws IOException {
         TestHadoopCatalogService catalogService = new TestHadoopCatalogService();
-        Catalog catalog = catalogService.getCatalog();
+        IcebergCatalogFactory catalogFactory = new IcebergCatalogFactory(catalogService);
+        Catalog catalog = catalogFactory.create();
 
         return catalog.createTable(TABLE_IDENTIFIER, ABORT_SCHEMA, PartitionSpec.unpartitioned());
     }
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergCustomValidation.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergCustomValidation.java
index 7ca4bde3ac..36f14cedb8 100644
--- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergCustomValidation.java
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergCustomValidation.java
@@ -17,7 +17,6 @@
  */
 package org.apache.nifi.processors.iceberg;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.nifi.kerberos.KerberosUserService;
 import org.apache.nifi.processors.iceberg.catalog.TestHiveCatalogService;
 import org.apache.nifi.reporting.InitializationException;
@@ -27,6 +26,9 @@ import org.apache.nifi.util.TestRunners;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import java.util.Collections;
+import java.util.List;
+
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -56,8 +58,8 @@ public class TestPutIcebergCustomValidation {
         runner.setProperty(PutIceberg.RECORD_READER, RECORD_READER_NAME);
     }
 
-    private void initCatalogService(Configuration configuration) throws InitializationException {
-        TestHiveCatalogService catalogService = new TestHiveCatalogService.Builder().withConfig(configuration).build();
+    private void initCatalogService(List<String> configFilePaths) throws InitializationException {
+        TestHiveCatalogService catalogService = new TestHiveCatalogService.Builder().withConfigFilePaths(configFilePaths).build();
 
         runner.addControllerService(CATALOG_SERVICE_NAME, catalogService);
         runner.enableControllerService(catalogService);
@@ -78,10 +80,7 @@ public class TestPutIcebergCustomValidation {
     @Test
    public void testCustomValidateWithKerberosSecurityConfigAndWithoutKerberosUserService() throws InitializationException {
         initRecordReader();
-
-        Configuration config = new Configuration();
-        config.set("hadoop.security.authentication", "kerberos");
-        initCatalogService(config);
+        initCatalogService(Collections.singletonList("src/test/resources/secured-core-site.xml"));
 
         runner.setProperty(PutIceberg.CATALOG_NAMESPACE, CATALOG_NAMESPACE);
         runner.setProperty(PutIceberg.TABLE_NAME, TABLE_NAME);
@@ -91,10 +90,7 @@ public class TestPutIcebergCustomValidation {
     @Test
     public void testCustomValidateWithKerberosSecurityConfigAndKerberosUserService() throws InitializationException {
         initRecordReader();
-
-        Configuration config = new Configuration();
-        config.set("hadoop.security.authentication", "kerberos");
-        initCatalogService(config);
+        initCatalogService(Collections.singletonList("src/test/resources/secured-core-site.xml"));
 
         initKerberosUserService();
 
@@ -106,8 +102,7 @@ public class TestPutIcebergCustomValidation {
     @Test
     public void testCustomValidateWithoutKerberosSecurityConfigAndKerberosUserService() throws InitializationException {
         initRecordReader();
-
-        initCatalogService(new Configuration());
+        initCatalogService(Collections.singletonList("src/test/resources/unsecured-core-site.xml"));
 
         runner.setProperty(PutIceberg.CATALOG_NAMESPACE, CATALOG_NAMESPACE);
         runner.setProperty(PutIceberg.TABLE_NAME, TABLE_NAME);
@@ -117,8 +112,7 @@ public class TestPutIcebergCustomValidation {
     @Test
     public void testCustomValidateWithoutKerberosSecurityConfigAndWithKerberosUserService() throws InitializationException {
         initRecordReader();
-
-        initCatalogService(new Configuration());
+        initCatalogService(Collections.singletonList("src/test/resources/unsecured-core-site.xml"));
 
         initKerberosUserService();
 
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHadoopCatalog.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHadoopCatalog.java
index 9b75ee9ef6..ff8f5a9a3e 100644
--- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHadoopCatalog.java
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHadoopCatalog.java
@@ -27,6 +27,7 @@ import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.types.Types;
 import org.apache.nifi.avro.AvroTypeUtil;
+import org.apache.nifi.processors.iceberg.catalog.IcebergCatalogFactory;
 import org.apache.nifi.processors.iceberg.catalog.TestHadoopCatalogService;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.serialization.record.MockRecordParser;
@@ -62,6 +63,7 @@ public class TestPutIcebergWithHadoopCatalog {
     private TestRunner runner;
     private PutIceberg processor;
     private Schema inputSchema;
+    private Catalog catalog;
 
     private static final Namespace NAMESPACE = Namespace.of("default");
 
@@ -100,9 +102,10 @@ public class TestPutIcebergWithHadoopCatalog {
         runner.setProperty(PutIceberg.RECORD_READER, "mock-reader-factory");
     }
 
-    private Catalog initCatalog(PartitionSpec spec, String fileFormat) throws InitializationException, IOException {
+    private void initCatalog(PartitionSpec spec, String fileFormat) throws InitializationException, IOException {
         TestHadoopCatalogService catalogService = new TestHadoopCatalogService();
-        Catalog catalog = catalogService.getCatalog();
+        IcebergCatalogFactory catalogFactory = new IcebergCatalogFactory(catalogService);
+        catalog = catalogFactory.create();
 
         Map<String, String> tableProperties = new HashMap<>();
         tableProperties.put(TableProperties.FORMAT_VERSION, "2");
@@ -114,8 +117,6 @@ public class TestPutIcebergWithHadoopCatalog {
 
         runner.enableControllerService(catalogService);
         runner.setProperty(PutIceberg.CATALOG, "catalog-service");
-
-        return catalog;
     }
 
     @DisabledOnOs(WINDOWS)
@@ -128,7 +129,7 @@ public class TestPutIcebergWithHadoopCatalog {
         runner = TestRunners.newTestRunner(processor);
         initRecordReader();
-        Catalog catalog = initCatalog(spec, fileFormat);
+        initCatalog(spec, fileFormat);
         runner.setProperty(PutIceberg.CATALOG_NAMESPACE, "default");
         runner.setProperty(PutIceberg.TABLE_NAME, "date");
         runner.setValidateExpressionUsage(false);
@@ -156,7 +157,7 @@ public class TestPutIcebergWithHadoopCatalog {
         runner = TestRunners.newTestRunner(processor);
         initRecordReader();
-        Catalog catalog = initCatalog(spec, fileFormat);
+        initCatalog(spec, fileFormat);
         runner.setProperty(PutIceberg.CATALOG_NAMESPACE, "default");
         runner.setProperty(PutIceberg.TABLE_NAME, "date");
         runner.setValidateExpressionUsage(false);
@@ -185,7 +186,7 @@ public class TestPutIcebergWithHadoopCatalog {
         runner = TestRunners.newTestRunner(processor);
         initRecordReader();
-        Catalog catalog = initCatalog(spec, fileFormat);
+        initCatalog(spec, fileFormat);
         runner.setProperty(PutIceberg.CATALOG_NAMESPACE, "default");
         runner.setProperty(PutIceberg.TABLE_NAME, "date");
         runner.setValidateExpressionUsage(false);
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHiveCatalog.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHiveCatalog.java
index 3ec70c3d37..c672d90e8b 100644
--- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHiveCatalog.java
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHiveCatalog.java
@@ -22,12 +22,14 @@ import org.apache.commons.io.IOUtils;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.types.Types;
 import org.apache.nifi.avro.AvroTypeUtil;
 import org.apache.nifi.hive.metastore.ThriftMetastore;
+import org.apache.nifi.processors.iceberg.catalog.IcebergCatalogFactory;
 import org.apache.nifi.processors.iceberg.catalog.TestHiveCatalogService;
 import org.apache.nifi.processors.iceberg.util.IcebergTestUtils;
 import org.apache.nifi.reporting.InitializationException;
@@ -66,7 +68,7 @@ public class TestPutIcebergWithHiveCatalog {
     private TestRunner runner;
     private PutIceberg processor;
     private Schema inputSchema;
-    private TestHiveCatalogService catalogService;
+    private Catalog catalog;
 
     @RegisterExtension
     public static ThriftMetastore metastore = new ThriftMetastore();
@@ -90,16 +92,11 @@ public class TestPutIcebergWithHiveCatalog {
         inputSchema = new Schema.Parser().parse(avroSchema);
 
         processor = new PutIceberg();
-
-        catalogService = new TestHiveCatalogService.Builder()
-                .withMetastoreUri(metastore.getThriftConnectionUri())
-                .withWarehouseLocation(metastore.getWarehouseLocation())
-                .build();
     }
 
     @AfterEach
     public void tearDown() {
-        catalogService.getCatalog().dropTable(TABLE_IDENTIFIER);
+        catalog.dropTable(TABLE_IDENTIFIER);
     }
 
     private void initRecordReader() throws InitializationException {
@@ -126,7 +123,15 @@ public class TestPutIcebergWithHiveCatalog {
         tableProperties.put(TableProperties.FORMAT_VERSION, "2");
         tableProperties.put(TableProperties.DEFAULT_FILE_FORMAT, fileFormat);
 
-        catalogService.getCatalog().createTable(TABLE_IDENTIFIER, USER_SCHEMA, spec, tableProperties);
+        TestHiveCatalogService catalogService = new TestHiveCatalogService.Builder()
+                .withMetastoreUri(metastore.getThriftConnectionUri())
+                .withWarehouseLocation(metastore.getWarehouseLocation())
+                .build();
+
+        IcebergCatalogFactory catalogFactory = new IcebergCatalogFactory(catalogService);
+        catalog = catalogFactory.create();
+
+        catalog.createTable(TABLE_IDENTIFIER, USER_SCHEMA, spec, tableProperties);
 
         runner.addControllerService("catalog-service", catalogService);
         runner.enableControllerService(catalogService);
@@ -150,7 +155,7 @@ public class TestPutIcebergWithHiveCatalog {
         runner.enqueue(new byte[0]);
         runner.run();
 
-        Table table = catalogService.getCatalog().loadTable(TABLE_IDENTIFIER);
+        Table table = catalog.loadTable(TABLE_IDENTIFIER);
 
         List<Record> expectedRecords = IcebergTestUtils.RecordsBuilder.newInstance(USER_SCHEMA)
                 .add(0, "John", "Finance")
@@ -187,7 +192,7 @@ public class TestPutIcebergWithHiveCatalog {
         runner.enqueue(new byte[0]);
         runner.run();
 
-        Table table = catalogService.getCatalog().loadTable(TABLE_IDENTIFIER);
+        Table table = catalog.loadTable(TABLE_IDENTIFIER);
 
         List<Record> expectedRecords = IcebergTestUtils.RecordsBuilder.newInstance(USER_SCHEMA)
                 .add(0, "John", "Finance")
@@ -225,7 +230,7 @@ public class TestPutIcebergWithHiveCatalog {
         runner.enqueue(new byte[0]);
         runner.run();
 
-        Table table = catalogService.getCatalog().loadTable(TABLE_IDENTIFIER);
+        Table table = catalog.loadTable(TABLE_IDENTIFIER);
 
         List<Record> expectedRecords = IcebergTestUtils.RecordsBuilder.newInstance(USER_SCHEMA)
                 .add(0, "John", "Finance")
@@ -266,7 +271,7 @@ public class TestPutIcebergWithHiveCatalog {
         runner.enqueue(new byte[0], attributes);
         runner.run();
 
-        Table table = catalogService.getCatalog().loadTable(TABLE_IDENTIFIER);
+        Table table = catalog.loadTable(TABLE_IDENTIFIER);
 
         List<Record> expectedRecords = IcebergTestUtils.RecordsBuilder.newInstance(USER_SCHEMA)
                 .add(0, "John", "Finance")
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/catalog/TestHadoopCatalogService.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/catalog/TestHadoopCatalogService.java
index 111c4c5720..9673b894a3 100644
--- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/catalog/TestHadoopCatalogService.java
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/catalog/TestHadoopCatalogService.java
@@ -17,35 +17,40 @@
  */
 package org.apache.nifi.processors.iceberg.catalog;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.catalog.Catalog;
-import org.apache.iceberg.hadoop.HadoopCatalog;
 import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.services.iceberg.IcebergCatalogProperty;
 import org.apache.nifi.services.iceberg.IcebergCatalogService;
+import org.apache.nifi.services.iceberg.IcebergCatalogType;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 import static java.nio.file.Files.createTempDirectory;
 
 public class TestHadoopCatalogService extends AbstractControllerService implements IcebergCatalogService {
 
-    private final HadoopCatalog catalog;
+    private final Map<IcebergCatalogProperty, String> catalogProperties = new HashMap<>();
 
     public TestHadoopCatalogService() throws IOException {
         File warehouseLocation = createTempDirectory("metastore").toFile();
-
-        catalog = new HadoopCatalog(new Configuration(), warehouseLocation.getAbsolutePath());
+        catalogProperties.put(IcebergCatalogProperty.WAREHOUSE_LOCATION, warehouseLocation.getAbsolutePath());
     }
 
     @Override
-    public Catalog getCatalog() {
-        return catalog;
+    public IcebergCatalogType getCatalogType() {
+        return IcebergCatalogType.HADOOP;
     }
 
     @Override
-    public Configuration getConfiguration() {
-        return catalog.getConf();
+    public Map<IcebergCatalogProperty, String> getCatalogProperties() {
+        return catalogProperties;
     }
 
+    @Override
+    public List<String> getConfigFilePaths() {
+        return null;
+    }
 }
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/catalog/TestHiveCatalogService.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/catalog/TestHiveCatalogService.java
index 3a65e944f5..0cd7f042a8 100644
--- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/catalog/TestHiveCatalogService.java
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/catalog/TestHiveCatalogService.java
@@ -17,28 +17,47 @@
  */
 package org.apache.nifi.processors.iceberg.catalog;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.CatalogProperties;
-import org.apache.iceberg.catalog.Catalog;
-import org.apache.iceberg.hive.HiveCatalog;
 import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.services.iceberg.IcebergCatalogProperty;
 import org.apache.nifi.services.iceberg.IcebergCatalogService;
+import org.apache.nifi.services.iceberg.IcebergCatalogType;
 
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
+import static org.apache.nifi.services.iceberg.IcebergCatalogProperty.METASTORE_URI;
+import static org.apache.nifi.services.iceberg.IcebergCatalogProperty.WAREHOUSE_LOCATION;
+
 public class TestHiveCatalogService extends AbstractControllerService implements IcebergCatalogService {
 
-    private final HiveCatalog catalog;
+    private final List<String> configFilePaths;
+    private final Map<IcebergCatalogProperty, String> catalogProperties;
+
+    public TestHiveCatalogService(Map<IcebergCatalogProperty, String> catalogProperties, List<String> configFilePaths) {
+        this.catalogProperties = catalogProperties;
+        this.configFilePaths = configFilePaths;
+    }
+
+    @Override
+    public IcebergCatalogType getCatalogType() {
+        return IcebergCatalogType.HIVE;
+    }
+
+    @Override
+    public Map<IcebergCatalogProperty, String> getCatalogProperties() {
+        return catalogProperties;
+    }
 
-    public TestHiveCatalogService(HiveCatalog catalog) {
-        this.catalog = catalog;
+    @Override
+    public List<String> getConfigFilePaths() {
+        return configFilePaths;
     }
 
     public static class Builder {
         private String metastoreUri;
         private String warehouseLocation;
-        private Configuration config;
+        private List<String> configFilePaths;
 
         public Builder withMetastoreUri(String metastoreUri) {
             this.metastoreUri = metastoreUri;
@@ -50,40 +69,23 @@ public class TestHiveCatalogService extends AbstractControllerService implements
             return this;
         }
 
-        public Builder withConfig(Configuration config) {
-            this.config = config;
+        public Builder withConfigFilePaths(List<String> configFilePaths) {
+            this.configFilePaths = configFilePaths;
             return this;
         }
 
         public TestHiveCatalogService build() {
-            HiveCatalog catalog = new HiveCatalog();
-            Map<String, String> properties = new HashMap<>();
+            Map<IcebergCatalogProperty, String> properties = new HashMap<>();
             if (metastoreUri != null) {
-                properties.put(CatalogProperties.URI, metastoreUri);
+                properties.put(METASTORE_URI, metastoreUri);
             }
 
             if (warehouseLocation != null) {
-                properties.put(CatalogProperties.WAREHOUSE_LOCATION, warehouseLocation);
+                properties.put(WAREHOUSE_LOCATION, warehouseLocation);
             }
 
-            if (config != null) {
-                catalog.setConf(config);
-            }
-
-            catalog.initialize("hive-catalog", properties);
-            return new TestHiveCatalogService(catalog);
+            return new TestHiveCatalogService(properties, configFilePaths);
         }
     }
-
-    @Override
-    public Catalog getCatalog() {
-        return catalog;
-    }
-
-    @Override
-    public Configuration getConfiguration() {
-        return catalog.getConf();
-    }
-
 }
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/resources/secured-core-site.xml b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/resources/secured-core-site.xml
new file mode 100644
index 0000000000..0fd06a5383
--- /dev/null
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/resources/secured-core-site.xml
@@ -0,0 +1,22 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<configuration>
+    <property>
+        <name>hadoop.security.authentication</name>
+        <value>kerberos</value>
+    </property>
+</configuration>
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/resources/unsecured-core-site.xml b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/resources/unsecured-core-site.xml
new file mode 100644
index 0000000000..d590a5039c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/resources/unsecured-core-site.xml
@@ -0,0 +1,22 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<configuration>
+    <property>
+        <name>hadoop.security.authentication</name>
+        <value>simple</value>
+    </property>
+</configuration>
\ No newline at end of file
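These two resources drive PutIceberg's custom validation: SecurityUtil.isSecurityEnabled() inspects hadoop.security.authentication in the merged configuration. A quick check of either file might look like this (the path is assumed relative to the test module):

    import java.util.Collections;
    import org.apache.hadoop.conf.Configuration;
    import static org.apache.nifi.processors.iceberg.IcebergUtils.getConfigurationFromFiles;

    public class CoreSiteCheck {
        public static void main(String[] args) {
            Configuration conf = getConfigurationFromFiles(
                    Collections.singletonList("src/test/resources/secured-core-site.xml"));
            // Prints "kerberos" for the secured resource, "simple" for the unsecured one.
            System.out.println(conf.get("hadoop.security.authentication"));
        }
    }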
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api-nar/pom.xml b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api-nar/pom.xml
index 3ce733bd25..4e7d773945 100644
--- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api-nar/pom.xml
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api-nar/pom.xml
@@ -37,166 +37,4 @@
             <type>nar</type>
         </dependency>
     </dependencies>
-
-    <profiles>
-        <!-- Includes hadoop-aws for accessing HDFS with an s3a:// filesystem -->
-        <profile>
-            <id>include-hadoop-aws</id>
-            <activation>
-                <activeByDefault>false</activeByDefault>
-            </activation>
-            <dependencies>
-                <dependency>
-                    <groupId>org.apache.hadoop</groupId>
-                    <artifactId>hadoop-aws</artifactId>
-                    <version>${hadoop.version}</version>
-                </dependency>
-            </dependencies>
-        </profile>
-        <!-- Includes hadoop-azure and hadoop-azure-datalake for accessing HDFS with wasb://, abfs://, and adl:// filesystems -->
-        <profile>
-            <id>include-hadoop-azure</id>
-            <activation>
-                <activeByDefault>false</activeByDefault>
-            </activation>
-            <dependencies>
-                <dependency>
-                    <groupId>org.apache.hadoop</groupId>
-                    <artifactId>hadoop-azure</artifactId>
-                    <version>${hadoop.version}</version>
-                    <exclusions>
-                        <exclusion>
-                            <groupId>com.google.guava</groupId>
-                            <artifactId>guava</artifactId>
-                        </exclusion>
-                        <exclusion>
-                            <groupId>com.fasterxml.jackson.core</groupId>
-                            <artifactId>jackson-core</artifactId>
-                        </exclusion>
-                        <exclusion>
-                            <groupId>commons-logging</groupId>
-                            <artifactId>commons-logging</artifactId>
-                        </exclusion>
-                    </exclusions>
-                </dependency>
-                <dependency>
-                    <groupId>org.apache.hadoop</groupId>
-                    <artifactId>hadoop-azure-datalake</artifactId>
-                    <version>${hadoop.version}</version>
-                    <exclusions>
-                        <exclusion>
-                            <groupId>com.fasterxml.jackson.core</groupId>
-                            <artifactId>jackson-core</artifactId>
-                        </exclusion>
-                    </exclusions>
-                </dependency>
-            </dependencies>
-        </profile>
-        <!-- Includes hadoop-cloud-storage -->
-        <profile>
-            <id>include-hadoop-cloud-storage</id>
-            <activation>
-                <activeByDefault>false</activeByDefault>
-            </activation>
-            <dependencies>
-                <dependency>
-                    <groupId>org.apache.hadoop</groupId>
-                    <artifactId>hadoop-cloud-storage</artifactId>
-                    <version>${hadoop.version}</version>
-                    <exclusions>
-                        <exclusion>
-                            <groupId>log4j</groupId>
-                            <artifactId>log4j</artifactId>
-                        </exclusion>
-                        <exclusion>
-                            <groupId>org.apache.logging.log4j</groupId>
-                            <artifactId>log4j-core</artifactId>
-                        </exclusion>
-                        <exclusion>
-                            <groupId>org.slf4j</groupId>
-                            <artifactId>slf4j-log4j12</artifactId>
-                        </exclusion>
-                        <exclusion>
-                            <groupId>commons-logging</groupId>
-                            <artifactId>commons-logging</artifactId>
-                        </exclusion>
-                    </exclusions>
-                </dependency>
-            </dependencies>
-        </profile>
-        <!-- Includes hadoop-ozone for o3fs:// file system -->
-        <profile>
-            <id>include-hadoop-ozone</id>
-            <activation>
-                <activeByDefault>false</activeByDefault>
-            </activation>
-            <dependencies>
-                <dependency>
-                    <groupId>org.apache.ozone</groupId>
-                    <artifactId>ozone-client</artifactId>
-                    <version>${ozone.version}</version>
-                    <exclusions>
-                        <exclusion>
-                            <groupId>commons-logging</groupId>
-                            <artifactId>commons-logging</artifactId>
-                        </exclusion>
-                        <exclusion>
-                            <groupId>org.apache.logging.log4j</groupId>
-                            <artifactId>log4j-core</artifactId>
-                        </exclusion>
-                        <exclusion>
-                            <groupId>org.bouncycastle</groupId>
-                            <artifactId>bcprov-jdk15on</artifactId>
-                        </exclusion>
-                        <exclusion>
-                            <groupId>org.bouncycastle</groupId>
-                            <artifactId>bcpkix-jdk15on</artifactId>
-                        </exclusion>
-                    </exclusions>
-                </dependency>
-                <dependency>
-                    <groupId>org.apache.ozone</groupId>
-                    <artifactId>ozone-filesystem</artifactId>
-                    <version>${ozone.version}</version>
-                </dependency>
-                <dependency>
-                    <groupId>org.bouncycastle</groupId>
-                    <artifactId>bcprov-jdk18on</artifactId>
-                </dependency>
-                <dependency>
-                    <groupId>org.bouncycastle</groupId>
-                    <artifactId>bcpkix-jdk18on</artifactId>
-                </dependency>
-            </dependencies>
-        </profile>
-        <!-- Includes hadoop-gcp for accessing HDFS with an gcs:// filesystem -->
-        <profile>
-            <id>include-hadoop-gcp</id>
-            <activation>
-                <activeByDefault>false</activeByDefault>
-            </activation>
-            <dependencies>
-                <dependency>
-                    <groupId>com.google.cloud.bigdataoss</groupId>
-                    <artifactId>gcs-connector</artifactId>
-                    <version>hadoop3-${gcs.version}</version>
-                </dependency>
-                <dependency>
-                    <groupId>com.google.cloud.bigdataoss</groupId>
-                    <artifactId>util</artifactId>
-                    <version>${gcs.version}</version>
-                </dependency>
-                <dependency>
-                    <groupId>com.google.cloud.bigdataoss</groupId>
-                    <artifactId>util-hadoop</artifactId>
-                    <version>hadoop3-${gcs.version}</version>
-                </dependency>
-                <dependency>
-                    <groupId>com.google.cloud.bigdataoss</groupId>
-                    <artifactId>gcsio</artifactId>
-                    <version>${gcs.version}</version>
-                </dependency>
-            </dependencies>
-        </profile>
-    </profiles>
 </project>
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/pom.xml b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/pom.xml
index f7783db574..9cf2556a9e 100644
--- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/pom.xml
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/pom.xml
@@ -31,120 +31,5 @@
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-api</artifactId>
         </dependency>
-
-        <!-- External dependencies -->
-        <dependency>
-            <groupId>org.apache.iceberg</groupId>
-            <artifactId>iceberg-hive-metastore</artifactId>
-            <version>${iceberg.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.iceberg</groupId>
-            <artifactId>iceberg-parquet</artifactId>
-            <version>${iceberg.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.iceberg</groupId>
-            <artifactId>iceberg-orc</artifactId>
-            <version>${iceberg.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-client</artifactId>
-            <version>${hadoop.version}</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>log4j</groupId>
-                    <artifactId>log4j</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.slf4j</groupId>
-                    <artifactId>slf4j-log4j12</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.slf4j</groupId>
-                    <artifactId>slf4j-reload4j</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>commons-logging</groupId>
-                    <artifactId>commons-logging</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>javax.servlet</groupId>
-                    <artifactId>javax.servlet-api</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hive</groupId>
-            <artifactId>hive-metastore</artifactId>
-            <version>${hive.version}</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>log4j</groupId>
-                    <artifactId>log4j</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.slf4j</groupId>
-                    <artifactId>slf4j-log4j12</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>commons-logging</groupId>
-                    <artifactId>commons-logging</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.logging.log4j</groupId>
-                    <artifactId>log4j-core</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.logging.log4j</groupId>
-                    <artifactId>log4j-web</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.logging.log4j</groupId>
-                    <artifactId>log4j-1.2-api</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.logging.log4j</groupId>
-                    <artifactId>log4j-slf4j-impl</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.orc</groupId>
-                    <artifactId>orc-core</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.hbase</groupId>
-                    <artifactId>hbase-client</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>co.cask.tephra</groupId>
-                    <artifactId>tephra-api</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>co.cask.tephra</groupId>
-                    <artifactId>tephra-core</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>co.cask.tephra</groupId>
-                    <artifactId>tephra-hbase-compat-1.0</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.parquet</groupId>
-                    <artifactId>parquet-hadoop-bundle</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>com.tdunning</groupId>
-                    <artifactId>json</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>com.zaxxer</groupId>
-                    <artifactId>HikariCP</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>com.google.guava</groupId>
-                    <artifactId>guava</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
     </dependencies>
 </project>
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/src/main/java/org/apache/nifi/services/iceberg/IcebergCatalogService.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/src/main/java/org/apache/nifi/services/iceberg/IcebergCatalogProperty.java
similarity index 68%
copy from nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/src/main/java/org/apache/nifi/services/iceberg/IcebergCatalogService.java
copy to nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/src/main/java/org/apache/nifi/services/iceberg/IcebergCatalogProperty.java
index 991ac625dc..f4c55c39c8 100644
--- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/src/main/java/org/apache/nifi/services/iceberg/IcebergCatalogService.java
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/src/main/java/org/apache/nifi/services/iceberg/IcebergCatalogProperty.java
@@ -17,16 +17,19 @@
  */
 package org.apache.nifi.services.iceberg;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.catalog.Catalog;
-import org.apache.nifi.controller.ControllerService;
+public enum IcebergCatalogProperty {
 
-/**
- * Provides a basic connector to Iceberg catalog services.
- */
-public interface IcebergCatalogService extends ControllerService {
+    METASTORE_URI("hive.metastore.uris"),
+    WAREHOUSE_LOCATION("hive.metastore.warehouse.dir");
+
+    private final String hadoopPropertyName;
+
+    IcebergCatalogProperty(String hadoopPropertyName) {
+        this.hadoopPropertyName = hadoopPropertyName;
+    }
 
-    Catalog getCatalog();
+    public String getHadoopPropertyName() {
+        return hadoopPropertyName;
+    }
 
-    Configuration getConfiguration();
 }
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/src/main/java/org/apache/nifi/services/iceberg/IcebergCatalogService.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/src/main/java/org/apache/nifi/services/iceberg/IcebergCatalogService.java
index 991ac625dc..56e595d2e9 100644
--- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/src/main/java/org/apache/nifi/services/iceberg/IcebergCatalogService.java
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/src/main/java/org/apache/nifi/services/iceberg/IcebergCatalogService.java
@@ -17,16 +17,19 @@
  */
 package org.apache.nifi.services.iceberg;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.catalog.Catalog;
 import org.apache.nifi.controller.ControllerService;
 
+import java.util.List;
+import java.util.Map;
+
 /**
  * Provides a basic connector to Iceberg catalog services.
  */
 public interface IcebergCatalogService extends ControllerService {
 
-    Catalog getCatalog();
+    IcebergCatalogType getCatalogType();
+
+    Map<IcebergCatalogProperty, String> getCatalogProperties();
 
-    Configuration getConfiguration();
+    List<String> getConfigFilePaths();
 }
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/src/main/java/org/apache/nifi/services/iceberg/IcebergCatalogService.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/src/main/java/org/apache/nifi/services/iceberg/IcebergCatalogType.java
similarity index 71%
copy from nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/src/main/java/org/apache/nifi/services/iceberg/IcebergCatalogService.java
copy to nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/src/main/java/org/apache/nifi/services/iceberg/IcebergCatalogType.java
index 991ac625dc..4b8640da1d 100644
--- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/src/main/java/org/apache/nifi/services/iceberg/IcebergCatalogService.java
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/src/main/java/org/apache/nifi/services/iceberg/IcebergCatalogType.java
@@ -17,16 +17,7 @@
  */
 package org.apache.nifi.services.iceberg;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.catalog.Catalog;
-import org.apache.nifi.controller.ControllerService;
-
-/**
- * Provides a basic connector to Iceberg catalog services.
- */
-public interface IcebergCatalogService extends ControllerService {
-
-    Catalog getCatalog();
-
-    Configuration getConfiguration();
+public enum IcebergCatalogType {
+    HIVE,
+    HADOOP
 }
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services/pom.xml b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services/pom.xml
index 2a22d49aac..e81de19917 100644
--- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services/pom.xml
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services/pom.xml
@@ -36,6 +36,11 @@
             <artifactId>nifi-utils</artifactId>
             <version>2.0.0-SNAPSHOT</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-xml-processing</artifactId>
+            <version>2.0.0-SNAPSHOT</version>
+        </dependency>
     </dependencies>
 </project>
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services/src/main/java/org/apache/nifi/services/iceberg/AbstractCatalogService.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services/src/main/java/org/apache/nifi/services/iceberg/AbstractCatalogService.java
index 38f156c68d..7afc68a6a0 100644
--- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services/src/main/java/org/apache/nifi/services/iceberg/AbstractCatalogService.java
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services/src/main/java/org/apache/nifi/services/iceberg/AbstractCatalogService.java
@@ -17,14 +17,24 @@
  */
 package org.apache.nifi.services.iceberg;
 
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.resource.ResourceCardinality;
 import org.apache.nifi.components.resource.ResourceType;
 import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.xml.processing.parsers.StandardDocumentProvider;
+import org.w3c.dom.Document;
+
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 /**
@@ -32,7 +42,9 @@ import org.apache.nifi.expression.ExpressionLanguageScope;
  */
 public abstract class AbstractCatalogService extends AbstractControllerService implements IcebergCatalogService {
 
-    protected Configuration configuration = new Configuration();
+    protected Map<IcebergCatalogProperty, String> catalogProperties = new HashMap<>();
+
+    protected List<String> configFilePaths;
 
     static final PropertyDescriptor HADOOP_CONFIGURATION_RESOURCES = new PropertyDescriptor.Builder()
             .name("hadoop-config-resources")
@@ -44,24 +56,38 @@ public abstract class AbstractCatalogService extends AbstractControllerService i
             .dynamicallyModifiesClasspath(true)
             .build();
 
-    /**
-     * Loads configuration files from the provided paths.
-     *
-     * @param configFiles list of config file paths separated with comma
-     * @return merged configuration
-     */
-    protected Configuration getConfigurationFromFiles(String configFiles) {
-        final Configuration conf = new Configuration();
-        if (StringUtils.isNotBlank(configFiles)) {
-            for (final String configFile : configFiles.split(",")) {
-                conf.addResource(new Path(configFile.trim()));
+    protected List<Document> parseConfigFilePaths(String configFilePaths) {
+        List<Document> documentList = new ArrayList<>();
+        for (final String configFile : createFilePathList(configFilePaths)) {
+            File file = new File(configFile.trim());
+            try (final InputStream fis = new FileInputStream(file);
+                 final InputStream in = new BufferedInputStream(fis)) {
+                final StandardDocumentProvider documentProvider = new StandardDocumentProvider();
+                documentList.add(documentProvider.parse(in));
+            } catch (IOException e) {
+                throw new ProcessException("Failed to load config files", e);
             }
         }
-        return conf;
+        return documentList;
+    }
+
+    protected List<String> createFilePathList(String configFilePaths) {
+        List<String> filePathList = new ArrayList<>();
+        if (configFilePaths != null && !configFilePaths.trim().isEmpty()) {
+            for (final String configFile : configFilePaths.split(",")) {
+                filePathList.add(configFile.trim());
+            }
+        }
+        return filePathList;
+    }
+
+    @Override
+    public Map<IcebergCatalogProperty, String> getCatalogProperties() {
+        return catalogProperties;
     }
 
     @Override
-    public Configuration getConfiguration() {
-        return configuration;
+    public List<String> getConfigFilePaths() {
+        return configFilePaths;
     }
 }
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services/src/main/java/org/apache/nifi/services/iceberg/HadoopCatalogService.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services/src/main/java/org/apache/nifi/services/iceberg/HadoopCatalogService.java
index dcf2dd395f..8f62e1e183 100644
--- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services/src/main/java/org/apache/nifi/services/iceberg/HadoopCatalogService.java
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services/src/main/java/org/apache/nifi/services/iceberg/HadoopCatalogService.java
@@ -17,20 +17,20 @@
  */
 package org.apache.nifi.services.iceberg;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.catalog.Catalog;
-import org.apache.iceberg.hadoop.HadoopCatalog;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnEnabled;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.processor.util.StandardValidators;
 
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
+import static org.apache.nifi.services.iceberg.IcebergCatalogProperty.WAREHOUSE_LOCATION;
+
 @Tags({"iceberg", "catalog", "service", "hadoop", "hdfs"})
 @CapabilityDescription("Catalog service that can use HDFS or similar file systems that support atomic rename.")
 public class HadoopCatalogService extends AbstractCatalogService {
@@ -39,6 +39,7 @@ public class HadoopCatalogService extends AbstractCatalogService {
             .name("warehouse-path")
             .displayName("Warehouse Path")
             .description("Path to the location of the warehouse.")
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .required(true)
             .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
             .build();
 
@@ -53,25 +54,18 @@ public class HadoopCatalogService extends AbstractCatalogService {
         return PROPERTIES;
     }
 
-    private HadoopCatalog catalog;
-
     @OnEnabled
     public void onEnabled(final ConfigurationContext context) {
-        final String warehousePath = context.getProperty(WAREHOUSE_PATH).evaluateAttributeExpressions().getValue();
-
         if (context.getProperty(HADOOP_CONFIGURATION_RESOURCES).isSet()) {
-            final String configFiles = context.getProperty(HADOOP_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().getValue();
-
-            configuration = getConfigurationFromFiles(configFiles);
-            catalog = new HadoopCatalog(configuration, warehousePath);
-        } else {
-            catalog = new HadoopCatalog(new Configuration(), warehousePath);
+            configFilePaths = createFilePathList(context.getProperty(HADOOP_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().getValue());
         }
+
+        catalogProperties.put(WAREHOUSE_LOCATION, context.getProperty(WAREHOUSE_PATH).evaluateAttributeExpressions().getValue());
     }
 
     @Override
-    public Catalog getCatalog() {
-        return catalog;
+    public IcebergCatalogType getCatalogType() {
+        return IcebergCatalogType.HADOOP;
     }
 }
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services/src/main/java/org/apache/nifi/services/iceberg/HiveCatalogService.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services/src/main/java/org/apache/nifi/services/iceberg/HiveCatalogService.java
index 25bafe8116..e609981d46 100644
--- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services/src/main/java/org/apache/nifi/services/iceberg/HiveCatalogService.java
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services/src/main/java/org/apache/nifi/services/iceberg/HiveCatalogService.java
@@ -17,10 +17,6 @@
  */
 package org.apache.nifi.services.iceberg;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.CatalogProperties;
-import org.apache.iceberg.catalog.Catalog;
-import org.apache.iceberg.hive.HiveCatalog;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnEnabled;
@@ -30,14 +26,14 @@ import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.processor.util.StandardValidators;
+import org.w3c.dom.Document;
+import org.w3c.dom.NodeList;
 
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
 @Tags({"iceberg", "catalog", "service", "metastore", "hive"})
 @CapabilityDescription("Catalog service that connects to a Hive metastore to keep track of Iceberg tables.")
@@ -47,7 +43,7 @@ public class HiveCatalogService extends AbstractCatalogService {
             .name("hive-metastore-uri")
             .displayName("Hive Metastore URI")
             .description("The URI location(s) for the Hive metastore; note that this is not the location of the Hive Server. The default port for the Hive metastore is 9083.")
-            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .addValidator(StandardValidators.URI_LIST_VALIDATOR)
             .build();
@@ -55,6 +51,7 @@ public class HiveCatalogService extends AbstractCatalogService {
             .name("warehouse-location")
             .displayName("Default Warehouse Location")
             .description("Location of default database for the warehouse. This field sets or overrides the 'hive.metastore.warehouse.dir' configuration property.")
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
             .build();
@@ -69,14 +66,12 @@ public class HiveCatalogService extends AbstractCatalogService {
         return PROPERTIES;
     }
 
-    private HiveCatalog catalog;
-
     @Override
     protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
         final List<ValidationResult> problems = new ArrayList<>();
 
-        String configMetastoreUri = null;
-        String configWarehouseLocation = null;
+        boolean configMetastoreUriPresent = false;
+        boolean configWarehouseLocationPresent = false;
 
         final String propertyMetastoreUri = validationContext.getProperty(METASTORE_URI).evaluateAttributeExpressions().getValue();
         final String propertyWarehouseLocation = validationContext.getProperty(WAREHOUSE_LOCATION).evaluateAttributeExpressions().getValue();
@@ -84,13 +79,30 @@ public class HiveCatalogService extends AbstractCatalogService {
         // Load the configurations for validation only if any config resource is provided and if either the metastore URI or the warehouse location property is missing
         if (validationContext.getProperty(HADOOP_CONFIGURATION_RESOURCES).isSet() && (propertyMetastoreUri == null || propertyWarehouseLocation == null)) {
             final String configFiles = validationContext.getProperty(HADOOP_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().getValue();
+            final List<Document> documents = parseConfigFilePaths(configFiles);
+
+            for (Document document : documents) {
+                final NodeList nameNodeList = document.getElementsByTagName("name");
+
+                for (int i = 0; i < nameNodeList.getLength(); i++) {
+                    final String nodeValue = nameNodeList.item(i).getFirstChild().getNodeValue();
+
+                    if (nodeValue.equals(IcebergCatalogProperty.METASTORE_URI.getHadoopPropertyName())) {
+                        configMetastoreUriPresent = true;
+                    }
 
-            Configuration configuration = getConfigurationFromFiles(configFiles);
-            configMetastoreUri = configuration.get("hive.metastore.uris");
-            configWarehouseLocation = configuration.get("hive.metastore.warehouse.dir");
+                    if (nodeValue.equals(IcebergCatalogProperty.WAREHOUSE_LOCATION.getHadoopPropertyName())) {
+                        configWarehouseLocationPresent = true;
+                    }
+
+                    if (configMetastoreUriPresent && configWarehouseLocationPresent) {
+                        break;
+                    }
+                }
+            }
         }
 
-        if (configMetastoreUri == null && propertyMetastoreUri == null) {
+        if (!configMetastoreUriPresent && propertyMetastoreUri == null) {
             problems.add(new ValidationResult.Builder()
                     .subject("Hive Metastore URI")
                     .valid(false)
@@ -99,7 +111,7 @@ public class HiveCatalogService extends AbstractCatalogService {
                     .build());
         }
 
-        if (configWarehouseLocation == null && propertyWarehouseLocation == null) {
+        if (!configWarehouseLocationPresent && propertyWarehouseLocation == null) {
             problems.add(new ValidationResult.Builder()
                     .subject("Default Warehouse Location")
                     .valid(false)
@@ -113,29 +125,21 @@ public class HiveCatalogService extends AbstractCatalogService {
 
     @OnEnabled
     public void onEnabled(final ConfigurationContext context) {
-        catalog = new HiveCatalog();
-        Map<String, String> properties = new HashMap<>();
-
         if (context.getProperty(METASTORE_URI).isSet()) {
-            properties.put(CatalogProperties.URI, context.getProperty(METASTORE_URI).evaluateAttributeExpressions().getValue());
+            catalogProperties.put(IcebergCatalogProperty.METASTORE_URI, context.getProperty(METASTORE_URI).evaluateAttributeExpressions().getValue());
         }
 
         if (context.getProperty(WAREHOUSE_LOCATION).isSet()) {
-            properties.put(CatalogProperties.WAREHOUSE_LOCATION, context.getProperty(WAREHOUSE_LOCATION).evaluateAttributeExpressions().getValue());
+            catalogProperties.put(IcebergCatalogProperty.WAREHOUSE_LOCATION, context.getProperty(WAREHOUSE_LOCATION).evaluateAttributeExpressions().getValue());
         }
 
         if (context.getProperty(HADOOP_CONFIGURATION_RESOURCES).isSet()) {
-            final String configFiles = context.getProperty(HADOOP_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().getValue();
-
-            configuration = getConfigurationFromFiles(configFiles);
-            catalog.setConf(configuration);
+            configFilePaths = createFilePathList(context.getProperty(HADOOP_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().getValue());
        }
-
-        catalog.initialize("hive-catalog", properties);
     }
 
     @Override
-    public Catalog getCatalog() {
-        return catalog;
+    public IcebergCatalogType getCatalogType() {
+        return IcebergCatalogType.HIVE;
     }
 }
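
For readers tracing the refactoring: with these changes the catalog services return only plain values (an IcebergCatalogType, a Map of IcebergCatalogProperty entries, and a list of Hadoop configuration file paths) instead of live Catalog and Configuration objects. That is what allows the nifi-iceberg-services-api pom above to drop its Iceberg, Hive, and Hadoop dependencies: the consumer now constructs the catalog itself, inside its own class loader. The sketch below shows one way such a consumer could rebuild a catalog from the narrowed contract. It is a minimal, hypothetical illustration assembled only from APIs visible in this diff; the class ExampleCatalogBuilder and its buildCatalog method are invented names, not the factory code added by this commit.

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.iceberg.CatalogProperties;
    import org.apache.iceberg.catalog.Catalog;
    import org.apache.iceberg.hadoop.HadoopCatalog;
    import org.apache.iceberg.hive.HiveCatalog;
    import org.apache.nifi.services.iceberg.IcebergCatalogProperty;
    import org.apache.nifi.services.iceberg.IcebergCatalogService;

    // Hypothetical consumer-side sketch; not part of this commit.
    public class ExampleCatalogBuilder {

        public static Catalog buildCatalog(final IcebergCatalogService service) {
            // Build the Hadoop Configuration locally from the file paths the service
            // exposes, instead of receiving a shared Configuration object.
            final Configuration conf = new Configuration();
            if (service.getConfigFilePaths() != null) {
                for (final String configPath : service.getConfigFilePaths()) {
                    conf.addResource(new Path(configPath));
                }
            }

            final Map<IcebergCatalogProperty, String> props = service.getCatalogProperties();

            switch (service.getCatalogType()) {
                case HIVE: {
                    // Translate the service-level properties into Iceberg catalog properties.
                    final Map<String, String> initProperties = new HashMap<>();
                    if (props.containsKey(IcebergCatalogProperty.METASTORE_URI)) {
                        initProperties.put(CatalogProperties.URI, props.get(IcebergCatalogProperty.METASTORE_URI));
                    }
                    if (props.containsKey(IcebergCatalogProperty.WAREHOUSE_LOCATION)) {
                        initProperties.put(CatalogProperties.WAREHOUSE_LOCATION, props.get(IcebergCatalogProperty.WAREHOUSE_LOCATION));
                    }
                    final HiveCatalog hiveCatalog = new HiveCatalog();
                    hiveCatalog.setConf(conf);
                    hiveCatalog.initialize("hive-catalog", initProperties);
                    return hiveCatalog;
                }
                case HADOOP:
                    return new HadoopCatalog(conf, props.get(IcebergCatalogProperty.WAREHOUSE_LOCATION));
                default:
                    throw new IllegalArgumentException("Unsupported catalog type: " + service.getCatalogType());
            }
        }
    }

The design point visible in the diff: because the service contract now carries only JDK and nifi-api types, each consumer creates and owns its Catalog and Configuration instances rather than sharing objects constructed in the service's class loader.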