This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 7a109823a21150c7bea9ba41fc22203cbaf7094f Author: Stephan Ewen <se...@apache.org> AuthorDate: Tue Aug 17 18:31:54 2021 +0200 [FLINK-23842][coordination] Add logging statements in SourceCoordinators for reader registration and split requests. This closes #16867 --- .../source/coordinator/SourceCoordinator.java | 29 ++++++++++++++++------ 1 file changed, 21 insertions(+), 8 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java index 5ba4160..85a767e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java @@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.connector.source.ReaderInfo; import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceEvent; import org.apache.flink.api.connector.source.SourceSplit; import org.apache.flink.api.connector.source.SplitEnumerator; import org.apache.flink.core.io.SimpleVersionedSerializer; @@ -163,19 +164,31 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT> public void handleEventFromOperator(int subtask, OperatorEvent event) { runInEventLoop( () -> { - LOG.debug( - "Handling event from subtask {} of source {}: {}", - subtask, - operatorName, - event); if (event instanceof RequestSplitEvent) { + LOG.info( + "Source {} received split request from parallel task {}", + operatorName, + subtask); enumerator.handleSplitRequest( subtask, ((RequestSplitEvent) event).hostName()); } else if (event instanceof SourceEventWrapper) { - enumerator.handleSourceEvent( - subtask, ((SourceEventWrapper) event).getSourceEvent()); + final SourceEvent sourceEvent = + ((SourceEventWrapper) event).getSourceEvent(); + LOG.debug( + "Source {} received custom event from parallel task {}: {}", + operatorName, + subtask, + sourceEvent); + enumerator.handleSourceEvent(subtask, sourceEvent); } else if (event instanceof ReaderRegistrationEvent) { - handleReaderRegistrationEvent((ReaderRegistrationEvent) event); + final ReaderRegistrationEvent registrationEvent = + (ReaderRegistrationEvent) event; + LOG.info( + "Source {} registering reader for parallel task {} @ {}", + operatorName, + subtask, + registrationEvent.location()); + handleReaderRegistrationEvent(registrationEvent); } else { throw new FlinkException("Unrecognized Operator Event: " + event); }