[ https://issues.apache.org/jira/browse/GOBBLIN-2147?focusedWorklogId=960348&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-960348 ]
ASF GitHub Bot logged work on GOBBLIN-2147:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 05/Mar/25 17:37
Start Date: 05/Mar/25 17:37
Worklog Time Spent: 10m
Work Description: abhishekmjain commented on code in PR #4044:
URL: https://github.com/apache/gobblin/pull/4044#discussion_r1981876448
##########
gobblin-core/src/main/java/org/apache/gobblin/source/PartitionedFileSourceBase.java:
##########
@@ -367,7 +384,25 @@ private long getLowWaterMark(Iterable<WorkUnitState> previousStates, String lowW
     return lowWaterMarkValue + getRetriever().getWatermarkIncrementMs();
   }
+  /** Returns the low watermark value based on lookback which is equal to current time minus lookback time. */
+  private long getLowWaterMarkFromLookbackTime(String lookBackTime) {
+    try {
+      Duration lookBackDuration = PartitionAwareFileRetrieverUtils.getLookbackTimeDuration(lookBackTime);
+      return new DateTime().minus(lookBackDuration).getMillis();
+    } catch (IOException e) {
+      Throwables.propagate(e);
Review Comment:
Should we propagate the exception if we want to have a fallback to 0?
Also, looks like Throwables.propagate is deprecated. Let's just log the
exception and move ahead here.
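A minimal sketch of the log-and-fall-back behavior suggested in this comment. Everything here is illustrative and not the PR's code: `LookbackWatermarkSketch` and `parseLookback` are hypothetical names, `parseLookback` stands in for `PartitionAwareFileRetrieverUtils.getLookbackTimeDuration` (whose accepted formats are not shown in this thread), and `java.time` plus `java.util.logging` replace Joda-Time and the project's logger only to keep the example self-contained.

```java
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.logging.Level;
import java.util.logging.Logger;

public class LookbackWatermarkSketch {
  private static final Logger LOG = Logger.getLogger(LookbackWatermarkSketch.class.getName());

  /** Hypothetical stand-in parser (assumption): accepts "<n>d", "<n>h" or "<n>m". */
  static Duration parseLookback(String lookBackTime) throws IOException {
    if (lookBackTime == null || lookBackTime.length() < 2) {
      throw new IOException("Invalid lookback time: " + lookBackTime);
    }
    char unit = lookBackTime.charAt(lookBackTime.length() - 1);
    long amount;
    try {
      amount = Long.parseLong(lookBackTime.substring(0, lookBackTime.length() - 1));
    } catch (NumberFormatException e) {
      throw new IOException("Invalid lookback time: " + lookBackTime, e);
    }
    switch (unit) {
      case 'd': return Duration.ofDays(amount);
      case 'h': return Duration.ofHours(amount);
      case 'm': return Duration.ofMinutes(amount);
      default: throw new IOException("Unknown lookback unit in: " + lookBackTime);
    }
  }

  /** Returns now minus the lookback in epoch millis; logs and falls back to 0 on a parse failure. */
  static long getLowWaterMarkFromLookbackTime(String lookBackTime, Instant now) {
    try {
      return now.minus(parseLookback(lookBackTime)).toEpochMilli();
    } catch (IOException e) {
      // Log instead of propagating, so the caller still gets the fallback value.
      LOG.log(Level.WARNING, "Could not parse lookback time '" + lookBackTime + "', falling back to 0", e);
      return 0L;
    }
  }
}
```

Passing `now` in as a parameter is a choice made for this sketch so the result is deterministic in a test; the method in the PR reads the current time itself.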
Issue Time Tracking
-------------------
Worklog Id: (was: 960348)
Time Spent: 1.5h (was: 1h 20m)
> Add lookback time property in PartitionedFileSource
> ---------------------------------------------------
>
> Key: GOBBLIN-2147
> URL: https://issues.apache.org/jira/browse/GOBBLIN-2147
> Project: Apache Gobblin
> Issue Type: Task
> Reporter: Vivek Rai
> Priority: Major
> Time Spent: 1.5h
> Remaining Estimate: 0h
>
> All FileBasedSource implementations should have config for lookback time.
>
> Currently, FileBasedSources look for data since the time set by
> `conversion.min.watermark`, and the time granularity is decided by the lowest
> time denomination. That denomination in many cases, including this one, is
> 1 second (determined by
> |gobblin.flow.input.dataset.descriptor.partition.pattern|yyyy-MM-dd_HH_mm_ss|).
>
> It is an extremely abusive way to find workunits.
> Let's enable these jobs to use lookback time configs like several other
> dataset finders do.
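
To give a rough sense of scale for the description above, here is a small sketch that counts how many 1-second partitions fit in a scan window. It assumes the source considers one candidate partition per granularity step between the low watermark and the current time; the 365-day and 1-day windows are illustrative values, and `PartitionCountSketch` is a hypothetical name, none of which come from the issue.

```java
import java.time.Duration;

public class PartitionCountSketch {
  /** Number of whole partitions of the given granularity that fit in the window. */
  static long partitionsInWindow(Duration window, Duration granularity) {
    return window.toMillis() / granularity.toMillis();
  }

  public static void main(String[] args) {
    Duration oneSecond = Duration.ofSeconds(1);
    // Window starting at a fixed minimum watermark one year in the past (illustrative).
    System.out.println(partitionsInWindow(Duration.ofDays(365), oneSecond)); // 31536000
    // Window bounded by a one-day lookback (illustrative).
    System.out.println(partitionsInWindow(Duration.ofDays(1), oneSecond));   // 86400
  }
}
```

Under that assumption, bounding the window by a lookback keeps the candidate count fixed as time passes, whereas a fixed minimum watermark lets it grow with every run.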