Repository: hive
Updated Branches:
  refs/heads/master dfaf90f2b -> cc52e9b22


HIVE-19206: Automatic memory management for open streaming writers (Prasanth 
Jayachandran reviewed by Gopal V)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/cc52e9b2
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/cc52e9b2
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/cc52e9b2

Branch: refs/heads/master
Commit: cc52e9b22bd64be117d3acd9918279aa355b237b
Parents: dfaf90f
Author: Prasanth Jayachandran <prasan...@apache.org>
Authored: Thu May 3 11:56:03 2018 -0700
Committer: Prasanth Jayachandran <prasan...@apache.org>
Committed: Thu May 3 11:56:03 2018 -0700

----------------------------------------------------------------------
 .../hadoop/hive/common/HeapMemoryMonitor.java   | 154 +++++++++++++++++
 .../org/apache/hadoop/hive/conf/HiveConf.java   |  13 ++
 .../apache/hadoop/hive/ql/io/RecordUpdater.java |  12 +-
 .../hadoop/hive/ql/io/orc/OrcOutputFormat.java  |   5 +
 .../hadoop/hive/ql/io/orc/OrcRecordUpdater.java |   9 +
 .../hive/ql/exec/TestFileSinkOperator.java      |   5 +
 .../hive/streaming/AbstractRecordWriter.java    | 169 ++++++++++++++++---
 .../apache/hive/streaming/ConnectionInfo.java   |  11 +-
 .../apache/hive/streaming/ConnectionStats.java  |  88 ++++++++++
 .../hive/streaming/HiveStreamingConnection.java |  85 ++++++----
 .../hive/streaming/StreamingConnection.java     |   7 +
 .../streaming/StrictDelimitedInputWriter.java   |   3 +-
 .../apache/hive/streaming/StrictJsonWriter.java |   3 +-
 .../hive/streaming/StrictRegexWriter.java       |   3 +-
 14 files changed, 487 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/cc52e9b2/common/src/java/org/apache/hadoop/hive/common/HeapMemoryMonitor.java
