[FLINK-3700] [core] Remove Guava dependency from flink-core

Almost all Guava functionality used within flink-core has corresponding
utils in Flink's codebase, or the JDK library.

This replaces the Guava code as follows
  - Preconditions calls by Flink's Preconditions class
  - Collection utils by simple Java Collection calls
  - Iterators by Flink's Union Iterator
  - Files by simple util methods around java.nio.Files
  - InetAddresses IPv6 encoding code has been adapted into Flink's NetUtils 
(with attribution comments)

Some util classes were moved from flink-runtime to flink-core.

This closes #1854


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/760a0d9e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/760a0d9e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/760a0d9e

Branch: refs/heads/master
Commit: 760a0d9e7fd9fa88e9f7408b137d78df384d764f
Parents: 1b93b32
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Apr 5 15:18:32 2016 +0200
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Fri Apr 15 19:38:14 2016 +0200

----------------------------------------------------------------------
 .../storm/tests/StormFieldsGroupingITCase.java  |   2 +-
 flink-core/pom.xml                              |   6 -
 .../java/org/apache/flink/api/common/Plan.java  |   5 +-
 .../org/apache/flink/api/common/TaskInfo.java   |   4 +-
 .../util/AbstractRuntimeUDFContext.java         |   9 +-
 .../api/common/io/DelimitedInputFormat.java     |  17 +-
 .../flink/api/common/io/FileInputFormat.java    |  28 +--
 .../api/common/io/GenericCsvInputFormat.java    |  43 ++--
 .../common/operators/GenericDataSinkBase.java   |  21 +-
 .../apache/flink/api/common/operators/Keys.java |  19 +-
 .../base/GroupCombineOperatorBase.java          |   6 +-
 .../operators/base/GroupReduceOperatorBase.java |   6 +-
 .../api/common/operators/util/FieldList.java    |   6 +-
 .../operators/util/UserCodeObjectWrapper.java   |   7 +-
 .../api/common/typeinfo/BasicArrayTypeInfo.java |  10 +-
 .../api/common/typeinfo/BasicTypeInfo.java      |   8 +-
 .../api/common/typeinfo/FractionalTypeInfo.java |  19 +-
 .../api/common/typeinfo/IntegerTypeInfo.java    |  25 ++-
 .../api/common/typeinfo/NumericTypeInfo.java    |  29 +--
 .../common/typeinfo/PrimitiveArrayTypeInfo.java |  22 +-
 .../api/common/typeutils/CompositeType.java     |   4 +-
 .../common/typeutils/base/EnumSerializer.java   |   5 +-
 .../typeutils/base/GenericArraySerializer.java  |   7 +-
 .../flink/api/java/typeutils/EnumTypeInfo.java  |   7 +-
 .../api/java/typeutils/GenericTypeInfo.java     |   4 +-
 .../api/java/typeutils/ObjectArrayTypeInfo.java |  15 +-
 .../flink/api/java/typeutils/PojoField.java     |   7 +-
 .../flink/api/java/typeutils/PojoTypeInfo.java  |  40 ++--
 .../flink/api/java/typeutils/TupleTypeInfo.java |  32 ++-
 .../api/java/typeutils/TupleTypeInfoBase.java   |   6 +-
 .../flink/api/java/typeutils/TypeExtractor.java |   8 +-
 .../flink/api/java/typeutils/ValueTypeInfo.java |  12 +-
 .../api/java/typeutils/WritableTypeInfo.java    |  11 +-
 .../java/typeutils/runtime/AvroSerializer.java  |   7 +-
 .../runtime/CopyableValueSerializer.java        |   4 +-
 .../java/typeutils/runtime/PojoSerializer.java  |  10 +-
 .../typeutils/runtime/TupleSerializerBase.java  |   6 +-
 .../java/typeutils/runtime/ValueSerializer.java |   5 +-
 .../typeutils/runtime/kryo/KryoSerializer.java  |   8 +-
 .../org/apache/flink/types/StringValue.java     |  14 +-
 .../java/org/apache/flink/util/FileUtils.java   |  91 ++++++++
 .../java/org/apache/flink/util/IOUtils.java     | 223 +++++++++++++++++++
 .../java/org/apache/flink/util/MathUtils.java   | 175 +++++++++++++++
 .../java/org/apache/flink/util/NetUtils.java    |  86 ++++++-
 .../org/apache/flink/util/Preconditions.java    |  57 ++++-
 .../org/apache/flink/util/UnionIterator.java    | 102 +++++++++
 .../org/apache/flink/util/XORShiftRandom.java   |   4 +-
 .../base/OuterJoinOperatorBaseTest.java         |   4 +-
 .../flink/api/java/tuple/TupleGenerator.java    |   7 +-
 .../java/typeutils/PojoTypeExtractionTest.java  |  13 +-
 .../typeutils/runtime/PojoSerializerTest.java   |   8 +-
 .../runtime/PojoSubclassSerializerTest.java     |   5 +-
 .../SubclassFromInterfaceSerializerTest.java    |   5 +-
 .../apache/flink/testutils/junit/RetryRule.java |   5 +-
 .../org/apache/flink/util/MathUtilTest.java     | 102 +++++++++
 .../org/apache/flink/util/NetUtilsTest.java     |   4 -
 .../apache/flink/util/UnionIteratorTest.java    | 143 ++++++++++++
 .../apache/flink/runtime/blob/BlobUtils.java    |   2 +-
 .../flink/runtime/blob/FileSystemBlobStore.java |   2 +-
 .../checkpoint/FileSystemStateStore.java        |   2 +-
 .../flink/runtime/filecache/FileCache.java      |   2 +-
 .../runtime/io/disk/FileChannelInputView.java   |   2 +-
 .../runtime/io/disk/RandomAccessInputView.java  |   2 +-
 .../runtime/io/disk/RandomAccessOutputView.java |   2 +-
 .../io/disk/SeekableFileChannelInputView.java   |   2 +-
 .../io/disk/SimpleCollectingOutputView.java     |   2 +-
 .../flink/runtime/memory/MemoryManager.java     |   2 +-
 .../operators/hash/CompactingHashTable.java     |   2 +-
 .../runtime/operators/hash/HashPartition.java   |   2 +-
 .../operators/hash/MutableHashTable.java        |   2 +-
 .../operators/shipping/OutputEmitter.java       |   2 +-
 .../apache/flink/runtime/util/FileUtils.java    |  61 -----
 .../org/apache/flink/runtime/util/IOUtils.java  | 213 ------------------
 .../apache/flink/runtime/util/MathUtils.java    | 176 ---------------
 .../flink/runtime/util/UnionIterator.java       | 100 ---------
 .../FileSystemStateStorageHelper.java           |   2 +-
 .../flink/runtime/taskmanager/TaskManager.scala |   2 +-
 .../runtime/testutils/CommonTestUtils.java      |   2 +-
 .../apache/flink/runtime/util/MathUtilTest.java | 101 ---------
 .../flink/runtime/util/UnionIteratorTest.java   | 142 ------------
 .../source/SocketTextStreamFunction.java        |   2 +-
 ...ractAlignedProcessingTimeWindowOperator.java |   2 +-
 .../windowing/AccumulatingKeyedTimePanes.java   |   2 +-
 .../runtime/operators/windowing/KeyMap.java     |   2 +-
 .../runtime/partitioner/HashPartitioner.java    |   2 +-
 .../apache/flink/streaming/api/IterateTest.java |   2 +-
 .../streaming/api/StreamingOperatorsITCase.java |   2 +-
 .../api/scala/StreamingOperatorsITCase.scala    |   2 +-
 88 files changed, 1294 insertions(+), 1087 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java
 
