HIVE-14114 Ensure RecordWriter in streaming API is using the same UserGroupInformation as StreamingConnection (Eugene Koifman, reviewed by Wei Zheng)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/8f24aa1d Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/8f24aa1d Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/8f24aa1d Branch: refs/heads/branch-1 Commit: 8f24aa1d0a72440a05ff91cd27a9c23cc94e7e87 Parents: ed7cce0 Author: Eugene Koifman <ekoif...@hortonworks.com> Authored: Fri Jul 8 19:30:48 2016 -0700 Committer: Eugene Koifman <ekoif...@hortonworks.com> Committed: Fri Jul 8 19:30:48 2016 -0700 ---------------------------------------------------------------------- .../streaming/AbstractRecordWriter.java | 61 +++++++++++++------ .../streaming/DelimitedInputWriter.java | 46 ++++++++++---- .../hive/hcatalog/streaming/HiveEndPoint.java | 10 +++- .../hcatalog/streaming/StreamingConnection.java | 6 ++ .../hcatalog/streaming/StrictJsonWriter.java | 26 +++++--- .../hive/hcatalog/streaming/TestStreaming.java | 63 +++++++++++--------- 6 files changed, 148 insertions(+), 64 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/8f24aa1d/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java index 34bc0cc..898f3c7 100644 --- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java +++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java @@ -37,12 +37,14 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; import org.apache.hadoop.hive.serde2.objectinspector.StructField; import 
org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hive.hcatalog.common.HCatUtil; import org.apache.thrift.TException; import java.io.IOException; +import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.HashSet; import java.util.List; @@ -68,34 +70,59 @@ abstract class AbstractRecordWriter implements RecordWriter { private Long curBatchMinTxnId; private Long curBatchMaxTxnId; + private static final class TableWriterPair { + private final Table tbl; + private final Path partitionPath; + TableWriterPair(Table t, Path p) { + tbl = t; + partitionPath = p; + } + } + /** + * @deprecated As of release 1.3/2.1. Replaced by {@link #AbstractRecordWriter(HiveEndPoint, HiveConf, StreamingConnection)} + */ protected AbstractRecordWriter(HiveEndPoint endPoint, HiveConf conf) - throws ConnectionError, StreamingException { - this.endPoint = endPoint; + throws ConnectionError, StreamingException { + this(endPoint, conf, null); + } + protected AbstractRecordWriter(HiveEndPoint endPoint2, HiveConf conf, StreamingConnection conn) + throws StreamingException { + this.endPoint = endPoint2; this.conf = conf!=null ? conf : HiveEndPoint.createHiveConf(DelimitedInputWriter.class, endPoint.metaStoreUri); try { msClient = HCatUtil.getHiveMetastoreClient(this.conf); - this.tbl = msClient.getTable(endPoint.database, endPoint.table); - this.partitionPath = getPathForEndPoint(msClient, endPoint); + UserGroupInformation ugi = conn != null ? 
conn.getUserGroupInformation() : null; + if (ugi == null) { + this.tbl = msClient.getTable(endPoint.database, endPoint.table); + this.partitionPath = getPathForEndPoint(msClient, endPoint); + } else { + TableWriterPair twp = ugi.doAs( + new PrivilegedExceptionAction<TableWriterPair>() { + @Override + public TableWriterPair run() throws Exception { + return new TableWriterPair(msClient.getTable(endPoint.database, endPoint.table), + getPathForEndPoint(msClient, endPoint)); + } + }); + this.tbl = twp.tbl; + this.partitionPath = twp.partitionPath; + } this.totalBuckets = tbl.getSd().getNumBuckets(); - if(totalBuckets <= 0) { + if (totalBuckets <= 0) { throw new StreamingException("Cannot stream to table that has not been bucketed : " - + endPoint); + + endPoint); } - this.bucketIds = getBucketColIDs(tbl.getSd().getBucketCols(), tbl.getSd().getCols()) ; + this.bucketIds = getBucketColIDs(tbl.getSd().getBucketCols(), tbl.getSd().getCols()); this.bucketFieldData = new Object[bucketIds.size()]; String outFormatName = this.tbl.getSd().getOutputFormat(); - outf = (AcidOutputFormat<?,?>) ReflectionUtils.newInstance(JavaUtils.loadClass(outFormatName), conf); + outf = (AcidOutputFormat<?, ?>) ReflectionUtils.newInstance(JavaUtils.loadClass(outFormatName), conf); bucketFieldData = new Object[bucketIds.size()]; - } catch (MetaException e) { - throw new ConnectionError(endPoint, e); - } catch (NoSuchObjectException e) { - throw new ConnectionError(endPoint, e); - } catch (TException e) { - throw new StreamingException(e.getMessage(), e); - } catch (ClassNotFoundException e) { - throw new StreamingException(e.getMessage(), e); - } catch (IOException e) { + } catch(InterruptedException e) { + throw new StreamingException(endPoint2.toString(), e); + } catch (MetaException | NoSuchObjectException e) { + throw new ConnectionError(endPoint2, e); + } catch (TException | ClassNotFoundException | IOException e) { throw new StreamingException(e.getMessage(), e); } } 
http://git-wip-us.apache.org/repos/asf/hive/blob/8f24aa1d/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java index a522feb..2356a9b 100644 --- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java +++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java @@ -73,12 +73,11 @@ public class DelimitedInputWriter extends AbstractRecordWriter { * @throws InvalidColumn any element in colNamesForFields refers to a non existing column */ public DelimitedInputWriter(String[] colNamesForFields, String delimiter, - HiveEndPoint endPoint) - throws ClassNotFoundException, ConnectionError, SerializationError, - InvalidColumn, StreamingException { - this(colNamesForFields, delimiter, endPoint, null); + HiveEndPoint endPoint, StreamingConnection conn) + throws ClassNotFoundException, ConnectionError, SerializationError, + InvalidColumn, StreamingException { + this(colNamesForFields, delimiter, endPoint, null, conn); } - /** Constructor. Uses default separator of the LazySimpleSerde * @param colNamesForFields Column name assignment for input fields. 
nulls or empty * strings in the array indicates the fields to be skipped @@ -92,13 +91,12 @@ public class DelimitedInputWriter extends AbstractRecordWriter { * @throws InvalidColumn any element in colNamesForFields refers to a non existing column */ public DelimitedInputWriter(String[] colNamesForFields, String delimiter, - HiveEndPoint endPoint, HiveConf conf) + HiveEndPoint endPoint, HiveConf conf, StreamingConnection conn) throws ClassNotFoundException, ConnectionError, SerializationError, InvalidColumn, StreamingException { this(colNamesForFields, delimiter, endPoint, conf, - (char) LazySerDeParameters.DefaultSeparators[0]); + (char) LazySerDeParameters.DefaultSeparators[0], conn); } - /** * Constructor. Allows overriding separator of the LazySimpleSerde * @param colNamesForFields Column name assignment for input fields @@ -108,6 +106,7 @@ public class DelimitedInputWriter extends AbstractRecordWriter { * @param serdeSeparator separator used when encoding data that is fed into the * LazySimpleSerde. 
Ensure this separator does not occur * in the field data + * @param conn connection this Writer is to be used with * @throws ConnectionError Problem talking to Hive * @throws ClassNotFoundException Serde class not found * @throws SerializationError Serde initialization/interaction failed @@ -115,10 +114,10 @@ public class DelimitedInputWriter extends AbstractRecordWriter { * @throws InvalidColumn any element in colNamesForFields refers to a non existing column */ public DelimitedInputWriter(String[] colNamesForFields, String delimiter, - HiveEndPoint endPoint, HiveConf conf, char serdeSeparator) + HiveEndPoint endPoint, HiveConf conf, char serdeSeparator, StreamingConnection conn) throws ClassNotFoundException, ConnectionError, SerializationError, InvalidColumn, StreamingException { - super(endPoint, conf); + super(endPoint, conf, conn); this.tableColumns = getCols(tbl); this.serdeSeparator = serdeSeparator; this.delimiter = delimiter; @@ -143,6 +142,33 @@ public class DelimitedInputWriter extends AbstractRecordWriter { bucketStructFields[i] = allFields.get(bucketIds.get(i)); } } + /** + * @deprecated As of release 1.3/2.1. Replaced by {@link #DelimitedInputWriter(String[], String, HiveEndPoint, StreamingConnection)} + */ + public DelimitedInputWriter(String[] colNamesForFields, String delimiter, + HiveEndPoint endPoint) + throws ClassNotFoundException, ConnectionError, SerializationError, + InvalidColumn, StreamingException { + this(colNamesForFields, delimiter, endPoint, null, null); + } + /** + * @deprecated As of release 1.3/2.1. 
Replaced by {@link #DelimitedInputWriter(String[], String, HiveEndPoint, HiveConf, StreamingConnection)} + */ + public DelimitedInputWriter(String[] colNamesForFields, String delimiter, + HiveEndPoint endPoint, HiveConf conf) + throws ClassNotFoundException, ConnectionError, SerializationError, + InvalidColumn, StreamingException { + this(colNamesForFields, delimiter, endPoint, conf, + (char) LazySerDeParameters.DefaultSeparators[0], null); + } + /** + * @deprecated As of release 1.3/2.1. Replaced by {@link #DelimitedInputWriter(String[], String, HiveEndPoint, HiveConf, char, StreamingConnection)} + */ + public DelimitedInputWriter(String[] colNamesForFields, String delimiter, + HiveEndPoint endPoint, HiveConf conf, char serdeSeparator) + throws ClassNotFoundException, StreamingException { + this(colNamesForFields, delimiter, endPoint, conf, serdeSeparator, null); + } private boolean isReorderingNeeded(String delimiter, ArrayList<String> tableColumns) { return !( delimiter.equals(String.valueOf(getSerdeSeparator())) http://git-wip-us.apache.org/repos/asf/hive/blob/8f24aa1d/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java index d268570..b34a2ed 100644 --- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java +++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java @@ -97,7 +97,7 @@ public class HiveEndPoint { /** - * @deprecated Use {@link #newConnection(boolean, String)} + * @deprecated As of release 1.3/2.1. 
Replaced by {@link #newConnection(boolean, String)} */ public StreamingConnection newConnection(final boolean createPartIfNotExists) throws ConnectionError, InvalidPartition, InvalidTable, PartitionCreationFailed @@ -105,7 +105,7 @@ public class HiveEndPoint { return newConnection(createPartIfNotExists, null, null, null); } /** - * @deprecated Use {@link #newConnection(boolean, HiveConf, String)} + * @deprecated As of release 1.3/2.1. Replaced by {@link #newConnection(boolean, HiveConf, String)} */ public StreamingConnection newConnection(final boolean createPartIfNotExists, HiveConf conf) throws ConnectionError, InvalidPartition, InvalidTable, PartitionCreationFailed @@ -113,7 +113,7 @@ public class HiveEndPoint { return newConnection(createPartIfNotExists, conf, null, null); } /** - * @deprecated Use {@link #newConnection(boolean, HiveConf, UserGroupInformation, String)} + * @deprecated As of release 1.3/2.1. Replaced by {@link #newConnection(boolean, HiveConf, UserGroupInformation, String)} */ public StreamingConnection newConnection(final boolean createPartIfNotExists, final HiveConf conf, final UserGroupInformation authenticatedUser) @@ -395,6 +395,10 @@ public class HiveEndPoint { } } + @Override + public UserGroupInformation getUserGroupInformation() { + return ugi; + } /** * Acquires a new batch of transactions from Hive. 
http://git-wip-us.apache.org/repos/asf/hive/blob/8f24aa1d/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StreamingConnection.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StreamingConnection.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StreamingConnection.java index 25acff0..8785a21 100644 --- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StreamingConnection.java +++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StreamingConnection.java @@ -18,6 +18,8 @@ package org.apache.hive.hcatalog.streaming; +import org.apache.hadoop.security.UserGroupInformation; + /** * Represents a connection to a HiveEndPoint. Used to acquire transaction batches. */ @@ -46,4 +48,8 @@ public interface StreamingConnection { */ public void close(); + /** + * @return UserGroupInformation associated with this connection or {@code null} if there is none + */ + UserGroupInformation getUserGroupInformation(); } http://git-wip-us.apache.org/repos/asf/hive/blob/8f24aa1d/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java index 3e3de68..4cd8161 100644 --- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java +++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java @@ -46,28 +46,40 @@ public class StrictJsonWriter extends AbstractRecordWriter { private final StructField[] bucketStructFields; /** - * + * @deprecated As of release 1.3/2.1. 
Replaced by {@link #StrictJsonWriter(HiveEndPoint, HiveConf, StreamingConnection)} + */ + public StrictJsonWriter(HiveEndPoint endPoint) + throws ConnectionError, SerializationError, StreamingException { + this(endPoint, null, null); + } + + /** + * @deprecated As of release 1.3/2.1. Replaced by {@link #StrictJsonWriter(HiveEndPoint, HiveConf, StreamingConnection)} + */ + public StrictJsonWriter(HiveEndPoint endPoint, HiveConf conf) throws StreamingException { + this(endPoint, conf, null); + } + /** * @param endPoint the end point to write to * @throws ConnectionError * @throws SerializationError * @throws StreamingException */ - public StrictJsonWriter(HiveEndPoint endPoint) + public StrictJsonWriter(HiveEndPoint endPoint, StreamingConnection conn) throws ConnectionError, SerializationError, StreamingException { - this(endPoint, null); + this(endPoint, null, conn); } - /** - * * @param endPoint the end point to write to * @param conf a Hive conf object. Should be null if not using advanced Hive settings. 
+ * @param conn connection this Writer is to be used with * @throws ConnectionError * @throws SerializationError * @throws StreamingException */ - public StrictJsonWriter(HiveEndPoint endPoint, HiveConf conf) + public StrictJsonWriter(HiveEndPoint endPoint, HiveConf conf, StreamingConnection conn) throws ConnectionError, SerializationError, StreamingException { - super(endPoint, conf); + super(endPoint, conf, conn); this.serde = createSerde(tbl, conf); // get ObjInspectors for entire record and bucketed cols try { http://git-wip-us.apache.org/repos/asf/hive/blob/8f24aa1d/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java index f1ff712..bcf88a1 100644 --- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java +++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java @@ -81,11 +81,13 @@ import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableIntObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableLongObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector; +import org.apache.hadoop.hive.shims.Utils; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.thrift.TException; import org.junit.After; import org.junit.Assert; @@ -301,11 +303,11 @@ public class TestStreaming { List<String> partitionVals = new ArrayList<String>(); 
partitionVals.add("2015"); HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, "testing5", "store_sales", partitionVals); + StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName()); DelimitedInputWriter writer = new DelimitedInputWriter(new String[] {"ss_sold_date_sk","ss_sold_time_sk", "ss_item_sk", "ss_customer_sk", "ss_cdemo_sk", "ss_hdemo_sk", "ss_addr_sk", "ss_store_sk", "ss_promo_sk", "ss_ticket_number", "ss_quantity", "ss_wholesale_cost", "ss_list_price", "ss_sales_price", "ss_ext_discount_amt", "ss_ext_sales_price", "ss_ext_wholesale_cost", - "ss_ext_list_price", "ss_ext_tax", "ss_coupon_amt", "ss_net_paid", "ss_net_paid_inc_tax", "ss_net_profit"},",", endPt); - StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName()); + "ss_ext_list_price", "ss_ext_tax", "ss_coupon_amt", "ss_net_paid", "ss_net_paid_inc_tax", "ss_net_profit"},",", endPt, connection); TransactionBatch txnBatch = connection.fetchTransactionBatch(2, writer); txnBatch.beginNextTransaction(); @@ -563,8 +565,8 @@ public class TestStreaming { // 1) to partitioned table HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, partitionVals); - DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt); StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName()); + DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt, connection); TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer); @@ -642,8 +644,8 @@ public class TestStreaming { @Test public void testHeartbeat() throws Exception { HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, null); - DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames2,",", endPt); StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName()); + DelimitedInputWriter writer = new 
DelimitedInputWriter(fieldNames2,",", endPt, connection); TransactionBatch txnBatch = connection.fetchTransactionBatch(5, writer); txnBatch.beginNextTransaction(); @@ -671,8 +673,8 @@ public class TestStreaming { // 1) to partitioned table HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, partitionVals); - DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt); StreamingConnection connection = endPt.newConnection(true, "UT_" + Thread.currentThread().getName()); + DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt, connection); TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer); txnBatch.beginNextTransaction(); @@ -698,28 +700,35 @@ public class TestStreaming { @Test public void testTransactionBatchCommit_Delimited() throws Exception { + testTransactionBatchCommit_Delimited(null); + } + @Test + public void testTransactionBatchCommit_DelimitedUGI() throws Exception { + testTransactionBatchCommit_Delimited(Utils.getUGI()); + } + private void testTransactionBatchCommit_Delimited(UserGroupInformation ugi) throws Exception { HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, - partitionVals); - DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt); - StreamingConnection connection = endPt.newConnection(true, "UT_" + Thread.currentThread().getName()); + partitionVals); + StreamingConnection connection = endPt.newConnection(true, conf, ugi, "UT_" + Thread.currentThread().getName()); + DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt, conf, connection); // 1st Txn TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer); txnBatch.beginNextTransaction(); Assert.assertEquals(TransactionBatch.TxnState.OPEN - , txnBatch.getCurrentTransactionState()); + , txnBatch.getCurrentTransactionState()); txnBatch.write("1,Hello streaming".getBytes()); txnBatch.commit(); checkDataWritten(partLoc, 1, 10, 1, 1, "{1, 
Hello streaming}"); Assert.assertEquals(TransactionBatch.TxnState.COMMITTED - , txnBatch.getCurrentTransactionState()); + , txnBatch.getCurrentTransactionState()); // 2nd Txn txnBatch.beginNextTransaction(); Assert.assertEquals(TransactionBatch.TxnState.OPEN - , txnBatch.getCurrentTransactionState()); + , txnBatch.getCurrentTransactionState()); txnBatch.write("2,Welcome to streaming".getBytes()); // data should not be visible @@ -728,11 +737,11 @@ public class TestStreaming { txnBatch.commit(); checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}", - "{2, Welcome to streaming}"); + "{2, Welcome to streaming}"); txnBatch.close(); Assert.assertEquals(TransactionBatch.TxnState.INACTIVE - , txnBatch.getCurrentTransactionState()); + , txnBatch.getCurrentTransactionState()); connection.close(); @@ -740,19 +749,19 @@ public class TestStreaming { // To Unpartitioned table endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, null); - writer = new DelimitedInputWriter(fieldNames,",", endPt); - connection = endPt.newConnection(true, "UT_" + Thread.currentThread().getName()); + connection = endPt.newConnection(true, conf, ugi, "UT_" + Thread.currentThread().getName()); + writer = new DelimitedInputWriter(fieldNames,",", endPt, conf, connection); // 1st Txn txnBatch = connection.fetchTransactionBatch(10, writer); txnBatch.beginNextTransaction(); Assert.assertEquals(TransactionBatch.TxnState.OPEN - , txnBatch.getCurrentTransactionState()); + , txnBatch.getCurrentTransactionState()); txnBatch.write("1,Hello streaming".getBytes()); txnBatch.commit(); Assert.assertEquals(TransactionBatch.TxnState.COMMITTED - , txnBatch.getCurrentTransactionState()); + , txnBatch.getCurrentTransactionState()); connection.close(); } @@ -760,8 +769,8 @@ public class TestStreaming { public void testTransactionBatchCommit_Json() throws Exception { HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, partitionVals); - StrictJsonWriter writer = new StrictJsonWriter(endPt); 
StreamingConnection connection = endPt.newConnection(true, "UT_" + Thread.currentThread().getName()); + StrictJsonWriter writer = new StrictJsonWriter(endPt, connection); // 1st Txn TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer); @@ -845,8 +854,8 @@ public class TestStreaming { HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, partitionVals); - DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt); StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName()); + DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt, connection); TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer); @@ -873,8 +882,8 @@ public class TestStreaming { String agentInfo = "UT_" + Thread.currentThread().getName(); HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, partitionVals); - DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt); StreamingConnection connection = endPt.newConnection(false, agentInfo); + DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt, connection); TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer); txnBatch.beginNextTransaction(); @@ -1174,8 +1183,8 @@ public class TestStreaming { // 2) Insert data into both tables HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName3, tblName3, null); - DelimitedInputWriter writer = new DelimitedInputWriter(colNames,",", endPt); StreamingConnection connection = endPt.newConnection(false, agentInfo); + DelimitedInputWriter writer = new DelimitedInputWriter(colNames,",", endPt, connection); TransactionBatch txnBatch = connection.fetchTransactionBatch(2, writer); txnBatch.beginNextTransaction(); @@ -1187,8 +1196,8 @@ public class TestStreaming { HiveEndPoint endPt2 = new HiveEndPoint(metaStoreURI, dbName4, tblName4, null); - DelimitedInputWriter writer2 = new 
DelimitedInputWriter(colNames2,",", endPt2); StreamingConnection connection2 = endPt2.newConnection(false, agentInfo); + DelimitedInputWriter writer2 = new DelimitedInputWriter(colNames2,",", endPt2, connection2); TransactionBatch txnBatch2 = connection2.fetchTransactionBatch(2, writer2); txnBatch2.beginNextTransaction(); @@ -1251,8 +1260,8 @@ public class TestStreaming { // 2) Insert data into both tables HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName3, tblName3, null); - DelimitedInputWriter writer = new DelimitedInputWriter(colNames,",", endPt); StreamingConnection connection = endPt.newConnection(false, agentInfo); + DelimitedInputWriter writer = new DelimitedInputWriter(colNames,",", endPt, connection); TransactionBatch txnBatch = connection.fetchTransactionBatch(2, writer); txnBatch.beginNextTransaction(); @@ -1323,8 +1332,8 @@ public class TestStreaming { // 2) Insert data into both tables HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName3, tblName3, null); - DelimitedInputWriter writer = new DelimitedInputWriter(colNames,",", endPt); StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName()); + DelimitedInputWriter writer = new DelimitedInputWriter(colNames,",", endPt, connection); // we need side file for this test, so we create 2 txn batch and test with only one TransactionBatch txnBatch = connection.fetchTransactionBatch(2, writer); @@ -1449,8 +1458,8 @@ public class TestStreaming { // 2) Insert data into both tables HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName3, tblName3, null); - DelimitedInputWriter writer = new DelimitedInputWriter(colNames,",", endPt); StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName()); + DelimitedInputWriter writer = new DelimitedInputWriter(colNames,",", endPt, connection); TransactionBatch txnBatch = connection.fetchTransactionBatch(2, writer); txnBatch.beginNextTransaction(); @@ -1671,9 +1680,9 @@ 
public class TestStreaming { runCmdOnDriver("create table T(a int, b int) clustered by (b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, "testErrors", "T", null); - DelimitedInputWriter innerWriter = new DelimitedInputWriter("a,b".split(","),",", endPt); - FaultyWriter writer = new FaultyWriter(innerWriter); StreamingConnection connection = endPt.newConnection(false, agentInfo); + DelimitedInputWriter innerWriter = new DelimitedInputWriter("a,b".split(","),",", endPt, connection); + FaultyWriter writer = new FaultyWriter(innerWriter); TransactionBatch txnBatch = connection.fetchTransactionBatch(2, writer); txnBatch.close();