----------------------------------------------------------------------
diff --git 
a/common/src/java/org/apache/hadoop/hive/common/HeapMemoryMonitor.java 
b/common/src/java/org/apache/hadoop/hive/common/HeapMemoryMonitor.java
new file mode 100644
index 0000000..42286be
--- /dev/null
+++ b/common/src/java/org/apache/hadoop/hive/common/HeapMemoryMonitor.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.common;
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
+import java.lang.management.MemoryNotificationInfo;
+import java.lang.management.MemoryPoolMXBean;
+import java.lang.management.MemoryType;
+import java.lang.management.MemoryUsage;
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.management.NotificationEmitter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Monitors usage of the tenured (old generation) heap memory pool and notifies registered listeners when
+ * usage crosses a configured fraction of the pool's maximum size. The threshold is placed on collection
+ * usage (memory still in use after GC) when the pool supports it, otherwise on plain usage (before GC).
+ */
+public class HeapMemoryMonitor {
+  private static final Logger LOG = LoggerFactory.getLogger(HeapMemoryMonitor.class.getName());
+  // notifies when memory usage is 70% after GC
+  private static final double DEFAULT_THRESHOLD = 0.7d;
+  // looked up once when the class is initialized; null if no supported tenured pool was found
+  private static final MemoryPoolMXBean tenuredGenPool = getTenuredGenPool();
+
+  private final double threshold;
+  // JMX notification type matching the threshold this monitor configured; null if no threshold was set
+  private final String notificationType;
+  // NOTE(review): the notification callback iterates this list, presumably on a JVM-internal thread;
+  // register listeners before calling start()
+  private final List<Listener> listeners = new ArrayList<>();
+
+  /**
+   * Callback for memory threshold notifications.
+   */
+  public interface Listener {
+    /**
+     * Invoked when a memory threshold notification is received.
+     *
+     * @param usedMemory - used bytes of the tenured pool, read when the notification is handled
+     * @param maxMemory - max bytes of the tenured pool, read when the notification is handled
+     */
+    void memoryUsageAboveThreshold(long usedMemory, long maxMemory);
+  }
+
+  /**
+   * Creates a monitor and configures the threshold on the tenured pool (if one was found).
+   *
+   * @param threshold - fraction of the tenured pool's max size, in (0, 1]. Any other value (including NaN)
+   *                  falls back to the default of 0.7
+   */
+  public HeapMemoryMonitor(double threshold) {
+    // written as a positive range check so that NaN also falls back to the default
+    this.threshold = threshold > 0.0d && threshold <= 1.0d ? threshold : DEFAULT_THRESHOLD;
+    this.notificationType = setupTenuredGenPoolThreshold(tenuredGenPool);
+  }
+
+  /**
+   * Sets the collection usage threshold (preferred) or the usage threshold on the given pool.
+   *
+   * @param tenuredGenPool - pool to configure, may be null
+   * @return - type of the notification emitted for the configured threshold, null if nothing was configured
+   */
+  private String setupTenuredGenPoolThreshold(final MemoryPoolMXBean tenuredGenPool) {
+    if (tenuredGenPool == null) {
+      return null;
+    }
+    final MemoryUsage usage = tenuredGenPool.getUsage();
+    final long maxMemory = usage == null ? -1 : usage.getMax();
+    if (maxMemory <= 0) {
+      // max can be undefined (-1); a negative threshold would be rejected by the MXBean
+      LOG.warn("Max size of memory pool '{}' is undefined. Not setting memory threshold.",
+        tenuredGenPool.getName());
+      return null;
+    }
+    // long (not int) so that the threshold of pools larger than 2GB does not saturate at Integer.MAX_VALUE
+    final long memoryThreshold = (long) Math.floor(maxMemory * threshold);
+    // set memory threshold on memory used after GC
+    if (tenuredGenPool.isCollectionUsageThresholdSupported()) {
+      LOG.info("Setting collection usage threshold to {}", memoryThreshold);
+      tenuredGenPool.setCollectionUsageThreshold(memoryThreshold);
+      return MemoryNotificationInfo.MEMORY_COLLECTION_THRESHOLD_EXCEEDED;
+    }
+    // if collection usage threshold is not supported, worst case set memory threshold on memory usage (before GC)
+    if (tenuredGenPool.isUsageThresholdSupported()) {
+      LOG.info("Setting usage threshold to {}", memoryThreshold);
+      tenuredGenPool.setUsageThreshold(memoryThreshold);
+      return MemoryNotificationInfo.MEMORY_THRESHOLD_EXCEEDED;
+    }
+    return null;
+  }
+
+  /**
+   * Finds the first tenured heap pool that supports a collection usage threshold or a usage threshold.
+   *
+   * @return - tenured pool, null if none of the tenured pools supports thresholds
+   */
+  private static MemoryPoolMXBean getTenuredGenPool() {
+    for (MemoryPoolMXBean pool : ManagementFactory.getMemoryPoolMXBeans()) {
+      if (!isTenured(pool)) {
+        continue;
+      }
+      if (pool.isCollectionUsageThresholdSupported() || pool.isUsageThresholdSupported()) {
+        return pool;
+      }
+      LOG.error("{} vendor does not support isCollectionUsageThresholdSupported() and isUsageThresholdSupported()" +
+        " for tenured memory pool '{}'.", System.getProperty("java.vendor"), pool.getName());
+    }
+    return null;
+  }
+
+
+  /**
+   * Checks by type and name whether the given pool is a tenured (old generation) heap pool.
+   *
+   * @param memoryPoolMXBean - pool to check
+   * @return - true if the pool is a heap pool with one of the known tenured pool names
+   */
+  private static boolean isTenured(MemoryPoolMXBean memoryPoolMXBean) {
+    if (memoryPoolMXBean.getType() != MemoryType.HEAP) {
+      return false;
+    }
+
+    String name = memoryPoolMXBean.getName();
+    return name.equals("CMS Old Gen") // CMS
+      || name.equals("PS Old Gen") // Parallel GC
+      || name.equals("G1 Old Gen") // G1GC
+      // other vendors like IBM, Azul etc. use different names
+      || name.equals("Old Space")
+      || name.equals("Tenured Gen")
+      || name.equals("Java heap")
+      || name.equals("GenPauseless Old Gen");
+  }
+
+  /**
+   * Adds a listener that is invoked for every matching threshold notification.
+   *
+   * @param listener - listener to add
+   */
+  public void registerListener(final Listener listener) {
+    listeners.add(listener);
+  }
+
+  /**
+   * Returns the current usage of the tenured pool.
+   *
+   * @return - memory usage, null if no supported tenured pool was found
+   */
+  public MemoryUsage getTenuredGenMemoryUsage() {
+    if (tenuredGenPool == null) {
+      return null;
+    }
+    return tenuredGenPool.getUsage();
+  }
+
+  /**
+   * Subscribes to memory notifications and forwards the ones matching the configured threshold type to the
+   * registered listeners. Does nothing when no threshold could be configured. Notifications are matched by
+   * type only, they are not filtered by pool name.
+   */
+  public void start() {
+    // unsupported if null
+    if (tenuredGenPool == null || notificationType == null) {
+      return;
+    }
+    MemoryMXBean mxBean = ManagementFactory.getMemoryMXBean();
+    NotificationEmitter emitter = (NotificationEmitter) mxBean;
+    emitter.addNotificationListener((n, hb) -> {
+      // the usage threshold fallback emits MEMORY_THRESHOLD_EXCEEDED instead of
+      // MEMORY_COLLECTION_THRESHOLD_EXCEEDED, so match whichever type was configured
+      if (!notificationType.equals(n.getType())) {
+        return;
+      }
+      // single snapshot so that used and max come from the same reading
+      final MemoryUsage usage = tenuredGenPool.getUsage();
+      if (usage == null) {
+        return;
+      }
+      final long maxMemory = usage.getMax();
+      final long usedMemory = usage.getUsed();
+      for (Listener listener : listeners) {
+        listener.memoryUsageAboveThreshold(usedMemory, maxMemory);
+      }
+    }, null, null);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/cc52e9b2/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java 
b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 6358ff3..d0eb2a4 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1899,6 +1899,19 @@ public class HiveConf extends Configuration {
         " (split generation reads and caches file footers). HYBRID chooses 
between the above strategies" +
         " based on heuristics."),
 
+    // hive streaming ingest settings
+    HIVE_STREAMING_AUTO_FLUSH_ENABLED("hive.streaming.auto.flush.enabled", true, "Whether to enable memory \n" +
+      "monitoring and automatic flushing of open record updaters during streaming ingest. This is an expert level \n" +
+      "setting and disabling this may have severe performance impact under memory pressure."),
+    HIVE_HEAP_MEMORY_MONITOR_USAGE_THRESHOLD("hive.heap.memory.monitor.usage.threshold", 0.7f,
+      "Hive streaming does automatic memory management across all open record writers. This threshold will let the \n" +
+        "memory monitor take an action (flush open files) when heap memory usage exceeded this threshold."),
+    // the description below must name the key declared above (hive.heap.memory.monitor.usage.threshold)
+    HIVE_STREAMING_AUTO_FLUSH_CHECK_INTERVAL_SIZE("hive.streaming.auto.flush.check.interval.size", "100Mb",
+      new SizeValidator(),
+      "Hive streaming ingest has auto flush mechanism to flush all open record updaters under memory pressure.\n" +
+        "When memory usage exceed hive.heap.memory.monitor.usage.threshold, the auto-flush mechanism will \n" +
+        "wait until this size (default 100Mb) of records are ingested before triggering flush."),
+
     
HIVE_ORC_MS_FOOTER_CACHE_ENABLED("hive.orc.splits.ms.footer.cache.enabled", 
false,
         "Whether to enable using file metadata cache in metastore for ORC file 
footers."),
     
HIVE_ORC_MS_FOOTER_CACHE_PPD("hive.orc.splits.ms.footer.cache.ppd.enabled", 
true,

http://git-wip-us.apache.org/repos/asf/hive/blob/cc52e9b2/ql/src/java/org/apache/hadoop/hive/ql/io/RecordUpdater.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/RecordUpdater.java 
b/ql/src/java/org/apache/hadoop/hive/ql/io/RecordUpdater.java
index 0aed172..737e677 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/RecordUpdater.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/RecordUpdater.java
@@ -18,11 +18,10 @@
 
 package org.apache.hadoop.hive.ql.io;
 
-import org.apache.hadoop.hive.serde2.SerDeStats;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-
 import java.io.IOException;
 
+import org.apache.hadoop.hive.serde2.SerDeStats;
+
 /**
  * API for supporting updating records.
  */
@@ -73,4 +72,11 @@ public interface RecordUpdater {
    * @return SerDeStats
    */
   SerDeStats getStats();
+
+  /**
+   * Returns the number of rows in memory before flush().
+   *
+   * @return - buffered row count
+   */
+  long getBufferedRowCount();
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/cc52e9b2/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java 
b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
index e69d1a0..835f0e8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
@@ -227,6 +227,11 @@ public class OrcOutputFormat extends 
FileOutputFormat<NullWritable, OrcSerdeRow>
       return null;
     }
 
+    /**
+     * Always reports zero buffered rows for this record updater.
+     *
+     * @return - 0
+     */
+    @Override
+    public long getBufferedRowCount() {
+      return 0;
+    }
+
     private void stringifyObject(StringBuilder buffer,
                                  Object obj,
                                  ObjectInspector inspector

http://git-wip-us.apache.org/repos/asf/hive/blob/cc52e9b2/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java 
b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
index 0f03dbc..e187ce1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
@@ -106,6 +106,8 @@ public class OrcRecordUpdater implements RecordUpdater {
   // This records how many rows have been inserted or deleted.  It is separate 
from insertedRows
   // because that is monotonically increasing to give new unique row ids.
   private long rowCountDelta = 0;
+  // used only for insert events, this is the number of rows held in memory 
before flush() is invoked
+  private long bufferedRows = 0;
   private final KeyIndexBuilder indexBuilder = new KeyIndexBuilder("insert");
   private KeyIndexBuilder deleteEventIndexBuilder;
   private StructField recIdField = null; // field to look for the record 
identifier in
@@ -438,6 +440,7 @@ public class OrcRecordUpdater implements RecordUpdater {
       addSimpleEvent(INSERT_OPERATION, currentWriteId, insertedRows++, row);
     }
     rowCountDelta++;
+    bufferedRows++;
   }
 
   @Override
@@ -481,6 +484,7 @@ public class OrcRecordUpdater implements RecordUpdater {
       flushLengths.writeLong(len);
       OrcInputFormat.SHIMS.hflush(flushLengths);
     }
+    bufferedRows = 0;
     //multiple transactions only happen for streaming ingest which only allows 
inserts
     assert deleteEventWriter == null : "unexpected delete writer for " + path;
   }
@@ -539,6 +543,11 @@ public class OrcRecordUpdater implements RecordUpdater {
     return stats;
   }
 
+  /**
+   * Returns the current value of the bufferedRows counter.
+   *
+   * @return - buffered row count
+   */
+  @Override
+  public long getBufferedRowCount() {
+    return bufferedRows;
+  }
+
   static RecordIdentifier[] parseKeyIndex(Reader reader) {
     String[] stripes;
     try {

http://git-wip-us.apache.org/repos/asf/hive/blob/cc52e9b2/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
----------------------------------------------------------------------
diff --git 
a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java 
b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
index 7f6077c..71127c2 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
@@ -773,6 +773,11 @@ public class TestFileSinkOperator {
           stats.setRowCount(numRecordsAdded);
           return stats;
         }
+
+        // reports the current size of 'records'
+        // NOTE(review): presumably the rows collected by this test updater - confirm against the enclosing test
+        @Override
+        public long getBufferedRowCount() {
+          return records.size();
+        }
       };
     }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/cc52e9b2/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
----------------------------------------------------------------------
diff --git 
a/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java 
b/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
index 643bcc4..a979df0 100644
--- a/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
+++ b/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
@@ -20,29 +20,32 @@ package org.apache.hive.streaming;
 
 
 import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
+import java.lang.management.MemoryUsage;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.HeapMemoryMonitor;
 import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils;
-import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.llap.LlapUtil;
 import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
-import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
 import org.apache.hadoop.hive.ql.io.RecordUpdater;
+import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.serde2.AbstractSerDe;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -51,7 +54,6 @@ import 
org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.SubStructObjectInspector;
 import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -60,7 +62,7 @@ public abstract class AbstractRecordWriter implements 
RecordWriter {
 
   protected HiveConf conf;
   private StreamingConnection conn;
-  protected Table tbl;
+  protected Table table;
   List<String> inputColumns;
   List<String> inputTypes;
   private String fullyQualifiedTableName;
@@ -85,6 +87,32 @@ public abstract class AbstractRecordWriter implements 
RecordWriter {
   private AcidOutputFormat<?, ?> acidOutputFormat;
   private Long curBatchMinWriteId;
   private Long curBatchMaxWriteId;
+  private HeapMemoryMonitor heapMemoryMonitor;
+  // if low memory canary is set and if records after set canary exceeds 
threshold, trigger a flush.
+  // This is to avoid getting notified of low memory too often and flushing 
too often.
+  private AtomicBoolean lowMemoryCanary;
+  private long ingestSizeBytes = 0;
+  private boolean autoFlush;
+  private float memoryUsageThreshold;
+  private long ingestSizeThreshold;
+
+  /**
+   * Heap memory monitor listener that raises the shared low memory canary whenever it is notified.
+   */
+  private static class OrcMemoryPressureMonitor implements HeapMemoryMonitor.Listener {
+    private static final Logger LOG = LoggerFactory.getLogger(OrcMemoryPressureMonitor.class.getName());
+    private final AtomicBoolean lowMemoryCanary;
+
+    OrcMemoryPressureMonitor(final AtomicBoolean canary) {
+      this.lowMemoryCanary = canary;
+    }
+
+    /**
+     * Logs the reported usage at debug level and sets the canary.
+     *
+     * @param usedMemory - used memory in bytes
+     * @param maxMemory - max memory in bytes
+     */
+    @Override
+    public void memoryUsageAboveThreshold(final long usedMemory, final long maxMemory) {
+      if (LOG.isDebugEnabled()) {
+        final String used = LlapUtil.humanReadableByteCount(usedMemory);
+        final String max = LlapUtil.humanReadableByteCount(maxMemory);
+        LOG.debug("Orc memory pressure notified! usedMemory: {} maxMemory: {}.", used, max);
+      }
+      lowMemoryCanary.set(true);
+    }
+  }
 
   @Override
   public void init(StreamingConnection conn, long minWriteId, long maxWriteId) 
throws StreamingException {
@@ -97,21 +125,22 @@ public abstract class AbstractRecordWriter implements 
RecordWriter {
       this.curBatchMaxWriteId = maxWriteId;
       this.conf = conn.getHiveConf();
       this.defaultPartitionName = 
conf.getVar(HiveConf.ConfVars.DEFAULTPARTITIONNAME);
-      final IMetaStoreClient msClient = 
HiveMetaStoreUtils.getHiveMetastoreClient(this.conf);
-      this.tbl = msClient.getTable(conn.getDatabase(), conn.getTable());
-      this.inputColumns = 
tbl.getSd().getCols().stream().map(FieldSchema::getName).collect(Collectors.toList());
-      this.inputTypes = 
tbl.getSd().getCols().stream().map(FieldSchema::getType).collect(Collectors.toList());
+      this.table = conn.getTable();
+      this.inputColumns = 
table.getSd().getCols().stream().map(FieldSchema::getName).collect(Collectors.toList());
+      this.inputTypes = 
table.getSd().getCols().stream().map(FieldSchema::getType).collect(Collectors.toList());
       if (conn.isPartitionedTable() && conn.isDynamicPartitioning()) {
-        this.partitionColumns = 
tbl.getPartitionKeys().stream().map(FieldSchema::getName).collect(Collectors.toList());
+        this.partitionColumns = 
table.getPartitionKeys().stream().map(FieldSchema::getName)
+          .collect(Collectors.toList());
         this.inputColumns.addAll(partitionColumns);
-        
this.inputTypes.addAll(tbl.getPartitionKeys().stream().map(FieldSchema::getType).collect(Collectors.toList()));
+        this.inputTypes
+          
.addAll(table.getPartitionKeys().stream().map(FieldSchema::getType).collect(Collectors.toList()));
       }
-      this.fullyQualifiedTableName = 
Warehouse.getQualifiedName(conn.getDatabase(), conn.getTable());
-      String outFormatName = this.tbl.getSd().getOutputFormat();
-      this.acidOutputFormat = (AcidOutputFormat<?, ?>) 
ReflectionUtils.newInstance(JavaUtils.loadClass(outFormatName), conf);
-    } catch (MetaException | NoSuchObjectException e) {
-      throw new ConnectionError(conn, e);
-    } catch (TException | ClassNotFoundException | IOException e) {
+      this.fullyQualifiedTableName = 
Warehouse.getQualifiedName(table.getDbName(), table.getTableName());
+      String outFormatName = this.table.getSd().getOutputFormat();
+      this.acidOutputFormat = (AcidOutputFormat<?, ?>) ReflectionUtils
+        .newInstance(JavaUtils.loadClass(outFormatName), conf);
+      setupMemoryMonitoring();
+    } catch (ClassNotFoundException e) {
       throw new StreamingException(e.getMessage(), e);
     }
 
@@ -120,7 +149,7 @@ public abstract class AbstractRecordWriter implements 
RecordWriter {
       this.inputRowObjectInspector = (StructObjectInspector) 
serDe.getObjectInspector();
       if (conn.isPartitionedTable() && conn.isDynamicPartitioning()) {
         preparePartitioningFields();
-        int dpStartCol = 
inputRowObjectInspector.getAllStructFieldRefs().size() - 
tbl.getPartitionKeys().size();
+        int dpStartCol = 
inputRowObjectInspector.getAllStructFieldRefs().size() - 
table.getPartitionKeys().size();
         this.outputRowObjectInspector = new 
SubStructObjectInspector(inputRowObjectInspector, 0, dpStartCol);
       } else {
         this.outputRowObjectInspector = inputRowObjectInspector;
@@ -131,14 +160,36 @@ public abstract class AbstractRecordWriter implements 
RecordWriter {
     }
   }
 
+  /**
+   * Reads the auto flush settings from the configuration, creates the heap memory monitor and, when tenured
+   * pool usage is available, registers a listener that sets the low memory canary.
+   */
+  private void setupMemoryMonitoring() {
+    this.autoFlush = conf.getBoolVar(HiveConf.ConfVars.HIVE_STREAMING_AUTO_FLUSH_ENABLED);
+    this.memoryUsageThreshold = conf.getFloatVar(HiveConf.ConfVars.HIVE_HEAP_MEMORY_MONITOR_USAGE_THRESHOLD);
+    this.ingestSizeThreshold = conf.getSizeVar(HiveConf.ConfVars.HIVE_STREAMING_AUTO_FLUSH_CHECK_INTERVAL_SIZE);
+    // log the configured threshold rather than the running ingest byte counter
+    LOG.info("Memory monitorings settings - autoFlush: {} memoryUsageThreshold: {} ingestSizeThreshold: {}",
+      autoFlush, memoryUsageThreshold, ingestSizeThreshold);
+    this.heapMemoryMonitor = new HeapMemoryMonitor(memoryUsageThreshold);
+    MemoryUsage tenuredMemUsage = heapMemoryMonitor.getTenuredGenMemoryUsage();
+    // null usage means the monitor found no tenured pool; the canary stays null in that case
+    if (tenuredMemUsage != null) {
+      lowMemoryCanary = new AtomicBoolean(false);
+      heapMemoryMonitor.registerListener(new OrcMemoryPressureMonitor(lowMemoryCanary));
+      heapMemoryMonitor.start();
+      // alert if we already running low on memory (starting with low memory will lead to frequent auto flush)
+      float currentUsage = (float) tenuredMemUsage.getUsed() / (float) tenuredMemUsage.getMax();
+      if (currentUsage > memoryUsageThreshold) {
+        LOG.warn("LOW MEMORY ALERT! Tenured gen memory is already low. Increase memory to improve performance." +
+            " Used: {} Max: {}", LlapUtil.humanReadableByteCount(tenuredMemUsage.getUsed()),
+          LlapUtil.humanReadableByteCount(tenuredMemUsage.getMax()));
+      }
+    }
+  }
+
   private void prepareBucketingFields() {
-    this.isBucketed = tbl.getSd().getNumBuckets() > 0;
+    this.isBucketed = table.getSd().getNumBuckets() > 0;
     // For unbucketed tables we have exactly 1 RecordUpdater (until 
HIVE-19208) for each AbstractRecordWriter which
     // ends up writing to a file bucket_000000.
     // See also {@link #getBucket(Object)}
-    this.totalBuckets = isBucketed ? tbl.getSd().getNumBuckets() : 1;
+    this.totalBuckets = isBucketed ? table.getSd().getNumBuckets() : 1;
     if (isBucketed) {
-      this.bucketIds = getBucketColIDs(tbl.getSd().getBucketCols(), 
tbl.getSd().getCols());
+      this.bucketIds = getBucketColIDs(table.getSd().getBucketCols(), 
table.getSd().getCols());
       this.bucketFieldData = new Object[bucketIds.size()];
       this.bucketObjInspectors = getObjectInspectorsForBucketedCols(bucketIds, 
inputRowObjectInspector);
       this.bucketStructFields = new StructField[bucketIds.size()];
@@ -150,7 +201,7 @@ public abstract class AbstractRecordWriter implements 
RecordWriter {
   }
 
   private void preparePartitioningFields() {
-    final int numPartitions = tbl.getPartitionKeys().size();
+    final int numPartitions = table.getPartitionKeys().size();
     this.partitionFieldData = new Object[numPartitions];
     this.partitionObjInspectors = new ObjectInspector[numPartitions];
     int startIdx = inputRowObjectInspector.getAllStructFieldRefs().size() - 
numPartitions;
@@ -186,6 +237,12 @@ public abstract class AbstractRecordWriter implements 
RecordWriter {
     return result;
   }
 
+  /**
+   * Create SerDe for the record writer.
+   *
+   * @return - serde
+   * @throws SerializationError - if serde cannot be created.
+   */
   public abstract AbstractSerDe createSerde() throws SerializationError;
 
   /**
@@ -205,7 +262,7 @@ public abstract class AbstractRecordWriter implements 
RecordWriter {
     }
     Object[] bucketFields = getBucketFields(row);
     int bucketingVersion = Utilities.getBucketingVersion(
-      
tbl.getParameters().get(hive_metastoreConstants.TABLE_BUCKETING_VERSION));
+      
table.getParameters().get(hive_metastoreConstants.TABLE_BUCKETING_VERSION));
 
     return bucketingVersion == 2 ?
       ObjectInspectorUtils.getBucketNumber(bucketFields, bucketObjInspectors, 
totalBuckets) :
@@ -239,6 +296,7 @@ public abstract class AbstractRecordWriter implements 
RecordWriter {
   @Override
   public void flush() throws StreamingIOFailure {
     try {
+      logStats("Stats before flush:");
       for (Map.Entry<String, List<RecordUpdater>> entry : updaters.entrySet()) 
{
         LOG.info("Flushing record updater for partitions: {}", entry.getKey());
         for (RecordUpdater updater : entry.getValue()) {
@@ -247,6 +305,8 @@ public abstract class AbstractRecordWriter implements 
RecordWriter {
           }
         }
       }
+      ingestSizeBytes = 0;
+      logStats("Stats after flush:");
     } catch (IOException e) {
       throw new StreamingIOFailure("Unable to flush recordUpdater", e);
     }
@@ -256,6 +316,7 @@ public abstract class AbstractRecordWriter implements 
RecordWriter {
   public void close() throws StreamingIOFailure {
     boolean haveError = false;
     String partition = null;
+    logStats("Stats before close:");
     for (Map.Entry<String, List<RecordUpdater>> entry : updaters.entrySet()) {
       partition = entry.getKey();
       LOG.info("Closing updater for partitions: {}", partition);
@@ -273,6 +334,7 @@ public abstract class AbstractRecordWriter implements 
RecordWriter {
       entry.getValue().clear();
     }
     updaters.clear();
+    logStats("Stats after close:");
     if (haveError) {
       throw new StreamingIOFailure("Encountered errors while closing (see 
logs) " + getWatermark(partition));
     }
@@ -306,6 +368,8 @@ public abstract class AbstractRecordWriter implements 
RecordWriter {
 
   @Override
   public void write(final long writeId, final byte[] record) throws 
StreamingException {
+    checkAutoFlush();
+    ingestSizeBytes += record.length;
     try {
       Object encodedRow = encode(record);
       int bucket = getBucket(encodedRow);
@@ -317,6 +381,33 @@ public abstract class AbstractRecordWriter implements 
RecordWriter {
     }
   }
 
+  /**
+   * Flushes all open record updaters when auto flush is enabled, more than ingestSizeThreshold bytes were
+   * ingested since the last flush and memory is low. Low memory is signalled by the low memory canary when
+   * one exists, otherwise by comparing current heap usage against memoryUsageThreshold.
+   *
+   * @throws StreamingIOFailure - if the flush fails
+   */
+  private void checkAutoFlush() throws StreamingIOFailure {
+    // both paths below require auto flush to be on and the ingest size threshold to be exceeded
+    if (!autoFlush || ingestSizeBytes <= ingestSizeThreshold) {
+      return;
+    }
+
+    if (lowMemoryCanary == null) {
+      // no canary available, fall back to checking the overall heap usage
+      final MemoryUsage heapUsage = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
+      final float memUsedFraction = ((float) heapUsage.getUsed() / (float) heapUsage.getMax());
+      if (memUsedFraction > memoryUsageThreshold) {
+        LOG.info("Memory usage threshold '{}' and ingestion size (buffered) threshold '{}' exceeded. " +
+          "Flushing all record updaters..", memUsedFraction, LlapUtil.humanReadableByteCount(ingestSizeThreshold));
+        flush();
+        conn.getConnectionStats().incrementAutoFlushCount();
+      }
+      return;
+    }
+
+    if (lowMemoryCanary.get()) {
+      LOG.info("Low memory canary is set and ingestion size (buffered) threshold '{}' exceeded. " +
+        "Flushing all record updaters..", LlapUtil.humanReadableByteCount(ingestSizeThreshold));
+      flush();
+      conn.getConnectionStats().incrementAutoFlushCount();
+      // reset only after a successful flush so that the next notification can set it again
+      lowMemoryCanary.set(false);
+    }
+  }
+
   @Override
   public Set<String> getPartitions() {
     return addedPartitions;
@@ -329,7 +420,7 @@ public abstract class AbstractRecordWriter implements 
RecordWriter {
     // may define certain table parameters that may be required while writing. 
The table parameter
     // 'transactional_properties' is one such example.
     Properties tblProperties = new Properties();
-    tblProperties.putAll(tbl.getParameters());
+    tblProperties.putAll(table.getParameters());
     return acidOutputFormat.getRecordUpdater(partitionPath,
       new AcidOutputFormat.Options(conf)
         .inspector(outputRowObjectInspector)
@@ -354,7 +445,7 @@ public abstract class AbstractRecordWriter implements 
RecordWriter {
       } else {
         // un-partitioned table
         if (partitionValues == null) {
-          destLocation = new Path(tbl.getSd().getLocation());
+          destLocation = new Path(table.getSd().getLocation());
         } else {
           PartitionInfo partitionInfo = 
conn.createPartitionIfNotExists(partitionValues);
           // collect the newly added partitions. 
connection.commitTransaction() will report the dynamically added
@@ -395,4 +486,28 @@ public abstract class AbstractRecordWriter implements 
RecordWriter {
     }
     return result;
   }
+
+  /**
+   * Logs a one line summary of the writer state: open record updaters, partitions, buffered and total
+   * records, buffered and total ingest size and tenured memory usage.
+   *
+   * @param prefix - text placed at the beginning of the log line
+   */
+  private void logStats(final String prefix) {
+    int updaterCount = 0;
+    long bufferedRowCount = 0;
+    for (List<RecordUpdater> partitionUpdaters : updaters.values()) {
+      updaterCount += partitionUpdaters.size();
+      for (RecordUpdater recordUpdater : partitionUpdaters) {
+        // entries can be null, count only the existing updaters' rows
+        if (recordUpdater != null) {
+          bufferedRowCount += recordUpdater.getBufferedRowCount();
+        }
+      }
+    }
+
+    final MemoryUsage tenuredUsage = heapMemoryMonitor.getTenuredGenMemoryUsage();
+    final String tenuredUsageText;
+    if (tenuredUsage == null) {
+      tenuredUsageText = "NA";
+    } else {
+      tenuredUsageText = "used/max => " + LlapUtil.humanReadableByteCount(tenuredUsage.getUsed()) + "/" +
+        LlapUtil.humanReadableByteCount(tenuredUsage.getMax());
+    }
+
+    LOG.info("{} [record-updaters: {}, partitions: {}, buffered-records: {} total-records: {} " +
+        "buffered-ingest-size: {}, total-ingest-size: {} tenured-memory-usage: {}]",
+      prefix,
+      updaterCount,
+      partitionPaths.size(),
+      bufferedRowCount,
+      conn.getConnectionStats().getRecordsWritten(),
+      LlapUtil.humanReadableByteCount(ingestSizeBytes),
+      LlapUtil.humanReadableByteCount(conn.getConnectionStats().getRecordsSize()),
+      tenuredUsageText);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/cc52e9b2/streaming/src/java/org/apache/hive/streaming/ConnectionInfo.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/ConnectionInfo.java 
b/streaming/src/java/org/apache/hive/streaming/ConnectionInfo.java
index ca8babf..511ffdd 100644
--- a/streaming/src/java/org/apache/hive/streaming/ConnectionInfo.java
+++ b/streaming/src/java/org/apache/hive/streaming/ConnectionInfo.java
@@ -20,6 +20,8 @@ package org.apache.hive.streaming;
 
 import java.util.List;
 
+import org.apache.hadoop.hive.ql.metadata.Table;
+
 /**
  * Helper interface to get connection related information.
  */
@@ -33,18 +35,11 @@ public interface ConnectionInfo {
   String getMetastoreUri();
 
   /**
-   * Get the database used by streaming connection.
-   *
-   * @return - database
-   */
-  String getDatabase();
-
-  /**
    * Get the table used by streaming connection.
    *
    * @return - table
    */
-  String getTable();
+  Table getTable();
 
   /**
    * Get any static partitions specified during streaming connection creation.

http://git-wip-us.apache.org/repos/asf/hive/blob/cc52e9b2/streaming/src/java/org/apache/hive/streaming/ConnectionStats.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/ConnectionStats.java 
b/streaming/src/java/org/apache/hive/streaming/ConnectionStats.java
new file mode 100644
index 0000000..355456e
--- /dev/null
+++ b/streaming/src/java/org/apache/hive/streaming/ConnectionStats.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+import java.util.concurrent.atomic.LongAdder;
+
+/**
+ * Stores counters describing a streaming connection; each counter is backed by a LongAdder.
+ */
+public class ConnectionStats {
+  private LongAdder recordsWritten = new LongAdder(); // HiveStreamingConnection.write bumps this after the batch write returns
+  private LongAdder recordsSize = new LongAdder(); // sum of deltas; HiveStreamingConnection.write adds record.length
+  private LongAdder committedTransactions = new LongAdder(); // bumped after currentTransactionBatch.commit() returns
+  private LongAdder abortedTransactions = new LongAdder(); // bumped after currentTransactionBatch.abort() returns
+  private LongAdder autoFlushCount = new LongAdder(); // bumped via incrementAutoFlushCount()
+  private LongAdder metastoreCalls = new LongAdder(); // bumped by HiveStreamingConnection's getMSC()/getHeatbeatMSC() accessors
+
+  public void incrementRecordsWritten() {
+    recordsWritten.increment();
+  }
+
+  public void incrementCommittedTransactions() {
+    committedTransactions.increment();
+  }
+
+  public void incrementAbortedTransactions() {
+    abortedTransactions.increment();
+  }
+
+  public void incrementAutoFlushCount() {
+    autoFlushCount.increment();
+  }
+
+  public void incrementMetastoreCalls() {
+    metastoreCalls.increment();
+  }
+
+  public void incrementRecordsSize(long delta) { // delta is added as given; there is no sign check
+    recordsSize.add(delta);
+  }
+
+  public long getRecordsWritten() {
+    return recordsWritten.longValue(); // LongAdder.longValue() is equivalent to sum()
+  }
+
+  public long getRecordsSize() {
+    return recordsSize.longValue();
+  }
+
+  public long getCommittedTransactions() {
+    return committedTransactions.longValue();
+  }
+
+  public long getAbortedTransactions() {
+    return abortedTransactions.longValue();
+  }
+
+  public long getAutoFlushCount() {
+    return autoFlushCount.longValue();
+  }
+
+  public long getMetastoreCalls() {
+    return metastoreCalls.longValue();
+  }
+
+  @Override
+  public String toString() { // string concatenation renders each LongAdder as its current sum
+    return "{records-written: " + recordsWritten + ", records-size: "+ 
recordsSize + ", committed-transactions: " +
+      committedTransactions + ", aborted-transactions: " + abortedTransactions 
+ ", auto-flushes: " + autoFlushCount +
+      ", metastore-calls: " + metastoreCalls + " }";
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/cc52e9b2/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
----------------------------------------------------------------------
diff --git 
a/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java 
b/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
index 205ed6c..c4da7af 100644
--- a/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
+++ b/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
@@ -158,6 +158,7 @@ public class HiveStreamingConnection implements 
StreamingConnection {
   private final boolean secureMode;
   private Table tableObject = null;
   private String metastoreUri;
+  private ConnectionStats connectionStats;
 
   private HiveStreamingConnection(Builder builder) throws StreamingException {
     this.database = builder.database.toLowerCase();
@@ -181,6 +182,7 @@ public class HiveStreamingConnection implements 
StreamingConnection {
     }
     this.transactionBatchSize = builder.transactionBatchSize;
     this.recordWriter = builder.recordWriter;
+    this.connectionStats = new ConnectionStats();
     if (agentInfo == null) {
       try {
         agentInfo = username + ":" + InetAddress.getLocalHost().getHostName() 
+ ":" + Thread.currentThread().getName();
@@ -194,10 +196,10 @@ public class HiveStreamingConnection implements 
StreamingConnection {
     }
     overrideConfSettings(conf);
     this.metastoreUri = 
conf.get(MetastoreConf.ConfVars.THRIFT_URIS.getHiveName());
-    this.msClient = getMetaStoreClient(conf, metastoreUri, secureMode);
+    this.msClient = getMetaStoreClient(conf, metastoreUri, secureMode, 
"streaming-connection");
     // We use a separate metastore client for heartbeat calls to ensure 
heartbeat RPC calls are
     // isolated from the other transaction related RPC calls.
-    this.heartbeatMSClient = getMetaStoreClient(conf, metastoreUri, 
secureMode);
+    this.heartbeatMSClient = getMetaStoreClient(conf, metastoreUri, 
secureMode, "streaming-connection-heartbeat");
     validateTable();
     LOG.info("STREAMING CONNECTION INFO: {}", toConnectionInfoString());
   }
@@ -369,7 +371,7 @@ public class HiveStreamingConnection implements 
StreamingConnection {
       partLocation = new Path(tableObject.getDataLocation(), 
Warehouse.makePartPath(partSpec)).toString();
       addPartitionDesc.addPartition(partSpec, partLocation);
       Partition partition = Hive.convertAddSpecToMetaPartition(tableObject, 
addPartitionDesc.getPartition(0), conf);
-      msClient.add_partition(partition);
+      getMSC().add_partition(partition);
     } catch (AlreadyExistsException e) {
       exists = true;
     } catch (HiveException | TException e) {
@@ -379,9 +381,19 @@ public class HiveStreamingConnection implements 
StreamingConnection {
     return new PartitionInfo(partName, partLocation, exists);
   }
 
+  IMetaStoreClient getMSC() { // returns msClient and records the access in connectionStats
+    connectionStats.incrementMetastoreCalls(); // counted once per accessor call, whatever the caller invokes next (close() included)
+    return msClient;
+  }
+
+  IMetaStoreClient getHeatbeatMSC() { // NOTE(review): "Heatbeat" looks like a typo for "Heartbeat"; call sites use this spelling
+    connectionStats.incrementMetastoreCalls(); // shares the metastore-calls counter with getMSC()
+    return heartbeatMSClient;
+  }
+
   private void validateTable() throws InvalidTable, ConnectionError {
     try {
-      tableObject = new Table(msClient.getTable(database, table));
+      tableObject = new Table(getMSC().getTable(database, table));
     } catch (Exception e) {
       LOG.warn("Unable to validate the table for connection: " + 
toConnectionInfoString(), e);
       throw new InvalidTable(database, table, e);
@@ -408,15 +420,15 @@ public class HiveStreamingConnection implements 
StreamingConnection {
   }
 
   private static class HeartbeatRunnable implements Runnable {
-    private final IMetaStoreClient heartbeatMSClient;
+    private final HiveStreamingConnection conn;
     private final AtomicLong minTxnId;
     private final long maxTxnId;
     private final ReentrantLock transactionLock;
     private final AtomicBoolean isTxnClosed;
 
-    HeartbeatRunnable(final IMetaStoreClient heartbeatMSClient, final 
AtomicLong minTxnId, final long maxTxnId,
+    HeartbeatRunnable(final HiveStreamingConnection conn, final AtomicLong 
minTxnId, final long maxTxnId,
       final ReentrantLock transactionLock, final AtomicBoolean isTxnClosed) {
-      this.heartbeatMSClient = heartbeatMSClient;
+      this.conn = conn;
       this.minTxnId = minTxnId;
       this.maxTxnId = maxTxnId;
       this.transactionLock = transactionLock;
@@ -428,7 +440,7 @@ public class HiveStreamingConnection implements 
StreamingConnection {
       transactionLock.lock();
       try {
         if (minTxnId.get() > 0) {
-          HeartbeatTxnRangeResponse resp = 
heartbeatMSClient.heartbeatTxnRange(minTxnId.get(), maxTxnId);
+          HeartbeatTxnRangeResponse resp = 
conn.getHeatbeatMSC().heartbeatTxnRange(minTxnId.get(), maxTxnId);
           if (!resp.getAborted().isEmpty() || !resp.getNosuch().isEmpty()) {
             LOG.error("Heartbeat failure: {}", resp.toString());
             isTxnClosed.set(true);
@@ -488,6 +500,8 @@ public class HiveStreamingConnection implements 
StreamingConnection {
   public void write(final byte[] record) throws StreamingException {
     checkState();
     currentTransactionBatch.write(record);
+    connectionStats.incrementRecordsWritten();
+    connectionStats.incrementRecordsSize(record.length);
   }
 
   @Override
@@ -500,12 +514,14 @@ public class HiveStreamingConnection implements 
StreamingConnection {
   public void commitTransaction() throws StreamingException {
     checkState();
     currentTransactionBatch.commit();
+    connectionStats.incrementCommittedTransactions();
   }
 
   @Override
   public void abortTransaction() throws StreamingException {
     checkState();
     currentTransactionBatch.abort();
+    connectionStats.incrementAbortedTransactions();
   }
 
   @Override
@@ -521,12 +537,19 @@ public class HiveStreamingConnection implements 
StreamingConnection {
     } catch (StreamingException e) {
       LOG.error("Unable to close current transaction batch: " + 
currentTransactionBatch, e);
     } finally {
-      msClient.close();
-      heartbeatMSClient.close();
+      getMSC().close();
+      getHeatbeatMSC().close();
     }
+    LOG.info("Closed streaming connection. Agent: {} Stats: {}", 
getAgentInfo(), getConnectionStats());
+  }
+
+  @Override
+  public ConnectionStats getConnectionStats() {
+    return connectionStats; // hands back the live stats object held by this connection, not a copy
+  }
 
-  private static IMetaStoreClient getMetaStoreClient(HiveConf conf, String 
metastoreUri, boolean secureMode)
+  private static IMetaStoreClient getMetaStoreClient(HiveConf conf, String 
metastoreUri, boolean secureMode,
+    String owner)
     throws ConnectionError {
     if (metastoreUri != null) {
       conf.set(MetastoreConf.ConfVars.THRIFT_URIS.getHiveName(), metastoreUri);
@@ -535,6 +558,7 @@ public class HiveStreamingConnection implements 
StreamingConnection {
       conf.setBoolean(MetastoreConf.ConfVars.USE_THRIFT_SASL.getHiveName(), 
true);
     }
     try {
+      LOG.info("Creating metastore client for {}", owner);
       return HiveMetaStoreUtils.getHiveMetastoreClient(conf);
     } catch (MetaException | IOException e) {
       throw new ConnectionError("Error connecting to Hive Metastore URI: "
@@ -560,8 +584,6 @@ public class HiveStreamingConnection implements 
StreamingConnection {
   private static class TransactionBatch {
     private String username;
     private HiveStreamingConnection conn;
-    private IMetaStoreClient msClient;
-    private IMetaStoreClient heartbeatMSClient;
     private ScheduledExecutorService scheduledExecutorService;
     private RecordWriter recordWriter;
     private String partNameForLock = null;
@@ -614,16 +636,14 @@ public class HiveStreamingConnection implements 
StreamingConnection {
         }
         this.conn = conn;
         this.username = conn.username;
-        this.msClient = conn.msClient;
-        this.heartbeatMSClient = conn.heartbeatMSClient;
         this.recordWriter = conn.recordWriter;
         this.agentInfo = conn.agentInfo;
         this.numTxns = conn.transactionBatchSize;
 
         setupHeartBeatThread();
 
-        List<Long> txnIds = openTxnImpl(msClient, username, numTxns);
-        txnToWriteIds = allocateWriteIdsImpl(msClient, txnIds);
+        List<Long> txnIds = openTxnImpl(username, numTxns);
+        txnToWriteIds = allocateWriteIdsImpl(txnIds);
         assert (txnToWriteIds.size() == numTxns);
 
         txnStatus = new TxnState[numTxns];
@@ -664,19 +684,17 @@ public class HiveStreamingConnection implements 
StreamingConnection {
       initialDelay = (long) (heartBeatInterval * 0.75 * Math.random());
       LOG.info("Starting heartbeat thread with interval: {} ms initialDelay: 
{} ms for agentInfo: {}",
         heartBeatInterval, initialDelay, conn.agentInfo);
-      Runnable runnable = new HeartbeatRunnable(heartbeatMSClient, minTxnId, 
maxTxnId, transactionLock, isTxnClosed);
+      Runnable runnable = new HeartbeatRunnable(conn, minTxnId, maxTxnId, 
transactionLock, isTxnClosed);
       this.scheduledExecutorService.scheduleWithFixedDelay(runnable, 
initialDelay, heartBeatInterval, TimeUnit
         .MILLISECONDS);
     }
 
-    private List<Long> openTxnImpl(final IMetaStoreClient msClient, final 
String user, final int numTxns)
-      throws TException {
-      return msClient.openTxns(user, numTxns).getTxn_ids();
+    private List<Long> openTxnImpl(final String user, final int numTxns) 
throws TException {
+      return conn.getMSC().openTxns(user, numTxns).getTxn_ids();
     }
 
-    private List<TxnToWriteId> allocateWriteIdsImpl(final IMetaStoreClient 
msClient,
-      final List<Long> txnIds) throws TException {
-      return msClient.allocateTableWriteIdsBatch(txnIds, conn.database, 
conn.table);
+    private List<TxnToWriteId> allocateWriteIdsImpl(final List<Long> txnIds) 
throws TException {
+      return conn.getMSC().allocateTableWriteIdsBatch(txnIds, conn.database, 
conn.table);
     }
 
     @Override
@@ -714,7 +732,7 @@ public class HiveStreamingConnection implements 
StreamingConnection {
       lastTxnUsed = getCurrentTxnId();
       lockRequest = createLockRequest(conn, partNameForLock, username, 
getCurrentTxnId(), agentInfo);
       try {
-        LockResponse res = msClient.lock(lockRequest);
+        LockResponse res = conn.getMSC().lock(lockRequest);
         if (res.getState() != LockState.ACQUIRED) {
           throw new TransactionError("Unable to acquire lock on " + conn);
         }
@@ -813,12 +831,12 @@ public class HiveStreamingConnection implements 
StreamingConnection {
         TxnToWriteId txnToWriteId = txnToWriteIds.get(currentTxnIndex);
         if (conn.isDynamicPartitioning()) {
           List<String> partNames = new 
ArrayList<>(recordWriter.getPartitions());
-          msClient.addDynamicPartitions(txnToWriteId.getTxnId(), 
txnToWriteId.getWriteId(), conn.database, conn.table,
+          conn.getMSC().addDynamicPartitions(txnToWriteId.getTxnId(), 
txnToWriteId.getWriteId(), conn.database, conn.table,
             partNames, DataOperationType.INSERT);
         }
         transactionLock.lock();
         try {
-          msClient.commitTxn(txnToWriteId.getTxnId());
+          conn.getMSC().commitTxn(txnToWriteId.getTxnId());
           // increment the min txn id so that heartbeat thread will heartbeat 
only from the next open transaction.
           // the current transaction is going to committed or fail, so don't 
need heartbeat for current transaction.
           if (currentTxnIndex + 1 < txnToWriteIds.size()) {
@@ -873,7 +891,7 @@ public class HiveStreamingConnection implements 
StreamingConnection {
             (state == TxnState.ABORTED || state == TxnState.COMMITTED ? 1 : 
0), 0);
           for (currentTxnIndex = minOpenTxnIndex;
             currentTxnIndex < txnToWriteIds.size(); currentTxnIndex++) {
-            
msClient.rollbackTxn(txnToWriteIds.get(currentTxnIndex).getTxnId());
+            
conn.getMSC().rollbackTxn(txnToWriteIds.get(currentTxnIndex).getTxnId());
             txnStatus[currentTxnIndex] = TxnState.ABORTED;
           }
           currentTxnIndex--;//since the loop left it == txnToWriteIds.size()
@@ -888,7 +906,7 @@ public class HiveStreamingConnection implements 
StreamingConnection {
           }
           long currTxnId = getCurrentTxnId();
           if (currTxnId > 0) {
-            msClient.rollbackTxn(currTxnId);
+            conn.getMSC().rollbackTxn(currTxnId);
             txnStatus[currentTxnIndex] = TxnState.ABORTED;
           }
         }
@@ -1008,13 +1026,8 @@ public class HiveStreamingConnection implements 
StreamingConnection {
   }
 
   @Override
-  public String getDatabase() {
-    return database;
-  }
-
-  @Override
-  public String getTable() {
-    return table;
+  public Table getTable() {
+    return tableObject;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/cc52e9b2/streaming/src/java/org/apache/hive/streaming/StreamingConnection.java
----------------------------------------------------------------------
diff --git 
a/streaming/src/java/org/apache/hive/streaming/StreamingConnection.java 
b/streaming/src/java/org/apache/hive/streaming/StreamingConnection.java
index cd7f3d8..5396d76 100644
--- a/streaming/src/java/org/apache/hive/streaming/StreamingConnection.java
+++ b/streaming/src/java/org/apache/hive/streaming/StreamingConnection.java
@@ -61,4 +61,11 @@ public interface StreamingConnection extends ConnectionInfo, 
PartitionHandler {
    * Closes streaming connection.
    */
   void close();
+
+  /**
+   * Gets stats about the streaming connection.
+   *
+   * @return - connection stats
+   */
+  ConnectionStats getConnectionStats();
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/cc52e9b2/streaming/src/java/org/apache/hive/streaming/StrictDelimitedInputWriter.java
----------------------------------------------------------------------
diff --git 
a/streaming/src/java/org/apache/hive/streaming/StrictDelimitedInputWriter.java 
b/streaming/src/java/org/apache/hive/streaming/StrictDelimitedInputWriter.java
index 4a07435..56f59fd 100644
--- 
a/streaming/src/java/org/apache/hive/streaming/StrictDelimitedInputWriter.java
+++ 
b/streaming/src/java/org/apache/hive/streaming/StrictDelimitedInputWriter.java
@@ -21,7 +21,6 @@ package org.apache.hive.streaming;
 
 import java.util.Properties;
 
-import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.SerDeUtils;
@@ -93,7 +92,7 @@ public class StrictDelimitedInputWriter extends 
AbstractRecordWriter {
   @Override
   public LazySimpleSerDe createSerde() throws SerializationError {
     try {
-      Properties tableProps = MetaStoreUtils.getTableMetadata(tbl);
+      Properties tableProps = table.getMetadata();
       tableProps.setProperty(serdeConstants.LIST_COLUMNS, 
Joiner.on(",").join(inputColumns));
       tableProps.setProperty(serdeConstants.LIST_COLUMN_TYPES, 
Joiner.on(":").join(inputTypes));
       tableProps.setProperty(serdeConstants.FIELD_DELIM, 
String.valueOf(fieldDelimiter));

http://git-wip-us.apache.org/repos/asf/hive/blob/cc52e9b2/streaming/src/java/org/apache/hive/streaming/StrictJsonWriter.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/StrictJsonWriter.java 
b/streaming/src/java/org/apache/hive/streaming/StrictJsonWriter.java
index 1600e7c..2c54eef 100644
--- a/streaming/src/java/org/apache/hive/streaming/StrictJsonWriter.java
+++ b/streaming/src/java/org/apache/hive/streaming/StrictJsonWriter.java
@@ -20,7 +20,6 @@ package org.apache.hive.streaming;
 
 import java.util.Properties;
 
-import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
 import org.apache.hadoop.hive.serde2.JsonSerDe;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.SerDeUtils;
@@ -53,7 +52,7 @@ public class StrictJsonWriter extends AbstractRecordWriter {
   @Override
   public JsonSerDe createSerde() throws SerializationError {
     try {
-      Properties tableProps = MetaStoreUtils.getTableMetadata(tbl);
+      Properties tableProps = table.getMetadata();
       JsonSerDe serde = new JsonSerDe();
       SerDeUtils.initializeSerDe(serde, conf, tableProps, null);
       this.serde = serde;

http://git-wip-us.apache.org/repos/asf/hive/blob/cc52e9b2/streaming/src/java/org/apache/hive/streaming/StrictRegexWriter.java
----------------------------------------------------------------------
diff --git 
a/streaming/src/java/org/apache/hive/streaming/StrictRegexWriter.java 
b/streaming/src/java/org/apache/hive/streaming/StrictRegexWriter.java
index 563cf66..ba2609f 100644
--- a/streaming/src/java/org/apache/hive/streaming/StrictRegexWriter.java
+++ b/streaming/src/java/org/apache/hive/streaming/StrictRegexWriter.java
@@ -21,7 +21,6 @@ package org.apache.hive.streaming;
 import java.util.Properties;
 
 import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.RegexSerDe;
 import org.apache.hadoop.hive.serde2.SerDeException;
@@ -67,7 +66,7 @@ public class StrictRegexWriter extends AbstractRecordWriter {
   @Override
   public RegexSerDe createSerde() throws SerializationError {
     try {
-      Properties tableProps = MetaStoreUtils.getTableMetadata(tbl);
+      Properties tableProps = table.getMetadata();
       tableProps.setProperty(RegexSerDe.INPUT_REGEX, regex);
       tableProps.setProperty(serdeConstants.LIST_COLUMNS, 
StringUtils.join(inputColumns, ","));
       RegexSerDe serde = new RegexSerDe();

Reply via email to