b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java
index 1875ecb..b43b24d 100644
--- 
a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java
+++ 
b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java
@@ -21,7 +21,7 @@ import backtype.storm.Config;
 import backtype.storm.topology.TopologyBuilder;
 import backtype.storm.tuple.Fields;
 
-import org.apache.flink.runtime.util.MathUtils;
+import org.apache.flink.util.MathUtils;
 import org.apache.flink.storm.api.FlinkLocalCluster;
 import org.apache.flink.storm.api.FlinkTopology;
 import org.apache.flink.storm.tests.operators.FiniteRandomSpout;

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/pom.xml
----------------------------------------------------------------------
diff --git a/flink-core/pom.xml b/flink-core/pom.xml
index 59b620b..eb55bdd 100644
--- a/flink-core/pom.xml
+++ b/flink-core/pom.xml
@@ -70,12 +70,6 @@ under the License.
                        <artifactId>${shading-artifact.name}</artifactId>
                        <version>${project.version}</version>
                </dependency>
-               
-               <dependency>
-                       <groupId>com.google.guava</groupId>
-                       <artifactId>guava</artifactId>
-                       <version>${guava.version}</version>
-               </dependency>
 
                <!-- test depedencies -->
                

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/common/Plan.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/Plan.java 
b/flink-core/src/main/java/org/apache/flink/api/common/Plan.java
index 30f2c2f..0235af0 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/Plan.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/Plan.java
@@ -18,9 +18,6 @@
 
 package org.apache.flink.api.common;
 
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
 import java.util.ArrayList;
 import java.util.Calendar;
 import java.util.Collection;
@@ -43,6 +40,8 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.util.Visitable;
 import org.apache.flink.util.Visitor;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkArgument;
 /**
  * This class represents Flink programs, in the form of dataflow plans.
  *

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/common/TaskInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/TaskInfo.java 
b/flink-core/src/main/java/org/apache/flink/api/common/TaskInfo.java
index 6482cde..ac87e74 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/TaskInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/TaskInfo.java
@@ -20,8 +20,8 @@ package org.apache.flink.api.common;
 
 import org.apache.flink.annotation.Internal;
 
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkArgument;
 
 /**
  * Encapsulates task-specific information: name, index of subtask, parallelism 
and attempt number.

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
index afe9f77..74b78df 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
@@ -23,7 +23,6 @@ import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.Future;
 
-import com.google.common.base.Preconditions;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.TaskInfo;
@@ -45,6 +44,8 @@ import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.core.fs.Path;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * A standalone implementation of the {@link RuntimeContext}, created by 
runtime UDF operators.
  */
