This is an automated email from the ASF dual-hosted git repository.

turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new b1be71f918 NIFI-11334: Fixed PutIceberg processor instance interference due to same class loader usage
b1be71f918 is described below

commit b1be71f918e45497099b069d04482bde8aff025d
Author: Mark Bathori <bathori.m...@gmail.com>
AuthorDate: Thu Jun 29 08:59:43 2023 +0200

    NIFI-11334: Fixed PutIceberg processor instance interference due to same class loader usage
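    
    Background: all components in a NAR normally share a single class loader,
    so static state in the bundled Hive/Hadoop client libraries could leak
    between PutIceberg instances. The fix bundles those libraries directly in
    the processors NAR and opts the base processor into per-instance class
    loading, keyed on the configured Kerberos user service. A minimal sketch
    of the mechanism (NiFi API types; the class name and property descriptor
    are illustrative, and the pattern mirrors the AbstractIcebergProcessor
    change in the diff below):
    
        import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
        import org.apache.nifi.components.ClassloaderIsolationKeyProvider;
        import org.apache.nifi.components.PropertyDescriptor;
        import org.apache.nifi.context.PropertyContext;
        import org.apache.nifi.kerberos.KerberosUserService;
        import org.apache.nifi.processor.AbstractProcessor;
    
        // Each instance of the processor gets its own copy of the NAR's classes,
        // with ancestor class loader resources cloned as well.
        @RequiresInstanceClassLoading(cloneAncestorResources = true)
        public abstract class IsolatedProcessor extends AbstractProcessor
                implements ClassloaderIsolationKeyProvider {
    
            static final PropertyDescriptor KERBEROS_USER_SERVICE = new PropertyDescriptor.Builder()
                    .name("kerberos-user-service")
                    .identifiesControllerService(KerberosUserService.class)
                    .build();
    
            @Override
            public String getClassloaderIsolationKey(final PropertyContext context) {
                // Instances that return the same key may share one class loader;
                // keying on the Kerberos user service keeps differently-secured
                // instances apart. Null advertises no shared key.
                final KerberosUserService service = context.getProperty(KERBEROS_USER_SERVICE)
                        .asControllerService(KerberosUserService.class);
                return service == null ? null : service.getIdentifier();
            }
        }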
    
    This closes #7449.
    
    Signed-off-by: Peter Turcsanyi <turcsa...@apache.org>
---
 .../nifi-iceberg-processors-nar/pom.xml            | 258 +++++++++++++--------
 .../nifi-iceberg-processors/pom.xml                | 102 +++++++-
 .../iceberg/AbstractIcebergProcessor.java          |  22 +-
 .../nifi/processors/iceberg/IcebergUtils.java}     |  28 ++-
 .../apache/nifi/processors/iceberg/PutIceberg.java |   7 +-
 .../iceberg/catalog/IcebergCatalogFactory.java     |  87 +++++++
 .../processors/iceberg/TestDataFileActions.java    |   4 +-
 .../iceberg/TestPutIcebergCustomValidation.java    |  24 +-
 .../iceberg/TestPutIcebergWithHadoopCatalog.java   |  15 +-
 .../iceberg/TestPutIcebergWithHiveCatalog.java     |  29 ++-
 .../iceberg/catalog/TestHadoopCatalogService.java  |  25 +-
 .../iceberg/catalog/TestHiveCatalogService.java    |  64 ++---
 .../src/test/resources/secured-core-site.xml       |  22 ++
 .../src/test/resources/unsecured-core-site.xml     |  22 ++
 .../nifi-iceberg-services-api-nar/pom.xml          | 162 -------------
 .../nifi-iceberg-services-api/pom.xml              | 115 ---------
 ...logService.java => IcebergCatalogProperty.java} |  21 +-
 .../services/iceberg/IcebergCatalogService.java    |  11 +-
 ...CatalogService.java => IcebergCatalogType.java} |  15 +-
 .../nifi-iceberg-services/pom.xml                  |   5 +
 .../services/iceberg/AbstractCatalogService.java   |  62 +++--
 .../services/iceberg/HadoopCatalogService.java     |  24 +-
 .../nifi/services/iceberg/HiveCatalogService.java  |  62 ++---
 23 files changed, 631 insertions(+), 555 deletions(-)

diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors-nar/pom.xml b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors-nar/pom.xml
index 3c7e102bca..4ed687eb42 100644
--- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors-nar/pom.xml
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors-nar/pom.xml
@@ -43,101 +43,165 @@
         </dependency>
     </dependencies>
 
-    <dependencyManagement>
-        <dependencies>
-            <!-- Provided through nifi-iceberg-services-api -->
-            <dependency>
-                <groupId>org.apache.iceberg</groupId>
-                <artifactId>iceberg-core</artifactId>
-                <scope>provided</scope>
-            </dependency>
-            <dependency>
-                <groupId>org.apache.hive</groupId>
-                <artifactId>hive-shims</artifactId>
-                <scope>provided</scope>
-            </dependency>
-            <dependency>
-                <groupId>org.codehaus.groovy</groupId>
-                <artifactId>groovy-all</artifactId>
-                <scope>provided</scope>
-            </dependency>
-            <dependency>
-                <groupId>org.apache.hadoop</groupId>
-                <artifactId>hadoop-common</artifactId>
-                <scope>provided</scope>
-                <exclusions>
-                    <exclusion>
-                        <groupId>org.slf4j</groupId>
-                        <artifactId>slf4j-reload4j</artifactId>
-                    </exclusion>
-                </exclusions>
-            </dependency>
-            <dependency>
-                <groupId>org.apache.hadoop</groupId>
-                <artifactId>hadoop-yarn-api</artifactId>
-                <scope>provided</scope>
-            </dependency>
-            <dependency>
-                <groupId>org.apache.hadoop</groupId>
-                <artifactId>hadoop-yarn-registry</artifactId>
-                <scope>provided</scope>
-            </dependency>
-            <dependency>
-                <groupId>org.apache.zookeeper</groupId>
-                <artifactId>zookeeper</artifactId>
-                <scope>provided</scope>
-            </dependency>
-            <dependency>
-                <groupId>org.apache.curator</groupId>
-                <artifactId>curator-client</artifactId>
-                <scope>provided</scope>
-            </dependency>
-            <dependency>
-                <groupId>org.apache.curator</groupId>
-                <artifactId>curator-framework</artifactId>
-                <scope>provided</scope>
-            </dependency>
-            <dependency>
-                <groupId>com.fasterxml.jackson.core</groupId>
-                <artifactId>jackson-databind</artifactId>
-                <scope>provided</scope>
-            </dependency>
-            <dependency>
-                <groupId>com.fasterxml.jackson.core</groupId>
-                <artifactId>jackson-core</artifactId>
-                <scope>provided</scope>
-            </dependency>
-            <dependency>
-                <groupId>com.fasterxml.jackson.core</groupId>
-                <artifactId>jackson-annotations</artifactId>
-                <scope>provided</scope>
-            </dependency>
-            <dependency>
-                <groupId>org.xerial.snappy</groupId>
-                <artifactId>snappy-java</artifactId>
-                <scope>provided</scope>
-            </dependency>
-            <dependency>
-                <groupId>org.apache.ant</groupId>
-                <artifactId>ant</artifactId>
-                <scope>provided</scope>
-            </dependency>
-            <dependency>
-                <groupId>org.apache.ivy</groupId>
-                <artifactId>ivy</artifactId>
-                <scope>provided</scope>
-            </dependency>
-            <dependency>
-                <groupId>org.apache.orc</groupId>
-                <artifactId>orc-core</artifactId>
-                <scope>provided</scope>
-                <classifier>nohive</classifier>
-            </dependency>
-            <dependency>
-                <groupId>org.apache.parquet</groupId>
-                <artifactId>parquet-avro</artifactId>
-                <scope>provided</scope>
-            </dependency>
-        </dependencies>
-    </dependencyManagement>
+    <profiles>
+        <!-- Includes hadoop-aws for accessing HDFS with an s3a:// filesystem -->
+        <profile>
+            <id>include-hadoop-aws</id>
+            <activation>
+                <activeByDefault>false</activeByDefault>
+            </activation>
+            <dependencies>
+                <dependency>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-aws</artifactId>
+                    <version>${hadoop.version}</version>
+                </dependency>
+            </dependencies>
+        </profile>
+        <!-- Includes hadoop-azure and hadoop-azure-datalake for accessing HDFS with wasb://, abfs://, and adl:// filesystems -->
+        <profile>
+            <id>include-hadoop-azure</id>
+            <activation>
+                <activeByDefault>false</activeByDefault>
+            </activation>
+            <dependencies>
+                <dependency>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-azure</artifactId>
+                    <version>${hadoop.version}</version>
+                    <exclusions>
+                        <exclusion>
+                            <groupId>com.google.guava</groupId>
+                            <artifactId>guava</artifactId>
+                        </exclusion>
+                        <exclusion>
+                            <groupId>com.fasterxml.jackson.core</groupId>
+                            <artifactId>jackson-core</artifactId>
+                        </exclusion>
+                        <exclusion>
+                            <groupId>commons-logging</groupId>
+                            <artifactId>commons-logging</artifactId>
+                        </exclusion>
+                    </exclusions>
+                </dependency>
+                <dependency>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-azure-datalake</artifactId>
+                    <version>${hadoop.version}</version>
+                    <exclusions>
+                        <exclusion>
+                            <groupId>com.fasterxml.jackson.core</groupId>
+                            <artifactId>jackson-core</artifactId>
+                        </exclusion>
+                    </exclusions>
+                </dependency>
+            </dependencies>
+        </profile>
+        <!-- Includes hadoop-cloud-storage -->
+        <profile>
+            <id>include-hadoop-cloud-storage</id>
+            <activation>
+                <activeByDefault>false</activeByDefault>
+            </activation>
+            <dependencies>
+                <dependency>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-cloud-storage</artifactId>
+                    <version>${hadoop.version}</version>
+                    <exclusions>
+                        <exclusion>
+                            <groupId>log4j</groupId>
+                            <artifactId>log4j</artifactId>
+                        </exclusion>
+                        <exclusion>
+                            <groupId>org.apache.logging.log4j</groupId>
+                            <artifactId>log4j-core</artifactId>
+                        </exclusion>
+                        <exclusion>
+                            <groupId>org.slf4j</groupId>
+                            <artifactId>slf4j-log4j12</artifactId>
+                        </exclusion>
+                        <exclusion>
+                            <groupId>commons-logging</groupId>
+                            <artifactId>commons-logging</artifactId>
+                        </exclusion>
+                    </exclusions>
+                </dependency>
+            </dependencies>
+        </profile>
+        <!-- Includes hadoop-ozone for o3fs:// file system -->
+        <profile>
+            <id>include-hadoop-ozone</id>
+            <activation>
+                <activeByDefault>false</activeByDefault>
+            </activation>
+            <dependencies>
+                <dependency>
+                    <groupId>org.apache.ozone</groupId>
+                    <artifactId>ozone-client</artifactId>
+                    <version>${ozone.version}</version>
+                    <exclusions>
+                        <exclusion>
+                            <groupId>commons-logging</groupId>
+                            <artifactId>commons-logging</artifactId>
+                        </exclusion>
+                        <exclusion>
+                            <groupId>org.apache.logging.log4j</groupId>
+                            <artifactId>log4j-core</artifactId>
+                        </exclusion>
+                        <exclusion>
+                            <groupId>org.bouncycastle</groupId>
+                            <artifactId>bcprov-jdk15on</artifactId>
+                        </exclusion>
+                        <exclusion>
+                            <groupId>org.bouncycastle</groupId>
+                            <artifactId>bcpkix-jdk15on</artifactId>
+                        </exclusion>
+                    </exclusions>
+                </dependency>
+                <dependency>
+                    <groupId>org.apache.ozone</groupId>
+                    <artifactId>ozone-filesystem</artifactId>
+                    <version>${ozone.version}</version>
+                </dependency>
+                <dependency>
+                    <groupId>org.bouncycastle</groupId>
+                    <artifactId>bcprov-jdk18on</artifactId>
+                </dependency>
+                <dependency>
+                    <groupId>org.bouncycastle</groupId>
+                    <artifactId>bcpkix-jdk18on</artifactId>
+                </dependency>
+            </dependencies>
+        </profile>
+        <!-- Includes hadoop-gcp for accessing HDFS with a gcs:// filesystem -->
+        <profile>
+            <id>include-hadoop-gcp</id>
+            <activation>
+                <activeByDefault>false</activeByDefault>
+            </activation>
+            <dependencies>
+                <dependency>
+                    <groupId>com.google.cloud.bigdataoss</groupId>
+                    <artifactId>gcs-connector</artifactId>
+                    <version>hadoop3-${gcs.version}</version>
+                </dependency>
+                <dependency>
+                    <groupId>com.google.cloud.bigdataoss</groupId>
+                    <artifactId>util</artifactId>
+                    <version>${gcs.version}</version>
+                </dependency>
+                <dependency>
+                    <groupId>com.google.cloud.bigdataoss</groupId>
+                    <artifactId>util-hadoop</artifactId>
+                    <version>hadoop3-${gcs.version}</version>
+                </dependency>
+                <dependency>
+                    <groupId>com.google.cloud.bigdataoss</groupId>
+                    <artifactId>gcsio</artifactId>
+                    <version>${gcs.version}</version>
+                </dependency>
+            </dependencies>
+        </profile>
+    </profiles>
 </project>
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/pom.xml b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/pom.xml
index 61d40d0365..96e67ce03f 100644
--- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/pom.xml
@@ -74,13 +74,27 @@
             <groupId>org.apache.iceberg</groupId>
             <artifactId>iceberg-core</artifactId>
             <version>${iceberg.version}</version>
-            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-hive-metastore</artifactId>
+            <version>${iceberg.version}</version>
         </dependency>
         <dependency>
             <groupId>org.apache.iceberg</groupId>
             <artifactId>iceberg-data</artifactId>
             <version>${iceberg.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-parquet</artifactId>
+            <version>${iceberg.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-orc</artifactId>
+            <version>${iceberg.version}</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.hive</groupId>
             <artifactId>hive-exec</artifactId>
@@ -171,18 +185,100 @@
         </dependency>
         <dependency>
             <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-hdfs</artifactId>
+            <artifactId>hadoop-client</artifactId>
             <version>${hadoop.version}</version>
-            <scope>provided</scope>
             <exclusions>
                 <exclusion>
                     <groupId>log4j</groupId>
                     <artifactId>log4j</artifactId>
                 </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-reload4j</artifactId>
+                </exclusion>
                 <exclusion>
                     <groupId>commons-logging</groupId>
                     <artifactId>commons-logging</artifactId>
                 </exclusion>
+                <exclusion>
+                    <groupId>javax.servlet</groupId>
+                    <artifactId>javax.servlet-api</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hive</groupId>
+            <artifactId>hive-metastore</artifactId>
+            <version>${hive.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>commons-logging</groupId>
+                    <artifactId>commons-logging</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-web</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-1.2-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-slf4j-impl</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.orc</groupId>
+                    <artifactId>orc-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hbase</groupId>
+                    <artifactId>hbase-client</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>co.cask.tephra</groupId>
+                    <artifactId>tephra-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>co.cask.tephra</groupId>
+                    <artifactId>tephra-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>co.cask.tephra</groupId>
+                    <artifactId>tephra-hbase-compat-1.0</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.parquet</groupId>
+                    <artifactId>parquet-hadoop-bundle</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.tdunning</groupId>
+                    <artifactId>json</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.zaxxer</groupId>
+                    <artifactId>HikariCP</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.google.guava</groupId>
+                    <artifactId>guava</artifactId>
+                </exclusion>
             </exclusions>
         </dependency>
 
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/AbstractIcebergProcessor.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/AbstractIcebergProcessor.java
index 9f527344ec..3352959d17 100644
--- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/AbstractIcebergProcessor.java
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/AbstractIcebergProcessor.java
@@ -18,9 +18,12 @@
 package org.apache.nifi.processors.iceberg;
 
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.ClassloaderIsolationKeyProvider;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.context.PropertyContext;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.kerberos.KerberosUserService;
 import org.apache.nifi.processor.AbstractProcessor;
@@ -36,11 +39,13 @@ import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 
 import static org.apache.nifi.hadoop.SecurityUtil.getUgiForKerberosUser;
+import static org.apache.nifi.processors.iceberg.IcebergUtils.getConfigurationFromFiles;
 
 /**
  * Base Iceberg processor class.
  */
-public abstract class AbstractIcebergProcessor extends AbstractProcessor {
+@RequiresInstanceClassLoading(cloneAncestorResources = true)
+public abstract class AbstractIcebergProcessor extends AbstractProcessor implements ClassloaderIsolationKeyProvider {
 
     public static final PropertyDescriptor CATALOG = new PropertyDescriptor.Builder()
             .name("catalog-service")
@@ -66,14 +71,14 @@ public abstract class AbstractIcebergProcessor extends AbstractProcessor {
     private volatile UserGroupInformation ugi;
 
     @OnScheduled
-    public final void onScheduled(final ProcessContext context) {
+    public void onScheduled(final ProcessContext context) {
         final IcebergCatalogService catalogService = context.getProperty(CATALOG).asControllerService(IcebergCatalogService.class);
         final KerberosUserService kerberosUserService = context.getProperty(KERBEROS_USER_SERVICE).asControllerService(KerberosUserService.class);
 
         if (kerberosUserService != null) {
             this.kerberosUser = kerberosUserService.createKerberosUser();
             try {
-                this.ugi = getUgiForKerberosUser(catalogService.getConfiguration(), kerberosUser);
+                this.ugi = getUgiForKerberosUser(getConfigurationFromFiles(catalogService.getConfigFilePaths()), kerberosUser);
             } catch (IOException e) {
                 throw new ProcessException("Kerberos Authentication failed", e);
             }
@@ -81,7 +86,7 @@ public abstract class AbstractIcebergProcessor extends AbstractProcessor {
     }
 
     @OnStopped
-    public final void onStopped() {
+    public void onStopped() {
         if (kerberosUser != null) {
             try {
                 kerberosUser.logout();
@@ -117,6 +122,15 @@ public abstract class AbstractIcebergProcessor extends AbstractProcessor {
         }
     }
 
+    @Override
+    public String getClassloaderIsolationKey(PropertyContext context) {
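+        // Instances that return the same key may share an instance class loader,
+        // so processors using the same Kerberos user service are grouped together.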
+        final KerberosUserService kerberosUserService = context.getProperty(KERBEROS_USER_SERVICE).asControllerService(KerberosUserService.class);
+        if (kerberosUserService != null) {
+            return kerberosUserService.getIdentifier();
+        }
+        return null;
+    }
+
     private UserGroupInformation getUgi() {
         try {
             kerberosUser.checkTGTAndRelogin();
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/src/main/java/org/apache/nifi/services/iceberg/IcebergCatalogService.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/IcebergUtils.java
similarity index 55%
copy from nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/src/main/java/org/apache/nifi/services/iceberg/IcebergCatalogService.java
copy to nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/IcebergUtils.java
index 991ac625dc..7a3db7de71 100644
--- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/src/main/java/org/apache/nifi/services/iceberg/IcebergCatalogService.java
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/IcebergUtils.java
@@ -15,18 +15,28 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.nifi.services.iceberg;
+package org.apache.nifi.processors.iceberg;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.catalog.Catalog;
-import org.apache.nifi.controller.ControllerService;
+import org.apache.hadoop.fs.Path;
 
-/**
- * Provides a basic connector to Iceberg catalog services.
- */
-public interface IcebergCatalogService extends ControllerService {
+import java.util.List;
 
-    Catalog getCatalog();
+public class IcebergUtils {
 
-    Configuration getConfiguration();
+    /**
+     * Loads configuration files from the provided paths.
+     *
+     * @param configFilePaths list of Hadoop configuration file paths
+     * @return merged configuration
+     */
+    public static Configuration getConfigurationFromFiles(List<String> configFilePaths) {
+        final Configuration conf = new Configuration();
+        if (configFilePaths != null) {
+            for (final String configFile : configFilePaths) {
+                conf.addResource(new Path(configFile.trim()));
+            }
+        }
+        return conf;
+    }
 }
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java
index 20f67093ab..360ea17f1b 100644
--- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java
@@ -45,6 +45,7 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.iceberg.catalog.IcebergCatalogFactory;
 import org.apache.nifi.processors.iceberg.converter.IcebergRecordConverter;
 import org.apache.nifi.processors.iceberg.writer.IcebergTaskWriterFactory;
 import org.apache.nifi.serialization.RecordReader;
@@ -66,6 +67,7 @@ import java.util.concurrent.TimeUnit;
 
 import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
 import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
+import static org.apache.nifi.processors.iceberg.IcebergUtils.getConfigurationFromFiles;
 
 @Tags({"iceberg", "put", "table", "store", "record", "parse", "orc", 
"parquet", "avro"})
 @CapabilityDescription("This processor uses Iceberg API to parse and load 
records into Iceberg tables. " +
@@ -208,7 +210,7 @@ public class PutIceberg extends AbstractIcebergProcessor {
 
         if (catalogServiceEnabled) {
             final boolean kerberosUserServiceIsSet = context.getProperty(KERBEROS_USER_SERVICE).isSet();
-            final boolean securityEnabled = SecurityUtil.isSecurityEnabled(catalogService.getConfiguration());
+            final boolean securityEnabled = SecurityUtil.isSecurityEnabled(getConfigurationFromFiles(catalogService.getConfigFilePaths()));
 
             if (securityEnabled && !kerberosUserServiceIsSet) {
                 problems.add(new ValidationResult.Builder()
@@ -293,7 +295,8 @@ public class PutIceberg extends AbstractIcebergProcessor {
         final String catalogNamespace = context.getProperty(CATALOG_NAMESPACE).evaluateAttributeExpressions(flowFile).getValue();
         final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
 
-        final Catalog catalog = catalogService.getCatalog();
+        final IcebergCatalogFactory catalogFactory = new IcebergCatalogFactory(catalogService);
+        final Catalog catalog = catalogFactory.create();
 
         final Namespace namespace = Namespace.of(catalogNamespace);
         final TableIdentifier tableIdentifier = TableIdentifier.of(namespace, tableName);
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/catalog/IcebergCatalogFactory.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/catalog/IcebergCatalogFactory.java
new file mode 100644
index 0000000000..1b1e058090
--- /dev/null
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/catalog/IcebergCatalogFactory.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg.catalog;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.hive.HiveCatalog;
+import org.apache.nifi.services.iceberg.IcebergCatalogProperty;
+import org.apache.nifi.services.iceberg.IcebergCatalogService;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.nifi.processors.iceberg.IcebergUtils.getConfigurationFromFiles;
+import static org.apache.nifi.services.iceberg.IcebergCatalogProperty.METASTORE_URI;
+import static org.apache.nifi.services.iceberg.IcebergCatalogProperty.WAREHOUSE_LOCATION;
+
+public class IcebergCatalogFactory {
+
+    private final IcebergCatalogService catalogService;
+
+    public IcebergCatalogFactory(IcebergCatalogService catalogService) {
+        this.catalogService = catalogService;
+    }
+
+    public Catalog create() {
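+        // Builds a fresh Catalog on every call based on the service's declared type; the factory holds no cache.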
+        switch (catalogService.getCatalogType()) {
+            case HIVE:
+                return initHiveCatalog(catalogService);
+            case HADOOP:
+                return initHadoopCatalog(catalogService);
+            default:
+                throw new IllegalArgumentException("Unknown catalog type: " + catalogService.getCatalogType());
+        }
+    }
+
+    private Catalog initHiveCatalog(IcebergCatalogService catalogService) {
+        HiveCatalog catalog = new HiveCatalog();
+
+        if (catalogService.getConfigFilePaths() != null) {
+            final Configuration configuration = getConfigurationFromFiles(catalogService.getConfigFilePaths());
+            catalog.setConf(configuration);
+        }
+
+        final Map<IcebergCatalogProperty, String> catalogProperties = catalogService.getCatalogProperties();
+        final Map<String, String> properties = new HashMap<>();
+
+        if (catalogProperties.containsKey(METASTORE_URI)) {
+            properties.put(CatalogProperties.URI, catalogProperties.get(METASTORE_URI));
+        }
+
+        if (catalogProperties.containsKey(WAREHOUSE_LOCATION)) {
+            properties.put(CatalogProperties.WAREHOUSE_LOCATION, catalogProperties.get(WAREHOUSE_LOCATION));
+        }
+
+        catalog.initialize("hive-catalog", properties);
+        return catalog;
+    }
+
+    private Catalog initHadoopCatalog(IcebergCatalogService catalogService) {
+        final Map<IcebergCatalogProperty, String> catalogProperties = catalogService.getCatalogProperties();
+        final String warehousePath = catalogProperties.get(WAREHOUSE_LOCATION);
+
+        if (catalogService.getConfigFilePaths() != null) {
+            return new HadoopCatalog(getConfigurationFromFiles(catalogService.getConfigFilePaths()), warehousePath);
+        } else {
+            return new HadoopCatalog(new Configuration(), warehousePath);
+        }
+    }
+}
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestDataFileActions.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestDataFileActions.java
index ee6b4c0e19..4e535c3f8a 100644
--- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestDataFileActions.java
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestDataFileActions.java
@@ -33,6 +33,7 @@ import org.apache.iceberg.io.WriteResult;
 import org.apache.iceberg.types.Types;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processors.iceberg.catalog.IcebergCatalogFactory;
 import org.apache.nifi.processors.iceberg.catalog.TestHadoopCatalogService;
 import org.apache.nifi.processors.iceberg.converter.IcebergRecordConverter;
 import org.apache.nifi.processors.iceberg.writer.IcebergTaskWriterFactory;
@@ -193,7 +194,8 @@ public class TestDataFileActions {
 
     private Table initCatalog() throws IOException {
         TestHadoopCatalogService catalogService = new TestHadoopCatalogService();
-        Catalog catalog = catalogService.getCatalog();
+        IcebergCatalogFactory catalogFactory = new IcebergCatalogFactory(catalogService);
+        Catalog catalog = catalogFactory.create();
 
         return catalog.createTable(TABLE_IDENTIFIER, ABORT_SCHEMA, PartitionSpec.unpartitioned());
     }
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergCustomValidation.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergCustomValidation.java
index 7ca4bde3ac..36f14cedb8 100644
--- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergCustomValidation.java
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergCustomValidation.java
@@ -17,7 +17,6 @@
  */
 package org.apache.nifi.processors.iceberg;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.nifi.kerberos.KerberosUserService;
 import org.apache.nifi.processors.iceberg.catalog.TestHiveCatalogService;
 import org.apache.nifi.reporting.InitializationException;
@@ -27,6 +26,9 @@ import org.apache.nifi.util.TestRunners;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import java.util.Collections;
+import java.util.List;
+
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -56,8 +58,8 @@ public class TestPutIcebergCustomValidation {
         runner.setProperty(PutIceberg.RECORD_READER, RECORD_READER_NAME);
     }
 
-    private void initCatalogService(Configuration configuration) throws InitializationException {
-        TestHiveCatalogService catalogService = new TestHiveCatalogService.Builder().withConfig(configuration).build();
+    private void initCatalogService(List<String> configFilePaths) throws InitializationException {
+        TestHiveCatalogService catalogService = new TestHiveCatalogService.Builder().withConfigFilePaths(configFilePaths).build();
 
         runner.addControllerService(CATALOG_SERVICE_NAME, catalogService);
         runner.enableControllerService(catalogService);
@@ -78,10 +80,7 @@ public class TestPutIcebergCustomValidation {
     @Test
     public void testCustomValidateWithKerberosSecurityConfigAndWithoutKerberosUserService() throws InitializationException {
         initRecordReader();
-
-        Configuration config = new Configuration();
-        config.set("hadoop.security.authentication", "kerberos");
-        initCatalogService(config);
+        initCatalogService(Collections.singletonList("src/test/resources/secured-core-site.xml"));
 
         runner.setProperty(PutIceberg.CATALOG_NAMESPACE, CATALOG_NAMESPACE);
         runner.setProperty(PutIceberg.TABLE_NAME, TABLE_NAME);
@@ -91,10 +90,7 @@ public class TestPutIcebergCustomValidation {
     @Test
     public void testCustomValidateWithKerberosSecurityConfigAndKerberosUserService() throws InitializationException {
         initRecordReader();
-
-        Configuration config = new Configuration();
-        config.set("hadoop.security.authentication", "kerberos");
-        initCatalogService(config);
+        initCatalogService(Collections.singletonList("src/test/resources/secured-core-site.xml"));
 
         initKerberosUserService();
 
@@ -106,8 +102,7 @@ public class TestPutIcebergCustomValidation {
     @Test
     public void testCustomValidateWithoutKerberosSecurityConfigAndKerberosUserService() throws InitializationException {
         initRecordReader();
-
-        initCatalogService(new Configuration());
+        initCatalogService(Collections.singletonList("src/test/resources/unsecured-core-site.xml"));
 
         runner.setProperty(PutIceberg.CATALOG_NAMESPACE, CATALOG_NAMESPACE);
         runner.setProperty(PutIceberg.TABLE_NAME, TABLE_NAME);
@@ -117,8 +112,7 @@ public class TestPutIcebergCustomValidation {
     @Test
     public void testCustomValidateWithoutKerberosSecurityConfigAndWithKerberosUserService() throws InitializationException {
         initRecordReader();
-
-        initCatalogService(new Configuration());
+        initCatalogService(Collections.singletonList("src/test/resources/unsecured-core-site.xml"));
 
         initKerberosUserService();
 
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHadoopCatalog.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHadoopCatalog.java
index 9b75ee9ef6..ff8f5a9a3e 100644
--- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHadoopCatalog.java
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHadoopCatalog.java
@@ -27,6 +27,7 @@ import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.types.Types;
 import org.apache.nifi.avro.AvroTypeUtil;
+import org.apache.nifi.processors.iceberg.catalog.IcebergCatalogFactory;
 import org.apache.nifi.processors.iceberg.catalog.TestHadoopCatalogService;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.serialization.record.MockRecordParser;
@@ -62,6 +63,7 @@ public class TestPutIcebergWithHadoopCatalog {
     private TestRunner runner;
     private PutIceberg processor;
     private Schema inputSchema;
+    private Catalog catalog;
 
     private static final Namespace NAMESPACE = Namespace.of("default");
 
@@ -100,9 +102,10 @@ public class TestPutIcebergWithHadoopCatalog {
         runner.setProperty(PutIceberg.RECORD_READER, "mock-reader-factory");
     }
 
-    private Catalog initCatalog(PartitionSpec spec, String fileFormat) throws InitializationException, IOException {
+    private void initCatalog(PartitionSpec spec, String fileFormat) throws InitializationException, IOException {
         TestHadoopCatalogService catalogService = new TestHadoopCatalogService();
-        Catalog catalog = catalogService.getCatalog();
+        IcebergCatalogFactory catalogFactory = new IcebergCatalogFactory(catalogService);
+        catalog = catalogFactory.create();
 
         Map<String, String> tableProperties = new HashMap<>();
         tableProperties.put(TableProperties.FORMAT_VERSION, "2");
@@ -114,8 +117,6 @@ public class TestPutIcebergWithHadoopCatalog {
         runner.enableControllerService(catalogService);
 
         runner.setProperty(PutIceberg.CATALOG, "catalog-service");
-
-        return catalog;
     }
 
     @DisabledOnOs(WINDOWS)
@@ -128,7 +129,7 @@ public class TestPutIcebergWithHadoopCatalog {
 
         runner = TestRunners.newTestRunner(processor);
         initRecordReader();
-        Catalog catalog = initCatalog(spec, fileFormat);
+        initCatalog(spec, fileFormat);
         runner.setProperty(PutIceberg.CATALOG_NAMESPACE, "default");
         runner.setProperty(PutIceberg.TABLE_NAME, "date");
         runner.setValidateExpressionUsage(false);
@@ -156,7 +157,7 @@ public class TestPutIcebergWithHadoopCatalog {
 
         runner = TestRunners.newTestRunner(processor);
         initRecordReader();
-        Catalog catalog = initCatalog(spec, fileFormat);
+        initCatalog(spec, fileFormat);
         runner.setProperty(PutIceberg.CATALOG_NAMESPACE, "default");
         runner.setProperty(PutIceberg.TABLE_NAME, "date");
         runner.setValidateExpressionUsage(false);
@@ -185,7 +186,7 @@ public class TestPutIcebergWithHadoopCatalog {
 
         runner = TestRunners.newTestRunner(processor);
         initRecordReader();
-        Catalog catalog = initCatalog(spec, fileFormat);
+        initCatalog(spec, fileFormat);
         runner.setProperty(PutIceberg.CATALOG_NAMESPACE, "default");
         runner.setProperty(PutIceberg.TABLE_NAME, "date");
         runner.setValidateExpressionUsage(false);
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHiveCatalog.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHiveCatalog.java
index 3ec70c3d37..c672d90e8b 100644
--- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHiveCatalog.java
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHiveCatalog.java
@@ -22,12 +22,14 @@ import org.apache.commons.io.IOUtils;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.types.Types;
 import org.apache.nifi.avro.AvroTypeUtil;
 import org.apache.nifi.hive.metastore.ThriftMetastore;
+import org.apache.nifi.processors.iceberg.catalog.IcebergCatalogFactory;
 import org.apache.nifi.processors.iceberg.catalog.TestHiveCatalogService;
 import org.apache.nifi.processors.iceberg.util.IcebergTestUtils;
 import org.apache.nifi.reporting.InitializationException;
@@ -66,7 +68,7 @@ public class TestPutIcebergWithHiveCatalog {
     private TestRunner runner;
     private PutIceberg processor;
     private Schema inputSchema;
-    private TestHiveCatalogService catalogService;
+    private Catalog catalog;
 
     @RegisterExtension
     public static ThriftMetastore metastore = new ThriftMetastore();
@@ -90,16 +92,11 @@ public class TestPutIcebergWithHiveCatalog {
         inputSchema = new Schema.Parser().parse(avroSchema);
 
         processor = new PutIceberg();
-
-        catalogService = new TestHiveCatalogService.Builder()
-                .withMetastoreUri(metastore.getThriftConnectionUri())
-                .withWarehouseLocation(metastore.getWarehouseLocation())
-                .build();
     }
 
     @AfterEach
     public void tearDown() {
-        catalogService.getCatalog().dropTable(TABLE_IDENTIFIER);
+        catalog.dropTable(TABLE_IDENTIFIER);
     }
 
     private void initRecordReader() throws InitializationException {
@@ -126,7 +123,15 @@ public class TestPutIcebergWithHiveCatalog {
         tableProperties.put(TableProperties.FORMAT_VERSION, "2");
         tableProperties.put(TableProperties.DEFAULT_FILE_FORMAT, fileFormat);
 
-        catalogService.getCatalog().createTable(TABLE_IDENTIFIER, USER_SCHEMA, spec, tableProperties);
+        TestHiveCatalogService catalogService = new TestHiveCatalogService.Builder()
+                .withMetastoreUri(metastore.getThriftConnectionUri())
+                .withWarehouseLocation(metastore.getWarehouseLocation())
+                .build();
+
+        IcebergCatalogFactory catalogFactory = new IcebergCatalogFactory(catalogService);
+        catalog = catalogFactory.create();
+
+        catalog.createTable(TABLE_IDENTIFIER, USER_SCHEMA, spec, tableProperties);
 
         runner.addControllerService("catalog-service", catalogService);
         runner.enableControllerService(catalogService);
@@ -150,7 +155,7 @@ public class TestPutIcebergWithHiveCatalog {
         runner.enqueue(new byte[0]);
         runner.run();
 
-        Table table = catalogService.getCatalog().loadTable(TABLE_IDENTIFIER);
+        Table table = catalog.loadTable(TABLE_IDENTIFIER);
 
         List<Record> expectedRecords = IcebergTestUtils.RecordsBuilder.newInstance(USER_SCHEMA)
                 .add(0, "John", "Finance")
@@ -187,7 +192,7 @@ public class TestPutIcebergWithHiveCatalog {
         runner.enqueue(new byte[0]);
         runner.run();
 
-        Table table = catalogService.getCatalog().loadTable(TABLE_IDENTIFIER);
+        Table table = catalog.loadTable(TABLE_IDENTIFIER);
 
         List<Record> expectedRecords = IcebergTestUtils.RecordsBuilder.newInstance(USER_SCHEMA)
                 .add(0, "John", "Finance")
@@ -225,7 +230,7 @@ public class TestPutIcebergWithHiveCatalog {
         runner.enqueue(new byte[0]);
         runner.run();
 
-        Table table = catalogService.getCatalog().loadTable(TABLE_IDENTIFIER);
+        Table table = catalog.loadTable(TABLE_IDENTIFIER);
 
         List<Record> expectedRecords = IcebergTestUtils.RecordsBuilder.newInstance(USER_SCHEMA)
                 .add(0, "John", "Finance")
@@ -266,7 +271,7 @@ public class TestPutIcebergWithHiveCatalog {
         runner.enqueue(new byte[0], attributes);
         runner.run();
 
-        Table table = catalogService.getCatalog().loadTable(TABLE_IDENTIFIER);
+        Table table = catalog.loadTable(TABLE_IDENTIFIER);
 
         List<Record> expectedRecords = IcebergTestUtils.RecordsBuilder.newInstance(USER_SCHEMA)
                 .add(0, "John", "Finance")
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/catalog/TestHadoopCatalogService.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/catalog/TestHadoopCatalogService.java
index 111c4c5720..9673b894a3 100644
--- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/catalog/TestHadoopCatalogService.java
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/catalog/TestHadoopCatalogService.java
@@ -17,35 +17,40 @@
  */
 package org.apache.nifi.processors.iceberg.catalog;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.catalog.Catalog;
-import org.apache.iceberg.hadoop.HadoopCatalog;
 import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.services.iceberg.IcebergCatalogProperty;
 import org.apache.nifi.services.iceberg.IcebergCatalogService;
+import org.apache.nifi.services.iceberg.IcebergCatalogType;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 import static java.nio.file.Files.createTempDirectory;
 
 public class TestHadoopCatalogService extends AbstractControllerService implements IcebergCatalogService {
 
-    private final HadoopCatalog catalog;
+    private final Map<IcebergCatalogProperty, String> catalogProperties = new HashMap<>();
 
     public TestHadoopCatalogService() throws IOException {
         File warehouseLocation = createTempDirectory("metastore").toFile();
-
-        catalog =  new HadoopCatalog(new Configuration(), warehouseLocation.getAbsolutePath());
+        catalogProperties.put(IcebergCatalogProperty.WAREHOUSE_LOCATION, warehouseLocation.getAbsolutePath());
     }
 
     @Override
-    public Catalog getCatalog() {
-        return catalog;
+    public IcebergCatalogType getCatalogType() {
+        return IcebergCatalogType.HADOOP;
     }
 
     @Override
-    public Configuration getConfiguration() {
-        return catalog.getConf();
+    public Map<IcebergCatalogProperty, String> getCatalogProperties() {
+        return catalogProperties;
     }
 
+    @Override
+    public List<String> getConfigFilePaths() {
+        return null;
+    }
 }
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/catalog/TestHiveCatalogService.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/catalog/TestHiveCatalogService.java
index 3a65e944f5..0cd7f042a8 100644
--- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/catalog/TestHiveCatalogService.java
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/catalog/TestHiveCatalogService.java
@@ -17,28 +17,47 @@
  */
 package org.apache.nifi.processors.iceberg.catalog;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.CatalogProperties;
-import org.apache.iceberg.catalog.Catalog;
-import org.apache.iceberg.hive.HiveCatalog;
 import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.services.iceberg.IcebergCatalogProperty;
 import org.apache.nifi.services.iceberg.IcebergCatalogService;
+import org.apache.nifi.services.iceberg.IcebergCatalogType;
 
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
+import static org.apache.nifi.services.iceberg.IcebergCatalogProperty.METASTORE_URI;
+import static org.apache.nifi.services.iceberg.IcebergCatalogProperty.WAREHOUSE_LOCATION;
+
 public class TestHiveCatalogService extends AbstractControllerService implements IcebergCatalogService {
 
-    private final HiveCatalog catalog;
+    private final List<String> configFilePaths;
+    private final Map<IcebergCatalogProperty, String> catalogProperties;
+
+    public TestHiveCatalogService(Map<IcebergCatalogProperty, String> catalogProperties, List<String> configFilePaths) {
+        this.catalogProperties = catalogProperties;
+        this.configFilePaths = configFilePaths;
+    }
+
+    @Override
+    public IcebergCatalogType getCatalogType() {
+        return IcebergCatalogType.HIVE;
+    }
+
+    @Override
+    public Map<IcebergCatalogProperty, String> getCatalogProperties() {
+        return catalogProperties;
+    }
 
-    public TestHiveCatalogService(HiveCatalog catalog) {
-        this.catalog = catalog;
+    @Override
+    public List<String> getConfigFilePaths() {
+        return configFilePaths;
     }
 
     public static class Builder {
         private String metastoreUri;
         private String warehouseLocation;
-        private Configuration config;
+        private List<String> configFilePaths;
 
         public Builder withMetastoreUri(String metastoreUri) {
             this.metastoreUri = metastoreUri;
@@ -50,40 +69,23 @@ public class TestHiveCatalogService extends AbstractControllerService implements
             return this;
         }
 
-        public Builder withConfig(Configuration config) {
-            this.config = config;
+        public Builder withConfigFilePaths(List<String> configFilePaths) {
+            this.configFilePaths = configFilePaths;
             return this;
         }
 
         public TestHiveCatalogService build() {
-            HiveCatalog catalog = new HiveCatalog();
-            Map<String, String> properties = new HashMap<>();
+            Map<IcebergCatalogProperty, String> properties = new HashMap<>();
 
             if (metastoreUri != null) {
-                properties.put(CatalogProperties.URI, metastoreUri);
+                properties.put(METASTORE_URI, metastoreUri);
             }
 
             if (warehouseLocation != null) {
-                properties.put(CatalogProperties.WAREHOUSE_LOCATION, warehouseLocation);
+                properties.put(WAREHOUSE_LOCATION, warehouseLocation);
             }
 
-            if (config != null) {
-                catalog.setConf(config);
-            }
-
-            catalog.initialize("hive-catalog", properties);
-            return new TestHiveCatalogService(catalog);
+            return new TestHiveCatalogService(properties, configFilePaths);
         }
     }
-
-    @Override
-    public Catalog getCatalog() {
-        return catalog;
-    }
-
-    @Override
-    public Configuration getConfiguration() {
-        return catalog.getConf();
-    }
-
 }
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/resources/secured-core-site.xml b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/resources/secured-core-site.xml
new file mode 100644
index 0000000000..0fd06a5383
--- /dev/null
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/resources/secured-core-site.xml
@@ -0,0 +1,22 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<configuration>
+    <property>
+        <name>hadoop.security.authentication</name>
+        <value>kerberos</value>
+    </property>
+</configuration>
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/resources/unsecured-core-site.xml b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/resources/unsecured-core-site.xml
new file mode 100644
index 0000000000..d590a5039c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/resources/unsecured-core-site.xml
@@ -0,0 +1,22 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<configuration>
+    <property>
+        <name>hadoop.security.authentication</name>
+        <value>simple</value>
+    </property>
+</configuration>
\ No newline at end of file
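
The two core-site resources above let the tests exercise both a kerberized and a plain-auth Hadoop setup. A minimal sketch of how such a flag is read with the stock Hadoop client API (not necessarily the tests' exact code):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;

    public class SecurityFlagSketch {
        public static void main(String[] args) {
            // Load only the test resource, not any core-site.xml on the classpath.
            final Configuration conf = new Configuration(false);
            conf.addResource(new Path("src/test/resources/secured-core-site.xml"));

            // "kerberos" for the secured resource, "simple" for the unsecured one.
            final String auth = conf.get("hadoop.security.authentication");
            System.out.println("kerberized: " + "kerberos".equalsIgnoreCase(auth));
        }
    }
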
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api-nar/pom.xml b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api-nar/pom.xml
index 3ce733bd25..4e7d773945 100644
--- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api-nar/pom.xml
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api-nar/pom.xml
@@ -37,166 +37,4 @@
             <type>nar</type>
         </dependency>
     </dependencies>
-
-    <profiles>
-        <!-- Includes hadoop-aws for accessing HDFS with an s3a:// filesystem -->
-        <profile>
-            <id>include-hadoop-aws</id>
-            <activation>
-                <activeByDefault>false</activeByDefault>
-            </activation>
-            <dependencies>
-                <dependency>
-                    <groupId>org.apache.hadoop</groupId>
-                    <artifactId>hadoop-aws</artifactId>
-                    <version>${hadoop.version}</version>
-                </dependency>
-            </dependencies>
-        </profile>
-        <!-- Includes hadoop-azure and hadoop-azure-datalake for accessing HDFS with wasb://, abfs://, and adl:// filesystems -->
-        <profile>
-            <id>include-hadoop-azure</id>
-            <activation>
-                <activeByDefault>false</activeByDefault>
-            </activation>
-            <dependencies>
-                <dependency>
-                    <groupId>org.apache.hadoop</groupId>
-                    <artifactId>hadoop-azure</artifactId>
-                    <version>${hadoop.version}</version>
-                    <exclusions>
-                        <exclusion>
-                            <groupId>com.google.guava</groupId>
-                            <artifactId>guava</artifactId>
-                        </exclusion>
-                        <exclusion>
-                            <groupId>com.fasterxml.jackson.core</groupId>
-                            <artifactId>jackson-core</artifactId>
-                        </exclusion>
-                        <exclusion>
-                            <groupId>commons-logging</groupId>
-                            <artifactId>commons-logging</artifactId>
-                        </exclusion>
-                    </exclusions>
-                </dependency>
-                <dependency>
-                    <groupId>org.apache.hadoop</groupId>
-                    <artifactId>hadoop-azure-datalake</artifactId>
-                    <version>${hadoop.version}</version>
-                    <exclusions>
-                        <exclusion>
-                            <groupId>com.fasterxml.jackson.core</groupId>
-                            <artifactId>jackson-core</artifactId>
-                        </exclusion>
-                    </exclusions>
-                </dependency>
-            </dependencies>
-        </profile>
-        <!-- Includes hadoop-cloud-storage -->
-        <profile>
-            <id>include-hadoop-cloud-storage</id>
-            <activation>
-                <activeByDefault>false</activeByDefault>
-            </activation>
-            <dependencies>
-                <dependency>
-                    <groupId>org.apache.hadoop</groupId>
-                    <artifactId>hadoop-cloud-storage</artifactId>
-                    <version>${hadoop.version}</version>
-                    <exclusions>
-                        <exclusion>
-                            <groupId>log4j</groupId>
-                            <artifactId>log4j</artifactId>
-                        </exclusion>
-                        <exclusion>
-                            <groupId>org.apache.logging.log4j</groupId>
-                            <artifactId>log4j-core</artifactId>
-                        </exclusion>
-                        <exclusion>
-                            <groupId>org.slf4j</groupId>
-                            <artifactId>slf4j-log4j12</artifactId>
-                        </exclusion>
-                        <exclusion>
-                            <groupId>commons-logging</groupId>
-                            <artifactId>commons-logging</artifactId>
-                        </exclusion>
-                    </exclusions>
-                </dependency>
-            </dependencies>
-        </profile>
-        <!-- Includes hadoop-ozone for o3fs:// file system -->
-        <profile>
-            <id>include-hadoop-ozone</id>
-            <activation>
-                <activeByDefault>false</activeByDefault>
-            </activation>
-            <dependencies>
-                <dependency>
-                    <groupId>org.apache.ozone</groupId>
-                    <artifactId>ozone-client</artifactId>
-                    <version>${ozone.version}</version>
-                    <exclusions>
-                        <exclusion>
-                            <groupId>commons-logging</groupId>
-                            <artifactId>commons-logging</artifactId>
-                        </exclusion>
-                        <exclusion>
-                            <groupId>org.apache.logging.log4j</groupId>
-                            <artifactId>log4j-core</artifactId>
-                        </exclusion>
-                        <exclusion>
-                            <groupId>org.bouncycastle</groupId>
-                            <artifactId>bcprov-jdk15on</artifactId>
-                        </exclusion>
-                        <exclusion>
-                            <groupId>org.bouncycastle</groupId>
-                            <artifactId>bcpkix-jdk15on</artifactId>
-                        </exclusion>
-                    </exclusions>
-                </dependency>
-                <dependency>
-                    <groupId>org.apache.ozone</groupId>
-                    <artifactId>ozone-filesystem</artifactId>
-                    <version>${ozone.version}</version>
-                </dependency>
-                <dependency>
-                    <groupId>org.bouncycastle</groupId>
-                    <artifactId>bcprov-jdk18on</artifactId>
-                </dependency>
-                <dependency>
-                    <groupId>org.bouncycastle</groupId>
-                    <artifactId>bcpkix-jdk18on</artifactId>
-                </dependency>
-            </dependencies>
-        </profile>
-        <!-- Includes hadoop-gcp for accessing HDFS with an gcs:// filesystem -->
-        <profile>
-            <id>include-hadoop-gcp</id>
-            <activation>
-                <activeByDefault>false</activeByDefault>
-            </activation>
-            <dependencies>
-                <dependency>
-                    <groupId>com.google.cloud.bigdataoss</groupId>
-                    <artifactId>gcs-connector</artifactId>
-                    <version>hadoop3-${gcs.version}</version>
-                </dependency>
-                <dependency>
-                    <groupId>com.google.cloud.bigdataoss</groupId>
-                    <artifactId>util</artifactId>
-                    <version>${gcs.version}</version>
-                </dependency>
-                <dependency>
-                    <groupId>com.google.cloud.bigdataoss</groupId>
-                    <artifactId>util-hadoop</artifactId>
-                    <version>hadoop3-${gcs.version}</version>
-                </dependency>
-                <dependency>
-                    <groupId>com.google.cloud.bigdataoss</groupId>
-                    <artifactId>gcsio</artifactId>
-                    <version>${gcs.version}</version>
-                </dependency>
-            </dependencies>
-        </profile>
-    </profiles>
 </project>
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/pom.xml b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/pom.xml
index f7783db574..9cf2556a9e 100644
--- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/pom.xml
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/pom.xml
@@ -31,120 +31,5 @@
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-api</artifactId>
         </dependency>
-
-        <!-- External dependencies -->
-        <dependency>
-            <groupId>org.apache.iceberg</groupId>
-            <artifactId>iceberg-hive-metastore</artifactId>
-            <version>${iceberg.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.iceberg</groupId>
-            <artifactId>iceberg-parquet</artifactId>
-            <version>${iceberg.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.iceberg</groupId>
-            <artifactId>iceberg-orc</artifactId>
-            <version>${iceberg.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-client</artifactId>
-            <version>${hadoop.version}</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>log4j</groupId>
-                    <artifactId>log4j</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.slf4j</groupId>
-                    <artifactId>slf4j-log4j12</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.slf4j</groupId>
-                    <artifactId>slf4j-reload4j</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>commons-logging</groupId>
-                    <artifactId>commons-logging</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>javax.servlet</groupId>
-                    <artifactId>javax.servlet-api</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hive</groupId>
-            <artifactId>hive-metastore</artifactId>
-            <version>${hive.version}</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>log4j</groupId>
-                    <artifactId>log4j</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.slf4j</groupId>
-                    <artifactId>slf4j-log4j12</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>commons-logging</groupId>
-                    <artifactId>commons-logging</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.logging.log4j</groupId>
-                    <artifactId>log4j-core</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.logging.log4j</groupId>
-                    <artifactId>log4j-web</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.logging.log4j</groupId>
-                    <artifactId>log4j-1.2-api</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.logging.log4j</groupId>
-                    <artifactId>log4j-slf4j-impl</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.orc</groupId>
-                    <artifactId>orc-core</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.hbase</groupId>
-                    <artifactId>hbase-client</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>co.cask.tephra</groupId>
-                    <artifactId>tephra-api</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>co.cask.tephra</groupId>
-                    <artifactId>tephra-core</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>co.cask.tephra</groupId>
-                    <artifactId>tephra-hbase-compat-1.0</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.parquet</groupId>
-                    <artifactId>parquet-hadoop-bundle</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>com.tdunning</groupId>
-                    <artifactId>json</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>com.zaxxer</groupId>
-                    <artifactId>HikariCP</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>com.google.guava</groupId>
-                    <artifactId>guava</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
     </dependencies>
 </project>
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/src/main/java/org/apache/nifi/services/iceberg/IcebergCatalogService.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/src/main/java/org/apache/nifi/services/iceberg/IcebergCatalogProperty.java
similarity index 68%
copy from nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/src/main/java/org/apache/nifi/services/iceberg/IcebergCatalogService.java
copy to nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/src/main/java/org/apache/nifi/services/iceberg/IcebergCatalogProperty.java
index 991ac625dc..f4c55c39c8 100644
--- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/src/main/java/org/apache/nifi/services/iceberg/IcebergCatalogService.java
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/src/main/java/org/apache/nifi/services/iceberg/IcebergCatalogProperty.java
@@ -17,16 +17,19 @@
  */
 package org.apache.nifi.services.iceberg;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.catalog.Catalog;
-import org.apache.nifi.controller.ControllerService;
+public enum IcebergCatalogProperty {
 
-/**
- * Provides a basic connector to Iceberg catalog services.
- */
-public interface IcebergCatalogService extends ControllerService {
+    METASTORE_URI("hive.metastore.uris"),
+    WAREHOUSE_LOCATION("hive.metastore.warehouse.dir");
+
+    private final String hadoopPropertyName;
+
+    IcebergCatalogProperty(String hadoopPropertyName) {
+        this.hadoopPropertyName = hadoopPropertyName;
+    }
 
-    Catalog getCatalog();
+    public String getHadoopPropertyName() {
+        return hadoopPropertyName;
+    }
 
-    Configuration getConfiguration();
 }
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/src/main/java/org/apache/nifi/services/iceberg/IcebergCatalogService.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/src/main/java/org/apache/nifi/services/iceberg/IcebergCatalogService.java
index 991ac625dc..56e595d2e9 100644
--- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/src/main/java/org/apache/nifi/services/iceberg/IcebergCatalogService.java
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/src/main/java/org/apache/nifi/services/iceberg/IcebergCatalogService.java
@@ -17,16 +17,19 @@
  */
 package org.apache.nifi.services.iceberg;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.catalog.Catalog;
 import org.apache.nifi.controller.ControllerService;
 
+import java.util.List;
+import java.util.Map;
+
 /**
  * Provides a basic connector to Iceberg catalog services.
  */
 public interface IcebergCatalogService extends ControllerService {
 
-    Catalog getCatalog();
+    IcebergCatalogType getCatalogType();
+
+    Map<IcebergCatalogProperty, String> getCatalogProperties();
 
-    Configuration getConfiguration();
+    List<String> getConfigFilePaths();
 }
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/src/main/java/org/apache/nifi/services/iceberg/IcebergCatalogService.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/src/main/java/org/apache/nifi/services/iceberg/IcebergCatalogType.java
similarity index 71%
copy from nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/src/main/java/org/apache/nifi/services/iceberg/IcebergCatalogService.java
copy to nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/src/main/java/org/apache/nifi/services/iceberg/IcebergCatalogType.java
index 991ac625dc..4b8640da1d 100644
--- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/src/main/java/org/apache/nifi/services/iceberg/IcebergCatalogService.java
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/src/main/java/org/apache/nifi/services/iceberg/IcebergCatalogType.java
@@ -17,16 +17,7 @@
  */
 package org.apache.nifi.services.iceberg;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.catalog.Catalog;
-import org.apache.nifi.controller.ControllerService;
-
-/**
- * Provides a basic connector to Iceberg catalog services.
- */
-public interface IcebergCatalogService extends ControllerService {
-
-    Catalog getCatalog();
-
-    Configuration getConfiguration();
+public enum IcebergCatalogType {
+    HIVE,
+    HADOOP
 }
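
Together, IcebergCatalogService, IcebergCatalogProperty, and IcebergCatalogType now form the entire cross-NAR contract: only an enum, a property map, and raw file paths cross the bundle boundary, so no Hadoop or Iceberg classes are shared through the services API. A consumer then rebuilds the Hadoop Configuration inside its own classloader, roughly as follows (a sketch; the commit's processor-side IcebergUtils and IcebergCatalogFactory do the real work):

    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.nifi.services.iceberg.IcebergCatalogService;

    public final class ConfigurationRebuildSketch {

        // Because this code compiles into the consumer bundle, the resulting
        // Configuration is loaded by the consumer's classloader rather than a
        // classloader shared between processor instances.
        public static Configuration fromService(final IcebergCatalogService service) {
            final Configuration conf = new Configuration();
            final List<String> paths = service.getConfigFilePaths();
            if (paths != null) {
                for (final String path : paths) {
                    conf.addResource(new Path(path));
                }
            }
            return conf;
        }

        private ConfigurationRebuildSketch() {
        }
    }
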
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services/pom.xml b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services/pom.xml
index 2a22d49aac..e81de19917 100644
--- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services/pom.xml
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services/pom.xml
@@ -36,6 +36,11 @@
             <artifactId>nifi-utils</artifactId>
             <version>2.0.0-SNAPSHOT</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-xml-processing</artifactId>
+            <version>2.0.0-SNAPSHOT</version>
+        </dependency>
     </dependencies>
 
 </project>
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services/src/main/java/org/apache/nifi/services/iceberg/AbstractCatalogService.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services/src/main/java/org/apache/nifi/services/iceberg/AbstractCatalogService.java
index 38f156c68d..7afc68a6a0 100644
--- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services/src/main/java/org/apache/nifi/services/iceberg/AbstractCatalogService.java
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services/src/main/java/org/apache/nifi/services/iceberg/AbstractCatalogService.java
@@ -17,14 +17,24 @@
  */
 package org.apache.nifi.services.iceberg;
 
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.resource.ResourceCardinality;
 import org.apache.nifi.components.resource.ResourceType;
 import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.xml.processing.parsers.StandardDocumentProvider;
+import org.w3c.dom.Document;
+
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 
 /**
@@ -32,7 +42,9 @@ import org.apache.nifi.expression.ExpressionLanguageScope;
  */
 public abstract class AbstractCatalogService extends AbstractControllerService implements IcebergCatalogService {
 
-    protected Configuration configuration = new Configuration();
+    protected Map<IcebergCatalogProperty, String> catalogProperties = new HashMap<>();
+
+    protected List<String> configFilePaths;
 
     static final PropertyDescriptor HADOOP_CONFIGURATION_RESOURCES = new PropertyDescriptor.Builder()
             .name("hadoop-config-resources")
@@ -44,24 +56,38 @@ public abstract class AbstractCatalogService extends AbstractControllerService i
             .dynamicallyModifiesClasspath(true)
             .build();
 
-    /**
-     * Loads configuration files from the provided paths.
-     *
-     * @param configFiles list of config file paths separated with comma
-     * @return merged configuration
-     */
-    protected Configuration getConfigurationFromFiles(String configFiles) {
-        final Configuration conf = new Configuration();
-        if (StringUtils.isNotBlank(configFiles)) {
-            for (final String configFile : configFiles.split(",")) {
-                conf.addResource(new Path(configFile.trim()));
+    protected List<Document> parseConfigFilePaths(String configFilePaths) {
+        List<Document> documentList = new ArrayList<>();
+        for (final String configFile : createFilePathList(configFilePaths)) {
+            File file = new File(configFile.trim());
+            try (final InputStream fis = new FileInputStream(file);
+                 final InputStream in = new BufferedInputStream(fis)) {
+                final StandardDocumentProvider documentProvider = new StandardDocumentProvider();
+                documentList.add(documentProvider.parse(in));
+            } catch (IOException e) {
+                throw new ProcessException("Failed to load config files", e);
             }
         }
-        return conf;
+        return documentList;
+    }
+
+    protected List<String> createFilePathList(String configFilePaths) {
+        List<String> filePathList = new ArrayList<>();
+        if (configFilePaths != null && !configFilePaths.trim().isEmpty()) {
+            for (final String configFile : configFilePaths.split(",")) {
+                filePathList.add(configFile.trim());
+            }
+        }
+        return filePathList;
+    }
+
+    @Override
+    public Map<IcebergCatalogProperty, String> getCatalogProperties() {
+        return catalogProperties;
     }
 
     @Override
-    public Configuration getConfiguration() {
-        return configuration;
+    public List<String> getConfigFilePaths() {
+        return configFilePaths;
     }
 }
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services/src/main/java/org/apache/nifi/services/iceberg/HadoopCatalogService.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services/src/main/java/org/apache/nifi/services/iceberg/HadoopCatalogService.java
index dcf2dd395f..8f62e1e183 100644
--- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services/src/main/java/org/apache/nifi/services/iceberg/HadoopCatalogService.java
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services/src/main/java/org/apache/nifi/services/iceberg/HadoopCatalogService.java
@@ -17,20 +17,20 @@
  */
 package org.apache.nifi.services.iceberg;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.catalog.Catalog;
-import org.apache.iceberg.hadoop.HadoopCatalog;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnEnabled;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.processor.util.StandardValidators;
 
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
+import static org.apache.nifi.services.iceberg.IcebergCatalogProperty.WAREHOUSE_LOCATION;
+
 @Tags({"iceberg", "catalog", "service", "hadoop", "hdfs"})
 @CapabilityDescription("Catalog service that can use HDFS or similar file 
systems that support atomic rename.")
 public class HadoopCatalogService extends AbstractCatalogService {
@@ -39,6 +39,7 @@ public class HadoopCatalogService extends AbstractCatalogService {
             .name("warehouse-path")
             .displayName("Warehouse Path")
             .description("Path to the location of the warehouse.")
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .required(true)
             .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
             .build();
@@ -53,25 +54,18 @@ public class HadoopCatalogService extends AbstractCatalogService {
         return PROPERTIES;
     }
 
-    private HadoopCatalog catalog;
-
     @OnEnabled
     public void onEnabled(final ConfigurationContext context) {
-        final String warehousePath = context.getProperty(WAREHOUSE_PATH).evaluateAttributeExpressions().getValue();
-
         if (context.getProperty(HADOOP_CONFIGURATION_RESOURCES).isSet()) {
-            final String configFiles = context.getProperty(HADOOP_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().getValue();
-
-            configuration = getConfigurationFromFiles(configFiles);
-            catalog = new HadoopCatalog(configuration, warehousePath);
-        } else {
-            catalog = new HadoopCatalog(new Configuration(), warehousePath);
+            configFilePaths = createFilePathList(context.getProperty(HADOOP_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().getValue());
         }
+
+        catalogProperties.put(WAREHOUSE_LOCATION, context.getProperty(WAREHOUSE_PATH).evaluateAttributeExpressions().getValue());
     }
 
     @Override
-    public Catalog getCatalog() {
-        return catalog;
+    public IcebergCatalogType getCatalogType() {
+        return IcebergCatalogType.HADOOP;
     }
 
 }
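
With the service reduced to configuration, catalog construction moves to the consumer. For the HADOOP type, the WAREHOUSE_LOCATION entry is all that is needed (a sketch under that assumption; the actual construction lives in the new processor-side IcebergCatalogFactory):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.iceberg.catalog.Catalog;
    import org.apache.iceberg.hadoop.HadoopCatalog;
    import org.apache.nifi.services.iceberg.IcebergCatalogProperty;
    import org.apache.nifi.services.iceberg.IcebergCatalogService;

    public final class HadoopCatalogSketch {

        // conf would come from rebuilding the service's config file paths,
        // as in the Configuration sketch shown earlier.
        public static Catalog fromService(final IcebergCatalogService service, final Configuration conf) {
            final String warehouse = service.getCatalogProperties().get(IcebergCatalogProperty.WAREHOUSE_LOCATION);
            return new HadoopCatalog(conf, warehouse);
        }

        private HadoopCatalogSketch() {
        }
    }
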
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services/src/main/java/org/apache/nifi/services/iceberg/HiveCatalogService.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services/src/main/java/org/apache/nifi/services/iceberg/HiveCatalogService.java
index 25bafe8116..e609981d46 100644
--- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services/src/main/java/org/apache/nifi/services/iceberg/HiveCatalogService.java
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services/src/main/java/org/apache/nifi/services/iceberg/HiveCatalogService.java
@@ -17,10 +17,6 @@
  */
 package org.apache.nifi.services.iceberg;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.CatalogProperties;
-import org.apache.iceberg.catalog.Catalog;
-import org.apache.iceberg.hive.HiveCatalog;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnEnabled;
@@ -30,14 +26,14 @@ import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.processor.util.StandardValidators;
+import org.w3c.dom.Document;
+import org.w3c.dom.NodeList;
 
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
 @Tags({"iceberg", "catalog", "service", "metastore", "hive"})
 @CapabilityDescription("Catalog service that connects to a Hive metastore to 
keep track of Iceberg tables.")
@@ -47,7 +43,7 @@ public class HiveCatalogService extends AbstractCatalogService {
             .name("hive-metastore-uri")
             .displayName("Hive Metastore URI")
             .description("The URI location(s) for the Hive metastore; note 
that this is not the location of the Hive Server. The default port for the Hive 
metastore is 9043.")
-            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .addValidator(StandardValidators.URI_LIST_VALIDATOR)
             .build();
 
@@ -55,6 +51,7 @@ public class HiveCatalogService extends AbstractCatalogService {
             .name("warehouse-location")
             .displayName("Default Warehouse Location")
             .description("Location of default database for the warehouse. This 
field sets or overrides the 'hive.metastore.warehouse.dir' configuration 
property.")
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
             .build();
 
@@ -69,14 +66,12 @@ public class HiveCatalogService extends AbstractCatalogService {
         return PROPERTIES;
     }
 
-    private HiveCatalog catalog;
-
     @Override
     protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
 
         final List<ValidationResult> problems = new ArrayList<>();
-        String configMetastoreUri = null;
-        String configWarehouseLocation = null;
+        boolean configMetastoreUriPresent = false;
+        boolean configWarehouseLocationPresent = false;
 
         final String propertyMetastoreUri = validationContext.getProperty(METASTORE_URI).evaluateAttributeExpressions().getValue();
         final String propertyWarehouseLocation = validationContext.getProperty(WAREHOUSE_LOCATION).evaluateAttributeExpressions().getValue();
@@ -84,13 +79,30 @@ public class HiveCatalogService extends AbstractCatalogService {
         // Load the configurations for validation only if any config resource is provided and if either the metastore URI or the warehouse location property is missing
         if (validationContext.getProperty(HADOOP_CONFIGURATION_RESOURCES).isSet() && (propertyMetastoreUri == null || propertyWarehouseLocation == null)) {
             final String configFiles = validationContext.getProperty(HADOOP_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().getValue();
+            final List<Document> documents = parseConfigFilePaths(configFiles);
+
+            for (Document document : documents) {
+                final NodeList nameNodeList = document.getElementsByTagName("name");
+
+                for (int i = 0; i < nameNodeList.getLength(); i++) {
+                    final String nodeValue = nameNodeList.item(i).getFirstChild().getNodeValue();
+
+                    if (nodeValue.equals(IcebergCatalogProperty.METASTORE_URI.getHadoopPropertyName())) {
+                        configMetastoreUriPresent = true;
+                    }
 
-            Configuration configuration = getConfigurationFromFiles(configFiles);
-            configMetastoreUri = configuration.get("hive.metastore.uris");
-            configWarehouseLocation = configuration.get("hive.metastore.warehouse.dir");
+                    if (nodeValue.equals(IcebergCatalogProperty.WAREHOUSE_LOCATION.getHadoopPropertyName())) {
+                        configWarehouseLocationPresent = true;
+                    }
+
+                    if (configMetastoreUriPresent && configWarehouseLocationPresent) {
+                        break;
+                    }
+                }
+            }
         }
 
-        if (configMetastoreUri == null && propertyMetastoreUri == null) {
+        if (!configMetastoreUriPresent && propertyMetastoreUri == null) {
             problems.add(new ValidationResult.Builder()
                     .subject("Hive Metastore URI")
                     .valid(false)
@@ -99,7 +111,7 @@ public class HiveCatalogService extends AbstractCatalogService {
                     .build());
         }
 
-        if (configWarehouseLocation == null && propertyWarehouseLocation == null) {
+        if (!configWarehouseLocationPresent && propertyWarehouseLocation == null) {
             problems.add(new ValidationResult.Builder()
                     .subject("Default Warehouse Location")
                     .valid(false)
@@ -113,29 +125,21 @@ public class HiveCatalogService extends AbstractCatalogService {
 
     @OnEnabled
     public void onEnabled(final ConfigurationContext context) {
-        catalog = new HiveCatalog();
-        Map<String, String> properties = new HashMap<>();
-
         if (context.getProperty(METASTORE_URI).isSet()) {
-            properties.put(CatalogProperties.URI, context.getProperty(METASTORE_URI).evaluateAttributeExpressions().getValue());
+            catalogProperties.put(IcebergCatalogProperty.METASTORE_URI, context.getProperty(METASTORE_URI).evaluateAttributeExpressions().getValue());
         }
 
         if (context.getProperty(WAREHOUSE_LOCATION).isSet()) {
-            properties.put(CatalogProperties.WAREHOUSE_LOCATION, context.getProperty(WAREHOUSE_LOCATION).evaluateAttributeExpressions().getValue());
+            catalogProperties.put(IcebergCatalogProperty.WAREHOUSE_LOCATION, context.getProperty(WAREHOUSE_LOCATION).evaluateAttributeExpressions().getValue());
         }
 
         if (context.getProperty(HADOOP_CONFIGURATION_RESOURCES).isSet()) {
-            final String configFiles = context.getProperty(HADOOP_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().getValue();
-
-            configuration = getConfigurationFromFiles(configFiles);
-            catalog.setConf(configuration);
+            configFilePaths = createFilePathList(context.getProperty(HADOOP_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().getValue());
         }
-
-        catalog.initialize("hive-catalog", properties);
     }
 
     @Override
-    public Catalog getCatalog() {
-        return catalog;
+    public IcebergCatalogType getCatalogType() {
+        return IcebergCatalogType.HIVE;
     }
 }
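
The initialization removed from onEnabled() above reappears on the processor side, where the HiveCatalog is built per processor instance. A sketch of the equivalent HIVE-type construction, mapping the enum keys back to Iceberg's CatalogProperties (the catalog name "hive-catalog" matches the removed initialize() call):

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.iceberg.CatalogProperties;
    import org.apache.iceberg.catalog.Catalog;
    import org.apache.iceberg.hive.HiveCatalog;
    import org.apache.nifi.services.iceberg.IcebergCatalogProperty;
    import org.apache.nifi.services.iceberg.IcebergCatalogService;

    public final class HiveCatalogSketch {

        public static Catalog fromService(final IcebergCatalogService service, final Configuration conf) {
            final Map<IcebergCatalogProperty, String> serviceProperties = service.getCatalogProperties();
            final Map<String, String> properties = new HashMap<>();

            if (serviceProperties.containsKey(IcebergCatalogProperty.METASTORE_URI)) {
                properties.put(CatalogProperties.URI, serviceProperties.get(IcebergCatalogProperty.METASTORE_URI));
            }
            if (serviceProperties.containsKey(IcebergCatalogProperty.WAREHOUSE_LOCATION)) {
                properties.put(CatalogProperties.WAREHOUSE_LOCATION, serviceProperties.get(IcebergCatalogProperty.WAREHOUSE_LOCATION));
            }

            final HiveCatalog catalog = new HiveCatalog();
            catalog.setConf(conf);
            catalog.initialize("hive-catalog", properties);
            return catalog;
        }

        private HiveCatalogSketch() {
        }
    }
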

