This is an automated email from the ASF dual-hosted git repository.
junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 617b8b35b0 [core] Introduce creation methods to Blob (#6320)
617b8b35b0 is described below
commit 617b8b35b0700bd5fedbc4a37d5917bd8183ca84
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Sep 24 21:53:45 2025 +0800
[core] Introduce creation methods to Blob (#6320)
---
.../java/org/apache/paimon/rest/HttpClient.java | 7 +-
.../org/apache/paimon/rest/HttpClientUtils.java | 17 +++
.../apache/paimon/data/AbstractBinaryWriter.java | 2 +-
.../src/main/java/org/apache/paimon/data/Blob.java | 31 +++++-
.../main/java/org/apache/paimon/data/BlobData.java | 7 +-
.../org/apache/paimon/data/BlobDescriptor.java | 114 +++++++++++++++++++++
.../main/java/org/apache/paimon/data/BlobRef.java | 59 +++++++----
.../paimon/data/serializer/BlobSerializer.java | 2 +-
...Stream.java => LimitedSeekableInputStream.java} | 37 ++-----
.../paimon/fs/OffsetSeekableInputStream.java | 20 ++--
.../java/org/apache/paimon/utils/UriReader.java | 65 ++++++++++++
.../org/apache/paimon/utils/UriReaderFactory.java | 88 ++++++++++++++++
.../java/org/apache/paimon/data/BinaryRowTest.java | 2 +-
.../org/apache/paimon/data/BlobDescriptorTest.java | 83 +++++++++++++++
.../test/java/org/apache/paimon/data/BlobTest.java | 82 +++++++++++++++
.../paimon/fs/OffsetSeekableInputStreamTest.java | 27 +++++
.../apache/paimon/utils/UriReaderFactoryTest.java | 80 +++++++++++++++
.../org/apache/paimon/catalog/AbstractCatalog.java | 21 ++--
.../org/apache/paimon/catalog/CatalogUtils.java | 4 +-
.../apache/paimon/catalog/FileSystemCatalog.java | 9 +-
.../paimon/catalog/FileSystemCatalogFactory.java | 2 +-
.../paimon/catalog/FileSystemCatalogLoader.java | 11 +-
.../java/org/apache/paimon/jdbc/JdbcCatalog.java | 10 +-
.../org/apache/paimon/jdbc/JdbcCatalogFactory.java | 2 +-
.../org/apache/paimon/jdbc/JdbcCatalogLoader.java | 13 +--
.../java/org/apache/paimon/rest/RESTCatalog.java | 6 +-
.../apache/paimon/table/CatalogEnvironment.java | 14 ++-
.../paimon/catalog/FileSystemCatalogTest.java | 5 +-
.../java/org/apache/paimon/data/BlobHttpTest.java | 41 ++++++--
.../org/apache/paimon/jdbc/JdbcCatalogTest.java | 6 +-
.../apache/paimon/jdbc/PostgresqlCatalogTest.java | 2 +-
.../paimon/operation/PartitionExpireTest.java | 2 +-
.../org/apache/paimon/rest/RESTCatalogServer.java | 6 +-
.../cdc/mysql/TestAlterTableCatalogFactory.java | 2 +-
.../paimon/format/blob/BlobFormatReader.java | 13 +--
.../paimon/format/blob/BlobFileFormatTest.java | 4 +-
.../java/org/apache/paimon/hive/HiveCatalog.java | 18 ++--
.../org/apache/paimon/hive/HiveCatalogLoader.java | 12 +--
.../org/apache/paimon/hive/HiveCatalogTest.java | 5 +-
.../org/apache/paimon/hive/HiveTableStatsTest.java | 2 +-
.../paimon/hive/pool/TestCachedClientPool.java | 2 +-
41 files changed, 789 insertions(+), 146 deletions(-)
diff --git a/paimon-api/src/main/java/org/apache/paimon/rest/HttpClient.java
b/paimon-api/src/main/java/org/apache/paimon/rest/HttpClient.java
index e682a2aee5..b09d3b4528 100644
--- a/paimon-api/src/main/java/org/apache/paimon/rest/HttpClient.java
+++ b/paimon-api/src/main/java/org/apache/paimon/rest/HttpClient.java
@@ -32,7 +32,6 @@ import org.apache.hc.client5.http.classic.methods.HttpDelete;
import org.apache.hc.client5.http.classic.methods.HttpGet;
import org.apache.hc.client5.http.classic.methods.HttpPost;
import org.apache.hc.client5.http.classic.methods.HttpUriRequestBase;
-import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
import org.apache.hc.core5.http.ClassicHttpResponse;
import org.apache.hc.core5.http.Header;
import org.apache.hc.core5.http.io.entity.StringEntity;
@@ -43,13 +42,11 @@ import java.util.Collections;
import java.util.Map;
import java.util.function.Function;
-import static org.apache.paimon.rest.HttpClientUtils.createLoggingBuilder;
+import static org.apache.paimon.rest.HttpClientUtils.DEFAULT_HTTP_CLIENT;
/** Apache HTTP client for REST catalog. */
public class HttpClient implements RESTClient {
- private static final CloseableHttpClient HTTP_CLIENT = createLoggingBuilder().build();
-
private final String uri;
private ErrorHandler errorHandler;
@@ -137,7 +134,7 @@ public class HttpClient implements RESTClient {
private <T extends RESTResponse> T exec(HttpUriRequestBase request,
Class<T> responseType) {
try {
- return HTTP_CLIENT.execute(
+ return DEFAULT_HTTP_CLIENT.execute(
request,
response -> {
String responseBodyStr =
RESTUtil.extractResponseBodyAsString(response);
diff --git
a/paimon-api/src/main/java/org/apache/paimon/rest/HttpClientUtils.java
b/paimon-api/src/main/java/org/apache/paimon/rest/HttpClientUtils.java
index 28402eeadd..9447f24171 100644
--- a/paimon-api/src/main/java/org/apache/paimon/rest/HttpClientUtils.java
+++ b/paimon-api/src/main/java/org/apache/paimon/rest/HttpClientUtils.java
@@ -21,7 +21,10 @@ package org.apache.paimon.rest;
import org.apache.paimon.rest.interceptor.LoggingInterceptor;
import org.apache.paimon.rest.interceptor.TimingInterceptor;
+import org.apache.hc.client5.http.classic.methods.HttpGet;
import org.apache.hc.client5.http.config.RequestConfig;
+import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
+import org.apache.hc.client5.http.impl.classic.CloseableHttpResponse;
import org.apache.hc.client5.http.impl.classic.HttpClientBuilder;
import org.apache.hc.client5.http.impl.classic.HttpClients;
import
org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManagerBuilder;
@@ -32,9 +35,14 @@ import org.apache.hc.core5.reactor.ssl.SSLBufferMode;
import org.apache.hc.core5.ssl.SSLContexts;
import org.apache.hc.core5.util.Timeout;
+import java.io.IOException;
+import java.io.InputStream;
+
/** Utils for {@link HttpClientBuilder}. */
public class HttpClientUtils {
+ public static final CloseableHttpClient DEFAULT_HTTP_CLIENT = createLoggingBuilder().build();
+
public static HttpClientBuilder createLoggingBuilder() {
HttpClientBuilder clientBuilder = createBuilder();
clientBuilder
@@ -74,4 +82,13 @@ public class HttpClientUtils {
return connectionManagerBuilder.build();
}
+
+ public static InputStream getAsInputStream(String uri) throws IOException {
+ HttpGet httpGet = new HttpGet(uri);
+ CloseableHttpResponse response = DEFAULT_HTTP_CLIENT.execute(httpGet);
+ if (response.getCode() != 200) {
+ throw new RuntimeException("HTTP error code: " + response.getCode());
+ }
+ return response.getEntity().getContent();
+ }
}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/data/AbstractBinaryWriter.java
b/paimon-common/src/main/java/org/apache/paimon/data/AbstractBinaryWriter.java
index c03dd704b9..85d0445948 100644
---
a/paimon-common/src/main/java/org/apache/paimon/data/AbstractBinaryWriter.java
+++
b/paimon-common/src/main/java/org/apache/paimon/data/AbstractBinaryWriter.java
@@ -196,7 +196,7 @@ abstract class AbstractBinaryWriter implements BinaryWriter
{
@Override
public void writeBlob(int pos, Blob blob) {
- byte[] bytes = blob.toBytes();
+ byte[] bytes = blob.toData();
writeBinary(pos, bytes, 0, bytes.length);
}
diff --git a/paimon-common/src/main/java/org/apache/paimon/data/Blob.java
b/paimon-common/src/main/java/org/apache/paimon/data/Blob.java
index a063297a9d..b7d49e01c1 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/Blob.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/Blob.java
@@ -19,7 +19,10 @@
package org.apache.paimon.data;
import org.apache.paimon.annotation.Public;
+import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.utils.UriReader;
import java.io.IOException;
@@ -31,7 +34,33 @@ import java.io.IOException;
@Public
public interface Blob {
- byte[] toBytes();
+ byte[] toData();
+
+ BlobDescriptor toDescriptor();
SeekableInputStream newInputStream() throws IOException;
+
+ static Blob fromData(byte[] data) {
+ return new BlobData(data);
+ }
+
+ static Blob fromLocal(String file) {
+ return fromFile(LocalFileIO.create(), file);
+ }
+
+ static Blob fromHttp(String uri) {
+ return fromDescriptor(UriReader.fromHttp(), new BlobDescriptor(uri, 0, -1));
+ }
+
+ static Blob fromFile(FileIO fileIO, String file) {
+ return fromFile(fileIO, file, 0, -1);
+ }
+
+ static Blob fromFile(FileIO fileIO, String file, long offset, long length) {
+ return fromDescriptor(UriReader.fromFile(fileIO), new BlobDescriptor(file, offset, length));
+ }
+
+ static Blob fromDescriptor(UriReader reader, BlobDescriptor descriptor) {
+ return new BlobRef(reader, descriptor);
+ }
}
diff --git a/paimon-common/src/main/java/org/apache/paimon/data/BlobData.java
b/paimon-common/src/main/java/org/apache/paimon/data/BlobData.java
index fd9a32cd65..e5e5d53aad 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/BlobData.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/BlobData.java
@@ -44,10 +44,15 @@ public class BlobData implements Blob, Serializable {
}
@Override
- public byte[] toBytes() {
+ public byte[] toData() {
return data;
}
+ @Override
+ public BlobDescriptor toDescriptor() {
+ throw new RuntimeException("Blob data can not convert to descriptor.");
+ }
+
@Override
public SeekableInputStream newInputStream() throws IOException {
return new ByteArraySeekableStream(data);
diff --git
a/paimon-common/src/main/java/org/apache/paimon/data/BlobDescriptor.java
b/paimon-common/src/main/java/org/apache/paimon/data/BlobDescriptor.java
new file mode 100644
index 0000000000..52e9667cbf
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/data/BlobDescriptor.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.data;
+
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.charset.StandardCharsets;
+import java.util.Objects;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+/** Blob descriptor to describe a blob reference. */
+public class BlobDescriptor implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private final String uri;
+ private final long offset;
+ private final long length;
+
+ public BlobDescriptor(String uri, long offset, long length) {
+ this.uri = uri;
+ this.offset = offset;
+ this.length = length;
+ }
+
+ public String uri() {
+ return uri;
+ }
+
+ public long offset() {
+ return offset;
+ }
+
+ public long length() {
+ return length;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ BlobDescriptor that = (BlobDescriptor) o;
+ return offset == that.offset && length == that.length && Objects.equals(uri, that.uri);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(uri, offset, length);
+ }
+
+ @Override
+ public String toString() {
+ return "BlobDescriptor{"
+ + "uri='"
+ + uri
+ + '\''
+ + ", offset="
+ + offset
+ + ", length="
+ + length
+ + '}';
+ }
+
+ public byte[] serialize() {
+ byte[] uriBytes = uri.getBytes(UTF_8);
+ int uriLength = uriBytes.length;
+
+ int totalSize = 4 + uriLength + 8 + 8;
+ ByteBuffer buffer = ByteBuffer.allocate(totalSize);
+ buffer.order(ByteOrder.LITTLE_ENDIAN);
+
+ buffer.putInt(uriLength);
+ buffer.put(uriBytes);
+
+ buffer.putLong(offset);
+ buffer.putLong(length);
+
+ return buffer.array();
+ }
+
+ public static BlobDescriptor deserialize(byte[] bytes) {
+ ByteBuffer buffer = ByteBuffer.wrap(bytes);
+ buffer.order(ByteOrder.LITTLE_ENDIAN);
+
+ int uriLength = buffer.getInt();
+ byte[] uriBytes = new byte[uriLength];
+ buffer.get(uriBytes);
+ String uri = new String(uriBytes, StandardCharsets.UTF_8);
+
+ long offset = buffer.getLong();
+ long length = buffer.getLong();
+
+ return new BlobDescriptor(uri, offset, length);
+ }
+}
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileRef.java
b/paimon-common/src/main/java/org/apache/paimon/data/BlobRef.java
similarity index 52%
rename from
paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileRef.java
rename to paimon-common/src/main/java/org/apache/paimon/data/BlobRef.java
index 85e98e3ab8..0248454ee9 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileRef.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/BlobRef.java
@@ -16,34 +16,35 @@
* limitations under the License.
*/
-package org.apache.paimon.format.blob;
+package org.apache.paimon.data;
-import org.apache.paimon.data.Blob;
-import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.annotation.Public;
import org.apache.paimon.fs.OffsetSeekableInputStream;
-import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.SeekableInputStream;
import org.apache.paimon.utils.IOUtils;
+import org.apache.paimon.utils.UriReader;
import java.io.IOException;
+import java.util.Objects;
-/** BlobRef is a reference to a blob file. */
-public class BlobFileRef implements Blob {
+/**
+ * A {@link Blob} that refers to a blob described by a {@link BlobDescriptor}.
+ *
+ * @since 1.4.0
+ */
+@Public
+public class BlobRef implements Blob {
- private final FileIO fileIO;
- private final Path filePath;
- private final long blobLength;
- private final long blobOffset;
+ private final UriReader uriReader;
+ private final BlobDescriptor descriptor;
- public BlobFileRef(FileIO fileIO, Path filePath, long blobLength, long blobOffset) {
- this.fileIO = fileIO;
- this.filePath = filePath;
- this.blobLength = blobLength;
- this.blobOffset = blobOffset;
+ public BlobRef(UriReader uriReader, BlobDescriptor descriptor) {
+ this.uriReader = uriReader;
+ this.descriptor = descriptor;
}
@Override
- public byte[] toBytes() {
+ public byte[] toData() {
try {
return IOUtils.readFully(newInputStream(), true);
} catch (IOException e) {
@@ -51,10 +52,30 @@ public class BlobFileRef implements Blob {
}
}
+ @Override
+ public BlobDescriptor toDescriptor() {
+ return descriptor;
+ }
+
@Override
public SeekableInputStream newInputStream() throws IOException {
- SeekableInputStream in = fileIO.newInputStream(filePath);
- // TODO validate Magic number?
- return new OffsetSeekableInputStream(in, blobOffset + 4, blobLength - 16);
+ return new OffsetSeekableInputStream(
+ uriReader.newInputStream(descriptor.uri()),
+ descriptor.offset(),
+ descriptor.length());
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ BlobRef blobRef = (BlobRef) o;
+ return Objects.deepEquals(descriptor, blobRef.descriptor);
+ }
+
+ @Override
+ public int hashCode() {
+ return descriptor.hashCode();
}
}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/data/serializer/BlobSerializer.java
b/paimon-common/src/main/java/org/apache/paimon/data/serializer/BlobSerializer.java
index efc3bd2146..28f9233a37 100644
---
a/paimon-common/src/main/java/org/apache/paimon/data/serializer/BlobSerializer.java
+++
b/paimon-common/src/main/java/org/apache/paimon/data/serializer/BlobSerializer.java
@@ -39,7 +39,7 @@ public class BlobSerializer extends SerializerSingleton<Blob>
{
@Override
public void serialize(Blob blob, DataOutputView target) throws IOException
{
- BinarySerializer.INSTANCE.serialize(blob.toBytes(), target);
+ BinarySerializer.INSTANCE.serialize(blob.toData(), target);
}
@Override
diff --git
a/paimon-common/src/main/java/org/apache/paimon/fs/OffsetSeekableInputStream.java
b/paimon-common/src/main/java/org/apache/paimon/fs/LimitedSeekableInputStream.java
similarity index 57%
copy from
paimon-common/src/main/java/org/apache/paimon/fs/OffsetSeekableInputStream.java
copy to
paimon-common/src/main/java/org/apache/paimon/fs/LimitedSeekableInputStream.java
index c9fad5b457..66f9e89260 100644
---
a/paimon-common/src/main/java/org/apache/paimon/fs/OffsetSeekableInputStream.java
+++
b/paimon-common/src/main/java/org/apache/paimon/fs/LimitedSeekableInputStream.java
@@ -19,54 +19,39 @@
package org.apache.paimon.fs;
import java.io.IOException;
+import java.io.InputStream;
-/**
- * A {@link SeekableInputStream} wrapping another {@link SeekableInputStream} with offset and
- * length.
- */
-public class OffsetSeekableInputStream extends SeekableInputStream {
+/** A {@link SeekableInputStream} wrapping an {@link InputStream} without seek support. */
+public class LimitedSeekableInputStream extends SeekableInputStream {
- private final SeekableInputStream wrapped;
- private final long offset;
- private final long length;
+ private final InputStream in;
- public OffsetSeekableInputStream(SeekableInputStream wrapped, long offset, long length)
- throws IOException {
- this.wrapped = wrapped;
- this.offset = offset;
- this.length = length;
- wrapped.seek(offset);
+ public LimitedSeekableInputStream(InputStream in) {
+ this.in = in;
}
@Override
public void seek(long desired) throws IOException {
- wrapped.seek(offset + desired);
+ throw new UnsupportedOperationException();
}
@Override
public long getPos() throws IOException {
- return wrapped.getPos() - offset;
+ throw new UnsupportedOperationException();
}
@Override
public int read() throws IOException {
- if (getPos() >= length) {
- return -1;
- }
- return wrapped.read();
+ return in.read();
}
@Override
public int read(byte[] b, int off, int len) throws IOException {
- long realLen = Math.min(len, length - getPos());
- if (realLen == 0) {
- return -1;
- }
- return wrapped.read(b, off, (int) realLen);
+ return in.read(b, off, len);
}
@Override
public void close() throws IOException {
- wrapped.close();
+ in.close();
}
}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/fs/OffsetSeekableInputStream.java
b/paimon-common/src/main/java/org/apache/paimon/fs/OffsetSeekableInputStream.java
index c9fad5b457..94e2cf33e2 100644
---
a/paimon-common/src/main/java/org/apache/paimon/fs/OffsetSeekableInputStream.java
+++
b/paimon-common/src/main/java/org/apache/paimon/fs/OffsetSeekableInputStream.java
@@ -35,7 +35,9 @@ public class OffsetSeekableInputStream extends
SeekableInputStream {
this.wrapped = wrapped;
this.offset = offset;
this.length = length;
- wrapped.seek(offset);
+ if (offset != 0) {
+ wrapped.seek(offset);
+ }
}
@Override
@@ -50,19 +52,23 @@ public class OffsetSeekableInputStream extends
SeekableInputStream {
@Override
public int read() throws IOException {
- if (getPos() >= length) {
- return -1;
+ if (length != -1) {
+ if (getPos() >= length) {
+ return -1;
+ }
}
return wrapped.read();
}
@Override
public int read(byte[] b, int off, int len) throws IOException {
- long realLen = Math.min(len, length - getPos());
- if (realLen == 0) {
- return -1;
+ if (length != -1) {
+ len = (int) Math.min(len, length - getPos());
+ if (len == 0) {
+ return -1;
+ }
}
- return wrapped.read(b, off, (int) realLen);
+ return wrapped.read(b, off, len);
}
@Override
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/UriReader.java
b/paimon-common/src/main/java/org/apache/paimon/utils/UriReader.java
new file mode 100644
index 0000000000..5640a4c893
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/UriReader.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.LimitedSeekableInputStream;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.rest.HttpClientUtils;
+
+import java.io.IOException;
+
+/** An interface to read a URI as a stream. */
+public interface UriReader {
+
+ SeekableInputStream newInputStream(String uri) throws IOException;
+
+ static UriReader fromFile(FileIO fileIO) {
+ return new FileUriReader(fileIO);
+ }
+
+ static UriReader fromHttp() {
+ return new HttpUriReader();
+ }
+
+ /** A {@link UriReader} that uses {@link FileIO} to read files. */
+ class FileUriReader implements UriReader {
+
+ private final FileIO fileIO;
+
+ public FileUriReader(FileIO fileIO) {
+ this.fileIO = fileIO;
+ }
+
+ @Override
+ public SeekableInputStream newInputStream(String uri) throws IOException {
+ return fileIO.newInputStream(new Path(uri));
+ }
+ }
+
+ /** A {@link UriReader} that reads HTTP URIs. */
+ class HttpUriReader implements UriReader {
+
+ @Override
+ public SeekableInputStream newInputStream(String uri) throws IOException {
+ return new LimitedSeekableInputStream(HttpClientUtils.getAsInputStream(uri));
+ }
+ }
+}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/utils/UriReaderFactory.java
b/paimon-common/src/main/java/org/apache/paimon/utils/UriReaderFactory.java
new file mode 100644
index 0000000000..92e3b8e6a8
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/UriReaderFactory.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+
+/** A factory to create and cache {@link UriReader}. */
+public class UriReaderFactory {
+
+ private final CatalogContext context;
+ private final Map<UriKey, UriReader> readers;
+
+ public UriReaderFactory(CatalogContext context) {
+ this.context = context;
+ this.readers = new ConcurrentHashMap<>();
+ }
+
+ public UriReader create(String input) {
+ URI uri = URI.create(input);
+ UriKey key = new UriKey(uri.getScheme(), uri.getAuthority());
+ return readers.computeIfAbsent(key, k -> newReader(k, uri));
+ }
+
+ private UriReader newReader(UriKey key, URI uri) {
+ if ("http".equals(key.scheme) || "https".equals(key.scheme)) {
+ return UriReader.fromHttp();
+ }
+
+ try {
+ FileIO fileIO = FileIO.get(new Path(uri), context);
+ return UriReader.fromFile(fileIO);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private static final class UriKey {
+
+ private final @Nullable String scheme;
+ private final @Nullable String authority;
+
+ public UriKey(@Nullable String scheme, @Nullable String authority) {
+ this.scheme = scheme;
+ this.authority = authority;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ UriKey uriKey = (UriKey) o;
+ return Objects.equals(scheme, uriKey.scheme)
+ && Objects.equals(authority, uriKey.authority);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(scheme, authority);
+ }
+ }
+}
diff --git
a/paimon-common/src/test/java/org/apache/paimon/data/BinaryRowTest.java
b/paimon-common/src/test/java/org/apache/paimon/data/BinaryRowTest.java
index c7fae18163..04181eb2a8 100644
--- a/paimon-common/src/test/java/org/apache/paimon/data/BinaryRowTest.java
+++ b/paimon-common/src/test/java/org/apache/paimon/data/BinaryRowTest.java
@@ -974,7 +974,7 @@ public class BinaryRowTest {
Blob blob0 = row.getBlob(0);
assertThat(blob0).isInstanceOf(BlobData.class);
- assertThat(blob0.toBytes()).isEqualTo(new byte[] {1, 3, 1});
+ assertThat(blob0.toData()).isEqualTo(new byte[] {1, 3, 1});
assertThat(row.isNullAt(1)).isTrue();
}
}
diff --git
a/paimon-common/src/test/java/org/apache/paimon/data/BlobDescriptorTest.java
b/paimon-common/src/test/java/org/apache/paimon/data/BlobDescriptorTest.java
new file mode 100644
index 0000000000..5cd3a06f55
--- /dev/null
+++ b/paimon-common/src/test/java/org/apache/paimon/data/BlobDescriptorTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.data;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link BlobDescriptor}. */
+public class BlobDescriptorTest {
+
+ @Test
+ public void testEquals() {
+ String uri1 = "/test/path1";
+ String uri2 = "/test/path2";
+
+ BlobDescriptor descriptor1 = new BlobDescriptor(uri1, 100L, 200L);
+ BlobDescriptor descriptor2 = new BlobDescriptor(uri1, 100L, 200L);
+ BlobDescriptor descriptor3 = new BlobDescriptor(uri2, 100L, 200L);
+ BlobDescriptor descriptor4 = new BlobDescriptor(uri1, 150L, 200L);
+ BlobDescriptor descriptor5 = new BlobDescriptor(uri1, 100L, 250L);
+
+ assertThat(descriptor1).isEqualTo(descriptor2);
+ assertThat(descriptor1).isNotEqualTo(descriptor3);
+ assertThat(descriptor1).isNotEqualTo(descriptor4);
+ assertThat(descriptor1).isNotEqualTo(descriptor5);
+ assertThat(descriptor1).isNotEqualTo(null);
+ assertThat(descriptor1).isNotEqualTo(new Object());
+ }
+
+ @Test
+ public void testHashCode() {
+ String uri = "/test/path";
+
+ BlobDescriptor descriptor1 = new BlobDescriptor(uri, 100L, 200L);
+ BlobDescriptor descriptor2 = new BlobDescriptor(uri, 100L, 200L);
+
+ assertThat(descriptor1.hashCode()).isEqualTo(descriptor2.hashCode());
+ }
+
+ @Test
+ public void testToString() {
+ String uri = "/test/path";
+ BlobDescriptor descriptor = new BlobDescriptor(uri, 100L, 200L);
+
+ String toString = descriptor.toString();
+ assertThat(toString).contains("uri='/test/path'");
+ assertThat(toString).contains("offset=100");
+ assertThat(toString).contains("length=200");
+ }
+
+ @Test
+ public void testSerializeAndDeserialize() {
+ String uri = "/test/path";
+ long offset = 100L;
+ long length = 200L;
+
+ BlobDescriptor original = new BlobDescriptor(uri, offset, length);
+ byte[] serialized = original.serialize();
+ BlobDescriptor deserialized = BlobDescriptor.deserialize(serialized);
+
+ assertThat(deserialized.uri()).isEqualTo(original.uri());
+ assertThat(deserialized.offset()).isEqualTo(original.offset());
+ assertThat(deserialized.length()).isEqualTo(original.length());
+ assertThat(deserialized).isEqualTo(original);
+ }
+}
diff --git a/paimon-common/src/test/java/org/apache/paimon/data/BlobTest.java
b/paimon-common/src/test/java/org/apache/paimon/data/BlobTest.java
new file mode 100644
index 0000000000..27703662da
--- /dev/null
+++ b/paimon-common/src/test/java/org/apache/paimon/data/BlobTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.data;
+
+import org.apache.paimon.fs.local.LocalFileIO;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link Blob}. */
+public class BlobTest {
+
+ @TempDir java.nio.file.Path tempPath;
+
+ private String file;
+
+ @BeforeEach
+ void beforeEach() throws IOException {
+ file = new File(tempPath.toString(), "test.txt").getAbsolutePath();
+ Files.write(Paths.get(file), "test data".getBytes());
+ }
+
+ @Test
+ public void testFromData() {
+ byte[] testData = "test data".getBytes();
+ Blob blob = Blob.fromData(testData);
+ assertThat(blob).isInstanceOf(BlobData.class);
+ assertThat(blob.toData()).isEqualTo(testData);
+ }
+
+ @Test
+ public void testFromLocal() {
+ Blob blob = Blob.fromLocal(file);
+ assertThat(blob).isInstanceOf(BlobRef.class);
+ assertThat(blob.toData()).isEqualTo("test data".getBytes());
+ }
+
+ @Test
+ public void testFromFile() {
+ Blob blob = Blob.fromFile(LocalFileIO.create(), file, 0, 4);
+ assertThat(blob).isInstanceOf(BlobRef.class);
+ assertThat(blob.toData()).isEqualTo("test".getBytes());
+ }
+
+ @Test
+ public void testFromPath() {
+ Blob blob = Blob.fromFile(LocalFileIO.create(), file);
+ assertThat(blob).isInstanceOf(BlobRef.class);
+ assertThat(blob.toData()).isEqualTo("test data".getBytes());
+ }
+
+ @Test
+ public void testFromHttp() {
+ String uri = "http://example.com/file.txt";
+ Blob blob = Blob.fromHttp(uri);
+ assertThat(blob).isInstanceOf(BlobRef.class);
+ }
+}
diff --git
a/paimon-common/src/test/java/org/apache/paimon/fs/OffsetSeekableInputStreamTest.java
b/paimon-common/src/test/java/org/apache/paimon/fs/OffsetSeekableInputStreamTest.java
index fa8a14ced5..94f7c47fa0 100644
---
a/paimon-common/src/test/java/org/apache/paimon/fs/OffsetSeekableInputStreamTest.java
+++
b/paimon-common/src/test/java/org/apache/paimon/fs/OffsetSeekableInputStreamTest.java
@@ -25,6 +25,9 @@ import java.io.IOException;
import java.util.Arrays;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
/** Tests for {@link OffsetSeekableInputStream}. */
public class OffsetSeekableInputStreamTest {
@@ -146,4 +149,28 @@ public class OffsetSeekableInputStreamTest {
assertThat(bytesRead).isEqualTo(-1);
}
}
+
+ @Test
+ public void testClose() throws IOException {
+ SeekableInputStream mockStream = mock(SeekableInputStream.class);
+ OffsetSeekableInputStream offsetStream = new
OffsetSeekableInputStream(mockStream, 0, 10);
+ offsetStream.close();
+ verify(mockStream, times(1)).close();
+ }
+
+ @Test
+ public void testReadWithUnlimitedLength() throws IOException {
+ long offset = 5;
+ long length = -1; // Unlimited length
+ try (OffsetSeekableInputStream stream =
+ new OffsetSeekableInputStream(wrapped, offset, length)) {
+ // Should be able to read beyond the original testData length
+ byte[] buffer = new byte[10];
+ int bytesRead = stream.read(buffer, 0, 10);
+
+ assertThat(bytesRead).isEqualTo(10);
+ assertThat(buffer).containsExactly(Arrays.copyOfRange(testData, 5,
15));
+ assertThat(stream.getPos()).isEqualTo(10);
+ }
+ }
}
diff --git
a/paimon-common/src/test/java/org/apache/paimon/utils/UriReaderFactoryTest.java
b/paimon-common/src/test/java/org/apache/paimon/utils/UriReaderFactoryTest.java
new file mode 100644
index 0000000000..a3b6db3d75
--- /dev/null
+++
b/paimon-common/src/test/java/org/apache/paimon/utils/UriReaderFactoryTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.utils.UriReader.FileUriReader;
+import org.apache.paimon.utils.UriReader.HttpUriReader;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link UriReaderFactory}. */
+public class UriReaderFactoryTest {
+
+ private final UriReaderFactory factory =
+ new UriReaderFactory(CatalogContext.create(new Options()));
+
+ @Test
+ public void testCreateHttpUriReader() {
+ UriReader reader = factory.create("http://example.com/file.txt");
+ assertThat(reader).isInstanceOf(HttpUriReader.class);
+ }
+
+ @Test
+ public void testCreateHttpsUriReader() {
+ UriReader reader = factory.create("https://example.com/file.txt");
+ assertThat(reader).isInstanceOf(HttpUriReader.class);
+ }
+
+ @Test
+ public void testCreateFileUriReader() {
+ UriReader reader = factory.create("file:///path/to/file.txt");
+ assertThat(reader).isInstanceOf(FileUriReader.class);
+ }
+
+ @Test
+ public void testCreateUriReaderWithAuthority() {
+ UriReader reader1 =
factory.create("http://my_bucket1/path/to/file.txt");
+ UriReader reader2 =
factory.create("http://my_bucket2/path/to/file.txt");
+ assertThat(reader1).isNotEqualTo(reader2);
+ }
+
+ @Test
+ public void testCachedReadersWithSameSchemeAndAuthority() {
+ UriReader reader1 =
factory.create("http://my_bucket/path/to/file1.txt");
+ UriReader reader2 =
factory.create("http://my_bucket/path/to/file2.txt");
+ assertThat(reader1).isSameAs(reader2);
+ }
+
+ @Test
+ public void testCachedReadersWithNullAuthority() {
+ UriReader reader1 = factory.create("file:///path/to/file1.txt");
+ UriReader reader2 = factory.create("file:///path/to/file2.txt");
+ assertThat(reader1).isSameAs(reader2);
+ }
+
+ @Test
+ public void testCreateUriReaderWithLocalPath() {
+ UriReader reader = factory.create("/local/path/to/file.txt");
+ assertThat(reader).isInstanceOf(FileUriReader.class);
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index 5df78212f1..03e47194d0 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -80,23 +80,23 @@ public abstract class AbstractCatalog implements Catalog {
protected final FileIO fileIO;
protected final Map<String, String> tableDefaultOptions;
- protected final Options catalogOptions;
+ protected final CatalogContext context;
protected AbstractCatalog(FileIO fileIO) {
this.fileIO = fileIO;
this.tableDefaultOptions = new HashMap<>();
- this.catalogOptions = new Options();
+ this.context = CatalogContext.create(new Options());
}
- protected AbstractCatalog(FileIO fileIO, Options options) {
+ protected AbstractCatalog(FileIO fileIO, CatalogContext context) {
this.fileIO = fileIO;
- this.tableDefaultOptions =
CatalogUtils.tableDefaultOptions(options.toMap());
- this.catalogOptions = options;
+ this.tableDefaultOptions =
CatalogUtils.tableDefaultOptions(context.options().toMap());
+ this.context = context;
}
@Override
public Map<String, String> options() {
- return catalogOptions.toMap();
+ return context.options().toMap();
}
public abstract String warehouse();
@@ -114,7 +114,7 @@ public abstract class AbstractCatalog implements Catalog {
return Optional.empty();
}
- String lock = catalogOptions.get(LOCK_TYPE);
+ String lock = context.options().get(LOCK_TYPE);
if (lock == null) {
return defaultLockFactory();
}
@@ -129,11 +129,11 @@ public abstract class AbstractCatalog implements Catalog {
}
public Optional<CatalogLockContext> lockContext() {
- return Optional.of(CatalogLockContext.fromOptions(catalogOptions));
+ return Optional.of(CatalogLockContext.fromOptions(context.options()));
}
protected boolean lockEnabled() {
- return
catalogOptions.getOptional(LOCK_ENABLED).orElse(fileIO.isObjectStore());
+ return
context.options().getOptional(LOCK_ENABLED).orElse(fileIO.isObjectStore());
}
protected boolean allowCustomTablePath() {
@@ -474,7 +474,8 @@ public abstract class AbstractCatalog implements Catalog {
this::fileIO,
this::loadTableMetadata,
lockFactory().orElse(null),
- lockContext().orElse(null));
+ lockContext().orElse(null),
+ context);
}
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
index 8be164b1b1..d26642e046 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
@@ -214,7 +214,8 @@ public class CatalogUtils {
Function<Path, FileIO> externalFileIO,
TableMetadata.Loader metadataLoader,
@Nullable CatalogLockFactory lockFactory,
- @Nullable CatalogLockContext lockContext)
+ @Nullable CatalogLockContext lockContext,
+ @Nullable CatalogContext catalogContext)
throws Catalog.TableNotExistException {
if (SYSTEM_DATABASE_NAME.equals(identifier.getDatabaseName())) {
return
CatalogUtils.createGlobalSystemTable(identifier.getTableName(), catalog);
@@ -258,6 +259,7 @@ public class CatalogUtils {
catalog.catalogLoader(),
lockFactory,
lockContext,
+ catalogContext,
catalog.supportsVersionManagement());
Path path = new Path(schema.options().get(PATH.key()));
FileStoreTable table =
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
index da56253335..cea266f703 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
@@ -21,7 +21,6 @@ package org.apache.paimon.catalog;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.operation.Lock;
-import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
@@ -49,8 +48,8 @@ public class FileSystemCatalog extends AbstractCatalog {
this.warehouse = warehouse;
}
- public FileSystemCatalog(FileIO fileIO, Path warehouse, Options options) {
- super(fileIO, options);
+ public FileSystemCatalog(FileIO fileIO, Path warehouse, CatalogContext
context) {
+ super(fileIO, context);
this.warehouse = warehouse;
}
@@ -198,11 +197,11 @@ public class FileSystemCatalog extends AbstractCatalog {
@Override
public CatalogLoader catalogLoader() {
- return new FileSystemCatalogLoader(fileIO, warehouse, catalogOptions);
+ return new FileSystemCatalogLoader(fileIO, warehouse, context);
}
@Override
public boolean caseSensitive() {
- return catalogOptions.getOptional(CASE_SENSITIVE).orElse(true);
+ return context.options().getOptional(CASE_SENSITIVE).orElse(true);
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalogFactory.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalogFactory.java
index 353361b298..d17844baea 100644
---
a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalogFactory.java
+++
b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalogFactory.java
@@ -41,6 +41,6 @@ public class FileSystemCatalogFactory implements
CatalogFactory {
"Only managed table is supported in File system catalog.");
}
- return new FileSystemCatalog(fileIO, warehouse, context.options());
+ return new FileSystemCatalog(fileIO, warehouse, context);
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalogLoader.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalogLoader.java
index 0fb5da6c4d..6a0cead0be 100644
---
a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalogLoader.java
+++
b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalogLoader.java
@@ -20,25 +20,24 @@ package org.apache.paimon.catalog;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
-import org.apache.paimon.options.Options;
/** Loader to create {@link FileSystemCatalog}. */
public class FileSystemCatalogLoader implements CatalogLoader {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 2L;
private final FileIO fileIO;
private final Path warehouse;
- private final Options options;
+ private final CatalogContext context;
- public FileSystemCatalogLoader(FileIO fileIO, Path warehouse, Options
options) {
+ public FileSystemCatalogLoader(FileIO fileIO, Path warehouse,
CatalogContext context) {
this.fileIO = fileIO;
this.warehouse = warehouse;
- this.options = options;
+ this.context = context;
}
@Override
public Catalog load() {
- return new FileSystemCatalog(fileIO, warehouse, options);
+ return new FileSystemCatalog(fileIO, warehouse, context);
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
index 22a9939d20..0891bad2bd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
@@ -20,6 +20,7 @@ package org.apache.paimon.jdbc;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.catalog.AbstractCatalog;
+import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.catalog.CatalogLockContext;
import org.apache.paimon.catalog.CatalogLockFactory;
@@ -85,10 +86,11 @@ public class JdbcCatalog extends AbstractCatalog {
private final Options options;
private final String warehouse;
- protected JdbcCatalog(FileIO fileIO, String catalogKey, Options options,
String warehouse) {
- super(fileIO, options);
+ protected JdbcCatalog(
+ FileIO fileIO, String catalogKey, CatalogContext context, String
warehouse) {
+ super(fileIO, context);
this.catalogKey = catalogKey;
- this.options = options;
+ this.options = context.options();
this.warehouse = warehouse;
Preconditions.checkNotNull(options, "Invalid catalog properties:
null");
this.connections =
@@ -153,7 +155,7 @@ public class JdbcCatalog extends AbstractCatalog {
@Override
public CatalogLoader catalogLoader() {
- return new JdbcCatalogLoader(fileIO, catalogKey, options, warehouse);
+ return new JdbcCatalogLoader(fileIO, catalogKey, context, warehouse);
}
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogFactory.java
b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogFactory.java
index 6c3c1d0e41..b2998e5578 100644
--- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogFactory.java
@@ -39,6 +39,6 @@ public class JdbcCatalogFactory implements CatalogFactory {
public Catalog create(FileIO fileIO, Path warehouse, CatalogContext
context) {
Options options = context.options();
String catalogKey = options.get(JdbcCatalogOptions.CATALOG_KEY);
- return new JdbcCatalog(fileIO, catalogKey, context.options(),
warehouse.toString());
+ return new JdbcCatalog(fileIO, catalogKey, context,
warehouse.toString());
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLoader.java
b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLoader.java
index a1fa36e4b2..492d1a84b3 100644
--- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLoader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLoader.java
@@ -19,29 +19,30 @@
package org.apache.paimon.jdbc;
import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.options.Options;
/** Loader to create {@link JdbcCatalog}. */
public class JdbcCatalogLoader implements CatalogLoader {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 2L;
private final FileIO fileIO;
private final String catalogKey;
- private final Options options;
+ private final CatalogContext context;
private final String warehouse;
- public JdbcCatalogLoader(FileIO fileIO, String catalogKey, Options
options, String warehouse) {
+ public JdbcCatalogLoader(
+ FileIO fileIO, String catalogKey, CatalogContext context, String
warehouse) {
this.fileIO = fileIO;
this.catalogKey = catalogKey;
- this.options = options;
+ this.context = context;
this.warehouse = warehouse;
}
@Override
public Catalog load() {
- return new JdbcCatalog(fileIO, catalogKey, options, warehouse);
+ return new JdbcCatalog(fileIO, catalogKey, context, warehouse);
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
index 9179e71aa9..58f85a48cc 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
@@ -278,7 +278,8 @@ public class RESTCatalog implements Catalog {
this::fileIOFromOptions,
this::loadTableMetadata,
null,
- null);
+ null,
+ context);
}
@Override
@@ -426,7 +427,8 @@ public class RESTCatalog implements Catalog {
this::fileIOFromOptions,
i -> toTableMetadata(db, response),
null,
- null);
+ null,
+ context);
} catch (TableNotExistException e) {
throw new RuntimeException(e);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java
b/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java
index fbc1b894a4..0f61f8fa09 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java
@@ -20,6 +20,7 @@ package org.apache.paimon.table;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.catalog.CatalogLockContext;
import org.apache.paimon.catalog.CatalogLockFactory;
@@ -42,13 +43,14 @@ import java.util.Optional;
/** Catalog environment in table which contains log factory, metastore client
factory. */
public class CatalogEnvironment implements Serializable {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 2L;
@Nullable private final Identifier identifier;
@Nullable private final String uuid;
@Nullable private final CatalogLoader catalogLoader;
@Nullable private final CatalogLockFactory lockFactory;
@Nullable private final CatalogLockContext lockContext;
+ @Nullable private final CatalogContext catalogContext;
private final boolean supportsVersionManagement;
public CatalogEnvironment(
@@ -57,17 +59,19 @@ public class CatalogEnvironment implements Serializable {
@Nullable CatalogLoader catalogLoader,
@Nullable CatalogLockFactory lockFactory,
@Nullable CatalogLockContext lockContext,
+ @Nullable CatalogContext catalogContext,
boolean supportsVersionManagement) {
this.identifier = identifier;
this.uuid = uuid;
this.catalogLoader = catalogLoader;
this.lockFactory = lockFactory;
this.lockContext = lockContext;
+ this.catalogContext = catalogContext;
this.supportsVersionManagement = supportsVersionManagement;
}
public static CatalogEnvironment empty() {
- return new CatalogEnvironment(null, null, null, null, null, false);
+ return new CatalogEnvironment(null, null, null, null, null, null,
false);
}
@Nullable
@@ -132,6 +136,11 @@ public class CatalogEnvironment implements Serializable {
return catalogLoader;
}
+ @Nullable
+ public CatalogContext catalogContext() {
+ return catalogContext;
+ }
+
public CatalogEnvironment copy(Identifier identifier) {
return new CatalogEnvironment(
identifier,
@@ -139,6 +148,7 @@ public class CatalogEnvironment implements Serializable {
catalogLoader,
lockFactory,
lockContext,
+ catalogContext,
supportsVersionManagement);
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/catalog/FileSystemCatalogTest.java
b/paimon-core/src/test/java/org/apache/paimon/catalog/FileSystemCatalogTest.java
index dcd27a91ed..04f67c018d 100644
---
a/paimon-core/src/test/java/org/apache/paimon/catalog/FileSystemCatalogTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/catalog/FileSystemCatalogTest.java
@@ -36,8 +36,9 @@ public class FileSystemCatalogTest extends CatalogTestBase {
@BeforeEach
public void setUp() throws Exception {
super.setUp();
- Options catalogOptions = new Options();
- catalog = new FileSystemCatalog(fileIO, new Path(warehouse),
catalogOptions);
+ catalog =
+ new FileSystemCatalog(
+ fileIO, new Path(warehouse), CatalogContext.create(new
Options()));
}
@Test
diff --git a/paimon-common/src/main/java/org/apache/paimon/data/Blob.java
b/paimon-core/src/test/java/org/apache/paimon/data/BlobHttpTest.java
similarity index 50%
copy from paimon-common/src/main/java/org/apache/paimon/data/Blob.java
copy to paimon-core/src/test/java/org/apache/paimon/data/BlobHttpTest.java
index a063297a9d..cc03cd1fed 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/Blob.java
+++ b/paimon-core/src/test/java/org/apache/paimon/data/BlobHttpTest.java
@@ -18,20 +18,39 @@
package org.apache.paimon.data;
-import org.apache.paimon.annotation.Public;
-import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.rest.TestHttpWebServer;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
import java.io.IOException;
-/**
- * Blob interface, provides bytes and input stream methods.
- *
- * @since 1.4.0
- */
-@Public
-public interface Blob {
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link Blob} http. */
+public class BlobHttpTest {
+
+ private TestHttpWebServer server;
+
+ @Before
+ public void setUp() throws Exception {
+ server = new TestHttpWebServer("/my_path");
+ server.start();
+ }
- byte[] toBytes();
+ @After
+ public void tearDown() throws IOException {
+ server.stop();
+ }
- SeekableInputStream newInputStream() throws IOException;
+ @Test
+ public void testHttp() {
+ String data = "my_data";
+ server.enqueueResponse(data, 200);
+ Blob blob = Blob.fromHttp(server.getBaseUrl());
+ assertThat(blob).isInstanceOf(BlobRef.class);
+ assertThat(new String(blob.toData(), UTF_8)).isEqualTo(data);
+ }
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java
b/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java
index 419c8576e1..9e106ad966 100644
--- a/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java
@@ -19,6 +19,7 @@
package org.apache.paimon.jdbc;
import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogTestBase;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.options.CatalogOptions;
@@ -63,7 +64,10 @@ public class JdbcCatalogTest extends CatalogTestBase {
properties.putAll(props);
JdbcCatalog catalog =
new JdbcCatalog(
- fileIO, "test-jdbc-catalog",
Options.fromMap(properties), warehouse);
+ fileIO,
+ "test-jdbc-catalog",
+ CatalogContext.create(Options.fromMap(properties)),
+ warehouse);
assertThat(catalog.warehouse()).isEqualTo(warehouse);
return catalog;
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/jdbc/PostgresqlCatalogTest.java
b/paimon-core/src/test/java/org/apache/paimon/jdbc/PostgresqlCatalogTest.java
index b6b3e28b8c..d6bdc2438f 100644
---
a/paimon-core/src/test/java/org/apache/paimon/jdbc/PostgresqlCatalogTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/jdbc/PostgresqlCatalogTest.java
@@ -121,7 +121,7 @@ public class PostgresqlCatalogTest {
new JdbcCatalog(
fileIO,
"test-jdbc-postgres-catalog",
- Options.fromMap(properties),
+ CatalogContext.create(Options.fromMap(properties)),
warehouse);
assertThat(catalog.warehouse()).isEqualTo(warehouse);
return catalog;
diff --git
a/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
b/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
index 4c48671add..9f3903bd9b 100644
---
a/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
@@ -137,7 +137,7 @@ public class PartitionExpireTest {
};
CatalogEnvironment env =
- new CatalogEnvironment(null, null, null, null, null, false) {
+ new CatalogEnvironment(null, null, null, null, null, null,
false) {
@Override
public PartitionHandler partitionHandler() {
diff --git
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
index 08f3363c0e..c4b996436d 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
@@ -161,6 +161,7 @@ public class RESTCatalogServer {
private final String databaseUri;
+ private final CatalogContext catalogContext;
private final FileSystemCatalog catalog;
private final MockWebServer server;
@@ -190,7 +191,7 @@ public class RESTCatalogServer {
Options conf = new Options();
this.configResponse.getDefaults().forEach(conf::setString);
conf.setString(WAREHOUSE.key(), dataPath);
- CatalogContext context = CatalogContext.create(conf);
+ this.catalogContext = CatalogContext.create(conf);
Path warehousePath = new Path(dataPath);
FileIO fileIO;
try {
@@ -199,7 +200,7 @@ public class RESTCatalogServer {
} catch (IOException e) {
throw new UncheckedIOException(e);
}
- this.catalog = new FileSystemCatalog(fileIO, warehousePath,
context.options());
+ this.catalog = new FileSystemCatalog(fileIO, warehousePath,
catalogContext);
Dispatcher dispatcher = initDispatcher(authProvider);
MockWebServer mockWebServer = new MockWebServer();
mockWebServer.setDispatcher(dispatcher);
@@ -2270,6 +2271,7 @@ public class RESTCatalogServer {
catalog.catalogLoader(),
catalog.lockFactory().orElse(null),
catalog.lockContext().orElse(null),
+ catalogContext,
false);
Path path = new Path(schema.options().get(PATH.key()));
FileIO dataFileIO = catalog.fileIO();
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/TestAlterTableCatalogFactory.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/TestAlterTableCatalogFactory.java
index 94abe72864..5a3f118d81 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/TestAlterTableCatalogFactory.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/TestAlterTableCatalogFactory.java
@@ -43,7 +43,7 @@ public class TestAlterTableCatalogFactory implements
CatalogFactory {
@Override
public Catalog create(FileIO fileIO, Path warehouse, CatalogContext
context) {
- return new FileSystemCatalog(fileIO, warehouse, context.options()) {
+ return new FileSystemCatalog(fileIO, warehouse, context) {
@Override
public void alterTable(
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatReader.java
b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatReader.java
index 4a1a1d6444..46f8be153e 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatReader.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatReader.java
@@ -18,6 +18,7 @@
package org.apache.paimon.format.blob;
+import org.apache.paimon.data.Blob;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.fs.FileIO;
@@ -121,14 +122,14 @@ public class BlobFormatReader implements
FileRecordReader<InternalRow> {
return null;
}
- BlobFileRef blobRef =
- new BlobFileRef(
+ Blob blob =
+ Blob.fromFile(
fileIO,
- filePath,
- blobLengths[currentPosition],
- blobOffsets[currentPosition]);
+ filePath.toString(),
+ blobOffsets[currentPosition] + 4,
+ blobLengths[currentPosition] - 16);
currentPosition++;
- return GenericRow.of(blobRef);
+ return GenericRow.of(blob);
}
@Override
diff --git
a/paimon-format/src/test/java/org/apache/paimon/format/blob/BlobFileFormatTest.java
b/paimon-format/src/test/java/org/apache/paimon/format/blob/BlobFileFormatTest.java
index d30a669a29..005a78c24c 100644
---
a/paimon-format/src/test/java/org/apache/paimon/format/blob/BlobFileFormatTest.java
+++
b/paimon-format/src/test/java/org/apache/paimon/format/blob/BlobFileFormatTest.java
@@ -83,7 +83,7 @@ public class BlobFileFormatTest {
List<byte[]> result = new ArrayList<>();
readerFactory
.createReader(context)
- .forEachRemaining(row -> result.add(row.getBlob(0).toBytes()));
+ .forEachRemaining(row -> result.add(row.getBlob(0).toData()));
// assert
assertThat(result).containsExactlyElementsOf(blobs);
@@ -95,7 +95,7 @@ public class BlobFileFormatTest {
result.clear();
readerFactory
.createReader(context)
- .forEachRemaining(row -> result.add(row.getBlob(0).toBytes()));
+ .forEachRemaining(row -> result.add(row.getBlob(0).toData()));
// assert
assertThat(result).containsOnly(blobs.get(1));
diff --git
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
index 6e3bf4782c..5485fc6b6b 100644
---
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
+++
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
@@ -164,19 +164,19 @@ public class HiveCatalog extends AbstractCatalog {
private final LocationHelper locationHelper;
public HiveCatalog(FileIO fileIO, HiveConf hiveConf, String
clientClassName, String warehouse) {
- this(fileIO, hiveConf, clientClassName, new Options(), warehouse);
+ this(fileIO, hiveConf, clientClassName, CatalogContext.create(new
Options()), warehouse);
}
public HiveCatalog(
FileIO fileIO,
HiveConf hiveConf,
String clientClassName,
- Options options,
+ CatalogContext context,
String warehouse) {
- super(fileIO, options);
+ super(fileIO, context);
this.hiveConf = hiveConf;
this.clientClassName = clientClassName;
- this.options = options;
+ this.options = context.options();
this.warehouse = warehouse;
boolean needLocationInProperties =
@@ -206,7 +206,7 @@ public class HiveCatalog extends AbstractCatalog {
public Optional<CatalogLockContext> lockContext() {
return Optional.of(
new HiveCatalogLockContext(
- new SerializableHiveConf(hiveConf), clientClassName,
catalogOptions));
+ new SerializableHiveConf(hiveConf), clientClassName,
options));
}
@Override
@@ -1202,7 +1202,7 @@ public class HiveCatalog extends AbstractCatalog {
@Override
public boolean caseSensitive() {
- return catalogOptions.getOptional(CASE_SENSITIVE).orElse(false);
+ return options.getOptional(CASE_SENSITIVE).orElse(false);
}
@Override
@@ -1211,7 +1211,7 @@ public class HiveCatalog extends AbstractCatalog {
}
public boolean syncAllProperties() {
- return catalogOptions.get(SYNC_ALL_PROPERTIES);
+ return options.get(SYNC_ALL_PROPERTIES);
}
@Override
@@ -1328,7 +1328,7 @@ public class HiveCatalog extends AbstractCatalog {
@Override
public CatalogLoader catalogLoader() {
return new HiveCatalogLoader(
- fileIO, new SerializableHiveConf(hiveConf), clientClassName,
options, warehouse);
+ fileIO, new SerializableHiveConf(hiveConf), clientClassName,
context, warehouse);
}
public Table getHmsTable(Identifier identifier)
@@ -1715,7 +1715,7 @@ public class HiveCatalog extends AbstractCatalog {
fileIO,
hiveConf,
options.get(HiveCatalogOptions.METASTORE_CLIENT_CLASS),
- options,
+ context,
warehouse.toUri().toString());
}
diff --git
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLoader.java
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLoader.java
index ff389434d4..b7ed1c3ff8 100644
---
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLoader.java
+++
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLoader.java
@@ -19,36 +19,36 @@
package org.apache.paimon.hive;
import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.options.Options;
/** Loader to create {@link HiveCatalog}. */
public class HiveCatalogLoader implements CatalogLoader {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 2L;
private final FileIO fileIO;
private final SerializableHiveConf hiveConf;
private final String clientClassName;
- private final Options options;
+ private final CatalogContext context;
private final String warehouse;
public HiveCatalogLoader(
FileIO fileIO,
SerializableHiveConf hiveConf,
String clientClassName,
- Options options,
+ CatalogContext context,
String warehouse) {
this.fileIO = fileIO;
this.hiveConf = hiveConf;
this.clientClassName = clientClassName;
- this.options = options;
+ this.context = context;
this.warehouse = warehouse;
}
@Override
public Catalog load() {
- return new HiveCatalog(fileIO, hiveConf.conf(), clientClassName, options, warehouse);
+ return new HiveCatalog(fileIO, hiveConf.conf(), clientClassName, context, warehouse);
}
}
diff --git a/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java b/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java
index d6fa84efb8..3043a57464 100644
--- a/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java
+++ b/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java
@@ -20,6 +20,7 @@ package org.apache.paimon.hive;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogTestBase;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.client.ClientPool;
@@ -79,8 +80,8 @@ public class HiveCatalogTest extends CatalogTestBase {
String metastoreClientClass = "org.apache.hadoop.hive.metastore.HiveMetaStoreClient";
Options catalogOptions = new Options();
catalogOptions.set(CatalogOptions.SYNC_ALL_PROPERTIES.key(), "false");
- catalog =
- new HiveCatalog(fileIO, hiveConf, metastoreClientClass, catalogOptions, warehouse);
+ CatalogContext context = CatalogContext.create(catalogOptions, hiveConf);
+ catalog = new HiveCatalog(fileIO, hiveConf, metastoreClientClass, context, warehouse);
}
@Test
diff --git a/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveTableStatsTest.java b/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveTableStatsTest.java
index 33016fd083..35065eb43b 100644
--- a/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveTableStatsTest.java
+++ b/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveTableStatsTest.java
@@ -64,7 +64,7 @@ public class HiveTableStatsTest {
CatalogContext catalogContext = CatalogContext.create(catalogOptions);
FileIO fileIO = FileIO.get(new Path(warehouse), catalogContext);
catalog =
- new HiveCatalog(fileIO, hiveConf, metastoreClientClass, catalogOptions, warehouse);
+ new HiveCatalog(fileIO, hiveConf, metastoreClientClass, catalogContext, warehouse);
}
@Test
diff --git a/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/pool/TestCachedClientPool.java b/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/pool/TestCachedClientPool.java
index 691c514dfa..6108633398 100644
--- a/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/pool/TestCachedClientPool.java
+++ b/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/pool/TestCachedClientPool.java
@@ -421,7 +421,7 @@ public class TestCachedClientPool {
fileIO,
hiveConf,
metastoreClientClass,
- options,
+ CatalogContext.create(options),
warehouse);
} catch (Exception e) {
throw new RuntimeException(e);