Izeren commented on code in PR #26900:
URL: https://github.com/apache/flink/pull/26900#discussion_r2398311404
##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java:
##########
@@ -191,6 +199,8 @@ void testToBuffer() throws IOException {
assertThat(buffer.getDataType())
.isEqualTo(Buffer.DataType.UNALIGNED_WATERMARK_EVENT);
}
+ } else if (evt instanceof EndOfOutputChannelStateEvent) {
+ assertThat(buffer.getDataType()).isEqualTo(Buffer.DataType.RECOVERY_COMPLETION);
Review Comment:
Should we also test the InputChannelState event explicitly?
##########
flink-runtime/src/main/java/org/apache/flink/streaming/runtime/io/AbstractStreamTaskNetworkInput.java:
##########
@@ -270,7 +270,7 @@ protected DataInputStatus processEvent(BufferOrEvent bufferOrEvent, DataOutput<T
if (checkpointedInputGate.isFinished()) {
return DataInputStatus.END_OF_INPUT;
}
- } else if (event.getClass() == EndOfChannelStateEvent.class) {
+ } else if (event.getClass() == EndOfOutputChannelStateEvent.class) {
Review Comment:
Is it expected that we catch `OutputChannelStateEvent` in
TaskNetworkInput? I assume this is because the input of the task is the
`outputBuffer`.
##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java:
##########
@@ -229,9 +229,9 @@ void testInputStatusAfterEndOfRecovery() throws Exception {
inputGate.sendElement(new StreamRecord<>(42L), 0);
assertThat(input.emitNext(output)).isEqualTo(DataInputStatus.MORE_AVAILABLE);
- inputGate.sendEvent(EndOfChannelStateEvent.INSTANCE, 0);
+ inputGate.sendEvent(EndOfOutputChannelStateEvent.INSTANCE, 0);
Review Comment:
The test name says `InputStatusAfterEndOfRecovery`, but we send OutputChannel
events. Is this expected?
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java:
##########
@@ -408,7 +408,7 @@ public boolean isPartialRecord() {
public static DataType getDataType(AbstractEvent event, boolean hasPriority) {
if (hasPriority) {
return PRIORITIZED_EVENT_BUFFER;
- } else if (event instanceof EndOfChannelStateEvent) {
+ } else if (event instanceof EndOfOutputChannelStateEvent) {
Review Comment:
Can this method `getDataType` receive an `EndOfInputChannelStateEvent`? Is
`return EVENT_BUFFER` the correct way to handle this call for InputChannel
events?
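
To make the question concrete, here is a minimal standalone sketch of the dispatch. It uses stand-in types, not the real Flink classes. `DataTypeSketch` is a hypothetical name, the other branches of the real method are omitted, and the final `EVENT_BUFFER` fall-through is my assumption about what the remaining code does. Under those assumptions, an input-channel event would silently be tagged as a plain event buffer:

```java
// Standalone sketch: stand-in types only, NOT the real Flink classes.
final class DataTypeSketch {
    enum DataType { PRIORITIZED_EVENT_BUFFER, RECOVERY_COMPLETION, EVENT_BUFFER }

    interface AbstractEvent {}

    static final class EndOfOutputChannelStateEvent implements AbstractEvent {}

    static final class EndOfInputChannelStateEvent implements AbstractEvent {}

    static DataType getDataType(AbstractEvent event, boolean hasPriority) {
        if (hasPriority) {
            return DataType.PRIORITIZED_EVENT_BUFFER;
        } else if (event instanceof EndOfOutputChannelStateEvent) {
            // Mirrors the branch changed in this hunk.
            return DataType.RECOVERY_COMPLETION;
        }
        // Assumed default: every event without its own branch lands here,
        // including the input-channel variant.
        return DataType.EVENT_BUFFER;
    }

    public static void main(String[] args) {
        System.out.println(getDataType(new EndOfOutputChannelStateEvent(), false));
        System.out.println(getDataType(new EndOfInputChannelStateEvent(), false));
    }
}
```

If that fall-through is intended, a short comment in `getDataType` or an explicit test assertion would make it clear.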
##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java:
##########
@@ -161,6 +166,9 @@ void testToBufferConsumer() throws IOException {
assertThat(bufferConsumer.build().getDataType())
.isEqualTo(Buffer.DataType.UNALIGNED_WATERMARK_EVENT);
}
+ } else if (evt instanceof EndOfOutputChannelStateEvent) {
+ assertThat(bufferConsumer.build().getDataType())
+ .isEqualTo(Buffer.DataType.RECOVERY_COMPLETION);
Review Comment:
Should we assert the InputChannelStateEvent return value explicitly? It would
better document the intention.