HIVE-11972 : [Refactor] Improve determination of dynamic partitioning columns in FileSink Operator (Ashutosh Chauhan via Prasanth J)
Signed-off-by: Ashutosh Chauhan <hashut...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/24988f77 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/24988f77 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/24988f77 Branch: refs/heads/llap Commit: 24988f77f2898bbcd91f5665b865bcc251e3cade Parents: 522bb60 Author: Ashutosh Chauhan <hashut...@apache.org> Authored: Sat Sep 26 12:19:00 2015 -0800 Committer: Ashutosh Chauhan <hashut...@apache.org> Committed: Thu Oct 1 11:41:53 2015 -0700 ---------------------------------------------------------------------- .../hadoop/hive/ql/exec/FileSinkOperator.java | 19 +- .../apache/hadoop/hive/ql/exec/Utilities.java | 17 + .../optimizer/ConstantPropagateProcFactory.java | 11 +- .../hive/ql/optimizer/GenMapRedUtils.java | 10 +- .../hadoop/hive/ql/parse/SemanticAnalyzer.java | 30 +- .../hive/ql/plan/DynamicPartitionCtx.java | 27 -- .../hive/ql/exec/TestFileSinkOperator.java | 384 ++++++++++++------- 7 files changed, 284 insertions(+), 214 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/24988f77/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java index 2604d5d..39944a9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java @@ -493,24 +493,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements assert inputObjInspectors.length == 1 : "FileSinkOperator should have 1 parent, but it has " + inputObjInspectors.length; StructObjectInspector soi = (StructObjectInspector) inputObjInspectors[0]; - // remove the last dpMapping.size() columns from the OI - List<? extends StructField> fieldOI = soi.getAllStructFieldRefs(); - ArrayList<ObjectInspector> newFieldsOI = new ArrayList<ObjectInspector>(); - ArrayList<String> newFieldsName = new ArrayList<String>(); - this.dpStartCol = 0; - for (StructField sf : fieldOI) { - String fn = sf.getFieldName(); - if (!dpCtx.getInputToDPCols().containsKey(fn)) { - newFieldsOI.add(sf.getFieldObjectInspector()); - newFieldsName.add(sf.getFieldName()); - this.dpStartCol++; - } else { - // once we found the start column for partition column we are done - break; - } - } - assert newFieldsOI.size() > 0 : "new Fields ObjectInspector is empty"; - + this.dpStartCol = Utilities.getDPColOffset(conf); this.subSetOI = new SubStructObjectInspector(soi, 0, this.dpStartCol); this.dpVals = new ArrayList<String>(numDynParts); this.dpWritables = new ArrayList<Object>(numDynParts); http://git-wip-us.apache.org/repos/asf/hive/blob/24988f77/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index bcf85a4..5b21af9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -119,6 +119,7 @@ import org.apache.hadoop.hive.ql.exec.mr.MapRedTask; import org.apache.hadoop.hive.ql.exec.spark.SparkTask; import org.apache.hadoop.hive.ql.exec.tez.DagUtils; import org.apache.hadoop.hive.ql.exec.tez.TezTask; +import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.io.ContentSummaryInputFormat; import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils; import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat; @@ -3916,4 +3917,20 @@ public final class Utilities { HiveConf.setVar(conf, HiveConf.ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PASSWORD, ""); } } + + public static int getDPColOffset(FileSinkDesc conf) { + + if (conf.getWriteType() == AcidUtils.Operation.DELETE) { + // For deletes, there is only ROW__ID in non-partitioning, non-bucketing columns. + //See : UpdateDeleteSemanticAnalyzer::reparseAndSuperAnalyze() for details. + return 1; + } else if (conf.getWriteType() == AcidUtils.Operation.UPDATE) { + // For updates, ROW__ID is an extra column at index 0. + //See : UpdateDeleteSemanticAnalyzer::reparseAndSuperAnalyze() for details. + return getColumnNames(conf.getTableInfo().getProperties()).size() + 1; + } else { + return getColumnNames(conf.getTableInfo().getProperties()).size(); + } + + } } http://git-wip-us.apache.org/repos/asf/hive/blob/24988f77/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java index 5c6a6df..25156b2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java @@ -843,7 +843,7 @@ public final class ConstantPropagateProcFactory { } } if (constant.getTypeInfo().getCategory() != Category.PRIMITIVE) { - // nested complex types cannot be folded cleanly + // nested complex types cannot be folded cleanly return null; } Object value = constant.getValue(); @@ -1163,16 +1163,15 @@ public final class ConstantPropagateProcFactory { DynamicPartitionCtx dpCtx = fsdesc.getDynPartCtx(); if (dpCtx != null) { - // If all dynamic partitions are propagated as constant, remove DP. - Set<String> inputs = dpCtx.getInputToDPCols().keySet(); - // Assume only 1 parent for FS operator Operator<? extends Serializable> parent = op.getParentOperators().get(0); Map<ColumnInfo, ExprNodeDesc> parentConstants = cppCtx.getPropagatedConstants(parent); RowSchema rs = parent.getSchema(); boolean allConstant = true; - for (String input : inputs) { - ColumnInfo ci = rs.getColumnInfo(input); + int dpColStartIdx = Utilities.getDPColOffset(fsdesc); + List<ColumnInfo> colInfos = rs.getSignature(); + for (int i = dpColStartIdx; i < colInfos.size(); i++) { + ColumnInfo ci = colInfos.get(i); if (parentConstants.get(ci) == null) { allConstant = false; break; http://git-wip-us.apache.org/repos/asf/hive/blob/24988f77/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java index 02fbdfe..c696fd5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java @@ -497,9 +497,6 @@ public final class GenMapRedUtils { partsList = PartitionPruner.prune(tsOp, parseCtx, alias_id); } catch (SemanticException e) { throw e; - } catch (HiveException e) { - LOG.error(org.apache.hadoop.util.StringUtils.stringifyException(e)); - throw new SemanticException(e.getMessage(), e); } } @@ -990,7 +987,7 @@ public final class GenMapRedUtils { fileSinkOp.setParentOperators(Utilities.makeList(parent)); // Create a dummy TableScanOperator for the file generated through fileSinkOp - TableScanOperator tableScanOp = (TableScanOperator) createTemporaryTableScanOperator( + TableScanOperator tableScanOp = createTemporaryTableScanOperator( parent.getSchema()); // Connect this TableScanOperator to child. @@ -1235,19 +1232,16 @@ public final class GenMapRedUtils { // adding DP ColumnInfo to the RowSchema signature ArrayList<ColumnInfo> signature = inputRS.getSignature(); String tblAlias = fsInputDesc.getTableInfo().getTableName(); - LinkedHashMap<String, String> colMap = new LinkedHashMap<String, String>(); for (String dpCol : dpCtx.getDPColNames()) { ColumnInfo colInfo = new ColumnInfo(dpCol, TypeInfoFactory.stringTypeInfo, // all partition column type should be string tblAlias, true); // partition column is virtual column signature.add(colInfo); - colMap.put(dpCol, dpCol); // input and output have the same column name } inputRS.setSignature(signature); // create another DynamicPartitionCtx, which has a different input-to-DP column mapping DynamicPartitionCtx dpCtx2 = new DynamicPartitionCtx(dpCtx); - dpCtx2.setInputToDPCols(colMap); fsOutputDesc.setDynPartCtx(dpCtx2); // update the FileSinkOperator to include partition columns @@ -1896,7 +1890,7 @@ public final class GenMapRedUtils { "Partition Names, " + Arrays.toString(partNames) + " don't match partition Types, " + Arrays.toString(partTypes)); - Map<String, String> typeMap = new HashMap(); + Map<String, String> typeMap = new HashMap<>(); for (int i = 0; i < partNames.length; i++) { String previousValue = typeMap.put(partNames[i], partTypes[i]); Preconditions.checkArgument(previousValue == null, "Partition columns configuration is inconsistent. " http://git-wip-us.apache.org/repos/asf/hive/blob/24988f77/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index dbc6d8f..4bec228 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -736,7 +736,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { Path dataDir = null; if(!qb.getEncryptedTargetTablePaths().isEmpty()) { //currently only Insert into T values(...) is supported thus only 1 values clause - //and only 1 target table are possible. If/when support for + //and only 1 target table are possible. If/when support for //select ... from values(...) is added an insert statement may have multiple //encrypted target tables. dataDir = ctx.getMRTmpPath(qb.getEncryptedTargetTablePaths().get(0).toUri()); @@ -1556,7 +1556,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { for (String alias : tabAliases) { String tab_name = qb.getTabNameForAlias(alias); - + // we first look for this alias from CTE, and then from catalog. /* * if this s a CTE reference: Add its AST as a SubQuery to this QB. @@ -6830,30 +6830,6 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { .getColumnInfos()), input), rowResolver); input.setColumnExprMap(colExprMap); } - - rowFields = opParseCtx.get(input).getRowResolver() - .getColumnInfos(); - if (deleting()) { - // Figure out if we have partition columns in the list or not. If so, - // add them into the mapping. Partition columns will be located after the row id. - if (rowFields.size() > 1) { - // This means we have partition columns to deal with, so set up the mapping from the - // input to the partition columns. - dpCtx.mapInputToDP(rowFields.subList(1, rowFields.size())); - } - } else if (updating()) { - // In this case we expect the number of in fields to exceed the number of out fields by one - // (for the ROW__ID virtual column). If there are more columns than this, - // then the extras are for dynamic partitioning - if (dynPart && dpCtx != null) { - dpCtx.mapInputToDP(rowFields.subList(tableFields.size() + 1, rowFields.size())); - } - } else { - if (dynPart && dpCtx != null) { - // create the mapping from input ExprNode to dest table DP column - dpCtx.mapInputToDP(rowFields.subList(tableFields.size(), rowFields.size())); - } - } return input; } @@ -10105,7 +10081,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { return; } for (Node child : node.getChildren()) { - //each insert of multi insert looks like + //each insert of multi insert looks like //(TOK_INSERT (TOK_INSERT_INTO (TOK_TAB (TOK_TABNAME T1))) if (((ASTNode) child).getToken().getType() != HiveParser.TOK_INSERT) { continue; http://git-wip-us.apache.org/repos/asf/hive/blob/24988f77/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java index 24db7d0..95d5635 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java @@ -19,14 +19,11 @@ package org.apache.hadoop.hive.ql.plan; import java.io.Serializable; import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; import java.util.List; import java.util.Map; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.metastore.Warehouse; -import org.apache.hadoop.hive.ql.exec.ColumnInfo; import org.apache.hadoop.hive.ql.metadata.Table; public class DynamicPartitionCtx implements Serializable { @@ -43,8 +40,6 @@ public class DynamicPartitionCtx implements Serializable { private Path rootPath; // the root path DP columns paths start from private int numBuckets; // number of buckets in each partition - private Map<String, String> inputToDPCols; // mapping from input column names to DP columns - private List<String> spNames; // sp column names private List<String> dpNames; // dp column names private String defaultPartName; // default partition name in case of null or empty value @@ -71,7 +66,6 @@ public class DynamicPartitionCtx implements Serializable { } this.numDPCols = dpNames.size(); this.numSPCols = spNames.size(); - this.inputToDPCols = new HashMap<String, String>(); if (this.numSPCols > 0) { this.spPath = Warehouse.makeDynamicPartName(partSpec); } else { @@ -86,25 +80,12 @@ public class DynamicPartitionCtx implements Serializable { this.spPath = dp.spPath; this.rootPath = dp.rootPath; this.numBuckets = dp.numBuckets; - this.inputToDPCols = dp.inputToDPCols; this.spNames = dp.spNames; this.dpNames = dp.dpNames; this.defaultPartName = dp.defaultPartName; this.maxPartsPerNode = dp.maxPartsPerNode; } - public void mapInputToDP(List<ColumnInfo> fs) { - - assert fs.size() == this.numDPCols: "input DP column size != numDPCols"; - - Iterator<ColumnInfo> itr1 = fs.iterator(); - Iterator<String> itr2 = dpNames.iterator(); - - while (itr1.hasNext() && itr2.hasNext()) { - inputToDPCols.put(itr1.next().getInternalName(), itr2.next()); - } - } - public int getMaxPartitionsPerNode() { return this.maxPartsPerNode; } @@ -161,14 +142,6 @@ public class DynamicPartitionCtx implements Serializable { this.spNames = sp; } - public Map<String, String> getInputToDPCols() { - return this.inputToDPCols; - } - - public void setInputToDPCols(Map<String, String> map) { - this.inputToDPCols = map; - } - public void setNumDPCols(int dp) { this.numDPCols = dp; } http://git-wip-us.apache.org/repos/asf/hive/blob/24988f77/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java index c6ae030..9e89376 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java @@ -28,7 +28,6 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.StatsSetupConst; import org.apache.hadoop.hive.common.ValidTxnList; -import org.apache.hadoop.hive.common.ValidReadTxnList; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.ql.io.AcidInputFormat; @@ -45,12 +44,11 @@ import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.stats.StatsAggregator; import org.apache.hadoop.hive.ql.stats.StatsPublisher; import org.apache.hadoop.hive.serde.serdeConstants; -import org.apache.hadoop.hive.serde2.SerDe; +import org.apache.hadoop.hive.serde2.AbstractSerDe; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.SerDeStats; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; -import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; @@ -77,7 +75,6 @@ import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -94,8 +91,7 @@ public class TestFileSinkOperator { private static TableDesc nonAcidTableDescriptor; private static TableDesc acidTableDescriptor; private static ObjectInspector inspector; - private static List<TFSORow> rows; - private static ValidTxnList txnList; + private static List<Row> rows; private Path basePath; private JobConf jc; @@ -105,34 +101,33 @@ public class TestFileSinkOperator { Properties properties = new Properties(); properties.setProperty(serdeConstants.SERIALIZATION_LIB, TFSOSerDe.class.getName()); nonAcidTableDescriptor = new TableDesc(TFSOInputFormat.class, TFSOOutputFormat.class, properties); + properties.setProperty(serdeConstants.LIST_COLUMNS,"data"); properties = new Properties(properties); properties.setProperty(hive_metastoreConstants.BUCKET_COUNT, "1"); acidTableDescriptor = new TableDesc(TFSOInputFormat.class, TFSOOutputFormat.class, properties); - tmpdir = new File(System.getProperty("java.io.tmpdir") + System.getProperty("file.separator") + "testFileSinkOperator"); tmpdir.mkdir(); tmpdir.deleteOnExit(); - txnList = new ValidReadTxnList(new long[]{}, 2); } @Test public void testNonAcidWrite() throws Exception { setBasePath("write"); - setupData(DataFormat.SIMPLE); + setupData(DataFormat.WITH_PARTITION_VALUE); FileSinkOperator op = getFileSink(AcidUtils.Operation.NOT_ACID, false, 0); processRows(op); - confirmOutput(); + confirmOutput(DataFormat.WITH_PARTITION_VALUE); } @Test public void testInsert() throws Exception { setBasePath("insert"); - setupData(DataFormat.SIMPLE); + setupData(DataFormat.WITH_PARTITION_VALUE); FileSinkOperator op = getFileSink(AcidUtils.Operation.INSERT, false, 1); processRows(op); Assert.assertEquals("10", TFSOStatsPublisher.stats.get(StatsSetupConst.ROW_COUNT)); - confirmOutput(); + confirmOutput(DataFormat.WITH_PARTITION_VALUE); } @Test @@ -142,7 +137,7 @@ public class TestFileSinkOperator { FileSinkOperator op = getFileSink(AcidUtils.Operation.UPDATE, false, 2); processRows(op); Assert.assertEquals("0", TFSOStatsPublisher.stats.get(StatsSetupConst.ROW_COUNT)); - confirmOutput(); + confirmOutput(DataFormat.WITH_RECORD_ID); } @Test @@ -152,7 +147,7 @@ public class TestFileSinkOperator { FileSinkOperator op = getFileSink(AcidUtils.Operation.DELETE, false, 2); processRows(op); Assert.assertEquals("-10", TFSOStatsPublisher.stats.get(StatsSetupConst.ROW_COUNT)); - confirmOutput(); + confirmOutput(DataFormat.WITH_RECORD_ID); } @Test @@ -161,7 +156,7 @@ public class TestFileSinkOperator { setupData(DataFormat.WITH_PARTITION_VALUE); FileSinkOperator op = getFileSink(AcidUtils.Operation.NOT_ACID, true, 0); processRows(op); - confirmOutput(); + confirmOutput(DataFormat.WITH_PARTITION_VALUE); } @@ -174,7 +169,7 @@ public class TestFileSinkOperator { // We only expect 5 here because we'll get whichever of the partitions published its stats // last. Assert.assertEquals("5", TFSOStatsPublisher.stats.get(StatsSetupConst.ROW_COUNT)); - confirmOutput(); + confirmOutput(DataFormat.WITH_PARTITION_VALUE); } @Test @@ -184,19 +179,19 @@ public class TestFileSinkOperator { FileSinkOperator op = getFileSink(AcidUtils.Operation.UPDATE, true, 2); processRows(op); Assert.assertEquals("0", TFSOStatsPublisher.stats.get(StatsSetupConst.ROW_COUNT)); - confirmOutput(); + confirmOutput(DataFormat.WITH_RECORD_ID_AND_PARTITION_VALUE); } @Test public void testDeleteDynamicPartitioning() throws Exception { setBasePath("deleteDP"); - setupData(DataFormat.WITH_RECORD_ID_AND_PARTITION_VALUE); + setupData(DataFormat.WITH_RECORD_ID); FileSinkOperator op = getFileSink(AcidUtils.Operation.DELETE, true, 2); processRows(op); // We only expect -5 here because we'll get whichever of the partitions published its stats // last. Assert.assertEquals("-5", TFSOStatsPublisher.stats.get(StatsSetupConst.ROW_COUNT)); - confirmOutput(); + confirmOutput(DataFormat.WITH_RECORD_ID); } @@ -217,64 +212,52 @@ public class TestFileSinkOperator { } - private enum DataFormat {SIMPLE, WITH_RECORD_ID, WITH_PARTITION_VALUE, - WITH_RECORD_ID_AND_PARTITION_VALUE}; + private enum DataFormat {WITH_RECORD_ID, WITH_PARTITION_VALUE, WITH_RECORD_ID_AND_PARTITION_VALUE}; private void setupData(DataFormat format) { - // Build object inspector - inspector = ObjectInspectorFactory.getReflectionObjectInspector - (TFSORow.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA); - rows = new ArrayList<TFSORow>(); - + Class<?> rType; switch (format) { - case SIMPLE: - // Build rows - for (int i = 0; i < 10; i++) { - rows.add( - new TFSORow( - new Text("mary had a little lamb") - ) - ); - } + case WITH_PARTITION_VALUE: + rType = RowWithPartVal.class; break; - case WITH_RECORD_ID: - for (int i = 0; i < 10; i++) { - rows.add( - new TFSORow( - new Text("its fleect was white as snow"), - new RecordIdentifier(1, 1, i) - ) - ); - } + rType = RowWithRecID.class; break; - - case WITH_PARTITION_VALUE: - for (int i = 0; i < 10; i++) { - rows.add( - new TFSORow( - new Text("its fleect was white as snow"), - (i < 5) ? new Text("Monday") : new Text("Tuesday") - ) - ); - } - break; - case WITH_RECORD_ID_AND_PARTITION_VALUE: - for (int i = 0; i < 10; i++) { - rows.add( - new TFSORow( - new Text("its fleect was white as snow"), - (i < 5) ? new Text("Monday") : new Text("Tuesday"), - new RecordIdentifier(1, 1, i) - ) - ); - } + rType = RowWithPartNRecID.class; break; - default: - throw new RuntimeException("Unknown option!"); + throw new RuntimeException("Unknown type"); + } + inspector = ObjectInspectorFactory.getReflectionObjectInspector + (rType, ObjectInspectorFactory.ObjectInspectorOptions.JAVA); + + rows = new ArrayList<Row>(); + Row r; + for (int i = 0; i < 10; i++) { + switch (format) { + case WITH_PARTITION_VALUE: + r = + new RowWithPartVal( + new Text("mary had a little lamb"), + (i < 5) ? new Text("Monday") : new Text("Tuesday")); + break; + case WITH_RECORD_ID: + r = new RowWithRecID(new RecordIdentifier(1, 1, i), + (i < 5) ? new Text("Monday") : new Text("Tuesday")); + break; + case WITH_RECORD_ID_AND_PARTITION_VALUE: + r = new RowWithPartNRecID( + new Text("its fleect was white as snow"), + (i < 5) ? new Text("Monday") : new Text("Tuesday"), + new RecordIdentifier(1, 1, i)); + break; + default: + throw new RuntimeException("Unknown data format"); + } + rows.add(r); + } } @@ -300,9 +283,6 @@ public class TestFileSinkOperator { Map<String, String> partColMap= new LinkedHashMap<String, String>(1); partColMap.put(PARTCOL_NAME, null); DynamicPartitionCtx dpCtx = new DynamicPartitionCtx(null, partColMap, "Sunday", 100); - Map<String, String> partColNames = new HashMap<String, String>(1); - partColNames.put(PARTCOL_NAME, PARTCOL_NAME); - dpCtx.setInputToDPCols(partColNames); //todo: does this need the finalDestination? desc = new FileSinkDesc(basePath, tableDesc, false, 1, false, false, 1, 1, partCols, dpCtx, null); } else { @@ -320,27 +300,27 @@ public class TestFileSinkOperator { } private void processRows(FileSinkOperator op) throws HiveException { - for (TFSORow r : rows) op.process(r, 0); + for (Object r : rows) op.process(r, 0); op.jobCloseOp(jc, true); op.close(false); } - private void confirmOutput() throws IOException, SerDeException { + private void confirmOutput(DataFormat rType) throws IOException, SerDeException, CloneNotSupportedException { Path[] paths = findFilesInBasePath(); - TFSOInputFormat input = new TFSOInputFormat(); + TFSOInputFormat input = new TFSOInputFormat(rType); FileInputFormat.setInputPaths(jc, paths); InputSplit[] splits = input.getSplits(jc, 1); - RecordReader<NullWritable, TFSORow> reader = input.getRecordReader(splits[0], jc, + RecordReader<NullWritable, Row> reader = input.getRecordReader(splits[0], jc, Mockito.mock(Reporter.class)); NullWritable key = reader.createKey(); - TFSORow value = reader.createValue(); - List<TFSORow> results = new ArrayList<TFSORow>(rows.size()); - List<TFSORow> sortedRows = new ArrayList<TFSORow>(rows.size()); + Row value = reader.createValue(); + List<Row> results = new ArrayList<Row>(rows.size()); + List<Row> sortedRows = new ArrayList<Row>(rows.size()); for (int i = 0; i < rows.size(); i++) { Assert.assertTrue(reader.next(key, value)); - results.add(new TFSORow(value)); - sortedRows.add(new TFSORow(rows.get(i))); + results.add(value.clone()); + sortedRows.add(rows.get(i)); } Assert.assertFalse(reader.next(key, value)); Collections.sort(results); @@ -370,36 +350,172 @@ public class TestFileSinkOperator { } } - private static class TFSORow implements WritableComparable<TFSORow> { + public static interface Row extends WritableComparable<Row> { + + Row clone() throws CloneNotSupportedException; + } + + private static class RowWithRecID implements Row { + private RecordIdentifier recId; - private Text data; private Text partVal; - TFSORow() { - this(null, null, null); + public RowWithRecID() { + } + public RowWithRecID(RecordIdentifier recId, Text partVal) { + super(); + this.recId = recId; + this.partVal = partVal; } - TFSORow(Text t) { - this(t, null, null); + @Override + public + Row clone() throws CloneNotSupportedException { + return new RowWithRecID(this.recId, this.partVal); } - TFSORow(Text t, Text pv) { - this(t, pv, null); + @Override + public void write(DataOutput dataOutput) throws IOException { + if (partVal == null) { + dataOutput.writeBoolean(false); + } else { + dataOutput.writeBoolean(true); + partVal.write(dataOutput); + } + if (recId == null) { + dataOutput.writeBoolean(false); + } else { + dataOutput.writeBoolean(true); + recId.write(dataOutput); + } } + @Override + public void readFields(DataInput dataInput) throws IOException { + boolean notNull = dataInput.readBoolean(); + if (notNull) { + partVal = new Text(); + partVal.readFields(dataInput); + } + notNull = dataInput.readBoolean(); + if (notNull) { + recId = new RecordIdentifier(); + recId.readFields(dataInput); + } - TFSORow(Text t, RecordIdentifier ri) { - this(t, null, ri); } + @Override + public int compareTo(Row row) { + RowWithRecID other = (RowWithRecID) row; + if (recId == null && other.recId == null) { + return comparePartVal(other); + } else if (recId == null) { + return -1; + } else { + int rc = recId.compareTo(other.recId); + if (rc == 0) return comparePartVal(other); + else return rc; + } + } + private int comparePartVal(RowWithRecID other) { - TFSORow(Text t, Text pv, RecordIdentifier ri) { + return partVal.compareTo(other.partVal); + } + + @Override + public boolean equals(Object obj) { + return compareTo((RowWithRecID)obj) == 0; + } + } + private static class RowWithPartVal implements Row { + + public RowWithPartVal(Text data, Text partVal) { + super(); + this.data = data; + this.partVal = partVal; + } + + public RowWithPartVal() { + } + + private Text data; + private Text partVal; + + @Override + public Row clone() throws CloneNotSupportedException { + return new RowWithPartVal(this.data, this.partVal); + } + + @Override + public void write(DataOutput dataOutput) throws IOException { + data.write(dataOutput); + if (partVal == null) { + dataOutput.writeBoolean(false); + } else { + dataOutput.writeBoolean(true); + partVal.write(dataOutput); + } + } + + @Override + public void readFields(DataInput dataInput) throws IOException { + data = new Text(); + data.readFields(dataInput); + boolean notNull = dataInput.readBoolean(); + if (notNull) { + partVal = new Text(); + partVal.readFields(dataInput); + } + } + + @Override + public int compareTo(Row row) { + RowWithPartVal other = (RowWithPartVal) row; + if (partVal == null && other.partVal == null) { + return compareData(other); + } else if (partVal == null) { + return -1; + } else { + int rc = partVal.compareTo(other.partVal); + if (rc == 0) return compareData(other); + else return rc; + } + } + + private int compareData(RowWithPartVal other) { + if (data == null && other.data == null) return 0; + else if (data == null) return -1; + else return data.compareTo(other.data); + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof RowWithPartVal) { + RowWithPartVal other = (RowWithPartVal) obj; + return compareTo(other) == 0; + + } else { + return false; + } + } + } + private static class RowWithPartNRecID implements Row { + private RecordIdentifier recId; + private Text data; + private Text partVal; + + RowWithPartNRecID() { + this(null, null, null); + } + + RowWithPartNRecID(Text t, Text pv, RecordIdentifier ri) { data = t; partVal = pv; recId = ri; - } - TFSORow(TFSORow other) { - this(other.data, other.partVal, other.recId); + @Override + public RowWithPartNRecID clone() throws CloneNotSupportedException { + return new RowWithPartNRecID(this.data, this.partVal, this.recId); } @Override @@ -437,8 +553,8 @@ public class TestFileSinkOperator { @Override public boolean equals(Object obj) { - if (obj instanceof TFSORow) { - TFSORow other = (TFSORow) obj; + if (obj instanceof RowWithPartNRecID) { + RowWithPartNRecID other = (RowWithPartNRecID) obj; if (data == null && other.data == null) return checkPartVal(other); else if (data == null) return false; else if (data.equals(other.data)) return checkPartVal(other); @@ -448,21 +564,22 @@ public class TestFileSinkOperator { } } - private boolean checkPartVal(TFSORow other) { + private boolean checkPartVal(RowWithPartNRecID other) { if (partVal == null && other.partVal == null) return checkRecId(other); else if (partVal == null) return false; else if (partVal.equals(other.partVal)) return checkRecId(other); else return false; } - private boolean checkRecId(TFSORow other) { + private boolean checkRecId(RowWithPartNRecID other) { if (recId == null && other.recId == null) return true; else if (recId == null) return false; else return recId.equals(other.recId); } @Override - public int compareTo(TFSORow other) { + public int compareTo(Row row) { + RowWithPartNRecID other = (RowWithPartNRecID) row; if (recId == null && other.recId == null) { return comparePartVal(other); } else if (recId == null) { @@ -474,7 +591,7 @@ public class TestFileSinkOperator { } } - private int comparePartVal(TFSORow other) { + private int comparePartVal(RowWithPartNRecID other) { if (partVal == null && other.partVal == null) { return compareData(other); } else if (partVal == null) { @@ -486,21 +603,26 @@ public class TestFileSinkOperator { } } - private int compareData(TFSORow other) { + private int compareData(RowWithPartNRecID other) { if (data == null && other.data == null) return 0; else if (data == null) return -1; else return data.compareTo(other.data); } } - private static class TFSOInputFormat extends FileInputFormat<NullWritable, TFSORow> - implements AcidInputFormat<NullWritable, TFSORow> { + private static class TFSOInputFormat extends FileInputFormat<NullWritable, Row> + implements AcidInputFormat<NullWritable, Row> { FSDataInputStream in[] = null; int readingFrom = -1; + DataFormat rType; + + public TFSOInputFormat(DataFormat rType) { + this.rType = rType; + } @Override - public RecordReader<NullWritable, TFSORow> getRecordReader( + public RecordReader<NullWritable, Row> getRecordReader( InputSplit inputSplit, JobConf entries, Reporter reporter) throws IOException { if (in == null) { Path paths[] = FileInputFormat.getInputPaths(entries); @@ -511,10 +633,10 @@ public class TestFileSinkOperator { } readingFrom = 0; } - return new RecordReader<NullWritable, TFSORow>() { + return new RecordReader<NullWritable, Row>() { @Override - public boolean next(NullWritable nullWritable, TFSORow tfsoRecord) throws + public boolean next(NullWritable nullWritable, Row tfsoRecord) throws IOException { try { tfsoRecord.readFields(in[readingFrom]); @@ -532,8 +654,18 @@ public class TestFileSinkOperator { } @Override - public TFSORow createValue() { - return new TFSORow(); + public Row createValue() { + switch (rType) { + case WITH_RECORD_ID_AND_PARTITION_VALUE: + return new RowWithPartNRecID(); + case WITH_PARTITION_VALUE: + return new RowWithPartVal(); + case WITH_RECORD_ID: + return new RowWithRecID(); + + default: + throw new RuntimeException("Unknown row Type"); + } } @Override @@ -554,14 +686,14 @@ public class TestFileSinkOperator { } @Override - public RowReader<TFSORow> getReader(InputSplit split, + public RowReader<Row> getReader(InputSplit split, Options options) throws IOException { return null; } @Override - public RawReader<TFSORow> getRawReader(Configuration conf, + public RawReader<Row> getRawReader(Configuration conf, boolean collapseEvents, int bucket, ValidTxnList validTxnList, @@ -578,9 +710,9 @@ public class TestFileSinkOperator { } } - public static class TFSOOutputFormat extends FileOutputFormat<NullWritable, TFSORow> - implements AcidOutputFormat<NullWritable, TFSORow> { - List<TFSORow> records = new ArrayList<TFSORow>(); + public static class TFSOOutputFormat extends FileOutputFormat<NullWritable, Row> + implements AcidOutputFormat<NullWritable, Row> { + List<Row> records = new ArrayList<>(); long numRecordsAdded = 0; FSDataOutputStream out = null; @@ -588,7 +720,6 @@ public class TestFileSinkOperator { public RecordUpdater getRecordUpdater(final Path path, final Options options) throws IOException { - final StructObjectInspector inspector = (StructObjectInspector)options.getInspector(); return new RecordUpdater() { @Override public void insert(long currentTransaction, Object row) throws IOException { @@ -608,9 +739,8 @@ public class TestFileSinkOperator { } private void addRow(Object row) { - assert row instanceof TFSORow : "Expected TFSORow but got " + - row.getClass().getName(); - records.add((TFSORow)row); + assert row instanceof Row : "Expected Row but got " + row.getClass().getName(); + records.add((Row)row); } @Override @@ -619,7 +749,7 @@ public class TestFileSinkOperator { FileSystem fs = path.getFileSystem(options.getConfiguration()); out = fs.create(path); } - for (TFSORow r : records) r.write(out); + for (Writable r : records) r.write(out); records.clear(); out.flush(); } @@ -657,8 +787,8 @@ public class TestFileSinkOperator { return new FileSinkOperator.RecordWriter() { @Override public void write(Writable w) throws IOException { - Assert.assertTrue(w instanceof TFSORow); - records.add((TFSORow) w); + Assert.assertTrue(w instanceof Row); + records.add((Row)w); } @Override @@ -667,7 +797,7 @@ public class TestFileSinkOperator { FileSystem fs = finalOutPath.getFileSystem(jc); out = fs.create(finalOutPath); } - for (TFSORow r : records) r.write(out); + for (Writable r : records) r.write(out); records.clear(); out.flush(); out.close(); @@ -676,7 +806,7 @@ public class TestFileSinkOperator { } @Override - public RecordWriter<NullWritable, TFSORow> getRecordWriter( + public RecordWriter<NullWritable, Row> getRecordWriter( FileSystem fileSystem, JobConf entries, String s, Progressable progressable) throws IOException { return null; @@ -688,7 +818,7 @@ public class TestFileSinkOperator { } } - public static class TFSOSerDe implements SerDe { + public static class TFSOSerDe extends AbstractSerDe { @Override public void initialize(Configuration conf, Properties tbl) throws SerDeException { @@ -697,20 +827,18 @@ public class TestFileSinkOperator { @Override public Class<? extends Writable> getSerializedClass() { - return TFSORow.class; + return RowWithPartNRecID.class; } @Override public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDeException { - assert obj instanceof TFSORow : "Expected TFSORow or decendent, got " - + obj.getClass().getName(); - return (TFSORow)obj; + assert obj instanceof Row : "Expected Row or decendent, got " + obj.getClass().getName(); + return (Row)obj; } @Override public Object deserialize(Writable blob) throws SerDeException { - assert blob instanceof TFSORow : "Expected TFSORow or decendent, got " - + blob.getClass().getName(); + assert blob instanceof Row : "Expected Row or decendent, got "+ blob.getClass().getName(); return blob; }