[GitHub] [flink] StephanEwen commented on a change in pull request #13784: [FLINK-19698][connectors/common] API improvements to the Sources.

2020-11-03 Thread GitBox


StephanEwen commented on a change in pull request #13784:
URL: https://github.com/apache/flink/pull/13784#discussion_r516139098



##
File path: flink-core/src/main/java/org/apache/flink/api/common/state/CheckpointListener.java
##
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.state;
+
+import org.apache.flink.annotation.Public;
+
+/**
+ * This interface must be implemented by functions/operations that want to receive

Review comment:
   Suggested improvement for the JavaDocs:
   
   ```
   This interface lets functions/operators receive commit or abort notifications for
   checkpoints.
   
   Important: Just using state (like {@link ValueState}, {@link ListState}, etc.) in
   functions/operators does NOT require implementing this interface.
   
   This interface is typically only needed for transactional interaction with the
   "outside world", like committing external side effects on checkpoints.
   An example is committing external transactions once a checkpoint completes.
   
   Invocation Guarantees
   
   It is NOT guaranteed that the implementation will receive a notification for each
   completed or aborted checkpoint. While these notifications come in most cases, they
   might not happen, for example, when a failure/restore happens directly after a checkpoint
   completes.
   
   To handle this correctly, implementations should follow the "Checkpoint Subsuming Contract"
   described below.
   
   Exceptions
   
   The notifications from this interface come "after the fact", meaning after the checkpoint has
   been aborted or completed. Throwing an exception will not change the completion/abortion of
   the checkpoint.
   
   Exceptions thrown from this method result in task or job failure and recovery.
   
   Checkpoint Subsuming Contract
   
   Checkpoint IDs are strictly increasing. A checkpoint with a higher ID always subsumes
   a checkpoint with a lower ID. For example, when checkpoint T is confirmed complete, the
   code can assume that no checkpoints with a lower ID (T-1, T-2, etc.) are pending any more.
   No checkpoint with a lower ID will ever be committed after a checkpoint with a higher ID.
   
   This does not necessarily mean that all of the previous checkpoints actually completed
   successfully. It is also possible that some checkpoint timed out or was not fully acknowledged
   by all tasks. Implementations must then behave as if that checkpoint did not happen.
   The recommended way to do this is to let the completion of a new checkpoint (higher ID)
   subsume the completion of all earlier checkpoints (lower ID).
   
   This property is easy to achieve for cases where increasing "offsets", "watermarks", or
   other progress indicators are communicated on checkpoint completion. A newer checkpoint
   will have a higher "offset" (more progress) than the previous checkpoint, so
   it automatically subsumes the previous one. Remember the "offset to commit" for a checkpoint ID
   and commit it when that specific checkpoint (by ID) gets the notification that it is complete.
   
   If you need to publish some specific artifacts (like files) or acknowledge some specific IDs
   after a checkpoint, you can follow a pattern like the one below.
   
   Implementing Checkpoint Subsuming for Committing Artifacts
   
   The following is a sample pattern for how applications can publish specific artifacts on checkpoint.
   Examples would be operators that acknowledge specific IDs or publish specific files on checkpoint.
   
   
   During processing, have two sets of artifacts.
   
   - A "ready set": Artifacts that are ready to be published as part of the next checkpoint.
     Artifacts are added to this set as soon as they are ready to be committed.
     This set is "transient": it is not stored in Flink's state or persisted anywhere.
   - A "pending set": Artifacts being committed with a checkpoint.
     The actual publishing happens when the checkpoint is complete.
     This is a map of "{@code long => List}", mapping from the id of the
 

[GitHub] [flink] StephanEwen commented on a change in pull request #13784: [FLINK-19698][connectors/common] API improvements to the Sources.

2020-10-28 Thread GitBox


StephanEwen commented on a change in pull request #13784:
URL: https://github.com/apache/flink/pull/13784#discussion_r513286480



##
File path: flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
##
@@ -565,6 +565,20 @@ else if (t instanceof Error) {
         return Optional.empty();
     }
 
+    /**
+     * Find the root cause of the given throwable chain.
+     *
+     * @param throwable the throwable chain to check.
+     * @return the root cause of the throwable chain.
+     */
+    public static Throwable findRootCause(Throwable throwable) {

Review comment:
   I think this needs an extra check against cyclic cause chains, which are possible and nasty.
   
   We previously used Apache Commons Lang for that: 
   https://github.com/apache/flink/blob/master/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheGetTest.java#L597
   
   I think at the very least, we need a set to detect duplicates and stop traversing once we
   see an exception we have seen before.
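
A minimal sketch of the "set to check duplicates" idea from the comment above. This is not the code from the PR. The class name `RootCauseFinder` is hypothetical, and using an identity-based set (rather than one that relies on `equals`/`hashCode`) is an assumption made here so that exception classes overriding `equals` cannot confuse the cycle check. When a cycle is found, the sketch returns the last throwable visited before the chain loops back.

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;

// Hypothetical helper class, for illustration only.
final class RootCauseFinder {

    private RootCauseFinder() {}

    /**
     * Walks the cause chain and returns the last throwable reached.
     * Stops as soon as a cause has already been visited, so cyclic
     * cause chains cannot lead to an endless loop.
     */
    static Throwable findRootCause(Throwable throwable) {
        if (throwable == null) {
            throw new NullPointerException("throwable");
        }

        // Identity-based set: two distinct exception objects are never
        // treated as duplicates, even if they are equal().
        Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        Throwable current = throwable;
        seen.add(current);

        while (true) {
            Throwable cause = current.getCause();
            // End of chain, or the chain loops back to a visited throwable.
            if (cause == null || !seen.add(cause)) {
                return current;
            }
            current = cause;
        }
    }
}
```

A cyclic chain can be built with `Throwable.initCause`, for example `a` caused by `b` and `b` caused by `a`. With the sketch above, the traversal ends after visiting each throwable once instead of looping forever.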

##
File path: flink-core/src/main/java/org/apache/flink/api/common/state/CheckpointListener.java
##
@@ -0,0 +1,51 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one

Review comment:
   Nit: The header here uses a different style than the rest of the code (other files have * at
   every line start).

##
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointListener.java
##
@@ -1,19 +1,19 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ Licensed to the Apache Software Foundation (ASF) under one

Review comment:
   Nit: The header here uses a different style than the rest of the code (other files have * at
   every line start).




