This is an automated email from the ASF dual-hosted git repository. hqtran pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit f5e18fdc139ce09323618f98d187db16b770bfd7 Author: Quan Tran <[email protected]> AuthorDate: Fri Apr 10 16:53:34 2026 +0700 JAMES-4182 Implement blob metadata storage for file --- .../apache/james/blob/file/FileBlobStoreDAO.java | 143 ++++++++++++++++----- .../james/blob/file/FileBlobStoreDAOTest.java | 3 +- 2 files changed, 115 insertions(+), 31 deletions(-) diff --git a/server/blob/blob-file/src/main/java/org/apache/james/blob/file/FileBlobStoreDAO.java b/server/blob/blob-file/src/main/java/org/apache/james/blob/file/FileBlobStoreDAO.java index 4f19207b91..b181499d9d 100644 --- a/server/blob/blob-file/src/main/java/org/apache/james/blob/file/FileBlobStoreDAO.java +++ b/server/blob/blob-file/src/main/java/org/apache/james/blob/file/FileBlobStoreDAO.java @@ -19,19 +19,26 @@ package org.apache.james.blob.file; +import java.io.ByteArrayInputStream; import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; +import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.nio.channels.FileLock; import java.nio.channels.OverlappingFileLockException; +import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.NoSuchFileException; +import java.nio.file.Path; +import java.nio.file.StandardCopyOption; +import java.nio.file.attribute.UserDefinedFileAttributeView; import java.time.Duration; import java.util.Collection; +import java.util.Map; import jakarta.inject.Inject; @@ -43,9 +50,12 @@ import org.apache.james.blob.api.ObjectNotFoundException; import org.apache.james.blob.api.ObjectStoreIOException; import org.apache.james.filesystem.api.FileSystem; import org.reactivestreams.Publisher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.github.fge.lambdas.Throwing; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; import com.google.common.io.ByteSource; import reactor.core.publisher.Flux; @@ -54,6 +64,8 @@ import reactor.core.scheduler.Schedulers; import reactor.util.retry.Retry; public class FileBlobStoreDAO implements BlobStoreDAO { + private static final Logger LOGGER = LoggerFactory.getLogger(FileBlobStoreDAO.class); + private static final String JAMES_BLOB_METADATA_ATTRIBUTE_PREFIX = "james-blob-metadata-"; private final File root; private final BlobId.Factory blobIdFactory; @@ -69,7 +81,7 @@ public class FileBlobStoreDAO implements BlobStoreDAO { File bucketRoot = getBucketRoot(bucketName); File blob = new File(bucketRoot, blobId.asString()); try { - return InputStreamBlob.of(new FileInputStream(blob)); + return InputStreamBlob.of(new FileInputStream(blob), readMetadata(blob.toPath())); } catch (FileNotFoundException e) { throw new ObjectNotFoundException(String.format("Cannot locate %s within %s", blobId.asString(), bucketName.asString()), e); } @@ -98,39 +110,38 @@ public class FileBlobStoreDAO implements BlobStoreDAO { return Mono.fromCallable(() -> { File bucketRoot = getBucketRoot(bucketName); File blob = new File(bucketRoot, blobId.asString()); - return FileUtils.readFileToByteArray(blob); + return BytesBlob.of(FileUtils.readFileToByteArray(blob), readMetadata(blob.toPath())); }).onErrorResume(NoSuchFileException.class, e -> Mono.error(new ObjectNotFoundException(String.format("Cannot locate %s within %s", blobId.asString(), bucketName.asString()), e))) - .subscribeOn(Schedulers.boundedElastic()) - .map(BytesBlob::of); + .subscribeOn(Schedulers.boundedElastic()); } @Override public Publisher<Void> save(BucketName bucketName, BlobId blobId, Blob blob) { return switch (blob) { - case BytesBlob bytesBlob -> save(bucketName, blobId, bytesBlob.payload()); - case InputStreamBlob inputStreamBlob -> save(bucketName, blobId, inputStreamBlob.payload()); - case ByteSourceBlob byteSourceBlob -> save(bucketName, blobId, byteSourceBlob.payload()); + case BytesBlob bytesBlob -> save(bucketName, blobId, bytesBlob.payload(), bytesBlob.metadata()); + case InputStreamBlob inputStreamBlob -> save(bucketName, blobId, inputStreamBlob.payload(), inputStreamBlob.metadata()); + case ByteSourceBlob byteSourceBlob -> save(bucketName, blobId, byteSourceBlob.payload(), byteSourceBlob.metadata()); }; } - public Mono<Void> save(BucketName bucketName, BlobId blobId, byte[] data) { + public Mono<Void> save(BucketName bucketName, BlobId blobId, byte[] data, BlobMetadata metadata) { Preconditions.checkNotNull(data); return Mono.fromRunnable(() -> { File bucketRoot = getBucketRoot(bucketName); File blob = new File(bucketRoot, blobId.asString()); - save(data, blob); + save(data, blob, metadata); }) .subscribeOn(Schedulers.boundedElastic()) .then(); } - public Mono<Void> save(BucketName bucketName, BlobId blobId, InputStream inputStream) { + public Mono<Void> save(BucketName bucketName, BlobId blobId, InputStream inputStream, BlobMetadata metadata) { Preconditions.checkNotNull(inputStream); return Mono.fromRunnable(() -> { File bucketRoot = getBucketRoot(bucketName); File blob = new File(bucketRoot, blobId.asString()); - save(inputStream, blob); + save(inputStream, blob, metadata); }) .subscribeOn(Schedulers.boundedElastic()) .then() @@ -138,35 +149,38 @@ public class FileBlobStoreDAO implements BlobStoreDAO { .filter(e -> e instanceof OverlappingFileLockException)); } - private void save(InputStream inputStream, File blob) { - if (blob.exists()) { - return; - } - - try (FileOutputStream out = new FileOutputStream(blob); - FileChannel channel = out.getChannel(); - FileLock fileLock = channel.lock()) { - inputStream.transferTo(out); + private void save(InputStream inputStream, File blob, BlobMetadata metadata) { + File tempFile = createTempFile(blob); + boolean tempFileHandled = false; + try { + // Overwrites blob (and its metadata) should be supported. Prepare payload and metadata on a temp file first, + // then atomically replace the target blob so concurrent saves do not expose partially written data. + writeToTempFile(tempFile, inputStream, metadata); + replaceBlob(tempFile, blob); + tempFileHandled = true; } catch (IOException e) { throw new ObjectStoreIOException("IOException occured", e); + } finally { + if (!tempFileHandled) { + FileUtils.deleteQuietly(tempFile); + } } } - private void save(byte[] data, File blob) { - if (blob.exists()) { - return; - } + private void save(byte[] data, File blob, BlobMetadata metadata) { + save(new ByteArrayInputStream(data), blob, metadata); + } - try (FileOutputStream out = new FileOutputStream(blob); + private void writeToTempFile(File tempFile, InputStream inputStream, BlobMetadata metadata) throws IOException { + try (FileOutputStream out = new FileOutputStream(tempFile); FileChannel channel = out.getChannel(); FileLock fileLock = channel.lock()) { - out.write(data); - } catch (IOException e) { - throw new ObjectStoreIOException("IOException occured", e); + inputStream.transferTo(out); + writeMetadata(tempFile.toPath(), metadata); } } - public Mono<Void> save(BucketName bucketName, BlobId blobId, ByteSource content) { + public Mono<Void> save(BucketName bucketName, BlobId blobId, ByteSource content, BlobMetadata metadata) { return Mono.fromCallable(() -> { try { return content.read(); @@ -174,7 +188,7 @@ public class FileBlobStoreDAO implements BlobStoreDAO { throw new ObjectStoreIOException("IOException occured", e); } }) - .flatMap(bytes -> save(bucketName, blobId, bytes)); + .flatMap(bytes -> save(bucketName, blobId, bytes, metadata)); } @Override @@ -226,4 +240,73 @@ public class FileBlobStoreDAO implements BlobStoreDAO { .map(path -> blobIdFactory.parse(path.getFileName().toString())) .subscribeOn(Schedulers.boundedElastic()); } + + private BlobMetadata readMetadata(Path path) { + UserDefinedFileAttributeView attributeView = Files.getFileAttributeView(path, UserDefinedFileAttributeView.class); + if (attributeView == null) { + return BlobMetadata.empty(); + } + + try { + return new BlobMetadata(attributeView.list().stream() + .filter(this::isBlobMetadataAttribute) + .collect(ImmutableMap.toImmutableMap( + this::asBlobMetadataName, + Throwing.function((String attributeName) -> new BlobMetadataValue(readFileAttributeValue(attributeView, attributeName))) + .sneakyThrow()))); + } catch (IOException e) { + throw new ObjectStoreIOException("IOException occured", e); + } + } + + private void writeMetadata(Path path, BlobMetadata metadata) throws IOException { + if (metadata.equals(BlobMetadata.empty())) { + return; + } + + UserDefinedFileAttributeView attributeView = Files.getFileAttributeView(path, UserDefinedFileAttributeView.class); + if (attributeView == null) { + LOGGER.warn("Skipping blob metadata persistence for {} because user defined file attributes are not supported by the file system", path); + return; + } + writeMetadata(attributeView, metadata); + } + + private void writeMetadata(UserDefinedFileAttributeView attributeView, BlobMetadata metadata) throws IOException { + for (Map.Entry<BlobMetadataName, BlobMetadataValue> entry : metadata.underlyingMap().entrySet()) { + attributeView.write(asFileAttributeName(entry.getKey()), StandardCharsets.UTF_8.encode(entry.getValue().value())); + } + } + + private boolean isBlobMetadataAttribute(String extendedAttributeName) { + return extendedAttributeName.startsWith(JAMES_BLOB_METADATA_ATTRIBUTE_PREFIX); + } + + private String asFileAttributeName(BlobMetadataName metadataName) { + return JAMES_BLOB_METADATA_ATTRIBUTE_PREFIX + metadataName.name(); + } + + private BlobMetadataName asBlobMetadataName(String attributeName) { + return new BlobMetadataName(attributeName.substring(JAMES_BLOB_METADATA_ATTRIBUTE_PREFIX.length())); + } + + private String readFileAttributeValue(UserDefinedFileAttributeView attributeView, String attributeName) throws IOException { + ByteBuffer byteBuffer = ByteBuffer.allocate(attributeView.size(attributeName)); + attributeView.read(attributeName, byteBuffer); + byteBuffer.flip(); + return StandardCharsets.UTF_8.decode(byteBuffer).toString(); + } + + private File createTempFile(File blob) { + try { + return Files.createTempFile(blob.getParentFile().toPath(), blob.getName(), ".tmp").toFile(); + } catch (IOException e) { + throw new ObjectStoreIOException("IOException occured", e); + } + } + + private void replaceBlob(File tempFile, File target) throws IOException { + Files.move(tempFile.toPath(), target.toPath(), StandardCopyOption.REPLACE_EXISTING, StandardCopyOption.ATOMIC_MOVE); + } + } diff --git a/server/blob/blob-file/src/test/java/org/apache/james/blob/file/FileBlobStoreDAOTest.java b/server/blob/blob-file/src/test/java/org/apache/james/blob/file/FileBlobStoreDAOTest.java index f08b490c9f..9fd98c0747 100644 --- a/server/blob/blob-file/src/test/java/org/apache/james/blob/file/FileBlobStoreDAOTest.java +++ b/server/blob/blob-file/src/test/java/org/apache/james/blob/file/FileBlobStoreDAOTest.java @@ -21,12 +21,13 @@ package org.apache.james.blob.file; import org.apache.james.blob.api.BlobStoreDAO; import org.apache.james.blob.api.BlobStoreDAOContract; +import org.apache.james.blob.api.MetadataAwareBlobStoreDAOContract; import org.apache.james.blob.api.PlainBlobId; import org.apache.james.server.core.filesystem.FileSystemImpl; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Disabled; -class FileBlobStoreDAOTest implements BlobStoreDAOContract { +class FileBlobStoreDAOTest implements BlobStoreDAOContract, MetadataAwareBlobStoreDAOContract { private FileBlobStoreDAO blobStore; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
