[ https://issues.apache.org/jira/browse/GOBBLIN-1673?focusedWorklogId=804524&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-804524 ]

ASF GitHub Bot logged work on GOBBLIN-1673:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 29/Aug/22 19:56
            Start Date: 29/Aug/22 19:56
    Worklog Time Spent: 10m 
      Work Description: homatthew commented on code in PR #3539:
URL: https://github.com/apache/gobblin/pull/3539#discussion_r957740785


##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/messaging/DynamicWorkUnitConsumer.java:
##########
@@ -16,28 +16,68 @@
  */
 package org.apache.gobblin.runtime.messaging;
 
-import java.time.Duration;
-import java.util.ArrayList;
+import java.io.IOException;
+import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import org.apache.gobblin.runtime.messaging.data.DynamicWorkUnitMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
  * Abstraction for receiving {@link DynamicWorkUnitMessage} from {@link DynamicWorkUnitProducer}.
  * The class is responsible for fetching the messages from the messaging service. All business logic
  * should be done in the {@link DynamicWorkUnitMessage.Handler}.<br><br>
  *
- * For polling implementations (e.g. HDFS or Kafka), you can use the
- * {@link DynamicWorkUnitUtils#runInBackground(Runnable, Duration)} to call the
+ * This consumer can be used to poll a message buffer (e.g. HDFS or Kafka) using
+ * {@link ScheduledExecutorService#scheduleAtFixedRate(Runnable, long, long, TimeUnit)} to call the
  * {@link Runnable#run()} method periodically in a background thread <br><br>
  *
- * Push based implementations (e.g. helix or zk) can omit using this method and instead setup the callback methods
- * without spawning a background thread
+ * Each new {@link DynamicWorkUnitMessage} is passed to a {@link DynamicWorkUnitMessage.Handler}
+ * and will call {@link DynamicWorkUnitMessage.Handler#handle(DynamicWorkUnitMessage)}
  */
-public abstract class DynamicWorkUnitConsumer implements Runnable {
-  protected List<DynamicWorkUnitMessage.Handler> handlers = new ArrayList<>();
+public class DynamicWorkUnitConsumer implements Runnable {
+  private static final Logger LOG = LoggerFactory.getLogger(DynamicWorkUnitConsumer.class);
+  protected MessageBuffer<DynamicWorkUnitMessage> buffer;
+  protected List<DynamicWorkUnitMessage.Handler> handlers;
+
+  public DynamicWorkUnitConsumer(
+      MessageBuffer<DynamicWorkUnitMessage> buffer,
+      Collection<DynamicWorkUnitMessage.Handler> handlers) {
+    this.buffer = buffer;
+    for(DynamicWorkUnitMessage.Handler handler : handlers) {
+      handlers.add(handler);
+    }
+  }
+
+  /**
+   * Fetches all unread messages from sent by {@link DynamicWorkUnitProducer} and
+   * calls {@link DynamicWorkUnitMessage.Handler#handle(DynamicWorkUnitMessage)} method for each handler added via
+   * {@link DynamicWorkUnitConsumer#DynamicWorkUnitConsumer(MessageBuffer, Collection)} or
+   * {@link DynamicWorkUnitConsumer#addHandler(DynamicWorkUnitMessage.Handler)}
+   */
+  public void run() {
+    List<DynamicWorkUnitMessage> messages = getMessages(this.buffer);
+    for (DynamicWorkUnitMessage msg : messages) {
+      handleMessage(msg);
+    }
+  }
+
+  protected static List<DynamicWorkUnitMessage> getMessages(MessageBuffer<DynamicWorkUnitMessage> buffer) {
+    try {
+      LOG.debug("Fetching {} from the file buffer, {}",
+          DynamicWorkUnitMessage.class.getSimpleName(),
+          buffer.getClass().getSimpleName());
+      return buffer.get();
+    } catch (IOException e) {
+      throw new RuntimeException("Encountered exception while getting messages from the message buffer", e);

Review Comment:
   Not sure if the application should explode loudly when there is an IOException. This feels a little brittle since the dynamic workunit is an "optional" feature, but I can imagine it being difficult to uncover this issue if we do not do this.
   
   Gobblin logs are very noisy and full of red-herring exceptions (exceptions that are "okay"). It may be hard to root-cause unless it explodes loudly or the person debugging is specifically looking for this log.





Issue Time Tracking
-------------------

            Worklog Id:     (was: 804524)
    Remaining Estimate: 71h  (was: 71h 10m)
            Time Spent: 1h  (was: 50m)

> [Helix Dynamic Workunit] Message Schema for splitting workunits
> ---------------------------------------------------------------
>
>                 Key: GOBBLIN-1673
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1673
>             Project: Apache Gobblin
>          Issue Type: New Feature
>          Components: gobblin-helix
>            Reporter: Matthew Ho
>            Assignee: Abhishek Tiwari
>            Priority: Major
>   Original Estimate: 72h
>          Time Spent: 1h
>  Remaining Estimate: 71h
>
> For the Helix Dynamic Workunits, task runners will produce messages 
> indicating the current workunit health and the application master will 
> consume these messages.
> A message will be sent from the task runner to the AM when the task runner 
> experiences lag during ingestion. This ticket is the schema proposal for this 
> message.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)