[
https://issues.apache.org/jira/browse/GOBBLIN-1669?focusedWorklogId=793466&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-793466
]
ASF GitHub Bot logged work on GOBBLIN-1669:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 20/Jul/22 23:11
Start Date: 20/Jul/22 23:11
Worklog Time Spent: 10m
Work Description: Will-Lo commented on code in PR #3528:
URL: https://github.com/apache/gobblin/pull/3528#discussion_r926122575
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/TimeAwareRecursiveCopyableDataset.java:
##########
@@ -128,55 +124,83 @@ public void remove() {
}
}
- private boolean isDatePatternHourly(String datePattern) {
- DateTimeFormatter formatter = DateTimeFormat.forPattern(datePattern);
- LocalDateTime refDateTime = new LocalDateTime(2017, 01, 01, 10, 0, 0);
- String refDateTimeString = refDateTime.toString(formatter);
- LocalDateTime refDateTimeAtStartOfDay = refDateTime.withHourOfDay(0);
- String refDateTimeStringAtStartOfDay =
refDateTimeAtStartOfDay.toString(formatter);
- return !refDateTimeString.equals(refDateTimeStringAtStartOfDay);
- }
-
- private boolean isDatePatternMinutely(String datePattern) {
+ private DatePattern validateLookbackWithDatePatternFormat(String
datePattern, String lookbackTime) {
DateTimeFormatter formatter = DateTimeFormat.forPattern(datePattern);
- LocalDateTime refDateTime = new LocalDateTime(2017, 01, 01, 10, 59, 0);
+ LocalDateTime refDateTime = new LocalDateTime(2017, 01, 31, 10, 59, 59);
String refDateTimeString = refDateTime.toString(formatter);
- LocalDateTime refDateTimeAtStartOfHour = refDateTime.withMinuteOfHour(0);
- String refDateTimeStringAtStartOfHour =
refDateTimeAtStartOfHour.toString(formatter);
- return !refDateTimeString.equals(refDateTimeStringAtStartOfHour);
- }
+ PeriodFormatterBuilder formatterBuilder;
- private boolean isLookbackTimeStringDaily(String lookbackTime) {
- PeriodFormatter periodFormatter = new
PeriodFormatterBuilder().appendDays().appendSuffix("d").toFormatter();
- try {
- periodFormatter.parsePeriod(lookbackTime);
- return true;
- } catch (Exception e) {
- return false;
+ // Validate that the lookback is supported for the time format
+ if
(!refDateTime.withSecondOfMinute(0).toString(formatter).equals(refDateTimeString))
{
+ formatterBuilder = new
PeriodFormatterBuilder().appendDays().appendSuffix("d").appendHours().appendSuffix("h").appendMinutes().appendSuffix("m").appendSeconds().appendSuffix("s");
+ if (!lookbackTimeMatchesFormat(formatterBuilder, lookbackTime)) {
+ throw new IllegalArgumentException(String.format("Expected lookback
time to be in daily or hourly or minutely or secondly format, check %s",
LOOKBACK_TIME_KEY));
+ }
+ return DatePattern.SECONDLY;
+ } else if
(!refDateTime.withMinuteOfHour(0).toString(formatter).equals(refDateTimeString))
{
+ formatterBuilder = new
PeriodFormatterBuilder().appendDays().appendSuffix("d").appendHours().appendSuffix("h").appendMinutes().appendSuffix("m");
+ if (!lookbackTimeMatchesFormat(formatterBuilder, lookbackTime)) {
+ throw new IllegalArgumentException(String.format("Expected lookback
time to be in daily or hourly or minutely format, check %s",
LOOKBACK_TIME_KEY));
+ }
+ return DatePattern.MINUTELY;
+ } else if
(!refDateTime.withHourOfDay(0).toString(formatter).equals(refDateTimeString)) {
+ formatterBuilder = new
PeriodFormatterBuilder().appendDays().appendSuffix("d").appendHours().appendSuffix("h");
+ if (!lookbackTimeMatchesFormat(formatterBuilder, lookbackTime)) {
+ throw new IllegalArgumentException(String.format("Expected lookback
time to be in daily or hourly format, check %s", LOOKBACK_TIME_KEY));
+ }
+ return DatePattern.HOURLY;
+ } else if
(!refDateTime.withDayOfMonth(1).toString(formatter).equals(refDateTimeString) )
{
+ formatterBuilder = new
PeriodFormatterBuilder().appendDays().appendSuffix("d");
+ if (!lookbackTimeMatchesFormat(formatterBuilder, lookbackTime)) {
+ throw new IllegalArgumentException(String.format("Expected lookback
time to be in daily format, check %s", LOOKBACK_TIME_KEY));
+ }
+ return DatePattern.DAILY;
}
+ return null;
}
- private boolean isLookbackTimeStringHourly(String lookbackTime) {
- PeriodFormatter periodFormatter = new
PeriodFormatterBuilder().appendDays().appendSuffix("d").appendHours().appendSuffix("h").toFormatter();
+ private boolean lookbackTimeMatchesFormat(PeriodFormatterBuilder
formatterBuilder, String lookbackTime) {
try {
- periodFormatter.parsePeriod(lookbackTime);
- return true;
- } catch (Exception e) {
+ formatterBuilder.toFormatter().parsePeriod(lookbackTime);
+ } catch (IllegalArgumentException e) {
return false;
}
+ return true;
}
@Override
protected List<FileStatus> getFilesAtPath(FileSystem fs, Path path,
PathFilter fileFilter) throws IOException {
- DateTimeFormatter formatter = DateTimeFormat.forPattern(datePattern);
+ DateTimeFormatter formatter = DateTimeFormat.forPattern(this.datePattern);
LocalDateTime endDate = currentTime;
LocalDateTime startDate = endDate.minus(this.lookbackPeriod);
-
- DateRangeIterator dateRangeIterator = new DateRangeIterator(startDate,
endDate, pattern);
List<FileStatus> fileStatuses = Lists.newArrayList();
- while (dateRangeIterator.hasNext()) {
- Path pathWithDateTime = new Path(path,
dateRangeIterator.next().toString(formatter));
- fileStatuses.addAll(super.getFilesAtPath(fs, pathWithDateTime,
fileFilter));
+
+ // Data inside of nested folders representing timestamps need to be
fetched differently
+ if (datePattern.contains(FileSystems.getDefault().getSeparator())) {
+ // Use an iterator that traverses through all times from lookback to
current time, based on format
+ DateRangeIterator dateRangeIterator = new DateRangeIterator(startDate,
endDate, this.patternQualifier);
+ while (dateRangeIterator.hasNext()) {
+ Path pathWithDateTime = new Path(path,
dateRangeIterator.next().toString(formatter));
+ if (!fs.exists(pathWithDateTime)) {
+ continue;
+ }
+ fileStatuses.addAll(super.getFilesAtPath(fs, pathWithDateTime,
fileFilter));
+ }
+ } else {
+ // Look at the top level directories and compare if those fit into the
date format
+ Iterator<FileStatus> folderIterator =
Arrays.asList(fs.listStatus(path)).iterator();
+ while (folderIterator.hasNext()) {
+ Path folderPath = folderIterator.next().getPath();
+ String datePath =
folderPath.getName().split("/")[folderPath.getName().split("/").length -1];
+ try {
+ LocalDateTime folderDate = formatter.parseLocalDateTime(datePath);
+ if (folderDate.isAfter(startDate) && folderDate.isBefore(endDate)) {
Review Comment:
Yes, you're right; to handle this I had to round down the startDate.
Issue Time Tracking
-------------------
Worklog Id: (was: 793466)
Time Spent: 1h 20m (was: 1h 10m)
> Support seconds with TimeAwareRecursiveCopyableDataset
> ------------------------------------------------------
>
> Key: GOBBLIN-1669
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1669
> Project: Apache Gobblin
> Issue Type: Improvement
> Components: gobblin-service
> Reporter: William Lo
> Assignee: Abhishek Tiwari
> Priority: Major
> Time Spent: 1h 20m
> Remaining Estimate: 0h
>
> # Support seconds with the timeiterator
> # Optimize non-nested timestamp representations, e.g. yyyy-MM-dd-HH-mm-ss, to
> not use an iterator, and instead list the top-level directory to reduce the
> number of FS calls.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)