http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseFlinkTestConstants.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseFlinkTestConstants.java b/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseFlinkTestConstants.java deleted file mode 100644 index 8579dee..0000000 --- a/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseFlinkTestConstants.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.addons.hbase.example; - -public class HBaseFlinkTestConstants { - - public static final byte[] CF_SOME = "someCf".getBytes(); - public static final byte[] Q_SOME = "someQual".getBytes(); - public static final String TEST_TABLE_NAME = "test-table"; - public static final String TMP_DIR = "/tmp/test"; - -}
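Note: "someCf".getBytes() relies on the platform default charset. A charset-safe variant of these constants (a minimal sketch, not part of this commit) would use HBase's Bytes utility, which always encodes String arguments as UTF-8:

    package org.apache.flink.addons.hbase.example;

    import org.apache.hadoop.hbase.util.Bytes;

    // Sketch: the same constants, but encoded deterministically as UTF-8
    // via Bytes.toBytes(String) instead of String.getBytes().
    public class HBaseFlinkTestConstants {

        public static final byte[] CF_SOME = Bytes.toBytes("someCf");
        public static final byte[] Q_SOME = Bytes.toBytes("someQual");
        public static final String TEST_TABLE_NAME = "test-table";
        public static final String TMP_DIR = "/tmp/test";

    }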
http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java b/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java deleted file mode 100644 index dccf876..0000000 --- a/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.addons.hbase.example; - -import org.apache.flink.addons.hbase.TableInputFormat; -import org.apache.flink.api.common.functions.FilterFunction; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.util.Bytes; - -/** - * Simple stub for HBase DataSet read - * - * To run the test first create the test table with hbase shell. - * - * Use the following commands: - * <ul> - * <li>create 'test-table', 'someCf'</li> - * <li>put 'test-table', '1', 'someCf:someQual', 'someString'</li> - * <li>put 'test-table', '2', 'someCf:someQual', 'anotherString'</li> - * </ul> - * - * The test should return just the first entry. 
- * - */ -public class HBaseReadExample { - public static void main(String[] args) throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - @SuppressWarnings("serial") - DataSet<Tuple2<String, String>> hbaseDs = env.createInput(new TableInputFormat<Tuple2<String, String>>() { - - @Override - public String getTableName() { - return HBaseFlinkTestConstants.TEST_TABLE_NAME; - } - - @Override - protected Scan getScanner() { - Scan scan = new Scan(); - scan.addColumn(HBaseFlinkTestConstants.CF_SOME, HBaseFlinkTestConstants.Q_SOME); - return scan; - } - - private Tuple2<String, String> reuse = new Tuple2<String, String>(); - - @Override - protected Tuple2<String, String> mapResultToTuple(Result r) { - String key = Bytes.toString(r.getRow()); - String val = Bytes.toString(r.getValue(HBaseFlinkTestConstants.CF_SOME, HBaseFlinkTestConstants.Q_SOME)); - reuse.setField(key, 0); - reuse.setField(val, 1); - return reuse; - } - }) - .filter(new FilterFunction<Tuple2<String,String>>() { - - @Override - public boolean filter(Tuple2<String, String> t) throws Exception { - String val = t.getField(1); - if(val.startsWith("someStr")) - return true; - return false; - } - }); - - hbaseDs.print(); - - // kick off execution. - env.execute(); - - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteExample.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteExample.java b/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteExample.java deleted file mode 100644 index 483bdff..0000000 --- a/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteExample.java +++ /dev/null @@ -1,202 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.addons.hbase.example; - -import org.apache.flink.api.common.functions.FlatMapFunction; -import org.apache.flink.api.common.functions.RichMapFunction; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.util.Collector; -import org.apache.hadoop.hbase.client.Mutation; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.mapreduce.TableOutputFormat; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.Job; - -/** - * Simple stub for HBase DataSet write - * - * To run the test first create the test table with hbase shell. - * - * Use the following commands: - * <ul> - * <li>create 'test-table', 'someCf'</li> - * </ul> - * - */ -@SuppressWarnings("serial") -public class HBaseWriteExample { - - // ************************************************************************* - // PROGRAM - // ************************************************************************* - - public static void main(String[] args) throws Exception { - - if(!parseParameters(args)) { - return; - } - - // set up the execution environment - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - // get input data - DataSet<String> text = getTextDataSet(env); - - DataSet<Tuple2<String, Integer>> counts = - // split up the lines in pairs (2-tuples) containing: (word,1) - text.flatMap(new Tokenizer()) - // group by the tuple field "0" and sum up tuple field "1" - .groupBy(0) - .sum(1); - - // emit result - Job job = Job.getInstance(); - job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, outputTableName); - // TODO is "mapred.output.dir" really useful? - job.getConfiguration().set("mapred.output.dir",HBaseFlinkTestConstants.TMP_DIR); - counts.map(new RichMapFunction <Tuple2<String,Integer>, Tuple2<Text,Mutation>>() { - private transient Tuple2<Text, Mutation> reuse; - - @Override - public void open(Configuration parameters) throws Exception { - super.open(parameters); - reuse = new Tuple2<Text, Mutation>(); - } - - @Override - public Tuple2<Text, Mutation> map(Tuple2<String, Integer> t) throws Exception { - reuse.f0 = new Text(t.f0); - Put put = new Put(t.f0.getBytes()); - put.add(HBaseFlinkTestConstants.CF_SOME,HBaseFlinkTestConstants.Q_SOME, Bytes.toBytes(t.f1)); - reuse.f1 = put; - return reuse; - } - }).output(new HadoopOutputFormat<Text, Mutation>(new TableOutputFormat<Text>(), job)); - - // execute program - env.execute("WordCount (HBase sink) Example"); - } - - // ************************************************************************* - // USER FUNCTIONS - // ************************************************************************* - - /** - * Implements the string tokenizer that splits sentences into words as a user-defined - * FlatMapFunction. The function takes a line (String) and splits it into - * multiple pairs in the form of "(word,1)" (Tuple2<String, Integer>). 
- */ - public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> { - - @Override - public void flatMap(String value, Collector<Tuple2<String, Integer>> out) { - // normalize and split the line - String[] tokens = value.toLowerCase().split("\\W+"); - - // emit the pairs - for (String token : tokens) { - if (token.length() > 0) { - out.collect(new Tuple2<String, Integer>(token, 1)); - } - } - } - } - - // ************************************************************************* - // UTIL METHODS - // ************************************************************************* - private static boolean fileOutput = false; - private static String textPath; - private static String outputTableName = HBaseFlinkTestConstants.TEST_TABLE_NAME; - - private static boolean parseParameters(String[] args) { - - if(args.length > 0) { - // parse input arguments - fileOutput = true; - if(args.length == 2) { - textPath = args[0]; - outputTableName = args[1]; - } else { - System.err.println("Usage: HBaseWriteExample <text path> <output table>"); - return false; - } - } else { - System.out.println("Executing HBaseWriteExample example with built-in default data."); - System.out.println(" Provide parameters to read input data from a file."); - System.out.println(" Usage: HBaseWriteExample <text path> <output table>"); - } - return true; - } - - private static DataSet<String> getTextDataSet(ExecutionEnvironment env) { - if(fileOutput) { - // read the text file from given input path - return env.readTextFile(textPath); - } else { - // get default test text data - return getDefaultTextLineDataSet(env); - } - } - private static DataSet<String> getDefaultTextLineDataSet(ExecutionEnvironment env) { - return env.fromElements(WORDS); - } - private static final String[] WORDS = new String[] { - "To be, or not to be,--that is the question:--", - "Whether 'tis nobler in the mind to suffer", - "The slings and arrows of outrageous fortune", - "Or to take arms against a sea of troubles,", - "And by opposing end them?--To die,--to sleep,--", - "No more; and by a sleep to say we end", - "The heartache, and the thousand natural shocks", - "That flesh is heir to,--'tis a consummation", - "Devoutly to be wish'd. To die,--to sleep;--", - "To sleep! perchance to dream:--ay, there's the rub;", - "For in that sleep of death what dreams may come,", - "When we have shuffled off this mortal coil,", - "Must give us pause: there's the respect", - "That makes calamity of so long life;", - "For who would bear the whips and scorns of time,", - "The oppressor's wrong, the proud man's contumely,", - "The pangs of despis'd love, the law's delay,", - "The insolence of office, and the spurns", - "That patient merit of the unworthy takes,", - "When he himself might his quietus make", - "With a bare bodkin? who would these fardels bear,", - "To grunt and sweat under a weary life,", - "But that the dread of something after death,--", - "The undiscover'd country, from whose bourn", - "No traveller returns,--puzzles the will,", - "And makes us rather bear those ills we have", - "Than fly to others that we know not of?", - "Thus conscience does make cowards of us all;", - "And thus the native hue of resolution", - "Is sicklied o'er with the pale cast of thought;", - "And enterprises of great pith and moment,", - "With this regard, their currents turn awry,", - "And lose the name of action.--Soft you now!", - "The fair Ophelia!--Nymph, in thy orisons", - "Be all my sins remember'd." 
- }; -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java b/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java deleted file mode 100644 index a6be1a6..0000000 --- a/flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.addons.hbase.example; - -import java.io.IOException; - -import org.apache.flink.api.common.io.OutputFormat; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.source.SourceFunction; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.util.Bytes; - -/** - * - * This is an example of how to write streams into HBase. In this example the - * stream is written into a local HBase, but it is possible to adapt this - * example for an HBase running in the cloud. You need a running local HBase with a - * table "flinkExample" and a column family "entry". If your HBase configuration does - * not match the hbase-site.xml in the resource folder, you have to temporarily delete this - * hbase-site.xml to execute the example properly.
- * - */ -public class HBaseWriteStreamExample { - - public static void main(String[] args) { - final StreamExecutionEnvironment env = StreamExecutionEnvironment - .getExecutionEnvironment(); - - // data stream with random numbers - DataStream<String> dataStream = env.addSource(new SourceFunction<String>() { - private static final long serialVersionUID = 1L; - - private volatile boolean isRunning = true; - - @Override - public void run(SourceContext<String> out) throws Exception { - while (isRunning) { - out.collect(String.valueOf(Math.floor(Math.random() * 100))); - } - - } - - @Override - public void cancel() { - isRunning = false; - } - }); - dataStream.write(new HBaseOutputFormat(), 0L); - - try { - env.execute(); - } catch (Exception e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - } - - /** - * - * This class implements an OutputFormat for HBase - * - */ - private static class HBaseOutputFormat implements OutputFormat<String> { - - private org.apache.hadoop.conf.Configuration conf = null; - private HTable table = null; - private String taskNumber = null; - private int rowNumber = 0; - - private static final long serialVersionUID = 1L; - - @Override - public void configure(Configuration parameters) { - conf = HBaseConfiguration.create(); - } - - @Override - public void open(int taskNumber, int numTasks) throws IOException { - table = new HTable(conf, "flinkExample"); - this.taskNumber = String.valueOf(taskNumber); - } - - @Override - public void writeRecord(String record) throws IOException { - Put put = new Put(Bytes.toBytes(taskNumber + rowNumber)); - put.add(Bytes.toBytes("entry"), Bytes.toBytes("entry"), - Bytes.toBytes(rowNumber)); - rowNumber++; - table.put(put); - } - - @Override - public void close() throws IOException { - table.flushCommits(); - table.close(); - } - - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-hbase/src/test/resources/hbase-site.xml ---------------------------------------------------------------------- diff --git a/flink-staging/flink-hbase/src/test/resources/hbase-site.xml b/flink-staging/flink-hbase/src/test/resources/hbase-site.xml deleted file mode 100644 index 2984063..0000000 --- a/flink-staging/flink-hbase/src/test/resources/hbase-site.xml +++ /dev/null @@ -1,43 +0,0 @@ -<?xml version="1.0"?> -<?xml-stylesheet type="text/xsl" href="configuration.xsl"?> -<!-- -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ ---> -<configuration> - - <property> - <name>hbase.tmp.dir</name> - <!-- - <value>/media/Dati/hbase-0.98-data</value> - --> - <value>/opt/hbase-0.98.6.1-hadoop2/data</value> - - </property> - <property> - <name>hbase.zookeeper.quorum</name> - <value>localhost</value> - </property> - <!-- - <property> - <name>hadoop.security.group.mapping</name> - <value>org.apache.hadoop.security.ShellBasedUnixGroupsMapping</value> - </property> - --> -</configuration> http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-hbase/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/flink-staging/flink-hbase/src/test/resources/log4j.properties b/flink-staging/flink-hbase/src/test/resources/log4j.properties deleted file mode 100644 index d6eb2b2..0000000 --- a/flink-staging/flink-hbase/src/test/resources/log4j.properties +++ /dev/null @@ -1,23 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -log4j.rootLogger=${hadoop.root.logger} -hadoop.root.logger=INFO,console -log4j.appender.console=org.apache.log4j.ConsoleAppender -log4j.appender.console.target=System.err -log4j.appender.console.layout=org.apache.log4j.PatternLayout -log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-hcatalog/pom.xml ---------------------------------------------------------------------- diff --git a/flink-staging/flink-hcatalog/pom.xml b/flink-staging/flink-hcatalog/pom.xml deleted file mode 100644 index 00ce577..0000000 --- a/flink-staging/flink-hcatalog/pom.xml +++ /dev/null @@ -1,179 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. 
---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> - - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.flink</groupId> - <artifactId>flink-staging</artifactId> - <version>1.0-SNAPSHOT</version> - <relativePath>..</relativePath> - </parent> - - <artifactId>flink-hcatalog</artifactId> - <name>flink-hcatalog</name> - - <packaging>jar</packaging> - - <dependencies> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-java</artifactId> - <version>${project.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.hive.hcatalog</groupId> - <artifactId>hcatalog-core</artifactId> - <version>0.12.0</version> - </dependency> - - </dependencies> - - <build> - <plugins> - <!-- Scala Compiler --> - <plugin> - <groupId>net.alchim31.maven</groupId> - <artifactId>scala-maven-plugin</artifactId> - <version>3.1.4</version> - <executions> - <!-- Run scala compiler in the process-resources phase, so that dependencies on - scala classes can be resolved later in the (Java) compile phase --> - <execution> - <id>scala-compile-first</id> - <phase>process-resources</phase> - <goals> - <goal>compile</goal> - </goals> - </execution> - - <!-- Run scala compiler in the process-test-resources phase, so that dependencies on - scala classes can be resolved later in the (Java) test-compile phase --> - <execution> - <id>scala-test-compile</id> - <phase>process-test-resources</phase> - <goals> - <goal>testCompile</goal> - </goals> - </execution> - </executions> - <configuration> - <jvmArgs> - <jvmArg>-Xms128m</jvmArg> - <jvmArg>-Xmx512m</jvmArg> - </jvmArgs> - </configuration> - </plugin> - - <!-- Eclipse Integration --> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-eclipse-plugin</artifactId> - <version>2.8</version> - <configuration> - <downloadSources>true</downloadSources> - <projectnatures> - <projectnature>org.scala-ide.sdt.core.scalanature</projectnature> - <projectnature>org.eclipse.jdt.core.javanature</projectnature> - </projectnatures> - <buildcommands> - <buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand> - </buildcommands> - <classpathContainers> - <classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer> - <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer> - </classpathContainers> - <excludes> - <exclude>org.scala-lang:scala-library</exclude> - <exclude>org.scala-lang:scala-compiler</exclude> - </excludes> - <sourceIncludes> - <sourceInclude>**/*.scala</sourceInclude> - <sourceInclude>**/*.java</sourceInclude> - </sourceIncludes> - </configuration> - </plugin> - - <!-- Adding scala source directories to build path --> - <plugin> - <groupId>org.codehaus.mojo</groupId> - <artifactId>build-helper-maven-plugin</artifactId> - <version>1.7</version> - <executions> - <!-- Add src/main/scala to eclipse build path --> - <execution> - <id>add-source</id> - <phase>generate-sources</phase> - <goals> - <goal>add-source</goal> - </goals> - <configuration> - <sources> - <source>src/main/scala</source> - </sources> - </configuration> - </execution> - <!-- Add src/test/scala to eclipse build path --> - <execution> - <id>add-test-source</id> - <phase>generate-test-sources</phase> - <goals> - <goal>add-test-source</goal> - </goals> - <configuration> - <sources> - <source>src/test/scala</source> - </sources> - 
</configuration> - </execution> - </executions> - </plugin> - - <plugin> - <groupId>org.scalastyle</groupId> - <artifactId>scalastyle-maven-plugin</artifactId> - <version>0.5.0</version> - <executions> - <execution> - <goals> - <goal>check</goal> - </goals> - </execution> - </executions> - <configuration> - <verbose>false</verbose> - <failOnViolation>true</failOnViolation> - <includeTestSourceDirectory>true</includeTestSourceDirectory> - <failOnWarning>false</failOnWarning> - <sourceDirectory>${basedir}/src/main/scala</sourceDirectory> - <testSourceDirectory>${basedir}/src/test/scala</testSourceDirectory> - <configLocation>${project.basedir}/../../tools/maven/scalastyle-config.xml</configLocation> - <outputFile>${project.basedir}/scalastyle-output.xml</outputFile> - <outputEncoding>UTF-8</outputEncoding> - </configuration> - </plugin> - - </plugins> - </build> - -</project> http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/HCatInputFormatBase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/HCatInputFormatBase.java b/flink-staging/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/HCatInputFormatBase.java deleted file mode 100644 index 859b706..0000000 --- a/flink-staging/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/HCatInputFormatBase.java +++ /dev/null @@ -1,410 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.hcatalog; - -import org.apache.flink.api.common.io.RichInputFormat; -import org.apache.flink.api.common.io.LocatableInputSplitAssigner; -import org.apache.flink.api.common.io.statistics.BaseStatistics; -import org.apache.flink.api.common.typeinfo.BasicTypeInfo; -import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.hadoop.mapreduce.utils.HadoopUtils; -import org.apache.flink.api.java.hadoop.mapreduce.wrapper.HadoopInputSplit; -import org.apache.flink.api.java.typeutils.GenericTypeInfo; -import org.apache.flink.api.java.typeutils.ResultTypeQueryable; -import org.apache.flink.api.java.typeutils.TupleTypeInfo; -import org.apache.flink.api.java.typeutils.WritableTypeInfo; -import org.apache.flink.core.io.InputSplitAssigner; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.WritableComparable; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.JobID; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.TaskAttemptID; -import org.apache.hive.hcatalog.common.HCatException; -import org.apache.hive.hcatalog.common.HCatUtil; -import org.apache.hive.hcatalog.data.DefaultHCatRecord; -import org.apache.hive.hcatalog.data.HCatRecord; -import org.apache.hive.hcatalog.data.schema.HCatFieldSchema; -import org.apache.hive.hcatalog.data.schema.HCatSchema; - -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - -/** - * An InputFormat to read from HCatalog tables. - * The InputFormat supports projection (selection and order of fields) and partition filters. - * - * Data can be returned as {@link org.apache.hive.hcatalog.data.HCatRecord} or Flink-native tuple. - * - * Note: Flink tuples might only support a limited number of fields (depending on the API). - * - * @param <T> - */ -public abstract class HCatInputFormatBase<T> extends RichInputFormat<T, HadoopInputSplit> implements ResultTypeQueryable<T> { - - private static final long serialVersionUID = 1L; - - private Configuration configuration; - - private org.apache.hive.hcatalog.mapreduce.HCatInputFormat hCatInputFormat; - private RecordReader<WritableComparable, HCatRecord> recordReader; - private boolean fetched = false; - private boolean hasNext; - - protected String[] fieldNames = new String[0]; - protected HCatSchema outputSchema; - - private TypeInformation<T> resultType; - - public HCatInputFormatBase() { } - - /** - * Creates an HCatInputFormat for the given database and table. - * By default, the InputFormat returns {@link org.apache.hive.hcatalog.data.HCatRecord}. - * The return type of the InputFormat can be changed to Flink-native tuples by calling - * {@link HCatInputFormatBase#asFlinkTuples()}. - * - * @param database The name of the database to read from. - * @param table The name of the table to read. - * @throws java.io.IOException - */ - public HCatInputFormatBase(String database, String table) throws IOException { - this(database, table, new Configuration()); - } - - /** - * Creates an HCatInputFormat for the given database, table, and - * {@link org.apache.hadoop.conf.Configuration}. - * By default, the InputFormat returns {@link org.apache.hive.hcatalog.data.HCatRecord}.
- * The return type of the InputFormat can be changed to Flink-native tuples by calling - * {@link HCatInputFormatBase#asFlinkTuples()}. - * - * @param database The name of the database to read from. - * @param table The name of the table to read. - * @param config The Configuration for the InputFormat. - * @throws java.io.IOException - */ - public HCatInputFormatBase(String database, String table, Configuration config) throws IOException { - super(); - this.configuration = config; - HadoopUtils.mergeHadoopConf(this.configuration); - - this.hCatInputFormat = org.apache.hive.hcatalog.mapreduce.HCatInputFormat.setInput(this.configuration, database, table); - this.outputSchema = org.apache.hive.hcatalog.mapreduce.HCatInputFormat.getTableSchema(this.configuration); - - // configure output schema of HCatFormat - configuration.set("mapreduce.lib.hcat.output.schema", HCatUtil.serialize(outputSchema)); - // set type information - this.resultType = new WritableTypeInfo(DefaultHCatRecord.class); - } - - /** - * Specifies the fields which are returned by the InputFormat and their order. - * - * @param fields The fields and their order which are returned by the InputFormat. - * @return This InputFormat with specified return fields. - * @throws java.io.IOException - */ - public HCatInputFormatBase<T> getFields(String... fields) throws IOException { - - // build output schema - ArrayList<HCatFieldSchema> fieldSchemas = new ArrayList<HCatFieldSchema>(fields.length); - for(String field : fields) { - fieldSchemas.add(this.outputSchema.get(field)); - } - this.outputSchema = new HCatSchema(fieldSchemas); - - // update output schema configuration - configuration.set("mapreduce.lib.hcat.output.schema", HCatUtil.serialize(outputSchema)); - - return this; - } - - /** - * Specifies a SQL-like filter condition on the table's partition columns. - * Filter conditions on non-partition columns are invalid. - * A partition filter can significantly reduce the amount of data to be read. - * - * @param filter A SQL-like filter condition on the table's partition columns. - * @return This InputFormat with specified partition filter. - * @throws java.io.IOException - */ - public HCatInputFormatBase<T> withFilter(String filter) throws IOException { - - // set filter - this.hCatInputFormat.setFilter(filter); - - return this; - } - - /** - * Specifies that the InputFormat returns Flink tuples instead of - * {@link org.apache.hive.hcatalog.data.HCatRecord}. - * - * Note: Flink tuples might only support a limited number of fields (depending on the API). - * - * @return This InputFormat. 
- * @throws org.apache.hive.hcatalog.common.HCatException - */ - public HCatInputFormatBase<T> asFlinkTuples() throws HCatException { - - // build type information - int numFields = outputSchema.getFields().size(); - if(numFields > this.getMaxFlinkTupleSize()) { - throw new IllegalArgumentException("Only up to "+this.getMaxFlinkTupleSize()+ - " fields can be returned as Flink tuples."); - } - - TypeInformation[] fieldTypes = new TypeInformation[numFields]; - fieldNames = new String[numFields]; - for (String fieldName : outputSchema.getFieldNames()) { - HCatFieldSchema field = outputSchema.get(fieldName); - - int fieldPos = outputSchema.getPosition(fieldName); - TypeInformation fieldType = getFieldType(field); - - fieldTypes[fieldPos] = fieldType; - fieldNames[fieldPos] = fieldName; - - } - this.resultType = new TupleTypeInfo(fieldTypes); - - return this; - } - - protected abstract int getMaxFlinkTupleSize(); - - private TypeInformation getFieldType(HCatFieldSchema fieldSchema) { - - switch(fieldSchema.getType()) { - case INT: - return BasicTypeInfo.INT_TYPE_INFO; - case TINYINT: - return BasicTypeInfo.BYTE_TYPE_INFO; - case SMALLINT: - return BasicTypeInfo.SHORT_TYPE_INFO; - case BIGINT: - return BasicTypeInfo.LONG_TYPE_INFO; - case BOOLEAN: - return BasicTypeInfo.BOOLEAN_TYPE_INFO; - case FLOAT: - return BasicTypeInfo.FLOAT_TYPE_INFO; - case DOUBLE: - return BasicTypeInfo.DOUBLE_TYPE_INFO; - case STRING: - return BasicTypeInfo.STRING_TYPE_INFO; - case BINARY: - return PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO; - case ARRAY: - return new GenericTypeInfo(List.class); - case MAP: - return new GenericTypeInfo(Map.class); - case STRUCT: - return new GenericTypeInfo(List.class); - default: - throw new IllegalArgumentException("Unknown data type \""+fieldSchema.getType()+"\" encountered."); - } - } - - /** - * Returns the {@link org.apache.hadoop.conf.Configuration} of the HCatInputFormat. - * - * @return The Configuration of the HCatInputFormat. - */ - public Configuration getConfiguration() { - return this.configuration; - } - - /** - * Returns the {@link org.apache.hive.hcatalog.data.schema.HCatSchema} of the {@link org.apache.hive.hcatalog.data.HCatRecord} - * returned by this InputFormat. - * - * @return The HCatSchema of the HCatRecords returned by this InputFormat. 
- */ - public HCatSchema getOutputSchema() { - return this.outputSchema; - } - - // -------------------------------------------------------------------------------------------- - // InputFormat - // -------------------------------------------------------------------------------------------- - - @Override - public void configure(org.apache.flink.configuration.Configuration parameters) { - // nothing to do - } - - @Override - public BaseStatistics getStatistics(BaseStatistics cachedStats) throws IOException { - // no statistics provided at the moment - return null; - } - - @Override - public HadoopInputSplit[] createInputSplits(int minNumSplits) - throws IOException { - configuration.setInt("mapreduce.input.fileinputformat.split.minsize", minNumSplits); - - JobContext jobContext = null; - try { - jobContext = HadoopUtils.instantiateJobContext(configuration, new JobID()); - } catch (Exception e) { - throw new RuntimeException(e); - } - - List<InputSplit> splits; - try { - splits = this.hCatInputFormat.getSplits(jobContext); - } catch (InterruptedException e) { - throw new IOException("Could not get Splits.", e); - } - HadoopInputSplit[] hadoopInputSplits = new HadoopInputSplit[splits.size()]; - - for(int i = 0; i < hadoopInputSplits.length; i++){ - hadoopInputSplits[i] = new HadoopInputSplit(i, splits.get(i), jobContext); - } - return hadoopInputSplits; - } - - @Override - public InputSplitAssigner getInputSplitAssigner(HadoopInputSplit[] inputSplits) { - return new LocatableInputSplitAssigner(inputSplits); - } - - @Override - public void open(HadoopInputSplit split) throws IOException { - TaskAttemptContext context = null; - try { - context = HadoopUtils.instantiateTaskAttemptContext(configuration, new TaskAttemptID()); - } catch(Exception e) { - throw new RuntimeException(e); - } - - try { - this.recordReader = this.hCatInputFormat - .createRecordReader(split.getHadoopInputSplit(), context); - this.recordReader.initialize(split.getHadoopInputSplit(), context); - } catch (InterruptedException e) { - throw new IOException("Could not create RecordReader.", e); - } finally { - this.fetched = false; - } - } - - @Override - public boolean reachedEnd() throws IOException { - if(!this.fetched) { - fetchNext(); - } - return !this.hasNext; - } - - private void fetchNext() throws IOException { - try { - this.hasNext = this.recordReader.nextKeyValue(); - } catch (InterruptedException e) { - throw new IOException("Could not fetch next KeyValue pair.", e); - } finally { - this.fetched = true; - } - } - - @Override - public T nextRecord(T record) throws IOException { - if(!this.fetched) { - // first record - fetchNext(); - } - if(!this.hasNext) { - return null; - } - try { - - // get next HCatRecord - HCatRecord v = this.recordReader.getCurrentValue(); - this.fetched = false; - - if(this.fieldNames.length > 0) { - // return as Flink tuple - return this.buildFlinkTuple(record, v); - - } else { - // return as HCatRecord - return (T)v; - } - - } catch (InterruptedException e) { - throw new IOException("Could not get next record.", e); - } - } - - protected abstract T buildFlinkTuple(T t, HCatRecord record) throws HCatException; - - @Override - public void close() throws IOException { - this.recordReader.close(); - } - - // -------------------------------------------------------------------------------------------- - // Custom de/serialization methods - // -------------------------------------------------------------------------------------------- - - private void writeObject(ObjectOutputStream out) throws 
IOException { - out.writeInt(this.fieldNames.length); - for(String fieldName : this.fieldNames) { - out.writeUTF(fieldName); - } - this.configuration.write(out); - } - - @SuppressWarnings("unchecked") - private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { - this.fieldNames = new String[in.readInt()]; - for(int i=0; i<this.fieldNames.length; i++) { - this.fieldNames[i] = in.readUTF(); - } - - Configuration configuration = new Configuration(); - configuration.readFields(in); - - if(this.configuration == null) { - this.configuration = configuration; - } - - this.hCatInputFormat = new org.apache.hive.hcatalog.mapreduce.HCatInputFormat(); - this.outputSchema = (HCatSchema)HCatUtil.deserialize(this.configuration.get("mapreduce.lib.hcat.output.schema")); - } - - // -------------------------------------------------------------------------------------------- - // Result type business - // -------------------------------------------------------------------------------------------- - - @Override - public TypeInformation<T> getProducedType() { - return this.resultType; - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/java/HCatInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/java/HCatInputFormat.java b/flink-staging/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/java/HCatInputFormat.java deleted file mode 100644 index 46f3cd5..0000000 --- a/flink-staging/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/java/HCatInputFormat.java +++ /dev/null @@ -1,160 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.hcatalog.java; - - -import org.apache.flink.api.java.tuple.Tuple; -import org.apache.flink.hcatalog.HCatInputFormatBase; -import org.apache.hadoop.conf.Configuration; -import org.apache.hive.hcatalog.common.HCatException; -import org.apache.hive.hcatalog.data.HCatRecord; - -/** - * An InputFormat to read from HCatalog tables. - * The InputFormat supports projection (selection and order of fields) and partition filters. - * - * Data can be returned as {@link HCatRecord} or Flink {@link org.apache.flink.api.java.tuple.Tuple}. - * Flink tuples support only up to 25 fields.
- * - * @param <T> - */ -public class HCatInputFormat<T> extends HCatInputFormatBase<T> { - private static final long serialVersionUID = 1L; - - public HCatInputFormat() {} - - public HCatInputFormat(String database, String table) throws Exception { - super(database, table); - } - - public HCatInputFormat(String database, String table, Configuration config) throws Exception { - super(database, table, config); - } - - - @Override - protected int getMaxFlinkTupleSize() { - return 25; - } - - @Override - protected T buildFlinkTuple(T t, HCatRecord record) throws HCatException { - - Tuple tuple = (Tuple)t; - - // Extract all fields from HCatRecord - for(int i=0; i < this.fieldNames.length; i++) { - - // get field value - Object o = record.get(this.fieldNames[i], this.outputSchema); - - // Set field value in Flink tuple. - // Partition columns are returned as String and - // need to be converted to original type. - switch(this.outputSchema.get(i).getType()) { - case INT: - if(o instanceof String) { - tuple.setField(Integer.parseInt((String) o), i); - } else { - tuple.setField(o, i); - } - break; - case TINYINT: - if(o instanceof String) { - tuple.setField(Byte.parseByte((String) o), i); - } else { - tuple.setField(o, i); - } - break; - case SMALLINT: - if(o instanceof String) { - tuple.setField(Short.parseShort((String) o), i); - } else { - tuple.setField(o, i); - } - break; - case BIGINT: - if(o instanceof String) { - tuple.setField(Long.parseLong((String) o), i); - } else { - tuple.setField(o, i); - } - break; - case BOOLEAN: - if(o instanceof String) { - tuple.setField(Boolean.parseBoolean((String) o), i); - } else { - tuple.setField(o, i); - } - break; - case FLOAT: - if(o instanceof String) { - tuple.setField(Float.parseFloat((String) o), i); - } else { - tuple.setField(o, i); - } - break; - case DOUBLE: - if(o instanceof String) { - tuple.setField(Double.parseDouble((String) o), i); - } else { - tuple.setField(o, i); - } - break; - case STRING: - tuple.setField(o, i); - break; - case BINARY: - if(o instanceof String) { - throw new RuntimeException("Cannot handle partition keys of type BINARY."); - } else { - tuple.setField(o, i); - } - break; - case ARRAY: - if(o instanceof String) { - throw new RuntimeException("Cannot handle partition keys of type ARRAY."); - } else { - tuple.setField(o, i); - } - break; - case MAP: - if(o instanceof String) { - throw new RuntimeException("Cannot handle partition keys of type MAP."); - } else { - tuple.setField(o, i); - } - break; - case STRUCT: - if(o instanceof String) { - throw new RuntimeException("Cannot handle partition keys of type STRUCT."); - } else { - tuple.setField(o, i); - } - break; - default: - throw new RuntimeException("Invalid Type"); - } - } - - return (T)tuple; - - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-hcatalog/src/main/scala/org/apache/flink/hcatalog/scala/HCatInputFormat.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-hcatalog/src/main/scala/org/apache/flink/hcatalog/scala/HCatInputFormat.scala b/flink-staging/flink-hcatalog/src/main/scala/org/apache/flink/hcatalog/scala/HCatInputFormat.scala deleted file mode 100644 index d5a3cbf..0000000 --- a/flink-staging/flink-hcatalog/src/main/scala/org/apache/flink/hcatalog/scala/HCatInputFormat.scala +++ /dev/null @@ -1,229 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.hcatalog.scala - -import org.apache.flink.configuration -import org.apache.flink.hcatalog.HCatInputFormatBase -import org.apache.hadoop.conf.Configuration -import org.apache.hive.hcatalog.data.HCatRecord -import org.apache.hive.hcatalog.data.schema.HCatFieldSchema - -/** - * An InputFormat to read from HCatalog tables. - * The InputFormat supports projection (selection and order of fields) and partition filters. - * - * Data can be returned as {@link HCatRecord} or Scala tuples. - * Scala tuples support only up to 22 fields. - * - */ -class HCatInputFormat[T]( - database: String, - table: String, - config: Configuration - ) extends HCatInputFormatBase[T](database, table, config) { - - def this(database: String, table: String) { - this(database, table, new Configuration) - } - - var vals: Array[Any] = Array[Any]() - - override def configure(parameters: configuration.Configuration): Unit = { - super.configure(parameters) - vals = new Array[Any](fieldNames.length) - } - - override protected def getMaxFlinkTupleSize: Int = 22 - - override protected def buildFlinkTuple(t: T, record: HCatRecord): T = { - - // Extract all fields from HCatRecord - var i: Int = 0 - while (i < this.fieldNames.length) { - - val o: AnyRef = record.get(this.fieldNames(i), this.outputSchema) - - // Partition columns are returned as String; - // check and convert to the actual type.
- this.outputSchema.get(i).getType match { - case HCatFieldSchema.Type.INT => - if (o.isInstanceOf[String]) { - vals(i) = o.asInstanceOf[String].toInt - } - else { - vals(i) = o.asInstanceOf[Int] - } - case HCatFieldSchema.Type.TINYINT => - if (o.isInstanceOf[String]) { - vals(i) = o.asInstanceOf[String].toInt.toByte - } - else { - vals(i) = o.asInstanceOf[Byte] - } - case HCatFieldSchema.Type.SMALLINT => - if (o.isInstanceOf[String]) { - vals(i) = o.asInstanceOf[String].toInt.toShort - } - else { - vals(i) = o.asInstanceOf[Short] - } - case HCatFieldSchema.Type.BIGINT => - if (o.isInstanceOf[String]) { - vals(i) = o.asInstanceOf[String].toLong - } - else { - vals(i) = o.asInstanceOf[Long] - } - case HCatFieldSchema.Type.BOOLEAN => - if (o.isInstanceOf[String]) { - vals(i) = o.asInstanceOf[String].toBoolean - } - else { - vals(i) = o.asInstanceOf[Boolean] - } - case HCatFieldSchema.Type.FLOAT => - if (o.isInstanceOf[String]) { - vals(i) = o.asInstanceOf[String].toFloat - } - else { - vals(i) = o.asInstanceOf[Float] - } - case HCatFieldSchema.Type.DOUBLE => - if (o.isInstanceOf[String]) { - vals(i) = o.asInstanceOf[String].toDouble - } - else { - vals(i) = o.asInstanceOf[Double] - } - case HCatFieldSchema.Type.STRING => - vals(i) = o - case HCatFieldSchema.Type.BINARY => - if (o.isInstanceOf[String]) { - throw new RuntimeException("Cannot handle partition keys of type BINARY.") - } - else { - vals(i) = o.asInstanceOf[Array[Byte]] - } - case HCatFieldSchema.Type.ARRAY => - if (o.isInstanceOf[String]) { - throw new RuntimeException("Cannot handle partition keys of type ARRAY.") - } - else { - vals(i) = o.asInstanceOf[List[Object]] - } - case HCatFieldSchema.Type.MAP => - if (o.isInstanceOf[String]) { - throw new RuntimeException("Cannot handle partition keys of type MAP.") - } - else { - vals(i) = o.asInstanceOf[Map[Object, Object]] - } - case HCatFieldSchema.Type.STRUCT => - if (o.isInstanceOf[String]) { - throw new RuntimeException("Cannot handle partition keys of type STRUCT.") - } - else { - vals(i) = o.asInstanceOf[List[Object]] - } - case _ => - throw new RuntimeException("Invalid type " + this.outputSchema.get(i).getType + - " encountered.") - } - - i += 1 - } - createScalaTuple(vals) - } - - private def createScalaTuple(vals: Array[Any]): T = { - - this.fieldNames.length match { - case 1 => - new Tuple1(vals(0)).asInstanceOf[T] - case 2 => - new Tuple2(vals(0), vals(1)).asInstanceOf[T] - case 3 => - new Tuple3(vals(0), vals(1), vals(2)).asInstanceOf[T] - case 4 => - new Tuple4(vals(0), vals(1), vals(2), vals(3)).asInstanceOf[T] - case 5 => - new Tuple5(vals(0), vals(1), vals(2), vals(3), vals(4)).asInstanceOf[T] - case 6 => - new Tuple6(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5)).asInstanceOf[T] - case 7 => - new Tuple7(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6)).asInstanceOf[T] - case 8 => - new Tuple8(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7)) - .asInstanceOf[T] - case 9 => - new Tuple9(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7), - vals(8)).asInstanceOf[T] - case 10 => - new Tuple10(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7), - vals(8), vals(9)).asInstanceOf[T] - case 11 => - new Tuple11(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7), - vals(8), vals(9), vals(10)).asInstanceOf[T] - case 12 => - new Tuple12(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7), - vals(8), vals(9), vals(10), vals(11)).asInstanceOf[T] - case 13 => - 
new Tuple13(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7), - vals(8), vals(9), vals(10), vals(11), vals(12)).asInstanceOf[T] - case 14 => - new Tuple14(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7), - vals(8), vals(9), vals(10), vals(11), vals(12), vals(13)).asInstanceOf[T] - case 15 => - new Tuple15(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7), - vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14)).asInstanceOf[T] - case 16 => - new Tuple16(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7), - vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14), vals(15)) - .asInstanceOf[T] - case 17 => - new Tuple17(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7), - vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14), vals(15), - vals(16)).asInstanceOf[T] - case 18 => - new Tuple18(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7), - vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14), vals(15), - vals(16), vals(17)).asInstanceOf[T] - case 19 => - new Tuple19(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7), - vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14), vals(15), - vals(16), vals(17), vals(18)).asInstanceOf[T] - case 20 => - new Tuple20(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7), - vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14), vals(15), - vals(16), vals(17), vals(18), vals(19)).asInstanceOf[T] - case 21 => - new Tuple21(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7), - vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14), vals(15), - vals(16), vals(17), vals(18), vals(19), vals(20)).asInstanceOf[T] - case 22 => - new Tuple22(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), vals(6), vals(7), - vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14), vals(15), - vals(16), vals(17), vals(18), vals(19), vals(20), vals(21)).asInstanceOf[T] - case _ => - throw new RuntimeException("Only up to 22 fields supported for Scala Tuples.") - - } - - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-jdbc/pom.xml ---------------------------------------------------------------------- diff --git a/flink-staging/flink-jdbc/pom.xml b/flink-staging/flink-jdbc/pom.xml deleted file mode 100644 index 79e26be..0000000 --- a/flink-staging/flink-jdbc/pom.xml +++ /dev/null @@ -1,52 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. 
---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> - - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.flink</groupId> - <artifactId>flink-staging</artifactId> - <version>1.0-SNAPSHOT</version> - <relativePath>..</relativePath> - </parent> - - <artifactId>flink-jdbc</artifactId> - <name>flink-jdbc</name> - - <packaging>jar</packaging> - - <dependencies> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-java</artifactId> - <version>${project.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.derby</groupId> - <artifactId>derby</artifactId> - <version>10.10.1.1</version> - <scope>test</scope> - </dependency> - </dependencies> -</project> http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java b/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java deleted file mode 100644 index eb3ac31..0000000 --- a/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java +++ /dev/null @@ -1,358 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.io.jdbc; - -import java.io.IOException; -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.ResultSet; -import java.sql.ResultSetMetaData; -import java.sql.SQLException; -import java.sql.Statement; - -import org.apache.flink.api.common.io.NonParallelInput; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.flink.api.common.io.DefaultInputSplitAssigner; -import org.apache.flink.api.common.io.RichInputFormat; -import org.apache.flink.api.common.io.statistics.BaseStatistics; -import org.apache.flink.api.java.tuple.Tuple; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.core.io.GenericInputSplit; -import org.apache.flink.core.io.InputSplit; -import org.apache.flink.core.io.InputSplitAssigner; -import org.apache.flink.types.NullValue; - -/** - * InputFormat to read data from a database and generate tuples. - * The InputFormat has to be configured using the supplied InputFormatBuilder. 
- * - * @param <OUT> - * @see Tuple - * @see DriverManager - */ -public class JDBCInputFormat<OUT extends Tuple> extends RichInputFormat<OUT, InputSplit> implements NonParallelInput { - private static final long serialVersionUID = 1L; - - private static final Logger LOG = LoggerFactory.getLogger(JDBCInputFormat.class); - - private String username; - private String password; - private String drivername; - private String dbURL; - private String query; - - private transient Connection dbConn; - private transient Statement statement; - private transient ResultSet resultSet; - - private int[] columnTypes = null; - - public JDBCInputFormat() { - } - - @Override - public void configure(Configuration parameters) { - } - - /** - * Connects to the source database and executes the query. - * - * @param ignored the input split (ignored, since this format is not parallel) - * @throws IOException - */ - @Override - public void open(InputSplit ignored) throws IOException { - try { - establishConnection(); - statement = dbConn.createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY); - resultSet = statement.executeQuery(query); - } catch (SQLException se) { - close(); - throw new IllegalArgumentException("open() failed: " + se.getMessage(), se); - } catch (ClassNotFoundException cnfe) { - throw new IllegalArgumentException("JDBC driver class not found: " + cnfe.getMessage(), cnfe); - } - } - - private void establishConnection() throws SQLException, ClassNotFoundException { - Class.forName(drivername); - if (username == null) { - dbConn = DriverManager.getConnection(dbURL); - } else { - dbConn = DriverManager.getConnection(dbURL, username, password); - } - } - - /** - * Closes all resources used. - * - * @throws IOException Indicates that a resource could not be closed. - */ - @Override - public void close() throws IOException { - try { - resultSet.close(); - } catch (SQLException se) { - LOG.info("Inputformat couldn't be closed - " + se.getMessage()); - } catch (NullPointerException npe) { - } - try { - statement.close(); - } catch (SQLException se) { - LOG.info("Inputformat couldn't be closed - " + se.getMessage()); - } catch (NullPointerException npe) { - } - try { - dbConn.close(); - } catch (SQLException se) { - LOG.info("Inputformat couldn't be closed - " + se.getMessage()); - } catch (NullPointerException npe) { - } - } - - /** - * Checks whether all data has been read. - * - * @return boolean value indicating whether all data has been read.
- * @throws IOException - */ - @Override - public boolean reachedEnd() throws IOException { - try { - if (resultSet.isLast()) { - close(); - return true; - } - return false; - } catch (SQLException se) { - throw new IOException("Couldn't evaluate reachedEnd() - " + se.getMessage(), se); - } - } - - /** - * Stores the next resultSet row in a tuple. - * - * @param tuple the tuple to be reused for the next row - * @return tuple containing next row - * @throws java.io.IOException - */ - @Override - public OUT nextRecord(OUT tuple) throws IOException { - try { - resultSet.next(); - if (columnTypes == null) { - extractTypes(tuple); - } - addValue(tuple); - return tuple; - } catch (SQLException se) { - close(); - throw new IOException("Couldn't read data - " + se.getMessage(), se); - } catch (NullPointerException npe) { - close(); - throw new IOException("Couldn't access resultSet", npe); - } - } - - private void extractTypes(OUT tuple) throws SQLException, IOException { - ResultSetMetaData resultSetMetaData = resultSet.getMetaData(); - columnTypes = new int[resultSetMetaData.getColumnCount()]; - if (tuple.getArity() != columnTypes.length) { - close(); - throw new IOException("Tuple arity does not match column count"); - } - for (int pos = 0; pos < columnTypes.length; pos++) { - columnTypes[pos] = resultSetMetaData.getColumnType(pos + 1); - } - } - - /** - * Writes the values of the current resultSet row into the given tuple. - * - * @param reuse Target tuple. - */ - private void addValue(OUT reuse) throws SQLException { - for (int pos = 0; pos < columnTypes.length; pos++) { - switch (columnTypes[pos]) { - case java.sql.Types.NULL: - reuse.setField(NullValue.getInstance(), pos); - break; - case java.sql.Types.BOOLEAN: - reuse.setField(resultSet.getBoolean(pos + 1), pos); - break; - case java.sql.Types.BIT: - reuse.setField(resultSet.getBoolean(pos + 1), pos); - break; - case java.sql.Types.CHAR: - reuse.setField(resultSet.getString(pos + 1), pos); - break; - case java.sql.Types.NCHAR: - reuse.setField(resultSet.getString(pos + 1), pos); - break; - case java.sql.Types.VARCHAR: - reuse.setField(resultSet.getString(pos + 1), pos); - break; - case java.sql.Types.LONGVARCHAR: - reuse.setField(resultSet.getString(pos + 1), pos); - break; - case java.sql.Types.LONGNVARCHAR: - reuse.setField(resultSet.getString(pos + 1), pos); - break; - case java.sql.Types.TINYINT: - reuse.setField(resultSet.getShort(pos + 1), pos); - break; - case java.sql.Types.SMALLINT: - reuse.setField(resultSet.getShort(pos + 1), pos); - break; - case java.sql.Types.BIGINT: - reuse.setField(resultSet.getLong(pos + 1), pos); - break; - case java.sql.Types.INTEGER: - reuse.setField(resultSet.getInt(pos + 1), pos); - break; - case java.sql.Types.FLOAT: - reuse.setField(resultSet.getDouble(pos + 1), pos); - break; - case java.sql.Types.REAL: - reuse.setField(resultSet.getFloat(pos + 1), pos); - break; - case java.sql.Types.DOUBLE: - reuse.setField(resultSet.getDouble(pos + 1), pos); - break; - case java.sql.Types.DECIMAL: - reuse.setField(resultSet.getBigDecimal(pos + 1).doubleValue(), pos); - break; - case java.sql.Types.NUMERIC: - reuse.setField(resultSet.getBigDecimal(pos + 1).doubleValue(), pos); - break; - case java.sql.Types.DATE: - reuse.setField(resultSet.getDate(pos + 1).toString(), pos); - break; - case java.sql.Types.TIME: - reuse.setField(resultSet.getTime(pos + 1).getTime(), pos); - break; - case java.sql.Types.TIMESTAMP: - reuse.setField(resultSet.getTimestamp(pos + 1).toString(), pos); - break; - case java.sql.Types.SQLXML: - reuse.setField(resultSet.getSQLXML(pos +
1).toString(), pos); - break; - default: - throw new SQLException("Unsupported sql-type [" + columnTypes[pos] + "] on column [" + pos + "]"); - - // case java.sql.Types.BINARY: - // case java.sql.Types.VARBINARY: - // case java.sql.Types.LONGVARBINARY: - // case java.sql.Types.ARRAY: - // case java.sql.Types.JAVA_OBJECT: - // case java.sql.Types.BLOB: - // case java.sql.Types.CLOB: - // case java.sql.Types.NCLOB: - // case java.sql.Types.DATALINK: - // case java.sql.Types.DISTINCT: - // case java.sql.Types.OTHER: - // case java.sql.Types.REF: - // case java.sql.Types.ROWID: - // case java.sql.Types.STRUCT: - } - } - } - - @Override - public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException { - return cachedStatistics; - } - - @Override - public InputSplit[] createInputSplits(int minNumSplits) throws IOException { - GenericInputSplit[] split = { - new GenericInputSplit(0, 1) - }; - return split; - } - - @Override - public InputSplitAssigner getInputSplitAssigner(InputSplit[] inputSplits) { - return new DefaultInputSplitAssigner(inputSplits); - } - - - /** - * A builder used to set parameters to the input format's configuration in a fluent way. - * @return builder - */ - public static JDBCInputFormatBuilder buildJDBCInputFormat() { - return new JDBCInputFormatBuilder(); - } - - public static class JDBCInputFormatBuilder { - private final JDBCInputFormat format; - - public JDBCInputFormatBuilder() { - this.format = new JDBCInputFormat(); - } - - public JDBCInputFormatBuilder setUsername(String username) { - format.username = username; - return this; - } - - public JDBCInputFormatBuilder setPassword(String password) { - format.password = password; - return this; - } - - public JDBCInputFormatBuilder setDrivername(String drivername) { - format.drivername = drivername; - return this; - } - - public JDBCInputFormatBuilder setDBUrl(String dbURL) { - format.dbURL = dbURL; - return this; - } - - public JDBCInputFormatBuilder setQuery(String query) { - format.query = query; - return this; - } - - public JDBCInputFormat finish() { - if (format.username == null) { - LOG.info("Username was not supplied separately."); - } - if (format.password == null) { - LOG.info("Password was not supplied separately."); - } - if (format.dbURL == null) { - throw new IllegalArgumentException("No database URL supplied."); - } - if (format.query == null) { - throw new IllegalArgumentException("No query supplied."); - } - if (format.drivername == null) { - throw new IllegalArgumentException("No driver supplied."); - } - return format; - } - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java b/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java deleted file mode 100644 index 614c5b7..0000000 --- a/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java +++ /dev/null @@ -1,270 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership.
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.io.jdbc; - -import java.io.IOException; -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.PreparedStatement; -import java.sql.SQLException; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.flink.api.common.io.RichOutputFormat; -import org.apache.flink.api.java.tuple.Tuple; -import org.apache.flink.configuration.Configuration; - -/** - * OutputFormat to write tuples into a database. - * The OutputFormat has to be configured using the supplied OutputFormatBuilder. - * - * @param <OUT> - * @see Tuple - * @see DriverManager - */ -public class JDBCOutputFormat<OUT extends Tuple> extends RichOutputFormat<OUT> { - private static final long serialVersionUID = 1L; - - private static final Logger LOG = LoggerFactory.getLogger(JDBCOutputFormat.class); - - private String username; - private String password; - private String drivername; - private String dbURL; - private String query; - private int batchInterval = 5000; - - private Connection dbConn; - private PreparedStatement upload; - - private SupportedTypes[] types = null; - - private int batchCount = 0; - - public JDBCOutputFormat() { - } - - @Override - public void configure(Configuration parameters) { - } - - /** - * Connects to the target database and initializes the prepared statement. - * - * @param taskNumber The number of the parallel instance. - * @param numTasks The number of parallel tasks. - * @throws IOException Thrown, if the output could not be opened due to an - * I/O problem. - */ - @Override - public void open(int taskNumber, int numTasks) throws IOException { - try { - establishConnection(); - upload = dbConn.prepareStatement(query); - } catch (SQLException sqe) { - close(); - throw new IllegalArgumentException("open() failed.", sqe); - } catch (ClassNotFoundException cnfe) { - close(); - throw new IllegalArgumentException("JDBC driver class not found.", cnfe); - } - } - - private void establishConnection() throws SQLException, ClassNotFoundException { - Class.forName(drivername); - if (username == null) { - dbConn = DriverManager.getConnection(dbURL); - } else { - dbConn = DriverManager.getConnection(dbURL, username, password); - } - } - - private enum SupportedTypes { - BOOLEAN, - BYTE, - SHORT, - INTEGER, - LONG, - STRING, - FLOAT, - DOUBLE - } - - /** - * Adds a record to the prepared statement. - * <p> - * When this method is called, the output format is guaranteed to be opened. - * - * @param tuple The record to add to the output. - * @throws IOException Thrown, if the record could not be added due to an I/O problem.
- */ - @Override - public void writeRecord(OUT tuple) throws IOException { - try { - if (types == null) { - extractTypes(tuple); - } - addValues(tuple); - upload.addBatch(); - batchCount++; - if (batchCount >= batchInterval) { - upload.executeBatch(); - batchCount = 0; - } - } catch (SQLException sqe) { - close(); - throw new IllegalArgumentException("writeRecord() failed", sqe); - } catch (IllegalArgumentException iae) { - close(); - throw new IllegalArgumentException("writeRecord() failed", iae); - } - } - - private void extractTypes(OUT tuple) { - types = new SupportedTypes[tuple.getArity()]; - for (int x = 0; x < tuple.getArity(); x++) { - types[x] = SupportedTypes.valueOf(tuple.getField(x).getClass().getSimpleName().toUpperCase()); - } - } - - private void addValues(OUT tuple) throws SQLException { - for (int index = 0; index < tuple.getArity(); index++) { - switch (types[index]) { - case BOOLEAN: - upload.setBoolean(index + 1, (Boolean) tuple.getField(index)); - break; - case BYTE: - upload.setByte(index + 1, (Byte) tuple.getField(index)); - break; - case SHORT: - upload.setShort(index + 1, (Short) tuple.getField(index)); - break; - case INTEGER: - upload.setInt(index + 1, (Integer) tuple.getField(index)); - break; - case LONG: - upload.setLong(index + 1, (Long) tuple.getField(index)); - break; - case STRING: - upload.setString(index + 1, (String) tuple.getField(index)); - break; - case FLOAT: - upload.setFloat(index + 1, (Float) tuple.getField(index)); - break; - case DOUBLE: - upload.setDouble(index + 1, (Double) tuple.getField(index)); - break; - } - } - } - - /** - * Executes the prepared statement and closes all resources of this instance. - * - * @throws IOException Thrown, if the output could not be closed properly. - */ - @Override - public void close() throws IOException { - try { - upload.executeBatch(); - batchCount = 0; - } catch (SQLException se) { - throw new IllegalArgumentException("close() failed", se); - } catch (NullPointerException npe) { - } - try { - upload.close(); - } catch (SQLException se) { - LOG.info("Outputformat couldn't be closed - " + se.getMessage()); - } catch (NullPointerException npe) { - } - try { - dbConn.close(); - } catch (SQLException se) { - LOG.info("Outputformat couldn't be closed - " + se.getMessage()); - } catch (NullPointerException npe) { - } - } - - public static JDBCOutputFormatBuilder buildJDBCOutputFormat() { - return new JDBCOutputFormatBuilder(); - } - - public static class JDBCOutputFormatBuilder { - private final JDBCOutputFormat format; - - protected JDBCOutputFormatBuilder() { - this.format = new JDBCOutputFormat(); - } - - public JDBCOutputFormatBuilder setUsername(String username) { - format.username = username; - return this; - } - - public JDBCOutputFormatBuilder setPassword(String password) { - format.password = password; - return this; - } - - public JDBCOutputFormatBuilder setDrivername(String drivername) { - format.drivername = drivername; - return this; - } - - public JDBCOutputFormatBuilder setDBUrl(String dbURL) { - format.dbURL = dbURL; - return this; - } - - public JDBCOutputFormatBuilder setQuery(String query) { - format.query = query; - return this; - } - - public JDBCOutputFormatBuilder setBatchInterval(int batchInterval) { - format.batchInterval = batchInterval; - return this; - } - - /** - Finalizes the configuration and checks validity.
- @return Configured JDBCOutputFormat - */ - public JDBCOutputFormat finish() { - if (format.username == null) { - LOG.info("Username was not supplied separately."); - } - if (format.password == null) { - LOG.info("Password was not supplied separately."); - } - if (format.dbURL == null) { - throw new IllegalArgumentException("No database URL supplied."); - } - if (format.query == null) { - throw new IllegalArgumentException("No query supplied."); - } - if (format.drivername == null) { - throw new IllegalArgumentException("No driver supplied."); - } - return format; - } - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/example/JDBCExample.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/example/JDBCExample.java b/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/example/JDBCExample.java deleted file mode 100644 index 7b012ba..0000000 --- a/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/example/JDBCExample.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License.
- */ - -package org.apache.flink.api.java.io.jdbc.example; - -import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.DOUBLE_TYPE_INFO; -import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.INT_TYPE_INFO; -import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO; - -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.Statement; - -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.io.jdbc.JDBCInputFormat; -import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat; -import org.apache.flink.api.java.tuple.Tuple5; -import org.apache.flink.api.java.typeutils.TupleTypeInfo; - -public class JDBCExample { - - public static void main(String[] args) throws Exception { - prepareTestDb(); - - ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment(); - DataSet<Tuple5> source - = environment.createInput(JDBCInputFormat.buildJDBCInputFormat() - .setDrivername("org.apache.derby.jdbc.EmbeddedDriver") - .setDBUrl("jdbc:derby:memory:ebookshop") - .setQuery("select * from books") - .finish(), - new TupleTypeInfo(Tuple5.class, INT_TYPE_INFO, STRING_TYPE_INFO, STRING_TYPE_INFO, DOUBLE_TYPE_INFO, INT_TYPE_INFO) - ); - - source.output(JDBCOutputFormat.buildJDBCOutputFormat() - .setDrivername("org.apache.derby.jdbc.EmbeddedDriver") - .setDBUrl("jdbc:derby:memory:ebookshop") - .setQuery("insert into newbooks (id,title,author,price,qty) values (?,?,?,?,?)") - .finish()); - environment.execute(); - } - - private static void prepareTestDb() throws Exception { - String dbURL = "jdbc:derby:memory:ebookshop;create=true"; - Class.forName("org.apache.derby.jdbc.EmbeddedDriver"); - Connection conn = DriverManager.getConnection(dbURL); - - StringBuilder sqlQueryBuilder = new StringBuilder("CREATE TABLE books ("); - sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,"); - sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,"); - sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,"); - sqlQueryBuilder.append("price FLOAT DEFAULT NULL,"); - sqlQueryBuilder.append("qty INT DEFAULT NULL,"); - sqlQueryBuilder.append("PRIMARY KEY (id))"); - - Statement stat = conn.createStatement(); - stat.executeUpdate(sqlQueryBuilder.toString()); - stat.close(); - - sqlQueryBuilder = new StringBuilder("CREATE TABLE newbooks ("); - sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,"); - sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,"); - sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,"); - sqlQueryBuilder.append("price FLOAT DEFAULT NULL,"); - sqlQueryBuilder.append("qty INT DEFAULT NULL,"); - sqlQueryBuilder.append("PRIMARY KEY (id))"); - - stat = conn.createStatement(); - stat.executeUpdate(sqlQueryBuilder.toString()); - stat.close(); - - sqlQueryBuilder = new StringBuilder("INSERT INTO books (id, title, author, price, qty) VALUES "); - sqlQueryBuilder.append("(1001, 'Java for dummies', 'Tan Ah Teck', 11.11, 11),"); - sqlQueryBuilder.append("(1002, 'More Java for dummies', 'Tan Ah Teck', 22.22, 22),"); - sqlQueryBuilder.append("(1003, 'More Java for more dummies', 'Mohammad Ali', 33.33, 33),"); - sqlQueryBuilder.append("(1004, 'A Cup of Java', 'Kumar', 44.44, 44),"); - sqlQueryBuilder.append("(1005, 'A Teaspoon of Java', 'Kevin Jones', 55.55, 55)"); - - stat = conn.createStatement(); - stat.execute(sqlQueryBuilder.toString()); - stat.close(); - - conn.close(); - } -}
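The deleted JDBCExample above shows the intended end-to-end wiring of the two removed formats. For reference, here is a minimal sketch of the same builder API, written with explicit tuple generics rather than the raw Tuple5/TupleTypeInfo forms used above. The class name JDBCRoundTrip is illustrative only; the sketch assumes the embedded Derby driver on the classpath and the books/newbooks tables created by prepareTestDb() above.

package org.apache.flink.api.java.io.jdbc.example;

import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.DOUBLE_TYPE_INFO;
import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.INT_TYPE_INFO;
import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;

// Illustrative only: the same wiring as JDBCExample, with explicit generics.
public class JDBCRoundTrip {

	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		// Non-parallel read: the input format executes the query once and maps
		// SQL column types to tuple fields row by row (see addValue() above).
		DataSet<Tuple5<Integer, String, String, Double, Integer>> books = env.createInput(
				JDBCInputFormat.buildJDBCInputFormat()
						.setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
						.setDBUrl("jdbc:derby:memory:ebookshop")
						.setQuery("select * from books")
						.finish(),
				new TupleTypeInfo<Tuple5<Integer, String, String, Double, Integer>>(
						INT_TYPE_INFO, STRING_TYPE_INFO, STRING_TYPE_INFO, DOUBLE_TYPE_INFO, INT_TYPE_INFO));

		// Write-back through a parameterized INSERT; the output format buffers
		// rows with addBatch() and flushes them via executeBatch() once
		// batchInterval records (default 5000) have accumulated.
		books.output(JDBCOutputFormat.buildJDBCOutputFormat()
				.setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
				.setDBUrl("jdbc:derby:memory:ebookshop")
				.setQuery("insert into newbooks (id,title,author,price,qty) values (?,?,?,?,?)")
				.setBatchInterval(1000)
				.finish());

		env.execute();
	}
}

This assumes the TupleTypeInfo varargs constructor that infers the tuple class from the number of field types; the Tuple5.class form used in JDBCExample works as well, at the cost of an unchecked warning.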