[6/7] nifi git commit: NIFI-2854: Refactor repositories and swap files to use schema-based serialization so that nifi can be rolled back to a previous version after an upgrade.
http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/BufferedInputStream.java -- diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/BufferedInputStream.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/BufferedInputStream.java index 2afaa70..324f59f 100644 --- a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/BufferedInputStream.java +++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/BufferedInputStream.java @@ -16,19 +16,445 @@ */ package org.apache.nifi.stream.io; +import java.io.IOException; import java.io.InputStream; /** * This class is a slight modification of the BufferedInputStream in the java.io package. The modification is that this implementation does not provide synchronization on method calls, which means * that this class is not suitable for use by multiple threads. However, the absence of these synchronized blocks results in potentially much better performance. */ -public class BufferedInputStream extends java.io.BufferedInputStream { +public class BufferedInputStream extends InputStream { -public BufferedInputStream(final InputStream in) { -super(in); +private final InputStream in; + +private static int DEFAULT_BUFFER_SIZE = 8192; + +/** + * The maximum size of array to allocate. + * Some VMs reserve some header words in an array. + * Attempts to allocate larger arrays may result in + * OutOfMemoryError: Requested array size exceeds VM limit + */ +private static int MAX_BUFFER_SIZE = Integer.MAX_VALUE - 8; + +/** + * The internal buffer array where the data is stored. When necessary, + * it may be replaced by another array of + * a different size. + */ +protected byte buf[]; + +/** + * The index one greater than the index of the last valid byte in + * the buffer. + * This value is always + * in the range 0 through buf.length; + * elements buf[0] through buf[count-1] + * contain buffered input data obtained + * from the underlying input stream. + */ +private int count; + +/** + * The current position in the buffer. This is the index of the next + * character to be read from the buf array. + * + * This value is always in the range 0 + * through count. If it is less + * than count, then buf[pos] + * is the next byte to be supplied as input; + * if it is equal to count, then + * the next read or skip + * operation will require more bytes to be + * read from the contained input stream. + * + * @see java.io.BufferedInputStream#buf + */ +private int pos; + +/** + * The value of the pos field at the time the last + * mark method was called. + * + * This value is always + * in the range -1 through pos. + * If there is no marked position in the input + * stream, this field is -1. If + * there is a marked position in the input + * stream, then buf[markpos] + * is the first byte to be supplied as input + * after a reset operation. If + * markpos is not -1, + * then all bytes from positions buf[markpos] + * through buf[pos-1] must remain + * in the buffer array (though they may be + * moved to another place in the buffer array, + * with suitable adjustments to the values + * of count, pos, + * and markpos); they may not + * be discarded unless and until the difference + * between pos and markpos + * exceeds marklimit. + * + * @see java.io.BufferedInputStream#mark(int) + * @see java.io.BufferedInputStream#pos + */ +protected int markpos = -1; + +/** + * The maximum read ahead allowed after a call to the + * mark method before subsequent calls to the + * reset method fail. + * Whenever the difference between pos + * and markpos exceeds marklimit, + * then the mark may be dropped by setting + * markpos to -1. + * + * @see java.io.BufferedInputStream#mark(int) + * @see java.io.BufferedInputStream#reset() + */ +protected int marklimit; + +/** + * Check to make sure that underlying input stream has not been + * nulled out due to close; if not return it; + */ +private InputStream getInIfOpen() throws IOException { +InputStream input = in; +if (input == null) { +throw new IOException("Stream closed"); +} +return input; +} + +/** + * Check to make sure that buffer has not been nulled out due to + * close; if not return it; + */ +private byte[] getBufIfOpen() throws IOException { +if (buf == null) { +throw new IOException("Stream closed"); +} +return buf; +} + +/** + * Creates a BufferedInputStream +
[1/7] nifi git commit: NIFI-2854: Refactor repositories and swap files to use schema-based serialization so that nifi can be rolled back to a previous version after an upgrade.
Repository: nifi Updated Branches: refs/heads/master 5a25884f5 -> 1be087147 http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/AbstractTestRecordReaderWriter.java -- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/AbstractTestRecordReaderWriter.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/AbstractTestRecordReaderWriter.java new file mode 100644 index 000..bae2364 --- /dev/null +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/AbstractTestRecordReaderWriter.java @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.provenance; + +import static org.apache.nifi.provenance.TestUtil.createFlowFile; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + +import org.apache.nifi.provenance.serialization.RecordReader; +import org.apache.nifi.provenance.serialization.RecordWriter; +import org.apache.nifi.provenance.toc.StandardTocReader; +import org.apache.nifi.provenance.toc.StandardTocWriter; +import org.apache.nifi.provenance.toc.TocReader; +import org.apache.nifi.provenance.toc.TocUtil; +import org.apache.nifi.provenance.toc.TocWriter; +import org.apache.nifi.util.file.FileUtils; +import org.junit.BeforeClass; +import org.junit.Test; + +public abstract class AbstractTestRecordReaderWriter { +@BeforeClass +public static void setLogLevel() { + System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.provenance", "INFO"); +} + +protected ProvenanceEventRecord createEvent() { +final Mapattributes = new HashMap<>(); +attributes.put("filename", "1.txt"); +attributes.put("uuid", UUID.randomUUID().toString()); + +final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder(); +builder.setEventTime(System.currentTimeMillis()); +builder.setEventType(ProvenanceEventType.RECEIVE); +builder.setTransitUri("nifi://unit-test"); +builder.fromFlowFile(createFlowFile(3L, 3000L, attributes)); +builder.setComponentId("1234"); +builder.setComponentType("dummy processor"); +final ProvenanceEventRecord record = builder.build(); + +return record; +} + +@Test +public void testSimpleWriteWithToc() throws IOException { +final File journalFile = new File("target/storage/" + UUID.randomUUID().toString() + "/testSimpleWrite"); +final File tocFile = TocUtil.getTocFile(journalFile); +final TocWriter tocWriter = new StandardTocWriter(tocFile, false, false); +final RecordWriter writer = createWriter(journalFile, tocWriter, false, 1024 * 1024); + +writer.writeHeader(1L); +writer.writeRecord(createEvent(), 1L); +writer.close(); + +final TocReader tocReader = new StandardTocReader(tocFile); + +try (final FileInputStream fis = new FileInputStream(journalFile); +final RecordReader reader = createReader(fis, journalFile.getName(), tocReader, 2048)) { +assertEquals(0, reader.getBlockIndex()); +reader.skipToBlock(0); +final StandardProvenanceEventRecord recovered = reader.nextRecord(); +assertNotNull(recovered); + +assertEquals("nifi://unit-test", recovered.getTransitUri()); +assertNull(reader.nextRecord()); +} + +FileUtils.deleteFile(journalFile.getParentFile(), true); +} + + +@Test +public void
[7/7] nifi git commit: NIFI-2854: Refactor repositories and swap files to use schema-based serialization so that nifi can be rolled back to a previous version after an upgrade.
NIFI-2854: Refactor repositories and swap files to use schema-based serialization so that nifi can be rolled back to a previous version after an upgrade. NIFI-2854: Incorporated PR review feedback NIFI-2854: Implemented feedback from PR Review NIFI-2854: Ensure that all resources are closed on CompressableRecordReader.close() even if an IOException is thrown when closing one of them This closes #1202 Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/1be08714 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/1be08714 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/1be08714 Branch: refs/heads/master Commit: 1be08714731f01347ac1f98e18047fe7d9ab8afd Parents: 5a25884 Author: Mark PayneAuthored: Tue Oct 4 09:38:14 2016 -0400 Committer: Oleg Zhurakousky Committed: Fri Nov 18 14:53:13 2016 -0500 -- .../nifi/provenance/NamedSearchableField.java | 9 +- .../StandardProvenanceEventRecord.java | 38 ++ nifi-commons/nifi-schema-utils/pom.xml | 26 + .../repository/schema/ComplexRecordField.java | 95 .../nifi/repository/schema/FieldMapRecord.java | 81 +++ .../nifi/repository/schema/FieldType.java | 74 +++ .../nifi/repository/schema/MapRecordField.java | 75 +++ .../nifi/repository/schema/NamedValue.java | 36 ++ .../apache/nifi/repository/schema/Record.java | 30 + .../nifi/repository/schema/RecordField.java | 30 + .../nifi/repository/schema/RecordSchema.java| 188 +++ .../nifi/repository/schema/Repetition.java | 22 + .../repository/schema/SchemaRecordReader.java | 191 +++ .../repository/schema/SchemaRecordWriter.java | 139 + .../repository/schema/SimpleRecordField.java| 84 +++ .../repository/schema/UnionRecordField.java | 64 +++ .../schema/TestSchemaRecordReader.java | 281 ++ .../schema/TestSchemaRecordReaderWriter.java| 178 ++ .../nifi/stream/io/BufferedInputStream.java | 436 ++- .../java/org/apache/nifi/util/FormatUtils.java | 38 ++ .../util/timebuffer/CountSizeEntityAccess.java | 43 ++ .../nifi/util/timebuffer/TimedCountSize.java| 41 ++ .../org/wali/MinimalLockingWriteAheadLog.java | 118 ++-- .../src/main/java/org/wali/SerDe.java | 30 +- .../src/main/java/org/wali/SerDeFactory.java| 60 ++ .../java/org/wali/SingletonSerDeFactory.java| 46 ++ nifi-commons/pom.xml| 1 + .../repository/claim/ResourceClaim.java | 24 + .../nifi-framework/nifi-framework-core/pom.xml | 4 + .../nifi/controller/FileSystemSwapManager.java | 416 ++ .../nifi/controller/StandardFlowFileQueue.java | 6 + .../repository/RepositoryRecordSerde.java | 68 +++ .../RepositoryRecordSerdeFactory.java | 95 .../repository/SchemaRepositoryRecordSerde.java | 213 .../repository/StandardProcessSession.java | 70 +-- .../WriteAheadFlowFileRepository.java | 547 +-- .../WriteAheadRepositoryRecordSerde.java| 517 ++ .../repository/claim/StandardResourceClaim.java | 23 - .../claim/StandardResourceClaimManager.java | 7 +- .../repository/io/ByteCountingInputStream.java | 101 .../repository/io/ByteCountingOutputStream.java | 63 --- .../repository/schema/ContentClaimFieldMap.java | 92 .../repository/schema/ContentClaimSchema.java | 63 +++ .../schema/FlowFileRecordFieldMap.java | 99 .../repository/schema/FlowFileSchema.java | 67 +++ .../schema/RepositoryRecordFieldMap.java| 83 +++ .../schema/RepositoryRecordSchema.java | 93 .../schema/RepositoryRecordUpdate.java | 69 +++ .../schema/ResourceClaimFieldMap.java | 85 +++ .../controller/swap/SchemaSwapDeserializer.java | 77 +++ .../controller/swap/SchemaSwapSerializer.java | 101 .../controller/swap/SimpleSwapDeserializer.java | 303 ++ .../controller/swap/SimpleSwapSerializer.java | 133 + .../nifi/controller/swap/SwapDeserializer.java | 33 ++ .../apache/nifi/controller/swap/SwapSchema.java | 79 +++ .../nifi/controller/swap/SwapSerializer.java| 33 ++ .../controller/swap/SwapSummaryFieldMap.java| 106 .../controller/TestFileSystemSwapManager.java | 160 +- .../TestWriteAheadFlowFileRepository.java | 2 + .../nifi/controller/swap/MockFlowFile.java | 136 + .../TestSchemaSwapSerializerDeserializer.java | 195 +++ .../TestSimpleSwapSerializerDeserializer.java | 139 + .../pom.xml | 4 + .../nifi/provenance/AbstractRecordWriter.java | 173 ++ .../provenance/ByteArraySchemaRecordReader.java | 80 +++ .../provenance/ByteArraySchemaRecordWriter.java | 85 +++
[4/7] nifi git commit: NIFI-2854: Refactor repositories and swap files to use schema-based serialization so that nifi can be rolled back to a previous version after an upgrade.
http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/ContentClaimFieldMap.java -- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/ContentClaimFieldMap.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/ContentClaimFieldMap.java new file mode 100644 index 000..76c208d --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/ContentClaimFieldMap.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.repository.schema; + +import java.util.List; + +import org.apache.nifi.controller.repository.claim.ContentClaim; +import org.apache.nifi.controller.repository.claim.ResourceClaim; +import org.apache.nifi.controller.repository.claim.ResourceClaimManager; +import org.apache.nifi.controller.repository.claim.StandardContentClaim; +import org.apache.nifi.repository.schema.Record; +import org.apache.nifi.repository.schema.RecordField; +import org.apache.nifi.repository.schema.RecordSchema; + +public class ContentClaimFieldMap implements Record { +private final ContentClaim contentClaim; +private final long contentClaimOffset; +private final ResourceClaimFieldMap resourceClaimFieldMap; +private final RecordSchema schema; + +public ContentClaimFieldMap(final ContentClaim contentClaim, final long contentClaimOffset, final RecordSchema schema) { +this.contentClaim = contentClaim; +this.contentClaimOffset = contentClaimOffset; +this.schema = schema; + +final List resourceClaimFields = schema.getField(ContentClaimSchema.RESOURCE_CLAIM).getSubFields(); +final RecordSchema resourceClaimSchema = new RecordSchema(resourceClaimFields); +this.resourceClaimFieldMap = new ResourceClaimFieldMap(contentClaim.getResourceClaim(), resourceClaimSchema); +} + +@Override +public Object getFieldValue(final String fieldName) { +switch (fieldName) { +case ContentClaimSchema.RESOURCE_CLAIM: +return resourceClaimFieldMap; +case ContentClaimSchema.CONTENT_CLAIM_LENGTH: +return contentClaim.getLength(); +case ContentClaimSchema.CONTENT_CLAIM_OFFSET: +return contentClaimOffset; +case ContentClaimSchema.RESOURCE_CLAIM_OFFSET: +return contentClaim.getOffset(); +default: +return null; +} +} + +@Override +public RecordSchema getSchema() { +return schema; +} + +@Override +public String toString() { +return "ContentClaimFieldMap[" + contentClaim + "]"; +} + +public static ContentClaim getContentClaim(final Record claimRecord, final ResourceClaimManager resourceClaimManager) { +final Record resourceClaimRecord = (Record) claimRecord.getFieldValue(ContentClaimSchema.RESOURCE_CLAIM); +final String container = (String) resourceClaimRecord.getFieldValue(ContentClaimSchema.CLAIM_CONTAINER); +final String section = (String) resourceClaimRecord.getFieldValue(ContentClaimSchema.CLAIM_SECTION); +final String identifier = (String) resourceClaimRecord.getFieldValue(ContentClaimSchema.CLAIM_IDENTIFIER); +final Boolean lossTolerant = (Boolean) resourceClaimRecord.getFieldValue(ContentClaimSchema.LOSS_TOLERANT); + +final Long length = (Long) claimRecord.getFieldValue(ContentClaimSchema.CONTENT_CLAIM_LENGTH); +final Long resourceOffset = (Long) claimRecord.getFieldValue(ContentClaimSchema.RESOURCE_CLAIM_OFFSET); + +final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim(container, section, identifier, lossTolerant, false); +final StandardContentClaim contentClaim = new
[3/7] nifi git commit: NIFI-2854: Refactor repositories and swap files to use schema-based serialization so that nifi can be rolled back to a previous version after an upgrade.
http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java -- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java index 97226b2..46bea31 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java @@ -22,26 +22,20 @@ import java.io.BufferedInputStream; import java.io.DataInputStream; import java.io.File; import java.io.FileInputStream; -import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; -import java.nio.file.Files; -import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; import org.apache.nifi.controller.queue.FlowFileQueue; import org.apache.nifi.controller.repository.FlowFileRecord; +import org.apache.nifi.controller.repository.FlowFileRepository; import org.apache.nifi.controller.repository.SwapContents; -import org.apache.nifi.controller.repository.claim.ContentClaim; +import org.apache.nifi.controller.repository.SwapManagerInitializationContext; import org.apache.nifi.controller.repository.claim.ResourceClaim; import org.apache.nifi.controller.repository.claim.ResourceClaimManager; -import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.events.EventReporter; import org.junit.Test; import org.mockito.Mockito; @@ -56,7 +50,9 @@ public class TestFileSystemSwapManager { final FlowFileQueue flowFileQueue = Mockito.mock(FlowFileQueue.class); Mockito.when(flowFileQueue.getIdentifier()).thenReturn("87bb99fe-412c-49f6-a441-d1b0af4e20b4"); -final SwapContents swapContents = FileSystemSwapManager.deserializeFlowFiles(in, "/src/test/resources/old-swap-file.swap", flowFileQueue, new NopResourceClaimManager()); +final FileSystemSwapManager swapManager = createSwapManager(); +final SwapContents swapContents = swapManager.peek("src/test/resources/old-swap-file.swap", flowFileQueue); + final List records = swapContents.getFlowFiles(); assertEquals(1, records.size()); @@ -67,53 +63,32 @@ public class TestFileSystemSwapManager { } } -@Test -public void testRoundTripSerializeDeserialize() throws IOException { -final List toSwap = new ArrayList<>(1); -final Mapattrs = new HashMap<>(); -for (int i = 0; i < 1; i++) { -attrs.put("i", String.valueOf(i)); -final FlowFileRecord ff = new TestFlowFile(attrs, i); -toSwap.add(ff); -} - -final FlowFileQueue flowFileQueue = Mockito.mock(FlowFileQueue.class); - Mockito.when(flowFileQueue.getIdentifier()).thenReturn("87bb99fe-412c-49f6-a441-d1b0af4e20b4"); -final String swapLocation = "target/testRoundTrip.swap"; -final File swapFile = new File(swapLocation); -Files.deleteIfExists(swapFile.toPath()); +private FileSystemSwapManager createSwapManager() { +final FileSystemSwapManager swapManager = new FileSystemSwapManager(); +final ResourceClaimManager resourceClaimManager = new NopResourceClaimManager(); +final FlowFileRepository flowfileRepo = Mockito.mock(FlowFileRepository.class); +swapManager.initialize(new SwapManagerInitializationContext() { +@Override +public ResourceClaimManager getResourceClaimManager() { +return resourceClaimManager; +} -try (final FileOutputStream fos = new FileOutputStream(swapFile)) { -FileSystemSwapManager.serializeFlowFiles(toSwap, flowFileQueue, swapLocation, fos); -} +@Override +public FlowFileRepository getFlowFileRepository() { +return flowfileRepo; +} -final SwapContents swappedIn; -try (final FileInputStream fis = new FileInputStream(swapFile); -final DataInputStream dis = new DataInputStream(fis)) { -swappedIn = FileSystemSwapManager.deserializeFlowFiles(dis, swapLocation, flowFileQueue, Mockito.mock(ResourceClaimManager.class)); -} +@Override +public EventReporter
[2/7] nifi git commit: NIFI-2854: Refactor repositories and swap files to use schema-based serialization so that nifi can be rolled back to a previous version after an upgrade.
http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java -- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java index 35832c4..a95bd4f 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java @@ -17,239 +17,142 @@ package org.apache.nifi.provenance; import java.io.File; -import java.io.FileOutputStream; import java.io.IOException; import java.io.OutputStream; import java.util.Collection; import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; +import org.apache.nifi.provenance.serialization.CompressableRecordWriter; import org.apache.nifi.provenance.serialization.RecordWriter; import org.apache.nifi.provenance.toc.TocWriter; -import org.apache.nifi.stream.io.BufferedOutputStream; -import org.apache.nifi.stream.io.ByteCountingOutputStream; import org.apache.nifi.stream.io.DataOutputStream; -import org.apache.nifi.stream.io.GZIPOutputStream; -import org.apache.nifi.stream.io.NonCloseableOutputStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class StandardRecordWriter implements RecordWriter { +/** + * @deprecated Deprecated in favor of SchemaRecordWriter + */ +@Deprecated +public class StandardRecordWriter extends CompressableRecordWriter implements RecordWriter { private static final Logger logger = LoggerFactory.getLogger(StandardRecordWriter.class); +public static final int SERIALIZATION_VERISON = 9; +public static final String SERIALIZATION_NAME = "org.apache.nifi.provenance.PersistentProvenanceRepository"; private final File file; -private final FileOutputStream fos; -private final ByteCountingOutputStream rawOutStream; -private final TocWriter tocWriter; -private final boolean compressed; -private final int uncompressedBlockSize; -private final AtomicBoolean dirtyFlag = new AtomicBoolean(false); - -private DataOutputStream out; -private ByteCountingOutputStream byteCountingOut; -private long lastBlockOffset = 0L; -private int recordCount = 0; -private volatile boolean closed = false; - -private final Lock lock = new ReentrantLock(); public StandardRecordWriter(final File file, final TocWriter writer, final boolean compressed, final int uncompressedBlockSize) throws IOException { +super(file, writer, compressed, uncompressedBlockSize); logger.trace("Creating Record Writer for {}", file.getName()); this.file = file; -this.compressed = compressed; -this.fos = new FileOutputStream(file); -rawOutStream = new ByteCountingOutputStream(fos); -this.uncompressedBlockSize = uncompressedBlockSize; +} -this.tocWriter = writer; +public StandardRecordWriter(final OutputStream out, final TocWriter tocWriter, final boolean compressed, final int uncompressedBlockSize) throws IOException { +super(out, tocWriter, compressed, uncompressedBlockSize); +this.file = null; } @Override -public synchronized File getFile() { -return file; +protected String getSerializationName() { +return SERIALIZATION_NAME; } @Override -public synchronized void writeHeader(final long firstEventId) throws IOException { -if (isDirty()) { -throw new IOException("Cannot update Provenance Repository because this Record Writer has already failed to write to the Repository"); -} - -try { -lastBlockOffset = rawOutStream.getBytesWritten(); -resetWriteStream(firstEventId); - -out.writeUTF(PersistentProvenanceRepository.class.getName()); -out.writeInt(PersistentProvenanceRepository.SERIALIZATION_VERSION); -out.flush(); -} catch (final IOException ioe) { -markDirty(); -throw ioe; -} +protected int getSerializationVersion() { +return SERIALIZATION_VERISON; } - -/** - * Resets the streams to prepare for a new block - * @param eventId the first id that will be written to the new block - * @throws IOException if unable to flush/close the current streams
[5/7] nifi git commit: NIFI-2854: Refactor repositories and swap files to use schema-based serialization so that nifi can be rolled back to a previous version after an upgrade.
http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerde.java -- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerde.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerde.java new file mode 100644 index 000..916fd76 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerde.java @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.repository; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.Map; + +import org.apache.nifi.controller.queue.FlowFileQueue; +import org.apache.nifi.controller.repository.claim.ContentClaim; +import org.apache.nifi.controller.repository.claim.ResourceClaimManager; +import org.apache.nifi.controller.repository.schema.ContentClaimFieldMap; +import org.apache.nifi.controller.repository.schema.ContentClaimSchema; +import org.apache.nifi.controller.repository.schema.FlowFileSchema; +import org.apache.nifi.controller.repository.schema.RepositoryRecordFieldMap; +import org.apache.nifi.controller.repository.schema.RepositoryRecordSchema; +import org.apache.nifi.controller.repository.schema.RepositoryRecordUpdate; +import org.apache.nifi.repository.schema.FieldType; +import org.apache.nifi.repository.schema.Record; +import org.apache.nifi.repository.schema.RecordSchema; +import org.apache.nifi.repository.schema.Repetition; +import org.apache.nifi.repository.schema.SchemaRecordReader; +import org.apache.nifi.repository.schema.SchemaRecordWriter; +import org.apache.nifi.repository.schema.SimpleRecordField; +import org.wali.SerDe; +import org.wali.UpdateType; + +public class SchemaRepositoryRecordSerde extends RepositoryRecordSerde implements SerDe { +private static final int MAX_ENCODING_VERSION = 1; + +private final RecordSchema writeSchema = RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1; +private final RecordSchema contentClaimSchema = ContentClaimSchema.CONTENT_CLAIM_SCHEMA_V1; + +private final ResourceClaimManager resourceClaimManager; +private volatile RecordSchema recoverySchema; + +public SchemaRepositoryRecordSerde(final ResourceClaimManager resourceClaimManager) { +this.resourceClaimManager = resourceClaimManager; +} + +@Override +public void writeHeader(final DataOutputStream out) throws IOException { +writeSchema.writeTo(out); +} + +@Override +public void serializeEdit(final RepositoryRecord previousRecordState, final RepositoryRecord newRecordState, final DataOutputStream out) throws IOException { +serializeRecord(newRecordState, out); +} + +@Override +public void serializeRecord(final RepositoryRecord record, final DataOutputStream out) throws IOException { +final RecordSchema schema; +switch (record.getType()) { +case CREATE: +case UPDATE: +schema = RepositoryRecordSchema.CREATE_OR_UPDATE_SCHEMA_V1; +break; +case CONTENTMISSING: +case DELETE: +schema = RepositoryRecordSchema.DELETE_SCHEMA_V1; +break; +case SWAP_IN: +schema = RepositoryRecordSchema.SWAP_IN_SCHEMA_V1; +break; +case SWAP_OUT: +schema = RepositoryRecordSchema.SWAP_OUT_SCHEMA_V1; +break; +default: +throw new IllegalArgumentException("Received Repository Record with unknown Update Type: " + record.getType()); // won't happen. +} + +final RepositoryRecordFieldMap fieldMap = new RepositoryRecordFieldMap(record, schema, contentClaimSchema);
nifi git commit: NIFI-3052 Update screenshots in Admin Guide and Getting Starting guide for additional colors added to UI This closes #1239
Repository: nifi Updated Branches: refs/heads/master 8568d40cd -> 5a25884f5 NIFI-3052 Update screenshots in Admin Guide and Getting Starting guide for additional colors added to UI This closes #1239 Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/5a25884f Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/5a25884f Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/5a25884f Branch: refs/heads/master Commit: 5a25884f52e92b3c73f425a1dd6c0993866b1b8f Parents: 8568d40 Author: Andrew LimAuthored: Thu Nov 17 12:20:21 2016 -0500 Committer: Scott Aslan Committed: Fri Nov 18 14:05:17 2016 -0500 -- .../src/main/asciidoc/images/global-menu.png| Bin 33163 -> 34564 bytes .../src/main/asciidoc/images/iconAlert.png | Bin 624 -> 597 bytes nifi-docs/src/main/asciidoc/images/iconRun.png | Bin 429 -> 456 bytes nifi-docs/src/main/asciidoc/images/iconStop.png | Bin 248 -> 245 bytes .../asciidoc/images/lineage-graph-annotated.png | Bin 69156 -> 65991 bytes nifi-docs/src/main/asciidoc/images/ncm.png | Bin 244418 -> 134532 bytes 6 files changed, 0 insertions(+), 0 deletions(-) -- http://git-wip-us.apache.org/repos/asf/nifi/blob/5a25884f/nifi-docs/src/main/asciidoc/images/global-menu.png -- diff --git a/nifi-docs/src/main/asciidoc/images/global-menu.png b/nifi-docs/src/main/asciidoc/images/global-menu.png index 25ad52a..c32f1a7 100644 Binary files a/nifi-docs/src/main/asciidoc/images/global-menu.png and b/nifi-docs/src/main/asciidoc/images/global-menu.png differ http://git-wip-us.apache.org/repos/asf/nifi/blob/5a25884f/nifi-docs/src/main/asciidoc/images/iconAlert.png -- diff --git a/nifi-docs/src/main/asciidoc/images/iconAlert.png b/nifi-docs/src/main/asciidoc/images/iconAlert.png index 18917d4..7006e5a 100644 Binary files a/nifi-docs/src/main/asciidoc/images/iconAlert.png and b/nifi-docs/src/main/asciidoc/images/iconAlert.png differ http://git-wip-us.apache.org/repos/asf/nifi/blob/5a25884f/nifi-docs/src/main/asciidoc/images/iconRun.png -- diff --git a/nifi-docs/src/main/asciidoc/images/iconRun.png b/nifi-docs/src/main/asciidoc/images/iconRun.png old mode 100755 new mode 100644 index b6fd45b..f912a6c Binary files a/nifi-docs/src/main/asciidoc/images/iconRun.png and b/nifi-docs/src/main/asciidoc/images/iconRun.png differ http://git-wip-us.apache.org/repos/asf/nifi/blob/5a25884f/nifi-docs/src/main/asciidoc/images/iconStop.png -- diff --git a/nifi-docs/src/main/asciidoc/images/iconStop.png b/nifi-docs/src/main/asciidoc/images/iconStop.png index 3192a10..2b89422 100644 Binary files a/nifi-docs/src/main/asciidoc/images/iconStop.png and b/nifi-docs/src/main/asciidoc/images/iconStop.png differ http://git-wip-us.apache.org/repos/asf/nifi/blob/5a25884f/nifi-docs/src/main/asciidoc/images/lineage-graph-annotated.png -- diff --git a/nifi-docs/src/main/asciidoc/images/lineage-graph-annotated.png b/nifi-docs/src/main/asciidoc/images/lineage-graph-annotated.png index d500c70..f89462d 100644 Binary files a/nifi-docs/src/main/asciidoc/images/lineage-graph-annotated.png and b/nifi-docs/src/main/asciidoc/images/lineage-graph-annotated.png differ http://git-wip-us.apache.org/repos/asf/nifi/blob/5a25884f/nifi-docs/src/main/asciidoc/images/ncm.png -- diff --git a/nifi-docs/src/main/asciidoc/images/ncm.png b/nifi-docs/src/main/asciidoc/images/ncm.png index e02fbb0..c16a4fa 100644 Binary files a/nifi-docs/src/main/asciidoc/images/ncm.png and b/nifi-docs/src/main/asciidoc/images/ncm.png differ
[jira] [Commented] (MINIFI-61) Refactor dependencies to make use of separated nifi-api
[ https://issues.apache.org/jira/browse/MINIFI-61?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15677021#comment-15677021 ] Aldrin Piri commented on MINIFI-61: --- Would also benefit if there was a way to get more granular access to certain NiFi components but would need some restructuring against those items. > Refactor dependencies to make use of separated nifi-api > --- > > Key: MINIFI-61 > URL: https://issues.apache.org/jira/browse/MINIFI-61 > Project: Apache NiFi MiNiFi > Issue Type: Improvement > Components: Core Framework >Reporter: Aldrin Piri >Assignee: Aldrin Piri > > NIFI-1896 provided a separation of core extension API (nifi-api) and items > that were more framework specific to NiFi (nifi-framework-api). Accordingly, > the adjustment of nifi-api should let us provide a common interface to use > extensions interchangeably with NiFi while providing internals and framework > specific to minifi. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (MINIFI-29) Create FlowController implementation
[ https://issues.apache.org/jira/browse/MINIFI-29?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aldrin Piri updated MINIFI-29: -- Assignee: (was: Aldrin Piri) > Create FlowController implementation > - > > Key: MINIFI-29 > URL: https://issues.apache.org/jira/browse/MINIFI-29 > Project: Apache NiFi MiNiFi > Issue Type: Task > Components: Core Framework >Reporter: Aldrin Piri > > While much can be reused and many of the fundamentals are the same, a > component of similar functionality to the FlowController in NiFi is also > needed to handle the specifics of how processing works within MiNiFi -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (MINIFI-118) MiNiFi does not compile under Centos 6 standard tooling
[ https://issues.apache.org/jira/browse/MINIFI-118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15676967#comment-15676967 ] Aldrin Piri commented on MINIFI-118: I understand this is an inconvenience but am also of the opinion that it is not an unfair requirement given the period of time since its release. To get a more informed decision of where the issues arise, do you have a listing of the problem areas? Off the top of my head, while supported now, I believe the latest (and current release, 0.5.3) of yaml-cpp that we are using is the last before C++11 is required. Overall, if it's a small gap, maybe we can adjust, but also don't want to preclude us from using the latest libraries nor get in a position where we have to manage this compatibility for things upon which we depend. I could also see where we have varying levels of compatibility wherein some of our core libraries/framework items are maybe using pre-C++11 type work and our higher level more outward facing components run off more current versions. Let me know your thoughts. > MiNiFi does not compile under Centos 6 standard tooling > --- > > Key: MINIFI-118 > URL: https://issues.apache.org/jira/browse/MINIFI-118 > Project: Apache NiFi MiNiFi > Issue Type: Bug >Affects Versions: 0.0.1 >Reporter: Andre > > Reliance on C++11 is a big issue as default (i.e. no SCL) tooling on RHEL6 > does not support C++11 fully. > Using Software collections to deploy later version of the toolchain produce > valid binary -- This message was sent by Atlassian JIRA (v6.3.4#6332)