Author: tomekr Date: Tue Mar 20 10:54:09 2018 New Revision: 1827292 URL: http://svn.apache.org/viewvc?rev=1827292&view=rev Log: OAK-6922: Azure support for the segment-tar
Added: jackrabbit/oak/trunk/oak-segment-azure/ jackrabbit/oak/trunk/oak-segment-azure/pom.xml jackrabbit/oak/trunk/oak-segment-azure/src/ jackrabbit/oak/trunk/oak-segment-azure/src/main/ jackrabbit/oak/trunk/oak-segment-azure/src/main/java/ jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/ jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/ jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/ jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/ jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/ jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/ jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureArchiveManager.java jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureBlobMetadata.java jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureGCJournalFile.java jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureJournalFile.java jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureManifestFile.java jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzurePersistence.java jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureRepositoryLock.java jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveEntry.java jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveReader.java jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveWriter.java 
jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentStoreService.java jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureUtilities.java jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/Configuration.java jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/ReverseFileReader.java jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/queue/ jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/queue/SegmentWriteAction.java jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/queue/SegmentWriteQueue.java jackrabbit/oak/trunk/oak-segment-azure/src/test/ jackrabbit/oak/trunk/oak-segment-azure/src/test/java/ jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/ jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/ jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/ jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/ jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/ jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/ jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureArchiveManagerTest.java jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureGCJournalTest.java jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureJournalFileTest.java jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureManifestFileTest.java jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureTarFileTest.java 
jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureTarFilesTest.java jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureTarWriterTest.java jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzuriteDockerRule.java jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/fixture/ jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/fixture/SegmentAzureFixture.java jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/journal/ jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/journal/AzureJournalReaderTest.java jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/journal/AzureTarRevisionsTest.java jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/journal/ReverseFileReaderTest.java jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/queue/ jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/queue/SegmentWriteQueueTest.java jackrabbit/oak/trunk/oak-segment-azure/start-azurite.sh (with props) Modified: jackrabbit/oak/trunk/pom.xml Added: jackrabbit/oak/trunk/oak-segment-azure/pom.xml URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-azure/pom.xml?rev=1827292&view=auto ============================================================================== --- jackrabbit/oak/trunk/oak-segment-azure/pom.xml (added) +++ jackrabbit/oak/trunk/oak-segment-azure/pom.xml Tue Mar 20 10:54:09 2018 @@ -0,0 +1,160 @@ +<?xml version="1.0" encoding="UTF-8"?> + +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. 
See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.jackrabbit</groupId> + <artifactId>oak-parent</artifactId> + <version>1.10-SNAPSHOT</version> + <relativePath>../oak-parent/pom.xml</relativePath> + </parent> + + <artifactId>oak-segment-azure</artifactId> + <packaging>bundle</packaging> + + <name>Oak Segment Azure</name> + + <build> + <plugins> + <plugin> + <groupId>org.apache.felix</groupId> + <artifactId>maven-bundle-plugin</artifactId> + <configuration> + <instructions> + <Export-Package></Export-Package> + <Embed-Dependency> + azure-storage, + azure-keyvault-core + </Embed-Dependency> + </instructions> + </configuration> + <executions> + <execution> + <id>baseline</id> + <goals> + <goal>baseline</goal> + </goals> + <phase>pre-integration-test</phase> + <configuration> + <!-- + This is required because there's no prior (stable) version of oak-segment-azure + This should be removed post first release + --> + <skip>true</skip> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + + <!-- 
====================================================================== --> + <!-- D E P E N D E N C I E S --> + <!-- ====================================================================== --> + <dependencies> + <!-- Optional OSGi dependencies, used only when running within OSGi --> + <dependency> + <groupId>org.osgi</groupId> + <artifactId>org.osgi.core</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.osgi</groupId> + <artifactId>org.osgi.compendium</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.osgi</groupId> + <artifactId>org.osgi.annotation</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.osgi</groupId> + <artifactId>org.osgi.service.component.annotations</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.osgi</groupId> + <artifactId>org.osgi.service.metatype.annotations</artifactId> + <scope>provided</scope> + </dependency> + + <!-- Dependencies to other Oak components --> + <dependency> + <groupId>org.apache.jackrabbit</groupId> + <artifactId>oak-segment-tar</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.jackrabbit</groupId> + <artifactId>oak-store-spi</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + + <!-- Azure Blob Storage dependency --> + <dependency> + <groupId>com.microsoft.azure</groupId> + <artifactId>azure-storage</artifactId> + <version>5.0.0</version> + </dependency> + <dependency> + <groupId>com.microsoft.azure</groupId> + <artifactId>azure-keyvault-core</artifactId> + <version>0.9.7</version> + </dependency> + + <!-- Test dependencies --> + <dependency> + <groupId>org.apache.jackrabbit</groupId> + <artifactId>oak-segment-tar</artifactId> + <version>${project.version}</version> + <classifier>tests</classifier> + <scope>test</scope> + </dependency> + <dependency> + 
<groupId>org.apache.jackrabbit</groupId> + <artifactId>oak-store-spi</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-core</artifactId> + <version>1.10.19</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>com.arakelian</groupId> + <artifactId>docker-junit-rule</artifactId> + <version>2.1.0</version> + <scope>test</scope> + </dependency> + </dependencies> + +</project> Added: jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureArchiveManager.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureArchiveManager.java?rev=1827292&view=auto ============================================================================== --- jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureArchiveManager.java (added) +++ jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureArchiveManager.java Tue Mar 20 10:54:09 2018 @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.segment.azure; + +import com.microsoft.azure.storage.StorageException; +import com.microsoft.azure.storage.blob.BlobListingDetails; +import com.microsoft.azure.storage.blob.CloudBlob; +import com.microsoft.azure.storage.blob.CloudBlobDirectory; +import com.microsoft.azure.storage.blob.CloudBlockBlob; +import com.microsoft.azure.storage.blob.CopyStatus; +import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveManager; +import org.apache.jackrabbit.oak.segment.spi.monitor.FileStoreMonitor; +import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitor; +import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveReader; +import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveWriter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URISyntaxException; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Collections; +import java.util.EnumSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.UUID; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +import static com.google.common.base.Preconditions.checkArgument; +import static org.apache.jackrabbit.oak.segment.azure.AzureUtilities.getName; + +public class AzureArchiveManager implements SegmentArchiveManager { + + private static final Logger log = 
LoggerFactory.getLogger(AzureSegmentArchiveReader.class); + + private final CloudBlobDirectory cloudBlobDirectory; + + private final IOMonitor ioMonitor; + + private final FileStoreMonitor monitor; + + public AzureArchiveManager(CloudBlobDirectory cloudBlobDirectory, IOMonitor ioMonitor, FileStoreMonitor fileStoreMonitor) { + this.cloudBlobDirectory = cloudBlobDirectory; + this.ioMonitor = ioMonitor; + this.monitor = fileStoreMonitor; + } + + @Override + public List<String> listArchives() throws IOException { + try { + return StreamSupport.stream(cloudBlobDirectory + .listBlobs(null, false, EnumSet.noneOf(BlobListingDetails.class), null, null) + .spliterator(), false) + .filter(i -> i instanceof CloudBlobDirectory) + .map(i -> (CloudBlobDirectory) i) + .map(CloudBlobDirectory::getPrefix) + .map(Paths::get) + .map(Path::getFileName) + .map(Path::toString) + .collect(Collectors.toList()); + } catch (URISyntaxException | StorageException e) { + throw new IOException(e); + } + } + + @Override + public SegmentArchiveReader open(String archiveName) throws IOException { + try { + CloudBlobDirectory archiveDirectory = getDirectory(archiveName); + if (!archiveDirectory.getBlockBlobReference("closed").exists()) { + throw new IOException("The archive " + archiveName + " hasn't been closed correctly."); + } + return new AzureSegmentArchiveReader(archiveDirectory, ioMonitor, monitor); + } catch (StorageException | URISyntaxException e) { + throw new IOException(e); + } + } + + @Override + public SegmentArchiveWriter create(String archiveName) throws IOException { + return new AzureSegmentArchiveWriter(getDirectory(archiveName), ioMonitor, monitor); + } + + @Override + public boolean delete(String archiveName) { + try { + getBlobs(archiveName) + .forEach(cloudBlob -> { + try { + cloudBlob.delete(); + } catch (StorageException e) { + log.error("Can't delete segment {}", cloudBlob.getUri().getPath(), e); + } + }); + return true; + } catch (IOException e) { + log.error("Can't 
delete archive {}", archiveName, e); + return false; + } + } + + @Override + public boolean renameTo(String from, String to) { + try { + CloudBlobDirectory targetDirectory = getDirectory(to); + getBlobs(from) + .forEach(cloudBlob -> { + try { + renameBlob(cloudBlob, targetDirectory); + } catch (IOException e) { + log.error("Can't rename segment {}", cloudBlob.getUri().getPath(), e); + } + }); + return true; + } catch (IOException e) { + log.error("Can't rename archive {} to {}", from, to, e); + return false; + } + } + + @Override + public void copyFile(String from, String to) throws IOException { + CloudBlobDirectory targetDirectory = getDirectory(to); + getBlobs(from) + .forEach(cloudBlob -> { + try { + copyBlob(cloudBlob, targetDirectory); + } catch (IOException e) { + log.error("Can't copy segment {}", cloudBlob.getUri().getPath(), e); + } + }); + } + + @Override + public boolean exists(String archiveName) { + try { + return listArchives().contains(archiveName); + } catch (IOException e) { + log.error("Can't check the existence of {}", archiveName, e); + return false; + } + } + + @Override + public void recoverEntries(String archiveName, LinkedHashMap<UUID, byte[]> entries) throws IOException { + Pattern pattern = Pattern.compile(AzureUtilities.SEGMENT_FILE_NAME_PATTERN); + List<RecoveredEntry> entryList = new ArrayList<>(); + + for (CloudBlob b : getBlobList(archiveName)) { + String name = getName(b); + Matcher m = pattern.matcher(name); + if (!m.matches()) { + continue; + } + int position = Integer.parseInt(m.group(1), 16); + UUID uuid = UUID.fromString(m.group(2)); + long length = b.getProperties().getLength(); + if (length > 0) { + byte[] data = new byte[(int) length]; + try { + b.downloadToByteArray(data, 0); + } catch (StorageException e) { + throw new IOException(e); + } + entryList.add(new RecoveredEntry(position, uuid, data, name)); + } + } + Collections.sort(entryList); + + int i = 0; + for (RecoveredEntry e : entryList) { + if (e.position != i) { + 
log.warn("Missing entry {}.??? when recovering {}. No more segments will be read.", String.format("%04X", i), archiveName); + break; + } + log.info("Recovering segment {}/{}", archiveName, e.fileName); + entries.put(e.uuid, e.data); + i++; + } + } + + + private CloudBlobDirectory getDirectory(String archiveName) throws IOException { + try { + return cloudBlobDirectory.getDirectoryReference(archiveName); + } catch (URISyntaxException e) { + throw new IOException(e); + } + } + + private Stream<CloudBlob> getBlobs(String archiveName) throws IOException { + return AzureUtilities.getBlobs(getDirectory(archiveName)); + } + + private List<CloudBlob> getBlobList(String archiveName) throws IOException { + return getBlobs(archiveName).collect(Collectors.toList()); + } + + private void renameBlob(CloudBlob blob, CloudBlobDirectory newParent) throws IOException { + copyBlob(blob, newParent); + try { + blob.delete(); + } catch (StorageException e) { + throw new IOException(e); + } + } + + private void copyBlob(CloudBlob blob, CloudBlobDirectory newParent) throws IOException { + checkArgument(blob instanceof CloudBlockBlob, "Only page blobs are supported for the rename"); + try { + String blobName = getName(blob); + CloudBlockBlob newBlob = newParent.getBlockBlobReference(blobName); + newBlob.startCopy(blob.getUri()); + while (newBlob.getCopyState().getStatus() == CopyStatus.PENDING) { + Thread.sleep(100); + } + + CopyStatus finalStatus = newBlob.getCopyState().getStatus(); + if (newBlob.getCopyState().getStatus() != CopyStatus.SUCCESS) { + throw new IOException("Invalid copy status for " + blob.getUri().getPath() + ": " + finalStatus); + } + } catch (StorageException | InterruptedException | URISyntaxException e) { + throw new IOException(e); + } + } + + private static class RecoveredEntry implements Comparable<RecoveredEntry> { + + private final byte[] data; + + private final UUID uuid; + + private final int position; + + private final String fileName; + + public 
RecoveredEntry(int position, UUID uuid, byte[] data, String fileName) { + this.data = data; + this.uuid = uuid; + this.position = position; + this.fileName = fileName; + } + + @Override + public int compareTo(RecoveredEntry o) { + return Integer.compare(this.position, o.position); + } + } + +} Added: jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureBlobMetadata.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureBlobMetadata.java?rev=1827292&view=auto ============================================================================== --- jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureBlobMetadata.java (added) +++ jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureBlobMetadata.java Tue Mar 20 10:54:09 2018 @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.jackrabbit.oak.segment.azure; + +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + +public final class AzureBlobMetadata { + + private static final String METADATA_TYPE = "type"; + + private static final String METADATA_SEGMENT_UUID = "segment-uuid"; + + private static final String METADATA_SEGMENT_POSITION = "segment-position"; + + private static final String METADATA_SEGMENT_GENERATION = "segment-generation"; + + private static final String METADATA_SEGMENT_FULL_GENERATION = "segment-fullGeneration"; + + private static final String METADATA_SEGMENT_COMPACTED = "segment-compacted"; + + private static final String TYPE_SEGMENT = "segment"; + + public static HashMap<String, String> toSegmentMetadata(AzureSegmentArchiveEntry indexEntry) { + HashMap<String, String> map = new HashMap<>(); + map.put(METADATA_TYPE, TYPE_SEGMENT); + map.put(METADATA_SEGMENT_UUID, new UUID(indexEntry.getMsb(), indexEntry.getLsb()).toString()); + map.put(METADATA_SEGMENT_POSITION, String.valueOf(indexEntry.getPosition())); + map.put(METADATA_SEGMENT_GENERATION, String.valueOf(indexEntry.getGeneration())); + map.put(METADATA_SEGMENT_FULL_GENERATION, String.valueOf(indexEntry.getFullGeneration())); + map.put(METADATA_SEGMENT_COMPACTED, String.valueOf(indexEntry.isCompacted())); + return map; + } + + public static AzureSegmentArchiveEntry toIndexEntry(Map<String, String> metadata, int length) { + UUID uuid = UUID.fromString(metadata.get(METADATA_SEGMENT_UUID)); + long msb = uuid.getMostSignificantBits(); + long lsb = uuid.getLeastSignificantBits(); + int position = Integer.parseInt(metadata.get(METADATA_SEGMENT_POSITION)); + int generation = Integer.parseInt(metadata.get(METADATA_SEGMENT_GENERATION)); + int fullGeneration = Integer.parseInt(metadata.get(METADATA_SEGMENT_FULL_GENERATION)); + boolean compacted = Boolean.parseBoolean(metadata.get(METADATA_SEGMENT_COMPACTED)); + return new AzureSegmentArchiveEntry(msb, lsb, position, length, 
generation, fullGeneration, compacted); + } + + public static boolean isSegment(Map<String, String> metadata) { + return metadata != null && TYPE_SEGMENT.equals(metadata.get(METADATA_TYPE)); + } + +} Added: jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureGCJournalFile.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureGCJournalFile.java?rev=1827292&view=auto ============================================================================== --- jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureGCJournalFile.java (added) +++ jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureGCJournalFile.java Tue Mar 20 10:54:09 2018 @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.jackrabbit.oak.segment.azure; + +import com.google.common.base.Charsets; +import com.microsoft.azure.storage.StorageException; +import com.microsoft.azure.storage.blob.CloudAppendBlob; +import org.apache.commons.io.IOUtils; +import org.apache.jackrabbit.oak.segment.spi.persistence.GCJournalFile; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.nio.charset.Charset; +import java.util.Collections; +import java.util.List; + +public class AzureGCJournalFile implements GCJournalFile { + + private final CloudAppendBlob gcJournal; + + public AzureGCJournalFile(CloudAppendBlob gcJournal) { + this.gcJournal = gcJournal; + } + + @Override + public void writeLine(String line) throws IOException { + try { + if (!gcJournal.exists()) { + gcJournal.createOrReplace(); + } + gcJournal.appendText(line + "\n", Charsets.UTF_8.name(), null, null, null); + } catch (StorageException e) { + throw new IOException(e); + } + } + + @Override + public List<String> readLines() throws IOException { + try { + if (!gcJournal.exists()) { + return Collections.emptyList(); + } + byte[] data = new byte[(int) gcJournal.getProperties().getLength()]; + gcJournal.downloadToByteArray(data, 0); + return IOUtils.readLines(new ByteArrayInputStream(data), Charset.defaultCharset()); + } catch (StorageException e) { + throw new IOException(e); + } + } +} Added: jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureJournalFile.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureJournalFile.java?rev=1827292&view=auto ============================================================================== --- jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureJournalFile.java (added) +++ 
jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureJournalFile.java Tue Mar 20 10:54:09 2018 @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.segment.azure; + +import com.microsoft.azure.storage.StorageException; +import com.microsoft.azure.storage.blob.CloudAppendBlob; +import com.microsoft.azure.storage.blob.CloudBlob; +import com.microsoft.azure.storage.blob.CloudBlobDirectory; +import com.microsoft.azure.storage.blob.ListBlobItem; +import org.apache.jackrabbit.oak.segment.spi.persistence.JournalFile; +import org.apache.jackrabbit.oak.segment.spi.persistence.JournalFileReader; +import org.apache.jackrabbit.oak.segment.spi.persistence.JournalFileWriter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class AzureJournalFile implements JournalFile { + + private static final Logger log = LoggerFactory.getLogger(AzureJournalFile.class); + + private static 
final int JOURNAL_LINE_LIMIT = Integer.getInteger("org.apache.jackrabbit.oak.segment.azure.journal.lines", 40_000); + + private final CloudBlobDirectory directory; + + private final String journalNamePrefix; + + private final int lineLimit; + + AzureJournalFile(CloudBlobDirectory directory, String journalNamePrefix, int lineLimit) { + this.directory = directory; + this.journalNamePrefix = journalNamePrefix; + this.lineLimit = lineLimit; + } + + public AzureJournalFile(CloudBlobDirectory directory, String journalNamePrefix) { + this(directory, journalNamePrefix, JOURNAL_LINE_LIMIT); + } + + @Override + public JournalFileReader openJournalReader() throws IOException { + return new CombinedReader(getJournalBlobs()); + } + + @Override + public JournalFileWriter openJournalWriter() throws IOException { + return new AzureJournalWriter(); + } + + @Override + public String getName() { + return journalNamePrefix; + } + + @Override + public boolean exists() { + try { + return !getJournalBlobs().isEmpty(); + } catch (IOException e) { + log.error("Can't check if the file exists", e); + return false; + } + } + + private String getJournalFileName(int index) { + return String.format("%s.%03d", journalNamePrefix, index); + } + + private List<CloudAppendBlob> getJournalBlobs() throws IOException { + try { + List<CloudAppendBlob> result = new ArrayList<>(); + for (ListBlobItem b : directory.listBlobs(journalNamePrefix)) { + if (b instanceof CloudAppendBlob) { + result.add((CloudAppendBlob) b); + } else { + log.warn("Invalid blob type: {} {}", b.getUri(), b.getClass()); + } + } + result.sort(Comparator.<CloudAppendBlob, String>comparing(AzureUtilities::getName).reversed()); + return result; + } catch (URISyntaxException | StorageException e) { + throw new IOException(e); + } + } + + private static class AzureJournalReader implements JournalFileReader { + + private final CloudBlob blob; + + private ReverseFileReader reader; + + private AzureJournalReader(CloudBlob blob) { + this.blob 
= blob; + } + + @Override + public String readLine() throws IOException { + if (reader == null) { + try { + reader = new ReverseFileReader(blob); + } catch (StorageException e) { + throw new IOException(e); + } + } + return reader.readLine(); + } + + @Override + public void close() throws IOException { + } + } + + private class AzureJournalWriter implements JournalFileWriter { + + private CloudAppendBlob currentBlob; + + private int blockCount; + + public AzureJournalWriter() throws IOException { + List<CloudAppendBlob> blobs = getJournalBlobs(); + if (blobs.isEmpty()) { + try { + currentBlob = directory.getAppendBlobReference(getJournalFileName(1)); + currentBlob.createOrReplace(); + } catch (URISyntaxException | StorageException e) { + throw new IOException(e); + } + } else { + currentBlob = blobs.get(0); + } + Integer bc = currentBlob.getProperties().getAppendBlobCommittedBlockCount(); + blockCount = bc == null ? 0 : bc; + } + + @Override + public void truncate() throws IOException { + try { + for (CloudAppendBlob cloudAppendBlob : getJournalBlobs()) { + cloudAppendBlob.delete(); + } + } catch (StorageException e) { + throw new IOException(e); + } + } + + @Override + public void writeLine(String line) throws IOException { + if (blockCount >= lineLimit) { + createNewFile(); + } + try { + currentBlob.appendText(line + "\n"); + blockCount++; + } catch (StorageException e) { + throw new IOException(e); + } + } + + private void createNewFile() throws IOException { + String name = AzureUtilities.getName(currentBlob); + Pattern pattern = Pattern.compile(Pattern.quote(journalNamePrefix) + "\\.(\\d+)" ); + Matcher matcher = pattern.matcher(name); + int parsedSuffix; + if (matcher.find()) { + String suffix = matcher.group(1); + try { + parsedSuffix = Integer.parseInt(suffix); + } catch (NumberFormatException e) { + log.warn("Can't parse suffix for journal file {}", name); + parsedSuffix = 0; + } + } else { + log.warn("Can't parse journal file name {}", name); + 
parsedSuffix = 0; + } + try { + currentBlob = directory.getAppendBlobReference(getJournalFileName(parsedSuffix + 1)); + currentBlob.createOrReplace(); + blockCount = 0; + } catch (URISyntaxException | StorageException e) { + throw new IOException(e); + } + } + + @Override + public void close() throws IOException { + // do nothing + } + } + + private static class CombinedReader implements JournalFileReader { + + private final Iterator<AzureJournalReader> readers; + + private JournalFileReader currentReader; + + private CombinedReader(List<CloudAppendBlob> blobs) { + readers = blobs.stream().map(AzureJournalReader::new).iterator(); + } + + @Override + public String readLine() throws IOException { + String line; + do { + if (currentReader == null) { + if (!readers.hasNext()) { + return null; + } + currentReader = readers.next(); + } + do { + line = currentReader.readLine(); + } while ("".equals(line)); + if (line == null) { + currentReader.close(); + currentReader = null; + } + } while (line == null); + return line; + } + + @Override + public void close() throws IOException { + while (readers.hasNext()) { + readers.next().close(); + } + if (currentReader != null) { + currentReader.close(); + currentReader = null; + } + } + } +} \ No newline at end of file Added: jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureManifestFile.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureManifestFile.java?rev=1827292&view=auto ============================================================================== --- jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureManifestFile.java (added) +++ jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureManifestFile.java Tue Mar 20 10:54:09 2018 @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under 
one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.segment.azure; + +import com.microsoft.azure.storage.StorageException; +import com.microsoft.azure.storage.blob.CloudBlockBlob; +import org.apache.jackrabbit.oak.segment.spi.persistence.ManifestFile; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.Properties; + +public class AzureManifestFile implements ManifestFile { + + private static final Logger log = LoggerFactory.getLogger(AzureManifestFile.class); + + private final CloudBlockBlob manifestBlob; + + public AzureManifestFile(CloudBlockBlob manifestBlob) { + this.manifestBlob = manifestBlob; + } + + @Override + public boolean exists() { + try { + return manifestBlob.exists(); + } catch (StorageException e) { + log.error("Can't check if the manifest exists", e); + return false; + } + } + + @Override + public Properties load() throws IOException { + Properties properties = new Properties(); + if (exists()) { + long length = manifestBlob.getProperties().getLength(); + byte[] data = new byte[(int) length]; + try { + manifestBlob.downloadToByteArray(data, 0); + } catch (StorageException e) { + throw new 
IOException(e); + } + properties.load(new ByteArrayInputStream(data)); + } + return properties; + } + + @Override + public void save(Properties properties) throws IOException { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + properties.store(bos, null); + + byte[] data = bos.toByteArray(); + try { + manifestBlob.uploadFromByteArray(data, 0, data.length); + } catch (StorageException e) { + throw new IOException(e); + } + } +} Added: jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzurePersistence.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzurePersistence.java?rev=1827292&view=auto ============================================================================== --- jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzurePersistence.java (added) +++ jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzurePersistence.java Tue Mar 20 10:54:09 2018 @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.jackrabbit.oak.segment.azure; + +import com.microsoft.azure.storage.StorageException; +import com.microsoft.azure.storage.blob.BlobListingDetails; +import com.microsoft.azure.storage.blob.CloudAppendBlob; +import com.microsoft.azure.storage.blob.CloudBlobDirectory; +import com.microsoft.azure.storage.blob.CloudBlockBlob; +import com.microsoft.azure.storage.blob.ListBlobItem; +import org.apache.jackrabbit.oak.segment.spi.persistence.GCJournalFile; +import org.apache.jackrabbit.oak.segment.spi.persistence.JournalFile; +import org.apache.jackrabbit.oak.segment.spi.persistence.ManifestFile; +import org.apache.jackrabbit.oak.segment.spi.persistence.RepositoryLock; +import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveManager; +import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentNodeStorePersistence; +import org.apache.jackrabbit.oak.segment.spi.monitor.FileStoreMonitor; +import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URISyntaxException; +import java.nio.file.Paths; +import java.util.EnumSet; + +public class AzurePersistence implements SegmentNodeStorePersistence { + + private static final Logger log = LoggerFactory.getLogger(AzurePersistence.class); + + private final CloudBlobDirectory segmentstoreDirectory; + + public AzurePersistence(CloudBlobDirectory segmentstoreDirectory) { + this.segmentstoreDirectory = segmentstoreDirectory; + } + + @Override + public SegmentArchiveManager createArchiveManager(boolean mmap, IOMonitor ioMonitor, FileStoreMonitor fileStoreMonitor) { + return new AzureArchiveManager(segmentstoreDirectory, ioMonitor, fileStoreMonitor); + } + + @Override + public boolean segmentFilesExist() { + try { + for (ListBlobItem i : segmentstoreDirectory.listBlobs(null, false, EnumSet.noneOf(BlobListingDetails.class), null, null)) { + if (i instanceof CloudBlobDirectory) { + 
CloudBlobDirectory dir = (CloudBlobDirectory) i; + String name = Paths.get(dir.getPrefix()).getFileName().toString(); + if (name.endsWith(".tar")) { + return true; + } + } + } + return false; + } catch (StorageException | URISyntaxException e) { + log.error("Can't check if the segment archives exists", e); + return false; + } + } + + @Override + public JournalFile getJournalFile() { + return new AzureJournalFile(segmentstoreDirectory, "journal.log"); + } + + @Override + public GCJournalFile getGCJournalFile() throws IOException { + return new AzureGCJournalFile(getAppendBlob("gc.log")); + } + + @Override + public ManifestFile getManifestFile() throws IOException { + return new AzureManifestFile(getBlockBlob("manifest")); + } + + @Override + public RepositoryLock lockRepository() throws IOException { + return new AzureRepositoryLock(getBlockBlob("repo.lock"), new Runnable() { + @Override + public void run() { + log.warn("Lost connection to the Azure. The client will be closed."); + // TODO close the connection + } + }).lock(); + } + + private CloudBlockBlob getBlockBlob(String path) throws IOException { + try { + return segmentstoreDirectory.getBlockBlobReference(path); + } catch (URISyntaxException | StorageException e) { + throw new IOException(e); + } + } + + private CloudAppendBlob getAppendBlob(String path) throws IOException { + try { + return segmentstoreDirectory.getAppendBlobReference(path); + } catch (URISyntaxException | StorageException e) { + throw new IOException(e); + } + } + +} Added: jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureRepositoryLock.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureRepositoryLock.java?rev=1827292&view=auto ============================================================================== --- 
jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureRepositoryLock.java (added) +++ jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureRepositoryLock.java Tue Mar 20 10:54:09 2018 @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.jackrabbit.oak.segment.azure; + +import com.microsoft.azure.storage.AccessCondition; +import com.microsoft.azure.storage.StorageException; +import com.microsoft.azure.storage.blob.CloudBlockBlob; +import org.apache.jackrabbit.oak.segment.spi.persistence.RepositoryLock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +public class AzureRepositoryLock implements RepositoryLock { + + private static final Logger log = LoggerFactory.getLogger(AzureRepositoryLock.class); + + private static int INTERVAL = 60; + + private final Runnable shutdownHook; + + private final CloudBlockBlob blob; + + private final ExecutorService executor; + + private String leaseId; + + private volatile boolean doUpdate; + + public AzureRepositoryLock(CloudBlockBlob blob, Runnable shutdownHook) { + this.shutdownHook = shutdownHook; + this.blob = blob; + this.executor = Executors.newSingleThreadExecutor(); + } + + public AzureRepositoryLock lock() throws IOException { + try { + blob.openOutputStream().close(); + leaseId = blob.acquireLease(INTERVAL, null); + log.info("Acquired lease {}", leaseId); + } catch (StorageException e) { + throw new IOException(e); + } + executor.submit(this::refreshLease); + return this; + } + + private void refreshLease() { + doUpdate = true; + long lastUpdate = 0; + while (doUpdate) { + try { + long timeSinceLastUpdate = (System.currentTimeMillis() - lastUpdate) / 1000; + if (timeSinceLastUpdate > INTERVAL / 2) { + blob.renewLease(AccessCondition.generateLeaseCondition(leaseId)); + lastUpdate = System.currentTimeMillis(); + } + } catch (StorageException e) { + log.error("Can't renew the lease", e); + shutdownHook.run(); + doUpdate = false; + return; + } + try { + Thread.sleep(100); + } catch (InterruptedException e) { + log.error("Interrupted the lease renewal loop", e); + } + } 
+ } + + @Override + public void unlock() throws IOException { + doUpdate = false; + executor.shutdown(); + try { + executor.awaitTermination(1, TimeUnit.MINUTES); + } catch (InterruptedException e) { + throw new IOException(e); + } finally { + releaseLease(); + } + } + + private void releaseLease() throws IOException { + try { + blob.releaseLease(AccessCondition.generateLeaseCondition(leaseId)); + blob.delete(); + log.info("Released lease {}", leaseId); + } catch (StorageException e) { + throw new IOException(e); + } + } +} Added: jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveEntry.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveEntry.java?rev=1827292&view=auto ============================================================================== --- jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveEntry.java (added) +++ jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveEntry.java Tue Mar 20 10:54:09 2018 @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.segment.azure; + +import org.apache.jackrabbit.oak.segment.file.tar.index.IndexEntry; + +public class AzureSegmentArchiveEntry implements IndexEntry { + + private final long msb; + + private final long lsb; + + private final int position; + + private final int length; + + private final int generation; + + private final int fullGeneration; + + private final boolean compacted; + + public AzureSegmentArchiveEntry(long msb, long lsb, int position, int length, int generation, int fullGeneration, boolean compacted) { + this.msb = msb; + this.lsb = lsb; + this.position = position; + this.length = length; + this.generation = generation; + this.fullGeneration = fullGeneration; + this.compacted = compacted; + } + + @Override + public long getMsb() { + return msb; + } + + @Override + public long getLsb() { + return lsb; + } + + @Override + public int getPosition() { + return position; + } + + @Override + public int getLength() { + return length; + } + + @Override + public int getGeneration() { + return generation; + } + + @Override + public int getFullGeneration() { + return fullGeneration; + } + + @Override + public boolean isCompacted() { + return compacted; + } +} Added: jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveReader.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveReader.java?rev=1827292&view=auto ============================================================================== --- jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveReader.java (added) +++ jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveReader.java Tue Mar 20 10:54:09 
2018 @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.segment.azure; + +import com.google.common.base.Stopwatch; +import com.microsoft.azure.storage.StorageException; +import com.microsoft.azure.storage.blob.CloudBlob; +import com.microsoft.azure.storage.blob.CloudBlobDirectory; +import com.microsoft.azure.storage.blob.CloudBlockBlob; +import org.apache.jackrabbit.oak.segment.spi.monitor.FileStoreMonitor; +import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitor; +import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveEntry; +import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveReader; + +import java.io.File; +import java.io.IOException; +import java.net.URISyntaxException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static org.apache.jackrabbit.oak.segment.azure.AzureUtilities.getSegmentFileName; +import static org.apache.jackrabbit.oak.segment.azure.AzureUtilities.readBufferFully; + +public class AzureSegmentArchiveReader 
implements SegmentArchiveReader { + + private final CloudBlobDirectory archiveDirectory; + + private final IOMonitor ioMonitor; + + private final FileStoreMonitor monitor; + + private final long length; + + private final Map<UUID, AzureSegmentArchiveEntry> index = new LinkedHashMap<>(); + + private Boolean hasGraph; + + AzureSegmentArchiveReader(CloudBlobDirectory archiveDirectory, IOMonitor ioMonitor, FileStoreMonitor monitor) throws IOException { + this.archiveDirectory = archiveDirectory; + this.ioMonitor = ioMonitor; + this.monitor = monitor; + long length = 0; + try { + for (CloudBlob blob : AzureUtilities.getBlobs(archiveDirectory).collect(Collectors.toList())) { + blob.downloadAttributes(); + Map<String, String> metadata = blob.getMetadata(); + if (AzureBlobMetadata.isSegment(metadata)) { + AzureSegmentArchiveEntry indexEntry = AzureBlobMetadata.toIndexEntry(metadata, (int) blob.getProperties().getLength()); + index.put(new UUID(indexEntry.getMsb(), indexEntry.getLsb()), indexEntry); + } + length += blob.getProperties().getLength(); + } + } catch (StorageException e) { + throw new IOException(e); + } + this.length = length; + } + + @Override + public ByteBuffer readSegment(long msb, long lsb) throws IOException { + AzureSegmentArchiveEntry indexEntry = index.get(new UUID(msb, lsb)); + ByteBuffer buffer = ByteBuffer.allocate(indexEntry.getLength()); + ioMonitor.beforeSegmentRead(pathAsFile(), msb, lsb, indexEntry.getLength()); + Stopwatch stopwatch = Stopwatch.createStarted(); + readBufferFully(getBlob(getSegmentFileName(indexEntry)), buffer); + long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS); + ioMonitor.afterSegmentRead(pathAsFile(), msb, lsb, indexEntry.getLength(), elapsed); + return buffer; + } + + @Override + public boolean containsSegment(long msb, long lsb) { + return index.containsKey(new UUID(msb, lsb)); + } + + @Override + public List<SegmentArchiveEntry> listSegments() { + return new ArrayList<>(index.values()); + } + + @Override + public 
ByteBuffer getGraph() throws IOException { + ByteBuffer graph = readBlob(getName() + ".gph"); + hasGraph = graph != null; + return graph; + } + + @Override + public boolean hasGraph() { + if (hasGraph == null) { + try { + getGraph(); + } catch (IOException ignore) { } + } + return hasGraph; + } + + @Override + public ByteBuffer getBinaryReferences() throws IOException { + return readBlob(getName() + ".brf"); + } + + @Override + public long length() { + return length; + } + + @Override + public String getName() { + return AzureUtilities.getName(archiveDirectory); + } + + @Override + public void close() throws IOException { + // do nothing + } + + @Override + public int getEntrySize(int size) { + return size; + } + + private File pathAsFile() { + return new File(archiveDirectory.getUri().getPath()); + } + + private CloudBlockBlob getBlob(String name) throws IOException { + try { + return archiveDirectory.getBlockBlobReference(name); + } catch (URISyntaxException | StorageException e) { + throw new IOException(e); + } + } + + private ByteBuffer readBlob(String name) throws IOException { + try { + CloudBlockBlob blob = getBlob(name); + if (!blob.exists()) { + return null; + } + long length = blob.getProperties().getLength(); + ByteBuffer buffer = ByteBuffer.allocate((int) length); + AzureUtilities.readBufferFully(blob, buffer); + return buffer; + } catch (StorageException e) { + throw new IOException(e); + } + } + +} Added: jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveWriter.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveWriter.java?rev=1827292&view=auto ============================================================================== --- jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveWriter.java (added) +++ 
jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveWriter.java Tue Mar 20 10:54:09 2018 @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.segment.azure; + +import com.google.common.base.Stopwatch; +import com.microsoft.azure.storage.StorageException; +import com.microsoft.azure.storage.blob.CloudBlobDirectory; +import com.microsoft.azure.storage.blob.CloudBlockBlob; +import org.apache.jackrabbit.oak.segment.spi.monitor.FileStoreMonitor; +import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitor; +import org.apache.jackrabbit.oak.segment.azure.queue.SegmentWriteAction; +import org.apache.jackrabbit.oak.segment.azure.queue.SegmentWriteQueue; +import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveEntry; +import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveWriter; + +import java.io.File; +import java.io.IOException; +import java.net.URISyntaxException; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import 
static org.apache.jackrabbit.oak.segment.azure.AzureUtilities.getSegmentFileName; +import static org.apache.jackrabbit.oak.segment.azure.AzureUtilities.readBufferFully; + +public class AzureSegmentArchiveWriter implements SegmentArchiveWriter { + + private final CloudBlobDirectory archiveDirectory; + + private final IOMonitor ioMonitor; + + private final FileStoreMonitor monitor; + + private final Optional<SegmentWriteQueue> queue; + + private Map<UUID, AzureSegmentArchiveEntry> index = Collections.synchronizedMap(new LinkedHashMap<>()); + + private int entries; + + private long totalLength; + + private volatile boolean created = false; + + public AzureSegmentArchiveWriter(CloudBlobDirectory archiveDirectory, IOMonitor ioMonitor, FileStoreMonitor monitor) { + this.archiveDirectory = archiveDirectory; + this.ioMonitor = ioMonitor; + this.monitor = monitor; + this.queue = SegmentWriteQueue.THREADS > 0 ? Optional.of(new SegmentWriteQueue(this::doWriteEntry)) : Optional.empty(); + } + + @Override + public void writeSegment(long msb, long lsb, byte[] data, int offset, int size, int generation, int fullGeneration, boolean compacted) throws IOException { + created = true; + + AzureSegmentArchiveEntry entry = new AzureSegmentArchiveEntry(msb, lsb, entries++, size, generation, fullGeneration, compacted); + if (queue.isPresent()) { + queue.get().addToQueue(entry, data, offset, size); + } else { + doWriteEntry(entry, data, offset, size); + } + index.put(new UUID(msb, lsb), entry); + + totalLength += size; + monitor.written(size); + } + + private void doWriteEntry(AzureSegmentArchiveEntry indexEntry, byte[] data, int offset, int size) throws IOException { + long msb = indexEntry.getMsb(); + long lsb = indexEntry.getLsb(); + ioMonitor.beforeSegmentWrite(pathAsFile(), msb, lsb, size); + Stopwatch stopwatch = Stopwatch.createStarted(); + try { + CloudBlockBlob blob = getBlob(getSegmentFileName(indexEntry)); + blob.setMetadata(AzureBlobMetadata.toSegmentMetadata(indexEntry)); + 
blob.uploadFromByteArray(data, offset, size); + blob.uploadMetadata(); + } catch (StorageException e) { + throw new IOException(e); + } + ioMonitor.afterSegmentWrite(pathAsFile(), msb, lsb, size, stopwatch.elapsed(TimeUnit.NANOSECONDS)); + } + + @Override + public ByteBuffer readSegment(long msb, long lsb) throws IOException { + UUID uuid = new UUID(msb, lsb); + Optional<SegmentWriteAction> segment = queue.map(q -> q.read(uuid)); + if (segment.isPresent()) { + return segment.get().toByteBuffer(); + } + AzureSegmentArchiveEntry indexEntry = index.get(new UUID(msb, lsb)); + if (indexEntry == null) { + return null; + } + ByteBuffer buffer = ByteBuffer.allocate(indexEntry.getLength()); + readBufferFully(getBlob(getSegmentFileName(indexEntry)), buffer); + return buffer; + } + + @Override + public boolean containsSegment(long msb, long lsb) { + UUID uuid = new UUID(msb, lsb); + Optional<SegmentWriteAction> segment = queue.map(q -> q.read(uuid)); + if (segment.isPresent()) { + return true; + } + return index.containsKey(new UUID(msb, lsb)); + } + + @Override + public void writeGraph(byte[] data) throws IOException { + writeDataFile(data, ".gph"); + } + + @Override + public void writeBinaryReferences(byte[] data) throws IOException { + writeDataFile(data, ".brf"); + } + + private void writeDataFile(byte[] data, String extension) throws IOException { + try { + getBlob(getName() + extension).uploadFromByteArray(data, 0, data.length); + } catch (StorageException e) { + throw new IOException(e); + } + totalLength += data.length; + monitor.written(data.length); + } + + @Override + public long getLength() { + return totalLength; + } + + @Override + public void close() throws IOException { + if (queue.isPresent()) { // required to handle IOException + SegmentWriteQueue q = queue.get(); + q.flush(); + q.close(); + } + try { + getBlob("closed").uploadFromByteArray(new byte[0], 0, 0); + } catch (StorageException e) { + throw new IOException(e); + } + } + + @Override + public boolean 
isCreated() { + return created || !queueIsEmpty(); + } + + @Override + public void flush() throws IOException { + if (queue.isPresent()) { // required to handle IOException + queue.get().flush(); + } + } + + private boolean queueIsEmpty() { + return queue.map(SegmentWriteQueue::isEmpty).orElse(true); + } + + @Override + public String getName() { + return AzureUtilities.getName(archiveDirectory); + } + + private File pathAsFile() { + return new File(archiveDirectory.getUri().getPath()); + } + + private CloudBlockBlob getBlob(String name) throws IOException { + try { + return archiveDirectory.getBlockBlobReference(name); + } catch (URISyntaxException | StorageException e) { + throw new IOException(e); + } + } +} Added: jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentStoreService.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentStoreService.java?rev=1827292&view=auto ============================================================================== --- jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentStoreService.java (added) +++ jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentStoreService.java Tue Mar 20 10:54:09 2018 @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.jackrabbit.oak.segment.azure;

import com.microsoft.azure.storage.CloudStorageAccount;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentNodeStorePersistence;
import org.osgi.framework.ServiceRegistration;
import org.osgi.service.component.ComponentContext;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.ConfigurationPolicy;
import org.osgi.service.component.annotations.Deactivate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.URISyntaxException;
import java.security.InvalidKeyException;
import java.util.Properties;

/**
 * OSGi component that creates an {@link AzurePersistence} from the
 * {@link Configuration} and registers it as a
 * {@link SegmentNodeStorePersistence} service.
 */
@Component(
        configurationPolicy = ConfigurationPolicy.REQUIRE,
        configurationPid = {Configuration.PID})
public class AzureSegmentStoreService {

    private static final Logger log = LoggerFactory.getLogger(AzureSegmentStoreService.class);

    public static final String DEFAULT_CONTAINER_NAME = "oak";

    public static final String DEFAULT_ROOT_PATH = "/oak";

    private ServiceRegistration registration;

    private SegmentNodeStorePersistence persistence;

    /**
     * Builds the Azure-backed persistence and publishes it in the service
     * registry.
     *
     * @throws IOException if the storage account cannot be reached or the
     *         configuration is invalid
     */
    @Activate
    public void activate(ComponentContext context, Configuration config) throws IOException {
        persistence = createAzurePersistence(config);
        registration = context.getBundleContext()
                .registerService(SegmentNodeStorePersistence.class.getName(), persistence, new Properties());
    }

    /**
     * Unregisters the persistence service, if it was registered.
     */
    @Deactivate
    public void deactivate() throws IOException {
        if (registration != null) {
            registration.unregister();
            registration = null;
        }
        persistence = null;
    }

    private static SegmentNodeStorePersistence createAzurePersistence(Configuration configuration) throws IOException {
        try {
            String connectionURL = configuration.connectionURL();
            StringBuilder connectionString = new StringBuilder();
            // A blank connection URL is treated as "not configured"; the
            // original null-only check would have passed an empty string
            // straight to the SDK parser.
            if (connectionURL != null && !connectionURL.trim().isEmpty()) {
                connectionString.append(connectionURL);
            } else {
                connectionString.append("DefaultEndpointsProtocol=https;");
                connectionString.append("AccountName=").append(configuration.accountName()).append(';');
                connectionString.append("AccountKey=").append(configuration.accessKey()).append(';');
            }
            // Never log the connection string itself: it contains the
            // account access key (and a connection URL may embed one too).
            log.info("Connecting to Azure Storage account '{}', container '{}'",
                    configuration.accountName(), configuration.containerName());
            CloudStorageAccount cloud = CloudStorageAccount.parse(connectionString.toString());
            CloudBlobContainer container = cloud.createCloudBlobClient().getContainerReference(configuration.containerName());
            container.createIfNotExists();

            // Blob directory names must not start with '/'.
            String path = configuration.rootPath();
            if (path != null && path.length() > 0 && path.charAt(0) == '/') {
                path = path.substring(1);
            }

            return new AzurePersistence(container.getDirectoryReference(path));
        } catch (StorageException | URISyntaxException | InvalidKeyException e) {
            throw new IOException(e);
        }
    }

}

Added: jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureUtilities.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureUtilities.java?rev=1827292&view=auto
==============================================================================
---
jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureUtilities.java (added) +++ jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureUtilities.java Tue Mar 20 10:54:09 2018 @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.jackrabbit.oak.segment.azure; + +import com.microsoft.azure.storage.StorageException; +import com.microsoft.azure.storage.blob.CloudBlob; +import com.microsoft.azure.storage.blob.CloudBlobDirectory; + +import java.io.IOException; +import java.net.URISyntaxException; +import java.nio.ByteBuffer; +import java.nio.file.Paths; +import java.util.UUID; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +public final class AzureUtilities { + + public static String SEGMENT_FILE_NAME_PATTERN = "^([0-9a-f]{4})\\.([0-9a-f-]+)$"; + + private AzureUtilities() { + } + + public static String getSegmentFileName(AzureSegmentArchiveEntry indexEntry) { + return getSegmentFileName(indexEntry.getPosition(), indexEntry.getMsb(), indexEntry.getLsb()); + } + + public static String getSegmentFileName(long offset, long msb, long lsb) { + return String.format("%04x.%s", offset, new UUID(msb, lsb).toString()); + } + + public static String getName(CloudBlob blob) { + return Paths.get(blob.getName()).getFileName().toString(); + } + + public static String getName(CloudBlobDirectory directory) { + return Paths.get(directory.getUri().getPath()).getFileName().toString(); + } + + public static Stream<CloudBlob> getBlobs(CloudBlobDirectory directory) throws IOException { + try { + return StreamSupport.stream(directory.listBlobs().spliterator(), false) + .filter(i -> i instanceof CloudBlob) + .map(i -> (CloudBlob) i); + } catch (StorageException | URISyntaxException e) { + throw new IOException(e); + } + } + + public static long getDirectorySize(CloudBlobDirectory directory) throws IOException { + long size = 0; + for (CloudBlob b : getBlobs(directory).collect(Collectors.toList())) { + try { + b.downloadAttributes(); + } catch (StorageException e) { + throw new IOException(e); + } + size += b.getProperties().getLength(); + } + return size; + } + + public static void readBufferFully(CloudBlob blob, ByteBuffer buffer) 
throws IOException { + try { + buffer.rewind(); + long readBytes = blob.downloadToByteArray(buffer.array(), 0); + if (buffer.limit() != readBytes) { + throw new IOException("Buffer size: " + buffer.limit() + ", read bytes: " + readBytes); + } + } catch (StorageException e) { + throw new IOException(e); + } + } +} Added: jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/Configuration.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/Configuration.java?rev=1827292&view=auto ============================================================================== --- jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/Configuration.java (added) +++ jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/Configuration.java Tue Mar 20 10:54:09 2018 @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.jackrabbit.oak.segment.azure; + +import org.osgi.service.metatype.annotations.AttributeDefinition; +import org.osgi.service.metatype.annotations.ObjectClassDefinition; + +import static org.apache.jackrabbit.oak.segment.azure.Configuration.PID; + +@ObjectClassDefinition( + pid = {PID}, + name = "Apache Jackrabbit Oak Azure Segment Store Service", + description = "Azure backend for the Oak Segment Node Store") +@interface Configuration { + + String PID = "org.apache.jackrabbit.oak.segment.azure.AzureSegmentStoreService"; + + @AttributeDefinition( + name = "Azure account name", + description = "Name of the Azure Storage account to use.") + String accountName(); + + @AttributeDefinition( + name = "Azure container name", + description = "Name of the container to use. If it doesn't exists, it'll be created.") + String containerName() default AzureSegmentStoreService.DEFAULT_CONTAINER_NAME; + + @AttributeDefinition( + name = "Azure account access key", + description = "Access key which should be used to authenticate on the account") + String accessKey(); + + @AttributeDefinition( + name = "Root path", + description = "Names of all the created blobs will be prefixed with this path") + String rootPath() default AzureSegmentStoreService.DEFAULT_ROOT_PATH; + + @AttributeDefinition( + name = "Azure connection URL (optional)", + description = "Connection URL to be used to connect to the Azure Storage. 
" + + "Setting it will override the accountName, containerName and accessKey properties.") + String connectionURL(); +} \ No newline at end of file Added: jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/ReverseFileReader.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/ReverseFileReader.java?rev=1827292&view=auto ============================================================================== --- jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/ReverseFileReader.java (added) +++ jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/ReverseFileReader.java Tue Mar 20 10:54:09 2018 @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
 */
package org.apache.jackrabbit.oak.segment.azure;

import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.CloudBlob;

import java.io.IOException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import static java.lang.Math.min;

/**
 * Reads the lines of a blob in reverse order (last line first), downloading
 * the blob in fixed-size chunks from the end towards the beginning.
 * <p>
 * Not thread-safe: the reader keeps mutable cursor state between calls.
 */
public class ReverseFileReader {

    // Default chunk size for each range download.
    private static final int BUFFER_SIZE = 16 * 1024;

    // Size of the chunks downloaded with each range request.
    private int bufferSize;

    private final CloudBlob blob;

    // Current chunk of the blob; null until the first readBlock() call.
    private byte[] buffer;

    // Cursor inside buffer; -1 means the chunk is fully consumed.
    private int bufferOffset;

    // Blob offset of the first byte of the current chunk.
    // NOTE(review): stored as int, so the cast below truncates for blobs
    // larger than 2 GB — confirm the blobs read this way stay small.
    private int fileOffset;

    public ReverseFileReader(CloudBlob blob) throws StorageException {
        this(blob, BUFFER_SIZE);
    }

    /**
     * @param bufferSize size of the chunks fetched per range request
     * @throws StorageException if the existence check fails
     */
    public ReverseFileReader(CloudBlob blob, int bufferSize) throws StorageException {
        this.blob = blob;
        if (blob.exists()) {
            // Start at the end of the blob and move backwards.
            this.fileOffset = (int) blob.getProperties().getLength();
        } else {
            this.fileOffset = 0;
        }
        this.bufferSize = bufferSize;
    }

    // Downloads the chunk that ends at the current fileOffset and positions
    // bufferOffset just past its last byte.
    private void readBlock() throws IOException {
        if (buffer == null) {
            // First chunk: no more than the whole (remaining) blob.
            buffer = new byte[min(fileOffset, bufferSize)];
        } else if (fileOffset < buffer.length) {
            // Final, partial chunk at the very beginning of the blob.
            buffer = new byte[fileOffset];
        }

        if (buffer.length > 0) {
            fileOffset -= buffer.length;
            try {
                blob.downloadRangeToByteArray(fileOffset, Long.valueOf(buffer.length), buffer, 0);
            } catch (StorageException e) {
                throw new IOException(e);
            }
        }
        bufferOffset = buffer.length;
    }

    // Scans backwards from bufferOffset for the previous '\n' and returns the
    // text between it and the previous cursor position (possibly empty).
    // NOTE(review): decodes with the platform default charset — confirm the
    // content read this way is ASCII or that the default is acceptable.
    private String readUntilNewLine() {
        if (bufferOffset == -1) {
            // Chunk already fully consumed.
            return "";
        }
        int stop = bufferOffset;
        while (--bufferOffset >= 0) {
            if (buffer[bufferOffset] == '\n') {
                break;
            }
        }
        // bufferOffset points either the previous '\n' character or -1
        return new String(buffer, bufferOffset + 1, stop - bufferOffset - 1, Charset.defaultCharset());
    }

    /**
     * Returns the previous line of the blob, or {@code null} once the
     * beginning of the blob has been reached.
     */
    public String readLine() throws IOException {
        if (bufferOffset == -1 && fileOffset == 0) {
            // Everything has been read.
            return null;
        }

        if (buffer == null) {
            readBlock();
        }

        // A line may span several chunks; collect its fragments back-to-front.
        List<String> result = new ArrayList<>(1);
        while (true) {
            result.add(readUntilNewLine());
            if (bufferOffset > -1) { // stopped on the '\n'
                break;
            }
            if (fileOffset == 0) { // reached the beginning of the file
                break;
            }
            readBlock();
        }
        // Fragments were collected in reverse order.
        Collections.reverse(result);
        return String.join("", result);
    }
}