Updated Branches: refs/heads/master 43c4e83a2 -> 2576896c9
CRUNCH-92 : Read data from RDBMS - Create DataBaseSource and IT test in crunch-contrib - Make users of DataBaseSource implement a DBWritable & Writable to handle their own serialization, to avoid invisible errors in serialization. [Update from Gabriel] Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/2576896c Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/2576896c Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/2576896c Branch: refs/heads/master Commit: 2576896c91a0400af07e5fe32c1edd7adea29b48 Parents: 43c4e83 Author: Rahul Sharma <[email protected]> Authored: Tue Oct 16 09:32:33 2012 +0530 Committer: Rahul Sharma <[email protected]> Committed: Tue Oct 16 09:32:33 2012 +0530 ---------------------------------------------------------------------- crunch-contrib/pom.xml | 7 + .../crunch/contrib/io/jdbc/DataBaseSourceIT.java | 123 +++++++++++++++ crunch-contrib/src/it/resources/data.script | 22 +++ .../crunch/contrib/io/jdbc/DataBaseSource.java | 121 ++++++++++++++ .../crunch/contrib/io/jdbc/package-info.java | 23 +++ 5 files changed, 296 insertions(+), 0 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2576896c/crunch-contrib/pom.xml ---------------------------------------------------------------------- diff --git a/crunch-contrib/pom.xml b/crunch-contrib/pom.xml index c96f219..e98509e 100644 --- a/crunch-contrib/pom.xml +++ b/crunch-contrib/pom.xml @@ -53,6 +53,13 @@ under the License. 
<artifactId>hadoop-client</artifactId> <scope>provided</scope> </dependency> + + <dependency> + <groupId>com.h2database</groupId> + <artifactId>h2</artifactId> + <version>1.3.168</version> + <scope>test</scope> + </dependency> </dependencies> http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2576896c/crunch-contrib/src/it/java/org/apache/crunch/contrib/io/jdbc/DataBaseSourceIT.java ---------------------------------------------------------------------- diff --git a/crunch-contrib/src/it/java/org/apache/crunch/contrib/io/jdbc/DataBaseSourceIT.java b/crunch-contrib/src/it/java/org/apache/crunch/contrib/io/jdbc/DataBaseSourceIT.java new file mode 100644 index 0000000..8fdb22d --- /dev/null +++ b/crunch-contrib/src/it/java/org/apache/crunch/contrib/io/jdbc/DataBaseSourceIT.java @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.crunch.contrib.io.jdbc; + +import static org.junit.Assert.assertEquals; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.File; +import java.io.IOException; +import java.io.Serializable; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.List; + +import org.apache.crunch.DoFn; +import org.apache.crunch.Emitter; +import org.apache.crunch.PCollection; +import org.apache.crunch.Pipeline; +import org.apache.crunch.contrib.io.jdbc.DataBaseSource; +import org.apache.crunch.impl.mr.MRPipeline; +import org.apache.crunch.test.CrunchTestSupport; +import org.apache.crunch.types.writable.Writables; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.lib.db.DBWritable; +import org.h2.tools.RunScript; +import org.h2.tools.Server; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; + +public class DataBaseSourceIT extends CrunchTestSupport implements Serializable { + private transient Server server; + + @Before + public void start() throws Exception { + File file = tempDir.copyResourceFile("data.script"); + server = Server.createTcpServer().start(); + RunScript.execute("jdbc:h2:tcp://localhost/~/test", "sa", "", file.getAbsolutePath(), "utf-8", false); + } + + @After + public void stop() throws Exception { + server.stop(); + } + + public static class IdentifiableName implements DBWritable, Writable { + + public IntWritable id = new IntWritable(); + public Text name = new Text(); + + @Override + public void readFields(DataInput in) throws IOException { + id.readFields(in); + name.readFields(in); + } + + @Override + public void write(DataOutput out) throws IOException { + id.write(out); + name.write(out); + } + + @Override + public void readFields(ResultSet resultSet) 
throws SQLException { + id.set(resultSet.getInt(1)); + name.set(resultSet.getString(2)); + } + + @Override + public void write(PreparedStatement preparedStatement) throws SQLException { + throw new UnsupportedOperationException("Not implemented"); + } + + } + + @Test + public void testReadFromSource() throws Exception { + Pipeline pipeline = new MRPipeline(DataBaseSourceIT.class); + DataBaseSource<IdentifiableName> dbsrc = new DataBaseSource.Builder<IdentifiableName>(IdentifiableName.class) + .setDriverClass(org.h2.Driver.class) + .setUrl("jdbc:h2:tcp://localhost/~/test").setUsername("sa").setPassword("") + .selectSQLQuery("SELECT ID, NAME FROM TEST").countSQLQuery("select count(*) from Test").build(); + + PCollection<IdentifiableName> cdidata = pipeline.read(dbsrc); + PCollection<String> names = cdidata.parallelDo(new DoFn<IdentifiableName, String>() { + + @Override + public void process(IdentifiableName input, Emitter<String> emitter) { + emitter.emit(input.name.toString()); + } + + }, Writables.strings()); + + List<String> nameList = Lists.newArrayList(names.materialize()); + pipeline.done(); + + assertEquals(2, nameList.size()); + assertEquals(Sets.newHashSet("Hello", "World"), Sets.newHashSet(nameList)); + + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2576896c/crunch-contrib/src/it/resources/data.script ---------------------------------------------------------------------- diff --git a/crunch-contrib/src/it/resources/data.script b/crunch-contrib/src/it/resources/data.script new file mode 100644 index 0000000..16fc151 --- /dev/null +++ b/crunch-contrib/src/it/resources/data.script @@ -0,0 +1,22 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. 
The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. +-- +DROP TABLE IF EXISTS TEST; +CREATE TABLE TEST(ID INT PRIMARY KEY, NAME VARCHAR(255)); +INSERT INTO TEST VALUES(1, 'Hello'); +INSERT INTO TEST VALUES(2, 'World'); http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2576896c/crunch-contrib/src/main/java/org/apache/crunch/contrib/io/jdbc/DataBaseSource.java ---------------------------------------------------------------------- diff --git a/crunch-contrib/src/main/java/org/apache/crunch/contrib/io/jdbc/DataBaseSource.java b/crunch-contrib/src/main/java/org/apache/crunch/contrib/io/jdbc/DataBaseSource.java new file mode 100644 index 0000000..23ca685 --- /dev/null +++ b/crunch-contrib/src/main/java/org/apache/crunch/contrib/io/jdbc/DataBaseSource.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.contrib.io.jdbc; + +import java.io.IOException; +import java.sql.Driver; + +import org.apache.crunch.Source; +import org.apache.crunch.types.PType; +import org.apache.crunch.types.writable.Writables; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.db.DBConfiguration; +import org.apache.hadoop.mapreduce.lib.db.DBInputFormat; +import org.apache.hadoop.mapreduce.lib.db.DBWritable; + +/** + * Source for reading from a database via a JDBC connection. Underlying + * database reading is provided by {@link DBInputFormat}. + * <p> + * A type that is input via this class must be a Writable that also implements + * DBWritable. Only the {@link DBWritable#readFields(java.sql.ResultSet)} method + * needs to be fully implemented from {@link DBWritable}. 
+ * + * @param <T> The input type of this source + */ +public class DataBaseSource<T extends DBWritable & Writable> implements Source<T> { + + private Class<T> inputClass; + private PType<T> ptype; + private String driverClass; + private String url; + private String username; + private String password; + private String selectClause; + public String countClause; + + private DataBaseSource(Class<T> inputClass) { + this.inputClass = inputClass; + this.ptype = Writables.writables(inputClass); + } + + static class Builder<T extends DBWritable & Writable> { + + private DataBaseSource<T> dataBaseSource; + + public Builder(Class<T> inputClass) { + this.dataBaseSource = new DataBaseSource<T>(inputClass); + } + + Builder<T> setDriverClass(Class<? extends Driver> driverClass) { + dataBaseSource.driverClass = driverClass.getName(); + return this; + } + + Builder<T> setUrl(String url) { + dataBaseSource.url = url; + return this; + } + + Builder<T> setUsername(String username) { + dataBaseSource.username = username; + return this; + } + + Builder<T> setPassword(String password) { + dataBaseSource.password = password; + return this; + } + + Builder<T> selectSQLQuery(String selectClause) { + dataBaseSource.selectClause = selectClause; + return this; + } + + Builder<T> countSQLQuery(String countClause) { + dataBaseSource.countClause = countClause; + return this; + } + + DataBaseSource<T> build() { + return dataBaseSource; + } + } + + @Override + public void configureSource(Job job, int inputId) throws IOException { + Configuration configuration = job.getConfiguration(); + DBConfiguration.configureDB(configuration, driverClass, url, username, password); + job.setInputFormatClass(DBInputFormat.class); + DBInputFormat.setInput(job, inputClass, selectClause, countClause); + } + + @Override + public long getSize(Configuration configuration) { + // TODO Do something smarter here + return 1000 * 1000; + } + + @Override + public PType<T> getType() { + return ptype; + } + +} \ No newline at 
end of file http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2576896c/crunch-contrib/src/main/java/org/apache/crunch/contrib/io/jdbc/package-info.java ---------------------------------------------------------------------- diff --git a/crunch-contrib/src/main/java/org/apache/crunch/contrib/io/jdbc/package-info.java b/crunch-contrib/src/main/java/org/apache/crunch/contrib/io/jdbc/package-info.java new file mode 100644 index 0000000..e6cefe9 --- /dev/null +++ b/crunch-contrib/src/main/java/org/apache/crunch/contrib/io/jdbc/package-info.java @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Support for reading data from RDBMS using JDBC + */ +package org.apache.crunch.contrib.io.jdbc; +
