// Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/SegmentTarManager.java
// URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/SegmentTarManager.java?rev=1824115&view=auto
// ==============================================================================
// --- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/SegmentTarManager.java (added)
// +++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/SegmentTarManager.java Tue Feb 13 11:17:42 2018
// @@ -0,0 +1,278 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.jackrabbit.oak.segment.file.tar;

import org.apache.jackrabbit.oak.segment.SegmentArchiveManager;
import org.apache.jackrabbit.oak.segment.file.tar.index.Index;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.UUID;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.zip.CRC32;

import static com.google.common.base.Charsets.UTF_8;
import static java.nio.ByteBuffer.wrap;
import static org.apache.jackrabbit.oak.segment.file.tar.TarConstants.BLOCK_SIZE;

/**
 * {@link SegmentArchiveManager} implementation that keeps every archive as a
 * file inside a single segment store directory and reads/writes those files
 * through {@link SegmentTarReader} and {@link SegmentTarWriter}.
 */
public class SegmentTarManager implements SegmentArchiveManager {

    /**
     * Pattern of the segment entry names. Note the trailing (\\..*)? group
     * that's included for compatibility with possible future extensions.
     * Group 1 is the segment UUID, group 3 the optional CRC32 checksum
     * (8 hex digits) of the entry data.
     */
    private static final Pattern NAME_PATTERN = Pattern.compile(
            "([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})"
                    + "(\\.([0-9a-f]{8}))?(\\..*)?");

    /** Offset of the checksum field inside a tar header block. */
    private static final int CHECKSUM_OFFSET = 148;

    /** Width in bytes of the checksum field inside a tar header block. */
    private static final int CHECKSUM_LENGTH = 8;

    private static final Logger log = LoggerFactory.getLogger(SegmentTarManager.class);

    private final File segmentstoreDir;

    private final FileStoreMonitor fileStoreMonitor;

    private final IOMonitor ioMonitor;

    private final boolean memoryMapping;

    /**
     * Creates a manager working on the given directory.
     *
     * @param segmentstoreDir  directory containing the archive files
     * @param fileStoreMonitor monitor handed to every writer created by {@link #create(String)}
     * @param ioMonitor        monitor handed to every reader and writer
     * @param memoryMapping    whether {@link #open(String)} should first try to memory map the file
     */
    public SegmentTarManager(File segmentstoreDir, FileStoreMonitor fileStoreMonitor, IOMonitor ioMonitor, boolean memoryMapping) {
        this.segmentstoreDir = segmentstoreDir;
        this.fileStoreMonitor = fileStoreMonitor;
        this.ioMonitor = ioMonitor;
        this.memoryMapping = memoryMapping;
    }

    /**
     * Lists the names of all files in the segment store directory.
     *
     * @return the file names, or an empty list when the directory cannot be
     *         listed (it does not exist, is not a directory, or an I/O error
     *         occurred)
     */
    @Override
    public List<String> listArchives() {
        // File.list() returns null instead of throwing when the directory
        // cannot be listed; Arrays.asList(null) would fail with an NPE.
        String[] names = segmentstoreDir.list();
        if (names == null) {
            return Collections.emptyList();
        }
        return Arrays.asList(names);
    }

    /**
     * Opens the named archive for reading.
     *
     * @param name name of the archive file inside the segment store directory
     * @return a reader for the archive, or {@code null} when the file has no
     *         valid index
     * @throws IOException if the file cannot be opened or read
     */
    @Override
    public SegmentArchiveReader open(String name) throws IOException {
        File file = new File(segmentstoreDir, name);
        RandomAccessFile access = new RandomAccessFile(file, "r");
        try {
            Index index = SegmentTarReader.loadAndValidateIndex(access, name);
            if (index == null) {
                log.info("No index found in tar file {}, skipping...", name);
                return null;
            }

            if (memoryMapping) {
                try {
                    // On this path the finally block below still closes
                    // 'access'. NOTE(review): presumably the mapping created by
                    // FileAccess.Mapped stays usable after the file is closed --
                    // confirm against FileAccess.Mapped.
                    FileAccess mapped = new FileAccess.Mapped(access);
                    return new SegmentTarReader(file, mapped, index, ioMonitor);
                } catch (IOException e) {
                    log.warn("Failed to mmap tar file {}. Falling back to normal file " +
                            "IO, which will negatively impact repository performance. " +
                            "This problem may have been caused by restrictions on the " +
                            "amount of virtual memory available to the JVM. Please make " +
                            "sure that a 64-bit JVM is being used and that the process " +
                            "has access to unlimited virtual memory (ulimit option -v).",
                            name, e);
                }
            }

            FileAccess random = new FileAccess.Random(access);
            // prevent the finally block from closing the file
            // as the returned reader will take care of that
            access = null;
            return new SegmentTarReader(file, random, index, ioMonitor);
        } finally {
            if (access != null) {
                access.close();
            }
        }
    }

    /**
     * Creates a writer for the named archive. The writer is only constructed
     * here; this method itself does not touch the file system.
     */
    @Override
    public SegmentArchiveWriter create(String archiveName) {
        return new SegmentTarWriter(new File(segmentstoreDir, archiveName), fileStoreMonitor, ioMonitor);
    }

    /**
     * Deletes the named archive.
     *
     * @return {@code true} if the file existed and was removed, {@code false}
     *         if it did not exist or could not be removed (the failure is logged)
     */
    @Override
    public boolean delete(String archiveName) {
        try {
            return Files.deleteIfExists(new File(segmentstoreDir, archiveName).toPath());
        } catch (IOException e) {
            log.error("Can't remove archive {}", archiveName, e);
            return false;
        }
    }

    /**
     * Renames an archive inside the segment store directory.
     *
     * @return {@code true} on success, {@code false} if the move failed (the
     *         failure is logged)
     */
    @Override
    public boolean renameTo(String from, String to) {
        try {
            Files.move(new File(segmentstoreDir, from).toPath(), new File(segmentstoreDir, to).toPath());
            return true;
        } catch (IOException e) {
            log.error("Can't move archive {} to {}", from, to, e);
            return false;
        }
    }

    /**
     * Copies an archive inside the segment store directory.
     *
     * @throws IOException if the copy fails, e.g. because the target exists
     */
    @Override
    public void copyFile(String from, String to) throws IOException {
        Files.copy(new File(segmentstoreDir, from).toPath(), new File(segmentstoreDir, to).toPath());
    }

    /** Tells whether a file with the given name exists in the segment store directory. */
    @Override
    public boolean exists(String archiveName) {
        return new File(segmentstoreDir, archiveName).exists();
    }

    /**
     * Scans the named archive and collects all segment entries that can
     * still be read from it.
     *
     * @param archiveName name of the archive file inside the segment store directory
     * @param entries     map receiving the recovered entries, in file order
     * @throws IOException if the file cannot be opened or read
     */
    @Override
    public void recoverEntries(String archiveName, LinkedHashMap<UUID, byte[]> entries) throws IOException {
        File file = new File(segmentstoreDir, archiveName);
        try (RandomAccessFile access = new RandomAccessFile(file, "r")) {
            recoverEntries(file, access, entries);
        }
    }

