[
https://issues.apache.org/jira/browse/GOBBLIN-1669?focusedWorklogId=793053&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-793053
]
ASF GitHub Bot logged work on GOBBLIN-1669:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 20/Jul/22 07:01
Start Date: 20/Jul/22 07:01
Worklog Time Spent: 10m
Work Description: phet commented on code in PR #3528:
URL: https://github.com/apache/gobblin/pull/3528#discussion_r925243204
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDataset.java:
##########
@@ -49,14 +52,11 @@ public class TimeAwareRecursiveCopyableDataset extends
RecursiveCopyableDataset
private final String lookbackTime;
private final String datePattern;
private final Period lookbackPeriod;
- private final boolean isPatternDaily;
- private final boolean isPatternHourly;
- private final boolean isPatternMinutely;
private final LocalDateTime currentTime;
- private final DatePattern pattern;
+ private final DatePattern patternQualifier;
enum DatePattern {
- MINUTELY, HOURLY, DAILY
+ SECONDLY, MINUTELY, HOURLY, DAILY
Review Comment:
I can't think of better names (and understand how these result from applying
a general rule), but I do stumble when reading these according to the common
defs, "of the second thing" and "in fine detail" (!) ;)
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDataset.java:
##########
@@ -108,6 +101,9 @@ public LocalDateTime next() {
LocalDateTime dateTime = startDate;
switch (datePattern) {
+ case SECONDLY:
+ startDate = startDate.plusSeconds(1);
+ break;
case MINUTELY:
startDate = startDate.plusMinutes(1);
Review Comment:
it's interesting to see how firmly tied this finder is to specific time
semantics (especially time zones). if e.g. the dir producer were not timezone
aware (or used a broken/presumed calculation) to create a date folder like
2:30am on the day the finder's TZ actually has it "springing forward" (from 2am
-> 3am) due to DST, this finder would miss that folder. probably not a big
risk, all things considered, but worth noting
##########
gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDatasetTest.java:
##########
@@ -216,6 +223,40 @@ public void testGetFilesAtPath() throws IOException {
Assert.assertTrue(candidateFiles.contains(PathUtils.getPathWithoutSchemeAndAuthority(fileStatus.getPath()).toString()));
}
+ // test ds of daily/yyyy-MM-dd-HH-mm-ss
+ datePattern = "yyyy-MM-dd-HH-mm-ss";
+ formatter = DateTimeFormat.forPattern(datePattern);
+ endDate =
LocalDateTime.now(DateTimeZone.forID(TimeAwareRecursiveCopyableDataset.DEFAULT_DATE_PATTERN_TIMEZONE));
+
+ candidateFiles = new HashSet<>();
+ for (int i = 0; i < MAX_NUM_DAILY_DIRS; i++) {
+ String startDate =
endDate.minusDays(i).withMinuteOfHour(random.nextInt(60)).withSecondOfMinute(random.nextInt(60)).toString(formatter);
Review Comment:
does using random values mean the test is not necessarily deterministic?
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDataset.java:
##########
@@ -128,55 +124,83 @@ public void remove() {
}
}
- private boolean isDatePatternHourly(String datePattern) {
- DateTimeFormatter formatter = DateTimeFormat.forPattern(datePattern);
- LocalDateTime refDateTime = new LocalDateTime(2017, 01, 01, 10, 0, 0);
- String refDateTimeString = refDateTime.toString(formatter);
- LocalDateTime refDateTimeAtStartOfDay = refDateTime.withHourOfDay(0);
- String refDateTimeStringAtStartOfDay =
refDateTimeAtStartOfDay.toString(formatter);
- return !refDateTimeString.equals(refDateTimeStringAtStartOfDay);
- }
-
- private boolean isDatePatternMinutely(String datePattern) {
+ private DatePattern validateLookbackWithDatePatternFormat(String
datePattern, String lookbackTime) {
DateTimeFormatter formatter = DateTimeFormat.forPattern(datePattern);
- LocalDateTime refDateTime = new LocalDateTime(2017, 01, 01, 10, 59, 0);
+ LocalDateTime refDateTime = new LocalDateTime(2017, 01, 31, 10, 59, 59);
String refDateTimeString = refDateTime.toString(formatter);
- LocalDateTime refDateTimeAtStartOfHour = refDateTime.withMinuteOfHour(0);
- String refDateTimeStringAtStartOfHour =
refDateTimeAtStartOfHour.toString(formatter);
- return !refDateTimeString.equals(refDateTimeStringAtStartOfHour);
- }
+ PeriodFormatterBuilder formatterBuilder;
- private boolean isLookbackTimeStringDaily(String lookbackTime) {
- PeriodFormatter periodFormatter = new
PeriodFormatterBuilder().appendDays().appendSuffix("d").toFormatter();
- try {
- periodFormatter.parsePeriod(lookbackTime);
- return true;
- } catch (Exception e) {
- return false;
+ // Validate that the lookback is supported for the time format
+ if
(!refDateTime.withSecondOfMinute(0).toString(formatter).equals(refDateTimeString))
{
+ formatterBuilder = new
PeriodFormatterBuilder().appendDays().appendSuffix("d").appendHours().appendSuffix("h").appendMinutes().appendSuffix("m").appendSeconds().appendSuffix("s");
+ if (!lookbackTimeMatchesFormat(formatterBuilder, lookbackTime)) {
+ throw new IllegalArgumentException(String.format("Expected lookback
time to be in daily or hourly or minutely or secondly format, check %s",
LOOKBACK_TIME_KEY));
+ }
+ return DatePattern.SECONDLY;
+ } else if
(!refDateTime.withMinuteOfHour(0).toString(formatter).equals(refDateTimeString))
{
+ formatterBuilder = new
PeriodFormatterBuilder().appendDays().appendSuffix("d").appendHours().appendSuffix("h").appendMinutes().appendSuffix("m");
+ if (!lookbackTimeMatchesFormat(formatterBuilder, lookbackTime)) {
+ throw new IllegalArgumentException(String.format("Expected lookback
time to be in daily or hourly or minutely format, check %s",
LOOKBACK_TIME_KEY));
+ }
+ return DatePattern.MINUTELY;
+ } else if
(!refDateTime.withHourOfDay(0).toString(formatter).equals(refDateTimeString)) {
+ formatterBuilder = new
PeriodFormatterBuilder().appendDays().appendSuffix("d").appendHours().appendSuffix("h");
+ if (!lookbackTimeMatchesFormat(formatterBuilder, lookbackTime)) {
+ throw new IllegalArgumentException(String.format("Expected lookback
time to be in daily or hourly format, check %s", LOOKBACK_TIME_KEY));
+ }
+ return DatePattern.HOURLY;
+ } else if
(!refDateTime.withDayOfMonth(1).toString(formatter).equals(refDateTimeString) )
{
+ formatterBuilder = new
PeriodFormatterBuilder().appendDays().appendSuffix("d");
+ if (!lookbackTimeMatchesFormat(formatterBuilder, lookbackTime)) {
+ throw new IllegalArgumentException(String.format("Expected lookback
time to be in daily format, check %s", LOOKBACK_TIME_KEY));
+ }
+ return DatePattern.DAILY;
}
+ return null;
}
- private boolean isLookbackTimeStringHourly(String lookbackTime) {
- PeriodFormatter periodFormatter = new
PeriodFormatterBuilder().appendDays().appendSuffix("d").appendHours().appendSuffix("h").toFormatter();
+ private boolean lookbackTimeMatchesFormat(PeriodFormatterBuilder
formatterBuilder, String lookbackTime) {
try {
- periodFormatter.parsePeriod(lookbackTime);
- return true;
- } catch (Exception e) {
+ formatterBuilder.toFormatter().parsePeriod(lookbackTime);
+ } catch (IllegalArgumentException e) {
return false;
}
+ return true;
}
@Override
protected List<FileStatus> getFilesAtPath(FileSystem fs, Path path,
PathFilter fileFilter) throws IOException {
- DateTimeFormatter formatter = DateTimeFormat.forPattern(datePattern);
+ DateTimeFormatter formatter = DateTimeFormat.forPattern(this.datePattern);
LocalDateTime endDate = currentTime;
LocalDateTime startDate = endDate.minus(this.lookbackPeriod);
-
- DateRangeIterator dateRangeIterator = new DateRangeIterator(startDate,
endDate, pattern);
List<FileStatus> fileStatuses = Lists.newArrayList();
- while (dateRangeIterator.hasNext()) {
- Path pathWithDateTime = new Path(path,
dateRangeIterator.next().toString(formatter));
- fileStatuses.addAll(super.getFilesAtPath(fs, pathWithDateTime,
fileFilter));
+
+ // Data inside of nested folders representing timestamps need to be
fetched differently
+ if (datePattern.contains(FileSystems.getDefault().getSeparator())) {
+ // Use an iterator that traverses through all times from lookback to
current time, based on format
+ DateRangeIterator dateRangeIterator = new DateRangeIterator(startDate,
endDate, this.patternQualifier);
+ while (dateRangeIterator.hasNext()) {
+ Path pathWithDateTime = new Path(path,
dateRangeIterator.next().toString(formatter));
+ if (!fs.exists(pathWithDateTime)) {
+ continue;
+ }
+ fileStatuses.addAll(super.getFilesAtPath(fs, pathWithDateTime,
fileFilter));
+ }
+ } else {
+ // Look at the top level directories and compare if those fit into the
date format
+ Iterator<FileStatus> folderIterator =
Arrays.asList(fs.listStatus(path)).iterator();
+ while (folderIterator.hasNext()) {
+ Path folderPath = folderIterator.next().getPath();
+ String datePath =
folderPath.getName().split("/")[folderPath.getName().split("/").length -1];
+ try {
+ LocalDateTime folderDate = formatter.parseLocalDateTime(datePath);
+ if (folderDate.isAfter(startDate) && folderDate.isBefore(endDate)) {
Review Comment:
the iterator based approach was `!isAfter(endDate)`, therefore wouldn't
equivalent here be "is before or equal to" `endDate`?
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDataset.java:
##########
@@ -128,55 +124,83 @@ public void remove() {
}
}
- private boolean isDatePatternHourly(String datePattern) {
- DateTimeFormatter formatter = DateTimeFormat.forPattern(datePattern);
- LocalDateTime refDateTime = new LocalDateTime(2017, 01, 01, 10, 0, 0);
- String refDateTimeString = refDateTime.toString(formatter);
- LocalDateTime refDateTimeAtStartOfDay = refDateTime.withHourOfDay(0);
- String refDateTimeStringAtStartOfDay =
refDateTimeAtStartOfDay.toString(formatter);
- return !refDateTimeString.equals(refDateTimeStringAtStartOfDay);
- }
-
- private boolean isDatePatternMinutely(String datePattern) {
+ private DatePattern validateLookbackWithDatePatternFormat(String
datePattern, String lookbackTime) {
DateTimeFormatter formatter = DateTimeFormat.forPattern(datePattern);
- LocalDateTime refDateTime = new LocalDateTime(2017, 01, 01, 10, 59, 0);
+ LocalDateTime refDateTime = new LocalDateTime(2017, 01, 31, 10, 59, 59);
String refDateTimeString = refDateTime.toString(formatter);
- LocalDateTime refDateTimeAtStartOfHour = refDateTime.withMinuteOfHour(0);
- String refDateTimeStringAtStartOfHour =
refDateTimeAtStartOfHour.toString(formatter);
- return !refDateTimeString.equals(refDateTimeStringAtStartOfHour);
- }
+ PeriodFormatterBuilder formatterBuilder;
- private boolean isLookbackTimeStringDaily(String lookbackTime) {
- PeriodFormatter periodFormatter = new
PeriodFormatterBuilder().appendDays().appendSuffix("d").toFormatter();
- try {
- periodFormatter.parsePeriod(lookbackTime);
- return true;
- } catch (Exception e) {
- return false;
+ // Validate that the lookback is supported for the time format
+ if
(!refDateTime.withSecondOfMinute(0).toString(formatter).equals(refDateTimeString))
{
+ formatterBuilder = new
PeriodFormatterBuilder().appendDays().appendSuffix("d").appendHours().appendSuffix("h").appendMinutes().appendSuffix("m").appendSeconds().appendSuffix("s");
+ if (!lookbackTimeMatchesFormat(formatterBuilder, lookbackTime)) {
+ throw new IllegalArgumentException(String.format("Expected lookback
time to be in daily or hourly or minutely or secondly format, check %s",
LOOKBACK_TIME_KEY));
+ }
+ return DatePattern.SECONDLY;
+ } else if
(!refDateTime.withMinuteOfHour(0).toString(formatter).equals(refDateTimeString))
{
+ formatterBuilder = new
PeriodFormatterBuilder().appendDays().appendSuffix("d").appendHours().appendSuffix("h").appendMinutes().appendSuffix("m");
+ if (!lookbackTimeMatchesFormat(formatterBuilder, lookbackTime)) {
+ throw new IllegalArgumentException(String.format("Expected lookback
time to be in daily or hourly or minutely format, check %s",
LOOKBACK_TIME_KEY));
+ }
+ return DatePattern.MINUTELY;
+ } else if
(!refDateTime.withHourOfDay(0).toString(formatter).equals(refDateTimeString)) {
+ formatterBuilder = new
PeriodFormatterBuilder().appendDays().appendSuffix("d").appendHours().appendSuffix("h");
+ if (!lookbackTimeMatchesFormat(formatterBuilder, lookbackTime)) {
+ throw new IllegalArgumentException(String.format("Expected lookback
time to be in daily or hourly format, check %s", LOOKBACK_TIME_KEY));
+ }
+ return DatePattern.HOURLY;
+ } else if
(!refDateTime.withDayOfMonth(1).toString(formatter).equals(refDateTimeString) )
{
+ formatterBuilder = new
PeriodFormatterBuilder().appendDays().appendSuffix("d");
+ if (!lookbackTimeMatchesFormat(formatterBuilder, lookbackTime)) {
+ throw new IllegalArgumentException(String.format("Expected lookback
time to be in daily format, check %s", LOOKBACK_TIME_KEY));
+ }
+ return DatePattern.DAILY;
}
+ return null;
}
- private boolean isLookbackTimeStringHourly(String lookbackTime) {
- PeriodFormatter periodFormatter = new
PeriodFormatterBuilder().appendDays().appendSuffix("d").appendHours().appendSuffix("h").toFormatter();
+ private boolean lookbackTimeMatchesFormat(PeriodFormatterBuilder
formatterBuilder, String lookbackTime) {
try {
- periodFormatter.parsePeriod(lookbackTime);
- return true;
- } catch (Exception e) {
+ formatterBuilder.toFormatter().parsePeriod(lookbackTime);
+ } catch (IllegalArgumentException e) {
return false;
}
+ return true;
}
@Override
protected List<FileStatus> getFilesAtPath(FileSystem fs, Path path,
PathFilter fileFilter) throws IOException {
- DateTimeFormatter formatter = DateTimeFormat.forPattern(datePattern);
+ DateTimeFormatter formatter = DateTimeFormat.forPattern(this.datePattern);
LocalDateTime endDate = currentTime;
LocalDateTime startDate = endDate.minus(this.lookbackPeriod);
-
- DateRangeIterator dateRangeIterator = new DateRangeIterator(startDate,
endDate, pattern);
List<FileStatus> fileStatuses = Lists.newArrayList();
- while (dateRangeIterator.hasNext()) {
- Path pathWithDateTime = new Path(path,
dateRangeIterator.next().toString(formatter));
- fileStatuses.addAll(super.getFilesAtPath(fs, pathWithDateTime,
fileFilter));
+
+ // Data inside of nested folders representing timestamps need to be
fetched differently
+ if (datePattern.contains(FileSystems.getDefault().getSeparator())) {
+ // Use an iterator that traverses through all times from lookback to
current time, based on format
+ DateRangeIterator dateRangeIterator = new DateRangeIterator(startDate,
endDate, this.patternQualifier);
+ while (dateRangeIterator.hasNext()) {
+ Path pathWithDateTime = new Path(path,
dateRangeIterator.next().toString(formatter));
+ if (!fs.exists(pathWithDateTime)) {
Review Comment:
wow, that's a lot of FS fstat access: from 1440 when by-minute to 86400 when
by-second--and that's just each day! I'm sure we all agree "premature
optimization is the root of all evil"... but still I have serious concerns that
such scanning will exceed the acceptable load on the host FS. worst would be
if users default to this as a way to 'keep their options open', even when in
practice they don't require nearly such fine-grained differentiation between
the dirs they create.
(clearly my suggestions here imply a fundamental rethinking of the impl...)
first off I note that the iterator's stride is always just one. a
sequential scan of the entire space is generally unnecessary merely to test
interval membership. I expect that was the same intuition leading you to
develop the `else` case (when no dir separator detected)... I'm just suggesting
you go farther.
overall my expectation is of an infrequent/sparse hit-rate from this
probing, since most producers won't be creating a new snapshot/partition
every/most seconds or every/most minutes. if borne out, it would likely be far
more efficient (although somewhat more complex to implement) targeted directory
child listing, with filtering of the results, as opposed to constructing every
possible concrete embodiment, only to find the vast majority not actually to
exist.
if you feel such concerns are warranted, I'm happy to suggest
simple/efficient implementation for targeted dir listing
Issue Time Tracking
-------------------
Worklog Id: (was: 793053)
Time Spent: 1h (was: 50m)
> Support seconds with TimeAwareRecursiveCopyableDataset
> ------------------------------------------------------
>
> Key: GOBBLIN-1669
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1669
> Project: Apache Gobblin
> Issue Type: Improvement
> Components: gobblin-service
> Reporter: William Lo
> Assignee: Abhishek Tiwari
> Priority: Major
> Time Spent: 1h
> Remaining Estimate: 0h
>
> # Support seconds with the timeiterator
> # Optimize non-nested timestamp representations e.g. yyyy-mm-dd-hh-mm-ss to
> not use an iterator, and instead list the top level directory to reduce the
> number of FS calls.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)