http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/db/jdbc/JDBCDimensionalOutputOperator.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/db/jdbc/JDBCDimensionalOutputOperator.java
 
b/library/src/main/java/com/datatorrent/lib/db/jdbc/JDBCDimensionalOutputOperator.java
index 353f1b2..bf0d089 100644
--- 
a/library/src/main/java/com/datatorrent/lib/db/jdbc/JDBCDimensionalOutputOperator.java
+++ 
b/library/src/main/java/com/datatorrent/lib/db/jdbc/JDBCDimensionalOutputOperator.java
@@ -18,29 +18,27 @@
  */
 package com.datatorrent.lib.db.jdbc;
 
-
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
-
 import java.util.List;
 import java.util.Map;
 
 import javax.validation.constraints.Min;
 import javax.validation.constraints.NotNull;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.apex.malhar.lib.dimensions.DimensionsDescriptor;
 import org.apache.apex.malhar.lib.dimensions.DimensionsEvent.Aggregate;
 import org.apache.apex.malhar.lib.dimensions.DimensionsEvent.EventKey;
 import org.apache.apex.malhar.lib.dimensions.aggregator.AggregatorRegistry;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
 import com.datatorrent.api.Context;
-
 import com.datatorrent.lib.appdata.gpo.GPOMutable;
 import com.datatorrent.lib.appdata.schemas.DimensionalConfigurationSchema;
 import com.datatorrent.lib.appdata.schemas.FieldsDescriptor;

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcNonTransactionalStore.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcNonTransactionalStore.java
 
b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcNonTransactionalStore.java
index 835bcdd..89022a3 100644
--- 
a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcNonTransactionalStore.java
+++ 
b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcNonTransactionalStore.java
@@ -60,8 +60,7 @@ public class JdbcNonTransactionalStore extends 
JdbcTransactionalStore
 
     try {
       connection.setAutoCommit(true);
-    }
-    catch(SQLException e) {
+    } catch (SQLException e) {
       throw new RuntimeException(e);
     }
   }
