Repository: hive Updated Branches: refs/heads/branch-1.2 07c86120e -> a3f718f7f
HIVE-15691 Create StrictRegexWriter to work with RegexSerializer for Flume Hive Sink (Kalyan, via Eugene Koifman) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/a3f718f7 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/a3f718f7 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/a3f718f7 Branch: refs/heads/branch-1.2 Commit: a3f718f7f7b1d850e67a3019cbf99d9699e09d7d Parents: 07c8612 Author: Eugene Koifman <ekoif...@hortonworks.com> Authored: Tue Mar 28 08:15:57 2017 -0700 Committer: Eugene Koifman <ekoif...@hortonworks.com> Committed: Tue Mar 28 08:15:57 2017 -0700 ---------------------------------------------------------------------- .../hcatalog/streaming/StrictRegexWriter.java | 156 +++++++++++++++++++ .../hive/hcatalog/streaming/TestStreaming.java | 86 ++++++++-- 2 files changed, 230 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/a3f718f7/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictRegexWriter.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictRegexWriter.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictRegexWriter.java new file mode 100644 index 0000000..c76f582 --- /dev/null +++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictRegexWriter.java @@ -0,0 +1,156 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hive.hcatalog.streaming;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.RegexSerDe;
import org.apache.hadoop.hive.serde2.SerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.SerDeUtils;
import org.apache.hadoop.io.Text;

/**
 * Streaming Writer that handles text input data with a regex.  Each record
 * handed to {@link #write(long, byte[])} is a UTF-8 encoded line that is
 * parsed into columns by {@link org.apache.hadoop.hive.serde2.RegexSerDe}
 * using the regex supplied at construction time.
 */
public class StrictRegexWriter extends AbstractRecordWriter {
  // Lazily created in getSerde(); encode() also triggers creation so that
  // write() cannot observe a null serde.
  private RegexSerDe serde;
  // Capture-group regex used by RegexSerDe to split each record into columns.
  // May be null (e.g. the single-arg constructor); RegexSerDe initialization
  // will then fail with a SerializationError when the serde is first needed.
  private String regex;

  /**
   * Creates a writer with no regex configured.
   *
   * @param endPoint the end point to write to
   * @throws ConnectionError
   * @throws SerializationError
   * @throws StreamingException
   */
  public StrictRegexWriter(HiveEndPoint endPoint)
          throws ConnectionError, SerializationError, StreamingException {
    super(endPoint, null);
  }

  /**
   * Creates a writer with no regex configured.
   *
   * @param endPoint the end point to write to
   * @param conf a Hive conf object. Should be null if not using advanced Hive settings.
   * @throws ConnectionError
   * @throws SerializationError
   * @throws StreamingException
   */
  public StrictRegexWriter(HiveEndPoint endPoint, HiveConf conf)
          throws ConnectionError, SerializationError, StreamingException {
    super(endPoint, conf);
  }

  /**
   * Creates a writer that parses records with the given regex.
   *
   * @param regex regex used to parse the data into columns
   * @param endPoint the end point to write to
   * @param conf a Hive conf object. Should be null if not using advanced Hive settings.
   * @throws ConnectionError
   * @throws SerializationError
   * @throws StreamingException
   */
  public StrictRegexWriter(String regex, HiveEndPoint endPoint, HiveConf conf)
          throws ConnectionError, SerializationError, StreamingException {
    super(endPoint, conf);
    this.regex = regex;
  }

  /**
   * Returns the RegexSerDe for this writer, creating and initializing it on
   * first use.
   *
   * @return the (lazily created) RegexSerDe for this writer's table
   * @throws SerializationError if the serde could not be initialized
   */
  @Override
  SerDe getSerde() throws SerializationError {
    if (serde != null) {
      return serde;
    }
    serde = createSerde(tbl, conf, regex);
    return serde;
  }

  /**
   * Parses the record with the configured regex and inserts it into the
   * current transaction.
   *
   * @param transactionId id of the transaction the record belongs to
   * @param record UTF-8 encoded text record
   * @throws StreamingIOFailure if the underlying insert fails
   * @throws SerializationError if the record cannot be parsed
   */
  @Override
  public void write(long transactionId, byte[] record)
          throws StreamingIOFailure, SerializationError {
    try {
      Object encodedRow = encode(record);
      updater.insert(transactionId, encodedRow);
    } catch (IOException e) {
      throw new StreamingIOFailure("Error writing record in transaction("
              + transactionId + ")", e);
    }
  }

