[ 
https://issues.apache.org/jira/browse/GOBBLIN-1608?focusedWorklogId=725400&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-725400
 ]

ASF GitHub Bot logged work on GOBBLIN-1608:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 11/Feb/22 19:51
            Start Date: 11/Feb/22 19:51
    Worklog Time Spent: 10m 
      Work Description: jack-moseley commented on a change in pull request #3466:
URL: https://github.com/apache/gobblin/pull/3466#discussion_r804960876



##########
File path: gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
##########
@@ -368,29 +369,40 @@ private void flush(String dbName, String tableName) throws IOException {
    * Note that this is one of the place where the materialization of aggregated metadata happens.
    * When there's a change of {@link OperationType}, it also interrupts metadata aggregation,
    * and triggers materialization of metadata.
+   *
+   * @param emitAllErrors if true, go through all datasets in {@link #datasetErrorMap} and emit GTE for each. We only want
+   *                      to do this if a watermark is is being advanced.
+   *
    * @throws IOException
    */
-  @Override
-  public void flush() throws IOException {
+  public void flush(boolean emitAllErrors) throws IOException {
     log.info(String.format("start to flushing %s records", String.valueOf(recordCount.get())));
     for (String tableString : tableOperationTypeMap.keySet()) {
       List<String> tid = Splitter.on(TABLE_NAME_DELIMITER).splitToList(tableString);
       flush(tid.get(0), tid.get(1));
     }
     tableOperationTypeMap.clear();
     recordCount.lazySet(0L);
-    // Emit events for all current errors, since the GMCE watermark will be advanced
-    for (Map.Entry<String, Map<String, GobblinMetadataException>> entry : datasetErrorMap.entrySet()) {
-      for (GobblinMetadataException exception : entry.getValue().values()) {
-        submitFailureEvent(exception);
+    if (emitAllErrors) {

Review comment:
       My thought was that this way is better: when we call flush(), iceberg 
watermarks may be advanced for the tables that actually flush, and for those 
tables we do want to send a failure GTE (but not for tables that just failed 
and did not flush successfully).
   
   >I think we only want flush when watermark is moving
   Did you mean that we should only send GTE when the watermark is moving, or 
are you suggesting we remove the flush from the close method?
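
   To make the distinction concrete, here is a minimal sketch of the behavior 
described in this comment. It is not the GobblinMCEWriter code: the class 
FlushErrorSketch, its methods, the `failingTables` argument, and the string 
"events" are all hypothetical stand-ins, and the per-table bookkeeping is an 
assumption about how errors are tracked.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration only; names do not come from the Gobblin code base.
class FlushErrorSketch {
  // table name -> pending error messages (simplified stand-in for datasetErrorMap)
  private final Map<String, List<String>> pendingErrors = new LinkedHashMap<>();
  // Events "emitted" so far; a real writer would submit a failure GTE instead.
  final List<String> emitted = new ArrayList<>();

  void recordError(String table, String message) {
    pendingErrors.computeIfAbsent(table, k -> new ArrayList<>()).add(message);
  }

  // Emit and drop the pending errors for a single table.
  private void emitFor(String table) {
    List<String> errors = pendingErrors.remove(table);
    if (errors != null) {
      for (String error : errors) {
        emitted.add(table + ": " + error);
      }
    }
  }

  // failingTables simulates which per-table flushes fail (assumption for the demo).
  void flush(List<String> tables, Set<String> failingTables, boolean emitAllErrors) {
    for (String table : tables) {
      if (failingTables.contains(table)) {
        // Flush failed: this table's watermark does not advance, so keep its errors.
        recordError(table, "flush failed");
      } else {
        // Flush succeeded: this table's watermark advances, so emit its errors.
        emitFor(table);
      }
    }
    if (emitAllErrors) {
      // The overall watermark is advancing: emit everything still pending.
      for (String table : new ArrayList<>(pendingErrors.keySet())) {
        emitFor(table);
      }
    }
  }
}
```

   In this sketch, flush(..., false) reports errors only for tables whose own 
flush succeeded, while flush(..., true) also drains the errors of tables that 
failed, which corresponds to the case where a watermark is being advanced.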




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 725400)
    Time Spent: 40m  (was: 0.5h)

> Fix case where error GTE is incorrectly sent from MCE writer
> ------------------------------------------------------------
>
>                 Key: GOBBLIN-1608
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1608
>             Project: Apache Gobblin
>          Issue Type: Bug
>            Reporter: Jack Moseley
>            Priority: Major
>          Time Spent: 40m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.20.1#820001)
