HIVE-14114 Ensure RecordWriter in streaming API is using the same UserGroupInformation as StreamingConnection (Eugene Koifman, reviewed by Wei Zheng)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/8f24aa1d
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/8f24aa1d
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/8f24aa1d

Branch: refs/heads/branch-1
Commit: 8f24aa1d0a72440a05ff91cd27a9c23cc94e7e87
Parents: ed7cce0
Author: Eugene Koifman <ekoif...@hortonworks.com>
Authored: Fri Jul 8 19:30:48 2016 -0700
Committer: Eugene Koifman <ekoif...@hortonworks.com>
Committed: Fri Jul 8 19:30:48 2016 -0700

----------------------------------------------------------------------
 .../streaming/AbstractRecordWriter.java         | 61 +++++++++++++------
 .../streaming/DelimitedInputWriter.java         | 46 ++++++++++----
 .../hive/hcatalog/streaming/HiveEndPoint.java   | 10 +++-
 .../hcatalog/streaming/StreamingConnection.java |  6 ++
 .../hcatalog/streaming/StrictJsonWriter.java    | 26 +++++---
 .../hive/hcatalog/streaming/TestStreaming.java  | 63 +++++++++++---------
 6 files changed, 148 insertions(+), 64 deletions(-)
----------------------------------------------------------------------
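
For context, this patch moves streaming clients to a connection-first pattern: open the StreamingConnection (optionally with an authenticated UserGroupInformation), then construct the RecordWriter with that connection so its metastore and filesystem calls run under the same UGI. A minimal sketch of the new usage, based on the TestStreaming changes below; the metastore URI, database, table, agent string, and column names are made-up placeholders, not part of this commit:

    // Hypothetical endpoint/table values; adjust to your environment.
    String metaStoreUri = "thrift://localhost:9083";
    List<String> partitionVals = Arrays.asList("2015");
    HiveEndPoint endPt = new HiveEndPoint(metaStoreUri, "mydb", "mytable", partitionVals);
    HiveConf conf = new HiveConf();
    UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); // or a Kerberos login
    // The connection is created first and carries the UGI...
    StreamingConnection connection = endPt.newConnection(true, conf, ugi, "example-agent");
    // ...and the writer now takes the connection, so it reuses that same UGI.
    String[] fieldNames = {"id", "msg"};
    DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames, ",", endPt, conf, connection);

    TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer);
    txnBatch.beginNextTransaction();
    txnBatch.write("1,Hello streaming".getBytes());
    txnBatch.commit();
    txnBatch.close();
    connection.close();

The older writer constructors that do not take a StreamingConnection remain available but are deprecated by this patch.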


http://git-wip-us.apache.org/repos/asf/hive/blob/8f24aa1d/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
index 34bc0cc..898f3c7 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
@@ -37,12 +37,14 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hive.hcatalog.common.HCatUtil;
 import org.apache.thrift.TException;
 
 import java.io.IOException;
 
+import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
@@ -68,34 +70,59 @@ abstract class AbstractRecordWriter implements RecordWriter {
   private Long curBatchMinTxnId;
   private Long curBatchMaxTxnId;
 
+  private static final class TableWriterPair {
+    private final Table tbl;
+    private final Path partitionPath;
+    TableWriterPair(Table t, Path p) {
+      tbl = t;
+      partitionPath = p;
+    }
+  }
+  /**
+   * @deprecated As of release 1.3/2.1.  Replaced by {@link #AbstractRecordWriter(HiveEndPoint, HiveConf, StreamingConnection)}
+   */
   protected AbstractRecordWriter(HiveEndPoint endPoint, HiveConf conf)
-          throws ConnectionError, StreamingException {
-    this.endPoint = endPoint;
+    throws ConnectionError, StreamingException {
+    this(endPoint, conf, null);
+  }
+  protected AbstractRecordWriter(HiveEndPoint endPoint2, HiveConf conf, StreamingConnection conn)
+          throws StreamingException {
+    this.endPoint = endPoint2;
     this.conf = conf!=null ? conf
                 : HiveEndPoint.createHiveConf(DelimitedInputWriter.class, endPoint.metaStoreUri);
     try {
       msClient = HCatUtil.getHiveMetastoreClient(this.conf);
-      this.tbl = msClient.getTable(endPoint.database, endPoint.table);
-      this.partitionPath = getPathForEndPoint(msClient, endPoint);
+      UserGroupInformation ugi = conn != null ? conn.getUserGroupInformation() : null;
+      if (ugi == null) {
+        this.tbl = msClient.getTable(endPoint.database, endPoint.table);
+        this.partitionPath = getPathForEndPoint(msClient, endPoint);
+      } else {
+        TableWriterPair twp = ugi.doAs(
+          new PrivilegedExceptionAction<TableWriterPair>() {
+            @Override
+            public TableWriterPair run() throws Exception {
+              return new TableWriterPair(msClient.getTable(endPoint.database, endPoint.table),
+                getPathForEndPoint(msClient, endPoint));
+            }
+          });
+        this.tbl = twp.tbl;
+        this.partitionPath = twp.partitionPath;
+      }
       this.totalBuckets = tbl.getSd().getNumBuckets();
-      if(totalBuckets <= 0) {
+      if (totalBuckets <= 0) {
         throw new StreamingException("Cannot stream to table that has not been bucketed : "
-                + endPoint);
+          + endPoint);
       }
-      this.bucketIds = getBucketColIDs(tbl.getSd().getBucketCols(), tbl.getSd().getCols()) ;
+      this.bucketIds = getBucketColIDs(tbl.getSd().getBucketCols(), tbl.getSd().getCols());
       this.bucketFieldData = new Object[bucketIds.size()];
       String outFormatName = this.tbl.getSd().getOutputFormat();
-      outf = (AcidOutputFormat<?,?>) ReflectionUtils.newInstance(JavaUtils.loadClass(outFormatName), conf);
+      outf = (AcidOutputFormat<?, ?>) ReflectionUtils.newInstance(JavaUtils.loadClass(outFormatName), conf);
       bucketFieldData = new Object[bucketIds.size()];
-    } catch (MetaException e) {
-      throw new ConnectionError(endPoint, e);
-    } catch (NoSuchObjectException e) {
-      throw new ConnectionError(endPoint, e);
-    } catch (TException e) {
-      throw new StreamingException(e.getMessage(), e);
-    } catch (ClassNotFoundException e) {
-      throw new StreamingException(e.getMessage(), e);
-    } catch (IOException e) {
+    } catch(InterruptedException e) {
+      throw new StreamingException(endPoint2.toString(), e);
+    } catch (MetaException | NoSuchObjectException e) {
+      throw new ConnectionError(endPoint2, e);
+    } catch (TException | ClassNotFoundException | IOException e) {
       throw new StreamingException(e.getMessage(), e);
     }
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/8f24aa1d/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java
index a522feb..2356a9b 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java
@@ -73,12 +73,11 @@ public class DelimitedInputWriter extends AbstractRecordWriter {
   * @throws InvalidColumn any element in colNamesForFields refers to a non existing column
    */
   public DelimitedInputWriter(String[] colNamesForFields, String delimiter,
-                              HiveEndPoint endPoint)
-          throws ClassNotFoundException, ConnectionError, SerializationError,
-                 InvalidColumn, StreamingException {
-    this(colNamesForFields, delimiter, endPoint, null);
+                              HiveEndPoint endPoint, StreamingConnection conn)
+    throws ClassNotFoundException, ConnectionError, SerializationError,
+      InvalidColumn, StreamingException {
+    this(colNamesForFields, delimiter, endPoint, null, conn);
   }
-
  /** Constructor. Uses default separator of the LazySimpleSerde
   * @param colNamesForFields Column name assignment for input fields. nulls or empty
   *                          strings in the array indicates the fields to be skipped
@@ -92,13 +91,12 @@ public class DelimitedInputWriter extends AbstractRecordWriter {
   * @throws InvalidColumn any element in colNamesForFields refers to a non existing column
   */
    public DelimitedInputWriter(String[] colNamesForFields, String delimiter,
-                              HiveEndPoint endPoint, HiveConf conf)
+                              HiveEndPoint endPoint, HiveConf conf, StreamingConnection conn)
           throws ClassNotFoundException, ConnectionError, SerializationError,
                  InvalidColumn, StreamingException {
      this(colNamesForFields, delimiter, endPoint, conf,
-             (char) LazySerDeParameters.DefaultSeparators[0]);
+       (char) LazySerDeParameters.DefaultSeparators[0], conn);
    }
-
   /**
    * Constructor. Allows overriding separator of the LazySimpleSerde
    * @param colNamesForFields Column name assignment for input fields
@@ -108,6 +106,7 @@ public class DelimitedInputWriter extends AbstractRecordWriter {
   * @param serdeSeparator separator used when encoding data that is fed into the
   *                             LazySimpleSerde. Ensure this separator does not occur
    *                             in the field data
+   * @param conn connection this Writer is to be used with
    * @throws ConnectionError Problem talking to Hive
    * @throws ClassNotFoundException Serde class not found
    * @throws SerializationError Serde initialization/interaction failed
@@ -115,10 +114,10 @@ public class DelimitedInputWriter extends AbstractRecordWriter {
   * @throws InvalidColumn any element in colNamesForFields refers to a non existing column
    */
   public DelimitedInputWriter(String[] colNamesForFields, String delimiter,
-                              HiveEndPoint endPoint, HiveConf conf, char serdeSeparator)
+                              HiveEndPoint endPoint, HiveConf conf, char serdeSeparator, StreamingConnection conn)
           throws ClassNotFoundException, ConnectionError, SerializationError,
                  InvalidColumn, StreamingException {
-    super(endPoint, conf);
+    super(endPoint, conf, conn);
     this.tableColumns = getCols(tbl);
     this.serdeSeparator = serdeSeparator;
     this.delimiter = delimiter;
@@ -143,6 +142,33 @@ public class DelimitedInputWriter extends AbstractRecordWriter {
       bucketStructFields[i] = allFields.get(bucketIds.get(i));
     }
   }
+  /**
+   * @deprecated As of release 1.3/2.1.  Replaced by {@link #DelimitedInputWriter(String[], String, HiveEndPoint, StreamingConnection)}
+   */
+  public DelimitedInputWriter(String[] colNamesForFields, String delimiter,
+                              HiveEndPoint endPoint)
+    throws ClassNotFoundException, ConnectionError, SerializationError,
+    InvalidColumn, StreamingException {
+    this(colNamesForFields, delimiter, endPoint, null, null);
+  }
+  /**
+   * @deprecated As of release 1.3/2.1.  Replaced by {@link #DelimitedInputWriter(String[], String, HiveEndPoint, HiveConf, StreamingConnection)}
+   */
+  public DelimitedInputWriter(String[] colNamesForFields, String delimiter,
+                              HiveEndPoint endPoint, HiveConf conf)
+    throws ClassNotFoundException, ConnectionError, SerializationError,
+    InvalidColumn, StreamingException {
+    this(colNamesForFields, delimiter, endPoint, conf,
+      (char) LazySerDeParameters.DefaultSeparators[0], null);
+  }
+  /**
+   * @deprecated As of release 1.3/2.1.  Replaced by {@link #DelimitedInputWriter(String[], String, HiveEndPoint, HiveConf, char, StreamingConnection)}
+   */
+  public DelimitedInputWriter(String[] colNamesForFields, String delimiter,
+                              HiveEndPoint endPoint, HiveConf conf, char serdeSeparator)
+    throws ClassNotFoundException, StreamingException {
+    this(colNamesForFields, delimiter, endPoint, conf, serdeSeparator, null);
+  }
 
  private boolean isReorderingNeeded(String delimiter, ArrayList<String> tableColumns) {
     return !( delimiter.equals(String.valueOf(getSerdeSeparator()))

http://git-wip-us.apache.org/repos/asf/hive/blob/8f24aa1d/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
index d268570..b34a2ed 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
@@ -97,7 +97,7 @@ public class HiveEndPoint {
 
 
   /**
-   * @deprecated Use {@link #newConnection(boolean, String)}
+   * @deprecated As of release 1.3/2.1.  Replaced by {@link #newConnection(boolean, String)}
    */
   public StreamingConnection newConnection(final boolean createPartIfNotExists)
    throws ConnectionError, InvalidPartition, InvalidTable, PartitionCreationFailed
@@ -105,7 +105,7 @@ public class HiveEndPoint {
     return newConnection(createPartIfNotExists, null, null, null);
   }
   /**
-   * @deprecated Use {@link #newConnection(boolean, HiveConf, String)}
+   * @deprecated As of release 1.3/2.1.  Replaced by {@link #newConnection(boolean, HiveConf, String)}
    */
  public StreamingConnection newConnection(final boolean createPartIfNotExists, HiveConf conf)
    throws ConnectionError, InvalidPartition, InvalidTable, PartitionCreationFailed
@@ -113,7 +113,7 @@ public class HiveEndPoint {
     return newConnection(createPartIfNotExists, conf, null, null);
   }
   /**
-   * @deprecated Use {@link #newConnection(boolean, HiveConf, UserGroupInformation, String)}
+   * @deprecated As of release 1.3/2.1.  Replaced by {@link #newConnection(boolean, HiveConf, UserGroupInformation, String)}
    */
  public StreamingConnection newConnection(final boolean createPartIfNotExists, final HiveConf conf,
                                           final UserGroupInformation authenticatedUser)
@@ -395,6 +395,10 @@ public class HiveEndPoint {
       }
     }
 
+    @Override
+    public UserGroupInformation getUserGroupInformation() {
+      return ugi;
+    }
 
     /**
      * Acquires a new batch of transactions from Hive.

http://git-wip-us.apache.org/repos/asf/hive/blob/8f24aa1d/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StreamingConnection.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StreamingConnection.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StreamingConnection.java
index 25acff0..8785a21 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StreamingConnection.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StreamingConnection.java
@@ -18,6 +18,8 @@
 
 package org.apache.hive.hcatalog.streaming;
 
+import org.apache.hadoop.security.UserGroupInformation;
+
 /**
 * Represents a connection to a HiveEndPoint. Used to acquire transaction batches.
  */
@@ -46,4 +48,8 @@ public interface StreamingConnection {
    */
   public void close();
 
+  /**
+   * @return UserGroupInformation associated with this connection or {@code null} if there is none
+   */
+  UserGroupInformation getUserGroupInformation();
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/8f24aa1d/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java
index 3e3de68..4cd8161 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java
@@ -46,28 +46,40 @@ public class StrictJsonWriter extends AbstractRecordWriter {
   private final StructField[] bucketStructFields;
 
   /**
-   *
+   * @deprecated As of release 1.3/2.1.  Replaced by {@link #StrictJsonWriter(HiveEndPoint, HiveConf, StreamingConnection)}
+   */
+  public StrictJsonWriter(HiveEndPoint endPoint)
+    throws ConnectionError, SerializationError, StreamingException {
+    this(endPoint, null, null);
+  }
+
+  /**
+   * @deprecated As of release 1.3/2.1.  Replaced by {@link #StrictJsonWriter(HiveEndPoint, HiveConf, StreamingConnection)}
+   */
+  public StrictJsonWriter(HiveEndPoint endPoint, HiveConf conf) throws StreamingException {
+    this(endPoint, conf, null);
+  }
+  /**
    * @param endPoint the end point to write to
    * @throws ConnectionError
    * @throws SerializationError
    * @throws StreamingException
    */
-  public StrictJsonWriter(HiveEndPoint endPoint)
+  public StrictJsonWriter(HiveEndPoint endPoint, StreamingConnection conn)
           throws ConnectionError, SerializationError, StreamingException {
-    this(endPoint, null);
+    this(endPoint, null, conn);
   }
-
   /**
-   *
    * @param endPoint the end point to write to
   * @param conf a Hive conf object. Should be null if not using advanced Hive settings.
+   * @param conn connection this Writer is to be used with
    * @throws ConnectionError
    * @throws SerializationError
    * @throws StreamingException
    */
-  public StrictJsonWriter(HiveEndPoint endPoint, HiveConf conf)
+  public StrictJsonWriter(HiveEndPoint endPoint, HiveConf conf, StreamingConnection conn)
           throws ConnectionError, SerializationError, StreamingException {
-    super(endPoint, conf);
+    super(endPoint, conf, conn);
     this.serde = createSerde(tbl, conf);
     // get ObjInspectors for entire record and bucketed cols
     try {

http://git-wip-us.apache.org/repos/asf/hive/blob/8f24aa1d/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
index f1ff712..bcf88a1 100644
--- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
@@ -81,11 +81,13 @@ import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableIntObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableLongObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector;
+import org.apache.hadoop.hive.shims.Utils;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.thrift.TException;
 import org.junit.After;
 import org.junit.Assert;
@@ -301,11 +303,11 @@ public class TestStreaming {
     List<String> partitionVals = new ArrayList<String>();
     partitionVals.add("2015");
     HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, "testing5", "store_sales", partitionVals);
+    StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
     DelimitedInputWriter writer = new DelimitedInputWriter(new String[] {"ss_sold_date_sk","ss_sold_time_sk", "ss_item_sk",
       "ss_customer_sk", "ss_cdemo_sk", "ss_hdemo_sk", "ss_addr_sk", "ss_store_sk", "ss_promo_sk", "ss_ticket_number", "ss_quantity",
       "ss_wholesale_cost", "ss_list_price", "ss_sales_price", "ss_ext_discount_amt", "ss_ext_sales_price", "ss_ext_wholesale_cost",
-      "ss_ext_list_price", "ss_ext_tax", "ss_coupon_amt", "ss_net_paid", "ss_net_paid_inc_tax", "ss_net_profit"},",", endPt);
-    StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
+      "ss_ext_list_price", "ss_ext_tax", "ss_coupon_amt", "ss_net_paid", "ss_net_paid_inc_tax", "ss_net_profit"},",", endPt, connection);
 
     TransactionBatch txnBatch =  connection.fetchTransactionBatch(2, writer);
     txnBatch.beginNextTransaction();
@@ -563,8 +565,8 @@ public class TestStreaming {
     // 1)  to partitioned table
     HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
             partitionVals);
-    DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt);
     StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
+    DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt, connection);
 
     TransactionBatch txnBatch =  connection.fetchTransactionBatch(10, writer);
 
@@ -642,8 +644,8 @@ public class TestStreaming {
   @Test
   public void testHeartbeat() throws Exception {
     HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, null);
-    DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames2,",", endPt);
     StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
+    DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames2,",", endPt, connection);
 
     TransactionBatch txnBatch =  connection.fetchTransactionBatch(5, writer);
     txnBatch.beginNextTransaction();
@@ -671,8 +673,8 @@ public class TestStreaming {
     // 1) to partitioned table
     HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
             partitionVals);
-    DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt);
     StreamingConnection connection = endPt.newConnection(true, "UT_" + Thread.currentThread().getName());
+    DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt, connection);
 
     TransactionBatch txnBatch =  connection.fetchTransactionBatch(10, writer);
     txnBatch.beginNextTransaction();
@@ -698,28 +700,35 @@ public class TestStreaming {
 
   @Test
   public void testTransactionBatchCommit_Delimited() throws Exception {
+    testTransactionBatchCommit_Delimited(null);
+  }
+  @Test
+  public void testTransactionBatchCommit_DelimitedUGI() throws Exception {
+    testTransactionBatchCommit_Delimited(Utils.getUGI());
+  }
+  private void testTransactionBatchCommit_Delimited(UserGroupInformation ugi) throws Exception {
     HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
-            partitionVals);
-    DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt);
-    StreamingConnection connection = endPt.newConnection(true, "UT_" + Thread.currentThread().getName());
+      partitionVals);
+    StreamingConnection connection = endPt.newConnection(true, conf, ugi, "UT_" + Thread.currentThread().getName());
+    DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt, conf, connection);
 
     // 1st Txn
     TransactionBatch txnBatch =  connection.fetchTransactionBatch(10, writer);
     txnBatch.beginNextTransaction();
     Assert.assertEquals(TransactionBatch.TxnState.OPEN
-            , txnBatch.getCurrentTransactionState());
+      , txnBatch.getCurrentTransactionState());
     txnBatch.write("1,Hello streaming".getBytes());
     txnBatch.commit();
 
     checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
 
     Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
-            , txnBatch.getCurrentTransactionState());
+      , txnBatch.getCurrentTransactionState());
 
     // 2nd Txn
     txnBatch.beginNextTransaction();
     Assert.assertEquals(TransactionBatch.TxnState.OPEN
-            , txnBatch.getCurrentTransactionState());
+      , txnBatch.getCurrentTransactionState());
     txnBatch.write("2,Welcome to streaming".getBytes());
 
     // data should not be visible
@@ -728,11 +737,11 @@ public class TestStreaming {
     txnBatch.commit();
 
     checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}",
-        "{2, Welcome to streaming}");
+      "{2, Welcome to streaming}");
 
     txnBatch.close();
     Assert.assertEquals(TransactionBatch.TxnState.INACTIVE
-            , txnBatch.getCurrentTransactionState());
+      , txnBatch.getCurrentTransactionState());
 
 
     connection.close();
@@ -740,19 +749,19 @@ public class TestStreaming {
 
     // To Unpartitioned table
     endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, null);
-    writer = new DelimitedInputWriter(fieldNames,",", endPt);
-    connection = endPt.newConnection(true, "UT_" + Thread.currentThread().getName());
+    connection = endPt.newConnection(true, conf, ugi, "UT_" + Thread.currentThread().getName());
+    writer = new DelimitedInputWriter(fieldNames,",", endPt, conf, connection);
 
     // 1st Txn
     txnBatch =  connection.fetchTransactionBatch(10, writer);
     txnBatch.beginNextTransaction();
     Assert.assertEquals(TransactionBatch.TxnState.OPEN
-            , txnBatch.getCurrentTransactionState());
+      , txnBatch.getCurrentTransactionState());
     txnBatch.write("1,Hello streaming".getBytes());
     txnBatch.commit();
 
     Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
-            , txnBatch.getCurrentTransactionState());
+      , txnBatch.getCurrentTransactionState());
     connection.close();
   }
 
@@ -760,8 +769,8 @@ public class TestStreaming {
   public void testTransactionBatchCommit_Json() throws Exception {
     HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
             partitionVals);
-    StrictJsonWriter writer = new StrictJsonWriter(endPt);
     StreamingConnection connection = endPt.newConnection(true, "UT_" + Thread.currentThread().getName());
+    StrictJsonWriter writer = new StrictJsonWriter(endPt, connection);
 
     // 1st Txn
     TransactionBatch txnBatch =  connection.fetchTransactionBatch(10, writer);
@@ -845,8 +854,8 @@ public class TestStreaming {
 
     HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
             partitionVals);
-    DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt);
     StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
+    DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt, connection);
 
 
     TransactionBatch txnBatch =  connection.fetchTransactionBatch(10, writer);
@@ -873,8 +882,8 @@ public class TestStreaming {
     String agentInfo = "UT_" + Thread.currentThread().getName();
     HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
             partitionVals);
-    DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt);
     StreamingConnection connection = endPt.newConnection(false, agentInfo);
+    DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt, connection);
 
     TransactionBatch txnBatch =  connection.fetchTransactionBatch(10, writer);
     txnBatch.beginNextTransaction();
@@ -1174,8 +1183,8 @@ public class TestStreaming {
 
     // 2) Insert data into both tables
     HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName3, tblName3, null);
-    DelimitedInputWriter writer = new DelimitedInputWriter(colNames,",", endPt);
     StreamingConnection connection = endPt.newConnection(false, agentInfo);
+    DelimitedInputWriter writer = new DelimitedInputWriter(colNames,",", endPt, connection);
 
     TransactionBatch txnBatch =  connection.fetchTransactionBatch(2, writer);
     txnBatch.beginNextTransaction();
@@ -1187,8 +1196,8 @@ public class TestStreaming {
 
 
     HiveEndPoint endPt2 = new HiveEndPoint(metaStoreURI, dbName4, tblName4, null);
-    DelimitedInputWriter writer2 = new DelimitedInputWriter(colNames2,",", endPt2);
     StreamingConnection connection2 = endPt2.newConnection(false, agentInfo);
+    DelimitedInputWriter writer2 = new DelimitedInputWriter(colNames2,",", endPt2, connection);
     TransactionBatch txnBatch2 =  connection2.fetchTransactionBatch(2, writer2);
     txnBatch2.beginNextTransaction();
 
@@ -1251,8 +1260,8 @@ public class TestStreaming {
 
     // 2) Insert data into both tables
     HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName3, tblName3, null);
-    DelimitedInputWriter writer = new DelimitedInputWriter(colNames,",", endPt);
     StreamingConnection connection = endPt.newConnection(false, agentInfo);
+    DelimitedInputWriter writer = new DelimitedInputWriter(colNames,",", endPt, connection);
 
     TransactionBatch txnBatch =  connection.fetchTransactionBatch(2, writer);
     txnBatch.beginNextTransaction();
@@ -1323,8 +1332,8 @@ public class TestStreaming {
 
     // 2) Insert data into both tables
     HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName3, tblName3, null);
-    DelimitedInputWriter writer = new DelimitedInputWriter(colNames,",", endPt);
     StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
+    DelimitedInputWriter writer = new DelimitedInputWriter(colNames,",", endPt, connection);
 
     // we need side file for this test, so we create 2 txn batch and test with only one
     TransactionBatch txnBatch =  connection.fetchTransactionBatch(2, writer);
@@ -1449,8 +1458,8 @@ public class TestStreaming {
 
     // 2) Insert data into both tables
     HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName3, tblName3, null);
-    DelimitedInputWriter writer = new DelimitedInputWriter(colNames,",", endPt);
     StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
+    DelimitedInputWriter writer = new DelimitedInputWriter(colNames,",", endPt, connection);
 
     TransactionBatch txnBatch =  connection.fetchTransactionBatch(2, writer);
     txnBatch.beginNextTransaction();
@@ -1671,9 +1680,9 @@ public class TestStreaming {
     runCmdOnDriver("create table T(a int, b int) clustered by (b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
 
     HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, "testErrors", "T", null);
-    DelimitedInputWriter innerWriter = new DelimitedInputWriter("a,b".split(","),",", endPt);
-    FaultyWriter writer = new FaultyWriter(innerWriter);
     StreamingConnection connection = endPt.newConnection(false, agentInfo);
+    DelimitedInputWriter innerWriter = new DelimitedInputWriter("a,b".split(","),",", endPt, connection);
+    FaultyWriter writer = new FaultyWriter(innerWriter);
 
     TransactionBatch txnBatch =  connection.fetchTransactionBatch(2, writer);
     txnBatch.close();
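
For reference, the core technique in the AbstractRecordWriter hunk above is running the metastore and path lookups under the connection's UGI via doAs. A self-contained, hypothetical sketch of that pattern (not code from this commit; it only prints the effective user where the writer would instead call getTable() and resolve the partition path):

    import java.security.PrivilegedExceptionAction;
    import org.apache.hadoop.security.UserGroupInformation;

    public class DoAsSketch {
      public static void main(String[] args) throws Exception {
        // Obtain a UGI; in the streaming API this comes from StreamingConnection.getUserGroupInformation().
        UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
        String who = ugi.doAs(new PrivilegedExceptionAction<String>() {
          @Override
          public String run() throws Exception {
            // AbstractRecordWriter performs msClient.getTable() and the
            // partition-path lookup inside this block, so they run as the
            // same user that owns the connection.
            return UserGroupInformation.getCurrentUser().getUserName();
          }
        });
        System.out.println("ran as: " + who);
      }
    }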
