This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new c97813cd5 [lake/iceberg] Support pass hadoop configuration (#1541)
c97813cd5 is described below

commit c97813cd5e35cfdcd9b0c3d83cdb1918585a8569
Author: yuxia Luo <[email protected]>
AuthorDate: Tue Aug 19 10:56:25 2025 +0800

    [lake/iceberg] Support pass hadoop configuration (#1541)
---
 fluss-lake/fluss-lake-iceberg/pom.xml              |  27 +++++
 .../fluss/lake/iceberg/IcebergLakeCatalog.java     |  13 +-
 .../fluss/lake/iceberg/conf/HadoopConfSerde.java   |  48 ++++++++
 .../fluss/lake/iceberg/conf/HadoopUtils.java       | 135 +++++++++++++++++++++
 .../lake/iceberg/conf/IcebergConfiguration.java    |  98 +++++++++++++++
 .../iceberg/tiering/IcebergCatalogProvider.java    |  10 +-
 .../src/main/resources/META-INF/NOTICE             |   6 +
 .../iceberg/conf/IcebergConfigurationTest.java     |  70 +++++++++++
 8 files changed, 393 insertions(+), 14 deletions(-)

diff --git a/fluss-lake/fluss-lake-iceberg/pom.xml 
b/fluss-lake/fluss-lake-iceberg/pom.xml
index 52f37d072..274de4919 100644
--- a/fluss-lake/fluss-lake-iceberg/pom.xml
+++ b/fluss-lake/fluss-lake-iceberg/pom.xml
@@ -33,6 +33,16 @@
     <packaging>jar</packaging>
 
     <dependencies>
+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-api</artifactId>
+            <version>${iceberg.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-common</artifactId>
+            <version>${iceberg.version}</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.iceberg</groupId>
             <artifactId>iceberg-core</artifactId>
@@ -48,10 +58,21 @@
             <artifactId>iceberg-parquet</artifactId>
             <version>${iceberg.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-orc</artifactId>
+            <version>${iceberg.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-bundled-guava</artifactId>
+            <version>${iceberg.version}</version>
+        </dependency>
         <dependency>
             <groupId>com.alibaba.fluss</groupId>
             <artifactId>fluss-common</artifactId>
             <version>${project.version}</version>
+            <scope>provided</scope>
         </dependency>
         <dependency>
             <groupId>com.alibaba.fluss</groupId>
@@ -136,7 +157,13 @@
                         <configuration>
                             <artifactSet>
                                 <includes>
+                                    
<include>org.apache.iceberg:iceberg-api</include>
                                     
<include>org.apache.iceberg:iceberg-core</include>
+                                    
<include>org.apache.iceberg:iceberg-data</include>
+                                    
<include>org.apache.iceberg:iceberg-common</include>
+                                    
<include>org.apache.iceberg:iceberg-parquet</include>
+                                    
<include>org.apache.iceberg:iceberg-orc</include>
+                                    
<include>org.apache.iceberg:iceberg-bundled-guava</include>
                                 </includes>
                             </artifactSet>
                             <filters>
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/IcebergLakeCatalog.java
 
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/IcebergLakeCatalog.java
index dbc087c1f..46eb9f65d 100644
--- 
a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/IcebergLakeCatalog.java
+++ 
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/IcebergLakeCatalog.java
@@ -20,6 +20,7 @@ package com.alibaba.fluss.lake.iceberg;
 import com.alibaba.fluss.annotation.VisibleForTesting;
 import com.alibaba.fluss.config.Configuration;
 import com.alibaba.fluss.exception.TableAlreadyExistException;
+import com.alibaba.fluss.lake.iceberg.conf.IcebergConfiguration;
 import com.alibaba.fluss.lake.lakestorage.LakeCatalog;
 import com.alibaba.fluss.metadata.TableDescriptor;
 import com.alibaba.fluss.metadata.TablePath;
@@ -53,6 +54,8 @@ import static 
org.apache.iceberg.CatalogUtil.buildIcebergCatalog;
 /** An Iceberg implementation of {@link LakeCatalog}. */
 public class IcebergLakeCatalog implements LakeCatalog {
 
+    public static final String ICEBERG_CATALOG_DEFAULT_NAME = 
"fluss-iceberg-catalog";
+
     private static final LinkedHashMap<String, Type> SYSTEM_COLUMNS = new 
LinkedHashMap<>();
 
     static {
@@ -81,15 +84,9 @@ public class IcebergLakeCatalog implements LakeCatalog {
 
     private Catalog createIcebergCatalog(Configuration configuration) {
         Map<String, String> icebergProps = configuration.toMap();
-
-        String catalogName = icebergProps.getOrDefault("name", 
"fluss-iceberg-catalog");
-
+        String catalogName = icebergProps.getOrDefault("name", 
ICEBERG_CATALOG_DEFAULT_NAME);
         return buildIcebergCatalog(
-                catalogName,
-                icebergProps, // todo: current is an empty configuration, need 
to init from env or
-                // fluss
-                // configurations
-                new org.apache.hadoop.conf.Configuration());
+                catalogName, icebergProps, 
IcebergConfiguration.from(configuration).get());
     }
 
     @Override
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/conf/HadoopConfSerde.java
 
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/conf/HadoopConfSerde.java
new file mode 100644
index 000000000..ffb26c56d
--- /dev/null
+++ 
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/conf/HadoopConfSerde.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.iceberg.conf;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+/** Serde of {@link Configuration}. */
+public class HadoopConfSerde {
+
+    public static void writeObject(ObjectOutputStream out, Object hadoopConf) 
throws IOException {
+        try {
+            Configuration conf = (Configuration) hadoopConf;
+            conf.write(out);
+        } catch (IOException e) {
+            throw new IOException("Failed to serialize Hadoop Configuration: " 
+ e.getMessage(), e);
+        }
+    }
+
+    public static Configuration readObject(ObjectInputStream in) throws 
IOException {
+        try {
+            Configuration hadoopConf = new Configuration();
+            hadoopConf.readFields(in);
+            return hadoopConf;
+        } catch (IOException e) {
+            throw new IOException(
+                    "Failed to deserialize Hadoop Configuration: " + 
e.getMessage(), e);
+        }
+    }
+}
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/conf/HadoopUtils.java
 
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/conf/HadoopUtils.java
new file mode 100644
index 000000000..f93cdd7df
--- /dev/null
+++ 
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/conf/HadoopUtils.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.iceberg.conf;
+
+import com.alibaba.fluss.config.ConfigBuilder;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+
+/**
+ * Utility class for working with Hadoop-related classes. This should only be 
used if Hadoop is on
+ * the classpath.
+ *
+ * <p>Note: Copied from HadoopUtils of fluss-fs-hadoop module
+ */
+public class HadoopUtils {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(HadoopUtils.class);
+
+    /** The prefixes that Fluss adds to the Hadoop config for iceberg. */
+    private static final String[] FLUSS_CONFIG_PREFIXES = {"iceberg.hadoop."};
+
+    public static Configuration getHadoopConfiguration(
+            com.alibaba.fluss.config.Configuration flussConfiguration) {
+
+        // Instantiate an HdfsConfiguration to load the hdfs-site.xml and 
hdfs-default.xml
+        // from the classpath
+
+        Configuration result = new HdfsConfiguration();
+        boolean foundHadoopConfiguration = false;
+
+        // We need to load both core-site.xml and hdfs-site.xml to determine 
the default fs path and
+        // the hdfs configuration.
+        // The properties of a newly added resource will override the ones in 
previous resources, so
+        // a configuration
+        // file with higher priority should be added later.
+
+        // Approach 1: HADOOP_HOME environment variables
+        String[] possibleHadoopConfPaths = new String[2];
+
+        final String hadoopHome = System.getenv("HADOOP_HOME");
+        if (hadoopHome != null) {
+            LOG.debug("Searching Hadoop configuration files in HADOOP_HOME: 
{}", hadoopHome);
+            possibleHadoopConfPaths[0] = hadoopHome + "/conf";
+            possibleHadoopConfPaths[1] = hadoopHome + "/etc/hadoop"; // hadoop 
2.2
+        }
+
+        for (String possibleHadoopConfPath : possibleHadoopConfPaths) {
+            if (possibleHadoopConfPath != null) {
+                foundHadoopConfiguration = addHadoopConfIfFound(result, 
possibleHadoopConfPath);
+            }
+        }
+
+        // Approach 2: HADOOP_CONF_DIR environment variable
+        String hadoopConfDir = System.getenv("HADOOP_CONF_DIR");
+        if (hadoopConfDir != null) {
+            LOG.debug("Searching Hadoop configuration files in 
HADOOP_CONF_DIR: {}", hadoopConfDir);
+            foundHadoopConfiguration =
+                    addHadoopConfIfFound(result, hadoopConfDir) || 
foundHadoopConfiguration;
+        }
+
+        // Approach 3: Fluss configuration
+        // add all configuration key with prefix 'iceberg.hadoop.' in fluss 
conf to hadoop conf
+        for (String key : flussConfiguration.keySet()) {
+            for (String prefix : FLUSS_CONFIG_PREFIXES) {
+                if (key.startsWith(prefix)) {
+                    String newKey = key.substring(prefix.length());
+                    String value =
+                            flussConfiguration.getString(
+                                    
ConfigBuilder.key(key).stringType().noDefaultValue(), null);
+                    result.set(newKey, value);
+                    LOG.debug(
+                            "Adding Fluss config entry for {} as {}={} to 
Hadoop config",
+                            key,
+                            newKey,
+                            value);
+                    foundHadoopConfiguration = true;
+                }
+            }
+        }
+
+        if (!foundHadoopConfiguration) {
+            LOG.warn(
+                    "Could not find Hadoop configuration via any of the 
supported methods "
+                            + "(Fluss configuration, environment variables).");
+        }
+
+        return result;
+    }
+
+    /**
+     * Search Hadoop configuration files in the given path, and add them to 
the configuration if
+     * found.
+     */
+    private static boolean addHadoopConfIfFound(
+            Configuration configuration, String possibleHadoopConfPath) {
+        boolean foundHadoopConfiguration = false;
+        if (new File(possibleHadoopConfPath).exists()) {
+            if (new File(possibleHadoopConfPath + "/core-site.xml").exists()) {
+                configuration.addResource(
+                        new org.apache.hadoop.fs.Path(possibleHadoopConfPath + 
"/core-site.xml"));
+                LOG.debug(
+                        "Adding {}/core-site.xml to hadoop configuration", 
possibleHadoopConfPath);
+                foundHadoopConfiguration = true;
+            }
+            if (new File(possibleHadoopConfPath + "/hdfs-site.xml").exists()) {
+                configuration.addResource(
+                        new org.apache.hadoop.fs.Path(possibleHadoopConfPath + 
"/hdfs-site.xml"));
+                LOG.debug(
+                        "Adding {}/hdfs-site.xml to hadoop configuration", 
possibleHadoopConfPath);
+                foundHadoopConfiguration = true;
+            }
+        }
+        return foundHadoopConfiguration;
+    }
+}
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/conf/IcebergConfiguration.java
 
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/conf/IcebergConfiguration.java
new file mode 100644
index 000000000..b2b277745
--- /dev/null
+++ 
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/conf/IcebergConfiguration.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.iceberg.conf;
+
+import com.alibaba.fluss.annotation.VisibleForTesting;
+import com.alibaba.fluss.config.Configuration;
+
+import org.apache.iceberg.common.DynClasses;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+/**
+ * Wraps the Hadoop configuration used to configure {@link 
org.apache.iceberg.catalog.Catalog} if
+ * Hadoop-related classes are available.
+ *
+ * <p>It doesn't declare the Hadoop configuration explicitly because some 
catalogs don't need a
+ * Hadoop configuration. For those catalogs, it won't throw a class-not-found 
exception; it sets
+ * the conf to null if no Hadoop dependencies are found, and null is fine for 
catalogs that don't
+ * require a Hadoop configuration.
+ *
+ * <p>For catalogs that do require a Hadoop configuration, a Hadoop-related 
class-not-found
+ * exception will be thrown, which guides users to add the Hadoop-related 
classes.
+ */
+public class IcebergConfiguration implements Serializable {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(IcebergConfiguration.class);
+
+    private transient Object conf;
+
+    @VisibleForTesting
+    protected IcebergConfiguration(Object conf) {
+        this.conf = conf;
+    }
+
+    public static IcebergConfiguration from(Configuration flussConfig) {
+        return new IcebergConfiguration(loadHadoopConfig(flussConfig));
+    }
+
+    private void writeObject(ObjectOutputStream out) throws IOException {
+        out.defaultWriteObject();
+        if (conf == null) {
+            out.writeBoolean(false);
+        } else {
+            out.writeBoolean(true);
+            HadoopConfSerde.writeObject(out, conf);
+        }
+    }
+
+    private void readObject(ObjectInputStream in) throws 
ClassNotFoundException, IOException {
+        in.defaultReadObject();
+        boolean configIsNotNull = in.readBoolean();
+        if (configIsNotNull) {
+            conf = HadoopConfSerde.readObject(in);
+        } else {
+            conf = null;
+        }
+    }
+
+    private static Object loadHadoopConfig(Configuration flussConfig) {
+        Class<?> configClass =
+                DynClasses.builder()
+                        .impl("org.apache.hadoop.hdfs.HdfsConfiguration")
+                        .impl("org.apache.hadoop.conf.Configuration")
+                        .orNull()
+                        .build();
+
+        if (configClass == null) {
+            LOG.info("Hadoop not found on classpath, not creating Hadoop 
config");
+            return null;
+        }
+
+        return HadoopUtils.getHadoopConfiguration(flussConfig);
+    }
+
+    public Object get() {
+        return conf;
+    }
+}
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergCatalogProvider.java
 
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergCatalogProvider.java
index 1c484743a..b27f94e02 100644
--- 
a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergCatalogProvider.java
+++ 
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergCatalogProvider.java
@@ -18,12 +18,14 @@
 package com.alibaba.fluss.lake.iceberg.tiering;
 
 import com.alibaba.fluss.config.Configuration;
+import com.alibaba.fluss.lake.iceberg.conf.IcebergConfiguration;
 
 import org.apache.iceberg.catalog.Catalog;
 
 import java.io.Serializable;
 import java.util.Map;
 
+import static 
com.alibaba.fluss.lake.iceberg.IcebergLakeCatalog.ICEBERG_CATALOG_DEFAULT_NAME;
 import static org.apache.iceberg.CatalogUtil.buildIcebergCatalog;
 
 /** A provider for Iceberg catalog. */
@@ -38,13 +40,9 @@ public class IcebergCatalogProvider implements Serializable {
 
     public Catalog get() {
         Map<String, String> icebergProps = icebergConfig.toMap();
-        String catalogName = icebergProps.getOrDefault("name", 
"fluss-iceberg-catalog");
+        String catalogName = icebergProps.getOrDefault("name", 
ICEBERG_CATALOG_DEFAULT_NAME);
 
         return buildIcebergCatalog(
-                catalogName,
-                icebergProps,
-                // todo: current is an empty configuration, need to init from 
env or fluss
-                // configurations
-                new org.apache.hadoop.conf.Configuration());
+                catalogName, icebergProps, 
IcebergConfiguration.from(icebergConfig).get());
     }
 }
diff --git a/fluss-lake/fluss-lake-iceberg/src/main/resources/META-INF/NOTICE 
b/fluss-lake/fluss-lake-iceberg/src/main/resources/META-INF/NOTICE
index 20b34f83d..edfd8ca2e 100644
--- a/fluss-lake/fluss-lake-iceberg/src/main/resources/META-INF/NOTICE
+++ b/fluss-lake/fluss-lake-iceberg/src/main/resources/META-INF/NOTICE
@@ -7,3 +7,9 @@ The Apache Software Foundation (http://www.apache.org/).
 This project bundles the following dependencies under the Apache Software 
License 2.0 (http://www.apache.org/licenses/LICENSE-2.0.txt)
 
 - org.apache.iceberg:iceberg-core:1.4.3
+- org.apache.iceberg:iceberg-api:1.4.3
+- org.apache.iceberg:iceberg-bundled-guava:1.4.3
+- org.apache.iceberg:iceberg-common:1.4.3
+- org.apache.iceberg:iceberg-data:1.4.3
+- org.apache.iceberg:iceberg-orc:1.4.3
+- org.apache.iceberg:iceberg-parquet:1.4.3
\ No newline at end of file
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/test/java/com/alibaba/fluss/lake/iceberg/conf/IcebergConfigurationTest.java
 
b/fluss-lake/fluss-lake-iceberg/src/test/java/com/alibaba/fluss/lake/iceberg/conf/IcebergConfigurationTest.java
new file mode 100644
index 000000000..749ae86ab
--- /dev/null
+++ 
b/fluss-lake/fluss-lake-iceberg/src/test/java/com/alibaba/fluss/lake/iceberg/conf/IcebergConfigurationTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.iceberg.conf;
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.jupiter.api.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** UT for {@link IcebergConfiguration}. */
+class IcebergConfigurationTest {
+
+    @Test
+    void testSerde() throws Exception {
+        // test when conf is null
+        IcebergConfiguration conf = new IcebergConfiguration(null);
+
+        byte[] data = serialize(conf);
+        IcebergConfiguration gotConf = deserialize(data);
+        assertThat(gotConf.get()).isNull();
+
+        // test when conf is not null
+        Configuration hadoopConf = new Configuration();
+        hadoopConf.set("k1", "v1");
+        hadoopConf.set("k2", "v2");
+        conf = new IcebergConfiguration(hadoopConf);
+        data = serialize(conf);
+        gotConf = deserialize(data);
+        Configuration gotHadoopConf = (Configuration) gotConf.get();
+        assertThat(gotHadoopConf.get("k1")).isEqualTo("v1");
+        assertThat(gotHadoopConf.get("k2")).isEqualTo("v2");
+    }
+
+    private byte[] serialize(IcebergConfiguration conf) throws IOException {
+        try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
+                ObjectOutputStream oos = new ObjectOutputStream(bos)) {
+            oos.writeObject(conf);
+            return bos.toByteArray();
+        }
+    }
+
+    private IcebergConfiguration deserialize(byte[] data) throws Exception {
+        try (ByteArrayInputStream bis = new ByteArrayInputStream(data);
+                ObjectInputStream ois = new ObjectInputStream(bis)) {
+            return (IcebergConfiguration) ois.readObject();
+        }
+    }
+}

Reply via email to