http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/wali/pom.xml ---------------------------------------------------------------------- diff --git a/nifi/commons/wali/pom.xml b/nifi/commons/wali/pom.xml deleted file mode 100644 index 0d653f8..0000000 --- a/nifi/commons/wali/pom.xml +++ /dev/null @@ -1,41 +0,0 @@ -<?xml version="1.0"?> -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-commons-parent</artifactId> - <version>0.0.1-incubating-SNAPSHOT</version> - </parent> - - <artifactId>wali</artifactId> - <version>0.0.1-incubating-SNAPSHOT</version> - <packaging>jar</packaging> - - <name>WALI : Write-Ahead Log Implementation</name> - - <dependencies> - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-api</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-utils</artifactId> - </dependency> - </dependencies> -</project>
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/wali/src/main/java/org/wali/MinimalLockingWriteAheadLog.java ---------------------------------------------------------------------- diff --git a/nifi/commons/wali/src/main/java/org/wali/MinimalLockingWriteAheadLog.java b/nifi/commons/wali/src/main/java/org/wali/MinimalLockingWriteAheadLog.java deleted file mode 100644 index 19208d3..0000000 --- a/nifi/commons/wali/src/main/java/org/wali/MinimalLockingWriteAheadLog.java +++ /dev/null @@ -1,1008 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.wali; - -import static java.util.Objects.requireNonNull; - -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.EOFException; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.nio.channels.FileChannel; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.StandardOpenOption; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Queue; -import java.util.Set; -import java.util.SortedMap; -import java.util.SortedSet; -import java.util.TreeMap; -import java.util.TreeSet; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.CopyOnWriteArraySet; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.regex.Pattern; - -import org.apache.nifi.stream.io.BufferedInputStream; -import org.apache.nifi.stream.io.BufferedOutputStream; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * <p> - * This implementation provides as little Locking as possible in order to - * provide the highest throughput possible. However, this implementation is ONLY - * appropriate if it can be guaranteed that only a single thread will ever issue - * updates for a given Record at any one time. 
- * </p> - * - * @param <T> - */ -public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepository<T> { - - private final Path basePath; - private final Path partialPath; - private final Path snapshotPath; - - private final SerDe<T> serde; - private final SyncListener syncListener; - private final FileChannel lockChannel; - private final AtomicLong transactionIdGenerator = new AtomicLong(0L); - - private final Partition<T>[] partitions; - private final AtomicLong partitionIndex = new AtomicLong(0L); - private final ConcurrentMap<Object, T> recordMap = new ConcurrentHashMap<>(); - private final Map<Object, T> unmodifiableRecordMap = Collections.unmodifiableMap(recordMap); - private final Set<String> externalLocations = new CopyOnWriteArraySet<>(); - - private final Set<String> recoveredExternalLocations = new CopyOnWriteArraySet<>(); - - private final AtomicInteger numberBlackListedPartitions = new AtomicInteger(0); - - private static final Logger logger = LoggerFactory.getLogger(MinimalLockingWriteAheadLog.class); - - private final ReadWriteLock rwLock = new ReentrantReadWriteLock(); - private final Lock readLock = rwLock.readLock(); // required to update a partition - private final Lock writeLock = rwLock.writeLock(); // required for checkpoint - - private volatile boolean updated = false; - private volatile boolean recovered = false; - - public MinimalLockingWriteAheadLog(final Path path, final int partitionCount, final SerDe<T> serde, final SyncListener syncListener) throws IOException { - this(new TreeSet<>(Collections.singleton(path)), partitionCount, serde, syncListener); - } - - /** - * - * @param paths a sorted set of Paths to use for the partitions/journals and - * the snapshot. The snapshot will always be written to the first path - * specified. - * - * @param partitionCount the number of partitions/journals to use. 
For best - * performance, this should be close to the number of threads that are - * expected to update the repository simultaneously - * - * @param serde - * @param syncListener - * @throws IOException - */ - @SuppressWarnings("unchecked") - public MinimalLockingWriteAheadLog(final SortedSet<Path> paths, final int partitionCount, final SerDe<T> serde, final SyncListener syncListener) throws IOException { - this.syncListener = syncListener; - - requireNonNull(paths); - requireNonNull(serde); - - if (paths.isEmpty()) { - throw new IllegalArgumentException("Paths must be non-empty"); - } - - int existingPartitions = 0; - for (final Path path : paths) { - if (!Files.exists(path)) { - Files.createDirectories(path); - } - - final File file = path.toFile(); - if (!file.isDirectory()) { - throw new IOException("Path given [" + path + "] is not a directory"); - } - if (!file.canWrite()) { - throw new IOException("Path given [" + path + "] is not writable"); - } - if (!file.canRead()) { - throw new IOException("Path given [" + path + "] is not readable"); - } - if (!file.canExecute()) { - throw new IOException("Path given [" + path + "] is not executable"); - } - - final File[] children = file.listFiles(); - if (children != null) { - for (final File child : children) { - if (child.isDirectory() && child.getName().startsWith("partition-")) { - existingPartitions++; - } - } - - if (existingPartitions != 0 && existingPartitions != partitionCount) { - logger.warn("Constructing MinimalLockingWriteAheadLog with partitionCount={}, but the repository currently has " - + "{} partitions; ignoring argument and proceeding with {} partitions", - new Object[]{partitionCount, existingPartitions, existingPartitions}); - } - } - } - - this.basePath = paths.iterator().next(); - this.partialPath = basePath.resolve("snapshot.partial"); - this.snapshotPath = basePath.resolve("snapshot"); - this.serde = serde; - - final Path lockPath = basePath.resolve("wali.lock"); - lockChannel = new 
FileOutputStream(lockPath.toFile()).getChannel(); - lockChannel.lock(); - - partitions = new Partition[partitionCount]; - - Iterator<Path> pathIterator = paths.iterator(); - for (int i = 0; i < partitionCount; i++) { - // If we're out of paths, create a new iterator to start over. - if (!pathIterator.hasNext()) { - pathIterator = paths.iterator(); - } - - final Path partitionBasePath = pathIterator.next(); - - partitions[i] = new Partition<>(partitionBasePath.resolve("partition-" + i), serde, i, getVersion()); - } - } - - @Override - public int update(final Collection<T> records, final boolean forceSync) throws IOException { - if (!recovered) { - throw new IllegalStateException("Cannot update repository until record recovery has been performed"); - } - - if (records.isEmpty()) { - return -1; - } - - updated = true; - readLock.lock(); - try { - while (true) { - final int numBlackListed = numberBlackListedPartitions.get(); - if (numBlackListed >= partitions.length) { - throw new IOException("All Partitions have been blacklisted due to failures when attempting to update. If the Write-Ahead Log is able to perform a checkpoint, this issue may resolve itself. 
Otherwise, manual intervention will be required."); - } - - final long partitionIdx = partitionIndex.getAndIncrement(); - final int resolvedIdx = (int) (partitionIdx % partitions.length); - final Partition<T> partition = partitions[resolvedIdx]; - if (partition.tryClaim()) { - try { - final long transactionId = transactionIdGenerator.getAndIncrement(); - if (logger.isTraceEnabled()) { - for (final T record : records) { - logger.trace("Partition {} performing Transaction {}: {}", new Object[]{partition, transactionId, record}); - } - } - - try { - partition.update(records, transactionId, unmodifiableRecordMap, forceSync); - } catch (final Exception e) { - partition.blackList(); - numberBlackListedPartitions.incrementAndGet(); - throw e; - } - - if (forceSync && syncListener != null) { - syncListener.onSync(resolvedIdx); - } - } finally { - partition.releaseClaim(); - } - - for (final T record : records) { - final UpdateType updateType = serde.getUpdateType(record); - final Object recordIdentifier = serde.getRecordIdentifier(record); - - if (updateType == UpdateType.DELETE) { - recordMap.remove(recordIdentifier); - } else if (updateType == UpdateType.SWAP_OUT) { - final String newLocation = serde.getLocation(record); - if (newLocation == null) { - logger.error("Received Record (ID=" + recordIdentifier + ") with UpdateType of SWAP_OUT but no indicator of where the Record is to be Swapped Out to; these records may be lost when the repository is restored!"); - } else { - recordMap.remove(recordIdentifier); - this.externalLocations.add(newLocation); - } - } else if (updateType == UpdateType.SWAP_IN) { - final String newLocation = serde.getLocation(record); - if (newLocation == null) { - logger.error("Received Record (ID=" + recordIdentifier + ") with UpdateType of SWAP_IN but no indicator of where the Record is to be Swapped In from; these records may be duplicated when the repository is restored!"); - } else { - externalLocations.remove(newLocation); - } - 
recordMap.put(recordIdentifier, record); - } else { - recordMap.put(recordIdentifier, record); - } - } - - return resolvedIdx; - } - } - } finally { - readLock.unlock(); - } - } - - @Override - public Collection<T> recoverRecords() throws IOException { - if (updated) { - throw new IllegalStateException("Cannot recover records after updating the repository; must call recoverRecords first"); - } - - final long recoverStart = System.nanoTime(); - writeLock.lock(); - try { - Long maxTransactionId = recoverFromSnapshot(recordMap); - recoverFromEdits(recordMap, maxTransactionId); - - for (final Partition<T> partition : partitions) { - final long transId = partition.getMaxRecoveredTransactionId(); - if (maxTransactionId == null || transId > maxTransactionId) { - maxTransactionId = transId; - } - } - - this.transactionIdGenerator.set(maxTransactionId + 1); - this.externalLocations.addAll(recoveredExternalLocations); - logger.info("{} finished recovering records. Performing Checkpoint to ensure proper state of Partitions before updates", this); - } finally { - writeLock.unlock(); - } - final long recoverNanos = System.nanoTime() - recoverStart; - final long recoveryMillis = TimeUnit.MILLISECONDS.convert(recoverNanos, TimeUnit.NANOSECONDS); - logger.info("Successfully recovered {} records in {} milliseconds", recordMap.size(), recoveryMillis); - checkpoint(); - - recovered = true; - return recordMap.values(); - } - - @Override - public Set<String> getRecoveredSwapLocations() throws IOException { - return recoveredExternalLocations; - } - - private Long recoverFromSnapshot(final Map<Object, T> recordMap) throws IOException { - final boolean partialExists = Files.exists(partialPath); - final boolean snapshotExists = Files.exists(snapshotPath); - - if (!partialExists && !snapshotExists) { - return null; - } - - if (partialExists && snapshotExists) { - // both files exist -- assume we failed while checkpointing. 
Delete - // the partial file - Files.delete(partialPath); - } else if (partialExists) { - // partial exists but snapshot does not -- we must have completed - // creating the partial, deleted the snapshot - // but crashed before renaming the partial to the snapshot. Just - // rename partial to snapshot - Files.move(partialPath, snapshotPath); - } - - if (Files.size(snapshotPath) == 0) { - logger.warn("{} Found 0-byte Snapshot file; skipping Snapshot file in recovery", this); - return null; - } - - // at this point, we know the snapshotPath exists because if it didn't, then we either returned null - // or we renamed partialPath to snapshotPath. So just Recover from snapshotPath. - try (final DataInputStream dataIn = new DataInputStream(new BufferedInputStream(Files.newInputStream(snapshotPath, StandardOpenOption.READ)))) { - final String waliImplementationClass = dataIn.readUTF(); - final int waliImplementationVersion = dataIn.readInt(); - - if (!waliImplementationClass.equals(MinimalLockingWriteAheadLog.class.getName())) { - throw new IOException("Write-Ahead Log located at " + snapshotPath + " was written using the " + waliImplementationClass + " class; cannot restore using " + getClass().getName()); - } - - if (waliImplementationVersion > getVersion()) { - throw new IOException("Write-Ahead Log located at " + snapshotPath + " was written using version " + waliImplementationVersion + " of the " + waliImplementationClass + " class; cannot restore using Version " + getVersion()); - } - - dataIn.readUTF(); // ignore serde class name for now - final int serdeVersion = dataIn.readInt(); - final long maxTransactionId = dataIn.readLong(); - final int numRecords = dataIn.readInt(); - - for (int i = 0; i < numRecords; i++) { - final T record = serde.deserializeRecord(dataIn, serdeVersion); - if (record == null) { - throw new EOFException(); - } - - final UpdateType updateType = serde.getUpdateType(record); - if (updateType == UpdateType.DELETE) { - logger.warn("While 
recovering from snapshot, found record with type 'DELETE'; this record will not be restored"); - continue; - } - - logger.trace("Recovered from snapshot: {}", record); - recordMap.put(serde.getRecordIdentifier(record), record); - } - - final int numSwapRecords = dataIn.readInt(); - final Set<String> swapLocations = new HashSet<>(); - for (int i = 0; i < numSwapRecords; i++) { - swapLocations.add(dataIn.readUTF()); - } - this.recoveredExternalLocations.addAll(swapLocations); - - logger.debug("{} restored {} Records and {} Swap Files from Snapshot, ending with Transaction ID {}", new Object[]{this, numRecords, recoveredExternalLocations.size(), maxTransactionId}); - return maxTransactionId; - } - } - - /** - * Recovers records from the edit logs via the Partitions. Returns a boolean - * if recovery of a Partition requires the Write-Ahead Log be checkpointed - * before modification. - * - * @param modifiableRecordMap - * @param maxTransactionIdRestored - * @return - * @throws IOException - */ - private void recoverFromEdits(final Map<Object, T> modifiableRecordMap, final Long maxTransactionIdRestored) throws IOException { - final Map<Object, T> updateMap = new HashMap<>(); - final Map<Object, T> unmodifiableRecordMap = Collections.unmodifiableMap(modifiableRecordMap); - final Map<Object, T> ignorableMap = new HashMap<>(); - final Set<String> ignorableSwapLocations = new HashSet<>(); - - // populate a map of the next transaction id for each partition to the - // partition that has that next transaction id. 
- final SortedMap<Long, Partition<T>> transactionMap = new TreeMap<>(); - for (final Partition<T> partition : partitions) { - Long transactionId; - boolean keepTransaction; - do { - transactionId = partition.getNextRecoverableTransactionId(); - - keepTransaction = transactionId == null || maxTransactionIdRestored == null || transactionId > maxTransactionIdRestored; - if (keepTransaction && transactionId != null) { - // map this transaction id to its partition so that we can - // start restoring transactions from this partition, - // starting at 'transactionId' - transactionMap.put(transactionId, partition); - } else if (transactionId != null) { - // skip the next transaction, because our snapshot already - // contained this transaction. - try { - partition.recoverNextTransaction(ignorableMap, updateMap, ignorableSwapLocations); - } catch (final EOFException e) { - logger.error("{} unexpectedly reached End of File while reading from {} for Transaction {}; assuming crash and ignoring this transaction.", - new Object[]{this, partition, transactionId}); - } - } - } while (!keepTransaction); - } - - while (!transactionMap.isEmpty()) { - final Map.Entry<Long, Partition<T>> firstEntry = transactionMap.entrySet().iterator().next(); - final Long firstTransactionId = firstEntry.getKey(); - final Partition<T> nextPartition = firstEntry.getValue(); - - try { - updateMap.clear(); - final Set<Object> idsRemoved = nextPartition.recoverNextTransaction(unmodifiableRecordMap, updateMap, recoveredExternalLocations); - modifiableRecordMap.putAll(updateMap); - for (final Object id : idsRemoved) { - modifiableRecordMap.remove(id); - } - } catch (final EOFException e) { - logger.error("{} unexpectedly reached End-of-File when reading from {} for Transaction ID {}; assuming crash and ignoring this transaction", - new Object[]{this, nextPartition, firstTransactionId}); - } - - transactionMap.remove(firstTransactionId); - - Long subsequentTransactionId = null; - try { - 
subsequentTransactionId = nextPartition.getNextRecoverableTransactionId(); - } catch (final IOException e) { - logger.error("{} unexpectedly found End-of-File when reading from {} for Transaction ID {}; assuming crash and ignoring this transaction", - new Object[]{this, nextPartition, firstTransactionId}); - } - - if (subsequentTransactionId != null) { - transactionMap.put(subsequentTransactionId, nextPartition); - } - } - - for (final Partition<T> partition : partitions) { - partition.endRecovery(); - } - } - - @Override - public synchronized int checkpoint() throws IOException { - final Set<T> records; - final Set<String> swapLocations; - final long maxTransactionId; - - final long startNanos = System.nanoTime(); - - FileOutputStream fileOut = null; - DataOutputStream dataOut = null; - - long stopTheWorldNanos = -1L; - long stopTheWorldStart = -1L; - try { - writeLock.lock(); - try { - stopTheWorldStart = System.nanoTime(); - // stop the world while we make a copy of the records that must - // be checkpointed and rollover the partitions. - // We copy the records because serializing them is potentially - // very expensive, especially when we have hundreds - // of thousands or even millions of them. We don't want to - // prevent WALI from being used during this time. - - // So the design is to copy all of the records, determine the - // last transaction ID that the records represent, - // and roll over the partitions to new write-ahead logs. - // Then, outside of the write lock, we will serialize the data - // to disk, and then remove the old Partition data. - records = new HashSet<>(recordMap.values()); - maxTransactionId = transactionIdGenerator.get() - 1; - - swapLocations = new HashSet<>(externalLocations); - for (final Partition<T> partition : partitions) { - partition.rollover(); - } - - // notify global sync with the write lock held. 
We do this because we don't want the repository to get updated - // while the listener is performing its necessary tasks - if (syncListener != null) { - syncListener.onGlobalSync(); - } - } finally { - writeLock.unlock(); - } - - stopTheWorldNanos = System.nanoTime() - stopTheWorldStart; - - // perform checkpoint, writing to .partial file - fileOut = new FileOutputStream(partialPath.toFile()); - dataOut = new DataOutputStream(fileOut); - dataOut.writeUTF(MinimalLockingWriteAheadLog.class.getName()); - dataOut.writeInt(getVersion()); - dataOut.writeUTF(serde.getClass().getName()); - dataOut.writeInt(serde.getVersion()); - dataOut.writeLong(maxTransactionId); - dataOut.writeInt(records.size()); - - for (final T record : records) { - logger.trace("Checkpointing {}", record); - serde.serializeRecord(record, dataOut); - } - - dataOut.writeInt(swapLocations.size()); - for (final String swapLocation : swapLocations) { - dataOut.writeUTF(swapLocation); - } - } finally { - if (dataOut != null) { - try { - dataOut.flush(); - fileOut.getFD().sync(); - dataOut.close(); - } catch (final IOException e) { - logger.warn("Failed to close Data Stream due to {}", e.toString(), e); - } - } - } - - // delete the snapshot, if it exists, and rename the .partial to - // snapshot - Files.deleteIfExists(snapshotPath); - Files.move(partialPath, snapshotPath); - - // clear all of the edit logs - final long partitionStart = System.nanoTime(); - for (final Partition<T> partition : partitions) { - // we can call clearOld without claiming the partition because it - // does not change the partition's state - // and the only member variable it touches cannot be modified, other - // than when #rollover() is called. - // And since this method is the only one that calls #rollover() and - // this method is synchronized, - // the value of that member variable will not change. And it's - // volatile, so we will get the correct value. 
- partition.clearOld(); - } - final long partitionEnd = System.nanoTime(); - numberBlackListedPartitions.set(0); - - final long endNanos = System.nanoTime(); - final long millis = TimeUnit.MILLISECONDS.convert(endNanos - startNanos, TimeUnit.NANOSECONDS); - final long partitionMillis = TimeUnit.MILLISECONDS.convert(partitionEnd - partitionStart, TimeUnit.NANOSECONDS); - final long stopTheWorldMillis = TimeUnit.NANOSECONDS.toMillis(stopTheWorldNanos); - - logger.info("{} checkpointed with {} Records and {} Swap Files in {} milliseconds (Stop-the-world time = {} milliseconds, Clear Edit Logs time = {} millis), max Transaction ID {}", - new Object[]{this, records.size(), swapLocations.size(), millis, stopTheWorldMillis, partitionMillis, maxTransactionId}); - - return records.size(); - } - - @Override - public void shutdown() throws IOException { - writeLock.lock(); - try { - for (final Partition<T> partition : partitions) { - partition.close(); - } - } finally { - writeLock.unlock(); - lockChannel.close(); - } - } - - public int getVersion() { - return 1; - } - - /** - * Represents a partition of this repository, which maps directly to a - * .journal file. - * - * All methods with the exceptions of {@link #claim()}, {@link #tryClaim()}, - * and {@link #releaseClaim()} in this Partition MUST be called while - * holding the claim (via {@link #claim} or {@link #tryClaim()). 
- * - * @param <S> - */ - private static class Partition<S> { - - public static final String JOURNAL_EXTENSION = ".journal"; - private static final Pattern JOURNAL_FILENAME_PATTERN = Pattern.compile("\\d+\\.journal"); - - private final SerDe<S> serde; - - private final Path editDirectory; - private final int writeAheadLogVersion; - - private final Lock lock = new ReentrantLock(); - private DataOutputStream dataOut = null; - private FileOutputStream fileOut = null; - private boolean blackListed = false; - private boolean closed = false; - private DataInputStream recoveryIn; - private int recoveryVersion; - private String currentJournalFilename = ""; - - private static final byte TRANSACTION_CONTINUE = 1; - private static final byte TRANSACTION_COMMIT = 2; - - private final String description; - private final AtomicLong maxTransactionId = new AtomicLong(-1L); - private final Logger logger = LoggerFactory.getLogger(MinimalLockingWriteAheadLog.class); - - private final Queue<Path> recoveryFiles; - - public Partition(final Path path, final SerDe<S> serde, final int partitionIndex, final int writeAheadLogVersion) throws IOException { - this.editDirectory = path; - this.serde = serde; - - final File file = path.toFile(); - if (!file.exists() && !file.mkdirs()) { - throw new IOException("Could not create directory " + file.getAbsolutePath()); - } - - this.recoveryFiles = new LinkedBlockingQueue<>(); - for (final Path recoveryPath : getRecoveryPaths()) { - recoveryFiles.add(recoveryPath); - } - - this.description = "Partition-" + partitionIndex; - this.writeAheadLogVersion = writeAheadLogVersion; - } - - public boolean tryClaim() { - final boolean obtainedLock = lock.tryLock(); - if (!obtainedLock) { - return false; - } - - // Check if the partition is blacklisted. If so, unlock it and return false. Otherwise, - // leave it locked and return true, so that the caller will need to unlock. 
- if (blackListed) { - lock.unlock(); - return false; - } - - return true; - } - - public void releaseClaim() { - lock.unlock(); - } - - public void close() { - final DataOutputStream out = dataOut; - if (out != null) { - try { - out.close(); - } catch (final Exception e) { - - } - } - - this.closed = true; - this.dataOut = null; - } - - public void blackList() { - lock.lock(); - try { - blackListed = true; - } finally { - lock.unlock(); - } - logger.debug("Blacklisted {}", this); - } - - /** - * Closes resources pointing to the current journal and begins writing - * to a new one - * - * @throws IOException - */ - public void rollover() throws IOException { - lock.lock(); - try { - final DataOutputStream out = dataOut; - if (out != null) { - out.close(); - } - - final Path editPath = getNewEditPath(); - final FileOutputStream fos = new FileOutputStream(editPath.toFile()); - final DataOutputStream outStream = new DataOutputStream(new BufferedOutputStream(fos)); - outStream.writeUTF(MinimalLockingWriteAheadLog.class.getName()); - outStream.writeInt(writeAheadLogVersion); - outStream.writeUTF(serde.getClass().getName()); - outStream.writeInt(serde.getVersion()); - outStream.flush(); - dataOut = outStream; - fileOut = fos; - - currentJournalFilename = editPath.toFile().getName(); - - blackListed = false; - } finally { - lock.unlock(); - } - } - - private long getJournalIndex(final File file) { - final String filename = file.getName(); - final int dotIndex = filename.indexOf("."); - final String number = filename.substring(0, dotIndex); - return Long.parseLong(number); - } - - private Path getNewEditPath() { - final List<Path> recoveryPaths = getRecoveryPaths(); - final long newIndex; - if (recoveryPaths == null || recoveryPaths.isEmpty()) { - newIndex = 1; - } else { - final long lastFileIndex = getJournalIndex(recoveryPaths.get(recoveryPaths.size() - 1).toFile()); - newIndex = lastFileIndex + 1; - } - - return editDirectory.resolve(newIndex + JOURNAL_EXTENSION); - } - 
- private List<Path> getRecoveryPaths() { - final List<Path> paths = new ArrayList<>(); - - final File directory = editDirectory.toFile(); - final File[] partitionFiles = directory.listFiles(); - if (partitionFiles == null) { - return paths; - } - - for (final File file : partitionFiles) { - // if file is a journal file but no data has yet been persisted, it may - // very well be a 0-byte file (the journal is not SYNC'ed to disk after - // a header is written out, so it may be lost). In this case, the journal - // is empty, so we can just skip it. - if (file.isDirectory() || file.length() == 0L) { - continue; - } - - if (!JOURNAL_FILENAME_PATTERN.matcher(file.getName()).matches()) { - continue; - } - - if (isJournalFile(file)) { - paths.add(file.toPath()); - } else { - logger.warn("Found file {}, but could not access it, or it was not in the expected format; will ignore this file", file.getAbsolutePath()); - } - } - - // Sort journal files by the numeric portion of the filename - Collections.sort(paths, new Comparator<Path>() { - @Override - public int compare(final Path o1, final Path o2) { - if (o1 == null && o2 == null) { - return 0; - } - if (o1 == null) { - return 1; - } - if (o2 == null) { - return -1; - } - - final long index1 = getJournalIndex(o1.toFile()); - final long index2 = getJournalIndex(o2.toFile()); - return Long.compare(index1, index2); - } - }); - - return paths; - } - - void clearOld() { - final List<Path> oldRecoveryFiles = getRecoveryPaths(); - - for (final Path path : oldRecoveryFiles) { - final File file = path.toFile(); - if (file.getName().equals(currentJournalFilename)) { - continue; - } - if (file.exists()) { - file.delete(); - } - } - } - - private boolean isJournalFile(final File file) { - final String expectedStartsWith = MinimalLockingWriteAheadLog.class.getName(); - try { - try (final FileInputStream fis = new FileInputStream(file); - final InputStream bufferedIn = new BufferedInputStream(fis); - final DataInputStream in = new 
DataInputStream(bufferedIn)) { - final String waliImplClassName = in.readUTF(); - if (!expectedStartsWith.equals(waliImplClassName)) { - return false; - } - } - } catch (final IOException e) { - return false; - } - - return true; - } - - public void update(final Collection<S> records, final long transactionId, final Map<Object, S> recordMap, final boolean forceSync) throws IOException { - if (this.closed) { - throw new IllegalStateException("Partition is closed"); - } - - final DataOutputStream out = dataOut; - out.writeLong(transactionId); - - final int numEditsToSerialize = records.size(); - int editsSerialized = 0; - for (final S record : records) { - final Object recordId = serde.getRecordIdentifier(record); - final S previousVersion = recordMap.get(recordId); - - serde.serializeEdit(previousVersion, record, out); - if (++editsSerialized < numEditsToSerialize) { - out.write(TRANSACTION_CONTINUE); - } else { - out.write(TRANSACTION_COMMIT); - } - } - - out.flush(); - - if (forceSync) { - fileOut.getFD().sync(); - } - } - - private DataInputStream createDataInputStream(final Path path) throws IOException { - return new DataInputStream(new BufferedInputStream(Files.newInputStream(path))); - } - - private DataInputStream getRecoveryStream() throws IOException { - if (recoveryIn != null && hasMoreData(recoveryIn)) { - return recoveryIn; - } - - while (true) { - final Path nextRecoveryPath = recoveryFiles.poll(); - if (nextRecoveryPath == null) { - return null; - } - - recoveryIn = createDataInputStream(nextRecoveryPath); - if (hasMoreData(recoveryIn)) { - final String waliImplementationClass = recoveryIn.readUTF(); - if (!MinimalLockingWriteAheadLog.class.getName().equals(waliImplementationClass)) { - continue; - } - - final long waliVersion = recoveryIn.readInt(); - if (waliVersion > writeAheadLogVersion) { - throw new IOException("Cannot recovery from file " + nextRecoveryPath + " because it was written using WALI version " + waliVersion + ", but the version used 
to restore it is only " + writeAheadLogVersion); - } - - @SuppressWarnings("unused") - final String serdeClassName = recoveryIn.readUTF(); - this.recoveryVersion = recoveryIn.readInt(); - - break; - } - } - - return recoveryIn; - } - - public Long getNextRecoverableTransactionId() throws IOException { - while (true) { - DataInputStream recoveryStream = getRecoveryStream(); - if (recoveryStream == null) { - return null; - } - - final long transactionId; - try { - transactionId = recoveryIn.readLong(); - } catch (final EOFException e) { - continue; - } - - this.maxTransactionId.set(transactionId); - return transactionId; - } - } - - private boolean hasMoreData(final InputStream in) throws IOException { - in.mark(1); - final int nextByte = in.read(); - in.reset(); - return nextByte >= 0; - } - - public void endRecovery() throws IOException { - if (recoveryIn != null) { - recoveryIn.close(); - } - - final Path nextRecoveryPath = this.recoveryFiles.poll(); - if (nextRecoveryPath != null) { - throw new IllegalStateException("Signaled to end recovery, but there are more recovery files for Partition in directory " + editDirectory); - } - - final Path newEditPath = getNewEditPath(); - - final FileOutputStream fos = new FileOutputStream(newEditPath.toFile()); - final DataOutputStream outStream = new DataOutputStream(new BufferedOutputStream(fos)); - outStream.writeUTF(MinimalLockingWriteAheadLog.class.getName()); - outStream.writeInt(writeAheadLogVersion); - outStream.writeUTF(serde.getClass().getName()); - outStream.writeInt(serde.getVersion()); - outStream.flush(); - dataOut = outStream; - fileOut = fos; - } - - public Set<Object> recoverNextTransaction(final Map<Object, S> currentRecordMap, final Map<Object, S> updatedRecordMap, final Set<String> swapLocations) throws IOException { - final Set<Object> idsRemoved = new HashSet<>(); - - int transactionFlag; - do { - final S record = serde.deserializeEdit(recoveryIn, currentRecordMap, recoveryVersion); - if 
(logger.isTraceEnabled()) { - logger.trace("{} Recovering Transaction {}: {}", new Object[]{this, maxTransactionId.get(), record}); - } - - final Object recordId = serde.getRecordIdentifier(record); - final UpdateType updateType = serde.getUpdateType(record); - if (updateType == UpdateType.DELETE) { - updatedRecordMap.remove(recordId); - idsRemoved.add(recordId); - } else if (updateType == UpdateType.SWAP_IN) { - final String location = serde.getLocation(record); - if (location == null) { - logger.error("Recovered SWAP_IN record from edit log, but it did not contain a Location; skipping record"); - } else { - swapLocations.remove(location); - updatedRecordMap.put(recordId, record); - idsRemoved.remove(recordId); - } - } else if (updateType == UpdateType.SWAP_OUT) { - final String location = serde.getLocation(record); - if (location == null) { - logger.error("Recovered SWAP_OUT record from edit log, but it did not contain a Location; skipping record"); - } else { - swapLocations.add(location); - updatedRecordMap.remove(recordId); - idsRemoved.add(recordId); - } - } else { - updatedRecordMap.put(recordId, record); - idsRemoved.remove(recordId); - } - - transactionFlag = recoveryIn.read(); - } while (transactionFlag != TRANSACTION_COMMIT); - - return idsRemoved; - } - - /** - * Must be called after recovery has finished - * - * @return - */ - public long getMaxRecoveredTransactionId() { - return maxTransactionId.get(); - } - - @Override - public String toString() { - return description; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/wali/src/main/java/org/wali/SerDe.java ---------------------------------------------------------------------- diff --git a/nifi/commons/wali/src/main/java/org/wali/SerDe.java b/nifi/commons/wali/src/main/java/org/wali/SerDe.java deleted file mode 100644 index bbc7efb..0000000 --- a/nifi/commons/wali/src/main/java/org/wali/SerDe.java +++ /dev/null @@ -1,128 +0,0 @@ -/* - * Licensed to the Apache 
Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.wali; - -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.util.Map; - -/** - * A mechanism for Serializing and De-Serializing a Record of a given Type - * - * @param <T> the type of record that is to be Serialized and De-Serialized by - * this object - */ -public interface SerDe<T> { - - /** - * <p> - * Serializes an Edit Record to the log via the given - * {@link DataOutputStream}. - * </p> - * - * @param previousRecordState - * @param newRecordState - * @param out - * @throws IOException - */ - void serializeEdit(T previousRecordState, T newRecordState, DataOutputStream out) throws IOException; - - /** - * <p> - * Serializes a Record in a form suitable for a Snapshot via the given - * {@link DataOutputStream}. - * </p> - * - * @param record - * @param out - * @throws IOException - */ - void serializeRecord(T record, DataOutputStream out) throws IOException; - - /** - * <p> - * Reads an Edit Record from the given {@link DataInputStream} and merges - * that edit with the current version of the record, returning the new, - * merged version. 
If the Edit Record indicates that the entity was deleted, - * must return a Record with an UpdateType of {@link UpdateType#DELETE}. - * This method must never return <code>null</code>. - * </p> - * - * @param in - * @param currentRecordStates an unmodifiable map of Record ID's to the - * current state of that record - * @param version the version of the SerDe that was used to serialize the - * edit record - * @return - * @throws IOException - */ - T deserializeEdit(DataInputStream in, Map<Object, T> currentRecordStates, int version) throws IOException; - - /** - * <p> - * Reads a Record from the given {@link DataInputStream} and returns this - * record. If no data is available, returns <code>null</code>. - * </p> - * - * @param in - * @param version the version of the SerDe that was used to serialize the - * record - * @return - * @throws IOException - */ - T deserializeRecord(DataInputStream in, int version) throws IOException; - - /** - * Returns the unique ID for the given record - * - * @param record - * @return - */ - Object getRecordIdentifier(T record); - - /** - * Returns the UpdateType for the given record - * - * @param record - * @return - */ - UpdateType getUpdateType(T record); - - /** - * Returns the external location of the given record; this is used when a - * record is moved away from WALI or is being re-introduced to WALI. For - * example, WALI can be updated with a record of type - * {@link UpdateType#SWAP_OUT} that indicates a Location of - * file://tmp/external1 and can then be re-introduced to WALI by updating - * WALI with a record of type {@link UpdateType#CREATE} that indicates a - * Location of file://tmp/external1 - * - * @param record - * @return - */ - String getLocation(T record); - - /** - * Returns the version that this SerDe will use when writing. 
This is used - * when serializing/deserializing the edit logs so that if the version - * changes, we are still able to deserialize old versions - * - * @return - */ - int getVersion(); -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/wali/src/main/java/org/wali/SyncListener.java ---------------------------------------------------------------------- diff --git a/nifi/commons/wali/src/main/java/org/wali/SyncListener.java b/nifi/commons/wali/src/main/java/org/wali/SyncListener.java deleted file mode 100644 index ffb11ca..0000000 --- a/nifi/commons/wali/src/main/java/org/wali/SyncListener.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.wali; - -/** - * <p> - * Provides a callback mechanism by which applicable listeners can be notified - * when a WriteAheadRepository is synched (via the - * {@link WriteAheadRepository#sync()} method) or one of its partitions is - * synched via - * {@link WriteAheadRepository#update(java.util.Collection, boolean)} with a - * value of <code>true</code> for the second argument. - * </p> - * - * <p> - * It is not required that an implementation of {@link WriteAheadRepository} - * support this interface. 
Those that do generally will require that the - * listener be injected via the constructor. - * </p> - * - * <p> - * All implementations of this interface must be thread-safe. - * </p> - * - * <p> - * The {@link #onSync(int)} method will always be called while the associated - * partition is locked. The {@link #onGlobalSync()} will always be called while - * the entire repository is locked. - * </p> - * - */ -public interface SyncListener { - - /** - * This method is called whenever a specific partition is synched via the - * {@link WriteAheadRepository#update(java.util.Collection, boolean)} method - * - * @param partitionIndex the index of the partition that was synched - */ - void onSync(int partitionIndex); - - /** - * This method is called whenever the entire - * <code>WriteAheadRepository</code> is synched via the - * {@link WriteAheadRepository#sync()} method. - */ - void onGlobalSync(); -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/wali/src/main/java/org/wali/UpdateType.java ---------------------------------------------------------------------- diff --git a/nifi/commons/wali/src/main/java/org/wali/UpdateType.java b/nifi/commons/wali/src/main/java/org/wali/UpdateType.java deleted file mode 100644 index 1b039f8..0000000 --- a/nifi/commons/wali/src/main/java/org/wali/UpdateType.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.wali; - -/** - * <p> - * Enumerates the valid types of things that can cause a - * {@link WriteAheadRepository} to update its state</p> - */ -public enum UpdateType { - - /** - * Used when a new Record has been created - */ - CREATE, - /** - * Used when a Record has been updated in some way - */ - UPDATE, - /** - * Used to indicate that a Record has been deleted and should be removed - * from the Repository - */ - DELETE, - /** - * Used to indicate that a Record still exists but has been moved elsewhere, - * so that it is no longer maintained by the WALI instance - */ - SWAP_OUT, - /** - * Used to indicate that a Record that was previously Swapped Out is now - * being Swapped In - */ - SWAP_IN; -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/wali/src/main/java/org/wali/WriteAheadRepository.java ---------------------------------------------------------------------- diff --git a/nifi/commons/wali/src/main/java/org/wali/WriteAheadRepository.java b/nifi/commons/wali/src/main/java/org/wali/WriteAheadRepository.java deleted file mode 100644 index 4567872..0000000 --- a/nifi/commons/wali/src/main/java/org/wali/WriteAheadRepository.java +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.wali; - -import java.io.IOException; -import java.util.Collection; -import java.util.Set; - -/** - * <p> - * A WriteAheadRepository is used to persist state that is otherwise kept - * in-memory. The Repository does not provide any query capability except to - * allow the data to be recovered upon restart of the system. - * </p> - * - * <p> - * A WriteAheadRepository operates by writing every update to an Edit Log. On - * restart, the data can be recovered by replaying all of the updates that are - * found in the Edit Log. This can, however, eventually result in very large - * Edit Logs, which can both take up massive amounts of disk space and take a - * long time to recover. In order to prevent this, the Repository provides a - * Checkpointing capability. This allows the current in-memory state of the - * Repository to be flushed to disk and the Edit Log to be deleted, thereby - * compacting the amount of space required to store the Repository. After a - * Checkpoint is performed, modifications are again written to an Edit Log. At - * this point, when the system is to be restored, it is restored by first - * loading the Checkpointed version of the Repository and then replaying the - * Edit Log. - * </p> - * - * <p> - * All implementations of <code>WriteAheadRepository</code> use one or more - * partitions to manage their Edit Logs. 
An implementation may require exactly - * one partition or may allow many partitions. - * </p> - * - * @param <T> - */ -public interface WriteAheadRepository<T> { - - /** - * <p> - * Updates the repository with the specified Records. The Collection must - * not contain multiple records with the same ID - * </p> - * - * @param records the records to update - * @param forceSync specifies whether or not the Repository forces the data - * to be flushed to disk. If false, the data may be stored in Operating - * System buffers, which improves performance but could cause loss of data - * if power is lost or the Operating System crashes - * @throws IOException - * @throws IllegalArgumentException if multiple records within the given - * Collection have the same ID, as specified by {@link Record#getId()} - * method - * - * @return the index of the Partition that performed the update - */ - int update(Collection<T> records, boolean forceSync) throws IOException; - - /** - * <p> - * Recovers all records from the persisted state. This method must be called - * before any updates are issued to the Repository. - * </p> - * - * @return - * @throws IOException - * @throws IllegalStateException if any updates have been issued against - * this Repository before this method is invoked - */ - Collection<T> recoverRecords() throws IOException; - - /** - * <p> - * Recovers all External Swap locations that were persisted. If this method - * is to be called, it must be called AFTER {@link #recoverRecords()} and - * BEFORE {@link #update(Collection, boolean)}. - * </p> - * - * @return - * @throws IOException - */ - Set<String> getRecoveredSwapLocations() throws IOException; - - /** - * <p> - * Compacts the contents of the Repository so that rather than having a - * Snapshot and an Edit Log indicating many Updates to the Snapshot, the - * Snapshot is updated to contain the current state of the Repository, and - * the edit log is purged. 
- * </p> - * - * - * @return the number of records that were written to the new snapshot - * @throws java.io.IOException - */ - int checkpoint() throws IOException; - - /** - * <p> - * Causes the repository to checkpoint and then close any open resources. - * </p> - * - * @throws IOException - */ - void shutdown() throws IOException; -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/wali/src/test/java/org/wali/DummyRecord.java ---------------------------------------------------------------------- diff --git a/nifi/commons/wali/src/test/java/org/wali/DummyRecord.java b/nifi/commons/wali/src/test/java/org/wali/DummyRecord.java deleted file mode 100644 index e0f7f96..0000000 --- a/nifi/commons/wali/src/test/java/org/wali/DummyRecord.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.wali; - -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; - -public class DummyRecord { - - private final String id; - private final Map<String, String> props; - private final UpdateType updateType; - - public DummyRecord(final String id, final UpdateType updateType) { - this.id = id; - this.props = new HashMap<>(); - this.updateType = updateType; - } - - public String getId() { - return id; - } - - public UpdateType getUpdateType() { - return updateType; - } - - public DummyRecord setProperties(final Map<String, String> props) { - this.props.clear(); - this.props.putAll(props); - return this; - } - - public DummyRecord setProperty(final String name, final String value) { - this.props.put(name, value); - return this; - } - - public Map<String, String> getProperties() { - return Collections.unmodifiableMap(this.props); - } - - public String getProperty(final String name) { - return props.get(name); - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/wali/src/test/java/org/wali/DummyRecordSerde.java ---------------------------------------------------------------------- diff --git a/nifi/commons/wali/src/test/java/org/wali/DummyRecordSerde.java b/nifi/commons/wali/src/test/java/org/wali/DummyRecordSerde.java deleted file mode 100644 index 8cc7860..0000000 --- a/nifi/commons/wali/src/test/java/org/wali/DummyRecordSerde.java +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.wali; - -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.EOFException; -import java.io.IOException; -import java.util.Map; - -public class DummyRecordSerde implements SerDe<DummyRecord> { - - public static final int NUM_UPDATE_TYPES = UpdateType.values().length; - private int throwIOEAfterNserializeEdits = -1; - private int serializeEditCount = 0; - - @Override - public void serializeEdit(final DummyRecord previousState, final DummyRecord record, final DataOutputStream out) throws IOException { - if (throwIOEAfterNserializeEdits >= 0 && (serializeEditCount++ >= throwIOEAfterNserializeEdits)) { - throw new IOException("Serialized " + (serializeEditCount - 1) + " records successfully, so now it's time to throw IOE"); - } - - out.write(record.getUpdateType().ordinal()); - out.writeUTF(record.getId()); - - if (record.getUpdateType() != UpdateType.DELETE) { - final Map<String, String> props = record.getProperties(); - out.writeInt(props.size()); - for (final Map.Entry<String, String> entry : props.entrySet()) { - out.writeUTF(entry.getKey()); - out.writeUTF(entry.getValue()); - } - } - } - - @Override - public void serializeRecord(final DummyRecord record, final DataOutputStream out) throws IOException { - serializeEdit(null, record, out); - } - - @Override - public DummyRecord deserializeRecord(final DataInputStream in, final int version) throws IOException { - final int index = in.read(); - if (index < 0) { - throw new EOFException(); - } - if (index >= NUM_UPDATE_TYPES) { - throw new 
IOException("Corrupt stream; got UpdateType value of " + index + " but there are only " + NUM_UPDATE_TYPES + " valid values"); - } - final UpdateType updateType = UpdateType.values()[index]; - final String id = in.readUTF(); - final DummyRecord record = new DummyRecord(id, updateType); - - if (record.getUpdateType() != UpdateType.DELETE) { - final int numProps = in.readInt(); - for (int i = 0; i < numProps; i++) { - final String key = in.readUTF(); - final String value = in.readUTF(); - record.setProperty(key, value); - } - } - return record; - } - - @Override - public Object getRecordIdentifier(final DummyRecord record) { - return record.getId(); - } - - @Override - public UpdateType getUpdateType(final DummyRecord record) { - return record.getUpdateType(); - } - - @Override - public DummyRecord deserializeEdit(final DataInputStream in, final Map<Object, DummyRecord> currentVersion, final int version) throws IOException { - return deserializeRecord(in, version); - } - - @Override - public int getVersion() { - return 1; - } - - public void setThrowIOEAfterNSerializeEdits(final int n) { - this.throwIOEAfterNserializeEdits = n; - } - - @Override - public String getLocation(final DummyRecord record) { - return null; - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/wali/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java ---------------------------------------------------------------------- diff --git a/nifi/commons/wali/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java b/nifi/commons/wali/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java deleted file mode 100644 index 57f3495..0000000 --- a/nifi/commons/wali/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java +++ /dev/null @@ -1,298 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.wali; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -import java.io.File; -import java.io.FileFilter; -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.SortedSet; -import java.util.TreeSet; -import java.util.concurrent.TimeUnit; - -import org.junit.Assert; -import org.junit.Test; - -public class TestMinimalLockingWriteAheadLog { - - @Test - public void testWrite() throws IOException, InterruptedException { - final int numPartitions = 8; - - final Path path = Paths.get("target/minimal-locking-repo"); - deleteRecursively(path.toFile()); - assertTrue(path.toFile().mkdirs()); - - final DummyRecordSerde serde = new DummyRecordSerde(); - final WriteAheadRepository<DummyRecord> repo = new MinimalLockingWriteAheadLog<>(path, numPartitions, serde, null); - final Collection<DummyRecord> initialRecs = repo.recoverRecords(); - assertTrue(initialRecs.isEmpty()); - - final List<InsertThread> threads = new ArrayList<>(); - for (int i = 0; i < 10; i++) { - threads.add(new InsertThread(10000, 1000000 * i, 
repo)); - } - - final long start = System.nanoTime(); - for (final InsertThread thread : threads) { - thread.start(); - } - for (final InsertThread thread : threads) { - thread.join(); - } - final long nanos = System.nanoTime() - start; - final long millis = TimeUnit.MILLISECONDS.convert(nanos, TimeUnit.NANOSECONDS); - System.out.println("Took " + millis + " millis to insert 1,000,000 records each in its own transaction"); - repo.shutdown(); - - final WriteAheadRepository<DummyRecord> recoverRepo = new MinimalLockingWriteAheadLog<>(path, numPartitions, serde, null); - final Collection<DummyRecord> recoveredRecords = recoverRepo.recoverRecords(); - assertFalse(recoveredRecords.isEmpty()); - assertEquals(100000, recoveredRecords.size()); - for (final DummyRecord record : recoveredRecords) { - final Map<String, String> recoveredProps = record.getProperties(); - assertEquals(1, recoveredProps.size()); - assertEquals("B", recoveredProps.get("A")); - } - } - - @Test - public void testRecoverAfterIOException() throws IOException { - final int numPartitions = 5; - final Path path = Paths.get("target/minimal-locking-repo-test-recover-after-ioe"); - deleteRecursively(path.toFile()); - Files.createDirectories(path); - - final DummyRecordSerde serde = new DummyRecordSerde(); - final WriteAheadRepository<DummyRecord> repo = new MinimalLockingWriteAheadLog<>(path, numPartitions, serde, null); - final Collection<DummyRecord> initialRecs = repo.recoverRecords(); - assertTrue(initialRecs.isEmpty()); - - serde.setThrowIOEAfterNSerializeEdits(7); // serialize the 2 transactions, then the first edit of the third transaction; then throw IOException - - final List<DummyRecord> firstTransaction = new ArrayList<>(); - firstTransaction.add(new DummyRecord("1", UpdateType.CREATE)); - firstTransaction.add(new DummyRecord("2", UpdateType.CREATE)); - firstTransaction.add(new DummyRecord("3", UpdateType.CREATE)); - - final List<DummyRecord> secondTransaction = new ArrayList<>(); - 
secondTransaction.add(new DummyRecord("1", UpdateType.UPDATE).setProperty("abc", "123"));
-        secondTransaction.add(new DummyRecord("2", UpdateType.UPDATE).setProperty("cba", "123"));
-        secondTransaction.add(new DummyRecord("3", UpdateType.UPDATE).setProperty("aaa", "123"));
-
-        final List<DummyRecord> thirdTransaction = new ArrayList<>();
-        thirdTransaction.add(new DummyRecord("1", UpdateType.DELETE));
-        thirdTransaction.add(new DummyRecord("2", UpdateType.DELETE));
-
-        repo.update(firstTransaction, true);
-        repo.update(secondTransaction, true);
-        try {
-            repo.update(thirdTransaction, true);
-            Assert.fail("Did not throw IOException on third transaction");
-        } catch (final IOException e) {
-            // expected behavior.
-        }
-
-        repo.shutdown();
-
-        serde.setThrowIOEAfterNSerializeEdits(-1);
-        final WriteAheadRepository<DummyRecord> recoverRepo = new MinimalLockingWriteAheadLog<>(path, numPartitions, serde, null);
-        final Collection<DummyRecord> recoveredRecords = recoverRepo.recoverRecords();
-        assertFalse(recoveredRecords.isEmpty());
-        assertEquals(3, recoveredRecords.size());
-
-        boolean record1 = false, record2 = false, record3 = false;
-        for (final DummyRecord record : recoveredRecords) {
-            switch (record.getId()) {
-                case "1":
-                    record1 = true;
-                    assertEquals("123", record.getProperty("abc"));
-                    break;
-                case "2":
-                    record2 = true;
-                    assertEquals("123", record.getProperty("cba"));
-                    break;
-                case "3":
-                    record3 = true;
-                    assertEquals("123", record.getProperty("aaa"));
-                    break;
-            }
-        }
-
-        assertTrue(record1);
-        assertTrue(record2);
-        assertTrue(record3);
-    }
-
-    /**
-     * Verifies that the repository rejects further updates with an "All Partitions have been
-     * blacklisted" error after a series of failed writes, even once the serde has stopped
-     * failing, and that a fresh repository instance still recovers three records.
-     *
-     * @throws IOException if the repository cannot be created, shut down or recovered
-     */
-    @Test
-    public void testCannotModifyLogAfterAllAreBlackListed() throws IOException {
-        final int numPartitions = 5;
-        final Path path = Paths.get("target/minimal-locking-repo-test-cannot-modify-after-all-blacklisted");
-        deleteRecursively(path.toFile());
-        Files.createDirectories(path);
-
-        final DummyRecordSerde serde = new DummyRecordSerde();
-        final WriteAheadRepository<DummyRecord> repo = new MinimalLockingWriteAheadLog<>(path, numPartitions, serde, null);
-        final Collection<DummyRecord> initialRecs = repo.recoverRecords();
-        assertTrue(initialRecs.isEmpty());
-
-        serde.setThrowIOEAfterNSerializeEdits(3); // serialize the first transaction, then fail on all subsequent transactions
-
-        final List<DummyRecord> firstTransaction = new ArrayList<>();
-        firstTransaction.add(new DummyRecord("1", UpdateType.CREATE));
-        firstTransaction.add(new DummyRecord("2", UpdateType.CREATE));
-        firstTransaction.add(new DummyRecord("3", UpdateType.CREATE));
-
-        final List<DummyRecord> secondTransaction = new ArrayList<>();
-        secondTransaction.add(new DummyRecord("1", UpdateType.UPDATE).setProperty("abc", "123"));
-        secondTransaction.add(new DummyRecord("2", UpdateType.UPDATE).setProperty("cba", "123"));
-        secondTransaction.add(new DummyRecord("3", UpdateType.UPDATE).setProperty("aaa", "123"));
-
-        final List<DummyRecord> thirdTransaction = new ArrayList<>();
-        thirdTransaction.add(new DummyRecord("1", UpdateType.DELETE));
-        thirdTransaction.add(new DummyRecord("2", UpdateType.DELETE));
-
-        repo.update(firstTransaction, true);
-
-        try {
-            repo.update(secondTransaction, true);
-            Assert.fail("Did not throw IOException on second transaction");
-        } catch (final IOException e) {
-            // expected behavior.
-        }
-
-        // Presumably each failed update blacklists one partition: the failed second transaction
-        // accounts for the first, so repeat the failure for the remaining numPartitions - 1
-        // (the assertion on the fourth transaction below expects all of them to be blacklisted).
-        for (int i = 0; i < numPartitions - 1; i++) {
-            try {
-                repo.update(thirdTransaction, true);
-                Assert.fail("Did not throw IOException on third transaction");
-            } catch (final IOException e) {
-                // expected behavior.
-            }
-        }
-
-        // Stop injecting serialization failures; the update below must still be rejected.
-        serde.setThrowIOEAfterNSerializeEdits(-1);
-        final List<DummyRecord> fourthTransaction = new ArrayList<>();
-        fourthTransaction.add(new DummyRecord("1", UpdateType.DELETE));
-
-        try {
-            repo.update(fourthTransaction, true);
-            Assert.fail("Successfully updated repo for 4th transaction");
-        } catch (final IOException e) {
-            // expected behavior; a null message fails the assertion instead of raising an NPE
-            final String message = e.getMessage();
-            assertTrue("Unexpected exception message: " + message,
-                    message != null && message.contains("All Partitions have been blacklisted"));
-        }
-
-        repo.shutdown();
-        serde.setThrowIOEAfterNSerializeEdits(-1);
-
-        final WriteAheadRepository<DummyRecord> recoverRepo = new MinimalLockingWriteAheadLog<>(path, numPartitions, serde, null);
-        final Collection<DummyRecord> recoveredRecords = recoverRepo.recoverRecords();
-        assertFalse(recoveredRecords.isEmpty());
-        assertEquals(3, recoveredRecords.size());
-    }
-
-    /**
-     * Verifies the on-disk layout of a repository created over two stripe directories: each
-     * stripe must contain an equal share of the "partition*" directories, and each of those
-     * directories must contain exactly one file after a run of inserts.
-     *
-     * @throws IOException if the repository cannot be created, updated, checkpointed or shut down
-     */
-    @Test
-    public void testStriping() throws IOException {
-        final int numPartitions = 6;
-        final Path path = Paths.get("target/minimal-locking-repo-striped");
-        deleteRecursively(path.toFile());
-        Files.createDirectories(path);
-
-        final SortedSet<Path> paths = new TreeSet<>();
-        paths.add(path.resolve("stripe-1"));
-        paths.add(path.resolve("stripe-2"));
-
-        final DummyRecordSerde serde = new DummyRecordSerde();
-        final WriteAheadRepository<DummyRecord> repo = new MinimalLockingWriteAheadLog<>(paths, numPartitions, serde, null);
-        final Collection<DummyRecord> initialRecs = repo.recoverRecords();
-        assertTrue(initialRecs.isEmpty());
-
-        // run() rather than start(): the inserts execute synchronously on the calling thread,
-        // so they have all completed before the directory layout is inspected.
-        final InsertThread inserter = new InsertThread(100000, 0, repo);
-        inserter.run();
-
-        // 6 partitions over 2 stripes -> 3 partition directories expected in each stripe.
-        final int expectedPartitionsPerStripe = numPartitions / paths.size();
-        for (final Path partitionPath : paths) {
-            final File[] files = partitionPath.toFile().listFiles(new FileFilter() {
-                @Override
-                public boolean accept(final File pathname) {
-                    return pathname.getName().startsWith("partition");
-                }
-            });
-            // listFiles() returns null when the directory is missing or unreadable
-            Assert.assertNotNull("Could not list stripe directory " + partitionPath, files);
-            assertEquals(expectedPartitionsPerStripe, files.length);
-
-            for (final File file : files) {
-                final File[] journalFiles = file.listFiles();
-                Assert.assertNotNull("Could not list partition directory " + file, journalFiles);
-                assertEquals(1, journalFiles.length);
-            }
-        }
-
-        repo.checkpoint();
-        // shut the repository down, as the other tests in this class do
-        repo.shutdown();
-    }
-
-    private static class
InsertThread extends Thread {
-
-        // Test helper: pre-builds numInsertions single-record CREATE transactions and, when run,
-        // applies them to the repository in order.
-
-        private final List<List<DummyRecord>> records;
-        private final WriteAheadRepository<DummyRecord> repo;
-
-        /**
-         * Builds the transactions up front so that run() only performs repository updates.
-         *
-         * @param numInsertions number of single-record transactions to create
-         * @param startIndex offset added to the loop index to form each record's id
-         * @param repo repository that run() will update
-         */
-        public InsertThread(final int numInsertions, final int startIndex, final WriteAheadRepository<DummyRecord> repo) {
-            records = new ArrayList<>();
-            for (int i = 0; i < numInsertions; i++) {
-                final DummyRecord record = new DummyRecord(String.valueOf(i + startIndex), UpdateType.CREATE);
-                record.setProperty("A", "B");
-                final List<DummyRecord> list = new ArrayList<>();
-                list.add(record);
-                records.add(list);
-            }
-            this.repo = repo;
-        }
-
-        /**
-         * Applies every prepared transaction to the repository; only the final update passes
-         * forceSync = true.
-         *
-         * @throws AssertionError if any update throws an IOException (attached as the cause)
-         */
-        @Override
-        public void run() {
-            try {
-                int counter = 0;
-                for (final List<DummyRecord> list : records) {
-                    final boolean forceSync = (++counter == records.size());
-                    repo.update(list, forceSync);
-                }
-            } catch (final IOException e) {
-                // Assert.fail() always throws, so a printStackTrace() placed after it can never
-                // run. Raise the same AssertionError with the IOException attached as its cause,
-                // so the underlying stack trace is reported together with the failure.
-                throw new AssertionError("Failed to update: " + e.toString(), e);
-            }
-        }
-    }
-
-    /**
-     * Deletes the given file or directory tree, children first. Best-effort: the result of
-     * File.delete() is ignored, and a null child listing is treated as nothing to descend into.
-     *
-     * @param file file or directory to remove
-     */
-    private void deleteRecursively(final File file) {
-        final File[] children = file.listFiles();
-        if (children != null) {
-            for (final File child : children) {
-                deleteRecursively(child);
-            }
-        }
-
-        file.delete();
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/administration/.gitignore
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/administration/.gitignore b/nifi/nar-bundles/framework-bundle/framework/administration/.gitignore
deleted file mode 100755
index ea8c4bf..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/administration/.gitignore
+++ /dev/null
@@ -1 +0,0 @@
-/target
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/administration/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/administration/pom.xml
b/nifi/nar-bundles/framework-bundle/framework/administration/pom.xml deleted file mode 100644 index 9522fff..0000000 --- a/nifi/nar-bundles/framework-bundle/framework/administration/pom.xml +++ /dev/null @@ -1,116 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. 
---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - <parent> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-framework-parent</artifactId> - <version>0.0.1-incubating-SNAPSHOT</version> - </parent> - - <artifactId>nifi-administration</artifactId> - <version>0.0.1-incubating-SNAPSHOT</version> - <name>NiFi Administration</name> - <build> - <resources> - <resource> - <directory>src/main/resources</directory> - </resource> - <resource> - <directory>src/main/xsd</directory> - </resource> - </resources> - <plugins> - <plugin> - <groupId>org.codehaus.mojo</groupId> - <artifactId>jaxb2-maven-plugin</artifactId> - <executions> - <execution> - <id>current</id> - <goals> - <goal>xjc</goal> - </goals> - <configuration> - <packageName>org.apache.nifi.authorization.generated</packageName> - </configuration> - </execution> - </executions> - </plugin> - </plugins> - </build> - <dependencies> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-user-actions</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-api</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>core-api</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-nar</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-utils</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-properties</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-security-utils</artifactId> - </dependency> - <dependency> - <groupId>com.h2database</groupId> - <artifactId>h2</artifactId> - </dependency> - <dependency> - 
<groupId>org.springframework</groupId> - <artifactId>spring-beans</artifactId> - </dependency> - <dependency> - <groupId>org.springframework</groupId> - <artifactId>spring-context</artifactId> - </dependency> - <dependency> - <groupId>org.springframework</groupId> - <artifactId>spring-core</artifactId> - </dependency> - <dependency> - <groupId>org.springframework</groupId> - <artifactId>spring-aop</artifactId> - </dependency> - <dependency> - <groupId>org.aspectj</groupId> - <artifactId>aspectjweaver</artifactId> - </dependency> - <dependency> - <groupId>org.springframework.security</groupId> - <artifactId>spring-security-core</artifactId> - </dependency> - <dependency> - <groupId>org.apache.commons</groupId> - <artifactId>commons-collections4</artifactId> - </dependency> - </dependencies> -</project>