http://git-wip-us.apache.org/repos/asf/flume/blob/5dfd3d96/flume-ng-sinks/flume-hive-sink/metastore_db/seg0/d1e50.dat ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hive-sink/metastore_db/seg0/d1e50.dat b/flume-ng-sinks/flume-hive-sink/metastore_db/seg0/d1e50.dat new file mode 100644 index 0000000..fd69350 Binary files /dev/null and b/flume-ng-sinks/flume-hive-sink/metastore_db/seg0/d1e50.dat differ
http://git-wip-us.apache.org/repos/asf/flume/blob/5dfd3d96/flume-ng-sinks/flume-hive-sink/metastore_db/seg0/d1e60.dat ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hive-sink/metastore_db/seg0/d1e60.dat b/flume-ng-sinks/flume-hive-sink/metastore_db/seg0/d1e60.dat new file mode 100644 index 0000000..9ae5d87 Binary files /dev/null and b/flume-ng-sinks/flume-hive-sink/metastore_db/seg0/d1e60.dat differ http://git-wip-us.apache.org/repos/asf/flume/blob/5dfd3d96/flume-ng-sinks/flume-hive-sink/metastore_db/seg0/d1e71.dat ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hive-sink/metastore_db/seg0/d1e71.dat b/flume-ng-sinks/flume-hive-sink/metastore_db/seg0/d1e71.dat new file mode 100644 index 0000000..762cc2a Binary files /dev/null and b/flume-ng-sinks/flume-hive-sink/metastore_db/seg0/d1e71.dat differ http://git-wip-us.apache.org/repos/asf/flume/blob/5dfd3d96/flume-ng-sinks/flume-hive-sink/metastore_db/seg0/d1e80.dat ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hive-sink/metastore_db/seg0/d1e80.dat b/flume-ng-sinks/flume-hive-sink/metastore_db/seg0/d1e80.dat new file mode 100644 index 0000000..fd69350 Binary files /dev/null and b/flume-ng-sinks/flume-hive-sink/metastore_db/seg0/d1e80.dat differ http://git-wip-us.apache.org/repos/asf/flume/blob/5dfd3d96/flume-ng-sinks/flume-hive-sink/metastore_db/service.properties ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hive-sink/metastore_db/service.properties b/flume-ng-sinks/flume-hive-sink/metastore_db/service.properties new file mode 100644 index 0000000..e385a7b --- /dev/null +++ b/flume-ng-sinks/flume-hive-sink/metastore_db/service.properties @@ -0,0 +1,22 @@ +#/Users/hshreedharan/work/flume-latest/flume/flume-ng-sinks/flume-hive-sink/metastore_db +# ******************************************************************** +# *** Please do NOT edit this file. *** +# *** CHANGING THE CONTENT OF THIS FILE MAY CAUSE DATA CORRUPTION. *** +# ******************************************************************** +#Mon Feb 09 20:19:39 PST 2015 +SysschemasIndex2Identifier=225 +SyscolumnsIdentifier=144 +SysconglomeratesIndex1Identifier=49 +SysconglomeratesIdentifier=32 +SyscolumnsIndex2Identifier=177 +SysschemasIndex1Identifier=209 +SysconglomeratesIndex3Identifier=81 +SystablesIndex2Identifier=129 +SyscolumnsIndex1Identifier=161 +derby.serviceProtocol=org.apache.derby.database.Database +SysschemasIdentifier=192 +derby.storage.propertiesId=16 +SysconglomeratesIndex2Identifier=65 +derby.serviceLocale=en_US +SystablesIdentifier=96 +SystablesIndex1Identifier=113 http://git-wip-us.apache.org/repos/asf/flume/blob/5dfd3d96/flume-ng-sinks/flume-hive-sink/pom.xml ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hive-sink/pom.xml b/flume-ng-sinks/flume-hive-sink/pom.xml new file mode 100644 index 0000000..e5f673a --- /dev/null +++ b/flume-ng-sinks/flume-hive-sink/pom.xml @@ -0,0 +1,189 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. 
+The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flume</groupId> + <artifactId>flume-ng-sinks</artifactId> + <version>1.6.0-SNAPSHOT</version> + </parent> + + <groupId>org.apache.flume.flume-ng-sinks</groupId> + <artifactId>flume-hive-sink</artifactId> + <name>Flume NG Hive Sink</name> + + <build> + <plugins> + <plugin> + <groupId>org.apache.rat</groupId> + <artifactId>apache-rat-plugin</artifactId> + </plugin> + </plugins> + </build> + + <profiles> + <profile> + <id>hadoop-1.0</id> + <activation> + <property> + <name>!hadoop.profile</name> + </property> + </activation> + + <dependencies> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-core</artifactId> + <version>${hadoop.version}</version> + <scope>test</scope> + </dependency> + </dependencies> + </profile> + <profile> + <id>hadoop-2</id> + <activation> + <property> + <name>hadoop.profile</name> + <value>2</value> + </property> + </activation> + <dependencies> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <version>${hadoop.version}</version> + <scope>test</scope> + <optional>true</optional> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-core</artifactId> + <scope>test</scope> + <version>${hadoop.version}</version> + </dependency> + </dependencies> + </profile> + + <profile> + <id>hbase-98</id> + <activation> + <property> + <name>hadoop.profile</name> + <value>hbase-98</value> + </property> + </activation> + <dependencies> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <scope>test</scope> + <optional>true</optional> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-core</artifactId> + <scope>test</scope> + <version>${hadoop.version}</version> + </dependency> + </dependencies> + </profile> + </profiles> + + <dependencies> + + <dependency> + <groupId>org.apache.flume</groupId> + <artifactId>flume-ng-sdk</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.flume</groupId> + <artifactId>flume-ng-configuration</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.flume</groupId> + <artifactId>flume-ng-core</artifactId> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.hive.hcatalog</groupId> + <artifactId>hive-hcatalog-streaming</artifactId> + 
<scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.hive.hcatalog</groupId> + <artifactId>hive-hcatalog-core</artifactId> + <scope>provided</scope> + <version>${hive.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.hive</groupId> + <artifactId>hive-cli</artifactId> + <scope>test</scope> + </dependency> + + <!--temporary - really belongs to hive-streaming : roshan --> + <dependency> + <groupId>xerces</groupId> + <artifactId>xercesImpl</artifactId> + <scope>runtime</scope> + <version>2.9.1</version> + </dependency> + + <dependency> + <groupId>xalan</groupId> + <artifactId>serializer</artifactId> + <version>2.7.1</version> + <scope>runtime</scope> + </dependency> + <dependency> + <groupId>xalan</groupId> + <artifactId>xalan</artifactId> + <scope>runtime</scope> + <version>2.7.1</version> + </dependency> + <!-- end temporary --> + + </dependencies> + +</project> http://git-wip-us.apache.org/repos/asf/flume/blob/5dfd3d96/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/Config.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/Config.java b/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/Config.java new file mode 100644 index 0000000..b2d2582 --- /dev/null +++ b/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/Config.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flume.sink.hive; + +public class Config { + public static final String HIVE_METASTORE = "hive.metastore"; + public static final String HIVE_DATABASE = "hive.database"; + public static final String HIVE_TABLE = "hive.table"; + public static final String HIVE_PARTITION = "hive.partition"; + public static final String HIVE_TXNS_PER_BATCH_ASK = "hive.txnsPerBatchAsk"; + public static final String BATCH_SIZE = "batchSize"; + public static final String IDLE_TIMEOUT = "idleTimeout"; + public static final String CALL_TIMEOUT = "callTimeout"; + public static final String HEART_BEAT_INTERVAL = "heartBeatInterval"; + public static final String MAX_OPEN_CONNECTIONS = "maxOpenConnections"; + public static final String USE_LOCAL_TIME_STAMP = "useLocalTimeStamp"; + public static final String TIME_ZONE = "timeZone"; + public static final String ROUND_UNIT = "roundUnit"; + public static final String ROUND = "round"; + public static final String HOUR = "hour"; + public static final String MINUTE = "minute"; + public static final String SECOND = "second"; + public static final String ROUND_VALUE = "roundValue"; + public static final String SERIALIZER = "serializer"; +} http://git-wip-us.apache.org/repos/asf/flume/blob/5dfd3d96/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveDelimitedTextSerializer.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveDelimitedTextSerializer.java b/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveDelimitedTextSerializer.java new file mode 100644 index 0000000..8f64435 --- /dev/null +++ b/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveDelimitedTextSerializer.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flume.sink.hive; + + +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.apache.hive.hcatalog.streaming.DelimitedInputWriter; +import org.apache.hive.hcatalog.streaming.HiveEndPoint; +import org.apache.hive.hcatalog.streaming.RecordWriter; +import org.apache.hive.hcatalog.streaming.StreamingException; +import org.apache.hive.hcatalog.streaming.TransactionBatch; + +import java.io.IOException; + +/** Forwards the incoming event body to Hive unmodified + * Sets up the delimiter and the field to column mapping + */ +public class HiveDelimitedTextSerializer implements HiveEventSerializer { + public static final String ALIAS = "DELIMITED"; + + public static final String defaultDelimiter = ","; + public static final String SERIALIZER_DELIMITER = "serializer.delimiter"; + public static final String SERIALIZER_FIELDNAMES = "serializer.fieldnames"; + public static final String SERIALIZER_SERDE_SEPARATOR = "serializer.serdeSeparator"; + + private String delimiter; + private String[] fieldToColMapping = null; + private Character serdeSeparator = null; + + @Override + public void write(TransactionBatch txnBatch, Event e) + throws StreamingException, IOException, InterruptedException { + txnBatch.write(e.getBody()); + } + + @Override + public RecordWriter createRecordWriter(HiveEndPoint endPoint) + throws StreamingException, IOException, ClassNotFoundException { + if (serdeSeparator == null) { + return new DelimitedInputWriter(fieldToColMapping, delimiter, endPoint); + } + return new DelimitedInputWriter(fieldToColMapping, delimiter, endPoint, null + , serdeSeparator); + } + + @Override + public void configure(Context context) { + delimiter = parseDelimiterSpec( + context.getString(SERIALIZER_DELIMITER, defaultDelimiter) ); + String fieldNames = context.getString(SERIALIZER_FIELDNAMES); + if (fieldNames == null) { + throw new IllegalArgumentException("serializer.fieldnames is not specified " + + "for serializer " + this.getClass().getName() ); + } + String serdeSeparatorStr = context.getString(SERIALIZER_SERDE_SEPARATOR); + this.serdeSeparator = parseSerdeSeparatorSpec(serdeSeparatorStr); + + // split, but preserve empty fields (-1) + fieldToColMapping = fieldNames.trim().split(",",-1); + } + + // if delimiter is a double quoted like "\t", drop quotes + private static String parseDelimiterSpec(String delimiter) { + if (delimiter == null) { + return null; + } + if (delimiter.charAt(0) == '"' && + delimiter.charAt(delimiter.length()-1) == '"') { + return delimiter.substring(1,delimiter.length()-1); + } + return delimiter; + } + + // if delimiter is a single quoted character like '\t', drop quotes + private static Character parseSerdeSeparatorSpec(String separatorStr) { + if (separatorStr == null) { + return null; + } + if (separatorStr.length() == 1) { + return separatorStr.charAt(0); + } + if (separatorStr.length() == 3 && + separatorStr.charAt(2) == '\'' && + separatorStr.charAt(separatorStr.length()-1) == '\'') { + return separatorStr.charAt(1); + } + + throw new IllegalArgumentException("serializer.serdeSeparator spec is invalid " + + "for " + ALIAS + " serializer " ); + } + +} http://git-wip-us.apache.org/repos/asf/flume/blob/5dfd3d96/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveEventSerializer.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveEventSerializer.java 
b/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveEventSerializer.java new file mode 100644 index 0000000..c233d3d --- /dev/null +++ b/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveEventSerializer.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flume.sink.hive; + + +import org.apache.flume.Event; +import org.apache.flume.conf.Configurable; +import org.apache.hive.hcatalog.streaming.HiveEndPoint; +import org.apache.hive.hcatalog.streaming.RecordWriter; +import org.apache.hive.hcatalog.streaming.StreamingException; +import org.apache.hive.hcatalog.streaming.TransactionBatch; + +import java.io.IOException; + +public interface HiveEventSerializer extends Configurable { + public void write(TransactionBatch batch, Event e) + throws StreamingException, IOException, InterruptedException; + + RecordWriter createRecordWriter(HiveEndPoint endPoint) + throws StreamingException, IOException, ClassNotFoundException; + +} http://git-wip-us.apache.org/repos/asf/flume/blob/5dfd3d96/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveJsonSerializer.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveJsonSerializer.java b/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveJsonSerializer.java new file mode 100644 index 0000000..a75073f --- /dev/null +++ b/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveJsonSerializer.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flume.sink.hive; + +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.apache.hive.hcatalog.streaming.HiveEndPoint; +import org.apache.hive.hcatalog.streaming.RecordWriter; +import org.apache.hive.hcatalog.streaming.StreamingException; +import org.apache.hive.hcatalog.streaming.StrictJsonWriter; +import org.apache.hive.hcatalog.streaming.TransactionBatch; + +import java.io.IOException; + +/** Forwards the incoming event body to Hive unmodified, as a strict JSON record, + * using the Hive streaming StrictJsonWriter + */ + +public class HiveJsonSerializer implements HiveEventSerializer { + public static final String ALIAS = "JSON"; + + @Override + public void write(TransactionBatch txnBatch, Event e) + throws StreamingException, IOException, InterruptedException { + txnBatch.write(e.getBody()); + } + + @Override + public RecordWriter createRecordWriter(HiveEndPoint endPoint) + throws StreamingException, IOException, ClassNotFoundException { + return new StrictJsonWriter(endPoint); + } + + @Override + public void configure(Context context) { + return; + } + +} http://git-wip-us.apache.org/repos/asf/flume/blob/5dfd3d96/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveSink.java b/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveSink.java new file mode 100644 index 0000000..6fe332a --- /dev/null +++ b/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveSink.java @@ -0,0 +1,524 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flume.sink.hive; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.flume.Channel; +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.apache.flume.EventDeliveryException; +import org.apache.flume.Transaction; +import org.apache.flume.conf.Configurable; +import org.apache.flume.formatter.output.BucketPath; +import org.apache.flume.instrumentation.SinkCounter; +import org.apache.flume.sink.AbstractSink; +import org.apache.hive.hcatalog.streaming.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Calendar; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.TimeZone; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +public class HiveSink extends AbstractSink implements Configurable { + + private static final Logger LOG = LoggerFactory + .getLogger(HiveSink.class); + + private static final int DEFAULT_MAXOPENCONNECTIONS = 500; + private static final int DEFAULT_TXNSPERBATCH = 100; + private static final int DEFAULT_BATCHSIZE = 15000; + private static final int DEFAULT_CALLTIMEOUT = 10000; + private static final int DEFAULT_IDLETIMEOUT = 0; + private static final int DEFAULT_HEARTBEATINTERVAL = 240; // seconds + + + private Map<HiveEndPoint, HiveWriter> allWriters; + + private SinkCounter sinkCounter; + private volatile int idleTimeout; + private String metaStoreUri; + private String proxyUser; + private String database; + private String table; + private List<String> partitionVals; + private Integer txnsPerBatchAsk; + private Integer batchSize; + private Integer maxOpenConnections; + private boolean autoCreatePartitions; + private String serializerType; + private HiveEventSerializer serializer; + + /** + * Default timeout for blocking I/O calls in HiveWriter + */ + private Integer callTimeout; + private Integer heartBeatInterval; + + private ExecutorService callTimeoutPool; + + private boolean useLocalTime; + private TimeZone timeZone; + private boolean needRounding; + private int roundUnit; + private Integer roundValue; + + private Timer heartBeatTimer = new Timer(); + private AtomicBoolean timeToSendHeartBeat = new AtomicBoolean(false); + + @VisibleForTesting + Map<HiveEndPoint, HiveWriter> getAllWriters() { + return allWriters; + } + + // read configuration and setup thresholds + @Override + public void configure(Context context) { + + metaStoreUri = context.getString(Config.HIVE_METASTORE); + if (metaStoreUri == null) { + throw new IllegalArgumentException(Config.HIVE_METASTORE + " config setting is not " + + "specified for sink " + getName()); + } + if (metaStoreUri.equalsIgnoreCase("null")) { // for testing support + metaStoreUri = null; + } + proxyUser = null; // context.getString("hive.proxyUser"); not supported by hive api yet + database = context.getString(Config.HIVE_DATABASE); + if (database == null) { + throw new IllegalArgumentException(Config.HIVE_DATABASE + " config setting is not " + + "specified for sink " + getName()); + } + table = context.getString(Config.HIVE_TABLE); + if (table == null) { + throw new 
IllegalArgumentException(Config.HIVE_TABLE + " config setting is not " + + "specified for sink " + getName()); + } + + String partitions = context.getString(Config.HIVE_PARTITION); + if (partitions != null) { + partitionVals = Arrays.asList(partitions.split(",")); + } + + + txnsPerBatchAsk = context.getInteger(Config.HIVE_TXNS_PER_BATCH_ASK, DEFAULT_TXNSPERBATCH); + if (txnsPerBatchAsk < 0) { + LOG.warn(getName() + ". hive.txnsPerBatchAsk must be positive number. Defaulting to " + + DEFAULT_TXNSPERBATCH); + txnsPerBatchAsk = DEFAULT_TXNSPERBATCH; + } + batchSize = context.getInteger(Config.BATCH_SIZE, DEFAULT_BATCHSIZE); + if (batchSize < 0) { + LOG.warn(getName() + ". batchSize must be positive number. Defaulting to " + + DEFAULT_BATCHSIZE); + batchSize = DEFAULT_BATCHSIZE; + } + idleTimeout = context.getInteger(Config.IDLE_TIMEOUT, DEFAULT_IDLETIMEOUT); + if (idleTimeout < 0) { + LOG.warn(getName() + ". idleTimeout must be positive number. Defaulting to " + + DEFAULT_IDLETIMEOUT); + idleTimeout = DEFAULT_IDLETIMEOUT; + } + callTimeout = context.getInteger(Config.CALL_TIMEOUT, DEFAULT_CALLTIMEOUT); + if (callTimeout < 0) { + LOG.warn(getName() + ". callTimeout must be positive number. Defaulting to " + + DEFAULT_CALLTIMEOUT); + callTimeout = DEFAULT_CALLTIMEOUT; + } + + heartBeatInterval = context.getInteger(Config.HEART_BEAT_INTERVAL, DEFAULT_HEARTBEATINTERVAL); + if (heartBeatInterval < 0) { + LOG.warn(getName() + ". heartBeatInterval must be positive number. Defaulting to " + + DEFAULT_HEARTBEATINTERVAL); + heartBeatInterval = DEFAULT_HEARTBEATINTERVAL; + } + maxOpenConnections = context.getInteger(Config.MAX_OPEN_CONNECTIONS, DEFAULT_MAXOPENCONNECTIONS); + autoCreatePartitions = context.getBoolean("autoCreatePartitions", true); + + // Timestamp processing + useLocalTime = context.getBoolean(Config.USE_LOCAL_TIME_STAMP, false); + + String tzName = context.getString(Config.TIME_ZONE); + timeZone = (tzName == null) ? null : TimeZone.getTimeZone(tzName); + needRounding = context.getBoolean(Config.ROUND, false); + + String unit = context.getString(Config.ROUND_UNIT, Config.MINUTE); + if (unit.equalsIgnoreCase(Config.HOUR)) { + this.roundUnit = Calendar.HOUR_OF_DAY; + } else if (unit.equalsIgnoreCase(Config.MINUTE)) { + this.roundUnit = Calendar.MINUTE; + } else if (unit.equalsIgnoreCase(Config.SECOND)){ + this.roundUnit = Calendar.SECOND; + } else { + LOG.warn(getName() + ". Rounding unit is not valid, please set one of " + + "minute, hour or second. 
Rounding will be disabled"); + needRounding = false; + } + this.roundValue = context.getInteger(Config.ROUND_VALUE, 1); + if (roundUnit == Calendar.SECOND || roundUnit == Calendar.MINUTE){ + Preconditions.checkArgument(roundValue > 0 && roundValue <= 60, + "Round value must be > 0 and <= 60"); + } else if (roundUnit == Calendar.HOUR_OF_DAY){ + Preconditions.checkArgument(roundValue > 0 && roundValue <= 24, + "Round value must be > 0 and <= 24"); + } + + // Serializer + serializerType = context.getString(Config.SERIALIZER, ""); + if (serializerType.isEmpty()) { + throw new IllegalArgumentException("serializer config setting is not " + + "specified for sink " + getName()); + } + + serializer = createSerializer(serializerType); + serializer.configure(context); + + Preconditions.checkArgument(batchSize > 0, "batchSize must be greater than 0"); + + if (sinkCounter == null) { + sinkCounter = new SinkCounter(getName()); + } + } + + @VisibleForTesting + protected SinkCounter getCounter() { + return sinkCounter; + } + private HiveEventSerializer createSerializer(String serializerName) { + if(serializerName.compareToIgnoreCase(HiveDelimitedTextSerializer.ALIAS) == 0 || + serializerName.compareTo(HiveDelimitedTextSerializer.class.getName()) == 0) { + return new HiveDelimitedTextSerializer(); + } else if (serializerName.compareToIgnoreCase(HiveJsonSerializer.ALIAS) == 0 || + serializerName.compareTo(HiveJsonSerializer.class.getName()) == 0) { + return new HiveJsonSerializer(); + } + + try { + return (HiveEventSerializer) Class.forName(serializerName).newInstance(); + } catch (Exception e) { + throw new IllegalArgumentException("Unable to instantiate serializer: " + serializerName + + " on sink: " + getName(), e); + } + } + + + /** + * Pull events out of channel, find corresponding HiveWriter and write to it. + * Take at most batchSize events per Transaction. <br/> + * This method is not thread safe. 
+ */ + public Status process() throws EventDeliveryException { + // writers used in this Txn + + Channel channel = getChannel(); + Transaction transaction = channel.getTransaction(); + transaction.begin(); + boolean success = false; + try { + // 1 Enable Heart Beats + if (timeToSendHeartBeat.compareAndSet(true, false)) { + enableHeartBeatOnAllWriters(); + } + + // 2 Drain Batch + int txnEventCount = drainOneBatch(channel); + transaction.commit(); + success = true; + + // 3 Update Counters + if (txnEventCount < 1) { + return Status.BACKOFF; + } else { + return Status.READY; + } + } catch (InterruptedException err) { + LOG.warn(getName() + ": Thread was interrupted.", err); + return Status.BACKOFF; + } catch (Exception e) { + throw new EventDeliveryException(e); + } finally { + if (!success) { + transaction.rollback(); + } + transaction.close(); + } + } + + // Drains one batch of events from Channel into Hive + private int drainOneBatch(Channel channel) + throws HiveWriter.Failure, InterruptedException { + int txnEventCount = 0; + try { + Map<HiveEndPoint,HiveWriter> activeWriters = Maps.newHashMap(); + for (; txnEventCount < batchSize; ++txnEventCount) { + // 0) Read event from Channel + Event event = channel.take(); + if (event == null) { + break; + } + + //1) Create end point by substituting place holders + HiveEndPoint endPoint = makeEndPoint(metaStoreUri, database, table, + partitionVals, event.getHeaders(), timeZone, + needRounding, roundUnit, roundValue, useLocalTime); + + //2) Create or reuse Writer + HiveWriter writer = getOrCreateWriter(activeWriters, endPoint); + + //3) Write + LOG.debug("{} : Writing event to {}", getName(), endPoint); + writer.write(event); + + } // for + + //4) Update counters + if (txnEventCount == 0) { + sinkCounter.incrementBatchEmptyCount(); + } else if (txnEventCount == batchSize) { + sinkCounter.incrementBatchCompleteCount(); + } else { + sinkCounter.incrementBatchUnderflowCount(); + } + sinkCounter.addToEventDrainAttemptCount(txnEventCount); + + + // 5) Flush all Writers + for (HiveWriter writer : activeWriters.values()) { + writer.flush(true); + } + + sinkCounter.addToEventDrainSuccessCount(txnEventCount); + return txnEventCount; + } catch (HiveWriter.Failure e) { + LOG.warn(getName() + " : " + e.getMessage(), e); + abortAllWriters(); + closeAllWriters(); + throw e; + } + } + + private void enableHeartBeatOnAllWriters() { + for (HiveWriter writer : allWriters.values()) { + writer.setHearbeatNeeded(); + } + } + + private HiveWriter getOrCreateWriter(Map<HiveEndPoint, HiveWriter> activeWriters, + HiveEndPoint endPoint) + throws HiveWriter.ConnectException, InterruptedException { + try { + HiveWriter writer = allWriters.get( endPoint ); + if (writer == null) { + LOG.info(getName() + ": Creating Writer to Hive end point : " + endPoint); + writer = new HiveWriter(endPoint, txnsPerBatchAsk, autoCreatePartitions, + callTimeout, callTimeoutPool, proxyUser, serializer, sinkCounter); + + sinkCounter.incrementConnectionCreatedCount(); + if (allWriters.size() > maxOpenConnections){ + int retired = closeIdleWriters(); + if (retired == 0) { + closeEldestWriter(); + } + } + allWriters.put(endPoint, writer); + activeWriters.put(endPoint, writer); + } + else { + if (activeWriters.get(endPoint) == null) { + activeWriters.put(endPoint,writer); + } + } + return writer; + } catch (HiveWriter.ConnectException e) { + sinkCounter.incrementConnectionFailedCount(); + throw e; + } + + } + + private HiveEndPoint makeEndPoint(String metaStoreUri, String database, String table, + 
List<String> partVals, Map<String, String> headers, + TimeZone timeZone, boolean needRounding, + int roundUnit, Integer roundValue, + boolean useLocalTime) { + if (partVals == null) { + return new HiveEndPoint(metaStoreUri, database, table, null); + } + + ArrayList<String> realPartVals = Lists.newArrayList(); + for (String partVal : partVals) { + realPartVals.add(BucketPath.escapeString(partVal, headers, timeZone, + needRounding, roundUnit, roundValue, useLocalTime)); + } + return new HiveEndPoint(metaStoreUri, database, table, realPartVals); + } + + /** + * Locate writer that has not been used for longest time and retire it + */ + private void closeEldestWriter() throws InterruptedException { + long oldestTimeStamp = System.currentTimeMillis(); + HiveEndPoint eldest = null; + for (Entry<HiveEndPoint,HiveWriter> entry : allWriters.entrySet()) { + if (entry.getValue().getLastUsed() < oldestTimeStamp) { + eldest = entry.getKey(); + oldestTimeStamp = entry.getValue().getLastUsed(); + } + } + + try { + sinkCounter.incrementConnectionCreatedCount(); + LOG.info(getName() + ": Closing least used Writer to Hive EndPoint : " + eldest); + allWriters.remove(eldest).close(); + } catch (InterruptedException e) { + LOG.warn(getName() + ": Interrupted when attempting to close writer for end point: " + + eldest, e); + throw e; + } + } + + /** + * Locate all writers past idle timeout and retire them + * @return number of writers retired + */ + private int closeIdleWriters() throws InterruptedException { + int count = 0; + long now = System.currentTimeMillis(); + ArrayList<HiveEndPoint> retirees = Lists.newArrayList(); + + //1) Find retirement candidates + for (Entry<HiveEndPoint,HiveWriter> entry : allWriters.entrySet()) { + if (now - entry.getValue().getLastUsed() > idleTimeout) { + ++count; + retirees.add(entry.getKey()); + } + } + //2) Retire them + for(HiveEndPoint ep : retirees) { + sinkCounter.incrementConnectionClosedCount(); + LOG.info(getName() + ": Closing idle Writer to Hive end point : {}", ep); + allWriters.remove(ep).close(); + } + return count; + } + + /** + * Closes all writers and remove them from cache + * @return number of writers retired + */ + private void closeAllWriters() throws InterruptedException { + //1) Retire writers + for (Entry<HiveEndPoint,HiveWriter> entry : allWriters.entrySet()) { + entry.getValue().close(); + } + + //2) Clear cache + allWriters.clear(); + } + + /** + * Abort current Txn on all writers + * @return number of writers retired + */ + private void abortAllWriters() throws InterruptedException { + for (Entry<HiveEndPoint,HiveWriter> entry : allWriters.entrySet()) { + entry.getValue().abort(); + } + } + + @Override + public void stop() { + // do not constrain close() calls with a timeout + for (Entry<HiveEndPoint, HiveWriter> entry : allWriters.entrySet()) { + try { + HiveWriter w = entry.getValue(); + LOG.info("Closing connection to {}", w); + w.closeConnection(); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } + } + + // shut down all thread pools + callTimeoutPool.shutdown(); + try { + while (callTimeoutPool.isTerminated() == false) { + callTimeoutPool.awaitTermination( + Math.max(DEFAULT_CALLTIMEOUT, callTimeout), TimeUnit.MILLISECONDS); + } + } catch (InterruptedException ex) { + LOG.warn(getName() + ":Shutdown interrupted on " + callTimeoutPool, ex); + } + + callTimeoutPool = null; + allWriters.clear(); + allWriters = null; + sinkCounter.stop(); + super.stop(); + LOG.info("Hive Sink {} stopped", getName() ); + } + + 
@Override + public void start() { + String timeoutName = "hive-" + getName() + "-call-runner-%d"; + // call timeout pool needs only 1 thd as sink is effectively single threaded + callTimeoutPool = Executors.newFixedThreadPool(1, + new ThreadFactoryBuilder().setNameFormat(timeoutName).build()); + + this.allWriters = Maps.newHashMap(); + sinkCounter.start(); + super.start(); + setupHeartBeatTimer(); + LOG.info(getName() + ": Hive Sink {} started", getName() ); + } + + private void setupHeartBeatTimer() { + if (heartBeatInterval > 0) { + heartBeatTimer.schedule(new TimerTask() { + @Override + public void run() { + timeToSendHeartBeat.set(true); + setupHeartBeatTimer(); + } + }, heartBeatInterval * 1000); + } + } + + + @Override + public String toString() { + return "{ Sink type:" + getClass().getSimpleName() + ", name:" + getName() + + " }"; + } + +} http://git-wip-us.apache.org/repos/asf/flume/blob/5dfd3d96/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java b/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java new file mode 100644 index 0000000..4a06feb --- /dev/null +++ b/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java @@ -0,0 +1,426 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flume.sink.hive; + +import java.io.IOException; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.apache.hive.hcatalog.streaming.*; + +import org.apache.flume.Event; + +import org.apache.flume.instrumentation.SinkCounter; + + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Internal API intended for HiveSink use. 
+ */ +class HiveWriter { + + private static final Logger LOG = LoggerFactory + .getLogger(HiveWriter.class); + + private final HiveEndPoint endPoint; + private HiveEventSerializer serializer; + private final StreamingConnection connection; + private final int txnsPerBatch; + private final RecordWriter recordWriter; + private TransactionBatch txnBatch; + + private final ExecutorService callTimeoutPool; + + private final long callTimeout; + + private long lastUsed; // time of last flush on this writer + + private SinkCounter sinkCounter; + private int batchCounter; + private long eventCounter; + private long processSize; + + protected boolean closed; // flag indicating HiveWriter was closed + private boolean autoCreatePartitions; + + private boolean hearbeatNeeded = false; + + HiveWriter(HiveEndPoint endPoint, int txnsPerBatch, + boolean autoCreatePartitions, long callTimeout, + ExecutorService callTimeoutPool, String hiveUser, + HiveEventSerializer serializer, SinkCounter sinkCounter) + throws ConnectException, InterruptedException { + try { + this.autoCreatePartitions = autoCreatePartitions; + this.sinkCounter = sinkCounter; + this.callTimeout = callTimeout; + this.callTimeoutPool = callTimeoutPool; + this.endPoint = endPoint; + this.connection = newConnection(hiveUser); + this.txnsPerBatch = txnsPerBatch; + this.serializer = serializer; + this.recordWriter = serializer.createRecordWriter(endPoint); + this.txnBatch = nextTxnBatch(recordWriter); + this.closed = false; + this.lastUsed = System.currentTimeMillis(); + } catch (InterruptedException e) { + throw e; + } catch (RuntimeException e) { + throw e; + } catch (Exception e) { + throw new ConnectException(endPoint, e); + } + } + + @Override + public String toString() { + return endPoint.toString(); + } + + /** + * Clear the class counters + */ + private void resetCounters() { + eventCounter = 0; + processSize = 0; + batchCounter = 0; + } + + void setHearbeatNeeded() { + hearbeatNeeded = true; + } + + + /** + * Write data, update stats + * @param event + * @throws StreamingException + * @throws InterruptedException + */ + public synchronized void write(final Event event) + throws WriteException, InterruptedException { + if (closed) { + throw new IllegalStateException("Writer closed. Cannot write to : " + endPoint); + } + + // write the event + try { + timedCall(new CallRunner1<Void>() { + @Override + public Void call() throws InterruptedException, StreamingException { + try { + serializer.write(txnBatch, event); + return null; + } catch (IOException e) { + throw new StreamingIOFailure(e.getMessage(), e); + } + } + }); + } catch (StreamingException e) { + throw new WriteException(endPoint, txnBatch.getCurrentTxnId(), e); + } catch (TimeoutException e) { + throw new WriteException(endPoint, txnBatch.getCurrentTxnId(), e); + } + + // Update Statistics + processSize += event.getBody().length; + eventCounter++; + } + + /** + * Commits the current Txn. 
+ * If 'rollToNext' is true, will switch to next Txn in batch or to a + * new TxnBatch if current Txn batch is exhausted + */ + public void flush(boolean rollToNext) + throws CommitException, TxnBatchException, TxnFailure, InterruptedException { + //0 Heart beat on TxnBatch + if(hearbeatNeeded) { + hearbeatNeeded = false; + heartBeat(); + } + lastUsed = System.currentTimeMillis(); + + try { + //1 commit txn & close batch if needed + commitTxn(); + if(txnBatch.remainingTransactions() == 0) { + closeTxnBatch(); + txnBatch = null; + if(rollToNext) { + txnBatch = nextTxnBatch(recordWriter); + } + } + + //2 roll to next Txn + if(rollToNext) { + LOG.debug("Switching to next Txn for {}", endPoint); + txnBatch.beginNextTransaction(); // does not block + } + } catch (StreamingException e) { + throw new TxnFailure(txnBatch, e); + } + } + + /** + * Aborts the current Txn and switches to next Txn. + * @throws StreamingException if could not get new Transaction Batch, or switch to next Txn + */ + public void abort() throws InterruptedException { + abortTxn(); + } + + /** Queues up a heartbeat request on the current and remaining txns using the + * heartbeatThdPool and returns immediately + */ + public void heartBeat() throws InterruptedException { + // 1) schedule the heartbeat on one thread in pool + try { + timedCall(new CallRunner1<Void>() { + @Override + public Void call() throws StreamingException { + LOG.info("Sending heartbeat on batch " + txnBatch); + txnBatch.heartbeat(); + return null; + } + }); + } catch (InterruptedException e) { + throw e; + } catch (Exception e) { + LOG.warn("Unable to send heartbeat on Txn Batch " + txnBatch, e); + // Suppressing exceptions as we don't care for errors on heartbeats + } + } + + /** + * Close the Transaction Batch and connection + * @throws IOException + * @throws InterruptedException + */ + public void close() throws InterruptedException { + closeTxnBatch(); + closeConnection(); + closed = true; + } + + public void closeConnection() throws InterruptedException { + LOG.info("Closing connection to EndPoint : {}", endPoint); + try { + timedCall(new CallRunner1<Void>() { + @Override + public Void call() { + connection.close(); // could block + return null; + } + }); + sinkCounter.incrementConnectionClosedCount(); + } catch (Exception e) { + LOG.warn("Error closing connection to EndPoint : " + endPoint, e); + // Suppressing exceptions as we don't care for errors on connection close + } + } + + private void commitTxn() throws CommitException, InterruptedException { + if (LOG.isInfoEnabled()) { + LOG.info("Committing Txn " + txnBatch.getCurrentTxnId() + " on EndPoint: " + endPoint); + } + try { + timedCall(new CallRunner1<Void>() { + @Override + public Void call() throws StreamingException, InterruptedException { + txnBatch.commit(); // could block + return null; + } + }); + } catch (Exception e) { + throw new CommitException(endPoint, txnBatch.getCurrentTxnId(), e); + } + } + + private void abortTxn() throws InterruptedException { + LOG.info("Aborting Txn id {} on End Point {}", txnBatch.getCurrentTxnId(), endPoint); + try { + timedCall(new CallRunner1<Void>() { + @Override + public Void call() throws StreamingException, InterruptedException { + txnBatch.abort(); // could block + return null; + } + }); + } catch (InterruptedException e) { + throw e; + } catch (TimeoutException e) { + LOG.warn("Timeout while aborting Txn " + txnBatch.getCurrentTxnId() + " on EndPoint: " + endPoint, e); + } catch (Exception e) { + LOG.warn("Error aborting Txn " + 
txnBatch.getCurrentTxnId() + " on EndPoint: " + endPoint, e); + // Suppressing exceptions as we don't care for errors on abort + } + } + + private StreamingConnection newConnection(final String proxyUser) + throws InterruptedException, ConnectException { + try { + return timedCall(new CallRunner1<StreamingConnection>() { + @Override + public StreamingConnection call() throws InterruptedException, StreamingException { + return endPoint.newConnection(autoCreatePartitions); // could block + } + }); + } catch (Exception e) { + throw new ConnectException(endPoint, e); + } + } + + private TransactionBatch nextTxnBatch(final RecordWriter recordWriter) + throws InterruptedException, TxnBatchException { + LOG.debug("Fetching new Txn Batch for {}", endPoint); + TransactionBatch batch = null; + try { + batch = timedCall(new CallRunner1<TransactionBatch>() { + @Override + public TransactionBatch call() throws InterruptedException, StreamingException { + return connection.fetchTransactionBatch(txnsPerBatch, recordWriter); // could block + } + }); + LOG.info("Acquired Txn Batch {}. Switching to first txn", batch); + batch.beginNextTransaction(); + } catch (Exception e) { + throw new TxnBatchException(endPoint, e); + } + return batch; + } + + private void closeTxnBatch() throws InterruptedException { + try { + LOG.debug("Closing Txn Batch {}", txnBatch); + timedCall(new CallRunner1<Void>() { + @Override + public Void call() throws InterruptedException, StreamingException { + txnBatch.close(); // could block + return null; + } + }); + } catch (InterruptedException e) { + throw e; + } catch (Exception e) { + LOG.warn("Error closing Txn Batch " + txnBatch, e); + // Suppressing exceptions as we don't care for errors on batch close + } + } + + private <T> T timedCall(final CallRunner1<T> callRunner) + throws TimeoutException, InterruptedException, StreamingException { + Future<T> future = callTimeoutPool.submit(new Callable<T>() { + @Override + public T call() throws StreamingException, InterruptedException { + return callRunner.call(); + } + }); + + try { + if (callTimeout > 0) { + return future.get(callTimeout, TimeUnit.MILLISECONDS); + } else { + return future.get(); + } + } catch (TimeoutException eT) { + future.cancel(true); + sinkCounter.incrementConnectionFailedCount(); + throw eT; + } catch (ExecutionException e1) { + sinkCounter.incrementConnectionFailedCount(); + Throwable cause = e1.getCause(); + if (cause instanceof IOException ) { + throw new StreamingIOFailure("I/O Failure", (IOException) cause); + } else if (cause instanceof StreamingException) { + throw (StreamingException) cause; + } else if (cause instanceof TimeoutException) { + throw new StreamingException("Operation Timed Out.", (TimeoutException) cause); + } else if (cause instanceof RuntimeException) { + throw (RuntimeException) cause; + } else if (cause instanceof InterruptedException) { + throw (InterruptedException) cause; + } + throw new StreamingException(e1.getMessage(), e1); + } + } + + long getLastUsed() { + return lastUsed; + } + + /** + * Simple interface whose <tt>call</tt> method is called by + * {#callWithTimeout} in a new thread inside a + * {@linkplain java.security.PrivilegedExceptionAction#run()} call. 
+ * @param <T> + */ + private interface CallRunner<T> { + T call() throws Exception; + } + + + private interface CallRunner1<T> { + T call() throws StreamingException, InterruptedException; + } + + + public static class Failure extends Exception { + public Failure(String msg, Throwable cause) { + super(msg, cause); + } + } + + public static class WriteException extends Failure { + public WriteException(HiveEndPoint endPoint, Long currentTxnId, Throwable cause) { + super("Failed writing to : " + endPoint + ". TxnID : " + currentTxnId, cause); + } + } + + public static class CommitException extends Failure { + public CommitException(HiveEndPoint endPoint, Long txnID, Throwable cause) { + super("Commit of Txn " + txnID + " failed on EndPoint: " + endPoint, cause); + } + } + + public static class ConnectException extends Failure { + public ConnectException(HiveEndPoint ep, Throwable cause) { + super("Failed connecting to EndPoint " + ep, cause); + } + } + + public static class TxnBatchException extends Failure { + public TxnBatchException(HiveEndPoint ep, Throwable cause) { + super("Failed acquiring Transaction Batch from EndPoint: " + ep, cause); + } + } + + private class TxnFailure extends Failure { + public TxnFailure(TransactionBatch txnBatch, Throwable cause) { + super("Failed switching to next Txn in TxnBatch " + txnBatch, cause); + } + } +} http://git-wip-us.apache.org/repos/asf/flume/blob/5dfd3d96/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestHiveSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestHiveSink.java b/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestHiveSink.java new file mode 100644 index 0000000..46724f2 --- /dev/null +++ b/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestHiveSink.java @@ -0,0 +1,424 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + + +package org.apache.flume.sink.hive; + +import com.google.common.collect.Lists; +import junit.framework.Assert; +import org.apache.flume.Channel; +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.apache.flume.EventDeliveryException; +import org.apache.flume.Transaction; +import org.apache.flume.channel.MemoryChannel; +import org.apache.flume.conf.Configurables; +import org.apache.flume.event.SimpleEvent; +import org.apache.flume.instrumentation.SinkCounter; +import org.apache.hadoop.hive.cli.CliSessionState; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; +import org.apache.hadoop.hive.ql.CommandNeedRetryException; +import org.apache.hadoop.hive.ql.Driver; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Calendar; +import java.util.List; +import java.util.UUID; + +public class TestHiveSink { + // 1) partitioned table + final static String dbName = "testing"; + final static String tblName = "alerts"; + + public static final String PART1_NAME = "continent"; + public static final String PART2_NAME = "country"; + public static final String[] partNames = { PART1_NAME, PART2_NAME }; + + private static final String COL1 = "id"; + private static final String COL2 = "msg"; + final String[] colNames = {COL1,COL2}; + private String[] colTypes = { "int", "string" }; + + private static final String PART1_VALUE = "Asia"; + private static final String PART2_VALUE = "India"; + private final ArrayList<String> partitionVals; + + // 2) un-partitioned table + final static String dbName2 = "testing2"; + final static String tblName2 = "alerts2"; + final String[] colNames2 = {COL1,COL2}; + private String[] colTypes2 = { "int", "string" }; + + HiveSink sink = new HiveSink(); + + private final HiveConf conf; + + private final Driver driver; + + final String metaStoreURI; + + @Rule + public TemporaryFolder dbFolder = new TemporaryFolder(); + + + private static final Logger LOG = LoggerFactory.getLogger(HiveSink.class); + + public TestHiveSink() throws Exception { + partitionVals = new ArrayList<String>(2); + partitionVals.add(PART1_VALUE); + partitionVals.add(PART2_VALUE); + + metaStoreURI = "null"; + + conf = new HiveConf(this.getClass()); + TestUtil.setConfValues(conf); + + // 1) prepare hive + TxnDbUtil.cleanDb(); + TxnDbUtil.prepDb(); + + // 2) Setup Hive client + SessionState.start(new CliSessionState(conf)); + driver = new Driver(conf); + + } + + + @Before + public void setUp() throws Exception { + TestUtil.dropDB(conf, dbName); + + sink = new HiveSink(); + sink.setName("HiveSink-" + UUID.randomUUID().toString()); + + String dbLocation = dbFolder.newFolder(dbName).getCanonicalPath() + ".db"; + dbLocation = dbLocation.replaceAll("\\\\","/"); // for windows paths + TestUtil.createDbAndTable(driver, dbName, tblName, partitionVals, colNames, + colTypes, partNames, dbLocation); + } + + @After + public void tearDown() throws MetaException, HiveException { + TestUtil.dropDB(conf, dbName); + } + + + @Test + public void testSingleWriterSimplePartitionedTable() + throws EventDeliveryException, IOException, CommandNeedRetryException { + 
int totalRecords = 4; + int batchSize = 2; + int batchCount = totalRecords / batchSize; + + Context context = new Context(); + context.put("hive.metastore", metaStoreURI); + context.put("hive.database",dbName); + context.put("hive.table",tblName); + context.put("hive.partition", PART1_VALUE + "," + PART2_VALUE); + context.put("autoCreatePartitions","false"); + context.put("batchSize","" + batchSize); + context.put("serializer", HiveDelimitedTextSerializer.ALIAS); + context.put("serializer.fieldnames", COL1 + ",," + COL2 + ","); + context.put("heartBeatInterval", "0"); + + Channel channel = startSink(sink, context); + + List<String> bodies = Lists.newArrayList(); + + // push the events in two batches + Transaction txn = channel.getTransaction(); + txn.begin(); + for (int j = 1; j <= totalRecords; j++) { + Event event = new SimpleEvent(); + String body = j + ",blah,This is a log message,other stuff"; + event.setBody(body.getBytes()); + bodies.add(body); + channel.put(event); + } + // execute sink to process the events + txn.commit(); + txn.close(); + + + checkRecordCountInTable(0, dbName, tblName); + for (int i = 0; i < batchCount ; i++) { + sink.process(); + } + sink.stop(); + checkRecordCountInTable(totalRecords, dbName, tblName); + } + + @Test + public void testSingleWriterSimpleUnPartitionedTable() + throws Exception { + TestUtil.dropDB(conf, dbName2); + String dbLocation = dbFolder.newFolder(dbName2).getCanonicalPath() + ".db"; + dbLocation = dbLocation.replaceAll("\\\\","/"); // for windows paths + TestUtil.createDbAndTable(driver, dbName2, tblName2, null, colNames2, colTypes2 + , null, dbLocation); + + try { + int totalRecords = 4; + int batchSize = 2; + int batchCount = totalRecords / batchSize; + + Context context = new Context(); + context.put("hive.metastore", metaStoreURI); + context.put("hive.database", dbName2); + context.put("hive.table", tblName2); + context.put("autoCreatePartitions","false"); + context.put("batchSize","" + batchSize); + context.put("serializer", HiveDelimitedTextSerializer.ALIAS); + context.put("serializer.fieldnames", COL1 + ",," + COL2 + ","); + context.put("heartBeatInterval", "0"); + + Channel channel = startSink(sink, context); + + List<String> bodies = Lists.newArrayList(); + + // Push the events in two batches + Transaction txn = channel.getTransaction(); + txn.begin(); + for (int j = 1; j <= totalRecords; j++) { + Event event = new SimpleEvent(); + String body = j + ",blah,This is a log message,other stuff"; + event.setBody(body.getBytes()); + bodies.add(body); + channel.put(event); + } + + txn.commit(); + txn.close(); + + checkRecordCountInTable(0, dbName2, tblName2); + for (int i = 0; i < batchCount ; i++) { + sink.process(); + } + + // check before & after stopping sink + checkRecordCountInTable(totalRecords, dbName2, tblName2); + sink.stop(); + checkRecordCountInTable(totalRecords, dbName2, tblName2); + } finally { + TestUtil.dropDB(conf, dbName2); + } + } + + @Test + public void testSingleWriterUseHeaders() + throws Exception { + String[] colNames = {COL1, COL2}; + String PART1_NAME = "country"; + String PART2_NAME = "hour"; + String[] partNames = {PART1_NAME, PART2_NAME}; + List<String> partitionVals = null; + String PART1_VALUE = "%{" + PART1_NAME + "}"; + String PART2_VALUE = "%y-%m-%d-%k"; + partitionVals = new ArrayList<String>(2); + partitionVals.add(PART1_VALUE); + partitionVals.add(PART2_VALUE); + + String tblName = "hourlydata"; + TestUtil.dropDB(conf, dbName2); + String dbLocation = dbFolder.newFolder(dbName2).getCanonicalPath() + 
".db"; + dbLocation = dbLocation.replaceAll("\\\\","/"); // for windows paths + TestUtil.createDbAndTable(driver, dbName2, tblName, partitionVals, colNames, + colTypes, partNames, dbLocation); + + int totalRecords = 4; + int batchSize = 2; + int batchCount = totalRecords / batchSize; + + Context context = new Context(); + context.put("hive.metastore",metaStoreURI); + context.put("hive.database",dbName2); + context.put("hive.table",tblName); + context.put("hive.partition", PART1_VALUE + "," + PART2_VALUE); + context.put("autoCreatePartitions","true"); + context.put("useLocalTimeStamp", "false"); + context.put("batchSize","" + batchSize); + context.put("serializer", HiveDelimitedTextSerializer.ALIAS); + context.put("serializer.fieldnames", COL1 + ",," + COL2 + ","); + context.put("heartBeatInterval", "0"); + + Channel channel = startSink(sink, context); + + Calendar eventDate = Calendar.getInstance(); + List<String> bodies = Lists.newArrayList(); + + // push events in two batches - two per batch. each batch is diff hour + Transaction txn = channel.getTransaction(); + txn.begin(); + for (int j = 1; j <= totalRecords; j++) { + Event event = new SimpleEvent(); + String body = j + ",blah,This is a log message,other stuff"; + event.setBody(body.getBytes()); + eventDate.clear(); + eventDate.set(2014, 03, 03, j%batchCount, 1); // yy mm dd hh mm + event.getHeaders().put( "timestamp", + String.valueOf(eventDate.getTimeInMillis()) ); + event.getHeaders().put( PART1_NAME, "Asia" ); + bodies.add(body); + channel.put(event); + } + // execute sink to process the events + txn.commit(); + txn.close(); + + checkRecordCountInTable(0, dbName2, tblName); + for (int i = 0; i < batchCount ; i++) { + sink.process(); + } + checkRecordCountInTable(totalRecords, dbName2, tblName); + sink.stop(); + + // verify counters + SinkCounter counter = sink.getCounter(); + Assert.assertEquals(2, counter.getConnectionCreatedCount()); + Assert.assertEquals(2, counter.getConnectionClosedCount()); + Assert.assertEquals(2, counter.getBatchCompleteCount()); + Assert.assertEquals(0, counter.getBatchEmptyCount()); + Assert.assertEquals(0, counter.getConnectionFailedCount() ); + Assert.assertEquals(4, counter.getEventDrainAttemptCount()); + Assert.assertEquals(4, counter.getEventDrainSuccessCount() ); + + } + + @Test + public void testHeartBeat() + throws EventDeliveryException, IOException, CommandNeedRetryException { + int batchSize = 2; + int batchCount = 3; + int totalRecords = batchCount*batchSize; + Context context = new Context(); + context.put("hive.metastore", metaStoreURI); + context.put("hive.database", dbName); + context.put("hive.table", tblName); + context.put("hive.partition", PART1_VALUE + "," + PART2_VALUE); + context.put("autoCreatePartitions","true"); + context.put("batchSize","" + batchSize); + context.put("serializer", HiveDelimitedTextSerializer.ALIAS); + context.put("serializer.fieldnames", COL1 + ",," + COL2 + ","); + context.put("hive.txnsPerBatchAsk", "20"); + context.put("heartBeatInterval", "3"); // heartbeat in seconds + + Channel channel = startSink(sink, context); + + List<String> bodies = Lists.newArrayList(); + + // push the events in two batches + for (int i = 0; i < batchCount; i++) { + Transaction txn = channel.getTransaction(); + txn.begin(); + for (int j = 1; j <= batchSize; j++) { + Event event = new SimpleEvent(); + String body = i*j + ",blah,This is a log message,other stuff"; + event.setBody(body.getBytes()); + bodies.add(body); + channel.put(event); + } + // execute sink to process the events + 
txn.commit(); + txn.close(); + + sink.process(); + sleep(3000); // allow heartbeat to happen + } + + sink.stop(); + checkRecordCountInTable(totalRecords, dbName, tblName); + } + + @Test + public void testJsonSerializer() throws Exception { + int batchSize = 2; + int batchCount = 2; + int totalRecords = batchCount*batchSize; + Context context = new Context(); + context.put("hive.metastore",metaStoreURI); + context.put("hive.database",dbName); + context.put("hive.table",tblName); + context.put("hive.partition", PART1_VALUE + "," + PART2_VALUE); + context.put("autoCreatePartitions","true"); + context.put("batchSize","" + batchSize); + context.put("serializer", HiveJsonSerializer.ALIAS); + context.put("serializer.fieldnames", COL1 + ",," + COL2 + ","); + context.put("heartBeatInterval", "0"); + + Channel channel = startSink(sink, context); + + List<String> bodies = Lists.newArrayList(); + + // push the events in two batches + for (int i = 0; i < batchCount; i++) { + Transaction txn = channel.getTransaction(); + txn.begin(); + for (int j = 1; j <= batchSize; j++) { + Event event = new SimpleEvent(); + String body = "{\"id\" : 1, \"msg\" : \"using json serializer\"}"; + event.setBody(body.getBytes()); + bodies.add(body); + channel.put(event); + } + // execute sink to process the events + txn.commit(); + txn.close(); + + sink.process(); + } + checkRecordCountInTable(totalRecords, dbName, tblName); + sink.stop(); + checkRecordCountInTable(totalRecords, dbName, tblName); + } + + private void sleep(int n) { + try { + Thread.sleep(n); + } catch (InterruptedException e) { + } + } + + private static Channel startSink(HiveSink sink, Context context) { + Configurables.configure(sink, context); + + Channel channel = new MemoryChannel(); + Configurables.configure(channel, context); + sink.setChannel(channel); + sink.start(); + return channel; + } + + private void checkRecordCountInTable(int expectedCount, String db, String tbl) + throws CommandNeedRetryException, IOException { + int count = TestUtil.listRecordsInTable(driver, db, tbl).size(); + Assert.assertEquals(expectedCount, count); + } +} http://git-wip-us.apache.org/repos/asf/flume/blob/5dfd3d96/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestHiveWriter.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestHiveWriter.java b/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestHiveWriter.java new file mode 100644 index 0000000..174f179 --- /dev/null +++ b/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestHiveWriter.java @@ -0,0 +1,321 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.flume.sink.hive; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import junit.framework.Assert; +import org.apache.flume.Context; +import org.apache.flume.event.SimpleEvent; +import org.apache.flume.instrumentation.SinkCounter; +import org.apache.hadoop.hive.cli.CliSessionState; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; +import org.apache.hadoop.hive.ql.CommandNeedRetryException; +import org.apache.hadoop.hive.ql.Driver; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hive.hcatalog.streaming.HiveEndPoint; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +public class TestHiveWriter { + final static String dbName = "testing"; + final static String tblName = "alerts"; + + public static final String PART1_NAME = "continent"; + public static final String PART2_NAME = "country"; + public static final String[] partNames = { PART1_NAME, PART2_NAME }; + + private static final String COL1 = "id"; + private static final String COL2 = "msg"; + final String[] colNames = {COL1,COL2}; + private String[] colTypes = { "int", "string" }; + + private static final String PART1_VALUE = "Asia"; + private static final String PART2_VALUE = "India"; + private final ArrayList<String> partVals; + + private final String metaStoreURI; + + private HiveDelimitedTextSerializer serializer; + + private final HiveConf conf; + + private ExecutorService callTimeoutPool; + int timeout = 10000; // msec + + @Rule + public TemporaryFolder dbFolder = new TemporaryFolder(); + + private final Driver driver; + + public TestHiveWriter() throws Exception { + partVals = new ArrayList<String>(2); + partVals.add(PART1_VALUE); + partVals.add(PART2_VALUE); + + metaStoreURI = null; + + int callTimeoutPoolSize = 1; + callTimeoutPool = Executors.newFixedThreadPool(callTimeoutPoolSize, + new ThreadFactoryBuilder().setNameFormat("hiveWriterTest").build()); + + // 1) Start metastore + conf = new HiveConf(this.getClass()); + TestUtil.setConfValues(conf); + if (metaStoreURI != null) { + conf.setVar(HiveConf.ConfVars.METASTOREURIS, metaStoreURI); + } + + // 2) Setup Hive client + SessionState.start(new CliSessionState(conf)); + driver = new Driver(conf); + + } + + @Before + public void setUp() throws Exception { + // 1) prepare hive + TxnDbUtil.cleanDb(); + TxnDbUtil.prepDb(); + + // 1) Setup tables + TestUtil.dropDB(conf, dbName); + String dbLocation = dbFolder.newFolder(dbName).getCanonicalPath() + ".db"; + dbLocation = dbLocation.replaceAll("\\\\","/"); // for windows paths + TestUtil.createDbAndTable(driver, dbName, tblName, partVals, colNames, colTypes + , partNames, dbLocation); + + // 2) Setup serializer + Context ctx = new Context(); + ctx.put("serializer.fieldnames", COL1 + ",," + COL2 + ","); + serializer = new HiveDelimitedTextSerializer(); + serializer.configure(ctx); + } + + @Test + public void testInstantiate() throws Exception { + HiveEndPoint endPoint = new HiveEndPoint(metaStoreURI, dbName, tblName, partVals); + SinkCounter sinkCounter = new SinkCounter(this.getClass().getName()); + HiveWriter writer = new HiveWriter(endPoint, 10, true, timeout + , callTimeoutPool, "flumetest", serializer, sinkCounter); + + writer.close(); + } + + @Test + public void testWriteBasic() throws Exception { + 
HiveEndPoint endPoint = new HiveEndPoint(metaStoreURI, dbName, tblName, partVals); + SinkCounter sinkCounter = new SinkCounter(this.getClass().getName()); + HiveWriter writer = new HiveWriter(endPoint, 10, true, timeout + , callTimeoutPool, "flumetest", serializer, sinkCounter); + + writeEvents(writer,3); + writer.flush(false); + writer.close(); + checkRecordCountInTable(3); + } + + @Test + public void testWriteMultiFlush() throws Exception { + HiveEndPoint endPoint = new HiveEndPoint(metaStoreURI, dbName, tblName, partVals); + SinkCounter sinkCounter = new SinkCounter(this.getClass().getName()); + + HiveWriter writer = new HiveWriter(endPoint, 10, true, timeout + , callTimeoutPool, "flumetest", serializer, sinkCounter); + + checkRecordCountInTable(0); + SimpleEvent event = new SimpleEvent(); + + String REC1 = "1,xyz,Hello world,abc"; + event.setBody(REC1.getBytes()); + writer.write(event); + checkRecordCountInTable(0); + writer.flush(true); + checkRecordCountInTable(1); + + String REC2 = "2,xyz,Hello world,abc"; + event.setBody(REC2.getBytes()); + writer.write(event); + checkRecordCountInTable(1); + writer.flush(true); + checkRecordCountInTable(2); + + String REC3 = "3,xyz,Hello world,abc"; + event.setBody(REC3.getBytes()); + writer.write(event); + writer.flush(true); + checkRecordCountInTable(3); + writer.close(); + + checkRecordCountInTable(3); + } + + private void checkRecordCountInTable(int expectedCount) + throws CommandNeedRetryException, IOException { + int count = TestUtil.listRecordsInTable(driver, dbName, tblName).size(); + Assert.assertEquals(expectedCount, count); + } + + /** + * Sets up the input fields to have the same order as the table columns, + * and sets the serde separator to the same value as the input field separator. + * @throws Exception + */ + @Test + public void testInOrderWrite() throws Exception { + HiveEndPoint endPoint = new HiveEndPoint(metaStoreURI, dbName, tblName, partVals); + SinkCounter sinkCounter = new SinkCounter(this.getClass().getName()); + int timeout = 5000; // msec + + HiveDelimitedTextSerializer serializer2 = new HiveDelimitedTextSerializer(); + Context ctx = new Context(); + ctx.put("serializer.fieldnames", COL1 + "," + COL2); + ctx.put("serializer.serdeSeparator", ","); + serializer2.configure(ctx); + + + HiveWriter writer = new HiveWriter(endPoint, 10, true, timeout, callTimeoutPool, + "flumetest", serializer2, sinkCounter); + + SimpleEvent event = new SimpleEvent(); + event.setBody("1,Hello world 1".getBytes()); + writer.write(event); + event.setBody("2,Hello world 2".getBytes()); + writer.write(event); + event.setBody("3,Hello world 3".getBytes()); + writer.write(event); + writer.flush(false); + writer.close(); + } + + @Test + public void testSerdeSeparatorCharParsing() throws Exception { + HiveEndPoint endPoint = new HiveEndPoint(metaStoreURI, dbName, tblName, partVals); + SinkCounter sinkCounter = new SinkCounter(this.getClass().getName()); + int timeout = 10000; // msec + + // 1) single character serdeSeparator + HiveDelimitedTextSerializer serializer1 = new HiveDelimitedTextSerializer(); + Context ctx = new Context(); + ctx.put("serializer.fieldnames", COL1 + "," + COL2); + ctx.put("serializer.serdeSeparator", ","); + serializer1.configure(ctx); + // should not throw + + + // 2) special character as serdeSeparator + HiveDelimitedTextSerializer serializer2 = new HiveDelimitedTextSerializer(); + ctx = new Context(); + ctx.put("serializer.fieldnames", COL1 + "," + COL2); + ctx.put("serializer.serdeSeparator", "'\t'"); + serializer2.configure(ctx); + // should not
throw + + + // 3) bad spec as serdeSeparator + HiveDelimitedTextSerializer serializer3 = new HiveDelimitedTextSerializer(); + ctx = new Context(); + ctx.put("serializer.fieldnames", COL1 + "," + COL2); + ctx.put("serializer.serdeSeparator", "ab"); + try { + serializer3.configure(ctx); + Assert.assertTrue("Bad serdeSeparator character was accepted", false); + } catch (Exception e) { + // expected: a multi-character separator should be rejected + } + + } + + + @Test + public void testSecondWriterBeforeFirstCommits() throws Exception { + // here we open a new writer while the first is still writing (not committed) + HiveEndPoint endPoint1 = new HiveEndPoint(metaStoreURI, dbName, tblName, partVals); + ArrayList<String> partVals2 = new ArrayList<String>(2); + partVals2.add(PART1_VALUE); + partVals2.add("Nepal"); + HiveEndPoint endPoint2 = new HiveEndPoint(metaStoreURI, dbName, tblName, partVals2); + + SinkCounter sinkCounter1 = new SinkCounter(this.getClass().getName()); + SinkCounter sinkCounter2 = new SinkCounter(this.getClass().getName()); + + HiveWriter writer1 = new HiveWriter(endPoint1, 10, true, timeout + , callTimeoutPool, "flumetest", serializer, sinkCounter1); + + writeEvents(writer1, 3); + + HiveWriter writer2 = new HiveWriter(endPoint2, 10, true, timeout + , callTimeoutPool, "flumetest", serializer, sinkCounter2); + writeEvents(writer2, 3); + writer2.flush(false); // commit + + writer1.flush(false); // commit + writer1.close(); + + writer2.close(); + } + + + @Test + public void testSecondWriterAfterFirstCommits() throws Exception { + // here we open a new writer after the first writer has committed one txn + HiveEndPoint endPoint1 = new HiveEndPoint(metaStoreURI, dbName, tblName, partVals); + ArrayList<String> partVals2 = new ArrayList<String>(2); + partVals2.add(PART1_VALUE); + partVals2.add("Nepal"); + HiveEndPoint endPoint2 = new HiveEndPoint(metaStoreURI, dbName, tblName, partVals2); + + SinkCounter sinkCounter1 = new SinkCounter(this.getClass().getName()); + SinkCounter sinkCounter2 = new SinkCounter(this.getClass().getName()); + + HiveWriter writer1 = new HiveWriter(endPoint1, 10, true, timeout + , callTimeoutPool, "flumetest", serializer, sinkCounter1); + + writeEvents(writer1, 3); + + writer1.flush(false); // commit + + + HiveWriter writer2 = new HiveWriter(endPoint2, 10, true, timeout + , callTimeoutPool, "flumetest", serializer, sinkCounter2); + writeEvents(writer2, 3); + writer2.flush(false); // commit + + + writer1.close(); + writer2.close(); + } + + + private void writeEvents(HiveWriter writer, int count) throws InterruptedException, HiveWriter.WriteException { + SimpleEvent event = new SimpleEvent(); + for (int i = 1; i <= count; i++) { + event.setBody((i + ",xyz,Hello world,abc").getBytes()); + writer.write(event); + } + } +}
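For reference, the Context keys exercised by these tests (hive.metastore, hive.database, hive.table, hive.partition, serializer, serializer.fieldnames, batchSize, heartBeatInterval) are the same properties an agent configuration would set on this sink. The following is only a minimal sketch, not part of this commit: the agent/component names (a1, c1, k1), the thrift metastore URI, and the assumption that HiveDelimitedTextSerializer.ALIAS resolves to DELIMITED are illustrative; the sink type is given as the fully qualified class name added by this commit.

a1.channels = c1
a1.sinks = k1
a1.channels.c1.type = memory
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.hive.HiveSink
a1.sinks.k1.hive.metastore = thrift://localhost:9083
a1.sinks.k1.hive.database = testing
a1.sinks.k1.hive.table = alerts
a1.sinks.k1.hive.partition = Asia,India
a1.sinks.k1.serializer = DELIMITED
a1.sinks.k1.serializer.fieldnames = id,,msg,
a1.sinks.k1.batchSize = 2
a1.sinks.k1.heartBeatInterval = 0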