@@ -66,11 +67,11 @@ public abstract class AbstractRuntimeUDFContext implements 
RuntimeContext {
                                                                                
ExecutionConfig executionConfig,
                                                                                
Map<String, Accumulator<?,?>> accumulators,
                                                                                
Map<String, Future<Path>> cpTasks) {
-               this.taskInfo = Preconditions.checkNotNull(taskInfo);
+               this.taskInfo = checkNotNull(taskInfo);
                this.userCodeClassLoader = userCodeClassLoader;
                this.executionConfig = executionConfig;
-               this.distributedCache = new 
DistributedCache(Preconditions.checkNotNull(cpTasks));
-               this.accumulators = Preconditions.checkNotNull(accumulators);
+               this.distributedCache = new 
DistributedCache(checkNotNull(cpTasks));
+               this.accumulators = checkNotNull(accumulators);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
index f6b6d49..243e2a4 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
@@ -18,12 +18,7 @@
 
 package org.apache.flink.api.common.io;
 
-import java.io.IOException;
-import java.util.ArrayList;
-
 import org.apache.flink.annotation.Public;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.io.statistics.BaseStatistics;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
@@ -33,7 +28,12 @@ import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 
-import com.google.common.base.Charsets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
 
 /**
  * Base implementation for input formats that split the input at a delimiter 
into records.
@@ -53,6 +53,9 @@ public abstract class DelimitedInputFormat<OT> extends 
FileInputFormat<OT> {
         * The log.
         */
        private static final Logger LOG = 
LoggerFactory.getLogger(DelimitedInputFormat.class);
+
+       /** The default charset  to convert strings to bytes */
+       private static final Charset UTF_8_CHARSET = Charset.forName("UTF-8");
        
        /**
         * The default read buffer size = 1MB.
@@ -185,7 +188,7 @@ public abstract class DelimitedInputFormat<OT> extends 
FileInputFormat<OT> {
        }
        
        public void setDelimiter(String delimiter) {
-               this.delimiter = delimiter.getBytes(Charsets.UTF_8);
+               this.delimiter = delimiter.getBytes(UTF_8_CHARSET);
        }
        
        public int getLineLengthLimit() {

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java 
b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
index c4cd2b3..fd69cc3 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
@@ -18,22 +18,10 @@
 
 package org.apache.flink.api.common.io;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import com.google.common.base.Preconditions;
 import org.apache.flink.annotation.Public;
 import 
org.apache.flink.api.common.io.compression.DeflateInflaterInputStreamFactory;
 import 
org.apache.flink.api.common.io.compression.GzipInflaterInputStreamFactory;
 import org.apache.flink.api.common.io.compression.InflaterInputStreamFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.io.statistics.BaseStatistics;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
@@ -45,6 +33,20 @@ import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * The base class for {@link RichInputFormat}s that read from files. For 
specific input types the
  * {@link #nextRecord(Object)} and {@link #reachedEnd()} methods need to be 
implemented.
@@ -143,7 +145,7 @@ public abstract class FileInputFormat<OT> extends 
RichInputFormat<OT, FileInputS
         * @return the extension of the file name or {@code null} if there is 
no extension.
         */
        protected static String extractFileExtension(String fileName) {
-               Preconditions.checkNotNull(fileName);
+               checkNotNull(fileName);
                int lastPeriodIndex = fileName.lastIndexOf('.');
                if (lastPeriodIndex < 0){
                        return null;

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
index cb20f81..31d42ff 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
@@ -16,13 +16,8 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.common.io;
 
-import com.google.common.base.Charsets;
-import com.google.common.base.Preconditions;
-import com.google.common.primitives.Ints;
-
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.Path;
@@ -30,6 +25,7 @@ import org.apache.flink.types.parser.FieldParser;
 import org.apache.flink.types.parser.StringParser;
 import org.apache.flink.types.parser.StringValueParser;
 import org.apache.flink.util.InstantiationUtil;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,13 +37,20 @@ import java.util.ArrayList;
 import java.util.Map;
 import java.util.TreeMap;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 @Internal
 public abstract class GenericCsvInputFormat<OT> extends 
DelimitedInputFormat<OT> {
 
-       private static final Logger LOG = 
LoggerFactory.getLogger(GenericCsvInputFormat.class);
-       
        private static final long serialVersionUID = 1L;
        
+       
+       private static final Logger LOG = 
LoggerFactory.getLogger(GenericCsvInputFormat.class);
+
+       /** The default charset  to convert strings to bytes */
+       private static final Charset UTF_8_CHARSET = Charset.forName("UTF-8");
+       
        private static final Class<?>[] EMPTY_TYPES = new Class<?>[0];
        
        private static final boolean[] EMPTY_INCLUDED = new boolean[0];
@@ -127,7 +130,7 @@ public abstract class GenericCsvInputFormat<OT> extends 
DelimitedInputFormat<OT>
        }
 
        public void setCommentPrefix(String commentPrefix) {
-               setCommentPrefix(commentPrefix, Charsets.UTF_8);
+               setCommentPrefix(commentPrefix, UTF_8_CHARSET);
        }
 
        public void setCommentPrefix(String commentPrefix, String charsetName) 
throws IllegalCharsetNameException, UnsupportedCharsetException {
@@ -171,7 +174,7 @@ public abstract class GenericCsvInputFormat<OT> extends 
DelimitedInputFormat<OT>
        }
 
        public void setFieldDelimiter(String delimiter) {
-               this.fieldDelim = delimiter.getBytes(Charsets.UTF_8);
+               this.fieldDelim = delimiter.getBytes(UTF_8_CHARSET);
        }
 
        public boolean isLenient() {
@@ -247,9 +250,9 @@ public abstract class GenericCsvInputFormat<OT> extends 
DelimitedInputFormat<OT>
        }
        
        protected void setFieldsGeneric(int[] sourceFieldIndices, Class<?>[] 
fieldTypes) {
-               Preconditions.checkNotNull(sourceFieldIndices);
-               Preconditions.checkNotNull(fieldTypes);
-               Preconditions.checkArgument(sourceFieldIndices.length == 
fieldTypes.length,
+               checkNotNull(sourceFieldIndices);
+               checkNotNull(fieldTypes);
+               checkArgument(sourceFieldIndices.length == fieldTypes.length,
                        "Number of field indices and field types must match.");
 
                for (int i : sourceFieldIndices) {
@@ -258,7 +261,7 @@ public abstract class GenericCsvInputFormat<OT> extends 
DelimitedInputFormat<OT>
                        }
                }
 
-               int largestFieldIndex = Ints.max(sourceFieldIndices);
+               int largestFieldIndex = max(sourceFieldIndices);
                this.fieldIncluded = new boolean[largestFieldIndex + 1];
                ArrayList<Class<?>> types = new ArrayList<Class<?>>();
 
@@ -280,8 +283,8 @@ public abstract class GenericCsvInputFormat<OT> extends 
DelimitedInputFormat<OT>
        }
        
        protected void setFieldsGeneric(boolean[] includedMask, Class<?>[] 
fieldTypes) {
-               Preconditions.checkNotNull(includedMask);
-               Preconditions.checkNotNull(fieldTypes);
+               checkNotNull(includedMask);
+               checkNotNull(fieldTypes);
 
                ArrayList<Class<?>> types = new ArrayList<Class<?>>();
 
@@ -530,4 +533,14 @@ public abstract class GenericCsvInputFormat<OT> extends 
DelimitedInputFormat<OT>
                        lastPos = positions[i];
                }
        }
+       
+       private static int max(int[] ints) {
+               checkArgument(ints.length > 0);
+               
+               int max = ints[0];
+               for (int i = 1 ; i < ints.length; i++) {
+                       max = Math.max(max, ints[i]);
+               }
+               return max;
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSinkBase.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSinkBase.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSinkBase.java
index 33f11f3..1f3875d 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSinkBase.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSinkBase.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.common.operators;
 
 import java.util.List;
@@ -39,7 +38,7 @@ import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.types.Nothing;
 import org.apache.flink.util.Visitor;
 
-import com.google.common.base.Preconditions;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * Operator for nodes that act as data sinks, storing the data they receive.
@@ -66,7 +65,7 @@ public class GenericDataSinkBase<IN> extends 
Operator<Nothing> {
        public GenericDataSinkBase(OutputFormat<IN> f, 
UnaryOperatorInformation<IN, Nothing> operatorInfo, String name) {
                super(operatorInfo, name);
 
-               Preconditions.checkNotNull(f, "The OutputFormat may not be 
null.");
+               checkNotNull(f, "The OutputFormat may not be null.");
                this.formatWrapper = new 
UserCodeObjectWrapper<OutputFormat<IN>>(f);
        }
 
@@ -79,8 +78,7 @@ public class GenericDataSinkBase<IN> extends 
Operator<Nothing> {
         */
        public GenericDataSinkBase(UserCodeWrapper<? extends OutputFormat<IN>> 
f, UnaryOperatorInformation<IN, Nothing> operatorInfo, String name) {
                super(operatorInfo, name);
-               Preconditions.checkNotNull(f, "The OutputFormat class may not 
be null.");
-               this.formatWrapper = f;
+               this.formatWrapper = checkNotNull(f, "The OutputFormat class 
may not be null.");
        }
 
        // 
--------------------------------------------------------------------------------------------
@@ -100,8 +98,7 @@ public class GenericDataSinkBase<IN> extends 
Operator<Nothing> {
         * @param input The operator to use as the input.
         */
        public void setInput(Operator<IN> input) {
-               Preconditions.checkNotNull(input, "The input may not be null.");
-               this.input = input;
+               this.input = checkNotNull(input, "The input may not be null.");
        }
 
        /**
@@ -112,7 +109,7 @@ public class GenericDataSinkBase<IN> extends 
Operator<Nothing> {
         */
        @Deprecated
        public void setInputs(Operator<IN>... inputs) {
-               Preconditions.checkNotNull(inputs, "The inputs may not be 
null.");
+               checkNotNull(inputs, "The inputs may not be null.");
                this.input = Operator.createUnionCascade(inputs);
        }
 
@@ -124,7 +121,7 @@ public class GenericDataSinkBase<IN> extends 
Operator<Nothing> {
         */
        @Deprecated
        public void setInputs(List<Operator<IN>> inputs) {
-               Preconditions.checkNotNull(inputs, "The inputs may not be 
null.");
+               checkNotNull(inputs, "The inputs may not be null.");
                this.input = Operator.createUnionCascade(inputs);
        }
 
@@ -136,7 +133,7 @@ public class GenericDataSinkBase<IN> extends 
Operator<Nothing> {
         */
        @Deprecated
        public void addInput(Operator<IN>... inputs) {
-               Preconditions.checkNotNull(inputs, "The input may not be 
null.");
+               checkNotNull(inputs, "The input may not be null.");
                this.input = Operator.createUnionCascade(this.input, inputs);
        }
 
@@ -149,7 +146,7 @@ public class GenericDataSinkBase<IN> extends 
Operator<Nothing> {
        @SuppressWarnings("unchecked")
        @Deprecated
        public void addInputs(List<? extends Operator<IN>> inputs) {
-               Preconditions.checkNotNull(inputs, "The inputs may not be 
null.");
+               checkNotNull(inputs, "The inputs may not be null.");
                this.input = createUnionCascade(this.input, (Operator<IN>[]) 
inputs.toArray(new Operator[inputs.size()]));
        }
 
@@ -259,7 +256,7 @@ public class GenericDataSinkBase<IN> extends 
Operator<Nothing> {
                format.configure(this.parameters);
 
                if(format instanceof RichOutputFormat){
-                       ((RichOutputFormat) format).setRuntimeContext(ctx);
+                       ((RichOutputFormat<?>) format).setRuntimeContext(ctx);
                }
                format.open(0, 1);
                for (IN element : inputData) {

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/common/operators/Keys.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/Keys.java 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/Keys.java
index abe41af..0b771e8 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/Keys.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/Keys.java
@@ -18,10 +18,7 @@
 
 package org.apache.flink.api.common.operators;
 
-import java.util.ArrayList;
-import java.util.List;
-
-import com.google.common.base.Joiner;
+import org.apache.commons.lang3.StringUtils;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.InvalidProgramException;
@@ -33,7 +30,11 @@ import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.typeutils.GenericTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 
-import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 @Internal
 public abstract class Keys<T> {
@@ -232,7 +233,8 @@ public abstract class Keys<T> {
                        } else {
                                rangeCheckFields(keyPositions, type.getArity() 
- 1);
                        }
-                       Preconditions.checkArgument(keyPositions.length > 0, 
"Grouping fields can not be empty at this point");
+                       
+                       checkArgument(keyPositions.length > 0, "Grouping fields 
can not be empty at this point");
 
                        // extract key field types
                        CompositeType<T> cType = (CompositeType<T>)type;
@@ -266,7 +268,7 @@ public abstract class Keys<T> {
                 * Create String-based (nested) field expression keys on a 
composite type.
                 */
                public ExpressionKeys(String[] keyExpressions, 
TypeInformation<T> type) {
-                       Preconditions.checkNotNull(keyExpressions, "Field 
expression cannot be null.");
+                       checkNotNull(keyExpressions, "Field expression cannot 
be null.");
 
                        this.keyFields = new ArrayList<>(keyExpressions.length);
 
@@ -375,8 +377,7 @@ public abstract class Keys<T> {
 
                @Override
                public String toString() {
-                       Joiner join = Joiner.on('.');
-                       return "ExpressionKeys: " + join.join(keyFields);
+                       return "ExpressionKeys: " + StringUtils.join(keyFields, 
'.');
                }
 
                public static boolean isSortKey(int fieldPos, 
TypeInformation<?> type) {

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupCombineOperatorBase.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupCombineOperatorBase.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupCombineOperatorBase.java
index b660506..15a0f5a 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupCombineOperatorBase.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupCombineOperatorBase.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.api.common.operators.base;
 
-import com.google.common.base.Preconditions;
 import org.apache.commons.lang3.ArrayUtils;
+
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.InvalidProgramException;
@@ -43,6 +43,8 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+
 /**
  * Base operator for the combineGroup transformation. It receives the UDF 
GroupCombineFunction as an input.
  * This class is later processed by the compiler to generate the plan.
@@ -109,7 +111,7 @@ public class GroupCombineOperatorBase<IN, OUT, FT extends 
GroupCombineFunction<I
                }
 
                if(sortColumns.length == 0) { // => all reduce. No comparator
-                       Preconditions.checkArgument(sortOrderings.length == 0);
+                       checkArgument(sortOrderings.length == 0);
                } else {
                        final TypeComparator<IN> sortComparator = 
getTypeComparator(inputType, sortColumns, sortOrderings, executionConfig);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java
index 0cc0209..0794a77 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java
@@ -41,13 +41,13 @@ import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 
-import com.google.common.base.Preconditions;
-
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+
 /**
  * @see org.apache.flink.api.common.functions.GroupReduceFunction
  */
@@ -181,7 +181,7 @@ public class GroupReduceOperatorBase<IN, OUT, FT extends 
GroupReduceFunction<IN,
                }
 
                if(sortColumns.length == 0) { // => all reduce. No comparator
-                       Preconditions.checkArgument(sortOrderings.length == 0);
+                       checkArgument(sortOrderings.length == 0);
                } else {
                        final TypeComparator<IN> sortComparator = 
getTypeComparator(inputType, sortColumns, sortOrderings, executionConfig);
                        Collections.sort(inputData, new Comparator<IN>() {

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/common/operators/util/FieldList.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/util/FieldList.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/util/FieldList.java
index c69c875..15a993c 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/util/FieldList.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/util/FieldList.java
@@ -16,16 +16,16 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.common.operators.util;
 
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
-import com.google.common.base.Preconditions;
 import org.apache.flink.annotation.Internal;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * Immutable ordered list of fields IDs.
  */
@@ -44,7 +44,7 @@ public class FieldList extends FieldSet {
        }
        
        public FieldList(Integer fieldId) {
-               
super(Collections.singletonList(Preconditions.checkNotNull(fieldId, "The fields 
ID must not be null.")));
+               super(Collections.singletonList(checkNotNull(fieldId, "The 
fields ID must not be null.")));
        }
        
        public FieldList(int... columnIndexes) {

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/common/operators/util/UserCodeObjectWrapper.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/util/UserCodeObjectWrapper.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/util/UserCodeObjectWrapper.java
index 890caac..5d75c95 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/util/UserCodeObjectWrapper.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/util/UserCodeObjectWrapper.java
@@ -27,7 +27,8 @@ import java.lang.reflect.Modifier;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.NonSerializableUserCodeException;
 
-import com.google.common.base.Preconditions;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * This holds an actual object containing user defined code.
@@ -39,8 +40,8 @@ public class UserCodeObjectWrapper<T> implements 
UserCodeWrapper<T> {
        private final T userCodeObject;
        
        public UserCodeObjectWrapper(T userCodeObject) {
-               Preconditions.checkNotNull(userCodeObject, "The user code 
object may not be null.");
-               Preconditions.checkArgument(userCodeObject instanceof 
Serializable, "User code object is not serializable: " + 
userCodeObject.getClass().getName());
+               checkNotNull(userCodeObject, "The user code object may not be 
null.");
+               checkArgument(userCodeObject instanceof Serializable, "User 
code object is not serializable: " + userCodeObject.getClass().getName());
                
                this.userCodeObject = userCodeObject;
                

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicArrayTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicArrayTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicArrayTypeInfo.java
index 2c61fb2..25b2850 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicArrayTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicArrayTypeInfo.java
@@ -22,7 +22,6 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
 
-import com.google.common.base.Preconditions;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.ExecutionConfig;
@@ -31,6 +30,8 @@ import 
org.apache.flink.api.common.typeutils.base.array.StringArraySerializer;
 import org.apache.flink.api.common.functions.InvalidTypesException;
 import org.apache.flink.api.common.typeutils.base.GenericArraySerializer;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 @Public
 public final class BasicArrayTypeInfo<T, C> extends TypeInformation<T> {
 
@@ -51,11 +52,10 @@ public final class BasicArrayTypeInfo<T, C> extends 
TypeInformation<T> {
 
        private final Class<T> arrayClass;
        private final TypeInformation<C> componentInfo;
-
-       @SuppressWarnings("unchecked")
+       
        private BasicArrayTypeInfo(Class<T> arrayClass, BasicTypeInfo<C> 
componentInfo) {
-               this.arrayClass = Preconditions.checkNotNull(arrayClass);
-               this.componentInfo = Preconditions.checkNotNull(componentInfo);
+               this.arrayClass = checkNotNull(arrayClass);
+               this.componentInfo = checkNotNull(componentInfo);
        }
 
        // 
--------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java
index 4eb70c1..c1d5605 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java
@@ -25,7 +25,6 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
 
-import com.google.common.base.Preconditions;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.ExecutionConfig;
@@ -54,6 +53,7 @@ import 
org.apache.flink.api.common.typeutils.base.StringComparator;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.common.typeutils.base.VoidSerializer;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * Type information for primitive types (int, long, double, byte, ...), 
String, Date, and Void.
@@ -87,9 +87,9 @@ public class BasicTypeInfo<T> extends TypeInformation<T> 
implements AtomicType<T
        
        
        protected BasicTypeInfo(Class<T> clazz, Class<?>[] 
possibleCastTargetTypes, TypeSerializer<T> serializer, Class<? extends 
TypeComparator<T>> comparatorClass) {
-               this.clazz = Preconditions.checkNotNull(clazz);
-               this.possibleCastTargetTypes = 
Preconditions.checkNotNull(possibleCastTargetTypes);
-               this.serializer = Preconditions.checkNotNull(serializer);
+               this.clazz = checkNotNull(clazz);
+               this.possibleCastTargetTypes = 
checkNotNull(possibleCastTargetTypes);
+               this.serializer = checkNotNull(serializer);
                // comparator can be null as in VOID_TYPE_INFO
                this.comparatorClass = comparatorClass;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/FractionalTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/FractionalTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/FractionalTypeInfo.java
index aa22ac6..b4e53d4 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/FractionalTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/FractionalTypeInfo.java
@@ -18,13 +18,14 @@
 
 package org.apache.flink.api.common.typeinfo;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Sets;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 
-import java.util.Set;
+import java.util.Arrays;
+import java.util.HashSet;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
 
 /**
  * Type information for numeric fractional primitive types (double, float).
@@ -34,15 +35,15 @@ public class FractionalTypeInfo<T> extends 
NumericTypeInfo<T> {
 
        private static final long serialVersionUID = 554334260950199994L;
 
-       private static final Set<Class<?>> fractionalTypes = 
Sets.<Class<?>>newHashSet(
-                       Double.class,
-                       Float.class
-       );
+       private static final HashSet<Class<?>> fractionalTypes = new 
HashSet<Class<?>>(
+                       Arrays.asList(
+                               Double.class,
+                               Float.class));
 
        protected FractionalTypeInfo(Class<T> clazz, Class<?>[] 
possibleCastTargetTypes, TypeSerializer<T> serializer, Class<? extends 
TypeComparator<T>> comparatorClass) {
                super(clazz, possibleCastTargetTypes, serializer, 
comparatorClass);
 
-               Preconditions.checkArgument(fractionalTypes.contains(clazz), 
"The given class " +
-                       clazz.getSimpleName() + " is not a fractional type.");
+               checkArgument(fractionalTypes.contains(clazz), 
+                               "The given class %s is not a fractional type.", 
clazz.getSimpleName());
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/IntegerTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/IntegerTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/IntegerTypeInfo.java
index bff3ab7..02b416a 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/IntegerTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/IntegerTypeInfo.java
@@ -18,13 +18,14 @@
 
 package org.apache.flink.api.common.typeinfo;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Sets;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 
-import java.util.Set;
+import java.util.Arrays;
+import java.util.HashSet;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
 
 /**
  * Type information for numeric integer primitive types: int, long, byte, 
short, character.
@@ -34,18 +35,18 @@ public class IntegerTypeInfo<T> extends NumericTypeInfo<T> {
 
        private static final long serialVersionUID = -8068827354966766955L;
 
-       private static final Set<Class<?>> integerTypes = 
Sets.<Class<?>>newHashSet(
-                       Integer.class,
-                       Long.class,
-                       Byte.class,
-                       Short.class,
-                       Character.class
-       );
+       private static final HashSet<Class<?>> integerTypes = new 
HashSet<Class<?>>(
+                       Arrays.asList(
+                               Integer.class,
+                               Long.class,
+                               Byte.class,
+                               Short.class,
+                               Character.class));
 
        protected IntegerTypeInfo(Class<T> clazz, Class<?>[] 
possibleCastTargetTypes, TypeSerializer<T> serializer, Class<? extends 
TypeComparator<T>> comparatorClass) {
                super(clazz, possibleCastTargetTypes, serializer, 
comparatorClass);
 
-               Preconditions.checkArgument(integerTypes.contains(clazz), "The 
given class " +
-               clazz.getSimpleName() + " is not a integer type.");
+               checkArgument(integerTypes.contains(clazz),
+                               "The given class %s is not a integer type.", 
clazz.getSimpleName());
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/NumericTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/NumericTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/NumericTypeInfo.java
index 6969520..fea8abe 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/NumericTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/NumericTypeInfo.java
@@ -18,13 +18,14 @@
 
 package org.apache.flink.api.common.typeinfo;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Sets;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 
-import java.util.Set;
+import java.util.Arrays;
+import java.util.HashSet;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
 
 /**
  * Type information for numeric primitive types: int, long, double, byte, 
short, float, char.
@@ -34,21 +35,21 @@ public abstract class NumericTypeInfo<T> extends 
BasicTypeInfo<T> {
 
        private static final long serialVersionUID = -5937777910658986986L;
 
-       private static final Set<Class<?>> numericalTypes = 
Sets.<Class<?>>newHashSet(
-                       Integer.class,
-                       Long.class,
-                       Double.class,
-                       Byte.class,
-                       Short.class,
-                       Float.class,
-                       Character.class
-       );
+       private static final HashSet<Class<?>> numericalTypes = new 
HashSet<Class<?>>(
+                       Arrays.asList(
+                               Integer.class,
+                               Long.class,
+                               Double.class,
+                               Byte.class,
+                               Short.class,
+                               Float.class,
+                               Character.class));
 
        protected NumericTypeInfo(Class<T> clazz, Class<?>[] 
possibleCastTargetTypes, TypeSerializer<T> serializer, Class<? extends
                        TypeComparator<T>> comparatorClass) {
                super(clazz, possibleCastTargetTypes, serializer, 
comparatorClass);
 
-               Preconditions.checkArgument(numericalTypes.contains(clazz), 
"The given class " +
-                               clazz.getSimpleName() + " is not a numerical 
type.");
+               checkArgument(numericalTypes.contains(clazz), 
+                               "The given class %s is not a numerical type", 
clazz.getSimpleName());
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/PrimitiveArrayTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/PrimitiveArrayTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/PrimitiveArrayTypeInfo.java
index 2c75458..1c6ce00 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/PrimitiveArrayTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/PrimitiveArrayTypeInfo.java
@@ -18,13 +18,8 @@
 
 package org.apache.flink.api.common.typeinfo;
 
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Objects;
-
-import com.google.common.base.Preconditions;
-import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Public;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.InvalidTypesException;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -46,6 +41,13 @@ import 
org.apache.flink.api.common.typeutils.base.array.PrimitiveArrayComparator
 import 
org.apache.flink.api.common.typeutils.base.array.ShortPrimitiveArrayComparator;
 import 
org.apache.flink.api.common.typeutils.base.array.ShortPrimitiveArraySerializer;
 
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * A {@link TypeInformation} for arrays of primitive types (int, long, double, 
...).
  * Supports the creation of dedicated efficient serializers for these types.
@@ -83,11 +85,11 @@ public class PrimitiveArrayTypeInfo<T> extends 
TypeInformation<T> implements Ato
         * @param comparatorClass The class of the array comparator
         */
        private PrimitiveArrayTypeInfo(Class<T> arrayClass, TypeSerializer<T> 
serializer, Class<? extends PrimitiveArrayComparator<T, ?>> comparatorClass) {
-               this.arrayClass = Preconditions.checkNotNull(arrayClass);
-               this.serializer = Preconditions.checkNotNull(serializer);
-               this.comparatorClass = 
Preconditions.checkNotNull(comparatorClass);
+               this.arrayClass = checkNotNull(arrayClass);
+               this.serializer = checkNotNull(serializer);
+               this.comparatorClass = checkNotNull(comparatorClass);
 
-               Preconditions.checkArgument(
+               checkArgument(
                        arrayClass.isArray() && 
arrayClass.getComponentType().isPrimitive(),
                        "Class must represent an array of primitives");
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java
index 19b6eaf..4bf17ea 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java
@@ -22,13 +22,13 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
 
-import com.google.common.base.Preconditions;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.AtomicType;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * Base type information class for Tuple and Pojo types
@@ -44,7 +44,7 @@ public abstract class CompositeType<T> extends 
TypeInformation<T> {
 
        @PublicEvolving
        public CompositeType(Class<T> typeClass) {
-               this.typeClass = Preconditions.checkNotNull(typeClass);
+               this.typeClass = checkNotNull(typeClass);
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumSerializer.java
index c64e399..21a6ea0 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumSerializer.java
@@ -22,12 +22,13 @@ import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.lang.reflect.Method;
 
-import com.google.common.base.Preconditions;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 @Internal
 public final class EnumSerializer<T extends Enum<T>> extends TypeSerializer<T> 
{
 
@@ -38,7 +39,7 @@ public final class EnumSerializer<T extends Enum<T>> extends 
TypeSerializer<T> {
        private final Class<T> enumClass;
 
        public EnumSerializer(Class<T> enumClass) {
-               this.enumClass = Preconditions.checkNotNull(enumClass);
+               this.enumClass = checkNotNull(enumClass);
                this.values = createValues(enumClass);
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
index eb6e708..f35d71b 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
@@ -21,12 +21,13 @@ package org.apache.flink.api.common.typeutils.base;
 import java.io.IOException;
 import java.lang.reflect.Array;
 
-import com.google.common.base.Preconditions;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * A serializer for arrays of objects.
  * 
@@ -45,8 +46,8 @@ public final class GenericArraySerializer<C> extends 
TypeSerializer<C[]> {
        
        
        public GenericArraySerializer(Class<C> componentClass, 
TypeSerializer<C> componentSerializer) {
-               this.componentClass = 
Preconditions.checkNotNull(componentClass);
-               this.componentSerializer = 
Preconditions.checkNotNull(componentSerializer);
+               this.componentClass = checkNotNull(componentClass);
+               this.componentSerializer = checkNotNull(componentSerializer);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/java/typeutils/EnumTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/EnumTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/EnumTypeInfo.java
index aec3c1d..61d9ae4 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/EnumTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/EnumTypeInfo.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.api.java.typeutils;
 
-import com.google.common.base.Preconditions;
+import org.apache.flink.annotation.Public;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.AtomicType;
@@ -27,7 +27,8 @@ import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.EnumComparator;
 import org.apache.flink.api.common.typeutils.base.EnumSerializer;
-import org.apache.flink.annotation.Public;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * A {@link TypeInformation} for java enumeration types. 
@@ -43,7 +44,7 @@ public class EnumTypeInfo<T extends Enum<T>> extends 
TypeInformation<T> implemen
 
        @PublicEvolving
        public EnumTypeInfo(Class<T> typeClass) {
-               Preconditions.checkNotNull(typeClass, "Enum type class must not 
be null.");
+               checkNotNull(typeClass, "Enum type class must not be null.");
 
                if (!Enum.class.isAssignableFrom(typeClass) ) {
                        throw new IllegalArgumentException("EnumTypeInfo can 
only be used for subclasses of " + Enum.class.getName());

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
index 0cca8bd..bc4e87a 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.api.java.typeutils;
 
-import com.google.common.base.Preconditions;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.ExecutionConfig;
@@ -29,6 +28,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.typeutils.runtime.GenericTypeComparator;
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 @Public
 public class GenericTypeInfo<T> extends TypeInformation<T> implements 
AtomicType<T> {
@@ -39,7 +39,7 @@ public class GenericTypeInfo<T> extends TypeInformation<T> 
implements AtomicType
 
        @PublicEvolving
        public GenericTypeInfo(Class<T> typeClass) {
-               this.typeClass = Preconditions.checkNotNull(typeClass);
+               this.typeClass = checkNotNull(typeClass);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java
index 1e8fbe2..2edb3b9 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java
@@ -27,7 +27,8 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.GenericArraySerializer;
 
-import com.google.common.base.Preconditions;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkArgument;
 
 @Public
 public class ObjectArrayTypeInfo<T, C> extends TypeInformation<T> {
@@ -38,8 +39,8 @@ public class ObjectArrayTypeInfo<T, C> extends 
TypeInformation<T> {
        private final TypeInformation<C> componentInfo;
 
        private ObjectArrayTypeInfo(Class<T> arrayType, TypeInformation<C> 
componentInfo) {
-               this.arrayType = Preconditions.checkNotNull(arrayType);
-               this.componentInfo = Preconditions.checkNotNull(componentInfo);
+               this.arrayType = checkNotNull(arrayType);
+               this.componentInfo = checkNotNull(componentInfo);
        }
 
        // 
--------------------------------------------------------------------------------------------
@@ -128,9 +129,9 @@ public class ObjectArrayTypeInfo<T, C> extends 
TypeInformation<T> {
 
        @PublicEvolving
        public static <T, C> ObjectArrayTypeInfo<T, C> getInfoFor(Class<T> 
arrayClass, TypeInformation<C> componentInfo) {
-               Preconditions.checkNotNull(arrayClass);
-               Preconditions.checkNotNull(componentInfo);
-               Preconditions.checkArgument(arrayClass.isArray(), "Class " + 
arrayClass + " must be an array.");
+               checkNotNull(arrayClass);
+               checkNotNull(componentInfo);
+               checkArgument(arrayClass.isArray(), "Class " + arrayClass + " 
must be an array.");
 
                return new ObjectArrayTypeInfo<T, C>(arrayClass, componentInfo);
        }
@@ -146,7 +147,7 @@ public class ObjectArrayTypeInfo<T, C> extends 
TypeInformation<T> {
        @SuppressWarnings("unchecked")
        @PublicEvolving
        public static <T, C> ObjectArrayTypeInfo<T, C> 
getInfoFor(TypeInformation<C> componentInfo) {
-               Preconditions.checkNotNull(componentInfo);
+               checkNotNull(componentInfo);
 
                return new ObjectArrayTypeInfo<T, C>(
                        
(Class<T>)Array.newInstance(componentInfo.getTypeClass(), 0).getClass(),

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoField.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoField.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoField.java
index c37fc77..026cfa6 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoField.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoField.java
@@ -25,10 +25,11 @@ import java.io.Serializable;
 import java.lang.reflect.Field;
 import java.util.Objects;
 
-import com.google.common.base.Preconditions;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * Represent a field definition for {@link PojoTypeInfo} type of objects.
  */
@@ -41,8 +42,8 @@ public class PojoField implements Serializable {
        private final TypeInformation<?> type;
 
        public PojoField(Field field, TypeInformation<?> type) {
-               this.field = Preconditions.checkNotNull(field);
-               this.type = Preconditions.checkNotNull(type);
+               this.field = checkNotNull(field);
+               this.type = checkNotNull(type);
        }
 
        public Field getField() {

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
index be2a027..9c65263 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
@@ -18,19 +18,9 @@
 
 package org.apache.flink.api.java.typeutils;
 
-import java.lang.reflect.Field;
-import java.lang.reflect.Modifier;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.List;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import com.google.common.base.Preconditions;
-
-import org.apache.flink.annotation.PublicEvolving;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.annotation.Public;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.operators.Keys.ExpressionKeys;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -40,10 +30,20 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.typeutils.runtime.AvroSerializer;
 import org.apache.flink.api.java.typeutils.runtime.PojoComparator;
 import org.apache.flink.api.java.typeutils.runtime.PojoSerializer;
-
-import com.google.common.base.Joiner;
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
 
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkState;
+
 /**
  * TypeInformation for "Java Beans"-style types. Flink refers to them as POJOs,
  * since the conditions are slightly different from Java Beans.
@@ -78,8 +78,8 @@ public class PojoTypeInfo<T> extends CompositeType<T> {
        public PojoTypeInfo(Class<T> typeClass, List<PojoField> fields) {
                super(typeClass);
 
-               Preconditions.checkArgument(Modifier.isPublic(typeClass.getModifiers()),
-                               "POJO " + typeClass + " is not public");
+               checkArgument(Modifier.isPublic(typeClass.getModifiers()),
+                               "POJO %s is not public", typeClass);
 
                this.fields = fields.toArray(new PojoField[fields.size()]);
 
@@ -350,7 +350,7 @@ public class PojoTypeInfo<T> extends CompositeType<T> {
                        fieldStrings.add(field.getField().getName() + ": " + 
field.getTypeInformation().toString());
                }
                return "PojoType<" + getTypeClass().getName()
-                               + ", fields = [" + Joiner.on(", ").join(fieldStrings) + "]"
+                               + ", fields = [" + StringUtils.join(fieldStrings, ", ") + "]"
                                + ">";
        }
 
@@ -381,15 +381,15 @@ public class PojoTypeInfo<T> extends CompositeType<T> {
 
                @Override
                public TypeComparator<T> createTypeComparator(ExecutionConfig 
config) {
-                       Preconditions.checkState(
+                       checkState(
                                keyFields.size() > 0,
                                "No keys were defined for the PojoTypeComparatorBuilder.");
 
-                       Preconditions.checkState(
+                       checkState(
                                fieldComparators.size() > 0,
                                "No type comparators were defined for the PojoTypeComparatorBuilder.");
 
-                       Preconditions.checkState(
+                       checkState(
                                keyFields.size() == fieldComparators.size(),
                                "Number of key fields and field comparators is not equal.");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java
index 9ecbe73..051ad0df 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java
@@ -22,24 +22,26 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 
-import com.google.common.base.Preconditions;
-import com.google.common.primitives.Ints;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.InvalidTypesException;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-//CHECKSTYLE.OFF: AvoidStarImport - Needed for TupleGenerator
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.*;
 import org.apache.flink.api.java.typeutils.runtime.Tuple0Serializer;
-//CHECKSTYLE.ON: AvoidStarImport
 import org.apache.flink.api.java.typeutils.runtime.TupleComparator;
 import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
 import org.apache.flink.types.Value;
 
+//CHECKSTYLE.OFF: AvoidStarImport - Needed for TupleGenerator
+import org.apache.flink.api.java.tuple.*;
+//CHECKSTYLE.ON: AvoidStarImport
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkState;
+
 /**
  * A {@link TypeInformation} for the tuple types of the Java API.
  *
@@ -62,7 +64,7 @@ public final class TupleTypeInfo<T extends Tuple> extends 
TupleTypeInfoBase<T> {
        public TupleTypeInfo(Class<T> tupleType, TypeInformation<?>... types) {
                super(tupleType, types);
 
-               Preconditions.checkArgument(
+               checkArgument(
                        types.length <= Tuple.MAX_ARITY,
                        "The tuple type exceeds the maximum supported arity.");
 
@@ -131,24 +133,24 @@ public final class TupleTypeInfo<T extends Tuple> extends 
TupleTypeInfoBase<T> {
 
                @Override
                public TypeComparator<T> createTypeComparator(ExecutionConfig 
config) {
-                       Preconditions.checkState(
+                       checkState(
                                fieldComparators.size() > 0,
                                "No field comparators were defined for the TupleTypeComparatorBuilder."
                        );
 
-                       Preconditions.checkState(
+                       checkState(
                                logicalKeyFields.size() > 0,
                                "No key fields were defined for the TupleTypeComparatorBuilder."
                        );
 
-                       Preconditions.checkState(
+                       checkState(
                                fieldComparators.size() == logicalKeyFields.size(),
                                "The number of field comparators and key fields is not equal."
                        );
 
                        final int maxKey = Collections.max(logicalKeyFields);
 
-                       Preconditions.checkState(
+                       checkState(
                                maxKey >= 0,
                                "The maximum key field must be greater or equal than 0."
                        );
@@ -160,7 +162,7 @@ public final class TupleTypeInfo<T extends Tuple> extends 
TupleTypeInfoBase<T> {
                        }
 
                        return new TupleComparator<T>(
-                               Ints.toArray(logicalKeyFields),
+                               listToPrimitives(logicalKeyFields),
                                fieldComparators.toArray(new 
TypeComparator[fieldComparators.size()]),
                                fieldSerializers
                        );
@@ -255,4 +257,12 @@ public final class TupleTypeInfo<T extends Tuple> extends 
TupleTypeInfoBase<T> {
 
                return (TupleTypeInfo<X>) new TupleTypeInfo<>(infos);
        }
+       
+       private static int[] listToPrimitives(ArrayList<Integer> ints) {
+               int[] result = new int[ints.size()];
+               for (int i = 0; i < result.length; i++) {
+                       result[i] = ints.get(i);
+               }
+               return result;
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
index 753eb66..807fd54 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
@@ -23,12 +23,12 @@ import java.util.List;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-import com.google.common.base.Preconditions;
-
 import org.apache.flink.api.common.operators.Keys.ExpressionKeys;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.CompositeType;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 public abstract class TupleTypeInfoBase<T> extends CompositeType<T> {
 
        private static final long serialVersionUID = 1L;
@@ -52,7 +52,7 @@ public abstract class TupleTypeInfoBase<T> extends 
CompositeType<T> {
        public TupleTypeInfoBase(Class<T> tupleType, TypeInformation<?>... 
types) {
                super(tupleType);
 
-               this.types = Preconditions.checkNotNull(types);
+               this.types = checkNotNull(types);
 
                int fieldCounter = 0;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index 151f359..0469cc2 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -66,7 +66,7 @@ import org.apache.hadoop.io.Writable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Preconditions;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * A utility for reflection analysis on classes, to determine the return type 
of implementations of transformation
@@ -902,7 +902,7 @@ public class TypeExtractor {
                if (isClassType(originalType)) {
                        originalTypeAsClass = typeToClass(originalType);
                }
-               Preconditions.checkNotNull(originalTypeAsClass, "originalType has an unexpected type");
+               checkNotNull(originalTypeAsClass, "originalType has an unexpected type");
                // check if the class we assumed to conform to the defining type so far is actually a pojo because the
                // original type contains additional fields.
                // check for additional fields.
@@ -1466,7 +1466,7 @@ public class TypeExtractor {
        @SuppressWarnings({ "unchecked", "rawtypes" })
        private <OUT,IN1,IN2> TypeInformation<OUT> 
privateGetForClass(Class<OUT> clazz, ArrayList<Type> typeHierarchy,
                        ParameterizedType parameterizedType, 
TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) {
-               Preconditions.checkNotNull(clazz);
+               checkNotNull(clazz);
 
                // Object is handled as generic type info
                if (clazz.equals(Object.class)) {
@@ -1822,7 +1822,7 @@ public class TypeExtractor {
 
        @SuppressWarnings({ "unchecked", "rawtypes" })
        private <X> TypeInformation<X> privateGetForObject(X value) {
-               Preconditions.checkNotNull(value);
+               checkNotNull(value);
 
                // check if we can extract the types from tuples, otherwise 
work with the class
                if (value instanceof Tuple) {

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java
index 7c173c0..495a324 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java
@@ -18,9 +18,8 @@
 
 package org.apache.flink.api.java.typeutils;
 
-import com.google.common.base.Preconditions;
-import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Public;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.InvalidTypesException;
 import org.apache.flink.api.common.typeinfo.AtomicType;
@@ -44,6 +43,9 @@ import org.apache.flink.types.ShortValue;
 import org.apache.flink.types.StringValue;
 import org.apache.flink.types.Value;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * Type information for data types that extend the {@link Value} interface. 
The value
  * interface allows types to define their custom serialization and 
deserialization routines.
@@ -70,11 +72,11 @@ public class ValueTypeInfo<T extends Value> extends 
TypeInformation<T> implement
 
        @PublicEvolving
        public ValueTypeInfo(Class<T> type) {
-               this.type = Preconditions.checkNotNull(type);
+               this.type = checkNotNull(type);
 
-               Preconditions.checkArgument(
+               checkArgument(
                        Value.class.isAssignableFrom(type) || type.equals(Value.class),
-                       "ValueTypeInfo can only be used for subclasses of " + Value.class.getName());
+                       "ValueTypeInfo can only be used for subclasses of %s", Value.class.getName());
        }
        
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java
index 5e3b2bc..7ca7a91 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.api.java.typeutils;
 
-import com.google.common.base.Preconditions;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.ExecutionConfig;
@@ -29,8 +28,12 @@ import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.typeutils.runtime.WritableComparator;
 import org.apache.flink.api.java.typeutils.runtime.WritableSerializer;
+
 import org.apache.hadoop.io.Writable;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
 /**
  * Type information for data types that extend Hadoop's {@link Writable} 
interface. The Writable
  * interface defines the serialization and deserialization routines for the 
data type.
@@ -46,11 +49,11 @@ public class WritableTypeInfo<T extends Writable> extends 
TypeInformation<T> imp
 
        @PublicEvolving
        public WritableTypeInfo(Class<T> typeClass) {
-               this.typeClass = Preconditions.checkNotNull(typeClass);
+               this.typeClass = checkNotNull(typeClass);
 
-               Preconditions.checkArgument(
+               checkArgument(
                        Writable.class.isAssignableFrom(typeClass) && !typeClass.equals(Writable.class),
-                       "WritableTypeInfo can only be used for subclasses of " + Writable.class.getName());
+                       "WritableTypeInfo can only be used for subclasses of %s", Writable.class.getName());
        }
 
        @SuppressWarnings({ "rawtypes", "unchecked" })

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
index bc04367..4c2a7f9 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
@@ -20,11 +20,11 @@ package org.apache.flink.api.java.typeutils.runtime;
 
 import java.io.IOException;
 
-import com.google.common.base.Preconditions;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.reflect.ReflectDatumReader;
 import org.apache.avro.reflect.ReflectDatumWriter;
 import org.apache.avro.util.Utf8;
+
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers;
 import org.apache.flink.core.memory.DataInputView;
@@ -34,6 +34,7 @@ import org.apache.flink.util.InstantiationUtil;
 import com.esotericsoftware.kryo.Kryo;
 import org.objenesis.strategy.StdInstantiatorStrategy;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * General purpose serialization. Currently using Apache Avro's 
Reflect-serializers for serialization and
@@ -66,8 +67,8 @@ public final class AvroSerializer<T> extends 
TypeSerializer<T> {
        }
        
        public AvroSerializer(Class<T> type, Class<? extends T> 
typeToInstantiate) {
-               this.type = Preconditions.checkNotNull(type);
-               this.typeToInstantiate = 
Preconditions.checkNotNull(typeToInstantiate);
+               this.type = checkNotNull(type);
+               this.typeToInstantiate = checkNotNull(typeToInstantiate);
                
                InstantiationUtil.checkForInstantiation(typeToInstantiate);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java
index 9e46f27..f30a767 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java
@@ -20,13 +20,13 @@ package org.apache.flink.api.java.typeutils.runtime;
 
 import java.io.IOException;
 
-import com.google.common.base.Preconditions;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.types.CopyableValue;
 import org.apache.flink.util.InstantiationUtil;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 public class CopyableValueSerializer<T extends CopyableValue<T>> extends 
TypeSerializer<T> {
 
@@ -39,7 +39,7 @@ public class CopyableValueSerializer<T extends 
CopyableValue<T>> extends TypeSer
        
        
        public CopyableValueSerializer(Class<T> valueClass) {
-               this.valueClass = Preconditions.checkNotNull(valueClass);
+               this.valueClass = checkNotNull(valueClass);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
index de24956..9958540 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
@@ -32,7 +32,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 
-import com.google.common.base.Preconditions;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -40,6 +39,7 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 public final class PojoSerializer<T> extends TypeSerializer<T> {
 
@@ -75,11 +75,11 @@ public final class PojoSerializer<T> extends 
TypeSerializer<T> {
                        Field[] fields,
                        ExecutionConfig executionConfig) {
 
-               this.clazz = Preconditions.checkNotNull(clazz);
-               this.fieldSerializers = (TypeSerializer<Object>[]) 
Preconditions.checkNotNull(fieldSerializers);
-               this.fields = Preconditions.checkNotNull(fields);
+               this.clazz = checkNotNull(clazz);
+               this.fieldSerializers = (TypeSerializer<Object>[]) 
checkNotNull(fieldSerializers);
+               this.fields = checkNotNull(fields);
                this.numFields = fieldSerializers.length;
-               this.executionConfig = 
Preconditions.checkNotNull(executionConfig);
+               this.executionConfig = checkNotNull(executionConfig);
 
                LinkedHashSet<Class<?>> registeredPojoTypes = 
executionConfig.getRegisteredPojoTypes();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
index 5b5d462..5a93cc5 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.api.java.typeutils.runtime;
 
-import com.google.common.base.Preconditions;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
@@ -27,6 +26,7 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.Objects;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 public abstract class TupleSerializerBase<T> extends TypeSerializer<T> {
 
@@ -42,8 +42,8 @@ public abstract class TupleSerializerBase<T> extends 
TypeSerializer<T> {
 
        @SuppressWarnings("unchecked")
        public TupleSerializerBase(Class<T> tupleClass, TypeSerializer<?>[] 
fieldSerializers) {
-               this.tupleClass = Preconditions.checkNotNull(tupleClass);
-               this.fieldSerializers = (TypeSerializer<Object>[]) 
Preconditions.checkNotNull(fieldSerializers);
+               this.tupleClass = checkNotNull(tupleClass);
+               this.fieldSerializers = (TypeSerializer<Object>[]) 
checkNotNull(fieldSerializers);
                this.arity = fieldSerializers.length;
        }
        

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
index 9329866..73dc0fc 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
@@ -20,7 +20,6 @@ package org.apache.flink.api.java.typeutils.runtime;
 
 import java.io.IOException;
 
-import com.google.common.base.Preconditions;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
@@ -30,6 +29,8 @@ import org.apache.flink.util.InstantiationUtil;
 import com.esotericsoftware.kryo.Kryo;
 import org.objenesis.strategy.StdInstantiatorStrategy;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * Serializer for {@link Value} types. Uses the value's serialization methods, 
and uses
  * Kryo for deep object copies.
@@ -49,7 +50,7 @@ public class ValueSerializer<T extends Value> extends 
TypeSerializer<T> {
        // 
--------------------------------------------------------------------------------------------
        
        public ValueSerializer(Class<T> type) {
-               this.type = Preconditions.checkNotNull(type);
+               this.type = checkNotNull(type);
        }
 
        // 
--------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
index 276ffc4..d5c2f67 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
@@ -25,9 +25,9 @@ import 
com.esotericsoftware.kryo.factories.ReflectionSerializerFactory;
 import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
 import com.esotericsoftware.kryo.serializers.JavaSerializer;
-import com.google.common.base.Preconditions;
 
 import org.apache.avro.generic.GenericData;
+
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
@@ -36,7 +36,9 @@ import 
org.apache.flink.api.java.typeutils.runtime.NoFetchingInput;
 import 
org.apache.flink.api.java.typeutils.runtime.kryo.Serializers.SpecificInstanceCollectionSerializerForArrayList;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
+
 import org.objenesis.strategy.StdInstantiatorStrategy;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -52,6 +54,8 @@ import java.util.LinkedHashSet;
 import java.util.Map;
 import java.util.Objects;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * A type serializer that serializes its type using the Kryo serialization
  * framework (https://github.com/EsotericSoftware/kryo).
@@ -92,7 +96,7 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
        // 
------------------------------------------------------------------------
 
        public KryoSerializer(Class<T> type, ExecutionConfig executionConfig){
-               this.type = Preconditions.checkNotNull(type);
+               this.type = checkNotNull(type);
 
                this.defaultSerializers = 
executionConfig.getDefaultKryoSerializers();
                this.defaultSerializerClasses = 
executionConfig.getDefaultKryoSerializerClasses();

Reply via email to