PHOENIX-3406 CSV BulkLoad MR job incorrectly handles ROW_TIMESTAMP
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/cb120162 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/cb120162 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/cb120162 Branch: refs/heads/4.x-HBase-1.1 Commit: cb12016206c0b589b9781d5cb06555ab276d7d9a Parents: 6fcf5bb Author: Sergey Soldatov <s...@apache.org> Authored: Tue Oct 25 14:09:54 2016 -0700 Committer: Sergey Soldatov <s...@apache.org> Committed: Tue Sep 5 12:45:46 2017 -0700 ---------------------------------------------------------------------- .../phoenix/end2end/CsvBulkLoadToolIT.java | 38 ++++++++++++++++++++ .../mapreduce/FormatToBytesWritableMapper.java | 1 + .../mapreduce/FormatToKeyValueReducer.java | 7 ++-- 3 files changed, 43 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/cb120162/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java index 5a186a0..40fe900 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java @@ -92,6 +92,44 @@ public class CsvBulkLoadToolIT extends BaseOwnClusterIT { rs.close(); stmt.close(); } + @Test + public void testImportWithRowTimestamp() throws Exception { + + Statement stmt = conn.createStatement(); + stmt.execute("CREATE TABLE S.TABLE9 (ID INTEGER NOT NULL , NAME VARCHAR, T DATE NOT NULL," + + " " + + "CONSTRAINT PK PRIMARY KEY (ID, T ROW_TIMESTAMP))"); + + FileSystem fs = FileSystem.get(getUtility().getConfiguration()); + FSDataOutputStream outputStream = fs.create(new Path("/tmp/input1.csv")); + 
PrintWriter printWriter = new PrintWriter(outputStream); + printWriter.println("1,Name 1,1970/01/01"); + printWriter.println("2,Name 2,1971/01/01"); + printWriter.println("3,Name 2,1972/01/01"); + printWriter.close(); + + CsvBulkLoadTool csvBulkLoadTool = new CsvBulkLoadTool(); + csvBulkLoadTool.setConf(new Configuration(getUtility().getConfiguration())); + csvBulkLoadTool.getConf().set(DATE_FORMAT_ATTRIB,"yyyy/MM/dd"); + int exitCode = csvBulkLoadTool.run(new String[] { + "--input", "/tmp/input1.csv", + "--table", "table9", + "--schema", "s", + "--zookeeper", zkQuorum}); + assertEquals(0, exitCode); + + ResultSet rs = stmt.executeQuery("SELECT id, name, t FROM s.table9 WHERE T < to_date" + + "('1972-01-01') AND T > to_date('1970-01-01') ORDER BY id"); + assertTrue(rs.next()); + assertEquals(2, rs.getInt(1)); + assertEquals("Name 2", rs.getString(2)); + assertEquals(DateUtil.parseDate("1971-01-01"), rs.getDate(3)); + assertFalse(rs.next()); + + rs.close(); + stmt.close(); + } + @Test public void testImportWithTabs() throws Exception { http://git-wip-us.apache.org/repos/asf/phoenix/blob/cb120162/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java index 1dae981..360859e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java @@ -314,6 +314,7 @@ public abstract class FormatToBytesWritableMapper<RECORD> extends Mapper<LongWri continue; } outputStream.writeByte(cell.getTypeByte()); + WritableUtils.writeVLong(outputStream,cell.getTimestamp()); WritableUtils.writeVInt(outputStream, i); WritableUtils.writeVInt(outputStream, 
cell.getValueLength()); outputStream.write(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()); http://git-wip-us.apache.org/repos/asf/phoenix/blob/cb120162/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java index 07cf285..72af1a7 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java @@ -144,6 +144,7 @@ public class FormatToKeyValueReducer DataInputStream input = new DataInputStream(new ByteArrayInputStream(aggregatedArray.get())); while (input.available() != 0) { byte type = input.readByte(); + long timestamp = WritableUtils.readVLong(input); int index = WritableUtils.readVInt(input); ImmutableBytesWritable family; ImmutableBytesWritable cq; @@ -161,10 +162,10 @@ public class FormatToKeyValueReducer KeyValue.Type kvType = KeyValue.Type.codeToType(type); switch (kvType) { case Put: // not null value - kv = builder.buildPut(key.getRowkey(), family, cq, value); + kv = builder.buildPut(key.getRowkey(), family, cq, timestamp, value); break; case DeleteColumn: // null value - kv = builder.buildDeleteColumns(key.getRowkey(), family, cq); + kv = builder.buildDeleteColumns(key.getRowkey(), family, cq, timestamp); break; default: throw new IOException("Unsupported KeyValue type " + kvType); @@ -180,4 +181,4 @@ public class FormatToKeyValueReducer if (++index % 100 == 0) context.setStatus("Wrote " + index); } } -} \ No newline at end of file +}