This is an automated email from the ASF dual-hosted git repository.

junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 617b8b35b0 [core] Introduce creation methods to Blob (#6320)
617b8b35b0 is described below

commit 617b8b35b0700bd5fedbc4a37d5917bd8183ca84
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Sep 24 21:53:45 2025 +0800

    [core] Introduce creation methods to Blob (#6320)
---
 .../java/org/apache/paimon/rest/HttpClient.java    |   7 +-
 .../org/apache/paimon/rest/HttpClientUtils.java    |  17 +++
 .../apache/paimon/data/AbstractBinaryWriter.java   |   2 +-
 .../src/main/java/org/apache/paimon/data/Blob.java |  31 +++++-
 .../main/java/org/apache/paimon/data/BlobData.java |   7 +-
 .../org/apache/paimon/data/BlobDescriptor.java     | 114 +++++++++++++++++++++
 .../main/java/org/apache/paimon/data/BlobRef.java  |  59 +++++++----
 .../paimon/data/serializer/BlobSerializer.java     |   2 +-
 ...Stream.java => LimitedSeekableInputStream.java} |  37 ++-----
 .../paimon/fs/OffsetSeekableInputStream.java       |  20 ++--
 .../java/org/apache/paimon/utils/UriReader.java    |  65 ++++++++++++
 .../org/apache/paimon/utils/UriReaderFactory.java  |  88 ++++++++++++++++
 .../java/org/apache/paimon/data/BinaryRowTest.java |   2 +-
 .../org/apache/paimon/data/BlobDescriptorTest.java |  83 +++++++++++++++
 .../test/java/org/apache/paimon/data/BlobTest.java |  82 +++++++++++++++
 .../paimon/fs/OffsetSeekableInputStreamTest.java   |  27 +++++
 .../apache/paimon/utils/UriReaderFactoryTest.java  |  80 +++++++++++++++
 .../org/apache/paimon/catalog/AbstractCatalog.java |  21 ++--
 .../org/apache/paimon/catalog/CatalogUtils.java    |   4 +-
 .../apache/paimon/catalog/FileSystemCatalog.java   |   9 +-
 .../paimon/catalog/FileSystemCatalogFactory.java   |   2 +-
 .../paimon/catalog/FileSystemCatalogLoader.java    |  11 +-
 .../java/org/apache/paimon/jdbc/JdbcCatalog.java   |  10 +-
 .../org/apache/paimon/jdbc/JdbcCatalogFactory.java |   2 +-
 .../org/apache/paimon/jdbc/JdbcCatalogLoader.java  |  13 +--
 .../java/org/apache/paimon/rest/RESTCatalog.java   |   6 +-
 .../apache/paimon/table/CatalogEnvironment.java    |  14 ++-
 .../paimon/catalog/FileSystemCatalogTest.java      |   5 +-
 .../java/org/apache/paimon/data/BlobHttpTest.java  |  41 ++++++--
 .../org/apache/paimon/jdbc/JdbcCatalogTest.java    |   6 +-
 .../apache/paimon/jdbc/PostgresqlCatalogTest.java  |   2 +-
 .../paimon/operation/PartitionExpireTest.java      |   2 +-
 .../org/apache/paimon/rest/RESTCatalogServer.java  |   6 +-
 .../cdc/mysql/TestAlterTableCatalogFactory.java    |   2 +-
 .../paimon/format/blob/BlobFormatReader.java       |  13 +--
 .../paimon/format/blob/BlobFileFormatTest.java     |   4 +-
 .../java/org/apache/paimon/hive/HiveCatalog.java   |  18 ++--
 .../org/apache/paimon/hive/HiveCatalogLoader.java  |  12 +--
 .../org/apache/paimon/hive/HiveCatalogTest.java    |   5 +-
 .../org/apache/paimon/hive/HiveTableStatsTest.java |   2 +-
 .../paimon/hive/pool/TestCachedClientPool.java     |   2 +-
 41 files changed, 789 insertions(+), 146 deletions(-)

diff --git a/paimon-api/src/main/java/org/apache/paimon/rest/HttpClient.java b/paimon-api/src/main/java/org/apache/paimon/rest/HttpClient.java
index e682a2aee5..b09d3b4528 100644
--- a/paimon-api/src/main/java/org/apache/paimon/rest/HttpClient.java
+++ b/paimon-api/src/main/java/org/apache/paimon/rest/HttpClient.java
@@ -32,7 +32,6 @@ import org.apache.hc.client5.http.classic.methods.HttpDelete;
 import org.apache.hc.client5.http.classic.methods.HttpGet;
 import org.apache.hc.client5.http.classic.methods.HttpPost;
 import org.apache.hc.client5.http.classic.methods.HttpUriRequestBase;
-import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
 import org.apache.hc.core5.http.ClassicHttpResponse;
 import org.apache.hc.core5.http.Header;
 import org.apache.hc.core5.http.io.entity.StringEntity;
@@ -43,13 +42,11 @@ import java.util.Collections;
 import java.util.Map;
 import java.util.function.Function;
 
-import static org.apache.paimon.rest.HttpClientUtils.createLoggingBuilder;
+import static org.apache.paimon.rest.HttpClientUtils.DEFAULT_HTTP_CLIENT;
 
 /** Apache HTTP client for REST catalog. */
 public class HttpClient implements RESTClient {
 
-    private static final CloseableHttpClient HTTP_CLIENT = 
createLoggingBuilder().build();
-
     private final String uri;
 
     private ErrorHandler errorHandler;
@@ -137,7 +134,7 @@ public class HttpClient implements RESTClient {
 
     private <T extends RESTResponse> T exec(HttpUriRequestBase request, 
Class<T> responseType) {
         try {
-            return HTTP_CLIENT.execute(
+            return DEFAULT_HTTP_CLIENT.execute(
                     request,
                     response -> {
                         String responseBodyStr = 
RESTUtil.extractResponseBodyAsString(response);
diff --git a/paimon-api/src/main/java/org/apache/paimon/rest/HttpClientUtils.java b/paimon-api/src/main/java/org/apache/paimon/rest/HttpClientUtils.java
index 28402eeadd..9447f24171 100644
--- a/paimon-api/src/main/java/org/apache/paimon/rest/HttpClientUtils.java
+++ b/paimon-api/src/main/java/org/apache/paimon/rest/HttpClientUtils.java
@@ -21,7 +21,10 @@ package org.apache.paimon.rest;
 import org.apache.paimon.rest.interceptor.LoggingInterceptor;
 import org.apache.paimon.rest.interceptor.TimingInterceptor;
 
+import org.apache.hc.client5.http.classic.methods.HttpGet;
 import org.apache.hc.client5.http.config.RequestConfig;
+import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
+import org.apache.hc.client5.http.impl.classic.CloseableHttpResponse;
 import org.apache.hc.client5.http.impl.classic.HttpClientBuilder;
 import org.apache.hc.client5.http.impl.classic.HttpClients;
 import 
org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManagerBuilder;
@@ -32,9 +35,14 @@ import org.apache.hc.core5.reactor.ssl.SSLBufferMode;
 import org.apache.hc.core5.ssl.SSLContexts;
 import org.apache.hc.core5.util.Timeout;
 
+import java.io.IOException;
+import java.io.InputStream;
+
 /** Utils for {@link HttpClientBuilder}. */
 public class HttpClientUtils {
 
+    public static final CloseableHttpClient DEFAULT_HTTP_CLIENT = 
createLoggingBuilder().build();
+
     public static HttpClientBuilder createLoggingBuilder() {
         HttpClientBuilder clientBuilder = createBuilder();
         clientBuilder
@@ -74,4 +82,39 @@ public class HttpClientUtils {
 
         return connectionManagerBuilder.build();
     }
+
+    /**
+     * Sends a GET request to {@code uri} and returns the response body as a stream.
+     *
+     * <p>The caller is responsible for closing the returned stream. If the request does not
+     * succeed, the response is closed here before the exception is thrown.
+     *
+     * @param uri the http(s) uri to read
+     * @return the content stream of the response entity
+     * @throws IOException if the request fails or the content cannot be opened
+     * @throws RuntimeException if the response status code is not 200
+     */
+    public static InputStream getAsInputStream(String uri) throws IOException {
+        HttpGet httpGet = new HttpGet(uri);
+        CloseableHttpResponse response = DEFAULT_HTTP_CLIENT.execute(httpGet);
+        boolean success = false;
+        try {
+            int code = response.getCode();
+            if (code != 200) {
+                throw new RuntimeException("HTTP error code: " + code);
+            }
+            InputStream content = response.getEntity().getContent();
+            success = true;
+            return content;
+        } finally {
+            if (!success) {
+                // Nothing is handed to the caller on failure, so release the response here.
+                try {
+                    response.close();
+                } catch (IOException ignored) {
+                    // Keep the original failure; a close error must not mask it.
+                }
+            }
+        }
+    }
 }
diff --git a/paimon-common/src/main/java/org/apache/paimon/data/AbstractBinaryWriter.java b/paimon-common/src/main/java/org/apache/paimon/data/AbstractBinaryWriter.java
index c03dd704b9..85d0445948 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/AbstractBinaryWriter.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/AbstractBinaryWriter.java
@@ -196,7 +196,7 @@ abstract class AbstractBinaryWriter implements BinaryWriter {
 
     @Override
     public void writeBlob(int pos, Blob blob) {
-        byte[] bytes = blob.toBytes();
+        byte[] bytes = blob.toData();
         writeBinary(pos, bytes, 0, bytes.length);
     }
 
diff --git a/paimon-common/src/main/java/org/apache/paimon/data/Blob.java b/paimon-common/src/main/java/org/apache/paimon/data/Blob.java
index a063297a9d..b7d49e01c1 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/Blob.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/Blob.java
@@ -19,7 +19,10 @@
 package org.apache.paimon.data;
 
 import org.apache.paimon.annotation.Public;
+import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.utils.UriReader;
 
 import java.io.IOException;
 
@@ -31,7 +34,33 @@ import java.io.IOException;
 @Public
 public interface Blob {
 
-    byte[] toBytes();
+    byte[] toData();
+
+    BlobDescriptor toDescriptor();
 
     SeekableInputStream newInputStream() throws IOException;
+
+    static Blob fromData(byte[] data) {
+        return new BlobData(data);
+    }
+
+    static Blob fromLocal(String file) {
+        return fromFile(LocalFileIO.create(), file);
+    }
+
+    static Blob fromHttp(String uri) {
+        return fromDescriptor(UriReader.fromHttp(), new BlobDescriptor(uri, 0, 
-1));
+    }
+
+    static Blob fromFile(FileIO fileIO, String file) {
+        return fromFile(fileIO, file, 0, -1);
+    }
+
+    static Blob fromFile(FileIO fileIO, String file, long offset, long length) 
{
+        return fromDescriptor(UriReader.fromFile(fileIO), new 
BlobDescriptor(file, offset, length));
+    }
+
+    static Blob fromDescriptor(UriReader reader, BlobDescriptor descriptor) {
+        return new BlobRef(reader, descriptor);
+    }
 }
diff --git a/paimon-common/src/main/java/org/apache/paimon/data/BlobData.java b/paimon-common/src/main/java/org/apache/paimon/data/BlobData.java
index fd9a32cd65..e5e5d53aad 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/BlobData.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/BlobData.java
@@ -44,10 +44,15 @@ public class BlobData implements Blob, Serializable {
     }
 
     @Override
-    public byte[] toBytes() {
+    public byte[] toData() {
         return data;
     }
 
+    @Override
+    public BlobDescriptor toDescriptor() {
+        throw new RuntimeException("Blob data can not convert to descriptor.");
+    }
+
     @Override
     public SeekableInputStream newInputStream() throws IOException {
         return new ByteArraySeekableStream(data);
diff --git a/paimon-common/src/main/java/org/apache/paimon/data/BlobDescriptor.java b/paimon-common/src/main/java/org/apache/paimon/data/BlobDescriptor.java
new file mode 100644
index 0000000000..52e9667cbf
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/data/BlobDescriptor.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.data;
+
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.charset.StandardCharsets;
+import java.util.Objects;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+/**
+ * Blob descriptor to describe a blob reference.
+ *
+ * <p>A descriptor consists of a uri, an offset and a length. The binary form handled by
+ * {@link #serialize()} and {@link #deserialize(byte[])} is little-endian and laid out as:
+ *
+ * <ol>
+ *   <li>4 bytes: number of bytes of the UTF-8 encoded uri
+ *   <li>the UTF-8 encoded uri
+ *   <li>8 bytes: offset
+ *   <li>8 bytes: length
+ * </ol>
+ */
+public class BlobDescriptor implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Size of the fixed-width fields: uri length (int), offset (long) and length (long). */
+    private static final int FIXED_SIZE = 4 + 8 + 8;
+
+    private final String uri;
+    private final long offset;
+    private final long length;
+
+    public BlobDescriptor(String uri, long offset, long length) {
+        this.uri = uri;
+        this.offset = offset;
+        this.length = length;
+    }
+
+    public String uri() {
+        return uri;
+    }
+
+    public long offset() {
+        return offset;
+    }
+
+    public long length() {
+        return length;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        BlobDescriptor that = (BlobDescriptor) o;
+        return offset == that.offset && length == that.length && Objects.equals(uri, that.uri);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(uri, offset, length);
+    }
+
+    @Override
+    public String toString() {
+        return "BlobDescriptor{"
+                + "uri='"
+                + uri
+                + '\''
+                + ", offset="
+                + offset
+                + ", length="
+                + length
+                + '}';
+    }
+
+    /**
+     * Serializes this descriptor into its little-endian binary form.
+     *
+     * @return a new array holding the uri length, the uri bytes, the offset and the length
+     */
+    public byte[] serialize() {
+        byte[] uriBytes = uri.getBytes(UTF_8);
+        int uriLength = uriBytes.length;
+
+        int totalSize = FIXED_SIZE + uriLength;
+        ByteBuffer buffer = ByteBuffer.allocate(totalSize);
+        buffer.order(ByteOrder.LITTLE_ENDIAN);
+
+        buffer.putInt(uriLength);
+        buffer.put(uriBytes);
+
+        buffer.putLong(offset);
+        buffer.putLong(length);
+
+        return buffer.array();
+    }
+
+    /**
+     * Deserializes a descriptor from the binary form produced by {@link #serialize()}.
+     *
+     * @param bytes the serialized descriptor
+     * @return the decoded descriptor
+     * @throws IllegalArgumentException if {@code bytes} is too short, or if the declared uri
+     *     length is negative or does not fit in {@code bytes}
+     */
+    public static BlobDescriptor deserialize(byte[] bytes) {
+        if (bytes.length < FIXED_SIZE) {
+            throw new IllegalArgumentException(
+                    "Invalid blob descriptor: expected at least "
+                            + FIXED_SIZE
+                            + " bytes but got "
+                            + bytes.length);
+        }
+
+        ByteBuffer buffer = ByteBuffer.wrap(bytes);
+        buffer.order(ByteOrder.LITTLE_ENDIAN);
+
+        int uriLength = buffer.getInt();
+        // Validate the declared length before allocating: corrupted input must fail fast
+        // rather than request a huge or negative-sized array.
+        if (uriLength < 0 || uriLength > bytes.length - FIXED_SIZE) {
+            throw new IllegalArgumentException(
+                    "Invalid blob descriptor: uri length "
+                            + uriLength
+                            + " does not fit in "
+                            + bytes.length
+                            + " bytes");
+        }
+        byte[] uriBytes = new byte[uriLength];
+        buffer.get(uriBytes);
+        String uri = new String(uriBytes, StandardCharsets.UTF_8);
+
+        long offset = buffer.getLong();
+        long length = buffer.getLong();
+
+        return new BlobDescriptor(uri, offset, length);
+    }
+}
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileRef.java b/paimon-common/src/main/java/org/apache/paimon/data/BlobRef.java
similarity index 52%
rename from paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileRef.java
rename to paimon-common/src/main/java/org/apache/paimon/data/BlobRef.java
index 85e98e3ab8..0248454ee9 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileRef.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/BlobRef.java
@@ -16,34 +16,35 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.format.blob;
+package org.apache.paimon.data;
 
-import org.apache.paimon.data.Blob;
-import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.annotation.Public;
 import org.apache.paimon.fs.OffsetSeekableInputStream;
-import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.SeekableInputStream;
 import org.apache.paimon.utils.IOUtils;
+import org.apache.paimon.utils.UriReader;
 
 import java.io.IOException;
+import java.util.Objects;
 
-/** BlobRef is a reference to a blob file. */
-public class BlobFileRef implements Blob {
+/**
+ * A {@link Blob} refers blob in {@link BlobDescriptor}.
+ *
+ * @since 1.4.0
+ */
+@Public
+public class BlobRef implements Blob {
 
-    private final FileIO fileIO;
-    private final Path filePath;
-    private final long blobLength;
-    private final long blobOffset;
+    private final UriReader uriReader;
+    private final BlobDescriptor descriptor;
 
-    public BlobFileRef(FileIO fileIO, Path filePath, long blobLength, long 
blobOffset) {
-        this.fileIO = fileIO;
-        this.filePath = filePath;
-        this.blobLength = blobLength;
-        this.blobOffset = blobOffset;
+    public BlobRef(UriReader uriReader, BlobDescriptor descriptor) {
+        this.uriReader = uriReader;
+        this.descriptor = descriptor;
     }
 
     @Override
-    public byte[] toBytes() {
+    public byte[] toData() {
         try {
             return IOUtils.readFully(newInputStream(), true);
         } catch (IOException e) {
@@ -51,10 +52,30 @@ public class BlobFileRef implements Blob {
         }
     }
 
+    @Override
+    public BlobDescriptor toDescriptor() {
+        return descriptor;
+    }
+
     @Override
     public SeekableInputStream newInputStream() throws IOException {
-        SeekableInputStream in = fileIO.newInputStream(filePath);
-        // TODO validate Magic number?
-        return new OffsetSeekableInputStream(in, blobOffset + 4, blobLength - 
16);
+        return new OffsetSeekableInputStream(
+                uriReader.newInputStream(descriptor.uri()),
+                descriptor.offset(),
+                descriptor.length());
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        BlobRef blobRef = (BlobRef) o;
+        return Objects.deepEquals(descriptor, blobRef.descriptor);
+    }
+
+    @Override
+    public int hashCode() {
+        return descriptor.hashCode();
     }
 }
diff --git a/paimon-common/src/main/java/org/apache/paimon/data/serializer/BlobSerializer.java b/paimon-common/src/main/java/org/apache/paimon/data/serializer/BlobSerializer.java
index efc3bd2146..28f9233a37 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/serializer/BlobSerializer.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/serializer/BlobSerializer.java
@@ -39,7 +39,7 @@ public class BlobSerializer extends SerializerSingleton<Blob> {
 
     @Override
     public void serialize(Blob blob, DataOutputView target) throws IOException 
{
-        BinarySerializer.INSTANCE.serialize(blob.toBytes(), target);
+        BinarySerializer.INSTANCE.serialize(blob.toData(), target);
     }
 
     @Override
diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/OffsetSeekableInputStream.java b/paimon-common/src/main/java/org/apache/paimon/fs/LimitedSeekableInputStream.java
similarity index 57%
copy from paimon-common/src/main/java/org/apache/paimon/fs/OffsetSeekableInputStream.java
copy to paimon-common/src/main/java/org/apache/paimon/fs/LimitedSeekableInputStream.java
index c9fad5b457..66f9e89260 100644
--- a/paimon-common/src/main/java/org/apache/paimon/fs/OffsetSeekableInputStream.java
+++ b/paimon-common/src/main/java/org/apache/paimon/fs/LimitedSeekableInputStream.java
@@ -19,54 +19,39 @@
 package org.apache.paimon.fs;
 
 import java.io.IOException;
+import java.io.InputStream;
 
-/**
- * A {@link SeekableInputStream} wrapping another {@link SeekableInputStream} 
with offset and
- * length.
- */
-public class OffsetSeekableInputStream extends SeekableInputStream {
+/** A {@link SeekableInputStream} wrapped {@link InputStream} without seek. */
+public class LimitedSeekableInputStream extends SeekableInputStream {
 
-    private final SeekableInputStream wrapped;
-    private final long offset;
-    private final long length;
+    private final InputStream in;
 
-    public OffsetSeekableInputStream(SeekableInputStream wrapped, long offset, 
long length)
-            throws IOException {
-        this.wrapped = wrapped;
-        this.offset = offset;
-        this.length = length;
-        wrapped.seek(offset);
+    public LimitedSeekableInputStream(InputStream in) {
+        this.in = in;
     }
 
     @Override
     public void seek(long desired) throws IOException {
-        wrapped.seek(offset + desired);
+        throw new UnsupportedOperationException();
     }
 
     @Override
     public long getPos() throws IOException {
-        return wrapped.getPos() - offset;
+        throw new UnsupportedOperationException();
     }
 
     @Override
     public int read() throws IOException {
-        if (getPos() >= length) {
-            return -1;
-        }
-        return wrapped.read();
+        return in.read();
     }
 
     @Override
     public int read(byte[] b, int off, int len) throws IOException {
-        long realLen = Math.min(len, length - getPos());
-        if (realLen == 0) {
-            return -1;
-        }
-        return wrapped.read(b, off, (int) realLen);
+        return in.read(b, off, len);
     }
 
     @Override
     public void close() throws IOException {
-        wrapped.close();
+        in.close();
     }
 }
diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/OffsetSeekableInputStream.java b/paimon-common/src/main/java/org/apache/paimon/fs/OffsetSeekableInputStream.java
index c9fad5b457..94e2cf33e2 100644
--- a/paimon-common/src/main/java/org/apache/paimon/fs/OffsetSeekableInputStream.java
+++ b/paimon-common/src/main/java/org/apache/paimon/fs/OffsetSeekableInputStream.java
@@ -35,7 +35,9 @@ public class OffsetSeekableInputStream extends SeekableInputStream {
         this.wrapped = wrapped;
         this.offset = offset;
         this.length = length;
-        wrapped.seek(offset);
+        if (offset != 0) {
+            wrapped.seek(offset);
+        }
     }
 
     @Override
@@ -50,19 +52,26 @@ public class OffsetSeekableInputStream extends SeekableInputStream {
 
     @Override
     public int read() throws IOException {
-        if (getPos() >= length) {
-            return -1;
+        if (length != -1) {
+            if (getPos() >= length) {
+                return -1;
+            }
         }
         return wrapped.read();
     }
 
     @Override
     public int read(byte[] b, int off, int len) throws IOException {
-        long realLen = Math.min(len, length - getPos());
-        if (realLen == 0) {
-            return -1;
+        if (length != -1) {
+            // Bytes left in the window; <= 0 once the position is at or past its end.
+            long remaining = length - getPos();
+            if (remaining <= 0) {
+                return -1;
+            }
+            // Never read beyond the window, even if the caller asks for more.
+            len = (int) Math.min(len, remaining);
         }
-        return wrapped.read(b, off, (int) realLen);
+        return wrapped.read(b, off, len);
     }
 
     @Override
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/UriReader.java b/paimon-common/src/main/java/org/apache/paimon/utils/UriReader.java
new file mode 100644
index 0000000000..5640a4c893
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/UriReader.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.LimitedSeekableInputStream;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.rest.HttpClientUtils;
+
+import java.io.IOException;
+
+/** An interface to read uri as a stream. */
+public interface UriReader {
+
+    SeekableInputStream newInputStream(String uri) throws IOException;
+
+    static UriReader fromFile(FileIO fileIO) {
+        return new FileUriReader(fileIO);
+    }
+
+    static UriReader fromHttp() {
+        return new HttpUriReader();
+    }
+
+    /** A {@link UriReader} uses {@link FileIO} to read file. */
+    class FileUriReader implements UriReader {
+
+        private final FileIO fileIO;
+
+        public FileUriReader(FileIO fileIO) {
+            this.fileIO = fileIO;
+        }
+
+        @Override
+        public SeekableInputStream newInputStream(String uri) throws 
IOException {
+            return fileIO.newInputStream(new Path(uri));
+        }
+    }
+
+    /** A {@link UriReader} reads http uri. */
+    class HttpUriReader implements UriReader {
+
+        @Override
+        public SeekableInputStream newInputStream(String uri) throws 
IOException {
+            return new 
LimitedSeekableInputStream(HttpClientUtils.getAsInputStream(uri));
+        }
+    }
+}
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/UriReaderFactory.java b/paimon-common/src/main/java/org/apache/paimon/utils/UriReaderFactory.java
new file mode 100644
index 0000000000..92e3b8e6a8
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/UriReaderFactory.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+
+/** A factory to create and cache {@link UriReader}. */
+public class UriReaderFactory {
+
+    private final CatalogContext context;
+    private final Map<UriKey, UriReader> readers;
+
+    public UriReaderFactory(CatalogContext context) {
+        this.context = context;
+        this.readers = new ConcurrentHashMap<>();
+    }
+
+    public UriReader create(String input) {
+        URI uri = URI.create(input);
+        UriKey key = new UriKey(uri.getScheme(), uri.getAuthority());
+        return readers.computeIfAbsent(key, k -> newReader(k, uri));
+    }
+
+    private UriReader newReader(UriKey key, URI uri) {
+        if ("http".equals(key.scheme) || "https".equals(key.scheme)) {
+            return UriReader.fromHttp();
+        }
+
+        try {
+            FileIO fileIO = FileIO.get(new Path(uri), context);
+            return UriReader.fromFile(fileIO);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static final class UriKey {
+
+        private final @Nullable String scheme;
+        private final @Nullable String authority;
+
+        public UriKey(@Nullable String scheme, @Nullable String authority) {
+            this.scheme = scheme;
+            this.authority = authority;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            UriKey uriKey = (UriKey) o;
+            return Objects.equals(scheme, uriKey.scheme)
+                    && Objects.equals(authority, uriKey.authority);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(scheme, authority);
+        }
+    }
+}
diff --git a/paimon-common/src/test/java/org/apache/paimon/data/BinaryRowTest.java b/paimon-common/src/test/java/org/apache/paimon/data/BinaryRowTest.java
index c7fae18163..04181eb2a8 100644
--- a/paimon-common/src/test/java/org/apache/paimon/data/BinaryRowTest.java
+++ b/paimon-common/src/test/java/org/apache/paimon/data/BinaryRowTest.java
@@ -974,7 +974,7 @@ public class BinaryRowTest {
 
         Blob blob0 = row.getBlob(0);
         assertThat(blob0).isInstanceOf(BlobData.class);
-        assertThat(blob0.toBytes()).isEqualTo(new byte[] {1, 3, 1});
+        assertThat(blob0.toData()).isEqualTo(new byte[] {1, 3, 1});
         assertThat(row.isNullAt(1)).isTrue();
     }
 }
diff --git a/paimon-common/src/test/java/org/apache/paimon/data/BlobDescriptorTest.java b/paimon-common/src/test/java/org/apache/paimon/data/BlobDescriptorTest.java
new file mode 100644
index 0000000000..5cd3a06f55
--- /dev/null
+++ b/paimon-common/src/test/java/org/apache/paimon/data/BlobDescriptorTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.data;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link BlobDescriptor}. */
+public class BlobDescriptorTest {
+
+    @Test
+    public void testEquals() {
+        String uri1 = "/test/path1";
+        String uri2 = "/test/path2";
+
+        BlobDescriptor descriptor1 = new BlobDescriptor(uri1, 100L, 200L);
+        BlobDescriptor descriptor2 = new BlobDescriptor(uri1, 100L, 200L);
+        BlobDescriptor descriptor3 = new BlobDescriptor(uri2, 100L, 200L);
+        BlobDescriptor descriptor4 = new BlobDescriptor(uri1, 150L, 200L);
+        BlobDescriptor descriptor5 = new BlobDescriptor(uri1, 100L, 250L);
+
+        assertThat(descriptor1).isEqualTo(descriptor2);
+        assertThat(descriptor1).isNotEqualTo(descriptor3);
+        assertThat(descriptor1).isNotEqualTo(descriptor4);
+        assertThat(descriptor1).isNotEqualTo(descriptor5);
+        assertThat(descriptor1).isNotEqualTo(null);
+        assertThat(descriptor1).isNotEqualTo(new Object());
+    }
+
+    @Test
+    public void testHashCode() {
+        String uri = "/test/path";
+
+        BlobDescriptor descriptor1 = new BlobDescriptor(uri, 100L, 200L);
+        BlobDescriptor descriptor2 = new BlobDescriptor(uri, 100L, 200L);
+
+        assertThat(descriptor1.hashCode()).isEqualTo(descriptor2.hashCode());
+    }
+
+    @Test
+    public void testToString() {
+        String uri = "/test/path";
+        BlobDescriptor descriptor = new BlobDescriptor(uri, 100L, 200L);
+
+        String toString = descriptor.toString();
+        assertThat(toString).contains("uri='/test/path'");
+        assertThat(toString).contains("offset=100");
+        assertThat(toString).contains("length=200");
+    }
+
+    @Test
+    public void testSerializeAndDeserialize() {
+        String uri = "/test/path";
+        long offset = 100L;
+        long length = 200L;
+
+        BlobDescriptor original = new BlobDescriptor(uri, offset, length);
+        byte[] serialized = original.serialize();
+        BlobDescriptor deserialized = BlobDescriptor.deserialize(serialized);
+
+        assertThat(deserialized.uri()).isEqualTo(original.uri());
+        assertThat(deserialized.offset()).isEqualTo(original.offset());
+        assertThat(deserialized.length()).isEqualTo(original.length());
+        assertThat(deserialized).isEqualTo(original);
+    }
+}
diff --git a/paimon-common/src/test/java/org/apache/paimon/data/BlobTest.java b/paimon-common/src/test/java/org/apache/paimon/data/BlobTest.java
new file mode 100644
index 0000000000..27703662da
--- /dev/null
+++ b/paimon-common/src/test/java/org/apache/paimon/data/BlobTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.data;
+
+import org.apache.paimon.fs.local.LocalFileIO;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link Blob}. */
+public class BlobTest {
+
+    @TempDir java.nio.file.Path tempPath;
+
+    private String file;
+
+    @BeforeEach
+    void beforeEach() throws IOException {
+        file = new File(tempPath.toString(), "test.txt").getAbsolutePath();
+        Files.write(Paths.get(file), "test data".getBytes());
+    }
+
+    @Test
+    public void testFromData() {
+        byte[] testData = "test data".getBytes();
+        Blob blob = Blob.fromData(testData);
+        assertThat(blob).isInstanceOf(BlobData.class);
+        assertThat(blob.toData()).isEqualTo(testData);
+    }
+
+    @Test
+    public void testFromLocal() {
+        Blob blob = Blob.fromLocal(file);
+        assertThat(blob).isInstanceOf(BlobRef.class);
+        assertThat(blob.toData()).isEqualTo("test data".getBytes());
+    }
+
+    @Test
+    public void testFromFile() {
+        Blob blob = Blob.fromFile(LocalFileIO.create(), file, 0, 4);
+        assertThat(blob).isInstanceOf(BlobRef.class);
+        assertThat(blob.toData()).isEqualTo("test".getBytes());
+    }
+
+    @Test
+    public void testFromPath() {
+        Blob blob = Blob.fromFile(LocalFileIO.create(), file);
+        assertThat(blob).isInstanceOf(BlobRef.class);
+        assertThat(blob.toData()).isEqualTo("test data".getBytes());
+    }
+
+    @Test
+    public void testFromHttp() {
+        String uri = "http://example.com/file.txt";
+        Blob blob = Blob.fromHttp(uri);
+        assertThat(blob).isInstanceOf(BlobRef.class);
+    }
+}
diff --git 
a/paimon-common/src/test/java/org/apache/paimon/fs/OffsetSeekableInputStreamTest.java
 
b/paimon-common/src/test/java/org/apache/paimon/fs/OffsetSeekableInputStreamTest.java
index fa8a14ced5..94f7c47fa0 100644
--- 
a/paimon-common/src/test/java/org/apache/paimon/fs/OffsetSeekableInputStreamTest.java
+++ 
b/paimon-common/src/test/java/org/apache/paimon/fs/OffsetSeekableInputStreamTest.java
@@ -25,6 +25,9 @@ import java.io.IOException;
 import java.util.Arrays;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 
 /** Tests for {@link OffsetSeekableInputStream}. */
 public class OffsetSeekableInputStreamTest {
@@ -146,4 +149,28 @@ public class OffsetSeekableInputStreamTest {
             assertThat(bytesRead).isEqualTo(-1);
         }
     }
+
+    @Test
+    public void testClose() throws IOException {
+        SeekableInputStream mockStream = mock(SeekableInputStream.class);
+        OffsetSeekableInputStream offsetStream = new OffsetSeekableInputStream(mockStream, 0, 10);
+        offsetStream.close();
+        verify(mockStream, times(1)).close();
+    }
+
+    @Test
+    public void testReadWithUnlimitedLength() throws IOException {
+        long offset = 5;
+        long length = -1; // Unlimited length
+        try (OffsetSeekableInputStream stream =
+                new OffsetSeekableInputStream(wrapped, offset, length)) {
+            // Should be able to read beyond the original testData length
+            byte[] buffer = new byte[10];
+            int bytesRead = stream.read(buffer, 0, 10);
+
+            assertThat(bytesRead).isEqualTo(10);
+            assertThat(buffer).containsExactly(Arrays.copyOfRange(testData, 5, 15));
+            assertThat(stream.getPos()).isEqualTo(10);
+        }
+    }
 }
diff --git 
a/paimon-common/src/test/java/org/apache/paimon/utils/UriReaderFactoryTest.java 
b/paimon-common/src/test/java/org/apache/paimon/utils/UriReaderFactoryTest.java
new file mode 100644
index 0000000000..a3b6db3d75
--- /dev/null
+++ 
b/paimon-common/src/test/java/org/apache/paimon/utils/UriReaderFactoryTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.utils.UriReader.FileUriReader;
+import org.apache.paimon.utils.UriReader.HttpUriReader;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link UriReaderFactory}. */
+public class UriReaderFactoryTest {
+
+    private final UriReaderFactory factory =
+            new UriReaderFactory(CatalogContext.create(new Options()));
+
+    @Test
+    public void testCreateHttpUriReader() {
+        UriReader reader = factory.create("http://example.com/file.txt");
+        assertThat(reader).isInstanceOf(HttpUriReader.class);
+    }
+
+    @Test
+    public void testCreateHttpsUriReader() {
+        UriReader reader = factory.create("https://example.com/file.txt");
+        assertThat(reader).isInstanceOf(HttpUriReader.class);
+    }
+
+    @Test
+    public void testCreateFileUriReader() {
+        UriReader reader = factory.create("file:///path/to/file.txt");
+        assertThat(reader).isInstanceOf(FileUriReader.class);
+    }
+
+    @Test
+    public void testCreateUriReaderWithAuthority() {
+        UriReader reader1 = factory.create("http://my_bucket1/path/to/file.txt");
+        UriReader reader2 = factory.create("http://my_bucket2/path/to/file.txt");
+        assertThat(reader1).isNotEqualTo(reader2);
+    }
+
+    @Test
+    public void testCachedReadersWithSameSchemeAndAuthority() {
+        UriReader reader1 = factory.create("http://my_bucket/path/to/file1.txt");
+        UriReader reader2 = factory.create("http://my_bucket/path/to/file2.txt");
+        assertThat(reader1).isSameAs(reader2);
+    }
+
+    @Test
+    public void testCachedReadersWithNullAuthority() {
+        UriReader reader1 = factory.create("file:///path/to/file1.txt");
+        UriReader reader2 = factory.create("file:///path/to/file2.txt");
+        assertThat(reader1).isSameAs(reader2);
+    }
+
+    @Test
+    public void testCreateUriReaderWithLocalPath() {
+        UriReader reader = factory.create("/local/path/to/file.txt");
+        assertThat(reader).isInstanceOf(FileUriReader.class);
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index 5df78212f1..03e47194d0 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -80,23 +80,23 @@ public abstract class AbstractCatalog implements Catalog {
 
     protected final FileIO fileIO;
     protected final Map<String, String> tableDefaultOptions;
-    protected final Options catalogOptions;
+    protected final CatalogContext context;
 
     protected AbstractCatalog(FileIO fileIO) {
         this.fileIO = fileIO;
         this.tableDefaultOptions = new HashMap<>();
-        this.catalogOptions = new Options();
+        this.context = CatalogContext.create(new Options());
     }
 
-    protected AbstractCatalog(FileIO fileIO, Options options) {
+    protected AbstractCatalog(FileIO fileIO, CatalogContext context) {
         this.fileIO = fileIO;
-        this.tableDefaultOptions = 
CatalogUtils.tableDefaultOptions(options.toMap());
-        this.catalogOptions = options;
+        this.tableDefaultOptions = 
CatalogUtils.tableDefaultOptions(context.options().toMap());
+        this.context = context;
     }
 
     @Override
     public Map<String, String> options() {
-        return catalogOptions.toMap();
+        return context.options().toMap();
     }
 
     public abstract String warehouse();
@@ -114,7 +114,7 @@ public abstract class AbstractCatalog implements Catalog {
             return Optional.empty();
         }
 
-        String lock = catalogOptions.get(LOCK_TYPE);
+        String lock = context.options().get(LOCK_TYPE);
         if (lock == null) {
             return defaultLockFactory();
         }
@@ -129,11 +129,11 @@ public abstract class AbstractCatalog implements Catalog {
     }
 
     public Optional<CatalogLockContext> lockContext() {
-        return Optional.of(CatalogLockContext.fromOptions(catalogOptions));
+        return Optional.of(CatalogLockContext.fromOptions(context.options()));
     }
 
     protected boolean lockEnabled() {
-        return 
catalogOptions.getOptional(LOCK_ENABLED).orElse(fileIO.isObjectStore());
+        return 
context.options().getOptional(LOCK_ENABLED).orElse(fileIO.isObjectStore());
     }
 
     protected boolean allowCustomTablePath() {
@@ -474,7 +474,8 @@ public abstract class AbstractCatalog implements Catalog {
                 this::fileIO,
                 this::loadTableMetadata,
                 lockFactory().orElse(null),
-                lockContext().orElse(null));
+                lockContext().orElse(null),
+                context);
     }
 
     @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
index 8be164b1b1..d26642e046 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
@@ -214,7 +214,8 @@ public class CatalogUtils {
             Function<Path, FileIO> externalFileIO,
             TableMetadata.Loader metadataLoader,
             @Nullable CatalogLockFactory lockFactory,
-            @Nullable CatalogLockContext lockContext)
+            @Nullable CatalogLockContext lockContext,
+            @Nullable CatalogContext catalogContext)
             throws Catalog.TableNotExistException {
         if (SYSTEM_DATABASE_NAME.equals(identifier.getDatabaseName())) {
             return 
CatalogUtils.createGlobalSystemTable(identifier.getTableName(), catalog);
@@ -258,6 +259,7 @@ public class CatalogUtils {
                         catalog.catalogLoader(),
                         lockFactory,
                         lockContext,
+                        catalogContext,
                         catalog.supportsVersionManagement());
         Path path = new Path(schema.options().get(PATH.key()));
         FileStoreTable table =
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
index da56253335..cea266f703 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
@@ -21,7 +21,6 @@ package org.apache.paimon.catalog;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.operation.Lock;
-import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.schema.SchemaManager;
@@ -49,8 +48,8 @@ public class FileSystemCatalog extends AbstractCatalog {
         this.warehouse = warehouse;
     }
 
-    public FileSystemCatalog(FileIO fileIO, Path warehouse, Options options) {
-        super(fileIO, options);
+    public FileSystemCatalog(FileIO fileIO, Path warehouse, CatalogContext 
context) {
+        super(fileIO, context);
         this.warehouse = warehouse;
     }
 
@@ -198,11 +197,11 @@ public class FileSystemCatalog extends AbstractCatalog {
 
     @Override
     public CatalogLoader catalogLoader() {
-        return new FileSystemCatalogLoader(fileIO, warehouse, catalogOptions);
+        return new FileSystemCatalogLoader(fileIO, warehouse, context);
     }
 
     @Override
     public boolean caseSensitive() {
-        return catalogOptions.getOptional(CASE_SENSITIVE).orElse(true);
+        return context.options().getOptional(CASE_SENSITIVE).orElse(true);
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalogFactory.java
 
b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalogFactory.java
index 353361b298..d17844baea 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalogFactory.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalogFactory.java
@@ -41,6 +41,6 @@ public class FileSystemCatalogFactory implements 
CatalogFactory {
                     "Only managed table is supported in File system catalog.");
         }
 
-        return new FileSystemCatalog(fileIO, warehouse, context.options());
+        return new FileSystemCatalog(fileIO, warehouse, context);
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalogLoader.java
 
b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalogLoader.java
index 0fb5da6c4d..6a0cead0be 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalogLoader.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalogLoader.java
@@ -20,25 +20,24 @@ package org.apache.paimon.catalog;
 
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
-import org.apache.paimon.options.Options;
 
 /** Loader to create {@link FileSystemCatalog}. */
 public class FileSystemCatalogLoader implements CatalogLoader {
 
-    private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 2L;
 
     private final FileIO fileIO;
     private final Path warehouse;
-    private final Options options;
+    private final CatalogContext context;
 
-    public FileSystemCatalogLoader(FileIO fileIO, Path warehouse, Options 
options) {
+    public FileSystemCatalogLoader(FileIO fileIO, Path warehouse, 
CatalogContext context) {
         this.fileIO = fileIO;
         this.warehouse = warehouse;
-        this.options = options;
+        this.context = context;
     }
 
     @Override
     public Catalog load() {
-        return new FileSystemCatalog(fileIO, warehouse, options);
+        return new FileSystemCatalog(fileIO, warehouse, context);
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
index 22a9939d20..0891bad2bd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
@@ -20,6 +20,7 @@ package org.apache.paimon.jdbc;
 
 import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.catalog.AbstractCatalog;
+import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.catalog.CatalogLoader;
 import org.apache.paimon.catalog.CatalogLockContext;
 import org.apache.paimon.catalog.CatalogLockFactory;
@@ -85,10 +86,11 @@ public class JdbcCatalog extends AbstractCatalog {
     private final Options options;
     private final String warehouse;
 
-    protected JdbcCatalog(FileIO fileIO, String catalogKey, Options options, 
String warehouse) {
-        super(fileIO, options);
+    protected JdbcCatalog(
+            FileIO fileIO, String catalogKey, CatalogContext context, String 
warehouse) {
+        super(fileIO, context);
         this.catalogKey = catalogKey;
-        this.options = options;
+        this.options = context.options();
         this.warehouse = warehouse;
         Preconditions.checkNotNull(options, "Invalid catalog properties: 
null");
         this.connections =
@@ -153,7 +155,7 @@ public class JdbcCatalog extends AbstractCatalog {
 
     @Override
     public CatalogLoader catalogLoader() {
-        return new JdbcCatalogLoader(fileIO, catalogKey, options, warehouse);
+        return new JdbcCatalogLoader(fileIO, catalogKey, context, warehouse);
     }
 
     @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogFactory.java 
b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogFactory.java
index 6c3c1d0e41..b2998e5578 100644
--- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogFactory.java
@@ -39,6 +39,6 @@ public class JdbcCatalogFactory implements CatalogFactory {
     public Catalog create(FileIO fileIO, Path warehouse, CatalogContext 
context) {
         Options options = context.options();
         String catalogKey = options.get(JdbcCatalogOptions.CATALOG_KEY);
-        return new JdbcCatalog(fileIO, catalogKey, context.options(), 
warehouse.toString());
+        return new JdbcCatalog(fileIO, catalogKey, context, 
warehouse.toString());
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLoader.java 
b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLoader.java
index a1fa36e4b2..492d1a84b3 100644
--- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLoader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLoader.java
@@ -19,29 +19,30 @@
 package org.apache.paimon.jdbc;
 
 import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.catalog.CatalogLoader;
 import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.options.Options;
 
 /** Loader to create {@link JdbcCatalog}. */
 public class JdbcCatalogLoader implements CatalogLoader {
 
-    private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 2L;
 
     private final FileIO fileIO;
     private final String catalogKey;
-    private final Options options;
+    private final CatalogContext context;
     private final String warehouse;
 
-    public JdbcCatalogLoader(FileIO fileIO, String catalogKey, Options 
options, String warehouse) {
+    public JdbcCatalogLoader(
+            FileIO fileIO, String catalogKey, CatalogContext context, String 
warehouse) {
         this.fileIO = fileIO;
         this.catalogKey = catalogKey;
-        this.options = options;
+        this.context = context;
         this.warehouse = warehouse;
     }
 
     @Override
     public Catalog load() {
-        return new JdbcCatalog(fileIO, catalogKey, options, warehouse);
+        return new JdbcCatalog(fileIO, catalogKey, context, warehouse);
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
index 9179e71aa9..58f85a48cc 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
@@ -278,7 +278,8 @@ public class RESTCatalog implements Catalog {
                 this::fileIOFromOptions,
                 this::loadTableMetadata,
                 null,
-                null);
+                null,
+                context);
     }
 
     @Override
@@ -426,7 +427,8 @@ public class RESTCatalog implements Catalog {
                     this::fileIOFromOptions,
                     i -> toTableMetadata(db, response),
                     null,
-                    null);
+                    null,
+                    context);
         } catch (TableNotExistException e) {
             throw new RuntimeException(e);
         }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java 
b/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java
index fbc1b894a4..0f61f8fa09 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java
@@ -20,6 +20,7 @@ package org.apache.paimon.table;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.catalog.CatalogLoader;
 import org.apache.paimon.catalog.CatalogLockContext;
 import org.apache.paimon.catalog.CatalogLockFactory;
@@ -42,13 +43,14 @@ import java.util.Optional;
 /** Catalog environment in table which contains log factory, metastore client 
factory. */
 public class CatalogEnvironment implements Serializable {
 
-    private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 2L;
 
     @Nullable private final Identifier identifier;
     @Nullable private final String uuid;
     @Nullable private final CatalogLoader catalogLoader;
     @Nullable private final CatalogLockFactory lockFactory;
     @Nullable private final CatalogLockContext lockContext;
+    @Nullable private final CatalogContext catalogContext;
     private final boolean supportsVersionManagement;
 
     public CatalogEnvironment(
@@ -57,17 +59,19 @@ public class CatalogEnvironment implements Serializable {
             @Nullable CatalogLoader catalogLoader,
             @Nullable CatalogLockFactory lockFactory,
             @Nullable CatalogLockContext lockContext,
+            @Nullable CatalogContext catalogContext,
             boolean supportsVersionManagement) {
         this.identifier = identifier;
         this.uuid = uuid;
         this.catalogLoader = catalogLoader;
         this.lockFactory = lockFactory;
         this.lockContext = lockContext;
+        this.catalogContext = catalogContext;
         this.supportsVersionManagement = supportsVersionManagement;
     }
 
     public static CatalogEnvironment empty() {
-        return new CatalogEnvironment(null, null, null, null, null, false);
+        return new CatalogEnvironment(null, null, null, null, null, null, 
false);
     }
 
     @Nullable
@@ -132,6 +136,11 @@ public class CatalogEnvironment implements Serializable {
         return catalogLoader;
     }
 
+    @Nullable
+    public CatalogContext catalogContext() {
+        return catalogContext;
+    }
+
     public CatalogEnvironment copy(Identifier identifier) {
         return new CatalogEnvironment(
                 identifier,
@@ -139,6 +148,7 @@ public class CatalogEnvironment implements Serializable {
                 catalogLoader,
                 lockFactory,
                 lockContext,
+                catalogContext,
                 supportsVersionManagement);
     }
 
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/catalog/FileSystemCatalogTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/catalog/FileSystemCatalogTest.java
index dcd27a91ed..04f67c018d 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/catalog/FileSystemCatalogTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/catalog/FileSystemCatalogTest.java
@@ -36,8 +36,9 @@ public class FileSystemCatalogTest extends CatalogTestBase {
     @BeforeEach
     public void setUp() throws Exception {
         super.setUp();
-        Options catalogOptions = new Options();
-        catalog = new FileSystemCatalog(fileIO, new Path(warehouse), 
catalogOptions);
+        catalog =
+                new FileSystemCatalog(
+                        fileIO, new Path(warehouse), CatalogContext.create(new 
Options()));
     }
 
     @Test
diff --git a/paimon-common/src/main/java/org/apache/paimon/data/Blob.java 
b/paimon-core/src/test/java/org/apache/paimon/data/BlobHttpTest.java
similarity index 50%
copy from paimon-common/src/main/java/org/apache/paimon/data/Blob.java
copy to paimon-core/src/test/java/org/apache/paimon/data/BlobHttpTest.java
index a063297a9d..cc03cd1fed 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/Blob.java
+++ b/paimon-core/src/test/java/org/apache/paimon/data/BlobHttpTest.java
@@ -18,20 +18,39 @@
 
 package org.apache.paimon.data;
 
-import org.apache.paimon.annotation.Public;
-import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.rest.TestHttpWebServer;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
 
 import java.io.IOException;
 
-/**
- * Blob interface, provides bytes and input stream methods.
- *
- * @since 1.4.0
- */
-@Public
-public interface Blob {
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link Blob} http. */
+public class BlobHttpTest {
+
+    private TestHttpWebServer server;
+
+    @Before
+    public void setUp() throws Exception {
+        server = new TestHttpWebServer("/my_path");
+        server.start();
+    }
 
-    byte[] toBytes();
+    @After
+    public void tearDown() throws IOException {
+        server.stop();
+    }
 
-    SeekableInputStream newInputStream() throws IOException;
+    @Test
+    public void testHttp() {
+        String data = "my_data";
+        server.enqueueResponse(data, 200);
+        Blob blob = Blob.fromHttp(server.getBaseUrl());
+        assertThat(blob).isInstanceOf(BlobRef.class);
+        assertThat(new String(blob.toData(), UTF_8)).isEqualTo(data);
+    }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java 
b/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java
index 419c8576e1..9e106ad966 100644
--- a/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.jdbc;
 
 import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.catalog.CatalogTestBase;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.options.CatalogOptions;
@@ -63,7 +64,10 @@ public class JdbcCatalogTest extends CatalogTestBase {
         properties.putAll(props);
         JdbcCatalog catalog =
                 new JdbcCatalog(
-                        fileIO, "test-jdbc-catalog", 
Options.fromMap(properties), warehouse);
+                        fileIO,
+                        "test-jdbc-catalog",
+                        CatalogContext.create(Options.fromMap(properties)),
+                        warehouse);
         assertThat(catalog.warehouse()).isEqualTo(warehouse);
         return catalog;
     }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/jdbc/PostgresqlCatalogTest.java 
b/paimon-core/src/test/java/org/apache/paimon/jdbc/PostgresqlCatalogTest.java
index b6b3e28b8c..d6bdc2438f 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/jdbc/PostgresqlCatalogTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/jdbc/PostgresqlCatalogTest.java
@@ -121,7 +121,7 @@ public class PostgresqlCatalogTest {
                 new JdbcCatalog(
                         fileIO,
                         "test-jdbc-postgres-catalog",
-                        Options.fromMap(properties),
+                        CatalogContext.create(Options.fromMap(properties)),
                         warehouse);
         assertThat(catalog.warehouse()).isEqualTo(warehouse);
         return catalog;
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
index 4c48671add..9f3903bd9b 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
@@ -137,7 +137,7 @@ public class PartitionExpireTest {
                 };
 
         CatalogEnvironment env =
-                new CatalogEnvironment(null, null, null, null, null, false) {
+                new CatalogEnvironment(null, null, null, null, null, null, 
false) {
 
                     @Override
                     public PartitionHandler partitionHandler() {
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java 
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
index 08f3363c0e..c4b996436d 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
@@ -161,6 +161,7 @@ public class RESTCatalogServer {
 
     private final String databaseUri;
 
+    private final CatalogContext catalogContext;
     private final FileSystemCatalog catalog;
     private final MockWebServer server;
 
@@ -190,7 +191,7 @@ public class RESTCatalogServer {
         Options conf = new Options();
         this.configResponse.getDefaults().forEach(conf::setString);
         conf.setString(WAREHOUSE.key(), dataPath);
-        CatalogContext context = CatalogContext.create(conf);
+        this.catalogContext = CatalogContext.create(conf);
         Path warehousePath = new Path(dataPath);
         FileIO fileIO;
         try {
@@ -199,7 +200,7 @@ public class RESTCatalogServer {
         } catch (IOException e) {
             throw new UncheckedIOException(e);
         }
-        this.catalog = new FileSystemCatalog(fileIO, warehousePath, 
context.options());
+        this.catalog = new FileSystemCatalog(fileIO, warehousePath, 
catalogContext);
         Dispatcher dispatcher = initDispatcher(authProvider);
         MockWebServer mockWebServer = new MockWebServer();
         mockWebServer.setDispatcher(dispatcher);
@@ -2270,6 +2271,7 @@ public class RESTCatalogServer {
                             catalog.catalogLoader(),
                             catalog.lockFactory().orElse(null),
                             catalog.lockContext().orElse(null),
+                            catalogContext,
                             false);
             Path path = new Path(schema.options().get(PATH.key()));
             FileIO dataFileIO = catalog.fileIO();
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/TestAlterTableCatalogFactory.java
 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/TestAlterTableCatalogFactory.java
index 94abe72864..5a3f118d81 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/TestAlterTableCatalogFactory.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/TestAlterTableCatalogFactory.java
@@ -43,7 +43,7 @@ public class TestAlterTableCatalogFactory implements 
CatalogFactory {
 
     @Override
     public Catalog create(FileIO fileIO, Path warehouse, CatalogContext 
context) {
-        return new FileSystemCatalog(fileIO, warehouse, context.options()) {
+        return new FileSystemCatalog(fileIO, warehouse, context) {
 
             @Override
             public void alterTable(
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatReader.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatReader.java
index 4a1a1d6444..46f8be153e 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatReader.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatReader.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.format.blob;
 
+import org.apache.paimon.data.Blob;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.fs.FileIO;
@@ -121,14 +122,14 @@ public class BlobFormatReader implements 
FileRecordReader<InternalRow> {
                     return null;
                 }
 
-                BlobFileRef blobRef =
-                        new BlobFileRef(
+                Blob blob =
+                        Blob.fromFile(
                                 fileIO,
-                                filePath,
-                                blobLengths[currentPosition],
-                                blobOffsets[currentPosition]);
+                                filePath.toString(),
+                                blobOffsets[currentPosition] + 4,
+                                blobLengths[currentPosition] - 16);
                 currentPosition++;
-                return GenericRow.of(blobRef);
+                return GenericRow.of(blob);
             }
 
             @Override
diff --git 
a/paimon-format/src/test/java/org/apache/paimon/format/blob/BlobFileFormatTest.java
 
b/paimon-format/src/test/java/org/apache/paimon/format/blob/BlobFileFormatTest.java
index d30a669a29..005a78c24c 100644
--- 
a/paimon-format/src/test/java/org/apache/paimon/format/blob/BlobFileFormatTest.java
+++ 
b/paimon-format/src/test/java/org/apache/paimon/format/blob/BlobFileFormatTest.java
@@ -83,7 +83,7 @@ public class BlobFileFormatTest {
         List<byte[]> result = new ArrayList<>();
         readerFactory
                 .createReader(context)
-                .forEachRemaining(row -> result.add(row.getBlob(0).toBytes()));
+                .forEachRemaining(row -> result.add(row.getBlob(0).toData()));
 
         // assert
         assertThat(result).containsExactlyElementsOf(blobs);
@@ -95,7 +95,7 @@ public class BlobFileFormatTest {
         result.clear();
         readerFactory
                 .createReader(context)
-                .forEachRemaining(row -> result.add(row.getBlob(0).toBytes()));
+                .forEachRemaining(row -> result.add(row.getBlob(0).toData()));
 
         // assert
         assertThat(result).containsOnly(blobs.get(1));
diff --git 
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
 
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
index 6e3bf4782c..5485fc6b6b 100644
--- 
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
+++ 
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
@@ -164,19 +164,19 @@ public class HiveCatalog extends AbstractCatalog {
     private final LocationHelper locationHelper;
 
     public HiveCatalog(FileIO fileIO, HiveConf hiveConf, String 
clientClassName, String warehouse) {
-        this(fileIO, hiveConf, clientClassName, new Options(), warehouse);
+        this(fileIO, hiveConf, clientClassName, CatalogContext.create(new 
Options()), warehouse);
     }
 
     public HiveCatalog(
             FileIO fileIO,
             HiveConf hiveConf,
             String clientClassName,
-            Options options,
+            CatalogContext context,
             String warehouse) {
-        super(fileIO, options);
+        super(fileIO, context);
         this.hiveConf = hiveConf;
         this.clientClassName = clientClassName;
-        this.options = options;
+        this.options = context.options();
         this.warehouse = warehouse;
 
         boolean needLocationInProperties =
@@ -206,7 +206,7 @@ public class HiveCatalog extends AbstractCatalog {
     public Optional<CatalogLockContext> lockContext() {
         return Optional.of(
                 new HiveCatalogLockContext(
-                        new SerializableHiveConf(hiveConf), clientClassName, 
catalogOptions));
+                        new SerializableHiveConf(hiveConf), clientClassName, 
options));
     }
 
     @Override
@@ -1202,7 +1202,7 @@ public class HiveCatalog extends AbstractCatalog {
 
     @Override
     public boolean caseSensitive() {
-        return catalogOptions.getOptional(CASE_SENSITIVE).orElse(false);
+        return options.getOptional(CASE_SENSITIVE).orElse(false);
     }
 
     @Override
@@ -1211,7 +1211,7 @@ public class HiveCatalog extends AbstractCatalog {
     }
 
     public boolean syncAllProperties() {
-        return catalogOptions.get(SYNC_ALL_PROPERTIES);
+        return options.get(SYNC_ALL_PROPERTIES);
     }
 
     @Override
@@ -1328,7 +1328,7 @@ public class HiveCatalog extends AbstractCatalog {
     @Override
     public CatalogLoader catalogLoader() {
         return new HiveCatalogLoader(
-                fileIO, new SerializableHiveConf(hiveConf), clientClassName, 
options, warehouse);
+                fileIO, new SerializableHiveConf(hiveConf), clientClassName, 
context, warehouse);
     }
 
     public Table getHmsTable(Identifier identifier)
@@ -1715,7 +1715,7 @@ public class HiveCatalog extends AbstractCatalog {
                 fileIO,
                 hiveConf,
                 options.get(HiveCatalogOptions.METASTORE_CLIENT_CLASS),
-                options,
+                context,
                 warehouse.toUri().toString());
     }
 
diff --git 
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLoader.java
 
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLoader.java
index ff389434d4..b7ed1c3ff8 100644
--- 
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLoader.java
+++ 
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLoader.java
@@ -19,36 +19,36 @@
 package org.apache.paimon.hive;
 
 import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.catalog.CatalogLoader;
 import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.options.Options;
 
 /** Loader to create {@link HiveCatalog}. */
 public class HiveCatalogLoader implements CatalogLoader {
 
-    private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 2L;
 
     private final FileIO fileIO;
     private final SerializableHiveConf hiveConf;
     private final String clientClassName;
-    private final Options options;
+    private final CatalogContext context;
     private final String warehouse;
 
     public HiveCatalogLoader(
             FileIO fileIO,
             SerializableHiveConf hiveConf,
             String clientClassName,
-            Options options,
+            CatalogContext context,
             String warehouse) {
         this.fileIO = fileIO;
         this.hiveConf = hiveConf;
         this.clientClassName = clientClassName;
-        this.options = options;
+        this.context = context;
         this.warehouse = warehouse;
     }
 
     @Override
     public Catalog load() {
-        return new HiveCatalog(fileIO, hiveConf.conf(), clientClassName, 
options, warehouse);
+        return new HiveCatalog(fileIO, hiveConf.conf(), clientClassName, 
context, warehouse);
     }
 }
diff --git 
a/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java
 
b/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java
index d6fa84efb8..3043a57464 100644
--- 
a/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java
+++ 
b/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java
@@ -20,6 +20,7 @@ package org.apache.paimon.hive;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.catalog.CatalogTestBase;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.client.ClientPool;
@@ -79,8 +80,8 @@ public class HiveCatalogTest extends CatalogTestBase {
         String metastoreClientClass = 
"org.apache.hadoop.hive.metastore.HiveMetaStoreClient";
         Options catalogOptions = new Options();
         catalogOptions.set(CatalogOptions.SYNC_ALL_PROPERTIES.key(), "false");
-        catalog =
-                new HiveCatalog(fileIO, hiveConf, metastoreClientClass, 
catalogOptions, warehouse);
+        CatalogContext context = CatalogContext.create(catalogOptions, 
hiveConf);
+        catalog = new HiveCatalog(fileIO, hiveConf, metastoreClientClass, 
context, warehouse);
     }
 
     @Test
diff --git 
a/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveTableStatsTest.java
 
b/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveTableStatsTest.java
index 33016fd083..35065eb43b 100644
--- 
a/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveTableStatsTest.java
+++ 
b/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveTableStatsTest.java
@@ -64,7 +64,7 @@ public class HiveTableStatsTest {
         CatalogContext catalogContext = CatalogContext.create(catalogOptions);
         FileIO fileIO = FileIO.get(new Path(warehouse), catalogContext);
         catalog =
-                new HiveCatalog(fileIO, hiveConf, metastoreClientClass, 
catalogOptions, warehouse);
+                new HiveCatalog(fileIO, hiveConf, metastoreClientClass, 
catalogContext, warehouse);
     }
 
     @Test
diff --git 
a/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/pool/TestCachedClientPool.java
 
b/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/pool/TestCachedClientPool.java
index 691c514dfa..6108633398 100644
--- 
a/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/pool/TestCachedClientPool.java
+++ 
b/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/pool/TestCachedClientPool.java
@@ -421,7 +421,7 @@ public class TestCachedClientPool {
                                                         fileIO,
                                                         hiveConf,
                                                         metastoreClientClass,
-                                                        options,
+                                                        
CatalogContext.create(options),
                                                         warehouse);
                                             } catch (Exception e) {
                                                 throw new RuntimeException(e);

Reply via email to