Repository: flume
Updated Branches:
  refs/heads/trunk 990776427 -> a7f9255a8
http://git-wip-us.apache.org/repos/asf/flume/blob/a7f9255a/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestUtil.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestUtil.java b/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestUtil.java
new file mode 100644
index 0000000..1fd60bc
--- /dev/null
+++ b/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestUtil.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.flume.sink.hive;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.ql.CommandNeedRetryException;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.util.Shell;
+import org.apache.hive.hcatalog.streaming.QueryFailedException;
+import org.apache.thrift.TException;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class TestUtil {
+
+  private static final String txnMgr = "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager";
+
+  /**
+   * Set up the configuration so it will use the DbTxnManager with concurrency
+   * enabled (transaction and lock info then live in the embedded metastore),
+   * and register the raw:// test file system.
+   * @param conf HiveConf to add these values to.
+   */
+  public static void setConfValues(HiveConf conf) {
+    conf.setVar(HiveConf.ConfVars.HIVE_TXN_MANAGER, txnMgr);
+    conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, true);
+    conf.set("fs.raw.impl", RawFileSystem.class.getName());
+  }
+
+  public static void createDbAndTable(Driver driver, String databaseName,
+                                      String tableName, List<String> partVals,
+                                      String[] colNames, String[] colTypes,
+                                      String[] partNames, String dbLocation)
+          throws Exception {
+    String dbUri = "raw://" + dbLocation;
+    String tableLoc = dbUri + Path.SEPARATOR + tableName;
+
+    runDDL(driver, "create database IF NOT EXISTS " + databaseName + " location '" + dbUri + "'");
+    runDDL(driver, "use " + databaseName);
+    String crtTbl = "create table " + tableName +
+            " ( " + getTableColumnsStr(colNames, colTypes) + " )" +
+            getPartitionStmtStr(partNames) +
+            " clustered by ( " + colNames[0] + " )" +
+            " into 10 buckets " +
+            " stored as orc " +
+            " location '" + tableLoc + "'";
+    runDDL(driver, crtTbl);
+    System.out.println("crtTbl = " + crtTbl);
+    if (partNames != null && partNames.length != 0) {
+      String addPart = "alter table " + tableName + " add partition ( " +
+              getTablePartsStr2(partNames, partVals) + " )";
+      runDDL(driver, addPart);
+    }
+  }
+
+  private static String getPartitionStmtStr(String[] partNames) {
+    if (partNames == null || partNames.length == 0) {
+      return "";
+    }
+    return " partitioned by (" + getTablePartsStr(partNames) + " )";
+  }
+
+  // delete the database and all tables in it; always closes the metastore client
+  public static void dropDB(HiveConf conf, String databaseName) throws HiveException, MetaException {
+    IMetaStoreClient client = new HiveMetaStoreClient(conf);
+    try {
+      for (String table : client.listTableNamesByFilter(databaseName, "", (short) -1)) {
+        client.dropTable(databaseName, table, true, true);
+      }
+      client.dropDatabase(databaseName);
+    } catch (TException e) {
+      // best effort in test teardown: the database may not exist yet
+    } finally {
+      client.close();
+    }
+  }
+
+  private static String getTableColumnsStr(String[] colNames, String[] colTypes) {
+    StringBuffer sb = new StringBuffer();
+    for (int i = 0; i < colNames.length; ++i) {
+      sb.append(colNames[i] + " " + colTypes[i]);
+      if (i < colNames.length - 1) {
+        sb.append(",");
+      }
+    }
+    return sb.toString();
+  }
+
+  // converts partNames into "partName1 string, partName2 string"
+  private static String getTablePartsStr(String[] partNames) {
+    if (partNames == null || partNames.length == 0) {
+      return "";
+    }
+    StringBuffer sb = new StringBuffer();
+    for (int i = 0; i < partNames.length; ++i) {
+      sb.append(partNames[i] + " string");
+      if (i < partNames.length - 1) {
+        sb.append(",");
+      }
+    }
+    return sb.toString();
+  }
+
+  // converts partNames,partVals into "partName1=val1, partName2=val2"
+  private static String getTablePartsStr2(String[] partNames, List<String> partVals) {
+    StringBuffer sb = new StringBuffer();
+    for (int i = 0; i < partVals.size(); ++i) {
+      sb.append(partNames[i] + " = '" + partVals.get(i) + "'");
+      if (i < partVals.size() - 1) {
+        sb.append(",");
+      }
+    }
+    return sb.toString();
+  }
+
+  public static ArrayList<String> listRecordsInTable(Driver driver, String dbName, String tblName)
+          throws CommandNeedRetryException, IOException {
+    driver.run("select * from " + dbName + "." + tblName);
+    ArrayList<String> res = new ArrayList<String>();
+    driver.getResults(res);
+    return res;
+  }
+
+  public static ArrayList<String> listRecordsInPartition(Driver driver, String dbName,
+                                                         String tblName, String continent,
+                                                         String country)
+          throws CommandNeedRetryException, IOException {
+    driver.run("select * from " + dbName + "."
+            + tblName + " where continent='"
+            + continent + "' and country='" + country + "'");
+    ArrayList<String> res = new ArrayList<String>();
+    driver.getResults(res);
+    return res;
+  }
+
+
+  public static class RawFileSystem extends RawLocalFileSystem {
+    private static final URI NAME;
+    static {
+      try {
+        NAME = new URI("raw:///");
+      } catch (URISyntaxException se) {
+        throw new IllegalArgumentException("bad uri", se);
+      }
+    }
+
+    @Override
+    public URI getUri() {
+      return NAME;
+    }
+
+    static String execCommand(File f, String... cmd) throws IOException {
+      String[] args = new String[cmd.length + 1];
+      System.arraycopy(cmd, 0, args, 0, cmd.length);
+      args[cmd.length] = f.getCanonicalPath();
+      String output = Shell.execCommand(args);
+      return output;
+    }
+
+    @Override
+    public FileStatus getFileStatus(Path path) throws IOException {
+      File file = pathToFile(path);
+      if (!file.exists()) {
+        throw new FileNotFoundException("Can't find " + path);
+      }
+      // get close enough
+      short mod = 0;
+      if (file.canRead()) {
+        mod |= 0444;
+      }
+      if (file.canWrite()) {
+        mod |= 0200;
+      }
+      if (file.canExecute()) {
+        mod |= 0111;
+      }
+      ShimLoader.getHadoopShims();
+      return new FileStatus(file.length(), file.isDirectory(), 1, 1024,
+              file.lastModified(), file.lastModified(),
+              FsPermission.createImmutable(mod), "owen", "users", path);
+    }
+  }
+
+  private static boolean runDDL(Driver driver, String sql) throws QueryFailedException {
+    int retryCount = 1; // # of times to retry if first attempt fails
+    for (int attempt = 0; attempt <= retryCount; ++attempt) {
+      try {
+        driver.run(sql);
+        return true;
+      } catch (CommandNeedRetryException e) {
+        if (attempt == retryCount) {
+          throw new QueryFailedException(sql, e);
+        }
+        continue;
+      }
+    } // for
+    return false;
+  }
+
+}
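
For orientation, a minimal sketch (not part of the commit) of how a test might
drive these helpers. The class, database, table, and path names below are
hypothetical placeholders, and the sketch assumes a Hive session is started
before the Driver is constructed, as is usual for embedded-metastore tests.

package org.apache.flume.sink.hive;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.hive.cli.CliSessionState;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.session.SessionState;

public class TestUtilUsageSketch {
  public static void main(String[] args) throws Exception {
    HiveConf conf = new HiveConf();
    TestUtil.setConfValues(conf);                   // DbTxnManager, concurrency, raw:// fs
    SessionState.start(new CliSessionState(conf));  // Driver needs an active session

    Driver driver = new Driver(conf);
    List<String> partVals = Arrays.asList("Asia", "India");  // hypothetical partition values
    TestUtil.createDbAndTable(driver, "testdb", "alerts", partVals,
        new String[] {"id", "msg"},                 // column names
        new String[] {"int", "string"},             // column types
        new String[] {"continent", "country"},      // partition columns
        "/tmp/testdb");                             // hypothetical db location

    // ... exercise the Hive sink under test, then verify what landed:
    ArrayList<String> rows = TestUtil.listRecordsInPartition(
        driver, "testdb", "alerts", "Asia", "India");
    System.out.println(rows);

    TestUtil.dropDB(conf, "testdb");                // teardown
  }
}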
http://git-wip-us.apache.org/repos/asf/flume/blob/a7f9255a/flume-ng-sinks/flume-hive-sink/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hive-sink/src/test/resources/log4j.properties b/flume-ng-sinks/flume-hive-sink/src/test/resources/log4j.properties
new file mode 100644
index 0000000..252b5ea
--- /dev/null
+++ b/flume-ng-sinks/flume-hive-sink/src/test/resources/log4j.properties
@@ -0,0 +1,26 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+log4j.rootLogger = INFO, out
+
+log4j.appender.out = org.apache.log4j.ConsoleAppender
+log4j.appender.out.layout = org.apache.log4j.PatternLayout
+log4j.appender.out.layout.ConversionPattern = %d (%t) [%p - %l] %m%n
+
+log4j.logger.org.apache.flume = DEBUG
+log4j.logger.org.apache.hadoop = WARN
+log4j.logger.org.mortbay = WARN

http://git-wip-us.apache.org/repos/asf/flume/blob/a7f9255a/flume-ng-sinks/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/pom.xml b/flume-ng-sinks/pom.xml
index 4bac019..de12891 100644
--- a/flume-ng-sinks/pom.xml
+++ b/flume-ng-sinks/pom.xml
@@ -72,6 +72,7 @@ limitations under the License. -->

     <modules>
       <module>flume-dataset-sink</module>
+      <module>flume-hive-sink</module>
     </modules>
   </profile>

@@ -87,6 +88,7 @@ limitations under the License. -->

     <modules>
       <module>flume-dataset-sink</module>
+      <module>flume-hive-sink</module>
     </modules>
   </profile>

http://git-wip-us.apache.org/repos/asf/flume/blob/a7f9255a/flume-ng-sources/flume-scribe-source/src/main/java/org/apache/flume/source/scribe/Scribe.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-scribe-source/src/main/java/org/apache/flume/source/scribe/Scribe.java b/flume-ng-sources/flume-scribe-source/src/main/java/org/apache/flume/source/scribe/Scribe.java
index eba8d2e..3b5b3c7 100644
--- a/flume-ng-sources/flume-scribe-source/src/main/java/org/apache/flume/source/scribe/Scribe.java
+++ b/flume-ng-sources/flume-scribe-source/src/main/java/org/apache/flume/source/scribe/Scribe.java
@@ -168,15 +168,20 @@ public class Scribe {
       super("Log");
     }

-    protected Log_args getEmptyArgsInstance() {
+    public Log_args getEmptyArgsInstance() {
       return new Log_args();
     }

-    protected Log_result getResult(I iface, Log_args args) throws org.apache.thrift.TException {
+    public Log_result getResult(I iface, Log_args args) throws org.apache
+        .thrift.TException {
       Log_result result = new Log_result();
       result.success = iface.Log(args.messages);
       return result;
     }
+
+    public boolean isOneway() {
+      return false;
+    }
   }
 }
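
The Scribe change pairs with the thrift.version bump from 0.8.0 to 0.9.0 in the
root pom that follows: in libthrift 0.9.x the ProcessFunction hooks that
generated processors override became public, and an isOneway() hook was added,
so this hand-maintained generated code must now match. A paraphrased sketch of
that base-class contract (from memory of the 0.9.0 release, not copied from the
library; verify against the libthrift jar):

package org.apache.thrift;

// Paraphrased sketch of the libthrift 0.9.x ProcessFunction contract;
// signatures are approximate, not copied from the library source.
public abstract class ProcessFunction<I, T extends TBase> {
  public abstract T getEmptyArgsInstance();          // was protected in 0.8.x
  public abstract TBase getResult(I iface, T args)
      throws TException;                             // was protected in 0.8.x
  protected abstract boolean isOneway();             // new in 0.9.x: skip the reply for oneway calls
}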
http://git-wip-us.apache.org/repos/asf/flume/blob/a7f9255a/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 1350fa4..ea7ffe3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -48,11 +48,12 @@ limitations under the License.

     <avro.version>1.7.4</avro.version>
     <elasticsearch.version>0.90.1</elasticsearch.version>

-    <hadoop2.version>2.4.0</hadoop2.version>
     <thrift.version>0.7.0</thrift.version>
     <kite.version>0.17.1</kite.version>

-    <hive.version>0.10.0</hive.version>
+    <hive.version>0.13.1</hive.version>
+    <xalan.verion>2.7.1</xalan.verion>
+    <xerces.version>2.9.1</xerces.version>
   </properties>

@@ -81,7 +82,7 @@ limitations under the License.
         </property>
       </activation>
       <properties>
-        <hadoop.version>1.0.1</hadoop.version>
+        <hadoop.version>1.2.1</hadoop.version>
         <hbase.version>0.92.1</hbase.version>
         <hadoop.common.artifact.id>hadoop-core</hadoop.common.artifact.id>
         <thrift.version>0.7.0</thrift.version>
@@ -133,7 +134,7 @@ limitations under the License.
         <hadoop.version>${hadoop2.version}</hadoop.version>
         <hbase.version>0.94.2</hbase.version>
         <hadoop.common.artifact.id>hadoop-common</hadoop.common.artifact.id>
-        <thrift.version>0.8.0</thrift.version>
+        <thrift.version>0.9.0</thrift.version>
       </properties>
       <dependencyManagement>
         <dependencies>
@@ -149,6 +150,11 @@ limitations under the License.
          </dependency>
          <dependency>
            <groupId>org.apache.hadoop</groupId>
+           <artifactId>hadoop-mapreduce-client-core</artifactId>
+           <version>${hadoop.version}</version>
+         </dependency>
+         <dependency>
+           <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-minicluster</artifactId>
            <version>${hadoop.version}</version>
          </dependency>
@@ -211,7 +217,7 @@ limitations under the License.
         <hadoop.version>${hadoop2.version}</hadoop.version>
         <hbase.version>0.98.2-hadoop2</hbase.version>
         <hadoop.common.artifact.id>hadoop-common</hadoop.common.artifact.id>
-        <thrift.version>0.8.0</thrift.version>
+        <thrift.version>0.9.0</thrift.version>
       </properties>
       <dependencyManagement>
         <dependencies>
@@ -736,6 +742,8 @@ limitations under the License.
              <exclude>**/.classpath</exclude>
              <exclude>**/.project</exclude>
              <exclude>**/target/**</exclude>
+             <exclude>**/derby.log</exclude>
+             <exclude>**/metastore_db/</exclude>
            </excludes>
          </configuration>
        </execution>
@@ -971,6 +979,7 @@ limitations under the License.
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>${hadoop.version}</version>
+       <optional>true</optional>
      </dependency>

      <dependency>
@@ -1026,6 +1035,15 @@ limitations under the License.
        <artifactId>joda-time</artifactId>
        <version>2.1</version>
      </dependency>
+     <!-- Adding zookeeper as a dependency, because asynchbase
+          was pulling in a different version of zookeeper as a
+          transitive dependency. -->
+
+     <dependency>
+       <groupId>org.apache.zookeeper</groupId>
+       <artifactId>zookeeper</artifactId>
+       <version>${zookeeper.version}</version>
+     </dependency>

      <dependency>
        <groupId>org.apache.hadoop</groupId>
@@ -1123,6 +1141,12 @@ limitations under the License.

      <dependency>
        <groupId>org.apache.flume.flume-ng-sinks</groupId>
+       <artifactId>flume-hive-sink</artifactId>
+       <version>1.6.0-SNAPSHOT</version>
+     </dependency>
+
+     <dependency>
+       <groupId>org.apache.flume.flume-ng-sinks</groupId>
        <artifactId>flume-irc-sink</artifactId>
        <version>1.6.0-SNAPSHOT</version>
      </dependency>
@@ -1331,6 +1355,18 @@ limitations under the License.
        <version>1.1.0</version>
      </dependency>

+     <dependency>
+       <groupId>org.apache.hive.hcatalog</groupId>
+       <artifactId>hive-hcatalog-streaming</artifactId>
+       <version>${hive.version}</version>
+     </dependency>
+
+     <dependency>
+       <groupId>org.apache.hive</groupId>
+       <artifactId>hive-cli</artifactId>
+       <version>${hive.version}</version>
+     </dependency>
+
      <!-- Dependency for Zk provider -->
      <dependency>
        <groupId>org.apache.curator</groupId>