  /**
   * Creates and initializes a RegexSerDe for the given table and regex.
   *
   * @param tbl used to create serde
   * @param conf used to create serde
   * @param regex used to create serde
   * @return an initialized RegexSerDe configured with the table's columns and the regex
   * @throws SerializationError if serde could not be initialized
   */
  private static RegexSerDe createSerde(Table tbl, HiveConf conf, String regex)
          throws SerializationError {
    try {
      Properties tableProps = MetaStoreUtils.getTableMetadata(tbl);
      tableProps.setProperty(RegexSerDe.INPUT_REGEX, regex);
      ArrayList<String> tableColumns = getCols(tbl);
      tableProps.setProperty(serdeConstants.LIST_COLUMNS, StringUtils.join(tableColumns, ","));
      RegexSerDe serde = new RegexSerDe();
      SerDeUtils.initializeSerDe(serde, conf, tableProps, null);
      return serde;
    } catch (SerDeException e) {
      throw new SerializationError("Error initializing serde " + RegexSerDe.class.getName(), e);
    }
  }

  /**
   * Returns the table's column names, lower-cased, in storage-descriptor order.
   */
  private static ArrayList<String> getCols(Table table) {
    List<FieldSchema> cols = table.getSd().getCols();
    ArrayList<String> colNames = new ArrayList<String>(cols.size());
    for (FieldSchema col : cols) {
      colNames.add(col.getName().toLowerCase());
    }
    return colNames;
  }

  /**
   * Encode Utf8 encoded string bytes using RegexSerDe
   *
   * @param utf8StrRecord the raw record bytes
   * @return The encoded object
   * @throws SerializationError if the record cannot be deserialized
   */
  private Object encode(byte[] utf8StrRecord) throws SerializationError {
    try {
      // Defensive init: write() may be invoked before the framework has
      // called getSerde(); without this the field dereference below NPEs.
      getSerde();
      Text blob = new Text(utf8StrRecord);
      return serde.deserialize(blob);
    } catch (SerDeException e) {
      throw new SerializationError("Unable to convert byte[] record into Object", e);
    }
  }

}
http://git-wip-us.apache.org/repos/asf/hive/blob/a3f718f7/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java index 329e5da..7343ef4 100644 --- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java +++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java @@ -18,7 +18,16 @@ package org.apache.hive.hcatalog.streaming; -import junit.framework.Assert; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RawLocalFileSystem; @@ -58,15 +67,7 @@ import org.junit.rules.TemporaryFolder; import org.slf4j.Logger; import org.slf4j.LoggerFactory;
-import java.io.File; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import junit.framework.Assert; public class TestStreaming { @@ -216,9 +217,9 @@ public class TestStreaming { NullWritable key = rr.createKey(); OrcStruct value = rr.createValue(); - for (int i = 0; i < records.length; i++) { + for (String record : records) { Assert.assertEquals(true, rr.next(key, value)); - Assert.assertEquals(records[i], value.toString()); + Assert.assertEquals(record, value.toString()); } Assert.assertEquals(false, rr.next(key, value)); } @@ -391,6 +392,67 @@ public class TestStreaming { connection.close(); } + private void testTransactionBatchCommit_Regex() throws Exception { + HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, + partitionVals); + StreamingConnection connection = endPt.newConnection(true); + String regex = "([^,]*),(.*)"; + StrictRegexWriter writer = new StrictRegexWriter(regex, endPt, conf); + + // 1st Txn + TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer); + txnBatch.beginNextTransaction(); + Assert.assertEquals(TransactionBatch.TxnState.OPEN + , txnBatch.getCurrentTransactionState()); + txnBatch.write("1,Hello streaming".getBytes()); + txnBatch.commit(); + + checkDataWritten(1, 10, 1, 1, "{1, Hello streaming}"); + + Assert.assertEquals(TransactionBatch.TxnState.COMMITTED + , txnBatch.getCurrentTransactionState()); + + // 2nd Txn + txnBatch.beginNextTransaction(); + Assert.assertEquals(TransactionBatch.TxnState.OPEN + , txnBatch.getCurrentTransactionState()); + txnBatch.write("2,Welcome to streaming".getBytes()); + + // data should not be visible + checkDataWritten(1, 10, 1, 1, "{1, Hello streaming}"); + + txnBatch.commit(); + + checkDataWritten(1, 10, 1, 1, "{1, Hello streaming}", + "{2, Welcome to streaming}"); + + txnBatch.close(); + 
Assert.assertEquals(TransactionBatch.TxnState.INACTIVE + , txnBatch.getCurrentTransactionState()); + + + connection.close(); + + + // To Unpartitioned table + endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, null); + connection = endPt.newConnection(true); + regex = "([^:]*):(.*)"; + writer = new StrictRegexWriter(regex, endPt, conf); + + // 1st Txn + txnBatch = connection.fetchTransactionBatch(10, writer); + txnBatch.beginNextTransaction(); + Assert.assertEquals(TransactionBatch.TxnState.OPEN + , txnBatch.getCurrentTransactionState()); + txnBatch.write("1:Hello streaming".getBytes()); + txnBatch.commit(); + + Assert.assertEquals(TransactionBatch.TxnState.COMMITTED + , txnBatch.getCurrentTransactionState()); + connection.close(); + } + @Test public void testTransactionBatchCommit_Json() throws Exception { HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,