pnowojski commented on a change in pull request #15294: URL: https://github.com/apache/flink/pull/15294#discussion_r602380927
########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java ########## @@ -686,6 +686,8 @@ private void setVertexConfig( } } config.setInputs(inputConfigs); + config.setSupportsUnalignedInput( + inEdges.isEmpty() || inEdges.get(0).supportsUnalignedCheckpoints()); Review comment: Why do we check only the first edge? ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java ########## @@ -400,32 +400,31 @@ public TaskAcknowledgeResult acknowledgeTask( int subtaskIndex = vertex.getParallelSubtaskIndex(); long ackTimestamp = System.currentTimeMillis(); - if (operatorSubtaskStates != null) { - for (OperatorIDPair operatorID : operatorIDs) { - - OperatorSubtaskState operatorSubtaskState = - operatorSubtaskStates.getSubtaskStateByOperatorID( - operatorID.getGeneratedOperatorID()); - - // if no real operatorSubtaskState was reported, we insert an empty state - if (operatorSubtaskState == null) { - operatorSubtaskState = OperatorSubtaskState.builder().build(); - } - - OperatorState operatorState = - operatorStates.get(operatorID.getGeneratedOperatorID()); - - if (operatorState == null) { - operatorState = - new OperatorState( - operatorID.getGeneratedOperatorID(), - vertex.getTotalNumberOfParallelSubtasks(), - vertex.getMaxParallelism()); - operatorStates.put(operatorID.getGeneratedOperatorID(), operatorState); - } - - operatorState.putState(subtaskIndex, operatorSubtaskState); Review comment: Is this change backward compatible? Will we be able to recover from older checkpoints/savepoints?
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java ########## @@ -400,32 +400,31 @@ public TaskAcknowledgeResult acknowledgeTask( int subtaskIndex = vertex.getParallelSubtaskIndex(); long ackTimestamp = System.currentTimeMillis(); - if (operatorSubtaskStates != null) { - for (OperatorIDPair operatorID : operatorIDs) { - - OperatorSubtaskState operatorSubtaskState = - operatorSubtaskStates.getSubtaskStateByOperatorID( - operatorID.getGeneratedOperatorID()); - - // if no real operatorSubtaskState was reported, we insert an empty state - if (operatorSubtaskState == null) { - operatorSubtaskState = OperatorSubtaskState.builder().build(); - } - - OperatorState operatorState = - operatorStates.get(operatorID.getGeneratedOperatorID()); - - if (operatorState == null) { - operatorState = - new OperatorState( - operatorID.getGeneratedOperatorID(), - vertex.getTotalNumberOfParallelSubtasks(), - vertex.getMaxParallelism()); - operatorStates.put(operatorID.getGeneratedOperatorID(), operatorState); - } - - operatorState.putState(subtaskIndex, operatorSubtaskState); Review comment: missing test coverage for this change? ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java ########## @@ -343,7 +343,7 @@ public static DataType getDataType(AbstractEvent event, boolean hasPriority) { return EVENT_BUFFER; } CheckpointBarrier barrier = (CheckpointBarrier) event; - if (barrier.getCheckpointOptions().needsAlignment()) { + if (barrier.getCheckpointOptions().isExactlyOnceMode()) { Review comment: ? ########## File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointRescaleITCase.java ########## @@ -61,7 +67,7 @@ PIPELINE { @Override UnalignedSettings createSettings(int parallelism) { - int numShuffles = 8; + int numShuffles = 10; Review comment: Can you extract this magic constant `10`? And maybe ideally calculate it automatically? 
Are we at least verifying it somewhere? ########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java ########## @@ -107,13 +107,20 @@ private static final String TIME_CHARACTERISTIC = "timechar"; private static final String MANAGED_MEMORY_FRACTION_PREFIX = "managedMemFraction."; + private static final ConfigOption<Boolean> STATE_BACKEND_USE_MANAGED_MEMORY = ConfigOptions.key("statebackend.useManagedMemory") .booleanType() .noDefaultValue() .withDescription( "If state backend is specified, whether it uses managed memory."); + private static final ConfigOption<Boolean> SUPPORTS_UNALIGNED_INPUT = + ConfigOptions.key("unaligned_input") + .booleanType() + .defaultValue(true) + .withDescription("Flag whether all input exchanges support unaligned input."); + Review comment: Why do we have this flag both in the `StreamConfig` and `StreamEdge`? Why not just one place? ########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/AlignedController.java ########## @@ -39,6 +40,8 @@ /** Controller for aligned checkpoints. 
*/ @Internal public class AlignedController implements CheckpointBarrierBehaviourController { + private final boolean failOnUnalignedBarriers; Review comment: This will need to be reimplemented on top of @dawidwys' changes :( ########## File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java ########## @@ -871,13 +875,15 @@ public String toString() { private final LongCounter lostCounter = new LongCounter(); private final LongCounter duplicatesCounter = new LongCounter(); private final IntCounter numFailures = new IntCounter(); + private final Duration backpressureInterval; private ListState<State> stateList; protected transient State state; protected final long minCheckpoints; - protected boolean backpressure; + @Nullable private Deadline backpressureUntil; - protected VerifyingSinkBase(long minCheckpoints) { + protected VerifyingSinkBase(long minCheckpoints, long checkpointingInterval) { this.minCheckpoints = minCheckpoints; + this.backpressureInterval = Duration.ofMillis(checkpointingInterval); Review comment: I think in this commit you are changing too many independent things. I'm not following those backpressure changes; why do we need them? Also, how is it supposed to work? Are we backpressuring only for the first checkpoint? Or are you bumping this backpressure after every snapshot state call? In that case, I'm afraid backpressure would be gone until we trigger the next checkpoint if there is even a small delay in, for example, checkpoint notification. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org