http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/db/jdbc/JDBCDimensionalOutputOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/JDBCDimensionalOutputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/JDBCDimensionalOutputOperator.java index 353f1b2..bf0d089 100644 --- a/library/src/main/java/com/datatorrent/lib/db/jdbc/JDBCDimensionalOutputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/JDBCDimensionalOutputOperator.java @@ -18,29 +18,27 @@ */ package com.datatorrent.lib.db.jdbc; - import java.sql.PreparedStatement; import java.sql.SQLException; - import java.util.List; import java.util.Map; import javax.validation.constraints.Min; import javax.validation.constraints.NotNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.apex.malhar.lib.dimensions.DimensionsDescriptor; import org.apache.apex.malhar.lib.dimensions.DimensionsEvent.Aggregate; import org.apache.apex.malhar.lib.dimensions.DimensionsEvent.EventKey; import org.apache.apex.malhar.lib.dimensions.aggregator.AggregatorRegistry; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.datatorrent.api.Context; - import com.datatorrent.lib.appdata.gpo.GPOMutable; import com.datatorrent.lib.appdata.schemas.DimensionalConfigurationSchema; import com.datatorrent.lib.appdata.schemas.FieldsDescriptor;
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcNonTransactionalStore.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcNonTransactionalStore.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcNonTransactionalStore.java index 835bcdd..89022a3 100644 --- a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcNonTransactionalStore.java +++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcNonTransactionalStore.java @@ -60,8 +60,7 @@ public class JdbcNonTransactionalStore extends JdbcTransactionalStore try { connection.setAutoCommit(true); - } - catch(SQLException e) { + } catch (SQLException e) { throw new RuntimeException(e); } } @@ -71,10 +70,9 @@ public class JdbcNonTransactionalStore extends JdbcTransactionalStore { Long lastWindowCommit = getCommittedWindowIdHelper(appId, operatorId); - if(lastWindowCommit == null) { + if (lastWindowCommit == null) { return -1L; - } - else { + } else { return lastWindowCommit; } } @@ -87,8 +85,7 @@ public class JdbcNonTransactionalStore extends JdbcTransactionalStore lastWindowFetchCommand.close(); lastWindowInsertCommand.close(); - } - catch (SQLException ex) { + } catch (SQLException ex) { throw new RuntimeException(ex); } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/fileaccess/DTFileReader.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/fileaccess/DTFileReader.java b/library/src/main/java/com/datatorrent/lib/fileaccess/DTFileReader.java index 7fff4e0..ea8f174 100644 --- a/library/src/main/java/com/datatorrent/lib/fileaccess/DTFileReader.java +++ b/library/src/main/java/com/datatorrent/lib/fileaccess/DTFileReader.java @@ -19,7 +19,6 @@ package com.datatorrent.lib.fileaccess; import 
java.io.IOException; -import java.util.Arrays; import java.util.TreeMap; import org.apache.hadoop.classification.InterfaceStability; @@ -97,7 +96,9 @@ public class DTFileReader implements FileAccess.FileReader @Override public boolean next(Slice key, Slice value) throws IOException { - if (scanner.atEnd()) return false; + if (scanner.atEnd()) { + return false; + } Entry en = scanner.entry(); key.buffer = en.getBlockBuffer(); http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/fileaccess/FileAccessFSImpl.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/fileaccess/FileAccessFSImpl.java b/library/src/main/java/com/datatorrent/lib/fileaccess/FileAccessFSImpl.java index a9cfe00..7184a82 100644 --- a/library/src/main/java/com/datatorrent/lib/fileaccess/FileAccessFSImpl.java +++ b/library/src/main/java/com/datatorrent/lib/fileaccess/FileAccessFSImpl.java @@ -61,7 +61,8 @@ public abstract class FileAccessFSImpl implements FileAccess this.basePath = path; } - protected Path getFilePath(long bucketKey, String fileName) { + protected Path getFilePath(long bucketKey, String fileName) + { return new Path(getBucketPath(bucketKey), fileName); } @@ -71,7 +72,8 @@ public abstract class FileAccessFSImpl implements FileAccess } @Override - public long getFileSize(long bucketKey, String fileName) throws IOException { + public long getFileSize(long bucketKey, String fileName) throws IOException + { return fs.getFileStatus(getFilePath(bucketKey, fileName)).getLen(); } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/fileaccess/TFileImpl.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/fileaccess/TFileImpl.java b/library/src/main/java/com/datatorrent/lib/fileaccess/TFileImpl.java index 
2a3fd0e..7dfe4e9 100644 --- a/library/src/main/java/com/datatorrent/lib/fileaccess/TFileImpl.java +++ b/library/src/main/java/com/datatorrent/lib/fileaccess/TFileImpl.java @@ -143,14 +143,14 @@ public abstract class TFileImpl extends FileAccessFSImpl /** * Return {@link TFile} {@link Reader} - * */ - public static class DefaultTFileImpl extends TFileImpl{ + public static class DefaultTFileImpl extends TFileImpl + { @Override public FileReader getReader(long bucketKey, String fileName) throws IOException { - FSDataInputStream fsdis = getInputStream(bucketKey, fileName); + FSDataInputStream fsdis = getInputStream(bucketKey, fileName); long fileLength = getFileSize(bucketKey, fileName); super.setupConfig(fs.getConf()); return new TFileReader(fsdis, fileLength, fs.getConf()); @@ -158,17 +158,16 @@ public abstract class TFileImpl extends FileAccessFSImpl } - /** * Return {@link DTFile} {@link org.apache.hadoop.io.file.tfile.DTFile.Reader} - * */ - public static class DTFileImpl extends TFileImpl { + public static class DTFileImpl extends TFileImpl + { @Override public FileReader getReader(long bucketKey, String fileName) throws IOException { - FSDataInputStream fsdis = getInputStream(bucketKey, fileName); + FSDataInputStream fsdis = getInputStream(bucketKey, fileName); long fileLength = getFileSize(bucketKey, fileName); super.setupConfig(fs.getConf()); return new DTFileReader(fsdis, fileLength, fs.getConf()); http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/fileaccess/TFileReader.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/fileaccess/TFileReader.java b/library/src/main/java/com/datatorrent/lib/fileaccess/TFileReader.java index 9ab6f82..0f0c92a 100644 --- a/library/src/main/java/com/datatorrent/lib/fileaccess/TFileReader.java +++ b/library/src/main/java/com/datatorrent/lib/fileaccess/TFileReader.java @@ -93,17 +93,20 
@@ public class TFileReader implements FileAccess.FileReader try { return scanner.seekTo(key.buffer, key.offset, key.length); } catch (NullPointerException ex) { - if (closed) + if (closed) { throw new IOException("Stream was closed"); - else + } else { throw ex; + } } } @Override public boolean next(Slice key, Slice value) throws IOException { - if (scanner.atEnd()) return false; + if (scanner.atEnd()) { + return false; + } Entry en = scanner.entry(); byte[] rkey = new byte[en.getKeyLength()]; byte[] rval = new byte[en.getValueLength()]; http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/fileaccess/TFileWriter.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/fileaccess/TFileWriter.java b/library/src/main/java/com/datatorrent/lib/fileaccess/TFileWriter.java index 6566ae0..7e9d544 100644 --- a/library/src/main/java/com/datatorrent/lib/fileaccess/TFileWriter.java +++ b/library/src/main/java/com/datatorrent/lib/fileaccess/TFileWriter.java @@ -37,7 +37,8 @@ public final class TFileWriter implements FileAccess.FileWriter private FSDataOutputStream fsdos; - public TFileWriter(FSDataOutputStream stream, int minBlockSize, String compressName, String comparator, Configuration conf) throws IOException + public TFileWriter(FSDataOutputStream stream, int minBlockSize, String compressName, + String comparator, Configuration conf) throws IOException { this.fsdos = stream; writer = new Writer(stream, minBlockSize, compressName, comparator, conf); @@ -58,6 +59,9 @@ public final class TFileWriter implements FileAccess.FileWriter } @Override - public long getBytesWritten() throws IOException{ return fsdos.getPos(); } + public long getBytesWritten() throws IOException + { + return fsdos.getPos(); + } } 
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/formatter/Formatter.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/formatter/Formatter.java b/library/src/main/java/com/datatorrent/lib/formatter/Formatter.java index 7d0018e..25c0b96 100644 --- a/library/src/main/java/com/datatorrent/lib/formatter/Formatter.java +++ b/library/src/main/java/com/datatorrent/lib/formatter/Formatter.java @@ -40,7 +40,7 @@ import com.datatorrent.lib.converter.Converter; * * @displayName Parser * @tags parser converter - * @param <INPUT> + * @param <OUTPUT> * @since 3.2.0 */ @org.apache.hadoop.classification.InterfaceStability.Evolving http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/io/AbstractHttpGetOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/AbstractHttpGetOperator.java b/library/src/main/java/com/datatorrent/lib/io/AbstractHttpGetOperator.java index fe9a50f..a84e5c7 100644 --- a/library/src/main/java/com/datatorrent/lib/io/AbstractHttpGetOperator.java +++ b/library/src/main/java/com/datatorrent/lib/io/AbstractHttpGetOperator.java @@ -56,8 +56,7 @@ public abstract class AbstractHttpGetOperator<INPUT, OUTPUT> extends AbstractHtt if (output.isConnected()) { ClientResponse response = wr.get(ClientResponse.class); processResponse(response); - } - else { + } else { wr.get(ClientResponse.class); } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/io/AbstractHttpInputOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/AbstractHttpInputOperator.java 
b/library/src/main/java/com/datatorrent/lib/io/AbstractHttpInputOperator.java index d7bb503..40ce4bc 100644 --- a/library/src/main/java/com/datatorrent/lib/io/AbstractHttpInputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/io/AbstractHttpInputOperator.java @@ -28,12 +28,13 @@ import javax.validation.constraints.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.api.Context.OperatorContext; import com.sun.jersey.api.client.Client; import com.sun.jersey.api.client.ClientResponse; import com.sun.jersey.api.client.WebResource; +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DefaultOutputPort; + /** * This is a base implementation for an HTTP input operator that reads from a given url using the HTTP GET command like an input stream. * Subclasses must implement the method which handles the response to the HTTP GET request. @@ -122,15 +123,13 @@ public abstract class AbstractHttpInputOperator<T> extends SimpleSinglePortInput ClientResponse response = builder.get(ClientResponse.class); processResponse(response); - } - catch (Exception e) { + } catch (Exception e) { LOG.error("Error reading from " + resource.getURI(), e); } try { Thread.sleep(500); - } - catch (InterruptedException e) { + } catch (InterruptedException e) { LOG.info("Exiting IO loop {}.", e.toString()); break; } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/io/AbstractHttpOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/AbstractHttpOperator.java b/library/src/main/java/com/datatorrent/lib/io/AbstractHttpOperator.java index d5b8de6..60b123b 100644 --- a/library/src/main/java/com/datatorrent/lib/io/AbstractHttpOperator.java +++ b/library/src/main/java/com/datatorrent/lib/io/AbstractHttpOperator.java @@ -20,14 +20,14 
@@ package com.datatorrent.lib.io; import javax.validation.constraints.NotNull; -import com.sun.jersey.api.client.Client; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.datatorrent.common.util.BaseOperator; +import com.sun.jersey.api.client.Client; + import com.datatorrent.api.Context.OperatorContext; import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.common.util.BaseOperator; /** * This is the base implementation for HTTP operators. http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/io/AbstractKeyValueStoreOutputOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/AbstractKeyValueStoreOutputOperator.java b/library/src/main/java/com/datatorrent/lib/io/AbstractKeyValueStoreOutputOperator.java index 835f2a6..1e14fe1 100644 --- a/library/src/main/java/com/datatorrent/lib/io/AbstractKeyValueStoreOutputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/io/AbstractKeyValueStoreOutputOperator.java @@ -24,11 +24,11 @@ import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.datatorrent.common.util.BaseOperator; import com.datatorrent.api.Context.DAGContext; import com.datatorrent.api.Context.OperatorContext; import com.datatorrent.api.DefaultInputPort; import com.datatorrent.api.annotation.InputPortFieldAnnotation; +import com.datatorrent.common.util.BaseOperator; import com.datatorrent.lib.util.KeyValPair; /** @@ -62,7 +62,7 @@ public abstract class AbstractKeyValueStoreOutputOperator<K, V> extends BaseOper * This input port receives tuples which are maps. * Each map may have many key value pairs. 
*/ - @InputPortFieldAnnotation(optional=true) + @InputPortFieldAnnotation(optional = true) public final transient DefaultInputPort<Map<K, V>> input = new DefaultInputPort<Map<K, V>>() { @Override @@ -78,7 +78,7 @@ public abstract class AbstractKeyValueStoreOutputOperator<K, V> extends BaseOper /** * This input port receives tuples which are individual key value pairs. */ - @InputPortFieldAnnotation(optional=true) + @InputPortFieldAnnotation(optional = true) public final transient DefaultInputPort<KeyValPair<K, V>> inputInd = new DefaultInputPort<KeyValPair<K, V>>() { @Override @@ -141,8 +141,7 @@ public abstract class AbstractKeyValueStoreOutputOperator<K, V> extends BaseOper put(getEndWindowKey(), String.valueOf(currentWindowId)); commitTransaction(); committedWindowId = currentWindowId; - } - else { + } else { LOG.info("Discarding data for window id {} because committed window is {}", currentWindowId, committedWindowId); } } catch (RuntimeException se) { @@ -163,7 +162,8 @@ public abstract class AbstractKeyValueStoreOutputOperator<K, V> extends BaseOper return "_ew:" + appId + ":" + operatorId; } - private void logException(String message, Exception exception) { + private void logException(String message, Exception exception) + { if (continueOnError != 0) { LOG.warn(message, exception); } else { http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/io/AbstractSocketInputOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/AbstractSocketInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/AbstractSocketInputOperator.java index 4f1e3df..450cdcb 100644 --- a/library/src/main/java/com/datatorrent/lib/io/AbstractSocketInputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/io/AbstractSocketInputOperator.java @@ -23,7 +23,8 @@ import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; 
import java.nio.channels.Selector; import java.nio.channels.SocketChannel; -import java.util.*; +import java.util.Iterator; +import java.util.Set; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -159,8 +160,7 @@ public abstract class AbstractSocketInputOperator<T> implements InputOperator, A channel.configureBlocking(false); channel.connect(new InetSocketAddress(hostname, port)); channel.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ); - } - catch (Exception ex) { + } catch (Exception ex) { throw new RuntimeException(ex); } lock = new ReentrantLock(); @@ -176,8 +176,7 @@ public abstract class AbstractSocketInputOperator<T> implements InputOperator, A selector.close(); scanThread.interrupt(); scanThread.join(); - } - catch (Exception ex) { + } catch (Exception ex) { throw new RuntimeException(ex); } } @@ -200,11 +199,11 @@ public abstract class AbstractSocketInputOperator<T> implements InputOperator, A SelectionKey nextKey = keyIterator.next(); keyIterator.remove(); if (nextKey.isConnectable()) { - SocketChannel sChannel = (SocketChannel) nextKey.channel(); + SocketChannel sChannel = (SocketChannel)nextKey.channel(); sChannel.finishConnect(); } if (nextKey.isReadable()) { - SocketChannel sChannel = (SocketChannel) nextKey.channel(); + SocketChannel sChannel = (SocketChannel)nextKey.channel(); lock.lock(); acquiredLock = true; sChannel.read(byteBuffer); @@ -215,8 +214,7 @@ public abstract class AbstractSocketInputOperator<T> implements InputOperator, A // Sleep for Scan interval Thread.sleep(scanIntervalInMilliSeconds); } - } - catch (Exception e) { + } catch (Exception e) { if (acquiredLock) { lock.unlock(); } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/io/ApacheGenRandomLogs.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/ApacheGenRandomLogs.java 
b/library/src/main/java/com/datatorrent/lib/io/ApacheGenRandomLogs.java index d1afc60..84bcb1d 100644 --- a/library/src/main/java/com/datatorrent/lib/io/ApacheGenRandomLogs.java +++ b/library/src/main/java/com/datatorrent/lib/io/ApacheGenRandomLogs.java @@ -22,10 +22,10 @@ import java.text.SimpleDateFormat; import java.util.Date; import java.util.Random; -import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.api.Context.OperatorContext; import com.datatorrent.api.DefaultOutputPort; import com.datatorrent.api.InputOperator; -import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.common.util.BaseOperator; /** * Generates apache server log entries. The apache access log has the following @@ -43,7 +43,7 @@ import com.datatorrent.api.Context.OperatorContext; * %b - The number of bytes in the response * %{Referer} - The referer web site reported by the client, "-" if there is none * %{User-agent} - Unique string identifying the client browser e.g., - * "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/28.0.1468.0 Safari/537.36" + * "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/28.0.1468.0 Safari/537.36" * * Putting it all together a sample log string looks like : * -------------------------------------------------------- @@ -59,153 +59,168 @@ import com.datatorrent.api.Context.OperatorContext; @org.apache.hadoop.classification.InterfaceStability.Evolving public class ApacheGenRandomLogs extends BaseOperator implements InputOperator { - /** + /** * This is the output port which emits generated log strings. 
*/ - public final transient DefaultOutputPort<String> outport = new DefaultOutputPort<String>(); - - // server name/ip-address random variable - private Random rand = new Random(); - - // Apache date format - private static SimpleDateFormat apapcheDateFormat = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss Z"); - - // http status codes - private static String [] httpStatusCodes = {"100", "101", "200", "201", "202", "203", "204", "205", "206", "300", "301", - "301", "302", "303", "304", "305", "306", "307", "400", "401", "402", "403", - "405", "406", "407", "408", "409", "410", "411", "412", "413", "414", - "415", "416", "417", "500", "501", "502", "503", "504", "505"}; - - // possible url string formats - private static String[] urlFormats = { - "mydomain.com/home.php", "mydomain.com/products.php", "mydomain.com/products.php?productid=%d", - "mydomain.com/solutions.php", "mydomain.com/solutions.php?solutionid=%d", "mydomain.com/support.php", - "mydomain.com/about.php", "mydomain.com/contactus.php", "mydomain.com/services.php", - "mydomain.com/services.php?serviceid=%d", "mydomain.com/partners.php", "mydomain.com/partners.php?partnerid=%d" - }; - - // browser id - private static String[] browserIds = { - "Mozilla/5.0 (X11; Ubuntu; Linux i686; rv:20.0) Gecko/%d Firefox/20.0", - "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:18.0) Gecko/%d Firefox/18.0", - "Mozilla/5.0 (X11; U; Linux i686; en-US; rv:1.7.8) Gecko/%d Fedora/1.0.4-4 Firefox/1.0.", - "Mozilla/5.0 (X11; U; Linux i686; en-US; rv:1.8.0.10) Gecko/%d CentOS/1.5.0.10-0.1.el4.centos Firefox/1.5.0.10" - }; - - // generate server name and IP address for server - private int genServerId() - { - return rand.nextInt(10); - } - private String genServerName(int serverId) - { - return new StringBuilder("server").append(new Integer(serverId).toString()).append(".mydomain.com:80").toString(); - } - private String genIpAddress(int serverId) - { - return new StringBuilder().append(rand.nextInt(255)) - 
.append(".").append(rand.nextInt(255)).append(".").append(rand.nextInt(255)) - .append(".").append(rand.nextInt(255)).toString(); - } - private String getTimeStamp() - { - return new StringBuilder("[").append(apapcheDateFormat.format(new Date())).append("]").toString(); - } - private String genHttpCode() - { - return httpStatusCodes[rand.nextInt(httpStatusCodes.length)]; - } - private String genUrl() - { - String format = urlFormats[rand.nextInt(urlFormats.length)]; - return String.format(format, rand.nextInt(100)); - } - private String genBrowserId() - { - String format = browserIds[rand.nextInt(browserIds.length)]; - return String.format(format, rand.nextInt(100000)); - } - - // generate log string - private String genLogString(String ipAddress, String browserId, String httpCode, String url) - { - // server/ipaddress - int serverId = genServerId(); - String serverName = genServerName(serverId); - if (ipAddress == null) - { - ipAddress = genIpAddress(serverId); - } - - // time - String logTime = getTimeStamp(); - - // url - if (url == null) - { - url = new StringBuilder("\"").append("GET").append(" ").append(genUrl()).append(" ").append("HTTP/1.1").append("\"").toString(); - } - - // http code - if (httpCode == null) - { - httpCode = genHttpCode(); - } - - // number of bytes - int numBytes = rand.nextInt(4000); - - // browser id - if(browserId == null) - { - browserId = genBrowserId(); - } - - // print - return new StringBuilder().append(serverName).append(" ").append(ipAddress).append(" - - ").append(logTime).append(" ").append(url).append(" ") - .append(httpCode).append(" ").append(numBytes).append(" \" \" \"").append(browserId).append("\"").toString(); - } - - @Override - public void beginWindow(long windowId) - { - // TODO Auto-generated method stub - - } - @Override - public void endWindow() - { - // TODO Auto-generated method stub - - } - boolean genTuples; - int attackInterval; - @Override - public void setup(OperatorContext context) - { - genTuples = true; 
- attackInterval = rand.nextInt(10)+ 1; - } - @Override - public void teardown() - { - genTuples = false; - } - @Override - public void emitTuples() - { - attackInterval--; - String browserId = null; - String ipAdddress = null; - if (attackInterval == 0) - { - browserId = genBrowserId(); - ipAdddress = genIpAddress(rand.nextInt(10)); - attackInterval += rand.nextInt(10) + 1; - for (int i = 0; i < rand.nextInt(3); i++) outport.emit(genLogString(ipAdddress, browserId, "404", null)); - String url = new StringBuilder("\"").append("GET").append(" ").append(genUrl()).append(" ").append("HTTP/1.1").append("\"").toString(); - for (int i = 0; i < rand.nextInt(3); i++) outport.emit(genLogString(ipAdddress, browserId, "404", url)); - } - for (int i = 0; i < rand.nextInt(100000); i++) outport.emit(genLogString(ipAdddress, browserId, null, null)); - } + public final transient DefaultOutputPort<String> outport = new DefaultOutputPort<String>(); + + // server name/ip-address random variable + private Random rand = new Random(); + + // Apache date format + private static SimpleDateFormat apapcheDateFormat = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss Z"); + + // http status codes + private static String[] httpStatusCodes = {"100", "101", "200", "201", "202", "203", "204", "205", "206", "300", "301", + "301", "302", "303", "304", "305", "306", "307", "400", "401", "402", "403", + "405", "406", "407", "408", "409", "410", "411", "412", "413", "414", + "415", "416", "417", "500", "501", "502", "503", "504", "505"}; + + // possible url string formats + private static String[] urlFormats = { + "mydomain.com/home.php", "mydomain.com/products.php", "mydomain.com/products.php?productid=%d", + "mydomain.com/solutions.php", "mydomain.com/solutions.php?solutionid=%d", "mydomain.com/support.php", + "mydomain.com/about.php", "mydomain.com/contactus.php", "mydomain.com/services.php", + "mydomain.com/services.php?serviceid=%d", "mydomain.com/partners.php", "mydomain.com/partners.php?partnerid=%d" 
+ }; + + // browser id + private static String[] browserIds = { + "Mozilla/5.0 (X11; Ubuntu; Linux i686; rv:20.0) Gecko/%d Firefox/20.0", + "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:18.0) Gecko/%d Firefox/18.0", + "Mozilla/5.0 (X11; U; Linux i686; en-US; rv:1.7.8) Gecko/%d Fedora/1.0.4-4 Firefox/1.0.", + "Mozilla/5.0 (X11; U; Linux i686; en-US; rv:1.8.0.10) Gecko/%d CentOS/1.5.0.10-0.1.el4.centos Firefox/1.5.0.10" + }; + + // generate server name and IP address for server + private int genServerId() + { + return rand.nextInt(10); + } + + private String genServerName(int serverId) + { + return new StringBuilder("server").append(new Integer(serverId).toString()).append(".mydomain.com:80").toString(); + } + + private String genIpAddress(int serverId) + { + return new StringBuilder().append(rand.nextInt(255)) + .append(".").append(rand.nextInt(255)).append(".").append(rand.nextInt(255)) + .append(".").append(rand.nextInt(255)).toString(); + } + + private String getTimeStamp() + { + return new StringBuilder("[").append(apapcheDateFormat.format(new Date())).append("]").toString(); + } + + private String genHttpCode() + { + return httpStatusCodes[rand.nextInt(httpStatusCodes.length)]; + } + + private String genUrl() + { + String format = urlFormats[rand.nextInt(urlFormats.length)]; + return String.format(format, rand.nextInt(100)); + } + + private String genBrowserId() + { + String format = browserIds[rand.nextInt(browserIds.length)]; + return String.format(format, rand.nextInt(100000)); + } + + // generate log string + private String genLogString(String ipAddress, String browserId, String httpCode, String url) + { + // server/ipaddress + int serverId = genServerId(); + String serverName = genServerName(serverId); + if (ipAddress == null) { + ipAddress = genIpAddress(serverId); + } + + // time + String logTime = getTimeStamp(); + + // url + if (url == null) { + url = new StringBuilder("\"").append("GET").append(" ").append(genUrl()).append(" ").append("HTTP/1.1") + 
.append("\"").toString(); + } + + // http code + if (httpCode == null) { + httpCode = genHttpCode(); + } + + // number of bytes + int numBytes = rand.nextInt(4000); + + // browser id + if (browserId == null) { + browserId = genBrowserId(); + } + + // print + return new StringBuilder().append(serverName).append(" ").append(ipAddress).append(" - - ").append(logTime) + .append(" ").append(url).append(" ").append(httpCode).append(" ").append(numBytes).append(" \" \" \"") + .append(browserId).append("\"").toString(); + } + + @Override + public void beginWindow(long windowId) + { + // TODO Auto-generated method stub + + } + + @Override + public void endWindow() + { + // TODO Auto-generated method stub + + } + + boolean genTuples; + int attackInterval; + + @Override + public void setup(OperatorContext context) + { + genTuples = true; + attackInterval = rand.nextInt(10) + 1; + } + + @Override + public void teardown() + { + genTuples = false; + } + + @Override + public void emitTuples() + { + attackInterval--; + String browserId = null; + String ipAdddress = null; + if (attackInterval == 0) { + browserId = genBrowserId(); + ipAdddress = genIpAddress(rand.nextInt(10)); + attackInterval += rand.nextInt(10) + 1; + for (int i = 0; i < rand.nextInt(3); i++) { + outport.emit(genLogString(ipAdddress, browserId, "404", null)); + } + String url = new StringBuilder("\"").append("GET").append(" ").append(genUrl()).append(" ").append("HTTP/1.1") + .append("\"").toString(); + for (int i = 0; i < rand.nextInt(3); i++) { + outport.emit(genLogString(ipAdddress, browserId, "404", url)); + } + } + for (int i = 0; i < rand.nextInt(100000); i++) { + outport.emit(genLogString(ipAdddress, browserId, null, null)); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/io/CollectionMultiConsoleOutputOperator.java ---------------------------------------------------------------------- diff --git 
a/library/src/main/java/com/datatorrent/lib/io/CollectionMultiConsoleOutputOperator.java b/library/src/main/java/com/datatorrent/lib/io/CollectionMultiConsoleOutputOperator.java index 1c142a4..36b042f 100644 --- a/library/src/main/java/com/datatorrent/lib/io/CollectionMultiConsoleOutputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/io/CollectionMultiConsoleOutputOperator.java @@ -23,8 +23,8 @@ import java.util.Collection; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.datatorrent.common.util.BaseOperator; import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.common.util.BaseOperator; /** * This output operator receives collections as tuples. @@ -62,7 +62,8 @@ public class CollectionMultiConsoleOutputOperator<E> extends BaseOperator /** * This input port which receives collection tuples. */ - public final transient DefaultInputPort<Collection<E>> input = new DefaultInputPort<Collection<E>>() { + public final transient DefaultInputPort<Collection<E>> input = new DefaultInputPort<Collection<E>>() + { @Override public void process(Collection<E> t) { @@ -73,8 +74,9 @@ public class CollectionMultiConsoleOutputOperator<E> extends BaseOperator if (!silent) { System.out.println(obj.toString()); } - if (debug) + if (debug) { logger.info(obj.toString()); + } } System.out.println("}"); } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/io/ConsoleOutputOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/ConsoleOutputOperator.java b/library/src/main/java/com/datatorrent/lib/io/ConsoleOutputOperator.java index 5e72d3d..64046b2 100644 --- a/library/src/main/java/com/datatorrent/lib/io/ConsoleOutputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/io/ConsoleOutputOperator.java @@ -18,13 +18,13 @@ */ package com.datatorrent.lib.io; -import 
com.datatorrent.common.util.BaseOperator; -import com.datatorrent.api.DefaultInputPort; -import com.datatorrent.api.annotation.Stateless; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.annotation.Stateless; +import com.datatorrent.common.util.BaseOperator; + /** * Writes tuples to stdout of the container. * <p> @@ -55,8 +55,7 @@ public class ConsoleOutputOperator extends BaseOperator String s; if (stringFormat == null) { s = t.toString(); - } - else { + } else { s = String.format(stringFormat, t); } if (!silent) { http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/io/HttpJsonChunksInputOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/HttpJsonChunksInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/HttpJsonChunksInputOperator.java index ea1fc8d..ad6f7a6 100644 --- a/library/src/main/java/com/datatorrent/lib/io/HttpJsonChunksInputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/io/HttpJsonChunksInputOperator.java @@ -18,7 +18,6 @@ */ package com.datatorrent.lib.io; -import com.sun.jersey.api.client.ClientResponse; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -27,12 +26,16 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; -import org.apache.commons.io.IOUtils; + import org.codehaus.jettison.json.JSONException; import org.codehaus.jettison.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.commons.io.IOUtils; + +import com.sun.jersey.api.client.ClientResponse; + /** * This operator reads in JSON data and outputs it as a map. 
* <p> @@ -75,8 +78,7 @@ public class HttpJsonChunksInputOperator extends AbstractHttpInputOperator<Map<S response.close(); break; } - } - catch (JSONException ex) { + } catch (JSONException ex) { LOG.error("Caught JSON error:", ex); } if (bytesRead == -1) { @@ -108,8 +110,7 @@ public class HttpJsonChunksInputOperator extends AbstractHttpInputOperator<Map<S currentChunkLength = nextLength; //LOG.debug("chunk length: " + line); - } - catch (NumberFormatException e) { + } catch (NumberFormatException e) { // add to chunk chunkStr.append(line); chunkStr.append("\n"); http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/io/HttpLinesInputOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/HttpLinesInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/HttpLinesInputOperator.java index a483df1..c355b16 100644 --- a/library/src/main/java/com/datatorrent/lib/io/HttpLinesInputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/io/HttpLinesInputOperator.java @@ -18,12 +18,13 @@ */ package com.datatorrent.lib.io; -import com.sun.jersey.api.client.ClientResponse; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; +import com.sun.jersey.api.client.ClientResponse; + /** * Incoming data is interpreted as lines of plain text and each tuple output is a line in the content. 
* <p></p> @@ -47,8 +48,7 @@ public class HttpLinesInputOperator extends AbstractHttpInputOperator<String> rawOutput.emit(line); outputPort.emit(line); } - } - finally { + } finally { br.close(); } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/io/HttpPostOutputOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/HttpPostOutputOperator.java b/library/src/main/java/com/datatorrent/lib/io/HttpPostOutputOperator.java index a69761c..57a4d91 100644 --- a/library/src/main/java/com/datatorrent/lib/io/HttpPostOutputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/io/HttpPostOutputOperator.java @@ -22,10 +22,10 @@ import java.util.Map; import javax.ws.rs.core.MediaType; -import com.sun.jersey.api.client.WebResource; - import org.codehaus.jettison.json.JSONObject; +import com.sun.jersey.api.client.WebResource; + import com.datatorrent.api.Context.OperatorContext; /** @@ -51,8 +51,7 @@ public class HttpPostOutputOperator<T> extends AbstractHttpOperator<T> { if (t instanceof Map) { resource.type(MediaType.APPLICATION_JSON).post(new JSONObject((Map<?, ?>)t).toString()); - } - else { + } else { resource.post(t.toString()); } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/io/IdempotentStorageManager.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/IdempotentStorageManager.java b/library/src/main/java/com/datatorrent/lib/io/IdempotentStorageManager.java index 65bda89..11b68a7 100644 --- a/library/src/main/java/com/datatorrent/lib/io/IdempotentStorageManager.java +++ b/library/src/main/java/com/datatorrent/lib/io/IdempotentStorageManager.java @@ -162,7 +162,7 @@ public interface IdempotentStorageManager extends StorageAgent, Component<Contex for (FileStatus 
status : fs.listStatus(operatorDirStatus.getPath())) { String fileName = status.getPath().getName(); - if(fileName.endsWith(FSStorageAgent.TMP_FILE)) { + if (fileName.endsWith(FSStorageAgent.TMP_FILE)) { continue; } long windowId = Long.parseLong(fileName, 16); @@ -173,8 +173,7 @@ public interface IdempotentStorageManager extends StorageAgent, Component<Contex } } } - } - catch (IOException e) { + } catch (IOException e) { throw new RuntimeException(e); } } @@ -255,8 +254,7 @@ public interface IdempotentStorageManager extends StorageAgent, Component<Contex deletedOperators.remove(loperator); fs.delete(loperatorPath, true); } - } - else if (loperator == operatorId) { + } else if (loperator == operatorId) { storageAgent.delete(loperator, lwindow); } } @@ -296,7 +294,7 @@ public interface IdempotentStorageManager extends StorageAgent, Component<Contex for (IdempotentStorageManager storageManager : newManagers) { - FSIdempotentStorageManager lmanager = (FSIdempotentStorageManager) storageManager; + FSIdempotentStorageManager lmanager = (FSIdempotentStorageManager)storageManager; lmanager.recoveryPath = this.recoveryPath; lmanager.storageAgent = this.storageAgent; @@ -318,7 +316,7 @@ public interface IdempotentStorageManager extends StorageAgent, Component<Contex //If some operators were removed then there needs to be a manager which can clean there state when it is not needed. if (deletedOperatorsManager == null) { //None of the managers were handling deleted operators data. 
- deletedOperatorsManager = (FSIdempotentStorageManager) newManagers.iterator().next(); + deletedOperatorsManager = (FSIdempotentStorageManager)newManagers.iterator().next(); deletedOperatorsManager.deletedOperators = Sets.newHashSet(); } @@ -331,8 +329,7 @@ public interface IdempotentStorageManager extends StorageAgent, Component<Contex { try { fs.close(); - } - catch (IOException e) { + } catch (IOException e) { throw new RuntimeException(e); } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/io/MapMultiConsoleOutputOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/MapMultiConsoleOutputOperator.java b/library/src/main/java/com/datatorrent/lib/io/MapMultiConsoleOutputOperator.java index c461163..1e31552 100644 --- a/library/src/main/java/com/datatorrent/lib/io/MapMultiConsoleOutputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/io/MapMultiConsoleOutputOperator.java @@ -23,8 +23,8 @@ import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.datatorrent.common.util.BaseOperator; import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.common.util.BaseOperator; /** * This operator writes tuples which are maps to standard out of the container. 
@@ -57,7 +57,8 @@ public class MapMultiConsoleOutputOperator<K, V> extends BaseOperator } private static final Logger logger = LoggerFactory.getLogger(MapMultiConsoleOutputOperator.class); - public final transient DefaultInputPort<Map<K, V>> input = new DefaultInputPort<Map<K, V>>() { + public final transient DefaultInputPort<Map<K, V>> input = new DefaultInputPort<Map<K, V>>() + { @Override public void process(Map<K, V> t) { @@ -66,8 +67,9 @@ public class MapMultiConsoleOutputOperator<K, V> extends BaseOperator if (!silent) { System.out.println(entry.getKey().toString() + "=" + entry.getValue().toString()); } - if (debug) + if (debug) { logger.info(entry.getKey().toString() + "=" + entry.getValue().toString()); + } } System.out.println("}"); } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQuery.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQuery.java b/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQuery.java index 3e8d8d3..7cf883f 100644 --- a/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQuery.java +++ b/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQuery.java @@ -161,23 +161,18 @@ public class PubSubWebSocketAppDataQuery extends PubSubWebSocketInputOperator<St JSONArray ja = jo.names(); //Make sure that only the correct keys are in the first level of JSON - for(int keyIndex = 0; - keyIndex < ja.length(); - keyIndex++) { + for (int keyIndex = 0; keyIndex < ja.length(); keyIndex++) { String key = ja.getString(keyIndex); - if(!(PubSubMessage.DATA_KEY.equals(key) || - PubSubMessage.TOPIC_KEY.equals(key) || - PubSubMessage.TYPE_KEY.equals(key))) { - logger.error("{} is not a valid key in the first level of the following pubsub message:\n{}", - key, - message); + if (!(PubSubMessage.DATA_KEY.equals(key) 
|| + PubSubMessage.TOPIC_KEY.equals(key) || + PubSubMessage.TYPE_KEY.equals(key))) { + logger.error("{} is not a valid key in the first level of the following pubsub message:\n{}", key, message); return null; } } data = jo.getString(PubSubMessage.DATA_KEY); - } - catch(JSONException e) { + } catch (JSONException e) { return null; } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataResult.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataResult.java b/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataResult.java index 4d25f49..e1f3fa1 100644 --- a/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataResult.java +++ b/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataResult.java @@ -19,6 +19,7 @@ package com.datatorrent.lib.io; import java.io.IOException; +import java.net.URI; import org.codehaus.jettison.json.JSONException; import org.codehaus.jettison.json.JSONObject; @@ -26,10 +27,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.datatorrent.api.Context.OperatorContext; - import com.datatorrent.common.experimental.AppData; import com.datatorrent.common.util.PubSubMessage.PubSubMessageType; -import java.net.URI; /** * This is an app data pub sub result operator. 
This operator is used to send results to @@ -40,8 +39,9 @@ import java.net.URI; * @tags output, app data, result * @since 3.0.0 */ -@AppData.AppendQueryIdToTopic(value=true) -public class PubSubWebSocketAppDataResult extends PubSubWebSocketOutputOperator<String> implements AppData.ConnectionInfoProvider +@AppData.AppendQueryIdToTopic(value = true) +public class PubSubWebSocketAppDataResult extends PubSubWebSocketOutputOperator<String> + implements AppData.ConnectionInfoProvider { private static final Logger logger = LoggerFactory.getLogger(PubSubWebSocketAppDataResult.class); @@ -93,8 +93,7 @@ public class PubSubWebSocketAppDataResult extends PubSubWebSocketOutputOperator< try { jo = new JSONObject(t); - } - catch(JSONException ex) { + } catch (JSONException ex) { throw new RuntimeException(ex); } @@ -102,8 +101,7 @@ public class PubSubWebSocketAppDataResult extends PubSubWebSocketOutputOperator< try { id = jo.getString("id"); - } - catch(JSONException ex) { + } catch (JSONException ex) { throw new RuntimeException(ex); } @@ -115,8 +113,7 @@ public class PubSubWebSocketAppDataResult extends PubSubWebSocketOutputOperator< output.put("topic", topic); output.put("data", jo); output.put("type", PubSubMessageType.PUBLISH.getIdentifier()); - } - catch(JSONException ex) { + } catch (JSONException ex) { throw new RuntimeException(ex); } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketInputOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketInputOperator.java index 0b56924..4d5fa9a 100644 --- a/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketInputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketInputOperator.java @@ -82,8 +82,7 @@ public class PubSubWebSocketInputOperator<T> extends 
WebSocketInputOperator<T> super.run(); try { connection.sendMessage(PubSubMessageCodec.constructSubscribeMessage(topic, codec)); - } - catch (IOException ex) { + } catch (IOException ex) { LOG.error("Exception caught", ex); } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/io/SimpleSinglePortInputOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/SimpleSinglePortInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/SimpleSinglePortInputOperator.java index 5bf31ff..12737ba 100644 --- a/library/src/main/java/com/datatorrent/lib/io/SimpleSinglePortInputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/io/SimpleSinglePortInputOperator.java @@ -21,10 +21,13 @@ package com.datatorrent.lib.io; import java.util.Iterator; import java.util.concurrent.ArrayBlockingQueue; -import com.datatorrent.api.*; +import org.apache.commons.lang3.ClassUtils; + import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; +import com.datatorrent.api.Operator; import com.datatorrent.common.util.BaseOperator; -import org.apache.commons.lang3.ClassUtils; /** * This an input operator which passes data from an asynchronous data source to a port processing thread. @@ -48,7 +51,7 @@ public abstract class SimpleSinglePortInputOperator<T> extends BaseOperator impl * The single output port of this input operator. * Collects asynchronously emitted tuples and flushes in container thread. 
*/ - final public transient BufferingOutputPort<T> outputPort; + public final transient BufferingOutputPort<T> outputPort; public SimpleSinglePortInputOperator(int portCapacity) { @@ -61,7 +64,7 @@ public abstract class SimpleSinglePortInputOperator<T> extends BaseOperator impl } @Override - final public void activate(OperatorContext ctx) + public final void activate(OperatorContext ctx) { isActive = true; if (this instanceof Runnable) { @@ -71,7 +74,7 @@ public abstract class SimpleSinglePortInputOperator<T> extends BaseOperator impl } @Override - final public void deactivate() + public final void deactivate() { isActive = false; if (ioThread != null) { @@ -80,7 +83,7 @@ public abstract class SimpleSinglePortInputOperator<T> extends BaseOperator impl } } - final public boolean isActive() + public final boolean isActive() { return isActive; } @@ -115,8 +118,7 @@ public abstract class SimpleSinglePortInputOperator<T> extends BaseOperator impl { try { tuples.put(tuple); - } - catch (InterruptedException ex) { + } catch (InterruptedException ex) { throw new RuntimeException(ex); } } @@ -130,6 +132,6 @@ public abstract class SimpleSinglePortInputOperator<T> extends BaseOperator impl } } - }; + } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/io/SmtpOutputOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/SmtpOutputOperator.java b/library/src/main/java/com/datatorrent/lib/io/SmtpOutputOperator.java index 6477021..2d6ef86 100644 --- a/library/src/main/java/com/datatorrent/lib/io/SmtpOutputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/io/SmtpOutputOperator.java @@ -18,24 +18,32 @@ */ package com.datatorrent.lib.io; -import com.datatorrent.common.util.BaseOperator; -import com.datatorrent.api.DefaultInputPort; -import com.datatorrent.api.Context.OperatorContext; - -import java.util.*; - -import 
javax.mail.*; +import java.util.Arrays; +import java.util.Map; +import java.util.Properties; + +import javax.mail.Authenticator; +import javax.mail.Message; +import javax.mail.MessagingException; +import javax.mail.PasswordAuthentication; +import javax.mail.Session; +import javax.mail.Transport; import javax.mail.internet.InternetAddress; import javax.mail.internet.MimeMessage; import javax.validation.constraints.AssertTrue; import javax.validation.constraints.NotNull; -import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.commons.lang.StringUtils; + import com.google.common.collect.Maps; +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.common.util.BaseOperator; + /** * This operator outputs data to an smtp server. * <p></p> @@ -90,8 +98,7 @@ public class SmtpOutputOperator extends BaseOperator message.setContent(mailContent, contentType); LOG.info("Sending email for tuple {}", t.toString()); Transport.send(message); - } - catch (MessagingException ex) { + } catch (MessagingException ex) { LOG.error("Something wrong with sending email.", ex); } } @@ -264,8 +271,7 @@ public class SmtpOutputOperator extends BaseOperator } message.setSubject(subject); LOG.debug("all recipients {}", Arrays.toString(message.getAllRecipients())); - } - catch (MessagingException ex) { + } catch (MessagingException ex) { throw new RuntimeException(ex); } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java index f805dcf..eae9e12 100644 --- a/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java +++ 
b/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java @@ -29,13 +29,12 @@ import org.codehaus.jackson.map.ObjectMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.commons.lang3.ClassUtils; - import org.apache.apex.shaded.ning19.com.ning.http.client.AsyncHttpClient; import org.apache.apex.shaded.ning19.com.ning.http.client.AsyncHttpClientConfigBean; import org.apache.apex.shaded.ning19.com.ning.http.client.ws.WebSocket; import org.apache.apex.shaded.ning19.com.ning.http.client.ws.WebSocketTextListener; import org.apache.apex.shaded.ning19.com.ning.http.client.ws.WebSocketUpgradeHandler; +import org.apache.commons.lang3.ClassUtils; import com.datatorrent.api.Context.OperatorContext; @@ -60,8 +59,8 @@ public class WebSocketInputOperator<T> extends SimpleSinglePortInputOperator<T> //Do not make this @NotNull since null is a valid value for some child classes private URI uri; private transient AsyncHttpClient client; - private transient final JsonFactory jsonFactory = new JsonFactory(); - protected transient final ObjectMapper mapper = new ObjectMapper(jsonFactory); + private final transient JsonFactory jsonFactory = new JsonFactory(); + protected final transient ObjectMapper mapper = new ObjectMapper(jsonFactory); protected transient WebSocket connection; private transient boolean connectionClosed = false; private transient volatile boolean shutdown = false; @@ -121,8 +120,7 @@ public class WebSocketInputOperator<T> extends SimpleSinglePortInputOperator<T> shutdown = false; monThread = new MonitorThread(); monThread.start(); - } - catch (Exception ex) { + } catch (Exception ex) { throw new RuntimeException(ex); } } @@ -135,8 +133,7 @@ public class WebSocketInputOperator<T> extends SimpleSinglePortInputOperator<T> if (monThread != null) { monThread.join(); } - } - catch (Exception ex) { + } catch (Exception ex) { LOG.error("Error joining monitor", ex); } @@ -171,8 +168,8 @@ public class WebSocketInputOperator<T> 
extends SimpleSinglePortInputOperator<T> connection.close(); WebSocketInputOperator.this.activate(null); } - } - catch (Exception ex) { + } catch (Exception ex) { + //swallowing exception } } } @@ -213,11 +210,10 @@ public class WebSocketInputOperator<T> extends SimpleSinglePortInputOperator<T> LOG.debug("Got: " + string); try { T o = convertMessage(string); - if(!(skipNull && o == null)) { + if (!(skipNull && o == null)) { outputPort.emit(o); } - } - catch (IOException ex) { + } catch (IOException ex) { LOG.error("Got exception: ", ex); } } @@ -242,8 +238,7 @@ public class WebSocketInputOperator<T> extends SimpleSinglePortInputOperator<T> } }).build()).get(5, TimeUnit.SECONDS); - } - catch (Exception ex) { + } catch (Exception ex) { LOG.error("Error reading from " + uri, ex); if (client != null) { client.close(); http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/io/WebSocketOutputOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/WebSocketOutputOperator.java b/library/src/main/java/com/datatorrent/lib/io/WebSocketOutputOperator.java index b793183..382be67 100644 --- a/library/src/main/java/com/datatorrent/lib/io/WebSocketOutputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/io/WebSocketOutputOperator.java @@ -38,9 +38,9 @@ import org.apache.apex.shaded.ning19.com.ning.http.client.ws.WebSocketTextListen import org.apache.apex.shaded.ning19.com.ning.http.client.ws.WebSocketUpgradeHandler; import org.apache.commons.lang3.ClassUtils; -import com.datatorrent.common.util.BaseOperator; import com.datatorrent.api.Context.OperatorContext; import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.common.util.BaseOperator; /** * Reads via WebSocket from given URL as input stream. Incoming data is interpreted as JSONObject and converted to {@link java.util.Map}. 
@@ -59,8 +59,8 @@ public class WebSocketOutputOperator<T> extends BaseOperator //Do not make this @NotNull since null is a valid value for some child classes private URI uri; private transient AsyncHttpClient client; - private transient final JsonFactory jsonFactory = new JsonFactory(); - protected transient final ObjectMapper mapper = new ObjectMapper(jsonFactory); + private final transient JsonFactory jsonFactory = new JsonFactory(); + protected final transient ObjectMapper mapper = new ObjectMapper(jsonFactory); protected transient WebSocket connection; private int ioThreadMultiplier = 1; private int numRetries = 3; http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/io/WebSocketServerInputOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/WebSocketServerInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/WebSocketServerInputOperator.java index bf1b40e..2814b49 100644 --- a/library/src/main/java/com/datatorrent/lib/io/WebSocketServerInputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/io/WebSocketServerInputOperator.java @@ -18,18 +18,20 @@ */ package com.datatorrent.lib.io; -import com.datatorrent.api.Context.OperatorContext; -import com.datatorrent.api.InputOperator; -import com.datatorrent.netlet.util.DTThrowable; import javax.servlet.http.HttpServletRequest; import javax.validation.constraints.Min; import javax.validation.constraints.NotNull; + import org.eclipse.jetty.server.Server; import org.eclipse.jetty.servlet.ServletContextHandler; import org.eclipse.jetty.servlet.ServletHolder; import org.eclipse.jetty.websocket.WebSocket; import org.eclipse.jetty.websocket.WebSocketServlet; +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.InputOperator; +import com.datatorrent.netlet.util.DTThrowable; + 
@org.apache.hadoop.classification.InterfaceStability.Evolving /** * @since 3.3.0 @@ -75,8 +77,7 @@ public abstract class WebSocketServerInputOperator implements InputOperator try { server.start(); - } - catch(Exception ex) { + } catch (Exception ex) { DTThrowable.rethrow(ex); } } @@ -86,8 +87,7 @@ public abstract class WebSocketServerInputOperator implements InputOperator { try { server.stop(); - } - catch(Exception ex) { + } catch (Exception ex) { DTThrowable.rethrow(ex); } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/io/WidgetOutputOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/WidgetOutputOperator.java b/library/src/main/java/com/datatorrent/lib/io/WidgetOutputOperator.java index 5b6259c..b027b58 100644 --- a/library/src/main/java/com/datatorrent/lib/io/WidgetOutputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/io/WidgetOutputOperator.java @@ -25,17 +25,16 @@ import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; -import com.google.common.collect.Maps; - import org.apache.commons.lang.StringUtils; import org.apache.commons.lang3.tuple.MutablePair; import org.apache.commons.lang3.tuple.Pair; +import com.google.common.collect.Maps; + import com.datatorrent.api.Context.OperatorContext; import com.datatorrent.api.DAG; import com.datatorrent.api.DefaultInputPort; import com.datatorrent.api.annotation.InputPortFieldAnnotation; - import com.datatorrent.common.util.BaseOperator; import com.datatorrent.common.util.PubSubMessageCodec; @@ -60,16 +59,15 @@ import com.datatorrent.common.util.PubSubMessageCodec; @org.apache.hadoop.classification.InterfaceStability.Evolving public class WidgetOutputOperator extends BaseOperator { - protected transient WebSocketOutputOperator<Pair<String, Object>> wsoo = new WebSocketOutputOperator<Pair<String,Object>>(){ - + protected 
transient WebSocketOutputOperator<Pair<String, Object>> wsoo = new WebSocketOutputOperator<Pair<String, Object>>() + { private transient PubSubMessageCodec<Object> codec = new PubSubMessageCodec<>(mapper); @Override - public String convertMapToMessage(Pair<String,Object> t) throws IOException + public String convertMapToMessage(Pair<String, Object> t) throws IOException { return PubSubMessageCodec.constructPublishMessage(t.getLeft(), t.getRight(), codec); } - }; protected transient ConsoleOutputOperator coo = new ConsoleOutputOperator(); @@ -99,31 +97,31 @@ public class WidgetOutputOperator extends BaseOperator /** * Tuples received on this input port will be sent to a Simple Widget for display. */ - @InputPortFieldAnnotation(optional=true) + @InputPortFieldAnnotation(optional = true) public final transient SimpleInputPort simpleInput = new SimpleInputPort(this); /** * Tuples received on this input port will be sent to a Time Series Widget for display. */ - @InputPortFieldAnnotation(optional=true) + @InputPortFieldAnnotation(optional = true) public final transient TimeseriesInputPort timeSeriesInput = new TimeseriesInputPort(this); /** * Tuples received on this input port will be sent to a Percentage Widget. */ - @InputPortFieldAnnotation(optional=true) + @InputPortFieldAnnotation(optional = true) public final transient PercentageInputPort percentageInput = new PercentageInputPort(this); /** * Tuples received on this input port will be sent to a Top N Widget for display. */ - @InputPortFieldAnnotation(optional=true) + @InputPortFieldAnnotation(optional = true) public final transient TopNInputPort topNInput = new TopNInputPort(this); /** * Tuples received on this input port will be sent to a Pie Chart Widget for display. 
*/ - @InputPortFieldAnnotation(optional=true) + @InputPortFieldAnnotation(optional = true) public final transient PiechartInputPort pieChartInput = new PiechartInputPort(this); protected transient boolean isWebSocketConnected = true; @@ -132,7 +130,7 @@ public class WidgetOutputOperator extends BaseOperator public void setup(OperatorContext context) { String gatewayAddress = context.getValue(DAG.GATEWAY_CONNECT_ADDRESS); - if(!StringUtils.isEmpty(gatewayAddress)){ + if (!StringUtils.isEmpty(gatewayAddress)) { wsoo.setUri(URI.create("ws://" + gatewayAddress + "/pubsub")); wsoo.setup(context); } else { @@ -205,8 +203,8 @@ public class WidgetOutputOperator extends BaseOperator } - public static class TopNInputPort extends DefaultInputPort<HashMap<String, Number>>{ - + public static class TopNInputPort extends DefaultInputPort<HashMap<String, Number>> + { private final WidgetOutputOperator operator; public TopNInputPort(WidgetOutputOperator oper) http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java index b5b20e4..bf1605d 100644 --- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java @@ -96,8 +96,8 @@ import com.datatorrent.lib.util.KryoCloneUtils; * @param <T> The type of the object that this input operator reads. 
* @since 1.0.2 */ -public abstract class AbstractFileInputOperator<T> implements InputOperator, Partitioner<AbstractFileInputOperator<T>>, StatsListener, - Operator.CheckpointListener +public abstract class AbstractFileInputOperator<T> + implements InputOperator, Partitioner<AbstractFileInputOperator<T>>, StatsListener, Operator.CheckpointListener { private static final Logger LOG = LoggerFactory.getLogger(AbstractFileInputOperator.class); @@ -110,11 +110,11 @@ public abstract class AbstractFileInputOperator<T> implements InputOperator, Par protected String currentFile; protected Set<String> processedFiles = new HashSet<String>(); protected int emitBatchSize = 1000; - protected int currentPartitions = 1 ; + protected int currentPartitions = 1; protected int partitionCount = 1; private int retryCount = 0; private int maxRetryCount = 5; - transient protected int skipCount = 0; + protected transient int skipCount = 0; private transient OperatorContext context; private final BasicCounters<MutableLong> fileCounters = new BasicCounters<MutableLong>(MutableLong.class); @@ -140,7 +140,8 @@ public abstract class AbstractFileInputOperator<T> implements InputOperator, Par * failed file is retried for maxRetryCount number of times, after that the file is * ignored. 
*/ - protected static class FailedFile { + protected static class FailedFile + { String path; int offset; int retryCount; @@ -150,13 +151,15 @@ public abstract class AbstractFileInputOperator<T> implements InputOperator, Par @SuppressWarnings("unused") protected FailedFile() {} - protected FailedFile(String path, int offset) { + protected FailedFile(String path, int offset) + { this.path = path; this.offset = offset; this.retryCount = 0; } - protected FailedFile(String path, int offset, int retryCount) { + protected FailedFile(String path, int offset, int retryCount) + { this.path = path; this.offset = offset; this.retryCount = retryCount; @@ -266,7 +269,7 @@ public abstract class AbstractFileInputOperator<T> implements InputOperator, Par * <p/> * @since 1.0.4 */ - public final static class FileCountersAggregator implements CountersAggregator, Serializable + public static final class FileCountersAggregator implements CountersAggregator, Serializable { private static final long serialVersionUID = 201409041428L; MutableLong totalLocalProcessedFiles = new MutableLong(); @@ -278,11 +281,11 @@ public abstract class AbstractFileInputOperator<T> implements InputOperator, Par @SuppressWarnings("unchecked") public Object aggregate(Collection<?> countersList) { - if(countersList.isEmpty()) { + if (countersList.isEmpty()) { return null; } - BasicCounters<MutableLong> tempFileCounters = (BasicCounters<MutableLong>) countersList.iterator().next(); + BasicCounters<MutableLong> tempFileCounters = (BasicCounters<MutableLong>)countersList.iterator().next(); MutableLong globalProcessedFiles = tempFileCounters.getCounter(FileCounters.GLOBAL_PROCESSED_FILES); MutableLong globalNumberOfFailures = tempFileCounters.getCounter(FileCounters.GLOBAL_NUMBER_OF_FAILURES); MutableLong globalNumberOfRetries = tempFileCounters.getCounter(FileCounters.GLOBAL_NUMBER_OF_RETRIES); @@ -291,8 +294,8 @@ public abstract class AbstractFileInputOperator<T> implements InputOperator, Par 
totalLocalNumberOfFailures.setValue(0); totalLocalNumberOfRetries.setValue(0); - for(Object fileCounters: countersList) { - BasicCounters<MutableLong> basicFileCounters = (BasicCounters<MutableLong>) fileCounters; + for (Object fileCounters : countersList) { + BasicCounters<MutableLong> basicFileCounters = (BasicCounters<MutableLong>)fileCounters; totalLocalProcessedFiles.add(basicFileCounters.getCounter(FileCounters.LOCAL_PROCESSED_FILES)); pendingFiles.add(basicFileCounters.getCounter(FileCounters.PENDING_FILES)); totalLocalNumberOfFailures.add(basicFileCounters.getCounter(FileCounters.LOCAL_NUMBER_OF_FAILURES)); @@ -441,25 +444,17 @@ public abstract class AbstractFileInputOperator<T> implements InputOperator, Par filePath = new Path(directory); configuration = new Configuration(); fs = getFSInstance(); - } - catch (IOException ex) { + } catch (IOException ex) { failureHandling(ex); } - fileCounters.setCounter(FileCounters.GLOBAL_PROCESSED_FILES, - globalProcessedFileCount); - fileCounters.setCounter(FileCounters.LOCAL_PROCESSED_FILES, - localProcessedFileCount); - fileCounters.setCounter(FileCounters.GLOBAL_NUMBER_OF_FAILURES, - globalNumberOfFailures); - fileCounters.setCounter(FileCounters.LOCAL_NUMBER_OF_FAILURES, - localNumberOfFailures); - fileCounters.setCounter(FileCounters.GLOBAL_NUMBER_OF_RETRIES, - globalNumberOfRetries); - fileCounters.setCounter(FileCounters.LOCAL_NUMBER_OF_RETRIES, - localNumberOfRetries); - fileCounters.setCounter(FileCounters.PENDING_FILES, - pendingFileCount); + fileCounters.setCounter(FileCounters.GLOBAL_PROCESSED_FILES, globalProcessedFileCount); + fileCounters.setCounter(FileCounters.LOCAL_PROCESSED_FILES, localProcessedFileCount); + fileCounters.setCounter(FileCounters.GLOBAL_NUMBER_OF_FAILURES, globalNumberOfFailures); + fileCounters.setCounter(FileCounters.LOCAL_NUMBER_OF_FAILURES, localNumberOfFailures); + fileCounters.setCounter(FileCounters.GLOBAL_NUMBER_OF_RETRIES, globalNumberOfRetries); + 
fileCounters.setCounter(FileCounters.LOCAL_NUMBER_OF_RETRIES, localNumberOfRetries); + fileCounters.setCounter(FileCounters.PENDING_FILES, pendingFileCount); idempotentStorageManager.setup(context); if (context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) < idempotentStorageManager.getLargestRecoveryWindow()) { @@ -487,11 +482,10 @@ public abstract class AbstractFileInputOperator<T> implements InputOperator, Par boolean fileFailed = false; try { - if(inputStream != null) { + if (inputStream != null) { inputStream.close(); } - } - catch (IOException ex) { + } catch (IOException ex) { savedException = ex; fileFailed = true; } @@ -500,20 +494,19 @@ public abstract class AbstractFileInputOperator<T> implements InputOperator, Par try { fs.close(); - } - catch (IOException ex) { + } catch (IOException ex) { savedException = ex; fsFailed = true; } - if(savedException != null) { + if (savedException != null) { String errorMessage = ""; - if(fileFailed) { + if (fileFailed) { errorMessage += "Failed to close " + currentFile + ". 
"; } - if(fsFailed) { + if (fsFailed) { errorMessage += "Failed to close filesystem."; } @@ -537,18 +530,15 @@ public abstract class AbstractFileInputOperator<T> implements InputOperator, Par if (currentWindowId > idempotentStorageManager.getLargestRecoveryWindow()) { try { idempotentStorageManager.save(currentWindowRecoveryState, operatorId, currentWindowId); - } - catch (IOException e) { + } catch (IOException e) { throw new RuntimeException("saving recovery", e); } } currentWindowRecoveryState.clear(); - if(context != null) { - pendingFileCount.setValue(pendingFiles.size() + - failedFiles.size() + - unfinishedFiles.size()); + if (context != null) { + pendingFileCount.setValue(pendingFiles.size() + failedFiles.size() + unfinishedFiles.size()); - if(currentFile != null) { + if (currentFile != null) { pendingFileCount.increment(); } @@ -567,7 +557,7 @@ public abstract class AbstractFileInputOperator<T> implements InputOperator, Par for (Object recovery : recoveryDataPerOperator.values()) { @SuppressWarnings("unchecked") - LinkedList<RecoveryEntry> recoveryData = (LinkedList<RecoveryEntry>) recovery; + LinkedList<RecoveryEntry> recoveryData = (LinkedList<RecoveryEntry>)recovery; for (RecoveryEntry recoveryEntry : recoveryData) { if (scanner.acceptFile(recoveryEntry.file)) { @@ -606,8 +596,7 @@ public abstract class AbstractFileInputOperator<T> implements InputOperator, Par offset++; emit(line); } - } - else { + } else { while (offset < recoveryEntry.endOffset) { T line = readEntity(); offset++; @@ -617,8 +606,7 @@ public abstract class AbstractFileInputOperator<T> implements InputOperator, Par } } } - } - catch (IOException e) { + } catch (IOException e) { throw new RuntimeException("replay", e); } } @@ -645,24 +633,20 @@ public abstract class AbstractFileInputOperator<T> implements InputOperator, Par offset = 0; skipCount = 0; } - } - else if (!unfinishedFiles.isEmpty()) { + } else if (!unfinishedFiles.isEmpty()) { retryFailedFile(unfinishedFiles.poll()); - } - 
else if (!pendingFiles.isEmpty()) { + } else if (!pendingFiles.isEmpty()) { String newPathString = pendingFiles.iterator().next(); pendingFiles.remove(newPathString); - if (fs.exists(new Path(newPathString))) + if (fs.exists(new Path(newPathString))) { this.inputStream = openFile(new Path(newPathString)); - } - else if (!failedFiles.isEmpty()) { + } + } else if (!failedFiles.isEmpty()) { retryFailedFile(failedFiles.poll()); - } - else { + } else { scanDirectory(); } - } - catch (IOException ex) { + } catch (IOException ex) { failureHandling(ex); } } @@ -687,13 +671,11 @@ public abstract class AbstractFileInputOperator<T> implements InputOperator, Par if (skipCount == 0) { offset++; emit(line); - } - else { + } else { skipCount--; } } - } - catch (IOException e) { + } catch (IOException e) { failureHandling(e); } //Only when something was emitted from the file then we record it for entry. @@ -708,10 +690,10 @@ public abstract class AbstractFileInputOperator<T> implements InputOperator, Par */ protected void scanDirectory() { - if(System.currentTimeMillis() - scanIntervalMillis >= lastScanMillis) { + if (System.currentTimeMillis() - scanIntervalMillis >= lastScanMillis) { Set<Path> newPaths = scanner.scan(fs, filePath, processedFiles); - for(Path newPath : newPaths) { + for (Path newPath : newPaths) { String newPathString = newPath.toString(); pendingFiles.add(newPathString); processedFiles.add(newPathString); @@ -729,27 +711,28 @@ public abstract class AbstractFileInputOperator<T> implements InputOperator, Par private void failureHandling(Exception e) { localNumberOfFailures.increment(); - if(maxRetryCount <= 0) { + if (maxRetryCount <= 0) { throw new RuntimeException(e); } LOG.error("FS reader error", e); addToFailedList(); } - protected void addToFailedList() { - + protected void addToFailedList() + { FailedFile ff = new FailedFile(currentFile, offset, retryCount); try { // try to close file - if (this.inputStream != null) + if (this.inputStream != null) { 
this.inputStream.close(); - } catch(IOException e) { + } + } catch (IOException e) { localNumberOfFailures.increment(); LOG.error("Could not close input stream on: " + currentFile); } - ff.retryCount ++; + ff.retryCount++; ff.lastFailedTime = System.currentTimeMillis(); ff.offset = this.offset; @@ -757,8 +740,9 @@ public abstract class AbstractFileInputOperator<T> implements InputOperator, Par this.currentFile = null; this.inputStream = null; - if (ff.retryCount > maxRetryCount) + if (ff.retryCount > maxRetryCount) { return; + } localNumberOfRetries.increment(); LOG.info("adding to failed list path {} offset {} retry {}", ff.path, ff.offset, ff.retryCount); @@ -769,8 +753,9 @@ public abstract class AbstractFileInputOperator<T> implements InputOperator, Par { LOG.info("retrying failed file {} offset {} retry {}", ff.path, ff.offset, ff.retryCount); String path = ff.path; - if (!fs.exists(new Path(path))) + if (!fs.exists(new Path(path))) { return null; + } this.inputStream = openFile(new Path(path)); this.offset = ff.offset; this.retryCount = ff.retryCount; @@ -793,8 +778,9 @@ public abstract class AbstractFileInputOperator<T> implements InputOperator, Par { LOG.info("closing file {} offset {}", currentFile, offset); - if (is != null) + if (is != null) { is.close(); + } currentFile = null; inputStream = null; @@ -828,7 +814,7 @@ public abstract class AbstractFileInputOperator<T> implements InputOperator, Par List<String> totalPendingFiles = Lists.newLinkedList(); Set<Integer> deletedOperators = Sets.newHashSet(); - for(Partition<AbstractFileInputOperator<T>> partition : partitions) { + for (Partition<AbstractFileInputOperator<T>> partition : partitions) { AbstractFileInputOperator<T> oper = partition.getPartitionedInstance(); totalProcessedFiles.addAll(oper.processedFiles); totalFailedFiles.addAll(oper.failedFiles); @@ -853,7 +839,7 @@ public abstract class AbstractFileInputOperator<T> implements InputOperator, Par Collection<IdempotentStorageManager> newManagers = 
Lists.newArrayListWithExpectedSize(totalCount); KryoCloneUtils<AbstractFileInputOperator<T>> cloneUtils = KryoCloneUtils.createCloneUtils(this); - for (int i=0; i<scanners.size(); i++) { + for (int i = 0; i < scanners.size(); i++) { @SuppressWarnings("unchecked") AbstractFileInputOperator<T> oper = cloneUtils.getClone(); @@ -873,7 +859,7 @@ public abstract class AbstractFileInputOperator<T> implements InputOperator, Par oper.currentFile = null; oper.offset = 0; Iterator<FailedFile> unfinishedIter = currentFiles.iterator(); - while(unfinishedIter.hasNext()) { + while (unfinishedIter.hasNext()) { FailedFile unfinishedFile = unfinishedIter.next(); if (scn.acceptFile(unfinishedFile.path)) { oper.unfinishedFiles.add(unfinishedFile); @@ -895,9 +881,9 @@ public abstract class AbstractFileInputOperator<T> implements InputOperator, Par /* redistribute pending files properly */ oper.pendingFiles.clear(); Iterator<String> pendingFilesIterator = totalPendingFiles.iterator(); - while(pendingFilesIterator.hasNext()) { + while (pendingFilesIterator.hasNext()) { String pathString = pendingFilesIterator.next(); - if(scn.acceptFile(pathString)) { + if (scn.acceptFile(pathString)) { oper.pendingFiles.add(pathString); pendingFilesIterator.remove(); } @@ -932,8 +918,7 @@ public abstract class AbstractFileInputOperator<T> implements InputOperator, Par { try { idempotentStorageManager.deleteUpTo(operatorId, windowId); - } - catch (IOException e) { + } catch (IOException e) { throw new RuntimeException(e); } } @@ -941,16 +926,18 @@ public abstract class AbstractFileInputOperator<T> implements InputOperator, Par /** * Read the next item from the stream. Depending on the type of stream, this could be a byte array, line or object. * Upon return of null, the stream will be considered fully consumed. - * @throws IOException + * * @return Depending on the type of stream an object is returned. When null is returned the stream is consumed. 
+ * @throws IOException */ - abstract protected T readEntity() throws IOException; + protected abstract T readEntity() throws IOException; /** * Emit the tuple on the port + * * @param tuple */ - abstract protected void emit(T tuple); + protected abstract void emit(T tuple); /** @@ -1017,17 +1004,21 @@ public abstract class AbstractFileInputOperator<T> implements InputOperator, Par this.regex = null; } - public int getPartitionCount() { + public int getPartitionCount() + { return partitionCount; } - public int getPartitionIndex() { + public int getPartitionIndex() + { return partitionIndex; } - protected Pattern getRegex() { - if (this.regex == null && this.filePatternRegexp != null) + protected Pattern getRegex() + { + if (this.regex == null && this.filePatternRegexp != null) { this.regex = Pattern.compile(this.filePatternRegexp); + } return this.regex; } @@ -1037,8 +1028,7 @@ public abstract class AbstractFileInputOperator<T> implements InputOperator, Par try { LOG.debug("Scanning {} with pattern {}", filePath, this.filePatternRegexp); FileStatus[] files = fs.listStatus(filePath); - for (FileStatus status : files) - { + for (FileStatus status : files) { Path path = status.getPath(); String filePathStr = path.toString(); @@ -1081,8 +1071,7 @@ public abstract class AbstractFileInputOperator<T> implements InputOperator, Par } } Pattern regex = this.getRegex(); - if (regex != null) - { + if (regex != null) { Matcher matcher = regex.matcher(filePathStr); if (!matcher.matches()) { return false; @@ -1094,13 +1083,14 @@ public abstract class AbstractFileInputOperator<T> implements InputOperator, Par public List<DirectoryScanner> partition(int count) { ArrayList<DirectoryScanner> partitions = Lists.newArrayListWithExpectedSize(count); - for (int i=0; i<count; i++) { + for (int i = 0; i < count; i++) { partitions.add(this.createPartition(i, count)); } return partitions; } - public List<DirectoryScanner> partition(int count , @SuppressWarnings("unused") 
Collection<DirectoryScanner> scanners) { + public List<DirectoryScanner> partition(int count, @SuppressWarnings("unused") Collection<DirectoryScanner> scanners) + { return partition(count); } @@ -1153,7 +1143,7 @@ public abstract class AbstractFileInputOperator<T> implements InputOperator, Par return false; } - RecoveryEntry that = (RecoveryEntry) o; + RecoveryEntry that = (RecoveryEntry)o; if (endOffset != that.endOffset) { return false; @@ -1192,7 +1182,7 @@ public abstract class AbstractFileInputOperator<T> implements InputOperator, Par */ public static class FileLineInputOperator extends AbstractFileInputOperator<String> { - public transient final DefaultOutputPort<String> output = new DefaultOutputPort<String>(); + public final transient DefaultOutputPort<String> output = new DefaultOutputPort<String>(); protected transient BufferedReader br;
