pnowojski commented on a change in pull request #15294:
URL: https://github.com/apache/flink/pull/15294#discussion_r602380927



##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
##########
@@ -686,6 +686,8 @@ private void setVertexConfig(
             }
         }
         config.setInputs(inputConfigs);
+        config.setSupportsUnalignedInput(
+                inEdges.isEmpty() || 
inEdges.get(0).supportsUnalignedCheckpoints());

Review comment:
       Why do we check only the first edge?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
##########
@@ -400,32 +400,31 @@ public TaskAcknowledgeResult acknowledgeTask(
             int subtaskIndex = vertex.getParallelSubtaskIndex();
             long ackTimestamp = System.currentTimeMillis();
 
-            if (operatorSubtaskStates != null) {
-                for (OperatorIDPair operatorID : operatorIDs) {
-
-                    OperatorSubtaskState operatorSubtaskState =
-                            operatorSubtaskStates.getSubtaskStateByOperatorID(
-                                    operatorID.getGeneratedOperatorID());
-
-                    // if no real operatorSubtaskState was reported, we insert 
an empty state
-                    if (operatorSubtaskState == null) {
-                        operatorSubtaskState = 
OperatorSubtaskState.builder().build();
-                    }
-
-                    OperatorState operatorState =
-                            
operatorStates.get(operatorID.getGeneratedOperatorID());
-
-                    if (operatorState == null) {
-                        operatorState =
-                                new OperatorState(
-                                        operatorID.getGeneratedOperatorID(),
-                                        
vertex.getTotalNumberOfParallelSubtasks(),
-                                        vertex.getMaxParallelism());
-                        
operatorStates.put(operatorID.getGeneratedOperatorID(), operatorState);
-                    }
-
-                    operatorState.putState(subtaskIndex, operatorSubtaskState);

Review comment:
       Is this change backward compatible? Will we be able to recover from older
checkpoints/savepoints?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
##########
@@ -400,32 +400,31 @@ public TaskAcknowledgeResult acknowledgeTask(
             int subtaskIndex = vertex.getParallelSubtaskIndex();
             long ackTimestamp = System.currentTimeMillis();
 
-            if (operatorSubtaskStates != null) {
-                for (OperatorIDPair operatorID : operatorIDs) {
-
-                    OperatorSubtaskState operatorSubtaskState =
-                            operatorSubtaskStates.getSubtaskStateByOperatorID(
-                                    operatorID.getGeneratedOperatorID());
-
-                    // if no real operatorSubtaskState was reported, we insert 
an empty state
-                    if (operatorSubtaskState == null) {
-                        operatorSubtaskState = 
OperatorSubtaskState.builder().build();
-                    }
-
-                    OperatorState operatorState =
-                            
operatorStates.get(operatorID.getGeneratedOperatorID());
-
-                    if (operatorState == null) {
-                        operatorState =
-                                new OperatorState(
-                                        operatorID.getGeneratedOperatorID(),
-                                        
vertex.getTotalNumberOfParallelSubtasks(),
-                                        vertex.getMaxParallelism());
-                        
operatorStates.put(operatorID.getGeneratedOperatorID(), operatorState);
-                    }
-
-                    operatorState.putState(subtaskIndex, operatorSubtaskState);

Review comment:
       Is test coverage missing for this change?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java
##########
@@ -343,7 +343,7 @@ public static DataType getDataType(AbstractEvent event, 
boolean hasPriority) {
                 return EVENT_BUFFER;
             }
             CheckpointBarrier barrier = (CheckpointBarrier) event;
-            if (barrier.getCheckpointOptions().needsAlignment()) {
+            if (barrier.getCheckpointOptions().isExactlyOnceMode()) {

Review comment:
       ? (Why is `needsAlignment()` replaced with `isExactlyOnceMode()` here?)

##########
File path: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointRescaleITCase.java
##########
@@ -61,7 +67,7 @@
         PIPELINE {
             @Override
             UnalignedSettings createSettings(int parallelism) {
-                int numShuffles = 8;
+                int numShuffles = 10;

Review comment:
       Can you extract this magic constant `10` into a named constant, and ideally calculate it
automatically? Are we at least verifying it somewhere?

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
##########
@@ -107,13 +107,20 @@
     private static final String TIME_CHARACTERISTIC = "timechar";
 
     private static final String MANAGED_MEMORY_FRACTION_PREFIX = 
"managedMemFraction.";
+
     private static final ConfigOption<Boolean> 
STATE_BACKEND_USE_MANAGED_MEMORY =
             ConfigOptions.key("statebackend.useManagedMemory")
                     .booleanType()
                     .noDefaultValue()
                     .withDescription(
                             "If state backend is specified, whether it uses 
managed memory.");
 
+    private static final ConfigOption<Boolean> SUPPORTS_UNALIGNED_INPUT =
+            ConfigOptions.key("unaligned_input")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription("Flag whether all input exchanges support 
unaligned input.");
+

Review comment:
       Why do we have this flag in both `StreamConfig` and `StreamEdge`?
Why not keep it in just one place?

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/AlignedController.java
##########
@@ -39,6 +40,8 @@
 /** Controller for aligned checkpoints. */
 @Internal
 public class AlignedController implements CheckpointBarrierBehaviourController 
{
+    private final boolean failOnUnalignedBarriers;

Review comment:
       This will need to be reimplemented on top of @dawidwys changes :(

##########
File path: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
##########
@@ -871,13 +875,15 @@ public String toString() {
         private final LongCounter lostCounter = new LongCounter();
         private final LongCounter duplicatesCounter = new LongCounter();
         private final IntCounter numFailures = new IntCounter();
+        private final Duration backpressureInterval;
         private ListState<State> stateList;
         protected transient State state;
         protected final long minCheckpoints;
-        protected boolean backpressure;
+        @Nullable private Deadline backpressureUntil;
 
-        protected VerifyingSinkBase(long minCheckpoints) {
+        protected VerifyingSinkBase(long minCheckpoints, long 
checkpointingInterval) {
             this.minCheckpoints = minCheckpoints;
+            this.backpressureInterval = 
Duration.ofMillis(checkpointingInterval);

Review comment:
       I think this commit changes too many independent things. I'm
not following these backpressure changes; why do we need them?
   
   Also, how is it supposed to work? Are we backpressuring only for the
first checkpoint? Or are you bumping this backpressure after every snapshotState
call? In that case, I'm afraid backpressure would be gone until we
trigger the next checkpoint if there is even a small delay in, for example,
checkpoint notification.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to