http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-utils/src/test/java/org/apache/nifi/util/timebuffer/TestTimedBuffer.java ---------------------------------------------------------------------- diff --git a/commons/nifi-utils/src/test/java/org/apache/nifi/util/timebuffer/TestTimedBuffer.java b/commons/nifi-utils/src/test/java/org/apache/nifi/util/timebuffer/TestTimedBuffer.java new file mode 100644 index 0000000..39ca330 --- /dev/null +++ b/commons/nifi-utils/src/test/java/org/apache/nifi/util/timebuffer/TestTimedBuffer.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.nifi.util.timebuffer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Test;
+
+/**
+ * Exercises {@link TimedBuffer} with a small timestamped {@code Long} entity:
+ * values added to the buffer are summed by {@link LongEntityAccess}, and the
+ * tests assert that the sum is no longer reported after roughly 2.5 seconds.
+ */
+public class TestTimedBuffer {
+
+    /** How far back, in milliseconds, every aggregate query in this class reaches. */
+    private static final long LOOKBACK_MILLIS = 30000L;
+
+    /**
+     * A single value must stay visible after one second and be gone after a
+     * further 1.5 seconds.
+     */
+    @Test
+    public void testAgesOff() throws InterruptedException {
+        final LongEntityAccess entityAccess = new LongEntityAccess();
+        final TimedBuffer<TimestampedLong> timedBuffer = new TimedBuffer<>(TimeUnit.SECONDS, 2, entityAccess);
+
+        timedBuffer.add(new TimestampedLong(1000000L));
+        assertEquals(1000000L, aggregateOf(timedBuffer).getValue().longValue());
+
+        Thread.sleep(1000L);
+        assertEquals(1000000L, aggregateOf(timedBuffer).getValue().longValue());
+
+        Thread.sleep(1500L);
+        assertNull(aggregateOf(timedBuffer));
+    }
+
+    /**
+     * Three values added back to back must be reported as their sum, and that
+     * sum must age off on the same schedule as a single value.
+     */
+    @Test
+    public void testAggregation() throws InterruptedException {
+        final LongEntityAccess entityAccess = new LongEntityAccess();
+        final TimedBuffer<TimestampedLong> timedBuffer = new TimedBuffer<>(TimeUnit.SECONDS, 2, entityAccess);
+
+        timedBuffer.add(new TimestampedLong(1000000L));
+        timedBuffer.add(new TimestampedLong(1000000L));
+        timedBuffer.add(new TimestampedLong(25000L));
+
+        assertEquals(2025000L, aggregateOf(timedBuffer).getValue().longValue());
+
+        Thread.sleep(1000L);
+        assertEquals(2025000L, aggregateOf(timedBuffer).getValue().longValue());
+
+        Thread.sleep(1500L);
+        assertNull(aggregateOf(timedBuffer));
+    }
+
+    /**
+     * Queries the buffer for everything newer than {@link #LOOKBACK_MILLIS}
+     * before the current time.
+     */
+    private static TimestampedLong aggregateOf(final TimedBuffer<TimestampedLong> timedBuffer) {
+        return timedBuffer.getAggregateValue(System.currentTimeMillis() - LOOKBACK_MILLIS);
+    }
+
+    /** A {@code Long} paired with the wall-clock time at which it was created. */
+    private static class TimestampedLong {
+
+        private final Long value;
+        private final long timestamp = System.currentTimeMillis();
+
+        public TimestampedLong(final Long value) {
+            this.value = value;
+        }
+
+        public Long getValue() {
+            return value;
+        }
+
+        public long getTimestamp() {
+            return timestamp;
+        }
+    }
+
+    /** {@link EntityAccess} that aggregates {@link TimestampedLong}s by adding their values. */
+    private static class LongEntityAccess implements EntityAccess<TimestampedLong> {
+
+        @Override
+        public TimestampedLong aggregate(TimestampedLong oldValue, TimestampedLong toAdd) {
+            // With one side missing the other is returned as-is; with both missing the result is zero.
+            if (oldValue == null) {
+                return (toAdd == null) ? new TimestampedLong(0L) : toAdd;
+            }
+            if (toAdd == null) {
+                return oldValue;
+            }
+
+            final long sum = oldValue.getValue().longValue() + toAdd.getValue().longValue();
+            return new TimestampedLong(sum);
+        }
+
+        @Override
+        public TimestampedLong createNew() {
+            return new TimestampedLong(0L);
+        }
+
+        @Override
+        public long getTimestamp(TimestampedLong entity) {
+            if (entity == null) {
+                return 0L;
+            }
+            return entity.getTimestamp();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-web-utils/pom.xml ---------------------------------------------------------------------- diff --git a/commons/nifi-web-utils/pom.xml b/commons/nifi-web-utils/pom.xml new file mode 100644 index 0000000..434e1a3 --- /dev/null +++ b/commons/nifi-web-utils/pom.xml @@ -0,0 +1,56 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-parent</artifactId> + <version>0.0.1-SNAPSHOT</version> + </parent> + + <artifactId>nifi-web-utils</artifactId> + <version>0.0.1-SNAPSHOT</version> + <name>NiFi Web Utils</name> + + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-security-utils</artifactId> + <version>0.0.1-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>commons-codec</groupId> + <artifactId>commons-codec</artifactId> + <version>1.10</version> + </dependency> + <dependency> + <groupId>com.sun.jersey</groupId> + <artifactId>jersey-client</artifactId> + <version>1.18.2</version> + </dependency> + <dependency> + <groupId>com.sun.jersey</groupId> + <artifactId>jersey-json</artifactId> + <version>1.18.2</version> + </dependency> + <dependency> + <groupId>javax.servlet</groupId> + <artifactId>javax.servlet-api</artifactId> + <version>3.1.0</version> + <scope>provided</scope> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-web-utils/src/main/java/org/apache/nifi/web/util/ClientUtils.java ---------------------------------------------------------------------- diff --git a/commons/nifi-web-utils/src/main/java/org/apache/nifi/web/util/ClientUtils.java b/commons/nifi-web-utils/src/main/java/org/apache/nifi/web/util/ClientUtils.java new file mode 100644 index 0000000..8c0b1f4 --- /dev/null +++ b/commons/nifi-web-utils/src/main/java/org/apache/nifi/web/util/ClientUtils.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.web.util;
+
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.ClientHandlerException;
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.UniformInterfaceException;
+import com.sun.jersey.api.client.WebResource;
+import com.sun.jersey.core.util.MultivaluedMapImpl;
+import java.net.URI;
+import java.util.Map;
+import javax.ws.rs.core.MediaType;
+
+/**
+ * Thin convenience wrapper around a Jersey {@link Client} for issuing GET,
+ * POST and HEAD requests. GET and POST requests ask for a JSON response.
+ */
+public class ClientUtils {
+
+    private final Client client;
+
+    /**
+     * @param client the Jersey client every request of this instance is sent through
+     */
+    public ClientUtils(Client client) {
+        this.client = client;
+    }
+
+    /**
+     * Gets the content at the specified URI.
+     *
+     * @param uri the resource to request
+     * @return the raw client response
+     * @throws ClientHandlerException declared by the underlying Jersey call
+     * @throws UniformInterfaceException declared by the underlying Jersey call
+     */
+    public ClientResponse get(final URI uri) throws ClientHandlerException, UniformInterfaceException {
+        return get(uri, null);
+    }
+
+    /**
+     * Gets the content at the specified URI using the given query parameters.
+     *
+     * @param uri the resource to request
+     * @param queryParams query parameters to append; may be null for none
+     * @return the raw client response
+     * @throws ClientHandlerException declared by the underlying Jersey call
+     * @throws UniformInterfaceException declared by the underlying Jersey call
+     */
+    public ClientResponse get(final URI uri, final Map<String, String> queryParams) throws ClientHandlerException, UniformInterfaceException {
+        // perform the request
+        WebResource webResource = client.resource(uri);
+        if (queryParams != null) {
+            for (final Map.Entry<String, String> queryEntry : queryParams.entrySet()) {
+                webResource = webResource.queryParam(queryEntry.getKey(), queryEntry.getValue());
+            }
+        }
+
+        return webResource.accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
+    }
+
+    /**
+     * Performs a POST using the specified url and entity body. The entity is
+     * sent with a JSON content type.
+     *
+     * @param uri the resource to post to
+     * @param entity the request entity; may be null to post without a body
+     * @return the raw client response
+     * @throws ClientHandlerException declared by the underlying Jersey call
+     * @throws UniformInterfaceException declared by the underlying Jersey call
+     */
+    public ClientResponse post(URI uri, Object entity) throws ClientHandlerException, UniformInterfaceException {
+        // get the resource
+        WebResource.Builder resourceBuilder = client.resource(uri).accept(MediaType.APPLICATION_JSON).type(MediaType.APPLICATION_JSON);
+
+        // include the request entity
+        if (entity != null) {
+            resourceBuilder = resourceBuilder.entity(entity);
+        }
+
+        // perform the request
+        return resourceBuilder.post(ClientResponse.class);
+    }
+
+    /**
+     * Performs a POST using the specified url and form data. The form data is
+     * sent URL-encoded.
+     *
+     * @param uri the resource to post to
+     * @param formData form fields to send; null or empty posts without a body
+     * @return the raw client response
+     * @throws ClientHandlerException declared by the underlying Jersey call
+     * @throws UniformInterfaceException declared by the underlying Jersey call
+     */
+    public ClientResponse post(URI uri, Map<String, String> formData) throws ClientHandlerException, UniformInterfaceException {
+        // convert the form data; a literal null second argument resolves to this
+        // overload (Map is more specific than Object), so null must be tolerated
+        // here just as the entity overload tolerates it
+        MultivaluedMapImpl entity = new MultivaluedMapImpl();
+        if (formData != null) {
+            for (final Map.Entry<String, String> formEntry : formData.entrySet()) {
+                entity.add(formEntry.getKey(), formEntry.getValue());
+            }
+        }
+
+        // get the resource
+        WebResource.Builder resourceBuilder = client.resource(uri).accept(MediaType.APPLICATION_JSON).type(MediaType.APPLICATION_FORM_URLENCODED);
+
+        // add the form data if necessary
+        if (!entity.isEmpty()) {
+            resourceBuilder = resourceBuilder.entity(entity);
+        }
+
+        // perform the request
+        return resourceBuilder.post(ClientResponse.class);
+    }
+
+    /**
+     * Performs a HEAD request to the specified URI.
+     *
+     * @param uri the resource to request
+     * @return the raw client response
+     * @throws ClientHandlerException declared by the underlying Jersey call
+     * @throws UniformInterfaceException declared by the underlying Jersey call
+     */
+    public ClientResponse head(final URI uri) throws ClientHandlerException, UniformInterfaceException {
+        // perform the request
+        WebResource webResource = client.resource(uri);
+        return webResource.head();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-web-utils/src/main/java/org/apache/nifi/web/util/ObjectMapperResolver.java
----------------------------------------------------------------------
diff --git a/commons/nifi-web-utils/src/main/java/org/apache/nifi/web/util/ObjectMapperResolver.java b/commons/nifi-web-utils/src/main/java/org/apache/nifi/web/util/ObjectMapperResolver.java
new file mode 100644
index 0000000..4e7f5b6
--- /dev/null
+++ b/commons/nifi-web-utils/src/main/java/org/apache/nifi/web/util/ObjectMapperResolver.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.web.util;
+
+import javax.ws.rs.ext.ContextResolver;
+import javax.ws.rs.ext.Provider;
+import org.codehaus.jackson.map.AnnotationIntrospector;
+import org.codehaus.jackson.map.DeserializationConfig;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.SerializationConfig;
+import org.codehaus.jackson.map.annotate.JsonSerialize.Inclusion;
+import org.codehaus.jackson.xc.JaxbAnnotationIntrospector;
+
+/**
+ * JAX-RS context resolver that hands out one shared {@link ObjectMapper}. The
+ * mapper reads JAXB annotations for both directions and leaves null-valued
+ * properties out of serialized output.
+ */
+@Provider
+public class ObjectMapperResolver implements ContextResolver<ObjectMapper> {
+
+    private final ObjectMapper mapper;
+
+    /**
+     * Builds and configures the shared mapper.
+     *
+     * @throws Exception declared by the original signature; nothing in this body throws a checked exception
+     */
+    public ObjectMapperResolver() throws Exception {
+        mapper = new ObjectMapper();
+
+        final AnnotationIntrospector introspector = new JaxbAnnotationIntrospector();
+
+        // Capture both configurations before either one is replaced.
+        final SerializationConfig currentSerialization = mapper.getSerializationConfig();
+        final DeserializationConfig currentDeserialization = mapper.getDeserializationConfig();
+
+        final SerializationConfig nonNullJaxbSerialization = currentSerialization
+                .withSerializationInclusion(Inclusion.NON_NULL)
+                .withAnnotationIntrospector(introspector);
+        mapper.setSerializationConfig(nonNullJaxbSerialization);
+
+        final DeserializationConfig jaxbDeserialization = currentDeserialization.withAnnotationIntrospector(introspector);
+        mapper.setDeserializationConfig(jaxbDeserialization);
+    }
+
+    /**
+     * @param objectType ignored; every type is served by the same mapper
+     * @return the shared, pre-configured mapper
+     */
+    @Override
+    public ObjectMapper getContext(Class<?> objectType) {
+        return mapper;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-web-utils/src/main/java/org/apache/nifi/web/util/WebUtils.java ---------------------------------------------------------------------- diff --git a/commons/nifi-web-utils/src/main/java/org/apache/nifi/web/util/WebUtils.java b/commons/nifi-web-utils/src/main/java/org/apache/nifi/web/util/WebUtils.java new file mode 100644 index 0000000..587b3d8 --- /dev/null +++ b/commons/nifi-web-utils/src/main/java/org/apache/nifi/web/util/WebUtils.java @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.nifi.web.util;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.security.cert.Certificate;
+import java.security.cert.CertificateParsingException;
+import java.security.cert.X509Certificate;
+import java.util.List;
+import java.util.Locale;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import javax.net.ssl.HostnameVerifier;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLPeerUnverifiedException;
+import javax.net.ssl.SSLSession;
+
+import org.apache.nifi.security.util.CertificateUtils;
+
+import org.apache.commons.codec.DecoderException;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.config.ClientConfig;
+import com.sun.jersey.api.client.config.DefaultClientConfig;
+import com.sun.jersey.api.json.JSONConfiguration;
+import com.sun.jersey.client.urlconnection.HTTPSProperties;
+
+/**
+ * Common utilities related to web development.
+ *
+ * @author unattributed
+ */
+public final class WebUtils {
+
+    private static final Logger logger = LoggerFactory.getLogger(WebUtils.class);
+
+    // Not used inside this class; kept package-visible for any existing users.
+    final static ReadWriteLock lock = new ReentrantReadWriteLock();
+
+    private WebUtils() {
+    }
+
+    /**
+     * Creates a client for non-secure requests. The client will be created
+     * using the given configuration. Additionally, the client will be
+     * automatically configured for JSON serialization/deserialization.
+     *
+     * @param config client configuration
+     *
+     * @return a Client instance
+     */
+    public static Client createClient(final ClientConfig config) {
+        return createClientHelper(config, null);
+    }
+
+    /**
+     * Creates a client for secure requests. The client will be created using
+     * the given configuration and security context. Additionally, the client
+     * will be automatically configured for JSON serialization/deserialization.
+     *
+     * @param config client configuration
+     * @param ctx security context
+     *
+     * @return a Client instance
+     */
+    public static Client createClient(final ClientConfig config, final SSLContext ctx) {
+        return createClientHelper(config, ctx);
+    }
+
+    /**
+     * A helper method for creating clients. The client will be created using
+     * the given configuration and security context. Additionally, the client
+     * will be automatically configured for JSON serialization/deserialization.
+     * HTTPS properties already present in the configuration are left alone.
+     *
+     * @param config client configuration; a default configuration is used when null
+     * @param ctx security context, which may be null for non-secure client
+     * creation
+     *
+     * @return a Client instance
+     */
+    private static Client createClientHelper(final ClientConfig config, final SSLContext ctx) {
+
+        final ClientConfig finalConfig = (config == null) ? new DefaultClientConfig() : config;
+
+        if (ctx != null && isUnset(finalConfig.getProperty(HTTPSProperties.PROPERTY_HTTPS_PROPERTIES))) {
+
+            // custom hostname verifier that checks subject alternative names against the hostname of the URI
+            final HostnameVerifier hostnameVerifier = new HostnameVerifier() {
+                @Override
+                public boolean verify(final String hostname, final SSLSession ssls) {
+
+                    try {
+                        // Locale.ROOT keeps the lower-casing independent of the JVM default locale
+                        final String normalizedHostname = hostname.toLowerCase(Locale.ROOT);
+                        for (final Certificate peerCertificate : ssls.getPeerCertificates()) {
+                            if (peerCertificate instanceof X509Certificate) {
+                                final X509Certificate x509Cert = (X509Certificate) peerCertificate;
+                                final List<String> subjectAltNames = CertificateUtils.getSubjectAlternativeNames(x509Cert);
+                                if (subjectAltNames.contains(normalizedHostname)) {
+                                    return true;
+                                }
+                            }
+                        }
+                    } catch (final SSLPeerUnverifiedException | CertificateParsingException ex) {
+                        logger.warn("Hostname Verification encountered exception verifying hostname due to: " + ex, ex);
+                    }
+
+                    return false;
+                }
+            };
+
+            finalConfig.getProperties().put(HTTPSProperties.PROPERTY_HTTPS_PROPERTIES, new HTTPSProperties(hostnameVerifier, ctx));
+        }
+
+        finalConfig.getFeatures().put(JSONConfiguration.FEATURE_POJO_MAPPING, Boolean.TRUE);
+        finalConfig.getClasses().add(ObjectMapperResolver.class);
+
+        // web client for restful request
+        return Client.create(finalConfig);
+
+    }
+
+    /**
+     * Tells whether a configuration property counts as "not configured": it is
+     * null or a blank string. Any other value, such as an HTTPSProperties
+     * instance, counts as configured; the previous unconditional cast to
+     * String failed with a ClassCastException for such values.
+     *
+     * @param propertyValue the raw property value, possibly null
+     * @return true if the property should be treated as absent
+     */
+    private static boolean isUnset(final Object propertyValue) {
+        if (propertyValue == null) {
+            return true;
+        }
+        return propertyValue instanceof String && StringUtils.isBlank((String) propertyValue);
+    }
+
+    /**
+     * Serializes the given object to hexadecimal. Serialization uses Java's
+     * native serialization mechanism, the ObjectOutputStream.
+     *
+     * @param obj an object
+     * @return the serialized object as lower-case hex
+     * @throws RuntimeException wrapping the IOException if serialization fails
+     */
+    public static String serializeObjectToHex(final Serializable obj) {
+
+        final ByteArrayOutputStream serializedObj = new ByteArrayOutputStream();
+
+        // The in-memory target itself cannot fail, but writeObject still raises an
+        // IOException subtype (e.g. NotSerializableException) for objects that
+        // cannot be serialized.
+        try (final ObjectOutputStream oos = new ObjectOutputStream(serializedObj)) {
+            oos.writeObject(obj);
+        } catch (final IOException ioe) {
+            throw new RuntimeException(ioe);
+        }
+
+        // parameterized so the message is only built when debug logging is enabled
+        logger.debug("Serialized object '{}' size: {}", obj, serializedObj.size());
+
+        // hex encode the binary
+        return new String(Hex.encodeHex(serializedObj.toByteArray(), /* tolowercase */ true));
+    }
+
+    /**
+     * Deserializes a Java serialized, hex-encoded string into a Java object.
+     * This method is the inverse of the serializeObjectToHex method in this
+     * class.
+     *
+     * NOTE(review): this uses Java native deserialization, which is unsafe on
+     * untrusted input; confirm that every caller only passes trusted data.
+     *
+     * @param hexEncodedObject a string
+     * @return the object
+     * @throws ClassNotFoundException if the class could not be found
+     * @throws IllegalArgumentException if the string is not valid hex
+     * @throws RuntimeException wrapping the IOException if the bytes are not a valid serialized object
+     */
+    public static Serializable deserializeHexToObject(final String hexEncodedObject) throws ClassNotFoundException {
+
+        // decode the hex encoded object
+        byte[] serializedObj;
+        try {
+            serializedObj = Hex.decodeHex(hexEncodedObject.toCharArray());
+        } catch (final DecoderException de) {
+            throw new IllegalArgumentException(de);
+        }
+
+        // The in-memory source itself cannot fail, but a corrupt or truncated
+        // stream still surfaces as an IOException.
+        try (final ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(serializedObj))) {
+            // deserialize bytes into object
+            return (Serializable) ois.readObject();
+        } catch (final IOException ioe) {
+            throw new RuntimeException(ioe);
+        }
+
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/processor-utilities/pom.xml
----------------------------------------------------------------------
diff --git a/commons/processor-utilities/pom.xml b/commons/processor-utilities/pom.xml
new file mode 100644
index 0000000..0519b7f
--- /dev/null
+++ b/commons/processor-utilities/pom.xml
@@ -0,0 +1,47 @@
+<?xml version="1.0"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-parent</artifactId> + <version>0.0.1-SNAPSHOT</version> + </parent> + + <artifactId>nifi-processor-utils</artifactId> + <version>0.0.1-SNAPSHOT</version> + <packaging>jar</packaging> + <name>NiFi Processor Utils</name> + + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-api</artifactId> + <version>[0.0.1-SNAPSHOT, 1.0.0-SNAPSHOT)</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-utils</artifactId> + <version>[0.0.1-SNAPSHOT,1.0.0-SNAPSHOT)</version> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-security-utils</artifactId> + <version>0.0.1-SNAPSHOT</version> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/processor-utilities/src/main/java/org/apache/nifi/processor/util/FlowFileFilters.java ---------------------------------------------------------------------- diff --git a/commons/processor-utilities/src/main/java/org/apache/nifi/processor/util/FlowFileFilters.java b/commons/processor-utilities/src/main/java/org/apache/nifi/processor/util/FlowFileFilters.java new file mode 100644 index 0000000..1f77093 --- /dev/null +++ b/commons/processor-utilities/src/main/java/org/apache/nifi/processor/util/FlowFileFilters.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.FlowFileFilter;
+
+public class FlowFileFilters {
+
+    /**
+     * Returns a new {@link FlowFileFilter} that keeps accepting FlowFiles
+     * until either the maximum total size or the maximum FlowFile count has
+     * been reached (the count limit matters because FlowFiles may be 0
+     * bytes). The first FlowFile offered is always accepted, even if it alone
+     * exceeds the maximum size; in that case no further FlowFile is accepted.
+     *
+     * @param maxSize the maximum size of the group of FlowFiles
+     * @param unit the unit of the <code>maxSize</code> argument
+     * @param maxCount the maximum number of FlowFiles to pull
+     * @return a stateful filter that tracks the count and total size of the FlowFiles it has accepted
+     */
+    public static FlowFileFilter newSizeBasedFilter(final double maxSize, final DataUnit unit, final int maxCount) {
+        final double maxBytes = DataUnit.B.convert(maxSize, unit);
+
+        return new FlowFileFilter() {
+            int acceptedCount = 0;
+            long acceptedBytes = 0L;
+
+            @Override
+            public FlowFileFilterResult filter(final FlowFile flowFile) {
+                final boolean isFirst = (acceptedCount == 0);
+
+                // The limits only apply from the second FlowFile onwards.
+                if (!isFirst && ((acceptedBytes + flowFile.getSize() > maxBytes) || (acceptedCount + 1 > maxCount))) {
+                    return FlowFileFilterResult.REJECT_AND_TERMINATE;
+                }
+
+                acceptedCount++;
+                acceptedBytes += flowFile.getSize();
+                return FlowFileFilterResult.ACCEPT_AND_CONTINUE;
+            }
+
+        };
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/processor-utilities/src/main/java/org/apache/nifi/processor/util/SSLProperties.java
----------------------------------------------------------------------
diff --git a/commons/processor-utilities/src/main/java/org/apache/nifi/processor/util/SSLProperties.java b/commons/processor-utilities/src/main/java/org/apache/nifi/processor/util/SSLProperties.java
new file mode 100644
index 0000000..0d66df5
--- /dev/null
+++ b/commons/processor-utilities/src/main/java/org/apache/nifi/processor/util/SSLProperties.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.security.KeyManagementException;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.UnrecoverableKeyException;
+import java.security.cert.CertificateException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.net.ssl.SSLContext;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.security.util.CertificateUtils;
+import org.apache.nifi.security.util.KeystoreType;
+import org.apache.nifi.security.util.SslContextFactory;
+import org.apache.nifi.security.util.SslContextFactory.ClientAuth;
+
+/**
+ * Shared property descriptors for keystore and truststore configuration,
+ * together with helpers that validate those properties and build an
+ * {@link SSLContext} from them.
+ */
+public class SSLProperties {
+
+    /** Filename of the truststore; validated with FILE_EXISTS_VALIDATOR. */
+    public static final PropertyDescriptor TRUSTSTORE = new PropertyDescriptor.Builder()
+            .name("Truststore Filename")
+            .description("The fully-qualified filename of the Truststore")
+            .defaultValue(null)
+            .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
+            .sensitive(false)
+            .build();
+
+    /** Type of the truststore; restricted to "JKS" or "PKCS12". */
+    public static final PropertyDescriptor TRUSTSTORE_TYPE = new PropertyDescriptor.Builder()
+            .name("Truststore Type")
+            .description("The Type of the Truststore. Either JKS or PKCS12")
+            .allowableValues("JKS", "PKCS12")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .defaultValue(null)
+            .sensitive(false)
+            .build();
+
+    /** Password of the truststore; marked sensitive. */
+    public static final PropertyDescriptor TRUSTSTORE_PASSWORD = new PropertyDescriptor.Builder()
+            .name("Truststore Password")
+            .description("The password for the Truststore")
+            .defaultValue(null)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .sensitive(true)
+            .build();
+
+    /** Filename of the keystore; validated with FILE_EXISTS_VALIDATOR. */
+    public static final PropertyDescriptor KEYSTORE = new PropertyDescriptor.Builder()
+            .name("Keystore Filename")
+            .description("The fully-qualified filename of the Keystore")
+            .defaultValue(null)
+            .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
+            .sensitive(false)
+            .build();
+
+    /**
+     * Type of the keystore; restricted to "JKS" or "PKCS12". Unlike the
+     * other descriptors, no explicit default value is set on this builder.
+     */
+    public static final PropertyDescriptor KEYSTORE_TYPE = new PropertyDescriptor.Builder()
+            .name("Keystore Type")
+            .description("The Type of the Keystore")
+            .allowableValues("JKS", "PKCS12")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .sensitive(false)
+            .build();
+
+    /** Password of the keystore; marked sensitive. */
+    public static final PropertyDescriptor KEYSTORE_PASSWORD = new PropertyDescriptor.Builder()
+            .name("Keystore Password")
+            .defaultValue(null)
+            .description("The password for the Keystore")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .sensitive(true)
+            .build();
+
+    /**
+     * Validates the keystore properties and then the truststore properties.
+     *
+     * @param properties the configured property values, keyed by descriptor
+     * @return the combined validation failures; empty when nothing was reported
+     */
+    public static Collection<ValidationResult> validateStore(final Map<PropertyDescriptor, String> properties) {
+        final Collection<ValidationResult> results = new ArrayList<>();
+        results.addAll(validateStore(properties, KeystoreValidationGroup.KEYSTORE));
+        results.addAll(validateStore(properties, KeystoreValidationGroup.TRUSTSTORE));
+        return results;
+    }
+
+    /**
+     * Validates one store's filename, password and type. The three values
+     * must be either all set or all unset. When all are set, the file must
+     * exist and be readable, and CertificateUtils.isStoreValid must accept it
+     * with the given type and password.
+     *
+     * @param properties the configured property values, keyed by descriptor
+     * @param keyStoreOrTrustStore which of the two property groups to validate
+     * @return the validation failures; empty when nothing was reported
+     */
+    public static Collection<ValidationResult> validateStore(final Map<PropertyDescriptor, String> properties, final KeystoreValidationGroup keyStoreOrTrustStore) {
+        final Collection<ValidationResult> results = new ArrayList<>();
+
+        final String filename;
+        final String password;
+        final String type;
+
+        if (keyStoreOrTrustStore == KeystoreValidationGroup.KEYSTORE) {
+            filename = properties.get(KEYSTORE);
+            password = properties.get(KEYSTORE_PASSWORD);
+            type = properties.get(KEYSTORE_TYPE);
+        } else {
+            filename = properties.get(TRUSTSTORE);
+            password = properties.get(TRUSTSTORE_PASSWORD);
+            type = properties.get(TRUSTSTORE_TYPE);
+        }
+
+        final String keystoreDesc = (keyStoreOrTrustStore == KeystoreValidationGroup.KEYSTORE) ? "Keystore" : "Truststore";
+
+        final int nulls = countNulls(filename, password, type);
+        if (nulls != 3 && nulls != 0) {
+            // partially configured store: some, but not all, of the three values are present
+            results.add(new ValidationResult.Builder().valid(false).explanation("Must set either 0 or 3 properties for " + keystoreDesc).subject(keystoreDesc + " Properties").build());
+        } else if (nulls == 0) {
+            // all properties were filled in.
+            final File file = new File(filename);
+            if (!file.exists() || !file.canRead()) {
+                results.add(new ValidationResult.Builder().valid(false).subject(keystoreDesc + " Properties").explanation("Cannot access file " + file.getAbsolutePath()).build());
+            } else {
+                try {
+                    // NOTE(review): KeystoreType.valueOf presumably throws IllegalArgumentException for a
+                    // type outside the enum; only MalformedURLException is handled here - confirm.
+                    final boolean storeValid = CertificateUtils.isStoreValid(file.toURI().toURL(), KeystoreType.valueOf(type), password.toCharArray());
+                    if (!storeValid) {
+                        results.add(new ValidationResult.Builder().subject(keystoreDesc + " Properties").valid(false).explanation("Invalid KeyStore Password or Type specified for file " + filename).build());
+                    }
+                } catch (MalformedURLException e) {
+                    results.add(new ValidationResult.Builder().subject(keystoreDesc + " Properties").valid(false).explanation("Malformed URL from file: " + e).build());
+                }
+            }
+        }
+
+        return results;
+    }
+
+    /**
+     * Counts how many of the given references are null.
+     */
+    private static int countNulls(Object... objects) {
+        int count = 0;
+        for (final Object x : objects) {
+            if (x == null) {
+                count++;
+            }
+        }
+
+        return count;
+    }
+
+    /** Selects which group of store properties a validation call applies to. */
+    public static enum KeystoreValidationGroup {
+
+        KEYSTORE, TRUSTSTORE
+    }
+
+    /**
+     * Returns copies of the three keystore descriptors with their required
+     * flag set as given. When required, the type descriptor also gets "JKS"
+     * as its default value. The source set is a HashSet, so the order of the
+     * returned list is unspecified.
+     *
+     * @param required whether the returned descriptors are marked required
+     * @return newly built descriptors
+     */
+    public static List<PropertyDescriptor> getKeystoreDescriptors(final boolean required) {
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        for (final PropertyDescriptor descriptor : KEYSTORE_DESCRIPTORS) {
+            final PropertyDescriptor.Builder builder = new PropertyDescriptor.Builder().fromPropertyDescriptor(descriptor).required(required);
+            if (required && descriptor.getName().equals(KEYSTORE_TYPE.getName())) {
+                builder.defaultValue("JKS");
+            }
+            descriptors.add(builder.build());
+        }
+
+        return descriptors;
+    }
+
+    /**
+     * Returns copies of the three truststore descriptors with their required
+     * flag set as given. When required, the type descriptor also gets "JKS"
+     * as its default value. The source set is a HashSet, so the order of the
+     * returned list is unspecified.
+     *
+     * @param required whether the returned descriptors are marked required
+     * @return newly built descriptors
+     */
+    public static List<PropertyDescriptor> getTruststoreDescriptors(final boolean required) {
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        for (final PropertyDescriptor descriptor : TRUSTSTORE_DESCRIPTORS) {
+            final PropertyDescriptor.Builder builder = new PropertyDescriptor.Builder().fromPropertyDescriptor(descriptor).required(required);
+            if (required && descriptor.getName().equals(TRUSTSTORE_TYPE.getName())) {
+                builder.defaultValue("JKS");
+            }
+            descriptors.add(builder.build());
+        }
+
+        return descriptors;
+    }
+
+    /**
+     * Builds an SSLContext from the store properties of the given context.
+     * Without a keystore filename a trust-only context is created; with a
+     * keystore but no truststore filename a keystore-only context is created;
+     * with both, both stores are used. The clientAuth argument is only passed
+     * on in the last case.
+     *
+     * NOTE(review): the passwords are dereferenced without a null check, so a
+     * missing password (or, in the first branch, a missing truststore)
+     * presumably ends in a NullPointerException - confirm that callers
+     * validate first.
+     *
+     * @param context the process context the property values are read from
+     * @param clientAuth client authentication mode, used only when both stores are configured
+     * @return the SSLContext produced by SslContextFactory
+     */
+    public static SSLContext createSSLContext(final ProcessContext context, final ClientAuth clientAuth)
+            throws UnrecoverableKeyException, KeyManagementException, KeyStoreException, NoSuchAlgorithmException, CertificateException, IOException {
+        final String keystoreFile = context.getProperty(KEYSTORE).getValue();
+        if (keystoreFile == null) {
+            return SslContextFactory.createTrustSslContext(
+                    context.getProperty(TRUSTSTORE).getValue(),
+                    context.getProperty(TRUSTSTORE_PASSWORD).getValue().toCharArray(),
+                    context.getProperty(TRUSTSTORE_TYPE).getValue());
+        } else {
+            final String truststoreFile = context.getProperty(TRUSTSTORE).getValue();
+            if (truststoreFile == null) {
+                return SslContextFactory.createSslContext(
+                        context.getProperty(KEYSTORE).getValue(),
+                        context.getProperty(KEYSTORE_PASSWORD).getValue().toCharArray(),
+                        context.getProperty(KEYSTORE_TYPE).getValue());
+            } else {
+                return SslContextFactory.createSslContext(
+                        context.getProperty(KEYSTORE).getValue(),
+                        context.getProperty(KEYSTORE_PASSWORD).getValue().toCharArray(),
+                        context.getProperty(KEYSTORE_TYPE).getValue(),
+                        context.getProperty(TRUSTSTORE).getValue(),
+                        context.getProperty(TRUSTSTORE_PASSWORD).getValue().toCharArray(),
+                        context.getProperty(TRUSTSTORE_TYPE).getValue(),
+                        clientAuth);
+            }
+        }
+    }
+
+    // Declared after the descriptor constants above, which the static block below adds to these sets.
+    private static final Set<PropertyDescriptor> KEYSTORE_DESCRIPTORS = new HashSet<>();
+    private static final Set<PropertyDescriptor> TRUSTSTORE_DESCRIPTORS = new HashSet<>();
+
+    static {
+        KEYSTORE_DESCRIPTORS.add(KEYSTORE);
+        KEYSTORE_DESCRIPTORS.add(KEYSTORE_TYPE);
+        KEYSTORE_DESCRIPTORS.add(KEYSTORE_PASSWORD);
+
+        TRUSTSTORE_DESCRIPTORS.add(TRUSTSTORE);
+        TRUSTSTORE_DESCRIPTORS.add(TRUSTSTORE_TYPE);
+        TRUSTSTORE_DESCRIPTORS.add(TRUSTSTORE_PASSWORD);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/processor-utilities/src/main/java/org/apache/nifi/processor/util/StandardValidators.java
----------------------------------------------------------------------
diff --git a/commons/processor-utilities/src/main/java/org/apache/nifi/processor/util/StandardValidators.java b/commons/processor-utilities/src/main/java/org/apache/nifi/processor/util/StandardValidators.java
new file mode 100644
index 0000000..10748fe
--- /dev/null
+++ b/commons/processor-utilities/src/main/java/org/apache/nifi/processor/util/StandardValidators.java
@@ -0,0 +1,544 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processor.util; + +import java.io.File; +import java.net.URI; +import java.net.URL; +import java.nio.charset.Charset; +import java.nio.charset.UnsupportedCharsetException; +import java.util.Collection; +import java.util.concurrent.TimeUnit; +import java.util.regex.Pattern; + +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; +import org.apache.nifi.controller.ControllerService; +import org.apache.nifi.expression.AttributeExpression.ResultType; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.DataUnit; +import org.apache.nifi.util.FormatUtils; + +public class StandardValidators { + + // + // + // STATICALLY DEFINED VALIDATORS + // + // + public static final Validator ATTRIBUTE_KEY_VALIDATOR = new Validator() { + @Override + public ValidationResult validate(final String subject, final String input, final ValidationContext context) { + final ValidationResult.Builder builder = new ValidationResult.Builder(); + builder.subject(subject).input(input); + + try { + FlowFile.KeyValidator.validateKey(input); + builder.valid(true); + } catch (final IllegalArgumentException e) { + builder.valid(false).explanation(e.getMessage()); + } + + return builder.build(); + } + }; + + public static final Validator ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR = 
new Validator() { + @Override + public ValidationResult validate(final String subject, final String input, final ValidationContext context) { + final ValidationResult.Builder builder = new ValidationResult.Builder(); + builder.subject("Property Name").input(subject); + + try { + FlowFile.KeyValidator.validateKey(subject); + builder.valid(true); + } catch (final IllegalArgumentException e) { + builder.valid(false).explanation(e.getMessage()); + } + + return builder.build(); + } + }; + + public static final Validator POSITIVE_INTEGER_VALIDATOR = new Validator() { + @Override + public ValidationResult validate(final String subject, final String value, final ValidationContext context) { + String reason = null; + try { + final int intVal = Integer.parseInt(value); + + if (intVal <= 0) { + reason = "not a positive value"; + } + } catch (final NumberFormatException e) { + reason = "not a valid integer"; + } + + return new ValidationResult.Builder().subject(subject).input(value).explanation(reason).valid(reason == null).build(); + } + }; + + public static final Validator POSITIVE_LONG_VALIDATOR = new Validator() { + @Override + public ValidationResult validate(final String subject, final String value, final ValidationContext context) { + String reason = null; + try { + final long longVal = Long.parseLong(value); + + if (longVal <= 0) { + reason = "not a positive value"; + } + } catch (final NumberFormatException e) { + reason = "not a valid 64-bit integer"; + } + + return new ValidationResult.Builder().subject(subject).input(value).explanation(reason).valid(reason == null).build(); + } + }; + + public static final Validator PORT_VALIDATOR = createLongValidator(1, 65535, true); + + public static final Validator NON_EMPTY_VALIDATOR = new Validator() { + @Override + public ValidationResult validate(final String subject, final String value, final ValidationContext context) { + return new ValidationResult.Builder().subject(subject).input(value).valid(value != null && 
!value.isEmpty()).explanation(subject + " cannot be empty").build(); + } + }; + + public static final Validator BOOLEAN_VALIDATOR = new Validator() { + @Override + public ValidationResult validate(final String subject, final String value, final ValidationContext context) { + final boolean valid = "true".equalsIgnoreCase(value) || "false".equalsIgnoreCase(value); + final String explanation = valid ? null : "Value must be 'true' or 'false'"; + return new ValidationResult.Builder().subject(subject).input(value).valid(valid).explanation(explanation).build(); + } + }; + + public static final Validator INTEGER_VALIDATOR = new Validator() { + @Override + public ValidationResult validate(final String subject, final String value, final ValidationContext context) { + String reason = null; + try { + Integer.parseInt(value); + } catch (final NumberFormatException e) { + reason = "not a valid integer"; + } + + return new ValidationResult.Builder().subject(subject).input(value).explanation(reason).valid(reason == null).build(); + } + }; + + public static final Validator LONG_VALIDATOR = new Validator() { + @Override + public ValidationResult validate(final String subject, final String value, final ValidationContext context) { + String reason = null; + try { + Long.parseLong(value); + } catch (final NumberFormatException e) { + reason = "not a valid Long"; + } + + return new ValidationResult.Builder().subject(subject).input(value).explanation(reason).valid(reason == null).build(); + } + }; + + public static final Validator NON_NEGATIVE_INTEGER_VALIDATOR = new Validator() { + @Override + public ValidationResult validate(final String subject, final String value, final ValidationContext context) { + String reason = null; + try { + final int intVal = Integer.parseInt(value); + + if (intVal < 0) { + reason = "value is negative"; + } + } catch (final NumberFormatException e) { + reason = "value is not a valid integer"; + } + + return new 
ValidationResult.Builder().subject(subject).input(value).explanation(reason).valid(reason == null).build(); + } + }; + + public static final Validator CHARACTER_SET_VALIDATOR = new Validator() { + @Override + public ValidationResult validate(final String subject, final String value, final ValidationContext context) { + String reason = null; + try { + if (!Charset.isSupported(value)) { + reason = "Character Set is not supported by this JVM."; + } + } catch (final UnsupportedCharsetException uce) { + reason = "Character Set is not supported by this JVM."; + } catch (final IllegalArgumentException iae) { + reason = "Character Set value cannot be null."; + } + + return new ValidationResult.Builder().subject(subject).input(value).explanation(reason).valid(reason == null).build(); + } + }; + + /** + * URL Validator that does not allow the Expression Language to be used + */ + public static final Validator URL_VALIDATOR = createURLValidator(); + + public static final Validator URI_VALIDATOR = new Validator() { + @Override + public ValidationResult validate(final String subject, final String input, final ValidationContext context) { + try { + new URI(input); + return new ValidationResult.Builder().subject(subject).input(input).explanation("Valid URI").valid(true).build(); + } catch (final Exception e) { + return new ValidationResult.Builder().subject(subject).input(input).explanation("Not a valid URI").valid(false).build(); + } + } + }; + + public static final Validator REGULAR_EXPRESSION_VALIDATOR = createRegexValidator(0, Integer.MAX_VALUE, false); + + public static final Validator ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR = new Validator() { + @Override + public ValidationResult validate(final String subject, final String input, final ValidationContext context) { + try { + context.newExpressionLanguageCompiler().compile(input); + return new ValidationResult.Builder().subject(subject).input(input).valid(true).build(); + } catch (final Exception e) { + return new 
ValidationResult.Builder().subject(subject).input(input).valid(false).explanation(e.getMessage()).build(); + } + } + + }; + + public static final Validator TIME_PERIOD_VALIDATOR = new Validator() { + @Override + public ValidationResult validate(final String subject, final String input, final ValidationContext context) { + if (input == null) { + return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation("Time Period cannot be null").build(); + } + if (Pattern.compile(FormatUtils.TIME_DURATION_REGEX).matcher(input.toLowerCase()).matches()) { + return new ValidationResult.Builder().subject(subject).input(input).valid(true).build(); + } else { + return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation("Must be of format <duration> <TimeUnit> where <duration> is a non-negative integer and TimeUnit is a supported Time Unit, such as: nanos, millis, secs, mins, hrs, days").build(); + } + } + }; + + public static final Validator DATA_SIZE_VALIDATOR = new Validator() { + @Override + public ValidationResult validate(final String subject, final String input, final ValidationContext context) { + if (input == null) { + return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation("Data Size cannot be null").build(); + } + if (Pattern.compile(DataUnit.DATA_SIZE_REGEX).matcher(input.toUpperCase()).matches()) { + return new ValidationResult.Builder().subject(subject).input(input).valid(true).build(); + } else { + return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation("Must be of format <Data Size> <Data Unit> where <Data Size> is a non-negative integer and <Data Unit> is a supported Data Unit, such as: B, KB, MB, GB, TB").build(); + } + } + }; + + public static final Validator FILE_EXISTS_VALIDATOR = new FileExistsValidator(true); + + // + // + // FACTORY METHODS FOR VALIDATORS + // + // + public static Validator 
createDirectoryExistsValidator(final boolean allowExpressionLanguage, final boolean createDirectoryIfMissing) { + return new DirectoryExistsValidator(allowExpressionLanguage, createDirectoryIfMissing); + } + + private static Validator createURLValidator() { + return new Validator() { + @Override + public ValidationResult validate(final String subject, final String input, final ValidationContext context) { + try { + final String evaluatedInput = context.newPropertyValue(input).evaluateAttributeExpressions().getValue(); + new URL(evaluatedInput); + return new ValidationResult.Builder().subject(subject).input(input).explanation("Valid URL").valid(true).build(); + } catch (final Exception e) { + return new ValidationResult.Builder().subject(subject).input(input).explanation("Not a valid URL").valid(false).build(); + } + } + }; + } + + public static Validator createTimePeriodValidator(final long minTime, final TimeUnit minTimeUnit, final long maxTime, final TimeUnit maxTimeUnit) { + return new TimePeriodValidator(minTime, minTimeUnit, maxTime, maxTimeUnit); + } + + public static Validator createAttributeExpressionLanguageValidator(final ResultType expectedResultType) { + return createAttributeExpressionLanguageValidator(expectedResultType, true); + } + + public static Validator createRegexMatchingValidator(final Pattern pattern) { + return new Validator() { + @Override + public ValidationResult validate(final String subject, final String input, final ValidationContext context) { + final boolean matches = pattern.matcher(input).matches(); + return new ValidationResult.Builder() + .input(input) + .subject(subject) + .valid(matches) + .explanation(matches ? 
null : "Value does not match regular expression: " + pattern.pattern()) + .build(); + } + }; + } + + /** + * Creates a @{link Validator} that ensure that a value is a valid Java + * Regular Expression with at least <code>minCapturingGroups</code> + * capturing groups and at most <code>maxCapturingGroups</code> capturing + * groups. If <code>supportAttributeExpressionLanguage</code> is set to + * <code>true</code>, the value may also include the Expression Language, + * but the result of evaluating the Expression Language will be applied + * before the Regular Expression is performed. In this case, the Expression + * Language will not support FlowFile Attributes but only System/JVM + * Properties + * + * @param minCapturingGroups + * @param maxCapturingGroups + * @param supportAttributeExpressionLanguage + * @return + */ + public static Validator createRegexValidator(final int minCapturingGroups, final int maxCapturingGroups, final boolean supportAttributeExpressionLanguage) { + return new Validator() { + @Override + public ValidationResult validate(final String subject, final String value, final ValidationContext context) { + try { + final String substituted; + if (supportAttributeExpressionLanguage) { + try { + substituted = context.newPropertyValue(value).evaluateAttributeExpressions().getValue(); + } catch (final Exception e) { + return new ValidationResult.Builder().subject(subject).input(value).valid(false).explanation("Failed to evaluate the Attribute Expression Language due to " + e.toString()).build(); + } + } else { + substituted = value; + } + + final Pattern pattern = Pattern.compile(substituted); + final int numGroups = pattern.matcher("").groupCount(); + if (numGroups < minCapturingGroups || numGroups > maxCapturingGroups) { + return new ValidationResult.Builder().subject(subject).input(value).valid(false).explanation("RegEx is required to have between " + minCapturingGroups + " and " + maxCapturingGroups + " Capturing Groups but has " + 
numGroups).build(); + } + + return new ValidationResult.Builder().subject(subject).input(value).valid(true).build(); + } catch (final Exception e) { + return new ValidationResult.Builder().subject(subject).input(value).valid(false).explanation("Not a valid Java Regular Expression").build(); + } + + } + }; + } + + public static Validator createAttributeExpressionLanguageValidator(final ResultType expectedResultType, final boolean allowExtraCharacters) { + return new Validator() { + @Override + public ValidationResult validate(final String subject, final String input, final ValidationContext context) { + final String syntaxError = context.newExpressionLanguageCompiler().validateExpression(input, allowExtraCharacters); + if (syntaxError != null) { + return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation(syntaxError).build(); + } + + final ResultType resultType = allowExtraCharacters ? ResultType.STRING : context.newExpressionLanguageCompiler().getResultType(input); + if (!resultType.equals(expectedResultType)) { + return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation("Expected Attribute Query to return type " + expectedResultType + " but query returns type " + resultType).build(); + } + + return new ValidationResult.Builder().subject(subject).input(input).valid(true).build(); + } + }; + } + + public static Validator createLongValidator(final long minimum, final long maximum, final boolean inclusive) { + return new Validator() { + @Override + public ValidationResult validate(final String subject, final String input, final ValidationContext context) { + String reason = null; + try { + final long longVal = Long.parseLong(input); + if (longVal < minimum || (!inclusive && longVal == minimum) | longVal > maximum || (!inclusive && longVal == maximum)) { + reason = "Value must be between " + minimum + " and " + maximum + " (" + (inclusive ? 
"inclusive" : "exclusive") + ")"; + } + } catch (final NumberFormatException e) { + reason = "not a valid integer"; + } + + return new ValidationResult.Builder().subject(subject).input(input).explanation(reason).valid(reason == null).build(); + } + + }; + } + + // + // + // SPECIFIC VALIDATOR IMPLEMENTATIONS THAT CANNOT BE ANONYMOUS CLASSES + // + // + static class TimePeriodValidator implements Validator { + + private final Pattern pattern; + + private final long minNanos; + private final long maxNanos; + + private final String minValueEnglish; + private final String maxValueEnglish; + + public TimePeriodValidator(final long minValue, final TimeUnit minTimeUnit, final long maxValue, final TimeUnit maxTimeUnit) { + pattern = Pattern.compile(FormatUtils.TIME_DURATION_REGEX); + + this.minNanos = TimeUnit.NANOSECONDS.convert(minValue, minTimeUnit); + this.maxNanos = TimeUnit.NANOSECONDS.convert(maxValue, maxTimeUnit); + this.minValueEnglish = minValue + " " + minTimeUnit.toString(); + this.maxValueEnglish = maxValue + " " + maxTimeUnit.toString(); + } + + @Override + public ValidationResult validate(final String subject, final String input, final ValidationContext context) { + if (input == null) { + return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation("Time Period cannot be null").build(); + } + final String lowerCase = input.toLowerCase(); + final boolean validSyntax = pattern.matcher(lowerCase).matches(); + final ValidationResult.Builder builder = new ValidationResult.Builder(); + if (validSyntax) { + final long nanos = FormatUtils.getTimeDuration(lowerCase, TimeUnit.NANOSECONDS); + + if (nanos < minNanos || nanos > maxNanos) { + builder.subject(subject).input(input).valid(false) + .explanation("Must be in the range of " + minValueEnglish + " to " + maxValueEnglish); + } else { + builder.subject(subject).input(input).valid(true); + } + } else { + builder.subject(subject).input(input).valid(false) + .explanation("Must be of 
format <duration> <TimeUnit> where <duration> is a non-negative integer and TimeUnit is a supported Time Unit, such as: nanos, millis, secs, mins, hrs, days"); + } + return builder.build(); + } + } + + public static class FileExistsValidator implements Validator { + + private final boolean allowEL; + + public FileExistsValidator(final boolean allowExpressionLanguage) { + this.allowEL = allowExpressionLanguage; + } + + @Override + public ValidationResult validate(final String subject, final String value, final ValidationContext context) { + final String substituted; + if (allowEL) { + try { + substituted = context.newPropertyValue(value).evaluateAttributeExpressions().getValue(); + } catch (final Exception e) { + return new ValidationResult.Builder().subject(subject).input(value).valid(false) + .explanation("Not a valid Expression Language value: " + e.getMessage()).build(); + } + } else { + substituted = value; + } + + final File file = new File(substituted); + final boolean valid = file.exists(); + final String explanation = valid ? 
null : "File " + file + " does not exist"; + return new ValidationResult.Builder().subject(subject).input(value).valid(valid).explanation(explanation).build(); + } + } + + public static class DirectoryExistsValidator implements Validator { + + private final boolean allowEL; + private final boolean create; + + public DirectoryExistsValidator(final boolean allowExpressionLanguage, final boolean create) { + this.allowEL = allowExpressionLanguage; + this.create = create; + } + + @Override + public ValidationResult validate(final String subject, final String value, final ValidationContext context) { + final String substituted; + if (allowEL) { + try { + substituted = context.newPropertyValue(value).evaluateAttributeExpressions().getValue(); + } catch (final Exception e) { + return new ValidationResult.Builder().subject(subject).input(value).valid(false) + .explanation("Not a valid Expression Language value: " + e.getMessage()).build(); + } + + if (substituted.trim().isEmpty() && !value.trim().isEmpty()) { + // User specified an Expression and nothing more... assume valid. + return new ValidationResult.Builder().subject(subject).input(value).valid(true).build(); + } + } else { + substituted = value; + } + + String reason = null; + try { + final File file = new File(substituted); + if (!file.exists()) { + if (!create) { + reason = "Directory does not exist"; + } else if (!file.mkdirs()) { + reason = "Directory does not exist and could not be created"; + } + } else if (!file.isDirectory()) { + reason = "Path does not point to a directory"; + } + } catch (final Exception e) { + reason = "Value is not a valid directory name"; + } + + return new ValidationResult.Builder().subject(subject).input(value).explanation(reason).valid(reason == null).build(); + } + } + + public static Validator createControllerServiceExistsValidator(final Class<? 
extends ControllerService> serviceClass) { + return new Validator() { + @Override + public ValidationResult validate(final String subject, final String input, final ValidationContext context) { + final ControllerService svc = context.getControllerServiceLookup().getControllerService(input); + + if (svc == null) { + return new ValidationResult.Builder().valid(false).input(input).subject(subject).explanation("No Controller Service exists with this ID").build(); + } + + if (!serviceClass.isAssignableFrom(svc.getClass())) { + return new ValidationResult.Builder().valid(false).input(input).subject(subject).explanation("Controller Service with this ID is of type " + svc.getClass().getName() + " but is expected to be of type " + serviceClass.getName()).build(); + } + + final ValidationContext serviceValidationContext = context.getControllerServiceValidationContext(svc); + final Collection<ValidationResult> serviceValidationResults = svc.validate(serviceValidationContext); + for (final ValidationResult result : serviceValidationResults) { + if (!result.isValid()) { + return new ValidationResult.Builder().valid(false).input(input).subject(subject).explanation("Controller Service " + input + " is not valid: " + result.getExplanation()).build(); + } + } + + return new ValidationResult.Builder().input(input).subject(subject).valid(true).build(); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/processor-utilities/src/test/java/org/apache/nifi/processor/TestFormatUtils.java ---------------------------------------------------------------------- diff --git a/commons/processor-utilities/src/test/java/org/apache/nifi/processor/TestFormatUtils.java b/commons/processor-utilities/src/test/java/org/apache/nifi/processor/TestFormatUtils.java new file mode 100644 index 0000000..359def2 --- /dev/null +++ b/commons/processor-utilities/src/test/java/org/apache/nifi/processor/TestFormatUtils.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processor; + +import static org.junit.Assert.assertEquals; + +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.util.FormatUtils; + +import org.junit.Test; + +public class TestFormatUtils { + + @Test + public void testParse() { + assertEquals(3, FormatUtils.getTimeDuration("3000 ms", TimeUnit.SECONDS)); + assertEquals(3000, FormatUtils.getTimeDuration("3000 s", TimeUnit.SECONDS)); + assertEquals(0, FormatUtils.getTimeDuration("999 millis", TimeUnit.SECONDS)); + assertEquals(4L * 24L * 60L * 60L * 1000000000L, FormatUtils.getTimeDuration("4 days", TimeUnit.NANOSECONDS)); + assertEquals(24, FormatUtils.getTimeDuration("1 DAY", TimeUnit.HOURS)); + assertEquals(60, FormatUtils.getTimeDuration("1 hr", TimeUnit.MINUTES)); + assertEquals(60, FormatUtils.getTimeDuration("1 Hrs", TimeUnit.MINUTES)); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/processor-utilities/src/test/java/org/apache/nifi/processor/util/TestStandardValidators.java ---------------------------------------------------------------------- diff --git a/commons/processor-utilities/src/test/java/org/apache/nifi/processor/util/TestStandardValidators.java 
b/commons/processor-utilities/src/test/java/org/apache/nifi/processor/util/TestStandardValidators.java new file mode 100644 index 0000000..2ae50c9 --- /dev/null +++ b/commons/processor-utilities/src/test/java/org/apache/nifi/processor/util/TestStandardValidators.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processor.util; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; + +import org.junit.Test; + +public class TestStandardValidators { + + @Test + public void testTimePeriodValidator() { + Validator val = StandardValidators.createTimePeriodValidator(1L, TimeUnit.SECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS); + ValidationResult vr; + + vr = val.validate("TimePeriodTest", "0 sense made", null); + assertFalse(vr.isValid()); + + vr = val.validate("TimePeriodTest", null, null); + assertFalse(vr.isValid()); + + vr = val.validate("TimePeriodTest", "0 secs", null); + assertFalse(vr.isValid()); + + vr = val.validate("TimePeriodTest", "999 millis", null); + assertFalse(vr.isValid()); + + vr = val.validate("TimePeriodTest", "999999999 nanos", null); + assertFalse(vr.isValid()); + + vr = val.validate("TimePeriodTest", "1 sec", null); + assertTrue(vr.isValid()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/remote-communications-utils/pom.xml ---------------------------------------------------------------------- diff --git a/commons/remote-communications-utils/pom.xml b/commons/remote-communications-utils/pom.xml new file mode 100644 index 0000000..5e5ebc1 --- /dev/null +++ b/commons/remote-communications-utils/pom.xml @@ -0,0 +1,29 @@ +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. 
+ The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-parent</artifactId> + <version>0.0.1-SNAPSHOT</version> + </parent> + + <artifactId>remote-communications-utils</artifactId> + <version>0.0.1-SNAPSHOT</version> + <packaging>jar</packaging> + + <name>remote-communications-utils</name> +</project> http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/StandardVersionNegotiator.java ---------------------------------------------------------------------- diff --git a/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/StandardVersionNegotiator.java b/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/StandardVersionNegotiator.java new file mode 100644 index 0000000..77c34c9 --- /dev/null +++ b/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/StandardVersionNegotiator.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Objects; + +public class StandardVersionNegotiator implements VersionNegotiator { + + private final List<Integer> versions; + private int curVersion; + + public StandardVersionNegotiator(final int... supportedVersions) { + if (Objects.requireNonNull(supportedVersions).length == 0) { + throw new IllegalArgumentException("At least one version must be supported"); + } + + final List<Integer> supported = new ArrayList<>(); + for (final int version : supportedVersions) { + supported.add(version); + } + this.versions = Collections.unmodifiableList(supported); + this.curVersion = supportedVersions[0]; + } + + @Override + public int getVersion() { + return curVersion; + } + + @Override + public void setVersion(final int version) throws IllegalArgumentException { + if (!isVersionSupported(version)) { + throw new IllegalArgumentException("Version " + version + " is not supported"); + } + + this.curVersion = version; + } + + @Override + public int getPreferredVersion() { + return versions.get(0); + } + + @Override + public Integer getPreferredVersion(final int maxVersion) { + for (final Integer version : this.versions) { + if (maxVersion >= version) { + return version; + } + } + return null; + } + + @Override + public boolean isVersionSupported(final int version) { + return versions.contains(version); + } + + @Override + public List<Integer> getSupportedVersions() { + return versions; + } + +} 
/**
 * Contract for a resource that supports one or more protocol versions and lets a caller inspect
 * and select the version in use.
 */
public interface VersionNegotiator {

    /**
     * @return the currently configured version of this resource
     */
    int getVersion();

    /**
     * Sets the version of this resource to the specified version. Only the lower byte of the
     * version is relevant.
     *
     * @param version the version to switch to
     * @throws IllegalArgumentException if the given version is not supported by this resource, as
     * indicated by {@link #isVersionSupported(int)}
     */
    void setVersion(int version) throws IllegalArgumentException;

    /**
     * @return the version of this resource that is preferred
     */
    int getPreferredVersion();

    /**
     * Gets the preferred version of this resource that is no greater than the given
     * {@code maxVersion}.
     *
     * @param maxVersion the highest version the caller is able to accept
     * @return the preferred version that is less than or equal to {@code maxVersion}, or
     * {@code null} if no such version exists
     */
    Integer getPreferredVersion(int maxVersion);

    /**
     * Indicates whether or not the specified version is supported by this resource.
     *
     * @param version the version to check
     * @return {@code true} if the version is supported, {@code false} otherwise
     */
    boolean isVersionSupported(int version);

    /**
     * @return the versions supported by this resource
     */
    List<Integer> getSupportedVersions();
}
/**
 * Signals that the user disabled transmission while communications with a peer were still in
 * progress.
 */
public class TransmissionDisabledException extends RuntimeException {

    /**
     * Creates the exception without a detail message or cause.
     */
    public TransmissionDisabledException() {
        super();
    }

}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.io; + +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.util.Arrays; +import java.util.zip.DataFormatException; +import java.util.zip.Inflater; + +public class CompressionInputStream extends InputStream { + + private final InputStream in; + private final Inflater inflater; + + private byte[] compressedBuffer; + private byte[] buffer; + + private int bufferIndex; + private boolean eos = false; // whether or not we've reached the end of stream + private boolean allDataRead = false; // different from eos b/c eos means allDataRead == true && buffer is empty + + private final byte[] fourByteBuffer = new byte[4]; + + public CompressionInputStream(final InputStream in) { + this.in = in; + inflater = new Inflater(); + + buffer = new byte[0]; + compressedBuffer = new byte[0]; + bufferIndex = 1; + } + + private String toHex(final byte[] array) { + final StringBuilder sb = new StringBuilder("0x"); + for (final byte b : array) { + final String hex = Integer.toHexString(b).toUpperCase(); + if (hex.length() == 1) { + sb.append("0"); + } + sb.append(hex); + } + return sb.toString(); + } + + protected void readChunkHeader() throws IOException { + // Ensure that we have a valid SYNC chunk + fillBuffer(fourByteBuffer); + if (!Arrays.equals(CompressionOutputStream.SYNC_BYTES, fourByteBuffer)) { + throw new IOException("Invalid CompressionInputStream. 
Expected first 4 bytes to be 'SYNC' but were " + toHex(fourByteBuffer)); + } + + // determine the size of the decompressed buffer + fillBuffer(fourByteBuffer); + buffer = new byte[toInt(fourByteBuffer)]; + + // determine the size of the compressed buffer + fillBuffer(fourByteBuffer); + compressedBuffer = new byte[toInt(fourByteBuffer)]; + + bufferIndex = buffer.length; // indicate that buffer is empty + } + + private int toInt(final byte[] data) { + return ((data[0] & 0xFF) << 24) + | ((data[1] & 0xFF) << 16) + | ((data[2] & 0xFF) << 8) + | (data[3] & 0xFF); + } + + protected void bufferAndDecompress() throws IOException { + if (allDataRead) { + eos = true; + return; + } + + readChunkHeader(); + fillBuffer(compressedBuffer); + + inflater.setInput(compressedBuffer); + try { + inflater.inflate(buffer); + } catch (final DataFormatException e) { + throw new IOException(e); + } + inflater.reset(); + + bufferIndex = 0; + final int moreDataByte = in.read(); + if (moreDataByte < 1) { + allDataRead = true; + } else if (moreDataByte > 1) { + throw new IOException("Expected indicator of whether or not more data was to come (-1, 0, or 1) but got " + moreDataByte); + } + } + + private void fillBuffer(final byte[] buffer) throws IOException { + int len; + int bytesLeft = buffer.length; + int bytesRead = 0; + while (bytesLeft > 0 && (len = in.read(buffer, bytesRead, bytesLeft)) > 0) { + bytesLeft -= len; + bytesRead += len; + } + + if (bytesRead < buffer.length) { + throw new EOFException(); + } + } + + private boolean isBufferEmpty() { + return bufferIndex >= buffer.length; + } + + @Override + public int read() throws IOException { + if (eos) { + return -1; + } + + if (isBufferEmpty()) { + bufferAndDecompress(); + } + + if (isBufferEmpty()) { + eos = true; + return -1; + } + + return buffer[bufferIndex++]; + } + + @Override + public int read(final byte[] b) throws IOException { + return read(b, 0, b.length); + } + + @Override + public int read(final byte[] b, final int off, 
final int len) throws IOException { + if (eos) { + return -1; + } + + if (isBufferEmpty()) { + bufferAndDecompress(); + } + + if (isBufferEmpty()) { + eos = true; + return -1; + } + + final int free = buffer.length - bufferIndex; + final int bytesToTransfer = Math.min(len, free); + System.arraycopy(buffer, bufferIndex, b, off, bytesToTransfer); + bufferIndex += bytesToTransfer; + + return bytesToTransfer; + } + + /** + * Does nothing. Does NOT close underlying InputStream + * @throws java.io.IOException + */ + @Override + public void close() throws IOException { + + } +}