// Origin: http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/wali/src/main/java/org/wali/MinimalLockingWriteAheadLog.java
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.wali;

import static java.util.Objects.requireNonNull;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.regex.Pattern;

import org.apache.nifi.io.BufferedInputStream;
import org.apache.nifi.io.BufferedOutputStream;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * <p>
 * This implementation provides as little Locking as possible in order to
 * provide the highest throughput possible. However, this implementation is ONLY
 * appropriate if it can be guaranteed that only a single thread will ever issue
 * updates for a given Record at any one time.
 * </p>
 *
 * <p>
 * Updates are appended to one of several partitions (journals) while holding a
 * shared read lock; a checkpoint takes the exclusive write lock only long
 * enough to copy the in-memory state and roll the journals over, then writes a
 * snapshot file outside of the lock.
 * </p>
 *
 * @param <T> the type of record stored in this repository
 */
public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepository<T> {

    private final Path basePath;
    private final Path partialPath;
    private final Path snapshotPath;

    private final SerDe<T> serde;
    private final SyncListener syncListener;
    private final FileChannel lockChannel;
    private final AtomicLong transactionIdGenerator = new AtomicLong(0L);

    private final Partition<T>[] partitions;
    private final AtomicLong partitionIndex = new AtomicLong(0L);
    private final ConcurrentMap<Object, T> recordMap = new ConcurrentHashMap<>();
    private final Map<Object, T> unmodifiableRecordMap = Collections.unmodifiableMap(recordMap);
    private final Set<String> externalLocations = new CopyOnWriteArraySet<>();

    private final Set<String> recoveredExternalLocations = new CopyOnWriteArraySet<>();

    private final AtomicInteger numberBlackListedPartitions = new AtomicInteger(0);

    private static final Logger logger = LoggerFactory.getLogger(MinimalLockingWriteAheadLog.class);

    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
    private final Lock readLock = rwLock.readLock(); // required to update a partition
    private final Lock writeLock = rwLock.writeLock(); // required for checkpoint

    private volatile boolean updated = false;
    private volatile boolean recovered = false;

    /**
     * Creates a Write-Ahead Log that stores both its snapshot and all of its
     * partitions beneath a single directory.
     *
     * @param path the directory to use for the partitions/journals and the
     * snapshot
     * @param partitionCount the number of partitions/journals to use
     * @param serde the serializer/deserializer for records; must not be null
     * @param syncListener listener notified of syncs; may be null
     * @throws IOException if the directory cannot be prepared or locked
     */
    public MinimalLockingWriteAheadLog(final Path path, final int partitionCount, final SerDe<T> serde, final SyncListener syncListener) throws IOException {
        this(new TreeSet<>(Collections.singleton(path)), partitionCount, serde, syncListener);
    }

    /**
     *
     * @param paths a sorted set of Paths to use for the partitions/journals and
     * the snapshot. The snapshot will always be written to the first path
     * specified.
     *
     * @param partitionCount the number of partitions/journals to use. For best
     * performance, this should be close to the number of threads that are
     * expected to update the repository simultaneously. If the given paths
     * already contain a different number of partition directories, the existing
     * number is used instead so that every existing journal is recovered.
     *
     * @param serde the serializer/deserializer for records; must not be null
     * @param syncListener listener notified after a forced sync of a partition
     * and during each checkpoint; may be null
     * @throws IOException if a path is not a usable directory, or the
     * repository lock or a partition directory cannot be created
     * @throws IllegalArgumentException if {@code paths} is empty or
     * {@code partitionCount} is less than 1
     */
    @SuppressWarnings("unchecked")
    public MinimalLockingWriteAheadLog(final SortedSet<Path> paths, final int partitionCount, final SerDe<T> serde, final SyncListener syncListener) throws IOException {
        this.syncListener = syncListener;

        requireNonNull(paths);
        requireNonNull(serde);

        if (paths.isEmpty()) {
            throw new IllegalArgumentException("Paths must be non-empty");
        }
        if (partitionCount < 1) {
            throw new IllegalArgumentException("Partition count must be at least 1 but was " + partitionCount);
        }

        int existingPartitions = 0;
        for (final Path path : paths) {
            if (!Files.exists(path)) {
                Files.createDirectories(path);
            }

            final File file = path.toFile();
            if (!file.isDirectory()) {
                throw new IOException("Path given [" + path + "] is not a directory");
            }
            if (!file.canWrite()) {
                throw new IOException("Path given [" + path + "] is not writable");
            }
            if (!file.canRead()) {
                throw new IOException("Path given [" + path + "] is not readable");
            }
            if (!file.canExecute()) {
                throw new IOException("Path given [" + path + "] is not executable");
            }

            final File[] children = file.listFiles();
            if (children != null) {
                for (final File child : children) {
                    if (child.isDirectory() && child.getName().startsWith("partition-")) {
                        existingPartitions++;
                    }
                }
            }
        }

        // Compare only after every path has been scanned: partitions are spread
        // across all paths, so a per-path count is only a partial total.
        int resolvedPartitionCount = partitionCount;
        if (existingPartitions != 0 && existingPartitions != partitionCount) {
            logger.warn("Constructing MinimalLockingWriteAheadLog with partitionCount={}, but the repository currently has "
                    + "{} partitions; ignoring argument and proceeding with {} partitions",
                    new Object[]{partitionCount, existingPartitions, existingPartitions});
            // Actually honor what the message says; otherwise journals in the
            // extra partition directories would never be recovered.
            resolvedPartitionCount = existingPartitions;
        }

        this.basePath = paths.iterator().next();
        this.partialPath = basePath.resolve("snapshot.partial");
        this.snapshotPath = basePath.resolve("snapshot");
        this.serde = serde;

        final Path lockPath = basePath.resolve("wali.lock");
        final FileChannel channel = new FileOutputStream(lockPath.toFile()).getChannel();
        final Partition<T>[] createdPartitions = new Partition[resolvedPartitionCount];
        try {
            channel.lock();

            Iterator<Path> pathIterator = paths.iterator();
            for (int i = 0; i < resolvedPartitionCount; i++) {
                // If we're out of paths, create a new iterator to start over.
                if (!pathIterator.hasNext()) {
                    pathIterator = paths.iterator();
                }

                final Path partitionBasePath = pathIterator.next();

                createdPartitions[i] = new Partition<>(partitionBasePath.resolve("partition-" + i), serde, i, getVersion());
            }
        } catch (final IOException | RuntimeException e) {
            // Do not leak the lock file's channel if construction fails.
            try {
                channel.close();
            } catch (final IOException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }

        this.lockChannel = channel;
        this.partitions = createdPartitions;
    }

    /**
     * Writes the given records to one of the partitions as a single
     * transaction and then applies them to the in-memory record map.
     *
     * @param records the records to persist
     * @param forceSync whether the partition's file should be synced to disk
     * before returning
     * @return the index of the partition that was written to, or -1 if
     * {@code records} is empty
     * @throws IOException if the partition could not be written (the partition
     * is then blacklisted), or if every partition is blacklisted
     * @throws IllegalStateException if {@link #recoverRecords()} has not
     * completed yet
     */
    @Override
    public int update(final Collection<T> records, final boolean forceSync) throws IOException {
        if (!recovered) {
            throw new IllegalStateException("Cannot update repository until record recovery has been performed");
        }

        if (records.isEmpty()) {
            return -1;
        }

        updated = true;
        readLock.lock();
        try {
            while (true) {
                final int numBlackListed = numberBlackListedPartitions.get();
                if (numBlackListed >= partitions.length) {
                    throw new IOException("All Partitions have been blacklisted due to failures when attempting to update. If the Write-Ahead Log is able to perform a checkpoint, this issue may resolve itself. Otherwise, manual intervention will be required.");
                }

                // Round-robin over the partitions until one can be claimed.
                final long partitionIdx = partitionIndex.getAndIncrement();
                final int resolvedIdx = (int) (partitionIdx % partitions.length);
                final Partition<T> partition = partitions[resolvedIdx];
                if (partition.tryClaim()) {
                    try {
                        final long transactionId = transactionIdGenerator.getAndIncrement();
                        if (logger.isTraceEnabled()) {
                            for (final T record : records) {
                                logger.trace("Partition {} performing Transaction {}: {}", new Object[]{partition, transactionId, record});
                            }
                        }

                        try {
                            partition.update(records, transactionId, unmodifiableRecordMap, forceSync);
                        } catch (final Exception e) {
                            // The journal may now hold a partial transaction; stop using
                            // this partition until the next rollover.
                            partition.blackList();
                            numberBlackListedPartitions.incrementAndGet();
                            throw e;
                        }

                        if (forceSync && syncListener != null) {
                            syncListener.onSync(resolvedIdx);
                        }
                    } finally {
                        partition.releaseClaim();
                    }

                    // The journal write succeeded; now mirror the change in memory.
                    for (final T record : records) {
                        final UpdateType updateType = serde.getUpdateType(record);
                        final Object recordIdentifier = serde.getRecordIdentifier(record);

                        if (updateType == UpdateType.DELETE) {
                            recordMap.remove(recordIdentifier);
                        } else if (updateType == UpdateType.SWAP_OUT) {
                            final String newLocation = serde.getLocation(record);
                            if (newLocation == null) {
                                logger.error("Received Record (ID=" + recordIdentifier + ") with UpdateType of SWAP_OUT but no indicator of where the Record is to be Swapped Out to; these records may be lost when the repository is restored!");
                            } else {
                                recordMap.remove(recordIdentifier);
                                this.externalLocations.add(newLocation);
                            }
                        } else if (updateType == UpdateType.SWAP_IN) {
                            final String newLocation = serde.getLocation(record);
                            if (newLocation == null) {
                                logger.error("Received Record (ID=" + recordIdentifier + ") with UpdateType of SWAP_IN but no indicator of where the Record is to be Swapped In from; these records may be duplicated when the repository is restored!");
                            } else {
                                externalLocations.remove(newLocation);
                            }
                            recordMap.put(recordIdentifier, record);
                        } else {
                            recordMap.put(recordIdentifier, record);
                        }
                    }

                    return resolvedIdx;
                }
            }
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Restores the repository state from the snapshot and the journals, then
     * performs a checkpoint. Must be called before the first call to
     * {@link #update(Collection, boolean)}.
     *
     * @return a view of the values of the internal record map
     * @throws IOException if the snapshot or journals cannot be read, or the
     * checkpoint fails
     * @throws IllegalStateException if the repository has already been updated
     */
    @Override
    public Collection<T> recoverRecords() throws IOException {
        if (updated) {
            throw new IllegalStateException("Cannot recover records after updating the repository; must call recoverRecords first");
        }

        final long recoverStart = System.nanoTime();
        writeLock.lock();
        try {
            Long maxTransactionId = recoverFromSnapshot(recordMap);
            recoverFromEdits(recordMap, maxTransactionId);

            for (final Partition<T> partition : partitions) {
                final long transId = partition.getMaxRecoveredTransactionId();
                if (maxTransactionId == null || transId > maxTransactionId) {
                    maxTransactionId = transId;
                }
            }

            // Guard the unboxing; with no snapshot and no journals, start at 0.
            final long nextTransactionId = (maxTransactionId == null) ? 0L : maxTransactionId + 1;
            this.transactionIdGenerator.set(nextTransactionId);
            this.externalLocations.addAll(recoveredExternalLocations);
            logger.info("{} finished recovering records. Performing Checkpoint to ensure proper state of Partitions before updates", this);
        } finally {
            writeLock.unlock();
        }
        final long recoverNanos = System.nanoTime() - recoverStart;
        final long recoveryMillis = TimeUnit.MILLISECONDS.convert(recoverNanos, TimeUnit.NANOSECONDS);
        logger.info("Successfully recovered {} records in {} milliseconds", recordMap.size(), recoveryMillis);
        checkpoint();

        recovered = true;
        return recordMap.values();
    }

    /**
     * @return the swap locations that were found in the snapshot and journals
     * during recovery
     */
    @Override
    public Set<String> getRecoveredSwapLocations() throws IOException {
        return recoveredExternalLocations;
    }

    /**
     * Loads the records and swap locations stored in the snapshot file, first
     * resolving any leftover partial snapshot from an interrupted checkpoint.
     *
     * @param recordMap the map to populate with the recovered records
     * @return the maximum transaction ID covered by the snapshot, or
     * {@code null} if there is no usable snapshot
     * @throws IOException if the snapshot cannot be read or was written by an
     * incompatible implementation or version
     */
    private Long recoverFromSnapshot(final Map<Object, T> recordMap) throws IOException {
        final boolean partialExists = Files.exists(partialPath);
        final boolean snapshotExists = Files.exists(snapshotPath);

        if (!partialExists && !snapshotExists) {
            return null;
        }

        if (partialExists && snapshotExists) {
            // both files exist -- assume we failed while checkpointing. Delete
            // the partial file
            Files.delete(partialPath);
        } else if (partialExists) {
            // partial exists but snapshot does not -- we must have completed
            // creating the partial, deleted the snapshot
            // but crashed before renaming the partial to the snapshot. Just
            // rename partial to snapshot
            Files.move(partialPath, snapshotPath);
        }

        if (Files.size(snapshotPath) == 0) {
            logger.warn("{} Found 0-byte Snapshot file; skipping Snapshot file in recovery", this);
            return null;
        }

        // at this point, we know the snapshotPath exists because if it didn't, then we either returned null
        // or we renamed partialPath to snapshotPath. So just Recover from snapshotPath.
        try (final DataInputStream dataIn = new DataInputStream(new BufferedInputStream(Files.newInputStream(snapshotPath, StandardOpenOption.READ)))) {
            final String waliImplementationClass = dataIn.readUTF();
            final int waliImplementationVersion = dataIn.readInt();

            if (!waliImplementationClass.equals(MinimalLockingWriteAheadLog.class.getName())) {
                throw new IOException("Write-Ahead Log located at " + snapshotPath + " was written using the " + waliImplementationClass + " class; cannot restore using " + getClass().getName());
            }

            if (waliImplementationVersion > getVersion()) {
                throw new IOException("Write-Ahead Log located at " + snapshotPath + " was written using version " + waliImplementationVersion + " of the " + waliImplementationClass + " class; cannot restore using Version " + getVersion());
            }

            dataIn.readUTF(); // ignore serde class name for now
            final int serdeVersion = dataIn.readInt();
            final long maxTransactionId = dataIn.readLong();
            final int numRecords = dataIn.readInt();

            for (int i = 0; i < numRecords; i++) {
                final T record = serde.deserializeRecord(dataIn, serdeVersion);
                if (record == null) {
                    throw new EOFException();
                }

                final UpdateType updateType = serde.getUpdateType(record);
                if (updateType == UpdateType.DELETE) {
                    logger.warn("While recovering from snapshot, found record with type 'DELETE'; this record will not be restored");
                    continue;
                }

                logger.trace("Recovered from snapshot: {}", record);
                recordMap.put(serde.getRecordIdentifier(record), record);
            }

            final int numSwapRecords = dataIn.readInt();
            final Set<String> swapLocations = new HashSet<>();
            for (int i = 0; i < numSwapRecords; i++) {
                swapLocations.add(dataIn.readUTF());
            }
            this.recoveredExternalLocations.addAll(swapLocations);

            logger.debug("{} restored {} Records and {} Swap Files from Snapshot, ending with Transaction ID {}", new Object[]{this, numRecords, recoveredExternalLocations.size(), maxTransactionId});
            return maxTransactionId;
        }
    }

    /**
     * Recovers records from the edit logs via the Partitions, replaying the
     * transactions of all partitions in ascending transaction-ID order and
     * skipping those that are already reflected in the snapshot.
     *
     * @param modifiableRecordMap the record map to apply the recovered
     * transactions to
     * @param maxTransactionIdRestored the maximum transaction ID restored from
     * the snapshot, or {@code null} if no snapshot was restored
     * @throws IOException if a journal cannot be read
     */
    private void recoverFromEdits(final Map<Object, T> modifiableRecordMap, final Long maxTransactionIdRestored) throws IOException {
        final Map<Object, T> updateMap = new HashMap<>();
        final Map<Object, T> unmodifiableRecordMap = Collections.unmodifiableMap(modifiableRecordMap);
        final Map<Object, T> ignorableMap = new HashMap<>();
        final Set<String> ignorableSwapLocations = new HashSet<>();

        // populate a map of the next transaction id for each partition to the
        // partition that has that next transaction id.
        final SortedMap<Long, Partition<T>> transactionMap = new TreeMap<>();
        for (final Partition<T> partition : partitions) {
            Long transactionId;
            boolean keepTransaction;
            do {
                transactionId = partition.getNextRecoverableTransactionId();

                keepTransaction = transactionId == null || maxTransactionIdRestored == null || transactionId > maxTransactionIdRestored;
                if (keepTransaction && transactionId != null) {
                    // map this transaction id to its partition so that we can
                    // start restoring transactions from this partition,
                    // starting at 'transactionId'
                    transactionMap.put(transactionId, partition);
                } else if (transactionId != null) {
                    // skip the next transaction, because our snapshot already
                    // contained this transaction.
                    try {
                        partition.recoverNextTransaction(ignorableMap, updateMap, ignorableSwapLocations);
                    } catch (final EOFException e) {
                        logger.error("{} unexpectedly reached End of File while reading from {} for Transaction {}; assuming crash and ignoring this transaction.",
                                new Object[]{this, partition, transactionId});
                    }
                }
            } while (!keepTransaction);
        }

        // Repeatedly replay the lowest pending transaction, then queue the next
        // transaction of the partition it came from.
        while (!transactionMap.isEmpty()) {
            final Map.Entry<Long, Partition<T>> firstEntry = transactionMap.entrySet().iterator().next();
            final Long firstTransactionId = firstEntry.getKey();
            final Partition<T> nextPartition = firstEntry.getValue();

            try {
                updateMap.clear();
                final Set<Object> idsRemoved = nextPartition.recoverNextTransaction(unmodifiableRecordMap, updateMap, recoveredExternalLocations);
                modifiableRecordMap.putAll(updateMap);
                for (final Object id : idsRemoved) {
                    modifiableRecordMap.remove(id);
                }
            } catch (final EOFException e) {
                logger.error("{} unexpectedly reached End-of-File when reading from {} for Transaction ID {}; assuming crash and ignoring this transaction",
                        new Object[]{this, nextPartition, firstTransactionId});
            }

            transactionMap.remove(firstTransactionId);

            Long subsequentTransactionId = null;
            try {
                subsequentTransactionId = nextPartition.getNextRecoverableTransactionId();
            } catch (final IOException e) {
                logger.error("{} unexpectedly found End-of-File when reading from {} for Transaction ID {}; assuming crash and ignoring this transaction",
                        new Object[]{this, nextPartition, firstTransactionId});
            }

            if (subsequentTransactionId != null) {
                transactionMap.put(subsequentTransactionId, nextPartition);
            }
        }

        for (final Partition<T> partition : partitions) {
            partition.endRecovery();
        }
    }

    /**
     * Writes a snapshot of the current records and swap locations, replaces
     * the previous snapshot with it and removes the journals that the snapshot
     * supersedes.
     *
     * @return the number of records written to the snapshot
     * @throws IOException if the partitions cannot be rolled over or the
     * snapshot cannot be completely written and synced; in that case the
     * previous snapshot is left in place
     */
    @Override
    public synchronized int checkpoint() throws IOException {
        final Set<T> records;
        final Set<String> swapLocations;
        final long maxTransactionId;

        final long startNanos = System.nanoTime();

        writeLock.lock();
        final long stopTheWorldStart = System.nanoTime();
        try {
            // stop the world while we make a copy of the records that must
            // be checkpointed and rollover the partitions.
            // We copy the records because serializing them is potentially
            // very expensive, especially when we have hundreds
            // of thousands or even millions of them. We don't want to
            // prevent WALI from being used during this time.

            // So the design is to copy all of the records, determine the
            // last transaction ID that the records represent,
            // and roll over the partitions to new write-ahead logs.
            // Then, outside of the write lock, we will serialize the data
            // to disk, and then remove the old Partition data.
            records = new HashSet<>(recordMap.values());
            maxTransactionId = transactionIdGenerator.get() - 1;

            swapLocations = new HashSet<>(externalLocations);
            for (final Partition<T> partition : partitions) {
                partition.rollover();
            }

            // notify global sync with the write lock held. We do this because we don't want the repository to get updated
            // while the listener is performing its necessary tasks
            if (syncListener != null) {
                syncListener.onGlobalSync();
            }
        } finally {
            writeLock.unlock();
        }

        final long stopTheWorldNanos = System.nanoTime() - stopTheWorldStart;

        // perform checkpoint, writing to .partial file. Any failure to write,
        // flush, sync or close must propagate: the existing snapshot may only be
        // replaced by a partial file that is known to be complete on disk.
        try (final FileOutputStream fileOut = new FileOutputStream(partialPath.toFile());
                final DataOutputStream dataOut = new DataOutputStream(new BufferedOutputStream(fileOut))) {
            dataOut.writeUTF(MinimalLockingWriteAheadLog.class.getName());
            dataOut.writeInt(getVersion());
            dataOut.writeUTF(serde.getClass().getName());
            dataOut.writeInt(serde.getVersion());
            dataOut.writeLong(maxTransactionId);
            dataOut.writeInt(records.size());

            for (final T record : records) {
                logger.trace("Checkpointing {}", record);
                serde.serializeRecord(record, dataOut);
            }

            dataOut.writeInt(swapLocations.size());
            for (final String swapLocation : swapLocations) {
                dataOut.writeUTF(swapLocation);
            }

            // Push buffered bytes to the file before asking the OS to sync it.
            dataOut.flush();
            fileOut.getFD().sync();
        }

        // delete the snapshot, if it exists, and rename the .partial to
        // snapshot
        Files.deleteIfExists(snapshotPath);
        Files.move(partialPath, snapshotPath);

        // clear all of the edit logs
        final long partitionStart = System.nanoTime();
        for (final Partition<T> partition : partitions) {
            // we can call clearOld without claiming the partition because it
            // does not change the partition's state
            // and the only member variable it touches cannot be modified, other
            // than when #rollover() is called.
            // And since this method is the only one that calls #rollover() and
            // this method is synchronized,
            // the value of that member variable will not change.
            partition.clearOld();
        }
        final long partitionEnd = System.nanoTime();
        numberBlackListedPartitions.set(0);

        final long endNanos = System.nanoTime();
        final long millis = TimeUnit.MILLISECONDS.convert(endNanos - startNanos, TimeUnit.NANOSECONDS);
        final long partitionMillis = TimeUnit.MILLISECONDS.convert(partitionEnd - partitionStart, TimeUnit.NANOSECONDS);
        final long stopTheWorldMillis = TimeUnit.NANOSECONDS.toMillis(stopTheWorldNanos);

        logger.info("{} checkpointed with {} Records and {} Swap Files in {} milliseconds (Stop-the-world time = {} milliseconds, Clear Edit Logs time = {} millis), max Transaction ID {}",
                new Object[]{this, records.size(), swapLocations.size(), millis, stopTheWorldMillis, partitionMillis, maxTransactionId});

        return records.size();
    }

    /**
     * Closes every partition and releases the repository lock file.
     *
     * @throws IOException if the lock file's channel cannot be closed
     */
    @Override
    public void shutdown() throws IOException {
        writeLock.lock();
        try {
            for (final Partition<T> partition : partitions) {
                partition.close();
            }
        } finally {
            writeLock.unlock();
            lockChannel.close();
        }
    }

    /**
     * @return the version of the on-disk format written by this implementation
     */
    public int getVersion() {
        return 1;
    }

    /**
     * Represents a partition of this repository, which maps directly to a
     * directory of .journal files.
     *
     * {@link #update(Collection, long, Map, boolean)} MUST be called while
     * holding the claim obtained via {@link #tryClaim()} and released via
     * {@link #releaseClaim()}. {@link #rollover()} and {@link #blackList()}
     * acquire the partition's lock themselves.
     *
     * @param <S> the type of record stored in the partition
     */
    private static class Partition<S> {

        public static final String JOURNAL_EXTENSION = ".journal";
        private static final Pattern JOURNAL_FILENAME_PATTERN = Pattern.compile("\\d+\\.journal");

        private final SerDe<S> serde;

        private final Path editDirectory;
        private final int writeAheadLogVersion;

        private final Lock lock = new ReentrantLock();
        private DataOutputStream dataOut = null;
        private FileOutputStream fileOut = null;
        private boolean blackListed = false;
        private boolean closed = false;
        private DataInputStream recoveryIn;
        private int recoveryVersion;
        private String currentJournalFilename = "";

        private static final byte TRANSACTION_CONTINUE = 1;
        private static final byte TRANSACTION_COMMIT = 2;

        private final String description;
        private final AtomicLong maxTransactionId = new AtomicLong(-1L);
        private final Logger logger = LoggerFactory.getLogger(MinimalLockingWriteAheadLog.class);

        private final Queue<Path> recoveryFiles;

        /**
         * Creates the partition, creating its directory if necessary and
         * queueing the journal files already present for recovery.
         *
         * @param path the directory holding this partition's journals
         * @param serde the serializer/deserializer for records
         * @param partitionIndex the index of this partition, used for its
         * description
         * @param writeAheadLogVersion the version written to journal headers
         * @throws IOException if the directory cannot be created
         */
        public Partition(final Path path, final SerDe<S> serde, final int partitionIndex, final int writeAheadLogVersion) throws IOException {
            this.editDirectory = path;
            this.serde = serde;

            final File file = path.toFile();
            if (!file.exists() && !file.mkdirs()) {
                throw new IOException("Could not create directory " + file.getAbsolutePath());
            }

            this.recoveryFiles = new LinkedBlockingQueue<>();
            for (final Path recoveryPath : getRecoveryPaths()) {
                recoveryFiles.add(recoveryPath);
            }

            this.description = "Partition-" + partitionIndex;
            this.writeAheadLogVersion = writeAheadLogVersion;
        }

        /**
         * Attempts to claim this partition without blocking.
         *
         * @return {@code true} if the claim was obtained, in which case the
         * caller must call {@link #releaseClaim()}; {@code false} if the
         * partition is in use or blacklisted
         */
        public boolean tryClaim() {
            final boolean obtainedLock = lock.tryLock();
            if (!obtainedLock) {
                return false;
            }

            // Check if the partition is blacklisted. If so, unlock it and return false. Otherwise,
            // leave it locked and return true, so that the caller will need to unlock.
            if (blackListed) {
                lock.unlock();
                return false;
            }

            return true;
        }

        /**
         * Releases the claim obtained via {@link #tryClaim()}.
         */
        public void releaseClaim() {
            lock.unlock();
        }

        /**
         * Closes the current journal on a best-effort basis and marks this
         * partition as closed.
         */
        public void close() {
            final DataOutputStream out = dataOut;
            if (out != null) {
                try {
                    out.close();
                } catch (final Exception e) {
                    // Best effort: shutting down must not fail, but do not hide the problem.
                    logger.warn("Failed to close journal of {} due to {}", this, e.toString(), e);
                }
            }

            this.closed = true;
            this.dataOut = null;
            this.fileOut = null;
        }

        /**
         * Marks this partition as unusable until the next {@link #rollover()}.
         */
        public void blackList() {
            lock.lock();
            try {
                blackListed = true;
            } finally {
                lock.unlock();
            }
            logger.debug("Blacklisted {}", this);
        }

        /**
         * Closes resources pointing to the current journal and begins writing
         * to a new one. Also clears the blacklisted flag.
         *
         * @throws IOException if the current journal cannot be closed or the
         * new journal cannot be created
         */
        public void rollover() throws IOException {
            lock.lock();
            try {
                final DataOutputStream out = dataOut;
                if (out != null) {
                    out.close();
                }

                final Path editPath = getNewEditPath();
                openJournal(editPath);

                currentJournalFilename = editPath.toFile().getName();

                blackListed = false;
            } finally {
                lock.unlock();
            }
        }

        /**
         * Creates the journal file at the given path, writes the journal
         * header and makes the file the target of subsequent updates. If the
         * header cannot be written, the file stream is closed and the current
         * output streams are left untouched.
         */
        private void openJournal(final Path editPath) throws IOException {
            final FileOutputStream fos = new FileOutputStream(editPath.toFile());
            try {
                final DataOutputStream outStream = new DataOutputStream(new BufferedOutputStream(fos));
                outStream.writeUTF(MinimalLockingWriteAheadLog.class.getName());
                outStream.writeInt(writeAheadLogVersion);
                outStream.writeUTF(serde.getClass().getName());
                outStream.writeInt(serde.getVersion());
                outStream.flush();
                dataOut = outStream;
                fileOut = fos;
            } catch (final IOException | RuntimeException e) {
                try {
                    fos.close();
                } catch (final IOException closeFailure) {
                    e.addSuppressed(closeFailure);
                }
                throw e;
            }
        }

        /**
         * @return the numeric portion of the given journal file's name, i.e.
         * everything before the first '.'
         */
        private long getJournalIndex(final File file) {
            final String filename = file.getName();
            final int dotIndex = filename.indexOf(".");
            final String number = filename.substring(0, dotIndex);
            return Long.parseLong(number);
        }

        /**
         * @return the path of the next journal: one past the highest index
         * among the existing non-empty journals, or index 1 if there are none
         */
        private Path getNewEditPath() {
            final List<Path> recoveryPaths = getRecoveryPaths();
            final long newIndex;
            if (recoveryPaths == null || recoveryPaths.isEmpty()) {
                newIndex = 1;
            } else {
                final long lastFileIndex = getJournalIndex(recoveryPaths.get(recoveryPaths.size() - 1).toFile());
                newIndex = lastFileIndex + 1;
            }

            return editDirectory.resolve(newIndex + JOURNAL_EXTENSION);
        }

        /**
         * @return the non-empty journal files in this partition's directory,
         * sorted by the numeric portion of the filename
         */
        private List<Path> getRecoveryPaths() {
            final List<Path> paths = new ArrayList<>();

            final File directory = editDirectory.toFile();
            final File[] partitionFiles = directory.listFiles();
            if (partitionFiles == null) {
                return paths;
            }

            for (final File file : partitionFiles) {
                // if file is a journal file but no data has yet been persisted, it may
                // very well be a 0-byte file (the journal is not SYNC'ed to disk after
                // a header is written out, so it may be lost). In this case, the journal
                // is empty, so we can just skip it.
                if (file.isDirectory() || file.length() == 0L) {
                    continue;
                }

                if (!JOURNAL_FILENAME_PATTERN.matcher(file.getName()).matches()) {
                    continue;
                }

                if (isJournalFile(file)) {
                    paths.add(file.toPath());
                } else {
                    logger.warn("Found file {}, but could not access it, or it was not in the expected format; will ignore this file", file.getAbsolutePath());
                }
            }

            // Sort journal files by the numeric portion of the filename
            Collections.sort(paths, new Comparator<Path>() {
                @Override
                public int compare(final Path o1, final Path o2) {
                    if (o1 == null && o2 == null) {
                        return 0;
                    }
                    if (o1 == null) {
                        return 1;
                    }
                    if (o2 == null) {
                        return -1;
                    }

                    final long index1 = getJournalIndex(o1.toFile());
                    final long index2 = getJournalIndex(o2.toFile());
                    return Long.compare(index1, index2);
                }
            });

            return paths;
        }

        /**
         * Deletes every journal file of this partition except the one that is
         * currently being written to.
         */
        void clearOld() {
            final List<Path> oldRecoveryFiles = getRecoveryPaths();

            for (final Path path : oldRecoveryFiles) {
                final File file = path.toFile();
                if (file.getName().equals(currentJournalFilename)) {
                    continue;
                }
                if (file.exists() && !file.delete()) {
                    logger.warn("{} could not delete old journal file {}", this, file.getAbsolutePath());
                }
            }
        }

        /**
         * @return {@code true} if the file can be read and starts with the
         * header written by this Write-Ahead Log implementation
         */
        private boolean isJournalFile(final File file) {
            final String expectedStartsWith = MinimalLockingWriteAheadLog.class.getName();
            try {
                try (final FileInputStream fis = new FileInputStream(file);
                        final InputStream bufferedIn = new BufferedInputStream(fis);
                        final DataInputStream in = new DataInputStream(bufferedIn)) {
                    final String waliImplClassName = in.readUTF();
                    if (!expectedStartsWith.equals(waliImplClassName)) {
                        return false;
                    }
                }
            } catch (final IOException e) {
                return false;
            }

            return true;
        }

        /**
         * Appends the given records to the current journal as one transaction.
         * Each edit is followed by a flag byte: {@code TRANSACTION_CONTINUE}
         * if more edits follow, {@code TRANSACTION_COMMIT} after the last one.
         *
         * @param records the records to write
         * @param transactionId the ID written ahead of the edits
         * @param recordMap the current records, used to look up the previous
         * version of each record for the serde
         * @param forceSync whether to sync the journal file to disk
         * @throws IOException if the journal cannot be written
         * @throws IllegalStateException if this partition has been closed
         */
        public void update(final Collection<S> records, final long transactionId, final Map<Object, S> recordMap, final boolean forceSync) throws IOException {
            if (this.closed) {
                throw new IllegalStateException("Partition is closed");
            }

            final DataOutputStream out = dataOut;
            out.writeLong(transactionId);

            final int numEditsToSerialize = records.size();
            int editsSerialized = 0;
            for (final S record : records) {
                final Object recordId = serde.getRecordIdentifier(record);
                final S previousVersion = recordMap.get(recordId);

                serde.serializeEdit(previousVersion, record, out);
                if (++editsSerialized < numEditsToSerialize) {
                    out.write(TRANSACTION_CONTINUE);
                } else {
                    out.write(TRANSACTION_COMMIT);
                }
            }

            out.flush();

            if (forceSync) {
                fileOut.getFD().sync();
            }
        }

        private DataInputStream createDataInputStream(final Path path) throws IOException {
            return new DataInputStream(new BufferedInputStream(Files.newInputStream(path)));
        }

        /**
         * Closes the stream of the journal currently being recovered, if any,
         * on a best-effort basis.
         */
        private void closeRecoveryStream() {
            final DataInputStream in = recoveryIn;
            recoveryIn = null;
            if (in != null) {
                try {
                    in.close();
                } catch (final IOException e) {
                    // The journal has been fully consumed or rejected; failing to close
                    // it must not abort recovery.
                    logger.warn("{} failed to close recovery stream due to {}", this, e.toString(), e);
                }
            }
        }

        /**
         * Returns the stream of the journal currently being recovered,
         * advancing to the next queued journal (and consuming its header) when
         * the current one is exhausted.
         *
         * @return a stream with more data to read, or {@code null} if every
         * queued journal has been consumed
         * @throws IOException if a journal cannot be read or was written by a
         * newer version
         */
        private DataInputStream getRecoveryStream() throws IOException {
            if (recoveryIn != null && hasMoreData(recoveryIn)) {
                return recoveryIn;
            }

            while (true) {
                // Release the exhausted, empty or rejected journal before moving on.
                closeRecoveryStream();

                final Path nextRecoveryPath = recoveryFiles.poll();
                if (nextRecoveryPath == null) {
                    return null;
                }

                recoveryIn = createDataInputStream(nextRecoveryPath);
                if (!hasMoreData(recoveryIn)) {
                    continue;
                }

                final String waliImplementationClass = recoveryIn.readUTF();
                if (!MinimalLockingWriteAheadLog.class.getName().equals(waliImplementationClass)) {
                    continue;
                }

                final int waliVersion = recoveryIn.readInt();
                if (waliVersion > writeAheadLogVersion) {
                    closeRecoveryStream();
                    throw new IOException("Cannot recovery from file " + nextRecoveryPath + " because it was written using WALI version " + waliVersion + ", but the version used to restore it is only " + writeAheadLogVersion);
                }

                @SuppressWarnings("unused")
                final String serdeClassName = recoveryIn.readUTF();
                this.recoveryVersion = recoveryIn.readInt();

                return recoveryIn;
            }
        }

        /**
         * Reads the ID of the next transaction available for recovery and
         * records it as the maximum transaction ID seen by this partition.
         *
         * @return the next transaction ID, or {@code null} if there are no
         * more transactions to recover
         * @throws IOException if a journal cannot be read
         */
        public Long getNextRecoverableTransactionId() throws IOException {
            while (true) {
                final DataInputStream recoveryStream = getRecoveryStream();
                if (recoveryStream == null) {
                    return null;
                }

                final long transactionId;
                try {
                    transactionId = recoveryStream.readLong();
                } catch (final EOFException e) {
                    // Truncated transaction ID; move on to the next journal, if any.
                    continue;
                }

                this.maxTransactionId.set(transactionId);
                return transactionId;
            }
        }

        /**
         * @return {@code true} if at least one more byte can be read from the
         * stream; the stream's position is left unchanged
         */
        private boolean hasMoreData(final InputStream in) throws IOException {
            in.mark(1);
            final int nextByte = in.read();
            in.reset();
            return nextByte >= 0;
        }

        /**
         * Closes the recovery stream and opens a new journal for updates.
         *
         * @throws IOException if the recovery stream cannot be closed or the
         * new journal cannot be created
         * @throws IllegalStateException if there are journals that have not
         * been recovered yet
         */
        public void endRecovery() throws IOException {
            if (recoveryIn != null) {
                recoveryIn.close();
            }

            final Path nextRecoveryPath = this.recoveryFiles.poll();
            if (nextRecoveryPath != null) {
                throw new IllegalStateException("Signaled to end recovery, but there are more recovery files for Partition in directory " + editDirectory);
            }

            openJournal(getNewEditPath());
        }

        /**
         * Reads the edits of the next transaction from the recovery stream and
         * applies them to the given maps. Must only be called after
         * {@link #getNextRecoverableTransactionId()} has returned a non-null
         * value.
         *
         * @param currentRecordMap the current records, passed to the serde
         * when deserializing each edit
         * @param updatedRecordMap receives the records created or updated by
         * the transaction
         * @param swapLocations the set of swap locations to add to (SWAP_OUT)
         * or remove from (SWAP_IN)
         * @return the identifiers of the records removed by the transaction
         * @throws EOFException if the journal ends before the transaction's
         * commit flag
         * @throws IOException if the journal cannot be read
         */
        public Set<Object> recoverNextTransaction(final Map<Object, S> currentRecordMap, final Map<Object, S> updatedRecordMap, final Set<String> swapLocations) throws IOException {
            final Set<Object> idsRemoved = new HashSet<>();

            int transactionFlag;
            do {
                final S record = serde.deserializeEdit(recoveryIn, currentRecordMap, recoveryVersion);
                if (logger.isTraceEnabled()) {
                    logger.trace("{} Recovering Transaction {}: {}", new Object[]{this, maxTransactionId.get(), record});
                }

                final Object recordId = serde.getRecordIdentifier(record);
                final UpdateType updateType = serde.getUpdateType(record);
                if (updateType == UpdateType.DELETE) {
                    updatedRecordMap.remove(recordId);
                    idsRemoved.add(recordId);
                } else if (updateType == UpdateType.SWAP_IN) {
                    final String location = serde.getLocation(record);
                    if (location == null) {
                        logger.error("Recovered SWAP_IN record from edit log, but it did not contain a Location; skipping record");
                    } else {
                        swapLocations.remove(location);
                        updatedRecordMap.put(recordId, record);
                        idsRemoved.remove(recordId);
                    }
                } else if (updateType == UpdateType.SWAP_OUT) {
                    final String location = serde.getLocation(record);
                    if (location == null) {
                        logger.error("Recovered SWAP_OUT record from edit log, but it did not contain a Location; skipping record");
                    } else {
                        swapLocations.add(location);
                        updatedRecordMap.remove(recordId);
                        idsRemoved.add(recordId);
                    }
                } else {
                    updatedRecordMap.put(recordId, record);
                    idsRemoved.remove(recordId);
                }

                transactionFlag = recoveryIn.read();
                if (transactionFlag < 0) {
                    // The journal was truncated before the commit flag; report it the
                    // way callers expect instead of trying to read another edit.
                    throw new EOFException("Reached end of journal before Transaction " + maxTransactionId.get() + " was committed");
                }
            } while (transactionFlag != TRANSACTION_COMMIT);

            return idsRemoved;
        }

        /**
         * Must be called after recovery has finished
         *
         * @return the ID of the last transaction read during recovery, or -1
         * if none was read
         */
        public long getMaxRecoveredTransactionId() {
            return maxTransactionId.get();
        }

        @Override
        public String toString() {
            return description;
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/wali/src/main/java/org/wali/SerDe.java ---------------------------------------------------------------------- diff --git a/commons/wali/src/main/java/org/wali/SerDe.java b/commons/wali/src/main/java/org/wali/SerDe.java new file mode 100644 index 0000000..bbc7efb --- /dev/null +++ b/commons/wali/src/main/java/org/wali/SerDe.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.wali; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.Map; + +/** + * A mechanism for Serializing and De-Serializing a Record of a given Type + * + * @param <T> the type of record that is to be Serialized and De-Serialized by + * this object + */ +public interface SerDe<T> { + + /** + * <p> + * Serializes an Edit Record to the log via the given + * {@link DataOutputStream}. 
+ * </p> + * + * @param previousRecordState + * @param newRecordState + * @param out + * @throws IOException + */ + void serializeEdit(T previousRecordState, T newRecordState, DataOutputStream out) throws IOException; + + /** + * <p> + * Serializes a Record in a form suitable for a Snapshot via the given + * {@link DataOutputStream}. + * </p> + * + * @param record + * @param out + * @throws IOException + */ + void serializeRecord(T record, DataOutputStream out) throws IOException; + + /** + * <p> + * Reads an Edit Record from the given {@link DataInputStream} and merges + * that edit with the current version of the record, returning the new, + * merged version. If the Edit Record indicates that the entity was deleted, + * must return a Record with an UpdateType of {@link UpdateType#DELETE}. + * This method must never return <code>null</code>. + * </p> + * + * @param in + * @param currentRecordStates an unmodifiable map of Record ID's to the + * current state of that record + * @param version the version of the SerDe that was used to serialize the + * edit record + * @return + * @throws IOException + */ + T deserializeEdit(DataInputStream in, Map<Object, T> currentRecordStates, int version) throws IOException; + + /** + * <p> + * Reads a Record from the given {@link DataInputStream} and returns this + * record. If no data is available, returns <code>null</code>. 
+ * </p> + * + * @param in + * @param version the version of the SerDe that was used to serialize the + * record + * @return + * @throws IOException + */ + T deserializeRecord(DataInputStream in, int version) throws IOException; + + /** + * Returns the unique ID for the given record + * + * @param record + * @return + */ + Object getRecordIdentifier(T record); + + /** + * Returns the UpdateType for the given record + * + * @param record + * @return + */ + UpdateType getUpdateType(T record); + + /** + * Returns the external location of the given record; this is used when a + * record is moved away from WALI or is being re-introduced to WALI. For + * example, WALI can be updated with a record of type + * {@link UpdateType#SWAP_OUT} that indicates a Location of + * file://tmp/external1 and can then be re-introduced to WALI by updating + * WALI with a record of type {@link UpdateType#SWAP_IN} that indicates a + * Location of file://tmp/external1 + * + * @param record + * @return + */ + String getLocation(T record); + + /** + * Returns the version that this SerDe will use when writing. This is used + * when serializing/deserializing the edit logs so that if the version + * changes, we are still able to deserialize old versions + * + * @return + */ + int getVersion(); +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/wali/src/main/java/org/wali/SyncListener.java ---------------------------------------------------------------------- diff --git a/commons/wali/src/main/java/org/wali/SyncListener.java b/commons/wali/src/main/java/org/wali/SyncListener.java new file mode 100644 index 0000000..ffb11ca --- /dev/null +++ b/commons/wali/src/main/java/org/wali/SyncListener.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.wali; + +/** + * <p> + * Provides a callback mechanism by which applicable listeners can be notified + * when a WriteAheadRepository is synched (via the + * {@link WriteAheadRepository#sync()} method) or one of its partitions is + * synched via + * {@link WriteAheadRepository#update(java.util.Collection, boolean)} with a + * value of <code>true</code> for the second argument. + * </p> + * + * <p> + * It is not required that an implementation of {@link WriteAheadRepository} + * support this interface. Those that do generally will require that the + * listener be injected via the constructor. + * </p> + * + * <p> + * All implementations of this interface must be thread-safe. + * </p> + * + * <p> + * The {@link #onSync(int)} method will always be called while the associated + * partition is locked. The {@link #onGlobalSync()} will always be called while + * the entire repository is locked. + * </p> + * + */ +public interface SyncListener { + + /** + * This method is called whenever a specific partition is synched via the + * {@link WriteAheadRepository#update(java.util.Collection, boolean)} method + * + * @param partitionIndex the index of the partition that was synched + */ + void onSync(int partitionIndex); + + /** + * This method is called whenever the entire + * <code>WriteAheadRepository</code> is synched via the + * {@link WriteAheadRepository#sync()} method. 
+ */ + void onGlobalSync(); +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/wali/src/main/java/org/wali/UpdateType.java ---------------------------------------------------------------------- diff --git a/commons/wali/src/main/java/org/wali/UpdateType.java b/commons/wali/src/main/java/org/wali/UpdateType.java new file mode 100644 index 0000000..1b039f8 --- /dev/null +++ b/commons/wali/src/main/java/org/wali/UpdateType.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.wali; + +/** + * <p> + * Enumerates the valid types of things that can cause a + * {@link WriteAheadRepository} to update its state</p> + */ +public enum UpdateType { + + /** + * Used when a new Record has been created + */ + CREATE, + /** + * Used when a Record has been updated in some way + */ + UPDATE, + /** + * Used to indicate that a Record has been deleted and should be removed + * from the Repository + */ + DELETE, + /** + * Used to indicate that a Record still exists but has been moved elsewhere, + * so that it is no longer maintained by the WALI instance + */ + SWAP_OUT, + /** + * Used to indicate that a Record that was previously Swapped Out is now + * being Swapped In + */ + SWAP_IN; +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/wali/src/main/java/org/wali/WriteAheadRepository.java ---------------------------------------------------------------------- diff --git a/commons/wali/src/main/java/org/wali/WriteAheadRepository.java b/commons/wali/src/main/java/org/wali/WriteAheadRepository.java new file mode 100644 index 0000000..4567872 --- /dev/null +++ b/commons/wali/src/main/java/org/wali/WriteAheadRepository.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.wali; + +import java.io.IOException; +import java.util.Collection; +import java.util.Set; + +/** + * <p> + * A WriteAheadRepository is used to persist state that is otherwise kept + * in-memory. The Repository does not provide any query capability except to + * allow the data to be recovered upon restart of the system. + * </p> + * + * <p> + * A WriteAheadRepository operates by writing every update to an Edit Log. On + * restart, the data can be recovered by replaying all of the updates that are + * found in the Edit Log. This can, however, eventually result in very large + * Edit Logs, which can both take up massive amounts of disk space and take a + * long time to recover. In order to prevent this, the Repository provides a + * Checkpointing capability. This allows the current in-memory state of the + * Repository to be flushed to disk and the Edit Log to be deleted, thereby + * compacting the amount of space required to store the Repository. After a + * Checkpoint is performed, modifications are again written to an Edit Log. At + * this point, when the system is to be restored, it is restored by first + * loading the Checkpointed version of the Repository and then replaying the + * Edit Log. + * </p> + * + * <p> + * All implementations of <code>WriteAheadRepository</code> use one or more + * partitions to manage their Edit Logs. An implementation may require exactly + * one partition or may allow many partitions. + * </p> + * + * @param <T> + */ +public interface WriteAheadRepository<T> { + + /** + * <p> + * Updates the repository with the specified Records. The Collection must + * not contain multiple records with the same ID + * </p> + * + * @param records the records to update + * @param forceSync specifies whether or not the Repository forces the data + * to be flushed to disk. 
If false, the data may be stored in Operating + * System buffers, which improves performance but could cause loss of data + * if power is lost or the Operating System crashes + * @throws IOException + * @throws IllegalArgumentException if multiple records within the given + * Collection have the same ID, as specified by {@link Record#getId()} + * method + * + * @return the index of the Partition that performed the update + */ + int update(Collection<T> records, boolean forceSync) throws IOException; + + /** + * <p> + * Recovers all records from the persisted state. This method must be called + * before any updates are issued to the Repository. + * </p> + * + * @return + * @throws IOException + * @throws IllegalStateException if any updates have been issued against + * this Repository before this method is invoked + */ + Collection<T> recoverRecords() throws IOException; + + /** + * <p> + * Recovers all External Swap locations that were persisted. If this method + * is to be called, it must be called AFTER {@link #recoverRecords()} and + * BEFORE {@link #update(Collection, boolean)}. + * </p> + * + * @return + * @throws IOException + */ + Set<String> getRecoveredSwapLocations() throws IOException; + + /** + * <p> + * Compacts the contents of the Repository so that rather than having a + * Snapshot and an Edit Log indicating many Updates to the Snapshot, the + * Snapshot is updated to contain the current state of the Repository, and + * the edit log is purged. + * </p> + * + * + * @return the number of records that were written to the new snapshot + * @throws java.io.IOException + */ + int checkpoint() throws IOException; + + /** + * <p> + * Causes the repository to checkpoint and then close any open resources.
+ * </p> + * + * @throws IOException + */ + void shutdown() throws IOException; +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/wali/src/test/java/org/wali/DummyRecord.java ---------------------------------------------------------------------- diff --git a/commons/wali/src/test/java/org/wali/DummyRecord.java b/commons/wali/src/test/java/org/wali/DummyRecord.java new file mode 100644 index 0000000..e0f7f96 --- /dev/null +++ b/commons/wali/src/test/java/org/wali/DummyRecord.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.wali;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/**
 * Minimal record used by the WALI tests: an ID, an {@link UpdateType} and a
 * mutable bag of String properties.
 */
public class DummyRecord {

    private final String id;
    private final UpdateType updateType;
    private final Map<String, String> props = new HashMap<>();

    public DummyRecord(final String id, final UpdateType updateType) {
        this.id = id;
        this.updateType = updateType;
    }

    public String getId() {
        return this.id;
    }

    public UpdateType getUpdateType() {
        return this.updateType;
    }

    /**
     * Replaces all properties of this record with the given ones.
     *
     * @param props the new properties
     * @return this record, to allow chaining
     */
    public DummyRecord setProperties(final Map<String, String> props) {
        final Map<String, String> target = this.props;
        target.clear();
        target.putAll(props);
        return this;
    }

    /**
     * Sets a single property, overwriting any previous value for the name.
     *
     * @param name the property name
     * @param value the property value
     * @return this record, to allow chaining
     */
    public DummyRecord setProperty(final String name, final String value) {
        props.put(name, value);
        return this;
    }

    /**
     * @return an unmodifiable view of this record's properties
     */
    public Map<String, String> getProperties() {
        return Collections.unmodifiableMap(props);
    }

    public String getProperty(final String name) {
        return this.props.get(name);
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/wali/src/test/java/org/wali/DummyRecordSerde.java ---------------------------------------------------------------------- diff --git a/commons/wali/src/test/java/org/wali/DummyRecordSerde.java b/commons/wali/src/test/java/org/wali/DummyRecordSerde.java new file mode 100644 index 0000000..8cc7860 --- /dev/null +++ b/commons/wali/src/test/java/org/wali/DummyRecordSerde.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.wali;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.util.Map;

/**
 * {@link SerDe} for {@link DummyRecord} used by the tests. It can be told to
 * start throwing IOExceptions from {@link #serializeEdit} once a given number
 * of edits has been attempted.
 */
public class DummyRecordSerde implements SerDe<DummyRecord> {

    public static final int NUM_UPDATE_TYPES = UpdateType.values().length;
    private int throwIOEAfterNserializeEdits = -1;
    private int serializeEditCount = 0;

    /**
     * @param n the number of serializeEdit calls that succeed before every
     * further call throws; a negative value disables the failure injection
     */
    public void setThrowIOEAfterNSerializeEdits(final int n) {
        this.throwIOEAfterNserializeEdits = n;
    }

    @Override
    public int getVersion() {
        return 1;
    }

    @Override
    public Object getRecordIdentifier(final DummyRecord record) {
        return record.getId();
    }

    @Override
    public UpdateType getUpdateType(final DummyRecord record) {
        return record.getUpdateType();
    }

    @Override
    public String getLocation(final DummyRecord record) {
        return null;
    }

    /**
     * Writes the update type's ordinal as a single byte, then the ID, and,
     * for anything but a DELETE, the property count followed by each
     * key/value pair.
     */
    @Override
    public void serializeEdit(final DummyRecord previousState, final DummyRecord record, final DataOutputStream out) throws IOException {
        // The call counter only advances while failure injection is enabled.
        if (throwIOEAfterNserializeEdits >= 0) {
            final int callsBeforeThisOne = serializeEditCount++;
            if (callsBeforeThisOne >= throwIOEAfterNserializeEdits) {
                throw new IOException("Serialized " + callsBeforeThisOne + " records successfully, so now it's time to throw IOE");
            }
        }

        final UpdateType type = record.getUpdateType();
        out.write(type.ordinal());
        out.writeUTF(record.getId());

        if (type == UpdateType.DELETE) {
            return;
        }

        final Map<String, String> properties = record.getProperties();
        out.writeInt(properties.size());
        for (final Map.Entry<String, String> property : properties.entrySet()) {
            final String key = property.getKey();
            final String value = property.getValue();
            out.writeUTF(key);
            out.writeUTF(value);
        }
    }

    @Override
    public void serializeRecord(final DummyRecord record, final DataOutputStream out) throws IOException {
        serializeEdit(null, record, out);
    }

    @Override
    public DummyRecord deserializeEdit(final DataInputStream in, final Map<Object, DummyRecord> currentVersion, final int version) throws IOException {
        return deserializeRecord(in, version);
    }

    /**
     * Reads a record in the layout produced by {@link #serializeEdit}.
     *
     * @throws EOFException if the stream is already at its end
     * @throws IOException if the update type byte is out of range
     */
    @Override
    public DummyRecord deserializeRecord(final DataInputStream in, final int version) throws IOException {
        final int typeOrdinal = in.read();
        if (typeOrdinal < 0) {
            throw new EOFException();
        }
        if (typeOrdinal >= NUM_UPDATE_TYPES) {
            throw new IOException("Corrupt stream; got UpdateType value of " + typeOrdinal + " but there are only " + NUM_UPDATE_TYPES + " valid values");
        }

        final UpdateType type = UpdateType.values()[typeOrdinal];
        final DummyRecord result = new DummyRecord(in.readUTF(), type);
        if (type == UpdateType.DELETE) {
            return result;
        }

        for (int remaining = in.readInt(); remaining > 0; remaining--) {
            final String key = in.readUTF();
            final String value = in.readUTF();
            result.setProperty(key, value);
        }
        return result;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/wali/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java ---------------------------------------------------------------------- diff --git a/commons/wali/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java b/commons/wali/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java new file mode 100644 index 0000000..57f3495 --- /dev/null +++ b/commons/wali/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java @@ -0,0 +1,298 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.wali;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.File;
import java.io.FileFilter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;

import org.junit.Assert;
import org.junit.Test;

/**
 * Tests for {@link MinimalLockingWriteAheadLog}: concurrent inserts followed
 * by recovery, recovery after a SerDe failure, behavior once every partition
 * has failed, and striping of partitions across several directories.
 */
public class TestMinimalLockingWriteAheadLog {

    /**
     * Inserts 100,000 records from 10 threads, each record in its own
     * transaction, then restores the repository and verifies every record.
     */
    @Test
    public void testWrite() throws IOException, InterruptedException {
        final int numPartitions = 8;

        final Path path = Paths.get("target/minimal-locking-repo");
        deleteRecursively(path.toFile());
        assertTrue(path.toFile().mkdirs());

        final DummyRecordSerde serde = new DummyRecordSerde();
        final WriteAheadRepository<DummyRecord> repo = new MinimalLockingWriteAheadLog<>(path, numPartitions, serde, null);
        final Collection<DummyRecord> initialRecs = repo.recoverRecords();
        assertTrue(initialRecs.isEmpty());

        final List<InsertThread> threads = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            threads.add(new InsertThread(10000, 1000000 * i, repo));
        }

        final long start = System.nanoTime();
        for (final InsertThread thread : threads) {
            thread.start();
        }
        for (final InsertThread thread : threads) {
            thread.join();
        }
        final long nanos = System.nanoTime() - start;
        final long millis = TimeUnit.MILLISECONDS.convert(nanos, TimeUnit.NANOSECONDS);
        System.out.println("Took " + millis + " millis to insert 100,000 records each in its own transaction");

        // A failure on a worker thread is invisible to JUnit unless it is
        // re-raised on the test thread.
        for (final InsertThread thread : threads) {
            thread.assertSucceeded();
        }
        repo.shutdown();

        final WriteAheadRepository<DummyRecord> recoverRepo = new MinimalLockingWriteAheadLog<>(path, numPartitions, serde, null);
        final Collection<DummyRecord> recoveredRecords = recoverRepo.recoverRecords();
        assertFalse(recoveredRecords.isEmpty());
        assertEquals(100000, recoveredRecords.size());
        for (final DummyRecord record : recoveredRecords) {
            final Map<String, String> recoveredProps = record.getProperties();
            assertEquals(1, recoveredProps.size());
            assertEquals("B", recoveredProps.get("A"));
        }
    }

    /**
     * Lets the SerDe fail in the middle of the third transaction and checks
     * that recovery yields the state left by the first two transactions.
     */
    @Test
    public void testRecoverAfterIOException() throws IOException {
        final int numPartitions = 5;
        final Path path = Paths.get("target/minimal-locking-repo-test-recover-after-ioe");
        deleteRecursively(path.toFile());
        Files.createDirectories(path);

        final DummyRecordSerde serde = new DummyRecordSerde();
        final WriteAheadRepository<DummyRecord> repo = new MinimalLockingWriteAheadLog<>(path, numPartitions, serde, null);
        final Collection<DummyRecord> initialRecs = repo.recoverRecords();
        assertTrue(initialRecs.isEmpty());

        serde.setThrowIOEAfterNSerializeEdits(7);   // serialize the 2 transactions, then the first edit of the third transaction; then throw IOException

        final List<DummyRecord> firstTransaction = new ArrayList<>();
        firstTransaction.add(new DummyRecord("1", UpdateType.CREATE));
        firstTransaction.add(new DummyRecord("2", UpdateType.CREATE));
        firstTransaction.add(new DummyRecord("3", UpdateType.CREATE));

        final List<DummyRecord> secondTransaction = new ArrayList<>();
        secondTransaction.add(new DummyRecord("1", UpdateType.UPDATE).setProperty("abc", "123"));
        secondTransaction.add(new DummyRecord("2", UpdateType.UPDATE).setProperty("cba", "123"));
        secondTransaction.add(new DummyRecord("3", UpdateType.UPDATE).setProperty("aaa", "123"));

        final List<DummyRecord> thirdTransaction = new ArrayList<>();
        thirdTransaction.add(new DummyRecord("1", UpdateType.DELETE));
        thirdTransaction.add(new DummyRecord("2", UpdateType.DELETE));

        repo.update(firstTransaction, true);
        repo.update(secondTransaction, true);
        try {
            repo.update(thirdTransaction, true);
            Assert.fail("Did not throw IOException on third transaction");
        } catch (final IOException e) {
            // expected behavior.
        }

        repo.shutdown();

        serde.setThrowIOEAfterNSerializeEdits(-1);
        final WriteAheadRepository<DummyRecord> recoverRepo = new MinimalLockingWriteAheadLog<>(path, numPartitions, serde, null);
        final Collection<DummyRecord> recoveredRecords = recoverRepo.recoverRecords();
        assertFalse(recoveredRecords.isEmpty());
        assertEquals(3, recoveredRecords.size());

        boolean record1 = false, record2 = false, record3 = false;
        for (final DummyRecord record : recoveredRecords) {
            switch (record.getId()) {
                case "1":
                    record1 = true;
                    assertEquals("123", record.getProperty("abc"));
                    break;
                case "2":
                    record2 = true;
                    assertEquals("123", record.getProperty("cba"));
                    break;
                case "3":
                    record3 = true;
                    assertEquals("123", record.getProperty("aaa"));
                    break;
            }
        }

        assertTrue(record1);
        assertTrue(record2);
        assertTrue(record3);
    }

    /**
     * Makes every update after the first transaction fail, then verifies
     * that a further update is rejected with the "All Partitions have been
     * blacklisted" message even though the SerDe works again, and that the
     * first transaction can still be recovered.
     */
    @Test
    public void testCannotModifyLogAfterAllAreBlackListed() throws IOException {
        final int numPartitions = 5;
        final Path path = Paths.get("target/minimal-locking-repo-test-cannot-modify-after-all-blacklisted");
        deleteRecursively(path.toFile());
        Files.createDirectories(path);

        final DummyRecordSerde serde = new DummyRecordSerde();
        final WriteAheadRepository<DummyRecord> repo = new MinimalLockingWriteAheadLog<>(path, numPartitions, serde, null);
        final Collection<DummyRecord> initialRecs = repo.recoverRecords();
        assertTrue(initialRecs.isEmpty());

        serde.setThrowIOEAfterNSerializeEdits(3);   // serialize the first transaction, then fail on all subsequent transactions

        final List<DummyRecord> firstTransaction = new ArrayList<>();
        firstTransaction.add(new DummyRecord("1", UpdateType.CREATE));
        firstTransaction.add(new DummyRecord("2", UpdateType.CREATE));
        firstTransaction.add(new DummyRecord("3", UpdateType.CREATE));

        final List<DummyRecord> secondTransaction = new ArrayList<>();
        secondTransaction.add(new DummyRecord("1", UpdateType.UPDATE).setProperty("abc", "123"));
        secondTransaction.add(new DummyRecord("2", UpdateType.UPDATE).setProperty("cba", "123"));
        secondTransaction.add(new DummyRecord("3", UpdateType.UPDATE).setProperty("aaa", "123"));

        final List<DummyRecord> thirdTransaction = new ArrayList<>();
        thirdTransaction.add(new DummyRecord("1", UpdateType.DELETE));
        thirdTransaction.add(new DummyRecord("2", UpdateType.DELETE));

        repo.update(firstTransaction, true);

        try {
            repo.update(secondTransaction, true);
            Assert.fail("Did not throw IOException on second transaction");
        } catch (final IOException e) {
            // expected behavior.
        }

        for (int i = 0; i < 4; i++) {
            try {
                repo.update(thirdTransaction, true);
                Assert.fail("Did not throw IOException on third transaction");
            } catch (final IOException e) {
                // expected behavior.
            }
        }

        serde.setThrowIOEAfterNSerializeEdits(-1);
        final List<DummyRecord> fourthTransaction = new ArrayList<>();
        fourthTransaction.add(new DummyRecord("1", UpdateType.DELETE));

        try {
            repo.update(fourthTransaction, true);
            Assert.fail("Successfully updated repo for 4th transaction");
        } catch (final IOException e) {
            // expected behavior
            assertTrue(e.getMessage().contains("All Partitions have been blacklisted"));
        }

        repo.shutdown();
        serde.setThrowIOEAfterNSerializeEdits(-1);

        final WriteAheadRepository<DummyRecord> recoverRepo = new MinimalLockingWriteAheadLog<>(path, numPartitions, serde, null);
        final Collection<DummyRecord> recoveredRecords = recoverRepo.recoverRecords();
        assertFalse(recoveredRecords.isEmpty());
        assertEquals(3, recoveredRecords.size());
    }

    /**
     * Spreads 6 partitions over 2 stripe directories and verifies that each
     * directory holds 3 partition directories with one journal file each.
     */
    @Test
    public void testStriping() throws IOException {
        final int numPartitions = 6;
        final Path path = Paths.get("target/minimal-locking-repo-striped");
        deleteRecursively(path.toFile());
        Files.createDirectories(path);

        final SortedSet<Path> paths = new TreeSet<>();
        paths.add(path.resolve("stripe-1"));
        paths.add(path.resolve("stripe-2"));

        final DummyRecordSerde serde = new DummyRecordSerde();
        final WriteAheadRepository<DummyRecord> repo = new MinimalLockingWriteAheadLog<>(paths, numPartitions, serde, null);
        final Collection<DummyRecord> initialRecs = repo.recoverRecords();
        assertTrue(initialRecs.isEmpty());

        // Executed synchronously on the test thread; no new thread is started.
        final InsertThread inserter = new InsertThread(100000, 0, repo);
        inserter.run();
        inserter.assertSucceeded();

        for (final Path partitionPath : paths) {
            final File[] files = partitionPath.toFile().listFiles(new FileFilter() {
                @Override
                public boolean accept(File pathname) {
                    return pathname.getName().startsWith("partition");
                }
            });
            assertEquals(3, files.length);

            for (final File file : files) {
                final File[] journalFiles = file.listFiles();
                assertEquals(1, journalFiles.length);
            }
        }

        repo.checkpoint();

    }

    /**
     * Inserts a fixed number of single-record CREATE transactions into a
     * repository. Only the last update forces a sync. An IOException raised
     * by the repository is remembered and must be re-raised by the test via
     * {@link #assertSucceeded()}.
     */
    private static class InsertThread extends Thread {

        private final List<List<DummyRecord>> records;
        private final WriteAheadRepository<DummyRecord> repo;
        // Written by the inserting thread, read by the test thread.
        private volatile IOException failure;

        public InsertThread(final int numInsertions, final int startIndex, final WriteAheadRepository<DummyRecord> repo) {
            records = new ArrayList<>();
            for (int i = 0; i < numInsertions; i++) {
                final DummyRecord record = new DummyRecord(String.valueOf(i + startIndex), UpdateType.CREATE);
                record.setProperty("A", "B");
                final List<DummyRecord> list = new ArrayList<>();
                list.add(record);
                records.add(list);
            }
            this.repo = repo;
        }

        @Override
        public void run() {
            try {
                int counter = 0;
                for (final List<DummyRecord> list : records) {
                    final boolean forceSync = (++counter == records.size());
                    repo.update(list, forceSync);
                }
            } catch (final IOException e) {
                // Throwing an AssertionError here would only terminate this
                // thread; keep the cause so the test thread can report it.
                failure = e;
            }
        }

        /**
         * Fails the calling test if {@link #run()} ended with an IOException.
         */
        void assertSucceeded() {
            final IOException cause = failure;
            if (cause != null) {
                throw new AssertionError("Failed to update: " + cause.toString(), cause);
            }
        }
    }

    private void deleteRecursively(final File file) {
        final File[] children = file.listFiles();
        if (children != null) {
            for (final File child : children) {
                deleteRecursively(child);
            }
        }

        file.delete();
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/extensions/file-authorization-provider/pom.xml ---------------------------------------------------------------------- diff --git a/extensions/file-authorization-provider/pom.xml b/extensions/file-authorization-provider/pom.xml new file mode 100644 index 0000000..f8d823f --- /dev/null +++ b/extensions/file-authorization-provider/pom.xml @@ -0,0 +1,90 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License.
You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <!-- Maven build descriptor for the file-authorization-provider module. --> + <modelVersion>4.0.0</modelVersion> + <parent> <!-- Inherits from the nifi-parent POM. --> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-parent</artifactId> + <version>0.0.1-SNAPSHOT</version> + </parent> + + <artifactId>file-authorization-provider</artifactId> + <version>0.0.1-SNAPSHOT</version> + <name>Authorization Provider: File</name> + + <build> + <resources> <!-- Both the regular resources directory and the XSD directory are declared as resource directories. --> + <resource> + <directory>src/main/resources</directory> + </resource> + <resource> + <directory>src/main/xsd</directory> + </resource> + </resources> + <plugins> + <plugin> <!-- Runs the plugin's generate goal over every .xsd file under src/main/xsd. NOTE(review): no version element is declared for this plugin here; presumably it is managed by nifi-parent, confirm. --> + <groupId>com.sun.tools.xjc.maven2</groupId> + <artifactId>maven-jaxb-plugin</artifactId> + <executions> + <execution> + <id>current</id> + <goals> + <goal>generate</goal> + </goals> + <configuration> + <schemaDirectory>src/main/xsd</schemaDirectory> + <includeSchemas> + <includeSchema>**/*.xsd</includeSchema> + </includeSchemas> + <generatePackage>org.apache.nifi.user.generated</generatePackage> <!-- Presumably the Java package of the generated classes; confirm against the plugin documentation. --> + </configuration> + </execution> + </executions> + <configuration> + <generateDirectory>${project.build.directory}/generated-sources/jaxb</generateDirectory> <!-- Output location sits beneath the build directory. --> + </configuration> + </plugin> + </plugins> + </build> + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-api</artifactId> + <version>[0.0.1-SNAPSHOT, 1.0.0-SNAPSHOT)</version> <!-- Maven version range: 0.0.1-SNAPSHOT inclusive up to 1.0.0-SNAPSHOT exclusive. --> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> +
<artifactId>nifi-file-utils</artifactId> + <version>0.0.1-SNAPSHOT</version> <!-- NOTE(review): pinned to a single version while the other org.apache.nifi dependencies here use a version range; confirm this is intentional. --> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-properties</artifactId> + <version>[0.0.1-SNAPSHOT, 1.0.0-SNAPSHOT)</version> + </dependency> + <dependency> + <groupId>commons-lang</groupId> + <artifactId>commons-lang</artifactId> + <version>2.6</version> + </dependency> + <dependency> + <groupId>commons-codec</groupId> + <artifactId>commons-codec</artifactId> + <version>1.10</version> + <scope>test</scope> <!-- Test scope: available to test code only. --> + </dependency> + </dependencies> +</project>