lcspinter commented on code in PR #3222:
URL: https://github.com/apache/hive/pull/3222#discussion_r853859878
##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java:
##########
@@ -207,6 +218,39 @@ public RecordReader<Void, T> createRecordReader(InputSplit split, TaskAttemptCon
return new IcebergRecordReader<>();
}
+ private static TableScan scanWithTimeRange(Table table, Configuration conf, TableScan scan, long fromTime) {
+ // let's find the corresponding snapshot ID - if the fromTime is before the table creation happened, let's use
+ // the first snapshot of the table
+ long fromSnapshot = IcebergTableUtil.findSnapshotForTimestamp(table, fromTime)
+ .orElseGet(() -> table.history().get(0).snapshotId());
+ if (fromSnapshot == table.currentSnapshot().snapshotId()) {
+ throw new IllegalArgumentException(
+ "Provided FROM timestamp must be earlier than the latest snapshot of the table.");
+ }
+ long toTime = conf.getLong(InputFormatConfig.TO_TIMESTAMP, -1);
+ if (toTime != -1) {
+ if (fromTime >= toTime) {
Review Comment:
I think we can move this check to the beginning of the method to save some
execution time: it only depends on fromTime and toTime, so it can run before
the snapshot lookup.
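A rough sketch of the fail-fast ordering, not the actual patch. The class name, the helper name `validateTimeRange` and the `UNSET` constant are made up for illustration; only the `-1` default and the error message come from the diff above.

```java
class TimeRangeCheck {
  // Mirrors the -1 default used with conf.getLong(InputFormatConfig.TO_TIMESTAMP, -1) in the diff.
  static final long UNSET = -1L;

  // Hypothetical helper: validates the range using only the two timestamps,
  // so it can run before any snapshot lookup is attempted.
  public static void validateTimeRange(long fromTime, long toTime) {
    if (toTime != UNSET && fromTime >= toTime) {
      throw new IllegalArgumentException(
          "Provided FROM timestamp must precede the provided TO timestamp.");
    }
  }

  public static void main(String[] args) {
    validateTimeRange(100L, UNSET); // no TO timestamp configured: nothing to check
    validateTimeRange(100L, 200L);  // FROM precedes TO: accepted
    try {
      validateTimeRange(200L, 100L); // FROM after TO: rejected before any lookup
    } catch (IllegalArgumentException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```

With this in place, scanWithTimeRange would call the check first and only then resolve fromSnapshot and toSnapshot.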
##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java:
##########
@@ -163,4 +165,32 @@ public static void updateSpec(Configuration configuration, Table table) {
public static boolean isBucketed(Table table) {
return table.spec().fields().stream().anyMatch(f -> f.transform().toString().startsWith("bucket["));
}
+
+ /**
+ * Returns the snapshot ID which is immediately before (or exactly at) the timestamp provided in millis.
+ * If the timestamp provided is before the first snapshot of the table, we return an empty optional.
+ * If the timestamp provided is in the future compared to the latest snapshot, we return the latest snapshot ID.
+ *
+ * E.g.: if we have snapshots S1, S2, S3 committed at times T3, T6, T9 respectively (T0 = start of epoch), then:
+ * - from T0 to T2 -> returns empty
+ * - from T3 to T5 -> returns S1
+ * - from T6 to T8 -> returns S2
+ * - from T9 to T∞ -> returns S3
+ *
+ * @param table the table whose snapshot ID we are trying to find
+ * @param time the timestamp provided in milliseconds
+ * @return the snapshot ID corresponding to the time
+ */
+ public static Optional<Long> findSnapshotForTimestamp(Table table, long time) {
+ if (table.history().get(0).timestampMillis() > time) {
+ return Optional.empty();
+ }
+
+ for (Snapshot snapshot : table.snapshots()) {
Review Comment:
Are we certain that table.snapshots() returns the snapshots sorted by commit
time?
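If the ordering turns out not to be guaranteed, one option is a lookup that does not depend on iteration order at all. A minimal sketch, using a made-up `Snap` class as a stand-in for the real snapshot type (only an ID and a commit timestamp), and the S1/S2/S3 example from the Javadoc above:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

class SnapshotLookup {
  // Hypothetical stand-in for a table snapshot: just an ID and a commit time.
  static final class Snap {
    final long id;
    final long timestampMillis;

    Snap(long id, long timestampMillis) {
      this.id = id;
      this.timestampMillis = timestampMillis;
    }
  }

  // Returns the ID of the snapshot with the greatest commit time <= time,
  // or empty if every snapshot is newer. Makes no assumption about input order.
  public static Optional<Long> findSnapshotForTimestamp(Iterable<Snap> snapshots, long time) {
    Snap best = null;
    for (Snap s : snapshots) {
      if (s.timestampMillis <= time && (best == null || s.timestampMillis > best.timestampMillis)) {
        best = s;
      }
    }
    return best == null ? Optional.empty() : Optional.of(best.id);
  }

  public static void main(String[] args) {
    // S1, S2, S3 committed at T3, T6, T9, deliberately listed out of order.
    List<Snap> snaps = Arrays.asList(new Snap(3L, 9L), new Snap(1L, 3L), new Snap(2L, 6L));
    for (long t : new long[] {2L, 5L, 8L, 100L}) {
      System.out.println("T" + t + " -> " + findSnapshotForTimestamp(snaps, t));
    }
  }
}
```

This is a single linear pass, so it costs about the same as the loop in the patch while dropping the sortedness assumption.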
##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java:
##########
@@ -207,6 +218,39 @@ public RecordReader<Void, T> createRecordReader(InputSplit split, TaskAttemptCon
return new IcebergRecordReader<>();
}
+ private static TableScan scanWithTimeRange(Table table, Configuration conf, TableScan scan, long fromTime) {
+ // let's find the corresponding snapshot ID - if the fromTime is before the table creation happened, let's use
+ // the first snapshot of the table
+ long fromSnapshot = IcebergTableUtil.findSnapshotForTimestamp(table, fromTime)
+ .orElseGet(() -> table.history().get(0).snapshotId());
+ if (fromSnapshot == table.currentSnapshot().snapshotId()) {
+ throw new IllegalArgumentException(
+ "Provided FROM timestamp must be earlier than the latest snapshot of the table.");
+ }
+ long toTime = conf.getLong(InputFormatConfig.TO_TIMESTAMP, -1);
Review Comment:
nit: Can we pass toTime in as a method parameter instead of reading it from
conf here?
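Roughly what I have in mind, as a sketch only. The `Map` stands in for the Hadoop Configuration, the `"to.timestamp"` key is a placeholder for InputFormatConfig.TO_TIMESTAMP, and the methods return a description string instead of a TableScan so the example stays self-contained:

```java
import java.util.HashMap;
import java.util.Map;

class ScanRangeParams {
  static final long UNSET = -1L;
  // Placeholder key, standing in for InputFormatConfig.TO_TIMESTAMP.
  static final String TO_TIMESTAMP = "to.timestamp";

  // The range logic only sees plain values; it never touches the configuration.
  public static String describeRange(long fromTime, long toTime) {
    if (toTime == UNSET) {
      return "appendsAfter(" + fromTime + ")";
    }
    return "appendsBetween(" + fromTime + ", " + toTime + ")";
  }

  // The caller is the single place that reads the configuration.
  public static String plan(Map<String, Long> conf, long fromTime) {
    long toTime = conf.getOrDefault(TO_TIMESTAMP, UNSET);
    return describeRange(fromTime, toTime);
  }

  public static void main(String[] args) {
    Map<String, Long> conf = new HashMap<>();
    System.out.println(plan(conf, 5L));
    conf.put(TO_TIMESTAMP, 9L);
    System.out.println(plan(conf, 5L));
  }
}
```

Keeping the config read in the caller also makes the range method easier to unit test, since it no longer needs a Configuration object.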
##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java:
##########
@@ -207,6 +218,39 @@ public RecordReader<Void, T> createRecordReader(InputSplit split, TaskAttemptCon
return new IcebergRecordReader<>();
}
+ private static TableScan scanWithTimeRange(Table table, Configuration conf, TableScan scan, long fromTime) {
+ // let's find the corresponding snapshot ID - if the fromTime is before the table creation happened, let's use
+ // the first snapshot of the table
+ long fromSnapshot = IcebergTableUtil.findSnapshotForTimestamp(table, fromTime)
+ .orElseGet(() -> table.history().get(0).snapshotId());
+ if (fromSnapshot == table.currentSnapshot().snapshotId()) {
+ throw new IllegalArgumentException(
+ "Provided FROM timestamp must be earlier than the latest snapshot of the table.");
+ }
+ long toTime = conf.getLong(InputFormatConfig.TO_TIMESTAMP, -1);
+ if (toTime != -1) {
+ if (fromTime >= toTime) {
+ throw new IllegalArgumentException(
+ "Provided FROM timestamp must precede the provided TO timestamp.");
+ }
+ long toSnapshot = IcebergTableUtil.findSnapshotForTimestamp(table, toTime)
+ .orElseThrow(() -> new IllegalArgumentException(
+ "Provided TO timestamp must be after the first snapshot of the table."));
+ return scan.appendsBetween(fromSnapshot, toSnapshot);
+ } else {
+ return scan.appendsAfter(fromSnapshot);
+ }
+ }
+
+ private static TableScan scanWithVersionRange(Configuration conf, TableScan scan, long fromSnapshot) {
+ long toSnapshot = conf.getLong(InputFormatConfig.TO_VERSION, -1);
Review Comment:
nit: Can we pass toSnapshot in as a method parameter instead of reading it
from conf here?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]