[GitHub] [kafka] vvcephei commented on a change in pull request #9840: KAFKA-10867: Improved task idling

2021-01-21 Thread GitBox


vvcephei commented on a change in pull request #9840:
URL: https://github.com/apache/kafka/pull/9840#discussion_r562089223



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
##
@@ -52,15 +58,20 @@
  * (i.e., it increases or stays the same over time).
  */
 public class PartitionGroup {
+    private static final Logger LOG = LoggerFactory.getLogger(PartitionGroup.class);

Review comment:
   I can pass in the log context. I wouldn't pass the actual logger, 
though, because it would mess up common log4j usage patterns.
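
The distinction drawn in the comment above (hand a component the log context, not a ready-made logger) can be sketched roughly as follows. This is only an illustration under stated assumptions: `LogContextSketch`, `PrefixContext`, and `IdleTracker` are hypothetical names, and `java.util.logging` stands in for the real logging stack so the example has no dependencies. Kafka's own class for this purpose is `org.apache.kafka.common.utils.LogContext`, which is not used here.

```java
import java.util.logging.Logger;

// Hypothetical sketch; none of these types are Kafka classes.
final class LogContextSketch {

    // Stand-in for a log context: it carries a prefix, not a logger.
    static final class PrefixContext {
        private final String prefix;

        PrefixContext(final String prefix) {
            this.prefix = prefix;
        }

        // Each caller gets a logger named after its own class, so
        // per-class log-level configuration keeps working.
        Logger logger(final Class<?> clazz) {
            return Logger.getLogger(clazz.getName());
        }

        String format(final String message) {
            return prefix + message;
        }
    }

    // Component that receives the context and derives its own logger.
    static final class IdleTracker {
        private final PrefixContext context;
        private final Logger log;

        IdleTracker(final PrefixContext context) {
            this.context = context;
            this.log = context.logger(IdleTracker.class);
        }

        String loggerName() {
            return log.getName();
        }

        // Logs the prefixed message at FINE and returns it for inspection.
        String describe(final String message) {
            final String line = context.format(message);
            log.fine(line);
            return line;
        }
    }

    public static void main(final String[] args) {
        final IdleTracker tracker = new IdleTracker(new PrefixContext("[task 0_1] "));
        System.out.println(tracker.loggerName());
        System.out.println(tracker.describe("ready"));
    }
}
```

Passing a single shared logger instead would give every component the same logger name, which is presumably the kind of disruption to per-class configuration the comment has in mind.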

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
##
@@ -78,15 +89,149 @@ RecordQueue queue() {
         }
     }
 
-    PartitionGroup(final Map<TopicPartition, RecordQueue> partitionQueues, final Sensor recordLatenessSensor) {
+    PartitionGroup(final TaskId id,
+                   final Map<TopicPartition, RecordQueue> partitionQueues,
+                   final Sensor recordLatenessSensor,
+                   final Sensor enforcedProcessingSensor,
+                   final long maxTaskIdleMs) {
+        this.id = id;
         nonEmptyQueuesByTime = new PriorityQueue<>(partitionQueues.size(), Comparator.comparingLong(RecordQueue::headRecordTimestamp));
         this.partitionQueues = partitionQueues;
+        this.enforcedProcessingSensor = enforcedProcessingSensor;
+        this.maxTaskIdleMs = maxTaskIdleMs;
         this.recordLatenessSensor = recordLatenessSensor;
         totalBuffered = 0;
         allBuffered = false;
         streamTime = RecordQueue.UNKNOWN;
     }
 
+    public void addFetchedMetadata(final TopicPartition partition, final ConsumerRecords.Metadata metadata) {
+        final Long lag = metadata.lag();
+        if (lag != null) {
+            LOG.debug("[{}] added fetched lag {}: {}", id, partition, lag);
+            fetchedLags.put(partition, lag);
+        }
+    }
+
+    public boolean readyToProcess(final long wallClockTime) {
+        if (LOG.isTraceEnabled()) {
+            for (final Map.Entry<TopicPartition, RecordQueue> entry : partitionQueues.entrySet()) {
+                LOG.trace(
+                    "[{}] buffered/lag {}: {}/{}",
+                    id,
+                    entry.getKey(),
+                    entry.getValue().size(),
+                    fetchedLags.get(entry.getKey())
+                );
+            }
+        }
+        // Log-level strategy:
+        //  TRACE for messages that don't wait for fetches, since these may be logged at extremely high frequency
+        //  DEBUG when we waited for a fetch and decided to wait some more, as configured
+        //  DEBUG when we are ready for processing and didn't have to enforce processing
+        //  INFO  when we enforce processing, since this has to wait for fetches AND may result in disorder
+
+        if (maxTaskIdleMs == StreamsConfig.MAX_TASK_IDLE_MS_DISABLED) {
+            if (LOG.isTraceEnabled() && !allBuffered && totalBuffered > 0) {
+                final Set<TopicPartition> bufferedPartitions = new HashSet<>();
+                final Set<TopicPartition> emptyPartitions = new HashSet<>();
+                for (final Map.Entry<TopicPartition, RecordQueue> entry : partitionQueues.entrySet()) {
+                    if (entry.getValue().isEmpty()) {
+                        emptyPartitions.add(entry.getKey());
+                    } else {
+                        bufferedPartitions.add(entry.getKey());
+                    }
+                }
+                LOG.trace("[{}] Ready for processing because max.task.idle.ms is disabled." +
+                              "\n\tThere may be out-of-order processing for this task as a result." +
+                              "\n\tBuffered partitions: {}" +
+                              "\n\tNon-buffered partitions: {}",
+                          id,
+                          bufferedPartitions,
+                          emptyPartitions);
+            }
+            return true;

Review comment:
   Glad we agree ;) 
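
The quoted hunk stops at the `return true;` of the disabled branch, so the rest of `readyToProcess` is not visible in this thread. The sketch below is one plausible reading of the idea, guided only by the log-level strategy comment and the lag bookkeeping in `addFetchedMetadata`. It is not the PR's implementation. `IdleGateSketch`, its string partition names, and the `IDLE_DISABLED` sentinel value are all assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of an idling gate; not the PartitionGroup code.
final class IdleGateSketch {
    static final long IDLE_DISABLED = -1L; // assumed sentinel for "idling disabled"

    private final long maxIdleMs;
    private final Map<String, Integer> buffered = new HashMap<>();   // partition -> buffered record count
    private final Map<String, Long> fetchedLags = new HashMap<>();   // partition -> last fetched lag
    private final Map<String, Long> idleDeadlines = new HashMap<>(); // partition -> wall-clock deadline

    IdleGateSketch(final long maxIdleMs, final String... partitions) {
        this.maxIdleMs = maxIdleMs;
        for (final String partition : partitions) {
            buffered.put(partition, 0);
        }
    }

    void addRecords(final String partition, final int count) {
        buffered.merge(partition, count, Integer::sum);
    }

    // Mirrors the null check in addFetchedMetadata: an unknown lag is not stored.
    void addFetchedLag(final String partition, final Long lag) {
        if (lag != null) {
            fetchedLags.put(partition, lag);
        }
    }

    boolean readyToProcess(final long wallClockMs) {
        if (maxIdleMs == IDLE_DISABLED) {
            return true; // never wait (TRACE territory); processing may be out of order
        }
        boolean anyBuffered = false;
        for (final Map.Entry<String, Integer> entry : buffered.entrySet()) {
            final String partition = entry.getKey();
            if (entry.getValue() > 0) {
                anyBuffered = true;
                idleDeadlines.remove(partition);
                continue;
            }
            final Long lag = fetchedLags.get(partition);
            if (lag == null || lag > 0L) {
                // Lag unknown, or data still waiting on the broker: keep waiting.
                idleDeadlines.remove(partition);
                return false;
            }
            // Partition is caught up: start (or keep) an idle timer for it.
            final long deadline = idleDeadlines.computeIfAbsent(partition, k -> wallClockMs + maxIdleMs);
            if (wallClockMs < deadline) {
                return false; // still within the configured idle time (DEBUG territory)
            }
            // Deadline passed: processing is enforced for this partition (INFO territory).
        }
        return anyBuffered;
    }
}
```

In this reading, a task with one buffered and one empty partition waits while the empty partition's lag is unknown or positive, idles for up to `maxIdleMs` once the lag is zero, and only then proceeds with whatever it has.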

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##
@@ -910,6 +900,11 @@ public void addRecords(final TopicPartition partition, final Iterable

[GitHub] [kafka] vvcephei commented on a change in pull request #9840: KAFKA-10867: Improved task idling

2021-01-21 Thread GitBox


vvcephei commented on a change in pull request #9840:
URL: https://github.com/apache/kafka/pull/9840#discussion_r562192140



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
##
@@ -73,28 +82,22 @@
     private final byte[] recordKey = intSerializer.serialize(null, 1);
 
     private final Metrics metrics = new Metrics();
+    private final Sensor enforcedProcessingSensor = metrics.sensor(UUID.randomUUID().toString());
     private final MetricName lastLatenessValue = new MetricName("record-lateness-last-value", "", "", mkMap());
 
-    private PartitionGroup group;
 
     private static Sensor getValueSensor(final Metrics metrics, final MetricName metricName) {
         final Sensor lastRecordedValue = metrics.sensor(metricName.name());
         lastRecordedValue.add(metricName, new Value());
         return lastRecordedValue;
     }
 
-    @Before

Review comment:
   Thanks!





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #9840: KAFKA-10867: Improved task idling

2021-01-21 Thread GitBox


vvcephei commented on a change in pull request #9840:
URL: https://github.com/apache/kafka/pull/9840#discussion_r562190618



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -510,9 +516,7 @@ public StreamThread(final Time time,
         this.nextProbingRebalanceMs = nextProbingRebalanceMs;
 
         this.pollTime = Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG));
-        final int dummyThreadIdx = 1;
-        this.maxPollTimeMs = new InternalConsumerConfig(config.getMainConsumerConfigs("dummyGroupId", "dummyClientId", dummyThreadIdx))
-            .getInt(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG);
+        this.maxPollTimeMs = maxPollTimeMs;

Review comment:
   Ah, this was leftover from when I was using the poll interval as a 
heuristic. I'll revert it.









[GitHub] [kafka] vvcephei commented on a change in pull request #9840: KAFKA-10867: Improved task idling

2021-01-21 Thread GitBox


vvcephei commented on a change in pull request #9840:
URL: https://github.com/apache/kafka/pull/9840#discussion_r562090811



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##
@@ -910,6 +900,11 @@ public void addRecords(final TopicPartition partition, final Iterable

[GitHub] [kafka] vvcephei commented on a change in pull request #9840: KAFKA-10867: Improved task idling

2021-01-21 Thread GitBox


vvcephei commented on a change in pull request #9840:
URL: https://github.com/apache/kafka/pull/9840#discussion_r562089518



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
##
@@ -78,15 +89,149 @@ RecordQueue queue() {

Review comment:
   Glad we agree ;) 