    /**
     * Scans through the tar file, looking for all segment entries.
     * <p>
     * Header blocks whose checksum does not match are skipped block by block.
     * For every block with a valid header the scan continues at the block
     * boundary following the entry data, whether or not the entry was
     * recovered.
     *
     * @param file    The path of the TAR file.
     * @param access  The contents of the TAR file.
     * @param entries The map that will contain the recovered entries. The
     *                entries are inserted in the {@link LinkedHashMap} in the
     *                order they appear in the TAR file.
     */
    private static void recoverEntries(File file, RandomAccessFile access, LinkedHashMap<UUID, byte[]> entries) throws IOException {
        byte[] header = new byte[BLOCK_SIZE];
        while (access.getFilePointer() + BLOCK_SIZE <= access.length()) {
            // read the tar header block
            long headerOffset = access.getFilePointer();
            access.readFully(header);

            // compute the header checksum
            int sum = 0;
            for (int i = 0; i < BLOCK_SIZE; i++) {
                sum += header[i] & 0xff;
            }

            // identify possible zero block
            if (sum == 0 && access.getFilePointer() + 2 * BLOCK_SIZE == access.length()) {
                return; // found the zero blocks at the end of the file
            }

            if (!hasValidChecksum(header, sum)) {
                // Not a trustworthy header: do not interpret its name and
                // size fields, just move on to the next block.
                log.warn("Invalid entry checksum at offset {} in tar file {}, skipping...",
                        headerOffset, file);
                continue;
            }

            // The header checksum passes, so read the entry name and size
            ByteBuffer buffer = wrap(header);
            String name = readString(buffer, 100);
            buffer.position(124);
            int size = readNumber(buffer, 12);
            long dataOffset = access.getFilePointer();
            if (size < 0) {
                // an 11-digit octal size field can overflow an int
                log.warn("Invalid size of entry {} in tar file {}, skipping...", name, file);
                continue;
            }
            if (dataOffset + size > access.length()) {
                // checksum was correct, so the size field should be accurate
                log.warn("Partial entry {} in tar file {}, ignoring...", name, file);
                return;
            }

            // Start of the next header: entry data padded to a block boundary.
            long nextHeader = dataOffset + size;
            long remainder = nextHeader % BLOCK_SIZE;
            if (remainder != 0) {
                nextHeader += BLOCK_SIZE - remainder;
            }

            Matcher matcher = NAME_PATTERN.matcher(name);
            if (matcher.matches()) {
                UUID id = UUID.fromString(matcher.group(1));
                String checksum = matcher.group(3);

                // An entry without checksum never replaces an entry that
                // was already recovered for the same id.
                if (checksum != null || !entries.containsKey(id)) {
                    byte[] data = new byte[size];
                    access.readFully(data);

                    if (checksum == null || matchesCrc(data, checksum)) {
                        entries.put(id, data);
                    } else {
                        log.warn("Checksum mismatch in entry {} of tar file {}, skipping...",
                                name, file);
                    }
                }
            } else if (!name.equals(file.getName() + ".idx")) {
                log.warn("Unexpected entry {} in tar file {}, skipping...",
                        name, file);
            }

            // Always continue after the entry data (and padding), also for
            // entries that were not read above, so that data blocks are
            // never mistaken for header blocks.
            access.seek(nextHeader);
        }
    }

    /**
     * Verifies the checksum stored in a tar header block.
     *
     * @param header the header block
     * @param sum    the sum of all unsigned bytes of {@code header}
     * @return {@code true} if the stored checksum equals the checksum computed
     *         with the checksum field itself counted as spaces
     */
    private static boolean hasValidChecksum(byte[] header, int sum) {
        // replace the actual stored checksum with spaces for comparison
        int expected = sum;
        for (int i = CHECKSUM_OFFSET; i < CHECKSUM_OFFSET + CHECKSUM_LENGTH; i++) {
            expected -= header[i] & 0xff;
            expected += ' ';
        }

        byte[] checkbytes = String.format("%06o\0 ", expected).getBytes(UTF_8);
        for (int i = 0; i < checkbytes.length; i++) {
            if (checkbytes[i] != header[CHECKSUM_OFFSET + i]) {
                return false;
            }
        }
        return true;
    }

    /** Tells whether the CRC32 of {@code data} equals the given hexadecimal checksum. */
    private static boolean matchesCrc(byte[] data, String checksum) {
        CRC32 crc = new CRC32();
        crc.update(data);
        return crc.getValue() == Long.parseLong(checksum, 16);
    }

    /**
     * Reads a NUL-terminated (or field-filling) UTF-8 string from a fixed
     * size header field, advancing the buffer by {@code fieldSize} bytes.
     */
    private static String readString(ByteBuffer buffer, int fieldSize) {
        byte[] b = new byte[fieldSize];
        buffer.get(b);
        int n = 0;
        while (n < fieldSize && b[n] != 0) {
            n++;
        }
        return new String(b, 0, n, UTF_8);
    }

    /**
     * Reads an octal number from a fixed size header field, advancing the
     * buffer by {@code fieldSize} bytes. Parsing stops at the first byte
     * that is not an octal digit; no overflow check is performed.
     */
    private static int readNumber(ByteBuffer buffer, int fieldSize) {
        byte[] b = new byte[fieldSize];
        buffer.get(b);
        int number = 0;
        for (int i = 0; i < fieldSize; i++) {
            int digit = b[i] & 0xff;
            if ('0' <= digit && digit <= '7') {
                number = number * 8 + digit - '0';
            } else {
                break;
            }
        }
        return number;
    }

}
// Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/SegmentTarReader.java
// URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/SegmentTarReader.java?rev=1824115&view=auto
// ==============================================================================
// --- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/SegmentTarReader.java (added)
// +++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/SegmentTarReader.java Tue Feb 13 11:17:42 2018
// @@ -0,0 +1,196 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.jackrabbit.oak.segment.file.tar;

import com.google.common.base.Stopwatch;
import org.apache.jackrabbit.oak.segment.SegmentArchiveManager;
import org.apache.jackrabbit.oak.segment.file.tar.binaries.BinaryReferencesIndex;
import org.apache.jackrabbit.oak.segment.file.tar.binaries.BinaryReferencesIndexLoader;
import org.apache.jackrabbit.oak.segment.file.tar.binaries.InvalidBinaryReferencesIndexException;
import org.apache.jackrabbit.oak.segment.file.tar.index.Index;
import org.apache.jackrabbit.oak.segment.file.tar.index.IndexEntry;
import org.apache.jackrabbit.oak.segment.file.tar.index.IndexLoader;
import org.apache.jackrabbit.oak.segment.file.tar.index.InvalidIndexException;
import org.apache.jackrabbit.oak.segment.util.ReaderAtEnd;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import static org.apache.jackrabbit.oak.segment.file.tar.SegmentTarWriter.getPaddingSize;
import static org.apache.jackrabbit.oak.segment.file.tar.TarConstants.BLOCK_SIZE;
import static org.apache.jackrabbit.oak.segment.file.tar.index.IndexLoader.newIndexLoader;

/**
 * Reads segments and the trailing metadata entries (index, graph, binary
 * references) from a single tar file through a {@link FileAccess}.
 */
public class SegmentTarReader implements SegmentArchiveManager.SegmentArchiveReader {

    private static final Logger log = LoggerFactory.getLogger(SegmentTarReader.class);

    private static final IndexLoader indexLoader = newIndexLoader(BLOCK_SIZE);

    private final FileAccess access;

    private final File file;

    private final IOMonitor ioMonitor;

    private final String name;

    private final Index index;

    /**
     * Cached answer of {@link #hasGraph()}; {@code null} until a graph load
     * has completed without an I/O error.
     */
    private volatile Boolean hasGraph;

    /**
     * Creates a reader.
     *
     * @param file      the tar file; used for its name, its length and as the
     *                  argument passed to the I/O monitor
     * @param access    access to the content of the tar file
     * @param index     the already loaded index of the tar file
     * @param ioMonitor monitor notified before and after every segment read
     */
    public SegmentTarReader(File file, FileAccess access, Index index, IOMonitor ioMonitor) {
        this.access = access;
        this.file = file;
        this.index = index;
        this.name = file.getName();
        this.ioMonitor = ioMonitor;
    }

