http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFileUnpackagerV2.java ---------------------------------------------------------------------- diff --git a/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFileUnpackagerV2.java b/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFileUnpackagerV2.java new file mode 100644 index 0000000..500015f --- /dev/null +++ b/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFileUnpackagerV2.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.util; + +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.HashMap; +import java.util.Map; + +public class FlowFileUnpackagerV2 implements FlowFileUnpackager { + + private final byte readBuffer[] = new byte[8192]; + private Map<String, String> nextAttributes = null; + private boolean haveReadSomething = false; + + @Override + public boolean hasMoreData() throws IOException { + return nextAttributes != null || !haveReadSomething; + } + + protected Map<String, String> readAttributes(final InputStream in) throws IOException { + final Map<String, String> attributes = new HashMap<>(); + final Integer numAttributes = readFieldLength(in); //read number of attributes + if (numAttributes == null) { + return null; + } + if (numAttributes == 0) { + throw new IOException("flow files cannot have zero attributes"); + } + for (int i = 0; i < numAttributes; i++) { //read each attribute key/value pair + final String key = readString(in); + final String value = readString(in); + attributes.put(key, value); + } + + return attributes; + } + + @Override + public Map<String, String> unpackageFlowFile(final InputStream in, final OutputStream out) throws IOException { + final Map<String, String> attributes; + if (nextAttributes != null) { + attributes = nextAttributes; + } else { + attributes = readAttributes(in); + } + + final long expectedNumBytes = readLong(in); // read length of payload + copy(in, out, expectedNumBytes); // read payload + + nextAttributes = readAttributes(in); + haveReadSomething = true; + + return attributes; + } + + protected String readString(final InputStream in) throws IOException { + final Integer numBytes = readFieldLength(in); + if (numBytes == null) { + throw new EOFException(); + } + final byte[] bytes = new byte[numBytes]; + fillBuffer(in, bytes, numBytes); + return new String(bytes, "UTF-8"); + } + + private void fillBuffer(final InputStream 
in, final byte[] buffer, final int length) throws IOException { + int bytesRead; + int totalBytesRead = 0; + while ((bytesRead = in.read(buffer, totalBytesRead, length - totalBytesRead)) > 0) { + totalBytesRead += bytesRead; + } + if (totalBytesRead != length) { + throw new EOFException(); + } + } + + protected long copy(final InputStream in, final OutputStream out, final long numBytes) throws IOException { + int bytesRead; + long totalBytesRead = 0L; + while ((bytesRead = in.read(readBuffer, 0, (int) Math.min(readBuffer.length, numBytes - totalBytesRead))) > 0) { + out.write(readBuffer, 0, bytesRead); + totalBytesRead += bytesRead; + } + + if (totalBytesRead < numBytes) { + throw new EOFException("Expected " + numBytes + " but received " + totalBytesRead); + } + + return totalBytesRead; + } + + protected long readLong(final InputStream in) throws IOException { + fillBuffer(in, readBuffer, 8); + return (((long) readBuffer[0] << 56) + + ((long) (readBuffer[1] & 255) << 48) + + ((long) (readBuffer[2] & 255) << 40) + + ((long) (readBuffer[3] & 255) << 32) + + ((long) (readBuffer[4] & 255) << 24) + + ((readBuffer[5] & 255) << 16) + + ((readBuffer[6] & 255) << 8) + + ((readBuffer[7] & 255))); + } + + private Integer readFieldLength(final InputStream in) throws IOException { + final int firstValue = in.read(); + final int secondValue = in.read(); + if (firstValue < 0) { + return null; + } + if (secondValue < 0) { + throw new EOFException(); + } + if (firstValue == 0xff && secondValue == 0xff) { + int ch1 = in.read(); + int ch2 = in.read(); + int ch3 = in.read(); + int ch4 = in.read(); + if ((ch1 | ch2 | ch3 | ch4) < 0) { + throw new EOFException(); + } + return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4)); + } else { + return ((firstValue << 8) + (secondValue)); + } + } +}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFileUnpackagerV3.java ---------------------------------------------------------------------- diff --git a/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFileUnpackagerV3.java b/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFileUnpackagerV3.java new file mode 100644 index 0000000..f937585 --- /dev/null +++ b/commons/flowfile-packager/src/main/java/org/apache/nifi/util/FlowFileUnpackagerV3.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.util; + +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +public class FlowFileUnpackagerV3 implements FlowFileUnpackager { + + private byte[] nextHeader = null; + private boolean haveReadSomething = false; + private final byte readBuffer[] = new byte[8192]; + + @Override + public boolean hasMoreData() throws IOException { + return nextHeader != null || !haveReadSomething; + } + + private byte[] readHeader(final InputStream in) throws IOException { + final byte[] header = new byte[FlowFilePackagerV3.MAGIC_HEADER.length]; + for (int i = 0; i < header.length; i++) { + final int next = in.read(); + if (next < 0) { + if (i == 0) { + return null; + } + + throw new IOException("Not in FlowFile-v3 format"); + } + header[i] = (byte) (next & 0xFF); + } + + return header; + } + + @Override + public Map<String, String> unpackageFlowFile(final InputStream in, final OutputStream out) throws IOException { + final byte[] header = (nextHeader == null) ? 
readHeader(in) : nextHeader; + if (!Arrays.equals(header, FlowFilePackagerV3.MAGIC_HEADER)) { + throw new IOException("Not in FlowFile-v3 format"); + } + + final Map<String, String> attributes = readAttributes(in); + final long expectedNumBytes = readLong(in); // read length of payload + copy(in, out, expectedNumBytes); // read payload + + nextHeader = readHeader(in); + haveReadSomething = true; + + return attributes; + } + + protected Map<String, String> readAttributes(final InputStream in) throws IOException { + final Map<String, String> attributes = new HashMap<>(); + final Integer numAttributes = readFieldLength(in); //read number of attributes + if (numAttributes == null) { + return null; + } + if (numAttributes == 0) { + throw new IOException("flow files cannot have zero attributes"); + } + for (int i = 0; i < numAttributes; i++) { //read each attribute key/value pair + final String key = readString(in); + final String value = readString(in); + attributes.put(key, value); + } + + return attributes; + } + + protected String readString(final InputStream in) throws IOException { + final Integer numBytes = readFieldLength(in); + if (numBytes == null) { + throw new EOFException(); + } + final byte[] bytes = new byte[numBytes]; + fillBuffer(in, bytes, numBytes); + return new String(bytes, "UTF-8"); + } + + private void fillBuffer(final InputStream in, final byte[] buffer, final int length) throws IOException { + int bytesRead; + int totalBytesRead = 0; + while ((bytesRead = in.read(buffer, totalBytesRead, length - totalBytesRead)) > 0) { + totalBytesRead += bytesRead; + } + if (totalBytesRead != length) { + throw new EOFException(); + } + } + + protected long copy(final InputStream in, final OutputStream out, final long numBytes) throws IOException { + int bytesRead; + long totalBytesRead = 0L; + while ((bytesRead = in.read(readBuffer, 0, (int) Math.min(readBuffer.length, numBytes - totalBytesRead))) > 0) { + out.write(readBuffer, 0, bytesRead); + totalBytesRead += 
bytesRead; + } + + if (totalBytesRead < numBytes) { + throw new EOFException("Expected " + numBytes + " but received " + totalBytesRead); + } + + return totalBytesRead; + } + + protected long readLong(final InputStream in) throws IOException { + fillBuffer(in, readBuffer, 8); + return (((long) readBuffer[0] << 56) + + ((long) (readBuffer[1] & 255) << 48) + + ((long) (readBuffer[2] & 255) << 40) + + ((long) (readBuffer[3] & 255) << 32) + + ((long) (readBuffer[4] & 255) << 24) + + ((readBuffer[5] & 255) << 16) + + ((readBuffer[6] & 255) << 8) + + ((readBuffer[7] & 255))); + } + + private Integer readFieldLength(final InputStream in) throws IOException { + final int firstValue = in.read(); + final int secondValue = in.read(); + if (firstValue < 0) { + return null; + } + if (secondValue < 0) { + throw new EOFException(); + } + if (firstValue == 0xff && secondValue == 0xff) { + int ch1 = in.read(); + int ch2 = in.read(); + int ch3 = in.read(); + int ch4 = in.read(); + if ((ch1 | ch2 | ch3 | ch4) < 0) { + throw new EOFException(); + } + return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4)); + } else { + return ((firstValue << 8) + (secondValue)); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/flowfile-packager/src/main/java/org/apache/nifi/util/Unpackage.java ---------------------------------------------------------------------- diff --git a/commons/flowfile-packager/src/main/java/org/apache/nifi/util/Unpackage.java b/commons/flowfile-packager/src/main/java/org/apache/nifi/util/Unpackage.java new file mode 100644 index 0000000..19f702c --- /dev/null +++ b/commons/flowfile-packager/src/main/java/org/apache/nifi/util/Unpackage.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.util; + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.Map; +import java.util.UUID; + +public class Unpackage { + + private static void printUsage() { + System.out.println("java " + Unpackage.class.getCanonicalName() + " <version> <input file 1> [<input file 2> <input file 3> ... <input file N>]"); + System.out.println("<version> : The version of the FlowFile Package format. 
Valid values are 1, 2, 3"); + System.out.println("<input file X> : The FlowFile package to unpack"); + System.out.println(); + } + + public static void main(final String[] args) throws IOException { + if (args.length < 2) { + printUsage(); + return; + } + + final String version = args[0]; + + int inputFileCount = 0; + int outputFileCount = 0; + + for (int i = 1; i < args.length; i++) { + final String filename = args[i]; + final File inFile = new File(filename); + + if (inFile.isDirectory()) { + System.out.println("WARNING: input file " + inFile + " is a directory; skipping"); + continue; + } + + if (!inFile.exists() || !inFile.canRead()) { + System.out.println("ERROR: unable to read file " + inFile); + continue; + } + + final File outputDir = new File(inFile.getAbsolutePath() + ".unpacked"); + if (!outputDir.exists() && !outputDir.mkdir()) { + System.out.println("ERROR: Unable to create directory " + outputDir); + continue; + } + + final File tempFile = new File(outputDir, ".temp." + UUID.randomUUID().toString() + ".unpackage"); + inputFileCount++; + try (final FileInputStream fis = new FileInputStream(inFile); + final BufferedInputStream bufferedIn = new BufferedInputStream(fis)) { + + final FlowFileUnpackager unpackager = createUnpackager(version); + while (unpackager.hasMoreData()) { + outputFileCount++; + final Map<String, String> attributes; + + try (final FileOutputStream fos = new FileOutputStream(tempFile); + final BufferedOutputStream bufferedOut = new BufferedOutputStream(fos)) { + attributes = unpackager.unpackageFlowFile(bufferedIn, bufferedOut); + } + + String outputFilename = attributes.get("filename"); + if (outputFilename == null) { + outputFilename = attributes.get("nf.file.name"); + } + + final File outputFile = new File(outputDir, outputFilename); + tempFile.renameTo(outputFile); + + final File attributeFilename = new File(outputDir, outputFilename + ".attributes"); + try (final FileOutputStream fos = new FileOutputStream(attributeFilename); + 
final BufferedOutputStream bufferedOut = new BufferedOutputStream(fos)) { + + for (final Map.Entry<String, String> entry : attributes.entrySet()) { + bufferedOut.write((entry.getKey() + "=" + entry.getValue() + "\n").getBytes("UTF-8")); + } + } + } + } + } + + System.out.println("Unpacked " + inputFileCount + " packages into " + outputFileCount + " files"); + } + + public static FlowFileUnpackager createUnpackager(final String version) { + switch (version) { + case "1": + return new FlowFileUnpackagerV1(); + case "2": + return new FlowFileUnpackagerV2(); + case "3": + return new FlowFileUnpackagerV3(); + default: + System.out.println("ERROR: Invalid version: " + version + "; must be 1, 2, or 3"); + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/flowfile-packager/src/test/java/org/apache/nifi/util/TestPackageUnpackageV3.java ---------------------------------------------------------------------- diff --git a/commons/flowfile-packager/src/test/java/org/apache/nifi/util/TestPackageUnpackageV3.java b/commons/flowfile-packager/src/test/java/org/apache/nifi/util/TestPackageUnpackageV3.java new file mode 100644 index 0000000..24cd374 --- /dev/null +++ b/commons/flowfile-packager/src/test/java/org/apache/nifi/util/TestPackageUnpackageV3.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.util; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +import org.junit.Test; + +public class TestPackageUnpackageV3 { + + @Test + public void test() throws IOException { + final FlowFilePackager packager = new FlowFilePackagerV3(); + final FlowFileUnpackager unpackager = new FlowFileUnpackagerV3(); + + final byte[] data = "Hello, World!".getBytes("UTF-8"); + final Map<String, String> map = new HashMap<>(); + map.put("abc", "cba"); + + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + final ByteArrayInputStream in = new ByteArrayInputStream(data); + packager.packageFlowFile(in, baos, map, data.length); + + final byte[] encoded = baos.toByteArray(); + final ByteArrayInputStream encodedIn = new ByteArrayInputStream(encoded); + final ByteArrayOutputStream decodedOut = new ByteArrayOutputStream(); + final Map<String, String> unpackagedAttributes = unpackager.unpackageFlowFile(encodedIn, decodedOut); + final byte[] decoded = decodedOut.toByteArray(); + + assertEquals(map, unpackagedAttributes); + assertTrue(Arrays.equals(data, decoded)); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/naive-search-ring-buffer/pom.xml ---------------------------------------------------------------------- diff --git a/commons/naive-search-ring-buffer/pom.xml 
b/commons/naive-search-ring-buffer/pom.xml new file mode 100644 index 0000000..e84be0f --- /dev/null +++ b/commons/naive-search-ring-buffer/pom.xml @@ -0,0 +1,30 @@ +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ --> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-parent</artifactId> + <version>0.0.1-SNAPSHOT</version> + </parent> + + <artifactId>naive-search-ring-buffer</artifactId> + <version>0.0.1-SNAPSHOT</version> + <packaging>jar</packaging> + + <name>NiFi Ring Buffer</name> + +</project> http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/naive-search-ring-buffer/src/main/java/org/apache/nifi/util/NaiveSearchRingBuffer.java ---------------------------------------------------------------------- diff --git a/commons/naive-search-ring-buffer/src/main/java/org/apache/nifi/util/NaiveSearchRingBuffer.java b/commons/naive-search-ring-buffer/src/main/java/org/apache/nifi/util/NaiveSearchRingBuffer.java new file mode 100644 index 0000000..85bfd96 --- /dev/null +++ b/commons/naive-search-ring-buffer/src/main/java/org/apache/nifi/util/NaiveSearchRingBuffer.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.util; + +import java.util.Arrays; + +/** + * <p> + * A RingBuffer that can be used to scan byte sequences for subsequences. 
/**
 * <p>
 * A ring buffer that can be used to scan byte sequences for subsequences.
 * </p>
 *
 * <p>
 * The buffer retains the most recently added bytes, up to the length of the
 * sequence being searched for, and compares them with that sequence every
 * time a byte is added. This lets a caller recognise a byte sequence in a
 * stream on-the-fly, so that the stream can be segmented without buffering
 * the data.
 * </p>
 *
 * <p>
 * The intended usage paradigm is:
 * </p>
 * <pre>
 * final byte[] searchSequence = ...;
 * final NaiveSearchRingBuffer buffer = new NaiveSearchRingBuffer(searchSequence);
 * int nextByte;
 * while ((nextByte = in.read()) &gt;= 0) {
 *   if ( buffer.addAndCompare((byte) nextByte) ) {
 *     // This byte is the last byte in the given sequence
 *   } else {
 *     // This byte does not complete the given sequence
 *   }
 * }
 * </pre>
 */
public class NaiveSearchRingBuffer {

    private final byte[] lookingFor;
    /** Retained bytes (sign-extended); slots never written since the last clear hold -1. */
    private final int[] buffer;
    /** Index of the slot the next byte is written to; once filled, also the oldest byte. */
    private int insertionPointer = 0;
    /** Number of bytes currently retained; never exceeds <code>buffer.length</code>. */
    private int bufferSize = 0;

    /**
     * @param lookingFor the byte sequence to search for; it is copied, so
     * later changes to the array do not affect this buffer
     * @throws NullPointerException if <code>lookingFor</code> is null
     * @throws IllegalArgumentException if <code>lookingFor</code> is empty
     */
    public NaiveSearchRingBuffer(final byte[] lookingFor) {
        if (lookingFor == null) {
            throw new NullPointerException("lookingFor must not be null");
        }
        if (lookingFor.length == 0) {
            // an empty sequence would lead to a division by zero when adding bytes
            throw new IllegalArgumentException("lookingFor must contain at least one byte");
        }
        this.lookingFor = lookingFor.clone();
        this.buffer = new int[lookingFor.length];
        Arrays.fill(buffer, -1);
    }

    /**
     * Returns the contents of the internal buffer, which represents the last X
     * bytes added to the buffer, where X is the minimum of the number of bytes
     * added to the buffer or the length of the byte sequence for which we are
     * looking
     *
     * @return the retained bytes, oldest first
     */
    public byte[] getBufferContents() {
        final byte[] contents = new byte[bufferSize];
        // Until the buffer has wrapped, the bytes sit in slots 0..bufferSize-1;
        // afterwards the oldest byte is the one about to be overwritten.
        final int oldestIndex = isFilled() ? insertionPointer : 0;
        for (int i = 0; i < contents.length; i++) {
            contents[i] = (byte) buffer[(oldestIndex + i) % buffer.length];
        }
        return contents;
    }

    /**
     * Returns the oldest byte in the buffer
     *
     * @return once {@link #isFilled()} is <code>true</code>, the oldest
     * retained byte as a signed value (-128 to 127); before that, -1. Since -1
     * is also the signed value of the byte <code>0xFF</code>, check
     * {@link #isFilled()} to tell the two apart.
     */
    public int getOldestByte() {
        return buffer[insertionPointer];
    }

    /**
     * Returns <code>true</code> if the number of bytes that have been added to
     * the buffer is at least equal to the length of the byte sequence for which
     * we are searching
     *
     * @return whether the buffer holds as many bytes as the search sequence
     */
    public boolean isFilled() {
        return bufferSize >= buffer.length;
    }

    /**
     * Clears the internal buffer so that a new search may begin
     */
    public void clear() {
        Arrays.fill(buffer, -1);
        insertionPointer = 0;
        bufferSize = 0;
    }

    /**
     * Add the given byte to the buffer and notify whether or not the byte
     * completes the desired byte sequence.
     *
     * @param data the byte to add
     * @return <code>true</code> if this byte completes the byte sequence,
     * <code>false</code> otherwise.
     */
    public boolean addAndCompare(final byte data) {
        buffer[insertionPointer] = data;
        insertionPointer = (insertionPointer + 1) % buffer.length;

        // Saturate instead of counting every byte, so that the counter cannot
        // overflow on streams longer than Integer.MAX_VALUE bytes.
        if (bufferSize < buffer.length) {
            bufferSize++;
        }
        if (bufferSize < buffer.length) {
            return false;
        }

        for (int i = 0; i < lookingFor.length; i++) {
            final byte retained = (byte) buffer[(insertionPointer + i) % buffer.length];
            if (retained != lookingFor[i]) {
                return false;
            }
        }

        return true;
    }
}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.util; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import org.junit.Test; + +public class TestNaiveSearchRingBuffer { + + @Test + public void testAddAndCompare() { + final byte[] pattern = new byte[]{ + '\r', '0', 38, 48 + }; + + final byte[] search = new byte[]{ + '\r', '0', 38, 58, 58, 83, 78, '\r', '0', 38, 48, 83, 92, 78, 4, 38 + }; + + final NaiveSearchRingBuffer circ = new NaiveSearchRingBuffer(pattern); + int counter = -1; + for (final byte b : search) { + counter++; + final boolean matched = circ.addAndCompare(b); + if (counter == 10) { + assertTrue(matched); + } else { + assertFalse(matched); + } + } + } + + @Test + public void testGetOldestByte() { + final byte[] pattern = new byte[]{ + '\r', '0', 38, 48 + }; + + final byte[] search = new byte[]{ + '\r', '0', 38, 58, 58, 83, 78, (byte) 223, (byte) 227, (byte) 250, '\r', '0', 38, 48, 83, 92, 78, 4, 38 + }; + + final NaiveSearchRingBuffer circ = new NaiveSearchRingBuffer(pattern); + int counter = -1; + for (final byte b : search) { + counter++; + final boolean matched = circ.addAndCompare(b); + if (counter == 13) { + assertTrue(matched); + } else { + assertFalse(matched); + } + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-expression-language/pom.xml ---------------------------------------------------------------------- diff --git a/commons/nifi-expression-language/pom.xml b/commons/nifi-expression-language/pom.xml new file mode 100644 index 0000000..dfb1ea5 --- /dev/null 
+++ b/commons/nifi-expression-language/pom.xml @@ -0,0 +1,59 @@ +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ --> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-parent</artifactId> + <version>0.0.1-SNAPSHOT</version> + </parent> + + <artifactId>nifi-expression-language</artifactId> + <version>0.0.1-SNAPSHOT</version> + + <packaging>jar</packaging> + <name>NiFi Expression Language</name> + + <build> + <plugins> + <plugin> + <groupId>org.antlr</groupId> + <artifactId>antlr3-maven-plugin</artifactId> + <version>3.5.2</version> + <executions> + <execution> + <goals> + <goal>antlr</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> + + <dependencies> + <dependency> + <groupId>org.antlr</groupId> + <artifactId>antlr-runtime</artifactId> + <version>3.5.2</version> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-api</artifactId> + <version>[0.0.1-SNAPSHOT, 1.0.0-SNAPSHOT)</version> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-expression-language/src/main/antlr3/org/apache/nifi/attribute/expression/language/antlr/AttributeExpressionLexer.g ---------------------------------------------------------------------- diff --git a/commons/nifi-expression-language/src/main/antlr3/org/apache/nifi/attribute/expression/language/antlr/AttributeExpressionLexer.g b/commons/nifi-expression-language/src/main/antlr3/org/apache/nifi/attribute/expression/language/antlr/AttributeExpressionLexer.g new file mode 100644 index 0000000..8cb6847 --- /dev/null +++ b/commons/nifi-expression-language/src/main/antlr3/org/apache/nifi/attribute/expression/language/antlr/AttributeExpressionLexer.g @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+// Lexer for the attribute expression language: punctuation, literals,
+// function-name keywords, quoted strings and bare attribute names.
+lexer grammar AttributeExpressionLexer;
+
+@header {
+  package org.apache.nifi.attribute.expression.language.antlr;
+  import org.apache.nifi.attribute.expression.language.exception.AttributeExpressionLanguageParsingException;
+}
+
+// Any Exception caught inside a generated rule method is wrapped in an
+// AttributeExpressionLanguageParsingException and rethrown.
+@rulecatch {
+  catch(final Exception e) {
+    throw new AttributeExpressionLanguageParsingException(e);
+  }
+}
+
+@members {
+  // Builds a message naming the offending token (or "Unrecognized token" when
+  // e.token is null), the line, the column and the input text, then throws
+  // AttributeExpressionLanguageParsingException instead of returning.
+  // NOTE(review): presumably replaces the ANTLR runtime's error-reporting hook
+  // of the same name -- confirm against the generated lexer's base class.
+  public void displayRecognitionError(String[] tokenNames, RecognitionException e) {
+    final StringBuilder sb = new StringBuilder();
+    if ( e.token == null ) {
+      sb.append("Unrecognized token ");
+    } else {
+      sb.append("Unexpected token '").append(e.token.getText()).append("' ");
+    }
+    sb.append("at line ").append(e.line);
+    if ( e.approximateLineInfo ) {
+      sb.append(" (approximately)");
+    }
+    sb.append(", column ").append(e.charPositionInLine);
+    sb.append(". Query: ").append(e.input.toString());
+
+    throw new AttributeExpressionLanguageParsingException(sb.toString());
+  }
+
+  // Builds the same message as displayRecognitionError and throws; no
+  // recovery is attempted, so the first recognition error reaching this
+  // method aborts lexing.
+  public void recover(RecognitionException e) {
+    final StringBuilder sb = new StringBuilder();
+    if ( e.token == null ) {
+      sb.append("Unrecognized token ");
+    } else {
+      sb.append("Unexpected token '").append(e.token.getText()).append("' ");
+    }
+    sb.append("at line ").append(e.line);
+    if ( e.approximateLineInfo ) {
+      sb.append(" (approximately)");
+    }
+    sb.append(", column ").append(e.charPositionInLine);
+    sb.append(". Query: ").append(e.input.toString());
+
+    throw new AttributeExpressionLanguageParsingException(sb.toString());
+  }
+}
+
+
+// PUNCTUATION & SPECIAL CHARACTERS
+// WHITESPACE and COMMENT are both routed to the HIDDEN channel.
+// COMMENT runs from '#' up to and including the next '\n'; the terminating
+// '\n' is mandatory in this rule.
+WHITESPACE : (' '|'\t'|'\n'|'\r')+ { $channel = HIDDEN; };
+COMMENT : '#' ( ~('\n') )* '\n' { $channel = HIDDEN; };
+
+DOLLAR : '$';
+LPAREN : '(';
+RPAREN : ')';
+LBRACE : '{';
+RBRACE : '}';
+COLON : ':';
+COMMA : ',';
+DOT : '.';
+SEMICOLON : ';';
+NUMBER : ('0'..'9')+; // unsigned run of decimal digits only
+
+TRUE : 'true';
+FALSE : 'false';
+
+//
+// FUNCTION NAMES
+//
+// The arity headings below are approximate; the parser grammar decides how
+// many arguments each function actually accepts.
+
+// ATTRIBUTE KEY SELECTION FUNCTIONS
+ANY_ATTRIBUTE : 'anyAttribute';
+ANY_MATCHING_ATTRIBUTE : 'anyMatchingAttribute';
+ALL_ATTRIBUTES : 'allAttributes';
+ALL_MATCHING_ATTRIBUTES : 'allMatchingAttributes';
+ANY_DELINEATED_VALUE : 'anyDelineatedValue';
+ALL_DELINEATED_VALUES : 'allDelineatedValues';
+
+// NO-SUBJECT FUNCTIONS
+NEXT_INT : 'nextInt';
+IP : 'ip';
+UUID : 'UUID';
+HOSTNAME : 'hostname'; // optional boolean arg: prefer FQDN (the parser also accepts hostname() with no argument)
+NOW : 'now';
+
+
+// 0 arg functions
+TO_UPPER : 'toUpper';
+TO_LOWER : 'toLower';
+TO_STRING : 'toString';
+LENGTH : 'length';
+TRIM : 'trim';
+IS_NULL : 'isNull';
+NOT_NULL : 'notNull';
+TO_NUMBER : 'toNumber';
+URL_ENCODE : 'urlEncode';
+URL_DECODE : 'urlDecode';
+NOT : 'not';
+
+// 1 arg functions
+SUBSTRING_AFTER : 'substringAfter';
+SUBSTRING_BEFORE : 'substringBefore';
+SUBSTRING_AFTER_LAST : 'substringAfterLast';
+SUBSTRING_BEFORE_LAST : 'substringBeforeLast';
+STARTS_WITH : 'startsWith';
+ENDS_WITH : 'endsWith';
+CONTAINS : 'contains';
+PREPEND : 'prepend';
+APPEND : 'append';
+INDEX_OF : 'indexOf';
+LAST_INDEX_OF : 'lastIndexOf';
+REPLACE_NULL : 'replaceNull';
+FIND : 'find'; // regex
+MATCHES : 'matches'; // regex
+EQUALS : 'equals';
+EQUALS_IGNORE_CASE : 'equalsIgnoreCase';
+GREATER_THAN : 'gt';
+LESS_THAN : 'lt';
+GREATER_THAN_OR_EQUAL : 'ge';
+LESS_THAN_OR_EQUAL : 'le';
+FORMAT : 'format'; // takes string date format; uses SimpleDateFormat
+TO_DATE : 'toDate'; // takes string date format; converts the subject to a Long based on the date format (the parser makes the argument optional)
+MOD : 'mod';
+PLUS : 'plus';
+MINUS : 'minus';
+MULTIPLY : 'multiply';
+DIVIDE : 'divide';
+TO_RADIX : 'toRadix'; // the parser accepts one or two arguments
+OR : 'or';
+AND : 'and';
+
+
+// 2 arg functions
+SUBSTRING : 'substring'; // the parser accepts one or two arguments
+REPLACE : 'replace';
+REPLACE_ALL : 'replaceAll';
+
+
+// STRINGS
+// A literal delimited by double or single quotes. Characters are gathered
+// into lBuf: an ESC match contributes whatever getText() returns at that
+// point, any other permitted character is appended by code point. Raw
+// newline, carriage return and tab are not permitted inside a literal.
+// The token text is finally replaced with the contents of lBuf.
+STRING_LITERAL
+@init{StringBuilder lBuf = new StringBuilder();}
+  :
+    (
+      '"'
+      (
+        escaped=ESC {lBuf.append(getText());} |
+        normal = ~( '"' | '\\' | '\n' | '\r' | '\t' ) { lBuf.appendCodePoint(normal);}
+      )*
+      '"'
+    )
+    {
+      setText(lBuf.toString());
+    }
+  |
+    (
+      '\''
+      (
+        escaped=ESC {lBuf.append(getText());} |
+        normal = ~( '\'' | '\\' | '\n' | '\r' | '\t' ) { lBuf.appendCodePoint(normal);}
+      )*
+      '\''
+    )
+    {
+      setText(lBuf.toString());
+    }
+  ;
+
+
+// A backslash followed by one character. The quote, r, n and t escapes set
+// the text to the character they denote. An escaped backslash, and any
+// unrecognized escape, set the text from the Java literal "\\\\" (two
+// backslash characters, assuming ANTLR copies the action text through
+// unchanged); the unrecognized case also appends the escaped character.
+fragment
+ESC
+  : '\\'
+    (
+        '"'  { setText("\""); }
+      | '\'' { setText("\'"); }
+      | 'r'  { setText("\r"); }
+      | 'n'  { setText("\n"); }
+      | 't'  { setText("\t"); }
+      | '\\' { setText("\\\\"); }
+      | nextChar = ~('"' | '\'' | 'r' | 'n' | 't' | '\\')
+        {
+          StringBuilder lBuf = new StringBuilder(); lBuf.append("\\\\").appendCodePoint(nextChar); setText(lBuf.toString());
+        }
+    )
+  ;
+
+// A bare attribute name. The first character may be anything except the
+// listed punctuation, whitespace and the digits '0'..'9'; later characters
+// use the same exclusion set but may be digits.
+ATTRIBUTE_NAME : (
+    ~('$' | '{' | '}' | '(' | ')' | '[' | ']' | ',' | ':' | ';' | '/' | '*' | '\'' | ' ' | '\t' | '\r' | '\n' | '0'..'9')
+    ~('$' | '{' | '}' | '(' | ')' | '[' | ']' | ',' | ':' | ';' | '/' | '*' | '\'' | ' ' | '\t' | '\r' | '\n')*
+  );
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-expression-language/src/main/antlr3/org/apache/nifi/attribute/expression/language/antlr/AttributeExpressionParser.g
----------------------------------------------------------------------
diff --git a/commons/nifi-expression-language/src/main/antlr3/org/apache/nifi/attribute/expression/language/antlr/AttributeExpressionParser.g b/commons/nifi-expression-language/src/main/antlr3/org/apache/nifi/attribute/expression/language/antlr/AttributeExpressionParser.g
new file mode 100644
index 0000000..cf10fc0
--- /dev/null
+++ 
b/commons/nifi-expression-language/src/main/antlr3/org/apache/nifi/attribute/expression/language/antlr/AttributeExpressionParser.g
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+// Parser for the attribute expression language. Consumes the token
+// vocabulary of AttributeExpressionLexer and produces an AST whose root is
+// the imaginary QUERY token (see the query rule at the bottom).
+parser grammar AttributeExpressionParser;
+
+options {
+  output=AST;
+  tokenVocab=AttributeExpressionLexer;
+}
+
+// Imaginary tokens used only as AST node types in the rewrite rules below.
+// NOTE(review): QUOTED_ATTR_NAME is declared here but no rule in this
+// grammar references it.
+tokens {
+  QUERY;
+  ATTRIBUTE_REFERENCE;
+  ATTR_NAME;
+  FUNCTION_CALL;
+  EXPRESSION;
+  MULTI_ATTRIBUTE_REFERENCE;
+  QUOTED_ATTR_NAME;
+}
+
+@header {
+  package org.apache.nifi.attribute.expression.language.antlr;
+  import org.apache.nifi.attribute.expression.language.exception.AttributeExpressionLanguageParsingException;
+}
+
+@members {
+  // Builds a message naming the offending token (or "Unrecognized token" when
+  // e.token is null), the line, the column and the input text, then throws
+  // AttributeExpressionLanguageParsingException instead of returning.
+  public void displayRecognitionError(String[] tokenNames, RecognitionException e) {
+    final StringBuilder sb = new StringBuilder();
+    if ( e.token == null ) {
+      sb.append("Unrecognized token ");
+    } else {
+      sb.append("Unexpected token '").append(e.token.getText()).append("' ");
+    }
+    sb.append("at line ").append(e.line);
+    if ( e.approximateLineInfo ) {
+      sb.append(" (approximately)");
+    }
+    sb.append(", column ").append(e.charPositionInLine);
+    sb.append(". Query: ").append(e.input.toString());
+
+    throw new AttributeExpressionLanguageParsingException(sb.toString());
+  }
+
+  // Builds the same message as displayRecognitionError and throws.
+  // NOTE(review): the ANTLR 3 parser-side recovery hook appears to be
+  // recover(IntStream, RecognitionException); confirm that this
+  // single-argument method is actually invoked by the generated parser.
+  public void recover(final RecognitionException e) {
+    final StringBuilder sb = new StringBuilder();
+    if ( e.token == null ) {
+      sb.append("Unrecognized token ");
+    } else {
+      sb.append("Unexpected token '").append(e.token.getText()).append("' ");
+    }
+    sb.append("at line ").append(e.line);
+    if ( e.approximateLineInfo ) {
+      sb.append(" (approximately)");
+    }
+    sb.append(", column ").append(e.charPositionInLine);
+    sb.append(". Query: ").append(e.input.toString());
+
+    throw new AttributeExpressionLanguageParsingException(sb.toString());
+  }
+}
+
+// In the function rules below the '!' suffix on LPAREN, RPAREN and COMMA
+// keeps those tokens out of the AST.
+
+// functions that return Strings
+// NOTE(review): STARTS_WITH, ENDS_WITH and CONTAINS are grouped here although
+// their names suggest boolean results; because stringArg admits oneArgString
+// but no boolean rule, the grouping affects where they may appear -- confirm
+// it is intended.
+// TO_RADIX takes one or two arguments; SUBSTRING (under twoArgString) takes
+// one or two as well.
+zeroArgString : (TO_UPPER | TO_LOWER | TRIM | TO_STRING | URL_ENCODE | URL_DECODE) LPAREN! RPAREN!;
+oneArgString : ((SUBSTRING_BEFORE | SUBSTRING_BEFORE_LAST | SUBSTRING_AFTER | SUBSTRING_AFTER_LAST | REPLACE_NULL |
+                PREPEND | APPEND | FORMAT | STARTS_WITH | ENDS_WITH | CONTAINS) LPAREN! anyArg RPAREN!) |
+               (TO_RADIX LPAREN! anyArg (COMMA! anyArg)? RPAREN!);
+twoArgString : ((REPLACE | REPLACE_ALL) LPAREN! anyArg COMMA! anyArg RPAREN!) |
+               (SUBSTRING LPAREN! anyArg (COMMA! anyArg)? RPAREN!);
+
+
+// functions that return Booleans
+zeroArgBool : (IS_NULL | NOT_NULL | NOT) LPAREN! RPAREN!;
+oneArgBool : ((FIND | MATCHES | EQUALS_IGNORE_CASE) LPAREN! anyArg RPAREN!) |
+             (GREATER_THAN | LESS_THAN | GREATER_THAN_OR_EQUAL | LESS_THAN_OR_EQUAL) LPAREN! anyArg RPAREN! |
+             (EQUALS) LPAREN! anyArg RPAREN! |
+             (AND | OR) LPAREN! anyArg RPAREN!;
+
+
+// functions that return Numbers
+// TO_DATE accepts zero or one argument.
+zeroArgNum : (LENGTH | TO_NUMBER) LPAREN! RPAREN!;
+oneArgNum : ((INDEX_OF | LAST_INDEX_OF) LPAREN! anyArg RPAREN!) |
+            (TO_DATE LPAREN! anyArg? RPAREN!) |
+            ((MOD | PLUS | MINUS | MULTIPLY | DIVIDE) LPAREN! anyArg RPAREN!);
+
+stringFunctionRef : zeroArgString | oneArgString | twoArgString;
+booleanFunctionRef : zeroArgBool | oneArgBool;
+numberFunctionRef : zeroArgNum | oneArgNum;
+
+// anyArg admits literals, every function family and nested expressions;
+// stringArg is the narrower set used for multi-attribute function arguments.
+anyArg : NUMBER | numberFunctionRef | STRING_LITERAL | zeroArgString | oneArgString | twoArgString | booleanLiteral | zeroArgBool | oneArgBool | expression;
+stringArg : STRING_LITERAL | zeroArgString | oneArgString | twoArgString | expression;
+functionRef : stringFunctionRef | booleanFunctionRef | numberFunctionRef;
+
+
+
+// Attribute Reference
+// The '->' rewrites below wrap the matched elements in a subtree rooted at
+// the named imaginary token.
+subject : attrName | expression;
+attrName : singleAttrName | multiAttrName;
+
+singleAttrRef : ATTRIBUTE_NAME | STRING_LITERAL;
+singleAttrName : singleAttrRef ->
+    ^(ATTR_NAME singleAttrRef);
+
+
+multiAttrFunction : ANY_ATTRIBUTE | ANY_MATCHING_ATTRIBUTE | ALL_ATTRIBUTES | ALL_MATCHING_ATTRIBUTES | ANY_DELINEATED_VALUE | ALL_DELINEATED_VALUES;
+multiAttrName : multiAttrFunction LPAREN stringArg (COMMA stringArg)* RPAREN ->
+    ^(MULTI_ATTRIBUTE_REFERENCE multiAttrFunction stringArg*);
+
+attributeRef : subject ->
+    ^(ATTRIBUTE_REFERENCE subject);
+
+
+functionCall : functionRef ->
+    ^(FUNCTION_CALL functionRef);
+
+// HOSTNAME is accepted both with no argument and with a single boolean
+// literal; in the one-argument form '^' makes HOSTNAME the subtree root.
+booleanLiteral : TRUE | FALSE;
+zeroArgStandaloneFunction : (IP | UUID | NOW | NEXT_INT | HOSTNAME) LPAREN! RPAREN!;
+oneArgStandaloneFunction : HOSTNAME^ LPAREN! booleanLiteral RPAREN!;
+standaloneFunction : zeroArgStandaloneFunction | oneArgStandaloneFunction;
+
+attributeRefOrFunctionCall : (attributeRef | standaloneFunction);
+
+// An expression is "${" subject-or-standalone-function, then zero or more
+// ":" function calls, then "}".
+expression : DOLLAR LBRACE attributeRefOrFunctionCall (COLON functionCall)* RBRACE ->
+    ^(EXPRESSION attributeRefOrFunctionCall functionCall*);
+
+// Entry rule: exactly one expression followed by end of input.
+query : expression EOF ->
+    ^(QUERY expression);
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-expression-language/src/main/antlr3/org/apache/nifi/attribute/expression/language/antlr/output/AttributeExpressionLexer.tokens
----------------------------------------------------------------------
diff --git a/commons/nifi-expression-language/src/main/antlr3/org/apache/nifi/attribute/expression/language/antlr/output/AttributeExpressionLexer.tokens b/commons/nifi-expression-language/src/main/antlr3/org/apache/nifi/attribute/expression/language/antlr/output/AttributeExpressionLexer.tokens
new file mode 100755
index 0000000..1b973c8
--- /dev/null
+++ b/commons/nifi-expression-language/src/main/antlr3/org/apache/nifi/attribute/expression/language/antlr/output/AttributeExpressionLexer.tokens
@@ -0,0 +1,72 @@
+ALL_ATTRIBUTES=4
+ALL_DELINEATED_VALUES=5
+ALL_MATCHING_ATTRIBUTES=6
+AND=7
+ANY_ATTRIBUTE=8
+ANY_DELINEATED_VALUE=9
+ANY_MATCHING_ATTRIBUTE=10
+APPEND=11
+ATTRIBUTE_NAME=12
+CEIL=13
+COLON=14
+COMMA=15
+CONTAINS=16
+DIVIDE=17
+DOLLAR=18
+DOT=19
+ENDS_WITH=20
+EQUALS=21
+EQUALS_IGNORE_CASE=22
+FALSE=23
+FIND=24
+FLOOR=25
+FORMAT=26
+GREATER_THAN=27
+GREATER_THAN_OR_EQUAL=28
+HOSTNAME=29
+INDEX_OF=30
+IP=31
+IS_NULL=32
+LAST_INDEX_OF=33
+LBRACE=34
+LENGTH=35
+LESS_THAN=36
+LESS_THAN_OR_EQUAL=37
+LPAREN=38
+MATCHES=39
+MINUS=40
+MOD=41
+MULTIPLY=42
+NEXT_INT=43
+NOT=44
+NOT_NULL=45
+NOW=46
+NUMBER=47
+OR=48
+PLUS=49
+PREPEND=50
+RBRACE=51
+REPLACE=52
+REPLACE_ALL=53
+REPLACE_NULL=54
+RPAREN=55
+SEMICOLON=56
+STARTS_WITH=57
+STRING_LITERAL=58
+SUBSTRING=59
+SUBSTRING_AFTER=60
+SUBSTRING_AFTER_LAST=61
+SUBSTRING_BEFORE=62
+SUBSTRING_BEFORE_LAST=63 +TO_DATE=64 +TO_LOWER=65 +TO_NUMBER=66 +TO_RADIX=67 +TO_STRING=68 +TO_UPPER=69 +TRIM=70 +TRUE=71 +URL_DECODE=72 +URL_ENCODE=73 +UUID=74 +WHITESPACE=75 http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/EmptyPreparedQuery.java ---------------------------------------------------------------------- diff --git a/commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/EmptyPreparedQuery.java b/commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/EmptyPreparedQuery.java new file mode 100644 index 0000000..81da47e --- /dev/null +++ b/commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/EmptyPreparedQuery.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.attribute.expression.language; + +import java.util.Map; + +import org.apache.nifi.expression.AttributeValueDecorator; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.exception.ProcessException; + +public class EmptyPreparedQuery implements PreparedQuery { + + private final String value; + + EmptyPreparedQuery(final String value) { + this.value = value; + } + + @Override + public String evaluateExpressions(final FlowFile flowFile, final AttributeValueDecorator decorator) throws ProcessException { + return value; + } + + @Override + public String evaluateExpressions() throws ProcessException { + return value; + } + + @Override + public String evaluateExpressions(final AttributeValueDecorator decorator) throws ProcessException { + return value; + } + + @Override + public String evaluateExpressions(final FlowFile flowFile) throws ProcessException { + return value; + } + + @Override + public String evaluateExpressions(Map<String, String> attributes) throws ProcessException { + return value; + } + + @Override + public String evaluateExpressions(Map<String, String> attributes, AttributeValueDecorator decorator) throws ProcessException { + return value; + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/PreparedQuery.java ---------------------------------------------------------------------- diff --git a/commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/PreparedQuery.java b/commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/PreparedQuery.java new file mode 100644 index 0000000..0d1b2c7 --- /dev/null +++ b/commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/PreparedQuery.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * 
contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.attribute.expression.language;
+
+import java.util.Map;
+
+import org.apache.nifi.expression.AttributeValueDecorator;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.exception.ProcessException;
+
+/**
+ * A query that can be evaluated to a {@code String}. The six overloads differ
+ * only in the context supplied: nothing, a {@link FlowFile}, or an attribute
+ * map, each with or without an {@link AttributeValueDecorator}. Every overload
+ * declares {@link ProcessException}.
+ *
+ * <p>NOTE(review): presumably the FlowFile or map supplies the attribute
+ * values referenced by the query and the decorator post-processes those
+ * values -- confirm against the implementations.
+ */
+public interface PreparedQuery {
+
+    /**
+     * Evaluates the query with a FlowFile and a decorator.
+     *
+     * @param flowFile the FlowFile made available to the evaluation
+     * @param decorator the decorator made available to the evaluation
+     * @return the evaluated string
+     * @throws ProcessException as declared; the triggering conditions are implementation-defined
+     */
+    String evaluateExpressions(FlowFile flowFile, AttributeValueDecorator decorator) throws ProcessException;
+
+    /**
+     * Evaluates the query with no FlowFile, attribute map or decorator.
+     *
+     * @return the evaluated string
+     * @throws ProcessException as declared; the triggering conditions are implementation-defined
+     */
+    String evaluateExpressions() throws ProcessException;
+
+    /**
+     * Evaluates the query with only a decorator.
+     *
+     * @param decorator the decorator made available to the evaluation
+     * @return the evaluated string
+     * @throws ProcessException as declared; the triggering conditions are implementation-defined
+     */
+    String evaluateExpressions(AttributeValueDecorator decorator) throws ProcessException;
+
+    /**
+     * Evaluates the query with only a FlowFile.
+     *
+     * @param flowFile the FlowFile made available to the evaluation
+     * @return the evaluated string
+     * @throws ProcessException as declared; the triggering conditions are implementation-defined
+     */
+    String evaluateExpressions(FlowFile flowFile) throws ProcessException;
+
+    /**
+     * Evaluates the query with an attribute map in place of a FlowFile.
+     *
+     * @param attributes string-to-string map made available to the evaluation
+     * @return the evaluated string
+     * @throws ProcessException as declared; the triggering conditions are implementation-defined
+     */
+    String evaluateExpressions(Map<String, String> attributes) throws ProcessException;
+
+    /**
+     * Evaluates the query with an attribute map and a decorator.
+     *
+     * @param attributes string-to-string map made available to the evaluation
+     * @param decorator the decorator made available to the evaluation
+     * @return the evaluated string
+     * @throws ProcessException as declared; the triggering conditions are implementation-defined
+     */
+    String evaluateExpressions(Map<String, String> attributes, AttributeValueDecorator decorator) throws ProcessException;
+
+}