mjsax commented on code in PR #20693:
URL: https://github.com/apache/kafka/pull/20693#discussion_r2445957003
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutionMetadata.java:
##########
@@ -44,13 +44,15 @@ public class TaskExecutionMetadata {
private final Collection<Task> successfullyProcessed = new HashSet<>();
// map of topologies experiencing errors/currently under backoff
private final ConcurrentHashMap<String, NamedTopologyMetadata> topologyNameToErrorMetadata = new ConcurrentHashMap<>();
+ private final Logger log;
public TaskExecutionMetadata(final Set<String> allTopologyNames,
final Set<String> pausedTopologies,
final ProcessingMode processingMode) {
this.hasNamedTopologies = !(allTopologyNames.size() == 1 &&
allTopologyNames.contains(UNNAMED_TOPOLOGY));
this.pausedTopologies = pausedTopologies;
this.processingMode = processingMode;
+ this.log = new LogContext("[TaskExecutionMetadata]").logger(TaskExecutionMetadata.class);
Review Comment:
Why do we pass the class name as log prefix, given that we set the class via
`logger(...)` anyway?
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutionMetadata.java:
##########
@@ -65,12 +67,15 @@ public boolean canProcessTask(final Task task, final long now) {
final String topologyName = task.id().topologyName();
if (!hasNamedTopologies) {
// TODO implement error handling/backoff for non-named topologies (needs KIP)
+ log.debug("Task {} processing check for unnamed topology '{}'", task.id(), topologyName);
Review Comment:
Did we run with DEBUG enabled? This one might become very chatty?
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java:
##########
@@ -255,24 +257,32 @@ record = queue.poll(wallClockTime);
if (record != null) {
Review Comment:
Wondering if `record` can ever be `null` here? Should we add an `else` and
log an ERROR (or WARN), or even throw `IllegalStateException`, as this
condition should always be true?
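A minimal sketch of the `IllegalStateException` option, assuming the queue handed to the poll is always expected to hold at least one record. The class and method names below are hypothetical stand-ins and not the actual `PartitionGroup` code:

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical stand-in for the polling logic; not the actual PartitionGroup code.
class NullRecordGuardSketch {

    // Polls the head of a queue that the caller believes to be non-empty.
    static String pollNonEmpty(final Queue<String> queue) {
        final String record = queue.poll();
        if (record == null) {
            // Assumption: callers only pass queues they track as non-empty,
            // so a null here points at a bookkeeping bug, not normal operation.
            throw new IllegalStateException(
                "Polled null from a queue that was expected to be non-empty");
        }
        return record;
    }

    public static void main(final String[] args) {
        final Queue<String> queue = new ArrayDeque<>();
        queue.add("r1");
        System.out.println(pollNonEmpty(queue));
        try {
            pollNonEmpty(queue); // the queue is empty now
        } catch (final IllegalStateException e) {
            System.out.println("caught: " + e.getMessage());
        }
    }
}
```

Failing fast like this surfaces the inconsistency immediately, whereas a WARN/ERROR log would keep the task running with possibly wrong bookkeeping. Which of the two is preferable is a judgment call for the PR.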
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java:
##########
@@ -255,24 +257,32 @@ record = queue.poll(wallClockTime);
if (record != null) {
totalBuffered -= oldSize - queue.size();
+ logger.trace("Partition {} polling next record:, oldSize={}, newSize={}, totalBuffered={}, recordTimestamp={}",
+ queue.partition(), oldSize, queue.size(), totalBuffered, record.timestamp);
if (queue.isEmpty()) {
// if a certain queue has been drained, reset the flag
allBuffered = false;
+ logger.trace("Partition {} queue is now empty, allBuffered=false", queue.partition());
} else {
nonEmptyQueuesByTime.offer(queue);
}
// always update the stream-time to the record's timestamp yet to be processed if it is larger
if (record.timestamp > streamTime) {
+ final long oldStreamTime = streamTime;
streamTime = record.timestamp;
recordLatenessSensor.record(0, wallClockTime);
+ logger.trace("Partition {} stream time updated from {} to {}", queue.partition(), oldStreamTime, streamTime);
} else {
- recordLatenessSensor.record(streamTime - record.timestamp, wallClockTime);
+ final long lateness = streamTime - record.timestamp;
+ recordLatenessSensor.record(lateness, wallClockTime);
}
}
+ } else {
+ logger.trace("Partition pulling nextRecord: no queue available, totalBuffered={}", totalBuffered);
Review Comment:
Should `totalBuffered` be zero each time we hit this log line?
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java:
##########
@@ -450,7 +450,7 @@ public Map<TopicPartition, OffsetAndMetadata> prepareCommit(final boolean clean)
}
hasPendingTxCommit = eosEnabled;
- log.debug("Prepared {} task for committing", state());
+ log.debug("Prepared {} task {} for committing", state(), id);
Review Comment:
Don't we have the task-id already in the log prefix? (Similar elsewhere)
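To illustrate the concern, here is a toy sketch of a prefix-carrying logger. It is NOT Kafka's `LogContext`, and the prefix format and task id are made up for illustration. If the prefix already names the task, repeating the id in the message only duplicates it:

```java
import java.util.ArrayList;
import java.util.List;

// Toy stand-in for a prefix-carrying logger; this is NOT Kafka's LogContext.
class PrefixLoggerSketch {
    private final String prefix;
    final List<String> lines = new ArrayList<>();

    PrefixLoggerSketch(final String prefix) {
        this.prefix = prefix;
    }

    // Prepends the fixed prefix to every message, as a prefix-based logger would.
    void debug(final String message) {
        lines.add(prefix + message);
    }

    public static void main(final String[] args) {
        // Hypothetical prefix format and task id, chosen only for illustration.
        final PrefixLoggerSketch log = new PrefixLoggerSketch("task [0_1] ");
        log.debug("Prepared RUNNING task for committing");     // id appears once, via the prefix
        log.debug("Prepared RUNNING task 0_1 for committing"); // id appears twice
        log.lines.forEach(System.out::println);
    }
}
```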
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java:
##########
@@ -733,13 +733,15 @@ public boolean isProcessable(final long wallClockTime) {
if (hasPendingTxCommit) {
// if the task has a pending TX commit, we should just retry the commit but not process any records
// thus, the task is not processable, even if there is available data in the record queue
+ log.debug("Stream task {} has a pending transaction commit, skip processing it.", id());
return false;
}
final boolean readyToProcess = partitionGroup.readyToProcess(wallClockTime);
if (!readyToProcess) {
if (timeCurrentIdlingStarted.isEmpty()) {
timeCurrentIdlingStarted = Optional.of(wallClockTime);
}
+ log.debug("Task {} started idling at time {}", id, timeCurrentIdlingStarted.get());
} else {
timeCurrentIdlingStarted = Optional.empty();
Review Comment:
Should we also add a "resumed" log similar to "started idling" ?
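A rough sketch of what a paired "started idling" / "resumed" log could look like, assuming both messages are emitted only on the state transition. The class below is a hypothetical stand-in that collects messages in a list instead of using the real logger, and the message wording is invented:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-in for the idling bookkeeping; not the actual StreamTask code.
class IdleTransitionSketch {
    private Optional<Long> timeCurrentIdlingStarted = Optional.empty();
    final List<String> messages = new ArrayList<>();

    void update(final boolean readyToProcess, final long wallClockTime) {
        if (!readyToProcess) {
            if (timeCurrentIdlingStarted.isEmpty()) {
                // Transition: processing -> idling
                timeCurrentIdlingStarted = Optional.of(wallClockTime);
                messages.add("started idling at time " + wallClockTime);
            }
        } else {
            if (timeCurrentIdlingStarted.isPresent()) {
                // Transition: idling -> processing
                final long idledMs = wallClockTime - timeCurrentIdlingStarted.get();
                messages.add("resumed processing after idling for " + idledMs + " ms");
            }
            timeCurrentIdlingStarted = Optional.empty();
        }
    }

    public static void main(final String[] args) {
        final IdleTransitionSketch task = new IdleTransitionSketch();
        task.update(false, 100L); // starts idling
        task.update(false, 150L); // still idling, nothing logged
        task.update(true, 400L);  // resumes
        task.messages.forEach(System.out::println);
    }
}
```

Note that this sketch moves the "started idling" message inside the `isEmpty()` check, so it fires once per idle period rather than on every call. That differs from the diff above and is only a suggestion.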