    /**
     * Reads the segment with the given identifier.
     *
     * @param msb most significant bits of the segment id
     * @param lsb least significant bits of the segment id
     * @return the segment data, or {@code null} if the index has no entry for
     *         the id
     * @throws IOException if reading from the file fails
     */
    @Override
    public ByteBuffer readSegment(long msb, long lsb) throws IOException {
        int i = index.findEntry(msb, lsb);
        if (i == -1) {
            return null;
        }
        IndexEntry indexEntry = index.entry(i);
        ioMonitor.beforeSegmentRead(file, msb, lsb, indexEntry.getLength());
        Stopwatch stopwatch = Stopwatch.createStarted();
        ByteBuffer buffer = access.read(indexEntry.getPosition(), indexEntry.getLength());
        long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS);
        ioMonitor.afterSegmentRead(file, msb, lsb, indexEntry.getLength(), elapsed);
        return buffer;
    }

    @Override
    public Index getIndex() {
        return index;
    }

    /**
     * Loads the index stored at the end of a tar file.
     *
     * @param file the opened tar file
     * @param name the name of the tar file, used in log messages only
     * @return the index, or {@code null} if the file length is not a multiple
     *         of the block size, the file is shorter than six blocks or longer
     *         than {@link Integer#MAX_VALUE} bytes, or the index is invalid
     *         (the reason is logged)
     * @throws IOException if reading from the file fails
     */
    public static Index loadAndValidateIndex(RandomAccessFile file, String name) throws IOException {
        long length = file.length();
        if (length % BLOCK_SIZE != 0) {
            log.warn("Unable to load index of file {}: Invalid alignment", name);
            return null;
        }
        if (length < 6 * BLOCK_SIZE) {
            log.warn("Unable to load index of file {}: File too short", name);
            return null;
        }
        if (length > Integer.MAX_VALUE) {
            log.warn("Unable to load index of file {}: File too long", name);
            return null;
        }
        // 'whence' is counted backwards from the start of the two blocks
        // that terminate the file.
        ReaderAtEnd r = (whence, size) -> {
            ByteBuffer buffer = ByteBuffer.allocate(size);
            file.seek(length - 2 * BLOCK_SIZE - whence);
            file.readFully(buffer.array());
            return buffer;
        };
        try {
            return indexLoader.loadIndex(r);
        } catch (InvalidIndexException e) {
            log.warn("Unable to load index of file {}: {}", name, e.getMessage());
        }
        return null;
    }

    /**
     * Loads and parses the graph stored in the tar file.
     *
     * @return the parsed graph, or {@code null} if no graph could be loaded
     * @throws IOException if reading from the file fails
     */
    @Override
    public Map<UUID, List<UUID>> getGraph() throws IOException {
        ByteBuffer graph = loadGraph();
        if (graph == null) {
            return null;
        } else {
            return GraphLoader.parseGraph(graph);
        }
    }

    /**
     * Tells whether a graph can be loaded from the tar file. The answer is
     * cached once a load attempt has completed without an I/O error.
     *
     * @return {@code true} if a graph was loaded; {@code false} if there is
     *         none or if the attempt to load it failed with an I/O error (the
     *         failure is logged and not cached)
     */
    @Override
    public boolean hasGraph() {
        Boolean known = hasGraph;
        if (known == null) {
            try {
                loadGraph();
            } catch (IOException e) {
                // loadGraph() did not get to set the flag; unboxing the
                // still-null field would throw a NullPointerException.
                log.warn("Exception while loading pre-compiled tar graph", e);
                return false;
            }
            known = hasGraph;
        }
        return Boolean.TRUE.equals(known);
    }

    /**
     * Loads the raw graph entry, which is looked up immediately before the
     * index entry, and records in {@link #hasGraph} whether one was found.
     */
    private ByteBuffer loadGraph() throws IOException {
        int end = access.length() - 2 * BLOCK_SIZE - getIndexEntrySize();
        ByteBuffer graph = GraphLoader.loadGraph((whence, amount) -> access.read(end - whence, amount));
        hasGraph = graph != null;
        return graph;
    }

    /**
     * Loads the binary references index, which is looked up immediately
     * before the graph entry (or before the index entry when no graph could
     * be loaded).
     */
    @Override
    public BinaryReferencesIndex getBinaryReferences() throws IOException, InvalidBinaryReferencesIndexException {
        int end = access.length() - 2 * BLOCK_SIZE - getIndexEntrySize() - getGraphEntrySize();
        return BinaryReferencesIndexLoader.loadBinaryReferencesIndex((whence, size) -> access.read(end - whence, size));
    }

    /** Returns the current length of the underlying file. */
    @Override
    public long length() {
        return file.length();
    }

    /** Returns the name of the underlying file (without its directory). */
    @Override
    public String getName() {
        return name;
    }

    @Override
    public void close() throws IOException {
        access.close();
    }

    /**
     * Returns the number of bytes an entry with {@code size} bytes of data
     * occupies in the tar file: one header block, the data and the padding
     * up to the next block boundary.
     */
    @Override
    public int getEntrySize(int size) {
        return BLOCK_SIZE + size + getPaddingSize(size);
    }

    private int getIndexEntrySize() {
        return getEntrySize(index.size());
    }

    /**
     * Returns the size of the graph entry in the tar file, or 0 if no graph
     * could be loaded.
     */
    private int getGraphEntrySize() {
        ByteBuffer buffer;

        try {
            buffer = loadGraph();
        } catch (IOException e) {
            log.warn("Exception while loading pre-compiled tar graph", e);
            return 0;
        }

        if (buffer == null) {
            return 0;
        }

        // NOTE(review): presumably the int 8 bytes before the end of the
        // graph is its size field -- confirm against GraphLoader.
        return getEntrySize(buffer.getInt(buffer.limit() - 8));
    }

}
// Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/SegmentTarWriter.java
// URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/SegmentTarWriter.java?rev=1824115&view=auto
// ==============================================================================
// --- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/SegmentTarWriter.java (added)
// +++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/SegmentTarWriter.java Tue Feb 13 11:17:42 2018
// @@ -0,0 +1,254 @@
// +/*
// + * Licensed to the Apache Software Foundation (ASF) under one
// + * or more contributor license agreements.
/* See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.jackrabbit.oak.segment.file.tar;

import com.google.common.base.Stopwatch;
import org.apache.jackrabbit.oak.segment.SegmentArchiveManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.zip.CRC32;

import static com.google.common.base.Charsets.UTF_8;
import static com.google.common.base.Preconditions.checkState;
import static org.apache.jackrabbit.oak.segment.file.tar.TarConstants.BLOCK_SIZE;

/**
 * Appends segments and the trailing metadata entries (binary references,
 * graph, index) to a single tar file.
 * <p>
 * None of the methods of this class synchronize; NOTE(review): presumably
 * the caller serializes access -- confirm against the code using this writer.
 */
public class SegmentTarWriter implements SegmentArchiveManager.SegmentArchiveWriter {

    private static final Logger log = LoggerFactory.getLogger(SegmentTarWriter.class);

    private static final byte[] ZERO_BYTES = new byte[BLOCK_SIZE];

    private final FileStoreMonitor monitor;

    /**
     * The file being written.
     */
    private final File file;

    private final IOMonitor ioMonitor;

    /**
     * File handle. Initialized lazily in {@link #writeSegment(long, long, byte[], int, int, GCGeneration)}
     * to avoid creating an extra empty file when just reading from the repository.
     * Should only be accessed from synchronized code.
     */
    private RandomAccessFile access = null;

    /** Channel of {@link #access}, used for positional reads; opened together with it. */
    private FileChannel channel = null;

    /** File pointer after the most recent write, as reported by {@link #getLength()}. */
    private volatile long length;

    /**
     * Creates a writer for the given file. The file itself is only opened by
     * the first call to {@link #writeSegment(long, long, byte[], int, int, GCGeneration)}.
     *
     * @param file      the tar file to write
     * @param monitor   monitor notified about the number of bytes written
     * @param ioMonitor monitor notified before and after every segment write
     */
    public SegmentTarWriter(File file, FileStoreMonitor monitor, IOMonitor ioMonitor) {
        this.file = file;
        this.monitor = monitor;
        this.ioMonitor = ioMonitor;
    }

    /**
     * Appends a segment entry (header, data, zero padding up to the next
     * block boundary) to the file, opening the file on first use. The entry
     * name is the segment UUID followed by the CRC32 of the data as 8 hex
     * digits.
     *
     * @param msb        most significant bits of the segment id
     * @param lsb        least significant bits of the segment id
     * @param data       array containing the segment data
     * @param offset     offset of the segment data within {@code data}
     * @param size       length of the segment data
     * @param generation generation stored in the returned entry
     * @return an entry describing where the segment data was written
     * @throws IOException if the file cannot be opened or written
     */
    @Override
    public TarEntry writeSegment(long msb, long lsb, byte[] data, int offset, int size, GCGeneration generation) throws IOException {
        UUID uuid = new UUID(msb, lsb);
        CRC32 checksum = new CRC32();
        checksum.update(data, offset, size);
        String entryName = String.format("%s.%08x", uuid, checksum.getValue());
        byte[] header = newEntryHeader(entryName, size);

        log.debug("Writing segment {} to {}", uuid, file);

        if (access == null) {
            access = new RandomAccessFile(file, "rw");
            channel = access.getChannel();
        }

        int padding = getPaddingSize(size);

        long initialLength = access.getFilePointer();

        access.write(header);

        long dataOffset = access.getFilePointer();

        ioMonitor.beforeSegmentWrite(file, msb, lsb, size);
        Stopwatch stopwatch = Stopwatch.createStarted();
        access.write(data, offset, size);
        ioMonitor.afterSegmentWrite(file, msb, lsb, size, stopwatch.elapsed(TimeUnit.NANOSECONDS));

        if (padding > 0) {
            access.write(ZERO_BYTES, 0, padding);
        }

        long currentLength = access.getFilePointer();
        monitor.written(currentLength - initialLength);

        length = currentLength;

        return new TarEntry(msb, lsb, (int) dataOffset, size, generation);
    }

