http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseFlinkTestConstants.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseFlinkTestConstants.java b/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseFlinkTestConstants.java
deleted file mode 100644
index 8579dee..0000000
--- a/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseFlinkTestConstants.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.addons.hbase.example;
-
-public class HBaseFlinkTestConstants {
-       
-       public static final byte[] CF_SOME = "someCf".getBytes();
-       public static final byte[] Q_SOME = "someQual".getBytes();
-       public static final String TEST_TABLE_NAME = "test-table";
-       public static final String TMP_DIR = "/tmp/test";
-       
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java b/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java
deleted file mode 100644
index dccf876..0000000
--- a/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.addons.hbase.example;
-
-import org.apache.flink.addons.hbase.TableInputFormat;
-import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.util.Bytes;
-
-/**
- * Simple stub for HBase DataSet read
- * 
- * To run the test first create the test table with hbase shell.
- * 
- * Use the following commands:
- * <ul>
- *     <li>create 'test-table', 'someCf'</li>
- *     <li>put 'test-table', '1', 'someCf:someQual', 'someString'</li>
- *     <li>put 'test-table', '2', 'someCf:someQual', 'anotherString'</li>
- * </ul>
- * 
- * The test should return just the first entry.
- * 
- */
-public class HBaseReadExample {
-       public static void main(String[] args) throws Exception {
-               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-               @SuppressWarnings("serial")
-               DataSet<Tuple2<String, String>> hbaseDs = env.createInput(new TableInputFormat<Tuple2<String, String>>() {
-                       
-                               @Override
-                               public String getTableName() {
-                                       return HBaseFlinkTestConstants.TEST_TABLE_NAME;
-                               }
-
-                               @Override
-                               protected Scan getScanner() {
-                                       Scan scan = new Scan();
-                                       scan.addColumn(HBaseFlinkTestConstants.CF_SOME, HBaseFlinkTestConstants.Q_SOME);
-                                       return scan;
-                               }
-
-                               private Tuple2<String, String> reuse = new Tuple2<String, String>();
-                               
-                               @Override
-                               protected Tuple2<String, String> mapResultToTuple(Result r) {
-                                       String key = Bytes.toString(r.getRow());
-                                       String val = Bytes.toString(r.getValue(HBaseFlinkTestConstants.CF_SOME, HBaseFlinkTestConstants.Q_SOME));
-                                       reuse.setField(key, 0);
-                                       reuse.setField(val, 1);
-                                       return reuse;
-                               }
-               })
-               .filter(new FilterFunction<Tuple2<String,String>>() {
-
-                       @Override
-                       public boolean filter(Tuple2<String, String> t) throws Exception {
-                               String val = t.getField(1);
-                               if(val.startsWith("someStr"))
-                                       return true;
-                               return false;
-                       }
-               });
-               
-               hbaseDs.print();
-               
-               // kick off execution.
-               env.execute();
-                               
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteExample.java b/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteExample.java
deleted file mode 100644
index 483bdff..0000000
--- a/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteExample.java
+++ /dev/null
@@ -1,202 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.addons.hbase.example;
-
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.util.Collector;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-
-/**
- * Simple stub for HBase DataSet write
- * 
- * To run the test first create the test table with hbase shell.
- * 
- * Use the following commands:
- * <ul>
- *     <li>create 'test-table', 'someCf'</li>
- * </ul>
- * 
- */
-@SuppressWarnings("serial")
-public class HBaseWriteExample {
-       
-       // *************************************************************************
-       //     PROGRAM
-       // *************************************************************************
-       
-       public static void main(String[] args) throws Exception {
-
-               if(!parseParameters(args)) {
-                       return;
-               }
-               
-               // set up the execution environment
-               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-               
-               // get input data
-               DataSet<String> text = getTextDataSet(env);
-               
-               DataSet<Tuple2<String, Integer>> counts = 
-                               // split up the lines in pairs (2-tuples) containing: (word,1)
-                               text.flatMap(new Tokenizer())
-                               // group by the tuple field "0" and sum up tuple field "1"
-                               .groupBy(0)
-                               .sum(1);
-
-               // emit result
-               Job job = Job.getInstance();
-               job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, outputTableName);
-               // TODO is "mapred.output.dir" really useful?
-               job.getConfiguration().set("mapred.output.dir",HBaseFlinkTestConstants.TMP_DIR);
-               counts.map(new RichMapFunction <Tuple2<String,Integer>, Tuple2<Text,Mutation>>() {
-                       private transient Tuple2<Text, Mutation> reuse;
-
-                       @Override
-                       public void open(Configuration parameters) throws Exception {
-                               super.open(parameters);
-                               reuse = new Tuple2<Text, Mutation>();
-                       }
-
-                       @Override
-                       public Tuple2<Text, Mutation> map(Tuple2<String, Integer> t) throws Exception {
-                               reuse.f0 = new Text(t.f0);
-                               Put put = new Put(t.f0.getBytes());
-                               put.add(HBaseFlinkTestConstants.CF_SOME,HBaseFlinkTestConstants.Q_SOME, Bytes.toBytes(t.f1));
-                               reuse.f1 = put;
-                               return reuse;
-                       }
-               }).output(new HadoopOutputFormat<Text, Mutation>(new TableOutputFormat<Text>(), job));
-               
-               // execute program
-               env.execute("WordCount (HBase sink) Example");
-       }
-       
-       // *************************************************************************
-       //     USER FUNCTIONS
-       // *************************************************************************
-       
-       /**
-        * Implements the string tokenizer that splits sentences into words as a user-defined
-        * FlatMapFunction. The function takes a line (String) and splits it into
-        * multiple pairs in the form of "(word,1)" (Tuple2<String, Integer>).
-        */
-       public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
-
-               @Override
-               public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
-                       // normalize and split the line
-                       String[] tokens = value.toLowerCase().split("\\W+");
-                       
-                       // emit the pairs
-                       for (String token : tokens) {
-                               if (token.length() > 0) {
-                                       out.collect(new Tuple2<String, 
Integer>(token, 1));
-                               }
-                       }
-               }
-       }
-       
-       // *************************************************************************
-       //     UTIL METHODS
-       // *************************************************************************
-       private static boolean fileOutput = false;
-       private static String textPath;
-       private static String outputTableName = HBaseFlinkTestConstants.TEST_TABLE_NAME;
-       
-       private static boolean parseParameters(String[] args) {
-               
-               if(args.length > 0) {
-                       // parse input arguments
-                       fileOutput = true;
-                       if(args.length == 2) {
-                               textPath = args[0];
-                               outputTableName = args[1];
-                       } else {
-                               System.err.println("Usage: HBaseWriteExample 
<text path> <output table>");
-                               return false;
-                       }
-               } else {
-                       System.out.println("Executing HBaseWriteExample example 
with built-in default data.");
-                       System.out.println("  Provide parameters to read input 
data from a file.");
-                       System.out.println("  Usage: HBaseWriteExample <text 
path> <output table>");
-               }
-               return true;
-       }
-       
-       private static DataSet<String> getTextDataSet(ExecutionEnvironment env) {
-               if(fileOutput) {
-                       // read the text file from given input path
-                       return env.readTextFile(textPath);
-               } else {
-                       // get default test text data
-                       return getDefaultTextLineDataSet(env);
-               }
-       }
-       private static DataSet<String> getDefaultTextLineDataSet(ExecutionEnvironment env) {
-               return env.fromElements(WORDS);
-       }
-       private static final String[] WORDS = new String[] {
-               "To be, or not to be,--that is the question:--",
-               "Whether 'tis nobler in the mind to suffer",
-               "The slings and arrows of outrageous fortune",
-               "Or to take arms against a sea of troubles,",
-               "And by opposing end them?--To die,--to sleep,--",
-               "No more; and by a sleep to say we end",
-               "The heartache, and the thousand natural shocks",
-               "That flesh is heir to,--'tis a consummation",
-               "Devoutly to be wish'd. To die,--to sleep;--",
-               "To sleep! perchance to dream:--ay, there's the rub;",
-               "For in that sleep of death what dreams may come,",
-               "When we have shuffled off this mortal coil,",
-               "Must give us pause: there's the respect",
-               "That makes calamity of so long life;",
-               "For who would bear the whips and scorns of time,",
-               "The oppressor's wrong, the proud man's contumely,",
-               "The pangs of despis'd love, the law's delay,",
-               "The insolence of office, and the spurns",
-               "That patient merit of the unworthy takes,",
-               "When he himself might his quietus make",
-               "With a bare bodkin? who would these fardels bear,",
-               "To grunt and sweat under a weary life,",
-               "But that the dread of something after death,--",
-               "The undiscover'd country, from whose bourn",
-               "No traveller returns,--puzzles the will,",
-               "And makes us rather bear those ills we have",
-               "Than fly to others that we know not of?",
-               "Thus conscience does make cowards of us all;",
-               "And thus the native hue of resolution",
-               "Is sicklied o'er with the pale cast of thought;",
-               "And enterprises of great pith and moment,",
-               "With this regard, their currents turn awry,",
-               "And lose the name of action.--Soft you now!",
-               "The fair Ophelia!--Nymph, in thy orisons",
-               "Be all my sins remember'd."
-       };
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java b/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java
deleted file mode 100644
index a6be1a6..0000000
--- a/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.addons.hbase.example;
-
-import java.io.IOException;
-
-import org.apache.flink.api.common.io.OutputFormat;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.util.Bytes;
-
-/**
- * 
- * This is an example how to write streams into HBase. In this example the
- * stream will be written into a local Hbase but it is possible to adapt this
- * example for an HBase running in a cloud. You need a running local HBase with a
- * table "flinkExample" and a column "entry". If your HBase configuration does
- * not fit the hbase-site.xml in the resource folder then you gave to delete temporary this
- * hbase-site.xml to execute the example properly.
- * 
- */
-public class HBaseWriteStreamExample {
-
-       public static void main(String[] args) {
-               final StreamExecutionEnvironment env = StreamExecutionEnvironment
-                               .getExecutionEnvironment();
-
-               // data stream with random numbers
-               DataStream<String> dataStream = env.addSource(new SourceFunction<String>() {
-                       private static final long serialVersionUID = 1L;
-
-                       private volatile boolean isRunning = true;
-
-                       @Override
-                       public void run(SourceContext<String> out) throws Exception {
-                               while (isRunning) {
-                                       out.collect(String.valueOf(Math.floor(Math.random() * 100)));
-                               }
-
-                       }
-
-                       @Override
-                       public void cancel() {
-                               isRunning = false;
-                       }
-               });
-               dataStream.write(new HBaseOutputFormat(), 0L);
-
-               try {
-                       env.execute();
-               } catch (Exception e) {
-                       // TODO Auto-generated catch block
-                       e.printStackTrace();
-               }
-       }
-
-       /**
-        * 
-        * This class implements an OutputFormat for HBase
-        *
-        */
-       private static class HBaseOutputFormat implements OutputFormat<String> {
-
-               private org.apache.hadoop.conf.Configuration conf = null;
-               private HTable table = null;
-               private String taskNumber = null;
-               private int rowNumber = 0;
-
-               private static final long serialVersionUID = 1L;
-
-               @Override
-               public void configure(Configuration parameters) {
-                       conf = HBaseConfiguration.create();
-               }
-
-               @Override
-               public void open(int taskNumber, int numTasks) throws IOException {
-                       table = new HTable(conf, "flinkExample");
-                       this.taskNumber = String.valueOf(taskNumber);
-               }
-
-               @Override
-               public void writeRecord(String record) throws IOException {
-                       Put put = new Put(Bytes.toBytes(taskNumber + rowNumber));
-                       put.add(Bytes.toBytes("entry"), Bytes.toBytes("entry"),
-                                       Bytes.toBytes(rowNumber));
-                       rowNumber++;
-                       table.put(put);
-               }
-
-               @Override
-               public void close() throws IOException {
-                       table.flushCommits();
-                       table.close();
-               }
-
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-hbase/src/test/resources/hbase-site.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hbase/src/test/resources/hbase-site.xml b/flink-staging/flink-hbase/src/test/resources/hbase-site.xml
deleted file mode 100644
index 2984063..0000000
--- a/flink-staging/flink-hbase/src/test/resources/hbase-site.xml
+++ /dev/null
@@ -1,43 +0,0 @@
-<?xml version="1.0"?>
-<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
-<!--
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
--->
-<configuration>
-
-  <property>
-    <name>hbase.tmp.dir</name>
-    <!-- 
-    <value>/media/Dati/hbase-0.98-data</value>
-    --> 
-    <value>/opt/hbase-0.98.6.1-hadoop2/data</value>
-
-  </property>
-  <property>
-    <name>hbase.zookeeper.quorum</name>
-    <value>localhost</value>
-  </property>
-    <!-- 
-  <property>
-    <name>hadoop.security.group.mapping</name>
-    <value>org.apache.hadoop.security.ShellBasedUnixGroupsMapping</value>
-  </property>
-  -->
-</configuration>

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-hbase/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hbase/src/test/resources/log4j.properties b/flink-staging/flink-hbase/src/test/resources/log4j.properties
deleted file mode 100644
index d6eb2b2..0000000
--- a/flink-staging/flink-hbase/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,23 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-# 
-# http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-log4j.rootLogger=${hadoop.root.logger}
-hadoop.root.logger=INFO,console
-log4j.appender.console=org.apache.log4j.ConsoleAppender
-log4j.appender.console.target=System.err
-log4j.appender.console.layout=org.apache.log4j.PatternLayout
-log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-hcatalog/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hcatalog/pom.xml b/flink-staging/flink-hcatalog/pom.xml
deleted file mode 100644
index 00ce577..0000000
--- a/flink-staging/flink-hcatalog/pom.xml
+++ /dev/null
@@ -1,179 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
-       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd";>
-       
-       <modelVersion>4.0.0</modelVersion>
-       
-       <parent>
-               <groupId>org.apache.flink</groupId>
-               <artifactId>flink-staging</artifactId>
-               <version>1.0-SNAPSHOT</version>
-               <relativePath>..</relativePath>
-       </parent>
-
-       <artifactId>flink-hcatalog</artifactId>
-       <name>flink-hcatalog</name>
-
-       <packaging>jar</packaging>
-
-       <dependencies>
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-java</artifactId>
-                       <version>${project.version}</version>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.hive.hcatalog</groupId>
-                       <artifactId>hcatalog-core</artifactId>
-                       <version>0.12.0</version>
-               </dependency>
-
-       </dependencies>
-
-       <build>
-               <plugins>
-                       <!-- Scala Compiler -->
-                       <plugin>
-                               <groupId>net.alchim31.maven</groupId>
-                               <artifactId>scala-maven-plugin</artifactId>
-                               <version>3.1.4</version>
-                               <executions>
-                                       <!-- Run scala compiler in the process-resources phase, so that dependencies on
-                                               scala classes can be resolved later in the (Java) compile phase -->
-                                       <execution>
-                                               <id>scala-compile-first</id>
-                                               <phase>process-resources</phase>
-                                               <goals>
-                                                       <goal>compile</goal>
-                                               </goals>
-                                       </execution>
-
-                                       <!-- Run scala compiler in the process-test-resources phase, so that dependencies on
-                                                scala classes can be resolved later in the (Java) test-compile phase -->
-                                       <execution>
-                                               <id>scala-test-compile</id>
-                                               <phase>process-test-resources</phase>
-                                               <goals>
-                                                       <goal>testCompile</goal>
-                                               </goals>
-                                       </execution>
-                               </executions>
-                               <configuration>
-                                       <jvmArgs>
-                                               <jvmArg>-Xms128m</jvmArg>
-                                               <jvmArg>-Xmx512m</jvmArg>
-                                       </jvmArgs>
-                               </configuration>
-                       </plugin>
-
-                       <!-- Eclipse Integration -->
-                       <plugin>
-                               <groupId>org.apache.maven.plugins</groupId>
-                               <artifactId>maven-eclipse-plugin</artifactId>
-                               <version>2.8</version>
-                               <configuration>
-                                       <downloadSources>true</downloadSources>
-                                       <projectnatures>
-                                               <projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
-                                               <projectnature>org.eclipse.jdt.core.javanature</projectnature>
-                                       </projectnatures>
-                                       <buildcommands>
-                                               <buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
-                                       </buildcommands>
-                                       <classpathContainers>
-                                               <classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
-                                               <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
-                                       </classpathContainers>
-                                       <excludes>
-                                               <exclude>org.scala-lang:scala-library</exclude>
-                                               <exclude>org.scala-lang:scala-compiler</exclude>
-                                       </excludes>
-                                       <sourceIncludes>
-                                               <sourceInclude>**/*.scala</sourceInclude>
-                                               <sourceInclude>**/*.java</sourceInclude>
-                                       </sourceIncludes>
-                               </configuration>
-                       </plugin>
-
-                       <!-- Adding scala source directories to build path -->
-                       <plugin>
-                               <groupId>org.codehaus.mojo</groupId>
-                               <artifactId>build-helper-maven-plugin</artifactId>
-                               <version>1.7</version>
-                               <executions>
-                                       <!-- Add src/main/scala to eclipse build path -->
-                                       <execution>
-                                               <id>add-source</id>
-                                               <phase>generate-sources</phase>
-                                               <goals>
-                                                       <goal>add-source</goal>
-                                               </goals>
-                                               <configuration>
-                                                       <sources>
-                                                               <source>src/main/scala</source>
-                                                       </sources>
-                                               </configuration>
-                                       </execution>
-                                       <!-- Add src/test/scala to eclipse build path -->
-                                       <execution>
-                                               <id>add-test-source</id>
-                                               <phase>generate-test-sources</phase>
-                                               <goals>
-                                                       <goal>add-test-source</goal>
-                                               </goals>
-                                               <configuration>
-                                                       <sources>
-                                                               <source>src/test/scala</source>
-                                                       </sources>
-                                               </configuration>
-                                       </execution>
-                               </executions>
-                       </plugin>
-
-                       <plugin>
-                               <groupId>org.scalastyle</groupId>
-                               <artifactId>scalastyle-maven-plugin</artifactId>
-                               <version>0.5.0</version>
-                               <executions>
-                                       <execution>
-                                               <goals>
-                                                       <goal>check</goal>
-                                               </goals>
-                                       </execution>
-                               </executions>
-                               <configuration>
-                                       <verbose>false</verbose>
-                                       <failOnViolation>true</failOnViolation>
-                                       <includeTestSourceDirectory>true</includeTestSourceDirectory>
-                                       <failOnWarning>false</failOnWarning>
-                                       <sourceDirectory>${basedir}/src/main/scala</sourceDirectory>
-                                       <testSourceDirectory>${basedir}/src/test/scala</testSourceDirectory>
-                                       <configLocation>${project.basedir}/../../tools/maven/scalastyle-config.xml</configLocation>
-                                       <outputFile>${project.basedir}/scalastyle-output.xml</outputFile>
-                                       <outputEncoding>UTF-8</outputEncoding>
-                               </configuration>
-                       </plugin>
-
-               </plugins>
-       </build>
-
-</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/HCatInputFormatBase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/HCatInputFormatBase.java b/flink-staging/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/HCatInputFormatBase.java
deleted file mode 100644
index 859b706..0000000
--- a/flink-staging/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/HCatInputFormatBase.java
+++ /dev/null
@@ -1,410 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.hcatalog;
-
-import org.apache.flink.api.common.io.RichInputFormat;
-import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
-import org.apache.flink.api.common.io.statistics.BaseStatistics;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.hadoop.mapreduce.utils.HadoopUtils;
-import org.apache.flink.api.java.hadoop.mapreduce.wrapper.HadoopInputSplit;
-import org.apache.flink.api.java.typeutils.GenericTypeInfo;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.WritableTypeInfo;
-import org.apache.flink.core.io.InputSplitAssigner;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hive.hcatalog.common.HCatException;
-import org.apache.hive.hcatalog.common.HCatUtil;
-import org.apache.hive.hcatalog.data.DefaultHCatRecord;
-import org.apache.hive.hcatalog.data.HCatRecord;
-import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
-import org.apache.hive.hcatalog.data.schema.HCatSchema;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-/**
- * A InputFormat to read from HCatalog tables.
- * The InputFormat supports projection (selection and order of fields) and partition filters.
- *
- * Data can be returned as {@link org.apache.hive.hcatalog.data.HCatRecord} or Flink-native tuple.
- *
- * Note: Flink tuples might only support a limited number of fields (depending on the API).
- *
- * @param <T>
- */
-public abstract class HCatInputFormatBase<T> extends RichInputFormat<T, HadoopInputSplit> implements ResultTypeQueryable<T> {
-
-       private static final long serialVersionUID = 1L;
-
-       private Configuration configuration;
-
-       private org.apache.hive.hcatalog.mapreduce.HCatInputFormat hCatInputFormat;
-       private RecordReader<WritableComparable, HCatRecord> recordReader;
-       private boolean fetched = false;
-       private boolean hasNext;
-
-       protected String[] fieldNames = new String[0];
-       protected HCatSchema outputSchema;
-
-       private TypeInformation<T> resultType;
-
-       public HCatInputFormatBase() { }
-
-       /**
-        * Creates a HCatInputFormat for the given database and table.
-        * By default, the InputFormat returns {@link org.apache.hive.hcatalog.data.HCatRecord}.
-        * The return type of the InputFormat can be changed to Flink-native tuples by calling
-        * {@link HCatInputFormatBase#asFlinkTuples()}.
-        *
-        * @param database The name of the database to read from.
-        * @param table The name of the table to read.
-        * @throws java.io.IOException
-        */
-       public HCatInputFormatBase(String database, String table) throws IOException {
-               this(database, table, new Configuration());
-       }
-
-       /**
-        * Creates a HCatInputFormat for the given database, table, and
-        * {@link org.apache.hadoop.conf.Configuration}.
-        * By default, the InputFormat returns {@link org.apache.hive.hcatalog.data.HCatRecord}.
-        * The return type of the InputFormat can be changed to Flink-native tuples by calling
-        * {@link HCatInputFormatBase#asFlinkTuples()}.
-        *
-        * @param database The name of the database to read from.
-        * @param table The name of the table to read.
-        * @param config The Configuration for the InputFormat.
-        * @throws java.io.IOException
-        */
-       public HCatInputFormatBase(String database, String table, Configuration config) throws IOException {
-               super();
-               this.configuration = config;
-               HadoopUtils.mergeHadoopConf(this.configuration);
-
-               this.hCatInputFormat = org.apache.hive.hcatalog.mapreduce.HCatInputFormat.setInput(this.configuration, database, table);
-               this.outputSchema = org.apache.hive.hcatalog.mapreduce.HCatInputFormat.getTableSchema(this.configuration);
-
-               // configure output schema of HCatFormat
-               configuration.set("mapreduce.lib.hcat.output.schema", 
HCatUtil.serialize(outputSchema));
-               // set type information
-               this.resultType = new WritableTypeInfo(DefaultHCatRecord.class);
-       }
-
-       /**
-        * Specifies the fields which are returned by the InputFormat and their order.
-        *
-        * @param fields The fields and their order which are returned by the InputFormat.
-        * @return This InputFormat with specified return fields.
-        * @throws java.io.IOException
-        */
-       public HCatInputFormatBase<T> getFields(String... fields) throws IOException {
-
-               // build output schema
-               ArrayList<HCatFieldSchema> fieldSchemas = new ArrayList<HCatFieldSchema>(fields.length);
-               for(String field : fields) {
-                       fieldSchemas.add(this.outputSchema.get(field));
-               }
-               this.outputSchema = new HCatSchema(fieldSchemas);
-
-               // update output schema configuration
-               configuration.set("mapreduce.lib.hcat.output.schema", 
HCatUtil.serialize(outputSchema));
-
-               return this;
-       }
-
-       /**
-        * Specifies a SQL-like filter condition on the table's partition columns.
-        * Filter conditions on non-partition columns are invalid.
-        * A partition filter can significantly reduce the amount of data to be read.
-        *
-        * @param filter A SQL-like filter condition on the table's partition columns.
-        * @return This InputFormat with specified partition filter.
-        * @throws java.io.IOException
-        */
-       public HCatInputFormatBase<T> withFilter(String filter) throws IOException {
-
-               // set filter
-               this.hCatInputFormat.setFilter(filter);
-
-               return this;
-       }
-
-       /**
-        * Specifies that the InputFormat returns Flink tuples instead of
-        * {@link org.apache.hive.hcatalog.data.HCatRecord}.
-        *
-        * Note: Flink tuples might only support a limited number of fields (depending on the API).
-        *
-        * @return This InputFormat.
-        * @throws org.apache.hive.hcatalog.common.HCatException
-        */
-       public HCatInputFormatBase<T> asFlinkTuples() throws HCatException {
-
-               // build type information
-               int numFields = outputSchema.getFields().size();
-               if(numFields > this.getMaxFlinkTupleSize()) {
-                       throw new IllegalArgumentException("Only up to 
"+this.getMaxFlinkTupleSize()+
-                                       " fields can be returned as Flink 
tuples.");
-               }
-
-               TypeInformation[] fieldTypes = new TypeInformation[numFields];
-               fieldNames = new String[numFields];
-               for (String fieldName : outputSchema.getFieldNames()) {
-                       HCatFieldSchema field = outputSchema.get(fieldName);
-
-                       int fieldPos = outputSchema.getPosition(fieldName);
-                       TypeInformation fieldType = getFieldType(field);
-
-                       fieldTypes[fieldPos] = fieldType;
-                       fieldNames[fieldPos] = fieldName;
-
-               }
-               this.resultType = new TupleTypeInfo(fieldTypes);
-
-               return this;
-       }
-
-       protected abstract int getMaxFlinkTupleSize();
-
-       private TypeInformation getFieldType(HCatFieldSchema fieldSchema) {
-
-               switch(fieldSchema.getType()) {
-                       case INT:
-                               return BasicTypeInfo.INT_TYPE_INFO;
-                       case TINYINT:
-                               return BasicTypeInfo.BYTE_TYPE_INFO;
-                       case SMALLINT:
-                               return BasicTypeInfo.SHORT_TYPE_INFO;
-                       case BIGINT:
-                               return BasicTypeInfo.LONG_TYPE_INFO;
-                       case BOOLEAN:
-                               return BasicTypeInfo.BOOLEAN_TYPE_INFO;
-                       case FLOAT:
-                               return BasicTypeInfo.FLOAT_TYPE_INFO;
-                       case DOUBLE:
-                               return BasicTypeInfo.DOUBLE_TYPE_INFO;
-                       case STRING:
-                               return BasicTypeInfo.STRING_TYPE_INFO;
-                       case BINARY:
-                               return PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO;
-                       case ARRAY:
-                               return new GenericTypeInfo(List.class);
-                       case MAP:
-                               return new GenericTypeInfo(Map.class);
-                       case STRUCT:
-                               return new GenericTypeInfo(List.class);
-                       default:
-                               throw new IllegalArgumentException("Unknown 
data type \""+fieldSchema.getType()+"\" encountered.");
-               }
-       }
-
-       /**
-        * Returns the {@link org.apache.hadoop.conf.Configuration} of the HCatInputFormat.
-        *
-        * @return The Configuration of the HCatInputFormat.
-        */
-       public Configuration getConfiguration() {
-               return this.configuration;
-       }
-
-       /**
-        * Returns the {@link org.apache.hive.hcatalog.data.schema.HCatSchema} of the {@link org.apache.hive.hcatalog.data.HCatRecord}
-        * returned by this InputFormat.
-        *
-        * @return The HCatSchema of the HCatRecords returned by this InputFormat.
-        */
-       public HCatSchema getOutputSchema() {
-               return this.outputSchema;
-       }
-
-       // --------------------------------------------------------------------------------------------
-       //  InputFormat
-       // --------------------------------------------------------------------------------------------
-
-       @Override
-       public void configure(org.apache.flink.configuration.Configuration parameters) {
-               // nothing to do
-       }
-
-       @Override
-       public BaseStatistics getStatistics(BaseStatistics cachedStats) throws IOException {
-               // no statistics provided at the moment
-               return null;
-       }
-
-       @Override
-       public HadoopInputSplit[] createInputSplits(int minNumSplits)
-                       throws IOException {
-               configuration.setInt("mapreduce.input.fileinputformat.split.minsize", minNumSplits);
-
-               JobContext jobContext = null;
-               try {
-                       jobContext = HadoopUtils.instantiateJobContext(configuration, new JobID());
-               } catch (Exception e) {
-                       throw new RuntimeException(e);
-               }
-
-               List<InputSplit> splits;
-               try {
-                       splits = this.hCatInputFormat.getSplits(jobContext);
-               } catch (InterruptedException e) {
-                       throw new IOException("Could not get Splits.", e);
-               }
-               HadoopInputSplit[] hadoopInputSplits = new HadoopInputSplit[splits.size()];
-
-               for(int i = 0; i < hadoopInputSplits.length; i++){
-                       hadoopInputSplits[i] = new HadoopInputSplit(i, splits.get(i), jobContext);
-               }
-               return hadoopInputSplits;
-       }
-
-       @Override
-       public InputSplitAssigner getInputSplitAssigner(HadoopInputSplit[] inputSplits) {
-               return new LocatableInputSplitAssigner(inputSplits);
-       }
-
-       @Override
-       public void open(HadoopInputSplit split) throws IOException {
-               TaskAttemptContext context = null;
-               try {
-                       context = HadoopUtils.instantiateTaskAttemptContext(configuration, new TaskAttemptID());
-               } catch(Exception e) {
-                       throw new RuntimeException(e);
-               }
-
-               try {
-                       this.recordReader = this.hCatInputFormat
-                                       .createRecordReader(split.getHadoopInputSplit(), context);
-                       this.recordReader.initialize(split.getHadoopInputSplit(), context);
-               } catch (InterruptedException e) {
-                       throw new IOException("Could not create RecordReader.", 
e);
-               } finally {
-                       this.fetched = false;
-               }
-       }
-
-       @Override
-       public boolean reachedEnd() throws IOException {
-               if(!this.fetched) {
-                       fetchNext();
-               }
-               return !this.hasNext;
-       }
-
-       private void fetchNext() throws IOException {
-               try {
-                       this.hasNext = this.recordReader.nextKeyValue();
-               } catch (InterruptedException e) {
-                       throw new IOException("Could not fetch next KeyValue 
pair.", e);
-               } finally {
-                       this.fetched = true;
-               }
-       }
-
-       @Override
-       public T nextRecord(T record) throws IOException {
-               if(!this.fetched) {
-                       // first record
-                       fetchNext();
-               }
-               if(!this.hasNext) {
-                       return null;
-               }
-               try {
-
-                       // get next HCatRecord
-                       HCatRecord v = this.recordReader.getCurrentValue();
-                       this.fetched = false;
-
-                       if(this.fieldNames.length > 0) {
-                               // return as Flink tuple
-                               return this.buildFlinkTuple(record, v);
-
-                       } else {
-                               // return as HCatRecord
-                               return (T)v;
-                       }
-
-               } catch (InterruptedException e) {
-                       throw new IOException("Could not get next record.", e);
-               }
-       }
-
-       protected abstract T buildFlinkTuple(T t, HCatRecord record) throws HCatException;
-
-       @Override
-       public void close() throws IOException {
-               this.recordReader.close();
-       }
-
-       // --------------------------------------------------------------------------------------------
-       //  Custom de/serialization methods
-       // --------------------------------------------------------------------------------------------
-
-       private void writeObject(ObjectOutputStream out) throws IOException {
-               out.writeInt(this.fieldNames.length);
-               for(String fieldName : this.fieldNames) {
-                       out.writeUTF(fieldName);
-               }
-               this.configuration.write(out);
-       }
-
-       @SuppressWarnings("unchecked")
-       private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
-               this.fieldNames = new String[in.readInt()];
-               for(int i=0; i<this.fieldNames.length; i++) {
-                       this.fieldNames[i] = in.readUTF();
-               }
-
-               Configuration configuration = new Configuration();
-               configuration.readFields(in);
-
-               if(this.configuration == null) {
-                       this.configuration = configuration;
-               }
-
-               this.hCatInputFormat = new org.apache.hive.hcatalog.mapreduce.HCatInputFormat();
-               this.outputSchema = (HCatSchema)HCatUtil.deserialize(this.configuration.get("mapreduce.lib.hcat.output.schema"));
-       }
-
-       // --------------------------------------------------------------------------------------------
-       //  Result type business
-       // --------------------------------------------------------------------------------------------
-
-       @Override
-       public TypeInformation<T> getProducedType() {
-               return this.resultType;
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/java/HCatInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/java/HCatInputFormat.java b/flink-staging/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/java/HCatInputFormat.java
deleted file mode 100644
index 46f3cd5..0000000
--- a/flink-staging/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/java/HCatInputFormat.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.hcatalog.java;
-
-
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.hcatalog.HCatInputFormatBase;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hive.hcatalog.common.HCatException;
-import org.apache.hive.hcatalog.data.HCatRecord;
-
-/**
- * A InputFormat to read from HCatalog tables.
- * The InputFormat supports projection (selection and order of fields) and partition filters.
- *
- * Data can be returned as {@link HCatRecord} or Flink {@link org.apache.flink.api.java.tuple.Tuple}.
- * Flink tuples support only up to 25 fields.
- *
- * @param <T>
- */
-public class HCatInputFormat<T> extends HCatInputFormatBase<T> {
-       private static final long serialVersionUID = 1L;
-
-       public HCatInputFormat() {}
-
-       public HCatInputFormat(String database, String table) throws Exception {
-               super(database, table);
-       }
-
-       public HCatInputFormat(String database, String table, Configuration config) throws Exception {
-               super(database, table, config);
-       }
-
-
-       @Override
-       protected int getMaxFlinkTupleSize() {
-               return 25;
-       }
-
-       @Override
-       protected T buildFlinkTuple(T t, HCatRecord record) throws HCatException {
-
-               Tuple tuple = (Tuple)t;
-
-               // Extract all fields from HCatRecord
-               for(int i=0; i < this.fieldNames.length; i++) {
-
-                       // get field value
-                       Object o = record.get(this.fieldNames[i], this.outputSchema);
-
-                       // Set field value in Flink tuple.
-                       // Partition columns are returned as String and
-                       //   need to be converted to original type.
-                       switch(this.outputSchema.get(i).getType()) {
-                               case INT:
-                                       if(o instanceof String) {
-                                               tuple.setField(Integer.parseInt((String) o), i);
-                                       } else {
-                                               tuple.setField(o, i);
-                                       }
-                                       break;
-                               case TINYINT:
-                                       if(o instanceof String) {
-                                               tuple.setField(Byte.parseByte((String) o), i);
-                                       } else {
-                                               tuple.setField(o, i);
-                                       }
-                                       break;
-                               case SMALLINT:
-                                       if(o instanceof String) {
-                                               tuple.setField(Short.parseShort((String) o), i);
-                                       } else {
-                                               tuple.setField(o, i);
-                                       }
-                                       break;
-                               case BIGINT:
-                                       if(o instanceof String) {
-                                               tuple.setField(Long.parseLong((String) o), i);
-                                       } else {
-                                               tuple.setField(o, i);
-                                       }
-                                       break;
-                               case BOOLEAN:
-                                       if(o instanceof String) {
-                                               tuple.setField(Boolean.parseBoolean((String) o), i);
-                                       } else {
-                                               tuple.setField(o, i);
-                                       }
-                                       break;
-                               case FLOAT:
-                                       if(o instanceof String) {
-                                               tuple.setField(Float.parseFloat((String) o), i);
-                                       } else {
-                                               tuple.setField(o, i);
-                                       }
-                                       break;
-                               case DOUBLE:
-                                       if(o instanceof String) {
-                                               tuple.setField(Double.parseDouble((String) o), i);
-                                       } else {
-                                               tuple.setField(o, i);
-                                       }
-                                       break;
-                               case STRING:
-                                       tuple.setField(o, i);
-                                       break;
-                               case BINARY:
-                                       if(o instanceof String) {
-                                               throw new RuntimeException("Cannot handle partition keys of type BINARY.");
-                                       } else {
-                                               tuple.setField(o, i);
-                                       }
-                                       break;
-                               case ARRAY:
-                                       if(o instanceof String) {
-                                               throw new RuntimeException("Cannot handle partition keys of type ARRAY.");
-                                       } else {
-                                               tuple.setField(o, i);
-                                       }
-                                       break;
-                               case MAP:
-                                       if(o instanceof String) {
-                                               throw new RuntimeException("Cannot handle partition keys of type MAP.");
-                                       } else {
-                                               tuple.setField(o, i);
-                                       }
-                                       break;
-                               case STRUCT:
-                                       if(o instanceof String) {
-                                               throw new RuntimeException("Cannot handle partition keys of type STRUCT.");
-                                       } else {
-                                               tuple.setField(o, i);
-                                       }
-                                       break;
-                               default:
-                                       throw new RuntimeException("Invalid Type");
-                       }
-               }
-
-               return (T)tuple;
-
-       }
-
-}
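
For reference, the removed Java HCatInputFormat was consumed through ExecutionEnvironment.createInput(); the format announces its result type itself via getProducedType(). A minimal sketch, not part of this commit, assuming an existing HCatalog table "mytable" in database "mydb", the Hive/HCatalog client configuration on the classpath, and a purely illustrative output path. By default the format produces HCatRecord; projection and conversion to Flink tuples are configured on the format before createInput() and are not shown here.

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.hcatalog.java.HCatInputFormat;
    import org.apache.hive.hcatalog.data.HCatRecord;

    public class HCatReadSketch {

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Database and table names are placeholders for an existing HCatalog table.
            HCatInputFormat<HCatRecord> hcatFormat =
                    new HCatInputFormat<HCatRecord>("mydb", "mytable");

            // The format implements getProducedType(), so no TypeInformation hint is needed.
            DataSet<HCatRecord> rows = env.createInput(hcatFormat);
            rows.first(10).writeAsText("/tmp/hcat-sample");

            env.execute("HCatalog read sketch");
        }
    }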

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-hcatalog/src/main/scala/org/apache/flink/hcatalog/scala/HCatInputFormat.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hcatalog/src/main/scala/org/apache/flink/hcatalog/scala/HCatInputFormat.scala b/flink-staging/flink-hcatalog/src/main/scala/org/apache/flink/hcatalog/scala/HCatInputFormat.scala
deleted file mode 100644
index d5a3cbf..0000000
--- a/flink-staging/flink-hcatalog/src/main/scala/org/apache/flink/hcatalog/scala/HCatInputFormat.scala
+++ /dev/null
@@ -1,229 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.hcatalog.scala
-
-import org.apache.flink.configuration
-import org.apache.flink.hcatalog.HCatInputFormatBase
-import org.apache.hadoop.conf.Configuration
-import org.apache.hive.hcatalog.data.HCatRecord
-import org.apache.hive.hcatalog.data.schema.HCatFieldSchema
-
-/**
- * An InputFormat to read from HCatalog tables.
- * The InputFormat supports projection (selection and order of fields) and partition filters.
- *
- * Data can be returned as {@link HCatRecord} or Scala tuples.
- * Scala tuples support only up to 22 fields.
- *
- */
-class HCatInputFormat[T](
-                        database: String,
-                        table: String,
-                        config: Configuration
-                          ) extends HCatInputFormatBase[T](database, table, config) {
-
-  def this(database: String, table: String) {
-    this(database, table, new Configuration)
-  }
-
-  var vals: Array[Any] = Array[Any]()
-
-  override def configure(parameters: configuration.Configuration): Unit = {
-    super.configure(parameters)
-    vals = new Array[Any](fieldNames.length)
-  }
-
-  override protected def getMaxFlinkTupleSize: Int = 22
-
-  override protected def buildFlinkTuple(t: T, record: HCatRecord): T = {
-
-    // Extract all fields from HCatRecord
-    var i: Int = 0
-    while (i < this.fieldNames.length) {
-
-        val o: AnyRef = record.get(this.fieldNames(i), this.outputSchema)
-
-        // partition columns are returned as String
-        //   Check and convert to actual type.
-        this.outputSchema.get(i).getType match {
-          case HCatFieldSchema.Type.INT =>
-            if (o.isInstanceOf[String]) {
-              vals(i) = o.asInstanceOf[String].toInt
-            }
-            else {
-              vals(i) = o.asInstanceOf[Int]
-            }
-          case HCatFieldSchema.Type.TINYINT =>
-            if (o.isInstanceOf[String]) {
-              vals(i) = o.asInstanceOf[String].toInt.toByte
-            }
-            else {
-              vals(i) = o.asInstanceOf[Byte]
-            }
-          case HCatFieldSchema.Type.SMALLINT =>
-            if (o.isInstanceOf[String]) {
-              vals(i) = o.asInstanceOf[String].toInt.toShort
-            }
-            else {
-              vals(i) = o.asInstanceOf[Short]
-            }
-          case HCatFieldSchema.Type.BIGINT =>
-            if (o.isInstanceOf[String]) {
-              vals(i) = o.asInstanceOf[String].toLong
-            }
-            else {
-              vals(i) = o.asInstanceOf[Long]
-            }
-          case HCatFieldSchema.Type.BOOLEAN =>
-            if (o.isInstanceOf[String]) {
-              vals(i) = o.asInstanceOf[String].toBoolean
-            }
-            else {
-              vals(i) = o.asInstanceOf[Boolean]
-            }
-          case HCatFieldSchema.Type.FLOAT =>
-            if (o.isInstanceOf[String]) {
-              vals(i) = o.asInstanceOf[String].toFloat
-            }
-            else {
-              vals(i) = o.asInstanceOf[Float]
-            }
-          case HCatFieldSchema.Type.DOUBLE =>
-            if (o.isInstanceOf[String]) {
-              vals(i) = o.asInstanceOf[String].toDouble
-            }
-            else {
-              vals(i) = o.asInstanceOf[Double]
-            }
-          case HCatFieldSchema.Type.STRING =>
-            vals(i) = o
-          case HCatFieldSchema.Type.BINARY =>
-            if (o.isInstanceOf[String]) {
-              throw new RuntimeException("Cannot handle partition keys of type BINARY.")
-            }
-            else {
-              vals(i) = o.asInstanceOf[Array[Byte]]
-            }
-          case HCatFieldSchema.Type.ARRAY =>
-            if (o.isInstanceOf[String]) {
-              throw new RuntimeException("Cannot handle partition keys of type ARRAY.")
-            }
-            else {
-              vals(i) = o.asInstanceOf[List[Object]]
-            }
-          case HCatFieldSchema.Type.MAP =>
-            if (o.isInstanceOf[String]) {
-              throw new RuntimeException("Cannot handle partition keys of type MAP.")
-            }
-            else {
-              vals(i) = o.asInstanceOf[Map[Object, Object]]
-            }
-          case HCatFieldSchema.Type.STRUCT =>
-            if (o.isInstanceOf[String]) {
-              throw new RuntimeException("Cannot handle partition keys of type STRUCT.")
-            }
-            else {
-              vals(i) = o.asInstanceOf[List[Object]]
-            }
-          case _ =>
-            throw new RuntimeException("Invalid type " + this.outputSchema.get(i).getType +
-              " encountered.")
-        }
-
-        i += 1
-      }
-    createScalaTuple(vals)
-  }
-
-  private def createScalaTuple(vals: Array[Any]): T = {
-
-    this.fieldNames.length match {
-      case 1 =>
-        new Tuple1(vals(0)).asInstanceOf[T]
-      case 2 =>
-        new Tuple2(vals(0), vals(1)).asInstanceOf[T]
-      case 3 =>
-        new Tuple3(vals(0), vals(1), vals(2)).asInstanceOf[T]
-      case 4 =>
-        new Tuple4(vals(0), vals(1), vals(2), vals(3)).asInstanceOf[T]
-      case 5 =>
-        new Tuple5(vals(0), vals(1), vals(2), vals(3), vals(4)).asInstanceOf[T]
-      case 6 =>
-        new Tuple6(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5)).asInstanceOf[T]
-      case 7 =>
-        new Tuple7(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6)).asInstanceOf[T]
-      case 8 =>
-        new Tuple8(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7))
-          .asInstanceOf[T]
-      case 9 =>
-        new Tuple9(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
-          vals(8)).asInstanceOf[T]
-      case 10 =>
-        new Tuple10(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
-          vals(8), vals(9)).asInstanceOf[T]
-      case 11 =>
-        new Tuple11(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
-          vals(8), vals(9), vals(10)).asInstanceOf[T]
-      case 12 =>
-        new Tuple12(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
-          vals(8), vals(9), vals(10), vals(11)).asInstanceOf[T]
-      case 13 =>
-        new Tuple13(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
-          vals(8), vals(9), vals(10), vals(11), vals(12)).asInstanceOf[T]
-      case 14 =>
-        new Tuple14(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
-          vals(8), vals(9), vals(10), vals(11), vals(12), vals(13)).asInstanceOf[T]
-      case 15 =>
-        new Tuple15(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
-          vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14)).asInstanceOf[T]
-      case 16 =>
-        new Tuple16(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
-          vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14), vals(15))
-          .asInstanceOf[T]
-      case 17 =>
-        new Tuple17(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
-          vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14), vals(15),
-          vals(16)).asInstanceOf[T]
-      case 18 =>
-        new Tuple18(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
-          vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14), vals(15),
-          vals(16), vals(17)).asInstanceOf[T]
-      case 19 =>
-        new Tuple19(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
-          vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14), vals(15),
-          vals(16), vals(17), vals(18)).asInstanceOf[T]
-      case 20 =>
-        new Tuple20(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
-          vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14), vals(15),
-          vals(16), vals(17), vals(18), vals(19)).asInstanceOf[T]
-      case 21 =>
-        new Tuple21(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
-          vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14), vals(15),
-          vals(16), vals(17), vals(18), vals(19), vals(20)).asInstanceOf[T]
-      case 22 =>
-        new Tuple22(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7),
-          vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14), vals(15),
-          vals(16), vals(17), vals(18), vals(19), vals(20), vals(21)).asInstanceOf[T]
-      case _ =>
-        throw new RuntimeException("Only up to 22 fields supported for Scala Tuples.")
-
-  }
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-jdbc/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-jdbc/pom.xml b/flink-staging/flink-jdbc/pom.xml
deleted file mode 100644
index 79e26be..0000000
--- a/flink-staging/flink-jdbc/pom.xml
+++ /dev/null
@@ -1,52 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-       
-       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-       
-       <modelVersion>4.0.0</modelVersion>
-       
-       <parent>
-               <groupId>org.apache.flink</groupId>
-               <artifactId>flink-staging</artifactId>
-               <version>1.0-SNAPSHOT</version>
-               <relativePath>..</relativePath>
-       </parent>
-
-       <artifactId>flink-jdbc</artifactId>
-       <name>flink-jdbc</name>
-
-       <packaging>jar</packaging>
-
-       <dependencies>
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-java</artifactId>
-                       <version>${project.version}</version>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.derby</groupId>
-                       <artifactId>derby</artifactId>
-                       <version>10.10.1.1</version>
-                       <scope>test</scope>
-               </dependency>
-       </dependencies>
-</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java b/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
deleted file mode 100644
index eb3ac31..0000000
--- a/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
+++ /dev/null
@@ -1,358 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.io.jdbc;
-
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
-import java.sql.SQLException;
-import java.sql.Statement;
-
-import org.apache.flink.api.common.io.NonParallelInput;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
-import org.apache.flink.api.common.io.RichInputFormat;
-import org.apache.flink.api.common.io.statistics.BaseStatistics;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.io.GenericInputSplit;
-import org.apache.flink.core.io.InputSplit;
-import org.apache.flink.core.io.InputSplitAssigner;
-import org.apache.flink.types.NullValue;
-
-/**
- * InputFormat to read data from a database and generate tuples.
- * The InputFormat has to be configured using the supplied InputFormatBuilder.
- * 
- * @param <OUT>
- * @see Tuple
- * @see DriverManager
- */
-public class JDBCInputFormat<OUT extends Tuple> extends RichInputFormat<OUT, InputSplit> implements NonParallelInput {
-       private static final long serialVersionUID = 1L;
-
-       private static final Logger LOG = LoggerFactory.getLogger(JDBCInputFormat.class);
-
-       private String username;
-       private String password;
-       private String drivername;
-       private String dbURL;
-       private String query;
-
-       private transient Connection dbConn;
-       private transient Statement statement;
-       private transient ResultSet resultSet;
-
-       private int[] columnTypes = null;
-
-       public JDBCInputFormat() {
-       }
-
-       @Override
-       public void configure(Configuration parameters) {
-       }
-
-       /**
-        * Connects to the source database and executes the query.
-        *
-        * @param ignored
-        * @throws IOException
-        */
-       @Override
-       public void open(InputSplit ignored) throws IOException {
-               try {
-                       establishConnection();
-                       statement = dbConn.createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY);
-                       resultSet = statement.executeQuery(query);
-               } catch (SQLException se) {
-                       close();
-                       throw new IllegalArgumentException("open() failed." + se.getMessage(), se);
-               } catch (ClassNotFoundException cnfe) {
-                       throw new IllegalArgumentException("JDBC-Class not found. - " + cnfe.getMessage(), cnfe);
-               }
-       }
-
-       private void establishConnection() throws SQLException, ClassNotFoundException {
-               Class.forName(drivername);
-               if (username == null) {
-                       dbConn = DriverManager.getConnection(dbURL);
-               } else {
-                       dbConn = DriverManager.getConnection(dbURL, username, password);
-               }
-       }
-
-       /**
-        * Closes all resources used.
-        *
-        * @throws IOException Indicates that a resource could not be closed.
-        */
-       @Override
-       public void close() throws IOException {
-               try {
-                       resultSet.close();
-               } catch (SQLException se) {
-                       LOG.info("Inputformat couldn't be closed - " + se.getMessage());
-               } catch (NullPointerException npe) {
-               }
-               try {
-                       statement.close();
-               } catch (SQLException se) {
-                       LOG.info("Inputformat couldn't be closed - " + se.getMessage());
-               } catch (NullPointerException npe) {
-               }
-               try {
-                       dbConn.close();
-               } catch (SQLException se) {
-                       LOG.info("Inputformat couldn't be closed - " + se.getMessage());
-               } catch (NullPointerException npe) {
-               }
-       }
-
-       /**
-        * Checks whether all data has been read.
-        *
-        * @return boolean value indicating whether all data has been read.
-        * @throws IOException
-        */
-       @Override
-       public boolean reachedEnd() throws IOException {
-               try {
-                       if (resultSet.isLast()) {
-                               close();
-                               return true;
-                       }
-                       return false;
-               } catch (SQLException se) {
-                       throw new IOException("Couldn't evaluate reachedEnd() - " + se.getMessage(), se);
-               }
-       }
-
-       /**
-        * Stores the next resultSet row in a tuple
-        *
-        * @param tuple
-        * @return tuple containing next row
-        * @throws java.io.IOException
-        */
-       @Override
-       public OUT nextRecord(OUT tuple) throws IOException {
-               try {
-                       resultSet.next();
-                       if (columnTypes == null) {
-                               extractTypes(tuple);
-                       }
-                       addValue(tuple);
-                       return tuple;
-               } catch (SQLException se) {
-                       close();
-                       throw new IOException("Couldn't read data - " + se.getMessage(), se);
-               } catch (NullPointerException npe) {
-                       close();
-                       throw new IOException("Couldn't access resultSet", npe);
-               }
-       }
-
-       private void extractTypes(OUT tuple) throws SQLException, IOException {
-               ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
-               columnTypes = new int[resultSetMetaData.getColumnCount()];
-               if (tuple.getArity() != columnTypes.length) {
-                       close();
-                       throw new IOException("Tuple size does not match column count");
-               }
-               for (int pos = 0; pos < columnTypes.length; pos++) {
-                       columnTypes[pos] = resultSetMetaData.getColumnType(pos + 1);
-               }
-       }
-
-       /**
-        * Enters data value from the current resultSet into a Record.
-        *
-        * @param reuse Target Record.
-        */
-       private void addValue(OUT reuse) throws SQLException {
-               for (int pos = 0; pos < columnTypes.length; pos++) {
-                       switch (columnTypes[pos]) {
-                               case java.sql.Types.NULL:
-                                       reuse.setField(NullValue.getInstance(), pos);
-                                       break;
-                               case java.sql.Types.BOOLEAN:
-                                       reuse.setField(resultSet.getBoolean(pos + 1), pos);
-                                       break;
-                               case java.sql.Types.BIT:
-                                       reuse.setField(resultSet.getBoolean(pos + 1), pos);
-                                       break;
-                               case java.sql.Types.CHAR:
-                                       reuse.setField(resultSet.getString(pos + 1), pos);
-                                       break;
-                               case java.sql.Types.NCHAR:
-                                       reuse.setField(resultSet.getString(pos + 1), pos);
-                                       break;
-                               case java.sql.Types.VARCHAR:
-                                       reuse.setField(resultSet.getString(pos + 1), pos);
-                                       break;
-                               case java.sql.Types.LONGVARCHAR:
-                                       reuse.setField(resultSet.getString(pos + 1), pos);
-                                       break;
-                               case java.sql.Types.LONGNVARCHAR:
-                                       reuse.setField(resultSet.getString(pos + 1), pos);
-                                       break;
-                               case java.sql.Types.TINYINT:
-                                       reuse.setField(resultSet.getShort(pos + 1), pos);
-                                       break;
-                               case java.sql.Types.SMALLINT:
-                                       reuse.setField(resultSet.getShort(pos + 1), pos);
-                                       break;
-                               case java.sql.Types.BIGINT:
-                                       reuse.setField(resultSet.getLong(pos + 1), pos);
-                                       break;
-                               case java.sql.Types.INTEGER:
-                                       reuse.setField(resultSet.getInt(pos + 1), pos);
-                                       break;
-                               case java.sql.Types.FLOAT:
-                                       reuse.setField(resultSet.getDouble(pos + 1), pos);
-                                       break;
-                               case java.sql.Types.REAL:
-                                       reuse.setField(resultSet.getFloat(pos + 1), pos);
-                                       break;
-                               case java.sql.Types.DOUBLE:
-                                       reuse.setField(resultSet.getDouble(pos + 1), pos);
-                                       break;
-                               case java.sql.Types.DECIMAL:
-                                       reuse.setField(resultSet.getBigDecimal(pos + 1).doubleValue(), pos);
-                                       break;
-                               case java.sql.Types.NUMERIC:
-                                       reuse.setField(resultSet.getBigDecimal(pos + 1).doubleValue(), pos);
-                                       break;
-                               case java.sql.Types.DATE:
-                                       reuse.setField(resultSet.getDate(pos + 1).toString(), pos);
-                                       break;
-                               case java.sql.Types.TIME:
-                                       reuse.setField(resultSet.getTime(pos + 1).getTime(), pos);
-                                       break;
-                               case java.sql.Types.TIMESTAMP:
-                                       reuse.setField(resultSet.getTimestamp(pos + 1).toString(), pos);
-                                       break;
-                               case java.sql.Types.SQLXML:
-                                       reuse.setField(resultSet.getSQLXML(pos + 1).toString(), pos);
-                                       break;
-                               default:
-                                       throw new SQLException("Unsupported sql-type [" + columnTypes[pos] + "] on column [" + pos + "]");
-
-                               // case java.sql.Types.BINARY:
-                               // case java.sql.Types.VARBINARY:
-                               // case java.sql.Types.LONGVARBINARY:
-                               // case java.sql.Types.ARRAY:
-                               // case java.sql.Types.JAVA_OBJECT:
-                               // case java.sql.Types.BLOB:
-                               // case java.sql.Types.CLOB:
-                               // case java.sql.Types.NCLOB:
-                               // case java.sql.Types.DATALINK:
-                               // case java.sql.Types.DISTINCT:
-                               // case java.sql.Types.OTHER:
-                               // case java.sql.Types.REF:
-                               // case java.sql.Types.ROWID:
-                               // case java.sql.Types.STRUCT:
-                       }
-               }
-       }
-
-       @Override
-       public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
-               return cachedStatistics;
-       }
-
-       @Override
-       public InputSplit[] createInputSplits(int minNumSplits) throws IOException {
-               GenericInputSplit[] split = {
-                       new GenericInputSplit(0, 1)
-               };
-               return split;
-       }
-
-       @Override
-       public InputSplitAssigner getInputSplitAssigner(InputSplit[] inputSplits) {
-               return new DefaultInputSplitAssigner(inputSplits);
-       }
-       
-
-       /**
-        * A builder used to set parameters to the input format's configuration in a fluent way.
-        * @return builder
-        */
-       public static JDBCInputFormatBuilder buildJDBCInputFormat() {
-               return new JDBCInputFormatBuilder();
-       }
-
-       public static class JDBCInputFormatBuilder {
-               private final JDBCInputFormat format;
-
-               public JDBCInputFormatBuilder() {
-                       this.format = new JDBCInputFormat();
-               }
-
-               public JDBCInputFormatBuilder setUsername(String username) {
-                       format.username = username;
-                       return this;
-               }
-
-               public JDBCInputFormatBuilder setPassword(String password) {
-                       format.password = password;
-                       return this;
-               }
-
-               public JDBCInputFormatBuilder setDrivername(String drivername) {
-                       format.drivername = drivername;
-                       return this;
-               }
-
-               public JDBCInputFormatBuilder setDBUrl(String dbURL) {
-                       format.dbURL = dbURL;
-                       return this;
-               }
-
-               public JDBCInputFormatBuilder setQuery(String query) {
-                       format.query = query;
-                       return this;
-               }
-
-               public JDBCInputFormat finish() {
-                       if (format.username == null) {
-                               LOG.info("Username was not supplied separately.");
-                       }
-                       if (format.password == null) {
-                               LOG.info("Password was not supplied separately.");
-                       }
-                       if (format.dbURL == null) {
-                               throw new IllegalArgumentException("No database URL supplied.");
-                       }
-                       if (format.query == null) {
-                               throw new IllegalArgumentException("No query supplied.");
-                       }
-                       if (format.drivername == null) {
-                               throw new IllegalArgumentException("No driver supplied.");
-                       }
-                       return format;
-               }
-       }
-
-}
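
The reachedEnd()/nextRecord() cycle above is normally driven by the Flink runtime, but the format can also be exercised by hand, which makes the life cycle (configure, open, read loop, close) easier to see. A minimal sketch, not part of this commit, assuming the Derby in-memory "books" table that JDBCExample below sets up in prepareTestDb():

    import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
    import org.apache.flink.api.java.tuple.Tuple5;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.core.io.GenericInputSplit;

    public class JdbcManualReadSketch {

        @SuppressWarnings("unchecked")
        public static void main(String[] args) throws Exception {
            JDBCInputFormat<Tuple5<Integer, String, String, Double, Integer>> format =
                    JDBCInputFormat.buildJDBCInputFormat()
                            .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
                            .setDBUrl("jdbc:derby:memory:ebookshop")
                            .setQuery("select * from books")
                            .finish();

            format.configure(new Configuration());
            // The split is ignored: the format is non-parallel and always reads the full query.
            format.open(new GenericInputSplit(0, 1));

            Tuple5<Integer, String, String, Double, Integer> reuse =
                    new Tuple5<Integer, String, String, Double, Integer>();
            while (!format.reachedEnd()) {
                reuse = format.nextRecord(reuse);
                System.out.println(reuse);
            }
            format.close();
        }
    }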

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java b/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
deleted file mode 100644
index 614c5b7..0000000
--- a/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
+++ /dev/null
@@ -1,270 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.io.jdbc;
-
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.flink.api.common.io.RichOutputFormat;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.configuration.Configuration;
-
-/**
- * OutputFormat to write tuples into a database.
- * The OutputFormat has to be configured using the supplied OutputFormatBuilder.
- * 
- * @param <OUT>
- * @see Tuple
- * @see DriverManager
- */
-public class JDBCOutputFormat<OUT extends Tuple> extends RichOutputFormat<OUT> {
-       private static final long serialVersionUID = 1L;
-
-       @SuppressWarnings("unused")
-       private static final Logger LOG = LoggerFactory.getLogger(JDBCOutputFormat.class);
-
-       private String username;
-       private String password;
-       private String drivername;
-       private String dbURL;
-       private String query;
-       private int batchInterval = 5000;
-
-       private Connection dbConn;
-       private PreparedStatement upload;
-
-       private SupportedTypes[] types = null;
-
-       private int batchCount = 0;
-
-       public JDBCOutputFormat() {
-       }
-
-       @Override
-       public void configure(Configuration parameters) {
-       }
-
-       /**
-        * Connects to the target database and initializes the prepared statement.
-        *
-        * @param taskNumber The number of the parallel instance.
-        * @throws IOException Thrown, if the output could not be opened due to an
-        * I/O problem.
-        */
-       @Override
-       public void open(int taskNumber, int numTasks) throws IOException {
-               try {
-                       establishConnection();
-                       upload = dbConn.prepareStatement(query);
-               } catch (SQLException sqe) {
-                       close();
-                       throw new IllegalArgumentException("open() failed:\t!", sqe);
-               } catch (ClassNotFoundException cnfe) {
-                       close();
-                       throw new IllegalArgumentException("JDBC-Class not found:\t", cnfe);
-               }
-       }
-
-       private void establishConnection() throws SQLException, ClassNotFoundException {
-               Class.forName(drivername);
-               if (username == null) {
-                       dbConn = DriverManager.getConnection(dbURL);
-               } else {
-                       dbConn = DriverManager.getConnection(dbURL, username, password);
-               }
-       }
-
-       private enum SupportedTypes {
-               BOOLEAN,
-               BYTE,
-               SHORT,
-               INTEGER,
-               LONG,
-               STRING,
-               FLOAT,
-               DOUBLE
-       }
-
-       /**
-        * Adds a record to the prepared statement.
-        * <p>
-        * When this method is called, the output format is guaranteed to be opened.
-        *
-        * @param tuple The records to add to the output.
-        * @throws IOException Thrown, if the records could not be added due to an I/O problem.
-        */
-       @Override
-       public void writeRecord(OUT tuple) throws IOException {
-               try {
-                       if (types == null) {
-                               extractTypes(tuple);
-                       }
-                       addValues(tuple);
-                       upload.addBatch();
-                       batchCount++;
-                       if (batchCount >= batchInterval) {
-                               upload.executeBatch();
-                               batchCount = 0;
-                       }
-               } catch (SQLException sqe) {
-                       close();
-                       throw new IllegalArgumentException("writeRecord() failed", sqe);
-               } catch (IllegalArgumentException iae) {
-                       close();
-                       throw new IllegalArgumentException("writeRecord() failed", iae);
-               }
-       }
-
-       private void extractTypes(OUT tuple) {
-               types = new SupportedTypes[tuple.getArity()];
-               for (int x = 0; x < tuple.getArity(); x++) {
-                       types[x] = SupportedTypes.valueOf(tuple.getField(x).getClass().getSimpleName().toUpperCase());
-               }
-       }
-
-       private void addValues(OUT tuple) throws SQLException {
-               for (int index = 0; index < tuple.getArity(); index++) {
-                       switch (types[index]) {
-                               case BOOLEAN:
-                                       upload.setBoolean(index + 1, (Boolean) tuple.getField(index));
-                                       break;
-                               case BYTE:
-                                       upload.setByte(index + 1, (Byte) tuple.getField(index));
-                                       break;
-                               case SHORT:
-                                       upload.setShort(index + 1, (Short) tuple.getField(index));
-                                       break;
-                               case INTEGER:
-                                       upload.setInt(index + 1, (Integer) tuple.getField(index));
-                                       break;
-                               case LONG:
-                                       upload.setLong(index + 1, (Long) tuple.getField(index));
-                                       break;
-                               case STRING:
-                                       upload.setString(index + 1, (String) tuple.getField(index));
-                                       break;
-                               case FLOAT:
-                                       upload.setFloat(index + 1, (Float) tuple.getField(index));
-                                       break;
-                               case DOUBLE:
-                                       upload.setDouble(index + 1, (Double) tuple.getField(index));
-                                       break;
-                       }
-               }
-       }
-
-       /**
-        * Executes prepared statement and closes all resources of this instance.
-        *
-        * @throws IOException Thrown, if the output could not be closed properly.
-        */
-       @Override
-       public void close() throws IOException {
-               try {
-                       upload.executeBatch();
-                       batchCount = 0;
-               } catch (SQLException se) {
-                       throw new IllegalArgumentException("close() failed", se);
-               } catch (NullPointerException se) {
-               }
-               try {
-                       upload.close();
-               } catch (SQLException se) {
-                       LOG.info("Outputformat couldn't be closed - " + se.getMessage());
-               } catch (NullPointerException npe) {
-               }
-               try {
-                       dbConn.close();
-               } catch (SQLException se) {
-                       LOG.info("Outputformat couldn't be closed - " + se.getMessage());
-               } catch (NullPointerException npe) {
-               }
-       }
-
-       public static JDBCOutputFormatBuilder buildJDBCOutputFormat() {
-               return new JDBCOutputFormatBuilder();
-       }
-
-       public static class JDBCOutputFormatBuilder {
-               private final JDBCOutputFormat format;
-
-               protected JDBCOutputFormatBuilder() {
-                       this.format = new JDBCOutputFormat();
-               }
-
-               public JDBCOutputFormatBuilder setUsername(String username) {
-                       format.username = username;
-                       return this;
-               }
-
-               public JDBCOutputFormatBuilder setPassword(String password) {
-                       format.password = password;
-                       return this;
-               }
-
-               public JDBCOutputFormatBuilder setDrivername(String drivername) {
-                       format.drivername = drivername;
-                       return this;
-               }
-
-               public JDBCOutputFormatBuilder setDBUrl(String dbURL) {
-                       format.dbURL = dbURL;
-                       return this;
-               }
-
-               public JDBCOutputFormatBuilder setQuery(String query) {
-                       format.query = query;
-                       return this;
-               }
-
-               public JDBCOutputFormatBuilder setBatchInterval(int batchInterval) {
-                       format.batchInterval = batchInterval;
-                       return this;
-               }
-
-               /**
-               Finalizes the configuration and checks validity.
-               @return Configured JDBCOutputFormat
-                */
-               public JDBCOutputFormat finish() {
-                       if (format.username == null) {
-                               LOG.info("Username was not supplied separately.");
-                       }
-                       if (format.password == null) {
-                               LOG.info("Password was not supplied separately.");
-                       }
-                       if (format.dbURL == null) {
-                               throw new IllegalArgumentException("No database URL supplied.");
-                       }
-                       if (format.query == null) {
-                               throw new IllegalArgumentException("No query supplied.");
-                       }
-                       if (format.drivername == null) {
-                               throw new IllegalArgumentException("No driver supplied.");
-                       }
-                       return format;
-               }
-       }
-
-}
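
On the write side, writeRecord() adds each tuple to a JDBC batch that is flushed every batchInterval records (5000 by default) and once more in close(). A minimal sketch of wiring the builder into a DataSet program, not part of this commit; the Derby driver matches this module's test setup and the target table "people" is purely illustrative:

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
    import org.apache.flink.api.java.tuple.Tuple3;

    public class JdbcWriteSketch {

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Illustrative data; the table people(id, name, score) is assumed to exist.
            DataSet<Tuple3<Integer, String, Double>> people = env.fromElements(
                    new Tuple3<Integer, String, Double>(1, "Alice", 3.5),
                    new Tuple3<Integer, String, Double>(2, "Bob", 4.0));

            people.output(JDBCOutputFormat.buildJDBCOutputFormat()
                    .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
                    .setDBUrl("jdbc:derby:memory:ebookshop")
                    .setQuery("insert into people (id, name, score) values (?, ?, ?)")
                    // Flush smaller batches than the 5000-record default.
                    .setBatchInterval(100)
                    .finish());

            env.execute("JDBC write sketch");
        }
    }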

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/example/JDBCExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/example/JDBCExample.java b/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/example/JDBCExample.java
deleted file mode 100644
index 7b012ba..0000000
--- a/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/example/JDBCExample.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.io.jdbc.example;
-
-import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.DOUBLE_TYPE_INFO;
-import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.INT_TYPE_INFO;
-import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.Statement;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
-import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-
-public class JDBCExample {
-
-       public static void main(String[] args) throws Exception {
-               prepareTestDb();
-
-               ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
-               DataSet<Tuple5> source
-                               = environment.createInput(JDBCInputFormat.buildJDBCInputFormat()
-                                               .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
-                                               .setDBUrl("jdbc:derby:memory:ebookshop")
-                                               .setQuery("select * from books")
-                                               .finish(),
-                                               new TupleTypeInfo(Tuple5.class, INT_TYPE_INFO, STRING_TYPE_INFO, STRING_TYPE_INFO, DOUBLE_TYPE_INFO, INT_TYPE_INFO)
-                               );
-
-               source.output(JDBCOutputFormat.buildJDBCOutputFormat()
-                               .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
-                               .setDBUrl("jdbc:derby:memory:ebookshop")
-                               .setQuery("insert into newbooks (id,title,author,price,qty) values (?,?,?,?,?)")
-                               .finish());
-               environment.execute();
-       }
-
-       private static void prepareTestDb() throws Exception {
-               String dbURL = "jdbc:derby:memory:ebookshop;create=true";
-               Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
-               Connection conn = DriverManager.getConnection(dbURL);
-
-               StringBuilder sqlQueryBuilder = new StringBuilder("CREATE TABLE books (");
-               sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,");
-               sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,");
-               sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,");
-               sqlQueryBuilder.append("price FLOAT DEFAULT NULL,");
-               sqlQueryBuilder.append("qty INT DEFAULT NULL,");
-               sqlQueryBuilder.append("PRIMARY KEY (id))");
-
-               Statement stat = conn.createStatement();
-               stat.executeUpdate(sqlQueryBuilder.toString());
-               stat.close();
-
-               sqlQueryBuilder = new StringBuilder("CREATE TABLE newbooks (");
-               sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,");
-               sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,");
-               sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,");
-               sqlQueryBuilder.append("price FLOAT DEFAULT NULL,");
-               sqlQueryBuilder.append("qty INT DEFAULT NULL,");
-               sqlQueryBuilder.append("PRIMARY KEY (id))");
-
-               stat = conn.createStatement();
-               stat.executeUpdate(sqlQueryBuilder.toString());
-               stat.close();
-
-               sqlQueryBuilder = new StringBuilder("INSERT INTO books (id, title, author, price, qty) VALUES ");
-               sqlQueryBuilder.append("(1001, 'Java for dummies', 'Tan Ah Teck', 11.11, 11),");
-               sqlQueryBuilder.append("(1002, 'More Java for dummies', 'Tan Ah Teck', 22.22, 22),");
-               sqlQueryBuilder.append("(1003, 'More Java for more dummies', 'Mohammad Ali', 33.33, 33),");
-               sqlQueryBuilder.append("(1004, 'A Cup of Java', 'Kumar', 44.44, 44),");
-               sqlQueryBuilder.append("(1005, 'A Teaspoon of Java', 'Kevin Jones', 55.55, 55)");
-
-               stat = conn.createStatement();
-               stat.execute(sqlQueryBuilder.toString());
-               stat.close();
-
-               conn.close();
-       }
-}
