HIVE-19534: Allow implementations to access member variables of AbstractRecordWriter (Prasanth Jayachandran reviewed by Matt Burgess, Ashutosh Chauhan)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/3ea0356f
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/3ea0356f
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/3ea0356f

Branch: refs/heads/branch-3.0.0
Commit: 3ea0356f7dd9fc4d3406806d80c349187afd9d64
Parents: 66f6748
Author: Prasanth Jayachandran <prasan...@apache.org>
Authored: Mon May 14 17:19:34 2018 -0700
Committer: Prasanth Jayachandran <prasan...@apache.org>
Committed: Mon May 14 17:19:34 2018 -0700

----------------------------------------------------------------------
 .../hive/streaming/AbstractRecordWriter.java | 92 ++++++++++----------
 1 file changed, 46 insertions(+), 46 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/3ea0356f/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java b/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
index b6c8890..0866850 100644
--- a/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
+++ b/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
@@ -64,48 +64,48 @@ public abstract class AbstractRecordWriter implements RecordWriter {
   private static final String DEFAULT_LINE_DELIMITER_PATTERN = "[\r\n]";
 
   protected HiveConf conf;
-  private StreamingConnection conn;
+  protected StreamingConnection conn;
   protected Table table;
-  List<String> inputColumns;
-  List<String> inputTypes;
-  private String fullyQualifiedTableName;
-  private Map<String, List<RecordUpdater>> updaters = new HashMap<>();
-  private Map<String, Path> partitionPaths = new HashMap<>();
-  private Set<String> addedPartitions = new HashSet<>();
+  protected List<String> inputColumns;
+  protected List<String> inputTypes;
+  protected String fullyQualifiedTableName;
+  protected Map<String, List<RecordUpdater>> updaters = new HashMap<>();
+  protected Map<String, Path> partitionPaths = new HashMap<>();
+  protected Set<String> addedPartitions = new HashSet<>();
   // input OI includes table columns + partition columns
-  private StructObjectInspector inputRowObjectInspector;
+  protected StructObjectInspector inputRowObjectInspector;
   // output OI strips off the partition columns and retains other columns
-  private ObjectInspector outputRowObjectInspector;
-  private List<String> partitionColumns = new ArrayList<>();
-  private ObjectInspector[] partitionObjInspectors = null;
-  private StructField[] partitionStructFields = null;
-  private Object[] partitionFieldData;
-  private ObjectInspector[] bucketObjInspectors = null;
-  private StructField[] bucketStructFields = null;
-  private Object[] bucketFieldData;
-  private List<Integer> bucketIds = new ArrayList<>();
-  private int totalBuckets;
-  private String defaultPartitionName;
-  private boolean isBucketed;
-  private AcidOutputFormat<?, ?> acidOutputFormat;
-  private Long curBatchMinWriteId;
-  private Long curBatchMaxWriteId;
-  private final String lineDelimiter;
-  private HeapMemoryMonitor heapMemoryMonitor;
+  protected ObjectInspector outputRowObjectInspector;
+  protected List<String> partitionColumns = new ArrayList<>();
+  protected ObjectInspector[] partitionObjInspectors = null;
+  protected StructField[] partitionStructFields = null;
+  protected Object[] partitionFieldData;
+  protected ObjectInspector[] bucketObjInspectors = null;
+  protected StructField[] bucketStructFields = null;
+  protected Object[] bucketFieldData;
+  protected List<Integer> bucketIds = new ArrayList<>();
+  protected int totalBuckets;
+  protected String defaultPartitionName;
+  protected boolean isBucketed;
+  protected AcidOutputFormat<?, ?> acidOutputFormat;
+  protected Long curBatchMinWriteId;
+  protected Long curBatchMaxWriteId;
+  protected final String lineDelimiter;
+  protected HeapMemoryMonitor heapMemoryMonitor;
   // if low memory canary is set and if records after set canary exceeds threshold, trigger a flush.
   // This is to avoid getting notified of low memory too often and flushing too often.
-  private AtomicBoolean lowMemoryCanary;
-  private long ingestSizeBytes = 0;
-  private boolean autoFlush;
-  private float memoryUsageThreshold;
-  private long ingestSizeThreshold;
+  protected AtomicBoolean lowMemoryCanary;
+  protected long ingestSizeBytes = 0;
+  protected boolean autoFlush;
+  protected float memoryUsageThreshold;
+  protected long ingestSizeThreshold;
 
   public AbstractRecordWriter(final String lineDelimiter) {
     this.lineDelimiter = lineDelimiter == null || lineDelimiter.isEmpty() ?
       DEFAULT_LINE_DELIMITER_PATTERN : lineDelimiter;
   }
 
-  private static class OrcMemoryPressureMonitor implements HeapMemoryMonitor.Listener {
+  protected static class OrcMemoryPressureMonitor implements HeapMemoryMonitor.Listener {
     private static final Logger LOG = LoggerFactory.getLogger(OrcMemoryPressureMonitor.class.getName());
     private final AtomicBoolean lowMemoryCanary;
@@ -179,7 +179,7 @@ public abstract class AbstractRecordWriter implements RecordWriter {
     }
   }
 
-  private void setupMemoryMonitoring() {
+  protected void setupMemoryMonitoring() {
     this.autoFlush = conf.getBoolVar(HiveConf.ConfVars.HIVE_STREAMING_AUTO_FLUSH_ENABLED);
     this.memoryUsageThreshold = conf.getFloatVar(HiveConf.ConfVars.HIVE_HEAP_MEMORY_MONITOR_USAGE_THRESHOLD);
     this.ingestSizeThreshold = conf.getSizeVar(HiveConf.ConfVars.HIVE_STREAMING_AUTO_FLUSH_CHECK_INTERVAL_SIZE);
@@ -201,7 +201,7 @@ public abstract class AbstractRecordWriter implements RecordWriter {
     }
   }
 
-  private void prepareBucketingFields() {
+  protected void prepareBucketingFields() {
     this.isBucketed = table.getSd().getNumBuckets() > 0;
     // For unbucketed tables we have exactly 1 RecordUpdater (until HIVE-19208) for each AbstractRecordWriter which
     // ends up writing to a file bucket_000000.
@@ -219,7 +219,7 @@ public abstract class AbstractRecordWriter implements RecordWriter {
     }
   }
 
-  private void preparePartitioningFields() {
+  protected void preparePartitioningFields() {
     final int numPartitions = table.getPartitionKeys().size();
     this.partitionFieldData = new Object[numPartitions];
     this.partitionObjInspectors = new ObjectInspector[numPartitions];
@@ -240,12 +240,12 @@ public abstract class AbstractRecordWriter implements RecordWriter {
   /**
    * used to tag error msgs to provided some breadcrumbs
    */
-  private String getWatermark(String partition) {
+  protected String getWatermark(String partition) {
     return partition + " writeIds[" + curBatchMinWriteId + "," + curBatchMaxWriteId + "]";
   }
 
   // return the column numbers of the bucketed columns
-  private List<Integer> getBucketColIDs(List<String> bucketCols, List<FieldSchema> cols) {
+  protected List<Integer> getBucketColIDs(List<String> bucketCols, List<FieldSchema> cols) {
     ArrayList<Integer> result = new ArrayList<>(bucketCols.size());
     HashSet<String> bucketSet = new HashSet<>(bucketCols);
     for (int i = 0; i < cols.size(); i++) {
@@ -275,7 +275,7 @@ public abstract class AbstractRecordWriter implements RecordWriter {
   public abstract Object encode(byte[] record) throws SerializationError;
 
   // returns the bucket number to which the record belongs to
-  private int getBucket(Object row) {
+  protected int getBucket(Object row) {
     if (!isBucketed) {
       return 0;
     }
@@ -288,7 +288,7 @@ public abstract class AbstractRecordWriter implements RecordWriter {
       ObjectInspectorUtils.getBucketNumberOld(bucketFields, bucketObjInspectors, totalBuckets);
   }
 
-  private List<String> getPartitionValues(final Object row) {
+  protected List<String> getPartitionValues(final Object row) {
     if (!conn.isPartitionedTable()) {
       return null;
     }
@@ -359,7 +359,7 @@ public abstract class AbstractRecordWriter implements RecordWriter {
     }
   }
 
-  private static ObjectInspector[] getObjectInspectorsForBucketedCols(List<Integer> bucketIds
+  protected static ObjectInspector[] getObjectInspectorsForBucketedCols(List<Integer> bucketIds
     , StructObjectInspector recordObjInspector) {
     ObjectInspector[] result = new ObjectInspector[bucketIds.size()];
@@ -371,14 +371,14 @@ public abstract class AbstractRecordWriter implements RecordWriter {
     return result;
   }
 
-  private Object[] getBucketFields(Object row) {
+  protected Object[] getBucketFields(Object row) {
     for (int i = 0; i < bucketIds.size(); i++) {
       bucketFieldData[i] = inputRowObjectInspector.getStructFieldData(row, bucketStructFields[i]);
     }
     return bucketFieldData;
   }
 
-  private Object[] getPartitionFields(Object row) {
+  protected Object[] getPartitionFields(Object row) {
     for (int i = 0; i < partitionFieldData.length; i++) {
       partitionFieldData[i] = inputRowObjectInspector.getStructFieldData(row, partitionStructFields[i]);
     }
@@ -412,7 +412,7 @@ public abstract class AbstractRecordWriter implements RecordWriter {
     }
   }
 
-  private void checkAutoFlush() throws StreamingIOFailure {
+  protected void checkAutoFlush() throws StreamingIOFailure {
     if (!autoFlush) {
       return;
     }
@@ -444,7 +444,7 @@ public abstract class AbstractRecordWriter implements RecordWriter {
     return addedPartitions;
   }
 
-  private RecordUpdater createRecordUpdater(final Path partitionPath, int bucketId, Long minWriteId,
+  protected RecordUpdater createRecordUpdater(final Path partitionPath, int bucketId, Long minWriteId,
     Long maxWriteID) throws IOException {
     // Initialize table properties from the table parameters. This is required because the table
@@ -463,7 +463,7 @@ public abstract class AbstractRecordWriter implements RecordWriter {
       .finalDestination(partitionPath));
   }
 
-  private RecordUpdater getRecordUpdater(List<String> partitionValues, int bucketId) throws StreamingIOFailure {
+  protected RecordUpdater getRecordUpdater(List<String> partitionValues, int bucketId) throws StreamingIOFailure {
     RecordUpdater recordUpdater;
     String key;
     Path destLocation;
@@ -510,7 +510,7 @@ public abstract class AbstractRecordWriter implements RecordWriter {
     return recordUpdater;
  }
 
-  private List<RecordUpdater> initializeBuckets() {
+  protected List<RecordUpdater> initializeBuckets() {
     List<RecordUpdater> result = new ArrayList<>(totalBuckets);
     for (int bucket = 0; bucket < totalBuckets; bucket++) {
       result.add(bucket, null); //so that get(i) returns null rather than ArrayOutOfBounds
@@ -518,7 +518,7 @@ public abstract class AbstractRecordWriter implements RecordWriter {
     return result;
   }
 
-  private void logStats(final String prefix) {
+  protected void logStats(final String prefix) {
     int openRecordUpdaters = updaters.values()
       .stream()
       .mapToInt(List::size)
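----------------------------------------------------------------------

For illustration only (not part of this commit): with the fields and hooks above now protected, an external
RecordWriter implementation can build on AbstractRecordWriter's state rather than duplicating it. Below is a
minimal sketch of such a subclass; the class name AuditingRecordWriter and the package com.example.streaming
are hypothetical, while the createRecordUpdater() signature, the lineDelimiter constructor, and the
fullyQualifiedTableName field are taken from the diff above. The class is left abstract so it does not need to
implement the remaining abstract methods such as encode().

    package com.example.streaming; // hypothetical user package

    import java.io.IOException;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.io.RecordUpdater;
    import org.apache.hive.streaming.AbstractRecordWriter;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    // Hypothetical subclass: decorates record-updater creation with logging,
    // using members that HIVE-19534 made visible to implementations.
    public abstract class AuditingRecordWriter extends AbstractRecordWriter {
      private static final Logger LOG = LoggerFactory.getLogger(AuditingRecordWriter.class);

      public AuditingRecordWriter(final String lineDelimiter) {
        super(lineDelimiter);
      }

      @Override
      protected RecordUpdater createRecordUpdater(final Path partitionPath, int bucketId,
          Long minWriteId, Long maxWriteID) throws IOException {
        // Before this change, fullyQualifiedTableName was private and
        // createRecordUpdater() could not be overridden by subclasses.
        LOG.info("Opening RecordUpdater for {} (bucket {}, writeIds [{}, {}])",
            fullyQualifiedTableName, bucketId, minWriteId, maxWriteID);
        return super.createRecordUpdater(partitionPath, bucketId, minWriteId, maxWriteID);
      }
    }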