Add missing files in the tajo-hcatalog driver
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/63c2fb6c
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/63c2fb6c
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/63c2fb6c

Branch: refs/heads/branch-0.10.1
Commit: 63c2fb6c5ab9ca5712f7af0b6d3e0e015d7e4b7d
Parents: 19554d8
Author: Jihoon Son <[email protected]>
Authored: Fri May 15 10:47:02 2015 +0900
Committer: Jihoon Son <[email protected]>
Committed: Fri May 15 10:47:02 2015 +0900

----------------------------------------------------------------------
 .../tajo-catalog-drivers/tajo-hcatalog/pom.xml  | 739 +++++++++++++++
 .../tajo/catalog/store/HCatalogStore.java       | 891 +++++++++++++++++++
 .../catalog/store/HCatalogStoreClientPool.java  | 170 ++++
 .../apache/tajo/catalog/store/HCatalogUtil.java | 147 +++
 .../tajo/catalog/store/TestHCatalogStore.java   | 402 +++++++++
 5 files changed, 2349 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/63c2fb6c/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/pom.xml b/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/pom.xml
new file mode 100644
index 0000000..fe8f34a
--- /dev/null
+++ b/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/pom.xml
@@ -0,0 +1,739 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <artifactId>tajo-project</artifactId>
+    <groupId>org.apache.tajo</groupId>
+    <version>0.11.0-SNAPSHOT</version>
+    <relativePath>../../../tajo-project</relativePath>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+  <artifactId>tajo-hcatalog</artifactId>
+  <packaging>jar</packaging>
+  <name>Tajo Catalog Drivers HCatalog</name>
+  <properties>
+    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+  </properties>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <configuration>
+          <source>1.6</source>
+          <target>1.6</target>
+          <encoding>${project.build.sourceEncoding}</encoding>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.rat</groupId>
+        <artifactId>apache-rat-plugin</artifactId>
+        <executions>
+          <execution>
+            <phase>verify</phase>
+            <goals>
+              <goal>check</goal>
+            </goals>
+          </execution>
+        </executions>
+        <configuration>
+          <excludes>
+            <exclude>derby.log</exclude>
+          </excludes>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-dependency-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>copy-dependencies</id>
+            <phase>prepare-package</phase>
+            <goals>
+              <goal>copy-dependencies</goal>
+            </goals>
+            <configuration>
+              <includeScope>runtime</includeScope>
+              <outputDirectory>${project.build.directory}/lib</outputDirectory>
+              <overWriteReleases>false</overWriteReleases>
+              <overWriteSnapshots>false</overWriteSnapshots>
+              <overWriteIfNewer>true</overWriteIfNewer>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-report-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.tajo</groupId>
+      <artifactId>tajo-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tajo</groupId>
+      <artifactId>tajo-catalog-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tajo</groupId>
+      <artifactId>tajo-catalog-client</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tajo</groupId>
+      <artifactId>tajo-catalog-server</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tajo</groupId>
+      <artifactId>tajo-rpc</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tajo</groupId>
+      <artifactId>tajo-storage-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.thrift</groupId>
+      <artifactId>libfb303</artifactId>
+      <version>0.9.0</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.thrift</groupId>
+      <artifactId>libthrift</artifactId>
+      <version>0.9.0</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-core</artifactId>
+      <version>${hadoop.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <version>${hadoop.version}</version>
+      <scope>provided</scope>
+    </dependency>
+  </dependencies>
+
+  <profiles>
+    <profile>
+      <id>hcatalog-0.12.0</id>
+      <activation>
+        <activeByDefault>false</activeByDefault>
+      </activation>
+      <properties>
+        <hive.version>0.12.0</hive.version>
+        <parquet.version>1.5.0</parquet.version>
+        <parquet.format.version>2.1.0</parquet.format.version>
+      </properties>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.hive</groupId>
+          <artifactId>hive-exec</artifactId>
+          <version>${hive.version}</version>
+          <scope>provided</scope>
+          <exclusions>
+            <exclusion>
+              <groupId>org.apache.hive</groupId>
+              <artifactId>hive-common</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>org.apache.hive</groupId>
+              <artifactId>hive-contrib</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>org.apache.hive</groupId>
+              <artifactId>hive-hbase-handler</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>org.apache.hive</groupId>
+              <artifactId>hive-metastore</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>org.apache.hive</groupId>
+              <artifactId>hive-serde</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>org.apache.hive</groupId>
+              <artifactId>hive-shims</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>org.apache.hive</groupId>
+              <artifactId>hive-testutils</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>org.apache.thrift</groupId>
+              <artifactId>libfb303</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>org.apache.thrift</groupId>
+              <artifactId>libthrift</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>com.jolbox</groupId>
+              <artifactId>bonecp</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>com.google.protobuf</groupId>
+              <artifactId>protobuf-java</artifactId>
+            </exclusion>
+          </exclusions>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hive</groupId>
+          <artifactId>hive-metastore</artifactId>
+          <version>${hive.version}</version>
+          <scope>provided</scope>
+          <exclusions>
+            <exclusion>
+              <groupId>org.apache.hive</groupId>
+              <artifactId>hive-common</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>org.apache.hive</groupId>
+              <artifactId>hive-serde</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>org.apache.hive</groupId>
+              <artifactId>hive-shims</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>org.apache.thrift</groupId>
+              <artifactId>libfb303</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>org.apache.thrift</groupId>
+              <artifactId>libthrift</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>com.jolbox</groupId>
+              <artifactId>bonecp</artifactId>
+            </exclusion>
+          </exclusions>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hive</groupId>
+          <artifactId>hive-cli</artifactId>
+          <version>${hive.version}</version>
+          <scope>provided</scope>
+          <exclusions>
+            <exclusion>
+              <groupId>org.apache.hive</groupId>
+              <artifactId>hive-common</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>org.apache.hive</groupId>
+              <artifactId>hive-exec</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>org.apache.hive</groupId>
+              <artifactId>hive-metastore</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>org.apache.hive</groupId>
+              <artifactId>hive-serde</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>org.apache.hive</groupId>
+              <artifactId>hive-service</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>org.apache.hive</groupId>
+              <artifactId>hive-shims</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>com.jolbox</groupId>
+              <artifactId>bonecp</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>jline</groupId>
+              <artifactId>jline</artifactId>
+            </exclusion>
+          </exclusions>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hive.hcatalog</groupId>
+          <artifactId>hcatalog-core</artifactId>
+          <version>${hive.version}</version>
+          <exclusions>
+            <exclusion>
+              <groupId>org.apache.hive</groupId>
+              <artifactId>hive-cli</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>org.apache.hive</groupId>
+              <artifactId>hive-common</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>org.apache.hive</groupId>
+              <artifactId>hive-exec</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>org.apache.hive</groupId>
+              <artifactId>hive-metastore</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>org.apache.hive</groupId>
+              <artifactId>hive-serde</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>org.apache.hive</groupId>
+              <artifactId>hive-service</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>org.apache.hive</groupId>
+              <artifactId>hive-shims</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>com.jolbox</groupId>
+              <artifactId>bonecp</artifactId>
+            </exclusion>
+          </exclusions>
+        </dependency>
+        <dependency>
+          <groupId>com.twitter</groupId>
+          <artifactId>parquet-hive-bundle</artifactId>
+          <version>${parquet.version}</version>
+        </dependency>
+      </dependencies>
+    </profile>
+    <profile>
+      <id>hcatalog-0.13.0</id>
+      <activation>
+        <activeByDefault>false</activeByDefault>
+      </activation>
+      <properties>
+        <hive.version>0.13.0</hive.version>
+        <parquet.version>1.5.0</parquet.version>
+        <parquet.format.version>2.1.0</parquet.format.version>
+      </properties>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.hive</groupId>
+          <artifactId>hive-exec</artifactId>
+          <version>${hive.version}</version>
+          <scope>provided</scope>
+          <exclusions>
+            <exclusion>
+              <groupId>org.apache.hive</groupId>
+              <artifactId>hive-common</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>org.apache.hive</groupId>
+              <artifactId>hive-contrib</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>org.apache.hive</groupId>
+              <artifactId>hive-hbase-handler</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>org.apache.hive</groupId>
+              <artifactId>hive-metastore</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>org.apache.hive</groupId>
+              <artifactId>hive-serde</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>org.apache.hive</groupId>
+              <artifactId>hive-shims</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>org.apache.hive</groupId>
+              <artifactId>hive-testutils</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>org.apache.thrift</groupId>
+              <artifactId>libfb303</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>org.apache.thrift</groupId>
+              <artifactId>libthrift</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>com.jolbox</groupId>
+              <artifactId>bonecp</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>com.google.protobuf</groupId>
+              <artifactId>protobuf-java</artifactId>
+            </exclusion>
+          </exclusions>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hive</groupId>
+          <artifactId>hive-metastore</artifactId>
+          <version>${hive.version}</version>
+          <scope>provided</scope>
+          <exclusions>
+            <exclusion>
+              <groupId>org.apache.hive</groupId>
+              <artifactId>hive-common</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>org.apache.hive</groupId>
+              <artifactId>hive-serde</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>org.apache.hive</groupId>
+              <artifactId>hive-shims</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>org.apache.thrift</groupId>
+              <artifactId>libfb303</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>org.apache.thrift</groupId>
+              <artifactId>libthrift</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>com.jolbox</groupId>
+              <artifactId>bonecp</artifactId>
+            </exclusion>
+          </exclusions>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hive</groupId>
+          <artifactId>hive-cli</artifactId>
+          <version>${hive.version}</version>
+          <scope>provided</scope>
+          <exclusions>
+            <exclusion>
+              <groupId>org.apache.hive</groupId>
+              <artifactId>hive-common</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>org.apache.hive</groupId>
+              <artifactId>hive-exec</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>org.apache.hive</groupId>
+              <artifactId>hive-metastore</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>org.apache.hive</groupId>
+              <artifactId>hive-serde</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>org.apache.hive</groupId>
+              <artifactId>hive-service</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>org.apache.hive</groupId>
+              <artifactId>hive-shims</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>com.jolbox</groupId>
+              <artifactId>bonecp</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>jline</groupId>
+              <artifactId>jline</artifactId>
+            </exclusion>
+          </exclusions>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hive.hcatalog</groupId>
+          <artifactId>hive-hcatalog-core</artifactId>
+          <version>${hive.version}</version>
+          <exclusions>
+            <exclusion>
+              <groupId>org.apache.hive</groupId>
+              <artifactId>hive-cli</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>org.apache.hive</groupId>
+              <artifactId>hive-common</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>org.apache.hive</groupId>
+              <artifactId>hive-exec</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>org.apache.hive</groupId>
+              <artifactId>hive-metastore</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>com.google.guava</groupId>
+              <artifactId>guava</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>org.codehaus.jackson</groupId>
+              <artifactId>jackson-mapper-asl</artifactId>
+            </exclusion>
+          </exclusions>
+        </dependency>
+        <dependency>
+          <groupId>com.twitter</groupId>
+          <artifactId>parquet-hive-bundle</artifactId>
+          <version>${parquet.version}</version>
+        </dependency>
+      </dependencies>
+    </profile>
+    <profile>
+      <id>hcatalog-0.13.1</id>
+      <activation>
+        <activeByDefault>false</activeByDefault>
+      </activation>
+      <properties>
+        <hive.version>0.13.1</hive.version>
+        <parquet.version>1.5.0</parquet.version>
+        <parquet.format.version>2.1.0</parquet.format.version>
+      </properties>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.hive</groupId>
+          <artifactId>hive-exec</artifactId>
+          <version>${hive.version}</version>
+          <scope>provided</scope>
+          <exclusions>
+            <exclusion>
+              <groupId>org.apache.hive</groupId>
+              <artifactId>hive-common</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>org.apache.hive</groupId>
+              <artifactId>hive-contrib</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>org.apache.hive</groupId>
+              <artifactId>hive-hbase-handler</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>org.apache.hive</groupId>
+              <artifactId>hive-metastore</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>org.apache.hive</groupId>
+              <artifactId>hive-serde</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>org.apache.hive</groupId>
+              <artifactId>hive-shims</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>org.apache.hive</groupId>
+              <artifactId>hive-testutils</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>org.apache.thrift</groupId>
+              <artifactId>libfb303</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>org.apache.thrift</groupId>
+              <artifactId>libthrift</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>com.jolbox</groupId>
+              <artifactId>bonecp</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>com.google.protobuf</groupId>
+              <artifactId>protobuf-java</artifactId>
+            </exclusion>
+          </exclusions>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hive</groupId>
+          <artifactId>hive-metastore</artifactId>
+          <version>${hive.version}</version>
+          <scope>provided</scope>
+          <exclusions>
+            <exclusion>
+              <groupId>org.apache.hive</groupId>
+              <artifactId>hive-common</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>org.apache.hive</groupId>
+              <artifactId>hive-serde</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>org.apache.hive</groupId>
+              <artifactId>hive-shims</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>org.apache.thrift</groupId>
+              <artifactId>libfb303</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>org.apache.thrift</groupId>
+              <artifactId>libthrift</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>com.jolbox</groupId>
+              <artifactId>bonecp</artifactId>
+            </exclusion>
+          </exclusions>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hive</groupId>
+          <artifactId>hive-cli</artifactId>
+          <version>${hive.version}</version>
+          <scope>provided</scope>
+          <exclusions>
+            <exclusion>
+              <groupId>org.apache.hive</groupId>
+              <artifactId>hive-common</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>org.apache.hive</groupId>
+              <artifactId>hive-exec</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>org.apache.hive</groupId>
+              <artifactId>hive-metastore</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>org.apache.hive</groupId>
+              <artifactId>hive-serde</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>org.apache.hive</groupId>
+              <artifactId>hive-service</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>org.apache.hive</groupId>
+              <artifactId>hive-shims</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>com.jolbox</groupId>
+              <artifactId>bonecp</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>jline</groupId>
+              <artifactId>jline</artifactId>
+            </exclusion>
+          </exclusions>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hive.hcatalog</groupId>
+          <artifactId>hive-hcatalog-core</artifactId>
+          <version>${hive.version}</version>
+          <exclusions>
+            <exclusion>
+              <groupId>org.apache.hive</groupId>
+              <artifactId>hive-cli</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>org.apache.hive</groupId>
+              <artifactId>hive-common</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>org.apache.hive</groupId>
+              <artifactId>hive-exec</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>org.apache.hive</groupId>
+              <artifactId>hive-metastore</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>com.google.guava</groupId>
+              <artifactId>guava</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>org.codehaus.jackson</groupId>
+              <artifactId>jackson-mapper-asl</artifactId>
+            </exclusion>
+          </exclusions>
+        </dependency>
+        <dependency>
+          <groupId>com.twitter</groupId>
+          <artifactId>parquet-hive-bundle</artifactId>
+          <version>${parquet.version}</version>
+        </dependency>
+      </dependencies>
+    </profile>
+    <profile>
+      <id>docs</id>
+      <activation>
+        <activeByDefault>false</activeByDefault>
+      </activation>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-javadoc-plugin</artifactId>
+            <executions>
+              <execution>
+                <!-- build javadoc jars per jar for publishing to maven -->
+                <id>module-javadocs</id>
+                <phase>package</phase>
+                <goals>
+                  <goal>jar</goal>
+                </goals>
+                <configuration>
+                  <destDir>${project.build.directory}</destDir>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+    <profile>
+      <id>src</id>
+      <activation>
+        <activeByDefault>false</activeByDefault>
+      </activation>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-source-plugin</artifactId>
+            <executions>
+              <execution>
+                <!-- builds source jars and attaches them to the project for publishing -->
+                <id>hadoop-java-sources</id>
+                <phase>package</phase>
+                <goals>
+                  <goal>jar-no-fork</goal>
+                </goals>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+  </profiles>
+
+  <reporting>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-report-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </reporting>
+
+</project>

http://git-wip-us.apache.org/repos/asf/tajo/blob/63c2fb6c/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/main/java/org/apache/tajo/catalog/store/HCatalogStore.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/main/java/org/apache/tajo/catalog/store/HCatalogStore.java b/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/main/java/org/apache/tajo/catalog/store/HCatalogStore.java
new file mode 100644
index 0000000..2c3fc6a
--- /dev/null
+++ b/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/main/java/org/apache/tajo/catalog/store/HCatalogStore.java
@@ -0,0 +1,891 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.catalog.store; + +import com.google.common.collect.Lists; + +import org.apache.commons.lang.StringEscapeUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.metastore.TableType; +import org.apache.hadoop.hive.metastore.api.*; +import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe; +import org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe; +import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; +import org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe; +import org.apache.hcatalog.common.HCatUtil; +import org.apache.hcatalog.data.schema.HCatFieldSchema; +import org.apache.hcatalog.data.schema.HCatSchema; +import org.apache.tajo.TajoConstants; +import org.apache.tajo.catalog.*; +import org.apache.tajo.catalog.exception.*; +import org.apache.tajo.catalog.partition.PartitionMethodDesc; +import org.apache.tajo.catalog.proto.CatalogProtos; +import org.apache.tajo.catalog.proto.CatalogProtos.ColumnProto; +import org.apache.tajo.catalog.proto.CatalogProtos.DatabaseProto; +import 
org.apache.tajo.catalog.proto.CatalogProtos.IndexProto;
+import org.apache.tajo.catalog.proto.CatalogProtos.TableDescriptorProto;
+import org.apache.tajo.catalog.proto.CatalogProtos.TableOptionProto;
+import org.apache.tajo.catalog.proto.CatalogProtos.TablePartitionProto;
+import org.apache.tajo.catalog.proto.CatalogProtos.TableStatsProto;
+import org.apache.tajo.catalog.proto.CatalogProtos.TablespaceProto;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.common.exception.NotImplementedException;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.exception.InternalException;
+import org.apache.tajo.storage.StorageConstants;
+import org.apache.tajo.util.KeyValueSet;
+import org.apache.thrift.TException;
+
+import java.io.IOException;
+import java.util.*;
+
+import static org.apache.tajo.catalog.proto.CatalogProtos.PartitionType;
+
+public class HCatalogStore extends CatalogConstants implements CatalogStore {
+  protected final Log LOG = LogFactory.getLog(getClass());
+
+  // Hive configuration key naming the metastore warehouse directory (a constant, hence final).
+  private static final String HIVE_WAREHOUSE_DIR_CONF_KEY = "hive.metastore.warehouse.dir";
+
+  protected Configuration conf;
+  private static final int CLIENT_POOL_SIZE = 2;
+  private final HCatalogStoreClientPool clientPool;
+  private final String defaultTableSpaceUri;
+
+  /**
+   * Creates a catalog store backed by a pool of {@code CLIENT_POOL_SIZE} metastore clients.
+   *
+   * @param conf must be a {@link TajoConf}; its warehouse directory becomes the URI of the
+   *             default tablespace
+   * @throws CatalogException if {@code conf} is not a {@link TajoConf}
+   */
+  public HCatalogStore(final Configuration conf) throws InternalException {
+    if (!(conf instanceof TajoConf)) {
+      throw new CatalogException("Invalid Configuration Type:" + conf.getClass().getSimpleName());
+    }
+    this.conf = conf;
+    this.defaultTableSpaceUri = TajoConf.getWarehouseDir((TajoConf) conf).toString();
+    this.clientPool = new HCatalogStoreClientPool(CLIENT_POOL_SIZE, conf);
+  }
+
+  /**
+   * Checks whether the metastore knows the given table.
+   *
+   * @return true if a table object is returned; false if the lookup reports
+   *         {@code NoSuchObjectException} or yields null
+   * @throws CatalogException wrapping any other lookup failure
+   */
+  @Override
+  public boolean existTable(final String databaseName, final String tableName) throws CatalogException {
+    boolean exist = false;
+    org.apache.hadoop.hive.ql.metadata.Table table;
+    HCatalogStoreClientPool.HCatalogStoreClient client = null;
+
+    // get table
+    try {
+      client = clientPool.getClient();
+      table = HCatalogUtil.getTable(client.getHiveClient(), databaseName, tableName);
+      if (table != null) {
+        exist = true;
+      }
+    } catch (NoSuchObjectException nsoe) {
+      exist = false;
+    } catch (Exception e) {
+      throw new CatalogException(e);
+    } finally {
+      if (client != null) {
+        client.release();
+      }
+    }
+
+    return exist;
+  }
+
+  /**
+   * Converts a Hive table into a Tajo table descriptor.
+   *
+   * <p>Non-partition columns become the Tajo schema, partition columns become a COLUMN
+   * partition method, and storage options (field delimiter, null format, serde) plus the
+   * data size are derived from the table's metadata properties.
+   *
+   * @throws CatalogException if the table is missing or its schema/path cannot be read
+   */
+  @Override
+  public final CatalogProtos.TableDescProto getTable(String databaseName, final String tableName) throws CatalogException {
+    org.apache.hadoop.hive.ql.metadata.Table table = null;
+    HCatalogStoreClientPool.HCatalogStoreClient client = null;
+    Path path = null;
+    CatalogProtos.StoreType storeType = null;
+    org.apache.tajo.catalog.Schema schema = null;
+    KeyValueSet options = null;
+    TableStats stats = null;
+    PartitionMethodDesc partitions = null;
+
+    //////////////////////////////////
+    // set tajo table schema.
+    //////////////////////////////////
+    try {
+      // get hive table schema
+      try {
+        client = clientPool.getClient();
+        table = HCatalogUtil.getTable(client.getHiveClient(), databaseName, tableName);
+        path = table.getPath();
+      } catch (NoSuchObjectException nsoe) {
+        throw new CatalogException("Table not found. - tableName:" + tableName, nsoe);
+      } catch (Exception e) {
+        throw new CatalogException(e);
+      }
+
+      // convert hcatalog field schema into tajo field schema.
+      schema = new org.apache.tajo.catalog.Schema();
+      HCatSchema tableSchema = null;
+
+      try {
+        tableSchema = HCatUtil.getTableSchemaWithPtnCols(table);
+      } catch (IOException ioe) {
+        throw new CatalogException("Fail to get table schema. - tableName:" + tableName, ioe);
+      }
+      List<HCatFieldSchema> fieldSchemaList = tableSchema.getFields();
+      boolean isPartitionKey = false;
+      for (HCatFieldSchema eachField : fieldSchemaList) {
+        isPartitionKey = false;
+
+        if (table.getPartitionKeys() != null) {
+          for (FieldSchema partitionKey : table.getPartitionKeys()) {
+            if (partitionKey.getName().equals(eachField.getName())) {
+              isPartitionKey = true;
+              break; // one match is enough
+            }
+          }
+        }
+
+        // partition columns are described by the partition method below, not the schema
+        if (!isPartitionKey) {
+          String fieldName = databaseName + CatalogConstants.IDENTIFIER_DELIMITER + tableName +
+              CatalogConstants.IDENTIFIER_DELIMITER + eachField.getName();
+          TajoDataTypes.Type dataType = HCatalogUtil.getTajoFieldType(eachField.getType().toString());
+          schema.addColumn(fieldName, dataType);
+        }
+      }
+
+      // validate field schema.
+      try {
+        HCatalogUtil.validateHCatTableAndTajoSchema(tableSchema);
+      } catch (Exception e) {
+        throw new CatalogException("HCatalog cannot support schema. - schema:" + tableSchema.toString(), e);
+      }
+
+      stats = new TableStats();
+      options = new KeyValueSet();
+      options.putAll(table.getParameters());
+      options.remove("EXTERNAL");
+
+      Properties properties = table.getMetadata();
+      if (properties != null) {
+        // set field delimiter
+        String fieldDelimiter = "", nullFormat = "";
+        if (properties.getProperty(serdeConstants.FIELD_DELIM) != null) {
+          fieldDelimiter = properties.getProperty(serdeConstants.FIELD_DELIM);
+        } else {
+          // if hive table used default row format delimiter, Properties doesn't have it.
+          // So, Tajo must set as follows:
+          fieldDelimiter = "\u0001";
+        }
+
+        // set null format
+        if (properties.getProperty(serdeConstants.SERIALIZATION_NULL_FORMAT) != null) {
+          nullFormat = properties.getProperty(serdeConstants.SERIALIZATION_NULL_FORMAT);
+        } else {
+          nullFormat = "\\N";
+        }
+        options.remove(serdeConstants.SERIALIZATION_NULL_FORMAT);
+
+        // set file output format
+        String fileOutputformat = properties.getProperty(hive_metastoreConstants.FILE_OUTPUT_FORMAT);
+        storeType = CatalogUtil.getStoreType(HCatalogUtil.getStoreType(fileOutputformat));
+
+        if (storeType.equals(CatalogProtos.StoreType.TEXTFILE)) {
+          options.set(StorageConstants.TEXT_DELIMITER, StringEscapeUtils.escapeJava(fieldDelimiter));
+          options.set(StorageConstants.TEXT_NULL, StringEscapeUtils.escapeJava(nullFormat));
+        } else if (storeType.equals(CatalogProtos.StoreType.RCFILE)) {
+          options.set(StorageConstants.RCFILE_NULL, StringEscapeUtils.escapeJava(nullFormat));
+          String serde = properties.getProperty(serdeConstants.SERIALIZATION_LIB);
+          if (LazyBinaryColumnarSerDe.class.getName().equals(serde)) {
+            options.set(StorageConstants.RCFILE_SERDE, StorageConstants.DEFAULT_BINARY_SERDE);
+          } else if (ColumnarSerDe.class.getName().equals(serde)) {
+            options.set(StorageConstants.RCFILE_SERDE, StorageConstants.DEFAULT_TEXT_SERDE);
+          }
+        } else if (storeType.equals(CatalogProtos.StoreType.SEQUENCEFILE) ) {
+          options.set(StorageConstants.SEQUENCEFILE_DELIMITER, StringEscapeUtils.escapeJava(fieldDelimiter));
+          options.set(StorageConstants.SEQUENCEFILE_NULL, StringEscapeUtils.escapeJava(nullFormat));
+          String serde = properties.getProperty(serdeConstants.SERIALIZATION_LIB);
+          if (LazyBinarySerDe.class.getName().equals(serde)) {
+            options.set(StorageConstants.SEQUENCEFILE_SERDE, StorageConstants.DEFAULT_BINARY_SERDE);
+          } else if (LazySimpleSerDe.class.getName().equals(serde)) {
+            options.set(StorageConstants.SEQUENCEFILE_SERDE, StorageConstants.DEFAULT_TEXT_SERDE);
+          }
+        }
+
+        // set data size: prefer the "totalSize" table property, else measure the table path
+        long totalSize = 0;
+        if (properties.getProperty("totalSize") != null) {
+          totalSize = Long.parseLong(properties.getProperty("totalSize"));
+        } else {
+          try {
+            FileSystem fs = path.getFileSystem(conf);
+            if (fs.exists(path)) {
+              totalSize = fs.getContentSummary(path).getLength();
+            }
+          } catch (IOException ioe) {
+            throw new CatalogException("Fail to get path. - path:" + path.toString(), ioe);
+          }
+        }
+        stats.setNumBytes(totalSize);
+      }
+
+      // set partition keys
+      List<FieldSchema> partitionKeys = table.getPartitionKeys();
+
+      if (null != partitionKeys) {
+        org.apache.tajo.catalog.Schema expressionSchema = new org.apache.tajo.catalog.Schema();
+        StringBuilder sb = new StringBuilder();
+        if (partitionKeys.size() > 0) {
+          for (int i = 0; i < partitionKeys.size(); i++) {
+            FieldSchema fieldSchema = partitionKeys.get(i);
+            TajoDataTypes.Type dataType = HCatalogUtil.getTajoFieldType(fieldSchema.getType().toString());
+            String fieldName = databaseName + CatalogConstants.IDENTIFIER_DELIMITER + tableName +
+                CatalogConstants.IDENTIFIER_DELIMITER + fieldSchema.getName();
+            expressionSchema.addColumn(new Column(fieldName, dataType));
+            if (i > 0) {
+              sb.append(",");
+            }
+            sb.append(fieldSchema.getName());
+          }
+          partitions = new PartitionMethodDesc(
+              databaseName,
+              tableName,
+              PartitionType.COLUMN,
+              sb.toString(),
+              expressionSchema);
+        }
+      }
+    } finally {
+      if(client != null) client.release();
+    }
+    TableMeta meta = new TableMeta(storeType, options);
+    TableDesc tableDesc = new TableDesc(databaseName + "." + tableName, schema, meta, path.toUri());
+    if (table.getTableType().equals(TableType.EXTERNAL_TABLE)) {
+      tableDesc.setExternal(true);
+    }
+    if (stats != null) {
+      tableDesc.setStats(stats);
+    }
+    if (partitions != null) {
+      tableDesc.setPartitionMethod(partitions);
+    }
+    return tableDesc.getProto();
+  }
+
+
+  /**
+   * Resolves a Tajo data type by its enum constant name.
+   *
+   * @return the matching type, or null (after logging an error) when no constant matches
+   */
+  private TajoDataTypes.Type getDataType(final String typeStr) {
+    try {
+      return Enum.valueOf(TajoDataTypes.Type.class, typeStr);
+    } catch (IllegalArgumentException iae) {
+      LOG.error("Cannot find a matched type against from '" + typeStr + "'");
+      return null;
+    }
+  }
+
+  /** Lists the table names of {@code databaseName} as reported by the metastore. */
+  @Override
+  public final List<String> getAllTableNames(String databaseName) throws CatalogException {
+    HCatalogStoreClientPool.HCatalogStoreClient client = null;
+
+    try {
+      client = clientPool.getClient();
+      return client.getHiveClient().getAllTables(databaseName);
+    } catch (TException e) {
+      throw new CatalogException(e);
+    } finally {
+      if(client != null) client.release();
+    }
+  }
+
+  /** No-op: this store only exposes the default tablespace. */
+  @Override
+  public void createTablespace(String spaceName, String spaceUri) throws CatalogException {
+    // SKIP
+  }
+
+  /** @return true only for the default tablespace name (false for null). */
+  @Override
+  public boolean existTablespace(String spaceName) throws CatalogException {
+    // SKIP
+    return TajoConstants.DEFAULT_TABLESPACE_NAME.equals(spaceName);
+  }
+
+  /** No-op: this store only exposes the default tablespace. */
+  @Override
+  public void dropTablespace(String spaceName) throws CatalogException {
+    // SKIP
+  }
+
+  /** @return a list holding only the default tablespace name. */
+  @Override
+  public Collection<String> getAllTablespaceNames() throws CatalogException {
+    return Lists.newArrayList(TajoConstants.DEFAULT_TABLESPACE_NAME);
+  }
+
+  /**
+   * Describes the default tablespace, whose URI is the Tajo warehouse directory.
+   *
+   * @throws CatalogException for any other (or null) tablespace name
+   */
+  @Override
+  public TablespaceProto getTablespace(String spaceName) throws CatalogException {
+    if (TajoConstants.DEFAULT_TABLESPACE_NAME.equals(spaceName)) {
+      TablespaceProto.Builder builder = TablespaceProto.newBuilder();
+      builder.setSpaceName(TajoConstants.DEFAULT_TABLESPACE_NAME);
+      builder.setUri(defaultTableSpaceUri);
+      return builder.build();
+    } else {
+      throw new CatalogException("tablespace concept is not supported in HCatalogStore");
+    }
+  }
+
+  /** Not implemented yet; currently a no-op. */
+  @Override
+  public void updateTableStats(CatalogProtos.UpdateTableStatsProto statsProto) throws
+      CatalogException {
+    // TODO - not implemented yet
+  }
+
+  /** @throws CatalogException always; tablespaces cannot be altered in this store. */
+  @Override
+  public void alterTablespace(CatalogProtos.AlterTablespaceProto alterProto) throws CatalogException {
+    throw new CatalogException("tablespace concept is not supported in HCatalogStore");
+  }
+
+  /**
+   * Creates a metastore database located at {@code <default tablespace URI>/<databaseName>}.
+   * The {@code tablespaceName} argument is not used.
+   *
+   * @throws AlreadyExistsDatabaseException if the metastore reports the database exists
+   * @throws CatalogException wrapping any other failure
+   */
+  @Override
+  public void createDatabase(String databaseName, String tablespaceName) throws CatalogException {
+    HCatalogStoreClientPool.HCatalogStoreClient client = null;
+
+    try {
+      Database database = new Database(
+          databaseName,
+          "",
+          defaultTableSpaceUri + "/" + databaseName,
+          new HashMap<String, String>());
+      client = clientPool.getClient();
+      client.getHiveClient().createDatabase(database);
+    } catch (AlreadyExistsException e) {
+      throw new AlreadyExistsDatabaseException(databaseName);
+    } catch (Throwable t) {
+      throw new CatalogException(t);
+    } finally {
+      if (client != null) {
+        client.release();
+      }
+    }
+  }
+
+  /** @return true if {@code databaseName} is among the metastore's databases. */
+  @Override
+  public boolean existDatabase(String databaseName) throws CatalogException {
+    HCatalogStoreClientPool.HCatalogStoreClient client = null;
+
+    try {
+      client = clientPool.getClient();
+      List<String> databaseNames = client.getHiveClient().getAllDatabases();
+      return databaseNames.contains(databaseName);
+    } catch (Throwable t) {
+      throw new CatalogException(t);
+    } finally {
+      if (client != null) {
+        client.release();
+      }
+    }
+  }
+
+  /**
+   * Drops a metastore database.
+   *
+   * @throws NoSuchDatabaseException if the metastore reports the database is missing
+   * @throws CatalogException wrapping any other failure (the cause is preserved)
+   */
+  @Override
+  public void dropDatabase(String databaseName) throws CatalogException {
+    HCatalogStoreClientPool.HCatalogStoreClient client = null;
+
+    try {
+      client = clientPool.getClient();
+      client.getHiveClient().dropDatabase(databaseName);
+    } catch (NoSuchObjectException e) {
+      throw new NoSuchDatabaseException(databaseName);
+    } catch (Throwable t) {
+      // chain the underlying failure so callers can see why the drop failed
+      throw new CatalogException(t);
+    } finally {
+      if (client != null) {
+        client.release();
+      }
+    }
+  }
+
+  /** Lists all database names known to the metastore. */
+  @Override
+  public Collection<String> getAllDatabaseNames() throws CatalogException {
+    HCatalogStoreClientPool.HCatalogStoreClient client = null;
+
+    try {
+      client = clientPool.getClient();
+      return client.getHiveClient().getAllDatabases();
+    } catch (TException e) {
+      throw new CatalogException(e);
+    } finally {
+      if (client != null) {
+        client.release();
+      }
+    }
+  }
+
+  @Override
+  public final void createTable(final CatalogProtos.TableDescProto tableDescProto) throws CatalogException {
+    HCatalogStoreClientPool.HCatalogStoreClient client = null;
+
+    TableDesc tableDesc = new TableDesc(tableDescProto);
+    String[] splitted = CatalogUtil.splitFQTableName(tableDesc.getName());
+    String databaseName = splitted[0];
+    String tableName = splitted[1];
+
+    try {
+      client = clientPool.getClient();
+
+      org.apache.hadoop.hive.metastore.api.Table table = new org.apache.hadoop.hive.metastore.api.Table();
+      table.setDbName(databaseName);
+      table.setTableName(tableName);
+      table.setParameters(new HashMap<String, String>(tableDesc.getMeta().getOptions().getAllKeyValus()));
+      // TODO: set owner
+      //table.setOwner();
+
+      StorageDescriptor sd = new StorageDescriptor();
+      sd.setSerdeInfo(new SerDeInfo());
+      sd.getSerdeInfo().setParameters(new HashMap<String, String>());
+      sd.getSerdeInfo().setName(table.getTableName());
+
+      // if tajo set location method, thrift client make exception as follows:
+      // Caused by: MetaException(message:java.lang.NullPointerException)
+      // If you want to modify table path, you have to modify on Hive cli.
+ if (tableDesc.isExternal()) { + table.setTableType(TableType.EXTERNAL_TABLE.name()); + table.putToParameters("EXTERNAL", "TRUE"); + + Path tablePath = new Path(tableDesc.getPath()); + FileSystem fs = tablePath.getFileSystem(conf); + if (fs.isFile(tablePath)) { + LOG.warn("A table path is a file, but HCatalog does not allow a file path."); + sd.setLocation(tablePath.getParent().toString()); + } else { + sd.setLocation(tablePath.toString()); + } + } + + // set column information + List<Column> columns = tableDesc.getSchema().getColumns(); + ArrayList<FieldSchema> cols = new ArrayList<FieldSchema>(columns.size()); + + for (Column eachField : columns) { + cols.add(new FieldSchema(eachField.getSimpleName(), + HCatalogUtil.getHiveFieldType(eachField.getDataType()), "")); + } + sd.setCols(cols); + + // set partition keys + if (tableDesc.hasPartition() && tableDesc.getPartitionMethod().getPartitionType().equals(PartitionType.COLUMN)) { + List<FieldSchema> partitionKeys = new ArrayList<FieldSchema>(); + for (Column eachPartitionKey : tableDesc.getPartitionMethod().getExpressionSchema().getColumns()) { + partitionKeys.add(new FieldSchema(eachPartitionKey.getSimpleName(), + HCatalogUtil.getHiveFieldType(eachPartitionKey.getDataType()), "")); + } + table.setPartitionKeys(partitionKeys); + } + + if (tableDesc.getMeta().getStoreType().equals(CatalogProtos.StoreType.RCFILE)) { + String serde = tableDesc.getMeta().getOption(StorageConstants.RCFILE_SERDE); + sd.setInputFormat(org.apache.hadoop.hive.ql.io.RCFileInputFormat.class.getName()); + sd.setOutputFormat(org.apache.hadoop.hive.ql.io.RCFileOutputFormat.class.getName()); + if (StorageConstants.DEFAULT_TEXT_SERDE.equals(serde)) { + sd.getSerdeInfo().setSerializationLib(org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe.class.getName()); + } else { + sd.getSerdeInfo().setSerializationLib( + org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe.class.getName()); + } + + if 
(tableDesc.getMeta().getOptions().containsKey(StorageConstants.RCFILE_NULL)) { + table.putToParameters(serdeConstants.SERIALIZATION_NULL_FORMAT, + StringEscapeUtils.unescapeJava(tableDesc.getMeta().getOption(StorageConstants.RCFILE_NULL))); + } + } else if (tableDesc.getMeta().getStoreType().equals(CatalogProtos.StoreType.CSV) + || tableDesc.getMeta().getStoreType().equals(CatalogProtos.StoreType.TEXTFILE)) { + sd.getSerdeInfo().setSerializationLib(org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.class.getName()); + sd.setInputFormat(org.apache.hadoop.mapred.TextInputFormat.class.getName()); + sd.setOutputFormat(org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat.class.getName()); + + String fieldDelimiter = tableDesc.getMeta().getOption(StorageConstants.TEXT_DELIMITER, + StorageConstants.DEFAULT_FIELD_DELIMITER); + + // User can use an unicode for filed delimiter such as \u0001, \001. + // In this case, java console will convert this value into "\\u001". + // And hive will un-espace this value again. + // As a result, user can use right field delimiter. + // So, we have to un-escape this value. 
+ sd.getSerdeInfo().putToParameters(serdeConstants.SERIALIZATION_FORMAT, + StringEscapeUtils.unescapeJava(fieldDelimiter)); + sd.getSerdeInfo().putToParameters(serdeConstants.FIELD_DELIM, + StringEscapeUtils.unescapeJava(fieldDelimiter)); + table.getParameters().remove(StorageConstants.TEXT_DELIMITER); + + if (tableDesc.getMeta().containsOption(StorageConstants.TEXT_NULL)) { + table.putToParameters(serdeConstants.SERIALIZATION_NULL_FORMAT, + StringEscapeUtils.unescapeJava(tableDesc.getMeta().getOption(StorageConstants.TEXT_NULL))); + table.getParameters().remove(StorageConstants.TEXT_NULL); + } + } else if (tableDesc.getMeta().getStoreType().equals(CatalogProtos.StoreType.SEQUENCEFILE)) { + String serde = tableDesc.getMeta().getOption(StorageConstants.SEQUENCEFILE_SERDE); + sd.setInputFormat(org.apache.hadoop.mapred.SequenceFileInputFormat.class.getName()); + sd.setOutputFormat(org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat.class.getName()); + + if (StorageConstants.DEFAULT_TEXT_SERDE.equals(serde)) { + sd.getSerdeInfo().setSerializationLib(org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.class.getName()); + + String fieldDelimiter = tableDesc.getMeta().getOption(StorageConstants.SEQUENCEFILE_DELIMITER, + StorageConstants.DEFAULT_FIELD_DELIMITER); + + // User can use an unicode for filed delimiter such as \u0001, \001. + // In this case, java console will convert this value into "\\u001". + // And hive will un-espace this value again. + // As a result, user can use right field delimiter. + // So, we have to un-escape this value. 
+ sd.getSerdeInfo().putToParameters(serdeConstants.SERIALIZATION_FORMAT, + StringEscapeUtils.unescapeJava(fieldDelimiter)); + sd.getSerdeInfo().putToParameters(serdeConstants.FIELD_DELIM, + StringEscapeUtils.unescapeJava(fieldDelimiter)); + table.getParameters().remove(StorageConstants.SEQUENCEFILE_DELIMITER); + } else { + sd.getSerdeInfo().setSerializationLib(org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe.class.getName()); + } + + if (tableDesc.getMeta().containsOption(StorageConstants.SEQUENCEFILE_NULL)) { + table.putToParameters(serdeConstants.SERIALIZATION_NULL_FORMAT, + StringEscapeUtils.unescapeJava(tableDesc.getMeta().getOption(StorageConstants.SEQUENCEFILE_NULL))); + table.getParameters().remove(StorageConstants.SEQUENCEFILE_NULL); + } + } else { + if (tableDesc.getMeta().getStoreType().equals(CatalogProtos.StoreType.PARQUET)) { + sd.setInputFormat(parquet.hive.DeprecatedParquetInputFormat.class.getName()); + sd.setOutputFormat(parquet.hive.DeprecatedParquetOutputFormat.class.getName()); + sd.getSerdeInfo().setSerializationLib(parquet.hive.serde.ParquetHiveSerDe.class.getName()); + } else { + throw new CatalogException(new NotImplementedException(tableDesc.getMeta().getStoreType + ().name())); + } + } + + sd.setSortCols(new ArrayList<Order>()); + + table.setSd(sd); + client.getHiveClient().createTable(table); + } catch (RuntimeException e) { + throw e; + } catch (Exception e) { + throw new CatalogException(e); + } finally { + if(client != null) client.release(); + } + } + + @Override + public final void dropTable(String databaseName, final String tableName) throws CatalogException { + HCatalogStoreClientPool.HCatalogStoreClient client = null; + + try { + client = clientPool.getClient(); + client.getHiveClient().dropTable(databaseName, tableName, false, false); + } catch (NoSuchObjectException nsoe) { + } catch (Exception e) { + throw new CatalogException(e); + } finally { + if (client != null) { + client.release(); + } + } + } + + + @Override + 
public void alterTable(final CatalogProtos.AlterTableDescProto alterTableDescProto) throws CatalogException { + final String[] split = CatalogUtil.splitFQTableName(alterTableDescProto.getTableName()); + + if (split.length == 1) { + throw new IllegalArgumentException("alterTable() requires a qualified table name, but it is \"" + + alterTableDescProto.getTableName() + "\"."); + } + + final String databaseName = split[0]; + final String tableName = split[1]; + + + switch (alterTableDescProto.getAlterTableType()) { + case RENAME_TABLE: + if (existTable(databaseName,alterTableDescProto.getNewTableName().toLowerCase())) { + throw new AlreadyExistsTableException(alterTableDescProto.getNewTableName()); + } + renameTable(databaseName, tableName, alterTableDescProto.getNewTableName().toLowerCase()); + break; + case RENAME_COLUMN: + if (existColumn(databaseName,tableName, alterTableDescProto.getAlterColumnName().getNewColumnName())) { + throw new ColumnNameAlreadyExistException(alterTableDescProto.getAlterColumnName().getNewColumnName()); + } + renameColumn(databaseName, tableName, alterTableDescProto.getAlterColumnName()); + break; + case ADD_COLUMN: + if (existColumn(databaseName,tableName, alterTableDescProto.getAddColumn().getName())) { + throw new ColumnNameAlreadyExistException(alterTableDescProto.getAddColumn().getName()); + } + addNewColumn(databaseName, tableName, alterTableDescProto.getAddColumn()); + break; + default: + //TODO + } + } + + + private void renameTable(String databaseName, String tableName, String newTableName) { + HCatalogStoreClientPool.HCatalogStoreClient client = null; + try { + client = clientPool.getClient(); + Table newTable = client.getHiveClient().getTable(databaseName, tableName); + newTable.setTableName(newTableName); + client.getHiveClient().alter_table(databaseName, tableName, newTable); + + } catch (NoSuchObjectException nsoe) { + } catch (Exception e) { + throw new CatalogException(e); + } finally { + if (client != null) { + 
client.release(); + } + } + } + + private void renameColumn(String databaseName, String tableName, CatalogProtos.AlterColumnProto alterColumnProto) { + HCatalogStoreClientPool.HCatalogStoreClient client = null; + try { + + client = clientPool.getClient(); + Table table = client.getHiveClient().getTable(databaseName, tableName); + List<FieldSchema> columns = table.getSd().getCols(); + + for (final FieldSchema currentColumn : columns) { + if (currentColumn.getName().equalsIgnoreCase(alterColumnProto.getOldColumnName())) { + currentColumn.setName(alterColumnProto.getNewColumnName()); + } + } + client.getHiveClient().alter_table(databaseName, tableName, table); + + } catch (NoSuchObjectException nsoe) { + } catch (Exception e) { + throw new CatalogException(e); + } finally { + if (client != null) { + client.release(); + } + } + } + + + private void addNewColumn(String databaseName, String tableName, CatalogProtos.ColumnProto columnProto) { + HCatalogStoreClientPool.HCatalogStoreClient client = null; + try { + + client = clientPool.getClient(); + Table table = client.getHiveClient().getTable(databaseName, tableName); + List<FieldSchema> columns = table.getSd().getCols(); + columns.add(new FieldSchema(columnProto.getName(), + HCatalogUtil.getHiveFieldType(columnProto.getDataType()), "")); + client.getHiveClient().alter_table(databaseName, tableName, table); + + + } catch (NoSuchObjectException nsoe) { + } catch (Exception e) { + throw new CatalogException(e); + } finally { + if (client != null) { + client.release(); + } + } + } + + @Override + public void addPartitionMethod(CatalogProtos.PartitionMethodProto partitionMethodProto) throws CatalogException { + // TODO - not implemented yet + } + + @Override + public CatalogProtos.PartitionMethodProto getPartitionMethod(String databaseName, String tableName) + throws CatalogException { + return null; // TODO - not implemented yet + } + + @Override + public boolean existPartitionMethod(String databaseName, String tableName) 
throws CatalogException { + return false; // TODO - not implemented yet + } + + @Override + public void dropPartitionMethod(String databaseName, String tableName) throws CatalogException { + // TODO - not implemented yet + } + + @Override + public void addPartitions(CatalogProtos.PartitionsProto partitionsProto) throws CatalogException { + // TODO - not implemented yet + } + + @Override + public void addPartition(String databaseName, String tableName, CatalogProtos.PartitionDescProto partitionDescProto) throws CatalogException { + + } + + @Override + public CatalogProtos.PartitionsProto getPartitions(String tableName) throws CatalogException { + return null; // TODO - not implemented yet + } + + @Override + public CatalogProtos.PartitionDescProto getPartition(String partitionName) throws CatalogException { + return null; // TODO - not implemented yet + } + + @Override + public void delPartition(String partitionName) throws CatalogException { + // TODO - not implemented yet + } + + @Override + public void dropPartitions(String tableName) throws CatalogException { + + } + + + @Override + public final void addFunction(final FunctionDesc func) throws CatalogException { + // TODO - not implemented yet + } + + @Override + public final void deleteFunction(final FunctionDesc func) throws CatalogException { + // TODO - not implemented yet + } + + @Override + public final void existFunction(final FunctionDesc func) throws CatalogException { + // TODO - not implemented yet + } + + @Override + public final List<String> getAllFunctionNames() throws CatalogException { + // TODO - not implemented yet + return null; + } + + @Override + public void dropIndex(String databaseName, String indexName) throws CatalogException { + // TODO - not implemented yet + } + + @Override + public boolean existIndexByName(String databaseName, String indexName) throws CatalogException { + // TODO - not implemented yet + return false; + } + + @Override + public CatalogProtos.IndexDescProto[] 
getIndexes(String databaseName, String tableName) throws CatalogException { + // TODO - not implemented yet + return null; + } + + @Override + public void createIndex(CatalogProtos.IndexDescProto proto) throws CatalogException { + // TODO - not implemented yet + } + + @Override + public CatalogProtos.IndexDescProto getIndexByName(String databaseName, String indexName) throws CatalogException { + // TODO - not implemented yet + return null; + } + + @Override + public CatalogProtos.IndexDescProto getIndexByColumn(String databaseName, String tableName, String columnName) + throws CatalogException { + // TODO - not implemented yet + return null; + } + + @Override + public boolean existIndexByColumn(String databaseName, String tableName, String columnName) throws CatalogException { + // TODO - not implemented yet + return false; + } + + @Override + public final void close() { + clientPool.close(); + } + + private boolean existColumn(final String databaseName ,final String tableName , final String columnName) throws CatalogException { + boolean exist = false; + HCatalogStoreClientPool.HCatalogStoreClient client = null; + + try { + + client = clientPool.getClient(); + Table table = client.getHiveClient().getTable(databaseName, tableName); + List<FieldSchema> columns = table.getSd().getCols(); + + for (final FieldSchema currentColumn : columns) { + if (currentColumn.getName().equalsIgnoreCase(columnName)) { + exist = true; + } + } + client.getHiveClient().alter_table(databaseName, tableName, table); + + } catch (NoSuchObjectException nsoe) { + } catch (Exception e) { + throw new CatalogException(e); + } finally { + if (client != null) { + client.release(); + } + } + + return exist; + } + + @Override + public List<ColumnProto> getAllColumns() throws CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public List<DatabaseProto> getAllDatabases() throws CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public 
List<IndexProto> getAllIndexes() throws CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public List<TablePartitionProto> getAllPartitions() throws CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public List<TableOptionProto> getAllTableOptions() throws CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public List<TableStatsProto> getAllTableStats() throws CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public List<TableDescriptorProto> getAllTables() throws CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public List<TablespaceProto> getTablespaces() throws CatalogException { + throw new UnsupportedOperationException(); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/63c2fb6c/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/main/java/org/apache/tajo/catalog/store/HCatalogStoreClientPool.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/main/java/org/apache/tajo/catalog/store/HCatalogStoreClientPool.java b/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/main/java/org/apache/tajo/catalog/store/HCatalogStoreClientPool.java new file mode 100644 index 0000000..8ccb100 --- /dev/null +++ b/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/main/java/org/apache/tajo/catalog/store/HCatalogStoreClientPool.java @@ -0,0 +1,170 @@ +// Copyright 2012 Cloudera Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package org.apache.tajo.catalog.store; + + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.*; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.log4j.Logger; + +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Manages a pool of HiveMetaStoreClient connections. If the connection pool is empty + * a new client is created and added to the pool. There is no size limit. + */ +public class HCatalogStoreClientPool { + private static final Logger LOG = Logger.getLogger(HCatalogStoreClientPool.class); + private final ConcurrentLinkedQueue<HCatalogStoreClient> clientPool = + new ConcurrentLinkedQueue<HCatalogStoreClient>(); + private AtomicBoolean poolClosed = new AtomicBoolean(false); + private HiveConf hiveConf; + + /** + * A wrapper around the HiveMetaStoreClient that manages interactions with the + * connection pool. 
+ */ + public class HCatalogStoreClient { + private final IMetaStoreClient hiveClient; + public AtomicBoolean isInUse = new AtomicBoolean(false); + + private HCatalogStoreClient(HiveConf hiveConf) { + try { + HiveMetaHookLoader hookLoader = new HiveMetaHookLoader() { + @Override + public HiveMetaHook getHook(Table table) throws MetaException { + /* metadata hook implementation, or null if this + * storage handler does not need any metadata notifications + */ + return null; + } + }; + + this.hiveClient = RetryingMetaStoreClient.getProxy(hiveConf, hookLoader, HiveMetaStoreClient.class.getName()); + clientPool.add(this); + LOG.info("MetaStoreClient created (size = " + clientPool.size() + ")"); + } catch (Exception e) { + // Turn in to an unchecked exception + throw new IllegalStateException(e); + } + } + + /** + * Returns the internal HiveMetaStoreClient object. + */ + public IMetaStoreClient getHiveClient() { + return hiveClient; + } + + /** + * Returns this client back to the connection pool. If the connection pool has been + * closed, just close the Hive client connection. + */ + public synchronized void release() { + if(!this.isInUse.getAndSet(false)){ + return; + } + // Ensure the connection isn't returned to the pool if the pool has been closed. + // This lock is needed to ensure proper behavior when a thread reads poolClosed + // is false, but a call to pool.close() comes in immediately afterward. 
+ if (poolClosed.get()) { + this.getHiveClient().close(); + } else { + clientPool.add(this); + } + } + + // Marks this client as in use + private void markInUse() { + isInUse.set(true); + } + } + + public HCatalogStoreClientPool(int initialSize) { + this(initialSize, new HiveConf(HCatalogStoreClientPool.class)); + } + + public HCatalogStoreClientPool(int initialSize, HiveConf hiveConf) { + this.hiveConf = hiveConf; + addClients(initialSize); + } + + public HCatalogStoreClientPool(int initialSize, Configuration conf) { + this.hiveConf = new HiveConf(); + setParameters(conf); + addClients(initialSize); + } + + public void setParameters(Configuration conf) { + for( Iterator<Entry<String, String>> iter = conf.iterator(); iter.hasNext();) { + Map.Entry<String, String> entry = iter.next(); + this.hiveConf.set(entry.getKey(), entry.getValue()); + } + } + + /** + * Add numClients to the client pool. + */ + public void addClients(int numClients) { + for (int i = 0; i < numClients; ++i) { + clientPool.add(new HCatalogStoreClient(hiveConf)); + } + } + + /** + * Gets a client from the pool. If the pool is empty a new client is created. + */ + public synchronized HCatalogStoreClient getClient() { + // The MetaStoreClient c'tor relies on knowing the Hadoop version by asking + // org.apache.hadoop.util.VersionInfo. The VersionInfo class relies on opening + // the 'common-version-info.properties' file as a resource from hadoop-common*.jar + // using the Thread's context classloader. If necessary, set the Thread's context + // classloader, otherwise VersionInfo will fail in it's c'tor. + if (Thread.currentThread().getContextClassLoader() == null) { + Thread.currentThread().setContextClassLoader(ClassLoader.getSystemClassLoader()); + } + + HCatalogStoreClient client = clientPool.poll(); + // The pool was empty so create a new client and return that. 
+ if (client == null) { + client = new HCatalogStoreClient(hiveConf); + } + client.markInUse(); + + return client; + } + + /** + * Removes all items from the connection pool and closes all Hive Meta Store client + * connections. Can be called multiple times. + */ + public void close() { + // Ensure no more items get added to the pool once close is called. + if (poolClosed.getAndSet(true)) { + return; + } + + HCatalogStoreClient client = null; + while ((client = clientPool.poll()) != null) { + client.getHiveClient().close(); + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/63c2fb6c/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/main/java/org/apache/tajo/catalog/store/HCatalogUtil.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/main/java/org/apache/tajo/catalog/store/HCatalogUtil.java b/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/main/java/org/apache/tajo/catalog/store/HCatalogUtil.java new file mode 100644 index 0000000..8e8e58c --- /dev/null +++ b/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/main/java/org/apache/tajo/catalog/store/HCatalogUtil.java @@ -0,0 +1,147 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tajo.catalog.store; + +import com.google.common.base.Preconditions; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat; +import org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat; +import org.apache.hadoop.hive.ql.io.RCFileOutputFormat; +import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.hcatalog.common.HCatException; +import org.apache.hcatalog.data.schema.HCatFieldSchema; +import org.apache.hcatalog.data.schema.HCatSchema; +import org.apache.tajo.catalog.exception.CatalogException; +import org.apache.tajo.catalog.proto.CatalogProtos; +import org.apache.tajo.catalog.CatalogUtil; +import org.apache.tajo.common.TajoDataTypes; +import org.apache.thrift.TException; +import parquet.hadoop.mapred.DeprecatedParquetOutputFormat; + +public class HCatalogUtil { + protected final Log LOG = LogFactory.getLog(getClass()); + + public static void validateHCatTableAndTajoSchema(HCatSchema tblSchema) throws CatalogException { + for (HCatFieldSchema hcatField : tblSchema.getFields()) { + validateHCatFieldAndTajoSchema(hcatField); + } + } + + private static void validateHCatFieldAndTajoSchema(HCatFieldSchema fieldSchema) throws CatalogException { + try { + HCatFieldSchema.Type fieldType = fieldSchema.getType(); + switch (fieldType) { + case ARRAY: + throw new HCatException("Tajo cannot support array field type."); + case STRUCT: + throw new HCatException("Tajo cannot support struct field type."); + case MAP: + throw new HCatException("Tajo cannot support map field type."); + } + } catch (HCatException e) { + throw new CatalogException("incompatible hcatalog types when assigning to tajo type. 
- " + + "HCatFieldSchema:" + fieldSchema); + } + } + + public static TajoDataTypes.Type getTajoFieldType(String fieldType) { + Preconditions.checkNotNull(fieldType); + + String typeStr = null; + + if(fieldType.equalsIgnoreCase(serdeConstants.INT_TYPE_NAME)) + typeStr = "INT4"; + else if(fieldType.equalsIgnoreCase(serdeConstants.TINYINT_TYPE_NAME)) + typeStr = "INT1"; + else if(fieldType.equalsIgnoreCase(serdeConstants.SMALLINT_TYPE_NAME)) + typeStr = "INT2"; + else if(fieldType.equalsIgnoreCase(serdeConstants.BIGINT_TYPE_NAME)) + typeStr = "INT8"; + else if(fieldType.equalsIgnoreCase(serdeConstants.BOOLEAN_TYPE_NAME)) + typeStr = "BOOLEAN"; + else if(fieldType.equalsIgnoreCase(serdeConstants.FLOAT_TYPE_NAME)) + typeStr = "FLOAT4"; + else if(fieldType.equalsIgnoreCase(serdeConstants.DOUBLE_TYPE_NAME)) + typeStr = "FLOAT8"; + else if(fieldType.equalsIgnoreCase(serdeConstants.STRING_TYPE_NAME)) + typeStr = "TEXT"; + else if(fieldType.equalsIgnoreCase(serdeConstants.BINARY_TYPE_NAME)) + typeStr = "BLOB"; + + try { + return Enum.valueOf(TajoDataTypes.Type.class, typeStr); + } catch (IllegalArgumentException iae) { + throw new CatalogException("Cannot find a matched type against from '" + typeStr + "'"); + } + } + + public static String getHiveFieldType(TajoDataTypes.DataType dataType) { + Preconditions.checkNotNull(dataType); + + switch (dataType.getType()) { + case CHAR: return serdeConstants.CHAR_TYPE_NAME; + case BOOLEAN: return serdeConstants.BOOLEAN_TYPE_NAME; + case INT1: return serdeConstants.TINYINT_TYPE_NAME; + case INT2: return serdeConstants.SMALLINT_TYPE_NAME; + case INT4: return serdeConstants.INT_TYPE_NAME; + case INT8: return serdeConstants.BIGINT_TYPE_NAME; + case FLOAT4: return serdeConstants.FLOAT_TYPE_NAME; + case FLOAT8: return serdeConstants.DOUBLE_TYPE_NAME; + case TEXT: return serdeConstants.STRING_TYPE_NAME; + case VARCHAR: return serdeConstants.VARCHAR_TYPE_NAME; + case NCHAR: return serdeConstants.VARCHAR_TYPE_NAME; + case NVARCHAR: return 
serdeConstants.VARCHAR_TYPE_NAME; + case BINARY: return serdeConstants.BINARY_TYPE_NAME; + case VARBINARY: return serdeConstants.BINARY_TYPE_NAME; + case BLOB: return serdeConstants.BINARY_TYPE_NAME; + case DATE: return serdeConstants.DATE_TYPE_NAME; + case TIMESTAMP: return serdeConstants.TIMESTAMP_TYPE_NAME; + default: + throw new CatalogException(dataType + " is not supported."); + } + } + + public static String getStoreType(String fileFormat) { + Preconditions.checkNotNull(fileFormat); + + String[] fileFormatArrary = fileFormat.split("\\."); + if(fileFormatArrary.length < 1) { + throw new CatalogException("Hive file output format is wrong. - file output format:" + fileFormat); + } + + String outputFormatClass = fileFormatArrary[fileFormatArrary.length-1]; + if(outputFormatClass.equals(HiveIgnoreKeyTextOutputFormat.class.getSimpleName())) { + return CatalogUtil.TEXTFILE_NAME; + } else if(outputFormatClass.equals(HiveSequenceFileOutputFormat.class.getSimpleName())) { + return CatalogProtos.StoreType.SEQUENCEFILE.name(); + } else if(outputFormatClass.equals(RCFileOutputFormat.class.getSimpleName())) { + return CatalogProtos.StoreType.RCFILE.name(); + } else if(outputFormatClass.equals(DeprecatedParquetOutputFormat.class.getSimpleName())) { + return CatalogProtos.StoreType.PARQUET.name(); + } else { + throw new CatalogException("Not supported file output format. - file output format:" + fileFormat); + } + } + + public static Table getTable(IMetaStoreClient client, String dbName, String tableName) throws TException { + return new Table(client.getTable(dbName, tableName)); + } +}
