[GitHub] [kafka] C0urante commented on a change in pull request #11986: KAFKA-7509: Clean up incorrect warnings logged by Connect
C0urante commented on a change in pull request #11986: URL: https://github.com/apache/kafka/pull/11986#discussion_r841016535 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java ## @@ -366,6 +369,29 @@ public WorkerConfig(ConfigDef definition, Map props) { super(definition, props); logInternalConverterRemovalWarnings(props); logPluginPathConfigProviderWarning(props); +ignoreSubConfigs(); +} + +private void ignoreSubConfigs() { +subConfigPrefixes().forEach(this::ignoreAllWithPrefixes); +Arrays.asList( +KEY_CONVERTER_CLASS_CONFIG, VALUE_CONVERTER_CLASS_CONFIG, HEADER_CONVERTER_CLASS_CONFIG +).forEach(this::ignore); +} + +protected List subConfigPrefixes() { +return new ArrayList<>(Arrays.asList( +KEY_CONVERTER_CLASS_CONFIG + ".", +VALUE_CONVERTER_CLASS_CONFIG + ".", +HEADER_CONVERTER_CLASS_CONFIG + ".", Review comment: We could potentially replace this with actual instantiation and configuration of the key, value, and header converters specified in the worker config, but that may be wasteful of resources (especially since the `Converter` interface doesn't extend `Closeable` yet) and it's unclear how we'd want to handle failures encountered during that process (aborting worker startup is not an option as converter instantiation may fail due to transient errors). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-7509) Kafka Connect logs unnecessary warnings about unused configurations
[ https://issues.apache.org/jira/browse/KAFKA-7509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17516226#comment-17516226 ] Chris Egerton commented on KAFKA-7509: -- Alright, I dug a little deeper and it looks like this has been discussed in even greater detail in KAFKA-6793 and [KIP-552|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=142641934]. The progress there stalled after the concept of the {{RecordingMap}} mechanism was brought up. I think that that approach has some merit and have put together a proof of concept here: [https://github.com/apache/kafka/pull/11986] This may still require a KIP (possibly just alterations to the existing KIP-552), but it's borderline. [~mjsax] does it look like this same approach might work with Streams too? > Kafka Connect logs unnecessary warnings about unused configurations > --- > > Key: KAFKA-7509 > URL: https://issues.apache.org/jira/browse/KAFKA-7509 > Project: Kafka > Issue Type: Improvement > Components: clients, KafkaConnect >Affects Versions: 0.10.2.0 >Reporter: Randall Hauch >Assignee: Chris Egerton >Priority: Major > > When running Connect, the logs contain quite a few warnings about "The > configuration '{}' was supplied but isn't a known config." This occurs when > Connect creates producers, consumers, and admin clients, because the > AbstractConfig is logging unused configuration properties upon construction. > It's complicated by the fact that the Producer, Consumer, and AdminClient all > create their own AbstractConfig instances within the constructor, so we can't > even call its {{ignore(String key)}} method. > See also KAFKA-6793 for a similar issue with Streams. > There are no arguments in the Producer, Consumer, or AdminClient constructors > to control whether the configs log these warnings, so a simpler workaround > is to only pass those configuration properties to the Producer, Consumer, and > AdminClient that the ProducerConfig, ConsumerConfig, and AdminClientConfig > configdefs know about. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[GitHub] [kafka] C0urante opened a new pull request #11986: KAFKA-7509: Clean up incorrect warnings logged by Connect
C0urante opened a new pull request #11986: URL: https://github.com/apache/kafka/pull/11986 [Jira](https://issues.apache.org/jira/browse/KAFKA-7509) **NOTE: This may require a KIP. Please do not merge until it has been explicitly confirmed that a KIP is not required, or a KIP for these changes has been published and approved.** ### Summary of changes - Skip the calls to `AbstractConfig::logUnused` made by [KafkaConsumer](https://github.com/apache/kafka/blob/62ea4c46a9be7388baeaef1c505d3e5798a9066f/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L815), [KafkaProducer](https://github.com/apache/kafka/blob/62ea4c46a9be7388baeaef1c505d3e5798a9066f/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L432), and [KafkaAdminClient](https://github.com/apache/kafka/blob/62ea4c46a9be7388baeaef1c505d3e5798a9066f/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java#L595) instances when the original config map is an instance of a [RecordingMap](https://github.com/apache/kafka/blob/62ea4c46a9be7388baeaef1c505d3e5798a9066f/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java#L608-L612) - Modify `ConsumerConfig::appendDeserializerToConfig` and `ProducerConfig::appendSerializerToConfig` to preserve `RecordingMap` instances passed in to their constructors (or more precisely, create clones of those instances that retain the "recording" behavior of the original) so that all properties used by those consumers/producers are marked as used with the original `RecordingMap` - Use `WorkerConfig::originals` as the baseline when constructing configs to pass to Kafka clients that are used by the worker to manage its internal Kafka topics, so that all properties in the worker config that are used by those Kafka clients are marked as used in the `WorkerConfig` - Ignore all properties in the worker config that are transparently passed through to configurations for other components that: - - Perform their own logging for unused properties (such as producers and consumers used by connector instances, whose properties can be specified in a worker config with the `producer.` and `consumer.` prefixes, respectively) - - Are used transparently by the worker without accessing via either `AbstractConfig::get` (or one of its strongly-typed variants) or by invoking `Map::get` on the result of `AbstractConfig::originals` (or one of its prefixed variants) (such as internal topic settings) - - Are not constructed during worker startup, but instead brought up later (such as the default key, value, and header converters, which are instantiated on a case-by-case basis when bringing up connectors) - Log warnings for all unused (and non-ignored) properties in the `WorkerConfig` after worker startup has taken place - Disable all warnings for unused properties when constructing admin clients used by connectors as those include the top-level worker config, which is guaranteed to contain properties like `key.converter` that are not used by the admin client - Permit all warnings for unused properties when constructing producers and consumers used by connectors as those do not include the top-level worker config and unused properties should not be expected in these cases - Automatically ignore all automatically-injected metrics context properties that are added by the Connect framework when configuring Kafka clients since these are always provided (when Connect brings up Kafka clients) but are not always used I also fixed a bug introduced in https://github.com/apache/kafka/pull/8455 that causes a spurious warning to be logged when the worker config doesn't include a value for the `plugin.path` property. ### Testing I've verified this locally with a variety of cases including typos in the worker config (`gorup.id` instead of `group.id`), typos in connector client properties included in the worker config (`producer.clinet.id` instead of `producer.client.id`), correctly-skipped connector client properties included in the worker config (`consumer.max.poll.records`), connector client interceptor properties included in the worker config (`producer.interceptor.classes`, `some.interceptor.property.that.is.used`, `some.interceptor.property.that.is.not.used`), use of the DLQ topic in a sink connector, and use of automatic topic creation in a source connector. If this approach looks reasonable, I can automate these tests, probably by capturing logging output during an integration test run and asserting that warnings were issued only for the expected set of properties. ### Edge cases Note that the `RecordingMap` class is subtly broken at the moment in that it doesn't take into account calls to `Map::forEach`, `Map::entrySet`, `Map::keySet`, `Map::values`, `Map::getOrDefault`, `Map::compute`, `Map::computeIfPresent`, etc. This comes into play
[GitHub] [kafka] RivenSun2 commented on pull request #11985: MINOR: Supplement the description of `Valid Values` in the documentation of `compression.type`
RivenSun2 commented on pull request #11985: URL: https://github.com/apache/kafka/pull/11985#issuecomment-1086549836 Hi @showuon @dajac please help to review the PR . Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] RivenSun2 opened a new pull request #11985: MINOR: Supplement the description of `Valid Values` in the documentation of `compression.type`
RivenSun2 opened a new pull request #11985: URL: https://github.com/apache/kafka/pull/11985 Supplement the description of `Valid Values` in the documentation of `compression.type` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-7509) Kafka Connect logs unnecessary warnings about unused configurations
[ https://issues.apache.org/jira/browse/KAFKA-7509?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton reassigned KAFKA-7509: Assignee: Chris Egerton > Kafka Connect logs unnecessary warnings about unused configurations > --- > > Key: KAFKA-7509 > URL: https://issues.apache.org/jira/browse/KAFKA-7509 > Project: Kafka > Issue Type: Improvement > Components: clients, KafkaConnect >Affects Versions: 0.10.2.0 >Reporter: Randall Hauch >Assignee: Chris Egerton >Priority: Major > > When running Connect, the logs contain quite a few warnings about "The > configuration '{}' was supplied but isn't a known config." This occurs when > Connect creates producers, consumers, and admin clients, because the > AbstractConfig is logging unused configuration properties upon construction. > It's complicated by the fact that the Producer, Consumer, and AdminClient all > create their own AbstractConfig instances within the constructor, so we can't > even call its {{ignore(String key)}} method. > See also KAFKA-6793 for a similar issue with Streams. > There are no arguments in the Producer, Consumer, or AdminClient constructors > to control whether the configs log these warnings, so a simpler workaround > is to only pass those configuration properties to the Producer, Consumer, and > AdminClient that the ProducerConfig, ConsumerConfig, and AdminClientConfig > configdefs know about. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[GitHub] [kafka] ijuma opened a new pull request #11984: MINOR: Upgrade build and test dependencies
ijuma opened a new pull request #11984: URL: https://github.com/apache/kafka/pull/11984 * gradle: 7.3.3 -> 7.4.2 TBD https://docs.gradle.org/7.4.2/release-notes.html * dependencycheck gradle plugin: 6.5.3 -> 7.0.3 Minor fixes. * spotbugs gradle plugin: 5.0.5 -> 5.0.6 Minor fixes. https://github.com/spotbugs/spotbugs-gradle-plugin/releases/tag/5.0.6 * jmh: 1.34 -> 1.35 Fixes and profiler improvements. https://mail.openjdk.java.net/pipermail/jmh-dev/2022-March/003422.html * jqwik: 1.6.3 -> 1.6.5 TBD https://github.com/jlink/jqwik/releases/tag/1.6.4 https://github.com/jlink/jqwik/releases/tag/1.6.5 * mockito: 4.3.1 -> 4.4.0 TBD https://github.com/mockito/mockito/releases/tag/v4.4.0 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on pull request #11974: KAFKA-13763 (1): Improve unit testing coverage and flexibility for IncrementalCooperativeAssignor
C0urante commented on pull request #11974: URL: https://github.com/apache/kafka/pull/11974#issuecomment-1086232261 Filed https://github.com/apache/kafka/pull/11983 as a more aggressive follow-up that touches on the `IncrementalCooperativeAssignor` class. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante opened a new pull request #11983: KAFKA-13763 (2): Refactor IncrementalCooperativeAssignor for improved unit testing
C0urante opened a new pull request #11983: URL: https://github.com/apache/kafka/pull/11983 [Jira](https://issues.apache.org/jira/browse/KAFKA-13763) Builds on the changes from https://github.com/apache/kafka/pull/11974, which exclusively touched on the `IncrementalCooperativeAssignorTest` test suite. The goals here include: 1. Create an overloaded variant of the `IncrementalCooperativeAssignor::performTaskAssignment` method that is more testing friendly by: 1. Returning the pre-serialization allocation and revocation of connectors and tasks across the cluster in a newly-introduced `ClusterAssignment` class, which eliminates the current pattern of creating a mock `IncrementalCooperativeAssignor` class, spying on one of its private methods, and capturing the argument passed to that spied-upon method 2. Accepting new parameters for the current snapshot of the config topic, the last-completed generation ID, and the current generation ID, which eliminates the need to create and manage a mocked `WorkerCoordinator` instance during testing 3. Not requiring parameters for the leader, config topic offset, or protocol version as these do not affect the logic for allocating connectors and tasks across a cluster 4. Only requires a `Map` for the set of currently-running connectors and tasks across the cluster, instead of a `Map`, which contains unnecessary information like the leader, leader URL, protocol version, and config topic offset 2. Simplify the parameter list for the `IncrementalCooperativeAssignor::handleLostAssignments` method, which in turn simplifies the logic for testing this class 3. Capture repeated Java 8 streams logic in simple, reusable, easily-verifiable utility methods added to the `ConnectUtils` class ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-13787) Failed to delete state store directory for it is not empty
[ https://issues.apache.org/jira/browse/KAFKA-13787?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17516068#comment-17516068 ] Matthias J. Sax commented on KAFKA-13787: - `StateDirectory` should have all relevant code. Not sure why it does not delete the `kafka-streams-process-metadata` file – for the `.lock` file it's ok, as we hold the lock all the time and it will be deleted when the directory is deleted. [~vvcephei] might now more about the file in question? > Failed to delete state store directory for it is not empty > -- > > Key: KAFKA-13787 > URL: https://issues.apache.org/jira/browse/KAFKA-13787 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.0.0 >Reporter: Nico Pommerening >Priority: Major > Attachments: bug-log.txt > > > On Kafka Streams shutdown the Cleanup of state directories seems not to work, > since the lock and metadata file seem not to be deleted. > Relevant WARN logging: > 2022-03-31 10:34:41,689 WARN [SpringApplicationShutdownHook] > org.apache.kafka.streams.processor.internals.StateDirectory: stream-thread > [SpringApplicationShutdownHook] Failed to delete state store directory of > /kafka-streams-statestore/555b9965-95e3-4c92-b467-1d283428da5d/test-test-run-kpi > for it is not empty > > Left over files in directory: > * .lock > * kafka-streams-process-metadata > > I'm not sure what the consequences of a unclean state cleanup are, but I > would like to get rid of the Warning. > I attached a bigger log extract and I've already patched the StateDirectory > implementation which I'll try to contribute. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[GitHub] [kafka] cmccabe merged pull request #11941: KAFKA-13749: CreateTopics in KRaft must return configs
cmccabe merged pull request #11941: URL: https://github.com/apache/kafka/pull/11941 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-13782) Producer may fail to add the correct partition to transaction
[ https://issues.apache.org/jira/browse/KAFKA-13782?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17516001#comment-17516001 ] Tom Bentley commented on KAFKA-13782: - This is currently the last remaining blocker for 3.1.1. [~hachikuji], not meaning to hassle you, but any idea when a fix might be available? > Producer may fail to add the correct partition to transaction > - > > Key: KAFKA-13782 > URL: https://issues.apache.org/jira/browse/KAFKA-13782 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Blocker > Fix For: 3.2.0, 3.1.1 > > > In KAFKA-13412, we changed the logic to add partitions to transactions in the > producer. The intention was to ensure that the partition is added in > `TransactionManager` before the record is appended to the > `RecordAccumulator`. However, this does not take into account the possibility > that the originally selected partition may be changed if `abortForNewBatch` > is set in `RecordAppendResult` in the call to `RecordAccumulator.append`. > When this happens, the partitioner can choose a different partition, which > means that the `TransactionManager` would be tracking the wrong partition. > I think the consequence of this is that the batches sent to this partition > would get stuck in the `RecordAccumulator` until they timed out because we > validate before sending that the partition has been added correctly to the > transaction. > Note that KAFKA-13412 has not been included in any release, so there are no > affected versions. > Thanks to [~alivshits] for identifying the bug. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[GitHub] [kafka] mumrah commented on a change in pull request #11941: KAFKA-13749: CreateTopics in KRaft must return configs
mumrah commented on a change in pull request #11941: URL: https://github.com/apache/kafka/pull/11941#discussion_r840670498 ## File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java ## @@ -530,21 +543,38 @@ private ApiError createTopic(CreatableTopic topic, " time(s): " + e.getMessage()); } ApiError error = maybeCheckCreateTopicPolicy(() -> { -Map configs = new HashMap<>(); -topic.configs().forEach(config -> configs.put(config.name(), config.value())); return new CreateTopicPolicy.RequestMetadata( -topic.name(), numPartitions, replicationFactor, null, configs); +topic.name(), numPartitions, replicationFactor, null, creationConfigs); }); if (error.isFailure()) return error; } Uuid topicId = Uuid.randomUuid(); -successes.put(topic.name(), new CreatableTopicResult(). +CreatableTopicResult result = new CreatableTopicResult(). setName(topic.name()). setTopicId(topicId). -setErrorCode((short) 0). -setErrorMessage(null). -setNumPartitions(newParts.size()). -setReplicationFactor((short) newParts.get(0).replicas.length)); +setErrorCode(NONE.code()). +setErrorMessage(null); +if (includeConfigs) { +Map effectiveConfig = configurationControl. +computeEffectiveTopicConfigs(creationConfigs); +List configNames = new ArrayList<>(effectiveConfig.keySet()); +configNames.sort(String::compareTo); +for (String configName : configNames) { +ConfigEntry entry = effectiveConfig.get(configName); +result.configs().add(new CreateTopicsResponseData.CreatableTopicConfigs(). +setName(entry.name()). +setValue(entry.isSensitive() ? null : entry.value()). +setReadOnly(entry.isReadOnly()). + setConfigSource(KafkaConfigSchema.translateConfigSource(entry.source()).id()). +setIsSensitive(entry.isSensitive())); +} +result.setNumPartitions(newParts.size()); +result.setReplicationFactor((short) newParts.get(0).replicas.length); Review comment: Thanks for the explanation. I agree changing this is out of scope for this PR -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] akatona84 opened a new pull request #11982: MINOR: Adjustments for jacoco
akatona84 opened a new pull request #11982: URL: https://github.com/apache/kafka/pull/11982 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dengziming commented on pull request #11910: KAFKA-13743: Prevent topics with conflicting metrics names from being created in KRaft mode
dengziming commented on pull request #11910: URL: https://github.com/apache/kafka/pull/11910#issuecomment-1085768659 Thank you @cmccabe for the reminders, I added a TimelineMap here, I'm not sure should we use a TimelineMap> since the user may already create some topics colliding with each other. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kamalcph commented on pull request #9340: Improving Fetch Session Caching for KAFKA-10558
kamalcph commented on pull request #9340: URL: https://github.com/apache/kafka/pull/9340#issuecomment-1085675534 gentle reminder -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ddrid commented on a change in pull request #11979: MINOR: Clean up of TransactionManager and RecordAccumulator
ddrid commented on a change in pull request #11979: URL: https://github.com/apache/kafka/pull/11979#discussion_r840399831 ## File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java ## @@ -790,7 +781,6 @@ private void adjustSequencesDueToFailedBatch(ProducerBatch batch) { throw new IllegalStateException("Sequence number for batch with sequence " + inFlightBatch.baseSequence() + " for partition " + batch.topicPartition + " is going to become negative: " + newSequence); -log.info("Resetting sequence number of batch with current sequence {} for partition {} to {}", inFlightBatch.baseSequence(), batch.topicPartition, newSequence); Review comment: Cause `inFlightBatch.resetProducerState` will be called immediately. I think this log is duplicate in `resetProducerState` method. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ddrid commented on a change in pull request #11979: MINOR: Clean up of TransactionManager and RecordAccumulator
ddrid commented on a change in pull request #11979: URL: https://github.com/apache/kafka/pull/11979#discussion_r840397657 ## File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java ## @@ -113,16 +113,7 @@ private TopicPartitionEntry getPartition(TopicPartition topicPartition) { } private TopicPartitionEntry getOrCreatePartition(TopicPartition topicPartition) { -TopicPartitionEntry ent = topicPartitions.get(topicPartition); -if (ent == null) { -ent = new TopicPartitionEntry(); -topicPartitions.put(topicPartition, ent); -} -return ent; -} - -private void addPartition(TopicPartition topicPartition) { -this.topicPartitions.putIfAbsent(topicPartition, new TopicPartitionEntry()); +return topicPartitions.putIfAbsent(topicPartition, new TopicPartitionEntry()); Review comment: Thanks for reminding me of that! I've confused it with `computeIfAbsent`. How about using `computeIfAbsent` as I modified? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-13783) Remove reason prefixing in JoinGroupRequest and LeaveGroupRequest
[ https://issues.apache.org/jira/browse/KAFKA-13783?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17515805#comment-17515805 ] Tom Bentley commented on KAFKA-13783: - [~dajac] do you want to update the KIP? > Remove reason prefixing in JoinGroupRequest and LeaveGroupRequest > - > > Key: KAFKA-13783 > URL: https://issues.apache.org/jira/browse/KAFKA-13783 > Project: Kafka > Issue Type: Bug >Reporter: David Jacot >Assignee: David Jacot >Priority: Blocker > Fix For: 3.2.0 > > > KIP-800 introduced a mechanism to pass a reason in the join group request and > in the leaver group request. A default reason is used unless one is provided > by the user. In this case, the custom reason is prefixed by the default one. > When we tried to used this in Kafka Streams, we noted a significant > degradation of the performances, see > https://github.com/apache/kafka/pull/11873. It is not clear wether the > prefixing is the root cause of the issue or not. To be on the safe side, I > think that we should remove the prefixing. It does not bring much anyway as > we are still able to distinguish a custom reason from the default one on the > broker side. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[GitHub] [kafka] showuon commented on a change in pull request #11979: MINOR: Clean up of TransactionManager and RecordAccumulator
showuon commented on a change in pull request #11979: URL: https://github.com/apache/kafka/pull/11979#discussion_r840353943 ## File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java ## @@ -378,12 +378,12 @@ public int splitAndReenqueue(ProducerBatch bigBatch) { // producer id. We will not attempt to reorder messages if the producer id has changed, we will throw an // IllegalStateException instead. private void insertInSequenceOrder(Deque deque, ProducerBatch batch) { -// When we are requeing and have enabled idempotence, the reenqueued batch must always have a sequence. +// When we are re-enqueueing and have enabled idempotence, the re-enqueued batch must always have a sequence. if (batch.baseSequence() == RecordBatch.NO_SEQUENCE) throw new IllegalStateException("Trying to re-enqueue a batch which doesn't have a sequence even " + "though idempotency is enabled."); -if (transactionManager.nextBatchBySequence(batch.topicPartition) == null) +if (!transactionManager.hasInflightBatches(batch.topicPartition)) Review comment: Nice cleanup. This way we don't have to return the first element in queue. ## File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java ## @@ -790,7 +781,6 @@ private void adjustSequencesDueToFailedBatch(ProducerBatch batch) { throw new IllegalStateException("Sequence number for batch with sequence " + inFlightBatch.baseSequence() + " for partition " + batch.topicPartition + " is going to become negative: " + newSequence); -log.info("Resetting sequence number of batch with current sequence {} for partition {} to {}", inFlightBatch.baseSequence(), batch.topicPartition, newSequence); Review comment: why should we remove this log? ## File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java ## @@ -113,16 +113,7 @@ private TopicPartitionEntry getPartition(TopicPartition topicPartition) { } private TopicPartitionEntry getOrCreatePartition(TopicPartition topicPartition) { -TopicPartitionEntry ent = topicPartitions.get(topicPartition); -if (ent == null) { -ent = new TopicPartitionEntry(); -topicPartitions.put(topicPartition, ent); -} -return ent; -} - -private void addPartition(TopicPartition topicPartition) { -this.topicPartitions.putIfAbsent(topicPartition, new TopicPartitionEntry()); +return topicPartitions.putIfAbsent(topicPartition, new TopicPartitionEntry()); Review comment: I don't think this refactor is correct. `putIfAbsent` will return **previous** value of the key. That is, if the current partition is null, after `getOrCreatePartition`, it'll return `null`, which is not what we want. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon merged pull request #11978: KAFKA-13786: Optimized documentation for control.plane.listener.name parameter
showuon merged pull request #11978: URL: https://github.com/apache/kafka/pull/11978 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-13787) Failed to delete state store directory for it is not empty
[ https://issues.apache.org/jira/browse/KAFKA-13787?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17515781#comment-17515781 ] Nico Pommerening commented on KAFKA-13787: -- Hi [~mjsax] : I'm experiencing the warnings on MacOs locally and Linux on Jenkins build server. I noticed the issue with deleting stuff under Windows, but I don't think this one relates since when I'm actually deleting the files with the PR, it works as expected. I did wonder, why this issue appears though and tried to find the part of code that was intended to delete these two files, but couldn't identify the clear spot in StateDirectory logics. Maybe you can also point me to the spot that should delete these two files, so I can dig deeper on why it's not working in our setup? > Failed to delete state store directory for it is not empty > -- > > Key: KAFKA-13787 > URL: https://issues.apache.org/jira/browse/KAFKA-13787 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.0.0 >Reporter: Nico Pommerening >Priority: Major > Attachments: bug-log.txt > > > On Kafka Streams shutdown the Cleanup of state directories seems not to work, > since the lock and metadata file seem not to be deleted. > Relevant WARN logging: > 2022-03-31 10:34:41,689 WARN [SpringApplicationShutdownHook] > org.apache.kafka.streams.processor.internals.StateDirectory: stream-thread > [SpringApplicationShutdownHook] Failed to delete state store directory of > /kafka-streams-statestore/555b9965-95e3-4c92-b467-1d283428da5d/test-test-run-kpi > for it is not empty > > Left over files in directory: > * .lock > * kafka-streams-process-metadata > > I'm not sure what the consequences of a unclean state cleanup are, but I > would like to get rid of the Warning. > I attached a bigger log extract and I've already patched the StateDirectory > implementation which I'll try to contribute. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[GitHub] [kafka] ddrid commented on pull request #11979: MINOR: Clean up of TransactionManager and RecordAccumulator
ddrid commented on pull request #11979: URL: https://github.com/apache/kafka/pull/11979#issuecomment-1085512862 Hi, @showuon @dajac, could you please take a look at this easy one? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dengziming closed pull request #9770: MINOR: Add ByteBufferAccessorTest
dengziming closed pull request #9770: URL: https://github.com/apache/kafka/pull/9770 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org