marton-bod commented on a change in pull request #2512:
URL: https://github.com/apache/hive/pull/2512#discussion_r677298347



##########
File path: common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
##########
@@ -474,6 +474,7 @@
   NULL_TREATMENT_NOT_SUPPORTED(10426, "Function {0} does not support null treatment.", true),
   DATACONNECTOR_ALREADY_EXISTS(10427, "Dataconnector {0} already exists", true),
   DATACONNECTOR_NOT_EXISTS(10428, "Dataconnector does not exist:"),
+  TIME_TRAVEL_NOT_ALLOWED(10429, "Time travel is not allowed for {0}. Please chose a storage format which supports the feature.", true),

Review comment:
       typo: choose
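For reference, a minimal sketch of how the corrected template would render, assuming `ErrorMsg` fills `{0}` the same way `java.text.MessageFormat` does (an assumption; `TimeTravelMessageSketch` is a hypothetical name):

```java
import java.text.MessageFormat;

public class TimeTravelMessageSketch {
  // Corrected template: "choose" instead of "chose".
  static final String TIME_TRAVEL_NOT_ALLOWED =
      "Time travel is not allowed for {0}. Please choose a storage format which supports the feature.";

  // Substitutes the table alias into the {0} placeholder.
  static String render(String alias) {
    return MessageFormat.format(TIME_TRAVEL_NOT_ALLOWED, alias);
  }

  public static void main(String[] args) {
    System.out.println(render("tbl_orc"));
  }
}
```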

##########
File path: iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
##########
@@ -2271,6 +2274,116 @@ public void testStatWithPartitionedCTAS() {
     checkColStat("target", "dept");
   }
 
+  @Test
+  public void testAsOfTimestamp() throws IOException, InterruptedException {
+    Table table = prepareTableWithVersions(2);
+
+    List<Object[]> rows = shell.executeStatement(
+        "SELECT * FROM customers FOR SYSTEM_TIME AS OF '" + timestampAfterSnapshot(table, 0) + "'");
+
+    Assert.assertEquals(3, rows.size());
+
+    rows = shell.executeStatement(
+        "SELECT * FROM customers FOR SYSTEM_TIME AS OF '" + timestampAfterSnapshot(table, 1) + "'");
+
+    Assert.assertEquals(4, rows.size());
+
+    AssertHelpers.assertThrows("should throw exception", IllegalArgumentException.class,
+        "Cannot find a snapshot older than 1970-01-01 00:00:00", () -> {
+          shell.executeStatement("SELECT * FROM customers FOR SYSTEM_TIME AS OF '1970-01-01 00:00:00'");
+        });
+  }
+
+  @Test
+  public void testAsOfVersion() throws IOException, InterruptedException {
+    Table table = prepareTableWithVersions(2);
+
+    HistoryEntry first = table.history().get(0);
+    List<Object[]> rows =
+        shell.executeStatement("SELECT * FROM customers FOR SYSTEM_VERSION AS OF " + first.snapshotId());
+
+    Assert.assertEquals(3, rows.size());
+
+    HistoryEntry second = table.history().get(1);
+    rows = shell.executeStatement("SELECT * FROM customers FOR SYSTEM_VERSION AS OF " + second.snapshotId());
+
+    Assert.assertEquals(4, rows.size());
+
+    AssertHelpers.assertThrows("should throw exception", IllegalArgumentException.class,
+        "Cannot find snapshot with ID 1234", () -> {
+          shell.executeStatement("SELECT * FROM customers FOR SYSTEM_VERSION AS OF 1234");
+        });
+  }
+
+  @Test
+  public void testAsOfTimestampWithJoins() throws IOException, InterruptedException {
+    Table table = prepareTableWithVersions(4);
+
+    List<Object[]> rows = shell.executeStatement("SELECT * FROM " +
+        "customers FOR SYSTEM_TIME AS OF '" + timestampAfterSnapshot(table, 0) + "' fv, " +
+        "customers FOR SYSTEM_TIME AS OF '" + timestampAfterSnapshot(table, 1) + "' sv " +
+        "WHERE fv.first_name=sv.first_name");
+
+    Assert.assertEquals(4, rows.size());
+
+    rows = shell.executeStatement("SELECT * FROM " +
+         "customers FOR SYSTEM_TIME AS OF '" + timestampAfterSnapshot(table, 1) + "' sv, " +
+         "customers FOR SYSTEM_TIME AS OF '" + timestampAfterSnapshot(table, 2) + "' tv " +
+         "WHERE sv.first_name=tv.first_name");
+
+    Assert.assertEquals(8, rows.size());
+
+    rows = shell.executeStatement("SELECT * FROM " +
+        "customers FOR SYSTEM_TIME AS OF '" + timestampAfterSnapshot(table, 2) + "' sv, " +
+        "customers lv " +
+        "WHERE sv.first_name=lv.first_name");
+
+    Assert.assertEquals(14, rows.size());
+  }
+
+  /**
+   * Creates the 'customers' table with the default records and creates extra snapshots by inserting one more line
+   * into the table.
+   * @param versions The number of snapshots we want to create

Review comment:
       nit: don't we also create a snapshot when the table is initially created (so we end up with versions + 1 snapshots in the end)?
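To make the snapshot arithmetic concrete: judging from the loop bound `versions - 1` and the test assertions (3 rows at snapshot 0, 4 at snapshot 1), `prepareTableWithVersions(n)` seems to end up with `n` snapshots, snapshot `k` holding 3 + k rows. The sketch below models that and recomputes the expected sizes of the joins in `testAsOfTimestampWithJoins`. It assumes (not visible in the diff) that the default records contain exactly one `Alice` plus two other distinct first names; the placeholder names and the class are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

public class SnapshotJoinSketch {
  // Assumed first names of the default records: one "Alice" and two other distinct names (placeholders).
  static final String[] DEFAULT_FIRST_NAMES = {"Alice", "Other1", "Other2"};

  // Snapshots after prepareTableWithVersions(versions): one for the create, plus (versions - 1) inserts.
  static int snapshotCount(int versions) {
    return 1 + (versions - 1);
  }

  // First-name frequencies visible at snapshot k: the defaults plus k inserted "Alice" rows.
  static Map<String, Integer> namesAt(int snapshot) {
    Map<String, Integer> counts = new HashMap<>();
    for (String name : DEFAULT_FIRST_NAMES) {
      counts.merge(name, 1, Integer::sum);
    }
    counts.merge("Alice", snapshot, Integer::sum);
    return counts;
  }

  // Size of an equi-join on first_name: sum over names of leftCount * rightCount.
  static int joinSize(int leftSnapshot, int rightSnapshot) {
    Map<String, Integer> left = namesAt(leftSnapshot);
    Map<String, Integer> right = namesAt(rightSnapshot);
    int rows = 0;
    for (Map.Entry<String, Integer> e : left.entrySet()) {
      rows += e.getValue() * right.getOrDefault(e.getKey(), 0);
    }
    return rows;
  }

  public static void main(String[] args) {
    System.out.println(snapshotCount(4)); // 4
    System.out.println(joinSize(0, 1));   // 4
    System.out.println(joinSize(1, 2));   // 8
    System.out.println(joinSize(2, 3));   // 14 (snapshot 3 is the latest one)
  }
}
```

Under those assumptions the counts line up with the 4 / 8 / 14 assertions, which suggests the javadoc's "number of snapshots" wording is accurate as long as the initial create is counted.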

##########
File path: service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
##########
@@ -494,7 +494,7 @@ public RowSet getNextRowSet(FetchOrientation orientation, long maxRows)
       }
       return rowSet;
     } catch (Exception e) {
-      throw new HiveSQLException("Unable to get the next row set", e);
+      throw new HiveSQLException("Unable to get the next row set with exception: " + e.getMessage(), e);

Review comment:
       This is very much needed, thanks! :)
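A minimal sketch of what the change buys: the cause's message becomes part of the top-level error text that clients see, while the cause is still attached. `java.sql.SQLException` is used here only as a stand-in for `HiveSQLException`, and the helper name is hypothetical. Note that `getMessage()` may return `null`, which renders literally as `null`.

```java
import java.sql.SQLException;

public class RowSetErrorSketch {
  // Wraps a failure so the cause's message is visible in the top-level message,
  // while keeping the cause itself for the stack trace.
  static SQLException wrap(Exception e) {
    return new SQLException("Unable to get the next row set with exception: " + e.getMessage(), e);
  }

  public static void main(String[] args) {
    SQLException wrapped = wrap(new IllegalArgumentException("Cannot find snapshot with ID 1234"));
    System.out.println(wrapped.getMessage());
    // A cause without a message renders as "... with exception: null".
    System.out.println(wrap(new RuntimeException()).getMessage());
  }
}
```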

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
##########
@@ -1261,4 +1275,19 @@ public boolean equalsWithIgnoreWriteId(Table tbl ) {
     return result;
   }
 
+  public long getAsOfVersion() {
+    return asOfVersion;
+  }
+
+  public void setAsOfVersion(long asOfVersion) {

Review comment:
       nit: not that I want to complicate our own lives, but shall we consider making the version field a string to be more generic, in case other storage handlers implement this in the future, e.g. with git-like hash versions? Then we could accept any string and simply convert it to a long if it's an Iceberg query. If we set in stone now that the version must be a number at the syntax level, amending that later would become a breaking change. But it's just food for thought; again, I don't want to complicate life either.
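A minimal sketch of that idea, with all names hypothetical: the metadata object keeps the version as an opaque string, and only an Iceberg-style consumer converts it to a numeric snapshot ID, rejecting anything else with a clear message.

```java
public class AsOfVersionSketch {
  // Hypothetical: the FOR SYSTEM_VERSION AS OF value kept as an opaque string (null when absent).
  private String asOfVersion;

  public String getAsOfVersion() {
    return asOfVersion;
  }

  public void setAsOfVersion(String asOfVersion) {
    this.asOfVersion = asOfVersion;
  }

  // Hypothetical Iceberg-side conversion: snapshot IDs are longs, anything else is rejected.
  static long toSnapshotId(String version) {
    try {
      return Long.parseLong(version);
    } catch (NumberFormatException e) {
      throw new IllegalArgumentException("Invalid snapshot ID for time travel: " + version, e);
    }
  }

  public static void main(String[] args) {
    AsOfVersionSketch table = new AsOfVersionSketch();
    table.setAsOfVersion("1234");
    System.out.println(toSnapshotId(table.getAsOfVersion()));
  }
}
```

A handler with hash-like versions could then consume the same string field without any grammar change.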

##########
File path: iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
##########
@@ -2271,6 +2274,116 @@ public void testStatWithPartitionedCTAS() {
+    AssertHelpers.assertThrows("should throw exception", IllegalArgumentException.class,

Review comment:
       What happens if we provide a timestamp in the far future? Will it 
default to the current snapshot? 
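For comparison, a sketch of the usual as-of-time semantics, similar in spirit to Iceberg's `SnapshotUtil.snapshotIdAsOfTime` but not copied from it: pick the latest snapshot committed at or before the requested time, and fail if there is none (which matches the "Cannot find a snapshot older than ..." message in the test). Under these semantics a far-future timestamp resolves to the current snapshot; a test would confirm whether the Hive path behaves the same. `Entry` is a hypothetical stand-in for a history entry.

```java
import java.util.Arrays;
import java.util.List;

public class AsOfTimeSketch {
  // Hypothetical stand-in for a table history entry.
  static final class Entry {
    final long snapshotId;
    final long timestampMillis;

    Entry(long snapshotId, long timestampMillis) {
      this.snapshotId = snapshotId;
      this.timestampMillis = timestampMillis;
    }
  }

  // Returns the ID of the latest snapshot committed at or before asOfMillis.
  // The history is assumed to be ordered by commit time, oldest first.
  static long snapshotIdAsOf(List<Entry> history, long asOfMillis) {
    Long found = null;
    for (Entry e : history) {
      if (e.timestampMillis <= asOfMillis) {
        found = e.snapshotId;
      }
    }
    if (found == null) {
      throw new IllegalArgumentException("Cannot find a snapshot older than " + asOfMillis);
    }
    return found;
  }

  public static void main(String[] args) {
    List<Entry> history = Arrays.asList(new Entry(11L, 1_000L), new Entry(22L, 2_000L));
    System.out.println(snapshotIdAsOf(history, 1_500L));         // 11
    System.out.println(snapshotIdAsOf(history, Long.MAX_VALUE)); // 22, i.e. the current snapshot
  }
}
```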

##########
File path: iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
##########
@@ -2271,6 +2274,116 @@ public void testStatWithPartitionedCTAS() {
+  /**
+   * Creates the 'customers' table with the default records and creates extra snapshots by inserting one more line
+   * into the table.
+   * @param versions The number of snapshots we want to create
+   * @return The table created
+   * @throws IOException When there is a problem during table creation
+   * @throws InterruptedException When there is a problem during adding new data to the table
+   */
+  private Table prepareTableWithVersions(int versions) throws IOException, InterruptedException {
+    Table table = testTables.createTable(shell, "customers", HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
+        fileFormat, HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);
+
+    for (int i = 0; i < versions - 1; ++i) {
+      // Just wait a little so we definitely will not have the same timestamp for the snapshots
+      Thread.sleep(100);
+      shell.executeStatement("INSERT INTO customers values(" +
+          (i + HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS.size()) + ",'Alice','Green_" + i + "')");
+    }
+
+    table.refresh();
+
+    return table;
+  }
+
+  /**
+   * Get the timestamp string which we can use in the queries. The timestamp will be after the given snapshot
+   * and before the next one
+   * @param table The table which we want to query
+   * @param snapshotPosition The position of the last snapshot we want to see in the query results
+   * @return The timestamp which we can use in the queries
+   */
+  private String timestampAfterSnapshot(Table table, int snapshotPosition) {
+    List<HistoryEntry> history = table.history();
+    long snapshotTime = history.get(snapshotPosition).timestampMillis();
+    long time = snapshotTime + 100;
+    if (history.size() > snapshotPosition + 1) {
+      time = snapshotTime + ((history.get(snapshotPosition + 1).timestampMillis() - snapshotTime) / 2);
+    }
+
+    SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS000000");

Review comment:
       Shall we factor this into a constant somewhere, or have a util method for this conversion? I think we'll need it for the metadata table output too.
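One possible shape for such a util, sketched with `java.time` instead of `SimpleDateFormat` (the latter is not thread-safe, which matters once the formatter becomes a shared constant). The class and method names are hypothetical, and the zone has to match whatever zone the query side uses to parse the literal. `SSSSSSSSS` prints nanoseconds, so millisecond values get the same six trailing zeros as the `SSS000000` pattern.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public final class TimeTravelTimestamps {
  // Hypothetical shared constant; DateTimeFormatter is immutable and thread-safe.
  public static final DateTimeFormatter TIMESTAMP_FORMAT =
      DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSSSSS");

  private TimeTravelTimestamps() {
  }

  // Renders epoch millis as a FOR SYSTEM_TIME AS OF literal in the given zone.
  public static String format(long epochMillis, ZoneId zone) {
    return TIMESTAMP_FORMAT.format(Instant.ofEpochMilli(epochMillis).atZone(zone));
  }

  public static void main(String[] args) {
    System.out.println(format(1_234L, ZoneId.of("UTC"))); // 1970-01-01 00:00:01.234000000
  }
}
```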

##########
File path: iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
##########
@@ -2271,6 +2274,116 @@ public void testStatWithPartitionedCTAS() {
+    rows = shell.executeStatement("SELECT * FROM " +
+        "customers FOR SYSTEM_TIME AS OF '" + timestampAfterSnapshot(table, 2) + "' sv, " +
+        "customers lv " +
+        "WHERE sv.first_name=lv.first_name");
+
+    Assert.assertEquals(14, rows.size());

Review comment:
       Can we add one more test case where we query one table in the join by timestamp and the other by version?
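A sketch of the query such a test could run, reusing the data set of `testAsOfTimestampWithJoins`: one side pinned by timestamp, the other by snapshot ID. The helper is hypothetical. With the timestamp taken after snapshot 0 and the ID of snapshot 1, I'd expect the same 4 rows as the first join in that test, but that is an expectation to verify, not a measured result.

```java
public class MixedAsOfJoinSketch {
  // Hypothetical helper: a self-join where one side is read as of a timestamp
  // and the other as of a snapshot ID.
  static String mixedAsOfJoin(String timestamp, long snapshotId) {
    return "SELECT * FROM "
        + "customers FOR SYSTEM_TIME AS OF '" + timestamp + "' fv, "
        + "customers FOR SYSTEM_VERSION AS OF " + snapshotId + " sv "
        + "WHERE fv.first_name=sv.first_name";
  }

  public static void main(String[] args) {
    System.out.println(mixedAsOfJoin("2021-07-27 10:00:00.000000000", 1234L));
  }
}
```

In the test itself the arguments would come from `timestampAfterSnapshot(table, 0)` and `table.history().get(1).snapshotId()`.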

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
##########
@@ -2223,6 +2237,22 @@ private void getMetaData(QB qb, ReadEntity parentInput)
         }
       }
 
+      Pair<String, String> asOf = qb.getAsOfForAlias(alias);
+      if (asOf != null) {
+        if (!Optional.ofNullable(tab.getStorageHandler()).map(HiveStorageHandler::isTimeTravelAllowed).orElse(false)) {
+          throw new SemanticException(ErrorMsg.TIME_TRAVEL_NOT_ALLOWED, alias);
+        }
+        if (asOf.getLeft() != null) {
+          tab.setAsOfVersion(Long.parseLong(asOf.getLeft()));
+        }
+        if (asOf.getRight() != null) {
+          ZoneId timeZone = SessionState.get() == null ? new HiveConf().getLocalTimeZone() :
+                                 SessionState.get().getConf().getLocalTimeZone();
+          TimestampTZ ts = TimestampTZUtil.parse(stripQuotes(asOf.getRight()), timeZone);

Review comment:
       So this means that the timestamp the user passes into the query will be interpreted in the user's local time zone (as opposed to UTC)?
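If the literal is parsed in the session's local time zone, the same string maps to different instants for users in different zones. A small illustration with plain `java.time` calls (not Hive's `TimestampTZUtil`; the zone is an arbitrary example):

```java
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class AsOfZoneSketch {
  static final DateTimeFormatter LITERAL = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

  // Interprets a zone-less timestamp literal in the given zone.
  static Instant interpret(String literal, ZoneId zone) {
    return LocalDateTime.parse(literal, LITERAL).atZone(zone).toInstant();
  }

  public static void main(String[] args) {
    String literal = "2021-07-27 10:00:00";
    Instant asUtc = interpret(literal, ZoneId.of("UTC"));
    Instant asBudapest = interpret(literal, ZoneId.of("Europe/Budapest"));
    // Budapest is UTC+2 in July, so the same text denotes an instant 2 hours earlier.
    System.out.println(Duration.between(asBudapest, asUtc).toHours()); // 2
  }
}
```

So two users running the same query text could time-travel to different snapshots, which is probably worth documenting whichever way it is decided.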

##########
File path: iceberg/iceberg-handler/src/test/queries/negative/timetravel_by_time_non_iceberg.q
##########
@@ -0,0 +1,2 @@
+create table tbl_orc (a int, b string);

Review comment:
       Do we have positive qtests for Iceberg tables too?

##########
File path: iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
##########
@@ -2271,6 +2274,116 @@ public void testStatWithPartitionedCTAS() {
+    SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS000000");

Review comment:
       Also, maybe one more test where we insert into a table using time travel? Not sure it adds value, but I can imagine CTAS being used with time travel frequently.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


