Repository: spark
Updated Branches:
  refs/heads/master e47408814 -> 20fd25410


http://git-wip-us.apache.org/repos/asf/spark/blob/20fd2541/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
index c462ab1..b6fa9a0 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
@@ -115,20 +115,20 @@ public final class ColumnarBatch {
      * Marks this row as being filtered out. This means a subsequent iteration over the rows
      * in this batch will not include this row.
      */
-    public final void markFiltered() {
+    public void markFiltered() {
       parent.markFiltered(rowId);
     }
 
     public ColumnVector[] columns() { return columns; }
 
     @Override
-    public final int numFields() { return columns.length; }
+    public int numFields() { return columns.length; }
 
     @Override
     /**
      * Revisit this. This is expensive. This is currently only used in test paths.
      */
-    public final InternalRow copy() {
+    public InternalRow copy() {
       GenericMutableRow row = new GenericMutableRow(columns.length);
       for (int i = 0; i < numFields(); i++) {
         if (isNullAt(i)) {
@@ -163,73 +163,73 @@ public final class ColumnarBatch {
     }
 
     @Override
-    public final boolean anyNull() {
+    public boolean anyNull() {
       throw new NotImplementedException();
     }
 
     @Override
-    public final boolean isNullAt(int ordinal) { return columns[ordinal].getIsNull(rowId); }
+    public boolean isNullAt(int ordinal) { return columns[ordinal].getIsNull(rowId); }
 
     @Override
-    public final boolean getBoolean(int ordinal) { return columns[ordinal].getBoolean(rowId); }
+    public boolean getBoolean(int ordinal) { return columns[ordinal].getBoolean(rowId); }
 
     @Override
-    public final byte getByte(int ordinal) { return columns[ordinal].getByte(rowId); }
+    public byte getByte(int ordinal) { return columns[ordinal].getByte(rowId); }
 
     @Override
-    public final short getShort(int ordinal) { return columns[ordinal].getShort(rowId); }
+    public short getShort(int ordinal) { return columns[ordinal].getShort(rowId); }
 
     @Override
-    public final int getInt(int ordinal) { return columns[ordinal].getInt(rowId); }
+    public int getInt(int ordinal) { return columns[ordinal].getInt(rowId); }
 
     @Override
-    public final long getLong(int ordinal) { return columns[ordinal].getLong(rowId); }
+    public long getLong(int ordinal) { return columns[ordinal].getLong(rowId); }
 
     @Override
-    public final float getFloat(int ordinal) { return columns[ordinal].getFloat(rowId); }
+    public float getFloat(int ordinal) { return columns[ordinal].getFloat(rowId); }
 
     @Override
-    public final double getDouble(int ordinal) { return columns[ordinal].getDouble(rowId); }
+    public double getDouble(int ordinal) { return columns[ordinal].getDouble(rowId); }
 
     @Override
-    public final Decimal getDecimal(int ordinal, int precision, int scale) {
+    public Decimal getDecimal(int ordinal, int precision, int scale) {
       return columns[ordinal].getDecimal(rowId, precision, scale);
     }
 
     @Override
-    public final UTF8String getUTF8String(int ordinal) {
+    public UTF8String getUTF8String(int ordinal) {
       return columns[ordinal].getUTF8String(rowId);
     }
 
     @Override
-    public final byte[] getBinary(int ordinal) {
+    public byte[] getBinary(int ordinal) {
       return columns[ordinal].getBinary(rowId);
     }
 
     @Override
-    public final CalendarInterval getInterval(int ordinal) {
+    public CalendarInterval getInterval(int ordinal) {
       final int months = columns[ordinal].getChildColumn(0).getInt(rowId);
      final long microseconds = columns[ordinal].getChildColumn(1).getLong(rowId);
       return new CalendarInterval(months, microseconds);
     }
 
     @Override
-    public final InternalRow getStruct(int ordinal, int numFields) {
+    public InternalRow getStruct(int ordinal, int numFields) {
       return columns[ordinal].getStruct(rowId);
     }
 
     @Override
-    public final ArrayData getArray(int ordinal) {
+    public ArrayData getArray(int ordinal) {
       return columns[ordinal].getArray(rowId);
     }
 
     @Override
-    public final MapData getMap(int ordinal) {
+    public MapData getMap(int ordinal) {
       throw new NotImplementedException();
     }
 
     @Override
-    public final Object get(int ordinal, DataType dataType) {
+    public Object get(int ordinal, DataType dataType) {
       throw new NotImplementedException();
     }
   }
@@ -357,7 +357,7 @@ public final class ColumnarBatch {
   * Marks this row as being filtered out. This means a subsequent iteration over the rows
    * in this batch will not include this row.
    */
-  public final void markFiltered(int rowId) {
+  public void markFiltered(int rowId) {
     assert(!filteredRows[rowId]);
     filteredRows[rowId] = true;
     ++numRowsFiltered;
@@ -367,7 +367,7 @@ public final class ColumnarBatch {
   * Marks a given column as non-nullable. Any row that has a NULL value for the corresponding
    * attribute is filtered out.
    */
-  public final void filterNullsInColumn(int ordinal) {
+  public void filterNullsInColumn(int ordinal) {
     nullFilteredColumns.add(ordinal);
   }
 

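The recurring change in this file, and in the two column vector implementations
below, is removing the explicit "final" modifier from members of classes that are
themselves declared final. The modifier is redundant there: a final class cannot be
subclassed, so none of its instance methods can be overridden anyway. A minimal
sketch of why (class and method names are illustrative, not taken from this patch):

    // A final class admits no subclasses, so its instance methods behave as
    // final whether or not they carry the modifier explicitly.
    public final class Holder {
      private final int value;

      public Holder(int value) { this.value = value; }

      // Writing "public final int get()" here would add no guarantee.
      public int get() { return value; }
    }

    // class Wider extends Holder {}  // does not compile: Holder is final
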
http://git-wip-us.apache.org/repos/asf/spark/blob/20fd2541/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
index b06b7f2..d5a9163 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
@@ -52,7 +52,7 @@ public final class OffHeapColumnVector extends ColumnVector {
   }
 
   @Override
-  public final long valuesNativeAddress() {
+  public long valuesNativeAddress() {
     return data;
   }
 
@@ -62,7 +62,7 @@ public final class OffHeapColumnVector extends ColumnVector {
   }
 
   @Override
-  public final void close() {
+  public void close() {
     Platform.freeMemory(nulls);
     Platform.freeMemory(data);
     Platform.freeMemory(lengthData);
@@ -78,19 +78,19 @@ public final class OffHeapColumnVector extends ColumnVector {
   //
 
   @Override
-  public final void putNotNull(int rowId) {
+  public void putNotNull(int rowId) {
     Platform.putByte(null, nulls + rowId, (byte) 0);
   }
 
   @Override
-  public final void putNull(int rowId) {
+  public void putNull(int rowId) {
     Platform.putByte(null, nulls + rowId, (byte) 1);
     ++numNulls;
     anyNullsSet = true;
   }
 
   @Override
-  public final void putNulls(int rowId, int count) {
+  public void putNulls(int rowId, int count) {
     long offset = nulls + rowId;
     for (int i = 0; i < count; ++i, ++offset) {
       Platform.putByte(null, offset, (byte) 1);
@@ -100,7 +100,7 @@ public final class OffHeapColumnVector extends ColumnVector {
   }
 
   @Override
-  public final void putNotNulls(int rowId, int count) {
+  public void putNotNulls(int rowId, int count) {
     if (!anyNullsSet) return;
     long offset = nulls + rowId;
     for (int i = 0; i < count; ++i, ++offset) {
@@ -109,7 +109,7 @@ public final class OffHeapColumnVector extends ColumnVector {
   }
 
   @Override
-  public final boolean getIsNull(int rowId) {
+  public boolean getIsNull(int rowId) {
     return Platform.getByte(null, nulls + rowId) == 1;
   }
 
@@ -118,12 +118,12 @@ public final class OffHeapColumnVector extends ColumnVector {
   //
 
   @Override
-  public final void putBoolean(int rowId, boolean value) {
+  public void putBoolean(int rowId, boolean value) {
     Platform.putByte(null, data + rowId, (byte)((value) ? 1 : 0));
   }
 
   @Override
-  public final void putBooleans(int rowId, int count, boolean value) {
+  public void putBooleans(int rowId, int count, boolean value) {
     byte v = (byte)((value) ? 1 : 0);
     for (int i = 0; i < count; ++i) {
       Platform.putByte(null, data + rowId + i, v);
@@ -131,32 +131,32 @@ public final class OffHeapColumnVector extends ColumnVector {
   }
 
   @Override
-  public final boolean getBoolean(int rowId) { return Platform.getByte(null, data + rowId) == 1; }
+  public boolean getBoolean(int rowId) { return Platform.getByte(null, data + rowId) == 1; }
 
   //
   // APIs dealing with Bytes
   //
 
   @Override
-  public final void putByte(int rowId, byte value) {
+  public void putByte(int rowId, byte value) {
     Platform.putByte(null, data + rowId, value);
 
   }
 
   @Override
-  public final void putBytes(int rowId, int count, byte value) {
+  public void putBytes(int rowId, int count, byte value) {
     for (int i = 0; i < count; ++i) {
       Platform.putByte(null, data + rowId + i, value);
     }
   }
 
   @Override
-  public final void putBytes(int rowId, int count, byte[] src, int srcIndex) {
+  public void putBytes(int rowId, int count, byte[] src, int srcIndex) {
    Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex, null, data + rowId, count);
   }
 
   @Override
-  public final byte getByte(int rowId) {
+  public byte getByte(int rowId) {
     if (dictionary == null) {
       return Platform.getByte(null, data + rowId);
     } else {
@@ -169,12 +169,12 @@ public final class OffHeapColumnVector extends ColumnVector {
   //
 
   @Override
-  public final void putShort(int rowId, short value) {
+  public void putShort(int rowId, short value) {
     Platform.putShort(null, data + 2 * rowId, value);
   }
 
   @Override
-  public final void putShorts(int rowId, int count, short value) {
+  public void putShorts(int rowId, int count, short value) {
     long offset = data + 2 * rowId;
    for (int i = 0; i < count; ++i, offset += 2) {
       Platform.putShort(null, offset, value);
@@ -182,13 +182,13 @@ public final class OffHeapColumnVector extends ColumnVector {
   }
 
   @Override
-  public final void putShorts(int rowId, int count, short[] src, int srcIndex) {
+  public void putShorts(int rowId, int count, short[] src, int srcIndex) {
     Platform.copyMemory(src, Platform.SHORT_ARRAY_OFFSET + srcIndex * 2,
         null, data + 2 * rowId, count * 2);
   }
 
   @Override
-  public final short getShort(int rowId) {
+  public short getShort(int rowId) {
     if (dictionary == null) {
       return Platform.getShort(null, data + 2 * rowId);
     } else {
@@ -201,12 +201,12 @@ public final class OffHeapColumnVector extends ColumnVector {
   //
 
   @Override
-  public final void putInt(int rowId, int value) {
+  public void putInt(int rowId, int value) {
     Platform.putInt(null, data + 4 * rowId, value);
   }
 
   @Override
-  public final void putInts(int rowId, int count, int value) {
+  public void putInts(int rowId, int count, int value) {
     long offset = data + 4 * rowId;
     for (int i = 0; i < count; ++i, offset += 4) {
       Platform.putInt(null, offset, value);
@@ -214,19 +214,19 @@ public final class OffHeapColumnVector extends ColumnVector {
   }
 
   @Override
-  public final void putInts(int rowId, int count, int[] src, int srcIndex) {
+  public void putInts(int rowId, int count, int[] src, int srcIndex) {
     Platform.copyMemory(src, Platform.INT_ARRAY_OFFSET + srcIndex * 4,
         null, data + 4 * rowId, count * 4);
   }
 
   @Override
-  public final void putIntsLittleEndian(int rowId, int count, byte[] src, int srcIndex) {
+  public void putIntsLittleEndian(int rowId, int count, byte[] src, int srcIndex) {
     Platform.copyMemory(src, srcIndex + Platform.BYTE_ARRAY_OFFSET,
         null, data + 4 * rowId, count * 4);
   }
 
   @Override
-  public final int getInt(int rowId) {
+  public int getInt(int rowId) {
     if (dictionary == null) {
       return Platform.getInt(null, data + 4 * rowId);
     } else {
@@ -239,12 +239,12 @@ public final class OffHeapColumnVector extends ColumnVector {
   //
 
   @Override
-  public final void putLong(int rowId, long value) {
+  public void putLong(int rowId, long value) {
     Platform.putLong(null, data + 8 * rowId, value);
   }
 
   @Override
-  public final void putLongs(int rowId, int count, long value) {
+  public void putLongs(int rowId, int count, long value) {
     long offset = data + 8 * rowId;
     for (int i = 0; i < count; ++i, offset += 8) {
       Platform.putLong(null, offset, value);
@@ -252,19 +252,19 @@ public final class OffHeapColumnVector extends ColumnVector {
   }
 
   @Override
-  public final void putLongs(int rowId, int count, long[] src, int srcIndex) {
+  public void putLongs(int rowId, int count, long[] src, int srcIndex) {
     Platform.copyMemory(src, Platform.LONG_ARRAY_OFFSET + srcIndex * 8,
         null, data + 8 * rowId, count * 8);
   }
 
   @Override
-  public final void putLongsLittleEndian(int rowId, int count, byte[] src, int srcIndex) {
+  public void putLongsLittleEndian(int rowId, int count, byte[] src, int srcIndex) {
     Platform.copyMemory(src, srcIndex + Platform.BYTE_ARRAY_OFFSET,
         null, data + 8 * rowId, count * 8);
   }
 
   @Override
-  public final long getLong(int rowId) {
+  public long getLong(int rowId) {
     if (dictionary == null) {
       return Platform.getLong(null, data + 8 * rowId);
     } else {
@@ -277,12 +277,12 @@ public final class OffHeapColumnVector extends ColumnVector {
   //
 
   @Override
-  public final void putFloat(int rowId, float value) {
+  public void putFloat(int rowId, float value) {
     Platform.putFloat(null, data + rowId * 4, value);
   }
 
   @Override
-  public final void putFloats(int rowId, int count, float value) {
+  public void putFloats(int rowId, int count, float value) {
     long offset = data + 4 * rowId;
     for (int i = 0; i < count; ++i, offset += 4) {
       Platform.putFloat(null, offset, value);
@@ -290,19 +290,19 @@ public final class OffHeapColumnVector extends ColumnVector {
   }
 
   @Override
-  public final void putFloats(int rowId, int count, float[] src, int srcIndex) {
+  public void putFloats(int rowId, int count, float[] src, int srcIndex) {
     Platform.copyMemory(src, Platform.FLOAT_ARRAY_OFFSET + srcIndex * 4,
         null, data + 4 * rowId, count * 4);
   }
 
   @Override
-  public final void putFloats(int rowId, int count, byte[] src, int srcIndex) {
+  public void putFloats(int rowId, int count, byte[] src, int srcIndex) {
     Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex,
         null, data + rowId * 4, count * 4);
   }
 
   @Override
-  public final float getFloat(int rowId) {
+  public float getFloat(int rowId) {
     if (dictionary == null) {
       return Platform.getFloat(null, data + rowId * 4);
     } else {
@@ -316,12 +316,12 @@ public final class OffHeapColumnVector extends ColumnVector {
   //
 
   @Override
-  public final void putDouble(int rowId, double value) {
+  public void putDouble(int rowId, double value) {
     Platform.putDouble(null, data + rowId * 8, value);
   }
 
   @Override
-  public final void putDoubles(int rowId, int count, double value) {
+  public void putDoubles(int rowId, int count, double value) {
     long offset = data + 8 * rowId;
     for (int i = 0; i < count; ++i, offset += 8) {
       Platform.putDouble(null, offset, value);
@@ -329,19 +329,19 @@ public final class OffHeapColumnVector extends ColumnVector {
   }
 
   @Override
-  public final void putDoubles(int rowId, int count, double[] src, int srcIndex) {
+  public void putDoubles(int rowId, int count, double[] src, int srcIndex) {
     Platform.copyMemory(src, Platform.DOUBLE_ARRAY_OFFSET + srcIndex * 8,
       null, data + 8 * rowId, count * 8);
   }
 
   @Override
-  public final void putDoubles(int rowId, int count, byte[] src, int srcIndex) {
+  public void putDoubles(int rowId, int count, byte[] src, int srcIndex) {
     Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex,
         null, data + rowId * 8, count * 8);
   }
 
   @Override
-  public final double getDouble(int rowId) {
+  public double getDouble(int rowId) {
     if (dictionary == null) {
       return Platform.getDouble(null, data + rowId * 8);
     } else {
@@ -353,25 +353,25 @@ public final class OffHeapColumnVector extends ColumnVector {
   // APIs dealing with Arrays.
   //
   @Override
-  public final void putArray(int rowId, int offset, int length) {
+  public void putArray(int rowId, int offset, int length) {
     assert(offset >= 0 && offset + length <= childColumns[0].capacity);
     Platform.putInt(null, lengthData + 4 * rowId, length);
     Platform.putInt(null, offsetData + 4 * rowId, offset);
   }
 
   @Override
-  public final int getArrayLength(int rowId) {
+  public int getArrayLength(int rowId) {
     return Platform.getInt(null, lengthData + 4 * rowId);
   }
 
   @Override
-  public final int getArrayOffset(int rowId) {
+  public int getArrayOffset(int rowId) {
     return Platform.getInt(null, offsetData + 4 * rowId);
   }
 
   // APIs dealing with ByteArrays
   @Override
-  public final int putByteArray(int rowId, byte[] value, int offset, int length) {
+  public int putByteArray(int rowId, byte[] value, int offset, int length) {
     int result = arrayData().appendBytes(length, value, offset);
     Platform.putInt(null, lengthData + 4 * rowId, length);
     Platform.putInt(null, offsetData + 4 * rowId, result);
@@ -379,7 +379,7 @@ public final class OffHeapColumnVector extends ColumnVector {
   }
 
   @Override
-  public final void loadBytes(ColumnVector.Array array) {
+  public void loadBytes(ColumnVector.Array array) {
    if (array.tmpByteArray.length < array.length) array.tmpByteArray = new byte[array.length];
     Platform.copyMemory(
        null, data + array.offset, array.tmpByteArray, Platform.BYTE_ARRAY_OFFSET, array.length);
@@ -388,12 +388,12 @@ public final class OffHeapColumnVector extends ColumnVector {
   }
 
   @Override
-  public final void reserve(int requiredCapacity) {
+  public void reserve(int requiredCapacity) {
     if (requiredCapacity > capacity) reserveInternal(requiredCapacity * 2);
   }
 
   // Split out the slow path.
-  private final void reserveInternal(int newCapacity) {
+  private void reserveInternal(int newCapacity) {
     if (this.resultArray != null) {
       this.lengthData =
          Platform.reallocateMemory(lengthData, elementsAppended * 4, newCapacity * 4);

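A note on the reserve/reserveInternal pair kept intact in the hunk above: it is a
deliberate fast-path/slow-path split. The capacity check stays tiny so the JIT can
inline it at every call site, while the doubling reallocation lives in its own
method. A rough sketch of the idiom (the int[] backing store and field names are
illustrative, not from this patch):

    final class GrowableIntVector {
      private int[] values = new int[16];
      private int capacity = 16;

      // Fast path: a cheap bounds check, small enough to inline everywhere.
      public void reserve(int requiredCapacity) {
        if (requiredCapacity > capacity) reserveInternal(requiredCapacity * 2);
      }

      // Slow path: grow-by-doubling reallocation, split out of the hot path.
      private void reserveInternal(int newCapacity) {
        values = java.util.Arrays.copyOf(values, newCapacity);
        capacity = newCapacity;
      }
    }
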
http://git-wip-us.apache.org/repos/asf/spark/blob/20fd2541/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
index 03160d1..5b671a7 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
@@ -52,16 +52,16 @@ public final class OnHeapColumnVector extends ColumnVector {
   }
 
   @Override
-  public final long valuesNativeAddress() {
+  public long valuesNativeAddress() {
     throw new RuntimeException("Cannot get native address for on heap column");
   }
   @Override
-  public final long nullsNativeAddress() {
+  public long nullsNativeAddress() {
     throw new RuntimeException("Cannot get native address for on heap column");
   }
 
   @Override
-  public final void close() {
+  public void close() {
   }
 
   //
@@ -69,19 +69,19 @@ public final class OnHeapColumnVector extends ColumnVector {
   //
 
   @Override
-  public final void putNotNull(int rowId) {
+  public void putNotNull(int rowId) {
     nulls[rowId] = (byte)0;
   }
 
   @Override
-  public final void putNull(int rowId) {
+  public void putNull(int rowId) {
     nulls[rowId] = (byte)1;
     ++numNulls;
     anyNullsSet = true;
   }
 
   @Override
-  public final void putNulls(int rowId, int count) {
+  public void putNulls(int rowId, int count) {
     for (int i = 0; i < count; ++i) {
       nulls[rowId + i] = (byte)1;
     }
@@ -90,7 +90,7 @@ public final class OnHeapColumnVector extends ColumnVector {
   }
 
   @Override
-  public final void putNotNulls(int rowId, int count) {
+  public void putNotNulls(int rowId, int count) {
     if (!anyNullsSet) return;
     for (int i = 0; i < count; ++i) {
       nulls[rowId + i] = (byte)0;
@@ -98,7 +98,7 @@ public final class OnHeapColumnVector extends ColumnVector {
   }
 
   @Override
-  public final boolean getIsNull(int rowId) {
+  public boolean getIsNull(int rowId) {
     return nulls[rowId] == 1;
   }
 
@@ -107,12 +107,12 @@ public final class OnHeapColumnVector extends ColumnVector {
   //
 
   @Override
-  public final void putBoolean(int rowId, boolean value) {
+  public void putBoolean(int rowId, boolean value) {
     byteData[rowId] = (byte)((value) ? 1 : 0);
   }
 
   @Override
-  public final void putBooleans(int rowId, int count, boolean value) {
+  public void putBooleans(int rowId, int count, boolean value) {
     byte v = (byte)((value) ? 1 : 0);
     for (int i = 0; i < count; ++i) {
       byteData[i + rowId] = v;
@@ -120,7 +120,7 @@ public final class OnHeapColumnVector extends ColumnVector {
   }
 
   @Override
-  public final boolean getBoolean(int rowId) {
+  public boolean getBoolean(int rowId) {
     return byteData[rowId] == 1;
   }
 
@@ -131,24 +131,24 @@ public final class OnHeapColumnVector extends ColumnVector {
   //
 
   @Override
-  public final void putByte(int rowId, byte value) {
+  public void putByte(int rowId, byte value) {
     byteData[rowId] = value;
   }
 
   @Override
-  public final void putBytes(int rowId, int count, byte value) {
+  public void putBytes(int rowId, int count, byte value) {
     for (int i = 0; i < count; ++i) {
       byteData[i + rowId] = value;
     }
   }
 
   @Override
-  public final void putBytes(int rowId, int count, byte[] src, int srcIndex) {
+  public void putBytes(int rowId, int count, byte[] src, int srcIndex) {
     System.arraycopy(src, srcIndex, byteData, rowId, count);
   }
 
   @Override
-  public final byte getByte(int rowId) {
+  public byte getByte(int rowId) {
     if (dictionary == null) {
       return byteData[rowId];
     } else {
@@ -161,24 +161,24 @@ public final class OnHeapColumnVector extends ColumnVector {
   //
 
   @Override
-  public final void putShort(int rowId, short value) {
+  public void putShort(int rowId, short value) {
     shortData[rowId] = value;
   }
 
   @Override
-  public final void putShorts(int rowId, int count, short value) {
+  public void putShorts(int rowId, int count, short value) {
     for (int i = 0; i < count; ++i) {
       shortData[i + rowId] = value;
     }
   }
 
   @Override
-  public final void putShorts(int rowId, int count, short[] src, int srcIndex) {
+  public void putShorts(int rowId, int count, short[] src, int srcIndex) {
     System.arraycopy(src, srcIndex, shortData, rowId, count);
   }
 
   @Override
-  public final short getShort(int rowId) {
+  public short getShort(int rowId) {
     if (dictionary == null) {
       return shortData[rowId];
     } else {
@@ -192,24 +192,24 @@ public final class OnHeapColumnVector extends ColumnVector {
   //
 
   @Override
-  public final void putInt(int rowId, int value) {
+  public void putInt(int rowId, int value) {
     intData[rowId] = value;
   }
 
   @Override
-  public final void putInts(int rowId, int count, int value) {
+  public void putInts(int rowId, int count, int value) {
     for (int i = 0; i < count; ++i) {
       intData[i + rowId] = value;
     }
   }
 
   @Override
-  public final void putInts(int rowId, int count, int[] src, int srcIndex) {
+  public void putInts(int rowId, int count, int[] src, int srcIndex) {
     System.arraycopy(src, srcIndex, intData, rowId, count);
   }
 
   @Override
-  public final void putIntsLittleEndian(int rowId, int count, byte[] src, int srcIndex) {
+  public void putIntsLittleEndian(int rowId, int count, byte[] src, int srcIndex) {
     int srcOffset = srcIndex + Platform.BYTE_ARRAY_OFFSET;
     for (int i = 0; i < count; ++i) {
      intData[i + rowId] = Platform.getInt(src, srcOffset);
@@ -219,7 +219,7 @@ public final class OnHeapColumnVector extends ColumnVector {
   }
 
   @Override
-  public final int getInt(int rowId) {
+  public int getInt(int rowId) {
     if (dictionary == null) {
       return intData[rowId];
     } else {
@@ -232,24 +232,24 @@ public final class OnHeapColumnVector extends ColumnVector {
   //
 
   @Override
-  public final void putLong(int rowId, long value) {
+  public void putLong(int rowId, long value) {
     longData[rowId] = value;
   }
 
   @Override
-  public final void putLongs(int rowId, int count, long value) {
+  public void putLongs(int rowId, int count, long value) {
     for (int i = 0; i < count; ++i) {
       longData[i + rowId] = value;
     }
   }
 
   @Override
-  public final void putLongs(int rowId, int count, long[] src, int srcIndex) {
+  public void putLongs(int rowId, int count, long[] src, int srcIndex) {
     System.arraycopy(src, srcIndex, longData, rowId, count);
   }
 
   @Override
-  public final void putLongsLittleEndian(int rowId, int count, byte[] src, int srcIndex) {
+  public void putLongsLittleEndian(int rowId, int count, byte[] src, int srcIndex) {
     int srcOffset = srcIndex + Platform.BYTE_ARRAY_OFFSET;
     for (int i = 0; i < count; ++i) {
       longData[i + rowId] = Platform.getLong(src, srcOffset);
@@ -259,7 +259,7 @@ public final class OnHeapColumnVector extends ColumnVector {
   }
 
   @Override
-  public final long getLong(int rowId) {
+  public long getLong(int rowId) {
     if (dictionary == null) {
       return longData[rowId];
     } else {
@@ -272,26 +272,26 @@ public final class OnHeapColumnVector extends ColumnVector {
   //
 
   @Override
-  public final void putFloat(int rowId, float value) { floatData[rowId] = value; }
+  public void putFloat(int rowId, float value) { floatData[rowId] = value; }
 
   @Override
-  public final void putFloats(int rowId, int count, float value) {
+  public void putFloats(int rowId, int count, float value) {
     Arrays.fill(floatData, rowId, rowId + count, value);
   }
 
   @Override
-  public final void putFloats(int rowId, int count, float[] src, int srcIndex) {
+  public void putFloats(int rowId, int count, float[] src, int srcIndex) {
     System.arraycopy(src, srcIndex, floatData, rowId, count);
   }
 
   @Override
-  public final void putFloats(int rowId, int count, byte[] src, int srcIndex) {
+  public void putFloats(int rowId, int count, byte[] src, int srcIndex) {
     Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex,
         floatData, Platform.DOUBLE_ARRAY_OFFSET + rowId * 4, count * 4);
   }
 
   @Override
-  public final float getFloat(int rowId) {
+  public float getFloat(int rowId) {
     if (dictionary == null) {
       return floatData[rowId];
     } else {
@@ -304,28 +304,28 @@ public final class OnHeapColumnVector extends ColumnVector {
   //
 
   @Override
-  public final void putDouble(int rowId, double value) {
+  public void putDouble(int rowId, double value) {
     doubleData[rowId] = value;
   }
 
   @Override
-  public final void putDoubles(int rowId, int count, double value) {
+  public void putDoubles(int rowId, int count, double value) {
     Arrays.fill(doubleData, rowId, rowId + count, value);
   }
 
   @Override
-  public final void putDoubles(int rowId, int count, double[] src, int srcIndex) {
+  public void putDoubles(int rowId, int count, double[] src, int srcIndex) {
     System.arraycopy(src, srcIndex, doubleData, rowId, count);
   }
 
   @Override
-  public final void putDoubles(int rowId, int count, byte[] src, int srcIndex) {
+  public void putDoubles(int rowId, int count, byte[] src, int srcIndex) {
     Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex, doubleData,
         Platform.DOUBLE_ARRAY_OFFSET + rowId * 8, count * 8);
   }
 
   @Override
-  public final double getDouble(int rowId) {
+  public double getDouble(int rowId) {
     if (dictionary == null) {
       return doubleData[rowId];
     } else {
@@ -338,22 +338,22 @@ public final class OnHeapColumnVector extends ColumnVector {
   //
 
   @Override
-  public final int getArrayLength(int rowId) {
+  public int getArrayLength(int rowId) {
     return arrayLengths[rowId];
   }
   @Override
-  public final int getArrayOffset(int rowId) {
+  public int getArrayOffset(int rowId) {
     return arrayOffsets[rowId];
   }
 
   @Override
-  public final void putArray(int rowId, int offset, int length) {
+  public void putArray(int rowId, int offset, int length) {
     arrayOffsets[rowId] = offset;
     arrayLengths[rowId] = length;
   }
 
   @Override
-  public final void loadBytes(ColumnVector.Array array) {
+  public void loadBytes(ColumnVector.Array array) {
     array.byteArray = byteData;
     array.byteArrayOffset = array.offset;
   }
@@ -363,7 +363,7 @@ public final class OnHeapColumnVector extends ColumnVector {
   //
 
   @Override
-  public final int putByteArray(int rowId, byte[] value, int offset, int length) {
+  public int putByteArray(int rowId, byte[] value, int offset, int length) {
     int result = arrayData().appendBytes(length, value, offset);
     arrayOffsets[rowId] = result;
     arrayLengths[rowId] = length;
@@ -371,12 +371,12 @@ public final class OnHeapColumnVector extends ColumnVector {
   }
 
   @Override
-  public final void reserve(int requiredCapacity) {
+  public void reserve(int requiredCapacity) {
     if (requiredCapacity > capacity) reserveInternal(requiredCapacity * 2);
   }
 
  // Split this function out since it is the slow path.
-  private final void reserveInternal(int newCapacity) {
+  private void reserveInternal(int newCapacity) {
     if (this.resultArray != null || DecimalType.isByteArrayDecimalType(type)) {
       int[] newLengths = new int[newCapacity];
       int[] newOffsets = new int[newCapacity];

http://git-wip-us.apache.org/repos/asf/spark/blob/20fd2541/sql/core/src/test/java/test/org/apache/spark/sql/JavaApplySchemaSuite.java
----------------------------------------------------------------------
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaApplySchemaSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaApplySchemaSuite.java
index ae9c8cc..189cc39 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaApplySchemaSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaApplySchemaSuite.java
@@ -145,12 +145,13 @@ public class JavaApplySchemaSuite implements Serializable {
 
     Dataset<Row> df = sqlContext.createDataFrame(rowRDD, schema);
     df.registerTempTable("people");
-    List<String> actual = sqlContext.sql("SELECT * FROM people").toJavaRDD().map(new Function<Row, String>() {
-      @Override
-      public String call(Row row) {
-        return row.getString(0) + "_" + row.get(1);
-      }
-    }).collect();
+    List<String> actual = sqlContext.sql("SELECT * FROM people").toJavaRDD()
+      .map(new Function<Row, String>() {
+        @Override
+        public String call(Row row) {
+          return row.getString(0) + "_" + row.get(1);
+        }
+      }).collect();
 
     List<String> expected = new ArrayList<>(2);
     expected.add("Michael_29");

http://git-wip-us.apache.org/repos/asf/spark/blob/20fd2541/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
----------------------------------------------------------------------
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
index f3c5a86..cf764c6 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
@@ -220,7 +220,8 @@ public class JavaDataFrameSuite {
     StructType schema1 = StructType$.MODULE$.apply(fields1);
     Assert.assertEquals(0, schema1.fieldIndex("id"));
 
-    List<StructField> fields2 = Arrays.asList(new StructField("id", DataTypes.StringType, true, Metadata.empty()));
+    List<StructField> fields2 =
+        Arrays.asList(new StructField("id", DataTypes.StringType, true, Metadata.empty()));
     StructType schema2 = StructType$.MODULE$.apply(fields2);
     Assert.assertEquals(0, schema2.fieldIndex("id"));
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/20fd2541/sql/hive/src/test/java/org/apache/spark/sql/hive/test/Complex.java
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/java/org/apache/spark/sql/hive/test/Complex.java b/sql/hive/src/test/java/org/apache/spark/sql/hive/test/Complex.java
index fc24600..a8cbd4f 100644
--- a/sql/hive/src/test/java/org/apache/spark/sql/hive/test/Complex.java
+++ b/sql/hive/src/test/java/org/apache/spark/sql/hive/test/Complex.java
@@ -39,7 +39,7 @@ import java.util.BitSet;
  * does not contain union fields that are not supported by Spark SQL.
  */
 
-@SuppressWarnings({"ALL", "unchecked"})
+@SuppressWarnings("all")
 public class Complex implements org.apache.thrift.TBase<Complex, Complex._Fields>, java.io.Serializable, Cloneable {
   private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("Complex");
 

http://git-wip-us.apache.org/repos/asf/spark/blob/20fd2541/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index 66448fd..01f0c4d 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -349,7 +349,9 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     JavaDStream<Integer> reducedWindowed;
     if (withInverse) {
       reducedWindowed = stream.reduceByWindow(new IntegerSum(),
-                                              new IntegerDifference(), new Duration(2000), new Duration(1000));
+                                              new IntegerDifference(),
+                                              new Duration(2000),
+                                              new Duration(1000));
     } else {
       reducedWindowed = stream.reduceByWindow(new IntegerSum(),
                                              new Duration(2000), new Duration(1000));
@@ -497,7 +499,8 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
 
     pairStream.transformToPair(
        new Function2<JavaPairRDD<String, Integer>, Time, JavaPairRDD<String, String>>() {
-          @Override public JavaPairRDD<String, String> call(JavaPairRDD<String, Integer> in, Time time) {
+          @Override public JavaPairRDD<String, String> call(JavaPairRDD<String, Integer> in,
+                                                            Time time) {
             return null;
           }
         }
@@ -606,7 +609,8 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
         pairStream1,
        new Function3<JavaRDD<Integer>, JavaPairRDD<String, Integer>, Time, JavaRDD<Double>>() {
           @Override
-          public JavaRDD<Double> call(JavaRDD<Integer> rdd1, JavaPairRDD<String, Integer> rdd2, Time time) {
+          public JavaRDD<Double> call(JavaRDD<Integer> rdd1, JavaPairRDD<String, Integer> rdd2,
+                                      Time time) {
             return null;
           }
         }
@@ -616,7 +620,8 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
         stream2,
        new Function3<JavaRDD<Integer>, JavaRDD<String>, Time, JavaPairRDD<Double, Double>>() {
           @Override
-          public JavaPairRDD<Double, Double> call(JavaRDD<Integer> rdd1, JavaRDD<String> rdd2, Time time) {
+          public JavaPairRDD<Double, Double> call(JavaRDD<Integer> rdd1, JavaRDD<String> rdd2,
+                                                  Time time) {
             return null;
           }
         }
@@ -624,9 +629,12 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
 
     stream1.transformWithToPair(
         pairStream1,
-        new Function3<JavaRDD<Integer>, JavaPairRDD<String, Integer>, Time, JavaPairRDD<Double, Double>>() {
+        new Function3<JavaRDD<Integer>, JavaPairRDD<String, Integer>, Time,
+          JavaPairRDD<Double, Double>>() {
           @Override
-          public JavaPairRDD<Double, Double> call(JavaRDD<Integer> rdd1, JavaPairRDD<String, Integer> rdd2, Time time) {
+          public JavaPairRDD<Double, Double> call(JavaRDD<Integer> rdd1,
+                                                  JavaPairRDD<String, Integer> rdd2,
+                                                  Time time) {
             return null;
           }
         }
@@ -636,7 +644,8 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
         stream2,
        new Function3<JavaPairRDD<String, Integer>, JavaRDD<String>, Time, JavaRDD<Double>>() {
           @Override
-          public JavaRDD<Double> call(JavaPairRDD<String, Integer> rdd1, JavaRDD<String> rdd2, Time time) {
+          public JavaRDD<Double> call(JavaPairRDD<String, Integer> rdd1, JavaRDD<String> rdd2,
+                                      Time time) {
             return null;
           }
         }
@@ -644,9 +653,12 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
 
     pairStream1.transformWith(
         pairStream1,
-        new Function3<JavaPairRDD<String, Integer>, JavaPairRDD<String, Integer>, Time, JavaRDD<Double>>() {
+        new Function3<JavaPairRDD<String, Integer>, JavaPairRDD<String, Integer>, Time,
+          JavaRDD<Double>>() {
           @Override
-          public JavaRDD<Double> call(JavaPairRDD<String, Integer> rdd1, JavaPairRDD<String, Integer> rdd2, Time time) {
+          public JavaRDD<Double> call(JavaPairRDD<String, Integer> rdd1,
+                                      JavaPairRDD<String, Integer> rdd2,
+                                      Time time) {
             return null;
           }
         }
@@ -654,9 +666,12 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
 
     pairStream1.transformWithToPair(
         stream2,
-        new Function3<JavaPairRDD<String, Integer>, JavaRDD<String>, Time, JavaPairRDD<Double, Double>>() {
+        new Function3<JavaPairRDD<String, Integer>, JavaRDD<String>, Time,
+          JavaPairRDD<Double, Double>>() {
           @Override
-          public JavaPairRDD<Double, Double> call(JavaPairRDD<String, Integer> rdd1, JavaRDD<String> rdd2, Time time) {
+          public JavaPairRDD<Double, Double> call(JavaPairRDD<String, Integer> rdd1,
+                                                  JavaRDD<String> rdd2,
+                                                  Time time) {
             return null;
           }
         }
@@ -664,9 +679,12 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
 
     pairStream1.transformWithToPair(
         pairStream2,
-        new Function3<JavaPairRDD<String, Integer>, JavaPairRDD<Double, Character>, Time, JavaPairRDD<Double, Double>>() {
+        new Function3<JavaPairRDD<String, Integer>, JavaPairRDD<Double, Character>, Time,
+          JavaPairRDD<Double, Double>>() {
           @Override
-          public JavaPairRDD<Double, Double> call(JavaPairRDD<String, Integer> rdd1, JavaPairRDD<Double, Character> rdd2, Time time) {
+          public JavaPairRDD<Double, Double> call(JavaPairRDD<String, Integer> rdd1,
+                                                  JavaPairRDD<Double, Character> rdd2,
+                                                  Time time) {
             return null;
           }
         }
@@ -722,13 +740,16 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
       listOfDStreams2,
      new Function2<List<JavaRDD<?>>, Time, JavaPairRDD<Integer, Tuple2<Integer, String>>>() {
         @Override
-        public JavaPairRDD<Integer, Tuple2<Integer, String>> call(List<JavaRDD<?>> listOfRDDs, Time time) {
+        public JavaPairRDD<Integer, Tuple2<Integer, String>> call(List<JavaRDD<?>> listOfRDDs,
+                                                                  Time time) {
           Assert.assertEquals(3, listOfRDDs.size());
           JavaRDD<Integer> rdd1 = (JavaRDD<Integer>)listOfRDDs.get(0);
           JavaRDD<Integer> rdd2 = (JavaRDD<Integer>)listOfRDDs.get(1);
-          JavaRDD<Tuple2<Integer, String>> rdd3 = (JavaRDD<Tuple2<Integer, String>>)listOfRDDs.get(2);
+          JavaRDD<Tuple2<Integer, String>> rdd3 =
+            (JavaRDD<Tuple2<Integer, String>>)listOfRDDs.get(2);
           JavaPairRDD<Integer, String> prdd3 = JavaPairRDD.fromJavaRDD(rdd3);
-          PairFunction<Integer, Integer, Integer> mapToTuple = new PairFunction<Integer, Integer, Integer>() {
+          PairFunction<Integer, Integer, Integer> mapToTuple =
+            new PairFunction<Integer, Integer, Integer>() {
             @Override
             public Tuple2<Integer, Integer> call(Integer i) {
               return new Tuple2<>(i, i);
@@ -739,7 +760,8 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
       }
     );
     JavaTestUtils.attachTestOutputStream(transformed2);
-    List<List<Tuple2<Integer, Tuple2<Integer, String>>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+    List<List<Tuple2<Integer, Tuple2<Integer, String>>>> result =
+      JavaTestUtils.runStreams(ssc, 2, 2);
     Assert.assertEquals(expected, result);
   }
 
@@ -981,7 +1003,8 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
             new Tuple2<>(3, "new york"),
             new Tuple2<>(1, "new york")));
 
-    JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+    JavaDStream<Tuple2<String, Integer>> stream =
+      JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
    JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
     JavaPairDStream<Integer, String> reversed = pairStream.mapToPair(
         new PairFunction<Tuple2<String, Integer>, Integer, String>() {
@@ -1014,7 +1037,8 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
             new Tuple2<>(3, "new york"),
             new Tuple2<>(1, "new york")));
 
-    JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+    JavaDStream<Tuple2<String, Integer>> stream =
+      JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
    JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
     JavaPairDStream<Integer, String> reversed = pairStream.mapPartitionsToPair(
        new PairFlatMapFunction<Iterator<Tuple2<String, Integer>>, Integer, String>() {
@@ -1044,7 +1068,8 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
             Arrays.asList(1, 3, 4, 1),
             Arrays.asList(5, 5, 3, 1));
 
-    JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+    JavaDStream<Tuple2<String, Integer>> stream =
+      JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
    JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
     JavaDStream<Integer> reversed = pairStream.map(
         new Function<Tuple2<String, Integer>, Integer>() {
@@ -1116,7 +1141,8 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
             new Tuple2<>("california", Arrays.asList("sharks", "ducks")),
             new Tuple2<>("new york", Arrays.asList("rangers", "islanders"))));
 
-    JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+    JavaDStream<Tuple2<String, String>> stream =
+      JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
    JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
 
    JavaPairDStream<String, Iterable<String>> grouped = pairStream.groupByKey();
@@ -1241,7 +1267,8 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
       )
     );
 
-    JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+    JavaDStream<Tuple2<String, Integer>> stream =
+      JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
    JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
 
     JavaPairDStream<String, Iterable<Integer>> groupWindowed =
@@ -1255,7 +1282,8 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     }
   }
 
-  private static Set<Tuple2<String, HashSet<Integer>>> convert(List<Tuple2<String, List<Integer>>> listOfTuples) {
+  private static Set<Tuple2<String, HashSet<Integer>>>
+    convert(List<Tuple2<String, List<Integer>>> listOfTuples) {
     List<Tuple2<String, HashSet<Integer>>> newListOfTuples = new ArrayList<>();
     for (Tuple2<String, List<Integer>> tuple: listOfTuples) {
       newListOfTuples.add(convert(tuple));
@@ -1280,7 +1308,8 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
         Arrays.asList(new Tuple2<>("california", 10),
                       new Tuple2<>("new york", 4)));
 
-    JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+    JavaDStream<Tuple2<String, Integer>> stream =
+      JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
    JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
 
     JavaPairDStream<String, Integer> reduceWindowed =
@@ -1304,7 +1333,8 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
         Arrays.asList(new Tuple2<>("california", 14),
                       new Tuple2<>("new york", 9)));
 
-    JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+    JavaDStream<Tuple2<String, Integer>> stream =
+      JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
    JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
 
     JavaPairDStream<String, Integer> updated = pairStream.updateStateByKey(
@@ -1347,7 +1377,8 @@ public class JavaAPISuite extends 
LocalJavaStreamingContext implements Serializa
         Arrays.asList(new Tuple2<>("california", 15),
                       new Tuple2<>("new york", 11)));
 
-    JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+    JavaDStream<Tuple2<String, Integer>> stream =
+      JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
    JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
 
     JavaPairDStream<String, Integer> updated = pairStream.updateStateByKey(
@@ -1383,7 +1414,8 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
         Arrays.asList(new Tuple2<>("california", 10),
                       new Tuple2<>("new york", 4)));
 
-    JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+    JavaDStream<Tuple2<String, Integer>> stream =
+      JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
    JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
 
     JavaPairDStream<String, Integer> reduceWindowed =
@@ -1630,19 +1662,27 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
         ssc, stringStringKVStream2, 1);
    JavaPairDStream<String, String> pairStream2 = JavaPairDStream.fromJavaDStream(stream2);
 
-    JavaPairDStream<String, Tuple2<Iterable<String>, Iterable<String>>> grouped = pairStream1.cogroup(pairStream2);
+    JavaPairDStream<String, Tuple2<Iterable<String>, Iterable<String>>> grouped =
+        pairStream1.cogroup(pairStream2);
     JavaTestUtils.attachTestOutputStream(grouped);
-    List<List<Tuple2<String, Tuple2<Iterable<String>, Iterable<String>>>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+    List<List<Tuple2<String, Tuple2<Iterable<String>, Iterable<String>>>>> result =
+        JavaTestUtils.runStreams(ssc, 2, 2);
 
     Assert.assertEquals(expected.size(), result.size());
-    Iterator<List<Tuple2<String, Tuple2<Iterable<String>, Iterable<String>>>>> resultItr = result.iterator();
-    Iterator<List<Tuple2<String, Tuple2<List<String>, List<String>>>>> expectedItr = expected.iterator();
+    Iterator<List<Tuple2<String, Tuple2<Iterable<String>, Iterable<String>>>>> resultItr =
+        result.iterator();
+    Iterator<List<Tuple2<String, Tuple2<List<String>, List<String>>>>> expectedItr =
+        expected.iterator();
     while (resultItr.hasNext() && expectedItr.hasNext()) {
-      Iterator<Tuple2<String, Tuple2<Iterable<String>, Iterable<String>>>> resultElements = resultItr.next().iterator();
-      Iterator<Tuple2<String, Tuple2<List<String>, List<String>>>> expectedElements = expectedItr.next().iterator();
+      Iterator<Tuple2<String, Tuple2<Iterable<String>, Iterable<String>>>> resultElements =
+          resultItr.next().iterator();
+      Iterator<Tuple2<String, Tuple2<List<String>, List<String>>>> expectedElements =
+          expectedItr.next().iterator();
       while (resultElements.hasNext() && expectedElements.hasNext()) {
-        Tuple2<String, Tuple2<Iterable<String>, Iterable<String>>> resultElement = resultElements.next();
-        Tuple2<String, Tuple2<List<String>, List<String>>> expectedElement = expectedElements.next();
+        Tuple2<String, Tuple2<Iterable<String>, Iterable<String>>> resultElement =
+            resultElements.next();
+        Tuple2<String, Tuple2<List<String>, List<String>>> expectedElement =
+            expectedElements.next();
         Assert.assertEquals(expectedElement._1(), resultElement._1());
         equalIterable(expectedElement._2()._1(), resultElement._2()._1());
         equalIterable(expectedElement._2()._2(), resultElement._2()._2());
@@ -1719,7 +1759,8 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
         ssc, stringStringKVStream2, 1);
    JavaPairDStream<String, String> pairStream2 = JavaPairDStream.fromJavaDStream(stream2);
 
-    JavaPairDStream<String, Tuple2<String, Optional<String>>> joined = pairStream1.leftOuterJoin(pairStream2);
+    JavaPairDStream<String, Tuple2<String, Optional<String>>> joined =
+        pairStream1.leftOuterJoin(pairStream2);
     JavaDStream<Long> counted = joined.count();
     JavaTestUtils.attachTestOutputStream(counted);
     List<List<Long>> result = JavaTestUtils.runStreams(ssc, 2, 2);

http://git-wip-us.apache.org/repos/asf/spark/blob/20fd2541/streaming/src/test/java/org/apache/spark/streaming/JavaStreamingListenerAPISuite.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaStreamingListenerAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaStreamingListenerAPISuite.java
index 67b2a07..ff0be82 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaStreamingListenerAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaStreamingListenerAPISuite.java
@@ -77,12 +77,14 @@ public class JavaStreamingListenerAPISuite extends JavaStreamingListener {
   }
 
   @Override
-  public void onOutputOperationStarted(JavaStreamingListenerOutputOperationStarted outputOperationStarted) {
+  public void onOutputOperationStarted(
+      JavaStreamingListenerOutputOperationStarted outputOperationStarted) {
     super.onOutputOperationStarted(outputOperationStarted);
   }
 
   @Override
-  public void onOutputOperationCompleted(JavaStreamingListenerOutputOperationCompleted outputOperationCompleted) {
+  public void onOutputOperationCompleted(
+      JavaStreamingListenerOutputOperationCompleted outputOperationCompleted) {
     super.onOutputOperationCompleted(outputOperationCompleted);
   }
 }
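
Most of the test suite hunks above are mechanical fixes for the Java line-length
limit: a declaration that overruns the limit is broken after the opening parenthesis,
or at an assignment, with the remainder carried on a continuation-indented line. A
compilable sketch of the pattern, with an invented type name standing in for the
long generated listener events:

    // Illustrative only: a parameter type long enough to force wrapping.
    class OutputOperationStartedStandIn {}

    class ListenerSketch {
      // The parameter list moves to its own line, double-indented, keeping
      // each physical line under the style checker's column limit.
      public void onOutputOperationStarted(
          OutputOperationStartedStandIn outputOperationStarted) {
        // handle the event
      }
    }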

