[jira] [Commented] (KAFKA-10694) Implement zero copy for FetchSnapshot
[ https://issues.apache.org/jira/browse/KAFKA-10694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17229029#comment-17229029 ] lqjacklee commented on KAFKA-10694: --- [~jagsancio] Can I take the task? > Implement zero copy for FetchSnapshot > - > > Key: KAFKA-10694 > URL: https://issues.apache.org/jira/browse/KAFKA-10694 > Project: Kafka > Issue Type: Sub-task > Components: replication >Reporter: Jose Armando Garcia Sancio >Priority: Major > > Change the _RawSnapshotWriter_ and _RawSnapshotReader_ interfaces to allow > sending and receiving _FetchSnapshotResponse_ with minimal memory copies. > This could be implemented by making the following changes > {code:java} > interface RawSnapshotWriter { > ... > public void append(MemoryRecords records) throws IOException; > } {code} > {code:java} > interface RawSnapshotReader { > ... > public BaseRecords slice(long position) throws IOException; > }{code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] iprithv commented on a change in pull request #9204: KAFKA-6181 Examining log messages with {{--deep-iteration}} should show superset of fields
iprithv commented on a change in pull request #9204: URL: https://github.com/apache/kafka/pull/9204#discussion_r520336051 ## File path: core/src/main/scala/kafka/tools/DumpLogSegments.scala ## @@ -257,8 +257,11 @@ object DumpLogSegments { } lastOffset = record.offset -print(s"$RecordIndent offset: ${record.offset} ${batch.timestampType}: ${record.timestamp} " + - s"keysize: ${record.keySize} valuesize: ${record.valueSize}") +print(s"$RecordIndent offset: ${record.offset} baseOffset: ${batch.baseOffset} lastOffset: ${batch.lastOffset} baseSequence: ${} ${batch.baseSequence}" + + s" lastSequence: ${batch.lastSequence} producerEpoch: ${batch.producerEpoch} partitionLeaderEpoch: ${batch.partitionLeaderEpoch} position: ${validBytes} " + + s" ${batch.timestampType}: ${record.timestamp} isvalid: ${record.isValid}"+ + s" keysize: ${record.keySize} valuesize: ${record.valueSize} size: ${batch.sizeInBytes} magic: ${batch.magic} " + Review comment: @chia7712 Have seperated the record details from batch details. Also changed the entries to camel case. Please review this. Thanks. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] chia7712 commented on pull request #9433: KAFKA-10607: Consistent behaviour for response errorCounts()
chia7712 commented on pull request #9433: URL: https://github.com/apache/kafka/pull/9433#issuecomment-724472427 Could you offer test to make sure ```None``` is included. Personally, the implementations of ```errorCounts``` are almost same. Maybe it should be implemented by auto-generated protocol so the consistency (code style and behavior) can be protected. @hachikuji WDYT? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] chia7712 commented on a change in pull request #9204: KAFKA-6181 Examining log messages with {{--deep-iteration}} should show superset of fields
chia7712 commented on a change in pull request #9204: URL: https://github.com/apache/kafka/pull/9204#discussion_r520297478 ## File path: core/src/main/scala/kafka/tools/DumpLogSegments.scala ## @@ -257,8 +257,11 @@ object DumpLogSegments { } lastOffset = record.offset -print(s"$RecordIndent offset: ${record.offset} ${batch.timestampType}: ${record.timestamp} " + - s"keysize: ${record.keySize} valuesize: ${record.valueSize}") +print(s"$RecordIndent offset: ${record.offset} baseOffset: ${batch.baseOffset} lastOffset: ${batch.lastOffset} baseSequence: ${} ${batch.baseSequence}" + + s" lastSequence: ${batch.lastSequence} producerEpoch: ${batch.producerEpoch} partitionLeaderEpoch: ${batch.partitionLeaderEpoch} position: ${validBytes} " + + s" ${batch.timestampType}: ${record.timestamp} isvalid: ${record.isValid}"+ + s" keysize: ${record.keySize} valuesize: ${record.valueSize} size: ${batch.sizeInBytes} magic: ${batch.magic} " + Review comment: isvalid -> "isValid" keysize -> "keySize" valuesize -> "valueSize" compresscodec -> "compressionType" This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] chia7712 commented on a change in pull request #8826: KAFKA-10090 Misleading warnings: The configuration was supplied but i…
chia7712 commented on a change in pull request #8826: URL: https://github.com/apache/kafka/pull/8826#discussion_r520293926 ## File path: clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java ## @@ -105,7 +105,9 @@ public AbstractConfig(ConfigDef definition, Map originals, Map throw new ConfigException(entry.getKey().toString(), entry.getValue(), "Key must be a string."); this.originals = resolveConfigVariables(configProviderProps, (Map) originals); -this.values = definition.parse(this.originals); +// pass a copy to definition.parse. Otherwise, the definition.parse adds all keys of definitions to "used" group +// since definition.parse needs to call "RecordingMap#get" when checking all definitions. +this.values = definition.parse(new HashMap<>(this.originals)); Review comment: > I tried running console-producer with/without this PR. It doesn't seem to WARN any unused SSL configs in either test. Do you know why? @junrao this is the root cause. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-7908) retention.ms and message.timestamp.difference.max.ms are tied
[ https://issues.apache.org/jira/browse/KAFKA-7908?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17228975#comment-17228975 ] nandini commented on KAFKA-7908: The bug relates to - https://issues.apache.org/jira/browse/KAFKA-4340 > retention.ms and message.timestamp.difference.max.ms are tied > - > > Key: KAFKA-7908 > URL: https://issues.apache.org/jira/browse/KAFKA-7908 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.1.0 >Reporter: Ciprian Pascu >Priority: Minor > Fix For: 2.3.0, 2.4.0 > > > When configuring retention.ms for a topic, following warning will be printed: > _retention.ms for topic X is set to 180. It is smaller than > message.timestamp.difference.max.ms's value 9223372036854775807. This may > result in frequent log rolling. (kafka.log.Log)_ > > message.timestamp.difference.max.ms has not been configured explicitly, so it > has the default value of 9223372036854775807; I haven't seen anywhere > mentioned that this parameter needs to be configured also, if retention.ms is > configured; also, if we look at the default values for these parameters, they > are also so, that retention.ms < message.timestamp.difference.max.ms; so, > what is the purpose of this warning, in this case? > The warning is generated from this code > (core/src/main/scala/kafka/log/Log.scala): > _def updateConfig(updatedKeys: Set[String], newConfig: LogConfig): Unit = {_ > _*if ((updatedKeys.contains(LogConfig.RetentionMsProp)*_ > *_|| > updatedKeys.contains(LogConfig.MessageTimestampDifferenceMaxMsProp))_* > _&& topicPartition.partition == 0 // generate warnings only for one > partition of each topic_ > _&& newConfig.retentionMs < newConfig.messageTimestampDifferenceMaxMs)_ > _warn(s"${LogConfig.RetentionMsProp} for topic ${topicPartition.topic} > is set to ${newConfig.retentionMs}. It is smaller than " +_ > _s"${LogConfig.MessageTimestampDifferenceMaxMsProp}'s value > ${newConfig.messageTimestampDifferenceMaxMs}. " +_ > _s"This may result in frequent log rolling.")_ > _this.config = newConfig_ > _}_ > > Shouldn't the || operand in the bolded condition be replaced with &&? > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] abbccdda commented on pull request #9558: KAFKA-10342: migrate remaining RPCs to forwarding
abbccdda commented on pull request #9558: URL: https://github.com/apache/kafka/pull/9558#issuecomment-724447477 @mumrah for a review This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman opened a new pull request #9582: KAFKA-6687: rewrite topology to allow creating multiple KStreams from the same topic
ableegoldman opened a new pull request #9582: URL: https://github.com/apache/kafka/pull/9582 Needed to fix this on the side in order to more easily set up some experiments, so here's the PR. Allows a user to create multiple KStreams from the same topic, collection of topics, or pattern. The one exception is when the KStreams are subscribed to an overlapping-but-unequal collection of topics, which I left as future work (with a TODO in the comments describing a possible solution). If the offset reset policy doesn't match we just throw a TopologyException. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-9751) Auto topic creation should go to controller
[ https://issues.apache.org/jira/browse/KAFKA-9751?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen updated KAFKA-9751: --- Summary: Auto topic creation should go to controller (was: Internal topic creation should go to controller) > Auto topic creation should go to controller > --- > > Key: KAFKA-9751 > URL: https://issues.apache.org/jira/browse/KAFKA-9751 > Project: Kafka > Issue Type: Sub-task >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > > For use cases to create internal topics through FindCoordinator or Metadata > request, receiving broker should route the topic creation request to the > controller instead of handling by itself. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10346) Propagate topic creation policy violation to the clients
[ https://issues.apache.org/jira/browse/KAFKA-10346?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen updated KAFKA-10346: Description: At the moment, topic creation policy is not enforced on auto topic creation path for Metadata/FindCoordinator. After the topic creation fully goes to adminManager instead of zkManager, we may actually have the policy violation for those auto create topics, and inform client side the situation by bumping both RPCs version to include POLICY_VIOLATION error. (was: In the bridge release broker, the CreatePartition should be redirected to the active controller instead of relying on admin client discovery.) Summary: Propagate topic creation policy violation to the clients (was: Redirect CreatePartition to the controller) > Propagate topic creation policy violation to the clients > > > Key: KAFKA-10346 > URL: https://issues.apache.org/jira/browse/KAFKA-10346 > Project: Kafka > Issue Type: Sub-task >Reporter: Boyang Chen >Priority: Major > > At the moment, topic creation policy is not enforced on auto topic creation > path for Metadata/FindCoordinator. After the topic creation fully goes to > adminManager instead of zkManager, we may actually have the policy violation > for those auto create topics, and inform client side the situation by bumping > both RPCs version to include POLICY_VIOLATION error. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] vamossagar12 commented on pull request #9508: KAFKA-10648: Add Prefix Scan support to State Stores
vamossagar12 commented on pull request #9508: URL: https://github.com/apache/kafka/pull/9508#issuecomment-724420818 @cadonna , The failing tests here https://github.com/apache/kafka/pull/9508/checks?check_run_id=1376108664 don't seem to be related to this PR. Would it be possible to retest? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] chia7712 commented on a change in pull request #8826: KAFKA-10090 Misleading warnings: The configuration was supplied but i…
chia7712 commented on a change in pull request #8826: URL: https://github.com/apache/kafka/pull/8826#discussion_r520237060 ## File path: clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java ## @@ -159,24 +159,25 @@ private static ChannelBuilder create(SecurityProtocol securityProtocol, } // Visibility for testing -protected static Map channelBuilderConfigs(final AbstractConfig config, final ListenerName listenerName) { -Map parsedConfigs; +@SuppressWarnings("unchecked") +static Map channelBuilderConfigs(final AbstractConfig config, final ListenerName listenerName) { +Map parsedConfigs; if (listenerName == null) -parsedConfigs = config.values(); +parsedConfigs = (Map) config.values(); Review comment: ```java if (listenerName == null) parsedConfigs = (Map) config.values(); else parsedConfigs = config.valuesWithPrefixOverride(listenerName.configPrefix()); ``` the method ```config.valuesWithPrefixOverride``` also returns ```RecordingMap so it is ok. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on pull request #9489: MINOR: demote "Committing task offsets" log to DEBUG
ableegoldman commented on pull request #9489: URL: https://github.com/apache/kafka/pull/9489#issuecomment-724391259 That's fair. Ok I'll go forward with demoting this and just add a new INFO-level log that's less frequent, maybe at the max poll interval This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] junrao commented on a change in pull request #8826: KAFKA-10090 Misleading warnings: The configuration was supplied but i…
junrao commented on a change in pull request #8826: URL: https://github.com/apache/kafka/pull/8826#discussion_r520225611 ## File path: clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java ## @@ -159,24 +159,25 @@ private static ChannelBuilder create(SecurityProtocol securityProtocol, } // Visibility for testing -protected static Map channelBuilderConfigs(final AbstractConfig config, final ListenerName listenerName) { -Map parsedConfigs; +@SuppressWarnings("unchecked") +static Map channelBuilderConfigs(final AbstractConfig config, final ListenerName listenerName) { +Map parsedConfigs; if (listenerName == null) -parsedConfigs = config.values(); +parsedConfigs = (Map) config.values(); Review comment: Does this cover the case when listenerName is not null? I guess that can only happen on the server side and since we don't log unused configs on the server, so maybe this is ok for now? ## File path: clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java ## @@ -159,24 +159,25 @@ private static ChannelBuilder create(SecurityProtocol securityProtocol, } // Visibility for testing -protected static Map channelBuilderConfigs(final AbstractConfig config, final ListenerName listenerName) { -Map parsedConfigs; +@SuppressWarnings("unchecked") +static Map channelBuilderConfigs(final AbstractConfig config, final ListenerName listenerName) { +Map parsedConfigs; if (listenerName == null) -parsedConfigs = config.values(); +parsedConfigs = (Map) config.values(); else parsedConfigs = config.valuesWithPrefixOverride(listenerName.configPrefix()); -// include any custom configs from original configs -Map configs = new HashMap<>(parsedConfigs); config.originals().entrySet().stream() .filter(e -> !parsedConfigs.containsKey(e.getKey())) // exclude already parsed configs // exclude already parsed listener prefix configs .filter(e -> !(listenerName != null && e.getKey().startsWith(listenerName.configPrefix()) && parsedConfigs.containsKey(e.getKey().substring(listenerName.configPrefix().length() // exclude keys like `{mechanism}.some.prop` if "listener.name." prefix is present and key `some.prop` exists in parsed configs. .filter(e -> !(listenerName != null && parsedConfigs.containsKey(e.getKey().substring(e.getKey().indexOf('.') + 1 -.forEach(e -> configs.put(e.getKey(), e.getValue())); -return configs; +.forEach(e -> parsedConfigs.put(e.getKey(), e.getValue())); +// The callers may add new elements to return map so we should not wrap it to a immutable map. Otherwise, +// the callers have to create a new map to carry more elements and then following Get ops are not recorded. Review comment: (1) "so we should not wrap it to a immutable map": It's kind of weird to have a comment on what we don't do. (2) to return map => to returned map ## File path: clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java ## @@ -582,6 +582,13 @@ public int hashCode() { return originals.hashCode(); } +/** + * @return true if the input map is a recording map. otherwise, false + */ +public static boolean isRecording(Map map) { Review comment: common/config/* is part of the public interface. This method seems internal. So, could we not expose it publicly to the end user? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-10575) StateRestoreListener#onRestoreEnd should always be triggered
[ https://issues.apache.org/jira/browse/KAFKA-10575?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17228890#comment-17228890 ] Richard Yu commented on KAFKA-10575: [~guozhang] I'm interested in picking this one up. May I try my hand at it? > StateRestoreListener#onRestoreEnd should always be triggered > > > Key: KAFKA-10575 > URL: https://issues.apache.org/jira/browse/KAFKA-10575 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Priority: Major > > Today we only trigger `StateRestoreListener#onRestoreEnd` when we complete > the restoration of an active task and transit it to the running state. > However the restoration can also be stopped when the restoring task gets > closed (because it gets migrated to another client, for example). We should > also trigger the callback indicating its progress when the restoration > stopped in any scenarios. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] scanterog commented on pull request #9545: [mm2] Allow Checkpoints for consumers using static partition assignments
scanterog commented on pull request #9545: URL: https://github.com/apache/kafka/pull/9545#issuecomment-724379519 > Groups created on the target cluster by KAFKA-9076 are "simple groups" as there's no member information. > Not entirely sure why these were explicitly filtered. I can't immediately come up with a reason. > > @scanterog Can you add a test as well? Sure. Do we want a test for `findConsumerGroups` or what are you looking to test? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-10703) Document that default configs are not supported for TOPIC entities
Colin McCabe created KAFKA-10703: Summary: Document that default configs are not supported for TOPIC entities Key: KAFKA-10703 URL: https://issues.apache.org/jira/browse/KAFKA-10703 Project: Kafka Issue Type: Improvement Reporter: Colin McCabe We should better document that default configs are not supported for TOPIC entities. Currently an attempt to set them gets confusing error messages. Using admin client's incrementalAlterConfigs with {type=TOPIC, name=""} gives a cryptic error stack trace: Exception in thread "main" java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.InvalidRequestException: Invalid config value for resource ConfigResource(type=TOPIC, name=''): Path must not end with / character at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45) at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32) at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89) at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260) at kafka.admin.ConfigCommand$.main(ConfigCommand.scala:91) at kafka.admin.ConfigCommand.main(ConfigCommand.scala) Caused by: org.apache.kafka.common.errors.InvalidRequestException: Invalid config value for resource ConfigResource(type=TOPIC, name=''): Path must not end with / character Similarly, kafka-configs.sh is not very clear about this issue. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] wcarlson5 commented on pull request #9581: KAFKA-10500: Add thread
wcarlson5 commented on pull request #9581: URL: https://github.com/apache/kafka/pull/9581#issuecomment-724347812 https://github.com/apache/kafka/pull/9572 need to get this merged first This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] wcarlson5 opened a new pull request #9581: KAFKA-10500: Add thread
wcarlson5 opened a new pull request #9581: URL: https://github.com/apache/kafka/pull/9581 Can add stream threads now ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-10688) Handle accidental truncation of repartition topics as exceptional failure
[ https://issues.apache.org/jira/browse/KAFKA-10688?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17228844#comment-17228844 ] Guozhang Wang commented on KAFKA-10688: --- Normally the repartition topic should never have an invalid offset after setting it initially, since the repartition topic's retention should be infinity and we only truncate it via the delete-records; this ticket is for guarding against abnormal cases e.g. if users accidentally truncated the repartition topics. > Handle accidental truncation of repartition topics as exceptional failure > - > > Key: KAFKA-10688 > URL: https://issues.apache.org/jira/browse/KAFKA-10688 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Assignee: Guozhang Wang >Priority: Major > > Today we always handle InvalidOffsetException from the main consumer by the > resetting policy assuming they are for source topics. But repartition topics > are also source topics and should never be truncated and hence cause > InvalidOffsetException. > We should differentiate these repartition topics from external source topics > and treat the InvalidOffsetException from repartition topics as fatal and > close the whole application. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] mimaison commented on pull request #9224: KAFKA-10304: refactor MM2 integration tests
mimaison commented on pull request #9224: URL: https://github.com/apache/kafka/pull/9224#issuecomment-724323408 @ning2008wisc I've not forgotten this PR, I just haven't had time to do reviews yet :( This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mimaison commented on pull request #9545: [mm2] Allow Checkpoints for consumers using static partition assignments
mimaison commented on pull request #9545: URL: https://github.com/apache/kafka/pull/9545#issuecomment-724322506 Groups created on the target cluster by KAFKA-9076 are "simple groups" as there's no member information. Not entirely sure why these were explicitly filtered. I can't immediately come up with a reason. @scanterog Can you add a test as well? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-10661) Add resigned state to raft state machine to preserve leader/epoch information
[ https://issues.apache.org/jira/browse/KAFKA-10661?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-10661. - Resolution: Fixed > Add resigned state to raft state machine to preserve leader/epoch information > - > > Key: KAFKA-10661 > URL: https://issues.apache.org/jira/browse/KAFKA-10661 > Project: Kafka > Issue Type: Sub-task >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Major > > While working on KAFKA-10655, I realized we have a bug in the existing raft > state initialization logic when the process shuts down as leader. After > reinitializing, we retain the current epoch, but we discard the current > leader status. This means that it is possible for the node to vote for > another node in the same epoch that it was the leader of. > To fix this problem I think we should add a separate "resigned" state. When > re-initializing after being shutdown as leader, we can enter the "resigned" > state. This prevents us from voting for another candidate while still > ensuring that a new election needs to be held. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] hachikuji merged pull request #9531: KAFKA-10661; Add new resigned state for graceful shutdown/initialization
hachikuji merged pull request #9531: URL: https://github.com/apache/kafka/pull/9531 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] abbccdda opened a new pull request #9580: KAFKA-10350: add forwarding manager implementation with metrics
abbccdda opened a new pull request #9580: URL: https://github.com/apache/kafka/pull/9580 Add metric for forwarding request tracking. Note that the implementation is slightly diverged from the KIP, where we decide to get rid of the client.id tag since most admin clients would only have one inflight forwarding request. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler
wcarlson5 commented on a change in pull request #9487: URL: https://github.com/apache/kafka/pull/9487#discussion_r520104884 ## File path: streams/src/main/resources/common/message/SubscriptionInfoData.json ## @@ -57,6 +57,11 @@ "name": "uniqueField", "versions": "8+", "type": "int8" +}, +{ + "name": "shutdownRequested", + "versions": "9+", + "type": "int8" Review comment: https://github.com/apache/kafka/pull/9273#discussion_r486597512 I originally had it at int32, but @vvcephei suggested int16, now it is int8. would you be good with int16 or do you think int32 is the way? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler
wcarlson5 commented on a change in pull request #9487: URL: https://github.com/apache/kafka/pull/9487#discussion_r520104884 ## File path: streams/src/main/resources/common/message/SubscriptionInfoData.json ## @@ -57,6 +57,11 @@ "name": "uniqueField", "versions": "8+", "type": "int8" +}, +{ + "name": "shutdownRequested", + "versions": "9+", + "type": "int8" Review comment: https://github.com/apache/kafka/pull/9273#discussion_r486597512 I originally had it at int32, but john suggested int16, now it is int8. would you be good with int16 or do you think int32 is the way? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on a change in pull request #9549: KIP-145: Add SMTs, HeaderFrom, DropHeaders and InsertHeader
C0urante commented on a change in pull request #9549: URL: https://github.com/apache/kafka/pull/9549#discussion_r520056667 ## File path: connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HeaderFrom.java ## @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.transforms; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.connect.connector.ConnectRecord; +import org.apache.kafka.connect.data.Field; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.header.Header; +import org.apache.kafka.connect.header.Headers; +import org.apache.kafka.connect.transforms.util.Requirements; +import org.apache.kafka.connect.transforms.util.SchemaUtil; +import org.apache.kafka.connect.transforms.util.SimpleConfig; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.kafka.common.config.ConfigDef.NO_DEFAULT_VALUE; + +public abstract class HeaderFrom> implements Transformation { + +public static final String FIELDS_FIELD = "fields"; +public static final String HEADERS_FIELD = "headers"; +public static final String OPERATION_FIELD = "operation"; + +public static final String OVERVIEW_DOC = +"Moves or copies fields in the key/value of a record into that record's headers. " + +"Corresponding elements of " + FIELDS_FIELD + " and " + +"" + HEADERS_FIELD + " together identify a field and the header it should be " + +"moved or copied to. " + +"Use the concrete transformation type designed for the record " + +"key (" + Key.class.getName() + ") or value (" + Value.class.getName() + ")."; + +public static final ConfigDef CONFIG_DEF = new ConfigDef() +.define(FIELDS_FIELD, ConfigDef.Type.LIST, ConfigDef.Importance.HIGH, +"Field names in the record whose values are to be copied or moved to headers.") +.define(HEADERS_FIELD, ConfigDef.Type.LIST, ConfigDef.Importance.HIGH, +"Header names, in the same order as the field names listed in the fields configuration property.") +.define(OPERATION_FIELD, ConfigDef.Type.STRING, NO_DEFAULT_VALUE, +ConfigDef.ValidString.in("move", "copy"), ConfigDef.Importance.HIGH, +"Either move if the fields are to be moved to the headers (removed from the key/value), " + +"or copy if the fields are to be copied to the headers (retained in the key/value)."); + +enum Operation { +MOVE("move"), +COPY("copy"); + +private final String name; + +Operation(String name) { +this.name = name; +} + +static Operation fromName(String name) { +switch (name) { +case "move": +return MOVE; +case "copy": +return COPY; +default: +throw new IllegalArgumentException(); +} +} + +public String toString() { +return name; +} +} + +private List fields; + +private List headers; + +private Operation operation; + +@Override +public R apply(R record) { +Object operatingValue = operatingValue(record); +Schema operatingSchema = operatingSchema(record); + +if (operatingSchema == null) { +return applySchemaless(record, operatingValue); +} else { +return applyWithSchema(record, operatingValue, operatingSchema); +} +} + +private R applyWithSchema(R record, Object operatingValue, Schema operatingSchema) { +Headers updatedHeaders = record.headers().duplicate(); Review comment: Why duplicate headers here? According to the [Header class's Javadocs](https://kafka.apache.org/26/javadoc/org/apache/kafka/connect/header/Headers.html), the
[GitHub] [kafka] ableegoldman commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler
ableegoldman commented on a change in pull request #9487: URL: https://github.com/apache/kafka/pull/9487#discussion_r520077048 ## File path: streams/src/main/resources/common/message/SubscriptionInfoData.json ## @@ -57,6 +57,11 @@ "name": "uniqueField", "versions": "8+", "type": "int8" +}, +{ + "name": "shutdownRequested", + "versions": "9+", + "type": "int8" Review comment: I'm not really worried that we'd run out of space, I just think it sends a signal that the Assignment and Subscription error codes are semantically distinct and don't refer to the same underlying concept. So it seems better to go with the simpler approach than over-optimize to save an occasional three bytes This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on pull request #9549: KIP-145: Add SMTs, HeaderFrom, DropHeaders and InsertHeader
C0urante commented on pull request #9549: URL: https://github.com/apache/kafka/pull/9549#issuecomment-724213528 Thanks for reaching out @tombentley! Happy to take a look. For reference, maybe you could add a link to https://cwiki.apache.org/confluence/display/KAFKA/KIP-145+-+Expose+Record+Headers+in+Kafka+Connect#KIP145ExposeRecordHeadersinKafkaConnect-Transformations to the description? Will begin reviewing shortly. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-10702) Slow replication of empty transactions
Jason Gustafson created KAFKA-10702: --- Summary: Slow replication of empty transactions Key: KAFKA-10702 URL: https://issues.apache.org/jira/browse/KAFKA-10702 Project: Kafka Issue Type: Bug Reporter: Jason Gustafson Assignee: Jason Gustafson We hit a case in which we had to re-replicate a compacted topic from the beginning of the log. Some portions of the log consisted mostly of transaction markers, which were extremely slow to replicate. The problem is that `ProducerStateManager` adds all of these empty transactions to its internal collection of `ongoingTxns` before immediately removing them. There could be tens of thousands of empty transactions in the worst case from a single `Fetch` response, so this can create a huge amount of pressure on the broker. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] guozhangwang commented on pull request #9531: KAFKA-10661; Add new resigned state for graceful shutdown/initialization
guozhangwang commented on pull request #9531: URL: https://github.com/apache/kafka/pull/9531#issuecomment-724186172 LGTM. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on a change in pull request #9531: KAFKA-10661; Add new resigned state for graceful shutdown/initialization
guozhangwang commented on a change in pull request #9531: URL: https://github.com/apache/kafka/pull/9531#discussion_r520020029 ## File path: raft/src/main/java/org/apache/kafka/raft/QuorumState.java ## @@ -21,18 +21,21 @@ import org.slf4j.Logger; import java.io.IOException; +import java.util.Collections; import java.util.HashSet; +import java.util.List; import java.util.Optional; import java.util.OptionalInt; import java.util.Random; import java.util.Set; import java.util.stream.Collectors; /** - * This class is responsible for managing the current state of this node and ensuring only - * valid state transitions. + * This class is responsible for managing the current state of this node and ensuring + * only valid state transitions. Below we define the possible state transitions and + * how they are triggered: * - * Unattached => + * Unattached|Resigned => Review comment: Okay now I remembered what we discussed before. What I was wondering is, say with quorum size 6, we would need 4 votes to elect leader; if the current leader shutdown and before it is restarted, the quorum size is 5 so logically we only need 3 votes --- but as long as we require that during this transition we still require 4 votes even with 5 alive quorum members we are fine. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-9751) Internal topic creation should go to controller
[ https://issues.apache.org/jira/browse/KAFKA-9751?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen reassigned KAFKA-9751: -- Assignee: Boyang Chen > Internal topic creation should go to controller > --- > > Key: KAFKA-9751 > URL: https://issues.apache.org/jira/browse/KAFKA-9751 > Project: Kafka > Issue Type: Sub-task >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > > For use cases to create internal topics through FindCoordinator or Metadata > request, receiving broker should route the topic creation request to the > controller instead of handling by itself. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] abbccdda opened a new pull request #9579: KAFKA-9751: Forward FindCoordinator request when topic creation is needed
abbccdda opened a new pull request #9579: URL: https://github.com/apache/kafka/pull/9579 This PR forward the entire FindCoordinator request to the active controller when the internal topic being queried is not ready to be served yet. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-10699) Add system test coverage for group coordinator emigration
[ https://issues.apache.org/jira/browse/KAFKA-10699?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen reassigned KAFKA-10699: --- Assignee: feyman > Add system test coverage for group coordinator emigration > - > > Key: KAFKA-10699 > URL: https://issues.apache.org/jira/browse/KAFKA-10699 > Project: Kafka > Issue Type: Improvement >Reporter: Boyang Chen >Assignee: feyman >Priority: Major > > After merging the fix https://issues.apache.org/jira/browse/KAFKA-10284, we > believe that it is important to add system test coverage for the group > coordinator migration to verify consumer behaviors are correct. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] hachikuji commented on a change in pull request #9563: KAFKA-10684; Avoid additional envelope copies during network transmission
hachikuji commented on a change in pull request #9563: URL: https://github.com/apache/kafka/pull/9563#discussion_r519983517 ## File path: clients/src/main/java/org/apache/kafka/common/requests/EnvelopeRequest.java ## @@ -91,4 +91,14 @@ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { public static EnvelopeRequest parse(ByteBuffer buffer, short version) { return new EnvelopeRequest(ApiKeys.ENVELOPE.parseRequest(version, buffer), version); } + +public EnvelopeRequestData data() { +return data; +} + +@Override +public Send toSend(String destination, RequestHeader header) { Review comment: #7409 might be a good opportunity to complete this. What do you think @ijuma ? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] omkreddy edited a comment on pull request #9556: MINOR: Update jetty to 9.4.33
omkreddy edited a comment on pull request #9556: URL: https://github.com/apache/kafka/pull/9556#issuecomment-724131363 Merging to trunk and older branches (2.7, 2.6, 2.5, 2.4) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a change in pull request #9563: KAFKA-10684; Avoid additional envelope copies during network transmission
hachikuji commented on a change in pull request #9563: URL: https://github.com/apache/kafka/pull/9563#discussion_r519971686 ## File path: generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java ## @@ -1579,58 +1566,58 @@ private void generateVariableLengthFieldSize(FieldSpec field, } if (tagged) { headerGenerator.addImport(MessageGenerator.BYTE_UTILS_CLASS); +buffer.printf("int _arraySize = _size.totalSize() - _sizeBeforeArray;%n"); buffer.printf("_cache.setArraySizeInBytes(%s, _arraySize);%n", field.camelCaseName()); -buffer.printf("_size += _arraySize + ByteUtils.sizeOfUnsignedVarint(_arraySize);%n"); -} else { -buffer.printf("_size += _arraySize;%n"); + buffer.printf("_size.addBytes(ByteUtils.sizeOfUnsignedVarint(_arraySize));%n"); } } else if (field.type().isBytes()) { +buffer.printf("int _sizeBeforeBytes = _size.totalSize();%n"); Review comment: Good catch This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vamossagar12 commented on pull request #9508: KAFKA-10648: Add Prefix Scan support to State Stores
vamossagar12 commented on pull request #9508: URL: https://github.com/apache/kafka/pull/9508#issuecomment-724142512 > @vamossagar12, Thank you for the PR! > > Here my feedback! @cadonna thanks i made the requeiste changes barring 1. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vamossagar12 commented on a change in pull request #9508: KAFKA-10648: Add Prefix Scan support to State Stores
vamossagar12 commented on a change in pull request #9508: URL: https://github.com/apache/kafka/pull/9508#discussion_r519969912 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBPrefixIterator.java ## @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.rocksdb.RocksIterator; + +import java.nio.ByteBuffer; +import java.util.Set; + +class RocksDBPrefixIterator extends RocksDbIterator { Review comment: This one, do I need explicit unit tests? I noticed that something like RocksDbRangeIterator also doesn't have unit tests. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a change in pull request #9531: KAFKA-10661; Add new resigned state for graceful shutdown/initialization
hachikuji commented on a change in pull request #9531: URL: https://github.com/apache/kafka/pull/9531#discussion_r519969208 ## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ## @@ -1601,7 +1618,12 @@ private long pollFollower(long currentTimeMs) throws IOException { } private long pollFollowerAsVoter(FollowerState state, long currentTimeMs) throws IOException { -if (state.hasFetchTimeoutExpired(currentTimeMs)) { +GracefulShutdown shutdown = this.shutdown.get(); +if (shutdown != null) { +// If we are a follower, then we can shutdown immediately. We want to +// skip the transition to candidate in any case. +return 0; Review comment: If we are a follower, then there is no election in progress to help with. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a change in pull request #9531: KAFKA-10661; Add new resigned state for graceful shutdown/initialization
hachikuji commented on a change in pull request #9531: URL: https://github.com/apache/kafka/pull/9531#discussion_r519968145 ## File path: raft/src/main/java/org/apache/kafka/raft/QuorumState.java ## @@ -21,18 +21,21 @@ import org.slf4j.Logger; import java.io.IOException; +import java.util.Collections; import java.util.HashSet; +import java.util.List; import java.util.Optional; import java.util.OptionalInt; import java.util.Random; import java.util.Set; import java.util.stream.Collectors; /** - * This class is responsible for managing the current state of this node and ensuring only - * valid state transitions. + * This class is responsible for managing the current state of this node and ensuring + * only valid state transitions. Below we define the possible state transitions and + * how they are triggered: * - * Unattached => + * Unattached|Resigned => Review comment: In the future, when we have reassignment, we will still have to protect every quorum change with a majority of the current nodes. The proposal we had previously only allowed single-node changes, which meant that any majority was a majority before and after the state was applied. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] niteshmor commented on pull request #9556: MINOR: Update jetty to 9.4.33
niteshmor commented on pull request #9556: URL: https://github.com/apache/kafka/pull/9556#issuecomment-724139651 Thanks @omkreddy For older releases, feel free to use the commits noted in https://github.com/apache/kafka/pull/9556#issuecomment-721898016 as a reference. Git cherry-picking didn't work for me, and older versions required jersey upgrade anyways. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a change in pull request #9531: KAFKA-10661; Add new resigned state for graceful shutdown/initialization
hachikuji commented on a change in pull request #9531: URL: https://github.com/apache/kafka/pull/9531#discussion_r519965008 ## File path: raft/src/main/java/org/apache/kafka/raft/QuorumState.java ## @@ -21,18 +21,21 @@ import org.slf4j.Logger; import java.io.IOException; +import java.util.Collections; import java.util.HashSet; +import java.util.List; import java.util.Optional; import java.util.OptionalInt; import java.util.Random; import java.util.Set; import java.util.stream.Collectors; /** - * This class is responsible for managing the current state of this node and ensuring only - * valid state transitions. + * This class is responsible for managing the current state of this node and ensuring + * only valid state transitions. Below we define the possible state transitions and + * how they are triggered: * - * Unattached => + * Unattached|Resigned => Review comment: Hmm.. The quorum size does not change because a leader resigns. If there are 2N nodes in the cluster, then we always need N + 1 votes, so I don't think this case is possible. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] omkreddy closed pull request #9556: MINOR: Update jetty to 9.4.33
omkreddy closed pull request #9556: URL: https://github.com/apache/kafka/pull/9556 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a change in pull request #9563: KAFKA-10684; Avoid additional envelope copies during network transmission
hachikuji commented on a change in pull request #9563: URL: https://github.com/apache/kafka/pull/9563#discussion_r519955199 ## File path: clients/src/main/java/org/apache/kafka/common/requests/EnvelopeRequest.java ## @@ -91,4 +91,14 @@ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { public static EnvelopeRequest parse(ByteBuffer buffer, short version) { return new EnvelopeRequest(ApiKeys.ENVELOPE.parseRequest(version, buffer), version); } + +public EnvelopeRequestData data() { +return data; +} + +@Override +public Send toSend(String destination, RequestHeader header) { Review comment: Yeah, I think so. And looks like we're almost there. After your patch for `Produce`, the only remaining unconverted API that I see is `OffsetsForLeaderEpoch`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] omkreddy commented on pull request #9556: MINOR: Update jetty to 9.4.33
omkreddy commented on pull request #9556: URL: https://github.com/apache/kafka/pull/9556#issuecomment-724131363 Merging to trunk and older branches. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] splett2 commented on a change in pull request #9573: KAFKA-10693: Close quota managers created in tests
splett2 commented on a change in pull request #9573: URL: https://github.com/apache/kafka/pull/9573#discussion_r519894386 ## File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala ## @@ -1894,10 +1894,10 @@ class ReplicaManagerTest { // each replica manager is for a broker val rm0 = new ReplicaManager(config0, metrics, time, kafkaZkClient, new MockScheduler(time), mockLogMgr0, - new AtomicBoolean(false), QuotaFactory.instantiate(config0, metrics, time, ""), + new AtomicBoolean(false), quotaManager, brokerTopicStats1, metadataCache0, new LogDirFailureChannel(config0.logDirs.size), alterIsrManager) val rm1 = new ReplicaManager(config1, metrics, time, kafkaZkClient, new MockScheduler(time), mockLogMgr1, - new AtomicBoolean(false), QuotaFactory.instantiate(config1, metrics, time, ""), + new AtomicBoolean(false), quotaManager, Review comment: the two replica managers are intended to be separate. I used the same quota manager for both to avoid polluting the method signature/diff. The tests using this utility function don't rely on any quota manager behavior, so I felt this was okay. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] splett2 commented on a change in pull request #9573: KAFKA-10693: Close quota managers created in tests
splett2 commented on a change in pull request #9573: URL: https://github.com/apache/kafka/pull/9573#discussion_r519894386 ## File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala ## @@ -1894,10 +1894,10 @@ class ReplicaManagerTest { // each replica manager is for a broker val rm0 = new ReplicaManager(config0, metrics, time, kafkaZkClient, new MockScheduler(time), mockLogMgr0, - new AtomicBoolean(false), QuotaFactory.instantiate(config0, metrics, time, ""), + new AtomicBoolean(false), quotaManager, brokerTopicStats1, metadataCache0, new LogDirFailureChannel(config0.logDirs.size), alterIsrManager) val rm1 = new ReplicaManager(config1, metrics, time, kafkaZkClient, new MockScheduler(time), mockLogMgr1, - new AtomicBoolean(false), QuotaFactory.instantiate(config1, metrics, time, ""), + new AtomicBoolean(false), quotaManager, Review comment: the two replica managers are intended to be separate. I used the same quota manager for both to avoid polluting the method signature/diff. The tests using this utility function don't rely on the quota managers for testing, so I felt this was okay. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] stanislavkozlovski commented on pull request #7498: KAFKA-9023: Log request destination when the Producer gets disconnected
stanislavkozlovski commented on pull request #7498: URL: https://github.com/apache/kafka/pull/7498#issuecomment-724117161 > guess this is a nitpick, but I'd rather not construct an error message when I might or might not need it (and won't in the common, error-free case.) @cmccabe we construct it only when the error message is not null and not empty cc @dajac could you help review this? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a change in pull request #9573: KAFKA-10693: Close quota managers created in tests
dajac commented on a change in pull request #9573: URL: https://github.com/apache/kafka/pull/9573#discussion_r519922038 ## File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala ## @@ -1506,7 +1508,7 @@ class ReplicaManagerTest { purgatoryName = "ElectLeader", timer, reaperEnabled = false) // Mock network client to show leader offset of 5 -val quota = QuotaFactory.instantiate(config, metrics, time, "") +val quota = quotaManager Review comment: Could we rename `quotaManager` used in `createReplicaFetcherManager` to `replicationQuotaManager` and use `quotaManager` as the global one everywhere? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] splett2 commented on a change in pull request #9573: KAFKA-10693: Close quota managers created in tests
splett2 commented on a change in pull request #9573: URL: https://github.com/apache/kafka/pull/9573#discussion_r519894386 ## File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala ## @@ -1894,10 +1894,10 @@ class ReplicaManagerTest { // each replica manager is for a broker val rm0 = new ReplicaManager(config0, metrics, time, kafkaZkClient, new MockScheduler(time), mockLogMgr0, - new AtomicBoolean(false), QuotaFactory.instantiate(config0, metrics, time, ""), + new AtomicBoolean(false), quotaManager, brokerTopicStats1, metadataCache0, new LogDirFailureChannel(config0.logDirs.size), alterIsrManager) val rm1 = new ReplicaManager(config1, metrics, time, kafkaZkClient, new MockScheduler(time), mockLogMgr1, - new AtomicBoolean(false), QuotaFactory.instantiate(config1, metrics, time, ""), + new AtomicBoolean(false), quotaManager, Review comment: in this case, the two replica managers are meant to be separate. I used the same quota manager for both to avoid polluting the response. The test doesn't rely on the quota managers for functionality, so I felt this was okay. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] splett2 commented on a change in pull request #9573: KAFKA-10693: Close quota managers created in tests
splett2 commented on a change in pull request #9573: URL: https://github.com/apache/kafka/pull/9573#discussion_r519894386 ## File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala ## @@ -1894,10 +1894,10 @@ class ReplicaManagerTest { // each replica manager is for a broker val rm0 = new ReplicaManager(config0, metrics, time, kafkaZkClient, new MockScheduler(time), mockLogMgr0, - new AtomicBoolean(false), QuotaFactory.instantiate(config0, metrics, time, ""), + new AtomicBoolean(false), quotaManager, brokerTopicStats1, metadataCache0, new LogDirFailureChannel(config0.logDirs.size), alterIsrManager) val rm1 = new ReplicaManager(config1, metrics, time, kafkaZkClient, new MockScheduler(time), mockLogMgr1, - new AtomicBoolean(false), QuotaFactory.instantiate(config1, metrics, time, ""), + new AtomicBoolean(false), quotaManager, Review comment: the two replica managers are intended to be separate. I used the same quota manager for both to avoid polluting the method signature/diff. The test doesn't rely on the quota managers for functionality, so I felt this was okay. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] tombentley commented on pull request #9433: KAFKA-10607: Consistent behaviour for response errorCounts()
tombentley commented on pull request #9433: URL: https://github.com/apache/kafka/pull/9433#issuecomment-724077206 @chia7712 is there anything more you needed on this PR? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] splett2 commented on a change in pull request #9573: KAFKA-10693: Close quota managers created in tests
splett2 commented on a change in pull request #9573: URL: https://github.com/apache/kafka/pull/9573#discussion_r519873954 ## File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala ## @@ -1506,7 +1508,7 @@ class ReplicaManagerTest { purgatoryName = "ElectLeader", timer, reaperEnabled = false) // Mock network client to show leader offset of 5 -val quota = QuotaFactory.instantiate(config, metrics, time, "") +val quota = quotaManager Review comment: this is needed to avoid method variable name collision with the `ReplicationQuotaManager` in the overriden `createReplicaFetcherManager` a little bit further down in the test. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-10701) First line of detailed stats from consumer-perf-test.sh incorrect
[ https://issues.apache.org/jira/browse/KAFKA-10701?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Arthur updated KAFKA-10701: - Description: When running the console perf test with {{--show-detailed-stats}}, the first line out of output has incorrect results {code} $ ./bin/kafka-consumer-perf-test.sh --bootstrap-server localhost:9092 --topic test --messages 1000 --reporting-interval 1000 --show-detailed-stats time, threadId, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec 2020-11-06 11:57:01:420, 0, 275.3878, 275.3878, 288765, 288765., 1604681820723, -1604681819723, 0., 0. 2020-11-06 11:57:02:420, 0, 952.1456, 676.7578, 998397, 709632., 0, 1000, 676.7578, 709632. 2020-11-06 11:57:03:420, 0, 1654.2940, 702.1484, 1734653, 736256., 0, 1000, 702.1484, 736256. 2020-11-06 11:57:04:420, 0, 2492.1389, 837.8448, 2613197, 878544., 0, 1000, 837.8448, 878544. 2020-11-06 11:57:05:420, 0, 3403.2993, 911.1605, 3568618, 955421., 0, 1000, 911.1605, 955421. 2020-11-06 11:57:06:420, 0, 4204.1540, 800.8547, 4408375, 839757., 0, 1000, 800.8547, 839757. 2020-11-06 11:57:07:420, 0, 4747.1275, 542.9735, 4977724, 569349., 0, 1000, 542.9735, 569349. 2020-11-06 11:57:08:420, 0, 5282.2266, 535.0990, 5538816, 561092., 0, 1000, 535.0990, 561092. 2020-11-06 11:57:09:420, 0, 5824.3732, 542.1467, 6107298, 568482., 0, 1000, 542.1467, 568482. {code} This seems to be due to incorrect initialization of the {{joinStart}} variable in the consumer perf test code. was: When running the console perf test with {{--show-detailed-stats}}, the first line out of output has incorrect results {code} $ ./bin/kafka-consumer-perf-test.sh --bootstrap-server localhost:9092 --topic test --messages 1000 --reporting-interval 1000 --show-detailed-stats time, threadId, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec 2020-11-06 11:57:01:420, 0, 275.3878, 275.3878, 288765, 288765., 1604681820723, -1604681819723, 0., 0. 2020-11-06 11:57:02:420, 0, 952.1456, 676.7578, 998397, 709632., 0, 1000, 676.7578, 709632. 2020-11-06 11:57:03:420, 0, 1654.2940, 702.1484, 1734653, 736256., 0, 1000, 702.1484, 736256. 2020-11-06 11:57:04:420, 0, 2492.1389, 837.8448, 2613197, 878544., 0, 1000, 837.8448, 878544. 2020-11-06 11:57:05:420, 0, 3403.2993, 911.1605, 3568618, 955421., 0, 1000, 911.1605, 955421. 2020-11-06 11:57:06:420, 0, 4204.1540, 800.8547, 4408375, 839757., 0, 1000, 800.8547, 839757. 2020-11-06 11:57:07:420, 0, 4747.1275, 542.9735, 4977724, 569349., 0, 1000, 542.9735, 569349. 2020-11-06 11:57:08:420, 0, 5282.2266, 535.0990, 5538816, 561092., 0, 1000, 535.0990, 561092. 2020-11-06 11:57:09:420, 0, 5824.3732, 542.1467, 6107298, 568482., 0, 1000, 542.1467, 568482. {code} This seems to be due to incorrect initialization of the {joinStart} variable. > First line of detailed stats from consumer-perf-test.sh incorrect > - > > Key: KAFKA-10701 > URL: https://issues.apache.org/jira/browse/KAFKA-10701 > Project: Kafka > Issue Type: Bug > Components: tools >Reporter: David Arthur >Priority: Minor > Labels: newbie > > When running the console perf test with {{--show-detailed-stats}}, the first > line out of output has incorrect results > {code} > $ ./bin/kafka-consumer-perf-test.sh --bootstrap-server localhost:9092 --topic > test --messages 1000 --reporting-interval 1000 --show-detailed-stats > time, threadId, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, > rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec > 2020-11-06 11:57:01:420, 0, 275.3878, 275.3878, 288765, 288765., > 1604681820723, -1604681819723, 0., 0. > 2020-11-06 11:57:02:420, 0, 952.1456, 676.7578, 998397, 709632., 0, 1000, > 676.7578, 709632. > 2020-11-06 11:57:03:420, 0, 1654.2940, 702.1484, 1734653, 736256., 0, > 1000, 702.1484, 736256. > 2020-11-06 11:57:04:420, 0, 2492.1389, 837.8448, 2613197, 878544., 0, > 1000, 837.8448, 878544. > 2020-11-06 11:57:05:420, 0, 3403.2993, 911.1605, 3568618, 955421., 0, > 1000, 911.1605, 955421. > 2020-11-06 11:57:06:420, 0, 4204.1540, 800.8547, 4408375, 839757., 0, > 1000, 800.8547, 839757. > 2020-11-06 11:57:07:420, 0, 4747.1275, 542.9735, 4977724, 569349., 0, > 1000, 542.9735, 569349. > 2020-11-06 11:57:08:420, 0, 5282.2266, 535.0990, 5538816, 561092., 0, > 1000, 535.0990, 561092. > 2020-11-06 11:57:09:420, 0, 5824.3732, 542.1467, 6107298, 568482., 0, > 1000, 542.1467, 568482. > {code} >
[jira] [Created] (KAFKA-10701) First line of detailed stats from consumer-perf-test.sh incorrect
David Arthur created KAFKA-10701: Summary: First line of detailed stats from consumer-perf-test.sh incorrect Key: KAFKA-10701 URL: https://issues.apache.org/jira/browse/KAFKA-10701 Project: Kafka Issue Type: Bug Components: tools Reporter: David Arthur When running the console perf test with {{--show-detailed-stats}}, the first line out of output has incorrect results {code} $ ./bin/kafka-consumer-perf-test.sh --bootstrap-server localhost:9092 --topic test --messages 1000 --reporting-interval 1000 --show-detailed-stats time, threadId, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec 2020-11-06 11:57:01:420, 0, 275.3878, 275.3878, 288765, 288765., 1604681820723, -1604681819723, 0., 0. 2020-11-06 11:57:02:420, 0, 952.1456, 676.7578, 998397, 709632., 0, 1000, 676.7578, 709632. 2020-11-06 11:57:03:420, 0, 1654.2940, 702.1484, 1734653, 736256., 0, 1000, 702.1484, 736256. 2020-11-06 11:57:04:420, 0, 2492.1389, 837.8448, 2613197, 878544., 0, 1000, 837.8448, 878544. 2020-11-06 11:57:05:420, 0, 3403.2993, 911.1605, 3568618, 955421., 0, 1000, 911.1605, 955421. 2020-11-06 11:57:06:420, 0, 4204.1540, 800.8547, 4408375, 839757., 0, 1000, 800.8547, 839757. 2020-11-06 11:57:07:420, 0, 4747.1275, 542.9735, 4977724, 569349., 0, 1000, 542.9735, 569349. 2020-11-06 11:57:08:420, 0, 5282.2266, 535.0990, 5538816, 561092., 0, 1000, 535.0990, 561092. 2020-11-06 11:57:09:420, 0, 5824.3732, 542.1467, 6107298, 568482., 0, 1000, 542.1467, 568482. {code} This seems to be due to incorrect initialization of the {joinStart} variable. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] rajinisivaram opened a new pull request #9578: MINOR: Log resource pattern of ACL updates at INFO level
rajinisivaram opened a new pull request #9578: URL: https://github.com/apache/kafka/pull/9578 At the moment, we have one log entry for ACL updates that says: ``` Processing notification(s) to /kafka-acl-changes ``` For other updates like broker configuration updates, we have an additional entry at INFO level that shows what was updated when the change notification was processed. Since every resource pattern may have 100s or 1000s of access control entries associated with it, we don't want to log the entire contents on ACL update at INFO level. But it would be useful to log the resource pattern. This shows that we processed the notification in AclAuthorizer and gives the resource and version that was refreshed from ZK. This PR adds an additional INFO-level log entry for ACL updates in AclAuthorizer and retains the existing DEBUG level entry with full details. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on pull request #9104: KAFKA-10266: Update the connector config header.converter
showuon commented on pull request #9104: URL: https://github.com/apache/kafka/pull/9104#issuecomment-723981295 @kkonstantine , please help review this PR. Thanks. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on pull request #9507: KAFKA-10628: remove all the unnecessary parameters from the tests which are using TopologyTestDriver
showuon commented on pull request #9507: URL: https://github.com/apache/kafka/pull/9507#issuecomment-723974389 @chia7712 @vvcephei , please help review this PR. Thanks. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on pull request #9576: KAFKA-10685: strictly parsing the date/time format
showuon commented on pull request #9576: URL: https://github.com/apache/kafka/pull/9576#issuecomment-723973014 @mikebin @manijndl7 @mjsax , please help review this PR. Thanks. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-10685) --to-datetime passed to kafka-consumer-groups interpreting microseconds wrong
[ https://issues.apache.org/jira/browse/KAFKA-10685?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17228542#comment-17228542 ] Luke Chen commented on KAFKA-10685: --- Thanks for the suggestion. It turns out that the *SimpleDateFormat.setLenient(false)* can also strictly parse the timestamp and throw parseException if milliseconds is more than 3 digits. Thanks. > --to-datetime passed to kafka-consumer-groups interpreting microseconds wrong > - > > Key: KAFKA-10685 > URL: https://issues.apache.org/jira/browse/KAFKA-10685 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.7.0 >Reporter: Russell Sayers >Assignee: Luke Chen >Priority: Minor > > If you pass more than 3 decimal places for the fractional seconds of the > datetime, the microseconds get interpreted as milliseconds. > {{kafka-consumer-groups --bootstrap-server kafka:9092 }} > {{--reset-offsets }} > {{--group webserver-avro }} > {{--topic driver-positions-avro }} > {{ {{--to-datetime "}}{{2020-11-05T00:46:48.002237400}}" }} > {{ {{--dry-run > Relevant code > [here|https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/common/utils/Utils.java#L1304]. > The datetime is being turned into Nov 5, 2020 1:24:05.400 because > SimpleDateFormat is adding 2237400 milliseconds to Nov 5, 2020 00:46:48. > Experimenting with getDateTime: > * getDateTime("2020-11-05T00:46:48.000") -> 1604537208000 > * getDateTime("2020-11-05T00:46:48.000+0800") -> 1604508408000 - correct the > formatting string allows for ZZZ timezones > * getDateTime("2020-11-05T00:46:48.000123") -> 1604537208123 - note this > ends with 123 milliseconds. > The pattern string is "-MM-dd'T'HH:mm:ss.SSS". So SimpleDateFormat > interprets "000123" as 123 milliseconds. See the stackoverflow answer > [here|https://stackoverflow.com/a/21235602/109102]. > The fix? Remove any digits after more than 3 characters after the decimal > point, or raise an exception. The code would still need to allow the RFC822 > timezone, i.e Sign TwoDigitHours Minutes. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] jeqo commented on a change in pull request #9078: KAFKA-10132: Return correct value types for MBean attributes
jeqo commented on a change in pull request #9078: URL: https://github.com/apache/kafka/pull/9078#discussion_r519741816 ## File path: clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java ## @@ -272,8 +272,16 @@ public MBeanInfo getMBeanInfo() { for (Map.Entry entry : this.metrics.entrySet()) { String attribute = entry.getKey(); KafkaMetric metric = entry.getValue(); +String metricType = double.class.getName(); + +try { +metricType = metric.metricValue().getClass().getName(); +} catch (NullPointerException e) { Review comment: @rgroothuijsen this could be a bug on the DistributedHerder side. Seems that `DistributedHerder#herderMetrics` is initialized [too early](https://github.com/apache/kafka/blob/8e211eb72f9a45897cc37fed394a38096aa47feb/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L216). Could you give a try moving it later in the constructor, [maybe after config](https://github.com/apache/kafka/blob/8e211eb72f9a45897cc37fed394a38096aa47feb/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L249), to check if the same exception happens again? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rajinisivaram commented on pull request #9382: KAFKA-10554; Perform follower truncation based on diverging epochs in Fetch response
rajinisivaram commented on pull request #9382: URL: https://github.com/apache/kafka/pull/9382#issuecomment-723946383 Ran system tests on the latest version, there were 7 failures which look like flaky tests that also fail on trunk (one TransactionTest and 6 variants of ConnectDistributedTest). This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dengziming commented on pull request #9577: KAFKA-9837: KIP-589 new RPC for notifying controller log dir failure
dengziming commented on pull request #9577: URL: https://github.com/apache/kafka/pull/9577#issuecomment-723936352 @mumrah @hachikuji @bbejeck Hi, PTAL. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-7908) retention.ms and message.timestamp.difference.max.ms are tied
[ https://issues.apache.org/jira/browse/KAFKA-7908?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17228514#comment-17228514 ] nandini commented on KAFKA-7908: This applies to older versions too. Just found this in 0.11.0. The *Affects Version/s:* needs to be updated. > retention.ms and message.timestamp.difference.max.ms are tied > - > > Key: KAFKA-7908 > URL: https://issues.apache.org/jira/browse/KAFKA-7908 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.1.0 >Reporter: Ciprian Pascu >Priority: Minor > Fix For: 2.3.0, 2.4.0 > > > When configuring retention.ms for a topic, following warning will be printed: > _retention.ms for topic X is set to 180. It is smaller than > message.timestamp.difference.max.ms's value 9223372036854775807. This may > result in frequent log rolling. (kafka.log.Log)_ > > message.timestamp.difference.max.ms has not been configured explicitly, so it > has the default value of 9223372036854775807; I haven't seen anywhere > mentioned that this parameter needs to be configured also, if retention.ms is > configured; also, if we look at the default values for these parameters, they > are also so, that retention.ms < message.timestamp.difference.max.ms; so, > what is the purpose of this warning, in this case? > The warning is generated from this code > (core/src/main/scala/kafka/log/Log.scala): > _def updateConfig(updatedKeys: Set[String], newConfig: LogConfig): Unit = {_ > _*if ((updatedKeys.contains(LogConfig.RetentionMsProp)*_ > *_|| > updatedKeys.contains(LogConfig.MessageTimestampDifferenceMaxMsProp))_* > _&& topicPartition.partition == 0 // generate warnings only for one > partition of each topic_ > _&& newConfig.retentionMs < newConfig.messageTimestampDifferenceMaxMs)_ > _warn(s"${LogConfig.RetentionMsProp} for topic ${topicPartition.topic} > is set to ${newConfig.retentionMs}. It is smaller than " +_ > _s"${LogConfig.MessageTimestampDifferenceMaxMsProp}'s value > ${newConfig.messageTimestampDifferenceMaxMs}. " +_ > _s"This may result in frequent log rolling.")_ > _this.config = newConfig_ > _}_ > > Shouldn't the || operand in the bolded condition be replaced with &&? > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] dengziming opened a new pull request #9577: KAFKA-9837: KIP-589 new RPC for notifying controller log dir failure
dengziming opened a new pull request #9577: URL: https://github.com/apache/kafka/pull/9577 This patch implements [KIP-589](https://cwiki.apache.org/confluence/display/KAFKA/KIP-589+Add+API+to+update+Replica+state+in+Controller), which introduces an asynchronous API for brokers to notifying the controller of log dir failure. *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* 1. Unit test for LogDirEventManagerImpl 2. Integration test for new behavior ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-10700) Support mutual TLS authentication for SASL_SSL listeners
Rajini Sivaram created KAFKA-10700: -- Summary: Support mutual TLS authentication for SASL_SSL listeners Key: KAFKA-10700 URL: https://issues.apache.org/jira/browse/KAFKA-10700 Project: Kafka Issue Type: New Feature Components: security Reporter: Rajini Sivaram Assignee: Rajini Sivaram Fix For: 2.8.0 See https://cwiki.apache.org/confluence/display/KAFKA/KIP-684+-+Support+mutual+TLS+authentication+on+SASL_SSL+listeners for details -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] showuon opened a new pull request #9576: KAFKA-10685: strictly parsing the date/time format
showuon opened a new pull request #9576: URL: https://github.com/apache/kafka/pull/9576 Strictly parsing the date/time format by setLenient(false). So it won't allow un-matched date/time format input to avoid the wrong parsing for microseconds/nanoseconds. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ghmulti edited a comment on pull request #9489: MINOR: demote "Committing task offsets" log to DEBUG
ghmulti edited a comment on pull request #9489: URL: https://github.com/apache/kafka/pull/9489#issuecomment-723911887 There is a lot of useful information on an INFO level, but as you mentioned - having that line from several threads with 100ms frequency makes it way too noisy (especially when container is running on a cluster with log forwarder, which leads to waste of resources). Even though it is not complicated to change the log level configuration for that particular class, it is kind of inconvenient and prevents from using out-of-the-box default configs. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ghmulti commented on pull request #9489: MINOR: demote "Committing task offsets" log to DEBUG
ghmulti commented on pull request #9489: URL: https://github.com/apache/kafka/pull/9489#issuecomment-723911887 There is a lot of useful information on an INFO level, but as you mentioned - having that line from several threads with 100ms frequency makes it way too noisy (especially when container is running on a cluster with log forwarder, which leads to waste of resources). Even though it is not complicated to change the log level configuration for that particular class, it is kind of inconvenient and prevents from using out-of-the box default configs. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-10688) Handle accidental truncation of repartition topics as exceptional failure
[ https://issues.apache.org/jira/browse/KAFKA-10688?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17228465#comment-17228465 ] Bruno Cadonna commented on KAFKA-10688: --- Maybe I misunderstood your previous comment. In your proposal in 1) and 2) aren't you proposing to reset repartition topics by using the global policy? When would a repartition topic not have a valid committed offset after an offset was committed for the first time (i.e. first commit after a fresh start of the Streams application)? Is not the fact that an repartitition topic does not have a valid committed offset enough to throw a fatal error? Why should we reset the repartition topics in point 1) and 2) in your proposal? > Handle accidental truncation of repartition topics as exceptional failure > - > > Key: KAFKA-10688 > URL: https://issues.apache.org/jira/browse/KAFKA-10688 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Assignee: Guozhang Wang >Priority: Major > > Today we always handle InvalidOffsetException from the main consumer by the > resetting policy assuming they are for source topics. But repartition topics > are also source topics and should never be truncated and hence cause > InvalidOffsetException. > We should differentiate these repartition topics from external source topics > and treat the InvalidOffsetException from repartition topics as fatal and > close the whole application. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] dajac commented on pull request #9386: KAFKA-10024: Add dynamic configuration and enforce quota for per-IP connection rate limits (KIP-612, part 2)
dajac commented on pull request #9386: URL: https://github.com/apache/kafka/pull/9386#issuecomment-723887856 @splett2 Code does not compile: ``` [Error] /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-9386/core/src/main/scala/kafka/network/SocketServer.scala:1456: value getOrDefault is not a member of scala.collection.mutable.Map[java.net.InetAddress,Int] ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] chia7712 commented on pull request #9401: KAFKA-9628 Replace Produce request/response with automated protocol
chia7712 commented on pull request #9401: URL: https://github.com/apache/kafka/pull/9401#issuecomment-723875285 > Can we summarize the regression here for a real world workload? @ijuma I have attached benchmark result to description. I will loop more benchmark later. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a change in pull request #9573: KAFKA-10693: Close quota managers created in tests
dajac commented on a change in pull request #9573: URL: https://github.com/apache/kafka/pull/9573#discussion_r519620665 ## File path: core/src/test/scala/unit/kafka/server/IsrExpirationTest.scala ## @@ -62,14 +64,16 @@ class IsrExpirationTest { EasyMock.replay(logManager) alterIsrManager = TestUtils.createAlterIsrManager() +quotaManager = QuotaFactory.instantiate(configs.head, metrics, time, "") replicaManager = new ReplicaManager(configs.head, metrics, time, null, null, logManager, new AtomicBoolean(false), - QuotaFactory.instantiate(configs.head, metrics, time, ""), new BrokerTopicStats, new MetadataCache(configs.head.brokerId), + quotaManager, new BrokerTopicStats, new MetadataCache(configs.head.brokerId), new LogDirFailureChannel(configs.head.logDirs.size), alterIsrManager) } @After def tearDown(): Unit = { replicaManager.shutdown(false) +quotaManager.shutdown() Review comment: nit: We should check that `quotaManager` is not null. We can also do it for `replicaManager` above. ## File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala ## @@ -1506,7 +1508,7 @@ class ReplicaManagerTest { purgatoryName = "ElectLeader", timer, reaperEnabled = false) // Mock network client to show leader offset of 5 -val quota = QuotaFactory.instantiate(config, metrics, time, "") +val quota = quotaManager Review comment: nit: Could we get rid of `quota`? ## File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala ## @@ -79,22 +81,24 @@ class ReplicaManagerTest { EasyMock.expect(kafkaZkClient.getEntityConfigs(EasyMock.anyString(), EasyMock.anyString())).andReturn(new Properties()).anyTimes() EasyMock.replay(kafkaZkClient) +val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect) +config = KafkaConfig.fromProps(props) alterIsrManager = EasyMock.createMock(classOf[AlterIsrManager]) +quotaManager = QuotaFactory.instantiate(config, metrics, time, "") } @After def tearDown(): Unit = { TestUtils.clearYammerMetrics() +quotaManager.shutdown() Review comment: Should we check that `quotaManager` is not null? ## File path: core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala ## @@ -112,4 +120,11 @@ class OffsetsForLeaderEpochTest { //Then assertEquals(new EpochEndOffset(Errors.UNKNOWN_TOPIC_OR_PARTITION, UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET), response(tp)) } + + @After + def tearDown(): Unit = { +replicaManager.shutdown(checkpointHW = false) +quotaManager.shutdown() Review comment: We should check non null in both cases. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org