http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/types/StringValue.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/StringValue.java 
b/flink-core/src/main/java/org/apache/flink/types/StringValue.java
index e20083e..92f364e 100644
--- a/flink-core/src/main/java/org/apache/flink/types/StringValue.java
+++ b/flink-core/src/main/java/org/apache/flink/types/StringValue.java
@@ -28,7 +28,7 @@ import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.MemorySegment;
 
-import com.google.common.base.Preconditions;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * Mutable string data type that implements the Key interface.
@@ -147,7 +147,7 @@ public class StringValue implements 
NormalizableKey<StringValue>, CharSequence,
         * @param value The new string value.
         */
        public void setValue(CharSequence value) {
-               Preconditions.checkNotNull(value);
+               checkNotNull(value);
                setValue(value, 0, value.length());
        }
        
@@ -158,7 +158,7 @@ public class StringValue implements 
NormalizableKey<StringValue>, CharSequence,
         */
        @Override
        public void setValue(StringValue value) {
-               Preconditions.checkNotNull(value);
+               checkNotNull(value);
                setValue(value.value, 0, value.len);
        }
 
@@ -170,7 +170,7 @@ public class StringValue implements 
NormalizableKey<StringValue>, CharSequence,
         * @param len The length of the substring.
         */
        public void setValue(StringValue value, int offset, int len) {
-               Preconditions.checkNotNull(value);
+               checkNotNull(value);
                setValue(value.value, offset, len);
        }
        
