seropian commented on code in PR #2409:
URL: https://github.com/apache/jackrabbit-oak/pull/2409#discussion_r2419052725
##########
oak-blob-cloud-azure/src/main/java/org/apache/jackrabbit/oak/blob/cloud/azure/blobstorage/AzureBlobStoreBackend.java:
##########
@@ -546,142 +457,183 @@ public void deleteRecord(DataIdentifier identifier) throws DataStoreException {
     @Override
     public void addMetadataRecord(InputStream input, String name) throws DataStoreException {
- if (null == input) {
- throw new NullPointerException("input");
- }
- if (StringUtils.isEmpty(name)) {
- throw new IllegalArgumentException("name");
- }
- long start = System.currentTimeMillis();
+        Objects.requireNonNull(input, "input must not be null");
+        Validate.checkArgument(StringUtils.isNotEmpty(name), "name should not be empty");
+        Stopwatch stopwatch = Stopwatch.createStarted();
         ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
         try {
             Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
-            addMetadataRecordImpl(input, name, -1L);
-            LOG.debug("Metadata record added. metadataName={} duration={}", name, (System.currentTimeMillis() - start));
+            addMetadataRecordImpl(input, name, -1);
+            LOG.debug("Metadata record added. metadataName={} duration={}", name, stopwatch.elapsed(TimeUnit.MILLISECONDS));
}
finally {
- if (null != contextClassLoader) {
+ if (contextClassLoader != null) {
Thread.currentThread().setContextClassLoader(contextClassLoader);
}
}
}
@Override
-    public void addMetadataRecord(File input, String name) throws DataStoreException {
- if (null == input) {
- throw new NullPointerException("input");
- }
- if (StringUtils.isEmpty(name)) {
- throw new IllegalArgumentException("name");
- }
- long start = System.currentTimeMillis();
+    public void addMetadataRecord(File inputFile, String name) throws DataStoreException {
+        Objects.requireNonNull(inputFile, "inputFile must not be null");
+        Validate.checkArgument(StringUtils.isNotEmpty(name), "name should not be empty");
+
+ Stopwatch stopwatch = Stopwatch.createStarted();
        ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
-
-            addMetadataRecordImpl(new FileInputStream(input), name, input.length());
-            LOG.debug("Metadata record added. metadataName={} duration={}", name, (System.currentTimeMillis() - start));
- }
- catch (FileNotFoundException e) {
+ try (InputStream input = new FileInputStream(inputFile)) {
+ addMetadataRecordImpl(input, name, inputFile.length());
+ }
+ LOG.debug("Metadata record added. metadataName={} duration={}",
name, stopwatch.elapsed(TimeUnit.MILLISECONDS));
+ } catch (IOException e) {
throw new DataStoreException(e);
- }
- finally {
- if (null != contextClassLoader) {
+ } finally {
+ if (contextClassLoader != null) {
Thread.currentThread().setContextClassLoader(contextClassLoader);
}
}
}
+    private BlockBlobClient getMetaBlobClient(String name) throws DataStoreException {
+        return getAzureContainer().getBlobClient(AzureConstants.AZURE_BlOB_META_DIR_NAME + "/" + name).getBlockBlobClient();
+ }
+
    private void addMetadataRecordImpl(final InputStream input, String name, long recordLength) throws DataStoreException {
        try {
-            CloudBlobDirectory metaDir = getAzureContainer().getDirectoryReference(META_DIR_NAME);
- CloudBlockBlob blob = metaDir.getBlockBlobReference(name);
- addLastModified(blob);
- blob.upload(input, recordLength);
- }
- catch (StorageException e) {
+ BlockBlobClient blockBlobClient = getMetaBlobClient(name);
+
+            // If length is unknown (-1), use a file buffer for the stream first.
+            // This is necessary because the Azure SDK requires a known length for upload
+            // and loading the entire stream into memory is too risky.
+            if (recordLength < 0) {
+                LOG.debug("Metadata record length unknown. metadataName={}. Saving to temporary file before upload", name);
+                File tempFile = createTempFileFromStream(input, name, ".tmp");
+                LOG.debug("Metadata record temporary file created. metadataName={} path={}", name, tempFile.getAbsolutePath());
+                try (InputStream fis = new BufferedInputStream(new FileInputStream(tempFile))) {
+ blockBlobClient.upload(fis, tempFile.length(), true);
+ } finally {
+ tempFile.delete();
+ }
+ } else {
+ LOG.debug("Metadata record length known: {} bytes.
metadataName={}. Uploading directly", recordLength, name);
+ InputStream markableInput = input.markSupported() ? input :
new BufferedInputStream(input);
+ blockBlobClient.upload(markableInput, recordLength, true);
+ }
+ updateLastModifiedMetadata(blockBlobClient);
+ } catch (BlobStorageException | IOException e) {
LOG.info("Error adding metadata record. metadataName={}
length={}", name, recordLength, e);
throw new DataStoreException(e);
}
- catch (URISyntaxException | IOException e) {
- throw new DataStoreException(e);
+ }
+
+ /**
+ * Saves an InputStream to a temporary file with automatic cleanup support.
+ *
+     * <p>This method creates a temporary file and copies the entire contents of the input stream
+     * to it. The temporary file is marked for deletion on JVM exit as a safety measure, but
+     * callers should explicitly delete it when done.</p>
+     *
+     * @param input The InputStream to save to a temporary file
+     * @param prefix The prefix string for the temporary file name (min 3 characters)
+     * @param suffix The suffix string for the temporary file name (null = ".tmp")
+ * @return A File object representing the temporary file
+ * @throws IOException if an I/O error occurs
+ */
+    private File createTempFileFromStream(InputStream input, String prefix, String suffix) throws IOException {
+ Objects.requireNonNull(input, "input must not be null");
+
+ Path tempPath = null;
+ try {
+ // Create temporary file
+            tempPath = Files.createTempFile(prefix + "-" + UUID.randomUUID(), suffix);
Review Comment:
fixed
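
For reference, a minimal self-contained sketch of the buffer-to-temp-file pattern this hunk implements for unknown-length streams, assuming the Azure SDK v12 `BlockBlobClient.upload(InputStream, long, boolean)` overload; the class name, helper, and file prefix below are illustrative, not part of the PR:

```java
import com.azure.storage.blob.specialized.BlockBlobClient;

import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public final class UnknownLengthUpload {

    // Copies the stream to a temp file so the exact length is known up front,
    // then uploads with overwrite=true and always deletes the buffer file.
    static void upload(BlockBlobClient blob, InputStream input) throws IOException {
        Path tmp = Files.createTempFile("oak-meta-", ".tmp"); // illustrative prefix
        try {
            Files.copy(input, tmp, StandardCopyOption.REPLACE_EXISTING);
            try (InputStream in = new BufferedInputStream(Files.newInputStream(tmp))) {
                blob.upload(in, Files.size(tmp), true); // length must match the stream exactly
            }
        } finally {
            Files.deleteIfExists(tmp);
        }
    }
}
```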
##########
oak-blob-cloud-azure/src/main/java/org/apache/jackrabbit/oak/blob/cloud/azure/blobstorage/AzureBlobStoreBackend.java:
##########
@@ -307,159 +266,117 @@ public InputStream read(DataIdentifier identifier) throws DataStoreException {
}
}
- @Override
-    public void write(DataIdentifier identifier, File file) throws DataStoreException {
- if (null == identifier) {
- throw new NullPointerException("identifier");
+ private void tryClose(InputStream is) {
+ if (is != null) {
+ try {
+ is.close();
+ } catch (IOException ioe) {
+ LOG.warn("Failed to close the InputStream {}", is, ioe);
+ }
}
- if (null == file) {
- throw new NullPointerException("file");
+ }
+
+    private void uploadBlob(BlockBlobClient client, File file, long len, Stopwatch stopwatch, String key) throws IOException {
+
+ boolean useBufferedStream = len < AZURE_BLOB_BUFFERED_STREAM_THRESHOLD;
+ try (InputStream in = useBufferedStream ?
+ new BufferedInputStream(new FileInputStream(file))
+ : new FileInputStream(file)) {
+
+            ParallelTransferOptions parallelTransferOptions = new ParallelTransferOptions()
+                    .setBlockSizeLong(len)
+                    .setMaxConcurrency(concurrentRequestCount)
+                    .setMaxSingleUploadSizeLong(AZURE_BLOB_MAX_SINGLE_PUT_UPLOAD_SIZE);
+            BlobUploadFromFileOptions options = new BlobUploadFromFileOptions(file.toString());
+ options.setParallelTransferOptions(parallelTransferOptions);
+ try {
+                BlobClient blobClient = client.getContainerClient().getBlobClient(key);
+                Response<BlockBlobItem> blockBlob = blobClient.uploadFromFileWithResponse(options, null, null);
+                LOG.debug("Upload status is {} for blob {}", blockBlob.getStatusCode(), key);
+            } catch (UncheckedIOException ex) {
+                System.err.printf("Failed to upload from file: %s%n", ex.getMessage());
Review Comment:
fixed
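
For reference, a sketch of a parallel file upload with the v12 SDK along the lines of `uploadBlob` above; the block size, concurrency, and class name are illustrative placeholders:

```java
import com.azure.core.http.rest.Response;
import com.azure.storage.blob.BlobClient;
import com.azure.storage.blob.BlobContainerClient;
import com.azure.storage.blob.models.BlockBlobItem;
import com.azure.storage.blob.models.ParallelTransferOptions;
import com.azure.storage.blob.options.BlobUploadFromFileOptions;

import java.io.File;

public final class ParallelFileUpload {

    static int upload(BlobContainerClient container, String key, File file) {
        // Tune chunking and concurrency; the values here are illustrative only.
        ParallelTransferOptions transferOptions = new ParallelTransferOptions()
                .setBlockSizeLong(8L * 1024 * 1024)            // 8 MB blocks
                .setMaxConcurrency(4)                          // parallel block uploads
                .setMaxSingleUploadSizeLong(4L * 1024 * 1024); // single-shot threshold
        BlobUploadFromFileOptions options = new BlobUploadFromFileOptions(file.getAbsolutePath())
                .setParallelTransferOptions(transferOptions);

        BlobClient blob = container.getBlobClient(key);
        // null timeout/context fall back to the client defaults
        Response<BlockBlobItem> response = blob.uploadFromFileWithResponse(options, null, null);
        return response.getStatusCode();
    }
}
```

Letting the SDK read the file itself (rather than a hand-opened stream) would also sidestep the `InputStream` that `uploadBlob` opens but never passes to the upload call.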
##########
oak-blob-cloud-azure/src/main/java/org/apache/jackrabbit/oak/blob/cloud/azure/blobstorage/AzureBlobStoreBackend.java:
##########
@@ -468,75 +385,69 @@ public DataRecord getRecord(DataIdentifier identifier) throws DataStoreException
}
@Override
- public Iterator<DataIdentifier> getAllIdentifiers() {
- return new RecordsIterator<>(
-                input -> new DataIdentifier(getIdentifierName(input.getName())));
+    public Iterator<DataIdentifier> getAllIdentifiers() throws DataStoreException {
+        return getAzureContainer().listBlobs().stream()
+                .map(blobItem -> new DataIdentifier(getIdentifierName(blobItem.getName())))
+ .iterator();
}
-
-
@Override
- public Iterator<DataRecord> getAllRecords() {
+ public Iterator<DataRecord> getAllRecords() throws DataStoreException {
final AbstractSharedBackend backend = this;
- return new RecordsIterator<>(
- input -> new AzureBlobStoreDataRecord(
+ final BlobContainerClient containerClient = getAzureContainer();
+ return containerClient.listBlobs().stream()
+ .map(blobItem -> (DataRecord) new AzureBlobStoreDataRecord(
backend,
azureBlobContainerProvider,
- new DataIdentifier(getIdentifierName(input.getName())),
- input.getLastModified(),
- input.getLength())
- );
+                        new DataIdentifier(getIdentifierName(blobItem.getName())),
+                        getLastModified(containerClient.getBlobClient(blobItem.getName()).getBlockBlobClient()),
+                        blobItem.getProperties().getContentLength()))
+ .iterator();
}
@Override
    public boolean exists(DataIdentifier identifier) throws DataStoreException {
- long start = System.currentTimeMillis();
+ Stopwatch stopwatch = Stopwatch.createStarted();
String key = getKeyName(identifier);
        ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
-            boolean exists = getAzureContainer().getBlockBlobReference(key).exists();
-            LOG.debug("Blob exists={} identifier={} duration={}", exists, key, (System.currentTimeMillis() - start));
+            boolean exists = getAzureContainer().getBlobClient(key).getBlockBlobClient().exists();
+            LOG.debug("Blob exists={} identifier={} duration={}", exists, key, stopwatch.elapsed(TimeUnit.MILLISECONDS));
return exists;
- }
- catch (Exception e) {
+ } catch (Exception e) {
throw new DataStoreException(e);
- }
- finally {
- if (null != contextClassLoader) {
+ } finally {
+ if (contextClassLoader != null) {
Thread.currentThread().setContextClassLoader(contextClassLoader);
}
}
}
@Override
- public void close() throws DataStoreException {
+    public void close() {
Review Comment:
fixed
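
A short sketch of the lazy, stream-based listing that the new `getAllIdentifiers()` / `getAllRecords()` rely on; `blobNames` is an illustrative helper, not part of the PR:

```java
import com.azure.storage.blob.BlobContainerClient;
import com.azure.storage.blob.models.BlobItem;

import java.util.Iterator;

public final class ListBlobsExample {

    static Iterator<String> blobNames(BlobContainerClient container) {
        // listBlobs() returns a PagedIterable, so pages are fetched on demand
        // as the iterator advances; the listing is never materialized up front.
        return container.listBlobs().stream()
                .map(BlobItem::getName)
                .iterator();
    }
}
```

Note that `getAllRecords()` appears to fetch per-blob metadata through `getLastModified(...)`, which may be worth checking for very large containers.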