Copilot commented on code in PR #4113:
URL: https://github.com/apache/gobblin/pull/4113#discussion_r2097475681
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java:
##########
@@ -236,6 +237,9 @@ protected void writeImpl(InputStream inputStream, Path writeAt, CopyableFile cop
final long blockSize = copyableFile.getBlockSize(this.fs);
final long fileSize = copyableFile.getFileStatus().getLen();
+ // Store source file size in task state
+ this.state.setProp(FileSizePolicy.BYTES_READ_KEY, fileSize);
Review Comment:
Using setProp here overwrites the bytes-read count for each file instead of
accumulating across multiple writes. Consider summing the file sizes (e.g.,
getPropAsLong + new size) to track total bytes read.
```suggestion
// Accumulate source file size in task state
long currentBytesRead = this.state.getPropAsLong(FileSizePolicy.BYTES_READ_KEY, 0L);
this.state.setProp(FileSizePolicy.BYTES_READ_KEY, currentBytesRead + fileSize);
```
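To illustrate the difference, here is a minimal, self-contained sketch. `SimpleState` is a hypothetical stand-in for Gobblin's `State` that models only `setProp` and `getPropAsLong`, and `BYTES_READ_KEY` is a placeholder for `FileSizePolicy.BYTES_READ_KEY`; neither is the real Gobblin code. With three files of 100, 250 and 50 bytes, overwriting keeps only the last size, while accumulating yields the total.

```java
import java.util.HashMap;
import java.util.Map;

public class BytesReadAccumulation {

  // Hypothetical stand-in for Gobblin's State: models only the two accessors
  // used in the suggestion, backed by a String-to-String map.
  static final class SimpleState {
    private final Map<String, String> props = new HashMap<>();

    void setProp(String key, Object value) {
      props.put(key, String.valueOf(value));
    }

    long getPropAsLong(String key, long defaultValue) {
      String value = props.get(key);
      return value == null ? defaultValue : Long.parseLong(value);
    }
  }

  // Placeholder for FileSizePolicy.BYTES_READ_KEY (hypothetical value).
  static final String BYTES_READ_KEY = "bytes.read";

  // Current diff: every file overwrites the previous value.
  static long overwrite(long[] fileSizes) {
    SimpleState state = new SimpleState();
    for (long fileSize : fileSizes) {
      state.setProp(BYTES_READ_KEY, fileSize);
    }
    return state.getPropAsLong(BYTES_READ_KEY, 0L);
  }

  // Suggested change: read the running total (default 0) and add this file.
  static long accumulate(long[] fileSizes) {
    SimpleState state = new SimpleState();
    for (long fileSize : fileSizes) {
      long currentBytesRead = state.getPropAsLong(BYTES_READ_KEY, 0L);
      state.setProp(BYTES_READ_KEY, currentBytesRead + fileSize);
    }
    return state.getPropAsLong(BYTES_READ_KEY, 0L);
  }

  public static void main(String[] args) {
    long[] sizes = {100L, 250L, 50L};
    System.out.println("overwrite:  " + overwrite(sizes));  // overwrite:  50
    System.out.println("accumulate: " + accumulate(sizes)); // accumulate: 400
  }
}
```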
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java:
##########
@@ -308,6 +312,8 @@ public boolean apply(FileStatus input) {
os.close();
log.info("OutputStream for file {} is closed.", writeAt);
inputStream.close();
+ long actualFileSize = this.fs.getFileStatus(writeAt).getLen();
+ this.state.setProp(FileSizePolicy.BYTES_WRITTEN_KEY, actualFileSize);
Review Comment:
Similarly, this overwrites the bytes-written count on each write. Consider
accumulating the sizes so the final state reflects the sum of all writes.
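A sketch of the same accumulation for bytes written, assuming local files via `java.nio.file` in place of Hadoop's `FileSystem`: as in the diff, each file's length is read only after its stream is closed, but it is added to a running total instead of replacing it. `writeAllAndSumSizes` is a hypothetical helper, not part of the writer.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

public class BytesWrittenAccumulation {

  // Hypothetical helper: writes each payload to its own file under dir and
  // returns the sum of the on-disk sizes. Files.size(...) plays the role of
  // fs.getFileStatus(writeAt).getLen() in the diff.
  static long writeAllAndSumSizes(Path dir, List<byte[]> payloads) throws IOException {
    long bytesWritten = 0L;
    for (byte[] payload : payloads) {
      Path writeAt = Files.createTempFile(dir, "part-", ".bin");
      try (OutputStream os = Files.newOutputStream(writeAt)) {
        os.write(payload);
      }
      // The stream is closed at this point, so the size reflects all written data.
      bytesWritten += Files.size(writeAt);
    }
    return bytesWritten;
  }

  public static void main(String[] args) throws IOException {
    Path dir = Files.createTempDirectory("bytes-written-demo");
    List<byte[]> payloads = List.of(new byte[10], new byte[32], new byte[5]);
    System.out.println("total bytes written: " + writeAllAndSumSizes(dir, payloads)); // 47
  }
}
```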
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java:
##########
@@ -352,6 +352,9 @@ public boolean isParentTaskDone() {
return parentTaskDone;
}
+
Review Comment:
Add a brief JavaDoc to explain the purpose and usage of `getTaskState()`,
since it's a new public API.
```suggestion
/**
* Get the {@link TaskState} associated with this {@link Fork}.
*
* <p>
* The {@link TaskState} contains information about the execution state of the parent {@link Task},
* including metrics and other runtime data. This method allows access to the {@link TaskState}
* for monitoring or reporting purposes.
* </p>
*
* @return the {@link TaskState} of the parent {@link Task}.
*/
```
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java:
##########
@@ -316,6 +317,23 @@ private void completeShutdown() {
this.shutdownLatch.countDown();
}
+ private void computeAndUpdateTaskDataQuality() {
+ String overallTaskDataQuality = TaskLevelPolicyChecker.DataQualityStatus.PASSED.name();
+ for (Optional<Fork> fork : this.forks.keySet()) {
+ if (fork.isPresent()) {
+ TaskState forkTaskState = fork.get().getTaskState();
+ if (forkTaskState != null) {
+ String forkDataQualityResult = forkTaskState.getProp(TaskLevelPolicyChecker.TASK_LEVEL_POLICY_RESULT_KEY);
+ if (forkDataQualityResult != null && TaskLevelPolicyChecker.DataQualityStatus.FAILED.name().equals(forkDataQualityResult)) {
+ overallTaskDataQuality = TaskLevelPolicyChecker.DataQualityStatus.FAILED.name();
+ }
+ }
+ }
+ }
+ LOG.info("Data quality state of the task is " + overallTaskDataQuality);
Review Comment:
Use parameterized logging instead of string concatenation:
`LOG.info("Data quality state of the task is {}", overallTaskDataQuality);`.
```suggestion
LOG.info("Data quality state of the task is {}", overallTaskDataQuality);
```
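The aggregation in `computeAndUpdateTaskDataQuality()` amounts to "FAILED if any fork reported FAILED, otherwise PASSED". The sketch below expresses that rule with a stream; the local `DataQualityStatus` enum and the `List<Optional<String>>` input are simplifying assumptions standing in for `TaskLevelPolicyChecker.DataQualityStatus` and the per-fork `TaskState` lookups.

```java
import java.util.List;
import java.util.Optional;

public class TaskDataQualityAggregation {

  // Hypothetical mirror of TaskLevelPolicyChecker.DataQualityStatus with only
  // the two values the diff uses.
  enum DataQualityStatus { PASSED, FAILED }

  // Each element is one fork's TASK_LEVEL_POLICY_RESULT_KEY value; Optional.empty()
  // stands for an absent fork, a null TaskState, or an unset property.
  static String aggregate(List<Optional<String>> forkResults) {
    boolean anyFailed = forkResults.stream()
        .flatMap(Optional::stream)
        .anyMatch(result -> DataQualityStatus.FAILED.name().equals(result));
    return (anyFailed ? DataQualityStatus.FAILED : DataQualityStatus.PASSED).name();
  }

  public static void main(String[] args) {
    System.out.println(aggregate(List.of(Optional.of("PASSED"), Optional.empty())));      // PASSED
    System.out.println(aggregate(List.of(Optional.of("PASSED"), Optional.of("FAILED")))); // FAILED
    System.out.println(aggregate(List.of()));                                             // PASSED
  }
}
```

`anyMatch` stops at the first FAILED result, whereas the loop in the diff visits every fork; the outcome is the same.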
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/local/LocalTaskStateTracker.java:
##########
@@ -90,7 +91,8 @@ public void onTaskRunCompletion(Task task) {
// Check the task state and handle task retry if task failed and
// it has not reached the maximum number of retries
WorkUnitState.WorkingState state = task.getTaskState().getWorkingState();
- if (state == WorkUnitState.WorkingState.FAILED && task.getRetryCount() < this.maxTaskRetries) {
+ String dataQualityResult = task.getTaskState().getProp(TaskLevelPolicyChecker.TASK_LEVEL_POLICY_RESULT_KEY);
+ if (TaskLevelPolicyChecker.DataQualityStatus.FAILED.name().equals(dataQualityResult) || state == WorkUnitState.WorkingState.FAILED && task.getRetryCount() < this.maxTaskRetries) {
Review Comment:
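[nitpick] The mixed use of `||` and `&&` without parentheses can be unclear.
Consider adding parentheses around the retry condition:
`(data quality FAILED) || (state == FAILED && retries remaining)`. Because `&&`
binds more tightly than `||` in Java, this does not change behavior; it only
makes the grouping explicit.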
```suggestion
if (TaskLevelPolicyChecker.DataQualityStatus.FAILED.name().equals(dataQualityResult)
    || (state == WorkUnitState.WorkingState.FAILED && task.getRetryCount() < this.maxTaskRetries)) {
```
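A quick truth-table check of the precedence point, using hypothetical boolean names for the three sub-conditions (`dqFailed` for the data-quality check, `taskFailed` for `state == FAILED`, `retriesLeft` for the retry-count comparison). The unparenthesized and suggested forms agree on all eight inputs; the other possible grouping, `(dqFailed || taskFailed) && retriesLeft`, differs on two.

```java
public class PrecedenceCheck {

  // As in the diff: no explicit grouping. Java parses this as a || (b && c).
  static boolean unparenthesized(boolean dqFailed, boolean taskFailed, boolean retriesLeft) {
    return dqFailed || taskFailed && retriesLeft;
  }

  // As in the suggestion: grouping made explicit.
  static boolean parenthesized(boolean dqFailed, boolean taskFailed, boolean retriesLeft) {
    return dqFailed || (taskFailed && retriesLeft);
  }

  // The alternative reading, which Java does not use.
  static boolean otherGrouping(boolean dqFailed, boolean taskFailed, boolean retriesLeft) {
    return (dqFailed || taskFailed) && retriesLeft;
  }

  // Returns {mismatches vs. suggestion, mismatches vs. other grouping} over all 8 inputs.
  static int[] countMismatches() {
    boolean[] values = {false, true};
    int[] mismatches = new int[2];
    for (boolean a : values) {
      for (boolean b : values) {
        for (boolean c : values) {
          if (unparenthesized(a, b, c) != parenthesized(a, b, c)) mismatches[0]++;
          if (unparenthesized(a, b, c) != otherGrouping(a, b, c)) mismatches[1]++;
        }
      }
    }
    return mismatches;
  }

  public static void main(String[] args) {
    int[] mismatches = countMismatches();
    System.out.println("vs. suggestion: " + mismatches[0]);     // 0
    System.out.println("vs. other grouping: " + mismatches[1]); // 2
  }
}
```

Under either of the first two forms, a FAILED data-quality result enters the branch regardless of `task.getRetryCount()`.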
--
This is an automated message from the Apache Git Service.