[GitHub] [kafka] bseenu commented on a change in pull request #7577: KAFKA-9076: support consumer offset sync across clusters in MM 2.0
bseenu commented on a change in pull request #7577:
URL: https://github.com/apache/kafka/pull/7577#discussion_r419913034

## File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java ##
@@ -190,4 +227,103 @@ public void commitRecord(SourceRecord record) {
             Checkpoint.unwrapGroup(record.sourcePartition()),
             System.currentTimeMillis() - record.timestamp());
     }
+
+    private void refreshIdleConsumerGroupOffset() {
+        Map<String, KafkaFuture<ConsumerGroupDescription>> consumerGroupsDesc = targetAdminClient
+            .describeConsumerGroups(consumerGroups).describedGroups();
+
+        for (String group : consumerGroups) {
+            try {
+                ConsumerGroupDescription consumerGroupDesc = consumerGroupsDesc.get(group).get();
+                ConsumerGroupState consumerGroupState = consumerGroupDesc.state();
+                // sync offset to the target cluster only if the state of current consumer group is:
+                // (1) idle: because the consumer at target is not actively consuming the mirrored topic
+                // (2) dead: the new consumer that is recently created at source and never exist at target
+                if (consumerGroupState.equals(ConsumerGroupState.EMPTY)) {
+                    idleConsumerGroupsOffset.put(group, targetAdminClient.listConsumerGroupOffsets(group)
+                        .partitionsToOffsetAndMetadata().get().entrySet());
+                } else if (consumerGroupState.equals(ConsumerGroupState.DEAD)) {
+                    newConsumerGroup.add(group);
+                }
+            } catch (InterruptedException | ExecutionException e) {
+                log.error("Error querying for consumer group {} on cluster {}.", group, targetClusterAlias, e);
+            }
+        }
+    }
+
+    Map<String, Map<TopicPartition, OffsetAndMetadata>> syncGroupOffset() {
+        Map<String, Map<TopicPartition, OffsetAndMetadata>> offsetToSyncAll = new HashMap<>();
+
+        // first, sync offsets for the idle consumers at target
+        for (Map.Entry<String, Set<Entry<TopicPartition, OffsetAndMetadata>>> group : idleConsumerGroupsOffset.entrySet()) {
+            String consumerGroupId = group.getKey();
+            // for each idle consumer at target, read the checkpoints (converted upstream offset)
+            // from the pre-populated map
+            Map<TopicPartition, OffsetAndMetadata> convertedUpstreamOffset = getConvertedUpstreamOffset(consumerGroupId);
+
+            if (convertedUpstreamOffset == null) continue;
+
+            Map<TopicPartition, OffsetAndMetadata> offsetToSync = new HashMap<>();
+            for (Entry<TopicPartition, OffsetAndMetadata> entry : group.getValue()) {
+                long latestDownstreamOffset = entry.getValue().offset();
+                TopicPartition topicPartition = entry.getKey();
+                if (!convertedUpstreamOffset.containsKey(topicPartition)) {
+                    log.trace("convertedUpstreamOffset does not contain TopicPartition: {}", topicPartition.toString());
+                    continue;
+                }
+
+                // if translated offset from upstream is smaller than the current consumer offset
+                // in the target, skip updating the offset for that partition
+                long convertedOffset = convertedUpstreamOffset.get(topicPartition).offset();
+                if (latestDownstreamOffset >= convertedOffset) {
+                    log.trace("latestDownstreamOffset {} is larger than convertedUpstreamOffset {} for "
+                        + "TopicPartition {}", latestDownstreamOffset, convertedOffset, topicPartition);
+                    continue;
+                }

Review comment:
I would like to propose the following changes to sync the consumer group changes on the source side:

```suggestion
for (Map.Entry<TopicPartition, OffsetAndMetadata> convertedEntry : convertedUpstreamOffset.entrySet()) {
    TopicPartition topicPartition = convertedEntry.getKey();
    for (Entry<TopicPartition, OffsetAndMetadata> idleEntry : group.getValue()) {
        if (idleEntry.getKey() == topicPartition) {
            long latestDownstreamOffset = idleEntry.getValue().offset();
            // if translated offset from upstream is smaller than the current consumer offset
            // in the target, skip updating the offset for that partition
            long convertedOffset = convertedUpstreamOffset.get(topicPartition).offset();
            if (latestDownstreamOffset >= convertedOffset) {
                log.trace("latestDownstreamOffset {} is larger than convertedUpstreamOffset {} for "
                    + "TopicPartition {}", latestDownstreamOffset, convertedOffset, topicPartition);
                continue;
            }
        }
    }
```

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For querie
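The rule discussed in this review (only advance a target-side offset when the translated upstream offset is ahead of it) can be sketched in a few lines of Python. This is only an illustration and not the MirrorMaker code: `offsets_to_sync` and the plain dictionaries are hypothetical, and the choice to also sync partitions that have no committed offset at the target yet is an assumption about what the suggestion is aiming for.

```python
def offsets_to_sync(converted_upstream, downstream):
    """Pick the partitions whose target-side offset should be advanced.

    converted_upstream: dict mapping partition -> translated upstream offset
    downstream: dict mapping partition -> offset currently committed at the target
    Both shapes are hypothetical stand-ins for the Java maps in the diff.
    """
    to_sync = {}
    for partition, converted in converted_upstream.items():
        current = downstream.get(partition)
        # Skip partitions where the target consumer is already at or past
        # the translated offset; rewinding it would cause reprocessing.
        if current is not None and current >= converted:
            continue
        # Assumption: a partition unknown at the target is synced as well.
        to_sync[partition] = converted
    return to_sync


upstream = {("topic", 0): 10, ("topic", 1): 5, ("topic", 2): 7}
target = {("topic", 0): 4, ("topic", 1): 9}
result = offsets_to_sync(upstream, target)
```

Iterating over the translated upstream offsets, rather than over the target-side offsets, is what lets partitions that were newly consumed at the source show up in the result.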
[GitHub] [kafka] dajac commented on a change in pull request #8609: KAFKA-9946; StopReplicaRequest deletePartition changes may cause premature topic deletion handling in controller
dajac commented on a change in pull request #8609:
URL: https://github.com/apache/kafka/pull/8609#discussion_r419946491

## File path: core/src/main/scala/kafka/controller/ControllerChannelManager.scala ##
@@ -590,7 +569,26 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig,
       stateChangeLog.info(s"Sending StopReplica request for ${partitionStates.size} " +
         s"replicas to broker $brokerId")
-      sendStopReplicaRequest(brokerId, brokerEpoch, false, stopReplicaTopicState)
+      val stopReplicaRequestBuilder = new StopReplicaRequest.Builder(
+        stopReplicaRequestVersion, controllerId, controllerEpoch, brokerEpoch,
+        false, stopReplicaTopicState.values.toBuffer.asJava)
+
+      sendRequest(brokerId, stopReplicaRequestBuilder, (r: AbstractResponse) => {
+        val stopReplicaResponse = r.asInstanceOf[StopReplicaResponse]

Review comment:
I agree as well. Passing a function is a really good idea. I should have thought about it.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on pull request #8609: KAFKA-9946; StopReplicaRequest deletePartition changes may cause premature topic deletion handling in controller
dajac commented on pull request #8609: URL: https://github.com/apache/kafka/pull/8609#issuecomment-623928516 @lbradstreet @hachikuji Thanks for your feedback. I have updated the PR accordingly.
[GitHub] [kafka] dajac commented on a change in pull request #8598: KAFKA-9945: TopicCommand should support --if-exists and --if-not-exists when --bootstrap-server is used
dajac commented on a change in pull request #8598:
URL: https://github.com/apache/kafka/pull/8598#discussion_r419970261

## File path: core/src/main/scala/kafka/admin/TopicCommand.scala ##
@@ -245,7 +245,17 @@ object TopicCommand extends Logging {
         newTopic.configs(configsMap)
         val createResult = adminClient.createTopics(Collections.singleton(newTopic))
-        createResult.all().get()
+        try {
+          createResult.all().get()
+        } catch {
+          case e: ExecutionException => {
+            val cause = e.getCause
+            if (cause.isInstanceOf[TopicExistsException] || topic.ifTopicDoesntExist()) {

Review comment:
Shouldn't we re-throw all exceptions except `TopicExistsException` if `topic.ifTopicDoesntExist`?

## File path: core/src/main/scala/kafka/admin/TopicCommand.scala ##
@@ -245,7 +245,17 @@ object TopicCommand extends Logging {
         newTopic.configs(configsMap)
         val createResult = adminClient.createTopics(Collections.singleton(newTopic))
-        createResult.all().get()
+        try {
+          createResult.all().get()
+        } catch {
+          case e: ExecutionException => {
+            val cause = e.getCause
+            if (cause.isInstanceOf[TopicExistsException] || topic.ifTopicDoesntExist()) {
+              throw e
+            }
+          }
+        }
+
         println(s"Created topic ${topic.name}.")
       } else {
         throw new IllegalArgumentException(s"Topic ${topic.name} already exists")

Review comment:
Apparently, we already verify the existence of the topic prior to creating it. Should we also handle `--if-not-exists` in this case?

## File path: core/src/main/scala/kafka/admin/TopicCommand.scala ##
@@ -257,21 +267,50 @@ object TopicCommand extends Logging {
     }

     override def alterTopic(opts: TopicCommandOptions): Unit = {
-      val topic = new CommandTopicPartition(opts)
-      val topics = getTopics(opts.topic, opts.excludeInternalTopics)
-      ensureTopicExists(topics, opts.topic)
-      val topicsInfo = adminClient.describeTopics(topics.asJavaCollection).values()
-      adminClient.createPartitions(topics.map {topicName =>
-        if (topic.hasReplicaAssignment) {
-          val startPartitionId = topicsInfo.get(topicName).get().partitions().size()
-          val newAssignment = {
-            val replicaMap = topic.replicaAssignment.get.drop(startPartitionId)
-            new util.ArrayList(replicaMap.map(p => p._2.asJava).asJavaCollection).asInstanceOf[util.List[util.List[Integer]]]
+      if(opts.topicConfig.isDefined || opts.configsToDelete.isDefined) {
+        throw new RuntimeException("Using --config or --delete-config is not supported " +
+          "when altering a topic via the broker API. Use kafka-configs.sh instead.")

Review comment:
nit: Extra space after the first `.`.

## File path: core/src/main/scala/kafka/admin/TopicCommand.scala ##
@@ -245,7 +245,17 @@ object TopicCommand extends Logging {
         newTopic.configs(configsMap)
         val createResult = adminClient.createTopics(Collections.singleton(newTopic))
-        createResult.all().get()
+        try {
+          createResult.all().get()

Review comment:
nit: The parentheses are not necessary. There are other cases where they can be removed.

## File path: core/src/main/scala/kafka/admin/TopicCommand.scala ##
@@ -257,21 +267,50 @@ object TopicCommand extends Logging {
     }

     override def alterTopic(opts: TopicCommandOptions): Unit = {
-      val topic = new CommandTopicPartition(opts)
-      val topics = getTopics(opts.topic, opts.excludeInternalTopics)
-      ensureTopicExists(topics, opts.topic)
-      val topicsInfo = adminClient.describeTopics(topics.asJavaCollection).values()
-      adminClient.createPartitions(topics.map {topicName =>
-        if (topic.hasReplicaAssignment) {
-          val startPartitionId = topicsInfo.get(topicName).get().partitions().size()
-          val newAssignment = {
-            val replicaMap = topic.replicaAssignment.get.drop(startPartitionId)
-            new util.ArrayList(replicaMap.map(p => p._2.asJava).asJavaCollection).asInstanceOf[util.List[util.List[Integer]]]
+      if(opts.topicConfig.isDefined || opts.configsToDelete.isDefined) {
+        throw new RuntimeException("Using --config or --delete-config is not supported " +
+          "when altering a topic via the broker API. Use kafka-configs.sh instead.")
+      }
+      val tp = new CommandTopicPartition(opts)
+      if (tp.hasPartitions) {
+        println("WARNING: If partitions are increased for a topic that has a key, the partition " +
+          "logic or ordering of the messages will be affected")
+        val topicDescription = try {
+          adminClient.describeTopics(Collections.singleton(tp.name)).
+            all().get().get(tp.name)

Review comment:
nit: unneces
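The first review question above (swallow only the "topic already exists" failure when `--if-not-exists` is given, and re-throw everything else) can be illustrated with a small Python sketch. It does not use the Kafka admin client: `TopicExistsError`, `create_topic` and the demo callables are hypothetical stand-ins introduced only to show the control flow.

```python
class TopicExistsError(Exception):
    """Hypothetical stand-in for Kafka's TopicExistsException."""


def create_topic(create, if_not_exists):
    """Run `create`; return True if the topic was created, False if it already existed.

    Only the 'already exists' failure is swallowed, and only when
    if_not_exists is set. Every other exception propagates unchanged.
    """
    try:
        create()
    except TopicExistsError:
        if not if_not_exists:
            raise
        return False
    return True


def already_exists():
    # Demo callable simulating a create request for an existing topic.
    raise TopicExistsError("topic already exists")


def broken():
    # Demo callable simulating an unrelated failure.
    raise RuntimeError("unrelated failure")
```

Written this way, the condition reads as a logical AND (the failure is a topic-exists error and the flag is set), which is the opposite of the `||` in the quoted diff.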
[GitHub] [kafka] dajac commented on pull request #8311: KAFKA-9434: automated protocol for alterReplicaLogDirs
dajac commented on pull request #8311: URL: https://github.com/apache/kafka/pull/8311#issuecomment-623957188 @tombentley Thanks for the addition. We could perhaps add a small unit test for the sake of completeness. We already cover all the other cases but this one.
[jira] [Commented] (KAFKA-9891) Invalid state store content after task migration with exactly_once and standby replicas
[ https://issues.apache.org/jira/browse/KAFKA-9891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17099743#comment-17099743 ]

Mateusz Jadczyk commented on KAFKA-9891:

[~bchen225242] Hi, is any follow-up planned for this?

> Invalid state store content after task migration with exactly_once and standby replicas
> ---
>
> Key: KAFKA-9891
> URL: https://issues.apache.org/jira/browse/KAFKA-9891
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.3.1, 2.4.1
> Reporter: Mateusz Jadczyk
> Assignee: Boyang Chen
> Priority: Blocker
>
> We have a simple command id deduplication mechanism (very similar to the one from Kafka Streams examples) based on Kafka Streams State Stores. It stores command ids from the past hour in _persistentWindowStore_. We encountered a problem with the store if there's an exception thrown later in that topology.
> We run 3 nodes using docker, each with multiple threads set for this particular Streams Application.
> The business flow is as follows (performed within a single subtopology):
> * a valid command is sent with command id (_mnl_cmd_31bad7e5-35e7-490e-89f0-616fe967111f_). NODE 1 is running an active task 1_2. First node in the topology analyses if this is a duplicate by checking in the state store (_COMMAND_ID_STORE_), if not puts the command id in the state store and processes the command properly.
> * an invalid command is sent with the same key but new command id (_mnl_cmd_9f1752da-45b7-4ef7-9ef8-209d826530bc_). Again, check for the duplicated command id is performed, it's not a duplicate, command id is put into the state store. Next node in the topology throws an exception which causes an error on NODE 1 for task 1_2. As a result, transaction is aborted, offsets are not committed. I double checked for the changelog topic - relevant messages are not committed. Therefore, the changelog topic contains only the first command id _mnl_cmd_31bad7e5-35e7-490e-89f0-616fe967111f,_ and not the one which caused a failure.
> * in the meantime a standby task 1_2 running on NODE 3 replicated _mnl_cmd_31bad7e5-35e7-490e-89f0-616fe967111f_ command id into a local _COMMAND_ID_STORE_
> * standby task 1_2 on NODE 3 Thread-2 takes over the task as an active one. It checks if this command id is a duplicate - no, it isn't - tries to process the faulty command and throws an exception. Again, transaction aborted, all looks fine.
> * NODE 3 Thread-1 takes over. It checks for the duplicate. To our surprise, *it is a duplicate!* Even though the transaction has been aborted and the changelog doesn't contain this command id: _mnl_cmd_9f1752da-45b7-4ef7-9ef8-209d826530bc._
>
> After digging into the Streams logs and some discussion on ([Stack Overflow|https://stackoverflow.com/questions/61247789/invalid-state-store-content-after-aborted-transaction-with-exactly-once-and-stan]) we concluded it has something to do with checkpoint files. Here are the detailed logs relevant to checkpoint files.
>
> {code:java}
> NODE_3 2020-04-15 21:06:14.470 TRACE 1 --- [-StreamThread-2] o.a.k.s.p.i.ProcessorStateManager : stream-thread [Processor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] task [1_2] Checkpointable offsets read from checkpoint: {}
> NODE_3 2020-04-15 21:06:19.413 TRACE 1 --- [-StreamThread-2] o.a.k.s.p.i.ProcessorStateManager : stream-thread [Processor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] task [1_2] Restoring state store COMMAND_ID_STORE from changelog topic Processor-COMMAND_ID_STORE-changelog at checkpoint null
> NODE_3 2020-04-15 21:06:28.470 TRACE 1 --- [-StreamThread-1] o.a.k.s.p.i.ProcessorStateManager : stream-thread [Processor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-1] standby-task [1_2] Checkpointable offsets read from checkpoint: {}
> NODE_3 2020-04-15 21:06:29.634 TRACE 1 --- [-StreamThread-2] o.a.k.s.s.internals.OffsetCheckpoint : Writing tmp checkpoint file /tmp/kafka-streams/Processor/1_2/.checkpoint.tmp
> NODE_3 2020-04-15 21:06:29.640 TRACE 1 --- [-StreamThread-2] o.a.k.s.s.internals.OffsetCheckpoint : Swapping tmp checkpoint file /tmp/kafka-streams/Processor/1_2/.checkpoint.tmp /tmp/kafka-streams/Processor/1_2/.checkpoint
> NODE_3 2020-04-15 21:11:15.909 TRACE 1 --- [-StreamThread-1] o.a.k.s.s.internals.OffsetCheckpoint : Writing tmp checkpoint file /tmp/kafka-streams/Processor/1_2/.checkpoint.tmp
> NODE_3 2020-04-15 21:11:15.912 TRACE 1 --- [-StreamThread-1] o.a.k.s.s.internals.OffsetCheckpoint : Swapping tmp checkpoint file /tmp/kafka-streams/Processor/1_2/.checkpoint.tmp /tmp
[GitHub] [kafka] nizhikov commented on pull request #8592: KAFKA-3184: Add Checkpoint for In-memory State Store
nizhikov commented on pull request #8592: URL: https://github.com/apache/kafka/pull/8592#issuecomment-623993236 retest this please
[jira] [Created] (KAFKA-9957) Kafka Controller doesn't failover during hardware failure
Eric Ward created KAFKA-9957:

Summary: Kafka Controller doesn't failover during hardware failure
Key: KAFKA-9957
URL: https://issues.apache.org/jira/browse/KAFKA-9957
Project: Kafka
Issue Type: Bug
Components: controller
Affects Versions: 2.5.0, 2.2.0
Reporter: Eric Ward
Attachments: kafka-threaddump.out

On a couple different production environments we've run into an issue where a hardware failure has hung up the controller and prevented controller and topic leadership from changing to a healthy broker. When the issue happens we see this repeated in the logs at regular intervals for the other brokers (the affected broker can’t write to disk, so no logging occurs there):

{noformat}
[2020-04-26 01:12:30,613] WARN [ReplicaFetcher replicaId=0, leaderId=2, fetcherId=0] Error in response for fetch request (type=FetchRequest, replicaId=0, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={*snip*}, isolationLevel=READ_UNCOMMITTED, toForget=, metadata=(sessionId=1962806970, epoch=INITIAL)) (kafka.server.ReplicaFetcherThread)
java.io.IOException: Connection to 2 was disconnected before the response was read
    at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
    at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:100)
    at kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:193)
    at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:280)
    at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:132)
    at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:131)
    at scala.Option.foreach(Option.scala:274)
    at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:131)
    at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
{noformat}

This issue appears to be similar to KAFKA-7870, though that issue was purportedly fixed by KAFKA-7697.

Once we encounter this error any partitions whose leadership is on the affected node are unavailable until we force that broker out of the cluster – that is to say, kill the node.

When we initially hit the issue we were running on version 2.2.0, though I've been able to reproduce this in an environment running 2.5.0 as well. To simulate the hardware failure I'm using the xfs_freeze utility to suspend access to the filesystem. Zookeeper failover is also part of the mix. In all instances where we’ve seen this the ZK leader and Kafka Controller were on the same node and both affected by the hardware issue. Zookeeper is able to successfully failover, which it does rather quickly.

Reproduction steps are pretty straightforward:
# Spin up a 3 node cluster
# Ensure that the Kafka Controller and Zookeeper Leader are on the same node.
# xfs_freeze the filesystem on the node that the controller is running on

This reproduces 100% of the time for me. I’ve left it running for well over an hour without any Kafka failover happening. Unfreezing the node will allow the cluster to heal itself.

I’ve attached a thread dump from an environment running 2.5.0.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] viktorsomogyi commented on a change in pull request #8591: KAFKA-6342: Move workaround for JSON parsing of non-escaped strings
viktorsomogyi commented on a change in pull request #8591:
URL: https://github.com/apache/kafka/pull/8591#discussion_r420040567

## File path: core/src/main/scala/kafka/utils/Json.scala ##
@@ -35,16 +35,7 @@ object Json {
    */
   def parseFull(input: String): Option[JsonValue] =

Review comment:
It looks like `parseFull` is used in a couple of places, but I can perhaps just delegate this to `parseBytes` like this:
```
def parseFull(input: String): Option[JsonValue] = parseBytes(input.getBytes(Charset.defaultCharset()))
```
Or shall I just get rid of this code and use `parseBytes` everywhere?
[jira] [Commented] (KAFKA-9909) Kafka Streams : offset control to Streams API
[ https://issues.apache.org/jira/browse/KAFKA-9909?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17099806#comment-17099806 ]

Gopikrishna commented on KAFKA-9909:

I can understand the rebalance case, but you mentioned "streams should NOT commit anything until it was closed". It is not clear to me what "it" refers to; did you mean that the application is closed? If so, will the offset be committed when the application closes due to an error? I handle ill-formatted messages with a DeserializationExceptionHandler; what I mean is skipping offsets by avoiding context.commit() on messages that cannot be processed at that time. With a traditional Kafka consumer I can acknowledge the offset, but here I have no option to not acknowledge it other than avoiding context.commit().

*Here is a code snippet that illustrates the scenario I am talking about:*

    import org.apache.kafka.streams.processor.Processor;
    import org.apache.kafka.streams.processor.ProcessorContext;

    public class CustomProcessor implements Processor<String, String> {
        private ProcessorContext context;

        @Override
        public void init(ProcessorContext context) {
            this.context = context;
        }

        @Override
        public void process(String key, String value) {
            /* I am throwing a runtime exception to come out of the process method
               without explicitly committing the offset. */
            try {
                if (value.contains("hello")) {
                    System.out.println("Skipping offset : " + context.offset());
                    throw new RuntimeException("Hello raising exception!");
                }
                System.out.println("offset : " + context.offset() + " partition : " + context.partition());
                context.commit();
            } catch (Exception e) {
                System.out.println("Log & Continue exception: " + e.getMessage());
            }
        }

        @Override
        public void close() {
            // nothing to clean up; required by the Processor interface
        }
    }

Once the process method completes with an exception (without context.commit()), the offset is still committed.

> Kafka Streams : offset control to Streams API
> -
>
> Key: KAFKA-9909
> URL: https://issues.apache.org/jira/browse/KAFKA-9909
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Affects Versions: 2.5.0
> Environment: All
> Reporter: Gopikrishna
> Priority: Minor
> Labels: Offset, commit
>
> Hello team, really inspired the way streams api is running today. I would like to have a feature to be flexible regarding the offset. when we write the processor api, processor context object can be used to commit the offset. this is not effective. but streams are controlling the offset. the moment the process method executed or scheduled window completed, the offset is committed automatically by streams internally.
> Like traditional kafka consumer, its better the context object should have complete control over the offset whether to commit or not. This will give more control to the api to handle failovers and especially when message cannot be processed, context should not commit the offset. Appreciate this can be implemented.
>
> h4. enable.auto.commit is by default false, but streams are committing automatically the offset.
[GitHub] [kafka] viktorsomogyi commented on a change in pull request #8591: KAFKA-6342: Move workaround for JSON parsing of non-escaped strings
viktorsomogyi commented on a change in pull request #8591:
URL: https://github.com/apache/kafka/pull/8591#discussion_r420040567

## File path: core/src/main/scala/kafka/utils/Json.scala ##
@@ -35,16 +35,7 @@ object Json {
    */
   def parseFull(input: String): Option[JsonValue] =

Review comment:
It looks like `parseFull` is used in a couple of places as Json mentioned, but I can perhaps just delegate this to `parseBytes` as shown below and continue using it in the mentioned callers.
```
def parseFull(input: String): Option[JsonValue] = parseBytes(input.getBytes(Charset.defaultCharset()))
```
Or shall I just get rid of this code and use `parseBytes` everywhere?
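The delegation idea in this comment (keep the string entry point, but route it through the bytes-based parser) could look roughly like the Python sketch below. It is an analogy rather than the Scala code under review: `parse_bytes` and `parse_full` are hypothetical names, `None` stands in for Scala's `None`, and encoding with UTF-8 instead of the platform default charset is an assumption made here because JSON text is normally UTF-8.

```python
import json


def parse_bytes(data: bytes):
    """Parse JSON from raw bytes; return None if the input is not valid JSON."""
    try:
        # json.loads accepts bytes and detects UTF-8/16/32 encodings.
        return json.loads(data)
    except ValueError:
        # JSONDecodeError and UnicodeDecodeError are both ValueError subclasses.
        return None


def parse_full(text: str):
    """String entry point kept for existing callers; delegates to parse_bytes."""
    # Assumption: encode explicitly as UTF-8 rather than the platform default.
    return parse_bytes(text.encode("utf-8"))
```

Keeping the thin wrapper means existing callers do not change, while all parsing behaviour lives in one place.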
[GitHub] [kafka] cadonna commented on a change in pull request #8613: KAFKA-6145: Set HighAvailabilityTaskAssignor as default in streams_upgrade_test.py
cadonna commented on a change in pull request #8613:
URL: https://github.com/apache/kafka/pull/8613#discussion_r420052853

## File path: tests/kafkatest/tests/streams/streams_upgrade_test.py ##
@@ -532,20 +532,22 @@ def do_rolling_bounce(self, processor, counter, current_generation):
                 log_monitor.wait_until("Sent a version 8 subscription and got version 7 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version 7 and trigger new rebalance.",
                                        timeout_sec=60,
                                        err_msg="Could not detect 'successful version probing' at upgrading node " + str(node.account))
-            else:
-                log_monitor.wait_until("Sent a version 8 subscription and got version 7 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version 8 and trigger new rebalance.",
+                log_monitor.wait_until("Detected that the assignor requested a rebalance. Rejoining the consumer group to trigger a new rebalance.",

Review comment:
I guess what we really would need is a way to check if a group stabilized. We try to do that by verifying that the generations of the processors are synced. However, I ran into cases where all processors had the same generation, but one processor did not have any tasks assigned, because in that specific rebalance the corresponding partitions were revoked from the other processors. So we would actually need to check if they have the highest generation in sync across the processors AND if all processors have at least one task assigned (AND if all tasks were assigned).
[GitHub] [kafka] cadonna commented on a change in pull request #8613: KAFKA-6145: Set HighAvailabilityTaskAssignor as default in streams_upgrade_test.py
cadonna commented on a change in pull request #8613:
URL: https://github.com/apache/kafka/pull/8613#discussion_r420052853

## File path: tests/kafkatest/tests/streams/streams_upgrade_test.py ##
@@ -532,20 +532,22 @@ def do_rolling_bounce(self, processor, counter, current_generation):
                 log_monitor.wait_until("Sent a version 8 subscription and got version 7 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version 7 and trigger new rebalance.",
                                        timeout_sec=60,
                                        err_msg="Could not detect 'successful version probing' at upgrading node " + str(node.account))
-            else:
-                log_monitor.wait_until("Sent a version 8 subscription and got version 7 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version 8 and trigger new rebalance.",
+                log_monitor.wait_until("Detected that the assignor requested a rebalance. Rejoining the consumer group to trigger a new rebalance.",

Review comment:
I guess what we really would need is a way to check if a group stabilized. We try to do that by verifying that the generations of the processors are synced. However, I ran into cases where all processors had the same generation, but one processor did not have any tasks assigned, because in that specific rebalance the corresponding partitions were revoked from the other processors. So we would actually need to check if they have the highest generation in sync across the processors AND if all processors have at least one task assigned (AND if all tasks were assigned).
[GitHub] [kafka] cadonna commented on a change in pull request #8613: KAFKA-6145: Set HighAvailabilityTaskAssignor as default in streams_upgrade_test.py
cadonna commented on a change in pull request #8613:
URL: https://github.com/apache/kafka/pull/8613#discussion_r420052853

## File path: tests/kafkatest/tests/streams/streams_upgrade_test.py ##
@@ -532,20 +532,22 @@ def do_rolling_bounce(self, processor, counter, current_generation):
                 log_monitor.wait_until("Sent a version 8 subscription and got version 7 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version 7 and trigger new rebalance.",
                                        timeout_sec=60,
                                        err_msg="Could not detect 'successful version probing' at upgrading node " + str(node.account))
-            else:
-                log_monitor.wait_until("Sent a version 8 subscription and got version 7 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version 8 and trigger new rebalance.",
+                log_monitor.wait_until("Detected that the assignor requested a rebalance. Rejoining the consumer group to trigger a new rebalance.",

Review comment:
We can leave it because it verifies whether the assignment was triggered in the assignor. However, it does not give us any guarantee that the rebalance actually took place. I guess what we really would need is a way to check if a group stabilized. We try to do that by verifying that the generations of the processors are synced. However, I ran into cases where all processors had the same generation, but one processor did not have any tasks assigned, because in that specific rebalance the corresponding partitions were revoked from the other processors. So we would actually need to check if they have the highest generation in sync across the processors AND if all processors have at least one task assigned (AND if all tasks were assigned).
[GitHub] [kafka] cadonna commented on a change in pull request #8613: KAFKA-6145: Set HighAvailabilityTaskAssignor as default in streams_upgrade_test.py
cadonna commented on a change in pull request #8613:
URL: https://github.com/apache/kafka/pull/8613#discussion_r420052853

## File path: tests/kafkatest/tests/streams/streams_upgrade_test.py ##
@@ -532,20 +532,22 @@ def do_rolling_bounce(self, processor, counter, current_generation):
                 log_monitor.wait_until("Sent a version 8 subscription and got version 7 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version 7 and trigger new rebalance.",
                                        timeout_sec=60,
                                        err_msg="Could not detect 'successful version probing' at upgrading node " + str(node.account))
-            else:
-                log_monitor.wait_until("Sent a version 8 subscription and got version 7 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version 8 and trigger new rebalance.",
+                log_monitor.wait_until("Detected that the assignor requested a rebalance. Rejoining the consumer group to trigger a new rebalance.",

Review comment:
We can leave it because it verifies whether the assignment was triggered in the assignor. However, it does not give us any guarantee that the rebalance actually took place. I guess what we really would need is a way to check if a group stabilized and if the assignment is valid. We try to do that by verifying that the generations of the processors are synced. However, I ran into cases where all processors had the same generation, but one processor did not have any tasks assigned, because in that specific rebalance the corresponding partitions were revoked from the other processors. So we would actually need to check if they have the highest generation in sync across the processors AND if all processors have at least one task assigned (AND if all tasks were assigned).
[GitHub] [kafka] cadonna commented on a change in pull request #8613: KAFKA-6145: Set HighAvailabilityTaskAssignor as default in streams_upgrade_test.py
cadonna commented on a change in pull request #8613: URL: https://github.com/apache/kafka/pull/8613#discussion_r420052853 ## File path: tests/kafkatest/tests/streams/streams_upgrade_test.py ## @@ -532,20 +532,22 @@ def do_rolling_bounce(self, processor, counter, current_generation): log_monitor.wait_until("Sent a version 8 subscription and got version 7 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version 7 and trigger new rebalance.", timeout_sec=60, err_msg="Could not detect 'successful version probing' at upgrading node " + str(node.account)) -else: -log_monitor.wait_until("Sent a version 8 subscription and got version 7 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version 8 and trigger new rebalance.", +log_monitor.wait_until("Detected that the assignor requested a rebalance. Rejoining the consumer group to trigger a new rebalance.", Review comment: We can leave it because it verifies whether the assignment was triggered in the assignor. However, it does not give us any guarantee that the rebalance actually took place. I guess what we really would need is a way to check if a group stabilized and if the assignment is valid. We try to do that by verifying that the generations of the processors are synced. However, I ran into cases where all processors had the same generation, but one processor did not have any tasks assigned. So we would actually need to check if they have the highest generation in sync across the processors AND if all processors have at least one task assigned (AND if all tasks were assigned).
[jira] [Assigned] (KAFKA-5948) EosIntegrationTest fails with TopicAlreadyMarkedForDeletionException
[ https://issues.apache.org/jira/browse/KAFKA-5948?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bruno Cadonna reassigned KAFKA-5948: Assignee: Bruno Cadonna > EosIntegrationTest fails with TopicAlreadyMarkedForDeletionException > > > Key: KAFKA-5948 > URL: https://issues.apache.org/jira/browse/KAFKA-5948 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Affects Versions: 0.11.0.1 >Reporter: Matthias J. Sax >Assignee: Bruno Cadonna >Priority: Major > > Seems to be a test setup race condition: > {noformat} > kafka.common.TopicAlreadyMarkedForDeletionException: topic > singlePartitionThroughTopic is already marked for deletion > at kafka.admin.AdminUtils$.deleteTopic(AdminUtils.scala:340) > at kafka.admin.AdminUtils.deleteTopic(AdminUtils.scala) > at > org.apache.kafka.streams.integration.utils.KafkaEmbedded.deleteTopic(KafkaEmbedded.java:200) > at > org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster.deleteTopicsAndWait(EmbeddedKafkaCluster.java:268) > at > org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster.deleteTopicsAndWait(EmbeddedKafkaCluster.java:256) > at > org.apache.kafka.streams.integration.EosIntegrationTest.createTopics(EosIntegrationTest.java:102) > {noformat} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (KAFKA-9924) Add RocksDB Memory Consumption to RocksDB Metrics
[ https://issues.apache.org/jira/browse/KAFKA-9924?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bruno Cadonna reassigned KAFKA-9924: Assignee: Bruno Cadonna > Add RocksDB Memory Consumption to RocksDB Metrics > -- > > Key: KAFKA-9924 > URL: https://issues.apache.org/jira/browse/KAFKA-9924 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Bruno Cadonna >Assignee: Bruno Cadonna >Priority: Major > Labels: needs-kip > > RocksDB's memory consumption should be added to the RocksDB metrics. > RocksDB's memory consumption can be retrieved with the following class: > https://github.com/facebook/rocksdb/blob/master/java/src/main/java/org/rocksdb/MemoryUtil.java > The memory consumption metrics should be added on client level and should be > recorded on INFO level.
[GitHub] [kafka] cadonna commented on a change in pull request #8613: KAFKA-6145: Set HighAvailabilityTaskAssignor as default in streams_upgrade_test.py
cadonna commented on a change in pull request #8613: URL: https://github.com/apache/kafka/pull/8613#discussion_r420052853 ## File path: tests/kafkatest/tests/streams/streams_upgrade_test.py ## @@ -532,20 +532,22 @@ def do_rolling_bounce(self, processor, counter, current_generation): log_monitor.wait_until("Sent a version 8 subscription and got version 7 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version 7 and trigger new rebalance.", timeout_sec=60, err_msg="Could not detect 'successful version probing' at upgrading node " + str(node.account)) -else: -log_monitor.wait_until("Sent a version 8 subscription and got version 7 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version 8 and trigger new rebalance.", +log_monitor.wait_until("Detected that the assignor requested a rebalance. Rejoining the consumer group to trigger a new rebalance.", Review comment: We can leave it because it verifies whether the assignment was triggered in the assignor, which is better than nothing. However, it does not give us any guarantee that the rebalance actually took place. I guess what we really would need is a way to check if a group stabilized and if the assignment is valid. We try to do that by verifying that the generations of the processors are synced. However, I ran into cases where all processors had the same generation, but one processor did not have any tasks assigned. So we would actually need to check if they have the highest generation in sync across the processors AND if all processors have at least one task assigned (AND if all tasks were assigned).
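The three conditions named in the review comment above (generations in sync, every processor has at least one task, every task is assigned) could be sketched roughly as follows. This is only an illustration and not code from the PR: the function name `assignment_is_valid` and the dictionary shapes (processor name to generation, processor name to set of task ids) are assumptions made for the example.

```python
from typing import Dict, Set


def assignment_is_valid(generations: Dict[str, int],
                        assignments: Dict[str, Set[str]],
                        all_tasks: Set[str]) -> bool:
    """Hypothetical helper: check the three conditions from the review comment.

    generations: processor name -> generation it last joined with (assumed shape)
    assignments: processor name -> task ids assigned to it (assumed shape)
    all_tasks:   every task id that must be owned by some processor
    """
    # Both maps must describe the same, non-empty set of processors.
    if not generations or set(generations) != set(assignments):
        return False
    # (1) All processors report the same generation; if they all agree,
    #     that shared value is also the highest one.
    if len(set(generations.values())) != 1:
        return False
    # (2) Every processor has at least one task assigned.
    if any(not tasks for tasks in assignments.values()):
        return False
    # (3) Every task is assigned to some processor.
    assigned = set().union(*assignments.values())
    return assigned == all_tasks
```

With inputs shaped like this, the case described in the comment (same generation everywhere, but one processor without tasks) is rejected by condition (2) even though condition (1) passes.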
[GitHub] [kafka] viktorsomogyi commented on pull request #7609: KAFKA-9059: Implement ReassignmentMaxLag
viktorsomogyi commented on pull request #7609: URL: https://github.com/apache/kafka/pull/7609#issuecomment-624066323 @hachikuji I can refactor it, allow me a couple of days to do it. Did you mean to move the lag tracking too from `Replica` to the fetcher side, or should I leave these at their current place and just move the metric itself?
[GitHub] [kafka] ijuma commented on a change in pull request #8591: KAFKA-6342: Move workaround for JSON parsing of non-escaped strings
ijuma commented on a change in pull request #8591: URL: https://github.com/apache/kafka/pull/8591#discussion_r420124856 ## File path: core/src/main/scala/kafka/utils/Json.scala ## @@ -35,16 +35,7 @@ object Json { */ def parseFull(input: String): Option[JsonValue] = Review comment: This code is fine as it is. We are suggesting that we don't need `parseBytesWithAclFallback` in `AclEntry`.
[jira] [Commented] (KAFKA-7450) "Handshake message sequence violation" related ssl handshake failure leads to high cpu usage
[ https://issues.apache.org/jira/browse/KAFKA-7450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17099898#comment-17099898 ] Karel Kotmel commented on KAFKA-7450: - Hi Team, I ran into a similar SSL issue with the extendedKeyUsage attribute. Our company policy does not allow a certificate that has extendedKeyUsage = serverAuth and extendedKeyUsage = clientAuth at the same time. Would it be possible to use separate client and server certificates for inter-broker communication? The client connection is not an issue. At the moment I get the error _Invalid value javax.net.ssl.SSLHandshakeException: Extended key usage does not permit use for TLS server authentication for configuration A client SSLEngine created with the provided settings can't connect to a server SSLEngine created with those settings_ Regards Karel > "Handshake message sequence violation" related ssl handshake failure leads to > high cpu usage > > > Key: KAFKA-7450 > URL: https://issues.apache.org/jira/browse/KAFKA-7450 > Project: Kafka > Issue Type: Bug > Components: controller >Affects Versions: 2.0.0 >Reporter: Yu Yang >Priority: Major > > After updating security.inter.broker.protocol to SSL for our cluster, we > observed that the controller can get into almost 100% cpu usage from time to > time. > {code:java} > listeners=PLAINTEXT://:9092,SSL://:9093 > security.inter.broker.protocol=SSL > {code} > There is no obvious error in server.log. 
But in controller.log, there is > repetitive SSL handshare failure error as below: > {code:java} > [2018-09-28 05:53:10,821] WARN [RequestSendThread controllerId=6042] > Controller 6042's connection to broker datakafka06176.ec2.pin220.com:9093 > (id: 6176 rack: null) was unsuccessful (kafka.controller.RequestSendThread) > org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake > failed > Caused by: javax.net.ssl.SSLProtocolException: Handshake message sequence > violation, 2 > at sun.security.ssl.Handshaker.checkThrown(Handshaker.java:1487) > at > sun.security.ssl.SSLEngineImpl.checkTaskThrown(SSLEngineImpl.java:535) > at > sun.security.ssl.SSLEngineImpl.readNetRecord(SSLEngineImpl.java:813) > at sun.security.ssl.SSLEngineImpl.unwrap(SSLEngineImpl.java:781) > at javax.net.ssl.SSLEngine.unwrap(SSLEngine.java:624) > at > org.apache.kafka.common.network.SslTransportLayer.handshakeUnwrap(SslTransportLayer.java:468) > at > org.apache.kafka.common.network.SslTransportLayer.doHandshake(SslTransportLayer.java:331) > at > org.apache.kafka.common.network.SslTransportLayer.handshake(SslTransportLayer.java:258) > at > org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:125) > at > org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:487) > at org.apache.kafka.common.network.Selector.poll(Selector.java:425) > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:510) > at > org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:73) > at > kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:279) > at > kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:233) > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82) > Caused by: javax.net.ssl.SSLProtocolException: Handshake message sequence > violation, 2 > at > sun.security.ssl.ClientHandshaker.processMessage(ClientHandshaker.java:196) > at 
sun.security.ssl.Handshaker.processLoop(Handshaker.java:1026) > at sun.security.ssl.Handshaker$1.run(Handshaker.java:966) > at sun.security.ssl.Handshaker$1.run(Handshaker.java:963) > at java.security.AccessController.doPrivileged(Native Method) > at sun.security.ssl.Handshaker$DelegatedTask.run(Handshaker.java:1416) > at > org.apache.kafka.common.network.SslTransportLayer.runDelegatedTasks(SslTransportLayer.java:393) > at > org.apache.kafka.common.network.SslTransportLayer.handshakeUnwrap(SslTransportLayer.java:473) > ... 10 more > {code} > {code:java} > [2018-09-30 00:30:13,609] WARN [ReplicaFetcher replicaId=59, leaderId=66, > fetcherId=0] Error in response for fetch request (type=FetchRequest, > replicaId=59, maxWait=500, minBytes=1, maxBytes=10485760, > fetchData={the_test_topic-18=(offset=462333447, logStartOffset=462286948, > maxBytes=4194304), the_test_topic-58=(offset=462312762, > logStartOffset=462295078, maxBytes=4194304)}, > isolationLevel=READ_UNCOMMITTED, toForget=, metadata=(sessionId=1991153671, > epoch=INITIAL)) (kafka.server.ReplicaFetcherThread) > org.apache.kafk
[jira] [Resolved] (KAFKA-9731) Increased fetch request rate with leader selector due to HW propagation
[ https://issues.apache.org/jira/browse/KAFKA-9731?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma resolved KAFKA-9731. Resolution: Fixed > Increased fetch request rate with leader selector due to HW propagation > --- > > Key: KAFKA-9731 > URL: https://issues.apache.org/jira/browse/KAFKA-9731 > Project: Kafka > Issue Type: Improvement > Components: core >Affects Versions: 2.4.0, 2.4.1 >Reporter: Vahid Hashemian >Assignee: Ismael Juma >Priority: Major > Fix For: 2.6.0 > > Attachments: image-2020-03-17-10-19-08-987.png > > > KIP-392 adds high watermark propagation to followers as a means to better > sync up the followers' HW with the leader. The issue we have noticed after trying out > 2.4.0 and 2.4.1 is a spike in fetch request rate in the default selector case > (leader), which does not really require this high watermark propagation: > !image-2020-03-17-10-19-08-987.png|width=811,height=354! > This spike causes an increase in resource allocation (CPU) on the brokers. > An easy solution would be to disable this propagation (at least) for the > default leader selector case to improve backward compatibility.
[jira] [Updated] (KAFKA-9731) Increased fetch request rate with leader selector due to HW propagation
[ https://issues.apache.org/jira/browse/KAFKA-9731?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma updated KAFKA-9731: --- Fix Version/s: 2.6.0 > Increased fetch request rate with leader selector due to HW propagation > --- > > Key: KAFKA-9731 > URL: https://issues.apache.org/jira/browse/KAFKA-9731 > Project: Kafka > Issue Type: Improvement > Components: core >Affects Versions: 2.4.0, 2.4.1 >Reporter: Vahid Hashemian >Assignee: Ismael Juma >Priority: Major > Fix For: 2.6.0 > > Attachments: image-2020-03-17-10-19-08-987.png > > > KIP-392 adds high watermark propagation to followers as a means to better > sync up the followers' HW with the leader. The issue we have noticed after trying out > 2.4.0 and 2.4.1 is a spike in fetch request rate in the default selector case > (leader), which does not really require this high watermark propagation: > !image-2020-03-17-10-19-08-987.png|width=811,height=354! > This spike causes an increase in resource allocation (CPU) on the brokers. > An easy solution would be to disable this propagation (at least) for the > default leader selector case to improve backward compatibility.
[jira] [Commented] (KAFKA-9957) Kafka Controller doesn't failover during hardware failure
[ https://issues.apache.org/jira/browse/KAFKA-9957?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17099925#comment-17099925 ] Ismael Juma commented on KAFKA-9957: Looking at the thread dump, it seems like there are writes happening. For example: {quote}"data-plane-kafka-request-handler-0" #51 daemon prio=5 os_prio=0 tid=0x7fefce144000 nid=0x1a7 runnable [0x7fee9c6f] java.lang.Thread.State: RUNNABLE at sun.nio.ch.FileDispatcherImpl.write0(Native Method) at sun.nio.ch.FileDispatcherImpl.write(FileDispatcherImpl.java:60) at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) at sun.nio.ch.IOUtil.write(IOUtil.java:65) at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:211) - locked <0xc9a0f258> (a java.lang.Object) at org.apache.kafka.common.record.MemoryRecords.writeFullyTo(MemoryRecords.java:88) at org.apache.kafka.common.record.FileRecords.append(FileRecords.java:167) at kafka.log.LogSegment.append(LogSegment.scala:158) at kafka.log.Log.$anonfun$append$2(Log.scala:1171) - locked <0xc9a0eef0> (a java.lang.Object) at kafka.log.Log.append(Log.scala:2322) at kafka.log.Log.appendAsLeader(Log.scala:1002) at kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:969) at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:957) at kafka.server.ReplicaManager.$anonfun$appendToLocalLog$4(ReplicaManager.scala:794) at kafka.server.ReplicaManager$$Lambda$756/1564821644.apply(Unknown Source) at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238) at scala.collection.TraversableLike$$Lambda$15/875016237.apply(Unknown Source) at scala.collection.mutable.HashMap.$anonfun$foreach$1(HashMap.scala:149) at scala.collection.mutable.HashMap$$Lambda$25/1337335626.apply(Unknown Source) at scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:237) at 
scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:230) at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:44) at scala.collection.mutable.HashMap.foreach(HashMap.scala:149) at scala.collection.TraversableLike.map(TraversableLike.scala:238) at scala.collection.TraversableLike.map$(TraversableLike.scala:231) at scala.collection.AbstractTraversable.map(Traversable.scala:108) at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:782) at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:506) at kafka.server.KafkaApis.handleProduceRequest(KafkaApis.scala:582) at kafka.server.KafkaApis.handle(KafkaApis.scala:128) {quote} Unless the writes throw an exception, Kafka won't failover. Reading xfs freeze, it sounds like it blocks writes but doesn't necessarily fail them? > Kafka Controller doesn't failover during hardware failure > - > > Key: KAFKA-9957 > URL: https://issues.apache.org/jira/browse/KAFKA-9957 > Project: Kafka > Issue Type: Bug > Components: controller >Affects Versions: 2.2.0, 2.5.0 >Reporter: Eric Ward >Priority: Critical > Attachments: kafka-threaddump.out > > > On a couple different production environments we've run into an issue where a > hardware failure has hung up the controller and prevented controller and > topic leadership from changing to a healthy broker. 
When the issue happens > we see this repeated in the logs at regular intervals for the other brokers > (the affected broker can’t write to disk, so no logging occurs there): > {noformat} > [2020-04-26 01:12:30,613] WARN [ReplicaFetcher replicaId=0, leaderId=2, > fetcherId=0] Error in response for fetch request (type=FetchRequest, > replicaId=0, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={*snip*}, > isolationLevel=READ_UNCOMMITTED, toForget=, metadata=(sessionId=1962806970, > epoch=INITIAL)) (kafka.server.ReplicaFetcherThread) > java.io.IOException: Connection to 2 was disconnected before the response was > read > at > org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100) > at > kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:100) > at > kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:193) > at > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:280) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:132) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:131) > at scala.Option.foreach(Option.scala:274) > at > kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThr
[jira] [Commented] (KAFKA-9957) Kafka Controller doesn't failover during hardware failure
[ https://issues.apache.org/jira/browse/KAFKA-9957?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17099967#comment-17099967 ] Eric Ward commented on KAFKA-9957: -- That is correct. xfs_freeze will not fail the writes; the writes will just hang indefinitely. In the case of our production failures we saw similar behavior, which is what led us to use xfs_freeze to reproduce it. Any attempt to interact with the affected filesystem would hang indefinitely, both reads and writes. > Kafka Controller doesn't failover during hardware failure > - > > Key: KAFKA-9957 > URL: https://issues.apache.org/jira/browse/KAFKA-9957 > Project: Kafka > Issue Type: Bug > Components: controller >Affects Versions: 2.2.0, 2.5.0 >Reporter: Eric Ward >Priority: Critical > Attachments: kafka-threaddump.out > > > On a couple different production environments we've run into an issue where a > hardware failure has hung up the controller and prevented controller and > topic leadership from changing to a healthy broker. 
When the issue happens > we see this repeated in the logs at regular intervals for the other brokers > (the affected broker can’t write to disk, so no logging occurs there): > {noformat} > [2020-04-26 01:12:30,613] WARN [ReplicaFetcher replicaId=0, leaderId=2, > fetcherId=0] Error in response for fetch request (type=FetchRequest, > replicaId=0, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={*snip*}, > isolationLevel=READ_UNCOMMITTED, toForget=, metadata=(sessionId=1962806970, > epoch=INITIAL)) (kafka.server.ReplicaFetcherThread) > java.io.IOException: Connection to 2 was disconnected before the response was > read > at > org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100) > at > kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:100) > at > kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:193) > at > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:280) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:132) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:131) > at scala.Option.foreach(Option.scala:274) > at > kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:131) > at > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113) > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82) > {noformat} > This issue appears to be similar to KAFKA-7870, though that issue was > purportedly fixed by KAFKA-7697. > Once we encounter this error any partitions whose leadership is on the > affected node are unavailable until we force that broker out of the cluster – > that is to say, kill the node. > When we initially hit the issue we were running on version 2.2.0, though I've > been able to reproduce this in an environment running 2.5.0 as well. 
To > simulate the hardware failure I'm using the xfs_freeze utility to suspend > access to the filesystem. Zookeeper failover is also part of the mix. In > all instances where we’ve seen this the ZK leader and Kafka Controller were > on the same node and both affected by the hardware issue. Zookeeper is able > to successfully failover, which it does rather quickly. > Reproduction steps are pretty straightforward: > # Spin up a 3 node cluster > # Ensure that the Kafka Controller and Zookeeper Leader are on the same node. > # xfs_freeze the filesystem on the node that the controller is running on > This reproduces 100% of the time for me. I’ve left it running for well over > an hour without any Kafka failover happening. Unfreezing the node will allow > the cluster to heal itself. > I’ve attached a thread dump from an environment running 2.5.0.
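The distinction discussed in this thread, between a write that fails with an exception and a write that simply never returns, can be illustrated with a small watchdog. This is a generic sketch and not how Kafka handles I/O: the helper name `completes_within` and the timeout values are made up for the example, and it only detects a stalled operation, it cannot cancel a thread stuck in a blocked system call.

```python
import threading
from typing import Callable


def completes_within(operation: Callable[[], None], timeout_sec: float) -> bool:
    """Hypothetical watchdog: run `operation` in a daemon thread and report
    whether it returned (or raised) before the timeout elapsed.

    A hung write never raises, so the only observable signal is the timeout.
    """
    done = threading.Event()

    def run() -> None:
        try:
            operation()
        finally:
            # Set on normal return and on exception alike: in both cases the
            # operation is no longer blocked.
            done.set()

    threading.Thread(target=run, daemon=True).start()
    return done.wait(timeout_sec)
```

A caller could wrap a small probe write to the log directory in such a check and treat a timeout as a health signal; whether that is appropriate for a broker is exactly the open question in the ticket.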
[GitHub] [kafka] bdbyrne commented on pull request #8615: KAFKA-9954: Config command didn't validate the unsupported user config change
bdbyrne commented on pull request #8615: URL: https://github.com/apache/kafka/pull/8615#issuecomment-624115331 Hi Cheng - what error are you seeing when trying to modify the "users" quotas? This should be supported, and if not, there's likely another bug that's also affecting "clients".
[GitHub] [kafka] bdbyrne removed a comment on pull request #8615: KAFKA-9954: Config command didn't validate the unsupported user config change
bdbyrne removed a comment on pull request #8615: URL: https://github.com/apache/kafka/pull/8615#issuecomment-624115331 Hi Cheng - what error are you seeing when trying to modify the "users" quotas? This should be supported, and if not, there's likely another bug that's also affecting "clients".
[jira] [Commented] (KAFKA-9954) Config command didn't validate the unsupported user config change
[ https://issues.apache.org/jira/browse/KAFKA-9954?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1702#comment-1702 ] Brian Byrne commented on KAFKA-9954: Hi Cheng - what error are you seeing when trying to modify the "users" quotas? This should be supported, and if not, there's likely another bug that's also affecting "clients". > Config command didn't validate the unsupported user config change > - > > Key: KAFKA-9954 > URL: https://issues.apache.org/jira/browse/KAFKA-9954 > Project: Kafka > Issue Type: Bug >Reporter: Cheng Tan >Assignee: Cheng Tan >Priority: Major > > {quote}bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter > --add-config producer_byte_rate=4 --entity-type users --entity-default > {quote} > > will say that the alteration is complete. However, we don't support the > alteration yet.
[GitHub] [kafka] lbradstreet commented on pull request #8609: KAFKA-9946; StopReplicaRequest deletePartition changes may cause premature topic deletion handling in controller
lbradstreet commented on pull request #8609: URL: https://github.com/apache/kafka/pull/8609#issuecomment-624122999 @dajac thanks! Looks good to me.
[jira] [Commented] (KAFKA-9957) Kafka Controller doesn't failover during hardware failure
[ https://issues.apache.org/jira/browse/KAFKA-9957?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17100018#comment-17100018 ] Ismael Juma commented on KAFKA-9957: Can you configure a timeout at the OS level? For example, see the following documentation from Amazon: {quote}Most operating systems specify a timeout for I/O operations submitted to NVMe devices. The default timeout is 30 seconds and can be changed using the {{nvme_core.io_timeout}} boot parameter. With Linux kernels earlier than version 4.6, this parameter is {{nvme.io_timeout}}. {quote} [https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/nvme-ebs-volumes.html] > Kafka Controller doesn't failover during hardware failure > - > > Key: KAFKA-9957 > URL: https://issues.apache.org/jira/browse/KAFKA-9957 > Project: Kafka > Issue Type: Bug > Components: controller >Affects Versions: 2.2.0, 2.5.0 >Reporter: Eric Ward >Priority: Critical > Attachments: kafka-threaddump.out > > > On a couple different production environments we've run into an issue where a > hardware failure has hung up the controller and prevented controller and > topic leadership from changing to a healthy broker. 
When the issue happens > we see this repeated in the logs at regular intervals for the other brokers > (the affected broker can’t write to disk, so no logging occurs there): > {noformat} > [2020-04-26 01:12:30,613] WARN [ReplicaFetcher replicaId=0, leaderId=2, > fetcherId=0] Error in response for fetch request (type=FetchRequest, > replicaId=0, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={*snip*}, > isolationLevel=READ_UNCOMMITTED, toForget=, metadata=(sessionId=1962806970, > epoch=INITIAL)) (kafka.server.ReplicaFetcherThread) > java.io.IOException: Connection to 2 was disconnected before the response was > read > at > org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100) > at > kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:100) > at > kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:193) > at > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:280) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:132) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:131) > at scala.Option.foreach(Option.scala:274) > at > kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:131) > at > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113) > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82) > {noformat} > This issue appears to be similar to KAFKA-7870, though that issue was > purportedly fixed by KAFKA-7697. > Once we encounter this error any partitions whose leadership is on the > affected node are unavailable until we force that broker out of the cluster – > that is to say, kill the node. > When we initially hit the issue we were running on version 2.2.0, though I've > been able to reproduce this in an environment running 2.5.0 as well. 
To > simulate the hardware failure I'm using the xfs_freeze utility to suspend > access to the filesystem. Zookeeper failover is also part of the mix. In > all instances where we’ve seen this the ZK leader and Kafka Controller were > on the same node and both affected by the hardware issue. Zookeeper is able > to successfully failover, which it does rather quickly. > Reproduction steps are pretty straightforward: > # Spin up a 3 node cluster > # Ensure that the Kafka Controller and Zookeeper Leader are on the same node. > # xfs_freeze the filesystem on the node that the controller is running on > This reproduces 100% of the time for me. I’ve left it running for well over > an hour without any Kafka failover happening. Unfreezing the node will allow > the cluster to heal itself. > I’ve attached a thread dump from an environment running 2.5.0.
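For the OS-level timeout mentioned in the comment above, one way to see what a host currently uses is to read the module parameter from sysfs. The sketch below assumes a Linux host that exposes module parameters under /sys/module/<module>/parameters/; the two paths are derived from the parameter names quoted from the Amazon documentation (`nvme_core.io_timeout`, and `nvme.io_timeout` on kernels before 4.6), and the function name `read_nvme_io_timeout` is made up for the example.

```python
from pathlib import Path
from typing import Optional, Sequence

# Assumption: module parameters are readable under /sys/module/<module>/parameters/.
DEFAULT_PARAM_PATHS = (
    Path("/sys/module/nvme_core/parameters/io_timeout"),  # nvme_core.io_timeout
    Path("/sys/module/nvme/parameters/io_timeout"),       # nvme.io_timeout (kernels before 4.6)
)


def read_nvme_io_timeout(paths: Sequence[Path] = DEFAULT_PARAM_PATHS) -> Optional[int]:
    """Hypothetical helper: return the first readable io_timeout value
    (in seconds), or None if no candidate path exists or parses as an integer."""
    for path in paths:
        try:
            return int(Path(path).read_text().strip())
        except (OSError, ValueError):
            # Missing file, unreadable file, or unexpected content: try the next path.
            continue
    return None
```

On a machine without NVMe devices, or on a non-Linux host, the function simply returns None; changing the value still has to happen through the boot parameter described in the quoted documentation.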
[jira] [Commented] (KAFKA-9891) Invalid state store content after task migration with exactly_once and standby replicas
[ https://issues.apache.org/jira/browse/KAFKA-9891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17100020#comment-17100020 ]

Boyang Chen commented on KAFKA-9891:

We had some offline discussion, and plan to revise the integration test PR as well. We want to test out on trunk first, and then backport to 2.4 to ensure the regression doesn't carry over.

> Invalid state store content after task migration with exactly_once and standby replicas
> ---
>
> Key: KAFKA-9891
> URL: https://issues.apache.org/jira/browse/KAFKA-9891
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.3.1, 2.4.1
> Reporter: Mateusz Jadczyk
> Assignee: Boyang Chen
> Priority: Blocker
>
> We have a simple command id deduplication mechanism (very similar to the one from Kafka Streams examples) based on Kafka Streams State Stores. It stores command ids from the past hour in _persistentWindowStore_. We encountered a problem with the store if there's an exception thrown later in that topology.
> We run 3 nodes using docker, each with multiple threads set for this particular Streams Application.
> The business flow is as follows (performed within a single subtopology):
> * a valid command is sent with command id (_mnl_cmd_31bad7e5-35e7-490e-89f0-616fe967111f_). NODE 1 is running an active task 1_2. First node in the topology analyses if this is a duplicate by checking in the state store (_COMMAND_ID_STORE_), if not puts the command id in the state store and processes the command properly.
> * an invalid command is sent with the same key but new command id (_mnl_cmd_9f1752da-45b7-4ef7-9ef8-209d826530bc_). Again, check for the duplicated command id is performed, it's not a duplicate, command id is put into the state store. Next node in the topology throws an exception which causes an error on NODE 1 for task 1_2. As a result, transaction is aborted, offsets are not committed. I double checked for the changelog topic - relevant messages are not committed. Therefore, the changelog topic contains only the first command id _mnl_cmd_31bad7e5-35e7-490e-89f0-616fe967111f,_ and not the one which caused a failure.
> * in the meantime a standby task 1_2 running on NODE 3 replicated _mnl_cmd_31bad7e5-35e7-490e-89f0-616fe967111f_ command id into a local _COMMAND_ID_STORE_
> * standby task 1_2 on NODE 3 Thread-2 takes over the task as an active one. It checks if this command id is a duplicate - no, it isn't - tries to process the faulty command and throws an exception. Again, transaction aborted, all looks fine.
> * NODE 3 Thread-1 takes over. It checks for the duplicate. To our surprise, *it is a duplicate!* Even though the transaction has been aborted and the changelog doesn't contain this command id: _mnl_cmd_9f1752da-45b7-4ef7-9ef8-209d826530bc._
>
> After digging into the Streams logs and some discussion on ([Stack Overflow|https://stackoverflow.com/questions/61247789/invalid-state-store-content-after-aborted-transaction-with-exactly-once-and-stan]) we concluded it has something to do with checkpoint files. Here are the detailed logs relevant to checkpoint files.
>
> {code:java}
> NODE_3 2020-04-15 21:06:14.470 TRACE 1 --- [-StreamThread-2] o.a.k.s.p.i.ProcessorStateManager : stream-thread [Processor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] task [1_2] Checkpointable offsets read from checkpoint: {}
> NODE_3 2020-04-15 21:06:19.413 TRACE 1 --- [-StreamThread-2] o.a.k.s.p.i.ProcessorStateManager : stream-thread [Processor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] task [1_2] Restoring state store COMMAND_ID_STORE from changelog topic Processor-COMMAND_ID_STORE-changelog at checkpoint null
> NODE_3 2020-04-15 21:06:28.470 TRACE 1 --- [-StreamThread-1] o.a.k.s.p.i.ProcessorStateManager : stream-thread [Processor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-1] standby-task [1_2] Checkpointable offsets read from checkpoint: {}
> NODE_3 2020-04-15 21:06:29.634 TRACE 1 --- [-StreamThread-2] o.a.k.s.s.internals.OffsetCheckpoint : Writing tmp checkpoint file /tmp/kafka-streams/Processor/1_2/.checkpoint.tmp
> NODE_3 2020-04-15 21:06:29.640 TRACE 1 --- [-StreamThread-2] o.a.k.s.s.internals.OffsetCheckpoint : Swapping tmp checkpoint file /tmp/kafka-streams/Processor/1_2/.checkpoint.tmp /tmp/kafka-streams/Processor/1_2/.checkpoint
> NODE_3 2020-04-15 21:11:15.909 TRACE 1 --- [-StreamThread-1] o.a.k.s.s.internals.OffsetCheckpoint : Writing tmp checkpoint file /tmp/kafka-streams/Processor/1_2/.checkpoint.tmp
> NODE_3 2020-04-15 21:11:15.912 TRACE 1 --- [-StreamThread-1] o
[jira] [Commented] (KAFKA-9891) Invalid state store content after task migration with exactly_once and standby replicas
[ https://issues.apache.org/jira/browse/KAFKA-9891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17100021#comment-17100021 ]

Boyang Chen commented on KAFKA-9891:

[~mateuszjadczyk] on the other hand, if you are willing to work on this ticket, feel free to take over, as I realize that you have better context of this problem in 2.4 than I do.
[GitHub] [kafka] dhruvilshah3 commented on a change in pull request #8609: KAFKA-9946; StopReplicaRequest deletePartition changes may cause premature topic deletion handling in controller
dhruvilshah3 commented on a change in pull request #8609:
URL: https://github.com/apache/kafka/pull/8609#discussion_r420238891

## File path: core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -550,6 +550,22 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig,
       else if (config.interBrokerProtocolVersion >= KAFKA_2_2_IV0) 1
       else 0

+    def responseCallback(brokerId: Int, isPartitionDeleted: TopicPartition => Boolean)
+                        (response: AbstractResponse): Unit = {
+      val stopReplicaResponse = response.asInstanceOf[StopReplicaResponse]
+      val partitionErrorsForDeletingTopics = mutable.Map.empty[TopicPartition, Errors]
+      stopReplicaResponse.partitionErrors.asScala.foreach { pe =>
+        val tp = new TopicPartition(pe.topicName, pe.partitionIndex)
+        if (controllerContext.isTopicDeletionInProgress(pe.topicName) &&
+            isPartitionDeleted(tp)) {
+          partitionErrorsForDeletingTopics += tp -> Errors.forCode(pe.errorCode)

Review comment: nit: `partitionErrorsForDeletingTopics` seems a bit ambiguous and makes it sound like it only includes partitions for which StopReplicaRequest failed. Perhaps something like `partitionToError` is better?

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-6468) Replication high watermark checkpoint file read for every LeaderAndIsrRequest
[ https://issues.apache.org/jira/browse/KAFKA-6468?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kyle Ambroff-Kao resolved KAFKA-6468. - Resolution: Fixed > Replication high watermark checkpoint file read for every LeaderAndIsrRequest > - > > Key: KAFKA-6468 > URL: https://issues.apache.org/jira/browse/KAFKA-6468 > Project: Kafka > Issue Type: Bug >Reporter: Kyle Ambroff-Kao >Assignee: Kyle Ambroff-Kao >Priority: Major > > The high watermark for each partition in a given log directory is written to > disk every _replica.high.watermark.checkpoint.interval.ms_ milliseconds. This > checkpoint file is used to create replicas when joining the cluster. > [https://github.com/apache/kafka/blob/b73c765d7e172de4742a3aa023d5a0a4b7387247/core/src/main/scala/kafka/cluster/Partition.scala#L180] > Unfortunately this file is read every time > kafka.cluster.Partition#getOrCreateReplica is invoked. For most clusters this > isn't a big deal, but for a small cluster with lots of partitions all of the > reads of this file really add up. > On my local test cluster of three brokers with around 40k partitions, the > initial LeaderAndIsrRequest refers to every partition in the cluster, and it > can take 20 to 30 minutes to create all of the replicas because the > _replication-offset-checkpoint_ is nearly 2MB. > Changing this code so that we only read this file once on startup reduces the > time to create all replicas to around one minute. > Credit to [~onurkaraman] for finding this one. -- This message was sent by Atlassian Jira (v8.3.4#803005)
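The change described in this issue (read _replication-offset-checkpoint_ once rather than on every `getOrCreateReplica` call) can be illustrated with a lazily loaded cache. The following is only a minimal sketch, not the broker code or the patch that was merged: the class name `CachedCheckpointSketch`, the `Supplier`-based reader, and the string partition keys are assumptions made for the example.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical sketch: load the checkpoint once and serve later lookups from memory.
final class CachedCheckpointSketch {
    // Stand-in for parsing the checkpoint file; assumed to be the expensive step.
    private final Supplier<Map<String, Long>> checkpointReader;
    private Map<String, Long> cached; // stays null until the first lookup

    CachedCheckpointSketch(Supplier<Map<String, Long>> checkpointReader) {
        this.checkpointReader = checkpointReader;
    }

    // Returns the checkpointed high watermark, or 0 if the partition is not listed.
    synchronized long highWatermark(String topicPartition) {
        if (cached == null) {
            // The only place the reader is invoked, so the file is parsed once.
            cached = new HashMap<>(checkpointReader.get());
        }
        return cached.getOrDefault(topicPartition, 0L);
    }
}
```

With this shape, the cost of creating N replicas is one file parse plus N map lookups, instead of N parses of a file that grows with the partition count.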
[jira] [Commented] (KAFKA-6468) Replication high watermark checkpoint file read for every LeaderAndIsrRequest
[ https://issues.apache.org/jira/browse/KAFKA-6468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17100047#comment-17100047 ]

Kyle Ambroff-Kao commented on KAFKA-6468:

Linking as a duplicate of KAFKA-8333, which has a patch that fixes this problem.
[GitHub] [kafka] ambroff commented on pull request #4468: KAFKA-6468 Read replication-offset-checkpoint once
ambroff commented on pull request #4468:
URL: https://github.com/apache/kafka/pull/4468#issuecomment-624160139

Abandoning this PR since it looks like PR #6800 fixed this problem.
[GitHub] [kafka] bseenu commented on a change in pull request #7577: KAFKA-9076: support consumer offset sync across clusters in MM 2.0
bseenu commented on a change in pull request #7577:
URL: https://github.com/apache/kafka/pull/7577#discussion_r419913034

## File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
@@ -190,4 +227,103 @@ public void commitRecord(SourceRecord record) {
             Checkpoint.unwrapGroup(record.sourcePartition()),
             System.currentTimeMillis() - record.timestamp());
     }
+
+    private void refreshIdleConsumerGroupOffset() {
+        Map<String, KafkaFuture<ConsumerGroupDescription>> consumerGroupsDesc = targetAdminClient
+            .describeConsumerGroups(consumerGroups).describedGroups();
+
+        for (String group : consumerGroups) {
+            try {
+                ConsumerGroupDescription consumerGroupDesc = consumerGroupsDesc.get(group).get();
+                ConsumerGroupState consumerGroupState = consumerGroupDesc.state();
+                // sync offset to the target cluster only if the state of current consumer group is:
+                // (1) idle: because the consumer at target is not actively consuming the mirrored topic
+                // (2) dead: the new consumer that is recently created at source and never exist at target
+                if (consumerGroupState.equals(ConsumerGroupState.EMPTY)) {
+                    idleConsumerGroupsOffset.put(group, targetAdminClient.listConsumerGroupOffsets(group)
+                        .partitionsToOffsetAndMetadata().get().entrySet());
+                } else if (consumerGroupState.equals(ConsumerGroupState.DEAD)) {
+                    newConsumerGroup.add(group);
+                }
+            } catch (InterruptedException | ExecutionException e) {
+                log.error("Error querying for consumer group {} on cluster {}.", group, targetClusterAlias, e);
+            }
+        }
+    }
+
+    Map<String, Map<TopicPartition, OffsetAndMetadata>> syncGroupOffset() {
+        Map<String, Map<TopicPartition, OffsetAndMetadata>> offsetToSyncAll = new HashMap<>();
+
+        // first, sync offsets for the idle consumers at target
+        for (Map.Entry<String, Set<Map.Entry<TopicPartition, OffsetAndMetadata>>> group : idleConsumerGroupsOffset.entrySet()) {
+            String consumerGroupId = group.getKey();
+            // for each idle consumer at target, read the checkpoints (converted upstream offset)
+            // from the pre-populated map
+            Map<TopicPartition, OffsetAndMetadata> convertedUpstreamOffset = getConvertedUpstreamOffset(consumerGroupId);
+
+            if (convertedUpstreamOffset == null) continue;
+
+            Map<TopicPartition, OffsetAndMetadata> offsetToSync = new HashMap<>();
+            for (Entry<TopicPartition, OffsetAndMetadata> entry : group.getValue()) {
+                long latestDownstreamOffset = entry.getValue().offset();
+                TopicPartition topicPartition = entry.getKey();
+                if (!convertedUpstreamOffset.containsKey(topicPartition)) {
+                    log.trace("convertedUpstreamOffset does not contain TopicPartition: {}", topicPartition.toString());
+                    continue;
+                }
+
+                // if translated offset from upstream is smaller than the current consumer offset
+                // in the target, skip updating the offset for that partition
+                long convertedOffset = convertedUpstreamOffset.get(topicPartition).offset();
+                if (latestDownstreamOffset >= convertedOffset) {
+                    log.trace("latestDownstreamOffset {} is larger than convertedUpstreamOffset {} for "
+                        + "TopicPartition {}", latestDownstreamOffset, convertedOffset, topicPartition);
+                    continue;
+                }

Review comment: I would like to propose the following changes to sync the consumer group changes on the source side:

```suggestion
for (Map.Entry<TopicPartition, OffsetAndMetadata> convertedEntry : convertedUpstreamOffset.entrySet()) {
    TopicPartition topicPartition = convertedEntry.getKey();
    for (Entry<TopicPartition, OffsetAndMetadata> idleEntry : group.getValue()) {
        if (idleEntry.getKey() == topicPartition) {
            long latestDownstreamOffset = idleEntry.getValue().offset();
            // if translated offset from upstream is smaller than the current consumer offset
            // in the target, skip updating the offset for that partition
            long convertedOffset = convertedUpstreamOffset.get(topicPartition).offset();
            if (latestDownstreamOffset >= convertedOffset) {
                log.trace("latestDownstreamOffset {} is larger than convertedUpstreamOffset {} for "
                    + "TopicPartition {}", latestDownstreamOffset, convertedOffset, topicPartition);
                continue;
            }
        }
    }
```
[GitHub] [kafka] C0urante commented on a change in pull request #8618: KAFKA-9955: Prevent SinkTask::close from shadowing other exceptions
C0urante commented on a change in pull request #8618:
URL: https://github.com/apache/kafka/pull/8618#discussion_r420248205

## File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -856,6 +857,49 @@ public void run() {
         PowerMock.verifyAll();
     }

+    @Test
+    public void testSinkTasksHandleCloseErrors() throws Exception {
+        createTask(initialState);
+        expectInitializeTask();
+        expectTaskGetTopic(true);
+
+        // Put one message through the task to get some offsets to commit
+        expectConsumerPoll(1);
+        expectConversionAndTransformation(1);
+        sinkTask.put(EasyMock.anyObject());
+        PowerMock.expectLastCall().andVoid();
+
+        // Throw an exception on the next put to trigger shutdown behavior
+        // This exception is the true "cause" of the failure
+        expectConsumerPoll(1);
+        expectConversionAndTransformation(1);
+        Throwable a = new RuntimeException();

Review comment: If we're going to refer to this later while making assertions, a more descriptive name might help readability. Something like `putFailure` here and `closeFailure` below, maybe?

## File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -856,6 +857,49 @@ public void run() {
         PowerMock.verifyAll();
     }

+    @Test
+    public void testSinkTasksHandleCloseErrors() throws Exception {
+        createTask(initialState);
+        expectInitializeTask();
+        expectTaskGetTopic(true);
+
+        // Put one message through the task to get some offsets to commit
+        expectConsumerPoll(1);
+        expectConversionAndTransformation(1);
+        sinkTask.put(EasyMock.anyObject());
+        PowerMock.expectLastCall().andVoid();
+
+        // Throw an exception on the next put to trigger shutdown behavior
+        // This exception is the true "cause" of the failure
+        expectConsumerPoll(1);
+        expectConversionAndTransformation(1);
+        Throwable a = new RuntimeException();
+        sinkTask.put(EasyMock.anyObject());
+        PowerMock.expectLastCall().andThrow(a);
+
+        // Throw another exception while closing the task's assignment
+        EasyMock.expect(sinkTask.preCommit(EasyMock.anyObject()))
+            .andStubReturn(Collections.emptyMap());
+        Throwable b = new RuntimeException();
+        sinkTask.close(EasyMock.anyObject());
+        PowerMock.expectLastCall().andThrow(b);
+
+        PowerMock.replayAll();
+
+        workerTask.initialize(TASK_CONFIG);
+        try {
+            workerTask.execute();
+            fail();
+        } catch (Throwable t) {
+            PowerMock.verifyAll();
+            // The exception from close should not shadow the exception from put.

Review comment: Might be better to use this text as the message for the assertions instead of putting it here as a comment?

## File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -856,6 +857,49 @@ public void run() {
         PowerMock.verifyAll();
     }

+    @Test
+    public void testSinkTasksHandleCloseErrors() throws Exception {
+        createTask(initialState);
+        expectInitializeTask();
+        expectTaskGetTopic(true);
+
+        // Put one message through the task to get some offsets to commit
+        expectConsumerPoll(1);
+        expectConversionAndTransformation(1);
+        sinkTask.put(EasyMock.anyObject());
+        PowerMock.expectLastCall().andVoid();
+
+        // Throw an exception on the next put to trigger shutdown behavior
+        // This exception is the true "cause" of the failure
+        expectConsumerPoll(1);
+        expectConversionAndTransformation(1);
+        Throwable a = new RuntimeException();
+        sinkTask.put(EasyMock.anyObject());
+        PowerMock.expectLastCall().andThrow(a);
+
+        // Throw another exception while closing the task's assignment
+        EasyMock.expect(sinkTask.preCommit(EasyMock.anyObject()))
+            .andStubReturn(Collections.emptyMap());
+        Throwable b = new RuntimeException();
+        sinkTask.close(EasyMock.anyObject());
+        PowerMock.expectLastCall().andThrow(b);
+
+        PowerMock.replayAll();
+
+        workerTask.initialize(TASK_CONFIG);
+        try {
+            workerTask.execute();
+            fail();
+        } catch (Throwable t) {

Review comment: The call to `fail()` above is going to get caught here, isn't it? Think that might make this difficult to debug if a future change causes this to fail for some reason. Based on the `deliverMessages` method where `SinkTask::put` is invoked, it looks like we might be able to narrow this down to a `ConnectException`.
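The last review comment turns on the fact that JUnit's `fail()` signals failure by throwing an `AssertionError`, which a `catch (Throwable t)` block will also receive. A minimal sketch of the difference follows; it is not the test in the PR. `NarrowCatchSketch` and `TaskFailure` are hypothetical names, with `TaskFailure` standing in for the narrower exception type the reviewer suggests (`ConnectException`), and the `AssertionError` is thrown directly to mimic `fail()`.

```java
// Hypothetical sketch: why `catch (Throwable)` hides a failed expectation.
final class NarrowCatchSketch {
    // Stand-in for the exception type the task is expected to throw (an assumption).
    static final class TaskFailure extends RuntimeException {
        TaskFailure(Throwable cause) {
            super(cause);
        }
    }

    // Broad pattern: the AssertionError meant to signal "nothing was thrown"
    // is itself a Throwable, so the catch block receives it.
    static Throwable broadCatch(Runnable execute) {
        try {
            execute.run();
            throw new AssertionError("expected execute() to throw");
        } catch (Throwable t) {
            return t;
        }
    }

    // Narrow pattern: only the expected type is caught, so the AssertionError
    // propagates to the caller and the test fails visibly.
    static Throwable narrowCatch(Runnable execute) {
        try {
            execute.run();
            throw new AssertionError("expected execute() to throw");
        } catch (TaskFailure e) {
            return e.getCause();
        }
    }
}
```

Naming the two injected exceptions `putFailure` and `closeFailure`, as the first comment suggests, would let a later assertion compare the returned cause against `putFailure` directly.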
[GitHub] [kafka] bseenu commented on a change in pull request #7577: KAFKA-9076: support consumer offset sync across clusters in MM 2.0
bseenu commented on a change in pull request #7577:
URL: https://github.com/apache/kafka/pull/7577#discussion_r420261901

## File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
@@ -190,4 +227,103 @@ public void commitRecord(SourceRecord record) {
             Checkpoint.unwrapGroup(record.sourcePartition()),
             System.currentTimeMillis() - record.timestamp());
     }
+
+    private void refreshIdleConsumerGroupOffset() {
+        Map<String, KafkaFuture<ConsumerGroupDescription>> consumerGroupsDesc = targetAdminClient
+            .describeConsumerGroups(consumerGroups).describedGroups();
+
+        for (String group : consumerGroups) {
+            try {
+                ConsumerGroupDescription consumerGroupDesc = consumerGroupsDesc.get(group).get();
+                ConsumerGroupState consumerGroupState = consumerGroupDesc.state();
+                // sync offset to the target cluster only if the state of current consumer group is:
+                // (1) idle: because the consumer at target is not actively consuming the mirrored topic
+                // (2) dead: the new consumer that is recently created at source and never exist at target
+                if (consumerGroupState.equals(ConsumerGroupState.EMPTY)) {
+                    idleConsumerGroupsOffset.put(group, targetAdminClient.listConsumerGroupOffsets(group)
+                        .partitionsToOffsetAndMetadata().get().entrySet());
+                } else if (consumerGroupState.equals(ConsumerGroupState.DEAD)) {
+                    newConsumerGroup.add(group);
+                }
+            } catch (InterruptedException | ExecutionException e) {
+                log.error("Error querying for consumer group {} on cluster {}.", group, targetClusterAlias, e);
+            }
+        }
+    }
+
+    Map<String, Map<TopicPartition, OffsetAndMetadata>> syncGroupOffset() {
+        Map<String, Map<TopicPartition, OffsetAndMetadata>> offsetToSyncAll = new HashMap<>();
+
+        // first, sync offsets for the idle consumers at target
+        for (Map.Entry<String, Set<Map.Entry<TopicPartition, OffsetAndMetadata>>> group : idleConsumerGroupsOffset.entrySet()) {
+            String consumerGroupId = group.getKey();
+            // for each idle consumer at target, read the checkpoints (converted upstream offset)
+            // from the pre-populated map
+            Map<TopicPartition, OffsetAndMetadata> convertedUpstreamOffset = getConvertedUpstreamOffset(consumerGroupId);
+
+            if (convertedUpstreamOffset == null) continue;
+
+            Map<TopicPartition, OffsetAndMetadata> offsetToSync = new HashMap<>();
+            for (Entry<TopicPartition, OffsetAndMetadata> entry : group.getValue()) {
+                long latestDownstreamOffset = entry.getValue().offset();
+                TopicPartition topicPartition = entry.getKey();
+                if (!convertedUpstreamOffset.containsKey(topicPartition)) {
+                    log.trace("convertedUpstreamOffset does not contain TopicPartition: {}", topicPartition.toString());
+                    continue;
+                }
+
+                // if translated offset from upstream is smaller than the current consumer offset
+                // in the target, skip updating the offset for that partition
+                long convertedOffset = convertedUpstreamOffset.get(topicPartition).offset();
+                if (latestDownstreamOffset >= convertedOffset) {
+                    log.trace("latestDownstreamOffset {} is larger than convertedUpstreamOffset {} for "
+                        + "TopicPartition {}", latestDownstreamOffset, convertedOffset, topicPartition);
+                    continue;
+                }
+                offsetToSync.put(entry.getKey(), convertedUpstreamOffset.get(topicPartition));

Review comment: I would like to propose the following change to take care of the source consumer group changes:

```suggestion
for (Map.Entry<TopicPartition, OffsetAndMetadata> convertedEntry : convertedUpstreamOffset.entrySet()) {
    TopicPartition topicPartition = convertedEntry.getKey();
    for (Entry<TopicPartition, OffsetAndMetadata> idleEntry : group.getValue()) {
        if (idleEntry.getKey() == topicPartition) {
            long latestDownstreamOffset = idleEntry.getValue().offset();
            // if translated offset from upstream is smaller than the current consumer offset
            // in the target, skip updating the offset for that partition
            long convertedOffset = convertedUpstreamOffset.get(topicPartition).offset();
            if (latestDownstreamOffset >= convertedOffset) {
                log.trace("latestDownstreamOffset {} is larger than convertedUpstreamOffset {} for "
                    + "TopicPartition {}", latestDownstreamOffset, convertedOffset, topicPartition);
                continue;
            }
        }
    }
    offsetToSync.put(convertedEntry.getKey(), convertedUpstreamOffset.get(topicPartition));
```
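The intent of the suggestion appears to be to drive the loop from the translated upstream offsets, so that partitions the source group has started consuming since the last sync are also written at the target. Two details of the snippet as posted seem worth checking: `==` compares `TopicPartition` references rather than values, and the `continue` sits in the inner loop, so it would not skip the `put` that follows. The sketch below shows one way to express the same intent with a map lookup. It is an illustration under assumptions, not the code in the PR: `OffsetSyncSketch` and `Partition` are hypothetical, `Partition` stands in for Kafka's `TopicPartition` (which also defines `equals`/`hashCode`), and plain `Long` offsets replace `OffsetAndMetadata`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical sketch of the proposed merge; not the MirrorCheckpointTask code.
final class OffsetSyncSketch {
    // Minimal value-based stand-in for a topic partition.
    static final class Partition {
        final String topic;
        final int partition;

        Partition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof Partition)) return false;
            Partition other = (Partition) o;
            return partition == other.partition && topic.equals(other.topic);
        }

        @Override
        public int hashCode() {
            return Objects.hash(topic, partition);
        }
    }

    // Returns the offsets to write at the target: every translated upstream offset,
    // except where the target group is already at or past it.
    static Map<Partition, Long> offsetsToSync(Map<Partition, Long> convertedUpstream,
                                              Map<Partition, Long> committedAtTarget) {
        Map<Partition, Long> result = new HashMap<>();
        for (Map.Entry<Partition, Long> converted : convertedUpstream.entrySet()) {
            // Map lookup uses equals/hashCode, so distinct but equal instances match.
            Long downstream = committedAtTarget.get(converted.getKey());
            if (downstream != null && downstream >= converted.getValue()) {
                continue; // target is not behind; leave its offset untouched
            }
            result.put(converted.getKey(), converted.getValue());
        }
        return result;
    }
}
```

A partition that exists only in `convertedUpstream` has no committed offset at the target, so it always lands in the result, which is the source-side change the comment wants to pick up.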
[GitHub] [kafka] ableegoldman commented on pull request #8496: KAFKA-9748: Add Streams eos-beta integration test
ableegoldman commented on pull request #8496:
URL: https://github.com/apache/kafka/pull/8496#issuecomment-624181714

> Java 11: `org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true]`

@mjsax Should we use `@Ignore` rather than merge known flaky tests while they're still under investigation? That's what we did for some of the KIP-441 tests (granted they were failing at almost 100%, but still 🙂)
[GitHub] [kafka] ijuma commented on pull request #8609: KAFKA-9946; StopReplicaRequest deletePartition changes may cause premature topic deletion handling in controller
ijuma commented on pull request #8609:
URL: https://github.com/apache/kafka/pull/8609#issuecomment-624186102

ok to test
[GitHub] [kafka] tombentley commented on pull request #8311: KAFKA-9434: automated protocol for alterReplicaLogDirs
tombentley commented on pull request #8311:
URL: https://github.com/apache/kafka/pull/8311#issuecomment-624187386

@dajac sure. I added tests for the create and delete topics cases too.
[GitHub] [kafka] gharris1727 commented on pull request #8618: KAFKA-9955: Prevent SinkTask::close from shadowing other exceptions
gharris1727 commented on pull request #8618:
URL: https://github.com/apache/kafka/pull/8618#issuecomment-624196656

@rhauch @kkonstantine This is ready for committer review. Thanks for taking a look!
[jira] [Commented] (KAFKA-9798) Flaky test: org.apache.kafka.streams.integration.QueryableStateIntegrationTest.shouldAllowConcurrentAccesses
[ https://issues.apache.org/jira/browse/KAFKA-9798?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17100107#comment-17100107 ]

Matthias J. Sax commented on KAFKA-9798:

Failed again: [https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/2127/testReport/junit/org.apache.kafka.streams.integration/QueryableStateIntegrationTest/shouldAllowConcurrentAccesses/]

> Flaky test: org.apache.kafka.streams.integration.QueryableStateIntegrationTest.shouldAllowConcurrentAccesses
>
> Key: KAFKA-9798
> URL: https://issues.apache.org/jira/browse/KAFKA-9798
> Project: Kafka
> Issue Type: Test
> Components: streams, unit tests
> Reporter: Bill Bejeck
> Assignee: Guozhang Wang
> Priority: Major
> Labels: flaky-test, test
[jira] [Comment Edited] (KAFKA-9798) Flaky test: org.apache.kafka.streams.integration.QueryableStateIntegrationTest.shouldAllowConcurrentAccesses
[ https://issues.apache.org/jira/browse/KAFKA-9798?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17100107#comment-17100107 ]

Matthias J. Sax edited comment on KAFKA-9798 at 5/5/20, 5:31 PM:

Failed again: [https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/2127/testReport/junit/org.apache.kafka.streams.integration/QueryableStateIntegrationTest/shouldAllowConcurrentAccesses/]
{quote}
java.lang.AssertionError: Did not receive all 1 records from topic output-concurrent-QueryableStateIntegrationTestshouldAllowConcurrentAccesses within 12 ms
Expected: is a value equal to or greater than <1>
 but: <0> was less than <1>
 at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
 at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinValuesRecordsReceived$6(IntegrationTestUtils.java:747)
 at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:429)
 at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:397)
 at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinValuesRecordsReceived(IntegrationTestUtils.java:743)
 at org.apache.kafka.streams.integration.QueryableStateIntegrationTest.waitUntilAtLeastNumRecordProcessed(QueryableStateIntegrationTest.java:1159)
 at org.apache.kafka.streams.integration.QueryableStateIntegrationTest.shouldAllowConcurrentAccesses(QueryableStateIntegrationTest.java:650)
{quote}

was (Author: mjsax):
Failed again: [https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/2127/testReport/junit/org.apache.kafka.streams.integration/QueryableStateIntegrationTest/shouldAllowConcurrentAccesses/]

> Flaky test: org.apache.kafka.streams.integration.QueryableStateIntegrationTest.shouldAllowConcurrentAccesses
>
> Key: KAFKA-9798
> URL: https://issues.apache.org/jira/browse/KAFKA-9798
> Project: Kafka
> Issue Type: Test
> Components: streams, unit tests
> Reporter: Bill Bejeck
> Assignee: Guozhang Wang
> Priority: Major
> Labels: flaky-test, test
[jira] [Commented] (KAFKA-9831) Failing test: EosIntegrationTest.shouldNotViolateEosIfOneTaskFailsWithState[exactly_once_beta]
[ https://issues.apache.org/jira/browse/KAFKA-9831?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17100108#comment-17100108 ]

Matthias J. Sax commented on KAFKA-9831:

[https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/2127/testReport/junit/org.apache.kafka.streams.integration/EosIntegrationTest/shouldNotViolateEosIfOneTaskFailsWithState_exactly_once_/]
{quote}
java.lang.AssertionError:
Expected: <[KeyValue(0, 0), KeyValue(0, 1), KeyValue(0, 3), KeyValue(0, 6), KeyValue(0, 10), KeyValue(0, 15), KeyValue(0, 21), KeyValue(0, 28), KeyValue(0, 36), KeyValue(0, 45), KeyValue(0, 55), KeyValue(0, 66), KeyValue(0, 78), KeyValue(0, 91), KeyValue(0, 105)]>
 but: was <[KeyValue(0, 0), KeyValue(0, 1), KeyValue(0, 3), KeyValue(0, 6), KeyValue(0, 10), KeyValue(0, 15), KeyValue(0, 21), KeyValue(0, 28), KeyValue(0, 36), KeyValue(0, 45), KeyValue(0, 55), KeyValue(0, 66), KeyValue(0, 78), KeyValue(0, 91), KeyValue(0, 105), KeyValue(0, 55), KeyValue(0, 66)]>
 at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
 at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6)
 at org.apache.kafka.streams.integration.EosIntegrationTest.checkResultPerKey(EosIntegrationTest.java:280)
 at org.apache.kafka.streams.integration.EosIntegrationTest.shouldNotViolateEosIfOneTaskFailsWithState(EosIntegrationTest.java:481)
{quote}

> Failing test: EosIntegrationTest.shouldNotViolateEosIfOneTaskFailsWithState[exactly_once_beta]
> --
>
> Key: KAFKA-9831
> URL: https://issues.apache.org/jira/browse/KAFKA-9831
> Project: Kafka
> Issue Type: Bug
> Components: streams, unit tests
> Reporter: John Roesler
> Assignee: Matthias J. Sax
> Priority: Major
> Attachments: one.stdout.txt, two.stdout.txt
>
> I've seen this fail twice in a row on the same build, but with different errors. Stacktraces follow; stdout is attached.
> One:
> {noformat}
> java.lang.AssertionError: Did not receive all 40 records from topic singlePartitionOutputTopic within 6 ms
> Expected: is a value equal to or greater than <40>
>  but: <39> was less than <40>
>  at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
>  at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:517)
>  at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:415)
>  at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:383)
>  at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:513)
>  at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:491)
>  at org.apache.kafka.streams.integration.EosIntegrationTest.readResult(EosIntegrationTest.java:766)
>  at org.apache.kafka.streams.integration.EosIntegrationTest.shouldNotViolateEosIfOneTaskFailsWithState(EosIntegrationTest.java:473)
>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.lang.reflect.Method.invoke(Method.java:498)
>  at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>  at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>  at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>  at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>  at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>  at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>  at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>  at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>  at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>  at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>  at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>  at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>  at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>  at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>  at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>  at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
[jira] [Commented] (KAFKA-9949) Flaky Test GlobalKTableIntegrationTest#shouldKStreamGlobalKTableLeftJoin
[ https://issues.apache.org/jira/browse/KAFKA-9949?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17100110#comment-17100110 ]

Matthias J. Sax commented on KAFKA-9949:

[https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/6143/testReport/junit/org.apache.kafka.streams.integration/GlobalKTableIntegrationTest/shouldKStreamGlobalKTableLeftJoin/]

> Flaky Test GlobalKTableIntegrationTest#shouldKStreamGlobalKTableLeftJoin
>
> Key: KAFKA-9949
> URL: https://issues.apache.org/jira/browse/KAFKA-9949
> Project: Kafka
> Issue Type: Improvement
> Components: streams, unit tests
> Reporter: Matthias J. Sax
> Priority: Critical
> Labels: flaky-test
>
> [https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/248/testReport/junit/org.apache.kafka.streams.integration/GlobalKTableIntegrationTest/shouldKStreamGlobalKTableLeftJoin/]
> {quote}java.lang.AssertionError: Condition not met within timeout 3. waiting for final values
> at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26)
> at org.apache.kafka.test.TestUtils.lambda$waitForCondition$5(TestUtils.java:381)
> at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:429)
> at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:397)
> at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:378)
> at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:368)
> at org.apache.kafka.streams.integration.GlobalKTableIntegrationTest.shouldKStreamGlobalKTableLeftJoin(GlobalKTableIntegrationTest.java:175){quote}
[GitHub] [kafka] mjsax commented on pull request #8600: KAFKA-9928: Fix flaky GlobalKTableEOSIntegrationTest
mjsax commented on pull request #8600:
URL: https://github.com/apache/kafka/pull/8600#issuecomment-624199715

Java 14 passed.

Java 8:
```
org.apache.kafka.streams.integration.EosIntegrationTest.shouldNotViolateEosIfOneTaskFailsWithState[exactly_once]
org.apache.kafka.streams.integration.QueryableStateIntegrationTest.shouldAllowConcurrentAccesses
```

Java 11: `org.apache.kafka.streams.integration.GlobalKTableIntegrationTest.shouldKStreamGlobalKTableLeftJoin`

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ncliang commented on a change in pull request #8618: KAFKA-9955: Prevent SinkTask::close from shadowing other exceptions
ncliang commented on a change in pull request #8618:
URL: https://github.com/apache/kafka/pull/8618#discussion_r420288853

## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -193,13 +194,11 @@ public void transitionTo(TargetState state) {
     @Override
     public void execute() {
         initializeAndStart();
-        try {
+        // Make sure any uncommitted data has been committed and the task has
+        // a chance to clean up its state
+        try (QuietClosable ignored = this::closePartitions) {

Review comment:
I am not sure I understand how the suppressed exception is logged and not just silently swallowed? Do we need to call `getSuppressed` and log those somewhere or use one of those `closeQuietly` methods on `Utils`?
[GitHub] [kafka] vvcephei commented on pull request #8616: KAFKA-9127: don't create StreamThreads for global-only topology (2.4)
vvcephei commented on pull request #8616:
URL: https://github.com/apache/kafka/pull/8616#issuecomment-624208471
[jira] [Commented] (KAFKA-9891) Invalid state store content after task migration with exactly_once and standby replicas
[ https://issues.apache.org/jira/browse/KAFKA-9891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17100135#comment-17100135 ] Mateusz Jadczyk commented on KAFKA-9891: I don't think I have enough knowledge to propose a fix, I'm afraid. One thing is debugging, another one making it work :) > Invalid state store content after task migration with exactly_once and > standby replicas > --- > > Key: KAFKA-9891 > URL: https://issues.apache.org/jira/browse/KAFKA-9891 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.3.1, 2.4.1 >Reporter: Mateusz Jadczyk >Assignee: Boyang Chen >Priority: Blocker > > We have a simple command id deduplication mechanism (very similar to the one > from Kafka Streams examples) based on Kafka Streams State Stores. It stores > command ids from the past hour in _persistentWindowStore_. We encountered a > problem with the store if there's an exception thrown later in that topology. > We run 3 nodes using docker, each with multiple threads set for this > particular Streams Application. > The business flow is as follows (performed within a single subtopology): > * a valid command is sent with command id > (_mnl_cmd_31bad7e5-35e7-490e-89f0-616fe967111f_). NODE 1 is running an active > task 1_2. First node in the topology analyses if this is a duplicate by > checking in the state store (_COMMAND_ID_STORE_), if not puts the command id > in the state store and processes the command properly. > * an invalid command is sent with the same key but new command id > (_mnl_cmd_9f1752da-45b7-4ef7-9ef8-209d826530bc_). Again, check for the > duplicated command id is performed, it's not a duplicate, command id is put > into the state store. Next node in the topology throws an exception which > causes an error on NODE 1 for task 1_2. As a result, transaction is aborted, > offsets are not committed. I double checked for the changelog topic - > relevant messages are not committed. 
Therefore, the changelog topic contains > only the first command id _mnl_cmd_31bad7e5-35e7-490e-89f0-616fe967111f,_ and > not the one which caused a failure. > * in the meantime a standby task 1_2 running on NODE 3 replicated > _mnl_cmd_31bad7e5-35e7-490e-89f0-616fe967111f_ command id into a local > _COMMAND_ID_STORE_ > * standby task 1_2 on NODE 3 Thread-2 takes over the task as an active one. > It checks if this command id is a duplicate - no, it isn't - tries to process > the faulty command and throws an exception. Again, transaction aborted, all > looks fine. > * NODE 3 Thread-1 takes over. It checks for the duplicate. To our surprise, > *it is a duplicate!* Even though the transaction has been aborted and the > changelog doesn't contain this command id: > _mnl_cmd_9f1752da-45b7-4ef7-9ef8-209d826530bc._ > > After digging into the Streams logs and some discussion on ([Stack > Overflow|https://stackoverflow.com/questions/61247789/invalid-state-store-content-after-aborted-transaction-with-exactly-once-and-stan]) > we concluded it has something to do with checkpoint files. Here are the > detailed logs relevant to checkpoint files. 
> > {code:java} > NODE_3 2020-04-15 21:06:14.470 TRACE 1 --- [-StreamThread-2] > o.a.k.s.p.i.ProcessorStateManager : stream-thread > [Processor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] task > [1_2] Checkpointable offsets read from checkpoint: {} > NODE_3 2020-04-15 21:06:19.413 TRACE 1 --- [-StreamThread-2] > o.a.k.s.p.i.ProcessorStateManager : stream-thread > [Processor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] task > [1_2] Restoring state store COMMAND_ID_STORE from changelog topic > Processor-COMMAND_ID_STORE-changelog at checkpoint null > NODE_3 2020-04-15 21:06:28.470 TRACE 1 --- [-StreamThread-1] > o.a.k.s.p.i.ProcessorStateManager : stream-thread > [Processor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-1] > standby-task [1_2] Checkpointable offsets read from checkpoint: {} > NODE_3 2020-04-15 21:06:29.634 TRACE 1 --- [-StreamThread-2] > o.a.k.s.s.internals.OffsetCheckpoint : Writing tmp checkpoint file > /tmp/kafka-streams/Processor/1_2/.checkpoint.tmp > NODE_3 2020-04-15 21:06:29.640 TRACE 1 --- [-StreamThread-2] > o.a.k.s.s.internals.OffsetCheckpoint : Swapping tmp checkpoint file > /tmp/kafka-streams/Processor/1_2/.checkpoint.tmp > /tmp/kafka-streams/Processor/1_2/.checkpoint > NODE_3 2020-04-15 21:11:15.909 TRACE 1 --- [-StreamThread-1] > o.a.k.s.s.internals.OffsetCheckpoint : Writing tmp checkpoint file > /tmp/kafka-streams/Processor/1_2/.checkpoint.tmp > NODE_3 2020-04-15 21:11:15.912 TRACE 1 --- [-StreamThread-1] > o.a.k.s.s.internals.OffsetCheckpoint : Swapping tmp checkpoin
[GitHub] [kafka] gharris1727 commented on a change in pull request #8618: KAFKA-9955: Prevent SinkTask::close from shadowing other exceptions
gharris1727 commented on a change in pull request #8618:
URL: https://github.com/apache/kafka/pull/8618#discussion_r420298081

## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -193,13 +194,11 @@ public void transitionTo(TargetState state) {
     @Override
     public void execute() {
         initializeAndStart();
-        try {
+        // Make sure any uncommitted data has been committed and the task has
+        // a chance to clean up its state
+        try (QuietClosable ignored = this::closePartitions) {

Review comment:
This PR is not changing any of the printing logic, and that's still handled by the caller, `WorkerTask::doRun`.

This is roughly what a suppressed exception looks like when it gets logged (from the test setup, not a live connector):
```
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:569)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:327)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:229)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
	Suppressed: java.lang.RuntimeException
		at org.easymock.internal.MockInvocationHandler.invoke(MockInvocationHandler.java:46)
		at org.easymock.internal.ObjectMethodsFilter.invoke(ObjectMethodsFilter.java:101)
		at org.easymock.internal.ClassProxyFactory$MockMethodInterceptor.intercept(ClassProxyFactory.java:97)
		at org.apache.kafka.connect.sink.SinkTask$$EnhancerByCGLIB$$713f645b.close()
		at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:402)
		at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:599)
		at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:202)
		... 57 more
Caused by: java.lang.RuntimeException
	at org.easymock.internal.MockInvocationHandler.invoke(MockInvocationHandler.java:46)
	at org.easymock.internal.ObjectMethodsFilter.invoke(ObjectMethodsFilter.java:101)
	at org.easymock.internal.ClassProxyFactory$MockMethodInterceptor.intercept(ClassProxyFactory.java:68)
	at org.apache.kafka.connect.sink.SinkTask$$EnhancerByCGLIB$$713f645b.put( )
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:547)
	... 60 more
```

Suppressed exceptions are a native Java feature, and log4j supports printing their stacktraces.
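The suppressed-exception behaviour discussed in this review thread can be reproduced in isolation. The following is a minimal sketch, not the PR's code: the `QuietClosable` interface here is a hypothetical stand-in with the same name as the one in the diff, and `SuppressedDemo`, `runAndCapture` and the exception messages are invented for illustration. It shows that when both the body and `close()` throw, try-with-resources keeps the body's exception as the primary one and attaches the `close()` failure via `addSuppressed`.

```java
// Hypothetical stand-in for the QuietClosable used in the diff:
// an AutoCloseable whose close() declares no checked exception.
interface QuietClosable extends AutoCloseable {
    @Override
    void close();
}

class SuppressedDemo {
    // Stand-in for closePartitions(); always fails, to simulate SinkTask::close throwing.
    static void closePartitions() {
        throw new RuntimeException("close failed");
    }

    // Runs the body inside try-with-resources and returns whatever exception
    // propagates out, or null if nothing was thrown.
    static RuntimeException runAndCapture(Runnable body) {
        try (QuietClosable ignored = SuppressedDemo::closePartitions) {
            body.run();
            return null;
        } catch (RuntimeException e) {
            // e is the primary exception; a close() failure that happened while
            // e was propagating is available through e.getSuppressed().
            return e;
        }
    }

    public static void main(String[] args) {
        RuntimeException e = runAndCapture(() -> {
            throw new IllegalStateException("put failed");
        });
        System.out.println("primary: " + e.getMessage());
        for (Throwable s : e.getSuppressed()) {
            System.out.println("suppressed: " + s.getMessage());
        }
    }
}
```

A caller that logs the primary exception with its full stack trace will therefore also see the `close()` failure, as in the trace quoted above, without calling `getSuppressed` explicitly.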
[jira] [Assigned] (KAFKA-4696) Streams standby task assignment should be state-store aware
[ https://issues.apache.org/jira/browse/KAFKA-4696?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sophie Blee-Goldman reassigned KAFKA-4696:

Fix Version/s: 2.6.0
Assignee: Sophie Blee-Goldman (was: Richard Yu)

Forgot about this ticket when submitting the PR, but this ended up being fixed via [https://github.com/apache/kafka/pull/8147]

> Streams standby task assignment should be state-store aware
>
> Key: KAFKA-4696
> URL: https://issues.apache.org/jira/browse/KAFKA-4696
> Project: Kafka
> Issue Type: Sub-task
> Components: streams
> Affects Versions: 0.11.0.0, 0.10.2.0
> Reporter: Damian Guy
> Assignee: Sophie Blee-Goldman
> Priority: Major
> Fix For: 2.6.0
>
> Task Assignment is currently not aware of which tasks have State Stores. This can result in uneven balance of standby task assignment as all tasks are assigned, but only those tasks with state-stores are ever created by {{StreamThread}}. So what seems like an optimal strategy during assignment time could be sub-optimal post-assignment.
> For example, lets say we have 4 tasks (2 with state-stores), 2 clients, numStandbyReplicas = 1. Each client would get 2 active and 2 standby tasks. One of the clients may end up with both state-store tasks, while the other has none.
> Further to this, standby task configuration is currently "all or nothing". It might make sense to allow more fine grained configuration, i.e., the ability to specify the number of standby replicas individually for each stateful operator.
[GitHub] [kafka] vvcephei commented on a change in pull request #8613: KAFKA-6145: Set HighAvailabilityTaskAssignor as default in streams_upgrade_test.py
vvcephei commented on a change in pull request #8613:
URL: https://github.com/apache/kafka/pull/8613#discussion_r420306701

## File path: tests/kafkatest/tests/streams/streams_upgrade_test.py
@@ -532,20 +532,22 @@ def do_rolling_bounce(self, processor, counter, current_generation):
 log_monitor.wait_until("Sent a version 8 subscription and got version 7 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version 7 and trigger new rebalance.",
 timeout_sec=60,
 err_msg="Could not detect 'successful version probing' at upgrading node " + str(node.account))
-else:
-log_monitor.wait_until("Sent a version 8 subscription and got version 7 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version 8 and trigger new rebalance.",
+log_monitor.wait_until("Detected that the assignor requested a rebalance. Rejoining the consumer group to trigger a new rebalance.",

Review comment:
Thanks, all. This doesn't seem like the best way to verify what we're trying to verify, but it also seems about the same as before. I'm happy to leave this here for now. If/when the test breaks again, I'd prefer for us to put in a more reliable and direct mechanism.
[GitHub] [kafka] vvcephei commented on pull request #8613: KAFKA-6145: Set HighAvailabilityTaskAssignor as default in streams_upgrade_test.py
vvcephei commented on pull request #8613:
URL: https://github.com/apache/kafka/pull/8613#issuecomment-624218014

test this please
[GitHub] [kafka] vvcephei commented on pull request #8616: KAFKA-9127: don't create StreamThreads for global-only topology (2.4)
vvcephei commented on pull request #8616:
URL: https://github.com/apache/kafka/pull/8616#issuecomment-624218222

test this please
[GitHub] [kafka] vvcephei commented on pull request #8616: KAFKA-9127: don't create StreamThreads for global-only topology (2.4)
vvcephei commented on pull request #8616:
URL: https://github.com/apache/kafka/pull/8616#issuecomment-624218152

test this please
[jira] [Commented] (KAFKA-9891) Invalid state store content after task migration with exactly_once and standby replicas
[ https://issues.apache.org/jira/browse/KAFKA-9891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17100156#comment-17100156 ]

Boyang Chen commented on KAFKA-9891:

[~mateuszjadczyk] Lol, no worry, thanks a lot for the insight you provided so far, will revise the integration test according to them.

> Invalid state store content after task migration with exactly_once and standby replicas
[GitHub] [kafka] hachikuji commented on pull request #8609: KAFKA-9946; StopReplicaRequest deletePartition changes may cause premature topic deletion handling in controller
hachikuji commented on pull request #8609:
URL: https://github.com/apache/kafka/pull/8609#issuecomment-624221761
[GitHub] [kafka] hachikuji commented on pull request #8609: KAFKA-9946; StopReplicaRequest deletePartition changes may cause premature topic deletion handling in controller
hachikuji commented on pull request #8609:
URL: https://github.com/apache/kafka/pull/8609#issuecomment-62436

retest this please
[GitHub] [kafka] bdbyrne commented on pull request #8610: KAKFA-9942: --entity-default flag is not working for alternating / describing configs in AdminClient
bdbyrne commented on pull request #8610:
URL: https://github.com/apache/kafka/pull/8610#issuecomment-624225725

So I believe the error to be here: https://github.com/apache/kafka/pull/8610/files#diff-faf8cea6a3a0fab5e056ad5fee22ff3eR369-R375

It should be translating the default entity type into `null`, but it's actually passing the name along as-is. The server-side logic should be correct.
[GitHub] [kafka] bdbyrne edited a comment on pull request #8610: KAKFA-9942: --entity-default flag is not working for alternating / describing configs in AdminClient
bdbyrne edited a comment on pull request #8610:
URL: https://github.com/apache/kafka/pull/8610#issuecomment-624225725

So I believe the error to be here: https://github.com/apache/kafka/pull/8610/files#diff-faf8cea6a3a0fab5e056ad5fee22ff3eR369-R375

(In case the link doesn't work, it's where the ConfigCommand constructs the ClientQuotaEntity.)

It should be translating the default entity type into `null`, but it's actually passing the name along as-is. The server-side logic should be correct.
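The translation described in this comment can be sketched without any Kafka dependency. This is only an illustration under stated assumptions: the class `QuotaEntitySketch`, the method `toEntityEntries` and the `<default>` placeholder string are hypothetical names chosen here, and the real fix lives in `ConfigCommand`. The idea is that an entity name standing for "the default" is mapped to a `null` value in the entries map, while every other name is passed through unchanged.

```java
import java.util.HashMap;
import java.util.Map;

class QuotaEntitySketch {
    // Assumed placeholder that the command-line layer uses for a default entity.
    static final String DEFAULT_PLACEHOLDER = "<default>";

    // Builds the entity-type -> entity-name map, using null to mean "default entity".
    // HashMap is used because Map.of(...) would reject null values.
    static Map<String, String> toEntityEntries(Map<String, String> cliEntities) {
        Map<String, String> entries = new HashMap<>();
        for (Map.Entry<String, String> e : cliEntities.entrySet()) {
            String name = e.getValue();
            entries.put(e.getKey(), DEFAULT_PLACEHOLDER.equals(name) ? null : name);
        }
        return entries;
    }

    public static void main(String[] args) {
        Map<String, String> cli = new HashMap<>();
        cli.put("user", DEFAULT_PLACEHOLDER);
        cli.put("client-id", "app-1");
        System.out.println(toEntityEntries(cli));
    }
}
```

Passing the placeholder along as-is, which is the bug described above, would instead look up a quota for an entity literally named after the placeholder.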
[GitHub] [kafka] hachikuji commented on a change in pull request #8609: KAFKA-9946; StopReplicaRequest deletePartition changes may cause premature topic deletion handling in controller
hachikuji commented on a change in pull request #8609:
URL: https://github.com/apache/kafka/pull/8609#discussion_r420319459

## File path: core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -550,6 +550,22 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig,
 else if (config.interBrokerProtocolVersion >= KAFKA_2_2_IV0) 1
 else 0
+def responseCallback(brokerId: Int, isPartitionDeleted: TopicPartition => Boolean)
+(response: AbstractResponse): Unit = {
+  val stopReplicaResponse = response.asInstanceOf[StopReplicaResponse]
+  val partitionErrorsForDeletingTopics = mutable.Map.empty[TopicPartition, Errors]
+  stopReplicaResponse.partitionErrors.asScala.foreach { pe =>
+val tp = new TopicPartition(pe.topicName, pe.partitionIndex)
+if (controllerContext.isTopicDeletionInProgress(pe.topicName) &&
+isPartitionDeleted(tp)) {
+  partitionErrorsForDeletingTopics += tp -> Errors.forCode(pe.errorCode)

Review comment:
I think the name seems ok. To me it means that the map includes the errors of all topic being deleted. It might be nice if it could reflect that this is only covering partitions which were also requested to be deleted in the StopReplica request, but that name probably becomes unwieldy.
[GitHub] [kafka] ijuma commented on pull request #8434: KAFKA-7599: Don't throttle in Trogdor when targetMessagesPerSec is 0
ijuma commented on pull request #8434:
URL: https://github.com/apache/kafka/pull/8434#issuecomment-624230380

`0` to mean `infinite` is a bit confusing. Maybe we can use `-1` instead?
[jira] [Commented] (KAFKA-9798) Flaky test: org.apache.kafka.streams.integration.QueryableStateIntegrationTest.shouldAllowConcurrentAccesses
[ https://issues.apache.org/jira/browse/KAFKA-9798?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17100179#comment-17100179 ]

Guozhang Wang commented on KAFKA-9798:

This is due to a known issue which is fixed in a hotfix; let's see if it fails again from now on.

> Flaky test: org.apache.kafka.streams.integration.QueryableStateIntegrationTest.shouldAllowConcurrentAccesses
>
> Key: KAFKA-9798
> URL: https://issues.apache.org/jira/browse/KAFKA-9798
> Project: Kafka
> Issue Type: Test
> Components: streams, unit tests
> Reporter: Bill Bejeck
> Assignee: Guozhang Wang
> Priority: Major
> Labels: flaky-test, test
[jira] [Commented] (KAFKA-9909) Kafka Streams : offset control to Streams API
[ https://issues.apache.org/jira/browse/KAFKA-9909?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17100186#comment-17100186 ] Guozhang Wang commented on KAFKA-9909: -- I think the reason here is that you catch the exception and continue, i.e. let's say you have three records {{a,b,c}} with offset {{1,2,3}} where {{b}} is ill-formatted: 1. {{a}} is processed normally, and then committed. 2. {{b}} found ill-formatted, and then skipped. 3. {{c}} is still processed normally, and then committed. Now we would commit up to {{3}} which would include {{b}}'s offset {{2}}. Even in consumer, we do not support things like "offset 1 and 3 are committed, but offset 2 is skipped". I.e. if you do not want to commit offset {{2}}, you'd have to either send the record to a queue to bookkeep it, or just stop the app immediately and do not continue to process and commit {{c}}. You can read this section https://docs.confluent.io/current/streams/faq.html#failure-and-exception-handling for some more suggestions. > Kafka Streams : offset control to Streams API > - > > Key: KAFKA-9909 > URL: https://issues.apache.org/jira/browse/KAFKA-9909 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 2.5.0 > Environment: All >Reporter: Gopikrishna >Priority: Minor > Labels: Offset, commit > > Hello team, really inspired the way streams api is running today. I would > like to have a feature to be flexible regarding the offset. when we write the > processor api, processor context object can be used to commit the offset. > this is not effective. but streams are controlling the offset. the moment the > process method executed or scheduled window completed, the offset is > committed automatically by streams internally. > Like traditional kafka consumer, its better the context object should have > complete control over the offset whether to commit or not. 
This will give > more control to the api to handle failovers and especially when message > cannot be processed, context should not commit the offset. Appreciate this > can be implemented. > > h4. enable.auto.commit is by default false, but streams are committing > automatically the offset. -- This message was sent by Atlassian Jira (v8.3.4#803005)
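The commit behaviour Guozhang describes for KAFKA-9909 can be sketched without any Kafka dependency. This is a minimal illustration, assuming a commit is a single position per partition that covers every earlier offset. The class and method names are hypothetical and are not part of the Streams or consumer API.

```java
import java.util.List;
import java.util.Set;

class OffsetCommitSketch {
    // Returns the highest offset covered by the last commit, or -1 if nothing was committed.
    // Offsets are assigned 1, 2, 3, ... in list order, matching the a/b/c example in the comment.
    static long lastCommitted(List<String> records, Set<String> illFormatted, boolean stopOnBadRecord) {
        long committed = -1L;
        long offset = 0L;
        for (String record : records) {
            offset++;
            if (illFormatted.contains(record)) {
                if (stopOnBadRecord) {
                    break; // stop the app: nothing after the bad record is processed or committed
                }
                continue; // skip and keep going: a later commit will cover this offset as well
            }
            committed = offset; // a commit is one position, so it implicitly covers all earlier offsets
        }
        return committed;
    }

    public static void main(String[] args) {
        List<String> records = List.of("a", "b", "c");
        Set<String> bad = Set.of("b");
        System.out.println(lastCommitted(records, bad, false)); // skip-and-continue: prints 3
        System.out.println(lastCommitted(records, bad, true));  // stop on the bad record: prints 1
    }
}
```

With skip-and-continue, the commit after {{c}} reaches offset 3 and therefore includes {{b}}'s offset 2. Stopping at {{b}} leaves the commit at offset 1.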
[GitHub] [kafka] scott-hendricks commented on pull request #8434: KAFKA-7599: Don't throttle in Trogdor when targetMessagesPerSec is 0
scott-hendricks commented on pull request #8434: URL: https://github.com/apache/kafka/pull/8434#issuecomment-624246201 > `0` to mean `infinite` is a bit confusing. Maybe we can use `-1` instead? This is anything 0 or less though. I would prefer 0 because our current clients do calculations before submitting the values and -1 would make this incredibly difficult. 0 is not a valid input for producers in the current code, which is why we figured 0 would be OK here. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
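The "anything 0 or less means unthrottled" convention from this comment might look roughly like the sketch below. The helper names are hypothetical and this is not Trogdor's actual code; only the `targetMessagesPerSec` name comes from the PR title.

```java
class ThrottleSketch {
    // Hypothetical helper: any target of 0 or less is treated as "do not throttle".
    static boolean shouldThrottle(int targetMessagesPerSec) {
        return targetMessagesPerSec > 0;
    }

    // Hypothetical helper: delay between sends in nanoseconds, or 0 when unthrottled.
    static long nanosBetweenSends(int targetMessagesPerSec) {
        if (!shouldThrottle(targetMessagesPerSec)) {
            return 0L; // covers 0 and every negative value, so callers need no special case for -1
        }
        return 1_000_000_000L / targetMessagesPerSec;
    }

    public static void main(String[] args) {
        System.out.println(nanosBetweenSends(100));
        System.out.println(nanosBetweenSends(0));
        System.out.println(nanosBetweenSends(-1));
    }
}
```

Because the check is `> 0` rather than `== -1`, a client that computes the rate arithmetically and ends up at 0 gets the unthrottled path without extra handling.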
[GitHub] [kafka] abbccdda commented on a change in pull request #8589: KAFKA-9146: KIP-571 Add option to force delete active members in StreamsResetter
abbccdda commented on a change in pull request #8589: URL: https://github.com/apache/kafka/pull/8589#discussion_r418108095 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ## @@ -3621,24 +3641,37 @@ public RemoveMembersFromConsumerGroupResult removeMembersFromConsumerGroup(Strin KafkaFutureImpl> future = new KafkaFutureImpl<>(); ConsumerGroupOperationContext, RemoveMembersFromConsumerGroupOptions> context = -new ConsumerGroupOperationContext<>(groupId, options, deadline, future); +new ConsumerGroupOperationContext<>(groupId, options, deadline, future); -Call findCoordinatorCall = getFindCoordinatorCall(context, -() -> getRemoveMembersFromGroupCall(context)); +Call findCoordinatorCall; +if (options.removeAll()) { +List members = getMembersFromGroup(groupId); +findCoordinatorCall = getFindCoordinatorCall(context, +() -> getRemoveMembersFromGroupCall(context, members)); Review comment: could we pass the members into the context? ## File path: clients/src/main/java/org/apache/kafka/clients/admin/RemoveMembersFromConsumerGroupOptions.java ## @@ -32,12 +32,23 @@ public class RemoveMembersFromConsumerGroupOptions extends AbstractOptions { private Set members; Review comment: Could we just make members to be `Optional>` so that we don't need a separate removeAll parameter? 
## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ## @@ -3612,6 +3611,27 @@ private boolean dependsOnSpecificNode(ConfigResource resource) { || resource.type() == ConfigResource.Type.BROKER_LOGGER; } +private List getMembersFromGroup(String groupId) { +Collection members = new ArrayList<>(); +try { +members = describeConsumerGroups(Collections.singleton(groupId)).describedGroups().get(groupId).get().members(); +} catch (Throwable ex) { +System.out.println("Encounter exception when trying to get members from group: " + groupId); +ex.printStackTrace(); +} + +List memberToRemove = new ArrayList<>(); +for (MemberDescription member: members) { Review comment: style error here. I would recommend doing a self style check like: `./gradlew checkstyleMain checkstyleTest spotbugsMain spotbugsTest spotbugsScoverage compileTestJava` otherwise we still need to fix those failures after we do jenkins build. ## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ## @@ -3612,6 +3611,27 @@ private boolean dependsOnSpecificNode(ConfigResource resource) { || resource.type() == ConfigResource.Type.BROKER_LOGGER; } +private List getMembersFromGroup(String groupId) { +Collection members = new ArrayList<>(); +try { +members = describeConsumerGroups(Collections.singleton(groupId)).describedGroups().get(groupId).get().members(); +} catch (Throwable ex) { +System.out.println("Encounter exception when trying to get members from group: " + groupId); Review comment: Remove print statements ## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ## @@ -3612,6 +3611,27 @@ private boolean dependsOnSpecificNode(ConfigResource resource) { || resource.type() == ConfigResource.Type.BROKER_LOGGER; } +private List getMembersFromGroup(String groupId) { +Collection members = new ArrayList<>(); +try { +members = 
describeConsumerGroups(Collections.singleton(groupId)).describedGroups().get(groupId).get().members(); +} catch (Throwable ex) { +System.out.println("Encounter exception when trying to get members from group: " + groupId); +ex.printStackTrace(); Review comment: Curious why we are still continuing in this case, as the member lookup already fails. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax opened a new pull request #8619: MINOR: Improve TopologyTestDriver JavaDocs
mjsax opened a new pull request #8619: URL: https://github.com/apache/kafka/pull/8619 Call for review @bbejeck
[GitHub] [kafka] cadonna commented on pull request #8613: KAFKA-6145: Set HighAvailabilityTaskAssignor as default in streams_upgrade_test.py
cadonna commented on pull request #8613: URL: https://github.com/apache/kafka/pull/8613#issuecomment-624258204 Just in case, I re-run the system tests: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/3929/
[GitHub] [kafka] vvcephei commented on pull request #8613: KAFKA-6145: Set HighAvailabilityTaskAssignor as default in streams_upgrade_test.py
vvcephei commented on pull request #8613: URL: https://github.com/apache/kafka/pull/8613#issuecomment-624270635 Thanks @cadonna , Let's see how those tests play out.
[GitHub] [kafka] vvcephei commented on pull request #8616: KAFKA-9127: don't create StreamThreads for global-only topology (2.4)
vvcephei commented on pull request #8616: URL: https://github.com/apache/kafka/pull/8616#issuecomment-624270869 test this please
[GitHub] [kafka] vvcephei commented on a change in pull request #8603: MINOR: Fix ProcessorContext JavaDocs
vvcephei commented on a change in pull request #8603: URL: https://github.com/apache/kafka/pull/8603#discussion_r420377361 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java ## @@ -209,49 +211,52 @@ Cancellable schedule(final Duration interval, void forward(final K1 key, final V1 value, final String childName); /** - * Requests a commit + * Requests a commit. */ void commit(); /** * Returns the topic name of the current input record; could be null if it is not - * available (for example, if this method is invoked from the punctuate call) + * available (for example, if this method is invoked from the punctuate call). * * @return the topic name */ String topic(); /** * Returns the partition id of the current input record; could be -1 if it is not - * available (for example, if this method is invoked from the punctuate call) + * available (for example, if this method is invoked from the punctuate call). * * @return the partition id */ int partition(); /** * Returns the offset of the current input record; could be -1 if it is not - * available (for example, if this method is invoked from the punctuate call) + * available (for example, if this method is invoked from the punctuate call). * * @return the offset */ long offset(); /** - * Returns the headers of the current input record; could be null if it is not available + * Returns the headers of the current input record; could be null if it is not + * available (for example, if this method is invoked from the punctuate call). + * * @return the headers */ Headers headers(); /** * Returns the current timestamp. 
* - * If it is triggered while processing a record streamed from the source processor, timestamp is defined as the timestamp of the current input record; the timestamp is extracted from + * If it is triggered while processing a record streamed from the source processor, + * timestamp is defined as the timestamp of the current input record; the timestamp is extracted from * {@link org.apache.kafka.clients.consumer.ConsumerRecord ConsumerRecord} by {@link TimestampExtractor}. * - * If it is triggered while processing a record generated not from the source processor (for example, + * If it is triggered while processing a record generated not from the source processor (for example, * if this method is invoked from the punctuate call), timestamp is defined as the current - * task's stream time, which is defined as the smallest among all its input stream partition timestamps. + * task's stream time, which is defined as the largest among all its input stream partition timestamps. Review comment: I just took another look at the definition of streamTime, and it actually looks like it might be computed wrongly. The way it works is that the "stream time" for a task is computed most of the time in `org.apache.kafka.streams.processor.internals.PartitionGroup#nextRecord`, i.e., it's the max timestamp of any record _polled from the PartitionGroup_. However, when we commit, we commit the "partition time" for each TopicPartition, which is set when we move a record into the head position for that queue. During restoration, we read these committed timestamps for each TopicPartition, and we (incorrectly) set the "stream time" to be the maximum over the "partition time" of each partition in the PartitionGroup (aka Task). This is incorrect in two ways: 1. it should be the minimum, not the maximum (since we would choose the record with the minimum timestamp to process next) 2. 
the timestamp of the _head enqueued_ record (partition time) is not the timestamp of the _last dequeued_ record (stream time). I'll file a Jira ticket capturing all this. In the mean time, I'd suggest that we just update the docs to reflect the correct definition of "stream time": `which is defined as the largest timestamp of any record processed by the task`. Then, we can fix the code to make this true all the time. Currently, it's only true in steady state, not immediately after restoration. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
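The proposed doc wording ("the largest timestamp of any record processed by the task") and the restoration mismatch described above can be illustrated with a small sketch. The class below is hypothetical and is not the `PartitionGroup` implementation; it only contrasts the two quantities named in the comment.

```java
class StreamTimeSketch {
    private long streamTime = Long.MIN_VALUE; // hypothetical "not yet known" sentinel

    // Called each time the task dequeues a record for processing.
    // Stream time only moves forward, even if records arrive out of order.
    long onRecordProcessed(long recordTimestamp) {
        streamTime = Math.max(streamTime, recordTimestamp);
        return streamTime;
    }

    long streamTime() {
        return streamTime;
    }

    // What the comment says restoration currently does: take the maximum over the
    // committed "partition time" (timestamp of the head enqueued record) of each partition.
    static long maxPartitionTime(long... partitionTimes) {
        long max = Long.MIN_VALUE;
        for (long t : partitionTimes) {
            max = Math.max(max, t);
        }
        return max;
    }

    public static void main(String[] args) {
        StreamTimeSketch task = new StreamTimeSketch();
        task.onRecordProcessed(10L);
        task.onRecordProcessed(12L);
        task.onRecordProcessed(11L); // out of order: stream time stays at 12
        System.out.println(task.streamTime());
        // Head records still waiting in two partition queues have timestamps 15 and 20.
        System.out.println(maxPartitionTime(15L, 20L));
    }
}
```

In this made-up scenario the steady-state stream time is 12, while a value restored from the partition times would be 20, which is the kind of gap the review comment points out.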
[jira] [Created] (KAFKA-9958) Add a new method to AuthorizerServerInfo Interface
Jeff Huang created KAFKA-9958: - Summary: Add a new method to AuthorizerServerInfo Interface Key: KAFKA-9958 URL: https://issues.apache.org/jira/browse/KAFKA-9958 Project: Kafka Issue Type: Improvement Components: core Reporter: Jeff Huang Fix For: 2.6.0 More details here: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-608+-+Add+a+new+method+to+AuthorizerServerInfo+Interface]
[GitHub] [kafka] guozhangwang commented on pull request #8600: KAFKA-9928: Fix flaky GlobalKTableEOSIntegrationTest
guozhangwang commented on pull request #8600: URL: https://github.com/apache/kafka/pull/8600#issuecomment-624284868 I still see the following issue locally: ``` java.lang.AssertionError: Condition not met within timeout 3. waiting for final values expected: {a=1+F, b=2+G, c=3+H, d=4+I, e=5+J} received: {a=1+A, b=2+G, c=3+H, d=4+I, e=5+J} at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26) at org.apache.kafka.test.TestUtils.lambda$waitForCondition$17(TestUtils.java:381) at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:429) at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:397) at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:378) at org.apache.kafka.streams.integration.GlobalKTableEOSIntegrationTest.shouldKStreamGlobalKTableLeftJoin(GlobalKTableEOSIntegrationTest.java:205) ``` In addition, sometimes the test will hang as well (i.e. the above verification would not fail, the test just runs forever); I tried using different assignor via `INTERNAL_TASK_ASSIGNOR_CLASS` but the same hanging issue still exists. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-7472) Implement KIP-145 transformations
[ https://issues.apache.org/jira/browse/KAFKA-7472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17100228#comment-17100228 ] Loic DIVAD commented on KAFKA-7472: --- Thank you [~rhauch]! :) > Implement KIP-145 transformations > -- > > Key: KAFKA-7472 > URL: https://issues.apache.org/jira/browse/KAFKA-7472 > Project: Kafka > Issue Type: New Feature > Components: KafkaConnect >Affects Versions: 1.1.0 >Reporter: Randall Hauch >Assignee: Loic DIVAD >Priority: Critical > > As part of > [KIP-145|https://cwiki.apache.org/confluence/display/KAFKA/KIP-145+-+Expose+Record+Headers+in+Kafka+Connect], > several SMTs were described and approved. However, they were never > implemented.
[GitHub] [kafka] d8tltanc commented on pull request #8610: KAKFA-9942: --entity-default flag is not working for alternating / describing configs in AdminClient
d8tltanc commented on pull request #8610: URL: https://github.com/apache/kafka/pull/8610#issuecomment-624308593 Hi @bdbyrne , thanks for the comment. The link seems not working. I guess you mean this part we should replace the empty string "" by null? `private[admin] def entityNames(): List[String] = { val namesIterator = options.valuesOf(entityName).iterator options.specs.asScala .filter(spec => spec.options.contains("entity-name") || spec.options.contains("entity-default")) .map(spec => if (spec.options.contains("entity-name")) namesIterator.next else "").toList ++ entityFlags .filter(entity => options.has(entity._1)) .map(entity => options.valueOf(entity._1)) ++ entityDefaultsFlags .filter(entity => options.has(entity._1)) .map(_ => "") }` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on pull request #8294: KAFKA-9718; Don't log passwords for AlterConfigs in request logs
cmccabe commented on pull request #8294: URL: https://github.com/apache/kafka/pull/8294#issuecomment-624309696 backported to 2.5.x
[GitHub] [kafka] cmccabe commented on pull request #8260: KAFKA-9625: Fixing IncrementalAlterConfigs with respect to Broker Configs
cmccabe commented on pull request #8260: URL: https://github.com/apache/kafka/pull/8260#issuecomment-624312027 backported to 2.5.x
[jira] [Updated] (KAFKA-9718) Don't log passwords for AlterConfigs requests in request logs
[ https://issues.apache.org/jira/browse/KAFKA-9718?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Colin McCabe updated KAFKA-9718: Fix Version/s: 2.5.1 > Don't log passwords for AlterConfigs requests in request logs > - > > Key: KAFKA-9718 > URL: https://issues.apache.org/jira/browse/KAFKA-9718 > Project: Kafka > Issue Type: Bug >Reporter: Rajini Sivaram >Assignee: Rajini Sivaram >Priority: Major > Fix For: 2.6.0, 2.5.1 > > > We currently avoid logging passwords in log files by logging only parsed > values where passwords are logged as `[hidden]`. But for AlterConfigs requests > in request logs, we log all entries since they just appear as string entries. > Since we allow altering password configs like SSL key passwords and JAAS > config, we shouldn't include these in log files.
[jira] [Updated] (KAFKA-9625) Unable to Describe broker configurations that have been set via IncrementalAlterConfigs
[ https://issues.apache.org/jira/browse/KAFKA-9625?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Colin McCabe updated KAFKA-9625: Fix Version/s: 2.5.1 > Unable to Describe broker configurations that have been set via > IncrementalAlterConfigs > --- > > Key: KAFKA-9625 > URL: https://issues.apache.org/jira/browse/KAFKA-9625 > Project: Kafka > Issue Type: Bug > Components: core >Reporter: Colin McCabe >Assignee: Sanjana Kaundinya >Priority: Critical > Fix For: 2.6.0, 2.5.1 > > > There seem to be at least two bugs in the broker configuration APIs and/or > logic: > 1. Broker throttles are incorrectly marked as sensitive configurations. This > includes leader.replication.throttled.rate, > follower.replication.throttled.rate, > replica.alter.log.dirs.io.max.bytes.per.second. This means that their values > cannot be read back by DescribeConfigs after they are set. > 2. When we clear the broker throttles via incrementalAlterConfigs, > DescribeConfigs continues returning the old throttles indefinitely. In other > words, the clearing is not reflected in the Describe API. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] bdbyrne commented on pull request #8610: KAKFA-9942: --entity-default flag is not working for alternating / describing configs in AdminClient
bdbyrne commented on pull request #8610: URL: https://github.com/apache/kafka/pull/8610#issuecomment-624318293 > Hi @bdbyrne , thanks for the comment. The link seems not working. I guess you mean this part we should replace the empty string "" by null? > > > ``` > > private[admin] def entityNames(): List[String] = { > > val namesIterator = options.valuesOf(entityName).iterator > > options.specs.asScala > > .filter(spec => spec.options.contains("entity-name") || spec.options.contains("entity-default")) > > .map(spec => if (spec.options.contains("entity-name")) namesIterator.next else "").toList ++ > > entityFlags > > .filter(entity => options.has(entity._1)) > > .map(entity => options.valueOf(entity._1)) ++ > > entityDefaultsFlags > > .filter(entity => options.has(entity._1)) > > .map(_ => "") > > } > > ``` That's correct, yes. You could do it in the provided code, however keep in mind that the ZK path currently uses it as well and would expect the empty string. The alternative is to do the substitution (`"" -> null`) when creating the `ClientQuotaEntity`, which is exclusive to the `AdminClient` path. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] bdbyrne commented on pull request #8610: KAKFA-9942: --entity-default flag is not working for alternating / describing configs in AdminClient
bdbyrne commented on pull request #8610: URL: https://github.com/apache/kafka/pull/8610#issuecomment-624318905 > > Hi @bdbyrne , thanks for the comment. The link seems not working. I guess you mean this part we should replace the empty string "" by null? > > > ``` > > > private[admin] def entityNames(): List[String] = { > > > val namesIterator = options.valuesOf(entityName).iterator > > > options.specs.asScala > > > .filter(spec => spec.options.contains("entity-name") || spec.options.contains("entity-default")) > > > .map(spec => if (spec.options.contains("entity-name")) namesIterator.next else "").toList ++ > > > entityFlags > > > .filter(entity => options.has(entity._1)) > > > .map(entity => options.valueOf(entity._1)) ++ > > > entityDefaultsFlags > > > .filter(entity => options.has(entity._1)) > > > .map(_ => "") > > > } > > > ``` > > That's correct, yes. You could do it in the provided code, however keep in mind that the ZK path currently uses it as well and would expect the empty string. The alternative is to do the substitution (`"" -> null`) when creating the `ClientQuotaEntity`, which is exclusive to the `AdminClient` path. Oh wait, there's also other places where the default is used and expected to be the empty string (broker config), so it may be best to only make the change at the `ClientQuotaEntity`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
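The substitution bdbyrne suggests (`"" -> null`, applied only where the `ClientQuotaEntity` is built) could be sketched as follows. The helper is hypothetical and stands in for whatever the `AdminClient` path of the command actually does; it assumes that a null value in the entity map denotes the default entity.

```java
import java.util.HashMap;
import java.util.Map;

class DefaultEntitySketch {
    // Hypothetical helper: converts the command-line "" marker for a default entity into null,
    // leaving named entities untouched. Other code paths keep seeing the empty string.
    static Map<String, String> toQuotaEntityEntries(Map<String, String> cliEntries) {
        Map<String, String> result = new HashMap<>(); // HashMap permits null values
        for (Map.Entry<String, String> e : cliEntries.entrySet()) {
            String name = e.getValue();
            result.put(e.getKey(), name.isEmpty() ? null : name);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> cli = new HashMap<>();
        cli.put("user", "");          // --entity-default for users
        cli.put("client-id", "c1");   // --entity-name c1 for clients
        System.out.println(toQuotaEntityEntries(cli));
    }
}
```

Doing the conversion at this one boundary keeps `entityNames()` returning the empty string for the ZK and broker-config paths that the comment says still expect it.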
[GitHub] [kafka] ambroff commented on a change in pull request #4540: KAFKA-6469 Batch ISR change notifications
ambroff commented on a change in pull request #4540: URL: https://github.com/apache/kafka/pull/4540#discussion_r420431031 ## File path: core/src/test/scala/integration/kafka/server/ReplicaManagerTest.scala ## @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package integration.kafka.server + +import kafka.server.KafkaConfig +import kafka.utils.TestUtils +import kafka.zk.ZooKeeperTestHarness +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.utils.MockTime +import org.junit.Test +import org.junit.Assert._ + +class ReplicaManagerTest extends ZooKeeperTestHarness { Review comment: Good point I moved the test to the existing `ReplicaManagerTest`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ambroff commented on a change in pull request #4540: KAFKA-6469 Batch ISR change notifications
ambroff commented on a change in pull request #4540: URL: https://github.com/apache/kafka/pull/4540#discussion_r420430926 ## File path: core/src/test/scala/integration/kafka/server/ReplicaManagerTest.scala ## @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package integration.kafka.server + +import kafka.server.KafkaConfig +import kafka.utils.TestUtils +import kafka.zk.ZooKeeperTestHarness +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.utils.MockTime +import org.junit.Test +import org.junit.Assert._ + +class ReplicaManagerTest extends ZooKeeperTestHarness { + @Test + def testMaybePropagateIsrChanges(): Unit = { +val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, zkConnect)) +val mockTime = new MockTime(1) +val server = TestUtils.createServer(config, mockTime) + +// Topic name which is the maximum length of 249. +val largeTopicName = ("topic-" padTo(249, "x")).mkString + +// 5000 partitions. Large partition numbers chosen to make the serialized values as long as possible. 
+(1 to 15000) + .map(n => new TopicPartition(largeTopicName, n)) + .foreach(server.replicaManager.recordIsrChange) + +server.replicaManager.maybePropagateIsrChanges() + +val isrChangeNotificationQueuePath = "/isr_change_notification" +val node0 = "isr_change_00" +val node1 = "isr_change_01" + +assertEquals( + zkClient.getChildren(isrChangeNotificationQueuePath).toSet, + Set(node0, node1)) + +val (_, stat0) = zkClient.getDataAndStat(isrChangeNotificationQueuePath + "/" + node0) +val (_, stat1) = zkClient.getDataAndStat(isrChangeNotificationQueuePath + "/" + node1) + +assertEquals(840028, stat0.getDataLength) +assertEquals(560308, stat1.getDataLength) Review comment: Good point I moved the test to the existing `ReplicaManagerTest`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ambroff commented on a change in pull request #4540: KAFKA-6469 Batch ISR change notifications
ambroff commented on a change in pull request #4540: URL: https://github.com/apache/kafka/pull/4540#discussion_r420430926 ## File path: core/src/test/scala/integration/kafka/server/ReplicaManagerTest.scala ## @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package integration.kafka.server + +import kafka.server.KafkaConfig +import kafka.utils.TestUtils +import kafka.zk.ZooKeeperTestHarness +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.utils.MockTime +import org.junit.Test +import org.junit.Assert._ + +class ReplicaManagerTest extends ZooKeeperTestHarness { + @Test + def testMaybePropagateIsrChanges(): Unit = { +val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, zkConnect)) +val mockTime = new MockTime(1) +val server = TestUtils.createServer(config, mockTime) + +// Topic name which is the maximum length of 249. +val largeTopicName = ("topic-" padTo(249, "x")).mkString + +// 5000 partitions. Large partition numbers chosen to make the serialized values as long as possible. 
+(1 to 15000) + .map(n => new TopicPartition(largeTopicName, n)) + .foreach(server.replicaManager.recordIsrChange) + +server.replicaManager.maybePropagateIsrChanges() + +val isrChangeNotificationQueuePath = "/isr_change_notification" +val node0 = "isr_change_00" +val node1 = "isr_change_01" + +assertEquals( + zkClient.getChildren(isrChangeNotificationQueuePath).toSet, + Set(node0, node1)) + +val (_, stat0) = zkClient.getDataAndStat(isrChangeNotificationQueuePath + "/" + node0) +val (_, stat1) = zkClient.getDataAndStat(isrChangeNotificationQueuePath + "/" + node1) + +assertEquals(840028, stat0.getDataLength) +assertEquals(560308, stat1.getDataLength) Review comment: I just made it a threshold instead. Assert the serialized size is always < 900KiB. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ambroff commented on a change in pull request #4540: KAFKA-6469 Batch ISR change notifications
ambroff commented on a change in pull request #4540: URL: https://github.com/apache/kafka/pull/4540#discussion_r420434068 ## File path: core/src/test/scala/integration/kafka/server/ReplicaManagerTest.scala ## @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package integration.kafka.server + +import kafka.server.KafkaConfig +import kafka.utils.TestUtils +import kafka.zk.ZooKeeperTestHarness +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.utils.MockTime +import org.junit.Test +import org.junit.Assert._ + +class ReplicaManagerTest extends ZooKeeperTestHarness { + @Test + def testMaybePropagateIsrChanges(): Unit = { +val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, zkConnect)) +val mockTime = new MockTime(1) Review comment: maybePropagateIsrChanges() won't do anything unless ReplicaManager.IsrChangePropagationInterval milliseconds have elapsed since the last call to recordIsrChange. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
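To illustrate why the test needs a controllable clock, here is a minimal sketch of an interval-gated propagation check, written only from the behaviour described in the review comment above. The real condition in ReplicaManager may differ; `IntervalGatedPropagator`, `recordChange`, and `maybePropagate` are hypothetical names, and the clock is injected as a plain `LongSupplier` rather than Kafka's `MockTime`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

// Hypothetical stand-in for the gating behaviour described in the review comment.
public class IntervalGatedPropagator {
    private final LongSupplier clockMs;   // injected clock, so tests can advance time manually
    private final long intervalMs;        // assumed minimum quiet period before propagating
    private final List<String> pending = new ArrayList<>();
    private long lastChangeMs;

    public IntervalGatedPropagator(LongSupplier clockMs, long intervalMs) {
        this.clockMs = clockMs;
        this.intervalMs = intervalMs;
    }

    public void recordChange(String partition) {
        pending.add(partition);
        lastChangeMs = clockMs.getAsLong();
    }

    // Returns true only if there is something to send and the interval has elapsed
    // since the most recent recorded change.
    public boolean maybePropagate() {
        long now = clockMs.getAsLong();
        if (pending.isEmpty() || now - lastChangeMs < intervalMs) {
            return false;
        }
        pending.clear();
        return true;
    }
}
```

With a real clock the call right after `recordChange` would be a no-op, which is why a test has to advance a mock clock past the interval before asserting on the result.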
[jira] [Assigned] (KAFKA-9959) leastLoadedNode() does not provide a node fairly
[ https://issues.apache.org/jira/browse/KAFKA-9959?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cheng Tan reassigned KAFKA-9959: Assignee: Cheng Tan > leastLoadedNode() does not provide a node fairly > > > Key: KAFKA-9959 > URL: https://issues.apache.org/jira/browse/KAFKA-9959 > Project: Kafka > Issue Type: Bug >Reporter: Cheng Tan >Assignee: Cheng Tan >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-9959) leastLoadedNode() does not provide a node fairly
Cheng Tan created KAFKA-9959: Summary: leastLoadedNode() does not provide a node fairly Key: KAFKA-9959 URL: https://issues.apache.org/jira/browse/KAFKA-9959 Project: Kafka Issue Type: Bug Reporter: Cheng Tan -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9929) Support reverse iterator on WindowStore
[ https://issues.apache.org/jira/browse/KAFKA-9929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17100291#comment-17100291 ] Jorge Esteban Quilcate Otoya commented on KAFKA-9929: - Thanks [~guozhang]. Looking into o.a.k.s.state.internals, I've found Iterators based on RocksIterator (e.g. SingleColumnFamilyAccessor) and TreeMap (e.g. MemoryNavigableLRUCache). After a quick review both APIs support reverse query: {code:java} final RocksIterator iter = db.newIterator(); iter.seekToFirst(); iter.next(); final RocksIterator reverse = db.newIterator(); reverse.seekToLast(); reverse.prev(); {code} and TreeMap: {code:java} final TreeMap map = new TreeMap<>(); final NavigableSet nav = map.navigableKeySet(); final NavigableSet rev = map.descendingKeySet();{code} Haven't found any performance note on the APIs to worry about. > Support reverse iterator on WindowStore > --- > > Key: KAFKA-9929 > URL: https://issues.apache.org/jira/browse/KAFKA-9929 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Jorge Esteban Quilcate Otoya >Priority: Major > > Currently, WindowStore fetch operations return an iterator sorted from > earliest to latest result: > ``` > * For each key, the iterator guarantees ordering of windows, starting from > the oldest/earliest > * available window to the newest/latest window. > ``` > > We have a use-case where traces are stored in a WindowStore > and use Kafka Streams to create a materialized view of traces. A query > request comes with a time range (e.g. now-1h, now) and want to return the > most recent results, i.e. fetch from this period of time, iterate and pattern > match latest/most recent traces, and if enough results, then reply without > moving further on the iterator. > Same store is used to search for previous traces. In this case, it search a > key for the last day, if found traces, we would also like to iterate from the > most recent. 
> RocksDb seems to support iterating backward and forward: > [https://github.com/facebook/rocksdb/wiki/Iterator#iterating-upper-bound-and-lower-bound] > > For reference: This in some way extracts some bits from this previous issue: > https://issues.apache.org/jira/browse/KAFKA-4212: > > > The {{RocksDBWindowsStore}}, a {{WindowsStore}}, can expire items via > > segment dropping, but it stores multiple items per key, based on their > > timestamp. But this store can be repurposed as a cache by fetching the > > items in reverse chronological order and returning the first item found. > > Would like to know if there is any impediment on RocksDb or WindowStore to > support this. > Adding an argument to reverse in current fetch methods would be great: > ``` > WindowStore.fetch(from,to,Direction.BACKWARD|FORWARD) > ``` -- This message was sent by Atlassian Jira (v8.3.4#803005)
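The use case in the issue (scan a time range from newest to oldest and stop once enough matches are found) can be sketched with a plain `TreeMap`, which is one of the two backing structures mentioned in the comment. This is only an illustration under assumptions: `ReverseWindowScan` and `latestMatches` are made-up names, window start timestamps are modelled as `Long` keys, and nothing here reflects the eventual WindowStore API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Predicate;

// Hypothetical illustration of "fetch backward and stop early" on a sorted map.
public class ReverseWindowScan {

    // Returns up to `limit` matching values in [from, to], newest first.
    static List<String> latestMatches(TreeMap<Long, String> windows,
                                      long from, long to,
                                      Predicate<String> matches, int limit) {
        List<String> out = new ArrayList<>();
        if (limit <= 0) {
            return out;
        }
        // subMap restricts to the time range; descendingMap iterates newest first.
        for (Map.Entry<Long, String> e
                : windows.subMap(from, true, to, true).descendingMap().entrySet()) {
            if (matches.test(e.getValue())) {
                out.add(e.getValue());
                if (out.size() >= limit) {
                    break; // enough results, no need to walk older windows
                }
            }
        }
        return out;
    }
}
```

With a forward-only iterator the same query would have to walk the whole range before it could return the most recent entries, which is the cost the issue is trying to avoid.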
[GitHub] [kafka] bbejeck commented on pull request #8619: MINOR: Improve TopologyTestDriver JavaDocs
bbejeck commented on pull request #8619: URL: https://github.com/apache/kafka/pull/8619#issuecomment-624338625 Only the Javadoc is updated, so merging this.
[jira] [Updated] (KAFKA-9959) leastLoadedNode() does not provide a node fairly
[ https://issues.apache.org/jira/browse/KAFKA-9959?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cheng Tan updated KAFKA-9959: - Issue Type: Improvement (was: Bug) > leastLoadedNode() does not provide a node fairly > > > Key: KAFKA-9959 > URL: https://issues.apache.org/jira/browse/KAFKA-9959 > Project: Kafka > Issue Type: Improvement >Reporter: Cheng Tan >Assignee: Cheng Tan >Priority: Major > > Currently, leastLoadedNode() provides a node with the following criteria: > # Provide the connected node with the least number of inflight requests. > # If no connected node exists, provide the connecting node with the largest > index in the cached list of nodes. > # If no connected or connecting node exists, provide the disconnected node > which respects the reconnect backoff with the largest index in the cached > list of nodes. > However, criteria 2 and 3 may cause issues. > > Criterion 2: Since timeoutCallsToSend() does not change the connection > status, the node will remain in the connecting state after the request times out. > If no connected node exists, leastLoadedNode() will provide this same node > until the socket timeout is reached. It would be better to skip the > connecting node if any request has timed out on it. > > Criterion 3: If the time interval between two invocations of leastLoadedNode() is > greater than reconnect.backoff.ms, the same disconnected node may be > provided. We also want to pick the node with the fewest failures.
[jira] [Updated] (KAFKA-9959) leastLoadedNode() does not provide a node fairly
[ https://issues.apache.org/jira/browse/KAFKA-9959?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cheng Tan updated KAFKA-9959: - Description: Currently, leastLoadedNode() provides a node with the following criteria: # Provide the connected node with the least number of inflight requests. # If no connected node exists, provide the connecting node with the largest index in the cached list of nodes. # If no connected or connecting node exists, provide the disconnected node which respects the reconnect backoff with the largest index in the cached list of nodes. However, criteria 2 and 3 may cause issues. Criterion 2: Since timeoutCallsToSend() does not change the connection status, the node will remain in the connecting state after the request times out. If no connected node exists, leastLoadedNode() will provide this same node until the socket timeout is reached. It would be better to skip the connecting node if any request has timed out on it. Criterion 3: If the time interval between two invocations of leastLoadedNode() is greater than reconnect.backoff.ms, the same disconnected node may be provided. We also want to pick the node with the fewest failures. > leastLoadedNode() does not provide a node fairly > > > Key: KAFKA-9959 > URL: https://issues.apache.org/jira/browse/KAFKA-9959 > Project: Kafka > Issue Type: Bug >Reporter: Cheng Tan >Assignee: Cheng Tan >Priority: Major > > Currently, leastLoadedNode() provides a node with the following criteria: > # Provide the connected node with the least number of inflight requests. > # If no connected node exists, provide the connecting node with the largest > index in the cached list of nodes. > # If no connected or connecting node exists, provide the disconnected node > which respects the reconnect backoff with the largest index in the cached > list of nodes. > However, criteria 2 and 3 may cause issues.
> > Criterion 2: Since timeoutCallsToSend() does not change the connection > status, the node will remain in the connecting state after the request times out. > If no connected node exists, leastLoadedNode() will provide this same node > until the socket timeout is reached. It would be better to skip the > connecting node if any request has timed out on it. > > Criterion 3: If the time interval between two invocations of leastLoadedNode() is > greater than reconnect.backoff.ms, the same disconnected node may be > provided. We also want to pick the node with the fewest failures.
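The three selection criteria in the description can be sketched roughly as follows. This follows only the wording of the issue, not the actual NetworkClient code: `LeastLoadedSketch`, `Node`, `State`, and the `backoffElapsed` flag are hypothetical names introduced for illustration.

```java
import java.util.List;

// Hypothetical model of the selection order described in KAFKA-9959.
public class LeastLoadedSketch {

    enum State { CONNECTED, CONNECTING, DISCONNECTED }

    static final class Node {
        final String id;
        final State state;
        final int inFlight;           // number of in-flight requests
        final boolean backoffElapsed; // assumed flag: reconnect backoff has passed

        Node(String id, State state, int inFlight, boolean backoffElapsed) {
            this.id = id;
            this.state = state;
            this.inFlight = inFlight;
            this.backoffElapsed = backoffElapsed;
        }
    }

    // Returns the chosen node, or null if no candidate qualifies.
    static Node leastLoaded(List<Node> nodes) {
        Node connected = null;
        Node connecting = null;
        Node disconnected = null;
        // Nodes are visited in list order, so later assignments win (largest index).
        for (Node n : nodes) {
            switch (n.state) {
                case CONNECTED:
                    if (connected == null || n.inFlight < connected.inFlight) {
                        connected = n;
                    }
                    break;
                case CONNECTING:
                    connecting = n;
                    break;
                case DISCONNECTED:
                    if (n.backoffElapsed) {
                        disconnected = n;
                    }
                    break;
            }
        }
        if (connected != null) {
            return connected;
        }
        return connecting != null ? connecting : disconnected;
    }
}
```

Because criteria 2 and 3 depend only on list position in this sketch, repeated calls with an unchanged node list keep returning the same connecting or disconnected node, which is the fairness problem the issue describes.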
[GitHub] [kafka] jeffhuang26 opened a new pull request #8620: KAFKA-9944: Added supporting customized HTTP response headers for Kafka Connect.
jeffhuang26 opened a new pull request #8620: URL: https://github.com/apache/kafka/pull/8620 Added support for customized HTTP response headers in the Kafka Connect REST server.
[GitHub] [kafka] mjsax commented on a change in pull request #8603: MINOR: Fix ProcessorContext JavaDocs
mjsax commented on a change in pull request #8603: URL: https://github.com/apache/kafka/pull/8603#discussion_r420463508 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java ## @@ -209,49 +211,52 @@ Cancellable schedule(final Duration interval, void forward(final K1 key, final V1 value, final String childName); /** - * Requests a commit + * Requests a commit. */ void commit(); /** * Returns the topic name of the current input record; could be null if it is not - * available (for example, if this method is invoked from the punctuate call) + * available (for example, if this method is invoked from the punctuate call). * * @return the topic name */ String topic(); /** * Returns the partition id of the current input record; could be -1 if it is not - * available (for example, if this method is invoked from the punctuate call) + * available (for example, if this method is invoked from the punctuate call). * * @return the partition id */ int partition(); /** * Returns the offset of the current input record; could be -1 if it is not - * available (for example, if this method is invoked from the punctuate call) + * available (for example, if this method is invoked from the punctuate call). * * @return the offset */ long offset(); /** - * Returns the headers of the current input record; could be null if it is not available + * Returns the headers of the current input record; could be null if it is not + * available (for example, if this method is invoked from the punctuate call). + * * @return the headers */ Headers headers(); /** * Returns the current timestamp. 
* - * If it is triggered while processing a record streamed from the source processor, timestamp is defined as the timestamp of the current input record; the timestamp is extracted from + * If it is triggered while processing a record streamed from the source processor, + * timestamp is defined as the timestamp of the current input record; the timestamp is extracted from * {@link org.apache.kafka.clients.consumer.ConsumerRecord ConsumerRecord} by {@link TimestampExtractor}. * - * If it is triggered while processing a record generated not from the source processor (for example, + * If it is triggered while processing a record generated not from the source processor (for example, * if this method is invoked from the punctuate call), timestamp is defined as the current - * task's stream time, which is defined as the smallest among all its input stream partition timestamps. + * task's stream time, which is defined as the largest among all its input stream partition timestamps. Review comment: Sounds like a bug :) But it seems to be a one-line fix that I can piggy-back on this PR. We advance the "partition time" too early. If we advance it when we return the record for processing, all should be fixed?
[GitHub] [kafka] rhauch commented on a change in pull request #8620: KAFKA-9944: Added supporting customized HTTP response headers for Kafka Connect.
rhauch commented on a change in pull request #8620: URL: https://github.com/apache/kafka/pull/8620#discussion_r420464820 ## File path: checkstyle/checkstyle.xml ## @@ -132,7 +132,7 @@ - + Review comment: Why change the setting instead of modifying `suppressions.xml` to exclude certain classes from this rule? ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java ## @@ -244,6 +244,14 @@ + "user requests to reset the set of active topics per connector."; protected static final boolean TOPIC_TRACKING_ALLOW_RESET_DEFAULT = true; +/** + * @link "https://www.eclipse.org/jetty/documentation/current/header-filter.html"; + * @link "https://www.eclipse.org/jetty/javadoc/9.4.28.v20200408/org/eclipse/jetty/servlets/HeaderFilter.html"; + **/ +public static final String RESPONSE_HTTP_HEADERS_CONFIG = "response.http.headers.config"; +public static final String RESPONSE_HTTP_HEADERS_DOC = "Set values for Jetty HTTP response headers"; Review comment: I don't think we should expose `Jetty` here. Yes, we're following the Jetty grammar and format for these, but let's not unnecessarily expose the internals. ```suggestion public static final String RESPONSE_HTTP_HEADERS_DOC = "Rules for REST API HTTP response headers"; ``` ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java ## @@ -461,4 +469,18 @@ public static String urlJoin(String base, String path) { return base + path; } +/** + * Register header filter to ServletContextHandler. + * @param context The serverlet context handler + */ +protected void configureHttpResponsHeaderFilter(ServletContextHandler context) { +String headerConfig = config.getString(WorkerConfig.RESPONSE_HTTP_HEADERS_CONFIG); +log.debug("headerConfig : " + headerConfig); Review comment: Is this line really necessary? 
Isn't the `response.http.headers.config` property already logged at INFO level when the worker starts up, via the WorkerConfig (or rather DistributedConfig or StandaloneConfig) constructor? ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java ## @@ -400,6 +410,52 @@ public WorkerConfig(ConfigDef definition, Map props) { logInternalConverterDeprecationWarnings(props); } +public static void validateHttpResponseHeaderConfig(String config) { Review comment: Why not implement these as a `ConfigDef.Validator` implementation, similar to the existing `AdminListenersValidator` below? ## File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java ## @@ -392,6 +393,106 @@ public void testDisableAdminEndpoint() throws IOException { Assert.assertEquals(404, response.getStatusLine().getStatusCode()); } +@Test +public void TestValidCustomizedHttpResponseHeaders() throws IOException { +String headerConfig = +"add X-XSS-Protection: 1; mode=block, \"add Cache-Control: no-cache, no-store, must-revalidate\""; +Map expectedHeaders = new HashMap<>(); +expectedHeaders.put("X-XSS-Protection", "1; mode=block"); +expectedHeaders.put("Cache-Control", "no-cache, no-store, must-revalidate"); +checkCustomizedHttpResponseHeaders(headerConfig, expectedHeaders); +} + +@Test +public void TestDefaultCustomizedHttpResponseHeaders() throws IOException { Review comment: Nit: ```suggestion public void testDefaultCustomizedHttpResponseHeaders() throws IOException { ``` ## File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java ## @@ -392,6 +393,106 @@ public void testDisableAdminEndpoint() throws IOException { Assert.assertEquals(404, response.getStatusLine().getStatusCode()); } +@Test +public void TestValidCustomizedHttpResponseHeaders() throws IOException { Review comment: Nit: ```suggestion public void testValidCustomizedHttpResponseHeaders() throws IOException { ``` ## File 
path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java ## @@ -392,6 +393,106 @@ public void testDisableAdminEndpoint() throws IOException { Assert.assertEquals(404, response.getStatusLine().getStatusCode()); } +@Test +public void TestValidCustomizedHttpResponseHeaders() throws IOException { +String headerConfig = +"add X-XSS-Protection: 1; mode=block, \"add Cache-Control: no-cache, no-store, must-revalidate\""; +Map expectedHeaders = new HashMap<>(); +expectedHeaders.put("X-XSS-Protection", "1; mode=block"); +expectedHeaders.put("Cache-Contro