    /**
     * Reads back the data of a segment previously written by this writer.
     *
     * @param tarEntry the entry returned by {@code writeSegment}
     * @return a buffer holding exactly {@code tarEntry.size()} bytes
     * @throws EOFException if the file ends before the whole segment was read
     * @throws IOException  if reading from the file fails
     */
    @Override
    public ByteBuffer readSegment(TarEntry tarEntry) throws IOException {
        checkState(channel != null); // implied by entry != null
        ByteBuffer data = ByteBuffer.allocate(tarEntry.size());

        // A single positional read may transfer fewer bytes than requested,
        // so keep reading until the buffer is full or the file ends.
        long position = tarEntry.offset();
        while (data.hasRemaining()) {
            int read = channel.read(data, position);
            if (read < 0) {
                throw new EOFException("Unexpected end of file " + file
                        + " while reading " + tarEntry.size()
                        + " bytes at offset " + tarEntry.offset());
            }
            position += read;
        }

        data.rewind();
        return data;
    }

    /**
     * Appends the index entry, named after the file with suffix
     * {@code .idx}. No padding is added here.
     */
    @Override
    public void writeIndex(byte[] data) throws IOException {
        byte[] header = newEntryHeader(file.getName() + ".idx", data.length);
        access.write(header);
        access.write(data);
        monitor.written(header.length + data.length);

        length = access.getFilePointer();
    }

    /**
     * Appends the graph entry, named after the file with suffix
     * {@code .gph}. The zero padding is written before the data and is
     * counted in the size recorded in the header.
     */
    @Override
    public void writeGraph(byte[] data) throws IOException {
        int paddingSize = getPaddingSize(data.length);
        byte[] header = newEntryHeader(file.getName() + ".gph", data.length + paddingSize);
        access.write(header);
        if (paddingSize > 0) {
            access.write(ZERO_BYTES, 0, paddingSize);
        }
        access.write(data);
        monitor.written(header.length + paddingSize + data.length);

        length = access.getFilePointer();
    }

    /**
     * Appends the binary references entry, named after the file with suffix
     * {@code .brf}. The zero padding is written before the data and is
     * counted in the size recorded in the header.
     */
    @Override
    public void writeBinaryReferences(byte[] data) throws IOException {
        int paddingSize = getPaddingSize(data.length);
        byte[] header = newEntryHeader(file.getName() + ".brf", data.length + paddingSize);
        access.write(header);
        if (paddingSize > 0) {
            access.write(ZERO_BYTES, 0, paddingSize);
        }
        access.write(data);
        monitor.written(header.length + paddingSize + data.length);

        length = access.getFilePointer();
    }

    /** Returns the file pointer position after the most recent write. */
    @Override
    public long getLength() {
        return length;
    }

    /**
     * Terminates the tar file with two zero blocks and closes it. Does
     * nothing if the file was never opened for writing.
     *
     * @throws IOException if writing the trailer or closing the file fails
     */
    @Override
    public void close() throws IOException {
        if (access == null) {
            return; // nothing was ever written, so there is no file to finish
        }
        try {
            access.write(ZERO_BYTES);
            access.write(ZERO_BYTES);
        } finally {
            // release the file handle even if writing the trailer failed
            access.close();
        }

        monitor.written(BLOCK_SIZE * 2);
    }

    /** Tells whether the file has been opened for writing. */
    @Override
    public boolean isCreated() {
        return access != null;
    }

    /**
     * Forces written data to the storage device. Does nothing if the file
     * was never opened for writing.
     */
    @Override
    public void flush() throws IOException {
        if (access == null) {
            return; // nothing to sync
        }
        access.getFD().sync();
    }

    @Override
    public String getName() {
        return file.getName();
    }

    /**
     * Builds the header block of a tar entry.
     *
     * @param name entry name; only its first 100 UTF-8 bytes are stored
     * @param size value of the size field
     * @return a header block of {@code BLOCK_SIZE} bytes
     */
    private static byte[] newEntryHeader(String name, int size) {
        byte[] header = new byte[BLOCK_SIZE];

        // File name
        byte[] nameBytes = name.getBytes(UTF_8);
        System.arraycopy(
                nameBytes, 0, header, 0, Math.min(nameBytes.length, 100));

        // File mode
        System.arraycopy(
                String.format("%07o", 0400).getBytes(UTF_8), 0,
                header, 100, 7);

        // User's numeric user ID
        System.arraycopy(
                String.format("%07o", 0).getBytes(UTF_8), 0,
                header, 108, 7);

        // Group's numeric user ID
        System.arraycopy(
                String.format("%07o", 0).getBytes(UTF_8), 0,
                header, 116, 7);

        // File size in bytes (octal basis)
        System.arraycopy(
                String.format("%011o", size).getBytes(UTF_8), 0,
                header, 124, 11);

        // Last modification time in numeric Unix time format (octal)
        long time = System.currentTimeMillis() / 1000;
        System.arraycopy(
                String.format("%011o", time).getBytes(UTF_8), 0,
                header, 136, 11);

        // Checksum for header record; counted as spaces while summing
        System.arraycopy(
                new byte[] {' ', ' ', ' ', ' ', ' ', ' ', ' ', ' '}, 0,
                header, 148, 8);

        // Type flag
        header[156] = '0';

        // Compute checksum
        int checksum = 0;
        for (byte aHeader : header) {
            checksum += aHeader & 0xff;
        }
        System.arraycopy(
                String.format("%06o\0 ", checksum).getBytes(UTF_8), 0,
                header, 148, 8);

        return header;
    }

