http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-stream-utils/src/test/resources/logback-test.xml ---------------------------------------------------------------------- diff --git a/commons/nifi-stream-utils/src/test/resources/logback-test.xml b/commons/nifi-stream-utils/src/test/resources/logback-test.xml new file mode 100644 index 0000000..0f3f60c --- /dev/null +++ b/commons/nifi-stream-utils/src/test/resources/logback-test.xml @@ -0,0 +1,48 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +<configuration scan="true" scanPeriod="30 seconds"> + <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender"> + <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder"> + <pattern>%-4r [%t] %-5p %c - %m%n</pattern> + </encoder> + </appender> + + <!-- valid logging levels: TRACE, DEBUG, INFO, WARN, ERROR --> + <logger name="org.apache.nifi" level="DEBUG"/> + + <!-- Logger for managing logging statements for nifi clusters. --> + <logger name="org.apache.nifi.cluster" level="INFO"/> + + <!-- + Logger for logging HTTP requests received by the web server. Setting + log level to 'debug' activates HTTP request logging. 
+ --> + <logger name="org.apache.nifi.server.JettyServer" level="INFO"/> + + <!-- Logger for managing logging statements for jetty --> + <logger name="org.mortbay" level="INFO"/> + + <!-- Suppress non-error messages due to excessive logging by class --> + <logger name="com.sun.jersey.spi.container.servlet.WebComponent" level="ERROR"/> + + <logger name="org.apache.nifi.processors.standard" level="DEBUG"/> + + <root level="INFO"> + <appender-ref ref="CONSOLE"/> + </root> + +</configuration>
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-utils/.gitignore ---------------------------------------------------------------------- diff --git a/commons/nifi-utils/.gitignore b/commons/nifi-utils/.gitignore new file mode 100755 index 0000000..12c5231 --- /dev/null +++ b/commons/nifi-utils/.gitignore @@ -0,0 +1,8 @@ +/target +/target +/target +/target +/target +/target +/target +/target http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-utils/pom.xml ---------------------------------------------------------------------- diff --git a/commons/nifi-utils/pom.xml b/commons/nifi-utils/pom.xml new file mode 100644 index 0000000..8aeccd7 --- /dev/null +++ b/commons/nifi-utils/pom.xml @@ -0,0 +1,28 @@ +<?xml version="1.0"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-parent</artifactId> + <version>0.0.1-SNAPSHOT</version> + </parent> + + <artifactId>nifi-utils</artifactId> + <version>0.0.1-SNAPSHOT</version> + <packaging>jar</packaging> + <name>NiFi Utils</name> +</project> http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-utils/src/main/java/org/apache/nifi/io/CompoundUpdateMonitor.java ---------------------------------------------------------------------- diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/io/CompoundUpdateMonitor.java b/commons/nifi-utils/src/main/java/org/apache/nifi/io/CompoundUpdateMonitor.java new file mode 100644 index 0000000..e22032b --- /dev/null +++ b/commons/nifi-utils/src/main/java/org/apache/nifi/io/CompoundUpdateMonitor.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.io; + +import java.io.IOException; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; + +/** + * An {@link UpdateMonitor} that combines multiple <code>UpdateMonitor</code>s + * such that it will indicate a change in a file only if ALL sub-monitors + * indicate a change. The sub-monitors will be applied in the order given and if + * any indicates that the state has not changed, the subsequent sub-monitors may + * not be given a chance to run + */ +public class CompoundUpdateMonitor implements UpdateMonitor { + + private final List<UpdateMonitor> monitors; + + public CompoundUpdateMonitor(final UpdateMonitor first, final UpdateMonitor... others) { + monitors = new ArrayList<>(); + monitors.add(first); + for (final UpdateMonitor monitor : others) { + monitors.add(monitor); + } + } + + @Override + public Object getCurrentState(final Path path) throws IOException { + return new DeferredMonitorAction(monitors, path); + } + + private static class DeferredMonitorAction { + + private static final Object NON_COMPUTED_VALUE = new Object(); + + private final List<UpdateMonitor> monitors; + private final Path path; + + private final Object[] preCalculated; + + public DeferredMonitorAction(final List<UpdateMonitor> monitors, final Path path) { + this.monitors = monitors; + this.path = path; + preCalculated = new Object[monitors.size()]; + + for (int i = 0; i < preCalculated.length; i++) { + preCalculated[i] = NON_COMPUTED_VALUE; + } + } + + private Object getCalculatedValue(final int i) throws IOException { + if (preCalculated[i] == NON_COMPUTED_VALUE) { + preCalculated[i] = monitors.get(i).getCurrentState(path); + } + + return preCalculated[i]; + } + + @Override + public boolean equals(final Object obj) { + // must return true unless ALL DeferredMonitorAction's indicate that they are different + if (obj == null) { + return false; + } + + if (!(obj instanceof DeferredMonitorAction)) { + return false; + } + + final 
DeferredMonitorAction other = (DeferredMonitorAction) obj; + try { + // Go through each UpdateMonitor's value and check if the value has changed. + for (int i = 0; i < preCalculated.length; i++) { + final Object mine = getCalculatedValue(i); + final Object theirs = other.getCalculatedValue(i); + + if (mine == theirs) { + // same + return true; + } + + if (mine == null && theirs == null) { + // same + return true; + } + + if (mine.equals(theirs)) { + return true; + } + } + } catch (final IOException e) { + return false; + } + + // No DeferredMonitorAction was the same as last time. Therefore, it's not equal + return false; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-utils/src/main/java/org/apache/nifi/io/LastModifiedMonitor.java ---------------------------------------------------------------------- diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/io/LastModifiedMonitor.java b/commons/nifi-utils/src/main/java/org/apache/nifi/io/LastModifiedMonitor.java new file mode 100644 index 0000000..f446465 --- /dev/null +++ b/commons/nifi-utils/src/main/java/org/apache/nifi/io/LastModifiedMonitor.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.io; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; + +public class LastModifiedMonitor implements UpdateMonitor { + + @Override + public Object getCurrentState(final Path path) throws IOException { + return Files.getLastModifiedTime(path); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-utils/src/main/java/org/apache/nifi/io/MD5SumMonitor.java ---------------------------------------------------------------------- diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/io/MD5SumMonitor.java b/commons/nifi-utils/src/main/java/org/apache/nifi/io/MD5SumMonitor.java new file mode 100644 index 0000000..1326c2a --- /dev/null +++ b/commons/nifi-utils/src/main/java/org/apache/nifi/io/MD5SumMonitor.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.io; + +import java.io.FileInputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.file.Path; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; + +public class MD5SumMonitor implements UpdateMonitor { + + @Override + public Object getCurrentState(final Path path) throws IOException { + final MessageDigest digest; + try { + digest = MessageDigest.getInstance("MD5"); + } catch (final NoSuchAlgorithmException nsae) { + throw new AssertionError(nsae); + } + + try (final FileInputStream fis = new FileInputStream(path.toFile())) { + int len; + final byte[] buffer = new byte[8192]; + while ((len = fis.read(buffer)) > 0) { + digest.update(buffer, 0, len); + } + } + + // Return a ByteBuffer instead of byte[] because we want equals() to do a deep equality + return ByteBuffer.wrap(digest.digest()); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-utils/src/main/java/org/apache/nifi/io/SynchronousFileWatcher.java ---------------------------------------------------------------------- diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/io/SynchronousFileWatcher.java b/commons/nifi-utils/src/main/java/org/apache/nifi/io/SynchronousFileWatcher.java new file mode 100644 index 0000000..785f1ac --- /dev/null +++ b/commons/nifi-utils/src/main/java/org/apache/nifi/io/SynchronousFileWatcher.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.io; + +import java.io.IOException; +import java.nio.file.Path; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +/** + * Allows the user to configure a {@link java.nio.file.Path Path} to watch for + * modifications and periodically poll to check if the file has been modified + */ +public class SynchronousFileWatcher { + + private final Path path; + private final long checkUpdateMillis; + private final UpdateMonitor monitor; + private final AtomicReference<StateWrapper> lastState; + private final Lock resourceLock = new ReentrantLock(); + + public SynchronousFileWatcher(final Path path, final UpdateMonitor monitor) { + this(path, monitor, 0L); + } + + public SynchronousFileWatcher(final Path path, final UpdateMonitor monitor, final long checkMillis) { + if (checkMillis < 0) { + throw new IllegalArgumentException(); + } + + this.path = path; + checkUpdateMillis = checkMillis; + this.monitor = monitor; + + Object currentState; + try { + currentState = monitor.getCurrentState(path); + } catch (final IOException e) { + currentState = null; + } + + this.lastState = new AtomicReference<>(new StateWrapper(currentState)); + } + + /** + * Checks if the file has been updated according to the configured + * {@link UpdateMonitor} and resets the state + * + * @return + * @throws IOException + */ + public boolean checkAndReset() throws IOException { + if (checkUpdateMillis <= 0) { // if checkUpdateMillis <= 0, always check + return 
checkForUpdate(); + } else { + final StateWrapper stateWrapper = lastState.get(); + if (stateWrapper.getTimestamp() < System.currentTimeMillis() - checkUpdateMillis) { + return checkForUpdate(); + } + return false; + } + } + + private boolean checkForUpdate() throws IOException { + if (resourceLock.tryLock()) { + try { + final StateWrapper wrapper = lastState.get(); + final Object newState = monitor.getCurrentState(path); + if (newState == null && wrapper.getState() == null) { + return false; + } + if (newState == null || wrapper.getState() == null) { + lastState.set(new StateWrapper(newState)); + return true; + } + + final boolean unmodified = newState.equals(wrapper.getState()); + if (!unmodified) { + lastState.set(new StateWrapper(newState)); + } + return !unmodified; + } finally { + resourceLock.unlock(); + } + } else { + return false; + } + } + + private static class StateWrapper { + + private final Object state; + private final long timestamp; + + public StateWrapper(final Object state) { + this.state = state; + this.timestamp = System.currentTimeMillis(); + } + + public Object getState() { + return state; + } + + public long getTimestamp() { + return timestamp; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-utils/src/main/java/org/apache/nifi/io/UpdateMonitor.java ---------------------------------------------------------------------- diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/io/UpdateMonitor.java b/commons/nifi-utils/src/main/java/org/apache/nifi/io/UpdateMonitor.java new file mode 100644 index 0000000..33fb444 --- /dev/null +++ b/commons/nifi-utils/src/main/java/org/apache/nifi/io/UpdateMonitor.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
/**
 * Strategy for capturing the state of a file so that two captures can be
 * compared to decide whether the file has changed.
 */
public interface UpdateMonitor {

    /**
     * Captures the current state of the given file.
     *
     * @param path the file to inspect
     * @return an object describing the file's current state; callers in this
     * package compare successive states with <code>equals()</code>
     * @throws IOException if the state cannot be determined
     */
    Object getCurrentState(Path path) throws IOException;
}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.util; + +public class BooleanHolder extends ObjectHolder<Boolean> { + + public BooleanHolder(final boolean initialValue) { + super(initialValue); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-utils/src/main/java/org/apache/nifi/util/FormatUtils.java ---------------------------------------------------------------------- diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/util/FormatUtils.java b/commons/nifi-utils/src/main/java/org/apache/nifi/util/FormatUtils.java new file mode 100644 index 0000000..9954bfb --- /dev/null +++ b/commons/nifi-utils/src/main/java/org/apache/nifi/util/FormatUtils.java @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Static helpers for rendering counts, durations and data sizes as text and
 * for parsing textual time durations such as <code>"5 mins"</code>.
 */
public class FormatUtils {

    private static final String UNION = "|";

    // for Data Sizes
    private static final double BYTES_IN_KILOBYTE = 1024;
    private static final double BYTES_IN_MEGABYTE = BYTES_IN_KILOBYTE * 1024;
    private static final double BYTES_IN_GIGABYTE = BYTES_IN_MEGABYTE * 1024;
    private static final double BYTES_IN_TERABYTE = BYTES_IN_GIGABYTE * 1024;

    // for Time Durations
    private static final String NANOS = join(UNION, "ns", "nano", "nanos", "nanosecond", "nanoseconds");
    private static final String MILLIS = join(UNION, "ms", "milli", "millis", "millisecond", "milliseconds");
    private static final String SECS = join(UNION, "s", "sec", "secs", "second", "seconds");
    private static final String MINS = join(UNION, "m", "min", "mins", "minute", "minutes");
    private static final String HOURS = join(UNION, "h", "hr", "hrs", "hour", "hours");
    private static final String DAYS = join(UNION, "d", "day", "days");

    private static final String VALID_TIME_UNITS = join(UNION, NANOS, MILLIS, SECS, MINS, HOURS, DAYS);
    public static final String TIME_DURATION_REGEX = "(\\d+)\\s*(" + VALID_TIME_UNITS + ")";
    public static final Pattern TIME_DURATION_PATTERN = Pattern.compile(TIME_DURATION_REGEX);

    private static final String MINUTES_SECONDS_FORMAT = "mm:ss.SSS";

    /**
     * Formats the specified count using the default locale's integer format
     * (which adds grouping separators such as commas).
     *
     * @param count the number to format
     * @return the formatted count
     */
    public static String formatCount(final long count) {
        return NumberFormat.getIntegerInstance().format(count);
    }

    /**
     * Formats the specified duration in 'mm:ss.SSS' format. Only the
     * minute-of-hour part is shown, so whole hours are not represented in the
     * result.
     *
     * @param sourceDuration the duration to format
     * @param sourceUnit the unit in which <code>sourceDuration</code> is expressed
     * @return the formatted duration
     */
    public static String formatMinutesSeconds(final long sourceDuration, final TimeUnit sourceUnit) {
        final long millis = TimeUnit.MILLISECONDS.convert(sourceDuration, sourceUnit);
        return newMinutesSecondsFormatter().format(new Date(millis));
    }

    /**
     * Formats the specified duration in 'HH:mm:ss.SSS' format. The hour part is
     * not limited to 24 and is padded to at least two digits.
     *
     * @param sourceDuration the duration to format
     * @param sourceUnit the unit in which <code>sourceDuration</code> is expressed
     * @return the formatted duration
     */
    public static String formatHoursMinutesSeconds(final long sourceDuration, final TimeUnit sourceUnit) {
        final long millis = TimeUnit.MILLISECONDS.convert(sourceDuration, sourceUnit);
        final long millisInHour = TimeUnit.MILLISECONDS.convert(1, TimeUnit.HOURS);
        // long rather than int: the hour count of a large duration does not fit in an int
        final long hours = millis / millisInHour;
        final long whatsLeft = millis - hours * millisInHour;

        return pad(hours) + ":" + newMinutesSecondsFormatter().format(new Date(whatsLeft));
    }

    /**
     * Creates a formatter for the sub-hour part of a duration. The time zone
     * is pinned to UTC because the value being formatted is a duration, not a
     * point in time: with the JVM default zone, any zone whose offset is not a
     * whole number of hours would shift the minutes. A new instance is created
     * per call because SimpleDateFormat is not thread-safe.
     */
    private static SimpleDateFormat newMinutesSecondsFormatter() {
        final SimpleDateFormat formatter = new SimpleDateFormat(MINUTES_SECONDS_FORMAT);
        formatter.setTimeZone(java.util.TimeZone.getTimeZone("UTC"));
        return formatter;
    }

    /** Left-pads a single-digit value with a zero. */
    private static String pad(final long val) {
        return (val < 10) ? "0" + val : String.valueOf(val);
    }

    /**
     * Formats the specified data size in human readable format, using at most
     * two fraction digits. A unit is chosen only when the size is strictly
     * greater than one of that unit, so exactly 1024 bytes is reported in
     * bytes rather than as 1 KB.
     *
     * @param dataSize Data size in bytes
     * @return Human readable format
     */
    public static String formatDataSize(final double dataSize) {
        // initialize the formatter
        final NumberFormat format = NumberFormat.getNumberInstance();
        format.setMaximumFractionDigits(2);

        // check terabytes
        double dataSizeToFormat = dataSize / BYTES_IN_TERABYTE;
        if (dataSizeToFormat > 1) {
            return format.format(dataSizeToFormat) + " TB";
        }

        // check gigabytes
        dataSizeToFormat = dataSize / BYTES_IN_GIGABYTE;
        if (dataSizeToFormat > 1) {
            return format.format(dataSizeToFormat) + " GB";
        }

        // check megabytes
        dataSizeToFormat = dataSize / BYTES_IN_MEGABYTE;
        if (dataSizeToFormat > 1) {
            return format.format(dataSizeToFormat) + " MB";
        }

        // check kilobytes
        dataSizeToFormat = dataSize / BYTES_IN_KILOBYTE;
        if (dataSizeToFormat > 1) {
            return format.format(dataSizeToFormat) + " KB";
        }

        // default to bytes
        return format.format(dataSize) + " bytes";
    }

    /**
     * Parses a textual duration such as <code>"10 secs"</code> or
     * <code>"5MS"</code> (case-insensitive, optional whitespace between number
     * and unit) and converts it to the desired unit.
     *
     * @param value the text to parse; must match {@link #TIME_DURATION_REGEX}
     * @param desiredUnit the unit of the returned value
     * @return the duration expressed in <code>desiredUnit</code>
     * @throws IllegalArgumentException if <code>value</code> is not a valid
     * time duration (this includes a number too large for a <code>long</code>)
     */
    public static long getTimeDuration(final String value, final TimeUnit desiredUnit) {
        // Locale.ROOT: the default locale's case mapping (e.g. Turkish dotless i)
        // must not influence how the ASCII unit names are recognised.
        final String normalized = value.toLowerCase(java.util.Locale.ROOT);
        final Matcher matcher = TIME_DURATION_PATTERN.matcher(normalized);
        if (!matcher.matches()) {
            throw new IllegalArgumentException("Value '" + value + "' is not a valid Time Duration");
        }

        final TimeUnit specifiedTimeUnit = toTimeUnit(matcher.group(2));
        final long durationVal = Long.parseLong(matcher.group(1));
        return desiredUnit.convert(durationVal, specifiedTimeUnit);
    }

    /** Maps a lower-case unit name accepted by the duration pattern to its TimeUnit. */
    private static TimeUnit toTimeUnit(final String units) {
        switch (units) {
            case "ns":
            case "nano":
            case "nanos":
            case "nanosecond":
            case "nanoseconds":
                return TimeUnit.NANOSECONDS;
            case "ms":
            case "milli":
            case "millis":
            case "millisecond":
            case "milliseconds":
                return TimeUnit.MILLISECONDS;
            case "s":
            case "sec":
            case "secs":
            case "second":
            case "seconds":
                return TimeUnit.SECONDS;
            case "m":
            case "min":
            case "mins":
            case "minute":
            case "minutes":
                return TimeUnit.MINUTES;
            case "h":
            case "hr":
            case "hrs":
            case "hour":
            case "hours":
                return TimeUnit.HOURS;
            case "d":
            case "day":
            case "days":
                return TimeUnit.DAYS;
            default:
                // Not reachable while this switch and VALID_TIME_UNITS agree.
                throw new IllegalArgumentException("Unknown time unit '" + units + "'");
        }
    }

    /**
     * Renders a utilization value by appending a percent sign.
     *
     * @param utilization the value to render
     * @return the value followed by '%'
     */
    public static String formatUtilization(final double utilization) {
        return utilization + "%";
    }

    /** Concatenates the values, placing the delimiter between neighbours. */
    private static String join(final String delimiter, final String... values) {
        if (values.length == 0) {
            return "";
        } else if (values.length == 1) {
            return values[0];
        }

        final StringBuilder sb = new StringBuilder();
        sb.append(values[0]);
        for (int i = 1; i < values.length; i++) {
            sb.append(delimiter).append(values[i]);
        }

        return sb.toString();
    }

}
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.util; + +public class IntegerHolder extends ObjectHolder<Integer> { + + public IntegerHolder(final int initialValue) { + super(initialValue); + } + + public int addAndGet(final int delta) { + final int curValue = get(); + final int newValue = curValue + delta; + set(newValue); + return newValue; + } + + public int getAndAdd(final int delta) { + final int curValue = get(); + final int newValue = curValue + delta; + set(newValue); + return curValue; + } + + public int incrementAndGet() { + return addAndGet(1); + } + + public int getAndIncrement() { + return getAndAdd(1); + } + + public int decrementAndGet() { + return addAndGet(-1); + } + + public int getAndDecrement() { + return getAndAdd(-1); + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-utils/src/main/java/org/apache/nifi/util/LongHolder.java ---------------------------------------------------------------------- diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/util/LongHolder.java b/commons/nifi-utils/src/main/java/org/apache/nifi/util/LongHolder.java new file mode 100644 index 0000000..ef70ce8 --- /dev/null +++ b/commons/nifi-utils/src/main/java/org/apache/nifi/util/LongHolder.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.util; + +/** + * Wraps a Long value so that it can be declared <code>final</code> and still be + * accessed from which inner classes; the functionality is similar to that of an + * AtomicLong, but operations on this class are not atomic. This results in + * greater performance when the atomicity is not needed. + */ +public class LongHolder extends ObjectHolder<Long> { + + public LongHolder(final long initialValue) { + super(initialValue); + } + + public long addAndGet(final long delta) { + final long curValue = get(); + final long newValue = curValue + delta; + set(newValue); + return newValue; + } + + public long getAndAdd(final long delta) { + final long curValue = get(); + final long newValue = curValue + delta; + set(newValue); + return curValue; + } + + public long incrementAndGet() { + return addAndGet(1); + } + + public long getAndIncrement() { + return getAndAdd(1); + } + + public long decrementAndGet() { + return addAndGet(-1L); + } + + public long getAndDecrement() { + return getAndAdd(-1L); + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-utils/src/main/java/org/apache/nifi/util/ObjectHolder.java ---------------------------------------------------------------------- diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/util/ObjectHolder.java b/commons/nifi-utils/src/main/java/org/apache/nifi/util/ObjectHolder.java new file mode 100644 index 0000000..a58ec6a --- /dev/null +++ b/commons/nifi-utils/src/main/java/org/apache/nifi/util/ObjectHolder.java @@ -0,0 +1,39 @@ 
/**
 * A mutable bean holding exactly one value of type T.
 *
 * @param <T> the type of the held value
 */
public class ObjectHolder<T> {

    private T value;

    /**
     * @param initialValue the value initially held; stored as given
     */
    public ObjectHolder(final T initialValue) {
        value = initialValue;
    }

    /**
     * @return the value currently held
     */
    public T get() {
        return this.value;
    }

    /**
     * Replaces the held value.
     *
     * @param value the new value; stored as given
     */
    public void set(final T value) {
        this.value = value;
    }
}
// + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.util;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * Fixed-capacity ring buffer whose state is guarded by a
 * {@link ReentrantReadWriteLock}: mutating methods take the write lock,
 * read-only methods take the read lock. {@code null} elements are rejected by
 * {@link #add(Object)}; a {@code null} slot means "never written" or "removed
 * by {@link #removeSelectedElements(Filter)}" and is skipped by every
 * traversal. Filters and evaluators are invoked while the lock is held.
 *
 * @param <T> element type
 */
public class RingBuffer<T> {

    private final Object[] buffer;
    private int insertionPointer = 0;
    private boolean filled = false;

    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
    private final Lock readLock = rwLock.readLock();
    private final Lock writeLock = rwLock.writeLock();

    /**
     * @param size number of slots in the buffer
     */
    public RingBuffer(final int size) {
        buffer = new Object[size];
    }

    /**
     * Adds the given value to the RingBuffer and returns the value that was
     * removed in order to make room.
     *
     * @param value the element to store; must not be {@code null}
     * @return the element previously stored in the overwritten slot, or
     * {@code null} if that slot was empty
     * @throws NullPointerException if {@code value} is {@code null}
     */
    @SuppressWarnings("unchecked")
    public T add(final T value) {
        Objects.requireNonNull(value);

        writeLock.lock();
        try {
            final Object removed = buffer[insertionPointer];

            buffer[insertionPointer] = value;

            if (insertionPointer == buffer.length - 1) {
                filled = true;
            }

            insertionPointer = (insertionPointer + 1) % buffer.length;
            return (T) removed;
        } finally {
            writeLock.unlock();
        }
    }

    /**
     * @return the capacity once the buffer has wrapped at least once,
     * otherwise the number of additions so far. Slots cleared by
     * {@link #removeSelectedElements(Filter)} are not subtracted.
     */
    public int getSize() {
        readLock.lock();
        try {
            return filled ? buffer.length : insertionPointer;
        } finally {
            readLock.unlock();
        }
    }

    /**
     * @param filter decides which elements are returned
     * @return every element accepted by the filter, oldest first
     */
    public List<T> getSelectedElements(final Filter<T> filter) {
        return getSelectedElements(filter, Integer.MAX_VALUE);
    }

    /**
     * @param filter decides which elements are returned
     * @param maxElements upper bound on the size of the returned list
     * @return at most {@code maxElements} accepted elements, oldest first
     */
    public List<T> getSelectedElements(final Filter<T> filter, final int maxElements) {
        final List<T> selected = new ArrayList<>(1000);
        int numSelected = 0;
        readLock.lock();
        try {
            for (int i = 0; i < buffer.length && numSelected < maxElements; i++) {
                final int idx = (insertionPointer + i) % buffer.length;
                final Object val = buffer[idx];
                if (val == null) {
                    continue;
                }

                @SuppressWarnings("unchecked")
                final T element = (T) val;
                if (filter.select(element)) {
                    selected.add(element);
                    numSelected++;
                }
            }
        } finally {
            readLock.unlock();
        }
        return selected;
    }

    /**
     * @param filter decides which elements are counted
     * @return the number of stored elements accepted by the filter
     */
    public int countSelectedElements(final Filter<T> filter) {
        int numSelected = 0;
        readLock.lock();
        try {
            for (int i = 0; i < buffer.length; i++) {
                final int idx = (insertionPointer + i) % buffer.length;
                final Object val = buffer[idx];
                if (val == null) {
                    continue;
                }

                @SuppressWarnings("unchecked")
                final T element = (T) val;
                if (filter.select(element)) {
                    numSelected++;
                }
            }
        } finally {
            readLock.unlock();
        }

        return numSelected;
    }

    /**
     * Removes all elements from the RingBuffer that match the given filter.
     * Matching slots are set to {@code null}; the insertion pointer is not
     * moved.
     *
     * @param filter decides which elements are removed
     * @return the number of elements that were removed
     */
    public int removeSelectedElements(final Filter<T> filter) {
        int count = 0;

        writeLock.lock();
        try {
            for (int i = 0; i < buffer.length; i++) {
                final int idx = (insertionPointer + i + 1) % buffer.length;
                final Object val = buffer[idx];
                if (val == null) {
                    continue;
                }

                @SuppressWarnings("unchecked")
                final T element = (T) val;

                if (filter.select(element)) {
                    buffer[idx] = null;
                    // Previously the counter was never advanced, so 0 was always returned.
                    count++;
                }
            }
        } finally {
            writeLock.unlock();
        }

        return count;
    }

    /**
     * @return all stored elements, oldest first
     */
    public List<T> asList() {
        return getSelectedElements(new Filter<T>() {
            @Override
            public boolean select(final T value) {
                return true;
            }
        });
    }

    /**
     * @return the content of the slot that the next {@link #add(Object)} will
     * overwrite. That slot has not been written yet until the buffer has
     * wrapped, so {@code null} is returned before then.
     */
    public T getOldestElement() {
        readLock.lock();
        try {
            return getElementData(insertionPointer);
        } finally {
            readLock.unlock();
        }
    }

    /**
     * @return the content of the slot written by the most recent
     * {@link #add(Object)}, or {@code null} if that slot is empty
     */
    public T getNewestElement() {
        readLock.lock();
        try {
            // The slot before index 0 is the LAST slot (length - 1); using
            // buffer.length here indexed one past the end of the array.
            final int index = (insertionPointer == 0) ? buffer.length - 1 : insertionPointer - 1;
            return getElementData(index);
        } finally {
            readLock.unlock();
        }
    }

    @SuppressWarnings("unchecked")
    private T getElementData(final int index) {
        readLock.lock();
        try {
            return (T) buffer[index];
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Iterates over each element in the RingBuffer, calling the
     * {@link ForEachEvaluator#evaluate(Object) evaluate} method on each element
     * in the RingBuffer. If the Evaluator returns {@code false}, the method
     * will skip all remaining elements in the RingBuffer; otherwise, the next
     * element will be evaluated until all elements have been evaluated.
     *
     * @param evaluator callback invoked for each element, oldest first
     */
    public void forEach(final ForEachEvaluator<T> evaluator) {
        forEach(evaluator, IterationDirection.FORWARD);
    }

    /**
     * Iterates over each element in the RingBuffer, calling the
     * {@link ForEachEvaluator#evaluate(Object) evaluate} method on each element
     * in the RingBuffer. If the Evaluator returns {@code false}, the method
     * will skip all remaining elements in the RingBuffer; otherwise, the next
     * element will be evaluated until all elements have been evaluated.
     *
     * @param evaluator callback invoked for each element
     * @param iterationDirection the order in which to iterate over the elements
     * in the RingBuffer
     */
    public void forEach(final ForEachEvaluator<T> evaluator, final IterationDirection iterationDirection) {
        readLock.lock();
        try {
            final int startIndex;
            final int endIndex;
            final int increment;

            if (iterationDirection == IterationDirection.FORWARD) {
                startIndex = 0;
                endIndex = buffer.length - 1;
                increment = 1;
            } else {
                startIndex = buffer.length - 1;
                endIndex = 0;
                increment = -1;
            }

            for (int i = startIndex; (iterationDirection == IterationDirection.FORWARD ? i <= endIndex : i >= endIndex); i += increment) {
                final int idx = (insertionPointer + i) % buffer.length;
                final Object val = buffer[idx];
                if (val == null) {
                    continue;
                }

                @SuppressWarnings("unchecked")
                final T element = (T) val;
                if (!evaluator.evaluate(element)) {
                    return;
                }
            }
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Predicate used to select elements of the RingBuffer.
     *
     * @param <S> element type
     */
    public static interface Filter<S> {

        boolean select(S value);
    }

    /**
     * Defines an interface that can be used to iterate over all of the elements
     * in the RingBuffer via the {@link #forEach} method
     *
     * @param <S> element type
     */
    public static interface ForEachEvaluator<S> {

        /**
         * Evaluates the given element and returns {@code true} if the next
         * element should be evaluated, {@code false} otherwise
         *
         * @param value the element being visited
         * @return {@code true} to continue iterating, {@code false} to stop
         */
        boolean evaluate(S value);
    }

    public static enum IterationDirection {

        FORWARD,
        BACKWARD;
    }
}

// http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-utils/src/main/java/org/apache/nifi/util/StopWatch.java ---------------------------------------------------------------------- diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/util/StopWatch.java b/commons/nifi-utils/src/main/java/org/apache/nifi/util/StopWatch.java new file mode 100644 index 0000000..cd11930 --- /dev/null +++
b/commons/nifi-utils/src/main/java/org/apache/nifi/util/StopWatch.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.util; + +import java.util.concurrent.TimeUnit; + +public final class StopWatch { + + private long startNanos = -1L; + private long duration = -1L; + + /** + * Creates a StopWatch but does not start it + */ + public StopWatch() { + this(false); + } + + /** + * @param autoStart whether or not the timer should be started automatically + */ + public StopWatch(final boolean autoStart) { + if (autoStart) { + start(); + } + } + + public void start() { + this.startNanos = System.nanoTime(); + this.duration = -1L; + } + + public void stop() { + if (startNanos < 0) { + throw new IllegalStateException("StopWatch has not been started"); + } + this.duration = System.nanoTime() - startNanos; + this.startNanos = -1L; + } + + /** + * Returns the amount of time that the StopWatch was running. 
+ * + * @param timeUnit + * @return + * + * @throws IllegalStateException if the StopWatch has not been stopped via + * {@link #stop()} + */ + public long getDuration(final TimeUnit timeUnit) { + if (duration < 0) { + throw new IllegalStateException("Cannot get duration until StopWatch has been stopped"); + } + return timeUnit.convert(duration, TimeUnit.NANOSECONDS); + } + + /** + * Returns the amount of time that has elapsed since the timer was started. + * + * @param timeUnit + * @return + */ + public long getElapsed(final TimeUnit timeUnit) { + return timeUnit.convert(System.nanoTime() - startNanos, TimeUnit.NANOSECONDS); + } + + public String calculateDataRate(final long bytes) { + final double seconds = (double) duration / 1000000000.0D; + final long dataSize = (long) (bytes / seconds); + return FormatUtils.formatDataSize(dataSize) + "/sec"; + } + + public String getDuration() { + final StringBuilder sb = new StringBuilder(); + + long duration = this.duration; + final long minutes = (duration > 60000000000L) ? (duration / 60000000000L) : 0L; + duration -= TimeUnit.NANOSECONDS.convert(minutes, TimeUnit.MINUTES); + + final long seconds = (duration > 1000000000L) ? (duration / 1000000000L) : 0L; + duration -= TimeUnit.NANOSECONDS.convert(seconds, TimeUnit.SECONDS); + + final long millis = (duration > 1000000L) ? 
(duration / 1000000L) : 0L; + duration -= TimeUnit.NANOSECONDS.convert(millis, TimeUnit.MILLISECONDS); + + final long nanos = duration % 1000000L; + + if (minutes > 0) { + sb.append(minutes).append(" minutes"); + } + + if (seconds > 0) { + if (minutes > 0) { + sb.append(", "); + } + + sb.append(seconds).append(" seconds"); + } + + if (millis > 0) { + if (seconds > 0) { + sb.append(", "); + } + + sb.append(millis).append(" millis"); + } + if (seconds == 0 && millis == 0) { + sb.append(nanos).append(" nanos"); + } + + return sb.toString(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-utils/src/main/java/org/apache/nifi/util/Tuple.java ---------------------------------------------------------------------- diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/util/Tuple.java b/commons/nifi-utils/src/main/java/org/apache/nifi/util/Tuple.java new file mode 100644 index 0000000..63736ed --- /dev/null +++ b/commons/nifi-utils/src/main/java/org/apache/nifi/util/Tuple.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
// + */ +package org.apache.nifi.util;

/**
 * Immutable pair of a key and a value; either component may be {@code null}.
 * Two tuples are equal when both components are equal.
 *
 * @author unattrib
 * @param <A> key type
 * @param <B> value type
 */
public class Tuple<A, B> {

    final A key;
    final B value;

    public Tuple(A key, B value) {
        this.key = key;
        this.value = value;
    }

    /** @return the first component */
    public A getKey() {
        return this.key;
    }

    /** @return the second component */
    public B getValue() {
        return this.value;
    }

    @Override
    public boolean equals(final Object other) {
        if (other == this) {
            return true;
        }
        // instanceof is false for null, so this also rejects a null argument
        if (!(other instanceof Tuple)) {
            return false;
        }

        final Tuple<?, ?> that = (Tuple<?, ?>) other;

        final boolean sameKey = (key == null) ? (that.key == null) : key.equals(that.key);
        if (!sameKey) {
            return false;
        }

        return (value == null) ? (that.value == null) : value.equals(that.value);
    }

    @Override
    public int hashCode() {
        final int keyHash = (key == null) ? 0 : key.hashCode();
        final int valueHash = (value == null) ? 0 : value.hashCode();
        return 581 + keyHash + valueHash;
    }
}

// http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/DebugDisabledTimedLock.java ---------------------------------------------------------------------- diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/DebugDisabledTimedLock.java b/commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/DebugDisabledTimedLock.java new file mode 100644 index 0000000..a8d7e82 --- /dev/null +++ b/commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/DebugDisabledTimedLock.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.util.concurrency; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; + +public class DebugDisabledTimedLock implements DebuggableTimedLock { + + private final Lock lock; + + public DebugDisabledTimedLock(final Lock lock) { + this.lock = lock; + } + + /** + * + * @return + */ + @Override + public boolean tryLock() { + return lock.tryLock(); + } + + /** + * + * @param timeout + * @param timeUnit + * @return + */ + @Override + public boolean tryLock(final long timeout, final TimeUnit timeUnit) { + try { + return lock.tryLock(timeout, timeUnit); + } catch (InterruptedException e) { + return false; + } + } + + /** + * + */ + @Override + public void lock() { + lock.lock(); + } + + @Override + public void unlock(final String task) { + lock.unlock(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/DebugEnabledTimedLock.java ---------------------------------------------------------------------- diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/DebugEnabledTimedLock.java b/commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/DebugEnabledTimedLock.java new file mode 100644 index 0000000..f082168 --- /dev/null +++ b/commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/DebugEnabledTimedLock.java 
@@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.util.concurrency; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DebugEnabledTimedLock implements DebuggableTimedLock { + + private final Lock lock; + private final Logger logger; + private long lockTime = 0L; + + private final Map<String, Long> lockIterations = new HashMap<>(); + private final Map<String, Long> lockNanos = new HashMap<>(); + + private final String name; + private final int iterationFrequency; + + public DebugEnabledTimedLock(final Lock lock, final String name, final int iterationFrequency) { + this.lock = lock; + this.name = name; + this.iterationFrequency = iterationFrequency; + logger = LoggerFactory.getLogger(TimedLock.class.getName() + "." 
+ name); + } + + /** + * + * @return + */ + @Override + public boolean tryLock() { + logger.trace("Trying to obtain Lock: {}", name); + final boolean success = lock.tryLock(); + if (!success) { + logger.trace("TryLock failed for Lock: {}", name); + return false; + } + logger.trace("TryLock successful"); + + return true; + } + + /** + * + * @param timeout + * @param timeUnit + * @return + */ + @Override + public boolean tryLock(final long timeout, final TimeUnit timeUnit) { + logger.trace("Trying to obtain Lock {} with a timeout of {} {}", name, timeout, timeUnit); + final boolean success; + try { + success = lock.tryLock(timeout, timeUnit); + } catch (final InterruptedException ie) { + return false; + } + + if (!success) { + logger.trace("TryLock failed for Lock {} with a timeout of {} {}", name, timeout, timeUnit); + return false; + } + logger.trace("TryLock successful"); + return true; + } + + /** + * + */ + @Override + public void lock() { + logger.trace("Obtaining Lock {}", name); + lock.lock(); + lockTime = System.nanoTime(); + logger.trace("Obtained Lock {}", name); + } + + /** + * + * @param task + */ + @Override + public void unlock(final String task) { + if (lockTime <= 0L) { + lock.unlock(); + return; + } + + logger.trace("Releasing Lock {}", name); + final long nanosLocked = System.nanoTime() - lockTime; + + Long startIterations = lockIterations.get(task); + if (startIterations == null) { + startIterations = 0L; + } + final long iterations = startIterations + 1L; + lockIterations.put(task, iterations); + + Long startNanos = lockNanos.get(task); + if (startNanos == null) { + startNanos = 0L; + } + final long totalNanos = startNanos + nanosLocked; + lockNanos.put(task, totalNanos); + + lockTime = -1L; + + lock.unlock(); + logger.trace("Released Lock {}", name); + + if (iterations % iterationFrequency == 0) { + logger.debug("Lock {} held for {} nanos for task: {}; total lock iterations: {}; total lock nanos: {}", name, nanosLocked, task, iterations, 
totalNanos); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/DebuggableTimedLock.java ---------------------------------------------------------------------- diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/DebuggableTimedLock.java b/commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/DebuggableTimedLock.java new file mode 100644 index 0000000..69da6e8 --- /dev/null +++ b/commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/DebuggableTimedLock.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
// + */ +package org.apache.nifi.util.concurrency;

import java.util.concurrent.TimeUnit;

/**
 * Lock operations shared by the debug-enabled and debug-disabled timed lock
 * implementations. Unlike {@code java.util.concurrent.locks.Lock}, releasing
 * takes a task label and the timed acquire does not declare
 * {@code InterruptedException}.
 */
public interface DebuggableTimedLock {

    /**
     * Attempts to acquire the lock without waiting.
     *
     * @return {@code true} if the lock was acquired
     */
    boolean tryLock();

    /**
     * Attempts to acquire the lock, waiting up to the given time.
     *
     * @param timePeriod maximum time to wait
     * @param timeUnit unit of {@code timePeriod}
     * @return {@code true} if the lock was acquired
     */
    boolean tryLock(long timePeriod, TimeUnit timeUnit);

    /**
     * Acquires the lock.
     */
    void lock();

    /**
     * Releases the lock.
     *
     * @param task label describing the work that was done while holding the lock
     */
    void unlock(String task);
}

// http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/TimedLock.java ---------------------------------------------------------------------- diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/TimedLock.java b/commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/TimedLock.java new file mode 100644 index 0000000..532d3c3 --- /dev/null +++ b/commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/TimedLock.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.nifi.util.concurrency; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TimedLock { + + private final DebugEnabledTimedLock enabled; + private final DebugDisabledTimedLock disabled; + + private final Logger logger; + + public TimedLock(final Lock lock, final String name, final int iterationFrequency) { + this.enabled = new DebugEnabledTimedLock(lock, name, iterationFrequency); + this.disabled = new DebugDisabledTimedLock(lock); + + logger = LoggerFactory.getLogger(TimedLock.class.getName() + "." + name); + } + + private DebuggableTimedLock getLock() { + return logger.isDebugEnabled() ? enabled : disabled; + } + + public boolean tryLock() { + return getLock().tryLock(); + } + + public boolean tryLock(final long timeout, final TimeUnit timeUnit) { + return getLock().tryLock(timeout, timeUnit); + } + + public void lock() { + getLock().lock(); + } + + public void unlock(final String task) { + getLock().unlock(task); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/EntityAccess.java ---------------------------------------------------------------------- diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/EntityAccess.java b/commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/EntityAccess.java new file mode 100644 index 0000000..2b95897 --- /dev/null +++ b/commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/EntityAccess.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
// + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.util.timebuffer;

/**
 * Strategy describing how entities of type {@code T} are created, combined
 * and time-stamped.
 *
 * @param <T> entity type
 */
public interface EntityAccess<T> {

    /**
     * @return a fresh entity
     */
    T createNew();

    /**
     * Combines two entities into one.
     *
     * @param oldValue the value accumulated so far
     * @param toAdd the value to merge into it
     * @return the combined entity
     */
    T aggregate(T oldValue, T toAdd);

    /**
     * @param entity the entity to inspect
     * @return the timestamp associated with {@code entity}
     */
    long getTimestamp(T entity);
}

// http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/LongEntityAccess.java ---------------------------------------------------------------------- diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/LongEntityAccess.java b/commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/LongEntityAccess.java new file mode 100644 index 0000000..193abc6 --- /dev/null +++ b/commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/LongEntityAccess.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.util.timebuffer; + +public class LongEntityAccess implements EntityAccess<TimestampedLong> { + + @Override + public TimestampedLong aggregate(TimestampedLong oldValue, TimestampedLong toAdd) { + if (oldValue == null && toAdd == null) { + return new TimestampedLong(0L); + } else if (oldValue == null) { + return toAdd; + } else if (toAdd == null) { + return oldValue; + } + + return new TimestampedLong(oldValue.getValue() + toAdd.getValue()); + } + + @Override + public TimestampedLong createNew() { + return new TimestampedLong(0L); + } + + @Override + public long getTimestamp(TimestampedLong entity) { + return entity == null ? 0L : entity.getTimestamp(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/TimedBuffer.java ---------------------------------------------------------------------- diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/TimedBuffer.java b/commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/TimedBuffer.java new file mode 100644 index 0000000..dd8e523 --- /dev/null +++ b/commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/TimedBuffer.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.util.timebuffer; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +public class TimedBuffer<T> { + + private final int numBins; + private final EntitySum<T>[] bins; + private final EntityAccess<T> entityAccess; + private final TimeUnit binPrecision; + + @SuppressWarnings("unchecked") + public TimedBuffer(final TimeUnit binPrecision, final int numBins, final EntityAccess<T> accessor) { + this.binPrecision = binPrecision; + this.numBins = numBins + 1; + this.bins = new EntitySum[this.numBins]; + for (int i = 0; i < this.numBins; i++) { + this.bins[i] = new EntitySum<>(binPrecision, numBins, accessor); + } + this.entityAccess = accessor; + } + + public T add(final T entity) { + final int binIdx = (int) (binPrecision.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS) % numBins); + final EntitySum<T> sum = bins[binIdx]; + + return sum.addOrReset(entity); + } + + public T getAggregateValue(final long sinceEpochMillis) { + final int startBinIdx = (int) (binPrecision.convert(sinceEpochMillis, TimeUnit.MILLISECONDS) % numBins); + + T total = null; + for (int i = 0; i < numBins; i++) { + int binIdx = (startBinIdx + i) % numBins; + final EntitySum<T> bin = bins[binIdx]; + + if (!bin.isExpired()) { + total = entityAccess.aggregate(total, bin.getValue()); + } + } + + return total; + } + + private static class 
EntitySum<S> { + + private final EntityAccess<S> entityAccess; + private final AtomicReference<S> ref = new AtomicReference<>(); + private final TimeUnit binPrecision; + private final int numConfiguredBins; + + public EntitySum(final TimeUnit binPrecision, final int numConfiguredBins, final EntityAccess<S> aggregator) { + this.binPrecision = binPrecision; + this.entityAccess = aggregator; + this.numConfiguredBins = numConfiguredBins; + } + + private S add(final S event) { + S newValue; + S value; + do { + value = ref.get(); + newValue = entityAccess.aggregate(value, event); + } while (!ref.compareAndSet(value, newValue)); + + return newValue; + } + + public S getValue() { + return ref.get(); + } + + public boolean isExpired() { + // entityAccess.getTimestamp(curValue) represents the time at which the current value + // was last updated. If the last value is less than current time - 1 binPrecision, then it + // means that we've rolled over and need to reset the value. + final long maxExpectedTimePeriod = System.currentTimeMillis() - TimeUnit.MILLISECONDS.convert(numConfiguredBins, binPrecision); + + final S curValue = ref.get(); + return (entityAccess.getTimestamp(curValue) < maxExpectedTimePeriod); + } + + public S addOrReset(final S event) { + // entityAccess.getTimestamp(curValue) represents the time at which the current value + // was last updated. If the last value is less than current time - 1 binPrecision, then it + // means that we've rolled over and need to reset the value. 
+ final long maxExpectedTimePeriod = System.currentTimeMillis() - TimeUnit.MILLISECONDS.convert(1, binPrecision); + + final S curValue = ref.get(); + if (entityAccess.getTimestamp(curValue) < maxExpectedTimePeriod) { + ref.compareAndSet(curValue, entityAccess.createNew()); + } + return add(event); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/TimestampedLong.java ---------------------------------------------------------------------- diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/TimestampedLong.java b/commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/TimestampedLong.java new file mode 100644 index 0000000..07d31ea --- /dev/null +++ b/commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/TimestampedLong.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Immutable pairing of a {@link Long} value with a timestamp expressed in
 * milliseconds since the epoch.
 *
 * <p>The single-argument constructor stamps the instance with the wall-clock
 * time at construction. The two-argument constructor accepts the timestamp
 * explicitly so that an instance can be created for a known instant.</p>
 */
public class TimestampedLong {

    private final Long value;
    private final long timestamp;

    /**
     * Creates an instance stamped with {@link System#currentTimeMillis()}.
     *
     * @param value the value to carry; stored exactly as given, no null check is performed
     */
    public TimestampedLong(final Long value) {
        this(value, System.currentTimeMillis());
    }

    /**
     * Creates an instance with an explicitly supplied timestamp.
     *
     * @param value the value to carry; stored exactly as given, no null check is performed
     * @param timestamp the time to associate with the value, in milliseconds since the epoch
     */
    public TimestampedLong(final Long value, final long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }

    /**
     * @return the value supplied at construction
     */
    public Long getValue() {
        return value;
    }

    /**
     * @return the timestamp associated with the value, in milliseconds since the epoch
     */
    public long getTimestamp() {
        return timestamp;
    }

    @Override
    public String toString() {
        return "TimestampedLong[value=" + value + ", timestamp=" + timestamp + "]";
    }
}
+ */ +package org.apache.nifi.util.timebuffer; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.file.Path; +import java.util.UUID; + +import org.apache.nifi.io.CompoundUpdateMonitor; +import org.apache.nifi.io.LastModifiedMonitor; +import org.apache.nifi.io.MD5SumMonitor; +import org.apache.nifi.io.UpdateMonitor; + +import org.junit.Test; + +public class TestCompoundUpdateMonitor { + + @Test + public void test() throws IOException { + final UpdateMonitor lastModified = new LastModifiedMonitor(); + final MD5SumMonitor md5 = new MD5SumMonitor(); + final CompoundUpdateMonitor compound = new CompoundUpdateMonitor(lastModified, md5); + + final File file = new File("target/" + UUID.randomUUID().toString()); + if (file.exists()) { + assertTrue(file.delete()); + } + assertTrue(file.createNewFile()); + + final Path path = file.toPath(); + + final Object curState = compound.getCurrentState(path); + final Object state2 = compound.getCurrentState(path); + + assertEquals(curState, state2); + file.setLastModified(System.currentTimeMillis() + 1000L); + final Object state3 = compound.getCurrentState(path); + assertEquals(state2, state3); + + final Object state4 = compound.getCurrentState(path); + assertEquals(state3, state4); + + final long lastModifiedDate = file.lastModified(); + try (final OutputStream out = new FileOutputStream(file)) { + out.write("Hello".getBytes("UTF-8")); + } + + file.setLastModified(lastModifiedDate); + + final Object state5 = compound.getCurrentState(path); + assertNotSame(state4, state5); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-utils/src/test/java/org/apache/nifi/util/timebuffer/TestRingBuffer.java ---------------------------------------------------------------------- diff --git 
a/commons/nifi-utils/src/test/java/org/apache/nifi/util/timebuffer/TestRingBuffer.java b/commons/nifi-utils/src/test/java/org/apache/nifi/util/timebuffer/TestRingBuffer.java new file mode 100644 index 0000000..fafffdd --- /dev/null +++ b/commons/nifi-utils/src/test/java/org/apache/nifi/util/timebuffer/TestRingBuffer.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.util.timebuffer; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.nifi.util.RingBuffer; +import org.apache.nifi.util.RingBuffer.ForEachEvaluator; +import org.apache.nifi.util.RingBuffer.IterationDirection; + +import org.junit.Test; + +/** + * + */ +public class TestRingBuffer { + + @Test + public void testAsList() { + final RingBuffer<Integer> ringBuffer = new RingBuffer<>(10); + + final List<Integer> emptyList = ringBuffer.asList(); + assertTrue(emptyList.isEmpty()); + + for (int i = 0; i < 3; i++) { + ringBuffer.add(i); + } + + List<Integer> list = ringBuffer.asList(); + assertEquals(3, list.size()); + for (int i = 0; i < 3; i++) { + assertEquals(Integer.valueOf(i), list.get(i)); + } + + for (int i = 3; i < 10; i++) { + ringBuffer.add(i); + } + + list = ringBuffer.asList(); + assertEquals(10, list.size()); + for (int i = 0; i < 10; i++) { + assertEquals(Integer.valueOf(i), list.get(i)); + } + } + + @Test + public void testIterateForwards() { + final RingBuffer<Integer> ringBuffer = new RingBuffer<>(10); + + final int[] values = new int[]{3, 5, 20, 7}; + for (final int v : values) { + ringBuffer.add(v); + } + + final AtomicInteger countHolder = new AtomicInteger(0); + ringBuffer.forEach(new ForEachEvaluator<Integer>() { + int counter = 0; + + @Override + public boolean evaluate(final Integer value) { + final int expected = values[counter++]; + countHolder.incrementAndGet(); + assertEquals(expected, value.intValue()); + return true; + } + + }, IterationDirection.FORWARD); + + assertEquals(4, countHolder.get()); + } + + @Test + public void testIterateForwardsAfterFull() { + final RingBuffer<Integer> ringBuffer = new RingBuffer<>(10); + + for (int i = 0; i < 12; i++) { + ringBuffer.add(i); + } + + final int[] values = new int[]{3, 5, 20, 7}; + for (final int v : values) { + ringBuffer.add(v); + } + 
+ ringBuffer.forEach(new ForEachEvaluator<Integer>() { + int counter = 0; + + @Override + public boolean evaluate(final Integer value) { + if (counter < 6) { + assertEquals(counter + 6, value.intValue()); + } else { + final int expected = values[counter - 6]; + assertEquals(expected, value.intValue()); + } + + counter++; + return true; + } + + }, IterationDirection.FORWARD); + } + + @Test + public void testIterateBackwards() { + final RingBuffer<Integer> ringBuffer = new RingBuffer<>(10); + + final int[] values = new int[]{3, 5, 20, 7}; + for (final int v : values) { + ringBuffer.add(v); + } + + final AtomicInteger countHolder = new AtomicInteger(0); + ringBuffer.forEach(new ForEachEvaluator<Integer>() { + int counter = 0; + + @Override + public boolean evaluate(final Integer value) { + final int index = values.length - 1 - counter; + final int expected = values[index]; + countHolder.incrementAndGet(); + + assertEquals(expected, value.intValue()); + counter++; + return true; + } + + }, IterationDirection.BACKWARD); + + assertEquals(4, countHolder.get()); + } + + @Test + public void testIterateBackwardsAfterFull() { + final RingBuffer<Integer> ringBuffer = new RingBuffer<>(10); + + for (int i = 0; i < 12; i++) { + ringBuffer.add(i); + } + + final int[] values = new int[]{3, 5, 20, 7}; + for (final int v : values) { + ringBuffer.add(v); + } + + ringBuffer.forEach(new ForEachEvaluator<Integer>() { + int counter = 0; + + @Override + public boolean evaluate(final Integer value) { + if (counter < values.length) { + final int index = values.length - 1 - counter; + final int expected = values[index]; + + assertEquals(expected, value.intValue()); + counter++; + } + + return true; + } + + }, IterationDirection.BACKWARD); + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-utils/src/test/java/org/apache/nifi/util/timebuffer/TestSynchronousFileWatcher.java ---------------------------------------------------------------------- diff --git 
a/commons/nifi-utils/src/test/java/org/apache/nifi/util/timebuffer/TestSynchronousFileWatcher.java b/commons/nifi-utils/src/test/java/org/apache/nifi/util/timebuffer/TestSynchronousFileWatcher.java new file mode 100644 index 0000000..4b2c0d5 --- /dev/null +++ b/commons/nifi-utils/src/test/java/org/apache/nifi/util/timebuffer/TestSynchronousFileWatcher.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.util.timebuffer; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.ByteArrayInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardCopyOption; + +import org.junit.Test; + +import org.apache.nifi.io.MD5SumMonitor; +import org.apache.nifi.io.SynchronousFileWatcher; +import org.apache.nifi.io.UpdateMonitor; + +public class TestSynchronousFileWatcher { + + @Test + public void testIt() throws UnsupportedEncodingException, IOException, InterruptedException { + final Path path = Paths.get("target/1.txt"); + Files.copy(new ByteArrayInputStream("Hello, World!".getBytes("UTF-8")), path, StandardCopyOption.REPLACE_EXISTING); + final UpdateMonitor monitor = new MD5SumMonitor(); + + final SynchronousFileWatcher watcher = new SynchronousFileWatcher(path, monitor, 10L); + assertFalse(watcher.checkAndReset()); + Thread.sleep(30L); + assertFalse(watcher.checkAndReset()); + + final FileOutputStream fos = new FileOutputStream(path.toFile()); + try { + fos.write("Good-bye, World!".getBytes("UTF-8")); + fos.getFD().sync(); + } finally { + fos.close(); + } + + assertTrue(watcher.checkAndReset()); + assertFalse(watcher.checkAndReset()); + + Thread.sleep(30L); + assertFalse(watcher.checkAndReset()); + } +}