@@ -71,10 +70,9 @@ public class JdbcNonTransactionalStore extends 
JdbcTransactionalStore
   {
     Long lastWindowCommit = getCommittedWindowIdHelper(appId, operatorId);
 
-    if(lastWindowCommit == null) {
+    if (lastWindowCommit == null) {
       return -1L;
-    }
-    else {
+    } else {
       return lastWindowCommit;
     }
   }
@@ -87,8 +85,7 @@ public class JdbcNonTransactionalStore extends 
JdbcTransactionalStore
 
       lastWindowFetchCommand.close();
       lastWindowInsertCommand.close();
-    }
-    catch (SQLException ex) {
+    } catch (SQLException ex) {
       throw new RuntimeException(ex);
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/fileaccess/DTFileReader.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/fileaccess/DTFileReader.java 
b/library/src/main/java/com/datatorrent/lib/fileaccess/DTFileReader.java
index 7fff4e0..ea8f174 100644
--- a/library/src/main/java/com/datatorrent/lib/fileaccess/DTFileReader.java
+++ b/library/src/main/java/com/datatorrent/lib/fileaccess/DTFileReader.java
@@ -19,7 +19,6 @@
 package com.datatorrent.lib.fileaccess;
 
 import java.io.IOException;
-import java.util.Arrays;
 import java.util.TreeMap;
 
 import org.apache.hadoop.classification.InterfaceStability;
@@ -97,7 +96,9 @@ public class DTFileReader implements FileAccess.FileReader
   @Override
   public boolean next(Slice key, Slice value) throws IOException
   {
-    if (scanner.atEnd()) return false;
+    if (scanner.atEnd()) {
+      return false;
+    }
     Entry en = scanner.entry();
 
     key.buffer = en.getBlockBuffer();

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/fileaccess/FileAccessFSImpl.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/fileaccess/FileAccessFSImpl.java 
b/library/src/main/java/com/datatorrent/lib/fileaccess/FileAccessFSImpl.java
index a9cfe00..7184a82 100644
--- a/library/src/main/java/com/datatorrent/lib/fileaccess/FileAccessFSImpl.java
+++ b/library/src/main/java/com/datatorrent/lib/fileaccess/FileAccessFSImpl.java
@@ -61,7 +61,8 @@ public abstract class FileAccessFSImpl implements FileAccess
     this.basePath = path;
   }
 
-  protected Path getFilePath(long bucketKey, String fileName) {
+  protected Path getFilePath(long bucketKey, String fileName)
+  {
     return new Path(getBucketPath(bucketKey), fileName);
   }
 
@@ -71,7 +72,8 @@ public abstract class FileAccessFSImpl implements FileAccess
   }
 
   @Override
-  public long getFileSize(long bucketKey, String fileName) throws IOException {
+  public long getFileSize(long bucketKey, String fileName) throws IOException
+  {
     return fs.getFileStatus(getFilePath(bucketKey, fileName)).getLen();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/fileaccess/TFileImpl.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/fileaccess/TFileImpl.java 
b/library/src/main/java/com/datatorrent/lib/fileaccess/TFileImpl.java
index 2a3fd0e..7dfe4e9 100644
--- a/library/src/main/java/com/datatorrent/lib/fileaccess/TFileImpl.java
+++ b/library/src/main/java/com/datatorrent/lib/fileaccess/TFileImpl.java
@@ -143,14 +143,14 @@ public abstract class TFileImpl extends FileAccessFSImpl
   
   /**
    * Return {@link TFile} {@link Reader}
-   *
    */
-  public static class DefaultTFileImpl extends TFileImpl{
+  public static class DefaultTFileImpl extends TFileImpl
+  {
     
     @Override
     public FileReader getReader(long bucketKey, String fileName) throws 
IOException
     {
-      FSDataInputStream fsdis =  getInputStream(bucketKey, fileName);
+      FSDataInputStream fsdis = getInputStream(bucketKey, fileName);
       long fileLength = getFileSize(bucketKey, fileName);
       super.setupConfig(fs.getConf());
       return new TFileReader(fsdis, fileLength, fs.getConf());
@@ -158,17 +158,16 @@ public abstract class TFileImpl extends FileAccessFSImpl
     
   }
   
-  
   /**
    * Return {@link DTFile} {@link 
org.apache.hadoop.io.file.tfile.DTFile.Reader}
-   *
    */
-  public static class DTFileImpl extends TFileImpl {
+  public static class DTFileImpl extends TFileImpl
+  {
     
     @Override
     public FileReader getReader(long bucketKey, String fileName) throws 
IOException
     {
-      FSDataInputStream fsdis =  getInputStream(bucketKey, fileName);
+      FSDataInputStream fsdis = getInputStream(bucketKey, fileName);
       long fileLength = getFileSize(bucketKey, fileName);
       super.setupConfig(fs.getConf());
       return new DTFileReader(fsdis, fileLength, fs.getConf());

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/fileaccess/TFileReader.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/fileaccess/TFileReader.java 
b/library/src/main/java/com/datatorrent/lib/fileaccess/TFileReader.java
index 9ab6f82..0f0c92a 100644
--- a/library/src/main/java/com/datatorrent/lib/fileaccess/TFileReader.java
+++ b/library/src/main/java/com/datatorrent/lib/fileaccess/TFileReader.java
@@ -93,17 +93,20 @@ public class TFileReader implements FileAccess.FileReader
     try {
       return scanner.seekTo(key.buffer, key.offset, key.length);
     } catch (NullPointerException ex) {
-      if (closed)
+      if (closed) {
         throw new IOException("Stream was closed");
-      else
+      } else {
         throw ex;
+      }
     }
   }
 
   @Override
   public boolean next(Slice key, Slice value) throws IOException
   {
-    if (scanner.atEnd()) return false;
+    if (scanner.atEnd()) {
+      return false;
+    }
     Entry en = scanner.entry();
     byte[] rkey = new byte[en.getKeyLength()];
     byte[] rval = new byte[en.getValueLength()];

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/fileaccess/TFileWriter.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/fileaccess/TFileWriter.java 
b/library/src/main/java/com/datatorrent/lib/fileaccess/TFileWriter.java
index 6566ae0..7e9d544 100644
--- a/library/src/main/java/com/datatorrent/lib/fileaccess/TFileWriter.java
+++ b/library/src/main/java/com/datatorrent/lib/fileaccess/TFileWriter.java
@@ -37,7 +37,8 @@ public final class TFileWriter implements 
FileAccess.FileWriter
   
   private FSDataOutputStream fsdos;
   
-  public TFileWriter(FSDataOutputStream stream, int minBlockSize, String 
compressName, String comparator, Configuration conf) throws IOException
+  public TFileWriter(FSDataOutputStream stream, int minBlockSize, String 
compressName,
+      String comparator, Configuration conf) throws IOException
   {
     this.fsdos = stream;
     writer = new Writer(stream, minBlockSize, compressName, comparator, conf);
@@ -58,6 +59,9 @@ public final class TFileWriter implements 
FileAccess.FileWriter
   }
 
   @Override
-  public long getBytesWritten() throws IOException{ return fsdos.getPos(); }
+  public long getBytesWritten() throws IOException
+  {
+    return fsdos.getPos();
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/formatter/Formatter.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/formatter/Formatter.java 
b/library/src/main/java/com/datatorrent/lib/formatter/Formatter.java
index 7d0018e..25c0b96 100644
--- a/library/src/main/java/com/datatorrent/lib/formatter/Formatter.java
+++ b/library/src/main/java/com/datatorrent/lib/formatter/Formatter.java
@@ -40,7 +40,7 @@ import com.datatorrent.lib.converter.Converter;
  * 
  * @displayName Parser
  * @tags parser converter
- * @param <INPUT>
+ * @param <OUTPUT>
  * @since 3.2.0
  */
 @org.apache.hadoop.classification.InterfaceStability.Evolving

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/io/AbstractHttpGetOperator.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/io/AbstractHttpGetOperator.java 
b/library/src/main/java/com/datatorrent/lib/io/AbstractHttpGetOperator.java
index fe9a50f..a84e5c7 100644
--- a/library/src/main/java/com/datatorrent/lib/io/AbstractHttpGetOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/AbstractHttpGetOperator.java
@@ -56,8 +56,7 @@ public abstract class AbstractHttpGetOperator<INPUT, OUTPUT> 
extends AbstractHtt
     if (output.isConnected()) {
       ClientResponse response = wr.get(ClientResponse.class);
       processResponse(response);
-    }
-    else {
+    } else {
       wr.get(ClientResponse.class);
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/io/AbstractHttpInputOperator.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/io/AbstractHttpInputOperator.java 
b/library/src/main/java/com/datatorrent/lib/io/AbstractHttpInputOperator.java
index d7bb503..40ce4bc 100644
--- 
a/library/src/main/java/com/datatorrent/lib/io/AbstractHttpInputOperator.java
+++ 
b/library/src/main/java/com/datatorrent/lib/io/AbstractHttpInputOperator.java
@@ -28,12 +28,13 @@ import javax.validation.constraints.NotNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.Context.OperatorContext;
 import com.sun.jersey.api.client.Client;
 import com.sun.jersey.api.client.ClientResponse;
 import com.sun.jersey.api.client.WebResource;
 
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultOutputPort;
+
 /**
  * This is a base implementation for an HTTP input operator that reads from a 
given url using the HTTP GET command like an input stream.&nbsp;
  * Subclasses must implement the method which handles the response to the HTTP 
GET request.
@@ -122,15 +123,13 @@ public abstract class AbstractHttpInputOperator<T> 
extends SimpleSinglePortInput
 
         ClientResponse response = builder.get(ClientResponse.class);
         processResponse(response);
-      }
-      catch (Exception e) {
+      } catch (Exception e) {
         LOG.error("Error reading from " + resource.getURI(), e);
       }
 
       try {
         Thread.sleep(500);
-      }
-      catch (InterruptedException e) {
+      } catch (InterruptedException e) {
         LOG.info("Exiting IO loop {}.", e.toString());
         break;
       }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/io/AbstractHttpOperator.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/io/AbstractHttpOperator.java 
b/library/src/main/java/com/datatorrent/lib/io/AbstractHttpOperator.java
index d5b8de6..60b123b 100644
--- a/library/src/main/java/com/datatorrent/lib/io/AbstractHttpOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/AbstractHttpOperator.java
@@ -20,14 +20,14 @@ package com.datatorrent.lib.io;
 
 import javax.validation.constraints.NotNull;
 
-import com.sun.jersey.api.client.Client;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.datatorrent.common.util.BaseOperator;
+import com.sun.jersey.api.client.Client;
+
 import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.common.util.BaseOperator;
 
 /**
  * This is the base implementation for HTTP operators.&nbsp;

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/io/AbstractKeyValueStoreOutputOperator.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/io/AbstractKeyValueStoreOutputOperator.java
 
b/library/src/main/java/com/datatorrent/lib/io/AbstractKeyValueStoreOutputOperator.java
index 835f2a6..1e14fe1 100644
--- 
a/library/src/main/java/com/datatorrent/lib/io/AbstractKeyValueStoreOutputOperator.java
+++ 
b/library/src/main/java/com/datatorrent/lib/io/AbstractKeyValueStoreOutputOperator.java
@@ -24,11 +24,11 @@ import java.util.Map;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.datatorrent.common.util.BaseOperator;
 import com.datatorrent.api.Context.DAGContext;
 import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.common.util.BaseOperator;
 import com.datatorrent.lib.util.KeyValPair;
 
 /**
@@ -62,7 +62,7 @@ public abstract class AbstractKeyValueStoreOutputOperator<K, 
V> extends BaseOper
    * This input port receives tuples which are maps.
    * Each map may have many key value pairs.
    */
-  @InputPortFieldAnnotation(optional=true)
+  @InputPortFieldAnnotation(optional = true)
   public final transient DefaultInputPort<Map<K, V>> input = new 
DefaultInputPort<Map<K, V>>()
   {
     @Override
@@ -78,7 +78,7 @@ public abstract class AbstractKeyValueStoreOutputOperator<K, 
V> extends BaseOper
   /**
    * This input port receives tuples which are individual key value pairs.
    */
-  @InputPortFieldAnnotation(optional=true)
+  @InputPortFieldAnnotation(optional = true)
   public final transient DefaultInputPort<KeyValPair<K, V>> inputInd = new 
DefaultInputPort<KeyValPair<K, V>>()
   {
     @Override
@@ -141,8 +141,7 @@ public abstract class 
AbstractKeyValueStoreOutputOperator<K, V> extends BaseOper
         put(getEndWindowKey(), String.valueOf(currentWindowId));
         commitTransaction();
         committedWindowId = currentWindowId;
-      }
-      else {
+      } else {
         LOG.info("Discarding data for window id {} because committed window is 
{}", currentWindowId, committedWindowId);
       }
     } catch (RuntimeException se) {
@@ -163,7 +162,8 @@ public abstract class 
AbstractKeyValueStoreOutputOperator<K, V> extends BaseOper
     return "_ew:" + appId + ":" + operatorId;
   }
 
-  private void logException(String message, Exception exception) {
+  private void logException(String message, Exception exception)
+  {
     if (continueOnError != 0) {
       LOG.warn(message, exception);
     } else {

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/io/AbstractSocketInputOperator.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/io/AbstractSocketInputOperator.java 
b/library/src/main/java/com/datatorrent/lib/io/AbstractSocketInputOperator.java
index 4f1e3df..450cdcb 100644
--- 
a/library/src/main/java/com/datatorrent/lib/io/AbstractSocketInputOperator.java
+++ 
b/library/src/main/java/com/datatorrent/lib/io/AbstractSocketInputOperator.java
@@ -23,7 +23,8 @@ import java.nio.ByteBuffer;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
 import java.nio.channels.SocketChannel;
-import java.util.*;
+import java.util.Iterator;
+import java.util.Set;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -159,8 +160,7 @@ public abstract class AbstractSocketInputOperator<T> 
implements InputOperator, A
       channel.configureBlocking(false);
       channel.connect(new InetSocketAddress(hostname, port));
       channel.register(selector, SelectionKey.OP_CONNECT | 
SelectionKey.OP_READ);
-    }
-    catch (Exception ex) {
+    } catch (Exception ex) {
       throw new RuntimeException(ex);
     }
     lock = new ReentrantLock();
@@ -176,8 +176,7 @@ public abstract class AbstractSocketInputOperator<T> 
implements InputOperator, A
       selector.close();
       scanThread.interrupt();
       scanThread.join();
-    }
-    catch (Exception ex) {
+    } catch (Exception ex) {
       throw new RuntimeException(ex);
     }
   }
@@ -200,11 +199,11 @@ public abstract class AbstractSocketInputOperator<T> 
implements InputOperator, A
             SelectionKey nextKey = keyIterator.next();
             keyIterator.remove();
             if (nextKey.isConnectable()) {
-              SocketChannel sChannel = (SocketChannel) nextKey.channel();
+              SocketChannel sChannel = (SocketChannel)nextKey.channel();
               sChannel.finishConnect();
             }
             if (nextKey.isReadable()) {
-              SocketChannel sChannel = (SocketChannel) nextKey.channel();
+              SocketChannel sChannel = (SocketChannel)nextKey.channel();
               lock.lock();
               acquiredLock = true;
               sChannel.read(byteBuffer);
@@ -215,8 +214,7 @@ public abstract class AbstractSocketInputOperator<T> 
implements InputOperator, A
           // Sleep for Scan interval
           Thread.sleep(scanIntervalInMilliSeconds);
         }
-      }
-      catch (Exception e) {
+      } catch (Exception e) {
         if (acquiredLock) {
           lock.unlock();
         }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/io/ApacheGenRandomLogs.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/io/ApacheGenRandomLogs.java 
b/library/src/main/java/com/datatorrent/lib/io/ApacheGenRandomLogs.java
index d1afc60..84bcb1d 100644
--- a/library/src/main/java/com/datatorrent/lib/io/ApacheGenRandomLogs.java
+++ b/library/src/main/java/com/datatorrent/lib/io/ApacheGenRandomLogs.java
@@ -22,10 +22,10 @@ import java.text.SimpleDateFormat;
 import java.util.Date;
 import java.util.Random;
 
-import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.api.InputOperator;
-import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.common.util.BaseOperator;
 
 /**
  * Generates apache server log entries. The apache access log has the following
@@ -43,7 +43,7 @@ import com.datatorrent.api.Context.OperatorContext;
  * %b - The number of bytes in the response
  * %{Referer} - The referer web site reported by the client, "-" if there is 
none
  * %{User-agent} - Unique string identifying the client browser e.g.,
- *                                                     "Mozilla/5.0 (Windows 
NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/28.0.1468.0 Safari/537.36"
+ * "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) 
Chrome/28.0.1468.0 Safari/537.36"
  *
  * Putting it all together a sample log string looks like :
  * --------------------------------------------------------
@@ -59,153 +59,168 @@ import com.datatorrent.api.Context.OperatorContext;
 @org.apache.hadoop.classification.InterfaceStability.Evolving
 public class ApacheGenRandomLogs extends BaseOperator implements InputOperator
 {
-       /**
+  /**
    * This is the output port which emits generated log strings.
    */
-       public final transient DefaultOutputPort<String> outport = new 
DefaultOutputPort<String>();
-
-       // server name/ip-address  random variable
-       private Random rand = new Random();
-
-       // Apache date format
-       private static SimpleDateFormat apapcheDateFormat = new 
SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss Z");
-
-       // http status codes
-       private static String [] httpStatusCodes = {"100", "101", "200", "201", 
"202", "203", "204", "205", "206", "300", "301",
-                                                                               
                                                                                
                        "301", "302", "303", "304", "305", "306", "307", "400", 
"401", "402", "403",
-                                                                               
                                                                                
                        "405", "406", "407", "408", "409", "410", "411", "412", 
"413", "414",
-                                                                               
                                                                                
                        "415", "416", "417", "500", "501", "502", "503", "504", 
"505"};
-
-       // possible url string formats
-       private static String[] urlFormats = {
-               "mydomain.com/home.php", "mydomain.com/products.php", 
"mydomain.com/products.php?productid=%d",
-               "mydomain.com/solutions.php", 
"mydomain.com/solutions.php?solutionid=%d", "mydomain.com/support.php",
-               "mydomain.com/about.php", "mydomain.com/contactus.php", 
"mydomain.com/services.php",
-               "mydomain.com/services.php?serviceid=%d", 
"mydomain.com/partners.php", "mydomain.com/partners.php?partnerid=%d"
-       };
-
-       // browser id
-       private static String[] browserIds = {
-               "Mozilla/5.0 (X11; Ubuntu; Linux i686; rv:20.0) Gecko/%d 
Firefox/20.0",
-               "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:18.0) Gecko/%d 
Firefox/18.0",
-               "Mozilla/5.0 (X11; U; Linux i686; en-US; rv:1.7.8) Gecko/%d 
Fedora/1.0.4-4 Firefox/1.0.",
-               "Mozilla/5.0 (X11; U; Linux i686; en-US; rv:1.8.0.10) Gecko/%d 
CentOS/1.5.0.10-0.1.el4.centos Firefox/1.5.0.10"
-       };
-
-       // generate server name and IP address for server
-       private int genServerId()
-       {
-               return rand.nextInt(10);
-       }
-       private String genServerName(int serverId)
-       {
-               return new StringBuilder("server").append(new 
Integer(serverId).toString()).append(".mydomain.com:80").toString();
-       }
-       private String genIpAddress(int serverId)
-       {
-               return new StringBuilder().append(rand.nextInt(255))
-                                                                       
.append(".").append(rand.nextInt(255)).append(".").append(rand.nextInt(255))
-                                                                       
.append(".").append(rand.nextInt(255)).toString();
-       }
-       private String getTimeStamp()
-       {
-               return new 
StringBuilder("[").append(apapcheDateFormat.format(new 
Date())).append("]").toString();
-       }
-       private String genHttpCode()
-       {
-               return httpStatusCodes[rand.nextInt(httpStatusCodes.length)];
-       }
-       private String genUrl()
-       {
-               String format = urlFormats[rand.nextInt(urlFormats.length)];
-               return String.format(format, rand.nextInt(100));
-       }
-       private String genBrowserId()
-       {
-               String format = browserIds[rand.nextInt(browserIds.length)];
-               return String.format(format, rand.nextInt(100000));
-       }
-
-       // generate log string
-       private String genLogString(String ipAddress, String browserId, String 
httpCode, String url)
-       {
-               // server/ipaddress
-               int serverId = genServerId();
-               String serverName = genServerName(serverId);
-               if (ipAddress == null)
-               {
-                 ipAddress = genIpAddress(serverId);
-               }
-
-               // time
-               String logTime = getTimeStamp();
-
-               // url
-         if (url == null)
-         {
-                 url = new StringBuilder("\"").append("GET").append(" 
").append(genUrl()).append(" ").append("HTTP/1.1").append("\"").toString();
-         }
-
-               // http code
-               if (httpCode == null)
-               {
-                 httpCode = genHttpCode();
-               }
-
-               // number of bytes
-               int numBytes = rand.nextInt(4000);
-
-               // browser id
-               if(browserId == null)
-               {
-                       browserId = genBrowserId();
-               }
-
-               // print
-               return new StringBuilder().append(serverName).append(" 
").append(ipAddress).append(" - - ").append(logTime).append(" 
").append(url).append(" ")
-                                        .append(httpCode).append(" 
").append(numBytes).append(" \" \" 
\"").append(browserId).append("\"").toString();
-       }
-
-       @Override
-       public void beginWindow(long windowId)
-       {
-               // TODO Auto-generated method stub
-
-       }
-       @Override
-       public void endWindow()
-       {
-               // TODO Auto-generated method stub
-
-       }
-       boolean genTuples;
-       int attackInterval;
-       @Override
-       public void setup(OperatorContext context)
-       {
-               genTuples = true;
-               attackInterval = rand.nextInt(10)+ 1;
-       }
-       @Override
-       public void teardown()
-       {
-               genTuples = false;
-       }
-       @Override
-       public void emitTuples()
-       {
-               attackInterval--;
-               String browserId = null;
-               String ipAdddress = null;
-               if (attackInterval == 0)
-               {
-                       browserId = genBrowserId();
-                       ipAdddress = genIpAddress(rand.nextInt(10));
-                       attackInterval += rand.nextInt(10) + 1;
-                       for (int i = 0; i < rand.nextInt(3); i++) 
outport.emit(genLogString(ipAdddress, browserId, "404", null));
-                       String url = new 
StringBuilder("\"").append("GET").append(" ").append(genUrl()).append(" 
").append("HTTP/1.1").append("\"").toString();
-                       for (int i = 0; i < rand.nextInt(3); i++) 
outport.emit(genLogString(ipAdddress, browserId, "404", url));
-               }
-               for (int i = 0; i < rand.nextInt(100000); i++) 
outport.emit(genLogString(ipAdddress, browserId, null, null));
-       }
+  public final transient DefaultOutputPort<String> outport = new 
DefaultOutputPort<String>();
+
+  // server name/ip-address  random variable
+  private Random rand = new Random();
+
+  // Apache date format
+  private static SimpleDateFormat apapcheDateFormat = new 
SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss Z");
+
+  // http status codes
+  private static String[] httpStatusCodes = {"100", "101", "200", "201", 
"202", "203", "204", "205", "206", "300", "301",
+      "301", "302", "303", "304", "305", "306", "307", "400", "401", "402", 
"403",
+      "405", "406", "407", "408", "409", "410", "411", "412", "413", "414",
+      "415", "416", "417", "500", "501", "502", "503", "504", "505"};
+
+  // possible url string formats
+  private static String[] urlFormats = {
+    "mydomain.com/home.php", "mydomain.com/products.php", 
"mydomain.com/products.php?productid=%d",
+    "mydomain.com/solutions.php", "mydomain.com/solutions.php?solutionid=%d", 
"mydomain.com/support.php",
+    "mydomain.com/about.php", "mydomain.com/contactus.php", 
"mydomain.com/services.php",
+    "mydomain.com/services.php?serviceid=%d", "mydomain.com/partners.php", 
"mydomain.com/partners.php?partnerid=%d"
+  };
+
+  // browser id
+  private static String[] browserIds = {
+    "Mozilla/5.0 (X11; Ubuntu; Linux i686; rv:20.0) Gecko/%d Firefox/20.0",
+    "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:18.0) Gecko/%d Firefox/18.0",
+    "Mozilla/5.0 (X11; U; Linux i686; en-US; rv:1.7.8) Gecko/%d Fedora/1.0.4-4 
Firefox/1.0.",
+    "Mozilla/5.0 (X11; U; Linux i686; en-US; rv:1.8.0.10) Gecko/%d 
CentOS/1.5.0.10-0.1.el4.centos Firefox/1.5.0.10"
+  };
+
+  // generate server name and IP address for server
+  private int genServerId()
+  {
+    return rand.nextInt(10);
+  }
+
+  private String genServerName(int serverId)
+  {
+    return new StringBuilder("server").append(new 
Integer(serverId).toString()).append(".mydomain.com:80").toString();
+  }
+
+  private String genIpAddress(int serverId)
+  {
+    return new StringBuilder().append(rand.nextInt(255))
+        
.append(".").append(rand.nextInt(255)).append(".").append(rand.nextInt(255))
+        .append(".").append(rand.nextInt(255)).toString();
+  }
+
+  private String getTimeStamp()
+  {
+    return new StringBuilder("[").append(apapcheDateFormat.format(new 
Date())).append("]").toString();
+  }
+
+  private String genHttpCode()
+  {
+    return httpStatusCodes[rand.nextInt(httpStatusCodes.length)];
+  }
+
+  private String genUrl()
+  {
+    String format = urlFormats[rand.nextInt(urlFormats.length)];
+    return String.format(format, rand.nextInt(100));
+  }
+
+  private String genBrowserId()
+  {
+    String format = browserIds[rand.nextInt(browserIds.length)];
+    return String.format(format, rand.nextInt(100000));
+  }
+
+  // generate log string
+  private String genLogString(String ipAddress, String browserId, String 
httpCode, String url)
+  {
+    // server/ipaddress
+    int serverId = genServerId();
+    String serverName = genServerName(serverId);
+    if (ipAddress == null) {
+      ipAddress = genIpAddress(serverId);
+    }
+
+    // time
+    String logTime = getTimeStamp();
+
+    // url
+    if (url == null) {
+      url = new StringBuilder("\"").append("GET").append(" 
").append(genUrl()).append(" ").append("HTTP/1.1")
+          .append("\"").toString();
+    }
+
+    // http code
+    if (httpCode == null) {
+      httpCode = genHttpCode();
+    }
+
+    // number of bytes
+    int numBytes = rand.nextInt(4000);
+
+    // browser id
+    if (browserId == null) {
+      browserId = genBrowserId();
+    }
+
+    // print
+    return new StringBuilder().append(serverName).append(" 
").append(ipAddress).append(" - - ").append(logTime)
+        .append(" ").append(url).append(" ").append(httpCode).append(" 
").append(numBytes).append(" \" \" \"")
+        .append(browserId).append("\"").toString();
+  }
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+    // TODO Auto-generated method stub
+
+  }
+
+  @Override
+  public void endWindow()
+  {
+    // TODO Auto-generated method stub
+
+  }
+
+  boolean genTuples;
+  int attackInterval;
+
+  @Override
+  public void setup(OperatorContext context)
+  {
+    genTuples = true;
+    attackInterval = rand.nextInt(10) + 1;
+  }
+
+  @Override
+  public void teardown()
+  {
+    genTuples = false;
+  }
+
+  @Override
+  public void emitTuples()
+  {
+    attackInterval--;
+    String browserId = null;
+    String ipAdddress = null;
+    if (attackInterval == 0) {
+      browserId = genBrowserId();
+      ipAdddress = genIpAddress(rand.nextInt(10));
+      attackInterval += rand.nextInt(10) + 1;
+      for (int i = 0; i < rand.nextInt(3); i++) {
+        outport.emit(genLogString(ipAdddress, browserId, "404", null));
+      }
+      String url = new StringBuilder("\"").append("GET").append(" 
").append(genUrl()).append(" ").append("HTTP/1.1")
+          .append("\"").toString();
+      for (int i = 0; i < rand.nextInt(3); i++) {
+        outport.emit(genLogString(ipAdddress, browserId, "404", url));
+      }
+    }
+    for (int i = 0; i < rand.nextInt(100000); i++) {
+      outport.emit(genLogString(ipAdddress, browserId, null, null));
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/io/CollectionMultiConsoleOutputOperator.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/io/CollectionMultiConsoleOutputOperator.java
 
b/library/src/main/java/com/datatorrent/lib/io/CollectionMultiConsoleOutputOperator.java
index 1c142a4..36b042f 100644
--- 
a/library/src/main/java/com/datatorrent/lib/io/CollectionMultiConsoleOutputOperator.java
+++ 
b/library/src/main/java/com/datatorrent/lib/io/CollectionMultiConsoleOutputOperator.java
@@ -23,8 +23,8 @@ import java.util.Collection;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.datatorrent.common.util.BaseOperator;
 import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.common.util.BaseOperator;
 
 /**
  * This output operator receives collections as tuples.&nbsp;
@@ -62,7 +62,8 @@ public class CollectionMultiConsoleOutputOperator<E> extends 
BaseOperator
   /**
    * This input port which receives collection tuples.
    */
-  public final transient DefaultInputPort<Collection<E>> input = new 
DefaultInputPort<Collection<E>>() {
+  public final transient DefaultInputPort<Collection<E>> input = new 
DefaultInputPort<Collection<E>>()
+  {
     @Override
     public void process(Collection<E> t)
     {
@@ -73,8 +74,9 @@ public class CollectionMultiConsoleOutputOperator<E> extends 
BaseOperator
         if (!silent) {
           System.out.println(obj.toString());
         }
-        if (debug)
+        if (debug) {
           logger.info(obj.toString());
+        }
       }
       System.out.println("}");
     }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/io/ConsoleOutputOperator.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/io/ConsoleOutputOperator.java 
b/library/src/main/java/com/datatorrent/lib/io/ConsoleOutputOperator.java
index 5e72d3d..64046b2 100644
--- a/library/src/main/java/com/datatorrent/lib/io/ConsoleOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/ConsoleOutputOperator.java
@@ -18,13 +18,13 @@
  */
 package com.datatorrent.lib.io;
 
-import com.datatorrent.common.util.BaseOperator;
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.annotation.Stateless;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.annotation.Stateless;
+import com.datatorrent.common.util.BaseOperator;
+
 /**
  * Writes tuples to stdout of the container.
  * <p>
@@ -55,8 +55,7 @@ public class ConsoleOutputOperator extends BaseOperator
       String s;
       if (stringFormat == null) {
         s = t.toString();
-      }
-      else {
+      } else {
         s = String.format(stringFormat, t);
       }
       if (!silent) {

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/io/HttpJsonChunksInputOperator.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/io/HttpJsonChunksInputOperator.java 
b/library/src/main/java/com/datatorrent/lib/io/HttpJsonChunksInputOperator.java
index ea1fc8d..ad6f7a6 100644
--- 
a/library/src/main/java/com/datatorrent/lib/io/HttpJsonChunksInputOperator.java
+++ 
b/library/src/main/java/com/datatorrent/lib/io/HttpJsonChunksInputOperator.java
@@ -18,7 +18,6 @@
  */
 package com.datatorrent.lib.io;
 
-import com.sun.jersey.api.client.ClientResponse;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
@@ -27,12 +26,16 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import org.apache.commons.io.IOUtils;
+
 import org.codehaus.jettison.json.JSONException;
 import org.codehaus.jettison.json.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.commons.io.IOUtils;
+
+import com.sun.jersey.api.client.ClientResponse;
+
 /**
  * This operator reads in JSON data and outputs it as a map.
  * <p>
@@ -75,8 +78,7 @@ public class HttpJsonChunksInputOperator extends 
AbstractHttpInputOperator<Map<S
           response.close();
           break;
         }
-      }
-      catch (JSONException ex) {
+      } catch (JSONException ex) {
         LOG.error("Caught JSON error:", ex);
       }
       if (bytesRead == -1) {
@@ -108,8 +110,7 @@ public class HttpJsonChunksInputOperator extends 
AbstractHttpInputOperator<Map<S
         currentChunkLength = nextLength;
 
         //LOG.debug("chunk length: " + line);
-      }
-      catch (NumberFormatException e) {
+      } catch (NumberFormatException e) {
         // add to chunk
         chunkStr.append(line);
         chunkStr.append("\n");

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/io/HttpLinesInputOperator.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/io/HttpLinesInputOperator.java 
b/library/src/main/java/com/datatorrent/lib/io/HttpLinesInputOperator.java
index a483df1..c355b16 100644
--- a/library/src/main/java/com/datatorrent/lib/io/HttpLinesInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/HttpLinesInputOperator.java
@@ -18,12 +18,13 @@
  */
 package com.datatorrent.lib.io;
 
-import com.sun.jersey.api.client.ClientResponse;
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 
+import com.sun.jersey.api.client.ClientResponse;
+
 /**
  * Incoming data is interpreted as lines of plain text and each tuple output 
is a line in the content.
  * <p></p>
@@ -47,8 +48,7 @@ public class HttpLinesInputOperator extends 
AbstractHttpInputOperator<String>
         rawOutput.emit(line);
         outputPort.emit(line);
       }
-    }
-    finally {
+    } finally {
       br.close();
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/io/HttpPostOutputOperator.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/io/HttpPostOutputOperator.java 
b/library/src/main/java/com/datatorrent/lib/io/HttpPostOutputOperator.java
index a69761c..57a4d91 100644
--- a/library/src/main/java/com/datatorrent/lib/io/HttpPostOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/HttpPostOutputOperator.java
@@ -22,10 +22,10 @@ import java.util.Map;
 
 import javax.ws.rs.core.MediaType;
 
-import com.sun.jersey.api.client.WebResource;
-
 import org.codehaus.jettison.json.JSONObject;
 
+import com.sun.jersey.api.client.WebResource;
+
 import com.datatorrent.api.Context.OperatorContext;
 
 /**
@@ -51,8 +51,7 @@ public class HttpPostOutputOperator<T> extends 
AbstractHttpOperator<T>
   {
     if (t instanceof Map) {
       resource.type(MediaType.APPLICATION_JSON).post(new JSONObject((Map<?, 
?>)t).toString());
-    }
-    else {
+    } else {
       resource.post(t.toString());
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/io/IdempotentStorageManager.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/io/IdempotentStorageManager.java 
b/library/src/main/java/com/datatorrent/lib/io/IdempotentStorageManager.java
index 65bda89..11b68a7 100644
--- a/library/src/main/java/com/datatorrent/lib/io/IdempotentStorageManager.java
+++ b/library/src/main/java/com/datatorrent/lib/io/IdempotentStorageManager.java
@@ -162,7 +162,7 @@ public interface IdempotentStorageManager extends 
StorageAgent, Component<Contex
 
             for (FileStatus status : 
fs.listStatus(operatorDirStatus.getPath())) {
               String fileName = status.getPath().getName();
-              if(fileName.endsWith(FSStorageAgent.TMP_FILE)) {
+              if (fileName.endsWith(FSStorageAgent.TMP_FILE)) {
                 continue;
               }
               long windowId = Long.parseLong(fileName, 16);
@@ -173,8 +173,7 @@ public interface IdempotentStorageManager extends 
StorageAgent, Component<Contex
             }
           }
         }
-      }
-      catch (IOException e) {
+      } catch (IOException e) {
         throw new RuntimeException(e);
       }
     }
@@ -255,8 +254,7 @@ public interface IdempotentStorageManager extends 
StorageAgent, Component<Contex
                 deletedOperators.remove(loperator);
                 fs.delete(loperatorPath, true);
               }
-            }
-            else if (loperator == operatorId) {
+            } else if (loperator == operatorId) {
               storageAgent.delete(loperator, lwindow);
             }
           }
@@ -296,7 +294,7 @@ public interface IdempotentStorageManager extends 
StorageAgent, Component<Contex
 
       for (IdempotentStorageManager storageManager : newManagers) {
 
-        FSIdempotentStorageManager lmanager = (FSIdempotentStorageManager) 
storageManager;
+        FSIdempotentStorageManager lmanager = 
(FSIdempotentStorageManager)storageManager;
         lmanager.recoveryPath = this.recoveryPath;
         lmanager.storageAgent = this.storageAgent;
 
@@ -318,7 +316,7 @@ public interface IdempotentStorageManager extends 
StorageAgent, Component<Contex
         //If some operators were removed then there needs to be a manager 
which can clean there state when it is not needed.
         if (deletedOperatorsManager == null) {
           //None of the managers were handling deleted operators data.
-          deletedOperatorsManager = (FSIdempotentStorageManager) 
newManagers.iterator().next();
+          deletedOperatorsManager = 
(FSIdempotentStorageManager)newManagers.iterator().next();
           deletedOperatorsManager.deletedOperators = Sets.newHashSet();
         }
 
@@ -331,8 +329,7 @@ public interface IdempotentStorageManager extends 
StorageAgent, Component<Contex
     {
       try {
         fs.close();
-      }
-      catch (IOException e) {
+      } catch (IOException e) {
         throw new RuntimeException(e);
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/io/MapMultiConsoleOutputOperator.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/io/MapMultiConsoleOutputOperator.java
 
b/library/src/main/java/com/datatorrent/lib/io/MapMultiConsoleOutputOperator.java
index c461163..1e31552 100644
--- 
a/library/src/main/java/com/datatorrent/lib/io/MapMultiConsoleOutputOperator.java
+++ 
b/library/src/main/java/com/datatorrent/lib/io/MapMultiConsoleOutputOperator.java
@@ -23,8 +23,8 @@ import java.util.Map;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.datatorrent.common.util.BaseOperator;
 import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.common.util.BaseOperator;
 
 /**
  * This operator writes tuples which are maps to standard out of the container.
@@ -57,7 +57,8 @@ public class MapMultiConsoleOutputOperator<K, V> extends 
BaseOperator
   }
 
   private static final Logger logger = 
LoggerFactory.getLogger(MapMultiConsoleOutputOperator.class);
-  public final transient DefaultInputPort<Map<K, V>> input = new 
DefaultInputPort<Map<K, V>>() {
+  public final transient DefaultInputPort<Map<K, V>> input = new 
DefaultInputPort<Map<K, V>>()
+  {
     @Override
     public void process(Map<K, V> t)
     {
@@ -66,8 +67,9 @@ public class MapMultiConsoleOutputOperator<K, V> extends 
BaseOperator
         if (!silent) {
           System.out.println(entry.getKey().toString() + "=" + 
entry.getValue().toString());
         }
-        if (debug)
+        if (debug) {
           logger.info(entry.getKey().toString() + "=" + 
entry.getValue().toString());
+        }
       }
       System.out.println("}");
     }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQuery.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQuery.java 
b/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQuery.java
index 3e8d8d3..7cf883f 100644
--- 
a/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQuery.java
+++ 
b/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQuery.java
@@ -161,23 +161,18 @@ public class PubSubWebSocketAppDataQuery extends 
PubSubWebSocketInputOperator<St
       JSONArray ja = jo.names();
 
       //Make sure that only the correct keys are in the first level of JSON
-      for(int keyIndex = 0;
-          keyIndex < ja.length();
-          keyIndex++) {
+      for (int keyIndex = 0; keyIndex < ja.length(); keyIndex++) {
         String key = ja.getString(keyIndex);
-        if(!(PubSubMessage.DATA_KEY.equals(key) ||
-           PubSubMessage.TOPIC_KEY.equals(key) ||
-           PubSubMessage.TYPE_KEY.equals(key))) {
-          logger.error("{} is not a valid key in the first level of the 
following pubsub message:\n{}",
-                       key,
-                       message);
+        if (!(PubSubMessage.DATA_KEY.equals(key) ||
+            PubSubMessage.TOPIC_KEY.equals(key) ||
+            PubSubMessage.TYPE_KEY.equals(key))) {
+          logger.error("{} is not a valid key in the first level of the 
following pubsub message:\n{}", key, message);
           return null;
         }
       }
 
       data = jo.getString(PubSubMessage.DATA_KEY);
-    }
-    catch(JSONException e) {
+    } catch (JSONException e) {
       return null;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataResult.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataResult.java
 
b/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataResult.java
index 4d25f49..e1f3fa1 100644
--- 
a/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataResult.java
+++ 
b/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataResult.java
@@ -19,6 +19,7 @@
 package com.datatorrent.lib.io;
 
 import java.io.IOException;
+import java.net.URI;
 
 import org.codehaus.jettison.json.JSONException;
 import org.codehaus.jettison.json.JSONObject;
@@ -26,10 +27,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.datatorrent.api.Context.OperatorContext;
-
 import com.datatorrent.common.experimental.AppData;
 import com.datatorrent.common.util.PubSubMessage.PubSubMessageType;
-import java.net.URI;
 
 /**
  * This is an app data pub sub result operator. This operator is used to send 
results to
@@ -40,8 +39,9 @@ import java.net.URI;
  * @tags output, app data, result
  * @since 3.0.0
  */
[email protected](value=true)
-public class PubSubWebSocketAppDataResult extends 
PubSubWebSocketOutputOperator<String> implements AppData.ConnectionInfoProvider
[email protected](value = true)
+public class PubSubWebSocketAppDataResult extends 
PubSubWebSocketOutputOperator<String>
+    implements AppData.ConnectionInfoProvider
 {
   private static final Logger logger = 
LoggerFactory.getLogger(PubSubWebSocketAppDataResult.class);
 
@@ -93,8 +93,7 @@ public class PubSubWebSocketAppDataResult extends 
PubSubWebSocketOutputOperator<
 
     try {
       jo = new JSONObject(t);
-    }
-    catch(JSONException ex) {
+    } catch (JSONException ex) {
       throw new RuntimeException(ex);
     }
 
@@ -102,8 +101,7 @@ public class PubSubWebSocketAppDataResult extends 
PubSubWebSocketOutputOperator<
 
     try {
       id = jo.getString("id");
-    }
-    catch(JSONException ex) {
+    } catch (JSONException ex) {
       throw new RuntimeException(ex);
     }
 
@@ -115,8 +113,7 @@ public class PubSubWebSocketAppDataResult extends 
PubSubWebSocketOutputOperator<
       output.put("topic", topic);
       output.put("data", jo);
       output.put("type", PubSubMessageType.PUBLISH.getIdentifier());
-    }
-    catch(JSONException ex) {
+    } catch (JSONException ex) {
       throw new RuntimeException(ex);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketInputOperator.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketInputOperator.java
 
b/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketInputOperator.java
index 0b56924..4d5fa9a 100644
--- 
a/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketInputOperator.java
+++ 
b/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketInputOperator.java
@@ -82,8 +82,7 @@ public class PubSubWebSocketInputOperator<T> extends 
WebSocketInputOperator<T>
     super.run();
     try {
       
connection.sendMessage(PubSubMessageCodec.constructSubscribeMessage(topic, 
codec));
-    }
-    catch (IOException ex) {
+    } catch (IOException ex) {
       LOG.error("Exception caught", ex);
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/io/SimpleSinglePortInputOperator.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/io/SimpleSinglePortInputOperator.java
 
b/library/src/main/java/com/datatorrent/lib/io/SimpleSinglePortInputOperator.java
index 5bf31ff..12737ba 100644
--- 
a/library/src/main/java/com/datatorrent/lib/io/SimpleSinglePortInputOperator.java
+++ 
b/library/src/main/java/com/datatorrent/lib/io/SimpleSinglePortInputOperator.java
@@ -21,10 +21,13 @@ package com.datatorrent.lib.io;
 import java.util.Iterator;
 import java.util.concurrent.ArrayBlockingQueue;
 
-import com.datatorrent.api.*;
+import org.apache.commons.lang3.ClassUtils;
+
 import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.Operator;
 import com.datatorrent.common.util.BaseOperator;
-import org.apache.commons.lang3.ClassUtils;
 
 /**
  * This an input operator which passes data from an asynchronous data source 
to a port processing thread.
@@ -48,7 +51,7 @@ public abstract class SimpleSinglePortInputOperator<T> 
extends BaseOperator impl
    * The single output port of this input operator.
    * Collects asynchronously emitted tuples and flushes in container thread.
    */
-  final public transient BufferingOutputPort<T> outputPort;
+  public final transient BufferingOutputPort<T> outputPort;
 
   public SimpleSinglePortInputOperator(int portCapacity)
   {
@@ -61,7 +64,7 @@ public abstract class SimpleSinglePortInputOperator<T> 
extends BaseOperator impl
   }
 
   @Override
-  final public void activate(OperatorContext ctx)
+  public final void activate(OperatorContext ctx)
   {
     isActive = true;
     if (this instanceof Runnable) {
@@ -71,7 +74,7 @@ public abstract class SimpleSinglePortInputOperator<T> 
extends BaseOperator impl
   }
 
   @Override
-  final public void deactivate()
+  public final void deactivate()
   {
     isActive = false;
     if (ioThread != null) {
@@ -80,7 +83,7 @@ public abstract class SimpleSinglePortInputOperator<T> 
extends BaseOperator impl
     }
   }
 
-  final public boolean isActive()
+  public final boolean isActive()
   {
     return isActive;
   }
@@ -115,8 +118,7 @@ public abstract class SimpleSinglePortInputOperator<T> 
extends BaseOperator impl
     {
       try {
         tuples.put(tuple);
-      }
-      catch (InterruptedException ex) {
+      } catch (InterruptedException ex) {
         throw new RuntimeException(ex);
       }
     }
@@ -130,6 +132,6 @@ public abstract class SimpleSinglePortInputOperator<T> 
extends BaseOperator impl
       }
     }
 
-  };
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/io/SmtpOutputOperator.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/io/SmtpOutputOperator.java 
b/library/src/main/java/com/datatorrent/lib/io/SmtpOutputOperator.java
index 6477021..2d6ef86 100644
--- a/library/src/main/java/com/datatorrent/lib/io/SmtpOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/SmtpOutputOperator.java
@@ -18,24 +18,32 @@
  */
 package com.datatorrent.lib.io;
 
-import com.datatorrent.common.util.BaseOperator;
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.Context.OperatorContext;
-
-import java.util.*;
-
-import javax.mail.*;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Properties;
+
+import javax.mail.Authenticator;
+import javax.mail.Message;
+import javax.mail.MessagingException;
+import javax.mail.PasswordAuthentication;
+import javax.mail.Session;
+import javax.mail.Transport;
 import javax.mail.internet.InternetAddress;
 import javax.mail.internet.MimeMessage;
 import javax.validation.constraints.AssertTrue;
 import javax.validation.constraints.NotNull;
 
-import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.commons.lang.StringUtils;
+
 import com.google.common.collect.Maps;
 
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.common.util.BaseOperator;
+
 /**
  * This operator outputs data to an smtp server.
  * <p></p>
@@ -90,8 +98,7 @@ public class SmtpOutputOperator extends BaseOperator
         message.setContent(mailContent, contentType);
         LOG.info("Sending email for tuple {}", t.toString());
         Transport.send(message);
-      }
-      catch (MessagingException ex) {
+      } catch (MessagingException ex) {
         LOG.error("Something wrong with sending email.", ex);
       }
     }
@@ -264,8 +271,7 @@ public class SmtpOutputOperator extends BaseOperator
       }
       message.setSubject(subject);
       LOG.debug("all recipients {}", 
Arrays.toString(message.getAllRecipients()));
-    }
-    catch (MessagingException ex) {
+    } catch (MessagingException ex) {
       throw new RuntimeException(ex);
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java 
b/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java
index f805dcf..eae9e12 100644
--- a/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java
@@ -29,13 +29,12 @@ import org.codehaus.jackson.map.ObjectMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.commons.lang3.ClassUtils;
-
 import org.apache.apex.shaded.ning19.com.ning.http.client.AsyncHttpClient;
 import 
org.apache.apex.shaded.ning19.com.ning.http.client.AsyncHttpClientConfigBean;
 import org.apache.apex.shaded.ning19.com.ning.http.client.ws.WebSocket;
 import 
org.apache.apex.shaded.ning19.com.ning.http.client.ws.WebSocketTextListener;
 import 
org.apache.apex.shaded.ning19.com.ning.http.client.ws.WebSocketUpgradeHandler;
+import org.apache.commons.lang3.ClassUtils;
 
 import com.datatorrent.api.Context.OperatorContext;
 
@@ -60,8 +59,8 @@ public class WebSocketInputOperator<T> extends 
SimpleSinglePortInputOperator<T>
   //Do not make this @NotNull since null is a valid value for some child 
classes
   private URI uri;
   private transient AsyncHttpClient client;
-  private transient final JsonFactory jsonFactory = new JsonFactory();
-  protected transient final ObjectMapper mapper = new 
ObjectMapper(jsonFactory);
+  private final transient JsonFactory jsonFactory = new JsonFactory();
+  protected final transient ObjectMapper mapper = new 
ObjectMapper(jsonFactory);
   protected transient WebSocket connection;
   private transient boolean connectionClosed = false;
   private transient volatile boolean shutdown = false;
@@ -121,8 +120,7 @@ public class WebSocketInputOperator<T> extends 
SimpleSinglePortInputOperator<T>
       shutdown = false;
       monThread = new MonitorThread();
       monThread.start();
-    }
-    catch (Exception ex) {
+    } catch (Exception ex) {
       throw new RuntimeException(ex);
     }
   }
@@ -135,8 +133,7 @@ public class WebSocketInputOperator<T> extends 
SimpleSinglePortInputOperator<T>
       if (monThread != null) {
         monThread.join();
       }
-    }
-    catch (Exception ex) {
+    } catch (Exception ex) {
       LOG.error("Error joining monitor", ex);
     }
 
@@ -171,8 +168,8 @@ public class WebSocketInputOperator<T> extends 
SimpleSinglePortInputOperator<T>
             connection.close();
             WebSocketInputOperator.this.activate(null);
           }
-        }
-        catch (Exception ex) {
+        } catch (Exception ex) {
+          //swallowing exception
         }
       }
     }
@@ -213,11 +210,10 @@ public class WebSocketInputOperator<T> extends 
SimpleSinglePortInputOperator<T>
           LOG.debug("Got: " + string);
           try {
             T o = convertMessage(string);
-            if(!(skipNull && o == null)) {
+            if (!(skipNull && o == null)) {
               outputPort.emit(o);
             }
-          }
-          catch (IOException ex) {
+          } catch (IOException ex) {
             LOG.error("Got exception: ", ex);
           }
         }
@@ -242,8 +238,7 @@ public class WebSocketInputOperator<T> extends 
SimpleSinglePortInputOperator<T>
         }
 
       }).build()).get(5, TimeUnit.SECONDS);
-    }
-    catch (Exception ex) {
+    } catch (Exception ex) {
       LOG.error("Error reading from " + uri, ex);
       if (client != null) {
         client.close();

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/io/WebSocketOutputOperator.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/io/WebSocketOutputOperator.java 
b/library/src/main/java/com/datatorrent/lib/io/WebSocketOutputOperator.java
index b793183..382be67 100644
--- a/library/src/main/java/com/datatorrent/lib/io/WebSocketOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/WebSocketOutputOperator.java
@@ -38,9 +38,9 @@ import 
org.apache.apex.shaded.ning19.com.ning.http.client.ws.WebSocketTextListen
 import 
org.apache.apex.shaded.ning19.com.ning.http.client.ws.WebSocketUpgradeHandler;
 import org.apache.commons.lang3.ClassUtils;
 
-import com.datatorrent.common.util.BaseOperator;
 import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.common.util.BaseOperator;
 
 /**
  * Reads via WebSocket from given URL as input stream.&nbsp;Incoming data is 
interpreted as JSONObject and converted to {@link java.util.Map}.
@@ -59,8 +59,8 @@ public class WebSocketOutputOperator<T> extends BaseOperator
   //Do not make this @NotNull since null is a valid value for some child 
classes
   private URI uri;
   private transient AsyncHttpClient client;
-  private transient final JsonFactory jsonFactory = new JsonFactory();
-  protected transient final ObjectMapper mapper = new 
ObjectMapper(jsonFactory);
+  private final transient JsonFactory jsonFactory = new JsonFactory();
+  protected final transient ObjectMapper mapper = new 
ObjectMapper(jsonFactory);
   protected transient WebSocket connection;
   private int ioThreadMultiplier = 1;
   private int numRetries = 3;

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/io/WebSocketServerInputOperator.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/io/WebSocketServerInputOperator.java
 
b/library/src/main/java/com/datatorrent/lib/io/WebSocketServerInputOperator.java
index bf1b40e..2814b49 100644
--- 
a/library/src/main/java/com/datatorrent/lib/io/WebSocketServerInputOperator.java
+++ 
b/library/src/main/java/com/datatorrent/lib/io/WebSocketServerInputOperator.java
@@ -18,18 +18,20 @@
  */
 package com.datatorrent.lib.io;
 
-import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.api.InputOperator;
-import com.datatorrent.netlet.util.DTThrowable;
 import javax.servlet.http.HttpServletRequest;
 import javax.validation.constraints.Min;
 import javax.validation.constraints.NotNull;
+
 import org.eclipse.jetty.server.Server;
 import org.eclipse.jetty.servlet.ServletContextHandler;
 import org.eclipse.jetty.servlet.ServletHolder;
 import org.eclipse.jetty.websocket.WebSocket;
 import org.eclipse.jetty.websocket.WebSocketServlet;
 
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.netlet.util.DTThrowable;
+
 @org.apache.hadoop.classification.InterfaceStability.Evolving
 /**
  * @since 3.3.0
@@ -75,8 +77,7 @@ public abstract class WebSocketServerInputOperator implements 
InputOperator
 
     try {
       server.start();
-    }
-    catch(Exception ex) {
+    } catch (Exception ex) {
       DTThrowable.rethrow(ex);
     }
   }
@@ -86,8 +87,7 @@ public abstract class WebSocketServerInputOperator implements 
InputOperator
   {
     try {
       server.stop();
-    }
-    catch(Exception ex) {
+    } catch (Exception ex) {
       DTThrowable.rethrow(ex);
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/io/WidgetOutputOperator.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/io/WidgetOutputOperator.java 
b/library/src/main/java/com/datatorrent/lib/io/WidgetOutputOperator.java
index 5b6259c..b027b58 100644
--- a/library/src/main/java/com/datatorrent/lib/io/WidgetOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/WidgetOutputOperator.java
@@ -25,17 +25,16 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
 
-import com.google.common.collect.Maps;
-
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang3.tuple.MutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 
+import com.google.common.collect.Maps;
+
 import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.annotation.InputPortFieldAnnotation;
-
 import com.datatorrent.common.util.BaseOperator;
 import com.datatorrent.common.util.PubSubMessageCodec;
 
@@ -60,16 +59,15 @@ import com.datatorrent.common.util.PubSubMessageCodec;
 @org.apache.hadoop.classification.InterfaceStability.Evolving
 public class WidgetOutputOperator extends BaseOperator
 {
-  protected transient WebSocketOutputOperator<Pair<String, Object>> wsoo = new 
WebSocketOutputOperator<Pair<String,Object>>(){
-
+  protected transient WebSocketOutputOperator<Pair<String, Object>> wsoo = new 
WebSocketOutputOperator<Pair<String, Object>>()
+  {
     private transient PubSubMessageCodec<Object> codec = new 
PubSubMessageCodec<>(mapper);
 
     @Override
-    public String convertMapToMessage(Pair<String,Object> t) throws IOException
+    public String convertMapToMessage(Pair<String, Object> t) throws 
IOException
     {
       return PubSubMessageCodec.constructPublishMessage(t.getLeft(), 
t.getRight(), codec);
     }
-
   };
 
   protected transient ConsoleOutputOperator coo = new ConsoleOutputOperator();
@@ -99,31 +97,31 @@ public class WidgetOutputOperator extends BaseOperator
   /**
    * Tuples received on this input port will be sent to a Simple Widget for 
display.
    */
-  @InputPortFieldAnnotation(optional=true)
+  @InputPortFieldAnnotation(optional = true)
   public final transient SimpleInputPort simpleInput = new 
SimpleInputPort(this);
 
   /**
    * Tuples received on this input port will be sent to a Time Series Widget 
for display.
    */
-  @InputPortFieldAnnotation(optional=true)
+  @InputPortFieldAnnotation(optional = true)
   public final transient TimeseriesInputPort timeSeriesInput = new 
TimeseriesInputPort(this);
 
   /**
    * Tuples received on this input port will be sent to a Percentage Widget.
    */
-  @InputPortFieldAnnotation(optional=true)
+  @InputPortFieldAnnotation(optional = true)
   public final transient PercentageInputPort percentageInput = new 
PercentageInputPort(this);
 
   /**
    * Tuples received on this input port will be sent to a Top N Widget for 
display.
    */
-  @InputPortFieldAnnotation(optional=true)
+  @InputPortFieldAnnotation(optional = true)
   public final transient TopNInputPort topNInput = new TopNInputPort(this);
 
   /**
    * Tuples received on this input port will be sent to a Pie Chart Widget for 
display.
    */
-  @InputPortFieldAnnotation(optional=true)
+  @InputPortFieldAnnotation(optional = true)
   public final transient PiechartInputPort pieChartInput = new 
PiechartInputPort(this);
 
   protected transient boolean isWebSocketConnected = true;
@@ -132,7 +130,7 @@ public class WidgetOutputOperator extends BaseOperator
   public void setup(OperatorContext context)
   {
     String gatewayAddress = context.getValue(DAG.GATEWAY_CONNECT_ADDRESS);
-    if(!StringUtils.isEmpty(gatewayAddress)){
+    if (!StringUtils.isEmpty(gatewayAddress)) {
       wsoo.setUri(URI.create("ws://" + gatewayAddress + "/pubsub"));
       wsoo.setup(context);
     } else {
@@ -205,8 +203,8 @@ public class WidgetOutputOperator extends BaseOperator
 
   }
 
-  public static class TopNInputPort extends DefaultInputPort<HashMap<String, 
Number>>{
-
+  public static class TopNInputPort extends DefaultInputPort<HashMap<String, 
Number>>
+  {
     private final WidgetOutputOperator operator;
 
     public TopNInputPort(WidgetOutputOperator oper)

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
 
b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
index b5b20e4..bf1605d 100644
--- 
a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
+++ 
b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
@@ -96,8 +96,8 @@ import com.datatorrent.lib.util.KryoCloneUtils;
  * @param <T> The type of the object that this input operator reads.
  * @since 1.0.2
  */
-public abstract class AbstractFileInputOperator<T> implements InputOperator, 
Partitioner<AbstractFileInputOperator<T>>, StatsListener,
-  Operator.CheckpointListener
+public abstract class AbstractFileInputOperator<T>
+    implements InputOperator, Partitioner<AbstractFileInputOperator<T>>, 
StatsListener, Operator.CheckpointListener
 {
   private static final Logger LOG = 
LoggerFactory.getLogger(AbstractFileInputOperator.class);
 
@@ -110,11 +110,11 @@ public abstract class AbstractFileInputOperator<T> 
implements InputOperator, Par
   protected String currentFile;
   protected Set<String> processedFiles = new HashSet<String>();
   protected int emitBatchSize = 1000;
-  protected int currentPartitions = 1 ;
+  protected int currentPartitions = 1;
   protected int partitionCount = 1;
   private int retryCount = 0;
   private int maxRetryCount = 5;
-  transient protected int skipCount = 0;
+  protected transient int skipCount = 0;
   private transient OperatorContext context;
 
   private final BasicCounters<MutableLong> fileCounters = new 
BasicCounters<MutableLong>(MutableLong.class);
@@ -140,7 +140,8 @@ public abstract class AbstractFileInputOperator<T> 
implements InputOperator, Par
    * failed file is retried for maxRetryCount number of times, after that the 
file is
    * ignored.
    */
-  protected static class FailedFile {
+  protected static class FailedFile
+  {
     String path;
     int   offset;
     int    retryCount;
@@ -150,13 +151,15 @@ public abstract class AbstractFileInputOperator<T> 
implements InputOperator, Par
     @SuppressWarnings("unused")
     protected FailedFile() {}
 
-    protected FailedFile(String path, int offset) {
+    protected FailedFile(String path, int offset)
+    {
       this.path = path;
       this.offset = offset;
       this.retryCount = 0;
     }
 
-    protected FailedFile(String path, int offset, int retryCount) {
+    protected FailedFile(String path, int offset, int retryCount)
+    {
       this.path = path;
       this.offset = offset;
       this.retryCount = retryCount;
@@ -266,7 +269,7 @@ public abstract class AbstractFileInputOperator<T> 
implements InputOperator, Par
    * <p/>
    * @since 1.0.4
    */
-  public final static class FileCountersAggregator implements 
CountersAggregator, Serializable
+  public static final class FileCountersAggregator implements 
CountersAggregator, Serializable
   {
     private static final long serialVersionUID = 201409041428L;
     MutableLong totalLocalProcessedFiles = new MutableLong();
@@ -278,11 +281,11 @@ public abstract class AbstractFileInputOperator<T> 
implements InputOperator, Par
     @SuppressWarnings("unchecked")
     public Object aggregate(Collection<?> countersList)
     {
-      if(countersList.isEmpty()) {
+      if (countersList.isEmpty()) {
         return null;
       }
 
-      BasicCounters<MutableLong> tempFileCounters = 
(BasicCounters<MutableLong>) countersList.iterator().next();
+      BasicCounters<MutableLong> tempFileCounters = 
(BasicCounters<MutableLong>)countersList.iterator().next();
       MutableLong globalProcessedFiles = 
tempFileCounters.getCounter(FileCounters.GLOBAL_PROCESSED_FILES);
       MutableLong globalNumberOfFailures = 
tempFileCounters.getCounter(FileCounters.GLOBAL_NUMBER_OF_FAILURES);
       MutableLong globalNumberOfRetries = 
tempFileCounters.getCounter(FileCounters.GLOBAL_NUMBER_OF_RETRIES);
@@ -291,8 +294,8 @@ public abstract class AbstractFileInputOperator<T> 
implements InputOperator, Par
       totalLocalNumberOfFailures.setValue(0);
       totalLocalNumberOfRetries.setValue(0);
 
-      for(Object fileCounters: countersList) {
-        BasicCounters<MutableLong> basicFileCounters = 
(BasicCounters<MutableLong>) fileCounters;
+      for (Object fileCounters : countersList) {
+        BasicCounters<MutableLong> basicFileCounters = 
(BasicCounters<MutableLong>)fileCounters;
         
totalLocalProcessedFiles.add(basicFileCounters.getCounter(FileCounters.LOCAL_PROCESSED_FILES));
         
pendingFiles.add(basicFileCounters.getCounter(FileCounters.PENDING_FILES));
         
totalLocalNumberOfFailures.add(basicFileCounters.getCounter(FileCounters.LOCAL_NUMBER_OF_FAILURES));
@@ -441,25 +444,17 @@ public abstract class AbstractFileInputOperator<T> 
implements InputOperator, Par
       filePath = new Path(directory);
       configuration = new Configuration();
       fs = getFSInstance();
-    }
-    catch (IOException ex) {
+    } catch (IOException ex) {
       failureHandling(ex);
     }
 
-    fileCounters.setCounter(FileCounters.GLOBAL_PROCESSED_FILES,
-                            globalProcessedFileCount);
-    fileCounters.setCounter(FileCounters.LOCAL_PROCESSED_FILES,
-                            localProcessedFileCount);
-    fileCounters.setCounter(FileCounters.GLOBAL_NUMBER_OF_FAILURES,
-                            globalNumberOfFailures);
-    fileCounters.setCounter(FileCounters.LOCAL_NUMBER_OF_FAILURES,
-                            localNumberOfFailures);
-    fileCounters.setCounter(FileCounters.GLOBAL_NUMBER_OF_RETRIES,
-                            globalNumberOfRetries);
-    fileCounters.setCounter(FileCounters.LOCAL_NUMBER_OF_RETRIES,
-                            localNumberOfRetries);
-    fileCounters.setCounter(FileCounters.PENDING_FILES,
-                            pendingFileCount);
+    fileCounters.setCounter(FileCounters.GLOBAL_PROCESSED_FILES, 
globalProcessedFileCount);
+    fileCounters.setCounter(FileCounters.LOCAL_PROCESSED_FILES, 
localProcessedFileCount);
+    fileCounters.setCounter(FileCounters.GLOBAL_NUMBER_OF_FAILURES, 
globalNumberOfFailures);
+    fileCounters.setCounter(FileCounters.LOCAL_NUMBER_OF_FAILURES, 
localNumberOfFailures);
+    fileCounters.setCounter(FileCounters.GLOBAL_NUMBER_OF_RETRIES, 
globalNumberOfRetries);
+    fileCounters.setCounter(FileCounters.LOCAL_NUMBER_OF_RETRIES, 
localNumberOfRetries);
+    fileCounters.setCounter(FileCounters.PENDING_FILES, pendingFileCount);
 
     idempotentStorageManager.setup(context);
     if (context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) < 
idempotentStorageManager.getLargestRecoveryWindow()) {
@@ -487,11 +482,10 @@ public abstract class AbstractFileInputOperator<T> 
implements InputOperator, Par
     boolean fileFailed = false;
 
     try {
-      if(inputStream != null) {
+      if (inputStream != null) {
         inputStream.close();
       }
-    }
-    catch (IOException ex) {
+    } catch (IOException ex) {
       savedException = ex;
       fileFailed = true;
     }
@@ -500,20 +494,19 @@ public abstract class AbstractFileInputOperator<T> 
implements InputOperator, Par
 
     try {
       fs.close();
-    }
-    catch (IOException ex) {
+    } catch (IOException ex) {
       savedException = ex;
       fsFailed = true;
     }
 
-    if(savedException != null) {
+    if (savedException != null) {
       String errorMessage = "";
 
-      if(fileFailed) {
+      if (fileFailed) {
         errorMessage += "Failed to close " + currentFile + ". ";
       }
 
-      if(fsFailed) {
+      if (fsFailed) {
         errorMessage += "Failed to close filesystem.";
       }
 
@@ -537,18 +530,15 @@ public abstract class AbstractFileInputOperator<T> 
implements InputOperator, Par
     if (currentWindowId > idempotentStorageManager.getLargestRecoveryWindow()) 
{
       try {
         idempotentStorageManager.save(currentWindowRecoveryState, operatorId, 
currentWindowId);
-      }
-      catch (IOException e) {
+      } catch (IOException e) {
         throw new RuntimeException("saving recovery", e);
       }
     }
     currentWindowRecoveryState.clear();
-    if(context != null) {
-      pendingFileCount.setValue(pendingFiles.size() +
-                                     failedFiles.size() +
-                                     unfinishedFiles.size());
+    if (context != null) {
+      pendingFileCount.setValue(pendingFiles.size() + failedFiles.size() + 
unfinishedFiles.size());
 
-      if(currentFile != null) {
+      if (currentFile != null) {
         pendingFileCount.increment();
       }
 
@@ -567,7 +557,7 @@ public abstract class AbstractFileInputOperator<T> 
implements InputOperator, Par
 
       for (Object recovery : recoveryDataPerOperator.values()) {
         @SuppressWarnings("unchecked")
-        LinkedList<RecoveryEntry> recoveryData = (LinkedList<RecoveryEntry>) 
recovery;
+        LinkedList<RecoveryEntry> recoveryData = 
(LinkedList<RecoveryEntry>)recovery;
 
         for (RecoveryEntry recoveryEntry : recoveryData) {
           if (scanner.acceptFile(recoveryEntry.file)) {
@@ -606,8 +596,7 @@ public abstract class AbstractFileInputOperator<T> 
implements InputOperator, Par
                 offset++;
                 emit(line);
               }
-            }
-            else {
+            } else {
               while (offset < recoveryEntry.endOffset) {
                 T line = readEntity();
                 offset++;
@@ -617,8 +606,7 @@ public abstract class AbstractFileInputOperator<T> 
implements InputOperator, Par
           }
         }
       }
-    }
-    catch (IOException e) {
+    } catch (IOException e) {
       throw new RuntimeException("replay", e);
     }
   }
@@ -645,24 +633,20 @@ public abstract class AbstractFileInputOperator<T> 
implements InputOperator, Par
             offset = 0;
             skipCount = 0;
           }
-        }
-        else if (!unfinishedFiles.isEmpty()) {
+        } else if (!unfinishedFiles.isEmpty()) {
           retryFailedFile(unfinishedFiles.poll());
-        }
-        else if (!pendingFiles.isEmpty()) {
+        } else if (!pendingFiles.isEmpty()) {
           String newPathString = pendingFiles.iterator().next();
           pendingFiles.remove(newPathString);
-          if (fs.exists(new Path(newPathString)))
+          if (fs.exists(new Path(newPathString))) {
             this.inputStream = openFile(new Path(newPathString));
-        }
-        else if (!failedFiles.isEmpty()) {
+          }
+        } else if (!failedFiles.isEmpty()) {
           retryFailedFile(failedFiles.poll());
-        }
-        else {
+        } else {
           scanDirectory();
         }
-      }
-      catch (IOException ex) {
+      } catch (IOException ex) {
         failureHandling(ex);
       }
     }
@@ -687,13 +671,11 @@ public abstract class AbstractFileInputOperator<T> 
implements InputOperator, Par
           if (skipCount == 0) {
             offset++;
             emit(line);
-          }
-          else {
+          } else {
             skipCount--;
           }
         }
-      }
-      catch (IOException e) {
+      } catch (IOException e) {
         failureHandling(e);
       }
       //Only when something was emitted from the file then we record it for 
entry.
@@ -708,10 +690,10 @@ public abstract class AbstractFileInputOperator<T> 
implements InputOperator, Par
    */
   protected void scanDirectory()
   {
-    if(System.currentTimeMillis() - scanIntervalMillis >= lastScanMillis) {
+    if (System.currentTimeMillis() - scanIntervalMillis >= lastScanMillis) {
       Set<Path> newPaths = scanner.scan(fs, filePath, processedFiles);
 
-      for(Path newPath : newPaths) {
+      for (Path newPath : newPaths) {
         String newPathString = newPath.toString();
         pendingFiles.add(newPathString);
         processedFiles.add(newPathString);
@@ -729,27 +711,28 @@ public abstract class AbstractFileInputOperator<T> 
implements InputOperator, Par
   private void failureHandling(Exception e)
   {
     localNumberOfFailures.increment();
-    if(maxRetryCount <= 0) {
+    if (maxRetryCount <= 0) {
       throw new RuntimeException(e);
     }
     LOG.error("FS reader error", e);
     addToFailedList();
   }
 
-  protected void addToFailedList() {
-
+  protected void addToFailedList()
+  {
     FailedFile ff = new FailedFile(currentFile, offset, retryCount);
 
     try {
       // try to close file
-      if (this.inputStream != null)
+      if (this.inputStream != null) {
         this.inputStream.close();
-    } catch(IOException e) {
+      }
+    } catch (IOException e) {
       localNumberOfFailures.increment();
       LOG.error("Could not close input stream on: " + currentFile);
     }
 
-    ff.retryCount ++;
+    ff.retryCount++;
     ff.lastFailedTime = System.currentTimeMillis();
     ff.offset = this.offset;
 
@@ -757,8 +740,9 @@ public abstract class AbstractFileInputOperator<T> 
implements InputOperator, Par
     this.currentFile = null;
     this.inputStream = null;
 
-    if (ff.retryCount > maxRetryCount)
+    if (ff.retryCount > maxRetryCount) {
       return;
+    }
 
     localNumberOfRetries.increment();
     LOG.info("adding to failed list path {} offset {} retry {}", ff.path, 
ff.offset, ff.retryCount);
@@ -769,8 +753,9 @@ public abstract class AbstractFileInputOperator<T> 
implements InputOperator, Par
   {
     LOG.info("retrying failed file {} offset {} retry {}", ff.path, ff.offset, 
ff.retryCount);
     String path = ff.path;
-    if (!fs.exists(new Path(path)))
+    if (!fs.exists(new Path(path))) {
       return null;
+    }
     this.inputStream = openFile(new Path(path));
     this.offset = ff.offset;
     this.retryCount = ff.retryCount;
@@ -793,8 +778,9 @@ public abstract class AbstractFileInputOperator<T> 
implements InputOperator, Par
   {
     LOG.info("closing file {} offset {}", currentFile, offset);
 
-    if (is != null)
+    if (is != null) {
       is.close();
+    }
 
     currentFile = null;
     inputStream = null;
@@ -828,7 +814,7 @@ public abstract class AbstractFileInputOperator<T> 
implements InputOperator, Par
     List<String> totalPendingFiles = Lists.newLinkedList();
     Set<Integer> deletedOperators =  Sets.newHashSet();
 
-    for(Partition<AbstractFileInputOperator<T>> partition : partitions) {
+    for (Partition<AbstractFileInputOperator<T>> partition : partitions) {
       AbstractFileInputOperator<T> oper = partition.getPartitionedInstance();
       totalProcessedFiles.addAll(oper.processedFiles);
       totalFailedFiles.addAll(oper.failedFiles);
@@ -853,7 +839,7 @@ public abstract class AbstractFileInputOperator<T> 
implements InputOperator, Par
     Collection<IdempotentStorageManager> newManagers = 
Lists.newArrayListWithExpectedSize(totalCount);
 
     KryoCloneUtils<AbstractFileInputOperator<T>> cloneUtils = 
KryoCloneUtils.createCloneUtils(this);
-    for (int i=0; i<scanners.size(); i++) {
+    for (int i = 0; i < scanners.size(); i++) {
 
       @SuppressWarnings("unchecked")
       AbstractFileInputOperator<T> oper = cloneUtils.getClone();
@@ -873,7 +859,7 @@ public abstract class AbstractFileInputOperator<T> 
implements InputOperator, Par
       oper.currentFile = null;
       oper.offset = 0;
       Iterator<FailedFile> unfinishedIter = currentFiles.iterator();
-      while(unfinishedIter.hasNext()) {
+      while (unfinishedIter.hasNext()) {
         FailedFile unfinishedFile = unfinishedIter.next();
         if (scn.acceptFile(unfinishedFile.path)) {
           oper.unfinishedFiles.add(unfinishedFile);
@@ -895,9 +881,9 @@ public abstract class AbstractFileInputOperator<T> 
implements InputOperator, Par
       /* redistribute pending files properly */
       oper.pendingFiles.clear();
       Iterator<String> pendingFilesIterator = totalPendingFiles.iterator();
-      while(pendingFilesIterator.hasNext()) {
+      while (pendingFilesIterator.hasNext()) {
         String pathString = pendingFilesIterator.next();
-        if(scn.acceptFile(pathString)) {
+        if (scn.acceptFile(pathString)) {
           oper.pendingFiles.add(pathString);
           pendingFilesIterator.remove();
         }
@@ -932,8 +918,7 @@ public abstract class AbstractFileInputOperator<T> 
implements InputOperator, Par
   {
     try {
       idempotentStorageManager.deleteUpTo(operatorId, windowId);
-    }
-    catch (IOException e) {
+    } catch (IOException e) {
       throw new RuntimeException(e);
     }
   }
@@ -941,16 +926,18 @@ public abstract class AbstractFileInputOperator<T> 
implements InputOperator, Par
   /**
    * Read the next item from the stream. Depending on the type of stream, this 
could be a byte array, line or object.
    * Upon return of null, the stream will be considered fully consumed.
-   * @throws IOException
+   *
    * @return Depending on the type of stream an object is returned. When null 
is returned the stream is consumed.
+   * @throws IOException
    */
-  abstract protected T readEntity() throws IOException;
+  protected abstract T readEntity() throws IOException;
 
   /**
    * Emit the tuple on the port
+   *
    * @param tuple
    */
-  abstract protected void emit(T tuple);
+  protected abstract void emit(T tuple);
 
 
   /**
@@ -1017,17 +1004,21 @@ public abstract class AbstractFileInputOperator<T> 
implements InputOperator, Par
       this.regex = null;
     }
 
-    public int getPartitionCount() {
+    public int getPartitionCount()
+    {
       return partitionCount;
     }
 
-    public int getPartitionIndex() {
+    public int getPartitionIndex()
+    {
       return partitionIndex;
     }
 
-    protected Pattern getRegex() {
-      if (this.regex == null && this.filePatternRegexp != null)
+    protected Pattern getRegex()
+    {
+      if (this.regex == null && this.filePatternRegexp != null) {
         this.regex = Pattern.compile(this.filePatternRegexp);
+      }
       return this.regex;
     }
 
@@ -1037,8 +1028,7 @@ public abstract class AbstractFileInputOperator<T> 
implements InputOperator, Par
       try {
         LOG.debug("Scanning {} with pattern {}", filePath, 
this.filePatternRegexp);
         FileStatus[] files = fs.listStatus(filePath);
-        for (FileStatus status : files)
-        {
+        for (FileStatus status : files) {
           Path path = status.getPath();
           String filePathStr = path.toString();
 
@@ -1081,8 +1071,7 @@ public abstract class AbstractFileInputOperator<T> 
implements InputOperator, Par
         }
       }
       Pattern regex = this.getRegex();
-      if (regex != null)
-      {
+      if (regex != null) {
         Matcher matcher = regex.matcher(filePathStr);
         if (!matcher.matches()) {
           return false;
@@ -1094,13 +1083,14 @@ public abstract class AbstractFileInputOperator<T> 
implements InputOperator, Par
     public List<DirectoryScanner> partition(int count)
     {
       ArrayList<DirectoryScanner> partitions = 
Lists.newArrayListWithExpectedSize(count);
-      for (int i=0; i<count; i++) {
+      for (int i = 0; i < count; i++) {
         partitions.add(this.createPartition(i, count));
       }
       return partitions;
     }
 
-    public List<DirectoryScanner>  partition(int count , 
@SuppressWarnings("unused") Collection<DirectoryScanner> scanners) {
+    public List<DirectoryScanner>  partition(int count, 
@SuppressWarnings("unused") Collection<DirectoryScanner> scanners)
+    {
       return partition(count);
     }
 
@@ -1153,7 +1143,7 @@ public abstract class AbstractFileInputOperator<T> 
implements InputOperator, Par
         return false;
       }
 
-      RecoveryEntry that = (RecoveryEntry) o;
+      RecoveryEntry that = (RecoveryEntry)o;
 
       if (endOffset != that.endOffset) {
         return false;
@@ -1192,7 +1182,7 @@ public abstract class AbstractFileInputOperator<T> 
implements InputOperator, Par
    */
   public static class FileLineInputOperator extends 
AbstractFileInputOperator<String>
   {
-    public transient final DefaultOutputPort<String> output = new 
DefaultOutputPort<String>();
+    public final transient DefaultOutputPort<String> output = new 
DefaultOutputPort<String>();
 
     protected transient BufferedReader br;
 

Reply via email to