Repository: hive
Updated Branches:
  refs/heads/branch-1.2 07c86120e -> a3f718f7f


HIVE-15691 Create StrictRegexWriter to work with RegexSerializer for Flume Hive Sink (Kalyan, via Eugene Koifman)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/a3f718f7
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/a3f718f7
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/a3f718f7

Branch: refs/heads/branch-1.2
Commit: a3f718f7f7b1d850e67a3019cbf99d9699e09d7d
Parents: 07c8612
Author: Eugene Koifman <ekoif...@hortonworks.com>
Authored: Tue Mar 28 08:15:57 2017 -0700
Committer: Eugene Koifman <ekoif...@hortonworks.com>
Committed: Tue Mar 28 08:15:57 2017 -0700

----------------------------------------------------------------------
 .../hcatalog/streaming/StrictRegexWriter.java   | 156 +++++++++++++++++++
 .../hive/hcatalog/streaming/TestStreaming.java  |  86 ++++++++--
 2 files changed, 230 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/a3f718f7/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictRegexWriter.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictRegexWriter.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictRegexWriter.java
new file mode 100644
index 0000000..c76f582
--- /dev/null
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictRegexWriter.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.hcatalog.streaming;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.RegexSerDe;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Streaming Writer handles text input data with regex. Uses
+ * org.apache.hadoop.hive.serde2.RegexSerDe
+ */
+public class StrictRegexWriter extends AbstractRecordWriter {
+  private RegexSerDe serde;
+  private String regex;
+
+  /**
+   *
+   * @param endPoint the end point to write to
+   * @throws ConnectionError
+   * @throws SerializationError
+   * @throws StreamingException
+   */
+  public StrictRegexWriter(HiveEndPoint endPoint)
+          throws ConnectionError, SerializationError, StreamingException {
+    super(endPoint, null);
+  }
+
+  /**
+   *
+   * @param endPoint the end point to write to
+   * @param conf a Hive conf object. Should be null if not using advanced Hive settings.
+   * @throws ConnectionError
+   * @throws SerializationError
+   * @throws StreamingException
+   */
+  public StrictRegexWriter(HiveEndPoint endPoint, HiveConf conf)
+          throws ConnectionError, SerializationError, StreamingException {
+    super(endPoint, conf);
+  }
+  
+  
+  /**
+   *
+   * @param regex the regular expression used to parse incoming records
+   * @param endPoint the end point to write to
+   * @param conf a Hive conf object. Should be null if not using advanced Hive settings.
+   * @throws ConnectionError
+   * @throws SerializationError
+   * @throws StreamingException
+   */
+  public StrictRegexWriter(String regex, HiveEndPoint endPoint, HiveConf conf)
+          throws ConnectionError, SerializationError, StreamingException {
+    super(endPoint, conf);
+    this.regex = regex;
+  }
+
+  @Override
+  SerDe getSerde() throws SerializationError {
+    if (serde != null) {
+      return serde;
+    }
+    serde = createSerde(tbl, conf, regex);
+    return serde;
+  }
+
+  @Override
+  public void write(long transactionId, byte[] record)
+          throws StreamingIOFailure, SerializationError {
+    try {
+      Object encodedRow = encode(record);
+      updater.insert(transactionId, encodedRow);
+    } catch (IOException e) {
+      throw new StreamingIOFailure("Error writing record in transaction("
+              + transactionId + ")", e);
+    }
+
+  }
+
+  /**
+   * Creates RegexSerDe
+   * @param tbl   used to create serde
+   * @param conf  used to create serde
+   * @param regex  used to create serde
+   * @return the initialized RegexSerDe
+   * @throws SerializationError if serde could not be initialized
+   */
+  private static RegexSerDe createSerde(Table tbl, HiveConf conf, String regex)
+          throws SerializationError {
+    try {
+      Properties tableProps = MetaStoreUtils.getTableMetadata(tbl);
+      tableProps.setProperty(RegexSerDe.INPUT_REGEX, regex);
+      ArrayList<String> tableColumns = getCols(tbl);
+      tableProps.setProperty(serdeConstants.LIST_COLUMNS, StringUtils.join(tableColumns, ","));
+      RegexSerDe serde = new RegexSerDe();
+      SerDeUtils.initializeSerDe(serde, conf, tableProps, null);
+      return serde;
+    } catch (SerDeException e) {
+      throw new SerializationError("Error initializing serde " + 
RegexSerDe.class.getName(), e);
+    }
+  }
+  
+  private static ArrayList<String> getCols(Table table) {
+    List<FieldSchema> cols = table.getSd().getCols();
+    ArrayList<String> colNames = new ArrayList<String>(cols.size());
+    for (FieldSchema col : cols) {
+      colNames.add(col.getName().toLowerCase());
+    }
+    return colNames;
+  }
+
+  /**
+   * Encodes UTF-8 encoded string bytes using RegexSerDe
+   * @param utf8StrRecord the record as UTF-8 encoded bytes
+   * @return  The encoded object
+   * @throws SerializationError
+   */
+  private Object encode(byte[] utf8StrRecord) throws SerializationError {
+    try {
+      Text blob = new Text(utf8StrRecord);
+      return serde.deserialize(blob);
+    } catch (SerDeException e) {
+      throw new SerializationError("Unable to convert byte[] record into 
Object", e);
+    }
+  }
+
+}
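
For readers wiring this into a client such as the Flume Hive Sink, a minimal
usage sketch follows, mirroring the new test further down. The metastore URI,
database/table names, and null partition list are placeholder assumptions,
not part of this patch:

    // Sketch only: endpoint values below are hypothetical placeholders.
    HiveEndPoint endPt = new HiveEndPoint("thrift://localhost:9083",
        "testdb", "alerts", null);                // unpartitioned table
    StreamingConnection connection = endPt.newConnection(true);
    // Each capture group in the regex maps, in order, to a table column.
    StrictRegexWriter writer = new StrictRegexWriter("([^,]*),(.*)", endPt, null);
    TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer);
    txnBatch.beginNextTransaction();
    txnBatch.write("1,Hello streaming".getBytes());
    txnBatch.commit();
    txnBatch.close();
    connection.close();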

http://git-wip-us.apache.org/repos/asf/hive/blob/a3f718f7/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
index 329e5da..7343ef4 100644
--- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
@@ -18,7 +18,16 @@
 
 package org.apache.hive.hcatalog.streaming;
 
-import junit.framework.Assert;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RawLocalFileSystem;
@@ -58,15 +67,7 @@ import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import junit.framework.Assert;
 
 
 public class TestStreaming {
@@ -216,9 +217,9 @@ public class TestStreaming {
 
     NullWritable key = rr.createKey();
     OrcStruct value = rr.createValue();
-    for (int i = 0; i < records.length; i++) {
+    for (String record : records) {
       Assert.assertEquals(true, rr.next(key, value));
-      Assert.assertEquals(records[i], value.toString());
+      Assert.assertEquals(record, value.toString());
     }
     Assert.assertEquals(false, rr.next(key, value));
   }
@@ -391,6 +392,67 @@ public class TestStreaming {
     connection.close();
   }
 
+  @Test
+  public void testTransactionBatchCommit_Regex() throws Exception {
+    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
+      partitionVals);
+    StreamingConnection connection = endPt.newConnection(true);
+    String regex = "([^,]*),(.*)";
+    StrictRegexWriter writer = new StrictRegexWriter(regex, endPt, conf);
+
+    // 1st Txn
+    TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer);
+    txnBatch.beginNextTransaction();
+    Assert.assertEquals(TransactionBatch.TxnState.OPEN
+      , txnBatch.getCurrentTransactionState());
+    txnBatch.write("1,Hello streaming".getBytes());
+    txnBatch.commit();
+
+    checkDataWritten(1, 10, 1, 1, "{1, Hello streaming}");
+
+    Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
+      , txnBatch.getCurrentTransactionState());
+
+    // 2nd Txn
+    txnBatch.beginNextTransaction();
+    Assert.assertEquals(TransactionBatch.TxnState.OPEN
+      , txnBatch.getCurrentTransactionState());
+    txnBatch.write("2,Welcome to streaming".getBytes());
+
+    // data should not be visible
+    checkDataWritten(1, 10, 1, 1, "{1, Hello streaming}");
+
+    txnBatch.commit();
+
+    checkDataWritten(1, 10, 1, 1, "{1, Hello streaming}",
+      "{2, Welcome to streaming}");
+
+    txnBatch.close();
+    Assert.assertEquals(TransactionBatch.TxnState.INACTIVE
+      , txnBatch.getCurrentTransactionState());
+
+
+    connection.close();
+
+
+    // To Unpartitioned table
+    endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, null);
+    connection = endPt.newConnection(true);
+    regex = "([^:]*):(.*)";
+    writer = new StrictRegexWriter(regex, endPt, conf);
+
+    // 1st Txn
+    txnBatch = connection.fetchTransactionBatch(10, writer);
+    txnBatch.beginNextTransaction();
+    Assert.assertEquals(TransactionBatch.TxnState.OPEN
+      , txnBatch.getCurrentTransactionState());
+    txnBatch.write("1:Hello streaming".getBytes());
+    txnBatch.commit();
+
+    Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
+      , txnBatch.getCurrentTransactionState());
+    connection.close();
+  }
+  
   @Test
   public void testTransactionBatchCommit_Json() throws Exception {
     HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
