http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala
 
b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala
new file mode 100644
index 0000000..57bbd9b
--- /dev/null
+++ 
b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala
+
+import java.io._
+
+import org.apache.flink.util.TestLogger
+import org.junit.Test
+import org.junit.Assert
+
+class ScalaShellLocalStartupITCase extends TestLogger {
+
+  /**
+   * Tests the Flink Scala shell with a local cluster setup, feeding a scripted session to stdin.
+   */
+  @Test
+  def testLocalCluster(): Unit = {
+    val input: String =
+      """
+        |import org.apache.flink.api.common.functions.RichMapFunction
+        |import org.apache.flink.api.java.io.PrintingOutputFormat
+        |import org.apache.flink.api.common.accumulators.IntCounter
+        |import org.apache.flink.configuration.Configuration
+        |
+        |val els = env.fromElements("foobar","barfoo")
+        |val mapped = els.map{
+        | new RichMapFunction[String, String]() {
+        |   var intCounter: IntCounter = _
+        |   override def open(conf: Configuration): Unit = {
+        |     intCounter = getRuntimeContext.getIntCounter("intCounter")
+        |   }
+        |
+        |   def map(element: String): String = {
+        |     intCounter.add(1)
+        |     element
+        |   }
+        | }
+        |}
+        |mapped.output(new PrintingOutputFormat())
+        |val executionResult = env.execute("Test Job")
+        |System.out.println("IntCounter: " + executionResult.getIntCounterResult("intCounter"))
+        |
+        |:q
+      """.stripMargin
+    val in: BufferedReader = new BufferedReader(new StringReader(input + "\n"))
+    val baos: ByteArrayOutputStream = new ByteArrayOutputStream
+    // Capture stdout so the shell's printed output can be inspected after the run.
+    val oldOut: PrintStream = System.out
+    System.setOut(new PrintStream(baos))
+    val args: Array[String] = Array("local")
+
+    // start flink scala shell with the scripted input as its reader
+    FlinkShell.bufferedReader = Some(in)
+    FlinkShell.main(args)
+
+    baos.flush()
+    val output: String = baos.toString
+    System.setOut(oldOut)
+
+    Assert.assertTrue(output.contains("IntCounter: 2"))
+    Assert.assertTrue(output.contains("foobar"))
+    Assert.assertTrue(output.contains("barfoo"))
+
+    Assert.assertFalse(output.contains("failed"))
+    Assert.assertFalse(output.contains("Error"))
+    Assert.assertFalse(output.contains("ERROR"))
+    Assert.assertFalse(output.contains("Exception"))
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-scala-shell/start-script/start-scala-shell.sh
----------------------------------------------------------------------
diff --git a/flink-scala-shell/start-script/start-scala-shell.sh 
b/flink-scala-shell/start-script/start-scala-shell.sh
new file mode 100644
index 0000000..fd85897
--- /dev/null
+++ b/flink-scala-shell/start-script/start-scala-shell.sh
@@ -0,0 +1,86 @@
+#!/bin/bash
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+
+# from scala-lang 2.10.4
+
+# restore stty settings (echo in particular)
+function restoreSttySettings() {
+  if [[ -n $SCALA_RUNNER_DEBUG ]]; then
+    echo "restoring stty:"
+    echo "$saved_stty"
+  fi
+  stty $saved_stty
+  saved_stty=""
+}
+
+function onExit() {
+  [[ "$saved_stty" != "" ]] && restoreSttySettings
+  exit $scala_exit_status
+}
+
+# to re-enable echo if we are interrupted before completing.
+trap onExit INT
+# save terminal settings
+saved_stty=$(stty -g 2>/dev/null)
+# clear on error so we don't later try to restore them
+if [[ $? -ne 0 ]]; then
+  saved_stty=""
+fi
+
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+. "$bin"/config.sh
+
+FLINK_CLASSPATH=`constructFlinkClassPath`
+
+# https://issues.scala-lang.org/browse/SI-6502, can't load external jars interactively
+# in scala shell since 2.10, has to be done at startup
+# checks arguments for additional classpath and adds it to the "standard classpath"
+
+EXTERNAL_LIB_FOUND=false
+for ((i=1;i<=$#;i++))
+do
+    if [[  ${!i} = "-a" || ${!i} = "--addclasspath" ]]
+    then
+        EXTERNAL_LIB_FOUND=true
+
+        # adding to classpath
+        k=$((i+1))
+        j=$((k+1))
+        echo " "
+        echo "Additional classpath:${!k}"
+        echo " "
+        EXT_CLASSPATH="${!k}"
+        FLINK_CLASSPATH="$FLINK_CLASSPATH:${!k}"
+        set -- "${@:1:$((i-1))}" "${@:j}"
+    fi
+done
+
+if ${EXTERNAL_LIB_FOUND}
+then
+    java -Dscala.color -cp "$FLINK_CLASSPATH" org.apache.flink.api.scala.FlinkShell "$@" --addclasspath "$EXT_CLASSPATH"
+else
+    java -Dscala.color -cp "$FLINK_CLASSPATH" org.apache.flink.api.scala.FlinkShell "$@"
+fi
+scala_exit_status=$?
+
+# restore echo and propagate the shell's exit status
+onExit

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-avro/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-avro/pom.xml b/flink-staging/flink-avro/pom.xml
deleted file mode 100644
index 9e0e868..0000000
--- a/flink-staging/flink-avro/pom.xml
+++ /dev/null
@@ -1,205 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
-       
-       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd";>
-       
-       <modelVersion>4.0.0</modelVersion>
-       
-       <parent>
-               <groupId>org.apache.flink</groupId>
-               <artifactId>flink-staging</artifactId>
-               <version>1.0-SNAPSHOT</version>
-               <relativePath>..</relativePath>
-       </parent>
-
-       <artifactId>flink-avro</artifactId>
-       <name>flink-avro</name>
-
-       <packaging>jar</packaging>
-
-       <dependencies>
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-java</artifactId>
-                       <version>${project.version}</version>
-               </dependency>
-
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-clients</artifactId>
-                       <version>${project.version}</version>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.avro</groupId>
-                       <artifactId>avro</artifactId>
-                       <!-- version is derived from base module -->
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-core</artifactId>
-                       <version>${project.version}</version>
-                       <type>test-jar</type>
-                       <scope>test</scope>
-               </dependency>
-               
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-test-utils</artifactId>
-                       <version>${project.version}</version>
-                       <scope>test</scope>
-               </dependency>
-               
-       </dependencies>
-
-       <build>
-               <plugins>
-                       <plugin>
-                               <artifactId>maven-assembly-plugin</artifactId>
-                               <executions>
-                                       <execution>
-                                               <id>create-test-dependency</id>
-                                               
<phase>process-test-classes</phase>
-                                               <goals>
-                                                       <goal>single</goal>
-                                               </goals>
-                                               <configuration>
-                                                       <archive>
-                                                               <manifest>
-                                                                       
<mainClass>org.apache.flink.api.avro.testjar.AvroExternalJarProgram</mainClass>
-                                                               </manifest>
-                                                       </archive>
-                                                       
<finalName>maven</finalName>
-                                                       <attach>false</attach>
-                                                       <descriptors>
-                                                               
<descriptor>src/test/assembly/test-assembly.xml</descriptor>
-                                                       </descriptors>
-                                               </configuration>
-                                       </execution>
-                               </executions>
-                       </plugin>
-                       <!--Remove the AvroExternalJarProgram code from the 
test-classes directory since it musn't be in the
-                       classpath when running the tests to actually test 
whether the user code class loader
-                       is properly used.-->
-                       <plugin>
-                               <artifactId>maven-clean-plugin</artifactId>
-                               <version>2.5</version><!--$NO-MVN-MAN-VER$-->
-                               <executions>
-                                       <execution>
-                                               
<id>remove-avroexternalprogram</id>
-                                               
<phase>process-test-classes</phase>
-                                               <goals>
-                                                       <goal>clean</goal>
-                                               </goals>
-                                               <configuration>
-                                                       
<excludeDefaultDirectories>true</excludeDefaultDirectories>
-                                                       <filesets>
-                                                               <fileset>
-                                                                       
<directory>${project.build.testOutputDirectory}</directory>
-                                                                       
<includes>
-                                                                               
<include>**/testjar/*.class</include>
-                                                                       
</includes>
-                                                               </fileset>
-                                                       </filesets>
-                                               </configuration>
-                                       </execution>
-                               </executions>
-                       </plugin>
-                       <!-- Generate Test class from avro schema -->
-                       <plugin>
-                               <groupId>org.apache.avro</groupId>
-                               <artifactId>avro-maven-plugin</artifactId>
-                               <version>1.7.7</version>
-                               <executions>
-                                       <execution>
-                                               <phase>generate-sources</phase>
-                                               <goals>
-                                                       <goal>schema</goal>
-                                               </goals>
-                                               <configuration>
-                                                       
<testSourceDirectory>${project.basedir}/src/test/resources/avro</testSourceDirectory>
-                                                       
<testOutputDirectory>${project.basedir}/src/test/java/</testOutputDirectory>
-                                               </configuration>
-                                       </execution>
-                               </executions>
-                       </plugin>
-               </plugins>
-               
-               <pluginManagement>
-                       <plugins>
-                               <!--This plugin's configuration is used to 
store Eclipse m2e settings only. It has no influence on the Maven build 
itself.-->
-                               <plugin>
-                                       <groupId>org.eclipse.m2e</groupId>
-                                       
<artifactId>lifecycle-mapping</artifactId>
-                                       <version>1.0.0</version>
-                                       <configuration>
-                                               <lifecycleMappingMetadata>
-                                                       <pluginExecutions>
-                                                               
<pluginExecution>
-                                                                       
<pluginExecutionFilter>
-                                                                               
<groupId>org.apache.maven.plugins</groupId>
-                                                                               
<artifactId>maven-assembly-plugin</artifactId>
-                                                                               
<versionRange>[2.4,)</versionRange>
-                                                                               
<goals>
-                                                                               
        <goal>single</goal>
-                                                                               
</goals>
-                                                                       
</pluginExecutionFilter>
-                                                                       <action>
-                                                                               
<ignore/>
-                                                                       
</action>
-                                                               
</pluginExecution>
-                                                               
<pluginExecution>
-                                                                       
<pluginExecutionFilter>
-                                                                               
<groupId>org.apache.maven.plugins</groupId>
-                                                                               
<artifactId>maven-clean-plugin</artifactId>
-                                                                               
<versionRange>[1,)</versionRange>
-                                                                               
<goals>
-                                                                               
        <goal>clean</goal>
-                                                                               
</goals>
-                                                                       
</pluginExecutionFilter>
-                                                                       <action>
-                                                                               
<ignore/>
-                                                                       
</action>
-                                                               
</pluginExecution>
-                                                               
<pluginExecution>
-                                                                       
<pluginExecutionFilter>
-                                                                               
<groupId>org.apache.avro</groupId>
-                                                                               
<artifactId>avro-maven-plugin</artifactId>
-                                                                               
<versionRange>[1.7.7,)</versionRange>
-                                                                               
<goals>
-                                                                               
        <goal>schema</goal>
-                                                                               
</goals>
-                                                                       
</pluginExecutionFilter>
-                                                                       <action>
-                                                                               
<ignore/>
-                                                                       
</action>
-                                                               
</pluginExecution>
-                                                       </pluginExecutions>
-                                               </lifecycleMappingMetadata>
-                                       </configuration>
-                               </plugin>
-                       </plugins>
-               </pluginManagement>
-       </build>
-
-</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-avro/src/main/java/org/apache/flink/api/avro/DataInputDecoder.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-avro/src/main/java/org/apache/flink/api/avro/DataInputDecoder.java
 
b/flink-staging/flink-avro/src/main/java/org/apache/flink/api/avro/DataInputDecoder.java
deleted file mode 100644
index 59da4cb..0000000
--- 
a/flink-staging/flink-avro/src/main/java/org/apache/flink/api/avro/DataInputDecoder.java
+++ /dev/null
@@ -1,213 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.avro;
-
-import java.io.DataInput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.apache.avro.io.Decoder;
-import org.apache.avro.util.Utf8;
-
-
-public class DataInputDecoder extends Decoder {
-       
-       private final Utf8 stringDecoder = new Utf8();
-       
-       private DataInput in;
-       
-       public void setIn(DataInput in) {
-               this.in = in;
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-       // primitives
-       // 
--------------------------------------------------------------------------------------------
-       
-       @Override
-       public void readNull() {}
-       
-
-       @Override
-       public boolean readBoolean() throws IOException {
-               return in.readBoolean();
-       }
-
-       @Override
-       public int readInt() throws IOException {
-               return in.readInt();
-       }
-
-       @Override
-       public long readLong() throws IOException {
-               return in.readLong();
-       }
-
-       @Override
-       public float readFloat() throws IOException {
-               return in.readFloat();
-       }
-
-       @Override
-       public double readDouble() throws IOException {
-               return in.readDouble();
-       }
-       
-       @Override
-       public int readEnum() throws IOException {
-               return readInt();
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       // bytes
-       // 
--------------------------------------------------------------------------------------------
-
-       @Override
-       public void readFixed(byte[] bytes, int start, int length) throws 
IOException {
-               in.readFully(bytes, start, length);
-       }
-       
-       @Override
-       public ByteBuffer readBytes(ByteBuffer old) throws IOException {
-               int length = readInt();
-               ByteBuffer result;
-               if (old != null && length <= old.capacity() && old.hasArray()) {
-                       result = old;
-                       result.clear();
-               } else {
-                       result = ByteBuffer.allocate(length);
-               }
-               in.readFully(result.array(), result.arrayOffset() + 
result.position(), length);
-               result.limit(length);
-               return result;
-       }
-       
-       
-       @Override
-       public void skipFixed(int length) throws IOException {
-               skipBytes(length);
-       }
-       
-       @Override
-       public void skipBytes() throws IOException {
-               int num = readInt();
-               skipBytes(num);
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       // strings
-       // 
--------------------------------------------------------------------------------------------
-       
-       
-       @Override
-       public Utf8 readString(Utf8 old) throws IOException {
-               int length = readInt();
-               Utf8 result = (old != null ? old : new Utf8());
-               result.setByteLength(length);
-               
-               if (length > 0) {
-                       in.readFully(result.getBytes(), 0, length);
-               }
-               
-               return result;
-       }
-
-       @Override
-       public String readString() throws IOException {
-               return readString(stringDecoder).toString();
-       }
-
-       @Override
-       public void skipString() throws IOException {
-               int len = readInt();
-               skipBytes(len);
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-       // collection types
-       // 
--------------------------------------------------------------------------------------------
-
-       @Override
-       public long readArrayStart() throws IOException {
-               return readVarLongCount(in);
-       }
-
-       @Override
-       public long arrayNext() throws IOException {
-               return readVarLongCount(in);
-       }
-
-       @Override
-       public long skipArray() throws IOException {
-               return readVarLongCount(in);
-       }
-
-       @Override
-       public long readMapStart() throws IOException {
-               return readVarLongCount(in);
-       }
-
-       @Override
-       public long mapNext() throws IOException {
-               return readVarLongCount(in);
-       }
-
-       @Override
-       public long skipMap() throws IOException {
-               return readVarLongCount(in);
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       // union
-       // 
--------------------------------------------------------------------------------------------
-
-       @Override
-       public int readIndex() throws IOException {
-               return readInt();
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       // utils
-       // 
--------------------------------------------------------------------------------------------
-       
-       private void skipBytes(int num) throws IOException {
-               while (num > 0) {
-                       num -= in.skipBytes(num);
-               }
-       }
-       
-       public static long readVarLongCount(DataInput in) throws IOException {
-               long value = in.readUnsignedByte();
-
-               if ((value & 0x80) == 0) {
-                       return value;
-               }
-               else {
-                       long curr;
-                       int shift = 7;
-                       value = value & 0x7f;
-                       while (((curr = in.readUnsignedByte()) & 0x80) != 0){
-                               value |= (curr & 0x7f) << shift;
-                               shift += 7;
-                       }
-                       value |= curr << shift;
-                       return value;
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-avro/src/main/java/org/apache/flink/api/avro/DataOutputEncoder.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-avro/src/main/java/org/apache/flink/api/avro/DataOutputEncoder.java
 
b/flink-staging/flink-avro/src/main/java/org/apache/flink/api/avro/DataOutputEncoder.java
deleted file mode 100644
index 0102cc1..0000000
--- 
a/flink-staging/flink-avro/src/main/java/org/apache/flink/api/avro/DataOutputEncoder.java
+++ /dev/null
@@ -1,183 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.avro;
-
-import java.io.DataOutput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.apache.avro.io.Encoder;
-import org.apache.avro.util.Utf8;
-
-
-public final class DataOutputEncoder extends Encoder implements 
java.io.Serializable {
-       
-       private static final long serialVersionUID = 1L;
-       
-       private DataOutput out;
-       
-       
-       public void setOut(DataOutput out) {
-               this.out = out;
-       }
-
-
-       @Override
-       public void flush() throws IOException {}
-
-       // 
--------------------------------------------------------------------------------------------
-       // primitives
-       // 
--------------------------------------------------------------------------------------------
-       
-       @Override
-       public void writeNull() {}
-       
-
-       @Override
-       public void writeBoolean(boolean b) throws IOException {
-               out.writeBoolean(b);
-       }
-
-       @Override
-       public void writeInt(int n) throws IOException {
-               out.writeInt(n);
-       }
-
-       @Override
-       public void writeLong(long n) throws IOException {
-               out.writeLong(n);
-       }
-
-       @Override
-       public void writeFloat(float f) throws IOException {
-               out.writeFloat(f);
-       }
-
-       @Override
-       public void writeDouble(double d) throws IOException {
-               out.writeDouble(d);
-       }
-       
-       @Override
-       public void writeEnum(int e) throws IOException {
-               out.writeInt(e);
-       }
-       
-       
-       // 
--------------------------------------------------------------------------------------------
-       // bytes
-       // 
--------------------------------------------------------------------------------------------
-
-       @Override
-       public void writeFixed(byte[] bytes, int start, int len) throws 
IOException {
-               out.write(bytes, start, len);
-       }
-       
-       @Override
-       public void writeBytes(byte[] bytes, int start, int len) throws 
IOException {
-               out.writeInt(len);
-               if (len > 0) {
-                       out.write(bytes, start, len);
-               }
-       }
-       
-       @Override
-       public void writeBytes(ByteBuffer bytes) throws IOException {
-               int num = bytes.remaining();
-               out.writeInt(num);
-               
-               if (num > 0) {
-                       writeFixed(bytes);
-               }
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       // strings
-       // 
--------------------------------------------------------------------------------------------
-
-       @Override
-       public void writeString(String str) throws IOException {
-               byte[] bytes = Utf8.getBytesFor(str);
-               writeBytes(bytes, 0, bytes.length);
-       }
-       
-       @Override
-       public void writeString(Utf8 utf8) throws IOException {
-               writeBytes(utf8.getBytes(), 0, utf8.getByteLength());
-               
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-       // collection types
-       // 
--------------------------------------------------------------------------------------------
-
-       @Override
-       public void writeArrayStart() {}
-
-       @Override
-       public void setItemCount(long itemCount) throws IOException {
-               if (itemCount > 0) {
-                       writeVarLongCount(out, itemCount);
-               }
-       }
-
-       @Override
-       public void startItem() {}
-
-       @Override
-       public void writeArrayEnd() throws IOException {
-               // write a single byte 0, shortcut for a var-length long of 0
-               out.write(0);
-       }
-
-       @Override
-       public void writeMapStart() {}
-
-       @Override
-       public void writeMapEnd() throws IOException {
-               // write a single byte 0, shortcut for a var-length long of 0
-               out.write(0);
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-       // union
-       // 
--------------------------------------------------------------------------------------------
-       
-       @Override
-       public void writeIndex(int unionIndex) throws IOException {
-               out.writeInt(unionIndex);
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       // utils
-       // 
--------------------------------------------------------------------------------------------
-               
-       
-       public static void writeVarLongCount(DataOutput out, long val) throws 
IOException {
-               if (val < 0) {
-                       throw new IOException("Illegal count (must be 
non-negative): " + val);
-               }
-               
-               while ((val & ~0x7FL) != 0) {
-                       out.write(((int) val) | 0x80);
-                       val >>>= 7;
-               }
-               out.write((int) val);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-avro/src/main/java/org/apache/flink/api/avro/FSDataInputStreamWrapper.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-avro/src/main/java/org/apache/flink/api/avro/FSDataInputStreamWrapper.java
 
b/flink-staging/flink-avro/src/main/java/org/apache/flink/api/avro/FSDataInputStreamWrapper.java
deleted file mode 100644
index 709c4f1..0000000
--- 
a/flink-staging/flink-avro/src/main/java/org/apache/flink/api/avro/FSDataInputStreamWrapper.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.avro;
-
-import java.io.Closeable;
-import java.io.IOException;
-
-import org.apache.avro.file.SeekableInput;
-import org.apache.flink.core.fs.FSDataInputStream;
-
-
-/**
- * Code copy pasted from org.apache.avro.mapred.FSInput (which is Apache 
licensed as well)
- * 
- * The wrapper keeps track of the position in the data stream.
- */
-public class FSDataInputStreamWrapper implements Closeable, SeekableInput {
-       private final FSDataInputStream stream;
-       private long pos;
-       private long len;
-
-       public FSDataInputStreamWrapper(FSDataInputStream stream, long len) {
-               this.stream = stream;
-               this.pos = 0;
-               this.len = len;
-       }
-
-       public long length() throws IOException {
-               return this.len;
-       }
-
-       public int read(byte[] b, int off, int len) throws IOException {
-               int read;
-               read = stream.read(b, off, len);
-               pos += read;
-               return read;
-       }
-
-       public void seek(long p) throws IOException {
-               stream.seek(p);
-               pos = p;
-       }
-
-       public long tell() throws IOException {
-               return pos;
-       }
-
-       public void close() throws IOException {
-               stream.close();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-avro/src/main/java/org/apache/flink/api/io/avro/example/AvroTypeExample.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-avro/src/main/java/org/apache/flink/api/io/avro/example/AvroTypeExample.java
 
b/flink-staging/flink-avro/src/main/java/org/apache/flink/api/io/avro/example/AvroTypeExample.java
deleted file mode 100644
index 6affeec..0000000
--- 
a/flink-staging/flink-avro/src/main/java/org/apache/flink/api/io/avro/example/AvroTypeExample.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.io.avro.example;
-
-import java.io.IOException;
-import java.util.Random;
-
-import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.io.GenericInputFormat;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.util.Collector;
-
-@SuppressWarnings("serial")
-public class AvroTypeExample {
-       
-       
-       public static void main(String[] args) throws Exception {
-               
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               
-               DataSet<User> users = env.createInput(new 
UserGeneratingInputFormat());
-               
-               users
-                       .map(new NumberExtractingMapper())
-                       .groupBy(1)
-                       .reduceGroup(new ConcatenatingReducer())
-                       .print();
-               
-               env.execute();
-       }
-       
-       
-       
-       public static final class NumberExtractingMapper implements 
MapFunction<User, Tuple2<User, Integer>> {
-               
-               @Override
-               public Tuple2<User, Integer> map(User user) {
-                       return new Tuple2<User, Integer>(user, 
user.getFavoriteNumber());
-               }
-       }
-       
-       
-       public static final class ConcatenatingReducer implements 
GroupReduceFunction<Tuple2<User, Integer>, Tuple2<Integer, String>> {
-
-               @Override
-               public void reduce(Iterable<Tuple2<User, Integer>> values, 
Collector<Tuple2<Integer, String>> out) throws Exception {
-                       int number = 0;
-                       StringBuilder colors = new StringBuilder();
-                       
-                       for (Tuple2<User, Integer> u : values) {
-                               number = u.f1;
-                               colors.append(u.f0.getFavoriteColor()).append(" 
- ");
-                       }
-                       
-                       colors.setLength(colors.length() - 3);
-                       out.collect(new Tuple2<Integer, String>(number, 
colors.toString()));
-               }
-       }
-       
-       
-       public static final class UserGeneratingInputFormat extends 
GenericInputFormat<User> {
-
-               private static final long serialVersionUID = 1L;
-               
-               private static final int NUM = 100;
-               
-               private final Random rnd = new Random(32498562304986L);
-               
-               private static final String[] NAMES = { "Peter", "Bob", 
"Liddy", "Alexander", "Stan" };
-               
-               private static final String[] COLORS = { "mauve", "crimson", 
"copper", "sky", "grass" };
-               
-               private int count;
-               
-
-               @Override
-               public boolean reachedEnd() throws IOException {
-                       return count >= NUM;
-               }
-
-               @Override
-               public User nextRecord(User reuse) throws IOException {
-                       count++;
-                       
-                       User u = new User();
-                       u.setName(NAMES[rnd.nextInt(NAMES.length)]);
-                       u.setFavoriteColor(COLORS[rnd.nextInt(COLORS.length)]);
-                       u.setFavoriteNumber(rnd.nextInt(87));
-                       return u;
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-avro/src/main/java/org/apache/flink/api/io/avro/example/User.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-avro/src/main/java/org/apache/flink/api/io/avro/example/User.java
 
b/flink-staging/flink-avro/src/main/java/org/apache/flink/api/io/avro/example/User.java
deleted file mode 100644
index 4608f96..0000000
--- 
a/flink-staging/flink-avro/src/main/java/org/apache/flink/api/io/avro/example/User.java
+++ /dev/null
@@ -1,269 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-/**
- * Autogenerated by Avro
- * 
- * DO NOT EDIT DIRECTLY
- */
-package org.apache.flink.api.io.avro.example;  
-@SuppressWarnings("all")
-@org.apache.avro.specific.AvroGenerated
-public class User extends org.apache.avro.specific.SpecificRecordBase 
implements org.apache.avro.specific.SpecificRecord {
-  public static final org.apache.avro.Schema SCHEMA$ = new 
org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"org.apache.flink.api.avro.example\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"favorite_number\",\"type\":[\"int\",\"null\"]},{\"name\":\"favorite_color\",\"type\":[\"string\",\"null\"]}]}");
-  public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
-  @Deprecated public java.lang.CharSequence name;
-  @Deprecated public java.lang.Integer favorite_number;
-  @Deprecated public java.lang.CharSequence favorite_color;
-
-  /**
-   * Default constructor.  Note that this does not initialize fields
-   * to their default values from the schema.  If that is desired then
-   * one should use {@link #newBuilder()}. 
-   */
-  public User() {}
-
-  /**
-   * All-args constructor.
-   */
-  public User(java.lang.CharSequence name, java.lang.Integer favorite_number, 
java.lang.CharSequence favorite_color) {
-    this.name = name;
-    this.favorite_number = favorite_number;
-    this.favorite_color = favorite_color;
-  }
-
-  public org.apache.avro.Schema getSchema() { return SCHEMA$; }
-  // Used by DatumWriter.  Applications should not call. 
-  public java.lang.Object get(int field$) {
-    switch (field$) {
-    case 0: return name;
-    case 1: return favorite_number;
-    case 2: return favorite_color;
-    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
-    }
-  }
-  // Used by DatumReader.  Applications should not call. 
-  @SuppressWarnings(value="unchecked")
-  public void put(int field$, java.lang.Object value$) {
-    switch (field$) {
-    case 0: name = (java.lang.CharSequence)value$; break;
-    case 1: favorite_number = (java.lang.Integer)value$; break;
-    case 2: favorite_color = (java.lang.CharSequence)value$; break;
-    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
-    }
-  }
-
-  /**
-   * Gets the value of the 'name' field.
-   */
-  public java.lang.CharSequence getName() {
-    return name;
-  }
-
-  /**
-   * Sets the value of the 'name' field.
-   * @param value the value to set.
-   */
-  public void setName(java.lang.CharSequence value) {
-    this.name = value;
-  }
-
-  /**
-   * Gets the value of the 'favorite_number' field.
-   */
-  public java.lang.Integer getFavoriteNumber() {
-    return favorite_number;
-  }
-
-  /**
-   * Sets the value of the 'favorite_number' field.
-   * @param value the value to set.
-   */
-  public void setFavoriteNumber(java.lang.Integer value) {
-    this.favorite_number = value;
-  }
-
-  /**
-   * Gets the value of the 'favorite_color' field.
-   */
-  public java.lang.CharSequence getFavoriteColor() {
-    return favorite_color;
-  }
-
-  /**
-   * Sets the value of the 'favorite_color' field.
-   * @param value the value to set.
-   */
-  public void setFavoriteColor(java.lang.CharSequence value) {
-    this.favorite_color = value;
-  }
-
-  /** Creates a new User RecordBuilder */
-  public static org.apache.flink.api.io.avro.example.User.Builder newBuilder() 
{
-    return new org.apache.flink.api.io.avro.example.User.Builder();
-  }
-  
-  /** Creates a new User RecordBuilder by copying an existing Builder */
-  public static org.apache.flink.api.io.avro.example.User.Builder 
newBuilder(org.apache.flink.api.io.avro.example.User.Builder other) {
-    return new org.apache.flink.api.io.avro.example.User.Builder(other);
-  }
-  
-  /** Creates a new User RecordBuilder by copying an existing User instance */
-  public static org.apache.flink.api.io.avro.example.User.Builder 
newBuilder(org.apache.flink.api.io.avro.example.User other) {
-    return new org.apache.flink.api.io.avro.example.User.Builder(other);
-  }
-  
-  /**
-   * RecordBuilder for User instances.
-   */
-  public static class Builder extends 
org.apache.avro.specific.SpecificRecordBuilderBase<User>
-    implements org.apache.avro.data.RecordBuilder<User> {
-
-    private java.lang.CharSequence name;
-    private java.lang.Integer favorite_number;
-    private java.lang.CharSequence favorite_color;
-
-    /** Creates a new Builder */
-    private Builder() {
-      super(org.apache.flink.api.io.avro.example.User.SCHEMA$);
-    }
-    
-    /** Creates a Builder by copying an existing Builder */
-    private Builder(org.apache.flink.api.io.avro.example.User.Builder other) {
-      super(other);
-      if (isValidValue(fields()[0], other.name)) {
-        this.name = data().deepCopy(fields()[0].schema(), other.name);
-        fieldSetFlags()[0] = true;
-      }
-      if (isValidValue(fields()[1], other.favorite_number)) {
-        this.favorite_number = data().deepCopy(fields()[1].schema(), 
other.favorite_number);
-        fieldSetFlags()[1] = true;
-      }
-      if (isValidValue(fields()[2], other.favorite_color)) {
-        this.favorite_color = data().deepCopy(fields()[2].schema(), 
other.favorite_color);
-        fieldSetFlags()[2] = true;
-      }
-    }
-    
-    /** Creates a Builder by copying an existing User instance */
-    private Builder(org.apache.flink.api.io.avro.example.User other) {
-            super(org.apache.flink.api.io.avro.example.User.SCHEMA$);
-      if (isValidValue(fields()[0], other.name)) {
-        this.name = data().deepCopy(fields()[0].schema(), other.name);
-        fieldSetFlags()[0] = true;
-      }
-      if (isValidValue(fields()[1], other.favorite_number)) {
-        this.favorite_number = data().deepCopy(fields()[1].schema(), 
other.favorite_number);
-        fieldSetFlags()[1] = true;
-      }
-      if (isValidValue(fields()[2], other.favorite_color)) {
-        this.favorite_color = data().deepCopy(fields()[2].schema(), 
other.favorite_color);
-        fieldSetFlags()[2] = true;
-      }
-    }
-
-    /** Gets the value of the 'name' field */
-    public java.lang.CharSequence getName() {
-      return name;
-    }
-    
-    /** Sets the value of the 'name' field */
-    public org.apache.flink.api.io.avro.example.User.Builder 
setName(java.lang.CharSequence value) {
-      validate(fields()[0], value);
-      this.name = value;
-      fieldSetFlags()[0] = true;
-      return this; 
-    }
-    
-    /** Checks whether the 'name' field has been set */
-    public boolean hasName() {
-      return fieldSetFlags()[0];
-    }
-    
-    /** Clears the value of the 'name' field */
-    public org.apache.flink.api.io.avro.example.User.Builder clearName() {
-      name = null;
-      fieldSetFlags()[0] = false;
-      return this;
-    }
-
-    /** Gets the value of the 'favorite_number' field */
-    public java.lang.Integer getFavoriteNumber() {
-      return favorite_number;
-    }
-    
-    /** Sets the value of the 'favorite_number' field */
-    public org.apache.flink.api.io.avro.example.User.Builder 
setFavoriteNumber(java.lang.Integer value) {
-      validate(fields()[1], value);
-      this.favorite_number = value;
-      fieldSetFlags()[1] = true;
-      return this; 
-    }
-    
-    /** Checks whether the 'favorite_number' field has been set */
-    public boolean hasFavoriteNumber() {
-      return fieldSetFlags()[1];
-    }
-    
-    /** Clears the value of the 'favorite_number' field */
-    public org.apache.flink.api.io.avro.example.User.Builder 
clearFavoriteNumber() {
-      favorite_number = null;
-      fieldSetFlags()[1] = false;
-      return this;
-    }
-
-    /** Gets the value of the 'favorite_color' field */
-    public java.lang.CharSequence getFavoriteColor() {
-      return favorite_color;
-    }
-    
-    /** Sets the value of the 'favorite_color' field */
-    public org.apache.flink.api.io.avro.example.User.Builder 
setFavoriteColor(java.lang.CharSequence value) {
-      validate(fields()[2], value);
-      this.favorite_color = value;
-      fieldSetFlags()[2] = true;
-      return this; 
-    }
-    
-    /** Checks whether the 'favorite_color' field has been set */
-    public boolean hasFavoriteColor() {
-      return fieldSetFlags()[2];
-    }
-    
-    /** Clears the value of the 'favorite_color' field */
-    public org.apache.flink.api.io.avro.example.User.Builder 
clearFavoriteColor() {
-      favorite_color = null;
-      fieldSetFlags()[2] = false;
-      return this;
-    }
-
-    @Override
-    public User build() {
-      try {
-        User record = new User();
-        record.name = fieldSetFlags()[0] ? this.name : 
(java.lang.CharSequence) defaultValue(fields()[0]);
-        record.favorite_number = fieldSetFlags()[1] ? this.favorite_number : 
(java.lang.Integer) defaultValue(fields()[1]);
-        record.favorite_color = fieldSetFlags()[2] ? this.favorite_color : 
(java.lang.CharSequence) defaultValue(fields()[2]);
-        return record;
-      } catch (Exception e) {
-        throw new org.apache.avro.AvroRuntimeException(e);
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java
 
b/flink-staging/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java
deleted file mode 100644
index 09fcacb..0000000
--- 
a/flink-staging/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.io;
-
-import java.io.IOException;
-
-import org.apache.avro.file.DataFileReader;
-import org.apache.avro.file.FileReader;
-import org.apache.avro.file.SeekableInput;
-import org.apache.avro.io.DatumReader;
-import org.apache.avro.reflect.ReflectDatumReader;
-import org.apache.avro.specific.SpecificDatumReader;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.flink.api.avro.FSDataInputStreamWrapper;
-import org.apache.flink.api.common.io.FileInputFormat;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.core.fs.FileInputSplit;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.util.InstantiationUtil;
-
-
-public class AvroInputFormat<E> extends FileInputFormat<E> implements 
ResultTypeQueryable<E> {
-       
-       private static final long serialVersionUID = 1L;
-
-       private static final Logger LOG = 
LoggerFactory.getLogger(AvroInputFormat.class);
-
-       private final Class<E> avroValueType;
-       
-       private boolean reuseAvroValue = true;
-
-       private transient FileReader<E> dataFileReader;
-
-       private transient long end;
-
-       
-       public AvroInputFormat(Path filePath, Class<E> type) {
-               super(filePath);
-               this.avroValueType = type;
-       }
-       
-       
-       /**
-        * Sets the flag whether to reuse the Avro value instance for all 
records.
-        * By default, the input format reuses the Avro value.
-        *
-        * @param reuseAvroValue True, if the input format should reuse the 
Avro value instance, false otherwise.
-        */
-       public void setReuseAvroValue(boolean reuseAvroValue) {
-               this.reuseAvroValue = reuseAvroValue;
-       }
-
-       /**
-        * If set, the InputFormat will only read entire files.
-        */
-       public void setUnsplittable(boolean unsplittable) {
-               this.unsplittable = unsplittable;
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       // Typing
-       // 
--------------------------------------------------------------------------------------------
-       
-       @Override
-       public TypeInformation<E> getProducedType() {
-               return TypeExtractor.getForClass(this.avroValueType);
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       // Input Format Methods
-       // 
--------------------------------------------------------------------------------------------
-
-       @Override
-       public void open(FileInputSplit split) throws IOException {
-               super.open(split);
-
-               DatumReader<E> datumReader;
-               if 
(org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroValueType))
 {
-                       datumReader = new SpecificDatumReader<E>(avroValueType);
-               } else {
-                       datumReader = new ReflectDatumReader<E>(avroValueType);
-               }
-               
-               LOG.info("Opening split " + split);
-               
-               SeekableInput in = new FSDataInputStreamWrapper(stream, 
split.getPath().getFileSystem().getFileStatus(split.getPath()).getLen());
-
-               dataFileReader = DataFileReader.openReader(in, datumReader);
-               dataFileReader.sync(split.getStart());
-               this.end = split.getStart() + split.getLength();
-       }
-
-       @Override
-       public boolean reachedEnd() throws IOException {
-               return !dataFileReader.hasNext() || 
dataFileReader.pastSync(end);
-       }
-
-       @Override
-       public E nextRecord(E reuseValue) throws IOException {
-               if (reachedEnd()) {
-                       return null;
-               }
-               
-               if (!reuseAvroValue) {
-                       reuseValue = 
InstantiationUtil.instantiate(avroValueType, Object.class);
-               }
-               
-               reuseValue = dataFileReader.next(reuseValue);
-               return reuseValue;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java
 
b/flink-staging/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java
deleted file mode 100644
index d00dbf7..0000000
--- 
a/flink-staging/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.io;
-
-
-import org.apache.avro.Schema;
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.io.DatumWriter;
-import org.apache.avro.reflect.ReflectData;
-import org.apache.avro.reflect.ReflectDatumWriter;
-import org.apache.avro.specific.SpecificDatumWriter;
-import org.apache.flink.api.common.io.FileOutputFormat;
-import org.apache.flink.core.fs.Path;
-
-import java.io.IOException;
-
-public class AvroOutputFormat<E> extends FileOutputFormat<E> {
-
-       private static final long serialVersionUID = 1L;
-
-       private final Class<E> avroValueType;
-
-       private Schema userDefinedSchema = null;
-
-       private transient DataFileWriter<E> dataFileWriter;
-
-       public AvroOutputFormat(Path filePath, Class<E> type) {
-               super(filePath);
-               this.avroValueType = type;
-       }
-
-       public AvroOutputFormat(Class<E> type) {
-               this.avroValueType = type;
-       }
-
-       @Override
-       protected String getDirectoryFileName(int taskNumber) {
-               return super.getDirectoryFileName(taskNumber) + ".avro";
-       }
-
-       public void setSchema(Schema schema) {
-               this.userDefinedSchema = schema;
-       }
-
-       @Override
-       public void writeRecord(E record) throws IOException {
-               dataFileWriter.append(record);
-       }
-
-       @Override
-       public void open(int taskNumber, int numTasks) throws IOException {
-               super.open(taskNumber, numTasks);
-
-               DatumWriter<E> datumWriter;
-               Schema schema = null;
-               if 
(org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroValueType))
 {
-                       datumWriter = new SpecificDatumWriter<E>(avroValueType);
-                       try {
-                               schema = 
((org.apache.avro.specific.SpecificRecordBase)avroValueType.newInstance()).getSchema();
-                       } catch (InstantiationException e) {
-                               throw new RuntimeException(e.getMessage());
-                       } catch (IllegalAccessException e) {
-                               throw new RuntimeException(e.getMessage());
-                       }
-               } else {
-                       datumWriter = new ReflectDatumWriter<E>(avroValueType);
-                       schema = ReflectData.get().getSchema(avroValueType);
-               }
-               dataFileWriter = new DataFileWriter<E>(datumWriter);
-               if (userDefinedSchema == null) {
-                       dataFileWriter.create(schema, stream);
-               } else {
-                       dataFileWriter.create(userDefinedSchema, stream);
-               }
-       }
-
-       @Override
-       public void close() throws IOException {
-               dataFileWriter.flush();
-               dataFileWriter.close();
-               super.close();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-avro/src/test/assembly/test-assembly.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-avro/src/test/assembly/test-assembly.xml 
b/flink-staging/flink-avro/src/test/assembly/test-assembly.xml
deleted file mode 100644
index 0f4561a..0000000
--- a/flink-staging/flink-avro/src/test/assembly/test-assembly.xml
+++ /dev/null
@@ -1,36 +0,0 @@
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-<assembly>
-       <id>test-jar</id>
-       <formats>
-               <format>jar</format>
-       </formats>
-       <includeBaseDirectory>false</includeBaseDirectory>
-       <fileSets>
-               <fileSet>
-                       
<directory>${project.build.testOutputDirectory}</directory>
-                       <outputDirectory>/</outputDirectory>
-                       <!--modify/add include to match your package(s) -->
-                       <includes>
-                               
<include>org/apache/flink/api/avro/testjar/**</include>
-                       </includes>
-               </fileSet>
-       </fileSets>
-</assembly>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
 
b/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
deleted file mode 100644
index e2d91af..0000000
--- 
a/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.avro;
-
-import java.io.File;
-import java.net.InetAddress;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.client.CliFrontend;
-import org.apache.flink.client.RemoteExecutor;
-import org.apache.flink.client.program.Client;
-import org.apache.flink.client.program.JobWithJars;
-import org.apache.flink.client.program.PackagedProgram;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.optimizer.Optimizer;
-import org.apache.flink.optimizer.plan.FlinkPlan;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-
-public class AvroExternalJarProgramITCase {
-
-       private static final String JAR_FILE = "target/maven-test-jar.jar";
-
-       private static final String TEST_DATA_FILE = "/testdata.avro";
-
-       @Test
-       public void testExternalProgram() {
-
-               ForkableFlinkMiniCluster testMiniCluster = null;
-
-               try {
-                       Configuration config = new Configuration();
-                       
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
-                       testMiniCluster = new ForkableFlinkMiniCluster(config, 
false);
-                       testMiniCluster.start();
-
-                       String jarFile = JAR_FILE;
-                       String testData = 
getClass().getResource(TEST_DATA_FILE).toString();
-
-                       PackagedProgram program = new PackagedProgram(new 
File(jarFile), new String[] { testData });
-
-
-                       
config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
-                       
config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 
testMiniCluster.getLeaderRPCPort());
-
-                       Client client = new Client(config);
-
-                       client.setPrintStatusDuringExecution(false);
-                       client.runBlocking(program, 4);
-
-               }
-               catch (Throwable t) {
-                       System.err.println(t.getMessage());
-                       t.printStackTrace();
-                       Assert.fail("Error during the packaged program 
execution: " + t.getMessage());
-               }
-               finally {
-                       if (testMiniCluster != null) {
-                               try {
-                                       testMiniCluster.stop();
-                               } catch (Throwable t) {
-                                       // ignore
-                               }
-                       }
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatITCase.java
 
b/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatITCase.java
deleted file mode 100644
index d40fec5..0000000
--- 
a/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatITCase.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.avro;
-
-import org.junit.Assert;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.avro.file.DataFileReader;
-import org.apache.avro.io.DatumReader;
-import org.apache.avro.reflect.ReflectDatumReader;
-import org.apache.avro.specific.SpecificDatumReader;
-import org.apache.flink.api.io.avro.example.User;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.java.io.AvroOutputFormat;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-@SuppressWarnings("serial")
-public class AvroOutputFormatITCase extends JavaProgramTestBase {
-
-       public static String outputPath1;
-
-       public static String outputPath2;
-
-       public static String inputPath;
-
-       public static String userData = "alice|1|blue\n" +
-               "bob|2|red\n" +
-               "john|3|yellow\n" +
-               "walt|4|black\n";
-
-       @Override
-       protected void preSubmit() throws Exception {
-               inputPath = createTempFile("user", userData);
-               outputPath1 = getTempDirPath("avro_output1");
-               outputPath2 = getTempDirPath("avro_output2");
-       }
-
-
-       @Override
-       protected void testProgram() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-               DataSet<Tuple3<String, Integer, String>> input = 
env.readCsvFile(inputPath)
-                       .fieldDelimiter("|")
-                       .types(String.class, Integer.class, String.class);
-
-               //output the data with AvroOutputFormat for specific user type
-               DataSet<User> specificUser = input.map(new ConvertToUser());
-               specificUser.write(new AvroOutputFormat<User>(User.class), 
outputPath1);
-
-               //output the data with AvroOutputFormat for reflect user type
-               DataSet<ReflectiveUser> reflectiveUser = specificUser.map(new 
ConvertToReflective());
-               reflectiveUser.write(new 
AvroOutputFormat<ReflectiveUser>(ReflectiveUser.class), outputPath2);
-
-               env.execute();
-       }
-
-       @Override
-       protected void postSubmit() throws Exception {
-               //compare result for specific user type
-               File [] output1;
-               File file1 = asFile(outputPath1);
-               if (file1.isDirectory()) {
-                       output1 = file1.listFiles();
-                       // check for avro ext in dir.
-                       for (File avroOutput : output1) {
-                               Assert.assertTrue("Expect extension '.avro'", 
avroOutput.toString().endsWith(".avro"));
-                       }
-               } else {
-                       output1 = new File[] {file1};
-               }
-               List<String> result1 = new ArrayList<String>();
-               DatumReader<User> userDatumReader1 = new 
SpecificDatumReader<User>(User.class);
-               for (File avroOutput : output1) {
-
-                       DataFileReader<User> dataFileReader1 = new 
DataFileReader<User>(avroOutput, userDatumReader1);
-                       while (dataFileReader1.hasNext()) {
-                               User user = dataFileReader1.next();
-                               result1.add(user.getName() + "|" + 
user.getFavoriteNumber() + "|" + user.getFavoriteColor());
-                       }
-               }
-               for (String expectedResult : userData.split("\n")) {
-                       Assert.assertTrue("expected user " + expectedResult + " 
not found.", result1.contains(expectedResult));
-               }
-
-               //compare result for reflect user type
-               File [] output2;
-               File file2 = asFile(outputPath2);
-               if (file2.isDirectory()) {
-                       output2 = file2.listFiles();
-               } else {
-                       output2 = new File[] {file2};
-               }
-               List<String> result2 = new ArrayList<String>();
-               DatumReader<ReflectiveUser> userDatumReader2 = new 
ReflectDatumReader<ReflectiveUser>(ReflectiveUser.class);
-               for (File avroOutput : output2) {
-                       DataFileReader<ReflectiveUser> dataFileReader2 = new 
DataFileReader<ReflectiveUser>(avroOutput, userDatumReader2);
-                       while (dataFileReader2.hasNext()) {
-                               ReflectiveUser user = dataFileReader2.next();
-                               result2.add(user.getName() + "|" + 
user.getFavoriteNumber() + "|" + user.getFavoriteColor());
-                       }
-               }
-               for (String expectedResult : userData.split("\n")) {
-                       Assert.assertTrue("expected user " + expectedResult + " 
not found.", result2.contains(expectedResult));
-               }
-
-
-       }
-
-
-       public final static class ConvertToUser extends 
RichMapFunction<Tuple3<String, Integer, String>, User> {
-
-               @Override
-               public User map(Tuple3<String, Integer, String> value) throws 
Exception {
-                       return new User(value.f0, value.f1, value.f2);
-               }
-       }
-
-       public final static class ConvertToReflective extends 
RichMapFunction<User, ReflectiveUser> {
-
-               @Override
-               public ReflectiveUser map(User value) throws Exception {
-                       return new ReflectiveUser(value.getName().toString(), 
value.getFavoriteNumber(), value.getFavoriteColor().toString());
-               }
-       }
-
-       
-       public static class ReflectiveUser {
-               private String name;
-               private int favoriteNumber;
-               private String favoriteColor;
-
-               public ReflectiveUser() {}
-
-               public ReflectiveUser(String name, int favoriteNumber, String 
favoriteColor) {
-                       this.name = name;
-                       this.favoriteNumber = favoriteNumber;
-                       this.favoriteColor = favoriteColor;
-               }
-               
-               public String getName() {
-                       return this.name;
-               }
-               public String getFavoriteColor() {
-                       return this.favoriteColor;
-               }
-               public int getFavoriteNumber() {
-                       return this.favoriteNumber;
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java
 
b/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java
deleted file mode 100644
index c39db15..0000000
--- 
a/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java
+++ /dev/null
@@ -1,528 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.avro;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-
-import org.apache.avro.reflect.ReflectDatumReader;
-import org.apache.avro.reflect.ReflectDatumWriter;
-import org.apache.flink.api.io.avro.generated.Address;
-import org.apache.flink.api.io.avro.generated.Colors;
-import org.apache.flink.api.io.avro.generated.Fixed16;
-import org.apache.flink.api.io.avro.generated.User;
-import org.apache.flink.util.StringUtils;
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-/**
- * Tests the {@link DataOutputEncoder} and {@link DataInputDecoder} classes 
for Avro serialization.
- */
-public class EncoderDecoderTest {
-       @Test
-       public void testComplexStringsDirecty() {
-               try {
-                       Random rnd = new Random(349712539451944123L);
-                       
-                       for (int i = 0; i < 10; i++) {
-                               String testString = 
StringUtils.getRandomString(rnd, 10, 100);
-                               
-                               ByteArrayOutputStream baos = new 
ByteArrayOutputStream(512);
-                               {
-                                       DataOutputStream dataOut = new 
DataOutputStream(baos);
-                                       DataOutputEncoder encoder = new 
DataOutputEncoder();
-                                       encoder.setOut(dataOut);
-                                       
-                                       encoder.writeString(testString);
-                                       dataOut.flush();
-                                       dataOut.close();
-                               }
-                               
-                               byte[] data = baos.toByteArray();
-                               
-                               // deserialize
-                               {
-                                       ByteArrayInputStream bais = new 
ByteArrayInputStream(data);
-                                       DataInputStream dataIn = new 
DataInputStream(bais);
-                                       DataInputDecoder decoder = new 
DataInputDecoder();
-                                       decoder.setIn(dataIn);
-       
-                                       String deserialized = 
decoder.readString();
-                                       
-                                       assertEquals(testString, deserialized);
-                               }
-                       }
-               }
-               catch (Exception e) {
-                       System.err.println(e.getMessage());
-                       e.printStackTrace();
-                       fail("Test failed due to an exception: " + 
e.getMessage());
-               }
-       }
-       
-       @Test
-       public void testPrimitiveTypes() {
-               
-               testObjectSerialization(new Boolean(true));
-               testObjectSerialization(new Boolean(false));
-               
-               testObjectSerialization(Byte.valueOf((byte) 0));
-               testObjectSerialization(Byte.valueOf((byte) 1));
-               testObjectSerialization(Byte.valueOf((byte) -1));
-               testObjectSerialization(Byte.valueOf(Byte.MIN_VALUE));
-               testObjectSerialization(Byte.valueOf(Byte.MAX_VALUE));
-               
-               testObjectSerialization(Short.valueOf((short) 0));
-               testObjectSerialization(Short.valueOf((short) 1));
-               testObjectSerialization(Short.valueOf((short) -1));
-               testObjectSerialization(Short.valueOf(Short.MIN_VALUE));
-               testObjectSerialization(Short.valueOf(Short.MAX_VALUE));
-               
-               testObjectSerialization(Integer.valueOf(0));
-               testObjectSerialization(Integer.valueOf(1));
-               testObjectSerialization(Integer.valueOf(-1));
-               testObjectSerialization(Integer.valueOf(Integer.MIN_VALUE));
-               testObjectSerialization(Integer.valueOf(Integer.MAX_VALUE));
-               
-               testObjectSerialization(Long.valueOf(0));
-               testObjectSerialization(Long.valueOf(1));
-               testObjectSerialization(Long.valueOf(-1));
-               testObjectSerialization(Long.valueOf(Long.MIN_VALUE));
-               testObjectSerialization(Long.valueOf(Long.MAX_VALUE));
-               
-               testObjectSerialization(Float.valueOf(0));
-               testObjectSerialization(Float.valueOf(1));
-               testObjectSerialization(Float.valueOf(-1));
-               testObjectSerialization(Float.valueOf((float)Math.E));
-               testObjectSerialization(Float.valueOf((float)Math.PI));
-               testObjectSerialization(Float.valueOf(Float.MIN_VALUE));
-               testObjectSerialization(Float.valueOf(Float.MAX_VALUE));
-               testObjectSerialization(Float.valueOf(Float.MIN_NORMAL));
-               testObjectSerialization(Float.valueOf(Float.NaN));
-               testObjectSerialization(Float.valueOf(Float.NEGATIVE_INFINITY));
-               testObjectSerialization(Float.valueOf(Float.POSITIVE_INFINITY));
-               
-               testObjectSerialization(Double.valueOf(0));
-               testObjectSerialization(Double.valueOf(1));
-               testObjectSerialization(Double.valueOf(-1));
-               testObjectSerialization(Double.valueOf(Math.E));
-               testObjectSerialization(Double.valueOf(Math.PI));
-               testObjectSerialization(Double.valueOf(Double.MIN_VALUE));
-               testObjectSerialization(Double.valueOf(Double.MAX_VALUE));
-               testObjectSerialization(Double.valueOf(Double.MIN_NORMAL));
-               testObjectSerialization(Double.valueOf(Double.NaN));
-               
testObjectSerialization(Double.valueOf(Double.NEGATIVE_INFINITY));
-               
testObjectSerialization(Double.valueOf(Double.POSITIVE_INFINITY));
-               
-               testObjectSerialization("");
-               testObjectSerialization("abcdefg");
-               testObjectSerialization("ab\u1535\u0155xyz\u706F");
-               
-               testObjectSerialization(new SimpleTypes(3637, 54876486548L, 
(byte) 65, "We're out looking for astronauts", (short) 0x2387, 2.65767523));
-               testObjectSerialization(new SimpleTypes(705608724, -1L, (byte) 
-65, "Serve me the sky with a big slice of lemon", (short) Byte.MIN_VALUE, 
0.0000001));
-       }
-       
-       @Test
-       public void testArrayTypes() {
-               {
-                       int[] array = new int[] {1, 2, 3, 4, 5};
-                       testObjectSerialization(array);
-               }
-               {
-                       long[] array = new long[] {1, 2, 3, 4, 5};
-                       testObjectSerialization(array);
-               }
-               {
-                       float[] array = new float[] {1, 2, 3, 4, 5};
-                       testObjectSerialization(array);
-               }
-               {
-                       double[] array = new double[] {1, 2, 3, 4, 5};
-                       testObjectSerialization(array);
-               }
-               {
-                       String[] array = new String[] {"Oh", "my", "what", 
"do", "we", "have", "here", "?"};
-                       testObjectSerialization(array);
-               }
-       }
-       
-       @Test
-       public void testEmptyArray() {
-               {
-                       int[] array = new int[0];
-                       testObjectSerialization(array);
-               }
-               {
-                       long[] array = new long[0];
-                       testObjectSerialization(array);
-               }
-               {
-                       float[] array = new float[0];
-                       testObjectSerialization(array);
-               }
-               {
-                       double[] array = new double[0];
-                       testObjectSerialization(array);
-               }
-               {
-                       String[] array = new String[0];
-                       testObjectSerialization(array);
-               }
-       }
-       
-       @Test
-       public void testObjects() {
-               // simple object containing only primitives
-               {
-                       testObjectSerialization(new Book(976243875L, "The 
Serialization Odysse", 42));
-               }
-               
-               // object with collection
-               {
-                       ArrayList<String> list = new ArrayList<String>();
-                       list.add("A");
-                       list.add("B");
-                       list.add("C");
-                       list.add("D");
-                       list.add("E");
-                       
-                       testObjectSerialization(new BookAuthor(976243875L, 
list, "Arno Nym"));
-               }
-               
-               // object with empty collection
-               {
-                       ArrayList<String> list = new ArrayList<String>();
-                       testObjectSerialization(new BookAuthor(987654321L, 
list, "The Saurus"));
-               }
-       }
-       
-       @Test
-       public void testNestedObjectsWithCollections() {
-               testObjectSerialization(new ComplexNestedObject2(true));
-       }
-       
-       @Test
-       public void testGeneratedObjectWithNullableFields() {
-               List<CharSequence> strings = Arrays.asList(new CharSequence[] { 
"These", "strings", "should", "be", "recognizable", "as", "a", "meaningful", 
"sequence" });
-               List<Boolean> bools = Arrays.asList(true, true, false, false, 
true, false, true, true);
-               Map<CharSequence, Long> map = new HashMap<CharSequence, Long>();
-               map.put("1", 1L);
-               map.put("2", 2L);
-               map.put("3", 3L);
-
-               byte[] b = new byte[16];
-               new Random().nextBytes(b);
-               Fixed16 f = new Fixed16(b);
-               Address addr = new Address(new Integer(239), "6th Main", 
"Bangalore",
-                               "Karnataka", "560075");
-               User user = new User("Freudenreich", 1337, "macintosh gray",
-                               1234567890L, 3.1415926, null, true, strings, 
bools, null,
-                               Colors.GREEN, map, f, new Boolean(true), addr);
-               
-               testObjectSerialization(user);
-       }
-       
-       @Test
-       public void testVarLenCountEncoding() {
-               try {
-                       long[] values = new long[] { 0, 1, 2, 3, 4, 0, 574, 
45236, 0, 234623462, 23462462346L, 0, 9734028767869761L, 0x7fffffffffffffffL};
-                       
-                       // write
-                       ByteArrayOutputStream baos = new 
ByteArrayOutputStream(512);
-                       {
-                               DataOutputStream dataOut = new 
DataOutputStream(baos);
-                               
-                               for (long val : values) {
-                                       
DataOutputEncoder.writeVarLongCount(dataOut, val);
-                               }
-                               
-                               dataOut.flush();
-                               dataOut.close();
-                       }
-                       
-                       // read
-                       {
-                               ByteArrayInputStream bais = new 
ByteArrayInputStream(baos.toByteArray());
-                               DataInputStream dataIn = new 
DataInputStream(bais);
-                               
-                               for (long val : values) {
-                                       long read = 
DataInputDecoder.readVarLongCount(dataIn);
-                                       assertEquals("Wrong var-len encoded 
value read.", val, read);
-                               }
-                       }
-               }
-               catch (Exception e) {
-                       System.err.println(e.getMessage());
-                       e.printStackTrace();
-                       fail("Test failed due to an exception: " + 
e.getMessage());
-               }
-       }
-       
-       private static <X> void testObjectSerialization(X obj) {
-               
-               try {
-                       
-                       // serialize
-                       ByteArrayOutputStream baos = new 
ByteArrayOutputStream(512);
-                       {
-                               DataOutputStream dataOut = new 
DataOutputStream(baos);
-                               DataOutputEncoder encoder = new 
DataOutputEncoder();
-                               encoder.setOut(dataOut);
-                               
-                               @SuppressWarnings("unchecked")
-                               Class<X> clazz = (Class<X>) obj.getClass();
-                               ReflectDatumWriter<X> writer = new 
ReflectDatumWriter<X>(clazz);
-                               
-                               writer.write(obj, encoder);
-                               dataOut.flush();
-                               dataOut.close();
-                       }
-                       
-                       byte[] data = baos.toByteArray();
-                       X result = null;
-                       
-                       // deserialize
-                       {
-                               ByteArrayInputStream bais = new 
ByteArrayInputStream(data);
-                               DataInputStream dataIn = new 
DataInputStream(bais);
-                               DataInputDecoder decoder = new 
DataInputDecoder();
-                               decoder.setIn(dataIn);
-
-                               @SuppressWarnings("unchecked")
-                               Class<X> clazz = (Class<X>) obj.getClass();
-                               ReflectDatumReader<X> reader = new 
ReflectDatumReader<X>(clazz);
-                               
-                               // create a reuse object if possible, otherwise 
we have no reuse object 
-                               X reuse = null;
-                               try {
-                                       @SuppressWarnings("unchecked")
-                                       X test = (X) 
obj.getClass().newInstance();
-                                       reuse = test;
-                               } catch (Throwable t) {}
-                               
-                               result = reader.read(reuse, decoder);
-                       }
-                       
-                       // check
-                       final String message = "Deserialized object is not the 
same as the original";
-                       
-                       if (obj.getClass().isArray()) {
-                               Class<?> clazz = obj.getClass();
-                               if (clazz == byte[].class) {
-                                       assertArrayEquals(message, (byte[]) 
obj, (byte[]) result);
-                               }
-                               else if (clazz == short[].class) {
-                                       assertArrayEquals(message, (short[]) 
obj, (short[]) result);
-                               }
-                               else if (clazz == int[].class) {
-                                       assertArrayEquals(message, (int[]) obj, 
(int[]) result);
-                               }
-                               else if (clazz == long[].class) {
-                                       assertArrayEquals(message, (long[]) 
obj, (long[]) result);
-                               }
-                               else if (clazz == char[].class) {
-                                       assertArrayEquals(message, (char[]) 
obj, (char[]) result);
-                               }
-                               else if (clazz == float[].class) {
-                                       assertArrayEquals(message, (float[]) 
obj, (float[]) result, 0.0f);
-                               }
-                               else if (clazz == double[].class) {
-                                       assertArrayEquals(message, (double[]) 
obj, (double[]) result, 0.0);
-                               } else {
-                                       assertArrayEquals(message, (Object[]) 
obj, (Object[]) result);
-                               }
-                       } else {
-                               assertEquals(message, obj, result);
-                       }
-               }
-               catch (Exception e) {
-                       System.err.println(e.getMessage());
-                       e.printStackTrace();
-                       fail("Test failed due to an exception: " + 
e.getMessage());
-               }
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       //  Test Objects
-       // 
--------------------------------------------------------------------------------------------
-
-
-       public static final class SimpleTypes {
-               
-               private final int iVal;
-               private final long lVal;
-               private final byte bVal;
-               private final String sVal;
-               private final short rVal;
-               private final double dVal;
-               
-               
-               public SimpleTypes() {
-                       this(0, 0, (byte) 0, "", (short) 0, 0);
-               }
-               
-               public SimpleTypes(int iVal, long lVal, byte bVal, String sVal, 
short rVal, double dVal) {
-                       this.iVal = iVal;
-                       this.lVal = lVal;
-                       this.bVal = bVal;
-                       this.sVal = sVal;
-                       this.rVal = rVal;
-                       this.dVal = dVal;
-               }
-               
-               @Override
-               public boolean equals(Object obj) {
-                       if (obj.getClass() == SimpleTypes.class) {
-                               SimpleTypes other = (SimpleTypes) obj;
-                               
-                               return other.iVal == this.iVal &&
-                                               other.lVal == this.lVal &&
-                                               other.bVal == this.bVal &&
-                                               other.sVal.equals(this.sVal) &&
-                                               other.rVal == this.rVal &&
-                                               other.dVal == this.dVal;
-                               
-                       } else {
-                               return false;
-                       }
-               }
-       }
-       
-       public static class ComplexNestedObject1 {
-               
-               private double doubleValue;
-               
-               private List<String> stringList;
-               
-               public ComplexNestedObject1() {}
-               
-               public ComplexNestedObject1(int offInit) {
-                       this.doubleValue = 6293485.6723 + offInit;
-                               
-                       this.stringList = new ArrayList<String>();
-                       this.stringList.add("A" + offInit);
-                       this.stringList.add("somewhat" + offInit);
-                       this.stringList.add("random" + offInit);
-                       this.stringList.add("collection" + offInit);
-                       this.stringList.add("of" + offInit);
-                       this.stringList.add("strings" + offInit);
-               }
-               
-               @Override
-               public boolean equals(Object obj) {
-                       if (obj.getClass() == ComplexNestedObject1.class) {
-                               ComplexNestedObject1 other = 
(ComplexNestedObject1) obj;
-                               return other.doubleValue == this.doubleValue && 
this.stringList.equals(other.stringList);
-                       } else {
-                               return false;
-                       }
-               }
-       }
-       
-       public static class ComplexNestedObject2 {
-               
-               private long longValue;
-               
-               private Map<String, ComplexNestedObject1> theMap;
-               
-               public ComplexNestedObject2() {}
-               
-               public ComplexNestedObject2(boolean init) {
-                       this.longValue = 46547;
-                               
-                       this.theMap = new HashMap<String, 
ComplexNestedObject1>();
-                       this.theMap.put("36354L", new 
ComplexNestedObject1(43546543));
-                       this.theMap.put("785611L", new 
ComplexNestedObject1(45784568));
-                       this.theMap.put("43L", new 
ComplexNestedObject1(9876543));
-                       this.theMap.put("-45687L", new 
ComplexNestedObject1(7897615));
-                       this.theMap.put("1919876876896L", new 
ComplexNestedObject1(27154));
-                       this.theMap.put("-868468468L", new 
ComplexNestedObject1(546435));
-               }
-               
-               @Override
-               public boolean equals(Object obj) {
-                       if (obj.getClass() == ComplexNestedObject2.class) {
-                               ComplexNestedObject2 other = 
(ComplexNestedObject2) obj;
-                               return other.longValue == this.longValue && 
this.theMap.equals(other.theMap);
-                       } else {
-                               return false;
-                       }
-               }
-       }
-       
-       public static class Book {
-
-               private long bookId;
-               private String title;
-               private long authorId;
-
-               public Book() {}
-
-               public Book(long bookId, String title, long authorId) {
-                       this.bookId = bookId;
-                       this.title = title;
-                       this.authorId = authorId;
-               }
-               
-               @Override
-               public boolean equals(Object obj) {
-                       if (obj.getClass() == Book.class) {
-                               Book other = (Book) obj;
-                               return other.bookId == this.bookId && 
other.authorId == this.authorId && this.title.equals(other.title);
-                       } else {
-                               return false;
-                       }
-               }
-       }
-
-       public static class BookAuthor {
-
-               private long authorId;
-               private List<String> bookTitles;
-               private String authorName;
-
-               public BookAuthor() {}
-
-               public BookAuthor(long authorId, List<String> bookTitles, 
String authorName) {
-                       this.authorId = authorId;
-                       this.bookTitles = bookTitles;
-                       this.authorName = authorName;
-               }
-               
-               @Override
-               public boolean equals(Object obj) {
-                       if (obj.getClass() == BookAuthor.class) {
-                               BookAuthor other = (BookAuthor) obj;
-                               return other.authorName.equals(this.authorName) 
&& other.authorId == this.authorId &&
-                                               
other.bookTitles.equals(this.bookTitles);
-                       } else {
-                               return false;
-                       }
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java
 
b/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java
deleted file mode 100644
index 1174786..0000000
--- 
a/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java
+++ /dev/null
@@ -1,219 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.avro.testjar;
-
-// 
================================================================================================
-//  This file defines the classes for the AvroExternalJarProgramITCase.
-//  The program is exported into src/test/resources/AvroTestProgram.jar.
-//
-//  THIS FILE MUST STAY FULLY COMMENTED SUCH THAT THE HERE DEFINED CLASSES ARE 
NOT COMPILED
-//  AND ADDED TO THE test-classes DIRECTORY. OTHERWISE, THE EXTERNAL CLASS 
LOADING WILL
-//  NOT BE COVERED BY THIS TEST.
-// 
================================================================================================
-
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.io.DatumWriter;
-import org.apache.avro.reflect.ReflectData;
-import org.apache.avro.reflect.ReflectDatumWriter;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.common.functions.RichReduceFunction;
-import org.apache.flink.api.java.io.DiscardingOutputFormat;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.AvroInputFormat;
-import org.apache.flink.core.fs.Path;
-
-public class AvroExternalJarProgram  {
-
-       public static final class Color {
-               
-               private String name;
-               private double saturation;
-               
-               public Color() {
-                       name = "";
-                       saturation = 1.0;
-               }
-               
-               public Color(String name, double saturation) {
-                       this.name = name;
-                       this.saturation = saturation;
-               }
-               
-               public String getName() {
-                       return name;
-               }
-               
-               public void setName(String name) {
-                       this.name = name;
-               }
-               
-               public double getSaturation() {
-                       return saturation;
-               }
-               
-               public void setSaturation(double saturation) {
-                       this.saturation = saturation;
-               }
-               
-               @Override
-               public String toString() {
-                       return name + '(' + saturation + ')';
-               }
-       }
-       
-       public static final class MyUser {
-               
-               private String name;
-               private List<Color> colors;
-               
-               public MyUser() {
-                       name = "unknown";
-                       colors = new ArrayList<Color>();
-               }
-               
-               public MyUser(String name, List<Color> colors) {
-                       this.name = name;
-                       this.colors = colors;
-               }
-               
-               public String getName() {
-                       return name;
-               }
-               
-               public List<Color> getColors() {
-                       return colors;
-               }
-               
-               public void setName(String name) {
-                       this.name = name;
-               }
-               
-               public void setColors(List<Color> colors) {
-                       this.colors = colors;
-               }
-               
-               @Override
-               public String toString() {
-                       return name + " : " + colors;
-               }
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       
-       // 
--------------------------------------------------------------------------------------------
-       
-       public static final class NameExtractor extends RichMapFunction<MyUser, 
Tuple2<String, MyUser>> {
-               private static final long serialVersionUID = 1L;
-
-               @Override
-               public Tuple2<String, MyUser> map(MyUser u) {
-                       String namePrefix = u.getName().substring(0, 1);
-                       return new Tuple2<String, MyUser>(namePrefix, u);
-               }
-       }
-       
-       public static final class NameGrouper extends 
RichReduceFunction<Tuple2<String, MyUser>> {
-               private static final long serialVersionUID = 1L;
-
-               @Override
-               public Tuple2<String, MyUser> reduce(Tuple2<String, MyUser> 
val1, Tuple2<String, MyUser> val2) {
-                       return val1;
-               }
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-       //  Test Data
-       // 
--------------------------------------------------------------------------------------------
-       
-       public static final class Generator {
-               
-               private final Random rnd = new Random(2389756789345689276L);
-               
-               public MyUser nextUser() {
-                       return randomUser();
-               }
-               
-               private MyUser randomUser() {
-                       
-                       int numColors = rnd.nextInt(5);
-                       ArrayList<Color> colors = new 
ArrayList<Color>(numColors);
-                       for (int i = 0; i < numColors; i++) {
-                               colors.add(new Color(randomString(), 
rnd.nextDouble()));
-                       }
-                       
-                       return new MyUser(randomString(), colors);
-               }
-               
-               private String randomString() {
-                       char[] c = new char[this.rnd.nextInt(20) + 5];
-                       
-                       for (int i = 0; i < c.length; i++) {
-                               c[i] = (char) (this.rnd.nextInt(150) + 40);
-                       }
-                       
-                       return new String(c);
-               }
-       }
-       
-       public static void writeTestData(File testFile, int numRecords) throws 
IOException {
-               
-               DatumWriter<MyUser> userDatumWriter = new 
ReflectDatumWriter<MyUser>(MyUser.class);
-               DataFileWriter<MyUser> dataFileWriter = new 
DataFileWriter<MyUser>(userDatumWriter);
-               
-               
dataFileWriter.create(ReflectData.get().getSchema(MyUser.class), testFile);
-               
-               
-               Generator generator = new Generator();
-               
-               for (int i = 0; i < numRecords; i++) {
-                       MyUser user = generator.nextUser();
-                       dataFileWriter.append(user);
-               }
-               
-               dataFileWriter.close();
-       }
-
-//     public static void main(String[] args) throws Exception {
-//             String testDataFile = new 
File("src/test/resources/testdata.avro").getAbsolutePath();
-//             writeTestData(new File(testDataFile), 50);
-//     }
-       
-       public static void main(String[] args) throws Exception {
-               String inputPath = args[0];
-               
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               
-               DataSet<MyUser> input = env.createInput(new 
AvroInputFormat<MyUser>(new Path(inputPath), MyUser.class));
-       
-               DataSet<Tuple2<String, MyUser>> result = input.map(new 
NameExtractor()).groupBy(0).reduce(new NameGrouper());
-               
-               result.output(new 
DiscardingOutputFormat<Tuple2<String,MyUser>>());
-               env.execute();
-       }
-}

Reply via email to