Repository: hive
Updated Branches:
  refs/heads/master 8613ef200 -> ea3be9549
HIVE-15691 Create StrictRegexWriter to work with RegexSerializer for Flume Hive Sink (Kalyan via Eugene Koifman)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/ea3be954
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/ea3be954
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/ea3be954

Branch: refs/heads/master
Commit: ea3be9549dca7eaed5e838bbcb69d2372817ce42
Parents: 8613ef2
Author: Eugene Koifman <ekoif...@hortonworks.com>
Authored: Wed Mar 22 13:22:08 2017 -0700
Committer: Eugene Koifman <ekoif...@hortonworks.com>
Committed: Wed Mar 22 13:22:08 2017 -0700

----------------------------------------------------------------------
 .../hcatalog/streaming/StrictRegexWriter.java   | 188 +++++++++++++++++++
 .../hive/hcatalog/streaming/TestStreaming.java  |  81 +++++++-
 2 files changed, 263 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/ea3be954/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictRegexWriter.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictRegexWriter.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictRegexWriter.java
new file mode 100644
index 0000000..78987ab
--- /dev/null
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictRegexWriter.java
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.hcatalog.streaming;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.RegexSerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Streaming Writer handles text input data with regex. Uses
+ * org.apache.hadoop.hive.serde2.RegexSerDe
+ */
+public class StrictRegexWriter extends AbstractRecordWriter {
+  private RegexSerDe serde;
+  private final StructObjectInspector recordObjInspector;
+  private final ObjectInspector[] bucketObjInspectors;
+  private final StructField[] bucketStructFields;
+
+  /**
+   * @param endPoint the end point to write to
+   * @param conn connection this Writer is to be used with
+   * @throws ConnectionError
+   * @throws SerializationError
+   * @throws StreamingException
+   */
+  public StrictRegexWriter(HiveEndPoint endPoint, StreamingConnection conn)
+    throws ConnectionError, SerializationError, StreamingException {
+    this(null, endPoint, null, conn);
+  }
+
+  /**
+   * @param endPoint the end point to write to
+   * @param conf a Hive conf object. Should be null if not using advanced Hive settings.
+   * @param conn connection this Writer is to be used with
+   * @throws ConnectionError
+   * @throws SerializationError
+   * @throws StreamingException
+   */
+  public StrictRegexWriter(HiveEndPoint endPoint, HiveConf conf, StreamingConnection conn)
+    throws ConnectionError, SerializationError, StreamingException {
+    this(null, endPoint, conf, conn);
+  }
+
+  /**
+   * @param regex to parse the data
+   * @param endPoint the end point to write to
+   * @param conf a Hive conf object. Should be null if not using advanced Hive settings.
+   * @param conn connection this Writer is to be used with
+   * @throws ConnectionError
+   * @throws SerializationError
+   * @throws StreamingException
+   */
+  public StrictRegexWriter(String regex, HiveEndPoint endPoint, HiveConf conf, StreamingConnection conn)
+    throws ConnectionError, SerializationError, StreamingException {
+    super(endPoint, conf, conn);
+    this.serde = createSerde(tbl, conf, regex);
+    // get ObjInspectors for entire record and bucketed cols
+    try {
+      recordObjInspector = ( StructObjectInspector ) serde.getObjectInspector();
+      this.bucketObjInspectors = getObjectInspectorsForBucketedCols(bucketIds, recordObjInspector);
+    } catch (SerDeException e) {
+      throw new SerializationError("Unable to get ObjectInspector for bucket columns", e);
+    }
+
+    // get StructFields for bucketed cols
+    bucketStructFields = new StructField[bucketIds.size()];
+    List<? extends StructField> allFields = recordObjInspector.getAllStructFieldRefs();
+    for (int i = 0; i < bucketIds.size(); i++) {
+      bucketStructFields[i] = allFields.get(bucketIds.get(i));
+    }
+  }
+
+  @Override
+  public AbstractSerDe getSerde() {
+    return serde;
+  }
+
+  @Override
+  protected StructObjectInspector getRecordObjectInspector() {
+    return recordObjInspector;
+  }
+
+  @Override
+  protected StructField[] getBucketStructFields() {
+    return bucketStructFields;
+  }
+
+  @Override
+  protected ObjectInspector[] getBucketObjectInspectors() {
+    return bucketObjInspectors;
+  }
+
+
+  @Override
+  public void write(long transactionId, byte[] record)
+    throws StreamingIOFailure, SerializationError {
+    try {
+      Object encodedRow = encode(record);
+      int bucket = getBucket(encodedRow);
+      getRecordUpdater(bucket).insert(transactionId, encodedRow);
+    } catch (IOException e) {
+      throw new StreamingIOFailure("Error writing record in transaction("
+        + transactionId + ")", e);
+    }
+  }
+
+  /**
+   * Creates RegexSerDe
+   * @param tbl used to create serde
+   * @param conf used to create serde
+   * @param regex used to create serde
+   * @return
+   * @throws SerializationError if serde could not be initialized
+   */
+  private static RegexSerDe createSerde(Table tbl, HiveConf conf, String regex)
+    throws SerializationError {
+    try {
+      Properties tableProps = MetaStoreUtils.getTableMetadata(tbl);
+      tableProps.setProperty(RegexSerDe.INPUT_REGEX, regex);
+      ArrayList<String> tableColumns = getCols(tbl);
+      tableProps.setProperty(serdeConstants.LIST_COLUMNS, StringUtils.join(tableColumns, ","));
+      RegexSerDe serde = new RegexSerDe();
+      SerDeUtils.initializeSerDe(serde, conf, tableProps, null);
+      return serde;
+    } catch (SerDeException e) {
+      throw new SerializationError("Error initializing serde " + RegexSerDe.class.getName(), e);
+    }
+  }
+
+  private static ArrayList<String> getCols(Table table) {
+    List<FieldSchema> cols = table.getSd().getCols();
+    ArrayList<String> colNames = new ArrayList<String>(cols.size());
+    for (FieldSchema col : cols) {
+      colNames.add(col.getName().toLowerCase());
+    }
+    return colNames;
+  }
+
+  /**
+   * Encode Utf8 encoded string bytes using RegexSerDe
+   *
+   * @param utf8StrRecord
+   * @return The encoded object
+   * @throws SerializationError
+   */
+  @Override
+  public Object encode(byte[] utf8StrRecord) throws SerializationError {
+    try {
+      Text blob = new Text(utf8StrRecord);
+      return serde.deserialize(blob);
+    } catch (SerDeException e) {
+      throw new SerializationError("Unable to convert byte[] record into Object", e);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/ea3be954/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
index bf29993..8ea58e6 100644
--- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
@@ -64,10 +64,6 @@ import org.apache.hadoop.hive.ql.CommandNeedRetryException;
 import org.apache.hadoop.hive.ql.Driver;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.IOConstants;
-import org.apache.hadoop.hive.shims.Utils;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.orc.impl.OrcAcidUtils;
-import org.apache.orc.tools.FileDump;
 import org.apache.hadoop.hive.ql.io.orc.OrcFile;
 import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
 import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
@@ -82,11 +78,15 @@ import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableIntObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableLongObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector;
+import org.apache.hadoop.hive.shims.Utils;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.orc.impl.OrcAcidUtils;
+import org.apache.orc.tools.FileDump;
 import org.apache.thrift.TException;
 import org.junit.After;
 import org.junit.Assert;
@@ -485,9 +485,9 @@ public class TestStreaming {
     NullWritable key = rr.createKey();
     OrcStruct value = rr.createValue();
-    for (int i = 0; i < records.length; i++) {
+    for (String record : records) {
       Assert.assertEquals(true, rr.next(key, value));
-      Assert.assertEquals(records[i], value.toString());
+      Assert.assertEquals(record, value.toString());
     }
     Assert.assertEquals(false, rr.next(key, value));
   }
@@ -787,6 +787,75 @@ public class TestStreaming {
   }
 
   @Test
+  public void testTransactionBatchCommit_Regex() throws Exception {
+    testTransactionBatchCommit_Regex(null);
+  }
+  @Test
+  public void testTransactionBatchCommit_RegexUGI() throws Exception {
+    testTransactionBatchCommit_Regex(Utils.getUGI());
+  }
+  private void testTransactionBatchCommit_Regex(UserGroupInformation ugi) throws Exception {
+    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
+        partitionVals);
+    StreamingConnection connection = endPt.newConnection(true, conf, ugi, "UT_" + Thread.currentThread().getName());
+    String regex = "([^,]*),(.*)";
+    StrictRegexWriter writer = new StrictRegexWriter(regex, endPt, conf, connection);
+
+    // 1st Txn
+    TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer);
+    txnBatch.beginNextTransaction();
+    Assert.assertEquals(TransactionBatch.TxnState.OPEN
+        , txnBatch.getCurrentTransactionState());
+    txnBatch.write("1,Hello streaming".getBytes());
+    txnBatch.commit();
+
+    checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
+
+    Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
+        , txnBatch.getCurrentTransactionState());
+
+    // 2nd Txn
+    txnBatch.beginNextTransaction();
+    Assert.assertEquals(TransactionBatch.TxnState.OPEN
+        , txnBatch.getCurrentTransactionState());
+    txnBatch.write("2,Welcome to streaming".getBytes());
+
+    // data should not be visible
+    checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
+
+    txnBatch.commit();
+
+    checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}",
+        "{2, Welcome to streaming}");
+
+    txnBatch.close();
+    Assert.assertEquals(TransactionBatch.TxnState.INACTIVE
+        , txnBatch.getCurrentTransactionState());
+
+
+    connection.close();
+
+
+    // To Unpartitioned table
+    endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, null);
+    connection = endPt.newConnection(true, conf, ugi, "UT_" + Thread.currentThread().getName());
+    regex = "([^:]*):(.*)";
+    writer = new StrictRegexWriter(regex, endPt, conf, connection);
+
+    // 1st Txn
+    txnBatch = connection.fetchTransactionBatch(10, writer);
+    txnBatch.beginNextTransaction();
+    Assert.assertEquals(TransactionBatch.TxnState.OPEN
+        , txnBatch.getCurrentTransactionState());
+    txnBatch.write("1:Hello streaming".getBytes());
+    txnBatch.commit();
+
+    Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
+        , txnBatch.getCurrentTransactionState());
+    connection.close();
+  }
+
+  @Test
   public void testTransactionBatchCommit_Json() throws Exception {
     HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
         partitionVals);
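
For readers wiring StrictRegexWriter into a client such as the Flume Hive Sink, the new testTransactionBatchCommit_Regex cases above show the intended call sequence. The minimal sketch below condenses that flow outside JUnit; the metastore URI, database/table names, and agent string are placeholders (not values from this commit), the table is assumed to be unpartitioned and transactional, and the regex is assumed to have one capture group per table column, as with RegexSerDe.

import org.apache.hive.hcatalog.streaming.HiveEndPoint;
import org.apache.hive.hcatalog.streaming.StreamingConnection;
import org.apache.hive.hcatalog.streaming.StrictRegexWriter;
import org.apache.hive.hcatalog.streaming.TransactionBatch;

public class StrictRegexWriterExample {
  public static void main(String[] args) throws Exception {
    // Placeholder endpoint: metastore URI, database and table names are
    // illustrative only; null partition values means an unpartitioned table.
    HiveEndPoint endPt = new HiveEndPoint("thrift://localhost:9083", "default", "alerts", null);
    StreamingConnection connection = endPt.newConnection(true, null, null, "regex-writer-example");

    // One regex capture group per table column, same contract as RegexSerDe;
    // this pattern assumes a two-column table.
    String regex = "([^,]*),(.*)";
    StrictRegexWriter writer = new StrictRegexWriter(regex, endPt, null, connection);

    TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer);
    try {
      txnBatch.beginNextTransaction();
      txnBatch.write("1,Hello streaming".getBytes()); // parsed by the regex into two columns
      txnBatch.commit();
    } finally {
      txnBatch.close();
      connection.close();
    }
  }
}

This mirrors the single-transaction path of testTransactionBatchCommit_Regex; the test additionally exercises multiple transactions per batch, a partitioned table, and a UGI-authenticated connection.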