    /**
     * Returns the number of bytes needed to pad {@code size} bytes up to the
     * next multiple of {@code BLOCK_SIZE}; 0 if already aligned.
     */
    static int getPaddingSize(int size) {
        int remainder = size % BLOCK_SIZE;
        if (remainder > 0) {
            return BLOCK_SIZE - remainder;
        } else {
            return 0;
        }
    }
}
// Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/TarConstants.java
// URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/TarConstants.java?rev=1824115&r1=1824114&r2=1824115&view=diff
// ==============================================================================
// --- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/TarConstants.java (original)
// +++
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/TarConstants.java Tue Feb 13 11:17:42 2018 @@ -17,7 +17,7 @@ package org.apache.jackrabbit.oak.segment.file.tar; -class TarConstants { +public class TarConstants { private TarConstants() { // Prevent instantiation. @@ -42,11 +42,11 @@ class TarConstants { * (size, checksum, the number of UUIDs).</li> * </ul> */ - static final int GRAPH_MAGIC = ('\n' << 24) + ('0' << 16) + ('G' << 8) + '\n'; + public static final int GRAPH_MAGIC = ('\n' << 24) + ('0' << 16) + ('G' << 8) + '\n'; /** * The tar file block size. */ - static final int BLOCK_SIZE = 512; + public static final int BLOCK_SIZE = 512; } Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/TarEntry.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/TarEntry.java?rev=1824115&r1=1824114&r2=1824115&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/TarEntry.java (original) +++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/TarEntry.java Tue Feb 13 11:17:42 2018 @@ -24,7 +24,7 @@ import java.util.Comparator; * A file entry location in a tar file. This is used for the index with a tar * file. 
*/ -class TarEntry { +public class TarEntry { /** Size in bytes a tar entry takes up in the tar file */ static final int SIZE = 33; @@ -52,7 +52,7 @@ class TarEntry { private final GCGeneration generation; - TarEntry(long msb, long lsb, int offset, int size, GCGeneration generation) { + public TarEntry(long msb, long lsb, int offset, int size, GCGeneration generation) { this.msb = msb; this.lsb = lsb; this.offset = offset; @@ -60,19 +60,19 @@ class TarEntry { this.generation = generation; } - long msb() { + public long msb() { return msb; } - long lsb() { + public long lsb() { return lsb; } - int offset() { + public int offset() { return offset; } - int size() { + public int size() { return size; } Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/TarFiles.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/TarFiles.java?rev=1824115&r1=1824114&r2=1824115&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/TarFiles.java (original) +++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/TarFiles.java Tue Feb 13 11:17:42 2018 @@ -24,7 +24,6 @@ import static com.google.common.collect. 
import static com.google.common.collect.Maps.newHashMap; import static com.google.common.collect.Sets.newHashSet; import static java.util.Collections.emptySet; -import static org.apache.commons.io.FileUtils.listFiles; import java.io.Closeable; import java.io.File; @@ -52,6 +51,9 @@ import javax.annotation.Nonnull; import com.google.common.base.Predicate; import com.google.common.collect.Iterables; +import org.apache.jackrabbit.oak.segment.SegmentArchiveManager; +import org.apache.jackrabbit.oak.segment.SegmentNodeStorePersistence; +import org.apache.jackrabbit.oak.segment.file.FileReaper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -76,7 +78,7 @@ public class TarFiles implements Closeab private long reclaimedSize; - private List<File> removableFiles; + private List<String> removableFiles; private Set<UUID> reclaimedSegmentIds; @@ -88,7 +90,7 @@ public class TarFiles implements Closeab return reclaimedSize; } - public List<File> getRemovableFiles() { + public List<String> getRemovableFiles() { return removableFiles; } @@ -118,6 +120,8 @@ public class TarFiles implements Closeab private boolean readOnly; + private SegmentNodeStorePersistence persistence; + private Builder() { // Prevent external instantiation. 
} @@ -158,15 +162,54 @@ public class TarFiles implements Closeab return this; } + public Builder withPersistence(SegmentNodeStorePersistence persistence) { + this.persistence = persistence; + return this; + } + public TarFiles build() throws IOException { checkState(directory != null, "Directory not specified"); checkState(tarRecovery != null, "TAR recovery strategy not specified"); checkState(ioMonitor != null, "I/O monitor not specified"); checkState(readOnly || fileStoreMonitor != null, "File store statistics not specified"); checkState(readOnly || maxFileSize != 0, "Max file size not specified"); + if (persistence == null) { + persistence = new TarPersistence(directory); + } return new TarFiles(this); } + public File getDirectory() { + return directory; + } + + public boolean isMemoryMapping() { + return memoryMapping; + } + + public TarRecovery getTarRecovery() { + return tarRecovery; + } + + public IOMonitor getIoMonitor() { + return ioMonitor; + } + + public FileStoreMonitor getFileStoreMonitor() { + return fileStoreMonitor; + } + + public long getMaxFileSize() { + return maxFileSize; + } + + public boolean isReadOnly() { + return readOnly; + } + + private SegmentArchiveManager buildArchiveManager() throws IOException { + return persistence.createArchiveManager(memoryMapping, ioMonitor, readOnly && fileStoreMonitor == null ? 
new FileStoreMonitorAdapter() : fileStoreMonitor); + } } private static final Logger log = LoggerFactory.getLogger(TarFiles.class); @@ -218,13 +261,13 @@ public class TarFiles implements Closeab }; } - private static Map<Integer, Map<Character, File>> collectFiles(File directory) { - Map<Integer, Map<Character, File>> dataFiles = newHashMap(); - for (File file : listFiles(directory, null, false)) { - Matcher matcher = FILE_NAME_PATTERN.matcher(file.getName()); + private static Map<Integer, Map<Character, String>> collectFiles(SegmentArchiveManager archiveManager) throws IOException { + Map<Integer, Map<Character, String>> dataFiles = newHashMap(); + for (String file : archiveManager.listArchives()) { + Matcher matcher = FILE_NAME_PATTERN.matcher(file); if (matcher.matches()) { Integer index = Integer.parseInt(matcher.group(2)); - Map<Character, File> files = dataFiles.get(index); + Map<Character, String> files = dataFiles.get(index); if (files == null) { files = newHashMap(); dataFiles.put(index, files); @@ -245,9 +288,7 @@ public class TarFiles implements Closeab private final long maxFileSize; - private final boolean memoryMapping; - - private final IOMonitor ioMonitor; + private SegmentArchiveManager archiveManager; /** * Guards access to the {@link #readers} and {@link #writer} references. 
@@ -281,9 +322,9 @@ public class TarFiles implements Closeab private TarFiles(Builder builder) throws IOException { maxFileSize = builder.maxFileSize; - memoryMapping = builder.memoryMapping; - ioMonitor = builder.ioMonitor; - Map<Integer, Map<Character, File>> map = collectFiles(builder.directory); + archiveManager = builder.buildArchiveManager(); + + Map<Integer, Map<Character, String>> map = collectFiles(archiveManager); Integer[] indices = map.keySet().toArray(new Integer[map.size()]); Arrays.sort(indices); @@ -295,9 +336,9 @@ public class TarFiles implements Closeab for (Integer index : indices) { TarReader r; if (builder.readOnly) { - r = TarReader.openRO(map.get(index), memoryMapping, builder.tarRecovery, ioMonitor); + r = TarReader.openRO(map.get(index), builder.tarRecovery, archiveManager); } else { - r = TarReader.open(map.get(index), memoryMapping, builder.tarRecovery, ioMonitor); + r = TarReader.open(map.get(index), builder.tarRecovery, archiveManager); } readers = new Node(r, readers); } @@ -308,10 +349,9 @@ public class TarFiles implements Closeab if (indices.length > 0) { writeNumber = indices[indices.length - 1] + 1; } - writer = new TarWriter(builder.directory, builder.fileStoreMonitor, writeNumber, builder.ioMonitor); + writer = new TarWriter(archiveManager, writeNumber); } - @Override public void close() throws IOException { shutdown = true; @@ -510,7 +550,7 @@ public class TarFiles implements Closeab if (newWriter == writer) { return; } - readers = new Node(TarReader.open(writer.getFile(), memoryMapping, ioMonitor), readers); + readers = new Node(TarReader.open(writer.getFileName(), archiveManager), readers); writer = newWriter; } @@ -657,7 +697,7 @@ public class TarFiles implements Closeab } catch (IOException e) { log.warn("Unable to close swept TAR reader", e); } - result.removableFiles.add(closeable.getFile()); + result.removableFiles.add(closeable.getFileName()); } return result; @@ -711,7 +751,7 @@ public class TarFiles implements Closeab 
Map<UUID, List<UUID>> graph = null; for (TarReader reader : iterable(head)) { - if (fileName.equals(reader.getFile().getName())) { + if (fileName.equals(reader.getFileName())) { index = reader.getUUIDs(); graph = reader.getGraph(); break; @@ -744,9 +784,12 @@ public class TarFiles implements Closeab Map<String, Set<UUID>> index = new HashMap<>(); for (TarReader reader : iterable(head)) { - index.put(reader.getFile().getName(), reader.getUUIDs()); + index.put(reader.getFileName(), reader.getUUIDs()); } return index; } + public FileReaper createFileReaper() { + return new FileReaper(archiveManager); + } } Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/TarPersistence.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/TarPersistence.java?rev=1824115&view=auto ============================================================================== --- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/TarPersistence.java (added) +++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/TarPersistence.java Tue Feb 13 11:17:42 2018 @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.jackrabbit.oak.segment.file.tar; + +import org.apache.commons.io.FileUtils; +import org.apache.jackrabbit.oak.segment.SegmentArchiveManager; +import org.apache.jackrabbit.oak.segment.SegmentNodeStorePersistence; +import org.apache.jackrabbit.oak.segment.file.GCJournal; +import org.apache.jackrabbit.oak.segment.file.LocalGCJournalFile; +import org.apache.jackrabbit.oak.segment.file.LocalManifestFile; + +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.channels.FileLock; +import java.nio.channels.OverlappingFileLockException; +import java.util.Collection; + +public class TarPersistence implements SegmentNodeStorePersistence { + + private static final String LOCK_FILE_NAME = "repo.lock"; + + private static final String GC_JOURNAL = "gc.log"; + + private static final String MANIFEST_FILE_NAME = "manifest"; + + private static final String JOURNAL_FILE_NAME = "journal.log"; + + private final File directory; + + public TarPersistence(File directory) { + this.directory = directory; + } + + @Override + public SegmentArchiveManager createArchiveManager(boolean memoryMapping, IOMonitor ioMonitor, FileStoreMonitor fileStoreMonitor) { + return new SegmentTarManager(directory, fileStoreMonitor, ioMonitor, memoryMapping); + } + + @Override + public boolean segmentFilesExist() { + Collection<File> entries = FileUtils.listFiles(directory, new String[] {"tar"}, false); + return !entries.isEmpty(); + } + + @Override + public JournalFile getJournalFile() { + return new LocalJournalFile(directory, JOURNAL_FILE_NAME); + } + + @Override + public GCJournalFile getGCJournalFile() { + return new LocalGCJournalFile(directory, GC_JOURNAL); + } + + @Override + public ManifestFile getManifestFile() { + return new LocalManifestFile(directory, MANIFEST_FILE_NAME); + } + + @Override + public RepositoryLock 
lockRepository() throws IOException { + RandomAccessFile lockFile = new RandomAccessFile(new File(directory, LOCK_FILE_NAME), "rw"); + try { + FileLock lock = lockFile.getChannel().lock(); + return () -> { + lock.release(); + lockFile.close(); + }; + } catch (OverlappingFileLockException ex) { + throw new IllegalStateException(directory.getAbsolutePath() + " is in use by another store.", ex); + } + } + +} \ No newline at end of file Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/TarReader.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/TarReader.java?rev=1824115&r1=1824114&r2=1824115&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/TarReader.java (original) +++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/TarReader.java Tue Feb 13 11:17:42 2018 @@ -19,24 +19,16 @@ package org.apache.jackrabbit.oak.segment.file.tar; -import static com.google.common.base.Charsets.UTF_8; import static com.google.common.collect.Lists.newArrayList; -import static com.google.common.collect.Lists.newArrayListWithCapacity; -import static com.google.common.collect.Maps.newHashMapWithExpectedSize; import static com.google.common.collect.Maps.newLinkedHashMap; import static com.google.common.collect.Maps.newTreeMap; import static com.google.common.collect.Sets.newHashSet; -import static java.nio.ByteBuffer.wrap; import static java.util.Collections.singletonList; import static org.apache.jackrabbit.oak.segment.file.tar.GCGeneration.newGCGeneration; -import static org.apache.jackrabbit.oak.segment.file.tar.TarConstants.BLOCK_SIZE; -import static org.apache.jackrabbit.oak.segment.file.tar.TarConstants.GRAPH_MAGIC; -import static 
org.apache.jackrabbit.oak.segment.file.tar.index.IndexLoader.newIndexLoader; import java.io.Closeable; import java.io.File; import java.io.IOException; -import java.io.RandomAccessFile; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Collections; @@ -47,48 +39,25 @@ import java.util.Map.Entry; import java.util.Set; import java.util.SortedMap; import java.util.UUID; -import java.util.concurrent.TimeUnit; import java.util.function.Consumer; -import java.util.regex.Matcher; -import java.util.regex.Pattern; -import java.util.zip.CRC32; import javax.annotation.Nonnull; import com.google.common.base.Predicate; -import com.google.common.base.Stopwatch; -import org.apache.commons.io.FileUtils; +import org.apache.jackrabbit.oak.segment.SegmentArchiveManager; import org.apache.jackrabbit.oak.segment.file.tar.binaries.BinaryReferencesIndex; -import org.apache.jackrabbit.oak.segment.file.tar.binaries.BinaryReferencesIndexLoader; import org.apache.jackrabbit.oak.segment.file.tar.binaries.InvalidBinaryReferencesIndexException; import org.apache.jackrabbit.oak.segment.file.tar.index.Index; import org.apache.jackrabbit.oak.segment.file.tar.index.IndexEntry; -import org.apache.jackrabbit.oak.segment.file.tar.index.IndexLoader; -import org.apache.jackrabbit.oak.segment.file.tar.index.InvalidIndexException; -import org.apache.jackrabbit.oak.segment.util.ReaderAtEnd; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -class TarReader implements Closeable { +public class TarReader implements Closeable { private static final Logger log = LoggerFactory.getLogger(TarReader.class); - private static final IndexLoader indexLoader = newIndexLoader(BLOCK_SIZE); - - /** - * Pattern of the segment entry names. Note the trailing (\\..*)? group - * that's included for compatibility with possible future extensions. 
- */ - private static final Pattern NAME_PATTERN = Pattern.compile( - "([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})" - + "(\\.([0-9a-f]{8}))?(\\..*)?"); - - private static int getEntrySize(int size) { - return BLOCK_SIZE + size + TarWriter.getPaddingSize(size); - } - - static TarReader open(File file, boolean memoryMapping, IOMonitor ioMonitor) throws IOException { - TarReader reader = openFirstFileWithValidIndex(singletonList(file), memoryMapping, ioMonitor); + static TarReader open(String file, SegmentArchiveManager archiveManager) throws IOException { + TarReader reader = openFirstFileWithValidIndex(singletonList(file), archiveManager); if (reader != null) { return reader; } else { @@ -107,21 +76,17 @@ class TarReader implements Closeable { * generations. * * @param files The generations of the same TAR file. - * @param memoryMapping If {@code true}, opens the TAR file with memory - * mapping enabled. * @param recovery Strategy for recovering a damaged TAR file. - * @param ioMonitor Callbacks to track internal operations for the open - * TAR file. * @return An instance of {@link TarReader}. 
*/ - static TarReader open(Map<Character, File> files, boolean memoryMapping, TarRecovery recovery, IOMonitor ioMonitor) throws IOException { - SortedMap<Character, File> sorted = newTreeMap(); + static TarReader open(Map<Character, String> files, TarRecovery recovery, SegmentArchiveManager archiveManager) throws IOException { + SortedMap<Character, String> sorted = newTreeMap(); sorted.putAll(files); - List<File> list = newArrayList(sorted.values()); + List<String> list = newArrayList(sorted.values()); Collections.reverse(list); - TarReader reader = openFirstFileWithValidIndex(list, memoryMapping, ioMonitor); + TarReader reader = openFirstFileWithValidIndex(list, archiveManager); if (reader != null) { return reader; } @@ -129,15 +94,15 @@ class TarReader implements Closeable { // no generation has a valid index, so recover as much as we can log.warn("Could not find a valid tar index in {}, recovering...", list); LinkedHashMap<UUID, byte[]> entries = newLinkedHashMap(); - for (File file : sorted.values()) { - collectFileEntries(file, entries, true); + for (String file : sorted.values()) { + collectFileEntries(file, entries, true, archiveManager); } // regenerate the first generation based on the recovered data - File file = sorted.values().iterator().next(); - generateTarFile(entries, file, recovery, ioMonitor); + String file = sorted.values().iterator().next(); + generateTarFile(entries, file, recovery, archiveManager); - reader = openFirstFileWithValidIndex(singletonList(file), memoryMapping, ioMonitor); + reader = openFirstFileWithValidIndex(singletonList(file), archiveManager); if (reader != null) { return reader; } else { @@ -145,11 +110,11 @@ class TarReader implements Closeable { } } - static TarReader openRO(Map<Character, File> files, boolean memoryMapping, TarRecovery recovery, IOMonitor ioMonitor) throws IOException { + static TarReader openRO(Map<Character, String> files, TarRecovery recovery, SegmentArchiveManager archiveManager) throws IOException { 
// for readonly store only try the latest generation of a given // tar file to prevent any rollback or rewrite - File file = files.get(Collections.max(files.keySet())); - TarReader reader = openFirstFileWithValidIndex(singletonList(file), memoryMapping, ioMonitor); + String file = files.get(Collections.max(files.keySet())); + TarReader reader = openFirstFileWithValidIndex(singletonList(file), archiveManager); if (reader != null) { return reader; } @@ -157,10 +122,10 @@ class TarReader implements Closeable { // collecting the entries (without touching the original file) and // writing them into an artificial tar file '.ro.bak' LinkedHashMap<UUID, byte[]> entries = newLinkedHashMap(); - collectFileEntries(file, entries, false); - file = findAvailGen(file, ".ro.bak"); - generateTarFile(entries, file, recovery, ioMonitor); - reader = openFirstFileWithValidIndex(singletonList(file), memoryMapping, ioMonitor); + collectFileEntries(file, entries, false, archiveManager); + file = findAvailGen(file, ".ro.bak", archiveManager); + generateTarFile(entries, file, recovery, archiveManager); + reader = openFirstFileWithValidIndex(singletonList(file), archiveManager); if (reader != null) { return reader; } @@ -176,21 +141,16 @@ class TarReader implements Closeable { * into. * @param backup If {@code true}, performs a backup of the TAR file. 
*/ - private static void collectFileEntries(File file, LinkedHashMap<UUID, byte[]> entries, boolean backup) throws IOException { + private static void collectFileEntries(String file, LinkedHashMap<UUID, byte[]> entries, boolean backup, SegmentArchiveManager archiveManager) throws IOException { log.info("Recovering segments from tar file {}", file); try { - RandomAccessFile access = new RandomAccessFile(file, "r"); - try { - recoverEntries(file, access, entries); - } finally { - access.close(); - } + archiveManager.recoverEntries(file, entries); } catch (IOException e) { log.warn("Could not read tar file {}, skipping...", file, e); } if (backup) { - backupSafely(file); + backupSafely(archiveManager, file); } } @@ -202,12 +162,11 @@ class TarReader implements Closeable { * @param file The output file that will contain the recovered * entries. * @param recovery The recovery strategy to execute. - * @param ioMonitor An instance of {@link IOMonitor}. */ - private static void generateTarFile(LinkedHashMap<UUID, byte[]> entries, File file, TarRecovery recovery, IOMonitor ioMonitor) throws IOException { + private static void generateTarFile(LinkedHashMap<UUID, byte[]> entries, String file, TarRecovery recovery, SegmentArchiveManager archiveManager) throws IOException { log.info("Regenerating tar file {}", file); - try (TarWriter writer = new TarWriter(file, ioMonitor)) { + try (TarWriter writer = new TarWriter(archiveManager, file)) { for (Entry<UUID, byte[]> entry : entries.entrySet()) { try { recovery.recoverEntry(entry.getKey(), entry.getValue(), new EntryRecovery() { @@ -242,13 +201,13 @@ class TarReader implements Closeable { * * @param file File to backup. 
*/ - private static void backupSafely(File file) throws IOException { - File backup = findAvailGen(file, ".bak"); - log.info("Backing up {} to {}", file, backup.getName()); - if (!file.renameTo(backup)) { + private static void backupSafely(SegmentArchiveManager archiveManager, String file) throws IOException { + String backup = findAvailGen(file, ".bak", archiveManager); + log.info("Backing up {} to {}", file, backup); + if (!archiveManager.renameTo(file, backup)) { log.warn("Renaming failed, so using copy to backup {}", file); - FileUtils.copyFile(file, backup); - if (!file.delete()) { + archiveManager.copyFile(file, backup); + if (!archiveManager.delete(file)) { throw new IOException( "Could not remove broken tar file " + file); } @@ -259,62 +218,29 @@ class TarReader implements Closeable { * Find next available generation number so that a generated file doesn't * overwrite another existing file. * - * @param file The file to backup. + * @param name The file to backup. * @param ext The extension of the backed up file. */ - private static File findAvailGen(File file, String ext) { - File parent = file.getParentFile(); - String name = file.getName(); - File backup = new File(parent, name + ext); - for (int i = 2; backup.exists(); i++) { - backup = new File(parent, name + "." + i + ext); + private static String findAvailGen(String name, String ext, SegmentArchiveManager archiveManager) { + String backup = name + ext; + for (int i = 2; archiveManager.exists(backup); i++) { + backup = name + "." 
+ i + ext; } return backup; } - private static TarReader openFirstFileWithValidIndex(List<File> files, boolean memoryMapping, IOMonitor ioMonitor) { - for (File file : files) { - String name = file.getName(); + private static TarReader openFirstFileWithValidIndex(List<String> archives, SegmentArchiveManager archiveManager) { + for (String name : archives) { try { - RandomAccessFile access = new RandomAccessFile(file, "r"); - try { - Index index = loadAndValidateIndex(access, name); - if (index == null) { - log.info("No index found in tar file {}, skipping...", name); - } else { - // found a file with a valid index, drop the others - for (File other : files) { - if (other != file) { - log.info("Removing unused tar file {}", other.getName()); - other.delete(); - } - } - - if (memoryMapping) { - try { - FileAccess mapped = new FileAccess.Mapped(access); - return new TarReader(file, mapped, index, ioMonitor); - } catch (IOException e) { - log.warn("Failed to mmap tar file {}. Falling back to normal file " + - "IO, which will negatively impact repository performance. " + - "This problem may have been caused by restrictions on the " + - "amount of virtual memory available to the JVM. 
Please make " + - "sure that a 64-bit JVM is being used and that the process " + - "has access to unlimited virtual memory (ulimit option -v).", - name, e); - } + SegmentArchiveManager.SegmentArchiveReader reader = archiveManager.open(name); + if (reader != null) { + for (String other : archives) { + if (other != name) { + log.info("Removing unused tar file {}", other); + archiveManager.delete(other); } - - FileAccess random = new FileAccess.Random(access); - // prevent the finally block from closing the file - // as the returned TarReader will take care of that - access = null; - return new TarReader(file, random, index, ioMonitor); - } - } finally { - if (access != null) { - access.close(); } + return new TarReader(archiveManager, reader); } } catch (IOException e) { log.warn("Could not read tar file {}, skipping...", name, e); @@ -324,160 +250,22 @@ class TarReader implements Closeable { return null; } - /** - * Tries to read an existing index from the given tar file. The index is - * returned if it is found and looks valid (correct checksum, passes sanity - * checks). - * - * @param file The TAR file. - * @param name Name of the TAR file, for logging purposes. - * @return An instance of {@link ByteBuffer} populated with the content of - * the index. If the TAR doesn't contain any index, {@code null} is returned - * instead. 
- */ - private static Index loadAndValidateIndex(RandomAccessFile file, String name) throws IOException { - long length = file.length(); - - if (length % BLOCK_SIZE != 0) { - log.warn("Unable to load index of file {}: Invalid alignment", name); - return null; - } - if (length < 6 * BLOCK_SIZE) { - log.warn("Unable to load index of file {}: File too short", name); - return null; - } - if (length > Integer.MAX_VALUE) { - log.warn("Unable to load index of file {}: File too long", name); - return null; - } - - ReaderAtEnd r = (whence, size) -> { - ByteBuffer buffer = ByteBuffer.allocate(size); - file.seek(length - 2 * BLOCK_SIZE - whence); - file.readFully(buffer.array()); - return buffer; - }; - - try { - return indexLoader.loadIndex(r); - } catch (InvalidIndexException e) { - log.warn("Unable to load index of file {}: {}", name, e.getMessage()); - } - - return null; - } - - /** - * Scans through the tar file, looking for all segment entries. - * - * @param file The path of the TAR file. - * @param access The contents of the TAR file. - * @param entries The map that will contain the recovered entries. The - * entries are inserted in the {@link LinkedHashMap} in the - * order they appear in the TAR file. 
- */ - private static void recoverEntries(File file, RandomAccessFile access, LinkedHashMap<UUID, byte[]> entries) throws IOException { - byte[] header = new byte[BLOCK_SIZE]; - while (access.getFilePointer() + BLOCK_SIZE <= access.length()) { - // read the tar header block - access.readFully(header); - - // compute the header checksum - int sum = 0; - for (int i = 0; i < BLOCK_SIZE; i++) { - sum += header[i] & 0xff; - } - - // identify possible zero block - if (sum == 0 && access.getFilePointer() + 2 * BLOCK_SIZE == access.length()) { - return; // found the zero blocks at the end of the file - } - - // replace the actual stored checksum with spaces for comparison - for (int i = 148; i < 148 + 8; i++) { - sum -= header[i] & 0xff; - sum += ' '; - } - - byte[] checkbytes = String.format("%06o\0 ", sum).getBytes(UTF_8); - for (int i = 0; i < checkbytes.length; i++) { - if (checkbytes[i] != header[148 + i]) { - log.warn("Invalid entry checksum at offset {} in tar file {}, skipping...", - access.getFilePointer() - BLOCK_SIZE, file); - } - } - - // The header checksum passes, so read the entry name and size - ByteBuffer buffer = wrap(header); - String name = readString(buffer, 100); - buffer.position(124); - int size = readNumber(buffer, 12); - if (access.getFilePointer() + size > access.length()) { - // checksum was correct, so the size field should be accurate - log.warn("Partial entry {} in tar file {}, ignoring...", name, file); - return; - } - - Matcher matcher = NAME_PATTERN.matcher(name); - if (matcher.matches()) { - UUID id = UUID.fromString(matcher.group(1)); - - String checksum = matcher.group(3); - if (checksum != null || !entries.containsKey(id)) { - byte[] data = new byte[size]; - access.readFully(data); - - // skip possible padding to stay at block boundaries - long position = access.getFilePointer(); - long remainder = position % BLOCK_SIZE; - if (remainder != 0) { - access.seek(position + (BLOCK_SIZE - remainder)); - } - - if (checksum != null) { - CRC32 
crc = new CRC32(); - crc.update(data); - if (crc.getValue() != Long.parseLong(checksum, 16)) { - log.warn("Checksum mismatch in entry {} of tar file {}, skipping...", - name, file); - continue; - } - } - - entries.put(id, data); - } - } else if (!name.equals(file.getName() + ".idx")) { - log.warn("Unexpected entry {} in tar file {}, skipping...", - name, file); - long position = access.getFilePointer() + size; - long remainder = position % BLOCK_SIZE; - if (remainder != 0) { - position += BLOCK_SIZE - remainder; - } - access.seek(position); - } - } - } - - private final File file; + private final SegmentArchiveManager archiveManager; - private final FileAccess access; + private final SegmentArchiveManager.SegmentArchiveReader archive; private final Index index; private volatile boolean hasGraph; - private final IOMonitor ioMonitor; - - private TarReader(File file, FileAccess access, Index index, IOMonitor ioMonitor) { - this.file = file; - this.access = access; - this.index = index; - this.ioMonitor = ioMonitor; + private TarReader(SegmentArchiveManager archiveManager, SegmentArchiveManager.SegmentArchiveReader archive) { + this.archiveManager = archiveManager; + this.archive = archive; + this.index = archive.getIndex(); } long size() { - return file.length(); + return archive.length(); } /** @@ -514,15 +302,7 @@ class TarReader implements Closeable { * @return the byte buffer, or null if not in this file. */ ByteBuffer readEntry(long msb, long lsb) throws IOException { - int idx = findEntry(msb, lsb); - if (idx == -1) { - return null; - } - return readEntry(msb, lsb, index.entry(idx)); - } - - private ByteBuffer readEntry(long msb, long lsb, IndexEntry entry) throws IOException { - return readSegment(msb, lsb, entry.getPosition(), entry.getLength()); + return archive.readSegment(msb, lsb); } /** @@ -705,7 +485,7 @@ class TarReader implements Closeable { * TarReader}, or {@code null}. 
*/ TarReader sweep(@Nonnull Set<UUID> reclaim, @Nonnull Set<UUID> reclaimed) throws IOException { - String name = file.getName(); + String name = archive.getName(); log.debug("Cleaning up {}", name); Set<UUID> cleaned = newHashSet(); @@ -716,13 +496,13 @@ class TarReader implements Closeable { TarEntry[] entries = getEntries(); for (int i = 0; i < entries.length; i++) { TarEntry entry = entries[i]; - beforeSize += getEntrySize(entry.size()); + beforeSize += archive.getEntrySize(entry.size()); UUID id = new UUID(entry.msb(), entry.lsb()); if (reclaim.contains(id)) { cleaned.add(id); entries[i] = null; } else { - afterSize += getEntrySize(entry.size()); + afterSize += archive.getEntrySize(entry.size()); afterCount += 1; } } @@ -737,7 +517,7 @@ class TarReader implements Closeable { // in which case we'll always generate a new tar file with // the graph to speed up future garbage collection runs. log.debug("Not enough space savings. ({}/{}). Skipping clean up of {}", - access.length() - afterSize, access.length(), name); + archive.length() - afterSize, archive.length(), name); return this; } if (!hasGraph()) { @@ -751,21 +531,18 @@ class TarReader implements Closeable { return this; } - File newFile = new File( - file.getParentFile(), - name.substring(0, pos) + (char) (generation + 1) + ".tar"); + String newFile = name.substring(0, pos) + (char) (generation + 1) + ".tar"; - log.debug("Writing new generation {}", newFile.getName()); - TarWriter writer = new TarWriter(newFile, ioMonitor); + log.debug("Writing new generation {}", newFile); + TarWriter writer = new TarWriter(archiveManager, newFile); for (TarEntry entry : entries) { if (entry != null) { long msb = entry.msb(); long lsb = entry.lsb(); - int offset = entry.offset(); int size = entry.size(); GCGeneration gen = entry.generation(); byte[] data = new byte[size]; - readSegment(msb, lsb, offset, size).get(data); + archive.readSegment(msb, lsb).get(data); writer.writeEntry(msb, lsb, data, 0, size, gen); } } @@ 
-809,19 +586,19 @@ class TarReader implements Closeable { writer.close(); - TarReader reader = openFirstFileWithValidIndex(singletonList(newFile), access.isMemoryMapped(), ioMonitor); + TarReader reader = openFirstFileWithValidIndex(singletonList(newFile), archiveManager); if (reader != null) { reclaimed.addAll(cleaned); return reader; } else { - log.warn("Failed to open cleaned up tar file {}", file); + log.warn("Failed to open cleaned up tar file {}", getFileName()); return this; } } @Override public void close() throws IOException { - access.close(); + archive.close(); } /** @@ -831,42 +608,11 @@ class TarReader implements Closeable { * @return The parsed graph, or {@code null} if one was not found. */ Map<UUID, List<UUID>> getGraph() throws IOException { - ByteBuffer graph = loadGraph(); - if (graph == null) { - return null; - } else { - return parseGraph(graph); - } + return archive.getGraph(); } private boolean hasGraph() { - if (!hasGraph) { - try { - loadGraph(); - } catch (IOException ignore) { } - } - return hasGraph; - } - - private int getIndexEntrySize() { - return getEntrySize(index.size()); - } - - private int getGraphEntrySize() { - ByteBuffer buffer; - - try { - buffer = loadGraph(); - } catch (IOException e) { - log.warn("Exception while loading pre-compiled tar graph", e); - return 0; - } - - if (buffer == null) { - return 0; - } - - return getEntrySize(buffer.getInt(buffer.limit() - 8)); + return archive.hasGraph(); } /** @@ -883,142 +629,27 @@ class TarReader implements Closeable { BinaryReferencesIndex getBinaryReferences() { BinaryReferencesIndex index = null; try { - index = loadBinaryReferences(); + index = archive.getBinaryReferences(); } catch (InvalidBinaryReferencesIndexException | IOException e) { log.warn("Exception while loading binary reference", e); } return index; } - private BinaryReferencesIndex loadBinaryReferences() throws IOException, InvalidBinaryReferencesIndexException { - int end = access.length() - 2 * BLOCK_SIZE - 
getIndexEntrySize() - getGraphEntrySize(); - return BinaryReferencesIndexLoader.loadBinaryReferencesIndex((whence, size) -> access.read(end - whence, size)); - } - - /** - * Loads the optional pre-compiled graph entry from the given tar file. - * - * @return graph buffer, or {@code null} if one was not found - * @throws IOException if the tar file could not be read - */ - private ByteBuffer loadGraph() throws IOException { - int pos = access.length() - 2 * BLOCK_SIZE - getIndexEntrySize(); - - ByteBuffer meta = access.read(pos - 16, 16); - - int crc32 = meta.getInt(); - int count = meta.getInt(); - int bytes = meta.getInt(); - int magic = meta.getInt(); - - if (magic != GRAPH_MAGIC) { - log.warn("Invalid graph magic number in {}", file); - return null; - } - - if (count < 0) { - log.warn("Invalid number of entries in {}", file); - return null; - } - - if (bytes < 4 + count * 34) { - log.warn("Invalid entry size in {}", file); - return null; - } - - ByteBuffer graph = access.read(pos - bytes, bytes); - - byte[] b = new byte[bytes - 16]; - - graph.mark(); - graph.get(b); - graph.reset(); - - CRC32 checksum = new CRC32(); - checksum.update(b); - - if (crc32 != (int) checksum.getValue()) { - log.warn("Invalid graph checksum in tar file {}", file); - return null; - } - - hasGraph = true; - - return graph; - } - - private ByteBuffer readSegment(long msb, long lsb, int offset, int size) throws IOException { - ioMonitor.beforeSegmentRead(file, msb, lsb, size); - Stopwatch stopwatch = Stopwatch.createStarted(); - ByteBuffer buffer = access.read(offset, size); - long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS); - ioMonitor.afterSegmentRead(file, msb, lsb, size, elapsed); - return buffer; - } - - private static Map<UUID, List<UUID>> parseGraph(ByteBuffer buffer) { - int nEntries = buffer.getInt(buffer.limit() - 12); - - Map<UUID, List<UUID>> graph = newHashMapWithExpectedSize(nEntries); - - for (int i = 0; i < nEntries; i++) { - long msb = buffer.getLong(); - long lsb = 
buffer.getLong(); - int nVertices = buffer.getInt(); - - List<UUID> vertices = newArrayListWithCapacity(nVertices); - - for (int j = 0; j < nVertices; j++) { - long vMsb = buffer.getLong(); - long vLsb = buffer.getLong(); - vertices.add(new UUID(vMsb, vLsb)); - } - - graph.put(new UUID(msb, lsb), vertices); - } - - return graph; - } - - private static String readString(ByteBuffer buffer, int fieldSize) { - byte[] b = new byte[fieldSize]; - buffer.get(b); - int n = 0; - while (n < fieldSize && b[n] != 0) { - n++; - } - return new String(b, 0, n, UTF_8); - } - - private static int readNumber(ByteBuffer buffer, int fieldSize) { - byte[] b = new byte[fieldSize]; - buffer.get(b); - int number = 0; - for (int i = 0; i < fieldSize; i++) { - int digit = b[i] & 0xff; - if ('0' <= digit && digit <= '7') { - number = number * 8 + digit - '0'; - } else { - break; - } - } - return number; - } - /** * Return the path of this TAR file. * * @return An instance of {@link File}. */ - File getFile() { - return file; + String getFileName() { + return archive.getName(); } //------------------------------------------------------------< Object >-- @Override public String toString() { - return file.toString(); + return getFileName(); } }