Will-Lo commented on code in PR #3592: URL: https://github.com/apache/gobblin/pull/3592#discussion_r1004741034
########## gobblin-core/src/main/java/org/apache/gobblin/destination/DestinationDatasetHandlerService.java: ########## @@ -52,21 +52,16 @@ public DestinationDatasetHandlerService(SourceState jobState, Boolean canCleanUp * Executes handlers * @param workUnitStream */ - public void executeHandlers(WorkUnitStream workUnitStream) { - if (handlers.size() > 0) { - if (workUnitStream.isSafeToMaterialize()) { - Collection<WorkUnit> workUnits = JobLauncherUtils.flattenWorkUnits(workUnitStream.getMaterializedWorkUnitCollection()); - for (DestinationDatasetHandler handler : this.handlers) { - try { - handler.handle(workUnits); - } catch (IOException e) { - throw new RuntimeException(String.format("Handler %s failed to execute", handler.getClass().getName()), e); - } - } - } else { - throw new RuntimeException(DestinationDatasetHandlerService.class.getName() + " does not support work unit streams"); + public WorkUnitStream executeHandlers(WorkUnitStream workUnitStream) { + for (DestinationDatasetHandler handler : this.handlers) { + try { + workUnitStream = handler.handle(workUnitStream); Review Comment: Can we use a new variable here for the value that is returned? Even if it defaults to the parameter, it is not good practice to modify parameters directly. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: dev-unsubscr...@gobblin.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org