Repository: hive Updated Branches: refs/heads/master dfaf90f2b -> cc52e9b22
HIVE-19206: Automatic memory management for open streaming writers (Prasanth Jayachandran reviewed by Gopal V) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/cc52e9b2 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/cc52e9b2 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/cc52e9b2 Branch: refs/heads/master Commit: cc52e9b22bd64be117d3acd9918279aa355b237b Parents: dfaf90f Author: Prasanth Jayachandran <prasan...@apache.org> Authored: Thu May 3 11:56:03 2018 -0700 Committer: Prasanth Jayachandran <prasan...@apache.org> Committed: Thu May 3 11:56:03 2018 -0700 ---------------------------------------------------------------------- .../hadoop/hive/common/HeapMemoryMonitor.java | 154 +++++++++++++++++ .../org/apache/hadoop/hive/conf/HiveConf.java | 13 ++ .../apache/hadoop/hive/ql/io/RecordUpdater.java | 12 +- .../hadoop/hive/ql/io/orc/OrcOutputFormat.java | 5 + .../hadoop/hive/ql/io/orc/OrcRecordUpdater.java | 9 + .../hive/ql/exec/TestFileSinkOperator.java | 5 + .../hive/streaming/AbstractRecordWriter.java | 169 ++++++++++++++++--- .../apache/hive/streaming/ConnectionInfo.java | 11 +- .../apache/hive/streaming/ConnectionStats.java | 88 ++++++++++ .../hive/streaming/HiveStreamingConnection.java | 85 ++++++---- .../hive/streaming/StreamingConnection.java | 7 + .../streaming/StrictDelimitedInputWriter.java | 3 +- .../apache/hive/streaming/StrictJsonWriter.java | 3 +- .../hive/streaming/StrictRegexWriter.java | 3 +- 14 files changed, 487 insertions(+), 80 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/cc52e9b2/common/src/java/org/apache/hadoop/hive/common/HeapMemoryMonitor.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/common/HeapMemoryMonitor.java b/common/src/java/org/apache/hadoop/hive/common/HeapMemoryMonitor.java new file mode 100644 index 0000000..42286be --- /dev/null +++ b/common/src/java/org/apache/hadoop/hive/common/HeapMemoryMonitor.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.common; + +import java.lang.management.ManagementFactory; +import java.lang.management.MemoryMXBean; +import java.lang.management.MemoryNotificationInfo; +import java.lang.management.MemoryPoolMXBean; +import java.lang.management.MemoryType; +import java.lang.management.MemoryUsage; +import java.util.ArrayList; +import java.util.List; + +import javax.management.NotificationEmitter; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Class that monitors memory usage and notifies the listeners when a certain threshold of memory is used + * after GC (collection usage). + */ +public class HeapMemoryMonitor { + private static final Logger LOG = LoggerFactory.getLogger(HeapMemoryMonitor.class.getName()); + // notifies when memory usage after GC exceeds 70% + private static final double DEFAULT_THRESHOLD = 0.7d; + private static final MemoryPoolMXBean tenuredGenPool = getTenuredGenPool(); + + private final double threshold; + private List<Listener> listeners = new ArrayList<>(); + + public interface Listener { + void memoryUsageAboveThreshold(long usedMemory, long maxMemory); + } + + public HeapMemoryMonitor(double threshold) { + this.threshold = threshold <= 0.0d || threshold > 1.0d ? DEFAULT_THRESHOLD : threshold; + setupTenuredGenPoolThreshold(tenuredGenPool); + } + + private void setupTenuredGenPoolThreshold(final MemoryPoolMXBean tenuredGenPool) { + if (tenuredGenPool == null) { + return; + } + for (MemoryPoolMXBean pool : ManagementFactory.getMemoryPoolMXBeans()) { + final long memoryThreshold = (long) Math.floor(pool.getUsage().getMax() * threshold); + final boolean isTenured = isTenured(pool); + if (!isTenured) { + continue; + } + // set memory threshold on memory used after GC + final boolean isCollectionUsageThresholdSupported = pool.isCollectionUsageThresholdSupported(); + if (isCollectionUsageThresholdSupported) { + LOG.info("Setting collection usage threshold to {}", memoryThreshold); + pool.setCollectionUsageThreshold(memoryThreshold); + return; + } else { + // if collection usage threshold is not supported, worst case set memory threshold on memory usage (before GC) + final boolean isUsageThresholdSupported = pool.isUsageThresholdSupported(); + if (isUsageThresholdSupported) { + LOG.info("Setting usage threshold to {}", memoryThreshold); + pool.setUsageThreshold(memoryThreshold); + return; + } + } + } + } + + private static MemoryPoolMXBean getTenuredGenPool() { + for (MemoryPoolMXBean pool : ManagementFactory.getMemoryPoolMXBeans()) { + final String vendor = System.getProperty("java.vendor"); + final boolean isTenured = isTenured(pool); + if (!isTenured) { + continue; + } + final boolean isCollectionUsageThresholdSupported = pool.isCollectionUsageThresholdSupported(); + if (isCollectionUsageThresholdSupported) { + return pool; + } else { + final boolean isUsageThresholdSupported = pool.isUsageThresholdSupported(); + if (isUsageThresholdSupported) { + return pool; + } else { + LOG.error("{} vendor does not support isCollectionUsageThresholdSupported() and isUsageThresholdSupported()" + + " for tenured memory pool '{}'.", vendor, pool.getName()); + } + } + } + return null; + } + + + private static boolean isTenured(MemoryPoolMXBean memoryPoolMXBean) { + if (memoryPoolMXBean.getType() != MemoryType.HEAP) { + return false; + } + + String name = memoryPoolMXBean.getName(); + return name.equals("CMS Old Gen") // CMS + || name.equals("PS Old Gen") // Parallel GC + || name.equals("G1 Old Gen") // G1GC + // other vendors like IBM, Azul
etc. use different names + || name.equals("Old Space") + || name.equals("Tenured Gen") + || name.equals("Java heap") + || name.equals("GenPauseless Old Gen"); + } + + public void registerListener(final Listener listener) { + listeners.add(listener); + } + + public MemoryUsage getTenuredGenMemoryUsage() { + if (tenuredGenPool == null) { + return null; + } + return tenuredGenPool.getUsage(); + } + + public void start() { + // unsupported if null + if (tenuredGenPool == null) { + return; + } + MemoryMXBean mxBean = ManagementFactory.getMemoryMXBean(); + NotificationEmitter emitter = (NotificationEmitter) mxBean; + emitter.addNotificationListener((n, hb) -> { + if (n.getType().equals( + MemoryNotificationInfo.MEMORY_COLLECTION_THRESHOLD_EXCEEDED)) { + long maxMemory = tenuredGenPool.getUsage().getMax(); + long usedMemory = tenuredGenPool.getUsage().getUsed(); + for (Listener listener : listeners) { + listener.memoryUsageAboveThreshold(usedMemory, maxMemory); + } + } + }, null, null); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/cc52e9b2/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 6358ff3..d0eb2a4 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -1899,6 +1899,19 @@ public class HiveConf extends Configuration { " (split generation reads and caches file footers). HYBRID chooses between the above strategies" + " based on heuristics."), + // hive streaming ingest settings + HIVE_STREAMING_AUTO_FLUSH_ENABLED("hive.streaming.auto.flush.enabled", true, "Whether to enable memory \n" + + "monitoring and automatic flushing of open record updaters during streaming ingest. This is an expert level \n" + + "setting and disabling this may have severe performance impact under memory pressure."), + HIVE_HEAP_MEMORY_MONITOR_USAGE_THRESHOLD("hive.heap.memory.monitor.usage.threshold", 0.7f, + "Hive streaming does automatic memory management across all open record writers. 
This threshold will let the \n" + + "memory monitor take an action (flush open files) when heap memory usage exceeds this threshold."), + HIVE_STREAMING_AUTO_FLUSH_CHECK_INTERVAL_SIZE("hive.streaming.auto.flush.check.interval.size", "100Mb", + new SizeValidator(), + "Hive streaming ingest has an auto-flush mechanism to flush all open record updaters under memory pressure.\n" + + "When memory usage exceeds hive.heap.memory.monitor.usage.threshold, the auto-flush mechanism will \n" + + "wait until this size (default 100Mb) of records is ingested before triggering a flush."), + HIVE_ORC_MS_FOOTER_CACHE_ENABLED("hive.orc.splits.ms.footer.cache.enabled", false, "Whether to enable using file metadata cache in metastore for ORC file footers."), HIVE_ORC_MS_FOOTER_CACHE_PPD("hive.orc.splits.ms.footer.cache.ppd.enabled", true, http://git-wip-us.apache.org/repos/asf/hive/blob/cc52e9b2/ql/src/java/org/apache/hadoop/hive/ql/io/RecordUpdater.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/RecordUpdater.java b/ql/src/java/org/apache/hadoop/hive/ql/io/RecordUpdater.java index 0aed172..737e677 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/RecordUpdater.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/RecordUpdater.java @@ -18,11 +18,10 @@ package org.apache.hadoop.hive.ql.io; -import org.apache.hadoop.hive.serde2.SerDeStats; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; - import java.io.IOException; +import org.apache.hadoop.hive.serde2.SerDeStats; + /** * API for supporting updating records. */ @@ -73,4 +72,11 @@ public interface RecordUpdater { * @return SerDeStats */ SerDeStats getStats(); + + /** + * Returns the number of rows in memory before flush(). + * + * @return - buffered row count + */ + long getBufferedRowCount(); } http://git-wip-us.apache.org/repos/asf/hive/blob/cc52e9b2/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java index e69d1a0..835f0e8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java @@ -227,6 +227,11 @@ public class OrcOutputFormat extends FileOutputFormat<NullWritable, OrcSerdeRow> return null; } + @Override + public long getBufferedRowCount() { + return 0; + } + private void stringifyObject(StringBuilder buffer, Object obj, ObjectInspector inspector http://git-wip-us.apache.org/repos/asf/hive/blob/cc52e9b2/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java index 0f03dbc..e187ce1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java @@ -106,6 +106,8 @@ public class OrcRecordUpdater implements RecordUpdater { // This records how many rows have been inserted or deleted. It is separate from insertedRows // because that is monotonically increasing to give new unique row ids.
private long rowCountDelta = 0; + // used only for insert events, this is the number of rows held in memory before flush() is invoked + private long bufferedRows = 0; private final KeyIndexBuilder indexBuilder = new KeyIndexBuilder("insert"); private KeyIndexBuilder deleteEventIndexBuilder; private StructField recIdField = null; // field to look for the record identifier in @@ -438,6 +440,7 @@ public class OrcRecordUpdater implements RecordUpdater { addSimpleEvent(INSERT_OPERATION, currentWriteId, insertedRows++, row); } rowCountDelta++; + bufferedRows++; } @Override @@ -481,6 +484,7 @@ public class OrcRecordUpdater implements RecordUpdater { flushLengths.writeLong(len); OrcInputFormat.SHIMS.hflush(flushLengths); } + bufferedRows = 0; //multiple transactions only happen for streaming ingest which only allows inserts assert deleteEventWriter == null : "unexpected delete writer for " + path; } @@ -539,6 +543,11 @@ public class OrcRecordUpdater implements RecordUpdater { return stats; } + @Override + public long getBufferedRowCount() { + return bufferedRows; + } + static RecordIdentifier[] parseKeyIndex(Reader reader) { String[] stripes; try { http://git-wip-us.apache.org/repos/asf/hive/blob/cc52e9b2/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java index 7f6077c..71127c2 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java @@ -773,6 +773,11 @@ public class TestFileSinkOperator { stats.setRowCount(numRecordsAdded); return stats; } + + @Override + public long getBufferedRowCount() { + return records.size(); + } }; } http://git-wip-us.apache.org/repos/asf/hive/blob/cc52e9b2/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java ---------------------------------------------------------------------- diff --git a/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java b/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java index 643bcc4..a979df0 100644 --- a/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java +++ b/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java @@ -20,29 +20,32 @@ package org.apache.hive.streaming; import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.lang.management.MemoryMXBean; +import java.lang.management.MemoryUsage; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Properties; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.HeapMemoryMonitor; import org.apache.hadoop.hive.common.JavaUtils; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils; -import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.llap.LlapUtil; import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.api.FieldSchema; -import org.apache.hadoop.hive.metastore.api.MetaException; -import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; -import 
org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.io.AcidOutputFormat; import org.apache.hadoop.hive.ql.io.RecordUpdater; +import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.serde2.AbstractSerDe; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; @@ -51,7 +54,6 @@ import org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.SubStructObjectInspector; import org.apache.hadoop.util.ReflectionUtils; -import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,7 +62,7 @@ public abstract class AbstractRecordWriter implements RecordWriter { protected HiveConf conf; private StreamingConnection conn; - protected Table tbl; + protected Table table; List<String> inputColumns; List<String> inputTypes; private String fullyQualifiedTableName; @@ -85,6 +87,32 @@ public abstract class AbstractRecordWriter implements RecordWriter { private AcidOutputFormat<?, ?> acidOutputFormat; private Long curBatchMinWriteId; private Long curBatchMaxWriteId; + private HeapMemoryMonitor heapMemoryMonitor; + // if low memory canary is set and if records after set canary exceeds threshold, trigger a flush. + // This is to avoid getting notified of low memory too often and flushing too often. + private AtomicBoolean lowMemoryCanary; + private long ingestSizeBytes = 0; + private boolean autoFlush; + private float memoryUsageThreshold; + private long ingestSizeThreshold; + + private static class OrcMemoryPressureMonitor implements HeapMemoryMonitor.Listener { + private static final Logger LOG = LoggerFactory.getLogger(OrcMemoryPressureMonitor.class.getName()); + private final AtomicBoolean lowMemoryCanary; + + OrcMemoryPressureMonitor(final AtomicBoolean lowMemoryCanary) { + this.lowMemoryCanary = lowMemoryCanary; + } + + @Override + public void memoryUsageAboveThreshold(final long usedMemory, final long maxMemory) { + if (LOG.isDebugEnabled()) { + LOG.debug("Orc memory pressure notified! 
usedMemory: {} maxMemory: {}.", + LlapUtil.humanReadableByteCount(usedMemory), LlapUtil.humanReadableByteCount(maxMemory)); + } + lowMemoryCanary.set(true); + } + } @Override public void init(StreamingConnection conn, long minWriteId, long maxWriteId) throws StreamingException { @@ -97,21 +125,22 @@ public abstract class AbstractRecordWriter implements RecordWriter { this.curBatchMaxWriteId = maxWriteId; this.conf = conn.getHiveConf(); this.defaultPartitionName = conf.getVar(HiveConf.ConfVars.DEFAULTPARTITIONNAME); - final IMetaStoreClient msClient = HiveMetaStoreUtils.getHiveMetastoreClient(this.conf); - this.tbl = msClient.getTable(conn.getDatabase(), conn.getTable()); - this.inputColumns = tbl.getSd().getCols().stream().map(FieldSchema::getName).collect(Collectors.toList()); - this.inputTypes = tbl.getSd().getCols().stream().map(FieldSchema::getType).collect(Collectors.toList()); + this.table = conn.getTable(); + this.inputColumns = table.getSd().getCols().stream().map(FieldSchema::getName).collect(Collectors.toList()); + this.inputTypes = table.getSd().getCols().stream().map(FieldSchema::getType).collect(Collectors.toList()); if (conn.isPartitionedTable() && conn.isDynamicPartitioning()) { - this.partitionColumns = tbl.getPartitionKeys().stream().map(FieldSchema::getName).collect(Collectors.toList()); + this.partitionColumns = table.getPartitionKeys().stream().map(FieldSchema::getName) + .collect(Collectors.toList()); this.inputColumns.addAll(partitionColumns); - this.inputTypes.addAll(tbl.getPartitionKeys().stream().map(FieldSchema::getType).collect(Collectors.toList())); + this.inputTypes + .addAll(table.getPartitionKeys().stream().map(FieldSchema::getType).collect(Collectors.toList())); } - this.fullyQualifiedTableName = Warehouse.getQualifiedName(conn.getDatabase(), conn.getTable()); - String outFormatName = this.tbl.getSd().getOutputFormat(); - this.acidOutputFormat = (AcidOutputFormat<?, ?>) ReflectionUtils.newInstance(JavaUtils.loadClass(outFormatName), conf); - } catch (MetaException | NoSuchObjectException e) { - throw new ConnectionError(conn, e); - } catch (TException | ClassNotFoundException | IOException e) { + this.fullyQualifiedTableName = Warehouse.getQualifiedName(table.getDbName(), table.getTableName()); + String outFormatName = this.table.getSd().getOutputFormat(); + this.acidOutputFormat = (AcidOutputFormat<?, ?>) ReflectionUtils + .newInstance(JavaUtils.loadClass(outFormatName), conf); + setupMemoryMonitoring(); + } catch (ClassNotFoundException e) { throw new StreamingException(e.getMessage(), e); } @@ -120,7 +149,7 @@ public abstract class AbstractRecordWriter implements RecordWriter { this.inputRowObjectInspector = (StructObjectInspector) serDe.getObjectInspector(); if (conn.isPartitionedTable() && conn.isDynamicPartitioning()) { preparePartitioningFields(); - int dpStartCol = inputRowObjectInspector.getAllStructFieldRefs().size() - tbl.getPartitionKeys().size(); + int dpStartCol = inputRowObjectInspector.getAllStructFieldRefs().size() - table.getPartitionKeys().size(); this.outputRowObjectInspector = new SubStructObjectInspector(inputRowObjectInspector, 0, dpStartCol); } else { this.outputRowObjectInspector = inputRowObjectInspector; @@ -131,14 +160,36 @@ public abstract class AbstractRecordWriter implements RecordWriter { } } + private void setupMemoryMonitoring() { + this.autoFlush = conf.getBoolVar(HiveConf.ConfVars.HIVE_STREAMING_AUTO_FLUSH_ENABLED); + this.memoryUsageThreshold = conf.getFloatVar(HiveConf.ConfVars.HIVE_HEAP_MEMORY_MONITOR_USAGE_THRESHOLD); 
+ this.ingestSizeThreshold = conf.getSizeVar(HiveConf.ConfVars.HIVE_STREAMING_AUTO_FLUSH_CHECK_INTERVAL_SIZE); + LOG.info("Memory monitoring settings - autoFlush: {} memoryUsageThreshold: {} ingestSizeThreshold: {}", + autoFlush, memoryUsageThreshold, ingestSizeThreshold); + this.heapMemoryMonitor = new HeapMemoryMonitor(memoryUsageThreshold); + MemoryUsage tenuredMemUsage = heapMemoryMonitor.getTenuredGenMemoryUsage(); + if (tenuredMemUsage != null) { + lowMemoryCanary = new AtomicBoolean(false); + heapMemoryMonitor.registerListener(new OrcMemoryPressureMonitor(lowMemoryCanary)); + heapMemoryMonitor.start(); + // alert if we are already running low on memory (starting with low memory will lead to frequent auto flush) + float currentUsage = (float) tenuredMemUsage.getUsed() / (float) tenuredMemUsage.getMax(); + if (currentUsage > memoryUsageThreshold) { + LOG.warn("LOW MEMORY ALERT! Tenured gen memory is already low. Increase memory to improve performance." + + " Used: {} Max: {}", LlapUtil.humanReadableByteCount(tenuredMemUsage.getUsed()), + LlapUtil.humanReadableByteCount(tenuredMemUsage.getMax())); + } + } + } + + private void prepareBucketingFields() { - this.isBucketed = tbl.getSd().getNumBuckets() > 0; + this.isBucketed = table.getSd().getNumBuckets() > 0; // For unbucketed tables we have exactly 1 RecordUpdater (until HIVE-19208) for each AbstractRecordWriter which // ends up writing to a file bucket_000000. // See also {@link #getBucket(Object)} - this.totalBuckets = isBucketed ? tbl.getSd().getNumBuckets() : 1; + this.totalBuckets = isBucketed ? table.getSd().getNumBuckets() : 1; if (isBucketed) { - this.bucketIds = getBucketColIDs(tbl.getSd().getBucketCols(), tbl.getSd().getCols()); + this.bucketIds = getBucketColIDs(table.getSd().getBucketCols(), table.getSd().getCols()); this.bucketFieldData = new Object[bucketIds.size()]; this.bucketObjInspectors = getObjectInspectorsForBucketedCols(bucketIds, inputRowObjectInspector); this.bucketStructFields = new StructField[bucketIds.size()]; @@ -150,7 +201,7 @@ public abstract class AbstractRecordWriter implements RecordWriter { } private void preparePartitioningFields() { - final int numPartitions = tbl.getPartitionKeys().size(); + final int numPartitions = table.getPartitionKeys().size(); this.partitionFieldData = new Object[numPartitions]; this.partitionObjInspectors = new ObjectInspector[numPartitions]; int startIdx = inputRowObjectInspector.getAllStructFieldRefs().size() - numPartitions; @@ -186,6 +237,12 @@ public abstract class AbstractRecordWriter implements RecordWriter { return result; } + /** + * Create SerDe for the record writer. + * + * @return - serde + * @throws SerializationError - if serde cannot be created. + */ public abstract AbstractSerDe createSerde() throws SerializationError; /** @@ -205,7 +262,7 @@ public abstract class AbstractRecordWriter implements RecordWriter { } Object[] bucketFields = getBucketFields(row); int bucketingVersion = Utilities.getBucketingVersion( - tbl.getParameters().get(hive_metastoreConstants.TABLE_BUCKETING_VERSION)); + table.getParameters().get(hive_metastoreConstants.TABLE_BUCKETING_VERSION)); return bucketingVersion == 2 ? 
ObjectInspectorUtils.getBucketNumber(bucketFields, bucketObjInspectors, totalBuckets) : @@ -239,6 +296,7 @@ public abstract class AbstractRecordWriter implements RecordWriter { @Override public void flush() throws StreamingIOFailure { try { + logStats("Stats before flush:"); for (Map.Entry<String, List<RecordUpdater>> entry : updaters.entrySet()) { LOG.info("Flushing record updater for partitions: {}", entry.getKey()); for (RecordUpdater updater : entry.getValue()) { @@ -247,6 +305,8 @@ public abstract class AbstractRecordWriter implements RecordWriter { } } } + ingestSizeBytes = 0; + logStats("Stats after flush:"); } catch (IOException e) { throw new StreamingIOFailure("Unable to flush recordUpdater", e); } @@ -256,6 +316,7 @@ public abstract class AbstractRecordWriter implements RecordWriter { public void close() throws StreamingIOFailure { boolean haveError = false; String partition = null; + logStats("Stats before close:"); for (Map.Entry<String, List<RecordUpdater>> entry : updaters.entrySet()) { partition = entry.getKey(); LOG.info("Closing updater for partitions: {}", partition); @@ -273,6 +334,7 @@ public abstract class AbstractRecordWriter implements RecordWriter { entry.getValue().clear(); } updaters.clear(); + logStats("Stats after close:"); if (haveError) { throw new StreamingIOFailure("Encountered errors while closing (see logs) " + getWatermark(partition)); } @@ -306,6 +368,8 @@ public abstract class AbstractRecordWriter implements RecordWriter { @Override public void write(final long writeId, final byte[] record) throws StreamingException { + checkAutoFlush(); + ingestSizeBytes += record.length; try { Object encodedRow = encode(record); int bucket = getBucket(encodedRow); @@ -317,6 +381,33 @@ public abstract class AbstractRecordWriter implements RecordWriter { } } + private void checkAutoFlush() throws StreamingIOFailure { + if (!autoFlush) { + return; + } + if (lowMemoryCanary != null) { + if (lowMemoryCanary.get() && ingestSizeBytes > ingestSizeThreshold) { + LOG.info("Low memory canary is set and ingestion size (buffered) threshold '{}' exceeded. " + + "Flushing all record updaters..", LlapUtil.humanReadableByteCount(ingestSizeThreshold)); + flush(); + conn.getConnectionStats().incrementAutoFlushCount(); + lowMemoryCanary.set(false); + } + } else { + if (ingestSizeBytes > ingestSizeThreshold) { + MemoryMXBean mxBean = ManagementFactory.getMemoryMXBean(); + MemoryUsage heapUsage = mxBean.getHeapMemoryUsage(); + float memUsedFraction = ((float) heapUsage.getUsed() / (float) heapUsage.getMax()); + if (memUsedFraction > memoryUsageThreshold) { + LOG.info("Memory usage threshold '{}' and ingestion size (buffered) threshold '{}' exceeded. " + + "Flushing all record updaters..", memUsedFraction, LlapUtil.humanReadableByteCount(ingestSizeThreshold)); + flush(); + conn.getConnectionStats().incrementAutoFlushCount(); + } + } + } + } + @Override public Set<String> getPartitions() { return addedPartitions; @@ -329,7 +420,7 @@ public abstract class AbstractRecordWriter implements RecordWriter { // may define certain table parameters that may be required while writing. The table parameter // 'transactional_properties' is one such example. 
Properties tblProperties = new Properties(); - tblProperties.putAll(tbl.getParameters()); + tblProperties.putAll(table.getParameters()); return acidOutputFormat.getRecordUpdater(partitionPath, new AcidOutputFormat.Options(conf) .inspector(outputRowObjectInspector) @@ -354,7 +445,7 @@ public abstract class AbstractRecordWriter implements RecordWriter { } else { // un-partitioned table if (partitionValues == null) { - destLocation = new Path(tbl.getSd().getLocation()); + destLocation = new Path(table.getSd().getLocation()); } else { PartitionInfo partitionInfo = conn.createPartitionIfNotExists(partitionValues); // collect the newly added partitions. connection.commitTransaction() will report the dynamically added @@ -395,4 +486,28 @@ public abstract class AbstractRecordWriter implements RecordWriter { } return result; } + + private void logStats(final String prefix) { + int openRecordUpdaters = updaters.values() + .stream() + .mapToInt(List::size) + .sum(); + long bufferedRecords = updaters.values() + .stream() + .flatMap(List::stream) + .filter(Objects::nonNull) + .mapToLong(RecordUpdater::getBufferedRowCount) + .sum(); + MemoryUsage memoryUsage = heapMemoryMonitor.getTenuredGenMemoryUsage(); + String oldGenUsage = "NA"; + if (memoryUsage != null) { + oldGenUsage = "used/max => " + LlapUtil.humanReadableByteCount(memoryUsage.getUsed()) + "/" + + LlapUtil.humanReadableByteCount(memoryUsage.getMax()); + } + LOG.info("{} [record-updaters: {}, partitions: {}, buffered-records: {} total-records: {} " + + "buffered-ingest-size: {}, total-ingest-size: {} tenured-memory-usage: {}]", prefix, openRecordUpdaters, + partitionPaths.size(), bufferedRecords, conn.getConnectionStats().getRecordsWritten(), + LlapUtil.humanReadableByteCount(ingestSizeBytes), + LlapUtil.humanReadableByteCount(conn.getConnectionStats().getRecordsSize()), oldGenUsage); + } } http://git-wip-us.apache.org/repos/asf/hive/blob/cc52e9b2/streaming/src/java/org/apache/hive/streaming/ConnectionInfo.java ---------------------------------------------------------------------- diff --git a/streaming/src/java/org/apache/hive/streaming/ConnectionInfo.java b/streaming/src/java/org/apache/hive/streaming/ConnectionInfo.java index ca8babf..511ffdd 100644 --- a/streaming/src/java/org/apache/hive/streaming/ConnectionInfo.java +++ b/streaming/src/java/org/apache/hive/streaming/ConnectionInfo.java @@ -20,6 +20,8 @@ package org.apache.hive.streaming; import java.util.List; +import org.apache.hadoop.hive.ql.metadata.Table; + /** * Helper interface to get connection related information. */ @@ -33,18 +35,11 @@ public interface ConnectionInfo { String getMetastoreUri(); /** - * Get the database used by streaming connection. - * - * @return - database - */ - String getDatabase(); - - /** * Get the table used by streaming connection. * * @return - table */ - String getTable(); + Table getTable(); /** * Get any static partitions specified during streaming connection creation. 
http://git-wip-us.apache.org/repos/asf/hive/blob/cc52e9b2/streaming/src/java/org/apache/hive/streaming/ConnectionStats.java ---------------------------------------------------------------------- diff --git a/streaming/src/java/org/apache/hive/streaming/ConnectionStats.java b/streaming/src/java/org/apache/hive/streaming/ConnectionStats.java new file mode 100644 index 0000000..355456e --- /dev/null +++ b/streaming/src/java/org/apache/hive/streaming/ConnectionStats.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.streaming; + +import java.util.concurrent.atomic.LongAdder; + +/** + * Store statistics about streaming connection. + */ +public class ConnectionStats { + private LongAdder recordsWritten = new LongAdder(); + private LongAdder recordsSize = new LongAdder(); + private LongAdder committedTransactions = new LongAdder(); + private LongAdder abortedTransactions = new LongAdder(); + private LongAdder autoFlushCount = new LongAdder(); + private LongAdder metastoreCalls = new LongAdder(); + + public void incrementRecordsWritten() { + recordsWritten.increment(); + } + + public void incrementCommittedTransactions() { + committedTransactions.increment(); + } + + public void incrementAbortedTransactions() { + abortedTransactions.increment(); + } + + public void incrementAutoFlushCount() { + autoFlushCount.increment(); + } + + public void incrementMetastoreCalls() { + metastoreCalls.increment(); + } + + public void incrementRecordsSize(long delta) { + recordsSize.add(delta); + } + + public long getRecordsWritten() { + return recordsWritten.longValue(); + } + + public long getRecordsSize() { + return recordsSize.longValue(); + } + + public long getCommittedTransactions() { + return committedTransactions.longValue(); + } + + public long getAbortedTransactions() { + return abortedTransactions.longValue(); + } + + public long getAutoFlushCount() { + return autoFlushCount.longValue(); + } + + public long getMetastoreCalls() { + return metastoreCalls.longValue(); + } + + @Override + public String toString() { + return "{records-written: " + recordsWritten + ", records-size: "+ recordsSize + ", committed-transactions: " + + committedTransactions + ", aborted-transactions: " + abortedTransactions + ", auto-flushes: " + autoFlushCount + + ", metastore-calls: " + metastoreCalls + " }"; + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/cc52e9b2/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java ---------------------------------------------------------------------- diff --git a/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java b/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java index 205ed6c..c4da7af 100644 --- 
a/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java +++ b/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java @@ -158,6 +158,7 @@ public class HiveStreamingConnection implements StreamingConnection { private final boolean secureMode; private Table tableObject = null; private String metastoreUri; + private ConnectionStats connectionStats; private HiveStreamingConnection(Builder builder) throws StreamingException { this.database = builder.database.toLowerCase(); @@ -181,6 +182,7 @@ public class HiveStreamingConnection implements StreamingConnection { } this.transactionBatchSize = builder.transactionBatchSize; this.recordWriter = builder.recordWriter; + this.connectionStats = new ConnectionStats(); if (agentInfo == null) { try { agentInfo = username + ":" + InetAddress.getLocalHost().getHostName() + ":" + Thread.currentThread().getName(); @@ -194,10 +196,10 @@ public class HiveStreamingConnection implements StreamingConnection { } overrideConfSettings(conf); this.metastoreUri = conf.get(MetastoreConf.ConfVars.THRIFT_URIS.getHiveName()); - this.msClient = getMetaStoreClient(conf, metastoreUri, secureMode); + this.msClient = getMetaStoreClient(conf, metastoreUri, secureMode, "streaming-connection"); // We use a separate metastore client for heartbeat calls to ensure heartbeat RPC calls are // isolated from the other transaction related RPC calls. - this.heartbeatMSClient = getMetaStoreClient(conf, metastoreUri, secureMode); + this.heartbeatMSClient = getMetaStoreClient(conf, metastoreUri, secureMode, "streaming-connection-heartbeat"); validateTable(); LOG.info("STREAMING CONNECTION INFO: {}", toConnectionInfoString()); } @@ -369,7 +371,7 @@ public class HiveStreamingConnection implements StreamingConnection { partLocation = new Path(tableObject.getDataLocation(), Warehouse.makePartPath(partSpec)).toString(); addPartitionDesc.addPartition(partSpec, partLocation); Partition partition = Hive.convertAddSpecToMetaPartition(tableObject, addPartitionDesc.getPartition(0), conf); - msClient.add_partition(partition); + getMSC().add_partition(partition); } catch (AlreadyExistsException e) { exists = true; } catch (HiveException | TException e) { @@ -379,9 +381,19 @@ public class HiveStreamingConnection implements StreamingConnection { return new PartitionInfo(partName, partLocation, exists); } + IMetaStoreClient getMSC() { + connectionStats.incrementMetastoreCalls(); + return msClient; + } + + IMetaStoreClient getHeatbeatMSC() { + connectionStats.incrementMetastoreCalls(); + return heartbeatMSClient; + } + private void validateTable() throws InvalidTable, ConnectionError { try { - tableObject = new Table(msClient.getTable(database, table)); + tableObject = new Table(getMSC().getTable(database, table)); } catch (Exception e) { LOG.warn("Unable to validate the table for connection: " + toConnectionInfoString(), e); throw new InvalidTable(database, table, e); @@ -408,15 +420,15 @@ public class HiveStreamingConnection implements StreamingConnection { } private static class HeartbeatRunnable implements Runnable { - private final IMetaStoreClient heartbeatMSClient; + private final HiveStreamingConnection conn; private final AtomicLong minTxnId; private final long maxTxnId; private final ReentrantLock transactionLock; private final AtomicBoolean isTxnClosed; - HeartbeatRunnable(final IMetaStoreClient heartbeatMSClient, final AtomicLong minTxnId, final long maxTxnId, + HeartbeatRunnable(final HiveStreamingConnection conn, final AtomicLong minTxnId, final long 
maxTxnId, final ReentrantLock transactionLock, final AtomicBoolean isTxnClosed) { - this.heartbeatMSClient = heartbeatMSClient; + this.conn = conn; this.minTxnId = minTxnId; this.maxTxnId = maxTxnId; this.transactionLock = transactionLock; @@ -428,7 +440,7 @@ public class HiveStreamingConnection implements StreamingConnection { transactionLock.lock(); try { if (minTxnId.get() > 0) { - HeartbeatTxnRangeResponse resp = heartbeatMSClient.heartbeatTxnRange(minTxnId.get(), maxTxnId); + HeartbeatTxnRangeResponse resp = conn.getHeatbeatMSC().heartbeatTxnRange(minTxnId.get(), maxTxnId); if (!resp.getAborted().isEmpty() || !resp.getNosuch().isEmpty()) { LOG.error("Heartbeat failure: {}", resp.toString()); isTxnClosed.set(true); @@ -488,6 +500,8 @@ public class HiveStreamingConnection implements StreamingConnection { public void write(final byte[] record) throws StreamingException { checkState(); currentTransactionBatch.write(record); + connectionStats.incrementRecordsWritten(); + connectionStats.incrementRecordsSize(record.length); } @Override @@ -500,12 +514,14 @@ public class HiveStreamingConnection implements StreamingConnection { public void commitTransaction() throws StreamingException { checkState(); currentTransactionBatch.commit(); + connectionStats.incrementCommittedTransactions(); } @Override public void abortTransaction() throws StreamingException { checkState(); currentTransactionBatch.abort(); + connectionStats.incrementAbortedTransactions(); } @Override @@ -521,12 +537,19 @@ public class HiveStreamingConnection implements StreamingConnection { } catch (StreamingException e) { LOG.error("Unable to close current transaction batch: " + currentTransactionBatch, e); } finally { - msClient.close(); - heartbeatMSClient.close(); + getMSC().close(); + getHeatbeatMSC().close(); } + LOG.info("Closed streaming connection. 
Agent: {} Stats: {}", getAgentInfo(), getConnectionStats()); + } + + @Override + public ConnectionStats getConnectionStats() { + return connectionStats; } - private static IMetaStoreClient getMetaStoreClient(HiveConf conf, String metastoreUri, boolean secureMode) + private static IMetaStoreClient getMetaStoreClient(HiveConf conf, String metastoreUri, boolean secureMode, + String owner) throws ConnectionError { if (metastoreUri != null) { conf.set(MetastoreConf.ConfVars.THRIFT_URIS.getHiveName(), metastoreUri); @@ -535,6 +558,7 @@ public class HiveStreamingConnection implements StreamingConnection { conf.setBoolean(MetastoreConf.ConfVars.USE_THRIFT_SASL.getHiveName(), true); } try { + LOG.info("Creating metastore client for {}", owner); return HiveMetaStoreUtils.getHiveMetastoreClient(conf); } catch (MetaException | IOException e) { throw new ConnectionError("Error connecting to Hive Metastore URI: " @@ -560,8 +584,6 @@ public class HiveStreamingConnection implements StreamingConnection { private static class TransactionBatch { private String username; private HiveStreamingConnection conn; - private IMetaStoreClient msClient; - private IMetaStoreClient heartbeatMSClient; private ScheduledExecutorService scheduledExecutorService; private RecordWriter recordWriter; private String partNameForLock = null; @@ -614,16 +636,14 @@ public class HiveStreamingConnection implements StreamingConnection { } this.conn = conn; this.username = conn.username; - this.msClient = conn.msClient; - this.heartbeatMSClient = conn.heartbeatMSClient; this.recordWriter = conn.recordWriter; this.agentInfo = conn.agentInfo; this.numTxns = conn.transactionBatchSize; setupHeartBeatThread(); - List<Long> txnIds = openTxnImpl(msClient, username, numTxns); - txnToWriteIds = allocateWriteIdsImpl(msClient, txnIds); + List<Long> txnIds = openTxnImpl(username, numTxns); + txnToWriteIds = allocateWriteIdsImpl(txnIds); assert (txnToWriteIds.size() == numTxns); txnStatus = new TxnState[numTxns]; @@ -664,19 +684,17 @@ public class HiveStreamingConnection implements StreamingConnection { initialDelay = (long) (heartBeatInterval * 0.75 * Math.random()); LOG.info("Starting heartbeat thread with interval: {} ms initialDelay: {} ms for agentInfo: {}", heartBeatInterval, initialDelay, conn.agentInfo); - Runnable runnable = new HeartbeatRunnable(heartbeatMSClient, minTxnId, maxTxnId, transactionLock, isTxnClosed); + Runnable runnable = new HeartbeatRunnable(conn, minTxnId, maxTxnId, transactionLock, isTxnClosed); this.scheduledExecutorService.scheduleWithFixedDelay(runnable, initialDelay, heartBeatInterval, TimeUnit .MILLISECONDS); } - private List<Long> openTxnImpl(final IMetaStoreClient msClient, final String user, final int numTxns) - throws TException { - return msClient.openTxns(user, numTxns).getTxn_ids(); + private List<Long> openTxnImpl(final String user, final int numTxns) throws TException { + return conn.getMSC().openTxns(user, numTxns).getTxn_ids(); } - private List<TxnToWriteId> allocateWriteIdsImpl(final IMetaStoreClient msClient, - final List<Long> txnIds) throws TException { - return msClient.allocateTableWriteIdsBatch(txnIds, conn.database, conn.table); + private List<TxnToWriteId> allocateWriteIdsImpl(final List<Long> txnIds) throws TException { + return conn.getMSC().allocateTableWriteIdsBatch(txnIds, conn.database, conn.table); } @Override @@ -714,7 +732,7 @@ public class HiveStreamingConnection implements StreamingConnection { lastTxnUsed = getCurrentTxnId(); lockRequest = createLockRequest(conn, partNameForLock, 
username, getCurrentTxnId(), agentInfo); try { - LockResponse res = msClient.lock(lockRequest); + LockResponse res = conn.getMSC().lock(lockRequest); if (res.getState() != LockState.ACQUIRED) { throw new TransactionError("Unable to acquire lock on " + conn); } @@ -813,12 +831,12 @@ public class HiveStreamingConnection implements StreamingConnection { TxnToWriteId txnToWriteId = txnToWriteIds.get(currentTxnIndex); if (conn.isDynamicPartitioning()) { List<String> partNames = new ArrayList<>(recordWriter.getPartitions()); - msClient.addDynamicPartitions(txnToWriteId.getTxnId(), txnToWriteId.getWriteId(), conn.database, conn.table, + conn.getMSC().addDynamicPartitions(txnToWriteId.getTxnId(), txnToWriteId.getWriteId(), conn.database, conn.table, partNames, DataOperationType.INSERT); } transactionLock.lock(); try { - msClient.commitTxn(txnToWriteId.getTxnId()); + conn.getMSC().commitTxn(txnToWriteId.getTxnId()); // increment the min txn id so that heartbeat thread will heartbeat only from the next open transaction. // the current transaction is going to committed or fail, so don't need heartbeat for current transaction. if (currentTxnIndex + 1 < txnToWriteIds.size()) { @@ -873,7 +891,7 @@ public class HiveStreamingConnection implements StreamingConnection { (state == TxnState.ABORTED || state == TxnState.COMMITTED ? 1 : 0), 0); for (currentTxnIndex = minOpenTxnIndex; currentTxnIndex < txnToWriteIds.size(); currentTxnIndex++) { - msClient.rollbackTxn(txnToWriteIds.get(currentTxnIndex).getTxnId()); + conn.getMSC().rollbackTxn(txnToWriteIds.get(currentTxnIndex).getTxnId()); txnStatus[currentTxnIndex] = TxnState.ABORTED; } currentTxnIndex--;//since the loop left it == txnToWriteIds.size() @@ -888,7 +906,7 @@ public class HiveStreamingConnection implements StreamingConnection { } long currTxnId = getCurrentTxnId(); if (currTxnId > 0) { - msClient.rollbackTxn(currTxnId); + conn.getMSC().rollbackTxn(currTxnId); txnStatus[currentTxnIndex] = TxnState.ABORTED; } } @@ -1008,13 +1026,8 @@ public class HiveStreamingConnection implements StreamingConnection { } @Override - public String getDatabase() { - return database; - } - - @Override - public String getTable() { - return table; + public Table getTable() { + return tableObject; } @Override http://git-wip-us.apache.org/repos/asf/hive/blob/cc52e9b2/streaming/src/java/org/apache/hive/streaming/StreamingConnection.java ---------------------------------------------------------------------- diff --git a/streaming/src/java/org/apache/hive/streaming/StreamingConnection.java b/streaming/src/java/org/apache/hive/streaming/StreamingConnection.java index cd7f3d8..5396d76 100644 --- a/streaming/src/java/org/apache/hive/streaming/StreamingConnection.java +++ b/streaming/src/java/org/apache/hive/streaming/StreamingConnection.java @@ -61,4 +61,11 @@ public interface StreamingConnection extends ConnectionInfo, PartitionHandler { * Closes streaming connection. */ void close(); + + /** + * Gets stats about the streaming connection. 
+ * + * @return - connection stats + */ + ConnectionStats getConnectionStats(); } http://git-wip-us.apache.org/repos/asf/hive/blob/cc52e9b2/streaming/src/java/org/apache/hive/streaming/StrictDelimitedInputWriter.java ---------------------------------------------------------------------- diff --git a/streaming/src/java/org/apache/hive/streaming/StrictDelimitedInputWriter.java b/streaming/src/java/org/apache/hive/streaming/StrictDelimitedInputWriter.java index 4a07435..56f59fd 100644 --- a/streaming/src/java/org/apache/hive/streaming/StrictDelimitedInputWriter.java +++ b/streaming/src/java/org/apache/hive/streaming/StrictDelimitedInputWriter.java @@ -21,7 +21,6 @@ package org.apache.hive.streaming; import java.util.Properties; -import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.SerDeUtils; @@ -93,7 +92,7 @@ public class StrictDelimitedInputWriter extends AbstractRecordWriter { @Override public LazySimpleSerDe createSerde() throws SerializationError { try { - Properties tableProps = MetaStoreUtils.getTableMetadata(tbl); + Properties tableProps = table.getMetadata(); tableProps.setProperty(serdeConstants.LIST_COLUMNS, Joiner.on(",").join(inputColumns)); tableProps.setProperty(serdeConstants.LIST_COLUMN_TYPES, Joiner.on(":").join(inputTypes)); tableProps.setProperty(serdeConstants.FIELD_DELIM, String.valueOf(fieldDelimiter)); http://git-wip-us.apache.org/repos/asf/hive/blob/cc52e9b2/streaming/src/java/org/apache/hive/streaming/StrictJsonWriter.java ---------------------------------------------------------------------- diff --git a/streaming/src/java/org/apache/hive/streaming/StrictJsonWriter.java b/streaming/src/java/org/apache/hive/streaming/StrictJsonWriter.java index 1600e7c..2c54eef 100644 --- a/streaming/src/java/org/apache/hive/streaming/StrictJsonWriter.java +++ b/streaming/src/java/org/apache/hive/streaming/StrictJsonWriter.java @@ -20,7 +20,6 @@ package org.apache.hive.streaming; import java.util.Properties; -import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; import org.apache.hadoop.hive.serde2.JsonSerDe; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.SerDeUtils; @@ -53,7 +52,7 @@ public class StrictJsonWriter extends AbstractRecordWriter { @Override public JsonSerDe createSerde() throws SerializationError { try { - Properties tableProps = MetaStoreUtils.getTableMetadata(tbl); + Properties tableProps = table.getMetadata(); JsonSerDe serde = new JsonSerDe(); SerDeUtils.initializeSerDe(serde, conf, tableProps, null); this.serde = serde; http://git-wip-us.apache.org/repos/asf/hive/blob/cc52e9b2/streaming/src/java/org/apache/hive/streaming/StrictRegexWriter.java ---------------------------------------------------------------------- diff --git a/streaming/src/java/org/apache/hive/streaming/StrictRegexWriter.java b/streaming/src/java/org/apache/hive/streaming/StrictRegexWriter.java index 563cf66..ba2609f 100644 --- a/streaming/src/java/org/apache/hive/streaming/StrictRegexWriter.java +++ b/streaming/src/java/org/apache/hive/streaming/StrictRegexWriter.java @@ -21,7 +21,6 @@ package org.apache.hive.streaming; import java.util.Properties; import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.RegexSerDe; import 
org.apache.hadoop.hive.serde2.SerDeException; @@ -67,7 +66,7 @@ public class StrictRegexWriter extends AbstractRecordWriter { @Override public RegexSerDe createSerde() throws SerializationError { try { - Properties tableProps = MetaStoreUtils.getTableMetadata(tbl); + Properties tableProps = table.getMetadata(); tableProps.setProperty(RegexSerDe.INPUT_REGEX, regex); tableProps.setProperty(serdeConstants.LIST_COLUMNS, StringUtils.join(inputColumns, ",")); RegexSerDe serde = new RegexSerDe();
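
For readers of this change, a minimal illustrative sketch of the new HeapMemoryMonitor on its own. The monitor class, its constructor, registerListener(), start(), getTenuredGenMemoryUsage() and the Listener callback are taken from the diff above; the wrapper class, the threshold value and the printed messages are made up for the example. AbstractRecordWriter wires the same callback to a low-memory canary and flushes all open record updaters once at least hive.streaming.auto.flush.check.interval.size worth of records has been buffered.

import java.lang.management.MemoryUsage;

import org.apache.hadoop.hive.common.HeapMemoryMonitor;

public class HeapMemoryMonitorSketch {
  public static void main(String[] args) throws InterruptedException {
    // Notify listeners when tenured gen usage after a GC stays above 70% of max
    // (same default as hive.heap.memory.monitor.usage.threshold).
    HeapMemoryMonitor monitor = new HeapMemoryMonitor(0.7d);
    monitor.registerListener((usedMemory, maxMemory) -> {
      // AbstractRecordWriter's listener only sets its low-memory canary here;
      // the actual flush happens on the next write() once enough bytes are buffered.
      System.out.println("Tenured gen above threshold: " + usedMemory + " of " + maxMemory + " bytes used");
    });
    monitor.start();

    // Current old-gen usage, or null when the JVM exposes no supported tenured pool.
    MemoryUsage usage = monitor.getTenuredGenMemoryUsage();
    if (usage != null) {
      System.out.println("Tenured gen used/max: " + usage.getUsed() + "/" + usage.getMax());
    }
    Thread.sleep(60_000); // keep the JVM alive long enough for a GC notification to arrive
  }
}

The streaming-side behaviour itself is governed by the hive.streaming.auto.flush.enabled, hive.heap.memory.monitor.usage.threshold and hive.streaming.auto.flush.check.interval.size settings introduced in HiveConf above.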