@@ -182,7 +182,7 @@ public class StringValue implements 
NormalizableKey<StringValue>, CharSequence,
         * @param len The length of the substring.
         */
        public void setValue(CharSequence value, int offset, int len) {
-               Preconditions.checkNotNull(value);
+               checkNotNull(value);
                if (offset < 0 || len < 0 || offset > value.length() - len) {
                        throw new IndexOutOfBoundsException("offset: " + offset 
+ " len: " + len + " value.len: " + len);
                }
@@ -204,7 +204,7 @@ public class StringValue implements 
NormalizableKey<StringValue>, CharSequence,
         * @param buffer The character buffer to read the characters from.
         */
        public void setValue(CharBuffer buffer) {
-               Preconditions.checkNotNull(buffer);
+               checkNotNull(buffer);
                final int len = buffer.length();
                ensureSize(len);
                buffer.get(this.value, 0, len);
@@ -220,7 +220,7 @@ public class StringValue implements 
NormalizableKey<StringValue>, CharSequence,
         * @param len The length of the substring.
         */
        public void setValue(char[] chars, int offset, int len) {
-               Preconditions.checkNotNull(chars);
+               checkNotNull(chars);
                if (offset < 0 || len < 0 || offset > chars.length - len) {
                        throw new IndexOutOfBoundsException();
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/util/FileUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/FileUtils.java 
b/flink-core/src/main/java/org/apache/flink/util/FileUtils.java
new file mode 100644
index 0000000..078599d
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/FileUtils.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.StandardOpenOption;
+
+/**
+ * This is a utility class to deal with temporary files.
+ */
+public final class FileUtils {
+
+       /**
+        * The alphabet to construct the random part of the filename from.
+        */
+       private static final char[] ALPHABET = { '0', '1', '2', '3', '4', '5', 
'6', '7', '8', '9', '0', 'a', 'b', 'c', 'd',
+               'e', 'f' };
+
+       /**
+        * The length of the random part of the filename.
+        */
+       private static final int LENGTH = 12;
+
+       
+
+       /**
+        * Constructs a random filename with the given prefix and
+        * a random part generated from hex characters.
+        * 
+        * @param prefix
+        *        the prefix to the filename to be constructed
+        * @return the generated random filename with the given prefix
+        */
+       public static String getRandomFilename(final String prefix) {
+
+               final StringBuilder stringBuilder = new StringBuilder(prefix);
+
+               for (int i = 0; i < LENGTH; i++) {
+                       stringBuilder.append(ALPHABET[(int) 
Math.floor(Math.random() * (double) ALPHABET.length)]);
+               }
+
+               return stringBuilder.toString();
+       }
+       
+       // 
------------------------------------------------------------------------
+       //  Simple reading and writing of files
+       // 
------------------------------------------------------------------------
+       
+       public static String readFile(File file, String charsetName) throws 
IOException {
+               byte[] bytes = Files.readAllBytes(file.toPath());
+               return new String(bytes, charsetName);
+       }
+
+       public static String readFileUtf8(File file) throws IOException {
+               return readFile(file, "UTF-8");
+       }
+
+       public static void writeFile(File file, String contents, String 
encoding) throws IOException {
+               byte[] bytes = contents.getBytes(encoding);
+               Files.write(file.toPath(), bytes, StandardOpenOption.WRITE);
+       }
+       
+       public static void writeFileUtf8(File file, String contents) throws 
IOException {
+               writeFile(file, contents, "UTF-8");
+       }
+       
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Private default constructor to avoid instantiation.
+        */
+       private FileUtils() {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/util/IOUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/IOUtils.java 
b/flink-core/src/main/java/org/apache/flink/util/IOUtils.java
new file mode 100644
index 0000000..12d70ce
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/IOUtils.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.net.Socket;
+
+import org.slf4j.Logger;
+
+/**
+ * An utility class for I/O related functionality.
+ * 
+ */
+public final class IOUtils {
+
+       /** The block size for byte operations in byte. */
+       private static final int BLOCKSIZE = 4096;
+       
+       // 
------------------------------------------------------------------------
+       //  Byte copy operations
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Copies from one stream to another.
+        * 
+        * @param in
+        *        InputStream to read from
+        * @param out
+        *        OutputStream to write to
+        * @param buffSize
+        *        the size of the buffer
+        * @param close
+        *        whether or not close the InputStream and OutputStream at the 
end. The streams are closed in the finally
+        *        clause.
+        * @throws IOException
+        *         thrown if an error occurred while writing to the output 
stream
+        */
+       public static void copyBytes(final InputStream in, final OutputStream 
out, final int buffSize, final boolean close)
+                       throws IOException {
+
+               @SuppressWarnings("resource")
+               final PrintStream ps = out instanceof PrintStream ? 
(PrintStream) out : null;
+               final byte[] buf = new byte[buffSize];
+               try {
+                       int bytesRead = in.read(buf);
+                       while (bytesRead >= 0) {
+                               out.write(buf, 0, bytesRead);
+                               if ((ps != null) && ps.checkError()) {
+                                       throw new IOException("Unable to write 
to output stream.");
+                               }
+                               bytesRead = in.read(buf);
+                       }
+               } finally {
+                       if (close) {
+                               out.close();
+                               in.close();
+                       }
+               }
+       }
+
+       /**
+        * Copies from one stream to another. <strong>closes the input and 
output
+        * streams at the end</strong>.
+        * 
+        * @param in
+        *        InputStream to read from
+        * @param out
+        *        OutputStream to write to
+        * @throws IOException
+        *         thrown if an I/O error occurs while copying
+        */
+       public static void copyBytes(final InputStream in, final OutputStream 
out) throws IOException {
+               copyBytes(in, out, BLOCKSIZE, true);
+       }
+
+       /**
+        * Copies from one stream to another.
+        * 
+        * @param in
+        *        InputStream to read from
+        * @param out
+        *        OutputStream to write to
+        * @param close
+        *        whether or not close the InputStream and OutputStream at the
+        *        end. The streams are closed in the finally clause.
+        * @throws IOException
+        *         thrown if an I/O error occurs while copying
+        */
+       public static void copyBytes(final InputStream in, final OutputStream 
out, final boolean close) throws IOException {
+               copyBytes(in, out, BLOCKSIZE, close);
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Stream input skipping
+       // 
------------------------------------------------------------------------
+       
+       /**
+        * Reads len bytes in a loop.
+        * 
+        * @param in
+        *        The InputStream to read from
+        * @param buf
+        *        The buffer to fill
+        * @param off
+        *        offset from the buffer
+        * @param len
+        *        the length of bytes to read
+        * @throws IOException
+        *         if it could not read requested number of bytes for any 
reason (including EOF)
+        */
+       public static void readFully(final InputStream in, final byte[] buf, 
int off, final int len)
+                       throws IOException {
+               int toRead = len;
+               while (toRead > 0) {
+                       final int ret = in.read(buf, off, toRead);
+                       if (ret < 0) {
+                               throw new IOException("Premeture EOF from 
inputStream");
+                       }
+                       toRead -= ret;
+                       off += ret;
+               }
+       }
+
+       /**
+        * Similar to readFully(). Skips bytes in a loop.
+        * 
+        * @param in
+        *        The InputStream to skip bytes from
+        * @param len
+        *        number of bytes to skip
+        * @throws IOException
+        *         if it could not skip requested number of bytes for any 
reason (including EOF)
+        */
+       public static void skipFully(final InputStream in, long len) throws 
IOException {
+               while (len > 0) {
+                       final long ret = in.skip(len);
+                       if (ret < 0) {
+                               throw new IOException("Premeture EOF from 
inputStream");
+                       }
+                       len -= ret;
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Silent I/O cleanup / closing
+       // 
------------------------------------------------------------------------
+       
+       /**
+        * Close the Closeable objects and <b>ignore</b> any {@link 
IOException} or
+        * null pointers. Must only be used for cleanup in exception handlers.
+        * 
+        * @param log
+        *        the log to record problems to at debug level. Can be 
<code>null</code>.
+        * @param closeables
+        *        the objects to close
+        */
+       public static void cleanup(final Logger log, final java.io.Closeable... 
closeables) {
+               for (java.io.Closeable c : closeables) {
+                       if (c != null) {
+                               try {
+                                       c.close();
+                               } catch (IOException e) {
+                                       if (log != null && 
log.isDebugEnabled()) {
+                                               log.debug("Exception in closing 
" + c, e);
+                                       }
+                               }
+                       }
+               }
+       }
+
+       /**
+        * Closes the stream ignoring {@link IOException}. Must only be called 
in
+        * cleaning up from exception handlers.
+        * 
+        * @param stream
+        *        the stream to close
+        */
+       public static void closeStream(final java.io.Closeable stream) {
+               cleanup(null, stream);
+       }
+
+       /**
+        * Closes the socket ignoring {@link IOException}.
+        * 
+        * @param sock
+        *        the socket to close
+        */
+       public static void closeSocket(final Socket sock) {
+               // avoids try { close() } dance
+               if (sock != null) {
+                       try {
+                               sock.close();
+                       } catch (IOException ignored) {
+                       }
+               }
+       }
+       
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Private constructor to prevent instantiation.
+        */
+       private IOUtils() {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/util/MathUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/MathUtils.java 
b/flink-core/src/main/java/org/apache/flink/util/MathUtils.java
new file mode 100644
index 0000000..2bdddc8
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/MathUtils.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+/**
+ * Collection of simple mathematical routines.
+ */
+public final class MathUtils {
+       
+       /**
+        * Computes the logarithm of the given value to the base of 2, rounded 
down. It corresponds to the
+        * position of the highest non-zero bit. The position is counted, 
starting with 0 from the least
+        * significant bit to the most significant bit. For example, 
<code>log2floor(16) = 4</code>, and
+        * <code>log2floor(10) = 3</code>.
+        * 
+        * @param value The value to compute the logarithm for.
+        * @return The logarithm (rounded down) to the base of 2.
+        * @throws ArithmeticException Thrown, if the given value is zero.
+        */
+       public static int log2floor(int value) throws ArithmeticException {
+               if (value == 0) {
+                       throw new ArithmeticException("Logarithm of zero is 
undefined.");
+               }
+               
+               int log = 0;
+               while ((value = value >>> 1) != 0) {
+                       log++;
+               }
+               
+               return log;
+       }
+       
+       /**
+        * Computes the logarithm of the given value to the base of 2. This 
method throws an error,
+        * if the given argument is not a power of 2.
+        * 
+        * @param value The value to compute the logarithm for.
+        * @return The logarithm to the base of 2.
+        * @throws ArithmeticException Thrown, if the given value is zero.
+        * @throws IllegalArgumentException Thrown, if the given value is not a 
power of two.
+        */
+       public static int log2strict(int value) throws ArithmeticException, 
IllegalArgumentException {
+               if (value == 0) {
+                       throw new ArithmeticException("Logarithm of zero is 
undefined.");
+               }
+               if ((value & (value - 1)) != 0) {
+                       throw new IllegalArgumentException("The given value " + 
value + " is not a power of two.");
+               }
+               
+               int log = 0;
+               while ((value = value >>> 1) != 0) {
+                       log++;
+               }
+               
+               return log;
+       }
+       
+       /**
+        * Decrements the given number down to the closest power of two. If the 
argument is a
+        * power of two, it remains unchanged.
+        * 
+        * @param value The value to round down.
+        * @return The closest value that is a power of to and less or equal 
than the given value.
+        */
+       public static int roundDownToPowerOf2(int value) {
+               return Integer.highestOneBit(value);
+       }
+       
+       /**
+        * Casts the given value to a 32 bit integer, if it can be safely done. 
If the cast would change the numeric
+        * value, this method raises an exception.
+        * <p>
+        * This method is a protection in places where one expects to be able 
to safely case, but where unexpected
+        * situations could make the cast unsafe and would cause hidden 
problems that are hard to track down.
+        * 
+        * @param value The value to be cast to an integer.
+        * @return The given value as an integer.
+        */
+       public static int checkedDownCast(long value) {
+               if (value > Integer.MAX_VALUE) {
+                       throw new IllegalArgumentException("Cannot downcast 
long value " + value + " to integer.");
+               }
+               return (int) value;
+       }
+
+       /**
+        * Checks whether the given value is a power of two.
+        *
+        * @param value The value to check.
+        * @return True, if the value is a power of two, false otherwise.
+        */
+       public static boolean isPowerOf2(long value) {
+               return (value & (value - 1)) == 0;
+       }
+
+       /**
+        * This function hashes an integer value. It is adapted from Bob 
Jenkins' website
+        * <a 
href="http://www.burtleburtle.net/bob/hash/integer.html";>http://www.burtleburtle.net/bob/hash/integer.html</a>.
+        * The hash function has the <i>full avalanche</i> property, meaning 
that every bit of the value to be hashed
+        * affects every bit of the hash value.
+        *
+        * It is crucial to use different hash functions to partition data 
across machines and the internal partitioning of
+        * data structures. This hash function is intended for partitioning 
internally in data structures.
+        *
+        * @param code The integer to be hashed.
+        * @return The non-negative hash code for the integer.
+        */
+       public static int jenkinsHash(int code) {
+               code = (code + 0x7ed55d16) + (code << 12);
+               code = (code ^ 0xc761c23c) ^ (code >>> 19);
+               code = (code + 0x165667b1) + (code << 5);
+               code = (code + 0xd3a2646c) ^ (code << 9);
+               code = (code + 0xfd7046c5) + (code << 3);
+               code = (code ^ 0xb55a4f09) ^ (code >>> 16);
+               return code >= 0 ? code : -(code + 1);
+       }
+
+       /**
+        * This function hashes an integer value.
+        *
+        * It is crucial to use different hash functions to partition data 
across machines and the internal partitioning of
+        * data structures. This hash function is intended for partitioning 
across machines.
+        *
+        * @param code The integer to be hashed.
+        * @return The non-negative hash code for the integer.
+        */
+       public static int murmurHash(int code) {
+               code *= 0xcc9e2d51;
+               code = Integer.rotateLeft(code, 15);
+               code *= 0x1b873593;
+
+               code = Integer.rotateLeft(code, 13);
+               code = code * 5 + 0xe6546b64;
+
+               code ^= 4;
+               code ^= code >>> 16;
+               code *= 0x85ebca6b;
+               code ^= code >>> 13;
+               code *= 0xc2b2ae35;
+               code ^= code >>> 16;
+
+               if (code >= 0) {
+                       return code;
+               }
+               else if (code != Integer.MIN_VALUE) {
+                       return -code;
+               }
+               else {
+                       return 0;
+               }
+       }
+
+       // 
============================================================================================
+       
+       /**
+        * Prevent Instantiation through private constructor.
+        */
+       private MathUtils() {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/NetUtils.java 
b/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
index d9c4d3c..6f63eb4 100644
--- a/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
@@ -18,9 +18,8 @@
 
 package org.apache.flink.util;
 
-import com.google.common.collect.Iterators;
-import com.google.common.net.InetAddresses;
 import org.apache.flink.annotation.Internal;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -33,9 +32,9 @@ import java.net.MalformedURLException;
 import java.net.ServerSocket;
 import java.net.URL;
 import java.net.UnknownHostException;
-import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.Iterator;
-import java.util.List;
 
 @Internal
 public class NetUtils {
@@ -116,8 +115,6 @@ public class NetUtils {
        /**
         * Encodes an IP address properly as a URL string. This method makes 
sure that IPv6 addresses
         * have the proper formatting to be included in URLs.
-        * <p>
-        * This method internally uses Guava's functionality to properly encode 
IPv6 addresses.
         * 
         * @param address The IP address to encode.
         * @return The proper URL string encoded IP address.
@@ -130,7 +127,7 @@ public class NetUtils {
                        return address.getHostAddress();
                }
                else if (address instanceof Inet6Address) {
-                       return '[' + InetAddresses.toAddrString(address) + ']';
+                       return getIPv6UrlRepresentation((Inet6Address) address);
                }
                else {
                        throw new IllegalArgumentException("Unrecognized type 
of InetAddress: " + address);
@@ -178,6 +175,70 @@ public class NetUtils {
        }
 
        /**
+        * Creates a compressed URL style representation of an Inet6Address.
+        * 
+        * <p>This method copies and adopts code from Google's Guava library.
+        * We re-implement this here in order to reduce dependency on Guava.
+        * The Guava library has frequently caused dependency conflicts in the 
past.
+        */
+       private static String getIPv6UrlRepresentation(Inet6Address address) {
+               // first, convert bytes to 16 bit chunks
+               byte[] addressBytes = address.getAddress();
+               int[] hextets = new int[8];
+               for (int i = 0; i < hextets.length; i++) {
+                       hextets[i] = (addressBytes[2 * i] & 0xFF) << 8 | 
(addressBytes[2 * i + 1] & 0xFF);
+               }
+
+               // now, find the sequence of zeros that should be compressed
+               int bestRunStart = -1;
+               int bestRunLength = -1;
+               int runStart = -1;
+               for (int i = 0; i < hextets.length + 1; i++) {
+                       if (i < hextets.length && hextets[i] == 0) {
+                               if (runStart < 0) {
+                                       runStart = i;
+                               }
+                       } else if (runStart >= 0) {
+                               int runLength = i - runStart;
+                               if (runLength > bestRunLength) {
+                                       bestRunStart = runStart;
+                                       bestRunLength = runLength;
+                               }
+                               runStart = -1;
+                       }
+               }
+               if (bestRunLength >= 2) {
+                       Arrays.fill(hextets, bestRunStart, bestRunStart + 
bestRunLength, -1);
+               }
+
+               // convert into text form
+               StringBuilder buf = new StringBuilder(40);
+               buf.append('[');
+               
+               boolean lastWasNumber = false;
+               for (int i = 0; i < hextets.length; i++) {
+                       boolean thisIsNumber = hextets[i] >= 0;
+                       if (thisIsNumber) {
+                               if (lastWasNumber) {
+                                       buf.append(':');
+                               }
+                               buf.append(Integer.toHexString(hextets[i]));
+                       } else {
+                               if (i == 0 || lastWasNumber) {
+                                       buf.append("::");
+                               }
+                       }
+                       lastWasNumber = thisIsNumber;
+               }
+               buf.append(']');
+               return buf.toString();
+       }
+       
+       // 
------------------------------------------------------------------------
+       //  Port range parsing
+       // 
------------------------------------------------------------------------
+       
+       /**
         * Returns an iterator over available ports defined by the range 
definition.
         *
         * @param rangeDefinition String describing a single port, a range of 
ports or multiple ranges.
@@ -186,14 +247,16 @@ public class NetUtils {
         */
        public static Iterator<Integer> getPortRangeFromString(String 
rangeDefinition) throws NumberFormatException {
                final String[] ranges = rangeDefinition.trim().split(",");
-               List<Iterator<Integer>> iterators = new 
ArrayList<>(ranges.length);
-               for(String rawRange: ranges) {
+               
+               UnionIterator<Integer> iterators = new UnionIterator<>();
+               
+               for (String rawRange: ranges) {
                        Iterator<Integer> rangeIterator;
                        String range = rawRange.trim();
                        int dashIdx = range.indexOf('-');
                        if (dashIdx == -1) {
                                // only one port in range:
-                               rangeIterator = 
Iterators.singletonIterator(Integer.valueOf(range));
+                               rangeIterator = 
Collections.singleton(Integer.valueOf(range)).iterator();
                        } else {
                                // evaluate range
                                final int start = 
Integer.valueOf(range.substring(0, dashIdx));
@@ -218,7 +281,8 @@ public class NetUtils {
                        }
                        iterators.add(rangeIterator);
                }
-               return Iterators.concat(iterators.iterator());
+               
+               return iterators;
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/util/Preconditions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/Preconditions.java 
b/flink-core/src/main/java/org/apache/flink/util/Preconditions.java
index 135038b..a9bd166 100644
--- a/flink-core/src/main/java/org/apache/flink/util/Preconditions.java
+++ b/flink-core/src/main/java/org/apache/flink/util/Preconditions.java
@@ -107,7 +107,7 @@ public final class Preconditions {
        }
 
        // 
------------------------------------------------------------------------
-       //  Boolean Condition Checking
+       //  Boolean Condition Checking (Argument)
        // 
------------------------------------------------------------------------
        
        /**
@@ -162,6 +162,61 @@ public final class Preconditions {
                }
        }
 
+       // 
------------------------------------------------------------------------
+       //  Boolean Condition Checking (State)
+       // 
------------------------------------------------------------------------
+       
+       /**
+        * Checks the given boolean condition, and throws an {@code 
IllegalStateException} if
+        * the condition is not met (evaluates to {@code false}).
+        *
+        * @param condition The condition to check
+        *
+        * @throws IllegalStateException Thrown, if the condition is violated.
+        */
+       public static void checkState(boolean condition) {
+               if (!condition) {
+                       throw new IllegalStateException();
+               }
+       }
+
+       /**
+        * Checks the given boolean condition, and throws an {@code 
IllegalStateException} if
+        * the condition is not met (evaluates to {@code false}). The exception 
will have the
+        * given error message.
+        *
+        * @param condition The condition to check
+        * @param errorMessage The message for the {@code 
IllegalStateException} that is thrown if the check fails.
+        *
+        * @throws IllegalStateException Thrown, if the condition is violated.
+        */
+       public static void checkState(boolean condition, @Nullable Object 
errorMessage) {
+               if (!condition) {
+                       throw new 
IllegalStateException(String.valueOf(errorMessage));
+               }
+       }
+
+       /**
+        * Checks the given boolean condition, and throws an {@code 
IllegalStateException} if
+        * the condition is not met (evaluates to {@code false}).
+        *
+        * @param condition The condition to check
+        * @param errorMessageTemplate The message template for the {@code 
IllegalStateException}
+        *                             that is thrown if the check fails. The 
template substitutes its
+        *                             {@code %s} placeholders with the error 
message arguments.
+        * @param errorMessageArgs The arguments for the error message, to be 
inserted into the
+        *                         message template for the {@code %s} 
placeholders.
+        *
+        * @throws IllegalStateException Thrown, if the condition is violated.
+        */
+       public static void checkState(boolean condition,
+                       @Nullable String errorMessageTemplate,
+                       @Nullable Object... errorMessageArgs) {
+
+               if (!condition) {
+                       throw new 
IllegalStateException(format(errorMessageTemplate, errorMessageArgs));
+               }
+       }
 
        // 
------------------------------------------------------------------------
        //  Utilities

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/util/UnionIterator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/UnionIterator.java 
b/flink-core/src/main/java/org/apache/flink/util/UnionIterator.java
new file mode 100644
index 0000000..17204ce
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/UnionIterator.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+public class UnionIterator<T> implements Iterator<T>, Iterable<T> {
+       
+       private Iterator<T> currentIterator;
+       
+       private ArrayList<Iterator<T>> furtherIterators = new ArrayList<>();
+       
+       private int nextIterator;
+       
+       private boolean iteratorAvailable = true;
+
+       // 
------------------------------------------------------------------------
+       
+       public void clear() {
+               currentIterator = null;
+               furtherIterators.clear();
+               nextIterator = 0;
+               iteratorAvailable = true;
+       }
+       
+       public void addList(List<T> list) {
+               add(list.iterator());
+       }
+
+       public void add(Iterator<T> iterator) {
+               if (currentIterator == null) {
+                       currentIterator = iterator;
+               }
+               else {
+                       furtherIterators.add(iterator);
+               }
+       }
+       
+       // 
------------------------------------------------------------------------
+       
+       @Override
+       public Iterator<T> iterator() {
+               if (iteratorAvailable) {
+                       iteratorAvailable = false;
+                       return this;
+               } else {
+                       throw new TraversableOnceException();
+               }
+       }
+
+       @Override
+       public boolean hasNext() {
+               while (currentIterator != null) {
+                       if (currentIterator.hasNext()) {
+                               return true;
+                       }
+                       else if (nextIterator < furtherIterators.size()) {
+                               currentIterator = 
furtherIterators.get(nextIterator);
+                               nextIterator++;
+                       }
+                       else {
+                               currentIterator = null;
+                       }
+               }
+               
+               return false;
+       }
+
+       @Override
+       public T next() {
+               if (hasNext()) {
+                       return currentIterator.next();
+               }
+               else {
+                       throw new NoSuchElementException();
+               }
+       }
+
+       @Override
+       public void remove() {
+               throw new UnsupportedOperationException();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/main/java/org/apache/flink/util/XORShiftRandom.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/XORShiftRandom.java 
b/flink-core/src/main/java/org/apache/flink/util/XORShiftRandom.java
index 47ed561..b3ec393 100644
--- a/flink-core/src/main/java/org/apache/flink/util/XORShiftRandom.java
+++ b/flink-core/src/main/java/org/apache/flink/util/XORShiftRandom.java
@@ -15,9 +15,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.util;
 
-import com.google.common.hash.Hashing;
 import org.apache.flink.annotation.Public;
 
 import java.util.Random;
@@ -42,7 +42,7 @@ public class XORShiftRandom extends Random {
 
        public XORShiftRandom(long input) {
                super(input);
-               this.seed = Hashing.murmur3_128().hashLong(input).asLong();
+               this.seed = MathUtils.murmurHash((int) input) ^ 
MathUtils.murmurHash((int) (input >>> 32));
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/test/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBaseTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBaseTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBaseTest.java
index 679e4ce..69159f2 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBaseTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBaseTest.java
@@ -18,12 +18,10 @@
 
 package org.apache.flink.api.common.operators.base;
 
-
 import java.io.Serializable;
 import java.util.Arrays;
 import java.util.List;
 
-import com.google.common.base.Joiner;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.api.common.operators.BinaryOperatorInformation;
@@ -39,7 +37,7 @@ public class OuterJoinOperatorBaseTest implements 
Serializable {
        private final FlatJoinFunction<String, String, String> joiner = new 
FlatJoinFunction<String, String, String>() {
                @Override
                public void join(String first, String second, Collector<String> 
out) throws Exception {
-                       out.collect(Joiner.on(',').join(String.valueOf(first), 
String.valueOf(second)));
+                       out.collect(String.valueOf(first) + ',' + 
String.valueOf(second));
                }
        };
 

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/test/java/org/apache/flink/api/java/tuple/TupleGenerator.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/java/tuple/TupleGenerator.java 
b/flink-core/src/test/java/org/apache/flink/api/java/tuple/TupleGenerator.java
index e783a1d..ee37194 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/java/tuple/TupleGenerator.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/java/tuple/TupleGenerator.java
@@ -18,8 +18,7 @@
 
 package org.apache.flink.api.java.tuple;
 
-import com.google.common.base.Charsets;
-import com.google.common.io.Files;
+import org.apache.flink.util.FileUtils;
 
 import java.io.File;
 import java.io.FileNotFoundException;
@@ -85,7 +84,7 @@ class TupleGenerator {
        }
 
        private static void insertCodeIntoFile(String code, File file) throws 
IOException {
-               String fileContent = Files.toString(file, Charsets.UTF_8);
+               String fileContent = FileUtils.readFileUtf8(file);
                
                try (Scanner s = new Scanner(fileContent)) {
                        StringBuilder sb = new StringBuilder();
@@ -126,7 +125,7 @@ class TupleGenerator {
                        while (s.hasNextLine() && (line = s.nextLine()) != 
null) {
                                sb.append(line).append("\n");
                        }
-                       Files.write(sb.toString(), file, Charsets.UTF_8);
+                       FileUtils.writeFileUtf8(file, sb.toString());
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeExtractionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeExtractionTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeExtractionTest.java
index bc11848..ffcfd52 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeExtractionTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeExtractionTest.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Date;
+import java.util.HashSet;
 import java.util.List;
 
 import org.apache.flink.api.common.functions.MapFunction;
@@ -37,8 +38,6 @@ import 
org.apache.flink.api.java.typeutils.TypeInfoParserTest.MyWritable;
 import org.junit.Assert;
 import org.junit.Test;
 
-import com.google.common.collect.HashMultiset;
-
 /**
  *  Pojo Type tests
  *
@@ -87,7 +86,7 @@ public class PojoTypeExtractionTest {
        // all public test
        public static class AllPublic extends ComplexNestedClass {
                public ArrayList<String> somethingFancy; // generic type
-               public HashMultiset<Integer> fancyIds; // generic type
+               public FancyCollectionSubtype<Integer> fancyIds; // generic type
                public String[] fancyArray;                      // generic type
        }
 
@@ -436,7 +435,7 @@ public class PojoTypeExtractionTest {
                                }
                                multisetSeen = true;
                                Assert.assertTrue(field.getTypeInformation() 
instanceof GenericTypeInfo);
-                               Assert.assertEquals(HashMultiset.class, 
field.getTypeInformation().getTypeClass());
+                               
Assert.assertEquals(FancyCollectionSubtype.class, 
field.getTypeInformation().getTypeClass());
                        } else if(name.equals("fancyArray")) {
                                if(strArraySeen) {
                                        Assert.fail("already seen");
@@ -809,4 +808,10 @@ public class PojoTypeExtractionTest {
                Assert.assertTrue(tti.getTypeAt(0) instanceof PojoTypeInfo);
                Assert.assertTrue(tti.getTypeAt(1) instanceof PojoTypeInfo);
        }
+       
+       // 
------------------------------------------------------------------------
+       
+       public static class FancyCollectionSubtype<T> extends HashSet<T> {
+               private static final long serialVersionUID = 
-3494469602638179921L;
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
index 260f7e9..4712ed1 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.api.java.typeutils.runtime;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
+import java.util.Objects;
 import java.util.Random;
 
 import org.apache.flink.api.common.ExecutionConfig;
@@ -36,11 +37,10 @@ import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
+
 import org.junit.Assert;
 import org.junit.Test;
 
-import com.google.common.base.Objects;
-
 /**
  * A test for the {@link PojoSerializer}.
  */
@@ -104,7 +104,7 @@ public class PojoSerializerTest extends 
SerializerTestBase<PojoSerializerTest.Te
 
                @Override
                public int hashCode() {
-                       return Objects.hashCode(dumm1, dumm2, dumm3, dumm4, 
nestedClass);
+                       return Objects.hash(dumm1, dumm2, dumm3, dumm4, 
nestedClass);
                }
 
                @Override
@@ -162,7 +162,7 @@ public class PojoSerializerTest extends 
SerializerTestBase<PojoSerializerTest.Te
 
                @Override
                public int hashCode() {
-                       return Objects.hashCode(dumm1, dumm2, dumm3, dumm4);
+                       return Objects.hash(dumm1, dumm2, dumm3, dumm4);
                }
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSubclassSerializerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSubclassSerializerTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSubclassSerializerTest.java
index 8c61a19..ddf1d0e 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSubclassSerializerTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSubclassSerializerTest.java
@@ -18,14 +18,15 @@
 
 package org.apache.flink.api.java.typeutils.runtime;
 
-import com.google.common.base.Objects;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.SerializerTestBase;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
+
 import org.junit.Test;
 
+import java.util.Objects;
 import java.util.Random;
 
 /**
@@ -88,7 +89,7 @@ public class PojoSubclassSerializerTest extends 
SerializerTestBase<PojoSubclassS
 
                @Override
                public int hashCode() {
-                       return Objects.hashCode(dumm1, dumm2);
+                       return Objects.hash(dumm1, dumm2);
                }
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/SubclassFromInterfaceSerializerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/SubclassFromInterfaceSerializerTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/SubclassFromInterfaceSerializerTest.java
index b797090..e6ffd07 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/SubclassFromInterfaceSerializerTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/SubclassFromInterfaceSerializerTest.java
@@ -18,15 +18,16 @@
 
 package org.apache.flink.api.java.typeutils.runtime;
 
-import com.google.common.base.Objects;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.SerializerTestBase;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+
 import org.junit.Test;
 
+import java.util.Objects;
 import java.util.Random;
 
 /**
@@ -90,7 +91,7 @@ public class SubclassFromInterfaceSerializerTest extends 
SerializerTestBase<Subc
 
                @Override
                public int hashCode() {
-                       return Objects.hashCode(dumm1, dumm2);
+                       return Objects.hash(dumm1, dumm2);
                }
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/test/java/org/apache/flink/testutils/junit/RetryRule.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/testutils/junit/RetryRule.java 
b/flink-core/src/test/java/org/apache/flink/testutils/junit/RetryRule.java
index 2b3a37a..782f4fb 100644
--- a/flink-core/src/test/java/org/apache/flink/testutils/junit/RetryRule.java
+++ b/flink-core/src/test/java/org/apache/flink/testutils/junit/RetryRule.java
@@ -22,11 +22,12 @@ import org.junit.Test;
 import org.junit.rules.TestRule;
 import org.junit.runner.Description;
 import org.junit.runners.model.Statement;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * A rule to retry failed tests for a fixed number of times.

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/test/java/org/apache/flink/util/MathUtilTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/util/MathUtilTest.java 
b/flink-core/src/test/java/org/apache/flink/util/MathUtilTest.java
new file mode 100644
index 0000000..7917a7b
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/util/MathUtilTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
+
+import org.apache.flink.util.MathUtils;
+import org.junit.Test;
+
+public class MathUtilTest {
+
+       @Test
+       public void testLog2Computation() {
+               assertEquals(0, MathUtils.log2floor(1));
+               assertEquals(1, MathUtils.log2floor(2));
+               assertEquals(1, MathUtils.log2floor(3));
+               assertEquals(2, MathUtils.log2floor(4));
+               assertEquals(2, MathUtils.log2floor(5));
+               assertEquals(2, MathUtils.log2floor(7));
+               assertEquals(3, MathUtils.log2floor(8));
+               assertEquals(3, MathUtils.log2floor(9));
+               assertEquals(4, MathUtils.log2floor(16));
+               assertEquals(4, MathUtils.log2floor(17));
+               assertEquals(13, MathUtils.log2floor((0x1 << 13) + 1));
+               assertEquals(30, MathUtils.log2floor(Integer.MAX_VALUE));
+               assertEquals(31, MathUtils.log2floor(-1));
+               
+               try {
+                       MathUtils.log2floor(0);
+                       fail();
+               }
+               catch (ArithmeticException aex) {}
+       }
+       
+       @Test
+       public void testRoundDownToPowerOf2() {
+               assertEquals(0, MathUtils.roundDownToPowerOf2(0));
+               assertEquals(1, MathUtils.roundDownToPowerOf2(1));
+               assertEquals(2, MathUtils.roundDownToPowerOf2(2));
+               assertEquals(2, MathUtils.roundDownToPowerOf2(3));
+               assertEquals(4, MathUtils.roundDownToPowerOf2(4));
+               assertEquals(4, MathUtils.roundDownToPowerOf2(5));
+               assertEquals(4, MathUtils.roundDownToPowerOf2(6));
+               assertEquals(4, MathUtils.roundDownToPowerOf2(7));
+               assertEquals(8, MathUtils.roundDownToPowerOf2(8));
+               assertEquals(8, MathUtils.roundDownToPowerOf2(9));
+               assertEquals(8, MathUtils.roundDownToPowerOf2(15));
+               assertEquals(16, MathUtils.roundDownToPowerOf2(16));
+               assertEquals(16, MathUtils.roundDownToPowerOf2(17));
+               assertEquals(16, MathUtils.roundDownToPowerOf2(31));
+               assertEquals(32, MathUtils.roundDownToPowerOf2(32));
+               assertEquals(32, MathUtils.roundDownToPowerOf2(33));
+               assertEquals(32, MathUtils.roundDownToPowerOf2(42));
+               assertEquals(32, MathUtils.roundDownToPowerOf2(63));
+               assertEquals(64, MathUtils.roundDownToPowerOf2(64));
+               assertEquals(64, MathUtils.roundDownToPowerOf2(125));
+               assertEquals(16384, MathUtils.roundDownToPowerOf2(25654));
+               assertEquals(33554432, MathUtils.roundDownToPowerOf2(34366363));
+               assertEquals(33554432, MathUtils.roundDownToPowerOf2(63463463));
+               assertEquals(1073741824, 
MathUtils.roundDownToPowerOf2(1852987883));
+               assertEquals(1073741824, 
MathUtils.roundDownToPowerOf2(Integer.MAX_VALUE));
+       }
+
+       @Test
+       public void testPowerOfTwo() {
+               assertTrue(MathUtils.isPowerOf2(1));
+               assertTrue(MathUtils.isPowerOf2(2));
+               assertTrue(MathUtils.isPowerOf2(4));
+               assertTrue(MathUtils.isPowerOf2(8));
+               assertTrue(MathUtils.isPowerOf2(32768));
+               assertTrue(MathUtils.isPowerOf2(65536));
+               assertTrue(MathUtils.isPowerOf2(1 << 30));
+               assertTrue(MathUtils.isPowerOf2(1L + Integer.MAX_VALUE));
+               assertTrue(MathUtils.isPowerOf2(1L << 41));
+               assertTrue(MathUtils.isPowerOf2(1L << 62));
+
+               assertFalse(MathUtils.isPowerOf2(3));
+               assertFalse(MathUtils.isPowerOf2(5));
+               assertFalse(MathUtils.isPowerOf2(567923));
+               assertFalse(MathUtils.isPowerOf2(Integer.MAX_VALUE));
+               assertFalse(MathUtils.isPowerOf2(Long.MAX_VALUE));
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/test/java/org/apache/flink/util/NetUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/util/NetUtilsTest.java 
b/flink-core/src/test/java/org/apache/flink/util/NetUtilsTest.java
index e367e8b..72cc89e 100644
--- a/flink-core/src/test/java/org/apache/flink/util/NetUtilsTest.java
+++ b/flink-core/src/test/java/org/apache/flink/util/NetUtilsTest.java
@@ -18,17 +18,13 @@
 
 package org.apache.flink.util;
 
-import com.google.common.collect.DiscreteDomain;
-import com.google.common.collect.Iterators;
 import org.junit.Assert;
 import org.junit.Test;
 
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
-import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Set;
 
 import static org.hamcrest.core.IsCollectionContaining.hasItems;

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-core/src/test/java/org/apache/flink/util/UnionIteratorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/util/UnionIteratorTest.java 
b/flink-core/src/test/java/org/apache/flink/util/UnionIteratorTest.java
new file mode 100644
index 0000000..4c1fc41
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/util/UnionIteratorTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.apache.flink.util.TraversableOnceException;
+import org.apache.flink.util.UnionIterator;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import static org.junit.Assert.*;
+
+public class UnionIteratorTest {
+
+       @Test
+       public void testUnion() {
+               try {
+                       UnionIterator<Integer> iter = new UnionIterator<>();
+
+                       // should succeed and be empty
+                       assertFalse(iter.iterator().hasNext());
+
+                       iter.clear();
+                       
+                       try {
+                               iter.iterator().next();
+                               fail("should fail with an exception");
+                       } catch (NoSuchElementException e) {
+                               // expected
+                       }
+
+                       iter.clear();
+                       iter.addList(Arrays.asList(1, 2, 3, 4, 5, 6, 7));
+                       iter.addList(Collections.<Integer>emptyList());
+                       iter.addList(Arrays.asList(8, 9, 10, 11));
+                       
+                       int val = 1;
+                       for (int i : iter) {
+                               assertEquals(val++, i);
+                       }
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+       
+       @Test
+       public void testTraversableOnce() {
+               try {
+                       UnionIterator<Integer> iter = new UnionIterator<>();
+                       
+                       // should succeed
+                       iter.iterator();
+                       
+                       // should fail
+                       try {
+                               iter.iterator();
+                               fail("should fail with an exception");
+                       } catch (TraversableOnceException e) {
+                               // expected
+                       }
+
+                       // should fail again
+                       try {
+                               iter.iterator();
+                               fail("should fail with an exception");
+                       } catch (TraversableOnceException e) {
+                               // expected
+                       }
+
+                       // reset the thing, keep it empty
+                       iter.clear();
+
+                       // should succeed
+                       iter.iterator();
+
+                       // should fail
+                       try {
+                               iter.iterator();
+                               fail("should fail with an exception");
+                       } catch (TraversableOnceException e) {
+                               // expected
+                       }
+
+                       // should fail again
+                       try {
+                               iter.iterator();
+                               fail("should fail with an exception");
+                       } catch (TraversableOnceException e) {
+                               // expected
+                       }
+
+                       // reset the thing, add some data
+                       iter.clear();
+                       iter.addList(Arrays.asList(1, 2, 3, 4, 5, 6, 7));
+                       
+                       // should succeed
+                       Iterator<Integer> ints = iter.iterator();
+                       assertNotNull(ints.next());
+                       assertNotNull(ints.next());
+                       assertNotNull(ints.next());
+                       
+                       // should fail if called in the middle of operations
+                       try {
+                               iter.iterator();
+                               fail("should fail with an exception");
+                       } catch (TraversableOnceException e) {
+                               // expected
+                       }
+
+                       // reset the thing, keep it empty
+                       iter.clear();
+
+                       // should succeed again
+                       assertFalse(iter.iterator().hasNext());
+                       
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
index d8f744b..99237a3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
@@ -23,7 +23,7 @@ import org.apache.commons.io.FileUtils;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.util.IOUtils;
+import org.apache.flink.util.IOUtils;
 import org.slf4j.Logger;
 
 import java.io.EOFException;

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
index efdb003..5f65564 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
@@ -25,7 +25,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.util.IOUtils;
+import org.apache.flink.util.IOUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FileSystemStateStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FileSystemStateStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FileSystemStateStore.java
index 8d9d3a7..73a094a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FileSystemStateStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FileSystemStateStore.java
@@ -22,7 +22,7 @@ import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.filesystem.FileSerializableStateHandle;
-import org.apache.flink.runtime.util.FileUtils;
+import org.apache.flink.util.FileUtils;
 
 import java.io.IOException;
 import java.io.ObjectOutputStream;

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
index 1b8b1d9..b9ee2c8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
@@ -42,7 +42,7 @@ import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.util.ExecutorThreadFactory;
-import org.apache.flink.runtime.util.IOUtils;
+import org.apache.flink.util.IOUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelInputView.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelInputView.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelInputView.java
index 2ca7f78..992631b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelInputView.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelInputView.java
@@ -29,7 +29,7 @@ import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader;
 import org.apache.flink.runtime.memory.AbstractPagedInputView;
 import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.util.MathUtils;
+import org.apache.flink.util.MathUtils;
 
 /**
  * A {@link org.apache.flink.core.memory.DataInputView} that is backed by a 
{@link BlockChannelReader},

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/RandomAccessInputView.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/RandomAccessInputView.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/RandomAccessInputView.java
index 23dccb0..2ff6c94 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/RandomAccessInputView.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/RandomAccessInputView.java
@@ -24,7 +24,7 @@ import java.util.ArrayList;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.SeekableDataInputView;
 import org.apache.flink.runtime.memory.AbstractPagedInputView;
-import org.apache.flink.runtime.util.MathUtils;
+import org.apache.flink.util.MathUtils;
 
 
 public class RandomAccessInputView extends AbstractPagedInputView implements 
SeekableDataInputView {

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/RandomAccessOutputView.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/RandomAccessOutputView.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/RandomAccessOutputView.java
index 427fe84..b0ad184 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/RandomAccessOutputView.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/RandomAccessOutputView.java
@@ -24,7 +24,7 @@ import java.io.EOFException;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.SeekableDataOutputView;
 import org.apache.flink.runtime.memory.AbstractPagedOutputView;
-import org.apache.flink.runtime.util.MathUtils;
+import org.apache.flink.util.MathUtils;
 
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputView.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputView.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputView.java
index 355b2eb..e768c77 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputView.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputView.java
@@ -30,7 +30,7 @@ import 
org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.memory.AbstractPagedInputView;
 import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.util.MathUtils;
+import org.apache.flink.util.MathUtils;
 
 /**
  * A {@link org.apache.flink.core.memory.DataInputView} that is backed by a 
{@link BlockChannelReader},

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/SimpleCollectingOutputView.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/SimpleCollectingOutputView.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/SimpleCollectingOutputView.java
index a107e79..1a45ff2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/SimpleCollectingOutputView.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/SimpleCollectingOutputView.java
@@ -26,7 +26,7 @@ import java.util.List;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentSource;
 import org.apache.flink.runtime.memory.AbstractPagedOutputView;
-import org.apache.flink.runtime.util.MathUtils;
+import org.apache.flink.util.MathUtils;
 
 /**
  * The list with the full segments contains at any point all completely full 
segments, plus the segment that is

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
index 094d065..3ade753 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
@@ -33,7 +33,7 @@ import org.apache.flink.core.memory.HeapMemorySegment;
 import org.apache.flink.core.memory.HybridMemorySegment;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemoryType;
-import org.apache.flink.runtime.util.MathUtils;
+import org.apache.flink.util.MathUtils;
 
 /**
  * The memory manager governs the memory that Flink uses for sorting, hashing, 
and caching. Memory

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java
index b4d03e7..0ff2ce8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java
@@ -33,7 +33,7 @@ import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.memory.ListMemorySegmentSource;
 import org.apache.flink.runtime.util.IntArrayList;
 import org.apache.flink.runtime.util.LongArrayList;
-import org.apache.flink.runtime.util.MathUtils;
+import org.apache.flink.util.MathUtils;
 import org.apache.flink.util.MutableObjectIterator;
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashPartition.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashPartition.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashPartition.java
index 44ee163..9221e7e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashPartition.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashPartition.java
@@ -37,7 +37,7 @@ import 
org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.memory.AbstractPagedInputView;
 import org.apache.flink.runtime.memory.AbstractPagedOutputView;
-import org.apache.flink.runtime.util.MathUtils;
+import org.apache.flink.util.MathUtils;
 import org.apache.flink.util.MutableObjectIterator;
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
index 1495ee1..351cd3f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
@@ -42,7 +42,7 @@ import 
org.apache.flink.runtime.io.disk.iomanager.ChannelReaderInputView;
 import 
org.apache.flink.runtime.io.disk.iomanager.HeaderlessChannelReaderInputView;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.operators.util.BloomFilter;
-import org.apache.flink.runtime.util.MathUtils;
+import org.apache.flink.util.MathUtils;
 import org.apache.flink.util.MutableObjectIterator;
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputEmitter.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputEmitter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputEmitter.java
index fdbcd9f..4e20842 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputEmitter.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputEmitter.java
@@ -23,7 +23,7 @@ import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.runtime.io.network.api.writer.ChannelSelector;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.runtime.util.MathUtils;
+import org.apache.flink.util.MathUtils;
 
 /**
  * The output emitter decides to which of the possibly multiple output 
channels a record is sent.

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-runtime/src/main/java/org/apache/flink/runtime/util/FileUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/FileUtils.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/FileUtils.java
deleted file mode 100644
index 42994d3..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/FileUtils.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.util;
-
-/**
- * This is a utility class to deal with temporary files.
- */
-public final class FileUtils {
-
-       /**
-        * The alphabet to construct the random part of the filename from.
-        */
-       private static final char[] ALPHABET = { '0', '1', '2', '3', '4', '5', 
'6', '7', '8', '9', '0', 'a', 'b', 'c', 'd',
-               'e', 'f' };
-
-       /**
-        * The length of the random part of the filename.
-        */
-       private static final int LENGTH = 12;
-
-       /**
-        * Empty private constructor to avoid instantiation.
-        */
-       private FileUtils() {
-       }
-
-       /**
-        * Constructs a random filename with the given prefix and
-        * a random part generated from hex characters.
-        * 
-        * @param prefix
-        *        the prefix to the filename to be constructed
-        * @return the generated random filename with the given prefix
-        */
-       public static String getRandomFilename(final String prefix) {
-
-               final StringBuilder stringBuilder = new StringBuilder(prefix);
-
-               for (int i = 0; i < LENGTH; i++) {
-                       stringBuilder.append(ALPHABET[(int) 
Math.floor(Math.random() * (double) ALPHABET.length)]);
-               }
-
-               return stringBuilder.toString();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-runtime/src/main/java/org/apache/flink/runtime/util/IOUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/IOUtils.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/IOUtils.java
deleted file mode 100644
index d03b72c..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/IOUtils.java
+++ /dev/null
@@ -1,213 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.util;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.PrintStream;
-import java.net.Socket;
-
-import org.slf4j.Logger;
-
-/**
- * An utility class for I/O related functionality.
- * 
- */
-public final class IOUtils {
-
-       /**
-        * The block size for byte operations in byte.
-        */
-       private static final int BLOCKSIZE = 4096;
-
-       /**
-        * Private constructor to overwrite the public one.
-        */
-       private IOUtils() {
-       }
-
-       /**
-        * Copies from one stream to another.
-        * 
-        * @param in
-        *        InputStream to read from
-        * @param out
-        *        OutputStream to write to
-        * @param buffSize
-        *        the size of the buffer
-        * @param close
-        *        whether or not close the InputStream and OutputStream at the 
end. The streams are closed in the finally
-        *        clause.
-        * @throws IOException
-        *         thrown if an error occurred while writing to the output 
stream
-        */
-       public static void copyBytes(final InputStream in, final OutputStream 
out, final int buffSize, final boolean close)
-                       throws IOException {
-
-               @SuppressWarnings("resource")
-               final PrintStream ps = out instanceof PrintStream ? 
(PrintStream) out : null;
-               final byte[] buf = new byte[buffSize];
-               try {
-                       int bytesRead = in.read(buf);
-                       while (bytesRead >= 0) {
-                               out.write(buf, 0, bytesRead);
-                               if ((ps != null) && ps.checkError()) {
-                                       throw new IOException("Unable to write 
to output stream.");
-                               }
-                               bytesRead = in.read(buf);
-                       }
-               } finally {
-                       if (close) {
-                               out.close();
-                               in.close();
-                       }
-               }
-       }
-
-       /**
-        * Copies from one stream to another. <strong>closes the input and 
output
-        * streams at the end</strong>.
-        * 
-        * @param in
-        *        InputStream to read from
-        * @param out
-        *        OutputStream to write to
-        * @throws IOException
-        *         thrown if an I/O error occurs while copying
-        */
-       public static void copyBytes(final InputStream in, final OutputStream 
out) throws IOException {
-               copyBytes(in, out, BLOCKSIZE, true);
-       }
-
-       /**
-        * Copies from one stream to another.
-        * 
-        * @param in
-        *        InputStream to read from
-        * @param out
-        *        OutputStream to write to
-        * @param close
-        *        whether or not close the InputStream and OutputStream at the
-        *        end. The streams are closed in the finally clause.
-        * @throws IOException
-        *         thrown if an I/O error occurs while copying
-        */
-       public static void copyBytes(final InputStream in, final OutputStream 
out, final boolean close) throws IOException {
-               copyBytes(in, out, BLOCKSIZE, close);
-       }
-
-       /**
-        * Reads len bytes in a loop.
-        * 
-        * @param in
-        *        The InputStream to read from
-        * @param buf
-        *        The buffer to fill
-        * @param off
-        *        offset from the buffer
-        * @param len
-        *        the length of bytes to read
-        * @throws IOException
-        *         if it could not read requested number of bytes for any 
reason (including EOF)
-        */
-       public static void readFully(final InputStream in, final byte[] buf, 
int off, final int len)
-                       throws IOException {
-               int toRead = len;
-               while (toRead > 0) {
-                       final int ret = in.read(buf, off, toRead);
-                       if (ret < 0) {
-                               throw new IOException("Premeture EOF from 
inputStream");
-                       }
-                       toRead -= ret;
-                       off += ret;
-               }
-       }
-
-       /**
-        * Similar to readFully(). Skips bytes in a loop.
-        * 
-        * @param in
-        *        The InputStream to skip bytes from
-        * @param len
-        *        number of bytes to skip
-        * @throws IOException
-        *         if it could not skip requested number of bytes for any 
reason (including EOF)
-        */
-       public static void skipFully(final InputStream in, long len) throws 
IOException {
-               while (len > 0) {
-                       final long ret = in.skip(len);
-                       if (ret < 0) {
-                               throw new IOException("Premeture EOF from 
inputStream");
-                       }
-                       len -= ret;
-               }
-       }
-
-       /**
-        * Close the Closeable objects and <b>ignore</b> any {@link 
IOException} or
-        * null pointers. Must only be used for cleanup in exception handlers.
-        * 
-        * @param log
-        *        the log to record problems to at debug level. Can be 
<code>null</code>.
-        * @param closeables
-        *        the objects to close
-        */
-       public static void cleanup(final Logger log, final java.io.Closeable... 
closeables) {
-               for (java.io.Closeable c : closeables) {
-                       if (c != null) {
-                               try {
-                                       c.close();
-                               } catch (IOException e) {
-                                       if (log != null && 
log.isDebugEnabled()) {
-                                               log.debug("Exception in closing 
" + c, e);
-                                       }
-                               }
-                       }
-               }
-       }
-
-       /**
-        * Closes the stream ignoring {@link IOException}. Must only be called 
in
-        * cleaning up from exception handlers.
-        * 
-        * @param stream
-        *        the stream to close
-        */
-       public static void closeStream(final java.io.Closeable stream) {
-               cleanup(null, stream);
-       }
-
-       /**
-        * Closes the socket ignoring {@link IOException}.
-        * 
-        * @param sock
-        *        the socket to close
-        */
-       public static void closeSocket(final Socket sock) {
-               // avoids try { close() } dance
-               if (sock != null) {
-                       try {
-                               sock.close();
-                       } catch (IOException ignored) {
-                       }
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-runtime/src/main/java/org/apache/flink/runtime/util/MathUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/MathUtils.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/MathUtils.java
deleted file mode 100644
index 5d26186..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/MathUtils.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.util;
-
-/**
- * Collection of simple mathematical routines.
- */
-public final class MathUtils {
-       
-       /**
-        * Computes the logarithm of the given value to the base of 2, rounded 
down. It corresponds to the
-        * position of the highest non-zero bit. The position is counted, 
starting with 0 from the least
-        * significant bit to the most significant bit. For example, 
<code>log2floor(16) = 4</code>, and
-        * <code>log2floor(10) = 3</code>.
-        * 
-        * @param value The value to compute the logarithm for.
-        * @return The logarithm (rounded down) to the base of 2.
-        * @throws ArithmeticException Thrown, if the given value is zero.
-        */
-       public static int log2floor(int value) throws ArithmeticException {
-               if (value == 0) {
-                       throw new ArithmeticException("Logarithm of zero is 
undefined.");
-               }
-               
-               int log = 0;
-               while ((value = value >>> 1) != 0) {
-                       log++;
-               }
-               
-               return log;
-       }
-       
-       /**
-        * Computes the logarithm of the given value to the base of 2. This 
method throws an error,
-        * if the given argument is not a power of 2.
-        * 
-        * @param value The value to compute the logarithm for.
-        * @return The logarithm to the base of 2.
-        * @throws ArithmeticException Thrown, if the given value is zero.
-        * @throws IllegalArgumentException Thrown, if the given value is not a 
power of two.
-        */
-       public static int log2strict(int value) throws ArithmeticException, 
IllegalArgumentException {
-               if (value == 0) {
-                       throw new ArithmeticException("Logarithm of zero is 
undefined.");
-               }
-               if ((value & (value - 1)) != 0) {
-                       throw new IllegalArgumentException("The given value " + 
value + " is not a power of two.");
-               }
-               
-               int log = 0;
-               while ((value = value >>> 1) != 0) {
-                       log++;
-               }
-               
-               return log;
-       }
-       
-       /**
-        * Decrements the given number down to the closest power of two. If the 
argument is a
-        * power of two, it remains unchanged.
-        * 
-        * @param value The value to round down.
-        * @return The closest value that is a power of to and less or equal 
than the given value.
-        */
-       public static int roundDownToPowerOf2(int value) {
-               return Integer.highestOneBit(value);
-       }
-       
-       /**
-        * Casts the given value to a 32 bit integer, if it can be safely done. 
If the cast would change the numeric
-        * value, this method raises an exception.
-        * <p>
-        * This method is a protection in places where one expects to be able 
to safely case, but where unexpected
-        * situations could make the cast unsafe and would cause hidden 
problems that are hard to track down.
-        * 
-        * @param value The value to be cast to an integer.
-        * @return The given value as an integer.
-        */
-       public static int checkedDownCast(long value) {
-               if (value > Integer.MAX_VALUE) {
-                       throw new IllegalArgumentException("Cannot downcast 
long value " + value + " to integer.");
-               }
-               return (int) value;
-       }
-
-       /**
-        * Checks whether the given value is a power of two.
-        *
-        * @param value The value to check.
-        * @return True, if the value is a power of two, false otherwise.
-        */
-       public static boolean isPowerOf2(long value) {
-               return (value & (value - 1)) == 0;
-       }
-
-       /**
-        * This function hashes an integer value. It is adapted from Bob 
Jenkins' website
-        * <a 
href="http://www.burtleburtle.net/bob/hash/integer.html";>http://www.burtleburtle.net/bob/hash/integer.html</a>.
-        * The hash function has the <i>full avalanche</i> property, meaning 
that every bit of the value to be hashed
-        * affects every bit of the hash value.
-        *
-        * It is crucial to use different hash functions to partition data 
across machines and the internal partitioning of
-        * data structures. This hash function is intended for partitioning 
internally in data structures.
-        *
-        * @param code The integer to be hashed.
-        * @return The non-negative hash code for the integer.
-        */
-       public static int jenkinsHash(int code) {
-               code = (code + 0x7ed55d16) + (code << 12);
-               code = (code ^ 0xc761c23c) ^ (code >>> 19);
-               code = (code + 0x165667b1) + (code << 5);
-               code = (code + 0xd3a2646c) ^ (code << 9);
-               code = (code + 0xfd7046c5) + (code << 3);
-               code = (code ^ 0xb55a4f09) ^ (code >>> 16);
-               return code >= 0 ? code : -(code + 1);
-       }
-
-       /**
-        * This function hashes an integer value.
-        *
-        * It is crucial to use different hash functions to partition data 
across machines and the internal partitioning of
-        * data structures. This hash function is intended for partitioning 
across machines.
-        *
-        * @param code The integer to be hashed.
-        * @return The non-negative hash code for the integer.
-        */
-       public static int murmurHash(int code) {
-               code *= 0xcc9e2d51;
-               code = Integer.rotateLeft(code, 15);
-               code *= 0x1b873593;
-
-               code = Integer.rotateLeft(code, 13);
-               code = code * 5 + 0xe6546b64;
-
-               code ^= 4;
-               code ^= code >>> 16;
-               code *= 0x85ebca6b;
-               code ^= code >>> 13;
-               code *= 0xc2b2ae35;
-               code ^= code >>> 16;
-
-               if (code >= 0) {
-                       return code;
-               }
-               else if (code != Integer.MIN_VALUE) {
-                       return -code;
-               }
-               else {
-                       return 0;
-               }
-       }
-
-       // 
============================================================================================
-       
-       /**
-        * Prevent Instantiation through private constructor.
-        */
-       private MathUtils() {}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-runtime/src/main/java/org/apache/flink/runtime/util/UnionIterator.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/UnionIterator.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/UnionIterator.java
deleted file mode 100644
index c279adf..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/UnionIterator.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.util;
-
-import org.apache.flink.util.TraversableOnceException;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.NoSuchElementException;
-
-public class UnionIterator<T> implements Iterator<T>, Iterable<T> {
-       
-       private Iterator<T> currentIterator;
-       
-       private ArrayList<List<T>> furtherLists = new ArrayList<>();
-       
-       private int nextList;
-       
-       private boolean iteratorAvailable = true;
-
-       // 
------------------------------------------------------------------------
-       
-       public void clear() {
-               currentIterator = null;
-               furtherLists.clear();
-               nextList = 0;
-               iteratorAvailable = true;
-       }
-       
-       public void addList(List<T> list) {
-               if (currentIterator == null) {
-                       currentIterator = list.iterator();
-               }
-               else {
-                       furtherLists.add(list);
-               }
-       }
-       
-       // 
------------------------------------------------------------------------
-       
-       @Override
-       public Iterator<T> iterator() {
-               if (iteratorAvailable) {
-                       iteratorAvailable = false;
-                       return this;
-               } else {
-                       throw new TraversableOnceException();
-               }
-       }
-
-       @Override
-       public boolean hasNext() {
-               while (currentIterator != null) {
-                       if (currentIterator.hasNext()) {
-                               return true;
-                       }
-                       else if (nextList < furtherLists.size()) {
-                               currentIterator = 
furtherLists.get(nextList).iterator();
-                               nextList++;
-                       }
-                       else {
-                               currentIterator = null;
-                       }
-               }
-               
-               return false;
-       }
-
-       @Override
-       public T next() {
-               if (hasNext()) {
-                       return currentIterator.next();
-               }
-               else {
-                       throw new NoSuchElementException();
-               }
-       }
-
-       @Override
-       public void remove() {
-               throw new UnsupportedOperationException();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/filesystem/FileSystemStateStorageHelper.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/filesystem/FileSystemStateStorageHelper.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/filesystem/FileSystemStateStorageHelper.java
index d6b69e4..bb7d94c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/filesystem/FileSystemStateStorageHelper.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/filesystem/FileSystemStateStorageHelper.java
@@ -24,7 +24,7 @@ import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.runtime.state.filesystem.FileSerializableStateHandle;
-import org.apache.flink.runtime.util.FileUtils;
+import org.apache.flink.util.FileUtils;
 import org.apache.flink.runtime.zookeeper.StateStorageHelper;
 
 import java.io.IOException;

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 6d765b3..49953a6 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -67,7 +67,7 @@ import org.apache.flink.runtime.security.SecurityUtils
 import org.apache.flink.runtime.security.SecurityUtils.FlinkSecuredRunner
 import org.apache.flink.runtime.util._
 import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, 
LogMessages}
-import org.apache.flink.util.NetUtils
+import org.apache.flink.util.{MathUtils, NetUtils}
 
 import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._

http://git-wip-us.apache.org/repos/asf/flink/blob/760a0d9e/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
index bbb6a89..6bd8b34 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.testutils;
 
-import org.apache.flink.runtime.util.FileUtils;
+import org.apache.flink.util.FileUtils;
 
 import java.io.File;
 import java.io.FileWriter;

Reply via email to